  1. # -*- coding: utf-8 -*-
  2. import base64
  3. import os
  4. from concurrent.futures import ThreadPoolExecutor
  5. from os.path import join, exists, getsize
  6. from time import time, sleep
  7. from traceback import format_exc
  8. import cv2
  9. from multiprocessing import Process, Queue
  10. import numpy as np
  11. from loguru import logger
  12. from common.Constant import init_progess, success_progess
  13. from concurrency.FileUploadThread import ImageTypeImageFileUpload
  14. from concurrency.HeartbeatThread import Heartbeat
  15. from concurrency.PullVideoStreamProcess import OnlinePullVideoStreamProcess, OfflinePullVideoStreamProcess
  16. from concurrency.PushVideoStreamProcess import OnPushStreamProcess, OffPushStreamProcess
  17. from util.GPUtils import check_gpu_resource
  18. from util.LogUtils import init_log
  19. from concurrency.CommonThread import Common
  20. from concurrency.PullStreamThread import RecordingPullStreamThread
  21. from concurrency.RecordingHeartbeatThread import RecordingHeartbeat
  22. from enums.AnalysisStatusEnum import AnalysisStatus
  23. from enums.AnalysisTypeEnum import AnalysisType
  24. from enums.ExceptionEnum import ExceptionType
  25. from enums.ModelTypeEnum import ModelType
  26. from enums.RecordingStatusEnum import RecordingStatus
  27. from util.AliyunSdk import ThAliyunVodSdk
  28. from util.CpuUtils import check_cpu
  29. from util.Cv2Utils import write_or_video, push_video_stream, close_all_p
  30. from entity.FeedBack import message_feedback, recording_feedback
  31. from exception.CustomerException import ServiceException
  32. from util.ImageUtils import url2Array, add_water_pic
  33. from util.ModelUtils import MODEL_CONFIG
  34. from util.OcrBaiduSdk import OcrBaiduSdk
  35. from enums.BaiduSdkEnum import VehicleEnumVALUE
  36. from enums.ModelTypeEnum import BaiduModelTarget
  37. from util.PlotsUtils import xywh2xyxy2
  38. from util.QueUtil import put_queue, get_no_block_queue, clear_queue
  39. from util.TimeUtils import now_date_to_str, YMDHMSF
  40. class IntelligentRecognitionProcess(Process):
  41. __slots__ = ('_fb_queue', '_msg', '_analyse_type', '_context', 'event_queue', '_pull_queue', '_hb_queue',
  42. "_image_queue", "_push_queue", '_push_ex_queue')
  43. def __init__(self, *args):
  44. super().__init__()
  45. # 入参
  46. self._fb_queue, self._msg, self._analyse_type, self._context = args
  47. # 初始化参数
  48. self.event_queue, self._pull_queue, self._hb_queue, self._image_queue, self._push_queue, self._push_ex_queue = \
  49. Queue(), Queue(10), Queue(), Queue(), Queue(), Queue()
  50. # 发送waitting消息
  51. put_queue(self._fb_queue, message_feedback(self._msg["request_id"], AnalysisStatus.WAITING.value,
  52. self._analyse_type, progress=init_progess), timeout=2, is_ex=True)
  53. def sendEvent(self, eBody):
  54. put_queue(self.event_queue, eBody, timeout=2, is_ex=True)
  55. def clear_queue(self):
  56. clear_queue(self.event_queue)
  57. clear_queue(self._pull_queue)
  58. clear_queue(self._hb_queue)
  59. clear_queue(self._image_queue)
  60. clear_queue(self._push_queue)
  61. clear_queue(self._push_ex_queue)
  62. @staticmethod
  63. def build_video_path(context, msg, is_build_or=True):
  64. random_time = now_date_to_str(YMDHMSF)
  65. pre_path = '%s/%s%s' % (context["base_dir"], context["video"]["file_path"], random_time)
  66. end_path = '%s%s' % (msg["request_id"], ".mp4")
  67. context["aiFilePath"] = '%s%s%s' % (pre_path, "_on_ai_", end_path)
  68. if is_build_or:
  69. context["orFilePath"] = '%s%s%s' % (pre_path, "_on_or_", end_path)
  70. @staticmethod
  71. def start_heartbeat(fb_queue, hb_queue, request_id, analyse_type, context):
  72. hb_thread = Heartbeat(fb_queue, hb_queue, request_id, analyse_type, context)
  73. hb_thread.setDaemon(True)
  74. hb_thread.start()
  75. return hb_thread
  76. class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
  77. __slots__ = ()
  78. @staticmethod
  79. def start_push_stream(msg, push_queue, image_queue, push_ex_queue, hb_queue, context):
  80. pushProcess = OnPushStreamProcess(msg, push_queue, image_queue, push_ex_queue, hb_queue, context)
  81. pushProcess.daemon = True
  82. pushProcess.start()
  83. return pushProcess
  84. @staticmethod
  85. def start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, frame_num):
  86. pullProcess = OnlinePullVideoStreamProcess(msg, context, fb_queue, pull_queue, image_queue, analyse_type,
  87. frame_num)
  88. pullProcess.daemon = True
  89. pullProcess.start()
  90. return pullProcess
  91. @staticmethod
  92. def upload_video(base_dir, env, request_id, orFilePath, aiFilePath):
  93. aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id)
  94. upload_video_thread_or = Common(aliyunVodSdk.get_play_url, orFilePath, "or_online_%s" % request_id)
  95. upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id)
  96. upload_video_thread_or.setDaemon(True)
  97. upload_video_thread_ai.setDaemon(True)
  98. upload_video_thread_or.start()
  99. upload_video_thread_ai.start()
  100. or_url = upload_video_thread_or.get_result()
  101. ai_url = upload_video_thread_ai.get_result()
  102. return or_url, ai_url
  103. @staticmethod
  104. def ai_normal_dtection(model, frame, request_id):
  105. model_conf, code = model
  106. retResults = MODEL_CONFIG[code][3]([model_conf, frame, request_id])[0]
  107. if type(retResults) is np.ndarray:
  108. ret = retResults.tolist()
  109. elif len(retResults) == 0:
  110. ret = retResults
  111. else:
  112. ret = retResults[2]
  113. return code, ret
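# Note on obj_det below: task_status[1] acts as a "recently detected" flag shared across
# frames. While it is 1 every frame is run through all models; once a frame yields no
# detections it drops to 0, after which only every 30th frame (cframe % 30 == 0) is
# analyzed until a detection flips it back to 1.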
  114. @staticmethod
  115. def obj_det(self, model_array, frame, task_status, cframe, tt, request_id):
  116. push_obj = []
  117. if task_status[1] == 1:
  118. dtection_result = []
  119. for model in model_array:
  120. result = tt.submit(self.ai_normal_dtection, model, frame, request_id)
  121. dtection_result.append(result)
  122. for d in dtection_result:
  123. code, det_r = d.result()
  124. if len(det_r) > 0:
  125. push_obj.append((code, det_r))
  126. if len(push_obj) == 0:
  127. task_status[1] = 0
  128. if task_status[1] == 0:
  129. if cframe % 30 == 0:
  130. dtection_result1 = []
  131. for model in model_array:
  132. result = tt.submit(self.ai_normal_dtection, model, frame, request_id)
  133. dtection_result1.append(result)
  134. for d in dtection_result1:
  135. code, det_r = d.result()
  136. if len(det_r) > 0:
  137. push_obj.append((code, det_r))
  138. if len(push_obj) > 0:
  139. task_status[1] = 1
  140. return push_obj
  141. @staticmethod
  142. def checkPT(start_time, service_timeout, pull_process, push_process, hb_thread, push_ex_queue, pull_queue,
  143. request_id):
  144. if time() - start_time > service_timeout:
  145. logger.error("任务执行超时, requestId: {}", request_id)
  146. raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
  147. ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
  148. if pull_process is not None and not pull_process.is_alive():
  149. while True:
  150. if pull_queue.empty() or pull_queue.qsize() == 0:
  151. break
  152. pull_result = get_no_block_queue(pull_queue)
  153. if pull_result is not None and pull_result[0] == 1:
  154. raise ServiceException(pull_result[1], pull_result[2])
  155. logger.info("拉流进程异常停止, requestId: {}", request_id)
  156. raise Exception("拉流进程异常停止!")
  157. if hb_thread is not None and not hb_thread.is_alive():
  158. logger.info("心跳线程异常停止, requestId: {}", request_id)
  159. raise Exception("心跳线程异常停止!")
  160. if push_process is not None and not push_process.is_alive():
  161. while True:
  162. if push_ex_queue.empty() or push_ex_queue.qsize() == 0:
  163. break
  164. push_result = get_no_block_queue(push_ex_queue)
  165. if push_result is not None and push_result[0] == 1:
  166. raise ServiceException(push_result[1], push_result[2])
  167. logger.info("推流进程异常停止, requestId: {}", request_id)
  168. raise Exception("推流进程异常停止!")
  169. def run(self):
  170. msg, context, analyse_type = self._msg, self._context, self._analyse_type
  171. self.build_video_path(context, msg)
  172. request_id = msg["request_id"]
  173. base_dir, env = context["base_dir"], context["env"]
  174. service_timeout = int(context["service"]["timeout"])
  175. ex = None
  176. # 拉流进程、推流进程、心跳线程
  177. pull_process, push_process, hb_thread = None, None, None
  178. # 事件队列、拉流队列、心跳队列、反馈队列
  179. event_queue, pull_queue, hb_queue, fb_queue = self.event_queue, self._pull_queue, self._hb_queue, self._fb_queue
  180. # 推流队列、推流异常队列、图片队列
  181. push_queue, push_ex_queue, image_queue = self._push_queue, self._push_ex_queue, self._image_queue
  182. try:
  183. # 初始化日志
  184. init_log(base_dir, env)
  185. # 打印启动日志
  186. logger.info("开始启动实时分析进程!requestId: {}", request_id)
  187. # 启动拉流进程(包含拉流线程, 图片上传线程)
  188. # 拉流进程初始化时间长, 先启动
  189. pull_process = self.start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, 25)
  190. # 启动心跳线程
  191. hb_thread = self.start_heartbeat(fb_queue, hb_queue, request_id, analyse_type, context)
  192. # 加载算法模型
  193. model_array = get_model(msg, context, analyse_type)
  194. # 启动推流进程
  195. push_process = self.start_push_stream(msg, push_queue, image_queue, push_ex_queue, hb_queue, context)
  196. # 第一个参数: 模型是否初始化 0:未初始化 1:初始化
  197. # 第二个参数: 检测是否有问题 0: 没有问题, 1: 有问题
  198. task_status = [0, 0]
  199. draw_config = {}
  200. start_time = time()
  201. # 识别2个线程性能最优
  202. with ThreadPoolExecutor(max_workers=2) as t:
  203. # 可能使用模型组合, 模型组合最多3个模型, 1对3, 上面的2个线程对应6个线程
  204. with ThreadPoolExecutor(max_workers=6) as tt:
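# Shape of the pull_queue entries consumed by the loop below (inferred from the handlers):
# (4, (frame_list, frame_index_list, all_frames)) carries a frame batch,
# (1, error_code, error_msg) reports a pull failure, and (2, ...) signals that the
# stream has ended or was stopped.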
  205. while True:
  206. # 检查拉流进程是否正常, 心跳线程是否正常
  207. self.checkPT(start_time, service_timeout, pull_process, push_process, hb_thread, push_ex_queue,
  208. pull_queue, request_id)
  209. # 检查推流是否异常
  210. push_status = get_no_block_queue(push_ex_queue)
  211. if push_status is not None and push_status[0] == 1:
  212. raise ServiceException(push_status[1], push_status[2])
  213. # 获取停止指令
  214. event_result = get_no_block_queue(event_queue)
  215. if event_result:
  216. cmdStr = event_result.get("command")
  217. # 接收到停止指令
  218. if "stop" == cmdStr:
  219. logger.info("实时任务开始停止, requestId: {}", request_id)
  220. pull_process.sendCommand({"command": 'stop'})
  221. pull_result = get_no_block_queue(pull_queue)
  222. if pull_result is None:
  223. sleep(1)
  224. continue
  225. # (4, (frame_list, frame_index_list, all_frames))
  226. if pull_result[0] == 4:
  227. frame_list, frame_index_list, all_frames = pull_result[1]
  228. if len(frame_list) > 0:
  229. # 判断是否已经初始化
  230. if task_status[0] == 0:
  231. task_status[0] = 1
  232. for i, model in enumerate(model_array):
  233. model_conf, code = model
  234. model_param = model_conf[1]
  235. # (modeType, model_param, allowedList, names, rainbows)
  236. MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0],
  237. model_conf)
  238. if draw_config.get("font_config") is None:
  239. draw_config["font_config"] = model_param['font_config']
  240. if draw_config.get(code) is None:
  241. draw_config[code] = {}
  242. draw_config[code]["allowedList"] = model_conf[2]
  243. draw_config[code]["rainbows"] = model_conf[4]
  244. draw_config[code]["label_arrays"] = model_param['label_arraylist']
  245. if "label_dict" in model_param:
  246. draw_config[code]["label_dict"] = model_param['label_dict']
  247. # 多线程并发处理, 经过测试两个线程最优
  248. det_array = []
  249. for i, frame in enumerate(frame_list):
  250. det_result = t.submit(self.obj_det, self, model_array, frame, task_status,
  251. frame_index_list[i], tt, request_id)
  252. det_array.append(det_result)
  253. push_objs = [det.result() for det in det_array]
  254. put_queue(push_queue,
  255. (1, (frame_list, frame_index_list, all_frames, draw_config, push_objs)),
  256. timeout=2, is_ex=True)
  257. del det_array, push_objs
  258. del frame_list, frame_index_list, all_frames
  259. elif pull_result[0] == 1:
  260. # 拉流发生异常
  261. put_queue(push_queue, (2, 'stop_ex'), timeout=1, is_ex=True)
  262. push_process.join(120)
  263. pull_process.sendCommand({"command": 'stop'})
  264. pull_process.join(120)
  265. raise ServiceException(pull_result[1], pull_result[2])
  266. elif pull_result[0] == 2:
  267. put_queue(push_queue, (2, 'stop'), timeout=1, is_ex=True)
  268. push_process.join(120)
  269. pull_process.sendCommand({"command": 'stop'})
  270. pull_process.join(120)
  271. break
  272. else:
  273. raise Exception("未知拉流状态异常!")
  274. except ServiceException as s:
  275. logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id)
  276. ex = s.code, s.msg
  277. except Exception:
  278. logger.error("服务异常: {}, requestId: {},", format_exc(), request_id)
  279. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  280. finally:
  281. orFilePath, aiFilePath = context["orFilePath"], context["aiFilePath"]
  282. base_dir, env = context["base_dir"], context["env"]
  283. or_url, ai_url, exc = "", "", None
  284. try:
  285. # 如果拉流进程存在, 关闭拉流进程(拉流线程、图片上传线程)
  286. if push_process and push_process.is_alive():
  287. put_queue(push_queue, (2, 'stop_ex'), timeout=1)
  288. logger.info("关闭推流进程, requestId:{}", request_id)
  289. push_process.join(timeout=120)
  290. logger.info("关闭推流进程1, requestId:{}", request_id)
  291. if pull_process and pull_process.is_alive():
  292. pull_process.sendCommand({"command": 'stop_ex'})
  293. pull_process.sendCommand({"command": 'stop'})
  294. logger.info("关闭拉流进程, requestId:{}", request_id)
  295. pull_process.join(timeout=120)
  296. logger.info("关闭拉流进程1, requestId:{}", request_id)
  297. if exists(orFilePath) and exists(aiFilePath) and getsize(orFilePath) > 100:
  298. or_url, ai_url = self.upload_video(base_dir, env, request_id, orFilePath, aiFilePath)
  299. if or_url is None or ai_url is None:
  300. logger.error("原视频或AI视频播放上传VOD失败!, requestId: {}", request_id)
  301. raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
  302. ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
  303. # 停止心跳线程
  304. if hb_thread and hb_thread.is_alive():
  305. put_queue(hb_queue, {"command": "stop"}, timeout=1)
  306. hb_thread.join(timeout=120)
  307. if exists(orFilePath):
  308. logger.info("开始删除原视频, orFilePath: {}, requestId: {}", orFilePath, request_id)
  309. os.remove(orFilePath)
  310. logger.info("删除原视频成功, orFilePath: {}, requestId: {}", orFilePath, request_id)
  311. if exists(aiFilePath):
  312. logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", aiFilePath, request_id)
  313. os.remove(aiFilePath)
  314. logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", aiFilePath, request_id)
  315. # 如果有异常, 检查是否有原视频和AI视频,有则上传,响应失败
  316. if ex:
  317. code, msg = ex
  318. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value,
  319. analyse_type,
  320. error_code=code,
  321. error_msg=msg,
  322. video_url=or_url,
  323. ai_video_url=ai_url), timeout=2, is_ex=False)
  324. else:
  325. if or_url is None or len(or_url) == 0 or ai_url is None or len(ai_url) == 0:
  326. raise ServiceException(ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[0],
  327. ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[1])
  328. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.SUCCESS.value,
  329. analyse_type,
  330. progress=success_progess,
  331. video_url=or_url,
  332. ai_video_url=ai_url), timeout=2, is_ex=False)
  333. except ServiceException as s:
  334. logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id)
  335. exc = s.code, s.msg
  336. except Exception:
  337. logger.error("服务异常: {}, requestId: {},", format_exc(), request_id)
  338. exc = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  339. finally:
  340. if push_process and push_process.is_alive():
  341. put_queue(push_queue, (2, 'stop_ex'), timeout=1)
  342. logger.info("关闭推流进程, requestId:{}", request_id)
  343. push_process.join(timeout=120)
  344. logger.info("关闭推流进程1, requestId:{}", request_id)
  345. if pull_process and pull_process.is_alive():
  346. pull_process.sendCommand({"command": 'stop_ex'})
  347. pull_process.sendCommand({"command": 'stop'})
  348. logger.info("关闭拉流进程, requestId:{}", request_id)
  349. pull_process.join(timeout=120)
  350. logger.info("关闭拉流进程1, requestId:{}", request_id)
  351. if exc:
  352. code, msg = exc
  353. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value,
  354. analyse_type,
  355. error_code=code,
  356. error_msg=msg,
  357. video_url=or_url,
  358. ai_video_url=ai_url), timeout=2, is_ex=False)
  359. logger.info("清理队列, requestId:{}", request_id)
  360. self.clear_queue()
  361. logger.info("清理队列完成, requestId:{}", request_id)
  362. class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
  363. __slots__ = ()
  364. @staticmethod
  365. def upload_video(base_dir, env, request_id, aiFilePath):
  366. aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id)
  367. upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_offline_%s" % request_id)
  368. upload_video_thread_ai.setDaemon(True)
  369. upload_video_thread_ai.start()
  370. ai_url = upload_video_thread_ai.get_result()
  371. return ai_url
  372. @staticmethod
  373. def ai_normal_dtection(model, frame, request_id):
  374. model_conf, code = model
  375. retResults = MODEL_CONFIG[code][3]([model_conf, frame, request_id])[0]
  376. if type(retResults) is np.ndarray:
  377. ret = retResults.tolist()
  378. elif len(retResults) == 0:
  379. ret = retResults
  380. else:
  381. ret = retResults[2]
  382. return code, ret
  383. @staticmethod
  384. def obj_det(self, model_array, frame, task_status, cframe, tt, request_id):
  385. push_obj = []
  386. if task_status[1] == 1:
  387. dtection_result = []
  388. for model in model_array:
  389. result = tt.submit(self.ai_normal_dtection, model, frame, request_id)
  390. dtection_result.append(result)
  391. for d in dtection_result:
  392. code, det_r = d.result()
  393. if len(det_r) > 0:
  394. push_obj.append((code, det_r))
  395. if len(push_obj) == 0:
  396. task_status[1] = 0
  397. if task_status[1] == 0:
  398. if cframe % 30 == 0:
  399. dtection_result1 = []
  400. for model in model_array:
  401. result = tt.submit(self.ai_normal_dtection, model, frame, request_id)
  402. dtection_result1.append(result)
  403. for d in dtection_result1:
  404. code, det_r = d.result()
  405. if len(det_r) > 0:
  406. push_obj.append((code, det_r))
  407. if len(push_obj) > 0:
  408. task_status[1] = 1
  409. return push_obj
  410. @staticmethod
  411. def start_push_stream(msg, push_queue, image_queue, push_ex_queue, hb_queue, context):
  412. pushProcess = OffPushStreamProcess(msg, push_queue, image_queue, push_ex_queue, hb_queue, context)
  413. pushProcess.daemon = True
  414. pushProcess.start()
  415. return pushProcess
  416. @staticmethod
  417. def start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, frame_num):
  418. pullProcess = OfflinePullVideoStreamProcess(msg, context, fb_queue, pull_queue, image_queue, analyse_type,
  419. frame_num)
  420. pullProcess.daemon = True
  421. pullProcess.start()
  422. return pullProcess
  423. @staticmethod
  424. def checkPT(service_timeout, start_time, pull_process, push_process, hb_thread, push_ex_queue, pull_queue,
  425. request_id):
  426. if time() - start_time > service_timeout:
  427. logger.error("任务执行超时, requestId: {}", request_id)
  428. raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
  429. ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
  430. if pull_process is not None and not pull_process.is_alive():
  431. while True:
  432. if pull_queue.empty() or pull_queue.qsize() == 0:
  433. break
  434. pull_result = get_no_block_queue(pull_queue)
  435. if pull_result is not None and pull_result[0] == 1:
  436. raise ServiceException(pull_result[1], pull_result[2])
  437. logger.info("拉流进程异常停止, requestId: {}", request_id)
  438. raise Exception("拉流进程异常停止!")
  439. if hb_thread is not None and not hb_thread.is_alive():
  440. logger.info("心跳线程异常停止, requestId: {}", request_id)
  441. raise Exception("心跳线程异常停止!")
  442. if push_process is not None and not push_process.is_alive():
  443. while True:
  444. if push_ex_queue.empty() or push_ex_queue.qsize() == 0:
  445. break
  446. push_result = get_no_block_queue(push_ex_queue)
  447. if push_result is not None and push_result[0] == 1:
  448. raise ServiceException(push_result[1], push_result[2])
  449. logger.info("推流进程异常停止, requestId: {}", request_id)
  450. raise Exception("推流进程异常停止!")
  451. def run(self):
  452. msg, context, analyse_type, ex = self._msg, self._context, self._analyse_type, None
  453. self.build_video_path(context, msg, is_build_or=False)
  454. request_id, base_dir, env = msg["request_id"], context["base_dir"], context["env"]
  455. # 拉流进程、推流进程、心跳线程
  456. pull_process, push_process, hb_thread = None, None, None
  457. service_timeout = int(context["service"]["timeout"])
  458. # 事件队列、拉流队列、心跳队列、反馈队列
  459. event_queue, pull_queue, hb_queue, fb_queue = self.event_queue, self._pull_queue, self._hb_queue, self._fb_queue
  460. # 推流队列、推流异常队列、图片队列
  461. push_queue, push_ex_queue, image_queue = self._push_queue, self._push_ex_queue, self._image_queue
  462. try:
  463. # 初始化日志
  464. init_log(base_dir, env)
  465. # 打印启动日志
  466. logger.info("开始启动离线分析进程!requestId: {}", request_id)
  467. # 启动拉流进程(包含拉流线程, 图片上传线程)
  468. # 拉流进程初始化时间长, 先启动
  469. pull_process = self.start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, 25)
  470. # 启动心跳线程
  471. hb_thread = self.start_heartbeat(fb_queue, hb_queue, request_id, analyse_type, context)
  472. # 加载算法模型
  473. model_array = get_model(msg, context, analyse_type)
  474. # 启动推流进程
  475. push_process = self.start_push_stream(msg, push_queue, image_queue, push_ex_queue, hb_queue, context)
  476. # 第一个参数: 模型是否初始化 0:未初始化 1:初始化
  477. # 第二个参数: 检测是否有问题 0: 没有问题, 1: 有问题
  478. task_status = [0, 0]
  479. draw_config = {}
  480. start_time = time()
  481. # 识别2个线程性能最优
  482. with ThreadPoolExecutor(max_workers=2) as t:
  483. # 可能使用模型组合, 模型组合最多3个模型, 1对3, 上面的2个线程对应6个线程
  484. with ThreadPoolExecutor(max_workers=6) as tt:
  485. while True:
  486. # 检查拉流进程是否正常, 心跳线程是否正常
  487. self.checkPT(service_timeout, start_time, pull_process, push_process, hb_thread, push_ex_queue,
  488. pull_queue, request_id)
  489. # 检查推流是否异常
  490. push_status = get_no_block_queue(push_ex_queue)
  491. if push_status is not None and push_status[0] == 1:
  492. raise ServiceException(push_status[1], push_status[2])
  493. # 获取停止指令
  494. event_result = get_no_block_queue(event_queue)
  495. if event_result:
  496. cmdStr = event_result.get("command")
  497. # 接收到停止指令
  498. if "stop" == cmdStr:
  499. logger.info("离线任务开始停止, requestId: {}", request_id)
  500. pull_process.sendCommand({"command": 'stop'})
  501. pull_result = get_no_block_queue(pull_queue)
  502. if pull_result is None:
  503. sleep(1)
  504. continue
  505. # (4, (frame_list, frame_index_list, all_frames))
  506. if pull_result[0] == 4:
  507. frame_list, frame_index_list, all_frames = pull_result[1]
  508. if len(frame_list) > 0:
  509. # 判断是否已经初始化
  510. if task_status[0] == 0:
  511. task_status[0] = 1
  512. for i, model in enumerate(model_array):
  513. model_conf, code = model
  514. model_param = model_conf[1]
  515. # (modeType, model_param, allowedList, names, rainbows)
  516. MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0],
  517. model_conf)
  518. if draw_config.get("font_config") is None:
  519. draw_config["font_config"] = model_param['font_config']
  520. if draw_config.get(code) is None:
  521. draw_config[code] = {}
  522. draw_config[code]["allowedList"] = model_conf[2]
  523. draw_config[code]["rainbows"] = model_conf[4]
  524. draw_config[code]["label_arrays"] = model_param['label_arraylist']
  525. if "label_dict" in model_param:
  526. draw_config[code]["label_dict"] = model_param['label_dict']
  527. det_array = []
  528. for i, frame in enumerate(frame_list):
  529. det_result = t.submit(self.obj_det, self, model_array, frame, task_status,
  530. frame_index_list[i], tt, request_id)
  531. det_array.append(det_result)
  532. push_objs = [det.result() for det in det_array]
  533. put_queue(push_queue,
  534. (1, (frame_list, frame_index_list, all_frames, draw_config, push_objs)),
  535. timeout=2, is_ex=True)
  536. del det_array, push_objs
  537. del frame_list, frame_index_list, all_frames
  538. elif pull_result[0] == 1:
  539. put_queue(push_queue, (2, 'stop_ex'), timeout=1, is_ex=True)
  540. logger.info("关闭推流进程, requestId:{}", request_id)
  541. push_process.join(timeout=120)
  542. logger.info("关闭推流进程1, requestId:{}", request_id)
  543. raise ServiceException(pull_result[1], pull_result[2])
  544. elif pull_result[0] == 2:
  545. logger.info("离线任务开始停止, requestId: {}", request_id)
  546. put_queue(push_queue, (2, 'stop'), timeout=1, is_ex=True)
  547. push_process.join(120)
  548. pull_process.sendCommand({"command": 'stop'})
  549. pull_process.join(120)
  550. break
  551. else:
  552. raise Exception("未知拉流状态异常!")
  553. except ServiceException as s:
  554. logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id)
  555. ex = s.code, s.msg
  556. except Exception:
  557. logger.error("服务异常: {}, requestId: {},", format_exc(), request_id)
  558. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  559. finally:
  560. base_dir, env, aiFilePath = context["base_dir"], context["env"], context["aiFilePath"]
  561. ai_url, exc = "", None
  562. try:
  563. if push_process and push_process.is_alive():
  564. put_queue(push_queue, (2, 'stop_ex'), timeout=1)
  565. push_process.join(timeout=120)
  566. if pull_process and pull_process.is_alive():
  567. pull_process.sendCommand({"command": 'stop_ex'})
  568. pull_process.sendCommand({"command": 'stop'})
  569. pull_process.join(timeout=120)
  570. if exists(aiFilePath) and getsize(aiFilePath) > 100:
  571. ai_url = self.upload_video(base_dir, env, request_id, aiFilePath)
  572. if ai_url is None:
  573. logger.error("原视频或AI视频播放上传VOD失败!, requestId: {}", request_id)
  574. raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
  575. ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
  576. # 停止心跳线程
  577. if hb_thread and hb_thread.is_alive():
  578. put_queue(hb_queue, {"command": "stop"}, timeout=1)
  579. hb_thread.join(timeout=120)
  580. if exists(aiFilePath):
  581. logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", aiFilePath, request_id)
  582. os.remove(aiFilePath)
  583. logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", aiFilePath, request_id)
  584. # 如果有异常, 检查是否有原视频和AI视频,有则上传,响应失败
  585. if ex:
  586. code, msg = ex
  587. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value,
  588. analyse_type,
  589. error_code=code,
  590. error_msg=msg,
  591. ai_video_url=ai_url), timeout=2, is_ex=False)
  592. else:
  593. if ai_url is None or len(ai_url) == 0:
  594. raise ServiceException(ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[0],
  595. ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[1])
  596. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.SUCCESS.value,
  597. analyse_type,
  598. progress=success_progess,
  599. ai_video_url=ai_url), timeout=2, is_ex=False)
  600. except ServiceException as s:
  601. logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id)
  602. exc = s.code, s.msg
  603. except Exception:
  604. logger.error("服务异常: {}, requestId: {},", format_exc(), request_id)
  605. exc = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  606. finally:
  607. if push_process and push_process.is_alive():
  608. put_queue(push_queue, (2, 'stop_ex'), timeout=1)
  609. push_process.join(timeout=120)
  610. if pull_process and pull_process.is_alive():
  611. pull_process.sendCommand({"command": 'stop_ex'})
  612. pull_process.sendCommand({"command": 'stop'})
  613. pull_process.join(timeout=120)
  614. if hb_thread and hb_thread.is_alive():
  615. put_queue(hb_queue, {"command": "stop"}, timeout=1)
  616. hb_thread.join(timeout=120)
  617. if exc:
  618. code, msg = exc
  619. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value,
  620. analyse_type,
  621. error_code=code,
  622. error_msg=msg,
  623. ai_video_url=ai_url), timeout=2, is_ex=False)
  624. self.clear_queue()
  625. '''
  626. Image recognition
  627. '''
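# Illustrative launch sketch for the class below. It only shows the constructor contract
# (fb_queue, msg, analyse_type, context); the queue, request_id, URLs, model code and
# context values are placeholder assumptions, not real data.
#
#   from multiprocessing import Queue
#   fb_queue = Queue()
#   msg = {"request_id": "req-001",
#          "image_urls": ["https://example.com/1.jpg"],
#          "models": [{"code": "001", "is_image": "1",
#                      "categories": [{"id": "0", "config": {}}]}]}
#   context = {...}  # base_dir/env/video/service config loaded elsewhere
#   p = PhotosIntelligentRecognitionProcess(fb_queue, msg, AnalysisType.IMAGE.value, context)
#   p.start()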
  628. class PhotosIntelligentRecognitionProcess(Process):
  629. __slots__ = ("_fb_queue", "_msg", "_analyse_type", "_context", "_image_queue")
  630. def __init__(self, *args):
  631. super().__init__()
  632. self._fb_queue, self._msg, self._analyse_type, self._context = args
  633. self._image_queue = Queue()
  634. put_queue(self._fb_queue, message_feedback(self._msg["request_id"], AnalysisStatus.WAITING.value,
  635. self._analyse_type, progress=init_progess), timeout=2, is_ex=True)
  636. self.build_logo(self._msg, self._context)
  637. @staticmethod
  638. def build_logo(msg, context):
  639. logo = None
  640. if context["video"]["video_add_water"]:
  641. logo = msg.get("logo_url")
  642. if logo is not None and len(logo) > 0:
  643. logo = url2Array(logo, enable_ex=False)
  644. if logo is None:
  645. logo = cv2.imread(join(context['base_dir'], "image/logo.png"), -1)
  646. context['logo'] = logo
  647. def epidemic_prevention(self, imageUrl, model, orc, request_id):
  648. try:
  649. # modeType, allowedList, new_device, model, par, img_type
  650. model_conf, code = model
  651. modeType, allowedList, new_device, model, par, img_type = model_conf
  652. image = url2Array(imageUrl)
  653. param = [image, new_device, model, par, img_type, request_id]
  654. dataBack = MODEL_CONFIG[code][3](param)
  655. if img_type == 'plate':
  656. carCode = ''
  657. if dataBack is None or dataBack.get("plateImage") is None or len(dataBack.get("plateImage")) == 0:
  658. result = orc.license_plate_recognition(image, request_id)
  659. score = ''
  660. if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0:
  661. logger.error("车牌识别为空: {}", result)
  662. carCode = ''
  663. else:
  664. for word in result.get("words_result"):
  665. if word is not None and word.get("number") is not None:
  666. if len(carCode) == 0:
  667. carCode = word.get("number")
  668. else:
  669. carCode = carCode + "," + word.get("number")
  670. else:
  671. result = orc.license_plate_recognition(dataBack.get("plateImage")[0], request_id)
  672. score = dataBack.get("plateImage")[1]
  673. if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0:
  674. result = orc.license_plate_recognition(image, request_id)
  675. if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0:
  676. logger.error("车牌识别为空: {}", result)
  677. carCode = ''
  678. else:
  679. for word in result.get("words_result"):
  680. if word is not None and word.get("number") is not None:
  681. if len(carCode) == 0:
  682. carCode = word.get("number")
  683. else:
  684. carCode = carCode + "," + word.get("number")
  685. else:
  686. for word in result.get("words_result"):
  687. if word is not None and word.get("number") is not None:
  688. if len(carCode) == 0:
  689. carCode = word.get("number")
  690. else:
  691. carCode = carCode + "," + word.get("number")
  692. if len(carCode) > 0:
  693. plate_result = {'type': str(3), 'modelCode': code, 'carUrl': imageUrl,
  694. 'carCode': carCode,
  695. 'score': score}
  696. put_queue(self._fb_queue, message_feedback(request_id,
  697. AnalysisStatus.RUNNING.value,
  698. AnalysisType.IMAGE.value, "", "",
  699. '',
  700. imageUrl,
  701. imageUrl,
  702. str(code),
  703. str(3),
  704. plate_result), timeout=2, is_ex=True)
  705. if img_type == 'code':
  706. if dataBack is None or dataBack.get("type") is None:
  707. return
  708. # 行程码
  709. if dataBack.get("type") == 1 and 1 in allowedList:
  710. # 手机号
  711. if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0:
  712. phoneNumberRecognition = ''
  713. phone_score = ''
  714. else:
  715. phone = orc.universal_text_recognition(dataBack.get("phoneNumberImage")[0], request_id)
  716. phone_score = dataBack.get("phoneNumberImage")[1]
  717. if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0:
  718. logger.error("手机号识别为空: {}", phone)
  719. phoneNumberRecognition = ''
  720. else:
  721. phoneNumberRecognition = phone.get("words_result")
  722. if dataBack.get("cityImage") is None or len(dataBack.get("cityImage")) == 0:
  723. cityRecognition = ''
  724. city_score = ''
  725. else:
  726. city = orc.universal_text_recognition(dataBack.get("cityImage")[0], request_id)
  727. city_score = dataBack.get("cityImage")[1]
  728. if city is None or city.get("words_result") is None or len(city.get("words_result")) == 0:
  729. logger.error("城市识别为空: {}", city)
  730. cityRecognition = ''
  731. else:
  732. cityRecognition = city.get("words_result")
  733. if len(phoneNumberRecognition) > 0 or len(cityRecognition) > 0:
  734. trip_result = {'type': str(1),
  735. 'modelCode': code,
  736. 'imageUrl': imageUrl,
  737. 'phoneNumberRecognition': phoneNumberRecognition,
  738. 'phone_sorce': phone_score,
  739. 'cityRecognition': cityRecognition,
  740. 'city_score': city_score}
  741. put_queue(self._fb_queue, message_feedback(request_id,
  742. AnalysisStatus.RUNNING.value,
  743. AnalysisType.IMAGE.value, "", "",
  744. '',
  745. imageUrl,
  746. imageUrl,
  747. str(code),
  748. str(1),
  749. trip_result), timeout=2, is_ex=True)
  750. if dataBack.get("type") == 2 and 2 in allowedList:
  751. if dataBack.get("nameImage") is None or len(dataBack.get("nameImage")) == 0:
  752. nameRecognition = ''
  753. name_score = ''
  754. else:
  755. name = orc.universal_text_recognition(dataBack.get("nameImage")[0], request_id)
  756. name_score = dataBack.get("nameImage")[1]
  757. if name is None or name.get("words_result") is None or len(name.get("words_result")) == 0:
  758. logger.error("名字识别为空: {}", name)
  759. nameRecognition = ''
  760. else:
  761. nameRecognition = name.get("words_result")
  762. if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0:
  763. phoneNumberRecognition = ''
  764. phone_score = ''
  765. else:
  766. phone = orc.universal_text_recognition(dataBack.get("phoneNumberImage")[0], request_id)
  767. phone_score = dataBack.get("phoneNumberImage")[1]
  768. if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0:
  769. logger.error("手机号识别为空: {}", phone)
  770. phoneNumberRecognition = ''
  771. else:
  772. phoneNumberRecognition = phone.get("words_result")
  773. if dataBack.get("hsImage") is None or len(dataBack.get("hsImage")) == 0:
  774. hsRecognition = ''
  775. hs_score = ''
  776. else:
  777. hs = orc.universal_text_recognition(dataBack.get("hsImage")[0], request_id)
  778. hs_score = dataBack.get("hsImage")[1]
  779. if hs is None or hs.get("words_result") is None or len(hs.get("words_result")) == 0:
  780. logger.error("核酸识别为空: {}", hs)
  781. hsRecognition = ''
  782. else:
  783. hsRecognition = hs.get("words_result")
  784. if len(nameRecognition) > 0 or len(phoneNumberRecognition) > 0 or len(hsRecognition) > 0:
  785. healthy_result = {'type': str(2),
  786. 'modelCode': code,
  787. 'imageUrl': imageUrl,
  788. 'color': dataBack.get("color"),
  789. 'nameRecognition': nameRecognition,
  790. 'name_score': name_score,
  791. 'phoneNumberRecognition': phoneNumberRecognition,
  792. 'phone_score': phone_score,
  793. 'hsRecognition': hsRecognition,
  794. 'hs_score': hs_score}
  795. put_queue(self._fb_queue, message_feedback(request_id,
  796. AnalysisStatus.RUNNING.value,
  797. AnalysisType.IMAGE.value, "", "",
  798. '',
  799. imageUrl,
  800. imageUrl,
  801. str(code),
  802. str(2),
  803. healthy_result), timeout=2, is_ex=True)
  804. except ServiceException as s:
  805. raise s
  806. except Exception as e:
  807. logger.error("模型分析异常: {}, requestId: {}", format_exc(), request_id)
  808. raise e
  809. '''
  810. Epidemic-prevention model
  811. '''
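# The `model` argument of epidemicPrevention below is a (model_conf, code) tuple from
# get_model; for this model family model_conf unpacks to
# (modeType, allowedList, new_device, model, par, img_type), as epidemic_prevention above
# shows. img_type 'plate' drives licence-plate OCR (feedback type 3), while img_type
# 'code' drives trip-code (type 1) and health-code (type 2) OCR.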
  812. def epidemicPrevention(self, imageUrls, model, base_dir, env, request_id):
  813. with ThreadPoolExecutor(max_workers=2) as t:
  814. orc = OcrBaiduSdk(base_dir, env)
  815. obj_list = []
  816. for imageUrl in imageUrls:
  817. obj = t.submit(self.epidemic_prevention, imageUrl, model, orc, request_id)
  818. obj_list.append(obj)
  819. for r in obj_list:
  820. r.result(60)
  821. def image_recognition(self, imageUrl, mod, image_queue, logo, request_id):
  822. try:
  823. model_conf, code = mod
  824. model_param = model_conf[1]
  825. image = url2Array(imageUrl)
  826. MODEL_CONFIG[code][2](image.shape[1], image.shape[0], model_conf)
  827. p_result = MODEL_CONFIG[code][3]([model_conf, image, request_id])[0]
  828. if p_result is None or len(p_result) < 3 or p_result[2] is None or len(p_result[2]) == 0:
  829. return
  830. if logo is not None:
  831. image = add_water_pic(image, logo, request_id)
  832. # (modeType, model_param, allowedList, names, rainbows)
  833. allowedList = model_conf[2]
  834. label_arraylist = model_param['label_arraylist']
  835. font_config = model_param['font_config']
  836. rainbows = model_conf[4]
  837. det_xywh = {code: {}}
  838. ai_result_list = p_result[2]
  839. for ai_result in ai_result_list:
  840. box, score, cls = xywh2xyxy2(ai_result)
  841. # 如果检测目标在识别任务中,继续处理
  842. if cls in allowedList:
  843. label_array = label_arraylist[cls]
  844. color = rainbows[cls]
  845. cd = det_xywh[code].get(cls)
  846. if cd is None:
  847. det_xywh[code][cls] = [[cls, box, score, label_array, color]]
  848. else:
  849. det_xywh[code][cls].append([cls, box, score, label_array, color])
  850. if len(det_xywh[code]) > 0:
  851. put_queue(image_queue, (1, (det_xywh, imageUrl, image, font_config, "")), timeout=2, is_ex=False)
  852. except ServiceException as s:
  853. raise s
  854. except Exception as e:
  855. logger.error("模型分析异常: {}, requestId: {}", format_exc(), self._msg.get("request_id"))
  856. raise e
  857. def publicIdentification(self, imageUrls, mod, image_queue, logo, request_id):
  858. with ThreadPoolExecutor(max_workers=2) as t:
  859. obj_list = []
  860. for imageUrl in imageUrls:
  861. obj = t.submit(self.image_recognition, imageUrl, mod, image_queue, logo, request_id)
  862. obj_list.append(obj)
  863. for r in obj_list:
  864. r.result(60)
  865. '''
  866. 1. imageUrls: array of image URLs (multiple images)
  867. 2. mod: model object
  868. 3. image_queue: image queue
  869. '''
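# Messages the helpers below push to image_queue (and that the image upload thread
# presumably consumes) have the shape (1, (det_xywh, image_url, frame, font_config,
# extra_result)) for detections and (2, 'stop') to finish, where det_xywh is
# {model_code: {target: [[target, box, score, label_array, color], ...]}}.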
  870. def baiduRecognition(self, imageUrls, mod, image_queue, logo, request_id):
  871. with ThreadPoolExecutor(max_workers=2) as t:
  872. thread_result = []
  873. for imageUrl in imageUrls:
  874. obj = t.submit(self.baidu_recognition, imageUrl, mod, image_queue, logo, request_id)
  875. thread_result.append(obj)
  876. for r in thread_result:
  877. r.result(60)
  878. def baidu_recognition(self, imageUrl, mod, image_queue, logo, request_id):
  879. with ThreadPoolExecutor(max_workers=2) as t:
  880. try:
  881. # modeType, aipImageClassifyClient, aipBodyAnalysisClient, allowedList, rainbows,
  882. # vehicle_names, person_names, requestId
  883. model_conf, code = mod
  884. allowedList = model_conf[3]
  885. rainbows = model_conf[4]
  886. # 图片转数组
  887. img = url2Array(imageUrl)
  888. vehicle_label_arrays, person_label_arrays, font_config = MODEL_CONFIG[code][2](img.shape[1],
  889. img.shape[0],
  890. model_conf)
  891. obj_list = []
  892. for target in allowedList:
  893. parm = [target, imageUrl, model_conf[1], model_conf[2], request_id]
  894. result = t.submit(self.baidu_method, code, parm, img, image_queue, vehicle_label_arrays,
  895. person_label_arrays, font_config, rainbows, logo)
  896. obj_list.append(result)
  897. for r in obj_list:
  898. r.result(60)
  899. except ServiceException as s:
  900. raise s
  901. except Exception as e:
  902. logger.error("百度AI分析异常: {}, requestId: {}", format_exc(), request_id)
  903. raise e
  904. @staticmethod
  905. def baidu_method(code, parm, img, image_queue, vehicle_label_arrays, person_label_arrays, font_config,
  906. rainbows, logo):
  907. # [target, url, aipImageClassifyClient, aipBodyAnalysisClient, requestId]
  908. request_id = parm[4]
  909. target = parm[0]
  910. image_url = parm[1]
  911. result = MODEL_CONFIG[code][3](parm)
  912. if target == BaiduModelTarget.VEHICLE_DETECTION.value[1] and result is not None:
  913. vehicleInfo = result.get("vehicle_info")
  914. if vehicleInfo is not None and len(vehicleInfo) > 0:
  915. det_xywh = {code: {}}
  916. copy_frame = img.copy()
  917. for i, info in enumerate(vehicleInfo):
  918. value = VehicleEnumVALUE.get(info.get("type"))
  919. if value is None:
  920. logger.error("车辆识别出现未支持的目标类型!type:{}, requestId:{}", info.get("type"), request_id)
  921. return
  922. target_num = value.value[2]
  923. label_array = vehicle_label_arrays[target_num]
  924. color = rainbows[target_num]
  925. left_top = (int(info.get("location").get("left")), int(info.get("location").get("top")))
  926. right_top = (int(info.get("location").get("left")) + int(info.get("location").get("width")),
  927. int(info.get("location").get("top")))
  928. right_bottom = (int(info.get("location").get("left")) + int(info.get("location").get("width")),
  929. int(info.get("location").get("top")) + int(info.get("location").get("height")))
  930. left_bottom = (int(info.get("location").get("left")),
  931. int(info.get("location").get("top")) + int(info.get("location").get("height")))
  932. box = [left_top, right_top, right_bottom, left_bottom]
  933. score = float("%.2f" % info.get("probability"))
  934. if logo is not None:
  935. copy_frame = add_water_pic(copy_frame, logo, request_id)
  936. if det_xywh[code].get(target) is None:
  937. det_xywh[code][target] = [[target, box, score, label_array, color]]
  938. else:
  939. det_xywh[code][target].append([target, box, score, label_array, color])
  940. info["id"] = str(i)
  941. if len(det_xywh[code]) > 0:
  942. result["type"] = str(target)
  943. result["modelCode"] = code
  944. put_queue(image_queue, (1, (det_xywh, image_url, copy_frame, font_config, result)), timeout=2,
  945. is_ex=True)
  946. # 人体识别
  947. if target == BaiduModelTarget.HUMAN_DETECTION.value[1] and result is not None:
  948. personInfo = result.get("person_info")
  949. personNum = result.get("person_num")
  950. if personNum is not None and personNum > 0 and personInfo is not None and len(personInfo) > 0:
  951. det_xywh = {code: {}}
  952. copy_frame = img.copy()
  953. for i, info in enumerate(personInfo):
  954. left_top = (int(info.get("location").get("left")), int(info.get("location").get("top")))
  955. right_top = (int(info.get("location").get("left")) + int(info.get("location").get("width")),
  956. int(info.get("location").get("top")))
  957. right_bottom = (int(info.get("location").get("left")) + int(info.get("location").get("width")),
  958. int(info.get("location").get("top")) + int(info.get("location").get("height")))
  959. left_bottom = (int(info.get("location").get("left")),
  960. int(info.get("location").get("top")) + int(info.get("location").get("height")))
  961. box = [left_top, right_top, right_bottom, left_bottom]
  962. score = float("%.2f" % info.get("location").get("score"))
  963. label_array = person_label_arrays[0]
  964. color = rainbows[0]
  965. if logo is not None:
  966. copy_frame = add_water_pic(copy_frame, logo, request_id)
  967. if det_xywh[code].get(target) is None:
  968. det_xywh[code][target] = [[target, box, score, label_array, color]]
  969. else:
  970. det_xywh[code][target].append([target, box, score, label_array, color])
  971. info["id"] = str(i)
  972. if len(det_xywh[code]) > 0:
  973. result["type"] = str(target)
  974. result["modelCode"] = code
  975. put_queue(image_queue, (1, (det_xywh, image_url, copy_frame, font_config, result)), timeout=2)
  976. # 人流量
  977. if target == BaiduModelTarget.PEOPLE_COUNTING.value[1] and result is not None:
  978. base64Image = result.get("image")
  979. if base64Image is not None and len(base64Image) > 0:
  980. baiduImage = base64.b64decode(base64Image)
  981. result["type"] = str(target)
  982. result["modelCode"] = code
  983. del result["image"]
  984. put_queue(image_queue, (1, (None, image_url, baiduImage, None, result)), timeout=2)
  985. @staticmethod
  986. def start_File_upload(fb_queue, context, msg, image_queue, analyse_type):
  987. image_thread = ImageTypeImageFileUpload(fb_queue, context, msg, image_queue, analyse_type)
  988. image_thread.setDaemon(True)
  989. image_thread.start()
  990. return image_thread
  991. def run(self):
  992. fb_queue, msg, analyse_type, context = self._fb_queue, self._msg, self._analyse_type, self._context
  993. request_id, logo, image_queue = msg["request_id"], context['logo'], self._image_queue
  994. base_dir, env = context["base_dir"], context["env"]
  995. imageUrls = msg["image_urls"]
  996. image_thread = None
  997. with ThreadPoolExecutor(max_workers=2) as t:
  998. try:
  999. init_log(base_dir, env)
  1000. logger.info("开始启动图片识别进程, requestId: {}", request_id)
  1001. model_array = get_model(msg, context, analyse_type)
  1002. image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)
  1003. task_list = []
  1004. for model in model_array:
  1005. # 百度模型逻辑
  1006. if model[1] == ModelType.BAIDU_MODEL.value[1]:
  1007. result = t.submit(self.baiduRecognition, imageUrls, model, image_queue, logo, request_id)
  1008. task_list.append(result)
  1009. # 防疫模型
  1010. elif model[1] == ModelType.EPIDEMIC_PREVENTION_MODEL.value[1]:
  1011. result = t.submit(self.epidemicPrevention, imageUrls, model, base_dir, env, request_id)
  1012. task_list.append(result)
  1013. # 车牌模型
  1014. elif model[1] == ModelType.PLATE_MODEL.value[1]:
  1015. result = t.submit(self.epidemicPrevention, imageUrls, model, base_dir, env, request_id)
  1016. task_list.append(result)
  1017. else:
  1018. result = t.submit(self.publicIdentification, imageUrls, model, image_queue, logo, request_id)
  1019. task_list.append(result)
  1020. for r in task_list:
  1021. r.result(60)
  1022. if image_thread and not image_thread.is_alive():
  1023. raise Exception("图片识别图片上传线程异常停止!!!")
  1024. if image_thread and image_thread.is_alive():
  1025. put_queue(image_queue, (2, 'stop'), timeout=2)
  1026. image_thread.join(120)
  1027. logger.info("图片进程任务完成,requestId:{}", request_id)
  1028. put_queue(fb_queue, message_feedback(request_id,
  1029. AnalysisStatus.SUCCESS.value,
  1030. analyse_type,
  1031. progress=success_progess), timeout=2, is_ex=True)
  1032. except ServiceException as s:
  1033. logger.error("图片分析异常,异常编号:{}, 异常描述:{}, requestId:{}", s.code, s.msg, request_id)
  1034. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value,
  1035. analyse_type,
  1036. s.code,
  1037. s.msg), timeout=2)
  1038. except Exception:
  1039. logger.error("图片分析异常: {}, requestId:{}", format_exc(), request_id)
  1040. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value,
  1041. analyse_type,
  1042. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  1043. ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=2)
  1044. finally:
  1045. if image_thread and image_thread.is_alive():
  1046. clear_queue(image_queue)
  1047. put_queue(image_queue, (2, 'stop'), timeout=2)
  1048. image_thread.join(120)
  1049. clear_queue(image_queue)
  1050. class ScreenRecordingProcess(Process):
  1051. __slots__ = ('_fb_queue', '_context', '_msg', '_analysisType', '_event_queue', '_hb_queue', '_pull_queue')
  1052. def __init__(self, *args):
  1053. super().__init__()
  1054. # 传参
  1055. self._fb_queue, self._context, self._msg, self._analysisType = args
  1056. self._event_queue, self._hb_queue, self._pull_queue = Queue(), Queue(), Queue(10)
  1057. put_queue(self._fb_queue,
  1058. recording_feedback(self._msg["request_id"], RecordingStatus.RECORDING_WAITING.value[0]),
  1059. timeout=1, is_ex=True)
  1060. def sendEvent(self, result):
  1061. put_queue(self._event_queue, result, timeout=2, is_ex=True)
  1062. @staticmethod
  1063. def start_pull_stream_thread(msg, context, pull_queue, hb_queue, fb_queue, frame_num):
  1064. pullThread = RecordingPullStreamThread(msg, context, pull_queue, hb_queue, fb_queue, frame_num)
  1065. pullThread.setDaemon(True)
  1066. pullThread.start()
  1067. return pullThread
  1068. @staticmethod
  1069. def start_hb_thread(fb_queue, hb_queue, request_id):
  1070. hb = RecordingHeartbeat(fb_queue, hb_queue, request_id)
  1071. hb.setDaemon(True)
  1072. hb.start()
  1073. return hb
  1074. @staticmethod
  1075. def check(start_time, service_timeout, pull_thread, hb_thread, request_id):
  1076. if time() - start_time > service_timeout:
  1077. logger.error("录屏超时, requestId: {}", request_id)
  1078. raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
  1079. ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
  1080. if pull_thread and not pull_thread.is_alive():
  1081. logger.info("录屏拉流线程停止异常, requestId: {}", request_id)
  1082. raise Exception("录屏拉流线程异常停止")
  1083. if hb_thread and not hb_thread.is_alive():
  1084. logger.info("录屏心跳线程异常停止, requestId: {}", request_id)
  1085. raise Exception("录屏心跳线程异常停止")
  1086. def run(self):
  1087. msg, context = self._msg, self._context
  1088. request_id, push_url = msg['request_id'], msg.get('push_url')
  1089. pull_queue, fb_queue, hb_queue, event_queue = self._pull_queue, self._fb_queue, self._hb_queue, \
  1090. self._event_queue
  1091. base_dir, env, service_timeout = context['base_dir'], context['env'], int(context["service"]["timeout"])
  1092. pre_path, end_path = '%s/%s%s' % (base_dir, context["video"]["file_path"], now_date_to_str(YMDHMSF)), \
  1093. '%s%s' % (request_id, ".mp4")
  1094. orFilePath = '%s%s%s' % (pre_path, "_on_or_", end_path)
  1095. pull_thread, hb_thread = None, None
  1096. or_write_status, p_push_status = [0, 0], [0, 0]
  1097. or_video_file, push_p = None, None
  1098. ex = None
  1099. try:
  1100. # 初始化日志
  1101. init_log(base_dir, env)
  1102. # 启动拉流线程
  1103. pull_thread = self.start_pull_stream_thread(msg, context, pull_queue, hb_queue, fb_queue, 25)
  1104. hb_thread = self.start_hb_thread(fb_queue, hb_queue, request_id)
  1105. start_time = time()
  1106. with ThreadPoolExecutor(max_workers=2) as t:
  1107. while True:
  1108. # 检查拉流线程和心跳线程
  1109. self.check(start_time, service_timeout, pull_thread, hb_thread, request_id)
  1110. # 判断是否需要停止录屏
  1111. event_result = get_no_block_queue(event_queue)
  1112. if event_result is not None:
  1113. cmdStr = event_result.get("command")
  1114. # 接收到停止指令
  1115. if 'stop' == cmdStr:
  1116. logger.info("录屏任务开始停止, requestId: {}", request_id)
  1117. pull_thread.sendEvent({"command": "stop"})
  1118. pull_result = get_no_block_queue(pull_queue)
  1119. if pull_result is None:
  1120. sleep(1)
  1121. continue
  1122. if pull_result[0] == 1:
  1123. close_all_p(push_p, or_video_file, None, request_id)
  1124. pull_thread.sendEvent({"command": "stop"})
  1125. pull_thread.join(180)
  1126. raise ServiceException(pull_result[1], pull_result[2])
  1127. elif pull_result[0] == 2:
  1128. close_all_p(push_p, or_video_file, None, request_id)
  1129. pull_thread.sendEvent({"command": "stop"})
  1130. pull_thread.join(180)
  1131. break
  1132. elif pull_result[0] == 4:
  1133. frame_list, frame_index_list, all_frames = pull_result[1]
  1134. if len(frame_list) > 0:
  1135. for i, frame in enumerate(frame_list):
  1136. if frame_index_list[i] % 300 == 0 and frame_index_list[i] < all_frames:
  1137. task_process = "%.2f" % (float(frame_index_list[i]) / float(all_frames))
  1138. put_queue(hb_queue, {"progress": task_process}, timeout=1)
  1139. write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file,
  1140. or_write_status, request_id)
  1141. if push_url is not None and len(push_url) > 0:
  1142. push_p_result = t.submit(push_video_stream, frame, push_p, push_url, p_push_status,
  1143. request_id)
  1144. push_p = push_p_result.result()
  1145. or_video_file = write_or_video_result.result()
  1146. else:
  1147. raise Exception("未知拉流状态异常!")
  1148. logger.info("录屏线程任务完成,requestId:{}", self._msg.get("request_id"))
  1149. except ServiceException as s:
  1150. logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, self._msg.get("request_id"))
  1151. ex = s.code, s.msg
  1152. except Exception:
  1153. logger.error("服务异常: {}, requestId: {},", format_exc(), self._msg.get("request_id"))
  1154. ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  1155. finally:
  1156. or_url = ""
  1157. exn = None
  1158. try:
  1159. # 关闭推流管道, 原视频写流客户端
  1160. close_all_p(push_p, or_video_file, None, request_id)
  1161. # 关闭拉流线程
  1162. if pull_thread and pull_thread.is_alive():
  1163. pull_thread.sendEvent({"command": "stop_ex"})
  1164. pull_thread.sendEvent({"command": "stop"})
  1165. pull_thread.join(120)
  1166. # 判断是否需要上传视频
  1167. if exists(orFilePath) and getsize(orFilePath) > 100:
  1168. or_url = self.upload_video(base_dir, env, request_id, orFilePath)
  1169. if or_url is None or len(or_url) == 0:
  1170. logger.error("原视频或AI视频播放上传VOD失败!, requestId: {}", request_id)
  1171. raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
  1172. ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
  1173. # 停止心跳线程
  1174. if hb_thread and hb_thread.is_alive():
  1175. put_queue(hb_queue, {"command": "stop"}, timeout=10, is_ex=False)
  1176. hb_thread.join(timeout=120)
  1177. if exists(orFilePath):
  1178. logger.info("开始删除原视频, orFilePath: {}, requestId: {}", orFilePath, request_id)
  1179. os.remove(orFilePath)
  1180. logger.info("删除原视频成功, orFilePath: {}, requestId: {}", orFilePath, request_id)
  1181. # 如果有异常, 检查是否有原视频和AI视频,有则上传,响应失败
  1182. if ex:
  1183. code, msg = ex
  1184. put_queue(fb_queue, recording_feedback(request_id, RecordingStatus.RECORDING_FAILED.value[0],
  1185. error_code=code,
  1186. error_msg=msg,
  1187. video_url=or_url), timeout=10, is_ex=False)
  1188. else:
  1189. if or_url is None or len(or_url) == 0:
  1190. raise ServiceException(ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[0],
  1191. ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[1])
  1192. put_queue(fb_queue, recording_feedback(request_id, RecordingStatus.RECORDING_SUCCESS.value[0],
  1193. progress=success_progess,
  1194. video_url=or_url), timeout=10, is_ex=False)
  1195. except ServiceException as s:
  1196. exn = s.code, s.msg
  1197. except Exception:
  1198. logger.error("异常:{}, requestId: {}", format_exc(), request_id)
  1199. exn = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  1200. finally:
  1201. if pull_thread and pull_thread.is_alive():
  1202. pull_thread.sendEvent({"command": "stop"})
  1203. pull_thread.join(120)
  1204. if hb_thread and hb_thread.is_alive():
  1205. put_queue(hb_queue, {"command": "stop"}, timeout=10, is_ex=False)
  1206. hb_thread.join(timeout=120)
  1207. self.clear_queue_end()
  1208. if exn:
  1209. code, msg = exn
  1210. put_queue(fb_queue, recording_feedback(request_id, RecordingStatus.RECORDING_FAILED.value[0],
  1211. error_code=code,
  1212. error_msg=msg,
  1213. video_url=or_url), timeout=10, is_ex=False)
  1214. def clear_queue_end(self):
  1215. clear_queue(self._event_queue)
  1216. clear_queue(self._hb_queue)
  1217. clear_queue(self._pull_queue)
  1218. @staticmethod
  1219. def upload_video(base_dir, env, request_id, orFilePath):
  1220. aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id)
  1221. upload_video_thread_or = Common(aliyunVodSdk.get_play_url, orFilePath, "or_online_%s" % request_id)
  1222. upload_video_thread_or.setDaemon(True)
  1223. upload_video_thread_or.start()
  1224. or_url = upload_video_thread_or.get_result()
  1225. return or_url
  1226. """
  1227. "models": [{
  1228. "code": "模型编号",
  1229. "categories":[{
  1230. "id": "模型id",
  1231. "config": {
  1232. "k1": "v1",
  1233. "k2": "v2"
  1234. }
  1235. }]
  1236. }]
  1237. """
  1238. def get_model(msg, context, analyse_type):
  1239. # 初始变量
  1240. request_id, base_dir, gpu_name, env = msg["request_id"], context["base_dir"], context["gpu_name"], context["env"]
  1241. models, model_num_limit = msg["models"], context["service"]["model"]['limit']
  1242. try:
  1243. # 实时、离线元组
  1244. analyse_type_tuple = (AnalysisType.ONLINE.value, AnalysisType.OFFLINE.value)
  1245. # (实时、离线)检查模型组合, 目前只支持3个模型组合
  1246. if analyse_type in analyse_type_tuple:
  1247. if len(models) > model_num_limit:
  1248. raise ServiceException(ExceptionType.MODEL_GROUP_LIMIT_EXCEPTION.value[0],
  1249. ExceptionType.MODEL_GROUP_LIMIT_EXCEPTION.value[1])
  1250. modelArray, codeArray = [], set()
  1251. for model in models:
  1252. # 模型编码
  1253. code = model["code"]
  1254. # 检验code是否重复
  1255. if code in codeArray:
  1256. raise ServiceException(ExceptionType.MODEL_DUPLICATE_EXCEPTION.value[0],
  1257. ExceptionType.MODEL_DUPLICATE_EXCEPTION.value[1])
  1258. codeArray.add(code)
  1259. # 检测目标数组
  1260. needed_objectsIndex = list(set([int(category["id"]) for category in model["categories"]]))
  1261. logger.info("模型编号: {}, 检查目标: {}, requestId: {}", code, needed_objectsIndex, request_id)
  1262. model_method = MODEL_CONFIG.get(code)
  1263. if model_method is None:
  1264. logger.error("未匹配到对应的模型, requestId:{}", request_id)
  1265. raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0],
  1266. ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[1])
  1267. # 检查cpu资源、gpu资源
  1268. check_cpu(base_dir, request_id)
  1269. gpu_ids = check_gpu_resource(request_id)
  1270. # 如果实时识别、离线识别
  1271. if analyse_type in analyse_type_tuple:
  1272. if model["is_video"] == "1":
  1273. mod = model_method[0](gpu_ids[0], needed_objectsIndex, request_id, gpu_name, base_dir, env)
  1274. modelArray.append((mod.model_conf, code))
  1275. else:
  1276. raise ServiceException(ExceptionType.MODEL_NOT_SUPPORT_VIDEO_EXCEPTION.value[0],
  1277. ExceptionType.MODEL_NOT_SUPPORT_VIDEO_EXCEPTION.value[1],
  1278. model_method[1].value[2])
  1279. # 如果是图片识别
  1280. if analyse_type == AnalysisType.IMAGE.value:
  1281. if model["is_image"] == "1":
  1282. mod = model_method[0](gpu_ids[0], needed_objectsIndex, request_id, gpu_name, base_dir, env)
  1283. modelArray.append((mod.model_conf, code))
  1284. else:
  1285. raise ServiceException(ExceptionType.MODEL_NOT_SUPPORT_IMAGE_EXCEPTION.value[0],
  1286. ExceptionType.MODEL_NOT_SUPPORT_IMAGE_EXCEPTION.value[1],
  1287. model_method[1].value[2])
  1288. if len(modelArray) == 0:
  1289. raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0],
  1290. ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[1])
  1291. return modelArray
  1292. except ServiceException as s:
  1293. raise s
  1294. except Exception:
  1295. logger.error("模型配置处理异常: {}, request_id: {}", format_exc(), request_id)
  1296. raise ServiceException(ExceptionType.MODEL_LOADING_EXCEPTION.value[0],
  1297. ExceptionType.MODEL_LOADING_EXCEPTION.value[1])