# -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor from os.path import join from threading import Thread from traceback import format_exc import cv2 import numpy as np from loguru import logger from util.Cv2Utils import write_or_video, write_ai_video, push_video_stream, close_all_p, video_conjuncing from util.ImageUtils import url2Array, add_water_pic from util.PlotsUtils import draw_painting_joint from util.QueUtil import put_queue class OnPushStreamThread(Thread): __slots__ = ('_msg', '_push_queue', '_context', 'ex', '_logo', '_image_queue') def __init__(self, *args): super().__init__() # 传参 self._msg, self._push_queue, self._image_queue, self._context = args # 自带参数 self.ex = None self._logo = None if self._context["video"]["video_add_water"]: self._logo = self._msg.get("logo_url") if self._logo: self._logo = url2Array(self._logo, enable_ex=False) if not self._logo: self._logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1) def run(self): request_id, push_queue, image_queue = self._msg.get("request_id"), self._push_queue, self._image_queue orFilePath, aiFilePath, logo = self._context.get("orFilePath"), self._context.get("aiFilePath"), self._logo or_video_file, ai_video_file, push_p = None, None, None push_url = self._msg.get("push_url") try: logger.info("开始启动推流线程!requestId:{}", request_id) with ThreadPoolExecutor(max_workers=2) as t: p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0] while True: push_parm = push_queue.get() if push_parm is not None: # [(1, 原视频帧, 分析视频帧)] # # [视频帧、当前帧数、 总帧数、 [(问题数组、code、allowedList、label_arraylist、rainbows)]] # res = (1, (pull_frame[1], pull_frame[2], pull_frame[3], [])) # [(2, 操作指令)] if push_parm[0] == 1: # 视频帧操作 frame, current_frame, all_frames, ques_list = push_parm[1] copy_frame = frame.copy() det_xywh = {} if len(ques_list) > 0: for qs in ques_list: det_xywh[qs[1]] = {} detect_targets_code = int(qs[0][0]) score = qs[0][-1] label_array = qs[3][detect_targets_code] color = qs[4][detect_targets_code] if not isinstance(qs[0][1], (list, tuple, np.ndarray)): xc, yc, x2, y2 = int(qs[0][1]), int(qs[0][2]), int(qs[0][3]), int(qs[0][4]) box = [(xc, yc), (x2, yc), (x2, y2), (xc, y2)] else: box = qs[0][1] draw_painting_joint(box, copy_frame, label_array, score, color, "leftTop") cd = det_xywh[qs[1]].get(detect_targets_code) if cd is None: det_xywh[qs[1]][detect_targets_code] = [ [detect_targets_code, box, score, label_array, color]] else: det_xywh[qs[1]][detect_targets_code].append( [detect_targets_code, box, score, label_array, color]) if logo: frame = add_water_pic(frame, logo, request_id) copy_frame = add_water_pic(copy_frame, logo, request_id) frame_merge = video_conjuncing(frame, copy_frame) # 写原视频到本地 write_or_video_result = t.submit(write_or_video, frame, orFilePath, or_video_file, or_write_status, request_id) # 写识别视频到本地 write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath, ai_video_file, ai_write_status, request_id) if len(det_xywh) > 0: put_queue(image_queue, (1, (det_xywh, frame, current_frame, all_frames))) push_p = push_video_stream(frame_merge, push_p, push_url, p_push_status, request_id) ai_video_file = write_ai_video_result.result() or_video_file = write_or_video_result.result() if push_parm[0] == 2: if 'stop' == push_parm[1]: logger.info("停止推流线程, requestId: {}", request_id) close_all_p(push_p, or_video_file, ai_video_file, request_id) or_video_file, ai_video_file, push_p = None, None, None break except Exception as e: logger.error("推流线程异常:{}, requestId:{}", format_exc(), request_id) self.ex = e finally: close_all_p(push_p, or_video_file, ai_video_file, request_id) logger.info("推流线程停止完成!requestId:{}", request_id) class OffPushStreamThread(Thread): __slots__ = ('_msg', '_push_queue', '_context', 'ex', '_logo', '_image_queue') def __init__(self, *args): super().__init__() # 传参 self._msg, self._push_queue, self._image_queue, self._context = args # 自带参数 self.ex = None self._logo = None if self._context["video"]["video_add_water"]: self._logo = self._msg.get("logo_url") if self._logo: self._logo = url2Array(self._logo, enable_ex=False) if not self._logo: self._logo = cv2.imread(join(self._context['base_dir'], "image/logo.png"), -1) def run(self): request_id, push_queue, image_queue = self._msg.get("request_id"), self._push_queue, self._image_queue aiFilePath, logo = self._context.get("aiFilePath"), self._logo ai_video_file, push_p = None, None push_url = self._msg.get("push_url") try: logger.info("开始启动推流线程!requestId:{}", request_id) with ThreadPoolExecutor(max_workers=1) as t: p_push_status, or_write_status, ai_write_status = [0, 0], [0, 0], [0, 0] while True: push_parm = push_queue.get() if push_parm is not None: # [(1, 原视频帧, 分析视频帧)] # # [视频帧、当前帧数、 总帧数、 [(问题数组、code、allowedList、label_arraylist、rainbows)]] # res = (1, (pull_frame[1], pull_frame[2], pull_frame[3], [])) # [(2, 操作指令)] if push_parm[0] == 1: # 视频帧操作 frame, current_frame, all_frames, ques_list = push_parm[1] copy_frame = frame.copy() det_xywh = {} if len(ques_list) > 0: for qs in ques_list: det_xywh[qs[1]] = {} detect_targets_code = int(qs[0][0]) score = qs[0][-1] label_array = qs[3][detect_targets_code] color = qs[4][detect_targets_code] if not isinstance(qs[0][1], (list, tuple, np.ndarray)): xc, yc, x2, y2 = int(qs[0][1]), int(qs[0][2]), int(qs[0][3]), int(qs[0][4]) box = [(xc, yc), (x2, yc), (x2, y2), (xc, y2)] else: box = qs[0][1] draw_painting_joint(box, copy_frame, label_array, score, color, "leftTop") cd = det_xywh[qs[1]].get(detect_targets_code) if cd is None: det_xywh[qs[1]][detect_targets_code] = [ [detect_targets_code, box, score, label_array, color]] else: det_xywh[qs[1]][detect_targets_code].append( [detect_targets_code, box, score, label_array, color]) if logo: frame = add_water_pic(frame, logo, request_id) copy_frame = add_water_pic(copy_frame, logo, request_id) frame_merge = video_conjuncing(frame, copy_frame) # 写识别视频到本地 write_ai_video_result = t.submit(write_ai_video, frame_merge, aiFilePath, ai_video_file, ai_write_status, request_id) if len(det_xywh) > 0: put_queue(image_queue, (1, (det_xywh, frame, current_frame, all_frames))) push_p = push_video_stream(frame_merge, push_p, push_url, p_push_status, request_id) ai_video_file = write_ai_video_result.result() if push_parm[0] == 2: if 'stop' == push_parm[1]: logger.info("停止推流线程, requestId: {}", request_id) close_all_p(push_p, None, ai_video_file, request_id) ai_video_file, push_p = None, None break except Exception as e: logger.error("推流线程异常:{}, requestId:{}", format_exc(), request_id) self.ex = e finally: close_all_p(push_p, None, ai_video_file, request_id) logger.info("推流线程停止完成!requestId:{}", request_id)