# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
#
# 357 lines
# 21KB

  1. # -*- coding: utf-8 -*-
  2. from concurrent.futures import ThreadPoolExecutor
  3. from multiprocessing import Process
  4. from os import getpid
  5. from os.path import join
  6. from time import time, sleep
  7. from traceback import format_exc
  8. import cv2
  9. import numpy as np
  10. import psutil
  11. from loguru import logger
  12. from enums.ExceptionEnum import ExceptionType
  13. from exception.CustomerException import ServiceException
  14. from util import ImageUtils
  15. from util.Cv2Utils import video_conjuncing, write_or_video, write_ai_video, push_video_stream, close_all_p
  16. from util.ImageUtils import url2Array, add_water_pic
  17. from util.LogUtils import init_log
  18. from util.PlotsUtils import draw_painting_joint, filterBox, xywh2xyxy2
  19. from util.QueUtil import get_no_block_queue, put_queue, clear_queue
  20. class PushStreamProcess2(Process):
  21. __slots__ = ("_msg", "_push_queue", "_image_queue", '_push_ex_queue', '_hb_queue', "_context")
  22. def __init__(self, *args):
  23. super().__init__()
  24. # 传参
  25. self._msg, self._push_queue, self._image_queue, self._push_ex_queue, self._hb_queue, self._context = args
  26. def build_logo_url(self):
  27. logo = None
  28. if self._context["video"]["video_add_water"]:
  29. logo = self._msg.get("logo_url")
  30. if logo:
  31. logo = url2Array(logo, enable_ex=False)
  32. if logo is None:
  33. logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1)
  34. self._context["logo"] = logo
  35. class OnPushStreamProcess2(PushStreamProcess2):
  36. __slots__ = ()
  37. def run(self):
  38. msg, context = self._msg, self._context
  39. self.build_logo_url()
  40. request_id = msg["request_id"]
  41. base_dir, env = context["base_dir"], context['env']
  42. push_queue, image_queue, push_ex_queue, hb_queue = self._push_queue, self._image_queue, self._push_ex_queue, \
  43. self._hb_queue
  44. orFilePath, aiFilePath, logo = context["orFilePath"], context["aiFilePath"], context["logo"]
  45. or_video_file, ai_video_file, push_p, push_url = None, None, None, msg["push_url"]
  46. service_timeout = int(context["service"]["timeout"]) + 120
  47. frame_score = context["service"]["filter"]["frame_score"]
  48. ex = None
  49. ex_status = True
  50. try:
  51. init_log(base_dir, env)
  52. logger.info("开始启动推流进程!requestId:{}", request_id)
  53. with ThreadPoolExecutor(max_workers=3) as t:
  54. # 定义三种推流、写原视频流、写ai视频流策略
  55. # 第一个参数时间, 第二个参数重试次数
  56. p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0]
  57. start_time = time()
  58. minID = 0
  59. maxID = 0
  60. while True:
  61. # 检测推流执行超时时间
  62. if time() - start_time > service_timeout:
  63. logger.error("推流超时, requestId: {}", request_id)
  64. raise ServiceException(ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[0],
  65. ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[1])
  66. # 系统由于各种问题可能会杀死内存使用多的进程, 自己杀掉自己
  67. if psutil.Process(getpid()).ppid() == 1:
  68. ex_status = False
  69. logger.info("推流进程检测到父进程异常停止, 自动停止推流进程, requestId: {}", request_id)
  70. for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
  71. clear_queue(q)
  72. break
  73. # 获取推流的视频帧
  74. push_r = get_no_block_queue(push_queue)
  75. if push_r is not None:
  76. # [(1, ...] 视频帧操作
  77. # [(2, 操作指令)] 指令操作
  78. if push_r[0] == 1:
  79. # 如果是多模型push_objs数组可能包含[模型1识别数组, 模型2识别数组, 模型3识别数组]
  80. frame_list, frame_index_list, all_frames, draw_config, push_objs = push_r[1]
  81. # 处理每一帧图片
  82. for i, frame in enumerate(frame_list):
  83. # 复制帧用来画图
  84. copy_frame = frame.copy()
  85. # 所有问题记录字典
  86. det_xywh, thread_p = {}, []
  87. det_tmp = {}
  88. det_xywh2 = {}
  89. # [模型1识别数组, 模型2识别数组, 模型3识别数组]
  90. for s_det_list in push_objs:
  91. code, det_result = s_det_list[0], s_det_list[1][i]
  92. if len(det_result) > 0:
  93. font_config, allowedList = draw_config["font_config"], draw_config[code]["allowedList"]
  94. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  95. for qs in det_result:
  96. box, score, cls = xywh2xyxy2(qs)
  97. if cls not in allowedList or score < frame_score:
  98. continue
  99. label_array, color = label_arrays[cls], rainbows[cls]
  100. rr = t.submit(draw_painting_joint, box, copy_frame, label_array, score, color, font_config)
  101. thread_p.append(rr)
  102. if det_xywh.get(code) is None:
  103. det_xywh[code] = {}
  104. cd = det_xywh[code].get(cls)
  105. is_new = False
  106. if len(qs) == 8:
  107. trackId = qs[7]
  108. elif len(qs) == 5:
  109. trackId = qs[4]
  110. if trackId > minID:
  111. is_new = True
  112. if det_tmp.get(code) is None:
  113. det_tmp[code] = [cls]
  114. else:
  115. if not (cls in det_tmp[code]):
  116. det_tmp[code].append(cls)
  117. qs_tmp = [cls, box, score, label_array, color, is_new]
  118. if trackId > maxID:
  119. maxID = trackId
  120. if cd is None:
  121. det_xywh[code][cls] = [qs_tmp]
  122. else:
  123. det_xywh[code][cls].append(qs_tmp)
  124. minID = maxID
  125. if logo:
  126. frame = add_water_pic(frame, logo, request_id)
  127. copy_frame = add_water_pic(copy_frame, logo, request_id)
  128. if len(thread_p) > 0:
  129. for r in thread_p:
  130. r.result()
  131. frame_merge = video_conjuncing(frame, copy_frame)
  132. # 写原视频到本地
  133. write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file,
  134. or_write_status, request_id)
  135. # 写识别视频到本地
  136. write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath,
  137. ai_video_file, ai_write_status, request_id)
  138. push_p_result = t.submit(push_video_stream, frame_merge, push_p, push_url,
  139. p_push_status,
  140. request_id)
  141. if det_xywh:
  142. for index, (key, value) in enumerate(det_xywh.items()):
  143. for k in value.keys():
  144. if (key in det_tmp.keys()) and (k in det_tmp[key]):
  145. det_xywh2[key] = {}
  146. det_xywh2[key][k] = det_xywh[key][k]
  147. if len(det_xywh2) > 0:
  148. put_queue(image_queue, (1, [det_xywh2, frame, frame_index_list[i], all_frames, draw_config["font_config"]]))
  149. push_p = push_p_result.result(timeout=60)
  150. ai_video_file = write_ai_video_result.result(timeout=60)
  151. or_video_file = write_or_video_result.result(timeout=60)
  152. # 接收停止指令
  153. if push_r[0] == 2:
  154. if 'stop' == push_r[1]:
  155. logger.info("停止推流线程, requestId: {}", request_id)
  156. break
  157. if 'stop_ex' == push_r[1]:
  158. logger.info("停止推流线程, requestId: {}", request_id)
  159. ex_status = False
  160. break
  161. del push_r
  162. else:
  163. sleep(1)
  164. except ServiceException as s:
  165. logger.error("推流进程异常:{}, requestId:{}", s.msg, request_id)
  166. ex = s.code, s.msg
  167. except Exception:
  168. logger.error("推流进程异常:{}, requestId:{}", format_exc(), request_id)
  169. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  170. finally:
  171. # 关闭推流管, 原视频写对象, 分析视频写对象
  172. close_all_p(push_p, or_video_file, ai_video_file, request_id)
  173. if ex:
  174. code, msg = ex
  175. put_queue(push_ex_queue, (1, code, msg), timeout=2)
  176. else:
  177. if ex_status:
  178. # 关闭推流的时候, 等待1分钟图片队列处理完,如果1分钟内没有处理完, 清空图片队列, 丢弃没有上传的图片
  179. c_time = time()
  180. while time() - c_time < 60:
  181. if image_queue.qsize() == 0 or image_queue.empty():
  182. break
  183. sleep(2)
  184. for q in [push_queue, image_queue, hb_queue]:
  185. clear_queue(q)
  186. logger.info("推流进程停止完成!requestId:{}", request_id)
  187. class OffPushStreamProcess2(PushStreamProcess2):
  188. __slots__ = ()
  189. def run(self):
  190. self.build_logo_url()
  191. msg, context = self._msg, self._context
  192. request_id = msg["request_id"]
  193. base_dir, env = context["base_dir"], context['env']
  194. push_queue, image_queue, push_ex_queue, hb_queue = self._push_queue, self._image_queue, self._push_ex_queue, \
  195. self._hb_queue
  196. aiFilePath, logo = context["aiFilePath"], context["logo"]
  197. ai_video_file, push_p, push_url = None, None, msg["push_url"]
  198. service_timeout = int(context["service"]["timeout"]) + 120
  199. frame_score = context["service"]["filter"]["frame_score"]
  200. ex = None
  201. ex_status = True
  202. try:
  203. init_log(base_dir, env)
  204. logger.info("开始启动离线推流进程!requestId:{}", request_id)
  205. with ThreadPoolExecutor(max_workers=2) as t:
  206. # 定义三种推流、写原视频流、写ai视频流策略
  207. # 第一个参数时间, 第二个参数重试次数
  208. p_push_status, ai_write_status = [0, 0], [0, 0]
  209. start_time = time()
  210. minID = 0
  211. maxID = 0
  212. while True:
  213. # 检测推流执行超时时间
  214. if time() - start_time > service_timeout:
  215. logger.error("离线推流超时, requestId: {}", request_id)
  216. raise ServiceException(ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[0],
  217. ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[1])
  218. # 系统由于各种问题可能会杀死内存使用多的进程, 自己杀掉自己
  219. if psutil.Process(getpid()).ppid() == 1:
  220. ex_status = False
  221. logger.info("离线推流进程检测到父进程异常停止, 自动停止推流进程, requestId: {}", request_id)
  222. for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
  223. clear_queue(q)
  224. break
  225. # 获取推流的视频帧
  226. push_r = get_no_block_queue(push_queue)
  227. if push_r is not None:
  228. # [(1, ...] 视频帧操作
  229. # [(2, 操作指令)] 指令操作
  230. if push_r[0] == 1:
  231. frame_list, frame_index_list, all_frames, draw_config, push_objs = push_r[1]
  232. # 处理每一帧图片
  233. for i, frame in enumerate(frame_list):
  234. if frame_index_list[i] % 300 == 0 and frame_index_list[i] <= all_frames:
  235. task_process = "%.2f" % (float(frame_index_list[i]) / float(all_frames))
  236. put_queue(hb_queue, {"hb_value": task_process}, timeout=2)
  237. # 复制帧用来画图
  238. copy_frame = frame.copy()
  239. # 所有问题记录字典
  240. det_xywh, thread_p = {}, []
  241. det_xywh2 = {}
  242. det_tmp = {}
  243. for s_det_list in push_objs:
  244. code, det_result = s_det_list[0], s_det_list[1][i]
  245. if len(det_result) > 0:
  246. font_config, allowedList = draw_config["font_config"], draw_config[code]["allowedList"]
  247. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  248. for qs in det_result:
  249. box, score, cls = xywh2xyxy2(qs)
  250. if cls not in allowedList or score < frame_score:
  251. continue
  252. label_array, color = label_arrays[cls], rainbows[cls]
  253. rr = t.submit(draw_painting_joint, box, copy_frame, label_array, score, color, font_config)
  254. thread_p.append(rr)
  255. if det_xywh.get(code) is None:
  256. det_xywh[code] = {}
  257. cd = det_xywh[code].get(cls)
  258. is_new = False
  259. if len(qs) == 8:
  260. trackId = qs[7]
  261. elif len(qs) == 5:
  262. trackId = qs[4]
  263. if trackId > minID:
  264. is_new = True
  265. if det_tmp.get(code) is None:
  266. det_tmp[code] = [cls]
  267. else:
  268. if not (cls in det_tmp[code]):
  269. det_tmp[code].append(cls)
  270. qs_tmp = [cls, box, score, label_array, color, is_new]
  271. if trackId > maxID:
  272. maxID = trackId
  273. if cd is None:
  274. det_xywh[code][cls] = [qs_tmp]
  275. else:
  276. det_xywh[code][cls].append(qs_tmp)
  277. minID = maxID
  278. if logo:
  279. frame = add_water_pic(frame, logo, request_id)
  280. copy_frame = add_water_pic(copy_frame, logo, request_id)
  281. if len(thread_p) > 0:
  282. for r in thread_p:
  283. r.result()
  284. frame_merge = video_conjuncing(frame, copy_frame)
  285. # 写识别视频到本地
  286. write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath,
  287. ai_video_file,
  288. ai_write_status, request_id)
  289. push_p_result = t.submit(push_video_stream, frame_merge, push_p, push_url,
  290. p_push_status,
  291. request_id)
  292. if det_xywh:
  293. for index, (key, value) in enumerate(det_xywh.items()):
  294. for k in value.keys():
  295. if (key in det_tmp.keys()) and (k in det_tmp[key]):
  296. det_xywh2[key] = {}
  297. det_xywh2[key][k] = det_xywh[key][k]
  298. if len(det_xywh2) > 0:
  299. put_queue(image_queue, (1, [det_xywh2, frame, frame_index_list[i], all_frames, draw_config["font_config"]]))
  300. push_p = push_p_result.result(timeout=60)
  301. ai_video_file = write_ai_video_result.result(timeout=60)
  302. # 接收停止指令
  303. if push_r[0] == 2:
  304. if 'stop' == push_r[1]:
  305. logger.info("停止推流线程, requestId: {}", request_id)
  306. break
  307. if 'stop_ex' == push_r[1]:
  308. logger.info("停止推流线程, requestId: {}", request_id)
  309. ex_status = False
  310. break
  311. del push_r
  312. else:
  313. sleep(1)
  314. except ServiceException as s:
  315. logger.error("推流进程异常:{}, requestId:{}", s.msg, request_id)
  316. ex = s.code, s.msg
  317. except Exception:
  318. logger.error("推流进程异常:{}, requestId:{}", format_exc(), request_id)
  319. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  320. finally:
  321. # 关闭推流管, 分析视频写对象
  322. close_all_p(push_p, None, ai_video_file, request_id)
  323. if ex:
  324. code, msg = ex
  325. put_queue(push_ex_queue, (1, code, msg), timeout=2)
  326. else:
  327. if ex_status:
  328. # 关闭推流的时候, 等待1分钟图片队列处理完,如果1分钟内没有处理完, 清空图片队列, 丢弃没有上传的图片
  329. c_time = time()
  330. while time() - c_time < 60:
  331. if image_queue.qsize() == 0 or image_queue.empty():
  332. break
  333. sleep(2)
  334. for q in [push_queue, image_queue, hb_queue]:
  335. clear_queue(q)
  336. logger.info("推流进程停止完成!requestId:{}", request_id)