Bin
2025-12-16 7423b0c6e1959f30a7e8e453e953310f32ce13c6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import json
import logging
import os
import shutil
import sys
 
from core.feature_flags import flag_set
from core.models import AsyncMigrationStatus
from core.redis import start_job_async_or_sync
from core.utils.common import batch, batched_iterator
from core.utils.iterators import iterate_queryset
from data_export.mixins import ExportMixin
from data_export.models import DataExport
from data_export.serializers import ExportDataSerializer
from data_manager.managers import TaskQuerySet
from django.conf import settings
from django.db.models import Count, F, Q
from django.db.models.lookups import GreaterThanOrEqual
from organizations.models import Organization
from projects.models import Project
from tasks.models import Annotation, Prediction, Task
 
logger = logging.getLogger(__name__)
 
 
def calculate_stats_all_orgs(from_scratch, redis, migration_name='0018_manual_migrate_counters'):
    """Queue a counter-recalculation job for every organization, newest first.

    :param from_scratch: recalculate every task or only ones not yet counted
    :param redis: dispatch jobs through redis (async) when True, else run inline
    :param migration_name: label recorded on each AsyncMigrationStatus row
    """
    logger = logging.getLogger(__name__)
    # Fetch bare IDs only: some Organization columns (contact_info, verify_ssl_certs)
    # aren't created until after a migration calls this code, so loading full
    # model instances here could fail mid-migration.
    organization_ids = Organization.objects.order_by('-id').values_list('id', flat=True)

    for organization_id in organization_ids:
        logger.debug(f'Start recalculating stats for Organization {organization_id}')

        # Hand each organization off as its own job so one slow org can't
        # block the rest; 'critical' queue, generous per-org timeout.
        start_job_async_or_sync(
            redis_job_for_calculation,
            organization_id,
            from_scratch,
            redis=redis,
            queue_name='critical',
            job_timeout=3600 * 24,  # 24 hours for one organization
            migration_name=migration_name,
        )

        logger.debug(f'Organization {organization_id} stats were recalculated')

    logger.debug('All organizations were recalculated')
 
 
def redis_job_for_calculation(org_id, from_scratch, migration_name='0018_manual_migrate_counters'):
    """
    Recalculate task counters for every project of one organization.

    :param org_id: ID of organization to recalculate
    :param from_scratch: Start calculation from scratch or skip calculated tasks
    :param migration_name: name recorded on each project's AsyncMigrationStatus row
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # Bug fix: this job can run many times inside one long-lived worker process,
    # and the previous code attached a brand-new StreamHandler to the root logger
    # on every call — so each log line was duplicated once per prior invocation.
    # Attach the stdout handler only if an equivalent one isn't already present.
    if not any(
        isinstance(h, logging.StreamHandler) and getattr(h, 'stream', None) is sys.stdout for h in logger.handlers
    ):
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    # Load only the lightweight fields needed for logging and lookups.
    project_dicts = (
        Project.objects.filter(organization_id=org_id)
        .order_by('-updated_at')
        .values(
            'id',
            'updated_at',
            'title',
        )
    )
    for project_dict in project_dicts:
        # Record per-project progress so operators can monitor the migration.
        migration = AsyncMigrationStatus.objects.create(
            project_id=project_dict['id'],
            name=migration_name,
            status=AsyncMigrationStatus.STATUS_STARTED,
        )
        project_tasks = Task.objects.filter(project_id=project_dict['id'])
        logger.debug(
            f'Start processing stats project <{project_dict["title"]}> ({project_dict["id"]}) '
            f'with task count {project_tasks.count()} and updated_at {project_dict["updated_at"]}'
        )

        task_count = update_tasks_counters(project_tasks, from_scratch=from_scratch)

        migration.status = AsyncMigrationStatus.STATUS_FINISHED
        migration.meta = {'tasks_processed': task_count, 'total_project_tasks': project_tasks.count()}
        migration.save()
        logger.debug(
            f'End processing counters for project <{project_dict["title"]}> ({project_dict["id"]}), '
            f'processed {str(task_count)} tasks'
        )
 
 
def export_project(project_id, export_format, path, serializer_context=None):
    """Export all tasks of a project to a file on disk.

    :param project_id: ID of the project to export
    :param export_format: one of the formats reported by DataExport.get_export_formats
        (case-insensitive; upper-cased before the check)
    :param path: output file path, or a directory (then the generated filename is used)
    :param serializer_context: optional serializer context; a str is parsed as JSON
    :return: path of the written export file
    :raises AssertionError: when export_format is not supported for this project
    """
    logger = logging.getLogger(__name__)

    project = Project.objects.get(id=project_id)

    export_format = export_format.upper()
    supported_formats = [s['name'] for s in DataExport.get_export_formats(project)]
    # NOTE(review): `assert` is stripped under `python -O`; kept as-is so callers
    # that catch AssertionError keep working, but a ValueError would be safer.
    assert export_format in supported_formats, f'Export format is not supported, please use {supported_formats}'

    task_ids = (
        Task.objects.filter(project=project).select_related('project').prefetch_related('annotations', 'predictions')
    )

    logger.debug(f'Start exporting project <{project.title}> ({project.id}) with task count {task_ids.count()}.')

    # serializer context: accept either a dict or a JSON-encoded string
    if isinstance(serializer_context, str):
        serializer_context = json.loads(serializer_context)
    serializer_options = ExportMixin._get_export_serializer_option(serializer_context)

    # export cycle: serialize in chunks to bound memory on large projects
    tasks = []
    for _task_ids in batch(task_ids, 1000):
        tasks += ExportDataSerializer(_task_ids, many=True, **serializer_options).data

    # convert to output format
    export_file, _, filename = DataExport.generate_export_file(
        project, tasks, export_format, settings.CONVERTER_DOWNLOAD_RESOURCES, {}
    )

    # Bug fix: close the export stream even if resolving the path, opening the
    # destination, or copying fails — previously a raised exception leaked it.
    try:
        # write to file; `path` may be a directory or a full file path
        filepath = os.path.join(path, filename) if os.path.isdir(path) else path
        with open(filepath, 'wb') as file:
            shutil.copyfileobj(export_file, file)
    finally:
        export_file.close()

    logger.debug(f'End exporting project <{project.title}> ({project.id}) in {export_format} format.')

    return filepath
 
 
def _fill_annotations_project(project_id):
    """Backfill Annotation.project for every annotation whose task is in *project_id*."""
    project_annotations = Annotation.objects.filter(task__project_id=project_id)
    project_annotations.update(project_id=project_id)
 
 
def fill_annotations_project():
    """Kick off the Annotation.project backfill, one (possibly async) job per project."""
    logger.info('Start filling project field for Annotation model')

    for pid in Project.objects.all().values_list('id', flat=True):
        start_job_async_or_sync(_fill_annotations_project, pid)

    logger.info('Finished filling project field for Annotation model')
 
 
def _fill_predictions_project(migration_name='0043_auto_20230825'):
    """Backfill Prediction.project for every project, recording per-project
    progress in AsyncMigrationStatus rows named *migration_name*."""
    for pid in Project.objects.all().values_list('id', flat=True):
        tracker = AsyncMigrationStatus.objects.create(
            project_id=pid,
            name=migration_name,
            status=AsyncMigrationStatus.STATUS_STARTED,
        )

        # Single bulk UPDATE per project; returns the number of rows touched.
        processed = Prediction.objects.filter(task__project_id=pid).update(project_id=pid)

        tracker.status = AsyncMigrationStatus.STATUS_FINISHED
        tracker.meta = {
            'predictions_processed': processed,
            'total_project_predictions': Prediction.objects.filter(project_id=pid).count(),
        }
        tracker.save()
 
 
def fill_predictions_project(migration_name):
    """Run the Prediction.project backfill as a single (possibly async) job."""
    logger.info('Start filling project field for Prediction model')

    start_job_async_or_sync(_fill_predictions_project, migration_name=migration_name)

    logger.info('Finished filling project field for Prediction model')
 
 
def update_tasks_counters(queryset, from_scratch=True):
    """
    Update the cached per-task counters (total_annotations, cancelled_annotations,
    total_predictions) for the passed queryset of Tasks.

    :param queryset: Tasks to update; accepts a Task queryset, a plain list of
        Task instances, or a TaskQuerySet yielding task IDs (ints)
    :param from_scratch: when False, tasks that already have any non-zero
        counter are skipped (incremental mode)
    :return: Count of updated tasks
    """
    # Per-task aggregates; distinct=True guards against row multiplication
    # when annotations and predictions are both joined in one query.
    total_annotations = Count('annotations', distinct=True, filter=Q(annotations__was_cancelled=False))
    cancelled_annotations = Count('annotations', distinct=True, filter=Q(annotations__was_cancelled=True))
    total_predictions = Count('predictions', distinct=True)
    # construct QuerySet in case of list of Tasks
    if isinstance(queryset, list) and len(queryset) > 0 and isinstance(queryset[0], Task):
        queryset = Task.objects.filter(id__in=[task.id for task in queryset])
    # construct QuerySet in case annotated queryset (first element is an int ID,
    # e.g. the result of a values_list) — rebuild a plain Task queryset from it
    if isinstance(queryset, TaskQuerySet) and queryset.exists() and isinstance(queryset[0], int):
        queryset = Task.objects.filter(id__in=queryset)

    if not from_scratch:
        # Incremental mode: leave tasks with any non-zero counter untouched.
        queryset = queryset.exclude(
            Q(total_annotations__gt=0) | Q(cancelled_annotations__gt=0) | Q(total_predictions__gt=0)
        )

    # filter our tasks with 0 annotations and 0 predictions and update them with 0
    # order_by('id') ensures consistent row locking order to prevent deadlocks
    queryset.filter(annotations__isnull=True, predictions__isnull=True).order_by('id').update(
        total_annotations=0, cancelled_annotations=0, total_predictions=0
    )

    # filter our tasks with 0 annotations and 0 predictions
    # (only tasks with at least one annotation or prediction remain below)
    queryset = queryset.filter(Q(annotations__isnull=False) | Q(predictions__isnull=False))
    queryset = queryset.annotate(
        new_total_annotations=total_annotations,
        new_cancelled_annotations=cancelled_annotations,
        new_total_predictions=total_predictions,
    )

    updated_count = 0

    # Stream rows in chunks so huge projects don't blow up memory; only the
    # columns we rewrite (plus id) are fetched.
    tasks_iterator = iterate_queryset(
        queryset.only('id', 'total_annotations', 'cancelled_annotations', 'total_predictions'),
        chunk_size=settings.BATCH_SIZE,
    )

    for _batch in batched_iterator(tasks_iterator, settings.BATCH_SIZE):
        batch_list = []
        for task in _batch:
            # Copy the freshly-annotated values onto the concrete counter fields
            # so bulk_update can persist them.
            task.total_annotations = task.new_total_annotations
            task.cancelled_annotations = task.new_cancelled_annotations
            task.total_predictions = task.new_total_predictions
            batch_list.append(task)

        if batch_list:
            Task.objects.bulk_update(
                batch_list,
                ['total_annotations', 'cancelled_annotations', 'total_predictions'],
                batch_size=settings.BATCH_SIZE,
            )
            updated_count += len(batch_list)

    return updated_count
 
 
def bulk_update_is_labeled_by_overlap(tasks_ids, project):
    """
    Recompute Task.is_labeled for the given task IDs against the project's
    per-task overlap requirement.

    Two code paths, chosen by a feature flag:
    - flag on: a task is labeled when the number of *distinct annotators*
      reaches its `overlap`;
    - flag off (original behavior): a task is labeled when its cached
      annotation counters reach `overlap`.
    In both paths, when skip_queue is IGNORE_SKIPPED, cancelled (skipped)
    annotations count toward the requirement; otherwise they are excluded.

    :param tasks_ids: list of task IDs to update (no-op when empty)
    :param project: Project the tasks belong to
    """
    if not tasks_ids:
        return

    batch_size = settings.BATCH_SIZE

    if flag_set('fflag_fix_fit_1082_overlap_use_distinct_annotators', user='auto'):
        # Use distinct annotator count for overlap comparison
        for i in range(0, len(tasks_ids), batch_size):
            batch_ids = tasks_ids[i : i + batch_size]

            # Annotate with distinct annotator count; IGNORE_SKIPPED also
            # counts annotators whose annotations were cancelled.
            if project.skip_queue == project.SkipQueue.IGNORE_SKIPPED:
                annotator_count_expr = Count('annotations__completed_by', distinct=True)
            else:
                annotator_count_expr = Count(
                    'annotations__completed_by',
                    distinct=True,
                    filter=Q(annotations__was_cancelled=False),
                )

            tasks_qs = Task.objects.filter(id__in=batch_ids, project=project).annotate(
                annotator_count=annotator_count_expr
            )

            # Get IDs of tasks that meet the overlap requirement
            finished_task_ids = list(tasks_qs.filter(annotator_count__gte=F('overlap')).values_list('id', flat=True))

            # Update is_labeled based on annotator count; every task in the
            # batch gets an explicit True or False (not left stale).
            Task.objects.filter(id__in=finished_task_ids, project=project).update(is_labeled=True)
            Task.objects.filter(id__in=batch_ids, project=project).exclude(id__in=finished_task_ids).update(
                is_labeled=False
            )
    else:
        # Original behavior: use total annotation count
        # (cached counters, so this path does no join against annotations)
        completed_annotations_f_expr = F('total_annotations')
        if project.skip_queue == project.SkipQueue.IGNORE_SKIPPED:
            completed_annotations_f_expr += F('cancelled_annotations')
        finished_q = Q(GreaterThanOrEqual(completed_annotations_f_expr, F('overlap')))

        for i in range(0, len(tasks_ids), batch_size):
            batch_ids = tasks_ids[i : i + batch_size]

            Task.objects.filter(id__in=batch_ids, project=project).filter(finished_q).update(is_labeled=True)
            Task.objects.filter(id__in=batch_ids, project=project).exclude(finished_q).update(is_labeled=False)