# -*- coding: utf-8 -*- import time from traceback import format_exc import subprocess as sp from loguru import logger from exception.CustomerException import ServiceException from enums.ExceptionEnum import ExceptionType class PushStreamUtil: __slots__ = ('pullUrl', 'pushUrl', 'requestId', "status", "push_stream_sp", "start_time") def __init__(self, pullUrl=None, pushUrl=None, requestId=None): self.pullUrl = pullUrl self.pushUrl = pushUrl self.requestId = requestId self.status = True self.push_stream_sp = None self.start_time = time.time() def close_push_stream_sp(self): logger.info("开始关闭推流管道, requestId:{}", self.requestId) if self.push_stream_sp: logger.info("尝试关闭推流管道, requestId:{}", self.requestId) self.push_stream_sp.terminate() self.push_stream_sp.wait() if self.push_stream_sp and self.push_stream_sp.poll() is None: logger.error("尝试关闭推流管道失败, requestId:{}", self.requestId) # self.push_stream_sp.communicate(input=b"q\n", timeout=30) if self.push_stream_sp and self.push_stream_sp.poll() is not None: logger.info("尝试关闭推流管道成功, requestId:{}", self.requestId) self.push_stream_sp = None else: logger.info("推流管道已关闭, requestId:{}", self.requestId) # 构建 cv2 def start_push_stream(self): try: if self.push_stream_sp: return if self.pullUrl is None or len(self.pullUrl) == 0: logger.error("推流地址不能为空, requestId:{}", self.requestId) raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0], ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1]) if self.pushUrl is None or len(self.pushUrl) == 0: logger.error("推流地址不能为空, requestId:{}", self.requestId) raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0], ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1]) command = ['ffmpeg', '-re', '-y', "-an", "-hide_banner"] if self.pullUrl.startswith("rtsp://"): command.extend(['-timeout', '20000000', '-rtsp_transport', 'tcp']) if self.pullUrl.startswith("rtmp://") or self.pullUrl.startswith("http"): command.extend(['-rw_timeout', '20000000']) command.extend(['-i', self.pullUrl, '-c:v', 'copy', '-b:v', '4000k', '-bufsize', '4000k', '-f', 'flv', self.pushUrl]) logger.info("推流指令:{}, requestId:{}", command, self.requestId) self.push_stream_sp = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, shell=False) except ServiceException as s: self.close_push_stream_sp() logger.error("构建p管道异常: {}, requestId:{}", s.msg, self.requestId) raise s except Exception as e: self.close_push_stream_sp() logger.error("初始化推流管道异常:{}, requestId:{}", format_exc(), self.requestId) raise e