import subprocess as sp from traceback import format_exc import cv2, time import numpy as np from loguru import logger from DrGraph.utils.Helper import Helper from DrGraph.utils.Exception import ServiceException from DrGraph.utils.Constant import Constant class NetStream: def __init__(self, pull_url, push_url, request_id): self.pull_url = pull_url self.push_url = push_url self.request_id = request_id self.pull_p = None self.width = 1920 self.height = 1080 * 3 // 2 self.width_height_3 = 1920 * 1080 * 3 // 2 self.w_2 = 960 self.h_2 = 540 self.frame_count = 0 self.start_time = time.time(); def clear_pull_p(self,pull_p, requestId): try: if pull_p and pull_p.poll() is None: logger.info("关闭拉流管道, requestId:{}", requestId) if pull_p.stdout: pull_p.stdout.close() pull_p.terminate() pull_p.wait(timeout=30) logger.info("拉流管道已关闭, requestId:{}", requestId) except Exception as e: logger.error("关闭拉流管道异常: {}, requestId:{}", format_exc(), requestId) if pull_p and pull_p.poll() is None: pull_p.kill() pull_p.wait(timeout=30) raise e def start_pull_p(self, pull_url, requestId): try: command = ['D:/DrGraph/DSP/ffmpeg.exe'] # if pull_url.startswith("rtsp://"): # command.extend(['-timeout', '20000000', '-rtsp_transport', 'tcp']) # if pull_url.startswith("http") or pull_url.startswith("rtmp"): # command.extend(['-rw_timeout', '20000000']) command.extend(['-re', '-y', '-an', # '-hwaccel', 'cuda', cuvid '-c:v', 'h264_cuvid', # '-resize', self.wah, '-i', pull_url, '-f', 'rawvideo', # '-pix_fmt', 'bgr24', '-r', '25', '-']) self.pull_p = sp.Popen(command, stdout=sp.PIPE) return self.pull_p except ServiceException as s: logger.error("构建拉流管道ServiceException异常: url={}, {}, requestId:{}", pull_url, s.msg, requestId) raise s except Exception as e: logger.error("构建拉流管道Exception异常:url={}, {}, requestId:{}", pull_url, format_exc(), requestId) raise e def pull_read_video_stream(self): result = None try: if self.pull_p is None: self.start_pull_p(self.pull_url, self.request_id) in_bytes = self.pull_p.stdout.read(self.width_height_3) if in_bytes is not None and len(in_bytes) > 0: try: # result = (np.frombuffer(in_bytes, np.uint8).reshape([height * 3 // 2, width, 3])) # ValueError: cannot reshape array of size 3110400 into shape (1080,1920) result = (np.frombuffer(in_bytes, np.uint8)).reshape((self.height, self.width)) result = cv2.cvtColor(result, cv2.COLOR_YUV2BGR_NV12) # result = cv2.cvtColor(result, cv2.COLOR_RGB2BGR) if result.shape[1] > Constant.pull_frame_width: result = cv2.resize(result, (result.shape[1] // 2, result.shape[0] // 2), interpolation=cv2.INTER_LINEAR) except Exception: logger.error("视频格式异常:{}, requestId:{}", format_exc(), requestId) raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0], ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1]) except ServiceException as s: logger.error("ServiceException 读流异常: {}, requestId:{}", s.msg, self.request_id) self.clear_pull_p(self.pull_p, self.request_id) self.pull_p = None result = None raise s except Exception: logger.error("Exception 读流异常:{}, requestId:{}", format_exc(), self.request_id) self.clear_pull_p(self.pull_p, self.request_id) self.pull_p = None self.width = None self.height = None self.width_height_3 = None result = None logger.error("读流异常:{}, requestId:{}", format_exc(), self.request_id) return result def prepare_pull(self): if self.pull_p is None: self.start_time = time.time(); self.start_pull_p(self.pull_url, self.request_id) def next_pull_frame(self): if self.pull_p is None: logger.error(f'pull_p is None, requestId: {self.request_id}') return None frame = self.pull_read_video_stream() return frame