You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

393 lines
20KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from os.path import join
  4. from traceback import format_exc
  5. from cerberus import Validator
  6. from common.Constant import ONLINE_START_SCHEMA, ONLINE_STOP_SCHEMA, OFFLINE_START_SCHEMA, OFFLINE_STOP_SCHEMA, \
  7. IMAGE_SCHEMA, RECORDING_START_SCHEMA, RECORDING_STOP_SCHEMA, PULL2PUSH_START_SCHEMA, PULL2PUSH_STOP_SCHEMA
  8. from common.YmlConstant import service_yml_path, kafka_yml_path
  9. from concurrency.FeedbackThread import FeedbackThread
  10. from concurrency.IntelligentRecognitionProcess2 import OnlineIntelligentRecognitionProcess2, \
  11. OfflineIntelligentRecognitionProcess2, PhotosIntelligentRecognitionProcess2
  12. from concurrency.Pull2PushStreamProcess import PushStreamProcess
  13. from entity.FeedBack import message_feedback, recording_feedback, pull_stream_feedback
  14. from enums.AnalysisStatusEnum import AnalysisStatus
  15. from enums.AnalysisTypeEnum import AnalysisType
  16. from enums.ExceptionEnum import ExceptionType
  17. from enums.ModelTypeEnum import ModelMethodTypeEnum, ModelType
  18. from enums.RecordingStatusEnum import RecordingStatus
  19. from enums.StatusEnum import PushStreamStatus, ExecuteStatus
  20. from exception.CustomerException import ServiceException
  21. from loguru import logger
  22. from multiprocessing import Queue
  23. from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecognitionProcess, \
  24. OfflineIntelligentRecognitionProcess, PhotosIntelligentRecognitionProcess, ScreenRecordingProcess
  25. from util.CpuUtils import print_cpu_ex_status
  26. from util.FileUtils import create_dir_not_exist
  27. from util.GPUtils import get_first_gpu_name, print_gpu_ex_status, check_cude_is_available
  28. from util.KafkaUtils import CustomerKafkaConsumer
  29. from util.QueUtil import put_queue
  30. from util.RWUtils import getConfigs
  31. '''
  32. 分发服务
  33. '''
  34. class DispatcherService:
  35. __slots__ = ('__context', '__feedbackThread', '__listeningProcesses', '__fbQueue', '__topics', '__task_type',
  36. '__kafka_config', '__recordingProcesses', '__pull2PushProcesses')
  37. def __init__(self, base_dir, env):
  38. # 检测cuda是否活动
  39. check_cude_is_available()
  40. # 获取全局上下文配置
  41. self.__context = getConfigs(join(base_dir, service_yml_path % env))
  42. # 创建任务执行, 视频保存路径
  43. create_dir_not_exist(join(base_dir, self.__context["video"]["file_path"]))
  44. # 将根路径和环境设置到上下文中
  45. self.__context["base_dir"], self.__context["env"] = base_dir, env
  46. # 问题反馈线程
  47. self.__feedbackThread, self.__fbQueue = None, Queue()
  48. # 实时、离线、图片任务进程字典
  49. self.__listeningProcesses = {}
  50. # 录屏任务进程字典
  51. self.__recordingProcesses = {}
  52. # 转推流任务进程字典
  53. self.__pull2PushProcesses = {}
  54. self.__kafka_config = getConfigs(join(base_dir, kafka_yml_path % env))
  55. self.__topics = (
  56. self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"], # 实时监听topic
  57. self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"], # 离线监听topic
  58. self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"], # 图片监听topic
  59. self.__kafka_config["topic"]["dsp-recording-task-topic"], # 录屏监听topic
  60. self.__kafka_config["topic"]["dsp-push-stream-task-topic"] # 推流监听topic
  61. )
  62. # 对应topic的各个lambda表达式
  63. self.__task_type = {
  64. self.__topics[0]: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y),
  65. lambda x, y, z: self.identify_method(x, y, z)),
  66. self.__topics[1]: (AnalysisType.OFFLINE.value, lambda x, y: self.offline(x, y),
  67. lambda x, y, z: self.identify_method(x, y, z)),
  68. self.__topics[2]: (AnalysisType.IMAGE.value, lambda x, y: self.image(x, y),
  69. lambda x, y, z: self.identify_method(x, y, z)),
  70. self.__topics[3]: (AnalysisType.RECORDING.value, lambda x, y: self.recording(x, y),
  71. lambda x, y, z: self.recording_method(x, y, z)),
  72. self.__topics[4]: (AnalysisType.PULLTOPUSH.value, lambda x, y: self.pullStream(x, y),
  73. lambda x, y, z: self.push_stream_method(x, y, z))
  74. }
  75. gpu_name_array = get_first_gpu_name()
  76. gpu_array = [g for g in ('3090', '2080', '4090', 'A10') if g in gpu_name_array]
  77. gpu_name = '2080Ti'
  78. if len(gpu_array) > 0:
  79. if gpu_array[0] != '2080':
  80. gpu_name = gpu_array[0]
  81. else:
  82. raise Exception("GPU资源不在提供的模型所支持的范围内!请先提供对应的GPU模型!")
  83. logger.info("当前服务环境为: {}, 服务器GPU使用型号: {}", env, gpu_name)
  84. self.__context["gpu_name"] = gpu_name
  85. self.start_service()
  86. # 服务调用启动方法
  87. def start_service(self):
  88. # 初始化kafka监听者
  89. customerKafkaConsumer = CustomerKafkaConsumer(self.__kafka_config, topics=self.__topics)
  90. logger.info("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙")
  91. while True:
  92. try:
  93. # 检查任务进程运行情况,去除结束的任务
  94. self.check_process_task()
  95. # 启动反馈线程
  96. self.start_feedback_thread()
  97. msg = customerKafkaConsumer.poll()
  98. if msg is not None and len(msg) > 0:
  99. for k, v in msg.items():
  100. for m in v:
  101. message = m.value
  102. requestId = message.get("request_id")
  103. if requestId is None:
  104. logger.error("请求参数格式错误, 请检查请求体格式是否正确!")
  105. continue
  106. customerKafkaConsumer.commit_offset(m, requestId)
  107. logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
  108. m.topic, m.offset, m.partition, message, requestId)
  109. topic_method = self.__task_type[m.topic]
  110. topic_method[2](topic_method[1], message, topic_method[0])
  111. else:
  112. print_gpu_ex_status()
  113. print_cpu_ex_status(self.__context["base_dir"])
  114. time.sleep(1)
  115. except Exception:
  116. logger.error("主线程异常:{}", format_exc())
  117. def identify_method(self, handle_method, message, analysisType):
  118. try:
  119. check_cude_is_available()
  120. handle_method(message, analysisType)
  121. except ServiceException as s:
  122. logger.error("消息监听异常:{}, requestId: {}", s.msg, message["request_id"])
  123. put_queue(self.__fbQueue, message_feedback(message["request_id"], AnalysisStatus.FAILED.value, analysisType,
  124. s.code, s.msg), timeout=1)
  125. except Exception:
  126. logger.error("消息监听异常:{}, requestId: {}", format_exc(), message["request_id"])
  127. put_queue(self.__fbQueue, message_feedback(message["request_id"], AnalysisStatus.FAILED.value, analysisType,
  128. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  129. ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=1)
  130. finally:
  131. del message
  132. def push_stream_method(self, handle_method, message, analysisType):
  133. try:
  134. check_cude_is_available()
  135. handle_method(message, analysisType)
  136. except ServiceException as s:
  137. logger.error("消息监听异常:{}, requestId: {}", s.msg, message['request_id'])
  138. videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.FAILED.value[0]} for url in
  139. message.get("video_urls", []) if url.get("id") is not None]
  140. put_queue(self.__fbQueue, pull_stream_feedback(message['request_id'], ExecuteStatus.FAILED.value[0],
  141. s.code, s.msg, videoInfo), timeout=1)
  142. except Exception:
  143. logger.error("消息监听异常:{}, requestId: {}", format_exc(), message['request_id'])
  144. videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.FAILED.value[0]} for url in
  145. message.get("video_urls", []) if url.get("id") is not None]
  146. put_queue(self.__fbQueue, pull_stream_feedback(message.get("request_id"), ExecuteStatus.FAILED.value[0],
  147. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  148. ExceptionType.SERVICE_INNER_EXCEPTION.value[1], videoInfo),
  149. timeout=1)
  150. finally:
  151. del message
  152. def recording_method(self, handle_method, message, analysisType):
  153. try:
  154. check_cude_is_available()
  155. handle_method(message, analysisType)
  156. except ServiceException as s:
  157. logger.error("消息监听异常:{}, requestId: {}", s.msg, message["request_id"])
  158. put_queue(self.__fbQueue,
  159. recording_feedback(message["request_id"], RecordingStatus.RECORDING_FAILED.value[0],
  160. error_code=s.code, error_msg=s.msg), timeout=1)
  161. except Exception:
  162. logger.error("消息监听异常:{}, requestId: {}", format_exc(), message["request_id"])
  163. put_queue(self.__fbQueue,
  164. recording_feedback(message["request_id"], RecordingStatus.RECORDING_FAILED.value[0],
  165. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  166. ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=1)
  167. finally:
  168. del message
  169. # 开启实时进程
  170. def startOnlineProcess(self, msg, analysisType):
  171. if self.__listeningProcesses.get(msg["request_id"]):
  172. logger.warning("实时重复任务,请稍后再试!requestId:{}", msg["request_id"])
  173. return
  174. model_type = self.__context["service"]["model"]["model_type"]
  175. codes = [model.get("code") for model in msg["models"] if model.get("code")]
  176. if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes:
  177. coir = OnlineIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context)
  178. else:
  179. coir = OnlineIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context)
  180. coir.start()
  181. self.__listeningProcesses[msg["request_id"]] = coir
  182. # 结束实时进程
  183. def stopOnlineProcess(self, msg):
  184. ps = self.__listeningProcesses.get(msg["request_id"])
  185. if ps is None:
  186. logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"])
  187. return
  188. ps.sendEvent({"command": "stop"})
  189. @staticmethod
  190. def check_process(listeningProcess):
  191. for requestId in list(listeningProcess.keys()):
  192. if not listeningProcess[requestId].is_alive():
  193. del listeningProcess[requestId]
  194. def check_process_task(self):
  195. self.check_process(self.__listeningProcesses)
  196. self.check_process(self.__recordingProcesses)
  197. self.check_process(self.__pull2PushProcesses)
  198. # 开启离线进程
  199. def startOfflineProcess(self, msg, analysisType):
  200. if self.__listeningProcesses.get(msg["request_id"]):
  201. logger.warning("离线重复任务,请稍后再试!requestId:{}", msg["request_id"])
  202. return
  203. model_type = self.__context["service"]["model"]["model_type"]
  204. codes = [model.get("code") for model in msg["models"] if model.get("code")]
  205. if ModelMethodTypeEnum.NORMAL.value == model_type:
  206. first = OfflineIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context)
  207. else:
  208. first = OfflineIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context)
  209. first.start()
  210. self.__listeningProcesses[msg["request_id"]] = first
  211. # 结束离线进程
  212. def stopOfflineProcess(self, msg):
  213. ps = self.__listeningProcesses.get(msg["request_id"])
  214. if ps is None:
  215. logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"])
  216. return
  217. ps.sendEvent({"command": "stop"})
  218. # 开启图片分析进程
  219. def startImageProcess(self, msg, analysisType):
  220. pp = self.__listeningProcesses.get(msg["request_id"])
  221. if pp is not None:
  222. logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"])
  223. return
  224. model_type = self.__context["service"]["model"]["model_type"]
  225. codes = [model.get("code") for model in msg["models"] if model.get("code")]
  226. if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes:
  227. imaged = PhotosIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context)
  228. else:
  229. imaged = PhotosIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context)
  230. # 创建在线识别进程并启动
  231. imaged.start()
  232. self.__listeningProcesses[msg["request_id"]] = imaged
  233. '''
  234. 校验kafka消息
  235. '''
  236. @staticmethod
  237. def check_msg(msg, schema):
  238. try:
  239. v = Validator(schema, allow_unknown=True)
  240. result = v.validate(msg)
  241. if not result:
  242. logger.error("参数校验异常: {}, requestId: {}", v.errors, msg["request_id"])
  243. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  244. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  245. except ServiceException as s:
  246. raise s
  247. except Exception:
  248. logger.error("参数校验异常: {}, requestId: {}", format_exc(), msg["request_id"])
  249. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  250. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  251. '''
  252. 开启反馈线程,用于发送消息
  253. '''
  254. def start_feedback_thread(self):
  255. if self.__feedbackThread is None:
  256. self.__feedbackThread = FeedbackThread(self.__fbQueue, self.__kafka_config)
  257. self.__feedbackThread.setDaemon(True)
  258. self.__feedbackThread.start()
  259. time.sleep(1)
  260. if self.__feedbackThread and not self.__feedbackThread.is_alive():
  261. logger.error("反馈线程异常停止, 开始重新启动反馈线程!!!!!")
  262. self.__feedbackThread = FeedbackThread(self.__fbQueue, self.__kafka_config)
  263. self.__feedbackThread.setDaemon(True)
  264. self.__feedbackThread.start()
  265. time.sleep(1)
  266. '''
  267. 在线分析逻辑
  268. '''
  269. def online(self, message, analysisType):
  270. if "start" == message.get("command"):
  271. self.check_msg(message, ONLINE_START_SCHEMA)
  272. if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]):
  273. raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
  274. ExceptionType.NO_RESOURCES.value[1])
  275. self.startOnlineProcess(message, analysisType)
  276. elif "stop" == message.get("command"):
  277. self.check_msg(message, ONLINE_STOP_SCHEMA)
  278. self.stopOnlineProcess(message)
  279. else:
  280. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  281. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  282. def offline(self, message, analysisType):
  283. if "start" == message.get("command"):
  284. self.check_msg(message, OFFLINE_START_SCHEMA)
  285. if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]):
  286. raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
  287. ExceptionType.NO_RESOURCES.value[1])
  288. self.startOfflineProcess(message, analysisType)
  289. elif "stop" == message.get("command"):
  290. self.check_msg(message, OFFLINE_STOP_SCHEMA)
  291. self.stopOfflineProcess(message)
  292. else:
  293. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  294. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  295. def image(self, message, analysisType):
  296. if "start" == message.get("command"):
  297. self.check_msg(message, IMAGE_SCHEMA)
  298. if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["image"]["limit"]):
  299. raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
  300. ExceptionType.NO_RESOURCES.value[1])
  301. self.startImageProcess(message, analysisType)
  302. else:
  303. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  304. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  305. def recording(self, message, analysisType):
  306. if "start" == message.get("command"):
  307. self.check_msg(message, RECORDING_START_SCHEMA)
  308. if len(self.__recordingProcesses) >= int(self.__context['service']["task"]["limit"]):
  309. raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
  310. ExceptionType.NO_RESOURCES.value[1])
  311. self.startRecordingProcess(message, analysisType)
  312. elif "stop" == message.get("command"):
  313. self.check_msg(message, RECORDING_STOP_SCHEMA)
  314. self.stopRecordingProcess(message)
  315. else:
  316. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  317. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  318. # 开启录屏进程
  319. def startRecordingProcess(self, msg, analysisType):
  320. if self.__listeningProcesses.get(msg["request_id"]):
  321. logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"])
  322. return
  323. srp = ScreenRecordingProcess(self.__fbQueue, self.__context, msg, analysisType)
  324. srp.start()
  325. self.__recordingProcesses[msg["request_id"]] = srp
  326. # 结束录屏进程
  327. def stopRecordingProcess(self, msg):
  328. rdp = self.__recordingProcesses.get(msg["request_id"])
  329. if rdp is None:
  330. logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"])
  331. return
  332. rdp.sendEvent({"command": "stop"})
  333. def pullStream(self, message, analysisType):
  334. if "start" == message.get("command"):
  335. self.check_msg(message, PULL2PUSH_START_SCHEMA)
  336. if len(self.__pull2PushProcesses) >= int(self.__context['service']["task"]["limit"]):
  337. raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
  338. ExceptionType.NO_RESOURCES.value[1])
  339. self.startPushStreamProcess(message, analysisType)
  340. elif "stop" == message.get("command"):
  341. self.check_msg(message, PULL2PUSH_STOP_SCHEMA)
  342. self.stopPushStreamProcess(message)
  343. else:
  344. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  345. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  346. def startPushStreamProcess(self, msg, analysisType):
  347. if self.__pull2PushProcesses.get(msg["request_id"]):
  348. logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"])
  349. return
  350. srp = PushStreamProcess(self.__fbQueue, self.__context, msg, analysisType)
  351. srp.start()
  352. self.__pull2PushProcesses[msg["request_id"]] = srp
    # Stop the pull-to-push stream process
  354. def stopPushStreamProcess(self, msg):
  355. srp = self.__pull2PushProcesses.get(msg["request_id"])
  356. if srp is None:
  357. logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"])
  358. return
  359. srp.sendEvent({"command": "stop", "videoIds": msg.get("video_ids", [])})