"""User model.""" from sqlalchemy import String, Boolean, Text from sqlalchemy.orm import relationship, Mapped, mapped_column from typing import List, Optional from loguru import logger from ..db.base import BaseModel class User(BaseModel): """User model.""" __tablename__ = "users" username: Mapped[str] = mapped_column(String(50), unique=True, index=True, nullable=False) email: Mapped[str] = mapped_column(String(100), unique=True, index=True, nullable=False) hashed_password: Mapped[str] = mapped_column(String(255), nullable=False) full_name: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False) avatar_url: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # 关系 - 只保留角色关系 roles = relationship("Role", secondary="user_roles", back_populates="users") def __repr__(self): return f"" def to_dict(self, include_sensitive=False, include_roles=False): """Convert to dictionary, optionally excluding sensitive data.""" data = super().to_dict() data.update({ 'username': self.username, 'email': self.email, 'full_name': self.full_name, 'is_active': self.is_active, 'avatar_url': self.avatar_url, 'bio': self.bio, 'is_superuser': self.is_admin # 使用同步的 is_admin 属性代替异步的 is_superuser 方法 }) if not include_sensitive: data.pop('hashed_password', None) if include_roles: try: # 安全访问roles关系属性 data['roles'] = [role.to_dict() for role in self.roles if role.is_active] except Exception: # 如果角色关系未加载或访问出错,返回空列表 data['roles'] = [] return data async def has_role(self, role_code: str) -> bool: """检查用户是否拥有指定角色.""" try: # 在异步环境中,需要先加载关系属性 from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import object_session from sqlalchemy import select from .permission import Role, UserRole session = object_session(self) if isinstance(session, AsyncSession): # 如果是异步会话,使用await加载关系 await session.refresh(self, ['roles']) return any(role.code == role_code and role.is_active for role in self.roles) except Exception: # 如果对象已分离或加载关系失败,使用数据库查询 from sqlalchemy.orm import object_session from sqlalchemy import select from .permission import Role, UserRole session = object_session(self) if session is None: # 如果没有会话,返回False return False else: from sqlalchemy.ext.asyncio import AsyncSession if isinstance(session, AsyncSession): # 如果是异步会话,使用异步查询 user_role = await session.execute( select(UserRole).join(Role).filter( UserRole.user_id == self.id, Role.code == role_code, Role.is_active == True ) ) return user_role.scalar_one_or_none() is not None else: # 如果是同步会话,使用同步查询 user_role = session.query(UserRole).join(Role).filter( UserRole.user_id == self.id, Role.code == role_code, Role.is_active == True ).first() return user_role is not None async def is_superuser(self) -> bool: """检查用户是否为超级管理员.""" return await self.has_role('SUPER_ADMIN') async def is_admin_user(self) -> bool: """检查用户是否为管理员(兼容性方法).""" return await self.is_superuser() # 注意:属性方式的 is_admin 无法是异步的,所以我们改为同步方法并简化实现 @property def is_admin(self) -> bool: """检查用户是否为管理员(属性方式).""" # 同步属性无法使用 await,所以我们只能检查已加载的角色 # 使用try-except捕获可能的MissingGreenlet错误 try: # 检查角色关系是否已经加载 # 如果roles属性是一个InstrumentedList且已经加载,那么它应该有__iter__方法 return any(role.code == 'SUPER_ADMIN' and role.is_active for role in self.roles) except Exception: # 如果角色关系未加载或访问出错,返回False return False