"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. """ import json import pytest from django.db import transaction from io_storages.azure_blob.models import ( AzureBlobImportStorage, AzureBlobImportStorageLink, ) from io_storages.gcs.models import GCSImportStorage, GCSImportStorageLink from io_storages.localfiles.models import ( LocalFilesImportStorage, LocalFilesImportStorageLink, ) from io_storages.redis.models import RedisImportStorage, RedisImportStorageLink from io_storages.s3.models import S3ImportStorage, S3ImportStorageLink from projects.models import Project from ..utils import make_annotation, make_prediction, make_task, project_id # noqa @pytest.mark.parametrize( 'tasks_count, annotations_count, predictions_count', [ [10, 2, 2], ], ) @pytest.mark.django_db def test_action_delete_all_tasks(tasks_count, annotations_count, predictions_count, business_client, project_id): # create payload = dict(project=project_id, data={'test': 1}) response = business_client.post( '/api/dm/views/', data=json.dumps(payload), content_type='application/json', ) assert response.status_code == 201, response.content response.json()['id'] project = Project.objects.get(pk=project_id) for _ in range(0, tasks_count): task_id = make_task({'data': {}}, project).id print('TASK_ID: %s' % task_id) for _ in range(0, annotations_count): print('COMPLETION') make_annotation({'result': []}, task_id) for _ in range(0, predictions_count): make_prediction({'result': []}, task_id) with transaction.atomic(): business_client.post( f'/api/dm/actions?project={project_id}&id=delete_tasks', json={'selectedItems': {'all': True, 'excluded': []}}, ) assert project.tasks.count() == 0 @pytest.mark.parametrize( 'tasks_count, annotations_count, predictions_count', [ [10, 2, 2], ], ) @pytest.mark.django_db def test_action_delete_all_annotations(tasks_count, annotations_count, predictions_count, business_client, project_id): # create payload = dict(project=project_id, data={'test': 1}) response = business_client.post( '/api/dm/views/', data=json.dumps(payload), content_type='application/json', ) assert response.status_code == 201, response.content response.json()['id'] project = Project.objects.get(pk=project_id) for _ in range(0, tasks_count): task_id = make_task({'data': {}}, project).id print('TASK_ID: %s' % task_id) for _ in range(0, annotations_count): print('COMPLETION') make_annotation({'result': []}, task_id) for _ in range(0, predictions_count): make_prediction({'result': []}, task_id) # get next task - should be 0 status = business_client.post( f'/api/dm/actions?project={project_id}&id=next_task', json={'selectedItems': {'all': True, 'excluded': []}}, ) assert status.status_code == 404 business_client.post( f'/api/dm/actions?project={project_id}&id=delete_tasks_annotations', json={'selectedItems': {'all': True, 'excluded': []}}, ) # get next task - should be 1 status = business_client.post( f'/api/dm/actions?project={project_id}&id=next_task', json={'selectedItems': {'all': True, 'excluded': []}}, ) assert status.status_code == 200 @pytest.mark.django_db @pytest.mark.parametrize( 'storage_model, link_model', [ (AzureBlobImportStorage, AzureBlobImportStorageLink), (GCSImportStorage, GCSImportStorageLink), (S3ImportStorage, S3ImportStorageLink), (LocalFilesImportStorage, LocalFilesImportStorageLink), (RedisImportStorage, RedisImportStorageLink), ], ) def test_action_remove_duplicates(business_client, project_id, storage_model, link_model): # Setup project = Project.objects.get(pk=project_id) storage = storage_model.objects.create(project=project) # task 1: add not a duplicated task task_data = {'data': {'image': 'normal.jpg'}} task1 = make_task(task_data, project) # task 2: add duplicated task, no annotations task_data = {'data': {'image': 'duplicated.jpg'}} make_task(task_data, project) # task 3: add duplicated task, with annotations task3 = make_task(task_data, project) for _ in range(3): make_annotation({'result': []}, task3.id) # task 4: add duplicated task, with storage link and one annotation task4 = make_task(task_data, project) make_annotation({'result': []}, task4.id) # this task would have row_index=0 instead of None if it was created after multitask support was added link_model.objects.create(task=task4, key='duplicated.jpg', storage=storage) # task 5: add a non-duplicated task using the same key, ensuring multiple tasks in the same key don't interfere different_task_data = {'data': {'image': 'normal2.jpg'}} task5 = make_task(different_task_data, project) link_model.objects.create(task=task5, key='duplicated.jpg', row_index=1, storage=storage) # task 6: add duplicated task with a different storage link task6 = make_task(task_data, project) link_model.objects.create(task=task6, key='duplicated2.jpg', storage=storage) # task 7: add duplicated task with a different storage link task7 = make_task(task_data, project) link_model.objects.create(task=task7, key='duplicated3.jpg', storage=storage) # call the "remove duplicated tasks" action status = business_client.post( f'/api/dm/actions?project={project_id}&id=remove_duplicates', json={'selectedItems': {'all': True, 'excluded': []}}, ) # As the result, we should have only 3 tasks left: # task 1, task 5, and task 3 with storage link copied from task 4 assert list(project.tasks.order_by('id').values_list('id', flat=True)) == [ task1.id, task3.id, task5.id, ] assert status.status_code == 200 assert link_model.objects.count() == 2 assert project.annotations.count() == 4 assert project.tasks.count() == 3 @pytest.mark.django_db def test_action_remove_duplicates_with_annotations(business_client, project_id): """This test checks that the "remove_duplicates" action works correctly when there are annotations distributed among multiple duplicated tasks. Remove duplicates should keep the task with the first task with annotations, link other annotations to the first task and remove the excess tasks. """ # Setup project = Project.objects.get(pk=project_id) storage = S3ImportStorage.objects.create(project=project) # task 1: add not a duplicated task task_data = {'data': {'image': 'normal.jpg'}} task1 = make_task(task_data, project) # task 2: add duplicated task, no annotations task_data = {'data': {'image': 'duplicated.jpg'}} task2 = make_task(task_data, project) make_annotation({'result': []}, task2.id) make_annotation({'result': [], 'was_cancelled': True}, task2.id) # task 3: add duplicated task, with annotations task3 = make_task(task_data, project) for _ in range(3): make_annotation({'result': []}, task3.id) # task 4: add duplicated task, with storage link and one annotation task4 = make_task(task_data, project) make_annotation({'result': []}, task4.id) S3ImportStorageLink.objects.create(task=task4, key='duplicated.jpg', storage=storage) # call the "remove duplicated tasks" action status = business_client.post( f'/api/dm/actions?project={project_id}&id=remove_duplicates', json={'selectedItems': {'all': True, 'excluded': []}}, ) # as the result, we should have only 2 tasks left: # task 1 and task 3 with storage link copied from task 4 assert list(project.tasks.order_by('id').values_list('id', flat=True)) == [ task1.id, task2.id, ], 'tasks ids wrong' assert status.status_code == 200, 'status code wrong' assert S3ImportStorageLink.objects.count() == 1, 'storage links count wrong' assert project.annotations.count() == 6, 'annotations count wrong' assert project.annotations.filter(was_cancelled=True).count() == 1, 'was_cancelled counter wrong' assert project.tasks.count() == 2, 'tasks count wrong' assert task1.annotations.count() == 0, 'task1 annotations count wrong' assert task2.annotations.count() == 6, 'task2 annotations count wrong' assert task2.annotations.filter(was_cancelled=True).count() == 1, 'was_cancelled counter wrong' @pytest.mark.django_db def test_action_cache_labels(business_client, project_id): """This test checks that the "cache_labels" action works correctly when there are annotations distributed among multiple tasks. """ # Setup project = Project.objects.get(pk=project_id) # task 1: add a task with specific labels task_data = {'data': {'image': 'image1.jpg'}} task1 = make_task(task_data, project) make_annotation( { 'result': [ { 'from_name': 'label1', 'to_name': 'image', 'type': 'labels', 'value': {'labels': ['Car']}, } ] }, task1.id, ) # task 2: add a task with different labels task_data = {'data': {'image': 'image2.jpg'}} task2 = make_task(task_data, project) make_annotation( { 'result': [ { 'from_name': 'label1', 'to_name': 'image', 'type': 'labels', 'value': {'labels': ['Car']}, }, { 'from_name': 'label1', 'to_name': 'image', 'type': 'labels', 'value': {'labels': ['Car', 'Airplane']}, }, ] }, task2.id, ) # call the "cache_labels" action with counters status = business_client.post( f'/api/dm/actions?project={project_id}&id=cache_labels', data=json.dumps( { 'selectedItems': {'all': True, 'excluded': []}, 'control_tag': 'label1', 'with_counters': 'yes', } ), content_type='application/json', ) # Assertions # Replace these with the actual assertions for your cache_labels function tasks = project.tasks assert status.status_code == 200, 'status code wrong' assert tasks.count() == 2, 'tasks count wrong' assert tasks.get(id=task1.id).data.get('cache_label1') == 'Car: 1', 'cache_label1 wrong for task 1' assert tasks.get(id=task2.id).data.get('cache_label1') == 'Airplane: 1, Car: 2', 'cache_label1 wrong for task 2' # call the "cache_labels" action without counters status = business_client.post( f'/api/dm/actions?project={project_id}&id=cache_labels', data=json.dumps( { 'selectedItems': {'all': True, 'excluded': []}, 'control_tag': 'label1', 'with_counters': 'no', } ), content_type='application/json', ) # Assertions # Replace these with the actual assertions for your cache_labels function assert status.status_code == 200, 'status code wrong' assert tasks.get(id=task1.id).data.get('cache_label1') == 'Car', 'cache_label1 wrong for task 1' assert tasks.get(id=task2.id).data.get('cache_label1') == 'Airplane, Car', 'cache_label1 wrong for task 2'