395 lines
16 KiB
Python
395 lines
16 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
import json
|
|||
|
|
import subprocess as sp
|
|||
|
|
import time
|
|||
|
|
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
from loguru import logger
|
|||
|
|
|
|||
|
|
from alg_airport_ffmpeg.enums.ExceptionEnum import ExceptionType
|
|||
|
|
from alg_airport_ffmpeg.exception.CustomerException import ServiceException
|
|||
|
|
from alg_airport_ffmpeg.concurrency.CommonThread import Common
|
|||
|
|
|
|||
|
|
"""
|
|||
|
|
推流工具
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Cv2Util:
|
|||
|
|
|
|||
|
|
def __init__(self, pullUrl=None, pushUrl=None):
|
|||
|
|
self.__pullUrl = pullUrl
|
|||
|
|
self.__pushUrl = pushUrl
|
|||
|
|
self.__push_stream = None
|
|||
|
|
self.__pull_stream = None
|
|||
|
|
self.__width = None
|
|||
|
|
self.__height = None
|
|||
|
|
self.__wh = None
|
|||
|
|
self.__fps = None
|
|||
|
|
self.__cap = None
|
|||
|
|
|
|||
|
|
def probe(self):
|
|||
|
|
p = None
|
|||
|
|
try:
|
|||
|
|
args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', self.__pullUrl]
|
|||
|
|
p = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, close_fds=True)
|
|||
|
|
out, err = p.communicate(timeout=7)
|
|||
|
|
if p.returncode != 0:
|
|||
|
|
# logger.error("获取视频信息异常: {}", err.stderr.decode(encoding='utf-8'))
|
|||
|
|
return None
|
|||
|
|
return json.loads(out.decode('utf-8'))
|
|||
|
|
except Exception as e:
|
|||
|
|
# logger.error("获取视频信息异常: {}", e)
|
|||
|
|
return None
|
|||
|
|
finally:
|
|||
|
|
if p:
|
|||
|
|
# if p.stdout:
|
|||
|
|
# p.stdout.flush()
|
|||
|
|
# p.stdout.close()
|
|||
|
|
# if p.stderr:
|
|||
|
|
# p.stderr.close()
|
|||
|
|
p.terminate()
|
|||
|
|
# parent_proc = psutil.Process(p.pid)
|
|||
|
|
# for child_proc in parent_proc.children(recursive=True):
|
|||
|
|
# child_proc.kill()
|
|||
|
|
# parent_proc.kill()
|
|||
|
|
# p.kill()
|
|||
|
|
p.wait()
|
|||
|
|
# logger.info("关闭获取视频管道完成!")
|
|||
|
|
|
|||
|
|
# 获取视频信息
|
|||
|
|
def get_video_info(self):
|
|||
|
|
try:
|
|||
|
|
if self.__pullUrl is None or len(self.__pullUrl) == 0:
|
|||
|
|
raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
|
|||
|
|
ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
|
|||
|
|
probe = self.probe()
|
|||
|
|
if probe is None or probe.get("streams") is None:
|
|||
|
|
return
|
|||
|
|
video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
|
|||
|
|
if video_stream is None:
|
|||
|
|
return
|
|||
|
|
width = video_stream.get('width')
|
|||
|
|
height = video_stream.get('height')
|
|||
|
|
fps = video_stream.get('r_frame_rate')
|
|||
|
|
self.__width = int(width)
|
|||
|
|
self.__height = int(height)
|
|||
|
|
self.__wh = int(width * height * 3)
|
|||
|
|
up, down = str(fps).split('/')
|
|||
|
|
self.__fps = int(eval(up) / eval(down))
|
|||
|
|
logger.info("视频信息, width:{}|height:{}|fps:{}", self.__width, self.__height, self.__fps)
|
|||
|
|
except ServiceException as s:
|
|||
|
|
# logger.error("获取视频信息异常: {}", s.msg)
|
|||
|
|
raise s
|
|||
|
|
except Exception as e:
|
|||
|
|
# logger.error("获取视频信息异常:{}", e)
|
|||
|
|
raise e
|
|||
|
|
|
|||
|
|
def build_cap(self, args):
|
|||
|
|
try:
|
|||
|
|
pullUrl = args[0]
|
|||
|
|
return cv2.VideoCapture(pullUrl)
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error("初始化cap异常: {}", e)
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# 构建 cv2
|
|||
|
|
def build_cv2(self):
|
|||
|
|
try:
|
|||
|
|
if self.__cap is not None:
|
|||
|
|
# logger.info("重试, 关闭cap")
|
|||
|
|
self.__cap.release()
|
|||
|
|
self.__cap = None
|
|||
|
|
self.__fps = None
|
|||
|
|
self.__width = None
|
|||
|
|
self.__height = None
|
|||
|
|
if self.__pullUrl is None or len(self.__pullUrl) == 0:
|
|||
|
|
raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
|
|||
|
|
ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
|
|||
|
|
cap_thread = Common(timeout=7, func=self.build_cap, args=(self.__pullUrl,))
|
|||
|
|
cap_thread.setDaemon(True)
|
|||
|
|
cap_thread.start()
|
|||
|
|
self.__cap = cap_thread.get_result()
|
|||
|
|
if self.__cap is None:
|
|||
|
|
return
|
|||
|
|
if self.__cap.isOpened():
|
|||
|
|
if self.__fps is None or self.__fps == 0:
|
|||
|
|
self.__fps = int(self.__cap.get(cv2.CAP_PROP_FPS))
|
|||
|
|
if self.__width is None or self.__width == 0:
|
|||
|
|
self.__width = int(self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|||
|
|
if self.__height is None or self.__height == 0:
|
|||
|
|
self.__height = int(self.__cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|||
|
|
logger.info("fps:{}|height:{}|width:{}", self.__fps, self.__height, self.__width)
|
|||
|
|
except ServiceException as s:
|
|||
|
|
logger.error("构建cv2异常: {}", s.msg)
|
|||
|
|
raise s
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error("初始化cv2异常:{}", e)
|
|||
|
|
raise e
|
|||
|
|
|
|||
|
|
def cv2_read(self):
|
|||
|
|
result = None
|
|||
|
|
try:
|
|||
|
|
if self.__cap is None:
|
|||
|
|
self.build_cv2()
|
|||
|
|
if self.__cap.isOpened():
|
|||
|
|
ret, frame = self.__cap.read()
|
|||
|
|
if ret:
|
|||
|
|
result = frame
|
|||
|
|
del ret
|
|||
|
|
del frame
|
|||
|
|
except ServiceException as s:
|
|||
|
|
raise s
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error("读流异常:{}", e)
|
|||
|
|
raise e
|
|||
|
|
finally:
|
|||
|
|
if result is None:
|
|||
|
|
self.__fps = None
|
|||
|
|
self.__height = None
|
|||
|
|
self.__width = None
|
|||
|
|
if self.__cap:
|
|||
|
|
self.__cap.release()
|
|||
|
|
self.__cap = None
|
|||
|
|
logger.info("关闭cv2!")
|
|||
|
|
return result
|
|||
|
|
|
|||
|
|
# 拉取视频
|
|||
|
|
def build_pull_stream(self):
|
|||
|
|
try:
|
|||
|
|
if self.__pullUrl is None or len(self.__pullUrl) == 0:
|
|||
|
|
logger.error("拉流地址不能为空!")
|
|||
|
|
raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
|
|||
|
|
ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
|
|||
|
|
if self.__pull_stream:
|
|||
|
|
logger.info("重试, 关闭拉流管道")
|
|||
|
|
if self.__pull_stream.stdout:
|
|||
|
|
self.__pull_stream.stdout.close()
|
|||
|
|
self.__pull_stream.terminate()
|
|||
|
|
self.__pull_stream.wait()
|
|||
|
|
command = ['ffmpeg',
|
|||
|
|
'-rtsp_transport', 'tcp',
|
|||
|
|
'-i', self.__pullUrl,
|
|||
|
|
'-f', 'rawvideo',
|
|||
|
|
'-pix_fmt', 'bgr24',
|
|||
|
|
'-an',
|
|||
|
|
'-']
|
|||
|
|
# command = ['ffmpeg',
|
|||
|
|
# '-re',
|
|||
|
|
# '-y',
|
|||
|
|
# '-c:v', 'h264_cuvid',
|
|||
|
|
# '-resize', self.wah,
|
|||
|
|
# '-i', self.pullUrl,
|
|||
|
|
# '-f', 'rawvideo',
|
|||
|
|
# '-an',
|
|||
|
|
# '-']
|
|||
|
|
self.__pull_stream = sp.Popen(command, stdout=sp.PIPE)
|
|||
|
|
except ServiceException as s:
|
|||
|
|
logger.error("构建拉流管道异常: {}", s.msg)
|
|||
|
|
raise s
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error("构建拉流管道异常:{}", e)
|
|||
|
|
raise e
|
|||
|
|
|
|||
|
|
def check_config(self):
|
|||
|
|
if self.__fps is None or self.__width is None or self.__height is None:
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def read(self):
|
|||
|
|
result = None
|
|||
|
|
try:
|
|||
|
|
if self.__pull_stream is None:
|
|||
|
|
self.build_pull_stream()
|
|||
|
|
in_bytes = self.__pull_stream.stdout.read(self.__wh)
|
|||
|
|
if in_bytes is not None and len(in_bytes) > 0:
|
|||
|
|
result = (np.frombuffer(in_bytes, np.uint8).reshape([int(self.__height), int(self.__width), 3]))
|
|||
|
|
except ServiceException as s:
|
|||
|
|
raise s
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error("读流异常:{}", e)
|
|||
|
|
raise e
|
|||
|
|
finally:
|
|||
|
|
if result is None:
|
|||
|
|
self.__fps = None
|
|||
|
|
self.__height = None
|
|||
|
|
self.__width = None
|
|||
|
|
if self.__pull_stream:
|
|||
|
|
if self.__pull_stream.stdout:
|
|||
|
|
self.__pull_stream.stdout.close()
|
|||
|
|
self.__pull_stream.terminate()
|
|||
|
|
self.__pull_stream.wait()
|
|||
|
|
logger.info("关闭拉流管道完成!")
|
|||
|
|
self.__pull_stream = None
|
|||
|
|
return result
|
|||
|
|
|
|||
|
|
# 关闭管道
|
|||
|
|
def close(self):
|
|||
|
|
if self.__pull_stream:
|
|||
|
|
if self.__pull_stream.stdout:
|
|||
|
|
self.__pull_stream.stdout.close()
|
|||
|
|
self.__pull_stream.terminate()
|
|||
|
|
self.__pull_stream.wait()
|
|||
|
|
logger.info("关闭拉流管道完成!")
|
|||
|
|
if self.__push_stream:
|
|||
|
|
if self.__push_stream.stdin:
|
|||
|
|
self.__push_stream.stdin.close()
|
|||
|
|
self.__push_stream.terminate()
|
|||
|
|
self.__push_stream.wait()
|
|||
|
|
logger.info("关闭推流管道完成!")
|
|||
|
|
if self.__cap:
|
|||
|
|
self.__cap.release()
|
|||
|
|
|
|||
|
|
# 开始推流
|
|||
|
|
def build_push_stream(self):
|
|||
|
|
try:
|
|||
|
|
if self.__push_stream:
|
|||
|
|
logger.info("重试, 关闭管道, 重新开启新管道")
|
|||
|
|
if self.__push_stream.stdin:
|
|||
|
|
self.__push_stream.stdin.close()
|
|||
|
|
self.__push_stream.terminate()
|
|||
|
|
self.__push_stream.wait()
|
|||
|
|
if self.__pushUrl is None or len(self.__pushUrl) == 0:
|
|||
|
|
logger.error("推流地址不能为空!")
|
|||
|
|
raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
|
|||
|
|
ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
|
|||
|
|
command = ['ffmpeg',
|
|||
|
|
# '-loglevel', 'debug',
|
|||
|
|
'-f', 'rawvideo',
|
|||
|
|
'-vcodec', 'rawvideo',
|
|||
|
|
'-pix_fmt', 'bgr24',
|
|||
|
|
'-s', "{}x{}".format(int(self.__width), int(self.__height)),
|
|||
|
|
'-r', str(self.__fps),
|
|||
|
|
'-i', '-',
|
|||
|
|
# '-g', str(self.__fps),
|
|||
|
|
# '-maxrate', '15000k',
|
|||
|
|
# '-minrate', '3000k',
|
|||
|
|
# '-profile:v', 'high',
|
|||
|
|
# '-level', '5.1',
|
|||
|
|
# '-b:v', '4000k',
|
|||
|
|
# '-crf', '26',
|
|||
|
|
# '-bufsize', '4000k',
|
|||
|
|
# '-c:v', 'libx264',
|
|||
|
|
# '-tune', 'zerolatency',
|
|||
|
|
# '-sc_threshold', '0',
|
|||
|
|
# '-pix_fmt', 'yuv420p',
|
|||
|
|
# "-an",
|
|||
|
|
# '-preset', 'medium', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
|
|||
|
|
# superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
|
|||
|
|
]
|
|||
|
|
for url in self.__pushUrl:
|
|||
|
|
command.extend(['-f', 'flv',
|
|||
|
|
'-g', str(self.__fps),
|
|||
|
|
'-maxrate', '15000k',
|
|||
|
|
'-minrate', '3000k',
|
|||
|
|
'-b:v', '4000k',
|
|||
|
|
'-bufsize', '4000k',
|
|||
|
|
'-c:v', 'libx264',
|
|||
|
|
'-tune', 'zerolatency',
|
|||
|
|
'-sc_threshold', '0',
|
|||
|
|
'-pix_fmt', 'yuv420p',
|
|||
|
|
'-preset', 'fast',
|
|||
|
|
"-an", "-y", url
|
|||
|
|
])
|
|||
|
|
self.__push_stream = sp.Popen(command, stdin=sp.PIPE, shell=False)
|
|||
|
|
except ServiceException as s:
|
|||
|
|
logger.error("构建推流管道异常: {}", s.msg)
|
|||
|
|
raise s
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error("初始化推流管道异常:{}", e)
|
|||
|
|
raise e
|
|||
|
|
|
|||
|
|
def push_stream_write(self, frame):
|
|||
|
|
try:
|
|||
|
|
if self.__push_stream is None:
|
|||
|
|
self.build_push_stream()
|
|||
|
|
self.__push_stream.stdin.write(frame.tostring())
|
|||
|
|
except Exception as ex:
|
|||
|
|
logger.error("推流异常:{}", ex)
|
|||
|
|
current_retry_num = 0
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
self.build_push_stream()
|
|||
|
|
self.__push_stream.stdin.write(frame.tostring())
|
|||
|
|
logger.info("推流重试成功, 当前重试次数: {}", current_retry_num)
|
|||
|
|
break
|
|||
|
|
except Exception as e:
|
|||
|
|
current_retry_num += 1
|
|||
|
|
logger.error("推流异常:{}, 开始重试, 当前重试次数:{}", e, current_retry_num)
|
|||
|
|
time.sleep(1)
|
|||
|
|
if current_retry_num > 1:
|
|||
|
|
raise Exception("推流异常,请检查通道是否被占用!")
|
|||
|
|
|
|||
|
|
def close_push_stream(self):
|
|||
|
|
if self.__push_stream:
|
|||
|
|
self.__push_stream.terminate()
|
|||
|
|
self.__push_stream.wait()
|
|||
|
|
self.__push_stream = None
|
|||
|
|
|
|||
|
|
def is_push_stream_ok(self):
|
|||
|
|
if self.__push_stream:
|
|||
|
|
if self.__push_stream.poll() is not None:
|
|||
|
|
return True
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
# 开始推流
|
|||
|
|
def start_push_stream(self):
|
|||
|
|
try:
|
|||
|
|
if self.__push_stream:
|
|||
|
|
return
|
|||
|
|
if self.__pullUrl is None or len(self.__pullUrl) == 0:
|
|||
|
|
logger.error("拉流地址不能为空!")
|
|||
|
|
raise ServiceException(ExceptionType.PUll_STREAM_URL_EXCEPTION.value[0],
|
|||
|
|
ExceptionType.PUll_STREAM_URL_EXCEPTION.value[1])
|
|||
|
|
if self.__pushUrl is None or len(self.__pushUrl) == 0:
|
|||
|
|
logger.error("推流地址不能为空!")
|
|||
|
|
raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
|
|||
|
|
ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
|
|||
|
|
command = ['ffmpeg',
|
|||
|
|
'-re',
|
|||
|
|
'-rtsp_transport', 'tcp',
|
|||
|
|
'-i', self.__pullUrl,
|
|||
|
|
]
|
|||
|
|
for url in self.__pushUrl:
|
|||
|
|
command.extend(['-f', 'flv',
|
|||
|
|
'-g', str(25),
|
|||
|
|
'-c:v', 'copy',
|
|||
|
|
"-an", "-y", url
|
|||
|
|
])
|
|||
|
|
self.__push_stream = sp.Popen(command, shell=False)
|
|||
|
|
except ServiceException as s:
|
|||
|
|
logger.error("构建推流管道异常: {}", s.msg)
|
|||
|
|
raise s
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error("初始化推流管道异常:{}", e)
|
|||
|
|
raise e
|
|||
|
|
|
|||
|
|
def is_video_stream(self, url):
|
|||
|
|
p = None
|
|||
|
|
try:
|
|||
|
|
if url is None or len(url) == 0:
|
|||
|
|
raise Exception("流地址不能为空!")
|
|||
|
|
args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', url]
|
|||
|
|
p = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, close_fds=True)
|
|||
|
|
out, err = p.communicate(timeout=7)
|
|||
|
|
if p.returncode != 0:
|
|||
|
|
return False
|
|||
|
|
probe = json.loads(out.decode('utf-8'))
|
|||
|
|
if probe is None or probe.get("streams") is None:
|
|||
|
|
return False
|
|||
|
|
video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
|
|||
|
|
if video_stream is None:
|
|||
|
|
return False
|
|||
|
|
return True
|
|||
|
|
except ServiceException as s:
|
|||
|
|
raise s
|
|||
|
|
except Exception as e:
|
|||
|
|
return False
|
|||
|
|
finally:
|
|||
|
|
if p:
|
|||
|
|
p.terminate()
|
|||
|
|
p.wait()
|
|||
|
|
p = None
|