# -*- coding: utf-8 -*- import time from os.path import join from traceback import format_exc from cerberus import Validator from common.Constant import ONLINE_START_SCHEMA, ONLINE_STOP_SCHEMA, OFFLINE_START_SCHEMA, OFFLINE_STOP_SCHEMA, \ IMAGE_SCHEMA, RECORDING_START_SCHEMA, RECORDING_STOP_SCHEMA, PULL2PUSH_START_SCHEMA, PULL2PUSH_STOP_SCHEMA from common.YmlConstant import service_yml_path, kafka_yml_path from concurrency.FeedbackThread import FeedbackThread from concurrency.IntelligentRecognitionProcess2 import OnlineIntelligentRecognitionProcess2, \ OfflineIntelligentRecognitionProcess2, PhotosIntelligentRecognitionProcess2 from concurrency.Pull2PushStreamProcess import PushStreamProcess from entity.FeedBack import message_feedback, recording_feedback, pull_stream_feedback from enums.AnalysisStatusEnum import AnalysisStatus from enums.AnalysisTypeEnum import AnalysisType from enums.ExceptionEnum import ExceptionType from enums.ModelTypeEnum import ModelMethodTypeEnum, ModelType from enums.RecordingStatusEnum import RecordingStatus from enums.StatusEnum import PushStreamStatus, ExecuteStatus from exception.CustomerException import ServiceException from loguru import logger from multiprocessing import Queue from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecognitionProcess, \ OfflineIntelligentRecognitionProcess, PhotosIntelligentRecognitionProcess, ScreenRecordingProcess from util.CpuUtils import print_cpu_ex_status from util.FileUtils import create_dir_not_exist from util.GPUtils import get_first_gpu_name, print_gpu_ex_status, check_cude_is_available from util.KafkaUtils import CustomerKafkaConsumer from util.QueUtil import put_queue from util.RWUtils import getConfigs ''' 分发服务 ''' class DispatcherService: __slots__ = ('__context', '__feedbackThread', '__listeningProcesses', '__fbQueue', '__topics', '__task_type', '__kafka_config', '__recordingProcesses', '__pull2PushProcesses') def __init__(self, base_dir, env): # 检测cuda是否活动 check_cude_is_available() # 获取全局上下文配置 self.__context = getConfigs(join(base_dir, service_yml_path % env)) # 创建任务执行, 视频保存路径 create_dir_not_exist(join(base_dir, self.__context["video"]["file_path"])) # 将根路径和环境设置到上下文中 self.__context["base_dir"], self.__context["env"] = base_dir, env # 问题反馈线程 self.__feedbackThread, self.__fbQueue = None, Queue() # 实时、离线、图片任务进程字典 self.__listeningProcesses = {} # 录屏任务进程字典 self.__recordingProcesses = {} # 转推流任务进程字典 self.__pull2PushProcesses = {} self.__kafka_config = getConfigs(join(base_dir, kafka_yml_path % env)) self.__topics = ( self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"], # 实时监听topic self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"], # 离线监听topic self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"], # 图片监听topic self.__kafka_config["topic"]["dsp-recording-task-topic"], # 录屏监听topic self.__kafka_config["topic"]["dsp-push-stream-task-topic"] # 推流监听topic ) # 对应topic的各个lambda表达式 self.__task_type = { self.__topics[0]: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y), lambda x, y, z: self.identify_method(x, y, z)), self.__topics[1]: (AnalysisType.OFFLINE.value, lambda x, y: self.offline(x, y), lambda x, y, z: self.identify_method(x, y, z)), self.__topics[2]: (AnalysisType.IMAGE.value, lambda x, y: self.image(x, y), lambda x, y, z: self.identify_method(x, y, z)), self.__topics[3]: (AnalysisType.RECORDING.value, lambda x, y: self.recording(x, y), lambda x, y, z: self.recording_method(x, y, z)), self.__topics[4]: (AnalysisType.PULLTOPUSH.value, lambda x, y: self.pullStream(x, y), lambda x, y, z: self.push_stream_method(x, y, z)) } gpu_name_array = get_first_gpu_name() gpu_array = [g for g in ('3090', '2080', '4090', 'A10') if g in gpu_name_array] gpu_name = '2080Ti' if len(gpu_array) > 0: if gpu_array[0] != '2080': gpu_name = gpu_array[0] else: raise Exception("GPU资源不在提供的模型所支持的范围内!请先提供对应的GPU模型!") logger.info("当前服务环境为: {}, 服务器GPU使用型号: {}", env, gpu_name) self.__context["gpu_name"] = gpu_name self.start_service() # 服务调用启动方法 def start_service(self): # 初始化kafka监听者 customerKafkaConsumer = CustomerKafkaConsumer(self.__kafka_config, topics=self.__topics) logger.info("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙") while True: try: # 检查任务进程运行情况,去除结束的任务 self.check_process_task() # 启动反馈线程 self.start_feedback_thread() msg = customerKafkaConsumer.poll() if msg is not None and len(msg) > 0: for k, v in msg.items(): for m in v: message = m.value requestId = message.get("request_id") if requestId is None: logger.error("请求参数格式错误, 请检查请求体格式是否正确!") continue customerKafkaConsumer.commit_offset(m, requestId) logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}", m.topic, m.offset, m.partition, message, requestId) topic_method = self.__task_type[m.topic] topic_method[2](topic_method[1], message, topic_method[0]) else: print_gpu_ex_status() print_cpu_ex_status(self.__context["base_dir"]) time.sleep(1) except Exception: logger.error("主线程异常:{}", format_exc()) def identify_method(self, handle_method, message, analysisType): try: check_cude_is_available() handle_method(message, analysisType) except ServiceException as s: logger.error("消息监听异常:{}, requestId: {}", s.msg, message["request_id"]) put_queue(self.__fbQueue, message_feedback(message["request_id"], AnalysisStatus.FAILED.value, analysisType, s.code, s.msg), timeout=1) except Exception: logger.error("消息监听异常:{}, requestId: {}", format_exc(), message["request_id"]) put_queue(self.__fbQueue, message_feedback(message["request_id"], AnalysisStatus.FAILED.value, analysisType, ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=1) finally: del message def push_stream_method(self, handle_method, message, analysisType): try: check_cude_is_available() handle_method(message, analysisType) except ServiceException as s: logger.error("消息监听异常:{}, requestId: {}", s.msg, message['request_id']) videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.FAILED.value[0]} for url in message.get("video_urls", []) if url.get("id") is not None] put_queue(self.__fbQueue, pull_stream_feedback(message['request_id'], ExecuteStatus.FAILED.value[0], s.code, s.msg, videoInfo), timeout=1) except Exception: logger.error("消息监听异常:{}, requestId: {}", format_exc(), message['request_id']) videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.FAILED.value[0]} for url in message.get("video_urls", []) if url.get("id") is not None] put_queue(self.__fbQueue, pull_stream_feedback(message.get("request_id"), ExecuteStatus.FAILED.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1], videoInfo), timeout=1) finally: del message def recording_method(self, handle_method, message, analysisType): try: check_cude_is_available() handle_method(message, analysisType) except ServiceException as s: logger.error("消息监听异常:{}, requestId: {}", s.msg, message["request_id"]) put_queue(self.__fbQueue, recording_feedback(message["request_id"], RecordingStatus.RECORDING_FAILED.value[0], error_code=s.code, error_msg=s.msg), timeout=1) except Exception: logger.error("消息监听异常:{}, requestId: {}", format_exc(), message["request_id"]) put_queue(self.__fbQueue, recording_feedback(message["request_id"], RecordingStatus.RECORDING_FAILED.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=1) finally: del message # 开启实时进程 def startOnlineProcess(self, msg, analysisType): if self.__listeningProcesses.get(msg["request_id"]): logger.warning("实时重复任务,请稍后再试!requestId:{}", msg["request_id"]) return model_type = self.__context["service"]["model"]["model_type"] codes = [model.get("code") for model in msg["models"] if model.get("code")] if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes: coir = OnlineIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context) else: coir = OnlineIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context) coir.start() self.__listeningProcesses[msg["request_id"]] = coir # 结束实时进程 def stopOnlineProcess(self, msg): ps = self.__listeningProcesses.get(msg["request_id"]) if ps is None: logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) return ps.sendEvent({"command": "stop"}) @staticmethod def check_process(listeningProcess): for requestId in list(listeningProcess.keys()): if not listeningProcess[requestId].is_alive(): del listeningProcess[requestId] def check_process_task(self): self.check_process(self.__listeningProcesses) self.check_process(self.__recordingProcesses) self.check_process(self.__pull2PushProcesses) # 开启离线进程 def startOfflineProcess(self, msg, analysisType): if self.__listeningProcesses.get(msg["request_id"]): logger.warning("离线重复任务,请稍后再试!requestId:{}", msg["request_id"]) return model_type = self.__context["service"]["model"]["model_type"] codes = [model.get("code") for model in msg["models"] if model.get("code")] if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes: first = OfflineIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context) else: first = OfflineIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context) first.start() self.__listeningProcesses[msg["request_id"]] = first # 结束离线进程 def stopOfflineProcess(self, msg): ps = self.__listeningProcesses.get(msg["request_id"]) if ps is None: logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) return ps.sendEvent({"command": "stop"}) # 开启图片分析进程 def startImageProcess(self, msg, analysisType): pp = self.__listeningProcesses.get(msg["request_id"]) if pp is not None: logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"]) return model_type = self.__context["service"]["model"]["model_type"] codes = [model.get("code") for model in msg["models"] if model.get("code")] if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes: imaged = PhotosIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context) else: imaged = PhotosIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context) # 创建在线识别进程并启动 imaged.start() self.__listeningProcesses[msg["request_id"]] = imaged ''' 校验kafka消息 ''' @staticmethod def check_msg(msg, schema): try: v = Validator(schema, allow_unknown=True) result = v.validate(msg) if not result: logger.error("参数校验异常: {}, requestId: {}", v.errors, msg["request_id"]) raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) except ServiceException as s: raise s except Exception: logger.error("参数校验异常: {}, requestId: {}", format_exc(), msg["request_id"]) raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) ''' 开启反馈线程,用于发送消息 ''' def start_feedback_thread(self): if self.__feedbackThread is None: self.__feedbackThread = FeedbackThread(self.__fbQueue, self.__kafka_config) self.__feedbackThread.setDaemon(True) self.__feedbackThread.start() time.sleep(1) if self.__feedbackThread and not self.__feedbackThread.is_alive(): logger.error("反馈线程异常停止, 开始重新启动反馈线程!!!!!") self.__feedbackThread = FeedbackThread(self.__fbQueue, self.__kafka_config) self.__feedbackThread.setDaemon(True) self.__feedbackThread.start() time.sleep(1) ''' 在线分析逻辑 ''' def online(self, message, analysisType): if "start" == message.get("command"): self.check_msg(message, ONLINE_START_SCHEMA) if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]): raise ServiceException(ExceptionType.NO_RESOURCES.value[0], ExceptionType.NO_RESOURCES.value[1]) self.startOnlineProcess(message, analysisType) elif "stop" == message.get("command"): self.check_msg(message, ONLINE_STOP_SCHEMA) self.stopOnlineProcess(message) else: raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) def offline(self, message, analysisType): if "start" == message.get("command"): self.check_msg(message, OFFLINE_START_SCHEMA) if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]): raise ServiceException(ExceptionType.NO_RESOURCES.value[0], ExceptionType.NO_RESOURCES.value[1]) self.startOfflineProcess(message, analysisType) elif "stop" == message.get("command"): self.check_msg(message, OFFLINE_STOP_SCHEMA) self.stopOfflineProcess(message) else: raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) def image(self, message, analysisType): if "start" == message.get("command"): self.check_msg(message, IMAGE_SCHEMA) if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["image"]["limit"]): raise ServiceException(ExceptionType.NO_RESOURCES.value[0], ExceptionType.NO_RESOURCES.value[1]) self.startImageProcess(message, analysisType) else: raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) def recording(self, message, analysisType): if "start" == message.get("command"): self.check_msg(message, RECORDING_START_SCHEMA) if len(self.__recordingProcesses) >= int(self.__context['service']["task"]["limit"]): raise ServiceException(ExceptionType.NO_RESOURCES.value[0], ExceptionType.NO_RESOURCES.value[1]) self.startRecordingProcess(message, analysisType) elif "stop" == message.get("command"): self.check_msg(message, RECORDING_STOP_SCHEMA) self.stopRecordingProcess(message) else: raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) # 开启录屏进程 def startRecordingProcess(self, msg, analysisType): if self.__listeningProcesses.get(msg["request_id"]): logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"]) return srp = ScreenRecordingProcess(self.__fbQueue, self.__context, msg, analysisType) srp.start() self.__recordingProcesses[msg["request_id"]] = srp # 结束录屏进程 def stopRecordingProcess(self, msg): rdp = self.__recordingProcesses.get(msg["request_id"]) if rdp is None: logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) return rdp.sendEvent({"command": "stop"}) def pullStream(self, message, analysisType): if "start" == message.get("command"): self.check_msg(message, PULL2PUSH_START_SCHEMA) if len(self.__pull2PushProcesses) >= int(self.__context['service']["task"]["limit"]): raise ServiceException(ExceptionType.NO_RESOURCES.value[0], ExceptionType.NO_RESOURCES.value[1]) self.startPushStreamProcess(message, analysisType) elif "stop" == message.get("command"): self.check_msg(message, PULL2PUSH_STOP_SCHEMA) self.stopPushStreamProcess(message) else: raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) def startPushStreamProcess(self, msg, analysisType): if self.__pull2PushProcesses.get(msg["request_id"]): logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"]) return srp = PushStreamProcess(self.__fbQueue, self.__context, msg, analysisType) srp.start() self.__pull2PushProcesses[msg["request_id"]] = srp # 结束录屏进程 def stopPushStreamProcess(self, msg): srp = self.__pull2PushProcesses.get(msg["request_id"]) if srp is None: logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) return srp.sendEvent({"command": "stop", "videoIds": msg.get("video_ids", [])})