@@ -0,0 +1,4 @@
.vscode
.pylintrc
__pycache__
weights/*
@@ -0,0 +1,57 @@
"""Configure arguments."""
import argparse

INPUT_IMAGE_SIZE = 512
# 0: confidence, 1: offset_x, 2: offset_y, 3: cos(direction), 4: sin(direction)
NUM_FEATURE_MAP_CHANNEL = 5
# image_size / 2^5 = 512 / 32 = 16
FEATURE_MAP_SIZE = 16


def add_common_arguments(parser):
    """Add common arguments for training and inference."""
    parser.add_argument('--detector_weights',
                        help="The weights of pretrained detector.")
    parser.add_argument('--depth_factor', type=int, default=32,
                        help="Depth factor.")
    parser.add_argument('--disable_cuda', action='store_true',
                        help="Disable CUDA.")
    parser.add_argument('--gpu_id', type=int, default=1,
                        help="Select which GPU to use.")


def get_parser_for_training():
    """Return argument parser for training."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset_directory', required=True,
                        help="The location of the dataset.")
    parser.add_argument('--optimizer_weights',
                        help="The weights of the optimizer.")
    parser.add_argument('--batch_size', type=int, default=16,
                        help="Batch size.")
    parser.add_argument('--num_epochs', type=int, default=100,
                        help="Number of epochs to train for.")
    parser.add_argument('--lr', type=float, default=1e-3,
                        help="The learning rate of back propagation.")
    parser.add_argument('--enable_visdom', action='store_true',
                        help="Enable Visdom to visualize training progress.")
    add_common_arguments(parser)
    return parser


def get_parser_for_inference():
    """Return argument parser for inference."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--mode', required=True, choices=['image', 'video'],
                        help="Run inference on an image or a video.")
    parser.add_argument('--video',
                        help="Video path when running inference on a video.")
    parser.add_argument('--thresh', type=float, default=0.5,
                        help="Detection threshold.")
    parser.add_argument('--timing', action='store_true',
                        help="Perform timing during inference.")
    parser.add_argument('--save', action='store_true',
                        help="Save detection result to file.")
    add_common_arguments(parser)
    return parser
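For reference, a minimal sketch of how these parsers are meant to be consumed; the dataset path below is a made-up placeholder, and `parse_args` is given an explicit list instead of reading `sys.argv`:

```python
import config

# Training arguments with one default overridden.
args = config.get_parser_for_training().parse_args(
    ['--dataset_directory', '/path/to/ps_dataset', '--batch_size', '8'])
print(args.batch_size)  # 8
print(args.lr)          # 0.001 (default)
```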
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
import os
import os.path
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms


class ParkingSlotDataset(Dataset):
    """Parking slot dataset."""

    def __init__(self, root):
        super(ParkingSlotDataset, self).__init__()
        self.root = root
        self.sample_names = []
        self.image_transform = transforms.Compose([
            transforms.Resize((512, 512)),
            transforms.ToTensor(),
        ])
        # Every .txt label file names one sample; the image shares the
        # basename with a .bmp extension.
        for file in os.listdir(root):
            if file.endswith(".txt"):
                self.sample_names.append(os.path.splitext(file)[0])

    def __getitem__(self, index):
        name = self.sample_names[index]
        image = Image.open(os.path.join(self.root, name + '.bmp'))
        image = self.image_transform(image)
        marking_points = []
        with open(os.path.join(self.root, name + '.txt'), 'r') as file:
            # Each line holds one marking point: x, y, direction.
            for line in file:
                marking_point = tuple(float(n) for n in line.split())
                marking_points.append(marking_point)
        return image, marking_points

    def __len__(self):
        return len(self.sample_names)
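Because each image can contain a different number of marking points, the default batch collation cannot stack the labels; train.py works around this with a zip-based `collate_fn`. A minimal sketch (the dataset path is a placeholder):

```python
from torch.utils.data import DataLoader
from data import ParkingSlotDataset

# Variable-length label lists can't be stacked into one tensor, so the
# collate_fn simply transposes the batch into (images, marking_points).
data_loader = DataLoader(ParkingSlotDataset('/path/to/ps_dataset'),
                         batch_size=4, shuffle=True,
                         collate_fn=lambda x: list(zip(*x)))
images, marking_points = next(iter(data_loader))
# images: tuple of 4 tensors, each of shape (3, 512, 512)
# marking_points: tuple of 4 lists of (x, y, direction) tuples
```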
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
import torch
from torch import nn
from network import define_halve_unit, define_detector_block


class YetAnotherDarknet(nn.Module):
    """Yet another darknet, imitating darknet-53 with depth of darknet-19."""

    def __init__(self, input_channel_size, depth_factor):
        super(YetAnotherDarknet, self).__init__()
        layers = []
        # 0
        layers += [nn.Conv2d(input_channel_size, depth_factor, kernel_size=3,
                             stride=1, padding=1, bias=False)]
        layers += [nn.BatchNorm2d(depth_factor)]
        layers += [nn.LeakyReLU(0.1)]
        # 1
        layers += define_halve_unit(depth_factor)
        layers += define_detector_block(depth_factor)
        # 2
        depth_factor *= 2
        layers += define_halve_unit(depth_factor)
        layers += define_detector_block(depth_factor)
        # 3
        depth_factor *= 2
        layers += define_halve_unit(depth_factor)
        layers += define_detector_block(depth_factor)
        layers += define_detector_block(depth_factor)
        # 4
        depth_factor *= 2
        layers += define_halve_unit(depth_factor)
        layers += define_detector_block(depth_factor)
        layers += define_detector_block(depth_factor)
        # 5
        depth_factor *= 2
        layers += define_halve_unit(depth_factor)
        layers += define_detector_block(depth_factor)
        self.model = nn.Sequential(*layers)

    def forward(self, *x):
        return self.model(x[0])


class DirectionalPointDetector(nn.Module):
    """Detector for point with direction."""

    def __init__(self, input_channel_size, depth_factor, output_channel_size):
        super(DirectionalPointDetector, self).__init__()
        self.extract_feature = YetAnotherDarknet(input_channel_size,
                                                 depth_factor)
        layers = []
        layers += define_detector_block(16 * depth_factor)
        layers += define_detector_block(16 * depth_factor)
        layers += [nn.Conv2d(32 * depth_factor, output_channel_size,
                             kernel_size=1, stride=1, padding=0, bias=False)]
        self.predict = nn.Sequential(*layers)

    def forward(self, *x):
        feature = self.extract_feature(x[0])
        prediction = self.predict(feature)
        # Confidence and offsets live in [0, 1]; cos/sin live in [-1, 1].
        point_pred, angle_pred = torch.split(prediction, 3, dim=1)
        point_pred = torch.sigmoid(point_pred)
        angle_pred = torch.tanh(angle_pred)
        return torch.cat((point_pred, angle_pred), dim=1)
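A quick shape check, as a sketch: with the values used elsewhere in this PR (3 input channels, depth_factor 32, 5 output channels), a 512x512 input should come out as a 16x16 feature map after the five stride-2 halve units.

```python
import torch
from detector import DirectionalPointDetector

detector = DirectionalPointDetector(3, 32, 5)
detector.eval()  # keep BatchNorm out of train mode for a batch of one
dummy = torch.zeros(1, 3, 512, 512)
prediction = detector(dummy)
print(prediction.shape)  # torch.Size([1, 5, 16, 16]) -- 512 / 2^5 = 16
```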
@@ -0,0 +1,85 @@
"""Inference demo of directional point detector."""
import math
import cv2 as cv
import numpy as np
import torch
from torchvision.transforms import ToTensor
import config
from detector import DirectionalPointDetector
from utils import get_marking_points, Timer


def plot_marking_points(image, marking_points):
    """Plot marking points on the image and show."""
    height = image.shape[0]
    width = image.shape[1]
    for marking_point in marking_points:
        # Marking points are normalized to [0, 1]; scale back to pixels.
        p0_x = width * marking_point[0]
        p0_y = height * marking_point[1]
        p1_x = p0_x + 50 * math.cos(marking_point[2])
        p1_y = p0_y + 50 * math.sin(marking_point[2])
        p0_x = int(round(p0_x))
        p0_y = int(round(p0_y))
        p1_x = int(round(p1_x))
        p1_y = int(round(p1_y))
        cv.arrowedLine(image, (p0_x, p0_y), (p1_x, p1_y), (0, 0, 255))
    cv.imshow('demo', image)
    cv.waitKey(1)


def preprocess_image(image):
    """Preprocess numpy image to torch tensor."""
    if image.shape[0] != 512 or image.shape[1] != 512:
        image = cv.resize(image, (512, 512))
    return torch.unsqueeze(ToTensor()(image), 0)


def detect_video(detector, device, args):
    """Demo for detecting video."""
    timer = Timer()
    input_video = cv.VideoCapture(args.video)
    frame_width = int(input_video.get(cv.CAP_PROP_FRAME_WIDTH))
    frame_height = int(input_video.get(cv.CAP_PROP_FRAME_HEIGHT))
    output_video = cv.VideoWriter()
    if args.save:
        output_video.open('record.avi', cv.VideoWriter_fourcc(*'MJPG'),
                          input_video.get(cv.CAP_PROP_FPS),
                          (frame_width, frame_height))
    # Preallocate the frame buffer; VideoCapture.read fills it in place.
    frame = np.empty([frame_height, frame_width, 3], dtype=np.uint8)
    while input_video.read(frame)[0]:
        if args.timing:
            timer.tic()
        prediction = detector(preprocess_image(frame).to(device))
        if args.timing:
            timer.toc()
        pred_points = get_marking_points(prediction[0], args.thresh)
        plot_marking_points(frame, pred_points)
        if args.save:
            output_video.write(frame)
    input_video.release()
    output_video.release()


def detect_image(detector, device, args):
    """Demo for detecting images."""
    image_file = input('Enter image file path: ')
    image = cv.imread(image_file)
    prediction = detector(preprocess_image(image).to(device))
    pred_points = get_marking_points(prediction[0], args.thresh)
    plot_marking_points(image, pred_points)


def inference_detector(args):
    """Inference demo of directional point detector."""
    args.cuda = not args.disable_cuda and torch.cuda.is_available()
    device = torch.device("cuda:" + str(args.gpu_id) if args.cuda else "cpu")
    dp_detector = DirectionalPointDetector(3, args.depth_factor, 5).to(device)
    # map_location lets GPU-trained weights load on a CPU-only machine.
    dp_detector.load_state_dict(torch.load(args.detector_weights,
                                           map_location=device))
    # Switch BatchNorm layers to evaluation mode before inference.
    dp_detector.eval()
    if args.mode == "image":
        detect_image(dp_detector, device, args)
    elif args.mode == "video":
        detect_video(dp_detector, device, args)


if __name__ == '__main__':
    inference_detector(config.get_parser_for_inference().parse_args())
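As a usage sketch, the demo can also be driven programmatically by handing the parser an explicit argument list. This assumes the module above is saved as inference.py, and the weight path is only a plausible placeholder (train.py with the default 100 epochs would last write dp_detector_99.pth):

```python
import config
from inference import inference_detector

# Equivalent to: python inference.py --mode image --detector_weights ...
args = config.get_parser_for_inference().parse_args(
    ['--mode', 'image', '--detector_weights', 'weights/dp_detector_99.pth',
     '--thresh', '0.5', '--disable_cuda'])
inference_detector(args)
```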
@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
import math
import numpy as np
from visdom import Visdom
from PIL import ImageDraw


class Logger():
    """Logger for training."""

    def __init__(self, curve_names=None):
        self.curve_names = curve_names
        if curve_names:
            self.vis = Visdom()
            assert self.vis.check_connection()
        self.curve_y = None
        self.curve_x_start = 0
        self.curve_x_end = 0

    def log(self, **kwargs):
        """Log and print the information."""
        print("##############################################################")
        for key, value in kwargs.items():
            print(key, value, sep='\t')
        if not self.curve_names:
            return
        # Collect one row of curve values per logging step.
        curve_step = np.array([kwargs[cn] for cn in self.curve_names])
        if self.curve_y is None:
            self.curve_y = curve_step
        else:
            self.curve_y = np.row_stack((self.curve_y, curve_step))
        self.curve_x_end = self.curve_x_end + 1

    def plot_curve(self):
        """Plot curve on Visdom."""
        if self.curve_x_end - self.curve_x_start < 2 or not self.curve_names:
            return
        # The first call creates the window; later calls append to it.
        update_opt = None if self.curve_x_start == 0 else 'append'
        curve_x = np.arange(self.curve_x_start, self.curve_x_end)
        curve_x = np.transpose(np.tile(curve_x, (len(self.curve_names), 1)))
        self.vis.line(Y=self.curve_y, X=curve_x, win='loss', update=update_opt,
                      opts=dict(showlegend=True, legend=self.curve_names))
        self.curve_x_start = self.curve_x_end
        self.curve_y = None

    def plot_marking_points(self, image, marking_points, win_name='mk_points'):
        """Plot marking points on Visdom."""
        width, height = image.size
        draw = ImageDraw.Draw(image)
        for point in marking_points:
            p0_x = width * point[0]
            p0_y = height * point[1]
            p1_x = p0_x + 50 * math.cos(point[2])
            p1_y = p0_y + 50 * math.sin(point[2])
            draw.line((p0_x, p0_y, p1_x, p1_y), fill=(255, 0, 0))
        # Visdom expects CHW uint8 images, PIL gives HWC.
        image = np.asarray(image, dtype="uint8")
        image = np.transpose(image, (2, 0, 1))
        self.vis.image(image, win=win_name)
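A minimal sketch of the intended flow, assuming a Visdom server is already running (python -m visdom.server) and using made-up loss values:

```python
from log import Logger

# Passing curve names switches on Visdom plotting; Logger() alone just prints.
logger = Logger(['loss'])
for step in range(10):
    logger.log(epoch=0, iter=step, loss=1.0 / (step + 1))
logger.plot_curve()  # pushes the accumulated points to the 'loss' window
```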
@@ -0,0 +1,40 @@
"""Common network structure unit definition."""
from torch import nn


def define_squeeze_unit(basic_channel_size):
    """Define a 1x1 squeeze convolution with norm and activation."""
    conv = nn.Conv2d(2 * basic_channel_size, basic_channel_size, kernel_size=1,
                     stride=1, padding=0, bias=False)
    norm = nn.BatchNorm2d(basic_channel_size)
    relu = nn.LeakyReLU(0.1)
    layers = [conv, norm, relu]
    return layers


def define_expand_unit(basic_channel_size):
    """Define a 3x3 expand convolution with norm and activation."""
    conv = nn.Conv2d(basic_channel_size, 2 * basic_channel_size, kernel_size=3,
                     stride=1, padding=1, bias=False)
    norm = nn.BatchNorm2d(2 * basic_channel_size)
    relu = nn.LeakyReLU(0.1)
    layers = [conv, norm, relu]
    return layers


def define_halve_unit(basic_channel_size):
    """Define a 4x4 stride-2 expand convolution with norm and activation."""
    conv = nn.Conv2d(basic_channel_size, 2 * basic_channel_size, kernel_size=4,
                     stride=2, padding=1, bias=False)
    norm = nn.BatchNorm2d(2 * basic_channel_size)
    relu = nn.LeakyReLU(0.1)
    layers = [conv, norm, relu]
    return layers


def define_detector_block(basic_channel_size):
    """Define a unit composed of a squeeze unit followed by an expand unit."""
    layers = []
    layers += define_squeeze_unit(basic_channel_size)
    layers += define_expand_unit(basic_channel_size)
    return layers
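A sketch to confirm the halving arithmetic: with kernel 4, stride 2, padding 1, an even input size H maps to (H + 2 - 4) / 2 + 1 = H / 2, while the channel count doubles.

```python
import torch
from torch import nn
from network import define_halve_unit

halve = nn.Sequential(*define_halve_unit(32))
halve.eval()  # avoid BatchNorm train-mode statistics on a single sample
feature = torch.zeros(1, 32, 64, 64)
print(halve(feature).shape)  # torch.Size([1, 64, 32, 32])
```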
@@ -0,0 +1,99 @@
"""Train directional point detector."""
import math
import random
import torch
from torch.utils.data import DataLoader
import config
from data import ParkingSlotDataset
from detector import DirectionalPointDetector
from log import Logger
from utils import tensor2im, get_marking_points


def get_objective_from_labels(marking_points_batch, device):
    """Get regression objective and gradient for directional point detector."""
    batch_size = len(marking_points_batch)
    objective = torch.zeros(batch_size, config.NUM_FEATURE_MAP_CHANNEL,
                            config.FEATURE_MAP_SIZE, config.FEATURE_MAP_SIZE,
                            device=device)
    gradient = torch.zeros_like(objective)
    # Confidence is supervised at every cell; the remaining channels only
    # at cells that actually contain a marking point.
    gradient[:, 0].fill_(1.)
    for batch_idx, marking_points in enumerate(marking_points_batch):
        for marking_point in marking_points:
            col = math.floor(marking_point[0] * config.FEATURE_MAP_SIZE)
            row = math.floor(marking_point[1] * config.FEATURE_MAP_SIZE)
            # Confidence Regression
            objective[batch_idx, 0, row, col] = 1.
            # Offset Regression
            offset_x = marking_point[0] * config.FEATURE_MAP_SIZE - col
            offset_y = marking_point[1] * config.FEATURE_MAP_SIZE - row
            objective[batch_idx, 1, row, col] = offset_x
            objective[batch_idx, 2, row, col] = offset_y
            # Direction Regression
            direction = marking_point[2]
            objective[batch_idx, 3, row, col] = math.cos(direction)
            objective[batch_idx, 4, row, col] = math.sin(direction)
            # Assign Gradient
            gradient[batch_idx, 1:5, row, col].fill_(1.)
    return objective, gradient


def plot_random_prediction(logger, image, marking_points, prediction):
    """Plot the ground truth and prediction of a random sample in a batch."""
    rand_sample = random.randint(0, image.size(0) - 1)
    sampled_image = tensor2im(image[rand_sample])
    logger.plot_marking_points(sampled_image, marking_points[rand_sample],
                               win_name='gt_marking_points')
    sampled_image = tensor2im(image[rand_sample])
    pred_points = get_marking_points(prediction[rand_sample], 0.01)
    logger.plot_marking_points(sampled_image, pred_points,
                               win_name='pred_marking_points')


def train_detector(args):
    """Train directional point detector."""
    args.cuda = not args.disable_cuda and torch.cuda.is_available()
    device = torch.device("cuda:" + str(args.gpu_id) if args.cuda else "cpu")
    dp_detector = DirectionalPointDetector(3, args.depth_factor, 5).to(device)
    if args.detector_weights is not None:
        dp_detector.load_state_dict(torch.load(args.detector_weights))
    optimizer = torch.optim.Adam(dp_detector.parameters(), lr=args.lr)
    if args.optimizer_weights is not None:
        optimizer.load_state_dict(torch.load(args.optimizer_weights))
    logger = Logger(['loss']) if args.enable_visdom else Logger()
    data_loader = DataLoader(ParkingSlotDataset(args.dataset_directory),
                             batch_size=args.batch_size, shuffle=True,
                             collate_fn=lambda x: list(zip(*x)))
    for epoch_idx in range(args.num_epochs):
        for iter_idx, (image, marking_points) in enumerate(data_loader):
            image = torch.stack(image)
            image = image.to(device)
            optimizer.zero_grad()
            prediction = dp_detector(image)
            objective, gradient = get_objective_from_labels(marking_points,
                                                            device)
            # Element-wise squared error; backward() with the gradient tensor
            # masks out the unsupervised cells.
            loss = (prediction - objective) ** 2
            loss.backward(gradient)
            optimizer.step()
            logger.log(epoch=epoch_idx, iter=iter_idx,
                       loss=torch.sum(loss * gradient).item())
            if args.enable_visdom:
                logger.plot_curve()
                plot_random_prediction(logger, image, marking_points,
                                       prediction)
        torch.save(dp_detector.state_dict(),
                   'weights/dp_detector_%d.pth' % epoch_idx)
        torch.save(optimizer.state_dict(), 'weights/optimizer.pth')


if __name__ == '__main__':
    train_detector(config.get_parser_for_training().parse_args())
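A note on the loss: calling loss.backward(gradient) on the non-scalar element-wise loss computes the vector-Jacobian product, which is equivalent to back-propagating sum(loss * gradient). The gradient tensor therefore acts as a per-element supervision mask. A self-contained sketch of the equivalence, with random stand-in tensors:

```python
import torch

prediction = torch.rand(2, 5, 16, 16, requires_grad=True)
objective = torch.rand(2, 5, 16, 16)
mask = (torch.rand(2, 5, 16, 16) > 0.5).float()

# Variant 1: vector-Jacobian product with the mask, as in train_detector.
loss = (prediction - objective) ** 2
loss.backward(mask)
grad_a = prediction.grad.clone()

# Variant 2: explicit masked scalar loss.
prediction.grad = None
((prediction - objective) ** 2 * mask).sum().backward()
grad_b = prediction.grad

print(torch.allclose(grad_a, grad_b))  # True
```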
@@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
import math
import time
import torch
import numpy as np
from PIL import Image


class Timer(object):
    """Timer."""

    def __init__(self):
        self.start_ticking = False
        self.start = 0.

    def tic(self):
        """Start timer."""
        self.start = time.time()
        self.start_ticking = True

    def toc(self):
        """End timer."""
        duration = time.time() - self.start
        self.start_ticking = False
        print("Time elapsed:", duration, "s.")


def non_maximum_suppression(marking_points):
    """Perform non-maximum suppression on marking points."""
    suppressed = [False] * len(marking_points)
    for i in range(len(marking_points) - 1):
        for j in range(i + 1, len(marking_points)):
            distx = marking_points[i][0] - marking_points[j][0]
            disty = marking_points[i][1] - marking_points[j][1]
            dist_square = distx ** 2 + disty ** 2
            # minimum distance between marking points in the training set:
            # 40.309, so (40.309 / 600)^2 = 0.004513376 after normalization
            if dist_square < 0.0045:
                # Suppress whichever point has the lower confidence.
                idx = i if marking_points[i][3] < marking_points[j][3] else j
                suppressed[idx] = True
    if any(suppressed):
        new_marking_points = []
        for i, supres in enumerate(suppressed):
            if not supres:
                new_marking_points.append(marking_points[i])
        return new_marking_points
    return marking_points


def get_marking_points(prediction, thresh):
    """Get marking points from the predicted feature map."""
    assert isinstance(prediction, torch.Tensor)
    marking_points = []
    prediction = prediction.detach().cpu().numpy()
    for i in range(prediction.shape[1]):
        for j in range(prediction.shape[2]):
            if prediction[0, i, j] > thresh:
                # Cell index plus predicted offset, normalized to [0, 1].
                xval = (j + prediction[1, i, j]) / prediction.shape[2]
                yval = (i + prediction[2, i, j]) / prediction.shape[1]
                cos_value = prediction[3, i, j]
                sin_value = prediction[4, i, j]
                angle = math.atan2(sin_value, cos_value)
                marking_points.append([xval, yval, angle, prediction[0, i, j]])
    return non_maximum_suppression(marking_points)


def tensor2array(image_tensor, imtype=np.uint8):
    """Convert float image tensor to numpy ndarray."""
    assert isinstance(image_tensor, torch.Tensor)
    image_numpy = (image_tensor.detach().cpu().numpy()) * 255.0
    return image_numpy.astype(imtype)


def tensor2im(image_tensor, imtype=np.uint8):
    """Convert float image tensor to PIL Image."""
    image_numpy = np.transpose(tensor2array(image_tensor, imtype), (1, 2, 0))
    return Image.fromarray(image_numpy)
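A small decoding sketch using a hand-built feature map with a single confident cell (all values made up):

```python
import math
import torch
from utils import get_marking_points

# One 5x16x16 map with a single detection at cell (row=4, col=7).
prediction = torch.zeros(5, 16, 16)
prediction[0, 4, 7] = 0.9             # confidence
prediction[1, 4, 7] = 0.5             # offset_x within the cell
prediction[2, 4, 7] = 0.5             # offset_y within the cell
prediction[3, 4, 7] = math.cos(0.25)  # direction as a cos/sin pair
prediction[4, 4, 7] = math.sin(0.25)

points = get_marking_points(prediction, thresh=0.5)
# Approximately [[0.46875, 0.28125, 0.25, 0.9]]:
# x = (7 + 0.5) / 16, y = (4 + 0.5) / 16, angle = atan2(sin, cos).
print(points)
```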