You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

350 lines
20KB

  1. # -*- coding: utf-8 -*-
  2. import os
  3. from multiprocessing import Process, Queue
  4. from os import getpid
  5. from time import time, sleep
  6. from traceback import format_exc
  7. import psutil
  8. from loguru import logger
  9. from util.LogUtils import init_log
  10. from concurrency.FileUploadThread import ImageFileUpload
  11. from entity.FeedBack import message_feedback
  12. from enums.AnalysisStatusEnum import AnalysisStatus
  13. from enums.ExceptionEnum import ExceptionType
  14. from exception.CustomerException import ServiceException
  15. from util.Cv2Utils import check_video_stream, build_video_info, pull_read_video_stream, clear_pull_p
  16. from util.QueUtil import get_no_block_queue, put_queue, clear_queue, put_queue_result
  class PullVideoStreamProcess2(Process):
      """Base class for the video-stream pulling processes defined in this file.

      It stores the objects the subclasses' ``run`` methods work with, owns the
      command queue used to control the process, and provides two helpers:
      starting the image upload thread and checking that a task is still healthy.
      """
      __slots__ = ("_command_queue", "_msg", "_context", "_fb_queue", "_pull_queue", "_image_queue", "_analyse_type",
                   "_frame_num")

      def __init__(self, *args):
          """Store the seven positional arguments.

          Args:
              *args: exactly seven values, in this order: ``msg``, ``context``,
                  ``fb_queue``, ``pull_queue``, ``image_queue``, ``analyse_type``,
                  ``frame_num``. Any other count raises ``ValueError`` from the
                  tuple unpacking.
          """
          super().__init__()
          # Created here rather than passed in: carries control commands to this process.
          self._command_queue = Queue()
          # Caller-supplied arguments.
          self._msg, self._context, self._fb_queue, self._pull_queue, self._image_queue, self._analyse_type, \
              self._frame_num = args

      def sendCommand(self, result):
          """Put a command on this process's command queue.

          The subclasses in this file read the queued object with
          ``.get("command")`` and react to ``'stop'`` / ``'stop_ex'``.

          Args:
              result: the command object to enqueue.

          Raises:
              ServiceException: if the put fails for any reason (for example the
                  10 second timeout elapsing); the original error is chained.
          """
          try:
              self._command_queue.put(result, timeout=10)
          except Exception as exc:
              logger.error("添加队列超时异常:{}, requestId:{}", format_exc(), self._msg.get("request_id"))
              raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                     ExceptionType.SERVICE_INNER_EXCEPTION.value[1]) from exc

      @staticmethod
      def start_File_upload(*args):
          """Create and start the image upload thread as a daemon thread.

          Args:
              *args: exactly five values, in this order: ``fb_queue``, ``context``,
                  ``msg``, ``image_queue``, ``analyse_type``.

          Returns:
              The started ``ImageFileUpload`` thread.
          """
          fb_queue, context, msg, image_queue, analyse_type = args
          image_thread = ImageFileUpload(fb_queue, context, msg, image_queue, analyse_type)
          # Thread.setDaemon() is deprecated since Python 3.10; set the attribute instead.
          image_thread.daemon = True
          image_thread.start()
          return image_thread

      @staticmethod
      def check(start_time, service_timeout, request_id, image_thread):
          """Verify that the task has not timed out and the upload thread is alive.

          Args:
              start_time: ``time()`` value taken when the task started.
              service_timeout: maximum allowed task duration, in seconds.
              request_id: request identifier, used only for logging.
              image_thread: the image upload thread, or ``None`` to skip that check.

          Raises:
              ServiceException: if more than ``service_timeout`` seconds have elapsed.
              Exception: if ``image_thread`` is given but no longer alive.
          """
          if time() - start_time > service_timeout:
              logger.error("分析超时, requestId: {}", request_id)
              raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0],
                                     ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1])
          # Check that the image upload thread is still running.
          if image_thread is not None and not image_thread.is_alive():
              logger.error("未检测到图片上传线程活动,图片上传线程可能出现异常, requestId:{}", request_id)
              raise Exception("未检测到图片上传线程活动,图片上传线程可能出现异常!")
  class OnlinePullVideoStreamProcess2(PullVideoStreamProcess2):
      """Process that pulls a live video stream and forwards frame batches to ``pull_queue``."""
      __slots__ = ()

      @staticmethod
      def _stop_image_thread(image_thread, image_queue, request_id):
          """Send ``(2, "stop")`` to a still-running upload thread and wait up to 120s for it."""
          if image_thread and image_thread.is_alive():
              put_queue(image_queue, (2, "stop"), timeout=1)
              logger.info("停止图片上传线程, requestId:{}", request_id)
              image_thread.join(120)
              logger.info("停止图片上传线程结束, requestId:{}", request_id)

      def run(self):
          """Pull frames from ``msg["pull_url"]`` until stopped or an error occurs.

          Main loop, per iteration:
            1. ``check`` the overall task timeout and the upload thread.
            2. Handle a pending ``'stop'`` / ``'stop_ex'`` command (both leave the
               loop; ``'stop_ex'`` additionally suppresses the final result message).
            3. If ``check_video_stream(width, height)`` is truthy, (re)build the
               video info, failing with PULLSTREAM_TIMEOUT_EXCEPTION once retries
               have lasted longer than ``cv2_pull_stream_timeout`` seconds.
            4. Read one frame; on ``None`` retry, failing with
               READSTREAM_TIMEOUT_EXCEPTION after ``cv2_read_stream_timeout`` seconds.
            5. If ``pull_queue`` is full, drop the frame; fail after 180s of
               continuous fullness, or abort if the parent pid has become 1.
            6. Otherwise buffer the frame and flush a ``(4, (frames, indexes,
               all_frames))`` batch once ``frame_num`` frames are collected.

          On exit (unless suppressed) it puts ``(1, code, msg)`` after an error or
          ``(2,)`` after a normal stop on ``pull_queue``, waits up to 180s for a
          ``'stop'`` command, then clears all queues and stops the upload thread.
          """
          # Bind attributes to locals up front to avoid repeated lookups in the loop.
          context, msg, analyse_type = self._context, self._msg, self._analyse_type
          request_id, base_dir, env = msg["request_id"], context['base_dir'], context['env']
          pull_url, frame_num = msg["pull_url"], self._frame_num
          pull_stream_timeout = int(context["service"]["cv2_pull_stream_timeout"])
          read_stream_timeout = int(context["service"]["cv2_read_stream_timeout"])
          service_timeout = int(context["service"]["timeout"])
          command_queue, pull_queue, image_queue = self._command_queue, self._pull_queue, self._image_queue
          fb_queue = self._fb_queue
          image_thread, pull_p = None, None
          width, height, width_height_3, all_frames, w_2, h_2 = None, None, None, 0, None, None
          frame_list, frame_index_list = [], []
          ex = None
          # False means: do not report a final result on pull_queue when exiting.
          ex_status = True
          # Time at which pull_queue was first seen full (None while it is not full).
          full_timeout = None
          try:
              # Initialise logging.
              init_log(base_dir, env)
              logger.info("开启实时视频拉流进程, requestId:{}", request_id)
              # Start the image upload thread.
              image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)
              # Retry counters and the 1-based index of the current frame.
              cv2_init_num, init_pull_num, concurrent_frame = 0, 1, 1
              start_time, pull_start_time, read_start_time = time(), None, None
              while True:
                  # Check the task timeout and that the upload thread is healthy.
                  self.check(start_time, service_timeout, request_id, image_thread)
                  command_msg = get_no_block_queue(command_queue)
                  if command_msg is not None:
                      if 'stop' == command_msg.get("command"):
                          logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                          break
                      if 'stop_ex' == command_msg.get("command"):
                          logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                          ex_status = False
                          break
                  # Check whether the video info / pull object needs to be (re)created.
                  if check_video_stream(width, height):
                      # Flush frames buffered so far before re-initialising.
                      if len(frame_list) > 0:
                          put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                          frame_list, frame_index_list = [], []
                      logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                      if pull_start_time is None:
                          pull_start_time = time()
                      pull_stream_init_timeout = time() - pull_start_time
                      if pull_stream_init_timeout > pull_stream_timeout:
                          logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout, request_id)
                          raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0],
                                                 ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1])
                      cv2_init_num += 1
                      width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                      if width is None:
                          sleep(1)
                          continue
                      # Video info obtained: reset the init retry state.
                      pull_start_time, cv2_init_num = None, 1
                  frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height,
                                                                        width_height_3, w_2, h_2, request_id)
                  if frame is None:
                      if len(frame_list) > 0:
                          put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                          frame_list, frame_index_list = [], []
                      logger.info("获取帧为空, 开始重试: {}次, requestId: {}", init_pull_num, request_id)
                      if read_start_time is None:
                          read_start_time = time()
                      pull_stream_read_timeout = time() - read_start_time
                      if pull_stream_read_timeout > read_stream_timeout:
                          logger.info("拉流过程中断了重试超时, 超时时间: {}, requestId: {}", pull_stream_read_timeout,
                                      request_id)
                          raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                                 ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                      init_pull_num += 1
                      continue
                  # A frame was read: reset the read retry state.
                  init_pull_num, read_start_time = 1, None
                  if pull_queue.full():
                      logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                      if full_timeout is None:
                          full_timeout = time()
                      if time() - full_timeout > 180:
                          logger.error("拉流队列阻塞异常, requestId: {}", request_id)
                          raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                 ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                      # Parent pid 1 is treated as "the parent process has died".
                      if psutil.Process(getpid()).ppid() == 1:
                          clear_pull_p(pull_p, request_id)
                          ex_status = False
                          for q in [command_queue, pull_queue, image_queue]:
                              clear_queue(q)
                          if image_thread and image_thread.is_alive():
                              put_queue(image_queue, (2, "stop"), timeout=1)
                              image_thread.join(120)
                          logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
                          put_queue(fb_queue, message_feedback(request_id,
                                                               AnalysisStatus.FAILED.value,
                                                               analyse_type,
                                                               ExceptionType.NO_RESOURCES.value[0],
                                                               ExceptionType.NO_RESOURCES.value[1]))
                          break
                      # Queue still full: drop this frame and try again.
                      del frame
                      continue
                  full_timeout = None
                  frame_list.append(frame)
                  frame_index_list.append(concurrent_frame)
                  if len(frame_list) >= frame_num:
                      put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                      frame_list, frame_index_list = [], []
                  concurrent_frame += 1
                  del frame
          except ServiceException as s:
              logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
              ex = s.code, s.msg
          except Exception:
              logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", format_exc(), pull_queue.qsize(), request_id)
              ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
          finally:
              clear_pull_p(pull_p, request_id)
              del frame_list, frame_index_list
              if ex_status:
                  if ex:
                      code, err_msg = ex
                      r = put_queue_result(pull_queue, (1, code, err_msg), timeout=10)
                  else:
                      r = put_queue_result(pull_queue, (2,), timeout=10)
                  if r:
                      # Result delivered: wait up to 180s for a 'stop' command.
                      # NOTE(review): if get_no_block_queue never blocks, this loop spins
                      # without sleeping; confirm against util.QueUtil.
                      c_time = time()
                      while time() - c_time < 180:
                          command_msg = get_no_block_queue(command_queue)
                          if command_msg is not None:
                              if 'stop' == command_msg.get("command"):
                                  logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                                  self._stop_image_thread(image_thread, image_queue, request_id)
                                  break
              for q in [command_queue, pull_queue, image_queue]:
                  clear_queue(q)
              self._stop_image_thread(image_thread, image_queue, request_id)
              # image_thread is still None when start-up failed before the thread was
              # created; guard so that this final log cannot raise inside ``finally``.
              image_alive = image_thread is not None and image_thread.is_alive()
              logger.info("实时拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                          image_queue.qsize(), pull_queue.qsize(), image_alive, request_id)
  class OfflinePullVideoStreamProcess2(PullVideoStreamProcess2):
      """Process that reads an offline video and forwards frame batches to ``pull_queue``."""
      __slots__ = ()

      @staticmethod
      def _stop_image_thread(image_thread, image_queue, request_id):
          """Send ``(2, "stop")`` to a still-running upload thread and wait up to 120s for it."""
          if image_thread and image_thread.is_alive():
              put_queue(image_queue, (2, "stop"), timeout=1)
              logger.info("停止图片上传线程, requestId:{}", request_id)
              image_thread.join(120)
              logger.info("停止图片上传线程结束, requestId:{}", request_id)

      def run(self):
          """Read frames from ``msg["pull_url"]`` until the video ends, a stop arrives or an error occurs.

          Main loop, per iteration:
            1. ``check`` the overall task timeout and the upload thread.
            2. Handle a pending ``'stop'`` / ``'stop_ex'`` command (both leave the
               loop; ``'stop_ex'`` additionally suppresses the final result message).
            3. If ``check_video_stream(width, height)`` is truthy, (re)build the
               video info; the attempt counter is never reset, so the fifth entry
               into this branch raises OR_VIDEO_ADDRESS_EXCEPTION.
            4. If ``pull_queue`` is full, wait; fail after 300s of continuous
               fullness, or abort if the parent pid has become 1.
            5. Read one frame. ``None`` ends the read: remaining frames are flushed,
               and if the current frame index is more than 100 short of
               ``all_frames`` READSTREAM_TIMEOUT_EXCEPTION is raised.
            6. Otherwise buffer the frame and flush a ``(4, (frames, indexes,
               all_frames))`` batch once ``frame_num`` frames are collected.

          On exit (unless suppressed) it puts ``(1, code, msg)`` after an error or
          ``(2,)`` after a normal end on ``pull_queue``, waits up to 180s for a
          ``'stop'`` command, then clears all queues and stops the upload thread.
          """
          msg, context, frame_num, analyse_type = self._msg, self._context, self._frame_num, self._analyse_type
          request_id, base_dir, env, pull_url = msg["request_id"], context['base_dir'], context['env'], msg["pull_url"]
          ex, service_timeout = None, int(context["service"]["timeout"])
          command_queue, pull_queue, image_queue, fb_queue = self._command_queue, self._pull_queue, \
              self._image_queue, self._fb_queue
          image_thread, pull_p = None, None
          width, height, width_height_3, all_frames, w_2, h_2 = None, None, None, 0, None, None
          frame_list, frame_index_list = [], []
          # False means: do not report a final result on pull_queue when exiting.
          ex_status = True
          # Time at which pull_queue was first seen full (None while it is not full).
          full_timeout = None
          try:
              # Initialise logging.
              init_log(base_dir, env)
              logger.info("开启离线视频拉流进程, requestId:{}", request_id)
              # Start the image upload thread.
              image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)
              # Video-info attempt counter and the 1-based index of the current frame.
              cv2_init_num = 0
              concurrent_frame = 1
              start_time = time()
              while True:
                  # Check the task timeout and that the upload thread is healthy.
                  self.check(start_time, service_timeout, request_id, image_thread)
                  command_msg = get_no_block_queue(command_queue)
                  if command_msg is not None:
                      if 'stop' == command_msg.get("command"):
                          logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                          break
                      if 'stop_ex' == command_msg.get("command"):
                          logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                          ex_status = False
                          break
                  # Check whether the video info / pull object needs to be (re)created.
                  if check_video_stream(width, height):
                      # Flush frames buffered so far before re-initialising.
                      if len(frame_list) > 0:
                          put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                          frame_list, frame_index_list = [], []
                      logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                      if cv2_init_num > 3:
                          clear_pull_p(pull_p, request_id)
                          logger.info("离线拉流重试失败, 重试次数: {}, requestId: {}", cv2_init_num, request_id)
                          raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
                                                 ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
                      cv2_init_num += 1
                      sleep(1)
                      width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                      continue
                  if pull_queue.full():
                      logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                      if full_timeout is None:
                          full_timeout = time()
                      if time() - full_timeout > 300:
                          logger.error("拉流队列阻塞超时, 请检查父进程是否正常!requestId: {}", request_id)
                          raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                 ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                      # Parent pid 1 is treated as "the parent process has died".
                      if psutil.Process(getpid()).ppid() == 1:
                          clear_pull_p(pull_p, request_id)
                          ex_status = False
                          for q in [command_queue, pull_queue, image_queue]:
                              clear_queue(q)
                          if image_thread and image_thread.is_alive():
                              put_queue(image_queue, (2, "stop"), timeout=1)
                              image_thread.join(120)
                          logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
                          put_queue(fb_queue, message_feedback(request_id,
                                                               AnalysisStatus.FAILED.value,
                                                               analyse_type,
                                                               ExceptionType.NO_RESOURCES.value[0],
                                                               ExceptionType.NO_RESOURCES.value[1]))
                          break
                      # NOTE(review): no sleep before retrying; while the queue stays full
                      # this loops (and logs) as fast as the calls above allow.
                      continue
                  full_timeout = None
                  frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height,
                                                                        width_height_3, w_2, h_2, request_id)
                  if frame is None:
                      logger.info("总帧数: {}, 当前帧数: {}, requestId: {}", all_frames, concurrent_frame, request_id)
                      clear_pull_p(pull_p, request_id)
                      if len(frame_list) > 0:
                          put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=2,
                                    is_ex=False)
                          frame_list, frame_index_list = [], []
                      # Tolerate a shortfall of up to 100 frames against the reported total.
                      if concurrent_frame < all_frames - 100:
                          logger.info("离线拉流异常结束:requestId: {}", request_id)
                          raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                                 ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                      logger.info("离线拉流线程结束, requestId: {}", request_id)
                      break
                  frame_list.append(frame)
                  frame_index_list.append(concurrent_frame)
                  if len(frame_list) >= frame_num:
                      put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                      frame_list, frame_index_list = [], []
                  concurrent_frame += 1
                  del frame
          except ServiceException as s:
              # Log text fixed: this is the offline process, not the real-time one.
              logger.error("离线拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
              ex = s.code, s.msg
          except Exception:
              logger.error("离线拉流异常: {}, requestId:{}", format_exc(), request_id)
              ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
          finally:
              clear_pull_p(pull_p, request_id)
              del frame_list, frame_index_list
              if ex_status:
                  if ex:
                      code, err_msg = ex
                      r = put_queue_result(pull_queue, (1, code, err_msg), timeout=10)
                  else:
                      r = put_queue_result(pull_queue, (2,), timeout=10)
                  if r:
                      # Result delivered: wait up to 180s for a 'stop' command.
                      # NOTE(review): if get_no_block_queue never blocks, this loop spins
                      # without sleeping; confirm against util.QueUtil.
                      c_time = time()
                      while time() - c_time < 180:
                          command_msg = get_no_block_queue(command_queue)
                          if command_msg is not None:
                              if 'stop' == command_msg.get("command"):
                                  logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                                  self._stop_image_thread(image_thread, image_queue, request_id)
                                  break
              for q in [command_queue, pull_queue, image_queue]:
                  clear_queue(q)
              self._stop_image_thread(image_thread, image_queue, request_id)
              # image_thread is still None when start-up failed before the thread was
              # created; guard so that this final log cannot raise inside ``finally``.
              image_alive = image_thread is not None and image_thread.is_alive()
              logger.info("离线拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                          image_queue.qsize(), pull_queue.qsize(), image_alive, request_id)