You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

429 lines
28KB

  1. # -*- coding: utf-8 -*-
  2. from concurrent.futures import ThreadPoolExecutor
  3. from multiprocessing import Process
  4. from os import getpid
  5. from os.path import join
  6. from time import time, sleep
  7. from traceback import format_exc
  8. import cv2
  9. import psutil
  10. from loguru import logger
  11. from enums.ExceptionEnum import ExceptionType
  12. from exception.CustomerException import ServiceException
  13. from util import ImageUtils
  14. from util.Cv2Utils import video_conjuncing, write_or_video, write_ai_video, push_video_stream, close_all_p
  15. from util.ImageUtils import url2Array, add_water_pic
  16. from util.LogUtils import init_log
  17. from util.PlotsUtils import draw_painting_joint, xywh2xyxy2
  18. from util.QueUtil import get_no_block_queue, put_queue, clear_queue
  19. class PushStreamProcess2(Process):
  20. __slots__ = ("_msg", "_push_queue", "_image_queue", '_push_ex_queue', '_hb_queue', "_context")
  21. def __init__(self, *args):
  22. super().__init__()
  23. # 传参
  24. self._msg, self._push_queue, self._image_queue, self._push_ex_queue, self._hb_queue, self._context = args
  25. def build_logo_url(self):
  26. logo = None
  27. if self._context["video"]["video_add_water"]:
  28. logo = self._msg.get("logo_url")
  29. if logo:
  30. logo = url2Array(logo, enable_ex=False)
  31. if logo is None:
  32. logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1)
  33. self._context["logo"] = logo
  34. class OnPushStreamProcess2(PushStreamProcess2):
  35. __slots__ = ()
  36. def run(self):
  37. msg, context = self._msg, self._context
  38. self.build_logo_url()
  39. request_id = msg["request_id"]
  40. base_dir, env = context["base_dir"], context['env']
  41. push_queue, image_queue, push_ex_queue, hb_queue = self._push_queue, self._image_queue, self._push_ex_queue, \
  42. self._hb_queue
  43. orFilePath, aiFilePath, logo = context["orFilePath"], context["aiFilePath"], context["logo"]
  44. or_video_file, ai_video_file, push_p, push_url = None, None, None, msg["push_url"]
  45. service_timeout = int(context["service"]["timeout"]) + 120
  46. frame_score = context["service"]["filter"]["frame_score"]
  47. ex = None
  48. ex_status = True
  49. high_score_image = {}
  50. # 相似度, 默认值0.65
  51. similarity = context["service"]["filter"]["similarity"]
  52. # 图片相似度开关
  53. picture_similarity = bool(context["service"]["filter"]["picture_similarity"])
  54. frame_step = int(context["service"]["filter"]["frame_step"])
  55. try:
  56. init_log(base_dir, env)
  57. logger.info("开始启动推流进程!requestId:{}", request_id)
  58. with ThreadPoolExecutor(max_workers=3) as t:
  59. # 定义三种推流、写原视频流、写ai视频流策略
  60. # 第一个参数时间, 第二个参数重试次数
  61. p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0]
  62. start_time = time()
  63. while True:
  64. # 检测推流执行超时时间
  65. if time() - start_time > service_timeout:
  66. logger.error("推流超时, requestId: {}", request_id)
  67. raise ServiceException(ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[0],
  68. ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[1])
  69. # 系统由于各种问题可能会杀死内存使用多的进程, 自己杀掉自己
  70. if psutil.Process(getpid()).ppid() == 1:
  71. ex_status = False
  72. logger.info("推流进程检测到父进程异常停止, 自动停止推流进程, requestId: {}", request_id)
  73. for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
  74. clear_queue(q)
  75. break
  76. # 获取推流的视频帧
  77. push_r = get_no_block_queue(push_queue)
  78. if push_r is not None:
  79. # [(1, ...] 视频帧操作
  80. # [(2, 操作指令)] 指令操作
  81. if push_r[0] == 1:
  82. # 如果是多模型push_objs数组可能包含[模型1识别数组, 模型2识别数组, 模型3识别数组]
  83. frame_list, frame_index_list, all_frames, draw_config, push_objs = push_r[1]
  84. # 处理每一帧图片
  85. for i, frame in enumerate(frame_list):
  86. # 复制帧用来画图
  87. copy_frame = frame.copy()
  88. # 所有问题记录字典
  89. det_xywh, thread_p = {}, []
  90. # [模型1识别数组, 模型2识别数组, 模型3识别数组]
  91. for s_det_list in push_objs:
  92. code, det_result = s_det_list[0], s_det_list[1][i]
  93. if len(det_result) > 0:
  94. font_config, allowedList = draw_config["font_config"], draw_config[code]["allowedList"]
  95. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  96. for qs in det_result:
  97. box, score, cls = xywh2xyxy2(qs)
  98. if cls not in allowedList or score < frame_score:
  99. continue
  100. label_array, color = label_arrays[cls], rainbows[cls]
  101. rr = t.submit(draw_painting_joint, box, copy_frame, label_array, score, color, font_config)
  102. thread_p.append(rr)
  103. if det_xywh.get(code) is None:
  104. det_xywh[code] = {}
  105. cd = det_xywh[code].get(cls)
  106. if cd is None:
  107. det_xywh[code][cls] = [[cls, box, score, label_array, color]]
  108. else:
  109. det_xywh[code][cls].append([cls, box, score, label_array, color])
  110. if logo:
  111. frame = add_water_pic(frame, logo, request_id)
  112. copy_frame = add_water_pic(copy_frame, logo, request_id)
  113. if len(thread_p) > 0:
  114. for r in thread_p:
  115. r.result()
  116. frame_merge = video_conjuncing(frame, copy_frame)
  117. # 写原视频到本地
  118. write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file,
  119. or_write_status, request_id)
  120. # 写识别视频到本地
  121. write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath,
  122. ai_video_file, ai_write_status, request_id)
  123. push_p_result = t.submit(push_video_stream, frame_merge, push_p, push_url,
  124. p_push_status,
  125. request_id)
  126. # 指定多少帧上传一次问题
  127. if high_score_image.get("current_frame") is None:
  128. high_score_image["current_frame"] = frame_index_list[i]
  129. diff_frame_num = frame_index_list[i] - high_score_image["current_frame"]
  130. # 指定帧内处理逻辑
  131. if frame_step >= diff_frame_num and high_score_image.get("code") is not None and len(det_xywh) > 0:
  132. # 所有的模型编号
  133. cache_codes = set(high_score_image["code"].keys())
  134. # 遍历当前模型
  135. for det_code, det_clss in det_xywh.items():
  136. # 如果模型编号不在缓存中
  137. if det_code not in cache_codes:
  138. high_score_image["code"][det_code] = {}
  139. for cls, cls_list in det_xywh[det_code].items():
  140. if len(cls_list) > 0:
  141. high_score_image["code"][det_code][cls] = (frame, frame_index_list[i], cls_list)
  142. else:
  143. # 如果模型编号在缓存中, 检查检测目标的情况
  144. cache_clss = set(high_score_image["code"][det_code].keys())
  145. for cls, cls_list in det_xywh[det_code].items():
  146. if len(cls_list) > 0:
  147. # 如果检测目标在缓存中
  148. if cls not in cache_clss:
  149. high_score_image["code"][det_code][cls] = (frame, frame_index_list[i], cls_list)
  150. else:
  151. if len(det_xywh[det_code][cls]) > len(high_score_image["code"][det_code][cls][2]):
  152. high_score_image["code"][det_code][cls] = (frame, frame_index_list[i], cls_list)
  153. elif len(det_xywh[det_code][cls]) == len(high_score_image["code"][det_code][cls][2]):
  154. # [cls, box, score, label_array, color]
  155. hs = 0
  156. for ii, s in enumerate(cls_list):
  157. if s[2] >= high_score_image["code"][det_code][cls][2][ii][2]:
  158. hs += 1
  159. if hs == len(cls_list):
  160. high_score_image["code"][det_code][cls] = (frame, frame_index_list[i], cls_list)
  161. if diff_frame_num > frame_step:
  162. if high_score_image.get("code") is not None:
  163. high_score_image["or_frame"] = frame
  164. put_queue(image_queue, (1, [high_score_image["code"], all_frames, draw_config["font_config"]]))
  165. high_score_image["code"] = None
  166. high_score_image["current_frame"] = None
  167. # 如果有问题, 走下面的逻辑
  168. if len(det_xywh) > 0:
  169. flag = True
  170. # 比较上一次识别到问题的帧和当前问题帧的相似度
  171. if picture_similarity and high_score_image.get("or_frame") is not None:
  172. hash1 = ImageUtils.dHash(high_score_image["or_frame"])
  173. hash2 = ImageUtils.dHash(frame)
  174. dist = ImageUtils.Hamming_distance(hash1, hash2)
  175. similarity_1 = 1 - dist * 1.0 / 64
  176. if similarity_1 >= similarity:
  177. flag = False
  178. # 如果缓存问题是空的,给缓存添加问题
  179. if flag and high_score_image.get("code") is None:
  180. high_score_image["code"] = {}
  181. for code, det_list in det_xywh.items():
  182. if high_score_image["code"].get(code) is None:
  183. high_score_image["code"][code] = {}
  184. for cls, cls_list in det_list.items():
  185. high_score_image["code"][code][cls] = (frame, frame_index_list[i], cls_list)
  186. push_p = push_p_result.result(timeout=5)
  187. ai_video_file = write_ai_video_result.result(timeout=5)
  188. or_video_file = write_or_video_result.result(timeout=5)
  189. # 接收停止指令
  190. if push_r[0] == 2:
  191. if 'stop' == push_r[1]:
  192. logger.info("停止推流线程, requestId: {}", request_id)
  193. break
  194. if 'stop_ex' == push_r[1]:
  195. logger.info("停止推流线程, requestId: {}", request_id)
  196. ex_status = False
  197. break
  198. del push_r
  199. else:
  200. sleep(1)
  201. except ServiceException as s:
  202. logger.error("推流进程异常:{}, requestId:{}", s.msg, request_id)
  203. ex = s.code, s.msg
  204. except Exception:
  205. logger.error("推流进程异常:{}, requestId:{}", format_exc(), request_id)
  206. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  207. finally:
  208. # 关闭推流管, 原视频写对象, 分析视频写对象
  209. close_all_p(push_p, or_video_file, ai_video_file, request_id)
  210. if ex:
  211. code, msg = ex
  212. put_queue(push_ex_queue, (1, code, msg), timeout=2)
  213. else:
  214. if ex_status:
  215. # 关闭推流的时候, 等待1分钟图片队列处理完,如果1分钟内没有处理完, 清空图片队列, 丢弃没有上传的图片
  216. c_time = time()
  217. while time() - c_time < 60:
  218. if image_queue.qsize() == 0 or image_queue.empty():
  219. break
  220. sleep(2)
  221. for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
  222. clear_queue(q)
  223. logger.info("推流进程停止完成!requestId:{}", request_id)
  224. class OffPushStreamProcess2(PushStreamProcess2):
  225. __slots__ = ()
  226. def run(self):
  227. self.build_logo_url()
  228. msg, context = self._msg, self._context
  229. request_id = msg["request_id"]
  230. base_dir, env = context["base_dir"], context['env']
  231. push_queue, image_queue, push_ex_queue, hb_queue = self._push_queue, self._image_queue, self._push_ex_queue, \
  232. self._hb_queue
  233. aiFilePath, logo = context["aiFilePath"], context["logo"]
  234. ai_video_file, push_p, push_url = None, None, msg["push_url"]
  235. service_timeout = int(context["service"]["timeout"]) + 120
  236. frame_score = context["service"]["filter"]["frame_score"]
  237. ex = None
  238. ex_status = True
  239. high_score_image = {}
  240. # 相似度, 默认值0.65
  241. similarity = context["service"]["filter"]["similarity"]
  242. # 图片相似度开关
  243. picture_similarity = bool(context["service"]["filter"]["picture_similarity"])
  244. frame_step = int(context["service"]["filter"]["frame_step"])
  245. try:
  246. init_log(base_dir, env)
  247. logger.info("开始启动离线推流进程!requestId:{}", request_id)
  248. with ThreadPoolExecutor(max_workers=2) as t:
  249. # 定义三种推流、写原视频流、写ai视频流策略
  250. # 第一个参数时间, 第二个参数重试次数
  251. p_push_status, ai_write_status = [0, 0], [0, 0]
  252. start_time = time()
  253. while True:
  254. # 检测推流执行超时时间
  255. if time() - start_time > service_timeout:
  256. logger.error("离线推流超时, requestId: {}", request_id)
  257. raise ServiceException(ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[0],
  258. ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[1])
  259. # 系统由于各种问题可能会杀死内存使用多的进程, 自己杀掉自己
  260. if psutil.Process(getpid()).ppid() == 1:
  261. ex_status = False
  262. logger.info("离线推流进程检测到父进程异常停止, 自动停止推流进程, requestId: {}", request_id)
  263. for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
  264. clear_queue(q)
  265. break
  266. # 获取推流的视频帧
  267. push_r = get_no_block_queue(push_queue)
  268. if push_r is not None:
  269. # [(1, ...] 视频帧操作
  270. # [(2, 操作指令)] 指令操作
  271. if push_r[0] == 1:
  272. frame_list, frame_index_list, all_frames, draw_config, push_objs = push_r[1]
  273. # 处理每一帧图片
  274. for i, frame in enumerate(frame_list):
  275. if frame_index_list[i] % 300 == 0 and frame_index_list[i] <= all_frames:
  276. task_process = "%.2f" % (float(frame_index_list[i]) / float(all_frames))
  277. put_queue(hb_queue, {"hb_value": task_process}, timeout=2)
  278. # 复制帧用来画图
  279. copy_frame = frame.copy()
  280. # 所有问题记录字典
  281. det_xywh, thread_p = {}, []
  282. for s_det_list in push_objs:
  283. code, det_result = s_det_list[0], s_det_list[1][i]
  284. if len(det_result) > 0:
  285. font_config, allowedList = draw_config["font_config"], draw_config[code]["allowedList"]
  286. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  287. for qs in det_result:
  288. box, score, cls = xywh2xyxy2(qs)
  289. if cls not in allowedList or score < frame_score:
  290. continue
  291. label_array, color = label_arrays[cls], rainbows[cls]
  292. rr = t.submit(draw_painting_joint, box, copy_frame, label_array, score, color, font_config)
  293. thread_p.append(rr)
  294. if det_xywh.get(code) is None:
  295. det_xywh[code] = {}
  296. cd = det_xywh[code].get(cls)
  297. if cd is None:
  298. det_xywh[code][cls] = [[cls, box, score, label_array, color]]
  299. else:
  300. det_xywh[code][cls].append([cls, box, score, label_array, color])
  301. if logo:
  302. frame = add_water_pic(frame, logo, request_id)
  303. copy_frame = add_water_pic(copy_frame, logo, request_id)
  304. if len(thread_p) > 0:
  305. for r in thread_p:
  306. r.result()
  307. frame_merge = video_conjuncing(frame, copy_frame)
  308. # 写识别视频到本地
  309. write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath,
  310. ai_video_file,
  311. ai_write_status, request_id)
  312. push_p_result = t.submit(push_video_stream, frame_merge, push_p, push_url,
  313. p_push_status,
  314. request_id)
  315. # 指定多少帧上传一次问题
  316. if high_score_image.get("current_frame") is None:
  317. high_score_image["current_frame"] = frame_index_list[i]
  318. diff_frame_num = frame_index_list[i] - high_score_image["current_frame"]
  319. # 指定帧内处理逻辑
  320. if frame_step >= diff_frame_num and high_score_image.get("code") is not None and len(det_xywh) > 0:
  321. # 所有的模型编号
  322. cache_codes = set(high_score_image["code"].keys())
  323. # 遍历当前模型
  324. for det_code, det_clss in det_xywh.items():
  325. # 如果模型编号不在缓存中
  326. if det_code not in cache_codes:
  327. high_score_image["code"][det_code] = {}
  328. for cls, cls_list in det_xywh[det_code].items():
  329. if len(cls_list) > 0:
  330. high_score_image["code"][det_code][cls] = (frame, frame_index_list[i], cls_list)
  331. else:
  332. # 如果模型编号在缓存中, 检查检测目标的情况
  333. cache_clss = set(high_score_image["code"][det_code].keys())
  334. for cls, cls_list in det_xywh[det_code].items():
  335. if len(cls_list) > 0:
  336. # 如果检测目标在缓存中
  337. if cls not in cache_clss:
  338. high_score_image["code"][det_code][cls] = (frame, frame_index_list[i], cls_list)
  339. else:
  340. if len(det_xywh[det_code][cls]) > len(high_score_image["code"][det_code][cls][2]):
  341. high_score_image["code"][det_code][cls] = (frame, frame_index_list[i], cls_list)
  342. elif len(det_xywh[det_code][cls]) == len(high_score_image["code"][det_code][cls][2]):
  343. # [cls, box, score, label_array, color]
  344. hs = 0
  345. for ii, s in enumerate(cls_list):
  346. if s[2] >= high_score_image["code"][det_code][cls][2][ii][2]:
  347. hs += 1
  348. if hs == len(cls_list):
  349. high_score_image["code"][det_code][cls] = (frame, frame_index_list[i], cls_list)
  350. if diff_frame_num > frame_step:
  351. if high_score_image.get("code") is not None:
  352. high_score_image["or_frame"] = frame
  353. put_queue(image_queue, (1, [high_score_image["code"], all_frames, draw_config["font_config"]]))
  354. high_score_image["code"] = None
  355. high_score_image["current_frame"] = None
  356. # 如果有问题, 走下面的逻辑
  357. if len(det_xywh) > 0:
  358. flag = True
  359. # 比较上一次识别到问题的帧和当前问题帧的相似度
  360. if picture_similarity and high_score_image.get("or_frame") is not None:
  361. hash1 = ImageUtils.dHash(high_score_image["or_frame"])
  362. hash2 = ImageUtils.dHash(frame)
  363. dist = ImageUtils.Hamming_distance(hash1, hash2)
  364. similarity_1 = 1 - dist * 1.0 / 64
  365. if similarity_1 >= similarity:
  366. flag = False
  367. # 如果缓存问题是空的,给缓存添加问题
  368. if flag and high_score_image.get("code") is None:
  369. high_score_image["code"] = {}
  370. for code, det_list in det_xywh.items():
  371. if high_score_image["code"].get(code) is None:
  372. high_score_image["code"][code] = {}
  373. for cls, cls_list in det_list.items():
  374. high_score_image["code"][code][cls] = (frame, frame_index_list[i], cls_list)
  375. push_p = push_p_result.result(timeout=5)
  376. ai_video_file = write_ai_video_result.result(timeout=5)
  377. # 接收停止指令
  378. if push_r[0] == 2:
  379. if 'stop' == push_r[1]:
  380. logger.info("停止推流线程, requestId: {}", request_id)
  381. break
  382. if 'stop_ex' == push_r[1]:
  383. logger.info("停止推流线程, requestId: {}", request_id)
  384. ex_status = False
  385. break
  386. del push_r
  387. else:
  388. sleep(1)
  389. except ServiceException as s:
  390. logger.error("推流进程异常:{}, requestId:{}", s.msg, request_id)
  391. ex = s.code, s.msg
  392. except Exception:
  393. logger.error("推流进程异常:{}, requestId:{}", format_exc(), request_id)
  394. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  395. finally:
  396. # 关闭推流管, 分析视频写对象
  397. close_all_p(push_p, None, ai_video_file, request_id)
  398. if ex:
  399. code, msg = ex
  400. put_queue(push_ex_queue, (1, code, msg), timeout=2)
  401. else:
  402. if ex_status:
  403. # 关闭推流的时候, 等待1分钟图片队列处理完,如果1分钟内没有处理完, 清空图片队列, 丢弃没有上传的图片
  404. c_time = time()
  405. while time() - c_time < 60:
  406. if image_queue.qsize() == 0 or image_queue.empty():
  407. break
  408. sleep(2)
  409. for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
  410. clear_queue(q)
  411. logger.info("推流进程停止完成!requestId:{}", request_id)