|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- # -*- coding: utf-8 -*-
- import time
- from traceback import format_exc
-
- from multiprocessing import Process, Queue
-
- from loguru import logger
-
- from concurrency.Pull2PushStreamThread import PushSteamThread
- from enums.StatusEnum import PushStreamStatus, ExecuteStatus
- from util.LogUtils import init_log
-
- from enums.ExceptionEnum import ExceptionType
- from entity.FeedBack import pull_stream_feedback
- from exception.CustomerException import ServiceException
- from util.QueUtil import get_no_block_queue, put_queue
-
-
- class PushStreamProcess(Process):
- __slots__ = ('_fb_queue', 'event_queue', '_context', '_msg', '_analysisType')
-
- def __init__(self, *args):
- super().__init__()
- self._fb_queue, self._context, self._msg, self._analysisType = args
- self.event_queue = Queue()
-
- def sendEvent(self, eBody):
- try:
- self.event_queue.put(eBody, timeout=2)
- except Exception:
- logger.error("添加事件到队列超时异常:{}, requestId:{}", format_exc(), self._msg["request_id"])
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
-
- def run(self):
- msg, context = self._msg, self._context
- requestId, videoUrls = msg["request_id"], msg["video_urls"]
- base_dir, env = context['base_dir'], context['env']
- fb_queue = self._fb_queue
- task, videoStatus = {}, {}
- ex = None
- try:
- init_log(base_dir, env)
- if videoUrls is None or len(videoUrls) == 0:
- raise ServiceException(ExceptionType.PUSH_STREAM_URL_IS_NULL.value[0],
- ExceptionType.PUSH_STREAM_URL_IS_NULL.value[1])
- if len(videoUrls) > 5:
- logger.error("推流数量超过限制, 当前推流数量: {}, requestId:{}", len(videoUrls), requestId)
- raise ServiceException(ExceptionType.PULL_STREAM_NUM_LIMIT_EXCEPTION.value[0],
- ExceptionType.PULL_STREAM_NUM_LIMIT_EXCEPTION.value[1])
- videoInfo = [{"id": url["id"], "status": PushStreamStatus.WAITING.value[0]} for url in videoUrls if
- url.get("id")]
- put_queue(fb_queue, pull_stream_feedback(requestId, ExecuteStatus.WAITING.value[0], "", "", videoInfo))
- for videoUrl in videoUrls:
- pushThread = PushSteamThread(videoUrl["pull_url"], videoUrl["push_url"], requestId, videoUrl["id"])
- pushThread.start()
- task[videoUrl["id"]] = pushThread
- enable_time = time.time()
- for video in videoInfo:
- videoStatus[video.get("id")] = video.get("status")
- count = 0
- while True:
- # 整个推流任务超时时间
- if time.time() - enable_time > 43200:
- logger.error("任务执行超时, requestId:{}", requestId)
- for t in list(task.keys()):
- if task[t].is_alive():
- task[t].status = False
- task[t].pushStreamUtil.close_push_stream_sp()
- task[t].join(120)
- videoStatus[t] = PushStreamStatus.TIMEOUT.value[0]
- videoInfo_timeout = [{"id": k, "status": v} for k, v in videoStatus.items()]
- put_queue(fb_queue, pull_stream_feedback(requestId, ExecuteStatus.TIMEOUT.value[0],
- ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
- ExceptionType.TASK_EXCUTE_TIMEOUT.value[1],
- videoInfo_timeout))
- break
- # 接受停止指令
- event_result = get_no_block_queue(self.event_queue)
- if event_result is not None:
- command = event_result.get("command")
- videoIds = event_result.get("videoIds")
- if "stop" == command:
- # 如果videoIds是空停止所有任务
- if videoIds is None or len(videoIds) == 0:
- logger.info("停止所有执行的推流任务, requestId:{}", requestId)
- for t in list(task.keys()):
- if task[t].is_alive():
- task[t].status = False
- task[t].pushStreamUtil.close_push_stream_sp()
- task[t].join(120)
- videoStatus[t] = PushStreamStatus.SUCCESS.value[0]
- videoInfo_sucess = [{"id": k, "status": v} for k, v in videoStatus.items()]
- put_queue(fb_queue, pull_stream_feedback(requestId, ExecuteStatus.SUCCESS.value[0], "", "",
- videoInfo_sucess))
- break
- else:
- logger.info("停止指定的推流任务, requestId:{}", requestId)
- alive_thread = 0
- for t in list(task.keys()):
- if task[t].is_alive():
- if t in videoIds:
- task[t].status = False
- task[t].pushStreamUtil.close_push_stream_sp()
- task[t].join(120)
- videoStatus[t] = PushStreamStatus.SUCCESS.value[0]
- else:
- alive_thread += 1
- if alive_thread == 0:
- videoInfo_sucess = [{"id": k, "status": v} for k, v in videoStatus.items()]
- put_queue(fb_queue, pull_stream_feedback(requestId, ExecuteStatus.SUCCESS.value[0], "",
- "", videoInfo_sucess))
- break
- for t in list(task.keys()):
- if task[t].status and not task[t].is_alive():
- videoStatus[t] = PushStreamStatus.FAILED.value[0]
- logger.error("检测到推流线程异常停止!videoId:{}, requestId:{}", t, requestId)
- if task[t].ex:
- raise task[t].ex
- raise Exception("检测到推流线程异常停止!")
- if task[t].is_alive():
- videoStatus[t] = task[t].excute_status
- if count % 10 == 0:
- videoInfo_hb = [{"id": k, "status": v} for k, v in videoStatus.items()]
- put_queue(fb_queue, pull_stream_feedback(requestId, ExecuteStatus.RUNNING.value[0], "", "",
- videoInfo_hb))
- count = 0
- count += 1
- time.sleep(1)
- except ServiceException as s:
- ex = s.code, s.msg
- logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, requestId)
- except Exception:
- ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
- logger.error("服务异常: {}, requestId: {},", format_exc(), requestId)
- finally:
- if ex:
- errorCode, errorMsg = ex
- for t in list(task.keys()):
- if task[t].is_alive():
- task[t].status = False
- task[t].pushStreamUtil.close_push_stream_sp()
- task[t].join(120)
- videoStatus[t] = PushStreamStatus.FAILED.value[0]
- videoInfo_ex = [{"id": k, "status": v} for k, v in videoStatus.items()]
- put_queue(fb_queue, pull_stream_feedback(requestId, ExecuteStatus.FAILED.value[0], errorCode, errorMsg,
- videoInfo_ex))
- for t in list(task.keys()):
- if task[t].is_alive():
- task[t].status = False
- task[t].pushStreamUtil.close_push_stream_sp()
- task[t].join(120)
- logger.info("推流任务完成, requestId: {}", requestId)
|