# hyf-backend/utils/Helper.py
# 668 lines, 27 KiB, Python
# 2026-01-21 13:45:39 +08:00
# -*- coding: utf-8 -*-
import sys, os, cv2
from os import makedirs
from os.path import join, exists
from loguru import logger
from json import loads
from ruamel.yaml import safe_load, YAML
import random, sys, math, inspect, psutil
from pathlib import Path
from PySide6.QtGui import QIcon, QColor
from PySide6.QtCore import QObject, QRectF, QEventLoop, QIODevice, QTextStream, QFile
from PySide6.QtWidgets import QApplication
import DrGraph.utils.vclEnums as enums
#region Property
class Property:
    """Base descriptor shared by the ``Property_*`` classes.

    The base class only stores configuration and provides four helpers that
    subclasses wire into ``__get__`` / ``__set__``:

    * ``callDirectGet`` / ``callDirectSet`` read or write the backing
      attribute ``_<name>`` on the instance.
    * ``callCustomGet`` / ``callCustomSet`` delegate to ``read_func`` /
      ``write_func``.

    An accessor may be given as a method name (``str``), as a function whose
    ``__name__`` matches a method of the instance (the instance's bound
    method is then used), or as any other callable (tried as ``func(instance,
    ...)`` first, then without the instance).

    :param read_func: optional custom getter (see above).
    :param write_func: optional custom setter (see above).
    :param default: value returned when nothing has been stored yet.
    :param hasMember: when False no backing attribute ``_<name>`` is used.
    """

    def __init__(self, read_func=None, write_func=None, default=None, hasMember=True):
        self.read_func = read_func
        self.write_func = write_func
        self.default = default
        self.owner_class = None
        self.private_name = None
        self.hasMember = hasMember

    def __set_name__(self, owner, name):
        """Record the owning class and derive the backing attribute name."""
        if self.hasMember:
            self.private_name = f"_{name}"
        self.owner_class = owner

    def _stored_value(self, instance):
        """Return the backing attribute of *instance*, or the default."""
        name = self.private_name
        # private_name stays None when hasMember is False; hasattr() would
        # raise TypeError on a non-string name, so guard explicitly.
        if name is not None and hasattr(instance, name):
            return getattr(instance, name)
        return self.default

    @staticmethod
    def _bound_accessor(instance, func):
        """Return the attribute of *instance* designated by *func*, or None.

        *func* is either an attribute name or an object whose ``__name__``
        is looked up on the instance.
        """
        name = func if isinstance(func, str) else getattr(func, '__name__', None)
        if isinstance(name, str) and hasattr(instance, name):
            return getattr(instance, name)
        return None

    def callDirectGet(self, instance, owner):
        """Return the stored value, or the default when unavailable."""
        if instance is None or not self.hasMember:
            return self.default
        return self._stored_value(instance)

    def callCustomGet(self, instance, owner):
        """Return the value produced by ``read_func``.

        Class-level access (``instance is None``) returns the descriptor
        itself. If no getter is configured, it cannot be resolved, or it
        raises, the stored value (or the default) is returned; exceptions
        are logged.
        """
        if instance is None:
            return self
        reader = self.read_func
        if reader is not None:
            try:
                accessor = self._bound_accessor(instance, reader)
                if accessor is not None:
                    return accessor()
                # Functions that are not methods of the instance (lambdas,
                # module-level helpers, partials) are called directly.
                if callable(reader):
                    try:
                        return reader(instance)
                    except (TypeError, AttributeError):
                        return reader()
            except Exception as e:
                logger.error(e)
        return self._stored_value(instance)

    def callDirectSet(self, instance, value):
        """Store *value* in the backing attribute, if there is one."""
        if instance is None or not self.hasMember or self.private_name is None:
            return
        setattr(instance, self.private_name, value)

    def callCustomSet(self, instance, value):
        """Pass *value* to ``write_func``.

        Without a configured setter the value is stored directly in the
        backing attribute. Exceptions raised by the setter are logged, not
        propagated.
        """
        if instance is None:
            return
        writer = self.write_func
        if writer is None:
            self.callDirectSet(instance, value)
            return
        try:
            accessor = self._bound_accessor(instance, writer)
            if accessor is not None:
                accessor(value)
            elif callable(writer):
                try:
                    writer(instance, value)
                except (TypeError, AttributeError):
                    writer(value)
        except Exception as e:
            logger.error(e)
class Property_rw(Property):
    """Descriptor whose read and write both go straight to the backing attribute.

    :param default: value returned while nothing has been stored.
    :param hasMember: when False no backing attribute is used.
    """

    def __init__(self, default=None, hasMember=True):
        super().__init__(read_func=None, write_func=None,
                         default=default, hasMember=hasMember)

    def __get__(self, instance, owner):
        stored = self.callDirectGet(instance, owner)
        return stored

    def __set__(self, instance, value):
        self.callDirectSet(instance, value)
class Property_Rw(Property):
    """Descriptor with a custom getter and a direct (stored) setter.

    :param read_func: custom getter, see ``Property``.
    :param default: value returned when the getter yields nothing.
    :param hasMember: when False no backing attribute is used.
    """

    def __init__(self, read_func=None, default=None, hasMember=True):
        super().__init__(read_func, None, default, hasMember)

    def __get__(self, instance, owner):
        return self.callCustomGet(instance, owner)

    def __set__(self, instance, value):
        # Go through callDirectSet so that hasMember=False (private_name is
        # None) is ignored instead of raising TypeError from setattr().
        self.callDirectSet(instance, value)
class Property_rW(Property):
    """Descriptor with a direct (stored) getter and a custom setter.

    :param write_func: custom setter, see ``Property``.
    :param default: value returned while nothing has been stored.
    :param hasMember: when False no backing attribute is used.
    """

    def __init__(self, write_func=None, default=None, hasMember=True):
        super().__init__(read_func=None, write_func=write_func,
                         default=default, hasMember=hasMember)

    def __get__(self, instance, owner):
        stored = self.callDirectGet(instance, owner)
        return stored

    def __set__(self, instance, value):
        self.callCustomSet(instance, value)
class Property_RW(Property):
    """Descriptor whose getter and setter are both custom functions.

    :param read_func: custom getter, see ``Property``.
    :param write_func: custom setter, see ``Property``.
    :param default: value returned when the getter yields nothing.
    :param hasMember: when False no backing attribute is used.
    """

    def __init__(self, read_func=None, write_func=None, default=None, hasMember=True):
        super().__init__(read_func=read_func, write_func=write_func,
                         default=default, hasMember=hasMember)

    def __get__(self, instance, owner):
        produced = self.callCustomGet(instance, owner)
        return produced

    def __set__(self, instance, value):
        self.callCustomSet(instance, value)
#endregion Property
class AppHelper(QObject):
    """Holds the application object and forwards status/progress to UI controls.

    ``briefStatusControl`` is expected to offer ``setText`` and
    ``progressBarControl`` to offer ``setValue`` / ``setMaximum`` /
    ``setMinimum``; both start as None and are assigned by the caller.
    Each setter also stores the value in the backing attribute so that
    reading the corresponding property returns the last value written.
    """

    app = Property_rw(None)

    def setBriefStatusText(self, text):
        """Show *text* in the status control, or print it when none is set."""
        if self.briefStatusControl is not None:
            self.briefStatusControl.setText(text)
        else:
            print(text)
        self._briefStatusText = text

    briefStatusText = Property_rW(setBriefStatusText, '')

    def setProgress(self, value):
        """Set the current value of the progress bar, if one is attached."""
        if self.progressBarControl is not None:
            self.progressBarControl.setValue(value)
        self._progress = value

    progress = Property_rW(setProgress, 0)

    def setProgressMax(self, value):
        """Set the maximum of the progress bar, if one is attached."""
        if self.progressBarControl is not None:
            self.progressBarControl.setMaximum(value)
        self._progressMax = value

    progressMax = Property_rW(setProgressMax, 100)

    def setProgressMin(self, value):
        """Set the minimum of the progress bar, if one is attached."""
        if self.progressBarControl is not None:
            self.progressBarControl.setMinimum(value)
        self._progressMin = value

    progressMin = Property_rW(setProgressMin, 0)

    def __init__(self):
        # A QObject subclass must initialise its base class before use.
        super().__init__()
        self.briefStatusControl = None
        self.progressBarControl = None
        self._briefStatusText = ''
class Helper:
    """Static utility collection used across the application.

    Groups path helpers, configuration loading, loguru logging wrappers,
    colour conversion, text layout helpers and Markdown-to-HTML rendering.
    """

    # Optional callback invoked as OnLogMsg(text, color) by the log_* helpers.
    OnLogMsg = None
    AppFlag_SaveAnalysisResult = True
    AppFlag_SaveLog = False
    # AppHelper instance, assigned by log_init().
    App = None
    # Optional object with a send(msg) method used by log_wss(); None disables it.
    wss = None

    # Colour names understood by getRGB().
    _COLOR_NAMES = {
        'black': (0, 0, 0),
        'white': (255, 255, 255),
        'red': (255, 0, 0),
        'green': (0, 255, 0),
        'blue': (0, 0, 255),
        'yellow': (255, 255, 0),
        'magenta': (255, 0, 255),
        'cyan': (0, 255, 255),
        'orange': (255, 128, 0),  # per project convention
        'teal': (0, 128, 128),  # per project convention
    }

    @staticmethod
    def castRange(value, minValue, maxValue):
        """Clamp *value* into the inclusive range [minValue, maxValue]."""
        return max(minValue, min(maxValue, value))

    @staticmethod
    def getPath_App():
        """Return the application directory.

        For a frozen (packaged) executable this is the directory of
        ``sys.executable``; otherwise it is the directory two levels above
        the folder that contains this file.
        """
        if getattr(sys, 'frozen', False):
            return os.path.dirname(sys.executable)
        current_dir = os.path.dirname(os.path.abspath(__file__))
        return os.path.dirname(os.path.dirname(current_dir))

    @staticmethod
    def fitOS(file_name):
        """Return *file_name* with path separators matching the current OS."""
        if sys.platform.startswith('win'):
            return file_name.replace('/', '\\')
        return file_name.replace('\\', '/')

    @staticmethod
    def generateDistinctColors(n, s=0.8, v=0.7):
        """Return *n* QColor objects with hues evenly spread over [0, 1).

        :param n: number of colours.
        :param s: HSV saturation used for every colour.
        :param v: HSV value used for every colour.
        """
        import colorsys
        colors = []
        for i in range(n):
            r, g, b = colorsys.hsv_to_rgb(i * 1.0 / n, s, v)
            # QColor(int, int, int) expects integer channels.
            colors.append(QColor(int(r * 255), int(g * 255), int(b * 255)))
        return colors

    def setBriefStatusText(self, text):
        """Forward *text* to the AppHelper; print it if log_init() has not run."""
        if Helper.App is None:
            print(text)
            return
        Helper.App.setBriefStatusText(text)

    briefStatusText = Property_rW(setBriefStatusText, '')

    @staticmethod
    def Sleep(msec):
        """Process pending Qt events, passing *msec* as the maximum time."""
        QApplication.processEvents(QEventLoop.AllEvents, msec)

    @staticmethod
    def getAbsoluteFileName(file_name):
        """Return an OS-normalised absolute path.

        Relative names are resolved against ``getPath_App()``.
        """
        if not os.path.isabs(file_name):
            file_name = os.path.join(Helper.getPath_App(), file_name)
        return Helper.fitOS(file_name)

    @staticmethod
    def getConfigs(path, read_type='yml'):
        """Read a configuration file and return the parsed content.

        :param path: configuration file path.
        :param read_type: ``'json'`` parses with the json module; any other
            value (default ``'yml'``) parses with the ruamel safe YAML loader.
        :return: the parsed data structure.
        :raises Exception: when the file yields no configuration data.
        """
        with open(path, 'r', encoding='utf-8') as f:
            if read_type == 'json':
                configs = loads(f.read())
            else:
                configs = YAML(typ='safe', pure=True).load(f)
        if configs is None:
            raise Exception('路径: %s未获取配置信息' % path)
        return configs

    @staticmethod
    def getTooltipText(content):
        """Wrap *content* in the HTML used for tooltips (header plus icon).

        *content* is inserted verbatim, without HTML escaping.
        """
        return f"""
<html>
<head/><body>
<p><span style=" font-weight:600; color:#ffffff;">DrGraph <img src="appIOs/res/images/icons/Notice.png" width="16" height="16"></span></p>
<p>{content}</p>
</body>
</html>
"""

    @staticmethod
    def log_init(app, base_dir, env):
        """Create the AppHelper, style tooltips and configure loguru.

        :param app: application object; stored on ``Helper.App.app`` and
            given the tooltip style sheet.
        :param base_dir: base directory used to locate the logger config
            file and the log output directory.
        :param env: environment name selecting
            ``appIOs/configs/logger/drgraph_<env>_logger.yml``.
        """
        Helper.App = AppHelper()
        Helper.App.app = app
        # Custom QToolTip style.
        app.setStyleSheet("""
QToolTip {
background-color: #dd2222;
color: #f0f0f0;
border: 1px solid #555;
border-radius: 4px;
padding: 6px;
font: 10pt "Segoe UI";
opacity: 220;
}
""")
        log_config = Helper.getConfigs(join(base_dir, 'appIOs/configs/logger/drgraph_%s_logger.yml' % env))
        # Make sure the log directory exists.
        base_path = join(base_dir, log_config.get("base_path"))
        makedirs(base_path, exist_ok=True)
        # Drop every previously registered handler.
        logger.remove(handler_id=None)
        # File output.
        if bool(log_config.get("enable_file_log")):
            logger.add(join(base_path, log_config.get("log_name")),
                       rotation=log_config.get("rotation"),
                       retention=log_config.get("retention"),
                       format=log_config.get("log_fmt"),
                       level=log_config.get("level"),
                       enqueue=True,
                       encoding=log_config.get("encoding"))
        # Console output.
        if bool(log_config.get("enable_stderr")):
            logger.add(sys.stderr,
                       format=log_config.get("log_fmt"),
                       level=log_config.get("level"),
                       enqueue=True)
        logger.info("\n\n\n----=========== 日志配置初始化完成, 开始新的日志记录 ==========----")

    @staticmethod
    def _write_log(kind, color, msg, toWss, notify=True):
        """Shared implementation of the public log helpers.

        Must be called directly from a public log method: the frame two
        levels up is reported as the origin of the message.
        """
        if notify and Helper.OnLogMsg:
            Helper.OnLogMsg(f'{kind}: {msg}', color)
        # sys._getframe reads one frame; inspect.stack() would build the
        # whole stack and load source lines on every log call.
        frame = sys._getframe(2)
        code = frame.f_code
        text = f"[{Path(code.co_filename).name}:{frame.f_lineno}.{code.co_name}()] > {msg}"
        # depth=1 keeps loguru's own record pointing at the public helper.
        sink = logger.opt(depth=1)
        if kind == 'LOG':
            # logger.log() requires the level as its first argument.
            sink.log('INFO', text)
        else:
            getattr(sink, kind.lower())(text)
        if toWss:
            Helper.log_wss({"type": "log", "kind": kind, "msg": msg})

    @staticmethod
    def log_info(msg, toWss=False):
        """Log *msg* at INFO level; optionally forward it via log_wss()."""
        Helper._write_log('INFO', 'black', msg, toWss)

    @staticmethod
    def log_error(msg, toWss=False):
        """Log *msg* at ERROR level; optionally forward it via log_wss()."""
        Helper._write_log('ERROR', 'red', msg, toWss)

    @staticmethod
    def log_warning(msg, toWss=False):
        """Log *msg* at WARNING level; optionally forward it via log_wss()."""
        Helper._write_log('WARNING', (255, 128, 0), msg, toWss)

    @staticmethod
    def log_debug(msg, toWss=False):
        """Log *msg* at DEBUG level; optionally forward it via log_wss()."""
        Helper._write_log('DEBUG', (0, 128, 128), msg, toWss)

    @staticmethod
    def log_critical(msg, toWss=False):
        """Log *msg* at CRITICAL level; optionally forward it via log_wss()."""
        Helper._write_log('CRITICAL', (128, 0, 128), msg, toWss)

    @staticmethod
    def log_exception(msg, toWss=False):
        """Log *msg* through ``logger.exception``; optionally forward it."""
        Helper._write_log('EXCEPTION', (255, 140, 0), msg, toWss)

    @staticmethod
    def log(msg, toWss=False):
        """Log *msg* at INFO level without notifying ``OnLogMsg``."""
        Helper._write_log('LOG', None, msg, toWss, notify=False)

    @staticmethod
    def log_wss(msg):
        """Send *msg* through ``Helper.wss`` when a sink has been assigned."""
        sink = getattr(Helper, 'wss', None)
        if sink:
            sink.send(msg)

    @staticmethod
    def getTextSize(font, text):
        """Render *text* in black and return ``(width, height, surface)``.

        *font* must provide ``render(text, antialias, color)`` returning an
        object with ``get_width()`` and ``get_height()``.
        """
        surface = font.render(text, True, (0, 0, 0))
        return (surface.get_width(), surface.get_height(), surface)

    @staticmethod
    def buildSurfaces(font, text, width, color, wordWrap):
        """Render *text* into one or more surfaces.

        When *wordWrap* is set and the stripped text is wider than *width*,
        the text is cut into equal-length character chunks sized
        proportionally to ``width / rendered_width``.

        :return: list of rendered surfaces.
        """
        text = text.strip()
        w = Helper.getTextSize(font, text)[0]
        if not (wordWrap and w > width and w > 0):
            return [font.render(text, True, color)]
        # At least one character per chunk; a chunk length of 0 would never
        # consume the text and loop forever.
        segLen = max(1, math.floor(width / w * len(text)))
        return [font.render(text[i:i + segLen], True, color)
                for i in range(0, len(text), segLen)]

    @staticmethod
    def randomColor():
        """Return a random ``(r, g, b)`` tuple with channels in 0..255."""
        return (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

    @staticmethod
    def reverseColor(color: QColor):
        """Return the inverted colour of *color* as an ``(r, g, b)`` tuple."""
        return (255 - color.red(), 255 - color.green(), 255 - color.blue())

    @staticmethod
    def getRGB(color_value):
        """Convert various colour representations to an ``(r, g, b)`` tuple.

        Accepted inputs: a tuple/list with at least three numeric items
        (clamped to 0..255), an integer read as ``0xRRGGBB``, a string
        ``#RGB`` / ``#RRGGBB`` / ``#RRGGBBAA`` or one of the known colour
        names, an object with ``r``/``g``/``b`` attributes, or an object
        with ``red()``/``green()``/``blue()`` methods. Anything else,
        including malformed hex strings, yields black.
        """
        black = (0, 0, 0)
        if isinstance(color_value, (tuple, list)):
            if len(color_value) >= 3:
                return tuple(max(0, min(255, int(c))) for c in color_value[:3])
            return black
        if isinstance(color_value, int):
            return ((color_value >> 16) & 0xFF,
                    (color_value >> 8) & 0xFF,
                    color_value & 0xFF)
        if isinstance(color_value, str):
            if color_value.startswith('#'):
                hex_value = color_value[1:]
                if len(hex_value) == 3:  # short form #RGB
                    hex_value = ''.join(c * 2 for c in hex_value)
                if len(hex_value) in (6, 8):  # #RRGGBB or #RRGGBBAA
                    try:
                        return tuple(int(hex_value[i:i + 2], 16) for i in (0, 2, 4))
                    except ValueError:
                        return black
            return Helper._COLOR_NAMES.get(color_value.lower(), black)
        if hasattr(color_value, 'r') and hasattr(color_value, 'g') and hasattr(color_value, 'b'):
            return (color_value.r, color_value.g, color_value.b)
        # Objects exposing channel accessor methods, as reverseColor() uses.
        channels = [getattr(color_value, name, None) for name in ('red', 'green', 'blue')]
        if all(callable(c) for c in channels):
            return tuple(c() for c in channels)
        return black

    @staticmethod
    def check_system_resources():
        """Sample CPU, memory and network counters via psutil.

        The CPU sample uses ``interval=1``. ``memory_available`` is in GiB;
        the two ``network_bytes_*`` values are the byte counters divided by
        1024 and truncated to int.
        """
        logger.info("检查系统资源使用情况...")
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        network = psutil.net_io_counters()
        logger.info("检查系统资源使用情况完毕")
        return {
            'cpu_percent': cpu_percent,
            'memory_percent': memory.percent,
            'memory_available': memory.available / (1024**3),
            'network_bytes_sent': int(network.bytes_sent / 1024),
            'network_bytes_recv': int(network.bytes_recv / 1024)
        }

    @staticmethod
    def build_response(type, status : enums.Response, msg):
        """Build a response dictionary and log it when the status is non-zero.

        :param type: request type echoed back as ``request_type``.
        :param status: enum member whose ``value`` is ``(code, message)``.
        :param msg: detail message.
        """
        status_code, status_msg = status.value
        result = {
            "type": "response",
            "request_type": type,
            "status_code": status_code,
            "status_msg": status_msg,
            "detail_msg": msg
        }
        if status_code != 0:
            # Helper has no error() method; log_error() is the error logger.
            Helper.log_error(result)
        return result

    @staticmethod
    def get_surrounding_rect(points):
        """Return the QRectF bounding *points* (objects with ``x()``/``y()``)."""
        if len(points) == 0:
            # NOTE(review): Constant is not imported in the visible part of
            # this file - confirm where it is defined.
            return Constant.invalid_rect
        xs = [p.x() for p in points]
        ys = [p.y() for p in points]
        left, top = min(xs), min(ys)
        return QRectF(left, top, max(xs) - left, max(ys) - top)

    @staticmethod
    def getYoloLabellingInfo(dir_path, file_names, desc):
        """Count entries under ``<dir_path>images/`` and ``<dir_path>labels/``.

        Both counts are reduced by one before being reported
        (presumably to skip the directory entry itself - TODO confirm).

        :return: ``(summary_text, image_count)``; ``(f'{desc};', 0)`` when
            *dir_path* is empty.
        """
        if len(dir_path) == 0:
            return f'{desc};', 0
        imagePath = dir_path + 'images/'
        labelPath = dir_path + 'labels/'
        imageNumber, labelNumber = 0, 0
        for file_name in file_names:
            if file_name.startswith(imagePath):
                imageNumber += 1
            elif file_name.startswith(labelPath):
                labelNumber += 1
        return f'{desc} {imageNumber - 1} 张图片,{labelNumber - 1} 张标签;', imageNumber - 1

    @staticmethod
    def getMarkdownRenderText(mdContent):
        """Return a complete HTML page rendering *mdContent*.

        Uses the ``markdown`` package when it can be imported. Otherwise the
        page embeds ``appIOs/configs/marked.min.js`` and renders in the
        browser.
        """
        try:
            import markdown
            html_content = markdown.markdown(mdContent)
            styled_html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
max-width: 900px; margin: 20px auto; padding: 0 20px; line-height: 1.6; }}
code {{ background: #f5f5f5; padding: 2px 4px; border-radius: 3px; }}
pre {{ background: #f5f5f5; padding: 10px; border-radius: 5px; overflow: auto; }}
pre code {{ background: none; padding: 0; }}
h1, h2, h3 {{ color: #333; border-bottom: 1px solid #eee; padding-bottom: 5px; }}
</style>
</head>
<body>
{html_content}
</body>
</html>
"""
            return styled_html
        except ImportError:
            logger.warning("未找到markdown库使用JavaScript渲染方式")
            # Load marked.js from appIOs/configs.
            file_js = QFile('appIOs/configs/marked.min.js')
            markedJs = ''
            if file_js.open(QIODevice.ReadOnly | QIODevice.Text):
                markedJs = file_js.readAll().data().decode('utf-8')
                file_js.close()
            escapedMd = mdContent.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;').replace('"', '&quot;').replace("'", '&#x27;')
            # The text is placed inside a JavaScript template literal, so
            # backslashes, backticks and '$' must be escaped as well.
            escapedMd = escapedMd.replace('\\', '\\\\').replace('`', '\\`').replace('$', '\\$')
            htmlTemplate = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
max-width: 900px; margin: 20px auto; padding: 0 20px; line-height: 1.6; }
code { background: #f5f5f5; padding: 2px 4px; border-radius: 3px; }
pre { background: #f5f5f5; padding: 10px; border-radius: 5px; overflow: auto; }
pre code { background: none; padding: 0; }
h1, h2, h3 { color: #333; border-bottom: 1px solid #eee; padding-bottom: 5px; }
</style>
<script>
// 加载marked库
%1
</script>
</head>
<body>
<div id="content"></div>
<script>
const md = `%2`;
document.getElementById('content').innerHTML = marked.parse(md);
</script>
</body>
</html>
'''
            # Split on the placeholders instead of chaining replace(): the
            # inserted library text may itself contain "%2".
            head, _, rest = htmlTemplate.partition('%1')
            middle, _, tail = rest.partition('%2')
            return head + markedJs + middle + escapedMd + tail

    @staticmethod
    def getMarkdownRender(mdFileName):
        """Read *mdFileName* and return it rendered as an HTML page.

        When the file cannot be opened, the failure is logged and a short
        HTML error paragraph is returned.
        """
        file = QFile(mdFileName)
        if not file.open(QIODevice.ReadOnly | QIODevice.Text):
            logger.error(f'打开文件 {mdFileName} 失败')
            return f"<p style='color: red;'>无法打开文件: {mdFileName}</p>"
        stream = QTextStream(file)
        stream.setAutoDetectUnicode(True)
        mdContent = stream.readAll()
        file.close()
        return Helper.getMarkdownRenderText(mdContent)
class RTTI:
    """Name-based property access supporting dotted paths such as ``'a.b.c'``.

    Errors are logged and reported through the return value; they are not
    raised.
    """

    @staticmethod
    def _resolve_owner(obj, property_list, caller):
        """Follow every path segment except the last one.

        :return: the object owning the final segment, or None (after
            logging) when any intermediate object is missing.
        """
        dest_obj = obj
        for part in property_list[:-1]:
            if dest_obj is None:
                break
            dest_obj = getattr(dest_obj, part, None)
        if dest_obj is None:
            path = '.'.join(property_list)
            logger.error(f"RTTI.{caller}: {path} not found")
        return dest_obj

    @staticmethod
    def _do_set_attr(obj, property_name, property_value):
        """Call ``obj.set<PropertyName>(property_value)``.

        For property names ending in ``icon`` a string value is treated as
        a file path (also tried under ``appIOs/res/images/icons``) and
        converted to a QIcon.
        """
        if obj is None:
            logger.error(f"RTTI.set: obj is None")
            return
        class_name = type(obj).__name__
        object_name = obj.objectName()
        if property_name not in dir(obj):
            logger.error(f"RTTI.set: {class_name} {object_name}.{property_name} not in dir(obj)")
            return
        if property_name.endswith('icon') and isinstance(property_value, str):
            original_property_value = property_value
            if not os.path.exists(property_value):
                # Fall back to the bundled icon directory.
                property_value = os.path.join('appIOs/res/images/icons', property_value)
                if not os.path.exists(property_value):
                    logger.error(f"{original_property_value}文件不存在 > RTTI.set: {class_name} {object_name}.{property_name} = '{original_property_value}'")
                    return
            property_value = QIcon(property_value)
        setter_name = f'set{property_name[0].upper() + property_name[1:]}'
        setter_method = getattr(obj, setter_name, None)
        if setter_method is None:
            logger.error(f"RTTI.set: {class_name} {object_name}.{setter_name} not found")
            return
        setter_method(property_value)

    @staticmethod
    def set(obj, property_name, property_value):
        """Set the property addressed by the dotted *property_name*."""
        property_list = property_name.split('.')
        if len(property_list) == 1:
            RTTI._do_set_attr(obj, property_name, property_value)
            return
        dest_obj = RTTI._resolve_owner(obj, property_list, 'set')
        if dest_obj is not None:
            RTTI._do_set_attr(dest_obj, property_list[-1], property_value)

    @staticmethod
    def _do_get_attr(obj, property_name):
        """Return ``(type_name, value)`` of an attribute, or ``(None, None)``."""
        if obj is None:
            logger.error(f"RTTI.get: obj is None")
            return None, None
        if property_name not in dir(obj):
            logger.error(f"RTTI.get: {type(obj).__name__} {obj.objectName()}.{property_name} not in dir(obj)")
            return None, None
        value = getattr(obj, property_name)
        return type(value).__name__, value

    @staticmethod
    def get(obj, property_name):
        """Return ``(type_name, value)`` for the dotted *property_name*.

        Usage: ``type_name, value = RTTI.get(obj, 'a.b')``. Every failure
        path returns ``(None, None)`` so the result can always be unpacked.
        """
        property_list = property_name.split('.')
        if len(property_list) == 1:
            return RTTI._do_get_attr(obj, property_name)
        dest_obj = RTTI._resolve_owner(obj, property_list, 'get')
        if dest_obj is None:
            return None, None
        return RTTI._do_get_attr(dest_obj, property_list[-1])
class DrawHelper:
    """Dashed-line drawing helpers built on ``cv2.line``."""

    @staticmethod
    def draw_dashed_line(mat, pt1, pt2, color, thickness=1, dash_length=10):
        """Draw a dashed line from *pt1* to *pt2* on *mat*.

        The segment is divided into ``int(length / dash_length)`` equal
        periods (at least one) and the first half of each period is drawn.
        A non-positive *dash_length* draws a solid line.

        :param mat: image passed to ``cv2.line``.
        :param pt1: start point ``(x, y)``.
        :param pt2: end point ``(x, y)``.
        :param color: colour passed to ``cv2.line``.
        :param thickness: line thickness passed to ``cv2.line``.
        :param dash_length: nominal length of one dash period.
        """
        dx = pt2[0] - pt1[0]
        dy = pt2[1] - pt1[1]
        if dash_length <= 0:
            cv2.line(mat, (int(pt1[0]), int(pt1[1])), (int(pt2[0]), int(pt2[1])), color, thickness)
            return
        dist = (dx ** 2 + dy ** 2) ** 0.5
        # max(1, ...) keeps segments shorter than dash_length visible.
        dashes = max(1, int(dist / dash_length))
        for i in range(dashes):
            start = (int(pt1[0] + dx * i / dashes), int(pt1[1] + dy * i / dashes))
            end = (int(pt1[0] + dx * (i + 0.5) / dashes), int(pt1[1] + dy * (i + 0.5) / dashes))
            cv2.line(mat, start, end, color, thickness)

    @staticmethod
    def draw_dashed_rect(painter, rect, color, thickness=1, dash_length=10):
        """Draw the four edges of *rect* as dashed lines.

        :param painter: drawing target forwarded to ``draw_dashed_line``.
        :param rect: object providing ``left()``, ``top()``, ``right()``
            and ``bottom()``.
        """
        x1, y1 = rect.left(), rect.top()
        x2, y2 = rect.right(), rect.bottom()
        edges = (
            ((x1, y1), (x2, y1)),  # top
            ((x1, y2), (x2, y2)),  # bottom
            ((x1, y1), (x1, y2)),  # left
            ((x2, y1), (x2, y2)),  # right
        )
        for start, end in edges:
            DrawHelper.draw_dashed_line(painter, start, end, color, thickness, dash_length)