from unittest.mock import MagicMock, patch
|
|
import pytest
|
from core.migration_helpers import execute_sql_job, make_sql_migration
|
from core.models import AsyncMigrationStatus
|
from django.test import TestCase, override_settings
|
|
|
class TestExecuteSqlJob(TestCase):
|
"""Test execute_sql_job function."""
|
|
def setUp(self):
|
self.migration_name = 'test.migrations.test_migration'
|
self.sql = 'CREATE INDEX test_idx ON test_table (col1);'
|
|
@patch('core.migration_helpers.connection')
|
def test_creates_migration_status_record(self, mock_connection):
|
"""Test that a new AsyncMigrationStatus record is created."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'postgresql'
|
|
execute_sql_job(migration_name=self.migration_name, sql=self.sql)
|
|
migration = AsyncMigrationStatus.objects.get(name=self.migration_name)
|
assert migration.status == AsyncMigrationStatus.STATUS_FINISHED
|
mock_cursor.execute.assert_called_once_with(self.sql)
|
|
@patch('core.migration_helpers.connection')
|
def test_skips_if_already_finished(self, mock_connection):
|
"""Test that migration is skipped if already finished."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'postgresql'
|
|
# Create finished migration
|
AsyncMigrationStatus.objects.create(
|
name=self.migration_name,
|
status=AsyncMigrationStatus.STATUS_FINISHED,
|
)
|
|
execute_sql_job(migration_name=self.migration_name, sql=self.sql)
|
|
# SQL should not be executed
|
mock_cursor.execute.assert_not_called()
|
|
@patch('core.migration_helpers.connection')
|
def test_updates_scheduled_to_started(self, mock_connection):
|
"""Test that SCHEDULED status is updated to STARTED before execution."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'postgresql'
|
|
# Create scheduled migration
|
migration = AsyncMigrationStatus.objects.create(
|
name=self.migration_name,
|
status=AsyncMigrationStatus.STATUS_SCHEDULED,
|
)
|
|
execute_sql_job(migration_name=self.migration_name, sql=self.sql)
|
|
migration.refresh_from_db()
|
assert migration.status == AsyncMigrationStatus.STATUS_FINISHED
|
mock_cursor.execute.assert_called_once_with(self.sql)
|
|
@patch('core.migration_helpers.connection')
|
def test_skips_sqlite_when_requested(self, mock_connection):
|
"""Test that SQLite is skipped when apply_on_sqlite=False."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'sqlite'
|
|
execute_sql_job(
|
migration_name=self.migration_name,
|
sql=self.sql,
|
apply_on_sqlite=False,
|
)
|
|
migration = AsyncMigrationStatus.objects.get(name=self.migration_name)
|
assert migration.status == AsyncMigrationStatus.STATUS_FINISHED
|
# SQL should not be executed on SQLite
|
mock_cursor.execute.assert_not_called()
|
|
@patch('core.migration_helpers.connection')
|
def test_executes_on_sqlite_when_requested(self, mock_connection):
|
"""Test that SQLite execution works when apply_on_sqlite=True."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'sqlite'
|
|
execute_sql_job(
|
migration_name=self.migration_name,
|
sql=self.sql,
|
apply_on_sqlite=True,
|
)
|
|
migration = AsyncMigrationStatus.objects.get(name=self.migration_name)
|
assert migration.status == AsyncMigrationStatus.STATUS_FINISHED
|
mock_cursor.execute.assert_called_once_with(self.sql)
|
|
@patch('core.migration_helpers.connection')
|
def test_marks_error_on_exception(self, mock_connection):
|
"""Test that exceptions are caught and migration is marked as ERROR."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'postgresql'
|
mock_cursor.execute.side_effect = Exception('Test error')
|
|
with pytest.raises(Exception, match='Test error'):
|
execute_sql_job(migration_name=self.migration_name, sql=self.sql)
|
|
migration = AsyncMigrationStatus.objects.get(name=self.migration_name)
|
assert migration.status == AsyncMigrationStatus.STATUS_ERROR
|
assert migration.meta['error'] == 'Test error'
|
|
@patch('core.migration_helpers.connection')
|
def test_reverse_does_not_create_status(self, mock_connection):
|
"""Test that reverse migrations don't create/update AsyncMigrationStatus."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'postgresql'
|
|
execute_sql_job(
|
migration_name=self.migration_name,
|
sql=self.sql,
|
reverse=True,
|
)
|
|
# No status should be created
|
assert not AsyncMigrationStatus.objects.filter(name=self.migration_name).exists()
|
mock_cursor.execute.assert_called_once_with(self.sql)
|
|
@patch('core.migration_helpers.connection')
|
def test_reverse_skips_sqlite_when_requested(self, mock_connection):
|
"""Test that reverse migrations skip SQLite when apply_on_sqlite=False."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'sqlite'
|
|
execute_sql_job(
|
migration_name=self.migration_name,
|
sql=self.sql,
|
apply_on_sqlite=False,
|
reverse=True,
|
)
|
|
mock_cursor.execute.assert_not_called()
|
|
@patch('core.migration_helpers.connection')
|
def test_reverse_raises_on_exception(self, mock_connection):
|
"""Test that reverse migrations raise exceptions properly."""
|
mock_cursor = MagicMock()
|
mock_connection.cursor.return_value.__enter__.return_value = mock_cursor
|
mock_connection.vendor = 'postgresql'
|
mock_cursor.execute.side_effect = Exception('Test reverse error')
|
|
with pytest.raises(Exception, match='Test reverse error'):
|
execute_sql_job(
|
migration_name=self.migration_name,
|
sql=self.sql,
|
reverse=True,
|
)
|
|
|
class TestMakeSqlMigration(TestCase):
|
"""Test make_sql_migration function."""
|
|
def setUp(self):
|
self.sql_forwards = 'CREATE INDEX test_idx ON test_table (col1);'
|
self.sql_backwards = 'DROP INDEX test_idx;'
|
self.migration_name = 'test.migrations.test_migration'
|
|
def test_requires_migration_name(self):
|
"""Test that migration_name is required."""
|
with pytest.raises(ValueError, match='explicit migration_name'):
|
make_sql_migration(
|
self.sql_forwards,
|
self.sql_backwards,
|
)
|
|
@override_settings(ALLOW_SCHEDULED_MIGRATIONS=False)
|
@patch('core.migration_helpers.start_job_async_or_sync')
|
def test_executes_immediately_when_scheduled_disabled(self, mock_start):
|
"""Test that migration executes immediately when ALLOW_SCHEDULED_MIGRATIONS=False."""
|
forwards, backwards = make_sql_migration(
|
self.sql_forwards,
|
self.sql_backwards,
|
migration_name=self.migration_name,
|
)
|
|
apps = MagicMock()
|
schema_editor = MagicMock()
|
schema_editor.connection.vendor = 'postgresql'
|
|
forwards(apps, schema_editor)
|
|
mock_start.assert_called_once()
|
args, kwargs = mock_start.call_args
|
assert kwargs['migration_name'] == self.migration_name
|
assert kwargs['sql'] == self.sql_forwards
|
assert kwargs['reverse'] is False
|
|
@override_settings(ALLOW_SCHEDULED_MIGRATIONS=True)
|
def test_creates_scheduled_status_when_enabled(self):
|
"""Test that SCHEDULED status is created when ALLOW_SCHEDULED_MIGRATIONS=True."""
|
forwards, backwards = make_sql_migration(
|
self.sql_forwards,
|
self.sql_backwards,
|
migration_name=self.migration_name,
|
execute_immediately=False,
|
)
|
|
apps = MagicMock()
|
apps.get_model.return_value = AsyncMigrationStatus
|
schema_editor = MagicMock()
|
schema_editor.connection.vendor = 'postgresql'
|
|
forwards(apps, schema_editor)
|
|
migration = AsyncMigrationStatus.objects.get(name=self.migration_name)
|
assert migration.status == AsyncMigrationStatus.STATUS_SCHEDULED
|
|
@override_settings(ALLOW_SCHEDULED_MIGRATIONS=True)
|
@patch('core.migration_helpers.start_job_async_or_sync')
|
def test_executes_immediately_when_forced(self, mock_start):
|
"""Test that migration executes immediately when execute_immediately=True."""
|
forwards, backwards = make_sql_migration(
|
self.sql_forwards,
|
self.sql_backwards,
|
migration_name=self.migration_name,
|
execute_immediately=True,
|
)
|
|
apps = MagicMock()
|
schema_editor = MagicMock()
|
schema_editor.connection.vendor = 'postgresql'
|
|
forwards(apps, schema_editor)
|
|
mock_start.assert_called_once()
|
args, kwargs = mock_start.call_args
|
assert kwargs['migration_name'] == self.migration_name
|
assert kwargs['sql'] == self.sql_forwards
|
|
def test_skips_sqlite_when_requested(self):
|
"""Test that SQLite is skipped when apply_on_sqlite=False."""
|
forwards, backwards = make_sql_migration(
|
self.sql_forwards,
|
self.sql_backwards,
|
migration_name=self.migration_name,
|
apply_on_sqlite=False,
|
)
|
|
apps = MagicMock()
|
schema_editor = MagicMock()
|
schema_editor.connection.vendor = 'sqlite'
|
|
# Should return early without creating status
|
forwards(apps, schema_editor)
|
|
assert not AsyncMigrationStatus.objects.filter(name=self.migration_name).exists()
|
|
@patch('core.migration_helpers.start_job_async_or_sync')
|
def test_backwards_always_executes(self, mock_start):
|
"""Test that backwards migration always executes immediately."""
|
forwards, backwards = make_sql_migration(
|
self.sql_forwards,
|
self.sql_backwards,
|
migration_name=self.migration_name,
|
)
|
|
apps = MagicMock()
|
schema_editor = MagicMock()
|
schema_editor.connection.vendor = 'postgresql'
|
|
backwards(apps, schema_editor)
|
|
mock_start.assert_called_once()
|
args, kwargs = mock_start.call_args
|
assert kwargs['migration_name'] == self.migration_name
|
assert kwargs['sql'] == self.sql_backwards
|
assert kwargs['reverse'] is True
|
|
@patch('core.migration_helpers.start_job_async_or_sync')
|
def test_passes_apply_on_sqlite_parameter(self, mock_start):
|
"""Test that apply_on_sqlite parameter is passed to execute_sql_job."""
|
forwards, backwards = make_sql_migration(
|
self.sql_forwards,
|
self.sql_backwards,
|
migration_name=self.migration_name,
|
apply_on_sqlite=True,
|
execute_immediately=True,
|
)
|
|
apps = MagicMock()
|
schema_editor = MagicMock()
|
schema_editor.connection.vendor = 'postgresql'
|
|
forwards(apps, schema_editor)
|
|
args, kwargs = mock_start.call_args
|
assert kwargs['apply_on_sqlite'] is True
|