Nie możesz wybrać więcej niż 25 tematów. Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

495 lines
23KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. import cv2
  4. import subprocess as sp
  5. import ffmpeg
  6. import numpy as np
  7. from loguru import logger
  8. from exception.CustomerException import ServiceException
  9. from enums.ExceptionEnum import ExceptionType
  10. class Cv2Util():
  11. def __init__(self, pullUrl, pushUrl=None, orFilePath=None, aiFilePath=None, requestId=None):
  12. self.pullUrl = pullUrl
  13. self.pushUrl = pushUrl
  14. self.orFilePath = orFilePath
  15. self.aiFilePath = aiFilePath
  16. self.cap = None
  17. self.p = None
  18. self.or_video_file = None
  19. self.ai_video_file = None
  20. self.fps = None
  21. self.width = None
  22. self.height = None
  23. self.wah = None
  24. self.wh = None
  25. self.h = None
  26. self.hn = None
  27. self.w = None
  28. self.all_frames = None
  29. self.bit_rate = None
  30. self.pull_p = None
  31. self.requestId = requestId
  32. self.p_push_retry_num = 0
  33. self.resize_status = False
  34. self.current_frame = 0
  35. def getFrameConfig(self, fps, width, height):
  36. if self.fps is None:
  37. self.fps = fps
  38. self.width = width
  39. self.height = height
  40. self.wh = int(width * height * 3 // 8)
  41. self.wah = '%sx%s' % (int(self.width / 2), int(self.height / 2))
  42. self.h = int(self.height * 3 // 4)
  43. self.w = int(self.width // 2)
  44. self.hn = int(self.height // 2)
  45. self.wn = int(self.width // 2)
  46. w_f = self.wh != width * height * 3 / 8
  47. h_f = self.h != self.height * 3 / 4
  48. wd_f = self.w != self.width / 2
  49. if w_f or h_f or wd_f:
  50. self.resize_status = True
  51. self.wh = int(width * height * 3 // 2)
  52. self.wah = '%sx%s' % (int(self.width), int(self.height))
  53. self.h = int(self.height * 3 // 2)
  54. self.w = int(self.width)
    '''
    Get video information
    '''
  58. def get_video_info(self):
  59. try:
  60. if self.pullUrl is None:
  61. logger.error("拉流地址不能为空, requestId:{}", self.requestId)
  62. raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  63. ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  64. probe = ffmpeg.probe(self.pullUrl)
  65. if probe is None or probe.get("streams") is None:
  66. return
  67. # 视频大小
  68. # format = probe['format']
  69. # size = int(format['size'])/1024/1024
  70. video_stream = next((stream for stream in probe['streams'] if stream.get('codec_type') == 'video'), None)
  71. if video_stream is None:
  72. logger.error("根据拉流地址未获取到视频流, requestId:{}", self.requestId)
  73. return
  74. width = video_stream.get('width')
  75. height = video_stream.get('height')
  76. nb_frames = video_stream.get('nb_frames')
  77. fps = video_stream.get('r_frame_rate')
  78. # duration = video_stream.get('duration')
  79. bit_rate = video_stream.get('bit_rate')
  80. if width:
  81. self.width = int(width)
  82. if height:
  83. self.height = int(height)
  84. if width is not None and height is not None:
  85. self.wh = int(width * height * 3 // 8)
  86. self.wah = '%sx%s' % (int(self.width / 2), int(self.height / 2))
  87. self.h = int(self.height * 3 // 4)
  88. self.w = int(self.width / 2)
  89. self.hn = int(self.height / 2)
  90. self.wn = int(self.width // 2)
  91. w_f = self.wh != width * height * 3 / 8
  92. h_f = self.h != self.height * 3 / 4
  93. wd_f = self.w != self.width / 2
  94. if w_f or h_f or wd_f:
  95. self.resize_status = True
  96. self.wh = int(width * height * 3 // 2)
  97. self.wah = '%sx%s' % (int(self.width), int(self.height))
  98. self.h = int(self.height * 3 // 2)
  99. self.w = int(self.width)
  100. if nb_frames:
  101. self.all_frames = int(nb_frames)
  102. if fps:
  103. up, down = str(fps).split('/')
  104. self.fps = int(eval(up) / eval(down))
  105. # if duration:
  106. # self.duration = float(video_stream['duration'])
  107. if bit_rate:
  108. self.bit_rate = int(bit_rate) / 1000
  109. logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}|bit_rate:{}, requestId:{}", self.width,
  110. self.height, self.fps, self.all_frames, self.bit_rate, self.requestId)
  111. except ServiceException as s:
  112. logger.error("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
  113. raise s
  114. except ffmpeg._run.Error as er:
  115. logger.error("获取视频信息异常: {}, requestId:{}", er.stderr.decode(encoding='utf-8'), self.requestId)
  116. except Exception as e:
  117. logger.exception("获取视频信息异常:{}, requestId:{}", e, self.requestId)
    '''
    Pull the video stream
    '''
  121. def build_pull_p(self):
  122. try:
  123. if self.wah is None:
  124. return
  125. if self.pull_p:
  126. logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
  127. self.pull_p.stdout.close()
  128. self.pull_p.terminate()
  129. self.pull_p.wait()
  130. # command = ['ffmpeg',
  131. # # '-b:v', '3000k',
  132. # '-i', self.pullUrl,
  133. # '-f', 'rawvideo',
  134. # '-vcodec', 'rawvideo',
  135. # '-pix_fmt', 'bgr24',
  136. # # '-s', "{}x{}".format(int(width), int(height)),
  137. # '-an',
  138. # '-']
  139. # input_config = {'c:v': 'h264_cuvid', 'resize': self.wah}
  140. # process = (
  141. # ffmpeg
  142. # .input(self.pullUrl, **input_config)
  143. # .output('pipe:', format='rawvideo', r=str(self.fps)) # pix_fmt='bgr24'
  144. # .overwrite_output()
  145. # .global_args('-an')
  146. # .run_async(pipe_stdout=True)
  147. # )
  148. command = ['ffmpeg',
  149. '-re',
  150. '-y',
  151. '-c:v', 'h264_cuvid',
  152. '-resize', self.wah,
  153. '-i', self.pullUrl,
  154. '-f', 'rawvideo',
  155. '-an',
  156. '-']
  157. self.pull_p = sp.Popen(command, stdout=sp.PIPE)
  158. # self.pull_p = sp.Popen(command, stdout=sp.PIPE, stderr=sp.PIPE)
  159. # self.pull_p = process
  160. except ServiceException as s:
  161. logger.exception("构建拉流管道异常: {}, requestId:{}", s, self.requestId)
  162. raise s
  163. except Exception as e:
  164. logger.exception("构建拉流管道异常:{}, requestId:{}", e, self.requestId)
  165. def checkconfig(self):
  166. if self.fps is None or self.width is None or self.height is None:
  167. return True
  168. return False
  169. def read(self):
  170. result = None
  171. try:
  172. # if self.pull_p is None:
  173. # logger.error("拉流管道为空, requestId:{}", self.requestId)
  174. # raise ServiceException(ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[0],
  175. # ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[1])
  176. in_bytes = self.pull_p.stdout.read(self.wh)
  177. self.current_frame += 1
  178. if in_bytes is not None and len(in_bytes) > 0:
  179. # result = (np.frombuffer(in_bytes, np.uint8).reshape([int(self.height), int(self.width), 3]))
  180. try:
  181. img = (np.frombuffer(in_bytes, np.uint8)).reshape((self.h, self.w))
  182. except Exception as ei:
  183. logger.exception("视频格式异常:{}, requestId:{}", ei, self.requestId)
  184. raise ServiceException(ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[0],
  185. ExceptionType.VIDEO_RESOLUTION_EXCEPTION.value[1])
  186. result = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_NV12)
  187. if self.resize_status:
  188. result = cv2.resize(result, (int(self.width / 2), int(self.height / 2)),
  189. interpolation=cv2.INTER_LINEAR)
  190. except ServiceException as s:
  191. raise s
  192. except Exception as e:
  193. logger.exception("读流异常:{}, requestId:{}", e, self.requestId)
  194. return result
  195. def close(self):
  196. if self.pull_p:
  197. if self.pull_p.stdout:
  198. self.pull_p.stdout.close()
  199. self.pull_p.terminate()
  200. self.pull_p.wait()
  201. logger.info("关闭拉流管道完成, requestId:{}", self.requestId)
  202. if self.p:
  203. if self.p.stdin:
  204. self.p.stdin.close()
  205. self.p.terminate()
  206. self.p.wait()
  207. # self.p.communicate()
  208. # self.p.kill()
  209. logger.info("关闭管道完成, requestId:{}", self.requestId)
  210. if self.or_video_file:
  211. self.or_video_file.release()
  212. logger.info("关闭原视频写入流完成, requestId:{}", self.requestId)
  213. if self.ai_video_file:
  214. self.ai_video_file.release()
  215. logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId)
  216. # 构建 cv2
  217. # def build_cv2(self):
  218. # try:
  219. # if self.cap is not None:
  220. # logger.info("重试, 关闭cap, requestId:{}", self.requestId)
  221. # self.cap.release()
  222. # if self.p is not None:
  223. # logger.info("重试, 关闭管道, requestId:{}", self.requestId)
  224. # self.p.stdin.close()
  225. # self.p.terminate()
  226. # self.p.wait()
  227. # if self.pullUrl is None:
  228. # logger.error("拉流地址不能为空, requestId:{}", self.requestId)
  229. # raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  230. # ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  231. # if self.pushUrl is None:
  232. # logger.error("推流地址不能为空, requestId:{}", self.requestId)
  233. # raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
  234. # ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
  235. # self.cap = cv2.VideoCapture(self.pullUrl)
  236. # if self.fps is None or self.fps == 0:
  237. # self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
  238. # if self.width is None or self.width == 0:
  239. # self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  240. # if self.height is None or self.height == 0:
  241. # self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  242. # command = ['/usr/bin/ffmpeg',
  243. # # '-y', # 不经过确认,输出时直接覆盖同名文件。
  244. # '-f', 'rawvideo',
  245. # '-vcodec', 'rawvideo',
  246. # '-pix_fmt', 'bgr24', # 显示可用的像素格式
  247. # # '-s', "{}x{}".format(self.width * 2, self.height),
  248. # '-s', "{}x{}".format(int(self.width), int(self.height/2)),
  249. # # '-r', str(15),
  250. # '-i', '-', # 指定输入文件
  251. # '-g', '25',
  252. # '-b:v', '3000k',
  253. # '-tune', 'zerolatency', # 加速编码速度
  254. # '-c:v', 'libx264', # 指定视频编码器
  255. # '-sc_threshold', '0',
  256. # '-pix_fmt', 'yuv420p',
  257. # '-an',
  258. # '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  259. # # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
  260. # '-f', 'flv',
  261. # self.pushUrl]
  262. # # 管道配置
  263. # logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width, self.requestId)
  264. # self.p = sp.Popen(command, stdin=sp.PIPE)
  265. # except ServiceException as s:
  266. # logger.exception("构建cv2异常: {}, requestId:{}", s, self.requestId)
  267. # raise s
  268. # except Exception as e:
  269. # logger.exception("初始化cv2异常:{}, requestId:{}", e, self.requestId)
    # Build the ffmpeg push pipeline
  271. def build_p(self):
  272. try:
  273. if self.p:
  274. logger.info("重试, 关闭管道, requestId:{}", self.requestId)
  275. if self.p.stdin:
  276. self.p.stdin.close()
  277. self.p.terminate()
  278. self.p.wait()
  279. # self.p.communicate()
  280. # self.p.kill()
  281. if self.pushUrl is None:
  282. logger.error("推流地址不能为空, requestId:{}", self.requestId)
  283. raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
  284. ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
  285. command = ['ffmpeg',
  286. # '-loglevel', 'debug',
  287. '-y',
  288. '-f', 'rawvideo',
  289. '-vcodec', 'rawvideo',
  290. '-pix_fmt', 'bgr24',
  291. '-thread_queue_size', '1024',
  292. # '-s', "{}x{}".format(self.width * 2, self.height),
  293. '-s', "{}x{}".format(int(self.width), int(self.hn)),
  294. '-r', str(self.fps),
  295. '-i', '-', # 指定输入文件
  296. '-g', str(self.fps),
  297. # '-maxrate', '15000k',
  298. # '-profile:v', 'high',
  299. # '-b:v', '4000k',
  300. # '-crf', '18',
  301. '-rc:v', 'vbr',
  302. '-cq:v', '30',
  303. '-qmin', '30',
  304. '-qmax', '30',
  305. '-c:v', 'h264_nvenc', #
  306. # '-bufsize', '4000k',
  307. # '-c:v', 'libx264', # 指定视频编码器
  308. # '-tune', 'zerolatency', # 加速编码速度
  309. # '-sc_threshold', '0',
  310. '-pix_fmt', 'yuv420p',
  311. "-an",
  312. # '-flvflags', 'no_duration_filesize',
  313. '-preset', 'fast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  314. # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
  315. '-f', 'flv',
  316. self.pushUrl]
  317. # command = 'ffmpeg -loglevel debug -y -f rawvideo -vcodec rawvideo -pix_fmt bgr24' +\
  318. # ' -s ' + "{}x{}".format(int(self.width), int(self.height/2))\
  319. # + ' -i - ' + '-g ' + str(self.fps)+\
  320. # ' -b:v 6000k -tune zerolatency -c:v libx264 -pix_fmt yuv420p -preset ultrafast'+\
  321. # ' -f flv ' + self.pushUrl
  322. # kwargs = {'format': 'rawvideo',
  323. # # 'vcodec': 'rawvideo',
  324. # 'pix_fmt': 'bgr24',
  325. # 's': '{}x{}'.format(int(self.width), int(self.height/2))}
  326. # out = {
  327. # 'r': str(self.fps),
  328. # 'g': str(self.fps),
  329. # 'b:v': '5500k', # 恒定码率
  330. # # 'maxrate': '15000k',
  331. # # 'crf': '18',
  332. # 'bufsize': '5500k',
  333. # 'tune': 'zerolatency', # 加速编码速度
  334. # 'c:v': 'libx264', # 指定视频编码器
  335. # 'sc_threshold': '0',
  336. # 'pix_fmt': 'yuv420p',
  337. # # 'flvflags': 'no_duration_filesize',
  338. # 'preset': 'medium', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  339. # # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
  340. # 'format': 'flv'}
  341. # 管道配置
  342. # process2 = (
  343. # ffmpeg
  344. # .input('pipe:', **kwargs)
  345. # .output(self.pushUrl, **out)
  346. # .global_args('-y', '-an')
  347. # .overwrite_output()
  348. # .run_async(pipe_stdin=True)
  349. # )
  350. logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width, self.requestId)
  351. self.p = sp.Popen(command, stdin=sp.PIPE, shell=False)
  352. # self.p = process2
  353. except ServiceException as s:
  354. logger.exception("构建p管道异常: {}, requestId:{}", s, self.requestId)
  355. raise s
  356. except Exception as e:
  357. logger.exception("初始化p管道异常:{}, requestId:{}", e, self.requestId)
  358. async def push_stream_write(self, frame):
  359. self.p.stdin.write(frame.tostring())
  360. async def push_stream(self, frame):
  361. if self.p is None:
  362. self.build_p()
  363. try:
  364. await self.push_stream_write(frame)
  365. return True
  366. except Exception as ex:
  367. logger.exception("推流进管道异常:{}, requestId: {}", ex, self.requestId)
  368. current_retry_num = 0
  369. while True:
  370. try:
  371. time.sleep(1)
  372. self.p_push_retry_num += 1
  373. current_retry_num += 1
  374. if current_retry_num > 3 or self.p_push_retry_num > 600:
  375. return False
  376. self.build_p()
  377. await self.push_stream_write(frame)
  378. logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
  379. self.requestId)
  380. return True
  381. except Exception as e:
  382. logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
  383. current_retry_num, self.requestId)
  384. return False
  385. async def video_frame_write(self, or_frame, ai_frame):
  386. if or_frame is not None:
  387. self.or_video_file.write(or_frame)
  388. if ai_frame is not None:
  389. self.ai_video_file.write(ai_frame)
  390. async def video_write(self, or_frame, ai_frame):
  391. try:
  392. self.build_write()
  393. if or_frame is not None and len(or_frame) > 0:
  394. await self.video_frame_write(or_frame, None)
  395. if ai_frame is not None and len(ai_frame) > 0:
  396. await self.video_frame_write(None, ai_frame)
  397. return True
  398. except Exception as ex:
  399. ai_retry_num = 0
  400. while True:
  401. try:
  402. ai_retry_num += 1
  403. if ai_retry_num > 3:
  404. logger.exception("重新写入离线分析后视频到本地,重试失败:{}, requestId: {}", e, self.requestId)
  405. return False
  406. if or_frame is not None and len(or_frame) > 0:
  407. await self.or_video_file.write(or_frame)
  408. if ai_frame is not None and len(ai_frame) > 0:
  409. await self.ai_video_file.write(ai_frame)
  410. logger.info("重新写入离线分析后视频到本地, 当前重试次数: {}, requestId: {}", ai_retry_num,
  411. self.requestId)
  412. return True
  413. except Exception as e:
  414. logger.exception("重新写入离线分析后视频到本地:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
  415. ai_retry_num, self.requestId)
  416. def build_write(self):
  417. try:
  418. if self.fps is None or self.width is None or self.height is None:
  419. raise ServiceException(ExceptionType.VIDEO_CONFIG_EXCEPTION.value[0],
  420. ExceptionType.VIDEO_CONFIG_EXCEPTION.value[1])
  421. if self.orFilePath is not None and self.or_video_file is None:
  422. self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
  423. (int(self.wn), int(self.hn)))
  424. if self.or_video_file is None:
  425. raise ServiceException(ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[0],
  426. ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[1])
  427. if self.aiFilePath is not None and self.ai_video_file is None:
  428. self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
  429. (int(self.width), int(self.hn)))
  430. if self.ai_video_file is None:
  431. raise ServiceException(ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[0],
  432. ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[1])
  433. except ServiceException as s:
  434. logger.exception("构建文件写对象异常: {}, requestId:{}", s, self.requestId)
  435. raise s
  436. except Exception as e:
  437. logger.exception("构建文件写对象异常: {}, requestId:{}", e, self.requestId)
  438. raise e
  439. def video_merge(self, frame1, frame2):
  440. # frameLeft = cv2.resize(frame1, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  441. # frameRight = cv2.resize(frame2, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  442. # frame_merge = np.hstack((frameLeft, frameRight))
  443. frame_merge = np.hstack((frame1, frame2))
  444. return frame_merge
  445. def getP(self):
  446. if self.p is None:
  447. logger.error("获取管道为空, requestId:{}", self.requestId)
  448. raise ServiceException(ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[0],
  449. ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[1])
  450. return self.p
  451. def getCap(self):
  452. if self.cap is None:
  453. logger.error("获取cv2为空, requestId:{}", self.requestId)
  454. raise ServiceException(ExceptionType.CV2_IS_NULL_EXCEPTION.value[0],
  455. ExceptionType.CV2_IS_NULL_EXCEPTION.value[1])
  456. return self.cap
  457. def getOrVideoFile(self):
  458. if self.or_video_file is None:
  459. logger.error("获取原视频写入对象为空, requestId:{}", self.requestId)
  460. raise ServiceException(ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[0],
  461. ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[1])
  462. return self.or_video_file
  463. def getAiVideoFile(self):
  464. if self.ai_video_file is None:
  465. logger.error("获取AI视频写入对象为空, requestId:{}", self.requestId)
  466. raise ServiceException(ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[0],
  467. ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[1])
  468. return self.ai_video_file