mitlist/be/app/api/auth/oauth.py

99 lines
4.0 KiB
Python

from fastapi import APIRouter, Depends, Request
from fastapi.responses import RedirectResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.database import get_async_session
from app.models import User
from app.auth import oauth, fastapi_users, auth_backend
from app.config import settings
router = APIRouter()
@router.get('/google/login')
async def google_login(request: Request):
return await oauth.google.authorize_redirect(request, settings.GOOGLE_REDIRECT_URI)
@router.get('/google/callback')
async def google_callback(request: Request, db: AsyncSession = Depends(get_async_session)):
token_data = await oauth.google.authorize_access_token(request)
user_info = await oauth.google.parse_id_token(request, token_data)
# Check if user exists
existing_user = (await db.execute(select(User).where(User.email == user_info['email']))).scalar_one_or_none()
user_to_login = existing_user
if not existing_user:
# Create new user
new_user = User(
email=user_info['email'],
name=user_info.get('name', user_info.get('email')),
is_verified=True, # Email is verified by Google
is_active=True
)
db.add(new_user)
await db.commit()
await db.refresh(new_user)
user_to_login = new_user
# Generate JWT token
strategy = auth_backend.get_strategy()
token_response = await strategy.write_token(user_to_login)
access_token = token_response["access_token"]
refresh_token = token_response.get("refresh_token") # Use .get for safety, though it should be there
# Redirect to frontend with tokens
redirect_url = f"{settings.FRONTEND_URL}/auth/callback?access_token={access_token}"
if refresh_token:
redirect_url += f"&refresh_token={refresh_token}"
return RedirectResponse(url=redirect_url)
@router.get('/apple/login')
async def apple_login(request: Request):
return await oauth.apple.authorize_redirect(request, settings.APPLE_REDIRECT_URI)
@router.get('/apple/callback')
async def apple_callback(request: Request, db: AsyncSession = Depends(get_async_session)):
token_data = await oauth.apple.authorize_access_token(request)
user_info = token_data.get('user', await oauth.apple.userinfo(token=token_data) if hasattr(oauth.apple, 'userinfo') else {})
if 'email' not in user_info and 'sub' in token_data:
parsed_id_token = await oauth.apple.parse_id_token(request, token_data) if hasattr(oauth.apple, 'parse_id_token') else {}
user_info = {**parsed_id_token, **user_info}
if 'email' not in user_info:
return RedirectResponse(url=f"{settings.FRONTEND_URL}/auth/callback?error=apple_email_missing")
# Check if user exists
existing_user = (await db.execute(select(User).where(User.email == user_info['email']))).scalar_one_or_none()
user_to_login = existing_user
if not existing_user:
# Create new user
name_info = user_info.get('name', {})
first_name = name_info.get('firstName', '')
last_name = name_info.get('lastName', '')
full_name = f"{first_name} {last_name}".strip() if first_name or last_name else user_info.get('email')
new_user = User(
email=user_info['email'],
name=full_name,
is_verified=True, # Email is verified by Apple
is_active=True
)
db.add(new_user)
await db.commit()
await db.refresh(new_user)
user_to_login = new_user
# Generate JWT token
strategy = auth_backend.get_strategy()
token_response = await strategy.write_token(user_to_login)
access_token = token_response["access_token"]
refresh_token = token_response.get("refresh_token") # Use .get for safety
# Redirect to frontend with tokens
redirect_url = f"{settings.FRONTEND_URL}/auth/callback?access_token={access_token}"
if refresh_token:
redirect_url += f"&refresh_token={refresh_token}"
return RedirectResponse(url=redirect_url)