You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

173 lines
7.5KB

  1. # -*- coding: utf-8 -*-
  2. from json import loads, dumps
  3. from traceback import format_exc
  4. import paho.mqtt.client as mqtt
  5. import time
  6. from loguru import logger
  7. from bean.Feedback import upload_result, rtmp_result
  8. from enums.ExceptionEnum import ExceptionType, RtmpExceptionType
  9. from enums.StatusEnum import UploadTaskStatusType
  10. from util.RWUtils import getConfigs
  11. class MqttClient:
  12. __slots__ = ("client", "config", "msg_queue", "fb_queue", "res_topic")
  13. def __init__(self, base_dir, msg_queue, fb_queue):
  14. self.client = None
  15. self.config = getConfigs(base_dir, "config/mqtt.yml")
  16. self.res_topic = {
  17. "upload": self.config["topic"]["upload"]["res_topic"] % self.config["mqtt"]["airport_code"],
  18. "rtmp": self.config["topic"]["easy_rtmp_live"]["res_topic"] % self.config["mqtt"]["airport_code"]
  19. }
  20. self.msg_queue = msg_queue
  21. self.fb_queue = fb_queue
  22. def create_client(self):
  23. if self.client is None:
  24. try:
  25. client_id = self.config["mqtt"]["client_id"]
  26. airport_code = self.config["mqtt"]["airport_code"]
  27. self.client = mqtt.Client(client_id=client_id, clean_session=True)
  28. # self.client.enable_logger(logger=logging)
  29. self.client.disable_logger()
  30. self.client.on_connect = self.on_connect
  31. self.client.on_subscribe = self.on_subscribe
  32. self.client.on_message = self.on_message
  33. # self.client.on_disconnect = self.on_disconnect
  34. # self.client.on_log = self.on_log
  35. upload_topic = self.config["topic"]["upload"]["sub_topic"] % airport_code
  36. rtmp_topic = self.config["topic"]["easy_rtmp_live"]["sub_topic"] % airport_code
  37. self.client.user_data_set({
  38. "sub_upload_topic": upload_topic,
  39. "sub_rtmp_topic": rtmp_topic,
  40. "msg_queue": self.msg_queue,
  41. "fb_queue": self.fb_queue,
  42. "topic": {
  43. upload_topic: "upload",
  44. rtmp_topic: "rtmp"
  45. }
  46. })
  47. self.client.username_pw_set(self.config["mqtt"]["username"], self.config["mqtt"]["password"])
  48. # self.client.will_set("willTopic", dumps({"message", "与服务器断开连接!!"}), 0, False)
  49. self.client.connect(host=self.config["mqtt"]["host"], port=self.config["mqtt"]["port"],
  50. keepalive=self.config["mqtt"]["keepalive"])
  51. except Exception:
  52. if self.client:
  53. self.client.disconnect()
  54. self.client.loop_stop()
  55. self.client = None
  56. logger.error("创建mqtt客户端异常: {}", format_exc())
  57. @staticmethod
  58. def on_connect(client, userdata, flags, rc):
  59. logger.info("mqtt客户端连接结果编号: {}, flags: {}", rc, flags)
  60. if 0 == rc:
  61. client.subscribe(userdata.get("sub_upload_topic"), 2)
  62. client.subscribe(userdata.get("sub_rtmp_topic"), 2)
  63. logger.info("mqtt客户端连接成功!sub_upload_topic: {}", userdata.get("sub_upload_topic"))
  64. logger.info("mqtt客户端连接成功!sub_rtmp_topic: {}", userdata.get("sub_rtmp_topic"))
  65. elif 1 == rc:
  66. logger.error("mqtt连接失败, 不正确的协议版本!rc:{}", rc)
  67. elif 2 == rc:
  68. logger.error("mqtt连接失败, 无效的客户端标识符!rc:{}", rc)
  69. elif 3 == rc:
  70. logger.error("mqtt连接失败, 服务器不可用!rc:{}", rc)
  71. elif 4 == rc:
  72. logger.error("mqtt连接失败, 错误的用户名或密码!rc:{}", rc)
  73. elif 5 == rc:
  74. logger.error("mqtt连接失败, 未授权!rc:{}", rc)
  75. else:
  76. logger.error("mqtt连接失败, 未知异常!rc:{}", rc)
  77. # 当代理响应订阅请求时被调用
  78. @staticmethod
  79. def on_subscribe(client, userdata, mid, granted_qos):
  80. logger.info("mqtt开始订阅: {},{}!", mid, granted_qos)
  81. # @staticmethod
  82. # def on_disconnect(client, userdata, rc):
  83. # if rc != 0:
  84. # logger.info("mqtt端口连接!开始重新连接!!!!")
  85. # # 重新连接
  86. # client.reconnect()
  87. @staticmethod
  88. def on_publish(client, obj, mid):
  89. logger.info("mqtt开始发布: {},{}!", mid, obj)
  90. # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。
  91. @staticmethod
  92. def on_message(client, userdata, msg):
  93. fb_queue, topic = userdata["fb_queue"], userdata["topic"]
  94. try:
  95. if msg is None or msg.payload is None or len(msg.payload) == 0:
  96. logger.error("mqtt监听到空消息!!!!!!!!!")
  97. return
  98. message = loads(msg.payload.decode())
  99. logger.info("mqtt消息监听, topic:{}, message: {}!", msg.topic, message)
  100. if topic.get(msg.topic) is not None:
  101. msg_queue = userdata["msg_queue"]
  102. msg_queue.put((topic.get(msg.topic), message), timeout=2)
  103. except Exception:
  104. logger.error("mqtt消息监听异常, topic:{}, message: {}, 异常: {}!", msg.topic, msg.payload, format_exc())
  105. if userdata.get("sub_upload_topic") == msg.topic:
  106. message = loads(msg.payload.decode())
  107. upload_result(fb_queue,
  108. message.get("request_id"),
  109. errorCode=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  110. errorMsg=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1],
  111. status=UploadTaskStatusType.FAILED.value[0],
  112. imageList=[],
  113. videoList=[])
  114. if userdata.get("sub_rtmp_topic") == msg.topic:
  115. rtmp_result(fb_queue,
  116. RtmpExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  117. RtmpExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  118. @staticmethod
  119. def on_log(client, obj, level, string):
  120. logger.info("当前日志信息, level: {}, msg: {}!", level, string)
  121. def start(self):
  122. try:
  123. self.create_client()
  124. self.client.loop_start()
  125. except Exception as e:
  126. logger.error("mqtt启动异常: {}", format_exc())
  127. raise e
  128. def stop(self):
  129. if self.client:
  130. self.client.disconnect()
  131. self.client.loop_stop()
  132. def publish(self, result):
  133. res_topic = self.res_topic
  134. retry_count = 0
  135. while True:
  136. try:
  137. logger.info("mqtt发布信, 消息体: {}", result)
  138. self.client.publish(res_topic.get(result[0]), payload=dumps(result[1]), qos=2)
  139. break
  140. except Exception as e:
  141. retry_count += 1
  142. time.sleep(1)
  143. logger.error("mqtt发布信息失败, 消息体: {}, 异常: {}", dumps(result), format_exc())
  144. if retry_count > 3:
  145. raise e
  146. del result
  147. # if __name__ == "__main__":
  148. # mq = MqttClient(r"D:\tuoheng\codenew\tuoheng_push_stream")
  149. # mq.start()
  150. # i = 0
  151. # while True:
  152. # time.sleep(1)
  153. # mq.publish({"tip": "1111111111111111111111"})
  154. # print('state: ', mq.client._state, 'loop进程:', mq. client._thread)
  155. # print(mq.client.is_connected())