手头上有几十个 rtsp 链接,需要接受所有的 rtsp 视频流,并进行相应的处理。现有是使用 ffmpeg-python 库来接受 rtsp 流。 由于没有什么好的方法,所以只能用 python 的 Process 对象来管理。
class RTSPStream(Process): def __init__(self, queue, stream_id, stream_url): Process.__init__(self) self.q = q self.stream_id = stream_id self.stream_url = stream_url def get_hnw(self, url): probe = ffmpeg.probe(url, rtsp_transport='tcp') video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) width = video_stream['width'] height = video_stream['height'] return height, width def run(self): h, w = get_hnw(self.stream_url) assert h != 0 assert w != 0 out = ( ffmpeg .input(url, rtsp_transport='tcp') .output('pipe:', format='rawvideo', pix_fmt='bgr24', loglevel="quiet", r=1) .run_async(pipe_stdout=True) ) cnt_empty = 0 while True: in_bytes = out.stdout.read(h*w*3) if not in_bytes: cnt_empty += 1 if cnt_empty > 10: break sleep(0.1) continue cnt_empty = 0 frame = np.frombuffer(in_bytes, dtype=np.uint8).reshape(h, w, 3) data = { 'stream_id': self.stream_id, 'origin': frame } if not queue.full(): self.queue.put(data)
class DoSth(Process): def __init__(self, queue): Process.__init__(self) self.queue = queue def run(self): while True: if queue.empty(): sleep(0.05) continue data = queue.get() # do sth to ndarray
我会初始化一个 DoSth 进程对象,并根据不同的 rtsp 链接初始化多个 RTSPStream 进程对象。两种进程对象之间通过进程安全的 queue 连接。
这种方法,不太优雅,但没有 在处理 25 个 rtsp 流的时候还能正常运行,但当数量增加到 28 左右时,就无法达到预期:CPU 爆增至 100%、内存无限制增长……
所以,我应该用什么方法来同时接受这么多个 RTSP 视频流呢?
![]() | 1 eojessie 2020-07-22 15:39:27 +08:00 楼主有好的方法管理多个 ff 进程了么? |
2 mrhhhdx 2022-09-09 23:08:41 +08:00 减小 buffer 或者直接 nobuffer ,以及每一帧用完使用 del( ) ,或许能改善内存增长问题 |