You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

158 lines
6.5KB

  1. # -*- coding: utf-8 -*-
  2. from json import loads, dumps
  3. from traceback import format_exc
  4. import paho.mqtt.client as mqtt
  5. import time
  6. from loguru import logger
  7. from bean.Feedback import push_result
  8. from enums.ExceptionEnum import ExceptionType
  9. from enums.StatusEnum import StatusType
  10. from util.RWUtils import getConfigs
  11. class MqttClient:
  12. __slots__ = ("client", "config", "msg_queue", "fb_queue", "res_topic")
  13. def __init__(self, base_dir, msg_queue, fb_queue):
  14. self.client = None
  15. self.config = getConfigs(base_dir, "config/mqtt.yml")
  16. self.res_topic = {
  17. "stream": self.config["topic"]["stream"]["res_topic"] % self.config["mqtt"]["client_id"]
  18. }
  19. self.msg_queue = msg_queue
  20. self.fb_queue = fb_queue
  21. def create_client(self):
  22. if self.client is None:
  23. try:
  24. client_id = self.config["mqtt"]["client_id"]
  25. self.client = mqtt.Client(client_id=client_id, clean_session=True)
  26. # self.client.enable_logger(logger=logging)
  27. self.client.disable_logger()
  28. self.client.on_connect = self.on_connect
  29. self.client.on_subscribe = self.on_subscribe
  30. self.client.on_message = self.on_message
  31. # self.client.on_disconnect = self.on_disconnect
  32. # self.client.on_log = self.on_log
  33. stream_topic = self.config["topic"]["stream"]["sub_topic"] % client_id
  34. self.client.user_data_set({
  35. "sub_stream_topic": stream_topic,
  36. "msg_queue": self.msg_queue,
  37. "fb_queue": self.fb_queue,
  38. "topic": {
  39. stream_topic: "stream"
  40. }
  41. })
  42. self.client.username_pw_set(self.config["mqtt"]["username"], self.config["mqtt"]["password"])
  43. # self.client.will_set("willTopic", dumps({"message", "与服务器断开连接!!"}), 0, False)
  44. self.client.connect(host=self.config["mqtt"]["host"], port=self.config["mqtt"]["port"],
  45. keepalive=self.config["mqtt"]["keepalive"])
  46. except Exception:
  47. if self.client:
  48. self.client.disconnect()
  49. self.client.loop_stop()
  50. self.client = None
  51. logger.error("创建mqtt客户端异常: {}", format_exc())
  52. @staticmethod
  53. def on_connect(client, userdata, flags, rc):
  54. logger.info("mqtt客户端连接结果编号: {}, flags: {}", rc, flags)
  55. if 0 == rc:
  56. client.subscribe(userdata.get("sub_stream_topic"), 2)
  57. logger.info("mqtt客户端连接成功!sub_stream_topic: {}", userdata.get("sub_stream_topic"))
  58. elif 1 == rc:
  59. logger.error("mqtt连接失败, 不正确的协议版本!rc:{}", rc)
  60. elif 2 == rc:
  61. logger.error("mqtt连接失败, 无效的客户端标识符!rc:{}", rc)
  62. elif 3 == rc:
  63. logger.error("mqtt连接失败, 服务器不可用!rc:{}", rc)
  64. elif 4 == rc:
  65. logger.error("mqtt连接失败, 错误的用户名或密码!rc:{}", rc)
  66. elif 5 == rc:
  67. logger.error("mqtt连接失败, 未授权!rc:{}", rc)
  68. else:
  69. logger.error("mqtt连接失败, 未知异常!rc:{}", rc)
  70. # 当代理响应订阅请求时被调用
  71. @staticmethod
  72. def on_subscribe(client, userdata, mid, granted_qos):
  73. logger.info("mqtt开始订阅: {},{}!", mid, granted_qos)
  74. # @staticmethod
  75. # def on_disconnect(client, userdata, rc):
  76. # if rc != 0:
  77. # logger.info("mqtt端口连接!开始重新连接!!!!")
  78. # # 重新连接
  79. # client.reconnect()
  80. @staticmethod
  81. def on_publish(client, obj, mid):
  82. logger.info("mqtt开始发布: {},{}!", mid, obj)
  83. # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。
  84. @staticmethod
  85. def on_message(client, userdata, msg):
  86. fb_queue, topic = userdata["fb_queue"], userdata["topic"]
  87. try:
  88. if msg is None or msg.payload is None or len(msg.payload) == 0:
  89. logger.error("mqtt监听到空消息!!!!!!!!!")
  90. return
  91. message = loads(msg.payload.decode())
  92. logger.info("mqtt消息监听, topic:{}, message: {}!", msg.topic, message)
  93. if topic.get(msg.topic) is not None:
  94. msg_queue = userdata["msg_queue"]
  95. msg_queue.put((topic.get(msg.topic), message), timeout=2)
  96. except Exception:
  97. logger.error("mqtt消息监听异常, topic:{}, message: {}, 异常: {}!", msg.topic, msg.payload, format_exc())
  98. if userdata.get("sub_stream_topic") == msg.topic:
  99. push_result(fb_queue,
  100. errorCode=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  101. errorMsg=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1],
  102. status=StatusType.FAILED.value[0])
  103. @staticmethod
  104. def on_log(client, obj, level, string):
  105. logger.info("当前日志信息, level: {}, msg: {}!", level, string)
  106. def start(self):
  107. try:
  108. self.create_client()
  109. self.client.loop_start()
  110. except Exception as e:
  111. logger.error("mqtt启动异常: {}", format_exc())
  112. raise e
  113. def stop(self):
  114. if self.client:
  115. self.client.disconnect()
  116. self.client.loop_stop()
  117. def publish(self, result):
  118. res_topic = self.res_topic
  119. retry_count = 0
  120. while True:
  121. try:
  122. logger.info("mqtt发布信, 消息体: {}", result)
  123. self.client.publish(res_topic.get(result[0]), payload=dumps(result[1]), qos=2)
  124. break
  125. except Exception as e:
  126. retry_count += 1
  127. time.sleep(1)
  128. logger.error("mqtt发布信息失败, 消息体: {}, 异常: {}", dumps(result), format_exc())
  129. if retry_count > 3:
  130. raise e
  131. del result
  132. # if __name__ == "__main__":
  133. # mq = MqttClient(r"D:\tuoheng\codenew\tuoheng_push_stream")
  134. # mq.start()
  135. # i = 0
  136. # while True:
  137. # time.sleep(1)
  138. # mq.publish({"tip": "1111111111111111111111"})
  139. # print('state: ', mq.client._state, 'loop进程:', mq. client._thread)
  140. # print(mq.client.is_connected())