|
- # -*- coding: utf-8 -*-
- import json
- import os
- import time
- import copy
- from common import Constant
- from multiprocessing import Process, Queue
- from loguru import logger
- from enums.AnalysisStatusEnum import AnalysisStatus
- from enums.AnalysisTypeEnum import AnalysisType
- from enums.ExceptionEnum import ExceptionType
- from enums.ModelTypeEnum import ModelType
- from util import LogUtils, TimeUtils, ModelUtils, ImageUtils
- from util.Cv2Utils import Cv2Util
- from entity.FeedBack import message_feedback
- from util import AliyunSdk
- from concurrency.CommonThread import Common
- from concurrency.CommonProcess import CommonProcess
- from exception.CustomerException import ServiceException
-
-
class IntelligentRecognitionProcess(Process):
    """Base process for one recognition task.

    Stores the queues and per-task settings handed in through ``cfg`` and
    offers small helpers to exchange events and results through those queues.
    """

    def __init__(self, cfg, config):
        """Copy the task wiring out of ``cfg``.

        Args:
            cfg: Mapping read with ``.get`` for the keys ``fbQueue``,
                ``content``, ``msg``, ``imageQueue``, ``gpu_ids``, ``pic`` and
                ``logo``.
            config: Kept as ``self.config``; the task subclasses in this file
                pass it on to ``get_model``.
        """
        super().__init__()
        self.fbQueue = cfg.get("fbQueue")
        # Private queue carrying events addressed to this process.
        self.eventQueue = Queue()
        self.content = cfg.get("content")
        self.msg = cfg.get("msg")
        self.imageQueue = cfg.get("imageQueue")
        self.gpu_ids = cfg.get("gpu_ids")
        self.pic = cfg.get("pic")
        self.logo = cfg.get("logo")
        self.config = config

    # Send an event to this process
    def sendEvent(self, eBody):
        """Put ``eBody`` on the event queue, retrying every 2 seconds while it is full."""
        while self.eventQueue.full():
            # Message fixed: it previously read "时间队列" (time queue) although
            # it is the event queue that is full.
            logger.info("事件队列已满, 2秒后重试!requestId:{}", self.msg.get("request_id"))
            time.sleep(2)
        self.eventQueue.put(eBody)

    # Fetch the next event
    def getEvent(self):
        """Return the next pending event without blocking.

        Returns:
            The event, or ``None`` when the queue is empty or could not be read.
        """
        # Function-scope import keeps this block self-contained.
        from queue import Empty

        try:
            return self.eventQueue.get(block=False)
        except Empty:
            # Nothing pending: the normal outcome of a poll.
            return None
        except Exception as e:
            # Polling stays best-effort, but an unexpected failure is no longer silent.
            logger.warning("获取事件异常: {}, requestId:{}", e, self.msg.get("request_id"))
            return None

    # Push an execution result
    def sendResult(self, result):
        """Put ``result`` on the feedback queue, retrying every 2 seconds while it is full."""
        while self.fbQueue.full():
            logger.info("问题反馈队列已满, 2秒后重试!requestId:{}", self.msg.get("request_id"))
            time.sleep(2)
        self.fbQueue.put(result)
-
-
- '''
- 实时任务进程
- '''
-
-
class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
    """Real-time task process: analyses a pulled stream frame by frame."""

    # Stop the task
    def stop_task(self, cv2tool, orFilePath, aiFilePath, snalysisStatus):
        """Release the streams, upload both recordings and report the final status.

        Args:
            cv2tool: The task's ``Cv2Util``; ``stop_cv2()`` is called on it first.
            orFilePath: Local path of the recorded original video.
            aiFilePath: Local path of the recorded AI video.
            snalysisStatus: Status value placed in the final feedback message.

        Raises:
            ServiceException: ``VIDEO_ADDRESS_EXCEPTION`` when either file is
                missing; ``GET_VIDEO_URL_EXCEPTION`` when either upload
                returns no play URL.
        """
        request_id = self.msg.get("request_id")
        # Release the cv2 related resources.
        cv2tool.stop_cv2()
        if not os.path.exists(orFilePath) or not os.path.exists(aiFilePath):
            logger.error("原视频或AI视频不存在!requestId:{}", request_id)
            raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0],
                                   ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[1])
        params1 = (orFilePath, "orOnLineVideo", self.content, logger, request_id)
        or_update_thread = Common(content=self.content, func=AliyunSdk.get_play_url, args=params1)
        params2 = (aiFilePath, "aiOnLineVideo", self.content, logger, request_id)
        ai_update_thread = Common(content=self.content, func=AliyunSdk.get_play_url, args=params2)
        or_update_thread.setDaemon(True)
        ai_update_thread.setDaemon(True)
        # Both uploads are started before either result is awaited.
        or_update_thread.start()
        ai_update_thread.start()
        or_play_url = or_update_thread.get_result()
        ai_play_url = ai_update_thread.get_result()
        if or_play_url is None or ai_play_url is None:
            logger.error("原视频或AI视频播放上传VOD失败!原视频播放地址:{}, AI播放地址: {}, requestId: {}", or_play_url,
                         ai_play_url, request_id)
            raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
                                   ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
        self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
        self.sendResult({"feedback": message_feedback(request_id, snalysisStatus,
                                                      AnalysisType.ONLINE.value,
                                                      progress=Constant.success_progess,
                                                      original_url=or_play_url,
                                                      sign_url=ai_play_url,
                                                      analyse_time=TimeUtils.now_date_to_str())})

    def _wait_for_stream(self, cv2tool, start_time, orFilePath, aiFilePath):
        """Retry opening the pull stream until it opens, a stop arrives or time runs out.

        Works around the stream failing to open at start-up (and fps/height/width
        not being available yet).

        Returns:
            True when the capture is open; False when a stop command ended the
            task (``stop_task`` has already run in that case).

        Raises:
            ServiceException: ``PULLSTREAM_TIMEOUT_EXCEPTION`` when the stream
                is still closed after ``service.cv2_pull_stream_timeout``
                seconds counted from ``start_time``.
        """
        request_id = self.msg.get("request_id")
        # Number of cv2 initialisation retries, used for logging only.
        cv2_init_num = 1
        while True:
            event = self.getEvent()
            if event is not None and len(event) > 0 and 'stop' == event.get("command"):
                logger.info("实时任务开始停止分析, requestId: {}", request_id)
                # Presumably creates the recording files stop_task checks for — TODO confirm.
                cv2tool.build_write()
                self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.SUCCESS.value)
                return False
            if cv2tool.cap is not None and cv2tool.cap.isOpened():
                return True
            pull_stream_init_timeout = time.time() - start_time
            if pull_stream_init_timeout > int(self.content["service"]["cv2_pull_stream_timeout"]):
                logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout, request_id)
                raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0],
                                       ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1])
            logger.info("cv2开始拉流初始化重试:{}次, requestId: {}", cv2_init_num, request_id)
            cv2_init_num += 1
            time.sleep(1)
            cv2tool.build_cv2()

    def _flush_high_score_images(self, high_score_image):
        """Put every buffered image on the image queue and empty the buffer."""
        if high_score_image is not None and len(high_score_image) > 0:
            for key in list(high_score_image.keys()):
                self.imageQueue.put({"image": high_score_image[key]})
                del high_score_image[key]

    def _write_frame(self, cv2tool, frame, frame_merge):
        """Write one frame to both recordings and to the push pipe.

        When any write fails the pipe is rebuilt and the pipe write retried;
        after more than three failed rebuilds the frame is given up on.
        """
        request_id = self.msg.get("request_id")
        try:
            cv2tool.getOrVideoFile().write(frame)
            cv2tool.getAiVideoFile().write(frame_merge)
            # tobytes() replaces the deprecated tostring(); assumes frame_merge
            # is a numpy array — TODO confirm.
            cv2tool.getP().stdin.write(frame_merge.tobytes())
        except Exception:
            current_retry_num = 0
            while True:
                try:
                    cv2tool.build_p()
                    cv2tool.getP().stdin.write(frame_merge.tobytes())
                    logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
                                request_id)
                    break
                except Exception as e:
                    current_retry_num += 1
                    logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
                                     current_retry_num, request_id)
                    if current_retry_num > 3:
                        logger.exception("构建p管道重试失败:{}, requestId: {}", e, request_id)
                        break

    def _record_detections(self, high_score_image, detections, frame, ai_frame, cf, step, model_type_code):
        """Keep, per detection code, the highest-scoring frame at or above ``service.frame_score``.

        Each detection is indexed positionally: item 0 is used as the detection
        code and item 5 as the score.
        """
        if detections is None or len(detections) == 0:
            return
        frame_score = float(self.content["service"]["frame_score"])
        for ai_analyse_result in detections:
            order = str(int(ai_analyse_result[0]))
            conf_c = ai_analyse_result[5]
            high_result = high_score_image.get(order)
            if conf_c >= frame_score and (high_result is None or conf_c > high_result.get("socre")):
                high_score_image[order] = {
                    "or_frame": frame,
                    "ai_frame": ai_frame,
                    "current_frame": cf,
                    "last_frame": cf + step,
                    "progress": "",
                    "mode_service": "online",
                    "model_type_code": model_type_code,
                    "model_detection_code": order,
                    "socre": conf_c
                }

    def _release_images(self, high_score_image, ai_frame):
        """Hand buffered images to the image queue at a frame-step boundary.

        With ``service.filter.picture_similarity`` enabled an image is released
        only when its dHash similarity to ``ai_frame`` is below
        ``service.filter.similarity``; otherwise every image is released.
        """
        image_filter = self.content["service"]["filter"]
        if image_filter["picture_similarity"]:
            # The current frame's hash does not change inside the loop.
            current_hash = ImageUtils.dHash(ai_frame)
            for key in list(high_score_image.keys()):
                stored_hash = ImageUtils.dHash(high_score_image[key].get("ai_frame"))
                dist = ImageUtils.Hamming_distance(stored_hash, current_hash)
                similarity = 1 - dist * 1.0 / 64
                if similarity < image_filter["similarity"]:
                    self.imageQueue.put({"image": high_score_image[key]})
                    del high_score_image[key]
        else:
            for value in high_score_image.values():
                self.imageQueue.put({"image": value})
            high_score_image.clear()

    def run(self):
        """Run the real-time task until it is stopped, times out or fails.

        Starts the common helper process, loads the model and opens the pull
        stream. For every frame it then runs the model, optionally watermarks,
        writes the original and merged frames to the recordings and the push
        pipe, and buffers the best detections for the image queue. A failure is
        reported as a FAILED feedback; the local recordings are always deleted
        at the end.
        """
        # Initialise logging.
        LogUtils.init_log(self.content)
        # Task start time.
        start_time = time.time()
        request_id = self.msg.get("request_id")
        # Start the common process (image upload, heartbeat and feedback threads).
        # It is started before the model is loaded so that a model-loading
        # failure can still be reported through it.
        commonProcess = CommonProcess(self.fbQueue, None, self.content, self.msg, self.imageQueue,
                                      AnalysisType.ONLINE.value)
        commonProcess.start()
        orFilePath = None
        aiFilePath = None
        cv2tool = None
        try:
            mod, model_type_code, modelConfig = get_model((self.config, str(self.gpu_ids[0]), self.msg["models"]))
            # Names of the local original / AI recordings.
            random_time = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF)
            orFilePath = "{}{}{}{}{}".format(self.content["video"]["file_path"],
                                             random_time,
                                             "_on_or_",
                                             request_id,
                                             ".mp4")
            aiFilePath = "{}{}{}{}{}".format(self.content["video"]["file_path"],
                                             random_time,
                                             "_on_ai_",
                                             request_id,
                                             ".mp4")
            cv2tool = Cv2Util(self.msg.get('pull_url'), self.msg.get('push_url'), orFilePath, aiFilePath,
                              request_id)
            cv2tool.build_cv2()
            if not self._wait_for_stream(cv2tool, start_time, orFilePath, aiFilePath):
                # Stopped before streaming began: the task has already been
                # reported, so the analysis loop must not start.
                logger.info("实时进程任务完成,requestId:{}", request_id)
                return
            # Number of cv2 re-initialisation retries, used for logging only.
            cv2_init_num = 1
            cv2tool.build_write()
            high_score_image = {}
            step = int(self.content["service"]["frame_step"])
            pull_start_time = None
            read_start_time = None
            while True:
                create_task_time = time.time() - start_time
                if create_task_time > int(self.content["service"]["timeout"]):
                    logger.error("分析超时, 超时时间:{}, requestId: {}", create_task_time, request_id)
                    raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0],
                                           ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1])
                if not commonProcess.is_alive():
                    logger.info("图片上传、心跳、问题反馈进程停止异常, requestId: {}", request_id)
                    raise Exception("图片上传、心跳、问题反馈进程异常停止")
                eBody = self.getEvent()
                # Stop command received.
                if eBody is not None and len(eBody) > 0 and 'stop' == eBody.get("command"):
                    self._flush_high_score_images(high_score_image)
                    logger.info("实时任务开始停止, requestId: {}", request_id)
                    self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.SUCCESS.value)
                    break
                # Detect a dropped stream.
                if cv2tool.cap is None or not cv2tool.cap.isOpened():
                    if pull_start_time is None:
                        pull_start_time = time.time()
                    # Original note: defaults to one hour.
                    pull_stream_timeout = time.time() - pull_start_time
                    if pull_stream_timeout > int(self.content["service"]["cv2_read_stream_timeout"]):
                        self._flush_high_score_images(high_score_image)
                        logger.info("拉流超时, 超时时间:{}, requestId:{}", pull_stream_timeout, request_id)
                        self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.TIMEOUT.value)
                        break
                    logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, request_id)
                    cv2_init_num += 1
                    time.sleep(1)
                    cv2tool.build_cv2()
                    continue
                pull_start_time = None
                is_opened, frame = cv2tool.getCap().read()
                if is_opened is None or not is_opened:
                    if read_start_time is None:
                        read_start_time = time.time()
                    read_stream_timeout = time.time() - read_start_time
                    if read_stream_timeout > int(self.content["service"]["cv2_read_stream_timeout"]):
                        self._flush_high_score_images(high_score_image)
                        logger.info("运行中读流超时, 超时时间: {}, requestId: {}", read_stream_timeout,
                                    request_id)
                        self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.TIMEOUT.value)
                        break
                    time.sleep(1)
                    logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, request_id)
                    cv2_init_num += 1
                    cv2tool.build_cv2()
                    continue
                read_start_time = None
                # Run the AI model on a copy of the frame.
                p_result, _ = mod.process(copy.deepcopy(frame), modelConfig)
                # Watermark both frames when configured.
                if self.content["video"]["video_add_water"]:
                    frame = self.pic.common_water_1(frame, self.logo)
                    p_result[1] = self.pic.common_water_1(p_result[1], self.logo)
                frame_merge = cv2tool.video_merge(copy.deepcopy(frame), copy.deepcopy(p_result[1]))
                self._write_frame(cv2tool, frame, frame_merge)
                # Property id 1: presumably the current frame position
                # (cv2.CAP_PROP_POS_FRAMES) — TODO confirm cap is a cv2 capture.
                cf = int(cv2tool.cap.get(1))
                self._record_detections(high_score_image, p_result[2], frame, p_result[1], cf, step,
                                        model_type_code)
                if cf % step == 0 and len(high_score_image) > 0:
                    self._release_images(high_score_image, p_result[1])
            logger.info("实时进程任务完成,requestId:{}", request_id)
        except ServiceException as s:
            self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
            logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id)
            self.sendResult({"feedback": message_feedback(request_id, AnalysisStatus.FAILED.value,
                                                          AnalysisType.ONLINE.value,
                                                          s.code,
                                                          s.msg,
                                                          analyse_time=TimeUtils.now_date_to_str())})
        except Exception as e:
            self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
            logger.exception("服务异常: {}, requestId: {},", e, request_id)
            self.sendResult({"feedback": message_feedback(request_id, AnalysisStatus.FAILED.value,
                                                          AnalysisType.ONLINE.value,
                                                          ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                          ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
                                                          analyse_time=TimeUtils.now_date_to_str())})
        finally:
            if cv2tool is not None:
                cv2tool.stop_cv2()
            self.sendResult({"command": "stop"})
            commonProcess.join()
            # Delete the local video files.
            if orFilePath is not None and os.path.exists(orFilePath):
                logger.info("开始删除原视频, orFilePath: {}, requestId: {}", orFilePath, request_id)
                os.remove(orFilePath)
                logger.info("删除原视频成功, orFilePath: {}, requestId: {}", orFilePath, request_id)
            if aiFilePath is not None and os.path.exists(aiFilePath):
                logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", aiFilePath, request_id)
                os.remove(aiFilePath)
                logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", aiFilePath, request_id)
-
-
class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
    """Offline task process: analyses the video at ``original_url`` frame by frame."""

    def stop_task(self, cv2tool, aiFilePath, analysisStatus):
        """Release the streams, upload the AI recording and report the final status.

        Args:
            cv2tool: The task's ``Cv2Util``; ``stop_cv2()`` is called on it first.
            aiFilePath: Local path of the recorded AI video.
            analysisStatus: Status value placed in the final feedback message.

        Raises:
            ServiceException: ``VIDEO_ADDRESS_EXCEPTION`` when the file is
                missing; ``GET_VIDEO_URL_EXCEPTION`` when the upload returns
                no play URL.
        """
        request_id = self.msg.get("request_id")
        cv2tool.stop_cv2()
        if not os.path.exists(aiFilePath):
            logger.error("AI视频不存在!requestId:{}", request_id)
            raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0],
                                   ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[1])
        params2 = (aiFilePath, "aiOffLineVideo", self.content, logger, request_id)
        ai_update_thread = Common(content=self.content, func=AliyunSdk.get_play_url, args=params2)
        ai_update_thread.setDaemon(True)
        ai_update_thread.start()
        ai_play_url = ai_update_thread.get_result()
        if ai_play_url is None:
            logger.error("原视频或AI视频播放上传VOD失败!requestId: {}, AI播放地址: {}",
                         request_id, ai_play_url)
            raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
                                   ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
        self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
        self.sendResult({"feedback": message_feedback(request_id, analysisStatus,
                                                      AnalysisType.OFFLINE.value,
                                                      progress=Constant.success_progess,
                                                      sign_url=ai_play_url,
                                                      analyse_time=TimeUtils.now_date_to_str())})

    def _flush_high_score_images(self, high_score_image):
        """Put every buffered image on the image queue and empty the buffer."""
        if high_score_image is not None and len(high_score_image) > 0:
            for key in list(high_score_image.keys()):
                self.imageQueue.put({"image": high_score_image[key]})
                del high_score_image[key]

    def _write_frame(self, cv2tool, frame_merge):
        """Write one merged frame to the AI recording and to the push pipe.

        When a write fails the pipe is rebuilt and the pipe write retried;
        after more than three failed rebuilds the frame is given up on.
        """
        request_id = self.msg.get("request_id")
        try:
            cv2tool.getAiVideoFile().write(frame_merge)
            # tobytes() replaces the deprecated tostring(); assumes frame_merge
            # is a numpy array — TODO confirm.
            cv2tool.getP().stdin.write(frame_merge.tobytes())
        except Exception:
            current_retry_num = 0
            while True:
                try:
                    cv2tool.build_p()
                    cv2tool.getP().stdin.write(frame_merge.tobytes())
                    logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
                                request_id)
                    break
                except Exception as e:
                    current_retry_num += 1
                    logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
                                     current_retry_num, request_id)
                    if current_retry_num > 3:
                        logger.exception("构建p管道重试失败:{}, requestId: {}", e, request_id)
                        break

    def _record_detections(self, high_score_image, detections, frame, ai_frame, cf, step, model_type_code):
        """Keep, per detection code, the highest-scoring frame at or above ``service.frame_score``.

        Each detection is indexed positionally: item 0 is used as the detection
        code and item 5 as the score.
        """
        if detections is None or len(detections) == 0:
            return
        frame_score = float(self.content["service"]["frame_score"])
        for ai_analyse_result in detections:
            order = str(int(ai_analyse_result[0]))
            conf_c = ai_analyse_result[5]
            high_result = high_score_image.get(order)
            if conf_c >= frame_score and (high_result is None or conf_c > high_result.get("socre")):
                high_score_image[order] = {
                    "or_frame": frame,
                    "ai_frame": ai_frame,
                    "current_frame": cf,
                    "last_frame": cf + step,
                    "progress": "",
                    "mode_service": "offline",
                    "model_type_code": model_type_code,
                    "model_detection_code": order,
                    "socre": conf_c
                }

    def run(self):
        """Run the offline task until the video ends, it is stopped, times out or fails.

        Starts the common helper process, loads the model and opens the video.
        For every frame it then runs the model, optionally watermarks, writes
        the merged frame to the AI recording and the push pipe, reports
        progress through the heartbeat queue and buffers the best detections
        for the image queue. A failure is reported as a FAILED feedback; the
        local recording is always deleted at the end.
        """
        # Initialise logging.
        LogUtils.init_log(self.content)
        # Task start time.
        start_time = time.time()
        request_id = self.msg.get("request_id")
        # Heartbeat queue carrying the current / total frame numbers.
        hbQueue = Queue()
        # Start the feedback process before loading the model so that a
        # model-loading failure can still be reported through it.
        commonProcess = CommonProcess(self.fbQueue, hbQueue, self.content, self.msg, self.imageQueue,
                                      AnalysisType.OFFLINE.value)
        commonProcess.daemon = True
        commonProcess.start()
        aiFilePath = None
        cv2tool = None
        try:
            mod, model_type_code, modelConfig = get_model((self.config, str(self.gpu_ids[0]), self.msg["models"]))
            # Name of the local AI recording.
            aiFilePath = "{}{}{}{}{}".format(self.content["video"]["file_path"],
                                             TimeUtils.now_date_to_str(TimeUtils.YMDHMSF),
                                             "_off_ai_",
                                             request_id,
                                             ".mp4")
            cv2tool = Cv2Util(self.msg.get('original_url'), self.msg.get('push_url'), aiFilePath=aiFilePath,
                              requestId=request_id)
            cv2tool.build_cv2()
            cv2tool.build_write(False)
            # Number of cv2 initialisation attempts.
            cv2_init_num = 1
            high_score_image = {}
            step = int(self.content["service"]["frame_step"])
            # Total number of frames; property id 7 is presumably
            # cv2.CAP_PROP_FRAME_COUNT — TODO confirm cap is a cv2 capture.
            all_f = None
            if cv2tool.cap is not None:
                all_f = int(cv2tool.cap.get(7))
            while True:
                create_task_time = time.time() - start_time
                if create_task_time > int(self.content["service"]["timeout"]):
                    logger.error("分析超时,分析超时时间: {}s, requestId: {}", create_task_time, request_id)
                    raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0],
                                           ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1])
                if not commonProcess.is_alive():
                    logger.info("图片上传、心跳、问题反馈进程异常停止, requestId: {}", request_id)
                    raise Exception("图片上传、心跳、问题反馈进程异常停止")
                eBody = self.getEvent()
                if eBody is not None and len(eBody) > 0 and 'stop' == eBody.get("command"):
                    self._flush_high_score_images(high_score_image)
                    logger.info("离线任务开始停止分析, requestId: {}", request_id)
                    self.stop_task(cv2tool, aiFilePath, AnalysisStatus.SUCCESS.value)
                    break
                # Detect an unopened / dropped source.
                if cv2tool.cap is None or not cv2tool.cap.isOpened():
                    logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, request_id)
                    if cv2_init_num >= 3:
                        raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
                                               ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
                    cv2_init_num += 1
                    time.sleep(1)
                    cv2tool.build_cv2()
                    # The rebuild may leave cap unset; reading the frame count
                    # unguarded would raise AttributeError and bypass the retry
                    # limit above.
                    if cv2tool.cap is not None:
                        all_f = int(cv2tool.cap.get(7))
                    continue
                start_read_time = time.time()
                is_opened, frame = cv2tool.cap.read()
                # Property id 1: presumably the current frame position — TODO confirm.
                cf = int(cv2tool.cap.get(1))
                if is_opened is None or not is_opened:
                    logger.info("总帧数: {}, 当前帧数: {}, requestId: {}, is_opened: {}", float(cv2tool.cap.get(7)),
                                cv2tool.cap.get(1), request_id, is_opened)
                    logger.info("离线读流结束,读流时间: {}", time.time() - start_read_time)
                    self._flush_high_score_images(high_score_image)
                    # Reading stopped before the last frame: report a timeout.
                    if float(cf) < float(all_f):
                        logger.info("离线异常结束:requestId: {}", request_id)
                        self.stop_task(cv2tool, aiFilePath, AnalysisStatus.TIMEOUT.value)
                        break
                    logger.info("任务开始结束分析, requestId: {}", request_id)
                    self.stop_task(cv2tool, aiFilePath, AnalysisStatus.SUCCESS.value)
                    break
                # Run the AI model on a copy of the frame.
                p_result, _ = mod.process(copy.deepcopy(frame), modelConfig)
                # Watermark both frames when configured.
                if self.content["video"]["video_add_water"]:
                    frame = self.pic.common_water_1(frame, self.logo)
                    p_result[1] = self.pic.common_water_1(p_result[1], self.logo)
                frame_merge = cv2tool.video_merge(frame, p_result[1])
                self._write_frame(cv2tool, frame_merge)
                # Report progress every 400 frames.
                if cf % 400 == 0:
                    hbQueue.put({"cf": cf, "af": all_f})
                self._record_detections(high_score_image, p_result[2], frame, p_result[1], cf, step,
                                        model_type_code)
                if cf % step == 0 and len(high_score_image) > 0:
                    image_filter = self.content["service"]["filter"]
                    if image_filter["picture_similarity"]:
                        # The current frame's hash does not change inside the loop.
                        current_hash = ImageUtils.dHash(p_result[1])
                        for key in list(high_score_image.keys()):
                            stored_hash = ImageUtils.dHash(high_score_image[key].get("ai_frame"))
                            dist = ImageUtils.Hamming_distance(stored_hash, current_hash)
                            similarity = 1 - dist * 1.0 / 64
                            if similarity < image_filter["similarity"]:
                                self.imageQueue.put({"image": high_score_image[key]})
                                del high_score_image[key]
                    else:
                        for value in high_score_image.values():
                            self.imageQueue.put({"image": value})
                        high_score_image.clear()
                        # NOTE(review): the original nesting of this progress
                        # update could not be recovered from the flattened
                        # source; it is kept next to the unfiltered release —
                        # confirm the intended level.
                        hbQueue.put({"cf": cf, "af": all_f})
            logger.info("离线进程任务完成,requestId:{}", request_id)
        except ServiceException as s:
            self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
            logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId:{}", s.code, s.msg, request_id)
            self.sendResult({"feedback": message_feedback(request_id, AnalysisStatus.FAILED.value,
                                                          AnalysisType.OFFLINE.value,
                                                          s.code,
                                                          s.msg,
                                                          analyse_time=TimeUtils.now_date_to_str())})
        except Exception as e:
            self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
            logger.exception("服务异常: {}, requestId:{}", e, request_id)
            self.sendResult({"feedback": message_feedback(request_id, AnalysisStatus.FAILED.value,
                                                          AnalysisType.OFFLINE.value,
                                                          ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                          ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
                                                          analyse_time=TimeUtils.now_date_to_str())})
        finally:
            if cv2tool is not None:
                cv2tool.stop_cv2()
            self.sendResult({"command": "stop"})
            commonProcess.join()
            # Delete the local video file.
            if aiFilePath is not None and os.path.exists(aiFilePath):
                logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", aiFilePath, request_id)
                os.remove(aiFilePath)
                logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", aiFilePath, request_id)
-
-
class PhotosIntelligentRecognitionProcess(IntelligentRecognitionProcess):
    """Placeholder subclass; it defines no behaviour of its own yet."""
-
-
- '''
- "models": [{
- "code": "模型编号",
- "categories":[{
- "id": "模型id",
- "config": {
- "k1": "v1",
- "k2": "v2"
- }
- }]
- }]
- '''
-
-
def get_model(args):
    """Build the model requested by the first entry of the ``models`` list.

    Args:
        args: Indexable ``(config, gpu_id, models)``. ``models`` is a list of
            dicts with a ``code`` and a ``categories`` list whose ``id`` values
            become the detection indexes handed to the model. ``config`` is
            read with ``.get`` for the keys ``sz``, ``lc`` and ``rf``.

    Returns:
        Tuple ``(model, code, model_config)``. Only the first entry of
        ``models`` is ever used, because every path of the loop body returns
        or raises.

    Raises:
        ServiceException: ``AI_MODEL_MATCH_EXCEPTION`` when ``code`` matches no
            known model type; ``AI_MODEL_CONFIG_EXCEPTION`` when ``models`` is
            empty or building the model fails.
    """
    logger.info("######################开始加载模型######################")
    config, gpu_id, models = args[0], args[1], args[2]
    for model in models or ():
        try:
            code = model.get("code")
            needed_objectsIndex = [int(category.get("id")) for category in model.get("categories")]
            logger.info("code:{}, 检查目标:{}, gpuId:{}", code, needed_objectsIndex, gpu_id)
            if code == ModelType.WATER_SURFACE_MODEL.value[1]:
                logger.info("######################加载河道模型######################")
                return ModelUtils.SZModel(gpu_id, needed_objectsIndex), code, config.get("sz")
            if code == ModelType.FOREST_FARM_MODEL.value[1]:
                logger.info("######################加载林场模型######################")
                return ModelUtils.LCModel(gpu_id, needed_objectsIndex), code, config.get("lc")
            if code == ModelType.TRAFFIC_FARM_MODEL.value[1]:
                logger.info("######################加载交通模型######################")
                return ModelUtils.RFModel(gpu_id, needed_objectsIndex), code, config.get("rf")
            logger.error("未匹配到对应的模型")
            raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0],
                                   ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[1])
        except ServiceException:
            # Keep the specific error (e.g. the model-match failure) instead of
            # rewrapping it as a configuration error.
            raise
        except Exception as e:
            logger.exception("获取模型配置异常: {}", e)
            raise ServiceException(ExceptionType.AI_MODEL_CONFIG_EXCEPTION.value[0],
                                   ExceptionType.AI_MODEL_CONFIG_EXCEPTION.value[1]) from e
    # No model entry at all: fail explicitly rather than return None, which
    # callers would trip over when unpacking the result.
    logger.error("获取模型配置异常: {}", "models is empty")
    raise ServiceException(ExceptionType.AI_MODEL_CONFIG_EXCEPTION.value[0],
                           ExceptionType.AI_MODEL_CONFIG_EXCEPTION.value[1])
|