From 57de938d7cb7a89dea3191ff4d8c266eb129fe85 Mon Sep 17 00:00:00 2001 From: zhoushuliang Date: Thu, 10 Jul 2025 17:03:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20concurrency/IntelligentRec?= =?UTF-8?q?ognitionProcess.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- concurrency/IntelligentRecognitionProcess.py | 199 ++++++++++++++----- 1 file changed, 151 insertions(+), 48 deletions(-) diff --git a/concurrency/IntelligentRecognitionProcess.py b/concurrency/IntelligentRecognitionProcess.py index 7c40632..2160aa8 100644 --- a/concurrency/IntelligentRecognitionProcess.py +++ b/concurrency/IntelligentRecognitionProcess.py @@ -91,8 +91,7 @@ class IntelligentRecognitionProcess(Process): hb_thread.start() return hb_thread - - + class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): @@ -229,7 +228,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): # 事件队列、拉流队列、心跳队列、反馈队列 event_queue, pull_queue, hb_queue, fb_queue = self.event_queue, self._pull_queue, self._hb_queue, self._fb_queue - + # 推流队列、推流异常队列、图片队列 push_queue, push_ex_queue, image_queue = self._push_queue, self._push_ex_queue, self._image_queue try: @@ -237,16 +236,15 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): init_log(base_dir, env) # 打印启动日志 logger.info("开始启动实时分析进程!requestId: {}", request_id) - + # 启动拉流进程(包含拉流线程, 图片上传线程,mqtt读取线程) # 拉流进程初始化时间长, 先启动 pull_process = self.start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, 25) #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #7.0, # 启动心跳线程 hb_thread = self.start_heartbeat(fb_queue, hb_queue, request_id, analyse_type, context) - - - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #7.0, + + # print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #7.0, # 加载算法模型 model_array = get_model(msg, context, analyse_type) #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #9.5 @@ -273,7 +271,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): raise ServiceException(push_status[1], push_status[2]) # 获取停止指令 event_result = get_no_block_queue(event_queue) - + if event_result: cmdStr = event_result.get("command") #接收到算法开启、或者关闭的命令 @@ -281,7 +279,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): logger.info("发送向推流进程发送算法命令, requestId: {}, {}", request_id,cmdStr ) put_queue(push_queue, (2, cmdStr), timeout=1, is_ex=True) pull_process.sendCommand({"command": cmdStr}) - + # 接收到停止指令 if "stop" == cmdStr: logger.info("实时任务开始停止, requestId: {}", request_id) @@ -301,20 +299,31 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): task_status[0] = 1 for i, model in enumerate(model_array): model_conf, code = model - model_param = model_conf[1] - # (modeType, model_param, allowedList, names, rainbows) - MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0], - model_conf) - if draw_config.get("font_config") is None: - draw_config["font_config"] = model_param['font_config'] - if draw_config.get(code) is None: - draw_config[code] = {} - draw_config[code]["allowedList"] = model_conf[2] - draw_config[code]["rainbows"] = model_conf[4] - draw_config[code]["label_arrays"] = model_param['label_arraylist'] - if "label_dict" in model_param: - draw_config[code]["label_dict"] = model_param['label_dict'] - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) + if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code) or \ + ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code): + if draw_config.get(code) is None: + draw_config[code] = {} + draw_config["font_config"] = model_conf[4] + draw_config[code]["allowedList"] = 0 + draw_config[code]["label_arrays"] = [None] + draw_config[code]["rainbows"] = model_conf[4] + else: + model_param = model_conf[1] + # (modeType, model_param, allowedList, names, rainbows) + MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0], + model_conf) + if draw_config.get("font_config") is None: + draw_config["font_config"] = model_param['font_config'] + if draw_config.get(code) is None: + draw_config[code] = {} + draw_config[code]["allowedList"] = model_conf[2] + draw_config[code]["rainbows"] = model_conf[4] + draw_config[code]["label_arrays"] = model_param['label_arraylist'] + if "label_dict" in model_param: + draw_config[code]["label_dict"] = model_param['label_dict'] + + + # print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) # 多线程并发处理, 经过测试两个线程最优 det_array = [] for i, frame in enumerate(frame_list): @@ -437,23 +446,23 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess): __slots__ = () - + def upload_video(self,base_dir, env, request_id, aiFilePath): aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - + if self._storage_source==1: minioSdk = MinioSdk(base_dir, env, request_id ) upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "ai_online_%s.mp4" % request_id) else: aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - + upload_video_thread_ai.setDaemon(True) upload_video_thread_ai.start() ai_url = upload_video_thread_ai.get_result() return ai_url - + ''' @staticmethod def upload_video(base_dir, env, request_id, aiFilePath): @@ -602,7 +611,7 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess): logger.info("发送向推流进程发送算法命令, requestId: {}, {}", request_id,cmdStr ) put_queue(push_queue, (2, cmdStr), timeout=1, is_ex=True) pull_process.sendCommand({"command": cmdStr}) - + pull_result = get_no_block_queue(pull_queue) if pull_result is None: sleep(1) @@ -616,19 +625,31 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess): task_status[0] = 1 for i, model in enumerate(model_array): model_conf, code = model - model_param = model_conf[1] - # (modeType, model_param, allowedList, names, rainbows) - MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0], - model_conf) - if draw_config.get("font_config") is None: - draw_config["font_config"] = model_param['font_config'] - if draw_config.get(code) is None: - draw_config[code] = {} - draw_config[code]["allowedList"] = model_conf[2] - draw_config[code]["rainbows"] = model_conf[4] - draw_config[code]["label_arrays"] = model_param['label_arraylist'] - if "label_dict" in model_param: - draw_config[code]["label_dict"] = model_param['label_dict'] + if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code) or \ + ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code): + if draw_config.get(code) is None: + draw_config[code] = {} + draw_config["font_config"] = model_conf[4] + draw_config[code]["allowedList"] = 0 + draw_config[code]["label_arrays"] = [None] + draw_config[code]["rainbows"] = model_conf[4] + + else: + model_param = model_conf[1] + # (modeType, model_param, allowedList, names, rainbows) + MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0], + model_conf) + if draw_config.get("font_config") is None: + draw_config["font_config"] = model_param['font_config'] + if draw_config.get(code) is None: + draw_config[code] = {} + draw_config[code]["allowedList"] = model_conf[2] + draw_config[code]["rainbows"] = model_conf[4] + draw_config[code]["label_arrays"] = model_param['label_arraylist'] + if "label_dict" in model_param: + draw_config[code]["label_dict"] = model_param['label_dict'] + + det_array = [] for i, frame in enumerate(frame_list): det_result = t.submit(self.obj_det, self, model_array, frame, task_status, @@ -922,6 +943,63 @@ class PhotosIntelligentRecognitionProcess(Process): logger.error("模型分析异常: {}, requestId: {}", format_exc(), request_id) raise e + #自研究车牌模型 + def carplate_rec(self, imageUrl, mod, image_queue, request_id): + try: + # model_conf: modeType, allowedList, detpar, ocrmodel, rainbows + model_conf, code = mod + modeType, device, modelList, detpar, rainbows = model_conf + image = url2Array(imageUrl) + dets = {code: {}} + # param = [image, new_device, model, par, img_type, request_id] + # model_conf, frame, device, requestId + dataBack = MODEL_CONFIG[code][3]([[modeType, device, modelList, detpar], image, request_id])[0][2] + dets[code][0] = dataBack + if not dataBack: + logger.info("车牌识别为空") + + # for ai_result in dataBack: + # label, box = ai_result + # color = rainbows + + if len(dataBack) > 0: + put_queue(image_queue, (1, (dets, imageUrl, image, rainbows, "")), timeout=2, is_ex=False) + + except ServiceException as s: + raise s + except Exception as e: + logger.error("模型分析异常: {}, requestId: {}", format_exc(), request_id) + raise e + + #密集人群计数 + def denscrowdcount_rec(self, imageUrl, mod, image_queue, request_id): + try: + # model_conf: modeType, allowedList, detpar, ocrmodel, rainbows + model_conf, code = mod + modeType, device, model, postPar, rainbows = model_conf + image = url2Array(imageUrl) + dets = {code: {}} + # param = [image, new_device, model, par, img_type, request_id] + # model_conf, frame, device, requestId + dataBack = MODEL_CONFIG[code][3]([[modeType, device, model, postPar], image, request_id])[0][2] + logger.info("当前人数:{}", dataBack[0][0]) + dets[code][0] = dataBack + if not dataBack: + logger.info("当前页面无人") + + # for ai_result in dataBack: + # label, box = ai_result + # color = rainbows + + if len(dataBack) > 0: + put_queue(image_queue, (1, (dets, imageUrl, image, rainbows, '')), timeout=2, is_ex=False) + + except ServiceException as s: + raise s + except Exception as e: + logger.error("模型分析异常: {}, requestId: {}", format_exc(), request_id) + raise e + ''' # 防疫模型 ''' @@ -936,6 +1014,26 @@ class PhotosIntelligentRecognitionProcess(Process): for r in obj_list: r.result(60) + # 自研车牌识别模型: + def carpalteRec(self, imageUrls, model, image_queue, request_id): + with ThreadPoolExecutor(max_workers=2) as t: + obj_list = [] + for imageUrl in imageUrls: + obj = t.submit(self.carplate_rec, imageUrl, model, image_queue, request_id) + obj_list.append(obj) + for r in obj_list: + r.result(60) + + # 密集人群计数:CITY_DENSECROWDCOUNT_MODEL + def denscrowdcountRec(self, imageUrls, model, image_queue, request_id): + with ThreadPoolExecutor(max_workers=2) as t: + obj_list = [] + for imageUrl in imageUrls: + obj = t.submit(self.denscrowdcount_rec, imageUrl, model, image_queue, request_id) + obj_list.append(obj) + for r in obj_list: + r.result(60) + def image_recognition(self, imageUrl, mod, image_queue, logo, request_id): try: model_conf, code = mod @@ -1125,7 +1223,7 @@ class PhotosIntelligentRecognitionProcess(Process): except requests.exceptions.RequestException as e: # 捕获请求过程中可能出现的异常(如网络问题、超时等) return False,str(e) - + def run(self): fb_queue, msg, analyse_type, context = self._fb_queue, self._msg, self._analyse_type, self._context request_id, logo, image_queue = msg["request_id"], context['logo'], self._image_queue @@ -1136,7 +1234,7 @@ class PhotosIntelligentRecognitionProcess(Process): valFlag=True for url in imageUrls: valFlag,ret = self.check_ImageUrl_Vaild(url,timeout=1) - + if not valFlag: logger.error("图片分析异常: {}, requestId:{},url:{}",ret, request_id,url) #print("AnalysisStatus.FAILED.value:{},ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[0]:{},ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[1]:{}".format(AnalysisStatus.FAILED.value,ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[0],ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[1] ) ) @@ -1168,6 +1266,14 @@ class PhotosIntelligentRecognitionProcess(Process): elif model[1] == ModelType.PLATE_MODEL.value[1]: result = t.submit(self.epidemicPrevention, imageUrls, model, base_dir, env, request_id) task_list.append(result) + # 自研车牌模型 + elif model[1] == ModelType.CITY_CARPLATE_MODEL.value[1]: + result = t.submit(self.carpalteRec, imageUrls, model, image_queue, request_id) + task_list.append(result) + # 人群计数模型 + elif model[1] == ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1]: + result = t.submit(self.denscrowdcountRec, imageUrls, model, image_queue, request_id) + task_list.append(result) else: result = t.submit(self.publicIdentification, imageUrls, model, image_queue, logo, request_id) task_list.append(result) @@ -1214,7 +1320,7 @@ class ScreenRecordingProcess(Process): put_queue(self._fb_queue, recording_feedback(self._msg["request_id"], RecordingStatus.RECORDING_WAITING.value[0]), timeout=1, is_ex=True) - self._storage_source = self._context['service']['storage_source'] + self._storage_source = self._context['service']['storage_source'] def sendEvent(self, result): put_queue(self._event_queue, result, timeout=2, is_ex=True) @@ -1380,9 +1486,6 @@ class ScreenRecordingProcess(Process): clear_queue(self._hb_queue) clear_queue(self._pull_queue) - - - def upload_video(self,base_dir, env, request_id, orFilePath): if self._storage_source==1: minioSdk = MinioSdk(base_dir, env, request_id ) @@ -1390,7 +1493,7 @@ class ScreenRecordingProcess(Process): else: aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - + upload_video_thread_ai.setDaemon(True) upload_video_thread_ai.start() or_url = upload_video_thread_ai.get_result()