# -*- coding: utf-8 -*-
import subprocess as sp
import time

import cv2
import ffmpeg
import numpy as np
from loguru import logger

from exception.CustomerException import ServiceException
from enums.ExceptionEnum import ExceptionType


class Cv2Util:
    """Pull a video stream through ffmpeg, push frames to an output stream and
    optionally record the original / AI-annotated frames to local files.

    Frames returned by :meth:`read` are resized to half the source width and
    height. The push pipe and the AI video writer both use a frame size of
    ``width x height/2`` (two half-size frames placed side by side, see
    :meth:`video_merge`); the original-video writer uses
    ``width/2 x height/2``.
    """

    # Total number of push-pipe rebuilds allowed over the object's lifetime.
    _MAX_TOTAL_PUSH_RETRIES = 20
    # Number of failed rebuild attempts tolerated for a single frame.
    _MAX_PUSH_RETRIES_PER_FRAME = 3
    # Seconds to wait before rebuilding the push pipe.
    _PUSH_RETRY_INTERVAL_SECONDS = 10
    # Number of failed re-write attempts tolerated in video_write().
    _MAX_WRITE_RETRIES = 3

    def __init__(self, pullUrl, pushUrl=None, orFilePath=None, aiFilePath=None, requestId=None):
        """Store the stream/file locations; no process or file is opened here.

        :param pullUrl: source passed to ffprobe/ffmpeg as the input.
        :param pushUrl: destination passed to ffmpeg as the flv output.
        :param orFilePath: path of the local file for original frames.
        :param aiFilePath: path of the local file for AI (merged) frames.
        :param requestId: identifier appended to every log message.
        """
        self.pullUrl = pullUrl
        self.pushUrl = pushUrl
        self.orFilePath = orFilePath
        self.aiFilePath = aiFilePath
        self.cap = None
        self.p = None  # push process (ffmpeg reading raw frames on stdin)
        self.or_video_file = None
        self.ai_video_file = None
        self.fps = None
        self.width = None
        self.height = None
        self.wh = None  # size in bytes of one raw bgr24 frame
        self.all_frames = None
        self.bit_rate = None
        self.pull_p = None  # pull process (ffmpeg writing raw frames on stdout)
        self.requestId = requestId
        self.p_push_retry_num = 0

    @staticmethod
    def _parse_frame_rate(rate):
        """Convert an ffprobe frame rate such as ``"30000/1001"`` or ``"25"``
        to an int (truncated).

        Returns ``None`` when the value is not numeric or its denominator is
        zero, instead of raising.
        """
        numerator, _, denominator = str(rate).partition('/')
        try:
            num = float(numerator)
            den = float(denominator) if denominator else 1.0
        except ValueError:
            return None
        if den == 0:
            return None
        return int(num / den)

    def _stop_process(self, process):
        """Close the pipes of a child process, then terminate and reap it.

        Closing a pipe whose peer already died can raise (for example
        ``BrokenPipeError`` while flushing buffered data). That must not
        prevent the process from being terminated, so such errors are only
        logged.
        """
        for stream in (process.stdin, process.stdout):
            if stream is None:
                continue
            try:
                stream.close()
            except OSError as e:
                logger.warning("关闭管道流异常:{}, requestId:{}", e, self.requestId)
        process.terminate()
        process.wait()

    def get_video_info(self):
        """Probe ``pullUrl`` and fill width, height, wh, fps, all_frames and
        bit_rate from the first video stream.

        Attributes whose value is missing from the probe result stay
        ``None``; use :meth:`checkconfig` to verify the result.

        :raises ServiceException: when ``pullUrl`` is ``None``. Any other
            error is logged and swallowed.
        """
        try:
            if self.pullUrl is None:
                logger.error("拉流地址不能为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            probe = ffmpeg.probe(self.pullUrl)
            video_stream = next((stream for stream in probe['streams']
                                 if stream.get('codec_type') == 'video'), None)
            if video_stream is None:
                logger.error("根据拉流地址未获取到视频流, requestId:{}", self.requestId)
                return
            width = video_stream.get('width')
            height = video_stream.get('height')
            nb_frames = video_stream.get('nb_frames')
            fps = video_stream.get('r_frame_rate')
            bit_rate = video_stream.get('bit_rate')
            if width:
                self.width = int(width)
            if height:
                self.height = int(height)
            if width is not None and height is not None:
                # One bgr24 frame is 3 bytes per pixel.
                self.wh = int(width) * int(height) * 3
            if nb_frames:
                self.all_frames = int(nb_frames)
            if fps:
                # Parsed without eval(); an unusable value leaves fps unset
                # rather than aborting the remaining fields.
                parsed_fps = self._parse_frame_rate(fps)
                if parsed_fps is not None:
                    self.fps = parsed_fps
            if bit_rate:
                self.bit_rate = int(bit_rate) / 1000
            logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}|bit_rate:{}, requestId:{}",
                        self.width, self.height, self.fps, self.all_frames, self.bit_rate,
                        self.requestId)
        except ServiceException as s:
            logger.exception("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
            raise
        except Exception as e:
            logger.exception("获取视频信息异常:{}, requestId:{}", e, self.requestId)

    def build_pull_p(self):
        """(Re)start the ffmpeg process that decodes ``pullUrl`` into raw
        bgr24 frames on its stdout.

        Does nothing when width/height are unknown. An existing pull process
        is stopped first.

        :raises ServiceException: when ``pullUrl`` is ``None``. Any other
            error is logged and swallowed.
        """
        try:
            if self.width is None or self.height is None:
                return
            if self.pull_p:
                logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
                old_process, self.pull_p = self.pull_p, None
                self._stop_process(old_process)
            if self.pullUrl is None:
                logger.error("拉流地址不能为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            # Input options: quiet logging and the h264_cuvid decoder.
            input_options = {'loglevel': 'error', 'c:v': 'h264_cuvid'}
            self.pull_p = (
                ffmpeg
                .input(self.pullUrl, **input_options)
                .output('pipe:', format='rawvideo', pix_fmt='bgr24', loglevel='error')
                .overwrite_output()
                .global_args('-an')
                .run_async(pipe_stdout=True)
            )
        except ServiceException as s:
            logger.exception("构建拉流管道异常: {}, requestId:{}", s, self.requestId)
            raise
        except Exception as e:
            logger.exception("构建拉流管道异常:{}, requestId:{}", e, self.requestId)

    def checkconfig(self):
        """Return ``True`` when fps, width or height is still unknown."""
        return self.fps is None or self.width is None or self.height is None

    def read(self):
        """Read one raw frame from the pull pipe.

        :return: the frame resized to half width and half height, or ``None``
            when no complete frame could be read or an error occurred
            (errors are logged, not raised).
        """
        result = None
        try:
            in_bytes = self.pull_p.stdout.read(self.wh)
            if in_bytes is not None and len(in_bytes) > 0:
                if len(in_bytes) != self.wh:
                    # A short read cannot be reshaped into a frame.
                    logger.warning("读取到不完整的视频帧, 期望字节数:{}, 实际字节数:{}, requestId:{}",
                                   self.wh, len(in_bytes), self.requestId)
                    return None
                frame = np.frombuffer(in_bytes, np.uint8).reshape(
                    [int(self.height), int(self.width), 3])
                result = cv2.resize(frame, (int(self.width / 2), int(self.height / 2)),
                                    interpolation=cv2.INTER_LINEAR)
        except ServiceException as s:
            logger.exception("读流异常: {}, requestId:{}", s, self.requestId)
        except Exception as e:
            logger.exception("读流异常:{}, requestId:{}", e, self.requestId)
        return result

    def close(self):
        """Stop the pull and push processes and release both video writers."""
        if self.pull_p:
            self._stop_process(self.pull_p)
            logger.info("关闭拉流管道完成, requestId:{}", self.requestId)
        if self.p:
            self._stop_process(self.p)
            logger.info("关闭管道完成, requestId:{}", self.requestId)
        if self.or_video_file:
            self.or_video_file.release()
            logger.info("关闭原视频写入流完成, requestId:{}", self.requestId)
        if self.ai_video_file:
            self.ai_video_file.release()
            logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId)

    def build_p(self):
        """(Re)start the ffmpeg process that encodes raw bgr24 frames from its
        stdin and publishes them to ``pushUrl`` as flv.

        An existing push process is stopped first; a failure while closing
        its pipe no longer prevents the new process from being started.

        :raises ServiceException: when ``pushUrl`` is ``None``. Any other
            error is logged and swallowed.
        """
        try:
            if self.p:
                logger.info("重试, 关闭管道, requestId:{}", self.requestId)
                old_process, self.p = self.p, None
                self._stop_process(old_process)
            if self.pushUrl is None:
                logger.error("推流地址不能为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
            command = ['ffmpeg',
                       '-y',  # overwrite the output without asking
                       '-f', 'rawvideo',
                       '-vcodec', 'rawvideo',
                       '-pix_fmt', 'bgr24',
                       '-thread_queue_size', '16',
                       # pushed frames are full width, half height
                       '-s', "{}x{}".format(int(self.width), int(self.height / 2)),
                       '-r', str(self.fps),
                       '-i', '-',  # read the input from stdin
                       '-g', str(self.fps),
                       '-rc:v', 'vbr',
                       '-cq:v', '30',
                       '-qmin', '30',
                       '-qmax', '30',
                       '-c:v', 'h264_nvenc',
                       '-pix_fmt', 'yuv420p',
                       "-an",
                       # encoder preset: trades encoding speed for quality
                       '-preset', 'fast',
                       '-f', 'flv',
                       self.pushUrl]
            logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width,
                        self.requestId)
            self.p = sp.Popen(command, stdin=sp.PIPE, shell=False)
        except ServiceException as s:
            logger.exception("构建p管道异常: {}, requestId:{}", s, self.requestId)
            raise
        except Exception as e:
            logger.exception("初始化p管道异常:{}, requestId:{}", e, self.requestId)

    def push_stream(self, frame):
        """Write one frame to the push pipe, rebuilding the pipe on failure.

        On a write error the pipe is rebuilt (after a pause) and the frame is
        written again, up to ``_MAX_PUSH_RETRIES_PER_FRAME`` failed attempts
        for this frame and ``_MAX_TOTAL_PUSH_RETRIES`` rebuilds overall.

        :param frame: numpy array holding the frame to push.
        :raises ServiceException: when the pipe cannot be built initially
            because ``pushUrl`` is ``None``, or when the retries are
            exhausted.
        """
        if self.p is None:
            self.build_p()
        try:
            self.p.stdin.write(frame.tobytes())
        except Exception as ex:
            logger.exception("推流进管道异常:{}, requestId: {}", ex, self.requestId)
            current_retry_num = 0
            gave_up = False
            while not gave_up:
                if self.p_push_retry_num > self._MAX_TOTAL_PUSH_RETRIES:
                    logger.info("推流失败重试次数过多, 请检查相关配置信息, 当前重试次数: {}, requestId: {}",
                                self.p_push_retry_num, self.requestId)
                    gave_up = True
                    break
                try:
                    self.p_push_retry_num += 1
                    time.sleep(self._PUSH_RETRY_INTERVAL_SECONDS)
                    self.build_p()
                    self.p.stdin.write(frame.tobytes())
                    logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
                                self.requestId)
                    break
                except Exception as e:
                    current_retry_num += 1
                    logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
                                     current_retry_num, self.requestId)
                    if current_retry_num > self._MAX_PUSH_RETRIES_PER_FRAME:
                        logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.requestId)
                        gave_up = True
            if gave_up:
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_E_EXCEPTION.value[0],
                                       ExceptionType.PUSH_STREAM_URL_E_EXCEPTION.value[1])

    def _write_frames(self, or_frame, ai_frame):
        """Write the non-empty frames to their respective video writers."""
        if or_frame is not None and len(or_frame) > 0:
            self.or_video_file.write(or_frame)
        if ai_frame is not None and len(ai_frame) > 0:
            self.ai_video_file.write(ai_frame)

    def video_write(self, or_frame, ai_frame):
        """Write the original and AI frames to the local video files.

        The writers are created on first use. If building the writers or
        writing fails, the write alone is retried.

        :param or_frame: original frame, or ``None``/empty to skip it.
        :param ai_frame: AI (merged) frame, or ``None``/empty to skip it.
        :raises ServiceException: when more than ``_MAX_WRITE_RETRIES``
            retries fail.
        """
        try:
            self.build_write()
            self._write_frames(or_frame, ai_frame)
        except Exception:
            # NOTE: the retry writes both frames again, so a frame that was
            # already written before the failure may be written twice.
            ai_retry_num = 0
            while True:
                try:
                    self._write_frames(or_frame, ai_frame)
                    logger.info("重新写入离线分析后视频到本地, 当前重试次数: {}, requestId: {}", ai_retry_num,
                                self.requestId)
                    break
                except Exception as e:
                    ai_retry_num += 1
                    logger.exception("重新写入离线分析后视频到本地:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
                                     ai_retry_num, self.requestId)
                    if ai_retry_num > self._MAX_WRITE_RETRIES:
                        logger.exception("重新写入离线分析后视频到本地,重试失败:{}, requestId: {}", e,
                                         self.requestId)
                        raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                               ExceptionType.SERVICE_INNER_EXCEPTION.value[1])

    def build_write(self):
        """Create the mp4v video writers that are configured but not yet open.

        The original-video writer uses ``width/2 x height/2`` frames, the AI
        writer ``width x height/2`` frames.

        :raises ServiceException: when fps, width or height is unknown, or a
            writer object could not be created. Other errors are logged and
            re-raised.
        """
        try:
            if self.fps is None or self.width is None or self.height is None:
                raise ServiceException(ExceptionType.VIDEO_CONFIG_EXCEPTION.value[0],
                                       ExceptionType.VIDEO_CONFIG_EXCEPTION.value[1])
            if self.orFilePath is not None and self.or_video_file is None:
                self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'),
                                                     self.fps,
                                                     (int(self.width / 2), int(self.height / 2)))
                if self.or_video_file is None:
                    raise ServiceException(ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[0],
                                           ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[1])
                # The constructor returns an object even when the file could
                # not be opened, so surface that case in the log.
                if not self.or_video_file.isOpened():
                    logger.error("原视频写入对象打开失败, path:{}, requestId:{}", self.orFilePath,
                                 self.requestId)
            if self.aiFilePath is not None and self.ai_video_file is None:
                self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'),
                                                     self.fps,
                                                     (int(self.width), int(self.height / 2)))
                if self.ai_video_file is None:
                    raise ServiceException(ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[0],
                                           ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[1])
                if not self.ai_video_file.isOpened():
                    logger.error("AI视频写入对象打开失败, path:{}, requestId:{}", self.aiFilePath,
                                 self.requestId)
        except Exception as e:
            logger.exception("构建文件写对象异常: {}, requestId:{}", e, self.requestId)
            raise

    def video_merge(self, frame1, frame2):
        """Return ``frame1`` and ``frame2`` stacked horizontally (side by side)."""
        return np.hstack((frame1, frame2))

    def getP(self):
        """Return the push process.

        :raises ServiceException: when the push process has not been built.
        """
        if self.p is None:
            logger.error("获取管道为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[0],
                                   ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[1])
        return self.p

    def getCap(self):
        """Return ``self.cap``.

        :raises ServiceException: when ``self.cap`` is ``None``.
        """
        if self.cap is None:
            logger.error("获取cv2为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.CV2_IS_NULL_EXCEPTION.value[0],
                                   ExceptionType.CV2_IS_NULL_EXCEPTION.value[1])
        return self.cap

    def getOrVideoFile(self):
        """Return the original-video writer.

        :raises ServiceException: when the writer has not been created.
        """
        if self.or_video_file is None:
            logger.error("获取原视频写入对象为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[0],
                                   ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[1])
        return self.or_video_file

    def getAiVideoFile(self):
        """Return the AI-video writer.

        :raises ServiceException: when the writer has not been created.
        """
        if self.ai_video_file is None:
            logger.error("获取AI视频写入对象为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[0],
                                   ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[1])
        return self.ai_video_file