"""Role management API endpoints.""" from loguru import logger from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.orm import Session from sqlalchemy import select, and_, or_, delete from ...core.simple_permissions import require_super_admin from ...db.database import get_session from ...models.user import User from ...models.permission import Role, UserRole from ...services.auth import AuthService from ...schemas.permission import ( RoleCreate, RoleUpdate, RoleResponse, UserRoleAssign ) router = APIRouter(prefix="/roles", tags=["roles"]) @router.get("/", response_model=List[RoleResponse], summary="获取角色列表") async def get_roles( skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=1000), search: Optional[str] = Query(None), is_active: Optional[bool] = Query(None), session: Session = Depends(get_session), current_user = Depends(require_super_admin), ): """获取角色列表.""" session.desc = f"START: 获取用户 {current_user.username} 角色列表" stmt = select(Role) # 搜索 if search: stmt = stmt.where( or_( Role.name.ilike(f"%{search}%"), Role.code.ilike(f"%{search}%"), Role.description.ilike(f"%{search}%") ) ) # 状态筛选 if is_active is not None: stmt = stmt.where(Role.is_active == is_active) # 分页 stmt = stmt.offset(skip).limit(limit) roles = (await session.execute(stmt)).scalars().all() session.desc = f"SUCCESS: 用户 {current_user.username} 有 {len(roles)} 个角色" return [role.to_dict() for role in roles] @router.get("/{role_id}", response_model=RoleResponse, summary="获取角色详情") async def get_role( role_id: int, session: Session = Depends(get_session), current_user: User = Depends(require_super_admin) ): """获取角色详情.""" session.desc = f"START: 获取角色 {role_id} 详情" stmt = select(Role).where(Role.id == role_id) role = (await session.execute(stmt)).scalar_one_or_none() if not role: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="角色不存在" ) return role.to_dict() @router.post("/", response_model=RoleResponse, status_code=status.HTTP_201_CREATED, summary="创建角色") async def create_role( role_data: RoleCreate, session: Session = Depends(get_session), current_user: User = Depends(require_super_admin) ): """创建角色.""" session.desc = f"START: 创建角色 {role_data.name}" # 检查角色代码是否已存在 stmt = select(Role).where(Role.code == role_data.code) existing_role = (await session.execute(stmt)).scalar_one_or_none() if existing_role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="角色代码已存在" ) # 创建角色 role = Role( name=role_data.name, code=role_data.code, description=role_data.description, is_active=role_data.is_active ) role.set_audit_fields(current_user.id) await session.add(role) await session.commit() await session.refresh(role) logger.info(f"Role created: {role.name} by user {current_user.username}") return role.to_dict() @router.put("/{role_id}", response_model=RoleResponse, summary="更新角色") async def update_role( role_id: int, role_data: RoleUpdate, session: Session = Depends(get_session), current_user: User = Depends(require_super_admin) ): """更新角色.""" session.desc = f"更新用户 {current_user.username} 角色 {role_id}" stmt = select(Role).where(Role.id == role_id) role = (await session.execute(stmt)).scalar_one_or_none() if not role: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="角色不存在" ) # 超级管理员角色不能被编辑 if role.code == "SUPER_ADMIN": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="超级管理员角色不能被编辑" ) # 检查角色编码是否已存在(排除当前角色) if role_data.code and role_data.code != role.code: stmt = select(Role).where( and_( Role.code == role_data.code, Role.id != role_id ) ) existing_role = (await session.execute(stmt)).scalar_one_or_none() if existing_role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="角色代码已存在" ) # 更新字段 update_data = role_data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(role, field, value) # Audit fields are set automatically by SQLAlchemy event listener await session.commit() await session.refresh(role) logger.info(f"Role updated: {role.name} by user {current_user.username}") return role.to_dict() @router.delete("/{role_id}", status_code=status.HTTP_204_NO_CONTENT, summary="删除角色") async def delete_role( role_id: int, session: Session = Depends(get_session), current_user: User = Depends(require_super_admin) ): """删除角色.""" stmt = select(Role).where(Role.id == role_id) role = (await session.execute(stmt)).scalar_one_or_none() if not role: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="角色不存在" ) # 超级管理员角色不能被删除 if role.code == "SUPER_ADMIN": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="超级管理员角色不能被删除" ) # 检查是否有用户使用该角色 stmt = select(UserRole).where(UserRole.role_id == role_id) user_count = (await session.execute(stmt)).scalars().count() if user_count > 0: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"无法删除角色,还有 {user_count} 个用户关联此角色" ) # 删除角色 await session.delete(role) await session.commit() session.desc = f"角色删除成功: {role.name} by user {current_user.username}" return {"message": f"Role deleted successfully: {role.name} by user {current_user.username}"} # 用户角色管理路由 user_role_router = APIRouter(prefix="/user-roles", tags=["user-roles"]) @user_role_router.post("/assign", status_code=status.HTTP_201_CREATED, summary="为用户分配角色") async def assign_user_roles( assignment_data: UserRoleAssign, session: Session = Depends(get_session), current_user: User = Depends(require_super_admin) ): """为用户分配角色.""" # 验证用户是否存在 stmt = select(User).where(User.id == assignment_data.user_id) user = (await session.execute(stmt)).scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在" ) # 验证角色是否存在 stmt = select(Role).where(Role.id.in_(assignment_data.role_ids)) roles = (await session.execute(stmt)).scalars().all() if len(roles) != len(assignment_data.role_ids): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="部分角色不存在" ) # 删除现有角色关联 stmt = delete(UserRole).where(UserRole.user_id == assignment_data.user_id) await session.execute(stmt) # 添加新的角色关联 for role_id in assignment_data.role_ids: user_role = UserRole( user_id=assignment_data.user_id, role_id=role_id ) await session.add(user_role) await session.commit() session.desc = f"User roles assigned: user {user.username}, roles {assignment_data.role_ids} by user {current_user.username}" return {"message": "角色分配成功"} @user_role_router.get("/user/{user_id}", response_model=List[RoleResponse], summary="获取用户角色列表") async def get_user_roles( user_id: int, session: Session = Depends(get_session), current_user: User = Depends(AuthService.get_current_active_user) ): """获取用户角色列表.""" # 检查权限:用户只能查看自己的角色,或者是超级管理员 if current_user.id != user_id and not await current_user.is_superuser(): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="无权限查看其他用户的角色" ) stmt = select(User).where(User.id == user_id) user = (await session.execute(stmt)).scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在" ) stmt = select(Role).join( UserRole, Role.id == UserRole.role_id ).where( UserRole.user_id == user_id ) roles = (await session.execute(stmt)).scalars().all() return [role.to_dict() for role in roles] # 将子路由添加到主路由 router.include_router(user_role_router)