AIlib2/crowd.py

191 lines
8.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
from PIL import Image
from crowdUtils.engine import standard_transforms,preprocess,postprocess,DictToObject,AnchorPointsf
from crowdUtils.models import build_model
from segutils import trtUtils2
import os,torch,cv2,time
import numpy as np
import warnings
import tensorrt as trt
from copy import deepcopy
# Install a blanket 'ignore' filter so no Python warnings are shown once this module is loaded.
warnings.filterwarnings('ignore')
class crowdModel(object):
    """Point-based crowd detector backed by either a TensorRT engine or a PyTorch checkpoint.

    The backend is selected from the suffix of ``weights``:
    ``.engine``/``.trt`` -> TensorRT, ``.pth``/``.pt`` -> PyTorch.

    Typical use::

        model = crowdModel(weights, par)
        (image_copy, points, obb), timing = model.eval(rgb_image)
    """

    # Weight-file suffixes accepted by each backend.
    _TRT_SUFFIXES = ('.engine', '.trt')
    _PTH_SUFFIXES = ('.pth', '.pt')

    @staticmethod
    def _default_par():
        """Return a fresh default configuration dict (a new object on every call)."""
        return {
            'mean': [0.485, 0.456, 0.406],
            'std': [0.229, 0.224, 0.225],
            'threshold': 0.5,
            # (min, opt, max) input shapes; same values the demo entry point of this file uses.
            'input_profile_shapes': [(1, 3, 256, 256), (1, 3, 1024, 1024), (1, 3, 2048, 2048)],
            'modelPar': {'backbone': 'vgg16_bn', 'gpu_id': 0, 'anchorFlag': False, 'line': 2,
                         'width': None, 'height': None, 'output_dir': './output', 'row': 2},
        }

    def __init__(self, weights=None, par=None):
        """Load the model and prepare anchor generation.

        Args:
            weights: Path to the weight file. Must be a string ending in
                ``.engine``/``.trt`` (TensorRT) or ``.pth``/``.pt`` (PyTorch).
            par: Configuration dict with the keys ``mean``, ``std``,
                ``threshold``, ``input_profile_shapes`` (index 0 is used as the
                minimum shape, index 2 as the maximum shape) and ``modelPar``.
                When omitted, a fresh default configuration is built.

        Raises:
            ValueError: If ``weights`` is not a string or has an unsupported suffix.
            KeyError: If a caller-supplied ``par`` lacks a required key.
        """
        # Validate the weight path before touching any other resource.
        if not isinstance(weights, str):
            raise ValueError('#########ERROR: weights must be a path string, got %r' % (weights,))
        if weights.endswith(self._TRT_SUFFIXES):
            infer_type = 'trt'
        elif weights.endswith(self._PTH_SUFFIXES):
            infer_type = 'pth'
        else:
            raise ValueError('#########ERROR: %s : no registered inference type' % weights)

        # A None default avoids sharing one mutable dict between instances.
        if par is None:
            par = self._default_par()

        print('-'*20, par['modelPar'])
        self.mean = par['mean']
        self.std = par['std']
        self.width = par['modelPar']['width']
        self.height = par['modelPar']['height']
        self.minShape = par['input_profile_shapes'][0]
        self.maxShape = par['input_profile_shapes'][2]
        self.IOShapes0, self.IOShapes1 = [None, None, None], [None, None, None]
        self.Oshapes0, self.Oshapes1 = [None, None, None], [None, None, None]
        self.modelPar = DictToObject(par['modelPar'])
        self.threshold = par['threshold']
        self.device = 'cuda:0'
        self.infer_type = infer_type

        if self.infer_type == 'trt':
            logger = trt.Logger(trt.Logger.ERROR)
            with open(weights, "rb") as f, trt.Runtime(logger) as runtime:
                # Read the local TRT file and get back an ICudaEngine object.
                self.engine = runtime.deserialize_cuda_engine(f.read())
            self.bindingNames = [self.engine.get_binding_name(ib) for ib in range(len(self.engine))]
            print('############load seg model trt success: ', weights, self.bindingNames)
            # Buffers are allocated later, in eval(), once the input shape is known.
            self.inputs, self.outputs, self.bindings, self.stream = None, None, None, None
            self.context = self.engine.create_execution_context()
        else:
            self.model = build_model(self.modelPar)
            # Load from the path given to the constructor rather than from a
            # module-level `args` global that only exists when run as a script.
            checkpoint = torch.load(weights, map_location='cpu')
            self.model.load_state_dict(checkpoint['model'])
            self.model = self.model.to(self.device)

        if not self.modelPar.anchorFlag:
            # Anchors live on the CPU for TensorRT and on the model's device for PyTorch.
            anchor_device = 'cpu' if self.infer_type == 'trt' else self.device
            self.anchors = AnchorPointsf(pyramid_levels=[3, ], strides=None, row=self.modelPar.row,
                                         line=self.modelPar.line, device=anchor_device)
        print('#########加载模型:', weights, ' 类型:', self.infer_type)

    def preprocess(self, img):
        """Normalise ``img`` and shape it into a batch of one for the active backend.

        Delegates to the module-level ``preprocess`` helper, then either wraps
        the result in a torch tensor (PyTorch) or records the I/O shapes and
        adds a batch axis (TensorRT).

        Args:
            img: Input image as accepted by the ``preprocess`` helper.

        Returns:
            A torch tensor (PyTorch backend) or a NumPy array (TensorRT backend),
            each with a leading batch dimension of 1.
        """
        tmpImg = preprocess(img, mean=self.mean, std=self.std,
                            minShape=self.minShape, maxShape=self.maxShape)
        if self.infer_type == 'pth':
            tmpImg = torch.from_numpy(tmpImg)
            tmpImg = torch.Tensor(tmpImg).unsqueeze(0)
        elif self.infer_type == 'trt':
            chs, height, width = tmpImg.shape[0:3]
            self.width, self.height = width, height
            # NOTE(review): this evaluates as ((height//4)*width)//4, which equals
            # (height//4)*(width//4) only when width is a multiple of 4 — confirm
            # that the preprocess helper guarantees that.
            num_points = height//4*width//4
            self.IOShapes1 = [(1, chs, height, width), (1, num_points, 2), (1, num_points, 2)]
            self.Oshapes1 = [(1, num_points, 2), (1, num_points, 2)]
            tmpImg = tmpImg[np.newaxis, :, :, :]  # CHW -> NCHW
        return tmpImg

    def ms(self, t1, t0):
        """Return the interval ``t1 - t0`` (seconds) as a millisecond string with one decimal."""
        return '%.1f' % ((t1 - t0) * 1000)

    def eval(self, img):
        """Run the full pipeline on one image.

        Args:
            img: Input image; must support ``.copy()`` and be accepted by
                :meth:`preprocess`.

        Returns:
            A pair ``(presults, infos)`` where ``presults`` is
            ``[copy of img, points, obb_list]`` and ``infos`` is a string with
            the per-stage timings in milliseconds.
        """
        time0 = time.time()
        img_b = img.copy()
        samples = self.preprocess(img)
        time1 = time.time()

        if self.infer_type == 'pth':
            samples = samples.to(self.device)
        elif self.infer_type == 'trt':
            # Buffers are re-allocated on every call; the shape-change check that
            # would skip this is disabled in the original code.
            self.inputs, self.outputs, self.bindings, self.stream = trtUtils2.allocate_buffers(
                self.engine, self.IOShapes1)
            self.IOShapes0 = deepcopy(self.IOShapes1)
        time2 = time.time()

        use_external_anchors = not self.modelPar.anchorFlag
        if use_external_anchors:
            self.anchor_points = self.anchors.eval(samples)

        if self.infer_type == 'pth':
            self.model.eval()
            with torch.no_grad():
                outputs = self.model(samples)
        elif self.infer_type == 'trt':
            raw = trtUtils2.trt_inference(samples, self.height, self.width, self.context,
                                          self.inputs, self.outputs, self.bindings, self.stream,
                                          input_name=self.bindingNames[0])
            tensors = [torch.from_numpy(np.reshape(raw[i], shape))
                       for i, shape in enumerate(self.Oshapes1)]
            outputs = {'pred_points': tensors[0], 'pred_logits': tensors[1]}

        if use_external_anchors:
            # Anchors are only built when anchorFlag is false; adding them
            # unconditionally would fail on the missing attribute otherwise.
            # NOTE(review): presumably the network already emits absolute points
            # when anchorFlag is set — confirm.
            outputs['pred_points'] = outputs['pred_points'] + self.anchor_points
        time3 = time.time()

        points, scores = self.postprocess(outputs)
        time4 = time.time()
        infos = 'precess:%s datacopy:%s infer:%s post:%s' % (
            self.ms(time1, time0), self.ms(time2, time1), self.ms(time3, time2), self.ms(time4, time3))
        p2 = self.toOBBformat(points, scores, cls=0)
        presults = [img_b, points, p2]
        return presults, infos

    def postprocess(self, outputs):
        """Apply the module-level ``postprocess`` helper with this model's score threshold."""
        return postprocess(outputs, threshold=self.threshold)

    def toOBBformat(self, points, scores, cls=0):
        """Convert points to ``[[pt, pt, pt, pt], score, cls]`` records.

        Each point is repeated four times to fill the four-corner slot of the
        record.

        Args:
            points: Sequence of points.
            scores: Sequence of scores, indexable in step with ``points``.
            cls: Class id stored in every record.

        Returns:
            A list with one record per point; empty when ``points`` is empty.
        """
        return [[[pt] * 4, scores[i], cls] for i, pt in enumerate(points)]
def main():
    """Run the detector over every file in ``./testImages`` and save annotated copies.

    For each image the detected points are drawn as small green dots and the
    result is written to ``par['output_dir']`` under the same file name.

    Returns:
        A list with one entry per image, each a list of ``(x, y)`` integer
        tuples: ``[[(x1, y1), (x2, y2), ...], [(X1, Y1), ...], ...]``.
    """
    par = {'mean': [0.485, 0.456, 0.406], 'std': [0.229, 0.224, 0.225], 'threshold': 0.5,
           'output_dir': './output',
           'input_profile_shapes': [(1, 3, 256, 256), (1, 3, 1024, 1024), (1, 3, 2048, 2048)],
           'modelPar': {'backbone': 'vgg16_bn', 'gpu_id': 0, 'anchorFlag': False,
                        'width': None, 'height': None, 'line': 2, 'row': 2}
           }
    weights = 'weights/best_mae_dynamic.engine'
    cmodel = crowdModel(weights, par)
    img_path = "./testImages"

    # cv2.imwrite does not create missing directories, so make sure the target exists.
    os.makedirs(par['output_dir'], exist_ok=True)

    targetList = []
    for file in os.listdir(img_path):
        imgPath = os.path.join(img_path, file)
        img_raw = np.array(Image.open(imgPath).convert('RGB'))

        # eval() returns ([image_copy, points, obb_list], infos); the points sit at index 1.
        presults, infos = cmodel.eval(img_raw)
        points = presults[1]
        print(file, infos, img_raw.shape)

        img_to_draw = cv2.cvtColor(img_raw, cv2.COLOR_RGB2BGR)
        # Mark every detected head and collect its integer coordinates.
        COORlist = []
        for p in points:
            center = (int(p[0]), int(p[1]))
            img_to_draw = cv2.circle(img_to_draw, center, 2, (0, 255, 0), -1)
            COORlist.append(center)
        targetList.append(COORlist)

        # Pause kept from the original script; its purpose is not evident from this file.
        time.sleep(2)
        # Save the annotated image.
        cv2.imwrite(os.path.join(par['output_dir'], file), img_to_draw)
    return targetList
if __name__ == '__main__':
    # Script-mode configuration. `par` and `args` are kept as module-level
    # names because code elsewhere in this file may look them up as globals.
    par = dict(
        backbone='vgg16_bn',
        gpu_id=0,
        line=2,
        output_dir='./output',
        row=2,
        anchorFlag=False,
        weight_path='./weights/best_mae.pth',
    )
    args = DictToObject(par)

    # Run the demo and print whatever main() hands back.
    targetList = main()
    print("line81", targetList)