Merge pull request 'refactor: Improve Alembic migration functions by integrating configuration and script directory handling for enhanced migration context management' (#31) from ph4 into prod

Reviewed-on: #31
This commit is contained in:
mo 2025-06-01 17:42:33 +02:00
commit 26315cd407

View File

@ -6,6 +6,10 @@ import os
import sys import sys
from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import pool from sqlalchemy import pool
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
from alembic.operations import Operations
# Ensure the app directory is in the Python path # Ensure the app directory is in the Python path
sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), '..'))) sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), '..')))
@ -15,8 +19,9 @@ from app.config import settings
async def run_migrations(): async def run_migrations():
"""Run database migrations asynchronously.""" """Run database migrations asynchronously."""
from alembic.runtime.migration import MigrationContext # Get alembic configuration and script directory
from alembic.operations import Operations alembic_cfg = Config(os.path.join(os.path.dirname(__file__), '..', 'alembic.ini'))
script_directory = ScriptDirectory.from_config(alembic_cfg)
# Create async engine # Create async engine
engine = create_async_engine( engine = create_async_engine(
@ -25,33 +30,34 @@ async def run_migrations():
) )
async with engine.connect() as connection: async with engine.connect() as connection:
# Get current database schema version def get_current_rev(conn):
def do_get_current_rev(conn):
migration_context = MigrationContext.configure( migration_context = MigrationContext.configure(
conn, conn,
opts={ opts={
'target_metadata': DatabaseBase.metadata, 'target_metadata': DatabaseBase.metadata,
'compare_type': True, 'script': script_directory
'compare_server_default': True
} }
) )
return migration_context.get_current_revision() return migration_context.get_current_revision()
current_rev = await connection.run_sync(do_get_current_rev) current_rev = await connection.run_sync(get_current_rev)
# Run migrations def upgrade_to_head(conn):
def do_upgrade(conn):
migration_context = MigrationContext.configure( migration_context = MigrationContext.configure(
conn, conn,
opts={ opts={
'target_metadata': DatabaseBase.metadata, 'target_metadata': DatabaseBase.metadata,
'compare_type': True, 'script': script_directory,
'compare_server_default': True 'as_sql': False,
} }
) )
with Operations.context(migration_context):
with migration_context.begin_transaction():
migration_context._migrations_fn = script_directory._upgrade_revs(
"head", current_rev
)
migration_context.run_migrations() migration_context.run_migrations()
await connection.run_sync(do_upgrade) await connection.run_sync(upgrade_to_head)
await engine.dispose() await engine.dispose()