|
- # -*- coding: utf-8 -*-
- import base64
- import json
- import os
- import time
- import copy
- from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
- from os.path import join, exists
- from traceback import format_exc
-
- import cv2
- import numpy as np
-
- from multiprocessing import Process, Queue
- from loguru import logger
-
- from common.Constant import init_progess, success_progess
-
- from util.GPUtils import check_gpu_resource
- from util.LogUtils import init_log
- from concurrency.CommonThread import Common
- from concurrency.PullStreamThread import RecordingPullStreamThread
- from concurrency.PullVideoStreamProcess import OnlinePullVideoStreamProcess, OfflinePullVideoStreamProcess
- from concurrency.RecordingHeartbeatThread import RecordingHeartbeat
- from enums.AnalysisStatusEnum import AnalysisStatus
- from enums.AnalysisTypeEnum import AnalysisType
- from enums.ExceptionEnum import ExceptionType
- from enums.ModelTypeEnum import ModelType
- from enums.RecordingStatusEnum import RecordingStatus
- from util import TimeUtils
- from util.AliyunSdk import AliyunOssSdk, ThAliyunVodSdk
- from util.CpuUtils import check_cpu
- from util.Cv2Utils import Cv2Util, video_conjuncing, push_video_stream, write_or_video, write_ai_video, close_all_p
- from entity.FeedBack import message_feedback, recording_feedback
- from exception.CustomerException import ServiceException
- from util.ImageUtils import PictureWaterMark, url2Array, add_water_pic
- from util.ModelUtils import MODEL_CONFIG, FONTPATH
- from util.OcrBaiduSdk import OcrBaiduSdk
-
- from enums.BaiduSdkEnum import VehicleEnumVALUE, VehicleEnum
- from enums.ModelTypeEnum import BaiduModelTarget
- from util.PlotsUtils import draw_painting_joint, get_label_arrays
-
-
class IntelligentRecognitionProcess(Process):
    """Base process shared by the recognition task variants in this file.

    It owns the queues used to talk to the pull-stream process, keeps the
    request message and service context, resolves the watermark logo and
    provides model loading (``get_model``) for its subclasses.
    """

    __slots__ = (
        '_fbQueue',
        'eventQueue',
        '_imageQueue',
        '_hbQueue',
        'pullQueue',
        '_context',
        '_msg',
        '_analyse_type',
        '_base_dir',
        '_gpu_name',
        '_enable_add_water',
        '_logo',
        'start_proccess_time'
    )

    def __init__(self, param):
        """Store the task parameters and report the WAITING status.

        :param param: object exposing ``fbqueue``, ``context``, ``base_dir``,
            ``msg``, ``gpu_name`` and ``analyse_type`` attributes.
        """
        super().__init__()
        # Param(self._fbQueue, msg, analysisType, self.__base_dir)
        self._fbQueue = param.fbqueue
        self.eventQueue = Queue()
        self._imageQueue = Queue()
        self._hbQueue = Queue()
        self.pullQueue = Queue(100)
        self._context = param.context
        self._base_dir = param.base_dir
        self._msg = param.msg
        self._gpu_name = param.gpu_name
        self._analyse_type = param.analyse_type
        self._enable_add_water = bool(self._context["video"]["video_add_water"])
        self.start_proccess_time = time.time()
        self._logo = None
        if self._enable_add_water:
            logo_url = self._msg.get("logo_url")
            logo = url2Array(logo_url, enable_ex=False) if logo_url else None
            # The previous ``if not self._logo`` check raises ValueError when
            # the download yields a multi-element NumPy array, so emptiness
            # is tested explicitly instead.
            if self._logo_missing(logo):
                logo = cv2.imread(join(self._base_dir, "image/logo.png"), -1)
            self._logo = logo
        putStatusMessage(self._fbQueue, AnalysisStatus.WAITING.value, init_progess, self._analyse_type,
                         self._msg.get("request_id"))

    @staticmethod
    def _logo_missing(logo):
        """Return True when ``logo`` holds no usable image data."""
        if logo is None:
            return True
        size = getattr(logo, "size", None)
        if isinstance(size, int):
            # Array-like object (e.g. numpy.ndarray): empty means missing.
            return size == 0
        return not logo

    def clearPullQueue(self):
        """Drain the pull queue and the image queue until both report empty."""
        while self.pullQueue.qsize() > 0 or self._imageQueue.qsize() > 0:
            getNoBlockQueue(self.pullQueue)
            getNoBlockQueue(self._imageQueue)

    def waitPullStream(self, pullProcess):
        """Join ``pullProcess`` in 5 second slices, draining the queues meanwhile.

        A join that uses up its whole timeout is retried after another drain.
        Presumably the draining is there so that the child is not blocked
        flushing queued items while this process waits for it.
        """
        while True:
            self.clearPullQueue()
            start = time.time()
            pullProcess.join(5)
            if time.time() - start < 5:
                break
            self.clearPullQueue()

    def _put_or_raise(self, target_queue, item, error_template):
        """Put ``item`` on ``target_queue`` (10 s timeout) or raise ServiceException."""
        try:
            target_queue.put(item, timeout=10)
        except Exception:
            logger.error(error_template, format_exc(), self._msg.get("request_id"))
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])

    # Send an event to this process.
    def sendEvent(self, eBody):
        """Queue ``eBody`` on this process's event queue.

        :raises ServiceException: when the put fails or times out.
        """
        self._put_or_raise(self.eventQueue, eBody, "添加事件到队列超时异常:{}, requestId:{}")

    def getImageQueue(self):
        """Return the next item of the image queue, or None when nothing can be read."""
        try:
            return self._imageQueue.get(block=False)
        except Exception:
            # Best effort: an empty queue (or any read failure) yields None.
            return None

    def sendImageQueue(self, result):
        """Queue ``result`` on the image queue.

        :raises ServiceException: when the put fails or times out.
        """
        self._put_or_raise(self._imageQueue, result, "添加图片到队列超时异常:{}, requestId:{}")

    def sendhbQueue(self, result):
        """Queue ``result`` on the heartbeat queue.

        :raises ServiceException: when the put fails or times out.
        """
        self._put_or_raise(self._hbQueue, result, "添加心跳到队列超时异常:{}, requestId:{}")

    def _load_model(self, model_method, gpu_ids, needed_objectsIndex, code, requestId):
        """Instantiate one configured model and return ``(model_conf, code)``."""
        mod = model_method[0](gpu_ids[0], needed_objectsIndex, requestId, self._gpu_name,
                              self._base_dir)
        return mod.model_conf, code

    def get_model(self):
        """Load every model requested in the message.

        The ``models`` entry of the message is expected to look like::

            "models": [{
                "code": "model code",
                "categories": [{
                    "id": "model id",
                    "config": {"k1": "v1", "k2": "v2"}
                }]
            }]

        Entries repeating an already seen ``code`` are ignored.

        :return: list of ``(model_conf, code)`` tuples.
        :raises ServiceException: for a missing/oversized model list, an
            unknown model code, a model that does not support the analysis
            type, or (wrapped) any other failure while loading.
        """
        requestId = self._msg.get("request_id")
        try:
            video_types = (AnalysisType.ONLINE.value, AnalysisType.OFFLINE.value)
            is_video_task = self._analyse_type in video_types
            is_image_task = self._analyse_type == AnalysisType.IMAGE.value
            models = self._msg.get("models")
            if models is None or len(models) == 0:
                raise ServiceException(ExceptionType.AI_MODEL_CONFIG_EXCEPTION.value[0],
                                       ExceptionType.AI_MODEL_CONFIG_EXCEPTION.value[1])
            # The model-count limit is only enforced for video tasks.
            if is_video_task and len(models) > self._context["model"]['limit']:
                raise ServiceException(ExceptionType.MODEL_GROUP_LIMIT_EXCEPTION.value[0],
                                       ExceptionType.MODEL_GROUP_LIMIT_EXCEPTION.value[1])
            modelArray = []
            seen_codes = set()
            for model in models:
                # Model code; repeated codes are skipped.
                code = model.get("code")
                if code in seen_codes:
                    continue
                seen_codes.add(code)
                # De-duplicated ids of the detection targets.
                needed_objectsIndex = list(set(int(category.get("id")) for category in model.get("categories")))
                logger.info("模型编号: {}, 检查目标: {}, requestId: {}", code, needed_objectsIndex, requestId)
                model_method = MODEL_CONFIG.get(code)
                if model_method is None:
                    logger.error("未匹配到对应的模型, request_id:{}", requestId)
                    raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0],
                                           ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[1])
                check_cpu(self._base_dir, requestId)
                gpu_ids = check_gpu_resource(requestId)
                if is_video_task:
                    if model.get("is_video") != "1":
                        raise ServiceException(ExceptionType.MODEL_NOT_SUPPORT_VIDEO_EXCEPTION.value[0],
                                               ExceptionType.MODEL_NOT_SUPPORT_VIDEO_EXCEPTION.value[1],
                                               model_method[1].value[2])
                    modelArray.append(self._load_model(model_method, gpu_ids, needed_objectsIndex, code,
                                                       requestId))
                # Image recognition task.
                if is_image_task:
                    if model.get("is_image") != "1":
                        raise ServiceException(ExceptionType.MODEL_NOT_SUPPORT_IMAGE_EXCEPTION.value[0],
                                               ExceptionType.MODEL_NOT_SUPPORT_IMAGE_EXCEPTION.value[1],
                                               model_method[1].value[2])
                    modelArray.append(self._load_model(model_method, gpu_ids, needed_objectsIndex, code,
                                                       requestId))
            if len(modelArray) == 0:
                raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0],
                                       ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[1])
            return modelArray
        except ServiceException:
            raise
        except Exception:
            logger.error("模型配置处理异常: {}, request_id: {}", format_exc(), requestId)
            raise ServiceException(ExceptionType.MODEL_LOADING_EXCEPTION.value[0],
                                   ExceptionType.MODEL_LOADING_EXCEPTION.value[1])
-
-
def putQueue(queue, result, requestId, enable_ex=True):
    """Put ``result`` on ``queue``, waiting at most 10 seconds.

    A failed put is always logged. It is re-raised as a ServiceException
    unless ``enable_ex`` is false, in which case the failure is swallowed.
    """
    try:
        queue.put(result, timeout=10)
    except Exception:
        logger.error("添加队列超时异常:{}, requestId:{}", format_exc(), requestId)
        if not enable_ex:
            return
        inner_error = ExceptionType.SERVICE_INNER_EXCEPTION
        raise ServiceException(inner_error.value[0],
                               inner_error.value[1])
-
-
def putStatusMessage(fb_queue, analysisStatus, progress, analyse_type, requestId):
    """Build a status feedback message and queue it on ``fb_queue``."""
    status_feedback = message_feedback(requestId,
                                       analysisStatus,
                                       analyse_type,
                                       progress=progress,
                                       analyse_time=TimeUtils.now_date_to_str())
    putQueue(fb_queue, {"feedback": status_feedback}, requestId)
-
-
def getNoBlockQueue(queue):
    """Return the next item of ``queue`` without blocking.

    Returns None when the queue is empty or the read fails for any reason.
    """
    try:
        return queue.get(block=False)
    except Exception:
        return None
-
-
def getBlockQueue(queue):
    """Block until ``queue`` yields an item and return it."""
    item = queue.get()
    return item
-
-
def checkPullProcess(pullProcess, requestId):
    """Raise when a pull-stream process was given but is no longer alive.

    Nothing happens when ``pullProcess`` is None or still running.
    """
    if pullProcess is None or pullProcess.is_alive():
        return
    logger.info("拉流进程停止异常, requestId: {}", requestId)
    raise Exception("拉流进程异常停止")
-
-
def buildFrame(pull_queue, logo, enable_add_water, frame_score, model_array, task_status, ttt, requestId):
    """Drain ``pull_queue`` into work items for ``process`` plus a status message.

    Messages whose first element is '4' carry a frame; per the original note
    their layout is ("4", frame, concurrent_frame, w_2, h_2, all_frames).
    Any other message is kept as the status, and the last one read wins.

    Each returned work item is the tuple
    (frame_result, task_status, model_array, logo, enable_add_water,
    frame_score, ttt, requestId).

    :return: ``(frames, status)``; ``status`` is None when no status was read.
    """
    frames, status = [], None
    # Read at most as many messages as were queued when the call started.
    for _ in range(pull_queue.qsize()):
        try:
            message = pull_queue.get(block=False)
        except Exception:
            message = None
        if message is None:
            continue
        if message[0] != '4':
            status = message
            continue
        width = message[3]
        if task_status[0] == 1:
            # First frame of the task: let every model that registers a label
            # method prepare itself for the frame width, then clear the flag.
            for model in model_array:
                model_conf = model[0]
                label_method = MODEL_CONFIG[model_conf[0].value[1]][2]
                if label_method:
                    label_method(width, model_conf[2])
            task_status[0] = 0
        work_item = (message, task_status, model_array, logo, enable_add_water, frame_score,
                     ttt, requestId)
        frames.append(work_item)
    return frames, status
-
-
def analyze(mod, frame, task_status):
    """Run one model on ``frame`` and describe how its detections are drawn.

    While ``task_status[1]`` is 1 the model runs on every call. Otherwise it
    only runs when the current Unix time in whole seconds is even; on odd
    seconds an empty result ``[[], frame, [], []]`` is returned instead.

    :param mod: ``(model_conf, code)`` pair.
    :return: ``(p_result, code, allowedList, label_arraylist, rainbows)``;
        the last three are None unless the model reported detections.
    """
    model_conf, code = mod[0], mod[1]
    model_param = model_conf[2]
    # NOTE(review): the parameter list is shared by every call for this model
    # and is mutated here - confirm this is safe with concurrent frames.
    model_param[0] = frame
    allowedList = label_arraylist = rainbows = None
    should_infer = task_status[1] == 1 or int(time.time()) % 2 == 0
    if should_infer:
        p_result, _time_out = MODEL_CONFIG[code][3](model_param)
        detections = p_result[2]
        if detections is not None and len(detections) > 0:
            allowedList = model_conf[1]
            label_arraylist = model_conf[2][4]
            rainbows = model_conf[2][5]
    else:
        p_result = [[], frame, [], []]
    return p_result, code, allowedList, label_arraylist, rainbows
-
-
def _detection_box(ai_result):
    """Return the four corner points of one detection.

    Detections either already carry a point sequence in position 1, or hold
    four scalar coordinates in positions 1-4 that are expanded to the corners
    top-left, top-right, bottom-right, bottom-left.
    """
    shape = ai_result[1]
    if isinstance(shape, (list, tuple, np.ndarray)):
        return shape
    left = int(ai_result[1])
    top = int(ai_result[2])
    right = int(ai_result[3])
    bottom = int(ai_result[4])
    return [(left, top), (right, top), (right, bottom), (left, bottom)]


def process(frame):
    """Analyse one frame with every model and build the merged output image.

    :param frame: work item
        (frame_result, task_status, model_array, logo, enable_add_water,
        frame_score, ttt, requestId).
    :return: ``(frame_result, frame_merge, det_xywh)`` where ``det_xywh`` maps
        model code -> target code -> list of
        ``[target_code, box, score, label_array, color]``.
    """
    try:
        frame_result = frame[0]
        task_status = frame[1]
        model_array = frame[2]
        logo = frame[3]
        enable_add_water = frame[4]
        frame_score = frame[5]
        ttt = frame[6]
        requestId = frame[7]
        or_frame = frame_result[1]
        copy_frame = or_frame.copy()
        # Run all models of the group on the executor.
        pending = [ttt.submit(analyze, mod, or_frame, task_status) for mod in model_array]
        finished = wait(pending, timeout=60, return_when=ALL_COMPLETED).done
        det_xywh = {}
        for future in finished:
            failure = future.exception()
            if failure:
                raise failure
            p_result, code, allowedList, label_arraylist, rainbows = future.result()
            if allowedList is None:
                continue
            by_target = {}
            det_xywh[code] = by_target
            for ai_result in p_result[2]:
                # Detected target code and its score.
                detect_targets_code = int(ai_result[0])
                score = ai_result[-1]
                # Only requested targets that reach the score threshold are drawn.
                if not (detect_targets_code in allowedList and score >= frame_score):
                    continue
                label_array = label_arraylist[detect_targets_code]
                color = rainbows[detect_targets_code]
                box = _detection_box(ai_result)
                draw_painting_joint(box, copy_frame, label_array, score, color, "leftTop")
                by_target.setdefault(detect_targets_code, []).append(
                    [detect_targets_code, box, score, label_array, color])
        if enable_add_water:
            or_frame = add_water_pic(or_frame, logo, requestId)
            copy_frame = add_water_pic(copy_frame, logo, requestId)
        frame_merge = video_conjuncing(or_frame, copy_frame)
        return frame_result, frame_merge, det_xywh
    except ServiceException:
        raise
    except Exception:
        logger.error("模型分析异常: {}", format_exc())
        raise
-
-
class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
    """Recognition process for live (online) streams.

    It pulls frames through an OnlinePullVideoStreamProcess, analyses them,
    writes an original and an AI-annotated local video, pushes the merged
    frames to the push URL and reports the outcome on the feedback queue.
    """
    __slots__ = ('__orFilePath', '__aiFilePath')

    def __init__(self, param):
        """Initialise the base process and build the two local video paths."""
        super(OnlineIntelligentRecognitionProcess, self).__init__(param)
        # Timestamp that makes the file names of this task unique.
        random_time = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF)
        # Path of the original recording.
        self.__orFilePath = "%s%s%s%s%s" % (
            join(self._base_dir, self._context["video"]["file_path"]), random_time, "_on_or_",
            self._msg.get("request_id"), ".mp4")
        # Path of the AI-annotated recording.
        self.__aiFilePath = "%s%s%s%s%s" % (
            join(self._base_dir, self._context["video"]["file_path"]), random_time, "_on_ai_",
            self._msg.get("request_id"), ".mp4")

    # Stop the task.
    def stop_task(self, pullProcess, snalysisStatus):
        """Stop pulling, obtain play URLs for both videos and send final feedback.

        :param pullProcess: the running pull-stream process.
        :param snalysisStatus: final analysis status value to report.
        :raises ServiceException: when a local video file is missing or a
            play URL could not be obtained.
        """
        pullProcess.sendCommand({"command": "stop_pull_stream"})
        if not os.path.exists(self.__orFilePath) or not os.path.exists(self.__aiFilePath):
            logger.error("原视频或AI视频不存在!requestId:{}", self._msg.get("request_id"))
            raise ServiceException(ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[0],
                                   ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[1])
        aliyunVodSdk = ThAliyunVodSdk(self._base_dir, self._msg.get("request_id"), self._context["dsp"]["active"])
        # Both get_play_url calls run in their own Common thread so that they
        # proceed in parallel; get_result() below collects their results.
        upload_video_thread_or = Common(self._context, aliyunVodSdk.get_play_url, self.__orFilePath,
                                        "or_online_%s" % self._msg.get("request_id"))
        upload_video_thread_ai = Common(self._context, aliyunVodSdk.get_play_url, self.__aiFilePath,
                                        "ai_online_%s" % self._msg.get("request_id"))
        upload_video_thread_or.setDaemon(True)
        upload_video_thread_ai.setDaemon(True)
        upload_video_thread_or.start()
        upload_video_thread_ai.start()
        or_url = upload_video_thread_or.get_result()
        ai_url = upload_video_thread_ai.get_result()
        if or_url is None or ai_url is None:
            logger.error("原视频或AI视频播放上传VOD失败!, requestId: {}", self._msg.get("request_id"))
            raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
                                   ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
        # Stop the image/heartbeat side of the pull process and wait for it to exit.
        pullProcess.sendCommand({"command": "stop_image_hb"})
        self.waitPullStream(pullProcess)
        putQueue(self._fbQueue, {"feedback": message_feedback(self._msg.get("request_id"), snalysisStatus,
                                                              self._analyse_type,
                                                              progress=success_progess,
                                                              original_url=or_url,
                                                              sign_url=ai_url,
                                                              analyse_time=TimeUtils.now_date_to_str())},
                 self._msg.get("request_id"))

    def start_pull_stream(self):
        """Create, start and return the daemon pull-stream process for this task."""
        pullProcess = OnlinePullVideoStreamProcess(self._msg, self._context, self.pullQueue,
                                                   self._fbQueue, self._hbQueue, self._imageQueue,
                                                   self._analyse_type, self._base_dir)
        pullProcess.daemon = True
        pullProcess.start()
        return pullProcess

    def run(self):
        """Process entry point: load models, then analyse frames until stopped.

        Failures are converted into a FAILED feedback message. The ``finally``
        clause always closes the push/video handles, stops the pull process
        and deletes the local video files.
        """
        feedback = None
        pullProcess = None
        requestId = self._msg.get("request_id")
        push_url = self._msg.get("push_url")
        orFilePath = self.__orFilePath
        aiFilePath = self.__aiFilePath
        # Handles returned by push_video_stream / write_or_video / write_ai_video;
        # they are passed back in on the next call and closed by close_all_p.
        push_p = None
        or_video_file = None
        ai_video_file = None
        fb_queue = self._fbQueue
        try:
            init_log(self._base_dir)
            logger.info("实时任务进行中!!!!!!!requestId: {}", requestId)
            # Local aliases of the queues.
            eventQueue = self.eventQueue
            pull_queue = self.pullQueue
            image_queue = self._imageQueue
            # Load the models.
            model_array = self.get_model()
            # Start the pull-stream process.
            pullProcess = self.start_pull_stream()
            enable_add_water = self._enable_add_water
            logo = self._logo
            # task_status[0]: first-frame flag (1 = first frame, 0 = not).
            # task_status[1]: problem-found flag (1 = found, 0 = none).
            task_status = [1, 1]
            frame_score = float(self._context["service"]["frame_score"])
            putStatusMessage(self._fbQueue, AnalysisStatus.RUNNING.value, init_progess, self._analyse_type, requestId)
            # Future of the buildFrame call submitted in the previous iteration.
            task_frame = None
            # State list handed to push_video_stream on every call.
            p_push_array = [0, 0]
            # t runs process() per frame, tt runs buildFrame and the video
            # writers, ttt is handed to process() for the per-model analysis.
            with ThreadPoolExecutor(max_workers=5) as t:
                with ThreadPoolExecutor(max_workers=3) as tt:
                    with ThreadPoolExecutor(max_workers=15) as ttt:
                        while True:
                            checkPullProcess(pullProcess, requestId)
                            event_result = getNoBlockQueue(eventQueue)
                            if event_result:
                                cmdStr = event_result.get("command")
                                # "stop_ex": abort and report a NO_RESOURCES failure.
                                if "stop_ex" == cmdStr:
                                    close_all_p(push_p, or_video_file, ai_video_file, requestId)
                                    logger.info("实时任务开始停止, requestId: {}", requestId)
                                    pullProcess.sendCommand({"command": 'stop_image_hb'})
                                    feedback = {"feedback": message_feedback(requestId, AnalysisStatus.FAILED.value,
                                                                             self._analyse_type,
                                                                             ExceptionType.NO_RESOURCES.value[0],
                                                                             ExceptionType.NO_RESOURCES.value[1],
                                                                             analyse_time=TimeUtils.now_date_to_str())}
                                    self.waitPullStream(pullProcess)
                                    break
                                # "stop": ask the pull process to stop; the loop keeps
                                # running until a final status message arrives.
                                if "stop" == cmdStr:
                                    logger.info("实时任务开始停止, requestId: {}", requestId)
                                    pullProcess.sendCommand({"command": 'stop_pull_stream'})
                            frames = []
                            status = None
                            # Collect the batch built during the previous iteration and
                            # immediately start building the next one.
                            if task_frame:
                                frames, status = task_frame.result()
                            task_frame = tt.submit(buildFrame, pull_queue, logo, enable_add_water, frame_score,
                                                   model_array, task_status, ttt, requestId)
                            if len(frames) == 0 and status is None:
                                time.sleep(0.5)
                                continue
                            if len(frames) > 0:
                                # Becomes True when any frame of the batch has detections.
                                check = False
                                for result in t.map(process, frames):
                                    if result is not None:
                                        frame_result, frame_merge, det_xywh = result
                                        # frame_result is presumably
                                        # ("4", frame, concurrent_frame, width, height, all_frames).
                                        write_or_video_result = tt.submit(write_or_video, frame_result[1], orFilePath,
                                                                          or_video_file, frame_result[3],
                                                                          frame_result[4],
                                                                          requestId)
                                        write_ai_video_result = tt.submit(write_ai_video, frame_merge, aiFilePath,
                                                                          ai_video_file, frame_result[3],
                                                                          frame_result[4],
                                                                          requestId)
                                        # The push runs in this thread while both writers run on tt.
                                        push_p = push_video_stream(frame_merge, push_p, push_url,
                                                                   frame_result[3], frame_result[4], p_push_array,
                                                                   requestId)
                                        ai_video_file = write_ai_video_result.result()
                                        or_video_file = write_or_video_result.result()
                                        if len(det_xywh) > 0:
                                            check = True
                                            putQueue(image_queue, {"image": (frame_result, det_xywh)}, requestId)

                                if check:
                                    task_status[1] = 1
                                if not check:
                                    task_status[1] = 0
                            if status is None:
                                continue
                            # Status "1": error reported with (code, message) in status[1:3].
                            if status[0] == "1":
                                raise ServiceException(status[1], status[2])
                            # Status "3": finish the task with the TIMEOUT status.
                            elif status[0] == "3":
                                close_all_p(push_p, or_video_file, ai_video_file, requestId)
                                self.stop_task(pullProcess, AnalysisStatus.TIMEOUT.value)
                                break
                            # Status "9": finish the task with the SUCCESS status.
                            elif status[0] == "9":
                                logger.info("实时任务正常结束:requestId: {}", self._msg.get("request_id"))
                                close_all_p(push_p, or_video_file, ai_video_file, requestId)
                                self.stop_task(pullProcess, AnalysisStatus.SUCCESS.value)
                                break
                            else:
                                raise Exception("未知拉流状态异常!")
            logger.info("实时进程任务完成,requestId:{}", self._msg.get("request_id"))
        except ServiceException as s:
            logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg,
                             self._msg.get("request_id"))
            feedback = {"feedback": message_feedback(requestId, AnalysisStatus.FAILED.value,
                                                     self._analyse_type,
                                                     s.code,
                                                     s.msg,
                                                     analyse_time=TimeUtils.now_date_to_str())}
        except Exception as e:
            logger.error("服务异常: {}, requestId: {},", format_exc(), self._msg.get("request_id"))
            feedback = {"feedback": message_feedback(requestId, AnalysisStatus.FAILED.value,
                                                     self._analyse_type,
                                                     ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                     ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
                                                     analyse_time=TimeUtils.now_date_to_str())}
        finally:
            # NOTE(review): close_all_p has already run on the normal exit paths,
            # so it is presumably safe to call twice on the same handles - confirm.
            close_all_p(push_p, or_video_file, ai_video_file, requestId)
            if pullProcess is not None and pullProcess.is_alive():
                pullProcess.sendCommand({"command": "stop_image_hb"})
                self.waitPullStream(pullProcess)
            if feedback:
                putQueue(fb_queue, feedback, requestId)
            # Delete the local video files.
            if self.__orFilePath is not None and exists(self.__orFilePath):
                logger.info("开始删除原视频, orFilePath: {}, requestId: {}", self.__orFilePath, requestId)
                os.remove(self.__orFilePath)
                logger.info("删除原视频成功, orFilePath: {}, requestId: {}", self.__orFilePath, requestId)
            if self.__aiFilePath is not None and exists(self.__aiFilePath):
                logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", self.__aiFilePath, requestId)
                os.remove(self.__aiFilePath)
                logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", self.__aiFilePath, requestId)
-
-
class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
    """Recognition process for recorded (offline) videos.

    It pulls frames through an OfflinePullVideoStreamProcess, analyses them,
    writes an AI-annotated local video, pushes the merged frames to the push
    URL, reports progress on the heartbeat queue and the outcome on the
    feedback queue.
    """
    __slots__ = "__aiFilePath"

    def __init__(self, cfg):
        """Initialise the base process and build the local AI video path."""
        super(OfflineIntelligentRecognitionProcess, self).__init__(cfg)
        # Name under which the AI video is saved locally.
        random_time = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF)
        # NOTE(review): the "_on_ai_" infix matches the online class; confirm
        # whether offline files were meant to use a different infix.
        self.__aiFilePath = "%s%s%s%s%s" % (
            join(self._base_dir, self._context["video"]["file_path"]), random_time, "_on_ai_",
            self._msg.get("request_id"), ".mp4")

    def stop_task(self, pullProcess, analysisStatus):
        """Stop pulling, obtain the AI video play URL and send final feedback.

        :param pullProcess: the running pull-stream process.
        :param analysisStatus: final analysis status value to report.
        :raises ServiceException: when the local AI video is missing or its
            play URL could not be obtained.
        """
        pullProcess.sendCommand({"command": "stop_pull_stream"})
        if not os.path.exists(self.__aiFilePath):
            logger.error("AI视频不存在!requestId:{}", self._msg.get("request_id"))
            raise ServiceException(ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[0],
                                   ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[1])
        aliyunVodSdk = ThAliyunVodSdk(self._base_dir, self._msg.get("request_id"), self._context["dsp"]["active"])
        # get_play_url runs in a Common thread; get_result() collects its result.
        upload_video_thread_ai = Common(self._context, aliyunVodSdk.get_play_url, self.__aiFilePath,
                                        "ai_offLine_%s" % self._msg.get("request_id"))
        upload_video_thread_ai.setDaemon(True)
        upload_video_thread_ai.start()
        ai_play_url = upload_video_thread_ai.get_result()
        if ai_play_url is None:
            logger.error("原视频或AI视频播放上传VOD失败!, requestId: {}", self._msg.get("request_id"))
            raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
                                   ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
        # Stop the image/heartbeat side of the pull process and wait for it to exit.
        pullProcess.sendCommand({"command": "stop_image_hb"})
        self.waitPullStream(pullProcess)
        putQueue(self._fbQueue, {"feedback": message_feedback(self._msg.get("request_id"), analysisStatus,
                                                              AnalysisType.OFFLINE.value,
                                                              progress=success_progess,
                                                              sign_url=ai_play_url,
                                                              analyse_time=TimeUtils.now_date_to_str())},
                 self._msg.get("request_id"))

    def start_pull_stream(self):
        """Create, start and return the daemon pull-stream process for this task."""
        pullProcess = OfflinePullVideoStreamProcess(self._msg, self._context, self.pullQueue, self._fbQueue,
                                                    self._hbQueue, self._imageQueue, self._analyse_type, self._base_dir)
        pullProcess.daemon = True
        pullProcess.start()
        return pullProcess

    def run(self):
        """Process entry point: load models, then analyse frames until stopped.

        Failures are converted into a FAILED feedback message. The ``finally``
        clause always closes the push/video handles, stops the pull process
        and deletes the local AI video.
        """
        requestId = self._msg.get("request_id")
        push_url = self._msg.get("push_url")
        eventQueue = self.eventQueue
        pull_queue = self.pullQueue
        image_queue = self._imageQueue
        hb_queue = self._hbQueue
        fb_queue = self._fbQueue
        pullProcess = None
        feedback = None
        # Handles returned by push_video_stream / write_ai_video; they are
        # passed back in on the next call and closed by close_all_p.
        push_p = None
        aiFilePath = self.__aiFilePath
        ai_video_file = None
        try:
            init_log(self._base_dir)
            enable_add_water = self._enable_add_water
            logo = self._logo
            # [first-frame flag, problem-found flag]; both start at 1.
            task_status = [1, 1]
            frame_score = float(self._context["service"]["frame_score"])
            # Load the models.
            model_array = self.get_model()
            pullProcess = self.start_pull_stream()
            # Future of the buildFrame call submitted in the previous iteration.
            task_frame = None
            # State list handed to push_video_stream on every call.
            p_push_array = [0, 0]
            # t runs process() per frame, tt runs buildFrame and the video
            # writer, ttt is handed to process() for the per-model analysis.
            with ThreadPoolExecutor(max_workers=5) as t:
                with ThreadPoolExecutor(max_workers=2) as tt:
                    with ThreadPoolExecutor(max_workers=15) as ttt:
                        while True:
                            if pullProcess is not None and not pullProcess.is_alive():
                                logger.info("拉流进程停止异常, requestId: {}", requestId)
                                raise Exception("拉流进程异常停止")
                            # Check for a pending control event.
                            event_result = getNoBlockQueue(eventQueue)
                            if event_result:
                                cmdStr = event_result.get("command")
                                # "stop_ex": abort and report a NO_RESOURCES failure.
                                if "stop_ex" == cmdStr:
                                    logger.info("离线任务开始停止, requestId: {}", requestId)
                                    pullProcess.sendCommand({"command": 'stop_image_hb'})
                                    feedback = {"feedback": message_feedback(requestId, AnalysisStatus.FAILED.value,
                                                                             self._analyse_type,
                                                                             ExceptionType.NO_RESOURCES.value[0],
                                                                             ExceptionType.NO_RESOURCES.value[1],
                                                                             analyse_time=TimeUtils.now_date_to_str())}
                                    self.waitPullStream(pullProcess)
                                    break
                                # "stop": ask the pull process to stop; the loop keeps
                                # running until a final status message arrives.
                                if "stop" == cmdStr:
                                    logger.info("离线任务开始停止分析, requestId: {}", requestId)
                                    pullProcess.sendCommand({"command": 'stop_pull_stream'})
                            frames = []
                            status = None
                            # Collect the batch built during the previous iteration and
                            # immediately start building the next one.
                            if task_frame is not None:
                                frames, status = task_frame.result()
                            task_frame = tt.submit(buildFrame, pull_queue, logo, enable_add_water, frame_score,
                                                   model_array, task_status, ttt, requestId)
                            if len(frames) == 0 and status is None:
                                time.sleep(0.5)
                                continue
                            if len(frames) > 0:
                                # Becomes True when any frame of the batch has detections.
                                check = False
                                for result in t.map(process, frames):
                                    if result is not None:
                                        frame_result, frame_merge, det_xywh = result
                                        # frame_result is presumably
                                        # ("4", frame, concurrent_frame, width, height, all_frames).
                                        write_ai_video_result = tt.submit(write_ai_video, frame_merge, aiFilePath,
                                                                          ai_video_file, frame_result[3],
                                                                          frame_result[4],
                                                                          requestId)
                                        # The push runs in this thread while the writer runs on tt.
                                        push_p = push_video_stream(frame_merge, push_p, push_url,
                                                                   frame_result[3], frame_result[4], p_push_array,
                                                                   requestId)
                                        ai_video_file = write_ai_video_result.result()
                                        if len(det_xywh) > 0:
                                            check = True
                                            putQueue(image_queue, {"image": (frame_result, det_xywh)}, requestId)
                                        # Every 400th frame: report progress as the ratio
                                        # frame_result[2] / frame_result[5] with 4 decimals.
                                        # NOTE(review): divides by zero if both values are 0 -
                                        # confirm the pull process never sends that.
                                        if frame_result[2] % 400 == 0 and frame_result[2] <= frame_result[5]:
                                            task_process = str(format(float(frame_result[2]) / float(frame_result[5]),
                                                                      '.4f'))
                                            putQueue(hb_queue, {"hb_value": task_process}, requestId)
                                if check:
                                    task_status[1] = 1
                                if not check:
                                    task_status[1] = 0
                            if status is None:
                                continue
                            # Status "1": error reported with (code, message) in status[1:3].
                            if status[0] == "1":
                                raise ServiceException(status[1], status[2])
                            # Status "2": finish the task with the SUCCESS status.
                            elif status[0] == "2":
                                close_all_p(push_p, None, ai_video_file, requestId)
                                self.stop_task(pullProcess, AnalysisStatus.SUCCESS.value)
                                break
                            # Status "3": finish the task with the TIMEOUT status.
                            elif status[0] == "3":
                                close_all_p(push_p, None, ai_video_file, requestId)
                                self.stop_task(pullProcess, AnalysisStatus.TIMEOUT.value)
                                break
                            # Status "9": finish the task with the SUCCESS status.
                            elif status[0] == "9":
                                close_all_p(push_p, None, ai_video_file, requestId)
                                logger.info("离线任务正常结束:requestId: {}", requestId)
                                self.stop_task(pullProcess, AnalysisStatus.SUCCESS.value)
                                break
                            # NOTE(review): any other status value is ignored and the
                            # loop simply continues - confirm this is intended.
            logger.info("离线进程任务完成,requestId:{}", self._msg.get("request_id"))
        except ServiceException as s:
            logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId:{}", s.code, s.msg, self._msg.get("request_id"))
            feedback = {"feedback": message_feedback(requestId, AnalysisStatus.FAILED.value,
                                                     AnalysisType.OFFLINE.value,
                                                     s.code,
                                                     s.msg,
                                                     analyse_time=TimeUtils.now_date_to_str())}
        except Exception as e:
            logger.error("服务异常: {}, requestId:{}", format_exc(), requestId)
            feedback = {"feedback": message_feedback(requestId, AnalysisStatus.FAILED.value,
                                                     AnalysisType.OFFLINE.value,
                                                     ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                     ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
                                                     analyse_time=TimeUtils.now_date_to_str())}
        finally:
            # NOTE(review): close_all_p has already run on the normal exit paths,
            # so it is presumably safe to call twice on the same handles - confirm.
            close_all_p(push_p, None, ai_video_file, requestId)
            if pullProcess is not None and pullProcess.is_alive():
                pullProcess.sendCommand({"command": "stop_image_hb"})
                self.waitPullStream(pullProcess)
            if feedback:
                putQueue(fb_queue, feedback, requestId)
            # Delete the local video file.
            if self.__aiFilePath is not None and exists(self.__aiFilePath):
                logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", self.__aiFilePath, requestId)
                os.remove(self.__aiFilePath)
                logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", self.__aiFilePath, requestId)
-
-
- '''
- 图片识别
- '''
-
-
- class PhotosIntelligentRecognitionProcess(IntelligentRecognitionProcess):
- __slots__ = ()
-
- def epidemic_prevention(self, imageUrl, mod, orc):
- try:
- model_conf, model_type_code = mod
- image = url2Array(imageUrl)
- model_param = model_conf[2]
- model_param[0] = image
- dataBack = MODEL_CONFIG[model_type_code][3](model_param)
- # [frame, device, model, par, img_type, requestId]
- img_type = model_param[4]
- allowedList = model_conf[1]
- if img_type == 'plate':
- carCode = ''
- if dataBack is None or dataBack.get("plateImage") is None or len(dataBack.get("plateImage")) == 0:
- result = orc.license_plate_recognition(image, self._msg.get("request_id"))
- score = ''
- if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0:
- logger.error("车牌识别为空: {}", result)
- carCode = ''
- else:
- for word in result.get("words_result"):
- if word is not None and word.get("number") is not None:
- if len(carCode) == 0:
- carCode = word.get("number")
- else:
- carCode = carCode + "," + word.get("number")
- else:
- result = orc.license_plate_recognition(dataBack.get("plateImage")[0], self._msg.get("request_id"))
- score = dataBack.get("plateImage")[1]
- if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0:
- result = orc.license_plate_recognition(image, self._msg.get("request_id"))
- if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0:
- logger.error("车牌识别为空: {}", result)
- carCode = ''
- else:
- for word in result.get("words_result"):
- if word is not None and word.get("number") is not None:
- if len(carCode) == 0:
- carCode = word.get("number")
- else:
- carCode = carCode + "," + word.get("number")
- else:
- for word in result.get("words_result"):
- if word is not None and word.get("number") is not None:
- if len(carCode) == 0:
- carCode = word.get("number")
- else:
- carCode = carCode + "," + word.get("number")
- if len(carCode) > 0:
- plate_result = {'type': str(3), 'modelCode': model_type_code, 'carUrl': imageUrl,
- 'carCode': carCode,
- 'score': score}
- putQueue(self._fbQueue, {"feedback": message_feedback(self._msg.get("request_id"),
- AnalysisStatus.RUNNING.value,
- AnalysisType.IMAGE.value, "", "",
- '',
- imageUrl,
- imageUrl,
- str(model_type_code),
- str(3),
- TimeUtils.now_date_to_str(),
- json.dumps(plate_result))},
- self._msg.get("request_id"))
- if img_type == 'code':
- if dataBack is None or dataBack.get("type") is None:
- return
- # 行程码
- if dataBack.get("type") == 1 and 1 in allowedList:
- # 手机号
- if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0:
- phoneNumberRecognition = ''
- phone_score = ''
- else:
- phone = orc.universal_text_recognition(dataBack.get("phoneNumberImage")[0],
- self._msg.get("request_id"))
- phone_score = dataBack.get("phoneNumberImage")[1]
- if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0:
- logger.error("手机号识别为空: {}", phone)
- phoneNumberRecognition = ''
- else:
- phoneNumberRecognition = phone.get("words_result")
- if dataBack.get("cityImage") is None or len(dataBack.get("cityImage")) == 0:
- cityRecognition = ''
- city_score = ''
- else:
- city = orc.universal_text_recognition(dataBack.get("cityImage")[0], self._msg.get("request_id"))
- city_score = dataBack.get("cityImage")[1]
- if city is None or city.get("words_result") is None or len(city.get("words_result")) == 0:
- logger.error("城市识别为空: {}", city)
- cityRecognition = ''
- else:
- cityRecognition = city.get("words_result")
- if len(phoneNumberRecognition) > 0 or len(cityRecognition) > 0:
- trip_result = {'type': str(1),
- 'modelCode': model_type_code,
- 'imageUrl': imageUrl,
- 'phoneNumberRecognition': phoneNumberRecognition,
- 'phone_sorce': phone_score,
- 'cityRecognition': cityRecognition,
- 'city_score': city_score}
- putQueue(self._fbQueue, {"feedback": message_feedback(self._msg.get("request_id"),
- AnalysisStatus.RUNNING.value,
- AnalysisType.IMAGE.value, "", "",
- '',
- imageUrl,
- imageUrl,
- str(model_type_code),
- str(1),
- TimeUtils.now_date_to_str(),
- json.dumps(trip_result))},
- self._msg.get("request_id"))
- if dataBack.get("type") == 2 and 2 in allowedList:
- if dataBack.get("nameImage") is None or len(dataBack.get("nameImage")) == 0:
- nameRecognition = ''
- name_score = ''
- else:
- name = orc.universal_text_recognition(dataBack.get("nameImage")[0], self._msg.get("request_id"))
- name_score = dataBack.get("nameImage")[1]
- if name is None or name.get("words_result") is None or len(name.get("words_result")) == 0:
- logger.error("名字识别为空: {}", name)
- nameRecognition = ''
- else:
- nameRecognition = name.get("words_result")
-
- if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0:
- phoneNumberRecognition = ''
- phone_score = ''
- else:
- phone = orc.universal_text_recognition(dataBack.get("phoneNumberImage")[0],
- self._msg.get("request_id"))
- phone_score = dataBack.get("phoneNumberImage")[1]
- if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0:
- logger.error("手机号识别为空: {}", phone)
- phoneNumberRecognition = ''
- else:
- phoneNumberRecognition = phone.get("words_result")
- if dataBack.get("hsImage") is None or len(dataBack.get("hsImage")) == 0:
- hsRecognition = ''
- hs_score = ''
- else:
- hs = orc.universal_text_recognition(dataBack.get("hsImage")[0], self._msg.get("request_id"))
- hs_score = dataBack.get("hsImage")[1]
- if hs is None or hs.get("words_result") is None or len(hs.get("words_result")) == 0:
- logger.error("核酸识别为空: {}", hs)
- hsRecognition = ''
- else:
- hsRecognition = hs.get("words_result")
- if len(nameRecognition) > 0 or len(phoneNumberRecognition) > 0 or len(hsRecognition) > 0:
- healthy_result = {'type': str(2),
- 'modelCode': model_type_code,
- 'imageUrl': imageUrl,
- 'color': dataBack.get("color"),
- 'nameRecognition': nameRecognition,
- 'name_score': name_score,
- 'phoneNumberRecognition': phoneNumberRecognition,
- 'phone_score': phone_score,
- 'hsRecognition': hsRecognition,
- 'hs_score': hs_score}
- putQueue(self._fbQueue, {"feedback": message_feedback(self._msg.get("request_id"),
- AnalysisStatus.RUNNING.value,
- AnalysisType.IMAGE.value, "", "",
- '',
- imageUrl,
- imageUrl,
- str(model_type_code),
- str(2),
- TimeUtils.now_date_to_str(),
- json.dumps(healthy_result))},
- self._msg.get("request_id"))
- except ServiceException as s:
- raise s
- except Exception as e:
- logger.error("模型分析异常: {}, requestId: {}", format_exc(), self._msg.get("request_id"))
- raise e
-
- '''
- # 防疫模型
- '''
-
def epidemicPrevention(self, imageUrls, mod, tt):
    """Fan the epidemic-prevention analysis out over every image URL.

    One ``self.epidemic_prevention`` task per URL is submitted to ``tt``;
    all tasks share a single ``OcrBaiduSdk`` built from ``self._base_dir``.

    :param imageUrls: iterable of image URLs to analyse.
    :param mod: model descriptor, forwarded unchanged to every task.
    :param tt: executor exposing ``submit``.
    :raises Exception: the exception of a finished task, re-raised as-is.
    """
    ocr_client = OcrBaiduSdk(self._base_dir)
    pending = [tt.submit(self.epidemic_prevention, url, mod, ocr_client) for url in imageUrls]
    # Wait at most 120 s; tasks still running after that are not inspected.
    finished = wait(pending, timeout=120, return_when=ALL_COMPLETED).done
    for future in finished:
        failure = future.exception()
        if failure:
            raise failure
-
def image_recognition(self, imageUrl, mod, aliyunOssSdk):
    """Analyse one image with a generic model and publish annotated pictures.

    The image is downloaded, passed to the model registered in
    ``MODEL_CONFIG`` for ``mod``'s type code, and every detection whose
    class is in the allowed list and whose score reaches the configured
    ``frame_score`` is drawn. Detections of the same class share one
    annotated copy of the image; each copy is uploaded through
    ``aliyunOssSdk`` and announced on the feedback queue.

    :param imageUrl: URL of the image to analyse.
    :param mod: ``(model_conf, model_type_code)`` pair.
    :param aliyunOssSdk: uploader exposing ``sync_upload_file``.
    :raises ServiceException: propagated unchanged.
    :raises Exception: any other failure, logged and then propagated.
    """
    try:
        min_score = float(self._context["service"]["frame_score"])
        model_conf, model_type_code = mod
        model_param = model_conf[2]
        image = url2Array(imageUrl)
        model_param[0] = image
        prepare_labels = MODEL_CONFIG[model_type_code][2]
        if prepare_labels:
            prepare_labels(int(image.shape[1]), model_param)
        # Invoke the AI model.
        p_result, _ = MODEL_CONFIG[model_type_code][3](model_param)
        if p_result is None or len(p_result) < 4:
            return
        if self._enable_add_water:
            image = add_water_pic(image, self._logo, self._msg.get("request_id"))
        detections = p_result[2]
        if detections is None or len(detections) == 0:
            return
        allowedList = model_conf[1]
        label_arraylist = model_param[4]
        rainbows = model_param[5]
        if allowedList is None:
            return

        # One annotated frame per detected target code.
        frames_by_target = {}
        for detection in detections:
            target_code = int(detection[0])
            score = detection[-1]
            # Only targets requested by the task and above the threshold are drawn.
            if target_code not in allowedList or not (score >= min_score):
                continue
            label_array = label_arraylist[target_code]
            color = rainbows[target_code]
            # Two result layouts are accepted:
            #   [float(cls_c), xc, yc, w, h, float(conf_c)]
            #   [cls, [(x0, y0), (x1, y1), (x2, y2), (x3, y3)], score]
            geometry = detection[1]
            if isinstance(geometry, (list, tuple, np.ndarray)):
                box = geometry
            else:
                left, top = int(detection[1]), int(detection[2])
                right, bottom = int(detection[3]), int(detection[4])
                box = [(left, top), (right, top), (right, bottom), (left, bottom)]
            entry = frames_by_target.get(target_code)
            if entry is None or len(entry) == 0:
                entry = {
                    "modelCode": str(model_type_code),
                    "detectTargetCode": str(target_code),
                    "frame": copy.deepcopy(image),
                }
                frames_by_target[target_code] = entry
            draw_painting_joint(box, entry.get("frame"), label_array, score, color, "leftTop")

        for entry in frames_by_target.values():
            if entry is None or len(entry) == 0:
                continue
            _, encoded = cv2.imencode(".jpg", entry.get("frame"))
            ai_image_name = self.build_image_name(entry.get("detectTargetCode"))
            aliyunOssSdk.sync_upload_file(ai_image_name, encoded.tobytes())
            feedback = message_feedback(self._msg.get("request_id"),
                                        AnalysisStatus.RUNNING.value,
                                        self._analyse_type, "", "",
                                        '',
                                        imageUrl,
                                        ai_image_name,
                                        str(model_type_code),
                                        entry.get("detectTargetCode"),
                                        TimeUtils.now_date_to_str())
            putQueue(self._fbQueue, {"feedback": feedback}, self._msg.get("request_id"))
    except ServiceException:
        raise
    except Exception:
        logger.error("模型分析异常: {}, requestId: {}", format_exc(), self._msg.get("request_id"))
        raise
-
def publicIdentification(self, imageUrls, mod, aliyunOssSdk, tt):
    """Run ``self.image_recognition`` for every image URL on ``tt``.

    :param imageUrls: iterable of image URLs to analyse.
    :param mod: model descriptor, forwarded unchanged to every task.
    :param aliyunOssSdk: uploader forwarded unchanged to every task.
    :param tt: executor exposing ``submit``.
    :raises Exception: the exception of a finished task, re-raised as-is.
    """
    pending = [tt.submit(self.image_recognition, url, mod, aliyunOssSdk) for url in imageUrls]
    # Wait at most 120 s; tasks still running after that are not inspected.
    finished = wait(pending, timeout=120, return_when=ALL_COMPLETED).done
    for future in finished:
        failure = future.exception()
        if failure:
            raise failure
-
- '''
- 1. imageUrls: 图片url数组,多张图片
- 2. mod: 模型对象
- 3. model_type_code: 模型编号
- 4. t: 线程池对象
- 5. aliyunOssSdk: 阿里云sdk
- '''
-
def baiduRecognition(self, imageUrls, mod, tt, ttt, aliyunOssSdk):
    """Run ``self.baidu_recognition`` for every image URL on ``tt``.

    :param imageUrls: iterable of image URLs to analyse.
    :param mod: model descriptor, forwarded unchanged to every task.
    :param tt: executor the per-image tasks are submitted to.
    :param ttt: second executor, handed to every task for its own fan-out.
    :param aliyunOssSdk: uploader forwarded unchanged to every task.
    :raises Exception: the exception of a finished task, re-raised as-is.
    """
    pending = [tt.submit(self.baidu_recognition, url, mod, aliyunOssSdk, ttt) for url in imageUrls]
    # Wait at most 120 s; tasks still running after that are not inspected.
    finished = wait(pending, timeout=120, return_when=ALL_COMPLETED).done
    for future in finished:
        failure = future.exception()
        if failure:
            raise failure
-
def baidu_recognition(self, imageUrl, mod, aliyunOssSdk, ttt):
    """Analyse one image with every requested Baidu detection target.

    Label images for the vehicle classes and for the person class are
    rendered once, then one ``self.baidu_method`` task per allowed target is
    submitted to ``ttt``.

    :param imageUrl: URL of the image to analyse.
    :param mod: ``(model_conf, model_type_code)`` pair.
    :param aliyunOssSdk: uploader forwarded to every task.
    :param ttt: executor the per-target tasks are submitted to.
    :raises ServiceException: when no detection target is configured, or
        propagated unchanged from a task.
    :raises Exception: any other failure, logged and then propagated.
    """
    try:
        # model_conf layout: (modeType, allowedList, model_param, rainbows)
        model_conf, model_type_code = mod
        allowedList, rainbows = model_conf[1], model_conf[3]
        # A task without detection targets is rejected outright.
        if allowedList is None or len(allowedList) == 0:
            raise ServiceException(ExceptionType.THE_DETECTION_TARGET_CANNOT_BE_EMPTY.value[0],
                                   ExceptionType.THE_DETECTION_TARGET_CANNOT_BE_EMPTY.value[1])
        # Download the picture as an array.
        img = url2Array(imageUrl)
        # Font size scales with the image width (40 at a width of 1920).
        font_size = int(img.shape[1] / 1920 * 40)
        vehicle_kinds = (VehicleEnum.CAR, VehicleEnum.TRICYCLE, VehicleEnum.MOTORBIKE,
                         VehicleEnum.CARPLATE, VehicleEnum.TRUCK, VehicleEnum.BUS)
        vehicle_names = [kind.value[1] for kind in vehicle_kinds]
        font_path = join(self._base_dir, FONTPATH)
        label_array = get_label_arrays(vehicle_names, rainbows, fontSize=font_size,
                                       fontPath=font_path)
        person_label = get_label_arrays(['人'], rainbows, fontSize=font_size,
                                        fontPath=font_path)
        pending = [
            ttt.submit(self.baidu_method, mod, target, imageUrl, img, aliyunOssSdk, label_array,
                       person_label)
            for target in allowedList
        ]
        # Wait at most 120 s; tasks still running after that are not inspected.
        finished = wait(pending, timeout=120, return_when=ALL_COMPLETED).done
        for future in finished:
            failure = future.exception()
            if failure:
                raise failure
    except ServiceException:
        raise
    except Exception:
        logger.error("百度AI分析异常: {}, requestId: {}", format_exc(), self._msg.get("request_id"))
        raise
-
def build_image_name(self, target):
    """Build the storage name of an annotated ("AI") picture.

    The name combines the task's ``results_base_dir``, a second-resolution
    timestamp, fixed frame indices of ``0``, a ``TimeUtils.YMDHMSF``
    timestamp, the image analysis type, ``target`` and the request id.

    :param target: detection target code embedded in the name.
    :return: the image name, ending in ``_AI.jpg``.
    """
    stamp_token = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF)
    readable_time = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S")
    results_dir = self._msg.get("results_base_dir")
    mode_type = AnalysisType.IMAGE.value
    request_id = self._msg.get("request_id")
    # Still pictures have no frame range, so both frame indices are 0.
    return (f"{results_dir}/{readable_time}_frame-0-0_type_{stamp_token}-{mode_type}"
            f"-{target}-{request_id}_AI.jpg")
-
@staticmethod
def _baidu_location_to_box(location):
    """Convert a ``left/top/width/height`` mapping into four corner points.

    :return: ``[left_top, right_top, right_bottom, left_bottom]``.
    """
    left = int(location.get("left"))
    top = int(location.get("top"))
    right = left + int(location.get("width"))
    bottom = top + int(location.get("height"))
    return [(left, top), (right, top), (right, bottom), (left, bottom)]

def _publish_baidu_result(self, image_bytes, imageUrl, model_type_code, target, result):
    """Upload an encoded picture and push ``result`` to the feedback queue.

    ``result`` is tagged in place with ``type`` and ``modelCode`` before it
    is serialised with ``json.dumps``.
    """
    request_id = self._msg.get("request_id")
    ai_image_name = self.build_image_name(target)
    aliyunOssSdk_name = ai_image_name  # same name is used for upload and feedback
    self_uploader = image_bytes[0]
    self_uploader.sync_upload_file(aliyunOssSdk_name, image_bytes[1])
    result["type"] = str(target)
    result["modelCode"] = model_type_code
    putQueue(self._fbQueue, {"feedback": message_feedback(request_id,
                                                          AnalysisStatus.RUNNING.value,
                                                          AnalysisType.IMAGE.value, "", "",
                                                          '',
                                                          imageUrl,
                                                          ai_image_name,
                                                          str(model_type_code),
                                                          str(target),
                                                          TimeUtils.now_date_to_str(),
                                                          json.dumps(result))},
             request_id)

def baidu_method(self, mod, target, imageUrl, img, aliyunOssSdk, label_arrays, person_label):
    """Call one Baidu AI capability for one image and report what it found.

    ``target`` selects how the response is handled:

    * vehicle detection - every entry of ``vehicle_info`` is boxed and labelled;
    * human detection - every entry of ``person_info`` is boxed and labelled;
    * people counting - the base64 picture under ``image`` is uploaded as-is.

    In each case the picture is uploaded through ``aliyunOssSdk`` and the
    response is pushed to the feedback queue. Nothing is reported when the
    model returns ``None`` or an empty detection list.

    :param mod: ``(model_conf, model_type_code)``; ``model_conf`` is
        ``(modeType, allowedList, model_param, rainbows)``.
    :param target: Baidu detection target code.
    :param imageUrl: URL of the analysed image.
    :param img: decoded image array; it is copied before drawing.
    :param aliyunOssSdk: uploader exposing ``sync_upload_file``.
    :param label_arrays: label images indexed by vehicle target number.
    :param person_label: label images for the person class (index 0 is used).
    """
    request_id = self._msg.get("request_id")
    model_conf, model_type_code = mod
    # model_param is shared by all tasks running this method concurrently
    # (one task per target and per image). Writing the target and URL into
    # the shared object lets one task overwrite another's request, so each
    # call fills in a private shallow copy instead.
    # Layout: [target, url, aipImageClassifyClient, aipBodyAnalysisClient, requestId]
    model_param = copy.copy(model_conf[2])
    model_param[0] = target
    model_param[1] = imageUrl
    rainbows = model_conf[3]
    result = MODEL_CONFIG[model_type_code][3](model_param)
    if result is None:
        return

    # Vehicle detection
    if target == BaiduModelTarget.VEHICLE_DETECTION.value[1]:
        vehicle_infos = result.get("vehicle_info")
        if vehicle_infos is not None and len(vehicle_infos) > 0:
            annotated = img.copy()
            # Watermark once, before drawing, instead of once per detection.
            if self._enable_add_water:
                annotated = add_water_pic(annotated, self._logo, request_id)
            for index, info in enumerate(vehicle_infos):
                vehicle_kind = VehicleEnumVALUE.get(info.get("type"))
                # Check for an unknown type BEFORE dereferencing the enum member.
                if vehicle_kind is None:
                    logger.error("车辆识别出现未支持的目标类型!type:{}, requestId:{}", info.get("type"),
                                 request_id)
                    return
                target_num = vehicle_kind.value[2]
                box = self._baidu_location_to_box(info.get("location"))
                score = "%.2f" % info.get("probability")
                draw_painting_joint(box, annotated, label_arrays[target_num], float(score),
                                    rainbows[target_num], "leftTop")
                info["id"] = str(index)
            if annotated is not None and len(annotated) > 0:
                _, encoded = cv2.imencode(".jpg", annotated)
                self._publish_baidu_result((aliyunOssSdk, encoded.tobytes()), imageUrl,
                                           model_type_code, target, result)

    # Human detection
    if target == BaiduModelTarget.HUMAN_DETECTION.value[1]:
        person_infos = result.get("person_info")
        person_num = result.get("person_num")
        if person_num is not None and person_num > 0 \
                and person_infos is not None and len(person_infos) > 0:
            annotated = img.copy()
            # Watermark once, before drawing, instead of once per detection.
            if self._enable_add_water:
                annotated = add_water_pic(annotated, self._logo, request_id)
            for index, info in enumerate(person_infos):
                box = self._baidu_location_to_box(info.get("location"))
                score = "%.2f" % info.get("location").get("score")
                draw_painting_joint(box, annotated, person_label[0], float(score), rainbows[0],
                                    "leftTop")
                info["id"] = str(index)
            if annotated is not None and len(annotated) > 0:
                _, encoded = cv2.imencode(".jpg", annotated)
                self._publish_baidu_result((aliyunOssSdk, encoded.tobytes()), imageUrl,
                                           model_type_code, target, result)

    # People counting
    if target == BaiduModelTarget.PEOPLE_COUNTING.value[1]:
        base64_image = result.get("image")
        if base64_image is not None and len(base64_image) > 0:
            picture = base64.b64decode(base64_image)
            # The picture is uploaded separately; keep it out of the JSON feedback.
            del result["image"]
            self._publish_baidu_result((aliyunOssSdk, picture), imageUrl,
                                       model_type_code, target, result)
-
def run(self):
    """Process entry point: analyse the task's images with every model.

    Three thread pools are opened: ``t`` runs one job per model, ``tt`` runs
    the per-image jobs and ``ttt`` the per-target jobs of the Baidu model.
    When no finished job reports an exception a SUCCESS feedback is queued;
    otherwise a FAILED feedback carrying the error code and message is
    queued. Exceptions are reported through the queue, not raised.
    """
    with ThreadPoolExecutor(max_workers=5) as t, \
            ThreadPoolExecutor(max_workers=10) as tt, \
            ThreadPoolExecutor(max_workers=5) as ttt:
        try:
            requestId = self._msg.get("request_id")
            # Initialise logging for this process.
            init_log(self._base_dir)
            model_array = self.get_model()
            imageUrls = self._msg.get("image_urls")
            aliyunOssSdk = AliyunOssSdk(self._base_dir, requestId)
            aliyunOssSdk.get_oss_bucket()
            jobs = []
            for model in model_array:
                model_code = model[1]
                if model_code == ModelType.BAIDU_MODEL.value[1]:
                    # Baidu model
                    job = t.submit(self.baiduRecognition, imageUrls, model, tt, ttt, aliyunOssSdk)
                elif model_code == ModelType.EPIDEMIC_PREVENTION_MODEL.value[1] \
                        or model_code == ModelType.PLATE_MODEL.value[1]:
                    # Epidemic-prevention model and plate model share one handler.
                    job = t.submit(self.epidemicPrevention, imageUrls, model, tt)
                else:
                    job = t.submit(self.publicIdentification, imageUrls, model, aliyunOssSdk, tt)
                jobs.append(job)
            if jobs:
                # Wait at most 120 s; jobs still running after that are not inspected.
                finished = wait(jobs, timeout=120, return_when=ALL_COMPLETED).done
                for future in finished:
                    failure = future.exception()
                    if failure:
                        raise failure
            logger.info("图片进程任务完成,requestId:{}", self._msg.get("request_id"))
            success = message_feedback(self._msg.get("request_id"),
                                       AnalysisStatus.SUCCESS.value,
                                       AnalysisType.IMAGE.value,
                                       progress=success_progess,
                                       analyse_time=TimeUtils.now_date_to_str())
            putQueue(self._fbQueue, {"feedback": success}, self._msg.get("request_id"))
        except ServiceException as s:
            logger.error("图片分析异常,异常编号:{}, 异常描述:{}, requestId:{}", s.code, s.msg,
                         self._msg.get("request_id"))
            failed = message_feedback(self._msg.get("request_id"), AnalysisStatus.FAILED.value,
                                      self._analyse_type,
                                      s.code,
                                      s.msg,
                                      analyse_time=TimeUtils.now_date_to_str())
            putQueue(self._fbQueue, {"feedback": failed}, self._msg.get("request_id"))
        except Exception:
            logger.error("图片分析异常: {}, requestId:{}", format_exc(),
                         self._msg.get("request_id"))
            failed = message_feedback(self._msg.get("request_id"), AnalysisStatus.FAILED.value,
                                      self._analyse_type,
                                      ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                      ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
                                      analyse_time=TimeUtils.now_date_to_str())
            putQueue(self._fbQueue, {"feedback": failed}, self._msg.get("request_id"))
-
-
- '''
- 录屏进程
- '''
-
-
class ScreenRecordingProcess(Process):
    """Process that records a pulled video stream to a local file.

    A pull thread feeds frames and status messages into ``pullQueue``; the
    main loop writes the frames with ``Cv2Util``. When the pull thread
    reports the end of the stream the file is uploaded and its play URL is
    sent on the feedback queue. The local file is removed when ``run`` ends.
    """

    # Names match the attributes assigned in __init__ (eventQueue / pullQueue
    # carry no leading underscore).
    __slots__ = [
        '_fbQueue',
        'eventQueue',
        '_hbQueue',
        'pullQueue',
        '_context',
        '_msg',
        '_pic',
        '_service_timeout',
        '_orFilePath',
        '_analysisType',
        '_base_dir',
        '_gpu_name'
    ]

    def __init__(self, param):
        """Store the task parameters and announce the "waiting" state.

        :param param: object exposing ``fbqueue``, ``context``, ``msg``,
            ``analyse_type``, ``base_dir`` and ``gpu_name``.
        """
        super().__init__()
        self._fbQueue = param.fbqueue
        self.eventQueue = Queue()
        self._hbQueue = Queue()
        # Bounded to 200 entries.
        self.pullQueue = Queue(200)
        self._context = param.context
        self._msg = param.msg
        self._analysisType = param.analyse_type
        self._base_dir = param.base_dir
        self._gpu_name = param.gpu_name
        self._pic = PictureWaterMark(requestId=self._msg.get("request_id"))
        self._service_timeout = self._context["service"]["timeout"]
        random_time = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF)
        # <configured video dir><timestamp>_or_<request id>.mp4
        self._orFilePath = "%s%s%s%s%s" % (self._context["video"]["file_path"], random_time, "_or_",
                                           self._msg.get("request_id"), ".mp4")
        self.sendhbMessage(RecordingStatus.RECORDING_WAITING.value[0])

    def clearPullQueue(self):
        """Drain ``pullQueue`` until it reports a size of zero."""
        while self.pullQueue.qsize() > 0:
            self.getPullQueue()

    def sendEvent(self, eBody):
        """Queue a command (for example ``{"command": "stop"}``) for the main loop."""
        self.eventQueue.put(eBody)

    def getEvent(self):
        """Return the next pending event, or ``None`` when none can be read."""
        try:
            return self.eventQueue.get(block=False)
        except Exception:
            # Best effort: an empty queue, or any read error, means "no event".
            return None

    def getPullQueue(self):
        """Return the next item of ``pullQueue``, or ``None`` when none can be read."""
        try:
            return self.pullQueue.get(block=False)
        except Exception:
            # Best effort: an empty queue, or any read error, means "no item".
            return None

    def sendResult(self, result):
        """Push an execution result onto the feedback queue."""
        self._fbQueue.put(result)

    def sendhbMessage(self, status, error_code="", error_msg="", recording_video_url=""):
        """Send a recording feedback message with the given status and details."""
        self.sendResult({"recording": recording_feedback(self._msg.get("request_id"),
                                                         status,
                                                         error_code,
                                                         error_msg,
                                                         recording_video_url)})

    def start_pull_stream_thread(self):
        """Start and return the daemon thread that fills ``pullQueue``."""
        pullThread = RecordingPullStreamThread(self._msg, self._context, self.pullQueue, self._fbQueue)
        # setDaemon() is deprecated; the daemon attribute is the supported spelling.
        pullThread.daemon = True
        pullThread.start()
        return pullThread

    def stop_task(self, hb, status):
        """Upload the recorded file, stop the heartbeat and report ``status``.

        :param hb: heartbeat thread to stop.
        :param status: recording status sent together with the play URL.
        :raises ServiceException: when the recorded file is missing or no
            play URL could be obtained.
        """
        request_id = self._msg.get("request_id")
        if not os.path.exists(self._orFilePath):
            logger.error("原视频不存在!requestId:{}", request_id)
            raise ServiceException(ExceptionType.OR_VIDEO_DO_NOT_EXEIST_EXCEPTION.value[0],
                                   ExceptionType.OR_VIDEO_DO_NOT_EXEIST_EXCEPTION.value[1])
        aliyunVodSdk = ThAliyunVodSdk(self._base_dir, request_id, self._context["dsp"]["active"])
        upload_video_thread_or = Common(self._context, aliyunVodSdk.get_play_url, self._orFilePath,
                                        "or_recording_%s" % request_id)
        upload_video_thread_or.daemon = True
        upload_video_thread_or.start()
        or_play_url = upload_video_thread_or.get_result()
        if or_play_url is None:
            logger.error("原视频上传VOD失败!原视频播放地址:{}, requestId: {}", or_play_url, request_id)
            raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
                                   ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
        hb.sendHbQueue({"command": "stop"})
        hb.join(20)
        self.sendhbMessage(status, "", "", or_play_url)

    def recordingFrame(self, cv2tool):
        """Take the currently queued items off ``pullQueue``.

        Items whose ``status`` is ``'4'`` are frames: the writer is configured
        with their fps/width/height and they are collected. Any other item is
        kept as the latest status message.

        :param cv2tool: writer exposing ``getFrameConfig(fps, width, height)``.
        :return: ``(frames, status)`` - list of frame messages and the last
            non-frame message (``None`` when there was none).
        """
        frames = []
        status = None
        # Only the items present at call time are processed.
        for _ in range(self.pullQueue.qsize()):
            frame_result = self.getPullQueue()
            if frame_result is None:
                continue
            if frame_result.get("status") == '4':
                cv2tool.getFrameConfig(int(frame_result.get("fps")), int(frame_result.get("width")),
                                       int(frame_result.get("height")))
                frames.append(frame_result)
            else:
                status = frame_result
        return frames, status

    def start_hb_thread(self):
        """Start and return the daemon heartbeat thread."""
        hb = RecordingHeartbeat(self._fbQueue, self._hbQueue, self._msg.get("request_id"))
        hb.daemon = True
        hb.start()
        return hb

    def run(self):
        """Process entry point: record frames until the stream ends or fails.

        Failures are reported as a RECORDING_FAILED feedback instead of being
        raised. Resources are released and the local file is deleted in all
        cases.
        """
        cv2tool = None
        recording = None
        pullThread = None
        status = True
        hb = None
        try:
            init_log(self._base_dir)
            pullThread = self.start_pull_stream_thread()
            cv2tool = Cv2Util(orFilePath=self._orFilePath, requestId=self._msg.get("request_id"))
            hb = self.start_hb_thread()
            task_frame = None
            with ThreadPoolExecutor(max_workers=6) as t:
                while True:
                    # NOTE(review): `status` is reset to None on every pass below,
                    # so this liveness check only takes effect on the first
                    # iteration - confirm whether that is intended.
                    if status and not pullThread.is_alive():
                        logger.info("录屏拉流线程停止异常, requestId: {}", self._msg.get("request_id"))
                        raise Exception("录屏拉流线程异常停止")
                    if not hb.is_alive():
                        logger.info("录屏心跳线程异常停止, requestId: {}", self._msg.get("request_id"))
                        raise Exception("录屏心跳线程异常停止")
                    eBody = self.getEvent()
                    if eBody is not None and len(eBody) > 0:
                        cmdStr = eBody.get("command")
                        # Stop command received: ask the pull thread to finish.
                        if 'stop' == cmdStr:
                            logger.info("录屏任务开始停止, requestId: {}", self._msg.get("request_id"))
                            pullThread.sendCommand({"command": "stop"})
                    frames = []
                    status = None
                    # Collect the batch fetched during the previous pass and
                    # immediately schedule the next fetch.
                    if task_frame is not None:
                        frames, status = task_frame.result()
                    task_frame = t.submit(self.recordingFrame, cv2tool)
                    if len(frames) == 0 and status is None:
                        time.sleep(0.02)
                        continue
                    for frame in frames:
                        # recordingFrame returns the frame messages themselves,
                        # so the image is read straight from the message.
                        cv2tool.video_or_write(frame.get("frame"))
                    if status is None:
                        continue
                    if status.get("status") == "1":
                        raise ServiceException(status.get("error").get("code"), status.get("error").get("msg"))
                    elif status.get("status") == "2":
                        cv2tool.close()
                        self.stop_task(hb, RecordingStatus.RECORDING_SUCCESS.value[0])
                        pullThread.join(10)
                        break
                    else:
                        raise Exception("未知拉流状态异常!")
            logger.info("录屏线程任务完成,requestId:{}", self._msg.get("request_id"))
        except ServiceException as s:
            logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, self._msg.get("request_id"))
            recording = {"recording": recording_feedback(self._msg.get("request_id"),
                                                         RecordingStatus.RECORDING_FAILED.value[0],
                                                         s.code,
                                                         s.msg)}
        except Exception:
            logger.error("服务异常: {}, requestId: {},", format_exc(), self._msg.get("request_id"))
            recording = {"recording": recording_feedback(self._msg.get("request_id"),
                                                         RecordingStatus.RECORDING_FAILED.value[0],
                                                         ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                         ExceptionType.SERVICE_INNER_EXCEPTION.value[1])}
        finally:
            if cv2tool:
                cv2tool.close()
            if hb:
                hb.sendHbQueue({"command": "stop"})
                hb.join(20)
            self.clearPullQueue()
            if pullThread is not None and pullThread.is_alive():
                pullThread.sendCommand({"command": "stop"})
                pullThread.join(20)
            if recording:
                self.sendResult(recording)
            # Delete the local video file.
            if self._orFilePath is not None and os.path.exists(self._orFilePath):
                logger.info("开始删除原视频, orFilePath: {}, requestId: {}", self._orFilePath,
                            self._msg.get("request_id"))
                os.remove(self._orFilePath)
                logger.info("删除原视频成功, orFilePath: {}, requestId: {}", self._orFilePath,
                            self._msg.get("request_id"))
|