You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

305 satır
13KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. import GPUtil
  4. from concurrency.FeedbackThread import FeedbackThread
  5. from entity.FeedBack import message_feedback
  6. from enums.AnalysisStatusEnum import AnalysisStatus
  7. from enums.AnalysisTypeEnum import AnalysisType
  8. from enums.ExceptionEnum import ExceptionType
  9. from exception.CustomerException import ServiceException
  10. from util import YmlUtils, FileUtils, LogUtils, KafkaUtils, TimeUtils
  11. from loguru import logger
  12. from multiprocessing import Queue
  13. from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecognitionProcess, \
  14. OfflineIntelligentRecognitionProcess, PhotosIntelligentRecognitionProcess
  15. from util import GPUtils
  16. '''
  17. 分发服务
  18. '''
  19. class DispatcherService:
  20. # 初始化
  21. def __init__(self):
  22. # 获取DSP环境所需要的配置
  23. self.content = YmlUtils.getConfigs()
  24. # 初始化日志
  25. LogUtils.init_log(self.content)
  26. # 检查视频保存地址,不存在创建文件夹,迁移初始化
  27. FileUtils.create_dir_not_exist(self.content["video"]["file_path"])
  28. # 记录当前正在执行的实时流分析任务
  29. self.onlineProcesses = {}
  30. # 记录当前正在执行的离线视频分析任务
  31. self.offlineProcesses = {}
  32. # 记录当前正在执行的图片分析任务
  33. self.photoProcesses = {}
  34. self.fbQueue = Queue()
  35. self.online_topic = self.content["kafka"]["topic"]["dsp-alg-online-tasks-topic"]
  36. self.offline_topic = self.content["kafka"]["topic"]["dsp-alg-offline-tasks-topic"]
  37. self.image_topic = self.content["kafka"]["topic"]["dsp-alg-image-tasks-topic"]
  38. self.topics = [self.online_topic, self.offline_topic, self.image_topic]
  39. self.analysisType = {
  40. self.online_topic: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y)),
  41. self.offline_topic: (AnalysisType.OFFLINE.value, lambda x, y: self.offline(x, y)),
  42. self.image_topic: (AnalysisType.IMAGE.value, lambda x, y: self.image(x, y))
  43. }
  44. # 服务调用启动方法
  45. def start_service(self):
  46. # 启动问题反馈线程
  47. feedbackThread = self.start_feedback_thread()
  48. # 初始化kafka监听者
  49. customerKafkaConsumer = KafkaUtils.CustomerKafkaConsumer(self.content, topics=self.topics)
  50. print("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙")
  51. # 循环消息处理
  52. while True:
  53. # 检查任务进程运行情况,去除活动的任务
  54. self.check_process_task()
  55. # 校验问题反馈线程是否正常
  56. if not feedbackThread.is_alive():
  57. logger.error("======================问题反馈线程异常停止======================")
  58. break
  59. # 获取当前可用gpu使用数量
  60. gpu_ids = GPUtils.get_gpu_ids(self.content)
  61. if gpu_ids is not None and len(gpu_ids) > 0:
  62. msg = customerKafkaConsumer.poll()
  63. if msg is not None and len(msg) > 0:
  64. for k, v in msg.items():
  65. for m in v:
  66. message = m.value
  67. analysisType = self.analysisType.get(m.topic)[0]
  68. try:
  69. customerKafkaConsumer.commit_offset(m)
  70. logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
  71. m.topic, m.offset, m.partition, message, message.get("request_id"))
  72. self.analysisType.get(m.topic)[1](message, gpu_ids)
  73. except ServiceException as s:
  74. logger.exception("消息监听异常:{}, requestId: {}", s.msg, message.get("request_id"))
  75. if analysisType is not None:
  76. feedback = {
  77. "feedback": message_feedback(message.get("request_id"),
  78. AnalysisStatus.FAILED.value,
  79. analysisType,
  80. s.code,
  81. s.msg,
  82. analyse_time=TimeUtils.now_date_to_str())}
  83. self.fbQueue.put(message, feedback)
  84. except Exception as e:
  85. logger.exception("消息监听异常:{}, requestId: {}", e, message.get("request_id"))
  86. if analysisType is not None:
  87. feedback = {
  88. "feedback": message_feedback(message.get("request_id"),
  89. AnalysisStatus.FAILED.value,
  90. analysisType,
  91. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  92. ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
  93. analyse_time=TimeUtils.now_date_to_str())}
  94. self.fbQueue.put(message, feedback)
  95. else:
  96. time.sleep(1)
  97. else:
  98. logger.info("当前可用gpu数量: {}", gpu_ids)
  99. GPUtil.showUtilization()
  100. time.sleep(5)
  101. # 开启实时进程
  102. def startOnlineProcess(self, msg, gpu_ids):
  103. # 相同的requestId不在执行
  104. if self.onlineProcesses.get(msg.get("request_id")):
  105. logger.warning("重复任务,请稍后再试!requestId:{}", msg.get("request_id"))
  106. return
  107. cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
  108. # 创建在线识别进程并启动
  109. oirp = OnlineIntelligentRecognitionProcess(cfg)
  110. oirp.start()
  111. # 记录请求与进程映射
  112. self.onlineProcesses[msg.get("request_id")] = oirp
  113. # 结束实时进程
  114. def stopOnlineProcess(self, msg):
  115. ps = self.onlineProcesses.get(msg.get("request_id"))
  116. if ps is None:
  117. logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg.get("request_id"))
  118. return
  119. ps.sendEvent({'command': 'stop'})
  120. # 检查实时、离线进程任务运行情况,去除不活动的任务
  121. def check_process_task(self):
  122. for requestId in list(self.onlineProcesses.keys()):
  123. if not self.onlineProcesses[requestId].is_alive():
  124. del self.onlineProcesses[requestId]
  125. for requestId in list(self.offlineProcesses.keys()):
  126. if not self.offlineProcesses[requestId].is_alive():
  127. del self.offlineProcesses[requestId]
  128. for requestId in list(self.photoProcesses.keys()):
  129. if not self.photoProcesses[requestId].is_alive():
  130. del self.photoProcesses[requestId]
  131. # 开启离线进程
  132. def startOfflineProcess(self, msg, gpu_ids):
  133. # 相同的requestId不在执行
  134. if self.offlineProcesses.get(msg.get("request_id")):
  135. logger.warning("重复任务,请稍后再试!requestId:{}", msg.get("request_id"))
  136. return
  137. cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
  138. # 创建在线识别进程并启动
  139. ofirp = OfflineIntelligentRecognitionProcess(cfg)
  140. ofirp.start()
  141. self.offlineProcesses[msg.get("request_id")] = ofirp
  142. # 结束离线进程
  143. def stopOfflineProcess(self, msg):
  144. ps = self.offlineProcesses.get(msg.get("request_id"))
  145. if ps is None:
  146. logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg.get("request_id"))
  147. return
  148. ps.sendEvent({'command': 'stop'})
  149. # 开启图片分析进程
  150. def startImageProcess(self, msg, gpu_ids):
  151. # 相同的requestId不在执行
  152. if self.photoProcesses.get(msg.get("request_id")):
  153. logger.warning("重复任务,请稍后再试!requestId:{}", msg.get("request_id"))
  154. return
  155. cfg = {"fbQueue": self.fbQueue, "content": self.content, "msg": msg, "gpu_ids": gpu_ids}
  156. # 创建在线识别进程并启动
  157. imagep = PhotosIntelligentRecognitionProcess(cfg)
  158. imagep.start()
  159. self.photoProcesses[msg.get("request_id")] = imagep
  160. # 校验实时kafka消息
  161. def check_online_msg(self, msg):
  162. requestId = msg.get("request_id")
  163. command = msg.get("command")
  164. models = msg.get("models")
  165. pull_url = msg.get("pull_url")
  166. push_url = msg.get("push_url")
  167. results_base_dir = msg.get("results_base_dir")
  168. if command is None:
  169. return False
  170. if requestId is None:
  171. return False
  172. if command == "start" and models is None:
  173. return False
  174. if models is not None:
  175. for model in models:
  176. if model.get("code") is None:
  177. return False
  178. if model.get("categories") is None:
  179. return False
  180. if command == "start" and pull_url is None:
  181. return False
  182. if command == "start" and push_url is None:
  183. return False
  184. if command == "start" and results_base_dir is None:
  185. return False
  186. return True
  187. # 校验实时kafka消息
  188. def check_offline_msg(self, msg):
  189. requestId = msg.get("request_id")
  190. models = msg.get("models")
  191. command = msg.get("command")
  192. original_url = msg.get("original_url")
  193. original_type = msg.get("original_type")
  194. push_url = msg.get("push_url")
  195. results_base_dir = msg.get("results_base_dir")
  196. if command is None:
  197. return False
  198. if requestId is None:
  199. return False
  200. if command == 'start' and models is None:
  201. return False
  202. if models is not None:
  203. for model in models:
  204. if model.get("code") is None:
  205. return False
  206. if model.get("categories") is None:
  207. return False
  208. if command == 'start' and original_url is None:
  209. return False
  210. if command == 'start' and push_url is None:
  211. return False
  212. if command == 'start' and original_type is None:
  213. return False
  214. if command == 'start' and results_base_dir is None:
  215. return False
  216. return True
  217. # 校验图片kafka消息
  218. def check_image_msg(self, msg):
  219. requestId = msg.get("request_id")
  220. models = msg.get("models")
  221. command = msg.get("command")
  222. image_urls = msg.get("image_urls")
  223. results_base_dir = msg.get("results_base_dir")
  224. if command is None:
  225. return False
  226. if requestId is None:
  227. return False
  228. if command == 'start' and models is None:
  229. return False
  230. if models is not None:
  231. for model in models:
  232. if model.get("code") is None:
  233. return False
  234. if model.get("categories") is None:
  235. return False
  236. if command == 'start' and image_urls is None:
  237. return False
  238. if command == 'start' and results_base_dir is None:
  239. return False
  240. return True
  241. '''
  242. 开启问题反馈线程
  243. '''
  244. def start_feedback_thread(self):
  245. feedbackThread = FeedbackThread(self.fbQueue, self.content)
  246. feedbackThread.setDaemon(True)
  247. feedbackThread.start()
  248. return feedbackThread
  249. def online(self, message, gpu_ids):
  250. check_result = self.check_online_msg(message)
  251. if not check_result:
  252. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  253. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  254. if 'start' == message.get("command"):
  255. logger.info("开始实时分析")
  256. self.startOnlineProcess(message, gpu_ids)
  257. elif 'stop' == message.get("command"):
  258. self.stopOnlineProcess(message)
  259. else:
  260. pass
  261. def offline(self, message, gpu_ids):
  262. check_result = self.check_offline_msg(message)
  263. if not check_result:
  264. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  265. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  266. if 'start' == message.get("command"):
  267. logger.info("开始离线分析")
  268. self.startOfflineProcess(message, gpu_ids)
  269. time.sleep(3)
  270. elif 'stop' == message.get("command"):
  271. self.stopOfflineProcess(message)
  272. else:
  273. pass
  274. def image(self, message, gpu_ids):
  275. check_result = self.check_image_msg(message)
  276. if not check_result:
  277. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  278. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  279. if 'start' == message.get("command"):
  280. logger.info("开始图片分析")
  281. self.startImageProcess(message, gpu_ids)
  282. # elif 'stop' == message.get("command"):
  283. # self.stopImageProcess(message)
  284. else:
  285. pass