|
- # -*- coding: utf-8 -*-
- from json import loads
- from time import time
- from traceback import format_exc
-
- import cv2
- import subprocess as sp
-
- import numpy as np
- from loguru import logger
- from common import Constant
- from exception.CustomerException import ServiceException
- from enums.ExceptionEnum import ExceptionType
-
-
class Cv2Util:
    """Pull frames with ffmpeg, push merged frames, and record local copies.

    One instance tracks a single task: ``pull_p`` is the ffmpeg process that
    decodes ``pullUrl`` into raw ``bgr24`` frames, ``p`` is the ffmpeg process
    that encodes frames written to its stdin and sends them to ``pushUrl``,
    and ``or_video_file`` / ``ai_video_file`` are the OpenCV writers for the
    original and the analysed video.
    """

    __slots__ = [
        'pullUrl',
        'pushUrl',
        'orFilePath',
        'aiFilePath',
        'p',
        'or_video_file',
        'ai_video_file',
        'fps',
        'width',
        'height',
        'wh',
        'h',
        'w',
        'all_frames',
        'bit_rate',
        'pull_p',
        'requestId',
        'p_push_retry_num',
        'isGpu',
        'read_w_h',
        'context',
        'p_push_time'
    ]

    def __init__(self, pullUrl=None, pushUrl=None, orFilePath=None, aiFilePath=None, requestId=None, context=None,
                 gpu_ids=None):
        """Store the task parameters; no subprocess or file is opened here.

        :param pullUrl: source URL/path handed to ffprobe/ffmpeg
        :param pushUrl: destination URL for the encoded stream
        :param orFilePath: output path of the original video file
        :param aiFilePath: output path of the analysed video file
        :param requestId: identifier included in every log line
        :param context: opaque value stored on the instance
        :param gpu_ids: when non-empty, ``isGpu`` is set and the recording
            pull command adds ``-hwaccel cuda``
        """
        self.pullUrl = pullUrl
        self.pushUrl = pushUrl
        self.orFilePath = orFilePath
        self.aiFilePath = aiFilePath
        self.p = None
        self.or_video_file = None
        self.ai_video_file = None
        self.fps = None
        self.width = None
        self.height = None
        self.wh = None
        self.h = None
        self.w = None
        self.all_frames = None
        self.bit_rate = None
        self.pull_p = None
        self.requestId = requestId
        self.p_push_time = 0
        self.p_push_retry_num = 0
        self.isGpu = gpu_ids is not None and len(gpu_ids) > 0
        self.read_w_h = None
        self.context = context

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _update_scaled_size(self):
        """Set ``w``/``h``: half size when wider than ``Constant.width``, else full size."""
        if self.width > Constant.width:
            self.h = int(self.height // 2)
            self.w = int(self.width // 2)
        else:
            self.h = int(self.height)
            self.w = int(self.width)

    def _probe_video_stream(self, timeout):
        """Run ffprobe on ``pullUrl`` and return the first video stream dict.

        Raises ``Exception`` when ffprobe fails or reports no video stream, and
        ``subprocess.TimeoutExpired`` (after killing the child) on timeout.
        """
        # Argument list + shell=False: the URL is never interpreted by a shell.
        args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', self.pullUrl]
        p = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
        try:
            out, err = p.communicate(timeout=timeout)
        except sp.TimeoutExpired:
            # communicate() does not kill the child on timeout; do it here so
            # a hung ffprobe is not left behind.
            p.kill()
            p.communicate()
            raise
        if p.returncode != 0:
            raise Exception("未获取视频信息!!!!!requestId:" + str(self.requestId))
        probe = loads(out.decode('utf-8'))
        if probe is None or probe.get("streams") is None:
            raise Exception("未获取视频信息!!!!!requestId:" + str(self.requestId))
        video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
        if video_stream is None:
            raise Exception("未获取视频信息!!!!!requestId:" + str(self.requestId))
        return video_stream

    @staticmethod
    def _parse_fps(rate):
        """Convert an ffprobe ``"num/den"`` rate into a truncated integer fps."""
        up, down = str(rate).split('/')
        # Plain numeric parsing instead of eval() on external tool output.
        return int(float(up) / float(down))

    def _close_pull_p(self, message=None):
        """Close and reap the pull process, optionally logging ``message`` first."""
        if not self.pull_p:
            return
        if message:
            logger.info(message, self.requestId)
        if self.pull_p.stdout:
            self.pull_p.stdout.close()
        self.pull_p.terminate()
        self.pull_p.wait()
        self.pull_p = None

    def _close_push_p(self):
        """Close stdin of the push process, terminate it and reap it (handle is kept)."""
        if not self.p:
            return
        if self.p.stdin:
            self.p.stdin.close()
        self.p.terminate()
        self.p.wait()

    # ------------------------------------------------------------------
    # video information
    # ------------------------------------------------------------------

    def getFrameConfig(self, fps, width, height):
        """Adopt ``fps``/``width``/``height`` when unset or when the size changed."""
        if self.fps is None or self.width != width or self.height != height:
            self.fps = fps
            self.width = width
            self.height = height
            self._update_scaled_size()

    def clear_video_info(self):
        """Forget fps, width and height so that ``checkconfig`` reports them missing."""
        self.fps = None
        self.width = None
        self.height = None

    def get_video_info(self):
        """Probe ``pullUrl`` and fill width/height/wh/w/h/all_frames/fps.

        The fps is clamped into the range 25..30.

        :raises ServiceException: when ``pullUrl`` is empty. Any other failure
            is logged, the video info is cleared, and the method returns.
        """
        try:
            if self.pullUrl is None or len(self.pullUrl) == 0:
                logger.error("拉流地址不能为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            video_stream = self._probe_video_stream(timeout=20)
            width = video_stream.get('width')
            height = video_stream.get('height')
            nb_frames = video_stream.get('nb_frames')
            fps = video_stream.get('r_frame_rate')
            if width is not None and int(width) != 0 and height is not None and int(height) != 0:
                self.width = int(width)
                self.height = int(height)
                # One raw bgr24 frame is width * height * 3 bytes.
                self.wh = self.width * self.height * 3
                self._update_scaled_size()
            if nb_frames:
                self.all_frames = int(nb_frames)
            self.fps = self._parse_fps(fps)
            if self.fps > 30:
                logger.info("获取视频FPS大于30帧, FPS:{}, requestId:{}", self.fps, self.requestId)
                self.fps = 30
            if self.fps < 25:
                logger.info("获取视频FPS小于25帧, FPS:{}, requestId:{}", self.fps, self.requestId)
                self.fps = 25
            logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}, requestId:{}",
                        self.width, self.height, self.fps, self.all_frames, self.requestId)
        except ServiceException as s:
            logger.error("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
            self.clear_video_info()
            raise
        except Exception:
            logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), self.requestId)
            self.clear_video_info()

    def get_recording_video_info(self):
        """Probe ``pullUrl`` for a recording task and fill width/height/wh/read_w_h/all_frames/fps.

        Failures other than ``ServiceException`` are logged and clear the video info.
        """
        try:
            video_stream = self._probe_video_stream(timeout=17)
            width = video_stream.get('width')
            height = video_stream.get('height')
            nb_frames = video_stream.get('nb_frames')
            fps = video_stream.get('r_frame_rate')
            if width and int(width) > 0:
                self.width = int(width)
            if height and int(height) > 0:
                self.height = int(height)
            if self.width and self.height:
                # Use the validated integers, not the raw probe values.
                self.wh = self.width * self.height * 3
                self.read_w_h = ([self.height, self.width, 3])
            if nb_frames and int(nb_frames) > 0:
                self.all_frames = int(nb_frames)
            if fps:
                self.fps = self._parse_fps(fps)
            logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}, requestId:{}", self.width,
                        self.height, self.fps, self.all_frames, self.requestId)
        except ServiceException as s:
            logger.error("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
            self.clear_video_info()
            raise
        except Exception:
            logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), self.requestId)
            self.clear_video_info()

    def getRecordingFrameConfig(self, fps, width, height):
        """Unconditionally store ``fps``, ``width`` and ``height``."""
        self.fps = fps
        self.width = width
        self.height = height

    def checkconfig(self):
        """Return True when width, height or fps is still unknown."""
        return self.width is None or self.height is None or self.fps is None

    # ------------------------------------------------------------------
    # pulling frames
    # ------------------------------------------------------------------

    def recording_pull_p(self):
        """Start the recording pull process unless info is missing or it already runs."""
        try:
            # Without video information there is nothing to size frames with.
            if self.checkconfig():
                return
            # Already started: keep the existing process.
            if self.pull_p:
                return
            # Each argv element must be a separate list item when shell=False.
            command = ['ffmpeg', '-re', '-y', '-an']
            if self.pullUrl.startswith('rtsp://'):
                command.extend(['-rtsp_transport', 'tcp'])
            if self.isGpu:
                command.extend(['-hwaccel', 'cuda'])
            command.extend(['-i', self.pullUrl,
                            '-f', 'rawvideo',
                            '-pix_fmt', 'bgr24',
                            '-r', '25',
                            '-'])
            self.pull_p = sp.Popen(command, stdout=sp.PIPE)
        except ServiceException as s:
            logger.exception("构建拉流管道异常: {}, requestId:{}", s.msg, self.requestId)
            self.clear_video_info()
            self._close_pull_p("重试, 关闭拉流管道, requestId:{}")
            raise
        except Exception as e:
            logger.error("构建拉流管道异常:{}, requestId:{}", e, self.requestId)
            self.clear_video_info()
            self._close_pull_p("重试, 关闭拉流管道, requestId:{}")

    def recording_read(self):
        """Read one frame for a recording task.

        :return: array reshaped to ``read_w_h``, or None when nothing was read
            or a non-service error occurred (the pull process is closed then)
        :raises ServiceException: when the bytes do not fit ``read_w_h``
        """
        result = None
        try:
            self.recording_pull_p()
            in_bytes = self.pull_p.stdout.read(self.wh)
            if in_bytes is not None and len(in_bytes) > 0:
                try:
                    result = np.frombuffer(in_bytes, np.uint8).reshape(self.read_w_h)
                except Exception:
                    logger.error("视频格式异常:{}, requestId:{}", format_exc(), self.requestId)
                    raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
                                           ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
        except ServiceException:
            self._close_pull_p("重试, 关闭拉流管道, requestId:{}")
            raise
        except Exception:
            logger.error("读流异常:{}, requestId:{}", format_exc(), self.requestId)
            self._close_pull_p("重试, 关闭拉流管道, requestId:{}")
        return result

    def build_pull_p(self):
        """Start the ffmpeg pull process (``h264_cuvid`` decoder, raw ``bgr24`` output on stdout)."""
        try:
            command = ['ffmpeg']
            if self.pullUrl.startswith("rtsp://"):
                command.extend(['-rtsp_transport', 'tcp'])
            command.extend(['-re',
                            '-y',
                            '-an',
                            '-c:v', 'h264_cuvid',
                            '-i', self.pullUrl,
                            '-f', 'rawvideo',
                            '-pix_fmt', 'bgr24',
                            '-r', '25',
                            '-'])
            self.pull_p = sp.Popen(command, stdout=sp.PIPE)
        except ServiceException as s:
            logger.exception("构建拉流管道异常: {}, requestId:{}", s.msg, self.requestId)
            raise
        except Exception:
            logger.error("构建拉流管道异常:{}, requestId:{}", format_exc(), self.requestId)
            self.clear_video_info()
            self._close_pull_p("重试, 关闭拉流管道, requestId:{}")

    def read(self):
        """Read one frame from the pull process, starting the process if needed.

        :return: ``height x width x 3`` array, resized to ``(w, h)`` when the
            source is wider than ``Constant.width``; None when nothing was read
            or a non-service error occurred
        :raises ServiceException: when the bytes do not fit the expected shape
        """
        result = None
        try:
            if self.pull_p is None:
                self.build_pull_p()
            in_bytes = self.pull_p.stdout.read(self.wh)
            if in_bytes is not None and len(in_bytes) > 0:
                try:
                    result = (np.frombuffer(in_bytes, np.uint8).reshape([self.height, self.width, 3]))
                except Exception:
                    logger.error("视频格式异常:{}, requestId:{}", format_exc(), self.requestId)
                    raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
                                           ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
                if self.width > Constant.width:
                    result = cv2.resize(result, (self.w, self.h), interpolation=cv2.INTER_LINEAR)
        except ServiceException:
            raise
        except Exception:
            self.clear_video_info()
            self._close_pull_p("关闭拉流管道, requestId:{}")
            logger.error("读流异常:{}, requestId:{}", format_exc(), self.requestId)
        return result

    def close(self):
        """Clear the video info and release the pull/push processes and both writers."""
        self.clear_video_info()
        if self.pull_p:
            self._close_pull_p()
            logger.info("关闭拉流管道完成, requestId:{}", self.requestId)
        if self.p:
            self._close_push_p()
            self.p = None
            logger.info("关闭管道完成, requestId:{}", self.requestId)
        if self.or_video_file:
            self.or_video_file.release()
            self.or_video_file = None
            logger.info("关闭原视频写入流完成, requestId:{}", self.requestId)
        if self.ai_video_file:
            self.ai_video_file.release()
            self.ai_video_file = None
            logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId)

    # ------------------------------------------------------------------
    # pushing frames
    # ------------------------------------------------------------------

    def build_p(self):
        """Start the ffmpeg push process reading raw ``bgr24`` frames of size ``2*w x h`` from stdin.

        :raises ServiceException: when ``pushUrl`` is empty. Other failures are
            logged and leave ``p`` unset.
        """
        try:
            if self.pushUrl is None or len(self.pushUrl) == 0:
                logger.error("推流地址不能为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
            command = ['ffmpeg',
                       '-re',
                       '-y',
                       "-an",
                       '-f', 'rawvideo',
                       '-vcodec', 'rawvideo',
                       '-pix_fmt', 'bgr24',
                       '-thread_queue_size', '1024',
                       # Input is twice the scaled width: two frames side by side.
                       '-s', "{}x{}".format(self.w * 2, self.h),
                       '-i', '-',  # read frames from stdin
                       '-r', str(25),
                       '-g', str(25),
                       '-maxrate', '6000k',
                       '-b:v', '5000k',
                       '-c:v', 'h264_nvenc',
                       '-bufsize', '5000k',
                       '-pix_fmt', 'yuv420p',
                       '-preset', 'p6',
                       '-tune', 'll',
                       '-f', 'flv',
                       self.pushUrl]
            logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width,
                        self.requestId)
            self.p = sp.Popen(command, stdin=sp.PIPE, shell=False)
        except ServiceException as s:
            self._close_push_p()
            # Do not keep a handle to a terminated process.
            self.p = None
            logger.exception("构建p管道异常: {}, requestId:{}", s.msg, self.requestId)
            raise
        except Exception:
            self._close_push_p()
            self.p = None
            logger.error("初始化p管道异常:{}, requestId:{}", format_exc(), self.requestId)

    def push_stream(self, frame):
        """Write ``frame`` to the push process, rebuilding the process and retrying on failure.

        :raises ServiceException: ``PUSH_STREAMING_CHANNEL_IS_OCCUPIED`` once more
            than 100 closely spaced failures accumulated, or
            ``PUSH_STREAM_EXCEPTION`` after more than 3 retries within this call
        """
        current_retry_num = 0
        while True:
            try:
                if self.p is None:
                    self.build_p()
                # tobytes() replaces tostring(), which NumPy 2.0 removed.
                self.p.stdin.write(frame.tobytes())
                break
            except ServiceException:
                raise
            except Exception:
                # ``time`` is the function imported via ``from time import time``.
                if self.p_push_time == 0:
                    self.p_push_time = time()
                # Failures less than 2 s apart count towards the long-run limit.
                if time() - self.p_push_time < 2:
                    self.p_push_retry_num += 1
                    self.p_push_time = time()
                # More than 60 s since the last counted failure: start over.
                if time() - self.p_push_time > 60:
                    self.p_push_retry_num = 0
                    self.p_push_time = time()
                logger.error("推流管道异常:{}, requestId: {}", format_exc(), self.requestId)
                if self.p:
                    try:
                        self._close_push_p()
                    except Exception:
                        logger.error("推流管道异常:{}, requestId: {}", format_exc(), self.requestId)
                    self.p = None
                current_retry_num += 1
                if self.p_push_retry_num > 100:
                    logger.error("推流进管道异常:{}, requestId: {}", format_exc(), self.requestId)
                    raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
                                           ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
                if current_retry_num > 3:
                    logger.error("推流进管道异常:{}, requestId: {}", format_exc(), self.requestId)
                    raise ServiceException(ExceptionType.PUSH_STREAM_EXCEPTION.value[0],
                                           ExceptionType.PUSH_STREAM_EXCEPTION.value[1])

    # ------------------------------------------------------------------
    # local video files
    # ------------------------------------------------------------------

    def build_or_write(self):
        """Create the writer for the original video (``w x h``, 25 fps, ``mp4v``) if not yet created.

        :raises ServiceException: when no writer exists afterwards
        """
        try:
            if self.orFilePath is not None and self.or_video_file is None:
                self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25,
                                                     (self.w, self.h))
            if self.or_video_file is None:
                logger.error("or_video_file为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                       ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        except ServiceException as s:
            if self.or_video_file:
                self.or_video_file.release()
                self.or_video_file = None
            logger.error("构建OR文件写对象异常: {}, requestId:{}", s.msg, self.requestId)
            raise
        except Exception:
            if self.or_video_file:
                self.or_video_file.release()
                self.or_video_file = None
            logger.error("构建OR文件写对象异常: {}, requestId:{}", format_exc(), self.requestId)
            raise

    def build_ai_write(self):
        """Create the writer for the analysed video (``2*w x h``, 25 fps, ``mp4v``) if not yet created.

        :raises ServiceException: when no writer exists afterwards
        """
        try:
            if self.aiFilePath is not None and self.ai_video_file is None:
                self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25,
                                                     (self.w * 2, self.h))
            if self.ai_video_file is None:
                logger.error("ai_video_file为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                       ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        except ServiceException as s:
            if self.ai_video_file:
                self.ai_video_file.release()
                self.ai_video_file = None
            logger.error("构建AI文件写对象异常: {}, requestId:{}", s.msg, self.requestId)
            raise
        except Exception:
            if self.ai_video_file:
                self.ai_video_file.release()
                self.ai_video_file = None
            logger.error("构建AI文件写对象异常: {}, requestId:{}", format_exc(), self.requestId)
            raise

    def video_or_write(self, frame):
        """Write ``frame`` to the original video file, retrying a few times before re-raising."""
        ai_retry_num = 0
        while True:
            try:
                if self.or_video_file is None:
                    self.build_or_write()
                self.or_video_file.write(frame)
                break
            except ServiceException:
                raise
            except Exception:
                if ai_retry_num > 3:
                    logger.error("重新写入原视频视频到本地, 重试失败:{}, requestId: {}", format_exc(),
                                 self.requestId)
                    raise
            finally:
                ai_retry_num += 1

    def video_ai_write(self, frame):
        """Write ``frame`` to the analysed video file, retrying a few times before re-raising."""
        ai_retry_num = 0
        while True:
            try:
                if self.ai_video_file is None:
                    self.build_ai_write()
                self.ai_video_file.write(frame)
                break
            except ServiceException:
                raise
            except Exception:
                if ai_retry_num > 3:
                    logger.exception("重新写入分析后的视频到本地,重试失败:{}, requestId: {}", format_exc(),
                                     self.requestId)
                    raise
            finally:
                ai_retry_num += 1

    def video_merge(self, frame1, frame2):
        """Return ``frame1`` and ``frame2`` stacked horizontally with ``np.hstack``."""
        return np.hstack((frame1, frame2))

    # ------------------------------------------------------------------
    # accessors
    # ------------------------------------------------------------------

    def getP(self):
        """Return the push process; raise ``ServiceException`` when it does not exist."""
        if self.p is None:
            logger.error("获取管道为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        return self.p

    def getOrVideoFile(self):
        """Return the original-video writer; raise ``ServiceException`` when it does not exist."""
        if self.or_video_file is None:
            logger.error("获取原视频写入对象为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        return self.or_video_file

    def getAiVideoFile(self):
        """Return the analysed-video writer; raise ``ServiceException`` when it does not exist."""
        if self.ai_video_file is None:
            logger.error("获取AI视频写入对象为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        return self.ai_video_file
-
-
def check_video_stream(width, height):
    """Return True when either dimension is missing (None), otherwise False."""
    return width is None or height is None
-
-
def build_video_info(pull_url, requestId):
    """Probe ``pull_url`` with ffprobe and derive the frame geometry for the pull reader.

    :param pull_url: source URL/path passed to ffprobe
    :param requestId: identifier included in log lines
    :return: ``(width, height, width_height_3, all_frames, w_2, h_2)`` where
        ``height`` is the probed height times 3 // 2, ``width_height_3`` is
        ``width * height``, and ``w_2``/``h_2`` are the probed size, halved
        when the width exceeds ``Constant.width``. On any non-service failure
        ``(None, None, None, 0, None, None)`` is returned.
    :raises ServiceException: when ``pull_url`` is empty
    """
    try:
        if pull_url is None or len(pull_url) == 0:
            logger.error("拉流地址不能为空, requestId:{}", requestId)
            raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                   ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
        pp = sp.Popen(['ffprobe', '-show_format', '-show_streams', '-of', 'json', pull_url], stdout=sp.PIPE,
                      stderr=sp.PIPE)
        try:
            out, err = pp.communicate(timeout=18)
        except sp.TimeoutExpired:
            # communicate() leaves the child running on timeout; kill and reap
            # it so a hung ffprobe does not accumulate.
            pp.kill()
            pp.communicate()
            raise
        if pp.returncode != 0:
            logger.error("获取视频信息失败: {}, requestId:{}", err.decode('utf-8'), requestId)
            raise Exception("未获取视频信息!!!!")
        probe = loads(out.decode('utf-8'))
        video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
        if video_stream is None:
            raise Exception("未获取视频信息!!!!")
        width_new, height_new = video_stream.get('width'), video_stream.get('height')
        nb_frames = video_stream.get('nb_frames', 0)
        duration = video_stream.get('duration')
        # When a duration is reported, the frame count is estimated at 25 fps.
        if duration is not None and float(duration) != float(0):
            nb_frames = int(float(duration) * 25)
        if width_new is not None and int(width_new) != 0 and height_new is not None and int(height_new) != 0:
            height_o = int(height_new)
            width, height = int(width_new), height_o * 3 // 2
            width_height_3 = width * height
            all_frames = int(nb_frames)
            w_2, h_2 = width, height_o
            if width > Constant.width:
                w_2, h_2 = width // 2, height_o // 2
            logger.info("视频信息, width:{}|height:{}|all_frames:{}, requestId:{}", width, height_o, all_frames,
                        requestId)
            return width, height, width_height_3, all_frames, w_2, h_2
        raise Exception("未获取视频信息!!!!")
    except ServiceException as s:
        logger.error("获取视频信息异常: {}, requestId:{}", s.msg, requestId)
        raise
    except Exception:
        logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), requestId)
        return None, None, None, 0, None, None
-
-
def build_video_info2(pull_url, requestId):
    """Probe ``pull_url`` with ffprobe and derive the frame geometry for the pull reader.

    :param pull_url: source URL/path passed to ffprobe
    :param requestId: identifier included in log lines
    :return: ``(width, height, width_height_3, all_frames, w, h)`` where
        ``height`` is the probed height times 3 // 2, ``width_height_3`` is
        ``width * height``, and ``w``/``h`` are the probed size, halved when
        the width exceeds ``Constant.width``. On any non-service failure
        ``(None, None, None, 0, None, None)`` is returned.
    """
    try:
        pp = sp.Popen(['ffprobe', '-show_format', '-show_streams', '-of', 'json', pull_url], stdout=sp.PIPE,
                      stderr=sp.PIPE)
        try:
            out, err = pp.communicate(timeout=17)
        except sp.TimeoutExpired:
            # communicate() leaves the child running on timeout; kill and reap
            # it so a hung ffprobe does not accumulate.
            pp.kill()
            pp.communicate()
            raise
        if pp.returncode != 0:
            logger.error("获取视频信息失败: {}, requestId:{}", err.decode('utf-8'), requestId)
            raise Exception("未获取视频信息!!!!")
        probe = loads(out.decode('utf-8'))
        video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
        if video_stream is None:
            raise Exception("未获取视频信息!!!!")
        width_new, height_new = video_stream.get('width'), video_stream.get('height')
        nb_frames = video_stream.get('nb_frames', 0)
        duration = video_stream.get('duration')
        # When a duration is reported, the frame count is estimated at 25 fps.
        if duration is not None and float(duration) != float(0):
            nb_frames = int(float(duration) * 25)
        if width_new is not None and int(width_new) != 0 and height_new is not None and int(height_new) != 0:
            width_o, height_o = int(width_new), int(height_new)
            width, height, all_frames = width_o, height_o * 3 // 2, int(nb_frames)
            # Byte count of one frame must equal the (height, width) buffer the
            # reader reshapes into; width * height guarantees that even when
            # the probed height is odd (same value as w*h*3//2 for even heights).
            width_height_3 = width * height
            w, h = width_o, height_o
            if width > Constant.width:
                w, h = width_o // 2, height_o // 2
            logger.info("视频信息, width:{}|height:{}|all_frames:{}, requestId:{}", width_o, height_o, all_frames,
                        requestId)
            return width, height, width_height_3, all_frames, w, h
        raise Exception("未获取视频信息!!!!")
    except ServiceException as s:
        logger.error("获取视频信息异常: {}, requestId:{}", s.msg, requestId)
        raise
    except Exception:
        logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), requestId)
        return None, None, None, 0, None, None
-
-
def start_pull_p(pull_url, requestId):
    """Spawn the ffmpeg pull process for ``pull_url`` and return the ``Popen`` handle.

    The process decodes with ``h264_cuvid`` and writes raw video at 25 fps to
    its stdout. Any exception is logged and re-raised.
    """
    try:
        protocol_opts = []
        if pull_url.startswith("rtsp://"):
            protocol_opts += ['-timeout', '20000000', '-rtsp_transport', 'tcp']
        if pull_url.startswith(("http", "rtmp")):
            protocol_opts += ['-rw_timeout', '20000000']
        decode_opts = ['-re', '-y', '-an',
                       '-c:v', 'h264_cuvid',
                       '-i', pull_url,
                       '-f', 'rawvideo',
                       '-r', '25',
                       '-']
        return sp.Popen(['ffmpeg'] + protocol_opts + decode_opts, stdout=sp.PIPE)
    except ServiceException as s:
        logger.error("构建拉流管道异常: {}, requestId:{}", s.msg, requestId)
        raise
    except Exception:
        logger.error("构建拉流管道异常:{}, requestId:{}", format_exc(), requestId)
        raise
-
-
def clear_pull_p(pull_p, requestId):
    """Stop a still-running pull process; do nothing when it is missing or already exited.

    The process gets stdout closed, ``terminate()`` and a 30 s ``wait``. If
    that fails the error is logged, a still-running process is killed, and the
    original exception is re-raised.
    """
    try:
        if not pull_p or pull_p.poll() is not None:
            return
        logger.info("关闭拉流管道, requestId:{}", requestId)
        if pull_p.stdout:
            pull_p.stdout.close()
        pull_p.terminate()
        pull_p.wait(timeout=30)
        logger.info("拉流管道已关闭, requestId:{}", requestId)
    except Exception as e:
        logger.error("关闭拉流管道异常: {}, requestId:{}", format_exc(), requestId)
        still_running = pull_p and pull_p.poll() is None
        if still_running:
            pull_p.kill()
            pull_p.wait(timeout=30)
        raise e
-
-
def pull_read_video_stream(pull_p, pull_url, width, height, width_height_3, w_2, h_2, requestId):
    """Read one frame from the pull process and convert it from NV12 to BGR.

    :param pull_p: running pull process, or None to start one via ``start_pull_p``
    :param width_height_3: number of bytes read from stdout for one frame
    :param w_2: target width used when the frame is wider than ``Constant.width``
    :param h_2: target height used when the frame is wider than ``Constant.width``
    :return: ``(frame, pull_p, width, height)``; ``frame`` is None when nothing
        was read. After a non-service error the process is cleared and
        ``pull_p``, ``width`` and ``height`` come back as None.
    :raises ServiceException: when the bytes cannot be reshaped/converted
    """
    frame = None
    try:
        if pull_p is None:
            pull_p = start_pull_p(pull_url, requestId)
        raw = pull_p.stdout.read(width_height_3)
        if raw is not None and len(raw) > 0:
            try:
                nv12 = np.frombuffer(raw, np.uint8).reshape((height, width))
                frame = cv2.cvtColor(nv12, cv2.COLOR_YUV2BGR_NV12)
                if width > Constant.width:
                    frame = cv2.resize(frame, (w_2, h_2), interpolation=cv2.INTER_LINEAR)
            except Exception:
                logger.error("视频格式异常:{}, requestId:{}", format_exc(), requestId)
                raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
                                       ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
    except ServiceException:
        clear_pull_p(pull_p, requestId)
        raise
    except Exception:
        clear_pull_p(pull_p, requestId)
        pull_p, width, height = None, None, None
        logger.error("读流异常:{}, requestId:{}", format_exc(), requestId)
    return frame, pull_p, width, height
-
-
def pull_read_video_stream2(pull_p, pull_url, width, height, width_height_3, w, h, requestId):
    """Read one NV12 frame from the pull process and return it as a BGR image.

    :param pull_p: running pull process, or None to start one via ``start_pull_p``
    :param width_height_3: number of bytes read from stdout for one frame
    :param w: target width used when the frame is wider than ``Constant.width``
    :param h: target height used when the frame is wider than ``Constant.width``
    :return: ``(frame, pull_p, width, height)``; ``frame`` is None when nothing
        was read. After a non-service error the process is cleared and
        ``pull_p``, ``width`` and ``height`` come back as None.
    :raises ServiceException: when the bytes cannot be reshaped/converted
    """
    image = None
    try:
        if pull_p is None:
            pull_p = start_pull_p(pull_url, requestId)
        payload = pull_p.stdout.read(width_height_3)
        has_data = payload is not None and len(payload) > 0
        if has_data:
            try:
                planes = np.frombuffer(payload, np.uint8)
                planes = planes.reshape((height, width))
                image = cv2.cvtColor(planes, cv2.COLOR_YUV2BGR_NV12)
                needs_downscale = width > Constant.width
                if needs_downscale:
                    image = cv2.resize(image, (w, h), interpolation=cv2.INTER_LINEAR)
            except Exception:
                logger.error("视频格式异常:{}, requestId:{}", format_exc(), requestId)
                raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
                                       ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
    except ServiceException:
        clear_pull_p(pull_p, requestId)
        raise
    except Exception:
        clear_pull_p(pull_p, requestId)
        pull_p = width = height = None
        logger.error("读流异常:{}, requestId:{}", format_exc(), requestId)
    return image, pull_p, width, height
-
-
def video_conjuncing(frame1, frame2):
    """Return ``frame1`` and ``frame2`` joined horizontally with ``np.hstack``."""
    return np.hstack((frame1, frame2))
-
-
def build_push_p(push_url, width, height, requestId):
    """Spawn the ffmpeg push process and return its ``Popen`` handle.

    The process reads raw ``bgr24`` frames of ``width x height`` from stdin,
    encodes them with ``h264_nvenc`` and sends FLV to ``push_url``.

    :raises ServiceException: when ``push_url`` is empty
    :raises Exception: any other failure is logged and re-raised
    """
    push_p = None
    try:
        if push_url is None or len(push_url) == 0:
            logger.error("推流地址不能为空, requestId:{}", requestId)
            raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
                                   ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
        # Options describing the raw frames arriving on stdin.
        input_opts = ['-y',
                      "-an",
                      '-f', 'rawvideo',
                      '-vcodec', 'rawvideo',
                      '-pix_fmt', 'bgr24',
                      '-thread_queue_size', '1024',
                      '-s', "{}x{}".format(width, height),
                      '-i', '-']
        # Options describing the encoded output stream.
        output_opts = ['-r', str(25),
                       '-g', str(25),
                       '-maxrate', '6000k',
                       '-b:v', '4000k',
                       '-c:v', 'h264_nvenc',
                       '-bufsize', '4000k',
                       '-pix_fmt', 'yuv420p',
                       '-preset', 'p6',
                       '-tune', 'll',
                       '-f', 'flv',
                       push_url]
        logger.info("height:{}|width:{}|requestId:{}", height, width, requestId)
        push_p = sp.Popen(['ffmpeg'] + input_opts + output_opts, stdin=sp.PIPE, shell=False)
        return push_p
    except ServiceException as s:
        if push_p:
            if push_p.stdin:
                push_p.stdin.close()
            push_p.terminate()
            push_p.wait()
        logger.error("构建p管道异常: {}, requestId:{}", s.msg, requestId)
        raise
    except Exception:
        if push_p:
            if push_p.stdin:
                push_p.stdin.close()
            push_p.terminate()
            push_p.wait()
        logger.error("初始化p管道异常:{}, requestId:{}", format_exc(), requestId)
        raise
-
-
def push_video_stream(frame, push_p, push_url, p_push_status, requestId):
    """Write one frame to the push process, creating the process on first use.

    :param frame: current video frame (array exposing ``shape`` and ``tobytes``)
    :param push_p: push process, or None to build one sized from ``frame``
    :param push_url: destination URL used when the process has to be built
    :param p_push_status: mutable ``[last_failure_time, failure_count]`` pair
        updated in place when a write fails
    :param requestId: identifier included in log lines
    :return: the push process on success; None after a tolerated failure
        (the process has been cleared and should be rebuilt on the next call)
    :raises ServiceException: propagated from building the process, or
        ``PUSH_STREAMING_CHANNEL_IS_OCCUPIED`` once the failure count exceeds 300
    """
    try:
        if push_p is None:
            height, width = frame.shape[0:2]
            push_p = build_push_p(push_url, width, height, requestId)
        # tobytes() replaces tostring(), which NumPy 2.0 removed.
        push_p.stdin.write(frame.tobytes())
        return push_p
    except ServiceException:
        clear_push_p(push_p, requestId)
        raise
    except Exception:
        if p_push_status[0] == 0:
            # First failure ever recorded.
            p_push_status[0] = time()
            p_push_status[1] += 1
        elif time() - p_push_status[0] <= 60:
            # Another failure within a minute of the previous one.
            p_push_status[1] += 1
            p_push_status[0] = time()
        else:
            # Previous failure is older than a minute: restart the count.
            p_push_status[1] = 1
            p_push_status[0] = time()
        logger.error("推流管道异常:{}, requestId: {}", format_exc(), requestId)
        clear_push_p(push_p, requestId)
        if p_push_status[1] > 300:
            logger.error("推流进管道异常:{}, requestId: {}", format_exc(), requestId)
            raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
                                   ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
        return None
-
-
def clear_push_p(push_p, requestId):
    """Close stdin of the push process, terminate it and wait for it; errors are only logged."""
    if not push_p:
        return
    try:
        stdin = push_p.stdin
        if stdin:
            stdin.close()
        push_p.terminate()
        push_p.wait()
    except Exception:
        logger.error("推流管道异常:{}, requestId: {}", format_exc(), requestId)
-
-
def close_or_write_stream(or_video_file, requestId):
    """Release the original-video writer if one was given; a failure is logged, not raised."""
    try:
        if not or_video_file:
            return
        or_video_file.release()
    except Exception:
        logger.info("关闭原视频写流管道异常:{}, requestId:{}", format_exc(), requestId)
-
-
def close_ai_write_stream(ai_video_file, requestId):
    """Release the analysed-video writer if one was given; a failure is logged, not raised."""
    try:
        if not ai_video_file:
            return
        ai_video_file.release()
    except Exception:
        logger.info("关闭AI视频写流管道异常:{}, requestId:{}", format_exc(), requestId)
-
-
def close_all_p(push_p, or_video_file, ai_video_file, requestId):
    """Shut down the push process and both local video writers, logging before and after."""
    logger.info("开始停止推流、写流管道!requestId:{}", requestId)
    # Order matters only for the log trail: push process first, then writers.
    shutdown_steps = (
        (clear_push_p, push_p),
        (close_or_write_stream, or_video_file),
        (close_ai_write_stream, ai_video_file),
    )
    for closer, resource in shutdown_steps:
        closer(resource, requestId)
    logger.info("停止推流、写流管道完成!requestId:{}", requestId)
-
-
def build_or_video(orFilePath, width, height, requestId):
    """Create and return an ``mp4v`` 25 fps OpenCV writer for the original video.

    :raises ServiceException: when the writer object is None
    :raises Exception: any other failure is logged and re-raised after the
        writer (if any) has been released
    """
    or_video_file = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        frame_size = (width, height)
        or_video_file = cv2.VideoWriter(orFilePath, fourcc, 25, frame_size)
        # NOTE(review): the constructor call presumably never yields None, so
        # this guard may be dead; ``isOpened()`` might be the intended check —
        # confirm before changing.
        if or_video_file is None:
            logger.error("or_video_file为空, requestId:{}", requestId)
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        return or_video_file
    except ServiceException as s:
        if or_video_file:
            or_video_file.release()
        logger.error("构建OR文件写对象异常: {}, requestId:{}", s.msg, requestId)
        raise
    except Exception:
        if or_video_file:
            or_video_file.release()
        logger.error("构建OR文件写对象异常: {}, requestId:{}", format_exc(), requestId)
        raise
-
-
def write_or_video(frame, orFilePath, or_video_file, or_write_status, requestId):
    """Write one frame to the original-video file, creating the writer on first use.

    A failed write is not retried; the frame is dropped.

    :param frame: current video frame
    :param orFilePath: path of the original video file
    :param or_video_file: local writer object, or None to build one from ``frame.shape``
    :param or_write_status: mutable ``[last_failure_time, failure_count]`` pair,
        updated in place on failure
    :param requestId: request id used in log lines
    :return: the writer object (possibly still None after a tolerated failure)
    :raises ServiceException: propagated after releasing the writer
    :raises Exception: the write error, once the failure count exceeds 5
    """
    try:
        if or_video_file is None:
            rows, cols = frame.shape[0], frame.shape[1]
            or_video_file = build_or_video(orFilePath, cols, rows, requestId)
        or_video_file.write(frame)
        return or_video_file
    except ServiceException:
        if or_video_file:
            or_video_file.release()
        raise
    except Exception as ex:
        last_failure = or_write_status[0]
        # Count up on the very first failure or on one within a minute of the
        # previous failure; otherwise restart the count at 1.
        if last_failure == 0 or time() - last_failure <= 60:
            or_write_status[1] += 1
        else:
            or_write_status[1] = 1
        or_write_status[0] = time()
        if or_write_status[1] > 5:
            if or_video_file:
                or_video_file.release()
            logger.error("重新写入原视频视频到本地, 重试失败:{}, requestId: {}", format_exc(), requestId)
            raise ex
    return or_video_file
-
-
def build_ai_video(aiFilePath, width, height, requestId):
    """Create and return an ``mp4v`` 25 fps OpenCV writer for the analysed video.

    :raises ServiceException: when the writer object is None
    :raises Exception: any other failure is logged and re-raised after the
        writer (if any) has been released
    """
    ai_video_file = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        frame_size = (width, height)
        ai_video_file = cv2.VideoWriter(aiFilePath, fourcc, 25, frame_size)
        # NOTE(review): the constructor call presumably never yields None, so
        # this guard may be dead; ``isOpened()`` might be the intended check —
        # confirm before changing.
        if ai_video_file is None:
            logger.error("ai_video_file为空, requestId:{}", requestId)
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        return ai_video_file
    except ServiceException as s:
        if ai_video_file:
            ai_video_file.release()
        logger.error("构建AI文件写对象异常: {}, requestId:{}", s.msg, requestId)
        raise
    except Exception:
        if ai_video_file:
            ai_video_file.release()
        logger.error("构建AI文件写对象异常: {}, requestId:{}", format_exc(), requestId)
        raise
-
-
def write_ai_video(frame, aiFilePath, ai_video_file, ai_write_status, requestId):
    """Write one frame to the analysed-video file, creating the writer on first use.

    :param frame: current video frame
    :param aiFilePath: path of the analysed video file
    :param ai_video_file: local writer object, or None to build one from ``frame.shape``
    :param ai_write_status: mutable ``[last_failure_time, failure_count]`` pair,
        updated in place on failure
    :param requestId: request id used in log lines
    :return: the writer object (possibly still None after a tolerated failure)
    :raises ServiceException: propagated after releasing the writer
    :raises Exception: the write error, once the failure count exceeds 5
    """
    try:
        if ai_video_file is None:
            rows, cols = frame.shape[0], frame.shape[1]
            ai_video_file = build_ai_video(aiFilePath, cols, rows, requestId)
        ai_video_file.write(frame)
        return ai_video_file
    except ServiceException:
        if ai_video_file:
            ai_video_file.release()
        raise
    except Exception as ex:
        last_failure = ai_write_status[0]
        # More than a minute since the last recorded failure restarts the
        # count at 1; the first failure or one within the minute increments it.
        if last_failure != 0 and time() - last_failure > 60:
            ai_write_status[1] = 1
        else:
            ai_write_status[1] += 1
        ai_write_status[0] = time()
        if ai_write_status[1] > 5:
            if ai_video_file:
                ai_video_file.release()
            logger.error("重新写入分析后的视频到本地,重试失败:{}, requestId: {}", format_exc(), requestId)
            raise ex
    return ai_video_file
|