# -*- coding: utf-8 -*-
"""Child processes that pull frames from a video source and batch them onto a queue.

Two concrete workers are defined:

* ``OnlinePullVideoStreamProcess2``  - live stream, retries on interruption with time limits.
* ``OfflinePullVideoStreamProcess2`` - recorded video, bounded number of open attempts,
  a missing frame is treated as end of file.

Messages placed on the pull queue by this module:

* ``(4, (frame_list, frame_index_list, all_frames))`` - a batch of frames and their indexes.
* ``(1, code, msg)`` - the worker ended with an error.
* ``(2,)`` - the worker ended normally.

The image-upload thread is asked to stop by putting ``(2, "stop")`` on the image queue.
"""
import os
from multiprocessing import Process, Queue
from os import getpid
from time import time, sleep
from traceback import format_exc

import psutil
from loguru import logger

from util.LogUtils import init_log
from concurrency.FileUploadThread import ImageFileUpload
from entity.FeedBack import message_feedback
from enums.AnalysisStatusEnum import AnalysisStatus
from enums.ExceptionEnum import ExceptionType
from exception.CustomerException import ServiceException
from util.Cv2Utils import check_video_stream, build_video_info, pull_read_video_stream, clear_pull_p
from util.QueUtil import get_no_block_queue, put_queue, clear_queue, put_queue_result


class PullVideoStreamProcess2(Process):
    """Base class of the pull-stream workers: holds the queues and the shared shutdown helpers."""

    __slots__ = ("_command_queue", "_msg", "_context", "_fb_queue", "_pull_queue", "_image_queue", "_analyse_type",
                 "_frame_num")

    def __init__(self, *args):
        """Store the collaborators passed positionally.

        Args:
            *args: exactly seven values, in this order: ``msg``, ``context``, ``fb_queue``,
                ``pull_queue``, ``image_queue``, ``analyse_type``, ``frame_num``.
        """
        super().__init__()
        # Created by the process itself: channel used by sendCommand() to deliver commands to run().
        self._command_queue = Queue()
        # Supplied by the caller.
        self._msg, self._context, self._fb_queue, self._pull_queue, self._image_queue, self._analyse_type, \
            self._frame_num = args

    def sendCommand(self, result):
        """Put a command on the command queue read by ``run()``.

        ``run()`` reacts to dicts whose ``"command"`` key is ``"stop"`` or ``"stop_ex"``.

        Raises:
            ServiceException: SERVICE_INNER_EXCEPTION when the put fails (10 second timeout).
        """
        try:
            self._command_queue.put(result, timeout=10)
        except Exception:
            logger.error("添加队列超时异常:{}, requestId:{}", format_exc(), self._msg.get("request_id"))
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])

    @staticmethod
    def start_File_upload(*args):
        """Create, start and return the image-upload thread as a daemon.

        Args:
            *args: ``fb_queue``, ``context``, ``msg``, ``image_queue``, ``analyse_type`` (in this order).
        """
        fb_queue, context, msg, image_queue, analyse_type = args
        image_thread = ImageFileUpload(fb_queue, context, msg, image_queue, analyse_type)
        # The ``daemon`` attribute replaces Thread.setDaemon(), which is deprecated since Python 3.10.
        # NOTE(review): assumes ImageFileUpload is a threading.Thread subclass - confirm.
        image_thread.daemon = True
        image_thread.start()
        return image_thread

    @staticmethod
    def check(start_time, service_timeout, request_id, image_thread):
        """Fail fast when the task ran too long or the upload thread is no longer alive.

        Raises:
            ServiceException: ANALYSE_TIMEOUT_EXCEPTION when more than ``service_timeout`` seconds
                have elapsed since ``start_time``.
            Exception: when ``image_thread`` is given but is not alive.
        """
        if time() - start_time > service_timeout:
            logger.error("分析超时, requestId: {}", request_id)
            raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0],
                                   ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1])
        # Verify that the image-upload thread is still running.
        if image_thread is not None and not image_thread.is_alive():
            logger.error("未检测到图片上传线程活动,图片上传线程可能出现异常, requestId:{}", request_id)
            raise Exception("未检测到图片上传线程活动,图片上传线程可能出现异常!")

    @staticmethod
    def _stop_image_thread(image_thread, image_queue, request_id, verbose=True):
        """Ask a live upload thread to stop and wait up to 120 seconds for it; no-op otherwise."""
        if image_thread and image_thread.is_alive():
            put_queue(image_queue, (2, "stop"), timeout=1)
            if verbose:
                logger.info("停止图片上传线程, requestId:{}", request_id)
            image_thread.join(120)
            if verbose:
                logger.info("停止图片上传线程结束, requestId:{}", request_id)

    def _report_parent_lost(self, image_thread, request_id):
        """Drain the queues, stop the upload thread and send a FAILED / NO_RESOURCES feedback message."""
        for q in [self._command_queue, self._pull_queue, self._image_queue]:
            clear_queue(q)
        self._stop_image_thread(image_thread, self._image_queue, request_id, verbose=False)
        logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
        put_queue(self._fb_queue, message_feedback(request_id,
                                                   AnalysisStatus.FAILED.value,
                                                   self._analyse_type,
                                                   ExceptionType.NO_RESOURCES.value[0],
                                                   ExceptionType.NO_RESOURCES.value[1]))

    def _finish_run(self, mode, pull_p, ex, ex_status, image_thread, request_id):
        """Common tail of ``run()``: release the puller, publish the outcome, wait for stop, clean up.

        Args:
            mode: label spliced into the log messages ("实时" for online, "离线" for offline).
            pull_p: pull object handed to ``clear_pull_p``.
            ex: ``None`` on success, otherwise a ``(code, msg)`` tuple describing the failure.
            ex_status: when false the outcome is not published and no stop command is awaited.
            image_thread: upload thread, or ``None`` when it was never started.
            request_id: id used in log messages.
        """
        command_queue, pull_queue, image_queue = self._command_queue, self._pull_queue, self._image_queue
        clear_pull_p(pull_p, request_id)
        if ex_status:
            if ex:
                code, msg = ex
                published = put_queue_result(pull_queue, (1, code, msg), timeout=10)
            else:
                published = put_queue_result(pull_queue, (2,), timeout=10)
            if published:
                # Wait up to 180 seconds for the consumer to acknowledge with a "stop" command.
                # NOTE(review): this polls without sleeping; if get_no_block_queue really never
                # blocks, this loop spins a CPU core for up to 3 minutes - consider a short sleep.
                wait_started = time()
                while time() - wait_started < 180:
                    command_msg = get_no_block_queue(command_queue)
                    if command_msg is not None:
                        if 'stop' == command_msg.get("command"):
                            logger.info("开始停止" + mode + "拉流进程, requestId:{}", request_id)
                            self._stop_image_thread(image_thread, image_queue, request_id)
                            break
        for q in [command_queue, pull_queue, image_queue]:
            clear_queue(q)
        self._stop_image_thread(image_thread, image_queue, request_id)
        # image_thread is still None when start-up failed before the thread was created;
        # guard the call so the final log line cannot raise AttributeError out of ``finally``.
        thread_alive = image_thread is not None and image_thread.is_alive()
        logger.info(mode + "拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                    image_queue.qsize(), pull_queue.qsize(), thread_alive, request_id)


class OnlinePullVideoStreamProcess2(PullVideoStreamProcess2):
    """Worker for live streams: keeps retrying on interruption until a configured timeout elapses."""

    __slots__ = ()

    def run(self):
        """Pull frames from ``msg["pull_url"]`` and push them in batches of ``frame_num`` onto the pull queue.

        The loop ends on a ``stop`` / ``stop_ex`` command, on a timeout (overall service timeout,
        stream-open timeout, frame-read timeout, 180 s of a continuously full pull queue), on a dead
        upload thread, or when the parent pid becomes 1. Errors are never raised to the caller;
        they are reported as ``(1, code, msg)`` on the pull queue.
        """
        # Bind everything to locals once so the hot loop avoids repeated attribute lookups.
        context, msg, analyse_type = self._context, self._msg, self._analyse_type
        request_id, base_dir, env = msg["request_id"], context['base_dir'], context['env']
        pull_url, frame_num = msg["pull_url"], self._frame_num
        pull_stream_timeout = int(context["service"]["cv2_pull_stream_timeout"])
        read_stream_timeout = int(context["service"]["cv2_read_stream_timeout"])
        service_timeout = int(context["service"]["timeout"])
        command_queue, pull_queue, image_queue = self._command_queue, self._pull_queue, self._image_queue
        fb_queue = self._fb_queue
        image_thread, pull_p = None, None
        width, height, width_height_3, all_frames, w_2, h_2 = None, None, None, 0, None, None
        frame_list, frame_index_list = [], []
        ex = None
        # Cleared when the outcome must not be published on the pull queue (stop_ex / parent lost).
        ex_status = True
        full_timeout = None
        try:
            # Initialise logging for this process.
            init_log(base_dir, env)
            logger.info("开启实时视频拉流进程, requestId:{}", request_id)

            # Start the image-upload thread.
            image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)
            # Retry counters and frame index.
            cv2_init_num, init_pull_num, concurrent_frame = 0, 1, 1
            start_time, pull_start_time, read_start_time = time(), None, None
            while True:
                # Abort on overall timeout or when the upload thread has died.
                self.check(start_time, service_timeout, request_id, image_thread)
                command_msg = get_no_block_queue(command_queue)
                if command_msg is not None:
                    if 'stop' == command_msg.get("command"):
                        logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                        break
                    if 'stop_ex' == command_msg.get("command"):
                        logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                        ex_status = False
                        break
                # (Re)acquire the video information when it is missing.
                if check_video_stream(width, height):
                    # Flush what was collected so far before retrying.
                    if frame_list:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                        frame_list, frame_index_list = [], []
                    logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                    if pull_start_time is None:
                        pull_start_time = time()
                    pull_stream_init_timeout = time() - pull_start_time
                    if pull_stream_init_timeout > pull_stream_timeout:
                        logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout, request_id)
                        raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1])
                    cv2_init_num += 1
                    width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                    if width is None:
                        sleep(1)
                        continue
                    # Information obtained: reset the open-retry bookkeeping.
                    pull_start_time, cv2_init_num = None, 1
                frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height,
                                                                      width_height_3, w_2, h_2, request_id)
                if frame is None:
                    if frame_list:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                        frame_list, frame_index_list = [], []
                    logger.info("获取帧为空, 开始重试: {}次, requestId: {}", init_pull_num, request_id)
                    if read_start_time is None:
                        read_start_time = time()
                    pull_stream_read_timeout = time() - read_start_time
                    if pull_stream_read_timeout > read_stream_timeout:
                        logger.info("拉流过程中断了重试超时, 超时时间: {}, requestId: {}", pull_stream_read_timeout,
                                    request_id)
                        raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                    init_pull_num += 1
                    continue
                # A frame was read: reset the read-retry bookkeeping.
                init_pull_num, read_start_time = 1, None
                if pull_queue.full():
                    logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                    if full_timeout is None:
                        full_timeout = time()
                    if time() - full_timeout > 180:
                        logger.error("拉流队列阻塞异常, requestId: {}", request_id)
                        raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                               ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                    # Parent pid 1 is treated as "the parent process has stopped abnormally".
                    if psutil.Process(getpid()).ppid() == 1:
                        clear_pull_p(pull_p, request_id)
                        ex_status = False
                        self._report_parent_lost(image_thread, request_id)
                        break
                    # The queue is full: the frame just read is dropped.
                    del frame
                    continue
                full_timeout = None
                frame_list.append(frame)
                frame_index_list.append(concurrent_frame)
                if len(frame_list) >= frame_num:
                    put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                    frame_list, frame_index_list = [], []
                concurrent_frame += 1
                del frame
        except ServiceException as s:
            logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
            ex = s.code, s.msg
        except Exception:
            logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", format_exc(), pull_queue.qsize(), request_id)
            ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
        finally:
            # Drop the frame references before the (possibly long) wait for the stop command.
            del frame_list, frame_index_list
            self._finish_run("实时", pull_p, ex, ex_status, image_thread, request_id)


class OfflinePullVideoStreamProcess2(PullVideoStreamProcess2):
    """Worker for recorded videos: a missing frame means end of file rather than an interruption."""

    __slots__ = ()

    def run(self):
        """Read the video at ``msg["pull_url"]`` to its end, pushing batches of ``frame_num`` frames.

        The loop ends when no more frames can be read, on a ``stop`` / ``stop_ex`` command, on a
        timeout (overall service timeout, 300 s of a continuously full pull queue), after too many
        failed attempts to obtain the video information, on a dead upload thread, or when the parent
        pid becomes 1. Errors are never raised to the caller; they are reported as ``(1, code, msg)``
        on the pull queue.
        """
        msg, context, frame_num, analyse_type = self._msg, self._context, self._frame_num, self._analyse_type
        request_id, base_dir, env, pull_url = msg["request_id"], context['base_dir'], context['env'], msg["pull_url"]
        ex, service_timeout = None, int(context["service"]["timeout"])
        command_queue, pull_queue, image_queue, fb_queue = self._command_queue, self._pull_queue, self._image_queue, \
            self._fb_queue
        image_thread, pull_p = None, None
        width, height, width_height_3, all_frames, w_2, h_2 = None, None, None, 0, None, None
        frame_list, frame_index_list = [], []
        # Cleared when the outcome must not be published on the pull queue (stop_ex / parent lost).
        ex_status = True
        full_timeout = None
        try:
            # Initialise logging for this process.
            init_log(base_dir, env)
            logger.info("开启离线视频拉流进程, requestId:{}", request_id)

            # Start the image-upload thread.
            image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)

            # Retry counter and frame index.
            cv2_init_num = 0
            concurrent_frame = 1
            start_time = time()
            while True:
                # Abort on overall timeout or when the upload thread has died.
                self.check(start_time, service_timeout, request_id, image_thread)
                command_msg = get_no_block_queue(command_queue)
                if command_msg is not None:
                    if 'stop' == command_msg.get("command"):
                        logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                        break
                    if 'stop_ex' == command_msg.get("command"):
                        logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                        ex_status = False
                        break
                # (Re)acquire the video information when it is missing.
                if check_video_stream(width, height):
                    if frame_list:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                        frame_list, frame_index_list = [], []
                    logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                    if cv2_init_num > 3:
                        clear_pull_p(pull_p, request_id)
                        logger.info("离线拉流重试失败, 重试次数: {}, requestId: {}", cv2_init_num, request_id)
                        raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
                                               ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
                    cv2_init_num += 1
                    sleep(1)
                    width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                    continue
                # Unlike the online worker, the queue is checked BEFORE reading so no frame is dropped.
                # NOTE(review): this path loops without sleeping and logs on every pass while the
                # queue stays full - consider a short sleep to avoid flooding the log.
                if pull_queue.full():
                    logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                    if full_timeout is None:
                        full_timeout = time()
                    if time() - full_timeout > 300:
                        logger.error("拉流队列阻塞超时, 请检查父进程是否正常!requestId: {}", request_id)
                        raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                               ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                    # Parent pid 1 is treated as "the parent process has stopped abnormally".
                    if psutil.Process(getpid()).ppid() == 1:
                        clear_pull_p(pull_p, request_id)
                        ex_status = False
                        self._report_parent_lost(image_thread, request_id)
                        break
                    continue
                full_timeout = None
                frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height,
                                                                      width_height_3, w_2, h_2, request_id)
                if frame is None:
                    logger.info("总帧数: {}, 当前帧数: {}, requestId: {}", all_frames, concurrent_frame, request_id)
                    clear_pull_p(pull_p, request_id)
                    if frame_list:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=2, is_ex=False)
                        frame_list, frame_index_list = [], []
                    # Tolerate a shortfall of up to 100 frames against the reported total.
                    if concurrent_frame < all_frames - 100:
                        logger.info("离线拉流异常结束:requestId: {}", request_id)
                        raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                    logger.info("离线拉流线程结束, requestId: {}", request_id)
                    break
                frame_list.append(frame)
                frame_index_list.append(concurrent_frame)
                if len(frame_list) >= frame_num:
                    put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                    frame_list, frame_index_list = [], []
                concurrent_frame += 1
                del frame
        except ServiceException as s:
            # Message corrected from the copy-pasted "实时" (real-time) to "离线" (offline).
            logger.error("离线拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
            ex = s.code, s.msg
        except Exception:
            logger.error("离线拉流异常: {}, requestId:{}", format_exc(), request_id)
            ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
        finally:
            # Drop the frame references before the (possibly long) wait for the stop command.
            del frame_list, frame_index_list
            self._finish_run("离线", pull_p, ex, ex_status, image_thread, request_id)