更新 concurrency/FileUploadThread.py

This commit is contained in:
zhoushuliang 2025-07-10 16:54:08 +08:00
parent 1542682828
commit 9f6c1eb8db
1 changed files with 58 additions and 50 deletions

View File

@ -8,12 +8,13 @@ from loguru import logger
import cv2 import cv2
from entity.FeedBack import message_feedback from entity.FeedBack import message_feedback
from enums.ExceptionEnum import ExceptionType from enums.ExceptionEnum import ExceptionType
from enums.ModelTypeEnum import ModelType
from exception.CustomerException import ServiceException from exception.CustomerException import ServiceException
from util.AliyunSdk import AliyunOssSdk from util.AliyunSdk import AliyunOssSdk
from util.MinioSdk import MinioSdk from util.MinioSdk import MinioSdk
from util import TimeUtils from util import TimeUtils
from enums.AnalysisStatusEnum import AnalysisStatus from enums.AnalysisStatusEnum import AnalysisStatus
from util.PlotsUtils import draw_painting_joint from util.PlotsUtils import draw_painting_joint, draw_name_ocr, draw_name_crowd
from util.QueUtil import put_queue, get_no_block_queue, clear_queue from util.QueUtil import put_queue, get_no_block_queue, clear_queue
import io import io
from util.LocationUtils import locate_byMqtt from util.LocationUtils import locate_byMqtt
@ -42,19 +43,11 @@ class FileUpload(Thread):
print("执行替代程序defaultEnabled=False") print("执行替代程序defaultEnabled=False")
# 这里放非默认逻辑的代码 # 这里放非默认逻辑的代码
self._algSwitch = False self._algSwitch = False
print("---line46 :FileUploadThread.py---",self._algSwitch) print("---line46 :FileUploadThread.py---",self._algSwitch)
#如果任务是在线、离线处理,则用此类
#如果任务是在线、离线处理,则用此类
class ImageFileUpload(FileUpload): class ImageFileUpload(FileUpload):
__slots__ = () __slots__ = ()
@ -74,7 +67,7 @@ class ImageFileUpload(FileUpload):
''' '''
print('*'*100,' mqtt_list:',len(self._mqtt_list)) print('*'*100,' mqtt_list:',len(self._mqtt_list))
model_info = [] model_info = []
# 更加模型编码解析数据 # 更加模型编码解析数据
for code, det_list in det_xywh.items(): for code, det_list in det_xywh.items():
@ -83,15 +76,27 @@ class ImageFileUpload(FileUpload):
if len(target_list) > 0: if len(target_list) > 0:
aFrame = frame.copy() aFrame = frame.copy()
for target in target_list: for target in target_list:
draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, target[5]) # 自研车牌模型判断
igH,igW = aFrame.shape[0:2] if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code):
if len(self._mqtt_list)>=1: box = [target[1][0][0], target[1][0][1], target[1][3][0], target[1][3][1]]
#camParas = self._mqtt_list[0]['data'] draw_name_ocr(box, aFrame, target[4], target[0])
cls = 0
elif ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code):
draw_name_crowd(target[3], aFrame, target[4], cls)
cls = 0
else:
draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config,
target[5])
igH, igW = aFrame.shape[0:2]
if len(self._mqtt_list) >= 1:
# camParas = self._mqtt_list[0]['data']
camParas = self._mqtt_list[0] camParas = self._mqtt_list[0]
gps = locate_byMqtt(target[1],igW,igH,camParas,outFormat='wgs84') gps = locate_byMqtt(target[1], igW, igH, camParas, outFormat='wgs84')
else: else:
gps=[None,None] gps = [None, None]
model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame,'gps':gps}) model_info.append(
{"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame, 'gps': gps})
if len(model_info) > 0: if len(model_info) > 0:
image_result = { image_result = {
"or_frame": frame, "or_frame": frame,
@ -130,7 +135,7 @@ class ImageFileUpload(FileUpload):
# 获取队列中的消息 # 获取队列中的消息
image_msg = get_no_block_queue(image_queue) image_msg = get_no_block_queue(image_queue)
if image_msg is not None: if image_msg is not None:
if image_msg[0] == 2: if image_msg[0] == 2:
logger.info("图片上传线程收到命令:{}, requestId: {}",image_msg[1] ,request_id) logger.info("图片上传线程收到命令:{}, requestId: {}",image_msg[1] ,request_id)
if 'stop' == image_msg[1]: if 'stop' == image_msg[1]:
@ -138,7 +143,7 @@ class ImageFileUpload(FileUpload):
break break
if 'algStart' == image_msg[1]: self._algStatus = True; logger.info("图片上传线程,执行算法开启命令, requestId:{}", request_id) if 'algStart' == image_msg[1]: self._algStatus = True; logger.info("图片上传线程,执行算法开启命令, requestId:{}", request_id)
if 'algStop' == image_msg[1]: self._algStatus = False; logger.info("图片上传线程,执行算法关闭命令, requestId:{}", request_id) if 'algStop' == image_msg[1]: self._algStatus = False; logger.info("图片上传线程,执行算法关闭命令, requestId:{}", request_id)
if image_msg[0] == 1: if image_msg[0] == 1:
image_result = self.handle_image(image_msg[1], frame_step) image_result = self.handle_image(image_msg[1], frame_step)
if image_result is not None: if image_result is not None:
@ -148,7 +153,7 @@ class ImageFileUpload(FileUpload):
image_result["last_frame"], image_result["last_frame"],
analyse_type, analyse_type,
"OR", "0", "0", request_id) "OR", "0", "0", request_id)
if self._storage_source==1: if self._storage_source==1:
or_future = t.submit(minioSdk.put_object, or_image,or_image_name) or_future = t.submit(minioSdk.put_object, or_image,or_image_name)
else: else:
or_future = t.submit(aliyunOssSdk.put_object, or_image_name, or_image.tobytes()) or_future = t.submit(aliyunOssSdk.put_object, or_image_name, or_image.tobytes())
@ -164,13 +169,13 @@ class ImageFileUpload(FileUpload):
model_info["modelCode"], model_info["modelCode"],
model_info["detectTargetCode"], model_info["detectTargetCode"],
request_id) request_id)
if self._storage_source==1: if self._storage_source==1:
ai_future = t.submit(minioSdk.put_object, ai_image, ai_future = t.submit(minioSdk.put_object, ai_image,
ai_image_name) ai_image_name)
else: else:
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name,
ai_image.tobytes()) ai_image.tobytes())
task.append(ai_future) task.append(ai_future)
#msg_list.append(message_feedback(request_id, #msg_list.append(message_feedback(request_id,
# AnalysisStatus.RUNNING.value, # AnalysisStatus.RUNNING.value,
@ -194,7 +199,7 @@ class ImageFileUpload(FileUpload):
longitude=model_info['gps'][0], longitude=model_info['gps'][0],
latitude=model_info['gps'][1], latitude=model_info['gps'][1],
) ) ) )
if (not self._algSwitch) or ( self._algStatus and self._algSwitch): if (not self._algSwitch) or ( self._algStatus and self._algSwitch):
for msg in msg_list: for msg in msg_list:
put_queue(fb_queue, msg, timeout=2, is_ex=False) put_queue(fb_queue, msg, timeout=2, is_ex=False)
@ -220,7 +225,7 @@ def build_image_name(*args):
time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S") time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S")
return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame, return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame,
random_num, mode_type, modeCode, target, image_type) random_num, mode_type, modeCode, target, image_type)
#如果任务是图像处理,则用此类 #如果任务是图像处理,则用此类
class ImageTypeImageFileUpload(Thread): class ImageTypeImageFileUpload(Thread):
@ -249,12 +254,20 @@ class ImageTypeImageFileUpload(Thread):
if target_list is not None and len(target_list) > 0: if target_list is not None and len(target_list) > 0:
aiFrame = copy_frame.copy() aiFrame = copy_frame.copy()
for target in target_list: for target in target_list:
draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config) # 自研车牌模型判断
model_info.append({ if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code):
"modelCode": str(code), draw_name_ocr(target[1], aiFrame, font_config[cls], target[0])
"detectTargetCode": str(cls), elif ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code):
"frame": aiFrame draw_name_crowd(target[1],aiFrame,font_config[cls],target[0])
}) else:
draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config)
model_info.append({
"modelCode": str(code),
"detectTargetCode": str(cls),
"frame": aiFrame
})
if len(model_info) > 0: if len(model_info) > 0:
image_result = { image_result = {
"or_frame": copy_frame, "or_frame": copy_frame,
@ -278,7 +291,7 @@ class ImageTypeImageFileUpload(Thread):
minioSdk = MinioSdk(base_dir, env, request_id ) minioSdk = MinioSdk(base_dir, env, request_id )
else: else:
aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id) aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id)
start_time = time() start_time = time()
while True: while True:
try: try:
@ -299,12 +312,12 @@ class ImageTypeImageFileUpload(Thread):
if det_xywh is None: if det_xywh is None:
ai_image_name = build_image_name(0, 0, analyse_type, "AI", result.get("modelCode"), ai_image_name = build_image_name(0, 0, analyse_type, "AI", result.get("modelCode"),
result.get("type"), request_id) result.get("type"), request_id)
if self._storage_source==1: if self._storage_source==1:
ai_future = t.submit(minioSdk.put_object, copy_frame,ai_image_name) ai_future = t.submit(minioSdk.put_object, copy_frame,ai_image_name)
else: else:
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, copy_frame) ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, copy_frame)
task.append(ai_future) task.append(ai_future)
remote_names.append(ai_image_name) remote_names.append(ai_image_name)
#msg_list.append(message_feedback(request_id, #msg_list.append(message_feedback(request_id,
@ -318,7 +331,7 @@ class ImageTypeImageFileUpload(Thread):
else: else:
image_result = self.handle_image(det_xywh, copy_frame, font_config) image_result = self.handle_image(det_xywh, copy_frame, font_config)
if image_result: if image_result:
# 图片帧数编码 # 图片帧数编码
if image_url is None: if image_url is None:
or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame")) or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame"))
@ -326,8 +339,8 @@ class ImageTypeImageFileUpload(Thread):
image_result.get("last_frame"), image_result.get("last_frame"),
analyse_type, analyse_type,
"OR", "0", "O", request_id) "OR", "0", "O", request_id)
if self._storage_source==1: if self._storage_source==1:
or_future = t.submit(minioSdk.put_object, or_image,image_url_0) or_future = t.submit(minioSdk.put_object, or_image,image_url_0)
else: else:
or_future = t.submit(aliyunOssSdk.put_object, image_url_0, or_future = t.submit(aliyunOssSdk.put_object, image_url_0,
@ -344,9 +357,9 @@ class ImageTypeImageFileUpload(Thread):
model_info.get("modelCode"), model_info.get("modelCode"),
model_info.get("detectTargetCode"), model_info.get("detectTargetCode"),
request_id) request_id)
if self._storage_source==1: if self._storage_source==1:
ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name) ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name)
else: else:
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name,
ai_image.tobytes()) ai_image.tobytes())
task.append(ai_future) task.append(ai_future)
@ -362,9 +375,8 @@ class ImageTypeImageFileUpload(Thread):
remote_url_list = [] remote_url_list = []
for thread_result in task: for thread_result in task:
remote_url_list.append(thread_result.result()) remote_url_list.append(thread_result.result())
# 以下代码是为了获取图像上传后,返回的全路径地址
#以下代码是为了获取图像上传后,返回的全路径地址
if det_xywh is None: if det_xywh is None:
msg_list.append(message_feedback(request_id, msg_list.append(message_feedback(request_id,
AnalysisStatus.RUNNING.value, AnalysisStatus.RUNNING.value,
@ -396,11 +408,7 @@ class ImageTypeImageFileUpload(Thread):
model_info_list[ii].get('modelCode'), model_info_list[ii].get('modelCode'),
model_info_list[ii].get('detectTargetCode'), model_info_list[ii].get('detectTargetCode'),
analyse_results=result)) analyse_results=result))
for msg in msg_list: for msg in msg_list:
put_queue(fb_queue, msg, timeout=2, is_ex=False) put_queue(fb_queue, msg, timeout=2, is_ex=False)
else: else: