115 lines
5.0 KiB
Python
115 lines
5.0 KiB
Python
import subprocess as sp
|
||
from traceback import format_exc
|
||
import cv2, time
|
||
import numpy as np
|
||
from loguru import logger
|
||
from DrGraph.utils.Helper import Helper
|
||
from DrGraph.utils.Exception import ServiceException
|
||
from DrGraph.utils.Constant import Constant
|
||
|
||
class NetStream:
    """Pull a video stream through an ffmpeg subprocess, one raw frame at a time.

    ffmpeg decodes ``pull_url`` and writes raw video to its stdout pipe. Each
    read takes exactly one fixed-size frame, interprets it as NV12 and
    converts it to a BGR image. The frame geometry is fixed at 1920x1080.

    Typical use: call ``prepare_pull()`` once, then ``next_pull_frame()``
    repeatedly.
    """

    # Path of the ffmpeg executable used to build the pull pipeline.
    FFMPEG_PATH = 'D:/DrGraph/DSP/ffmpeg.exe'
    # Picture size the raw frame buffer is assumed to have.
    FRAME_WIDTH = 1920
    FRAME_HEIGHT = 1080

    def __init__(self, pull_url, push_url, request_id):
        """Store the stream endpoints and set up the fixed frame geometry.

        Args:
            pull_url: Source URL handed to ffmpeg as its input.
            push_url: Stored on the instance; not used by this class itself.
            request_id: Identifier included in every log message.
        """
        self.pull_url = pull_url
        self.push_url = push_url
        self.request_id = request_id
        self.pull_p = None

        self.width = self.FRAME_WIDTH
        # Row count of the raw NV12 buffer (1080 * 3 // 2 = 1620), not the
        # picture height: the buffer is reshaped to (height, width) before
        # the NV12 -> BGR conversion.
        self.height = self.FRAME_HEIGHT * 3 // 2
        # Number of bytes in one raw frame (1920 * 1080 * 3 // 2).
        self.width_height_3 = self.width * self.height
        self.w_2 = self.FRAME_WIDTH // 2
        self.h_2 = self.FRAME_HEIGHT // 2

        self.frame_count = 0
        self.start_time = time.time()

    def clear_pull_p(self, pull_p, requestId):
        """Shut down a pull pipeline process and release its stdout pipe.

        A process that is still running is terminated and waited on for up to
        30 seconds. If anything in that sequence fails, the process is killed
        and the original exception is re-raised. The stdout pipe is closed
        even when the process has already exited, so the pipe is not left
        open until garbage collection.

        Args:
            pull_p: The ``subprocess.Popen`` object to shut down, or ``None``.
            requestId: Identifier included in the log messages.

        Raises:
            Exception: Whatever closing, terminating or waiting raised.
        """
        if not pull_p:
            return
        try:
            running = pull_p.poll() is None
            if running:
                logger.info("关闭拉流管道, requestId:{}", requestId)
            if pull_p.stdout:
                pull_p.stdout.close()
            if running:
                pull_p.terminate()
                pull_p.wait(timeout=30)
                logger.info("拉流管道已关闭, requestId:{}", requestId)
        except Exception:
            logger.error("关闭拉流管道异常: {}, requestId:{}", format_exc(), requestId)
            # Graceful shutdown failed: force the process down before
            # reporting the error to the caller.
            if pull_p.poll() is None:
                pull_p.kill()
                pull_p.wait(timeout=30)
            raise

    def start_pull_p(self, pull_url, requestId):
        """Start the ffmpeg process that decodes ``pull_url`` to raw frames.

        The process is stored in ``self.pull_p`` and also returned.

        Args:
            pull_url: Input URL passed to ffmpeg with ``-i``.
            requestId: Identifier included in the log messages.

        Returns:
            The started ``subprocess.Popen`` object with a piped stdout.

        Raises:
            ServiceException: Re-raised unchanged after logging.
            Exception: Any other failure while starting the process.
        """
        try:
            # No input timeout options are passed. No -pix_fmt is given
            # either, so ffmpeg picks the output pixel format; the reader
            # decodes it as NV12 (presumably what h264_cuvid emits - confirm).
            command = [
                self.FFMPEG_PATH,
                '-re',
                '-y',
                '-an',
                '-c:v', 'h264_cuvid',
                '-i', pull_url,
                '-f', 'rawvideo',
                '-r', '25',
                '-',
            ]
            self.pull_p = sp.Popen(command, stdout=sp.PIPE)
            return self.pull_p
        except ServiceException as s:
            logger.error("构建拉流管道ServiceException异常: url={}, {}, requestId:{}", pull_url, s.msg, requestId)
            raise
        except Exception:
            logger.error("构建拉流管道Exception异常:url={}, {}, requestId:{}", pull_url, format_exc(), requestId)
            raise

    def pull_read_video_stream(self):
        """Read one raw frame from the pipeline and return it as a BGR image.

        Starts the pipeline first if it is not running. The frame is halved
        in both dimensions when its width exceeds
        ``Constant.pull_frame_width``.

        Returns:
            The decoded frame as a numpy array, or ``None`` when no bytes
            were read or when a non-service error occurred (the pipeline is
            shut down in the latter case).

        Raises:
            ServiceException: When the frame bytes cannot be decoded; the
                pipeline is shut down before the exception propagates.
        """
        result = None
        try:
            if self.pull_p is None:
                self.start_pull_p(self.pull_url, self.request_id)
            in_bytes = self.pull_p.stdout.read(self.width_height_3)
            if in_bytes:
                try:
                    # One NV12 frame viewed as a (rows, width) uint8 matrix.
                    raw = np.frombuffer(in_bytes, np.uint8).reshape((self.height, self.width))
                    result = cv2.cvtColor(raw, cv2.COLOR_YUV2BGR_NV12)
                    if result.shape[1] > Constant.pull_frame_width:
                        half_size = (result.shape[1] // 2, result.shape[0] // 2)
                        result = cv2.resize(result, half_size, interpolation=cv2.INTER_LINEAR)
                except Exception:
                    logger.error("视频格式异常:{}, requestId:{}", format_exc(), self.request_id)
                    # NOTE(review): ExceptionType is not among this file's
                    # visible imports; until it is imported this line raises
                    # NameError, which the generic handler below absorbs.
                    raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
                                           ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
        except ServiceException as s:
            logger.error("ServiceException 读流异常: {}, requestId:{}", s.msg, self.request_id)
            self.clear_pull_p(self.pull_p, self.request_id)
            self.pull_p = None
            raise
        except Exception:
            logger.error("Exception 读流异常:{}, requestId:{}", format_exc(), self.request_id)
            self.clear_pull_p(self.pull_p, self.request_id)
            self.pull_p = None
            # The frame geometry is deliberately left intact. It is only ever
            # set in __init__, so clearing it here would turn the next read
            # into stdout.read(None), which blocks until end of stream.
            result = None
            logger.error("读流异常:{}, requestId:{}", format_exc(), self.request_id)
        return result

    def prepare_pull(self):
        """Start the pull pipeline if it is not running and reset the start time."""
        if self.pull_p is None:
            self.start_time = time.time()
            self.start_pull_p(self.pull_url, self.request_id)

    def next_pull_frame(self):
        """Return the next decoded frame, or ``None`` if no pipeline is running.

        Unlike ``pull_read_video_stream``, this does not start the pipeline
        itself; ``prepare_pull`` must have been called first.
        """
        if self.pull_p is None:
            logger.error(f'pull_p is None, requestId: {self.request_id}')
            return None
        return self.pull_read_video_stream()