You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

237 lines
10KB

  1. # -*- coding: utf-8 -*-
  2. import queue
  3. import sys
  4. from json import loads
  5. from multiprocessing import Queue
  6. from time import sleep
  7. from traceback import format_exc
  8. from cerberus import Validator
  9. from loguru import logger
  10. from bean.Feedback import upload_result, rtmp_result
  11. from common.Constant import CHANNEL_LIST_URL, UPDATE_CHANNEL_URL
  12. from enums.ExceptionEnum import ExceptionType, UploadStrExceptionType
  13. from enums.HttpExceptionEnum import EasyExceptionType
  14. from enums.StatusEnum import UploadTaskStatusType
  15. from exception.CustomerException import ServiceException
  16. from concurrency.mqtt.MqttFeedbackThread import FeedbackThread
  17. from concurrency.mqtt.MqttUploadFileProcess import MqttUploadFileProcess
  18. from util.MqttUtil import MqttClient
  19. from util.QueUtil import get_no_block_queue
  20. from util.RequestUtil import HttpRequests
  21. class MqttDispatcherService:
  22. __slots__ = "__service_config"
  23. def __init__(self, service_config):
  24. # 服务配置
  25. self.__service_config = service_config
  26. self.start_service()
  27. def start_service(self):
  28. service_config = self.__service_config
  29. msg_queue, fb_queue = queue.Queue(), Queue()
  30. feedbackThread = None
  31. # 当前任务记录
  32. task = {
  33. "upload": None
  34. }
  35. handle_method = {
  36. "upload": lambda x, y, z, h: handle_upload(x, y, z, h),
  37. "rtmp": lambda x, y, z, h: handle_rtmp(x, y, z, h)
  38. }
  39. mq = MqttClient(service_config["base_dir"], msg_queue, fb_queue)
  40. mq.start()
  41. sleep(1)
  42. while True:
  43. try:
  44. for tk in list(task.keys()):
  45. if task[tk] is not None and not task[tk].is_alive():
  46. task[tk] = None
  47. feedbackThread = start_feedback_thread(mq, fb_queue, feedbackThread)
  48. if not mq.client.is_connected():
  49. logger.info("mqtt重连中")
  50. mq.stop()
  51. sleep(4)
  52. mq.client.reconnect()
  53. mq.client.loop_start()
  54. # 订阅消息处理
  55. message = get_no_block_queue(msg_queue)
  56. if message is not None and len(message) == 2 and isinstance(message, tuple):
  57. method = handle_method.get(message[0])
  58. if method is not None:
  59. method(message[1], task, fb_queue, service_config)
  60. else:
  61. sleep(1)
  62. except Exception:
  63. logger.error("服务异常: {}", format_exc())
  64. def handle_upload(msg, task, fb_queue, service_config):
  65. try:
  66. check_msg(UPLOAD_SCHEMA, msg)
  67. command = msg["command"]
  68. if 'start' == command:
  69. if task["upload"] is not None:
  70. logger.warning("上传任务已存在!!!")
  71. upload_result(fb_queue, msg["requestId"],
  72. errorCode=UploadStrExceptionType.UPLOAD_TASK_IS_AREADLY.value[0],
  73. errorMsg=UploadStrExceptionType.UPLOAD_TASK_IS_AREADLY.value[1],
  74. status=UploadTaskStatusType.FAILED.value[0])
  75. return
  76. upload_p = MqttUploadFileProcess(fb_queue, service_config, msg["requestId"])
  77. upload_p.start()
  78. task["upload"] = upload_p
  79. if 'stop' == command:
  80. if task["upload"] is None:
  81. logger.warning("上传任务不存在, 任务无法停止!")
  82. upload_result(fb_queue,
  83. UploadStrExceptionType.UPLOAD_TASK_IS_NOT_AREADLY.value[0],
  84. UploadStrExceptionType.UPLOAD_TASK_IS_NOT_AREADLY.value[1],
  85. UploadTaskStatusType.FAILED.value[0])
  86. return
  87. task["upload"].sendEvent({"command": "stop"})
  88. except ServiceException as s:
  89. logger.error("文件上传请求异常: {}", s.msg)
  90. upload_result(fb_queue, msg.get("requestId"),
  91. errorCode=s.code,
  92. errorMsg=s.msg,
  93. status=UploadTaskStatusType.FAILED.value[0])
  94. except Exception:
  95. logger.error("消息处理异常: {}", format_exc())
  96. upload_result(fb_queue, msg.get("requestId"),
  97. errorCode=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  98. errorMsg=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1],
  99. status=UploadTaskStatusType.FAILED.value[0])
  100. finally:
  101. del msg
  102. def handle_rtmp(msg, task, fb_queue, service_config):
  103. request = HttpRequests()
  104. headers = {'content-type': "application/json"}
  105. try:
  106. check_msg(RTMP_SCHEMA, msg)
  107. c_resp = request.send_request('GET', CHANNEL_LIST_URL, headers=headers, timeout=30)
  108. if c_resp.status_code != 200:
  109. logger.error("获取任务列表失败! 状态码: {}, {}", c_resp.status_code, c_resp.__dict__)
  110. raise ServiceException(EasyExceptionType.EASY_TASK_GET_FAIL.value[0],
  111. EasyExceptionType.EASY_TASK_GET_FAIL.value[1])
  112. else:
  113. content = c_resp.content.decode('utf-8')
  114. content = loads(content)
  115. channelList = content.get("ChannelList")
  116. if channelList is None or len(channelList) == 0:
  117. logger.error("获取EasyRtmpLive任务失败!")
  118. raise ServiceException(EasyExceptionType.EASY_TASK_GET_FAIL.value[0],
  119. EasyExceptionType.EASY_TASK_GET_FAIL.value[1])
  120. if channelList is not None and len(channelList) > 1:
  121. logger.error("EasyRtmpLive任务限制, 限制1个任务!")
  122. raise ServiceException(EasyExceptionType.EASY_TASK_LIMIT.value[0],
  123. EasyExceptionType.EASY_TASK_LIMIT.value[1])
  124. channel = channelList[0]
  125. command = msg["command"]
  126. if 'start' == command:
  127. if channel["enable"] == "true":
  128. logger.error("任务正在执行, 请稍后再试!")
  129. raise ServiceException(EasyExceptionType.TASK_IS_EXECUTING.value[0],
  130. EasyExceptionType.TASK_IS_EXECUTING.value[1])
  131. channel["enable"] = "true"
  132. if 'stop' == command:
  133. if channel["enable"] == "false":
  134. logger.info("EasyRtmpLive任务已停止!")
  135. rtmp_result(fb_queue, 0, "操作成功!")
  136. return
  137. channel["enable"] = "false"
  138. channel["srcURL"] = '"%s"' % channel["srcURL"]
  139. req_param = 'indexCode=%s&name=%s&srcURL=%s&connectType=%s&timeout=5&mediaType=%s&dstURL=%s&dstFormat=%s&enable=%s' % \
  140. (channel["indexcode"], channel["name"], channel["srcURL"], channel["connectType"],
  141. channel["mediaType"], channel["dstURL"], channel["dstFormat"], channel["enable"])
  142. url = UPDATE_CHANNEL_URL % req_param
  143. u_resp = request.send_request('GET', url, headers=headers, timeout=30)
  144. if u_resp.status_code != 200:
  145. if channel["enable"] == "true":
  146. channel["enable"] = "false"
  147. fail_req_param = 'indexCode=%s&name=%s&srcURL=%s&connectType=%s&timeout=5&mediaType=%s&dstURL=%s&dstFormat=%s&enable=%s' % \
  148. (channel["indexcode"], channel["name"], channel["srcURL"], channel["connectType"],
  149. channel["mediaType"], channel["dstURL"], channel["dstFormat"], channel["enable"])
  150. fail_url = UPDATE_CHANNEL_URL % fail_req_param
  151. request.send_request('GET', fail_url, headers=headers, timeout=30)
  152. logger.error("EasyRtmpLive启动失败! 状态码: {}, {}", u_resp.status_code, u_resp.__dict__)
  153. raise ServiceException(EasyExceptionType.EASY_TASK_START_FAIL.value[0],
  154. EasyExceptionType.EASY_TASK_START_FAIL.value[1])
  155. else:
  156. u_content = u_resp.content.decode('utf-8')
  157. if u_content == "OK":
  158. rtmp_result(fb_queue, 0, "操作成功!")
  159. else:
  160. raise ServiceException(EasyExceptionType.EASY_TASK_START_FAIL.value[0],
  161. EasyExceptionType.EASY_TASK_START_FAIL.value[1])
  162. except ServiceException as s:
  163. logger.error("EasyRtmpLive异常: {}", s.msg)
  164. rtmp_result(fb_queue, s.code, s.msg)
  165. except Exception:
  166. logger.error("EasyRtmpLive异常: {}", format_exc())
  167. rtmp_result(fb_queue,
  168. EasyExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  169. EasyExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  170. finally:
  171. del msg
  172. request.close_session()
  173. def check_msg(schema, msg):
  174. try:
  175. v = Validator(schema, allow_unknown=True)
  176. result = v.validate(msg)
  177. if not result:
  178. logger.error("参数校验异常: {}", v.errors)
  179. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  180. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  181. except ServiceException as s:
  182. raise s
  183. except Exception:
  184. logger.error("参数校验异常: {}", format_exc())
  185. raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
  186. ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
  187. def start_feedback_thread(mq, fb_queue, feedbackThread):
  188. if feedbackThread is None:
  189. feedbackThread = FeedbackThread(fb_queue, mq)
  190. feedbackThread.setDaemon(True)
  191. feedbackThread.start()
  192. else:
  193. if not feedbackThread.is_alive():
  194. logger.error("反馈线程异常停止! 开始终止程序!")
  195. sys.exit()
  196. return feedbackThread
  197. UPLOAD_SCHEMA = {
  198. "requestId": {
  199. 'type': 'string',
  200. 'required': True,
  201. 'empty': False,
  202. 'nullable': False,
  203. 'regex': r'^[a-zA-Z0-9]{1,36}$'
  204. },
  205. 'command': {
  206. 'type': 'string',
  207. 'required': True,
  208. 'empty': False,
  209. 'nullable': False,
  210. 'allowed': ['start', 'stop']
  211. }
  212. }
  213. RTMP_SCHEMA = {
  214. 'command': {
  215. 'type': 'string',
  216. 'required': True,
  217. 'empty': False,
  218. 'nullable': False,
  219. 'allowed': ['start', 'stop']
  220. }
  221. }