306 lines
18 KiB
Python
306 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from threading import Thread
|
||
from time import sleep, time
|
||
from traceback import format_exc
|
||
|
||
from loguru import logger
|
||
import cv2
|
||
|
||
from entity.FeedBack import message_feedback
|
||
from enums.ExceptionEnum import ExceptionType
|
||
from exception.CustomerException import ServiceException
|
||
from util.AliyunSdk import AliyunOssSdk
|
||
from util.MinioSdk import MinioSdk
|
||
from util import TimeUtils
|
||
from enums.AnalysisStatusEnum import AnalysisStatus
|
||
from util.PlotsUtils import draw_painting_joint
|
||
from util.QueUtil import put_queue, get_no_block_queue, clear_queue
|
||
import io
|
||
|
||
class FileUpload(Thread):
|
||
__slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg')
|
||
|
||
def __init__(self, *args):
|
||
super().__init__()
|
||
self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type = args
|
||
self._storage_source = self._context['service']['storage_source']
|
||
|
||
class ImageFileUpload(FileUpload):
|
||
__slots__ = ()
|
||
|
||
@staticmethod
|
||
def handle_image(frame_msg, frame_step):
|
||
# (high_score_image["code"], all_frames, draw_config["font_config"])
|
||
# high_score_image["code"][code][cls] = (frame, frame_index_list[i], cls_list)
|
||
det_xywh, frame, current_frame, all_frames, font_config = frame_msg
|
||
'''
|
||
det_xywh:{
|
||
'code':{
|
||
1: [[detect_targets_code, box, score, label_array, color]]
|
||
}
|
||
}
|
||
模型编号:modeCode
|
||
检测目标:detectTargetCode
|
||
'''
|
||
model_info = []
|
||
# 更加模型编码解析数据
|
||
for code, det_list in det_xywh.items():
|
||
if len(det_list) > 0:
|
||
for cls, target_list in det_list.items():
|
||
if len(target_list) > 0:
|
||
aFrame = frame.copy()
|
||
for target in target_list:
|
||
draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, target[5])
|
||
model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame})
|
||
if len(model_info) > 0:
|
||
image_result = {
|
||
"or_frame": frame,
|
||
"model_info": model_info,
|
||
"current_frame": current_frame,
|
||
"last_frame": current_frame + frame_step
|
||
}
|
||
return image_result
|
||
return None
|
||
|
||
def run(self):
|
||
msg, context = self._msg, self._context
|
||
service = context["service"]
|
||
base_dir, env, request_id = context["base_dir"], context["env"], msg["request_id"]
|
||
logger.info("启动图片上传线程, requestId: {}", request_id)
|
||
image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type
|
||
service_timeout = int(service["timeout"])
|
||
frame_step = int(service["filter"]["frame_step"]) + 120
|
||
try:
|
||
with ThreadPoolExecutor(max_workers=2) as t:
|
||
# 初始化oss客户端
|
||
if self._storage_source==1:
|
||
minioSdk = MinioSdk(base_dir, env, request_id )
|
||
else:
|
||
aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id)
|
||
start_time = time()
|
||
while True:
|
||
try:
|
||
if time() - start_time > service_timeout:
|
||
logger.error("图片上传线程运行超时, requestId: {}", request_id)
|
||
break
|
||
raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
|
||
ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
|
||
# 获取队列中的消息
|
||
image_msg = get_no_block_queue(image_queue)
|
||
if image_msg is not None:
|
||
if image_msg[0] == 2:
|
||
if 'stop' == image_msg[1]:
|
||
logger.info("开始停止图片上传线程, requestId:{}", request_id)
|
||
break
|
||
if image_msg[0] == 1:
|
||
image_result = self.handle_image(image_msg[1], frame_step)
|
||
if image_result is not None:
|
||
task = []
|
||
or_image = cv2.imencode(".jpg", image_result["or_frame"])[1]
|
||
or_image_name = build_image_name(image_result["current_frame"],
|
||
image_result["last_frame"],
|
||
analyse_type,
|
||
"OR", "0", "0", request_id)
|
||
if self._storage_source==1:
|
||
or_future = t.submit(minioSdk.put_object, or_image,or_image_name)
|
||
else:
|
||
or_future = t.submit(aliyunOssSdk.put_object, or_image_name, or_image.tobytes())
|
||
task.append(or_future)
|
||
model_info_list = image_result["model_info"]
|
||
msg_list = []
|
||
for model_info in model_info_list:
|
||
ai_image = cv2.imencode(".jpg", model_info["aFrame"])[1]
|
||
ai_image_name = build_image_name(image_result["current_frame"],
|
||
image_result["last_frame"],
|
||
analyse_type,
|
||
"AI",
|
||
model_info["modelCode"],
|
||
model_info["detectTargetCode"],
|
||
request_id)
|
||
if self._storage_source==1:
|
||
ai_future = t.submit(minioSdk.put_object, ai_image,
|
||
ai_image_name)
|
||
else:
|
||
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name,
|
||
ai_image.tobytes())
|
||
|
||
task.append(ai_future)
|
||
msg_list.append(message_feedback(request_id,
|
||
AnalysisStatus.RUNNING.value,
|
||
analyse_type, "", "", "",
|
||
or_image_name,
|
||
ai_image_name,
|
||
model_info['modelCode'],
|
||
model_info['detectTargetCode']))
|
||
for tk in task:
|
||
tk.result()
|
||
for msg in msg_list:
|
||
put_queue(fb_queue, msg, timeout=2, is_ex=False)
|
||
del task, msg_list
|
||
else:
|
||
sleep(1)
|
||
del image_msg
|
||
except Exception:
|
||
logger.error("图片上传异常:{}, requestId:{}", format_exc(), request_id)
|
||
finally:
|
||
logger.info("停止图片上传线程0, requestId:{}", request_id)
|
||
clear_queue(image_queue)
|
||
logger.info("停止图片上传线程1, requestId:{}", request_id)
|
||
|
||
|
||
def build_image_name(*args):
|
||
"""
|
||
{requestId}/{time_now}_frame-{current_frame}-{last_frame}_type_{random_num}-{mode_type}" \
|
||
"-{modeCode}-{target}_{image_type}.jpg
|
||
"""
|
||
current_frame, last_frame, mode_type, image_type, modeCode, target, request_id = args
|
||
random_num = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF)
|
||
time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S")
|
||
return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame,
|
||
random_num, mode_type, modeCode, target, image_type)
|
||
|
||
|
||
class ImageTypeImageFileUpload(Thread):
|
||
__slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg')
|
||
|
||
def __init__(self, *args):
|
||
super().__init__()
|
||
self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type = args
|
||
self._storage_source = self._context['service']['storage_source']
|
||
@staticmethod
|
||
def handle_image(det_xywh, copy_frame, font_config):
|
||
"""
|
||
det_xywh:{
|
||
'code':{
|
||
1: [[detect_targets_code, box, score, label_array, color]]
|
||
}
|
||
}
|
||
模型编号:modeCode
|
||
检测目标:detectTargetCode
|
||
"""
|
||
model_info = []
|
||
# 更加模型编码解析数据
|
||
for code, det_info in det_xywh.items():
|
||
if det_info is not None and len(det_info) > 0:
|
||
for cls, target_list in det_info.items():
|
||
if target_list is not None and len(target_list) > 0:
|
||
aiFrame = copy_frame.copy()
|
||
for target in target_list:
|
||
draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config)
|
||
model_info.append({
|
||
"modelCode": str(code),
|
||
"detectTargetCode": str(cls),
|
||
"frame": aiFrame
|
||
})
|
||
if len(model_info) > 0:
|
||
image_result = {
|
||
"or_frame": copy_frame,
|
||
"model_info": model_info,
|
||
"current_frame": 0,
|
||
"last_frame": 0
|
||
}
|
||
return image_result
|
||
return None
|
||
|
||
def run(self):
|
||
context, msg = self._context, self._msg
|
||
base_dir, env, request_id = context["base_dir"], context["env"], msg["request_id"]
|
||
logger.info("启动图片识别图片上传线程, requestId: {}", request_id)
|
||
image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type
|
||
service_timeout = int(context["service"]["timeout"])
|
||
with ThreadPoolExecutor(max_workers=2) as t:
|
||
try:
|
||
# 初始化oss客户端
|
||
if self._storage_source==1:
|
||
minioSdk = MinioSdk(base_dir, env, request_id )
|
||
else:
|
||
aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id)
|
||
|
||
start_time = time()
|
||
while True:
|
||
try:
|
||
if time() - start_time > service_timeout:
|
||
logger.error("图片上传进程运行超时, requestId: {}", request_id)
|
||
break
|
||
# 获取队列中的消息
|
||
image_msg = image_queue.get()
|
||
if image_msg is not None:
|
||
if image_msg[0] == 2:
|
||
if 'stop' == image_msg[1]:
|
||
logger.info("开始停止图片上传线程, requestId:{}", request_id)
|
||
break
|
||
if image_msg[0] == 1:
|
||
task, msg_list = [], []
|
||
det_xywh, image_url, copy_frame, font_config, result = image_msg[1]
|
||
if det_xywh is None:
|
||
ai_image_name = build_image_name(0, 0, analyse_type, "AI", result.get("modelCode"),
|
||
result.get("type"), request_id)
|
||
|
||
if self._storage_source==1:
|
||
ai_future = t.submit(minioSdk.put_object, copy_frame,ai_image_name)
|
||
else:
|
||
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, copy_frame)
|
||
|
||
task.append(ai_future)
|
||
msg_list.append(message_feedback(request_id,
|
||
AnalysisStatus.RUNNING.value,
|
||
analyse_type, "", "", "",
|
||
image_url,
|
||
ai_image_name,
|
||
result.get("modelCode"),
|
||
result.get("type"),
|
||
analyse_results=result))
|
||
else:
|
||
image_result = self.handle_image(det_xywh, copy_frame, font_config)
|
||
if image_result:
|
||
# 图片帧数编码
|
||
if image_url is None:
|
||
or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame"))
|
||
image_url = build_image_name(image_result.get("current_frame"),
|
||
image_result.get("last_frame"),
|
||
analyse_type,
|
||
"OR", "0", "O", request_id)
|
||
|
||
if self._storage_source==1:
|
||
or_future = t.submit(minioSdk.put_object, or_image,image_url)
|
||
else:
|
||
or_future = t.submit(aliyunOssSdk.put_object, image_url,
|
||
or_image.tobytes())
|
||
task.append(or_future)
|
||
model_info_list = image_result.get("model_info")
|
||
for model_info in model_info_list:
|
||
ai_result, ai_image = cv2.imencode(".jpg", model_info.get("frame"))
|
||
ai_image_name = build_image_name(image_result.get("current_frame"),
|
||
image_result.get("last_frame"),
|
||
analyse_type,
|
||
"AI",
|
||
model_info.get("modelCode"),
|
||
model_info.get("detectTargetCode"),
|
||
request_id)
|
||
if self._storage_source==1:
|
||
ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name)
|
||
else:
|
||
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name,
|
||
ai_image.tobytes())
|
||
task.append(ai_future)
|
||
msg_list.append(message_feedback(request_id,
|
||
AnalysisStatus.RUNNING.value,
|
||
analyse_type, "", "", "",
|
||
image_url,
|
||
ai_image_name,
|
||
model_info.get('modelCode'),
|
||
model_info.get('detectTargetCode'),
|
||
analyse_results=result))
|
||
for thread_result in task:
|
||
thread_result.result()
|
||
for msg in msg_list:
|
||
put_queue(fb_queue, msg, timeout=2, is_ex=False)
|
||
else:
|
||
sleep(1)
|
||
except Exception as e:
|
||
logger.error("图片上传异常:{}, requestId:{}", format_exc(), request_id)
|
||
finally:
|
||
clear_queue(image_queue)
|
||
logger.info("停止图片识别图片上传线程, requestId:{}", request_id)
|