Bin
2025-12-17 2b99d77d73ba568beff0a549534017caaad8a6de
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
from typing import Callable, Tuple
 
from core.redis import start_job_async_or_sync
from django.conf import settings
from django.db import connection
from rq import Retry
 
logger = logging.getLogger(__name__)
 
 
def execute_sql_job(*, migration_name: str, sql: str, apply_on_sqlite: bool = False, reverse: bool = False) -> None:
    from core.models import AsyncMigrationStatus
 
    if not reverse:
        migration, created = AsyncMigrationStatus.objects.get_or_create(
            name=migration_name,
            defaults={'status': AsyncMigrationStatus.STATUS_STARTED},
        )
        if not created and migration.status == AsyncMigrationStatus.STATUS_FINISHED:
            logger.info(f'Migration {migration_name} already executed with status FINISHED')
            return
        if migration.status == AsyncMigrationStatus.STATUS_SCHEDULED:
            migration.status = AsyncMigrationStatus.STATUS_STARTED
            migration.save()
 
        try:
            if connection.vendor == 'sqlite' and not apply_on_sqlite:
                logger.info('SQLite detected; skipping SQL execution as requested')
            else:
                with connection.cursor() as cursor:
                    cursor.execute(sql)
            migration.status = AsyncMigrationStatus.STATUS_FINISHED
            migration.save()
        except Exception as e:
            logger.exception(f'Migration {migration_name} failed: {e}')
            migration.status = AsyncMigrationStatus.STATUS_ERROR
            if not migration.meta:
                migration.meta = {}
            migration.meta['error'] = str(e)
            migration.save()
            raise
    else:
        # Reverse path: don't create/update AsyncMigrationStatus. Just run SQL.
        try:
            if connection.vendor == 'sqlite' and not apply_on_sqlite:
                logger.info('SQLite detected; skipping SQL execution as requested (reverse)')
                return
            with connection.cursor() as cursor:
                cursor.execute(sql)
        except Exception as e:
            logger.exception(f'Reverse migration {migration_name} failed: {e}')
            raise
 
 
def make_sql_migration(
    sql_forwards: str,
    sql_backwards: str,
    *,
    apply_on_sqlite: bool = False,
    execute_immediately: bool = False,
    migration_name: str | None = None,
) -> Tuple[Callable, Callable]:
    """Return (forwards, backwards) for migrations.RunPython.
 
    - forwards: either schedules job or marks as SCHEDULED
    - backwards: always schedules job to execute reverse SQL
    """
    if not migration_name:
        raise ValueError("make_sql_migration requires explicit migration_name like 'app_label:migration_module'")
    mig_key = migration_name
 
    def forwards(apps, schema_editor):  # noqa: ARG001
        if schema_editor.connection.vendor == 'sqlite' and not apply_on_sqlite:
            logger.info('Skipping migration for SQLite (apply_on_sqlite=False)')
            return
        should_execute = execute_immediately or not settings.ALLOW_SCHEDULED_MIGRATIONS
        if should_execute:
            start_job_async_or_sync(
                execute_sql_job,
                migration_name=mig_key,
                sql=sql_forwards,
                apply_on_sqlite=apply_on_sqlite,
                reverse=False,
                retry=Retry(max=3, interval=[60, 300, 1800]),
            )
        else:
            AsyncMigrationStatus = apps.get_model('core', 'AsyncMigrationStatus')
            AsyncMigrationStatus.objects.get_or_create(
                name=mig_key,
                defaults={'status': 'SCHEDULED'},
            )
 
    def backwards(apps, schema_editor):  # noqa: ARG001
        start_job_async_or_sync(
            execute_sql_job,
            migration_name=mig_key,
            sql=sql_backwards,
            apply_on_sqlite=apply_on_sqlite,
            reverse=True,
            retry=Retry(max=3, interval=[60, 300, 1800]),
        )
 
    return forwards, backwards