Toward parking slot detection

This commit is contained in:
Teoge 2018-10-02 15:54:42 +08:00
parent 48296591cc
commit 97c1df51f7
12 changed files with 526 additions and 179 deletions

View File

@ -3,10 +3,14 @@ import argparse
INPUT_IMAGE_SIZE = 512
# 0: confidence, 1: offset_x, 2: offset_y, 3: cos(direction), 4: sin(direction)
NUM_FEATURE_MAP_CHANNEL = 5
# 0: confidence, 1: point_shape, 2: offset_x, 3: offset_y, 4: cos(direction),
# 5: sin(direction)
NUM_FEATURE_MAP_CHANNEL = 6
# image_size / 2^5 = 512 / 32 = 16
FEATURE_MAP_SIZE = 16
# Thresholds to determine whether a detected point matches ground truth.
SQUARED_DISTANCE_THRESH = 0.0003
DIRECTION_ANGLE_THRESH = 0.5
def add_common_arguments(parser):
@ -17,7 +21,7 @@ def add_common_arguments(parser):
help="Depth factor.")
parser.add_argument('--disable_cuda', action='store_true',
help="Disable CUDA.")
parser.add_argument('--gpu_id', type=int, default=1,
parser.add_argument('--gpu_id', type=int, default=0,
help="Select which gpu to use.")
@ -28,8 +32,10 @@ def get_parser_for_training():
help="The location of dataset.")
parser.add_argument('--optimizer_weights',
help="The weights of optimizer.")
parser.add_argument('--batch_size', type=int, default=16,
parser.add_argument('--batch_size', type=int, default=24,
help="Batch size.")
parser.add_argument('--data_loading_workers', type=int, default=24,
help="Number of workers for data loading.")
parser.add_argument('--num_epochs', type=int, default=100,
help="Number of epochs to train for.")
parser.add_argument('--lr', type=float, default=1e-3,
@ -40,6 +46,20 @@ def get_parser_for_training():
return parser
def get_parser_for_evaluation():
    """Return argument parser for testing."""
    arg_parser = argparse.ArgumentParser()
    # Evaluation needs the data location plus loader sizing and a Visdom toggle.
    arg_parser.add_argument('--dataset_directory', required=True,
                            help="The location of dataset.")
    arg_parser.add_argument('--batch_size', type=int, default=24,
                            help="Batch size.")
    arg_parser.add_argument('--data_loading_workers', type=int, default=24,
                            help="Number of workers for data loading.")
    arg_parser.add_argument('--enable_visdom', action='store_true',
                            help="Enable Visdom to visualize training progress")
    return arg_parser
def get_parser_for_inference():
"""Return argument parser for inference."""
parser = argparse.ArgumentParser()

126
data.py
View File

@ -1,35 +1,99 @@
# -*- coding: utf-8 -*-
import os
import os.path
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
from collections import namedtuple
import math
import torch
import config
class ParkingSlotDataset(Dataset):
"""Parking slot dataset."""
def __init__(self, root):
super(ParkingSlotDataset, self).__init__()
self.root = root
self.sample_names = []
self.image_transform = transforms.Compose([
transforms.Resize((512, 512)),
transforms.ToTensor(),
])
for file in os.listdir(root):
if file.endswith(".txt"):
self.sample_names.append(os.path.splitext(file)[0])
# One labelled/detected slot corner: (x, y) normalized to [0, 1] (see
# get_predicted_points / generalize_marks), direction in radians (atan2),
# and the point-shape label.
MarkingPoint = namedtuple('MarkingPoint', ['x', 'y', 'direction', 'shape'])
# A parking slot described by the coordinates of its two marking points.
Slot = namedtuple('Slot', ['x1', 'y1', 'x2', 'y2'])
def __getitem__(self, index):
name = self.sample_names[index]
image = Image.open(os.path.join(self.root, name+'.bmp'))
image = self.image_transform(image)
marking_points = []
with open(os.path.join(self.root, name+'.txt'), 'r') as file:
for line in file:
marking_point = tuple([float(n) for n in line.split()])
marking_points.append(marking_point)
return image, marking_points
def __len__(self):
return len(self.sample_names)
def generate_objective(marking_points_batch, device):
    """Get regression objective and gradient for directional point detector.

    Args:
        marking_points_batch: per-sample lists of MarkingPoint ground truths.
        device: torch device the target tensors are allocated on.

    Returns:
        (objective, gradient) tensors of shape
        (batch, NUM_FEATURE_MAP_CHANNEL, FEATURE_MAP_SIZE, FEATURE_MAP_SIZE).
        ``gradient`` masks the loss: confidence (channel 0) is supervised
        everywhere, the remaining channels only in cells holding a point.
    """
    batch_size = len(marking_points_batch)
    objective = torch.zeros(batch_size, config.NUM_FEATURE_MAP_CHANNEL,
                            config.FEATURE_MAP_SIZE, config.FEATURE_MAP_SIZE,
                            device=device)
    gradient = torch.zeros_like(objective)
    gradient[:, 0].fill_(1.)
    for batch_idx, marking_points in enumerate(marking_points_batch):
        for marking_point in marking_points:
            # Use the configured grid size instead of a hard-coded 16 so the
            # targets stay consistent with config.FEATURE_MAP_SIZE.
            col = math.floor(marking_point.x * config.FEATURE_MAP_SIZE)
            row = math.floor(marking_point.y * config.FEATURE_MAP_SIZE)
            # Confidence Regression
            objective[batch_idx, 0, row, col] = 1.
            # Marking Point Shape Regression
            objective[batch_idx, 1, row, col] = marking_point.shape
            # Offset Regression: fractional position inside the cell.
            objective[batch_idx, 2, row, col] = \
                marking_point.x * config.FEATURE_MAP_SIZE - col
            objective[batch_idx, 3, row, col] = \
                marking_point.y * config.FEATURE_MAP_SIZE - row
            # Direction Regression as a unit vector (avoids angle wrap-around).
            direction = marking_point.direction
            objective[batch_idx, 4, row, col] = math.cos(direction)
            objective[batch_idx, 5, row, col] = math.sin(direction)
            # Assign Gradient
            gradient[batch_idx, 1:6, row, col].fill_(1.)
    return objective, gradient
def non_maximum_suppression(pred_points):
    """Suppress the lower-confidence point of every near-duplicate pair.

    ``pred_points`` is a list of (confidence, MarkingPoint) tuples; the
    original list object is returned unchanged when nothing is suppressed.
    """
    num_points = len(pred_points)
    suppressed = [False] * num_points
    for i in range(num_points - 1):
        conf_i, point_i = pred_points[i]
        for j in range(i + 1, num_points):
            conf_j, point_j = pred_points[j]
            # TODO: recalculate following parameter
            # minimum distance in training set: 40.309
            # (40.309 / 600)^2 = 0.004513376
            if cal_squre_dist(point_i, point_j) < 0.0045:
                suppressed[i if conf_i < conf_j else j] = True
    if not any(suppressed):
        return pred_points
    return [point for point, drop in zip(pred_points, suppressed) if not drop]
def get_predicted_points(prediction, thresh):
    """Collect (confidence, MarkingPoint) pairs from one predicted feature map.

    Cells whose confidence channel reaches ``thresh`` are decoded: offsets
    recover the normalized position, atan2 of the sin/cos channels the
    direction. Near-duplicates are removed by non-maximum suppression.
    """
    assert isinstance(prediction, torch.Tensor)
    feature_map = prediction.detach().cpu().numpy()
    height, width = feature_map.shape[1], feature_map.shape[2]
    predicted_points = []
    for row in range(height):
        for col in range(width):
            confidence = feature_map[0, row, col]
            if confidence < thresh:
                continue
            xval = (col + feature_map[2, row, col]) / width
            yval = (row + feature_map[3, row, col]) / height
            direction = math.atan2(feature_map[5, row, col],
                                   feature_map[4, row, col])
            point = MarkingPoint(xval, yval, direction,
                                 feature_map[1, row, col])
            predicted_points.append((confidence, point))
    return non_maximum_suppression(predicted_points)
def cal_squre_dist(point_a, point_b):
    """Return the squared Euclidean distance between two marking points."""
    return (point_a.x - point_b.x) ** 2 + (point_a.y - point_b.y) ** 2
def cal_direction_angle(point_a, point_b):
    """Return the absolute angle between two directions, wrapped to [0, pi]."""
    delta = abs(point_a.direction - point_b.direction)
    # Take the short way around the circle.
    return 2 * math.pi - delta if delta > math.pi else delta
def match_marking_points(point_a, point_b):
    """Tell whether a detected point and a ground truth are the same point.

    Both the position (squared distance) and the direction must fall under
    the thresholds from ``config``.
    """
    if cal_squre_dist(point_a, point_b) >= config.SQUARED_DISTANCE_THRESH:
        return False
    return cal_direction_angle(point_a, point_b) < config.DIRECTION_ANGLE_THRESH

33
dataset.py Normal file
View File

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
import json
import os
import os.path
import cv2 as cv
from torch.utils.data import Dataset
from torchvision import transforms
from data import MarkingPoint
class ParkingSlotDataset(Dataset):
    """Parking slot dataset of paired ``<name>.jpg`` / ``<name>.json`` files."""
    def __init__(self, root):
        super(ParkingSlotDataset, self).__init__()
        self.root = root
        self.image_transform = transforms.ToTensor()
        # Every .json label file in root defines one sample; the image
        # shares the same stem.
        self.sample_names = [os.path.splitext(entry)[0]
                             for entry in os.listdir(root)
                             if entry.endswith(".json")]
    def __getitem__(self, index):
        name = self.sample_names[index]
        image = self.image_transform(
            cv.imread(os.path.join(self.root, name + '.jpg')))
        with open(os.path.join(self.root, name + '.json'), 'r') as label_file:
            marking_points = [MarkingPoint(*label)
                              for label in json.load(label_file)]
        return image, marking_points
    def __len__(self):
        return len(self.sample_names)

View File

@ -57,7 +57,9 @@ class DirectionalPointDetector(nn.modules.Module):
def forward(self, *x):
feature = self.extract_feature(x[0])
prediction = self.predict(feature)
point_pred, angle_pred = torch.split(prediction, 3, dim=1)
point_pred = nn.functional.sigmoid(point_pred)
angle_pred = nn.functional.tanh(angle_pred)
# 4 represents that there are 4 value: confidence, shape, offset_x,
# offset_y, whose range is between [0, 1].
point_pred, angle_pred = torch.split(prediction, 4, dim=1)
point_pred = torch.sigmoid(point_pred)
angle_pred = torch.tanh(angle_pred)
return torch.cat((point_pred, angle_pred), dim=1)

59
evaluate.py Normal file
View File

@ -0,0 +1,59 @@
"""Evaluate directional marking point detector."""
import torch
from torch.utils.data import DataLoader
from precision_recall import calc_average_precision
from precision_recall import calc_precision_recall
import config
from data import generate_objective
from data import get_predicted_points
from data import match_marking_points
from dataset import ParkingSlotDataset
from detector import DirectionalPointDetector
from log import Logger
def evaluate_detector(args):
    """Evaluate directional point detector."""
    # Fall back to CPU when CUDA is disabled or unavailable.
    args.cuda = not args.disable_cuda and torch.cuda.is_available()
    device = torch.device('cuda:'+str(args.gpu_id) if args.cuda else 'cpu')
    dp_detector = DirectionalPointDetector(
        3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device)
    # NOTE(review): assumes the evaluation parser supplies --detector_weights
    # (not visible in get_parser_for_evaluation) -- confirm.
    if args.detector_weights:
        dp_detector.load_state_dict(torch.load(args.detector_weights))
    # collate_fn keeps the variable-length marking point lists as plain
    # Python lists instead of trying to stack them into tensors.
    data_loader = DataLoader(ParkingSlotDataset(args.dataset_directory),
                             batch_size=args.batch_size, shuffle=True,
                             num_workers=args.data_loading_workers,
                             collate_fn=lambda x: list(zip(*x)))
    logger = Logger()
    total_loss = 0
    num_evaluation = 0
    ground_truths_list = []
    predictions_list = []
    # NOTE(review): the detector is neither set to eval() nor run under
    # torch.no_grad() here -- confirm this is intentional.
    for image, marking_points in data_loader:
        image = torch.stack(image)
        image = image.to(device)
        ground_truths_list += list(marking_points)
        prediction = dp_detector(image)
        objective, gradient = generate_objective(marking_points, device)
        # Masked squared error: gradient zeroes out channels of cells that
        # hold no ground-truth point (except the confidence channel).
        loss = (prediction - objective) ** 2
        total_loss += torch.sum(loss*gradient).item()
        num_evaluation += loss.size(0)
        # A very low confidence threshold so the precision/recall sweep
        # below sees almost every candidate point.
        pred_points = [get_predicted_points(pred, 0.01) for pred in prediction]
        predictions_list += pred_points
    precisions, recalls = calc_precision_recall(
        ground_truths_list, predictions_list, match_marking_points)
    average_precision = calc_average_precision(precisions, recalls)
    if args.enable_visdom:
        logger.plot_curve(precisions, recalls)
    logger.log(average_loss=total_loss / num_evaluation,
               average_precision=average_precision)


if __name__ == '__main__':
    evaluate_detector(config.get_parser_for_evaluation().parse_args())

View File

@ -5,8 +5,9 @@ import numpy as np
import torch
from torchvision.transforms import ToTensor
import config
from data import get_predicted_points
from detector import DirectionalPointDetector
from utils import get_marking_points, Timer
from utils import Timer
def plot_marking_points(image, marking_points):
@ -14,17 +15,29 @@ def plot_marking_points(image, marking_points):
height = image.shape[0]
width = image.shape[1]
for marking_point in marking_points:
p0_x = width * marking_point[0]
p0_y = height * marking_point[1]
p1_x = p0_x + 50 * math.cos(marking_point[2])
p1_y = p0_y + 50 * math.sin(marking_point[2])
p0_x = width * marking_point.x - 0.5
p0_y = height * marking_point.y - 0.5
cos_val = math.cos(marking_point.direction)
sin_val = math.sin(marking_point.direction)
p1_x = p0_x + 50*cos_val
p1_y = p0_y + 50*sin_val
p2_x = p0_x - 50*sin_val
p2_y = p0_y + 50*cos_val
p3_x = p0_x + 50*sin_val
p3_y = p0_y - 50*cos_val
p0_x = int(round(p0_x))
p0_y = int(round(p0_y))
p1_x = int(round(p1_x))
p1_y = int(round(p1_y))
cv.arrowedLine(image, (p0_x, p0_y), (p1_x, p1_y), (0, 0, 255))
cv.imshow('demo', image)
cv.waitKey(1)
p2_x = int(round(p2_x))
p2_y = int(round(p2_y))
cv.line(image, (p0_x, p0_y), (p1_x, p1_y), (0, 0, 255))
if marking_point.shape > 0.5:
cv.line(image, (p0_x, p0_y), (p2_x, p2_y), (0, 0, 255))
else:
p3_x = int(round(p3_x))
p3_y = int(round(p3_y))
cv.line(image, (p2_x, p2_y), (p3_x, p3_y), (0, 0, 255))
def preprocess_image(image):
@ -52,8 +65,11 @@ def detect_video(detector, device, args):
prediction = detector(preprocess_image(frame).to(device))
if args.timing:
timer.toc()
pred_points = get_marking_points(prediction[0], args.thresh)
plot_marking_points(frame, pred_points)
pred_points = get_predicted_points(prediction[0], args.thresh)
if pred_points:
plot_marking_points(frame, list(list(zip(*pred_points))[1]))
cv.imshow('demo', frame)
cv.waitKey(1)
if args.save:
output_video.write(frame)
input_video.release()
@ -65,15 +81,19 @@ def detect_image(detector, device, args):
image_file = input('Enter image file path: ')
image = cv.imread(image_file)
prediction = detector(preprocess_image(image).to(device))
pred_points = get_marking_points(prediction[0], args.thresh)
plot_marking_points(image, pred_points)
pred_points = get_predicted_points(prediction[0], args.thresh)
if pred_points:
plot_marking_points(image, list(list(zip(*pred_points))[1]))
cv.imshow('demo', image)
cv.waitKey(1)
def inference_detector(args):
"""Inference demo of directional point detector."""
args.cuda = not args.disable_cuda and torch.cuda.is_available()
device = torch.device("cuda:" + str(args.gpu_id) if args.cuda else "cpu")
dp_detector = DirectionalPointDetector(3, args.depth_factor, 5).to(device)
device = torch.device('cuda:' + str(args.gpu_id) if args.cuda else 'cpu')
dp_detector = DirectionalPointDetector(
3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device)
dp_detector.load_state_dict(torch.load(args.detector_weights))
if args.mode == "image":
detect_image(dp_detector, device, args)

60
log.py
View File

@ -7,54 +7,54 @@ from PIL import ImageDraw
class Logger():
"""Logger for training."""
def __init__(self, curve_names=None):
self.curve_names = curve_names
if curve_names:
self.vis = Visdom()
assert self.vis.check_connection()
self.curve_y = None
self.curve_x_start = 0
self.curve_x_end = 0
self.curve_x = np.array([0])
def log(self, **kwargs):
def log(self, xval=None, win_name='loss', **kwargs):
"""Log and print the information."""
print("##############################################################")
for key, value in kwargs.items():
print(key, value, sep='\t')
if not self.curve_names:
return
curve_step = np.array([kwargs[cn] for cn in self.curve_names])
if self.curve_y is None:
self.curve_y = curve_step
else:
self.curve_y = np.row_stack((self.curve_y, curve_step))
self.curve_x_end = self.curve_x_end + 1
def plot_curve(self):
"""Plot curve on visdom."""
if (self.curve_x_end - self.curve_x_start < 2 or not self.curve_names):
return
if self.curve_x_start == 0:
update_opt = None
else:
update_opt = 'append'
curve_x = np.arange(self.curve_x_start, self.curve_x_end)
curve_x = np.transpose(np.tile(curve_x, (len(self.curve_names), 1)))
self.vis.line(Y=self.curve_y, X=curve_x, win='loss', update=update_opt,
opts=dict(showlegend=True, legend=self.curve_names))
self.curve_x_start = self.curve_x_end
self.curve_y = None
if self.curve_names:
if not xval:
xval = self.curve_x
for i in range(len(self.curve_names)):
name = self.curve_names[i]
if name not in kwargs:
continue
yval = np.array([kwargs[name]])
self.vis.line(Y=yval, X=xval, win=win_name, update='append',
name=name, opts=dict(showlegend=True))
self.curve_x += 1
def plot_curve(self, yvals, xvals, win_name='pr_curves'):
"""Plot curve."""
self.vis.line(Y=np.array(yvals), X=np.array(xvals), win=win_name)
def plot_marking_points(self, image, marking_points, win_name='mk_points'):
"""Plot marking points on visdom."""
width, height = image.size
draw = ImageDraw.Draw(image)
for point in marking_points:
p0_x = width * point[0]
p0_y = height * point[1]
p1_x = p0_x + 50*math.cos(point[2])
p1_y = p0_y + 50*math.sin(point[2])
p0_x = width * point.x
p0_y = height * point.y
p1_x = p0_x + 50*math.cos(point.direction)
p1_y = p0_y + 50*math.sin(point.direction)
draw.line((p0_x, p0_y, p1_x, p1_y), fill=(255, 0, 0))
p2_x = p0_x - 50*math.sin(point.direction)
p2_y = p0_y + 50*math.cos(point.direction)
if point.shape > 0.5:
draw.line((p2_x, p2_y, p0_x, p0_y), fill=(255, 0, 0))
else:
p3_x = p0_x + 50*math.sin(point.direction)
p3_y = p0_y - 50*math.cos(point.direction)
draw.line((p2_x, p2_y, p3_x, p3_y), fill=(255, 0, 0))
image = np.asarray(image, dtype="uint8")
image = np.transpose(image, (2, 0, 1))
self.vis.image(image, win=win_name)

View File

@ -23,7 +23,7 @@ def define_expand_unit(basic_channel_size):
def define_halve_unit(basic_channel_size):
"""Define a 3x3 expand stride 2 convolution with norm and activation."""
"""Define a 4x4 stride 2 expand convolution with norm and activation."""
conv = nn.Conv2d(basic_channel_size, 2 * basic_channel_size, kernel_size=4,
stride=2, padding=1, bias=False)
norm = nn.BatchNorm2d(2 * basic_channel_size)

63
precision_recall.py Normal file
View File

@ -0,0 +1,63 @@
"""Universal procedure of calculating average precision defined in VOC"""
def match_gt_with_preds(ground_truth, predictions, match_labels):
    """Return the index of the best prediction matching a ground truth.

    ``predictions`` holds (confidence, label) pairs; ``match_labels`` decides
    whether a label corresponds to the ground truth. Returns -1 when no
    prediction with positive confidence matches.
    """
    best_idx = -1
    best_confidence = 0.
    for idx, (confidence, label) in enumerate(predictions):
        if match_labels(ground_truth, label) and confidence > best_confidence:
            best_confidence = confidence
            best_idx = idx
    return best_idx
def get_confidence_list(ground_truths_list, predictions_list, match_labels):
    """Split prediction confidences into true- and false-positive lists.

    Each unmatched ground truth contributes a 0. entry to the true-positive
    list so it counts as a missed detection at every positive threshold.
    """
    assert len(ground_truths_list) == len(predictions_list)
    true_positive_list = []
    false_positive_list = []
    for ground_truths, predictions in zip(ground_truths_list,
                                          predictions_list):
        matched = [False] * len(predictions)
        for ground_truth in ground_truths:
            idx = match_gt_with_preds(ground_truth, predictions, match_labels)
            if idx < 0:
                true_positive_list.append(.0)
            else:
                matched[idx] = True
                true_positive_list.append(predictions[idx][0])
        for is_matched, prediction in zip(matched, predictions):
            if not is_matched:
                false_positive_list.append(prediction[0])
    return true_positive_list, false_positive_list
def calc_precision_recall(ground_truths_list, predictions_list, match_labels):
    """Sweep confidence thresholds to sample multiple precision/recall pairs."""
    true_positive_list, false_positive_list = get_confidence_list(
        ground_truths_list, predictions_list, match_labels)
    # Start both curves at the origin.
    recalls = [0.]
    precisions = [0.]
    num_ground_truths = len(true_positive_list)
    # Walk the distinct confidences from highest to lowest.
    for thresh in sorted(set(true_positive_list), reverse=True):
        if thresh == 0.:
            # All remaining ground truths were missed entirely.
            recalls.append(1.)
            precisions.append(0.)
        true_positives = sum(conf >= thresh for conf in true_positive_list)
        false_positives = sum(conf >= thresh for conf in false_positive_list)
        false_negatives = num_ground_truths - true_positives
        recalls.append(true_positives / (true_positives + false_negatives))
        precisions.append(true_positives / (true_positives + false_positives))
    return precisions, recalls
def calc_average_precision(precisions, recalls):
    """Calculate average precision defined in VOC contest.

    Samples the precision envelope at the 11 recall levels 0.0, 0.1, ..., 1.0.
    Recall levels that no sample reaches contribute zero precision; the
    previous ``next(...)`` without a default raised StopIteration in that case.
    """
    total_precision = 0.
    for i in range(11):
        # Index of the first sample whose recall reaches the current level.
        index = next((idx for idx, recall in enumerate(recalls)
                      if recall >= i / 10), None)
        if index is not None:
            total_precision += max(precisions[index:])
    return total_precision / 11

147
scripts/prepare_dataset.py Normal file
View File

@ -0,0 +1,147 @@
"""Perform data augmentation and preprocessing."""
import argparse
import json
import math
import os
import random
import cv2 as cv
import numpy as np
def get_parser():
    """Return argument parser for generating dataset."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset', required=True,
                        choices=['trainval', 'test'],
                        help="Generate trainval or test dataset.")
    parser.add_argument('--val_prop', type=float, default=0.1,
                        help="The proportion of val sample in trainval.")
    # The three required directory options all share the same shape.
    for option in ('label', 'image', 'output'):
        parser.add_argument('--%s_directory' % option, required=True,
                            help="The location of %s directory." % option)
    return parser
def boundary_check(centralied_marks):
    """Check situation that marking point appears too near to border.

    Returns True only when every endpoint coordinate stays inside
    [-260, 260].
    """
    return all(-260 <= mark[0] <= 260 and -260 <= mark[1] <= 260
               for mark in centralied_marks)
def overlap_check(centralied_marks):
    """Check situation that multiple marking points appear in same cell.

    Returns False when any two points are closer than one 16x16-grid cell
    (600 / 16 pixels) on both axes.
    """
    cell_size = 600 / 16
    num_marks = len(centralied_marks)
    for i in range(num_marks - 1):
        for j in range(i + 1, num_marks):
            delta_x = abs(centralied_marks[j, 0] - centralied_marks[i, 0])
            delta_y = abs(centralied_marks[j, 1] - centralied_marks[i, 1])
            if delta_x < cell_size and delta_y < cell_size:
                return False
    return True
def generalize_marks(centralied_marks):
    """Convert coordinate to [0, 1] and calculate direction label.

    Each mark is (x0, y0, x1, y1, shape) with center-origin pixel
    coordinates; the direction is the angle of the (x0,y0)->(x1,y1) vector.
    """
    generalized = []
    for mark in centralied_marks:
        direction = math.atan2(mark[3] - mark[1], mark[2] - mark[0])
        generalized.append([(mark[0] + 300) / 600, (mark[1] + 300) / 600,
                            direction, mark[4]])
    return generalized
def write_image_and_label(name, image, centralied_marks, name_list):
    """Write image and label with given name.

    Appends the sample's base name to ``name_list`` as a side effect so the
    caller can track how many samples were written.
    """
    name_list.append(os.path.basename(name))
    print("Processing NO.%d samples: %s..." % (len(name_list), name_list[-1]))
    # 512x512 matches the network input resolution (config.INPUT_IMAGE_SIZE).
    image = cv.resize(image, (512, 512))
    # Quality 100 keeps JPEG artifacts minimal in the training data.
    cv.imwrite(name + '.jpg', image, [int(cv.IMWRITE_JPEG_QUALITY), 100])
    with open(name + '.json', 'w') as file:
        json.dump(generalize_marks(centralied_marks), file)
def rotate_vector(vector, angle_degree):
    """Rotate a 2-D vector with given angle in degree."""
    angle_rad = math.pi * angle_degree / 180
    cos_val = math.cos(angle_rad)
    sin_val = math.sin(angle_rad)
    return (vector[0] * cos_val + vector[1] * sin_val,
            -vector[0] * sin_val + vector[1] * cos_val)
def rotate_centralized_marks(centralied_marks, angle_degree):
    """Rotate centralized marks with given angle in degree.

    Both endpoints (columns 0:2 and 2:4) of every mark are rotated; the
    input array is not modified.
    """
    rotated_marks = centralied_marks.copy()
    for i, mark in enumerate(centralied_marks):
        rotated_marks[i, 0:2] = rotate_vector(mark[0:2], angle_degree)
        rotated_marks[i, 2:4] = rotate_vector(mark[2:4], angle_degree)
    return rotated_marks
def rotate_image(image, angle_degree):
    """Rotate image around its center with given angle in degree."""
    rows, cols, _ = image.shape
    # OpenCV takes the rotation center as (x, y) = (cols/2, rows/2) and the
    # warpAffine output size as (width, height) = (cols, rows). The previous
    # code passed (rows, cols) for both, which only worked because the
    # dataset images are square.
    rotation_matrix = cv.getRotationMatrix2D((cols/2, rows/2), angle_degree, 1)
    return cv.warpAffine(image, rotation_matrix, (cols, rows))
def generate_dataset(args):
    """Generate dataset according to arguments."""
    # For trainval, samples first land in <output>/train; a random subset is
    # moved to <output>/val at the end.
    if args.dataset == 'trainval':
        val_directory = os.path.join(args.output_directory, 'val')
        args.output_directory = os.path.join(args.output_directory, 'train')
    elif args.dataset == 'test':
        args.output_directory = os.path.join(args.output_directory, 'test')
    os.makedirs(args.output_directory, exist_ok=True)
    name_list = []
    for label_file in os.listdir(args.label_directory):
        name = os.path.splitext(label_file)[0]
        image = cv.imread(os.path.join(args.image_directory, name + '.jpg'))
        with open(os.path.join(args.label_directory, label_file), 'r') as file:
            label = json.load(file)
        centralied_marks = np.array(label['marks'])
        # A single mark parses as a 1-D array; promote it to one row.
        if len(centralied_marks.shape) < 2:
            centralied_marks = np.expand_dims(centralied_marks, axis=0)
        # Shift pixel coordinates so the image center becomes the origin.
        centralied_marks[:, 0: 4] -= 300.5
        if boundary_check(centralied_marks):
            output_name = os.path.join(args.output_directory, name)
            write_image_and_label(output_name, image,
                                  centralied_marks, name_list)
        # Rotation augmentation applies to training data only.
        if args.dataset == 'test':
            continue
        for angle in range(5, 360, 5):
            rotated_marks = rotate_centralized_marks(centralied_marks, angle)
            # Keep a rotated copy only if no point leaves the border margin
            # and no two points collapse into the same feature-map cell.
            if boundary_check(rotated_marks) and overlap_check(rotated_marks):
                rotated_image = rotate_image(image, angle)
                output_name = os.path.join(
                    args.output_directory, name + '_' + str(angle))
                write_image_and_label(
                    output_name, rotated_image, rotated_marks, name_list)
    if args.dataset == 'trainval':
        print("Dividing training set and validation set...")
        val_idx = random.sample(list(range(len(name_list))),
                                int(round(len(name_list)*args.val_prop)))
        val_samples = [name_list[idx] for idx in val_idx]
        os.makedirs(val_directory, exist_ok=True)
        # Move the chosen samples (image + label) from train/ to val/.
        for val_sample in val_samples:
            train_directory = args.output_directory
            image_src = os.path.join(train_directory, val_sample + '.jpg')
            label_src = os.path.join(train_directory, val_sample + '.json')
            image_dst = os.path.join(val_directory, val_sample + '.jpg')
            label_dst = os.path.join(val_directory, val_sample + '.json')
            os.rename(image_src, image_dst)
            os.rename(label_src, label_dst)
    print("Done.")


if __name__ == '__main__':':
    generate_dataset(get_parser().parse_args())

View File

@ -1,76 +1,50 @@
"""Train directional point detector."""
import math
"""Train directional marking point detector."""
import random
import torch
from torch.utils.data import DataLoader
import config
from data import ParkingSlotDataset
from data import get_predicted_points
from data import generate_objective
from dataset import ParkingSlotDataset
from detector import DirectionalPointDetector
from log import Logger
from utils import tensor2im, get_marking_points
from utils import tensor2im
def get_objective_from_labels(marking_points_batch, device):
"""Get regression objective and gradient for directional point detector."""
batch_size = len(marking_points_batch)
objective = torch.zeros(batch_size, config.NUM_FEATURE_MAP_CHANNEL,
config.FEATURE_MAP_SIZE, config.FEATURE_MAP_SIZE,
device=device)
gradient = torch.zeros_like(objective)
gradient[:, 0].fill_(1.)
for batch_idx, marking_points in enumerate(marking_points_batch):
for marking_point in marking_points:
col = math.floor(marking_point[0] * 16)
row = math.floor(marking_point[1] * 16)
# Confidence Regression
objective[batch_idx, 0, row, col] = 1.
# Offset Regression
offset_x = marking_point[0]*16 - col
offset_y = marking_point[1]*16 - row
objective[batch_idx, 1, row, col] = offset_x
objective[batch_idx, 2, row, col] = offset_y
# Direction Regression
direction = marking_point[2]
objective[batch_idx, 3, row, col] = math.cos(direction)
objective[batch_idx, 4, row, col] = math.sin(direction)
# Assign Gradient
gradient[batch_idx, 1:5, row, col].fill_(1.)
return objective, gradient
def plot_random_prediction(logger, image, marking_points, prediction):
def plot_prediction(logger, image, marking_points, prediction):
"""Plot the ground truth and prediction of a random sample in a batch."""
rand_sample = random.randint(0, image.size(0)-1)
sampled_image = tensor2im(image[rand_sample])
logger.plot_marking_points(sampled_image, marking_points[rand_sample],
win_name='gt_marking_points')
sampled_image = tensor2im(image[rand_sample])
pred_points = get_marking_points(prediction[rand_sample], 0.01)
logger.plot_marking_points(sampled_image, pred_points,
pred_points = get_predicted_points(prediction[rand_sample], 0.01)
if pred_points:
logger.plot_marking_points(sampled_image,
list(list(zip(*pred_points))[1]),
win_name='pred_marking_points')
def train_detector(args):
"""Train directional point detector."""
args.cuda = not args.disable_cuda and torch.cuda.is_available()
device = torch.device("cuda:"+str(args.gpu_id) if args.cuda else "cpu")
device = torch.device('cuda:'+str(args.gpu_id) if args.cuda else 'cpu')
dp_detector = DirectionalPointDetector(3, args.depth_factor, 5).to(device)
if args.detector_weights is not None:
dp_detector = DirectionalPointDetector(
3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device)
if args.detector_weights:
dp_detector.load_state_dict(torch.load(args.detector_weights))
optimizer = torch.optim.Adam(dp_detector.parameters(), lr=args.lr)
if args.optimizer_weights is not None:
if args.optimizer_weights:
optimizer.load_state_dict(torch.load(args.optimizer_weights))
if args.enable_visdom:
logger = Logger(['loss'])
else:
logger = Logger()
logger = Logger(['train_loss'] if args.enable_visdom else None)
data_loader = DataLoader(ParkingSlotDataset(args.dataset_directory),
batch_size=args.batch_size, shuffle=True,
num_workers=args.data_loading_workers,
collate_fn=lambda x: list(zip(*x)))
for epoch_idx in range(args.num_epochs):
for iter_idx, (image, marking_points) in enumerate(data_loader):
image = torch.stack(image)
@ -78,18 +52,15 @@ def train_detector(args):
optimizer.zero_grad()
prediction = dp_detector(image)
objective, gradient = get_objective_from_labels(marking_points,
device)
objective, gradient = generate_objective(marking_points, device)
loss = (prediction - objective) ** 2
loss.backward(gradient)
optimizer.step()
logger.log(epoch=epoch_idx, iter=iter_idx,
loss=torch.sum(loss * gradient).item())
train_loss = torch.sum(loss*gradient).item() / loss.size(0)
logger.log(epoch=epoch_idx, iter=iter_idx, train_loss=train_loss)
if args.enable_visdom:
logger.plot_curve()
plot_random_prediction(logger, image, marking_points,
prediction)
plot_prediction(logger, image, marking_points, prediction)
torch.save(dp_detector.state_dict(),
'weights/dp_detector_%d.pth' % epoch_idx)
torch.save(optimizer.state_dict(), 'weights/optimizer.pth')

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import math
import time
import cv2 as cv
import torch
import numpy as np
from PIL import Image
@ -8,6 +9,7 @@ from PIL import Image
class Timer(object):
"""Timer."""
def __init__(self):
self.start_ticking = False
self.start = 0.
@ -24,53 +26,19 @@ class Timer(object):
print("Time elapsed:", duration, "s.")
def non_maximum_suppression(marking_points):
"""Perform non-maxmum suppression on marking points."""
suppressed = [False] * len(marking_points)
for i in range(len(marking_points) - 1):
for j in range(i + 1, len(marking_points)):
distx = marking_points[i][0] - marking_points[j][0]
disty = marking_points[i][1] - marking_points[j][1]
dist_square = distx ** 2 + disty ** 2
# minimum distance in training set: 40.309
# (40.309 / 600)^2 = 0.004513376
if dist_square < 0.0045:
idx = i if marking_points[i][3] < marking_points[j][3] else j
suppressed[idx] = True
if any(suppressed):
new_marking_points = []
for i, supres in enumerate(suppressed):
if not supres:
new_marking_points.append(marking_points[i])
return new_marking_points
return marking_points
def get_marking_points(prediction, thresh):
"""Get marking point from predicted feature map."""
assert isinstance(prediction, torch.Tensor)
marking_points = []
prediction = prediction.detach().cpu().numpy()
for i in range(prediction.shape[1]):
for j in range(prediction.shape[2]):
if prediction[0, i, j] > thresh:
xval = (j + prediction[1, i, j]) / prediction.shape[2]
yval = (i + prediction[2, i, j]) / prediction.shape[1]
cos_value = prediction[3, i, j]
sin_value = prediction[4, i, j]
angle = math.atan2(sin_value, cos_value)
marking_points.append([xval, yval, angle, prediction[0, i, j]])
return non_maximum_suppression(marking_points)
def tensor2array(image_tensor, imtype=np.uint8):
"""Convert float image tensor to numpy ndarray"""
"""
Convert float CxHxW image tensor between [0, 1] to HxWxC numpy ndarray
between [0, 255]
"""
assert isinstance(image_tensor, torch.Tensor)
image_numpy = (image_tensor.detach().cpu().numpy()) * 255.0
return image_numpy.astype(imtype)
image_numpy = np.transpose(image_numpy, (1, 2, 0)).astype(imtype)
return image_numpy
def tensor2im(image_tensor, imtype=np.uint8):
"""Convert float image tensor to PIL Image"""
image_numpy = np.transpose(tensor2array(image_tensor, imtype), (1, 2, 0))
"""Convert float CxHxW BGR image tensor to RGB PIL Image"""
image_numpy = tensor2array(image_tensor, imtype)
image_numpy = cv.cvtColor(image_numpy, cv.COLOR_BGR2RGB)
return Image.fromarray(image_numpy)