2025-06-25 17:00:52 +08:00
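# A switch selects whether the distance between the centres of the minimum-area bounding rectangles is used as the distance between vehicles.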
import numpy as np
import math , cv2 , time
from copy import deepcopy
2025-07-10 17:54:17 +08:00
def xyxy_coordinate(boundbxs, contour):
    """Check whether any corner of a box lies strictly inside a contour.

    Args:
        boundbxs: Sequence whose first four items are read as
            ``x1, y1, x2, y2`` (two opposite corners of the box).
        contour: Contour handed unchanged to ``cv2.pointPolygonTest``.

    Returns:
        ``1`` as soon as one of the four corners tests as inside the contour;
        otherwise the raw ``cv2.pointPolygonTest`` result of the last corner
        tested, ``(x2, y2)``.
    """
    left, top, right, bottom = boundbxs[0], boundbxs[1], boundbxs[2], boundbxs[3]
    corners = [(cx, cy) for cx in (left, right) for cy in (top, bottom)]
    for cx, cy in corners:
        # measureDist=False: the call only classifies the point
        # (+1 inside, -1 outside, 0 on the contour).
        flag = cv2.pointPolygonTest(contour, (int(cx), int(cy)), False)
        if flag == 1:
            return 1
    return flag
2025-06-25 17:00:52 +08:00
def get_ms(time2, time1):
    """Return ``time2 - time1`` multiplied by 1000.0.

    The callers in this file pass ``time.time()`` readings, so the result is
    the elapsed time in milliseconds when ``time2`` is the later reading.
    """
    elapsed = time2 - time1
    return elapsed * 1000.0
def two_points_distance(x1, y1, x2, y2):
    """Return the Euclidean distance between ``(x1, y1)`` and ``(x2, y2)``."""
    delta_x = x1 - x2
    delta_y = y1 - y2
    return math.sqrt(delta_x ** 2 + delta_y ** 2)
# Record one vehicle as "normal" or "abnormal". A contour with fewer than six
# points is treated as abnormal and is not used for rectangle fitting.
def saveVehicle1(traffic_dict, contours, normVehicleBD, normVehicle, count, i, unnormVehicle, normVehicleCOOR):
    """Sort one vehicle contour into the normal or the abnormal bucket.

    Normal (``len(contours) >= 6``): the contour, the detection at index
    ``count`` and the centre of the contour's minimum-area rectangle are
    appended to ``normVehicleBD``, ``normVehicle`` and ``normVehicleCOOR``.

    Abnormal: the detection at index ``int(i / 2)`` is replaced by a copy
    extended with ``[0, 0.3, 999, -1, 3]`` and that same list object is
    appended to ``unnormVehicle``.

    Returns:
        ``(normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR)`` --
        the same list objects that were passed in.
    """
    if len(contours) < 6:
        det_index = int(i / 2)
        padded = traffic_dict[' det '][det_index] + [0, 0.3, 999, -1, 3]
        traffic_dict[' det '][det_index] = padded
        unnormVehicle.append(padded)
    else:
        normVehicleBD.append(contours)
        normVehicle.append(traffic_dict[' det '][count])
        # Element 0 of the minAreaRect result is the rectangle centre.
        centre = cv2.minAreaRect(contours)[0]
        normVehicleCOOR.append(centre)
    return normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR
# Variant of saveVehicle1: the centre coordinate is supplied by the caller
# instead of being derived from the contour.
def saveVehicle2(traffic_dict, contours, normVehicleBD, normVehicle, count, i, unnormVehicle, normVehicleCOOR, centerCOOR):
    """Sort one vehicle contour into the normal or the abnormal bucket.

    Normal (``len(contours) >= 6``): the contour, the detection at index
    ``count`` and ``centerCOOR`` are appended to ``normVehicleBD``,
    ``normVehicle`` and ``normVehicleCOOR``.

    Abnormal: the detection at index ``int(i / 2)`` is replaced by a copy
    extended with ``[0, 0.3, 999, -1, 3]`` and that same list object is
    appended to ``unnormVehicle``.

    Returns:
        ``(normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR)`` --
        the same list objects that were passed in.
    """
    if len(contours) < 6:
        det_index = int(i / 2)
        padded = traffic_dict[' det '][det_index] + [0, 0.3, 999, -1, 3]
        traffic_dict[' det '][det_index] = padded
        unnormVehicle.append(padded)
    else:
        normVehicleBD.append(contours)
        normVehicle.append(traffic_dict[' det '][count])
        normVehicleCOOR.append(centerCOOR)
    return normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR
# Complete the output record of a vehicle that was not matched to any road.
def supplementInformation(traffic_dict, i, roundness, y_min, y_max, imgVehicle, rect):
    """Pad detection ``i`` to the full record layout and score its side ratio.

    The detection is replaced by a copy extended with
    ``[0, roundness, 999, [-1, -1, -1], 666]``.

    Returns:
        ``-1`` unless ``y_min > 0``, ``y_max < imgVehicle.shape[0]`` and
        ``roundness`` exceeds the configured roundness threshold; in that case
        the excess of the short/long side ratio of ``rect[1]`` over the
        threshold, normalised by ``1 - threshold``.
    """
    padded = traffic_dict[' det '][i] + [0, roundness, 999, [-1, -1, -1], 666]
    traffic_dict[' det '][i] = padded

    # Boxes touching the top or bottom image border are not scored.
    inside_rows = y_min > 0 and y_max < imgVehicle.shape[0]
    if not (inside_rows and roundness > traffic_dict[' roundness ']):
        return -1

    short_side = min(rect[1])
    long_side = max(rect[1])
    limit = traffic_dict[' roundness ']
    return (short_side - long_side * limit) / (long_side * (1 - limit))
# Decide the accident type of one detection from its three rule scores.
def judgeAccidentType(traffic_dict, b):
    """Return the index of the winning rule score of detection ``b``.

    Field 9 of the detection holds the three scores (the file's comments name
    them angle, side ratio and distance). The result is the first index whose
    score equals the maximum and is not ``-1``; ``3`` when there is none.
    """
    scores = traffic_dict[' det '][b][9]
    highest = max(scores)
    for accident_type in range(3):
        candidate = scores[accident_type]
        if highest == candidate and candidate != -1:
            return accident_type
    return 3
# Compute the distance score of one vehicle pair.
def distanceScore(vehicleWH, index1, index2, smallestDistance, traffic_dict):
    """Score the centre distance of a vehicle pair and store it on ``index2``.

    With ``w1``/``w2`` the shorter rectangle sides of the two vehicles:
    ``d1 = (w1 + w2) / 2`` and ``d2 = min(w1, w2) + max(w1, w2) / 2``.

    * ``smallestDistance == d1``: the distance score is ``1``.
    * ``smallestDistance < d2``: the score is
      ``1 - (smallestDistance - d1) / (d2 - d1)`` and is stored only when it
      lies strictly between 0 and 1.
    * otherwise no distance score is stored.

    In every case the accident type (field 10) of detection ``index2`` is
    refreshed through ``judgeAccidentType``.

    Returns:
        The detection list stored in ``traffic_dict``.
    """
    width1 = min(vehicleWH[index1])
    width2 = min(vehicleWH[index2])
    d1 = (width1 + width2) / 2
    d2 = min(width1, width2) + max(width1, width2) / 2

    score = None
    if smallestDistance == d1:
        score = 1
    elif smallestDistance < d2:
        # NOTE(review): d2 - d1 equals half the smaller width, so a zero-width
        # rectangle would divide by zero here -- confirm this cannot occur.
        candidate = 1 - (smallestDistance - d1) / (d2 - d1)
        if 0 < candidate < 1:
            score = candidate

    if score is not None:
        traffic_dict[' det '][index2][9][2] = score
    traffic_dict[' det '][index2][10] = judgeAccidentType(traffic_dict, index2)
    return traffic_dict[' det ']
# Pairwise point distances between two contours.
def array_distance(arr1, arr2):
    """Return the matrix of L2 distances between every point pair.

    Args:
        arr1: NumPy array of shape ``(m, 2)``.
        arr2: NumPy array of shape ``(n, 2)``.

    Returns:
        Array of shape ``(m, n)`` whose entry ``[i, j]`` is the Euclidean
        distance between ``arr1[i]`` and ``arr2[j]``.

    Raises:
        ValueError: If either input is not two-dimensional.
    """
    # Unpacking doubles as the 2-D shape check.
    _, _ = arr1.shape
    _, _ = arr2.shape

    # |a - b|^2 = |a|^2 + |b|^2 - 2 a.b, evaluated with broadcasting.
    sq1 = (arr1[:, 0] ** 2 + arr1[:, 1] ** 2)[:, np.newaxis]  # (m, 1)
    sq2 = (arr2[:, 0] ** 2 + arr2[:, 1] ** 2)[np.newaxis, :]  # (1, n)
    dis = sq1 + sq2 - 2 * np.dot(arr1, arr2.T)

    # Rounding can push a true zero slightly below zero for float input;
    # clamp so the square root never produces NaN.
    return np.sqrt(np.maximum(dis, 0))
# Collect the information of every road region.
def storageRoad(contours, allRoadContent, traffic_dict):
    """Append ``[contour, angle, (w, h)]`` for every sufficiently large road.

    A contour is kept when the area of its minimum-area rectangle exceeds the
    configured road-area threshold. The stored angle is derived from the
    rectangle angle ``a``:

    * width <= height: ``a + 90`` for ``0 <= a < 90``, ``0`` for ``a == 90``;
    * width > height: ``a`` for ``0 <= a <= 90``.

    Returns:
        ``allRoadContent`` (the list passed in, extended in place).
    """
    # NOTE(review): the angle is initialised once, so a rectangle whose angle
    # falls outside the handled ranges reuses the previous road's value.
    speedRoadAngle = 0
    for cnt in contours:
        rect = cv2.minAreaRect(cnt)
        side_a, side_b = rect[1][0], rect[1][1]
        angle = rect[2]
        if not side_a * side_b > traffic_dict[' RoadArea ']:
            continue  # drop roads below the area threshold

        if side_a <= side_b:
            if 0 <= angle < 90:
                speedRoadAngle = angle + 90
            elif angle == 90:
                speedRoadAngle = 0
        elif 0 <= angle <= 90:
            speedRoadAngle = angle
        allRoadContent.append([cnt, speedRoadAngle, rect[1]])
    return allRoadContent
# Collect the information of every vehicle, method 1.
def storageVehicle1(traffic_dict, normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR, imgVehicle):
    """Extract one contour per detection box by masking the vehicle image.

    The stored corner list is walked in pairs (top-left, bottom-right). Each
    pair is scaled by the zoom factors, drawn as a filled rectangle mask and
    applied to ``imgVehicle``; the contours of the masked image are searched.
    When several contours are found, the one whose minimum-area rectangle is
    largest is kept. The chosen contour is classified by ``saveVehicle1``.
    A box without any contour is padded with ``[0, 0.3, 999, -1, 3]`` and
    stored as abnormal.

    Args:
        traffic_dict: Shared state; the detection list is replaced by the list
            of normal vehicles before returning.
        normVehicleBD: Output list of contours of normal vehicles.
        normVehicle: Output list of detections of normal vehicles.
        unnormVehicle: Output list of padded detections of abnormal vehicles.
        normVehicleCOOR: Output list of centre points of normal vehicles.
        imgVehicle: Vehicle image; it is converted with ``COLOR_BGR2GRAY``
            below, so it presumably has three channels.

    Returns:
        ``(detections, normVehicleBD, unnormVehicle, normVehicleCOOR)``.
    """
    def rect_area(contour):
        # Area of the minimum-area bounding rectangle of a contour.
        width, height = cv2.minAreaRect(contour)[1]
        return width * height

    corner_total = len(traffic_dict[' vehicleCOOR '])
    for count, i in enumerate(range(0, corner_total, 2)):
        corners = traffic_dict[' vehicleCOOR ']
        zoom = traffic_dict[' ZoomFactor ']
        # np.uint8 instead of a dtype string: the padded string in the
        # original is not a valid dtype specifier.
        mask = np.zeros(imgVehicle.shape[:2], dtype=np.uint8)
        x0 = int(corners[i][0] * zoom[' y '])
        y0 = int(corners[i][1] * zoom[' x '])
        x1 = int(corners[i + 1][0] * zoom[' y '])
        y1 = int(corners[i + 1][1] * zoom[' x '])
        cv2.rectangle(mask, (x0, y0), (x1, y1), 255, -1, lineType=cv2.LINE_AA)
        masked = cv2.bitwise_and(imgVehicle, imgVehicle, mask=mask)
        gray = cv2.cvtColor(masked, cv2.COLOR_BGR2GRAY)
        contours2, _ = cv2.findContours(gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

        if len(contours2) != 0:
            # Several contours in one box: keep the one with the largest
            # minimum-area rectangle (first one on ties).
            chosen = max(contours2, key=rect_area) if len(contours2) > 1 else contours2[0]
            normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR = saveVehicle1(
                traffic_dict, chosen, normVehicleBD, normVehicle, count, i,
                unnormVehicle, normVehicleCOOR)
        else:
            det_index = int(i / 2)
            traffic_dict[' det '][det_index] = traffic_dict[' det '][det_index] + [0, 0.3, 999, -1, 3]
            unnormVehicle.append(traffic_dict[' det '][det_index])

    traffic_dict[' det '] = normVehicle
    return traffic_dict[' det '], normVehicleBD, unnormVehicle, normVehicleCOOR
# Collect the information of every vehicle, method 2.
def storageVehicle2(traffic_dict, normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR, imgVehicle):
    """Extract one contour per detection box by cropping the vehicle image.

    The stored corner list is walked in pairs (top-left, bottom-right). Each
    pair is scaled by the zoom factors and widened by two pixels per side
    where the model size allows. The grey crop is surrounded by a 20-pixel
    zero border and searched for contours. When several contours are found,
    the one whose minimum-area rectangle is largest is kept. The chosen
    contour and the centre of the widened box are passed to ``saveVehicle2``.
    A crop without any contour is padded with ``[0, 0.3, 999, -1, 3]`` and
    stored as abnormal.

    Contours found here are expressed in the coordinates of the padded crop,
    not of the full image.

    Returns:
        ``(detections, normVehicleBD, unnormVehicle, normVehicleCOOR)``.
    """
    def rect_area(contour):
        # Area of the minimum-area bounding rectangle of a contour.
        width, height = cv2.minAreaRect(contour)[1]
        return width * height

    img = cv2.cvtColor(imgVehicle, cv2.COLOR_BGR2GRAY)
    corner_total = len(traffic_dict[' vehicleCOOR '])
    for count, i in enumerate(range(0, corner_total, 2)):
        corners = traffic_dict[' vehicleCOOR ']
        zoom = traffic_dict[' ZoomFactor ']
        model_size = traffic_dict[' modelSize ']
        row1 = int(corners[i][1] * zoom[' x '])
        row2 = int(corners[i + 1][1] * zoom[' x '])
        col1 = int(corners[i][0] * zoom[' y '])
        col2 = int(corners[i + 1][0] * zoom[' y '])

        # Widen the box by two pixels per side where there is room.
        if row1 >= 2:
            row1 = row1 - 2
        if row2 <= (model_size[1] - 2):
            row2 = row2 + 2
        if col1 >= 2:
            col1 = col1 - 2
        if col2 <= (model_size[0] - 2):
            col2 = col2 + 2
        centerCOOR = (int((col1 + col2) / 2), int((row1 + row2) / 2))

        # Pad the actual crop with 20 zero pixels on every side. Padding the
        # crop itself (instead of stacking strips sized from the box
        # arithmetic) also works when the slice was clipped by the image.
        crop = img[row1:row2, col1:col2]
        padded_crop = np.pad(crop, 20)
        contours2, _ = cv2.findContours(padded_crop, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

        if len(contours2) != 0:
            # Several contours in one box: keep the one with the largest
            # minimum-area rectangle (first one on ties).
            chosen = max(contours2, key=rect_area) if len(contours2) > 1 else contours2[0]
            normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR = saveVehicle2(
                traffic_dict, chosen, normVehicleBD, normVehicle, count, i,
                unnormVehicle, normVehicleCOOR, centerCOOR)
        else:
            det_index = int(i / 2)
            traffic_dict[' det '][det_index] = traffic_dict[' det '][det_index] + [0, 0.3, 999, -1, 3]
            unnormVehicle.append(traffic_dict[' det '][det_index])

    traffic_dict[' det '] = normVehicle
    return traffic_dict[' det '], normVehicleBD, unnormVehicle, normVehicleCOOR
# Compute the angle score and the side-ratio score of every normal vehicle.
def angleRoundness(normVehicleBD, vehicleBox, vehicleWH, allRoadContent, traffic_dict, normVehicleCOOR, imgVehicle):
    """Extend every normal detection with angle and side-ratio information.

    For each contour the ellipse angle, the minimum-area rectangle, its four
    corner points and its short/long side ratio are computed. The first road
    whose contour contains (or touches) the vehicle centre supplies the road
    angle. The detection is then extended with
    ``[angle difference, ratio, 999, [-1, -1, -1], 666]`` and, provided the
    box does not touch the top or bottom image border, the angle score
    (index 0) and the ratio score (index 1) of the score triple are filled in
    when they exceed their thresholds. A vehicle that matched no road is
    completed by ``supplementInformation`` instead.

    Returns:
        ``(vehicleBox, vehicleWH, detections)`` where ``vehicleBox`` received
        the corner points and ``vehicleWH`` the ``(w, h)`` of every vehicle.
    """
    for i, contour in enumerate(normVehicleBD):
        ellipse_angle = cv2.fitEllipse(contour)[2]
        if 0 <= ellipse_angle < 90:
            vehicleAngle = 90 + ellipse_angle
        elif 90 <= ellipse_angle < 180:
            vehicleAngle = ellipse_angle - 90
        elif ellipse_angle == 180:
            vehicleAngle = 90
        else:
            vehicleAngle = 0

        rect = cv2.minAreaRect(contour)
        box = cv2.boxPoints(rect).astype(np.int32)
        center = normVehicleCOOR[i]
        vehicleBox.append(box)
        vehicleWH.append(rect[1])
        roundness = min(rect[1]) / max(rect[1])
        y_min = np.min(box[:, 1])
        y_max = np.max(box[:, 1])

        for road in allRoadContent:
            if not cv2.pointPolygonTest(road[0], center, False) >= 0:
                continue  # centre lies outside this road

            roadVehicleAngle = abs(vehicleAngle - road[1])
            traffic_dict[' det '][i] = traffic_dict[' det '][i] + [
                roadVehicleAngle, roundness, 999, [-1, -1, -1], 666]
            # Skip vehicles cut off by the top or bottom image border.
            if y_min > 0 and y_max < imgVehicle.shape[0]:
                if roadVehicleAngle >= traffic_dict[' roadVehicleAngle ']:
                    # Fold differences above 90 degrees back into 0..90.
                    folded = (180 - roadVehicleAngle) if roadVehicleAngle > 90 else roadVehicleAngle
                    traffic_dict[' det '][i][9][0] = float(folded / 90)
                if roundness > traffic_dict[' roundness ']:
                    limit = traffic_dict[' roundness ']
                    ratio_score = (min(rect[1]) - max(rect[1]) * limit) / (max(rect[1]) * (1 - limit))
                    traffic_dict[' det '][i][9][1] = ratio_score
            break

        # No road available, or none matched (record still has six fields).
        if len(allRoadContent) == 0 or len(traffic_dict[' det '][i]) == 6:
            # The helper extends the record first, so it must run before the
            # score slot is addressed.
            fallback = supplementInformation(traffic_dict, i, roundness, y_min, y_max, imgVehicle, rect)
            traffic_dict[' det '][i][9][1] = fallback
    return vehicleBox, vehicleWH, traffic_dict[' det ']
# For each vehicle, take the centre of its minimum-area rectangle as the
# origin, select the other vehicles whose centres lie within the configured
# radius, and measure the shortest contour-to-contour distance to them.
def vehicleDistance1(normVehicleCOOR, normVehicleBD, traffic_dict, vehicleWH):
    """Fill in the contour distance, its score and the accident type.

    Several vehicles: for each vehicle ``b`` the smallest positive point
    distance to any contour within the radius is stored in field 8. When it
    is below ``min(vehicleWH[b]) * vehicleFactor`` the distance score
    ``1 - distance / safe distance`` is stored in slot 2 of the score triple;
    otherwise field 8 is reset to ``999``. Every vehicle then receives its
    accident type from ``judgeAccidentType``, including vehicles with no
    neighbour inside the radius.

    One vehicle: only the angle and ratio scores can decide the type
    (``0``, ``1`` or ``3``).

    No vehicle: the detection list is returned untouched.

    Returns:
        The detection list stored in ``traffic_dict``.
    """
    vehicle_total = len(normVehicleCOOR)
    if vehicle_total == 0:
        return traffic_dict[' det ']

    if vehicle_total == 1:
        # Only one vehicle on the road: no distance score is possible.
        scores = traffic_dict[' det '][0][9]
        highest = max(scores)
        if highest == scores[0] and scores[0] != -1:
            accident_type = 0
        elif highest == scores[1] and scores[1] != -1:
            accident_type = 1
        else:
            accident_type = 3
        traffic_dict[' det '][0][10] = accident_type
        return traffic_dict[' det ']

    for b in range(vehicle_total):
        own_x, own_y = normVehicleCOOR[b][0], normVehicleCOOR[b][1]
        nearby = []  # contours of the vehicles whose centre is within the radius
        for other in range(vehicle_total):
            if other == b:
                continue
            other_x, other_y = normVehicleCOOR[other][0], normVehicleCOOR[other][1]
            if two_points_distance(own_x, own_y, other_x, other_y) <= traffic_dict[' radius ']:
                nearby.append(normVehicleBD[other])

        record = traffic_dict[' det '][b]
        if nearby:
            own_contour = np.squeeze(normVehicleBD[b], 1)
            gaps = []
            for contour in nearby:
                dist_arr = array_distance(own_contour, np.squeeze(contour, 1))
                # Zero distances (shared points) are ignored.
                gaps.append(dist_arr[dist_arr > 0].min())
            record[8] = min(gaps)
            safe_gap = min(vehicleWH[b]) * traffic_dict[' vehicleFactor ']
            if record[8] < safe_gap:
                record[9][2] = 1 - record[8] / safe_gap
            else:
                record[8] = 999
        record[10] = judgeAccidentType(traffic_dict, b)
    return traffic_dict[' det ']
# Compute the distance between rectangle centres and the distance score.
def vehicleDistance2(normVehicleCOOR, traffic_dict, vehicleWH):
    """Fill in the centre distance, its score and the accident type.

    Several vehicles: for each vehicle ``b`` the distance to the nearest
    other centre is stored in field 8, and ``distanceScore`` is called for
    the pair (nearest vehicle, ``b``).

    One vehicle: only the angle and ratio scores can decide the type
    (``0``, ``1`` or ``3``).

    No vehicle: the detection list is returned untouched.

    Returns:
        The detection list stored in ``traffic_dict``.
    """
    vehicle_total = len(normVehicleCOOR)
    if vehicle_total == 0:
        return traffic_dict[' det ']

    if vehicle_total == 1:
        # Only one vehicle on the road: no distance score is possible.
        scores = traffic_dict[' det '][0][9]
        highest = max(scores)
        if highest == scores[0] and scores[0] != -1:
            accident_type = 0
        elif highest == scores[1] and scores[1] != -1:
            accident_type = 1
        else:
            accident_type = 3
        traffic_dict[' det '][0][10] = accident_type
        return traffic_dict[' det ']

    for b in range(vehicle_total):
        own_x, own_y = normVehicleCOOR[b][0], normVehicleCOOR[b][1]
        # Visit the other vehicles in the order 1 .. n-1 with vehicle 0 taking
        # the slot of ``b``; this keeps the tie-breaking of equal distances.
        others = [0 if c == b else c for c in range(1, vehicle_total)]
        centerDistance = [
            two_points_distance(own_x, own_y, normVehicleCOOR[o][0], normVehicleCOOR[o][1])
            for o in others
        ]
        smallestDistance = min(centerDistance)
        nearest = others[centerDistance.index(smallestDistance)]
        traffic_dict[' det '][b][8] = smallestDistance
        traffic_dict[' det '] = distanceScore(vehicleWH, nearest, b, smallestDistance, traffic_dict)
    return traffic_dict[' det ']
def PostProcessing(traffic_dict):
    """Run the rule-based accident analysis on the detected vehicles.

    Entries of ``traffic_dict`` (key spellings are exactly those used in the
    code below):

    * RoadArea: minimum area of a road's minimum-area rectangle.
    * roadVehicleAngle: smallest road/vehicle angle difference that counts as
      an accident.
    * roundness: threshold on the short/long side ratio of a vehicle.
    * vehicleFactor: the safe distance between vehicles is the shorter
      vehicle side multiplied by this factor.
    * radius: search radius around a vehicle centre used to select
      neighbouring vehicles.
    * vehicleFlag / distanceFlag: switches selecting the vehicle-extraction
      and the distance function (first variant when equal to ``True``).
    * mask: segmentation mask in which 1 marks road and 2 marks vehicle.
    * det: detections ``[cls, x0, y0, x1, y1, score]``.

    Written by this function: vehicleCOOR (box corners), modelSize (always
    ``[640, 360]``), ZoomFactor (mask-to-model scale factors) and det.

    Returns:
        ``(targetList, time_infos)``. Every item of ``targetList`` is
        ``[cls, x0, y0, x1, y1, score, angle, side ratio, min distance,
        best rule score, accident type]``; accident type 0 = angle,
        1 = side ratio, 2 = distance, 3 = no accident. Vehicles whose contour
        has fewer than six points carry a best score of ``-1`` and type ``3``.
        ``time_infos`` is a timing summary string.
    """
    det_cors = []
    for bb in traffic_dict[' det ']:
        det_cors.append((int(bb[1]), int(bb[2])))
        det_cors.append((int(bb[3]), int(bb[4])))
    traffic_dict[' vehicleCOOR '] = det_cors

    traffic_dict[' modelSize '] = [640, 360]
    model_w, model_h = traffic_dict[' modelSize ']
    mask = traffic_dict[' mask ']
    H, W = mask.shape[0:2]
    # Scale factors from mask resolution to model resolution.
    traffic_dict[' ZoomFactor '] = {' x ': model_h / H, ' y ': model_w / W}
    # Resize straight to the model size; going through int(H * scale) could
    # lose a pixel to float truncation.
    mask = cv2.resize(mask, (model_w, model_h))
    if len(mask.shape) == 3:
        mask = mask[:, :, 0]

    t1 = time.time()
    imgRoad = mask.copy()
    imgVehicle = mask.copy()
    imgRoad[imgRoad == 2] = 0  # drop vehicles: background and road remain
    imgVehicle[imgVehicle == 1] = 0  # drop road: background and vehicles remain
    # The mask is 2-D at this point. The road layer is searched for contours
    # as-is; the vehicle layer is expanded to three channels because the
    # vehicle-extraction helpers convert it with COLOR_BGR2GRAY.
    road_gray = np.uint8(imgRoad)
    imgVehicle = cv2.cvtColor(np.uint8(imgVehicle), cv2.COLOR_GRAY2BGR)
    contours, _ = cv2.findContours(road_gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

    # Every road is stored as [cnt, speedRoadAngle, rect[1]].
    allRoadContent = storageRoad(contours, [], traffic_dict)
    t4 = time.time()

    normVehicleBD = []  # contours of vehicles with at least six contour points
    normVehicle = []  # detections of those vehicles
    unnormVehicle = []  # detections of vehicles with fewer than six points
    normVehicleCOOR = []  # centre points of the normal vehicles
    vehicleBox = []  # rectangle corner points of the normal vehicles
    vehicleWH = []  # rectangle (w, h) of the normal vehicles

    # Switch: choose the vehicle-extraction function. The comparison with
    # True is kept so that only True/1 selects the first variant.
    if traffic_dict[' vehicleFlag '] == True:  # noqa: E712
        traffic_dict[' det '], normVehicleBD, unnormVehicle, normVehicleCOOR = storageVehicle1(
            traffic_dict, normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR, imgVehicle)
    else:
        traffic_dict[' det '], normVehicleBD, unnormVehicle, normVehicleCOOR = storageVehicle2(
            traffic_dict, normVehicleBD, normVehicle, unnormVehicle, normVehicleCOOR, imgVehicle)
    t5 = time.time()

    if len(normVehicleBD) != 0:
        t6 = time.time()
        vehicleBox, vehicleWH, traffic_dict[' det '] = angleRoundness(
            normVehicleBD, vehicleBox, vehicleWH, allRoadContent, traffic_dict,
            normVehicleCOOR, imgVehicle)
        # Switch: choose the distance function.
        if traffic_dict[' distanceFlag '] == True:  # noqa: E712
            traffic_dict[' det '] = vehicleDistance1(normVehicleCOOR, normVehicleBD, traffic_dict, vehicleWH)
        else:
            traffic_dict[' det '] = vehicleDistance2(normVehicleCOOR, traffic_dict, vehicleWH)

        targetList = traffic_dict[' det ']
        # Collapse the three rule scores to the best one.
        for item in targetList:
            item[9] = max(item[9])
        if len(unnormVehicle) != 0:
            targetList = targetList + unnormVehicle
        t9 = time.time()
        ruleJudge = ' angle-rundness-distance: %.1f ' % (get_ms(t9, t6))
    else:
        targetList = unnormVehicle
        ruleJudge = ' No angle-rundness-distance judging '
    t10 = time.time()

    time_infos = ' postTime: %.2f , ( findContours: %.1f , carContourFilter: %.1f , %s ) ' % (
        get_ms(t10, t1), get_ms(t4, t1), get_ms(t5, t4), ruleJudge)
    return targetList, time_infos
2025-07-10 17:54:17 +08:00
def TrafficPostProcessing(traffic_dict):
    """Keep only the detections that lie on the largest road region.

    The segmentation mask is resized to the model size, vehicles (label 2)
    are removed, and the remaining mask is binarised with Otsu's threshold.
    The contour with the largest area is scaled back to the resolution of the
    original mask, and every detection with at least one box corner strictly
    inside that contour is kept.

    Entries read from ``traffic_dict``: mask and det (boxes whose first four
    items are ``x0, y0, x1, y1``). Entries written: modelSize (always
    ``[640, 360]``) and ZoomFactor.

    Returns:
        ``(kept detections, timing string)``; ``([], timing string)`` when no
        road contour is found.
    """
    traffic_dict[' modelSize '] = [640, 360]
    model_w, model_h = traffic_dict[' modelSize ']
    mask = traffic_dict[' mask ']
    H, W = mask.shape[0:2]
    # Scale factors from mask resolution to model resolution.
    scaleH = model_h / H
    scaleW = model_w / W
    traffic_dict[' ZoomFactor '] = {' x ': scaleH, ' y ': scaleW}

    t0 = time.time()
    # Resize straight to the model size; going through int(H * scale) could
    # lose a pixel to float truncation.
    mask = cv2.resize(mask, (model_w, model_h))
    if len(mask.shape) == 3:
        mask = mask[:, :, 0]
    imgRoad = mask.copy()
    imgRoad[imgRoad == 2] = 0  # drop vehicles: background and road remain
    # The mask is already single-channel, so it is thresholded directly.
    imgRoad = np.uint8(imgRoad)
    _, thresh = cv2.threshold(imgRoad, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    # Find all contours (multiple boundaries).
    contours, _ = cv2.findContours(thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    t1 = time.time()

    # Empty road: nothing can lie on it.
    if len(contours) == 0:
        timeInfos = ' road is empty findContours: %.1f ' % get_ms(t1, t0)
        return [], timeInfos

    # Largest road region (first one on ties), restored to mask resolution.
    max_contour = max(contours, key=cv2.contourArea)
    max_contour[:, :, 0] = (max_contour[:, :, 0] / scaleW).astype(np.int32)
    max_contour[:, :, 1] = (max_contour[:, :, 1] / scaleH).astype(np.int32)

    # Filter the detections by the road contour.
    final_spillage_filterroad = [
        box for box in traffic_dict[' det ']
        if xyxy_coordinate(box, max_contour) == 1
    ]
    t2 = time.time()
    timeInfos = ' findContours: %.1f , carContourFilter: %.1f ' % (get_ms(t1, t0), get_ms(t2, t1))
    return final_spillage_filterroad, timeInfos
2025-06-25 17:00:52 +08:00
def tracfficAccidentMixFunction(preds, seg_pred_mulcls, pars):
    """Combine detections with the segmentation mask to flag accident vehicles.

    Steps:

    1. Keep the detections whose summed mask labels inside the box, divided
       by the box area, exceed the road-overlap threshold (0.5 when ``pars``
       does not provide one).
    2. Send the detections of the two configured vehicle classes through
       ``PostProcessing``.
    3. Every vehicle whose best rule score exceeds the confidence threshold
       becomes an accident box ``[x0, y0, x1, y1, score, accident cls]`` and
       replaces its original detection; accident boxes are appended after
       the remaining detections.

    Args:
        preds: Detections; each row is read as
            ``[x0, y0, x1, y1, conf, cls]``.
        seg_pred_mulcls: Segmentation mask indexed as ``[row, column]``.
        pars: Parameter dict. It is also used as the state dict of
            ``PostProcessing``; the ZoomFactor, mask and det entries are
            written here.

    Returns:
        ``(detections, time_infos)``.
    """
    roadIou = pars.get(' roadIou ', 0.5)
    preds = np.array(preds)
    # Summed mask labels inside each box relative to the box area.
    area_factors = np.array([
        np.sum(seg_pred_mulcls[int(x[1]):int(x[3]), int(x[0]):int(x[2])]) * 1.0
        / (1.0 * (x[2] - x[0]) * (x[3] - x[1]) + 0.00001)
        for x in preds
    ])
    dets = preds[area_factors > roadIou].tolist()

    imH, imW = seg_pred_mulcls.shape[0:2]
    model_w, model_h = pars[' modelSize '][0], pars[' modelSize '][1]

    det_coords = []  # vehicle detections as [cls, x0, y0, x1, y1, conf]
    det_coords_original = []  # every kept detection, original layout
    for box in dets:
        det_coords_original.append(box)
        if int(box[5]) in (pars[' CarId '], pars[' CthcId ']):
            det_coords.append([box[5], *box[0:4], box[4]])

    pars[' ZoomFactor '] = {' x ': model_w / imW, ' y ': model_h / imH}
    pars[' mask '] = seg_pred_mulcls
    pars[' det '] = deepcopy(det_coords)

    if not det_coords:
        return deepcopy(det_coords_original), " no tracfficAccidentMix process "

    list8, time_infos = PostProcessing(pars)
    # Item layout: [cls, x0, y0, x1, y1, score, angle, side ratio,
    #               min distance, best rule score, accident type]
    acc_det = [
        [*(int(v) for v in bpoints[1:5]), bpoints[9], pars[' cls ']]
        for bpoints in list8
        if bpoints[9] > pars[' confThres ']
    ]

    # Remove the vehicles that were classified as accidents; each accident
    # box removes the first detection with the same integer coordinates.
    carCorslist = [[int(v) for v in box[0:4]] for box in det_coords_original]
    accident_indexes = {carCorslist.index(box[0:4]) for box in acc_det}
    remaining = [
        box for index, box in enumerate(det_coords_original)
        if index not in accident_indexes
    ]
    remaining.extend(acc_det)
    return deepcopy(remaining), time_infos
def tracfficAccidentMixFunction_N(predList, pars):
    """Unpack the first two items of ``predList`` and delegate.

    The items are passed on as the detections and the segmentation mask of
    ``tracfficAccidentMixFunction``, whose result is returned unchanged.
    """
    detections, segmentation = predList[0:2]
    return tracfficAccidentMixFunction(detections, segmentation, pars)
def mixTraffic_postprocess(preds, seg_pred_mulcls, pars=None):
    """Keep the detections of one class that lie on the road.

    Steps:

    1. Keep the detections whose summed mask labels inside the box, divided
       by the box area, exceed the road-overlap threshold (0.5 when ``pars``
       does not provide one).
    2. Keep only the detections of the configured class.
    3. Let ``TrafficPostProcessing`` filter them against the largest road
       contour.

    Args:
        preds: Detections; each row is read as
            ``[x0, y0, x1, y1, conf, cls]``.
        seg_pred_mulcls: Segmentation mask indexed as ``[row, column]``.
        pars: Parameter dict. Despite the ``None`` default it is required:
            it is read immediately and also receives the ZoomFactor, mask and
            det entries.

    Returns:
        ``(detections on the road, info string)``; ``([], info string)`` when
        no detection of the configured class survives step 2.
    """
    roadIou = pars.get(' roadIou ', 0.5)
    preds = np.array(preds)
    # Summed mask labels inside each box relative to the box area.
    area_factors = np.array([
        np.sum(seg_pred_mulcls[int(x[1]):int(x[3]), int(x[0]):int(x[2])]) * 1.0
        / (1.0 * (x[2] - x[0]) * (x[3] - x[1]) + 0.00001)
        for x in preds
    ])
    dets = preds[area_factors > roadIou].tolist()

    imH, imW = seg_pred_mulcls.shape[0:2]
    model_w, model_h = pars[' modelSize '][0], pars[' modelSize '][1]

    det_coords = [box for box in dets if int(box[5]) == pars[' cls ']]

    pars[' ZoomFactor '] = {' x ': model_w / imW, ' y ': model_h / imH}
    pars[' mask '] = seg_pred_mulcls
    pars[' det '] = deepcopy(det_coords)

    if len(det_coords) > 0:
        return TrafficPostProcessing(pars)
    return [], ' no spills find in road '