You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

331 lines
20KB

  1. # -*- coding: utf-8 -*-
  2. import os
  3. from multiprocessing import Process, Queue
  4. from os import getpid
  5. from time import time, sleep
  6. from traceback import format_exc
  7. import psutil
  8. from loguru import logger
  9. from util.LogUtils import init_log
  10. from concurrency.FileUploadThread import ImageFileUpload
  11. from entity.FeedBack import message_feedback
  12. from enums.AnalysisStatusEnum import AnalysisStatus
  13. from enums.ExceptionEnum import ExceptionType
  14. from exception.CustomerException import ServiceException
  15. from util.Cv2Utils import check_video_stream, build_video_info, pull_read_video_stream, clear_pull_p
  16. from util.QueUtil import get_no_block_queue, put_queue, clear_queue, put_queue_result
  class PullVideoStreamProcess(Process):
      """Common base for the stream-pulling worker processes.

      Holds the queues and request data shared by the subclasses and provides
      the helpers they use: sending a command to the running process, starting
      the image-upload thread, and a per-iteration health check.
      """
      __slots__ = ("_command_queue", "_msg", "_context", "_fb_queue", "_pull_queue", "_image_queue", "_analyse_type",
                   "_frame_num")

      def __init__(self, *args):
          """Create the command queue and store the caller-supplied arguments.

          Args:
              *args: exactly seven positional values, in this order:
                  ``msg, context, fb_queue, pull_queue, image_queue,
                  analyse_type, frame_num``. Any other count raises
                  ``ValueError`` from the tuple unpacking below.
          """
          super().__init__()
          # Owned by this object (not passed in by the caller)
          self._command_queue = Queue()
          # Passed in by the caller
          self._msg, self._context, self._fb_queue, self._pull_queue, self._image_queue, self._analyse_type, \
              self._frame_num = args

      def sendCommand(self, result):
          """Put ``result`` on the command queue that the subclasses' ``run`` loops poll.

          The loops look for a mapping with a ``"command"`` key whose value is
          ``'stop'`` or ``'stop_ex'``.
          """
          # NOTE(review): is_ex=True presumably makes put_queue raise when the put fails - confirm in util.QueUtil
          put_queue(self._command_queue, result, timeout=2, is_ex=True)

      @staticmethod
      def start_File_upload(fb_queue, context, msg, image_queue, analyse_type):
          """Create, daemonize and start the image-upload thread.

          Returns:
              The started ``ImageFileUpload`` instance.
          """
          image_thread = ImageFileUpload(fb_queue, context, msg, image_queue, analyse_type)
          # Thread.setDaemon() is deprecated since Python 3.10; setting the attribute is the supported form.
          # Assumes ImageFileUpload subclasses threading.Thread (it is started, joined and polled as one).
          image_thread.daemon = True
          image_thread.start()
          return image_thread

      @staticmethod
      def check(start_time, service_timeout, request_id, image_thread):
          """Verify that the task has not timed out and the upload thread is alive.

          Args:
              start_time: ``time()`` value taken when the task started.
              service_timeout: maximum allowed run time, in seconds.
              request_id: request identifier, used only for logging.
              image_thread: the upload thread, or a falsy value to skip that check.

          Raises:
              ServiceException: more than ``service_timeout`` seconds have elapsed.
              Exception: ``image_thread`` is set but no longer alive.
          """
          if time() - start_time > service_timeout:
              logger.error("拉流进程运行超时, requestId: {}", request_id)
              raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
                                     ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
          # Make sure the image-upload thread is still running
          if image_thread and not image_thread.is_alive():
              logger.error("未检测到图片上传线程活动,图片上传线程可能出现异常, requestId:{}", request_id)
              raise Exception("未检测到图片上传线程活动,图片上传线程可能出现异常!")
  class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
      """Worker process that pulls frames from a live stream and forwards them in batches."""
      __slots__ = ()

      def run(self):
          """Pull frames from ``msg["pull_url"]`` until stopped, timed out, or failed.

          Frames are collected into batches of ``frame_num`` and put on the pull
          queue as ``(4, (frame_list, frame_index_list, all_frames))``. On exit
          (unless stopped with ``stop_ex`` or the parent is gone) a final
          ``(1, code, msg)`` error tuple or a ``(2,)`` completion tuple is put on
          the pull queue; the process then waits up to 60 seconds for a ``stop``
          command before clearing the queues and stopping the upload thread.

          All exceptions are caught and reported through the pull queue; nothing
          propagates to the caller.
          """
          # Bind everything to locals up front to avoid repeated attribute lookups inside the loop
          context, msg, analyse_type, frame_num = self._context, self._msg, self._analyse_type, self._frame_num
          base_dir, env, service = context['base_dir'], context['env'], context["service"]
          request_id, pull_url = msg["request_id"], msg["pull_url"]
          pull_stream_timeout, read_stream_timeout, service_timeout = int(service["cv2_pull_stream_timeout"]), \
              int(service["cv2_read_stream_timeout"]), int(service["timeout"]) + 120
          command_queue, pull_queue, image_queue, fb_queue = self._command_queue, self._pull_queue, self._image_queue, \
              self._fb_queue
          image_thread, ex = None, None
          width, height, width_height_3, all_frames, w_2, h_2, pull_p = None, None, None, 0, None, None, None
          frame_list, frame_index_list = [], []
          # False means "exit silently": no final result is reported to the pull queue
          ex_status = True
          try:
              # Initialise logging
              init_log(base_dir, env)
              logger.info("开启启动实时视频拉流进程, requestId:{}", request_id)
              # Start the image-upload thread
              image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)
              cv2_init_num, init_pull_num, concurrent_frame = 0, 1, 1
              start_time, pull_stream_start_time, read_start_time, full_timeout = time(), None, None, None
              while True:
                  # Check for overall task timeout and that the upload thread is still alive
                  self.check(start_time, service_timeout, request_id, image_thread)
                  command_msg = get_no_block_queue(command_queue)
                  if command_msg is not None:
                      if 'stop' == command_msg.get("command"):
                          logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                          break
                      if 'stop_ex' == command_msg.get("command"):
                          logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                          ex_status = False
                          break
                  # Check whether the video info / pull object still has to be (re)acquired
                  if check_video_stream(width, height):
                      # Flush any partially filled batch before re-initialising
                      if len(frame_list) > 0:
                          put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                          frame_list, frame_index_list = [], []
                      logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                      if pull_stream_start_time is None:
                          pull_stream_start_time = time()
                      pull_stream_init_timeout = time() - pull_stream_start_time
                      if pull_stream_init_timeout > pull_stream_timeout:
                          logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout, request_id)
                          # Timed out: raise so the error is reported to the parent through the pull queue
                          raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0],
                                                 ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1])
                      cv2_init_num += 1
                      width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                      if width is None:
                          sleep(1)
                          continue
                      # Video info obtained: reset the init-timeout clock and retry counter
                      pull_stream_start_time, cv2_init_num = None, 1
                  frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height, width_height_3,
                                                                        w_2, h_2, request_id)
                  if pull_queue.full():
                      # The frame just read is dropped while the consumer is behind
                      logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                      if full_timeout is None:
                          full_timeout = time()
                      if time() - full_timeout > 180:
                          logger.error("拉流队列阻塞超时, 请检查父进程是否正常!requestId: {}", request_id)
                          raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                 ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                      # A parent pid of 1 means the original parent is gone and this process was re-parented
                      if psutil.Process(getpid()).ppid() == 1:
                          clear_pull_p(pull_p, request_id)
                          ex_status = False
                          for q in [command_queue, pull_queue, image_queue]:
                              clear_queue(q)
                          if image_thread and image_thread.is_alive():
                              put_queue(image_queue, (2, "stop"), timeout=1)
                              image_thread.join(120)
                          logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
                          put_queue(self._fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value,
                                                                     self._analyse_type,
                                                                     ExceptionType.NO_RESOURCES.value[0],
                                                                     ExceptionType.NO_RESOURCES.value[1]), timeout=2)
                          break
                      del frame
                      continue
                  full_timeout = None
                  if frame is None:
                      clear_pull_p(pull_p, request_id)
                      # Flush any partially filled batch before retrying
                      if len(frame_list) > 0:
                          put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                          frame_list, frame_index_list = [], []
                      logger.info("获取帧为空, 开始重试: {}次, requestId: {}", init_pull_num, request_id)
                      if read_start_time is None:
                          read_start_time = time()
                      pull_stream_read_timeout = time() - read_start_time
                      if pull_stream_read_timeout > read_stream_timeout:
                          logger.info("拉流过程中断了重试超时, 超时时间: {}, requestId: {}", pull_stream_read_timeout,
                                      request_id)
                          raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                                 ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                      init_pull_num += 1
                      continue
                  # Got a frame: reset the read-retry counter and clock
                  init_pull_num, read_start_time = 1, None
                  frame_list.append(frame)
                  frame_index_list.append(concurrent_frame)
                  if len(frame_list) >= frame_num:
                      put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                      frame_list, frame_index_list = [], []
                  concurrent_frame += 1
                  del frame
          except ServiceException as s:
              logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
              ex = s.code, s.msg
          except Exception:
              logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", format_exc(), pull_queue.qsize(), request_id)
              ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
          finally:
              clear_pull_p(pull_p, request_id)
              del frame_list, frame_index_list
              if ex_status:
                  # Report the outcome: (1, code, msg) on failure, (2,) on normal completion
                  if ex:
                      code, msg = ex
                      r = put_queue_result(pull_queue, (1, code, msg), timeout=10)
                  else:
                      r = put_queue_result(pull_queue, (2,), timeout=10)
                  if r:
                      # Wait up to 60 seconds for a 'stop' command after reporting the result.
                      # NOTE(review): this polls without sleeping unless get_no_block_queue itself waits - confirm
                      c_time = time()
                      while time() - c_time < 60:
                          command_msg = get_no_block_queue(command_queue)
                          if command_msg is not None:
                              if 'stop' == command_msg.get("command"):
                                  logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                                  if image_thread and image_thread.is_alive():
                                      put_queue(image_queue, (2, "stop"), timeout=1)
                                      logger.info("停止图片上传线程, requestId:{}", request_id)
                                      image_thread.join(120)
                                      logger.info("停止图片上传线程结束, requestId:{}", request_id)
                                  break
              for q in [command_queue, pull_queue, image_queue]:
                  clear_queue(q)
              if image_thread and image_thread.is_alive():
                  put_queue(image_queue, (2, "stop"), timeout=1)
                  logger.info("停止图片上传线程, requestId:{}", request_id)
                  image_thread.join(120)
                  logger.info("停止图片上传线程结束, requestId:{}", request_id)
              # image_thread is still None when start-up failed before the thread was created;
              # calling is_alive() on it here would raise from inside finally.
              image_thread_alive = image_thread is not None and image_thread.is_alive()
              logger.info("实时拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                          image_queue.qsize(), pull_queue.qsize(), image_thread_alive, request_id)
  class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
      """Worker process that reads a recorded video to its end and forwards frames in batches."""
      __slots__ = ()

      def run(self):
          """Read frames from ``msg["pull_url"]`` until the video ends, a stop arrives, or it fails.

          Frames are collected into batches of ``frame_num`` and put on the pull
          queue as ``(4, (frame_list, frame_index_list, all_frames))``. A ``None``
          frame ends the run; it is treated as an error when it arrives more than
          100 frames short of ``all_frames``. On exit (unless stopped with
          ``stop_ex`` or the parent is gone) a final ``(1, code, msg)`` error tuple
          or a ``(2,)`` completion tuple is put on the pull queue; the process
          then waits up to 180 seconds for a ``stop`` command before clearing the
          queues and stopping the upload thread.

          All exceptions are caught and reported through the pull queue; nothing
          propagates to the caller.
          """
          msg, context, frame_num, analyse_type = self._msg, self._context, self._frame_num, self._analyse_type
          request_id, base_dir, env, pull_url = msg["request_id"], context['base_dir'], context['env'], msg["pull_url"]
          ex, service_timeout, full_timeout = None, int(context["service"]["timeout"]) + 120, None
          command_queue, pull_queue, image_queue, fb_queue = self._command_queue, self._pull_queue, self._image_queue, \
              self._fb_queue
          image_thread, pull_p = None, None
          width, height, width_height_3, all_frames, w_2, h_2 = None, None, None, 0, None, None
          frame_list, frame_index_list = [], []
          # False means "exit silently": no final result is reported to the pull queue
          ex_status = True
          try:
              # Initialise logging
              init_log(base_dir, env)
              logger.info("开启离线视频拉流进程, requestId:{}", request_id)
              # Start the image-upload thread
              image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)
              # Initialise the pull-stream counters
              cv2_init_num, concurrent_frame = 0, 1
              start_time = time()
              while True:
                  # Check for overall task timeout and that the upload thread is still alive
                  self.check(start_time, service_timeout, request_id, image_thread)
                  command_msg = get_no_block_queue(command_queue)
                  if command_msg is not None:
                      if 'stop' == command_msg.get("command"):
                          logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                          break
                      if 'stop_ex' == command_msg.get("command"):
                          logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                          ex_status = False
                          break
                  # Check whether the video info / pull object still has to be (re)acquired
                  if check_video_stream(width, height):
                      logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                      if cv2_init_num > 3:
                          logger.info("离线拉流重试失败, 重试次数: {}, requestId: {}", cv2_init_num, request_id)
                          raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
                                                 ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
                      cv2_init_num += 1
                      sleep(1)
                      width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                      continue
                  if pull_queue.full():
                      # NOTE(review): this branch loops (and logs) without sleeping while the queue stays full - confirm intended
                      logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                      if full_timeout is None:
                          full_timeout = time()
                      if time() - full_timeout > 180:
                          logger.error("pull队列阻塞超时,请检测父进程是否正常!requestId: {}", request_id)
                          raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                                 ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                      # A parent pid of 1 means the original parent is gone and this process was re-parented
                      if psutil.Process(getpid()).ppid() == 1:
                          clear_pull_p(pull_p, request_id)
                          ex_status = False
                          for q in [command_queue, pull_queue, image_queue]:
                              clear_queue(q)
                          # Same guard as every other upload-thread shutdown in this file
                          if image_thread and image_thread.is_alive():
                              put_queue(image_queue, (2, "stop"), timeout=1)
                              image_thread.join(120)
                          logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
                          put_queue(self._fb_queue, message_feedback(request_id,
                                                                     AnalysisStatus.FAILED.value,
                                                                     self._analyse_type,
                                                                     ExceptionType.NO_RESOURCES.value[0],
                                                                     ExceptionType.NO_RESOURCES.value[1]), timeout=2)
                          break
                      continue
                  full_timeout = None
                  frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height,
                                                                        width_height_3, w_2, h_2, request_id)
                  if frame is None:
                      logger.info("总帧数: {}, 当前帧数: {}, requestId: {}", all_frames, concurrent_frame, request_id)
                      clear_pull_p(pull_p, request_id)
                      # Flush the last, partially filled batch
                      if len(frame_list) > 0:
                          put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                      # Tolerate a shortfall of up to 100 frames against the reported total
                      if concurrent_frame < all_frames - 100:
                          logger.info("离线拉流异常结束:requestId: {}", request_id)
                          raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                                 ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                      logger.info("离线拉流线程结束, requestId: {}", request_id)
                      break
                  frame_list.append(frame)
                  frame_index_list.append(concurrent_frame)
                  if len(frame_list) >= frame_num:
                      put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                      frame_list, frame_index_list = [], []
                  concurrent_frame += 1
                  del frame
          except ServiceException as s:
              logger.error("离线拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
              ex = s.code, s.msg
          except Exception:
              logger.error("离线拉流异常: {}, requestId:{}", format_exc(), request_id)
              ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
          finally:
              clear_pull_p(pull_p, request_id)
              del frame_list, frame_index_list
              if ex_status:
                  # Report the outcome: (1, code, msg) on failure, (2,) on normal completion
                  if ex:
                      code, msg = ex
                      r = put_queue_result(pull_queue, (1, code, msg), timeout=10)
                  else:
                      r = put_queue_result(pull_queue, (2,), timeout=10)
                  if r:
                      # Wait up to 180 seconds for a 'stop' command after reporting the result.
                      # NOTE(review): this polls without sleeping unless get_no_block_queue itself waits - confirm
                      c_time = time()
                      while time() - c_time < 180:
                          command_msg = get_no_block_queue(command_queue)
                          if command_msg is not None:
                              if 'stop' == command_msg.get("command"):
                                  # Message previously said "real-time" (copied from the online process)
                                  logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                                  if image_thread and image_thread.is_alive():
                                      put_queue(image_queue, (2, "stop"), timeout=1)
                                      logger.info("停止图片上传线程, requestId:{}", request_id)
                                      image_thread.join(120)
                                      logger.info("停止图片上传线程结束, requestId:{}", request_id)
                                  break
              for q in [command_queue, pull_queue, image_queue]:
                  clear_queue(q)
              if image_thread and image_thread.is_alive():
                  put_queue(image_queue, (2, "stop"), timeout=1)
                  logger.info("停止图片上传线程, requestId:{}", request_id)
                  image_thread.join(120)
                  logger.info("停止图片上传线程结束, requestId:{}", request_id)
              # image_thread is still None when start-up failed before the thread was created;
              # calling is_alive() on it here would raise from inside finally.
              image_thread_alive = image_thread is not None and image_thread.is_alive()
              logger.info("离线拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                          image_queue.qsize(), pull_queue.qsize(), image_thread_alive, request_id)