|
- # -*- coding: utf-8 -*-
- import time
- from traceback import format_exc
-
- from multiprocessing import Process, Queue
-
- from loguru import logger
-
- from concurrency.PushStreamThread import PushSteamThread
- from enums.StatusEnum import PushStreamStatus, ExecuteStatus
- from util.LogUtils import init_log
-
- from enums.ExceptionEnum import ExceptionType
- from entity.FeedBack import pull_stream_feedback
- from exception.CustomerException import ServiceException
- from util.QueUtil import get_no_block_queue
-
-
class PushStreamProcess(Process):
    """Child process that starts and supervises the push threads of one request.

    ``run`` starts one ``PushSteamThread`` per entry of ``msg["videoUrls"]``,
    then polls once per second for stop commands, thread failures and the
    overall task timeout, reporting progress through the feedback queue.
    """

    # NOTE(review): the slot is declared as ``_eventQueue`` but ``__init__``
    # assigns ``eventQueue``, so that attribute is stored in the instance
    # ``__dict__`` inherited from ``Process``.  Left unchanged on purpose.
    __slots__ = [
        '_fbQueue',
        '_eventQueue',
        '_context',
        '_msg',
        '_analysisType',
        '_base_dir'
    ]

    # Maximum number of streams accepted in a single request.
    _MAX_STREAMS = 5
    # Overall task timeout in seconds (12 hours).
    _TASK_TIMEOUT = 43200
    # Seconds to wait for a push thread to exit after asking it to stop.
    _JOIN_TIMEOUT = 120
    # A live thread is reported as RUNNING once this many seconds have passed
    # since its ``pushStreamUtil.start_time``.
    _RUNNING_AFTER = 21
    # A heartbeat is sent every this many loop iterations (the loop sleeps 1s).
    _HEARTBEAT_EVERY = 10

    def __init__(self, param):
        """Copy the queues and request data this process needs from ``param``.

        ``param`` must expose ``fbqueue``, ``context``, ``msg``,
        ``analyse_type`` and ``base_dir`` attributes.
        """
        super().__init__()
        self._fbQueue = param.fbqueue
        self.eventQueue = Queue()
        self._context = param.context
        self._msg = param.msg
        self._analysisType = param.analyse_type
        self._base_dir = param.base_dir

    def sendEvent(self, eBody):
        """Put a command (e.g. a stop request) on this process's event queue."""
        self.eventQueue.put(eBody)

    def getEvent(self):
        """Return the next pending event via ``get_no_block_queue``."""
        return get_no_block_queue(self.eventQueue)

    def sendResult(self, result):
        """Put an execution result on the feedback queue."""
        self._fbQueue.put(result)

    def sendhbMessage(self, status, videoIds, error_code="", error_msg=""):
        """Send a ``pull_stream`` feedback message for the current request.

        :param status: overall execution status value.
        :param videoIds: list of ``{"id": ..., "status": ...}`` entries.
        :param error_code: error code, empty when there is no error.
        :param error_msg: error description, empty when there is no error.
        """
        self.sendResult({"pull_stream": pull_stream_feedback(self._msg.get("requestId"),
                                                             status,
                                                             error_code,
                                                             error_msg, videoIds)})

    @staticmethod
    def _collect_video_ids(video_urls):
        """Return the truthy ``id`` of every entry; ``None`` counts as empty."""
        return [url.get("id") for url in (video_urls or []) if url.get("id")]

    @staticmethod
    def _status_snapshot(video_status):
        """Convert the id -> status mapping into the feedback list format."""
        return [{"id": vid, "status": status} for vid, status in video_status.items()]

    def _stop_tasks(self, task, video_status, final_status=None, video_ids=None):
        """Stop live push threads and return how many live ones were left alone.

        :param task: mapping of video id -> push thread.
        :param video_status: mapping of video id -> status, updated in place.
        :param final_status: status recorded for each thread stopped here;
            ``None`` leaves ``video_status`` untouched.
        :param video_ids: ids to stop; ``None`` stops every live thread.

        NOTE(review): only threads that were still alive (and were therefore
        stopped here) get ``final_status``; threads that had already exited
        keep their last recorded status.  Confirm this is the intended scope.
        """
        left_running = 0
        for vid, thread in list(task.items()):
            if not thread.is_alive():
                continue
            if video_ids is not None and vid not in video_ids:
                left_running += 1
                continue
            thread.pushStreamUtil.status = False
            thread.pushStreamUtil.close_push_stream_sp()
            thread.join(self._JOIN_TIMEOUT)
            if final_status is not None:
                video_status[vid] = final_status
        return left_running

    def run(self):
        """Start the push threads and supervise them until they finish.

        The loop ends when the task times out, when a stop command leaves no
        live thread, or when an error is raised.  Every exit path sends a final
        feedback message, and ``finally`` stops any thread that is still alive.
        """
        task = {}
        videoStatus = {}
        requestId = self._msg.get("requestId")
        try:
            init_log(self._base_dir)
            videoUrls = self._msg.get("videoUrls")
            # Build the initial report defensively: an explicit ``None`` must
            # reach the validation below instead of failing with a TypeError.
            videoInfo = [{"id": vid, "status": PushStreamStatus.WAITING.value[0]}
                         for vid in self._collect_video_ids(videoUrls)]
            self.sendhbMessage(ExecuteStatus.WAITING.value[0], videoInfo)
            if not videoUrls:
                raise ServiceException(ExceptionType.PUSH_STREAM_URL_IS_NULL.value[0],
                                       ExceptionType.PUSH_STREAM_URL_IS_NULL.value[1])
            if len(videoUrls) > self._MAX_STREAMS:
                logger.error("推流数量超过限制, 当前推流数量: {}, requestId:{}", len(videoUrls), requestId)
                raise ServiceException(ExceptionType.PULL_STREAM_NUM_LIMIT_EXCEPTION.value[0],
                                       ExceptionType.PULL_STREAM_NUM_LIMIT_EXCEPTION.value[1])
            for videoUrl in videoUrls:
                pushThread = PushSteamThread(videoUrl["pullUrl"], videoUrl["pushUrl"], requestId, videoUrl["id"])
                pushThread.start()
                task[videoUrl["id"]] = pushThread
            enable_time = time.time()
            for video in videoInfo:
                videoStatus[video.get("id")] = video.get("status")
            count = 0
            while True:
                # Timeout for the whole push-stream task.
                if time.time() - enable_time > self._TASK_TIMEOUT:
                    logger.error("任务执行超时, requestId:{}", requestId)
                    self._stop_tasks(task, videoStatus, PushStreamStatus.TIMEOUT.value[0])
                    self.sendhbMessage(ExecuteStatus.TIMEOUT.value[0],
                                       self._status_snapshot(videoStatus),
                                       ExceptionType.TASK_EXCUTE_TIMEOUT_EXCEPTION.value[0],
                                       ExceptionType.TASK_EXCUTE_TIMEOUT_EXCEPTION.value[1])
                    break
                # Handle a pending stop command, if any.
                event_result = self.getEvent()
                if event_result is not None and "stop" == event_result.get("command"):
                    videoIds = event_result.get("videoIds")
                    if videoIds:
                        logger.info("停止指定的推流任务, requestId:{}", requestId)
                        targets = videoIds
                    else:
                        # An empty id list means "stop every task".
                        logger.info("停止所有执行的推流任务, requestId:{}", requestId)
                        targets = None
                    left_running = self._stop_tasks(task, videoStatus,
                                                    PushStreamStatus.SUCCESS.value[0], targets)
                    # Finish once nothing is left alive; otherwise keep
                    # supervising the threads that were not asked to stop.
                    if targets is None or left_running == 0:
                        self.sendhbMessage(ExecuteStatus.SUCCESS.value[0],
                                           self._status_snapshot(videoStatus))
                        break
                for t, thread in list(task.items()):
                    # A thread that is still flagged as active but no longer
                    # alive has stopped unexpectedly.
                    if thread.pushStreamUtil.status and not thread.is_alive():
                        logger.error("检测到推流线程异常停止!videoId:{}, requestId:{}", t, requestId)
                        raise Exception("检测到推流线程异常停止!")
                    statusQueue = get_no_block_queue(thread.statusQueue)
                    # Message type 1 carries an exception raised by the thread.
                    if statusQueue is not None and statusQueue[0] == 1:
                        logger.error("推流任务异常: videoId:{}, requestId:{}", t, requestId)
                        thread.join(timeout=self._JOIN_TIMEOUT)
                        videoStatus[t] = PushStreamStatus.FAILED.value[0]
                        raise statusQueue[1]
                    # Message type 2 carries a status change of the thread.
                    if thread.is_alive() and statusQueue is not None and statusQueue[0] == 2:
                        if PushStreamStatus.RETRYING.value[0] == statusQueue[1]:
                            videoStatus[t] = PushStreamStatus.RETRYING.value[0]
                            thread.pushStreamUtil.start_time = time.time()
                    if thread.is_alive() and time.time() - thread.pushStreamUtil.start_time > self._RUNNING_AFTER:
                        videoStatus[t] = PushStreamStatus.RUNNING.value[0]
                # Periodic heartbeat with the current per-video status.
                if count % self._HEARTBEAT_EVERY == 0:
                    self.sendhbMessage(ExecuteStatus.RUNNING.value[0],
                                       self._status_snapshot(videoStatus))
                    count = 0
                count += 1
                time.sleep(1)
        except ServiceException as s:
            logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, requestId)
            self._stop_tasks(task, videoStatus, PushStreamStatus.FAILED.value[0])
            self.sendhbMessage(ExecuteStatus.FAILED.value[0],
                               self._status_snapshot(videoStatus),
                               s.code,
                               s.msg)
        except Exception:
            logger.error("服务异常: {}, requestId: {},", format_exc(), requestId)
            self._stop_tasks(task, videoStatus, PushStreamStatus.FAILED.value[0])
            self.sendhbMessage(ExecuteStatus.FAILED.value[0],
                               self._status_snapshot(videoStatus),
                               ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                               ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
        finally:
            # Last line of defence: never leave a push thread running.
            self._stop_tasks(task, videoStatus)
            logger.info("推流任务完成, requestId: {}", requestId)
|