tuoheng_algN/util/Cv2Utils.py

1096 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from json import loads
from time import time
from traceback import format_exc
import cv2
import subprocess as sp
import numpy as np
from loguru import logger
from common import Constant
from exception.CustomerException import ServiceException
from enums.ExceptionEnum import ExceptionType
class Cv2Util:
__slots__ = [
'pullUrl',
'pushUrl',
'orFilePath',
'aiFilePath',
'p',
'or_video_file',
'ai_video_file',
'fps',
'width',
'height',
'wh',
'h',
'w',
'all_frames',
'bit_rate',
'pull_p',
'requestId',
'p_push_retry_num',
'isGpu',
'read_w_h',
'context',
'p_push_time'
]
def __init__(self, pullUrl=None, pushUrl=None, orFilePath=None, aiFilePath=None, requestId=None, context=None,
gpu_ids=None):
self.pullUrl = pullUrl
self.pushUrl = pushUrl
self.orFilePath = orFilePath
self.aiFilePath = aiFilePath
self.p = None
self.or_video_file = None
self.ai_video_file = None
self.fps = None
self.width = None
self.height = None
self.wh = None
self.h = None
self.w = None
self.all_frames = None
self.bit_rate = None
self.pull_p = None
self.requestId = requestId
self.p_push_time = 0
self.p_push_retry_num = 0
self.isGpu = False
self.read_w_h = None
self.context = context
if gpu_ids is not None and len(gpu_ids) > 0:
self.isGpu = True
def getFrameConfig(self, fps, width, height):
if self.fps is None or self.width != width or self.height != height:
self.fps = fps
self.width = width
self.height = height
if width > Constant.width:
self.h = int(self.height // 2)
self.w = int(self.width // 2)
else:
self.h = int(self.height)
self.w = int(self.width)
def clear_video_info(self):
self.fps = None
self.width = None
self.height = None
'''
获取视频信息
'''
def get_video_info(self):
try:
if self.pullUrl is None or len(self.pullUrl) == 0:
logger.error("拉流地址不能为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', self.pullUrl]
p = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
out, err = p.communicate(timeout=20)
if p.returncode != 0:
raise Exception("未获取视频信息requestId:" + self.requestId)
probe = loads(out.decode('utf-8'))
if probe is None or probe.get("streams") is None:
raise Exception("未获取视频信息requestId:" + self.requestId)
# 视频大小
# format = probe['format']
# size = int(format['size'])/1024/1024
video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
if video_stream is None:
raise Exception("未获取视频信息requestId:" + self.requestId)
width = video_stream.get('width')
height = video_stream.get('height')
nb_frames = video_stream.get('nb_frames')
fps = video_stream.get('r_frame_rate')
# duration = video_stream.get('duration')
# bit_rate = video_stream.get('bit_rate')
if width is not None and int(width) != 0 and height is not None and int(height) != 0:
self.width = int(width)
self.height = int(height)
self.wh = self.width * self.height * 3
if width > Constant.width:
self.h = int(self.height // 2)
self.w = int(self.width // 2)
else:
self.h = int(self.height)
self.w = int(self.width)
if nb_frames:
self.all_frames = int(nb_frames)
up, down = str(fps).split('/')
self.fps = int(eval(up) / eval(down))
if self.fps > 30:
logger.info("获取视频FPS大于30帧, FPS:{}, requestId:{}", self.fps, self.requestId)
self.fps = 30
if self.fps < 25:
logger.info("获取视频FPS小于25帧, FPS:{}, requestId:{}", self.fps, self.requestId)
self.fps = 25
# if duration:
# self.duration = float(video_stream['duration'])
# self.bit_rate = int(bit_rate) / 1000
logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}, requestId:{}",
self.width, self.height, self.fps, self.all_frames, self.requestId)
except ServiceException as s:
logger.error("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
self.clear_video_info()
raise s
except Exception:
logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), self.requestId)
self.clear_video_info()
'''
录屏任务获取视频信息
'''
def get_recording_video_info(self):
try:
video_info = 'ffprobe -show_format -show_streams -of json %s' % self.pullUrl
p = sp.Popen(video_info, stdout=sp.PIPE, stderr=sp.PIPE, shell=True)
out, err = p.communicate(timeout=17)
if p.returncode != 0:
raise Exception("未获取视频信息requestId:" + self.requestId)
probe = loads(out.decode('utf-8'))
if probe is None or probe.get("streams") is None:
raise Exception("未获取视频信息requestId:" + self.requestId)
video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
if video_stream is None:
raise Exception("未获取视频信息requestId:" + self.requestId)
width = video_stream.get('width')
height = video_stream.get('height')
nb_frames = video_stream.get('nb_frames')
fps = video_stream.get('r_frame_rate')
if width and int(width) > 0:
self.width = int(width)
if height and int(height) > 0:
self.height = int(height)
if self.width and self.height:
self.wh = int(width * height * 3)
self.read_w_h = ([self.height, self.width, 3])
if nb_frames and int(nb_frames) > 0:
self.all_frames = int(nb_frames)
if fps:
up, down = str(fps).split('/')
self.fps = int(eval(up) / eval(down))
logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}, requestId:{}", self.width,
self.height, self.fps, self.all_frames, self.requestId)
except ServiceException as s:
logger.error("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
self.clear_video_info()
raise s
except Exception:
logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), self.requestId)
self.clear_video_info()
def getRecordingFrameConfig(self, fps, width, height):
self.fps = fps
self.width = width
self.height = height
'''
录屏拉取视频
'''
def recording_pull_p(self):
try:
# 如果视频信息不存在, 不初始化拉流
if self.checkconfig():
return
# 如果已经初始化, 不再初始化
if self.pull_p:
return
command = ['ffmpeg -re', '-y', '-an'
# '-hide_banner',
]
if self.pullUrl.startswith('rtsp://'):
command.extend(['-rtsp_transport', 'tcp'])
if self.isGpu:
command.extend(['-hwaccel', 'cuda'])
command.extend(['-i', self.pullUrl,
'-f', 'rawvideo',
'-pix_fmt', 'bgr24',
'-r', '25',
'-'])
self.pull_p = sp.Popen(command, stdout=sp.PIPE)
except ServiceException as s:
logger.exception("构建拉流管道异常: {}, requestId:{}", s.msg, self.requestId)
self.clear_video_info()
if self.pull_p:
logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
self.pull_p.stdout.close()
self.pull_p.terminate()
self.pull_p.wait()
self.pull_p = None
raise s
except Exception as e:
logger.error("构建拉流管道异常:{}, requestId:{}", e, self.requestId)
self.clear_video_info()
if self.pull_p:
logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
self.pull_p.stdout.close()
self.pull_p.terminate()
self.pull_p.wait()
self.pull_p = None
def recording_read(self):
result = None
try:
self.recording_pull_p()
in_bytes = self.pull_p.stdout.read(self.wh)
if in_bytes is not None and len(in_bytes) > 0:
try:
result = np.frombuffer(in_bytes, np.uint8).reshape(self.read_w_h)
except Exception:
logger.error("视频格式异常:{}, requestId:{}", format_exc(), self.requestId)
raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
except ServiceException as s:
if self.pull_p:
logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
self.pull_p.stdout.close()
self.pull_p.terminate()
self.pull_p.wait()
self.pull_p = None
raise s
except Exception:
logger.error("读流异常:{}, requestId:{}", format_exc(), self.requestId)
if self.pull_p:
logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
self.pull_p.stdout.close()
self.pull_p.terminate()
self.pull_p.wait()
self.pull_p = None
return result
'''
拉取视频
'''
def build_pull_p(self):
try:
command = ['ffmpeg']
if self.pullUrl.startswith("rtsp://"):
command.extend(['-rtsp_transport', 'tcp'])
command.extend(['-re',
'-y',
'-an',
# '-hwaccel', 'cuda', cuvid
'-c:v', 'h264_cuvid',
# '-resize', self.wah,
'-i', self.pullUrl,
'-f', 'rawvideo',
'-pix_fmt', 'bgr24',
'-r', '25',
'-'])
self.pull_p = sp.Popen(command, stdout=sp.PIPE)
except ServiceException as s:
logger.exception("构建拉流管道异常: {}, requestId:{}", s.msg, self.requestId)
raise s
except Exception as e:
logger.error("构建拉流管道异常:{}, requestId:{}", format_exc(), self.requestId)
self.clear_video_info()
if self.pull_p:
logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
self.pull_p.stdout.close()
self.pull_p.terminate()
self.pull_p.wait()
self.pull_p = None
def checkconfig(self):
if self.width is None or self.height is None or self.fps is None:
return True
return False
def read(self):
result = None
try:
if self.pull_p is None:
self.build_pull_p()
in_bytes = self.pull_p.stdout.read(self.wh)
if in_bytes is not None and len(in_bytes) > 0:
try:
result = (np.frombuffer(in_bytes, np.uint8).reshape([self.height, self.width, 3]))
# img = (np.frombuffer(in_bytes, np.uint8)).reshape((self.h, self.w))
except Exception as ei:
logger.error("视频格式异常:{}, requestId:{}", format_exc(), self.requestId)
raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
# result = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_NV12)
# result = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
if self.width > Constant.width:
result = cv2.resize(result, (self.w, self.h), interpolation=cv2.INTER_LINEAR)
except ServiceException as s:
raise s
except Exception as e:
self.clear_video_info()
if self.pull_p:
logger.info("关闭拉流管道, requestId:{}", self.requestId)
self.pull_p.stdout.close()
self.pull_p.terminate()
self.pull_p.wait()
self.pull_p = None
logger.error("读流异常:{}, requestId:{}", format_exc(), self.requestId)
return result
def close(self):
self.clear_video_info()
if self.pull_p:
if self.pull_p.stdout:
self.pull_p.stdout.close()
self.pull_p.terminate()
self.pull_p.wait()
self.pull_p = None
logger.info("关闭拉流管道完成, requestId:{}", self.requestId)
if self.p:
if self.p.stdin:
self.p.stdin.close()
self.p.terminate()
self.p.wait()
self.p = None
# self.p.communicate()
# self.p.kill()
logger.info("关闭管道完成, requestId:{}", self.requestId)
if self.or_video_file:
self.or_video_file.release()
self.or_video_file = None
logger.info("关闭原视频写入流完成, requestId:{}", self.requestId)
if self.ai_video_file:
self.ai_video_file.release()
self.ai_video_file = None
logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId)
# 构建 cv2
# def build_cv2(self):
# try:
# if self.cap is not None:
# logger.info("重试, 关闭cap, requestId:{}", self.requestId)
# self.cap.release()
# if self.p is not None:
# logger.info("重试, 关闭管道, requestId:{}", self.requestId)
# self.p.stdin.close()
# self.p.terminate()
# self.p.wait()
# if self.pullUrl is None:
# logger.error("拉流地址不能为空, requestId:{}", self.requestId)
# raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
# ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
# if self.pushUrl is None:
# logger.error("推流地址不能为空, requestId:{}", self.requestId)
# raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
# ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
# self.cap = cv2.VideoCapture(self.pullUrl)
# if self.fps is None or self.fps == 0:
# self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
# if self.width is None or self.width == 0:
# self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# if self.height is None or self.height == 0:
# self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# command = ['/usr/bin/ffmpeg',
# # '-y', # 不经过确认,输出时直接覆盖同名文件。
# '-f', 'rawvideo',
# '-vcodec', 'rawvideo',
# '-pix_fmt', 'bgr24', # 显示可用的像素格式
# # '-s', "{}x{}".format(self.width * 2, self.height),
# '-s', "{}x{}".format(int(self.width), int(self.height/2)),
# # '-r', str(15),
# '-i', '-', # 指定输入文件
# '-g', '25',
# '-b:v', '3000k',
# '-tune', 'zerolatency', # 加速编码速度
# '-c:v', 'libx264', # 指定视频编码器
# '-sc_threshold', '0',
# '-pix_fmt', 'yuv420p',
# '-an',
# '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
# # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
# '-f', 'flv',
# self.pushUrl]
# # 管道配置
# logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width, self.requestId)
# self.p = sp.Popen(command, stdin=sp.PIPE)
# except ServiceException as s:
# logger.exception("构建cv2异常: {}, requestId:{}", s, self.requestId)
# raise s
# except Exception as e:
# logger.exception("初始化cv2异常{}, requestId:{}", e, self.requestId)
# 构建 cv2
def build_p(self):
try:
if self.pushUrl is None or len(self.pushUrl) == 0:
logger.error("推流地址不能为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
command = ['ffmpeg',
# '-loglevel', 'debug',
'-re',
'-y',
"-an",
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-thread_queue_size', '1024',
'-s', "{}x{}".format(self.w * 2, self.h),
'-i', '-', # 指定输入文件
'-r', str(25),
'-g', str(25),
'-maxrate', '6000k',
# '-profile:v', 'high',
'-b:v', '5000k',
# '-crf', '18',
# '-rc:v', 'vbr',
# '-cq:v', '25',
# '-qmin', '25',
# '-qmax', '25',
'-c:v', 'h264_nvenc', #
'-bufsize', '5000k',
# '-c:v', 'libx264', # 指定视频编码器
# '-tune', 'zerolatency', # 加速编码速度
# '-sc_threshold', '0',
'-pix_fmt', 'yuv420p',
# '-flvflags', 'no_duration_filesize',
# '-preset', 'fast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
'-preset', 'p6', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
'-tune', 'll',
'-f', 'flv',
self.pushUrl]
logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width,
self.requestId)
self.p = sp.Popen(command, stdin=sp.PIPE, shell=False)
except ServiceException as s:
if self.p:
if self.p.stdin:
self.p.stdin.close()
self.p.terminate()
self.p.wait()
logger.exception("构建p管道异常: {}, requestId:{}", s.msg, self.requestId)
raise s
except Exception as e:
if self.p:
if self.p.stdin:
self.p.stdin.close()
self.p.terminate()
self.p.wait()
logger.error("初始化p管道异常{}, requestId:{}", format_exc(), self.requestId)
def push_stream(self, frame):
current_retry_num = 0
while True:
try:
if self.p is None:
self.build_p()
self.p.stdin.write(frame.tostring())
break
except ServiceException as s:
raise s
except Exception as ex:
if self.p_push_time == 0:
self.p_push_time = time.time()
if time.time() - self.p_push_time < 2:
self.p_push_retry_num += 1
self.p_push_time = time.time()
if time.time() - self.p_push_time > 60:
self.p_push_retry_num = 0
self.p_push_time = time.time()
logger.error("推流管道异常:{}, requestId: {}", format_exc(), self.requestId)
if self.p:
try:
if self.p.stdin:
self.p.stdin.close()
self.p.terminate()
self.p.wait()
except:
logger.error("推流管道异常:{}, requestId: {}", format_exc(), self.requestId)
self.p = None
current_retry_num += 1
if self.p_push_retry_num > 100:
logger.error("推流进管道异常:{}, requestId: {}", format_exc(), self.requestId)
raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
if current_retry_num > 3:
logger.error("推流进管道异常:{}, requestId: {}", format_exc(), self.requestId)
raise ServiceException(ExceptionType.PUSH_STREAM_EXCEPTION.value[0],
ExceptionType.PUSH_STREAM_EXCEPTION.value[1])
def build_or_write(self):
try:
if self.orFilePath is not None and self.or_video_file is None:
self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25,
(self.w, self.h))
# self.or_video_file.set(cv2.CAP_PROP_BITRATE, 5000)
if self.or_video_file is None:
logger.error("or_video_file为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
except ServiceException as s:
if self.or_video_file:
self.or_video_file.release()
self.or_video_file = None
logger.error("构建OR文件写对象异常: {}, requestId:{}", s.msg, self.requestId)
raise s
except Exception as e:
if self.or_video_file:
self.or_video_file.release()
self.or_video_file = None
logger.error("构建OR文件写对象异常: {}, requestId:{}", format_exc(), self.requestId)
raise e
except:
if self.or_video_file:
self.or_video_file.release()
self.or_video_file = None
logger.exception("构建OR文件写对象异常:{}, requestId:{}", format_exc(), self.requestId)
raise Exception("构建OR文件写对象异常")
def build_ai_write(self):
try:
if self.aiFilePath is not None and self.ai_video_file is None:
self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25,
(self.w * 2, self.h))
# self.ai_video_file.set(cv2.CAP_PROP_BITRATE, 5000)
if self.ai_video_file is None:
logger.error("ai_video_file为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
except ServiceException as s:
if self.ai_video_file:
self.ai_video_file.release()
self.ai_video_file = None
logger.error("构建AI文件写对象异常: {}, requestId:{}", s.msg, self.requestId)
raise s
except Exception as e:
if self.ai_video_file:
self.ai_video_file.release()
self.ai_video_file = None
logger.error("构建AI文件写对象异常: {}, requestId:{}", format_exc(), self.requestId)
raise e
except:
if self.ai_video_file:
self.ai_video_file.release()
self.ai_video_file = None
logger.error("构建AI文件写对象异常:{}, requestId:{}", format_exc(), self.requestId)
raise Exception("构建AI文件写对象异常")
def video_or_write(self, frame):
ai_retry_num = 0
while True:
try:
if self.or_video_file is None:
self.build_or_write()
self.or_video_file.write(frame)
break
except ServiceException as s:
raise s
except Exception as ex:
if ai_retry_num > 3:
logger.error("重新写入原视频视频到本地, 重试失败:{}, requestId: {}", format_exc(),
self.requestId)
raise ex
finally:
ai_retry_num += 1
def video_ai_write(self, frame):
ai_retry_num = 0
while True:
try:
if self.ai_video_file is None:
self.build_ai_write()
self.ai_video_file.write(frame)
break
except ServiceException as s:
raise s
except Exception as ex:
if ai_retry_num > 3:
logger.exception("重新写入分析后的视频到本地,重试失败:{}, requestId: {}", format_exc(),
self.requestId)
raise ex
finally:
ai_retry_num += 1
def video_merge(self, frame1, frame2):
# frameLeft = cv2.resize(frame1, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
# frameRight = cv2.resize(frame2, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
# frame_merge = np.hstack((frameLeft, frameRight))
frame_merge = np.hstack((frame1, frame2))
return frame_merge
def getP(self):
if self.p is None:
logger.error("获取管道为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
return self.p
def getOrVideoFile(self):
if self.or_video_file is None:
logger.error("获取原视频写入对象为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
return self.or_video_file
def getAiVideoFile(self):
if self.ai_video_file is None:
logger.error("获取AI视频写入对象为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
return self.ai_video_file
def check_video_stream(width, height):
if width is None or height is None:
return True
return False
def build_video_info(pull_url, requestId):
try:
if pull_url is None or len(pull_url) == 0:
logger.error("拉流地址不能为空, requestId:{}", requestId)
raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
pp = sp.Popen(['ffprobe', '-show_format', '-show_streams', '-of', 'json', pull_url], stdout=sp.PIPE,
stderr=sp.PIPE)
out, err = pp.communicate(timeout=18)
if pp.returncode != 0:
logger.error("获取视频信息失败: {}, requestId:{}", err.decode('utf-8'), requestId)
raise Exception("未获取视频信息!!!!")
probe = loads(out.decode('utf-8'))
video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
if video_stream is None:
raise Exception("未获取视频信息!!!!")
width_new, height_new = video_stream.get('width'), video_stream.get('height')
nb_frames = video_stream.get('nb_frames', 0)
duration = video_stream.get('duration')
if duration is not None and float(duration) != float(0):
nb_frames = int(float(duration) * 25)
if width_new is not None and int(width_new) != 0 and height_new is not None and int(height_new) != 0:
height_o = int(height_new)
width, height = int(width_new), height_o * 3 // 2
width_height_3 = width * height
all_frames = int(nb_frames)
w_2, h_2 = width, height_o
if width > Constant.width:
w_2, h_2 = width // 2, height_o // 2
logger.info("视频信息, width:{}|height:{}|all_frames:{}, requestId:{}", width, height_o, all_frames,
requestId)
return width, height, width_height_3, all_frames, w_2, h_2
raise Exception("未获取视频信息!!!!")
except ServiceException as s:
logger.error("获取视频信息异常: {}, requestId:{}", s.msg, requestId)
raise s
except Exception:
logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), requestId)
return None, None, None, 0, None, None
def build_video_info2(pull_url, requestId):
try:
pp = sp.Popen(['ffprobe', '-show_format', '-show_streams', '-of', 'json', pull_url], stdout=sp.PIPE,
stderr=sp.PIPE)
out, err = pp.communicate(timeout=17)
if pp.returncode != 0:
logger.error("获取视频信息失败: {}, requestId:{}", err.decode('utf-8'), requestId)
raise Exception("未获取视频信息!!!!")
probe = loads(out.decode('utf-8'))
video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
if video_stream is None:
raise Exception("未获取视频信息!!!!")
width_new, height_new = video_stream.get('width'), video_stream.get('height')
nb_frames = video_stream.get('nb_frames', 0)
# fps = video_stream.get('r_frame_rate')
duration = video_stream.get('duration')
if duration is not None and float(duration) != float(0):
nb_frames = int(float(duration) * 25)
# bit_rate = video_stream.get('bit_rate')
if width_new is not None and int(width_new) != 0 and height_new is not None and int(height_new) != 0:
width_o, height_o = int(width_new), int(height_new)
# width_height_3 = width * height * 3
width_height_3 = width_o * height_o * 3 // 2
width, height, all_frames = width_o, height_o * 3 // 2, int(nb_frames)
w, h = width_o, height_o
if width > Constant.width:
w, h = width_o // 2, height_o // 2
# up, down = str(fps).split('/')
# self.fps = int(eval(up) / eval(down))
# if duration:
# self.duration = float(video_stream['duration'])
# self.bit_rate = int(bit_rate) / 1000
logger.info("视频信息, width:{}|height:{}|all_frames:{}, requestId:{}", width_o, height_o, all_frames,
requestId)
return width, height, width_height_3, all_frames, w, h
raise Exception("未获取视频信息!!!!")
except ServiceException as s:
logger.error("获取视频信息异常: {}, requestId:{}", s.msg, requestId)
raise s
except Exception:
logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), requestId)
return None, None, None, 0, None, None
def start_pull_p(pull_url, requestId):
try:
command = ['ffmpeg']
if pull_url.startswith("rtsp://"):
command.extend(['-timeout', '20000000', '-rtsp_transport', 'tcp'])
if pull_url.startswith("http") or pull_url.startswith("rtmp"):
command.extend(['-rw_timeout', '20000000'])
command.extend(['-re',
'-y',
'-an',
# '-hwaccel', 'cuda', cuvid
'-c:v', 'h264_cuvid',
# '-resize', self.wah,
'-i', pull_url,
'-f', 'rawvideo',
# '-pix_fmt', 'bgr24',
'-r', '25',
'-'])
return sp.Popen(command, stdout=sp.PIPE)
except ServiceException as s:
logger.error("构建拉流管道异常: {}, requestId:{}", s.msg, requestId)
raise s
except Exception as e:
logger.error("构建拉流管道异常:{}, requestId:{}", format_exc(), requestId)
raise e
def clear_pull_p(pull_p, requestId):
try:
if pull_p and pull_p.poll() is None:
logger.info("关闭拉流管道, requestId:{}", requestId)
if pull_p.stdout:
pull_p.stdout.close()
pull_p.terminate()
pull_p.wait(timeout=30)
logger.info("拉流管道已关闭, requestId:{}", requestId)
except Exception as e:
logger.error("关闭拉流管道异常: {}, requestId:{}", format_exc(), requestId)
if pull_p and pull_p.poll() is None:
pull_p.kill()
pull_p.wait(timeout=30)
raise e
def pull_read_video_stream(pull_p, pull_url, width, height, width_height_3, w_2, h_2, requestId):
result = None
try:
if pull_p is None:
pull_p = start_pull_p(pull_url, requestId)
in_bytes = pull_p.stdout.read(width_height_3)
if in_bytes is not None and len(in_bytes) > 0:
try:
# result = (np.frombuffer(in_bytes, np.uint8).reshape([height * 3 // 2, width, 3]))
result = (np.frombuffer(in_bytes, np.uint8)).reshape((height, width))
result = cv2.cvtColor(result, cv2.COLOR_YUV2BGR_NV12)
# result = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
if width > Constant.width:
result = cv2.resize(result, (w_2, h_2), interpolation=cv2.INTER_LINEAR)
except Exception:
logger.error("视频格式异常:{}, requestId:{}", format_exc(), requestId)
raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
except ServiceException as s:
clear_pull_p(pull_p, requestId)
raise s
except Exception:
clear_pull_p(pull_p, requestId)
pull_p, width, height = None, None, None
logger.error("读流异常:{}, requestId:{}", format_exc(), requestId)
return result, pull_p, width, height
def pull_read_video_stream2(pull_p, pull_url, width, height, width_height_3, w, h, requestId):
result = None
try:
if pull_p is None:
pull_p = start_pull_p(pull_url, requestId)
in_bytes = pull_p.stdout.read(width_height_3)
if in_bytes is not None and len(in_bytes) > 0:
try:
result = (np.frombuffer(in_bytes, np.uint8)).reshape((height, width))
result = cv2.cvtColor(result, cv2.COLOR_YUV2BGR_NV12)
if width > Constant.width:
result = cv2.resize(result, (w, h), interpolation=cv2.INTER_LINEAR)
except Exception:
logger.error("视频格式异常:{}, requestId:{}", format_exc(), requestId)
raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
except ServiceException as s:
clear_pull_p(pull_p, requestId)
raise s
except Exception:
clear_pull_p(pull_p, requestId)
pull_p, width, height = None, None, None
logger.error("读流异常:{}, requestId:{}", format_exc(), requestId)
return result, pull_p, width, height
def video_conjuncing(frame1, frame2):
# frameLeft = cv2.resize(frame1, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
# frameRight = cv2.resize(frame2, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
# frame_merge = np.hstack((frameLeft, frameRight))
frame_merge = np.hstack((frame1, frame2))
return frame_merge
def build_push_p(push_url, width, height, requestId):
push_p = None
try:
if push_url is None or len(push_url) == 0:
logger.error("推流地址不能为空, requestId:{}", requestId)
raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
command = ['ffmpeg',
# '-loglevel', 'debug',
# '-re',
'-y',
"-an",
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-thread_queue_size', '1024',
'-s', "{}x{}".format(width, height),
'-i', '-', # 指定输入文件
'-r', str(25),
'-g', str(25),
'-maxrate', '6000k',
# '-profile:v', 'high',
'-b:v', '4000k',
# '-crf', '18',
# '-rc:v', 'vbr',
# '-cq:v', '25',
# '-qmin', '25',
# '-qmax', '25',
'-c:v', 'h264_nvenc', #
'-bufsize', '4000k',
# '-c:v', 'libx264', # 指定视频编码器
# '-tune', 'zerolatency', # 加速编码速度
# '-sc_threshold', '0',
# '-rc', 'cbr_ld_hq',
# '-zerolatency', '1',
'-pix_fmt', 'yuv420p',
# '-flvflags', 'no_duration_filesize',
# '-preset', 'fast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
'-preset', 'p6', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
'-tune', 'll',
'-f', 'flv',
push_url]
logger.info("height:{}|width:{}|requestId:{}", height, width, requestId)
push_p = sp.Popen(command, stdin=sp.PIPE, shell=False)
return push_p
except ServiceException as s:
if push_p:
if push_p.stdin:
push_p.stdin.close()
push_p.terminate()
push_p.wait()
logger.error("构建p管道异常: {}, requestId:{}", s.msg, requestId)
raise s
except Exception as e:
if push_p:
if push_p.stdin:
push_p.stdin.close()
push_p.terminate()
push_p.wait()
logger.error("初始化p管道异常{}, requestId:{}", format_exc(), requestId)
raise e
def push_video_stream(frame, push_p, push_url, p_push_status, requestId):
"""
:param frame: 当前视频帧
:param push_p: 推流管道
:param push_url: 推流地址
:param p_push_status: 控制异常控制
:param requestId: 请求id
:return: 推流管道
"""
st = time()
try:
if push_p is None:
height, width = frame.shape[0:2]
push_p = build_push_p(push_url, width, height, requestId)
push_p.stdin.write(frame.tostring())
return push_p
except ServiceException as s:
clear_push_p(push_p, requestId)
raise s
except Exception:
et = time() - st
logger.error("推流异常使用时间:{}, requestId: {}", et, requestId)
if et > 20:
logger.error("推流进管道异常:{}, requestId: {}", format_exc(), requestId)
raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
if p_push_status[0] == 0:
p_push_status[0] = time()
p_push_status[1] += 1
elif time() - p_push_status[0] <= 60:
p_push_status[1] += 1
p_push_status[0] = time()
elif time() - p_push_status[0] > 60:
p_push_status[1] = 1
p_push_status[0] = time()
logger.error("推流管道异常:{}, requestId: {}", format_exc(), requestId)
clear_push_p(push_p, requestId)
if p_push_status[1] > 5:
logger.error("推流进管道异常:{}, requestId: {}", format_exc(), requestId)
raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
return None
def clear_push_p(push_p, requestId):
if push_p:
try:
if push_p.stdin:
push_p.stdin.close()
push_p.terminate()
push_p.wait()
except Exception:
logger.error("推流管道异常:{}, requestId: {}", format_exc(), requestId)
def close_or_write_stream(or_video_file, requestId):
try:
if or_video_file:
or_video_file.release()
except Exception:
logger.info("关闭原视频写流管道异常:{}, requestId:{}", format_exc(), requestId)
def close_ai_write_stream(ai_video_file, requestId):
try:
if ai_video_file:
ai_video_file.release()
except Exception:
logger.info("关闭AI视频写流管道异常:{}, requestId:{}", format_exc(), requestId)
def close_all_p(push_p, or_video_file, ai_video_file, requestId):
logger.info("开始停止推流、写流管道requestId:{}", requestId)
clear_push_p(push_p, requestId)
close_or_write_stream(or_video_file, requestId)
close_ai_write_stream(ai_video_file, requestId)
logger.info("停止推流、写流管道完成requestId:{}", requestId)
def build_or_video(orFilePath, width, height, requestId):
or_video_file = None
try:
or_video_file = cv2.VideoWriter(orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25, (width, height))
if or_video_file is None:
logger.error("or_video_file为空, requestId:{}", requestId)
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
return or_video_file
except ServiceException as s:
if or_video_file:
or_video_file.release()
logger.error("构建OR文件写对象异常: {}, requestId:{}", s.msg, requestId)
raise s
except Exception as e:
if or_video_file:
or_video_file.release()
logger.error("构建OR文件写对象异常: {}, requestId:{}", format_exc(), requestId)
raise e
def write_or_video(frame, orFilePath, or_video_file, or_write_status, requestId):
"""
:param frame: 当时视频帧
:param orFilePath: 原视频名称
:param or_video_file: 原视频本地写流对象
:param or_write_status: 原视频写流状态检查 [0, 0] 第一个参数时间,第二个参数重试的次数
:param requestId: 请求id
:return: 原视频本地写流对象
:desc 如果写流失败了, 不重试, 丢弃本次视频帧
"""
try:
if or_video_file is None:
height, width = frame.shape[0], frame.shape[1]
or_video_file = build_or_video(orFilePath, width, height, requestId)
or_video_file.write(frame)
return or_video_file
except ServiceException as s:
if or_video_file:
or_video_file.release()
raise s
except Exception as ex:
# 当第一次写视频帧到本地失败, 更新or_write_status的时间和重试次数
if or_write_status[0] == 0:
or_write_status[0] = time()
or_write_status[1] += 1
# 1分钟内失败重试次数更新, 1分钟中容忍小于5次的失败次数
elif time() - or_write_status[0] <= 60:
or_write_status[1] += 1
or_write_status[0] = time()
# 大于1分钟初始化检查数组
elif time() - or_write_status[0] > 60:
or_write_status[1] = 1
or_write_status[0] = time()
if or_write_status[1] > 5:
if or_video_file:
or_video_file.release()
logger.error("重新写入原视频视频到本地, 重试失败:{}, requestId: {}", format_exc(), requestId)
raise ex
return or_video_file
def build_ai_video(aiFilePath, width, height, requestId):
ai_video_file = None
try:
ai_video_file = cv2.VideoWriter(aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25, (width, height))
if ai_video_file is None:
logger.error("ai_video_file为空, requestId:{}", requestId)
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
return ai_video_file
except ServiceException as s:
if ai_video_file:
ai_video_file.release()
logger.error("构建AI文件写对象异常: {}, requestId:{}", s.msg, requestId)
raise s
except Exception as e:
if ai_video_file:
ai_video_file.release()
logger.error("构建AI文件写对象异常: {}, requestId:{}", format_exc(), requestId)
raise e
def write_ai_video(frame, aiFilePath, ai_video_file, ai_write_status, requestId):
try:
if ai_video_file is None:
height, width = frame.shape[0], frame.shape[1]
ai_video_file = build_ai_video(aiFilePath, width, height, requestId)
ai_video_file.write(frame)
return ai_video_file
except ServiceException as s:
if ai_video_file:
ai_video_file.release()
raise s
except Exception as ex:
if ai_write_status[0] == 0:
ai_write_status[0] = time()
ai_write_status[1] += 1
# 大于1分钟初始化检查数组
elif time() - ai_write_status[0] > 60:
ai_write_status[1] = 1
ai_write_status[0] = time()
# 1分钟内失败重试次数更新, 1分钟中容忍小于5次的失败次数
elif time() - ai_write_status[0] <= 60:
ai_write_status[1] += 1
ai_write_status[0] = time()
if ai_write_status[1] > 5:
if ai_video_file:
ai_video_file.release()
logger.error("重新写入分析后的视频到本地,重试失败:{}, requestId: {}", format_exc(), requestId)
raise ex
return ai_video_file