# -*- coding: utf-8 -*- from json import loads, dumps from traceback import format_exc import paho.mqtt.client as mqtt import time from loguru import logger from bean.Feedback import upload_result, rtmp_result from enums.ExceptionEnum import ExceptionType, RtmpExceptionType from enums.StatusEnum import UploadTaskStatusType from util.RWUtils import getConfigs class MqttClient: __slots__ = ("client", "config", "msg_queue", "fb_queue", "res_topic") def __init__(self, base_dir, msg_queue, fb_queue): self.client = None self.config = getConfigs(base_dir, "config/mqtt.yml") self.res_topic = { "upload": self.config["topic"]["upload"]["res_topic"] % self.config["mqtt"]["airport_code"], "rtmp": self.config["topic"]["easy_rtmp_live"]["res_topic"] % self.config["mqtt"]["airport_code"] } self.msg_queue = msg_queue self.fb_queue = fb_queue def create_client(self): if self.client is None: try: client_id = self.config["mqtt"]["client_id"] airport_code = self.config["mqtt"]["airport_code"] self.client = mqtt.Client(client_id=client_id, clean_session=True) # self.client.enable_logger(logger=logging) self.client.disable_logger() self.client.on_connect = self.on_connect self.client.on_subscribe = self.on_subscribe self.client.on_message = self.on_message # self.client.on_disconnect = self.on_disconnect # self.client.on_log = self.on_log upload_topic = self.config["topic"]["upload"]["sub_topic"] % airport_code rtmp_topic = self.config["topic"]["easy_rtmp_live"]["sub_topic"] % airport_code self.client.user_data_set({ "sub_upload_topic": upload_topic, "sub_rtmp_topic": rtmp_topic, "msg_queue": self.msg_queue, "fb_queue": self.fb_queue, "topic": { upload_topic: "upload", rtmp_topic: "rtmp" } }) self.client.username_pw_set(self.config["mqtt"]["username"], self.config["mqtt"]["password"]) # self.client.will_set("willTopic", dumps({"message", "与服务器断开连接!!"}), 0, False) self.client.connect(host=self.config["mqtt"]["host"], port=self.config["mqtt"]["port"], keepalive=self.config["mqtt"]["keepalive"]) except Exception: if self.client: self.client.disconnect() self.client.loop_stop() self.client = None logger.error("创建mqtt客户端异常: {}", format_exc()) @staticmethod def on_connect(client, userdata, flags, rc): logger.info("mqtt客户端连接结果编号: {}, flags: {}", rc, flags) if 0 == rc: client.subscribe(userdata.get("sub_upload_topic"), 2) client.subscribe(userdata.get("sub_rtmp_topic"), 2) logger.info("mqtt客户端连接成功!sub_upload_topic: {}", userdata.get("sub_upload_topic")) logger.info("mqtt客户端连接成功!sub_rtmp_topic: {}", userdata.get("sub_rtmp_topic")) elif 1 == rc: logger.error("mqtt连接失败, 不正确的协议版本!rc:{}", rc) elif 2 == rc: logger.error("mqtt连接失败, 无效的客户端标识符!rc:{}", rc) elif 3 == rc: logger.error("mqtt连接失败, 服务器不可用!rc:{}", rc) elif 4 == rc: logger.error("mqtt连接失败, 错误的用户名或密码!rc:{}", rc) elif 5 == rc: logger.error("mqtt连接失败, 未授权!rc:{}", rc) else: logger.error("mqtt连接失败, 未知异常!rc:{}", rc) # 当代理响应订阅请求时被调用 @staticmethod def on_subscribe(client, userdata, mid, granted_qos): logger.info("mqtt开始订阅: {},{}!", mid, granted_qos) # @staticmethod # def on_disconnect(client, userdata, rc): # if rc != 0: # logger.info("mqtt端口连接!开始重新连接!!!!") # # 重新连接 # client.reconnect() @staticmethod def on_publish(client, obj, mid): logger.info("mqtt开始发布: {},{}!", mid, obj) # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。 @staticmethod def on_message(client, userdata, msg): fb_queue, topic = userdata["fb_queue"], userdata["topic"] try: if msg is None or msg.payload is None or len(msg.payload) == 0: logger.error("mqtt监听到空消息!!!!!!!!!") return message = loads(msg.payload.decode()) logger.info("mqtt消息监听, topic:{}, message: {}!", msg.topic, message) if topic.get(msg.topic) is not None: msg_queue = userdata["msg_queue"] msg_queue.put((topic.get(msg.topic), message), timeout=2) except Exception: logger.error("mqtt消息监听异常, topic:{}, message: {}, 异常: {}!", msg.topic, msg.payload, format_exc()) if userdata.get("sub_upload_topic") == msg.topic: message = loads(msg.payload.decode()) upload_result(fb_queue, message.get("request_id"), errorCode=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], errorMsg=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1], status=UploadTaskStatusType.FAILED.value[0], imageList=[], videoList=[]) if userdata.get("sub_rtmp_topic") == msg.topic: rtmp_result(fb_queue, RtmpExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], RtmpExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1]) @staticmethod def on_log(client, obj, level, string): logger.info("当前日志信息, level: {}, msg: {}!", level, string) def start(self): try: self.create_client() self.client.loop_start() except Exception as e: logger.error("mqtt启动异常: {}", format_exc()) raise e def stop(self): if self.client: self.client.disconnect() self.client.loop_stop() def publish(self, result): res_topic = self.res_topic retry_count = 0 while True: try: logger.info("mqtt发布信, 消息体: {}", result) self.client.publish(res_topic.get(result[0]), payload=dumps(result[1]), qos=2) break except Exception as e: retry_count += 1 time.sleep(1) logger.error("mqtt发布信息失败, 消息体: {}, 异常: {}", dumps(result), format_exc()) if retry_count > 3: raise e del result # if __name__ == "__main__": # mq = MqttClient(r"D:\tuoheng\codenew\tuoheng_push_stream") # mq.start() # i = 0 # while True: # time.sleep(1) # mq.publish({"tip": "1111111111111111111111"}) # print('state: ', mq.client._state, 'loop进程:', mq. client._thread) # print(mq.client.is_connected())