# app/crud/user.py
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.future import select
|
|
from sqlalchemy.orm import selectinload # Ensure selectinload is imported
|
|
from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
|
|
from typing import Optional
|
|
import logging # Add logging import
|
|
|
|
from app.models import User as UserModel, UserGroup as UserGroupModel, Group as GroupModel # Import related models for selectinload
|
|
from app.schemas.user import UserCreate
|
|
from app.core.security import hash_password
|
|
from app.core.exceptions import (
|
|
UserCreationError,
|
|
EmailAlreadyRegisteredError,
|
|
DatabaseConnectionError,
|
|
DatabaseIntegrityError,
|
|
DatabaseQueryError,
|
|
DatabaseTransactionError,
|
|
UserOperationError # Add if specific user operation errors are needed
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)  # Module-level logger, named after this module
|
|
|
|
async def get_user_by_email(db: AsyncSession, email: str) -> Optional[UserModel]:
    """Fetch a user by email address with group relationships eagerly loaded.

    The query uses ``selectinload`` for ``UserModel.group_associations``
    (and each association's ``group``) and for ``UserModel.created_groups``.

    Args:
        db: Async SQLAlchemy session used to execute the query.
        email: Value compared for equality against ``UserModel.email``.

    Returns:
        The first matching user, or ``None`` when no row matches.

    Raises:
        DatabaseConnectionError: If SQLAlchemy raises ``OperationalError``.
        DatabaseQueryError: If any other ``SQLAlchemyError`` is raised.
    """
    try:
        # No explicit transaction is opened: this is a single read and the
        # session manages the connection itself.
        stmt = (
            select(UserModel)
            .where(UserModel.email == email)
            .options(
                # Groups the user is a member of (via the association rows).
                selectinload(UserModel.group_associations).selectinload(UserGroupModel.group),
                # Groups the user created.
                selectinload(UserModel.created_groups),
                # Add other relationships as needed by the UserPublic schema.
            )
        )
        result = await db.execute(stmt)
        return result.scalars().first()
    except OperationalError as e:
        # OperationalError subclasses SQLAlchemyError, so it must be handled
        # before the generic handler below.
        logger.error(
            "Database connection error while fetching user by email '%s': %s",
            email,
            e,
            exc_info=True,
        )
        # Chain explicitly so the original driver error stays attached as __cause__.
        raise DatabaseConnectionError(f"Failed to connect to database: {e}") from e
    except SQLAlchemyError as e:
        logger.error(
            "Unexpected SQLAlchemy error while fetching user by email '%s': %s",
            email,
            e,
            exc_info=True,
        )
        raise DatabaseQueryError(f"Failed to query user: {e}") from e
|
|
|
|
async def create_user(db: AsyncSession, user_in: UserCreate) -> UserModel:
    """Create a user row and return it with group relationships loaded.

    The password from ``user_in`` is hashed with ``hash_password`` and stored
    in ``hashed_password``. The insert runs inside a savepoint
    (``db.begin_nested()``) when the session is already in a transaction,
    otherwise inside a new transaction (``db.begin()``). After flushing, the
    user is re-selected with ``selectinload`` options for
    ``group_associations`` -> ``group`` and ``created_groups``.

    Args:
        db: Async SQLAlchemy session used for the insert and the re-fetch.
        user_in: Creation payload; ``email``, ``password`` and ``name`` are read.

    Returns:
        The newly created user as returned by the re-fetch query.

    Raises:
        EmailAlreadyRegisteredError: On an ``IntegrityError`` whose message
            mentions a unique constraint named ``users_email_key`` or
            ``ix_users_email``.
        DatabaseIntegrityError: On any other ``IntegrityError``.
        DatabaseConnectionError: On ``OperationalError``.
        DatabaseTransactionError: On any other ``SQLAlchemyError``.
        UserOperationError: If the user cannot be re-fetched after the flush.
    """
    try:
        # Hashing needs no database access, so do it before the transaction
        # (or savepoint) is opened rather than while it is held.
        hashed = hash_password(user_in.password)

        # Join the caller's transaction through a savepoint when one is active;
        # otherwise open (and, on success, commit) a transaction of our own.
        tx_context = db.begin_nested() if db.in_transaction() else db.begin()
        async with tx_context:
            db_user = UserModel(
                email=user_in.email,
                hashed_password=hashed,  # Field name in the model is hashed_password
                name=user_in.name,
            )
            db.add(db_user)
            await db.flush()  # Flush to get DB-generated values like the ID

            # Re-fetch with relationships.
            stmt = (
                select(UserModel)
                .where(UserModel.id == db_user.id)
                .options(
                    selectinload(UserModel.group_associations).selectinload(UserGroupModel.group),
                    selectinload(UserModel.created_groups),
                    # Add other relationships as needed by the UserPublic schema.
                )
            )
            result = await db.execute(stmt)
            loaded_user = result.scalar_one_or_none()

            if loaded_user is None:
                # Not a SQLAlchemyError, so it propagates past the handlers
                # below after the transaction context has rolled back.
                raise UserOperationError("Failed to load user after creation.")

            # NOTE(review): when this function opened the outer transaction via
            # db.begin(), leaving the block commits. If the session uses
            # expire_on_commit=True, the returned instance's attributes may be
            # expired at that point - confirm the session factory configuration.
            return loaded_user
    except IntegrityError as e:
        logger.error(
            "Database integrity error during user creation for email '%s': %s",
            user_in.email,
            e,
            exc_info=True,
        )
        # Duplicate emails are recognised from the driver's message text, which
        # is matched case-insensitively against the known constraint names.
        detail = str(e).lower()
        if "unique constraint" in detail and (
            "users_email_key" in detail or "ix_users_email" in detail
        ):
            raise EmailAlreadyRegisteredError(email=user_in.email) from e
        raise DatabaseIntegrityError(
            f"Failed to create user due to integrity issue: {e}"
        ) from e
    except OperationalError as e:
        logger.error(
            "Database connection error during user creation for email '%s': %s",
            user_in.email,
            e,
            exc_info=True,
        )
        raise DatabaseConnectionError(
            f"Database connection error during user creation: {e}"
        ) from e
    except SQLAlchemyError as e:
        logger.error(
            "Unexpected SQLAlchemy error during user creation for email '%s': %s",
            user_in.email,
            e,
            exc_info=True,
        )
        raise DatabaseTransactionError(
            f"Failed to create user due to other DB error: {e}"
        ) from e
|
|
|
|
# NOTE: UserOperationError must be defined in app.core.exceptions, because
# create_user raises it when the newly created user cannot be re-fetched.
# Example: class UserOperationError(AppException): pass