""" Async migrations handler for FastAPI application. This file is separate from env.py to avoid Alembic context issues. """ import os import sys from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy import pool from alembic.config import Config from alembic.script import ScriptDirectory from alembic.runtime.migration import MigrationContext from alembic.operations import Operations # Ensure the app directory is in the Python path sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), '..'))) from app.database import Base as DatabaseBase from app.config import settings def _get_migration_fn(script_directory, current_rev): """Create a migration function that knows how to upgrade from current revision.""" def migration_fn(rev, context): # Get all upgrade steps from current revision to head revisions = script_directory._upgrade_revs("head", current_rev) for revision_step in revisions: # Access the revision string from the RevisionStep object script = script_directory.get_revision(revision_step.revision) script.module.upgrade(context) return migration_fn async def run_migrations(): """Run database migrations asynchronously.""" # Get alembic configuration and script directory alembic_cfg = Config(os.path.join(os.path.dirname(__file__), '..', 'alembic.ini')) script_directory = ScriptDirectory.from_config(alembic_cfg) # Create async engine engine = create_async_engine( settings.DATABASE_URL, poolclass=pool.NullPool, ) async with engine.connect() as connection: def get_current_rev(conn): migration_context = MigrationContext.configure( conn, opts={ 'target_metadata': DatabaseBase.metadata, 'script': script_directory } ) return migration_context.get_current_revision() current_rev = await connection.run_sync(get_current_rev) def upgrade_to_head(conn): migration_context = MigrationContext.configure( conn, opts={ 'target_metadata': DatabaseBase.metadata, 'script': script_directory, 'as_sql': False, } ) # Set the migration function migration_context._migrations_fn = _get_migration_fn(script_directory, current_rev) with migration_context.begin_transaction(): migration_context.run_migrations() await connection.run_sync(upgrade_to_head) await engine.dispose()