You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

75 lines
3.5KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from traceback import format_exc
  4. import subprocess as sp
  5. from loguru import logger
  6. from exception.CustomerException import ServiceException
  7. from enums.ExceptionEnum import ExceptionType
  8. class PushStreamUtil:
  9. __slots__ = ('pullUrl', 'pushUrl', 'requestId', "push_stream_sp", "start_time")
  10. def __init__(self, pullUrl=None, pushUrl=None, requestId=None):
  11. self.pullUrl = pullUrl
  12. self.pushUrl = pushUrl
  13. self.requestId = requestId
  14. self.push_stream_sp = None
  15. self.start_time = time.time()
  16. def close_push_stream_sp(self):
  17. logger.info("开始关闭推流管道, requestId:{}", self.requestId)
  18. if self.push_stream_sp:
  19. logger.info("尝试关闭推流管道, requestId:{}", self.requestId)
  20. self.push_stream_sp.terminate()
  21. self.push_stream_sp.wait()
  22. if self.push_stream_sp and self.push_stream_sp.poll() is None:
  23. logger.error("尝试关闭推流管道失败, requestId:{}", self.requestId)
  24. self.push_stream_sp.communicate(input=b"q\n", timeout=30)
  25. if self.push_stream_sp and self.push_stream_sp.poll() is not None:
  26. logger.info("尝试关闭推流管道成功, requestId:{}", self.requestId)
  27. self.push_stream_sp = None
  28. if self.push_stream_sp and self.push_stream_sp.poll() is not None:
  29. logger.info("尝试关闭推流管道成功, requestId:{}", self.requestId)
  30. self.push_stream_sp = None
  31. else:
  32. logger.info("推流管道已关闭, requestId:{}", self.requestId)
  33. # 构建 cv2
  34. def start_push_stream(self):
  35. try:
  36. if self.push_stream_sp:
  37. return
  38. if self.pullUrl is None or len(self.pullUrl) == 0:
  39. logger.error("推流地址不能为空, requestId:{}", self.requestId)
  40. raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  41. ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  42. if self.pushUrl is None or len(self.pushUrl) == 0:
  43. logger.error("推流地址不能为空, requestId:{}", self.requestId)
  44. raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
  45. ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
  46. command = ['ffmpeg', '-re', '-y', "-an", "-hide_banner"]
  47. if self.pullUrl.startswith("rtsp://"):
  48. command.extend(['-timeout', '20000000', '-rtsp_transport', 'tcp'])
  49. if self.pullUrl.startswith("rtmp://") or self.pullUrl.startswith("http"):
  50. command.extend(['-rw_timeout', '20000000'])
  51. command.extend(['-i', self.pullUrl,
  52. '-c:v', 'copy',
  53. '-b:v', '4000k',
  54. '-bufsize', '4000k',
  55. '-f', 'flv',
  56. self.pushUrl])
  57. logger.info("推流指令:{}, requestId:{}", command, self.requestId)
  58. self.push_stream_sp = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, shell=False)
  59. except ServiceException as s:
  60. self.close_push_stream_sp()
  61. logger.error("构建p管道异常: {}, requestId:{}", s.msg, self.requestId)
  62. raise s
  63. except Exception as e:
  64. self.close_push_stream_sp()
  65. logger.error("初始化推流管道异常:{}, requestId:{}", format_exc(), self.requestId)
  66. raise e