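"""Tests for streaming task import in data_import.

Covers the streaming JSON reader on FileUpload, end-to-end batch aggregation
across multiple uploads, load_tasks_for_async_import_streaming for the
file-upload, URL, and inline-tasks sources, and the
_async_import_background_streaming background job.
"""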
from unittest.mock import patch
 
import pytest
from data_import.functions import _async_import_background_streaming
from data_import.models import FileUpload
from data_import.uploader import load_tasks_for_async_import_streaming
from django.core.files.base import ContentFile
from organizations.tests.factories import OrganizationFactory
from projects.models import ProjectImport
from projects.tests.factories import ProjectFactory
from rest_framework.exceptions import ValidationError
from users.tests.factories import UserFactory
 
pytestmark = pytest.mark.django_db
 
 
@pytest.fixture
def user():
    return UserFactory()
 
 
@pytest.fixture
def organization():
    return OrganizationFactory()
 
 
@pytest.fixture
def project(user, organization):
    # Simple config with a single text field
    return ProjectFactory(
        created_by=user,
        organization=organization,
        label_config='<View><Text name="text" value="$text"/></View>',
    )
 
 
def create_file_upload(user, project, body: bytes, name: str):
    """Create a FileUpload backed by an in-memory file with the given bytes."""
    return FileUpload.objects.create(user=user, project=project, file=ContentFile(body, name=name))
 
 
class TestJSONStreamingReader:
    """read_tasks_list_from_json_streaming yields batches (lists) of task dicts.

    Top-level JSON objects without a 'data' key are wrapped as {'data': obj};
    objects that already carry a 'data' key are passed through unchanged.
    """

    def test_array_of_objects_wraps_data(self, user, project):
        content = b'[{"text":"A"},{"text":"B"}]'
        fu = create_file_upload(user, project, content, 'tasks.json')
 
        batches = list(fu.read_tasks_list_from_json_streaming(batch_size=1))
        # Flatten batches into a single task list
        tasks = [t for batch in batches for t in batch]
 
        assert len(tasks) == 2
        assert tasks[0]['data'] == {'text': 'A'}
        assert tasks[1]['data'] == {'text': 'B'}
 
    def test_array_of_objects_with_data_preserved(self, user, project):
        content = b'[{"data":{"text":"A"}},{"data":{"text":"B"}}]'
        fu = create_file_upload(user, project, content, 'tasks.json')
 
        batches = list(fu.read_tasks_list_from_json_streaming(batch_size=2))
        tasks = [t for batch in batches for t in batch]
 
        assert len(tasks) == 2
        assert tasks[0]['data'] == {'text': 'A'}
        assert tasks[1]['data'] == {'text': 'B'}
 
    def test_single_object(self, user, project):
        content = b'{"text":"A"}'
        fu = create_file_upload(user, project, content, 'task.json')
 
        batches = list(fu.read_tasks_list_from_json_streaming(batch_size=10))
        assert len(batches) == 1
        assert len(batches[0]) == 1
        assert batches[0][0]['data'] == {'text': 'A'}
 
    def test_invalid_array_of_strings_raises(self, user, project):
        content = b'["A","B"]'
        fu = create_file_upload(user, project, content, 'tasks.json')
 
        with pytest.raises(ValidationError):
            list(fu.read_tasks_list_from_json_streaming(batch_size=2))
 
    def test_invalid_top_level_type_raises(self, user, project):
        content = b'"A"'
        fu = create_file_upload(user, project, content, 'tasks.json')
 
        with pytest.raises(ValidationError):
            list(fu.read_tasks_list_from_json_streaming(batch_size=2))
 
    def test_read_tasks_streaming_batches(self, user, project):
        items = ','.join([f'{{"text":"T{i}"}}' for i in range(7)]).encode('utf-8')
        content = b'[' + items + b']'
        fu = create_file_upload(user, project, content, 'tasks.json')
 
        batches = list(fu.read_tasks_streaming(batch_size=3))
        # 7 tasks with batch_size=3 -> batches of sizes 3, 3, 1
        sizes = [len(b) for b in batches]
        assert sizes == [3, 3, 1]
        # Spot check the first element
        assert batches[0][0]['data'] == {'text': 'T0'}
 
 
class TestEndToEndStreamingFromUploads:
    """FileUpload.load_tasks_from_uploaded_files_streaming yields 3-tuples whose
    first element is the batch's task list, aggregated across all of the
    project's uploads.
    """

    def test_load_tasks_from_uploaded_files_streaming_real_files(self, user, project):
        content1 = b'[{"text":"A1"},{"text":"A2"},{"text":"A3"},{"text":"A4"}]'
        content2 = b'[{"text":"B1"},{"text":"B2"},{"text":"B3"},{"text":"B4"}]'
        create_file_upload(user, project, content1, 'a.json')
        create_file_upload(user, project, content2, 'b.json')
 
        gen = FileUpload.load_tasks_from_uploaded_files_streaming(project, batch_size=3)
        batches = list(gen)
 
        # 8 tasks across two files with batch_size=3 -> batches of sizes 3, 3, 2
        sizes = [len(b[0]) for b in batches]
        assert sizes == [3, 3, 2]
 
        # All tasks must have file_upload_id set
        for batch_tasks, _, _ in batches:
            for t in batch_tasks:
                assert 'file_upload_id' in t
                assert t['data'] and isinstance(t['data'], dict)
 
 
class TestLoadTasksForAsyncImportStreaming:
    """load_tasks_for_async_import_streaming yields
    (tasks, file_upload_ids, found_formats, data_columns) batches for the
    file-upload, URL, and inline-tasks import sources.
    """

    def test_from_file_upload_ids_batches_and_metadata(self, user, project, settings):
        settings.IMPORT_BATCH_SIZE = 3
        fu1 = create_file_upload(user, project, b'[{"text":"A1"},{"text":"A2"},{"text":"A3"}]', 'a.json')
        fu2 = create_file_upload(user, project, b'[{"text":"B1"},{"text":"B2"},{"text":"B3"},{"text":"B4"}]', 'b.json')
 
        pimport = ProjectImport.objects.create(project=project, file_upload_ids=[fu1.id, fu2.id])
 
        gen = load_tasks_for_async_import_streaming(pimport, user, batch_size=3)
        batches = list(gen)
 
        # 7 tasks across two files with batch_size=3 -> batches of sizes 3, 3, 1
        sizes = [len(b[0]) for b in batches]
        assert sizes == [3, 3, 1]
 
        # Every batch carries the complete file_upload_ids list plus format/column metadata
        for batch_tasks, file_upload_ids, found_formats, data_columns in batches:
            assert file_upload_ids == [fu1.id, fu2.id]
            assert isinstance(found_formats, dict)
            assert isinstance(data_columns, list)
 
    def test_from_url_with_json_string(self, user, project, settings):
        settings.IMPORT_BATCH_SIZE = 10
        # Use JSON-in-URL mode (no network): the function creates an in-place file upload and parses it.
        url_json = '[{"text":"U1"},{"text":"U2"}]'
        pimport = ProjectImport.objects.create(project=project, url=url_json)
 
        gen = load_tasks_for_async_import_streaming(pimport, user, batch_size=2)
        batches = list(gen)
 
        assert len(batches) == 1
        tasks, file_upload_ids, found_formats, data_columns = batches[0]
        assert [t['data'] for t in tasks] == [{'text': 'U1'}, {'text': 'U2'}]
        assert isinstance(file_upload_ids, list)
        assert isinstance(found_formats, dict)
        assert isinstance(data_columns, list)
 
    def test_from_tasks_inline(self, user, project, settings):
        settings.IMPORT_BATCH_SIZE = 2
        tasks = [{'data': {'text': 'T1'}}, {'data': {'text': 'T2'}}, {'data': {'text': 'T3'}}]
        pimport = ProjectImport.objects.create(project=project, tasks=tasks)
 
        gen = load_tasks_for_async_import_streaming(pimport, user, batch_size=2)
        batches = list(gen)
 
        sizes = [len(b[0]) for b in batches]
        assert sizes == [2, 1]
 
 
class TestAsyncImportBackgroundStreaming:
    """_async_import_background_streaming runs the whole import and persists
    status, task_count, found_formats, and data_columns on the ProjectImport.
    """

    @patch('data_import.functions.flag_set', return_value=False)
    def test_counts_and_status_without_commit(self, mock_flag, user, project, settings):
        settings.IMPORT_BATCH_SIZE = 4
        fu1 = create_file_upload(user, project, b'[{"text":"A1"},{"text":"A2"}]', 'a.json')
        fu2 = create_file_upload(user, project, b'[{"text":"B1"},{"text":"B2"},{"text":"B3"}]', 'b.json')
 
        pimport = ProjectImport.objects.create(
            project=project,
            file_upload_ids=[fu1.id, fu2.id],
            commit_to_project=False,
            return_task_ids=False,
        )
 
        _async_import_background_streaming(pimport, user)
 
        pimport.refresh_from_db()
        assert pimport.status == ProjectImport.Status.COMPLETED
        assert pimport.task_count == 5
        # found_formats and data_columns are populated
        assert isinstance(pimport.found_formats, dict)
        assert pimport.found_formats.get('.json')
        assert isinstance(pimport.data_columns, (list, set))
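
# For reference, a minimal sketch (not executed as a test) of how a caller
# might consume the streaming loader; the 4-tuple batch shape is inferred
# from the assertions in TestLoadTasksForAsyncImportStreaming above:
#
#   for tasks, file_upload_ids, found_formats, data_columns in (
#       load_tasks_for_async_import_streaming(pimport, user, batch_size=100)
#   ):
#       ...  # persist each batch incrementally instead of buffering all tasks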