
- Introduced a new `RecurrencePattern` model to manage recurrence details for expenses, allowing for daily, weekly, monthly, and yearly patterns.
- Updated the `Expense` model to include fields for recurrence management, such as `is_recurring`, `recurrence_pattern_id`, and `next_occurrence`.
- Modified the database schema to reflect these changes, including alterations to existing columns and the removal of obsolete fields.
- Enhanced the expense creation logic to accommodate recurring expenses and updated related CRUD operations accordingly.
- Implemented necessary migrations to ensure database integrity and support for the new features.
73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
import logging

from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from app.config import settings
from app.db.session import async_session
from app.jobs.recurring_expenses import generate_recurring_expenses
|
|
|
|
logger = logging.getLogger(__name__)

# APScheduler is handed a synchronous database URL, so the asyncpg driver
# marker (postgresql+asyncpg://) is swapped for the plain postgresql:// scheme.
sync_db_url = settings.DATABASE_URL.replace(
    'postgresql+asyncpg://',
    'postgresql://',
)
|
|
|
|
# Configure the scheduler

# Scheduled jobs are persisted through the synchronous database URL.
jobstores = {
    'default': SQLAlchemyJobStore(url=sync_db_url),
}

# The default executor must be the asyncio one: the scheduled job is a
# coroutine function, and a thread-pool executor would merely call it and
# discard the resulting coroutine without ever awaiting it. The thread pool
# stays available under its own alias for blocking, synchronous jobs.
executors = {
    'default': AsyncIOExecutor(),
    'threadpool': ThreadPoolExecutor(20),
}

# Defaults applied to every job added to the scheduler.
job_defaults = {
    'coalesce': False,
    'max_instances': 1,
}

scheduler = AsyncIOScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone='UTC',
)
|
|
|
|
async def run_recurring_expenses_job() -> None:
    """Run the recurring-expenses job with its own database session.

    Opens a session from ``async_session`` and passes it to
    ``generate_recurring_expenses``; the ``async with`` block exits the
    session context once the call finishes or fails.

    Raises:
        Exception: Any error raised while opening the session or generating
            expenses is logged and then re-raised unchanged.
    """
    try:
        async with async_session() as session:
            await generate_recurring_expenses(session)
    except Exception as e:
        # logger.exception emits the same message at ERROR level but also
        # records the traceback, which a plain logger.error call drops.
        logger.exception("Error running recurring expenses job: %s", e)
        raise
|
|
|
|
def init_scheduler() -> None:
    """Register the recurring-expenses job and start the scheduler.

    The job is added under the fixed id ``generate_recurring_expenses`` with
    ``replace_existing=True`` and fires daily at 00:00 in the scheduler's
    configured timezone.

    Raises:
        Exception: Any error raised while adding the job or starting the
            scheduler is logged and then re-raised unchanged.
    """
    try:
        # Add the recurring expenses job
        scheduler.add_job(
            run_recurring_expenses_job,
            # A CronTrigger built directly falls back to the host's local
            # timezone, not the scheduler's, so the scheduler timezone is
            # passed explicitly to make "midnight" independent of the host.
            trigger=CronTrigger(hour=0, minute=0, timezone=scheduler.timezone),
            id='generate_recurring_expenses',
            name='Generate Recurring Expenses',
            replace_existing=True,
        )

        # Start the scheduler
        scheduler.start()
        logger.info("Scheduler started successfully")
    except Exception as e:
        # logger.exception keeps the message and adds the traceback.
        logger.exception("Error initializing scheduler: %s", e)
        raise
|
|
|
|
def shutdown_scheduler() -> None:
    """Shut the module-level scheduler down and log the outcome.

    Raises:
        Exception: Any error raised by ``scheduler.shutdown()`` is logged and
            then re-raised unchanged.
    """
    try:
        scheduler.shutdown()
        logger.info("Scheduler shut down successfully")
    except Exception as e:
        # logger.exception keeps the message and adds the traceback.
        logger.exception("Error shutting down scheduler: %s", e)
        raise