Bin
2025-12-17 d616898802dfe7e5dd648bcf53c6d1f86b6d3642
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
import json
 
import boto3
import mock
import pytest
from io_storages.models import S3ImportStorage
from io_storages.s3.models import S3ImportStorageLink
from io_storages.tests.factories import (
    AzureBlobImportStorageFactory,
    GCSImportStorageFactory,
    RedisImportStorageFactory,
    S3ImportStorageFactory,
)
from io_storages.utils import StorageObject, load_tasks_json
from moto import mock_s3
from projects.tests.factories import ProjectFactory
from rest_framework.test import APIClient
from tests.utils import azure_client_mock, gcs_client_mock, mock_feature_flag, redis_client_mock
 
 
@pytest.fixture(name='fflag_feat_root_11_support_jsonl_cloud_storage_on')
def fflag_feat_root_11_support_jsonl_cloud_storage_on():
    """Report the JSONL cloud-storage feature flag as enabled for one test.

    ``io_storages.utils.flag_set`` is patched for the duration of the test.
    Lookups whose first positional argument is the JSONL flag return ``True``;
    every other lookup is forwarded to ``core.feature_flags.flag_set``.
    """
    from core.feature_flags import flag_set as real_flag_set

    jsonl_flag = 'fflag_feat_root_11_support_jsonl_cloud_storage'

    def fake_flag_set(*args, **kwargs):
        return True if args[0] == jsonl_flag else real_flag_set(*args, **kwargs)

    patcher = mock.patch('io_storages.utils.flag_set', wraps=fake_flag_set)
    with patcher:
        yield
 
 
@pytest.fixture(name='fflag_feat_root_11_support_jsonl_cloud_storage_off')
def fflag_feat_root_11_support_jsonl_cloud_storage_off():
    """Report the JSONL cloud-storage feature flag as disabled for one test.

    ``io_storages.utils.flag_set`` is patched for the duration of the test.
    Lookups whose first positional argument is the JSONL flag return ``False``;
    every other lookup is forwarded to ``core.feature_flags.flag_set``.
    """
    from core.feature_flags import flag_set as real_flag_set

    jsonl_flag = 'fflag_feat_root_11_support_jsonl_cloud_storage'

    def fake_flag_set(*args, **kwargs):
        return False if args[0] == jsonl_flag else real_flag_set(*args, **kwargs)

    patcher = mock.patch('io_storages.utils.flag_set', wraps=fake_flag_set)
    with patcher:
        yield
 
 
#
# Integration tests for storage.sync()
#
 
# Module-level pytest marker: applies ``django_db`` to every test in this file.
pytestmark = pytest.mark.django_db
 
 
@pytest.fixture
def project():
    """Provide a project built by ``ProjectFactory`` with default arguments."""
    new_project = ProjectFactory()
    return new_project
 
 
@pytest.fixture(scope='module')
def common_task_data():
    """Return the two-task payload shared by the storage import tests.

    Each task wraps its fields under a ``'data'`` key. The fixture is
    module-scoped, so the same list object is reused by every test here.
    """
    first_task = {'data': {'image_url': 'http://ggg.com/image.jpg', 'text': 'Task 1 text'}}
    second_task = {'data': {'image_url': 'http://ggg.com/image2.jpg', 'text': 'Task 2 text'}}
    return [first_task, second_task]
 
 
def _test_storage_import(project, storage_class, task_data, **storage_kwargs):
    """Create a storage, sync it, and verify the imported tasks.

    Args:
        project: Project the storage is attached to. Its ``created_by`` user
            authenticates the API client used to read the tasks back.
        storage_class: Callable invoked as
            ``storage_class(project=project, **storage_kwargs)`` to build the
            storage under test.
        task_data: Expected tasks; each item must carry a ``'data'`` key that
            is compared with the ``'data'`` of the corresponding API task.
        **storage_kwargs: Extra keyword arguments forwarded to ``storage_class``.
    """
    client = APIClient()
    client.force_authenticate(user=project.created_by)

    # Setup storage with required credentials and persist it before syncing
    storage = storage_class(project=project, **storage_kwargs)
    storage.save()

    # Validate connection before sync so a broken mock fails with a clear message
    try:
        storage.validate_connection()
    except Exception as e:
        pytest.fail(f'Storage connection validation failed: {e}')

    # Mock redis_connected to force synchronous execution in tests.
    # The module-level ``mock`` import is used; no local re-import is needed.
    with mock.patch('io_storages.base_models.redis_connected', return_value=False):
        storage.sync()

    # Validate tasks were imported correctly
    tasks_response = client.get(f'/api/tasks?project={project.id}')
    assert tasks_response.status_code == 200
    tasks = tasks_response.json()['tasks']
    assert len(tasks) == len(task_data)

    # Validate task content (lengths are equal, so zip drops nothing)
    for task, expected_data in zip(tasks, task_data):
        assert task['data'] == expected_data['data']
 
 
def test_import_multiple_tasks_s3(project, common_task_data):
    """Import a multi-task JSON file from a moto-mocked S3 bucket."""
    with mock_s3():
        # Setup S3 bucket and test data
        s3 = boto3.client('s3', region_name='us-east-1')
        bucket_name = 'pytest-s3-jsons'
        s3.create_bucket(Bucket=bucket_name)

        # Put test data into S3
        s3.put_object(Bucket=bucket_name, Key='test.json', Body=json.dumps(common_task_data))

        _test_storage_import(
            project,
            S3ImportStorageFactory,
            common_task_data,
            # Reuse bucket_name so the storage always points at the bucket created above.
            bucket=bucket_name,
            aws_access_key_id='example',
            aws_secret_access_key='example',
            use_blob_urls=False,
            recursive_scan=True,
        )
 
 
def test_import_multiple_tasks_gcs(project, common_task_data):
    """Import the shared task payload through the mocked GCS client."""
    storage_options = {
        # magic bucket name to set correct data in gcs_client_mock
        'bucket': 'multitask_JSON',
        'use_blob_urls': False,
        'recursive_scan': True,
    }
    # initialize mock with sample data
    with gcs_client_mock():
        _test_storage_import(project, GCSImportStorageFactory, common_task_data, **storage_options)
 
 
def test_import_multiple_tasks_azure(project, common_task_data):
    """Import the shared task payload through the mocked Azure blob client."""
    storage_options = {'use_blob_urls': False, 'recursive_scan': True}
    # initialize mock with sample data: a single blob holding the whole task list
    azure_mock = azure_client_mock(sample_json_contents=common_task_data, sample_blob_names=['test.json'])
    with azure_mock:
        _test_storage_import(project, AzureBlobImportStorageFactory, common_task_data, **storage_options)
 
 
def test_import_multiple_tasks_redis(project, common_task_data):
    """Import the shared task payload from a key stored in the mocked Redis."""
    serialized_tasks = json.dumps(common_task_data)
    with redis_client_mock() as fake_redis:
        # Store the whole task list under one key
        fake_redis.set('test.json', serialized_tasks)

        storage_options = {'path': '', 'use_blob_urls': False}
        _test_storage_import(project, RedisImportStorageFactory, common_task_data, **storage_options)
 
 
def test_storagelink_fields(project, common_task_data):
    """Check ``row_index`` / ``row_group`` on links created by a real S3 sync.

    A two-task JSON file is synced, so two storage links are expected. Ordered
    by ``task_id``, they should carry ``row_index`` 0 and 1, and ``row_group``
    should be ``None`` on both.
    """
    # use an actual storage and storagelink to test this, since factories aren't connected properly
    with mock_s3():
        # Setup S3 bucket and test data
        s3 = boto3.client('s3', region_name='us-east-1')
        bucket_name = 'pytest-s3-jsons'
        s3.create_bucket(Bucket=bucket_name)

        # Put test data into S3
        s3.put_object(Bucket=bucket_name, Key='test.json', Body=json.dumps(common_task_data))

        # create a real storage and sync it
        storage = S3ImportStorage(
            project=project,
            bucket=bucket_name,
            aws_access_key_id='example',
            aws_secret_access_key='example',
            use_blob_urls=False,
            recursive_scan=True,
        )
        storage.save()
        # Mock redis_connected to force synchronous execution, so the links
        # exist by the time they are queried below.
        with mock.patch('io_storages.base_models.redis_connected', return_value=False):
            storage.sync()

        # Evaluate the queryset once; a missing link then fails on the count
        # assertion instead of raising IndexError.
        storage_links = list(S3ImportStorageLink.objects.filter(storage=storage).order_by('task_id'))
        assert len(storage_links) == len(common_task_data)

        # check that the storage link fields are set correctly
        assert storage_links[0].row_index == 0
        assert storage_links[0].row_group is None
        assert storage_links[1].row_index == 1
        assert storage_links[1].row_group is None
 
 
#
# Unit tests for load_tasks_json()
#
 
 
@pytest.fixture
def storage():
    """Return a ``(project, storage)`` pair for the ``load_tasks_json`` tests.

    The project uses a text-labeling config with ``FIELD`` and ``ACTION``
    labels; the storage is a saved ``S3ImportStorage`` with example credentials.
    """
    label_config = """
        <View>
          <Text name="text" value="$text"/>
          <Labels name="label" toName="text">
            <Label value="FIELD" background="red"/>
            <Label value="ACTION" background="blue"/>
          </Labels>
        </View>
        """
    labeling_project = ProjectFactory(label_config=label_config)

    s3_storage = S3ImportStorage(
        project=labeling_project,
        bucket='example',
        aws_access_key_id='example',
        aws_secret_access_key='example',
        use_blob_urls=False,
    )
    s3_storage.save()
    return labeling_project, s3_storage
 
 
def create_tasks(storage, params_list: list[StorageObject]):
    """Create one task per item of ``params_list`` via ``S3ImportStorage.add_task``.

    Args:
        storage: The ``(project, storage)`` pair produced by the ``storage`` fixture.
        params_list: Link parameters, one per task to create.

    Only the absence of exceptions is checked; the created tasks are discarded.
    """
    project, import_storage = storage
    for link_params in params_list:
        S3ImportStorage.add_task(project, 1, 0, import_storage, link_params, S3ImportStorageLink)
 
 
# Test data

# Tasks given as bare field dicts, i.e. without a wrapping 'data' key.
bare_task_list = [
    {
        'text': 'Test task 1',
    },
    {
        'text': 'Test task 2',
    },
]

# Tasks in the wrapped form: the first carries 'data' plus one annotation (two
# label spans) and one prediction (one label span); the second has only 'data'.
# The label values match the FIELD / ACTION labels of the `storage` fixture config.
# NOTE(review): the start/end offsets do not line up exactly with the quoted
# 'text' values under end-exclusive slicing — confirm whether that matters.
annots_preds_task_list = [
    {
        'data': {'text': 'Machine learning models require high-quality labeled data.'},
        'annotations': [
            {
                'result': [
                    {
                        'value': {'start': 0, 'end': 22, 'text': 'Machine learning models', 'labels': ['FIELD']},
                        'from_name': 'label',
                        'to_name': 'text',
                        'type': 'labels',
                    },
                    {
                        'value': {'start': 44, 'end': 56, 'text': 'labeled data', 'labels': ['ACTION']},
                        'from_name': 'label',
                        'to_name': 'text',
                        'type': 'labels',
                    },
                ]
            }
        ],
        'predictions': [
            {
                'result': [
                    {
                        'value': {'start': 0, 'end': 22, 'text': 'Machine learning models', 'labels': ['FIELD']},
                        'from_name': 'label',
                        'to_name': 'text',
                        'type': 'labels',
                    }
                ]
            }
        ],
    },
    {'data': {'text': 'Prosper annotation helps improve model accuracy.'}},
]
 
 
def test_bare_task(storage):
    """A single bare JSON object yields one StorageObject without a row index."""
    task_data = bare_task_list[0]

    blob = json.dumps(task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.json'))
    expected_output = [StorageObject(key='test.json', task_data=task_data)]
    assert output == expected_output

    create_tasks(storage, output)
 
 
def test_data_key(storage):
    """A single object wrapped under ``'data'`` is passed through unchanged."""
    task_data = {'data': bare_task_list[0]}

    blob = json.dumps(task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.json'))
    expected_output = [StorageObject(key='test.json', task_data=task_data)]
    assert output == expected_output

    create_tasks(storage, output)
 
 
def test_1elem_list(storage):
    """A JSON list with one task yields one StorageObject with ``row_index=0``."""
    task_data = bare_task_list[:1]

    blob = json.dumps(task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.json'))
    expected_output = [
        StorageObject(key='test.json', task_data=task_data[0], row_index=0),
    ]
    assert output == expected_output

    create_tasks(storage, output)
 
 
def test_2elem_list(storage):
    """A JSON list with two bare tasks yields StorageObjects indexed 0 and 1."""
    task_data = bare_task_list

    blob = json.dumps(task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.json'))
    expected_output = [
        StorageObject(key='test.json', task_data=task_data[0], row_index=0),
        StorageObject(key='test.json', task_data=task_data[1], row_index=1),
    ]
    assert output == expected_output

    create_tasks(storage, output)
 
 
def test_preds_and_annots_list(storage):
    """A JSON list of tasks with annotations/predictions keeps each task intact."""
    task_data = annots_preds_task_list

    blob = json.dumps(task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.json'))

    expected_output = [
        StorageObject(key='test.json', task_data=task_data[0], row_index=0),
        StorageObject(key='test.json', task_data=task_data[1], row_index=1),
    ]
    assert output == expected_output

    create_tasks(storage, output)
 
 
def test_mixed_formats(storage):
    """A JSON list mixing a bare task and a wrapped task is accepted as-is."""
    task_data = [bare_task_list[0], annots_preds_task_list[0]]

    blob = json.dumps(task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.json'))

    expected_output = [
        StorageObject(key='test.json', task_data=task_data[0], row_index=0),
        StorageObject(key='test.json', task_data=task_data[1], row_index=1),
    ]
    assert output == expected_output

    create_tasks(storage, output)
 
 
@mock_feature_flag('fflag_feat_root_11_support_jsonl_cloud_storage', True, 'io_storages.utils')
def test_list_jsonl(storage):
    """With the JSONL flag on, each line of a ``.jsonl`` blob becomes one indexed task."""
    task_data = bare_task_list

    # One JSON document per line
    blob = '\n'.join(json.dumps(task) for task in task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.jsonl'))
    expected_output = [
        StorageObject(key='test.jsonl', task_data=task_data[0], row_index=0),
        StorageObject(key='test.jsonl', task_data=task_data[1], row_index=1),
    ]
    assert output == expected_output

    create_tasks(storage, output)
 
 
@mock_feature_flag('fflag_feat_root_11_support_jsonl_cloud_storage', True, 'io_storages.utils')
def test_list_jsonl_with_preds_and_annots(storage):
    """With the JSONL flag on, JSONL tasks with annotations/predictions stay intact."""
    task_data = annots_preds_task_list

    # One JSON document per line
    blob = '\n'.join(json.dumps(task) for task in task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.jsonl'))

    expected_output = [
        StorageObject(key='test.jsonl', task_data=task_data[0], row_index=0),
        StorageObject(key='test.jsonl', task_data=task_data[1], row_index=1),
    ]
    assert output == expected_output

    create_tasks(storage, output)
 
 
@mock_feature_flag('fflag_feat_root_11_support_jsonl_cloud_storage', False, 'io_storages.utils')
def test_ff_blocks_jsonl():
    """With the JSONL flag off, loading a two-line ``.jsonl`` blob raises ``ValueError``."""
    jsonl_blob = b'{"text": "Test task 1"}\n{"text": "Test task 2"}'
    with pytest.raises(ValueError):
        # list() forces full evaluation of the result inside the raises block
        list(load_tasks_json(jsonl_blob, 'test.jsonl'))
 
 
@mock_feature_flag('fflag_feat_root_11_support_jsonl_cloud_storage', True, 'io_storages.utils')
def test_mixed_formats_jsonl(storage):
    """With the JSONL flag on, a JSONL blob mixing bare and wrapped tasks is accepted."""
    task_data = [bare_task_list[0], annots_preds_task_list[0]]

    # One JSON document per line
    blob = '\n'.join(json.dumps(task) for task in task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.jsonl'))

    expected_output = [
        StorageObject(key='test.jsonl', task_data=task_data[0], row_index=0),
        StorageObject(key='test.jsonl', task_data=task_data[1], row_index=1),
    ]
    assert output == expected_output

    create_tasks(storage, output)
 
 
@mock_feature_flag('fflag_feat_root_11_support_jsonl_cloud_storage', True, 'io_storages.utils')
def test_list_jsonl_with_datetimes(storage):
    """With the JSONL flag on, timestamp-like strings come back as the same strings."""
    task_data = [
        {'data': {'text': 'Test task 1', 'created_at': '2021-01-01T00:00:00Z'}},
        {'data': {'text': 'Test task 2', 'created_at': '2021-01-02T00:00:00Z'}},
    ]

    # One JSON document per line
    blob = '\n'.join(json.dumps(task) for task in task_data).encode()
    # Materialize once: the same items are compared and then used to create
    # tasks, so a lazily-evaluated result cannot be exhausted by the assertion.
    output = list(load_tasks_json(blob, 'test.jsonl'))
    expected_output = [
        StorageObject(key='test.jsonl', task_data=task_data[0], row_index=0),
        StorageObject(key='test.jsonl', task_data=task_data[1], row_index=1),
    ]
    assert output == expected_output

    create_tasks(storage, output)
 
 
def test_allow_skip_false_is_saved(storage):
    """A task imported with ``'allow_skip': False`` keeps ``allow_skip`` False."""
    project, s3_storage = storage
    link_params = StorageObject(
        key='test.json',
        task_data={
            'data': {'text': 'Task with disallowed skip'},
            'allow_skip': False,
        },
    )
    # Create one task via cloud import pathway
    created_task = S3ImportStorage.add_task(project, 1, 1, s3_storage, link_params, S3ImportStorageLink)
    assert created_task.allow_skip is False