AIlib2/DrGraph/util/drHelper.py

703 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json, sys, cv2, os, glob, random, time, datetime
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from loguru import logger
from pathlib import Path
from DrGraph.util import torchHelper
class ServiceException(Exception): # 继承异常类
def __init__(self, code, msg, desc=None):
self.code = code
if desc is None:
self.msg = msg
else:
self.msg = msg % desc
def __str__(self):
logger.error("异常编码:{}, 异常描述:{}", self.code, self.msg)
class mathHelper:
@staticmethod
def init_seeds(seed=0):
"""
初始化随机数生成器种子
该函数用于设置随机数生成器的种子,以确保实验的可重现性。
它会同时设置Python标准库、NumPy和PyTorch的随机种子。
参数:
seed (int): 随机种子值默认为0
返回值:
无返回值
"""
random.seed(seed)
np.random.seed(seed)
torchHelper.init_torch_seeds(seed)
@staticmethod
def center_coordinate(boundbxs):
'''
计算矩形框的中心坐标
参数:
boundbxs (list/tuple): 包含两个对角坐标[x1, y1, x2, y2]的序列
返回:
tuple: 矩形框中心坐标(center_x, center_y)
'''
boundbxs_x1=boundbxs[0]
boundbxs_y1=boundbxs[1]
boundbxs_x2=boundbxs[2]
boundbxs_y2=boundbxs[3]
center_x=0.5*(boundbxs_x1+boundbxs_x2)
center_y=0.5*(boundbxs_y1+boundbxs_y2)
return center_x,center_y
@staticmethod
def fourcorner_coordinate(boundbxs):
'''
根据矩形框的两个对角坐标计算四个角点坐标
参数:
boundbxs (list): 包含两个对角坐标[x1, y1, x2, y2]的列表
返回:
list: 矩形框四个角点坐标按contours顺序排列的二维列表
[[x1,y1], [x3,y3], [x2,y2], [x4,y4]]
'''
boundbxs_x1=boundbxs[0]
boundbxs_y1=boundbxs[1]
boundbxs_x2=boundbxs[2]
boundbxs_y2=boundbxs[3]
wid=boundbxs_x2-boundbxs_x1
hei=boundbxs_y2-boundbxs_y1
boundbxs_x3=boundbxs_x1+wid
boundbxs_y3=boundbxs_y1
boundbxs_x4=boundbxs_x1
boundbxs_y4 = boundbxs_y1+hei
contours_rec=[[boundbxs_x1,boundbxs_y1],[boundbxs_x3,boundbxs_y3],[boundbxs_x2,boundbxs_y2],[boundbxs_x4,boundbxs_y4]]
return contours_rec
@staticmethod
def xywh2xyxy(box,iW=None,iH=None):
"""
将边界框从中心点+宽高格式(xc,yc,w,h)转换为左上角+右下角格式(x0,y0,x1,y1)
参数:
box: 包含边界框坐标信息的列表或数组前4个元素为[xc,yc,w,h]
iW: 图像宽度,用于将归一化坐标转换为像素坐标
iH: 图像高度,用于将归一化坐标转换为像素坐标
返回:
list: 转换后的边界框坐标[x0,y0,x1,y1]
"""
xc,yc,w,h = box[0:4]
x0 =max(0, xc-w/2.0)
x1 =min(1, xc+w/2.0)
y0=max(0, yc-h/2.0)
y1=min(1,yc+h/2.0)
if iW: x0,x1 = x0*iW,x1*iW
if iH: y0,y1 = y0*iH,y1*iH
return [x0,y0,x1,y1]
class ioHelper: # IO工具类
@staticmethod
def get_labelnames(labelnames):
"""
从JSON文件中读取标签名称列表
参数:
labelnames (str): 包含标签名称的JSON文件路径
返回:
list: 从JSON文件中读取的标签名称列表
"""
with open(labelnames,'r') as fp:
namesjson=json.load(fp)
names_fromfile=namesjson['labelnames']
names = names_fromfile
return names
@staticmethod
def get_images_videos(destPath, imageFixs=['.jpg','.JPG','.PNG','.png'],videoFixs=['.MP4','.mp4','.avi']):
'''
获取指定路径下的所有图像和视频文件路径
参数:
destPath (str): 输入路径,可以是文件夹路径或单个文件路径
imageFixs (list): 图像文件后缀名列表,默认为['.jpg','.JPG','.PNG','.png']
videoFixs (list): 视频文件后缀名列表,默认为['.MP4','.mp4','.avi']
返回:
tuple: 包含两个列表的元组 (imgpaths, videopaths)
- imgpaths (list): 图像文件路径列表
- videopaths (list): 视频文件路径列表
'''
imageFileNames = [];###获取文件里所有的图像
videoFileNames = []###获取文件里所有的视频
if os.path.isdir(destPath):
for postfix in imageFixs:
imageFileNames . extend(glob.glob('%s/*%s'%(destPath,postfix )) )
for postfix in videoFixs:
videoFileNames.extend(glob.glob('%s/*%s'%(destPath,postfix )) )
else:
postfix = os.path.splitext(destPath)[-1]
if postfix in imageFixs: imageFileNames = [ destPath ]
if postfix in videoFixs: videoFileNames = [destPath ]
logger.info('目录 [%s]下多媒体文件数量: Images %d, videos %d' % (destPath, len(imageFileNames), len(videoFileNames)))
return imageFileNames , videoFileNames
@staticmethod
def get_postProcess_para(parfile):
"""
从参数文件中读取后处理参数
参数:
parfile (str): 包含后处理参数的JSON格式参数文件路径
返回:
tuple: 包含四个元素的元组:
- conf_thres (float): 置信度阈值
- iou_thres (float): IOU阈值
- classes (list): 类别列表
- rainbows (list): 彩虹显示参数列表
异常:
AssertionError: 当参数文件中不包含'post_process'关键字时抛出
"""
with open(parfile) as fp:
par = json.load(fp)
assert 'post_process' in par.keys(), ' parfile has not key word:post_process'
parPost=par['post_process']
return parPost["conf_thres"],parPost["iou_thres"],parPost["classes"],parPost["rainbows"]
@staticmethod
def get_postProcess_para_dic(parfile):
"""
从参数文件中读取后处理参数字典
参数:
parfile (str): 参数文件路径文件应包含JSON格式的配置数据
返回:
dict: 包含后处理参数的字典,从参数文件的'post_process'键中提取
"""
with open(parfile) as fp:
par = json.load(fp)
parPost=par['post_process']
return parPost
@staticmethod
def checkFile(fileName, desc):
"""
检查文件是否存在
参数:
fileName (str): 要检查的文件名
desc (str): 文件描述信息,用于日志输出
返回值:
bool: 文件存在返回True不存在返回False
"""
if (len(fileName) > 0) and (os.path.exists(fileName) is False):
logger.error(f"{desc} - {fileName} 不存在,请检查!")
return False
else:
logger.info(f"{desc} - {fileName} 存在")
return True
class timeHelper:
@staticmethod
def date_modified(path=__file__):
# 将文件的修改时间戳转换为datetime对象
t = datetime.datetime.fromtimestamp(Path(path).stat().st_mtime)
# 返回格式化的日期字符串
return f'{t.year}-{t.month}-{t.day}'
@staticmethod
def deltaTimeString_MS(t2, t1, floatNumber = 1):
"""
计算两个时间点之间的差值,并以毫秒为单位返回格式化字符串
参数:
t2: 结束时间点(秒)
t1: 开始时间点(秒)
返回值:
str: 两个时间点差值的毫秒表示,保留一位小数
"""
formatString = '0.%df' % floatNumber
return ('%' + formatString) % ( (t2 - t1)*1000.0)
@staticmethod
def deltaTime_MS(t2,t1): # get_ms
"""
计算两个时间点之间的差值,并转换为毫秒单位
参数:
t2: 结束时间点
t1: 起始时间点
返回值:
float: 两个时间点差值的毫秒表示
"""
return (t2-t1)*1000.0
class drawHelper:
@staticmethod
def drawAllBox(preds,imgDraw,label_arraylist,rainbows,font):
"""
在图像上绘制所有检测框和标签信息
参数:
preds: 检测结果列表,每个元素包含检测框坐标、置信度和类别信息
imgDraw: 用于绘制的图像对象
label_arraylist: 标签名称列表
rainbows: 颜色列表,用于不同类别的框绘制
font: 绘制文本使用的字体
返回值:
imgDraw: 绘制完成的图像对象
"""
for box in preds:
cls,conf,xyxy = box[5],box[4], box[0:4] ##2023.08.03,修改了格式
#print('#####line46 demo.py:', cls,conf,xyxy, len(label_arraylist),len(rainbows) )
imgDraw = drawHelper.draw_painting_joint(xyxy,imgDraw,label_arraylist[int(cls)],score=conf,color=rainbows[int(cls)%20],font=font,socre_location="leftTop")
return imgDraw
@staticmethod
def draw_painting_joint(box,img,label_array,score=0.5,color=None,font={ 'line_thickness':None,'boxLine_thickness':None, 'fontSize':None},socre_location="leftTop"):
"""
在图像上绘制检测框、类别标签和置信度分数。
参数:
box (list or tuple): 检测框坐标,支持两种格式:
- 四点格式: [(x0,y0),(x1,y1),(x2,y2),(x3,y3)]
- 两点格式: [x0,y0,x1,y1]
img (numpy.ndarray): 输入图像,形状为 (H, W, C)
label_array (numpy.ndarray): 类别标签图像,形状为 (H, W, C)
score (float): 检测置信度分数默认为0.5
color (tuple or list): 检测框颜色,格式为(B, G, R)
font (dict): 字体相关参数字典,包含以下键值:
- 'line_thickness': 文本线条粗细
- 'boxLine_thickness': 检测框线条粗细
- 'fontSize': 字体大小
socre_location (str): 分数显示位置,支持'leftTop''leftBottom',默认为'leftTop'
返回:
numpy.ndarray: 绘制完成的图像
"""
#如果box[0]不是list or 元组则box是[ (x0,y0),(x1,y1),(x2,y2),(x3,y3)]四点格式
if isinstance(box[0], (list, tuple,np.ndarray ) ):
###先把中文类别字体赋值到img中
lh, lw, lc = label_array.shape
imh, imw, imc = img.shape
if socre_location=='leftTop':
x0 , y1 = box[0][0],box[0][1]
elif socre_location=='leftBottom':
x0,y1=box[3][0],box[3][1]
else:
print('plot.py line217 ,label_location:%s not implemented '%( socre_location ))
sys.exit(0)
x1 , y0 = x0 + lw , y1 - lh
if y0<0:y0=0;y1=y0+lh
if y1>imh: y1=imh;y0=y1-lh
if x0<0:x0=0;x1=x0+lw
if x1>imw:x1=imw;x0=x1-lw
img[y0:y1,x0:x1,:] = label_array
pts_cls=[(x0,y0),(x1,y1) ]
#把四边形的框画上
box_tl= font['boxLine_thickness'] or round(0.002 * (imh + imw) / 2) + 1
cv2.polylines(img, [box], True,color , box_tl)
####把英文字符score画到类别旁边
tl = font['line_thickness'] or round(0.002*(imh+imw)/2)+1#line/font thickness
label = ' %.2f'%(score)
tf = max(tl , 1) # font thickness
fontScale = font['fontSize'] or tl * 0.33
t_size = cv2.getTextSize(label, 0, fontScale=fontScale , thickness=tf)[0]
#if socre_location=='leftTop':
p1,p2= (pts_cls[1][0], pts_cls[0][1]),(pts_cls[1][0]+t_size[0],pts_cls[1][1])
cv2.rectangle(img, p1 , p2, color, -1, cv2.LINE_AA)
p3 = pts_cls[1][0],pts_cls[1][1]-(lh-t_size[1])//2
cv2.putText(img, label,p3, 0, fontScale, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
return img
else:####两点格式[x0,y0,x1,y1]
try:
box = [int(xx.cpu()) for xx in box]
except:
box=[ int(x) for x in box]
###先把中文类别字体赋值到img中
lh, lw, lc = label_array.shape
imh, imw, imc = img.shape
if socre_location=='leftTop':
x0 , y1 = box[0:2]
elif socre_location=='leftBottom':
x0,y1=box[0],box[3]
else:
print('plot.py line217 ,socre_location:%s not implemented '%( socre_location ))
sys.exit(0)
x1 , y0 = x0 + lw , y1 - lh
if y0<0:y0=0;y1=y0+lh
if y1>imh: y1=imh;y0=y1-lh
if x0<0:x0=0;x1=x0+lw
if x1>imw:x1=imw;x0=x1-lw
img[y0:y1,x0:x1,:] = label_array
###把矩形框画上,指定颜色和线宽
tl = font['line_thickness'] or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness
box_tl= font['boxLine_thickness'] or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1
c1, c2 = (int(box[0]), int(box[1])), (int(box[2]), int(box[3]))
cv2.rectangle(img, c1, c2, color, thickness=box_tl, lineType=cv2.LINE_AA)
###把英文字符score画到类别旁边
label = ' %.2f'%(score)
tf = max(tl , 1) # font thickness
fontScale = font['fontSize'] or tl * 0.33
t_size = cv2.getTextSize(label, 0, fontScale=fontScale , thickness=tf)[0]
if socre_location=='leftTop':
c2 = c1[0]+ lw + t_size[0], c1[1] - lh
cv2.rectangle(img, (int(box[0])+lw,int(box[1])) , c2, color, -1, cv2.LINE_AA) # filled
cv2.putText(img, label, (c1[0]+lw, c1[1] - (lh-t_size[1])//2 ), 0, fontScale, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
elif socre_location=='leftBottom':
c2 = box[0]+ lw + t_size[0], box[3] - lh
cv2.rectangle(img, (int(box[0])+lw,int(box[3])) , c2, color, -1, cv2.LINE_AA) # filled
cv2.putText(img, label, ( box[0] + lw, box[3] - (lh-t_size[1])//2 ), 0, fontScale, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
#print('#####line224 fontScale:',fontScale,' thickness:',tf,' line_thickness:',font['line_thickness'],' boxLine thickness:',box_tl)
return img
class imgHelper:
@staticmethod
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
"""
对图像进行等比例缩放并填充letterbox操作使其满足指定尺寸要求并保持宽高比。
参数:
img (numpy.ndarray): 输入图像,形状为 [H, W, C]。
new_shape (int or tuple): 目标图像的尺寸。若为整数,则表示目标图像的宽和高相同;
若为元组,则格式为 (height, width)。
color (tuple): 填充边框的颜色,格式为 (B, G, R)。
auto (bool): 是否自动调整填充大小以满足 stride 的倍数约束。
scaleFill (bool): 是否拉伸图像以完全填充目标尺寸(不保持宽高比)。
scaleup (bool): 是否允许放大图像。若为 False则只缩小图像。
stride (int): 步长,用于确保输出图像尺寸是该值的倍数。
返回:
img (numpy.ndarray): 处理后的图像。
ratio (tuple): 宽度和高度的缩放比例 (width_ratio, height_ratio)。
(dw, dh) (tuple): 图像左右和上下方向的填充像素数量。
"""
# 获取当前图像的高度和宽度
shape = img.shape[:2] # current shape [height, width]
if isinstance(new_shape, int):
new_shape = (new_shape, new_shape)
# 计算缩放比例,取宽高方向上的最小缩放比例以保持图像不被裁剪
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
if not scaleup: # only scale down, do not scale up (for better test mAP)
r = min(r, 1.0)
# 根据缩放比例计算新的未填充尺寸
ratio = r, r # width, height ratios
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
# 根据不同模式调整填充尺寸
if auto: # 最小矩形填充,使图像尺寸满足 stride 的倍数
dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding
elif scaleFill: # 拉伸图像以填满整个区域(不保持宽高比)
dw, dh = 0.0, 0.0
new_unpad = (new_shape[1], new_shape[0])
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios
# 将总填充量分配到图像两侧
dw /= 2 # divide padding into 2 sides
dh /= 2
# 如果需要缩放图像,则执行 resize 操作
if shape[::-1] != new_unpad: # resize
img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
# 计算上下左右的填充像素数
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
# 在图像周围添加填充边框
img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
return img, ratio, (dw, dh)
@staticmethod
def img_pad(img, size, pad_value=[114,114,114]):
"""
将图像填充成固定尺寸
参数:
img: 输入图像numpy数组格式
size: 目标尺寸,格式为(高, 宽)
pad_value: 填充区域的颜色值,默认为[114,114,114]
返回值:
pad_image: 填充后的图像
(top, left, r): 填充信息元组,包含上边填充像素数、左边填充像素数和缩放比例
"""
H,W,_ = img.shape
r = max(H/size[0], W/size[1])
img_r = cv2.resize(img, (int(W/r), int(H/r)))
tb = size[0] - img_r.shape[0]
lr = size[1] - img_r.shape[1]
top = int(tb/2)
bottom = tb - top
left = int(lr/2)
right = lr - left
pad_image = cv2.copyMakeBorder(img_r, top, bottom, left, right, cv2.BORDER_CONSTANT,value=pad_value)
return pad_image,(top, left,r)
@staticmethod
def get_label_array( color=None, label=None,outfontsize=None,fontpath="./DrGraph/appIOs/conf/platech.ttf"):
"""
创建一个包含指定标签文本的图像数组
参数:
color: 标签背景颜色,元组格式(R, G, B)
label: 要显示的标签文本
outfontsize: 输出字体大小
fontpath: 字体文件路径,默认为"conf/platech.ttf"
返回:
numpy数组格式的标签图像
"""
# Plots one bounding box on image 'im' using PIL
fontsize = outfontsize
font = ImageFont.truetype(fontpath, fontsize,encoding='utf-8')
txt_width, txt_height = font.getsize(label)
im = np.zeros((txt_height,txt_width,3),dtype=np.uint8)
im = Image.fromarray(im)
draw = ImageDraw.Draw(im)
draw.rectangle([0, 0 , txt_width, txt_height ], fill=tuple(color))
draw.text(( 0 , -3 ), label, fill=(255, 255, 255), font=font)
im_array = np.asarray(im)
if outfontsize:
scaley = outfontsize / txt_height
im_array= cv2.resize(im_array,(0,0),fx = scaley ,fy =scaley)
return im_array
@staticmethod
def get_label_arrays(labelnames,colors,outfontsize=40,fontpath="./DrGraph/appIOs/conf/platech.ttf"):
"""
生成标签数组列表
该函数根据提供的标签名称和颜色列表,为每个标签创建对应的数组表示
参数:
labelnames (list): 标签名称列表
colors (list): 颜色列表,用于为标签着色
outfontsize (int): 输出字体大小默认为40
fontpath (str): 字体文件路径,默认为"conf/platech.ttf"
返回:
list: 包含每个标签对应数组的列表
"""
label_arraylist = []
if len(labelnames) > len(colors):
print('#####labelnames cnt > colors cnt#####')
for ii,labelname in enumerate(labelnames):
color = colors[ii%20]
label_arraylist.append(imgHelper.get_label_array(color=color,label=labelname,outfontsize=outfontsize,fontpath=fontpath))
return label_arraylist
def clip_coords(boxes, img_shape):
"""
将边界框坐标裁剪到图像边界范围内
参数:
boxes: torch.Tensor, 形状为(n, 4)的边界框坐标张量,格式为(xyxy)
img_shape: tuple or list, 图像形状(height, width)
返回值:
无返回值直接在原地修改boxes张量
"""
# Clip bounding xyxy bounding boxes to image shape (height, width)
boxes[:, 0].clamp_(0, img_shape[1]) # x1
boxes[:, 1].clamp_(0, img_shape[0]) # y1
boxes[:, 2].clamp_(0, img_shape[1]) # x2
boxes[:, 3].clamp_(0, img_shape[0]) # y2
@staticmethod
def scale_back(boxes,padInfos):
'''
将边界框坐标从填充后的图像空间缩放回原始图像空间
参数:
boxes: numpy数组形状为(n, 4),表示边界框坐标,格式为[x1, y1, x2, y2]
padInfos: 列表或数组,包含填充信息,前三个元素分别为[top, left, scale_ratio]
返回值:
numpy数组形状为(n, 4),缩放回原始图像空间的边界框坐标
'''
top, left,r = padInfos[0:3]
boxes[:,0] = (boxes[:,0] - left) * r
boxes[:,2] = (boxes[:,2] - left) * r
boxes[:,1] = (boxes[:,1] - top) * r
boxes[:,3] = (boxes[:,3] - top) * r
return boxes
@staticmethod
def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):
"""
将坐标从img1_shape尺寸缩放回img0_shape原始尺寸
参数:
img1_shape: 目标图像尺寸 (height, width)
coords: 需要缩放的坐标数组格式为xyxy [x1, y1, x2, y2]
img0_shape: 原始图像尺寸 (height, width)
ratio_pad: 缩放比例和填充信息如果为None则自动计算
返回:
缩放后的坐标数组
"""
# Rescale coords (xyxy) from img1_shape to img0_shape
if ratio_pad is None: # calculate from img0_shape
gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) # gain = old / new
pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 # wh padding
else:
gain = ratio_pad[0][0]
pad = ratio_pad[1]
coords[:, [0, 2]] -= pad[0] # x padding
coords[:, [1, 3]] -= pad[1] # y padding
coords[:, :4] /= gain
imgHelper.clip_coords(coords, img0_shape)
return coords
@staticmethod
def expand_rectangle(rec,imgSize,ex_width,ex_height):
'''
矩形框外扩,且不超过图像范围
输入矩形框xyxy左上和右下坐标图像外扩宽度大小外扩高度大小
输出扩后的矩形框坐标xyxy
参数:
rec: 列表包含4个元素的矩形框坐标[x1, y1, x3, y3],其中(x1,y1)为左上角坐标,(x3,y3)为右下角坐标
imgSize: 图像尺寸,格式为[width, height]
ex_width: int矩形框在宽度方向的外扩像素大小
ex_height: int矩形框在高度方向的外扩像素大小
返回值:
list外扩后的矩形框坐标[x1, y1, x3, y3]
'''
#img_height=img.shape[0];img_width=img.shape[1]
img_width,img_height = imgSize[0:2]
#print('高、宽',img_height,img_width)
x1=rec[0]
y1=rec[1]
x3=rec[2]
y3=rec[3]
x1=x1-ex_width if x1-ex_width >= 0 else 0
y1=y1-ex_height if y1-ex_height >= 0 else 0
x3=x3+ex_width if x3+ex_width <= img_width else img_width
y3=y3+ex_height if y3+ex_height <=img_height else img_height
xyxy=[x1,y1,x3,y3]
return xyxy
class TimeDebugger:
currentDebugger = None
def __init__(self, bussinessName, enabled = True, logAtExit = False):
self.enabled = enabled
if not enabled:
return
self.indexInParentStep = 0
if TimeDebugger.currentDebugger:
self.parent = TimeDebugger.currentDebugger
self.level = self.parent.level + 1
self.parent.children.append(self)
self.indexInParentStep = len(self.parent.stepMoments)
else:
self.parent = None
self.level = 1
self.bussinessName = bussinessName
self.logAtExit = logAtExit
self.withDetail_MS = False
self.reset()
def reset(self):
self.stepMoments = []
self.startMoment = time.time()
self.children = []
self.exitMoment = 0
def __enter__(self):
if self.enabled:
TimeDebugger.currentDebugger = self
return self
def __exit__(self, exc_type, exc_value, traceback):
if self.enabled:
TimeDebugger.currentDebugger = self.parent
self.exitMoment = time.time()
if self.logAtExit:
logger.info(self.getReportInfo())
def addStep(self, msg):
if self.enabled:
import inspect
frame = inspect.currentframe().f_back
filename = frame.f_code.co_filename
lineno = frame.f_lineno
function_name = frame.f_code.co_name
callerLocation = f"{os.path.basename(filename)}:{lineno} in {function_name}"
self.stepMoments.append((time.time(), msg, callerLocation))
def getStepInfo(self, index, lastTime):
(t, msg, callerLocation) = self.stepMoments[index]
info = '\n' + '\t' * (self.level + 1) + '%s: %s 毫秒' % (msg, timeHelper.deltaTimeString_MS(t, lastTime))
if self.withDetail_MS:
info += '(%d ~ %d)' % (lastTime * 10000 % 1000000, t * 10000 % 1000000)
l = 3 - (len(info) // 25)
info += '\t' * l if l > 0 else '\t'
info += callerLocation # + (' %d, %d, %d' % (len(info), 3 - (len(info) // 25), l))
return info
def getReportInfo(self, desc = ""):
t = time.time() if self.exitMoment == 0 else self.exitMoment
info = desc if len(desc) > 0 else ""
info += '[%s]业务 总共耗时 %s 毫秒,其中:' % (self.bussinessName, timeHelper.deltaTimeString_MS(t, self.startMoment))
if self.withDetail_MS:
info += '(%d ~ %d)' % (self.startMoment * 10000 % 1000000, t * 10000 % 1000000)
lastTime = self.startMoment
nextStepIndex = 0
for child in self.children:
childStepIndex = child.indexInParentStep
if childStepIndex == len(self.stepMoments):
childStepIndex -= 1
for i in range(nextStepIndex, childStepIndex + 1):
info += self.getStepInfo(i, lastTime)
lastTime = self.stepMoments[i][0]
nextStepIndex = i + 1
info += ' -> ' + child.getReportInfo()
for i in range(nextStepIndex, len(self.stepMoments)):
info += self.getStepInfo(i, lastTime)
lastTime = self.stepMoments[i][0]
# (t, msg) = self.stepMoments[i]
# info += '\n' + '\t' * (self.level + 1) + '%s: %s 毫秒' % (msg, timeHelper.deltaTimeString_MS(t, lastTime))
# if self.withDetail_MS:
# info += '(%d ~ %d)' % (lastTime * 10000 % 1000000, t * 10000 % 1000000)
# lastTime = t
return info