|
- # -*- coding: utf-8 -*-
- import time
- from concurrent.futures import ThreadPoolExecutor, as_completed
-
- import GPUtil
-
- from concurrency.FeedbackThread import FeedbackThread
- from entity.FeedBack import message_feedback, message_register
- from enums.ExceptionEnum import ExceptionType
- from exception.CustomerException import ServiceException
- from util import YmlUtils, LogUtils, KafkaUtils
- from loguru import logger
- from multiprocessing import Queue
- from util import GPUtils
- from util.ImageUtils import url2Array
- from util.ModelUtils import FKModel
- from util.OcrBaiduSdk import OcrBaiduSdk
-
- '''
- 分发服务
- '''
-
- def distinguish(flowMan, fkmodel, gpuId, orc, msgId):
- registerId = flowMan.get("registerId")
- carUrl = flowMan.get("carUrl")
- flowManUrlList = flowMan.get("flowManUrlList")
- args_list = []
- args_list.append(('plate', carUrl, gpuId, fkmodel, orc, msgId))
- for flowManUrl in flowManUrlList:
- args_list.append(('code', flowManUrl, gpuId, fkmodel, orc, msgId))
- register = message_register(registerId=registerId, carUrl=carUrl, carCode="", flowManRecognitionlList=[])
- with ThreadPoolExecutor(max_workers=2) as t:
- for result in t.map(ai_segmentation_recognition, args_list):
- if result is not None:
- if result.get("type") == '2':
- register["carCode"] = result.get("carCode")
- register["carCodeScore"] = result.get("score")
- else:
- register["flowManRecognitionlList"].append(result)
- else:
- return None
- return register
-
-
- # 2.调用百度云识别
- def ai_segmentation_recognition(param):
- try:
- image = url2Array(param[1])
- if param[0] == 'plate':
- dataBack = param[3].process(image, param[2], param[0])
- logger.info("算法分割结果: {}", dataBack)
- if dataBack is None or dataBack.get("type") is None:
- raise ServiceException(ExceptionType.AI_RECOGNITION_FAILED.value[0],
- ExceptionType.AI_RECOGNITION_FAILED.value[1])
- # ('plate', carUrl, gpuId, fkmodel, orc, msgId)
- # {'type': 2, 'plateImage': "图片", '0.939557671546936', 'color': 'green'}
- if dataBack.get("plateImage") is None or len(dataBack.get("plateImage")) == 0:
- carCode = ''
- score = ''
- else:
- result = param[4].license_plate_recognition(dataBack.get("plateImage")[0], param[5])
- score = dataBack.get("plateImage")[1]
- if result is None or result.get("words_result") is None:
- logger.error("车牌识别为空: {}", result)
- carCode = ''
- else:
- carCode = result.get("words_result").get("number")
- return {'type': str(dataBack.get("type")), 'carUrl': param[1], 'carCode': carCode, 'score': score}
- if param[0] == 'code':
- dataBack = param[3].process(image, param[2], param[0])
- logger.info("算法分割结果: {}", dataBack)
- if dataBack is None or dataBack.get("type") is None:
- raise ServiceException(ExceptionType.AI_RECOGNITION_FAILED.value[0],
- ExceptionType.AI_RECOGNITION_FAILED.value[1])
- # 行程码
- if dataBack.get("type") == 0:
- # 手机号
- if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0:
- phoneNumberRecognition = ''
- phone_score = ''
- else:
- phone = param[4].universal_text_recognition(dataBack.get("phoneNumberImage")[0], param[5])
- phone_score = dataBack.get("phoneNumberImage")[1]
- if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0:
- logger.error("手机号识别为空: {}", phone)
- phoneNumberRecognition = ''
- else:
- phoneNumberRecognition = phone.get("words_result")
- if dataBack.get("cityImage") is None or len(dataBack.get("cityImage")) == 0:
- cityRecognition = ''
- city_score = ''
- else:
- city = param[4].universal_text_recognition(dataBack.get("cityImage")[0], param[5])
- city_score = dataBack.get("cityImage")[1]
- if city is None or city.get("words_result") is None or len(phone.get("words_result")) == 0:
- logger.error("城市识别为空: {}", city)
- cityRecognition = ''
- else:
- cityRecognition = city.get("words_result")
- return {'type': str(dataBack.get("type")),
- 'imageUrl': param[1],
- 'phoneNumberRecognition': phoneNumberRecognition,
- 'phone_sorce': phone_score,
- 'cityRecognition': cityRecognition,
- 'city_score': city_score}
- elif dataBack.get("type") == 1:
- if dataBack.get("nameImage") is None or len(dataBack.get("nameImage")) == 0:
- nameRecognition = ''
- name_score = ''
- else:
- name = param[4].universal_text_recognition(dataBack.get("nameImage")[0], param[5])
- name_score = dataBack.get("nameImage")[1]
- if name is None or name.get("words_result") is None or len(name.get("words_result")) == 0:
- logger.error("名字识别为空: {}", name)
- nameRecognition = ''
- else:
- nameRecognition = name.get("words_result")
-
- if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0:
- phoneNumberRecognition = ''
- phone_score = ''
- else:
- phone = param[4].universal_text_recognition(dataBack.get("phoneNumberImage")[0], param[5])
- phone_score = dataBack.get("phoneNumberImage")[1]
- if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0:
- logger.error("手机号识别为空: {}", phone)
- phoneNumberRecognition = ''
- else:
- phoneNumberRecognition = phone.get("words_result")
-
- if dataBack.get("hsImage") is None or len(dataBack.get("hsImage")) == 0:
- hsRecognition = ''
- hs_score = ''
- else:
- hs = param[4].universal_text_recognition(dataBack.get("hsImage")[0], param[5])
- hs_score = dataBack.get("hsImage")[1]
- if hs is None or hs.get("words_result") is None or len(hs.get("words_result")) == 0:
- logger.error("核酸识别为空: {}", hs)
- hsRecognition = ''
- else:
- hsRecognition = hs.get("words_result")
-
- return {'type': str(dataBack.get("type")),
- 'imageUrl': param[1],
- 'color': dataBack.get("color"),
- 'nameRecognition': nameRecognition,
- 'name_score': name_score,
- 'phoneNumberRecognition': phoneNumberRecognition,
- 'phone_score': phone_score,
- 'hsRecognition': hsRecognition,
- 'hs_score': hs_score}
- else:
- raise ServiceException(ExceptionType.AI_RECOGNITION_FAILED.value[0],
- ExceptionType.AI_RECOGNITION_FAILED.value[1])
- except ServiceException as s:
- logger.exception("AI划图,百度云识别失败:{}, msgId: {}", s.msg, param[5])
- return None
- except Exception as e:
- logger.exception("AI划图,百度云识别失败: {}, msgId:{}", e, param[5])
- return None
-
-
-
-
- class DispatcherService():
-
- # 初始化
- def __init__(self):
- # 获取DSP环境所需要的配置
- self.content = YmlUtils.getConfigs()
- # 初始化日志
- LogUtils.init_log(self.content)
- # # 记录当前正在执行的分析任务
- # self.tasks = {}
- self.fbQueue = Queue()
-
- # 服务调用启动方法
- def start_service(self):
- # 启动问题反馈线程
- self.start_feedback_thread()
- # 初始化kafka监听者
- topics = self.content["kafka"]["topic"]["fk-alg-tasks-topic"]
- customerKafkaConsumer = KafkaUtils.CustomerKafkaConsumer(self.content, topics=topics)
- print("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙")
- orc = OcrBaiduSdk(self.content)
- # 初始化模型
- fkmodel = FKModel()
- while True:
- # self.check_task()
- # 获取当前可用GPU
- gpu_ids = GPUtils.get_gpu_ids(self.content)
- if gpu_ids is not None and len(gpu_ids) > 0:
- msg = customerKafkaConsumer.poll()
- if msg is None or len(msg) == 0:
- time.sleep(2)
- else:
- for k, v in msg.items():
- for m in v:
- message = m.value
- try:
- gpu_ids = self.checkGPU(message.get("msgId"))
- customerKafkaConsumer.commit_offset(m)
- logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, msgId:{}",
- m.topic, m.offset, m.partition, message, message.get("msgId"))
- ################## 消息驱动分析执行 ##################
- check_result = self.check_fk_msg(message)
- if not check_result:
- raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
- ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
- msgId = message.get("msgId")
- registerMsg = message.get("registerMsg")
- obj_list = []
- response = message_feedback(msgId,
- erroCode=ExceptionType.SUCCESS.value[0],
- erroMsg=ExceptionType.SUCCESS.value[1],
- registerRecognitionMsg=[])
- with ThreadPoolExecutor(max_workers=2) as t:
- for flowMan in registerMsg:
- obj = t.submit(distinguish, flowMan, fkmodel, gpu_ids[0], orc, msgId)
- obj_list.append(obj)
- for future in as_completed(obj_list):
- data = future.result()
- if data is None:
- response = message_feedback(msgId,
- erroCode=ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- erroMsg=ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
- registerRecognitionMsg=[])
- break
- else:
- response["registerRecognitionMsg"].append(data)
- self.fbQueue.put(response)
- except ServiceException as s:
- logger.exception("消息监听异常:{}, msgId: {}", s.msg, message.get("msgId"))
- self.fbQueue.put(message_feedback(message.get("msgId"), s.code, s.msg))
- except Exception as e:
- logger.exception("消息监听异常:{}, requestId: {}", e, message.get("request_id"))
- self.fbQueue.put(message_feedback(message.get("msgId"),
- ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[
- 1]))
-
- else:
- logger.info("当前可用gpu数量: {}", gpu_ids)
- GPUtil.showUtilization()
-
-
-
-
- def checkGPU(self, msgId):
- gpu_ids = None
- while True:
- GPUtil.showUtilization()
- gpu_ids = GPUtils.get_gpu_ids(self.content)
- if gpu_ids is None or len(gpu_ids) == 0:
- logger.warning("暂无可用GPU资源,5秒后重试, 可用gpu数: {}, msgId: {}", len(gpu_ids), msgId)
- time.sleep(5)
- continue
- else:
- break
- return gpu_ids
-
- def start_feedback_thread(self):
- feedbackThread = FeedbackThread(self.fbQueue, self.content)
- feedbackThread.start()
-
- # def check_task(self):
- # for msgId in list(self.tasks.keys()):
- # if not self.tasks[msgId].is_alive():
- # del self.tasks[msgId]
-
- # 校验kafka消息
- def check_fk_msg(self, msg):
- msgId = msg.get("msgId")
- registerMsg = msg.get("registerMsg")
- if msgId is None:
- return False
- if registerMsg is None:
- return False
- return True
|