import logging import time from django.conf import settings from django.core.management.commands.migrate import Command as MigrateCommand from django.db import connections, transaction logger = logging.getLogger(__name__) DEFAULT_LOCK_ID = getattr(settings, 'MIGRATE_LOCK_ID', 1000) LOCKED_MIGRATE_CMD_CONNECTION_ALIAS = 'locked_migrate_cmd_connection' RETRY_INTERVAL = 5 # Time to wait between retries in seconds MAX_WAIT_TIME = 300 # Maximum time to wait for the lock in seconds (5 minutes) class Command(MigrateCommand): help = 'Run Django migrations safely, using a lock' def add_arguments(self, parser): MigrateCommand.add_arguments(self, parser) parser.add_argument( '--migrate-lock-id', default=DEFAULT_LOCK_ID, type=int, help='The id of the advisory lock to use', ) def handle(self, *args, **options): lock_id = options.pop('migrate_lock_id') # Create a separate database connection to hold the lock separate_lock_connection = connections.create_connection('default') connections[LOCKED_MIGRATE_CMD_CONNECTION_ALIAS] = separate_lock_connection try: # Use a transaction to hold the lock for the duration of the migration with transaction.atomic(using=LOCKED_MIGRATE_CMD_CONNECTION_ALIAS): # Attempt to acquire the lock with retries self.acquire_lock_with_retry(separate_lock_connection, lock_id) # Run the standard Django migration once lock is acquired super().handle(*args, **options) logger.info('Migration complete, the migration lock has now been released.') finally: # Ensure the lock connection is closed to free resources separate_lock_connection.close() def acquire_lock_with_retry(self, lock_connection, lock_id): start_time = time.time() while True: with lock_connection.cursor() as cursor: logger.info(f'Attempting to acquire the postgres advisory transaction lock with id: {lock_id}.') # Attempt to acquire the transaction-level lock without blocking cursor.execute(f'SELECT pg_try_advisory_xact_lock({lock_id})') lock_acquired = cursor.fetchone()[0] if lock_acquired: logger.info('Acquired the transaction lock, proceeding with migration.') return # Exit the function if the lock is acquired # Check if the maximum wait time has been reached elapsed_time = time.time() - start_time if elapsed_time >= MAX_WAIT_TIME: logger.info('Could not acquire the transaction lock within the timeout period.') raise TimeoutError('Failed to acquire PostgreSQL advisory transaction lock within 5 minutes.') # Wait before retrying logger.info(f'Lock not acquired. Retrying in {RETRY_INTERVAL} seconds...') time.sleep(RETRY_INTERVAL)