空港防疫算法交互
Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

432 lines
21KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. import cv2
  4. import subprocess as sp
  5. import ffmpeg
  6. import numpy as np
  7. from loguru import logger
  8. from exception.CustomerException import ServiceException
  9. from enums.ExceptionEnum import ExceptionType
  10. class Cv2Util():
  11. def __init__(self, pullUrl, pushUrl=None, orFilePath=None, aiFilePath=None, requestId=None):
  12. self.pullUrl = pullUrl
  13. self.pushUrl = pushUrl
  14. self.orFilePath = orFilePath
  15. self.aiFilePath = aiFilePath
  16. self.cap = None
  17. self.p = None
  18. self.or_video_file = None
  19. self.ai_video_file = None
  20. self.fps = None
  21. self.width = None
  22. self.height = None
  23. self.wh = None
  24. self.all_frames = None
  25. self.bit_rate = None
  26. self.pull_p = None
  27. self.requestId = requestId
  28. self.p_push_retry_num = 0
  29. '''
  30. 获取视频信息
  31. '''
  32. def get_video_info(self):
  33. try:
  34. if self.pullUrl is None:
  35. logger.error("拉流地址不能为空, requestId:{}", self.requestId)
  36. raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  37. ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  38. probe = ffmpeg.probe(self.pullUrl)
  39. # 视频大小
  40. # format = probe['format']
  41. # size = int(format['size'])/1024/1024
  42. video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
  43. if video_stream is None:
  44. logger.error("根据拉流地址未获取到视频流, requestId:{}", self.requestId)
  45. return
  46. width = video_stream.get('width')
  47. height = video_stream.get('height')
  48. nb_frames = video_stream.get('nb_frames')
  49. fps = video_stream.get('r_frame_rate')
  50. # duration = video_stream.get('duration')
  51. bit_rate = video_stream.get('bit_rate')
  52. if width:
  53. self.width = int(width)
  54. if height:
  55. self.height = int(height)
  56. if width is not None and height is not None:
  57. self.wh = int(width * height * 3)
  58. if nb_frames:
  59. self.all_frames = int(nb_frames)
  60. if fps:
  61. up, down = str(fps).split('/')
  62. self.fps = int(eval(up) / eval(down))
  63. # if duration:
  64. # self.duration = float(video_stream['duration'])
  65. if bit_rate:
  66. self.bit_rate = int(bit_rate) / 1000
  67. logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}|bit_rate:{}, requestId:{}", self.width,
  68. self.height, self.fps, self.all_frames, self.bit_rate, self.requestId)
  69. except ServiceException as s:
  70. logger.exception("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
  71. raise s
  72. except Exception as e:
  73. logger.exception("获取视频信息异常:{}, requestId:{}", e, self.requestId)
  74. '''
  75. 拉取视频
  76. '''
  77. def build_pull_p(self):
  78. try:
  79. if self.width is None or self.height is None:
  80. return
  81. if self.pull_p:
  82. logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
  83. self.pull_p.stdout.close()
  84. self.pull_p.terminate()
  85. self.pull_p.wait()
  86. if self.pullUrl is None:
  87. logger.error("拉流地址不能为空, requestId:{}", self.requestId)
  88. raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  89. ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  90. # command = ['ffmpeg',
  91. # # '-b:v', '3000k',
  92. # '-i', self.pullUrl,
  93. # '-f', 'rawvideo',
  94. # '-vcodec', 'rawvideo',
  95. # '-pix_fmt', 'bgr24',
  96. # # '-s', "{}x{}".format(int(width), int(height)),
  97. # '-an',
  98. # '-']
  99. aa = {'loglevel': 'error', 'c:v': 'h264_cuvid'}
  100. process = (
  101. ffmpeg
  102. .input(self.pullUrl, **aa)
  103. .output('pipe:', format='rawvideo', pix_fmt='bgr24', loglevel='error')
  104. .overwrite_output()
  105. .global_args('-an')
  106. .run_async(pipe_stdout=True)
  107. )
  108. # self.pull_p = sp.Popen(command, stdout=sp.PIPE, stderr=sp.PIPE)
  109. self.pull_p = process
  110. except ServiceException as s:
  111. logger.exception("构建拉流管道异常: {}, requestId:{}", s, self.requestId)
  112. raise s
  113. except Exception as e:
  114. logger.exception("构建拉流管道异常:{}, requestId:{}", e, self.requestId)
  115. def checkconfig(self):
  116. if self.fps is None or self.width is None or self.height is None:
  117. return True
  118. return False
  119. def read(self):
  120. result = None
  121. try:
  122. # if self.pull_p is None:
  123. # logger.error("拉流管道为空, requestId:{}", self.requestId)
  124. # raise ServiceException(ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[0],
  125. # ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[1])
  126. in_bytes = self.pull_p.stdout.read(self.wh)
  127. if in_bytes is not None and len(in_bytes) > 0:
  128. result = (np.frombuffer(in_bytes, np.uint8).reshape([int(self.height), int(self.width), 3]))
  129. result = cv2.resize(result, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  130. except ServiceException as s:
  131. logger.exception("读流异常: {}, requestId:{}", s, self.requestId)
  132. except Exception as e:
  133. logger.exception("读流异常:{}, requestId:{}", e, self.requestId)
  134. return result
  135. def close(self):
  136. if self.pull_p:
  137. if self.pull_p.stdout:
  138. self.pull_p.stdout.close()
  139. self.pull_p.terminate()
  140. self.pull_p.wait()
  141. logger.info("关闭拉流管道完成, requestId:{}", self.requestId)
  142. if self.p:
  143. if self.p.stdin:
  144. self.p.stdin.close()
  145. self.p.terminate()
  146. self.p.wait()
  147. # self.p.communicate()
  148. # self.p.kill()
  149. logger.info("关闭管道完成, requestId:{}", self.requestId)
  150. if self.or_video_file:
  151. self.or_video_file.release()
  152. logger.info("关闭原视频写入流完成, requestId:{}", self.requestId)
  153. if self.ai_video_file:
  154. self.ai_video_file.release()
  155. logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId)
  156. # 构建 cv2
  157. # def build_cv2(self):
  158. # try:
  159. # if self.cap is not None:
  160. # logger.info("重试, 关闭cap, requestId:{}", self.requestId)
  161. # self.cap.release()
  162. # if self.p is not None:
  163. # logger.info("重试, 关闭管道, requestId:{}", self.requestId)
  164. # self.p.stdin.close()
  165. # self.p.terminate()
  166. # self.p.wait()
  167. # if self.pullUrl is None:
  168. # logger.error("拉流地址不能为空, requestId:{}", self.requestId)
  169. # raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
  170. # ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
  171. # if self.pushUrl is None:
  172. # logger.error("推流地址不能为空, requestId:{}", self.requestId)
  173. # raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
  174. # ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
  175. # self.cap = cv2.VideoCapture(self.pullUrl)
  176. # if self.fps is None or self.fps == 0:
  177. # self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
  178. # if self.width is None or self.width == 0:
  179. # self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  180. # if self.height is None or self.height == 0:
  181. # self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  182. # command = ['/usr/bin/ffmpeg',
  183. # # '-y', # 不经过确认,输出时直接覆盖同名文件。
  184. # '-f', 'rawvideo',
  185. # '-vcodec', 'rawvideo',
  186. # '-pix_fmt', 'bgr24', # 显示可用的像素格式
  187. # # '-s', "{}x{}".format(self.width * 2, self.height),
  188. # '-s', "{}x{}".format(int(self.width), int(self.height/2)),
  189. # # '-r', str(15),
  190. # '-i', '-', # 指定输入文件
  191. # '-g', '25',
  192. # '-b:v', '3000k',
  193. # '-tune', 'zerolatency', # 加速编码速度
  194. # '-c:v', 'libx264', # 指定视频编码器
  195. # '-sc_threshold', '0',
  196. # '-pix_fmt', 'yuv420p',
  197. # '-an',
  198. # '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  199. # # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
  200. # '-f', 'flv',
  201. # self.pushUrl]
  202. # # 管道配置
  203. # logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width, self.requestId)
  204. # self.p = sp.Popen(command, stdin=sp.PIPE)
  205. # except ServiceException as s:
  206. # logger.exception("构建cv2异常: {}, requestId:{}", s, self.requestId)
  207. # raise s
  208. # except Exception as e:
  209. # logger.exception("初始化cv2异常:{}, requestId:{}", e, self.requestId)
  210. # 构建 cv2
  211. def build_p(self):
  212. try:
  213. if self.p:
  214. logger.info("重试, 关闭管道, requestId:{}", self.requestId)
  215. if self.p.stdin:
  216. self.p.stdin.close()
  217. self.p.terminate()
  218. self.p.wait()
  219. # self.p.communicate()
  220. # self.p.kill()
  221. if self.pushUrl is None:
  222. logger.error("推流地址不能为空, requestId:{}", self.requestId)
  223. raise ServiceException(ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[0],
  224. ExceptionType.PUSH_STREAM_URL_EXCEPTION.value[1])
  225. command = ['ffmpeg',
  226. # '-loglevel', 'debug',
  227. '-y', # 不经过确认,输出时直接覆盖同名文件。
  228. '-f', 'rawvideo',
  229. '-vcodec', 'rawvideo',
  230. '-pix_fmt', 'bgr24',
  231. '-thread_queue_size', '16',
  232. # '-s', "{}x{}".format(self.width * 2, self.height),
  233. '-s', "{}x{}".format(int(self.width), int(self.height / 2)),
  234. '-r', str(self.fps),
  235. '-i', '-', # 指定输入文件
  236. '-g', str(self.fps),
  237. # '-maxrate', '15000k',
  238. # '-profile:v', 'high',
  239. # '-b:v', '4000k',
  240. # '-crf', '18',
  241. '-rc:v', 'vbr',
  242. '-cq:v', '30',
  243. '-qmin', '30',
  244. '-qmax', '30',
  245. '-c:v', 'h264_nvenc', #
  246. # '-bufsize', '4000k',
  247. # '-c:v', 'libx264', # 指定视频编码器
  248. # '-tune', 'zerolatency', # 加速编码速度
  249. # '-sc_threshold', '0',
  250. '-pix_fmt', 'yuv420p',
  251. "-an",
  252. # '-flvflags', 'no_duration_filesize',
  253. '-preset', 'fast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  254. # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
  255. '-f', 'flv',
  256. self.pushUrl]
  257. # command = 'ffmpeg -loglevel debug -y -f rawvideo -vcodec rawvideo -pix_fmt bgr24' +\
  258. # ' -s ' + "{}x{}".format(int(self.width), int(self.height/2))\
  259. # + ' -i - ' + '-g ' + str(self.fps)+\
  260. # ' -b:v 6000k -tune zerolatency -c:v libx264 -pix_fmt yuv420p -preset ultrafast'+\
  261. # ' -f flv ' + self.pushUrl
  262. # kwargs = {'format': 'rawvideo',
  263. # # 'vcodec': 'rawvideo',
  264. # 'pix_fmt': 'bgr24',
  265. # 's': '{}x{}'.format(int(self.width), int(self.height/2))}
  266. # out = {
  267. # 'r': str(self.fps),
  268. # 'g': str(self.fps),
  269. # 'b:v': '5500k', # 恒定码率
  270. # # 'maxrate': '15000k',
  271. # # 'crf': '18',
  272. # 'bufsize': '5500k',
  273. # 'tune': 'zerolatency', # 加速编码速度
  274. # 'c:v': 'libx264', # 指定视频编码器
  275. # 'sc_threshold': '0',
  276. # 'pix_fmt': 'yuv420p',
  277. # # 'flvflags': 'no_duration_filesize',
  278. # 'preset': 'medium', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
  279. # # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
  280. # 'format': 'flv'}
  281. # 管道配置
  282. # process2 = (
  283. # ffmpeg
  284. # .input('pipe:', **kwargs)
  285. # .output(self.pushUrl, **out)
  286. # .global_args('-y', '-an')
  287. # .overwrite_output()
  288. # .run_async(pipe_stdin=True)
  289. # )
  290. logger.info("fps:{}|height:{}|width:{}|requestId:{}", self.fps, self.height, self.width, self.requestId)
  291. self.p = sp.Popen(command, stdin=sp.PIPE, shell=False)
  292. # self.p = process2
  293. except ServiceException as s:
  294. logger.exception("构建p管道异常: {}, requestId:{}", s, self.requestId)
  295. raise s
  296. except Exception as e:
  297. logger.exception("初始化p管道异常:{}, requestId:{}", e, self.requestId)
  298. def push_stream(self, frame):
  299. if self.p is None:
  300. self.build_p()
  301. try:
  302. self.p.stdin.write(frame.tostring())
  303. except Exception as ex:
  304. logger.exception("推流进管道异常:{}, requestId: {}", ex, self.requestId)
  305. current_retry_num = 0
  306. while True:
  307. try:
  308. if self.p_push_retry_num > 20:
  309. logger.info("推流失败重试次数过多, 请检查相关配置信息, 当前重试次数: {}, requestId: {}",
  310. self.p_push_retry_num, self.requestId)
  311. current_retry_num = 4
  312. break
  313. self.p_push_retry_num += 1
  314. time.sleep(10)
  315. self.build_p()
  316. self.p.stdin.write(frame.tostring())
  317. logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
  318. self.requestId)
  319. break
  320. except Exception as e:
  321. current_retry_num += 1
  322. logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
  323. current_retry_num, self.requestId)
  324. if current_retry_num > 3:
  325. logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.requestId)
  326. break
  327. if current_retry_num > 3:
  328. raise ServiceException(ExceptionType.PUSH_STREAM_URL_E_EXCEPTION.value[0],
  329. ExceptionType.PUSH_STREAM_URL_E_EXCEPTION.value[1])
  330. def video_write(self, or_frame, ai_frame):
  331. try:
  332. self.build_write()
  333. if or_frame is not None and len(or_frame) > 0:
  334. self.or_video_file.write(or_frame)
  335. if ai_frame is not None and len(ai_frame) > 0:
  336. self.ai_video_file.write(ai_frame)
  337. except Exception as ex:
  338. ai_retry_num = 0
  339. while True:
  340. try:
  341. if or_frame is not None and len(or_frame) > 0:
  342. self.or_video_file.write(or_frame)
  343. if ai_frame is not None and len(ai_frame) > 0:
  344. self.ai_video_file.write(ai_frame)
  345. logger.info("重新写入离线分析后视频到本地, 当前重试次数: {}, requestId: {}", ai_retry_num,
  346. self.requestId)
  347. break
  348. except Exception as e:
  349. ai_retry_num += 1
  350. logger.exception("重新写入离线分析后视频到本地:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
  351. ai_retry_num, self.requestId)
  352. if ai_retry_num > 3:
  353. logger.exception("重新写入离线分析后视频到本地,重试失败:{}, requestId: {}", e, self.requestId)
  354. raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  355. ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
  356. def build_write(self):
  357. try:
  358. if self.fps is None or self.width is None or self.height is None:
  359. raise ServiceException(ExceptionType.VIDEO_CONFIG_EXCEPTION.value[0],
  360. ExceptionType.VIDEO_CONFIG_EXCEPTION.value[1])
  361. if self.orFilePath is not None and self.or_video_file is None:
  362. self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
  363. (int(self.width / 2), int(self.height / 2)))
  364. if self.or_video_file is None:
  365. raise ServiceException(ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[0],
  366. ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[1])
  367. if self.aiFilePath is not None and self.ai_video_file is None:
  368. self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
  369. (int(self.width), int(self.height / 2)))
  370. if self.ai_video_file is None:
  371. raise ServiceException(ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[0],
  372. ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[1])
  373. except ServiceException as s:
  374. logger.exception("构建文件写对象异常: {}, requestId:{}", s, self.requestId)
  375. raise s
  376. except Exception as e:
  377. logger.exception("构建文件写对象异常: {}, requestId:{}", e, self.requestId)
  378. raise e
  379. def video_merge(self, frame1, frame2):
  380. # frameLeft = cv2.resize(frame1, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  381. # frameRight = cv2.resize(frame2, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
  382. # frame_merge = np.hstack((frameLeft, frameRight))
  383. frame_merge = np.hstack((frame1, frame2))
  384. return frame_merge
  385. def getP(self):
  386. if self.p is None:
  387. logger.error("获取管道为空, requestId:{}", self.requestId)
  388. raise ServiceException(ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[0],
  389. ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[1])
  390. return self.p
  391. def getCap(self):
  392. if self.cap is None:
  393. logger.error("获取cv2为空, requestId:{}", self.requestId)
  394. raise ServiceException(ExceptionType.CV2_IS_NULL_EXCEPTION.value[0],
  395. ExceptionType.CV2_IS_NULL_EXCEPTION.value[1])
  396. return self.cap
  397. def getOrVideoFile(self):
  398. if self.or_video_file is None:
  399. logger.error("获取原视频写入对象为空, requestId:{}", self.requestId)
  400. raise ServiceException(ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[0],
  401. ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[1])
  402. return self.or_video_file
  403. def getAiVideoFile(self):
  404. if self.ai_video_file is None:
  405. logger.error("获取AI视频写入对象为空, requestId:{}", self.requestId)
  406. raise ServiceException(ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[0],
  407. ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[1])
  408. return self.ai_video_file