"""Background scheduler setup for the recurring-expenses job.

Builds a module-level ``AsyncIOScheduler`` backed by a SQLAlchemy job store
and exposes ``init_scheduler`` / ``shutdown_scheduler`` to start and stop it.
"""

import logging

from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from app.core.config import settings
from app.db.session import async_session
from app.jobs.recurring_expenses import generate_recurring_expenses

logger = logging.getLogger(__name__)

# Configure the scheduler.
# NOTE(review): SQLAlchemyJobStore opens its own engine from this URL; if
# SQLALCHEMY_DATABASE_URI names an async-only driver the job store may need a
# synchronous URL instead -- confirm against the settings module.
jobstores = {
    'default': SQLAlchemyJobStore(url=settings.SQLALCHEMY_DATABASE_URI),
}

# The default executor must be the asyncio one: the scheduled job is a
# coroutine function, and a thread-pool executor would merely call it and
# discard the un-awaited coroutine object, so the job body would never run.
# The thread pool stays available under an explicit alias for blocking jobs.
executors = {
    'default': AsyncIOExecutor(),
    'threadpool': ThreadPoolExecutor(20),
}

job_defaults = {
    'coalesce': False,
    'max_instances': 1,
}

scheduler = AsyncIOScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone='UTC',
)


async def run_recurring_expenses_job():
    """Run the recurring expenses job with a fresh database session.

    Opens a session via ``async_session`` and awaits
    ``generate_recurring_expenses`` with it.

    Raises:
        Exception: any error from the session or the job is logged with its
            traceback and re-raised to the caller.
    """
    try:
        async with async_session() as session:
            await generate_recurring_expenses(session)
    except Exception as e:
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception("Error running recurring expenses job: %s", e)
        raise


def init_scheduler():
    """Register the recurring expenses job and start the scheduler.

    The job is registered under a fixed id with ``replace_existing=True`` so
    that registering it again replaces the stored job instead of failing on a
    duplicate id.

    Raises:
        Exception: any error from registering the job or starting the
            scheduler is logged with its traceback and re-raised.
    """
    try:
        # Add the recurring expenses job
        scheduler.add_job(
            run_recurring_expenses_job,
            trigger=CronTrigger(hour=0, minute=0),  # Run at midnight UTC
            id='generate_recurring_expenses',
            name='Generate Recurring Expenses',
            replace_existing=True,
        )

        # Start the scheduler
        scheduler.start()
        logger.info("Scheduler started successfully")
    except Exception as e:
        logger.exception("Error initializing scheduler: %s", e)
        raise


def shutdown_scheduler():
    """Shut down the scheduler gracefully.

    Does nothing (beyond an info log) when the scheduler is not running, so
    it is safe to call from a shutdown hook even if startup failed.

    Raises:
        Exception: any error raised while shutting down a running scheduler
            is logged with its traceback and re-raised.
    """
    # Shutting down a stopped scheduler raises; treat that case as a no-op.
    if not scheduler.running:
        logger.info("Scheduler is not running; nothing to shut down")
        return

    try:
        scheduler.shutdown()
        logger.info("Scheduler shut down successfully")
    except Exception as e:
        logger.exception("Error shutting down scheduler: %s", e)
        raise