Bin
2025-12-16 7423b0c6e1959f30a7e8e453e953310f32ce13c6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import copy
import io
import zipfile
 
import pytest
import requests_mock
import ujson as json
from projects.models import Project
from rest_framework.authtoken.models import Token
from tasks.models import Annotation, Prediction, Task
 
 
def post_data_as_format(setup, format_type, body, archive, multiply_files):
    # post as data
    if format_type == 'json_data':
        return setup.post(setup.urls.task_bulk, data=body, content_type='application/json')
 
    # post as files
    if format_type == 'json_file':
        files = {f'upload_file{i}.json': io.StringIO(body) for i in range(0, multiply_files)}
    elif format_type == 'csv_file':
        files = {f'upload_file{i}.csv': io.StringIO(body) for i in range(0, multiply_files)}
    elif format_type == 'tsv_file':
        files = {f'upload_file{i}.tsv': io.StringIO(body) for i in range(0, multiply_files)}
    elif format_type == 'txt_file':
        files = {f'upload_file{i}.txt': io.StringIO(body) for i in range(0, multiply_files)}
    else:
        raise Exception('Incorrect task data format to post')
 
    # zip: take files below and zip them
    if 'zip' in archive:
        file = io.BytesIO()
        ref = zipfile.ZipFile(file, mode='w', compression=zipfile.ZIP_DEFLATED)
        [ref.writestr(name, body.read()) for name, body in files.items()]
 
        ref.close()
        file.seek(0, 0)
        files = {'upload_file.zip': file}
 
        # replicate zip file x2
        if 'zip_x2' == archive:
            files.update({'upload_file2.zip': copy.deepcopy(file)})
 
    return setup.post(setup.urls.task_bulk, files)
 
 
@pytest.mark.parametrize('multiply_files', [1, 5])
@pytest.mark.parametrize('format_type', ['json_file', 'json_data'])
@pytest.mark.parametrize(
    'tasks, status_code, task_count',
    [
        ([{'data': {'dialog': 'some'}}], 201, 1),
        ([{'data': {'dialog': 'some'}}] * 10, 201, 10),
        ([{'data': {'another_field': 'some', 'dialog': 'some'}}], 201, 1),
        ([{'data': {'dialog': 123}, 'created_at': 123}], 201, 1),
        ([{'data': {'another_field': 'some'}}] * 10, 400, 0),
        ([{'data': {}}], 400, 0),
        ([{'data': None}], 400, 0),
        (None, 400, 0),
        ([{'data': 'string'}], 400, 0),
        ([{}, {}], 400, 0),
        ([{}], 400, 0),
        ({}, 400, 0),
        ([], 400, 0),
        ([{'dialog': 'some'}] * 10, 201, 10),
        ({'dialog': 'some'}, 201, 1),
        ([{'dialog': 'some', 'second_field': 123}] * 10, 201, 10),
        ([{'none': 'some', 'second_field': 123}] * 10, 400, 0),
    ],
)
@pytest.mark.django_db
def test_json_task_upload(setup_project_dialog, format_type, tasks, status_code, task_count, multiply_files):
    """Upload JSON as file and data with one task to project.
    Decorator pytest.mark.django_db means it will be clean DB setup_project_dialog for this test.
    """
    if format_type == 'json_data' and multiply_files > 1:
        pytest.skip('Senseless parameter combination')
 
    r = post_data_as_format(setup_project_dialog, format_type, json.dumps(tasks), 'none', multiply_files)
    print(f'Create json {format_type} tasks result:', r.content)
    assert r.status_code == status_code, f'Upload tasks failed. Response data: {r.data}'
    assert Task.objects.filter(project=setup_project_dialog.project.id).count() == task_count * multiply_files
 
 
@pytest.mark.parametrize(
    'tasks, status_code, task_count, annotation_count',
    [
        ([{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}]}]}] * 10, 201, 10, 10),
        (
            [{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}], 'ground_truth': True}]}],
            201,
            1,
            1,
        ),
        ([{'data': {'dialog': 'Test'}, 'annotations': [{'result': '123'}]}], 400, 0, 0),
        ([{'data': {'dialog': 'Test'}, 'meta': 'test'}] * 10, 400, 0, 0),
        ([{'data': {'dialog': 'Test'}, 'annotations': 'test'}] * 10, 400, 0, 0),
        ([{'data': {'dialog': 'Test'}, 'annotations': [{'trash': '123'}]}] * 10, 400, 0, 0),
    ],
)
@pytest.mark.django_db
def test_json_task_annotation_and_meta_upload(setup_project_dialog, tasks, status_code, task_count, annotation_count):
    """Upload JSON task with annotation to project"""
    format_type = 'json_file'
    multiply_files = 1
 
    r = post_data_as_format(setup_project_dialog, format_type, json.dumps(tasks), 'none', multiply_files)
    print('Create json tasks with annotations result:', r.content)
    assert r.status_code == status_code, 'Upload one task with annotation failed'
 
    # tasks
    tasks_db = Task.objects.filter(project=setup_project_dialog.project.id)
    assert tasks_db.count() == task_count * multiply_files
    for task in tasks_db:
        assert task.is_labeled, 'Task should be labeled'
 
    # annotations
    annotations = Annotation.objects.filter(task__project=setup_project_dialog.project.id)
    assert annotations.count() == annotation_count * multiply_files
    for i, annotation in enumerate(annotations):
        assert annotation.ground_truth
 
 
@pytest.mark.parametrize(
    'tasks, status_code, task_count, prediction_count',
    [
        (
            [
                {
                    'data': {'dialog': 'Test'},
                    'predictions': [
                        {
                            'result': [
                                {
                                    'id': '123',
                                    'from_name': 'answer',
                                    'to_name': 'dialog',
                                    'type': 'textarea',
                                    'value': {'text': ['Test prediction']},
                                }
                            ],
                            'model_version': 'test',
                        }
                    ],
                }
            ],
            201,
            1,
            1,
        ),
        ([{'data': {'dialog': 'Test'}, 'predictions': [{'WRONG_FIELD': '123'}]}], 400, 0, 0),
    ],
)
@pytest.mark.django_db
def test_json_task_predictions(setup_project_dialog, tasks, status_code, task_count, prediction_count):
    """Upload JSON task with predictions to project"""
    r = post_data_as_format(setup_project_dialog, 'json_file', json.dumps(tasks), 'none', 1)
    assert r.status_code == status_code, 'Upload one task with prediction failed'
 
    # predictions
    predictions = Prediction.objects.filter(project=setup_project_dialog.project.id)
    assert predictions.count() == prediction_count
    for i, predictions in enumerate(predictions):
        assert predictions.model_version == 'test'
 
 
@pytest.mark.parametrize('multiply_files', [1, 5])
@pytest.mark.parametrize('archive', ['none'])
@pytest.mark.parametrize('format_type', ['json_file'])
@pytest.mark.parametrize(
    'tasks, status_code, task_count, annotation_count',
    [
        (
            [{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}]}, {'result': [{'id': '456'}]}]}]
            * 10,
            201,
            10,
            20,
        ),
        ([{'data': {'dialog': 'Test'}, 'annotations': [{'trash': '123'}]}] * 10, 400, 0, 0),
    ],
)
@pytest.mark.django_db
def test_archives(
    setup_project_dialog, format_type, tasks, status_code, task_count, annotation_count, archive, multiply_files
):
    """Upload JSON task with annotation to project"""
    multiplier = (2 if 'zip_x2' == archive else 1) * multiply_files
 
    r = post_data_as_format(setup_project_dialog, format_type, json.dumps(tasks), archive, multiply_files)
    print('Create json tasks with annotations result:', r.content)
    assert r.status_code == status_code, 'Upload one task with annotation failed'
 
    # tasks
    tasks = Task.objects.filter(project=setup_project_dialog.project.id)
    assert tasks.count() == task_count * multiplier
    for task in tasks:
        assert task.is_labeled, 'Task should be labeled'
 
    # annotations
    annotations = Annotation.objects.filter(task__project=setup_project_dialog.project.id)
    assert annotations.count() == annotation_count * multiplier
    for annotation in annotations:
        assert annotation.ground_truth
 
 
@pytest.mark.parametrize('multiply_files', [1, 5])
@pytest.mark.parametrize('archive', ['none'])
@pytest.mark.parametrize('format_type', ['csv_file', 'tsv_file'])
@pytest.mark.parametrize(
    'tasks, status_code, task_count',
    [
        ('dialog,second\ndialog 1,second 1\ndialog 2,second 2', 201, 2),
        ('dialog,second,class\ndialog 1, second 2, class 1', 201, 1),
        ('here_is_error_in_column_count,second\ndialog 1, second 1, class 1', 400, 0),
        ('empty_rows\n', 400, 0),
        ('', 400, 0),
    ],
)
@pytest.mark.django_db
def test_csv_tsv_task_upload(
    setup_project_dialog, format_type, tasks, status_code, task_count, archive, multiply_files
):
    """Upload CSV/TSV with one task to project"""
    multiplier = (2 if 'zip_x2' == archive else 1) * multiply_files
 
    tasks = tasks if format_type == 'csv_file' else tasks.replace(',', '\t')  # prepare tsv file from csv
    r = post_data_as_format(setup_project_dialog, format_type, tasks, archive, multiply_files)
    print(f'Create {format_type} tasks result:', r.content)
 
    assert r.status_code == status_code, f'Upload one task {format_type} failed. Response data: {r.data}'
    assert Task.objects.filter(project=setup_project_dialog.project.id).count() == task_count * multiplier
 
 
@pytest.mark.parametrize('multiply_files', [1, 5])
@pytest.mark.parametrize('format_type', ['txt_file'])
@pytest.mark.parametrize('tasks, status_code, task_count', [('my text 1\nmy text 2\nmy text 3', 201, 3), ('', 400, 0)])
@pytest.mark.django_db
def test_txt_task_upload(setup_project_dialog, format_type, tasks, status_code, task_count, multiply_files):
    """Upload CSV/TSV with one task to project"""
    multiplier = multiply_files
 
    r = post_data_as_format(setup_project_dialog, format_type, tasks, 'none', multiply_files)
    print(f'Create {format_type} tasks result:', r.content)
 
    assert r.status_code == status_code, f'Upload one task {format_type} failed. Response data: {r.data}'
    assert Task.objects.filter(project=setup_project_dialog.project.id).count() == task_count * multiplier
 
 
@pytest.mark.parametrize(
    'tasks, status_code, task_count, max_duration',
    [([{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}]}]}] * 1000, 201, 1000, 30)],
)
@pytest.mark.django_db
def test_upload_duration(setup_project_dialog, tasks, status_code, task_count, max_duration):
    """Upload JSON task with annotation to project"""
    r = post_data_as_format(setup_project_dialog, 'json_data', json.dumps(tasks), 'none', 1)
    print('Create json tasks with annotations result:', r.content)
    assert r.status_code == status_code, ('Upload one task with annotation failed', r.content)
 
    # tasks
    tasks = Task.objects.filter(project=setup_project_dialog.project.id)
    assert tasks.count() == task_count
    for task in tasks:
        assert task.is_labeled, 'Task should be labeled'
 
    # check max duration
    result = json.loads(r.content)
    assert result['duration'] < max_duration, 'Max duration of adding tasks is exceeded'
 
 
@pytest.mark.parametrize(
    'tasks, status_code, task_count',
    [([{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}]}]}] * 100, 201, 100)],
)
@pytest.mark.django_db
def test_url_upload(mocker, setup_project_dialog, tasks, status_code, task_count):
    """Upload tasks from URL"""
    with requests_mock.Mocker(real_http=True) as m:
        url = 'http://localhost:8111/test.json'
        m.get(url, text=json.dumps(tasks), headers={'Content-Length': '100'})
        r = setup_project_dialog.post(
            setup_project_dialog.urls.task_bulk, data='url=' + url, content_type='application/x-www-form-urlencoded'
        )
        assert r.status_code == status_code, 'Upload URL failed: ' + str(r.content)
 
        # tasks
        tasks = Task.objects.filter(project=setup_project_dialog.project.id)
        assert tasks.count() == task_count
        for task in tasks:
            assert task.is_labeled, 'Task should be labeled since annotation is ground_truth'
 
 
@pytest.mark.parametrize(
    'tasks, status_code, task_count, bad_token',
    [([{'dialog': 'Test'}] * 1, 201, 1, False), ([{'dialog': 'Test'}] * 1, 401, 0, True)],
)
@pytest.mark.django_db
def test_upload_with_token(setup_project_for_token, tasks, status_code, task_count, bad_token):
    """Upload with Django Token"""
    setup = setup_project_for_token
    token = Token.objects.get(user=setup.user)
    token = 'Token ' + str(token)
    broken_token = 'Token broken'
    data = setup.project_config
    data['organization_pk'] = setup.org.pk
    r = setup.post(setup.urls.project_create, data=data, HTTP_AUTHORIZATION=token)
    print('Project create with status code:', r.status_code, r.content)
    assert r.status_code == 201, 'Create project result should be redirect to the next page: ' + str(r.content)
 
    project = Project.objects.filter(title=setup.project_config['title']).first()
    setup.urls.set_project(project.pk)
 
    r = setup.post(
        setup.urls.task_bulk,
        data=json.dumps(tasks),
        content_type='application/json',
        HTTP_AUTHORIZATION=broken_token if bad_token else token,
    )
    assert r.status_code == status_code, 'Create json tasks result: ' + str(r.content)
 
    # tasks
    tasks = Task.objects.filter(project=project.id)
    assert tasks.count() == task_count