You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1096 lines
48KB

  1. # -*- coding: utf-8 -*-
  2. from json import loads
  3. from time import time
  4. from traceback import format_exc
  5. import cv2
  6. import subprocess as sp
  7. import numpy as np
  8. from loguru import logger
  9. from common import Constant
  10. from exception.CustomerException import ServiceException
  11. from enums.ExceptionEnum import ExceptionType
  12. class Cv2Util:
  13. __slots__ = [
  14. 'pullUrl',
  15. 'pushUrl',
  16. 'orFilePath',
  17. 'aiFilePath',
  18. 'p',
  19. 'or_video_file',
  20. 'ai_video_file',
  21. 'fps',
  22. 'width',
  23. 'height',
  24. 'wh',
  25. 'h',
  26. 'w',
  27. 'all_frames',
  28. 'bit_rate',
  29. 'pull_p',
  30. 'requestId',
  31. 'p_push_retry_num',
  32. 'isGpu',
  33. 'read_w_h',
  34. 'context',
  35. 'p_push_time'
  36. ]
  37. def __init__(self, pullUrl=None, pushUrl=None, orFilePath=None, aiFilePath=None, requestId=None, context=None,
  38. gpu_ids=None):
  39. self.pullUrl = pullUrl
  40. self.pushUrl = pushUrl
  41. self.orFilePath = orFilePath
  42. self.aiFilePath = aiFilePath
  43. self.p = None
  44. self.or_video_file = None
  45. self.ai_video_file = None
  46. self.fps = None
  47. self.width = None
  48. self.height = None
  49. self.wh = None
  50. self.h = None
  51. self.w = None
  52. self.all_frames = None
  53. self.bit_rate = None
  54. self.pull_p = None
  55. self.requestId = requestId
  56. self.p_push_time = 0
  57. self.p_push_retry_num = 0
  58. self.isGpu = False
  59. self.read_w_h = None
  60. self.context = context
  61. if gpu_ids is not None and len(gpu_ids) > 0:
  62. self.isGpu = True
  63. def getFrameConfig(self, fps, width, height):
  64. if self.fps is None or self.width != width or self.height != height:
  65. self.fps = fps
  66. self.width = width
  67. self.height = height
  68. if width > Constant.width:
  69. self.h = int(self.height // 2)
  70. self.w = int(self.width // 2)
  71. else:
  72. self.h = int(self.height)
  73. self.w = int(self.width)
  74. def clear_video_info(self):
  75. self.fps = None
  76. self.width = None
  77. self.height = None
  78. '''
  79. 获取视频信息
  80. '''
  81. def get_video_info(self):
  82. try:
  83. if self.pullUrl is None or len(self.pullUrl) == 0:
  84. logger.error("拉流地址不能为空, requestId:{}", self.requestId)
  85. raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  86. ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  87. args = ['ffprobe', '-show_format', '-show_streams', '-of', 'json', self.pullUrl]
  88. p = sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE)
  89. out, err = p.communicate(timeout=20)
  90. if p.returncode != 0:
  91. raise Exception("未获取视频信息!!!!!requestId:" + self.requestId)
  92. probe = loads(out.decode('utf-8'))
  93. if probe is None or probe.get("streams") is None:
  94. raise Exception("未获取视频信息!!!!!requestId:" + self.requestId)
  95. # 视频大小
  96. # format = probe['format']
  97. # size = int(format['size'])/1024/1024
  98. video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
  99. if video_stream is None:
  100. raise Exception("未获取视频信息!!!!!requestId:" + self.requestId)
  101. width = video_stream.get('width')
  102. height = video_stream.get('height')
  103. nb_frames = video_stream.get('nb_frames')
  104. fps = video_stream.get('r_frame_rate')
  105. # duration = video_stream.get('duration')
  106. # bit_rate = video_stream.get('bit_rate')
  107. if width is not None and int(width) != 0 and height is not None and int(height) != 0:
  108. self.width = int(width)
  109. self.height = int(height)
  110. self.wh = self.width * self.height * 3
  111. if width > Constant.width:
  112. self.h = int(self.height // 2)
  113. self.w = int(self.width // 2)
  114. else:
  115. self.h = int(self.height)
  116. self.w = int(self.width)
  117. if nb_frames:
  118. self.all_frames = int(nb_frames)
  119. up, down = str(fps).split('/')
  120. self.fps = int(eval(up) / eval(down))
  121. if self.fps > 30:
  122. logger.info("获取视频FPS大于30帧, FPS:{}, requestId:{}", self.fps, self.requestId)
  123. self.fps = 30
  124. if self.fps < 25:
  125. logger.info("获取视频FPS小于25帧, FPS:{}, requestId:{}", self.fps, self.requestId)
  126. self.fps = 25
  127. # if duration:
  128. # self.duration = float(video_stream['duration'])
  129. # self.bit_rate = int(bit_rate) / 1000
  130. logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}, requestId:{}",
  131. self.width, self.height, self.fps, self.all_frames, self.requestId)
  132. except ServiceException as s:
  133. logger.error("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
  134. self.clear_video_info()
  135. raise s
  136. except Exception:
  137. logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), self.requestId)
  138. self.clear_video_info()
  139. '''
  140. 录屏任务获取视频信息
  141. '''
  142. def get_recording_video_info(self):
  143. try:
  144. video_info = 'ffprobe -show_format -show_streams -of json %s' % self.pullUrl
  145. p = sp.Popen(video_info, stdout=sp.PIPE, stderr=sp.PIPE, shell=True)
  146. out, err = p.communicate(timeout=17)
  147. if p.returncode != 0:
  148. raise Exception("未获取视频信息!!!!!requestId:" + self.requestId)
  149. probe = loads(out.decode('utf-8'))
  150. if probe is None or probe.get("streams") is None:
  151. raise Exception("未获取视频信息!!!!!requestId:" + self.requestId)
  152. video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
  153. if video_stream is None:
  154. raise Exception("未获取视频信息!!!!!requestId:" + self.requestId)
  155. width = video_stream.get('width')
  156. height = video_stream.get('height')
  157. nb_frames = video_stream.get('nb_frames')
  158. fps = video_stream.get('r_frame_rate')
  159. if width and int(width) > 0:
  160. self.width = int(width)
  161. if height and int(height) > 0:
  162. self.height = int(height)
  163. if self.width and self.height:
  164. self.wh = int(width * height * 3)
  165. self.read_w_h = ([self.height, self.width, 3])
  166. if nb_frames and int(nb_frames) > 0:
  167. self.all_frames = int(nb_frames)
  168. if fps:
  169. up, down = str(fps).split('/')
  170. self.fps = int(eval(up) / eval(down))
  171. logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}, requestId:{}", self.width,
  172. self.height, self.fps, self.all_frames, self.requestId)
  173. except ServiceException as s:
  174. logger.error("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
  175. self.clear_video_info()
  176. raise s
  177. except Exception:
  178. logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), self.requestId)
  179. self.clear_video_info()
  180. def getRecordingFrameConfig(self, fps, width, height):
  181. self.fps = fps
  182. self.width = width
  183. self.height = height
  184. '''
  185. 录屏拉取视频
  186. '''
  187. def recording_pull_p(self):
  188. try:
  189. # 如果视频信息不存在, 不初始化拉流
  190. if self.checkconfig():
  191. return
  192. # 如果已经初始化, 不再初始化
  193. if self.pull_p:
  194. return
  195. command = ['ffmpeg -re', '-y', '-an'
  196. # '-hide_banner',
  197. ]
  198. if self.pullUrl.startswith('rtsp://'):
  199. command.extend(['-rtsp_transport', 'tcp'])
  200. if self.isGpu:
  201. command.extend(['-hwaccel', 'cuda'])
  202. command.extend(['-i', self.pullUrl,
  203. '-f', 'rawvideo',
  204. '-pix_fmt', 'bgr24',
  205. '-r', '25',
  206. '-'])
  207. self.pull_p = sp.Popen(command, stdout=sp.PIPE)
  208. except ServiceException as s:
  209. logger.exception("构建拉流管道异常: {}, requestId:{}", s.msg, self.requestId)
  210. self.clear_video_info()
  211. if self.pull_p:
  212. logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
  213. self.pull_p.stdout.close()
  214. self.pull_p.terminate()
  215. self.pull_p.wait()
  216. self.pull_p = None
  217. raise s
  218. except Exception as e:
  219. logger.error("构建拉流管道异常:{}, requestId:{}", e, self.requestId)
  220. self.clear_video_info()
  221. if self.pull_p:
  222. logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
  223. self.pull_p.stdout.close()
  224. self.pull_p.terminate()
  225. self.pull_p.wait()
  226. self.pull_p = None
  227. def recording_read(self):
  228. result = None
  229. try:
  230. self.recording_pull_p()
  231. in_bytes = self.pull_p.stdout.read(self.wh)
  232. if in_bytes is not None and len(in_bytes) > 0:
  233. try:
  234. result = np.frombuffer(in_bytes, np.uint8).reshape(self.read_w_h)
  235. except Exception:
  236. logger.error("视频格式异常:{}, requestId:{}", format_exc(), self.requestId)
  237. raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
  238. ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
  239. except ServiceException as s:
  240. if self.pull_p:
  241. logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
  242. self.pull_p.stdout.close()
  243. self.pull_p.terminate()
  244. self.pull_p.wait()
  245. self.pull_p = None
  246. raise s
  247. except Exception:
  248. logger.error("读流异常:{}, requestId:{}", format_exc(), self.requestId)
  249. if self.pull_p:
  250. logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
  251. self.pull_p.stdout.close()
  252. self.pull_p.terminate()
  253. self.pull_p.wait()
  254. self.pull_p = None
  255. return result
  256. '''
  257. 拉取视频
  258. '''
  259. def build_pull_p(self):
  260. try:
  261. command = ['ffmpeg']
  262. if self.pullUrl.startswith("rtsp://"):
  263. command.extend(['-rtsp_transport', 'tcp'])
  264. command.extend(['-re',
  265. '-y',
  266. '-an',
  267. # '-hwaccel', 'cuda', cuvid
  268. '-c:v', 'h264_cuvid',
  269. # '-resize', self.wah,
  270. '-i', self.pullUrl,
  271. '-f', 'rawvideo',
  272. '-pix_fmt', 'bgr24',
  273. '-r', '25',
  274. '-'])
  275. self.pull_p = sp.Popen(command, stdout=sp.PIPE)
  276. except ServiceException as s:
  277. logger.exception("构建拉流管道异常: {}, requestId:{}", s.msg, self.requestId)
  278. raise s
  279. except Exception as e:
  280. logger.error("构建拉流管道异常:{}, requestId:{}", format_exc(), self.requestId)
  281. self.clear_video_info()
  282. if self.pull_p:
  283. logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
  284. self.pull_p.stdout.close()
  285. self.pull_p.terminate()
  286. self.pull_p.wait()
  287. self.pull_p = None
  288. def checkconfig(self):
  289. if self.width is None or self.height is None or self.fps is None:
  290. return True
  291. return False
  292. def read(self):
  293. result = None
  294. try:
  295. if self.pull_p is None:
  296. self.build_pull_p()
  297. in_bytes = self.pull_p.stdout.read(self.wh)
  298. if in_bytes is not None and len(in_bytes) > 0:
  299. try:
  300. result = (np.frombuffer(in_bytes, np.uint8).reshape([self.height, self.width, 3]))
  301. # img = (np.frombuffer(in_bytes, np.uint8)).reshape((self.h, self.w))
  302. except Exception as ei:
  303. logger.error("视频格式异常:{}, requestId:{}", format_exc(), self.requestId)
  304. raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
  305. ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
  306. # result = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_NV12)
  307. # result = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
  308. if self.width > Constant.width:
  309. result = cv2.resize(result, (self.w, self.h), interpolation=cv2.INTER_LINEAR)
  310. except ServiceException as s:
  311. raise s
  312. except Exception as e:
  313. self.clear_video_info()
  314. if self.pull_p:
  315. logger.info("关闭拉流管道, requestId:{}", self.requestId)
  316. self.pull_p.stdout.close()
  317. self.pull_p.terminate()
  318. self.pull_p.wait()
  319. self.pull_p = None
  320. logger.error("读流异常:{}, requestId:{}", format_exc(), self.requestId)
  321. return result
  322. def close(self):
  323. self.clear_video_info()
  324. if self.pull_p:
  325. if self.pull_p.stdout:
  326. self.pull_p.stdout.close()
  327. self.pull_p.terminate()
  328. self.pull_p.wait()
  329. self.pull_p = None
  330. logger.info("关闭拉流管道完成, requestId:{}", self.requestId)
  331. if self.p:
  332. if self.p.stdin:
  333. self.p.stdin.close()
  334. self.p.terminate()
  335. self.p.wait()
  336. self.p = None
  337. # self.p.communicate()
  338. # self.p.kill()
  339. logger.info("关闭管道完成, requestId:{}", self.requestId)
  340. if self.or_video_file:
  341. self.or_video_file.release()
  342. self.or_video_file = None
  343. logger.info("关闭原视频写入流完成, requestId:{}", self.requestId)
  344. if self.ai_video_file:
  345. self.ai_video_file.release()
  346. self.ai_video_file = None
  347. logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId)
  348. # 构建 cv2
  349. # def build_cv2(self):
  350. # try:
  351. # if self.cap is not None:
  352. # logger.info("重试, 关闭cap, requestId:{}", self.requestId)
  353. # self.cap.release()
  354. # if self.p is not None:
  355. # logger.info("重试, 关闭管道, requestId:{}", self.requestId)
  356. # self.p.stdin.close()
  357. # self.p.terminate()
  358. # self.p.wait()
  359. # if self.pullUrl is None:
  360. # logger.error("拉流地址不能为空, requestId:{}", self.requestId)
  361. # raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  362. # ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  363. # if self.pushUrl is None:
  364. # logger.error("推流地址不能为空, requestId:{}", self.requestId)
  365. # raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
  366. # ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
  367. # self.cap = cv2.VideoCapture(self.pullUrl)
  368. # if self.fps is None or self.fps == 0:
  369. # self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
  370. # if self.width is None or self.width == 0:
  371. # self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  372. # if self.height is None or self.height == 0:
  373. # self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  374. # command = ['/usr/bin/ffmpeg',
  375. # # '-y', # 不经过确认,输出时直接覆盖同名文件。
  376. # '-f', 'rawvideo',
  377. # '-vcodec', 'rawvideo',
  378. # '-pix_fmt', 'bgr24', # 显示可用的像素格式
  379. # # '-s', "{}x{}".format(self.width * 2, self.height),
  380. # '-s', "{}x{}".format(int(self.width), int(self.height/2)),
  381. # # '-r', str(15),
  382. # '-i', '-', # 指定输入文件
  383. # '-g', '25',
  384. # '-b:v', '3000k',
  385. # '-tune', 'zerolatency', # 加速编码速度
  386. # '-c:v', 'libx264', # 指定视频编码器
  387. # '-sc_threshold', '0',
  388. # '-pix_fmt', 'yuv420p',
  389. # '-an',
  390. # '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  391. # # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
  392. # '-f', 'flv',
  393. # self.pushUrl]
  394. # # 管道配置
  395. # logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width, self.requestId)
  396. # self.p = sp.Popen(command, stdin=sp.PIPE)
  397. # except ServiceException as s:
  398. # logger.exception("构建cv2异常: {}, requestId:{}", s, self.requestId)
  399. # raise s
  400. # except Exception as e:
  401. # logger.exception("初始化cv2异常:{}, requestId:{}", e, self.requestId)
  402. # 构建 cv2
  403. def build_p(self):
  404. try:
  405. if self.pushUrl is None or len(self.pushUrl) == 0:
  406. logger.error("推流地址不能为空, requestId:{}", self.requestId)
  407. raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
  408. ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
  409. command = ['ffmpeg',
  410. # '-loglevel', 'debug',
  411. '-re',
  412. '-y',
  413. "-an",
  414. '-f', 'rawvideo',
  415. '-vcodec', 'rawvideo',
  416. '-pix_fmt', 'bgr24',
  417. '-thread_queue_size', '1024',
  418. '-s', "{}x{}".format(self.w * 2, self.h),
  419. '-i', '-', # 指定输入文件
  420. '-r', str(25),
  421. '-g', str(25),
  422. '-maxrate', '6000k',
  423. # '-profile:v', 'high',
  424. '-b:v', '5000k',
  425. # '-crf', '18',
  426. # '-rc:v', 'vbr',
  427. # '-cq:v', '25',
  428. # '-qmin', '25',
  429. # '-qmax', '25',
  430. '-c:v', 'h264_nvenc', #
  431. '-bufsize', '5000k',
  432. # '-c:v', 'libx264', # 指定视频编码器
  433. # '-tune', 'zerolatency', # 加速编码速度
  434. # '-sc_threshold', '0',
  435. '-pix_fmt', 'yuv420p',
  436. # '-flvflags', 'no_duration_filesize',
  437. # '-preset', 'fast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  438. '-preset', 'p6', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  439. '-tune', 'll',
  440. '-f', 'flv',
  441. self.pushUrl]
  442. logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width,
  443. self.requestId)
  444. self.p = sp.Popen(command, stdin=sp.PIPE, shell=False)
  445. except ServiceException as s:
  446. if self.p:
  447. if self.p.stdin:
  448. self.p.stdin.close()
  449. self.p.terminate()
  450. self.p.wait()
  451. logger.exception("构建p管道异常: {}, requestId:{}", s.msg, self.requestId)
  452. raise s
  453. except Exception as e:
  454. if self.p:
  455. if self.p.stdin:
  456. self.p.stdin.close()
  457. self.p.terminate()
  458. self.p.wait()
  459. logger.error("初始化p管道异常:{}, requestId:{}", format_exc(), self.requestId)
  460. def push_stream(self, frame):
  461. current_retry_num = 0
  462. while True:
  463. try:
  464. if self.p is None:
  465. self.build_p()
  466. self.p.stdin.write(frame.tostring())
  467. break
  468. except ServiceException as s:
  469. raise s
  470. except Exception as ex:
  471. if self.p_push_time == 0:
  472. self.p_push_time = time.time()
  473. if time.time() - self.p_push_time < 2:
  474. self.p_push_retry_num += 1
  475. self.p_push_time = time.time()
  476. if time.time() - self.p_push_time > 60:
  477. self.p_push_retry_num = 0
  478. self.p_push_time = time.time()
  479. logger.error("推流管道异常:{}, requestId: {}", format_exc(), self.requestId)
  480. if self.p:
  481. try:
  482. if self.p.stdin:
  483. self.p.stdin.close()
  484. self.p.terminate()
  485. self.p.wait()
  486. except:
  487. logger.error("推流管道异常:{}, requestId: {}", format_exc(), self.requestId)
  488. self.p = None
  489. current_retry_num += 1
  490. if self.p_push_retry_num > 100:
  491. logger.error("推流进管道异常:{}, requestId: {}", format_exc(), self.requestId)
  492. raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
  493. ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
  494. if current_retry_num > 3:
  495. logger.error("推流进管道异常:{}, requestId: {}", format_exc(), self.requestId)
  496. raise ServiceException(ExceptionType.PUSH_STREAM_EXCEPTION.value[0],
  497. ExceptionType.PUSH_STREAM_EXCEPTION.value[1])
  498. def build_or_write(self):
  499. try:
  500. if self.orFilePath is not None and self.or_video_file is None:
  501. self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25,
  502. (self.w, self.h))
  503. # self.or_video_file.set(cv2.CAP_PROP_BITRATE, 5000)
  504. if self.or_video_file is None:
  505. logger.error("or_video_file为空, requestId:{}", self.requestId)
  506. raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  507. ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  508. except ServiceException as s:
  509. if self.or_video_file:
  510. self.or_video_file.release()
  511. self.or_video_file = None
  512. logger.error("构建OR文件写对象异常: {}, requestId:{}", s.msg, self.requestId)
  513. raise s
  514. except Exception as e:
  515. if self.or_video_file:
  516. self.or_video_file.release()
  517. self.or_video_file = None
  518. logger.error("构建OR文件写对象异常: {}, requestId:{}", format_exc(), self.requestId)
  519. raise e
  520. except:
  521. if self.or_video_file:
  522. self.or_video_file.release()
  523. self.or_video_file = None
  524. logger.exception("构建OR文件写对象异常:{}, requestId:{}", format_exc(), self.requestId)
  525. raise Exception("构建OR文件写对象异常")
  526. def build_ai_write(self):
  527. try:
  528. if self.aiFilePath is not None and self.ai_video_file is None:
  529. self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25,
  530. (self.w * 2, self.h))
  531. # self.ai_video_file.set(cv2.CAP_PROP_BITRATE, 5000)
  532. if self.ai_video_file is None:
  533. logger.error("ai_video_file为空, requestId:{}", self.requestId)
  534. raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  535. ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  536. except ServiceException as s:
  537. if self.ai_video_file:
  538. self.ai_video_file.release()
  539. self.ai_video_file = None
  540. logger.error("构建AI文件写对象异常: {}, requestId:{}", s.msg, self.requestId)
  541. raise s
  542. except Exception as e:
  543. if self.ai_video_file:
  544. self.ai_video_file.release()
  545. self.ai_video_file = None
  546. logger.error("构建AI文件写对象异常: {}, requestId:{}", format_exc(), self.requestId)
  547. raise e
  548. except:
  549. if self.ai_video_file:
  550. self.ai_video_file.release()
  551. self.ai_video_file = None
  552. logger.error("构建AI文件写对象异常:{}, requestId:{}", format_exc(), self.requestId)
  553. raise Exception("构建AI文件写对象异常")
  554. def video_or_write(self, frame):
  555. ai_retry_num = 0
  556. while True:
  557. try:
  558. if self.or_video_file is None:
  559. self.build_or_write()
  560. self.or_video_file.write(frame)
  561. break
  562. except ServiceException as s:
  563. raise s
  564. except Exception as ex:
  565. if ai_retry_num > 3:
  566. logger.error("重新写入原视频视频到本地, 重试失败:{}, requestId: {}", format_exc(),
  567. self.requestId)
  568. raise ex
  569. finally:
  570. ai_retry_num += 1
  571. def video_ai_write(self, frame):
  572. ai_retry_num = 0
  573. while True:
  574. try:
  575. if self.ai_video_file is None:
  576. self.build_ai_write()
  577. self.ai_video_file.write(frame)
  578. break
  579. except ServiceException as s:
  580. raise s
  581. except Exception as ex:
  582. if ai_retry_num > 3:
  583. logger.exception("重新写入分析后的视频到本地,重试失败:{}, requestId: {}", format_exc(),
  584. self.requestId)
  585. raise ex
  586. finally:
  587. ai_retry_num += 1
  588. def video_merge(self, frame1, frame2):
  589. # frameLeft = cv2.resize(frame1, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  590. # frameRight = cv2.resize(frame2, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  591. # frame_merge = np.hstack((frameLeft, frameRight))
  592. frame_merge = np.hstack((frame1, frame2))
  593. return frame_merge
  594. def getP(self):
  595. if self.p is None:
  596. logger.error("获取管道为空, requestId:{}", self.requestId)
  597. raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  598. ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  599. return self.p
  600. def getOrVideoFile(self):
  601. if self.or_video_file is None:
  602. logger.error("获取原视频写入对象为空, requestId:{}", self.requestId)
  603. raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  604. ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  605. return self.or_video_file
  606. def getAiVideoFile(self):
  607. if self.ai_video_file is None:
  608. logger.error("获取AI视频写入对象为空, requestId:{}", self.requestId)
  609. raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  610. ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  611. return self.ai_video_file
  612. def check_video_stream(width, height):
  613. if width is None or height is None:
  614. return True
  615. return False
  616. def build_video_info(pull_url, requestId):
  617. try:
  618. if pull_url is None or len(pull_url) == 0:
  619. logger.error("拉流地址不能为空, requestId:{}", requestId)
  620. raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  621. ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  622. pp = sp.Popen(['ffprobe', '-show_format', '-show_streams', '-of', 'json', pull_url], stdout=sp.PIPE,
  623. stderr=sp.PIPE)
  624. out, err = pp.communicate(timeout=18)
  625. if pp.returncode != 0:
  626. logger.error("获取视频信息失败: {}, requestId:{}", err.decode('utf-8'), requestId)
  627. raise Exception("未获取视频信息!!!!")
  628. probe = loads(out.decode('utf-8'))
  629. video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
  630. if video_stream is None:
  631. raise Exception("未获取视频信息!!!!")
  632. width_new, height_new = video_stream.get('width'), video_stream.get('height')
  633. nb_frames = video_stream.get('nb_frames', 0)
  634. duration = video_stream.get('duration')
  635. if duration is not None and float(duration) != float(0):
  636. nb_frames = int(float(duration) * 25)
  637. if width_new is not None and int(width_new) != 0 and height_new is not None and int(height_new) != 0:
  638. height_o = int(height_new)
  639. width, height = int(width_new), height_o * 3 // 2
  640. width_height_3 = width * height
  641. all_frames = int(nb_frames)
  642. w_2, h_2 = width, height_o
  643. if width > Constant.width:
  644. w_2, h_2 = width // 2, height_o // 2
  645. logger.info("视频信息, width:{}|height:{}|all_frames:{}, requestId:{}", width, height_o, all_frames,
  646. requestId)
  647. return width, height, width_height_3, all_frames, w_2, h_2
  648. raise Exception("未获取视频信息!!!!")
  649. except ServiceException as s:
  650. logger.error("获取视频信息异常: {}, requestId:{}", s.msg, requestId)
  651. raise s
  652. except Exception:
  653. logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), requestId)
  654. return None, None, None, 0, None, None
  655. def build_video_info2(pull_url, requestId):
  656. try:
  657. pp = sp.Popen(['ffprobe', '-show_format', '-show_streams', '-of', 'json', pull_url], stdout=sp.PIPE,
  658. stderr=sp.PIPE)
  659. out, err = pp.communicate(timeout=17)
  660. if pp.returncode != 0:
  661. logger.error("获取视频信息失败: {}, requestId:{}", err.decode('utf-8'), requestId)
  662. raise Exception("未获取视频信息!!!!")
  663. probe = loads(out.decode('utf-8'))
  664. video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
  665. if video_stream is None:
  666. raise Exception("未获取视频信息!!!!")
  667. width_new, height_new = video_stream.get('width'), video_stream.get('height')
  668. nb_frames = video_stream.get('nb_frames', 0)
  669. # fps = video_stream.get('r_frame_rate')
  670. duration = video_stream.get('duration')
  671. if duration is not None and float(duration) != float(0):
  672. nb_frames = int(float(duration) * 25)
  673. # bit_rate = video_stream.get('bit_rate')
  674. if width_new is not None and int(width_new) != 0 and height_new is not None and int(height_new) != 0:
  675. width_o, height_o = int(width_new), int(height_new)
  676. # width_height_3 = width * height * 3
  677. width_height_3 = width_o * height_o * 3 // 2
  678. width, height, all_frames = width_o, height_o * 3 // 2, int(nb_frames)
  679. w, h = width_o, height_o
  680. if width > Constant.width:
  681. w, h = width_o // 2, height_o // 2
  682. # up, down = str(fps).split('/')
  683. # self.fps = int(eval(up) / eval(down))
  684. # if duration:
  685. # self.duration = float(video_stream['duration'])
  686. # self.bit_rate = int(bit_rate) / 1000
  687. logger.info("视频信息, width:{}|height:{}|all_frames:{}, requestId:{}", width_o, height_o, all_frames,
  688. requestId)
  689. return width, height, width_height_3, all_frames, w, h
  690. raise Exception("未获取视频信息!!!!")
  691. except ServiceException as s:
  692. logger.error("获取视频信息异常: {}, requestId:{}", s.msg, requestId)
  693. raise s
  694. except Exception:
  695. logger.error("获取视频信息异常:{}, requestId:{}", format_exc(), requestId)
  696. return None, None, None, 0, None, None
  697. def start_pull_p(pull_url, requestId):
  698. try:
  699. command = ['ffmpeg']
  700. if pull_url.startswith("rtsp://"):
  701. command.extend(['-timeout', '20000000', '-rtsp_transport', 'tcp'])
  702. if pull_url.startswith("http") or pull_url.startswith("rtmp"):
  703. command.extend(['-rw_timeout', '20000000'])
  704. command.extend(['-re',
  705. '-y',
  706. '-an',
  707. # '-hwaccel', 'cuda', cuvid
  708. '-c:v', 'h264_cuvid',
  709. # '-resize', self.wah,
  710. '-i', pull_url,
  711. '-f', 'rawvideo',
  712. # '-pix_fmt', 'bgr24',
  713. '-r', '25',
  714. '-'])
  715. return sp.Popen(command, stdout=sp.PIPE)
  716. except ServiceException as s:
  717. logger.error("构建拉流管道异常: {}, requestId:{}", s.msg, requestId)
  718. raise s
  719. except Exception as e:
  720. logger.error("构建拉流管道异常:{}, requestId:{}", format_exc(), requestId)
  721. raise e
  722. def clear_pull_p(pull_p, requestId):
  723. try:
  724. if pull_p and pull_p.poll() is None:
  725. logger.info("关闭拉流管道, requestId:{}", requestId)
  726. if pull_p.stdout:
  727. pull_p.stdout.close()
  728. pull_p.terminate()
  729. pull_p.wait(timeout=30)
  730. logger.info("拉流管道已关闭, requestId:{}", requestId)
  731. except Exception as e:
  732. logger.error("关闭拉流管道异常: {}, requestId:{}", format_exc(), requestId)
  733. if pull_p and pull_p.poll() is None:
  734. pull_p.kill()
  735. pull_p.wait(timeout=30)
  736. raise e
  737. def pull_read_video_stream(pull_p, pull_url, width, height, width_height_3, w_2, h_2, requestId):
  738. result = None
  739. try:
  740. if pull_p is None:
  741. pull_p = start_pull_p(pull_url, requestId)
  742. in_bytes = pull_p.stdout.read(width_height_3)
  743. if in_bytes is not None and len(in_bytes) > 0:
  744. try:
  745. # result = (np.frombuffer(in_bytes, np.uint8).reshape([height * 3 // 2, width, 3]))
  746. result = (np.frombuffer(in_bytes, np.uint8)).reshape((height, width))
  747. result = cv2.cvtColor(result, cv2.COLOR_YUV2BGR_NV12)
  748. # result = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
  749. if width > Constant.width:
  750. result = cv2.resize(result, (w_2, h_2), interpolation=cv2.INTER_LINEAR)
  751. except Exception:
  752. logger.error("视频格式异常:{}, requestId:{}", format_exc(), requestId)
  753. raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
  754. ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
  755. except ServiceException as s:
  756. clear_pull_p(pull_p, requestId)
  757. raise s
  758. except Exception:
  759. clear_pull_p(pull_p, requestId)
  760. pull_p, width, height = None, None, None
  761. logger.error("读流异常:{}, requestId:{}", format_exc(), requestId)
  762. return result, pull_p, width, height
  763. def pull_read_video_stream2(pull_p, pull_url, width, height, width_height_3, w, h, requestId):
  764. result = None
  765. try:
  766. if pull_p is None:
  767. pull_p = start_pull_p(pull_url, requestId)
  768. in_bytes = pull_p.stdout.read(width_height_3)
  769. if in_bytes is not None and len(in_bytes) > 0:
  770. try:
  771. result = (np.frombuffer(in_bytes, np.uint8)).reshape((height, width))
  772. result = cv2.cvtColor(result, cv2.COLOR_YUV2BGR_NV12)
  773. if width > Constant.width:
  774. result = cv2.resize(result, (w, h), interpolation=cv2.INTER_LINEAR)
  775. except Exception:
  776. logger.error("视频格式异常:{}, requestId:{}", format_exc(), requestId)
  777. raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
  778. ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
  779. except ServiceException as s:
  780. clear_pull_p(pull_p, requestId)
  781. raise s
  782. except Exception:
  783. clear_pull_p(pull_p, requestId)
  784. pull_p, width, height = None, None, None
  785. logger.error("读流异常:{}, requestId:{}", format_exc(), requestId)
  786. return result, pull_p, width, height
  787. def video_conjuncing(frame1, frame2):
  788. # frameLeft = cv2.resize(frame1, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  789. # frameRight = cv2.resize(frame2, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  790. # frame_merge = np.hstack((frameLeft, frameRight))
  791. frame_merge = np.hstack((frame1, frame2))
  792. return frame_merge
  793. def build_push_p(push_url, width, height, requestId):
  794. push_p = None
  795. try:
  796. if push_url is None or len(push_url) == 0:
  797. logger.error("推流地址不能为空, requestId:{}", requestId)
  798. raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
  799. ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
  800. command = ['ffmpeg',
  801. # '-loglevel', 'debug',
  802. # '-re',
  803. '-y',
  804. "-an",
  805. '-f', 'rawvideo',
  806. '-vcodec', 'rawvideo',
  807. '-pix_fmt', 'bgr24',
  808. '-thread_queue_size', '1024',
  809. '-s', "{}x{}".format(width, height),
  810. '-i', '-', # 指定输入文件
  811. '-r', str(25),
  812. '-g', str(25),
  813. '-maxrate', '6000k',
  814. # '-profile:v', 'high',
  815. '-b:v', '4000k',
  816. # '-crf', '18',
  817. # '-rc:v', 'vbr',
  818. # '-cq:v', '25',
  819. # '-qmin', '25',
  820. # '-qmax', '25',
  821. '-c:v', 'h264_nvenc', #
  822. '-bufsize', '4000k',
  823. # '-c:v', 'libx264', # 指定视频编码器
  824. # '-tune', 'zerolatency', # 加速编码速度
  825. # '-sc_threshold', '0',
  826. # '-rc', 'cbr_ld_hq',
  827. # '-zerolatency', '1',
  828. '-pix_fmt', 'yuv420p',
  829. # '-flvflags', 'no_duration_filesize',
  830. # '-preset', 'fast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  831. '-preset', 'p6', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  832. '-tune', 'll',
  833. '-f', 'flv',
  834. push_url]
  835. logger.info("height:{}|width:{}|requestId:{}", height, width, requestId)
  836. push_p = sp.Popen(command, stdin=sp.PIPE, shell=False)
  837. return push_p
  838. except ServiceException as s:
  839. if push_p:
  840. if push_p.stdin:
  841. push_p.stdin.close()
  842. push_p.terminate()
  843. push_p.wait()
  844. logger.error("构建p管道异常: {}, requestId:{}", s.msg, requestId)
  845. raise s
  846. except Exception as e:
  847. if push_p:
  848. if push_p.stdin:
  849. push_p.stdin.close()
  850. push_p.terminate()
  851. push_p.wait()
  852. logger.error("初始化p管道异常:{}, requestId:{}", format_exc(), requestId)
  853. raise e
  854. def push_video_stream(frame, push_p, push_url, p_push_status, requestId):
  855. """
  856. :param frame: 当前视频帧
  857. :param push_p: 推流管道
  858. :param push_url: 推流地址
  859. :param p_push_status: 控制异常控制
  860. :param requestId: 请求id
  861. :return: 推流管道
  862. """
  863. st = time()
  864. try:
  865. if push_p is None:
  866. height, width = frame.shape[0:2]
  867. push_p = build_push_p(push_url, width, height, requestId)
  868. push_p.stdin.write(frame.tostring())
  869. return push_p
  870. except ServiceException as s:
  871. clear_push_p(push_p, requestId)
  872. raise s
  873. except Exception:
  874. et = time() - st
  875. logger.error("推流异常使用时间:{}, requestId: {}", et, requestId)
  876. if et > 20:
  877. logger.error("推流进管道异常:{}, requestId: {}", format_exc(), requestId)
  878. raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
  879. ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
  880. if p_push_status[0] == 0:
  881. p_push_status[0] = time()
  882. p_push_status[1] += 1
  883. elif time() - p_push_status[0] <= 60:
  884. p_push_status[1] += 1
  885. p_push_status[0] = time()
  886. elif time() - p_push_status[0] > 60:
  887. p_push_status[1] = 1
  888. p_push_status[0] = time()
  889. logger.error("推流管道异常:{}, requestId: {}", format_exc(), requestId)
  890. clear_push_p(push_p, requestId)
  891. if p_push_status[1] > 5:
  892. logger.error("推流进管道异常:{}, requestId: {}", format_exc(), requestId)
  893. raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
  894. ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
  895. return None
  896. def clear_push_p(push_p, requestId):
  897. if push_p:
  898. try:
  899. if push_p.stdin:
  900. push_p.stdin.close()
  901. push_p.terminate()
  902. push_p.wait()
  903. except Exception:
  904. logger.error("推流管道异常:{}, requestId: {}", format_exc(), requestId)
  905. def close_or_write_stream(or_video_file, requestId):
  906. try:
  907. if or_video_file:
  908. or_video_file.release()
  909. except Exception:
  910. logger.info("关闭原视频写流管道异常:{}, requestId:{}", format_exc(), requestId)
  911. def close_ai_write_stream(ai_video_file, requestId):
  912. try:
  913. if ai_video_file:
  914. ai_video_file.release()
  915. except Exception:
  916. logger.info("关闭AI视频写流管道异常:{}, requestId:{}", format_exc(), requestId)
  917. def close_all_p(push_p, or_video_file, ai_video_file, requestId):
  918. logger.info("开始停止推流、写流管道!requestId:{}", requestId)
  919. clear_push_p(push_p, requestId)
  920. close_or_write_stream(or_video_file, requestId)
  921. close_ai_write_stream(ai_video_file, requestId)
  922. logger.info("停止推流、写流管道完成!requestId:{}", requestId)
  923. def build_or_video(orFilePath, width, height, requestId):
  924. or_video_file = None
  925. try:
  926. or_video_file = cv2.VideoWriter(orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25, (width, height))
  927. if or_video_file is None:
  928. logger.error("or_video_file为空, requestId:{}", requestId)
  929. raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  930. ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  931. return or_video_file
  932. except ServiceException as s:
  933. if or_video_file:
  934. or_video_file.release()
  935. logger.error("构建OR文件写对象异常: {}, requestId:{}", s.msg, requestId)
  936. raise s
  937. except Exception as e:
  938. if or_video_file:
  939. or_video_file.release()
  940. logger.error("构建OR文件写对象异常: {}, requestId:{}", format_exc(), requestId)
  941. raise e
  942. def write_or_video(frame, orFilePath, or_video_file, or_write_status, requestId):
  943. """
  944. :param frame: 当时视频帧
  945. :param orFilePath: 原视频名称
  946. :param or_video_file: 原视频本地写流对象
  947. :param or_write_status: 原视频写流状态检查 [0, 0] 第一个参数时间,第二个参数重试的次数
  948. :param requestId: 请求id
  949. :return: 原视频本地写流对象
  950. :desc 如果写流失败了, 不重试, 丢弃本次视频帧
  951. """
  952. try:
  953. if or_video_file is None:
  954. height, width = frame.shape[0], frame.shape[1]
  955. or_video_file = build_or_video(orFilePath, width, height, requestId)
  956. or_video_file.write(frame)
  957. return or_video_file
  958. except ServiceException as s:
  959. if or_video_file:
  960. or_video_file.release()
  961. raise s
  962. except Exception as ex:
  963. # 当第一次写视频帧到本地失败, 更新or_write_status的时间和重试次数
  964. if or_write_status[0] == 0:
  965. or_write_status[0] = time()
  966. or_write_status[1] += 1
  967. # 1分钟内失败重试次数更新, 1分钟中容忍小于5次的失败次数
  968. elif time() - or_write_status[0] <= 60:
  969. or_write_status[1] += 1
  970. or_write_status[0] = time()
  971. # 大于1分钟初始化检查数组
  972. elif time() - or_write_status[0] > 60:
  973. or_write_status[1] = 1
  974. or_write_status[0] = time()
  975. if or_write_status[1] > 5:
  976. if or_video_file:
  977. or_video_file.release()
  978. logger.error("重新写入原视频视频到本地, 重试失败:{}, requestId: {}", format_exc(), requestId)
  979. raise ex
  980. return or_video_file
  981. def build_ai_video(aiFilePath, width, height, requestId):
  982. ai_video_file = None
  983. try:
  984. ai_video_file = cv2.VideoWriter(aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), 25, (width, height))
  985. if ai_video_file is None:
  986. logger.error("ai_video_file为空, requestId:{}", requestId)
  987. raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  988. ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  989. return ai_video_file
  990. except ServiceException as s:
  991. if ai_video_file:
  992. ai_video_file.release()
  993. logger.error("构建AI文件写对象异常: {}, requestId:{}", s.msg, requestId)
  994. raise s
  995. except Exception as e:
  996. if ai_video_file:
  997. ai_video_file.release()
  998. logger.error("构建AI文件写对象异常: {}, requestId:{}", format_exc(), requestId)
  999. raise e
  1000. def write_ai_video(frame, aiFilePath, ai_video_file, ai_write_status, requestId):
  1001. try:
  1002. if ai_video_file is None:
  1003. height, width = frame.shape[0], frame.shape[1]
  1004. ai_video_file = build_ai_video(aiFilePath, width, height, requestId)
  1005. ai_video_file.write(frame)
  1006. return ai_video_file
  1007. except ServiceException as s:
  1008. if ai_video_file:
  1009. ai_video_file.release()
  1010. raise s
  1011. except Exception as ex:
  1012. if ai_write_status[0] == 0:
  1013. ai_write_status[0] = time()
  1014. ai_write_status[1] += 1
  1015. # 大于1分钟初始化检查数组
  1016. elif time() - ai_write_status[0] > 60:
  1017. ai_write_status[1] = 1
  1018. ai_write_status[0] = time()
  1019. # 1分钟内失败重试次数更新, 1分钟中容忍小于5次的失败次数
  1020. elif time() - ai_write_status[0] <= 60:
  1021. ai_write_status[1] += 1
  1022. ai_write_status[0] = time()
  1023. if ai_write_status[1] > 5:
  1024. if ai_video_file:
  1025. ai_video_file.release()
  1026. logger.error("重新写入分析后的视频到本地,重试失败:{}, requestId: {}", format_exc(), requestId)
  1027. raise ex
  1028. return ai_video_file