AIlib2/obbUtils/shipUtils.py

527 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import torch
import numpy as np
import cv2
import time
import os
import sys
sys.path.extend(['../AIlib2/obbUtils'])
import matplotlib.pyplot as plt
import func_utils
import time
import torchvision.transforms as transforms
from obbmodels import ctrbox_net
import decoder
import tensorrt as trt
import onnx
import onnxruntime as ort
sys.path.extend(['../AIlib2/utils'])
#sys.path.extend(['../AIlib2/utils'])
from plots import draw_painting_joint
from copy import deepcopy
from scipy import interpolate
def obbTohbb(obb):
obbarray=np.array(obb)
x0=np.min(obbarray[:,0])
x1=np.max(obbarray[:,0])
y0=np.min(obbarray[:,1])
y1=np.max(obbarray[:,1])
return [x0,y0,x1,y1]
def trt_version():
return trt.__version__
def torch_device_from_trt(device):
if device == trt.TensorLocation.DEVICE:
return torch.device("cuda")
elif device == trt.TensorLocation.HOST:
return torch.device("cpu")
else:
return TypeError("%s is not supported by torch" % device)
def torch_dtype_from_trt(dtype):
if dtype == trt.int8:
return torch.int8
elif trt_version() >= '7.0' and dtype == trt.bool:
return torch.bool
elif dtype == trt.int32:
return torch.int32
elif dtype == trt.float16:
return torch.float16
elif dtype == trt.float32:
return torch.float32
else:
raise TypeError("%s is not supported by torch" % dtype)
def segTrtForward(engine,inputs,contextFlag=False):
if not contextFlag: context = engine.create_execution_context()
else: context=contextFlag
#with engine.create_execution_context() as context:
#input_names=['images'];output_names=['output']
namess=[ engine.get_binding_name(index) for index in range(engine.num_bindings) ]
input_names = [namess[0]];output_names=namess[1:]
batch_size = inputs[0].shape[0]
bindings = [None] * (len(input_names) + len(output_names))
# 创建输出tensor并分配内存
outputs = [None] * len(output_names)
for i, output_name in enumerate(output_names):
idx = engine.get_binding_index(output_name)#通过binding_name找到对应的input_id
dtype = torch_dtype_from_trt(engine.get_binding_dtype(idx))#找到对应的数据类型
shape = (batch_size,) + tuple(engine.get_binding_shape(idx))#找到对应的形状大小
device = torch_device_from_trt(engine.get_location(idx))
output = torch.empty(size=shape, dtype=dtype, device=device)
#print('&'*10,'batch_size:',batch_size , 'device:',device,'idx:',idx,'shape:',shape,'dtype:',dtype,' device:',output.get_device())
outputs[i] = output
#print('###line65:',output_name,i,idx,dtype,shape)
bindings[idx] = output.data_ptr()#绑定输出数据指针
for i, input_name in enumerate(input_names):
idx =engine.get_binding_index(input_name)
bindings[idx] = inputs[0].contiguous().data_ptr()#应当为inputs[i]对应3个输入。但由于我们使用的是单张图片所以将3个输入全设置为相同的图片。
#print('#'*10,'input_names:,', input_name,'idx:',idx, inputs[0].dtype,', inputs[0] device:',inputs[0].get_device())
context.execute_v2(bindings) # 执行推理
if len(outputs) == 1:
outputs = outputs[0]
return outputs[0]
else:
return outputs
def apply_mask(image, mask, alpha=0.5):
"""Apply the given mask to the image.
"""
color = np.random.rand(3)
for c in range(3):
image[:, :, c] = np.where(mask == 1,
image[:, :, c] *
(1 - alpha) + alpha * color[c] * 255,
image[:, :, c])
return image
if not os.path.exists('output'):
os.mkdir('output')
saveDir = 'output'
def get_ms(t2,t1):
return (t2-t1)*1000.0
def draw_painting_joint_2(box,img,label_array,score=0.5,color=None,font={ 'line_thickness':None,'boxLine_thickness':None, 'fontSize':None},socre_location="leftTop"):
###先把中文类别字体赋值到img中
lh, lw, lc = label_array.shape
imh, imw, imc = img.shape
if socre_location=='leftTop':
x0 , y1 = box[0][0],box[0][1]
elif socre_location=='leftBottom':
x0,y1=box[3][0],box[3][1]
else:
print('plot.py line217 ,label_location:%s not implemented '%( socre_location ))
sys.exit(0)
x1 , y0 = x0 + lw , y1 - lh
if y0<0:y0=0;y1=y0+lh
if y1>imh: y1=imh;y0=y1-lh
if x0<0:x0=0;x1=x0+lw
if x1>imw:x1=imw;x0=x1-lw
img[y0:y1,x0:x1,:] = label_array
pts_cls=[(x0,y0),(x1,y1) ]
#把四边形的框画上
box_tl= font['boxLine_thickness'] or round(0.002 * (imh + imw) / 2) + 1
cv2.polylines(img, [box], True,color , box_tl)
####把英文字符score画到类别旁边
tl = font['line_thickness'] or round(0.002*(imh+imw)/2)+1#line/font thickness
label = ' %.2f'%(score)
tf = max(tl , 1) # font thickness
fontScale = font['fontSize'] or tl * 0.33
t_size = cv2.getTextSize(label, 0, fontScale=fontScale , thickness=tf)[0]
#if socre_location=='leftTop':
p1,p2= (pts_cls[1][0], pts_cls[0][1]),(pts_cls[1][0]+t_size[0],pts_cls[1][1])
cv2.rectangle(img, p1 , p2, color, -1, cv2.LINE_AA)
p3 = pts_cls[1][0],pts_cls[1][1]-(lh-t_size[1])//2
cv2.putText(img, label,p3, 0, fontScale, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
return img
def OBB_infer(model,ori_image,par):
'''
输出:[img_origin,ori_image, out_box,9999],infos
img_origin---原图
ori_image---画框图
out_box---检测目标框
---格式如下[ [ [ (x0,y0),(x1,y1),(x2,y2),(x3,y3) ],score cls ], [ [ (x0,y0),(x1,y1),(x2,y2),(x3,y3) ],score cls ],........ ],etc
---[ [ [(1159, 297), [922, 615], [817, 591], [1054, 272]], 0.86560535430908214],
[[(1330, 0), [1289, 58], [1228, 50], [1270, 0]], 0.392808765172958414] #2023.08.03,修改输出格式
]
9999---无意义,备用
'''
t1 = time.time()
#ori_image = cv2.imread(impth+folders[i])
t2 = time.time()
img= cv2.resize(ori_image, (par['model_size']))
img_origin = ori_image.copy()
t3 = time.time()
transf2 = transforms.Compose([transforms.ToTensor(), transforms.Normalize(mean=par['mean'], std=par['std'])])
img_tensor = transf2(img)
img_tensor1=img_tensor.unsqueeze(0) #转成了需要的tensor格式中心归一化及颜色通道匹配上
t4=time.time()
#print('###line170: resize:%.1f ToTensor-Normal-Destd:%.1f '%(get_ms(t3,t2),get_ms(t4,t3) ), img_origin.shape,img_tensor1.size() )
#img_tensor1= img_tensor1.to( par['device']) #布置到cuda上
img_tensor1 = img_tensor1.cuda()
t5 =time.time()
img_tensor1 = img_tensor1.half() if par['half'] else img_tensor1
if par['saveType']=='trt':
preds= segTrtForward(model,[img_tensor1])
preds=[x[0] for x in preds ]
pr_decs={}
heads=list(par['heads'].keys())
pr_decs={ heads[i]: preds[i] for i in range(len(heads)) }
elif par['saveType']=='pth':
with torch.no_grad(): # no Back propagation
pr_decs = model(img_tensor1) # 前向传播一部分
elif par['saveType']=='onnx':
img=img_tensor1.cpu().numpy().astype(np.float32)
preds = model['sess'].run(None, {model['input_name']: img})
pr_decs={}
heads=list(par['heads'].keys())
pr_decs={ heads[i]: torch.from_numpy(preds[i]) for i in range(len(heads)) }
t6 = time.time()
category=par['labelnames']
#torch.cuda.synchronize(par['device']) # 时间异步变同步
decoded_pts = []
decoded_scores = []
predictions = par['decoder'].ctdet_decode(pr_decs) # 解码
t6_1=time.time()
pts0, scores0 = func_utils.decode_prediction(predictions, category,par['model_size'], par['down_ratio'],ori_image) # 改3
decoded_pts.append(pts0)
decoded_scores.append(scores0)
t7 = time.time()
# nms
results = {cat: [] for cat in category}
# '''这里啊
for cat in category:
if cat == 'background':
continue
pts_cat = []
scores_cat = []
for pts0, scores0 in zip(decoded_pts, decoded_scores):
pts_cat.extend(pts0[cat])
scores_cat.extend(scores0[cat])
pts_cat = np.asarray(pts_cat, np.float32)
scores_cat = np.asarray(scores_cat, np.float32)
if pts_cat.shape[0]:
nms_results = func_utils.non_maximum_suppression(pts_cat, scores_cat)
results[cat].extend(nms_results)
t8 = time.time()
height, width, _ = ori_image.shape
# nms
out_box=[]
for cat in category:
if cat == 'background':
continue
result = results[cat]
for pred in result:
score = pred[-1]
cls = category.index(cat)
boxF=[ max(int(x),0) for x in pred[0:8]]
#box_out=[ cls,[ ( boxF[0], boxF[1]),([boxF[2], boxF[3]]), ([boxF[4], boxF[5]]), ([boxF[6], boxF[7]]) ],score]
box_out=[ [ ( boxF[0], boxF[1]),([boxF[2], boxF[3]]), ([boxF[4], boxF[5]]), ([boxF[6], boxF[7]]) ],score,cls]
'''
if par['drawBox']:
tl = np.asarray([pred[0], pred[1]], np.float32)
tr = np.asarray([pred[2], pred[3]], np.float32)
br = np.asarray([pred[4], pred[5]], np.float32)
bl = np.asarray([pred[6], pred[7]], np.float32)
box = np.asarray([tl, tr, br, bl], np.int32)
bgColor=par['rainbows'][cls%len( par['rainbows'])]
label_array =par['label_array'][cls]
font=par['digitWordFont']
label_location=font['label_location']
ori_image=draw_painting_joint(box,ori_image,label_array,score=score,color=bgColor,font=font,socre_location=label_location)
'''
out_box.append(box_out)
t9 = time.time()
t10 = time.time()
infos=' preProcess:%.1f ToGPU:%.1f infer:%.1f decoder:%.1f, corr_change:%.1f nms:%.1f postProcess:%.1f, total process:%.1f '%( get_ms(t4,t2), get_ms(t5,t4),get_ms(t6,t5),get_ms(t6_1,t6),get_ms(t7,t6_1),get_ms(t8,t7) ,get_ms(t9,t8) ,get_ms(t9,t2) )
#'preProcess:%.1f ToGPU:%.1f infer:%.1f decoder:%.1f, corr_change:%.1f nms:%.1f postProcess:%.1f, total process:%.1f '%
#( get_ms(t4,t2), get_ms(t5,t4),get_ms(t6,t5),get_ms(t6_1,t6),get_ms(t7,t6_1), get_ms(t8,t7) ,get_ms(t9,t8) , get_ms(t9,t2) )
if len(out_box) > 0:
ret_4pts = np.array([ x[0] for x in out_box ] )
ret_4pts = rectangle_quadrangle_batch (ret_4pts)
cnt = len(out_box )
for ii in range(cnt):
out_box[ii][0] = ret_4pts[ii]
return [img_origin,ori_image, out_box,9999],infos
def draw_obb(preds,ori_image,par):
for pred in preds:
box = np.asarray(pred[0][0:4],np.int32)
cls = int(pred[2]);score = pred[1]
bgColor=par['rainbows'][cls%len( par['rainbows'])]
label_array =par['label_array'][cls]
font=par['digitWordFont']
label_location=font['label_location']
#print('###line285:',box,cls,score)
ori_image=draw_painting_joint(box,ori_image,label_array,score=score,color=bgColor,font=font,socre_location=label_location)
#cv2.imwrite( 'test.jpg',ori_image )
return ori_image
def OBB_tracker(sort_tracker,hbbs,obbs,iframe):
#sort_tracker--跟踪器
#hbbs--目标的水平框[x0,y0,x1,y1]
#obbs--目标的倾斜框box = np.asarray([tl, tr, br, bl], np.int32)
#返回值sort_tracker跟踪器
dets_to_sort = np.empty((0,7), dtype=np.float32)
# NOTE: We send in detected object class too
for x1,y1,x2,y2,conf, detclass in hbbs:
#print('#######line342:',x1,y1,x2,y2,img.shape,[x1, y1, x2, y2, conf, detclass,iframe])
dets_to_sort = np.vstack((dets_to_sort,
np.array([x1, y1, x2, y2, conf, detclass,iframe],dtype=np.float32) ))
# Run SORT
tracked_dets = deepcopy(sort_tracker.update(dets_to_sort,obbs) )
return tracked_dets
def rectangle_quadrangle(vectors):
##输入的是四个点偏离中心点的向量,(M,4,2)
##输出vectors--修正后的向量M,4,2
# wh_thetas--矩形的向量 (M,1,3)[w,h,theta]
distans = np.sqrt(np.sum(vectors**2,axis=2))#(M,4)
mean_dis = np.mean( distans,axis=1 ).reshape(-1,1) #(M,1)
mean_dis = np.tile(mean_dis,(1,4) ) #(M,4)
scale_factors = mean_dis/distans #(M,4)
scale_factors = np.expand_dims(scale_factors, axis=2 ) #(M,4,1)
scale_factors = np.tile(scale_factors, (1,1,2) ) #M(M,4,2)
vectors = vectors*scale_factors
vectors = vectors.astype(np.int32)
cnt = vectors.shape[0]
boxes = [ cv2.minAreaRect( vectors[i] ) for i in range(cnt) ]
wh_thetas = [[x[1][0],x[1][1],x[2] ] for x in boxes]#(M,3),[w,h,theta]
wh_thetas = np.array(wh_thetas)##(M,3)
return vectors,wh_thetas
def adjust_pts_orders(vectors):
#输入一系列(M,4,2)点
#输入原定框顺序的(M,4,2)
#前后两个四边形框一次判定,调整下一个四边形框内四个点的顺序,保证与上一个一致。
cnt = vectors.shape[0]
if cnt<=1: return vectors
else:
out=[];out.append(vectors[0])
for i in range(1,cnt):
pts1 = out[-1]
pts2 = vectors[i]
diss,min_dis,min_index,pts2_adjust = pts_setDistance(pts1,pts2)
#if min_index!=0: print(min_index,pts1,pts2 )
out.append(pts2_adjust)
out = np.array(out)
#if out[4,0,0]==53 and out[4,0,1]==10:
#print('#line339:',out.shape ,' ','in ', vectors.reshape(-1,8) , ' out :',out.reshape(-1,8))
return out
def pts_setDistance(pts1,pts2):
#输入是两个四边形的坐标4,2pts1保持不变pts2逐个调整顺序找到与pts2最匹配的四个点。
#输出pts2 原始的距离,最匹配点的距离,最匹配的点的序号
pts3=np.vstack((pts2,pts2))
diss =[np.sum((pts1-pts3[i:i+4])**2) for i in range(4)]
min_dis = min(diss)
min_index = diss.index(min_dis)
return diss[0],min_dis,min_index,pts3[min_index:min_index+4]
def obbPointsConvert(obbs):
obbArray = np.array(obbs)#( M,4,2)
#计算中心点
middlePts = np.mean( obbArray,axis=1 )##中心点M,2
middlePts = np.expand_dims(middlePts,axis=1)#(M,1,2)
#将中心点扩展成M,4,2
vectors = np.tile(middlePts,(1,4,1))#(M,4,2)
#计算偏移向量
vectors = obbArray - vectors #(M,4,2)
##校正偏移向量
vectors,wh_thetas=rectangle_quadrangle(vectors) #vectors--(M,4,2)
##校正每一个框内四个点的顺序
vectors = adjust_pts_orders(vectors) # (M,4,2)
#将中心点附在偏移向量后面
vectors = np.concatenate( (vectors,middlePts),axis=1 )#(M,5,2),
#将数据拉平
vectors = vectors.reshape(-1,10)#(M,10)
return vectors
def rectangle_quadrangle_batch(obbs):
##输入出四边形的四个点(M,4,2)
##输出是矩形话后的4个点M,4,2
obbArray = np.array(obbs)#( M,4,2)
#计算中心点
middlePts = np.mean( obbArray,axis=1 )##中心点M,2
middlePts = np.expand_dims(middlePts,axis=1)#(M,1,2)
#将中心点扩展成M,4,2
middlePts = np.tile(middlePts,(1,4,1))#(M,4,2)
#vectors = np.tile(middlePts,(1,4,1))#(M,4,2)
#计算偏移向量
vectors = obbArray - middlePts #(M,4,2)
##校正偏移向量
vectors,wh_thetas=rectangle_quadrangle(vectors) #vectors--(M,4,2)
vectors = vectors + middlePts
return vectors
def obbPointsConvert_reverse(vectors):
vectors = np.array(vectors)#(M,10)
_vectors = vectors[:,:8] #(M,8)
middlePts = vectors[:,8:10] #(M,2)
middlePts = np.tile( middlePts,(1,4) ) #(M,8)
_vectors += middlePts #(M,8)
return _vectors
def OBB_tracker_batch(imgarray_list,iframe_list,modelPar,obbModelPar,sort_tracker,trackPar,segPar=None):
'''
输入:
imgarray_list--图像列表
iframe_list -- 帧号列表
modelPar--模型参数,字典,modelPar={'det_Model':,'seg_Model':}
obbModelpar--字典,存放检测相关参数,'half', 'device', 'conf_thres', 'iou_thres','trtFlag_det'
sort_tracker--对象,初始化的跟踪对象。为了保持一致,即使是单帧也要有。
trackPar--跟踪参数关键字包括det_cntwindowsize
segPar--None,分割模型相关参数。如果用不到则为None
输入:[imgarray_list,track_det_result,detResults ] , timeInfos
# timeInfos---时间信息
# imgarray_list--图像列表
# track_det_result--numpy 格式(M,14)--( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 , 11, 12, 13 )
# (x0,y0,x1,y1,x2,y2,x3,y3,xc,yc,conf, detclass,iframe, trackId)
# detResults---给DSP的结果.每一帧是一个list内部每一个框时一个list格式为[ [(x0,y0),(x1,y1),(x2,y2),(x3,y3)],score,cls ] 2023.08.03,修改输出格式
'''
det_cnt,windowsize = trackPar['det_cnt'] ,trackPar['windowsize']
trackers_dic={}
index_list = list(range( 0, len(iframe_list) ,det_cnt ));
if len(index_list)>1 and index_list[-1]!= iframe_list[-1]:
index_list.append( len(iframe_list) - 1 )
#print('###line349:',index_list ,iframe_list)
if len(imgarray_list)==1: #如果是单帧图片,则不用跟踪
ori_image_list,infos = OBB_infer(modelPar['obbmodel'],imgarray_list[0],obbModelPar)
#print('##'*20,'line405:',np.array(ori_image_list[2]),ret_4pts )
return ori_image_list,infos
else:
timeInfos_track=''
t1=time.time()
for iframe_index, index_frame in enumerate(index_list):
ori_image_list,infos = OBB_infer(modelPar['obbmodel'],imgarray_list[index_frame],obbModelPar)
obbs = [x[0] for x in ori_image_list[2] ];hbbs = []
for i in range(len(ori_image_list[2])):
hbb=obbTohbb( ori_image_list[2][i][0] );
box=[ *hbb, ori_image_list[2][i][1],ori_image_list[2][i][2]]
hbbs.append(box)
tracked_dets = OBB_tracker(sort_tracker,hbbs,obbs,iframe_list[index_frame] )
tracks =sort_tracker.getTrackers()
tt=[tracker.id for tracker in tracks]
for tracker in tracks:
trackers_dic[tracker.id]=deepcopy(tracker)
t2=time.time()
track_det_result = np.empty((0,14))
###( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 , 11, 12, 13 )
###(x0,y0,x1,y1,x2,y2,x3,y3,xc,yc,conf, detclass,iframe, trackId)
trackIdIndex=13;frameIndex=12
#print('###line372:',list(trackers_dic.keys()))
for trackId in trackers_dic.keys():
tracker = trackers_dic[trackId]
obb_history = np.array(tracker.obb_history)
hbb_history = np.array(tracker.bbox_history)
#print('#'*20,obb_history.shape )
if len(obb_history)<2:
#print('#'*20, trackId, ' trace Cnt:',len(obb_history))
continue
#原来格式 np.asarray([tl, tr, br, bl], np.int32)--->中心点到tl, tr, br, bl的向量
#print('###line381: 插值转换前 obb_history:',obb_history.shape, ' trackId:',trackId, ' \n' ,obb_history.reshape(-1,8) )
obb_history = obbPointsConvert(obb_history) #(M,10)
#print('###line381: 插值前 obb_history:',obb_history.shape , ' hbb_history[:,4:7]:',hbb_history[:,4:7].shape, ' trackId:',trackId,'\n',obb_history)
arrays_box = np.concatenate( (obb_history,hbb_history[:,4:7]),axis=1)
arrays_box = arrays_box.transpose();frames=hbb_history[:,6]
#frame_min--表示该批次图片的起始帧,如该批次是[1,100],则frame_min=1[101,200]--frame_min=101
#frames[0]--表示该目标出现的起始帧,如[1,11,21,31,41],则frames[0]=1frames[0]可能会在frame_min之前出现即一个横跨了多个批次。
##如果要最小化插值范围,则取内区间[frame_min,则frame_max ]和[frames[0],frames[-1] ]的交集
#inter_frame_min = int(max(frame_min, frames[0])); inter_frame_max = int(min( frame_max, frames[-1] )) ##
##如果要求得到完整的目标轨迹,则插值区间要以目标出现的起始点为准
inter_frame_min=int(frames[0]);inter_frame_max=int(frames[-1])
new_frames= np.linspace(inter_frame_min,inter_frame_max,inter_frame_max-inter_frame_min+1 )
#print('###line389:',trackId, inter_frame_min,inter_frame_max ,frames)
#print(' ##line396: 插值前:' ,arrays_box)
f_linear = interpolate.interp1d(frames,arrays_box); interpolation_x0s = (f_linear(new_frames)).transpose()
move_cnt_use =(len(interpolation_x0s)+1)//2*2-1 if len(interpolation_x0s)<windowsize else windowsize
###将坐标tl, tr, br, bl的向量--->[tl, tr, br, bl]
interpolation_x0s[:,0:8] = obbPointsConvert_reverse(interpolation_x0s[:,0:10] )
#print('##line403: 插值转换后: ',interpolation_x0s.shape, inter_frame_min,inter_frame_max,frames, '\n',interpolation_x0s )
#for im in range(10):
# interpolation_x0s[:,im] = moving_average_wang(interpolation_x0s[:,im],move_cnt_use )
cnt = inter_frame_max-inter_frame_min+1; trackIds = np.zeros((cnt,1)) + trackId
interpolation_x0s = np.hstack( (interpolation_x0s, trackIds ) )
track_det_result = np.vstack(( track_det_result, interpolation_x0s) )
detResults=[]
for iiframe in iframe_list:
boxes_oneFrame = track_det_result[ track_det_result[:,frameIndex]==iiframe ]
###( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 , 11, 12, 13 )
###(x0,y0,x1,y1,x2,y2,x3,y3,xc,yc,conf, detclass,iframe, trackId)
res = [ [ [(b[0],b[1]),(b[2],b[3]),(b[4],b[5]),(b[6],b[7])],b[10],b[11],b[12],b[13] ]
for b in boxes_oneFrame]
detResults.append( res )
t3 = time.time()
timeInfos='%d frames,detect and track:%.1f ,interpolation:%.1f '%( len(index_list), get_ms(t2,t1),get_ms(t3,t2) )
retResults=[imgarray_list,track_det_result,detResults ]
return retResults, timeInfos