tuoheng_alg_airport/util/Cv2Utils.py

395 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import subprocess as sp
import time
import cv2
import numpy as np
from loguru import logger
from alg_airport_ffmpeg.enums.ExceptionEnum import ExceptionType
from alg_airport_ffmpeg.exception.CustomerException import ServiceException
from alg_airport_ffmpeg.concurrency.CommonThread import Common
"""
推流工具
"""
class Cv2Util:
    """Pull frames from a video source and push frames to one or more outputs.

    Two ways of pulling are offered: an ``ffmpeg`` child process that writes
    raw ``bgr24`` frames to a pipe (``build_pull_stream`` / ``read``) and
    ``cv2.VideoCapture`` (``build_cv2`` / ``cv2_read``).  Pushing is done
    either by piping raw frames into an ``ffmpeg`` child process
    (``build_push_stream`` / ``push_stream_write``) or by letting ``ffmpeg``
    copy the pull URL straight to the push URLs (``start_push_stream``).

    ``pushUrl`` is iterated when building the ffmpeg command, so it is
    expected to be a collection of URLs; a single string is also accepted and
    treated as a one-element collection.
    """

    # Seconds granted to ffprobe before the probe is abandoned.
    _PROBE_TIMEOUT = 7
    # Rebuild-and-resend attempts made after a frame write has failed.
    _PUSH_RETRIES = 2

    def __init__(self, pullUrl=None, pushUrl=None):
        self.__pullUrl = pullUrl
        self.__pushUrl = pushUrl
        self.__push_stream = None   # ffmpeg process used for pushing
        self.__pull_stream = None   # ffmpeg process used for pulling raw frames
        self.__width = None
        self.__height = None
        self.__wh = None            # size in bytes of one bgr24 frame
        self.__fps = None
        self.__cap = None           # cv2.VideoCapture handle

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _stop_process(proc):
        """Close the pipes of *proc*, terminate it and wait for it to exit.

        Closing a buffered stdin whose reader already died raises
        ``BrokenPipeError`` while flushing; the pipe is still closed in that
        case, so the error is ignored to let the teardown finish.
        """
        for pipe in (proc.stdin, proc.stdout, proc.stderr):
            if pipe is None:
                continue
            try:
                pipe.close()
            except OSError:
                pass
        proc.terminate()
        proc.wait()

    @classmethod
    def _ffprobe(cls, url):
        """Run ``ffprobe`` on *url* and return its JSON output as a dict.

        Returns ``None`` when ffprobe cannot be started, exits with a non-zero
        status, does not finish within ``_PROBE_TIMEOUT`` seconds, or prints
        something that is not valid JSON.
        """
        proc = None
        try:
            args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', url]
            proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, close_fds=True)
            out, _ = proc.communicate(timeout=cls._PROBE_TIMEOUT)
            if proc.returncode != 0:
                return None
            return json.loads(out.decode('utf-8'))
        except Exception:
            # Probing is best effort: callers only distinguish "got info"
            # from "no info", so every failure is reported as None.
            return None
        finally:
            if proc is not None:
                # Also reaps the child and closes its pipes after a timeout.
                cls._stop_process(proc)

    @staticmethod
    def _find_video_stream(probe):
        """Return the first stream of ``codec_type == 'video'`` or ``None``."""
        if probe is None:
            return None
        streams = probe.get("streams")
        if streams is None:
            return None
        return next((stream for stream in streams if stream.get('codec_type') == 'video'), None)

    @staticmethod
    def _parse_frame_rate(rate):
        """Convert an ffprobe rate such as ``"30000/1001"`` to a whole fps.

        The quotient is truncated towards zero.  ``None`` is returned for
        anything that is not numeric or has a zero denominator (``"0/0"``).
        The text is parsed as numbers only and is never evaluated as code.
        """
        numerator, _, denominator = str(rate).partition('/')
        try:
            return int(float(numerator) / float(denominator or 1))
        except (ValueError, ZeroDivisionError, OverflowError):
            return None

    def _push_urls(self):
        """Return the configured push URLs as a list.

        A plain string is wrapped in a list so that it is not iterated
        character by character when the ffmpeg command is assembled.
        """
        urls = self.__pushUrl
        if urls is None:
            return []
        if isinstance(urls, str):
            return [urls]
        return list(urls)

    # ------------------------------------------------------------------
    # stream information
    # ------------------------------------------------------------------
    def probe(self):
        """Probe the pull URL with ffprobe.

        Returns the parsed JSON document, or ``None`` on any failure.
        """
        return self._ffprobe(self.__pullUrl)

    def get_video_info(self):
        """Read width, height and fps of the pull stream via ffprobe.

        On success the width, height, fps and the byte size of one ``bgr24``
        frame are stored on the instance.  When the probe fails, no video
        stream is present, or the stream lacks usable width/height/frame-rate
        values, nothing is stored and the method simply returns.

        Raises:
            ServiceException: the pull URL is missing or empty.
        """
        if self.__pullUrl is None or len(self.__pullUrl) == 0:
            raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                   ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
        video_stream = self._find_video_stream(self.probe())
        if video_stream is None:
            return
        width = video_stream.get('width')
        height = video_stream.get('height')
        raw_rate = video_stream.get('r_frame_rate')
        fps = self._parse_frame_rate(raw_rate)
        if width is None or height is None or fps is None:
            # Store all values or none, so check_config() keeps reporting
            # that the configuration is still missing.
            logger.warning("视频信息不完整, width:{}|height:{}|r_frame_rate:{}", width, height, raw_rate)
            return
        self.__width = int(width)
        self.__height = int(height)
        self.__wh = self.__width * self.__height * 3
        self.__fps = fps
        logger.info("视频信息, width:{}|height:{}|fps:{}", self.__width, self.__height, self.__fps)

    def check_config(self):
        """Return True while fps, width or height is still unknown."""
        return self.__fps is None or self.__width is None or self.__height is None

    # ------------------------------------------------------------------
    # pulling through cv2.VideoCapture
    # ------------------------------------------------------------------
    def build_cap(self, args):
        """Open a ``cv2.VideoCapture`` on ``args[0]``; return None on error."""
        try:
            pullUrl = args[0]
            return cv2.VideoCapture(pullUrl)
        except Exception as e:
            logger.error("初始化cap异常: {}", e)
            return None

    def build_cv2(self):
        """(Re)create the ``cv2.VideoCapture`` for the pull URL.

        An existing capture is released first and the cached fps/width/height
        are cleared.  The capture is opened through the project's ``Common``
        helper with ``timeout=7``.
        NOTE(review): ``Common`` is presumably a thread wrapper whose
        ``get_result()`` yields None on timeout - confirm against its source.
        When the capture is open, fps/width/height that are still unset (None
        or 0) are filled in from the capture properties.

        Raises:
            ServiceException: the pull URL is missing or empty.
        """
        try:
            if self.__cap is not None:
                self.__cap.release()
                self.__cap = None
                self.__fps = None
                self.__width = None
                self.__height = None
            if self.__pullUrl is None or len(self.__pullUrl) == 0:
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            cap_thread = Common(timeout=7, func=self.build_cap, args=(self.__pullUrl,))
            cap_thread.setDaemon(True)
            cap_thread.start()
            self.__cap = cap_thread.get_result()
            if self.__cap is None:
                return
            if self.__cap.isOpened():
                if self.__fps is None or self.__fps == 0:
                    self.__fps = int(self.__cap.get(cv2.CAP_PROP_FPS))
                if self.__width is None or self.__width == 0:
                    self.__width = int(self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                if self.__height is None or self.__height == 0:
                    self.__height = int(self.__cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                logger.info("fps:{}|height:{}|width:{}", self.__fps, self.__height, self.__width)
        except ServiceException as s:
            logger.error("构建cv2异常: {}", s.msg)
            raise
        except Exception as e:
            logger.error("初始化cv2异常{}", e)
            raise

    def cv2_read(self):
        """Read one frame through ``cv2.VideoCapture``.

        Returns the frame, or ``None`` when no frame could be read (capture
        could not be created, is not open, or ``read()`` reported failure).
        Whenever ``None`` is returned the capture is released and the cached
        fps/width/height are cleared.
        """
        result = None
        try:
            if self.__cap is None:
                self.build_cv2()
            # build_cv2 may leave the capture unset; that is "no frame",
            # not an error.
            if self.__cap is not None and self.__cap.isOpened():
                ret, frame = self.__cap.read()
                if ret:
                    result = frame
        except ServiceException:
            raise
        except Exception as e:
            logger.error("读流异常:{}", e)
            raise
        finally:
            if result is None:
                self.__fps = None
                self.__height = None
                self.__width = None
                if self.__cap is not None:
                    self.__cap.release()
                    self.__cap = None
                    logger.info("关闭cv2")
        return result

    # ------------------------------------------------------------------
    # pulling through an ffmpeg pipe
    # ------------------------------------------------------------------
    def build_pull_stream(self):
        """Start the ffmpeg process that writes raw ``bgr24`` frames to stdout.

        A previously started pull process is shut down first.

        Raises:
            ServiceException: the pull URL is missing or empty.
        """
        try:
            if self.__pullUrl is None or len(self.__pullUrl) == 0:
                logger.error("拉流地址不能为空!")
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            if self.__pull_stream:
                logger.info("重试, 关闭拉流管道")
                self._stop_process(self.__pull_stream)
            command = ['ffmpeg',
                       '-rtsp_transport', 'tcp',
                       '-i', self.__pullUrl,
                       '-f', 'rawvideo',
                       '-pix_fmt', 'bgr24',
                       '-an',
                       '-']
            self.__pull_stream = sp.Popen(command, stdout=sp.PIPE)
        except ServiceException as s:
            logger.error("构建拉流管道异常: {}", s.msg)
            raise
        except Exception as e:
            logger.error("构建拉流管道异常:{}", e)
            raise

    def read(self):
        """Read one raw frame from the ffmpeg pull pipe.

        Returns a ``(height, width, 3)`` ``uint8`` array, or ``None`` when the
        frame size is unknown (``get_video_info`` has not succeeded yet) or
        the pipe delivered fewer bytes than one full frame (end of stream).
        Whenever ``None`` is returned the pull process is shut down and the
        cached video configuration is cleared.
        """
        result = None
        try:
            frame_size = self.__wh
            if not frame_size:
                # Reading with an unknown size would block until ffmpeg
                # exits, so report "no frame" instead.
                logger.error("读流异常:{}", "frame size is unknown, call get_video_info first")
            else:
                if self.__pull_stream is None:
                    self.build_pull_stream()
                in_bytes = self.__pull_stream.stdout.read(frame_size)
                # A short read only happens at end of stream; a partial frame
                # cannot be reshaped and is dropped.
                if in_bytes is not None and len(in_bytes) == frame_size:
                    result = np.frombuffer(in_bytes, np.uint8).reshape(
                        [int(self.__height), int(self.__width), 3])
        except ServiceException:
            raise
        except Exception as e:
            logger.error("读流异常:{}", e)
            raise
        finally:
            if result is None:
                self.__fps = None
                self.__height = None
                self.__width = None
                self.__wh = None
                if self.__pull_stream:
                    self._stop_process(self.__pull_stream)
                    logger.info("关闭拉流管道完成!")
                    self.__pull_stream = None
        return result

    # ------------------------------------------------------------------
    # shutdown
    # ------------------------------------------------------------------
    def close(self):
        """Shut down the pull process, the push process and the cv2 capture.

        Every handle is reset to ``None`` afterwards, so the instance can be
        closed repeatedly or reused.
        """
        if self.__pull_stream:
            self._stop_process(self.__pull_stream)
            self.__pull_stream = None
            logger.info("关闭拉流管道完成!")
        if self.__push_stream:
            self._stop_process(self.__push_stream)
            self.__push_stream = None
            logger.info("关闭推流管道完成!")
        if self.__cap:
            self.__cap.release()
            self.__cap = None

    # ------------------------------------------------------------------
    # pushing
    # ------------------------------------------------------------------
    def build_push_stream(self):
        """Start the ffmpeg process that encodes piped raw frames to the push URLs.

        ffmpeg reads ``bgr24`` raw video of the cached width/height/fps from
        stdin and writes one libx264 ``flv`` output per push URL.  A previously
        started push process is shut down first.

        Raises:
            ServiceException: no push URL is configured.
        """
        try:
            if self.__push_stream:
                logger.info("重试, 关闭管道, 重新开启新管道")
                self._stop_process(self.__push_stream)
            if self.__pushUrl is None or len(self.__pushUrl) == 0:
                logger.error("推流地址不能为空!")
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
            command = ['ffmpeg',
                       '-f', 'rawvideo',
                       '-vcodec', 'rawvideo',
                       '-pix_fmt', 'bgr24',
                       '-s', "{}x{}".format(int(self.__width), int(self.__height)),
                       '-r', str(self.__fps),
                       '-i', '-']
            for url in self._push_urls():
                command.extend(['-f', 'flv',
                                '-g', str(self.__fps),
                                '-maxrate', '15000k',
                                '-minrate', '3000k',
                                '-b:v', '4000k',
                                '-bufsize', '4000k',
                                '-c:v', 'libx264',
                                '-tune', 'zerolatency',
                                '-sc_threshold', '0',
                                '-pix_fmt', 'yuv420p',
                                '-preset', 'fast',
                                "-an", "-y", url])
            self.__push_stream = sp.Popen(command, stdin=sp.PIPE, shell=False)
        except ServiceException as s:
            logger.error("构建推流管道异常: {}", s.msg)
            raise
        except Exception as e:
            logger.error("初始化推流管道异常:{}", e)
            raise

    def push_stream_write(self, frame):
        """Write one frame to the push process, rebuilding it on failure.

        The push process is created on first use.  When the write fails, the
        process is rebuilt and the write repeated up to ``_PUSH_RETRIES``
        times, sleeping one second after each failed attempt.

        Raises:
            Exception: every retry failed.
        """
        try:
            if self.__push_stream is None:
                self.build_push_stream()
            self.__push_stream.stdin.write(frame.tobytes())
            return
        except Exception as ex:
            logger.error("推流异常:{}", ex)
        last_error = None
        for attempt in range(1, self._PUSH_RETRIES + 1):
            try:
                self.build_push_stream()
                self.__push_stream.stdin.write(frame.tobytes())
                logger.info("推流重试成功, 当前重试次数: {}", attempt - 1)
                return
            except Exception as e:
                last_error = e
                logger.error("推流异常:{}, 开始重试, 当前重试次数:{}", e, attempt)
                time.sleep(1)
        raise Exception("推流异常,请检查通道是否被占用!") from last_error

    def close_push_stream(self):
        """Shut down the push process, if any, and forget it."""
        if self.__push_stream:
            self._stop_process(self.__push_stream)
            self.__push_stream = None

    def is_push_stream_ok(self):
        """Return True when a push process exists and has already exited.

        ``Popen.poll()`` returns ``None`` while the child is still running, so
        this is False for a running process and for a missing one.
        NOTE(review): given the method name this looks inverted; it is kept
        as-is because the callers are not visible from this file - confirm
        how the result is used before changing it.
        """
        if self.__push_stream:
            if self.__push_stream.poll() is not None:
                return True
        return False

    def start_push_stream(self):
        """Start an ffmpeg process that copies the pull URL to the push URLs.

        Nothing happens when a push process already exists.  The video is
        stream-copied (``-c:v copy``) without audio into one ``flv`` output
        per push URL.

        Raises:
            ServiceException: the pull URL or the push URL is missing or empty.
        """
        try:
            if self.__push_stream:
                return
            if self.__pullUrl is None or len(self.__pullUrl) == 0:
                logger.error("拉流地址不能为空!")
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            if self.__pushUrl is None or len(self.__pushUrl) == 0:
                logger.error("推流地址不能为空!")
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
            command = ['ffmpeg',
                       '-re',
                       '-rtsp_transport', 'tcp',
                       '-i', self.__pullUrl]
            for url in self._push_urls():
                command.extend(['-f', 'flv',
                                '-g', str(25),
                                '-c:v', 'copy',
                                "-an", "-y", url])
            self.__push_stream = sp.Popen(command, shell=False)
        except ServiceException as s:
            logger.error("构建推流管道异常: {}", s.msg)
            raise
        except Exception as e:
            logger.error("初始化推流管道异常:{}", e)
            raise

    def is_video_stream(self, url):
        """Return True when ffprobe finds a video stream at *url*.

        Never raises: an empty URL, a failed probe or malformed probe output
        all yield False.
        """
        if not url:
            return False
        try:
            return self._find_video_stream(self._ffprobe(url)) is not None
        except Exception:
            # Unexpected probe layout (e.g. non-dict entries) counts as
            # "not a video stream".
            return False