You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

157 lines
5.6KB

  1. # -*- coding: utf-8 -*-
  2. import queue
  3. import sys
  4. from multiprocessing import Queue
  5. from time import sleep
  6. from traceback import format_exc
  7. from cerberus import Validator
  8. from loguru import logger
  9. from bean.Feedback import push_result
  10. from concurrency.mqtt.MqttPushStreamProcess import PushStreamProcess
  11. from enums.ExceptionEnum import ExceptionType, StreamStrExceptionType
  12. from enums.StatusEnum import StatusType
  13. from exception.CustomerException import ServiceException
  14. from concurrency.mqtt.MqttFeedbackThread import FeedbackThread
  15. from util.MqttUtil import MqttClient
  16. from util.QueUtil import get_no_block_queue
  17. class MqttDispatcherService:
  18. __slots__ = "__service_config"
  19. def __init__(self, service_config):
  20. # 服务配置
  21. self.__service_config = service_config
  22. self.start_service()
  23. def start_service(self):
  24. service_config = self.__service_config
  25. msg_queue, fb_queue = queue.Queue(), Queue()
  26. feedbackThread = None
  27. # 当前任务记录
  28. task = {
  29. "stream": None
  30. }
  31. handle_method = {
  32. "stream": lambda x, y, z, h: handle_stream(x, y, z, h)
  33. }
  34. mq = MqttClient(service_config["base_dir"], msg_queue, fb_queue)
  35. mq.start()
  36. sleep(1)
  37. while True:
  38. try:
  39. for tk in list(task.keys()):
  40. if task[tk] is not None and not task[tk].is_alive():
  41. task[tk] = None
  42. feedbackThread = start_feedback_thread(mq, fb_queue, feedbackThread)
  43. if not mq.client.is_connected():
  44. logger.info("mqtt重连中")
  45. mq.stop()
  46. sleep(4)
  47. mq.client.reconnect()
  48. mq.client.loop_start()
  49. # 订阅消息处理
  50. message = get_no_block_queue(msg_queue)
  51. if message is not None and len(message) == 2 and isinstance(message, tuple):
  52. method = handle_method.get(message[0])
  53. if method is not None:
  54. method(message[1], task, fb_queue, service_config)
  55. else:
  56. sleep(1)
  57. except Exception:
  58. logger.error("服务异常: {}", format_exc())
  59. def handle_stream(msg, task, fb_queue, service_config):
  60. try:
  61. check_msg(STREAM_SCHEMA, msg)
  62. command = msg["command"]
  63. if 'start' == command:
  64. if task["stream"] is not None:
  65. logger.warning("推流任务已存在!!!")
  66. push_result(fb_queue,
  67. StreamStrExceptionType.PUSH_STREAM_TASK_IS_AREADLY.value[0],
  68. StreamStrExceptionType.PUSH_STREAM_TASK_IS_AREADLY.value[1],
  69. status=StatusType.FAILED.value[0])
  70. return
  71. pullUrl, pushUrl = msg.get("pullUrl"), msg.get("pushUrl")
  72. pp = PushStreamProcess(fb_queue, service_config, pullUrl, pushUrl)
  73. pp.start()
  74. task["stream"] = pp
  75. if 'stop' == command:
  76. if task["stream"] is None:
  77. logger.warning("推流任务不存在, 任务无法停止!")
  78. push_result(fb_queue,
  79. StreamStrExceptionType.PUSH_STREAM_TASK_IS_NOT_AREADLY.value[0],
  80. StreamStrExceptionType.PUSH_STREAM_TASK_IS_NOT_AREADLY.value[1],
  81. StatusType.FAILED.value[0])
  82. return
  83. task["stream"].send_event({"command": "stop"})
  84. except ServiceException as s:
  85. logger.error("消息处理异常: {}", s.msg)
  86. push_result(fb_queue, s.code, s.msg, StatusType.FAILED.value[0])
  87. except Exception:
  88. logger.error("消息处理异常: {}", format_exc())
  89. push_result(fb_queue,
  90. ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  91. ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
  92. StatusType.FAILED.value[0])
  93. finally:
  94. del msg
  95. def check_msg(schema, msg):
  96. try:
  97. v = Validator(schema, allow_unknown=True)
  98. result = v.validate(msg)
  99. if not result:
  100. logger.error("参数校验异常: {}", v.errors)
  101. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  102. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  103. except ServiceException as s:
  104. raise s
  105. except Exception:
  106. logger.error("参数校验异常: {}", format_exc())
  107. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  108. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  109. def start_feedback_thread(mq, fb_queue, feedbackThread):
  110. if feedbackThread is None:
  111. feedbackThread = FeedbackThread(fb_queue, mq)
  112. feedbackThread.setDaemon(True)
  113. feedbackThread.start()
  114. else:
  115. if not feedbackThread.is_alive():
  116. logger.error("反馈线程异常停止! 开始终止程序!")
  117. sys.exit()
  118. return feedbackThread
  119. STREAM_SCHEMA = {
  120. 'command': {
  121. 'type': 'string',
  122. 'required': True,
  123. 'empty': False,
  124. 'nullable': False,
  125. 'allowed': ['start', 'stop']
  126. },
  127. 'pullUrl': {
  128. 'type': 'string',
  129. 'required': False,
  130. 'empty': True,
  131. 'nullable': True,
  132. 'maxlength': 255,
  133. 'regex': r'(^(https|http|rtsp|rtmp|artc|webrtc|ws)://\w.+$)?'
  134. },
  135. 'pushUrl': {
  136. 'type': 'string',
  137. 'required': False,
  138. 'nullable': True,
  139. 'empty': True,
  140. 'maxlength': 255,
  141. 'regex': r'(^(https|http|rtsp|rtmp|artc|webrtc|ws)://\w.+$)?'
  142. }
  143. }