You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
11KB

  1. # -*- coding: utf-8 -*-
  2. from queue import Queue
  3. from threading import Thread
  4. from time import time, sleep
  5. from traceback import format_exc
  6. from loguru import logger
  7. from enums.ExceptionEnum import ExceptionType
  8. from enums.RecordingStatusEnum import RecordingStatus
  9. from exception.CustomerException import ServiceException
  10. from util.Cv2Utils import check_video_stream, clear_pull_p, build_video_info2, pull_read_video_stream2
  11. from util.QueUtil import put_queue, get_no_block_queue, clear_queue, put_queue_result
  12. class PullStreamThread(Thread):
  13. __slots__ = ('_command', '_pull_queue', '_hb_queue', '_fb_queue', '_msg', '_context')
  14. def __init__(self, *args):
  15. super().__init__()
  16. self._msg, self._context, self._pull_queue, self._hb_queue, self._fb_queue, self._frame_num = args
  17. self._command = Queue()
  18. def sendEvent(self, result):
  19. put_queue(self._command, result, timeout=10, is_ex=False)
  20. class RecordingPullStreamThread(PullStreamThread):
  21. def run(self):
  22. msg, context, frame_num = self._msg, self._context, self._frame_num
  23. request_id, pull_url = msg["request_id"], msg['pull_url']
  24. service = context["service"]
  25. pull_stream_timeout = int(service["recording_pull_stream_timeout"])
  26. read_stream_timeout = int(service["cv2_read_stream_timeout"])
  27. service_timeout = int(service["timeout"])
  28. command_queue, pull_queue, fb_queue, hb_queue = self._command, self._pull_queue, self._fb_queue, self._hb_queue
  29. width, height, width_height_3, all_frames, w, h = None, None, None, 0, None, None
  30. read_start_time, pull_p, ex = None, None, None
  31. frame_list, frame_index_list = [], []
  32. stop_ex = True
  33. pull_stream_start_time = time()
  34. try:
  35. logger.info("录屏拉流线程开始启动, requestId: {}", request_id)
  36. cv2_init_num, init_pull_num, concurrent_frame = 0, 1, 1
  37. start_time = time()
  38. while True:
  39. # 检查任务是否超时
  40. if time() - start_time > service_timeout:
  41. logger.error("录屏拉流超时, requestId: {}", request_id)
  42. raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
  43. ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
  44. # 最终停止拉流
  45. event = get_no_block_queue(command_queue)
  46. if event is not None:
  47. # 当接收到停止指令,说明不会再处理视频帧了, 直接退出
  48. if 'stop' == event.get("command"):
  49. if len(frame_list) > 0:
  50. put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
  51. logger.info("录屏拉流线程开始停止, requestId: {}", request_id)
  52. break
  53. # 主进程异常,停止子线程
  54. if 'stop_ex' == event.get("command"):
  55. logger.info("录屏异常拉开始停止拉流线程, requestId: {}", request_id)
  56. stop_ex = False
  57. break
  58. # 如果是离线拉流
  59. if pull_url.startswith('http'):
  60. if check_video_stream(width, height):
  61. logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
  62. # 当是离线地址重试3次还是拉取不到视频流,关闭拉流管道,返回失败信息
  63. # 目前改为等待5分钟
  64. # if cv2_init_num > 3:
  65. if time() - start_time > 300:
  66. logger.info("离线拉流重试失败, 重试次数: {}, requestId: {}", cv2_init_num, request_id)
  67. raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
  68. ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
  69. cv2_init_num += 1
  70. width, height, width_height_3, all_frames, w, h = build_video_info2(pull_url, request_id)
  71. if width is not None:
  72. put_queue(hb_queue, {"status": RecordingStatus.RECORDING_RUNNING.value[0]}, timeout=2)
  73. else:
  74. # if cv2_init_num < 2:
  75. if time() - start_time < 300:
  76. put_queue(hb_queue, {"status": RecordingStatus.RECORDING_RETRYING.value[0]}, timeout=2)
  77. continue
  78. # 当离线视频时, 队列满了, 等待1秒后再试
  79. if pull_queue.full():
  80. logger.info("pull拉流队列满了: {}, requestId: {}", pull_queue.qsize(), request_id)
  81. sleep(1)
  82. continue
  83. # 如果是实时拉流
  84. else:
  85. if check_video_stream(width, height):
  86. logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
  87. pull_stream_init_timeout = time() - pull_stream_start_time
  88. if len(frame_list) > 0:
  89. put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
  90. frame_list, frame_index_list = [], []
  91. if pull_stream_init_timeout > pull_stream_timeout:
  92. logger.error("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout, request_id)
  93. raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0],
  94. ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1])
  95. cv2_init_num += 1
  96. width, height, width_height_3, all_frames, w, h = build_video_info2(pull_url, request_id)
  97. if width is not None:
  98. put_queue(hb_queue, {"status": RecordingStatus.RECORDING_RUNNING.value[0]}, timeout=1)
  99. else:
  100. if cv2_init_num < 3:
  101. put_queue(hb_queue, {"status": RecordingStatus.RECORDING_RETRYING.value[0]}, timeout=1)
  102. sleep(1)
  103. continue
  104. pull_stream_start_time = time()
  105. cv2_init_num = 1
  106. frame, pull_p, width, height = pull_read_video_stream2(pull_p, pull_url, width, height,
  107. width_height_3, w, h, request_id)
  108. if frame is None:
  109. if pull_url.startswith('http'):
  110. clear_pull_p(pull_p, request_id)
  111. logger.info("总帧数: {}, 当前帧数: {}, requestId: {}", all_frames, concurrent_frame, request_id)
  112. if len(frame_list) > 0:
  113. put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
  114. if concurrent_frame < all_frames - 100:
  115. logger.info("离线拉流异常结束:requestId: {}", request_id)
  116. raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
  117. ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
  118. logger.info("离线拉流线程结束, requestId: {}", request_id)
  119. break
  120. else:
  121. logger.info("获取帧为空, 开始重试: {}次, requestId: {}", init_pull_num, request_id)
  122. if len(frame_list) > 0:
  123. put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
  124. frame_list, frame_index_list = [], []
  125. if read_start_time is None:
  126. read_start_time = time()
  127. pull_stream_read_timeout = time() - read_start_time
  128. if pull_stream_read_timeout > read_stream_timeout:
  129. logger.info("拉流过程中断了重试超时, 超时时间: {}, requestId: {}", pull_stream_read_timeout,
  130. request_id)
  131. raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
  132. ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
  133. init_pull_num += 1
  134. continue
  135. init_pull_num = 1
  136. read_start_time = None
  137. if pull_queue.full():
  138. sleep(1)
  139. logger.info("pull拉流队列满了:{}, requestId: {}", pull_queue.qsize(), request_id)
  140. continue
  141. frame_list.append(frame)
  142. frame_index_list.append(concurrent_frame)
  143. if len(frame_list) >= frame_num:
  144. put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
  145. frame_list, frame_index_list = [], []
  146. concurrent_frame += 1
  147. del frame
  148. except ServiceException as s:
  149. ex = s.code, s.msg
  150. except Exception:
  151. logger.exception("实时拉流异常: {}, requestId:{}", format_exc(), request_id)
  152. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  153. finally:
  154. clear_pull_p(pull_p, request_id)
  155. if stop_ex:
  156. if ex:
  157. error_code, error_msg = ex
  158. result = put_queue_result(pull_queue, (1, error_code, error_msg), timeout=3)
  159. else:
  160. result = put_queue_result(pull_queue, (2,), timeout=3)
  161. if result:
  162. # 3分钟超时时间
  163. cr_time = time()
  164. while time() - cr_time < 180:
  165. event = get_no_block_queue(command_queue)
  166. if event is not None:
  167. # 当接收到停止指令,说明不会再处理视频帧了, 直接退出
  168. if 'stop' == event.get("command"):
  169. logger.info("录屏拉流线程开始停止, requestId: {}", request_id)
  170. break
  171. sleep(1)
  172. clear_queue(command_queue)
  173. clear_queue(pull_queue)
  174. clear_queue(hb_queue)
  175. del frame_list, frame_index_list
  176. logger.info("录屏拉流线程结束, requestId: {}", request_id)