from unittest.mock import patch

from core.migration_helpers import execute_sql_job
from core.models import AsyncMigrationStatus
from django.contrib import admin as dj_admin
from django.test import RequestFactory, TestCase
from users.admin import AsyncMigrationStatusAdmin

# Dotted path that every test below replaces with a mock.
# NOTE(review): the patch only intercepts the dispatch if the admin action
# looks this name up on ``core.redis`` at call time -- confirm against
# ``users.admin``.
_START_JOB_TARGET = 'core.redis.start_job_async_or_sync'


class TestAdminRunScheduledMigrations(TestCase):
    """Exercise ``AsyncMigrationStatusAdmin.run_scheduled_migrations``.

    Each test creates one ``AsyncMigrationStatus`` row in the scheduled
    state, runs the admin action on a queryset holding just that row, and
    checks the stored status together with the calls made to the mocked
    job starter.
    """

    def setUp(self):
        """Build a GET request and an admin instance with messaging muted."""

        def _discard_message(*args, **kwargs):
            return None

        self.factory = RequestFactory()
        self.request = self.factory.get('/')
        self.admin = AsyncMigrationStatusAdmin(AsyncMigrationStatus, dj_admin.site)
        # Avoid Django messages in tests
        self.admin.message_user = _discard_message

    def _run_action_for(self, migration_name):
        """Create a scheduled record, run the action on it, and reload it.

        Args:
            migration_name: Value stored in the record's ``name`` field.

        Returns:
            The ``AsyncMigrationStatus`` instance, refreshed from the
            database after the admin action has run.
        """
        migration = AsyncMigrationStatus.objects.create(
            name=migration_name,
            status=AsyncMigrationStatus.STATUS_SCHEDULED,
        )
        selection = AsyncMigrationStatus.objects.filter(pk=migration.pk)
        self.admin.run_scheduled_migrations(self.request, selection)
        migration.refresh_from_db()
        return migration

    @patch(_START_JOB_TARGET)
    def test_success_dispatches_job_and_updates_status(self, mock_start):
        """A full dotted migration name ends up started with a job dispatched."""
        migration = self._run_action_for(
            'label_studio.tasks.migrations.0059_task_completion_id_updated_at_idx_async'
        )

        assert migration.status == AsyncMigrationStatus.STATUS_STARTED
        mock_start.assert_called()

        call_positional, call_keywords = mock_start.call_args
        # First positional arg is the job function
        assert call_positional
        assert call_positional[0] is execute_sql_job
        # Ensure correct kwargs passed to the job
        assert call_keywords.get('migration_name') == migration.name
        assert isinstance(call_keywords.get('sql'), str)
        assert call_keywords.get('reverse') is False

    @patch(_START_JOB_TARGET)
    def test_invalid_path_marks_error(self, mock_start):
        """A name without a dotted module path is stored as an error."""
        migration = self._run_action_for('0059_task_completion_id_updated_at_idx_async')

        assert migration.status == AsyncMigrationStatus.STATUS_ERROR
        error_text = (migration.meta or {}).get('error', '')
        assert 'import path' in error_text
        mock_start.assert_not_called()

    @patch(_START_JOB_TARGET)
    def test_import_error_marks_error(self, mock_start):
        """A dotted name pointing at a nonexistent module is stored as an error."""
        migration = self._run_action_for(
            'label_studio.nonexistentapp.migrations.nonexistent_migration'
        )

        assert migration.status == AsyncMigrationStatus.STATUS_ERROR
        error_text = (migration.meta or {}).get('error', '')
        assert 'Import error' in error_text
        mock_start.assert_not_called()