131 lines
5.2 KiB
Python
131 lines
5.2 KiB
Python
"""Authentication endpoints."""
|
|
|
|
from datetime import timedelta
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from sqlalchemy.orm import Session
|
|
|
|
from ...core.config import get_settings
|
|
from ...db.database import DrSession, get_session
|
|
from ...services.auth import AuthService
|
|
from ...services.user import UserService
|
|
from ...schemas.user import UserResponse, UserCreate, LoginResponse
|
|
from utils.util_schemas import Token, LoginRequest
|
|
from loguru import logger
|
|
from utils.util_exceptions import HxfResponse
|
|
|
|
router = APIRouter()
|
|
settings = get_settings()
|
|
|
|
@router.post("/register", response_model=UserResponse, summary="注册新用户")
|
|
async def register(
|
|
request_user_data: UserCreate,
|
|
session: DrSession = Depends(get_session)
|
|
):
|
|
"""注册新用户"""
|
|
user_service = UserService(session)
|
|
session.desc = f"START: 注册用户 {request_user_data.email}"
|
|
if await user_service.get_user_by_email(request_user_data.email):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"邮箱 {request_user_data.email} 已被注册,请使用其他邮箱注册!!!"
|
|
)
|
|
|
|
if await user_service.get_user_by_username(request_user_data.username):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"用户名 {request_user_data.username} 已被注册,请使用其他用户名注册!!!"
|
|
)
|
|
|
|
user = await user_service.create_user(request_user_data)
|
|
response = UserResponse.model_validate(user, from_attributes=True)
|
|
return HxfResponse(response)
|
|
|
|
@router.post("/login", response_model=LoginResponse, summary="邮箱与密码登录")
|
|
async def login(
|
|
login_data: LoginRequest,
|
|
session: DrSession = Depends(get_session)
|
|
):
|
|
"""邮箱与密码登录"""
|
|
# Authenticate user by email
|
|
session.desc = f"START: 用户 {login_data.email} 尝试登录"
|
|
user = await AuthService.authenticate_user_by_email(session, login_data.email, login_data.password)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail=f"邮箱 {login_data.email} 或密码错误,请检查后重试!!!",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Create access token
|
|
access_token_expires = timedelta(minutes=settings.security.access_token_expire_minutes)
|
|
access_token = await AuthService.create_access_token(
|
|
session, data={"sub": user.username}, expires_delta=access_token_expires
|
|
)
|
|
session.desc = f"用户 {user.username} 登录成功"
|
|
|
|
response = LoginResponse(
|
|
access_token=access_token,
|
|
token_type="bearer",
|
|
expires_in=settings.security.access_token_expire_minutes * 60,
|
|
user=UserResponse.model_validate(user, from_attributes=True)
|
|
)
|
|
return HxfResponse(response)
|
|
|
|
@router.post("/login-oauth", response_model=Token, summary="用户通过用户名和密码登录 (OAuth2 兼容)")
|
|
async def login_oauth(
|
|
form_data: OAuth2PasswordRequestForm = Depends(),
|
|
session: DrSession = Depends(get_session)
|
|
):
|
|
"""用户通过用户名和密码登录 (OAuth2 兼容)"""
|
|
session.desc = f"START: 用户 {form_data.username} 尝试 OAuth2 登录"
|
|
user = await AuthService.authenticate_user(session, form_data.username, form_data.password)
|
|
if not user:
|
|
session.desc = f"用户 {form_data.username} 尝试 OAuth2 登录失败"
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Create access token
|
|
access_token_expires = timedelta(minutes=settings.security.access_token_expire_minutes)
|
|
access_token = await AuthService.create_access_token(
|
|
session, data={"sub": user.username}, expires_delta=access_token_expires
|
|
)
|
|
session.desc = f"用户 {user.username} OAuth2 登录成功"
|
|
|
|
return HxfResponse(
|
|
{
|
|
"access_token": access_token,
|
|
"token_type": "bearer",
|
|
"expires_in": settings.security.access_token_expire_minutes * 60
|
|
}
|
|
)
|
|
|
|
@router.post("/refresh", response_model=Token, summary="刷新访问token")
|
|
async def refresh_token(
|
|
current_user = Depends(AuthService.get_current_user),
|
|
session: DrSession = Depends(get_session)
|
|
):
|
|
"""刷新访问 token"""
|
|
# Create new access token
|
|
access_token_expires = timedelta(minutes=settings.security.access_token_expire_minutes)
|
|
access_token = await AuthService.create_access_token(
|
|
session, data={"sub": current_user.username}, expires_delta=access_token_expires
|
|
)
|
|
|
|
response = Token(
|
|
access_token=access_token,
|
|
token_type="bearer",
|
|
expires_in=settings.security.access_token_expire_minutes * 60
|
|
)
|
|
return HxfResponse(response)
|
|
|
|
@router.get("/me", response_model=UserResponse, summary="获取当前用户信息")
|
|
async def get_current_user_info(
|
|
current_user = Depends(AuthService.get_current_user)
|
|
):
|
|
"""获取当前用户信息"""
|
|
response = UserResponse.model_validate(current_user, from_attributes=True)
|
|
return HxfResponse(response) |