"""简化的权限检查系统.""" from functools import wraps from typing import Optional from fastapi import HTTPException, Depends from loguru import logger from sqlalchemy.orm import Session from ..db.database import get_session from ..models.user import User from ..models.permission import Role from ..services.auth import AuthService async def is_super_admin(user: User, session: Session) -> bool: """检查用户是否为超级管理员.""" session.desc = f"检查用户 {user.id} 是否为超级管理员" if not user or not user.is_active: session.desc = f"用户 {user.id} 不是活跃状态" return False try: # 直接使用提供的session查询,避免MissingGreenlet错误 from sqlalchemy import select from ..models.permission import UserRole, Role stmt = select(UserRole).join(Role).filter( UserRole.user_id == user.id, Role.code == 'SUPER_ADMIN', Role.is_active == True ) user_role = await session.execute(stmt) result = user_role.scalar_one_or_none() is not None session.desc = f"用户 {user.id} 超级管理员角色查询结果: {result}" return result except Exception as e: # 如果调用失败,记录错误并返回False session.desc = f"EXCEPTION: 用户 {user.id} 超级管理员角色查询失败: {str(e)}" logger.error(f"检查用户 {user.id} 超级管理员角色失败: {str(e)}") return False async def require_super_admin( current_user: User = Depends(AuthService.get_current_user), session: Session = Depends(get_session) ) -> User: """要求超级管理员权限的依赖项.""" if not await is_super_admin(current_user, session): raise HTTPException( status_code=403, detail="需要超级管理员权限" ) return current_user def require_authenticated_user( current_user: User = Depends(AuthService.get_current_user) ) -> User: """要求已认证用户的依赖项.""" if not current_user or not current_user.is_active: raise HTTPException( status_code=401, detail="需要登录" ) return current_user class SimplePermissionChecker: """简化的权限检查器.""" def __init__(self, db: Session): self.db = db async def check_super_admin(self, user: User) -> bool: """检查是否为超级管理员.""" return await is_super_admin(user, self.db) async def check_user_access(self, user: User, target_user_id: int) -> bool: """检查用户访问权限(自己或超级管理员).""" if not user or not user.is_active: return False # 超级管理员可以访问所有用户 if await self.check_super_admin(user): return True # 用户只能访问自己的信息 return user.id == target_user_id # 权限装饰器 def super_admin_required(func): """超级管理员权限装饰器.""" @wraps(func) def wrapper(*args, **kwargs): # 这个装饰器主要用于服务层,实际的FastAPI依赖项检查在路由层 return func(*args, **kwargs) return wrapper def authenticated_required(func): """认证用户权限装饰器.""" @wraps(func) def wrapper(*args, **kwargs): # 这个装饰器主要用于服务层,实际的FastAPI依赖项检查在路由层 return func(*args, **kwargs) return wrapper