You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
11KB

  1. # -*- coding: utf-8 -*-
  2. from concurrent.futures import ThreadPoolExecutor
  3. from os.path import join
  4. from threading import Thread
  5. from traceback import format_exc
  6. import cv2
  7. import numpy as np
  8. from loguru import logger
  9. from util.Cv2Utils import write_or_video, write_ai_video, push_video_stream, close_all_p, video_conjuncing
  10. from util.ImageUtils import url2Array, add_water_pic
  11. from util.PlotsUtils import draw_painting_joint
  12. from util.QueUtil import put_queue
  13. class OnPushStreamThread(Thread):
  14. __slots__ = ('_msg', '_push_queue', '_context', 'ex', '_logo', '_image_queue')
  15. def __init__(self, *args):
  16. super().__init__()
  17. # 传参
  18. self._msg, self._push_queue, self._image_queue, self._context = args
  19. # 自带参数
  20. self.ex = None
  21. self._logo = None
  22. if self._context["video"]["video_add_water"]:
  23. self._logo = self._msg.get("logo_url")
  24. if self._logo:
  25. self._logo = url2Array(self._logo, enable_ex=False)
  26. if not self._logo:
  27. self._logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1)
  28. def run(self):
  29. request_id, push_queue, image_queue = self._msg.get("request_id"), self._push_queue, self._image_queue
  30. orFilePath, aiFilePath, logo = self._context.get("orFilePath"), self._context.get("aiFilePath"), self._logo
  31. or_video_file, ai_video_file, push_p = None, None, None
  32. push_url = self._msg.get("push_url")
  33. try:
  34. logger.info("开始启动推流线程!requestId:{}", request_id)
  35. with ThreadPoolExecutor(max_workers=2) as t:
  36. p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0]
  37. while True:
  38. push_parm = push_queue.get()
  39. if push_parm is not None:
  40. # [(1, 原视频帧, 分析视频帧)]
  41. # # [视频帧、当前帧数、 总帧数、 [(问题数组、code、allowedList、label_arraylist、rainbows)]]
  42. # res = (1, (pull_frame[1], pull_frame[2], pull_frame[3], []))
  43. # [(2, 操作指令)]
  44. if push_parm[0] == 1: # 视频帧操作
  45. frame, current_frame, all_frames, ques_list = push_parm[1]
  46. copy_frame = frame.copy()
  47. det_xywh = {}
  48. if len(ques_list) > 0:
  49. for qs in ques_list:
  50. det_xywh[qs[1]] = {}
  51. detect_targets_code = int(qs[0][0])
  52. score = qs[0][-1]
  53. label_array = qs[3][detect_targets_code]
  54. color = qs[4][detect_targets_code]
  55. if not isinstance(qs[0][1], (list, tuple, np.ndarray)):
  56. xc, yc, x2, y2 = int(qs[0][1]), int(qs[0][2]), int(qs[0][3]), int(qs[0][4])
  57. box = [(xc, yc), (x2, yc), (x2, y2), (xc, y2)]
  58. else:
  59. box = qs[0][1]
  60. draw_painting_joint(box, copy_frame, label_array, score, color, "leftTop")
  61. cd = det_xywh[qs[1]].get(detect_targets_code)
  62. if cd is None:
  63. det_xywh[qs[1]][detect_targets_code] = [
  64. [detect_targets_code, box, score, label_array, color]]
  65. else:
  66. det_xywh[qs[1]][detect_targets_code].append(
  67. [detect_targets_code, box, score, label_array, color])
  68. if logo:
  69. frame = add_water_pic(frame, logo, request_id)
  70. copy_frame = add_water_pic(copy_frame, logo, request_id)
  71. frame_merge = video_conjuncing(frame, copy_frame)
  72. # 写原视频到本地
  73. write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file,
  74. or_write_status, request_id)
  75. # 写识别视频到本地
  76. write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath, ai_video_file,
  77. ai_write_status, request_id)
  78. if len(det_xywh) > 0:
  79. put_queue(image_queue, (1, (det_xywh, frame, current_frame, all_frames)))
  80. push_p = push_video_stream(frame_merge, push_p, push_url, p_push_status, request_id)
  81. ai_video_file = write_ai_video_result.result()
  82. or_video_file = write_or_video_result.result()
  83. if push_parm[0] == 2:
  84. if 'stop' == push_parm[1]:
  85. logger.info("停止推流线程, requestId: {}", request_id)
  86. close_all_p(push_p, or_video_file, ai_video_file, request_id)
  87. or_video_file, ai_video_file, push_p = None, None, None
  88. break
  89. except Exception as e:
  90. logger.error("推流线程异常:{}, requestId:{}", format_exc(), request_id)
  91. self.ex = e
  92. finally:
  93. close_all_p(push_p, or_video_file, ai_video_file, request_id)
  94. logger.info("推流线程停止完成!requestId:{}", request_id)
  95. class OffPushStreamThread(Thread):
  96. __slots__ = ('_msg', '_push_queue', '_context', 'ex', '_logo', '_image_queue')
  97. def __init__(self, *args):
  98. super().__init__()
  99. # 传参
  100. self._msg, self._push_queue, self._image_queue, self._context = args
  101. # 自带参数
  102. self.ex = None
  103. self._logo = None
  104. if self._context["video"]["video_add_water"]:
  105. self._logo = self._msg.get("logo_url")
  106. if self._logo:
  107. self._logo = url2Array(self._logo, enable_ex=False)
  108. if not self._logo:
  109. self._logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1)
  110. def run(self):
  111. request_id, push_queue, image_queue = self._msg.get("request_id"), self._push_queue, self._image_queue
  112. aiFilePath, logo = self._context.get("aiFilePath"), self._logo
  113. ai_video_file, push_p = None, None
  114. push_url = self._msg.get("push_url")
  115. try:
  116. logger.info("开始启动推流线程!requestId:{}", request_id)
  117. with ThreadPoolExecutor(max_workers=1) as t:
  118. p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0]
  119. while True:
  120. push_parm = push_queue.get()
  121. if push_parm is not None:
  122. # [(1, 原视频帧, 分析视频帧)]
  123. # # [视频帧、当前帧数、 总帧数、 [(问题数组、code、allowedList、label_arraylist、rainbows)]]
  124. # res = (1, (pull_frame[1], pull_frame[2], pull_frame[3], []))
  125. # [(2, 操作指令)]
  126. if push_parm[0] == 1: # 视频帧操作
  127. frame, current_frame, all_frames, ques_list = push_parm[1]
  128. copy_frame = frame.copy()
  129. det_xywh = {}
  130. if len(ques_list) > 0:
  131. for qs in ques_list:
  132. det_xywh[qs[1]] = {}
  133. detect_targets_code = int(qs[0][0])
  134. score = qs[0][-1]
  135. label_array = qs[3][detect_targets_code]
  136. color = qs[4][detect_targets_code]
  137. if not isinstance(qs[0][1], (list, tuple, np.ndarray)):
  138. xc, yc, x2, y2 = int(qs[0][1]), int(qs[0][2]), int(qs[0][3]), int(qs[0][4])
  139. box = [(xc, yc), (x2, yc), (x2, y2), (xc, y2)]
  140. else:
  141. box = qs[0][1]
  142. draw_painting_joint(box, copy_frame, label_array, score, color, "leftTop")
  143. cd = det_xywh[qs[1]].get(detect_targets_code)
  144. if cd is None:
  145. det_xywh[qs[1]][detect_targets_code] = [
  146. [detect_targets_code, box, score, label_array, color]]
  147. else:
  148. det_xywh[qs[1]][detect_targets_code].append(
  149. [detect_targets_code, box, score, label_array, color])
  150. if logo:
  151. frame = add_water_pic(frame, logo, request_id)
  152. copy_frame = add_water_pic(copy_frame, logo, request_id)
  153. frame_merge = video_conjuncing(frame, copy_frame)
  154. # 写识别视频到本地
  155. write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath, ai_video_file,
  156. ai_write_status, request_id)
  157. if len(det_xywh) > 0:
  158. put_queue(image_queue, (1, (det_xywh, frame, current_frame, all_frames)))
  159. push_p = push_video_stream(frame_merge, push_p, push_url, p_push_status, request_id)
  160. ai_video_file = write_ai_video_result.result()
  161. if push_parm[0] == 2:
  162. if 'stop' == push_parm[1]:
  163. logger.info("停止推流线程, requestId: {}", request_id)
  164. close_all_p(push_p, None, ai_video_file, request_id)
  165. ai_video_file, push_p = None, None
  166. break
  167. except Exception as e:
  168. logger.error("推流线程异常:{}, requestId:{}", format_exc(), request_id)
  169. self.ex = e
  170. finally:
  171. close_all_p(push_p, None, ai_video_file, request_id)
  172. logger.info("推流线程停止完成!requestId:{}", request_id)