From a82efd81e27db44ae7b92ecedc83f01cf62356e2 Mon Sep 17 00:00:00 2001 From: jiangchaoqing Date: Tue, 15 Jul 2025 10:36:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=95=B4=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 8 + concurrency/FileUploadThread_20250106.py | 305 ---- .../IntelligentRecognitionProcess_20250106.py | 1444 ----------------- config/logger/dsp_prod_logger.yml | 2 +- config/logger/dsp_test_logger.yml | 2 +- readme.md | 11 - t.txt => requirements.txt | 0 service/Dispatcher-raw.py | 507 ------ 8 files changed, 10 insertions(+), 2269 deletions(-) create mode 100644 README.md delete mode 100644 concurrency/FileUploadThread_20250106.py delete mode 100644 concurrency/IntelligentRecognitionProcess_20250106.py delete mode 100644 readme.md rename t.txt => requirements.txt (100%) delete mode 100644 service/Dispatcher-raw.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..cb3cdbc --- /dev/null +++ b/README.md @@ -0,0 +1,8 @@ +1.2025.01.21把之前的tuoheng alg仓库代码重新开个仓库 (1)在config/service/dsp_test_service.yml里面添加参数,控制存储用的oss还是minio storage_source: 1 2.2025.02.06 (1)修改代码,把mqtt读取加入到系统中。config/service/dsp_test_service.yml,中添加mqtt_flag,决定是否启用。 (2)修改了minio情况下的,文件名命名方式。 3.2025.02.12 (1)增加了对alg算法开发的代码。可以通过配置文件config/service/dsp_test_service.yml中algSwitch: true,决定是否启用。 + +4、2025.07.10 周树亮 - 增加人群计数,自研车牌模型,裸土覆盖3个场景 + +5、江朝庆 -- 0715 +1)代码整理,删除冗余代码。 +2)增加requirements.txt,方便部署 +3) logs \ No newline at end of file diff --git a/concurrency/FileUploadThread_20250106.py b/concurrency/FileUploadThread_20250106.py deleted file mode 100644 index 5531f9d..0000000 --- a/concurrency/FileUploadThread_20250106.py +++ /dev/null @@ -1,305 +0,0 @@ -# -*- coding: utf-8 -*- -from concurrent.futures import ThreadPoolExecutor -from threading import Thread -from time import sleep, time -from traceback import format_exc - -from loguru import logger -import cv2 - -from entity.FeedBack import message_feedback -from enums.ExceptionEnum import ExceptionType -from exception.CustomerException import ServiceException -from util.AliyunSdk import AliyunOssSdk -from util.MinioSdk import MinioSdk -from util import TimeUtils -from enums.AnalysisStatusEnum import AnalysisStatus -from util.PlotsUtils import draw_painting_joint -from util.QueUtil import put_queue, get_no_block_queue, clear_queue -import io - -class FileUpload(Thread): - __slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg') - - def __init__(self, *args): - super().__init__() - self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type = args - self._storage_source = self._context['service']['storage_source'] - -class ImageFileUpload(FileUpload): - __slots__ = () - - @staticmethod - def handle_image(frame_msg, frame_step): - # (high_score_image["code"], all_frames, draw_config["font_config"]) - # high_score_image["code"][code][cls] = (frame, frame_index_list[i], cls_list) - det_xywh, frame, current_frame, all_frames, font_config = frame_msg - ''' - det_xywh:{ - 'code':{ - 1: [[detect_targets_code, box, score, label_array, color]] - } - } - 模型编号:modeCode - 检测目标:detectTargetCode - ''' - model_info = [] - # 更加模型编码解析数据 - for code, det_list in det_xywh.items(): - if len(det_list) > 0: - for cls, target_list in det_list.items(): - if len(target_list) > 0: - aFrame = frame.copy() - for target in target_list: - draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, target[5]) - model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame}) - if len(model_info) > 0: - image_result = { - "or_frame": frame, - "model_info": model_info, - "current_frame": current_frame, - "last_frame": current_frame + frame_step - } - return image_result - return None - - def run(self): - msg, context = self._msg, self._context - service = context["service"] - base_dir, env, request_id = context["base_dir"], context["env"], msg["request_id"] - logger.info("启动图片上传线程, requestId: {}", request_id) - image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type - service_timeout = int(service["timeout"]) - frame_step = int(service["filter"]["frame_step"]) + 120 - try: - with ThreadPoolExecutor(max_workers=2) as t: - # 初始化oss客户端 - if self._storage_source==1: - minioSdk = MinioSdk(base_dir, env, request_id ) - else: - aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id) - start_time = time() - while True: - try: - if time() - start_time > service_timeout: - logger.error("图片上传线程运行超时, requestId: {}", request_id) - break - raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0], - ExceptionType.TASK_EXCUTE_TIMEOUT.value[1]) - # 获取队列中的消息 - image_msg = get_no_block_queue(image_queue) - if image_msg is not None: - if image_msg[0] == 2: - if 'stop' == image_msg[1]: - logger.info("开始停止图片上传线程, requestId:{}", request_id) - break - if image_msg[0] == 1: - image_result = self.handle_image(image_msg[1], frame_step) - if image_result is not None: - task = [] - or_image = cv2.imencode(".jpg", image_result["or_frame"])[1] - or_image_name = build_image_name(image_result["current_frame"], - image_result["last_frame"], - analyse_type, - "OR", "0", "0", request_id) - if self._storage_source==1: - or_future = t.submit(minioSdk.put_object, or_image,or_image_name) - else: - or_future = t.submit(aliyunOssSdk.put_object, or_image_name, or_image.tobytes()) - task.append(or_future) - model_info_list = image_result["model_info"] - msg_list = [] - for model_info in model_info_list: - ai_image = cv2.imencode(".jpg", model_info["aFrame"])[1] - ai_image_name = build_image_name(image_result["current_frame"], - image_result["last_frame"], - analyse_type, - "AI", - model_info["modelCode"], - model_info["detectTargetCode"], - request_id) - if self._storage_source==1: - ai_future = t.submit(minioSdk.put_object, ai_image, - ai_image_name) - else: - ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, - ai_image.tobytes()) - - task.append(ai_future) - msg_list.append(message_feedback(request_id, - AnalysisStatus.RUNNING.value, - analyse_type, "", "", "", - or_image_name, - ai_image_name, - model_info['modelCode'], - model_info['detectTargetCode'])) - for tk in task: - tk.result() - for msg in msg_list: - put_queue(fb_queue, msg, timeout=2, is_ex=False) - del task, msg_list - else: - sleep(1) - del image_msg - except Exception: - logger.error("图片上传异常:{}, requestId:{}", format_exc(), request_id) - finally: - logger.info("停止图片上传线程0, requestId:{}", request_id) - clear_queue(image_queue) - logger.info("停止图片上传线程1, requestId:{}", request_id) - - -def build_image_name(*args): - """ - {requestId}/{time_now}_frame-{current_frame}-{last_frame}_type_{random_num}-{mode_type}" \ - "-{modeCode}-{target}_{image_type}.jpg - """ - current_frame, last_frame, mode_type, image_type, modeCode, target, request_id = args - random_num = TimeUtils.now_date_to_str(TimeUtils.YMDHMSF) - time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S") - return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame, - random_num, mode_type, modeCode, target, image_type) - - -class ImageTypeImageFileUpload(Thread): - __slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg') - - def __init__(self, *args): - super().__init__() - self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type = args - self._storage_source = self._context['service']['storage_source'] - @staticmethod - def handle_image(det_xywh, copy_frame, font_config): - """ - det_xywh:{ - 'code':{ - 1: [[detect_targets_code, box, score, label_array, color]] - } - } - 模型编号:modeCode - 检测目标:detectTargetCode - """ - model_info = [] - # 更加模型编码解析数据 - for code, det_info in det_xywh.items(): - if det_info is not None and len(det_info) > 0: - for cls, target_list in det_info.items(): - if target_list is not None and len(target_list) > 0: - aiFrame = copy_frame.copy() - for target in target_list: - draw_painting_joint(target[1], aiFrame, target[3], target[2], target[4], font_config) - model_info.append({ - "modelCode": str(code), - "detectTargetCode": str(cls), - "frame": aiFrame - }) - if len(model_info) > 0: - image_result = { - "or_frame": copy_frame, - "model_info": model_info, - "current_frame": 0, - "last_frame": 0 - } - return image_result - return None - - def run(self): - context, msg = self._context, self._msg - base_dir, env, request_id = context["base_dir"], context["env"], msg["request_id"] - logger.info("启动图片识别图片上传线程, requestId: {}", request_id) - image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type - service_timeout = int(context["service"]["timeout"]) - with ThreadPoolExecutor(max_workers=2) as t: - try: - # 初始化oss客户端 - if self._storage_source==1: - minioSdk = MinioSdk(base_dir, env, request_id ) - else: - aliyunOssSdk = AliyunOssSdk(base_dir, env, request_id) - - start_time = time() - while True: - try: - if time() - start_time > service_timeout: - logger.error("图片上传进程运行超时, requestId: {}", request_id) - break - # 获取队列中的消息 - image_msg = image_queue.get() - if image_msg is not None: - if image_msg[0] == 2: - if 'stop' == image_msg[1]: - logger.info("开始停止图片上传线程, requestId:{}", request_id) - break - if image_msg[0] == 1: - task, msg_list = [], [] - det_xywh, image_url, copy_frame, font_config, result = image_msg[1] - if det_xywh is None: - ai_image_name = build_image_name(0, 0, analyse_type, "AI", result.get("modelCode"), - result.get("type"), request_id) - - if self._storage_source==1: - ai_future = t.submit(minioSdk.put_object, copy_frame,ai_image_name) - else: - ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, copy_frame) - - task.append(ai_future) - msg_list.append(message_feedback(request_id, - AnalysisStatus.RUNNING.value, - analyse_type, "", "", "", - image_url, - ai_image_name, - result.get("modelCode"), - result.get("type"), - analyse_results=result)) - else: - image_result = self.handle_image(det_xywh, copy_frame, font_config) - if image_result: - # 图片帧数编码 - if image_url is None: - or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame")) - image_url = build_image_name(image_result.get("current_frame"), - image_result.get("last_frame"), - analyse_type, - "OR", "0", "O", request_id) - - if self._storage_source==1: - or_future = t.submit(minioSdk.put_object, or_image,image_url) - else: - or_future = t.submit(aliyunOssSdk.put_object, image_url, - or_image.tobytes()) - task.append(or_future) - model_info_list = image_result.get("model_info") - for model_info in model_info_list: - ai_result, ai_image = cv2.imencode(".jpg", model_info.get("frame")) - ai_image_name = build_image_name(image_result.get("current_frame"), - image_result.get("last_frame"), - analyse_type, - "AI", - model_info.get("modelCode"), - model_info.get("detectTargetCode"), - request_id) - if self._storage_source==1: - ai_future = t.submit(minioSdk.put_object, ai_image, ai_image_name) - else: - ai_future = t.submit(aliyunOssSdk.put_object, ai_image_name, - ai_image.tobytes()) - task.append(ai_future) - msg_list.append(message_feedback(request_id, - AnalysisStatus.RUNNING.value, - analyse_type, "", "", "", - image_url, - ai_image_name, - model_info.get('modelCode'), - model_info.get('detectTargetCode'), - analyse_results=result)) - for thread_result in task: - thread_result.result() - for msg in msg_list: - put_queue(fb_queue, msg, timeout=2, is_ex=False) - else: - sleep(1) - except Exception as e: - logger.error("图片上传异常:{}, requestId:{}", format_exc(), request_id) - finally: - clear_queue(image_queue) - logger.info("停止图片识别图片上传线程, requestId:{}", request_id) diff --git a/concurrency/IntelligentRecognitionProcess_20250106.py b/concurrency/IntelligentRecognitionProcess_20250106.py deleted file mode 100644 index 3e56ce0..0000000 --- a/concurrency/IntelligentRecognitionProcess_20250106.py +++ /dev/null @@ -1,1444 +0,0 @@ -# -*- coding: utf-8 -*- -import base64 -import os -from concurrent.futures import ThreadPoolExecutor -from os.path import join, exists, getsize -from time import time, sleep -from traceback import format_exc - -import cv2 - -from multiprocessing import Process, Queue - -import numpy as np -from loguru import logger - -from common.Constant import init_progess, success_progess -from concurrency.FileUploadThread import ImageTypeImageFileUpload -from concurrency.HeartbeatThread import Heartbeat - -from concurrency.PullVideoStreamProcess import OnlinePullVideoStreamProcess, OfflinePullVideoStreamProcess -from concurrency.PushVideoStreamProcess import OnPushStreamProcess, OffPushStreamProcess - -from util.GPUtils import check_gpu_resource -from util.LogUtils import init_log -from concurrency.CommonThread import Common -from concurrency.PullStreamThread import RecordingPullStreamThread -from concurrency.RecordingHeartbeatThread import RecordingHeartbeat -from enums.AnalysisStatusEnum import AnalysisStatus -from enums.AnalysisTypeEnum import AnalysisType -from enums.ExceptionEnum import ExceptionType -from enums.ModelTypeEnum import ModelType -from enums.RecordingStatusEnum import RecordingStatus -from util.AliyunSdk import ThAliyunVodSdk -from util.CpuUtils import check_cpu -from util.Cv2Utils import write_or_video, push_video_stream, close_all_p -from entity.FeedBack import message_feedback, recording_feedback -from exception.CustomerException import ServiceException -from util.ImageUtils import url2Array, add_water_pic -from util.ModelUtils import MODEL_CONFIG -from util.OcrBaiduSdk import OcrBaiduSdk - -from enums.BaiduSdkEnum import VehicleEnumVALUE -from enums.ModelTypeEnum import BaiduModelTarget -from util.PlotsUtils import xywh2xyxy2 -from util.QueUtil import put_queue, get_no_block_queue, clear_queue -from util.TimeUtils import now_date_to_str, YMDHMSF -from util.CpuUtils import print_cpu_status -import inspect -class IntelligentRecognitionProcess(Process): - __slots__ = ('_fb_queue', '_msg', '_analyse_type', '_context', 'event_queue', '_pull_queue', '_hb_queue', - "_image_queue", "_push_queue", '_push_ex_queue') - - def __init__(self, *args): - super().__init__() - # 入参 - self._fb_queue, self._msg, self._analyse_type, self._context = args - # 初始化参数 - self.event_queue, self._pull_queue, self._hb_queue, self._image_queue, self._push_queue, self._push_ex_queue = \ - Queue(), Queue(10), Queue(), Queue(), Queue(), Queue() - - # 发送waitting消息 - put_queue(self._fb_queue, message_feedback(self._msg["request_id"], AnalysisStatus.WAITING.value, - self._analyse_type, progress=init_progess), timeout=2, is_ex=True) - self._storage_source = self._context['service']['storage_source'] - - def sendEvent(self, eBody): - put_queue(self.event_queue, eBody, timeout=2, is_ex=True) - - def clear_queue(self): - clear_queue(self.event_queue) - clear_queue(self._pull_queue) - clear_queue(self._hb_queue) - clear_queue(self._image_queue) - clear_queue(self._push_queue) - clear_queue(self._push_ex_queue) - - @staticmethod - def build_video_path(context, msg, is_build_or=True): - random_time = now_date_to_str(YMDHMSF) - pre_path = '%s/%s%s' % (context["base_dir"], context["video"]["file_path"], random_time) - end_path = '%s%s' % (msg["request_id"], ".mp4") - if is_build_or: - context["orFilePath"] = '%s%s%s' % (pre_path, "_on_or_", end_path) - context["aiFilePath"] = '%s%s%s' % (pre_path, "_on_ai_", end_path) - - @staticmethod - def start_heartbeat(fb_queue, hb_queue, request_id, analyse_type, context): - hb_thread = Heartbeat(fb_queue, hb_queue, request_id, analyse_type, context) - hb_thread.setDaemon(True) - hb_thread.start() - return hb_thread - - - - - -class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): - __slots__ = () - - @staticmethod - def start_push_stream(msg, push_queue, image_queue, push_ex_queue, hb_queue, context): - pushProcess = OnPushStreamProcess(msg, push_queue, image_queue, push_ex_queue, hb_queue, context) - pushProcess.daemon = True - pushProcess.start() - return pushProcess - - @staticmethod - def start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, frame_num): - pullProcess = OnlinePullVideoStreamProcess(msg, context, fb_queue, pull_queue, image_queue, analyse_type, - frame_num) - pullProcess.daemon = True - pullProcess.start() - return pullProcess - - - def upload_video(self,base_dir, env, request_id, orFilePath, aiFilePath): - if self._storage_source==1: - minioSdk = MinioSdk(base_dir, env, request_id ) - upload_video_thread_or = Common(minioSdk.put_object, orFilePath, "%s/or_online.mp4" % request_id) - upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "%s/ai_online.mp4" % request_id) - else: - aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) - upload_video_thread_or = Common(aliyunVodSdk.get_play_url, orFilePath, "or_online_%s" % request_id) - upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - - - - upload_video_thread_or.setDaemon(True) - upload_video_thread_ai.setDaemon(True) - upload_video_thread_or.start() - upload_video_thread_ai.start() - or_url = upload_video_thread_or.get_result() - ai_url = upload_video_thread_ai.get_result() - return or_url, ai_url - ''' - @staticmethod - def upload_video(base_dir, env, request_id, orFilePath, aiFilePath): - aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) - upload_video_thread_or = Common(aliyunVodSdk.get_play_url, orFilePath, "or_online_%s" % request_id) - upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - upload_video_thread_or.setDaemon(True) - upload_video_thread_ai.setDaemon(True) - upload_video_thread_or.start() - upload_video_thread_ai.start() - or_url = upload_video_thread_or.get_result() - ai_url = upload_video_thread_ai.get_result() - return or_url, ai_url - ''' - - @staticmethod - def ai_normal_dtection(model, frame, request_id): - model_conf, code = model - retResults = MODEL_CONFIG[code][3]([model_conf, frame, request_id])[0] - if type(retResults) is np.ndarray or len(retResults) == 0: - ret = retResults - if type(retResults) is np.ndarray: - ret = retResults.tolist() - else: - ret = retResults[2] - return code, ret - - @staticmethod - def obj_det(self, model_array, frame, task_status, cframe, tt, request_id): - push_obj = [] - if task_status[1] == 1: - dtection_result = [] - for model in model_array: - result = tt.submit(self.ai_normal_dtection, model, frame, request_id) - dtection_result.append(result) - for d in dtection_result: - code, det_r = d.result() - if len(det_r) > 0: - push_obj.append((code, det_r)) - if len(push_obj) == 0: - task_status[1] = 0 - if task_status[1] == 0: - if cframe % 30 == 0: - dtection_result1 = [] - for model in model_array: - result = tt.submit(self.ai_normal_dtection, model, frame, request_id) - dtection_result1.append(result) - for d in dtection_result1: - code, det_r = d.result() - if len(det_r) > 0: - push_obj.append((code, det_r)) - if len(push_obj) > 0: - task_status[1] = 1 - return push_obj - - @staticmethod - def checkPT(start_time, service_timeout, pull_process, push_process, hb_thread, push_ex_queue, pull_queue, - request_id): - if time() - start_time > service_timeout: - logger.error("任务执行超时, requestId: {}", request_id) - raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0], - ExceptionType.TASK_EXCUTE_TIMEOUT.value[1]) - if pull_process is not None and not pull_process.is_alive(): - while True: - if pull_queue.empty() or pull_queue.qsize() == 0: - break - pull_result = get_no_block_queue(pull_queue) - if pull_result is not None and pull_result[0] == 1: - raise ServiceException(pull_result[1], pull_result[2]) - logger.info("拉流进程异常停止, requestId: {}", request_id) - raise Exception("拉流进程异常停止!") - if hb_thread is not None and not hb_thread.is_alive(): - logger.info("心跳线程异常停止, requestId: {}", request_id) - raise Exception("心跳线程异常停止!") - if push_process is not None and not push_process.is_alive(): - while True: - if push_ex_queue.empty() or push_ex_queue.qsize() == 0: - break - push_result = get_no_block_queue(push_ex_queue) - if push_result is not None and push_result[0] == 1: - raise ServiceException(push_result[1], push_result[2]) - logger.info("推流进程异常停止, requestId: {}", request_id) - raise Exception("推流进程异常停止!") - - def run(self): - msg, context, analyse_type = self._msg, self._context, self._analyse_type - self.build_video_path(context, msg) - request_id = msg["request_id"] - base_dir, env = context["base_dir"], context["env"] - service_timeout = int(context["service"]["timeout"]) - ex = None - # 拉流进程、推流进程、心跳线程 - pull_process, push_process, hb_thread = None, None, None - - # 事件队列、拉流队列、心跳队列、反馈队列 - event_queue, pull_queue, hb_queue, fb_queue = self.event_queue, self._pull_queue, self._hb_queue, self._fb_queue - - # 推流队列、推流异常队列、图片队列 - push_queue, push_ex_queue, image_queue = self._push_queue, self._push_ex_queue, self._image_queue - try: - # 初始化日志 - init_log(base_dir, env) - # 打印启动日志 - logger.info("开始启动实时分析进程!requestId: {}", request_id) - - # 启动拉流进程(包含拉流线程, 图片上传线程,mqtt读取线程) - # 拉流进程初始化时间长, 先启动 - pull_process = self.start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, 25) - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #7.0, - # 启动心跳线程 - hb_thread = self.start_heartbeat(fb_queue, hb_queue, request_id, analyse_type, context) - - - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #7.0, - # 加载算法模型 - model_array = get_model(msg, context, analyse_type) - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #9.5 - # 启动推流进程 - push_process = self.start_push_stream(msg, push_queue, image_queue, push_ex_queue, hb_queue, context) - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) - # 第一个参数: 模型是否初始化 0:未初始化 1:初始化 - # 第二个参数: 检测是否有问题 0: 没有问题, 1: 有问题 - task_status = [0, 0] - draw_config = {} - start_time = time() - # 识别2个线程性能最优 - with ThreadPoolExecutor(max_workers=2) as t: - # 可能使用模型组合, 模型组合最多3个模型, 1对3, 上面的2个线程对应6个线程 - with ThreadPoolExecutor(max_workers=6) as tt: - while True: - # 检查拉流进程是否正常, 心跳线程是否正常 - self.checkPT(start_time, service_timeout, pull_process, push_process, hb_thread, push_ex_queue, - pull_queue, request_id) - # 检查推流是否异常 - push_status = get_no_block_queue(push_ex_queue) - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) #9.5,11.2 - if push_status is not None and push_status[0] == 1: - raise ServiceException(push_status[1], push_status[2]) - # 获取停止指令 - event_result = get_no_block_queue(event_queue) - - if event_result: - cmdStr = event_result.get("command") - - # 接收到停止指令 - if "stop" == cmdStr: - logger.info("实时任务开始停止, requestId: {}", request_id) - pull_process.sendCommand({"command": 'stop'}) - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) - pull_result = get_no_block_queue(pull_queue) - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) - if pull_result is None: - sleep(1) - continue - # (4, (frame_list, frame_index_list, all_frames)) - if pull_result[0] == 4: - frame_list, frame_index_list, all_frames = pull_result[1] - if len(frame_list) > 0: - # 判断是否已经初始化 - if task_status[0] == 0: - task_status[0] = 1 - for i, model in enumerate(model_array): - model_conf, code = model - model_param = model_conf[1] - # (modeType, model_param, allowedList, names, rainbows) - MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0], - model_conf) - if draw_config.get("font_config") is None: - draw_config["font_config"] = model_param['font_config'] - if draw_config.get(code) is None: - draw_config[code] = {} - draw_config[code]["allowedList"] = model_conf[2] - draw_config[code]["rainbows"] = model_conf[4] - draw_config[code]["label_arrays"] = model_param['label_arraylist'] - if "label_dict" in model_param: - draw_config[code]["label_dict"] = model_param['label_dict'] - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) - # 多线程并发处理, 经过测试两个线程最优 - det_array = [] - for i, frame in enumerate(frame_list): - det_result = t.submit(self.obj_det, self, model_array, frame, task_status, - frame_index_list[i], tt, request_id) - det_array.append(det_result) - push_objs = [det.result() for det in det_array] - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) - put_queue(push_queue, - (1, (frame_list, frame_index_list, all_frames, draw_config, push_objs)), - timeout=2, is_ex=True) - #print_cpu_status(requestId=request_id,lineNum=inspect.currentframe().f_lineno) - del det_array, push_objs - del frame_list, frame_index_list, all_frames - elif pull_result[0] == 1: - # 拉流发生异常 - put_queue(push_queue, (2, 'stop_ex'), timeout=1, is_ex=True) - push_process.join(120) - pull_process.sendCommand({"command": 'stop'}) - pull_process.join(120) - raise ServiceException(pull_result[1], pull_result[2]) - elif pull_result[0] == 2: - put_queue(push_queue, (2, 'stop'), timeout=1, is_ex=True) - push_process.join(120) - pull_process.sendCommand({"command": 'stop'}) - pull_process.join(120) - break - else: - raise Exception("未知拉流状态异常!") - except ServiceException as s: - logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id) - ex = s.code, s.msg - except Exception: - logger.error("服务异常: {}, requestId: {},", format_exc(), request_id) - ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1] - finally: - orFilePath, aiFilePath = context["orFilePath"], context["aiFilePath"] - base_dir, env = context["base_dir"], context["env"] - or_url, ai_url, exc = "", "", None - try: - # 如果拉流进程存在, 关闭拉流进程(拉流线程、图片上传线程) - if push_process and push_process.is_alive(): - put_queue(push_queue, (2, 'stop_ex'), timeout=1) - logger.info("关闭推流进程, requestId:{}", request_id) - push_process.join(timeout=120) - logger.info("关闭推流进程1, requestId:{}", request_id) - if pull_process and pull_process.is_alive(): - pull_process.sendCommand({"command": 'stop_ex'}) - pull_process.sendCommand({"command": 'stop'}) - logger.info("关闭拉流进程, requestId:{}", request_id) - pull_process.join(timeout=120) - logger.info("关闭拉流进程1, requestId:{}", request_id) - if exists(orFilePath) and exists(aiFilePath) and getsize(orFilePath) > 100: - or_url, ai_url = self.upload_video(base_dir, env, request_id, orFilePath, aiFilePath) - if or_url is None or ai_url is None: - logger.error("原视频或AI视频播放上传VOD失败!, requestId: {}", request_id) - raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0], - ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1]) - # 停止心跳线程 - if hb_thread and hb_thread.is_alive(): - put_queue(hb_queue, {"command": "stop"}, timeout=1) - hb_thread.join(timeout=120) - if exists(orFilePath): - logger.info("开始删除原视频, orFilePath: {}, requestId: {}", orFilePath, request_id) - os.remove(orFilePath) - logger.info("删除原视频成功, orFilePath: {}, requestId: {}", orFilePath, request_id) - if exists(aiFilePath): - logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", aiFilePath, request_id) - os.remove(aiFilePath) - logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", aiFilePath, request_id) - # 如果有异常, 检查是否有原视频和AI视频,有则上传,响应失败 - if ex: - code, msg = ex - put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value, - analyse_type, - error_code=code, - error_msg=msg, - video_url=or_url, - ai_video_url=ai_url), timeout=2, is_ex=False) - else: - if or_url is None or len(or_url) == 0 or ai_url is None or len(ai_url) == 0: - raise ServiceException(ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[0], - ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[1]) - put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.SUCCESS.value, - analyse_type, - progress=success_progess, - video_url=or_url, - ai_video_url=ai_url), timeout=2, is_ex=False) - - except ServiceException as s: - logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id) - exc = s.code, s.msg - except Exception: - logger.error("服务异常: {}, requestId: {},", format_exc(), request_id) - exc = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1] - finally: - if push_process and push_process.is_alive(): - put_queue(push_queue, (2, 'stop_ex'), timeout=1) - logger.info("关闭推流进程, requestId:{}", request_id) - push_process.join(timeout=120) - logger.info("关闭推流进程1, requestId:{}", request_id) - if pull_process and pull_process.is_alive(): - pull_process.sendCommand({"command": 'stop_ex'}) - pull_process.sendCommand({"command": 'stop'}) - logger.info("关闭拉流进程, requestId:{}", request_id) - pull_process.join(timeout=120) - logger.info("关闭拉流进程1, requestId:{}", request_id) - if exc: - code, msg = exc - put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value, - analyse_type, - error_code=code, - error_msg=msg, - video_url=or_url, - ai_video_url=ai_url), timeout=2, is_ex=False) - logger.info("清理队列, requestId:{}", request_id) - self.clear_queue() - logger.info("清理队列完成, requestId:{}", request_id) - - -class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess): - __slots__ = () - - def upload_video(self,base_dir, env, request_id, aiFilePath): - aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) - upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - - if self._storage_source==1: - minioSdk = MinioSdk(base_dir, env, request_id ) - upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "%s/ai_online.mp4" % request_id) - else: - aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) - upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - - upload_video_thread_ai.setDaemon(True) - upload_video_thread_ai.start() - ai_url = upload_video_thread_ai.get_result() - return ai_url - - ''' - @staticmethod - def upload_video(base_dir, env, request_id, aiFilePath): - aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) - upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - upload_video_thread_ai.setDaemon(True) - upload_video_thread_ai.start() - ai_url = upload_video_thread_ai.get_result() - return ai_url - ''' - @staticmethod - def ai_normal_dtection(model, frame, request_id): - model_conf, code = model - retResults = MODEL_CONFIG[code][3]([model_conf, frame, request_id])[0] - if type(retResults) is np.ndarray or len(retResults) == 0: - ret = retResults - if type(retResults) is np.ndarray: - ret = retResults.tolist() - else: - ret = retResults[2] - return code, ret - - @staticmethod - def obj_det(self, model_array, frame, task_status, cframe, tt, request_id): - push_obj = [] - if task_status[1] == 1: - dtection_result = [] - for model in model_array: - result = tt.submit(self.ai_normal_dtection, model, frame, request_id) - dtection_result.append(result) - for d in dtection_result: - code, det_r = d.result() - if len(det_r) > 0: - push_obj.append((code, det_r)) - if len(push_obj) == 0: - task_status[1] = 0 - if task_status[1] == 0: - if cframe % 30 == 0: - dtection_result1 = [] - for model in model_array: - result = tt.submit(self.ai_normal_dtection, model, frame, request_id) - dtection_result1.append(result) - for d in dtection_result1: - code, det_r = d.result() - if len(det_r) > 0: - push_obj.append((code, det_r)) - if len(push_obj) > 0: - task_status[1] = 1 - return push_obj - - @staticmethod - def start_push_stream(msg, push_queue, image_queue, push_ex_queue, hb_queue, context): - pushProcess = OffPushStreamProcess(msg, push_queue, image_queue, push_ex_queue, hb_queue, context) - pushProcess.daemon = True - pushProcess.start() - return pushProcess - - @staticmethod - def start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, frame_num): - pullProcess = OfflinePullVideoStreamProcess(msg, context, fb_queue, pull_queue, image_queue, analyse_type, - frame_num) - pullProcess.daemon = True - pullProcess.start() - return pullProcess - - @staticmethod - def checkPT(service_timeout, start_time, pull_process, push_process, hb_thread, push_ex_queue, pull_queue, - request_id): - if time() - start_time > service_timeout: - logger.error("任务执行超时, requestId: {}", request_id) - raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0], - ExceptionType.TASK_EXCUTE_TIMEOUT.value[1]) - if pull_process is not None and not pull_process.is_alive(): - while True: - if pull_queue.empty() or pull_queue.qsize() == 0: - break - pull_result = get_no_block_queue(pull_queue) - if pull_result is not None and pull_result[0] == 1: - raise ServiceException(pull_result[1], pull_result[2]) - logger.info("拉流进程异常停止, requestId: {}", request_id) - raise Exception("拉流进程异常停止!") - if hb_thread is not None and not hb_thread.is_alive(): - logger.info("心跳线程异常停止, requestId: {}", request_id) - raise Exception("心跳线程异常停止!") - if push_process is not None and not push_process.is_alive(): - while True: - if push_ex_queue.empty() or push_ex_queue.qsize() == 0: - break - push_result = get_no_block_queue(push_ex_queue) - if push_result is not None and push_result[0] == 1: - raise ServiceException(push_result[1], push_result[2]) - logger.info("推流进程异常停止, requestId: {}", request_id) - raise Exception("推流进程异常停止!") - - def run(self): - msg, context, analyse_type, ex = self._msg, self._context, self._analyse_type, None - self.build_video_path(context, msg, is_build_or=False) - request_id, base_dir, env = msg["request_id"], context["base_dir"], context["env"] - # 拉流进程、推流进程、心跳线程 - pull_process, push_process, hb_thread = None, None, None - service_timeout = int(context["service"]["timeout"]) - # 事件队列、拉流队列、心跳队列、反馈队列 - event_queue, pull_queue, hb_queue, fb_queue = self.event_queue, self._pull_queue, self._hb_queue, self._fb_queue - # 推流队列、推流异常队列、图片队列 - push_queue, push_ex_queue, image_queue = self._push_queue, self._push_ex_queue, self._image_queue - try: - # 初始化日志 - init_log(base_dir, env) - # 打印启动日志 - logger.info("开始启动离线分析进程!requestId: {}", request_id) - # 启动拉流进程(包含拉流线程, 图片上传线程) - # 拉流进程初始化时间长, 先启动 - pull_process = self.start_pull_stream(msg, context, fb_queue, pull_queue, image_queue, analyse_type, 25) - # 启动心跳线程 - hb_thread = self.start_heartbeat(fb_queue, hb_queue, request_id, analyse_type, context) - # 加载算法模型 - model_array = get_model(msg, context, analyse_type) - # 启动推流进程 - push_process = self.start_push_stream(msg, push_queue, image_queue, push_ex_queue, hb_queue, context) - # 第一个参数: 模型是否初始化 0:未初始化 1:初始化 - # 第二个参数: 检测是否有问题 0: 没有问题, 1: 有问题 - task_status = [0, 0] - draw_config = {} - start_time = time() - # 识别2个线程性能最优 - with ThreadPoolExecutor(max_workers=2) as t: - # 可能使用模型组合, 模型组合最多3个模型, 1对3, 上面的2个线程对应6个线程 - with ThreadPoolExecutor(max_workers=6) as tt: - while True: - # 检查拉流进程是否正常, 心跳线程是否正常 - self.checkPT(service_timeout, start_time, pull_process, push_process, hb_thread, push_ex_queue, - pull_queue, request_id) - # 检查推流是否异常 - push_status = get_no_block_queue(push_ex_queue) - if push_status is not None and push_status[0] == 1: - raise ServiceException(push_status[1], push_status[2]) - # 获取停止指令 - event_result = get_no_block_queue(event_queue) - if event_result: - cmdStr = event_result.get("command") - # 接收到停止指令 - if "stop" == cmdStr: - logger.info("离线任务开始停止, requestId: {}", request_id) - pull_process.sendCommand({"command": 'stop'}) - pull_result = get_no_block_queue(pull_queue) - if pull_result is None: - sleep(1) - continue - # (4, (frame_list, frame_index_list, all_frames)) - if pull_result[0] == 4: - frame_list, frame_index_list, all_frames = pull_result[1] - if len(frame_list) > 0: - # 判断是否已经初始化 - if task_status[0] == 0: - task_status[0] = 1 - for i, model in enumerate(model_array): - model_conf, code = model - model_param = model_conf[1] - # (modeType, model_param, allowedList, names, rainbows) - MODEL_CONFIG[code][2](frame_list[0].shape[1], frame_list[0].shape[0], - model_conf) - if draw_config.get("font_config") is None: - draw_config["font_config"] = model_param['font_config'] - if draw_config.get(code) is None: - draw_config[code] = {} - draw_config[code]["allowedList"] = model_conf[2] - draw_config[code]["rainbows"] = model_conf[4] - draw_config[code]["label_arrays"] = model_param['label_arraylist'] - if "label_dict" in model_param: - draw_config[code]["label_dict"] = model_param['label_dict'] - det_array = [] - for i, frame in enumerate(frame_list): - det_result = t.submit(self.obj_det, self, model_array, frame, task_status, - frame_index_list[i], tt, request_id) - det_array.append(det_result) - push_objs = [det.result() for det in det_array] - put_queue(push_queue, - (1, (frame_list, frame_index_list, all_frames, draw_config, push_objs)), - timeout=2, is_ex=True) - del det_array, push_objs - del frame_list, frame_index_list, all_frames - elif pull_result[0] == 1: - put_queue(push_queue, (2, 'stop_ex'), timeout=1, is_ex=True) - logger.info("关闭推流进程, requestId:{}", request_id) - push_process.join(timeout=120) - logger.info("关闭推流进程1, requestId:{}", request_id) - raise ServiceException(pull_result[1], pull_result[2]) - elif pull_result[0] == 2: - logger.info("离线任务开始停止, requestId: {}", request_id) - put_queue(push_queue, (2, 'stop'), timeout=1, is_ex=True) - push_process.join(120) - pull_process.sendCommand({"command": 'stop'}) - pull_process.join(120) - break - else: - raise Exception("未知拉流状态异常!") - except ServiceException as s: - logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id) - ex = s.code, s.msg - except Exception: - logger.error("服务异常: {}, requestId: {},", format_exc(), request_id) - ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1] - finally: - base_dir, env, aiFilePath = context["base_dir"], context["env"], context["aiFilePath"] - ai_url, exc = "", None - try: - if push_process and push_process.is_alive(): - put_queue(push_queue, (2, 'stop_ex'), timeout=1) - push_process.join(timeout=120) - if pull_process and pull_process.is_alive(): - pull_process.sendCommand({"command": 'stop_ex'}) - pull_process.sendCommand({"command": 'stop'}) - pull_process.join(timeout=120) - if exists(aiFilePath) and getsize(aiFilePath) > 100: - ai_url = self.upload_video(base_dir, env, request_id, aiFilePath) - if ai_url is None: - logger.error("原视频或AI视频播放上传VOD失败!, requestId: {}", request_id) - raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0], - ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1]) - # 停止心跳线程 - if hb_thread and hb_thread.is_alive(): - put_queue(hb_queue, {"command": "stop"}, timeout=1) - hb_thread.join(timeout=120) - if exists(aiFilePath): - logger.info("开始删除AI视频, aiFilePath: {}, requestId: {}", aiFilePath, request_id) - os.remove(aiFilePath) - logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", aiFilePath, request_id) - # 如果有异常, 检查是否有原视频和AI视频,有则上传,响应失败 - if ex: - code, msg = ex - put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value, - analyse_type, - error_code=code, - error_msg=msg, - ai_video_url=ai_url), timeout=2, is_ex=False) - else: - if ai_url is None or len(ai_url) == 0: - raise ServiceException(ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[0], - ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[1]) - put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.SUCCESS.value, - analyse_type, - progress=success_progess, - ai_video_url=ai_url), timeout=2, is_ex=False) - - except ServiceException as s: - logger.exception("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, request_id) - exc = s.code, s.msg - except Exception: - logger.error("服务异常: {}, requestId: {},", format_exc(), request_id) - exc = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1] - finally: - if push_process and push_process.is_alive(): - put_queue(push_queue, (2, 'stop_ex'), timeout=1) - push_process.join(timeout=120) - if pull_process and pull_process.is_alive(): - pull_process.sendCommand({"command": 'stop_ex'}) - pull_process.sendCommand({"command": 'stop'}) - pull_process.join(timeout=120) - if hb_thread and hb_thread.is_alive(): - put_queue(hb_queue, {"command": "stop"}, timeout=1) - hb_thread.join(timeout=120) - if exc: - code, msg = exc - put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value, - analyse_type, - error_code=code, - error_msg=msg, - ai_video_url=ai_url), timeout=2, is_ex=False) - self.clear_queue() - - -''' -图片识别 -''' - - -class PhotosIntelligentRecognitionProcess(Process): - __slots__ = ("_fb_queue", "_msg", "_analyse_type", "_context", "_image_queue") - - def __init__(self, *args): - super().__init__() - self._fb_queue, self._msg, self._analyse_type, self._context = args - self._image_queue = Queue() - put_queue(self._fb_queue, message_feedback(self._msg["request_id"], AnalysisStatus.WAITING.value, - self._analyse_type, progress=init_progess), timeout=2, is_ex=True) - self.build_logo(self._msg, self._context) - self._storage_source = self._context['service']['storage_source'] - - @staticmethod - def build_logo(msg, context): - logo = None - if context["video"]["video_add_water"]: - logo = msg.get("logo_url") - if logo is not None and len(logo) > 0: - logo = url2Array(logo, enable_ex=False) - if logo is None: - logo = cv2.imread(join(context['base_dir'], "image/logo.png"), -1) - context['logo'] = logo - - def epidemic_prevention(self, imageUrl, model, orc, request_id): - try: - # modeType, allowedList, new_device, model, par, img_type - model_conf, code = model - modeType, allowedList, new_device, model, par, img_type = model_conf - image = url2Array(imageUrl) - param = [image, new_device, model, par, img_type, request_id] - dataBack = MODEL_CONFIG[code][3](param) - if img_type == 'plate': - carCode = '' - if dataBack is None or dataBack.get("plateImage") is None or len(dataBack.get("plateImage")) == 0: - result = orc.license_plate_recognition(image, request_id) - score = '' - if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0: - logger.error("车牌识别为空: {}", result) - carCode = '' - else: - for word in result.get("words_result"): - if word is not None and word.get("number") is not None: - if len(carCode) == 0: - carCode = word.get("number") - else: - carCode = carCode + "," + word.get("number") - else: - result = orc.license_plate_recognition(dataBack.get("plateImage")[0], request_id) - score = dataBack.get("plateImage")[1] - if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0: - result = orc.license_plate_recognition(image, request_id) - if result is None or result.get("words_result") is None or len(result.get("words_result")) == 0: - logger.error("车牌识别为空: {}", result) - carCode = '' - else: - for word in result.get("words_result"): - if word is not None and word.get("number") is not None: - if len(carCode) == 0: - carCode = word.get("number") - else: - carCode = carCode + "," + word.get("number") - else: - for word in result.get("words_result"): - if word is not None and word.get("number") is not None: - if len(carCode) == 0: - carCode = word.get("number") - else: - carCode = carCode + "," + word.get("number") - if len(carCode) > 0: - plate_result = {'type': str(3), 'modelCode': code, 'carUrl': imageUrl, - 'carCode': carCode, - 'score': score} - put_queue(self._fb_queue, message_feedback(request_id, - AnalysisStatus.RUNNING.value, - AnalysisType.IMAGE.value, "", "", - '', - imageUrl, - imageUrl, - str(code), - str(3), - plate_result), timeout=2, is_ex=True) - if img_type == 'code': - if dataBack is None or dataBack.get("type") is None: - return - # 行程码 - if dataBack.get("type") == 1 and 1 in allowedList: - # 手机号 - if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0: - phoneNumberRecognition = '' - phone_score = '' - else: - phone = orc.universal_text_recognition(dataBack.get("phoneNumberImage")[0], request_id) - phone_score = dataBack.get("phoneNumberImage")[1] - if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0: - logger.error("手机号识别为空: {}", phone) - phoneNumberRecognition = '' - else: - phoneNumberRecognition = phone.get("words_result") - if dataBack.get("cityImage") is None or len(dataBack.get("cityImage")) == 0: - cityRecognition = '' - city_score = '' - else: - city = orc.universal_text_recognition(dataBack.get("cityImage")[0], request_id) - city_score = dataBack.get("cityImage")[1] - if city is None or city.get("words_result") is None or len(city.get("words_result")) == 0: - logger.error("城市识别为空: {}", city) - cityRecognition = '' - else: - cityRecognition = city.get("words_result") - if len(phoneNumberRecognition) > 0 or len(cityRecognition) > 0: - trip_result = {'type': str(1), - 'modelCode': code, - 'imageUrl': imageUrl, - 'phoneNumberRecognition': phoneNumberRecognition, - 'phone_sorce': phone_score, - 'cityRecognition': cityRecognition, - 'city_score': city_score} - put_queue(self._fb_queue, message_feedback(request_id, - AnalysisStatus.RUNNING.value, - AnalysisType.IMAGE.value, "", "", - '', - imageUrl, - imageUrl, - str(code), - str(1), - trip_result), timeout=2, is_ex=True) - if dataBack.get("type") == 2 and 2 in allowedList: - if dataBack.get("nameImage") is None or len(dataBack.get("nameImage")) == 0: - nameRecognition = '' - name_score = '' - else: - name = orc.universal_text_recognition(dataBack.get("nameImage")[0], request_id) - name_score = dataBack.get("nameImage")[1] - if name is None or name.get("words_result") is None or len(name.get("words_result")) == 0: - logger.error("名字识别为空: {}", name) - nameRecognition = '' - else: - nameRecognition = name.get("words_result") - - if dataBack.get("phoneNumberImage") is None or len(dataBack.get("phoneNumberImage")) == 0: - phoneNumberRecognition = '' - phone_score = '' - else: - phone = orc.universal_text_recognition(dataBack.get("phoneNumberImage")[0], request_id) - phone_score = dataBack.get("phoneNumberImage")[1] - if phone is None or phone.get("words_result") is None or len(phone.get("words_result")) == 0: - logger.error("手机号识别为空: {}", phone) - phoneNumberRecognition = '' - else: - phoneNumberRecognition = phone.get("words_result") - if dataBack.get("hsImage") is None or len(dataBack.get("hsImage")) == 0: - hsRecognition = '' - hs_score = '' - else: - hs = orc.universal_text_recognition(dataBack.get("hsImage")[0], request_id) - hs_score = dataBack.get("hsImage")[1] - if hs is None or hs.get("words_result") is None or len(hs.get("words_result")) == 0: - logger.error("核酸识别为空: {}", hs) - hsRecognition = '' - else: - hsRecognition = hs.get("words_result") - if len(nameRecognition) > 0 or len(phoneNumberRecognition) > 0 or len(hsRecognition) > 0: - healthy_result = {'type': str(2), - 'modelCode': code, - 'imageUrl': imageUrl, - 'color': dataBack.get("color"), - 'nameRecognition': nameRecognition, - 'name_score': name_score, - 'phoneNumberRecognition': phoneNumberRecognition, - 'phone_score': phone_score, - 'hsRecognition': hsRecognition, - 'hs_score': hs_score} - put_queue(self._fb_queue, message_feedback(request_id, - AnalysisStatus.RUNNING.value, - AnalysisType.IMAGE.value, "", "", - '', - imageUrl, - imageUrl, - str(code), - str(2), - healthy_result), timeout=2, is_ex=True) - except ServiceException as s: - raise s - except Exception as e: - logger.error("模型分析异常: {}, requestId: {}", format_exc(), request_id) - raise e - - ''' - # 防疫模型 - ''' - - def epidemicPrevention(self, imageUrls, model, base_dir, env, request_id): - with ThreadPoolExecutor(max_workers=2) as t: - orc = OcrBaiduSdk(base_dir, env) - obj_list = [] - for imageUrl in imageUrls: - obj = t.submit(self.epidemic_prevention, imageUrl, model, orc, request_id) - obj_list.append(obj) - for r in obj_list: - r.result(60) - - def image_recognition(self, imageUrl, mod, image_queue, logo, request_id): - try: - model_conf, code = mod - model_param = model_conf[1] - image = url2Array(imageUrl) - MODEL_CONFIG[code][2](image.shape[1], image.shape[0], model_conf) - p_result = MODEL_CONFIG[code][3]([model_conf, image, request_id])[0] - #print(' line872:p_result[2]:',p_result[2] ) - if p_result is None or len(p_result) < 3 or p_result[2] is None or len(p_result[2]) == 0: - return - if logo: - image = add_water_pic(image, logo, request_id) - # (modeType, model_param, allowedList, names, rainbows) - allowedList = model_conf[2] - label_arraylist = model_param['label_arraylist'] - font_config = model_param['font_config'] - rainbows = model_conf[4] - det_xywh = {code: {}} - ai_result_list = p_result[2] - for ai_result in ai_result_list: - box, score, cls = xywh2xyxy2(ai_result) - # 如果检测目标在识别任务中,继续处理 - if cls in allowedList: - label_array = label_arraylist[cls] - color = rainbows[cls] - cd = det_xywh[code].get(cls) - if cd is None: - det_xywh[code][cls] = [[cls, box, score, label_array, color]] - else: - det_xywh[code][cls].append([cls, box, score, label_array, color]) - #print('ai_result_list:{},allowlist:{}'.format(ai_result_list,allowedList )) - if len(det_xywh) > 0: - put_queue(image_queue, (1, (det_xywh, imageUrl, image, font_config, "")), timeout=2, is_ex=False) - except ServiceException as s: - raise s - except Exception as e: - logger.error("模型分析异常: {}, requestId: {}", format_exc(), self._msg.get("request_id")) - raise e - - def publicIdentification(self, imageUrls, mod, image_queue, logo, request_id): - with ThreadPoolExecutor(max_workers=2) as t: - obj_list = [] - for imageUrl in imageUrls: - obj = t.submit(self.image_recognition, imageUrl, mod, image_queue, logo, request_id) - obj_list.append(obj) - for r in obj_list: - r.result(60) - - ''' - 1. imageUrls: 图片url数组,多张图片 - 2. mod: 模型对象 - 3. image_queue: 图片队列 - ''' - - def baiduRecognition(self, imageUrls, mod, image_queue, logo, request_id): - with ThreadPoolExecutor(max_workers=2) as t: - thread_result = [] - for imageUrl in imageUrls: - obj = t.submit(self.baidu_recognition, imageUrl, mod, image_queue, logo, request_id) - thread_result.append(obj) - for r in thread_result: - r.result(60) - - def baidu_recognition(self, imageUrl, mod, image_queue, logo, request_id): - with ThreadPoolExecutor(max_workers=2) as t: - try: - # modeType, aipImageClassifyClient, aipBodyAnalysisClient, allowedList, rainbows, - # vehicle_names, person_names, requestId - model_conf, code = mod - allowedList = model_conf[3] - rainbows = model_conf[4] - # 图片转数组 - img = url2Array(imageUrl) - vehicle_label_arrays, person_label_arrays, font_config = MODEL_CONFIG[code][2](img.shape[1], - img.shape[0], - model_conf) - obj_list = [] - for target in allowedList: - parm = [target, imageUrl, model_conf[1], model_conf[2], request_id] - reuslt = t.submit(self.baidu_method, code, parm, img, image_queue, vehicle_label_arrays, - person_label_arrays, font_config, rainbows, logo) - obj_list.append(reuslt) - for r in obj_list: - r.result(60) - except ServiceException as s: - raise s - except Exception as e: - logger.error("百度AI分析异常: {}, requestId: {}", format_exc(), request_id) - raise e - - @staticmethod - def baidu_method(code, parm, img, image_queue, vehicle_label_arrays, person_label_arrays, font_config, - rainbows, logo): - # [target, url, aipImageClassifyClient, aipBodyAnalysisClient, requestId] - request_id = parm[4] - target = parm[0] - image_url = parm[1] - result = MODEL_CONFIG[code][3](parm) - if target == BaiduModelTarget.VEHICLE_DETECTION.value[1] and result is not None: - vehicleInfo = result.get("vehicle_info") - if vehicleInfo is not None and len(vehicleInfo) > 0: - det_xywh = {code: {}} - copy_frame = img.copy() - for i, info in enumerate(vehicleInfo): - value = VehicleEnumVALUE.get(info.get("type")) - target_num = value.value[2] - label_array = vehicle_label_arrays[target_num] - color = rainbows[target_num] - if value is None: - logger.error("车辆识别出现未支持的目标类型!type:{}, requestId:{}", info.get("type"), request_id) - return - left_top = (int(info.get("location").get("left")), int(info.get("location").get("top"))) - right_top = (int(info.get("location").get("left")) + int(info.get("location").get("width")), - int(info.get("location").get("top"))) - right_bottom = (int(info.get("location").get("left")) + int(info.get("location").get("width")), - int(info.get("location").get("top")) + int(info.get("location").get("height"))) - left_bottom = (int(info.get("location").get("left")), - int(info.get("location").get("top")) + int(info.get("location").get("height"))) - box = [left_top, right_top, right_bottom, left_bottom] - score = float("%.2f" % info.get("probability")) - if logo: - copy_frame = add_water_pic(copy_frame, logo, request_id) - if det_xywh[code].get(target) is None: - det_xywh[code][target] = [[target, box, score, label_array, color]] - else: - det_xywh[code][target].append([target, box, score, label_array, color]) - info["id"] = str(i) - if len(det_xywh[code]) > 0: - result["type"] = str(target) - result["modelCode"] = code - put_queue(image_queue, (1, (det_xywh, image_url, copy_frame, font_config, result)), timeout=2, - is_ex=True) - # 人体识别 - if target == BaiduModelTarget.HUMAN_DETECTION.value[1] and result is not None: - personInfo = result.get("person_info") - personNum = result.get("person_num") - if personNum is not None and personNum > 0 and personInfo is not None and len(personInfo) > 0: - det_xywh = {code: {}} - copy_frame = img.copy() - for i, info in enumerate(personInfo): - left_top = (int(info.get("location").get("left")), int(info.get("location").get("top"))) - right_top = (int(info.get("location").get("left")) + int(info.get("location").get("width")), - int(info.get("location").get("top"))) - right_bottom = (int(info.get("location").get("left")) + int(info.get("location").get("width")), - int(info.get("location").get("top")) + int(info.get("location").get("height"))) - left_bottom = (int(info.get("location").get("left")), - int(info.get("location").get("top")) + int(info.get("location").get("height"))) - box = [left_top, right_top, right_bottom, left_bottom] - score = float("%.2f" % info.get("location").get("score")) - label_array = person_label_arrays[0] - color = rainbows[0] - if logo: - copy_frame = add_water_pic(copy_frame, logo, request_id) - if det_xywh[code].get(target) is None: - det_xywh[code][target] = [[target, box, score, label_array, color]] - else: - det_xywh[code][target].append([target, box, score, label_array, color]) - info["id"] = str(i) - if len(det_xywh[code]) > 0: - result["type"] = str(target) - result["modelCode"] = code - put_queue(image_queue, (1, (det_xywh, image_url, copy_frame, font_config, result)), timeout=2) - # 人流量 - if target == BaiduModelTarget.PEOPLE_COUNTING.value[1] and result is not None: - base64Image = result.get("image") - if base64Image is not None and len(base64Image) > 0: - baiduImage = base64.b64decode(base64Image) - result["type"] = str(target) - result["modelCode"] = code - del result["image"] - put_queue(image_queue, (1, (None, image_url, baiduImage, None, result)), timeout=2) - - @staticmethod - def start_File_upload(fb_queue, context, msg, image_queue, analyse_type): - image_thread = ImageTypeImageFileUpload(fb_queue, context, msg, image_queue, analyse_type) - image_thread.setDaemon(True) - image_thread.start() - return image_thread - - def run(self): - fb_queue, msg, analyse_type, context = self._fb_queue, self._msg, self._analyse_type, self._context - request_id, logo, image_queue = msg["request_id"], context['logo'], self._image_queue - base_dir, env = context["base_dir"], context["env"] - imageUrls = msg["image_urls"] - image_thread = None - with ThreadPoolExecutor(max_workers=2) as t: - try: - init_log(base_dir, env) - logger.info("开始启动图片识别进程, requestId: {}", request_id) - model_array = get_model(msg, context, analyse_type) - image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type) - task_list = [] - for model in model_array: - # 百度模型逻辑 - if model[1] == ModelType.BAIDU_MODEL.value[1]: - result = t.submit(self.baiduRecognition, imageUrls, model, image_queue, logo, request_id) - task_list.append(result) - # 防疫模型 - elif model[1] == ModelType.EPIDEMIC_PREVENTION_MODEL.value[1]: - result = t.submit(self.epidemicPrevention, imageUrls, model, base_dir, env, request_id) - task_list.append(result) - # 车牌模型 - elif model[1] == ModelType.PLATE_MODEL.value[1]: - result = t.submit(self.epidemicPrevention, imageUrls, model, base_dir, env, request_id) - task_list.append(result) - else: - result = t.submit(self.publicIdentification, imageUrls, model, image_queue, logo, request_id) - task_list.append(result) - for r in task_list: - r.result(60) - if image_thread and not image_thread.is_alive(): - raise Exception("图片识别图片上传线程异常停止!!!") - if image_thread and image_thread.is_alive(): - put_queue(image_queue, (2, 'stop'), timeout=2) - image_thread.join(120) - logger.info("图片进程任务完成,requestId:{}", request_id) - put_queue(fb_queue, message_feedback(request_id, - AnalysisStatus.SUCCESS.value, - analyse_type, - progress=success_progess), timeout=2, is_ex=True) - except ServiceException as s: - logger.error("图片分析异常,异常编号:{}, 异常描述:{}, requestId:{}", s.code, s.msg, request_id) - put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value, - analyse_type, - s.code, - s.msg), timeout=2) - except Exception: - logger.error("图片分析异常: {}, requestId:{}", format_exc(), request_id) - put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value, - analyse_type, - ExceptionType.SERVICE_INNER_EXCEPTION.value[0], - ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=2) - finally: - if image_thread and image_thread.is_alive(): - clear_queue(image_queue) - put_queue(image_queue, (2, 'stop'), timeout=2) - image_thread.join(120) - clear_queue(image_queue) - - -class ScreenRecordingProcess(Process): - __slots__ = ('_fb_queue', '_context', '_msg', '_analysisType', '_event_queue', '_hb_queue', '_analysisType') - - def __init__(self, *args): - super().__init__() - # 传参 - self._fb_queue, self._context, self._msg, self._analysisType = args - self._event_queue, self._hb_queue, self._pull_queue = Queue(), Queue(), Queue(10) - put_queue(self._fb_queue, - recording_feedback(self._msg["request_id"], RecordingStatus.RECORDING_WAITING.value[0]), - timeout=1, is_ex=True) - self._storage_source = self._context['service']['storage_source'] - def sendEvent(self, result): - put_queue(self._event_queue, result, timeout=2, is_ex=True) - - @staticmethod - def start_pull_stream_thread(msg, context, pull_queue, hb_queue, fb_queue, frame_num): - pullThread = RecordingPullStreamThread(msg, context, pull_queue, hb_queue, fb_queue, frame_num) - pullThread.setDaemon(True) - pullThread.start() - return pullThread - - @staticmethod - def start_hb_thread(fb_queue, hb_queue, request_id): - hb = RecordingHeartbeat(fb_queue, hb_queue, request_id) - hb.setDaemon(True) - hb.start() - return hb - - @staticmethod - def check(start_time, service_timeout, pull_thread, hb_thread, request_id): - if time() - start_time > service_timeout: - logger.error("录屏超时, requestId: {}", request_id) - raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0], - ExceptionType.TASK_EXCUTE_TIMEOUT.value[1]) - - if pull_thread and not pull_thread.is_alive(): - logger.info("录屏拉流线程停止异常, requestId: {}", request_id) - raise Exception("录屏拉流线程异常停止") - if hb_thread and not hb_thread.is_alive(): - logger.info("录屏心跳线程异常停止, requestId: {}", request_id) - raise Exception("录屏心跳线程异常停止") - - def run(self): - msg, context = self._msg, self._context - request_id, push_url = msg['request_id'], msg.get('push_url') - pull_queue, fb_queue, hb_queue, event_queue = self._pull_queue, self._fb_queue, self._hb_queue, \ - self._event_queue - base_dir, env, service_timeout = context['base_dir'], context['env'], int(context["service"]["timeout"]) - pre_path, end_path = '%s/%s%s' % (base_dir, context["video"]["file_path"], now_date_to_str(YMDHMSF)), \ - '%s%s' % (request_id, ".mp4") - orFilePath = '%s%s%s' % (pre_path, "_on_or_", end_path) - pull_thread, hb_thread = None, None - or_write_status, p_push_status = [0, 0], [0, 0] - or_video_file, push_p = None, None - ex = None - try: - # 初始化日志 - init_log(base_dir, env) - # 启动拉流线程 - pull_thread = self.start_pull_stream_thread(msg, context, pull_queue, hb_queue, fb_queue, 25) - hb_thread = self.start_hb_thread(fb_queue, hb_queue, request_id) - start_time = time() - with ThreadPoolExecutor(max_workers=2) as t: - while True: - # 检查拉流线程和心跳线程 - self.check(start_time, service_timeout, pull_thread, hb_thread, request_id) - # 判断是否需要停止录屏 - event_result = get_no_block_queue(event_queue) - if event_result is not None: - cmdStr = event_result.get("command") - # 接收到停止指令 - if 'stop' == cmdStr: - logger.info("录屏任务开始停止, requestId: {}", request_id) - pull_thread.sendEvent({"command": "stop"}) - pull_result = get_no_block_queue(pull_queue) - if pull_result is None: - sleep(1) - continue - if pull_result[0] == 1: - close_all_p(push_p, or_video_file, None, request_id) - pull_thread.sendEvent({"command": "stop"}) - pull_thread.join(180) - raise ServiceException(pull_result[1], pull_result[2]) - elif pull_result[0] == 2: - close_all_p(push_p, or_video_file, None, request_id) - pull_thread.sendEvent({"command": "stop"}) - pull_thread.join(180) - break - elif pull_result[0] == 4: - frame_list, frame_index_list, all_frames = pull_result[1] - if len(frame_list) > 0: - for i, frame in enumerate(frame_list): - if frame_index_list[i] % 300 == 0 and frame_index_list[i] < all_frames: - task_process = "%.2f" % (float(frame_index_list[i]) / float(all_frames)) - put_queue(hb_queue, {"progress": task_process}, timeout=1) - write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file, - or_write_status, request_id) - if push_url is not None and len(push_url) > 0: - push_p_result = t.submit(push_video_stream, frame, push_p, push_url, p_push_status, - request_id) - push_p = push_p_result.result() - or_video_file = write_or_video_result.result() - else: - raise Exception("未知拉流状态异常!") - logger.info("录屏线程任务完成,requestId:{}", self._msg.get("request_id")) - except ServiceException as s: - logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, self._msg.get("request_id")) - ex = s.code, s.msg - except Exception: - logger.error("服务异常: {}, requestId: {},", format_exc(), self._msg.get("request_id")) - ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1] - finally: - or_url = "" - exn = None - try: - # 关闭推流管道, 原视频写流客户端 - close_all_p(push_p, or_video_file, None, request_id) - # 关闭拉流线程 - if pull_thread and pull_thread.is_alive(): - pull_thread.sendEvent({"command": "stop_ex"}) - pull_thread.sendEvent({"command": "stop"}) - pull_thread.join(120) - # 判断是否需要上传视频 - if exists(orFilePath) and getsize(orFilePath) > 100: - or_url = self.upload_video(base_dir, env, request_id, orFilePath) - if or_url is None or len(or_url) == 0: - logger.error("原视频或AI视频播放上传VOD失败!, requestId: {}", request_id) - raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0], - ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1]) - # 停止心跳线程 - if hb_thread and hb_thread.is_alive(): - put_queue(hb_queue, {"command": "stop"}, timeout=10, is_ex=False) - hb_thread.join(timeout=120) - if exists(orFilePath): - logger.info("开始删除原视频, orFilePath: {}, requestId: {}", orFilePath, request_id) - os.remove(orFilePath) - logger.info("删除原视频成功, orFilePath: {}, requestId: {}", orFilePath, request_id) - # 如果有异常, 检查是否有原视频和AI视频,有则上传,响应失败 - if ex: - code, msg = ex - put_queue(fb_queue, recording_feedback(request_id, RecordingStatus.RECORDING_FAILED.value[0], - error_code=code, - error_msg=msg, - video_url=or_url), timeout=10, is_ex=False) - else: - if or_url is None or len(or_url) == 0: - raise ServiceException(ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[0], - ExceptionType.PUSH_STREAM_TIME_EXCEPTION.value[1]) - put_queue(fb_queue, recording_feedback(request_id, RecordingStatus.RECORDING_SUCCESS.value[0], - progress=success_progess, - video_url=or_url), timeout=10, is_ex=False) - except ServiceException as s: - exn = s.code, s.msg - except Exception: - logger.error("异常:{}, requestId: {}", format_exc(), request_id) - exn = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1] - finally: - if pull_thread and pull_thread.is_alive(): - pull_thread.sendEvent({"command": "stop"}) - pull_thread.join(120) - if hb_thread and hb_thread.is_alive(): - put_queue(hb_queue, {"command": "stop"}, timeout=10, is_ex=False) - hb_thread.join(timeout=120) - self.clear_queue_end() - if exn: - code, msg = exn - put_queue(fb_queue, recording_feedback(request_id, RecordingStatus.RECORDING_FAILED.value[0], - error_code=code, - error_msg=msg, - video_url=or_url), timeout=10, is_ex=False) - - def clear_queue_end(self): - clear_queue(self._event_queue) - clear_queue(self._hb_queue) - clear_queue(self._pull_queue) - - - - - def upload_video(self,base_dir, env, request_id, orFilePath): - if self._storage_source==1: - minioSdk = MinioSdk(base_dir, env, request_id ) - upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "%s/ai_online.mp4" % request_id) - else: - aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) - upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id) - - upload_video_thread_ai.setDaemon(True) - upload_video_thread_ai.start() - or_url = upload_video_thread_ai.get_result() - return or_url - ''' - @staticmethod - def upload_video(base_dir, env, request_id, orFilePath): - aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id) - upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, orFilePath, "or_online_%s" % request_id) - upload_video_thread_ai.setDaemon(True) - upload_video_thread_ai.start() - or_url = upload_video_thread_ai.get_result() - return or_url - ''' - -""" -"models": [{ - "code": "模型编号", - "categories":[{ - "id": "模型id", - "config": { - "k1": "v1", - "k2": "v2" - } - }] -}] -""" - - -def get_model(msg, context, analyse_type): - # 初始变量 - request_id, base_dir, gpu_name, env = msg["request_id"], context["base_dir"], context["gpu_name"], context["env"] - models, model_num_limit = msg["models"], context["service"]["model"]['limit'] - try: - # 实时、离线元组 - analyse_type_tuple = (AnalysisType.ONLINE.value, AnalysisType.OFFLINE.value) - # (实时、离线)检查模型组合, 目前只支持3个模型组合 - if analyse_type in analyse_type_tuple: - if len(models) > model_num_limit: - raise ServiceException(ExceptionType.MODEL_GROUP_LIMIT_EXCEPTION.value[0], - ExceptionType.MODEL_GROUP_LIMIT_EXCEPTION.value[1]) - modelArray, codeArray = [], set() - for model in models: - # 模型编码 - code = model["code"] - # 检验code是否重复 - if code in codeArray: - raise ServiceException(ExceptionType.MODEL_DUPLICATE_EXCEPTION.value[0], - ExceptionType.MODEL_DUPLICATE_EXCEPTION.value[1]) - codeArray.add(code) - # 检测目标数组 - needed_objectsIndex = list(set([int(category["id"]) for category in model["categories"]])) - logger.info("模型编号: {}, 检查目标: {}, requestId: {}", code, needed_objectsIndex, request_id) - model_method = MODEL_CONFIG.get(code) - if model_method is None: - logger.error("未匹配到对应的模型, requestId:{}", request_id) - raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0], - ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[1]) - # 检查cpu资源、gpu资源 - check_cpu(base_dir, request_id) - gpu_ids = check_gpu_resource(request_id) - # 如果实时识别、离线识别 - if analyse_type in analyse_type_tuple: - if model["is_video"] == "1": - mod = model_method[0](gpu_ids[0], needed_objectsIndex, request_id, gpu_name, base_dir, env) - modelArray.append((mod.model_conf, code)) - else: - raise ServiceException(ExceptionType.MODEL_NOT_SUPPORT_VIDEO_EXCEPTION.value[0], - ExceptionType.MODEL_NOT_SUPPORT_VIDEO_EXCEPTION.value[1], - model_method[1].value[2]) - # 如果是图片识别 - if analyse_type == AnalysisType.IMAGE.value: - if model["is_image"] == "1": - mod = model_method[0](gpu_ids[0], needed_objectsIndex, request_id, gpu_name, base_dir, env) - modelArray.append((mod.model_conf, code)) - else: - raise ServiceException(ExceptionType.MODEL_NOT_SUPPORT_IMAGE_EXCEPTION.value[0], - ExceptionType.MODEL_NOT_SUPPORT_IMAGE_EXCEPTION.value[1], - model_method[1].value[2]) - if len(modelArray) == 0: - raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0], - ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[1]) - return modelArray - except ServiceException as s: - raise s - except Exception: - logger.error("模型配置处理异常: {}, request_id: {}", format_exc(), request_id) - raise ServiceException(ExceptionType.MODEL_LOADING_EXCEPTION.value[0], - ExceptionType.MODEL_LOADING_EXCEPTION.value[1]) diff --git a/config/logger/dsp_prod_logger.yml b/config/logger/dsp_prod_logger.yml index 1e8baed..838d39a 100644 --- a/config/logger/dsp_prod_logger.yml +++ b/config/logger/dsp_prod_logger.yml @@ -5,6 +5,6 @@ log_name: "dsp.log" log_fmt: "{time:YYYY-MM-DD HH:mm:ss.SSS} [{level}][{process.name}-{process.id}-{thread.name}-{thread.id}][{line}] {module}-{function} - {message}" level: "INFO" rotation: "00:00" -retention: "7 days" +retention: "15 days" encoding: "utf8" diff --git a/config/logger/dsp_test_logger.yml b/config/logger/dsp_test_logger.yml index 2a2e706..1e8baed 100644 --- a/config/logger/dsp_test_logger.yml +++ b/config/logger/dsp_test_logger.yml @@ -5,6 +5,6 @@ log_name: "dsp.log" log_fmt: "{time:YYYY-MM-DD HH:mm:ss.SSS} [{level}][{process.name}-{process.id}-{thread.name}-{thread.id}][{line}] {module}-{function} - {message}" level: "INFO" rotation: "00:00" -retention: "3 days" +retention: "7 days" encoding: "utf8" diff --git a/readme.md b/readme.md deleted file mode 100644 index 58bcefa..0000000 --- a/readme.md +++ /dev/null @@ -1,11 +0,0 @@ -1.2025.01.21把之前的tuoheng alg仓库代码重新开个仓库 - (1)在config/service/dsp_test_service.yml里面添加参数,控制存储用的oss还是minio - storage_source: 1 -2.2025.02.06 - (1)修改代码,把mqtt读取加入到系统中。config/service/dsp_test_service.yml,中添加mqtt_flag,决定是否启用。 - (2)修改了minio情况下的,文件名命名方式。 -3.2025.02.12 - (1)增加了对alg算法开发的代码。可以通过配置文件config/service/dsp_test_service.yml中algSwitch: true,决定是否启用。 - -4、2025.07.10 -周树亮 - 增加人群计数,自研车牌模型,裸土覆盖3个场景 diff --git a/t.txt b/requirements.txt similarity index 100% rename from t.txt rename to requirements.txt diff --git a/service/Dispatcher-raw.py b/service/Dispatcher-raw.py deleted file mode 100644 index 013952c..0000000 --- a/service/Dispatcher-raw.py +++ /dev/null @@ -1,507 +0,0 @@ -# -*- coding: utf-8 -*- -import time,os -from os.path import join -from traceback import format_exc -import json -from cerberus import Validator - -from common.Constant import ONLINE_START_SCHEMA, ONLINE_STOP_SCHEMA, OFFLINE_START_SCHEMA, OFFLINE_STOP_SCHEMA, \ - IMAGE_SCHEMA, RECORDING_START_SCHEMA, RECORDING_STOP_SCHEMA, PULL2PUSH_START_SCHEMA, PULL2PUSH_STOP_SCHEMA -from common.YmlConstant import service_yml_path, kafka_yml_path -from concurrency.FeedbackThread import FeedbackThread -from concurrency.uploadGPU import uploadGPUinfos -from concurrency.IntelligentRecognitionProcess2 import OnlineIntelligentRecognitionProcess2, \ - OfflineIntelligentRecognitionProcess2, PhotosIntelligentRecognitionProcess2 -from concurrency.Pull2PushStreamProcess import PushStreamProcess -from entity.FeedBack import message_feedback, recording_feedback, pull_stream_feedback -from enums.AnalysisStatusEnum import AnalysisStatus -from enums.AnalysisTypeEnum import AnalysisType -from enums.ExceptionEnum import ExceptionType -from enums.ModelTypeEnum import ModelMethodTypeEnum, ModelType -from enums.RecordingStatusEnum import RecordingStatus -from enums.StatusEnum import PushStreamStatus, ExecuteStatus -from exception.CustomerException import ServiceException -from loguru import logger -from multiprocessing import Queue -from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecognitionProcess, \ - OfflineIntelligentRecognitionProcess, PhotosIntelligentRecognitionProcess, ScreenRecordingProcess -from util.CpuUtils import print_cpu_ex_status -from util.FileUtils import create_dir_not_exist -from util.GPUtils import get_first_gpu_name, print_gpu_ex_status, check_cude_is_available,select_best_server -from util.KafkaUtils import CustomerKafkaConsumer -from util.QueUtil import put_queue -from util.RWUtils import getConfigs -from kafka import KafkaProducer, KafkaConsumer -''' - 分发服务 -''' - - -class DispatcherService: - __slots__ = ('__context', '__feedbackThread', '__listeningProcesses', '__fbQueue', '__topics','__taskType', '__task_type', - '__kafka_config', '__recordingProcesses', '__pull2PushProcesses','__topicsPort','__gpuTopic','__role','__uploadGPUThread','__gpuDics','__producer') - - def __init__(self, base_dir, env): - # 检测cuda是否活动 - check_cude_is_available() - # 获取全局上下文配置 - self.__context = getConfigs(join(base_dir, service_yml_path % env)) - # 创建任务执行, 视频保存路径 - create_dir_not_exist(join(base_dir, self.__context["video"]["file_path"])) - # 将根路径和环境设置到上下文中 - self.__context["base_dir"], self.__context["env"] = base_dir, env - - # 问题反馈线程 - self.__feedbackThread,self.__uploadGPUThread, self.__fbQueue = None,None, Queue() - # 实时、离线、图片任务进程字典 - self.__listeningProcesses = {} - # 录屏任务进程字典 - self.__recordingProcesses = {} - # 转推流任务进程字典 - self.__pull2PushProcesses = {} - self.__kafka_config = getConfigs(join(base_dir, kafka_yml_path % env)) - - self.__producer = KafkaProducer( - bootstrap_servers=self.__kafka_config['bootstrap_servers'],#tencent yun - value_serializer=lambda v: v.encode('utf-8')) - - self.__gpuDics = { }#用于存储gpu信息的字典 - self.__role = self.__context["role"] - self.__topics = [ - self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"], # 实时监听topic - self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"], # 离线监听topic - self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"], # 图片监听topic - self.__kafka_config["topic"]["dsp-recording-task-topic"], # 录屏监听topic - self.__kafka_config["topic"]["dsp-push-stream-task-topic"] # 推流监听topic - ] - - self.__topicsPort = [ - self.__kafka_config["topicPort"]["dsp-alg-online-tasks-topic"], # 实时监听topic - self.__kafka_config["topicPort"]["dsp-alg-offline-tasks-topic"], # 离线监听topic - self.__kafka_config["topicPort"]["dsp-alg-image-tasks-topic"], # 图片监听topic - self.__kafka_config["topicPort"]["dsp-recording-task-topic"], # 录屏监听topic - self.__kafka_config["topicPort"]["dsp-push-stream-task-topic"] # 推流监听topic - ] - self.__gpuTopic = [self.__kafka_config["topicGPU"]] - - if self.__role==1: - self.__topics = self.__topics + self.__topicsPort + self.__gpuTopic - - - # 对应topic的各个lambda表达式 - self.__task_type = { - self.__topics[0]: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y), - lambda x, y, z: self.identify_method(x, y, z)), - self.__topics[1]: (AnalysisType.OFFLINE.value, lambda x, y: self.offline(x, y), - lambda x, y, z: self.identify_method(x, y, z)), - self.__topics[2]: (AnalysisType.IMAGE.value, lambda x, y: self.image(x, y), - lambda x, y, z: self.identify_method(x, y, z)), - self.__topics[3]: (AnalysisType.RECORDING.value, lambda x, y: self.recording(x, y), - lambda x, y, z: self.recording_method(x, y, z)), - self.__topics[4]: (AnalysisType.PULLTOPUSH.value, lambda x, y: self.pullStream(x, y), - lambda x, y, z: self.push_stream_method(x, y, z)) - - } - self.__taskType={ - self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"]:0, # 实时监听topic - self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"]:1, # 离线监听topic - self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"]:2, # 图片监听topic - self.__kafka_config["topic"]["dsp-recording-task-topic"]:3, # 录屏监听topic - self.__kafka_config["topic"]["dsp-push-stream-task-topic"]:4 # 推流监听topic - } - gpu_name_array = get_first_gpu_name() - gpu_array = [g for g in ('3090', '2080', '4090', 'A10') if g in gpu_name_array] - gpu_name = '2080Ti' - if len(gpu_array) > 0: - if gpu_array[0] != '2080': - gpu_name = gpu_array[0] - else: - raise Exception("GPU资源不在提供的模型所支持的范围内!请先提供对应的GPU模型!") - logger.info("当前服务环境为: {}, 服务器GPU使用型号: {}", env, gpu_name) - self.__context["gpu_name"] = gpu_name - self.start_service() - - # 服务调用启动方法 - def start_service(self): - # 初始化kafka监听者 - customerKafkaConsumer = CustomerKafkaConsumer(self.__kafka_config, topics=self.__topics) - ####增加一个线程,用于试试监控和发送gpu状态#### - #### - logger.info("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 服务器IP:{}".format(self.__kafka_config['bootstrap_servers'] )) - while True: - try: - # 检查任务进程运行情况,去除结束的任务 - self.check_process_task() - # 启动反馈线程 - self.start_feedback_thread() - self.start_uploadGPU_thread() - msg = customerKafkaConsumer.poll() - if msg is not None and len(msg) > 0: - for k, v in msg.items(): - for m in v: - message = m.value - #如果收到的信息是gpu状态的话,收到信息后,更新自己的gpu服务器状态,下面不再执行 - if m.topic in self.__gpuTopic: - customerKafkaConsumer.commit_offset(m,'x'*16,False) - #更新机器资源现状 - ip = message['System']['Local IP Address'] - self.__gpuDics[ip]=message - continue - #如果收到的信息是门户消息,收到信息后,要根据Gpu状态,转发到对应的机器。 - elif m.topic in self.__topicsPort: - customerKafkaConsumer.commit_offset(m, 'y'*16) - #状态分析 - #recondGpu={'hostname':'thsw2','IP':'192.168.10.66','gpuId':0} - recondGpu= select_best_server(self.__gpuDics) - if recondGpu is None: - print( 'recondGpu:',recondGpu, ' self.__gpuDics: ',self.__gpuDics,' topic:',m.topic, ' message:',message ) - continue - #转发消息 - message['transmit_topic'] = m.topic + '-' + recondGpu['IP'] - transmitMsg={'transmit':message} - msg_json = json.dumps( message ) - future = self.__producer.send( message['transmit_topic'] ,msg_json) - try: - future.get(timeout=2) - logger.info( "转发消息成功,消息topic:{},消息内容:{}",message['transmit_topic'],message ) - except kafka_errors as e: - print('------transmitted error:',e) - logger.info("转发消息失败") - traceback.format_exc() - else: - requestId = message.get("request_id") - if requestId is None: - logger.error("请求参数格式错误, 请检查请求体格式是否正确!message:%s"%(message)) - continue - customerKafkaConsumer.commit_offset(m, requestId) - logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}", - m.topic, m.offset, m.partition, message, requestId) - - message['taskType']=self.__taskType[m.topic] - topic_method = self.__task_type[m.topic] - topic_method[2](topic_method[1], message, topic_method[0]) - else: - print_gpu_ex_status() - print_cpu_ex_status(self.__context["base_dir"]) - time.sleep(1) - except Exception: - logger.error("主线程异常:{}", format_exc()) - - def identify_method(self, handle_method, message, analysisType): - try: - check_cude_is_available() - handle_method(message, analysisType) - except ServiceException as s: - logger.error("消息监听异常:{}, requestId: {}", s.msg, message["request_id"]) - put_queue(self.__fbQueue, message_feedback(message["request_id"], AnalysisStatus.FAILED.value, analysisType, - s.code, s.msg), timeout=1) - except Exception: - logger.error("消息监听异常:{}, requestId: {}", format_exc(), message["request_id"]) - put_queue(self.__fbQueue, message_feedback(message["request_id"], AnalysisStatus.FAILED.value, analysisType, - ExceptionType.SERVICE_INNER_EXCEPTION.value[0], - ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=1) - finally: - del message - - def push_stream_method(self, handle_method, message, analysisType): - try: - check_cude_is_available() - handle_method(message, analysisType) - except ServiceException as s: - logger.error("消息监听异常:{}, requestId: {}", s.msg, message['request_id']) - videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.FAILED.value[0]} for url in - message.get("video_urls", []) if url.get("id") is not None] - put_queue(self.__fbQueue, pull_stream_feedback(message['request_id'], ExecuteStatus.FAILED.value[0], - s.code, s.msg, videoInfo), timeout=1) - except Exception: - logger.error("消息监听异常:{}, requestId: {}", format_exc(), message['request_id']) - videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.FAILED.value[0]} for url in - message.get("video_urls", []) if url.get("id") is not None] - put_queue(self.__fbQueue, pull_stream_feedback(message.get("request_id"), ExecuteStatus.FAILED.value[0], - ExceptionType.SERVICE_INNER_EXCEPTION.value[0], - ExceptionType.SERVICE_INNER_EXCEPTION.value[1], videoInfo), - timeout=1) - finally: - del message - - def recording_method(self, handle_method, message, analysisType): - try: - check_cude_is_available() - handle_method(message, analysisType) - except ServiceException as s: - logger.error("消息监听异常:{}, requestId: {}", s.msg, message["request_id"]) - put_queue(self.__fbQueue, - recording_feedback(message["request_id"], RecordingStatus.RECORDING_FAILED.value[0], - error_code=s.code, error_msg=s.msg), timeout=1) - except Exception: - logger.error("消息监听异常:{}, requestId: {}", format_exc(), message["request_id"]) - put_queue(self.__fbQueue, - recording_feedback(message["request_id"], RecordingStatus.RECORDING_FAILED.value[0], - ExceptionType.SERVICE_INNER_EXCEPTION.value[0], - ExceptionType.SERVICE_INNER_EXCEPTION.value[1]), timeout=1) - finally: - del message - - # 开启实时进程 - def startOnlineProcess(self, msg, analysisType): - if self.__listeningProcesses.get(msg["request_id"]): - logger.warning("实时重复任务,请稍后再试!requestId:{}", msg["request_id"]) - return - model_type = self.__context["service"]["model"]["model_type"] - codes = [model.get("code") for model in msg["models"] if model.get("code")] - if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes: - coir = OnlineIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context) - else: - coir = OnlineIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context) - coir.start() - logger.info("开始实时进程!requestId:{},pid:{}, ppid:{}", msg["request_id"],os.getpid(),os.getppid()) - self.__listeningProcesses[msg["request_id"]] = coir - - # 结束实时进程 - def stopOnlineProcess(self, msg): - ps = self.__listeningProcesses.get(msg["request_id"]) - if ps is None: - logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) - return - ps.sendEvent({"command": "stop"}) - - # 新增该函数用于,向子任务发送命令(algStart,algStop) - def sendCmdToChildProcess(self, msg,cmd="algStart"): - ps = self.__listeningProcesses.get(msg["request_id"]) - if ps is None: - logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) - return - ps.sendEvent({"command": cmd}) - - @staticmethod - def check_process(listeningProcess): - for requestId in list(listeningProcess.keys()): - if not listeningProcess[requestId].is_alive(): - del listeningProcess[requestId] - - def check_process_task(self): - self.check_process(self.__listeningProcesses) - self.check_process(self.__recordingProcesses) - self.check_process(self.__pull2PushProcesses) - - # 开启离线进程 - def startOfflineProcess(self, msg, analysisType): - if self.__listeningProcesses.get(msg["request_id"]): - logger.warning("离线重复任务,请稍后再试!requestId:{}", msg["request_id"]) - return - model_type = self.__context["service"]["model"]["model_type"] - codes = [model.get("code") for model in msg["models"] if model.get("code")] - if ModelMethodTypeEnum.NORMAL.value == model_type: - first = OfflineIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context) - else: - first = OfflineIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context) - first.start() - self.__listeningProcesses[msg["request_id"]] = first - - # 结束离线进程 - def stopOfflineProcess(self, msg): - ps = self.__listeningProcesses.get(msg["request_id"]) - if ps is None: - logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) - return - ps.sendEvent({"command": "stop"}) - - # 开启图片分析进程 - def startImageProcess(self, msg, analysisType): - pp = self.__listeningProcesses.get(msg["request_id"]) - if pp is not None: - logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"]) - return - model_type = self.__context["service"]["model"]["model_type"] - codes = [model.get("code") for model in msg["models"] if model.get("code")] - if ModelMethodTypeEnum.NORMAL.value == model_type or ModelType.ILLPARKING_MODEL.value[1] in codes: - imaged = PhotosIntelligentRecognitionProcess(self.__fbQueue, msg, analysisType, self.__context) - else: - imaged = PhotosIntelligentRecognitionProcess2(self.__fbQueue, msg, analysisType, self.__context) - # 创建在线识别进程并启动 - imaged.start() - self.__listeningProcesses[msg["request_id"]] = imaged - - ''' - 校验kafka消息 - ''' - - @staticmethod - def check_msg(msg, schema): - try: - v = Validator(schema, allow_unknown=True) - result = v.validate(msg) - if not result: - logger.error("参数校验异常: {}, requestId: {}", v.errors, msg["request_id"]) - raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], - ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) - except ServiceException as s: - raise s - except Exception: - logger.error("参数校验异常: {}, requestId: {}", format_exc(), msg["request_id"]) - raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], - ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) - - ''' - 开启反馈线程,用于发送消息 - ''' - - def start_feedback_thread(self): - if self.__feedbackThread is None: - self.__feedbackThread = FeedbackThread(self.__fbQueue, self.__kafka_config) - self.__feedbackThread.setDaemon(True) - self.__feedbackThread.start() - time.sleep(1) - if self.__feedbackThread and not self.__feedbackThread.is_alive(): - logger.error("反馈线程异常停止, 开始重新启动反馈线程!!!!!") - self.__feedbackThread = FeedbackThread(self.__fbQueue, self.__kafka_config) - self.__feedbackThread.setDaemon(True) - self.__feedbackThread.start() - time.sleep(1) - - def start_uploadGPU_thread(self): - if self.__uploadGPUThread is None: - self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config) - self.__uploadGPUThread.setDaemon(True) - self.__uploadGPUThread.start() - time.sleep(1) - if self.__uploadGPUThread and not self.__uploadGPUThread.is_alive(): - logger.error("反馈线程异常停止, 开始重新启动反馈线程!!!!!") - self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config) - self.__uploadGPUThread.setDaemon(True) - self.__uploadGPUThread.start() - time.sleep(1) - - ''' - 在线分析逻辑 - ''' - - def online0(self, message, analysisType): - if "start" == message.get("command"): - self.check_msg(message, ONLINE_START_SCHEMA) - if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]): - raise ServiceException(ExceptionType.NO_RESOURCES.value[0], - ExceptionType.NO_RESOURCES.value[1]) - self.startOnlineProcess(message, analysisType) - elif message.get("command") in ["algStart","algStop"]: - self.sendCmdToChildProcess(message,cmd=message.get("command")) - elif "stop" == message.get("command"): - self.check_msg(message, ONLINE_STOP_SCHEMA) - self.stopOnlineProcess(message) - else: - raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], - ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) - - - def online(self, message, analysisType): - if "start" == message.get("command"): - self.check_msg(message, ONLINE_START_SCHEMA) - if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]): - raise ServiceException(ExceptionType.NO_RESOURCES.value[0], - ExceptionType.NO_RESOURCES.value[1]) - self.startOnlineProcess(message, analysisType) - - elif message.get("command") in ["algStart","algStop"]: - - if message.get("defaultEnabled",True): - self.sendCmdToChildProcess(message,cmd=message.get("command")) - - - elif "stop" == message.get("command"): - self.check_msg(message, ONLINE_STOP_SCHEMA) - self.stopOnlineProcess(message) - else: - raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], - ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) - - - - - def offline(self, message, analysisType): - if "start" == message.get("command"): - self.check_msg(message, OFFLINE_START_SCHEMA) - if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["limit"]): - raise ServiceException(ExceptionType.NO_RESOURCES.value[0], - ExceptionType.NO_RESOURCES.value[1]) - self.startOfflineProcess(message, analysisType) - elif message.get("command") in ["algStart","algStop"]: - self.sendCmdToChildProcess( message,cmd=message.get("command")) - elif "stop" == message.get("command"): - self.check_msg(message, OFFLINE_STOP_SCHEMA) - self.stopOfflineProcess(message) - else: - raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], - ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) - - def image(self, message, analysisType): - if "start" == message.get("command"): - self.check_msg(message, IMAGE_SCHEMA) - if len(self.__listeningProcesses) >= int(self.__context['service']["task"]["image"]["limit"]): - raise ServiceException(ExceptionType.NO_RESOURCES.value[0], - ExceptionType.NO_RESOURCES.value[1]) - self.startImageProcess(message, analysisType) - else: - raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], - ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) - - def recording(self, message, analysisType): - if "start" == message.get("command"): - self.check_msg(message, RECORDING_START_SCHEMA) - if len(self.__recordingProcesses) >= int(self.__context['service']["task"]["limit"]): - raise ServiceException(ExceptionType.NO_RESOURCES.value[0], - ExceptionType.NO_RESOURCES.value[1]) - self.startRecordingProcess(message, analysisType) - elif "stop" == message.get("command"): - self.check_msg(message, RECORDING_STOP_SCHEMA) - self.stopRecordingProcess(message) - else: - raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], - ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) - - # 开启录屏进程 - def startRecordingProcess(self, msg, analysisType): - if self.__listeningProcesses.get(msg["request_id"]): - logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"]) - return - srp = ScreenRecordingProcess(self.__fbQueue, self.__context, msg, analysisType) - srp.start() - self.__recordingProcesses[msg["request_id"]] = srp - - # 结束录屏进程 - def stopRecordingProcess(self, msg): - rdp = self.__recordingProcesses.get(msg["request_id"]) - if rdp is None: - logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) - return - rdp.sendEvent({"command": "stop"}) - - def pullStream(self, message, analysisType): - if "start" == message.get("command"): - self.check_msg(message, PULL2PUSH_START_SCHEMA) - if len(self.__pull2PushProcesses) >= int(self.__context['service']["task"]["limit"]): - raise ServiceException(ExceptionType.NO_RESOURCES.value[0], - ExceptionType.NO_RESOURCES.value[1]) - - self.startPushStreamProcess(message, analysisType) - elif "stop" == message.get("command"): - self.check_msg(message, PULL2PUSH_STOP_SCHEMA) - self.stopPushStreamProcess(message) - else: - raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], - ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) - - def startPushStreamProcess(self, msg, analysisType): - if self.__pull2PushProcesses.get(msg["request_id"]): - logger.warning("重复任务,请稍后再试!requestId:{}", msg["request_id"]) - return - srp = PushStreamProcess(self.__fbQueue, self.__context, msg, analysisType) - srp.start() - self.__pull2PushProcesses[msg["request_id"]] = srp - - # 结束录屏进程 - def stopPushStreamProcess(self, msg): - srp = self.__pull2PushProcesses.get(msg["request_id"]) - if srp is None: - logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) - return - srp.sendEvent({"command": "stop", "videoIds": msg.get("video_ids", [])})