hyf-backend/th_agenter/api/endpoints/auth.py

131 lines
5.2 KiB
Python
Raw Permalink Normal View History

2026-01-21 13:45:39 +08:00
"""Authentication endpoints."""
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from ...core.config import get_settings
from ...db.database import DrSession, get_session
from ...services.auth import AuthService
from ...services.user import UserService
from ...schemas.user import UserResponse, UserCreate, LoginResponse
from utils.util_schemas import Token, LoginRequest
from loguru import logger
from utils.util_exceptions import HxfResponse
router = APIRouter()
settings = get_settings()
@router.post("/register", response_model=UserResponse, summary="注册新用户")
async def register(
request_user_data: UserCreate,
session: DrSession = Depends(get_session)
):
"""注册新用户"""
user_service = UserService(session)
session.desc = f"START: 注册用户 {request_user_data.email}"
if await user_service.get_user_by_email(request_user_data.email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"邮箱 {request_user_data.email} 已被注册,请使用其他邮箱注册!!!"
)
if await user_service.get_user_by_username(request_user_data.username):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"用户名 {request_user_data.username} 已被注册,请使用其他用户名注册!!!"
)
user = await user_service.create_user(request_user_data)
response = UserResponse.model_validate(user, from_attributes=True)
return HxfResponse(response)
@router.post("/login", response_model=LoginResponse, summary="邮箱与密码登录")
async def login(
login_data: LoginRequest,
session: DrSession = Depends(get_session)
):
"""邮箱与密码登录"""
# Authenticate user by email
session.desc = f"START: 用户 {login_data.email} 尝试登录"
user = await AuthService.authenticate_user_by_email(session, login_data.email, login_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"邮箱 {login_data.email} 或密码错误,请检查后重试!!!",
headers={"WWW-Authenticate": "Bearer"},
)
# Create access token
access_token_expires = timedelta(minutes=settings.security.access_token_expire_minutes)
access_token = await AuthService.create_access_token(
session, data={"sub": user.username}, expires_delta=access_token_expires
)
session.desc = f"用户 {user.username} 登录成功"
response = LoginResponse(
access_token=access_token,
token_type="bearer",
expires_in=settings.security.access_token_expire_minutes * 60,
user=UserResponse.model_validate(user, from_attributes=True)
)
return HxfResponse(response)
@router.post("/login-oauth", response_model=Token, summary="用户通过用户名和密码登录 (OAuth2 兼容)")
async def login_oauth(
form_data: OAuth2PasswordRequestForm = Depends(),
session: DrSession = Depends(get_session)
):
"""用户通过用户名和密码登录 (OAuth2 兼容)"""
session.desc = f"START: 用户 {form_data.username} 尝试 OAuth2 登录"
user = await AuthService.authenticate_user(session, form_data.username, form_data.password)
if not user:
session.desc = f"用户 {form_data.username} 尝试 OAuth2 登录失败"
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create access token
access_token_expires = timedelta(minutes=settings.security.access_token_expire_minutes)
access_token = await AuthService.create_access_token(
session, data={"sub": user.username}, expires_delta=access_token_expires
)
session.desc = f"用户 {user.username} OAuth2 登录成功"
return HxfResponse(
{
"access_token": access_token,
"token_type": "bearer",
"expires_in": settings.security.access_token_expire_minutes * 60
}
)
@router.post("/refresh", response_model=Token, summary="刷新访问token")
async def refresh_token(
current_user = Depends(AuthService.get_current_user),
session: DrSession = Depends(get_session)
):
"""刷新访问 token"""
# Create new access token
access_token_expires = timedelta(minutes=settings.security.access_token_expire_minutes)
access_token = await AuthService.create_access_token(
session, data={"sub": current_user.username}, expires_delta=access_token_expires
)
response = Token(
access_token=access_token,
token_type="bearer",
expires_in=settings.security.access_token_expire_minutes * 60
)
return HxfResponse(response)
@router.get("/me", response_model=UserResponse, summary="获取当前用户信息")
async def get_current_user_info(
current_user = Depends(AuthService.get_current_user)
):
"""获取当前用户信息"""
response = UserResponse.model_validate(current_user, from_attributes=True)
return HxfResponse(response)