# -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor from threading import Thread from time import sleep, time from traceback import format_exc from loguru import logger import cv2 from entity.FeedBack import message_feedback from enums.ExceptionEnum import ExceptionType from exception.CustomerException import ServiceException from util.AliyunSdk import AliyunOssSdk from util.MinioSdk import MinioSdk from util import TimeUtils from enums.AnalysisStatusEnum import AnalysisStatus from util.PlotsUtils import draw_painting_joint from util.QueUtil import put_queue, get_no_block_queue, clear_queue import io from util.LocationUtils import locate_byMqtt class FileUpload(Thread): __slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg','_mqtt_list') def __init__(self, *args): super().__init__() self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type,self._mqtt_list = args self._storage_source = self._context['service']['storage_source'] self._algStatus = False # 默认关闭 self._algSwitch = self._context['service']['algSwitch'] #如果任务是在线、离线处理,则用此类 class ImageFileUpload(FileUpload): __slots__ = () #@staticmethod def handle_image(self,frame_msg, frame_step): # (high_score_image["code"], all_frames, draw_config["font_config"]) # high_score_image["code"][code][cls] = (frame, frame_index_list[i], cls_list) det_xywh, frame, current_frame, all_frames, font_config = frame_msg ''' det_xywh:{ 'code':{ 1: [[detect_targets_code, box, score, label_array, color,is_new]] } } #is_new--是否是新的目标。 模型编号:modeCode 检测目标:detectTargetCode ''' print('*'*100,' mqtt_list:',len(self._mqtt_list)) model_info = [] # 更加模型编码解析数据 for code, det_list in det_xywh.items(): if len(det_list) > 0: for cls, target_list in det_list.items(): if len(target_list) > 0: aFrame = frame.copy() for target in target_list: draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, target[5]) igH,igW = aFrame.shape[0:2] if len(self._mqtt_list)>=1: #camParas = self._mqtt_list[0]['data'] camParas = self._mqtt_list[0] gps = locate_byMqtt(target[1],igW,igH,camParas,outFormat='wgs84') else: gps=[None,None] model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame,'gps':gps}) if len(model_info) > 0: image_result = { "or_frame": frame, "model_info": model_info, "current_frame": current_frame, "last_frame": current_frame + frame_step } return image_result return None def run(self): msg, context = self._msg, self._context service = context["service"] base_dir, env, request_id = context["base_dir"], context["env"], msg["request_id"] logger.info("启动图片上传线程, requestId: {}", request_id) image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type service_timeout = int(service["timeout"]) frame_step = int(service["filter"]["frame_step"]) + 120 if msg['taskType']==0: self._algStatus = False else: self._algStatus = True try: with ThreadPoolExecutor(max_workers=2) as t: # 初始化oss客户端 if self._storage_source==1: minioSdk = MinioSdk(base_dir, env, request_id ) else: aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id) start_time = time() while True: try: if time() - start_time > service_timeout: logger.error("图片上传线程运行超时, requestId: {}", request_id) break raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0], ExceptionType.TASK_EXCUTE_TIMEOUT.value[1]) # 获取队列中的消息 image_msg = get_no_block_queue(image_queue) if image_msg is not None: if image_msg[0] == 2: logger.info("图片上传线程收到命令:{}, requestId: {}",image_msg[1] ,request_id) if 'stop' == image_msg[1]: logger.info("开始停止图片上传线程, requestId:{}", request_id) break if 'algStart' == image_msg[1]: self._algStatus = True; logger.info("图片上传线程,执行算法开启命令, requestId:{}", request_id) if 'algStop' == image_msg[1]: self._algStatus = False; logger.info("图片上传线程,执行算法关闭命令, requestId:{}", request_id) if image_msg[0] == 1: image_result = self.handle_image(image_msg[1], frame_step) if image_result is not None: task = [] or_image = cv2.imencode(".jpg", image_result["or_frame"])[1] or_image_name = build_image_name(image_result["current_frame"], image_result["last_frame"], analyse_type, "OR", "0", "0", request_id) if self._storage_source==1: or_future = t.submit(minioSdk.put_object, or_image,or_image_name) else: or_future = t.submit(aliyunOssSdk.put_object, or_image_name, or_image.tobytes()) task.append(or_future) model_info_list = image_result["model_info"] msg_list = [] for model_info in model_info_list: ai_image = cv2.imencode(".jpg", model_info["aFrame"])[1] ai_image_name = build_image_name(image_result["current_frame"], image_result["last_frame"], analyse_type, "AI", model_info["modelCode"], model_info["detectTargetCode"], request_id) if self._storage_source==1: ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name) else: ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, ai_image.tobytes()) task.append(ai_future) #msg_list.append(message_feedback(request_id, # AnalysisStatus.RUNNING.value, # analyse_type, "", "", "", # or_image_name, # ai_image_name, # model_info['modelCode'], # model_info['detectTargetCode'])) remote_image_list=[] for tk in task: remote_image_list.append( tk.result()) for ii,model_info in enumerate(model_info_list): msg_list.append( message_feedback(request_id, AnalysisStatus.RUNNING.value, analyse_type, "", "", "", remote_image_list[0], remote_image_list[ii+1], model_info['modelCode'], model_info['detectTargetCode'], longitude=model_info['gps'][0], latitude=model_info['gps'][1], ) ) if (not self._algSwitch) or ( self._algStatus and self._algSwitch): for msg in msg_list: put_queue(fb_queue, msg, timeout=2, is_ex=False) del task, msg_list else: sleep(1) del image_msg except Exception: logger.error("图片上传异常:{}, requestId:{}", format_exc(), request_id) finally: logger.info("停止图片上传线程0, requestId:{}", request_id) clear_queue(image_queue) logger.info("停止图片上传线程1, requestId:{}", request_id) def build_image_name(*args): """ {requestId}/{time_now}_frame-{current_frame}-{last_frame}_type_{random_num}-{mode_type}" \ "-{modeCode}-{target}_{image_type}.jpg """ current_frame, last_frame, mode_type, image_type, modeCode, target, request_id = args random_num = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF) time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S") return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame, random_num, mode_type, modeCode, target, image_type) #如果任务是图像处理,则用此类 class ImageTypeImageFileUpload(Thread): __slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg') def __init__(self, *args): super().__init__() self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type = args self._storage_source = self._context['service']['storage_source'] @staticmethod def handle_image(det_xywh, copy_frame, font_config): """ det_xywh:{ 'code':{ 1: [[detect_targets_code, box, score, label_array, color]] } } 模型编号:modeCode 检测目标:detectTargetCode """ model_info = [] # 更加模型编码解析数据 for code, det_info in det_xywh.items(): if det_info is not None and len(det_info) > 0: for cls, target_list in det_info.items(): if target_list is not None and len(target_list) > 0: aiFrame = copy_frame.copy() for target in target_list: draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config) model_info.append({ "modelCode": str(code), "detectTargetCode": str(cls), "frame": aiFrame }) if len(model_info) > 0: image_result = { "or_frame": copy_frame, "model_info": model_info, "current_frame": 0, "last_frame": 0 } return image_result return None def run(self): context, msg = self._context, self._msg base_dir, env, request_id = context["base_dir"], context["env"], msg["request_id"] logger.info("启动图片识别图片上传线程, requestId: {}", request_id) image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type service_timeout = int(context["service"]["timeout"]) with ThreadPoolExecutor(max_workers=2) as t: try: # 初始化oss客户端 if self._storage_source==1: minioSdk = MinioSdk(base_dir, env, request_id ) else: aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id) start_time = time() while True: try: if time() - start_time > service_timeout: logger.error("图片上传进程运行超时, requestId: {}", request_id) break # 获取队列中的消息 image_msg = image_queue.get() if image_msg is not None: if image_msg[0] == 2: if 'stop' == image_msg[1]: logger.info("开始停止图片上传线程, requestId:{}", request_id) break if image_msg[0] == 1: task, msg_list = [], [] det_xywh, image_url, copy_frame, font_config, result = image_msg[1] remote_names = [] if det_xywh is None: ai_image_name = build_image_name(0, 0, analyse_type, "AI", result.get("modelCode"), result.get("type"), request_id) if self._storage_source==1: ai_future = t.submit(minioSdk.put_object, copy_frame,ai_image_name) else: ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, copy_frame) task.append(ai_future) remote_names.append(ai_image_name) #msg_list.append(message_feedback(request_id, # AnalysisStatus.RUNNING.value, # analyse_type, "", "", "", # image_url, # ai_image_name, # result.get("modelCode"), # result.get("type"), # analyse_results=result)) else: image_result = self.handle_image(det_xywh, copy_frame, font_config) if image_result: # 图片帧数编码 if image_url is None: or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame")) image_url_0 = build_image_name(image_result.get("current_frame"), image_result.get("last_frame"), analyse_type, "OR", "0", "O", request_id) if self._storage_source==1: or_future = t.submit(minioSdk.put_object, or_image,image_url_0) else: or_future = t.submit(aliyunOssSdk.put_object, image_url_0, or_image.tobytes()) task.append(or_future) remote_names.append(image_url_0) model_info_list = image_result.get("model_info") for model_info in model_info_list: ai_result, ai_image = cv2.imencode(".jpg", model_info.get("frame")) ai_image_name = build_image_name(image_result.get("current_frame"), image_result.get("last_frame"), analyse_type, "AI", model_info.get("modelCode"), model_info.get("detectTargetCode"), request_id) if self._storage_source==1: ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name) else: ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, ai_image.tobytes()) task.append(ai_future) remote_names.append(ai_image_name) #msg_list.append(message_feedback(request_id, # AnalysisStatus.RUNNING.value, # analyse_type, "", "", "", # image_url, # ai_image_name, # model_info.get('modelCode'), # model_info.get('detectTargetCode'), # analyse_results=result)) remote_url_list = [] for thread_result in task: remote_url_list.append(thread_result.result()) #以下代码是为了获取图像上传后,返回的全路径地址 if det_xywh is None: msg_list.append(message_feedback(request_id, AnalysisStatus.RUNNING.value, analyse_type, "", "", "", image_url, remote_url_list[0], result.get("modelCode"), result.get("type"), analyse_results=result)) else: if image_result: if image_url is None: for ii in range(len(remote_names)-1): msg_list.append(message_feedback(request_id, AnalysisStatus.RUNNING.value, analyse_type, "", "", "", remote_url_list[0], remote_url_list[1+ii], model_info.get('modelCode'), model_info.get('detectTargetCode'), analyse_results=result)) else: for ii in range(len(remote_names)): msg_list.append(message_feedback(request_id, AnalysisStatus.RUNNING.value, analyse_type, "", "", "", image_url, remote_url_list[ii], model_info_list[ii].get('modelCode'), model_info_list[ii].get('detectTargetCode'), analyse_results=result)) for msg in msg_list: put_queue(fb_queue, msg, timeout=2, is_ex=False) else: sleep(1) except Exception as e: logger.error("图片上传异常:{}, requestId:{}", format_exc(), request_id) finally: clear_queue(image_queue) logger.info("停止图片识别图片上传线程, requestId:{}", request_id)