|
- # -*- coding: utf-8 -*-
- import queue
- import sys
- from multiprocessing import Queue
- from time import sleep
- from traceback import format_exc
-
- from cerberus import Validator
- from loguru import logger
-
- from bean.Feedback import push_result
- from concurrency.mqtt.MqttPushStreamProcess import PushStreamProcess
- from enums.ExceptionEnum import ExceptionType, StreamStrExceptionType
- from enums.StatusEnum import StatusType
- from exception.CustomerException import ServiceException
- from concurrency.mqtt.MqttFeedbackThread import FeedbackThread
- from util.MqttUtil import MqttClient
- from util.QueUtil import get_no_block_queue
-
-
class MqttDispatcherService:
    """Dispatch messages received over MQTT to their handlers.

    Constructing an instance immediately calls :meth:`start_service`, which
    loops forever, so the constructor does not return during normal
    operation.
    """

    __slots__ = ("__service_config",)

    def __init__(self, service_config):
        # Service configuration; "base_dir" is read from it when the MQTT
        # client is created.
        self.__service_config = service_config
        self.start_service()

    def start_service(self):
        """Start the MQTT client and run the dispatch loop.

        Each iteration:

        1. clears task slots whose worker is no longer alive,
        2. makes sure the feedback thread is running (this may raise
           ``SystemExit`` if the thread has died),
        3. reconnects the MQTT client when it reports being disconnected,
        4. takes at most one message from the message queue and hands it to
           the handler registered for its type.

        Any ``Exception`` raised inside an iteration is logged and the loop
        continues; the method itself never returns normally.
        """
        service_config = self.__service_config
        # msg_queue is a thread queue; fb_queue is a multiprocessing queue.
        msg_queue, fb_queue = queue.Queue(), Queue()
        feedback_thread = None
        # Currently running task per message type (None when idle).
        task = {"stream": None}
        # Message type -> handler(payload, task, fb_queue, service_config).
        handle_method = {"stream": handle_stream}
        mq = MqttClient(service_config["base_dir"], msg_queue, fb_queue)
        mq.start()
        # NOTE(review): presumably gives the client time to connect - confirm.
        sleep(1)
        while True:
            try:
                # Only values of existing keys are replaced, so iterating the
                # dict directly is safe.
                for name, worker in task.items():
                    if worker is not None and not worker.is_alive():
                        task[name] = None
                feedback_thread = start_feedback_thread(mq, fb_queue, feedback_thread)
                if not mq.client.is_connected():
                    logger.info("mqtt重连中")
                    mq.stop()
                    sleep(4)
                    mq.client.reconnect()
                    mq.client.loop_start()
                # Handle one subscribed message, if any.
                message = get_no_block_queue(msg_queue)
                # Check the type before calling len(): a non-sized payload
                # would otherwise raise TypeError and be logged as a service
                # fault instead of simply being ignored.
                if isinstance(message, tuple) and len(message) == 2:
                    method = handle_method.get(message[0])
                    if method is not None:
                        method(message[1], task, fb_queue, service_config)
                else:
                    # Nothing usable to process; back off before polling again.
                    sleep(1)
            except Exception:
                logger.error("服务异常: {}", format_exc())
-
-
def handle_stream(msg, task, fb_queue, service_config):
    """Process one push-stream command message.

    The message is validated against ``STREAM_SCHEMA`` first. A ``start``
    command creates and starts a ``PushStreamProcess`` and records it in
    ``task["stream"]``; a ``stop`` command sends a stop event to the recorded
    process. Starting while a task is recorded, or stopping while none is,
    only reports a failure.

    No exception derived from ``Exception`` escapes: every failure is logged
    and reported through ``push_result`` on ``fb_queue``.

    :param msg: command payload; ``command`` is read, and ``pullUrl`` /
        ``pushUrl`` for a start command.
    :param task: mutable mapping holding the current worker under "stream".
    :param fb_queue: queue that receives feedback results.
    :param service_config: configuration passed on to the worker process.
    """
    try:
        check_msg(STREAM_SCHEMA, msg)
        command = msg["command"]
        if command == 'start':
            if task["stream"] is not None:
                logger.warning("推流任务已存在!!!")
                already_running = StreamStrExceptionType.PUSH_STREAM_TASK_IS_AREADLY.value
                push_result(fb_queue,
                            already_running[0],
                            already_running[1],
                            status=StatusType.FAILED.value[0])
                return
            pull_url = msg.get("pullUrl")
            push_url = msg.get("pushUrl")
            worker = PushStreamProcess(fb_queue, service_config, pull_url, push_url)
            worker.start()
            task["stream"] = worker
        elif command == 'stop':
            if task["stream"] is None:
                logger.warning("推流任务不存在, 任务无法停止!")
                not_running = StreamStrExceptionType.PUSH_STREAM_TASK_IS_NOT_AREADLY.value
                push_result(fb_queue,
                            not_running[0],
                            not_running[1],
                            StatusType.FAILED.value[0])
                return
            task["stream"].send_event({"command": "stop"})
    except ServiceException as err:
        logger.error("消息处理异常: {}", err.msg)
        push_result(fb_queue, err.code, err.msg, StatusType.FAILED.value[0])
    except Exception:
        logger.error("消息处理异常: {}", format_exc())
        inner_error = ExceptionType.SERVICE_INNER_EXCEPTION.value
        push_result(fb_queue,
                    inner_error[0],
                    inner_error[1],
                    StatusType.FAILED.value[0])
-
-
def check_msg(schema, msg):
    """Validate ``msg`` against a Cerberus ``schema``.

    Keys not described by the schema are allowed. Returns ``None`` when the
    message is valid.

    :raises ServiceException: with the ``ILLEGAL_PARAMETER_FORMAT`` code and
        message, both when validation fails and when the validator itself
        raises an unexpected error.
    """
    try:
        validator = Validator(schema, allow_unknown=True)
        if validator.validate(msg):
            return
        logger.error("参数校验异常: {}", validator.errors)
        raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
                               ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
    except ServiceException:
        # Validation failure raised just above: pass it through untouched.
        raise
    except Exception:
        logger.error("参数校验异常: {}", format_exc())
        raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
                               ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
-
-
def start_feedback_thread(mq, fb_queue, feedbackThread):
    """Make sure the feedback thread exists and is still running.

    When ``feedbackThread`` is ``None`` a new ``FeedbackThread`` is created
    from ``fb_queue`` and ``mq``, marked as a daemon and started. When a
    thread is supplied but is no longer alive, the error is logged and the
    program is terminated through ``sys.exit()``.

    :param mq: MQTT client handed to the feedback thread.
    :param fb_queue: queue the feedback thread is constructed with.
    :param feedbackThread: the current feedback thread, or ``None``.
    :return: the running feedback thread (newly created or the one passed in).
    :raises SystemExit: if the supplied thread has stopped.
    """
    if feedbackThread is None:
        feedbackThread = FeedbackThread(fb_queue, mq)
        # Thread.setDaemon() is deprecated since Python 3.10; setting the
        # ``daemon`` attribute is the supported equivalent.
        feedbackThread.daemon = True
        feedbackThread.start()
    elif not feedbackThread.is_alive():
        logger.error("反馈线程异常停止! 开始终止程序!")
        sys.exit()
    return feedbackThread
-
-
# Cerberus rule shared by the optional stream URL fields: a string of at most
# 255 characters that may be missing, None or empty; the regex accepts either
# an empty value or "<scheme>://..." for the listed schemes.
_STREAM_URL_RULE = {
    'type': 'string',
    'required': False,
    'empty': True,
    'nullable': True,
    'maxlength': 255,
    'regex': r'(^(https|http|rtsp|rtmp|artc|webrtc|ws)://\w.+$)?',
}

# Cerberus schema for "stream" command messages. ``command`` is mandatory and
# must be 'start' or 'stop'; both URLs are optional. Each URL field gets its
# own copy of the shared rule so the two definitions cannot drift apart and
# mutating one does not affect the other.
STREAM_SCHEMA = {
    'command': {
        'type': 'string',
        'required': True,
        'empty': False,
        'nullable': False,
        'allowed': ['start', 'stop'],
    },
    'pullUrl': dict(_STREAM_URL_RULE),
    'pushUrl': dict(_STREAM_URL_RULE),
}
|