|
|
@@ -1,416 +1,464 @@ |
|
|
|
# 耗时最短代码 |
|
|
|
import numpy as np |
|
|
|
import cv2,time,math |
|
|
|
import matplotlib.pyplot as plt |
|
|
|
|
|
|
|
|
|
|
|
def get_ms(time2, time1): |
|
|
|
return (time2-time1)*1000.0 |
|
|
|
|
|
|
|
|
|
|
|
# 计算一点到二次函数曲线的距离,二次函数的表达式为x = a*(y**2) + b*y + c |
|
|
|
def point2QF(a, b, c, y, x): # 坐标点(y, x) |
|
|
|
distance = abs(x - a*(y**2) - b*y - c) / math.sqrt(1 + ((2*a*y + b)**2)) |
|
|
|
return distance |
|
|
|
|
|
|
|
|
|
|
|
# 存储所有speedRoad的contours |
|
|
|
def storageRoad(contours, pars): |
|
|
|
allRoadCnt = [] # 存储所有speedRoad的contours |
|
|
|
for cnt in contours: # 道路 |
|
|
|
if len(cnt) >= 6: |
|
|
|
rect = cv2.minAreaRect(cnt) |
|
|
|
if rect[1][0] * rect[1][1] > pars['RoadArea']: # 过滤掉面积小于阈值的speedRoad |
|
|
|
allRoadCnt.append(cnt) |
|
|
|
return allRoadCnt |
|
|
|
|
|
|
|
|
|
|
|
# 用于判断lane数量是否大于2,当lane数量大于等于2时,返回laneNumber |
|
|
|
def storageLane(contours, pars): |
|
|
|
laneNumber = 0 |
|
|
|
for cnt in contours: |
|
|
|
if len(cnt) >= 6: |
|
|
|
rect = cv2.minAreaRect(cnt) |
|
|
|
if rect[1][0] * rect[1][1] > pars['laneArea'] and min(rect[1]) / max(rect[1]) <= pars['roundness']: |
|
|
|
laneNumber += 1 |
|
|
|
if laneNumber > 2: |
|
|
|
break |
|
|
|
return laneNumber |
|
|
|
|
|
|
|
|
|
|
|
# 将contours中顶点数大于等于6的车辆信息(合格vehicle)和顶点数小于6的车辆信息(不合格vehicle)分别保存起来 |
|
|
|
def vehicleDivide(contours, vehicleBD, normVehicle, dets, count, i, unnormVehicle, normVehicleCOOR, centerCOOR): |
|
|
|
if len(contours) >= 6: |
|
|
|
vehicleBD.append(contours) |
|
|
|
normVehicle.append(dets[count]) |
|
|
|
normVehicleCOOR.append(centerCOOR) |
|
|
|
else: |
|
|
|
dets[int(i / 2)].append(0) |
|
|
|
dets[int(i / 2)].append(0) |
|
|
|
unnormVehicle.append(dets[int(i / 2)]) |
|
|
|
return vehicleBD, normVehicle, unnormVehicle, normVehicleCOOR |
|
|
|
|
|
|
|
|
|
|
|
# 存储所有vehicle的信息 |
|
|
|
def storageVehicle(pars, imgVehicle, dets): |
|
|
|
""" |
|
|
|
输入 |
|
|
|
pars:字典名 |
|
|
|
imgVehicle:分割图,只包含vehicle和背景 |
|
|
|
dets:是一个list,其中存储检测得到的各vehicle的信息,即[[x0, y0, x1, y1, 车辆得分, cls], ...] |
|
|
|
输出 |
|
|
|
dets:存储合格vehicle的信息,即[x0, y0, x1, y1, 车辆得分, cls] |
|
|
|
vehicleBD:存储合格vehicle的contours |
|
|
|
unnormVehicle:存储不合格vehicle的信息,即[x0, y0, x1, y1, 车辆得分, cls] |
|
|
|
normVehicleCOOR:存储合格vehicle的中心点坐标 |
|
|
|
说明 |
|
|
|
合格vehicle:contours中的顶点数大于等于6 |
|
|
|
不合格vehicle:contours中的顶点数小于6 |
|
|
|
""" |
|
|
|
vehicleBD = [] # 存储一副图像中vehicles的contours |
|
|
|
normVehicle = [] # 将合格vehicle的信息存储在normVehicle中 |
|
|
|
unnormVehicle = [] # 将不合格vehicle的信息存储在unnormVehicle中 |
|
|
|
normVehicleCOOR = [] # 存储合格vehicle的中心点坐标 |
|
|
|
img = cv2.cvtColor(imgVehicle, cv2.COLOR_BGR2GRAY) |
|
|
|
count = 0 |
|
|
|
for i in range(0, len(pars['vehicleCOOR']), 2): |
|
|
|
y1 = int(pars['vehicleCOOR'][i][1] * pars['ZoomFactor']['y']) |
|
|
|
y2 = int(pars['vehicleCOOR'][i + 1][1] * pars['ZoomFactor']['y']) |
|
|
|
x1 = int(pars['vehicleCOOR'][i][0] * pars['ZoomFactor']['x']) |
|
|
|
x2 = int(pars['vehicleCOOR'][i + 1][0] * pars['ZoomFactor']['x']) |
|
|
|
if y1 >= 2: |
|
|
|
y1 = y1 - 2 |
|
|
|
if y2 <= (pars['modelSize'][1] - 2): |
|
|
|
y2 = y2 + 2 |
|
|
|
if x1 >= 2: |
|
|
|
x1 = x1 - 2 |
|
|
|
if x2 <= (pars['modelSize'][0] - 2): |
|
|
|
x2 = x2 + 2 |
|
|
|
centerCOOR = (int((x1 + x2) / 2), int((y1 + y2) / 2)) |
|
|
|
img1 = img[y1:y2, x1:x2] |
|
|
|
up = np.zeros((20, (x2 - x1)), dtype='uint8') |
|
|
|
left = np.zeros(((40 + y2 - y1), 20), dtype='uint8') |
|
|
|
img1 = np.concatenate((up, img1), axis=0) |
|
|
|
img1 = np.concatenate((img1, up), axis=0) |
|
|
|
img1 = np.concatenate((left, img1), axis=1) |
|
|
|
img2 = np.concatenate((img1, left), axis=1) |
|
|
|
contours2, hierarchy = cv2.findContours(img2, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) |
|
|
|
if len(contours2) != 0: |
|
|
|
if len(contours2) > 1: |
|
|
|
vehicleArea = [] # 存储vehicle的最小外接矩形的面积 |
|
|
|
for j in range(len(contours2)): |
|
|
|
rect = cv2.minAreaRect(contours2[j]) |
|
|
|
vehicleArea.append(rect[1][0] * rect[1][1]) |
|
|
|
maxAreaIndex = vehicleArea.index(max(vehicleArea)) |
|
|
|
maxAreaContours = contours2[maxAreaIndex] |
|
|
|
vehicleBD, normVehicle, unnormVehicle, normVehicleCOOR = vehicleDivide(maxAreaContours, vehicleBD, normVehicle, dets, count, i, unnormVehicle, normVehicleCOOR, centerCOOR) |
|
|
|
elif len(contours2) == 1: |
|
|
|
vehicleBD, normVehicle, unnormVehicle, normVehicleCOOR = vehicleDivide(contours2[0], vehicleBD, normVehicle, dets, count, i, unnormVehicle, normVehicleCOOR, centerCOOR) |
|
|
|
else: |
|
|
|
dets[int(i / 2)].append(0) |
|
|
|
dets[int(i / 2)].append(0) |
|
|
|
unnormVehicle.append(dets[int(i / 2)]) |
|
|
|
count += 1 |
|
|
|
dets = normVehicle |
|
|
|
return dets, vehicleBD, unnormVehicle, normVehicleCOOR |
|
|
|
|
|
|
|
|
|
|
|
# 计算违停得分 |
|
|
|
def IllegalParkScore(vehicleBD, allRoadCnt, dets, laneNumber, unnormVehicle, normVehicleCOOR, a_l, b_l, c_l, a_r, b_r, c_r): |
|
|
|
""" |
|
|
|
对vehicle是否在speedRoad上进行判断,并计算违章得分 |
|
|
|
输出targetList 其格式为:[[cls, x0, y0, x1, y1, score, 违章得分, 违章类别], ...] |
|
|
|
""" |
|
|
|
if len(vehicleBD) != 0: |
|
|
|
for i in range(len(vehicleBD)): |
|
|
|
rect = cv2.minAreaRect(vehicleBD[i]) |
|
|
|
center = normVehicleCOOR[i] # vehicle的中心点坐标 |
|
|
|
if len(allRoadCnt) != 0 and laneNumber >= 2: # 当车道线个数至少有两条时,才计算违章得分 |
|
|
|
for j in range(len(allRoadCnt)): |
|
|
|
# 判断车辆矩形框的中心点坐标是否在道路矩形框的范围内 |
|
|
|
flag = cv2.pointPolygonTest(allRoadCnt[j], center, False) |
|
|
|
if flag >= 0: |
|
|
|
dets[i].append(0) # 给违章得分占位 |
|
|
|
dets[i].append(0) # 给违章类别占位 |
|
|
|
if center[0] < predict(a_l, b_l, c_l, center[1]): |
|
|
|
distance = point2QF(a_l, b_l, c_l, center[1], center[0]) |
|
|
|
if distance >= min(rect[1]) / 2: |
|
|
|
dets[i][6], dets[i][7] = 1, 1 |
|
|
|
else: |
|
|
|
dets[i][6], dets[i][7] = distance / (min(rect[1]) / 2), 1 |
|
|
|
elif center[0] > predict(a_r, b_r, c_r, center[1]): |
|
|
|
distance = point2QF(a_r, b_r, c_r, center[1], center[0]) |
|
|
|
if distance >= min(rect[1]) / 2: |
|
|
|
dets[i][6], dets[i][7] = 1, 1 |
|
|
|
else: |
|
|
|
dets[i][6], dets[i][7] = distance / (min(rect[1]) / 2), 1 |
|
|
|
else: |
|
|
|
dets[i][6], dets[i][7] = 0, 0 |
|
|
|
break |
|
|
|
# 如果分割图像中不存在speedRoad,则无法进行违章判定,将所有车辆的违章类别设为0,即没有违章 |
|
|
|
if len(dets[i]) < 8: |
|
|
|
dets[i].append(0) # 违章得分为0 |
|
|
|
dets[i].append(0) # 0表示没有违章 |
|
|
|
targetList = dets |
|
|
|
if len(unnormVehicle) != 0: |
|
|
|
for i in range(len(unnormVehicle)): |
|
|
|
targetList.append(unnormVehicle[i]) # 将所有车辆的信息合并到一起 |
|
|
|
else: |
|
|
|
targetList = unnormVehicle |
|
|
|
return targetList |
|
|
|
|
|
|
|
|
|
|
|
# 找最左侧lane时,将要删除的右侧lane的序号存储在delRightLane。找最右侧lane时,将要删除的左侧lane的序号存储在delLeftLane。 |
|
|
|
def devideLane(laneInfo, i, m, delRightLane, delLeftLane, y): |
|
|
|
index1 = np.where(laneInfo[i][3] == y) |
|
|
|
index1 = index1[0].tolist() |
|
|
|
index1.sort() |
|
|
|
x_1 = laneInfo[i][5][index1[0]][0] |
|
|
|
index2 = np.where(laneInfo[m][3] == y) |
|
|
|
index2 = index2[0].tolist() |
|
|
|
index2.sort() |
|
|
|
x_2 = laneInfo[m][5][index2[0]][0] |
|
|
|
if x_1 < x_2: |
|
|
|
if i not in delLeftLane: |
|
|
|
delLeftLane.append(i) # 保留右侧lane |
|
|
|
if m not in delRightLane: |
|
|
|
delRightLane.append(m) |
|
|
|
else: |
|
|
|
if m not in delLeftLane: |
|
|
|
delLeftLane.append(m) |
|
|
|
if i not in delRightLane: |
|
|
|
delRightLane.append(i) |
|
|
|
|
|
|
|
return delRightLane, delLeftLane |
|
|
|
|
|
|
|
|
|
|
|
# 确定最左侧和最右侧的lane簇 |
|
|
|
def detLine(contours): |
|
|
|
""" |
|
|
|
输入 |
|
|
|
contours:各lane的contours |
|
|
|
输出 |
|
|
|
laneInfo:存储各lane的信息,每条lane的信息为:[contours, y坐标范围, lane序号, arr_y, y坐标范围的长度, cnt] |
|
|
|
delRightLane:在确定最左侧lane时,其存储需要删除的lane的序号 |
|
|
|
delLeftLane:在确定最右侧lane时,其存储需要删除的lane的序号 |
|
|
|
""" |
|
|
|
mergList = [] |
|
|
|
for i in range(len(contours)): |
|
|
|
cnt = np.squeeze(contours[i], 1) |
|
|
|
arr_y = cnt[:, 1] |
|
|
|
arrList = list(set(arr_y)) |
|
|
|
cnt_y = np.sort(np.array(arrList)) |
|
|
|
mergList.append([contours[i], cnt_y, i, arr_y, len(cnt_y), cnt]) |
|
|
|
laneInfo = sorted(mergList, key=(lambda x: x[4])) # [[contours[i], cnt_y, i, arr_y, len(cnt_y)],...] |
|
|
|
delRightLane = [] # 求最左侧lane |
|
|
|
delLeftLane = [] # 求最右侧lane |
|
|
|
laneInfoNew = [] |
|
|
|
for i in range(len(laneInfo)): |
|
|
|
laneInfoNew.append([laneInfo[i][1][0], laneInfo[i][1][-1], i]) # [[y_min, y_max, i],...] |
|
|
|
laneInfoNew = np.array(laneInfoNew) |
|
|
|
new1 = laneInfoNew[:, np.newaxis, :].repeat(laneInfoNew.shape[0], 1) |
|
|
|
new2 = laneInfoNew[np.newaxis, ...].repeat(laneInfoNew.shape[0], 0) |
|
|
|
new3 = np.concatenate((new1, new2), axis=2) |
|
|
|
y_i_min, y_i_max, y_m_min, y_m_max = new3[..., 0], new3[..., 1], new3[..., 3], new3[..., 4] |
|
|
|
mask1 = (y_i_min >= y_m_min) & (y_i_min <= y_m_max) & (y_i_max > y_m_max) |
|
|
|
mask2 = (y_i_max >= y_m_min) & (y_i_max <= y_m_max) & (y_i_min < y_m_min) |
|
|
|
mask3 = (y_i_min >= y_m_min) & (y_i_max <= y_m_max) |
|
|
|
mask4 = (y_i_min < y_m_min) & (y_i_max > y_m_max) |
|
|
|
if len(np.nonzero(mask1)[0]) != 0: |
|
|
|
mask1 = np.triu(mask1, k=1) |
|
|
|
serial_i = new3[mask1][..., 2] |
|
|
|
serial_m = new3[mask1][..., 5] |
|
|
|
for k in range(len(serial_i)): |
|
|
|
if (serial_m[k] not in delLeftLane) or (serial_m[k] not in delRightLane) or (serial_i[k] not in delLeftLane) or (serial_i[k] not in delRightLane): |
|
|
|
delRightLane, delLeftLane = devideLane(laneInfo, serial_i[k], serial_m[k], delRightLane, delLeftLane, laneInfo[serial_i[k]][1][0]) |
|
|
|
|
|
|
|
if len(np.nonzero(mask2)[0]) != 0: |
|
|
|
mask2 = np.triu(mask2, k=1) |
|
|
|
serial_i = new3[mask2][..., 2] |
|
|
|
serial_m = new3[mask2][..., 5] |
|
|
|
for k in range(len(serial_i)): |
|
|
|
if (serial_m[k] not in delLeftLane) or (serial_m[k] not in delRightLane) or (serial_i[k] not in delLeftLane) or (serial_i[k] not in delRightLane): |
|
|
|
delRightLane, delLeftLane = devideLane(laneInfo, serial_i[k], serial_m[k], delRightLane, delLeftLane, laneInfo[serial_i[k]][1][-1]) |
|
|
|
|
|
|
|
if len(np.nonzero(mask3)[0]) != 0: |
|
|
|
mask3 = np.triu(mask3, k=1) |
|
|
|
serial_i = new3[mask3][..., 2] |
|
|
|
serial_m = new3[mask3][..., 5] |
|
|
|
for k in range(len(serial_i)): |
|
|
|
if (serial_m[k] not in delLeftLane) or (serial_m[k] not in delRightLane) or (serial_i[k] not in delLeftLane) or (serial_i[k] not in delRightLane): |
|
|
|
delRightLane, delLeftLane = devideLane(laneInfo, serial_i[k], serial_m[k], delRightLane, delLeftLane, laneInfo[serial_i[k]][1][0]) |
|
|
|
|
|
|
|
if len(np.nonzero(mask4)[0]) != 0: |
|
|
|
mask4 = np.triu(mask4, k=1) |
|
|
|
serial_i = new3[mask4][..., 2] |
|
|
|
serial_m = new3[mask4][..., 5] |
|
|
|
for k in range(len(serial_i)): |
|
|
|
if (serial_m[k] not in delLeftLane) or (serial_m[k] not in delRightLane) or (serial_i[k] not in delLeftLane) or (serial_i[k] not in delRightLane): |
|
|
|
delRightLane, delLeftLane = devideLane(laneInfo, serial_i[k], serial_m[k], delRightLane, delLeftLane, laneInfo[serial_m[k]][1][0]) |
|
|
|
return laneInfo, delRightLane, delLeftLane |
|
|
|
|
|
|
|
|
|
|
|
# 对lane中的y值坐标进行下采样 |
|
|
|
def downSample(cnt_y): |
|
|
|
# number = len(cnt_y) * 0.0125 |
|
|
|
# cnt_y = np.random.choice(cnt_y, size=number, replace=False) |
|
|
|
if len(cnt_y) >= 1000: |
|
|
|
cnt_y = cnt_y[1::80] |
|
|
|
elif len(cnt_y) >= 900 and len(cnt_y) < 1000: |
|
|
|
cnt_y = cnt_y[1::75] |
|
|
|
elif len(cnt_y) >= 800 and len(cnt_y) < 900: |
|
|
|
cnt_y = cnt_y[1::70] |
|
|
|
elif len(cnt_y) >= 700 and len(cnt_y) < 800: |
|
|
|
cnt_y = cnt_y[1::65] |
|
|
|
elif len(cnt_y) >= 600 and len(cnt_y) < 700: |
|
|
|
cnt_y = cnt_y[1::60] |
|
|
|
elif len(cnt_y) >= 500 and len(cnt_y) < 600: |
|
|
|
cnt_y = cnt_y[1::55] |
|
|
|
elif len(cnt_y) >= 400 and len(cnt_y) < 500: |
|
|
|
cnt_y = cnt_y[1::40] |
|
|
|
elif len(cnt_y) >= 300 and len(cnt_y) < 400: |
|
|
|
cnt_y = cnt_y[1::45] |
|
|
|
elif len(cnt_y) >= 200 and len(cnt_y) < 300: |
|
|
|
cnt_y = cnt_y[1::40] |
|
|
|
elif len(cnt_y) >= 100 and len(cnt_y) < 200: |
|
|
|
cnt_y = cnt_y[1::35] |
|
|
|
elif len(cnt_y) >= 50 and len(cnt_y) < 100: |
|
|
|
cnt_y = cnt_y[1::20] |
|
|
|
elif len(cnt_y) >= 20 and len(cnt_y) < 50: |
|
|
|
cnt_y = cnt_y[1::6] |
|
|
|
else: |
|
|
|
cnt_y = cnt_y[1::5] |
|
|
|
return cnt_y |
|
|
|
|
|
|
|
|
|
|
|
# 求最左侧lane或最右侧lane中的各点坐标 |
|
|
|
def targetCOOR(laneInfo, delLane): |
|
|
|
""" |
|
|
|
输入 |
|
|
|
laneInfo:存储各lane的信息,每条lane的信息为:[contours, y坐标范围, lane序号, arr_y, y坐标范围的长度, cnt] |
|
|
|
delLane:在确定最左侧lane或最右侧lane时,其存储需要删除的lane的序号。 |
|
|
|
输出 |
|
|
|
laneCOOR:存储最左侧或最右侧lane簇中各点的坐标 |
|
|
|
""" |
|
|
|
laneCOOR = [] # 存储lane中各点的坐标 |
|
|
|
centerSort = [] # 存储各lane按照中心点的y坐标排序后的结果 |
|
|
|
for j in range(len(laneInfo)): |
|
|
|
if j not in delLane: |
|
|
|
cnt = laneInfo[j][0] |
|
|
|
rect = cv2.minAreaRect(cnt) |
|
|
|
cnt = np.squeeze(cnt, 1) |
|
|
|
cnt_y = laneInfo[j][1] |
|
|
|
cnt_y = downSample(cnt_y) |
|
|
|
centerSort.append([rect[0][1], cnt_y, laneInfo[j][3], cnt, j]) |
|
|
|
centerSort = sorted(centerSort, key=(lambda x: x[0])) |
|
|
|
for i in range(len(centerSort)): |
|
|
|
centerCoordinate = [] |
|
|
|
for j in range(len(centerSort[i][1])): |
|
|
|
index = np.where(centerSort[i][2] == centerSort[i][1][j]) |
|
|
|
indexList = index[0].tolist() |
|
|
|
indexList.sort() |
|
|
|
x = (centerSort[i][3][indexList[0]][0] + centerSort[i][3][indexList[-1]][0]) / 2 |
|
|
|
y = (centerSort[i][3][indexList[0]][1] + centerSort[i][3][indexList[-1]][1]) / 2 |
|
|
|
centerCoordinate.append([x, y]) |
|
|
|
laneCOOR = laneCOOR + centerCoordinate |
|
|
|
return laneCOOR |
|
|
|
|
|
|
|
|
|
|
|
# 二次函数曲线表达式:x = a*(y**2) + b*y + c,根据图像中一点的y坐标求二次曲线中的x坐标 |
|
|
|
def predict(a, b, c, y): |
|
|
|
x = a * (y**2) + b * y + c |
|
|
|
return x |
|
|
|
|
|
|
|
|
|
|
|
def mixNoParking_road_postprocess(dets, mask, pars): |
|
|
|
""" |
|
|
|
对于字典traffic_dict中的各个键,说明如下: |
|
|
|
RoadArea:speedRoad的最小外接矩形的面积 |
|
|
|
vehicleCOOR:是一个列表,用于存储被检测出的vehicle的坐标(vehicle检测模型) |
|
|
|
roundness:圆度 ,lane的长与宽的比率,作为判定是否为车道线的标准之一 |
|
|
|
laneArea:车道线的最小外接矩形的面积 |
|
|
|
ZoomFactor:图像在H和W方向上的缩放因子,其值小于1 |
|
|
|
fitOrder:多点拟合曲线的阶数 |
|
|
|
最终输出格式:[[x0, y0, x1, y1, 车辆得分, cls, 违章停车得分, 违章类别], ...] |
|
|
|
违章类别:0表示正常车辆,1表示违章车辆 |
|
|
|
""" |
|
|
|
det_cors = [] |
|
|
|
for bb in dets: |
|
|
|
det_cors.append((int(bb[0]), int(bb[1]))) |
|
|
|
det_cors.append((int(bb[2]), int(bb[3]))) |
|
|
|
#print('#################line341:',det_cors) |
|
|
|
pars['vehicleCOOR'] = det_cors |
|
|
|
H, W = mask.shape[0:2] # mask的分辨率为360x640 |
|
|
|
scaleH = pars['modelSize'][1] / H # 自适应调整缩放比例 |
|
|
|
scaleW = pars['modelSize'][0] / W |
|
|
|
pars['ZoomFactor'] = {'x': scaleW, 'y': scaleH} |
|
|
|
new_hw = [int(H * scaleH), int(W * scaleW)] |
|
|
|
mask = cv2.resize(mask, (new_hw[1], new_hw[0])) |
|
|
|
if len(mask.shape) == 3: |
|
|
|
mask = mask[:, :, 0] |
|
|
|
|
|
|
|
t1 = time.time() |
|
|
|
imgRoad = mask.copy() |
|
|
|
imgVehicle = mask.copy() |
|
|
|
lane_line = mask.copy() |
|
|
|
# 将vehicle和lane过滤掉,只包含背景和speedRoad |
|
|
|
imgRoad[imgRoad == 2] = 1 |
|
|
|
imgRoad[imgRoad == 3] = 1 |
|
|
|
# 将speedRoad和lane过滤掉,只保留vehicle和背景 |
|
|
|
imgVehicle[imgVehicle != 2] = 0 |
|
|
|
# 将speedRoad和vehicle过滤掉,只保留lane和背景 |
|
|
|
lane_line[lane_line < 3] = 0 |
|
|
|
imgRoad = cv2.cvtColor(np.uint8(imgRoad), cv2.COLOR_RGB2BGR) # 道路 |
|
|
|
imgVehicle = cv2.cvtColor(np.uint8(imgVehicle), cv2.COLOR_RGB2BGR) # 车辆 |
|
|
|
lane_line = cv2.cvtColor(np.uint8(lane_line), cv2.COLOR_RGB2BGR) |
|
|
|
|
|
|
|
# 对车道线进行膨胀操作 |
|
|
|
# kernel = np.ones((3, 3), np.uint8) # 膨胀范围 |
|
|
|
# lane_line = cv2.dilate(lane_line, kernel, iterations=2) # 迭代次数为2 |
|
|
|
|
|
|
|
t2 = time.time() |
|
|
|
img1 = cv2.cvtColor(imgRoad, cv2.COLOR_BGR2GRAY) |
|
|
|
roadContours, hierarchy = cv2.findContours(img1, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) |
|
|
|
t3 = time.time() |
|
|
|
# 存储所有speedRoad的信息 |
|
|
|
allRoadCnt = storageRoad(roadContours, pars) |
|
|
|
t4 = time.time() |
|
|
|
img3 = cv2.cvtColor(lane_line, cv2.COLOR_BGR2GRAY) |
|
|
|
laneContours, hierarchy = cv2.findContours(img3, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) |
|
|
|
t5 = time.time() |
|
|
|
laneInfo, delRightLane, delLeftLane = detLine(laneContours) |
|
|
|
t6 = time.time() |
|
|
|
# 存储所有lane的信息 |
|
|
|
laneNumber = storageLane(laneContours, pars) |
|
|
|
t7 = time.time() |
|
|
|
# 存储所有vehicle的信息 |
|
|
|
dets, vehicleBD, unnormVehicle, normVehicleCOOR = storageVehicle(pars, imgVehicle, dets) |
|
|
|
t8 = time.time() |
|
|
|
leftLaneCOOR = targetCOOR(laneInfo, delRightLane) |
|
|
|
rightLaneCOOR = targetCOOR(laneInfo, delLeftLane) |
|
|
|
rightLaneCOOR = np.array(rightLaneCOOR) |
|
|
|
rightX = rightLaneCOOR[:, 0] |
|
|
|
rightY = rightLaneCOOR[:, 1] |
|
|
|
leftLaneCOOR = np.array(leftLaneCOOR) |
|
|
|
leftX = leftLaneCOOR[:, 0] |
|
|
|
leftY = leftLaneCOOR[:, 1] |
|
|
|
# a_r,b_r,c_r分别是:最右侧车道线簇拟合的二次函数的二次项系数一次项系数,和常数项 |
|
|
|
a_r, b_r, c_r = np.polyfit(rightY, rightX, pars['fitOrder'])[0], np.polyfit(rightY, rightX, pars['fitOrder'])[1], np.polyfit(rightY, rightX, pars['fitOrder'])[2] |
|
|
|
# a_l,b_l,c_l分别是:最左侧车道线簇拟合的二次函数的二次项系数,一次项系数,和常数项 |
|
|
|
a_l, b_l, c_l = np.polyfit(leftY, leftX, pars['fitOrder'])[0], np.polyfit(leftY, leftX, pars['fitOrder'])[1], np.polyfit(leftY, leftX, pars['fitOrder'])[2] |
|
|
|
|
|
|
|
"""以下四行代码用于在后处理函数外画图""" |
|
|
|
finalLane = [] |
|
|
|
abc = [a_l, b_l, c_l, a_r, b_r, c_r] # abc中存储的是最左侧和最右侧二次函数的各项系数 |
|
|
|
finalLane.append(rightLaneCOOR) |
|
|
|
finalLane.append(leftLaneCOOR) |
|
|
|
t9 = time.time() |
|
|
|
# 计算违停得分 |
|
|
|
targetList = IllegalParkScore(vehicleBD, allRoadCnt, dets, laneNumber, unnormVehicle, normVehicleCOOR, a_l, b_l, c_l, a_r, b_r, c_r) |
|
|
|
t10 = time.time() |
|
|
|
time_infos = 'postTime:%.2f(分割时间:%.2f, findContours:%.2f, ruleJudge:%.2f, storageRoad:%.2f, detLane:%.2f, storageLane:%.2f, storageVehicle:%.2f, fitLine:%.2f, IllegalParkScore:%.2f)' % ( |
|
|
|
get_ms(t10, t1), get_ms(t2, t1), get_ms(t3, t2), get_ms(t8, t3), get_ms(t4, t3), get_ms(t6, t5), get_ms(t7, t6), get_ms(t8, t7), get_ms(t9, t8), get_ms(t10, t9)) |
|
|
|
#, finalLane, lane_line, abc |
|
|
|
targetList = [ [ *b[0:4],b[6] if b[6]>0 else b[4], b[7] ] for b in targetList ] |
|
|
|
#print('---------line415:',targetList) |
|
|
|
return targetList, time_infos |
|
|
|
# 耗时最短代码
|
|
|
|
import numpy as np
|
|
|
|
import cv2,time,math
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
|
|
|
|
|
|
def get_ms(time2, time1):
|
|
|
|
return (time2-time1)*1000.0
|
|
|
|
|
|
|
|
|
|
|
|
# 计算一点到二次函数曲线的距离,二次函数的表达式为x = a*(y**2) + b*y + c
|
|
|
|
def point2QF(a, b, c, y, x): # 坐标点(y, x)
|
|
|
|
distance = abs(x - a*(y**2) - b*y - c) / math.sqrt(1 + ((2*a*y + b)**2))
|
|
|
|
return distance
|
|
|
|
|
|
|
|
|
|
|
|
# 存储所有speedRoad的contours
|
|
|
|
def storageRoad(contours, pars):
|
|
|
|
allRoadCnt = [] # 存储所有speedRoad的contours
|
|
|
|
for cnt in contours: # 道路
|
|
|
|
if len(cnt) >= 6:
|
|
|
|
rect = cv2.minAreaRect(cnt)
|
|
|
|
if rect[1][0] * rect[1][1] > pars['RoadArea']: # 过滤掉面积小于阈值的speedRoad
|
|
|
|
allRoadCnt.append(cnt)
|
|
|
|
return allRoadCnt
|
|
|
|
|
|
|
|
|
|
|
|
# 返回符合标准的lane的个数及contours
|
|
|
|
def storageLane(contours, pars):
|
|
|
|
"""
|
|
|
|
contours:lane分割后的原始contours
|
|
|
|
newLaneContours:符合标准的lane的contours
|
|
|
|
laneNumber:符合标准的lane的个数
|
|
|
|
符合标准的lane定义如下:
|
|
|
|
(1)contours中的坐标点个数不小于6
|
|
|
|
(2)lane最小外接矩形的面积大于阈值laneArea
|
|
|
|
(3)lane最小外接矩形的最短边与最长边的比值小于等于阈值roundness
|
|
|
|
"""
|
|
|
|
laneNumber = 0
|
|
|
|
newLaneContours = ()
|
|
|
|
for cnt in contours:
|
|
|
|
if len(cnt) >= 6:
|
|
|
|
rect = cv2.minAreaRect(cnt)
|
|
|
|
if rect[1][0] * rect[1][1] > pars['laneArea'] and min(rect[1]) / max(rect[1]) <= pars['roundness']:
|
|
|
|
laneNumber += 1
|
|
|
|
newLaneContours = newLaneContours + (cnt, )
|
|
|
|
return laneNumber, newLaneContours
|
|
|
|
|
|
|
|
|
|
|
|
# 将contours中顶点数大于等于6的车辆信息(合格vehicle)和顶点数小于6的车辆信息(不合格vehicle)分别保存起来
|
|
|
|
def vehicleDivide(contours, vehicleBD, normVehicle, dets, count, i, unnormVehicle, normVehicleCOOR, centerCOOR):
|
|
|
|
if len(contours) >= 6:
|
|
|
|
vehicleBD.append(contours)
|
|
|
|
normVehicle.append(dets[count])
|
|
|
|
normVehicleCOOR.append(centerCOOR)
|
|
|
|
else:
|
|
|
|
dets[int(i / 2)].append(0)
|
|
|
|
dets[int(i / 2)].append(0)
|
|
|
|
unnormVehicle.append(dets[int(i / 2)])
|
|
|
|
return vehicleBD, normVehicle, unnormVehicle, normVehicleCOOR
|
|
|
|
|
|
|
|
|
|
|
|
# 存储所有vehicle的信息
|
|
|
|
def storageVehicle(pars, imgVehicle, dets):
|
|
|
|
"""
|
|
|
|
输入
|
|
|
|
pars:字典名
|
|
|
|
imgVehicle:分割图,只包含vehicle和背景
|
|
|
|
dets:是一个list,其中存储检测得到的各vehicle的信息,即[[x0, y0, x1, y1, 车辆得分, cls], ...]
|
|
|
|
输出
|
|
|
|
dets:存储合格vehicle的信息,即[x0, y0, x1, y1, 车辆得分, cls]
|
|
|
|
vehicleBD:存储合格vehicle的contours
|
|
|
|
unnormVehicle:存储不合格vehicle的信息,即[x0, y0, x1, y1, 车辆得分, cls]
|
|
|
|
normVehicleCOOR:存储合格vehicle的中心点坐标
|
|
|
|
说明
|
|
|
|
合格vehicle:contours中的顶点数大于等于6
|
|
|
|
不合格vehicle:contours中的顶点数小于6
|
|
|
|
"""
|
|
|
|
vehicleBD = [] # 存储一副图像中vehicles的contours
|
|
|
|
normVehicle = [] # 将合格vehicle的信息存储在normVehicle中
|
|
|
|
unnormVehicle = [] # 将不合格vehicle的信息存储在unnormVehicle中
|
|
|
|
normVehicleCOOR = [] # 存储合格vehicle的中心点坐标
|
|
|
|
img = cv2.cvtColor(imgVehicle, cv2.COLOR_BGR2GRAY)
|
|
|
|
count = 0
|
|
|
|
for i in range(0, len(pars['vehicleCOOR']), 2):
|
|
|
|
y1 = int(pars['vehicleCOOR'][i][1] * pars['ZoomFactor']['y'])
|
|
|
|
y2 = int(pars['vehicleCOOR'][i + 1][1] * pars['ZoomFactor']['y'])
|
|
|
|
x1 = int(pars['vehicleCOOR'][i][0] * pars['ZoomFactor']['x'])
|
|
|
|
x2 = int(pars['vehicleCOOR'][i + 1][0] * pars['ZoomFactor']['x'])
|
|
|
|
if y1 >= 2:
|
|
|
|
y1 = y1 - 2
|
|
|
|
if y2 <= (pars['modelSize'][1] - 2):
|
|
|
|
y2 = y2 + 2
|
|
|
|
if x1 >= 2:
|
|
|
|
x1 = x1 - 2
|
|
|
|
if x2 <= (pars['modelSize'][0] - 2):
|
|
|
|
x2 = x2 + 2
|
|
|
|
centerCOOR = (int((x1 + x2) / 2), int((y1 + y2) / 2))
|
|
|
|
img1 = img[y1:y2, x1:x2]
|
|
|
|
up = np.zeros((20, (x2 - x1)), dtype='uint8')
|
|
|
|
left = np.zeros(((40 + y2 - y1), 20), dtype='uint8')
|
|
|
|
img1 = np.concatenate((up, img1), axis=0)
|
|
|
|
img1 = np.concatenate((img1, up), axis=0)
|
|
|
|
img1 = np.concatenate((left, img1), axis=1)
|
|
|
|
img2 = np.concatenate((img1, left), axis=1)
|
|
|
|
contours2, hierarchy = cv2.findContours(img2, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
|
|
|
|
if len(contours2) != 0:
|
|
|
|
if len(contours2) > 1:
|
|
|
|
vehicleArea = [] # 存储vehicle的最小外接矩形的面积
|
|
|
|
for j in range(len(contours2)):
|
|
|
|
rect = cv2.minAreaRect(contours2[j])
|
|
|
|
vehicleArea.append(rect[1][0] * rect[1][1])
|
|
|
|
maxAreaIndex = vehicleArea.index(max(vehicleArea))
|
|
|
|
maxAreaContours = contours2[maxAreaIndex]
|
|
|
|
vehicleBD, normVehicle, unnormVehicle, normVehicleCOOR = vehicleDivide(maxAreaContours, vehicleBD, normVehicle, dets, count, i, unnormVehicle, normVehicleCOOR, centerCOOR)
|
|
|
|
elif len(contours2) == 1:
|
|
|
|
vehicleBD, normVehicle, unnormVehicle, normVehicleCOOR = vehicleDivide(contours2[0], vehicleBD, normVehicle, dets, count, i, unnormVehicle, normVehicleCOOR, centerCOOR)
|
|
|
|
else:
|
|
|
|
dets[int(i / 2)].append(0)
|
|
|
|
dets[int(i / 2)].append(0)
|
|
|
|
unnormVehicle.append(dets[int(i / 2)])
|
|
|
|
count += 1
|
|
|
|
dets = normVehicle
|
|
|
|
return dets, vehicleBD, unnormVehicle, normVehicleCOOR
|
|
|
|
|
|
|
|
|
|
|
|
# 计算违停得分
|
|
|
|
def IllegalParkScore1(vehicleBD, allRoadCnt, dets, unnormVehicle, normVehicleCOOR, a_l, b_l, c_l, a_r, b_r, c_r):
|
|
|
|
"""
|
|
|
|
对vehicle是否在speedRoad上进行判断,并计算违章得分
|
|
|
|
输出targetList 其格式为:[[cls, x0, y0, x1, y1, score, 违章得分, 违章类别], ...]
|
|
|
|
"""
|
|
|
|
if len(vehicleBD) != 0:
|
|
|
|
for i in range(len(vehicleBD)):
|
|
|
|
rect = cv2.minAreaRect(vehicleBD[i])
|
|
|
|
center = normVehicleCOOR[i] # vehicle的中心点坐标
|
|
|
|
if len(allRoadCnt) != 0: # 当车道线个数至少有两条时,才计算违章得分
|
|
|
|
for j in range(len(allRoadCnt)):
|
|
|
|
# 判断车辆矩形框的中心点坐标是否在道路矩形框的范围内
|
|
|
|
flag = cv2.pointPolygonTest(allRoadCnt[j], center, False)
|
|
|
|
if flag >= 0:
|
|
|
|
dets[i].append(0) # 给违章得分占位
|
|
|
|
dets[i].append(0) # 给违章类别占位
|
|
|
|
if center[0] < predict(a_l, b_l, c_l, center[1]):
|
|
|
|
distance = point2QF(a_l, b_l, c_l, center[1], center[0])
|
|
|
|
if distance >= min(rect[1]) / 2:
|
|
|
|
dets[i][6], dets[i][7] = 1, 1
|
|
|
|
else:
|
|
|
|
dets[i][6], dets[i][7] = distance / (min(rect[1]) / 2), 1
|
|
|
|
elif center[0] > predict(a_r, b_r, c_r, center[1]):
|
|
|
|
distance = point2QF(a_r, b_r, c_r, center[1], center[0])
|
|
|
|
if distance >= min(rect[1]) / 2:
|
|
|
|
dets[i][6], dets[i][7] = 1, 1
|
|
|
|
else:
|
|
|
|
dets[i][6], dets[i][7] = distance / (min(rect[1]) / 2), 1
|
|
|
|
else:
|
|
|
|
dets[i][6], dets[i][7] = 0, 0
|
|
|
|
break
|
|
|
|
# 如果分割图像中不存在speedRoad,则无法进行违章判定,将所有车辆的违章类别设为0,即没有违章
|
|
|
|
if len(dets[i]) < 8:
|
|
|
|
dets[i].append(0) # 违章得分为0
|
|
|
|
dets[i].append(0) # 0表示没有违章
|
|
|
|
targetList = dets
|
|
|
|
if len(unnormVehicle) != 0:
|
|
|
|
for i in range(len(unnormVehicle)):
|
|
|
|
targetList.append(unnormVehicle[i]) # 将所有车辆的信息合并到一起
|
|
|
|
else:
|
|
|
|
targetList = unnormVehicle
|
|
|
|
return targetList
|
|
|
|
|
|
|
|
|
|
|
|
# 计算违停得分
|
|
|
|
def IllegalParkScore2(vehicleBD, dets, unnormVehicle):
|
|
|
|
"""
|
|
|
|
计算违章得分
|
|
|
|
输出targetList 其格式为:[[cls, x0, y0, x1, y1, score, 违章得分, 违章类别], ...]
|
|
|
|
"""
|
|
|
|
if len(vehicleBD) != 0:
|
|
|
|
for i in range(len(vehicleBD)):
|
|
|
|
if len(dets[i]) < 8:
|
|
|
|
dets[i].append(0) # 违章得分为0
|
|
|
|
dets[i].append(0) # 0表示没有违章
|
|
|
|
targetList = dets
|
|
|
|
if len(unnormVehicle) != 0:
|
|
|
|
for i in range(len(unnormVehicle)):
|
|
|
|
targetList.append(unnormVehicle[i]) # 将所有车辆的信息合并到一起
|
|
|
|
else:
|
|
|
|
targetList = unnormVehicle
|
|
|
|
return targetList
|
|
|
|
|
|
|
|
|
|
|
|
# 找最左侧lane时,将要删除的右侧lane的序号存储在delRightLane。找最右侧lane时,将要删除的左侧lane的序号存储在delLeftLane。
|
|
|
|
def devideLane(laneInfo, i, m, delRightLane, delLeftLane, y):
|
|
|
|
index1 = np.where(laneInfo[i][3] == y)
|
|
|
|
index1 = index1[0].tolist()
|
|
|
|
index1.sort()
|
|
|
|
x_1 = laneInfo[i][5][index1[0]][0]
|
|
|
|
index2 = np.where(laneInfo[m][3] == y)
|
|
|
|
index2 = index2[0].tolist()
|
|
|
|
index2.sort()
|
|
|
|
x_2 = laneInfo[m][5][index2[0]][0]
|
|
|
|
if x_1 < x_2:
|
|
|
|
if i not in delLeftLane:
|
|
|
|
delLeftLane.append(i) # 保留右侧lane
|
|
|
|
if m not in delRightLane:
|
|
|
|
delRightLane.append(m)
|
|
|
|
else:
|
|
|
|
if m not in delLeftLane:
|
|
|
|
delLeftLane.append(m)
|
|
|
|
if i not in delRightLane:
|
|
|
|
delRightLane.append(i)
|
|
|
|
|
|
|
|
return delRightLane, delLeftLane
|
|
|
|
|
|
|
|
|
|
|
|
# 确定最左侧和最右侧的lane簇
|
|
|
|
def detLine(contours):
|
|
|
|
"""
|
|
|
|
输入
|
|
|
|
contours:各lane的contours
|
|
|
|
输出
|
|
|
|
laneInfo:存储各lane的信息,每条lane的信息为:[contours, y坐标范围, lane序号, arr_y, y坐标范围的长度, cnt]
|
|
|
|
delRightLane:在确定最左侧lane时,其存储需要删除的lane的序号
|
|
|
|
delLeftLane:在确定最右侧lane时,其存储需要删除的lane的序号
|
|
|
|
"""
|
|
|
|
mergList = []
|
|
|
|
for i in range(len(contours)):
|
|
|
|
cnt = np.squeeze(contours[i], 1)
|
|
|
|
arr_y = cnt[:, 1]
|
|
|
|
arrList = list(set(arr_y))
|
|
|
|
cnt_y = np.sort(np.array(arrList))
|
|
|
|
mergList.append([contours[i], cnt_y, i, arr_y, len(cnt_y), cnt])
|
|
|
|
laneInfo = sorted(mergList, key=(lambda x: x[4])) # [[contours[i], cnt_y, i, arr_y, len(cnt_y)],...]
|
|
|
|
delRightLane = [] # 求最左侧lane
|
|
|
|
delLeftLane = [] # 求最右侧lane
|
|
|
|
laneInfoNew = []
|
|
|
|
for i in range(len(laneInfo)):
|
|
|
|
laneInfoNew.append([laneInfo[i][1][0], laneInfo[i][1][-1], i]) # [[y_min, y_max, i],...]
|
|
|
|
laneInfoNew = np.array(laneInfoNew)
|
|
|
|
new1 = laneInfoNew[:, np.newaxis, :].repeat(laneInfoNew.shape[0], 1)
|
|
|
|
new2 = laneInfoNew[np.newaxis, ...].repeat(laneInfoNew.shape[0], 0)
|
|
|
|
new3 = np.concatenate((new1, new2), axis=2)
|
|
|
|
y_i_min, y_i_max, y_m_min, y_m_max = new3[..., 0], new3[..., 1], new3[..., 3], new3[..., 4]
|
|
|
|
mask1 = (y_i_min >= y_m_min) & (y_i_min <= y_m_max) & (y_i_max > y_m_max)
|
|
|
|
mask2 = (y_i_max >= y_m_min) & (y_i_max <= y_m_max) & (y_i_min < y_m_min)
|
|
|
|
mask3 = (y_i_min >= y_m_min) & (y_i_max <= y_m_max)
|
|
|
|
mask4 = (y_i_min < y_m_min) & (y_i_max > y_m_max)
|
|
|
|
if len(np.nonzero(mask1)[0]) != 0:
|
|
|
|
mask1 = np.triu(mask1, k=1)
|
|
|
|
serial_i = new3[mask1][..., 2]
|
|
|
|
serial_m = new3[mask1][..., 5]
|
|
|
|
for k in range(len(serial_i)):
|
|
|
|
if (serial_m[k] not in delLeftLane) or (serial_m[k] not in delRightLane) or (serial_i[k] not in delLeftLane) or (serial_i[k] not in delRightLane):
|
|
|
|
delRightLane, delLeftLane = devideLane(laneInfo, serial_i[k], serial_m[k], delRightLane, delLeftLane, laneInfo[serial_i[k]][1][0])
|
|
|
|
|
|
|
|
if len(np.nonzero(mask2)[0]) != 0:
|
|
|
|
mask2 = np.triu(mask2, k=1)
|
|
|
|
serial_i = new3[mask2][..., 2]
|
|
|
|
serial_m = new3[mask2][..., 5]
|
|
|
|
for k in range(len(serial_i)):
|
|
|
|
if (serial_m[k] not in delLeftLane) or (serial_m[k] not in delRightLane) or (serial_i[k] not in delLeftLane) or (serial_i[k] not in delRightLane):
|
|
|
|
delRightLane, delLeftLane = devideLane(laneInfo, serial_i[k], serial_m[k], delRightLane, delLeftLane, laneInfo[serial_i[k]][1][-1])
|
|
|
|
|
|
|
|
if len(np.nonzero(mask3)[0]) != 0:
|
|
|
|
mask3 = np.triu(mask3, k=1)
|
|
|
|
serial_i = new3[mask3][..., 2]
|
|
|
|
serial_m = new3[mask3][..., 5]
|
|
|
|
for k in range(len(serial_i)):
|
|
|
|
if (serial_m[k] not in delLeftLane) or (serial_m[k] not in delRightLane) or (serial_i[k] not in delLeftLane) or (serial_i[k] not in delRightLane):
|
|
|
|
delRightLane, delLeftLane = devideLane(laneInfo, serial_i[k], serial_m[k], delRightLane, delLeftLane, laneInfo[serial_i[k]][1][0])
|
|
|
|
|
|
|
|
if len(np.nonzero(mask4)[0]) != 0:
|
|
|
|
mask4 = np.triu(mask4, k=1)
|
|
|
|
serial_i = new3[mask4][..., 2]
|
|
|
|
serial_m = new3[mask4][..., 5]
|
|
|
|
for k in range(len(serial_i)):
|
|
|
|
if (serial_m[k] not in delLeftLane) or (serial_m[k] not in delRightLane) or (serial_i[k] not in delLeftLane) or (serial_i[k] not in delRightLane):
|
|
|
|
delRightLane, delLeftLane = devideLane(laneInfo, serial_i[k], serial_m[k], delRightLane, delLeftLane, laneInfo[serial_m[k]][1][0])
|
|
|
|
return laneInfo, delRightLane, delLeftLane
|
|
|
|
|
|
|
|
|
|
|
|
# 对lane中的y值坐标进行下采样
|
|
|
|
def downSample(cnt_y):
|
|
|
|
# number = len(cnt_y) * 0.0125
|
|
|
|
# cnt_y = np.random.choice(cnt_y, size=number, replace=False)
|
|
|
|
if len(cnt_y) >= 1000:
|
|
|
|
cnt_y = cnt_y[1::80]
|
|
|
|
elif len(cnt_y) >= 900 and len(cnt_y) < 1000:
|
|
|
|
cnt_y = cnt_y[1::75]
|
|
|
|
elif len(cnt_y) >= 800 and len(cnt_y) < 900:
|
|
|
|
cnt_y = cnt_y[1::70]
|
|
|
|
elif len(cnt_y) >= 700 and len(cnt_y) < 800:
|
|
|
|
cnt_y = cnt_y[1::65]
|
|
|
|
elif len(cnt_y) >= 600 and len(cnt_y) < 700:
|
|
|
|
cnt_y = cnt_y[1::60]
|
|
|
|
elif len(cnt_y) >= 500 and len(cnt_y) < 600:
|
|
|
|
cnt_y = cnt_y[1::55]
|
|
|
|
elif len(cnt_y) >= 400 and len(cnt_y) < 500:
|
|
|
|
cnt_y = cnt_y[1::40]
|
|
|
|
elif len(cnt_y) >= 300 and len(cnt_y) < 400:
|
|
|
|
cnt_y = cnt_y[1::45]
|
|
|
|
elif len(cnt_y) >= 200 and len(cnt_y) < 300:
|
|
|
|
cnt_y = cnt_y[1::40]
|
|
|
|
elif len(cnt_y) >= 100 and len(cnt_y) < 200:
|
|
|
|
cnt_y = cnt_y[1::35]
|
|
|
|
elif len(cnt_y) >= 50 and len(cnt_y) < 100:
|
|
|
|
cnt_y = cnt_y[1::20]
|
|
|
|
elif len(cnt_y) >= 20 and len(cnt_y) < 50:
|
|
|
|
cnt_y = cnt_y[1::6]
|
|
|
|
else:
|
|
|
|
cnt_y = cnt_y[1::5]
|
|
|
|
return cnt_y
|
|
|
|
|
|
|
|
|
|
|
|
# 求最左侧lane或最右侧lane中的各点坐标
|
|
|
|
def targetCOOR(laneInfo, delLane):
|
|
|
|
"""
|
|
|
|
输入
|
|
|
|
laneInfo:存储各lane的信息,每条lane的信息为:[contours, y坐标范围, lane序号, arr_y, y坐标范围的长度, cnt]
|
|
|
|
delLane:在确定最左侧lane或最右侧lane时,其存储需要删除的lane的序号。
|
|
|
|
输出
|
|
|
|
laneCOOR:存储最左侧或最右侧lane簇中各点的坐标
|
|
|
|
"""
|
|
|
|
laneCOOR = [] # 存储lane中各点的坐标
|
|
|
|
centerSort = [] # 存储各lane按照中心点的y坐标排序后的结果
|
|
|
|
for j in range(len(laneInfo)):
|
|
|
|
if j not in delLane:
|
|
|
|
cnt = laneInfo[j][0]
|
|
|
|
rect = cv2.minAreaRect(cnt)
|
|
|
|
cnt = np.squeeze(cnt, 1)
|
|
|
|
cnt_y = laneInfo[j][1]
|
|
|
|
cnt_y = downSample(cnt_y)
|
|
|
|
centerSort.append([rect[0][1], cnt_y, laneInfo[j][3], cnt, j])
|
|
|
|
centerSort = sorted(centerSort, key=(lambda x: x[0]))
|
|
|
|
for i in range(len(centerSort)):
|
|
|
|
centerCoordinate = []
|
|
|
|
for j in range(len(centerSort[i][1])):
|
|
|
|
index = np.where(centerSort[i][2] == centerSort[i][1][j])
|
|
|
|
indexList = index[0].tolist()
|
|
|
|
indexList.sort()
|
|
|
|
x = (centerSort[i][3][indexList[0]][0] + centerSort[i][3][indexList[-1]][0]) / 2
|
|
|
|
y = (centerSort[i][3][indexList[0]][1] + centerSort[i][3][indexList[-1]][1]) / 2
|
|
|
|
centerCoordinate.append([x, y])
|
|
|
|
laneCOOR = laneCOOR + centerCoordinate
|
|
|
|
return laneCOOR
|
|
|
|
|
|
|
|
|
|
|
|
# 二次函数曲线表达式:x = a*(y**2) + b*y + c,根据图像中一点的y坐标求二次曲线中的x坐标
|
|
|
|
def predict(a, b, c, y):
|
|
|
|
x = a * (y**2) + b * y + c
|
|
|
|
return x
|
|
|
|
|
|
|
|
|
|
|
|
def mixNoParking_road_postprocess(dets, mask, pars):
|
|
|
|
"""
|
|
|
|
对于字典traffic_dict中的各个键,说明如下:
|
|
|
|
RoadArea:speedRoad的最小外接矩形的面积
|
|
|
|
vehicleCOOR:是一个列表,用于存储被检测出的vehicle的坐标(vehicle检测模型)
|
|
|
|
roundness:圆度 ,lane的长与宽的比率,作为判定是否为车道线的标准之一
|
|
|
|
laneArea:车道线的最小外接矩形的面积
|
|
|
|
ZoomFactor:图像在H和W方向上的缩放因子,其值小于1
|
|
|
|
fitOrder:多点拟合曲线的阶数
|
|
|
|
最终输出格式:[[x0, y0, x1, y1, 车辆得分, cls, 违章停车得分, 违章类别], ...]
|
|
|
|
违章类别:0表示正常车辆,1表示违章车辆
|
|
|
|
"""
|
|
|
|
det_cors = []
|
|
|
|
for bb in dets:
|
|
|
|
det_cors.append((int(bb[0]), int(bb[1])))
|
|
|
|
det_cors.append((int(bb[2]), int(bb[3])))
|
|
|
|
print('###line341:', det_cors)
|
|
|
|
pars['vehicleCOOR'] = det_cors
|
|
|
|
H, W = mask.shape[0:2] # mask的分辨率为360x640
|
|
|
|
scaleH = pars['modelSize'][1] / H # 自适应调整缩放比例
|
|
|
|
scaleW = pars['modelSize'][0] / W
|
|
|
|
pars['ZoomFactor'] = {'x': scaleW, 'y': scaleH}
|
|
|
|
new_hw = [int(H * scaleH), int(W * scaleW)]
|
|
|
|
mask = cv2.resize(mask, (new_hw[1], new_hw[0]))
|
|
|
|
if len(mask.shape) == 3:
|
|
|
|
mask = mask[:, :, 0]
|
|
|
|
|
|
|
|
t1 = time.time()
|
|
|
|
imgRoad = mask.copy()
|
|
|
|
imgVehicle = mask.copy()
|
|
|
|
lane_line = mask.copy()
|
|
|
|
# 将vehicle和lane过滤掉,只包含背景和speedRoad
|
|
|
|
imgRoad[imgRoad == 2] = 1
|
|
|
|
imgRoad[imgRoad == 3] = 1
|
|
|
|
# 将speedRoad和lane过滤掉,只保留vehicle和背景
|
|
|
|
imgVehicle[imgVehicle != 2] = 0
|
|
|
|
# 将speedRoad和vehicle过滤掉,只保留lane和背景
|
|
|
|
lane_line[lane_line < 3] = 0
|
|
|
|
imgRoad = cv2.cvtColor(np.uint8(imgRoad), cv2.COLOR_RGB2BGR) # 道路
|
|
|
|
imgVehicle = cv2.cvtColor(np.uint8(imgVehicle), cv2.COLOR_RGB2BGR) # 车辆
|
|
|
|
lane_line = cv2.cvtColor(np.uint8(lane_line), cv2.COLOR_RGB2BGR)
|
|
|
|
|
|
|
|
# 对车道线进行膨胀操作
|
|
|
|
# kernel = np.ones((3, 3), np.uint8) # 膨胀范围
|
|
|
|
# lane_line = cv2.dilate(lane_line, kernel, iterations=2) # 迭代次数为2
|
|
|
|
|
|
|
|
t2 = time.time()
|
|
|
|
img1 = cv2.cvtColor(imgRoad, cv2.COLOR_BGR2GRAY)
|
|
|
|
roadContours, hierarchy = cv2.findContours(img1, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
|
|
|
|
t3 = time.time()
|
|
|
|
# 存储所有speedRoad的信息
|
|
|
|
allRoadCnt = storageRoad(roadContours, pars)
|
|
|
|
t4 = time.time()
|
|
|
|
img3 = cv2.cvtColor(lane_line, cv2.COLOR_BGR2GRAY)
|
|
|
|
laneContours, hierarchy = cv2.findContours(img3, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
|
|
|
|
# 存储所有lane的信息
|
|
|
|
laneNumber, newLaneContours = storageLane(laneContours, pars)
|
|
|
|
t5 = time.time()
|
|
|
|
|
|
|
|
if laneNumber >= 2:
|
|
|
|
laneInfo, delRightLane, delLeftLane = detLine(newLaneContours)
|
|
|
|
t6 = time.time()
|
|
|
|
# 存储所有vehicle的信息
|
|
|
|
dets, vehicleBD, unnormVehicle, normVehicleCOOR = storageVehicle(pars, imgVehicle, dets)
|
|
|
|
t7 = time.time()
|
|
|
|
leftLaneCOOR = targetCOOR(laneInfo, delRightLane)
|
|
|
|
rightLaneCOOR = targetCOOR(laneInfo, delLeftLane)
|
|
|
|
rightLaneCOOR = np.array(rightLaneCOOR)
|
|
|
|
rightX = rightLaneCOOR[:, 0]
|
|
|
|
rightY = rightLaneCOOR[:, 1]
|
|
|
|
leftLaneCOOR = np.array(leftLaneCOOR)
|
|
|
|
leftX = leftLaneCOOR[:, 0]
|
|
|
|
leftY = leftLaneCOOR[:, 1]
|
|
|
|
# a_r,b_r,c_r分别是:最右侧车道线簇拟合的二次函数的二次项系数一次项系数,和常数项
|
|
|
|
a_r, b_r, c_r = np.polyfit(rightY, rightX, pars['fitOrder'])[0], np.polyfit(rightY, rightX, pars['fitOrder'])[1], np.polyfit(rightY, rightX, pars['fitOrder'])[2]
|
|
|
|
# a_l,b_l,c_l分别是:最左侧车道线簇拟合的二次函数的二次项系数,一次项系数,和常数项
|
|
|
|
a_l, b_l, c_l = np.polyfit(leftY, leftX, pars['fitOrder'])[0], np.polyfit(leftY, leftX, pars['fitOrder'])[1], np.polyfit(leftY, leftX, pars['fitOrder'])[2]
|
|
|
|
|
|
|
|
# """以下四行代码用于在后处理函数外画图"""
|
|
|
|
# finalLane = []
|
|
|
|
# abc = [a_l, b_l, c_l, a_r, b_r, c_r] # abc中存储的是最左侧和最右侧二次函数的各项系数
|
|
|
|
# finalLane.append(rightLaneCOOR)
|
|
|
|
# finalLane.append(leftLaneCOOR)
|
|
|
|
|
|
|
|
# 计算违停得分
|
|
|
|
t8 = time.time()
|
|
|
|
targetList = IllegalParkScore1(vehicleBD, allRoadCnt, dets, unnormVehicle, normVehicleCOOR, a_l, b_l, c_l, a_r, b_r, c_r)
|
|
|
|
t9 = time.time()
|
|
|
|
time_infos = 'postTime:%.2f(分割时间:%.2f, findContours:%.2f, ruleJudge:%.2f, storageRoad:%.2f, detLane:%.2f, storageLane:%.2f, storageVehicle:%.2f, fitLine:%.2f, IllegalParkScore1:%.2f)' % (
|
|
|
|
get_ms(t9, t1), get_ms(t2, t1), get_ms(t3, t2), get_ms(t9, t3), get_ms(t4, t3), get_ms(t6, t5),
|
|
|
|
get_ms(t5, t4), get_ms(t7, t6), get_ms(t8, t7), get_ms(t9, t8))
|
|
|
|
# print('####line445:', targetList)
|
|
|
|
# return targetList, time_infos, finalLane, lane_line, abc
|
|
|
|
targetList = [ [ *b[0:4],b[6] if b[6]>0 else b[4], b[7] ] for b in targetList ]
|
|
|
|
return targetList, time_infos
|
|
|
|
|
|
|
|
else:
|
|
|
|
dets, vehicleBD, unnormVehicle, normVehicleCOOR = storageVehicle(pars, imgVehicle, dets)
|
|
|
|
t6 = time.time()
|
|
|
|
targetList = IllegalParkScore2(vehicleBD, dets, unnormVehicle)
|
|
|
|
t7 = time.time()
|
|
|
|
time_infos = 'postTime:%.2f(分割时间:%.2f, findContours:%.2f, ruleJudge:%.2f, storageRoad:%.2f, storageLane:%.2f, storageVehicle:%.2f, IllegalParkScore2:%.2f)' % (
|
|
|
|
get_ms(t7, t1), get_ms(t2, t1), get_ms(t3, t2), get_ms(t7, t3), get_ms(t4, t3), get_ms(t5, t4), get_ms(t6, t5), get_ms(t7, t6))
|
|
|
|
# print('####line456:', targetList)
|
|
|
|
targetList = [ [ *b[0:4],b[6] if b[6]>0 else b[4], b[7] ] for b in targetList ]
|
|
|
|
return targetList, time_infos
|
|
|
|
def mixNoParking_road_postprocess_N(predList, pars):
|
|
|
|
dets, mask =predList[0:2]
|
|
|
|
return mixNoParking_road_postprocess(dets, mask, pars)
|
|
|
|
|
|
|
|
|