mitlist/be/app/auth.py
mohamad 84b046508a feat: Implement refresh token functionality in authentication flow
- Added support for refresh tokens in the authentication backend, allowing users to obtain new access tokens using valid refresh tokens.
- Created a new `BearerResponseWithRefresh` model to structure responses containing both access and refresh tokens.
- Updated the `AuthenticationBackend` to handle login and logout processes with refresh token support.
- Introduced a new `/auth/jwt/refresh` endpoint to facilitate token refreshing, validating the refresh token and generating new tokens as needed.
- Modified OAuth callback logic to generate and return both access and refresh tokens upon successful authentication.
- Updated frontend API service to send the refresh token in the Authorization header for token refresh requests.
2025-05-25 12:51:02 +02:00

151 lines
5.2 KiB
Python

from typing import Optional
from fastapi import Depends, Request
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_users import BaseUserManager, FastAPIUsers, IntegerIDMixin
from fastapi_users.authentication import (
AuthenticationBackend,
BearerTransport,
JWTStrategy,
)
from fastapi_users.db import SQLAlchemyUserDatabase
from sqlalchemy.ext.asyncio import AsyncSession
from authlib.integrations.starlette_client import OAuth
from starlette.config import Config
from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import Response
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from .database import get_session
from .models import User
from .config import settings
# OAuth2 configuration
config = Config('.env')
oauth = OAuth(config)
# Google OAuth2 setup
oauth.register(
name='google',
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={
'scope': 'openid email profile',
'redirect_uri': settings.GOOGLE_REDIRECT_URI
}
)
# Apple OAuth2 setup
oauth.register(
name='apple',
server_metadata_url='https://appleid.apple.com/.well-known/openid-configuration',
client_kwargs={
'scope': 'openid email name',
'redirect_uri': settings.APPLE_REDIRECT_URI
}
)
# Custom Bearer Response with Refresh Token
class BearerResponseWithRefresh(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
# Custom Bearer Transport that supports refresh tokens
class BearerTransportWithRefresh(BearerTransport):
async def get_login_response(self, token: str, refresh_token: str = None) -> Response:
if refresh_token:
bearer_response = BearerResponseWithRefresh(
access_token=token,
refresh_token=refresh_token,
token_type="bearer"
)
else:
# Fallback to standard response if no refresh token
bearer_response = {
"access_token": token,
"token_type": "bearer"
}
return JSONResponse(bearer_response.dict() if hasattr(bearer_response, 'dict') else bearer_response)
# Custom Authentication Backend with Refresh Token Support
class AuthenticationBackendWithRefresh(AuthenticationBackend):
def __init__(
self,
name: str,
transport: BearerTransportWithRefresh,
get_strategy,
get_refresh_strategy,
):
self.name = name
self.transport = transport
self.get_strategy = get_strategy
self.get_refresh_strategy = get_refresh_strategy
async def login(self, strategy, user) -> Response:
# Generate both access and refresh tokens
access_token = await strategy.write_token(user)
refresh_strategy = self.get_refresh_strategy()
refresh_token = await refresh_strategy.write_token(user)
return await self.transport.get_login_response(
token=access_token,
refresh_token=refresh_token
)
async def logout(self, strategy, user, token) -> Response:
return await self.transport.get_logout_response()
class UserManager(IntegerIDMixin, BaseUserManager[User, int]):
reset_password_token_secret = settings.SECRET_KEY
verification_token_secret = settings.SECRET_KEY
async def on_after_register(self, user: User, request: Optional[Request] = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")
async def on_after_login(
self, user: User, request: Optional[Request] = None, response: Optional[Response] = None
):
print(f"User {user.id} has logged in.")
async def get_user_db(session: AsyncSession = Depends(get_session)):
yield SQLAlchemyUserDatabase(session, User)
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(user_db)
# Updated transport with refresh token support
bearer_transport = BearerTransportWithRefresh(tokenUrl="auth/jwt/login")
def get_jwt_strategy() -> JWTStrategy:
return JWTStrategy(secret=settings.SECRET_KEY, lifetime_seconds=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60)
def get_refresh_jwt_strategy() -> JWTStrategy:
# Refresh tokens last longer - 7 days
return JWTStrategy(secret=settings.SECRET_KEY, lifetime_seconds=7 * 24 * 60 * 60)
# Updated auth backend with refresh token support
auth_backend = AuthenticationBackendWithRefresh(
name="jwt",
transport=bearer_transport,
get_strategy=get_jwt_strategy,
get_refresh_strategy=get_refresh_jwt_strategy,
)
fastapi_users = FastAPIUsers[User, int](
get_user_manager,
[auth_backend],
)
current_active_user = fastapi_users.current_user(active=True)
current_superuser = fastapi_users.current_user(active=True, superuser=True)