import argparse from PIL import Image from crowdUtils.engine import standard_transforms,preprocess,postprocess,DictToObject,AnchorPointsf from crowdUtils.models import build_model from segutils import trtUtils2 import os,torch,cv2,time import numpy as np import warnings import tensorrt as trt from copy import deepcopy warnings.filterwarnings('ignore') class crowdModel(object): def __init__(self, weights=None, par={'mean':[0.485, 0.456, 0.406], 'std':[0.229, 0.224, 0.225],'threshold':0.5, 'modelPar':{'backbone':'vgg16_bn', 'gpu_id':0,'anchorFlag':False,'line':2,'width':None,'height':None , 'output_dir':'./output', 'row':2} } ): print('-'*20,par['modelPar'] ) self.mean = par['mean'] self.std =par['std'] self.width = par['modelPar']['width'] self.height = par['modelPar']['height'] self.minShape = par['input_profile_shapes'][0] self.maxShape = par['input_profile_shapes'][2] self.IOShapes0,self.IOShapes1 = [ None,None,None ],[ None,None,None ] self.Oshapes0,self.Oshapes1 = [ None,None,None ], [ None,None,None ] self.modelPar = DictToObject(par['modelPar']) self.threshold = par['threshold'] self.device = 'cuda:0' if weights.endswith('.engine') or weights.endswith('.trt'): self.infer_type ='trt' elif weights.endswith('.pth') or weights.endswith('.pt') : self.infer_type ='pth' else: print('#########ERROR:',weights,': no registered inference type, exit') sys.exit(0) if self.infer_type=='trt': logger = trt.Logger(trt.Logger.ERROR) with open(weights, "rb") as f, trt.Runtime(logger) as runtime: self.engine=runtime.deserialize_cuda_engine(f.read())# 输入trt本地文件,返回ICudaEngine对象 #self.stream=cuda.Stream() self.bindingNames=[ self.engine.get_binding_name(ib) for ib in range(len(self.engine)) ] print('############load seg model trt success: ',weights,self.bindingNames) self.inputs,self.outputs,self.bindings,self.stream=None,None,None,None self.context = self.engine.create_execution_context() elif self.infer_type=='pth': #self.model = DirectionalPointDetector(3, self.par['depth_factor'], self.par['NUM_FEATURE_MAP_CHANNEL']).to(self.device) self.model = build_model(self.modelPar) checkpoint = torch.load(args.weight_path, map_location='cpu') self.model.load_state_dict(checkpoint['model']) self.model=self.model.to(self.device) if not self.modelPar.anchorFlag: if self.infer_type=='trt': self.anchors = AnchorPointsf(pyramid_levels=[3,], strides=None, row=self.modelPar.row, line=self.modelPar.line,device='cpu' ) elif self.infer_type=='pth': self.anchors = AnchorPointsf(pyramid_levels=[3,], strides=None, row=self.modelPar.row, line=self.modelPar.line ,device='cuda:0') print('#########加载模型:',weights,' 类型:',self.infer_type) def preprocess(self,img): tmpImg = preprocess(img,mean=self.mean, std=self.std,minShape=self.minShape,maxShape=self.maxShape) if self.infer_type=='pth': tmpImg = torch.from_numpy(tmpImg) tmpImg = torch.Tensor(tmpImg).unsqueeze(0) elif self.infer_type=='trt': #if not self.height: chs, height, width= tmpImg.shape[0:3] self.width, self.height = width,height self.IOShapes1 = [ (1, chs, height, width ),(1, height//4*width//4,2),(1, height//4*width//4,2) ] self.Oshapes1 = [ (1, height//4*width//4,2),(1, height//4*width//4,2) ] tmpImg = tmpImg[np.newaxis,:,:,:]#CHW->NCHW return tmpImg def ms(self,t1,t0): return '%.1f'%( (t1-t0)*1000 ) def eval(self,img): time0 = time.time() img_b = img.copy() #print('-----line54:',img.shape) samples = self.preprocess(img) time1 = time.time() if self.infer_type=='pth': samples = samples.to(self.device) elif self.infer_type=='trt' : #print('##### line83: 决定是否申请 内存 ',self.IOShapes1, self.IOShapes0,self.IOShapes1==self.IOShapes0) #if self.IOShapes1 != self.IOShapes0: self.inputs,self.outputs,self.bindings,self.stream = trtUtils2.allocate_buffers(self.engine,self.IOShapes1) #print('##### line96: 开辟新内存成功 ' ,self.height,self.width) self.IOShapes0=deepcopy(self.IOShapes1) time2 = time.time() if not self.modelPar.anchorFlag: self.anchor_points = self.anchors.eval(samples) if self.infer_type=='pth': # run inference self.model.eval() with torch.no_grad(): outputs = self.model(samples) outputs['pred_points'] = outputs['pred_points'] + self.anchor_points #print('###line64:',outputs.keys(), outputs['pred_points'].shape, outputs['pred_logits'].shape) elif self.infer_type=='trt': outputs = trtUtils2.trt_inference( samples,self.height,self.width,self.context,self.inputs,self.outputs,self.bindings,self.stream,input_name = self.bindingNames[0]) for i in range(len(self.Oshapes1)): outputs[i] = torch.from_numpy( np.reshape(outputs[i],self.Oshapes1[i])) outputs={'pred_points':outputs[0], 'pred_logits':outputs[1]} #print('###line117:',outputs.keys(), outputs['pred_points'].shape, outputs['pred_logits'].shape) outputs['pred_points'] = outputs['pred_points'] + self.anchor_points time3 = time.time() points,scores = self.postprocess(outputs) time4 = time.time() infos = 'precess:%s datacopy:%s infer:%s post:%s'%( self.ms(time1,time0) , self.ms(time2,time1), self.ms(time3,time2), self.ms(time4,time3) ) p2 = self.toOBBformat(points,scores,cls=0 ) presults=[ img_b, points,p2 ] return presults, infos def postprocess(self,outputs): return postprocess(outputs,threshold=self.threshold) def toOBBformat(self,points,scores,cls=0): outs = [] for i in range(len(points)): pt,score = points[i],scores[i] pts4=[pt]*4 ret = [ pts4,score,cls] outs.append(ret) return outs def main(): par={'mean':[0.485, 0.456, 0.406], 'std':[0.229, 0.224, 0.225],'threshold':0.5, 'output_dir':'./output','input_profile_shapes':[(1,3,256,256),(1,3,1024,1024),(1,3,2048,2048)],'modelPar':{'backbone':'vgg16_bn', 'gpu_id':0,'anchorFlag':False, 'width':None,'height':None ,'line':2, 'row':2} } weights='weights/best_mae_dynamic.engine' #weights='weights/best_mae.pth' cmodel = crowdModel(weights,par) img_path = "./testImages" File = os.listdir(img_path) targetList = [] for file in File[0:]: COORlist = [] imgPath = img_path + os.sep + file img_raw = np.array(Image.open(imgPath).convert('RGB') ) points, infos = cmodel.eval(img_raw) print(file,infos,img_raw.shape) img_to_draw = cv2.cvtColor(np.array(img_raw), cv2.COLOR_RGB2BGR) # 打印预测图像中人头的个数 for p in points: img_to_draw = cv2.circle(img_to_draw, (int(p[0]), int(p[1])), 2, (0, 255, 0), -1) COORlist.append((int(p[0]), int(p[1]))) # 将各测试图像中的人头坐标存储在targetList中, 格式:[[(x1, y1),(x2, y2),...], [(X1, Y1),(X2, Y2),..], ...] targetList.append(COORlist) time.sleep(2) # 保存预测图片 cv2.imwrite(os.path.join(par['output_dir'], file), img_to_draw) #print(targetList ) if __name__ == '__main__': par = {'backbone':'vgg16_bn', 'gpu_id':0, 'line':2, 'output_dir':'./output', 'row':2,'anchorFlag':False, 'weight_path':'./weights/best_mae.pth'} args = DictToObject(par) targetList = main() print("line81", targetList)