You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

172 lines
7.4KB

  1. # -*- coding: utf-8 -*-
  2. from json import loads, dumps
  3. from traceback import format_exc
  4. import paho.mqtt.client as mqtt
  5. import time
  6. from loguru import logger
  7. from bean.Feedback import upload_result, rtmp_result
  8. from enums.ExceptionEnum import ExceptionType, RtmpExceptionType
  9. from enums.StatusEnum import UploadTaskStatusType
  10. from util.RWUtils import getConfigs
  11. class MqttClient:
  12. __slots__ = ("client", "config", "msg_queue", "fb_queue", "res_topic")
  13. def __init__(self, base_dir, msg_queue, fb_queue):
  14. self.client = None
  15. self.config = getConfigs(base_dir, "config/mqtt.yml")
  16. self.res_topic = {
  17. "upload": self.config["topic"]["upload"]["res_topic"] % self.config["mqtt"]["client_id"],
  18. "rtmp": self.config["topic"]["easy_rtmp_live"]["res_topic"] % self.config["mqtt"]["client_id"]
  19. }
  20. self.msg_queue = msg_queue
  21. self.fb_queue = fb_queue
  22. def create_client(self):
  23. if self.client is None:
  24. try:
  25. client_id = self.config["mqtt"]["client_id"]
  26. self.client = mqtt.Client(client_id=client_id, clean_session=True)
  27. # self.client.enable_logger(logger=logging)
  28. self.client.disable_logger()
  29. self.client.on_connect = self.on_connect
  30. self.client.on_subscribe = self.on_subscribe
  31. self.client.on_message = self.on_message
  32. # self.client.on_disconnect = self.on_disconnect
  33. # self.client.on_log = self.on_log
  34. upload_topic = self.config["topic"]["upload"]["sub_topic"] % client_id
  35. rtmp_topic = self.config["topic"]["easy_rtmp_live"]["sub_topic"] % client_id
  36. self.client.user_data_set({
  37. "sub_upload_topic": upload_topic,
  38. "sub_rtmp_topic": rtmp_topic,
  39. "msg_queue": self.msg_queue,
  40. "fb_queue": self.fb_queue,
  41. "topic": {
  42. upload_topic: "upload",
  43. rtmp_topic: "rtmp"
  44. }
  45. })
  46. self.client.username_pw_set(self.config["mqtt"]["username"], self.config["mqtt"]["password"])
  47. # self.client.will_set("willTopic", dumps({"message", "与服务器断开连接!!"}), 0, False)
  48. self.client.connect(host=self.config["mqtt"]["host"], port=self.config["mqtt"]["port"],
  49. keepalive=self.config["mqtt"]["keepalive"])
  50. except Exception:
  51. if self.client:
  52. self.client.disconnect()
  53. self.client.loop_stop()
  54. self.client = None
  55. logger.error("创建mqtt客户端异常: {}", format_exc())
  56. @staticmethod
  57. def on_connect(client, userdata, flags, rc):
  58. logger.info("mqtt客户端连接结果编号: {}, flags: {}", rc, flags)
  59. if 0 == rc:
  60. client.subscribe(userdata.get("sub_upload_topic"), 2)
  61. client.subscribe(userdata.get("sub_rtmp_topic"), 2)
  62. logger.info("mqtt客户端连接成功!sub_upload_topic: {}", userdata.get("sub_upload_topic"))
  63. logger.info("mqtt客户端连接成功!sub_rtmp_topic: {}", userdata.get("sub_rtmp_topic"))
  64. elif 1 == rc:
  65. logger.error("mqtt连接失败, 不正确的协议版本!rc:{}", rc)
  66. elif 2 == rc:
  67. logger.error("mqtt连接失败, 无效的客户端标识符!rc:{}", rc)
  68. elif 3 == rc:
  69. logger.error("mqtt连接失败, 服务器不可用!rc:{}", rc)
  70. elif 4 == rc:
  71. logger.error("mqtt连接失败, 错误的用户名或密码!rc:{}", rc)
  72. elif 5 == rc:
  73. logger.error("mqtt连接失败, 未授权!rc:{}", rc)
  74. else:
  75. logger.error("mqtt连接失败, 未知异常!rc:{}", rc)
  76. # 当代理响应订阅请求时被调用
  77. @staticmethod
  78. def on_subscribe(client, userdata, mid, granted_qos):
  79. logger.info("mqtt开始订阅: {},{}!", mid, granted_qos)
  80. # @staticmethod
  81. # def on_disconnect(client, userdata, rc):
  82. # if rc != 0:
  83. # logger.info("mqtt端口连接!开始重新连接!!!!")
  84. # # 重新连接
  85. # client.reconnect()
  86. @staticmethod
  87. def on_publish(client, obj, mid):
  88. logger.info("mqtt开始发布: {},{}!", mid, obj)
  89. # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。
  90. @staticmethod
  91. def on_message(client, userdata, msg):
  92. fb_queue, topic = userdata["fb_queue"], userdata["topic"]
  93. try:
  94. if msg is None or msg.payload is None or len(msg.payload) == 0:
  95. logger.error("mqtt监听到空消息!!!!!!!!!")
  96. return
  97. message = loads(msg.payload.decode())
  98. logger.info("mqtt消息监听, topic:{}, message: {}!", msg.topic, message)
  99. if topic.get(msg.topic) is not None:
  100. msg_queue = userdata["msg_queue"]
  101. msg_queue.put((topic.get(msg.topic), message), timeout=2)
  102. except Exception:
  103. logger.error("mqtt消息监听异常, topic:{}, message: {}, 异常: {}!", msg.topic, msg.payload, format_exc())
  104. if userdata.get("sub_upload_topic") == msg.topic:
  105. message = loads(msg.payload.decode())
  106. upload_result(fb_queue,
  107. message.get("request_id"),
  108. errorCode=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  109. errorMsg=ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1],
  110. status=UploadTaskStatusType.FAILED.value[0],
  111. imageList=[],
  112. videoList=[])
  113. if userdata.get("sub_rtmp_topic") == msg.topic:
  114. rtmp_result(fb_queue,
  115. RtmpExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  116. RtmpExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  117. @staticmethod
  118. def on_log(client, obj, level, string):
  119. logger.info("当前日志信息, level: {}, msg: {}!", level, string)
  120. def start(self):
  121. try:
  122. self.create_client()
  123. self.client.loop_start()
  124. except Exception as e:
  125. logger.error("mqtt启动异常: {}", format_exc())
  126. raise e
  127. def stop(self):
  128. if self.client:
  129. self.client.disconnect()
  130. self.client.loop_stop()
  131. def publish(self, result):
  132. res_topic = self.res_topic
  133. retry_count = 0
  134. while True:
  135. try:
  136. logger.info("mqtt发布信, 消息体: {}", result)
  137. self.client.publish(res_topic.get(result[0]), payload=dumps(result[1]), qos=2)
  138. break
  139. except Exception as e:
  140. retry_count += 1
  141. time.sleep(1)
  142. logger.error("mqtt发布信息失败, 消息体: {}, 异常: {}", dumps(result), format_exc())
  143. if retry_count > 3:
  144. raise e
  145. del result
  146. # if __name__ == "__main__":
  147. # mq = MqttClient(r"D:\tuoheng\codenew\tuoheng_push_stream")
  148. # mq.start()
  149. # i = 0
  150. # while True:
  151. # time.sleep(1)
  152. # mq.publish({"tip": "1111111111111111111111"})
  153. # print('state: ', mq.client._state, 'loop进程:', mq. client._thread)
  154. # print(mq.client.is_connected())