Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

609 Zeilen
35KB

  1. # -*- coding: utf-8 -*-
  2. import json
  3. import os
  4. import time
  5. import copy
  6. from common import Constant
  7. from multiprocessing import Process, Queue
  8. from loguru import logger
  9. from enums.AnalysisStatusEnum import AnalysisStatus
  10. from enums.AnalysisTypeEnum import AnalysisType
  11. from enums.ExceptionEnum import ExceptionType
  12. from enums.ModelTypeEnum import ModelType
  13. from util import LogUtils, TimeUtils, ModelUtils, ImageUtils
  14. from util.Cv2Utils import Cv2Util
  15. from entity.FeedBack import message_feedback
  16. from util import AliyunSdk
  17. from concurrency.CommonThread import Common
  18. from concurrency.CommonProcess import CommonProcess
  19. from exception.CustomerException import ServiceException
  20. class IntelligentRecognitionProcess(Process):
  21. def __init__(self, cfg, config):
  22. super().__init__()
  23. self.fbQueue = cfg.get("fbQueue")
  24. self.eventQueue = Queue()
  25. self.content = cfg.get("content")
  26. self.msg = cfg.get("msg")
  27. self.imageQueue = cfg.get("imageQueue")
  28. self.gpu_ids = cfg.get("gpu_ids")
  29. self.pic = cfg.get("pic")
  30. self.logo = cfg.get("logo")
  31. self.config = config
  32. # 给本进程发送事件
  33. def sendEvent(self, eBody):
  34. while self.eventQueue.full():
  35. logger.info("时间队列已满, 2秒后重试!requestId:{}", self.msg.get("request_id"))
  36. time.sleep(2)
  37. self.eventQueue.put(eBody)
  38. # 获取下一个事件
  39. def getEvent(self):
  40. try:
  41. eBody = self.eventQueue.get(block=False)
  42. return eBody
  43. except Exception as e:
  44. pass
  45. # 推送执行结果
  46. def sendResult(self, result):
  47. while self.fbQueue.full():
  48. logger.info("问题反馈队列已满, 2秒后重试!requestId:{}", self.msg.get("request_id"))
  49. time.sleep(2)
  50. self.fbQueue.put(result)
  51. '''
  52. 实时任务进程
  53. '''
  54. class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
  55. # 停止任务方法
  56. def stop_task(self, cv2tool, orFilePath, aiFilePath, snalysisStatus):
  57. # 停止cv2相关配置
  58. cv2tool.stop_cv2()
  59. if not os.path.exists(orFilePath) or not os.path.exists(aiFilePath):
  60. logger.error("原视频或AI视频不存在!requestId:{}", self.msg.get("request_id"))
  61. raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0],
  62. ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[1])
  63. params1 = (orFilePath, "orOnLineVideo", self.content, logger, self.msg.get("request_id"))
  64. or_update_thread = Common(content=self.content, func=AliyunSdk.get_play_url, args=params1)
  65. params2 = (aiFilePath, "aiOnLineVideo", self.content, logger, self.msg.get("request_id"))
  66. ai_update_thread = Common(content=self.content, func=AliyunSdk.get_play_url, args=params2)
  67. or_update_thread.setDaemon(True)
  68. ai_update_thread.setDaemon(True)
  69. or_update_thread.start()
  70. ai_update_thread.start()
  71. or_play_url = or_update_thread.get_result()
  72. ai_play_url = ai_update_thread.get_result()
  73. if or_play_url is None or ai_play_url is None:
  74. logger.error("原视频或AI视频播放上传VOD失败!原视频播放地址:{}, AI播放地址: {}, requestId: {}", or_play_url,
  75. ai_play_url, self.msg.get("request_id"))
  76. raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
  77. ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
  78. self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
  79. self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), snalysisStatus,
  80. AnalysisType.ONLINE.value,
  81. progress=Constant.success_progess,
  82. original_url=or_play_url,
  83. sign_url=ai_play_url,
  84. analyse_time=TimeUtils.now_date_to_str())})
  85. def run(self):
  86. # 初始化日志
  87. LogUtils.init_log(self.content)
  88. # 程序开始时间
  89. start_time = time.time()
  90. mod, model_type_code, modelConfig = get_model((self.config, str(self.gpu_ids[0]), self.msg["models"]))
  91. # mod_thread = Common(None, func=get_model, args=(self.config, str(self.gpu_ids[0]), self.msg["models"]))
  92. # mod_thread.setDaemon(True)
  93. # mod_thread.start()
  94. # mod = None
  95. # model_type_code = None
  96. # modelConfig = None
  97. # 启动公共进程包含(图片上传线程,心跳线程,问题反馈线程)
  98. commonProcess = CommonProcess(self.fbQueue, None, self.content, self.msg, self.imageQueue,
  99. AnalysisType.ONLINE.value)
  100. commonProcess.start()
  101. orFilePath = None
  102. aiFilePath = None
  103. cv2tool = None
  104. try:
  105. # 定义原视频、AI视频保存名称
  106. random_time = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF)
  107. orFilePath = "{}{}{}{}{}".format(self.content["video"]["file_path"],
  108. random_time,
  109. "_on_or_",
  110. self.msg.get("request_id"),
  111. ".mp4")
  112. aiFilePath = "{}{}{}{}{}".format(self.content["video"]["file_path"],
  113. random_time,
  114. "_on_ai_",
  115. self.msg.get("request_id"),
  116. ".mp4")
  117. cv2tool = Cv2Util(self.msg.get('pull_url'), self.msg.get('push_url'), orFilePath, aiFilePath,
  118. self.msg.get("request_id"))
  119. cv2tool.build_cv2()
  120. # cv2重试初始化次数
  121. cv2_init_num = 1
  122. # 解决开始拉流失败问题,解决初始化fps,height,weight获取不到问题
  123. while True:
  124. event = self.getEvent()
  125. if event is not None and len(event) > 0:
  126. event_command = event.get("command")
  127. if 'stop' == event_command:
  128. logger.info("实时任务开始停止分析, requestId: {}", self.msg.get("request_id"))
  129. cv2tool.build_write()
  130. self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.SUCCESS.value)
  131. break
  132. if cv2tool.cap is None or not cv2tool.cap.isOpened():
  133. pull_stream_init_timeout = time.time() - start_time
  134. if pull_stream_init_timeout > int(self.content["service"]["cv2_pull_stream_timeout"]):
  135. logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout,
  136. self.msg.get("request_id"))
  137. raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0],
  138. ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1])
  139. logger.info("cv2开始拉流初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
  140. cv2_init_num += 1
  141. time.sleep(1)
  142. cv2tool.build_cv2()
  143. continue
  144. else:
  145. break
  146. cv2_init_num = 1
  147. cv2tool.build_write()
  148. high_score_image = {}
  149. step = int(self.content["service"]["frame_step"])
  150. pull_start_time = None
  151. read_start_time = None
  152. # 模型初始化次数
  153. # model = 0
  154. while True:
  155. end_time = time.time()
  156. create_task_time = end_time - start_time
  157. if create_task_time > int(self.content["service"]["timeout"]):
  158. logger.error("分析超时, 超时时间:{}, requestId: {}", create_task_time, self.msg.get("request_id"))
  159. raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0],
  160. ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1])
  161. if not commonProcess.is_alive():
  162. logger.info("图片上传、心跳、问题反馈进程停止异常, requestId: {}", self.msg.get("request_id"))
  163. raise Exception("图片上传、心跳、问题反馈进程异常停止")
  164. eBody = self.getEvent()
  165. if eBody is not None and len(eBody) > 0:
  166. cmdStr = eBody.get("command")
  167. # 接收到停止指令
  168. if 'stop' == cmdStr:
  169. if high_score_image is not None and len(high_score_image) > 0:
  170. for key in list(high_score_image.keys()):
  171. self.imageQueue.put({"image": high_score_image[key]})
  172. del high_score_image[key]
  173. logger.info("实时任务开始停止, requestId: {}", self.msg.get("request_id"))
  174. self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.SUCCESS.value)
  175. break
  176. # 检测是否断流
  177. if cv2tool.cap is None or not cv2tool.cap.isOpened():
  178. if pull_start_time is None:
  179. pull_start_time = time.time()
  180. # 默认1个小时
  181. pull_stream_timeout = time.time() - pull_start_time
  182. if pull_stream_timeout > int(self.content["service"]["cv2_read_stream_timeout"]):
  183. if high_score_image is not None and len(high_score_image) > 0:
  184. for key in list(high_score_image.keys()):
  185. self.imageQueue.put({"image": high_score_image[key]})
  186. del high_score_image[key]
  187. logger.info("拉流超时, 超时时间:{}, requestId:{}", pull_stream_timeout, self.msg.get("request_id"))
  188. self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.TIMEOUT.value)
  189. break
  190. logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
  191. cv2_init_num += 1
  192. time.sleep(1)
  193. cv2tool.build_cv2()
  194. continue
  195. pull_start_time = None
  196. is_opened, frame = cv2tool.getCap().read()
  197. if is_opened is None or not is_opened:
  198. if read_start_time is None:
  199. read_start_time = time.time()
  200. read_stream_timeout = time.time() - read_start_time
  201. if read_stream_timeout > int(self.content["service"]["cv2_read_stream_timeout"]):
  202. if high_score_image is not None and len(high_score_image) > 0:
  203. for key in list(high_score_image.keys()):
  204. self.imageQueue.put({"image": high_score_image[key]})
  205. del high_score_image[key]
  206. logger.info("运行中读流超时, 超时时间: {}, requestId: {}", read_stream_timeout,
  207. self.msg.get("request_id"))
  208. self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.TIMEOUT.value)
  209. break
  210. time.sleep(1)
  211. logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
  212. cv2_init_num += 1
  213. cv2tool.build_cv2()
  214. continue
  215. read_start_time = None
  216. # if mod is None and model == 0:
  217. # model += 1
  218. # logger.info("初始化模型: {}次, requestId: {}", model, self.msg.get("request_id"))
  219. # mod, model_type_code, modelConfig = mod_thread.get_result()
  220. # time00 = time.time()
  221. # 调用AI模型
  222. p_result, timeOut = mod.process(copy.deepcopy(frame), modelConfig)
  223. # time11 = time.time()
  224. # if time11 - time00 > 1:
  225. # logger.info("算法模型调度时间:{}s, requestId:{}", int(time11-time00), self.msg.get("request_id"))
  226. # AI推流
  227. if self.content["video"]["video_add_water"]:
  228. frame = self.pic.common_water_1(frame, self.logo)
  229. p_result[1] = self.pic.common_water_1(p_result[1], self.logo)
  230. frame_merge = cv2tool.video_merge(copy.deepcopy(frame), copy.deepcopy(p_result[1]))
  231. try:
  232. cv2tool.getOrVideoFile().write(frame)
  233. cv2tool.getAiVideoFile().write(frame_merge)
  234. cv2tool.getP().stdin.write(frame_merge.tostring())
  235. except Exception as e:
  236. current_retry_num = 0
  237. while True:
  238. try:
  239. cv2tool.build_p()
  240. cv2tool.getP().stdin.write(frame_merge.tostring())
  241. logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
  242. self.msg.get("request_id"))
  243. break
  244. except Exception as e:
  245. current_retry_num += 1
  246. logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
  247. current_retry_num, self.msg.get("request_id"))
  248. if current_retry_num > 3:
  249. logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.msg.get("request_id"))
  250. break
  251. # # 问题图片加入队列, 暂时写死,后期修改为真实问题
  252. cf = int(cv2tool.cap.get(1))
  253. if p_result[2] is not None and len(p_result[2]) > 0:
  254. for ai_analyse_result in p_result[2]:
  255. order = str(int(ai_analyse_result[0]))
  256. high_result = high_score_image.get(order)
  257. conf_c = ai_analyse_result[5]
  258. if high_result is None and conf_c >= float(self.content["service"]["frame_score"]):
  259. high_score_image[order] = {
  260. "or_frame": frame,
  261. "ai_frame": p_result[1],
  262. "current_frame": cf,
  263. "last_frame": cf + step,
  264. "progress": "",
  265. "mode_service": "online",
  266. "model_type_code": model_type_code,
  267. "model_detection_code": order,
  268. "socre": conf_c
  269. }
  270. else:
  271. if conf_c >= float(self.content["service"]["frame_score"]) and conf_c > high_result.get("socre"):
  272. high_score_image[order] = {
  273. "or_frame": frame,
  274. "ai_frame": p_result[1],
  275. "current_frame": cf,
  276. "last_frame": cf + step,
  277. "progress": "",
  278. "mode_service": "online",
  279. "model_type_code": model_type_code,
  280. "model_detection_code": order,
  281. "socre": conf_c
  282. }
  283. if cf % step == 0 and len(high_score_image) > 0:
  284. if self.content["service"]["filter"]["picture_similarity"]:
  285. for key in list(high_score_image.keys()):
  286. hash1 = ImageUtils.dHash(high_score_image[key].get("ai_frame"))
  287. hash2 = ImageUtils.dHash(p_result[1])
  288. dist = ImageUtils.Hamming_distance(hash1, hash2)
  289. similarity = 1 - dist * 1.0 / 64
  290. if similarity < self.content["service"]["filter"]["similarity"]:
  291. self.imageQueue.put({"image": high_score_image[key]})
  292. del high_score_image[key]
  293. else:
  294. for value in high_score_image.values():
  295. self.imageQueue.put({"image": value})
  296. high_score_image.clear()
  297. logger.info("实时进程任务完成,requestId:{}", self.msg.get("request_id"))
  298. except ServiceException as s:
  299. self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
  300. logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, self.msg.get("request_id"))
  301. self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value,
  302. AnalysisType.ONLINE.value,
  303. s.code,
  304. s.msg,
  305. analyse_time=TimeUtils.now_date_to_str())})
  306. except Exception as e:
  307. self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
  308. logger.exception("服务异常: {}, requestId: {},", e, self.msg.get("request_id"))
  309. self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value,
  310. AnalysisType.ONLINE.value,
  311. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  312. ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
  313. analyse_time=TimeUtils.now_date_to_str())})
  314. finally:
  315. if cv2tool is not None:
  316. cv2tool.stop_cv2()
  317. self.sendResult({"command": "stop"})
  318. commonProcess.join()
  319. # 删除本地视频文件
  320. if orFilePath is not None and os.path.exists(orFilePath):
  321. logger.info("开始删除原视频, orFilePath: {}, requestId: {}", orFilePath, self.msg.get("request_id"))
  322. os.remove(orFilePath)
  323. logger.info("删除原视频成功, orFilePath: {}, requestId: {}", orFilePath, self.msg.get("request_id"))
  324. if aiFilePath is not None and os.path.exists(aiFilePath):
  325. logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", aiFilePath, self.msg.get("request_id"))
  326. os.remove(aiFilePath)
  327. logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", aiFilePath, self.msg.get("request_id"))
  328. class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
  329. def stop_task(self, cv2tool, aiFilePath, analysisStatus):
  330. cv2tool.stop_cv2()
  331. if not os.path.exists(aiFilePath):
  332. logger.error("AI视频不存在!requestId:{}", self.msg.get("request_id"))
  333. raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0],
  334. ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[1])
  335. params2 = (aiFilePath, "aiOffLineVideo", self.content, logger, self.msg.get("request_id"))
  336. ai_update_thread = Common(content=self.content, func=AliyunSdk.get_play_url, args=params2)
  337. ai_update_thread.setDaemon(True)
  338. ai_update_thread.start()
  339. ai_play_url = ai_update_thread.get_result()
  340. if ai_play_url is None:
  341. logger.error("原视频或AI视频播放上传VOD失败!requestId: {}, AI播放地址: {}",
  342. self.msg.get("request_id"), ai_play_url)
  343. raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
  344. ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
  345. self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
  346. self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), analysisStatus,
  347. AnalysisType.OFFLINE.value,
  348. progress=Constant.success_progess,
  349. sign_url=ai_play_url,
  350. analyse_time=TimeUtils.now_date_to_str())})
  351. def run(self):
  352. # 初始化日志
  353. LogUtils.init_log(self.content)
  354. # 程序开始时间
  355. start_time = time.time()
  356. mod, model_type_code, modelConfig = get_model((self.config, str(self.gpu_ids[0]), self.msg["models"]))
  357. # mod_thread = Common(None, func=get_model, args=(self.config, str(self.gpu_ids[0]), self.msg["models"]))
  358. # mod_thread.setDaemon(True)
  359. # mod_thread.start()
  360. # mod = None
  361. # model_type_code = None
  362. # modelConfig = None
  363. # 创建心跳队列
  364. hbQueue = Queue()
  365. # 结果反馈进程启动
  366. commonProcess = CommonProcess(self.fbQueue, hbQueue, self.content, self.msg, self.imageQueue,
  367. AnalysisType.OFFLINE.value)
  368. commonProcess.daemon = True
  369. commonProcess.start()
  370. aiFilePath = None
  371. cv2tool = None
  372. try:
  373. # 定义原视频、AI视频保存名称
  374. aiFilePath = "{}{}{}{}{}".format(self.content["video"]["file_path"],
  375. TimeUtils.now_date_to_str(TimeUtils.YMDHMSF),
  376. "_off_ai_",
  377. self.msg.get("request_id"),
  378. ".mp4")
  379. cv2tool = Cv2Util(self.msg.get('original_url'), self.msg.get('push_url'), aiFilePath=aiFilePath,
  380. requestId=self.msg.get("request_id"))
  381. cv2tool.build_cv2()
  382. cv2tool.build_write(False)
  383. # cv2重试初始化次数
  384. cv2_init_num = 1
  385. high_score_image = {}
  386. step = int(self.content["service"]["frame_step"])
  387. # 模型初始化速度
  388. # model = 0
  389. # 总视频帧数
  390. all_f = None
  391. if cv2tool.cap is not None:
  392. all_f = int(cv2tool.cap.get(7))
  393. while True:
  394. end_time = time.time()
  395. create_task_time = end_time - start_time
  396. if create_task_time > int(self.content["service"]["timeout"]):
  397. logger.error("分析超时,分析超时时间: {}s, requestId: {}", create_task_time, self.msg.get("request_id"))
  398. raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0],
  399. ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1])
  400. if not commonProcess.is_alive():
  401. logger.info("图片上传、心跳、问题反馈进程异常停止, requestId: {}", self.msg.get("request_id"))
  402. raise Exception("图片上传、心跳、问题反馈进程异常停止")
  403. eBody = self.getEvent()
  404. if eBody is not None and len(eBody) > 0:
  405. cmdStr = eBody.get("command")
  406. if 'stop' == cmdStr:
  407. if high_score_image is not None and len(high_score_image) > 0:
  408. for key in list(high_score_image.keys()):
  409. self.imageQueue.put({"image": high_score_image[key]})
  410. del high_score_image[key]
  411. logger.info("离线任务开始停止分析, requestId: {}", self.msg.get("request_id"))
  412. self.stop_task(cv2tool, aiFilePath, AnalysisStatus.SUCCESS.value)
  413. break
  414. # 检测是否断流
  415. if cv2tool.cap is None or not cv2tool.cap.isOpened():
  416. logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
  417. if cv2_init_num >= 3:
  418. raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
  419. ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
  420. cv2_init_num += 1
  421. time.sleep(1)
  422. cv2tool.build_cv2()
  423. all_f = int(cv2tool.cap.get(7))
  424. continue
  425. start_read_time = time.time()
  426. is_opened, frame = cv2tool.cap.read()
  427. cf = int(cv2tool.cap.get(1))
  428. if is_opened is None or not is_opened:
  429. logger.info("总帧数: {}, 当前帧数: {}, requestId: {}, is_opened: {}", float(cv2tool.cap.get(7)),
  430. cv2tool.cap.get(1), self.msg.get("request_id"), is_opened)
  431. logger.info("离线读流结束,读流时间: {}", time.time() - start_read_time)
  432. if high_score_image is not None and len(high_score_image) > 0:
  433. for key in list(high_score_image.keys()):
  434. self.imageQueue.put({"image": high_score_image[key]})
  435. del high_score_image[key]
  436. if float(cf) < float(all_f):
  437. logger.info("离线异常结束:requestId: {}", self.msg.get("request_id"))
  438. self.stop_task(cv2tool, aiFilePath, AnalysisStatus.TIMEOUT.value)
  439. break
  440. logger.info("任务开始结束分析, requestId: {}", self.msg.get("request_id"))
  441. self.stop_task(cv2tool, aiFilePath, AnalysisStatus.SUCCESS.value)
  442. break
  443. # if mod is None and model == 0:
  444. # model += 1
  445. # logger.info("初始化模型: {}次, requestId: {}", model, self.msg.get("request_id"))
  446. # mod, model_type_code, modelConfig = mod_thread.get_result()
  447. # time00 = time.time()
  448. # 调用AI模型
  449. p_result, timeOut = mod.process(copy.deepcopy(frame), modelConfig)
  450. # logger.info("算法模型调度时间:{}s, requestId:{}", time.time() - time00, self.msg.get("request_id"))
  451. # 原视频保存本地、AI视频保存本地
  452. if self.content["video"]["video_add_water"]:
  453. frame = self.pic.common_water_1(frame, self.logo)
  454. p_result[1] = self.pic.common_water_1(p_result[1], self.logo)
  455. frame_merge = cv2tool.video_merge(frame, p_result[1])
  456. try:
  457. cv2tool.getAiVideoFile().write(frame_merge)
  458. cv2tool.getP().stdin.write(frame_merge.tostring())
  459. except Exception as ex:
  460. current_retry_num = 0
  461. while True:
  462. try:
  463. cv2tool.build_p()
  464. cv2tool.getP().stdin.write(frame_merge.tostring())
  465. logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
  466. self.msg.get("request_id"))
  467. break
  468. except Exception as e:
  469. current_retry_num += 1
  470. logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
  471. current_retry_num, self.msg.get("request_id"))
  472. if current_retry_num > 3:
  473. logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.msg.get("request_id"))
  474. break
  475. # # 问题图片加入队列, 暂时写死,后期修改为真实问题
  476. if cf % 400 == 0:
  477. hbQueue.put({"cf": cf, "af": all_f})
  478. if p_result[2] is not None and len(p_result[2]) > 0:
  479. for ai_analyse_result in p_result[2]:
  480. order = str(int(ai_analyse_result[0]))
  481. high_result = high_score_image.get(order)
  482. conf_c = ai_analyse_result[5]
  483. if high_result is None and conf_c >= float(self.content["service"]["frame_score"]):
  484. high_score_image[order] = {
  485. "or_frame": frame,
  486. "ai_frame": p_result[1],
  487. "current_frame": cf,
  488. "last_frame": cf + step,
  489. "progress": "",
  490. "mode_service": "offline",
  491. "model_type_code": model_type_code,
  492. "model_detection_code": order,
  493. "socre": conf_c
  494. }
  495. else:
  496. if conf_c >= float(self.content["service"]["frame_score"]) and conf_c > high_result.get("socre"):
  497. high_score_image[order] = {
  498. "or_frame": frame,
  499. "ai_frame": p_result[1],
  500. "current_frame": cf,
  501. "last_frame": cf + step,
  502. "progress": "",
  503. "mode_service": "offline",
  504. "model_type_code": model_type_code,
  505. "model_detection_code": order,
  506. "socre": conf_c
  507. }
  508. if cf % step == 0 and len(high_score_image) > 0:
  509. if self.content["service"]["filter"]["picture_similarity"]:
  510. for key in list(high_score_image.keys()):
  511. hash1 = ImageUtils.dHash(high_score_image[key].get("ai_frame"))
  512. hash2 = ImageUtils.dHash(p_result[1])
  513. dist = ImageUtils.Hamming_distance(hash1, hash2)
  514. similarity = 1 - dist * 1.0 / 64
  515. if similarity < self.content["service"]["filter"]["similarity"]:
  516. self.imageQueue.put({"image": high_score_image[key]})
  517. del high_score_image[key]
  518. else:
  519. for value in high_score_image.values():
  520. self.imageQueue.put({"image": value})
  521. high_score_image.clear()
  522. hbQueue.put({"cf": cf, "af": all_f})
  523. logger.info("离线进程任务完成,requestId:{}", self.msg.get("request_id"))
  524. except ServiceException as s:
  525. self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
  526. logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId:{}", s.code, s.msg, self.msg.get("request_id"))
  527. self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value,
  528. AnalysisType.OFFLINE.value,
  529. s.code,
  530. s.msg,
  531. analyse_time=TimeUtils.now_date_to_str())})
  532. except Exception as e:
  533. self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
  534. logger.exception("服务异常: {}, requestId:{}", e, self.msg.get("request_id"))
  535. self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value,
  536. AnalysisType.OFFLINE.value,
  537. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  538. ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
  539. analyse_time=TimeUtils.now_date_to_str())})
  540. finally:
  541. if cv2tool is not None:
  542. cv2tool.stop_cv2()
  543. self.sendResult({"command": "stop"})
  544. commonProcess.join()
  545. # 删除本地视频文件
  546. if aiFilePath is not None and os.path.exists(aiFilePath):
  547. logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", aiFilePath, self.msg.get("request_id"))
  548. os.remove(aiFilePath)
  549. logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", aiFilePath, self.msg.get("request_id"))
  550. class PhotosIntelligentRecognitionProcess(IntelligentRecognitionProcess):
  551. pass
  552. '''
  553. "models": [{
  554. "code": "模型编号",
  555. "categories":[{
  556. "id": "模型id",
  557. "config": {
  558. "k1": "v1",
  559. "k2": "v2"
  560. }
  561. }]
  562. }]
  563. '''
  564. def get_model(args):
  565. logger.info("######################开始加载模型######################")
  566. for model in args[2]:
  567. try:
  568. code = model.get("code")
  569. needed_objectsIndex = [int(category.get("id")) for category in model.get("categories")]
  570. logger.info("code:{}, 检查目标:{}, gpuId:{}", code, needed_objectsIndex, args[1])
  571. if code == ModelType.WATER_SURFACE_MODEL.value[1]:
  572. logger.info("######################加载河道模型######################")
  573. return ModelUtils.SZModel(args[1], needed_objectsIndex), code, args[0].get("sz")
  574. elif code == ModelType.FOREST_FARM_MODEL.value[1]:
  575. logger.info("######################加载林场模型######################")
  576. return ModelUtils.LCModel(args[1], needed_objectsIndex), code, args[0].get("lc")
  577. elif code == ModelType.TRAFFIC_FARM_MODEL.value[1]:
  578. logger.info("######################加载交通模型######################")
  579. return ModelUtils.RFModel(args[1], needed_objectsIndex), code, args[0].get("rf")
  580. else:
  581. logger.error("未匹配到对应的模型")
  582. raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0],
  583. ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[1])
  584. except Exception as e:
  585. logger.exception("获取模型配置异常: {}", e)
  586. raise ServiceException(ExceptionType.AI_MODEL_CONFIG_EXCEPTION.value[0],
  587. ExceptionType.AI_MODEL_CONFIG_EXCEPTION.value[1])