You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

57 lines
2.5KB

  1. # -*- coding: utf-8 -*-
  2. from queue import Queue
  3. from threading import Thread
  4. import time
  5. from traceback import format_exc
  6. from loguru import logger
  7. from enums.StatusEnum import PushStreamStatus
  8. from exception.CustomerException import ServiceException
  9. from util.PushStreamUtils import PushStreamUtil
  10. from util.QueUtil import put_queue
  class PushSteamThread(Thread):
      """Thread that keeps a push-stream process alive until it is stopped.

      Each loop iteration starts the push stream through ``PushStreamUtil``,
      blocks until the underlying process exits, and then inspects
      ``pushStreamUtil.status``:

      * truthy  -> the stream ended although nobody asked it to stop; a
        ``(2, PushStreamStatus.RETRYING.value[0])`` tuple is put on
        ``statusQueue``, the process is closed and the stream is restarted
        after a one second pause;
      * falsy   -> the stream was stopped on purpose; the process is closed
        and the thread ends.

      Any exception raised inside the loop stops the thread and is reported
      on ``statusQueue`` as ``(1, exception)``.

      NOTE(review): the meaning of the leading ``1`` / ``2`` codes is defined
      by whoever consumes ``statusQueue`` -- confirm against that consumer.
      """

      __slots__ = ("pushStreamUtil", "requestId", "videoId", "statusQueue")

      def __init__(self, pullUrl, pushUrl, requestId, videoId):
          """Create the thread and its push-stream helper.

          Args:
              pullUrl: Source address, forwarded to ``PushStreamUtil``.
              pushUrl: Target address, forwarded to ``PushStreamUtil``.
              requestId: Request identifier, forwarded to ``PushStreamUtil``
                  and included in every log line.
              videoId: Video identifier included in every log line.
          """
          super().__init__()
          self.pushStreamUtil = PushStreamUtil(pullUrl, pushUrl, requestId)
          self.requestId = requestId
          self.videoId = videoId
          # Status tuples produced by run() are published on this queue.
          self.statusQueue = Queue()

      @staticmethod
      def _decode_err_output(err):
          """Return the captured error output as text without ever raising.

          ``err`` is ``None`` when the stream was not captured, and the bytes
          written by the child process are not guaranteed to be valid UTF-8.
          Undecodable bytes are replaced instead of raising, so that building
          a log message can never abort the retry loop.
          """
          if err is None:
              return ""
          if isinstance(err, (bytes, bytearray)):
              return err.decode(errors="replace")
          return str(err)

      def _report_failure(self, error):
          """Mark the stream as stopped, close the process and publish ``error``."""
          self.pushStreamUtil.status = False
          self.pushStreamUtil.close_push_stream_sp()
          put_queue(self.statusQueue, (1, error), is_throw_ex=False)

      def run(self):
          """Run the push stream, restarting it after abnormal exits.

          Returns when the stream is stopped on purpose
          (``pushStreamUtil.status`` is falsy after the process exits) or
          when an exception occurs.
          """
          logger.info("开始启动推流线程, 视频id: {}, requestId:{}", self.videoId, self.requestId)
          while True:
              try:
                  self.pushStreamUtil.start_push_stream()
                  # Blocks until the push process exits.
                  # push_stream_sp is presumably a subprocess.Popen -- TODO confirm.
                  _, err = self.pushStreamUtil.push_stream_sp.communicate()
                  # Abnormal disconnect: the process ended while still flagged as active.
                  if self.pushStreamUtil.status:
                      logger.warning("推流异常,请检测拉流地址和推流地址是否正常!")
                      if self.pushStreamUtil.push_stream_sp.returncode != 0:
                          logger.error("推流异常:{}, 视频id: {}, requestId:{}",
                                       self._decode_err_output(err), self.videoId, self.requestId)
                      put_queue(self.statusQueue, (2, PushStreamStatus.RETRYING.value[0]), is_throw_ex=False)
                      self.pushStreamUtil.close_push_stream_sp()
                      # Short pause before the next attempt.
                      time.sleep(1)
                  # Manual disconnect. Checked separately (not as an else) because
                  # the flag may have been cleared while the branch above was sleeping.
                  if not self.pushStreamUtil.status:
                      self.pushStreamUtil.close_push_stream_sp()
                      break
              except ServiceException as s:
                  logger.error("异常: {}, 视频id: {}, requestId:{}", s.msg, self.videoId, self.requestId)
                  self._report_failure(s)
                  break
              except Exception as e:
                  logger.error("异常:{}, 视频id: {}, requestId:{}", format_exc(), self.videoId, self.requestId)
                  self._report_failure(e)
                  break
          logger.info("结束推流线程, 视频id: {}, requestId:{}", self.videoId, self.requestId)