add mqtt
This commit is contained in:
parent
f47a35d790
commit
3e118c3d84
|
|
@ -10,3 +10,5 @@ aliyun_yml_path = "config/aliyun/dsp_%s_aliyun.yml"
|
|||
baidu_yml_path = 'config/baidu/dsp_%s_baidu.yml'
|
||||
# minio配置路径
|
||||
minio_yml_path = 'config/minio/dsp_%s_minio.yml'
|
||||
# mqtt配置路径
|
||||
mqtt_yml_path = 'config/mqtt/dsp_%s_mqtt.yml'
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -6,7 +6,6 @@ from traceback import format_exc
|
|||
|
||||
from loguru import logger
|
||||
import cv2
|
||||
|
||||
from entity.FeedBack import message_feedback
|
||||
from enums.ExceptionEnum import ExceptionType
|
||||
from exception.CustomerException import ServiceException
|
||||
|
|
@ -19,18 +18,18 @@ from util.QueUtil import put_queue, get_no_block_queue, clear_queue
|
|||
import io
|
||||
|
||||
class FileUpload(Thread):
|
||||
__slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg')
|
||||
__slots__ = ('_fb_queue', '_context', '_image_queue', '_analyse_type', '_msg','_mqtt_list')
|
||||
|
||||
def __init__(self, *args):
|
||||
super().__init__()
|
||||
self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type = args
|
||||
self._fb_queue, self._context, self._msg, self._image_queue, self._analyse_type,self._mqtt_list = args
|
||||
self._storage_source = self._context['service']['storage_source']
|
||||
|
||||
class ImageFileUpload(FileUpload):
|
||||
__slots__ = ()
|
||||
|
||||
@staticmethod
|
||||
def handle_image(frame_msg, frame_step):
|
||||
#@staticmethod
|
||||
def handle_image(self,frame_msg, frame_step):
|
||||
# (high_score_image["code"], all_frames, draw_config["font_config"])
|
||||
# high_score_image["code"][code][cls] = (frame, frame_index_list[i], cls_list)
|
||||
det_xywh, frame, current_frame, all_frames, font_config = frame_msg
|
||||
|
|
@ -43,6 +42,10 @@ class ImageFileUpload(FileUpload):
|
|||
模型编号:modeCode
|
||||
检测目标:detectTargetCode
|
||||
'''
|
||||
#print('*'*100,' mqtt_list:',len(self._mqtt_list))
|
||||
#if len(self._mqtt_list)>=10:
|
||||
# print(' mqtt[4]:',self._mqtt_list[0]['satcount'])
|
||||
|
||||
model_info = []
|
||||
# 更加模型编码解析数据
|
||||
for code, det_list in det_xywh.items():
|
||||
|
|
@ -170,6 +173,7 @@ def build_image_name(*args):
|
|||
time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S")
|
||||
return "%s/%s_frame-%s-%s_type_%s-%s-%s-%s_%s.jpg" % (request_id, time_now, current_frame, last_frame,
|
||||
random_num, mode_type, modeCode, target, image_type)
|
||||
|
||||
|
||||
|
||||
class ImageTypeImageFileUpload(Thread):
|
||||
|
|
|
|||
|
|
@ -117,8 +117,8 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
|
|||
def upload_video(self,base_dir, env, request_id, orFilePath, aiFilePath):
|
||||
if self._storage_source==1:
|
||||
minioSdk = MinioSdk(base_dir, env, request_id )
|
||||
upload_video_thread_or = Common(minioSdk.put_object, orFilePath, "%s/or_online.mp4" % request_id)
|
||||
upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "%s/ai_online.mp4" % request_id)
|
||||
upload_video_thread_or = Common(minioSdk.put_object, orFilePath, "or_online_%s.mp4" % request_id)
|
||||
upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "ai_online_%s.mp4" % request_id)
|
||||
else:
|
||||
aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id)
|
||||
upload_video_thread_or = Common(aliyunVodSdk.get_play_url, orFilePath, "or_online_%s" % request_id)
|
||||
|
|
@ -439,7 +439,7 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
|
|||
|
||||
if self._storage_source==1:
|
||||
minioSdk = MinioSdk(base_dir, env, request_id )
|
||||
upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "%s/ai_online.mp4" % request_id)
|
||||
upload_video_thread_ai = Common(minioSdk.put_object, aiFilePath, "ai_online_%s.mp4" % request_id)
|
||||
else:
|
||||
aliyunVodSdk = ThAliyunVodSdk(base_dir, env, request_id)
|
||||
upload_video_thread_ai = Common(aliyunVodSdk.get_play_url, aiFilePath, "ai_online_%s" % request_id)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,142 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from threading import Thread
|
||||
from time import sleep, time
|
||||
from traceback import format_exc
|
||||
|
||||
from loguru import logger
|
||||
from common.YmlConstant import mqtt_yml_path
|
||||
from util.RWUtils import getConfigs
|
||||
from common.Constant import init_progess
|
||||
from enums.AnalysisStatusEnum import AnalysisStatus
|
||||
from entity.FeedBack import message_feedback
|
||||
from enums.ExceptionEnum import ExceptionType
|
||||
from exception.CustomerException import ServiceException
|
||||
from util.QueUtil import get_no_block_queue, put_queue, clear_queue
|
||||
from multiprocessing import Process, Queue
|
||||
import paho.mqtt.client as mqtt
|
||||
import json,os
|
||||
class PullMqtt(Thread):
|
||||
__slots__ = ('__fb_queue', '__mqtt_list', '__request_id', '__analyse_type', "_context")
|
||||
|
||||
def __init__(self, *args):
|
||||
super().__init__()
|
||||
self.__fb_queue, self.__mqtt_list, self.__request_id, self.__analyse_type, self._context = args
|
||||
|
||||
base_dir, env = self._context["base_dir"], self._context["env"]
|
||||
self.__config = getConfigs(os.path.join(base_dir, mqtt_yml_path % env))
|
||||
|
||||
self.__broker = self.__config["broker"]
|
||||
self.__port = self.__config["port"]
|
||||
self.__topic = self.__config["topic"]
|
||||
self.__lengthMqttList = self.__config["length"]
|
||||
|
||||
|
||||
def put_queue(self,__queue,data):
|
||||
if __queue.full():
|
||||
a = __queue.get()
|
||||
__queue.put( data,block=True, timeout=2 )
|
||||
def on_connect(self,client,userdata,flags,rc):
|
||||
client.subscribe(self.__topic)
|
||||
|
||||
|
||||
|
||||
# 当接收到MQTT消息时,回调函数
|
||||
def on_message(self,client, userdata, msg):
|
||||
# 将消息解码为JSON格式
|
||||
payload = msg.payload.decode('utf-8')
|
||||
data = json.loads(payload)
|
||||
#logger.info(str(data))
|
||||
|
||||
|
||||
# 解析位姿信息
|
||||
lon = data.get("lon")
|
||||
lat = data.get("lat")
|
||||
alt = data.get("alt")
|
||||
yaw = data.get("yaw")
|
||||
pitch = data.get("pitch")
|
||||
roll = data.get("roll")
|
||||
|
||||
if len(self.__mqtt_list) == self.__lengthMqttList:
|
||||
self.__mqtt_list.pop(0)
|
||||
self.__mqtt_list.append(data)
|
||||
|
||||
|
||||
# 打印无人机的位姿信息
|
||||
#print(f"Longitude: {lon}, Latitude: {lat}, Altitude: {alt}, sat:{data.get('satcount')} , list length:{len(self.__mqtt_list)}")
|
||||
|
||||
def mqtt_connect(self):
|
||||
# 创建客户端
|
||||
self.client = mqtt.Client()
|
||||
self.client.on_connect = self.on_connect
|
||||
# 设置回调函数
|
||||
self.client.on_message = self.on_message
|
||||
|
||||
# 连接到 Broker
|
||||
self.client.connect(self.__broker, self.__port)
|
||||
|
||||
# 订阅主题
|
||||
self.client.subscribe(self.__topic)
|
||||
# 循环等待并处理网络事件
|
||||
self.client.loop_forever()
|
||||
|
||||
def mqtt_disconnect(self):
|
||||
start_time = time()
|
||||
while True:
|
||||
if time() - start_time > service_timeout:
|
||||
logger.error("MQTT读取超时, requestId: %s,限定时间:%.1s , 已运行:%.1fs"%(request_id,service_timeout, time() - start_time))
|
||||
raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
|
||||
ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
|
||||
client.loop_stop() # 停止循环
|
||||
client.disconnect() # 断开连接
|
||||
|
||||
def run(self):
|
||||
request_id, mqtt_list, progress = self.__request_id, self.__mqtt_list, init_progess
|
||||
analyse_type, fb_queue = self.__analyse_type, self.__fb_queue
|
||||
#service_timeout = int(self.__config["service"]["timeout"]) + 120
|
||||
|
||||
try:
|
||||
logger.info("开始MQTT读取线程!requestId:{}", request_id)
|
||||
mqtt_init_num = 0
|
||||
self.mqtt_connect()
|
||||
|
||||
except Exception:
|
||||
logger.error("MQTT线程异常:{}, requestId:{}", format_exc(), request_id)
|
||||
finally:
|
||||
mqtt_list = []
|
||||
logger.info("MQTT线程停止完成!requestId:{}", request_id)
|
||||
|
||||
|
||||
def start_PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context):
|
||||
mqtt_thread = PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context)
|
||||
mqtt_thread.setDaemon(True)
|
||||
mqtt_thread.start()
|
||||
return mqtt_thread
|
||||
def start_PullVideo(mqtt_list):
|
||||
for i in range(1000):
|
||||
sleep(1)
|
||||
if len(mqtt_list)>=10:
|
||||
print( mqtt_list[4])
|
||||
print(i,len(mqtt_list))
|
||||
if __name__=="__main__":
|
||||
#context = {'service':{'timeout':3600},'mqtt':{
|
||||
# 'broker':"101.133.163.127",'port':1883,'topic':"test/topic","length":10}
|
||||
# }
|
||||
context = {
|
||||
'base_dir':'/home/th/WJ/test/tuoheng_algN',
|
||||
'env':'test'
|
||||
|
||||
}
|
||||
analyse_type = '1'
|
||||
request_id = '123456789'
|
||||
event_queue, pull_queue, mqtt_list, image_queue, push_queue, push_ex_queue = Queue(), Queue(10), [], Queue(), Queue(), Queue()
|
||||
fb_queue = Queue()
|
||||
mqtt_thread = start_PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context)
|
||||
|
||||
|
||||
start_PullVideo(mqtt_list)
|
||||
print('---line117--')
|
||||
|
||||
|
||||
|
||||
#mqtt_thread.join()
|
||||
|
||||
|
|
@ -7,7 +7,7 @@ from traceback import format_exc
|
|||
|
||||
import psutil
|
||||
from loguru import logger
|
||||
|
||||
from concurrency.PullMqttThread import PullMqtt
|
||||
from util.LogUtils import init_log
|
||||
from concurrency.FileUploadThread import ImageFileUpload
|
||||
from entity.FeedBack import message_feedback
|
||||
|
|
@ -20,7 +20,7 @@ from util.QueUtil import get_no_block_queue, put_queue, clear_queue, put_queue_r
|
|||
|
||||
class PullVideoStreamProcess(Process):
|
||||
__slots__ = ("_command_queue", "_msg", "_context", "_fb_queue", "_pull_queue", "_image_queue", "_analyse_type",
|
||||
"_frame_num")
|
||||
"_frame_num","_mqtt_list")
|
||||
|
||||
def __init__(self, *args):
|
||||
super().__init__()
|
||||
|
|
@ -30,16 +30,24 @@ class PullVideoStreamProcess(Process):
|
|||
# 传参
|
||||
self._msg, self._context, self._fb_queue, self._pull_queue, self._image_queue, self._analyse_type, \
|
||||
self._frame_num = args
|
||||
|
||||
self._mqtt_list = []
|
||||
def sendCommand(self, result):
|
||||
put_queue(self._command_queue, result, timeout=2, is_ex=True)
|
||||
|
||||
@staticmethod
|
||||
def start_File_upload(fb_queue, context, msg, image_queue, analyse_type):
|
||||
image_thread = ImageFileUpload(fb_queue, context, msg, image_queue, analyse_type)
|
||||
def start_File_upload(fb_queue, context, msg, image_queue, analyse_type,mqtt_list):
|
||||
image_thread = ImageFileUpload(fb_queue, context, msg, image_queue, analyse_type,mqtt_list)
|
||||
image_thread.setDaemon(True)
|
||||
image_thread.start()
|
||||
return image_thread
|
||||
|
||||
@staticmethod
|
||||
def start_PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context):
|
||||
mqtt_thread = PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context)
|
||||
mqtt_thread.setDaemon(True)
|
||||
mqtt_thread.start()
|
||||
return mqtt_thread
|
||||
|
||||
|
||||
@staticmethod
|
||||
def check(start_time, service_timeout, request_id, image_thread):
|
||||
|
|
@ -63,8 +71,8 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
|
|||
request_id, pull_url = msg["request_id"], msg["pull_url"]
|
||||
pull_stream_timeout, read_stream_timeout, service_timeout = int(service["cv2_pull_stream_timeout"]), \
|
||||
int(service["cv2_read_stream_timeout"]), int(service["timeout"]) + 120
|
||||
command_queue, pull_queue, image_queue, fb_queue = self._command_queue, self._pull_queue, self._image_queue, \
|
||||
self._fb_queue
|
||||
command_queue, pull_queue, image_queue, fb_queue ,mqtt_list= self._command_queue, self._pull_queue, self._image_queue, \
|
||||
self._fb_queue,self._mqtt_list
|
||||
image_thread, ex = None, None
|
||||
width, height, width_height_3, all_frames, w_2, h_2, pull_p = None, None, None, 0, None, None, None
|
||||
frame_list, frame_index_list = [], []
|
||||
|
|
@ -73,8 +81,13 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
|
|||
# 初始化日志
|
||||
init_log(base_dir, env)
|
||||
logger.info("开启启动实时视频拉流进程, requestId:{},pid:{},ppid:{}", request_id,os.getpid(),os.getppid())
|
||||
|
||||
#开启mqtt
|
||||
if service["mqtt_flag"]==1:
|
||||
mqtt_thread = self.start_PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context)
|
||||
|
||||
# 开启图片上传线程
|
||||
image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)
|
||||
image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type,mqtt_list)
|
||||
cv2_init_num, init_pull_num, concurrent_frame = 0, 1, 1
|
||||
start_time, pull_stream_start_time, read_start_time, full_timeout = time(), None, None, None
|
||||
while True:
|
||||
|
|
@ -219,7 +232,7 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
|
|||
logger.info("开启离线视频拉流进程, requestId:{}", request_id)
|
||||
|
||||
# 开启图片上传线程
|
||||
image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)
|
||||
image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type,[])
|
||||
|
||||
# 初始化拉流工具类
|
||||
cv2_init_num, concurrent_frame = 0, 1
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,6 @@
|
|||
mqtt_flag: false
|
||||
broker : "101.133.163.127"
|
||||
port : 1883
|
||||
topic: "test/topic"
|
||||
# 存储多少条消息到list里
|
||||
length: 10
|
||||
|
|
@ -28,4 +28,6 @@ service:
|
|||
limit: 20
|
||||
#storage source,0--aliyun,1--minio
|
||||
storage_source: 1
|
||||
#是否启用mqtt,0--不用,1--启用
|
||||
mqtt_flag: 0
|
||||
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,6 @@
|
|||
1.2025.01.21把之前的tuoheng alg仓库代码重新开个仓库
|
||||
(1)在config/service/dsp_test_service.yml里面添加参数,控制存储用的oss还是minio
|
||||
storage_source: 1
|
||||
2.2025.02.06
|
||||
(1)修改代码,把mqtt读取加入到系统中。config/service/dsp_test_service.yml,中添加mqtt_flag,决定是否启用。
|
||||
(2)修改了minio情况下的,文件名命名方式。
|
||||
Binary file not shown.
Binary file not shown.
|
|
@ -56,7 +56,9 @@ class MinioSdk:
|
|||
bucketName,mime = self.get_bucknetName_from_filename(localPath)
|
||||
|
||||
self.create_bucknet(bucketName)
|
||||
remoteUrl=join(request_id,remotePath )
|
||||
if '/' not in remotePath:
|
||||
remoteUrl=join(request_id,remotePath )
|
||||
else: remoteUrl = remotePath
|
||||
max_retries = 3
|
||||
retry_count = 0
|
||||
while True:
|
||||
|
|
@ -75,7 +77,6 @@ class MinioSdk:
|
|||
os.system(cmd1 )
|
||||
cmd2 = 'mv %s %s'%(newLocalpath,localPath )
|
||||
os.system(cmd2 )
|
||||
#print( '-----line72: mime:',mime )
|
||||
ret = self.minioClient.fput_object(bucketName, remoteUrl,localPath,content_type=mime)
|
||||
#print('-----line72: mime:',mime)
|
||||
|
||||
|
|
@ -101,7 +102,6 @@ if __name__ == "__main__":
|
|||
|
||||
|
||||
remotePath='or.mp4'
|
||||
#remotePath='or.jpg'
|
||||
minioSdk = MinioSdk(base_dir, env, request_id )
|
||||
|
||||
##上传cv2编码后的图像np数组
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue