# image_size / 2^5 = 512 / 32 = 16 | # image_size / 2^5 = 512 / 32 = 16 | ||||
FEATURE_MAP_SIZE = 16 | FEATURE_MAP_SIZE = 16 | ||||
# Thresholds to determine whether a detected point matches ground truth. | # Thresholds to determine whether a detected point matches ground truth. | ||||
SQUARED_DISTANCE_THRESH = 0.0003 | |||||
SQUARED_DISTANCE_THRESH = 0.000277778 | |||||
DIRECTION_ANGLE_THRESH = 0.5 | DIRECTION_ANGLE_THRESH = 0.5 | ||||
help="The weights of optimizer.") | help="The weights of optimizer.") | ||||
parser.add_argument('--batch_size', type=int, default=24, | parser.add_argument('--batch_size', type=int, default=24, | ||||
help="Batch size.") | help="Batch size.") | ||||
parser.add_argument('--data_loading_workers', type=int, default=24, | |||||
parser.add_argument('--data_loading_workers', type=int, default=48, | |||||
help="Number of workers for data loading.") | help="Number of workers for data loading.") | ||||
parser.add_argument('--num_epochs', type=int, default=100, | parser.add_argument('--num_epochs', type=int, default=100, | ||||
help="Number of epochs to train for.") | help="Number of epochs to train for.") | ||||
parser.add_argument('--lr', type=float, default=1e-3, | |||||
parser.add_argument('--lr', type=float, default=1e-4, | |||||
help="The learning rate of back propagation.") | help="The learning rate of back propagation.") | ||||
parser.add_argument('--enable_visdom', action='store_true', | parser.add_argument('--enable_visdom', action='store_true', | ||||
help="Enable Visdom to visualize training progress") | help="Enable Visdom to visualize training progress") | ||||
parser = argparse.ArgumentParser() | parser = argparse.ArgumentParser() | ||||
parser.add_argument('--dataset_directory', required=True, | parser.add_argument('--dataset_directory', required=True, | ||||
help="The location of dataset.") | help="The location of dataset.") | ||||
parser.add_argument('--batch_size', type=int, default=24, | |||||
parser.add_argument('--batch_size', type=int, default=32, | |||||
help="Batch size.") | help="Batch size.") | ||||
parser.add_argument('--data_loading_workers', type=int, default=24, | |||||
parser.add_argument('--data_loading_workers', type=int, default=64, | |||||
help="Number of workers for data loading.") | help="Number of workers for data loading.") | ||||
parser.add_argument('--enable_visdom', action='store_true', | parser.add_argument('--enable_visdom', action='store_true', | ||||
help="Enable Visdom to visualize training progress") | help="Enable Visdom to visualize training progress") | ||||
add_common_arguments(parser) | |||||
return parser | return parser | ||||
"""Defines data structure and related functions.""" | |||||
from collections import namedtuple | |||||
# A marking point record with fields x, y, direction and shape.
# NOTE(review): x/y look like normalized image coordinates and direction an
# angle in radians (it is fed to math.cos/math.sin elsewhere) - confirm.
MarkingPoint = namedtuple('MarkingPoint', ['x', 'y', 'direction', 'shape']) | |||||
# A parking slot record with fields x1, y1, x2 and y2.
# NOTE(review): presumably the two endpoints of the slot entrance - confirm.
Slot = namedtuple('Slot', ['x1', 'y1', 'x2', 'y2']) | |||||
"""Data related package.""" | |||||
from .data_process import get_predicted_points, match_marking_points | |||||
from .dataset import ParkingSlotDataset | |||||
from .struct import MarkingPoint, Slot |
import math | import math | ||||
import torch | import torch | ||||
import config | import config | ||||
from . import MarkingPoint | |||||
def generate_objective(marking_points_batch, device):
    """Get regression objective and gradient for directional point detector.

    Args:
        marking_points_batch: Sequence with one iterable of marking points per
            sample. Each point must expose ``x``, ``y``, ``direction`` and
            ``shape`` attributes.
            NOTE(review): x and y are assumed to be normalized to [0, 1] -
            confirm against the dataset loader.
        device: Torch device on which both tensors are allocated.

    Returns:
        A tuple ``(objective, gradient)`` of tensors shaped
        ``(batch, NUM_FEATURE_MAP_CHANNEL, FEATURE_MAP_SIZE, FEATURE_MAP_SIZE)``.
        ``objective`` holds the regression targets; ``gradient`` is a 0/1 mask
        marking which entries are filled in as targets.
    """
    map_size = config.FEATURE_MAP_SIZE
    last_cell = map_size - 1
    batch_size = len(marking_points_batch)
    objective = torch.zeros(batch_size, config.NUM_FEATURE_MAP_CHANNEL,
                            map_size, map_size, device=device)
    gradient = torch.zeros_like(objective)
    # The confidence channel is masked in for every cell of every sample.
    gradient[:, 0].fill_(1.)
    for batch_idx, marking_points in enumerate(marking_points_batch):
        for marking_point in marking_points:
            scaled_x = marking_point.x * map_size
            scaled_y = marking_point.y * map_size
            # Clamp to the grid: a coordinate of exactly 1.0 would otherwise
            # index one past the last cell, and a slightly negative one would
            # silently wrap around to the opposite border.
            col = min(max(math.floor(scaled_x), 0), last_cell)
            row = min(max(math.floor(scaled_y), 0), last_cell)
            # Confidence Regression
            objective[batch_idx, 0, row, col] = 1.
            # Marking Point Shape Regression
            objective[batch_idx, 1, row, col] = marking_point.shape
            # Offset Regression (position relative to the cell origin)
            objective[batch_idx, 2, row, col] = scaled_x - col
            objective[batch_idx, 3, row, col] = scaled_y - row
            # Direction Regression
            direction = marking_point.direction
            objective[batch_idx, 4, row, col] = math.cos(direction)
            objective[batch_idx, 5, row, col] = math.sin(direction)
            # Assign Gradient: mask in channels 1-5 only where a point exists.
            gradient[batch_idx, 1:6, row, col].fill_(1.)
    return objective, gradient
from data.struct import MarkingPoint | |||||
def non_maximum_suppression(pred_points): | def non_maximum_suppression(pred_points): | ||||
def get_predicted_points(prediction, thresh): | def get_predicted_points(prediction, thresh): | ||||
"""Get marking point from one predicted feature map.""" | |||||
"""Get marking points from one predicted feature map.""" | |||||
assert isinstance(prediction, torch.Tensor) | assert isinstance(prediction, torch.Tensor) | ||||
predicted_points = [] | predicted_points = [] | ||||
prediction = prediction.detach().cpu().numpy() | prediction = prediction.detach().cpu().numpy() |
import os.path | import os.path | ||||
import cv2 as cv | import cv2 as cv | ||||
from torch.utils.data import Dataset | from torch.utils.data import Dataset | ||||
from torchvision import transforms | |||||
from . import MarkingPoint | |||||
from torchvision.transforms import ToTensor | |||||
from data.struct import MarkingPoint | |||||
class ParkingSlotDataset(Dataset): | class ParkingSlotDataset(Dataset): | ||||
super(ParkingSlotDataset, self).__init__() | super(ParkingSlotDataset, self).__init__() | ||||
self.root = root | self.root = root | ||||
self.sample_names = [] | self.sample_names = [] | ||||
self.image_transform = transforms.ToTensor() | |||||
self.image_transform = ToTensor() | |||||
for file in os.listdir(root): | for file in os.listdir(root): | ||||
if file.endswith(".json"): | if file.endswith(".json"): | ||||
self.sample_names.append(os.path.splitext(file)[0]) | self.sample_names.append(os.path.splitext(file)[0]) |
"""Defines data structure.""" | |||||
from collections import namedtuple | |||||
# A marking point record with fields x, y, direction and shape.
# NOTE(review): x/y look like normalized image coordinates and direction an
# angle in radians (it is fed to math.cos/math.sin elsewhere) - confirm.
MarkingPoint = namedtuple('MarkingPoint', ['x', 'y', 'direction', 'shape']) | |||||
# A parking slot record with fields x1, y1, x2 and y2.
# NOTE(review): presumably the two endpoints of the slot entrance - confirm.
Slot = namedtuple('Slot', ['x1', 'y1', 'x2', 'y2']) | |||||
import torch | import torch | ||||
from torch.utils.data import DataLoader | from torch.utils.data import DataLoader | ||||
import config | import config | ||||
from data.data_process import generate_objective, get_predicted_points, match_marking_points | |||||
from data.dataset import ParkingSlotDataset | |||||
from model.detector import DirectionalPointDetector | |||||
from util.log import Logger | |||||
from util.precision_recall import calc_average_precision, calc_precision_recall | |||||
import util | |||||
from data import get_predicted_points, match_marking_points | |||||
from data import ParkingSlotDataset | |||||
from model import DirectionalPointDetector | |||||
from train import generate_objective | |||||
def evaluate_detector(args): | def evaluate_detector(args): | ||||
"""Evaluate directional point detector.""" | """Evaluate directional point detector.""" | ||||
args.cuda = not args.disable_cuda and torch.cuda.is_available() | args.cuda = not args.disable_cuda and torch.cuda.is_available() | ||||
device = torch.device('cuda:'+str(args.gpu_id) if args.cuda else 'cpu') | |||||
device = torch.device('cuda:' + str(args.gpu_id) if args.cuda else 'cpu') | |||||
torch.set_grad_enabled(False) | |||||
dp_detector = DirectionalPointDetector( | dp_detector = DirectionalPointDetector( | ||||
3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device) | 3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device) | ||||
if args.detector_weights: | if args.detector_weights: | ||||
dp_detector.load_state_dict(torch.load(args.detector_weights)) | dp_detector.load_state_dict(torch.load(args.detector_weights)) | ||||
dp_detector.eval() | |||||
torch.multiprocessing.set_sharing_strategy('file_system') | |||||
data_loader = DataLoader(ParkingSlotDataset(args.dataset_directory), | data_loader = DataLoader(ParkingSlotDataset(args.dataset_directory), | ||||
batch_size=args.batch_size, shuffle=True, | batch_size=args.batch_size, shuffle=True, | ||||
num_workers=args.data_loading_workers, | num_workers=args.data_loading_workers, | ||||
collate_fn=lambda x: list(zip(*x))) | collate_fn=lambda x: list(zip(*x))) | ||||
logger = Logger() | |||||
logger = util.Logger(enable_visdom=args.enable_visdom) | |||||
total_loss = 0 | total_loss = 0 | ||||
num_evaluation = 0 | num_evaluation = 0 | ||||
ground_truths_list = [] | ground_truths_list = [] | ||||
predictions_list = [] | predictions_list = [] | ||||
for image, marking_points in data_loader: | |||||
for iter_idx, (image, marking_points) in enumerate(data_loader): | |||||
image = torch.stack(image) | image = torch.stack(image) | ||||
image = image.to(device) | image = image.to(device) | ||||
ground_truths_list += list(marking_points) | ground_truths_list += list(marking_points) | ||||
pred_points = [get_predicted_points(pred, 0.01) for pred in prediction] | pred_points = [get_predicted_points(pred, 0.01) for pred in prediction] | ||||
predictions_list += pred_points | predictions_list += pred_points | ||||
logger.log(iter=iter_idx, total_loss=total_loss) | |||||
precisions, recalls = calc_precision_recall( | |||||
precisions, recalls = util.calc_precision_recall( | |||||
ground_truths_list, predictions_list, match_marking_points) | ground_truths_list, predictions_list, match_marking_points) | ||||
average_precision = calc_average_precision(precisions, recalls) | |||||
average_precision = util.calc_average_precision(precisions, recalls) | |||||
if args.enable_visdom: | if args.enable_visdom: | ||||
logger.plot_curve(precisions, recalls) | logger.plot_curve(precisions, recalls) | ||||
logger.log(average_loss=total_loss / num_evaluation, | logger.log(average_loss=total_loss / num_evaluation, |
import torch | import torch | ||||
from torchvision.transforms import ToTensor | from torchvision.transforms import ToTensor | ||||
import config | import config | ||||
from data.data_process import get_predicted_points | |||||
from model.detector import DirectionalPointDetector | |||||
from data import get_predicted_points | |||||
from model import DirectionalPointDetector | |||||
from util import Timer | from util import Timer | ||||
"""Inference demo of directional point detector.""" | """Inference demo of directional point detector.""" | ||||
args.cuda = not args.disable_cuda and torch.cuda.is_available() | args.cuda = not args.disable_cuda and torch.cuda.is_available() | ||||
device = torch.device('cuda:' + str(args.gpu_id) if args.cuda else 'cpu') | device = torch.device('cuda:' + str(args.gpu_id) if args.cuda else 'cpu') | ||||
torch.set_grad_enabled(False) | |||||
dp_detector = DirectionalPointDetector( | dp_detector = DirectionalPointDetector( | ||||
3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device) | 3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device) | ||||
dp_detector.load_state_dict(torch.load(args.detector_weights)) | dp_detector.load_state_dict(torch.load(args.detector_weights)) | ||||
dp_detector.eval() | |||||
if args.mode == "image": | if args.mode == "image": | ||||
detect_image(dp_detector, device, args) | detect_image(dp_detector, device, args) | ||||
elif args.mode == "video": | elif args.mode == "video": |
"""Network model related package.""" | |||||
from .detector import DirectionalPointDetector |
"""Train directional marking point detector.""" | """Train directional marking point detector.""" | ||||
import math | |||||
import random | import random | ||||
import torch | import torch | ||||
from torch.utils.data import DataLoader | from torch.utils.data import DataLoader | ||||
import config | import config | ||||
from data.data_process import get_predicted_points, generate_objective | |||||
from data.dataset import ParkingSlotDataset | |||||
from model.detector import DirectionalPointDetector | |||||
from util.log import Logger | |||||
from util import tensor2im | |||||
import data | |||||
import util | |||||
from model import DirectionalPointDetector | |||||
def plot_prediction(logger, image, marking_points, prediction): | def plot_prediction(logger, image, marking_points, prediction): | ||||
"""Plot the ground truth and prediction of a random sample in a batch.""" | """Plot the ground truth and prediction of a random sample in a batch.""" | ||||
rand_sample = random.randint(0, image.size(0)-1) | rand_sample = random.randint(0, image.size(0)-1) | ||||
sampled_image = tensor2im(image[rand_sample]) | |||||
sampled_image = util.tensor2im(image[rand_sample]) | |||||
logger.plot_marking_points(sampled_image, marking_points[rand_sample], | logger.plot_marking_points(sampled_image, marking_points[rand_sample], | ||||
win_name='gt_marking_points') | win_name='gt_marking_points') | ||||
sampled_image = tensor2im(image[rand_sample]) | |||||
pred_points = get_predicted_points(prediction[rand_sample], 0.01) | |||||
sampled_image = util.tensor2im(image[rand_sample]) | |||||
pred_points = data.get_predicted_points(prediction[rand_sample], 0.01) | |||||
if pred_points: | if pred_points: | ||||
logger.plot_marking_points(sampled_image, | logger.plot_marking_points(sampled_image, | ||||
list(list(zip(*pred_points))[1]), | list(list(zip(*pred_points))[1]), | ||||
win_name='pred_marking_points') | win_name='pred_marking_points') | ||||
def generate_objective(marking_points_batch, device):
    """Get regression objective and gradient for directional point detector.

    Args:
        marking_points_batch: Sequence with one iterable of marking points per
            sample. Each point must expose ``x``, ``y``, ``direction`` and
            ``shape`` attributes.
            NOTE(review): x and y are assumed to be normalized to [0, 1] -
            confirm against the dataset loader.
        device: Torch device on which both tensors are allocated.

    Returns:
        A tuple ``(objective, gradient)`` of tensors shaped
        ``(batch, NUM_FEATURE_MAP_CHANNEL, FEATURE_MAP_SIZE, FEATURE_MAP_SIZE)``.
        ``objective`` holds the regression targets; ``gradient`` is a 0/1 mask
        marking which entries are filled in as targets.
    """
    map_size = config.FEATURE_MAP_SIZE
    last_cell = map_size - 1
    batch_size = len(marking_points_batch)
    objective = torch.zeros(batch_size, config.NUM_FEATURE_MAP_CHANNEL,
                            map_size, map_size, device=device)
    gradient = torch.zeros_like(objective)
    # The confidence channel is masked in for every cell of every sample.
    gradient[:, 0].fill_(1.)
    for batch_idx, marking_points in enumerate(marking_points_batch):
        for marking_point in marking_points:
            scaled_x = marking_point.x * map_size
            scaled_y = marking_point.y * map_size
            # Clamp to the grid: a coordinate of exactly 1.0 would otherwise
            # index one past the last cell, and a slightly negative one would
            # silently wrap around to the opposite border.
            col = min(max(math.floor(scaled_x), 0), last_cell)
            row = min(max(math.floor(scaled_y), 0), last_cell)
            # Confidence Regression
            objective[batch_idx, 0, row, col] = 1.
            # Marking Point Shape Regression
            objective[batch_idx, 1, row, col] = marking_point.shape
            # Offset Regression (position relative to the cell origin)
            objective[batch_idx, 2, row, col] = scaled_x - col
            objective[batch_idx, 3, row, col] = scaled_y - row
            # Direction Regression
            direction = marking_point.direction
            objective[batch_idx, 4, row, col] = math.cos(direction)
            objective[batch_idx, 5, row, col] = math.sin(direction)
            # Assign Gradient: mask in channels 1-5 only where a point exists.
            gradient[batch_idx, 1:6, row, col].fill_(1.)
    return objective, gradient
def train_detector(args): | def train_detector(args): | ||||
"""Train directional point detector.""" | """Train directional point detector.""" | ||||
args.cuda = not args.disable_cuda and torch.cuda.is_available() | args.cuda = not args.disable_cuda and torch.cuda.is_available() | ||||
device = torch.device('cuda:'+str(args.gpu_id) if args.cuda else 'cpu') | |||||
device = torch.device('cuda:' + str(args.gpu_id) if args.cuda else 'cpu') | |||||
torch.set_grad_enabled(True) | |||||
dp_detector = DirectionalPointDetector( | dp_detector = DirectionalPointDetector( | ||||
3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device) | 3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device) | ||||
if args.detector_weights: | if args.detector_weights: | ||||
print("Loading weights: %s" % args.detector_weights) | |||||
dp_detector.load_state_dict(torch.load(args.detector_weights)) | dp_detector.load_state_dict(torch.load(args.detector_weights)) | ||||
dp_detector.train() | |||||
optimizer = torch.optim.Adam(dp_detector.parameters(), lr=args.lr) | optimizer = torch.optim.Adam(dp_detector.parameters(), lr=args.lr) | ||||
if args.optimizer_weights: | if args.optimizer_weights: | ||||
print("Loading weights: %s" % args.optimizer_weights) | |||||
optimizer.load_state_dict(torch.load(args.optimizer_weights)) | optimizer.load_state_dict(torch.load(args.optimizer_weights)) | ||||
logger = Logger(['train_loss'] if args.enable_visdom else None) | |||||
data_loader = DataLoader(ParkingSlotDataset(args.dataset_directory), | |||||
logger = util.Logger(args.enable_visdom, | |||||
['train_loss'] if args.enable_visdom else None) | |||||
data_loader = DataLoader(data.ParkingSlotDataset(args.dataset_directory), | |||||
batch_size=args.batch_size, shuffle=True, | batch_size=args.batch_size, shuffle=True, | ||||
num_workers=args.data_loading_workers, | num_workers=args.data_loading_workers, | ||||
collate_fn=lambda x: list(zip(*x))) | collate_fn=lambda x: list(zip(*x))) |
# -*- coding: utf-8 -*- | |||||
import math | |||||
import time | |||||
import cv2 as cv | |||||
import torch | |||||
import numpy as np | |||||
from PIL import Image | |||||
class Timer(object):
    """Minimal stopwatch: call tic() to start and toc() to report elapsed time.

    Attributes are plain instance fields:
        start_ticking: True between a tic() and the following toc().
        start: Clock reading captured by the most recent tic().
    """

    def __init__(self):
        self.start_ticking = False
        self.start = 0.

    def tic(self):
        """Start timer."""
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        self.start = time.perf_counter()
        self.start_ticking = True

    def toc(self):
        """End timer, print the elapsed seconds and return them."""
        duration = time.perf_counter() - self.start
        self.start_ticking = False
        print("Time elapsed:", duration, "s.")
        return duration
def tensor2array(image_tensor, imtype=np.uint8):
    """Convert a float CxHxW image tensor to an HxWxC numpy ndarray.

    The tensor is expected to hold values in [0, 1]; they are scaled to
    [0, 255]. For integer output types the scaled values are rounded to the
    nearest integer and clipped to [0, 255] before casting.

    Args:
        image_tensor: torch.Tensor laid out as (channels, height, width).
        imtype: numpy dtype of the returned array (default ``np.uint8``).

    Returns:
        numpy.ndarray laid out as (height, width, channels) with dtype
        ``imtype``.
    """
    assert isinstance(image_tensor, torch.Tensor)
    image_numpy = image_tensor.detach().cpu().numpy() * 255.0
    if np.issubdtype(np.dtype(imtype), np.integer):
        # A bare astype() truncates toward zero, so e.g. 2.9999998 (a float32
        # round-trip of 3/255) would become 2, and out-of-range values would
        # wrap around instead of saturating.
        image_numpy = np.clip(np.rint(image_numpy), 0, 255)
    return np.transpose(image_numpy, (1, 2, 0)).astype(imtype)
def tensor2im(image_tensor, imtype=np.uint8):
    """Turn a float CxHxW BGR image tensor into an RGB PIL Image."""
    bgr_array = tensor2array(image_tensor, imtype)
    # Swap the channel order from BGR to RGB before wrapping in a PIL Image.
    return Image.fromarray(cv.cvtColor(bgr_array, cv.COLOR_BGR2RGB))
"""Utility related package.""" | |||||
from .log import Logger | |||||
from .precision_recall import calc_precision_recall, calc_average_precision | |||||
from .utils import Timer, tensor2array, tensor2im |
class Logger(): | class Logger(): | ||||
"""Logger for training.""" | """Logger for training.""" | ||||
def __init__(self, curve_names=None): | |||||
def __init__(self, enable_visdom=False, curve_names=None): | |||||
self.curve_names = curve_names | self.curve_names = curve_names | ||||
if curve_names: | |||||
if enable_visdom: | |||||
self.vis = Visdom() | self.vis = Visdom() | ||||
assert self.vis.check_connection() | assert self.vis.check_connection() | ||||
self.curve_x = np.array([0]) | self.curve_x = np.array([0]) | ||||
else: | |||||
self.curve_names = None | |||||
def log(self, xval=None, win_name='loss', **kwargs): | def log(self, xval=None, win_name='loss', **kwargs): | ||||
"""Log and print the information.""" | """Log and print the information.""" |
"""Universal procedure of calculating precision and recall.""" | """Universal procedure of calculating precision and recall.""" | ||||
import bisect | |||||
def match_gt_with_preds(ground_truth, predictions, match_labels): | def match_gt_with_preds(ground_truth, predictions, match_labels): | ||||
"""Adjust threshold to get mutiple precision recall sample.""" | """Adjust threshold to get mutiple precision recall sample.""" | ||||
true_positive_list, false_positive_list = get_confidence_list( | true_positive_list, false_positive_list = get_confidence_list( | ||||
ground_truths_list, predictions_list, match_labels) | ground_truths_list, predictions_list, match_labels) | ||||
true_positive_list = sorted(true_positive_list) | |||||
false_positive_list = sorted(false_positive_list) | |||||
thresholds = sorted(list(set(true_positive_list))) | |||||
recalls = [0.] | recalls = [0.] | ||||
precisions = [0.] | precisions = [0.] | ||||
thresholds = sorted(list(set(true_positive_list))) | |||||
for thresh in reversed(thresholds): | for thresh in reversed(thresholds): | ||||
if thresh == 0.: | if thresh == 0.: | ||||
recalls.append(1.) | recalls.append(1.) | ||||
precisions.append(0.) | precisions.append(0.) | ||||
true_positives = sum(i >= thresh for i in true_positive_list) | |||||
false_positives = sum(i >= thresh for i in false_positive_list) | |||||
false_negatives = len(true_positive_list) - true_positives | |||||
break | |||||
false_negatives = bisect.bisect_left(true_positive_list, thresh) | |||||
true_positives = len(true_positive_list) - false_negatives | |||||
true_negatives = bisect.bisect_left(false_positive_list, thresh) | |||||
false_positives = len(false_positive_list) - true_negatives | |||||
recalls.append(true_positives / (true_positives+false_negatives)) | recalls.append(true_positives / (true_positives+false_negatives)) | ||||
precisions.append(true_positives / (true_positives + false_positives)) | precisions.append(true_positives / (true_positives + false_positives)) | ||||
return precisions, recalls | return precisions, recalls |
"""Utility classes and functions.""" | |||||
import math | |||||
import time | |||||
import cv2 as cv | |||||
import torch | |||||
import numpy as np | |||||
from PIL import Image | |||||
class Timer(object):
    """Minimal stopwatch: call tic() to start and toc() to report elapsed time.

    Attributes are plain instance fields:
        start_ticking: True between a tic() and the following toc().
        start: Clock reading captured by the most recent tic().
    """

    def __init__(self):
        self.start_ticking = False
        self.start = 0.

    def tic(self):
        """Start timer."""
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        self.start = time.perf_counter()
        self.start_ticking = True

    def toc(self):
        """End timer, print the elapsed seconds and return them."""
        duration = time.perf_counter() - self.start
        self.start_ticking = False
        print("Time elapsed:", duration, "s.")
        return duration
def tensor2array(image_tensor, imtype=np.uint8):
    """Convert a float CxHxW image tensor to an HxWxC numpy ndarray.

    The tensor is expected to hold values in [0, 1]; they are scaled to
    [0, 255]. For integer output types the scaled values are rounded to the
    nearest integer and clipped to [0, 255] before casting.

    Args:
        image_tensor: torch.Tensor laid out as (channels, height, width).
        imtype: numpy dtype of the returned array (default ``np.uint8``).

    Returns:
        numpy.ndarray laid out as (height, width, channels) with dtype
        ``imtype``.
    """
    assert isinstance(image_tensor, torch.Tensor)
    image_numpy = image_tensor.detach().cpu().numpy() * 255.0
    if np.issubdtype(np.dtype(imtype), np.integer):
        # A bare astype() truncates toward zero, so e.g. 2.9999998 (a float32
        # round-trip of 3/255) would become 2, and out-of-range values would
        # wrap around instead of saturating.
        image_numpy = np.clip(np.rint(image_numpy), 0, 255)
    return np.transpose(image_numpy, (1, 2, 0)).astype(imtype)
def tensor2im(image_tensor, imtype=np.uint8):
    """Turn a float CxHxW BGR image tensor into an RGB PIL Image."""
    bgr_array = tensor2array(image_tensor, imtype)
    # Swap the channel order from BGR to RGB before wrapping in a PIL Image.
    return Image.fromarray(cv.cvtColor(bgr_array, cv.COLOR_BGR2RGB))