"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
|
"""
|
|
import logging
|
from collections import defaultdict
|
|
import ujson as json
|
from core.label_config import replace_task_data_undefined_with_config_field
|
from core.permissions import AllPermissions
|
from core.redis import start_job_async_or_sync
|
from data_manager.actions import DataManagerAction
|
from data_manager.actions.basic import delete_tasks
|
from io_storages.azure_blob.models import AzureBlobImportStorageLink
|
from io_storages.gcs.models import GCSImportStorageLink
|
from io_storages.localfiles.models import LocalFilesImportStorageLink
|
from io_storages.redis.models import RedisImportStorageLink
|
from io_storages.s3.models import S3ImportStorageLink
|
from rest_framework.exceptions import ValidationError
|
from tasks.models import Task
|
|
# Module-level logger used to report duplicate-removal progress and results.
logger = logging.getLogger(__name__)

# Permission registry; referenced below to declare which permissions the action requires.
all_permissions = AllPermissions()
|
|
|
def remove_duplicates(project, queryset, **kwargs):
    """Data Manager action entry point: deduplicate tasks sharing identical data fields.

    Duplicated tasks are deleted; their annotations are consolidated onto the
    first task of each group, and source storage links are restored for it.
    Runs synchronously or as a background job depending on the deployment.
    """
    job_args = (remove_duplicates_job, project, queryset)
    start_job_async_or_sync(*job_args, organization_id=project.organization_id)
    return {'response_code': 200}
|
|
|
def remove_duplicates_job(project, queryset, **kwargs):
    """Job body executed via start_job_async_or_sync.

    Pipeline: find duplicate groups, restore storage links, consolidate
    annotations, delete the redundant tasks, then rebuild task counters.
    """
    duplicated_groups = find_duplicated_tasks_by_data(project, queryset)

    restore_storage_links_for_duplicated_tasks(duplicated_groups)
    move_annotations(duplicated_groups)
    remove_duplicated_tasks(duplicated_groups, project, queryset)

    # recompute all task counters and states from scratch after deletion
    project._update_tasks_counters_and_task_states(
        project.tasks.all(),
        maximum_annotations_changed=True,
        overlap_cohort_percentage_changed=True,
        tasks_number_changed=False,
        from_scratch=True,
    )
|
|
|
def remove_duplicated_tasks(duplicates, project, queryset):
    """Remove duplicated tasks from queryset with condition that they don't have annotations.

    For every duplicate group exactly one task survives: either all tasks that
    carry annotations (they are never deleted), or — when no task in the group
    is annotated — the first annotation-free task.

    :param duplicates: dict mapping serialized task data to a list of duplicated task dicts
    :param project: Project instance
    :param queryset: queryset with input tasks
    :return: queryset with tasks which should be kept
    """
    removing = []
    # prepare main tasks which won't be deleted
    for root in duplicates.values():
        if len(root) == 1:
            continue

        # tasks with annotations are kept unconditionally
        one_task_saved = False
        unannotated = []
        for task in root:
            if task['total_annotations'] + task['cancelled_annotations'] > 0:
                one_task_saved = True
            else:
                unannotated.append(task)

        # keep the first annotation-free task only when nothing was saved yet
        for task in unannotated:
            if not one_task_saved:
                one_task_saved = True
            else:
                removing.append(task['id'])

    # BUGFIX: `kept` must be computed from the *original* queryset.
    # Previously `queryset` was reassigned to the filtered to-delete set first,
    # and `exclude()` on that set always produced an empty queryset.
    kept = queryset.exclude(id__in=removing, annotations__isnull=True)
    # final queryset for removal; `annotations__isnull=True` is a safety net
    # so an annotated task can never be deleted here
    queryset = queryset.filter(id__in=removing, annotations__isnull=True)

    # check that we don't remove tasks with annotations; count once and reuse
    # the value so the check and the error message agree
    removing_count = queryset.count()
    if removing_count != len(removing):
        raise ValidationError(
            f'Remove duplicates failed, operation is not finished: '
            f'queryset count {removing_count} != removing {len(removing)}. '
            'It means that some of duplicated tasks have been annotated twice or more.'
        )

    delete_tasks(project, queryset)
    logger.info(f'Removed {len(removing)} duplicated tasks')
    return kept
|
|
|
def move_annotations(duplicates):
    """Consolidate annotations of each duplicate group onto a single anchor task."""
    total_moved_annotations = 0

    for root in duplicates.values():
        if len(root) == 1:
            continue

        # anchor = first task carrying annotations; when the whole group is
        # annotation-free, fall back to the last task (nothing will be moved)
        anchor_index = next(
            (
                position
                for position, candidate in enumerate(root)
                if candidate['total_annotations'] + candidate['cancelled_annotations'] > 0
            ),
            len(root) - 1,
        )
        first = root[anchor_index]

        # re-attach annotations from every later task to the anchor
        for task in root[anchor_index + 1 :]:
            annotation_count = task['total_annotations'] + task['cancelled_annotations']
            if annotation_count > 0:
                Task.objects.get(id=task['id']).annotations.update(task_id=first['id'])
                total_moved_annotations += annotation_count
                logger.info(
                    f"Moved {task['total_annotations']} annotations from task {task['id']} to task {first['id']}"
                )
                task['total_annotations'] = 0
                task['cancelled_annotations'] = 0
|
|
|
def restore_storage_links_for_duplicated_tasks(duplicates) -> None:
    """Build storage links for duplicated tasks and save them to Task in DB."""

    # task value key -> StorageLink model class
    link_classes = {
        'io_storages_s3importstoragelink': S3ImportStorageLink,
        'io_storages_gcsimportstoragelink': GCSImportStorageLink,
        'io_storages_azureblobimportstoragelink': AzureBlobImportStorageLink,
        'io_storages_localfilesimportstoragelink': LocalFilesImportStorageLink,
        'io_storages_redisimportstoragelink': RedisImportStorageLink,
        # 'lse_io_storages_lses3importstoragelink' # not supported yet
    }

    def _find_link(task):
        """Return (StorageLink class, link id) for the first storage key set on the task, else None."""
        for key, model in link_classes.items():
            link_id = task.get(key)
            if link_id:
                return model, link_id
        return None

    total_restored_links = 0
    for tasks in list(duplicates.values()):
        # partition the group into tasks with and without a storage link
        linked, unlinked = [], []
        for task in tasks:
            (linked if _find_link(task) else unlinked).append(task)

        if not linked:
            continue

        # we don't support the case of many distinct storage links per group;
        # the first linked task is used as the single source of truth
        link_class, link_id = _find_link(linked[0])
        source_link = link_class.objects.get(id=link_id)

        # clone the existing StorageLink onto every link-less duplicate
        for task in unlinked:
            link_class(
                task_id=task['id'],
                key=source_link.key,
                row_index=source_link.row_index,
                row_group=source_link.row_group,
                storage=source_link.storage,
            ).save()
            total_restored_links += 1
            logger.info(
                f"Restored storage link for task {task['id']} from source task {linked[0]['id']}"
            )

    logger.info(f'Restored {total_restored_links} storage links for duplicated tasks')
|
|
|
def find_duplicated_tasks_by_data(project, queryset):
    """Group tasks by serialized `task.data`; return only groups with more than one task."""

    # collect io_storages_* link fields — they must be fetched with each task
    # so that storage links can be restored later
    storages = [field for field in dir(Task) if field.startswith('io_storages_')]

    tasks = list(queryset.values('data', 'id', 'total_annotations', 'cancelled_annotations', *storages))
    logger.info(f'Retrieved {len(tasks)} tasks from queryset')

    groups = defaultdict(list)
    for task in tasks:
        replace_task_data_undefined_with_config_field(task['data'], project)
        task['data'] = json.dumps(task['data'])
        groups[task['data']].append(task)

    # keep only genuine duplicates; collect grouped ids for the info log
    duplicates = {key: members for key, members in groups.items() if len(members) > 1}
    info = {key: [member['id'] for member in members] for key, members in duplicates.items()}

    logger.info(f'Found {len(duplicates)} duplicated tasks')
    logger.info(f'Duplicated tasks: {info}')
    return duplicates
|
|
|
# Registration payload consumed by the Data Manager actions registry.
# NOTE(review): items are plain dicts; presumably `DataManagerAction` is declared
# as a dict-like type in data_manager.actions — confirm the annotation matches.
actions: list[DataManagerAction] = [
    {
        'entry_point': remove_duplicates,
        # caller must hold BOTH project-change and task-delete permissions
        'permission': [all_permissions.projects_change, all_permissions.tasks_delete],
        'title': 'Remove Duplicated Tasks',
        'order': 95,
        'experimental': False,
        # confirmation dialog shown in the UI before the action runs
        'dialog': {
            'text': (
                'Confirm that you want to remove duplicated tasks with the same data fields. '
                'Duplicated tasks will be deleted and all annotations will be moved to the first task from duplicated tasks. '
                'Also Source Storage Links will be restored if at least one duplicated task has a storage link. '
                "Warning: Task assignments (enterprise only) won't be saved."
            ),
            'type': 'confirm',
        },
    },
]
|