chenzhaoyang
2025-12-17 063da0bf961e1d35e25dc107f883f7492f4c5a7c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
---
description: 
globs: **/migrations/*.py
alwaysApply: false
---
# Async Migrations Best Practices
 
## When to Use Async Migrations
- Long-running schema changes (e.g. index creation on very large tables modifications)
- Operations that risk exceeding CI/CD migration timeouts (e.g. > 5 minutes)
- DDL (Data Definition Language - it's a subset of SQL statements used to define and modify the structure of a database) that blocks writes if run normally (concurrent index builds, data backfills)
- To avoid production downtime during heavy migrations or data migrations
 
## Why Can't We Use Simple Django Migrations with Concurrency?
 
Even though Django provides `AddIndexConcurrently` and similar operations, they still cause deployment problems:
 
**The Migration Runner Problem:**
- Django's migration runner is fundamentally **synchronous** 
- It waits for each operation to complete before marking the migration as "Applied"
- Even `AddIndexConcurrently` blocks the **CI/CD pipeline** until the index finishes building (the database itself is not blocked)
- Large tables (100M+ rows) can take **hours** to build indexes, far exceeding CI/CD timeouts (usually 5-15 minutes)
 
**What Happens During Deployment:**
```python
# This STILL blocks the CI/CD deployment process
operations = [
    AddIndexConcurrently(
        model_name="task", 
        index=BrinIndex(fields=["updated_at"])
    )
]
# ↑ CI/CD pipeline waits here until index creation completes
# Database writes are NOT blocked, but deployment fails if it takes longer than timeout
```
 
## How Async Migrations Work
- Mark the migration `atomic = False` to disable wrapping in a transaction
- In migration, a custom operation enqueues the real DDL via `start_job_async_or_sync` that is running as asyncronios rqworker job
- DDL uses `CREATE INDEX CONCURRENTLY ...` (or `DROP INDEX CONCURRENTLY`) for non-blocking behavior
- Track progress in a table (e.g. `AsyncMigrationStatus`) before and after execution
- Background workers (RQ, Celery) execute the migration job independently of the main process
- **Migration completes immediately** after queuing the job, allowing deployment to proceed
 
## Example Template
```python
from django.db import migrations, connection
from django.conf import settings
from core.redis import start_job_async_or_sync
from core.models import AsyncMigrationStatus
import logging
 
logger = logging.getLogger(__name__)
migration_name = '0054_add_brin_index_updated_at'
 
# Actual DDL to run
def forward_migration(migration_name):
    migration = AsyncMigrationStatus.objects.create(
        name=migration_name,
        status=AsyncMigrationStatus.STATUS_STARTED,
    )
    logger.debug(f'Start async migration {migration_name}')
    
    # Check database backend and use appropriate SQL
    if connection.vendor == 'postgresql':
        # PostgreSQL: Use CONCURRENTLY and specific index types (BRIN, GIN, etc.)
        sql = '''
        CREATE INDEX CONCURRENTLY IF NOT EXISTS "task_updated_at_brin_idx" 
        ON "task" USING BRIN ("updated_at");
        '''
    else:
        # SQLite/Other: Fallback to standard B-tree index
        sql = '''
        CREATE INDEX IF NOT EXISTS "task_updated_at_brin_idx" 
        ON "task" ("updated_at");
        '''
    
    with connection.cursor() as cursor:
        cursor.execute(sql)
    
    migration.status = AsyncMigrationStatus.STATUS_FINISHED
    migration.save()
    logger.debug(f'Async migration {migration_name} complete')
 
# Reverse DDL
def reverse_migration(migration_name):
    migration = AsyncMigrationStatus.objects.create(
        name=migration_name,
        status=AsyncMigrationStatus.STATUS_STARTED,
    )
    logger.debug(f'Start async migration rollback {migration_name}')
    
    # Drop index (handle database differences)
    if connection.vendor == 'postgresql':
        sql = 'DROP INDEX CONCURRENTLY IF EXISTS "task_updated_at_brin_idx";'
    else:
        sql = 'DROP INDEX IF EXISTS "task_updated_at_brin_idx";'
    
    with connection.cursor() as cursor:
        cursor.execute(sql)
    
    migration.status = AsyncMigrationStatus.STATUS_FINISHED
    migration.save()
    logger.debug(f'Async migration rollback {migration_name} complete')
 
# Hook into Django migration
def forwards(apps, schema_editor):
    start_job_async_or_sync(forward_migration, migration_name=migration_name)
 
def backwards(apps, schema_editor):
    start_job_async_or_sync(reverse_migration, migration_name=migration_name)
 
class Migration(migrations.Migration):
    atomic = False
    dependencies = [
        ('tasks', '0053_annotation_bulk_created'),
    ]
    operations = [
        migrations.RunPython(forwards, backwards),
    ]
```  
 
## Other Important Points
- Label Studio uses two databases: SQLite and Postgres. All migrations should be designed for both of 
- Always use `CREATE INDEX CONCURRENTLY` and `DROP INDEX CONCURRENTLY` for non-blocking index operations in Postgres
- Check `connection.vendor` to handle database differences (PostgreSQL vs SQLite/others)
- SQLite doesn't support CONCURRENTLY, BRIN, GIN, or other PostgreSQL-specific features
- Ensure `atomic = False` so the concurrent DDL can run outside a transaction
- Monitor and retry async jobs on failure; ensure your worker pool is healthy
- Test async migrations in a staging environment with realistic data volumes
- Clean up completed migrations and maintain migration history for on-prem rollouts