# -*- coding: utf-8 -*- import json import subprocess as sp import time import cv2 import numpy as np from loguru import logger from alg_airport_ffmpeg.enums.ExceptionEnum import ExceptionType from alg_airport_ffmpeg.exception.CustomerException import ServiceException from alg_airport_ffmpeg.concurrency.CommonThread import Common """ 推流工具 """ class Cv2Util: def __init__(self, pullUrl=None, pushUrl=None): self.__pullUrl = pullUrl self.__pushUrl = pushUrl self.__push_stream = None self.__pull_stream = None self.__width = None self.__height = None self.__wh = None self.__fps = None self.__cap = None def probe(self): p = None try: args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', self.__pullUrl] p = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, close_fds=True) out, err = p.communicate(timeout=7) if p.returncode != 0: # logger.error("获取视频信息异常: {}", err.stderr.decode(encoding='utf-8')) return None return json.loads(out.decode('utf-8')) except Exception as e: # logger.error("获取视频信息异常: {}", e) return None finally: if p: # if p.stdout: # p.stdout.flush() # p.stdout.close() # if p.stderr: # p.stderr.close() p.terminate() # parent_proc = psutil.Process(p.pid) # for child_proc in parent_proc.children(recursive=True): # child_proc.kill() # parent_proc.kill() # p.kill() p.wait() # logger.info("关闭获取视频管道完成!") # 获取视频信息 def get_video_info(self): try: if self.__pullUrl is None or len(self.__pullUrl) == 0: raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0], ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1]) probe = self.probe() if probe is None or probe.get("streams") is None: return video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None) if video_stream is None: return width = video_stream.get('width') height = video_stream.get('height') fps = video_stream.get('r_frame_rate') self.__width = int(width) self.__height = int(height) self.__wh = int(width * height * 3) up, down = str(fps).split('/') self.__fps = int(eval(up) / eval(down)) logger.info("视频信息, width:{}|height:{}|fps:{}", self.__width, self.__height, self.__fps) except ServiceException as s: # logger.error("获取视频信息异常: {}", s.msg) raise s except Exception as e: # logger.error("获取视频信息异常:{}", e) raise e def build_cap(self, args): try: pullUrl = args[0] return cv2.VideoCapture(pullUrl) except Exception as e: logger.error("初始化cap异常: {}", e) return None # 构建 cv2 def build_cv2(self): try: if self.__cap is not None: # logger.info("重试, 关闭cap") self.__cap.release() self.__cap = None self.__fps = None self.__width = None self.__height = None if self.__pullUrl is None or len(self.__pullUrl) == 0: raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0], ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1]) cap_thread = Common(timeout=7, func=self.build_cap, args=(self.__pullUrl,)) cap_thread.setDaemon(True) cap_thread.start() self.__cap = cap_thread.get_result() if self.__cap is None: return if self.__cap.isOpened(): if self.__fps is None or self.__fps == 0: self.__fps = int(self.__cap.get(cv2.CAP_PROP_FPS)) if self.__width is None or self.__width == 0: self.__width = int(self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH)) if self.__height is None or self.__height == 0: self.__height = int(self.__cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) logger.info("fps:{}|height:{}|width:{}", self.__fps, self.__height, self.__width) except ServiceException as s: logger.error("构建cv2异常: {}", s.msg) raise s except Exception as e: logger.error("初始化cv2异常:{}", e) raise e def cv2_read(self): result = None try: if self.__cap is None: self.build_cv2() if self.__cap.isOpened(): ret, frame = self.__cap.read() if ret: result = frame del ret del frame except ServiceException as s: raise s except Exception as e: logger.error("读流异常:{}", e) raise e finally: if result is None: self.__fps = None self.__height = None self.__width = None if self.__cap: self.__cap.release() self.__cap = None logger.info("关闭cv2!") return result # 拉取视频 def build_pull_stream(self): try: if self.__pullUrl is None or len(self.__pullUrl) == 0: logger.error("拉流地址不能为空!") raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0], ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1]) if self.__pull_stream: logger.info("重试, 关闭拉流管道") if self.__pull_stream.stdout: self.__pull_stream.stdout.close() self.__pull_stream.terminate() self.__pull_stream.wait() command = ['ffmpeg', '-rtsp_transport', 'tcp', '-i', self.__pullUrl, '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-an', '-'] # command = ['ffmpeg', # '-re', # '-y', # '-c:v', 'h264_cuvid', # '-resize', self.wah, # '-i', self.pullUrl, # '-f', 'rawvideo', # '-an', # '-'] self.__pull_stream = sp.Popen(command, stdout=sp.PIPE) except ServiceException as s: logger.error("构建拉流管道异常: {}", s.msg) raise s except Exception as e: logger.error("构建拉流管道异常:{}", e) raise e def check_config(self): if self.__fps is None or self.__width is None or self.__height is None: return True else: return False def read(self): result = None try: if self.__pull_stream is None: self.build_pull_stream() in_bytes = self.__pull_stream.stdout.read(self.__wh) if in_bytes is not None and len(in_bytes) > 0: result = (np.frombuffer(in_bytes, np.uint8).reshape([int(self.__height), int(self.__width), 3])) except ServiceException as s: raise s except Exception as e: logger.error("读流异常:{}", e) raise e finally: if result is None: self.__fps = None self.__height = None self.__width = None if self.__pull_stream: if self.__pull_stream.stdout: self.__pull_stream.stdout.close() self.__pull_stream.terminate() self.__pull_stream.wait() logger.info("关闭拉流管道完成!") self.__pull_stream = None return result # 关闭管道 def close(self): if self.__pull_stream: if self.__pull_stream.stdout: self.__pull_stream.stdout.close() self.__pull_stream.terminate() self.__pull_stream.wait() logger.info("关闭拉流管道完成!") if self.__push_stream: if self.__push_stream.stdin: self.__push_stream.stdin.close() self.__push_stream.terminate() self.__push_stream.wait() logger.info("关闭推流管道完成!") if self.__cap: self.__cap.release() # 开始推流 def build_push_stream(self): try: if self.__push_stream: logger.info("重试, 关闭管道, 重新开启新管道") if self.__push_stream.stdin: self.__push_stream.stdin.close() self.__push_stream.terminate() self.__push_stream.wait() if self.__pushUrl is None or len(self.__pushUrl) == 0: logger.error("推流地址不能为空!") raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0], ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1]) command = ['ffmpeg', # '-loglevel', 'debug', '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24', '-s', "{}x{}".format(int(self.__width), int(self.__height)), '-r', str(self.__fps), '-i', '-', # '-g', str(self.__fps), # '-maxrate', '15000k', # '-minrate', '3000k', # '-profile:v', 'high', # '-level', '5.1', # '-b:v', '4000k', # '-crf', '26', # '-bufsize', '4000k', # '-c:v', 'libx264', # '-tune', 'zerolatency', # '-sc_threshold', '0', # '-pix_fmt', 'yuv420p', # "-an", # '-preset', 'medium', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast, # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。 ] for url in self.__pushUrl: command.extend(['-f', 'flv', '-g', str(self.__fps), '-maxrate', '15000k', '-minrate', '3000k', '-b:v', '4000k', '-bufsize', '4000k', '-c:v', 'libx264', '-tune', 'zerolatency', '-sc_threshold', '0', '-pix_fmt', 'yuv420p', '-preset', 'fast', "-an", "-y", url ]) self.__push_stream = sp.Popen(command, stdin=sp.PIPE, shell=False) except ServiceException as s: logger.error("构建推流管道异常: {}", s.msg) raise s except Exception as e: logger.error("初始化推流管道异常:{}", e) raise e def push_stream_write(self, frame): try: if self.__push_stream is None: self.build_push_stream() self.__push_stream.stdin.write(frame.tostring()) except Exception as ex: logger.error("推流异常:{}", ex) current_retry_num = 0 while True: try: self.build_push_stream() self.__push_stream.stdin.write(frame.tostring()) logger.info("推流重试成功, 当前重试次数: {}", current_retry_num) break except Exception as e: current_retry_num += 1 logger.error("推流异常:{}, 开始重试, 当前重试次数:{}", e, current_retry_num) time.sleep(1) if current_retry_num > 1: raise Exception("推流异常,请检查通道是否被占用!") def close_push_stream(self): if self.__push_stream: self.__push_stream.terminate() self.__push_stream.wait() self.__push_stream = None def is_push_stream_ok(self): if self.__push_stream: if self.__push_stream.poll() is not None: return True return False # 开始推流 def start_push_stream(self): try: if self.__push_stream: return if self.__pullUrl is None or len(self.__pullUrl) == 0: logger.error("拉流地址不能为空!") raise ServiceException(ExceptionType.PUll_STREAM_URL_EXCEPTION.value[0], ExceptionType.PUll_STREAM_URL_EXCEPTION.value[1]) if self.__pushUrl is None or len(self.__pushUrl) == 0: logger.error("推流地址不能为空!") raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0], ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1]) command = ['ffmpeg', '-re', '-rtsp_transport', 'tcp', '-i', self.__pullUrl, ] for url in self.__pushUrl: command.extend(['-f', 'flv', '-g', str(25), '-c:v', 'copy', "-an", "-y", url ]) self.__push_stream = sp.Popen(command, shell=False) except ServiceException as s: logger.error("构建推流管道异常: {}", s.msg) raise s except Exception as e: logger.error("初始化推流管道异常:{}", e) raise e def is_video_stream(self, url): p = None try: if url is None or len(url) == 0: raise Exception("流地址不能为空!") args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', url] p = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, close_fds=True) out, err = p.communicate(timeout=7) if p.returncode != 0: return False probe = json.loads(out.decode('utf-8')) if probe is None or probe.get("streams") is None: return False video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None) if video_stream is None: return False return True except ServiceException as s: raise s except Exception as e: return False finally: if p: p.terminate() p.wait() p = None