Bin
2025-12-17 1442f92732d7c5311a627a7ba3aaa0bb8ffc539f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
 
import logging
from collections import defaultdict
 
import ujson as json
from core.label_config import replace_task_data_undefined_with_config_field
from core.permissions import AllPermissions
from core.redis import start_job_async_or_sync
from data_manager.actions import DataManagerAction
from data_manager.actions.basic import delete_tasks
from io_storages.azure_blob.models import AzureBlobImportStorageLink
from io_storages.gcs.models import GCSImportStorageLink
from io_storages.localfiles.models import LocalFilesImportStorageLink
from io_storages.redis.models import RedisImportStorageLink
from io_storages.s3.models import S3ImportStorageLink
from rest_framework.exceptions import ValidationError
from tasks.models import Task
 
logger = logging.getLogger(__name__)
all_permissions = AllPermissions()
 
 
def remove_duplicates(project, queryset, **kwargs):
    """Entry point for the 'Remove Duplicated Tasks' Data Manager action.

    Schedules `remove_duplicates_job` over the selected tasks via
    `start_job_async_or_sync`. Duplicates (tasks with identical data) are
    deleted, their annotations are consolidated onto one task per group,
    and storage links are restored for the kept task.
    """
    start_job_async_or_sync(
        remove_duplicates_job,
        project,
        queryset,
        organization_id=project.organization_id,
    )
    # action API expects a response-like dict
    return {'response_code': 200}
 
 
def remove_duplicates_job(project, queryset, **kwargs):
    """Worker executed by `start_job_async_or_sync`: full deduplication pipeline.

    Steps: group tasks by identical data, restore storage links inside each
    group, consolidate annotations onto one task per group, delete the
    redundant tasks, then rebuild the project's task counters from scratch.
    """
    groups = find_duplicated_tasks_by_data(project, queryset)
    restore_storage_links_for_duplicated_tasks(groups)
    move_annotations(groups)
    remove_duplicated_tasks(groups, project, queryset)

    # tasks and annotations changed, so recompute all counters/states
    project._update_tasks_counters_and_task_states(
        project.tasks.all(),
        maximum_annotations_changed=True,
        overlap_cohort_percentage_changed=True,
        tasks_number_changed=False,
        from_scratch=True,
    )
 
 
def remove_duplicated_tasks(duplicates, project, queryset):
    """Remove duplicated tasks from queryset with condition that they don't have annotations.

    For every duplicate group at least one task is preserved: all annotated
    tasks are kept, and if no task in the group is annotated the first one
    is kept.

    :param duplicates: dict mapping serialized task data -> list of task dicts
    :param project: Project instance
    :param queryset: queryset with input tasks
    :raises ValidationError: if a task scheduled for removal turns out to
        have annotations (group was annotated more than once)
    :return: queryset with tasks which should be kept
    """
    removing = []
    # prepare main tasks which won't be deleted
    for data in duplicates:
        group = duplicates[data]
        if len(group) == 1:
            continue

        one_task_saved = False
        candidates = []
        for task in group:
            # keep all tasks with annotations in safety
            if task['total_annotations'] + task['cancelled_annotations'] > 0:
                one_task_saved = True
            else:
                candidates.append(task)

        for task in candidates:
            # keep the first task in safety
            if not one_task_saved:
                one_task_saved = True
            # remove all other tasks
            else:
                removing.append(task['id'])

    # BUGFIX: derive both sets from the ORIGINAL queryset. Previously the
    # queryset was narrowed first and `kept` was computed by excluding the
    # same condition from the already-narrowed set, so `kept` was always empty.
    to_remove = queryset.filter(id__in=removing, annotations__isnull=True)
    kept = queryset.exclude(id__in=removing, annotations__isnull=True)

    # check that we don't remove tasks with annotations
    if to_remove.count() != len(removing):
        raise ValidationError(
            f'Remove duplicates failed, operation is not finished: '
            f'queryset count {to_remove.count()} != removing {len(removing)}. '
            'It means that some of duplicated tasks have been annotated twice or more.'
        )

    delete_tasks(project, to_remove)
    logger.info(f'Removed {len(removing)} duplicated tasks')
    return kept
 
 
def move_annotations(duplicates):
    """Consolidate annotations of each duplicate group onto a single task.

    Within every group the first task carrying any annotations (or the last
    task when the whole group is unannotated) becomes the target; annotations
    of all tasks AFTER it are reassigned to the target in DB, and the moved
    tasks' in-memory counters are zeroed.
    """
    total_moved_annotations = 0

    for key in duplicates:
        group = duplicates[key]
        if len(group) == 1:
            continue

        # pick the target task: first one with annotations, falling back
        # to the last task when none are annotated
        pivot, first = 0, group[0]
        for pivot, task in enumerate(group):
            first = task
            if task['total_annotations'] + task['cancelled_annotations'] > 0:
                break

        # reassign annotations of every task after the target
        for task in group[pivot + 1 :]:
            moved = task['total_annotations'] + task['cancelled_annotations']
            if moved > 0:
                Task.objects.get(id=task['id']).annotations.update(task_id=first['id'])
                total_moved_annotations += moved
                logger.info(
                    f"Moved {task['total_annotations']} annotations from task {task['id']} to task {first['id']}"
                )
                task['total_annotations'] = 0
                task['cancelled_annotations'] = 0
 
 
def restore_storage_links_for_duplicated_tasks(duplicates) -> None:
    """Build storage links for duplicated tasks and save them to Task in DB.

    Within each duplicate group: if at least one task already has an import
    storage link, a copy of that link is attached to every task in the group
    that has none. Groups without any link are left untouched.
    """
    # mapping: queryset field name -> StorageLink model class
    classes = {
        'io_storages_s3importstoragelink': S3ImportStorageLink,
        'io_storages_gcsimportstoragelink': GCSImportStorageLink,
        'io_storages_azureblobimportstoragelink': AzureBlobImportStorageLink,
        'io_storages_localfilesimportstoragelink': LocalFilesImportStorageLink,
        'io_storages_redisimportstoragelink': RedisImportStorageLink,
        # 'lse_io_storages_lses3importstoragelink'  # not supported yet
    }

    # hoisted out of the loop: previously this closure was redefined on
    # every group iteration for no reason
    def _get_storagelink(task):
        """Return (StorageLink class, link id) for the first link on `task`, or None."""
        for link in classes:
            if link_id := task.get(link):
                return classes[link], link_id
        return None

    total_restored_links = 0
    # iterate keys directly: `duplicates` is not mutated here, so the
    # defensive list() copy was unnecessary
    for data in duplicates:
        tasks = duplicates[data]

        # split group into tasks with and without an existing StorageLink
        tasks_without_storagelinks = []
        tasks_with_storagelinks = []
        for task in tasks:
            if _get_storagelink(task):
                tasks_with_storagelinks.append(task)
            else:
                tasks_without_storagelinks.append(task)

        # add storage links to duplicates
        if tasks_with_storagelinks:
            # we don't support the case when there are many different storage
            # links in one duplicate group: the first found link wins
            storage_link_class, storage_link_id = _get_storagelink(tasks_with_storagelinks[0])
            # get already existing StorageLink
            link_instance = storage_link_class.objects.get(id=storage_link_id)

            for task in tasks_without_storagelinks:
                # assign a copy of the existing StorageLink to the other duplicates
                link = storage_link_class(
                    task_id=task['id'],
                    key=link_instance.key,
                    row_index=link_instance.row_index,
                    row_group=link_instance.row_group,
                    storage=link_instance.storage,
                )
                link.save()
                total_restored_links += 1
                logger.info(
                    f"Restored storage link for task {task['id']} from source task {tasks_with_storagelinks[0]['id']}"
                )

    logger.info(f'Restored {total_restored_links} storage links for duplicated tasks')
 
 
def find_duplicated_tasks_by_data(project, queryset):
    """Find duplicated tasks by `task.data` and return them as a dict.

    :param project: Project instance (used to resolve undefined data keys
        against the labeling config)
    :param queryset: queryset with input tasks
    :return: dict mapping serialized task data -> list of task dicts; only
        groups with more than one task are returned. Each task dict holds
        id, annotation counters and io_storages_* link ids; its 'data' value
        is replaced in place by its JSON-serialized form.
    """
    # get io_storage_* link fields for tasks, we need to copy them later
    storages = [field for field in dir(Task) if field.startswith('io_storages_')]

    groups = defaultdict(list)
    tasks = list(queryset.values('data', 'id', 'total_annotations', 'cancelled_annotations', *storages))
    logger.info(f'Retrieved {len(tasks)} tasks from queryset')

    # group tasks by their serialized data payload; `tasks` is already a
    # list and is not resized here, so no defensive copy is needed
    for task in tasks:
        replace_task_data_undefined_with_config_field(task['data'], project)
        task['data'] = json.dumps(task['data'])
        groups[task['data']].append(task)

    # keep only real duplicate groups; collect their ids for info print
    duplicates = {d: group for d, group in groups.items() if len(group) > 1}
    info = {d: [task['id'] for task in group] for d, group in duplicates.items()}

    logger.info(f'Found {len(duplicates)} duplicated tasks')
    logger.info(f'Duplicated tasks: {info}')
    return duplicates
 
 
# Data Manager action registration: exposes "Remove Duplicated Tasks" in the
# actions menu. Requires both project-change and task-delete permissions;
# shows a confirmation dialog before running `remove_duplicates`.
actions: list[DataManagerAction] = [
    {
        'entry_point': remove_duplicates,
        'permission': [all_permissions.projects_change, all_permissions.tasks_delete],
        'title': 'Remove Duplicated Tasks',
        'order': 95,  # position in the actions menu
        'experimental': False,
        'dialog': {
            'text': (
                'Confirm that you want to remove duplicated tasks with the same data fields. '
                'Duplicated tasks will be deleted and all annotations will be moved to the first task from duplicated tasks. '
                'Also Source Storage Links will be restored if at least one duplicated task has a storage link. '
                "Warning: Task assignments (enterprise only) won't be saved."
            ),
            'type': 'confirm',
        },
    },
]