|
- # -*- coding: utf-8 -*-
- import time
- import GPUtil
-
- from concurrency.FeedbackThread import FeedbackThread
- from entity.FeedBack import message_feedback
- from enums.AnalysisStatusEnum import AnalysisStatus
- from enums.AnalysisTypeEnum import AnalysisType
- from enums.ExceptionEnum import ExceptionType
- from exception.CustomerException import ServiceException
- from util import YmlUtils, FileUtils, LogUtils, KafkaUtils, TimeUtils
- from loguru import logger
- from multiprocessing import Queue
- from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecognitionProcess, \
- OfflineIntelligentRecognitionProcess, PhotosIntelligentRecognitionProcess
- from util import GPUtils
-
- '''
- 分发服务
- '''
-
-
- class DispatcherService:
-
- # 初始化
- def __init__(self):
- # 获取DSP环境所需要的配置
- self.content = YmlUtils.getConfigs()
- # 初始化日志
- LogUtils.init_log(self.content)
- # 检查视频保存地址,不存在创建文件夹,迁移初始化
- FileUtils.create_dir_not_exist(self.content["video"]["file_path"])
- # 记录当前正在执行的实时流分析任务
- self.onlineProcesses = {}
- # 记录当前正在执行的离线视频分析任务
- self.offlineProcesses = {}
- # 记录当前正在执行的图片分析任务
- self.photoProcesses = {}
- self.fbQueue = Queue()
- self.online_topic = self.content["kafka"]["topic"]["dsp-alg-online-tasks-topic"]
- self.offline_topic = self.content["kafka"]["topic"]["dsp-alg-offline-tasks-topic"]
- self.image_topic = self.content["kafka"]["topic"]["dsp-alg-image-tasks-topic"]
- self.topics = [self.online_topic, self.offline_topic, self.image_topic]
- self.analysisType = {
- self.online_topic: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y)),
- self.offline_topic: (AnalysisType.OFFLINE.value, lambda x, y: self.offline(x, y)),
- self.image_topic: (AnalysisType.IMAGE.value, lambda x, y: self.image(x, y))
- }
-
- # 服务调用启动方法
- def start_service(self):
- # 启动问题反馈线程
- feedbackThread = self.start_feedback_thread()
- # 初始化kafka监听者
- customerKafkaConsumer = KafkaUtils.CustomerKafkaConsumer(self.content, topics=self.topics)
- print("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙")
- # 循环消息处理
- while True:
- # 检查任务进程运行情况,去除活动的任务
- self.check_process_task()
- # 校验问题反馈线程是否正常
- if not feedbackThread.is_alive():
- logger.error("======================问题反馈线程异常停止======================")
- break
- # 获取当前可用gpu使用数量
- gpu_ids = GPUtils.get_gpu_ids(self.content)
- if gpu_ids is not None and len(gpu_ids) > 0:
- msg = customerKafkaConsumer.poll()
- if msg is not None and len(msg) > 0:
- for k, v in msg.items():
- for m in v:
- message = m.value
- analysisType = self.analysisType.get(m.topic)[0]
- try:
- customerKafkaConsumer.commit_offset(m)
- logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
- m.topic, m.offset, m.partition, message, message.get("request_id"))
- self.analysisType.get(m.topic)[1](message, gpu_ids)
- except ServiceException as s:
- logger.exception("消息监听异常:{}, requestId: {}", s.msg, message.get("request_id"))
- if analysisType is not None:
- feedback = {
- "feedback": message_feedback(message.get("request_id"),
- AnalysisStatus.FAILED.value,
- analysisType,
- s.code,
- s.msg,
- analyse_time=TimeUtils.now_date_to_str())}
- self.fbQueue.put(message, feedback)
- except Exception as e:
- logger.exception("消息监听异常:{}, requestId: {}", e, message.get("request_id"))
- if analysisType is not None:
- feedback = {
- "feedback": message_feedback(message.get("request_id"),
- AnalysisStatus.FAILED.value,
- analysisType,
- ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
- analyse_time=TimeUtils.now_date_to_str())}
- self.fbQueue.put(message, feedback)
- else:
- time.sleep(1)
- else:
- logger.info("当前可用gpu数量: {}", gpu_ids)
- GPUtil.showUtilization()
- time.sleep(5)
-
- # 开启实时进程
- def startOnlineProcess(self, msg, gpu_ids):
- # 相同的requestId不在执行
- if self.onlineProcesses.get(msg.get("request_id")):
- logger.warning("重复任务,请稍后再试!requestId:{}", msg.get("request_id"))
- return
- cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
- # 创建在线识别进程并启动
- oirp = OnlineIntelligentRecognitionProcess(cfg)
- oirp.start()
- # 记录请求与进程映射
- self.onlineProcesses[msg.get("request_id")] = oirp
-
- # 结束实时进程
- def stopOnlineProcess(self, msg):
- ps = self.onlineProcesses.get(msg.get("request_id"))
- if ps is None:
- logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg.get("request_id"))
- return
- ps.sendEvent({'command': 'stop'})
-
- # 检查实时、离线进程任务运行情况,去除不活动的任务
- def check_process_task(self):
- for requestId in list(self.onlineProcesses.keys()):
- if not self.onlineProcesses[requestId].is_alive():
- del self.onlineProcesses[requestId]
- for requestId in list(self.offlineProcesses.keys()):
- if not self.offlineProcesses[requestId].is_alive():
- del self.offlineProcesses[requestId]
- for requestId in list(self.photoProcesses.keys()):
- if not self.photoProcesses[requestId].is_alive():
- del self.photoProcesses[requestId]
-
- # 开启离线进程
- def startOfflineProcess(self, msg, gpu_ids):
- # 相同的requestId不在执行
- if self.offlineProcesses.get(msg.get("request_id")):
- logger.warning("重复任务,请稍后再试!requestId:{}", msg.get("request_id"))
- return
- cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
- # 创建在线识别进程并启动
- ofirp = OfflineIntelligentRecognitionProcess(cfg)
- ofirp.start()
- self.offlineProcesses[msg.get("request_id")] = ofirp
-
- # 结束离线进程
- def stopOfflineProcess(self, msg):
- ps = self.offlineProcesses.get(msg.get("request_id"))
- if ps is None:
- logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg.get("request_id"))
- return
- ps.sendEvent({'command': 'stop'})
-
- # 开启图片分析进程
- def startImageProcess(self, msg, gpu_ids):
- # 相同的requestId不在执行
- if self.photoProcesses.get(msg.get("request_id")):
- logger.warning("重复任务,请稍后再试!requestId:{}", msg.get("request_id"))
- return
- cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
- # 创建在线识别进程并启动
- imagep = PhotosIntelligentRecognitionProcess(cfg)
- imagep.start()
- self.photoProcesses[msg.get("request_id")] = imagep
-
- # 校验实时kafka消息
- def check_online_msg(self, msg):
- requestId = msg.get("request_id")
- command = msg.get("command")
- models = msg.get("models")
- pull_url = msg.get("pull_url")
- push_url = msg.get("push_url")
- results_base_dir = msg.get("results_base_dir")
- if command is None:
- return False
- if requestId is None:
- return False
- if command == "start" and models is None:
- return False
- if models is not None:
- for model in models:
- if model.get("code") is None:
- return False
- if model.get("categories") is None:
- return False
- if command == "start" and pull_url is None:
- return False
- if command == "start" and push_url is None:
- return False
- if command == "start" and results_base_dir is None:
- return False
- return True
-
- # 校验实时kafka消息
- def check_offline_msg(self, msg):
- requestId = msg.get("request_id")
- models = msg.get("models")
- command = msg.get("command")
- original_url = msg.get("original_url")
- original_type = msg.get("original_type")
- push_url = msg.get("push_url")
- results_base_dir = msg.get("results_base_dir")
- if command is None:
- return False
- if requestId is None:
- return False
- if command == 'start' and models is None:
- return False
- if models is not None:
- for model in models:
- if model.get("code") is None:
- return False
- if model.get("categories") is None:
- return False
- if command == 'start' and original_url is None:
- return False
- if command == 'start' and push_url is None:
- return False
- if command == 'start' and original_type is None:
- return False
- if command == 'start' and results_base_dir is None:
- return False
- return True
-
- # 校验图片kafka消息
- def check_image_msg(self, msg):
- requestId = msg.get("request_id")
- models = msg.get("models")
- command = msg.get("command")
- image_urls = msg.get("image_urls")
- results_base_dir = msg.get("results_base_dir")
- if command is None:
- return False
- if requestId is None:
- return False
- if command == 'start' and models is None:
- return False
- if models is not None:
- for model in models:
- if model.get("code") is None:
- return False
- if model.get("categories") is None:
- return False
- if command == 'start' and image_urls is None:
- return False
- if command == 'start' and results_base_dir is None:
- return False
- return True
-
- '''
- 开启问题反馈线程
- '''
- def start_feedback_thread(self):
- feedbackThread = FeedbackThread(self.fbQueue, self.content)
- feedbackThread.setDaemon(True)
- feedbackThread.start()
- return feedbackThread
-
- def online(self, message, gpu_ids):
- check_result = self.check_online_msg(message)
- if not check_result:
- raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
- ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
- if 'start' == message.get("command"):
- logger.info("开始实时分析")
- self.startOnlineProcess(message, gpu_ids)
- elif 'stop' == message.get("command"):
- self.stopOnlineProcess(message)
- else:
- pass
-
- def offline(self, message, gpu_ids):
- check_result = self.check_offline_msg(message)
- if not check_result:
- raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
- ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
- if 'start' == message.get("command"):
- logger.info("开始离线分析")
- self.startOfflineProcess(message, gpu_ids)
- time.sleep(3)
- elif 'stop' == message.get("command"):
- self.stopOfflineProcess(message)
- else:
- pass
-
- def image(self, message, gpu_ids):
- check_result = self.check_image_msg(message)
- if not check_result:
- raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
- ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
- if 'start' == message.get("command"):
- logger.info("开始图片分析")
- self.startImageProcess(message, gpu_ids)
- # elif 'stop' == message.get("command"):
- # self.stopImageProcess(message)
- else:
- pass
|