# -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED from os.path import join from threading import Thread from traceback import format_exc import cv2 import numpy as np from loguru import logger from util.Cv2Utils import write_or_video, write_ai_video, push_video_stream, close_all_p, video_conjuncing from util.ImageUtils import url2Array, add_water_pic from util.PlotsUtils import draw_painting_joint from util.QueUtil import put_queue class OnPushStreamThread2(Thread): __slots__ = ('_msg', '_push_queue', '_context', 'ex', '_logo', '_image_queue') def __init__(self, *args): super().__init__() # 传参 self._msg, self._push_queue, self._image_queue, self._context = args # 自带参数 self.ex = None self._logo = None if self._context["video"]["video_add_water"]: self._logo = self._msg.get("logo_url") if self._logo: self._logo = url2Array(self._logo, enable_ex=False) if not self._logo: self._logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1) def run(self): request_id, push_queue, image_queue = self._msg.get("request_id"), self._push_queue, self._image_queue orFilePath, aiFilePath, logo = self._context.get("orFilePath"), self._context.get("aiFilePath"), self._logo or_video_file, ai_video_file, push_p = None, None, None push_url = self._msg.get("push_url") try: logger.info("开始启动推流线程!requestId:{}", request_id) with ThreadPoolExecutor(max_workers=2) as t: with ThreadPoolExecutor(max_workers=5) as tt: p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0] while True: push_r = push_queue.get() if push_r is not None: # [(1, 原视频帧, 分析视频帧)] # [(code, retResults[2])] # [(2, 操作指令)] if push_r[0] == 1: # 视频帧操作 frame_list, frame_index_list, all_frames = push_r[1] allowedList, rainbows, label_arrays, font_config = push_r[2] for i, frame in enumerate(frame_list): copy_frame = frame.copy() det_xywh = {} # 每帧可能存在多模型,多模型问题处理 thread_p = [] for det in push_r[3]: code, retResults = det det_xywh[code] = {} # 如果识别到了检测目标 if len(retResults[i]) > 0: for qs in retResults[i]: detect_targets_code = int(qs[6]) if detect_targets_code not in allowedList: logger.warning("当前检测目标不在检测目标中: {}, requestId: {}", detect_targets_code, request_id) continue score = qs[5] label_array = label_arrays[detect_targets_code] color = rainbows[detect_targets_code] if not isinstance(qs[1], (list, tuple, np.ndarray)): xc, yc, x2, y2 = int(qs[1]), int(qs[2]), int(qs[3]), int(qs[4]) box = [(xc, yc), (x2, yc), (x2, y2), (xc, y2)] else: box = qs[1] # box, img, label_array, score=0.5, color=None, config=None dp = tt.submit(draw_painting_joint, box, copy_frame, label_array, score, color, font_config) thread_p.append(dp) cd = det_xywh[code].get(detect_targets_code) if cd is None: det_xywh[code][detect_targets_code] = [ [detect_targets_code, box, score, label_array, color]] else: det_xywh[code][detect_targets_code].append( [detect_targets_code, box, score, label_array, color]) if logo: frame = add_water_pic(frame, logo, request_id) copy_frame = add_water_pic(copy_frame, logo, request_id) if len(thread_p) > 0: completed_results = wait(thread_p, timeout=60, return_when=ALL_COMPLETED) completed_futures = completed_results.done for r in completed_futures: if r.exception(): raise r.exception() frame_merge = video_conjuncing(frame, copy_frame) # 写原视频到本地 write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file, or_write_status, request_id) # 写识别视频到本地 write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath, ai_video_file, ai_write_status, request_id) if len(det_xywh) > 0: put_queue(image_queue, (1, (det_xywh, frame, frame_index_list[i], all_frames, font_config))) push_p = push_video_stream(frame_merge, push_p, push_url, p_push_status, request_id) ai_video_file = write_ai_video_result.result() or_video_file = write_or_video_result.result() if push_r[0] == 2: if 'stop' == push_r[1]: logger.info("停止推流线程, requestId: {}", request_id) close_all_p(push_p, or_video_file, ai_video_file, request_id) or_video_file, ai_video_file, push_p = None, None, None break except Exception as e: logger.error("推流线程异常:{}, requestId:{}", format_exc(), request_id) self.ex = e finally: close_all_p(push_p, or_video_file, ai_video_file, request_id) logger.info("推流线程停止完成!requestId:{}", request_id) # class OffPushStreamThread(Thread): # __slots__ = ('_msg', '_push_queue', '_context', 'ex', '_logo', '_image_queue') # # def __init__(self, *args): # super().__init__() # # 传参 # self._msg, self._push_queue, self._image_queue, self._context = args # # 自带参数 # self.ex = None # self._logo = None # if self._context["video"]["video_add_water"]: # self._logo = self._msg.get("logo_url") # if self._logo: # self._logo = url2Array(self._logo, enable_ex=False) # if not self._logo: # self._logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1) # # def run(self): # request_id, push_queue, image_queue = self._msg.get("request_id"), self._push_queue, self._image_queue # aiFilePath, logo = self._context.get("aiFilePath"), self._logo # ai_video_file, push_p = None, None # push_url = self._msg.get("push_url") # try: # logger.info("开始启动推流线程!requestId:{}", request_id) # with ThreadPoolExecutor(max_workers=1) as t: # p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0] # while True: # push_parm = push_queue.get() # if push_parm is not None: # # [(1, 原视频帧, 分析视频帧)] # # # [视频帧、当前帧数、 总帧数、 [(问题数组、code、allowedList、label_arraylist、rainbows)]] # # res = (1, (pull_frame[1], pull_frame[2], pull_frame[3], [])) # # [(2, 操作指令)] # if push_parm[0] == 1: # 视频帧操作 # frame, current_frame, all_frames, ques_list = push_parm[1] # copy_frame = frame.copy() # det_xywh = {} # if len(ques_list) > 0: # for qs in ques_list: # det_xywh[qs[1]] = {} # detect_targets_code = int(qs[0][0]) # score = qs[0][-1] # label_array = qs[3][detect_targets_code] # color = qs[4][detect_targets_code] # if not isinstance(qs[0][1], (list, tuple, np.ndarray)): # xc, yc, x2, y2 = int(qs[0][1]), int(qs[0][2]), int(qs[0][3]), int(qs[0][4]) # box = [(xc, yc), (x2, yc), (x2, y2), (xc, y2)] # else: # box = qs[0][1] # draw_painting_joint(box, copy_frame, label_array, score, color, "leftTop") # cd = det_xywh[qs[1]].get(detect_targets_code) # if cd is None: # det_xywh[qs[1]][detect_targets_code] = [ # [detect_targets_code, box, score, label_array, color]] # else: # det_xywh[qs[1]][detect_targets_code].append( # [detect_targets_code, box, score, label_array, color]) # if logo: # frame = add_water_pic(frame, logo, request_id) # copy_frame = add_water_pic(copy_frame, logo, request_id) # frame_merge = video_conjuncing(frame, copy_frame) # # 写识别视频到本地 # write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath, ai_video_file, # ai_write_status, request_id) # if len(det_xywh) > 0: # put_queue(image_queue, (1, (det_xywh, frame, current_frame, all_frames))) # push_p = push_video_stream(frame_merge, push_p, push_url, p_push_status, request_id) # ai_video_file = write_ai_video_result.result() # if push_parm[0] == 2: # if 'stop' == push_parm[1]: # logger.info("停止推流线程, requestId: {}", request_id) # close_all_p(push_p, None, ai_video_file, request_id) # ai_video_file, push_p = None, None # break # except Exception as e: # logger.error("推流线程异常:{}, requestId:{}", format_exc(), request_id) # self.ex = e # finally: # close_all_p(push_p, None, ai_video_file, request_id) # logger.info("推流线程停止完成!requestId:{}", request_id)