75 lines
3.5 KiB
Python
75 lines
3.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
import time
|
|
from traceback import format_exc
|
|
|
|
import subprocess as sp
|
|
|
|
from loguru import logger
|
|
from exception.CustomerException import ServiceException
|
|
from enums.ExceptionEnum import ExceptionType
|
|
|
|
|
|
class PushStreamUtil:
|
|
__slots__ = ('pullUrl', 'pushUrl', 'requestId', "push_stream_sp", "start_time")
|
|
|
|
def __init__(self, pullUrl=None, pushUrl=None, requestId=None):
|
|
self.pullUrl = pullUrl
|
|
self.pushUrl = pushUrl
|
|
self.requestId = requestId
|
|
self.push_stream_sp = None
|
|
self.start_time = time.time()
|
|
|
|
def close_push_stream_sp(self):
|
|
logger.info("开始关闭推流管道, requestId:{}", self.requestId)
|
|
if self.push_stream_sp:
|
|
logger.info("尝试关闭推流管道, requestId:{}", self.requestId)
|
|
self.push_stream_sp.terminate()
|
|
self.push_stream_sp.wait()
|
|
if self.push_stream_sp and self.push_stream_sp.poll() is None:
|
|
logger.error("尝试关闭推流管道失败, requestId:{}", self.requestId)
|
|
self.push_stream_sp.communicate(input=b"q\n", timeout=30)
|
|
if self.push_stream_sp and self.push_stream_sp.poll() is not None:
|
|
logger.info("尝试关闭推流管道成功, requestId:{}", self.requestId)
|
|
self.push_stream_sp = None
|
|
if self.push_stream_sp and self.push_stream_sp.poll() is not None:
|
|
logger.info("尝试关闭推流管道成功, requestId:{}", self.requestId)
|
|
self.push_stream_sp = None
|
|
else:
|
|
logger.info("推流管道已关闭, requestId:{}", self.requestId)
|
|
|
|
# 构建 cv2
|
|
def start_push_stream(self):
|
|
try:
|
|
if self.push_stream_sp:
|
|
return
|
|
if self.pullUrl is None or len(self.pullUrl) == 0:
|
|
logger.error("推流地址不能为空, requestId:{}", self.requestId)
|
|
raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
|
|
ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
|
|
if self.pushUrl is None or len(self.pushUrl) == 0:
|
|
logger.error("推流地址不能为空, requestId:{}", self.requestId)
|
|
raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
|
|
ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
|
|
command = ['ffmpeg', '-re', '-y', "-an", "-hide_banner"]
|
|
if self.pullUrl.startswith("rtsp://"):
|
|
command.extend(['-timeout', '20000000', '-rtsp_transport', 'tcp'])
|
|
if self.pullUrl.startswith("rtmp://") or self.pullUrl.startswith("http"):
|
|
command.extend(['-rw_timeout', '20000000'])
|
|
command.extend(['-i', self.pullUrl,
|
|
'-c:v', 'copy',
|
|
'-b:v', '4000k',
|
|
'-bufsize', '4000k',
|
|
'-f', 'flv',
|
|
self.pushUrl])
|
|
logger.info("推流指令:{}, requestId:{}", command, self.requestId)
|
|
self.push_stream_sp = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, shell=False)
|
|
except ServiceException as s:
|
|
self.close_push_stream_sp()
|
|
logger.error("构建p管道异常: {}, requestId:{}", s.msg, self.requestId)
|
|
raise s
|
|
except Exception as e:
|
|
self.close_push_stream_sp()
|
|
logger.error("初始化推流管道异常:{}, requestId:{}", format_exc(), self.requestId)
|
|
raise e
|
|
|