|
- # -*- coding: utf-8 -*-
- import os
-
- from enums.ExceptionEnum import StreamStrExceptionType
-
- os.environ['PATH'] = "./ffmpeg/bin"
- from traceback import format_exc
- import subprocess as sp
- from loguru import logger
- from exception.CustomerException import ServiceException
-
-
- class PushStreamUtil:
- __slots__ = ('pullUrl', 'pushUrl', "push_stream_sp", 'status')
-
- def __init__(self, pullUrl=None, pushUrl=None):
- self.pullUrl = pullUrl
- self.pushUrl = pushUrl
- self.push_stream_sp = None
- self.status = True
-
- def set_url(self, pullUrl, pushUrl):
- if pullUrl is not None and len(pullUrl) > 0:
- self.pullUrl = pullUrl
- if pushUrl is not None and len(pushUrl) > 0:
- self.pushUrl = pushUrl
-
- def start_push_stream(self):
- try:
- if self.push_stream_sp:
- return
- if self.pullUrl is None or len(self.pullUrl) == 0:
- logger.error("拉流地址不能为空!")
- raise ServiceException(StreamStrExceptionType.PULL_STREAM_URL_IS_NULL.value[0],
- StreamStrExceptionType.PULL_STREAM_URL_IS_NULL.value[1])
- if self.pushUrl is None or len(self.pushUrl) == 0:
- logger.error("推流地址不能为空!")
- raise ServiceException(StreamStrExceptionType.PUSH_STREAM_URL_IS_NULL.value[0],
- StreamStrExceptionType.PUSH_STREAM_URL_IS_NULL.value[1])
- command = ['ffmpeg', '-re', '-y', "-an", "-hide_banner"]
- if self.pullUrl.startswith("rtsp://"):
- command.extend(['-timeout', '15000000', '-rtsp_transport', 'tcp'])
- if self.pullUrl.startswith("http") or self.pullUrl.startswith("rtmp"):
- command.extend(['-rw_timeout', '17000000'])
- command.extend(['-i', self.pullUrl, '-b:v', '4000k', '-c:v', 'copy', '-f', 'flv', self.pushUrl])
- logger.info("推流指令: {}", " ".join(command))
- self.push_stream_sp = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, shell=False)
- except ServiceException as s:
- logger.error("创建推流进程异常: {}", s.msg)
- self.close_push_stream_p()
- raise s
- except Exception as e:
- logger.error("创建推流管道异常:{}", format_exc())
- self.close_push_stream_p()
- raise e
-
- def close_push_stream_p(self):
- if self.push_stream_sp:
- self.push_stream_sp.terminate()
- self.push_stream_sp.wait()
- if self.push_stream_sp and self.push_stream_sp.poll() is None:
- logger.error("关闭推流管道异常!")
- self.push_stream_sp.communicate(input=b"q\n")
- if self.push_stream_sp and self.push_stream_sp.poll() is None:
- raise ServiceException(StreamStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- StreamStrExceptionType.SERVICE_INNER_EXCEPTION.value[1])
- self.push_stream_sp = None
- logger.info("关闭推流管道异常成功!")
-
- # def push(push1):
- # push1.start_push_stream()
- # out, err = push1.push_stream_sp.communicate()
- # print("222222111111111111111111111111111111111")
- # if push1.push_stream_sp.returncode != 0:
- # print("11111111111111111111", out, err)
- #
- #
- # if __name__ == "__main__":
- # pullUrl = "rtsp://localhost:8554/live"
- # pushUrl = "rtmp://live.push.t-aaron.com/live/THSAr"
- # push1 = PushStreamUtil(pullUrl, pushUrl)
- # aa = Thread(target=push, args=(push1,))
- # aa.setDaemon(True)
- # aa.start()
- # # push1.start_push_stream()
- # time.sleep(30)
- # print("11111111111111", push1.push_stream_sp.poll())
- #
- # # out, err = push1.push_stream_sp.communicate(input=b"q\n")
- # print("222222222222222222222222", push1.push_stream_sp.poll())
- # # push1.push_stream_sp.stdin.write(b"q\n")
- # # push1.push_stream_sp.kill()
- # push1.push_stream_sp.terminate()
- # push1.push_stream_sp.wait()
- # time.sleep(5)
- # print("3333333333333333333333333", push1.push_stream_sp.poll())
- # time.sleep(10)
|