''' 这个版本增加了船舶过滤功能 ''' import time import sys from core.models.bisenet import BiSeNet from models.AIDetector_pytorch import Detector from models.AIDetector_pytorch import plot_one_box,Colors from utils.postprocess_utils import center_coordinate,fourcorner_coordinate,remove_simivalue,remove_sameeleme_inalist import os os.environ['CUDA_VISIBLE_DEVICES'] = '1' from models.model_stages import BiSeNet import cv2 import torch import torch.nn.functional as F from PIL import Image import numpy as np import torchvision.transforms as transforms from utils.segutils import colour_code_segmentation from utils.segutils import get_label_info os.environ['KMP_DUPLICATE_LIB_OK']='TRUE' os.environ["CUDA_VISIBLE_DEVICES"] = "0" sys.path.append("../") # 为了导入上级目录的,添加一个新路径 def AI_postprocess(preds,_mask_cv,pars,_img_cv): '''考虑船上人过滤''' '''输入:落水人员的结果(类别+坐标)、原图、mask图像 过程:获得mask的轮廓,判断人员是否在轮廓内。 在,则保留且绘制;不在,舍弃。 返回:最终绘制的结果图、最终落水人员(坐标、类别、置信度), ''' '''1、最大分割水域作为判断依据''' zoom_factor=4 #缩小因子设置为4,考虑到numpy中分别遍历xy进行缩放耗时大。 original_height = _mask_cv.shape[0] original_width=_mask_cv.shape[1] zoom_height=int(original_height/zoom_factor) zoom_width=int(original_width/zoom_factor) _mask_cv = cv2.resize(_mask_cv, (zoom_width,zoom_height)) #缩小原图,宽在前,高在后 t4 = time.time() img_gray = cv2.cvtColor(_mask_cv, cv2.COLOR_BGR2GRAY) if len(_mask_cv.shape)==3 else _mask_cv # t5 = time.time() contours, thresh = cv2.threshold(img_gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # 寻找轮廓(多边界) contours, hierarchy = cv2.findContours(thresh, cv2.RETR_LIST, 2) contour_info = [] for c in contours: contour_info.append(( c, cv2.isContourConvex(c), cv2.contourArea(c), )) contour_info = sorted(contour_info, key=lambda c: c[2], reverse=True) t6 = time.time() '''新增模块::如果水域为空,则返回原图、无落水人员等。''' if contour_info==[]: # final_img=_img_cv final_head_person_filterwater=[] timeInfos=0 # return final_img, final_head_person_filterwater return final_head_person_filterwater,timeInfos else: max_contour = contour_info[0] max_contour1=max_contour[0] max_contour_X=max_contour1[0][0][:] max_contour=max_contour[0]*zoom_factor# contours恢复原图尺寸 # max_contour=max_contour[0]*zoom_factor# contours恢复原图尺寸 print(max_contour) t7 = time.time() '''2.1、preds中head+person取出,boat取出。''' init_head_person=[] init_boat = [] for i in range(len(preds)): if preds[i][4]=='head' or preds[i][4]=='person': init_head_person.append(preds[i]) else: init_boat.append(preds[i]) t8 = time.time() '''新增模块:2.2、preds中head+person取出,过滤掉head与person中指向同一人的部分,保留同一人的person标签。''' init_head=[] init_person=[] #head与person标签分开 for i in range(len(init_head_person)): if init_head_person[i][4]=='head': init_head.append(init_head_person[i]) else: init_person.append(init_head_person[i]) # person的框形成contours person_contour=[] for i in range(len(init_person)): boundbxs_temp=[init_person[i][0],init_person[i][1],init_person[i][2],init_person[i][3]] contour_temp_person=fourcorner_coordinate(boundbxs_temp) #得到person预测框的顺序contour contour_temp_person=np.array(contour_temp_person) contour_temp_person=np.float32(contour_temp_person) person_contour.append(np.array(contour_temp_person)) # head是否在person的contours内,在说明是同一人,过滤掉。 list_head=[] for i in range(len(init_head)): for j in range(len(person_contour)): center_x, center_y=center_coordinate(init_head[i]) flag = cv2.pointPolygonTest(person_contour[j], (center_x, center_y), False) #若为False,会找点是否在内,外,或轮廓上(相应返回+1, -1, 0)。 if flag==1: pass else: list_head.append(init_head[i]) # person和最终head合并起来 init_head_person_temp=init_person+list_head '''3、preds中head+person,通过1中水域过滤''' init_head_person_filterwater=init_head_person_temp final_head_person_filterwater=[] for i in range(len(init_head_person_filterwater)): center_x, center_y=center_coordinate(init_head_person_filterwater[i]) flag = cv2.pointPolygonTest(max_contour, (center_x, center_y), False) #若为False,会找点是否在内,外,或轮廓上(相应返回+1, -1, 0)。 if flag==1: final_head_person_filterwater.append(init_head_person_filterwater[i]) else: pass t9 = time.time() '''4、水域过滤后的head+person,再通过船舶范围过滤''' init_head_person_filterboat=final_head_person_filterwater # final_head_person_filterboat=[] #获取船舶范围 boat_contour=[] for i in range(len(init_boat)): boundbxs1=[init_boat[i][0],init_boat[i][1],init_boat[i][2],init_boat[i][3]] contour_temp=fourcorner_coordinate(boundbxs1) #得到boat预测框的顺序contour contour_temp_=np.array(contour_temp) contour_temp_=np.float32(contour_temp_) boat_contour.append(np.array(contour_temp_)) t10 = time.time() # 遍历船舶范围,取出在船舶范围内的head和person(可能有重复元素) list_headperson_inboat=[] for i in range(len(init_head_person_filterboat)): for j in range(len(boat_contour)): center_x, center_y=center_coordinate(init_head_person_filterboat[i]) # yyyyyyyy=boat_contour[j] flag = cv2.pointPolygonTest(boat_contour[j], (center_x, center_y), False) #若为False,会找点是否在内,外,或轮廓上(相应返回+1, -1, 0)。 if flag==1: list_headperson_inboat.append(init_head_person_filterboat[i]) else: pass print('list_headperson_inboat',list_headperson_inboat) if len(list_headperson_inboat)==0: pass else: list_headperson_inboat=remove_sameeleme_inalist(list_headperson_inboat) #将重复嵌套列表元素删除 # 过滤船舶范围内的head和person final_head_person_filterboat=remove_simivalue(init_head_person_filterboat,list_headperson_inboat) t11 = time.time() '''5、输出最终落水人员,并绘制保存检测图''' colors = Colors() if final_head_person_filterwater is not None: for i in range(len(final_head_person_filterboat)): # lbl = self.names[int(cls_id)] lbl = final_head_person_filterboat[i][4] xyxy=[final_head_person_filterboat[i][0],final_head_person_filterboat[i][1],final_head_person_filterboat[i][2],final_head_person_filterboat[i][3]] c = int(5) plot_one_box(xyxy, _img_cv, label=lbl, color=colors(c, True), line_thickness=3) final_img=_img_cv t12 = time.time() # cv2.imwrite('final_result.png', _img_cv) t13 = time.time() print('存图:%s, 过滤标签:%s ,遍历船舶范围:%s,水域过滤后的head+person:%s,水域过滤:%s,head+person、boat取出:%s,新增如果水域为空:%s,找contours:%s,图像改变:%s' %((t13-t12) * 1000,(t12-t11) * 1000,(t11-t10) * 1000,(t10-t9) * 1000,(t9-t8) * 1000,(t8-t7) * 1000,(t7-t6) * 1000,(t6-t5) * 1000,(t5-t4) * 1000 ) ) timeInfos=('存图:%s, 过滤标签:%s ,遍历船舶范围:%s,水域过滤后的head+person:%s,水域过滤:%s,head+person、boat取出:%s,新增如果水域为空:%s,找contours:%s,图像改变:%s' %((t13-t12) * 1000,(t12-t11) * 1000,(t11-t10) * 1000,(t10-t9) * 1000,(t9-t8) * 1000,(t8-t7) * 1000,(t7-t6) * 1000,(t6-t5) * 1000,(t5-t4) * 1000 ) ) return final_head_person_filterwater,timeInfos #返回最终绘制的结果图、最终落水人员(坐标、类别、置信度) def AI_process(model, segmodel, args1,path1): '''对原图进行目标检测和水域分割''' '''输入:检测模型、分割模型、配置参数、路径 返回:返回目标检测结果、原图像、分割图像, ''' '''检测图片''' t21=time.time() _img_cv = cv2.imread(path1) # 将这里的送入yolov5 t22 = time.time() # _img_cv=_img_cv.numpy() pred = model.detect(_img_cv) # 检测结果 #对pred处理,处理成list嵌套 pred=[[*x[0:4],x[4],x[5].cpu().tolist()] for x in pred[1]] # pred=[[x[0],*x[1:5],x[5].cpu().float()] for x in pred[1]] print('pred', pred) t23 = time.time() '''分割图片''' img = Image.open(path1).convert('RGB') t231 = time.time() transf1 = transforms.ToTensor() transf2 = transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)) imgs = transf1(img) imgs = transf2(imgs) print(path1) # numpy数组格式为(H,W,C) size = [360, 640] imgs = imgs.unsqueeze(0) imgs = imgs.cuda() N, C, H, W = imgs.size() self_scale = 360 / H new_hw = [int(H * self_scale), int(W * self_scale)] print("line50", new_hw) imgs = F.interpolate(imgs, new_hw, mode='bilinear', align_corners=True) t24 = time.time() with torch.no_grad(): logits = segmodel(imgs)[0] t241 = time.time() logits = F.interpolate(logits, size=size, mode='bilinear', align_corners=True) probs = torch.softmax(logits, dim=1) preds = torch.argmax(probs, dim=1) preds_squeeze = preds.squeeze(0) preds_squeeze_predict = colour_code_segmentation(np.array(preds_squeeze.cpu()), args1['label_info']) preds_squeeze_predict = cv2.resize(np.uint8(preds_squeeze_predict), (W, H)) predict_mask = cv2.cvtColor(np.uint8(preds_squeeze_predict), cv2.COLOR_RGB2BGR) _mask_cv =predict_mask t25 = time.time() cv2.imwrite('seg_result.png', _mask_cv) t26 = time.time() print('存分割图:%s, 分割后处理:%s ,分割推理:%s ,分割图变小:%s,分割图读图:%s,检测模型推理:%s,读图片:%s' %((t26-t25) * 1000,(t25-t241) * 1000,(t241-t24) * 1000,(t24-t231) * 1000,(t231-t23) * 1000,(t23-t22) * 1000,(t22-t21) * 1000 ) ) return pred, _img_cv, _mask_cv #返回目标检测结果、原图像、分割图像 def main(): '''配置参数''' label_info = get_label_info('utils/class_dict.csv') pars={'cuda':'0','crop_size':512,'input_dir':'input_dir','output_dir':'output_dir','workers':16,'label_info':label_info, 'dspth':'./data/','backbone':'STDCNet813','use_boundary_2':False, 'use_boundary_4':False, 'use_boundary_8':True, 'use_boundary_16':False,'use_conv_last':False} dete_weights='weights/best_luoshui20230608.pt' '''分割模型权重路径''' seg_weights = 'weights/model_final.pth' '''初始化目标检测模型''' model = Detector(dete_weights) '''初始化分割模型2''' n_classes = 2 segmodel = BiSeNet(backbone=pars['backbone'], n_classes=n_classes, use_boundary_2=pars['use_boundary_2'], use_boundary_4=pars['use_boundary_4'], use_boundary_8=pars['use_boundary_8'], use_boundary_16=pars['use_boundary_16'], use_conv_last=pars['use_conv_last']) segmodel.load_state_dict(torch.load(seg_weights)) segmodel.cuda() segmodel.eval() '''图像测试''' folders = os.listdir(pars['input_dir']) for i in range(len(folders)): path1 = pars['input_dir'] + '/' + folders[i] t1=time.time() '''对原图进行目标检测和水域分割''' pred, _img_cv, _mask_cv=AI_process(model,segmodel, pars,path1) t2 = time.time() '''进入后处理,判断水域内有落水人员''' haha,zzzz=AI_postprocess(pred, _mask_cv,pars,_img_cv ) t3 = time.time() print('总时间分布:前处理t2-t1,后处理t3-t2',(t2-t1)*1000,(t3-t2)*1000) if __name__ == "__main__": main()