"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import copy
import io
import zipfile

import pytest
import requests_mock
import ujson as json
from projects.models import Project
from rest_framework.authtoken.models import Token
from tasks.models import Annotation, Prediction, Task

# Maps each file-based ``format_type`` accepted by ``post_data_as_format`` to the
# extension used for the generated upload file names.
_UPLOAD_FILE_EXTENSIONS = {
    'json_file': 'json',
    'csv_file': 'csv',
    'tsv_file': 'tsv',
    'txt_file': 'txt',
}


def post_data_as_format(setup, format_type, body, archive, multiply_files):
    """Post ``body`` to the bulk task endpoint in the requested format.

    Args:
        setup: test client object; this helper uses ``setup.post`` and
            ``setup.urls.task_bulk``.
        format_type: ``'json_data'`` posts ``body`` directly as an
            ``application/json`` request; ``'json_file'``, ``'csv_file'``,
            ``'tsv_file'`` and ``'txt_file'`` post it as uploaded file(s).
        body: text payload to send.
        archive: if the string contains ``'zip'`` the generated files are packed
            into a single zip archive; if it equals ``'zip_x2'`` that archive is
            additionally posted a second time under another name.
        multiply_files: number of identical files to generate from ``body``.

    Returns:
        Whatever ``setup.post`` returns for the request.

    Raises:
        ValueError: if ``format_type`` is not one of the supported values.
    """
    # post as data
    if format_type == 'json_data':
        return setup.post(setup.urls.task_bulk, data=body, content_type='application/json')

    # post as files
    extension = _UPLOAD_FILE_EXTENSIONS.get(format_type)
    if extension is None:
        raise ValueError('Incorrect task data format to post')
    files = {f'upload_file{i}.{extension}': io.StringIO(body) for i in range(multiply_files)}

    # zip: take files below and zip them
    if 'zip' in archive:
        archive_buffer = io.BytesIO()
        # The context manager guarantees the archive is finalized (central
        # directory written) even if writing one of the members fails.
        with zipfile.ZipFile(archive_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_ref:
            for name, stream in files.items():
                zip_ref.writestr(name, stream.read())
        archive_buffer.seek(0)
        files = {'upload_file.zip': archive_buffer}

        # replicate zip file x2
        if archive == 'zip_x2':
            files['upload_file2.zip'] = copy.deepcopy(archive_buffer)

    return setup.post(setup.urls.task_bulk, files)


@pytest.mark.parametrize('multiply_files', [1, 5])
@pytest.mark.parametrize('format_type', ['json_file', 'json_data'])
@pytest.mark.parametrize(
    'tasks, status_code, task_count',
    [
        ([{'data': {'dialog': 'some'}}], 201, 1),
        ([{'data': {'dialog': 'some'}}] * 10, 201, 10),
        ([{'data': {'another_field': 'some', 'dialog': 'some'}}], 201, 1),
        ([{'data': {'dialog': 123}, 'created_at': 123}], 201, 1),
        ([{'data': {'another_field': 'some'}}] * 10, 400, 0),
        ([{'data': {}}], 400, 0),
        ([{'data': None}], 400, 0),
        (None, 400, 0),
        ([{'data': 'string'}], 400, 0),
        ([{}, {}], 400, 0),
        ([{}], 400, 0),
        ({}, 400, 0),
        ([], 400, 0),
        ([{'dialog': 'some'}] * 10, 201, 10),
        ({'dialog': 'some'}, 201, 1),
        ([{'dialog': 'some', 'second_field': 123}] * 10, 201, 10),
        ([{'none': 'some', 'second_field': 123}] * 10, 400, 0),
    ],
)
@pytest.mark.django_db
def test_json_task_upload(setup_project_dialog, format_type, tasks, status_code, task_count, multiply_files):
    """Upload JSON tasks as file(s) or as a raw JSON request body.

    Checks the response status code and the number of tasks stored for the
    project. The expected task count is scaled by ``multiply_files`` because
    every generated file carries the same payload.
    """
    # A raw JSON body cannot be "multiplied" into several files.
    if format_type == 'json_data' and multiply_files > 1:
        pytest.skip('Senseless parameter combination')

    r = post_data_as_format(setup_project_dialog, format_type, json.dumps(tasks), 'none', multiply_files)
    print(f'Create json {format_type} tasks result:', r.content)

    assert r.status_code == status_code, f'Upload tasks failed. Response data: {r.data}'
    assert Task.objects.filter(project=setup_project_dialog.project.id).count() == task_count * multiply_files


@pytest.mark.parametrize(
    'tasks, status_code, task_count, annotation_count',
    [
        ([{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}]}]}] * 10, 201, 10, 10),
        (
            [{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}], 'ground_truth': True}]}],
            201,
            1,
            1,
        ),
        ([{'data': {'dialog': 'Test'}, 'annotations': [{'result': '123'}]}], 400, 0, 0),
        ([{'data': {'dialog': 'Test'}, 'meta': 'test'}] * 10, 400, 0, 0),
        ([{'data': {'dialog': 'Test'}, 'annotations': 'test'}] * 10, 400, 0, 0),
        ([{'data': {'dialog': 'Test'}, 'annotations': [{'trash': '123'}]}] * 10, 400, 0, 0),
    ],
)
@pytest.mark.django_db
def test_json_task_annotation_and_meta_upload(setup_project_dialog, tasks, status_code, task_count, annotation_count):
    """Upload a JSON file of tasks carrying annotations and/or meta.

    Checks the response status code, the number of stored tasks and
    annotations, that every stored task is labeled, and that every stored
    annotation has ``ground_truth`` set.
    """
    format_type = 'json_file'
    multiply_files = 1
    r = post_data_as_format(setup_project_dialog, format_type, json.dumps(tasks), 'none', multiply_files)
    print('Create json tasks with annotations result:', r.content)

    assert r.status_code == status_code, 'Upload one task with annotation failed'

    # tasks
    tasks_db = Task.objects.filter(project=setup_project_dialog.project.id)
    assert tasks_db.count() == task_count * multiply_files
    for task in tasks_db:
        assert task.is_labeled, 'Task should be labeled'

    # annotations
    annotations = Annotation.objects.filter(task__project=setup_project_dialog.project.id)
    assert annotations.count() == annotation_count * multiply_files
    for annotation in annotations:
        assert annotation.ground_truth


@pytest.mark.parametrize(
    'tasks, status_code, task_count, prediction_count',
    [
        (
            [
                {
                    'data': {'dialog': 'Test'},
                    'predictions': [
                        {
                            'result': [
                                {
                                    'id': '123',
                                    'from_name': 'answer',
                                    'to_name': 'dialog',
                                    'type': 'textarea',
                                    'value': {'text': ['Test prediction']},
                                }
                            ],
                            'model_version': 'test',
                        }
                    ],
                }
            ],
            201,
            1,
            1,
        ),
        ([{'data': {'dialog': 'Test'}, 'predictions': [{'WRONG_FIELD': '123'}]}], 400, 0, 0),
    ],
)
@pytest.mark.django_db
def test_json_task_predictions(setup_project_dialog, tasks, status_code, task_count, prediction_count):
    """Upload a JSON file of tasks carrying predictions.

    Checks the response status code, the number of stored predictions and
    that each stored prediction has ``model_version == 'test'``.
    """
    # NOTE(review): ``task_count`` is parametrized but not asserted in this test.
    r = post_data_as_format(setup_project_dialog, 'json_file', json.dumps(tasks), 'none', 1)
    assert r.status_code == status_code, 'Upload one task with prediction failed'

    # predictions
    predictions = Prediction.objects.filter(project=setup_project_dialog.project.id)
    assert predictions.count() == prediction_count
    # Use a distinct loop name so the queryset is not rebound to a single row.
    for prediction in predictions:
        assert prediction.model_version == 'test'


@pytest.mark.parametrize('multiply_files', [1, 5])
@pytest.mark.parametrize('archive', ['none'])
@pytest.mark.parametrize('format_type', ['json_file'])
@pytest.mark.parametrize(
    'tasks, status_code, task_count, annotation_count',
    [
        (
            [{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}]}, {'result': [{'id': '456'}]}]}]
            * 10,
            201,
            10,
            20,
        ),
        ([{'data': {'dialog': 'Test'}, 'annotations': [{'trash': '123'}]}] * 10, 400, 0, 0),
    ],
)
@pytest.mark.django_db
def test_archives(
    setup_project_dialog, format_type, tasks, status_code, task_count, annotation_count, archive, multiply_files
):
    """Upload JSON tasks with annotations, optionally archived, as one or more files.

    Expected task and annotation counts are scaled by the number of uploaded
    files, doubled when ``archive`` is ``'zip_x2'``. Only ``archive='none'``
    is currently parametrized.
    """
    multiplier = (2 if archive == 'zip_x2' else 1) * multiply_files
    r = post_data_as_format(setup_project_dialog, format_type, json.dumps(tasks), archive, multiply_files)
    print('Create json tasks with annotations result:', r.content)

    assert r.status_code == status_code, 'Upload one task with annotation failed'

    # tasks
    tasks_db = Task.objects.filter(project=setup_project_dialog.project.id)
    assert tasks_db.count() == task_count * multiplier
    for task in tasks_db:
        assert task.is_labeled, 'Task should be labeled'

    # annotations
    annotations = Annotation.objects.filter(task__project=setup_project_dialog.project.id)
    assert annotations.count() == annotation_count * multiplier
    for annotation in annotations:
        assert annotation.ground_truth


@pytest.mark.parametrize('multiply_files', [1, 5])
@pytest.mark.parametrize('archive', ['none'])
@pytest.mark.parametrize('format_type', ['csv_file', 'tsv_file'])
@pytest.mark.parametrize(
    'tasks, status_code, task_count',
    [
        ('dialog,second\ndialog 1,second 1\ndialog 2,second 2', 201, 2),
        ('dialog,second,class\ndialog 1, second 2, class 1', 201, 1),
        ('here_is_error_in_column_count,second\ndialog 1, second 1, class 1', 400, 0),
        ('empty_rows\n', 400, 0),
        ('', 400, 0),
    ],
)
@pytest.mark.django_db
def test_csv_tsv_task_upload(
    setup_project_dialog, format_type, tasks, status_code, task_count, archive, multiply_files
):
    """Upload CSV/TSV file(s) and check the status code and stored task count.

    The parametrized payloads are written as CSV; for ``'tsv_file'`` the
    commas are replaced with tabs before posting.
    """
    multiplier = (2 if archive == 'zip_x2' else 1) * multiply_files
    # prepare tsv file from csv
    payload = tasks if format_type == 'csv_file' else tasks.replace(',', '\t')
    r = post_data_as_format(setup_project_dialog, format_type, payload, archive, multiply_files)
    print(f'Create {format_type} tasks result:', r.content)

    assert r.status_code == status_code, f'Upload one task {format_type} failed. Response data: {r.data}'
    assert Task.objects.filter(project=setup_project_dialog.project.id).count() == task_count * multiplier


@pytest.mark.parametrize('multiply_files', [1, 5])
@pytest.mark.parametrize('format_type', ['txt_file'])
@pytest.mark.parametrize('tasks, status_code, task_count', [('my text 1\nmy text 2\nmy text 3', 201, 3), ('', 400, 0)])
@pytest.mark.django_db
def test_txt_task_upload(setup_project_dialog, format_type, tasks, status_code, task_count, multiply_files):
    """Upload plain-text file(s) and check the status code and stored task count.

    The expected task count is scaled by ``multiply_files``.
    """
    multiplier = multiply_files
    r = post_data_as_format(setup_project_dialog, format_type, tasks, 'none', multiply_files)
    print(f'Create {format_type} tasks result:', r.content)

    assert r.status_code == status_code, f'Upload one task {format_type} failed. Response data: {r.data}'
    assert Task.objects.filter(project=setup_project_dialog.project.id).count() == task_count * multiplier


@pytest.mark.parametrize(
    'tasks, status_code, task_count, max_duration',
    [([{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}]}]}] * 1000, 201, 1000, 30)],
)
@pytest.mark.django_db
def test_upload_duration(setup_project_dialog, tasks, status_code, task_count, max_duration):
    """Upload many annotated tasks as a JSON body and bound the reported duration.

    Checks the status code, the stored task count, that every task is labeled
    and that the ``duration`` value in the response is below ``max_duration``.
    """
    r = post_data_as_format(setup_project_dialog, 'json_data', json.dumps(tasks), 'none', 1)
    print('Create json tasks with annotations result:', r.content)

    assert r.status_code == status_code, ('Upload one task with annotation failed', r.content)

    # tasks
    tasks_db = Task.objects.filter(project=setup_project_dialog.project.id)
    assert tasks_db.count() == task_count
    for task in tasks_db:
        assert task.is_labeled, 'Task should be labeled'

    # check max duration
    result = json.loads(r.content)
    assert result['duration'] < max_duration, 'Max duration of adding tasks is exceeded'


@pytest.mark.parametrize(
    'tasks, status_code, task_count',
    [([{'data': {'dialog': 'Test'}, 'annotations': [{'result': [{'id': '123'}]}]}] * 100, 201, 100)],
)
@pytest.mark.django_db
def test_url_upload(mocker, setup_project_dialog, tasks, status_code, task_count):
    """Upload tasks by URL, with the URL's response served by ``requests_mock``.

    Checks the status code, the stored task count and that every task is
    labeled.
    """
    with requests_mock.Mocker(real_http=True) as m:
        url = 'http://localhost:8111/test.json'
        m.get(url, text=json.dumps(tasks), headers={'Content-Length': '100'})
        r = setup_project_dialog.post(
            setup_project_dialog.urls.task_bulk, data='url=' + url, content_type='application/x-www-form-urlencoded'
        )

    assert r.status_code == status_code, 'Upload URL failed: ' + str(r.content)

    # tasks
    tasks_db = Task.objects.filter(project=setup_project_dialog.project.id)
    assert tasks_db.count() == task_count
    for task in tasks_db:
        assert task.is_labeled, 'Task should be labeled since annotation is ground_truth'


@pytest.mark.parametrize(
    'tasks, status_code, task_count, bad_token',
    [([{'dialog': 'Test'}] * 1, 201, 1, False), ([{'dialog': 'Test'}] * 1, 401, 0, True)],
)
@pytest.mark.django_db
def test_upload_with_token(setup_project_for_token, tasks, status_code, task_count, bad_token):
    """Create a project and upload tasks using ``Authorization: Token ...``.

    The project is always created with the valid token; the bulk upload then
    uses either the valid token or a deliberately broken one depending on
    ``bad_token``.
    """
    setup = setup_project_for_token
    token = Token.objects.get(user=setup.user)
    valid_header = 'Token ' + str(token)
    broken_header = 'Token broken'

    data = setup.project_config
    data['organization_pk'] = setup.org.pk
    r = setup.post(setup.urls.project_create, data=data, HTTP_AUTHORIZATION=valid_header)
    print('Project create with status code:', r.status_code, r.content)
    assert r.status_code == 201, 'Create project result should be redirect to the next page: ' + str(r.content)

    project = Project.objects.filter(title=setup.project_config['title']).first()
    setup.urls.set_project(project.pk)

    r = setup.post(
        setup.urls.task_bulk,
        data=json.dumps(tasks),
        content_type='application/json',
        HTTP_AUTHORIZATION=broken_header if bad_token else valid_header,
    )
    assert r.status_code == status_code, 'Create json tasks result: ' + str(r.content)

    # tasks
    tasks_db = Task.objects.filter(project=project.id)
    assert tasks_db.count() == task_count