# -*- coding: utf-8 -*-
"""Shared helpers for the DrGraph application.

Contents:
    * ``Property`` and its four variants - descriptors that back an attribute
      with an optional custom reader and/or writer.
    * ``AppHelper`` - holds the application object and the status/progress
      widgets.
    * ``Helper`` - static utilities (paths, configuration, logging, colors,
      Markdown rendering, ...).
    * ``RTTI`` - set/get Qt-style properties by (dotted) name.
    * ``DrawHelper`` - dashed line/rectangle drawing through OpenCV.
"""
import colorsys
import html
import inspect
import math
import os
import random
import sys
from json import loads
from os import makedirs
from os.path import join, exists
from pathlib import Path

import cv2
import psutil
from loguru import logger
from PySide6.QtCore import QObject, QRectF, QEventLoop, QIODevice, QTextStream, QFile
from PySide6.QtGui import QIcon, QColor
from PySide6.QtWidgets import QApplication
from ruamel.yaml import safe_load, YAML

import DrGraph.utils.vclEnums as enums


#region Property
class Property:
    """Descriptor base that stores its value in ``_<name>`` on the instance.

    Args:
        read_func: Optional reader; a method name (str), a function whose
            ``__name__`` is looked up on the instance, or any other callable.
        write_func: Optional writer, resolved the same way as ``read_func``.
        default: Value returned while nothing has been stored.
        hasMember: When False no backing ``_<name>`` attribute is used.
    """

    def __init__(self, read_func=None, write_func=None, default=None, hasMember=True):
        self.read_func = read_func
        self.write_func = write_func
        self.default = default
        self.owner_class = None
        self.private_name = None
        self.hasMember = hasMember

    def __set_name__(self, owner, name):
        if self.hasMember:
            self.private_name = f"_{name}"
        self.owner_class = owner

    def _stored_or_default(self, instance):
        """Return the backing attribute of *instance*, or the default."""
        # private_name stays None when hasMember is False; hasattr(obj, None)
        # would raise TypeError, so check it first.
        if self.private_name is not None and hasattr(instance, self.private_name):
            return getattr(instance, self.private_name)
        return self.default

    def callDirectGet(self, instance, owner):
        """Read the backing attribute without involving ``read_func``."""
        if instance is None or not self.hasMember:
            return self.default
        return self._stored_or_default(instance)

    def callCustomGet(self, instance, owner):
        """Read through ``read_func``; fall back to the backing attribute.

        Returns the descriptor itself when accessed on the class.
        """
        if instance is None:
            return self
        reader = self.read_func
        if reader is None:
            return self._stored_or_default(instance)
        try:
            if isinstance(reader, str):
                if hasattr(instance, reader):
                    return getattr(instance, reader)()
            elif hasattr(reader, '__name__'):
                # Resolve by name so that overrides in subclasses are honored.
                if hasattr(instance, reader.__name__):
                    return getattr(instance, reader.__name__)()
            elif callable(reader):
                try:
                    return reader(instance)
                except (TypeError, AttributeError):
                    return reader()
        except Exception as e:
            logger.error(e)
        return self._stored_or_default(instance)

    def callDirectSet(self, instance, value):
        """Write the backing attribute without involving ``write_func``."""
        if instance is None or not self.hasMember:
            return
        setattr(instance, self.private_name, value)

    def callCustomSet(self, instance, value):
        """Write through ``write_func``; failures are logged, not raised."""
        writer = self.write_func
        try:
            if isinstance(writer, str):
                if hasattr(instance, writer):
                    getattr(instance, writer)(value)
            elif hasattr(writer, '__name__'):
                if hasattr(instance, writer.__name__):
                    getattr(instance, writer.__name__)(value)
            elif callable(writer):
                try:
                    writer(instance, value)
                except (TypeError, AttributeError):
                    writer(value)
        except Exception as e:
            # Best effort: keep the assignment from raising, but leave a trace
            # (same policy as callCustomGet).
            logger.error(e)


class Property_rw(Property):
    """Property with direct read and direct write."""

    def __init__(self, default=None, hasMember=True):
        super().__init__(None, None, default, hasMember)

    def __get__(self, instance, owner):
        return self.callDirectGet(instance, owner)

    def __set__(self, instance, value):
        self.callDirectSet(instance, value)


class Property_Rw(Property):
    """Property with custom read and direct write."""

    def __init__(self, read_func=None, default=None, hasMember=True):
        super().__init__(read_func, None, default, hasMember)

    def __get__(self, instance, owner):
        return self.callCustomGet(instance, owner)

    def __set__(self, instance, value):
        # Goes through callDirectSet so that hasMember=False (no backing
        # attribute name) does not end in setattr(instance, None, ...).
        self.callDirectSet(instance, value)


class Property_rW(Property):
    """Property with direct read and custom write."""

    def __init__(self, write_func=None, default=None, hasMember=True):
        super().__init__(None, write_func, default, hasMember)

    def __get__(self, instance, owner):
        return self.callDirectGet(instance, owner)

    def __set__(self, instance, value):
        self.callCustomSet(instance, value)


class Property_RW(Property):
    """Property with custom read and custom write."""

    def __init__(self, read_func=None, write_func=None, default=None, hasMember=True):
        super().__init__(read_func, write_func, default, hasMember)

    def __get__(self, instance, owner):
        return self.callCustomGet(instance, owner)

    def __set__(self, instance, value):
        self.callCustomSet(instance, value)
#endregion Property


class AppHelper(QObject):
    """Holds the application object and the status/progress controls."""

    app = Property_rw(None)

    def setBriefStatusText(self, text):
        """Show *text* in the status control, or print it when none is set."""
        if self.briefStatusControl:
            self.briefStatusControl.setText(text)
        else:
            print(text)

    briefStatusText = Property_rW(setBriefStatusText, '')

    def setProgress(self, value):
        """Forward *value* to the progress bar, if one is attached."""
        if self.progressBarControl:
            self.progressBarControl.setValue(value)

    progress = Property_rW(setProgress, 0)

    def setProgressMax(self, value):
        """Forward the maximum to the progress bar, if one is attached."""
        if self.progressBarControl:
            self.progressBarControl.setMaximum(value)

    progressMax = Property_rW(setProgressMax, 100)

    def setProgressMin(self, value):
        """Forward the minimum to the progress bar, if one is attached."""
        if self.progressBarControl:
            self.progressBarControl.setMinimum(value)

    progressMin = Property_rW(setProgressMin, 0)

    def __init__(self):
        # The QObject base must be initialised before the object is used as one.
        super().__init__()
        self.briefStatusControl = None
        self.progressBarControl = None
        self._briefStatusText = ''


class Helper:
    """Namespace of static utilities shared across the application."""

    OnLogMsg = None  # optional callback(text, color) invoked by the log_* methods
    AppFlag_SaveAnalysisResult = True
    AppFlag_SaveLog = False
    App = None  # AppHelper instance, created by log_init()
    wss = None  # optional sink with a send(msg) method, used by log_wss()

    # Color names understood by getRGB().
    _NAMED_COLORS = {
        'black': (0, 0, 0),
        'white': (255, 255, 255),
        'red': (255, 0, 0),
        'green': (0, 255, 0),
        'blue': (0, 0, 255),
        'yellow': (255, 255, 0),
        'magenta': (255, 0, 255),
        'cyan': (0, 255, 255),
        'orange': (255, 128, 0),  # per project convention
        'teal': (0, 128, 128),  # per project convention
    }

    @staticmethod
    def castRange(value, minValue, maxValue):
        """Clamp *value* into ``[minValue, maxValue]``."""
        return max(minValue, min(maxValue, value))

    @staticmethod
    def getPath_App():
        """Return the application root directory."""
        if getattr(sys, 'frozen', False):
            # Packaged executable: the directory holding the executable.
            return os.path.dirname(sys.executable)
        # Plain script: three dirname() steps up from this file, e.g.
        #   f:\PySide6\AiBase\DrGraph\utils\Helper.py -> f:\PySide6\AiBase
        current_file = os.path.abspath(__file__)
        current_dir = os.path.dirname(current_file)
        parent_dir = os.path.dirname(current_dir)
        return os.path.dirname(parent_dir)

    @staticmethod
    def fitOS(file_name):
        """Convert path separators in *file_name* to the current platform's."""
        if sys.platform.startswith('win'):
            return file_name.replace('/', '\\')
        return file_name.replace('\\', '/')

    @staticmethod
    def generateDistinctColors(n, s=0.8, v=0.7):
        """Return *n* QColor objects with hues evenly spread over [0, 1).

        Args:
            n: Number of colors.
            s: HSV saturation used for every color.
            v: HSV value used for every color.
        """
        colors = []
        for i in range(n):
            r, g, b = colorsys.hsv_to_rgb(i / n, s, v)
            # QColor's (r, g, b) constructor takes integers.
            colors.append(QColor(int(r * 255), int(g * 255), int(b * 255)))
        return colors

    def setBriefStatusText(self, text):
        """Forward *text* to the application helper's status display."""
        Helper.App.setBriefStatusText(text)

    briefStatusText = Property_rW(setBriefStatusText, '')

    @staticmethod
    def Sleep(msec):
        """Process pending Qt events for at most *msec* milliseconds."""
        QApplication.processEvents(QEventLoop.AllEvents, msec)

    @staticmethod
    def getAbsoluteFileName(file_name):
        """Return *file_name* as an absolute, platform-style path.

        Relative names are resolved against getPath_App().
        """
        if os.path.isabs(file_name):
            return Helper.fitOS(file_name)
        return Helper.fitOS(os.path.join(Helper.getPath_App(), file_name))

    @staticmethod
    def getConfigs(path, read_type='yml'):
        """Load the configuration file at *path* and return the parsed data.

        Args:
            path: Configuration file path.
            read_type: Kept for backward compatibility; the file is always
                parsed with the safe YAML loader regardless of this value.

        Returns:
            The data structure produced by the YAML loader.
        """
        yaml = YAML(typ='safe', pure=True)
        with open(path, 'r', encoding='utf-8') as f:
            return yaml.load(f)

    @staticmethod
    def getTooltipText(content):
        """Return *content* under a "DrGraph" heading."""
        # A speaker-icon prefix for the content is currently disabled.
        return f"\n\nDrGraph\n\n{content}\n\n"

    @staticmethod
    def log_init(app, base_dir, env):
        """Create the AppHelper, style tooltips and configure loguru.

        Args:
            app: Application object; receives the tooltip style sheet.
            base_dir: Base directory holding the logger configuration and,
                relative to it, the log output directory.
            env: Environment tag selecting ``drgraph_<env>_logger.yml``.
        """
        Helper.App = AppHelper()
        Helper.App.app = app
        # Custom QToolTip style.
        app.setStyleSheet(
            """ QToolTip { background-color: #dd2222; color: #f0f0f0; """
            """border: 1px solid #555; border-radius: 4px; padding: 6px; """
            """font: 10pt "Segoe UI"; opacity: 220; } """
        )
        log_config = Helper.getConfigs(
            join(base_dir, 'appIOs/configs/logger/drgraph_%s_logger.yml' % env))
        # Make sure the log directory exists.
        base_path = join(base_dir, log_config.get("base_path"))
        makedirs(base_path, exist_ok=True)
        # Drop every previously registered handler.
        logger.remove(handler_id=None)
        # File sink.
        if bool(log_config.get("enable_file_log")):
            logger.add(join(base_path, log_config.get("log_name")),
                       rotation=log_config.get("rotation"),
                       retention=log_config.get("retention"),
                       format=log_config.get("log_fmt"),
                       level=log_config.get("level"),
                       enqueue=True,
                       encoding=log_config.get("encoding"))
        # Console sink.
        if bool(log_config.get("enable_stderr")):
            logger.add(sys.stderr,
                       format=log_config.get("log_fmt"),
                       level=log_config.get("level"),
                       enqueue=True)
        logger.info("\n\n\n----=========== 日志配置初始化完成, 开始新的日志记录 ==========----")

    @staticmethod
    def _caller_tag():
        """Return ``[file:line.function()]`` for the caller of a log_* method."""
        # stack()[0] is this helper, [1] the log_* method, [2] its caller.
        frame = inspect.stack()[2]
        return f"[{Path(frame.filename).name}:{frame.lineno}.{frame.function}()]"

    @staticmethod
    def log_info(msg, toWss=False):
        """Log *msg* at INFO level; optionally forward it through log_wss()."""
        if Helper.OnLogMsg:
            Helper.OnLogMsg(f'INFO: {msg}', 'black')
        logger.info(f"{Helper._caller_tag()} > {msg}")
        if toWss:
            Helper.log_wss({"type": "log", "kind": "INFO", "msg": msg})

    @staticmethod
    def log_error(msg, toWss=False):
        """Log *msg* at ERROR level; optionally forward it through log_wss()."""
        if Helper.OnLogMsg:
            Helper.OnLogMsg(f'ERROR: {msg}', 'red')
        logger.error(f"{Helper._caller_tag()} > {msg}")
        if toWss:
            Helper.log_wss({"type": "log", "kind": "ERROR", "msg": msg})

    @staticmethod
    def log_warning(msg, toWss=False):
        """Log *msg* at WARNING level; optionally forward it through log_wss()."""
        if Helper.OnLogMsg:
            Helper.OnLogMsg(f'WARNING: {msg}', (255, 128, 0))
        logger.warning(f"{Helper._caller_tag()} > {msg}")
        if toWss:
            Helper.log_wss({"type": "log", "kind": "WARNING", "msg": msg})

    @staticmethod
    def log_debug(msg, toWss=False):
        """Log *msg* at DEBUG level; optionally forward it through log_wss()."""
        if Helper.OnLogMsg:
            Helper.OnLogMsg(f'DEBUG: {msg}', (0, 128, 128))
        logger.debug(f"{Helper._caller_tag()} > {msg}")
        if toWss:
            Helper.log_wss({"type": "log", "kind": "DEBUG", "msg": msg})

    @staticmethod
    def log_critical(msg, toWss=False):
        """Log *msg* at CRITICAL level; optionally forward it through log_wss()."""
        if Helper.OnLogMsg:
            Helper.OnLogMsg(f'CRITICAL: {msg}', (128, 0, 128))
        logger.critical(f"{Helper._caller_tag()} > {msg}")
        if toWss:
            Helper.log_wss({"type": "log", "kind": "CRITICAL", "msg": msg})

    @staticmethod
    def log_exception(msg, toWss=False):
        """Log *msg* via logger.exception; optionally forward it through log_wss()."""
        if Helper.OnLogMsg:
            Helper.OnLogMsg(f'EXCEPTION: {msg}', (255, 140, 0))
        logger.exception(f"{Helper._caller_tag()} > {msg}")
        if toWss:
            Helper.log_wss({"type": "log", "kind": "EXCEPTION", "msg": msg})

    @staticmethod
    def log(msg, toWss=False):
        """Log *msg* without notifying OnLogMsg; optionally forward it."""
        # logger.log() requires an explicit level as its first argument.
        logger.log("INFO", f"{Helper._caller_tag()} > {msg}")
        if toWss:
            Helper.log_wss({"type": "log", "kind": "LOG", "msg": msg})

    @staticmethod
    def log_wss(msg):
        """Send *msg* through ``Helper.wss`` when a sink has been assigned."""
        if Helper.wss:
            Helper.wss.send(msg)

    @staticmethod
    def getTextSize(font, text):
        """Render *text* with *font* and return ``(width, height, surface)``."""
        surface = font.render(text, True, (0, 0, 0))
        return (surface.get_width(), surface.get_height(), surface)

    @staticmethod
    def buildSurfaces(font, text, width, color, wordWrap):
        """Render *text* into a list of surfaces, one per wrapped segment.

        Args:
            font: Object with a ``render(text, antialias, color)`` method.
            text: Text to render; surrounding whitespace is stripped.
            width: Available width, in the units of the rendered surface.
            color: Color passed to ``font.render``.
            wordWrap: When true and the text is wider than *width*, the text
                is cut into equal-length character segments.

        Returns:
            A list of rendered surfaces (a single one when no wrap occurs).
        """
        text = text.strip()
        w = Helper.getTextSize(font, text)[0]
        if not (w > width and wordWrap):
            return [font.render(text, True, color)]
        # Estimate characters per segment from the width ratio. At least one
        # character per segment, otherwise the loop below never terminates.
        segLen = max(1, math.floor(width / w * len(text)))
        result = []
        while text:
            segment, text = text[:segLen], text[segLen:]
            result.append(font.render(segment, True, color))
        return result

    @staticmethod
    def randomColor():
        """Return a random ``(r, g, b)`` tuple with components in 0..255."""
        return (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

    @staticmethod
    def reverseColor(color: QColor):
        """Return the inverted color of *color* as an ``(r, g, b)`` tuple."""
        return (255 - color.red(), 255 - color.green(), 255 - color.blue())

    @staticmethod
    def getRGB(color_value):
        """Convert *color_value* to an ``(r, g, b)`` tuple.

        Accepted inputs:
            * tuple/list with at least three items (clamped to 0..255);
            * int interpreted as ``0xRRGGBB``;
            * str ``#RGB``, ``#RRGGBB`` or ``#RRGGBBAA``, or a known color name;
            * any object exposing ``r``, ``g`` and ``b`` attributes.

        Returns:
            The RGB tuple, or black ``(0, 0, 0)`` when the input is not
            understood.
        """
        if isinstance(color_value, (tuple, list)):
            if len(color_value) >= 3:
                r, g, b = color_value[0], color_value[1], color_value[2]
                return (Helper.castRange(int(r), 0, 255),
                        Helper.castRange(int(g), 0, 255),
                        Helper.castRange(int(b), 0, 255))
        elif isinstance(color_value, int):
            return ((color_value >> 16) & 0xFF,
                    (color_value >> 8) & 0xFF,
                    color_value & 0xFF)
        elif isinstance(color_value, str):
            if color_value.startswith('#'):
                hex_value = color_value[1:]
                if len(hex_value) == 3:
                    # Short form #RGB: double every digit.
                    hex_value = ''.join(c * 2 for c in hex_value)
                if len(hex_value) in (6, 8):
                    try:
                        return (int(hex_value[0:2], 16),
                                int(hex_value[2:4], 16),
                                int(hex_value[4:6], 16))
                    except ValueError:
                        # Malformed hex digits: fall through to the default.
                        pass
            named = Helper._NAMED_COLORS.get(color_value.lower())
            if named is not None:
                return named
        elif (hasattr(color_value, 'r') and hasattr(color_value, 'g')
              and hasattr(color_value, 'b')):
            return (color_value.r, color_value.g, color_value.b)
        return (0, 0, 0)

    @staticmethod
    def check_system_resources():
        """Return a snapshot of CPU, memory and network counters.

        Blocks for about one second because the CPU load is sampled with
        ``interval=1``. Memory availability is reported in GiB and network
        totals in KiB.
        """
        logger.info("检查系统资源使用情况...")
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        network = psutil.net_io_counters()
        logger.info("检查系统资源使用情况完毕")
        return {
            'cpu_percent': cpu_percent,
            'memory_percent': memory.percent,
            'memory_available': memory.available / (1024**3),
            'network_bytes_sent': int(network.bytes_sent / 1024),
            'network_bytes_recv': int(network.bytes_recv / 1024)
        }

    @staticmethod
    def build_response(type, status: enums.Response, msg):
        """Build a response dictionary and log it when the status is an error.

        Args:
            type: Request type echoed back as ``request_type``.
            status: Enum member whose value unpacks to ``(code, message)``.
            msg: Detail message.

        Returns:
            The response dictionary.
        """
        status_code, status_msg = status.value
        result = {
            "type": "response",
            "request_type": type,
            "status_code": status_code,
            "status_msg": status_msg,
            "detail_msg": msg
        }
        if status_code != 0:
            Helper.log_error(result)
        return result

    @staticmethod
    def get_surrounding_rect(points):
        """Return the bounding QRectF of *points* (objects with x() and y())."""
        if len(points) == 0:
            # NOTE(review): `Constant` is neither defined nor imported in the
            # visible part of this module - confirm where it comes from.
            return Constant.invalid_rect
        min_x = min(p.x() for p in points)
        min_y = min(p.y() for p in points)
        max_x = max(p.x() for p in points)
        max_y = max(p.y() for p in points)
        return QRectF(min_x, min_y, max_x - min_x, max_y - min_y)

    @staticmethod
    def getYoloLabellingInfo(dir_path, file_names, desc):
        """Count the entries below ``images/`` and ``labels/`` of *dir_path*.

        Args:
            dir_path: Directory prefix (expected to end with a separator).
            file_names: Iterable of entry names to classify by prefix.
            desc: Description used in the returned summary text.

        Returns:
            ``(summary_text, image_count)``.
        """
        if len(dir_path) > 0:
            imageNumber, labelNumber = 0, 0
            imagePath = dir_path + 'images/'
            labelPath = dir_path + 'labels/'
            for file_name in file_names:
                if file_name.startswith(imagePath):
                    imageNumber += 1
                elif file_name.startswith(labelPath):
                    labelNumber += 1
            # NOTE(review): one is subtracted from each count - presumably the
            # directory entry itself is part of file_names; confirm with caller.
            return f'{desc} {imageNumber - 1} 张图片,{labelNumber - 1} 张标签;', imageNumber - 1
        return f'无{desc};', 0

    @staticmethod
    def getMarkdownRenderText(mdContent):
        """Convert Markdown text to HTML.

        Uses the ``markdown`` package when it is installed; otherwise builds a
        page around the bundled ``marked.min.js`` script.
        """
        try:
            import markdown
            html_content = markdown.markdown(mdContent)
            styled_html = f" {html_content} "
            return styled_html
        except ImportError:
            logger.warning("未找到markdown库,使用JavaScript渲染方式")
            # Load marked.js from appIOs/configs.
            file_js = QFile('appIOs/configs/marked.min.js')
            markedJs = ''
            if file_js.open(QIODevice.ReadOnly | QIODevice.Text):
                markedJs = file_js.readAll().data().decode('utf-8')
                file_js.close()
            # Escape &, <, >, " and ' so the Markdown can be embedded in HTML.
            escapedMd = html.escape(mdContent, quote=True)
            # NOTE(review): this template holds no %1/%2 placeholders, so the
            # substitutions below are no-ops - confirm the intended HTML page.
            htmlTemplate = '\n'
            htmlContent = htmlTemplate.replace('%1', markedJs).replace('%2', escapedMd)
            return htmlContent

    @staticmethod
    def getMarkdownRender(mdFileName):
        """Read the Markdown file *mdFileName* and return it rendered as HTML.

        Returns an error text when the file cannot be opened.
        """
        file = QFile(mdFileName)
        if file.open(QIODevice.ReadOnly | QIODevice.Text):
            stream = QTextStream(file)
            stream.setAutoDetectUnicode(True)
            mdContent = stream.readAll()
            file.close()
            return Helper.getMarkdownRenderText(mdContent)
        logger.error(f'打开文件 {mdFileName} 失败')
        return f"\n\n无法打开文件: {mdFileName}\n\n"


class RTTI:
    """Set and get object properties by name, including dotted paths."""

    @staticmethod
    def _do_set_attr(obj, property_name, property_value):
        """Call ``obj.set<PropertyName>(property_value)`` after validation.

        String values for properties whose name ends with ``icon`` are treated
        as image paths (also looked up under ``appIOs/res/images/icons``) and
        converted to QIcon.
        """
        if obj is None:
            logger.error(f"RTTI.set: obj is None")
            return
        class_name = type(obj).__name__
        object_name = obj.objectName()
        if property_name not in dir(obj):
            logger.error(f"RTTI.set: {class_name} {object_name}.{property_name} not in dir(obj)")
            return
        if property_name.endswith('icon') and isinstance(property_value, str):
            original_property_value = property_value
            if not os.path.exists(property_value):
                # Fall back to the bundled icon directory.
                property_value = os.path.join('appIOs/res/images/icons', property_value)
            if not os.path.exists(property_value):
                logger.error(f"{original_property_value}文件不存在 > RTTI.set: {class_name} {object_name}.{property_name} = '{original_property_value}'")
                return
            property_value = QIcon(property_value)
        setter_method = getattr(obj, f'set{property_name[0].upper() + property_name[1:]}')
        setter_method(property_value)

    @staticmethod
    def set(obj, property_name, property_value):
        """Set ``obj.<property_name>``; *property_name* may be a dotted path."""
        property_list = property_name.split('.')
        dest_obj = obj
        # Walk every segment except the last to reach the owning object.
        for segment in property_list[:-1]:
            if not dest_obj:
                logger.error(f"RTTI.set: {property_name} not found")
                return
            dest_obj = getattr(dest_obj, segment)
        RTTI._do_set_attr(dest_obj, property_list[-1], property_value)

    @staticmethod
    def _do_get_attr(obj, property_name):
        """Return ``(type_name, value)`` of ``obj.<property_name>``.

        Returns ``(None, None)`` when *obj* is None or lacks the attribute.
        """
        if obj is None:
            logger.error(f"RTTI.get: obj is None")
            return None, None
        if property_name not in dir(obj):
            logger.error(f"RTTI.get: {type(obj).__name__} {obj.objectName()}.{property_name} not in dir(obj)")
            return None, None
        value = getattr(obj, property_name)
        return type(value).__name__, value

    @staticmethod
    def get(obj, property_name):
        """Return ``(type_name, value)`` for a (dotted) property name.

        Usage: ``type_name, value = RTTI.get(obj, property_name)``.
        Returns ``(None, None)`` when the path cannot be resolved.
        """
        property_list = property_name.split('.')
        dest_obj = obj
        for segment in property_list[:-1]:
            if not dest_obj:
                logger.error(f"RTTI.get: {property_name} not found")
                # Same shape as every other exit so callers can always unpack.
                return None, None
            dest_obj = getattr(dest_obj, segment)
        return RTTI._do_get_attr(dest_obj, property_list[-1])


class DrawHelper:
    """Dashed drawing primitives built on ``cv2.line``."""

    @staticmethod
    def draw_dashed_line(mat, pt1, pt2, color, thickness=1, dash_length=10):
        """Draw a dashed line from *pt1* to *pt2* on *mat*.

        The segment is divided into ``int(distance / dash_length)`` periods
        and the first half of each period is drawn. Nothing is drawn when the
        distance is shorter than *dash_length*.
        """
        dx = pt2[0] - pt1[0]
        dy = pt2[1] - pt1[1]
        dist = (dx ** 2 + dy ** 2) ** 0.5
        dashes = int(dist / dash_length)
        for i in range(dashes):
            start = (int(pt1[0] + dx * i / dashes),
                     int(pt1[1] + dy * i / dashes))
            end = (int(pt1[0] + dx * (i + 0.5) / dashes),
                   int(pt1[1] + dy * (i + 0.5) / dashes))
            cv2.line(mat, start, end, color, thickness)

    @staticmethod
    def draw_dashed_rect(painter, rect, color, thickness=1, dash_length=10):
        """Draw the four edges of *rect* as dashed lines.

        *painter* is passed to draw_dashed_line() as its drawing target;
        *rect* must expose left(), top(), right() and bottom().
        """
        x1, y1 = rect.left(), rect.top()
        x2, y2 = rect.right(), rect.bottom()
        edges = (
            ((x1, y1), (x2, y1)),  # top
            ((x1, y2), (x2, y2)),  # bottom
            ((x1, y1), (x1, y2)),  # left
            ((x2, y1), (x2, y2)),  # right
        )
        for start, end in edges:
            DrawHelper.draw_dashed_line(painter, start, end, color, thickness, dash_length)