You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

206 lines
12KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from traceback import format_exc
  4. from multiprocessing import Process, Queue
  5. from loguru import logger
  6. from concurrency.PushStreamThread import PushSteamThread
  7. from enums.StatusEnum import PushStreamStatus, ExecuteStatus
  8. from util.LogUtils import init_log
  9. from enums.ExceptionEnum import ExceptionType
  10. from entity.FeedBack import pull_stream_feedback
  11. from exception.CustomerException import ServiceException
  12. from util.QueUtil import get_no_block_queue
  13. class PushStreamProcess(Process):
  14. __slots__ = [
  15. '_fbQueue',
  16. '_eventQueue',
  17. '_context',
  18. '_msg',
  19. '_analysisType',
  20. '_base_dir'
  21. ]
  22. def __init__(self, param):
  23. super().__init__()
  24. self._fbQueue = param.fbqueue
  25. self.eventQueue = Queue()
  26. self._context = param.context
  27. self._msg = param.msg
  28. self._analysisType = param.analyse_type
  29. self._base_dir = param.base_dir
  30. def sendEvent(self, eBody):
  31. self.eventQueue.put(eBody)
  32. # 获取下一个事件
  33. def getEvent(self):
  34. return get_no_block_queue(self.eventQueue)
  35. # 推送执行结果
  36. def sendResult(self, result):
  37. self._fbQueue.put(result)
  38. def sendhbMessage(self, status, videoIds, error_code="", error_msg=""):
  39. self.sendResult({"pull_stream": pull_stream_feedback(self._msg.get("requestId"),
  40. status,
  41. error_code,
  42. error_msg, videoIds)})
  43. def run(self):
  44. task = {}
  45. videoStatus = {}
  46. requestId = self._msg.get("requestId")
  47. try:
  48. # 程序开始时间
  49. init_log(self._base_dir)
  50. videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.WAITING.value[0]} for url in
  51. self._msg.get("videoUrls", []) if url.get("id")]
  52. self.sendResult({"pull_stream": pull_stream_feedback(requestId, ExecuteStatus.WAITING.value[0], "", "",
  53. videoInfo)})
  54. videoUrls = self._msg.get("videoUrls")
  55. requestId = self._msg.get("requestId")
  56. if videoUrls is None or len(videoUrls) == 0:
  57. raise ServiceException(ExceptionType.PUSH_STREAM_URL_IS_NULL.value[0],
  58. ExceptionType.PUSH_STREAM_URL_IS_NULL.value[1])
  59. if len(videoUrls) > 5:
  60. logger.error("推流数量超过限制, 当前推流数量: {}, requestId:{}", len(videoUrls), requestId)
  61. raise ServiceException(ExceptionType.PULL_STREAM_NUM_LIMIT_EXCEPTION.value[0],
  62. ExceptionType.PULL_STREAM_NUM_LIMIT_EXCEPTION.value[1])
  63. for videoUrl in videoUrls:
  64. pushThread = PushSteamThread(videoUrl["pullUrl"], videoUrl["pushUrl"], requestId, videoUrl["id"])
  65. pushThread.start()
  66. task[videoUrl["id"]] = pushThread
  67. enable_time = time.time()
  68. for video in videoInfo:
  69. videoStatus[video.get("id")] = video.get("status")
  70. count = 0
  71. while True:
  72. # 整个推流任务超时时间
  73. if time.time() - enable_time > 43200:
  74. logger.error("任务执行超时, requestId:{}", requestId)
  75. for t in list(task.keys()):
  76. if task[t].is_alive():
  77. task[t].pushStreamUtil.status = False
  78. task[t].pushStreamUtil.close_push_stream_sp()
  79. task[t].join(120)
  80. videoStatus[t] = PushStreamStatus.TIMEOUT.value[0]
  81. videoInfo_timeout = [{"id": k, "status": v} for k, v in videoStatus.items()]
  82. self.sendResult({"pull_stream": pull_stream_feedback(requestId,
  83. ExecuteStatus.TIMEOUT.value[0],
  84. ExceptionType.TASK_EXCUTE_TIMEOUT_EXCEPTION.value[
  85. 0],
  86. ExceptionType.TASK_EXCUTE_TIMEOUT_EXCEPTION.value[
  87. 1],
  88. videoInfo_timeout)})
  89. break
  90. # 接受停止指令
  91. event_result = get_no_block_queue(self.eventQueue)
  92. if event_result is not None:
  93. command = event_result.get("command")
  94. videoIds = event_result.get("videoIds")
  95. if "stop" == command:
  96. # 如果videoIds是空停止所有任务
  97. if videoIds is None or len(videoIds) == 0:
  98. logger.info("停止所有执行的推流任务, requestId:{}", requestId)
  99. for t in list(task.keys()):
  100. if task[t].is_alive():
  101. task[t].pushStreamUtil.status = False
  102. task[t].pushStreamUtil.close_push_stream_sp()
  103. task[t].join(120)
  104. videoStatus[t] = PushStreamStatus.SUCCESS.value[0]
  105. videoInfo_sucess = [{"id": k, "status": v} for k, v in videoStatus.items()]
  106. self.sendResult({"pull_stream": pull_stream_feedback(requestId,
  107. ExecuteStatus.SUCCESS.value[0],
  108. "",
  109. "",
  110. videoInfo_sucess)})
  111. break
  112. else:
  113. logger.info("停止指定的推流任务, requestId:{}", requestId)
  114. alive_thread = 0
  115. for t in list(task.keys()):
  116. if task[t].is_alive():
  117. if t in videoIds:
  118. task[t].pushStreamUtil.status = False
  119. task[t].pushStreamUtil.close_push_stream_sp()
  120. task[t].join(120)
  121. videoStatus[t] = PushStreamStatus.SUCCESS.value[0]
  122. else:
  123. alive_thread += 1
  124. if alive_thread == 0:
  125. videoInfo_sucess = [{"id": k, "status": v} for k, v in videoStatus.items()]
  126. self.sendResult({"pull_stream": pull_stream_feedback(requestId,
  127. ExecuteStatus.SUCCESS.value[0],
  128. "",
  129. "",
  130. videoInfo_sucess)})
  131. break
  132. for t in list(task.keys()):
  133. if task[t].pushStreamUtil.status and not task[t].is_alive():
  134. logger.error("检测到推流线程异常停止!videoId:{}, requestId:{}", t, requestId)
  135. raise Exception("检测到推流线程异常停止!")
  136. statusQueue = get_no_block_queue(task[t].statusQueue)
  137. if statusQueue is not None and statusQueue[0] == 1:
  138. logger.error("推流任务异常: videoId:{}, requestId:{}", t, requestId)
  139. task[t].join(timeout=120)
  140. videoStatus[t] = PushStreamStatus.FAILED.value[0]
  141. raise statusQueue[1]
  142. if task[t].is_alive() and statusQueue is not None and statusQueue[0] == 2:
  143. if PushStreamStatus.RETRYING.value[0] == statusQueue[1]:
  144. videoStatus[t] = PushStreamStatus.RETRYING.value[0]
  145. task[t].pushStreamUtil.start_time = time.time()
  146. if task[t].is_alive() and time.time() - task[t].pushStreamUtil.start_time > 21:
  147. videoStatus[t] = PushStreamStatus.RUNNING.value[0]
  148. if count % 10 == 0:
  149. videoInfo_hb = [{"id": k, "status": v} for k, v in videoStatus.items()]
  150. self.sendResult({"pull_stream": pull_stream_feedback(requestId,
  151. ExecuteStatus.RUNNING.value[0],
  152. "",
  153. "",
  154. videoInfo_hb)})
  155. count = 0
  156. count += 1
  157. time.sleep(1)
  158. except ServiceException as s:
  159. logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, requestId)
  160. for t in list(task.keys()):
  161. if task[t].is_alive():
  162. task[t].pushStreamUtil.status = False
  163. task[t].pushStreamUtil.close_push_stream_sp()
  164. task[t].join(120)
  165. videoStatus[t] = PushStreamStatus.FAILED.value[0]
  166. videoInfo_ex = [{"id": k, "status": v} for k, v in videoStatus.items()]
  167. self.sendResult({"pull_stream": pull_stream_feedback(requestId,
  168. ExecuteStatus.FAILED.value[0],
  169. s.code,
  170. s.msg,
  171. videoInfo_ex)})
  172. except Exception:
  173. logger.error("服务异常: {}, requestId: {},", format_exc(), requestId)
  174. for t in list(task.keys()):
  175. if task[t].is_alive():
  176. task[t].pushStreamUtil.status = False
  177. task[t].pushStreamUtil.close_push_stream_sp()
  178. task[t].join(120)
  179. videoStatus[t] = PushStreamStatus.FAILED.value[0]
  180. videoInfo_ex = [{"id": k, "status": v} for k, v in videoStatus.items()]
  181. self.sendResult({"pull_stream": pull_stream_feedback(requestId,
  182. ExecuteStatus.FAILED.value[0],
  183. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  184. ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
  185. videoInfo_ex)})
  186. finally:
  187. if len(task) > 0:
  188. for t in list(task.keys()):
  189. if task[t].is_alive():
  190. task[t].pushStreamUtil.status = False
  191. task[t].pushStreamUtil.close_push_stream_sp()
  192. task[t].join(120)
  193. logger.info("推流任务完成, requestId: {}", requestId)