|
- # -*- coding: utf-8 -*-
- import json
- import time
-
- import cv2
- import subprocess as sp
-
- import numpy as np
- from loguru import logger
- from exception.CustomerException import ServiceException
- from enums.ExceptionEnum import ExceptionType
-
-
class Cv2Util():
    """Pull a video stream through ffmpeg, decode frames, and push/record results.

    The pull side spawns ``ffmpeg`` (``h264_cuvid`` decoder, ``rawvideo``
    output) and ``read()`` interprets each chunk of its stdout as one NV12
    frame.  The push side spawns a second ``ffmpeg`` that receives raw BGR
    frames on stdin and publishes them as FLV to ``pushUrl``.  Optionally the
    original and the AI-annotated frames are written to local files with
    ``cv2.VideoWriter``.

    Geometry attributes (all derived from ``width``/``height``):
        wh  -- number of bytes read from the pull pipe per frame
        wah -- ``"<w>x<h>"`` string passed to ffmpeg's ``-resize``
        h   -- row count of the raw NV12 buffer (1.5 x picture rows)
        w   -- column count of the raw NV12 buffer
        hn  -- height of a single output frame
        wn  -- width of a single output frame
    """

    def __init__(self, pullUrl, pushUrl=None, orFilePath=None, aiFilePath=None, requestId=None):
        """Record the stream endpoints/paths; no process or file is opened here.

        Args:
            pullUrl: source URL handed to ffprobe/ffmpeg.
            pushUrl: destination URL for the FLV push (required by ``build_p``).
            orFilePath: path of the local copy of the original video.
            aiFilePath: path of the local copy of the AI (merged) video.
            requestId: identifier appended to every log line.
        """
        self.pullUrl = pullUrl
        self.pushUrl = pushUrl
        self.orFilePath = orFilePath
        self.aiFilePath = aiFilePath
        self.cap = None
        self.p = None
        self.or_video_file = None
        self.ai_video_file = None
        self.fps = None
        self.width = None
        self.height = None
        self.wah = None
        self.wh = None
        self.h = None
        self.hn = None
        self.w = None
        # Previously only assigned once geometry was derived, so build_write()
        # could hit AttributeError; initialise it alongside its siblings.
        self.wn = None
        self.all_frames = None
        self.bit_rate = None
        self.pull_p = None
        self.requestId = requestId
        self.p_push_retry_num = 0
        self.resize_status = False
        self.current_frame = 0

    def _apply_frame_geometry(self, width, height):
        """Derive wh/wah/h/w/hn/wn (and possibly resize_status) from a frame size.

        For widths above 1600 the stream is decoded at half size, provided the
        half-size NV12 buffer has integral dimensions.  Otherwise the stream is
        decoded at full size and ``resize_status`` is set so that ``read()``
        halves the frame in software; ``hn``/``wn`` stay at half size in that
        case because that is what ``read()`` finally returns.
        """
        if width > 1600:
            self.hn = int(height // 2)
            self.wn = int(width // 2)
            half_bytes = int(width * height * 3 // 8)
            half_rows = int(height * 3 // 4)
            exact = (half_bytes == width * height * 3 / 8
                     and half_rows == height * 3 / 4
                     and self.wn == width / 2)
            if exact:
                self.wh = half_bytes
                self.wah = '%sx%s' % (int(width / 2), int(height / 2))
                self.h = half_rows
                self.w = self.wn
                return
            self.resize_status = True
        else:
            self.hn = int(height)
            self.wn = int(width)
        # Full-size decode: a 12-bit-per-pixel buffer is width * height * 3 / 2 bytes.
        self.wh = int(width * height * 3 // 2)
        self.wah = '%sx%s' % (int(width), int(height))
        self.h = int(height * 3 // 2)
        self.w = int(width)

    def getFrameConfig(self, fps, width, height):
        """Store fps/width/height and derive the decode geometry.

        Does nothing when ``self.fps`` is already set.
        """
        if self.fps is not None:
            return
        self.fps = fps
        self.width = width
        self.height = height
        self._apply_frame_geometry(width, height)

    def clear_video_info(self):
        """Reset fps/width/height so that ``checkconfig()`` reports them missing."""
        self.fps = None
        self.width = None
        self.height = None

    def get_video_info(self):
        """Probe ``pullUrl`` with ffprobe and populate the video attributes.

        Sets width, height, the derived geometry, ``all_frames`` (when ffprobe
        reports ``nb_frames``) and ``fps`` (integer part of ``r_frame_rate``).

        Raises:
            ServiceException: when ``pullUrl`` is None.  Every other failure is
                logged, the basic video info is cleared, and the method returns
                normally.
        """
        try:
            if self.pullUrl is None:
                logger.error("拉流地址不能为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            # format() instead of "+" so a None requestId cannot raise TypeError
            # and hide the real failure.
            failure = "未获取视频信息!!!!!requestId:{}".format(self.requestId)
            args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', self.pullUrl]
            p = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
            try:
                out, _ = p.communicate(timeout=20)
            except sp.TimeoutExpired:
                # communicate() does not kill the child on timeout; reap it so
                # a hung ffprobe is not leaked.
                p.kill()
                p.communicate()
                raise
            if p.returncode != 0:
                raise Exception(failure)
            probe = json.loads(out.decode('utf-8'))
            if probe is None or probe.get("streams") is None:
                raise Exception(failure)
            video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
            if video_stream is None:
                raise Exception(failure)
            nb_frames = video_stream.get('nb_frames')
            fps = video_stream.get('r_frame_rate')
            self.width = int(video_stream.get('width'))
            self.height = int(video_stream.get('height'))
            self._apply_frame_geometry(self.width, self.height)
            if nb_frames:
                self.all_frames = int(nb_frames)
            # r_frame_rate is a "num/den" rational; parse it numerically rather
            # than eval()-ing text that came from an external process.
            up, down = str(fps).split('/')
            self.fps = int(float(up) / float(down))
            logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}|bit_rate:{}, requestId:{}", self.width,
                        self.height, self.fps, self.all_frames, self.bit_rate, self.requestId)
        except ServiceException as s:
            logger.error("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
            self.clear_video_info()
            raise s
        except Exception as e:
            logger.exception("获取视频信息异常:{}, requestId:{}", e, self.requestId)
            self.clear_video_info()

    def _shutdown_process(self, proc, stream):
        """Close ``stream`` (if any), then terminate ``proc`` and wait for it.

        Closing a pipe whose peer already died can raise (for example
        BrokenPipeError while flushing); that must not prevent the process from
        being terminated and reaped, so it is logged and ignored.
        """
        if stream:
            try:
                stream.close()
            except (OSError, ValueError) as e:
                logger.warning("关闭管道流异常:{}, requestId:{}", e, self.requestId)
        proc.terminate()
        proc.wait()

    def _discard_pull_p(self):
        """Tear down the pull pipe, if there is one, and forget it."""
        if self.pull_p:
            logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
            self._shutdown_process(self.pull_p, self.pull_p.stdout)
        self.pull_p = None

    def build_pull_p(self):
        """(Re)start the ffmpeg process that decodes ``pullUrl`` to raw frames.

        Returns without doing anything while the geometry (``wah``) is unknown.
        An existing pull process is shut down first.  Failures are logged and
        leave ``pull_p`` set to None.
        """
        try:
            if self.wah is None:
                return
            if self.pull_p:
                logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
                self._shutdown_process(self.pull_p, self.pull_p.stdout)
            command = ['ffmpeg',
                       '-re',
                       '-y',
                       '-c:v', 'h264_cuvid',
                       '-resize', self.wah,
                       '-i', self.pullUrl,
                       '-f', 'rawvideo',
                       '-an',
                       '-']
            self.pull_p = sp.Popen(command, stdout=sp.PIPE)
        except ServiceException as s:
            logger.exception("构建拉流管道异常: {}, requestId:{}", s, self.requestId)
            self._discard_pull_p()
            raise s
        except Exception as e:
            logger.exception("构建拉流管道异常:{}, requestId:{}", e, self.requestId)
            self._discard_pull_p()

    def checkconfig(self):
        """Return True when fps, width or height is still unknown."""
        return self.fps is None or self.width is None or self.height is None

    def read(self):
        """Read one frame from the pull pipe and return it as a BGR image.

        Returns:
            The decoded frame, or None when no bytes were available or an
            unexpected error occurred (the error is logged).

        Raises:
            ServiceException: when the bytes read cannot be reshaped to the
                expected ``(h, w)`` buffer.
        """
        result = None
        try:
            in_bytes = self.pull_p.stdout.read(self.wh)
            self.current_frame += 1
            if in_bytes:
                try:
                    img = np.frombuffer(in_bytes, np.uint8).reshape((self.h, self.w))
                except Exception as ei:
                    logger.exception("视频格式异常:{}, requestId:{}", ei, self.requestId)
                    raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
                                           ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
                result = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_NV12)
                # Software fallback for sizes the decoder-side resize could not
                # halve exactly (see _apply_frame_geometry).
                if self.resize_status and self.width > 1600:
                    result = cv2.resize(result, (int(self.width / 2), int(self.height / 2)),
                                        interpolation=cv2.INTER_LINEAR)
        except ServiceException as s:
            raise s
        except Exception as e:
            logger.exception("读流异常:{}, requestId:{}", e, self.requestId)
        return result

    def close(self):
        """Shut down both ffmpeg processes and release both video writers."""
        if self.pull_p:
            self._shutdown_process(self.pull_p, self.pull_p.stdout)
            logger.info("关闭拉流管道完成, requestId:{}", self.requestId)
        if self.p:
            self._shutdown_process(self.p, self.p.stdin)
            logger.info("关闭管道完成, requestId:{}", self.requestId)
        if self.or_video_file:
            self.or_video_file.release()
            logger.info("关闭原视频写入流完成, requestId:{}", self.requestId)
        if self.ai_video_file:
            self.ai_video_file.release()
            logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId)

    def build_p(self):
        """(Re)start the ffmpeg process that encodes raw BGR frames to ``pushUrl``.

        An existing push process is shut down first.

        Raises:
            ServiceException: when ``pushUrl`` is None.  Every other failure is
                logged and swallowed.
        """
        try:
            if self.p:
                logger.info("重试, 关闭管道, requestId:{}", self.requestId)
                self._shutdown_process(self.p, self.p.stdin)
            if self.pushUrl is None:
                logger.error("推流地址不能为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
            # The pushed picture is two single frames side by side (same size
            # build_write() uses for the AI file).  Deriving it from wn keeps
            # the declared size equal to the real frame size for odd widths.
            width = 2 * int(self.wn)
            command = ['ffmpeg',
                       '-y',
                       '-f', 'rawvideo',
                       '-vcodec', 'rawvideo',
                       '-pix_fmt', 'bgr24',
                       '-thread_queue_size', '1024',
                       '-s', "{}x{}".format(width, int(self.hn)),
                       '-r', str(self.fps),
                       '-i', '-',  # read the input from stdin
                       '-g', str(self.fps),
                       '-maxrate', '8000k',
                       '-b:v', '5000k',
                       '-c:v', 'h264_nvenc',
                       '-bufsize', '5000k',
                       '-pix_fmt', 'yuv420p',
                       "-an",
                       '-preset', 'p6',  # encoder quality/speed preset
                       '-tune', 'll',
                       '-f', 'flv',
                       self.pushUrl]
            logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width, self.requestId)
            self.p = sp.Popen(command, stdin=sp.PIPE, shell=False)
        except ServiceException as s:
            logger.exception("构建p管道异常: {}, requestId:{}", s, self.requestId)
            raise s
        except Exception as e:
            logger.exception("初始化p管道异常:{}, requestId:{}", e, self.requestId)

    async def push_stream_write(self, frame):
        """Write the raw bytes of ``frame`` to the push process's stdin."""
        # ndarray.tostring() is a deprecated alias removed in NumPy 2.0.
        self.p.stdin.write(frame.tobytes())

    async def push_stream(self, frame):
        """Push one frame, rebuilding the push pipe on failure.

        Up to 3 rebuild attempts are made per call, and at most 600 over the
        lifetime of the instance.

        Returns:
            True when the frame was written, False when the retries ran out.
        """
        if self.p is None:
            self.build_p()
        try:
            await self.push_stream_write(frame)
            return True
        except Exception as ex:
            logger.exception("推流进管道异常:{}, requestId: {}", ex, self.requestId)
        current_retry_num = 0
        while True:
            try:
                # NOTE(review): this blocks the event loop for a second; kept
                # as-is because switching to asyncio.sleep would let other
                # tasks interleave with the retry - confirm before changing.
                time.sleep(1)
                self.p_push_retry_num += 1
                current_retry_num += 1
                if current_retry_num > 3 or self.p_push_retry_num > 600:
                    return False
                self.build_p()
                await self.push_stream_write(frame)
                logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
                            self.requestId)
                return True
            except Exception as e:
                logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
                                 current_retry_num, self.requestId)

    async def video_frame_write(self, or_frame, ai_frame):
        """Write whichever of the two frames is not None to its video file."""
        if or_frame is not None:
            self.or_video_file.write(or_frame)
        if ai_frame is not None:
            self.ai_video_file.write(ai_frame)

    async def video_write(self, or_frame, ai_frame):
        """Write the original and AI frames to the local files, with retries.

        One initial attempt plus up to 3 retries.  A frame that was already
        written is not written again by a later retry.

        Returns:
            True on success, False when every attempt failed.
        """
        or_pending = or_frame is not None and len(or_frame) > 0
        ai_pending = ai_frame is not None and len(ai_frame) > 0
        last_error = None
        for ai_retry_num in range(4):
            try:
                self.build_write()
                if or_pending:
                    await self.video_frame_write(or_frame, None)
                    or_pending = False
                if ai_pending:
                    await self.video_frame_write(None, ai_frame)
                    ai_pending = False
                if ai_retry_num:
                    logger.info("重新写入离线分析后视频到本地, 当前重试次数: {}, requestId: {}", ai_retry_num,
                                self.requestId)
                return True
            except Exception as e:
                # Keep the error for the final message: the name bound by
                # "except ... as" is unbound again when the handler exits.
                last_error = e
                logger.exception("重新写入离线分析后视频到本地:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
                                 ai_retry_num, self.requestId)
        logger.error("重新写入离线分析后视频到本地,重试失败:{}, requestId: {}", last_error, self.requestId)
        return False

    def build_write(self):
        """Create the two ``cv2.VideoWriter`` objects if they do not exist yet.

        The original file is ``wn x hn``; the AI file is ``2*wn x hn``.

        Raises:
            ServiceException: when fps/width/height is unknown, or when a
                writer is still None afterwards (its path was not configured).
        """
        try:
            if self.fps is None or self.width is None or self.height is None:
                logger.error("fps、 width、 height为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                       ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
            if self.orFilePath is not None and self.or_video_file is None:
                self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
                                                     (int(self.wn), int(self.hn)))
            if self.or_video_file is None:
                logger.error("or_video_file为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                       ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
            if self.aiFilePath is not None and self.ai_video_file is None:
                self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
                                                     (int(self.wn * 2), int(self.hn)))
            if self.ai_video_file is None:
                logger.error("ai_video_file为空, requestId:{}", self.requestId)
                raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                       ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        except ServiceException as s:
            logger.exception("构建文件写对象异常: {}, requestId:{}", s, self.requestId)
            raise s
        except Exception as e:
            logger.exception("构建文件写对象异常: {}, requestId:{}", e, self.requestId)
            raise e

    def video_merge(self, frame1, frame2):
        """Return ``frame1`` and ``frame2`` stacked side by side (``np.hstack``)."""
        return np.hstack((frame1, frame2))

    def getP(self):
        """Return the push process; raise ServiceException when there is none."""
        if self.p is None:
            logger.error("获取管道为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        return self.p

    def getOrVideoFile(self):
        """Return the original-video writer; raise ServiceException when there is none."""
        if self.or_video_file is None:
            logger.error("获取原视频写入对象为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        return self.or_video_file

    def getAiVideoFile(self):
        """Return the AI-video writer; raise ServiceException when there is none."""
        if self.ai_video_file is None:
            logger.error("获取AI视频写入对象为空, requestId:{}", self.requestId)
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        return self.ai_video_file
|