# -*- coding: utf-8 -*- import time from alg_airport_ffmpeg.common.Constant import ConstantEnum from alg_airport_ffmpeg.exception.CustomerException import ServiceException from alg_airport_ffmpeg.util import YmlUtils, LogUtils from loguru import logger from alg_airport_ffmpeg.util.Cv2Utils import Cv2Util ''' 推流服务 ''' class PushStreamService: # 初始化 def __init__(self): # 获取airport环境所需要的配置 self.__content = YmlUtils.get_configs() # 初始化日志 LogUtils.init_log(self.__content) # 服务调用启动方法 def start_service(self): logger.info(ConstantEnum.START_LOG.value[0]) # 循环消息处理 pull_url = self.__content["video"]["pullUrl"] pushUrl = self.__content["video"]["pushUrl"] cv2_tool = Cv2Util(pull_url, pushUrl) cv2_init_num = 0 while True: try: time1 = time.time() #################################拉流推流一体################################### # 1,检查拉流通道是否有流,没有流7秒后重试 if not cv2_tool.is_video_stream(pull_url): logger.info("开始重新获取视频信息: {}次", cv2_init_num) time.sleep(1) cv2_init_num += 1 cv2_tool.close_push_stream() continue cv2_init_num = 0 # 2. 推流存在跳过,不存在初始化推流 cv2_tool.start_push_stream() # 3. 判断推流通道是否有流,进程是否存在 if cv2_tool.is_push_stream_ok(): cv2_tool.close_push_stream() #################################先拉流再推流################################### # if cv2_tool.check_config(): # time.sleep(1) # logger.info("开始重新获取视频信息: {}次", cv2_init_num) # cv2_init_num += 1 # # cv2_tool.build_cv2() # 方式1 cv2 # cv2_tool.get_video_info() # 方式2 ffmpeg # continue # cv2_init_num = 0 # # frame = cv2_tool.cv2_read() # 方式1 cv2 # frame = cv2_tool.read() # 方式2 ffmpeg # if frame is None: # continue # cv2_tool.push_stream_write(frame) # print(time.time()- time1) except ServiceException as s: logger.error("推流异常: {}", s.msg) except Exception as e: logger.error("推流异常: {}", e) cv2_tool.close()