# -*- coding: utf-8 -*-
"""MQTT dispatcher: routes queued MQTT messages to stream-push task handlers."""
import queue
import sys
from multiprocessing import Queue
from time import sleep
from traceback import format_exc

from cerberus import Validator
from loguru import logger

from bean.Feedback import push_result
from concurrency.mqtt.MqttPushStreamProcess import PushStreamProcess
from enums.ExceptionEnum import ExceptionType, StreamStrExceptionType
from enums.StatusEnum import StatusType
from exception.CustomerException import ServiceException
from concurrency.mqtt.MqttFeedbackThread import FeedbackThread
from util.MqttUtil import MqttClient
from util.QueUtil import get_no_block_queue


class MqttDispatcherService:
    """Long-running service that dispatches queued messages to handlers.

    Instantiating the class immediately enters the dispatch loop in
    ``start_service``; the constructor only returns if that loop is
    terminated (e.g. by ``sys.exit`` when the feedback thread dies).
    """

    __slots__ = "__service_config"

    def __init__(self, service_config):
        """Store the configuration and start the (blocking) service loop.

        Args:
            service_config: Mapping of service settings; ``"base_dir"`` is
                read here, the rest is forwarded to the task handlers.
        """
        # Service configuration
        self.__service_config = service_config
        self.start_service()

    def start_service(self):
        """Run the dispatch loop forever.

        Each iteration clears finished tasks, makes sure the feedback thread
        is running, reconnects the MQTT client when it is disconnected, and
        hands at most one queued message to its handler.
        """
        service_config = self.__service_config
        # msg_queue is a thread queue; fb_queue is a multiprocessing queue
        # because it is also handed to PushStreamProcess.
        msg_queue, fb_queue = queue.Queue(), Queue()
        feedback_thread = None
        # Current task record: task kind -> running worker (or None)
        task = {"stream": None}
        # Message kind (first tuple element) -> handler function
        handle_method = {"stream": handle_stream}
        mq = MqttClient(service_config["base_dir"], msg_queue, fb_queue)
        mq.start()
        sleep(1)
        while True:
            try:
                # Forget workers that have already exited so a new task of
                # the same kind can be started. Only values of existing keys
                # are rebound, so iterating the dict directly is safe.
                for name, worker in task.items():
                    if worker is not None and not worker.is_alive():
                        task[name] = None
                # Raises SystemExit (not caught below) if the thread died.
                feedback_thread = start_feedback_thread(mq, fb_queue, feedback_thread)
                if not mq.client.is_connected():
                    logger.info("mqtt重连中")
                    mq.stop()
                    sleep(4)
                    mq.client.reconnect()
                    mq.client.loop_start()
                # Handle subscribed messages
                message = get_no_block_queue(msg_queue)
                # Check the type before calling len() so that an unexpected
                # non-sized object is skipped instead of raising TypeError.
                if isinstance(message, tuple) and len(message) == 2:
                    method = handle_method.get(message[0])
                    if method is not None:
                        method(message[1], task, fb_queue, service_config)
                else:
                    # Nothing usable in the queue; avoid busy-waiting.
                    sleep(1)
            except Exception:
                logger.error("服务异常: {}", format_exc())
                # Back off briefly so a persistent failure cannot turn the
                # loop into a tight spin that floods the log.
                sleep(1)


def handle_stream(msg, task, fb_queue, service_config):
    """Handle a ``stream`` message: start or stop the push-stream task.

    Failures are not raised to the caller; they are logged and reported
    through ``push_result`` on ``fb_queue``.

    Args:
        msg: Message payload; validated against ``STREAM_SCHEMA``.
        task: Task record; ``task["stream"]`` holds the running
            ``PushStreamProcess`` or ``None``.
        fb_queue: Queue passed to ``push_result`` and to the new process.
        service_config: Service settings forwarded to ``PushStreamProcess``.
    """
    try:
        check_msg(STREAM_SCHEMA, msg)
        command = msg["command"]
        if command == "start":
            if task["stream"] is not None:
                logger.warning("推流任务已存在!!!")
                push_result(fb_queue, StreamStrExceptionType.PUSH_STREAM_TASK_IS_AREADLY.value[0],
                            StreamStrExceptionType.PUSH_STREAM_TASK_IS_AREADLY.value[1],
                            status=StatusType.FAILED.value[0])
                return
            # NOTE(review): both URLs are optional in STREAM_SCHEMA, so they
            # may be None/empty here - confirm PushStreamProcess copes.
            pull_url, push_url = msg.get("pullUrl"), msg.get("pushUrl")
            pp = PushStreamProcess(fb_queue, service_config, pull_url, push_url)
            pp.start()
            task["stream"] = pp
        elif command == "stop":
            if task["stream"] is None:
                logger.warning("推流任务不存在, 任务无法停止!")
                push_result(fb_queue, StreamStrExceptionType.PUSH_STREAM_TASK_IS_NOT_AREADLY.value[0],
                            StreamStrExceptionType.PUSH_STREAM_TASK_IS_NOT_AREADLY.value[1],
                            StatusType.FAILED.value[0])
                return
            # The process stops itself; the dispatch loop clears the task
            # record once the process is no longer alive.
            task["stream"].send_event({"command": "stop"})
    except ServiceException as s:
        logger.error("消息处理异常: {}", s.msg)
        push_result(fb_queue, s.code, s.msg, StatusType.FAILED.value[0])
    except Exception:
        logger.error("消息处理异常: {}", format_exc())
        push_result(fb_queue, ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                    ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
                    StatusType.FAILED.value[0])


def check_msg(schema, msg):
    """Validate ``msg`` against a Cerberus ``schema``.

    Unknown keys are allowed. Any validation failure, or any unexpected
    error raised while validating, is logged and converted into a
    ``ServiceException``.

    Raises:
        ServiceException: With the ``ILLEGAL_PARAMETER_FORMAT`` code/message.
    """
    try:
        validator = Validator(schema, allow_unknown=True)
        valid = validator.validate(msg)
    except Exception:
        logger.error("参数校验异常: {}", format_exc())
        raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
                               ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
    if not valid:
        logger.error("参数校验异常: {}", validator.errors)
        raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
                               ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])


def start_feedback_thread(mq, fb_queue, feedbackThread):
    """Start the feedback thread on first use, or verify it is still alive.

    Args:
        mq: MQTT client wrapper handed to the feedback thread.
        fb_queue: Queue handed to the feedback thread.
        feedbackThread: The existing thread, or ``None`` to create one.

    Returns:
        The running feedback thread.

    Raises:
        SystemExit: If an existing feedback thread is no longer alive.
    """
    if feedbackThread is None:
        feedbackThread = FeedbackThread(fb_queue, mq)
        # Thread.setDaemon() is deprecated since Python 3.10; set the
        # attribute instead (assumes FeedbackThread is a threading.Thread).
        feedbackThread.daemon = True
        feedbackThread.start()
    elif not feedbackThread.is_alive():
        logger.error("反馈线程异常停止! 开始终止程序!")
        sys.exit()
    return feedbackThread


# Cerberus schema for "stream" messages: a mandatory start/stop command plus
# optional pull/push URLs that, when non-empty, must match the listed schemes.
STREAM_SCHEMA = {
    'command': {
        'type': 'string',
        'required': True,
        'empty': False,
        'nullable': False,
        'allowed': ['start', 'stop']
    },
    'pullUrl': {
        'type': 'string',
        'required': False,
        'empty': True,
        'nullable': True,
        'maxlength': 255,
        'regex': r'(^(https|http|rtsp|rtmp|artc|webrtc|ws)://\w.+$)?'
    },
    'pushUrl': {
        'type': 'string',
        'required': False,
        'nullable': True,
        'empty': True,
        'maxlength': 255,
        'regex': r'(^(https|http|rtsp|rtmp|artc|webrtc|ws)://\w.+$)?'
    }
}