From 9f6c1eb8dbe719eb855175a7720d50d20f60799c Mon Sep 17 00:00:00 2001 From: zhoushuliang Date: Thu, 10 Jul 2025 16:54:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20concurrency/FileUploadThre?= =?UTF-8?q?ad.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- concurrency/FileUploadThread.py | 108 +++++++++++++++++--------------- 1 file changed, 58 insertions(+), 50 deletions(-) diff --git a/concurrency/FileUploadThread.py b/concurrency/FileUploadThread.py index f328e53..d662512 100644 --- a/concurrency/FileUploadThread.py +++ b/concurrency/FileUploadThread.py @@ -8,12 +8,13 @@ from loguru import logger import cv2 from entity.FeedBack import message_feedback from enums.ExceptionEnum import ExceptionType +from enums.ModelTypeEnum import ModelType from exception.CustomerException import ServiceException from util.AliyunSdk import AliyunOssSdk from util.MinioSdk import MinioSdk from util import TimeUtils from enums.AnalysisStatusEnum import AnalysisStatus -from util.PlotsUtils import draw_painting_joint +from util.PlotsUtils import draw_painting_joint, draw_name_ocr, draw_name_crowd from util.QueUtil import put_queue, get_no_block_queue, clear_queue import io from util.LocationUtils import locate_byMqtt @@ -42,19 +43,11 @@ class FileUpload(Thread): print("执行替代程序(defaultEnabled=False)") # 这里放非默认逻辑的代码 self._algSwitch = False - + print("---line46 :FileUploadThread.py---",self._algSwitch) - - - - - - - - - - -#如果任务是在线、离线处理,则用此类 + + +#如果任务是在线、离线处理,则用此类 class ImageFileUpload(FileUpload): __slots__ = () @@ -74,7 +67,7 @@ class ImageFileUpload(FileUpload): ''' print('*'*100,' mqtt_list:',len(self._mqtt_list)) - + model_info = [] # 更加模型编码解析数据 for code, det_list in det_xywh.items(): @@ -83,15 +76,27 @@ class ImageFileUpload(FileUpload): if len(target_list) > 0: aFrame = frame.copy() for target in target_list: - draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, target[5]) - igH,igW = aFrame.shape[0:2] - if len(self._mqtt_list)>=1: - #camParas = self._mqtt_list[0]['data'] + # 自研车牌模型判断 + if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code): + box = [target[1][0][0], target[1][0][1], target[1][3][0], target[1][3][1]] + draw_name_ocr(box, aFrame, target[4], target[0]) + cls = 0 + elif ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code): + draw_name_crowd(target[3], aFrame, target[4], cls) + cls = 0 + else: + draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, + target[5]) + + igH, igW = aFrame.shape[0:2] + if len(self._mqtt_list) >= 1: + # camParas = self._mqtt_list[0]['data'] camParas = self._mqtt_list[0] - gps = locate_byMqtt(target[1],igW,igH,camParas,outFormat='wgs84') + gps = locate_byMqtt(target[1], igW, igH, camParas, outFormat='wgs84') else: - gps=[None,None] - model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame,'gps':gps}) + gps = [None, None] + model_info.append( + {"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame, 'gps': gps}) if len(model_info) > 0: image_result = { "or_frame": frame, @@ -130,7 +135,7 @@ class ImageFileUpload(FileUpload): # 获取队列中的消息 image_msg = get_no_block_queue(image_queue) if image_msg is not None: - + if image_msg[0] == 2: logger.info("图片上传线程收到命令:{}, requestId: {}",image_msg[1] ,request_id) if 'stop' == image_msg[1]: @@ -138,7 +143,7 @@ class ImageFileUpload(FileUpload): break if 'algStart' == image_msg[1]: self._algStatus = True; logger.info("图片上传线程,执行算法开启命令, requestId:{}", request_id) if 'algStop' == image_msg[1]: self._algStatus = False; logger.info("图片上传线程,执行算法关闭命令, requestId:{}", request_id) - + if image_msg[0] == 1: image_result = self.handle_image(image_msg[1], frame_step) if image_result is not None: @@ -148,7 +153,7 @@ class ImageFileUpload(FileUpload): image_result["last_frame"], analyse_type, "OR", "0", "0", request_id) - if self._storage_source==1: + if self._storage_source==1: or_future = t.submit(minioSdk.put_object, or_image,or_image_name) else: or_future = t.submit(aliyunOssSdk.put_object, or_image_name, or_image.tobytes()) @@ -164,13 +169,13 @@ class ImageFileUpload(FileUpload): model_info["modelCode"], model_info["detectTargetCode"], request_id) - if self._storage_source==1: + if self._storage_source==1: ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name) else: ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, ai_image.tobytes()) - + task.append(ai_future) #msg_list.append(message_feedback(request_id, # AnalysisStatus.RUNNING.value, @@ -194,7 +199,7 @@ class ImageFileUpload(FileUpload): longitude=model_info['gps'][0], latitude=model_info['gps'][1], ) ) - + if (not self._algSwitch) or ( self._algStatus and self._algSwitch): for msg in msg_list: put_queue(fb_queue, msg, timeout=2, is_ex=False) @@ -220,7 +225,7 @@ def build_image_name(*args): time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S") return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame, random_num, mode_type, modeCode, target, image_type) - + #如果任务是图像处理,则用此类 class ImageTypeImageFileUpload(Thread): @@ -249,12 +254,20 @@ class ImageTypeImageFileUpload(Thread): if target_list is not None and len(target_list) > 0: aiFrame = copy_frame.copy() for target in target_list: - draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config) - model_info.append({ - "modelCode": str(code), - "detectTargetCode": str(cls), - "frame": aiFrame - }) + # 自研车牌模型判断 + if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code): + draw_name_ocr(target[1], aiFrame, font_config[cls], target[0]) + elif ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code): + draw_name_crowd(target[1],aiFrame,font_config[cls],target[0]) + else: + draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config) + + model_info.append({ + "modelCode": str(code), + "detectTargetCode": str(cls), + "frame": aiFrame + }) + if len(model_info) > 0: image_result = { "or_frame": copy_frame, @@ -278,7 +291,7 @@ class ImageTypeImageFileUpload(Thread): minioSdk = MinioSdk(base_dir, env, request_id ) else: aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id) - + start_time = time() while True: try: @@ -299,12 +312,12 @@ class ImageTypeImageFileUpload(Thread): if det_xywh is None: ai_image_name = build_image_name(0, 0, analyse_type, "AI", result.get("modelCode"), result.get("type"), request_id) - - if self._storage_source==1: + + if self._storage_source==1: ai_future = t.submit(minioSdk.put_object, copy_frame,ai_image_name) else: ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, copy_frame) - + task.append(ai_future) remote_names.append(ai_image_name) #msg_list.append(message_feedback(request_id, @@ -318,7 +331,7 @@ class ImageTypeImageFileUpload(Thread): else: image_result = self.handle_image(det_xywh, copy_frame, font_config) if image_result: - + # 图片帧数编码 if image_url is None: or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame")) @@ -326,8 +339,8 @@ class ImageTypeImageFileUpload(Thread): image_result.get("last_frame"), analyse_type, "OR", "0", "O", request_id) - - if self._storage_source==1: + + if self._storage_source==1: or_future = t.submit(minioSdk.put_object, or_image,image_url_0) else: or_future = t.submit(aliyunOssSdk.put_object, image_url_0, @@ -344,9 +357,9 @@ class ImageTypeImageFileUpload(Thread): model_info.get("modelCode"), model_info.get("detectTargetCode"), request_id) - if self._storage_source==1: + if self._storage_source==1: ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name) - else: + else: ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, ai_image.tobytes()) task.append(ai_future) @@ -362,9 +375,8 @@ class ImageTypeImageFileUpload(Thread): remote_url_list = [] for thread_result in task: remote_url_list.append(thread_result.result()) - - - #以下代码是为了获取图像上传后,返回的全路径地址 + + # 以下代码是为了获取图像上传后,返回的全路径地址 if det_xywh is None: msg_list.append(message_feedback(request_id, AnalysisStatus.RUNNING.value, @@ -396,11 +408,7 @@ class ImageTypeImageFileUpload(Thread): model_info_list[ii].get('modelCode'), model_info_list[ii].get('detectTargetCode'), analyse_results=result)) - - - - for msg in msg_list: put_queue(fb_queue, msg, timeout=2, is_ex=False) else: