# -*- coding: utf-8 -*-
"""Dispatcher service: routes Kafka analysis tasks to recognition processes."""
import time
import GPUtil
from concurrency.FeedbackThread import FeedbackThread
from entity.FeedBack import message_feedback
from enums.AnalysisStatusEnum import AnalysisStatus
from enums.AnalysisTypeEnum import AnalysisType
from enums.ExceptionEnum import ExceptionType
from exception.CustomerException import ServiceException
from util import YmlUtils, FileUtils, LogUtils, KafkaUtils, TimeUtils
from loguru import logger
from multiprocessing import Queue
from concurrency.IntelligentRecognitionProcess import (
    OnlineIntelligentRecognitionProcess,
    OfflineIntelligentRecognitionProcess,
    PhotosIntelligentRecognitionProcess,
)
from util import GPUtils


class DispatcherService:
    """Consume analysis tasks from Kafka and dispatch them to worker processes.

    Three task kinds are handled, one Kafka topic each: real-time stream
    analysis ("online"), offline video analysis ("offline") and image
    analysis ("image"). Failures while dispatching a task are reported by
    putting a FAILED feedback on ``fbQueue``, which is handed to the
    feedback thread.
    """

    def __init__(self):
        """Load configuration, set up logging and build the topic dispatch table."""
        # Configuration required by the DSP environment.
        self.content = YmlUtils.getConfigs()
        # Initialise logging.
        LogUtils.init_log(self.content)
        # Make sure the video output directory exists.
        FileUtils.create_dir_not_exist(self.content["video"]["file_path"])
        # Running tasks, keyed by request_id, one registry per task kind.
        self.onlineProcesses = {}
        self.offlineProcesses = {}
        self.photoProcesses = {}
        # Queue shared with the feedback thread and the worker processes.
        self.fbQueue = Queue()
        topic_cfg = self.content["kafka"]["topic"]
        self.online_topic = topic_cfg["dsp-alg-online-tasks-topic"]
        self.offline_topic = topic_cfg["dsp-alg-offline-tasks-topic"]
        self.image_topic = topic_cfg["dsp-alg-image-tasks-topic"]
        self.topics = [self.online_topic, self.offline_topic, self.image_topic]
        # topic -> (analysis type value, handler(message, gpu_ids))
        self.analysisType = {
            self.online_topic: (AnalysisType.ONLINE.value, self.online),
            self.offline_topic: (AnalysisType.OFFLINE.value, self.offline),
            self.image_topic: (AnalysisType.IMAGE.value, self.image),
        }

    def start_service(self):
        """Run the dispatch loop until the feedback thread stops.

        Each iteration prunes finished tasks, verifies that the feedback
        thread is still alive, and, when GPUs are available, polls Kafka and
        dispatches every received record. Without messages the loop sleeps
        1 second; without available GPUs it logs utilisation and sleeps
        5 seconds.
        """
        # Start the feedback thread.
        feedbackThread = self.start_feedback_thread()
        # Create the Kafka consumer for all task topics.
        customerKafkaConsumer = KafkaUtils.CustomerKafkaConsumer(self.content, topics=self.topics)
        print("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙")
        while True:
            # Drop tasks whose process is no longer alive.
            self.check_process_task()
            # Without the feedback thread nothing can be reported: stop.
            if not feedbackThread.is_alive():
                logger.error("======================问题反馈线程异常停止======================")
                break
            # GPUs currently available for new tasks.
            gpu_ids = GPUtils.get_gpu_ids(self.content)
            if gpu_ids is not None and len(gpu_ids) > 0:
                msg = customerKafkaConsumer.poll()
                if msg is not None and len(msg) > 0:
                    for records in msg.values():
                        for m in records:
                            self._dispatch_record(customerKafkaConsumer, m, gpu_ids)
                else:
                    time.sleep(1)
            else:
                logger.info("当前可用gpu数量: {}", gpu_ids)
                GPUtil.showUtilization()
                time.sleep(5)

    def _dispatch_record(self, consumer, m, gpu_ids):
        """Commit one Kafka record and hand its payload to the topic handler.

        The offset is committed before the handler runs, so a record whose
        handling fails is not redelivered; the failure is reported through
        the feedback queue instead.
        """
        message = m.value
        analysis_type, handler = self.analysisType.get(m.topic)
        try:
            consumer.commit_offset(m)
            logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
                        m.topic, m.offset, m.partition, message, message.get("request_id"))
            handler(message, gpu_ids)
        except ServiceException as s:
            logger.exception("消息监听异常:{}, requestId: {}", s.msg, message.get("request_id"))
            if analysis_type is not None:
                self._put_failure_feedback(message, analysis_type, s.code, s.msg)
        except Exception as e:
            # Top-level boundary: one bad task must not stop the dispatcher.
            logger.exception("消息监听异常:{}, requestId: {}", e, message.get("request_id"))
            if analysis_type is not None:
                self._put_failure_feedback(message, analysis_type,
                                           ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                           ExceptionType.SERVICE_INNER_EXCEPTION.value[1])

    def _put_failure_feedback(self, message, analysis_type, code, error_msg):
        """Put a FAILED feedback for ``message`` on the feedback queue."""
        feedback = {
            "feedback": message_feedback(message.get("request_id"),
                                         AnalysisStatus.FAILED.value,
                                         analysis_type,
                                         code,
                                         error_msg,
                                         analyse_time=TimeUtils.now_date_to_str())}
        # Queue.put(obj, block, timeout): only the feedback is the payload.
        # Passing the message first would enqueue the task itself and use
        # the feedback dict as the ``block`` flag.
        self.fbQueue.put(feedback)

    def startOnlineProcess(self, msg, gpu_ids):
        """Start a real-time analysis process unless the request is already running."""
        request_id = msg.get("request_id")
        # A request_id that is already registered is not started again.
        if self.onlineProcesses.get(request_id):
            logger.warning("重复任务,请稍后再试!requestId:{}", request_id)
            return
        cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
        # Create and start the online recognition process.
        oirp = OnlineIntelligentRecognitionProcess(cfg)
        oirp.start()
        # Remember which process serves this request.
        self.onlineProcesses[request_id] = oirp

    def stopOnlineProcess(self, msg):
        """Send a stop command to the real-time process of the given request."""
        request_id = msg.get("request_id")
        ps = self.onlineProcesses.get(request_id)
        if ps is None:
            logger.warning("未查询到该任务,无法停止任务!requestId:{}", request_id)
            return
        ps.sendEvent({'command': 'stop'})

    def check_process_task(self):
        """Remove tasks whose process is no longer alive from all registries."""
        for processes in (self.onlineProcesses, self.offlineProcesses, self.photoProcesses):
            # Iterate over a snapshot of the keys because entries are deleted.
            for request_id in list(processes):
                if not processes[request_id].is_alive():
                    del processes[request_id]

    def startOfflineProcess(self, msg, gpu_ids):
        """Start an offline analysis process unless the request is already running."""
        request_id = msg.get("request_id")
        # A request_id that is already registered is not started again.
        if self.offlineProcesses.get(request_id):
            logger.warning("重复任务,请稍后再试!requestId:{}", request_id)
            return
        cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
        # Create and start the offline recognition process.
        ofirp = OfflineIntelligentRecognitionProcess(cfg)
        ofirp.start()
        self.offlineProcesses[request_id] = ofirp

    def stopOfflineProcess(self, msg):
        """Send a stop command to the offline process of the given request."""
        request_id = msg.get("request_id")
        ps = self.offlineProcesses.get(request_id)
        if ps is None:
            logger.warning("未查询到该任务,无法停止任务!requestId:{}", request_id)
            return
        ps.sendEvent({'command': 'stop'})

    def startImageProcess(self, msg, gpu_ids):
        """Start an image analysis process unless the request is already running."""
        request_id = msg.get("request_id")
        # A request_id that is already registered is not started again.
        if self.photoProcesses.get(request_id):
            logger.warning("重复任务,请稍后再试!requestId:{}", request_id)
            return
        cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
        # Create and start the image recognition process.
        imagep = PhotosIntelligentRecognitionProcess(cfg)
        imagep.start()
        self.photoProcesses[request_id] = imagep

    @staticmethod
    def _check_msg(msg, required_on_start):
        """Validate the fields shared by all task messages.

        ``command`` and ``request_id`` are always required. Every entry of
        ``models`` (when present) needs ``code`` and ``categories``. For a
        ``start`` command ``models`` and every key in ``required_on_start``
        must be present as well.

        Returns:
            True when the message is valid, otherwise False.
        """
        command = msg.get("command")
        if command is None or msg.get("request_id") is None:
            return False
        is_start = command == "start"
        models = msg.get("models")
        if is_start and models is None:
            return False
        if models is not None:
            for model in models:
                if model.get("code") is None or model.get("categories") is None:
                    return False
        if is_start:
            return all(msg.get(key) is not None for key in required_on_start)
        return True

    def check_online_msg(self, msg):
        """Validate a real-time analysis message; return True when it is valid."""
        return self._check_msg(msg, ("pull_url", "push_url", "results_base_dir"))

    def check_offline_msg(self, msg):
        """Validate an offline analysis message; return True when it is valid."""
        return self._check_msg(msg, ("original_url", "push_url", "original_type", "results_base_dir"))

    def check_image_msg(self, msg):
        """Validate an image analysis message; return True when it is valid."""
        return self._check_msg(msg, ("image_urls", "results_base_dir"))

    def start_feedback_thread(self):
        """Create, start and return the daemon feedback thread."""
        feedbackThread = FeedbackThread(self.fbQueue, self.content)
        feedbackThread.setDaemon(True)
        feedbackThread.start()
        return feedbackThread

    def online(self, message, gpu_ids):
        """Handle a real-time task message (``start`` or ``stop``).

        Raises:
            ServiceException: when the message fails validation.
        """
        if not self.check_online_msg(message):
            raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
                                   ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
        command = message.get("command")
        if command == 'start':
            logger.info("开始实时分析")
            self.startOnlineProcess(message, gpu_ids)
        elif command == 'stop':
            self.stopOnlineProcess(message)
        # Any other command is ignored.

    def offline(self, message, gpu_ids):
        """Handle an offline task message (``start`` or ``stop``).

        Raises:
            ServiceException: when the message fails validation.
        """
        if not self.check_offline_msg(message):
            raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
                                   ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
        command = message.get("command")
        if command == 'start':
            logger.info("开始离线分析")
            self.startOfflineProcess(message, gpu_ids)
            # NOTE(review): presumably gives the new process time to occupy
            # its GPU before the next availability check - confirm.
            time.sleep(3)
        elif command == 'stop':
            self.stopOfflineProcess(message)
        # Any other command is ignored.

    def image(self, message, gpu_ids):
        """Handle an image task message (only ``start`` is acted on).

        Raises:
            ServiceException: when the message fails validation.
        """
        if not self.check_image_msg(message):
            raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
                                   ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
        if message.get("command") == 'start':
            logger.info("开始图片分析")
            self.startImageProcess(message, gpu_ids)
        # A 'stop' command is not handled for image tasks; it and any other
        # command are ignored.