diff --git a/concurrency/FileUploadThread.py b/concurrency/FileUploadThread.py index 6147ce7..069f921 100644 --- a/concurrency/FileUploadThread.py +++ b/concurrency/FileUploadThread.py @@ -16,6 +16,7 @@ from enums.AnalysisStatusEnum import AnalysisStatus from util.PlotsUtils import draw_painting_joint from util.QueUtil import put_queue, get_no_block_queue, clear_queue import io +from util.LocationUtils import locate_byMqtt class FileUpload(Thread): __slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg','_mqtt_list') @@ -24,7 +25,9 @@ class FileUpload(Thread): super().__init__() self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type,self._mqtt_list = args self._storage_source = self._context['service']['storage_source'] - + self._algStatus = False # 默认关闭 + self._algSwitch = self._context['service']['algSwitch'] +#如果任务是在线、离线处理,则用此类 class ImageFileUpload(FileUpload): __slots__ = () @@ -36,15 +39,14 @@ class ImageFileUpload(FileUpload): ''' det_xywh:{ 'code':{ - 1: [[detect_targets_code, box, score, label_array, color]] + 1: [[detect_targets_code, box, score, label_array, color,is_new]] } - } + } #is_new--是否是新的目标。 模型编号:modeCode 检测目标:detectTargetCode ''' - #print('*'*100,' mqtt_list:',len(self._mqtt_list)) - #if len(self._mqtt_list)>=10: - # print(' mqtt[4]:',self._mqtt_list[0]['satcount']) + print('*'*100,' mqtt_list:',len(self._mqtt_list)) + model_info = [] # 更加模型编码解析数据 @@ -55,7 +57,14 @@ class ImageFileUpload(FileUpload): aFrame = frame.copy() for target in target_list: draw_painting_joint(target[1], aFrame, target[3], target[2], target[4], font_config, target[5]) - model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame}) + igH,igW = aFrame.shape[0:2] + if len(self._mqtt_list)>=1: + #camParas = self._mqtt_list[0]['data'] + camParas = self._mqtt_list[0] + gps = locate_byMqtt(target[1],igW,igH,camParas,outFormat='wgs84') + else: + gps=[None,None] + model_info.append({"modelCode": str(code), "detectTargetCode": str(cls), "aFrame": aFrame,'gps':gps}) if len(model_info) > 0: image_result = { "or_frame": frame, @@ -74,6 +83,8 @@ class ImageFileUpload(FileUpload): image_queue, fb_queue, analyse_type = self._image_queue, self._fb_queue, self._analyse_type service_timeout = int(service["timeout"]) frame_step = int(service["filter"]["frame_step"]) + 120 + if msg['taskType']==0: self._algStatus = False + else: self._algStatus = True try: with ThreadPoolExecutor(max_workers=2) as t: # 初始化oss客户端 @@ -92,10 +103,15 @@ class ImageFileUpload(FileUpload): # 获取队列中的消息 image_msg = get_no_block_queue(image_queue) if image_msg is not None: + if image_msg[0] == 2: + logger.info("图片上传线程收到命令:{}, requestId: {}",image_msg[1] ,request_id) if 'stop' == image_msg[1]: logger.info("开始停止图片上传线程, requestId:{}", request_id) break + if 'algStart' == image_msg[1]: self._algStatus = True; logger.info("图片上传线程,执行算法开启命令, requestId:{}", request_id) + if 'algStop' == image_msg[1]: self._algStatus = False; logger.info("图片上传线程,执行算法关闭命令, requestId:{}", request_id) + if image_msg[0] == 1: image_result = self.handle_image(image_msg[1], frame_step) if image_result is not None: @@ -147,10 +163,14 @@ class ImageFileUpload(FileUpload): remote_image_list[0], remote_image_list[ii+1], model_info['modelCode'], - model_info['detectTargetCode']) ) - - for msg in msg_list: - put_queue(fb_queue, msg, timeout=2, is_ex=False) + model_info['detectTargetCode'], + longitude=model_info['gps'][0], + latitude=model_info['gps'][1], + ) ) + + if (not self._algSwitch) or ( self._algStatus and self._algSwitch): + for msg in msg_list: + put_queue(fb_queue, msg, timeout=2, is_ex=False) del task, msg_list else: sleep(1) @@ -175,7 +195,7 @@ def build_image_name(*args): random_num, mode_type, modeCode, target, image_type) - +#如果任务是图像处理,则用此类 class ImageTypeImageFileUpload(Thread): __slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg') @@ -271,6 +291,7 @@ class ImageTypeImageFileUpload(Thread): else: image_result = self.handle_image(det_xywh, copy_frame, font_config) if image_result: + # 图片帧数编码 if image_url is None: or_result, or_image = cv2.imencode(".jpg", image_result.get("or_frame")) @@ -345,8 +366,8 @@ class ImageTypeImageFileUpload(Thread): analyse_type, "", "", "", image_url, remote_url_list[ii], - model_info.get('modelCode'), - model_info.get('detectTargetCode'), + model_info_list[ii].get('modelCode'), + model_info_list[ii].get('detectTargetCode'), analyse_results=result)) diff --git a/concurrency/IntelligentRecognitionProcess.py b/concurrency/IntelligentRecognitionProcess.py index 6a94672..7c40632 100644 --- a/concurrency/IntelligentRecognitionProcess.py +++ b/concurrency/IntelligentRecognitionProcess.py @@ -5,7 +5,7 @@ from concurrent.futures import ThreadPoolExecutor from os.path import join, exists, getsize from time import time, sleep from traceback import format_exc - +import requests import cv2 from multiprocessing import Process, Queue @@ -63,7 +63,7 @@ class IntelligentRecognitionProcess(Process): put_queue(self._fb_queue, message_feedback(self._msg["request_id"], AnalysisStatus.WAITING.value, self._analyse_type, progress=init_progess), timeout=2, is_ex=True) self._storage_source = self._context['service']['storage_source'] - + self._algStatus = False def sendEvent(self, eBody): put_queue(self.event_queue, eBody, timeout=2, is_ex=True) @@ -276,7 +276,12 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): if event_result: cmdStr = event_result.get("command") - + #接收到算法开启、或者关闭的命令 + if cmdStr in ['algStart' , 'algStop' ]: + logger.info("发送向推流进程发送算法命令, requestId: {}, {}", request_id,cmdStr ) + put_queue(push_queue, (2, cmdStr), timeout=1, is_ex=True) + pull_process.sendCommand({"command": cmdStr}) + # 接收到停止指令 if "stop" == cmdStr: logger.info("实时任务开始停止, requestId: {}", request_id) @@ -593,6 +598,11 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess): if "stop" == cmdStr: logger.info("离线任务开始停止, requestId: {}", request_id) pull_process.sendCommand({"command": 'stop'}) + if cmdStr in ['algStart' , 'algStop' ]: + logger.info("发送向推流进程发送算法命令, requestId: {}, {}", request_id,cmdStr ) + put_queue(push_queue, (2, cmdStr), timeout=1, is_ex=True) + pull_process.sendCommand({"command": cmdStr}) + pull_result = get_no_block_queue(pull_queue) if pull_result is None: sleep(1) @@ -1104,16 +1114,43 @@ class PhotosIntelligentRecognitionProcess(Process): image_thread.setDaemon(True) image_thread.start() return image_thread - + def check_ImageUrl_Vaild(self,url,timeout=1): + try: + # 发送 HTTP 请求,尝试访问图片 + response = requests.get(url, timeout=timeout) # 设置超时时间为 10 秒 + if response.status_code == 200: + return True,url + else: + return False,f"图片地址无效,状态码:{response.status_code}" + except requests.exceptions.RequestException as e: + # 捕获请求过程中可能出现的异常(如网络问题、超时等) + return False,str(e) + def run(self): fb_queue, msg, analyse_type, context = self._fb_queue, self._msg, self._analyse_type, self._context request_id, logo, image_queue = msg["request_id"], context['logo'], self._image_queue base_dir, env = context["base_dir"], context["env"] imageUrls = msg["image_urls"] image_thread = None - with ThreadPoolExecutor(max_workers=2) as t: + init_log(base_dir, env) + valFlag=True + for url in imageUrls: + valFlag,ret = self.check_ImageUrl_Vaild(url,timeout=1) + + if not valFlag: + logger.error("图片分析异常: {}, requestId:{},url:{}",ret, request_id,url) + #print("AnalysisStatus.FAILED.value:{},ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[0]:{},ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[1]:{}".format(AnalysisStatus.FAILED.value,ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[0],ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[1] ) ) + put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value, + analyse_type, + ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[0], + ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[1]), timeout=2) + + return + + + with ThreadPoolExecutor(max_workers=1) as t: try: - init_log(base_dir, env) + #init_log(base_dir, env) logger.info("开始启动图片识别进程, requestId: {}", request_id) model_array = get_model(msg, context, analyse_type) image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type) diff --git a/concurrency/PullVideoStreamProcess.py b/concurrency/PullVideoStreamProcess.py index d87c779..54fe030 100644 --- a/concurrency/PullVideoStreamProcess.py +++ b/concurrency/PullVideoStreamProcess.py @@ -102,6 +102,10 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess): logger.info("开始停止实时拉流进程, requestId:{}", request_id) ex_status = False break + if command_msg.get("command") in ['algStart' , 'algStop' ]: + logger.info("拉流进程中,requestId:{},向图片上传进程发送命令:{}", request_id,command_msg.get("command")) + put_queue(image_queue, (2, command_msg.get("command") ), timeout=1) + # 检测视频信息是否存在或拉流对象是否存在 if check_video_stream(width, height): if len(frame_list) > 0: @@ -249,6 +253,10 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess): logger.info("开始停止离线拉流进程, requestId:{}", request_id) ex_status = False break + if command_msg.get("command") in ['algStart' , 'algStop' ]: + logger.info("拉流进程中,requestId:{},向图片上传进程发送命令:{}", request_id,command_msg.get("command")) + put_queue(image_queue, (2, command_msg.get("command") ), timeout=1) + # 检测视频信息是否存在或拉流对象是否存在 if check_video_stream(width, height): logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id) diff --git a/concurrency/PushVideoStreamProcess.py b/concurrency/PushVideoStreamProcess.py index 6a5850a..6788d61 100644 --- a/concurrency/PushVideoStreamProcess.py +++ b/concurrency/PushVideoStreamProcess.py @@ -35,7 +35,9 @@ class PushStreamProcess(Process): super().__init__() # 传参 self._msg, self._push_queue, self._image_queue, self._push_ex_queue, self._hb_queue, self._context = args - + self._algStatus = False # 默认关闭 + self._algSwitch = self._context['service']['algSwitch'] + def build_logo_url(self): logo = None if self._context["video"]["video_add_water"]: @@ -168,7 +170,11 @@ class OnPushStreamProcess(PushStreamProcess): if len(thread_p) > 0: for r in thread_p: r.result() - frame_merge = video_conjuncing(frame, copy_frame) + #print('----line173:',self._algSwitch,self._algStatus) + if self._algSwitch and (not self._algStatus): + frame_merge = video_conjuncing(frame, frame.copy()) + else: + frame_merge = video_conjuncing(frame, copy_frame) # 写原视频到本地 write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file, or_write_status, request_id) @@ -231,6 +237,9 @@ class OnPushStreamProcess(PushStreamProcess): or_video_file = write_or_video_result.result(timeout=60) # 接收停止指令 if push_r[0] == 2: + logger.info("拉流进程收到控制命令为:{}, requestId: {}",push_r[1] ,request_id) + if 'algStart' == push_r[1]: self._algStatus = True;logger.info("算法识别开启, requestId: {}", request_id) + if 'algStop' == push_r[1]: self._algStatus = False;logger.info("算法识别关闭, requestId: {}", request_id) if 'stop' == push_r[1]: logger.info("停止推流进程, requestId: {}", request_id) break @@ -238,6 +247,7 @@ class OnPushStreamProcess(PushStreamProcess): ex_status = False logger.info("停止推流进程, requestId: {}", request_id) break + del push_r else: sleep(1) @@ -286,6 +296,8 @@ class OffPushStreamProcess(PushStreamProcess): picture_similarity = bool(context["service"]["filter"]["picture_similarity"]) qs_np_tmp = None pix_dis = 60 + if msg['taskType']==0: self._algStatus = False + else: self._algStatus = True try: init_log(base_dir, env) logger.info("开始启动离线推流进程!requestId:{}", request_id) @@ -372,7 +384,11 @@ class OffPushStreamProcess(PushStreamProcess): if len(thread_p) > 0: for r in thread_p: r.result() - frame_merge = video_conjuncing(frame, copy_frame) + print('----line384:',self._algSwitch,self._algStatus) + if self._algSwitch and (not self._algStatus): + frame_merge = video_conjuncing(frame, frame.copy()) + else: + frame_merge = video_conjuncing(frame, copy_frame) # 写识别视频到本地 write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath, ai_video_file, @@ -433,6 +449,9 @@ class OffPushStreamProcess(PushStreamProcess): ai_video_file = write_ai_video_result.result(timeout=60) # 接收停止指令 if push_r[0] == 2: + logger.info("拉流进程收到控制命令为:{}, requestId: {}",push_r[1] ,request_id) + if 'algStart' == push_r[1]: self._algStatus = True;logger.info("算法识别开启, requestId: {}", request_id) + if 'algStop' == push_r[1]: self._algStatus = False;logger.info("算法识别关闭, requestId: {}", request_id) if 'stop' == push_r[1]: logger.info("停止推流进程, requestId: {}", request_id) break diff --git a/concurrency/__pycache__/FileUploadThread.cpython-38.pyc b/concurrency/__pycache__/FileUploadThread.cpython-38.pyc index 0008489..e00f621 100644 Binary files a/concurrency/__pycache__/FileUploadThread.cpython-38.pyc and b/concurrency/__pycache__/FileUploadThread.cpython-38.pyc differ diff --git a/concurrency/__pycache__/IntelligentRecognitionProcess.cpython-38.pyc b/concurrency/__pycache__/IntelligentRecognitionProcess.cpython-38.pyc index e67b41c..013cd6c 100644 Binary files a/concurrency/__pycache__/IntelligentRecognitionProcess.cpython-38.pyc and b/concurrency/__pycache__/IntelligentRecognitionProcess.cpython-38.pyc differ diff --git a/concurrency/__pycache__/PullVideoStreamProcess.cpython-38.pyc b/concurrency/__pycache__/PullVideoStreamProcess.cpython-38.pyc index 9b7facf..51d27cb 100644 Binary files a/concurrency/__pycache__/PullVideoStreamProcess.cpython-38.pyc and b/concurrency/__pycache__/PullVideoStreamProcess.cpython-38.pyc differ diff --git a/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc b/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc index c89267d..c67b250 100644 Binary files a/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc and b/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc differ diff --git a/config/mqtt/dsp_test_mqtt.yml b/config/mqtt/dsp_test_mqtt.yml index aff8717..a8e2310 100644 --- a/config/mqtt/dsp_test_mqtt.yml +++ b/config/mqtt/dsp_test_mqtt.yml @@ -1,6 +1,10 @@ -mqtt_flag: false -broker : "101.133.163.127" +mqtt_flag: true +broker : "58.213.148.44" port : 1883 -topic: "test/topic" +username: "admin" +password: "admin##123" +#topic: "/topic/v1/airportFly/%s/aiDroneData" +topic: "/topic/v1/airportDrone/THJSQ03B2309TPCTD5QV/realTime/data" # 存储多少条消息到list里 length: 10 + diff --git a/config/service/dsp_test_service.yml b/config/service/dsp_test_service.yml index d97bde9..00646a5 100644 --- a/config/service/dsp_test_service.yml +++ b/config/service/dsp_test_service.yml @@ -30,4 +30,6 @@ service: storage_source: 1 #是否启用mqtt,0--不用,1--启用 mqtt_flag: 0 + #是否启用alg控制功能 + algSwitch: false diff --git a/entity/FeedBack.py b/entity/FeedBack.py index 909457e..0ce6cf8 100644 --- a/entity/FeedBack.py +++ b/entity/FeedBack.py @@ -4,7 +4,7 @@ from util.TimeUtils import now_date_to_str def message_feedback(requestId, status, analyse_type, error_code="", error_msg="", progress="", original_url="", - sign_url="", modelCode="", detectTargetCode="", analyse_results="", video_url="", ai_video_url=""): + sign_url="", modelCode="", detectTargetCode="", analyse_results="", video_url="", ai_video_url="",longitude="",latitude=""): if len(analyse_results) > 0: analyse_results = dumps(analyse_results) taskbar = { @@ -23,7 +23,9 @@ def message_feedback(requestId, status, analyse_type, error_code="", error_msg=" "analyse_results": analyse_results, "model_code": modelCode, "detect_targets_code": detectTargetCode, - "analyse_time": now_date_to_str() + "analyse_time": now_date_to_str(), + "longitude":str(longitude), + "latitude":str(latitude), } ] } diff --git a/entity/__pycache__/FeedBack.cpython-38.pyc b/entity/__pycache__/FeedBack.cpython-38.pyc index 4b14cee..0785694 100644 Binary files a/entity/__pycache__/FeedBack.cpython-38.pyc and b/entity/__pycache__/FeedBack.cpython-38.pyc differ diff --git a/enums/ModelTypeEnum.py b/enums/ModelTypeEnum.py index 4154b89..85192f5 100644 --- a/enums/ModelTypeEnum.py +++ b/enums/ModelTypeEnum.py @@ -141,7 +141,7 @@ class ModelType(Enum): 'roadVehicleAngle': 15, 'speedRoadVehicleAngleMax': 75, 'roundness': 1.0, - 'cls': 9, + 'cls': 10, 'vehicleFactor': 0.1, 'confThres': 0.25, 'roadIou': 0.6, @@ -628,7 +628,7 @@ class ModelType(Enum): - FORESTCROWD_FARM_MODEL = ("2", "026", "森林人群模型", 'forestCrowd', lambda device, gpuName: { + FORESTCROWD_FARM_MODEL = ("26", "026", "森林人群模型", 'forestCrowd', lambda device, gpuName: { 'labelnames': ["林斑", "病死树", "行人", "火焰", "烟雾","人群"], 'postProcess':{'function':gather_post_process,'pars':{'pedestrianId':2,'crowdThreshold':4,'gatherId':5,'distancePersonScale':2.0}}, 'models': @@ -700,7 +700,66 @@ class ModelType(Enum): 'Segweights': '../AIlib2/weights/highWay2/stdc_360X640_%s_fp16.engine' % gpuName }) + SMARTSITE_MODEL = ("28", "028", "智慧工地模型", 'smartSite', lambda device, gpuName: { + 'labelnames': [ "工人","塔式起重机","悬臂","起重机","压路机","推土机","挖掘机","卡车","装载机","泵车","混凝土搅拌车","打桩","其他车辆" ], + 'postProcess':{'function':default_mix,'pars':{}}, + 'models': + [ + { + 'weight':"../AIlib2/weights/smartSite/yolov5_%s_fp16.engine"%(gpuName),###检测模型路径 + 'name':'yolov5', + 'model':yolov5Model, + 'par':{ 'half':True,'device':'cuda:0' ,'conf_thres':0.25,'iou_thres':0.45,'allowedList':list(range(20)),'segRegionCnt':1, 'trtFlag_det':True,'trtFlag_seg':False, "score_byClass":{"0":0.25,"1":0.3,"2":0.3,"3":0.3 } }, + } + + + ], + 'postFile': { + "rainbows": COLOR + }, + + }) + RUBBISH_MODEL = ("29", "029", "垃圾模型", 'rubbish', lambda device, gpuName: { + 'labelnames': [ "建筑垃圾","白色垃圾","其他垃圾"], + 'postProcess':{'function':default_mix,'pars':{}}, + 'models': + [ + { + 'weight':"../AIlib2/weights/rubbish/yolov5_%s_fp16.engine"%(gpuName),###检测模型路径 + 'name':'yolov5', + 'model':yolov5Model, + 'par':{ 'half':True,'device':'cuda:0' ,'conf_thres':0.25,'iou_thres':0.45,'allowedList':list(range(20)),'segRegionCnt':1, 'trtFlag_det':True,'trtFlag_seg':False, "score_byClass":{"0":0.25,"1":0.3,"2":0.3,"3":0.3 } }, + } + + + ], + 'postFile': { + "rainbows": COLOR + }, + + }) + + FIREWORK_MODEL = ("30", "030", "烟花模型", 'firework', lambda device, gpuName: { + 'labelnames': [ "烟花"], + 'postProcess':{'function':default_mix,'pars':{}}, + 'models': + [ + { + 'weight':"../AIlib2/weights/firework/yolov5_%s_fp16.engine"%(gpuName),###检测模型路径 + 'name':'yolov5', + 'model':yolov5Model, + 'par':{ 'half':True,'device':'cuda:0' ,'conf_thres':0.25,'iou_thres':0.45,'allowedList':list(range(20)),'segRegionCnt':1, 'trtFlag_det':True,'trtFlag_seg':False, "score_byClass":{"0":0.25,"1":0.3,"2":0.3,"3":0.3 } }, + } + + + ], + 'postFile': { + "rainbows": COLOR + }, + + }) + @staticmethod def checkCode(code): diff --git a/enums/__pycache__/ModelTypeEnum.cpython-38.pyc b/enums/__pycache__/ModelTypeEnum.cpython-38.pyc index a513f48..fc9c834 100644 Binary files a/enums/__pycache__/ModelTypeEnum.cpython-38.pyc and b/enums/__pycache__/ModelTypeEnum.cpython-38.pyc differ diff --git a/readme.md b/readme.md index 8a95403..d29501a 100644 --- a/readme.md +++ b/readme.md @@ -4,3 +4,5 @@ 2.2025.02.06 (1)修改代码,把mqtt读取加入到系统中。config/service/dsp_test_service.yml,中添加mqtt_flag,决定是否启用。 (2)修改了minio情况下的,文件名命名方式。 +3.2025.02.12 + (1)增加了对alg算法开发的代码。可以通过配置文件config/service/dsp_test_service.yml中algSwitch: true,决定是否启用。 diff --git a/service/Dispatcher.py b/service/Dispatcher.py index 2793052..1eb2ad4 100644 --- a/service/Dispatcher.py +++ b/service/Dispatcher.py @@ -37,7 +37,7 @@ from util.RWUtils import getConfigs class DispatcherService: - __slots__ = ('__context', '__feedbackThread', '__listeningProcesses', '__fbQueue', '__topics', '__task_type', + __slots__ = ('__context', '__feedbackThread', '__listeningProcesses', '__fbQueue', '__topics','__taskType', '__task_type', '__kafka_config', '__recordingProcesses', '__pull2PushProcesses') def __init__(self, base_dir, env): @@ -79,6 +79,13 @@ class DispatcherService: self.__topics[4]: (AnalysisType.PULLTOPUSH.value, lambda x, y: self.pullStream(x, y), lambda x, y, z: self.push_stream_method(x, y, z)) } + self.__taskType={ + "dsp-alg-online-tasks":0, + "dsp-alg-offline-tasks":1, + "dsp-alg-image-tasks":2, + "dsp-recording-task":3, + "dsp-push-stream-task":4 + } gpu_name_array = get_first_gpu_name() gpu_array = [g for g in ('3090', '2080', '4090', 'A10') if g in gpu_name_array] gpu_name = '2080Ti' @@ -109,11 +116,12 @@ class DispatcherService: message = m.value requestId = message.get("request_id") if requestId is None: - logger.error("请求参数格式错误, 请检查请求体格式是否正确!") + logger.error("请求参数格式错误, 请检查请求体格式是否正确!message:%s"%(message)) continue customerKafkaConsumer.commit_offset(m, requestId) logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}", m.topic, m.offset, m.partition, message, requestId) + message['taskType']=self.__taskType[m.topic] topic_method = self.__task_type[m.topic] topic_method[2](topic_method[1], message, topic_method[0]) else: @@ -200,6 +208,14 @@ class DispatcherService: logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) return ps.sendEvent({"command": "stop"}) + + # 新增该函数用于,向子任务发送命令(algStart,algStop) + def sendCmdToChildProcess(self, msg,cmd="algStart"): + ps = self.__listeningProcesses.get(msg["request_id"]) + if ps is None: + logger.warning("未查询到该任务,无法停止任务!requestId:{}", msg["request_id"]) + return + ps.sendEvent({"command": cmd}) @staticmethod def check_process(listeningProcess): @@ -298,6 +314,8 @@ class DispatcherService: raise ServiceException(ExceptionType.NO_RESOURCES.value[0], ExceptionType.NO_RESOURCES.value[1]) self.startOnlineProcess(message, analysisType) + elif message.get("command") in ["algStart","algStop"]: + self.sendCmdToChildProcess(message,cmd=message.get("command")) elif "stop" == message.get("command"): self.check_msg(message, ONLINE_STOP_SCHEMA) self.stopOnlineProcess(message) @@ -312,6 +330,8 @@ class DispatcherService: raise ServiceException(ExceptionType.NO_RESOURCES.value[0], ExceptionType.NO_RESOURCES.value[1]) self.startOfflineProcess(message, analysisType) + elif message.get("command") in ["algStart","algStop"]: + self.sendCmdToChildProcess( message,cmd=message.get("command")) elif "stop" == message.get("command"): self.check_msg(message, OFFLINE_STOP_SCHEMA) self.stopOfflineProcess(message) diff --git a/service/__pycache__/Dispatcher.cpython-38.pyc b/service/__pycache__/Dispatcher.cpython-38.pyc index a439102..114829c 100644 Binary files a/service/__pycache__/Dispatcher.cpython-38.pyc and b/service/__pycache__/Dispatcher.cpython-38.pyc differ diff --git a/util/LocationUtils.py b/util/LocationUtils.py new file mode 100644 index 0000000..d63c473 --- /dev/null +++ b/util/LocationUtils.py @@ -0,0 +1,156 @@ + +import os,math +import numpy as np + +# WGS-84经纬度转Web墨卡托 +def wgs_to_mercator(x, y): + y = 85.0511287798 if y > 85.0511287798 else y + y = -85.0511287798 if y < -85.0511287798 else y + + x2 = x * 20037508.34 / 180.0 + y2 = math.log(math.tan((90.0 + y) * math.pi / 360.0)) / (math.pi / 180.0) + + #print( ' y:',y, " before Log:",math.tan((90.0 + y) * math.pi / 360.0), ' log:' , math.log(math.tan((90.0 + y) * math.pi / 360.0))) + y2 = y2 * 20037508.34 / 180.0 + return x2, y2 +def mercator_to_wgs(x, y): + """ + 将墨卡托投影坐标转换为WGS-84经纬度坐标 + :param x: 墨卡托投影的X坐标 + :param y: 墨卡托投影的Y坐标 + :return: 经度(longitude)和纬度(latitude) + """ + # 地球半径(米) + R = 6378137.0 + # 墨卡托投影的X坐标转换为经度 + lon = x / R * 180.0 / math.pi + # 墨卡托投影的Y坐标转换为纬度 + lat = math.atan(math.sinh(y / R)) * 180.0 / math.pi + return lon, lat + + +def ImageCorToCamCor(p0,w=1920,h=1080): + x,y=p0[0:2] + return x-w/2.,(h-y)-h/2. + +def wgs84_to_gcj02(lat, lon): + """将 WGS-84 坐标转换为 GCJ-02 坐标 (高德地图坐标)""" + A = 6378245.0 # 长半轴 + EE = 0.00669342162296594323 # 偏心率平方 + if out_of_china(lat, lon): + return lat, lon # 如果在中国以外,直接返回 WGS-84 坐标 + + # 坐标转换 + dlat = transform_lat(lon - 105.0, lat - 35.0) + dlon = transform_lon(lon - 105.0, lat - 35.0) + radlat = lat / 180.0 * math.pi + magic = math.sin(radlat) + magic = 1 - EE * magic * magic + sqrt_magic = math.sqrt(magic) + dlat = (dlat * 180.0) / (A * (1 - EE) / (magic * sqrt_magic) * math.pi) + dlon = (dlon * 180.0) / (A / sqrt_magic * math.cos(radlat) * math.pi) + + mg_lat = lat + dlat + mg_lon = lon + dlon + + return mg_lat, mg_lon + +def out_of_china(lat, lon): + """检查坐标是否在中国以外""" + return lon < 72.004 or lon > 137.8347 or lat < 0.8293 or lat > 55.8271 + +def transform_lat(lon, lat): + """辅助函数: 进行纬度转换""" + ret = (-100.0 + 2.0 * lon + 3.0 * lat + 0.2 * lat * lat + + 0.1 * lon * lat + 0.2 * math.sqrt(abs(lon))) + ret += (20.0 * math.sin(6.0 * lon * PI) + 20.0 * math.sin(2.0 * lon * PI)) * 2.0 / 3.0 + ret += (20.0 * math.sin(lat * PI) + 40.0 * math.sin(lat / 3.0 * PI)) * 2.0 / 3.0 + ret += (160.0 * math.sin(lat / 12.0 * PI) + 320.0 * math.sin(lat * PI / 30.0)) * 2.0 / 3.0 + return ret + +def transform_lon(lon, lat): + """辅助函数: 进行经度转换""" + ret = (300.0 + lon + 2.0 * lat + 0.1 * lon * lon + + 0.1 * lon * lat + 0.1 * math.sqrt(abs(lon))) + ret += (20.0 * math.sin(6.0 * lon * PI) + 20.0 * math.sin(2.0 * lon * PI)) * 2.0 / 3.0 + ret += (20.0 * math.sin(lon * PI) + 40.0 * math.sin(lon / 3.0 * PI)) * 2.0 / 3.0 + ret += (150.0 * math.sin(lon / 12.0 * PI) + 300.0 * math.sin(lon / 30.0 * PI)) * 2.0 / 3.0 + return ret + +def cam2word(p0,pUAV,yaw,delta=1.55e-3 * 4056.0/1920,pitch=-45,f=4.5,camH=50e3,igW=1920,igH=1080): + + pitch = pitch/180.0*np.pi + sinp = np.sin(pitch );cosp= np.cos(pitch) + p0_new = ImageCorToCamCor(p0,igW,igH) + Xc0 = p0_new[0]*delta;Zc0 = p0_new[1]*delta; + + #(Zw0,Xw0)--相对于光心,X,Z并未校正到正东和正北。 + #f=4.5*f/24.00 + Zw0=camH*( -f*sinp + Zc0*cosp )/( f*cosp + Zc0*sinp)*1e-3 + Xw0= camH*Xc0/(f*cosp + Zc0*sinp)*1e-3 + #print(' %4.0f %4.0f %4.8f %4.8f %4.8f %4.8f f:%.2f'%( p0[0],p0[1], Xc0, Zc0,Xw0,Zw0,f ) ) + #yaw定义为拍摄方向,即图片的高方位(Z方向)偏离正北的方向,北偏东为正。 + yaw_rad = yaw/180.0*np.pi + siny=np.sin(yaw_rad);cosy=np.cos(yaw_rad) + Zx0_rot = Xw0*cosy + Zw0*siny + pUAV[0] + Zw0_rot = -Xw0*siny + Zw0*cosy + pUAV[1] + + + return Zx0_rot,Zw0_rot + +def location( point,igW,igH,PlanWgs84,PlanH,yaw,delta,pitch,focal,outFormat='wgs84'): + ''' + 输入图像中点的X,Y坐标,及无人机相关信息、相机相关信息,输出该点的Wgs84经纬度坐标 + point--点在图像上的坐标,左上角为(0,0),X方向为宽度方向,Y方向为高度方向 + igW--图像的宽度 + igH--图像的高度 + PlanWgs84--无人机的Wgs84坐标,(lon,lat),(经度,纬度) + PlanH--无人机拍照时的相对高度,用mm表示 + yaw--云台的yaw + delta--单个像素的长度值,用mm表示。 + pitch--无人的pitch + focal--真实焦距,用mm表示 + ''' + PlanX,PlanY = wgs_to_mercator(PlanWgs84[0],PlanWgs84[1]) + #print('location:',PlanX,PlanY) + #PlanX,PlanY--东西、南北方向的墨卡托投影坐标 + #print( 'line268:',point,PlanX,PlanY,yaw, delta,pitch,focal,PlanH,igW,igH ) + cor_world = cam2word( point,(PlanX,PlanY),yaw, delta,pitch,focal,PlanH,igW,igH) + cor_world = mercator_to_wgs(cor_world[0], cor_world[1]) + if outFormat=='GCJ02' or outFormat=='gcj02': + cor_world = wgs84_to_gcj02(cor_world[0], cor_world[1]) + + return cor_world + +def locate_byMqtt(box,igW,igH,camParas,outFormat='wgs84'): + #camParas--{'lon': 3479.8250608, 'lat': 3566.7630802, 'gpssingal': 4, 'satcount': 6896, 'alt': 3.256, 'hspeed': 86.0, 'vspeed': 4.447911, 'ysingal': 0, 'tsingal': 0, 'voltage': 24.971, 'flytime': 0, 'datetime': 1739315683895, 'yaw': 70.243252, 'roll': -0.89436062, 'pitch': 0.89897547, 'armed': 'false', 'mode': 'stabilize', 'distToHome': 7.132033, 'deviceid': 'THJSQ03A2302KSPYGJ2G', 'mileage': '0', 'altasl': 21.26, 'altasl2': -20.74, 'landing_target_x': 0, 'landing_target_y': 0, 'landing_target_z': 0} + #模型输出的点的格式是-[(486, 264), (505, 264), (505, 290), (486, 290)] + + box_np = np.array(box); + point = int(np.mean( box_np[:,0] )) , int(np.mean( box_np[:,1] )) + PlanWgs84 = (float(camParas['lon']),float(camParas['lat'])) # + PlanH = float(camParas['alt'])# + yaw = float(camParas['camerayaw']) + #delta = camParas[''] + delta = 1.55e-3 * 4056.0/1920 + pitch = float(camParas['camerapitch']) + #focal = camParas[''] + focal = 3.5 + + out = location( point,igW,igH,PlanWgs84,PlanH,yaw,delta,pitch,focal,outFormat='wgs84') + return out + +if __name__=="__main__": + srt="videos/DJI_20221220133918_0001_W_0.SRT" + videoUrl = "videos/DJI_20221220133918_0001_W.MP4" + imgOut= "videos/imgs" + fpbeg=17273;fpend=17830 + nums = list(range(fpbeg,fpend,16)) + #generateNewSRT(srt) + #captureImages(videoUrl,nums,imgOut) + + process(videoUrl,srt,nums,imW=1920,imH=1080,txtDir='videos/labels' ) + #draw_results() + + #rotate_example() + #rotate_example3() \ No newline at end of file diff --git a/util/ModelUtils.py b/util/ModelUtils.py index e49f85d..26c7086 100644 --- a/util/ModelUtils.py +++ b/util/ModelUtils.py @@ -331,6 +331,8 @@ class IMModel: new_device = torch.device(par['device']) model = torch.jit.load(par[img_type]['weights']) + logger.info("########################加载 ../AIlib2/weights/conf/jkm/plate_yolov5s_v3.jit 成功 ########################, requestId:{}", + requestId) self.model_conf = (modeType, allowedList, new_device, model, par, img_type) except Exception: logger.error("模型加载异常:{}, requestId:{}", format_exc(), requestId) @@ -346,6 +348,7 @@ def im_process(args): boxes = post_process(pred, padInfos, device, conf_thres=par['conf_thres'], iou_thres=par['iou_thres'], nc=par[img_type]['nc']) # 后处理 dataBack = get_return_data(frame, boxes, modelType=img_type, plate_dilate=par['plate_dilate']) + print('-------line351----:',dataBack) return dataBack except ServiceException as s: raise s @@ -613,6 +616,29 @@ MODEL_CONFIG = { lambda x, y, z: one_label(x, y, z), lambda x: model_process(x) ), + # 加载智慧工地模型 + ModelType.SMARTSITE_MODEL.value[1]: ( + lambda x, y, r, t, z, h: cityManagementModel(x, y, r, ModelType.SMARTSITE_MODEL, t, z, h), + ModelType.SMARTSITE_MODEL, + lambda x, y, z: one_label(x, y, z), + lambda x: detSeg_demo2(x) + ), + + # 加载垃圾模型 + ModelType.RUBBISH_MODEL.value[1]: ( + lambda x, y, r, t, z, h: cityManagementModel(x, y, r, ModelType.RUBBISH_MODEL, t, z, h), + ModelType.RUBBISH_MODEL, + lambda x, y, z: one_label(x, y, z), + lambda x: detSeg_demo2(x) + ), + + # 加载烟花模型 + ModelType.FIREWORK_MODEL.value[1]: ( + lambda x, y, r, t, z, h: cityManagementModel(x, y, r, ModelType.FIREWORK_MODEL, t, z, h), + ModelType.FIREWORK_MODEL, + lambda x, y, z: one_label(x, y, z), + lambda x: detSeg_demo2(x) + ), } diff --git a/util/__pycache__/LocationUtils.cpython-38.pyc b/util/__pycache__/LocationUtils.cpython-38.pyc new file mode 100644 index 0000000..e1d16ee Binary files /dev/null and b/util/__pycache__/LocationUtils.cpython-38.pyc differ diff --git a/util/__pycache__/LogUtils.cpython-38.pyc b/util/__pycache__/LogUtils.cpython-38.pyc index b28ac14..d6eeb6e 100644 Binary files a/util/__pycache__/LogUtils.cpython-38.pyc and b/util/__pycache__/LogUtils.cpython-38.pyc differ diff --git a/util/__pycache__/MinioSdk.cpython-38.pyc b/util/__pycache__/MinioSdk.cpython-38.pyc index fb08a49..3ef6bab 100644 Binary files a/util/__pycache__/MinioSdk.cpython-38.pyc and b/util/__pycache__/MinioSdk.cpython-38.pyc differ diff --git a/util/__pycache__/ModelUtils.cpython-38.pyc b/util/__pycache__/ModelUtils.cpython-38.pyc index 1361e98..2381de6 100644 Binary files a/util/__pycache__/ModelUtils.cpython-38.pyc and b/util/__pycache__/ModelUtils.cpython-38.pyc differ