|
- # -*- coding: utf-8 -*-
- from queue import Queue
- from threading import Thread
- import time
- from traceback import format_exc
-
- from loguru import logger
-
- from enums.StatusEnum import PushStreamStatus
- from exception.CustomerException import ServiceException
- from util.PushStreamUtils import PushStreamUtil
- from util.QueUtil import put_queue
-
-
- class PushSteamThread(Thread):
- __slots__ = ("pushStreamUtil", "requestId", "videoId", "statusQueue")
-
- def __init__(self, pullUrl, pushUrl, requestId, videoId):
- super().__init__()
- self.pushStreamUtil = PushStreamUtil(pullUrl, pushUrl, requestId)
- self.requestId = requestId
- self.videoId = videoId
- self.statusQueue = Queue()
-
- def run(self):
- logger.info("开始启动推流线程, 视频id: {}, requestId:{}", self.videoId, self.requestId)
- while True:
- try:
- self.pushStreamUtil.start_push_stream()
- out, err = self.pushStreamUtil.push_stream_sp.communicate()
- # 异常断流
- if self.pushStreamUtil.status:
- logger.warning("推流异常,请检测拉流地址和推流地址是否正常!")
- if self.pushStreamUtil.push_stream_sp.returncode != 0:
- logger.error("推流异常:{}, 视频id: {}, requestId:{}", err.decode(), self.videoId,
- self.requestId)
- put_queue(self.statusQueue, (2, PushStreamStatus.RETRYING.value[0]), is_throw_ex=False)
- self.pushStreamUtil.close_push_stream_sp()
- time.sleep(1)
- # 手动断流
- if not self.pushStreamUtil.status:
- self.pushStreamUtil.close_push_stream_sp()
- break
- except ServiceException as s:
- logger.error("异常: {}, 视频id: {}, requestId:{}", s.msg, self.videoId, self.requestId)
- self.pushStreamUtil.status = False
- self.pushStreamUtil.close_push_stream_sp()
- put_queue(self.statusQueue, (1, s), is_throw_ex=False)
- break
- except Exception as e:
- logger.error("异常:{}, 视频id: {}, requestId:{}", format_exc(), self.videoId, self.requestId)
- self.pushStreamUtil.status = False
- self.pushStreamUtil.close_push_stream_sp()
- put_queue(self.statusQueue, (1, e), is_throw_ex=False)
- break
- logger.info("结束推流线程, 视频id: {}, requestId:{}", self.videoId, self.requestId)
|