代码整理

This commit is contained in:
jiangchaoqing 2025-07-15 10:36:30 +08:00
parent 963ad31911
commit a82efd81e2
8 changed files with 10 additions and 2269 deletions

8
README.md Normal file
View File

@ -0,0 +1,8 @@
1.2025.01.21把之前的tuoheng alg仓库代码重新开个仓库 (1)在config/service/dsp_test_service.yml里面添加参数控制存储用的oss还是minio storage_source: 1 2.2025.02.06 (1)修改代码把mqtt读取加入到系统中。config/service/dsp_test_service.yml中添加mqtt_flag,决定是否启用。 (2)修改了minio情况下的文件名命名方式。 3.2025.02.12 (1)增加了对alg算法开发的代码。可以通过配置文件config/service/dsp_test_service.yml中algSwitch: true决定是否启用。
4、2025.07.10 周树亮 - 增加人群计数自研车牌模型裸土覆盖3个场景
5、江朝庆 -- 0715
1代码整理删除冗余代码。
2增加requirements.txt,方便部署
3) logs

View File

@ -1,305 +0,0 @@
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from time import sleep, time
from traceback import format_exc
from loguru import logger
import cv2
from entity.FeedBack import message_feedback
from enums.ExceptionEnum import ExceptionType
from exception.CustomerException import ServiceException
from util.AliyunSdk import AliyunOssSdk
from util.MinioSdk import MinioSdk
from util import TimeUtils
from enums.AnalysisStatusEnum import AnalysisStatus
from util.PlotsUtils import draw_painting_joint
from util.QueUtil import put_queue, get_no_block_queue, clear_queue
import io
class FileUpload(Thread):
__slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg')
def __init__(self, *args):
super().__init__()
self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type = args
self._storage_source = self._context['service']['storage_source']
class ImageFileUpload(FileUpload):
__slots__ = ()
@staticmethod
def handle_image(frame_msg, frame_step):
# (high_score_image["code"], all_frames, draw_config["font_config"])
# high_score_image["code"][code][cls] = (frame, frame_index_list[i], cls_list)
det_xywh, frame, current_frame, all_frames, font_config = frame_msg
'''
det_xywh:{
'code':{
1: [[detect_targets_code, box, score, label_array, color]]
}
}
模型编号modeCode
检测目标detectTargetCode
'''
model_info = []
# 更加模型编码解析数据
for code, det_list in det_xywh.items():
if len(det_list) > 0:
for cls, target_list in det_list.items():
if len(target_list) > 0:
aFrame = frame.copy()
for target in target_list:
draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, target[5])
model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame})
if len(model_info) > 0:
image_result = {
"or_frame": frame,
"model_info": model_info,
"current_frame": current_frame,
"last_frame": current_frame + frame_step
}
return image_result
return None
def run(self):
msg, context = self._msg, self._context
service = context["service"]
base_dir, env, request_id = context["base_dir"], context["env"], msg["request_id"]
logger.info("启动图片上传线程, requestId: {}", request_id)
image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type
service_timeout = int(service["timeout"])
frame_step = int(service["filter"]["frame_step"]) + 120
try:
with ThreadPoolExecutor(max_workers=2) as t:
# 初始化oss客户端
if self._storage_source==1:
minioSdk = MinioSdk(base_dir, env, request_id )
else:
aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id)
start_time = time()
while True:
try:
if time() - start_time > service_timeout:
logger.error("图片上传线程运行超时, requestId: {}", request_id)
break
raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
# 获取队列中的消息
image_msg = get_no_block_queue(image_queue)
if image_msg is not None:
if image_msg[0] == 2:
if 'stop' == image_msg[1]:
logger.info("开始停止图片上传线程, requestId:{}", request_id)
break
if image_msg[0] == 1:
image_result = self.handle_image(image_msg[1], frame_step)
if image_result is not None:
task = []
or_image = cv2.imencode(".jpg", image_result["or_frame"])[1]
or_image_name = build_image_name(image_result["current_frame"],
image_result["last_frame"],
analyse_type,
"OR", "0", "0", request_id)
if self._storage_source==1:
or_future = t.submit(minioSdk.put_object, or_image,or_image_name)
else:
or_future = t.submit(aliyunOssSdk.put_object, or_image_name, or_image.tobytes())
task.append(or_future)
model_info_list = image_result["model_info"]
msg_list = []
for model_info in model_info_list:
ai_image = cv2.imencode(".jpg", model_info["aFrame"])[1]
ai_image_name = build_image_name(image_result["current_frame"],
image_result["last_frame"],
analyse_type,
"AI",
model_info["modelCode"],
model_info["detectTargetCode"],
request_id)
if self._storage_source==1:
ai_future = t.submit(minioSdk.put_object, ai_image,
ai_image_name)
else:
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name,
ai_image.tobytes())
task.append(ai_future)
msg_list.append(message_feedback(request_id,
AnalysisStatus.RUNNING.value,
analyse_type, "", "", "",
or_image_name,
ai_image_name,
model_info['modelCode'],
model_info['detectTargetCode']))
for tk in task:
tk.result()
for msg in msg_list:
put_queue(fb_queue, msg, timeout=2, is_ex=False)
del task, msg_list
else:
sleep(1)
del image_msg
except Exception:
logger.error("图片上传异常:{}, requestId:{}", format_exc(), request_id)
finally:
logger.info("停止图片上传线程0, requestId:{}", request_id)
clear_queue(image_queue)
logger.info("停止图片上传线程1, requestId:{}", request_id)
def build_image_name(*args):
"""
{requestId}/{time_now}_frame-{current_frame}-{last_frame}_type_{random_num}-{mode_type}" \
"-{modeCode}-{target}_{image_type}.jpg
"""
current_frame, last_frame, mode_type, image_type, modeCode, target, request_id = args
random_num = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF)
time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S")
return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame,
random_num, mode_type, modeCode, target, image_type)
class ImageTypeImageFileUpload(Thread):
__slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg')
def __init__(self, *args):
super().__init__()
self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type = args
self._storage_source = self._context['service']['storage_source']
@staticmethod
def handle_image(det_xywh, copy_frame, font_config):
"""
det_xywh:{
'code':{
1: [[detect_targets_code, box, score, label_array, color]]
}
}
模型编号modeCode
检测目标detectTargetCode
"""
model_info = []
# 更加模型编码解析数据
for code, det_info in det_xywh.items():
if det_info is not None and len(det_info) > 0:
for cls, target_list in det_info.items():
if target_list is not None and len(target_list) > 0:
aiFrame = copy_frame.copy()
for target in target_list:
draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config)
model_info.append({
"modelCode": str(code),
"detectTargetCode": str(cls),
"frame": aiFrame
})
if len(model_info) > 0:
image_result = {
"or_frame": copy_frame,
"model_info": model_info,
"current_frame": 0,
"last_frame": 0
}
return image_result
return None
def run(self):
context, msg = self._context, self._msg
base_dir, env, request_id = context["base_dir"], context["env"], msg["request_id"]
logger.info("启动图片识别图片上传线程, requestId: {}", request_id)
image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type
service_timeout = int(context["service"]["timeout"])
with ThreadPoolExecutor(max_workers=2) as t:
try:
# 初始化oss客户端
if self._storage_source==1:
minioSdk = MinioSdk(base_dir, env, request_id )
else:
aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id)
start_time = time()
while True:
try:
if time() - start_time > service_timeout:
logger.error("图片上传进程运行超时, requestId: {}", request_id)
break
# 获取队列中的消息
image_msg = image_queue.get()
if image_msg is not None:
if image_msg[0] == 2:
if 'stop' == image_msg[1]:
logger.info("开始停止图片上传线程, requestId:{}", request_id)
break
if image_msg[0] == 1:
task, msg_list = [], []
det_xywh, image_url, copy_frame, font_config, result = image_msg[1]
if det_xywh is None:
ai_image_name = build_image_name(0, 0, analyse_type, "AI", result.get("modelCode"),
result.get("type"), request_id)
if self._storage_source==1:
ai_future = t.submit(minioSdk.put_object, copy_frame,ai_image_name)
else:
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, copy_frame)
task.append(ai_future)
msg_list.append(message_feedback(request_id,
AnalysisStatus.RUNNING.value,
analyse_type, "", "", "",
image_url,
ai_image_name,
result.get("modelCode"),
result.get("type"),
analyse_results=result))
else:
image_result = self.handle_image(det_xywh, copy_frame, font_config)
if image_result:
# 图片帧数编码
if image_url is None:
or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame"))
image_url = build_image_name(image_result.get("current_frame"),
image_result.get("last_frame"),
analyse_type,
"OR", "0", "O", request_id)
if self._storage_source==1:
or_future = t.submit(minioSdk.put_object, or_image,image_url)
else:
or_future = t.submit(aliyunOssSdk.put_object, image_url,
or_image.tobytes())
task.append(or_future)
model_info_list = image_result.get("model_info")
for model_info in model_info_list:
ai_result, ai_image = cv2.imencode(".jpg", model_info.get("frame"))
ai_image_name = build_image_name(image_result.get("current_frame"),
image_result.get("last_frame"),
analyse_type,
"AI",
model_info.get("modelCode"),
model_info.get("detectTargetCode"),
request_id)
if self._storage_source==1:
ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name)
else:
ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name,
ai_image.tobytes())
task.append(ai_future)
msg_list.append(message_feedback(request_id,
AnalysisStatus.RUNNING.value,
analyse_type, "", "", "",
image_url,
ai_image_name,
model_info.get('modelCode'),
model_info.get('detectTargetCode'),
analyse_results=result))
for thread_result in task:
thread_result.result()
for msg in msg_list:
put_queue(fb_queue, msg, timeout=2, is_ex=False)
else:
sleep(1)
except Exception as e:
logger.error("图片上传异常:{}, requestId:{}", format_exc(), request_id)
finally:
clear_queue(image_queue)
logger.info("停止图片识别图片上传线程, requestId:{}", request_id)

File diff suppressed because it is too large Load Diff

View File

@ -5,6 +5,6 @@ log_name: "dsp.log"
log_fmt: "{time:YYYY-MM-DD HH:mm:ss.SSS} [{level}][{process.name}-{process.id}-{thread.name}-{thread.id}][{line}] {module}-{function} - {message}"
level: "INFO"
rotation: "00:00"
retention: "7 days"
retention: "15 days"
encoding: "utf8"

View File

@ -5,6 +5,6 @@ log_name: "dsp.log"
log_fmt: "{time:YYYY-MM-DD HH:mm:ss.SSS} [{level}][{process.name}-{process.id}-{thread.name}-{thread.id}][{line}] {module}-{function} - {message}"
level: "INFO"
rotation: "00:00"
retention: "3 days"
retention: "7 days"
encoding: "utf8"

View File

@ -1,11 +0,0 @@
1.2025.01.21把之前的tuoheng alg仓库代码重新开个仓库
(1)在config/service/dsp_test_service.yml里面添加参数控制存储用的oss还是minio
storage_source: 1
2.2025.02.06
(1)修改代码把mqtt读取加入到系统中。config/service/dsp_test_service.yml中添加mqtt_flag,决定是否启用。
(2)修改了minio情况下的文件名命名方式。
3.2025.02.12
(1)增加了对alg算法开发的代码。可以通过配置文件config/service/dsp_test_service.yml中algSwitch: true决定是否启用。
4、2025.07.10
周树亮 - 增加人群计数自研车牌模型裸土覆盖3个场景

View File

@ -1,507 +0,0 @@
# -*- coding: utf-8 -*-
import time,os
from os.path import join
from traceback import format_exc
import json
from cerberus import Validator
from common.Constant import ONLINE_START_SCHEMA, ONLINE_STOP_SCHEMA, OFFLINE_START_SCHEMA, OFFLINE_STOP_SCHEMA, \
IMAGE_SCHEMA, RECORDING_START_SCHEMA, RECORDING_STOP_SCHEMA, PULL2PUSH_START_SCHEMA, PULL2PUSH_STOP_SCHEMA
from common.YmlConstant import service_yml_path, kafka_yml_path
from concurrency.FeedbackThread import FeedbackThread
from concurrency.uploadGPU import uploadGPUinfos
from concurrency.IntelligentRecognitionProcess2 import OnlineIntelligentRecognitionProcess2, \
OfflineIntelligentRecognitionProcess2, PhotosIntelligentRecognitionProcess2
from concurrency.Pull2PushStreamProcess import PushStreamProcess
from entity.FeedBack import message_feedback, recording_feedback, pull_stream_feedback
from enums.AnalysisStatusEnum import AnalysisStatus
from enums.AnalysisTypeEnum import AnalysisType
from enums.ExceptionEnum import ExceptionType
from enums.ModelTypeEnum import ModelMethodTypeEnum, ModelType
from enums.RecordingStatusEnum import RecordingStatus
from enums.StatusEnum import PushStreamStatus, ExecuteStatus
from exception.CustomerException import ServiceException
from loguru import logger
from multiprocessing import Queue
from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecognitionProcess, \
OfflineIntelligentRecognitionProcess, PhotosIntelligentRecognitionProcess, ScreenRecordingProcess
from util.CpuUtils import print_cpu_ex_status
from util.FileUtils import create_dir_not_exist
from util.GPUtils import get_first_gpu_name, print_gpu_ex_status, check_cude_is_available,select_best_server
from util.KafkaUtils import CustomerKafkaConsumer
from util.QueUtil import put_queue
from util.RWUtils import getConfigs
from kafka import KafkaProducer, KafkaConsumer
'''
分发服务
'''
class DispatcherService:
__slots__ = ('__context', '__feedbackThread', '__listeningProcesses', '__fbQueue', '__topics','__taskType', '__task_type',
'__kafka_config', '__recordingProcesses', '__pull2PushProcesses','__topicsPort','__gpuTopic','__role','__uploadGPUThread','__gpuDics','__producer')
def __init__(self, base_dir, env):
# 检测cuda是否活动
check_cude_is_available()
# 获取全局上下文配置
self.__context = getConfigs(join(base_dir, service_yml_path % env))
# 创建任务执行, 视频保存路径
create_dir_not_exist(join(base_dir, self.__context["video"]["file_path"]))
# 将根路径和环境设置到上下文中
self.__context["base_dir"], self.__context["env"] = base_dir, env
# 问题反馈线程
self.__feedbackThread,self.__uploadGPUThread, self.__fbQueue = None,None, Queue()
# 实时、离线、图片任务进程字典
self.__listeningProcesses = {}
# 录屏任务进程字典
self.__recordingProcesses = {}
# 转推流任务进程字典
self.__pull2PushProcesses = {}
self.__kafka_config = getConfigs(join(base_dir, kafka_yml_path % env))
self.__producer = KafkaProducer(
bootstrap_servers=self.__kafka_config['bootstrap_servers'],#tencent yun
value_serializer=lambda v: v.encode('utf-8'))
self.__gpuDics = { }#用于存储gpu信息的字典
self.__role = self.__context["role"]
self.__topics = [
self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"], # 实时监听topic
self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"], # 离线监听topic
self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"], # 图片监听topic
self.__kafka_config["topic"]["dsp-recording-task-topic"], # 录屏监听topic
self.__kafka_config["topic"]["dsp-push-stream-task-topic"] # 推流监听topic
]
self.__topicsPort = [
self.__kafka_config["topicPort"]["dsp-alg-online-tasks-topic"], # 实时监听topic
self.__kafka_config["topicPort"]["dsp-alg-offline-tasks-topic"], # 离线监听topic
self.__kafka_config["topicPort"]["dsp-alg-image-tasks-topic"], # 图片监听topic
self.__kafka_config["topicPort"]["dsp-recording-task-topic"], # 录屏监听topic
self.__kafka_config["topicPort"]["dsp-push-stream-task-topic"] # 推流监听topic
]
self.__gpuTopic = [self.__kafka_config["topicGPU"]]
if self.__role==1:
self.__topics = self.__topics + self.__topicsPort + self.__gpuTopic
# 对应topic的各个lambda表达式
self.__task_type = {
self.__topics[0]: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y),
lambda x, y, z: self.identify_method(x, y, z)),
self.__topics[1]: (AnalysisType.OFFLINE.value, lambda x, y: self.offline(x, y),
lambda x, y, z: self.identify_method(x, y, z)),
self.__topics[2]: (AnalysisType.IMAGE.value, lambda x, y: self.image(x, y),
lambda x, y, z: self.identify_method(x, y, z)),
self.__topics[3]: (AnalysisType.RECORDING.value, lambda x, y: self.recording(x, y),
lambda x, y, z: self.recording_method(x, y, z)),
self.__topics[4]: (AnalysisType.PULLTOPUSH.value, lambda x, y: self.pullStream(x, y),
lambda x, y, z: self.push_stream_method(x, y, z))
}
self.__taskType={
self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"]:0, # 实时监听topic
self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"]:1, # 离线监听topic
self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"]:2, # 图片监听topic
self.__kafka_config["topic"]["dsp-recording-task-topic"]:3, # 录屏监听topic
self.__kafka_config["topic"]["dsp-push-stream-task-topic"]:4 # 推流监听topic
}
gpu_name_array = get_first_gpu_name()
gpu_array = [g for g in ('3090', '2080', '4090', 'A10') if g in gpu_name_array]
gpu_name = '2080Ti'
if len(gpu_array) > 0:
if gpu_array[0] != '2080':
gpu_name = gpu_array[0]
else:
raise Exception("GPU资源不在提供的模型所支持的范围内请先提供对应的GPU模型")
logger.info("当前服务环境为: {}, 服务器GPU使用型号: {}", env, gpu_name)
self.__context["gpu_name"] = gpu_name
self.start_service()
# 服务调用启动方法
def start_service(self):
# 初始化kafka监听者
customerKafkaConsumer = CustomerKafkaConsumer(self.__kafka_config, topics=self.__topics)
####增加一个线程用于试试监控和发送gpu状态####
####
logger.info("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 服务器IP:{}".format(self.__kafka_config['bootstrap_servers'] ))
while True:
try:
# 检查任务进程运行情况,去除结束的任务
self.check_process_task()
# 启动反馈线程
self.start_feedback_thread()
self.start_uploadGPU_thread()
msg = customerKafkaConsumer.poll()
if msg is not None and len(msg) > 0:
for k, v in msg.items():
for m in v:
message = m.value
#如果收到的信息是gpu状态的话收到信息后更新自己的gpu服务器状态下面不再执行
if m.topic in self.__gpuTopic:
customerKafkaConsumer.commit_offset(m,'x'*16,False)
#更新机器资源现状
ip = message['System']['Local IP Address']
self.__gpuDics[ip]=message
continue
#如果收到的信息是门户消息收到信息后要根据Gpu状态转发到对应的机器。
elif m.topic in self.__topicsPort:
customerKafkaConsumer.commit_offset(m, 'y'*16)
#状态分析
#recondGpu={'hostname':'thsw2','IP':'192.168.10.66','gpuId':0}
recondGpu= select_best_server(self.__gpuDics)
if recondGpu is None:
print( 'recondGpu',recondGpu, ' self.__gpuDics: ',self.__gpuDics,' topic:',m.topic, ' message:',message )
continue
#转发消息
message['transmit_topic'] = m.topic + '-' + recondGpu['IP']
transmitMsg={'transmit':message}
msg_json = json.dumps( message )
future = self.__producer.send( message['transmit_topic'] ,msg_json)
try:
future.get(timeout=2)
logger.info( "转发消息成功消息topic:{},消息内容:{}",message['transmit_topic'],message )
except kafka_errors as e:
print('------transmitted error:',e)
logger.info("转发消息失败")
traceback.format_exc()
else:
requestId = message.get("request_id")
if requestId is None:
logger.error("请求参数格式错误, 请检查请求体格式是否正确message:%s"%(message))
continue
customerKafkaConsumer.commit_offset(m, requestId)
logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
m.topic, m.offset, m.partition, message, requestId)
message['taskType']=self.__taskType[m.topic]
topic_method = self.__task_type[m.topic]
topic_method[2](topic_method[1], message, topic_method[0])
else:
print_gpu_ex_status()
print_cpu_ex_status(self.__context["base_dir"])
time.sleep(1)
except Exception:
logger.error("主线程异常:{}", format_exc())
def identify_method(self, handle_method, message, analysisType):
try:
check_cude_is_available()
handle_method(message, analysisType)
except ServiceException as s:
logger.error("消息监听异常:{}, requestId: {}", s.msg, message["request_id"])
put_queue(self.__fbQueue, message_feedback(message["request_id"], AnalysisStatus.FAILED.value, analysisType,
s.code, s.msg), timeout=1)
except Exception:
logger.error("消息监听异常:{}, requestId: {}", format_exc(), message["request_id"])
put_queue(self.__fbQueue, message_feedback(message["request_id"], AnalysisStatus.FAILED.value, analysisType,
ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=1)
finally:
del message
def push_stream_method(self, handle_method, message, analysisType):
try:
check_cude_is_available()
handle_method(message, analysisType)
except ServiceException as s:
logger.error("消息监听异常:{}, requestId: {}", s.msg, message['request_id'])
videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.FAILED.value[0]} for url in
message.get("video_urls", []) if url.get("id") is not None]
put_queue(self.__fbQueue, pull_stream_feedback(message['request_id'], ExecuteStatus.FAILED.value[0],
s.code, s.msg, videoInfo), timeout=1)
except Exception:
logger.error("消息监听异常:{}, requestId: {}", format_exc(), message['request_id'])
videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.FAILED.value[0]} for url in
message.get("video_urls", []) if url.get("id") is not None]
put_queue(self.__fbQueue, pull_stream_feedback(message.get("request_id"), ExecuteStatus.FAILED.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1], videoInfo),
timeout=1)
finally:
del message
def recording_method(self, handle_method, message, analysisType):
try:
check_cude_is_available()
handle_method(message, analysisType)
except ServiceException as s:
logger.error("消息监听异常:{}, requestId: {}", s.msg, message["request_id"])
put_queue(self.__fbQueue,
recording_feedback(message["request_id"], RecordingStatus.RECORDING_FAILED.value[0],
error_code=s.code, error_msg=s.msg), timeout=1)
except Exception:
logger.error("消息监听异常:{}, requestId: {}", format_exc(), message["request_id"])
put_queue(self.__fbQueue,
recording_feedback(message["request_id"], RecordingStatus.RECORDING_FAILED.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=1)
finally:
del message
# 开启实时进程
def startOnlineProcess(self, msg, analysisType):
if self.__listeningProcesses.get(msg["request_id"]):
logger.warning("实时重复任务请稍后再试requestId:{}", msg["request_id"])
return
model_type = self.__context["service"]["model"]["model_type"]
codes = [model.get("code") for model in msg["models"] if model.get("code")]
if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes:
coir = OnlineIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context)
else:
coir = OnlineIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context)
coir.start()
logger.info("开始实时进程requestId:{},pid:{}, ppid:{}", msg["request_id"],os.getpid(),os.getppid())
self.__listeningProcesses[msg["request_id"]] = coir
# 结束实时进程
def stopOnlineProcess(self, msg):
ps = self.__listeningProcesses.get(msg["request_id"])
if ps is None:
logger.warning("未查询到该任务无法停止任务requestId:{}", msg["request_id"])
return
ps.sendEvent({"command": "stop"})
# 新增该函数用于向子任务发送命令algStartalgStop
def sendCmdToChildProcess(self, msg,cmd="algStart"):
ps = self.__listeningProcesses.get(msg["request_id"])
if ps is None:
logger.warning("未查询到该任务无法停止任务requestId:{}", msg["request_id"])
return
ps.sendEvent({"command": cmd})
@staticmethod
def check_process(listeningProcess):
for requestId in list(listeningProcess.keys()):
if not listeningProcess[requestId].is_alive():
del listeningProcess[requestId]
def check_process_task(self):
self.check_process(self.__listeningProcesses)
self.check_process(self.__recordingProcesses)
self.check_process(self.__pull2PushProcesses)
# 开启离线进程
def startOfflineProcess(self, msg, analysisType):
if self.__listeningProcesses.get(msg["request_id"]):
logger.warning("离线重复任务请稍后再试requestId:{}", msg["request_id"])
return
model_type = self.__context["service"]["model"]["model_type"]
codes = [model.get("code") for model in msg["models"] if model.get("code")]
if ModelMethodTypeEnum.NORMAL.value == model_type:
first = OfflineIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context)
else:
first = OfflineIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context)
first.start()
self.__listeningProcesses[msg["request_id"]] = first
# 结束离线进程
def stopOfflineProcess(self, msg):
ps = self.__listeningProcesses.get(msg["request_id"])
if ps is None:
logger.warning("未查询到该任务无法停止任务requestId:{}", msg["request_id"])
return
ps.sendEvent({"command": "stop"})
# 开启图片分析进程
def startImageProcess(self, msg, analysisType):
pp = self.__listeningProcesses.get(msg["request_id"])
if pp is not None:
logger.warning("重复任务请稍后再试requestId:{}", msg["request_id"])
return
model_type = self.__context["service"]["model"]["model_type"]
codes = [model.get("code") for model in msg["models"] if model.get("code")]
if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes:
imaged = PhotosIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context)
else:
imaged = PhotosIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context)
# 创建在线识别进程并启动
imaged.start()
self.__listeningProcesses[msg["request_id"]] = imaged
'''
校验kafka消息
'''
@staticmethod
def check_msg(msg, schema):
try:
v = Validator(schema, allow_unknown=True)
result = v.validate(msg)
if not result:
logger.error("参数校验异常: {}, requestId: {}", v.errors, msg["request_id"])
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
except ServiceException as s:
raise s
except Exception:
logger.error("参数校验异常: {}, requestId: {}", format_exc(), msg["request_id"])
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
'''
开启反馈线程用于发送消息
'''
def start_feedback_thread(self):
if self.__feedbackThread is None:
self.__feedbackThread = FeedbackThread(self.__fbQueue, self.__kafka_config)
self.__feedbackThread.setDaemon(True)
self.__feedbackThread.start()
time.sleep(1)
if self.__feedbackThread and not self.__feedbackThread.is_alive():
logger.error("反馈线程异常停止, 开始重新启动反馈线程!!!!!")
self.__feedbackThread = FeedbackThread(self.__fbQueue, self.__kafka_config)
self.__feedbackThread.setDaemon(True)
self.__feedbackThread.start()
time.sleep(1)
def start_uploadGPU_thread(self):
if self.__uploadGPUThread is None:
self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config)
self.__uploadGPUThread.setDaemon(True)
self.__uploadGPUThread.start()
time.sleep(1)
if self.__uploadGPUThread and not self.__uploadGPUThread.is_alive():
logger.error("反馈线程异常停止, 开始重新启动反馈线程!!!!!")
self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config)
self.__uploadGPUThread.setDaemon(True)
self.__uploadGPUThread.start()
time.sleep(1)
'''
在线分析逻辑
'''
def online0(self, message, analysisType):
if "start" == message.get("command"):
self.check_msg(message, ONLINE_START_SCHEMA)
if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]):
raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1])
self.startOnlineProcess(message, analysisType)
elif message.get("command") in ["algStart","algStop"]:
self.sendCmdToChildProcess(message,cmd=message.get("command"))
elif "stop" == message.get("command"):
self.check_msg(message, ONLINE_STOP_SCHEMA)
self.stopOnlineProcess(message)
else:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
def online(self, message, analysisType):
if "start" == message.get("command"):
self.check_msg(message, ONLINE_START_SCHEMA)
if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]):
raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1])
self.startOnlineProcess(message, analysisType)
elif message.get("command") in ["algStart","algStop"]:
if message.get("defaultEnabled",True):
self.sendCmdToChildProcess(message,cmd=message.get("command"))
elif "stop" == message.get("command"):
self.check_msg(message, ONLINE_STOP_SCHEMA)
self.stopOnlineProcess(message)
else:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
def offline(self, message, analysisType):
if "start" == message.get("command"):
self.check_msg(message, OFFLINE_START_SCHEMA)
if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]):
raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1])
self.startOfflineProcess(message, analysisType)
elif message.get("command") in ["algStart","algStop"]:
self.sendCmdToChildProcess( message,cmd=message.get("command"))
elif "stop" == message.get("command"):
self.check_msg(message, OFFLINE_STOP_SCHEMA)
self.stopOfflineProcess(message)
else:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
def image(self, message, analysisType):
if "start" == message.get("command"):
self.check_msg(message, IMAGE_SCHEMA)
if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["image"]["limit"]):
raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1])
self.startImageProcess(message, analysisType)
else:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
def recording(self, message, analysisType):
if "start" == message.get("command"):
self.check_msg(message, RECORDING_START_SCHEMA)
if len(self.__recordingProcesses) >= int(self.__context['service']["task"]["limit"]):
raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1])
self.startRecordingProcess(message, analysisType)
elif "stop" == message.get("command"):
self.check_msg(message, RECORDING_STOP_SCHEMA)
self.stopRecordingProcess(message)
else:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
# 开启录屏进程
def startRecordingProcess(self, msg, analysisType):
if self.__listeningProcesses.get(msg["request_id"]):
logger.warning("重复任务请稍后再试requestId:{}", msg["request_id"])
return
srp = ScreenRecordingProcess(self.__fbQueue, self.__context, msg, analysisType)
srp.start()
self.__recordingProcesses[msg["request_id"]] = srp
# 结束录屏进程
def stopRecordingProcess(self, msg):
rdp = self.__recordingProcesses.get(msg["request_id"])
if rdp is None:
logger.warning("未查询到该任务无法停止任务requestId:{}", msg["request_id"])
return
rdp.sendEvent({"command": "stop"})
def pullStream(self, message, analysisType):
if "start" == message.get("command"):
self.check_msg(message, PULL2PUSH_START_SCHEMA)
if len(self.__pull2PushProcesses) >= int(self.__context['service']["task"]["limit"]):
raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1])
self.startPushStreamProcess(message, analysisType)
elif "stop" == message.get("command"):
self.check_msg(message, PULL2PUSH_STOP_SCHEMA)
self.stopPushStreamProcess(message)
else:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
def startPushStreamProcess(self, msg, analysisType):
if self.__pull2PushProcesses.get(msg["request_id"]):
logger.warning("重复任务请稍后再试requestId:{}", msg["request_id"])
return
srp = PushStreamProcess(self.__fbQueue, self.__context, msg, analysisType)
srp.start()
self.__pull2PushProcesses[msg["request_id"]] = srp
# 结束录屏进程
def stopPushStreamProcess(self, msg):
srp = self.__pull2PushProcesses.get(msg["request_id"])
if srp is None:
logger.warning("未查询到该任务无法停止任务requestId:{}", msg["request_id"])
return
srp.sendEvent({"command": "stop", "videoIds": msg.get("video_ids", [])})