Bin
2025-12-16 9e0b2ba2c317b1a86212f24cbae3195ad1f3dbfa
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging
import time
 
from django.conf import settings
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import connections, transaction
 
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Default advisory-lock id for the ``--migrate-lock-id`` option; read from the
# ``MIGRATE_LOCK_ID`` setting when it is defined, otherwise 1000.
DEFAULT_LOCK_ID = getattr(settings, 'MIGRATE_LOCK_ID', 1000)

# Alias under which the extra lock-holding connection is registered in
# ``django.db.connections`` so ``transaction.atomic(using=...)`` can find it.
LOCKED_MIGRATE_CMD_CONNECTION_ALIAS = 'locked_migrate_cmd_connection'
RETRY_INTERVAL = 5  # Time to wait between retries in seconds
MAX_WAIT_TIME = 300  # Maximum time to wait for the lock in seconds (5 minutes)
 
 
class Command(MigrateCommand):
    """Run Django's ``migrate`` command while holding a PostgreSQL advisory lock.

    A second connection to the ``default`` database is opened purely to hold a
    transaction-level advisory lock. The stock ``migrate`` command then runs
    inside that transaction's ``atomic`` block, so concurrent invocations using
    the same lock id wait for each other instead of migrating simultaneously.
    """

    help = 'Run Django migrations safely, using a lock'

    def add_arguments(self, parser):
        """Register the standard ``migrate`` arguments plus ``--migrate-lock-id``."""
        super().add_arguments(parser)
        parser.add_argument(
            '--migrate-lock-id',
            default=DEFAULT_LOCK_ID,
            type=int,
            help='The id of the advisory lock to use',
        )

    def handle(self, *args, **options):
        """Acquire the advisory lock, run the migrations, then release the lock.

        Raises:
            TimeoutError: if the lock cannot be acquired in time (raised by
                ``acquire_lock_with_retry``).
        """
        # Remove our own option before delegating to the parent command. Fall
        # back to the default so a direct ``handle()`` call without the option
        # does not fail with ``KeyError``.
        lock_id = options.pop('migrate_lock_id', DEFAULT_LOCK_ID)

        # Create a separate database connection to hold the lock, and register
        # it under its own alias so ``transaction.atomic`` can address it.
        separate_lock_connection = connections.create_connection('default')
        connections[LOCKED_MIGRATE_CMD_CONNECTION_ALIAS] = separate_lock_connection
        try:
            # The lock is transaction-scoped: it is held until this atomic
            # block exits (commit or rollback).
            with transaction.atomic(using=LOCKED_MIGRATE_CMD_CONNECTION_ALIAS):
                # Attempt to acquire the lock with retries
                self.acquire_lock_with_retry(separate_lock_connection, lock_id)
                # Run the standard Django migration once lock is acquired
                super().handle(*args, **options)
            logger.info('Migration complete, the migration lock has now been released.')
        finally:
            try:
                # Ensure the lock connection is closed to free resources
                separate_lock_connection.close()
            finally:
                # Do not leave a closed connection registered under the
                # temporary alias once the command has finished.
                del connections[LOCKED_MIGRATE_CMD_CONNECTION_ALIAS]

    def acquire_lock_with_retry(self, lock_connection, lock_id):
        """Poll for the advisory transaction lock until acquired or timed out.

        Must be called inside an open transaction on ``lock_connection``,
        because ``pg_try_advisory_xact_lock`` ties the lock to the current
        transaction.

        Args:
            lock_connection: database connection on which the lock is taken.
            lock_id: advisory lock key passed to PostgreSQL.

        Raises:
            TimeoutError: if a failed attempt happens once ``MAX_WAIT_TIME``
                seconds or more have elapsed since the first attempt.
        """
        start_time = time.time()

        while True:
            logger.info(
                'Attempting to acquire the postgres advisory transaction lock with id: %s.',
                lock_id,
            )

            # Non-blocking attempt; the cursor is closed before any sleep so
            # it is not held open between retries. The lock id is passed as a
            # query parameter rather than interpolated into the SQL text.
            with lock_connection.cursor() as cursor:
                cursor.execute('SELECT pg_try_advisory_xact_lock(%s)', [lock_id])
                lock_acquired = cursor.fetchone()[0]

            if lock_acquired:
                logger.info('Acquired the transaction lock, proceeding with migration.')
                return

            # Check if the maximum wait time has been reached
            elapsed_time = time.time() - start_time
            if elapsed_time >= MAX_WAIT_TIME:
                logger.info('Could not acquire the transaction lock within the timeout period.')
                # NOTE: the "5 minutes" wording mirrors MAX_WAIT_TIME (300 s);
                # keep the two in sync if the constant changes.
                raise TimeoutError('Failed to acquire PostgreSQL advisory transaction lock within 5 minutes.')

            # Wait before retrying
            logger.info('Lock not acquired. Retrying in %s seconds...', RETRY_INTERVAL)
            time.sleep(RETRY_INTERVAL)