mitlist/be/app/api/v1/endpoints/auth.py
2025-05-13 20:33:02 +02:00

136 lines
4.5 KiB
Python

# app/api/v1/endpoints/auth.py
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.user import UserCreate, UserPublic
from app.schemas.auth import Token
from app.crud import user as crud_user
from app.core.security import (
verify_password,
create_access_token,
create_refresh_token,
verify_refresh_token
)
from app.core.exceptions import (
EmailAlreadyRegisteredError,
InvalidCredentialsError,
UserCreationError
)
from app.config import settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router on which the authentication endpoints in this module are registered.
router = APIRouter()
@router.post(
    "/signup",
    response_model=UserPublic,
    status_code=201,
    summary="Register New User",
    description="Creates a new user account.",
    tags=["Authentication"],
)
async def signup(
    user_in: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """
    Register a new user account.

    The email is looked up first and the request is rejected when a user
    with that email already exists. Otherwise creation is delegated to
    ``crud_user.create_user`` (password hashing and persistence presumably
    happen there -- not visible from this module).

    Args:
        user_in: Registration payload.
        db: Database session injected through ``get_db``.

    Returns:
        The object returned by ``crud_user.create_user``; the route declares
        ``UserPublic`` as its response model.

    Raises:
        EmailAlreadyRegisteredError: A user with this email already exists.
        UserCreationError: Creating the user (or reading the created user's
            attributes for the success log) failed. The underlying exception
            is attached as ``__cause__``.
    """
    logger.info(f"Signup attempt for email: {user_in.email}")

    existing_user = await crud_user.get_user_by_email(db, email=user_in.email)
    if existing_user:
        logger.warning(f"Signup failed: Email already registered - {user_in.email}")
        raise EmailAlreadyRegisteredError()

    # NOTE(review): the lookup above and the insert below are two separate
    # steps, so two concurrent signups for the same email can both pass the
    # check. That case ends up in the generic handler below.
    try:
        created_user = await crud_user.create_user(db=db, user_in=user_in)
        # The success log stays inside the try so that a failure while reading
        # the created user's attributes is still reported as UserCreationError.
        logger.info(f"User created successfully: {created_user.email} (ID: {created_user.id})")
        return created_user
    except Exception as e:
        logger.error(f"Error during user creation for {user_in.email}: {e}", exc_info=True)
        # Chain explicitly so the original failure stays visible as the cause
        # of the domain error instead of being dropped from the traceback.
        raise UserCreationError() from e
@router.post(
    "/login",
    response_model=Token,
    summary="User Login",
    description="Authenticates a user and returns an access and refresh token.",
    tags=["Authentication"],
)
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: AsyncSession = Depends(get_db),
):
    """
    Authenticate a user and issue an access/refresh token pair.

    The form's ``username`` field is used as the email for the user lookup.
    The submitted password is checked with ``verify_password`` against the
    user's ``password_hash``.

    Args:
        form_data: OAuth2 password form (``username`` and ``password``).
        db: Database session injected through ``get_db``.

    Returns:
        A ``Token`` holding the new access token, the new refresh token and
        ``settings.TOKEN_TYPE`` as the token type.

    Raises:
        InvalidCredentialsError: No user was found for the email, or the
            password check failed. Both cases raise the same error.
    """
    logger.info(f"Login attempt for user: {form_data.username}")

    user = await crud_user.get_user_by_email(db, email=form_data.username)

    # The password is only checked when a user was actually found.
    password_ok = False
    if user:
        password_ok = verify_password(form_data.password, user.password_hash)

    if not password_ok:
        logger.warning(f"Login failed: Invalid credentials for user {form_data.username}")
        raise InvalidCredentialsError()

    # Both tokens use the user's email as their subject.
    access_jwt = create_access_token(subject=user.email)
    refresh_jwt = create_refresh_token(subject=user.email)
    logger.info(f"Login successful, tokens generated for user: {user.email}")

    token_pair = Token(
        access_token=access_jwt,
        refresh_token=refresh_jwt,
        token_type=settings.TOKEN_TYPE,
    )
    return token_pair
def _refresh_unauthorized(detail: str) -> HTTPException:
    """Build the 401 response used for every rejected refresh attempt."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


@router.post(
    "/refresh",
    response_model=Token,
    summary="Refresh Access Token",
    description="Refreshes an access token using a refresh token.",
    tags=["Authentication"],
)
async def refresh_token(
    refresh_token_str: str,
    db: AsyncSession = Depends(get_db),
):
    """
    Exchange a refresh token for a new access/refresh token pair.

    The token is checked with ``verify_refresh_token``, its ``sub`` claim is
    read as the user's email, and the user is looked up to confirm the
    account still exists before new tokens are issued.

    NOTE(review): ``refresh_token_str`` is declared as a plain ``str``
    parameter, which FastAPI reads from the query string. Tokens in URLs tend
    to end up in access logs -- consider moving it to the request body in a
    coordinated API change.

    Args:
        refresh_token_str: The refresh token presented by the client.
        db: Database session injected through ``get_db``.

    Returns:
        A ``Token`` holding a new access token, a new refresh token and
        ``settings.TOKEN_TYPE`` as the token type.

    Raises:
        HTTPException: 401 when the token fails verification, carries no
            ``sub`` claim, or refers to a user that no longer exists.
    """
    logger.info("Access token refresh attempt")

    payload = verify_refresh_token(refresh_token_str)
    if not payload:
        logger.warning("Refresh token invalid or expired")
        raise _refresh_unauthorized("Invalid or expired refresh token")

    user_email = payload.get("sub")
    if not user_email:
        logger.error("User email not found in refresh token payload")
        raise _refresh_unauthorized("Invalid refresh token payload")

    # A still-valid refresh token must not outlive its account: without this
    # lookup a removed user could keep minting fresh tokens indefinitely.
    user = await crud_user.get_user_by_email(db, email=user_email)
    if not user:
        logger.warning(f"Refresh token rejected: no user found for {user_email}")
        raise _refresh_unauthorized("Invalid or expired refresh token")

    new_access_token = create_access_token(subject=user_email)
    new_refresh_token = create_refresh_token(subject=user_email)
    logger.info(f"Access token refreshed and new refresh token issued for user: {user_email}")
    return Token(
        access_token=new_access_token,
        refresh_token=new_refresh_token,
        token_type=settings.TOKEN_TYPE,
    )