# -*- coding: utf-8 -*- import time from traceback import format_exc from multiprocessing import Process, Queue from loguru import logger from concurrency.PushStreamThread import PushSteamThread from enums.StatusEnum import PushStreamStatus, ExecuteStatus from util.LogUtils import init_log from enums.ExceptionEnum import ExceptionType from entity.FeedBack import pull_stream_feedback from exception.CustomerException import ServiceException from util.QueUtil import get_no_block_queue class PushStreamProcess(Process): __slots__ = [ '_fbQueue', '_eventQueue', '_context', '_msg', '_analysisType', '_base_dir' ] def __init__(self, param): super().__init__() self._fbQueue = param.fbqueue self.eventQueue = Queue() self._context = param.context self._msg = param.msg self._analysisType = param.analyse_type self._base_dir = param.base_dir def sendEvent(self, eBody): self.eventQueue.put(eBody) # 获取下一个事件 def getEvent(self): return get_no_block_queue(self.eventQueue) # 推送执行结果 def sendResult(self, result): self._fbQueue.put(result) def sendhbMessage(self, status, videoIds, error_code="", error_msg=""): self.sendResult({"pull_stream": pull_stream_feedback(self._msg.get("requestId"), status, error_code, error_msg, videoIds)}) def run(self): task = {} videoStatus = {} requestId = self._msg.get("requestId") try: # 程序开始时间 init_log(self._base_dir) videoInfo = [{"id": url.get("id"), "status": PushStreamStatus.WAITING.value[0]} for url in self._msg.get("videoUrls", []) if url.get("id")] self.sendResult({"pull_stream": pull_stream_feedback(requestId, ExecuteStatus.WAITING.value[0], "", "", videoInfo)}) videoUrls = self._msg.get("videoUrls") requestId = self._msg.get("requestId") if videoUrls is None or len(videoUrls) == 0: raise ServiceException(ExceptionType.PUSH_STREAM_URL_IS_NULL.value[0], ExceptionType.PUSH_STREAM_URL_IS_NULL.value[1]) if len(videoUrls) > 5: logger.error("推流数量超过限制, 当前推流数量: {}, requestId:{}", len(videoUrls), requestId) raise ServiceException(ExceptionType.PULL_STREAM_NUM_LIMIT_EXCEPTION.value[0], ExceptionType.PULL_STREAM_NUM_LIMIT_EXCEPTION.value[1]) for videoUrl in videoUrls: pushThread = PushSteamThread(videoUrl["pullUrl"], videoUrl["pushUrl"], requestId, videoUrl["id"]) pushThread.start() task[videoUrl["id"]] = pushThread enable_time = time.time() for video in videoInfo: videoStatus[video.get("id")] = video.get("status") count = 0 while True: # 整个推流任务超时时间 if time.time() - enable_time > 43200: logger.error("任务执行超时, requestId:{}", requestId) for t in list(task.keys()): if task[t].is_alive(): task[t].pushStreamUtil.status = False task[t].pushStreamUtil.close_push_stream_sp() task[t].join(120) videoStatus[t] = PushStreamStatus.TIMEOUT.value[0] videoInfo_timeout = [{"id": k, "status": v} for k, v in videoStatus.items()] self.sendResult({"pull_stream": pull_stream_feedback(requestId, ExecuteStatus.TIMEOUT.value[0], ExceptionType.TASK_EXCUTE_TIMEOUT_EXCEPTION.value[ 0], ExceptionType.TASK_EXCUTE_TIMEOUT_EXCEPTION.value[ 1], videoInfo_timeout)}) break # 接受停止指令 event_result = get_no_block_queue(self.eventQueue) if event_result is not None: command = event_result.get("command") videoIds = event_result.get("videoIds") if "stop" == command: # 如果videoIds是空停止所有任务 if videoIds is None or len(videoIds) == 0: logger.info("停止所有执行的推流任务, requestId:{}", requestId) for t in list(task.keys()): if task[t].is_alive(): task[t].pushStreamUtil.status = False task[t].pushStreamUtil.close_push_stream_sp() task[t].join(120) videoStatus[t] = PushStreamStatus.SUCCESS.value[0] videoInfo_sucess = [{"id": k, "status": v} for k, v in videoStatus.items()] self.sendResult({"pull_stream": pull_stream_feedback(requestId, ExecuteStatus.SUCCESS.value[0], "", "", videoInfo_sucess)}) break else: logger.info("停止指定的推流任务, requestId:{}", requestId) alive_thread = 0 for t in list(task.keys()): if task[t].is_alive(): if t in videoIds: task[t].pushStreamUtil.status = False task[t].pushStreamUtil.close_push_stream_sp() task[t].join(120) videoStatus[t] = PushStreamStatus.SUCCESS.value[0] else: alive_thread += 1 if alive_thread == 0: videoInfo_sucess = [{"id": k, "status": v} for k, v in videoStatus.items()] self.sendResult({"pull_stream": pull_stream_feedback(requestId, ExecuteStatus.SUCCESS.value[0], "", "", videoInfo_sucess)}) break for t in list(task.keys()): if task[t].pushStreamUtil.status and not task[t].is_alive(): logger.error("检测到推流线程异常停止!videoId:{}, requestId:{}", t, requestId) raise Exception("检测到推流线程异常停止!") statusQueue = get_no_block_queue(task[t].statusQueue) if statusQueue is not None and statusQueue[0] == 1: logger.error("推流任务异常: videoId:{}, requestId:{}", t, requestId) task[t].join(timeout=120) videoStatus[t] = PushStreamStatus.FAILED.value[0] raise statusQueue[1] if task[t].is_alive() and statusQueue is not None and statusQueue[0] == 2: if PushStreamStatus.RETRYING.value[0] == statusQueue[1]: videoStatus[t] = PushStreamStatus.RETRYING.value[0] task[t].pushStreamUtil.start_time = time.time() if task[t].is_alive() and time.time() - task[t].pushStreamUtil.start_time > 21: videoStatus[t] = PushStreamStatus.RUNNING.value[0] if count % 10 == 0: videoInfo_hb = [{"id": k, "status": v} for k, v in videoStatus.items()] self.sendResult({"pull_stream": pull_stream_feedback(requestId, ExecuteStatus.RUNNING.value[0], "", "", videoInfo_hb)}) count = 0 count += 1 time.sleep(1) except ServiceException as s: logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, requestId) for t in list(task.keys()): if task[t].is_alive(): task[t].pushStreamUtil.status = False task[t].pushStreamUtil.close_push_stream_sp() task[t].join(120) videoStatus[t] = PushStreamStatus.FAILED.value[0] videoInfo_ex = [{"id": k, "status": v} for k, v in videoStatus.items()] self.sendResult({"pull_stream": pull_stream_feedback(requestId, ExecuteStatus.FAILED.value[0], s.code, s.msg, videoInfo_ex)}) except Exception: logger.error("服务异常: {}, requestId: {},", format_exc(), requestId) for t in list(task.keys()): if task[t].is_alive(): task[t].pushStreamUtil.status = False task[t].pushStreamUtil.close_push_stream_sp() task[t].join(120) videoStatus[t] = PushStreamStatus.FAILED.value[0] videoInfo_ex = [{"id": k, "status": v} for k, v in videoStatus.items()] self.sendResult({"pull_stream": pull_stream_feedback(requestId, ExecuteStatus.FAILED.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1], videoInfo_ex)}) finally: if len(task) > 0: for t in list(task.keys()): if task[t].is_alive(): task[t].pushStreamUtil.status = False task[t].pushStreamUtil.close_push_stream_sp() task[t].join(120) logger.info("推流任务完成, requestId: {}", requestId)