127 lines
5.4 KiB
Python
127 lines
5.4 KiB
Python
# app/crud/item.py
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.future import select
|
|
from sqlalchemy import delete as sql_delete, update as sql_update # Use aliases
|
|
from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
|
|
from typing import Optional, List as PyList
|
|
from datetime import datetime, timezone
|
|
|
|
from app.models import Item as ItemModel
|
|
from app.schemas.item import ItemCreate, ItemUpdate
|
|
from app.core.exceptions import (
|
|
ItemNotFoundError,
|
|
DatabaseConnectionError,
|
|
DatabaseIntegrityError,
|
|
DatabaseQueryError,
|
|
DatabaseTransactionError,
|
|
ConflictError
|
|
)
|
|
|
|
async def create_item(db: AsyncSession, item_in: ItemCreate, list_id: int, user_id: int) -> ItemModel:
|
|
"""Creates a new item record for a specific list."""
|
|
try:
|
|
db_item = ItemModel(
|
|
name=item_in.name,
|
|
quantity=item_in.quantity,
|
|
list_id=list_id,
|
|
added_by_id=user_id,
|
|
is_complete=False # Default on creation
|
|
# version is implicitly set to 1 by model default
|
|
)
|
|
db.add(db_item)
|
|
await db.flush()
|
|
await db.refresh(db_item)
|
|
await db.commit() # Explicitly commit here
|
|
return db_item
|
|
except IntegrityError as e:
|
|
await db.rollback() # Rollback on integrity error
|
|
raise DatabaseIntegrityError(f"Failed to create item: {str(e)}")
|
|
except OperationalError as e:
|
|
await db.rollback() # Rollback on operational error
|
|
raise DatabaseConnectionError(f"Database connection error: {str(e)}")
|
|
except SQLAlchemyError as e:
|
|
await db.rollback() # Rollback on other SQLAlchemy errors
|
|
raise DatabaseTransactionError(f"Failed to create item: {str(e)}")
|
|
except Exception as e: # Catch any other exception and attempt rollback
|
|
await db.rollback()
|
|
raise # Re-raise the original exception
|
|
|
|
async def get_items_by_list_id(db: AsyncSession, list_id: int) -> PyList[ItemModel]:
|
|
"""Gets all items belonging to a specific list, ordered by creation time."""
|
|
try:
|
|
result = await db.execute(
|
|
select(ItemModel)
|
|
.where(ItemModel.list_id == list_id)
|
|
.order_by(ItemModel.created_at.asc()) # Or desc() if preferred
|
|
)
|
|
return result.scalars().all()
|
|
except OperationalError as e:
|
|
raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
|
|
except SQLAlchemyError as e:
|
|
raise DatabaseQueryError(f"Failed to query items: {str(e)}")
|
|
|
|
async def get_item_by_id(db: AsyncSession, item_id: int) -> Optional[ItemModel]:
|
|
"""Gets a single item by its ID."""
|
|
try:
|
|
result = await db.execute(select(ItemModel).where(ItemModel.id == item_id))
|
|
return result.scalars().first()
|
|
except OperationalError as e:
|
|
raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
|
|
except SQLAlchemyError as e:
|
|
raise DatabaseQueryError(f"Failed to query item: {str(e)}")
|
|
|
|
async def update_item(db: AsyncSession, item_db: ItemModel, item_in: ItemUpdate, user_id: int) -> ItemModel:
|
|
"""Updates an existing item record, checking for version conflicts."""
|
|
try:
|
|
async with db.begin():
|
|
if item_db.version != item_in.version:
|
|
raise ConflictError(
|
|
f"Item '{item_db.name}' (ID: {item_db.id}) has been modified. "
|
|
f"Your version is {item_in.version}, current version is {item_db.version}. Please refresh."
|
|
)
|
|
|
|
update_data = item_in.model_dump(exclude_unset=True, exclude={'version'}) # Exclude version
|
|
|
|
# Special handling for is_complete
|
|
if 'is_complete' in update_data:
|
|
if update_data['is_complete'] is True:
|
|
if item_db.completed_by_id is None: # Only set if not already completed by someone
|
|
update_data['completed_by_id'] = user_id
|
|
else:
|
|
update_data['completed_by_id'] = None # Clear if marked incomplete
|
|
|
|
for key, value in update_data.items():
|
|
setattr(item_db, key, value)
|
|
|
|
item_db.version += 1 # Increment version
|
|
|
|
db.add(item_db)
|
|
await db.flush()
|
|
await db.refresh(item_db)
|
|
return item_db
|
|
except IntegrityError as e:
|
|
await db.rollback()
|
|
raise DatabaseIntegrityError(f"Failed to update item due to integrity constraint: {str(e)}")
|
|
except OperationalError as e:
|
|
await db.rollback()
|
|
raise DatabaseConnectionError(f"Database connection error while updating item: {str(e)}")
|
|
except ConflictError: # Re-raise ConflictError
|
|
await db.rollback()
|
|
raise
|
|
except SQLAlchemyError as e:
|
|
await db.rollback()
|
|
raise DatabaseTransactionError(f"Failed to update item: {str(e)}")
|
|
|
|
async def delete_item(db: AsyncSession, item_db: ItemModel) -> None:
|
|
"""Deletes an item record. Version check should be done by the caller (API endpoint)."""
|
|
try:
|
|
async with db.begin():
|
|
await db.delete(item_db)
|
|
# await db.commit() # Not needed with async with db.begin()
|
|
return None
|
|
except OperationalError as e:
|
|
await db.rollback()
|
|
raise DatabaseConnectionError(f"Database connection error while deleting item: {str(e)}")
|
|
except SQLAlchemyError as e:
|
|
await db.rollback()
|
|
raise DatabaseTransactionError(f"Failed to delete item: {str(e)}") |