|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- # -*- coding: utf-8 -*-
- import queue
- import sys
- from json import loads
- from multiprocessing import Queue
- from time import sleep
- from traceback import format_exc
-
- from cerberus import Validator
- from loguru import logger
-
- from bean.Feedback import upload_result, rtmp_result
- from common.Constant import CHANNEL_LIST_URL, UPDATE_CHANNEL_URL
- from enums.ExceptionEnum import ExceptionType, UploadStrExceptionType
- from enums.HttpExceptionEnum import EasyExceptionType
- from enums.StatusEnum import UploadTaskStatusType
- from exception.CustomerException import ServiceException
- from concurrency.mqtt.MqttFeedbackThread import FeedbackThread
- from concurrency.mqtt.MqttUploadFileProcess import MqttUploadFileProcess
- from util.MqttUtil import MqttClient
- from util.QueUtil import get_no_block_queue
- from util.RequestUtil import HttpRequests
-
-
- class MqttDispatcherService:
- __slots__ = "__service_config"
-
- def __init__(self, service_config):
- # 服务配置
- self.__service_config = service_config
- self.start_service()
-
- def start_service(self):
- service_config = self.__service_config
- msg_queue, fb_queue = queue.Queue(), Queue()
- feedbackThread = None
- # 当前任务记录
- task = {
- "upload": None
- }
- handle_method = {
- "upload": lambda x, y, z, h: handle_upload(x, y, z, h),
- "rtmp": lambda x, y, z, h: handle_rtmp(x, y, z, h)
- }
- mq = MqttClient(service_config["base_dir"], msg_queue, fb_queue)
- mq.start()
- sleep(1)
- while True:
- try:
- for tk in list(task.keys()):
- if task[tk] is not None and not task[tk].is_alive():
- task[tk] = None
- feedbackThread = start_feedback_thread(mq, fb_queue, feedbackThread)
- if not mq.client.is_connected():
- logger.info("mqtt重连中")
- mq.stop()
- sleep(4)
- mq.client.reconnect()
- mq.client.loop_start()
- # 订阅消息处理
- message = get_no_block_queue(msg_queue)
- if message is not None and len(message) == 2 and isinstance(message, tuple):
- method = handle_method.get(message[0])
- if method is not None:
- method(message[1], task, fb_queue, service_config)
- else:
- sleep(1)
- except Exception:
- logger.error("服务异常: {}", format_exc())
-
-
- def handle_upload(msg, task, fb_queue, service_config):
- try:
- check_msg(UPLOAD_SCHEMA, msg)
- command = msg["command"]
- if 'start' == command:
- if task["upload"] is not None:
- logger.warning("上传任务已存在!!!")
- upload_result(fb_queue, msg["requestId"],
- errorCode=UploadStrExceptionType.UPLOAD_TASK_IS_AREADLY.value[0],
- errorMsg=UploadStrExceptionType.UPLOAD_TASK_IS_AREADLY.value[1],
- status=UploadTaskStatusType.FAILED.value[0])
- return
- upload_p = MqttUploadFileProcess(fb_queue, service_config, msg["requestId"])
- upload_p.start()
- task["upload"] = upload_p
- if 'stop' == command:
- if task["upload"] is None:
- logger.warning("上传任务不存在, 任务无法停止!")
- upload_result(fb_queue,
- UploadStrExceptionType.UPLOAD_TASK_IS_NOT_AREADLY.value[0],
- UploadStrExceptionType.UPLOAD_TASK_IS_NOT_AREADLY.value[1],
- UploadTaskStatusType.FAILED.value[0])
- return
- task["upload"].sendEvent({"command": "stop"})
- except ServiceException as s:
- logger.error("文件上传请求异常: {}", s.msg)
- upload_result(fb_queue, msg.get("requestId"),
- errorCode=s.code,
- errorMsg=s.msg,
- status=UploadTaskStatusType.FAILED.value[0])
- except Exception:
- logger.error("消息处理异常: {}", format_exc())
- upload_result(fb_queue, msg.get("requestId"),
- errorCode=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- errorMsg=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1],
- status=UploadTaskStatusType.FAILED.value[0])
- finally:
- del msg
-
-
- def handle_rtmp(msg, task, fb_queue, service_config):
- request = HttpRequests()
- headers = {'content-type': "application/json"}
- try:
- check_msg(RTMP_SCHEMA, msg)
- c_resp = request.send_request('GET', CHANNEL_LIST_URL, headers=headers, timeout=30)
- if c_resp.status_code != 200:
- logger.error("获取任务列表失败! 状态码: {}, {}", c_resp.status_code, c_resp.__dict__)
- raise ServiceException(EasyExceptionType.EASY_TASK_GET_FAIL.value[0],
- EasyExceptionType.EASY_TASK_GET_FAIL.value[1])
- else:
- content = c_resp.content.decode('utf-8')
- content = loads(content)
- channelList = content.get("ChannelList")
- if channelList is None or len(channelList) == 0:
- logger.error("获取EasyRtmpLive任务失败!")
- raise ServiceException(EasyExceptionType.EASY_TASK_GET_FAIL.value[0],
- EasyExceptionType.EASY_TASK_GET_FAIL.value[1])
- if channelList is not None and len(channelList) > 1:
- logger.error("EasyRtmpLive任务限制, 限制1个任务!")
- raise ServiceException(EasyExceptionType.EASY_TASK_LIMIT.value[0],
- EasyExceptionType.EASY_TASK_LIMIT.value[1])
- channel = channelList[0]
- command = msg["command"]
- if 'start' == command:
- if channel["enable"] == "true":
- logger.error("任务正在执行, 请稍后再试!")
- raise ServiceException(EasyExceptionType.TASK_IS_EXECUTING.value[0],
- EasyExceptionType.TASK_IS_EXECUTING.value[1])
- channel["enable"] = "true"
- if 'stop' == command:
- if channel["enable"] == "false":
- logger.info("EasyRtmpLive任务已停止!")
- rtmp_result(fb_queue, 0, "操作成功!")
- return
- channel["enable"] = "false"
- channel["srcURL"] = '"%s"' % channel["srcURL"]
- req_param = 'indexCode=%s&name=%s&srcURL=%s&connectType=%s&timeout=5&mediaType=%s&dstURL=%s&dstFormat=%s&enable=%s' % \
- (channel["indexcode"], channel["name"], channel["srcURL"], channel["connectType"],
- channel["mediaType"], channel["dstURL"], channel["dstFormat"], channel["enable"])
- url = UPDATE_CHANNEL_URL % req_param
- u_resp = request.send_request('GET', url, headers=headers, timeout=30)
- if u_resp.status_code != 200:
- if channel["enable"] == "true":
- channel["enable"] = "false"
- fail_req_param = 'indexCode=%s&name=%s&srcURL=%s&connectType=%s&timeout=5&mediaType=%s&dstURL=%s&dstFormat=%s&enable=%s' % \
- (channel["indexcode"], channel["name"], channel["srcURL"], channel["connectType"],
- channel["mediaType"], channel["dstURL"], channel["dstFormat"], channel["enable"])
- fail_url = UPDATE_CHANNEL_URL % fail_req_param
- request.send_request('GET', fail_url, headers=headers, timeout=30)
- logger.error("EasyRtmpLive启动失败! 状态码: {}, {}", u_resp.status_code, u_resp.__dict__)
- raise ServiceException(EasyExceptionType.EASY_TASK_START_FAIL.value[0],
- EasyExceptionType.EASY_TASK_START_FAIL.value[1])
- else:
- u_content = u_resp.content.decode('utf-8')
- if u_content == "OK":
- rtmp_result(fb_queue, 0, "操作成功!")
- else:
- raise ServiceException(EasyExceptionType.EASY_TASK_START_FAIL.value[0],
- EasyExceptionType.EASY_TASK_START_FAIL.value[1])
- except ServiceException as s:
- logger.error("EasyRtmpLive异常: {}", s.msg)
- rtmp_result(fb_queue, s.code, s.msg)
- except Exception:
- logger.error("EasyRtmpLive异常: {}", format_exc())
- rtmp_result(fb_queue,
- EasyExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- EasyExceptionType.SERVICE_INNER_EXCEPTION.value[1])
- finally:
- del msg
- request.close_session()
-
- def check_msg(schema, msg):
- try:
- v = Validator(schema, allow_unknown=True)
- result = v.validate(msg)
- if not result:
- logger.error("参数校验异常: {}", v.errors)
- raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
- ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
- except ServiceException as s:
- raise s
- except Exception:
- logger.error("参数校验异常: {}", format_exc())
- raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
- ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
-
-
- def start_feedback_thread(mq, fb_queue, feedbackThread):
- if feedbackThread is None:
- feedbackThread = FeedbackThread(fb_queue, mq)
- feedbackThread.setDaemon(True)
- feedbackThread.start()
- else:
- if not feedbackThread.is_alive():
- logger.error("反馈线程异常停止! 开始终止程序!")
- sys.exit()
- return feedbackThread
-
-
- UPLOAD_SCHEMA = {
- "requestId": {
- 'type': 'string',
- 'required': True,
- 'empty': False,
- 'nullable': False,
- 'regex': r'^[a-zA-Z0-9]{1,36}$'
- },
- 'command': {
- 'type': 'string',
- 'required': True,
- 'empty': False,
- 'nullable': False,
- 'allowed': ['start', 'stop']
- }
- }
-
- RTMP_SCHEMA = {
- 'command': {
- 'type': 'string',
- 'required': True,
- 'empty': False,
- 'nullable': False,
- 'allowed': ['start', 'stop']
- }
- }
|