"""Offline smoke test for the detection models.

Reads a local video, runs one of the detection models on every frame, stamps
a logo watermark on both the raw and the annotated frame, and writes the two
frames side by side into an output video.

NOTE(review): everything below the helper definitions still runs at module
level (as in the original), so importing this file executes the pipeline.
"""
import copy
import subprocess as sp
from enum import Enum, unique
from PIL import Image
import time
import cv2
import sys

# Project packages live one directory up; this must run before they are imported.
sys.path.extend(['..', '../AIlib'])

from AI import AI_process, AI_process_forest, get_postProcess_para
import os
from segutils.segmodel import SegModel
from models.experimental import attempt_load
from utils.torch_utils import select_device
from utilsK.queRiver import get_labelnames, get_label_arrays
import numpy as np
import torch
from utilsK.masterUtils import get_needed_objectsIndex

# Font used to pre-render the class labels for every model configuration.
_FONT_PATH = "../AIlib/conf/platech.ttf"


# Model type enumeration. Index 1 of each value is the model code that
# checkCode() and get_model() compare against; indices 0 and 2 look like a
# numeric id and a display name (presumably - they are not read in this file).
@unique
class ModelType(Enum):
    WATER_SURFACE_MODEL = ("1", "001", "水面模型")

    FOREST_FARM_MODEL = ("2", "002", "森林模型")

    TRAFFIC_FARM_MODEL = ("3", "003", "交通模型")

    @staticmethod
    def checkCode(code):
        """Return True when *code* equals the code of any ModelType member."""
        return any(member.value[1] == code for member in ModelType)


# The source this file was recovered from had lost its indentation, so it is
# unclear whether checkCode was a module-level function or lived inside the
# enum. Both spellings are kept working.
checkCode = ModelType.checkCode


class ModelConfig():
    """Post-processing parameters shared by every model configuration."""

    def __init__(self):
        postFile = '../AIlib/conf/para.json'
        self.conf_thres, self.iou_thres, self.classes, self.rainbows = get_postProcess_para(postFile)


class SZModelConfig(ModelConfig):
    """Label names and label images for the water-surface model."""

    def __init__(self):
        super(SZModelConfig, self).__init__()
        labelnames = "../AIlib/weights/yolov5/class8/labelnames.json"  # class table for this model
        self.names = get_labelnames(labelnames)
        self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=40,
                                                fontpath=_FONT_PATH)


class LCModelConfig(ModelConfig):
    """Label names and label images for the forest model."""

    def __init__(self):
        super(LCModelConfig, self).__init__()
        labelnames = "../AIlib/weights/forest/labelnames.json"
        self.names = get_labelnames(labelnames)
        self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=40,
                                                fontpath=_FONT_PATH)


class RFModelConfig(ModelConfig):
    """Label names and label images for the traffic (road) model."""

    def __init__(self):
        super(RFModelConfig, self).__init__()
        labelnames = "../AIlib/weights/road/labelnames.json"
        self.names = get_labelnames(labelnames)
        # The label font is scaled from the 40 px used at a 1920 px wide frame
        # down to a 1536 px wide frame.
        imageW = 1536
        outfontsize = int(imageW / 1920 * 40)
        self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=outfontsize,
                                                fontpath=_FONT_PATH)


class Model():
    """Base class holding the settings every detection model shares.

    Args:
        device: device selector handed to ``select_device``; the original
            comment lists ``cpu``, ``'0'`` and ``'1'`` as options.
        allowedList: class indices forwarded to the AI_process* call.
    """

    def __init__(self, device, allowedList=None):
        # Preset parameters.
        self.device_ = device
        self.allowedList = allowedList

    def _load_detector(self, weights):
        """Resolve the device and load the detector weights onto it."""
        self.device = select_device(self.device_)
        # Half precision is only supported on CUDA.
        self.half = self.device.type != 'cpu'
        self.model = attempt_load(weights, map_location=self.device)  # loads the FP32 model
        if self.half:
            self.model.half()


# Water-surface model
class SZModel(Model):
    """Detector plus a two-class segmentation model for water-surface footage."""

    def __init__(self, device, allowedList=None):
        super().__init__(device, allowedList)
        self._load_detector("../AIlib/weights/yolov5/class8/bestcao.pt")
        self.segmodel = SegModel(nclass=2,
                                 weights='../AIlib/weights/STDC/model_maxmIOU75_1720_0.946_360640.pth',
                                 device=self.device)

    def process(self, frame, config):
        """Run ``AI_process`` on one frame.

        *config* is the tuple
        ``(names, label_arraylist, rainbows, conf_thres, iou_thres)``.
        """
        return AI_process([frame], self.model, self.segmodel, config[0], config[1],
                          config[2], self.half, self.device, config[3], config[4],
                          self.allowedList)


# Forest model
class LCModel(Model):
    """Detector for forest footage; no segmentation model is used."""

    def __init__(self, device, allowedList=None):
        super().__init__(device, allowedList)
        self._load_detector("../AIlib/weights/forest/best.pt")
        self.segmodel = None

    def process(self, frame, config):
        """Run ``AI_process_forest`` on one frame.

        *config* is the tuple
        ``(names, label_arraylist, rainbows, conf_thres, iou_thres)``.
        """
        return AI_process_forest([frame], self.model, self.segmodel, config[0], config[1],
                                 config[2], self.half, self.device, config[3], config[4],
                                 self.allowedList)


# Traffic model
class RFModel(Model):
    """Detector for road/traffic footage; no segmentation model is used."""

    def __init__(self, device, allowedList=None):
        super().__init__(device, allowedList)
        self._load_detector("../AIlib/weights/road/best.pt")
        self.segmodel = None

    def process(self, frame, config):
        """Run ``AI_process_forest`` on one frame.

        *config* is the tuple
        ``(names, label_arraylist, rainbows, conf_thres, iou_thres)``.
        """
        return AI_process_forest([frame], self.model, self.segmodel, config[0], config[1],
                                 config[2], self.half, self.device, config[3], config[4],
                                 self.allowedList)


def get_model(args):
    """Build the model requested by the first entry of the model list.

    Args:
        args: a 3-item sequence ``(config, device, models)`` where
            ``config`` maps ``"sz"``/``"lc"``/``"rf"`` to a configuration
            tuple, ``device`` is the device selector, and ``models`` is a list
            of dicts carrying ``"code"`` and ``"categories"`` (each category
            dict has an ``"id"``).

    Returns:
        ``(model_instance, code, config_tuple)`` for the first entry of
        ``models``. Only the first entry is ever used, because every branch
        either returns or raises. ``None`` is returned implicitly when
        ``models`` is empty.

    Raises:
        Exception: with message ``"22222"`` when the code is unknown or
            anything fails while building the model; the original error is
            attached as ``__cause__``.
    """
    for model in args[2]:
        try:
            # Take the code from the request. It used to be hard-coded to
            # '001', which made the forest and traffic branches unreachable.
            code = model.get("code")
            needed_objectsIndex = [int(category.get("id")) for category in model.get("categories")]
            if code == ModelType.WATER_SURFACE_MODEL.value[1]:
                return SZModel(args[1], needed_objectsIndex), code, args[0].get("sz")
            elif code == ModelType.FOREST_FARM_MODEL.value[1]:
                return LCModel(args[1], needed_objectsIndex), code, args[0].get("lc")
            elif code == ModelType.TRAFFIC_FARM_MODEL.value[1]:
                return RFModel(args[1], needed_objectsIndex), code, args[0].get("rf")
            else:
                raise Exception("11111")
        except Exception as e:
            # Keep the historical message but preserve the root cause.
            raise Exception("22222") from e


class PictureWaterMark():
    """Helpers that stamp a logo onto a frame."""

    def common_water(self, image, logo):
        """Paste *logo* near the bottom-right corner of *image* using PIL.

        The logo is scaled to roughly 20% of the image width and positioned so
        that its bottom-right corner sits at 95% of the width and height.
        *image* is converted with ``COLOR_BGR2RGB`` and *logo* with
        ``COLOR_BGRA2RGBA``, so a BGR image and a BGRA logo are expected.
        """
        width, height = image.shape[1], image.shape[0]
        rate = int(width * 0.2) / logo.shape[1]
        logo_new = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST)
        position = (int(width * 0.95 - logo_new.shape[1]), int(height * 0.95 - logo_new.shape[0]))
        b = Image.new('RGBA', (width, height), (0, 0, 0, 0))  # new fully transparent canvas
        a = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        watermark = Image.fromarray(cv2.cvtColor(logo_new, cv2.COLOR_BGRA2RGBA))
        # Image rotation (disabled):
        # watermark = watermark.rotate(45)
        b.paste(a, (0, 0))
        # The logo doubles as its own mask, so its alpha channel controls the blend.
        b.paste(watermark, position, mask=watermark)
        return cv2.cvtColor(np.asarray(b), cv2.COLOR_BGR2RGB)

    def common_water_1(self, image, logo, alpha=1):
        """Alpha-blend *logo* into a copy of *image* with NumPy and return it.

        The logo is scaled to about 10% of the longer image side. Its fourth
        channel is used as the per-pixel blend weight, scaled by *alpha*, so
        *logo* must have four channels. *image* itself is not modified because
        ``cv2.split`` returns new arrays.
        """
        h, w = image.shape[0], image.shape[1]
        if w >= h:
            rate = int(w * 0.1) / logo.shape[1]
        else:
            rate = int(h * 0.1) / logo.shape[0]
        mask = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST)
        mask_h, mask_w = mask.shape[0], mask.shape[1]
        mask_channels = cv2.split(mask)
        dst_channels = cv2.split(image)
        # b, g, r, a = cv2.split(mask)
        # Coordinates of the mask inside the image, as (row, column) pairs.
        # The margin on both axes is 5% of the image height.
        ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w))
        dr_points = (int(h * 0.95), int(w - h * 0.05))
        rows = slice(ul_points[0], dr_points[0])
        cols = slice(ul_points[1], dr_points[1])
        for i in range(3):
            # Attenuate the background by (1 - weight), then add the weighted logo.
            dst_channels[i][rows, cols] = dst_channels[i][rows, cols] * (
                    255.0 - mask_channels[3] * alpha) / 255
            dst_channels[i][rows, cols] += np.array(
                mask_channels[i] * (mask_channels[3] * alpha / 255), dtype=np.uint8)
        dst_img = cv2.merge(dst_channels)
        return dst_img


def video_merge(frame1, frame2, width, height):
    """Resize both frames to ``(width, height)`` and stack them left to right."""
    frameLeft = cv2.resize(frame1, (width, height), interpolation=cv2.INTER_LINEAR)
    frameRight = cv2.resize(frame2, (width, height), interpolation=cv2.INTER_LINEAR)
    frame_merge = np.hstack((frameLeft, frameRight))
    # frame_merge = np.hstack((frame1, frame2))
    return frame_merge


cap = cv2.VideoCapture("/home/DATA/chenyukun/3.mp4")

# Get video information
fps = int(cap.get(cv2.CAP_PROP_FPS))
print(fps)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
print(width)
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(height)
# Disabled: push the result to an RTMP endpoint through an ffmpeg pipe.
# command = ['ffmpeg',
#            '-y',  # overwrite an existing output file without asking
#            '-f', 'rawvideo',
#            '-vcodec', 'rawvideo',
#            '-pix_fmt', 'bgr24',
#            # '-s', "{}x{}".format(self.width * 2, self.height),
#            '-s', "{}x{}".format(width, height),
#            '-r', str(15),
#            '-i', '-',  # input comes from stdin
#            '-g', '15',
#            '-sc_threshold', '0',  # makes GOP insertion more uniform
#            '-b:v', '3000k',  # bitrate
#            '-tune', 'zerolatency',  # speeds up encoding
#            '-c:v', 'libx264',  # video encoder
#            '-pix_fmt', 'yuv420p',
#            "-an",
#            '-preset', 'ultrafast',  # quality/speed preset, one of: ultrafast,
#            # superfast, veryfast, faster, fast, medium, slow, slower, veryslow
#            '-f', 'flv',
#            "rtmp://live.push.t-aaron.com/live/THSAk"]
#
# # Pipe configuration
# p = sp.Popen(command, stdin=sp.PIPE, shell=False)

sz = SZModelConfig()
lc = LCModelConfig()
rf = RFModelConfig()
config = {
    "sz": (sz.names, sz.label_arraylist, sz.rainbows, sz.conf_thres, sz.iou_thres),
    "lc": (lc.names, lc.label_arraylist, lc.rainbows, lc.conf_thres, lc.iou_thres),
    "rf": (rf.names, rf.label_arraylist, rf.rainbows, rf.conf_thres, rf.iou_thres),
}
model = {
    "models": [
        {
            "code": "001",
            "categories": [
                {"id": "0", "config": {}},
                {"id": "1", "config": {}},
                {"id": "2", "config": {}},
                {"id": "3", "config": {}},
                {"id": "4", "config": {}},
                {"id": "5", "config": {}},
                {"id": "6", "config": {}},
                {"id": "7", "config": {}},
            ]
        }]
}

mod, model_type_code, modelConfig = get_model((config, str(1), model.get("models")))
pic = PictureWaterMark()
# Flag -1 loads the file unchanged, keeping the alpha channel that
# common_water_1 reads as channel 3.
logo = cv2.imread("./image/logo.png", -1)
# The output is twice as wide as the input because two frames are stacked side by side.
ai_video_file = cv2.VideoWriter("/home/DATA/chenyukun/aa/1.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps,
                                (width * 2, height))
try:
    while cap.isOpened():
        start = time.time()
        ret, frame = cap.read()
        # cap.grab()
        if not ret:
            # Also reached at the normal end of the video, not only on errors.
            print("Opening camera is failed")
            break
        # The model gets its own copy so the raw frame stays untouched.
        p_result, timeOut = mod.process(frame.copy(), modelConfig)
        frame = pic.common_water_1(frame, logo)
        # p_result[1] is used as the annotated frame (presumably the drawn
        # detections - confirm against AI_process).
        p_result[1] = pic.common_water_1(p_result[1], logo)
        frame_merge = video_merge(frame, p_result[1], width, height)
        ai_video_file.write(frame_merge)
        print(time.time() - start)
finally:
    # Release the writer and the capture even when a frame fails, so the
    # output file is finalized.
    ai_video_file.release()
    cap.release()