-
-
-
-
-
-
-
- {{ getActionLabel(action) }}
-
-
- {{ new Date(action.timestamp).toLocaleString() }}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Mooo
-
-
-
-
-
-
-
-
- Logout
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-import type { RouteRecordRaw } from 'vue-router';
-
-const routes: RouteRecordRaw[] = [
- {
- path: '/',
- component: () => import('layouts/MainLayout.vue'),
- children: [
- { path: '', redirect: '/lists' },
- { path: 'lists', name: 'PersonalLists', component: () => import('pages/ListsPage.vue') },
- {
- path: 'lists/:id',
- name: 'ListDetail',
- component: () => import('pages/ListDetailPage.vue'),
- props: true,
- },
- { path: 'groups', name: 'GroupsList', component: () => import('pages/GroupsPage.vue') },
- {
- path: 'groups/:id',
- name: 'GroupDetail',
- component: () => import('pages/GroupDetailPage.vue'),
- props: true,
- },
- {
- path: 'groups/:groupId/lists',
- name: 'GroupLists',
- component: () => import('pages/ListsPage.vue'),
- props: true,
- },
- { path: 'account', name: 'Account', component: () => import('pages/AccountPage.vue') },
- ],
- },
-
- {
- path: '/',
- component: () => import('layouts/AuthLayout.vue'),
- children: [
- { path: 'login', component: () => import('pages/LoginPage.vue') },
- { path: 'signup', component: () => import('pages/SignupPage.vue') },
- ],
- },
-
- // Always leave this as last one,
- // but you can also remove it
- {
- path: '/:catchAll(.*)*',
- component: () => import('pages/ErrorNotFound.vue'),
- },
-];
-
-export default routes;
-
-
-
-import { defineStore } from 'pinia';
-import { ref, computed } from 'vue';
-import { apiClient, API_ENDPOINTS } from 'src/config/api';
-
-interface AuthState {
- accessToken: string | null;
- refreshToken: string | null;
- user: {
- email: string;
- name: string;
- } | null;
-}
-
-export const useAuthStore = defineStore('auth', () => {
- // State
- const accessToken = ref(localStorage.getItem('token'));
- const refreshToken = ref(localStorage.getItem('refresh_token'));
- const user = ref(null);
-
- // Getters
- const isAuthenticated = computed(() => !!accessToken.value);
- const getUser = computed(() => user.value);
-
- // Actions
- const setTokens = (tokens: { access_token: string; refresh_token: string }) => {
- accessToken.value = tokens.access_token;
- refreshToken.value = tokens.refresh_token;
- localStorage.setItem('token', tokens.access_token);
- localStorage.setItem('refresh_token', tokens.refresh_token);
- };
-
- const clearTokens = () => {
- accessToken.value = null;
- refreshToken.value = null;
- user.value = null;
- localStorage.removeItem('token');
- localStorage.removeItem('refresh_token');
- };
-
- const setUser = (userData: AuthState['user']) => {
- user.value = userData;
- };
-
- const login = async (email: string, password: string) => {
- const formData = new FormData();
- formData.append('username', email);
- formData.append('password', password);
-
- const response = await apiClient.post(API_ENDPOINTS.AUTH.LOGIN, formData, {
- headers: {
- 'Content-Type': 'application/x-www-form-urlencoded',
- },
- });
-
- const { access_token, refresh_token } = response.data;
- setTokens({ access_token, refresh_token });
- return response.data;
- };
-
- const signup = async (userData: { name: string; email: string; password: string }) => {
- const response = await apiClient.post(API_ENDPOINTS.AUTH.SIGNUP, userData);
- return response.data;
- };
-
- const refreshAccessToken = async () => {
- if (!refreshToken.value) {
- throw new Error('No refresh token available');
- }
-
- try {
- const response = await apiClient.post(API_ENDPOINTS.AUTH.REFRESH_TOKEN, {
- refresh_token: refreshToken.value,
- });
-
- const { access_token, refresh_token } = response.data;
- setTokens({ access_token, refresh_token });
- return response.data;
- } catch (error) {
- clearTokens();
- throw error;
- }
- };
-
- const logout = () => {
- clearTokens();
- };
-
- return {
- // State
- accessToken,
- refreshToken,
- user,
- // Getters
- isAuthenticated,
- getUser,
- // Actions
- setTokens,
- clearTokens,
- setUser,
- login,
- signup,
- refreshAccessToken,
- logout,
- };
-});
-
-
-
-import { defineStore } from 'pinia';
-import { ref, computed } from 'vue';
-import { useQuasar } from 'quasar';
-import { LocalStorage } from 'quasar';
-
-export interface OfflineAction {
- id: string;
- type: 'add' | 'complete' | 'update' | 'delete';
- itemId?: string;
- data: unknown;
- timestamp: number;
- version?: number;
-}
-
-export interface ConflictResolution {
- version: 'local' | 'server' | 'merge';
- action: OfflineAction;
-}
-
-export interface ConflictData {
- localVersion: {
- data: Record;
- timestamp: number;
- };
- serverVersion: {
- data: Record;
- timestamp: number;
- };
- action: OfflineAction;
-}
-
-export const useOfflineStore = defineStore('offline', () => {
- const $q = useQuasar();
- const isOnline = ref(navigator.onLine);
- const pendingActions = ref([]);
- const isProcessingQueue = ref(false);
- const showConflictDialog = ref(false);
- const currentConflict = ref(null);
-
- // Initialize from IndexedDB
- const init = () => {
- try {
- const stored = LocalStorage.getItem('offline-actions');
- if (stored) {
- pendingActions.value = JSON.parse(stored as string);
- }
- } catch (error) {
- console.error('Failed to load offline actions:', error);
- }
- };
-
- // Save to IndexedDB
- const saveToStorage = () => {
- try {
- LocalStorage.set('offline-actions', JSON.stringify(pendingActions.value));
- } catch (error) {
- console.error('Failed to save offline actions:', error);
- }
- };
-
- // Add a new offline action
- const addAction = (action: Omit) => {
- const newAction: OfflineAction = {
- ...action,
- id: crypto.randomUUID(),
- timestamp: Date.now(),
- };
- pendingActions.value.push(newAction);
- saveToStorage();
- };
-
- // Process the queue when online
- const processQueue = async () => {
- if (isProcessingQueue.value || !isOnline.value) return;
-
- isProcessingQueue.value = true;
- const actions = [...pendingActions.value];
-
- for (const action of actions) {
- try {
- await processAction(action);
- pendingActions.value = pendingActions.value.filter(a => a.id !== action.id);
- saveToStorage();
- } catch (error) {
- if (error instanceof Error && error.message.includes('409')) {
- $q.notify({
- type: 'warning',
- message: 'Item was modified by someone else while you were offline. Please review.',
- actions: [
- {
- label: 'Review',
- color: 'white',
- handler: () => {
- // TODO: Implement conflict resolution UI
- }
- }
- ]
- });
- } else {
- console.error('Failed to process offline action:', error);
- }
- }
- }
-
- isProcessingQueue.value = false;
- };
-
- // Process a single action
- const processAction = async (action: OfflineAction) => {
- TODO: Implement actual API calls
- switch (action.type) {
- case 'add':
- // await api.addItem(action.data);
- break;
- case 'complete':
- // await api.completeItem(action.itemId, action.data);
- break;
- case 'update':
- // await api.updateItem(action.itemId, action.data);
- break;
- case 'delete':
- // await api.deleteItem(action.itemId);
- break;
- }
- };
-
- // Listen for online/offline status changes
- const setupNetworkListeners = () => {
- window.addEventListener('online', () => {
- (async () => {
- isOnline.value = true;
- await processQueue();
- })().catch(error => {
- console.error('Error processing queue:', error);
- });
- });
-
- window.addEventListener('offline', () => {
- isOnline.value = false;
- });
- };
-
- // Computed properties
- const hasPendingActions = computed(() => pendingActions.value.length > 0);
- const pendingActionCount = computed(() => pendingActions.value.length);
-
- // Initialize
- init();
- setupNetworkListeners();
-
- const handleConflictResolution = (resolution: ConflictResolution) => {
- // Implement the logic to handle the conflict resolution
- console.log('Conflict resolution:', resolution);
- };
-
- return {
- isOnline,
- pendingActions,
- hasPendingActions,
- pendingActionCount,
- showConflictDialog,
- currentConflict,
- addAction,
- processQueue,
- handleConflictResolution,
- };
-});
-
-
-
-{
- "extends": "./.quasar/tsconfig.json"
-}
-
-
-
-# app/api/v1/endpoints/auth.py
-import logging
-from typing import Annotated
-from fastapi import APIRouter, Depends, HTTPException, status
-from fastapi.security import OAuth2PasswordRequestForm
-from sqlalchemy.ext.asyncio import AsyncSession
-
-from app.database import get_db
-from app.schemas.user import UserCreate, UserPublic
-from app.schemas.auth import Token
-from app.crud import user as crud_user
-from app.core.security import (
- verify_password,
- create_access_token,
- create_refresh_token,
- verify_refresh_token
-)
-from app.core.exceptions import (
- EmailAlreadyRegisteredError,
- InvalidCredentialsError,
- UserCreationError
-)
-from app.config import settings
-
-logger = logging.getLogger(__name__)
-router = APIRouter()
-
-@router.post(
- "/signup",
- response_model=UserPublic,
- status_code=201,
- summary="Register New User",
- description="Creates a new user account.",
- tags=["Authentication"]
-)
-async def signup(
- user_in: UserCreate,
- db: AsyncSession = Depends(get_db)
-):
- """
- Handles user registration.
- - Validates input data.
- - Checks if email already exists.
- - Hashes the password.
- - Stores the new user in the database.
- """
- logger.info(f"Signup attempt for email: {user_in.email}")
- existing_user = await crud_user.get_user_by_email(db, email=user_in.email)
- if existing_user:
- logger.warning(f"Signup failed: Email already registered - {user_in.email}")
- raise EmailAlreadyRegisteredError()
-
- try:
- created_user = await crud_user.create_user(db=db, user_in=user_in)
- logger.info(f"User created successfully: {created_user.email} (ID: {created_user.id})")
- return created_user
- except Exception as e:
- logger.error(f"Error during user creation for {user_in.email}: {e}", exc_info=True)
- raise UserCreationError()
-
-@router.post(
- "/login",
- response_model=Token,
- summary="User Login",
- description="Authenticates a user and returns an access and refresh token.",
- tags=["Authentication"]
-)
-async def login(
- form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
- db: AsyncSession = Depends(get_db)
-):
- """
- Handles user login.
- - Finds user by email (provided in 'username' field of form).
- - Verifies the provided password against the stored hash.
- - Generates and returns JWT access and refresh tokens upon successful authentication.
- """
- logger.info(f"Login attempt for user: {form_data.username}")
- user = await crud_user.get_user_by_email(db, email=form_data.username)
-
- if not user or not verify_password(form_data.password, user.password_hash):
- logger.warning(f"Login failed: Invalid credentials for user {form_data.username}")
- raise InvalidCredentialsError()
-
- access_token = create_access_token(subject=user.email)
- refresh_token = create_refresh_token(subject=user.email)
- logger.info(f"Login successful, tokens generated for user: {user.email}")
- return Token(
- access_token=access_token,
- refresh_token=refresh_token,
- token_type=settings.TOKEN_TYPE
- )
-
-@router.post(
- "/refresh",
- response_model=Token,
- summary="Refresh Access Token",
- description="Refreshes an access token using a refresh token.",
- tags=["Authentication"]
-)
-async def refresh_token(
- refresh_token_str: str,
- db: AsyncSession = Depends(get_db)
-):
- """
- Handles access token refresh.
- - Verifies the provided refresh token.
- - If valid, generates and returns a new JWT access token and the same refresh token.
- """
- logger.info("Access token refresh attempt")
- payload = verify_refresh_token(refresh_token_str)
- if not payload:
- logger.warning("Refresh token invalid or expired")
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid or expired refresh token",
- headers={"WWW-Authenticate": "Bearer"},
- )
-
- user_email = payload.get("sub")
- if not user_email:
- logger.error("User email not found in refresh token payload")
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid refresh token payload",
- headers={"WWW-Authenticate": "Bearer"},
- )
-
- new_access_token = create_access_token(subject=user_email)
- logger.info(f"Access token refreshed for user: {user_email}")
- return Token(
- access_token=new_access_token,
- refresh_token=refresh_token_str,
- token_type=settings.TOKEN_TYPE
- )
-
-
-
-# app/api/v1/endpoints/lists.py
-import logging
-from typing import List as PyList, Optional # Alias for Python List type hint
-
-from fastapi import APIRouter, Depends, HTTPException, status, Response, Query # Added Query
-from sqlalchemy.ext.asyncio import AsyncSession
-
-from app.database import get_db
-from app.api.dependencies import get_current_user
-from app.models import User as UserModel
-from app.schemas.list import ListCreate, ListUpdate, ListPublic, ListDetail
-from app.schemas.message import Message # For simple responses
-from app.crud import list as crud_list
-from app.crud import group as crud_group # Need for group membership check
-from app.schemas.list import ListStatus
-from app.core.exceptions import (
- GroupMembershipError,
- ListNotFoundError,
- ListPermissionError,
- ListStatusNotFoundError,
- ConflictError # Added ConflictError
-)
-
-logger = logging.getLogger(__name__)
-router = APIRouter()
-
-@router.post(
- "", # Route relative to prefix "/lists"
- response_model=ListPublic, # Return basic list info on creation
- status_code=status.HTTP_201_CREATED,
- summary="Create New List",
- tags=["Lists"]
-)
-async def create_list(
- list_in: ListCreate,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
-):
- """
- Creates a new shopping list.
- - If `group_id` is provided, the user must be a member of that group.
- - If `group_id` is null, it's a personal list.
- """
- logger.info(f"User {current_user.email} creating list: {list_in.name}")
- group_id = list_in.group_id
-
- # Permission Check: If sharing with a group, verify membership
- if group_id:
- is_member = await crud_group.is_user_member(db, group_id=group_id, user_id=current_user.id)
- if not is_member:
- logger.warning(f"User {current_user.email} attempted to create list in group {group_id} but is not a member.")
- raise GroupMembershipError(group_id, "create lists")
-
- created_list = await crud_list.create_list(db=db, list_in=list_in, creator_id=current_user.id)
- logger.info(f"List '{created_list.name}' (ID: {created_list.id}) created successfully for user {current_user.email}.")
- return created_list
-
-
-@router.get(
- "", # Route relative to prefix "/lists"
- response_model=PyList[ListPublic], # Return a list of basic list info
- summary="List Accessible Lists",
- tags=["Lists"]
-)
-async def read_lists(
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
- # Add pagination parameters later if needed: skip: int = 0, limit: int = 100
-):
- """
- Retrieves lists accessible to the current user:
- - Personal lists created by the user.
- - Lists belonging to groups the user is a member of.
- """
- logger.info(f"Fetching lists accessible to user: {current_user.email}")
- lists = await crud_list.get_lists_for_user(db=db, user_id=current_user.id)
- return lists
-
-
-@router.get(
- "/{list_id}",
- response_model=ListDetail, # Return detailed list info including items
- summary="Get List Details",
- tags=["Lists"]
-)
-async def read_list(
- list_id: int,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
-):
- """
- Retrieves details for a specific list, including its items,
- if the user has permission (creator or group member).
- """
- logger.info(f"User {current_user.email} requesting details for list ID: {list_id}")
- # The check_list_permission function will raise appropriate exceptions
- list_db = await crud_list.check_list_permission(db=db, list_id=list_id, user_id=current_user.id)
- return list_db
-
-
-@router.put(
- "/{list_id}",
- response_model=ListPublic, # Return updated basic info
- summary="Update List",
- tags=["Lists"],
- responses={ # Add 409 to responses
- status.HTTP_409_CONFLICT: {"description": "Conflict: List has been modified by someone else"}
- }
-)
-async def update_list(
- list_id: int,
- list_in: ListUpdate,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
-):
- """
- Updates a list's details (name, description, is_complete).
- Requires user to be the creator or a member of the list's group.
- The client MUST provide the current `version` of the list in the `list_in` payload.
- If the version does not match, a 409 Conflict is returned.
- """
- logger.info(f"User {current_user.email} attempting to update list ID: {list_id} with version {list_in.version}")
- list_db = await crud_list.check_list_permission(db=db, list_id=list_id, user_id=current_user.id)
-
- try:
- updated_list = await crud_list.update_list(db=db, list_db=list_db, list_in=list_in)
- logger.info(f"List {list_id} updated successfully by user {current_user.email} to version {updated_list.version}.")
- return updated_list
- except ConflictError as e: # Catch and re-raise as HTTPException for proper FastAPI response
- logger.warning(f"Conflict updating list {list_id} for user {current_user.email}: {str(e)}")
- raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
- except Exception as e: # Catch other potential errors from crud operation
- logger.error(f"Error updating list {list_id} for user {current_user.email}: {str(e)}")
- # Consider a more generic error, but for now, let's keep it specific if possible
- # Re-raising might be better if crud layer already raises appropriate HTTPExceptions
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred while updating the list.")
-
-
-@router.delete(
- "/{list_id}",
- status_code=status.HTTP_204_NO_CONTENT, # Standard for successful DELETE with no body
- summary="Delete List",
- tags=["Lists"],
- responses={ # Add 409 to responses
- status.HTTP_409_CONFLICT: {"description": "Conflict: List has been modified, cannot delete specified version"}
- }
-)
-async def delete_list(
- list_id: int,
- expected_version: Optional[int] = Query(None, description="The expected version of the list to delete for optimistic locking."),
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
-):
- """
- Deletes a list. Requires user to be the creator of the list.
- If `expected_version` is provided and does not match the list's current version,
- a 409 Conflict is returned.
- """
- logger.info(f"User {current_user.email} attempting to delete list ID: {list_id}, expected version: {expected_version}")
- # Use the helper, requiring creator permission
- list_db = await crud_list.check_list_permission(db=db, list_id=list_id, user_id=current_user.id, require_creator=True)
-
- if expected_version is not None and list_db.version != expected_version:
- logger.warning(
- f"Conflict deleting list {list_id} for user {current_user.email}. "
- f"Expected version {expected_version}, actual version {list_db.version}."
- )
- raise HTTPException(
- status_code=status.HTTP_409_CONFLICT,
- detail=f"List has been modified. Expected version {expected_version}, but current version is {list_db.version}. Please refresh."
- )
-
- await crud_list.delete_list(db=db, list_db=list_db)
- logger.info(f"List {list_id} (version: {list_db.version}) deleted successfully by user {current_user.email}.")
- return Response(status_code=status.HTTP_204_NO_CONTENT)
-
-
-@router.get(
- "/{list_id}/status",
- response_model=ListStatus,
- summary="Get List Status",
- tags=["Lists"]
-)
-async def read_list_status(
- list_id: int,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
-):
- """
- Retrieves the last update time for the list and its items, plus item count.
- Used for polling to check if a full refresh is needed.
- Requires user to have permission to view the list.
- """
- # Verify user has access to the list first
- list_db = await crud_list.check_list_permission(db=db, list_id=list_id, user_id=current_user.id)
- if not list_db:
- # Check if list exists at all for correct error code
- exists = await crud_list.get_list_by_id(db, list_id)
- if not exists:
- raise ListNotFoundError(list_id)
- raise ListPermissionError(list_id, "access this list's status")
-
- # Fetch the status details
- list_status = await crud_list.get_list_status(db=db, list_id=list_id)
- if not list_status:
- # Should not happen if check_list_permission passed, but handle defensively
- logger.error(f"Could not retrieve status for list {list_id} even though permission check passed.")
- raise ListStatusNotFoundError(list_id)
-
- return list_status
-
-
-
-import logging
-from typing import List
-
-from fastapi import APIRouter, Depends, UploadFile, File, HTTPException, status
-from google.api_core import exceptions as google_exceptions
-
-from app.api.dependencies import get_current_user
-from app.models import User as UserModel
-from app.schemas.ocr import OcrExtractResponse
-from app.core.gemini import extract_items_from_image_gemini, gemini_initialization_error, GeminiOCRService
-from app.core.exceptions import (
- OCRServiceUnavailableError,
- OCRServiceConfigError,
- OCRUnexpectedError,
- OCRQuotaExceededError,
- InvalidFileTypeError,
- FileTooLargeError,
- OCRProcessingError
-)
-from app.config import settings
-
-logger = logging.getLogger(__name__)
-router = APIRouter()
-ocr_service = GeminiOCRService()
-
-@router.post(
- "/extract-items",
- response_model=OcrExtractResponse,
- summary="Extract List Items via OCR (Gemini)",
- tags=["OCR"]
-)
-async def ocr_extract_items(
- current_user: UserModel = Depends(get_current_user),
- image_file: UploadFile = File(..., description="Image file (JPEG, PNG, WEBP) of the shopping list or receipt."),
-):
- """
- Accepts an image upload, sends it to Gemini Flash with a prompt
- to extract shopping list items, and returns the parsed items.
- """
- # Check if Gemini client initialized correctly
- if gemini_initialization_error:
- logger.error("OCR endpoint called but Gemini client failed to initialize.")
- raise OCRServiceUnavailableError(gemini_initialization_error)
-
- logger.info(f"User {current_user.email} uploading image '{image_file.filename}' for OCR extraction.")
-
- # --- File Validation ---
- if image_file.content_type not in settings.ALLOWED_IMAGE_TYPES:
- logger.warning(f"Invalid file type uploaded by {current_user.email}: {image_file.content_type}")
- raise InvalidFileTypeError()
-
- # Simple size check
- contents = await image_file.read()
- if len(contents) > settings.MAX_FILE_SIZE_MB * 1024 * 1024:
- logger.warning(f"File too large uploaded by {current_user.email}: {len(contents)} bytes")
- raise FileTooLargeError()
-
- try:
- # Call the Gemini helper function
- extracted_items = await extract_items_from_image_gemini(
- image_bytes=contents,
- mime_type=image_file.content_type
- )
-
- logger.info(f"Successfully extracted {len(extracted_items)} items for user {current_user.email}.")
- return OcrExtractResponse(extracted_items=extracted_items)
-
- except OCRServiceUnavailableError:
- raise OCRServiceUnavailableError()
- except OCRServiceConfigError:
- raise OCRServiceConfigError()
- except OCRQuotaExceededError:
- raise OCRQuotaExceededError()
- except Exception as e:
- raise OCRProcessingError(str(e))
-
- finally:
- # Ensure file handle is closed
- await image_file.close()
-
-
-
-# app/core/gemini.py
-import logging
-from typing import List
-import google.generativeai as genai
-from google.generativeai.types import HarmCategory, HarmBlockThreshold # For safety settings
-from google.api_core import exceptions as google_exceptions
-from app.config import settings
-from app.core.exceptions import (
- OCRServiceUnavailableError,
- OCRServiceConfigError,
- OCRUnexpectedError,
- OCRQuotaExceededError
-)
-
-logger = logging.getLogger(__name__)
-
-# --- Global variable to hold the initialized model client ---
-gemini_flash_client = None
-gemini_initialization_error = None # Store potential init error
-
-# --- Configure and Initialize ---
-try:
- if settings.GEMINI_API_KEY:
- genai.configure(api_key=settings.GEMINI_API_KEY)
- # Initialize the specific model we want to use
- gemini_flash_client = genai.GenerativeModel(
- model_name=settings.GEMINI_MODEL_NAME,
- # Safety settings from config
- safety_settings={
- getattr(HarmCategory, category): getattr(HarmBlockThreshold, threshold)
- for category, threshold in settings.GEMINI_SAFETY_SETTINGS.items()
- },
- # Generation config from settings
- generation_config=genai.types.GenerationConfig(
- **settings.GEMINI_GENERATION_CONFIG
- )
- )
- logger.info(f"Gemini AI client initialized successfully for model '{settings.GEMINI_MODEL_NAME}'.")
- else:
- # Store error if API key is missing
- gemini_initialization_error = "GEMINI_API_KEY not configured. Gemini client not initialized."
- logger.error(gemini_initialization_error)
-
-except Exception as e:
- # Catch any other unexpected errors during initialization
- gemini_initialization_error = f"Failed to initialize Gemini AI client: {e}"
- logger.exception(gemini_initialization_error) # Log full traceback
- gemini_flash_client = None # Ensure client is None on error
-
-
-# --- Function to get the client (optional, allows checking error) ---
-def get_gemini_client():
- """
- Returns the initialized Gemini client instance.
- Raises an exception if initialization failed.
- """
- if gemini_initialization_error:
- raise RuntimeError(f"Gemini client could not be initialized: {gemini_initialization_error}")
- if gemini_flash_client is None:
- # This case should ideally be covered by the check above, but as a safeguard:
- raise RuntimeError("Gemini client is not available (unknown initialization issue).")
- return gemini_flash_client
-
-# Define the prompt as a constant
-OCR_ITEM_EXTRACTION_PROMPT = """
-Extract the shopping list items from this image.
-List each distinct item on a new line.
-Ignore prices, quantities, store names, discounts, taxes, totals, and other non-item text.
-Focus only on the names of the products or items to be purchased.
-If the image does not appear to be a shopping list or receipt, state that clearly.
-Example output for a grocery list:
-Milk
-Eggs
-Bread
-Apples
-Organic Bananas
-"""
-
-async def extract_items_from_image_gemini(image_bytes: bytes, mime_type: str = "image/jpeg") -> List[str]:
- """
- Uses Gemini Flash to extract shopping list items from image bytes.
-
- Args:
- image_bytes: The image content as bytes.
- mime_type: The MIME type of the image (e.g., "image/jpeg", "image/png", "image/webp").
-
- Returns:
- A list of extracted item strings.
-
- Raises:
- RuntimeError: If the Gemini client is not initialized.
- google_exceptions.GoogleAPIError: For API call errors (quota, invalid key etc.).
- ValueError: If the response is blocked or contains no usable text.
- """
- client = get_gemini_client() # Raises RuntimeError if not initialized
-
- # Prepare image part for multimodal input
- image_part = {
- "mime_type": mime_type,
- "data": image_bytes
- }
-
- # Prepare the full prompt content
- prompt_parts = [
- settings.OCR_ITEM_EXTRACTION_PROMPT, # Text prompt first
- image_part # Then the image
- ]
-
- logger.info("Sending image to Gemini for item extraction...")
- try:
- # Make the API call
- # Use generate_content_async for async FastAPI
- response = await client.generate_content_async(prompt_parts)
-
- # --- Process the response ---
- # Check for safety blocks or lack of content
- if not response.candidates or not response.candidates[0].content.parts:
- logger.warning("Gemini response blocked or empty.", extra={"response": response})
- # Check finish_reason if available
- finish_reason = response.candidates[0].finish_reason if response.candidates else 'UNKNOWN'
- safety_ratings = response.candidates[0].safety_ratings if response.candidates else 'N/A'
- if finish_reason == 'SAFETY':
- raise ValueError(f"Gemini response blocked due to safety settings. Ratings: {safety_ratings}")
- else:
- raise ValueError(f"Gemini response was empty or incomplete. Finish Reason: {finish_reason}")
-
- # Extract text - assumes the first part of the first candidate is the text response
- raw_text = response.text # response.text is a shortcut for response.candidates[0].content.parts[0].text
- logger.info("Received raw text from Gemini.")
- # logger.debug(f"Gemini Raw Text:\n{raw_text}") # Optional: Log full response text
-
- # Parse the text response
- items = []
- for line in raw_text.splitlines(): # Split by newline
- cleaned_line = line.strip() # Remove leading/trailing whitespace
- # Basic filtering: ignore empty lines and potential non-item lines
- if cleaned_line and len(cleaned_line) > 1: # Ignore very short lines too?
- # Add more sophisticated filtering if needed (e.g., regex, keyword check)
- items.append(cleaned_line)
-
- logger.info(f"Extracted {len(items)} potential items.")
- return items
-
- except google_exceptions.GoogleAPIError as e:
- logger.error(f"Gemini API Error: {e}", exc_info=True)
- # Re-raise specific Google API errors for endpoint to handle (e.g., quota)
- raise e
- except Exception as e:
- # Catch other unexpected errors during generation or processing
- logger.error(f"Unexpected error during Gemini item extraction: {e}", exc_info=True)
- # Wrap in a generic ValueError or re-raise
- raise ValueError(f"Failed to process image with Gemini: {e}") from e
-
-class GeminiOCRService:
- def __init__(self):
- try:
- genai.configure(api_key=settings.GEMINI_API_KEY)
- self.model = genai.GenerativeModel(settings.GEMINI_MODEL_NAME)
- self.model.safety_settings = settings.GEMINI_SAFETY_SETTINGS
- self.model.generation_config = settings.GEMINI_GENERATION_CONFIG
- except Exception as e:
- logger.error(f"Failed to initialize Gemini client: {e}")
- raise OCRServiceConfigError()
-
- async def extract_items(self, image_data: bytes) -> List[str]:
- """
- Extract shopping list items from an image using Gemini Vision.
- """
- try:
- # Create image part
- image_parts = [{"mime_type": "image/jpeg", "data": image_data}]
-
- # Generate content
- response = await self.model.generate_content_async(
- contents=[settings.OCR_ITEM_EXTRACTION_PROMPT, *image_parts]
- )
-
- # Process response
- if not response.text:
- raise OCRUnexpectedError()
-
- # Split response into lines and clean up
- items = [
- item.strip()
- for item in response.text.split("\n")
- if item.strip() and not item.strip().startswith("Example")
- ]
-
- return items
-
- except Exception as e:
- logger.error(f"Error during OCR extraction: {e}")
- if "quota" in str(e).lower():
- raise OCRQuotaExceededError()
- raise OCRServiceUnavailableError()
-
-
-
-# be/Dockerfile
-
-# Choose a suitable Python base image
-FROM python:3.11-slim
-
-# Set environment variables
-ENV PYTHONDONTWRITEBYTECODE 1 # Prevent python from writing pyc files
-ENV PYTHONUNBUFFERED 1 # Keep stdout/stderr unbuffered
-
-# Set the working directory in the container
-WORKDIR /app
-
-# Install system dependencies if needed (e.g., for psycopg2 build)
-# RUN apt-get update && apt-get install -y --no-install-recommends gcc build-essential libpq-dev && rm -rf /var/lib/apt/lists/*
-
-# Install Python dependencies
-# Upgrade pip first
-RUN pip install --no-cache-dir --upgrade pip
-# Copy only requirements first to leverage Docker cache
-COPY requirements.txt requirements.txt
-# Install dependencies
-RUN pip install --no-cache-dir -r requirements.txt
-
-# Copy the rest of the application code into the working directory
-COPY . .
-# This includes your 'app/' directory, alembic.ini, etc.
-
-# Expose the port the app runs on
-EXPOSE 8000
-
-# Command to run the application using uvicorn
-# The default command for production (can be overridden in docker-compose for development)
-# Note: Make sure 'app.main:app' correctly points to your FastAPI app instance
-# relative to the WORKDIR (/app). If your main.py is directly in /app, this is correct.
-CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
-
-
-
-fastapi>=0.95.0
-uvicorn[standard]>=0.20.0
-sqlalchemy[asyncio]>=2.0.0 # Core ORM + Async support
-asyncpg>=0.27.0 # Async PostgreSQL driver
-psycopg2-binary>=2.9.0 # Often needed by Alembic even if app uses asyncpg
-alembic>=1.9.0 # Database migrations
-pydantic-settings>=2.0.0 # For loading settings from .env
-python-dotenv>=1.0.0 # To load .env file for scripts/alembic
-passlib[bcrypt]>=1.7.4
-python-jose[cryptography]>=3.3.0
-pydantic[email]
-google-generativeai>=0.5.0
-
-
-
-{
- "name": "mitlist",
- "version": "0.0.1",
- "description": "mitlist pwa",
- "productName": "mitlist",
- "author": "Mohamad ",
- "type": "module",
- "private": true,
- "scripts": {
- "lint": "eslint -c ./eslint.config.js \"./src*/**/*.{ts,js,cjs,mjs,vue}\"",
- "format": "prettier --write \"**/*.{js,ts,vue,scss,html,md,json}\" --ignore-path .gitignore",
- "test": "echo \"No test specified\" && exit 0",
- "dev": "quasar dev",
- "build": "quasar build",
- "postinstall": "quasar prepare"
- },
- "dependencies": {
- "@quasar/extras": "^1.16.4",
- "axios": "^1.2.1",
- "pinia": "^3.0.1",
- "quasar": "^2.16.0",
- "register-service-worker": "^1.7.2",
- "vue": "^3.4.18",
- "vue-i18n": "^11.0.0",
- "vue-router": "^4.0.12"
- },
- "devDependencies": {
- "@eslint/js": "^9.14.0",
- "@intlify/unplugin-vue-i18n": "^4.0.0",
- "@quasar/app-vite": "^2.1.0",
- "@types/node": "^20.5.9",
- "@vue/eslint-config-prettier": "^10.1.0",
- "@vue/eslint-config-typescript": "^14.4.0",
- "autoprefixer": "^10.4.2",
- "eslint": "^9.14.0",
- "eslint-plugin-vue": "^9.30.0",
- "globals": "^15.12.0",
- "prettier": "^3.3.3",
- "typescript": "~5.5.3",
- "vite-plugin-checker": "^0.9.0",
- "vue-tsc": "^2.0.29",
- "workbox-build": "^7.3.0",
- "workbox-cacheable-response": "^7.3.0",
- "workbox-core": "^7.3.0",
- "workbox-expiration": "^7.3.0",
- "workbox-precaching": "^7.3.0",
- "workbox-routing": "^7.3.0",
- "workbox-strategies": "^7.3.0"
- },
- "engines": {
- "node": "^28 || ^26 || ^24 || ^22 || ^20 || ^18",
- "npm": ">= 6.13.4",
- "yarn": ">= 1.21.1"
- }
-}
-
-
-
-import { boot } from 'quasar/wrappers';
-import axios from 'axios';
-import { API_BASE_URL } from 'src/config/api-config';
-
-// Create axios instance
-const api = axios.create({
- baseURL: API_BASE_URL,
- headers: {
- 'Content-Type': 'application/json',
- },
-});
-
-// Request interceptor
-api.interceptors.request.use(
- (config) => {
- const token = localStorage.getItem('token');
- if (token) {
- config.headers.Authorization = `Bearer ${token}`;
- }
- return config;
- },
- (error) => {
- return Promise.reject(new Error(String(error)));
- }
-);
-
-// Response interceptor
-api.interceptors.response.use(
- (response) => response,
- async (error) => {
- const originalRequest = error.config;
-
- // If error is 401 and we haven't tried to refresh token yet
- if (error.response?.status === 401 && !originalRequest._retry) {
- originalRequest._retry = true;
-
- try {
- const refreshToken = localStorage.getItem('refreshToken');
- if (!refreshToken) {
- throw new Error('No refresh token available');
- }
-
- // Call refresh token endpoint
- const response = await api.post('/api/v1/auth/refresh-token', {
- refresh_token: refreshToken,
- });
-
- const { access_token } = response.data;
- localStorage.setItem('token', access_token);
-
- // Retry the original request with new token
- originalRequest.headers.Authorization = `Bearer ${access_token}`;
- return api(originalRequest);
- } catch (refreshError) {
- // If refresh token fails, clear storage and redirect to login
- localStorage.removeItem('token');
- localStorage.removeItem('refreshToken');
- window.location.href = '/login';
- return Promise.reject(new Error(String(refreshError)));
- }
- }
-
- return Promise.reject(new Error(String(error)));
- }
-);
-
-export default boot(({ app }) => {
- app.config.globalProperties.$axios = axios;
- app.config.globalProperties.$api = api;
-});
-
-export { api };
-
-
-
-
-
-
Account Settings
-
-
-
-
Loading profile...
-
-
-
-
-
-
- {{ error }}
-
-
-
-
-
-
-
-
-
-
-
Profile Information
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Change Password
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Notification Preferences
-
-
-
-
-
-
- Email Notifications
- Receive email notifications for important updates
-
-
-
-
-
-
-
-
- List Updates
- Get notified when lists are updated
-
-
-
-
-
-
-
-
- Group Activities
- Receive notifications for group activities
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-# app/api/v1/endpoints/costs.py
-import logging
-from fastapi import APIRouter, Depends, HTTPException, status
-from sqlalchemy import select
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.orm import Session, selectinload
-from decimal import Decimal, ROUND_HALF_UP
-
-from app.database import get_db
-from app.api.dependencies import get_current_user
-from app.models import (
- User as UserModel,
- Group as GroupModel,
- List as ListModel,
- Expense as ExpenseModel,
- Item as ItemModel,
- UserGroup as UserGroupModel,
- SplitTypeEnum,
- ExpenseSplit as ExpenseSplitModel,
- Settlement as SettlementModel
-)
-from app.schemas.cost import ListCostSummary, GroupBalanceSummary
-from app.schemas.expense import ExpenseCreate
-from app.crud import list as crud_list
-from app.crud import expense as crud_expense
-from app.core.exceptions import ListNotFoundError, ListPermissionError, UserNotFoundError, GroupNotFoundError
-
-logger = logging.getLogger(__name__)
-router = APIRouter()
-
-@router.get(
- "/lists/{list_id}/cost-summary",
- response_model=ListCostSummary,
- summary="Get Cost Summary for a List",
- tags=["Costs"],
- responses={
- status.HTTP_403_FORBIDDEN: {"description": "User does not have permission to access this list"},
- status.HTTP_404_NOT_FOUND: {"description": "List or associated user not found"}
- }
-)
-async def get_list_cost_summary(
- list_id: int,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
-):
- """
- Retrieves a calculated cost summary for a specific list, detailing total costs,
- equal shares per user, and individual user balances based on their contributions.
-
- The user must have access to the list to view its cost summary.
- Costs are split among group members if the list belongs to a group, or just for
- the creator if it's a personal list. All users who added items with prices are
- included in the calculation.
- """
- logger.info(f"User {current_user.email} requesting cost summary for list {list_id}")
-
- # 1. Verify user has access to the target list
- try:
- await crud_list.check_list_permission(db=db, list_id=list_id, user_id=current_user.id)
- except ListPermissionError as e:
- logger.warning(f"Permission denied for user {current_user.email} on list {list_id}: {str(e)}")
- raise
- except ListNotFoundError as e:
- logger.warning(f"List {list_id} not found when checking permissions for cost summary: {str(e)}")
- raise
-
- # 2. Get the list with its items and users
- list_result = await db.execute(
- select(ListModel)
- .options(
- selectinload(ListModel.items).options(selectinload(ItemModel.added_by_user)),
- selectinload(ListModel.group).options(selectinload(GroupModel.member_associations).options(selectinload(UserGroupModel.user))),
- selectinload(ListModel.creator)
- )
- .where(ListModel.id == list_id)
- )
- db_list = list_result.scalars().first()
- if not db_list:
- raise ListNotFoundError(list_id)
-
- # 3. Get or create an expense for this list
- expense_result = await db.execute(
- select(ExpenseModel)
- .where(ExpenseModel.list_id == list_id)
- .options(selectinload(ExpenseModel.splits))
- )
- db_expense = expense_result.scalars().first()
-
- if not db_expense:
- # Create a new expense for this list
- total_amount = sum(item.price for item in db_list.items if item.price is not None and item.price > Decimal("0"))
- if total_amount == Decimal("0"):
- return ListCostSummary(
- list_id=db_list.id,
- list_name=db_list.name,
- total_list_cost=Decimal("0.00"),
- num_participating_users=0,
- equal_share_per_user=Decimal("0.00"),
- user_balances=[]
- )
-
- # Create expense with ITEM_BASED split type
- expense_in = ExpenseCreate(
- description=f"Cost summary for list {db_list.name}",
- total_amount=total_amount,
- list_id=list_id,
- split_type=SplitTypeEnum.ITEM_BASED,
- paid_by_user_id=current_user.id # Use current user as payer for now
- )
- db_expense = await crud_expense.create_expense(db=db, expense_in=expense_in)
-
- # 4. Calculate cost summary from expense splits
- participating_users = set()
- user_items_added_value = {}
- total_list_cost = Decimal("0.00")
-
- # Get all users who added items
- for item in db_list.items:
- if item.price is not None and item.price > Decimal("0") and item.added_by_user:
- participating_users.add(item.added_by_user)
- user_items_added_value[item.added_by_user.id] = user_items_added_value.get(item.added_by_user.id, Decimal("0.00")) + item.price
- total_list_cost += item.price
-
- # Get all users from expense splits
- for split in db_expense.splits:
- if split.user:
- participating_users.add(split.user)
-
- num_participating_users = len(participating_users)
- if num_participating_users == 0:
- return ListCostSummary(
- list_id=db_list.id,
- list_name=db_list.name,
- total_list_cost=Decimal("0.00"),
- num_participating_users=0,
- equal_share_per_user=Decimal("0.00"),
- user_balances=[]
- )
-
- equal_share_per_user = (total_list_cost / Decimal(num_participating_users)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
- remainder = total_list_cost - (equal_share_per_user * num_participating_users)
-
- user_balances = []
- first_user_processed = False
- for user in participating_users:
- items_added = user_items_added_value.get(user.id, Decimal("0.00"))
- current_user_share = equal_share_per_user
- if not first_user_processed and remainder != Decimal("0"):
- current_user_share += remainder
- first_user_processed = True
-
- balance = items_added - current_user_share
- user_identifier = user.name if user.name else user.email
- user_balances.append(
- UserCostShare(
- user_id=user.id,
- user_identifier=user_identifier,
- items_added_value=items_added.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
- amount_due=current_user_share.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
- balance=balance.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
- )
- )
-
- user_balances.sort(key=lambda x: x.user_identifier)
- return ListCostSummary(
- list_id=db_list.id,
- list_name=db_list.name,
- total_list_cost=total_list_cost.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
- num_participating_users=num_participating_users,
- equal_share_per_user=equal_share_per_user.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
- user_balances=user_balances
- )
-
-@router.get(
- "/groups/{group_id}/balance-summary",
- response_model=GroupBalanceSummary,
- summary="Get Detailed Balance Summary for a Group",
- tags=["Costs", "Groups"],
- responses={
- status.HTTP_403_FORBIDDEN: {"description": "User does not have permission to access this group"},
- status.HTTP_404_NOT_FOUND: {"description": "Group not found"}
- }
-)
-async def get_group_balance_summary(
- group_id: int,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
-):
- """
- Retrieves a detailed financial balance summary for all users within a specific group.
- It considers all expenses, their splits, and all settlements recorded for the group.
- The user must be a member of the group to view its balance summary.
- """
- logger.info(f"User {current_user.email} requesting balance summary for group {group_id}")
-
- # 1. Verify user is a member of the target group
- group_check = await db.execute(
- select(GroupModel)
- .options(selectinload(GroupModel.member_associations))
- .where(GroupModel.id == group_id)
- )
- db_group_for_check = group_check.scalars().first()
-
- if not db_group_for_check:
- raise GroupNotFoundError(group_id)
-
- user_is_member = any(assoc.user_id == current_user.id for assoc in db_group_for_check.member_associations)
- if not user_is_member:
- raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"User not a member of group {group_id}")
-
- # 2. Get all expenses and settlements for the group
- expenses_result = await db.execute(
- select(ExpenseModel)
- .where(ExpenseModel.group_id == group_id)
- .options(selectinload(ExpenseModel.splits).selectinload(ExpenseSplitModel.user))
- )
- expenses = expenses_result.scalars().all()
-
- settlements_result = await db.execute(
- select(SettlementModel)
- .where(SettlementModel.group_id == group_id)
- .options(
- selectinload(SettlementModel.paid_by_user),
- selectinload(SettlementModel.paid_to_user)
- )
- )
- settlements = settlements_result.scalars().all()
-
- # 3. Calculate user balances
- user_balances_data = {}
- for assoc in db_group_for_check.member_associations:
- if assoc.user:
- user_balances_data[assoc.user.id] = UserBalanceDetail(
- user_id=assoc.user.id,
- user_identifier=assoc.user.name if assoc.user.name else assoc.user.email
- )
-
- # Process expenses
- for expense in expenses:
- if expense.paid_by_user_id in user_balances_data:
- user_balances_data[expense.paid_by_user_id].total_paid_for_expenses += expense.total_amount
-
- for split in expense.splits:
- if split.user_id in user_balances_data:
- user_balances_data[split.user_id].total_share_of_expenses += split.owed_amount
-
- # Process settlements
- for settlement in settlements:
- if settlement.paid_by_user_id in user_balances_data:
- user_balances_data[settlement.paid_by_user_id].total_settlements_paid += settlement.amount
- if settlement.paid_to_user_id in user_balances_data:
- user_balances_data[settlement.paid_to_user_id].total_settlements_received += settlement.amount
-
- # Calculate net balances
- final_user_balances = []
- for user_id, data in user_balances_data.items():
- data.net_balance = (
- data.total_paid_for_expenses + data.total_settlements_received
- ) - (data.total_share_of_expenses + data.total_settlements_paid)
-
- data.total_paid_for_expenses = data.total_paid_for_expenses.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
- data.total_share_of_expenses = data.total_share_of_expenses.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
- data.total_settlements_paid = data.total_settlements_paid.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
- data.total_settlements_received = data.total_settlements_received.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
- data.net_balance = data.net_balance.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
-
- final_user_balances.append(data)
-
- # Sort by user identifier
- final_user_balances.sort(key=lambda x: x.user_identifier)
-
- # Calculate suggested settlements
- suggested_settlements = calculate_suggested_settlements(final_user_balances)
-
- return GroupBalanceSummary(
- group_id=db_group_for_check.id,
- group_name=db_group_for_check.name,
- user_balances=final_user_balances,
- suggested_settlements=suggested_settlements
- )
-
-
-
-# app/api/v1/endpoints/items.py
-import logging
-from typing import List as PyList, Optional
-
-from fastapi import APIRouter, Depends, HTTPException, status, Response, Query
-from sqlalchemy.ext.asyncio import AsyncSession
-
-from app.database import get_db
-from app.api.dependencies import get_current_user
-# --- Import Models Correctly ---
-from app.models import User as UserModel
-from app.models import Item as ItemModel # <-- IMPORT Item and alias it
-# --- End Import Models ---
-from app.schemas.item import ItemCreate, ItemUpdate, ItemPublic
-from app.crud import item as crud_item
-from app.crud import list as crud_list
-from app.core.exceptions import ItemNotFoundError, ListPermissionError, ConflictError
-
-logger = logging.getLogger(__name__)
-router = APIRouter()
-
-# --- Helper Dependency for Item Permissions ---
-# Now ItemModel is defined before being used as a type hint
-async def get_item_and_verify_access(
- item_id: int,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user)
-) -> ItemModel:
- """Dependency to get an item and verify the user has access to its list."""
- item_db = await crud_item.get_item_by_id(db, item_id=item_id)
- if not item_db:
- raise ItemNotFoundError(item_id)
-
- # Check permission on the parent list
- try:
- await crud_list.check_list_permission(db=db, list_id=item_db.list_id, user_id=current_user.id)
- except ListPermissionError as e:
- # Re-raise with a more specific message
- raise ListPermissionError(item_db.list_id, "access this item's list")
- return item_db
-
-
-# --- Endpoints ---
-
-@router.post(
- "/lists/{list_id}/items", # Nested under lists
- response_model=ItemPublic,
- status_code=status.HTTP_201_CREATED,
- summary="Add Item to List",
- tags=["Items"]
-)
-async def create_list_item(
- list_id: int,
- item_in: ItemCreate,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
-):
- """Adds a new item to a specific list. User must have access to the list."""
- user_email = current_user.email # Access email attribute before async operations
- logger.info(f"User {user_email} adding item to list {list_id}: {item_in.name}")
- # Verify user has access to the target list
- try:
- await crud_list.check_list_permission(db=db, list_id=list_id, user_id=current_user.id)
- except ListPermissionError as e:
- # Re-raise with a more specific message
- raise ListPermissionError(list_id, "add items to this list")
-
- created_item = await crud_item.create_item(
- db=db, item_in=item_in, list_id=list_id, user_id=current_user.id
- )
- logger.info(f"Item '{created_item.name}' (ID: {created_item.id}) added to list {list_id} by user {user_email}.")
- return created_item
-
-
-@router.get(
- "/lists/{list_id}/items", # Nested under lists
- response_model=PyList[ItemPublic],
- summary="List Items in List",
- tags=["Items"]
-)
-async def read_list_items(
- list_id: int,
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user),
- # Add sorting/filtering params later if needed: sort_by: str = 'created_at', order: str = 'asc'
-):
- """Retrieves all items for a specific list if the user has access."""
- user_email = current_user.email # Access email attribute before async operations
- logger.info(f"User {user_email} listing items for list {list_id}")
- # Verify user has access to the list
- try:
- await crud_list.check_list_permission(db=db, list_id=list_id, user_id=current_user.id)
- except ListPermissionError as e:
- # Re-raise with a more specific message
- raise ListPermissionError(list_id, "view items in this list")
-
- items = await crud_item.get_items_by_list_id(db=db, list_id=list_id)
- return items
-
-
-@router.put(
- "/items/{item_id}", # Operate directly on item ID
- response_model=ItemPublic,
- summary="Update Item",
- tags=["Items"],
- responses={
- status.HTTP_409_CONFLICT: {"description": "Conflict: Item has been modified by someone else"}
- }
-)
-async def update_item(
- item_id: int, # Item ID from path
- item_in: ItemUpdate,
- item_db: ItemModel = Depends(get_item_and_verify_access), # Use dependency to get item and check list access
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user), # Need user ID for completed_by
-):
- """
- Updates an item's details (name, quantity, is_complete, price).
- User must have access to the list the item belongs to.
- The client MUST provide the current `version` of the item in the `item_in` payload.
- If the version does not match, a 409 Conflict is returned.
- Sets/unsets `completed_by_id` based on `is_complete` flag.
- """
- user_email = current_user.email # Access email attribute before async operations
- logger.info(f"User {user_email} attempting to update item ID: {item_id} with version {item_in.version}")
- # Permission check is handled by get_item_and_verify_access dependency
-
- try:
- updated_item = await crud_item.update_item(
- db=db, item_db=item_db, item_in=item_in, user_id=current_user.id
- )
- logger.info(f"Item {item_id} updated successfully by user {user_email} to version {updated_item.version}.")
- return updated_item
- except ConflictError as e:
- logger.warning(f"Conflict updating item {item_id} for user {user_email}: {str(e)}")
- raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
- except Exception as e:
- logger.error(f"Error updating item {item_id} for user {user_email}: {str(e)}")
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred while updating the item.")
-
-
-@router.delete(
- "/items/{item_id}", # Operate directly on item ID
- status_code=status.HTTP_204_NO_CONTENT,
- summary="Delete Item",
- tags=["Items"],
- responses={
- status.HTTP_409_CONFLICT: {"description": "Conflict: Item has been modified, cannot delete specified version"}
- }
-)
-async def delete_item(
- item_id: int, # Item ID from path
- expected_version: Optional[int] = Query(None, description="The expected version of the item to delete for optimistic locking."),
- item_db: ItemModel = Depends(get_item_and_verify_access), # Use dependency to get item and check list access
- db: AsyncSession = Depends(get_db),
- current_user: UserModel = Depends(get_current_user), # Log who deleted it
-):
- """
- Deletes an item. User must have access to the list the item belongs to.
- If `expected_version` is provided and does not match the item's current version,
- a 409 Conflict is returned.
- """
- user_email = current_user.email # Access email attribute before async operations
- logger.info(f"User {user_email} attempting to delete item ID: {item_id}, expected version: {expected_version}")
- # Permission check is handled by get_item_and_verify_access dependency
-
- if expected_version is not None and item_db.version != expected_version:
- logger.warning(
- f"Conflict deleting item {item_id} for user {user_email}. "
- f"Expected version {expected_version}, actual version {item_db.version}."
- )
- raise HTTPException(
- status_code=status.HTTP_409_CONFLICT,
- detail=f"Item has been modified. Expected version {expected_version}, but current version is {item_db.version}. Please refresh."
- )
-
- await crud_item.delete_item(db=db, item_db=item_db)
- logger.info(f"Item {item_id} (version {item_db.version}) deleted successfully by user {user_email}.")
- return Response(status_code=status.HTTP_204_NO_CONTENT)
-
-
-
-# app/crud/group.py
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.future import select
-from sqlalchemy.orm import selectinload # For eager loading members
-from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
-from typing import Optional, List
-from sqlalchemy import func
-
-from app.models import User as UserModel, Group as GroupModel, UserGroup as UserGroupModel
-from app.schemas.group import GroupCreate
-from app.models import UserRoleEnum # Import enum
-from app.core.exceptions import (
- GroupOperationError,
- GroupNotFoundError,
- DatabaseConnectionError,
- DatabaseIntegrityError,
- DatabaseQueryError,
- DatabaseTransactionError,
- GroupMembershipError,
- GroupPermissionError # Import GroupPermissionError
-)
-
-# --- Group CRUD ---
-async def create_group(db: AsyncSession, group_in: GroupCreate, creator_id: int) -> GroupModel:
- """Creates a group and adds the creator as the owner."""
- try:
- async with db.begin():
- db_group = GroupModel(name=group_in.name, created_by_id=creator_id)
- db.add(db_group)
- await db.flush()
-
- db_user_group = UserGroupModel(
- user_id=creator_id,
- group_id=db_group.id,
- role=UserRoleEnum.owner
- )
- db.add(db_user_group)
- await db.flush()
- await db.refresh(db_group)
- return db_group
- except IntegrityError as e:
- raise DatabaseIntegrityError(f"Failed to create group: {str(e)}")
- except OperationalError as e:
- raise DatabaseConnectionError(f"Database connection error: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseTransactionError(f"Failed to create group: {str(e)}")
-
-async def get_user_groups(db: AsyncSession, user_id: int) -> List[GroupModel]:
- """Gets all groups a user is a member of."""
- try:
- result = await db.execute(
- select(GroupModel)
- .join(UserGroupModel)
- .where(UserGroupModel.user_id == user_id)
- .options(selectinload(GroupModel.member_associations))
- )
- return result.scalars().all()
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to query user groups: {str(e)}")
-
-async def get_group_by_id(db: AsyncSession, group_id: int) -> Optional[GroupModel]:
- """Gets a single group by its ID, optionally loading members."""
- try:
- result = await db.execute(
- select(GroupModel)
- .where(GroupModel.id == group_id)
- .options(
- selectinload(GroupModel.member_associations).selectinload(UserGroupModel.user)
- )
- )
- return result.scalars().first()
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to query group: {str(e)}")
-
-async def is_user_member(db: AsyncSession, group_id: int, user_id: int) -> bool:
- """Checks if a user is a member of a specific group."""
- try:
- result = await db.execute(
- select(UserGroupModel.id)
- .where(UserGroupModel.group_id == group_id, UserGroupModel.user_id == user_id)
- .limit(1)
- )
- return result.scalar_one_or_none() is not None
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to check group membership: {str(e)}")
-
-async def get_user_role_in_group(db: AsyncSession, group_id: int, user_id: int) -> Optional[UserRoleEnum]:
- """Gets the role of a user in a specific group."""
- try:
- result = await db.execute(
- select(UserGroupModel.role)
- .where(UserGroupModel.group_id == group_id, UserGroupModel.user_id == user_id)
- )
- return result.scalar_one_or_none()
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to query user role: {str(e)}")
-
-async def add_user_to_group(db: AsyncSession, group_id: int, user_id: int, role: UserRoleEnum = UserRoleEnum.member) -> Optional[UserGroupModel]:
- """Adds a user to a group if they aren't already a member."""
- try:
- async with db.begin():
- existing = await db.execute(
- select(UserGroupModel).where(UserGroupModel.group_id == group_id, UserGroupModel.user_id == user_id)
- )
- if existing.scalar_one_or_none():
- return None
-
- db_user_group = UserGroupModel(user_id=user_id, group_id=group_id, role=role)
- db.add(db_user_group)
- await db.flush()
- await db.refresh(db_user_group)
- return db_user_group
- except IntegrityError as e:
- raise DatabaseIntegrityError(f"Failed to add user to group: {str(e)}")
- except OperationalError as e:
- raise DatabaseConnectionError(f"Database connection error: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseTransactionError(f"Failed to add user to group: {str(e)}")
-
-async def remove_user_from_group(db: AsyncSession, group_id: int, user_id: int) -> bool:
- """Removes a user from a group."""
- try:
- async with db.begin():
- result = await db.execute(
- delete(UserGroupModel)
- .where(UserGroupModel.group_id == group_id, UserGroupModel.user_id == user_id)
- .returning(UserGroupModel.id)
- )
- return result.scalar_one_or_none() is not None
- except OperationalError as e:
- raise DatabaseConnectionError(f"Database connection error: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseTransactionError(f"Failed to remove user from group: {str(e)}")
-
-async def get_group_member_count(db: AsyncSession, group_id: int) -> int:
- """Counts the number of members in a group."""
- try:
- result = await db.execute(
- select(func.count(UserGroupModel.id)).where(UserGroupModel.group_id == group_id)
- )
- return result.scalar_one()
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to count group members: {str(e)}")
-
-async def check_group_membership(
- db: AsyncSession,
- group_id: int,
- user_id: int,
- action: str = "access this group"
-) -> None:
- """
- Checks if a user is a member of a group. Raises exceptions if not found or not a member.
-
- Raises:
- GroupNotFoundError: If the group_id does not exist.
- GroupMembershipError: If the user_id is not a member of the group.
- """
- try:
- # Check group existence first
- group_exists = await db.get(GroupModel, group_id)
- if not group_exists:
- raise GroupNotFoundError(group_id)
-
- # Check membership
- membership = await db.execute(
- select(UserGroupModel.id)
- .where(UserGroupModel.group_id == group_id, UserGroupModel.user_id == user_id)
- .limit(1)
- )
- if membership.scalar_one_or_none() is None:
- raise GroupMembershipError(group_id, action=action)
- # If we reach here, the user is a member
- return None
- except GroupNotFoundError: # Re-raise specific errors
- raise
- except GroupMembershipError:
- raise
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database while checking membership: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to check group membership: {str(e)}")
-
-async def check_user_role_in_group(
- db: AsyncSession,
- group_id: int,
- user_id: int,
- required_role: UserRoleEnum,
- action: str = "perform this action"
-) -> None:
- """
- Checks if a user is a member of a group and has the required role (or higher).
-
- Raises:
- GroupNotFoundError: If the group_id does not exist.
- GroupMembershipError: If the user_id is not a member of the group.
- GroupPermissionError: If the user does not have the required role.
- """
- # First, ensure user is a member (this also checks group existence)
- await check_group_membership(db, group_id, user_id, action=f"be checked for permissions to {action}")
-
- # Get the user's actual role
- actual_role = await get_user_role_in_group(db, group_id, user_id)
-
- # Define role hierarchy (assuming owner > member)
- role_hierarchy = {UserRoleEnum.owner: 2, UserRoleEnum.member: 1}
-
- if not actual_role or role_hierarchy.get(actual_role, 0) < role_hierarchy.get(required_role, 0):
- raise GroupPermissionError(
- group_id=group_id,
- action=f"{action} (requires at least '{required_role.value}' role)"
- )
- # If role is sufficient, return None
- return None
-
-
-
-# app/main.py
-import logging
-import uvicorn
-from fastapi import FastAPI
-from fastapi.middleware.cors import CORSMiddleware
-
-from app.api.api_router import api_router
-from app.config import settings
-from app.core.api_config import API_METADATA, API_TAGS
-# Import database and models if needed for startup/shutdown events later
-# from . import database, models
-
-# --- Logging Setup ---
-logging.basicConfig(
- level=getattr(logging, settings.LOG_LEVEL),
- format=settings.LOG_FORMAT
-)
-logger = logging.getLogger(__name__)
-
-# --- FastAPI App Instance ---
-app = FastAPI(
- **API_METADATA,
- openapi_tags=API_TAGS
-)
-
-# --- CORS Middleware ---
-# Define allowed origins. Be specific in production!
-# Use ["*"] for wide open access during early development if needed,
-# but restrict it as soon as possible.
-# SvelteKit default dev port is 5173
-origins = [
- "http://localhost:5174",
- "http://localhost:8000", # Allow requests from the API itself (e.g., Swagger UI)
- # Add your deployed frontend URL here later
- # "https://your-frontend-domain.com",
-]
-
-app.add_middleware(
- CORSMiddleware,
- allow_origins=settings.CORS_ORIGINS,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
-)
-# --- End CORS Middleware ---
-
-
-# --- Include API Routers ---
-# All API endpoints will be prefixed with /api
-app.include_router(api_router, prefix=settings.API_PREFIX)
-# --- End Include API Routers ---
-
-
-# --- Root Endpoint (Optional - outside the main API structure) ---
-@app.get("/", tags=["Root"])
-async def read_root():
- """
- Provides a simple welcome message at the root path.
- Useful for basic reachability checks.
- """
- logger.info("Root endpoint '/' accessed.")
- return {"message": settings.ROOT_MESSAGE}
-# --- End Root Endpoint ---
-
-
-# --- Application Startup/Shutdown Events (Optional) ---
-# @app.on_event("startup")
-# async def startup_event():
-# logger.info("Application startup: Connecting to database...")
-# # You might perform initial checks or warm-up here
-# # await database.engine.connect() # Example check (get_db handles sessions per request)
-# logger.info("Application startup complete.")
-
-# @app.on_event("shutdown")
-# async def shutdown_event():
-# logger.info("Application shutdown: Disconnecting from database...")
-# # await database.engine.dispose() # Close connection pool
-# logger.info("Application shutdown complete.")
-# --- End Events ---
-
-
-# --- Direct Run (for simple local testing if needed) ---
-# It's better to use `uvicorn app.main:app --reload` from the terminal
-# if __name__ == "__main__":
-# logger.info("Starting Uvicorn server directly from main.py")
-# uvicorn.run(app, host="0.0.0.0", port=8000)
-# ------------------------------------------------------
-
-
-
-
-
-
-
-
Create New List
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-// API Version
-export const API_VERSION = 'v1';
-
-// API Base URL
-export const API_BASE_URL = import.meta.env.VITE_API_URL || 'http://localhost:8000';
-
-// API Endpoints
-export const API_ENDPOINTS = {
- // Auth
- AUTH: {
- LOGIN: '/auth/login',
- SIGNUP: '/auth/signup',
- REFRESH_TOKEN: '/auth/refresh-token',
- LOGOUT: '/auth/logout',
- VERIFY_EMAIL: '/auth/verify-email',
- RESET_PASSWORD: '/auth/reset-password',
- FORGOT_PASSWORD: '/auth/forgot-password',
- },
-
- // Users
- USERS: {
- PROFILE: '/users/me',
- UPDATE_PROFILE: '/users/me',
- PASSWORD: '/users/password',
- AVATAR: '/users/avatar',
- SETTINGS: '/users/settings',
- NOTIFICATIONS: '/users/notifications',
- PREFERENCES: '/users/preferences',
- },
-
- // Lists
- LISTS: {
- BASE: '/lists',
- BY_ID: (id: string) => `/lists/${id}`,
- ITEMS: (listId: string) => `/lists/${listId}/items`,
- ITEM: (listId: string, itemId: string) => `/lists/${listId}/items/${itemId}`,
- SHARE: (listId: string) => `/lists/${listId}/share`,
- UNSHARE: (listId: string) => `/lists/${listId}/unshare`,
- COMPLETE: (listId: string) => `/lists/${listId}/complete`,
- REOPEN: (listId: string) => `/lists/${listId}/reopen`,
- ARCHIVE: (listId: string) => `/lists/${listId}/archive`,
- RESTORE: (listId: string) => `/lists/${listId}/restore`,
- DUPLICATE: (listId: string) => `/lists/${listId}/duplicate`,
- EXPORT: (listId: string) => `/lists/${listId}/export`,
- IMPORT: '/lists/import',
- },
-
- // Groups
- GROUPS: {
- BASE: '/groups',
- BY_ID: (id: string) => `/groups/${id}`,
- LISTS: (groupId: string) => `/groups/${groupId}/lists`,
- MEMBERS: (groupId: string) => `/groups/${groupId}/members`,
- MEMBER: (groupId: string, userId: string) => `/groups/${groupId}/members/${userId}`,
- LEAVE: (groupId: string) => `/groups/${groupId}/leave`,
- DELETE: (groupId: string) => `/groups/${groupId}`,
- SETTINGS: (groupId: string) => `/groups/${groupId}/settings`,
- ROLES: (groupId: string) => `/groups/${groupId}/roles`,
- ROLE: (groupId: string, roleId: string) => `/groups/${groupId}/roles/${roleId}`,
- },
-
- // Invites
- INVITES: {
- BASE: '/invites',
- BY_ID: (id: string) => `/invites/${id}`,
- ACCEPT: (id: string) => `/invites/${id}/accept`,
- DECLINE: (id: string) => `/invites/${id}/decline`,
- REVOKE: (id: string) => `/invites/${id}/revoke`,
- LIST: '/invites',
- PENDING: '/invites/pending',
- SENT: '/invites/sent',
- },
-
- // Items (for direct operations like update, get by ID)
- ITEMS: {
- BY_ID: (itemId: string) => `/items/${itemId}`,
- },
-
- // OCR
- OCR: {
- PROCESS: '/ocr/extract-items',
- STATUS: (jobId: string) => `/ocr/status/${jobId}`,
- RESULT: (jobId: string) => `/ocr/result/${jobId}`,
- BATCH: '/ocr/batch',
- CANCEL: (jobId: string) => `/ocr/cancel/${jobId}`,
- HISTORY: '/ocr/history',
- },
-
- // Costs
- COSTS: {
- BASE: '/costs',
- LIST_SUMMARY: (listId: string | number) => `/costs/lists/${listId}/cost-summary`,
- GROUP_BALANCE_SUMMARY: (groupId: string | number) => `/costs/groups/${groupId}/balance-summary`,
- },
-
- // Financials
- FINANCIALS: {
- EXPENSES: '/financials/expenses',
- EXPENSE: (id: string) => `/financials/expenses/${id}`,
- SETTLEMENTS: '/financials/settlements',
- SETTLEMENT: (id: string) => `/financials/settlements/${id}`,
- BALANCES: '/financials/balances',
- BALANCE: (userId: string) => `/financials/balances/${userId}`,
- REPORTS: '/financials/reports',
- REPORT: (id: string) => `/financials/reports/${id}`,
- CATEGORIES: '/financials/categories',
- CATEGORY: (id: string) => `/financials/categories/${id}`,
- },
-
- // Health
- HEALTH: {
- CHECK: '/health',
- VERSION: '/health/version',
- STATUS: '/health/status',
- METRICS: '/health/metrics',
- LOGS: '/health/logs',
- },
-};
-
-
-
-from fastapi import HTTPException, status
-from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
-from app.config import settings
-from typing import Optional
-
-class ListNotFoundError(HTTPException):
- """Raised when a list is not found."""
- def __init__(self, list_id: int):
- super().__init__(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"List {list_id} not found"
- )
-
-class ListPermissionError(HTTPException):
- """Raised when a user doesn't have permission to access a list."""
- def __init__(self, list_id: int, action: str = "access"):
- super().__init__(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"You do not have permission to {action} list {list_id}"
- )
-
-class ListCreatorRequiredError(HTTPException):
- """Raised when an action requires the list creator but the user is not the creator."""
- def __init__(self, list_id: int, action: str):
- super().__init__(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"Only the list creator can {action} list {list_id}"
- )
-
-class GroupNotFoundError(HTTPException):
- """Raised when a group is not found."""
- def __init__(self, group_id: int):
- super().__init__(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Group {group_id} not found"
- )
-
-class GroupPermissionError(HTTPException):
- """Raised when a user doesn't have permission to perform an action in a group."""
- def __init__(self, group_id: int, action: str):
- super().__init__(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"You do not have permission to {action} in group {group_id}"
- )
-
-class GroupMembershipError(HTTPException):
- """Raised when a user attempts to perform an action that requires group membership."""
- def __init__(self, group_id: int, action: str = "access"):
- super().__init__(
- status_code=status.HTTP_403_FORBIDDEN,
- detail=f"You must be a member of group {group_id} to {action}"
- )
-
-class GroupOperationError(HTTPException):
- """Raised when a group operation fails."""
- def __init__(self, detail: str):
- super().__init__(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=detail
- )
-
-class GroupValidationError(HTTPException):
- """Raised when a group operation is invalid."""
- def __init__(self, detail: str):
- super().__init__(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=detail
- )
-
-class ItemNotFoundError(HTTPException):
- """Raised when an item is not found."""
- def __init__(self, item_id: int):
- super().__init__(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Item {item_id} not found"
- )
-
-class UserNotFoundError(HTTPException):
- """Raised when a user is not found."""
- def __init__(self, user_id: Optional[int] = None, identifier: Optional[str] = None):
- detail_msg = "User not found."
- if user_id:
- detail_msg = f"User with ID {user_id} not found."
- elif identifier:
- detail_msg = f"User with identifier '{identifier}' not found."
- super().__init__(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=detail_msg
- )
-
-class InvalidOperationError(HTTPException):
- """Raised when an operation is invalid or disallowed by business logic."""
- def __init__(self, detail: str, status_code: int = status.HTTP_400_BAD_REQUEST):
- super().__init__(
- status_code=status_code,
- detail=detail
- )
-
-class DatabaseConnectionError(HTTPException):
- """Raised when there is an error connecting to the database."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
- detail=settings.DB_CONNECTION_ERROR
- )
-
-class DatabaseIntegrityError(HTTPException):
- """Raised when a database integrity constraint is violated."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=settings.DB_INTEGRITY_ERROR
- )
-
-class DatabaseTransactionError(HTTPException):
- """Raised when a database transaction fails."""
- def __init__(self, detail: str = settings.DB_TRANSACTION_ERROR):
- super().__init__(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=detail
- )
-
-class DatabaseQueryError(HTTPException):
- """Raised when a database query fails."""
- def __init__(self, detail: str = settings.DB_QUERY_ERROR):
- super().__init__(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=detail
- )
-
-class OCRServiceUnavailableError(HTTPException):
- """Raised when the OCR service is unavailable."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
- detail=settings.OCR_SERVICE_UNAVAILABLE
- )
-
-class OCRServiceConfigError(HTTPException):
- """Raised when there is an error in the OCR service configuration."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=settings.OCR_SERVICE_CONFIG_ERROR
- )
-
-class OCRUnexpectedError(HTTPException):
- """Raised when there is an unexpected error in the OCR service."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=settings.OCR_UNEXPECTED_ERROR
- )
-
-class OCRQuotaExceededError(HTTPException):
- """Raised when the OCR service quota is exceeded."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_429_TOO_MANY_REQUESTS,
- detail=settings.OCR_QUOTA_EXCEEDED
- )
-
-class InvalidFileTypeError(HTTPException):
- """Raised when an invalid file type is uploaded for OCR."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=settings.OCR_INVALID_FILE_TYPE.format(types=", ".join(settings.ALLOWED_IMAGE_TYPES))
- )
-
-class FileTooLargeError(HTTPException):
- """Raised when an uploaded file exceeds the size limit."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=settings.OCR_FILE_TOO_LARGE.format(size=settings.MAX_FILE_SIZE_MB)
- )
-
-class OCRProcessingError(HTTPException):
- """Raised when there is an error processing the image with OCR."""
- def __init__(self, detail: str):
- super().__init__(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=settings.OCR_PROCESSING_ERROR.format(detail=detail)
- )
-
-class EmailAlreadyRegisteredError(HTTPException):
- """Raised when attempting to register with an email that is already in use."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="Email already registered."
- )
-
-class UserCreationError(HTTPException):
- """Raised when there is an error creating a new user."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="An error occurred during user creation."
- )
-
-class InviteNotFoundError(HTTPException):
- """Raised when an invite is not found."""
- def __init__(self, invite_code: str):
- super().__init__(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Invite code {invite_code} not found"
- )
-
-class InviteExpiredError(HTTPException):
- """Raised when an invite has expired."""
- def __init__(self, invite_code: str):
- super().__init__(
- status_code=status.HTTP_410_GONE,
- detail=f"Invite code {invite_code} has expired"
- )
-
-class InviteAlreadyUsedError(HTTPException):
- """Raised when an invite has already been used."""
- def __init__(self, invite_code: str):
- super().__init__(
- status_code=status.HTTP_410_GONE,
- detail=f"Invite code {invite_code} has already been used"
- )
-
-class InviteCreationError(HTTPException):
- """Raised when an invite cannot be created."""
- def __init__(self, group_id: int):
- super().__init__(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"Failed to create invite for group {group_id}"
- )
-
-class ListStatusNotFoundError(HTTPException):
- """Raised when a list's status cannot be retrieved."""
- def __init__(self, list_id: int):
- super().__init__(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Status for list {list_id} not found"
- )
-
-class ConflictError(HTTPException):
- """Raised when an optimistic lock version conflict occurs."""
- def __init__(self, detail: str):
- super().__init__(
- status_code=status.HTTP_409_CONFLICT,
- detail=detail
- )
-
-class InvalidCredentialsError(HTTPException):
- """Raised when login credentials are invalid."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail=settings.AUTH_INVALID_CREDENTIALS,
- headers={settings.AUTH_HEADER_NAME: f"{settings.AUTH_HEADER_PREFIX} error=\"invalid_credentials\""}
- )
-
-class NotAuthenticatedError(HTTPException):
- """Raised when the user is not authenticated."""
- def __init__(self):
- super().__init__(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail=settings.AUTH_NOT_AUTHENTICATED,
- headers={settings.AUTH_HEADER_NAME: f"{settings.AUTH_HEADER_PREFIX} error=\"not_authenticated\""}
- )
-
-class JWTError(HTTPException):
- """Raised when there is an error with the JWT token."""
- def __init__(self, error: str):
- super().__init__(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail=settings.JWT_ERROR.format(error=error),
- headers={settings.AUTH_HEADER_NAME: f"{settings.AUTH_HEADER_PREFIX} error=\"invalid_token\""}
- )
-
-class JWTUnexpectedError(HTTPException):
- """Raised when there is an unexpected error with the JWT token."""
- def __init__(self, error: str):
- super().__init__(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail=settings.JWT_UNEXPECTED_ERROR.format(error=error),
- headers={settings.AUTH_HEADER_NAME: f"{settings.AUTH_HEADER_PREFIX} error=\"invalid_token\""}
- )
-
-
-
-# app/crud/list.py
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.future import select
-from sqlalchemy.orm import selectinload, joinedload
-from sqlalchemy import or_, and_, delete as sql_delete, func as sql_func, desc
-from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
-from typing import Optional, List as PyList
-
-from app.schemas.list import ListStatus
-from app.models import List as ListModel, UserGroup as UserGroupModel, Item as ItemModel
-from app.schemas.list import ListCreate, ListUpdate
-from app.core.exceptions import (
- ListNotFoundError,
- ListPermissionError,
- ListCreatorRequiredError,
- DatabaseConnectionError,
- DatabaseIntegrityError,
- DatabaseQueryError,
- DatabaseTransactionError,
- ConflictError
-)
-
-async def create_list(db: AsyncSession, list_in: ListCreate, creator_id: int) -> ListModel:
- """Creates a new list record."""
- try:
- async with db.begin():
- db_list = ListModel(
- name=list_in.name,
- description=list_in.description,
- group_id=list_in.group_id,
- created_by_id=creator_id,
- is_complete=False
- )
- db.add(db_list)
- await db.flush()
- await db.refresh(db_list)
- return db_list
- except IntegrityError as e:
- raise DatabaseIntegrityError(f"Failed to create list: {str(e)}")
- except OperationalError as e:
- raise DatabaseConnectionError(f"Database connection error: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseTransactionError(f"Failed to create list: {str(e)}")
-
-async def get_lists_for_user(db: AsyncSession, user_id: int) -> PyList[ListModel]:
- """Gets all lists accessible by a user."""
- try:
- group_ids_result = await db.execute(
- select(UserGroupModel.group_id).where(UserGroupModel.user_id == user_id)
- )
- user_group_ids = group_ids_result.scalars().all()
-
- # Build conditions for the OR clause dynamically
- conditions = [
- and_(ListModel.created_by_id == user_id, ListModel.group_id.is_(None))
- ]
- if user_group_ids: # Only add the IN clause if there are group IDs
- conditions.append(ListModel.group_id.in_(user_group_ids))
-
- query = select(ListModel).where(or_(*conditions)).order_by(ListModel.updated_at.desc())
-
- result = await db.execute(query)
- return result.scalars().all()
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to query user lists: {str(e)}")
-
-async def get_list_by_id(db: AsyncSession, list_id: int, load_items: bool = False) -> Optional[ListModel]:
- """Gets a single list by ID, optionally loading its items."""
- try:
- query = select(ListModel).where(ListModel.id == list_id)
- if load_items:
- query = query.options(
- selectinload(ListModel.items)
- .options(
- joinedload(ItemModel.added_by_user),
- joinedload(ItemModel.completed_by_user)
- )
- )
- result = await db.execute(query)
- return result.scalars().first()
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to query list: {str(e)}")
-
-async def update_list(db: AsyncSession, list_db: ListModel, list_in: ListUpdate) -> ListModel:
- """Updates an existing list record, checking for version conflicts."""
- try:
- async with db.begin():
- if list_db.version != list_in.version:
- raise ConflictError(
- f"List '{list_db.name}' (ID: {list_db.id}) has been modified. "
- f"Your version is {list_in.version}, current version is {list_db.version}. Please refresh."
- )
-
- update_data = list_in.model_dump(exclude_unset=True, exclude={'version'})
-
- for key, value in update_data.items():
- setattr(list_db, key, value)
-
- list_db.version += 1
-
- db.add(list_db)
- await db.flush()
- await db.refresh(list_db)
- return list_db
- except IntegrityError as e:
- await db.rollback()
- raise DatabaseIntegrityError(f"Failed to update list due to integrity constraint: {str(e)}")
- except OperationalError as e:
- await db.rollback()
- raise DatabaseConnectionError(f"Database connection error while updating list: {str(e)}")
- except ConflictError:
- await db.rollback()
- raise
- except SQLAlchemyError as e:
- await db.rollback()
- raise DatabaseTransactionError(f"Failed to update list: {str(e)}")
-
-async def delete_list(db: AsyncSession, list_db: ListModel) -> None:
- """Deletes a list record. Version check should be done by the caller (API endpoint)."""
- try:
- async with db.begin():
- await db.delete(list_db)
- return None
- except OperationalError as e:
- await db.rollback()
- raise DatabaseConnectionError(f"Database connection error while deleting list: {str(e)}")
- except SQLAlchemyError as e:
- await db.rollback()
- raise DatabaseTransactionError(f"Failed to delete list: {str(e)}")
-
-async def check_list_permission(db: AsyncSession, list_id: int, user_id: int, require_creator: bool = False) -> ListModel:
- """Fetches a list and verifies user permission."""
- try:
- list_db = await get_list_by_id(db, list_id=list_id, load_items=True)
- if not list_db:
- raise ListNotFoundError(list_id)
-
- is_creator = list_db.created_by_id == user_id
-
- if require_creator:
- if not is_creator:
- raise ListCreatorRequiredError(list_id, "access")
- return list_db
-
- if is_creator:
- return list_db
-
- if list_db.group_id:
- from app.crud.group import is_user_member
- is_member = await is_user_member(db, group_id=list_db.group_id, user_id=user_id)
- if not is_member:
- raise ListPermissionError(list_id)
- return list_db
- else:
- raise ListPermissionError(list_id)
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to check list permissions: {str(e)}")
-
-async def get_list_status(db: AsyncSession, list_id: int) -> ListStatus:
- """Gets the update timestamps and item count for a list."""
- try:
- list_query = select(ListModel.updated_at).where(ListModel.id == list_id)
- list_result = await db.execute(list_query)
- list_updated_at = list_result.scalar_one_or_none()
-
- if list_updated_at is None:
- raise ListNotFoundError(list_id)
-
- item_status_query = (
- select(
- sql_func.max(ItemModel.updated_at).label("latest_item_updated_at"),
- sql_func.count(ItemModel.id).label("item_count")
- )
- .where(ItemModel.list_id == list_id)
- )
- item_result = await db.execute(item_status_query)
- item_status = item_result.first()
-
- return ListStatus(
- list_updated_at=list_updated_at,
- latest_item_updated_at=item_status.latest_item_updated_at if item_status else None,
- item_count=item_status.item_count if item_status else 0
- )
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to get list status: {str(e)}")
-
-
-
-# app/models.py
-import enum
-import secrets
-from datetime import datetime, timedelta, timezone
-
-from sqlalchemy import (
- Column,
- Integer,
- String,
- DateTime,
- ForeignKey,
- Boolean,
- Enum as SAEnum,
- UniqueConstraint,
- Index,
- DDL,
- event,
- delete,
- func,
- text as sa_text,
- Text, # <-- Add Text for description
- Numeric # <-- Add Numeric for price
-)
-from sqlalchemy.orm import relationship, backref
-
-from .database import Base
-
-# --- Enums ---
-class UserRoleEnum(enum.Enum):
- owner = "owner"
- member = "member"
-
-class SplitTypeEnum(enum.Enum):
- EQUAL = "EQUAL" # Split equally among all involved users
- EXACT_AMOUNTS = "EXACT_AMOUNTS" # Specific amounts for each user (defined in ExpenseSplit)
- PERCENTAGE = "PERCENTAGE" # Percentage for each user (defined in ExpenseSplit)
- SHARES = "SHARES" # Proportional to shares/units (defined in ExpenseSplit)
- ITEM_BASED = "ITEM_BASED" # If an expense is derived directly from item prices and who added them
- # Add more types as needed, e.g., UNPAID (for tracking debts not part of a formal expense)
-
-# --- User Model ---
-class User(Base):
- __tablename__ = "users"
-
- id = Column(Integer, primary_key=True, index=True)
- email = Column(String, unique=True, index=True, nullable=False)
- password_hash = Column(String, nullable=False)
- name = Column(String, index=True, nullable=True)
- created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
-
- # --- Relationships ---
- created_groups = relationship("Group", back_populates="creator")
- group_associations = relationship("UserGroup", back_populates="user", cascade="all, delete-orphan")
- created_invites = relationship("Invite", back_populates="creator")
-
- # --- NEW Relationships for Lists/Items ---
- created_lists = relationship("List", foreign_keys="List.created_by_id", back_populates="creator") # Link List.created_by_id -> User
- added_items = relationship("Item", foreign_keys="Item.added_by_id", back_populates="added_by_user") # Link Item.added_by_id -> User
- completed_items = relationship("Item", foreign_keys="Item.completed_by_id", back_populates="completed_by_user") # Link Item.completed_by_id -> User
- # --- End NEW Relationships ---
-
- # --- Relationships for Cost Splitting ---
- expenses_paid = relationship("Expense", foreign_keys="Expense.paid_by_user_id", back_populates="paid_by_user", cascade="all, delete-orphan")
- expense_splits = relationship("ExpenseSplit", foreign_keys="ExpenseSplit.user_id", back_populates="user", cascade="all, delete-orphan")
- settlements_made = relationship("Settlement", foreign_keys="Settlement.paid_by_user_id", back_populates="payer", cascade="all, delete-orphan")
- settlements_received = relationship("Settlement", foreign_keys="Settlement.paid_to_user_id", back_populates="payee", cascade="all, delete-orphan")
- # --- End Relationships for Cost Splitting ---
-
-
-# --- Group Model ---
-class Group(Base):
- __tablename__ = "groups"
-
- id = Column(Integer, primary_key=True, index=True)
- name = Column(String, index=True, nullable=False)
- created_by_id = Column(Integer, ForeignKey("users.id"), nullable=False)
- created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
-
- # --- Relationships ---
- creator = relationship("User", back_populates="created_groups")
- member_associations = relationship("UserGroup", back_populates="group", cascade="all, delete-orphan")
- invites = relationship("Invite", back_populates="group", cascade="all, delete-orphan")
-
- # --- NEW Relationship for Lists ---
- lists = relationship("List", back_populates="group", cascade="all, delete-orphan") # Link List.group_id -> Group
- # --- End NEW Relationship ---
-
- # --- Relationships for Cost Splitting ---
- expenses = relationship("Expense", foreign_keys="Expense.group_id", back_populates="group", cascade="all, delete-orphan")
- settlements = relationship("Settlement", foreign_keys="Settlement.group_id", back_populates="group", cascade="all, delete-orphan")
- # --- End Relationships for Cost Splitting ---
-
-
-# --- UserGroup Association Model ---
-class UserGroup(Base):
- __tablename__ = "user_groups"
- __table_args__ = (UniqueConstraint('user_id', 'group_id', name='uq_user_group'),)
-
- id = Column(Integer, primary_key=True, index=True)
- user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
- group_id = Column(Integer, ForeignKey("groups.id", ondelete="CASCADE"), nullable=False)
- role = Column(SAEnum(UserRoleEnum, name="userroleenum", create_type=True), nullable=False, default=UserRoleEnum.member)
- joined_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
-
- user = relationship("User", back_populates="group_associations")
- group = relationship("Group", back_populates="member_associations")
-
-
-# --- Invite Model ---
-class Invite(Base):
- __tablename__ = "invites"
- __table_args__ = (
- Index('ix_invites_active_code', 'code', unique=True, postgresql_where=sa_text('is_active = true')),
- )
-
- id = Column(Integer, primary_key=True, index=True)
- code = Column(String, unique=False, index=True, nullable=False, default=lambda: secrets.token_urlsafe(16))
- group_id = Column(Integer, ForeignKey("groups.id", ondelete="CASCADE"), nullable=False)
- created_by_id = Column(Integer, ForeignKey("users.id"), nullable=False)
- created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
- expires_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc) + timedelta(days=7))
- is_active = Column(Boolean, default=True, nullable=False)
-
- group = relationship("Group", back_populates="invites")
- creator = relationship("User", back_populates="created_invites")
-
-
-# === NEW: List Model ===
-class List(Base):
- __tablename__ = "lists"
-
- id = Column(Integer, primary_key=True, index=True)
- name = Column(String, index=True, nullable=False)
- description = Column(Text, nullable=True)
- created_by_id = Column(Integer, ForeignKey("users.id"), nullable=False) # Who created this list
- group_id = Column(Integer, ForeignKey("groups.id"), nullable=True) # Which group it belongs to (NULL if personal)
- is_complete = Column(Boolean, default=False, nullable=False)
- created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
- updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
- version = Column(Integer, nullable=False, default=1, server_default='1')
-
- # --- Relationships ---
- creator = relationship("User", back_populates="created_lists") # Link to User.created_lists
- group = relationship("Group", back_populates="lists") # Link to Group.lists
- items = relationship("Item", back_populates="list", cascade="all, delete-orphan", order_by="Item.created_at") # Link to Item.list, cascade deletes
-
- # --- Relationships for Cost Splitting ---
- expenses = relationship("Expense", foreign_keys="Expense.list_id", back_populates="list", cascade="all, delete-orphan")
- # --- End Relationships for Cost Splitting ---
-
-
-# === NEW: Item Model ===
-class Item(Base):
- __tablename__ = "items"
-
- id = Column(Integer, primary_key=True, index=True)
- list_id = Column(Integer, ForeignKey("lists.id", ondelete="CASCADE"), nullable=False) # Belongs to which list
- name = Column(String, index=True, nullable=False)
- quantity = Column(String, nullable=True) # Flexible quantity (e.g., "1", "2 lbs", "a bunch")
- is_complete = Column(Boolean, default=False, nullable=False)
- price = Column(Numeric(10, 2), nullable=True) # For cost splitting later (e.g., 12345678.99)
- added_by_id = Column(Integer, ForeignKey("users.id"), nullable=False) # Who added this item
- completed_by_id = Column(Integer, ForeignKey("users.id"), nullable=True) # Who marked it complete
- created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
- updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
- version = Column(Integer, nullable=False, default=1, server_default='1')
-
- # --- Relationships ---
- list = relationship("List", back_populates="items") # Link to List.items
- added_by_user = relationship("User", foreign_keys=[added_by_id], back_populates="added_items") # Link to User.added_items
- completed_by_user = relationship("User", foreign_keys=[completed_by_id], back_populates="completed_items") # Link to User.completed_items
-
- # --- Relationships for Cost Splitting ---
- # If an item directly results in an expense, or an expense can be tied to an item.
- expenses = relationship("Expense", back_populates="item") # An item might have multiple associated expenses
- # --- End Relationships for Cost Splitting ---
-
-
-# === NEW Models for Advanced Cost Splitting ===
-
-class Expense(Base):
- __tablename__ = "expenses"
-
- id = Column(Integer, primary_key=True, index=True)
- description = Column(String, nullable=False)
- total_amount = Column(Numeric(10, 2), nullable=False)
- currency = Column(String, nullable=False, default="USD") # Consider making this an Enum too if few currencies
- expense_date = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
- split_type = Column(SAEnum(SplitTypeEnum, name="splittypeenum", create_type=True), nullable=False)
-
- # Foreign Keys
- list_id = Column(Integer, ForeignKey("lists.id"), nullable=True)
- group_id = Column(Integer, ForeignKey("groups.id"), nullable=True) # If not list-specific but group-specific
- item_id = Column(Integer, ForeignKey("items.id"), nullable=True) # If the expense is for a specific item
- paid_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
-
- created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
- updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
- version = Column(Integer, nullable=False, default=1, server_default='1')
-
- # Relationships
- paid_by_user = relationship("User", foreign_keys=[paid_by_user_id], back_populates="expenses_paid")
- list = relationship("List", foreign_keys=[list_id], back_populates="expenses")
- group = relationship("Group", foreign_keys=[group_id], back_populates="expenses")
- item = relationship("Item", foreign_keys=[item_id], back_populates="expenses")
- splits = relationship("ExpenseSplit", back_populates="expense", cascade="all, delete-orphan")
-
- __table_args__ = (
- # Example: Ensure either list_id or group_id is present if item_id is null
- # CheckConstraint('(item_id IS NOT NULL) OR (list_id IS NOT NULL) OR (group_id IS NOT NULL)', name='chk_expense_context'),
- )
-
-class ExpenseSplit(Base):
- __tablename__ = "expense_splits"
- __table_args__ = (UniqueConstraint('expense_id', 'user_id', name='uq_expense_user_split'),)
-
- id = Column(Integer, primary_key=True, index=True)
- expense_id = Column(Integer, ForeignKey("expenses.id", ondelete="CASCADE"), nullable=False)
- user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
-
- owed_amount = Column(Numeric(10, 2), nullable=False) # For EQUAL or EXACT_AMOUNTS
- # For PERCENTAGE split (value from 0.00 to 100.00)
- share_percentage = Column(Numeric(5, 2), nullable=True)
- # For SHARES split (e.g., user A has 2 shares, user B has 3 shares)
- share_units = Column(Integer, nullable=True)
-
- # is_settled might be better tracked via actual Settlement records or a reconciliation process
- # is_settled = Column(Boolean, default=False, nullable=False)
-
- created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
- updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
-
- # Relationships
- expense = relationship("Expense", back_populates="splits")
- user = relationship("User", foreign_keys=[user_id], back_populates="expense_splits")
-
-
-class Settlement(Base):
- __tablename__ = "settlements"
-
- id = Column(Integer, primary_key=True, index=True)
- group_id = Column(Integer, ForeignKey("groups.id"), nullable=False) # Settlements usually within a group
- paid_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
- paid_to_user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
- amount = Column(Numeric(10, 2), nullable=False)
- settlement_date = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
- description = Column(Text, nullable=True)
-
- created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
- updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
- version = Column(Integer, nullable=False, default=1, server_default='1')
-
- # Relationships
- group = relationship("Group", foreign_keys=[group_id], back_populates="settlements")
- payer = relationship("User", foreign_keys=[paid_by_user_id], back_populates="settlements_made")
- payee = relationship("User", foreign_keys=[paid_to_user_id], back_populates="settlements_received")
-
- __table_args__ = (
- # Ensure payer and payee are different users
- # CheckConstraint('paid_by_user_id <> paid_to_user_id', name='chk_settlement_payer_ne_payee'),
- )
-
-# Potential future: PaymentMethod model, etc.
-
-
-
-
-
-
-
Your Groups
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- {{ group.name }}
-
-
-
-
- You are not a member of any groups yet.
-
-
-
-
-
-
-
-
Create New Group
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
{{ pageTitle }}
-
-
-
-
Loading lists...
-
-
-
-
-
-
- {{ error }}
-
-
-
-
-
-
-
{{ noListsMessage }}
-
-
-
-
-
-
- {{ list.name }}
- {{ list.description || 'No description' }}
-
- Personal List
-
-
- Group List (ID: {{ list.group_id }})
-
-
-
-
-
- Updated: {{ new Date(list.updated_at).toLocaleDateString() }}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-# app/crud/item.py
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.future import select
-from sqlalchemy import delete as sql_delete, update as sql_update # Use aliases
-from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
-from typing import Optional, List as PyList
-from datetime import datetime, timezone
-
-from app.models import Item as ItemModel
-from app.schemas.item import ItemCreate, ItemUpdate
-from app.core.exceptions import (
- ItemNotFoundError,
- DatabaseConnectionError,
- DatabaseIntegrityError,
- DatabaseQueryError,
- DatabaseTransactionError,
- ConflictError
-)
-
-async def create_item(db: AsyncSession, item_in: ItemCreate, list_id: int, user_id: int) -> ItemModel:
- """Creates a new item record for a specific list."""
- try:
- db_item = ItemModel(
- name=item_in.name,
- quantity=item_in.quantity,
- list_id=list_id,
- added_by_id=user_id,
- is_complete=False # Default on creation
- # version is implicitly set to 1 by model default
- )
- db.add(db_item)
- await db.flush()
- await db.refresh(db_item)
- await db.commit() # Explicitly commit here
- return db_item
- except IntegrityError as e:
- await db.rollback() # Rollback on integrity error
- raise DatabaseIntegrityError(f"Failed to create item: {str(e)}")
- except OperationalError as e:
- await db.rollback() # Rollback on operational error
- raise DatabaseConnectionError(f"Database connection error: {str(e)}")
- except SQLAlchemyError as e:
- await db.rollback() # Rollback on other SQLAlchemy errors
- raise DatabaseTransactionError(f"Failed to create item: {str(e)}")
- except Exception as e: # Catch any other exception and attempt rollback
- await db.rollback()
- raise # Re-raise the original exception
-
-async def get_items_by_list_id(db: AsyncSession, list_id: int) -> PyList[ItemModel]:
- """Gets all items belonging to a specific list, ordered by creation time."""
- try:
- result = await db.execute(
- select(ItemModel)
- .where(ItemModel.list_id == list_id)
- .order_by(ItemModel.created_at.asc()) # Or desc() if preferred
- )
- return result.scalars().all()
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to query items: {str(e)}")
-
-async def get_item_by_id(db: AsyncSession, item_id: int) -> Optional[ItemModel]:
- """Gets a single item by its ID."""
- try:
- result = await db.execute(select(ItemModel).where(ItemModel.id == item_id))
- return result.scalars().first()
- except OperationalError as e:
- raise DatabaseConnectionError(f"Failed to connect to database: {str(e)}")
- except SQLAlchemyError as e:
- raise DatabaseQueryError(f"Failed to query item: {str(e)}")
-
-async def update_item(db: AsyncSession, item_db: ItemModel, item_in: ItemUpdate, user_id: int) -> ItemModel:
- """Updates an existing item record, checking for version conflicts."""
- try:
- # Check version conflict
- if item_db.version != item_in.version:
- raise ConflictError(
- f"Item '{item_db.name}' (ID: {item_db.id}) has been modified. "
- f"Your version is {item_in.version}, current version is {item_db.version}. Please refresh."
- )
-
- update_data = item_in.model_dump(exclude_unset=True, exclude={'version'}) # Exclude version
-
- # Special handling for is_complete
- if 'is_complete' in update_data:
- if update_data['is_complete'] is True:
- if item_db.completed_by_id is None: # Only set if not already completed by someone
- update_data['completed_by_id'] = user_id
- else:
- update_data['completed_by_id'] = None # Clear if marked incomplete
-
- # Apply updates
- for key, value in update_data.items():
- setattr(item_db, key, value)
-
- item_db.version += 1 # Increment version
-
- db.add(item_db)
- await db.flush()
- await db.refresh(item_db)
-
- # Commit the transaction if not part of a larger transaction
- await db.commit()
-
- return item_db
- except IntegrityError as e:
- await db.rollback()
- raise DatabaseIntegrityError(f"Failed to update item due to integrity constraint: {str(e)}")
- except OperationalError as e:
- await db.rollback()
- raise DatabaseConnectionError(f"Database connection error while updating item: {str(e)}")
- except ConflictError: # Re-raise ConflictError
- await db.rollback()
- raise
- except SQLAlchemyError as e:
- await db.rollback()
- raise DatabaseTransactionError(f"Failed to update item: {str(e)}")
-
-async def delete_item(db: AsyncSession, item_db: ItemModel) -> None:
- """Deletes an item record. Version check should be done by the caller (API endpoint)."""
- try:
- await db.delete(item_db)
- await db.commit()
- return None
- except OperationalError as e:
- await db.rollback()
- raise DatabaseConnectionError(f"Database connection error while deleting item: {str(e)}")
- except SQLAlchemyError as e:
- await db.rollback()
- raise DatabaseTransactionError(f"Failed to delete item: {str(e)}")
-
-
-
-
-
-