395 lines
16 KiB
Python
395 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
||
import json
|
||
import subprocess as sp
|
||
import time
|
||
|
||
import cv2
|
||
import numpy as np
|
||
from loguru import logger
|
||
|
||
from alg_airport_ffmpeg.enums.ExceptionEnum import ExceptionType
|
||
from alg_airport_ffmpeg.exception.CustomerException import ServiceException
|
||
from alg_airport_ffmpeg.concurrency.CommonThread import Common
|
||
|
||
"""
|
||
推流工具
|
||
"""
|
||
|
||
|
||
class Cv2Util:
    """Probe, pull and push video streams via ffprobe/ffmpeg subprocesses and OpenCV.

    The instance keeps the pull URL, the push URL(s), the currently running
    pull/push ffmpeg processes, an optional ``cv2.VideoCapture`` and the frame
    geometry (width, height, fps) discovered for the pull stream.
    """

    # Seconds to wait for ffprobe and for the VideoCapture builder thread.
    _PROBE_TIMEOUT = 7

    def __init__(self, pullUrl=None, pushUrl=None):
        """Store the stream addresses; no process is started here.

        :param pullUrl: address of the stream to read from.
        :param pushUrl: address(es) to publish to. The push builders iterate
            over it, so a list of URLs is expected; a single string is also
            accepted and treated as one URL.
        """
        self.__pullUrl = pullUrl
        self.__pushUrl = pushUrl
        self.__push_stream = None
        self.__pull_stream = None
        self.__width = None
        self.__height = None
        self.__wh = None
        self.__fps = None
        self.__cap = None

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_urls(urls):
        """Return ``urls`` as a list, wrapping a single string into one element."""
        if urls is None:
            return []
        if isinstance(urls, (str, bytes)):
            # Iterating a plain string would yield single characters.
            return [urls]
        return list(urls)

    @staticmethod
    def _parse_fps(rate):
        """Convert an ffprobe ``"num/den"`` rate into an int, or None if unusable."""
        if rate is None:
            return None
        numerator, sep, denominator = str(rate).partition('/')
        try:
            num = float(numerator)
            den = float(denominator) if sep else 1.0
            if den == 0:
                # ffprobe reports "0/0" when the rate is unknown.
                return None
            return int(num / den)
        except (ValueError, OverflowError):
            return None

    @staticmethod
    def _stop_process(proc, wait_timeout=5):
        """Close the pipes of ``proc``, terminate it and reap it (best effort)."""
        if proc is None:
            return
        for pipe in (proc.stdin, proc.stdout):
            if pipe is None:
                continue
            try:
                pipe.close()
            except (OSError, ValueError):
                # Flushing into a dead ffmpeg raises BrokenPipeError; the
                # process is being torn down anyway.
                pass
        proc.terminate()
        try:
            proc.wait(timeout=wait_timeout)
        except sp.TimeoutExpired:
            # SIGTERM was ignored: force the exit so wait() cannot hang.
            proc.kill()
            proc.wait()

    @classmethod
    def _run_ffprobe(cls, url):
        """Run ffprobe on ``url`` and return the parsed JSON dict, or None on any failure."""
        if url is None or len(url) == 0:
            return None
        proc = None
        try:
            args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', url]
            proc = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, close_fds=True)
            out, _ = proc.communicate(timeout=cls._PROBE_TIMEOUT)
            if proc.returncode != 0:
                return None
            return json.loads(out.decode('utf-8'))
        except Exception:
            # Probing is best effort: a missing binary, a timeout or invalid
            # JSON all mean "no information available".
            return None
        finally:
            if proc is not None and proc.poll() is None:
                # Still running (timeout): kill it, then drain and reap.
                proc.kill()
                proc.communicate()

    # ------------------------------------------------------------------
    # probing
    # ------------------------------------------------------------------
    def probe(self):
        """Return the ffprobe JSON description of the pull URL, or None on failure."""
        return self._run_ffprobe(self.__pullUrl)

    # Fetch video information
    def get_video_info(self):
        """Probe the pull URL and cache width, height, frame byte size and fps.

        Returns without changing anything when the probe fails, when no video
        stream is reported, or when the stream lacks width/height.

        :raises ServiceException: if the pull URL is empty.
        """
        if self.__pullUrl is None or len(self.__pullUrl) == 0:
            raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                   ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
        probe = self.probe()
        if probe is None or probe.get("streams") is None:
            return
        video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
        if video_stream is None:
            return
        width = video_stream.get('width')
        height = video_stream.get('height')
        if width is None or height is None:
            return
        self.__width = int(width)
        self.__height = int(height)
        # One bgr24 frame occupies width * height * 3 bytes.
        self.__wh = self.__width * self.__height * 3
        # Parsed numerically instead of eval(): the value comes from an
        # external process and may be "0/0".
        self.__fps = self._parse_fps(video_stream.get('r_frame_rate'))
        logger.info("视频信息, width:{}|height:{}|fps:{}", self.__width, self.__height, self.__fps)

    # ------------------------------------------------------------------
    # OpenCV capture
    # ------------------------------------------------------------------
    def build_cap(self, args):
        """Open a ``cv2.VideoCapture`` for ``args[0]``; return None if that raises."""
        try:
            pullUrl = args[0]
            return cv2.VideoCapture(pullUrl)
        except Exception as e:
            logger.error("初始化cap异常: {}", e)
            return None

    # Build cv2
    def build_cv2(self):
        """(Re)create the VideoCapture in a helper thread and fill missing geometry.

        An existing capture is released first and the cached geometry is
        cleared. If the helper thread yields no capture the method returns
        with ``cap`` left as None.

        :raises ServiceException: if the pull URL is empty.
        """
        try:
            if self.__cap is not None:
                # Retry: drop the old capture together with its geometry.
                # NOTE(review): the resets are assumed to belong to this
                # branch - confirm against the original indentation.
                self.__cap.release()
                self.__cap = None
                self.__fps = None
                self.__width = None
                self.__height = None
            if self.__pullUrl is None or len(self.__pullUrl) == 0:
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            cap_thread = Common(timeout=self._PROBE_TIMEOUT, func=self.build_cap, args=(self.__pullUrl,))
            cap_thread.setDaemon(True)
            cap_thread.start()
            self.__cap = cap_thread.get_result()
            if self.__cap is None:
                return
            if self.__cap.isOpened():
                # Only fill values that are still unknown.
                if self.__fps is None or self.__fps == 0:
                    self.__fps = int(self.__cap.get(cv2.CAP_PROP_FPS))
                if self.__width is None or self.__width == 0:
                    self.__width = int(self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                if self.__height is None or self.__height == 0:
                    self.__height = int(self.__cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                logger.info("fps:{}|height:{}|width:{}", self.__fps, self.__height, self.__width)
        except ServiceException as s:
            logger.error("构建cv2异常: {}", s.msg)
            raise
        except Exception as e:
            logger.error("初始化cv2异常:{}", e)
            raise

    def cv2_read(self):
        """Read one frame through OpenCV.

        :return: the frame, or None when no frame could be read. On None the
            cached geometry is cleared and the capture is released so the next
            call rebuilds it.
        """
        result = None
        try:
            if self.__cap is None:
                self.build_cv2()
            # build_cv2 may leave cap as None when the builder thread yields
            # nothing; treat that as "no frame".
            if self.__cap is not None and self.__cap.isOpened():
                ret, frame = self.__cap.read()
                if ret:
                    result = frame
        except ServiceException:
            raise
        except Exception as e:
            logger.error("读流异常:{}", e)
            raise
        finally:
            if result is None:
                self.__fps = None
                self.__height = None
                self.__width = None
                if self.__cap is not None:
                    self.__cap.release()
                    self.__cap = None
                    logger.info("关闭cv2!")
        return result

    # ------------------------------------------------------------------
    # ffmpeg pull pipe
    # ------------------------------------------------------------------
    # Pull video
    def build_pull_stream(self):
        """Start an ffmpeg process that writes raw bgr24 frames of the pull URL to stdout.

        A previously running pull process is shut down first.

        :raises ServiceException: if the pull URL is empty.
        """
        try:
            if self.__pullUrl is None or len(self.__pullUrl) == 0:
                logger.error("拉流地址不能为空!")
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            if self.__pull_stream:
                logger.info("重试, 关闭拉流管道")
                self._stop_process(self.__pull_stream)
                # Do not keep a dead handle if Popen below fails.
                self.__pull_stream = None
            command = ['ffmpeg',
                       '-rtsp_transport', 'tcp',
                       '-i', self.__pullUrl,
                       '-f', 'rawvideo',
                       '-pix_fmt', 'bgr24',
                       '-an',
                       '-']
            self.__pull_stream = sp.Popen(command, stdout=sp.PIPE)
        except ServiceException as s:
            logger.error("构建拉流管道异常: {}", s.msg)
            raise
        except Exception as e:
            logger.error("构建拉流管道异常:{}", e)
            raise

    def check_config(self):
        """Return True when fps, width or height is still unknown, False otherwise."""
        return self.__fps is None or self.__width is None or self.__height is None

    def read(self):
        """Read one raw frame from the ffmpeg pull pipe.

        :return: a ``(height, width, 3)`` uint8 array, or None when the frame
            geometry is unknown or a complete frame could not be read. On None
            the cached geometry is cleared and the pull process is shut down.
        """
        result = None
        try:
            width, height = self.__width, self.__height
            # Without a known size, read() would block until EOF.
            if width and height:
                width, height = int(width), int(height)
                frame_size = width * height * 3
                if self.__pull_stream is None:
                    self.build_pull_stream()
                in_bytes = self.__pull_stream.stdout.read(frame_size)
                # A short read means the pipe reached EOF mid-frame.
                if in_bytes is not None and len(in_bytes) == frame_size:
                    result = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])
        except ServiceException:
            raise
        except Exception as e:
            logger.error("读流异常:{}", e)
            raise
        finally:
            if result is None:
                self.__fps = None
                self.__height = None
                self.__width = None
                if self.__pull_stream:
                    self._stop_process(self.__pull_stream)
                    logger.info("关闭拉流管道完成!")
                self.__pull_stream = None
        return result

    # Close pipes
    def close(self):
        """Shut down the pull process, the push process and the OpenCV capture."""
        if self.__pull_stream:
            self._stop_process(self.__pull_stream)
            self.__pull_stream = None
            logger.info("关闭拉流管道完成!")
        if self.__push_stream:
            self._stop_process(self.__push_stream)
            self.__push_stream = None
            logger.info("关闭推流管道完成!")
        if self.__cap is not None:
            self.__cap.release()
            self.__cap = None

    # ------------------------------------------------------------------
    # ffmpeg push pipe
    # ------------------------------------------------------------------
    # Start pushing
    def build_push_stream(self):
        """Start an ffmpeg process that encodes raw bgr24 frames from stdin to every push URL.

        A previously running push process is shut down first. The cached
        width, height and fps describe the raw input.

        :raises ServiceException: if no push URL is configured.
        """
        try:
            if self.__push_stream:
                logger.info("重试, 关闭管道, 重新开启新管道")
                self._stop_process(self.__push_stream)
                self.__push_stream = None
            if self.__pushUrl is None or len(self.__pushUrl) == 0:
                logger.error("推流地址不能为空!")
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
            command = ['ffmpeg',
                       '-f', 'rawvideo',
                       '-vcodec', 'rawvideo',
                       '-pix_fmt', 'bgr24',
                       '-s', "{}x{}".format(int(self.__width), int(self.__height)),
                       '-r', str(self.__fps),
                       '-i', '-',
                       ]
            # One flv/libx264 output per destination.
            for url in self._normalize_urls(self.__pushUrl):
                command.extend(['-f', 'flv',
                                '-g', str(self.__fps),
                                '-maxrate', '15000k',
                                '-minrate', '3000k',
                                '-b:v', '4000k',
                                '-bufsize', '4000k',
                                '-c:v', 'libx264',
                                '-tune', 'zerolatency',
                                '-sc_threshold', '0',
                                '-pix_fmt', 'yuv420p',
                                '-preset', 'fast',
                                "-an", "-y", url
                                ])
            self.__push_stream = sp.Popen(command, stdin=sp.PIPE, shell=False)
        except ServiceException as s:
            logger.error("构建推流管道异常: {}", s.msg)
            raise
        except Exception as e:
            logger.error("初始化推流管道异常:{}", e)
            raise

    def push_stream_write(self, frame):
        """Write one frame to the push pipe, rebuilding the pipe on failure.

        After a failed write the push process is rebuilt and the write is
        retried, sleeping one second after each failed retry.

        :param frame: numpy array holding one bgr24 frame.
        :raises Exception: when the retries fail as well.
        """
        try:
            if self.__push_stream is None:
                self.build_push_stream()
            # tobytes() replaces the deprecated ndarray.tostring() alias.
            self.__push_stream.stdin.write(frame.tobytes())
        except Exception as ex:
            logger.error("推流异常:{}", ex)
            current_retry_num = 0
            while True:
                try:
                    self.build_push_stream()
                    self.__push_stream.stdin.write(frame.tobytes())
                    logger.info("推流重试成功, 当前重试次数: {}", current_retry_num)
                    break
                except Exception as e:
                    current_retry_num += 1
                    logger.error("推流异常:{}, 开始重试, 当前重试次数:{}", e, current_retry_num)
                    time.sleep(1)
                    if current_retry_num > 1:
                        raise Exception("推流异常,请检查通道是否被占用!") from e

    def close_push_stream(self):
        """Shut down the push process, if any, and forget it."""
        if self.__push_stream:
            self._stop_process(self.__push_stream)
            self.__push_stream = None

    def is_push_stream_ok(self):
        """Return True when a push process exists and ``poll()`` reports it has exited.

        NOTE(review): the name suggests the opposite meaning (True = healthy);
        the original semantics are kept - confirm against the callers.
        """
        if self.__push_stream:
            return self.__push_stream.poll() is not None
        return False

    # Start pushing
    def start_push_stream(self):
        """Start an ffmpeg process that copies the pull URL's video to every push URL.

        Does nothing when a push process already exists.

        :raises ServiceException: if the pull URL or the push URL is empty.
        """
        try:
            if self.__push_stream:
                return
            if self.__pullUrl is None or len(self.__pullUrl) == 0:
                logger.error("拉流地址不能为空!")
                # Member name aligned with the other methods of this class
                # (was spelled "PUll_...").
                raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
            if self.__pushUrl is None or len(self.__pushUrl) == 0:
                logger.error("推流地址不能为空!")
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
                                       ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
            command = ['ffmpeg',
                       '-re',
                       '-rtsp_transport', 'tcp',
                       '-i', self.__pullUrl,
                       ]
            for url in self._normalize_urls(self.__pushUrl):
                command.extend(['-f', 'flv',
                                '-g', str(25),
                                '-c:v', 'copy',
                                "-an", "-y", url
                                ])
            self.__push_stream = sp.Popen(command, shell=False)
        except ServiceException as s:
            logger.error("构建推流管道异常: {}", s.msg)
            raise
        except Exception as e:
            logger.error("初始化推流管道异常:{}", e)
            raise

    def is_video_stream(self, url):
        """Return True when ffprobe reports at least one video stream for ``url``.

        Any failure (empty URL, ffprobe error or timeout, invalid output)
        yields False.
        """
        probe = self._run_ffprobe(url)
        if probe is None or probe.get("streams") is None:
            return False
        return any(stream.get('codec_type') == 'video' for stream in probe['streams'])
|