You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
4.1KB

  1. # -*- coding: utf-8 -*-
  2. import os
  3. from enums.ExceptionEnum import StreamStrExceptionType
  4. os.environ['PATH'] = "./ffmpeg/bin"
  5. from traceback import format_exc
  6. import subprocess as sp
  7. from loguru import logger
  8. from exception.CustomerException import ServiceException
  9. class PushStreamUtil:
  10. __slots__ = ('pullUrl', 'pushUrl', "push_stream_sp", 'status')
  11. def __init__(self, pullUrl=None, pushUrl=None):
  12. self.pullUrl = pullUrl
  13. self.pushUrl = pushUrl
  14. self.push_stream_sp = None
  15. self.status = True
  16. def set_url(self, pullUrl, pushUrl):
  17. if pullUrl is not None and len(pullUrl) > 0:
  18. self.pullUrl = pullUrl
  19. if pushUrl is not None and len(pushUrl) > 0:
  20. self.pushUrl = pushUrl
  21. def start_push_stream(self):
  22. try:
  23. if self.push_stream_sp:
  24. return
  25. if self.pullUrl is None or len(self.pullUrl) == 0:
  26. logger.error("拉流地址不能为空!")
  27. raise ServiceException(StreamStrExceptionType.PULL_STREAM_URL_IS_NULL.value[0],
  28. StreamStrExceptionType.PULL_STREAM_URL_IS_NULL.value[1])
  29. if self.pushUrl is None or len(self.pushUrl) == 0:
  30. logger.error("推流地址不能为空!")
  31. raise ServiceException(StreamStrExceptionType.PUSH_STREAM_URL_IS_NULL.value[0],
  32. StreamStrExceptionType.PUSH_STREAM_URL_IS_NULL.value[1])
  33. command = ['ffmpeg', '-re', '-y', "-an", "-hide_banner"]
  34. if self.pullUrl.startswith("rtsp://"):
  35. command.extend(['-timeout', '15000000', '-rtsp_transport', 'tcp'])
  36. if self.pullUrl.startswith("http") or self.pullUrl.startswith("rtmp"):
  37. command.extend(['-rw_timeout', '17000000'])
  38. command.extend(['-i', self.pullUrl, '-b:v', '4000k', '-c:v', 'copy', '-f', 'flv', self.pushUrl])
  39. logger.info("推流指令: {}", " ".join(command))
  40. self.push_stream_sp = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, shell=False)
  41. except ServiceException as s:
  42. logger.error("创建推流进程异常: {}", s.msg)
  43. self.close_push_stream_p()
  44. raise s
  45. except Exception as e:
  46. logger.error("创建推流管道异常:{}", format_exc())
  47. self.close_push_stream_p()
  48. raise e
  49. def close_push_stream_p(self):
  50. if self.push_stream_sp:
  51. self.push_stream_sp.terminate()
  52. self.push_stream_sp.wait()
  53. if self.push_stream_sp and self.push_stream_sp.poll() is None:
  54. logger.error("关闭推流管道异常!")
  55. self.push_stream_sp.communicate(input=b"q\n")
  56. if self.push_stream_sp and self.push_stream_sp.poll() is None:
  57. raise ServiceException(StreamStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  58. StreamStrExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  59. self.push_stream_sp = None
  60. logger.info("关闭推流管道异常成功!")
  61. # def push(push1):
  62. # push1.start_push_stream()
  63. # out, err = push1.push_stream_sp.communicate()
  64. # print("222222111111111111111111111111111111111")
  65. # if push1.push_stream_sp.returncode != 0:
  66. # print("11111111111111111111", out, err)
  67. #
  68. #
  69. # if __name__ == "__main__":
  70. # pullUrl = "rtsp://localhost:8554/live"
  71. # pushUrl = "rtmp://live.push.t-aaron.com/live/THSAr"
  72. # push1 = PushStreamUtil(pullUrl, pushUrl)
  73. # aa = Thread(target=push, args=(push1,))
  74. # aa.setDaemon(True)
  75. # aa.start()
  76. # # push1.start_push_stream()
  77. # time.sleep(30)
  78. # print("11111111111111", push1.push_stream_sp.poll())
  79. #
  80. # # out, err = push1.push_stream_sp.communicate(input=b"q\n")
  81. # print("222222222222222222222222", push1.push_stream_sp.poll())
  82. # # push1.push_stream_sp.stdin.write(b"q\n")
  83. # # push1.push_stream_sp.kill()
  84. # push1.push_stream_sp.terminate()
  85. # push1.push_stream_sp.wait()
  86. # time.sleep(5)
  87. # print("3333333333333333333333333", push1.push_stream_sp.poll())
  88. # time.sleep(10)