# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
from os import getpid
from os.path import join
from time import time, sleep
from traceback import format_exc

import cv2
import numpy as np
import psutil
from loguru import logger

from enums.ExceptionEnum import ExceptionType
from exception.CustomerException import ServiceException
from util import ImageUtils
from util.Cv2Utils import video_conjuncing, write_or_video, write_ai_video, push_video_stream, close_all_p
from util.ImageUtils import url2Array, add_water_pic
from util.LogUtils import init_log
from util.PlotsUtils import draw_painting_joint, filterBox, xywh2xyxy2
from util.QueUtil import get_no_block_queue, put_queue, clear_queue


def _track_id_of(qs):
    """Return the track id carried by detection row *qs*, or None if it has none.

    Rows of length 8 carry the id at index 7 and rows of length 5 carry it at
    index 4. Any other length is treated as "no track id" instead of raising
    or silently reusing the id of a previously processed row.
    """
    row_len = len(qs)
    if row_len == 8:
        return qs[7]
    if row_len == 5:
        return qs[4]
    return None


def _collect_detections(executor, copy_frame, frame_pos, push_objs, draw_config, frame_score, last_track_id):
    """Submit draw jobs for one frame and group its detections by model code and class.

    :param executor: thread pool used to run ``draw_painting_joint``
    :param copy_frame: frame copy handed to ``draw_painting_joint``
    :param frame_pos: position of the frame inside the current batch
    :param push_objs: one ``(code, per_frame_detections)`` entry per model
    :param draw_config: drawing configuration, read via ``"font_config"`` and the model code
    :param frame_score: detections scoring below this value are skipped
    :param last_track_id: highest track id seen before this frame
    :return: ``(det_xywh, det_tmp, draw_jobs, last_track_id)`` where
        ``det_xywh[code][cls]`` is a list of
        ``[cls, box, score, label_array, color, is_new]`` rows,
        ``det_tmp[code]`` lists the classes having at least one new track,
        ``draw_jobs`` are the submitted futures, and ``last_track_id`` is the
        updated high-water mark.

    NOTE(review): the high-water mark is advanced once per model per frame, so
    every detection of that model/frame is compared against the same baseline.
    The original layout had lost its indentation - confirm this is intended.
    """
    det_xywh, det_tmp, draw_jobs = {}, {}, []
    for model_entry in push_objs:
        code, det_result = model_entry[0], model_entry[1][frame_pos]
        if len(det_result) == 0:
            continue
        font_config, allowed_list = draw_config["font_config"], draw_config[code]["allowedList"]
        rainbows, label_arrays = draw_config[code]["rainbows"], draw_config[code]["label_arrays"]
        newest_id = last_track_id
        for qs in det_result:
            box, score, cls = xywh2xyxy2(qs)
            if cls not in allowed_list or score < frame_score:
                continue
            label_array, color = label_arrays[cls], rainbows[cls]
            draw_jobs.append(
                executor.submit(draw_painting_joint, box, copy_frame, label_array, score, color, font_config))
            track_id = _track_id_of(qs)
            is_new = False
            if track_id is not None:
                # A track is "new" when its id exceeds every id seen before this batch.
                if track_id > last_track_id:
                    is_new = True
                    new_classes = det_tmp.setdefault(code, [])
                    if cls not in new_classes:
                        new_classes.append(cls)
                if track_id > newest_id:
                    newest_id = track_id
            det_xywh.setdefault(code, {}).setdefault(cls, []).append(
                [cls, box, score, label_array, color, is_new])
        last_track_id = newest_id
    return det_xywh, det_tmp, draw_jobs, last_track_id


def _select_new_detections(det_xywh, det_tmp):
    """Return the subset of *det_xywh* whose (code, class) pairs contain a new track.

    Every matching class of a model code is kept. The nested dict is created
    once per code, so a later class no longer wipes out an earlier one.
    """
    det_xywh2 = {}
    for code, rows_by_cls in det_xywh.items():
        new_classes = det_tmp.get(code)
        if not new_classes:
            continue
        for cls, rows in rows_by_cls.items():
            if cls in new_classes:
                det_xywh2.setdefault(code, {})[cls] = rows
    return det_xywh2


def _report_and_clear(ex, ex_status, push_queue, image_queue, push_ex_queue, hb_queue, request_id):
    """Report a failure or drain pending images, then clear the queues.

    :param ex: ``(code, msg)`` of the failure, or None when the loop ended normally
    :param ex_status: when True (and no failure), wait up to 60 seconds for
        ``image_queue`` to become empty before clearing it
    """
    if ex:
        code, msg = ex
        put_queue(push_ex_queue, (1, code, msg), timeout=2)
    elif ex_status:
        # Give the image consumer up to one minute; anything still queued
        # after that is discarded below.
        c_time = time()
        while time() - c_time < 60:
            if image_queue.qsize() == 0 or image_queue.empty():
                break
            sleep(2)
    # NOTE(review): queues are cleared on every exit path (failure included);
    # the original layout had lost its indentation - confirm.
    for q in [push_queue, image_queue, hb_queue]:
        clear_queue(q)
    logger.info("推流进程停止完成!requestId:{}", request_id)


class PushStreamProcess2(Process):
    """Base process holding the message, the queues and the shared context."""
    __slots__ = ("_msg", "_push_queue", "_image_queue", '_push_ex_queue', '_hb_queue', "_context")

    def __init__(self, *args):
        """Store ``(msg, push_queue, image_queue, push_ex_queue, hb_queue, context)``."""
        super().__init__()
        # Positional arguments, in this exact order.
        self._msg, self._push_queue, self._image_queue, self._push_ex_queue, self._hb_queue, self._context = args

    def build_logo_url(self):
        """Resolve the watermark image and store it under ``context["logo"]``.

        When ``context["video"]["video_add_water"]`` is truthy, the image at
        ``msg["logo_url"]`` is loaded if a URL is given; if that yields None,
        ``image/logo.png`` under ``base_dir`` is read instead. Otherwise the
        stored value is None.
        """
        logo = None
        if self._context["video"]["video_add_water"]:
            logo = self._msg.get("logo_url")
            if logo:
                logo = url2Array(logo, enable_ex=False)
            if logo is None:
                logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1)
        self._context["logo"] = logo


class OnPushStreamProcess2(PushStreamProcess2):
    """Push process that also writes the original video next to the AI video."""
    __slots__ = ()

    def run(self):
        """Consume ``push_queue`` until stopped, timed out or orphaned.

        Frame messages ``(1, payload)`` are annotated, optionally watermarked,
        merged, written to the original/AI video files and pushed to
        ``msg["push_url"]``. Detections containing new tracks are forwarded to
        ``image_queue``. Command messages ``(2, 'stop' | 'stop_ex')`` end the
        loop. Failures are reported on ``push_ex_queue`` as ``(1, code, msg)``.
        """
        msg, context = self._msg, self._context
        self.build_logo_url()
        request_id = msg["request_id"]
        base_dir, env = context["base_dir"], context['env']
        push_queue, image_queue, push_ex_queue, hb_queue = self._push_queue, self._image_queue, \
            self._push_ex_queue, self._hb_queue
        orFilePath, aiFilePath, logo = context["orFilePath"], context["aiFilePath"], context["logo"]
        or_video_file, ai_video_file, push_p, push_url = None, None, None, msg["push_url"]
        service_timeout = int(context["service"]["timeout"]) + 120
        frame_score = context["service"]["filter"]["frame_score"]
        ex = None
        ex_status = True
        try:
            init_log(base_dir, env)
            logger.info("开始启动推流进程!requestId:{}", request_id)
            with ThreadPoolExecutor(max_workers=3) as t:
                # Status holders for pushing, writing the original video and
                # writing the AI video (per the original comment: first item
                # is a time, second a retry count).
                p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0]
                start_time = time()
                last_track_id = 0
                while True:
                    # Abort when the whole push run exceeds the service timeout.
                    if time() - start_time > service_timeout:
                        logger.error("推流超时, requestId: {}", request_id)
                        raise ServiceException(ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[1])
                    # A parent pid of 1 means the parent process is gone: stop ourselves.
                    if psutil.Process(getpid()).ppid() == 1:
                        ex_status = False
                        logger.info("推流进程检测到父进程异常停止, 自动停止推流进程, requestId: {}", request_id)
                        for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
                            clear_queue(q)
                        break
                    push_r = get_no_block_queue(push_queue)
                    if push_r is None:
                        sleep(1)
                        continue
                    # (1, payload) -> frames to process; (2, command) -> control message.
                    if push_r[0] == 1:
                        # With several models, push_objs holds one entry per model.
                        frame_list, frame_index_list, all_frames, draw_config, push_objs = push_r[1]
                        for i, frame in enumerate(frame_list):
                            # Draw on a copy so the original frame stays untouched.
                            copy_frame = frame.copy()
                            det_xywh, det_tmp, draw_jobs, last_track_id = _collect_detections(
                                t, copy_frame, i, push_objs, draw_config, frame_score, last_track_id)
                            # Finish drawing before the copy is watermarked/merged,
                            # so no draw job races with the watermark step.
                            for job in draw_jobs:
                                job.result()
                            # `logo` is an image array or None; truth-testing an
                            # array raises ValueError, so compare against None.
                            if logo is not None:
                                frame = add_water_pic(frame, logo, request_id)
                                copy_frame = add_water_pic(copy_frame, logo, request_id)
                            frame_merge = video_conjuncing(frame, copy_frame)
                            # Write the original video locally.
                            write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file,
                                                             or_write_status, request_id)
                            # Write the annotated (AI) video locally.
                            write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath,
                                                             ai_video_file, ai_write_status, request_id)
                            push_p_result = t.submit(push_video_stream, frame_merge, push_p, push_url,
                                                     p_push_status, request_id)
                            det_xywh2 = _select_new_detections(det_xywh, det_tmp)
                            if len(det_xywh2) > 0:
                                put_queue(image_queue, (1, [det_xywh2, frame, frame_index_list[i], all_frames,
                                                            draw_config["font_config"]]))
                            # The returned handles are fed into the next frame's jobs.
                            push_p = push_p_result.result(timeout=60)
                            ai_video_file = write_ai_video_result.result(timeout=60)
                            or_video_file = write_or_video_result.result(timeout=60)
                    # Stop commands.
                    if push_r[0] == 2:
                        if 'stop' == push_r[1]:
                            logger.info("停止推流线程, requestId: {}", request_id)
                            break
                        if 'stop_ex' == push_r[1]:
                            logger.info("停止推流线程, requestId: {}", request_id)
                            ex_status = False
                            break
                    del push_r
        except ServiceException as s:
            logger.error("推流进程异常:{}, requestId:{}", s.msg, request_id)
            ex = s.code, s.msg
        except Exception:
            logger.error("推流进程异常:{}, requestId:{}", format_exc(), request_id)
            ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
        finally:
            # Close the push pipe, the original-video writer and the AI-video writer.
            close_all_p(push_p, or_video_file, ai_video_file, request_id)
            _report_and_clear(ex, ex_status, push_queue, image_queue, push_ex_queue, hb_queue, request_id)


class OffPushStreamProcess2(PushStreamProcess2):
    """Offline push process: writes only the AI video and reports progress."""
    __slots__ = ()

    def run(self):
        """Consume ``push_queue`` until stopped, timed out or orphaned.

        Same pipeline as the online variant, except that no original video is
        written and, for every frame whose index is a multiple of 300 (and not
        beyond ``all_frames``), the progress ratio is put on ``hb_queue`` as
        ``{"hb_value": "<ratio with 2 decimals>"}``.
        """
        self.build_logo_url()
        msg, context = self._msg, self._context
        request_id = msg["request_id"]
        base_dir, env = context["base_dir"], context['env']
        push_queue, image_queue, push_ex_queue, hb_queue = self._push_queue, self._image_queue, \
            self._push_ex_queue, self._hb_queue
        aiFilePath, logo = context["aiFilePath"], context["logo"]
        ai_video_file, push_p, push_url = None, None, msg["push_url"]
        service_timeout = int(context["service"]["timeout"]) + 120
        frame_score = context["service"]["filter"]["frame_score"]
        ex = None
        ex_status = True
        try:
            init_log(base_dir, env)
            logger.info("开始启动离线推流进程!requestId:{}", request_id)
            with ThreadPoolExecutor(max_workers=2) as t:
                # Status holders for pushing and writing the AI video (per the
                # original comment: first item is a time, second a retry count).
                p_push_status, ai_write_status = [0, 0], [0, 0]
                start_time = time()
                last_track_id = 0
                while True:
                    # Abort when the whole push run exceeds the service timeout.
                    if time() - start_time > service_timeout:
                        logger.error("离线推流超时, requestId: {}", request_id)
                        raise ServiceException(ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.PUSH_STREAM_TIMEOUT_EXCEPTION.value[1])
                    # A parent pid of 1 means the parent process is gone: stop ourselves.
                    if psutil.Process(getpid()).ppid() == 1:
                        ex_status = False
                        logger.info("离线推流进程检测到父进程异常停止, 自动停止推流进程, requestId: {}", request_id)
                        for q in [push_queue, image_queue, push_ex_queue, hb_queue]:
                            clear_queue(q)
                        break
                    push_r = get_no_block_queue(push_queue)
                    if push_r is None:
                        sleep(1)
                        continue
                    # (1, payload) -> frames to process; (2, command) -> control message.
                    if push_r[0] == 1:
                        frame_list, frame_index_list, all_frames, draw_config, push_objs = push_r[1]
                        for i, frame in enumerate(frame_list):
                            # Periodic progress heartbeat.
                            if frame_index_list[i] % 300 == 0 and frame_index_list[i] <= all_frames:
                                task_process = "%.2f" % (float(frame_index_list[i]) / float(all_frames))
                                put_queue(hb_queue, {"hb_value": task_process}, timeout=2)
                            # Draw on a copy so the original frame stays untouched.
                            copy_frame = frame.copy()
                            det_xywh, det_tmp, draw_jobs, last_track_id = _collect_detections(
                                t, copy_frame, i, push_objs, draw_config, frame_score, last_track_id)
                            # Finish drawing before the copy is watermarked/merged,
                            # so no draw job races with the watermark step.
                            for job in draw_jobs:
                                job.result()
                            # `logo` is an image array or None; truth-testing an
                            # array raises ValueError, so compare against None.
                            if logo is not None:
                                frame = add_water_pic(frame, logo, request_id)
                                copy_frame = add_water_pic(copy_frame, logo, request_id)
                            frame_merge = video_conjuncing(frame, copy_frame)
                            # Write the annotated (AI) video locally.
                            write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath,
                                                             ai_video_file, ai_write_status, request_id)
                            push_p_result = t.submit(push_video_stream, frame_merge, push_p, push_url,
                                                     p_push_status, request_id)
                            det_xywh2 = _select_new_detections(det_xywh, det_tmp)
                            if len(det_xywh2) > 0:
                                put_queue(image_queue, (1, [det_xywh2, frame, frame_index_list[i], all_frames,
                                                            draw_config["font_config"]]))
                            # The returned handles are fed into the next frame's jobs.
                            push_p = push_p_result.result(timeout=60)
                            ai_video_file = write_ai_video_result.result(timeout=60)
                    # Stop commands.
                    if push_r[0] == 2:
                        if 'stop' == push_r[1]:
                            logger.info("停止推流线程, requestId: {}", request_id)
                            break
                        if 'stop_ex' == push_r[1]:
                            logger.info("停止推流线程, requestId: {}", request_id)
                            ex_status = False
                            break
                    del push_r
        except ServiceException as s:
            logger.error("推流进程异常:{}, requestId:{}", s.msg, request_id)
            ex = s.code, s.msg
        except Exception:
            logger.error("推流进程异常:{}, requestId:{}", format_exc(), request_id)
            ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
        finally:
            # Close the push pipe and the AI-video writer.
            close_all_p(push_p, None, ai_video_file, request_id)
            _report_and_clear(ex, ex_status, push_queue, image_queue, push_ex_queue, hb_queue, request_id)