import logging
from typing import Callable, Tuple

from core.redis import start_job_async_or_sync
from django.conf import settings
from django.db import connection
from rq import Retry

logger = logging.getLogger(__name__)


def execute_sql_job(
    *,
    migration_name: str,
    sql: str,
    apply_on_sqlite: bool = False,
    reverse: bool = False,
) -> None:
    """Run a raw SQL migration statement on the default DB connection.

    Forward runs (``reverse=False``) are tracked in ``AsyncMigrationStatus``
    under ``migration_name``: a run whose record is already FINISHED is
    skipped, a successful run is marked FINISHED, and a failing run is marked
    ERROR with the error text stored in ``meta['error']``.

    Reverse runs (``reverse=True``) only execute the SQL; the status record is
    neither created nor updated.

    Args:
        migration_name: Key of the ``AsyncMigrationStatus`` record; also used
            in log messages.
        sql: SQL text passed to ``cursor.execute``.
        apply_on_sqlite: When False and the connection vendor is ``sqlite``,
            the SQL is not executed (a forward run is still marked FINISHED).
        reverse: Selects the untracked reverse path.

    Raises:
        Exception: Whatever the SQL execution (or, on the forward path, the
            status save) raises is logged and re-raised.
    """
    # Imported lazily, as in the original; presumably to avoid importing
    # models at module load time -- confirm before hoisting.
    from core.models import AsyncMigrationStatus

    if reverse:
        # Rollbacks are intentionally not tracked: just run the SQL.
        try:
            if connection.vendor == 'sqlite' and not apply_on_sqlite:
                logger.info('SQLite detected; skipping SQL execution as requested (reverse)')
                return
            with connection.cursor() as cursor:
                cursor.execute(sql)
        except Exception as exc:
            logger.exception(f'Reverse migration {migration_name} failed: {exc}')
            raise
        return

    record, was_created = AsyncMigrationStatus.objects.get_or_create(
        name=migration_name,
        defaults={'status': AsyncMigrationStatus.STATUS_STARTED},
    )

    already_finished = not was_created and record.status == AsyncMigrationStatus.STATUS_FINISHED
    if already_finished:
        logger.info(f'Migration {migration_name} already executed with status FINISHED')
        return

    # A record that was only scheduled earlier is promoted to STARTED now.
    if record.status == AsyncMigrationStatus.STATUS_SCHEDULED:
        record.status = AsyncMigrationStatus.STATUS_STARTED
        record.save()

    try:
        should_run_sql = connection.vendor != 'sqlite' or apply_on_sqlite
        if should_run_sql:
            with connection.cursor() as cursor:
                cursor.execute(sql)
        else:
            logger.info('SQLite detected; skipping SQL execution as requested')

        # Skipped-on-SQLite runs are also recorded as FINISHED.
        record.status = AsyncMigrationStatus.STATUS_FINISHED
        record.save()
    except Exception as exc:
        logger.exception(f'Migration {migration_name} failed: {exc}')
        record.status = AsyncMigrationStatus.STATUS_ERROR
        error_meta = record.meta or {}
        error_meta['error'] = str(exc)
        record.meta = error_meta
        record.save()
        raise


def make_sql_migration(
    sql_forwards: str,
    sql_backwards: str,
    *,
    apply_on_sqlite: bool = False,
    execute_immediately: bool = False,
    migration_name: str | None = None,
) -> Tuple[Callable, Callable]:
    """Build the ``(forwards, backwards)`` callables for ``migrations.RunPython``.

    - forwards: does nothing on SQLite unless ``apply_on_sqlite`` is set.
      Otherwise it hands ``execute_sql_job`` to ``start_job_async_or_sync``
      when ``execute_immediately`` is true or
      ``settings.ALLOW_SCHEDULED_MIGRATIONS`` is falsy; in the remaining case
      it only ensures an ``AsyncMigrationStatus`` row with status
      ``'SCHEDULED'`` exists.
    - backwards: always hands ``execute_sql_job`` (with ``reverse=True``) to
      ``start_job_async_or_sync`` to run ``sql_backwards``.

    Args:
        sql_forwards: SQL executed when the migration is applied.
        sql_backwards: SQL executed when the migration is reverted.
        apply_on_sqlite: Forwarded to ``execute_sql_job``; also controls the
            SQLite short-circuit in ``forwards``.
        execute_immediately: Start the job right away regardless of
            ``settings.ALLOW_SCHEDULED_MIGRATIONS``.
        migration_name: Required key for the status record, e.g.
            ``'app_label:migration_module'``.

    Returns:
        A ``(forwards, backwards)`` tuple of callables taking
        ``(apps, schema_editor)``.

    Raises:
        ValueError: If ``migration_name`` is missing or empty.
    """
    if not migration_name:
        raise ValueError("make_sql_migration requires explicit migration_name like 'app_label:migration_module'")

    job_name = migration_name

    def _enqueue(sql: str, *, reverse: bool) -> None:
        # Shared by both directions; a fresh Retry policy is built per call.
        start_job_async_or_sync(
            execute_sql_job,
            migration_name=job_name,
            sql=sql,
            apply_on_sqlite=apply_on_sqlite,
            reverse=reverse,
            retry=Retry(max=3, interval=[60, 300, 1800]),
        )

    def forwards(apps, schema_editor):  # noqa: ARG001
        if schema_editor.connection.vendor == 'sqlite' and not apply_on_sqlite:
            logger.info('Skipping migration for SQLite (apply_on_sqlite=False)')
            return

        if not execute_immediately and settings.ALLOW_SCHEDULED_MIGRATIONS:
            # Deferred mode: only register the migration as SCHEDULED. The
            # historical model has no status constants, hence the literal.
            status_model = apps.get_model('core', 'AsyncMigrationStatus')
            status_model.objects.get_or_create(
                name=job_name,
                defaults={'status': 'SCHEDULED'},
            )
            return

        _enqueue(sql_forwards, reverse=False)

    def backwards(apps, schema_editor):  # noqa: ARG001
        _enqueue(sql_backwards, reverse=True)

    return forwards, backwards