
All checks were successful
Deploy to Production, build images and push to Gitea Registry / build_and_push (pull_request) Successful in 1m23s
- Updated the `check_list_access_for_financials` function to allow access for list creators and members.
- Refactored the `list_expenses` endpoint to support filtering by `list_id`, `group_id`, and `isRecurring`, providing more flexible expense retrieval options.
- Introduced a new `read_list_expenses` endpoint to fetch expenses associated with a specific list, ensuring proper permission checks.
- Enhanced expense retrieval logic in the `get_expenses_for_list` and `get_user_accessible_expenses` functions to include settlement activities.
- Updated frontend API configuration to reflect new endpoint paths and ensure consistency across the application.
658 lines
36 KiB
Python
658 lines
36 KiB
Python
# app/api/v1/endpoints/financials.py
|
|
import logging
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query, Response
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import joinedload
|
|
from typing import List as PyList, Optional, Sequence
|
|
|
|
from app.database import get_transactional_session
|
|
from app.auth import current_active_user
|
|
from app.models import (
|
|
User as UserModel,
|
|
Group as GroupModel,
|
|
List as ListModel,
|
|
UserGroup as UserGroupModel,
|
|
UserRoleEnum,
|
|
ExpenseSplit as ExpenseSplitModel
|
|
)
|
|
from app.schemas.expense import (
|
|
ExpenseCreate, ExpensePublic,
|
|
SettlementCreate, SettlementPublic,
|
|
ExpenseUpdate, SettlementUpdate
|
|
)
|
|
from app.schemas.settlement_activity import SettlementActivityCreate, SettlementActivityPublic # Added
|
|
from app.crud import expense as crud_expense
|
|
from app.crud import settlement as crud_settlement
|
|
from app.crud import settlement_activity as crud_settlement_activity # Added
|
|
from app.crud import group as crud_group
|
|
from app.crud import list as crud_list
|
|
from app.core.exceptions import (
|
|
ListNotFoundError, GroupNotFoundError, UserNotFoundError,
|
|
InvalidOperationError, GroupPermissionError, ListPermissionError,
|
|
ItemNotFoundError, GroupMembershipError
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter()
|
|
|
|
# --- Helper for permissions ---
|
|
async def check_list_access_for_financials(
    db: AsyncSession,
    list_id: int,
    user_id: int,
    action: str = "access financial data for",
):
    """
    Verify that ``user_id`` may work with the financial data of ``list_id``.

    Delegates to ``crud_list.check_list_permission`` with
    ``require_creator=False``, so being the list creator is not required.

    A ``ListPermissionError`` from the delegate is logged and replaced by a
    fresh ``ListPermissionError`` carrying the caller-supplied ``action``.
    Any other exception (including ``ListNotFoundError``) propagates
    unchanged.
    """
    try:
        await crud_list.check_list_permission(
            db=db,
            list_id=list_id,
            user_id=user_id,
            require_creator=False,
        )
    except ListPermissionError as e:
        # Re-raise with the financial-specific action wording so the error
        # message reflects what the caller was actually trying to do.
        logger.warning(f"ListPermissionError in check_list_access_for_financials for list {list_id}, user {user_id}, action '{action}': {e.detail}")
        raise ListPermissionError(list_id, action=action)
|
|
|
|
# --- Expense Endpoints ---
|
|
@router.post(
    "/expenses",
    response_model=ExpensePublic,
    status_code=status.HTTP_201_CREATED,
    summary="Create New Expense",
    tags=["Expenses"],
)
async def create_new_expense(
    expense_in: ExpenseCreate,
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Create an expense linked to a list and/or a group.

    Context resolution:
    - With a ``list_id``: the caller must pass the list access check. If the
      list belongs to a group, that group becomes the expense's group (a
      conflicting ``group_id`` in the payload is rejected). A personal list
      combined with a ``group_id`` is rejected.
    - With only a ``group_id``: the caller must be a member of that group.
    - With neither: ``InvalidOperationError`` is raised.

    Payer rule: recording an expense paid by a different user is only allowed
    in a group context and only for an owner of that group; otherwise a 403
    is returned.

    Errors raised by ``crud_expense.create_expense`` are mapped to 400
    (domain errors), 501 (``NotImplementedError``) or 500 (anything else).
    """
    logger.info(f"User {current_user.email} creating expense: {expense_in.description}")

    requested_list_id = expense_in.list_id
    requested_group_id = expense_in.group_id

    # Nothing to attach the expense to.
    if not requested_list_id and not requested_group_id:
        raise InvalidOperationError("Expense must be linked to a list_id or group_id.")

    effective_group_id = requested_group_id
    is_group_context = False

    if requested_list_id:
        # Basic list access (implies membership when the list is in a group).
        await check_list_access_for_financials(db, requested_list_id, current_user.id, action="create expenses for")
        list_obj = await db.get(ListModel, requested_list_id)
        if not list_obj:
            raise ListNotFoundError(requested_list_id)

        if list_obj.group_id:
            group_mismatch = bool(requested_group_id) and list_obj.group_id != requested_group_id
            if group_mismatch:
                raise InvalidOperationError(f"List {list_obj.id} belongs to group {list_obj.group_id}, not group {expense_in.group_id} specified in expense.")
            # The list's group wins: the expense is tied to a group via the list.
            effective_group_id = list_obj.group_id
            is_group_context = True
        elif requested_group_id:
            raise InvalidOperationError(f"Personal list {list_obj.id} cannot have expense associated with group {expense_in.group_id}.")
        # Personal list without group: the payer check below is the only
        # remaining restriction.
    else:
        # Only a group_id was supplied; the caller must at least be a member.
        is_group_context = True
        await crud_group.check_group_membership(db, group_id=effective_group_id, user_id=current_user.id, action="create expenses for")

    # Persist with the group id derived above (may differ from the payload).
    expense_in_final = expense_in.model_copy(update={"group_id": effective_group_id})

    # --- Payer permission ---
    if expense_in_final.paid_by_user_id != current_user.id:
        logger.warning(f"User {current_user.email} attempting to create expense paid by other user {expense_in_final.paid_by_user_id}")
        if not (is_group_context and effective_group_id):
            # No group context means a personal list: nobody else can be the payer.
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create expense paid by another user for a personal list.")
        try:
            await crud_group.check_user_role_in_group(
                db,
                group_id=effective_group_id,
                user_id=current_user.id,
                required_role=UserRoleEnum.owner,
                action="create expense paid by another user",
            )
        except GroupPermissionError as e:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Only group owners can create expenses paid by others. {str(e)}")

    try:
        created_expense = await crud_expense.create_expense(
            db=db,
            expense_in=expense_in_final,
            current_user_id=current_user.id,
        )
        logger.info(f"Expense '{created_expense.description}' (ID: {created_expense.id}) created successfully.")
        return created_expense
    except (UserNotFoundError, ListNotFoundError, GroupNotFoundError, InvalidOperationError, GroupMembershipError) as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except NotImplementedError as e:
        raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected error creating expense: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred.")
|
|
|
|
@router.get(
    "/expenses/{expense_id}",
    response_model=ExpensePublic,
    summary="Get Expense by ID",
    tags=["Expenses"],
)
async def get_expense(
    expense_id: int,
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Return a single expense after checking the caller may see it.

    Access is decided by the first context that applies: list access when the
    expense has a ``list_id``, group membership when it has a ``group_id``,
    otherwise the caller must be the user who paid the expense (403 if not).

    Raises ``ItemNotFoundError`` when no expense has the given id.
    """
    logger.info(f"User {current_user.email} requesting expense ID {expense_id}")

    expense = await crud_expense.get_expense_by_id(db, expense_id=expense_id)
    if not expense:
        raise ItemNotFoundError(item_id=expense_id)

    if expense.list_id:
        await check_list_access_for_financials(db, expense.list_id, current_user.id)
        return expense

    if expense.group_id:
        await crud_group.check_group_membership(db, group_id=expense.group_id, user_id=current_user.id)
        return expense

    # Neither list nor group: only the payer may view it.
    if expense.paid_by_user_id != current_user.id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized to view this expense")
    return expense
|
|
|
|
@router.get("/expenses", response_model=PyList[ExpensePublic], summary="List Expenses", tags=["Expenses"])
async def list_expenses(
    list_id: Optional[int] = Query(None, description="Filter by list ID"),
    group_id: Optional[int] = Query(None, description="Filter by group ID"),
    isRecurring: Optional[bool] = Query(None, description="Filter by recurring expenses"),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=200),
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    List expenses with optional filters.

    If list_id is provided, returns expenses for that list (user must have list access).
    If group_id is provided, returns expenses for that group (user must be group member).
    If both are provided, returns expenses for the list (list_id takes precedence).
    If neither is provided, returns all expenses the user has access to.

    "Provided" means the query parameter is present, whatever its value: an
    explicit ``0`` is treated as a real filter and goes through the normal
    permission check instead of silently widening the result to every
    accessible expense.

    If isRecurring is provided, the page fetched above is filtered in memory
    on whether ``recurrence_rule`` is set. Because this happens after
    ``skip``/``limit`` were applied, the response may contain fewer than
    ``limit`` items.
    """
    logger.info(f"User {current_user.email} listing expenses with filters: list_id={list_id}, group_id={group_id}, isRecurring={isRecurring}")

    if list_id is not None:
        # List filter wins over group filter when both are given.
        await check_list_access_for_financials(db, list_id, current_user.id)
        expenses = await crud_expense.get_expenses_for_list(db, list_id=list_id, skip=skip, limit=limit)
    elif group_id is not None:
        await crud_group.check_group_membership(db, group_id=group_id, user_id=current_user.id, action="list expenses for")
        expenses = await crud_expense.get_expenses_for_group(db, group_id=group_id, skip=skip, limit=limit)
    else:
        # No filter: everything the user can access.
        expenses = await crud_expense.get_user_accessible_expenses(db, user_id=current_user.id, skip=skip, limit=limit)

    if isRecurring is not None:
        # NOTE: post-pagination filter; see docstring.
        expenses = [expense for expense in expenses if bool(expense.recurrence_rule) == isRecurring]

    return expenses
|
|
|
|
@router.get(
    "/groups/{group_id}/expenses",
    response_model=PyList[ExpensePublic],
    summary="List Expenses for a Group",
    tags=["Expenses", "Groups"],
)
async def list_group_expenses(
    group_id: int,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=200),
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Return one page of expenses for ``group_id``.

    The caller must pass ``crud_group.check_group_membership`` for the group;
    ``skip`` and ``limit`` are forwarded to the CRUD query unchanged.
    """
    logger.info(f"User {current_user.email} listing expenses for group ID {group_id}")

    await crud_group.check_group_membership(
        db,
        group_id=group_id,
        user_id=current_user.id,
        action="list expenses for",
    )
    return await crud_expense.get_expenses_for_group(db, group_id=group_id, skip=skip, limit=limit)
|
|
|
|
@router.put(
    "/expenses/{expense_id}",
    response_model=ExpensePublic,
    summary="Update Expense",
    tags=["Expenses"],
)
async def update_expense_details(
    expense_id: int,
    expense_in: ExpenseUpdate,
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Update an existing expense.

    The payload carries the caller's ``version`` for optimistic locking; the
    actual field handling and version comparison live in
    ``crud_expense.update_expense``.

    Permission: the caller must be the user who paid the expense, or, when
    the expense belongs to a group, an owner of that group. An expense that
    is only on a personal list can therefore be changed by its payer only.

    Raises ``ItemNotFoundError`` for an unknown id and 403 when the caller
    lacks permission. An ``InvalidOperationError`` from the CRUD layer becomes
    409 when its message mentions "version", 400 otherwise; any other
    exception becomes 500.
    """
    logger.info(f"User {current_user.email} attempting to update expense ID {expense_id} (version {expense_in.version})")

    expense_db = await crud_expense.get_expense_by_id(db, expense_id=expense_id)
    if not expense_db:
        raise ItemNotFoundError(item_id=expense_id)

    # --- Permission: payer first, then group ownership as a fallback ---
    can_modify = expense_db.paid_by_user_id == current_user.id
    if not can_modify and expense_db.group_id:
        try:
            await crud_group.check_user_role_in_group(
                db,
                group_id=expense_db.group_id,
                user_id=current_user.id,
                required_role=UserRoleEnum.owner,
                action="modify group expenses",
            )
            can_modify = True
            logger.info(f"Allowing update for expense {expense_id} by group owner {current_user.email}")
        except (GroupMembershipError, GroupPermissionError):
            # Not a member, or a member without the owner role: stay denied.
            pass
        except GroupNotFoundError:
            # The expense references a group that no longer exists.
            logger.error(f"Group {expense_db.group_id} not found for expense {expense_id} during update check.")

    if not can_modify:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User cannot modify this expense (must be payer or group owner)")

    try:
        updated_expense = await crud_expense.update_expense(db=db, expense_db=expense_db, expense_in=expense_in)
        logger.info(f"Expense ID {expense_id} updated successfully to version {updated_expense.version}.")
        return updated_expense
    except InvalidOperationError as e:
        # A message mentioning "version" is treated as an optimistic-lock conflict.
        is_version_conflict = "version" in str(e).lower()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT if is_version_conflict else status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )
    except Exception as e:
        logger.error(f"Unexpected error updating expense {expense_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred.")
|
|
|
|
@router.delete(
    "/expenses/{expense_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete Expense",
    tags=["Expenses"],
)
async def delete_expense_record(
    expense_id: int,
    expected_version: Optional[int] = Query(None, description="Expected version for optimistic locking"),
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Delete an expense.

    ``expected_version`` is an optional query parameter that is passed through
    to ``crud_expense.delete_expense`` for optimistic locking.

    A missing expense is answered with 204 as well, since the requested end
    state (the expense is gone) already holds.

    Permission: the caller must be the user who paid the expense, or an owner
    of the group the expense belongs to; otherwise 403.

    An ``InvalidOperationError`` from the CRUD layer becomes 409 when its
    message mentions "version", 400 otherwise; any other exception becomes 500.
    """
    logger.info(f"User {current_user.email} attempting to delete expense ID {expense_id} (expected version {expected_version})")

    no_content = status.HTTP_204_NO_CONTENT

    expense_db = await crud_expense.get_expense_by_id(db, expense_id=expense_id)
    if not expense_db:
        # Idempotent delete: nothing to remove, report success.
        logger.warning(f"Attempt to delete non-existent expense ID {expense_id}")
        return Response(status_code=no_content)

    # --- Permission: payer first, then group ownership as a fallback ---
    can_delete = expense_db.paid_by_user_id == current_user.id
    if not can_delete and expense_db.group_id:
        try:
            await crud_group.check_user_role_in_group(
                db,
                group_id=expense_db.group_id,
                user_id=current_user.id,
                required_role=UserRoleEnum.owner,
                action="delete group expenses",
            )
            can_delete = True
            logger.info(f"Allowing delete for expense {expense_id} by group owner {current_user.email}")
        except (GroupMembershipError, GroupPermissionError):
            # Not a member, or a member without the owner role: stay denied.
            pass
        except GroupNotFoundError:
            logger.error(f"Group {expense_db.group_id} not found for expense {expense_id} during delete check.")

    if not can_delete:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User cannot delete this expense (must be payer or group owner)")

    try:
        await crud_expense.delete_expense(db=db, expense_db=expense_db, expected_version=expected_version)
        logger.info(f"Expense ID {expense_id} deleted successfully.")
    except InvalidOperationError as e:
        # A message mentioning "version" is treated as an optimistic-lock conflict.
        is_version_conflict = "version" in str(e).lower()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT if is_version_conflict else status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )
    except Exception as e:
        logger.error(f"Unexpected error deleting expense {expense_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred.")

    return Response(status_code=no_content)
|
|
|
|
# --- Settlement Activity Endpoints (for ExpenseSplits) ---
|
|
@router.post(
    "/expense_splits/{expense_split_id}/settle",
    response_model=SettlementActivityPublic,
    status_code=status.HTTP_201_CREATED,
    summary="Record a Settlement Activity for an Expense Split",
    tags=["Expenses", "Settlements"]
)
async def record_settlement_for_expense_split(
    expense_split_id: int,
    activity_in: SettlementActivityCreate,
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Record a settlement activity (a payment) against one expense split.

    Validation and permissions, in order:
    1. The split id in the path must equal ``activity_in.expense_split_id``
       (400 otherwise).
    2. The split must exist (``ItemNotFoundError``) and have a parent expense
       (500 otherwise, as that indicates inconsistent data).
    3. The caller must either be settling their own split (caller is both the
       activity's payer and the user on the split) or be an owner of the
       group the parent expense belongs to (403 otherwise).
    4. If the activity's payer differs from the split's user, the request is
       only accepted when the caller is that split's user or is acting as
       group owner (400 otherwise).

    Errors from ``crud_settlement_activity.create_settlement_activity`` are
    mapped to 404 (``UserNotFoundError``), 400 (``InvalidOperationError``) or
    500 (anything else).
    """
    logger.info(f"User {current_user.email} attempting to record settlement for expense_split_id {expense_split_id} with amount {activity_in.amount_paid}")

    if activity_in.expense_split_id != expense_split_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Expense split ID in path does not match expense split ID in request body."
        )

    # Fetch the split together with its parent expense; the parent provides
    # the group context used for the permission checks below.
    stmt = (
        select(ExpenseSplitModel)
        .options(joinedload(ExpenseSplitModel.expense))
        .where(ExpenseSplitModel.id == expense_split_id)
    )
    result = await db.execute(stmt)
    expense_split = result.scalar_one_or_none()

    if not expense_split:
        raise ItemNotFoundError(item_id=expense_split_id, detail_suffix="Expense split not found.")

    parent_expense = expense_split.expense
    if not parent_expense:
        # Should not happen if data integrity is maintained.
        logger.error(f"Data integrity issue: ExpenseSplit {expense_split_id} has no parent Expense.")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Associated expense not found for this split.")

    # --- Permission checks ---
    payer_is_split_owner = activity_in.paid_by_user_id == expense_split.user_id

    # Path 1: the caller records a payment they made for a split they owe.
    can_record_settlement = current_user.id == activity_in.paid_by_user_id and payer_is_split_owner
    if can_record_settlement:
        logger.info(f"User {current_user.email} is settling their own expense split {expense_split_id}.")

    # Path 2: the caller is an owner of the expense's group. The outcome is
    # remembered so the consistency check further down does not need to ask
    # the database about the caller's role a second time.
    acting_as_group_owner = False
    if not can_record_settlement and parent_expense.group_id:
        try:
            await crud_group.check_user_role_in_group(
                db,
                group_id=parent_expense.group_id,
                user_id=current_user.id,
                required_role=UserRoleEnum.owner,
                action="record settlement activities for group members"
            )
            acting_as_group_owner = True
            can_record_settlement = True
            logger.info(f"Group owner {current_user.email} is recording settlement for expense split {expense_split_id} in group {parent_expense.group_id}.")
        except (GroupPermissionError, GroupMembershipError, GroupNotFoundError):
            # Not a group owner and not settling an own split: stays denied.
            pass

    if not can_record_settlement:
        logger.warning(f"User {current_user.email} does not have permission to record settlement for expense split {expense_split_id}.")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to record this settlement activity. Must be the payer or a group owner."
        )

    # Data consistency: the activity's payer should be the user who owes the
    # split. A mismatch is tolerated only for the split's own user or for a
    # group owner acting on their behalf. With the permission rules above a
    # mismatch can only get this far via the group-owner path, so the guard
    # is defensive.
    if not payer_is_split_owner:
        logger.warning(f"Attempt to record settlement for expense split {expense_split_id} where activity payer ({activity_in.paid_by_user_id}) "
                       f"does not match split owner ({expense_split.user_id}). Only allowed if current_user is group owner and recording on behalf of split owner.")
        if current_user.id != expense_split.user_id and not acting_as_group_owner:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"The payer ID ({activity_in.paid_by_user_id}) in the settlement activity must match the user ID of the expense split owner ({expense_split.user_id}), unless you are a group owner acting on their behalf."
            )

    try:
        created_activity = await crud_settlement_activity.create_settlement_activity(
            db=db,
            settlement_activity_in=activity_in,
            current_user_id=current_user.id
        )
        logger.info(f"Settlement activity {created_activity.id} recorded for expense split {expense_split_id} by user {current_user.email}")
        return created_activity
    except UserNotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User referenced in settlement activity not found: {str(e)}")
    except InvalidOperationError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected error recording settlement activity for expense_split_id {expense_split_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred while recording settlement activity.")
|
|
|
|
@router.get(
    "/expense_splits/{expense_split_id}/settlement_activities",
    response_model=PyList[SettlementActivityPublic],
    summary="List Settlement Activities for an Expense Split",
    tags=["Expenses", "Settlements"],
)
async def list_settlement_activities_for_split(
    expense_split_id: int,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=200),
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Return one page of settlement activities recorded for an expense split.

    Visibility follows the parent expense: list access when the expense has a
    ``list_id``, group membership when it has a ``group_id``, and otherwise
    the caller must be either the expense's payer or the user on the split.
    Failing that check yields 403.

    Raises ``ItemNotFoundError`` when the split does not exist and 500 when
    the split has no parent expense.
    """
    logger.info(f"User {current_user.email} listing settlement activities for expense_split_id {expense_split_id}")

    # Load the split with its parent expense in one query; the parent decides
    # which permission rule applies.
    split_query = (
        select(ExpenseSplitModel)
        .options(joinedload(ExpenseSplitModel.expense))
        .where(ExpenseSplitModel.id == expense_split_id)
    )
    expense_split = (await db.execute(split_query)).scalar_one_or_none()
    if not expense_split:
        raise ItemNotFoundError(item_id=expense_split_id, detail_suffix="Expense split not found.")

    parent_expense = expense_split.expense
    if not parent_expense:
        logger.error(f"Data integrity issue: ExpenseSplit {expense_split_id} has no parent Expense.")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Associated expense not found for this split.")

    # --- Permission: the caller must be able to see the parent expense ---
    can_view_activities = False
    if parent_expense.list_id:
        try:
            await check_list_access_for_financials(
                db,
                parent_expense.list_id,
                current_user.id,
                action="view settlement activities for list expense",
            )
        except (ListPermissionError, ListNotFoundError):
            pass  # denied; reported as a 403 below
        else:
            can_view_activities = True
    elif parent_expense.group_id:
        try:
            await crud_group.check_group_membership(
                db,
                group_id=parent_expense.group_id,
                user_id=current_user.id,
                action="view settlement activities for group expense",
            )
        except (GroupMembershipError, GroupNotFoundError):
            pass  # denied; reported as a 403 below
        else:
            can_view_activities = True
    else:
        # No list and no group: only the payer of the expense or the user on
        # this particular split may look at its activities.
        can_view_activities = (
            parent_expense.paid_by_user_id == current_user.id
            or expense_split.user_id == current_user.id
        )

    if not can_view_activities:
        logger.warning(f"User {current_user.email} does not have permission to view settlement activities for expense split {expense_split_id}.")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to view settlement activities for this expense split.",
        )

    return await crud_settlement_activity.get_settlement_activities_for_split(
        db=db,
        expense_split_id=expense_split_id,
        skip=skip,
        limit=limit,
    )
|
|
|
|
|
|
# --- Settlement Endpoints ---
|
|
@router.post(
    "/settlements",
    response_model=SettlementPublic,
    status_code=status.HTTP_201_CREATED,
    summary="Record New Settlement",
    tags=["Settlements"],
)
async def create_new_settlement(
    settlement_in: SettlementCreate,
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Record a settlement between two members of a group.

    The caller, the payer and the payee must all pass
    ``crud_group.check_group_membership`` for ``settlement_in.group_id``.
    A membership failure for the caller propagates as raised; for the payer
    or payee it is reported as 400, and a missing group as 404.

    Errors from ``crud_settlement.create_settlement`` are mapped to 400
    (domain errors) or 500 (anything else).
    """
    logger.info(f"User {current_user.email} recording settlement in group {settlement_in.group_id}")

    group_id = settlement_in.group_id

    # The caller's own membership check is deliberately outside the try block
    # so its error is not rewritten into the payer/payee message.
    await crud_group.check_group_membership(db, group_id=group_id, user_id=current_user.id, action="record settlements in")

    # Payer first, then payee, each with its own action wording.
    parties = (
        (settlement_in.paid_by_user_id, "be a payer in this group's settlement"),
        (settlement_in.paid_to_user_id, "be a payee in this group's settlement"),
    )
    try:
        for party_user_id, party_action in parties:
            await crud_group.check_group_membership(db, group_id=group_id, user_id=party_user_id, action=party_action)
    except GroupMembershipError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Payer or payee issue: {str(e)}")
    except GroupNotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))

    try:
        created_settlement = await crud_settlement.create_settlement(
            db=db,
            settlement_in=settlement_in,
            current_user_id=current_user.id,
        )
        logger.info(f"Settlement ID {created_settlement.id} recorded successfully in group {settlement_in.group_id}.")
        return created_settlement
    except (UserNotFoundError, GroupNotFoundError, InvalidOperationError, GroupMembershipError) as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected error recording settlement: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred.")
|
|
|
|
@router.get(
    "/settlements/{settlement_id}",
    response_model=SettlementPublic,
    summary="Get Settlement by ID",
    tags=["Settlements"],
)
async def get_settlement(
    settlement_id: int,
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Return a single settlement.

    The caller must be a member of the settlement's group. If the membership
    check fails, access is still granted when the caller is the payer or the
    payee of the settlement; otherwise a ``GroupMembershipError`` is raised.

    Raises ``ItemNotFoundError`` when no settlement has the given id.
    """
    logger.info(f"User {current_user.email} requesting settlement ID {settlement_id}")

    settlement = await crud_settlement.get_settlement_by_id(db, settlement_id=settlement_id)
    if not settlement:
        raise ItemNotFoundError(item_id=settlement_id)

    involved_user_ids = [settlement.paid_by_user_id, settlement.paid_to_user_id]
    is_party_to_settlement = current_user.id in involved_user_ids

    try:
        await crud_group.check_group_membership(db, group_id=settlement.group_id, user_id=current_user.id)
    except GroupMembershipError:
        if not is_party_to_settlement:
            raise GroupMembershipError(settlement.group_id, action="view this settlement's details")
        # NOTE(review): the original indentation was lost; this log line is
        # assumed to belong to the non-member-but-party path — confirm.
        logger.info(f"User {current_user.email} (party to settlement) viewing settlement {settlement_id} for group {settlement.group_id}.")

    return settlement
|
|
|
|
@router.get(
    "/groups/{group_id}/settlements",
    response_model=PyList[SettlementPublic],
    summary="List Settlements for a Group",
    tags=["Settlements", "Groups"],
)
async def list_group_settlements(
    group_id: int,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=200),
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Return one page of settlements for ``group_id``.

    The caller must pass ``crud_group.check_group_membership`` for the group;
    ``skip`` and ``limit`` are forwarded to the CRUD query unchanged.
    """
    logger.info(f"User {current_user.email} listing settlements for group ID {group_id}")

    await crud_group.check_group_membership(
        db,
        group_id=group_id,
        user_id=current_user.id,
        action="list settlements for this group",
    )
    return await crud_settlement.get_settlements_for_group(db, group_id=group_id, skip=skip, limit=limit)
|
|
|
|
@router.put(
    "/settlements/{settlement_id}",
    response_model=SettlementPublic,
    summary="Update Settlement",
    tags=["Settlements"],
)
async def update_settlement_details(
    settlement_id: int,
    settlement_in: SettlementUpdate,
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Update an existing settlement.

    The payload carries the caller's ``version`` for optimistic locking; the
    actual field handling and version comparison live in
    ``crud_settlement.update_settlement``.

    Permission: the caller must be the payer or payee of the settlement, or
    an owner of the settlement's group; otherwise 403.

    Raises ``ItemNotFoundError`` for an unknown id. An
    ``InvalidOperationError`` from the CRUD layer becomes 409 when its message
    mentions "version", 400 otherwise; any other exception becomes 500.
    """
    logger.info(f"User {current_user.email} attempting to update settlement ID {settlement_id} (version {settlement_in.version})")

    settlement_db = await crud_settlement.get_settlement_by_id(db, settlement_id=settlement_id)
    if not settlement_db:
        raise ItemNotFoundError(item_id=settlement_id)

    # --- Permission: involved party first, then group ownership ---
    involved_user_ids = [settlement_db.paid_by_user_id, settlement_db.paid_to_user_id]
    can_modify = current_user.id in involved_user_ids
    if not can_modify and settlement_db.group_id:
        try:
            await crud_group.check_user_role_in_group(
                db,
                group_id=settlement_db.group_id,
                user_id=current_user.id,
                required_role=UserRoleEnum.owner,
                action="modify group settlements",
            )
            can_modify = True
            logger.info(f"Allowing update for settlement {settlement_id} by group owner {current_user.email}")
        except (GroupMembershipError, GroupPermissionError):
            # Not a member, or a member without the owner role: stay denied.
            pass
        except GroupNotFoundError:
            logger.error(f"Group {settlement_db.group_id} not found for settlement {settlement_id} during update check.")

    if not can_modify:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User cannot modify this settlement (must be involved party or group owner)")

    try:
        updated_settlement = await crud_settlement.update_settlement(
            db=db,
            settlement_db=settlement_db,
            settlement_in=settlement_in,
        )
        logger.info(f"Settlement ID {settlement_id} updated successfully to version {updated_settlement.version}.")
        return updated_settlement
    except InvalidOperationError as e:
        # A message mentioning "version" is treated as an optimistic-lock conflict.
        is_version_conflict = "version" in str(e).lower()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT if is_version_conflict else status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )
    except Exception as e:
        logger.error(f"Unexpected error updating settlement {settlement_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred.")
|
|
|
|
@router.delete("/settlements/{settlement_id}", status_code=status.HTTP_204_NO_CONTENT, summary="Delete Settlement", tags=["Settlements"])
async def delete_settlement_record(
    settlement_id: int,
    expected_version: Optional[int] = Query(None, description="Expected version for optimistic locking"),
    db: AsyncSession = Depends(get_transactional_session),
    current_user: UserModel = Depends(current_active_user),
):
    """
    Delete a settlement.

    The caller must either be a party to the settlement (the payer or the
    payee) or hold the ``owner`` role in the group the settlement belongs to.

    Args:
        settlement_id: ID of the settlement to delete.
        expected_version: Optional version used for optimistic locking. It is
            forwarded unchanged to ``crud_settlement.delete_settlement``;
            ``None`` is passed through when the query parameter is omitted.
        db: Transactional database session.
        current_user: The authenticated user performing the request.

    Returns:
        An empty ``204 No Content`` response, both when the settlement was
        deleted and when no settlement with that ID exists (the delete is
        treated as idempotent).

    Raises:
        HTTPException: 403 if the user is neither an involved party nor the
            group owner; 409 if the CRUD layer raises ``InvalidOperationError``
            with a message mentioning "version"; 400 for any other
            ``InvalidOperationError``; 500 for any other unexpected error.
    """
    logger.info(f"User {current_user.email} attempting to delete settlement ID {settlement_id} (expected version {expected_version})")

    settlement_db = await crud_settlement.get_settlement_by_id(db, settlement_id=settlement_id)
    if not settlement_db:
        # Nothing to delete: answer 204 instead of 404 so repeated deletes succeed.
        logger.warning(f"Attempt to delete non-existent settlement ID {settlement_id}")
        return Response(status_code=status.HTTP_204_NO_CONTENT)

    # --- Granular Permission Check ---
    # 1. User is an involved party (payer or payee)
    can_delete = current_user.id in (settlement_db.paid_by_user_id, settlement_db.paid_to_user_id)

    # 2. OR user is owner of the group the settlement belongs to
    if not can_delete and settlement_db.group_id:
        try:
            await crud_group.check_user_role_in_group(db, group_id=settlement_db.group_id, user_id=current_user.id, required_role=UserRoleEnum.owner, action="delete group settlements")
            can_delete = True
            logger.info(f"Allowing delete for settlement {settlement_id} by group owner {current_user.email}")
        except (GroupMembershipError, GroupPermissionError):
            # Not a member, or a member without the owner role: deliberately
            # swallowed so the request falls through to the 403 below.
            pass
        except GroupNotFoundError:
            # The settlement points at a group that could not be found; log it
            # and deny access via the 403 below.
            logger.error(f"Group {settlement_db.group_id} not found for settlement {settlement_id} during delete check.")

    if not can_delete:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User cannot delete this settlement (must be involved party or group owner)")

    try:
        await crud_settlement.delete_settlement(db=db, settlement_db=settlement_db, expected_version=expected_version)
    except InvalidOperationError as e:
        # Messages mentioning "version" are treated as optimistic-lock
        # conflicts (409); everything else is a plain bad request (400).
        status_code = status.HTTP_400_BAD_REQUEST
        if "version" in str(e).lower():
            status_code = status.HTTP_409_CONFLICT
        raise HTTPException(status_code=status_code, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Unexpected error deleting settlement {settlement_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred.") from e

    logger.info(f"Settlement ID {settlement_id} deleted successfully.")
    return Response(status_code=status.HTTP_204_NO_CONTENT)