|
- from __future__ import print_function
-
- import os
- import numpy as np
- import matplotlib
- matplotlib.use('Agg')
- import matplotlib.pyplot as plt
- import matplotlib.patches as patches
- from skimage import io
-
- import glob
- import time,cv2
- import argparse
- from filterpy.kalman import KalmanFilter
- from PIL import Image,ImageDraw,ImageFont
-
- np.random.seed(0)
- '''
- def plot_one_box_ForTrack(x, im, color=None, label=None, line_thickness=3):
- # Plots one bounding box on image 'im' using OpenCV
- assert im.data.contiguous, 'Image not contiguous. Apply np.ascontiguousarray(im) to plot_on_box() input image.'
- tl = line_thickness or round(0.002 * (im.shape[0] + im.shape[1]) / 2) + 1 # line/font thickness
- color = color or [random.randint(0, 255) for _ in range(3)]
- c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
- cv2.rectangle(im, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
-
- if label:
- tf = max(tl - 1, 1) # font thickness
- t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
- c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
- cv2.rectangle(im, c1, c2, color, -1, cv2.LINE_AA) # filled
- cv2.putText(im, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
- '''
- def plot_one_box_ForTrack(box, im, color=None, label=None, line_thickness=None):
- # Plots one bounding box on image 'im' using PIL
- im = Image.fromarray(im)
- draw = ImageDraw.Draw(im)
- line_thickness = line_thickness or max(int(min(im.size) / 200), 2)
-
- draw.rectangle([(box[0],box[1]),(box[2],box[3])], width=line_thickness, outline=tuple(color)) # plot
-
-
- if label:
- tmax = min(round(max(im.size) / 40),20)
- fontsize = max(tmax, 12)
- font = ImageFont.truetype("../AIlib2/conf/platech.ttf", fontsize,encoding='utf-8')
- txt_width, txt_height = font.getsize(label)
- draw.rectangle([box[0], box[1] - txt_height + 4, box[0] + txt_width, box[1]], fill=tuple(color))
- draw.text((box[0], box[1] - txt_height + 1), label, fill=(255, 255, 255), font=font)
-
- im_array = np.asarray(im)
-
- return im_array
-
- def drawBoxTraceSimplied(track_det_result,iiframe, img_draw,rainbows=None,boxFlag=True,traceFlag=True,names=[]):
-
- boxes_oneFrame = track_det_result[ track_det_result[:,6]==iiframe ]
-
- if boxFlag:
- ###在某一帧上,画上检测框
- for box in boxes_oneFrame:
- x0,y0,x1,y1,conf,cls = box[0:6]
-
- #cv2.rectangle(img_draw, ( int(x0), int(y0) ), ( int(x1), int(y1) ), (255,0,20), 2)
- if len(names)==0:
- txtstring='%d:%.2f'%(cls,conf)
- else: txtstring='%s:%.2f'%(names[int(cls)],conf)
- img_draw = plot_one_box_ForTrack( box[0:4], img_draw, color=rainbows[ int(cls)], label=txtstring, line_thickness=3)
-
- if traceFlag:
- ###在某一帧上,画上轨迹
- track_ids = boxes_oneFrame[:,7].tolist()
- boxes_before_oneFrame = track_det_result[ track_det_result[:,6]<=iiframe ]
- for trackId in track_ids:
- boxes_before_oneFrame_oneId = boxes_before_oneFrame[boxes_before_oneFrame[:,7]==trackId]
- xcs = (boxes_before_oneFrame_oneId[:,0]+boxes_before_oneFrame_oneId[:,2])//2
- ycs = (boxes_before_oneFrame_oneId[:,1]+boxes_before_oneFrame_oneId[:,3])//2
-
- [cv2.line(img_draw, ( int(xcs[i]) , int(ycs[i]) ),
- ( int(xcs[i+1]),int(ycs[i+1]) ),(255,0,0), thickness=2)
- for i,_ in enumerate(xcs) if i < len(xcs)-1 ]
-
- return img_draw
-
-
- def moving_average_wang(interval, windowsize):
- outNum = interval.copy()
- if windowsize==1:
- return outNum
- assert windowsize%2!=0
- window = np.ones(int(windowsize)) / float(windowsize)
- re = np.convolve(interval, window, 'valid')
- cnt = int((windowsize - 1)/2+0.5)
- total = len(interval)
- outNum = np.zeros( (total,),dtype=np.float32 )
- outNum[0]=interval[0]
- outNum[-1]=interval[-1]
- for i in range(1,cnt):
- outNum[i] = np.mean( interval[0:2*i-1] )
- outNum[-i-1] = np.mean( interval[-2*i-1:] )
- #print('###line113:',outNum.shape,re.shape,cnt,windowsize)
- outNum[cnt:-cnt]=re[:]
- return outNum
-
- def track_draw_trace(tracks,im0):
- for track in tracks:
- [cv2.line(im0, (int(track.centroidarr[i][0]),
- int(track.centroidarr[i][1])),
- (int(track.centroidarr[i+1][0]),
- int(track.centroidarr[i+1][1])),
- (255,0,0), thickness=2)
- for i,_ in enumerate(track.centroidarr)
- if i < len(track.centroidarr)-1 ]
-
-
- return im0
-
- """Function to Draw Bounding boxes"""
- def track_draw_boxes(img, bbox, identities=None, categories=None, names=None ):
- for i, box in enumerate(bbox):
- #print('####line33 sort.py:',box)
- x1, y1, x2, y2 = [int(x) for x in box]
-
- cat = int(categories[i]) if categories is not None else 0
- id = int(identities[i]) if identities is not None else 0
- data = (int((box[0]+box[2])/2),(int((box[1]+box[3])/2)))
- label = str(id) + ":"+ names[cat]
- (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
- cv2.rectangle(img, (x1, y1), (x2, y2), (255,0,20), 2)
- cv2.rectangle(img, (x1, y1 - 20), (x1 + w, y1), (255,144,30), -1)
- cv2.putText(img, label, (x1, y1 - 5),cv2.FONT_HERSHEY_SIMPLEX,
- 0.6, [255, 255, 255], 1)
- # cv2.circle(img, data, 6, color,-1) #centroid of box
-
-
- return img
-
- def track_draw_all_boxes(tracked_dets,im0,names):
- if len(tracked_dets)>0:
- bbox_xyxy = tracked_dets[:,:4]
- identities = tracked_dets[:, 8]
- categories = tracked_dets[:, 4]
- track_draw_boxes(im0, bbox_xyxy, identities, categories, names)
- return im0
- ####轨迹采用跟踪链中的结果。box采用track.update后的结果。
- def track_draw_boxAndTrace(tracked_dets,tracks,im0,names):
- track_draw_all_boxes(tracked_dets,im0,names)
- track_draw_trace(tracks,im0)
- return im0
-
- ####轨迹和box都采用跟踪链中的结果
- def track_draw_trace_boxes(tracks,im0,names):
- for track in tracks:
- [cv2.line(im0, (int(track.centroidarr[i][0]),
- int(track.centroidarr[i][1])),
- (int(track.centroidarr[i+1][0]),
- int(track.centroidarr[i+1][1])),
- (255,0,0), thickness=2)
- for i,_ in enumerate(track.centroidarr)
- if i < len(track.centroidarr)-1 ]
- bbox_xyxy = track.bbox_history[-1][0:4]
- identities,categories = track.id , track.detclass
- #print('####sort.py line74:',bbox_xyxy)
- track_draw_boxes(im0, [bbox_xyxy], [identities], [categories], names)
-
-
- return im0
-
- def linear_assignment(cost_matrix):
- try:
- import lap #linear assignment problem solver
- _, x, y = lap.lapjv(cost_matrix, extend_cost = True)
- return np.array([[y[i],i] for i in x if i>=0])
- except ImportError:
- from scipy.optimize import linear_sum_assignment
- x,y = linear_sum_assignment(cost_matrix)
- return np.array(list(zip(x,y)))
-
-
- """From SORT: Computes IOU between two boxes in the form [x1,y1,x2,y2]"""
- def iou_batch(bb_test, bb_gt):
-
- bb_gt = np.expand_dims(bb_gt, 0)
- bb_test = np.expand_dims(bb_test, 1)
-
- xx1 = np.maximum(bb_test[...,0], bb_gt[..., 0])
- yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
- xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
- yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
- w = np.maximum(0., xx2 - xx1)
- h = np.maximum(0., yy2 - yy1)
- wh = w * h
- o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
- + (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)
- return(o)
-
-
- """Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form [x,y,s,r] where x,y is the center of the box and s is the scale/area and r is the aspect ratio"""
- def convert_bbox_to_z(bbox):
- w = bbox[2] - bbox[0]
- h = bbox[3] - bbox[1]
- x = bbox[0] + w/2.
- y = bbox[1] + h/2.
- s = w * h
- #scale is just area
- r = w / float(h)
- return np.array([x, y, s, r]).reshape((4, 1))
-
-
- """Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
- [x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right"""
- def convert_x_to_bbox(x, score=None):
- w = np.sqrt(x[2] * x[3])
- h = x[2] / w
- if(score==None):
- return np.array([x[0]-w/2.,x[1]-h/2.,x[0]+w/2.,x[1]+h/2.]).reshape((1,4))
- else:
- return np.array([x[0]-w/2.,x[1]-h/2.,x[0]+w/2.,x[1]+h/2.,score]).reshape((1,5))
-
- """This class represents the internal state of individual tracked objects observed as bbox."""
- class KalmanBoxTracker(object):
-
- count = 0
- def __init__(self, bbox):
- """
- Initialize a tracker using initial bounding box
-
- Parameter 'bbox' must have 'detected class' int number at the -1 position.
- """
- self.kf = KalmanFilter(dim_x=7, dim_z=4)
- self.kf.F = np.array([[1,0,0,0,1,0,0],[0,1,0,0,0,1,0],[0,0,1,0,0,0,1],[0,0,0,1,0,0,0],[0,0,0,0,1,0,0],[0,0,0,0,0,1,0],[0,0,0,0,0,0,1]])
- self.kf.H = np.array([[1,0,0,0,0,0,0],[0,1,0,0,0,0,0],[0,0,1,0,0,0,0],[0,0,0,1,0,0,0]])
-
- self.kf.R[2:,2:] *= 10. # R: Covariance matrix of measurement noise (set to high for noisy inputs -> more 'inertia' of boxes')
- self.kf.P[4:,4:] *= 1000. #give high uncertainty to the unobservable initial velocities
- self.kf.P *= 10.
- self.kf.Q[-1,-1] *= 0.5 # Q: Covariance matrix of process noise (set to high for erratically moving things)
- self.kf.Q[4:,4:] *= 0.5
-
- self.kf.x[:4] = convert_bbox_to_z(bbox) # STATE VECTOR
- self.time_since_update = 0
- self.id = KalmanBoxTracker.count
- KalmanBoxTracker.count += 1
- self.history = []
- self.hits = 0
- self.hit_streak = 0
- self.age = 0
- self.frames = []
- self.centroidarr = []
- CX = (bbox[0]+bbox[2])//2
- CY = (bbox[1]+bbox[3])//2
- self.centroidarr.append((CX,CY))
- #keep yolov5 detected class information
- self.detclass = bbox[5]
- self.frames.append( bbox[6] ) ###new added for interpolation
- # If we want to store bbox
-
- self.bbox_history = [bbox]
-
- def update(self, bbox):
- """
- Updates the state vector with observed bbox
- """
- self.time_since_update = 0
- self.history = []
- self.hits += 1
- self.hit_streak += 1
- self.kf.update(convert_bbox_to_z(bbox))
- self.detclass = bbox[5]
- CX = (bbox[0]+bbox[2])//2
- CY = (bbox[1]+bbox[3])//2
- self.centroidarr.append((CX,CY))
- self.frames.append( bbox[6] ) ###new added for interpolation
- self.bbox_history.append(bbox)
-
- def predict(self):
- """
- Advances the state vector and returns the predicted bounding box estimate
- """
- if((self.kf.x[6]+self.kf.x[2])<=0):
- self.kf.x[6] *= 0.0
- self.kf.predict()
- self.age += 1
- if(self.time_since_update>0):
- self.hit_streak = 0
- self.time_since_update += 1
- self.history.append(convert_x_to_bbox(self.kf.x))
- # bbox=self.history[-1]
- # CX = (bbox[0]+bbox[2])/2
- # CY = (bbox[1]+bbox[3])/2
- # self.centroidarr.append((CX,CY))
-
- return self.history[-1]
-
-
- def get_state(self):
- """
- Returns the current bounding box estimate
- # test
- arr1 = np.array([[1,2,3,4]])
- arr2 = np.array([0])
- arr3 = np.expand_dims(arr2, 0)
- np.concatenate((arr1,arr3), axis=1)
- """
- arr_detclass = np.expand_dims(np.array([self.detclass]), 0)
-
- arr_u_dot = np.expand_dims(self.kf.x[4],0)
- arr_v_dot = np.expand_dims(self.kf.x[5],0)
- arr_s_dot = np.expand_dims(self.kf.x[6],0)
-
- return np.concatenate((convert_x_to_bbox(self.kf.x), arr_detclass, arr_u_dot, arr_v_dot, arr_s_dot), axis=1)
-
- def associate_detections_to_trackers(detections, trackers, iou_threshold = 0.3):
- """
- Assigns detections to tracked object (both represented as bounding boxes)
- Returns 3 lists of
- 1. matches,
- 2. unmatched_detections
- 3. unmatched_trackers
- """
- if(len(trackers)==0):
- return np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,5),dtype=int)
-
- iou_matrix = iou_batch(detections, trackers)
-
- if min(iou_matrix.shape) > 0:
- a = (iou_matrix > iou_threshold).astype(np.int32)
- if a.sum(1).max() == 1 and a.sum(0).max() ==1:
- matched_indices = np.stack(np.where(a), axis=1)
- else:
- matched_indices = linear_assignment(-iou_matrix)
- else:
- matched_indices = np.empty(shape=(0,2))
-
- unmatched_detections = []
- for d, det in enumerate(detections):
- if(d not in matched_indices[:,0]):
- unmatched_detections.append(d)
-
- unmatched_trackers = []
- for t, trk in enumerate(trackers):
- if(t not in matched_indices[:,1]):
- unmatched_trackers.append(t)
-
- #filter out matched with low IOU
- matches = []
- for m in matched_indices:
- if(iou_matrix[m[0], m[1]]<iou_threshold):
- unmatched_detections.append(m[0])
- unmatched_trackers.append(m[1])
- else:
- matches.append(m.reshape(1,2))
-
- if(len(matches)==0):
- matches = np.empty((0,2), dtype=int)
- else:
- matches = np.concatenate(matches, axis=0)
-
- return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
-
-
- class Sort(object):
- # def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
- def __init__(self, max_age=1, min_hits=1000, iou_threshold=0.1):
- """
- Parameters for SORT
- """
- self.max_age = max_age # 最大检测数:目标未被检测到的帧数,超过之后会被删
- self.min_hits = min_hits # 目标命中的最小次数,小于该次数不返回
- self.iou_threshold = iou_threshold
- self.trackers = []
- self.frame_count = 0
- def getTrackers(self,):
- return self.trackers
-
- def update(self, dets= np.empty((0,6))):
- """
- Parameters:
- 'dets' - a numpy array of detection in the format [[x1, y1, x2, y2, score], [x1,y1,x2,y2,score],...]
-
- Ensure to call this method even frame has no detections. (pass np.empty((0,5)))
-
- Returns a similar array, where the last column is object ID (replacing confidence score)
-
- NOTE: The number of objects returned may differ from the number of objects provided.
- """
- self.frame_count += 1
-
- # 在当前帧逐个预测轨迹位置,记录状态异常的跟踪器索引
- # 根据当前所有的卡尔曼跟踪器个数(即上一帧中跟踪的目标个数)创建二维数组:行号为卡尔曼滤波器的标识索引,列向量为跟踪
-
- # Get predicted locations from existing trackers
- trks = np.zeros((len(self.trackers), 6)) # 存储跟踪器的预测
- to_del = [] # 存储要删除的目标框
- ret = [] # 存储要返回的追踪目标框
- # 循环遍历卡尔曼跟踪器列表
- for t, trk in enumerate(trks):
- # 使用卡尔曼跟踪器t产生对应目标的跟踪框
- pos = self.trackers[t].predict()[0]
- # 遍历完成后,trk中存储了上一帧中跟踪的目标的预测跟踪框
- trk[:] = [pos[0], pos[1], pos[2], pos[3], 0, 0]
- # 如果跟踪框中包含空值则将该跟踪框添加到要删除的列表中
- if np.any(np.isnan(pos)):
- to_del.append(t)
- # numpy.ma.masked_invalid 屏蔽出现无效值的数组(NaN 或 inf)
- # numpy.ma.compress_rows 压缩包含掩码值的2-D 数组的整行,将包含掩码值的整行去除
- # trks中存储了上一帧中跟踪的目标并且在当前帧中的预测跟踪框
- trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
- # 逆向删除异常的跟踪器,防止破坏索引
- for t in reversed(to_del):
- self.trackers.pop(t)
- # 将目标检测框与卡尔曼滤波器预测的跟踪框关联获取跟踪成功的目标,新增的目标,离开画面的目标
-
- matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks, self.iou_threshold)
-
- # 将跟踪成功的目标框更新到对应的卡尔曼滤波器
- # Update matched trackers with assigned detections
- for m in matched:
- self.trackers[m[1]].update(dets[m[0], :])
-
- # 为新增的目标创建新的卡尔曼滤波器对象进行跟踪
- # Create and initialize new trackers for unmatched detections
- for i in unmatched_dets:
- trk = KalmanBoxTracker(np.hstack(dets[i,:]))
-
- #trk = KalmanBoxTracker(np.hstack( (dets[i,:], np.array([0] )) ) ) ##初始化多了一个数,可能是为了标记说明这是第一次出现,box有7个数
- #print(' ###line271: ', np.hstack((dets[i,:], np.array([0])) ).shape)
-
- self.trackers.append(trk)
-
-
-
- # 自后向前遍历,仅返回在当前帧出现且命中周期大于self.min_hits(除非跟踪刚开始)的跟踪结果;如果未命中时间大于self.max_age则删除跟踪器。
- # hit_streak忽略目标初始的若干帧
- i = len(self.trackers)
- for trk in reversed(self.trackers):
- # 返回当前边界框的估计值
- d = trk.get_state()[0]
- # 跟踪成功目标的box与id放入ret列表中
- if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
- ret.append(np.concatenate((d, [trk.id+1])).reshape(1,-1)) #+1'd because MOT benchmark requires positive value
- i -= 1
- #remove dead tracklet
- # 跟踪失败或离开画面的目标从卡尔曼跟踪器中删除
- if(trk.time_since_update >self.max_age):
- self.trackers.pop(i) #pop按键或索引位置删除对应元素
- # 返回当前画面中所有目标的box与id,以二维矩阵形式返回
- if(len(ret) > 0):
- #print('####sort.py line282:',len(ret),ret[0].shape, (np.concatenate(ret)).shape)
- return np.concatenate(ret)
- return np.empty((0,6))
-
- def parse_args():
- """Parse input arguments."""
- parser = argparse.ArgumentParser(description='SORT demo')
- parser.add_argument('--display', dest='display', help='Display online tracker output (slow) [False]',action='store_true')
- parser.add_argument("--seq_path", help="Path to detections.", type=str, default='data')
- parser.add_argument("--phase", help="Subdirectory in seq_path.", type=str, default='train')
- parser.add_argument("--max_age",
- help="Maximum number of frames to keep alive a track without associated detections.",
- type=int, default=1)
- parser.add_argument("--min_hits",
- help="Minimum number of associated detections before track is initialised.",
- type=int, default=3)
- parser.add_argument("--iou_threshold", help="Minimum IOU for match.", type=float, default=0.3)
- args = parser.parse_args()
- return args
-
- if __name__ == '__main__':
- # all train
- args = parse_args()
- display = args.display
- phase = args.phase
- total_time = 0.0
- total_frames = 0
- colours = np.random.rand(32, 3) #used only for display
- if(display):
- if not os.path.exists('mot_benchmark'):
- print('\n\tERROR: mot_benchmark link not found!\n\n Create a symbolic link to the MOT benchmark\n (https://motchallenge.net/data/2D_MOT_2015/#download). E.g.:\n\n $ ln -s /path/to/MOT2015_challenge/2DMOT2015 mot_benchmark\n\n')
- exit()
- plt.ion()
- fig = plt.figure()
- ax1 = fig.add_subplot(111, aspect='equal')
-
- if not os.path.exists('output'):
- os.makedirs('output')
- pattern = os.path.join(args.seq_path, phase, '*', 'det', 'det.txt')
- for seq_dets_fn in glob.glob(pattern):
- mot_tracker = Sort(max_age=args.max_age,
- min_hits=args.min_hits,
- iou_threshold=args.iou_threshold) #create instance of the SORT tracker
- seq_dets = np.loadtxt(seq_dets_fn, delimiter=',')
- seq = seq_dets_fn[pattern.find('*'):].split(os.path.sep)[0]
-
- with open(os.path.join('output', '%s.txt'%(seq)),'w') as out_file:
- print("Processing %s."%(seq))
- for frame in range(int(seq_dets[:,0].max())):
- frame += 1 #detection and frame numbers begin at 1
- dets = seq_dets[seq_dets[:, 0]==frame, 2:7]
- dets[:, 2:4] += dets[:, 0:2] #convert to [x1,y1,w,h] to [x1,y1,x2,y2]
- total_frames += 1
-
- if(display):
- fn = os.path.join('mot_benchmark', phase, seq, 'img1', '%06d.jpg'%(frame))
- im =io.imread(fn)
- ax1.imshow(im)
- plt.title(seq + ' Tracked Targets')
-
- start_time = time.time()
- trackers = mot_tracker.update(dets)
- cycle_time = time.time() - start_time
- total_time += cycle_time
-
- for d in trackers:
- print('%d,%d,%.2f,%.2f,%.2f,%.2f,1,-1,-1,-1'%(frame,d[4],d[0],d[1],d[2]-d[0],d[3]-d[1]),file=out_file)
- if(display):
- d = d.astype(np.int32)
- ax1.add_patch(patches.Rectangle((d[0],d[1]),d[2]-d[0],d[3]-d[1],fill=False,lw=3,ec=colours[d[4]%32,:]))
-
- if(display):
- fig.canvas.flush_events()
- plt.draw()
- ax1.cla()
-
- print("Total Tracking took: %.3f seconds for %d frames or %.1f FPS" % (total_time, total_frames, total_frames / total_time))
-
- if(display):
- print("Note: to get real runtime results run without the option: --display")
|