空港防疫算法交互
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

288 lines
14KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from concurrent.futures import ThreadPoolExecutor, as_completed
  4. import GPUtil
  5. from concurrency.FeedbackThread import FeedbackThread
  6. from entity.FeedBack import message_feedback, message_register
  7. from enums.ExceptionEnum import ExceptionType
  8. from exception.CustomerException import ServiceException
  9. from util import YmlUtils, LogUtils, KafkaUtils
  10. from loguru import logger
  11. from multiprocessing import Queue
  12. from util import GPUtils
  13. from util.ImageUtils import url2Array
  14. from util.ModelUtils import FKModel
  15. from util.OcrBaiduSdk import OcrBaiduSdk
  16. '''
  17. 分发服务
  18. '''
  19. def distinguish(flowMan, fkmodel, gpuId, orc, msgId):
  20. registerId = flowMan.get("registerId")
  21. carUrl = flowMan.get("carUrl")
  22. flowManUrlList = flowMan.get("flowManUrlList")
  23. args_list = []
  24. if carUrl is not None and len(carUrl) > 0:
  25. args_list.append(('plate', carUrl, gpuId, fkmodel, orc, msgId))
  26. for flowManUrl in flowManUrlList:
  27. if flowManUrl is not None and len(flowManUrl) > 0:
  28. args_list.append(('code', flowManUrl, gpuId, fkmodel, orc, msgId))
  29. register = message_register(registerId=registerId, carUrl=carUrl, carCode="", flowManRecognitionlList=[])
  30. with ThreadPoolExecutor(max_workers=2) as t:
  31. for result in t.map(ai_segmentation_recognition, args_list):
  32. if result is not None:
  33. if result == '':
  34. continue
  35. if result.get("type") == '2':
  36. register["carCode"] = result.get("carCode")
  37. register["carCodeScore"] = result.get("score")
  38. else:
  39. register["flowManRecognitionlList"].append(result)
  40. else:
  41. return None
  42. return register
  43. # 2.调用百度云识别
  44. def ai_segmentation_recognition(param):
  45. try:
  46. image = url2Array(param[1])
  47. if param[0] == 'plate':
  48. dataBack = param[3].process(image, param[2], param[0])
  49. logger.info("算法分割结果: {}", dataBack)
  50. # ('plate', carUrl, gpuId, fkmodel, orc, msgId)
  51. # {'type': 2, 'plateImage': "图片", '0.939557671546936', 'color': 'green'}
  52. if dataBack is None or dataBack.get("plateImage") is None or len(dataBack.get("plateImage")) == 0:
  53. result = param[4].license_plate_recognition(image, param[5])
  54. score = ''
  55. if result is None or result.get("words_result") is None:
  56. logger.error("车牌识别为空: {}", result)
  57. carCode = ''
  58. else:
  59. carCode = result.get("words_result").get("number")
  60. else:
  61. result = param[4].license_plate_recognition(dataBack.get("plateImage")[0], param[5])
  62. score = dataBack.get("plateImage")[1]
  63. if result is None or result.get("words_result") is None:
  64. result = param[4].license_plate_recognition(image, param[5])
  65. if result is None or result.get("words_result") is None:
  66. logger.error("车牌识别为空: {}", result)
  67. carCode = ''
  68. else:
  69. carCode = result.get("words_result").get("number")
  70. else:
  71. carCode = result.get("words_result").get("number")
  72. return {'type': str(2), 'carUrl': param[1], 'carCode': carCode, 'score': score}
  73. if param[0] == 'code':
  74. dataBack = param[3].process(image, param[2], param[0])
  75. logger.info("算法分割结果: {}", dataBack)
  76. if dataBack is None or dataBack.get("type") is None:
  77. return ''
  78. # 行程码
  79. if dataBack.get("type") == 0:
  80. # 手机号
  81. # if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0:
  82. # phoneNumberRecognition = ''
  83. # phone_score = ''
  84. # else:
  85. # phone = param[4].universal_text_recognition(dataBack.get("phoneNumberImage")[0], param[5])
  86. # phone_score = dataBack.get("phoneNumberImage")[1]
  87. # if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0:
  88. # logger.error("手机号识别为空: {}", phone)
  89. # phoneNumberRecognition = ''
  90. # else:
  91. # phoneNumberRecognition = phone.get("words_result")
  92. if dataBack.get("cityImage") is None or len(dataBack.get("cityImage")) == 0:
  93. cityRecognition = ''
  94. city_score = ''
  95. else:
  96. city = param[4].universal_text_recognition(dataBack.get("cityImage")[0], param[5])
  97. city_score = dataBack.get("cityImage")[1]
  98. if city is None or city.get("words_result") is None or len(city.get("words_result")) == 0:
  99. logger.error("城市识别为空: {}", city)
  100. cityRecognition = ''
  101. else:
  102. cityRecognition = city.get("words_result")
  103. return {'type': str(dataBack.get("type")),
  104. 'imageUrl': param[1],
  105. # 'phoneNumberRecognition': phoneNumberRecognition,
  106. # 'phone_sorce': phone_score,
  107. 'cityRecognition': cityRecognition,
  108. 'city_score': city_score}
  109. elif dataBack.get("type") == 1:
  110. # if dataBack.get("nameImage") is None or len(dataBack.get("nameImage")) == 0:
  111. # nameRecognition = ''
  112. # name_score = ''
  113. # else:
  114. # name = param[4].universal_text_recognition(dataBack.get("nameImage")[0], param[5])
  115. # name_score = dataBack.get("nameImage")[1]
  116. # if name is None or name.get("words_result") is None or len(name.get("words_result")) == 0:
  117. # logger.error("名字识别为空: {}", name)
  118. # nameRecognition = ''
  119. # else:
  120. # nameRecognition = name.get("words_result")
  121. # if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0:
  122. # phoneNumberRecognition = ''
  123. # phone_score = ''
  124. # else:
  125. # phone = param[4].universal_text_recognition(dataBack.get("phoneNumberImage")[0], param[5])
  126. # phone_score = dataBack.get("phoneNumberImage")[1]
  127. # if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0:
  128. # logger.error("手机号识别为空: {}", phone)
  129. # phoneNumberRecognition = ''
  130. # else:
  131. # phoneNumberRecognition = phone.get("words_result")
  132. if dataBack.get("hsImage") is None or len(dataBack.get("hsImage")) == 0:
  133. hsRecognition = ''
  134. hs_score = ''
  135. else:
  136. hs = param[4].universal_text_recognition(dataBack.get("hsImage")[0], param[5])
  137. hs_score = dataBack.get("hsImage")[1]
  138. if hs is None or hs.get("words_result") is None or len(hs.get("words_result")) == 0:
  139. logger.error("核酸识别为空: {}", hs)
  140. hsRecognition = ''
  141. else:
  142. hsRecognition = hs.get("words_result")
  143. return {'type': str(dataBack.get("type")),
  144. 'imageUrl': param[1],
  145. 'color': dataBack.get("color"),
  146. # 'nameRecognition': nameRecognition,
  147. # 'name_score': name_score,
  148. # 'phoneNumberRecognition': phoneNumberRecognition,
  149. # 'phone_score': phone_score,
  150. 'hsRecognition': hsRecognition,
  151. 'hs_score': hs_score}
  152. else:
  153. return ''
  154. except ServiceException as s:
  155. logger.exception("AI划图,百度云识别失败:{}, msgId: {}", s.msg, param[5])
  156. return None
  157. except Exception as e:
  158. logger.exception("AI划图,百度云识别失败: {}, msgId:{}", e, param[5])
  159. return None
  160. class DispatcherService():
  161. # 初始化
  162. def __init__(self):
  163. # 获取DSP环境所需要的配置
  164. self.content = YmlUtils.getConfigs()
  165. # 初始化日志
  166. LogUtils.init_log(self.content)
  167. # # 记录当前正在执行的分析任务
  168. # self.tasks = {}
  169. self.fbQueue = Queue()
  170. # 服务调用启动方法
  171. def start_service(self):
  172. # 启动问题反馈线程
  173. self.start_feedback_thread()
  174. # 初始化kafka监听者
  175. topics = self.content["kafka"]["topic"]["fk-alg-tasks-topic"]
  176. customerKafkaConsumer = KafkaUtils.CustomerKafkaConsumer(self.content, topics=topics)
  177. print("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙")
  178. orc = OcrBaiduSdk(self.content)
  179. # 初始化模型
  180. fkmodel = FKModel()
  181. while True:
  182. # self.check_task()
  183. # 获取当前可用GPU
  184. gpu_ids = GPUtils.get_gpu_ids(self.content)
  185. if gpu_ids is not None and len(gpu_ids) > 0:
  186. msg = customerKafkaConsumer.poll()
  187. if msg is None or len(msg) == 0:
  188. time.sleep(2)
  189. else:
  190. for k, v in msg.items():
  191. for m in v:
  192. message = m.value
  193. try:
  194. customerKafkaConsumer.commit_offset(m)
  195. logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, msgId:{}",
  196. m.topic, m.offset, m.partition, message, message.get("msgId"))
  197. ################## 消息驱动分析执行 ##################
  198. check_result = self.check_fk_msg(message)
  199. if not check_result:
  200. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  201. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  202. msgId = message.get("msgId")
  203. registerMsg = message.get("registerMsg")
  204. obj_list = []
  205. response = message_feedback(msgId,
  206. erroCode=ExceptionType.SUCCESS.value[0],
  207. erroMsg=ExceptionType.SUCCESS.value[1],
  208. registerRecognitionMsg=[])
  209. with ThreadPoolExecutor(max_workers=2) as t:
  210. for flowMan in registerMsg:
  211. obj = t.submit(distinguish, flowMan, fkmodel, gpu_ids[0], orc, msgId)
  212. obj_list.append(obj)
  213. for future in as_completed(obj_list):
  214. data = future.result()
  215. if data is None:
  216. response = message_feedback(msgId,
  217. erroCode=ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  218. erroMsg=ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
  219. registerRecognitionMsg=[])
  220. break
  221. else:
  222. response["registerRecognitionMsg"].append(data)
  223. self.fbQueue.put(response)
  224. except ServiceException as s:
  225. logger.exception("消息监听异常:{}, msgId: {}", s.msg, message.get("msgId"))
  226. self.fbQueue.put(message_feedback(message.get("msgId"), s.code, s.msg))
  227. except Exception as e:
  228. logger.exception("消息监听异常:{}, requestId: {}", e, message.get("request_id"))
  229. self.fbQueue.put(message_feedback(message.get("msgId"),
  230. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  231. ExceptionType.SERVICE_INNER_EXCEPTION.value[
  232. 1]))
  233. else:
  234. logger.info("当前可用gpu数量: {}", gpu_ids)
  235. GPUtil.showUtilization()
  236. def checkGPU(self, msgId):
  237. gpu_ids = None
  238. while True:
  239. GPUtil.showUtilization()
  240. gpu_ids = GPUtils.get_gpu_ids(self.content)
  241. if gpu_ids is None or len(gpu_ids) == 0:
  242. logger.warning("暂无可用GPU资源,5秒后重试, 可用gpu数: {}, msgId: {}", gpu_ids, msgId)
  243. time.sleep(5)
  244. continue
  245. else:
  246. break
  247. return gpu_ids
  248. def start_feedback_thread(self):
  249. feedbackThread = FeedbackThread(self.fbQueue, self.content)
  250. feedbackThread.start()
  251. # def check_task(self):
  252. # for msgId in list(self.tasks.keys()):
  253. # if not self.tasks[msgId].is_alive():
  254. # del self.tasks[msgId]
  255. # 校验kafka消息
  256. def check_fk_msg(self, msg):
  257. msgId = msg.get("msgId")
  258. registerMsg = msg.get("registerMsg")
  259. if msgId is None:
  260. return False
  261. if registerMsg is None:
  262. return False
  263. return True