""" Async migrations handler for FastAPI application. This file is separate from env.py to avoid Alembic context issues. """ import os import sys from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy import pool from alembic.config import Config from alembic.script import ScriptDirectory from alembic.runtime.migration import MigrationContext from alembic.operations import Operations # Ensure the app directory is in the Python path sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), '..'))) from app.database import Base as DatabaseBase from app.config import settings async def run_migrations(): """Run database migrations asynchronously.""" # Get alembic configuration and script directory alembic_cfg = Config(os.path.join(os.path.dirname(__file__), '..', 'alembic.ini')) script_directory = ScriptDirectory.from_config(alembic_cfg) # Create async engine engine = create_async_engine( settings.DATABASE_URL, poolclass=pool.NullPool, ) async with engine.connect() as connection: def get_current_rev(conn): migration_context = MigrationContext.configure( conn, opts={ 'target_metadata': DatabaseBase.metadata, 'script': script_directory } ) return migration_context.get_current_revision() current_rev = await connection.run_sync(get_current_rev) def upgrade_to_head(conn): migration_context = MigrationContext.configure( conn, opts={ 'target_metadata': DatabaseBase.metadata, 'script': script_directory, 'as_sql': False, } ) with migration_context.begin_transaction(): migration_context._migrations_fn = script_directory._upgrade_revs( "head", current_rev ) migration_context.run_migrations() await connection.run_sync(upgrade_to_head) await engine.dispose()