|
- import math
- import os
- import time
- from collections import namedtuple
-
- import cv2
- import numpy as np
- import torch
- from torchvision.transforms import ToTensor
-
- from DMPRUtils.model import DirectionalPointDetector
- from conf import config
- from utils.datasets import letterbox
- from utils.general import clip_coords
- from utils.torch_utils import select_device
-
# Immutable record describing one marking point: its x/y position, its
# direction angle and its shape value.
MarkingPoint = namedtuple('MarkingPoint', 'x y direction shape')
-
-
def plot_points(image, pred_points, line_thickness=3):
    """Draw each predicted marking point onto ``image`` in place.

    Every row of ``pred_points`` is unpacked as ``(conf, x, y, direction,
    shape, ...)``.  For each row a red segment is drawn from the point along
    ``direction``, the confidence is written at the point, and a second red
    segment perpendicular to the direction is added: starting at the point
    when the shape value is above 0.5, otherwise centred on it.  Nothing is
    drawn when ``pred_points.size`` is falsy.
    """
    if not pred_points.size:
        return

    # An explicit thickness wins; otherwise it is derived from the image size.
    stroke = line_thickness or round(0.002 * (image.shape[0] + image.shape[1]) / 2) + 1
    thin_stroke = max(stroke - 1, 1)  # label text and the centred cross-bar
    red = (0, 0, 255)

    for conf, *attrs in pred_points:
        origin_x = int(attrs[0])
        origin_y = int(attrs[1])
        cos_dir = math.cos(attrs[2])
        sin_dir = math.sin(attrs[2])

        origin = (origin_x, origin_y)
        # End of the segment that follows the point's direction.
        tip = (
            int(origin_x + 20 * cos_dir * stroke),
            int(origin_y + 20 * sin_dir * stroke),
        )
        # Two ends of the segment perpendicular to the direction.
        side_a = (
            int(origin_x - 10 * sin_dir * stroke),
            int(origin_y + 10 * cos_dir * stroke),
        )
        side_b = (
            int(origin_x + 10 * sin_dir * stroke),
            int(origin_y - 10 * cos_dir * stroke),
        )

        cv2.line(image, origin, tip, red, thickness=stroke)
        cv2.putText(image, str(float(conf)), origin, cv2.FONT_HERSHEY_PLAIN, 1, (0, 0, 0), thickness=thin_stroke)
        if attrs[3] > 0.5:
            cv2.line(image, origin, side_a, red, thickness=stroke)
        else:
            cv2.line(image, side_a, side_b, red, thickness=thin_stroke)
-
-
def preprocess_image(image):
    """Turn a numpy image into a batch-of-one torch tensor.

    The image is resized to 640x640 with ``cv2.resize`` unless its first two
    dimensions already equal 640, converted with torchvision's ``ToTensor``
    and given a leading batch dimension.
    """
    side = 640
    rows, cols = image.shape[0], image.shape[1]
    if rows != side or cols != side:
        image = cv2.resize(image, (side, side))
    as_tensor = ToTensor()(image)
    return torch.unsqueeze(as_tensor, 0)
-
def non_maximum_suppression(pred_points):
    """Perform non-maximum suppression on marking points.

    ``pred_points`` is a sequence of ``(confidence, point)`` entries where
    ``point`` exposes ``x`` and ``y``.  Whenever two points are closer than
    1/16 on both axes, the entry with the lower confidence is dropped (on a
    tie the later one is dropped).  If nothing is dropped the input object
    itself is returned; otherwise a new list with the survivors is returned.
    """
    min_gap = 0.0625  # 0.0625 = 1 / 16
    total = len(pred_points)
    keep = [True] * total

    for first in range(total - 1):
        for second in range(first + 1, total):
            ax, ay = pred_points[first][1].x, pred_points[first][1].y
            bx, by = pred_points[second][1].x, pred_points[second][1].y
            if abs(bx - ax) < min_gap and abs(by - ay) < min_gap:
                first_is_weaker = pred_points[first][0] < pred_points[second][0]
                keep[first if first_is_weaker else second] = False

    if all(keep):
        return pred_points
    return [entry for entry, kept in zip(pred_points, keep) if kept]
-
def get_predicted_points(prediction, thresh):
    """Decode one predicted feature map into ``(confidence, MarkingPoint)`` pairs.

    ``prediction`` is indexed as ``[channel, row, col]``; channels 0..5 are
    read as confidence, shape, x offset, y offset, cosine and sine.  Every
    cell whose confidence is ``>= thresh`` becomes a ``MarkingPoint`` whose
    x/y are the cell index plus offset divided by the grid width/height and
    whose direction is ``atan2(sine, cosine)``.  The collected pairs are
    passed through ``non_maximum_suppression`` before being returned.
    """
    assert isinstance(prediction, torch.Tensor)
    grid = prediction.detach().cpu().numpy()
    shape = grid.shape

    candidates = []
    for row in range(shape[1]):
        for col in range(shape[2]):
            confidence = grid[0, row, col]
            if not confidence >= thresh:
                continue
            x_norm = (col + grid[2, row, col]) / shape[2]
            y_norm = (row + grid[3, row, col]) / shape[1]
            cos_value = grid[4, row, col]
            sin_value = grid[5, row, col]
            angle = math.atan2(sin_value, cos_value)
            point = MarkingPoint(x_norm, y_norm, angle, grid[1, row, col])
            candidates.append((confidence, point))
    return non_maximum_suppression(candidates)
-
-
def get_predicted_points2(prediction, thresh):
    """Decode one predicted feature map into a tensor of marking points.

    Tensor-based counterpart of ``get_predicted_points``.  ``prediction`` is
    laid out as ``(channel, row, col)`` with channels 0..5 read as confidence,
    shape, x offset, y offset, cosine and sine.  Cells whose confidence is
    strictly greater than ``thresh`` are kept.  No non-maximum suppression is
    applied here.

    Returns:
        A tensor with one row per kept cell and five columns:
        ``confidence, x, y, direction, shape``, where x/y are the cell index
        plus offset divided by the grid width/height and direction is
        ``atan2(sine, cosine)``.
    """
    assert isinstance(prediction, torch.Tensor)

    # Channels last: (rows, cols, channels); the original author noted (20, 20, 6).
    cells = prediction.permute(1, 2, 0).contiguous()
    height = cells.shape[0]
    width = cells.shape[1]

    # Per-cell column and row indices, appended as channels 6 and 7.
    col_index = torch.arange(width, device=cells.device).float().repeat(height, 1).unsqueeze(dim=2)
    row_index = (
        torch.arange(height, device=cells.device)
        .float()
        .view(height, 1)
        .repeat(1, width)
        .unsqueeze(dim=2)
    )
    cells = torch.cat((cells, col_index, row_index), dim=2)

    # Drop cells whose confidence does not exceed thresh; result is (N, 8).
    kept = cells[cells[..., 0] > thresh]

    x_norm = (kept[..., 2] + kept[..., 6]) / width
    y_norm = (kept[..., 3] + kept[..., 7]) / height
    direction = torch.atan2(kept[..., 5], kept[..., 4])
    return torch.stack((kept[..., 0], x_norm, y_norm, direction, kept[..., 1]), dim=1)
-
-
def detect_marking_points(detector, image, thresh, device):
    """Run ``detector`` on one OpenCV image and return its decoded marking points.

    The image is preprocessed into a batch of one, moved to ``device`` and fed
    to ``detector``; the first element of the output is decoded with
    ``get_predicted_points2`` using ``thresh``.
    """
    batch = preprocess_image(image).to(device)
    outputs = detector(batch)
    first_output = outputs[0]
    return get_predicted_points2(first_output, thresh)
-
def scale_coords2(img1_shape, coords, img0_shape, ratio_pad=None):
    """Map normalised (x, y) points from the resized image back to the original.

    Args:
        img1_shape: shape of the image the coordinates refer to; only the
            first two entries ``(height, width)`` are used.
        coords: 2-D tensor whose columns 0 and 1 hold x and y as fractions of
            ``img1_shape``.  It is modified in place; any further columns are
            left untouched.
        img0_shape: shape of the original image; ``[0]`` is read as height and
            ``[1]`` as width.
        ratio_pad: optional ``((gain, ...), (pad_x, pad_y))``.  When ``None``
            the gain and padding are computed from the two shapes.

    Returns:
        The same ``coords`` tensor, with columns 0 and 1 converted to pixel
        coordinates of the original image and clamped to ``[0, width]`` and
        ``[0, height]`` respectively.
    """
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
        pad = (
            (img1_shape[1] - img0_shape[1] * gain) / 2,
            (img1_shape[0] - img0_shape[0] * gain) / 2,
        )  # wh padding
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]

    # Convert fractional x, y into pixel x, y on the resized image.
    height, width = img1_shape[:2]
    coords[:, 0] = torch.round(width * coords[:, 0] - 0.5)
    coords[:, 1] = torch.round(height * coords[:, 1] - 0.5)

    coords[:, 0] -= pad[0]  # x padding
    coords[:, 1] -= pad[1]  # y padding
    # Only x and y are coordinates. The previous ``:3`` slice also divided a
    # third column (e.g. a direction angle) whenever one was present.
    coords[:, :2] /= gain

    # Keep the points inside the original image.
    coords[:, 0].clamp_(0, img0_shape[1])
    coords[:, 1].clamp_(0, img0_shape[0])

    return coords
-
-
def DMPR_process(img0, model, device, args):
    """Detect marking points on ``img0`` and express them in its pixel coordinates.

    The image is letterboxed to ``args.dmprimg_size``, passed through
    ``detect_marking_points`` with ``args.dmpr_thresh``, and the x/y columns of
    any detections are rescaled back onto ``img0`` with ``scale_coords2``.

    Returns:
        The detection tensor with columns ``conf, x, y, direction, shape``.
    """
    # Kept from the original: this unpacking raises unless img0 has exactly
    # three dimensions.
    _height, _width, _channels = img0.shape
    img, _ratio, (_dw, _dh) = letterbox(img0, args.dmprimg_size, auto=False)

    det = detect_marking_points(model, img, args.dmpr_thresh, device)
    if len(det) == 0:
        return det

    letterboxed_hw = img.shape[:2]
    det[:, 1:3] = scale_coords2(letterboxed_hw, det[:, 1:3], img0.shape)
    return det
-
-
-
if __name__ == '__main__':
    # Manual smoke test: run the detector on one image and save the annotated
    # result next to the working directory.
    impath = r'I:\zjc\weiting1\Images'
    file = 'DJI_0001_8.jpg'
    imgpath = os.path.join(impath, file)
    img0 = cv2.imread(imgpath)
    if img0 is None:
        # cv2.imread reports an unreadable or missing file by returning None.
        raise FileNotFoundError(f'Could not read image: {imgpath}')

    device_ = '0'
    device = select_device(device_)
    args = config.get_parser_for_inference().parse_args()
    model = DirectionalPointDetector(3, args.depth_factor, config.NUM_FEATURE_MAP_CHANNEL).to(device)
    weights = r"E:\pycharmProject\DMPR-PS\weights\dp_detector_499.pth"
    # map_location lets the checkpoint load regardless of the device it was
    # saved from.
    model.load_state_dict(torch.load(weights, map_location=device))
    # Inference only: switch layers such as dropout/batch-norm to eval
    # behaviour and skip autograd bookkeeping.
    model.eval()

    with torch.no_grad():
        det = DMPR_process(img0, model, device, args)

    plot_points(img0, det)

    cv2.imwrite(file, img0, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
|