|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- # -*- coding: utf-8 -*-
- from json import loads, dumps
- from traceback import format_exc
-
- import paho.mqtt.client as mqtt
- import time
-
- from loguru import logger
-
- from bean.Feedback import push_result
- from enums.ExceptionEnum import ExceptionType
- from enums.StatusEnum import StatusType
- from util.RWUtils import getConfigs
-
-
- class MqttClient:
- __slots__ = ("client", "config", "msg_queue", "fb_queue", "res_topic")
-
- def __init__(self, base_dir, msg_queue, fb_queue):
- self.client = None
- self.config = getConfigs(base_dir, "config/mqtt.yml")
- self.res_topic = {
- "stream": self.config["topic"]["stream"]["res_topic"] % self.config["mqtt"]["client_id"]
- }
- self.msg_queue = msg_queue
- self.fb_queue = fb_queue
-
- def create_client(self):
- if self.client is None:
- try:
- client_id = self.config["mqtt"]["client_id"]
- self.client = mqtt.Client(client_id=client_id, clean_session=True)
- # self.client.enable_logger(logger=logging)
- self.client.disable_logger()
- self.client.on_connect = self.on_connect
- self.client.on_subscribe = self.on_subscribe
- self.client.on_message = self.on_message
- # self.client.on_disconnect = self.on_disconnect
- # self.client.on_log = self.on_log
- stream_topic = self.config["topic"]["stream"]["sub_topic"] % client_id
- self.client.user_data_set({
- "sub_stream_topic": stream_topic,
- "msg_queue": self.msg_queue,
- "fb_queue": self.fb_queue,
- "topic": {
- stream_topic: "stream"
- }
- })
- self.client.username_pw_set(self.config["mqtt"]["username"], self.config["mqtt"]["password"])
- # self.client.will_set("willTopic", dumps({"message", "与服务器断开连接!!"}), 0, False)
- self.client.connect(host=self.config["mqtt"]["host"], port=self.config["mqtt"]["port"],
- keepalive=self.config["mqtt"]["keepalive"])
- except Exception:
- if self.client:
- self.client.disconnect()
- self.client.loop_stop()
- self.client = None
- logger.error("创建mqtt客户端异常: {}", format_exc())
-
- @staticmethod
- def on_connect(client, userdata, flags, rc):
- logger.info("mqtt客户端连接结果编号: {}, flags: {}", rc, flags)
- if 0 == rc:
- client.subscribe(userdata.get("sub_stream_topic"), 2)
- logger.info("mqtt客户端连接成功!sub_stream_topic: {}", userdata.get("sub_stream_topic"))
- elif 1 == rc:
- logger.error("mqtt连接失败, 不正确的协议版本!rc:{}", rc)
- elif 2 == rc:
- logger.error("mqtt连接失败, 无效的客户端标识符!rc:{}", rc)
- elif 3 == rc:
- logger.error("mqtt连接失败, 服务器不可用!rc:{}", rc)
- elif 4 == rc:
- logger.error("mqtt连接失败, 错误的用户名或密码!rc:{}", rc)
- elif 5 == rc:
- logger.error("mqtt连接失败, 未授权!rc:{}", rc)
- else:
- logger.error("mqtt连接失败, 未知异常!rc:{}", rc)
-
- # 当代理响应订阅请求时被调用
- @staticmethod
- def on_subscribe(client, userdata, mid, granted_qos):
- logger.info("mqtt开始订阅: {},{}!", mid, granted_qos)
-
- # @staticmethod
- # def on_disconnect(client, userdata, rc):
- # if rc != 0:
- # logger.info("mqtt端口连接!开始重新连接!!!!")
- # # 重新连接
- # client.reconnect()
-
- @staticmethod
- def on_publish(client, obj, mid):
- logger.info("mqtt开始发布: {},{}!", mid, obj)
-
- # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。
- @staticmethod
- def on_message(client, userdata, msg):
- fb_queue, topic = userdata["fb_queue"], userdata["topic"]
- try:
- if msg is None or msg.payload is None or len(msg.payload) == 0:
- logger.error("mqtt监听到空消息!!!!!!!!!")
- return
- message = loads(msg.payload.decode())
- logger.info("mqtt消息监听, topic:{}, message: {}!", msg.topic, message)
- if topic.get(msg.topic) is not None:
- msg_queue = userdata["msg_queue"]
- msg_queue.put((topic.get(msg.topic), message), timeout=2)
- except Exception:
- logger.error("mqtt消息监听异常, topic:{}, message: {}, 异常: {}!", msg.topic, msg.payload, format_exc())
- if userdata.get("sub_stream_topic") == msg.topic:
- push_result(fb_queue,
- errorCode=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
- errorMsg=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1],
- status=StatusType.FAILED.value[0])
-
- @staticmethod
- def on_log(client, obj, level, string):
- logger.info("当前日志信息, level: {}, msg: {}!", level, string)
-
- def start(self):
- try:
- self.create_client()
- self.client.loop_start()
- except Exception as e:
- logger.error("mqtt启动异常: {}", format_exc())
- raise e
-
- def stop(self):
- if self.client:
- self.client.disconnect()
- self.client.loop_stop()
-
- def publish(self, result):
- res_topic = self.res_topic
- retry_count = 0
- while True:
- try:
- logger.info("mqtt发布信, 消息体: {}", result)
- self.client.publish(res_topic.get(result[0]), payload=dumps(result[1]), qos=2)
- break
- except Exception as e:
- retry_count += 1
- time.sleep(1)
- logger.error("mqtt发布信息失败, 消息体: {}, 异常: {}", dumps(result), format_exc())
- if retry_count > 3:
- raise e
- del result
-
- # if __name__ == "__main__":
- # mq = MqttClient(r"D:\tuoheng\codenew\tuoheng_push_stream")
- # mq.start()
- # i = 0
- # while True:
- # time.sleep(1)
- # mq.publish({"tip": "1111111111111111111111"})
- # print('state: ', mq.client._state, 'loop进程:', mq. client._thread)
- # print(mq.client.is_connected())
|