更新 concurrency/IntelligentRecognitionProcess.py

This commit is contained in:
zhoushuliang 2025-07-10 17:03:24 +08:00
parent 9f6c1eb8db
commit 57de938d7c
1 changed files with 151 additions and 48 deletions

View File

@ -91,8 +91,7 @@ class IntelligentRecognitionProcess(Process):
hb_thread.start()
return hb_thread
class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
@ -229,7 +228,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
# 事件队列、拉流队列、心跳队列、反馈队列
event_queue, pull_queue, hb_queue, fb_queue = self.event_queue, self._pull_queue, self._hb_queue, self._fb_queue
# 推流队列、推流异常队列、图片队列
push_queue, push_ex_queue, image_queue = self._push_queue, self._push_ex_queue, self._image_queue
try:
@ -237,16 +236,15 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
init_log(base_dir, env)
# 打印启动日志
logger.info("开始启动实时分析进程requestId: {}", request_id)
# 启动拉流进程(包含拉流线程, 图片上传线程,mqtt读取线程)
# 拉流进程初始化时间长, 先启动
pull_process = self.start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, 25)
#print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #7.0,
# 启动心跳线程
hb_thread = self.start_heartbeat(fb_queue, hb_queue, request_id, analyse_type, context)
#print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #7.0,
# print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #7.0,
# 加载算法模型
model_array = get_model(msg, context, analyse_type)
#print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #9.5
@ -273,7 +271,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
raise ServiceException(push_status[1], push_status[2])
# 获取停止指令
event_result = get_no_block_queue(event_queue)
if event_result:
cmdStr = event_result.get("command")
#接收到算法开启、或者关闭的命令
@ -281,7 +279,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
logger.info("发送向推流进程发送算法命令, requestId: {}, {}", request_id,cmdStr )
put_queue(push_queue, (2, cmdStr), timeout=1, is_ex=True)
pull_process.sendCommand({"command": cmdStr})
# 接收到停止指令
if "stop" == cmdStr:
logger.info("实时任务开始停止, requestId: {}", request_id)
@ -301,20 +299,31 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
task_status[0] = 1
for i, model in enumerate(model_array):
model_conf, code = model
model_param = model_conf[1]
# (modeType, model_param, allowedList, names, rainbows)
MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0],
model_conf)
if draw_config.get("font_config") is None:
draw_config["font_config"] = model_param['font_config']
if draw_config.get(code) is None:
draw_config[code] = {}
draw_config[code]["allowedList"] = model_conf[2]
draw_config[code]["rainbows"] = model_conf[4]
draw_config[code]["label_arrays"] = model_param['label_arraylist']
if "label_dict" in model_param:
draw_config[code]["label_dict"] = model_param['label_dict']
#print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno)
if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code) or \
ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code):
if draw_config.get(code) is None:
draw_config[code] = {}
draw_config["font_config"] = model_conf[4]
draw_config[code]["allowedList"] = 0
draw_config[code]["label_arrays"] = [None]
draw_config[code]["rainbows"] = model_conf[4]
else:
model_param = model_conf[1]
# (modeType, model_param, allowedList, names, rainbows)
MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0],
model_conf)
if draw_config.get("font_config") is None:
draw_config["font_config"] = model_param['font_config']
if draw_config.get(code) is None:
draw_config[code] = {}
draw_config[code]["allowedList"] = model_conf[2]
draw_config[code]["rainbows"] = model_conf[4]
draw_config[code]["label_arrays"] = model_param['label_arraylist']
if "label_dict" in model_param:
draw_config[code]["label_dict"] = model_param['label_dict']
# print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno)
# 多线程并发处理, 经过测试两个线程最优
det_array = []
for i, frame in enumerate(frame_list):
@ -437,23 +446,23 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
__slots__ = ()
def upload_video(self,base_dir, env, request_id, aiFilePath):
aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id)
upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id)
if self._storage_source==1:
minioSdk = MinioSdk(base_dir, env, request_id )
upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "ai_online_%s.mp4" % request_id)
else:
aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id)
upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id)
upload_video_thread_ai.setDaemon(True)
upload_video_thread_ai.start()
ai_url = upload_video_thread_ai.get_result()
return ai_url
'''
@staticmethod
def upload_video(base_dir, env, request_id, aiFilePath):
@ -602,7 +611,7 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
logger.info("发送向推流进程发送算法命令, requestId: {}, {}", request_id,cmdStr )
put_queue(push_queue, (2, cmdStr), timeout=1, is_ex=True)
pull_process.sendCommand({"command": cmdStr})
pull_result = get_no_block_queue(pull_queue)
if pull_result is None:
sleep(1)
@ -616,19 +625,31 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
task_status[0] = 1
for i, model in enumerate(model_array):
model_conf, code = model
model_param = model_conf[1]
# (modeType, model_param, allowedList, names, rainbows)
MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0],
model_conf)
if draw_config.get("font_config") is None:
draw_config["font_config"] = model_param['font_config']
if draw_config.get(code) is None:
draw_config[code] = {}
draw_config[code]["allowedList"] = model_conf[2]
draw_config[code]["rainbows"] = model_conf[4]
draw_config[code]["label_arrays"] = model_param['label_arraylist']
if "label_dict" in model_param:
draw_config[code]["label_dict"] = model_param['label_dict']
if ModelType.CITY_CARPLATE_MODEL.value[1] == str(code) or \
ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1] == str(code):
if draw_config.get(code) is None:
draw_config[code] = {}
draw_config["font_config"] = model_conf[4]
draw_config[code]["allowedList"] = 0
draw_config[code]["label_arrays"] = [None]
draw_config[code]["rainbows"] = model_conf[4]
else:
model_param = model_conf[1]
# (modeType, model_param, allowedList, names, rainbows)
MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0],
model_conf)
if draw_config.get("font_config") is None:
draw_config["font_config"] = model_param['font_config']
if draw_config.get(code) is None:
draw_config[code] = {}
draw_config[code]["allowedList"] = model_conf[2]
draw_config[code]["rainbows"] = model_conf[4]
draw_config[code]["label_arrays"] = model_param['label_arraylist']
if "label_dict" in model_param:
draw_config[code]["label_dict"] = model_param['label_dict']
det_array = []
for i, frame in enumerate(frame_list):
det_result = t.submit(self.obj_det, self, model_array, frame, task_status,
@ -922,6 +943,63 @@ class PhotosIntelligentRecognitionProcess(Process):
logger.error("模型分析异常: {}, requestId: {}", format_exc(), request_id)
raise e
#自研究车牌模型
def carplate_rec(self, imageUrl, mod, image_queue, request_id):
try:
# model_conf modeType, allowedList, detpar, ocrmodel, rainbows
model_conf, code = mod
modeType, device, modelList, detpar, rainbows = model_conf
image = url2Array(imageUrl)
dets = {code: {}}
# param = [image, new_device, model, par, img_type, request_id]
# model_conf, frame, device, requestId
dataBack = MODEL_CONFIG[code][3]([[modeType, device, modelList, detpar], image, request_id])[0][2]
dets[code][0] = dataBack
if not dataBack:
logger.info("车牌识别为空")
# for ai_result in dataBack:
# label, box = ai_result
# color = rainbows
if len(dataBack) > 0:
put_queue(image_queue, (1, (dets, imageUrl, image, rainbows, "")), timeout=2, is_ex=False)
except ServiceException as s:
raise s
except Exception as e:
logger.error("模型分析异常: {}, requestId: {}", format_exc(), request_id)
raise e
#密集人群计数
def denscrowdcount_rec(self, imageUrl, mod, image_queue, request_id):
try:
# model_conf modeType, allowedList, detpar, ocrmodel, rainbows
model_conf, code = mod
modeType, device, model, postPar, rainbows = model_conf
image = url2Array(imageUrl)
dets = {code: {}}
# param = [image, new_device, model, par, img_type, request_id]
# model_conf, frame, device, requestId
dataBack = MODEL_CONFIG[code][3]([[modeType, device, model, postPar], image, request_id])[0][2]
logger.info("当前人数:{}", dataBack[0][0])
dets[code][0] = dataBack
if not dataBack:
logger.info("当前页面无人")
# for ai_result in dataBack:
# label, box = ai_result
# color = rainbows
if len(dataBack) > 0:
put_queue(image_queue, (1, (dets, imageUrl, image, rainbows, '')), timeout=2, is_ex=False)
except ServiceException as s:
raise s
except Exception as e:
logger.error("模型分析异常: {}, requestId: {}", format_exc(), request_id)
raise e
'''
# 防疫模型
'''
@ -936,6 +1014,26 @@ class PhotosIntelligentRecognitionProcess(Process):
for r in obj_list:
r.result(60)
# 自研车牌识别模型:
def carpalteRec(self, imageUrls, model, image_queue, request_id):
with ThreadPoolExecutor(max_workers=2) as t:
obj_list = []
for imageUrl in imageUrls:
obj = t.submit(self.carplate_rec, imageUrl, model, image_queue, request_id)
obj_list.append(obj)
for r in obj_list:
r.result(60)
# 密集人群计数CITY_DENSECROWDCOUNT_MODEL
def denscrowdcountRec(self, imageUrls, model, image_queue, request_id):
with ThreadPoolExecutor(max_workers=2) as t:
obj_list = []
for imageUrl in imageUrls:
obj = t.submit(self.denscrowdcount_rec, imageUrl, model, image_queue, request_id)
obj_list.append(obj)
for r in obj_list:
r.result(60)
def image_recognition(self, imageUrl, mod, image_queue, logo, request_id):
try:
model_conf, code = mod
@ -1125,7 +1223,7 @@ class PhotosIntelligentRecognitionProcess(Process):
except requests.exceptions.RequestException as e:
# 捕获请求过程中可能出现的异常(如网络问题、超时等)
return False,str(e)
def run(self):
fb_queue, msg, analyse_type, context = self._fb_queue, self._msg, self._analyse_type, self._context
request_id, logo, image_queue = msg["request_id"], context['logo'], self._image_queue
@ -1136,7 +1234,7 @@ class PhotosIntelligentRecognitionProcess(Process):
valFlag=True
for url in imageUrls:
valFlag,ret = self.check_ImageUrl_Vaild(url,timeout=1)
if not valFlag:
logger.error("图片分析异常: {}, requestId:{},url:{}",ret, request_id,url)
#print("AnalysisStatus.FAILED.value:{}ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[0]:{},ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[1]:{}".format(AnalysisStatus.FAILED.value,ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[0],ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[1] ) )
@ -1168,6 +1266,14 @@ class PhotosIntelligentRecognitionProcess(Process):
elif model[1] == ModelType.PLATE_MODEL.value[1]:
result = t.submit(self.epidemicPrevention, imageUrls, model, base_dir, env, request_id)
task_list.append(result)
# 自研车牌模型
elif model[1] == ModelType.CITY_CARPLATE_MODEL.value[1]:
result = t.submit(self.carpalteRec, imageUrls, model, image_queue, request_id)
task_list.append(result)
# 人群计数模型
elif model[1] == ModelType.CITY_DENSECROWDCOUNT_MODEL.value[1]:
result = t.submit(self.denscrowdcountRec, imageUrls, model, image_queue, request_id)
task_list.append(result)
else:
result = t.submit(self.publicIdentification, imageUrls, model, image_queue, logo, request_id)
task_list.append(result)
@ -1214,7 +1320,7 @@ class ScreenRecordingProcess(Process):
put_queue(self._fb_queue,
recording_feedback(self._msg["request_id"], RecordingStatus.RECORDING_WAITING.value[0]),
timeout=1, is_ex=True)
self._storage_source = self._context['service']['storage_source']
self._storage_source = self._context['service']['storage_source']
def sendEvent(self, result):
put_queue(self._event_queue, result, timeout=2, is_ex=True)
@ -1380,9 +1486,6 @@ class ScreenRecordingProcess(Process):
clear_queue(self._hb_queue)
clear_queue(self._pull_queue)
def upload_video(self,base_dir, env, request_id, orFilePath):
if self._storage_source==1:
minioSdk = MinioSdk(base_dir, env, request_id )
@ -1390,7 +1493,7 @@ class ScreenRecordingProcess(Process):
else:
aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id)
upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id)
upload_video_thread_ai.setDaemon(True)
upload_video_thread_ai.start()
or_url = upload_video_thread_ai.get_result()