diff --git a/concurrency/FileUploadThread.py b/concurrency/FileUploadThread.py index d662512..8ee9a61 100644 --- a/concurrency/FileUploadThread.py +++ b/concurrency/FileUploadThread.py @@ -6,21 +6,26 @@ from traceback import format_exc from loguru import logger import cv2 +import requests +import json +import base64 from entity.FeedBack import message_feedback from enums.ExceptionEnum import ExceptionType -from enums.ModelTypeEnum import ModelType from exception.CustomerException import ServiceException from util.AliyunSdk import AliyunOssSdk from util.MinioSdk import MinioSdk from util import TimeUtils from enums.AnalysisStatusEnum import AnalysisStatus -from util.PlotsUtils import draw_painting_joint, draw_name_ocr, draw_name_crowd +from util.PlotsUtils import draw_painting_joint from util.QueUtil import put_queue, get_no_block_queue, clear_queue import io from util.LocationUtils import locate_byMqtt +# +from common.YmlConstant import service_yml_path + class FileUpload(Thread): - __slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg','_mqtt_list') + __slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg', '_mqtt_list') def __init__(self, *args): super().__init__() @@ -30,8 +35,8 @@ class FileUpload(Thread): # self._algStatus = True # 默认关闭 self._algSwitch = self._context['service']['algSwitch'] - - + self.is_use_llm = True + self.cnt = 0 #0521: default_enabled = str(self._msg.get("defaultEnabled", "True")).lower() == "true" @@ -44,15 +49,64 @@ class FileUpload(Thread): # 这里放非默认逻辑的代码 self._algSwitch = False - print("---line46 :FileUploadThread.py---",self._algSwitch) - - -#如果任务是在线、离线处理,则用此类 +# 如果任务是在线、离线处理,则用此类 class ImageFileUpload(FileUpload): __slots__ = () + def extract_answers(self, text): # 提取大模型中的分点文本内容 + lines = [line.strip() for line in text.split('.') if line.strip()] + answers = [] + for i, line in enumerate(lines): + if i > 0: + if i < len(lines) - 1: + answers.append(line[:-1]) + else: + answers.append(line) + return answers + + def get_llm_res(self, image, prompt): + SERVER_IP = "192.168.10.11" + API_URL = f"http://{SERVER_IP}:8000/generate" + + # _, buffer = cv2.imencode('.jpg', image) + # is_success, buffer = cv2.imencode('.png', image) + is_success, buffer = cv2.imencode('.webp', image, [cv2.IMWRITE_WEBP_QUALITY, 100]) + image_base64 = base64.b64encode(buffer).decode('utf-8') + + conversation = { + "prompt": prompt, + "image_base64": image_base64, + } + + try: + logger.info(f"正在向 {API_URL} 发送请求...") + response = requests.post(API_URL, json=conversation, timeout=60) # 设置超时 + + response.raise_for_status() # 如果状态码不是2xx,会抛出异常 + + resp = response.json() + result = resp.get("response") + + logger.info("\n--- 请求成功 ---") + logger.info(f"原始Prompt: {prompt}") + logger.info(f"模型生成结果: {result}") + + except requests.exceptions.RequestException as e: + logger.info(f"\n--- 请求失败 ---") + logger.info(f"发生错误: {e}") + except json.JSONDecodeError: + logger.info("\n--- 解析失败 ---") + logger.info(f"无法解析服务器返回的响应: {response.text}") + + new_result = self.extract_answers(result) + new_result = new_result[1] # 获取第二点内容 + # flag = "不" in result or ("没" in result and "没错" not in result) or "否" in result or "未" in result or "无" in result + flag = "不存在" in new_result or ("没" in new_result and "没错" not in new_result) or ( + "否" in new_result and "是否" not in new_result) or "未" in new_result or "无法" in new_result + return not flag, new_result + #@staticmethod - def handle_image(self,frame_msg, frame_step): + def handle_image(self, frame_msg, frame_step): # (high_score_image["code"], all_frames, draw_config["font_config"]) # high_score_image["code"][code][cls] = (frame, frame_index_list[i], cls_list) det_xywh, frame, current_frame, all_frames, font_config = frame_msg @@ -67,7 +121,6 @@ class ImageFileUpload(FileUpload): ''' print('*'*100,' mqtt_list:',len(self._mqtt_list)) - model_info = [] # 更加模型编码解析数据 for code, det_list in det_xywh.items(): @@ -76,27 +129,15 @@ class ImageFileUpload(FileUpload): if len(target_list) > 0: aFrame = frame.copy() for target in target_list: - # 自研车牌模型判断 - if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code): - box = [target[1][0][0], target[1][0][1], target[1][3][0], target[1][3][1]] - draw_name_ocr(box, aFrame, target[4], target[0]) - cls = 0 - elif ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code): - draw_name_crowd(target[3], aFrame, target[4], cls) - cls = 0 - else: - draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, - target[5]) - - igH, igW = aFrame.shape[0:2] - if len(self._mqtt_list) >= 1: - # camParas = self._mqtt_list[0]['data'] + draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, target[5]) + igH,igW = aFrame.shape[0:2] + if len(self._mqtt_list)>=1: + #camParas = self._mqtt_list[0]['data'] camParas = self._mqtt_list[0] - gps = locate_byMqtt(target[1], igW, igH, camParas, outFormat='wgs84') + gps = locate_byMqtt(target[1],igW,igH,camParas,outFormat='wgs84') else: - gps = [None, None] - model_info.append( - {"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame, 'gps': gps}) + gps=[None,None] + model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame,'gps':gps}) if len(model_info) > 0: image_result = { "or_frame": frame, @@ -135,7 +176,7 @@ class ImageFileUpload(FileUpload): # 获取队列中的消息 image_msg = get_no_block_queue(image_queue) if image_msg is not None: - + if image_msg[0] == 2: logger.info("图片上传线程收到命令:{}, requestId: {}",image_msg[1] ,request_id) if 'stop' == image_msg[1]: @@ -143,7 +184,7 @@ class ImageFileUpload(FileUpload): break if 'algStart' == image_msg[1]: self._algStatus = True; logger.info("图片上传线程,执行算法开启命令, requestId:{}", request_id) if 'algStop' == image_msg[1]: self._algStatus = False; logger.info("图片上传线程,执行算法关闭命令, requestId:{}", request_id) - + if image_msg[0] == 1: image_result = self.handle_image(image_msg[1], frame_step) if image_result is not None: @@ -153,30 +194,70 @@ class ImageFileUpload(FileUpload): image_result["last_frame"], analyse_type, "OR", "0", "0", request_id) - if self._storage_source==1: - or_future = t.submit(minioSdk.put_object, or_image,or_image_name) + if self._storage_source==1: + or_future = t.submit(minioSdk.put_object, or_image, or_image_name) else: or_future = t.submit(aliyunOssSdk.put_object, or_image_name, or_image.tobytes()) task.append(or_future) model_info_list = image_result["model_info"] + llm_flag_list = [] msg_list = [] for model_info in model_info_list: - ai_image = cv2.imencode(".jpg", model_info["aFrame"])[1] - ai_image_name = build_image_name(image_result["current_frame"], - image_result["last_frame"], - analyse_type, - "AI", - model_info["modelCode"], - model_info["detectTargetCode"], - request_id) - if self._storage_source==1: - ai_future = t.submit(minioSdk.put_object, ai_image, - ai_image_name) - else: - ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, - ai_image.tobytes()) + aFrame = model_info["aFrame"] + # aFrame_rgb = cv2.cvtColor(aFrame, cv2.COLOR_BGR2RGB) + + tar_cls = model_info["detectTargetCode"] + logger.info("目标类别:{}", tar_cls) + + # is_use_llm = True + if tar_cls == '0': + # tar = "明火" + tar = "2. 判断图中是否有明火。3. 如果有,识别火势的大小,比如:[无情况/小/中/大]" + elif tar_cls == '1': + # tar = "某种物质燃烧产生的烟雾" + tar = "2. 判断图中是否有烟雾。3. 如果有,识别烟雾的颜色,比如:[无烟雾/白/灰白/灰黑]" + else: + self.is_use_llm = False + llm_flag = True + if self.is_use_llm: + # prompt = f"我看到图中有{tar},请你判断是否正确。简要回答。" + prompt = f"分点进行输出:1. 简短描述图像中的场景。{tar}" + logger.info("检测到目标,大模型提示词:{}", prompt) + llm_flag, llm_res = self.get_llm_res(aFrame, prompt) + llm_flag_list.append(llm_flag) + logger.info("检测到目标,大模型识别图中是否存在目标:{}, 大模型输出:{}", llm_flag, llm_res) + + if llm_flag: + logger.info("经大模型筛查,小模型识别正确!!!") + # cv2.imwrite(f"/home/thsw2/wei/image_res/{self.cnt}_{tar_cls}_1.jpg", aFrame, [cv2.IMWRITE_JPEG_QUALITY, 80]) + # self.cnt = self.cnt + 1 + else: + logger.info("经大模型筛查,小模型识别错误!!!") + # cv2.imwrite(f"/home/thsw2/wei/image_res/{self.cnt}_{tar_cls}_0.jpg", aFrame, [cv2.IMWRITE_JPEG_QUALITY, 80]) + # self.cnt = self.cnt + 1 + else: + llm_flag_list.append(llm_flag) + + ai_image = cv2.imencode(".jpg", aFrame)[1] + ai_image_name = build_image_name(image_result["current_frame"], + image_result["last_frame"], + analyse_type, + "AI", + model_info["modelCode"], + model_info["detectTargetCode"], + request_id) + if llm_flag: # 根据大模型结果判定是否上传 + if self._storage_source==1: + ai_future = t.submit(minioSdk.put_object, ai_image, + ai_image_name) + else: + ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, + ai_image.tobytes()) + + task.append(ai_future) + + # print("###line288",len(task)) - task.append(ai_future) #msg_list.append(message_feedback(request_id, # AnalysisStatus.RUNNING.value, # analyse_type, "", "", "", @@ -184,22 +265,24 @@ class ImageFileUpload(FileUpload): # ai_image_name, # model_info['modelCode'], # model_info['detectTargetCode'])) - remote_image_list=[] + + remote_image_list = [] for tk in task: - remote_image_list.append( tk.result()) - - for ii,model_info in enumerate(model_info_list): - msg_list.append( message_feedback(request_id, - AnalysisStatus.RUNNING.value, - analyse_type, "", "", "", - remote_image_list[0], - remote_image_list[ii+1], - model_info['modelCode'], - model_info['detectTargetCode'], - longitude=model_info['gps'][0], - latitude=model_info['gps'][1], - ) ) + remote_image_list.append( tk.result() ) + for ii, model_info in enumerate(model_info_list): + if llm_flag_list[ii]: + msg_list.append( message_feedback(request_id, + AnalysisStatus.RUNNING.value, + analyse_type, "", "", "", + remote_image_list[0], + remote_image_list[ii+1], + model_info['modelCode'], + model_info['detectTargetCode'], + longitude=model_info['gps'][0], + latitude=model_info['gps'][1], + ) ) + if (not self._algSwitch) or ( self._algStatus and self._algSwitch): for msg in msg_list: put_queue(fb_queue, msg, timeout=2, is_ex=False) @@ -225,7 +308,7 @@ def build_image_name(*args): time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S") return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame, random_num, mode_type, modeCode, target, image_type) - + #如果任务是图像处理,则用此类 class ImageTypeImageFileUpload(Thread): @@ -254,20 +337,12 @@ class ImageTypeImageFileUpload(Thread): if target_list is not None and len(target_list) > 0: aiFrame = copy_frame.copy() for target in target_list: - # 自研车牌模型判断 - if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code): - draw_name_ocr(target[1], aiFrame, font_config[cls], target[0]) - elif ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code): - draw_name_crowd(target[1],aiFrame,font_config[cls],target[0]) - else: - draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config) - - model_info.append({ - "modelCode": str(code), - "detectTargetCode": str(cls), - "frame": aiFrame - }) - + draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config) + model_info.append({ + "modelCode": str(code), + "detectTargetCode": str(cls), + "frame": aiFrame + }) if len(model_info) > 0: image_result = { "or_frame": copy_frame, @@ -291,7 +366,7 @@ class ImageTypeImageFileUpload(Thread): minioSdk = MinioSdk(base_dir, env, request_id ) else: aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id) - + start_time = time() while True: try: @@ -312,12 +387,12 @@ class ImageTypeImageFileUpload(Thread): if det_xywh is None: ai_image_name = build_image_name(0, 0, analyse_type, "AI", result.get("modelCode"), result.get("type"), request_id) - - if self._storage_source==1: + + if self._storage_source==1: ai_future = t.submit(minioSdk.put_object, copy_frame,ai_image_name) else: ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, copy_frame) - + task.append(ai_future) remote_names.append(ai_image_name) #msg_list.append(message_feedback(request_id, @@ -331,7 +406,7 @@ class ImageTypeImageFileUpload(Thread): else: image_result = self.handle_image(det_xywh, copy_frame, font_config) if image_result: - + # 图片帧数编码 if image_url is None: or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame")) @@ -339,8 +414,8 @@ class ImageTypeImageFileUpload(Thread): image_result.get("last_frame"), analyse_type, "OR", "0", "O", request_id) - - if self._storage_source==1: + + if self._storage_source==1: or_future = t.submit(minioSdk.put_object, or_image,image_url_0) else: or_future = t.submit(aliyunOssSdk.put_object, image_url_0, @@ -357,9 +432,9 @@ class ImageTypeImageFileUpload(Thread): model_info.get("modelCode"), model_info.get("detectTargetCode"), request_id) - if self._storage_source==1: + if self._storage_source==1: ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name) - else: + else: ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, ai_image.tobytes()) task.append(ai_future) @@ -375,8 +450,9 @@ class ImageTypeImageFileUpload(Thread): remote_url_list = [] for thread_result in task: remote_url_list.append(thread_result.result()) - - # 以下代码是为了获取图像上传后,返回的全路径地址 + + + #以下代码是为了获取图像上传后,返回的全路径地址 if det_xywh is None: msg_list.append(message_feedback(request_id, AnalysisStatus.RUNNING.value, @@ -408,7 +484,11 @@ class ImageTypeImageFileUpload(Thread): model_info_list[ii].get('modelCode'), model_info_list[ii].get('detectTargetCode'), analyse_results=result)) + + + + for msg in msg_list: put_queue(fb_queue, msg, timeout=2, is_ex=False) else: diff --git a/config/service/dsp_test_service.yml b/config/service/dsp_test_service.yml index ab0def4..1e2129f 100644 --- a/config/service/dsp_test_service.yml +++ b/config/service/dsp_test_service.yml @@ -25,6 +25,10 @@ service: # 2 模型追踪 model_type: 1 limit: 3 + # 新增:大模型识别配置 + llm: + server_ip: "192.168.10.11" + api_url: "http://192.168.10.11:8000/generate" task: # 任务限制5个 limit: 5 @@ -35,5 +39,4 @@ service: #是否启用mqtt,0--不用,1--启用 mqtt_flag: 0 #是否启用alg控制功能 - algSwitch: False - + algSwitch: False \ No newline at end of file