You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

457 satır
28KB

  1. # -*- coding: utf-8 -*-
  2. from concurrent.futures import ThreadPoolExecutor
  3. from multiprocessing import Process
  4. from os import getpid
  5. from os.path import join
  6. from time import time, sleep
  7. from traceback import format_exc
  8. import cv2
  9. import numpy as np
  10. import psutil
  11. from loguru import logger
  12. from enums.ExceptionEnum import ExceptionType
  13. from enums.ModelTypeEnum import ModelType
  14. from exception.CustomerException import ServiceException
  15. from util import ImageUtils
  16. from util.Cv2Utils import video_conjuncing, write_or_video, write_ai_video, push_video_stream, close_all_p
  17. from util.ImageUtils import url2Array, add_water_pic
  18. from util.LogUtils import init_log
  19. from util.PlotsUtils import draw_painting_joint, filterBox, xywh2xyxy2, draw_name_joint
  20. from util.QueUtil import get_no_block_queue, put_queue, clear_queue
  21. class PushStreamProcess(Process):
  22. __slots__ = ("_msg", "_push_queue", "_image_queue", '_push_ex_queue', '_hb_queue', "_context")
  23. def __init__(self, *args):
  24. super().__init__()
  25. # 传参
  26. self._msg, self._push_queue, self._image_queue, self._push_ex_queue, self._hb_queue, self._context = args
  27. def build_logo_url(self):
  28. logo = None
  29. if self._context["video"]["video_add_water"]:
  30. logo = self._msg.get("logo_url")
  31. if logo:
  32. logo = url2Array(logo, enable_ex=False)
  33. if logo is None:
  34. logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1)
  35. self._context["logo"] = logo
  36. @staticmethod
  37. def handle_image(det_xywh, det, frame_score, copy_frame, draw_config, code_list):
  38. code, det_result = det
  39. # 每个单独模型处理
  40. # 模型编号、100帧的所有问题, 检测目标、颜色、文字图片
  41. if len(det_result) > 0:
  42. font_config, allowedList = draw_config["font_config"], draw_config[code]["allowedList"]
  43. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  44. for qs in det_result:
  45. box, score, cls = xywh2xyxy2(qs)
  46. if cls not in allowedList or score < frame_score:
  47. continue
  48. label_array, color = label_arrays[cls], rainbows[cls]
  49. draw_painting_joint(box, copy_frame, label_array, score, color, font_config)
  50. if det_xywh.get(code) is None:
  51. det_xywh[code], code_list[code] = {}, {}
  52. cd = det_xywh[code].get(cls)
  53. if cd is None:
  54. code_list[code][cls] = 1
  55. det_xywh[code][cls] = [[cls, box, score, label_array, color]]
  56. else:
  57. code_list[code][cls] += 1
  58. det_xywh[code][cls].append([cls, box, score, label_array, color])
  59. class OnPushStreamProcess(PushStreamProcess):
  60. __slots__ = ()
  61. def run(self):
  62. self.build_logo_url()
  63. msg, context = self._msg, self._context
  64. base_dir, env, orFilePath, aiFilePath, logo, service_timeout, frame_score = context["base_dir"], \
  65. context['env'], context["orFilePath"], context["aiFilePath"], context["logo"], \
  66. int(context["service"]["timeout"]) + 120, context["service"]["filter"]["frame_score"]
  67. request_id, push_url = msg["request_id"], msg["push_url"]
  68. push_queue, image_queue, push_ex_queue, hb_queue = self._push_queue, self._image_queue, self._push_ex_queue, \
  69. self._hb_queue
  70. or_video_file, ai_video_file, push_p, ex = None, None, None, None
  71. ex_status = True
  72. # 图片相似度开关
  73. picture_similarity = bool(context["service"]["filter"]["picture_similarity"])
  74. qs_np_tmp = None
  75. pix_dis = 60
  76. try:
  77. init_log(base_dir, env)
  78. logger.info("开始实时启动推流进程!requestId:{}", request_id)
  79. with ThreadPoolExecutor(max_workers=2) as t:
  80. # 定义三种推流、写原视频流、写ai视频流策略
  81. # 第一个参数时间, 第二个参数重试次数
  82. p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0]
  83. start_time = time()
  84. while True:
  85. # 检测推流执行超时时间, 1.防止任务运行超时 2.主进程挂了,子进程运行超时
  86. if time() - start_time > service_timeout:
  87. logger.error("推流超时, requestId: {}", request_id)
  88. raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
  89. ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
  90. # 系统由于各种问题可能会杀死内存使用多的进程, 自己杀掉自己
  91. if psutil.Process(getpid()).ppid() == 1:
  92. logger.info("推流进程检测到父进程异常停止, 自动停止推流进程, requestId: {}", request_id)
  93. ex_status = False
  94. for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
  95. clear_queue(q)
  96. break
  97. # 获取推流的视频帧
  98. push_r = get_no_block_queue(push_queue)
  99. if push_r is not None:
  100. if push_r[0] == 1:
  101. frame_list, frame_index_list, all_frames, draw_config, push_objs = push_r[1]
  102. for i, frame in enumerate(frame_list):
  103. pix_dis = int((frame.shape[0]//10)*1.2)
  104. # 复制帧用来画图
  105. copy_frame = frame.copy()
  106. det_xywh, thread_p = {}, []
  107. det_xywh2 = {}
  108. # 所有问题的矩阵集合
  109. qs_np = None
  110. qs_reurn = []
  111. for det in push_objs[i]:
  112. code, det_result = det
  113. # 每个单独模型处理
  114. # 模型编号、100帧的所有问题, 检测目标、颜色、文字图片
  115. if len(det_result) > 0:
  116. font_config, allowedList = draw_config["font_config"], draw_config[code]["allowedList"]
  117. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  118. for qs in det_result:
  119. try: # 应对NaN情况
  120. box, score, cls = xywh2xyxy2(qs)
  121. except:
  122. continue
  123. if cls not in allowedList or score < frame_score:
  124. continue
  125. label_array, color = label_arrays[cls], rainbows[cls]
  126. if ModelType.CHANNEL2_MODEL.value[1] == str(code) and cls == 2:
  127. rr = t.submit(draw_name_joint, box, copy_frame, draw_config[code]["label_dict"], score, color, font_config, qs[6])
  128. else:
  129. rr = t.submit(draw_painting_joint, box, copy_frame, label_array, score, color, font_config)
  130. thread_p.append(rr)
  131. if det_xywh.get(code) is None:
  132. det_xywh[code] = {}
  133. cd = det_xywh[code].get(cls)
  134. if not (ModelType.CHANNEL2_MODEL.value[1] == str(code) and cls == 2):
  135. if cd is None:
  136. det_xywh[code][cls] = [[cls, box, score, label_array, color]]
  137. qs_np = np.array([box[0][0], box[0][1], box[1][0], box[1][1],
  138. box[2][0], box[2][1], box[3][0], box[3][1],
  139. score, cls, code],dtype=np.float32)
  140. else:
  141. det_xywh[code][cls].append([cls, box, score, label_array, color])
  142. result_li = np.array([box[0][0], box[0][1], box[1][0], box[1][1],
  143. box[2][0], box[2][1], box[3][0], box[3][1],
  144. score, cls, code],dtype=np.float32)
  145. qs_np = np.row_stack((qs_np, result_li))
  146. if logo:
  147. frame = add_water_pic(frame, logo, request_id)
  148. copy_frame = add_water_pic(copy_frame, logo, request_id)
  149. if len(thread_p) > 0:
  150. for r in thread_p:
  151. r.result()
  152. frame_merge = video_conjuncing(frame, copy_frame)
  153. # 写原视频到本地
  154. write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file,
  155. or_write_status, request_id)
  156. # 写识别视频到本地
  157. write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath,
  158. ai_video_file, ai_write_status, request_id)
  159. push_stream_result = t.submit(push_video_stream, frame_merge, push_p, push_url,
  160. p_push_status, request_id)
  161. # 如果有问题, 走下面的逻辑
  162. if qs_np is not None:
  163. if len(qs_np.shape) == 1:
  164. qs_np = qs_np[np.newaxis,...]
  165. qs_np_id = qs_np.copy()
  166. b = np.ones(qs_np_id.shape[0])
  167. qs_np_id = np.column_stack((qs_np_id,b))
  168. if qs_np_tmp is None:
  169. if picture_similarity:
  170. qs_np_tmp = qs_np_id.copy()
  171. b = np.zeros(qs_np.shape[0])
  172. qs_reurn = np.column_stack((qs_np,b))
  173. else:
  174. qs_reurn = filterBox(qs_np, qs_np_tmp, pix_dis)
  175. if picture_similarity:
  176. qs_np_tmp = np.append(qs_np_tmp,qs_np_id,axis=0)
  177. qs_np_tmp[:, 11] += 1
  178. qs_np_tmp = np.delete(qs_np_tmp, np.where((qs_np_tmp[:, 11] >= 75))[0], axis=0)
  179. has = False
  180. new_lab = []
  181. for j in qs_reurn:
  182. if j[11] == 1:
  183. has = True
  184. new_lab.append(j[9])
  185. if has:
  186. for q in qs_reurn:
  187. if q[11] >= 1:
  188. cls = int(q[9])
  189. if not (cls in new_lab):
  190. continue # 为了防止其他类别被带出
  191. code = str(int(q[10])).zfill(3)
  192. if det_xywh2.get(code) is None:
  193. det_xywh2[code] = {}
  194. cd = det_xywh2[code].get(cls)
  195. score = q[8]
  196. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  197. label_array, color = label_arrays[cls], rainbows[cls]
  198. box = [(int(q[0]), int(q[1])), (int(q[2]), int(q[3])),
  199. (int(q[4]), int(q[5])), (int(q[6]), int(q[7]))]
  200. is_new = False
  201. if q[11] == 1:
  202. is_new = True
  203. if cd is None:
  204. det_xywh2[code][cls] = [[cls, box, score, label_array, color, is_new]]
  205. else:
  206. det_xywh2[code][cls].append([cls, box, score, label_array, color, is_new])
  207. if len(det_xywh2) > 0:
  208. put_queue(image_queue, (1, [det_xywh2, frame, frame_index_list[i], all_frames, draw_config["font_config"]]))
  209. push_p = push_stream_result.result(timeout=60)
  210. ai_video_file = write_ai_video_result.result(timeout=60)
  211. or_video_file = write_or_video_result.result(timeout=60)
  212. # 接收停止指令
  213. if push_r[0] == 2:
  214. if 'stop' == push_r[1]:
  215. logger.info("停止推流进程, requestId: {}", request_id)
  216. break
  217. if 'stop_ex' == push_r[1]:
  218. ex_status = False
  219. logger.info("停止推流进程, requestId: {}", request_id)
  220. break
  221. del push_r
  222. else:
  223. sleep(1)
  224. except ServiceException as s:
  225. logger.error("推流进程异常:{}, requestId:{}", s.msg, request_id)
  226. ex = s.code, s.msg
  227. except Exception:
  228. logger.error("推流进程异常:{}, requestId:{}", format_exc(), request_id)
  229. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  230. finally:
  231. # 关闭推流管, 原视频写对象, 分析视频写对象
  232. close_all_p(push_p, or_video_file, ai_video_file, request_id)
  233. if ex:
  234. code, msg = ex
  235. put_queue(push_ex_queue, (1, code, msg), timeout=2)
  236. else:
  237. if ex_status:
  238. # 关闭推流的时候, 等待1分钟图片队列处理完,如果1分钟内没有处理完, 清空图片队列, 丢弃没有上传的图片
  239. c_time = time()
  240. while time() - c_time < 60:
  241. if image_queue.qsize() == 0 or image_queue.empty():
  242. break
  243. sleep(2)
  244. for q in [push_queue, image_queue, hb_queue]:
  245. clear_queue(q)
  246. logger.info("推流进程停止完成!图片队列大小: {}, requestId:{}", image_queue.qsize(), request_id)
  247. class OffPushStreamProcess(PushStreamProcess):
  248. __slots__ = ()
  249. def run(self):
  250. self.build_logo_url()
  251. msg, context = self._msg, self._context
  252. request_id = msg["request_id"]
  253. base_dir, env = context["base_dir"], context['env']
  254. push_queue, image_queue, push_ex_queue, hb_queue = self._push_queue, self._image_queue, self._push_ex_queue, \
  255. self._hb_queue
  256. aiFilePath, logo = context["aiFilePath"], context["logo"]
  257. ai_video_file, push_p, push_url = None, None, msg["push_url"]
  258. service_timeout = int(context["service"]["timeout"]) + 120
  259. frame_score = context["service"]["filter"]["frame_score"]
  260. ex = None
  261. ex_status = True
  262. # 图片相似度开关
  263. picture_similarity = bool(context["service"]["filter"]["picture_similarity"])
  264. qs_np_tmp = None
  265. pix_dis = 60
  266. try:
  267. init_log(base_dir, env)
  268. logger.info("开始启动离线推流进程!requestId:{}", request_id)
  269. with ThreadPoolExecutor(max_workers=2) as t:
  270. # 定义三种推流、写原视频流、写ai视频流策略
  271. # 第一个参数时间, 第二个参数重试次数
  272. p_push_status, ai_write_status = [0, 0], [0, 0]
  273. start_time = time()
  274. while True:
  275. # 检测推流执行超时时间
  276. if time() - start_time > service_timeout:
  277. logger.error("离线推流超时, requestId: {}", request_id)
  278. raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
  279. ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
  280. # 系统由于各种问题可能会杀死内存使用多的进程, 自己杀掉自己
  281. if psutil.Process(getpid()).ppid() == 1:
  282. logger.info("离线推流进程检测到父进程异常停止, 自动停止推流进程, requestId: {}", request_id)
  283. ex_status = False
  284. for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
  285. clear_queue(q)
  286. break
  287. # 获取推流的视频帧
  288. push_r = get_no_block_queue(push_queue)
  289. if push_r is not None:
  290. # [(1, ...] 视频帧操作
  291. # [(2, 操作指令)] 指令操作
  292. if push_r[0] == 1:
  293. frame_list, frame_index_list, all_frames, draw_config, push_objs = push_r[1]
  294. # 处理每一帧图片
  295. for i, frame in enumerate(frame_list):
  296. pix_dis = int((frame.shape[0]//10)*1.2)
  297. if frame_index_list[i] % 300 == 0 and frame_index_list[i] <= all_frames:
  298. task_process = "%.2f" % (float(frame_index_list[i]) / float(all_frames))
  299. put_queue(hb_queue, {"hb_value": task_process}, timeout=2)
  300. # 复制帧用来画图
  301. copy_frame = frame.copy()
  302. # 所有问题记录字典
  303. det_xywh, thread_p = {}, []
  304. det_xywh2 = {}
  305. # 所有问题的矩阵集合
  306. qs_np = None
  307. qs_reurn = []
  308. for det in push_objs[i]:
  309. code, det_result = det
  310. # 每个单独模型处理
  311. # 模型编号、100帧的所有问题, 检测目标、颜色、文字图片
  312. if len(det_result) > 0:
  313. font_config, allowedList = draw_config["font_config"], draw_config[code]["allowedList"]
  314. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  315. for qs in det_result:
  316. box, score, cls = xywh2xyxy2(qs)
  317. if cls not in allowedList or score < frame_score:
  318. continue
  319. label_array, color = label_arrays[cls], rainbows[cls]
  320. if ModelType.CHANNEL2_MODEL.value[1] == str(code) and cls == 2:
  321. rr = t.submit(draw_name_joint, box, copy_frame, draw_config[code]["label_dict"], score, color, font_config, qs[6])
  322. else:
  323. rr = t.submit(draw_painting_joint, box, copy_frame, label_array, score, color, font_config)
  324. thread_p.append(rr)
  325. if det_xywh.get(code) is None:
  326. det_xywh[code] = {}
  327. cd = det_xywh[code].get(cls)
  328. if not (ModelType.CHANNEL2_MODEL.value[1] == str(code) and cls == 2):
  329. if cd is None:
  330. det_xywh[code][cls] = [[cls, box, score, label_array, color]]
  331. qs_np = np.array([box[0][0], box[0][1], box[1][0], box[1][1],
  332. box[2][0], box[2][1], box[3][0], box[3][1],
  333. score, cls, code],dtype=np.float32)
  334. else:
  335. det_xywh[code][cls].append([cls, box, score, label_array, color])
  336. result_li = np.array([box[0][0], box[0][1], box[1][0], box[1][1],
  337. box[2][0], box[2][1], box[3][0], box[3][1],
  338. score, cls, code],dtype=np.float32)
  339. qs_np = np.row_stack((qs_np, result_li))
  340. if logo:
  341. frame = add_water_pic(frame, logo, request_id)
  342. copy_frame = add_water_pic(copy_frame, logo, request_id)
  343. if len(thread_p) > 0:
  344. for r in thread_p:
  345. r.result()
  346. frame_merge = video_conjuncing(frame, copy_frame)
  347. # 写识别视频到本地
  348. write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath,
  349. ai_video_file,
  350. ai_write_status, request_id)
  351. push_stream_result = t.submit(push_video_stream, frame_merge, push_p, push_url,
  352. p_push_status, request_id)
  353. if qs_np is not None:
  354. if len(qs_np.shape) == 1:
  355. qs_np = qs_np[np.newaxis,...]
  356. qs_np_id = qs_np.copy()
  357. b = np.ones(qs_np_id.shape[0])
  358. qs_np_id = np.column_stack((qs_np_id,b))
  359. if qs_np_tmp is None:
  360. if picture_similarity:
  361. qs_np_tmp = qs_np_id.copy()
  362. b = np.zeros(qs_np.shape[0])
  363. qs_reurn = np.column_stack((qs_np,b))
  364. else:
  365. qs_reurn = filterBox(qs_np, qs_np_tmp, pix_dis)
  366. if picture_similarity:
  367. qs_np_tmp = np.append(qs_np_tmp,qs_np_id,axis=0)
  368. qs_np_tmp[:, 11] += 1
  369. qs_np_tmp = np.delete(qs_np_tmp, np.where((qs_np_tmp[:, 11] >= 75))[0], axis=0)
  370. has = False
  371. new_lab = []
  372. for j in qs_reurn:
  373. if j[11] == 1:
  374. has = True
  375. new_lab.append(j[9])
  376. if has:
  377. for q in qs_reurn:
  378. if q[11] >= 1:
  379. cls = int(q[9])
  380. if not (cls in new_lab):
  381. continue # 为了防止其他类别被带出
  382. code = str(int(q[10])).zfill(3)
  383. if det_xywh2.get(code) is None:
  384. det_xywh2[code] = {}
  385. cd = det_xywh2[code].get(cls)
  386. score = q[8]
  387. rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
  388. label_array, color = label_arrays[cls], rainbows[cls]
  389. box = [(int(q[0]), int(q[1])), (int(q[2]), int(q[3])),
  390. (int(q[4]), int(q[5])), (int(q[6]), int(q[7]))]
  391. is_new = False
  392. if q[11] == 1:
  393. is_new = True
  394. if cd is None:
  395. det_xywh2[code][cls] = [[cls, box, score, label_array, color, is_new]]
  396. else:
  397. det_xywh2[code][cls].append([cls, box, score, label_array, color, is_new])
  398. if len(det_xywh2) > 0:
  399. put_queue(image_queue, (1, [det_xywh2, frame, frame_index_list[i], all_frames, draw_config["font_config"]]))
  400. push_p = push_stream_result.result(timeout=60)
  401. ai_video_file = write_ai_video_result.result(timeout=60)
  402. # 接收停止指令
  403. if push_r[0] == 2:
  404. if 'stop' == push_r[1]:
  405. logger.info("停止推流进程, requestId: {}", request_id)
  406. break
  407. if 'stop_ex' == push_r[1]:
  408. logger.info("停止推流进程, requestId: {}", request_id)
  409. ex_status = False
  410. break
  411. del push_r
  412. else:
  413. sleep(1)
  414. except ServiceException as s:
  415. logger.error("推流进程异常:{}, requestId:{}", s.msg, request_id)
  416. ex = s.code, s.msg
  417. except Exception:
  418. logger.error("推流进程异常:{}, requestId:{}", format_exc(), request_id)
  419. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  420. finally:
  421. # 关闭推流管, 分析视频写对象
  422. close_all_p(push_p, None, ai_video_file, request_id)
  423. if ex:
  424. code, msg = ex
  425. put_queue(push_ex_queue, (1, code, msg), timeout=2)
  426. else:
  427. if ex_status:
  428. # 关闭推流的时候, 等待1分钟图片队列处理完,如果1分钟内没有处理完, 清空图片队列, 丢弃没有上传的图片
  429. c_time = time()
  430. while time() - c_time < 60:
  431. if image_queue.qsize() == 0 or image_queue.empty():
  432. break
  433. sleep(2)
  434. for q in [push_queue, image_queue, hb_queue]:
  435. clear_queue(q)
  436. logger.info("推流进程停止完成!requestId:{}", request_id)