Bin
2025-12-17 611bfe34c3c96199eaaf6cf9e41a75892e44e879
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from unittest.mock import patch
 
from core.migration_helpers import execute_sql_job
from core.models import AsyncMigrationStatus
from django.contrib import admin as dj_admin
from django.test import RequestFactory, TestCase
from users.admin import AsyncMigrationStatusAdmin
 
 
class TestAdminRunScheduledMigrations(TestCase):
    def setUp(self):
        self.factory = RequestFactory()
        self.request = self.factory.get('/')
        # Avoid Django messages in tests
        self.admin = AsyncMigrationStatusAdmin(AsyncMigrationStatus, dj_admin.site)
        self.admin.message_user = lambda *args, **kwargs: None
 
    @patch('core.redis.start_job_async_or_sync')
    def test_success_dispatches_job_and_updates_status(self, mock_start):
        m = AsyncMigrationStatus.objects.create(
            name='label_studio.tasks.migrations.0059_task_completion_id_updated_at_idx_async',
            status=AsyncMigrationStatus.STATUS_SCHEDULED,
        )
 
        qs = AsyncMigrationStatus.objects.filter(pk=m.pk)
        self.admin.run_scheduled_migrations(self.request, qs)
 
        m.refresh_from_db()
        assert m.status == AsyncMigrationStatus.STATUS_STARTED
        assert mock_start.called
        args, kwargs = mock_start.call_args
        # First positional arg is the job function
        assert args and args[0] is execute_sql_job
        # Ensure correct kwargs passed to the job
        assert kwargs.get('migration_name') == m.name
        assert isinstance(kwargs.get('sql'), str)
        assert kwargs.get('reverse') is False
 
    @patch('core.redis.start_job_async_or_sync')
    def test_invalid_path_marks_error(self, mock_start):
        m = AsyncMigrationStatus.objects.create(
            name='0059_task_completion_id_updated_at_idx_async',
            status=AsyncMigrationStatus.STATUS_SCHEDULED,
        )
 
        qs = AsyncMigrationStatus.objects.filter(pk=m.pk)
        self.admin.run_scheduled_migrations(self.request, qs)
 
        m.refresh_from_db()
        assert m.status == AsyncMigrationStatus.STATUS_ERROR
        assert 'import path' in (m.meta or {}).get('error', '')
        mock_start.assert_not_called()
 
    @patch('core.redis.start_job_async_or_sync')
    def test_import_error_marks_error(self, mock_start):
        m = AsyncMigrationStatus.objects.create(
            name='label_studio.nonexistentapp.migrations.nonexistent_migration',
            status=AsyncMigrationStatus.STATUS_SCHEDULED,
        )
 
        qs = AsyncMigrationStatus.objects.filter(pk=m.pk)
        self.admin.run_scheduled_migrations(self.request, qs)
 
        m.refresh_from_db()
        assert m.status == AsyncMigrationStatus.STATUS_ERROR
        assert 'Import error' in (m.meta or {}).get('error', '')
        mock_start.assert_not_called()