163 lines
6.1 KiB
Python
163 lines
6.1 KiB
Python
# -*- coding: utf-8 -*-
|
||
from threading import Thread
|
||
from time import sleep, time
|
||
from traceback import format_exc
|
||
|
||
from loguru import logger
|
||
from common.YmlConstant import mqtt_yml_path
|
||
from util.RWUtils import getConfigs
|
||
from common.Constant import init_progess
|
||
from enums.AnalysisStatusEnum import AnalysisStatus
|
||
from entity.FeedBack import message_feedback
|
||
from enums.ExceptionEnum import ExceptionType
|
||
from exception.CustomerException import ServiceException
|
||
from util.QueUtil import get_no_block_queue, put_queue, clear_queue
|
||
from multiprocessing import Process, Queue
|
||
import paho.mqtt.client as mqtt
|
||
import json,os
|
||
class PullMqtt(Thread):
    """Thread that subscribes to one MQTT topic and buffers received messages.

    The constructor takes six positional arguments, in this order:
    ``fb_queue``, ``mqtt_list``, ``request_id``, ``analyse_type``,
    ``context`` and ``business``.

    * ``context`` must provide ``"base_dir"`` and ``"env"``; they locate the
      MQTT YAML configuration (``mqtt_yml_path % env`` under ``base_dir``).
    * ``business`` selects the configuration section and message handler:
      ``0`` reads the ``location`` section and stores the whole decoded JSON
      payload, ``1`` reads the ``invade`` section and stores only the
      payload's ``"points"`` value.
    * ``mqtt_list`` is the caller-owned list that receives
      ``[business, data]`` entries; it is capped at the configured
      ``length`` by discarding the oldest entries.
    * ``fb_queue`` and ``analyse_type`` are stored but not used by this class.
    """

    __slots__ = ('__fb_queue', '__mqtt_list', '__request_id', '__analyse_type', "_context", '__business')

    # Maps a ``business`` code to the section of the YAML config it uses.
    _CONFIG_SECTIONS = {0: 'location', 1: 'invade'}

    # Sentinel returned by ``_decode`` for payloads that cannot be parsed, so
    # that a legitimate JSON ``null`` payload is still distinguishable.
    _INVALID = object()

    def __init__(self, *args):
        super().__init__()
        (self.__fb_queue, self.__mqtt_list, self.__request_id,
         self.__analyse_type, self._context, self.__business) = args

        base_dir, env = self._context["base_dir"], self._context["env"]
        self.__config = getConfigs(os.path.join(base_dir, mqtt_yml_path % env))

        # Only known business codes have a config section; an unknown code is
        # reported by mqtt_connect() when the thread actually tries to connect.
        section_name = self._CONFIG_SECTIONS.get(self.__business)
        if section_name is not None:
            section = self.__config[section_name]
            self.__broker = section["broker"]
            self.__port = section["port"]
            self.__topic = section["topic"]
        self.__lengthMqttList = self.__config["length"]

    def put_queue(self, __queue, data):
        """Put ``data`` on ``__queue``, discarding one queued item if it is full.

        The put itself blocks for at most 2 seconds.
        """
        if __queue.full():
            __queue.get()
        __queue.put(data, block=True, timeout=2)

    def on_connect(self, client, userdata, flags, rc):
        """Connect callback: subscribe to the configured topic."""
        client.subscribe(self.__topic)

    def _decode(self, msg):
        """Return the JSON-decoded payload of ``msg`` or ``_INVALID``."""
        try:
            return json.loads(msg.payload.decode('utf-8'))
        except ValueError as err:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueError.
            # Drop the message instead of letting the exception escape the
            # callback and terminate the whole reader.
            logger.warning("MQTT消息解析失败,已丢弃:{}, requestId:{}", err, self.__request_id)
            return self._INVALID

    def _remember(self, item):
        """Append ``[business, item]`` to the shared list, keeping it capped."""
        buffer, limit = self.__mqtt_list, self.__lengthMqttList
        # Make room for the new entry. Trimming by count (rather than testing
        # for exact equality) also recovers when the list is already longer
        # than the limit.
        overflow = len(buffer) - limit + 1
        if overflow > 0:
            del buffer[:overflow]
        buffer.append([self.__business, item])

    # Message callback used when business == 0.
    def on_location(self, client, userdata, msg):
        """Decode a JSON message and buffer the whole decoded payload."""
        data = self._decode(msg)
        if data is self._INVALID:
            return
        self._remember(data)

    # Message callback used when business == 1.
    def on_invade(self, client, userdata, msg):
        """Decode a JSON message and buffer its ``"points"`` value."""
        data = self._decode(msg)
        if data is self._INVALID:
            return
        if not isinstance(data, dict):
            logger.warning("MQTT消息不是JSON对象,已丢弃, requestId:{}", self.__request_id)
            return
        self._remember(data.get("points"))

    def mqtt_connect(self):
        """Create the client, connect, subscribe and block in ``loop_forever()``.

        Raises:
            ValueError: if ``business`` is neither 0 nor 1.
        """
        if self.__business == 0:
            on_message = self.on_location
        elif self.__business == 1:
            on_message = self.on_invade
        else:
            raise ValueError("unsupported MQTT business type: %r" % (self.__business,))

        # Create the client and install the callbacks.
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = on_message

        # Connect to the broker.
        self.client.connect(self.__broker, self.__port)

        # Subscribe to the topic (on_connect subscribes as well).
        self.client.subscribe(self.__topic)

        # Wait for and process network events.
        self.client.loop_forever()

    def mqtt_disconnect(self):
        """Stop the network loop and disconnect the client, if one exists.

        Does nothing when ``mqtt_connect()`` has not created a client yet, so
        it can be called unconditionally during cleanup.
        """
        client = getattr(self, "client", None)
        if client is None:
            return
        client.loop_stop()    # stop the network loop
        client.disconnect()   # close the connection

    def run(self):
        """Thread entry point: run the subscriber until it stops or fails.

        Exceptions are logged, not propagated. The shared buffer is left
        untouched on exit (the previous code only rebound a local name here),
        so consumers keep seeing the last buffered messages.
        """
        request_id = self.__request_id
        try:
            logger.info("开始MQTT读取线程!requestId:{}", request_id)
            self.mqtt_connect()
        except Exception:
            logger.error("MQTT线程异常:{}, requestId:{}", format_exc(), request_id)
        finally:
            # Release the connection even when the loop ended with an error.
            try:
                self.mqtt_disconnect()
            except Exception:
                logger.error("MQTT断开连接异常:{}, requestId:{}", format_exc(), request_id)
            logger.info("MQTT线程停止完成!requestId:{}", request_id)
|
||
|
||
|
||
def start_PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context, business=0):
    """Create, start and return a daemon ``PullMqtt`` thread.

    Args:
        fb_queue: Passed through to ``PullMqtt`` as its first argument.
        mqtt_list: Caller-owned list that receives the buffered messages.
        request_id: Identifier included in the thread's log messages.
        analyse_type: Passed through to ``PullMqtt``.
        context: Mapping with ``"base_dir"`` and ``"env"`` used to locate the
            MQTT configuration.
        business: Message type to subscribe to; ``PullMqtt`` handles ``0``
            (location) and ``1`` (invade). Defaults to ``0``.

    Returns:
        The started ``PullMqtt`` thread.
    """
    # PullMqtt unpacks exactly six positional arguments; the business code
    # must be forwarded or construction fails with a ValueError.
    mqtt_thread = PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context, business)
    # Thread.setDaemon() is deprecated; assign the attribute instead.
    mqtt_thread.daemon = True
    mqtt_thread.start()
    return mqtt_thread
|
||
def start_PullVideo(mqtt_list):
    """Debug helper: watch ``mqtt_list`` for 1000 one-second ticks.

    After each one-second sleep it prints the entry at index 4 once the list
    holds at least ten entries, then prints the tick number together with the
    current list length.
    """
    tick = 0
    while tick < 1000:
        sleep(1)
        has_enough = not len(mqtt_list) < 10
        if has_enough:
            print( mqtt_list[4])
        # The length is read again because the list may change between reads.
        print(tick, len(mqtt_list))
        tick += 1
|
||
if __name__ == "__main__":
    # Manual smoke test: start the MQTT reader thread and watch the shared
    # list fill up. An earlier inline context (with 'service' and 'mqtt'
    # settings) was commented out in favour of the base_dir/env pair below.
    context = dict(
        base_dir='/home/th/WJ/test/tuoheng_algN',
        env='test',
    )
    analyse_type = '1'
    request_id = '123456789'

    # Only mqtt_list and fb_queue are handed to the reader; the remaining
    # queues are created but not used in this script.
    event_queue = Queue()
    pull_queue = Queue(10)
    mqtt_list = []
    image_queue = Queue()
    push_queue = Queue()
    push_ex_queue = Queue()
    fb_queue = Queue()

    mqtt_thread = start_PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context)

    start_PullVideo(mqtt_list)
    print('---line117--')

    # The reader thread is not joined here (a join call was left disabled).
|
||
|