import json
|
import logging
|
import os
|
import shutil
|
import sys
|
|
from core.feature_flags import flag_set
|
from core.models import AsyncMigrationStatus
|
from core.redis import start_job_async_or_sync
|
from core.utils.common import batch, batched_iterator
|
from core.utils.iterators import iterate_queryset
|
from data_export.mixins import ExportMixin
|
from data_export.models import DataExport
|
from data_export.serializers import ExportDataSerializer
|
from data_manager.managers import TaskQuerySet
|
from django.conf import settings
|
from django.db.models import Count, F, Q
|
from django.db.models.lookups import GreaterThanOrEqual
|
from organizations.models import Organization
|
from projects.models import Project
|
from tasks.models import Annotation, Prediction, Task
|
|
# Module-level logger shared by the helpers in this file.
logger = logging.getLogger(__name__)
|
|
|
def calculate_stats_all_orgs(from_scratch, redis, migration_name='0018_manual_migrate_counters'):
    """
    Start a counter-recalculation job for every organization, highest ID first.

    Each organization gets one ``redis_job_for_calculation`` job, started through
    ``start_job_async_or_sync`` on the ``critical`` queue with a 24-hour timeout.

    :param from_scratch: Forwarded to the job (recalculate all tasks vs. skip calculated ones)
    :param redis: Forwarded as the ``redis`` argument of ``start_job_async_or_sync``
    :param migration_name: Forwarded to the job; used as the AsyncMigrationStatus name
    """
    one_org_timeout = 3600 * 24  # 24 hours for one organization

    # Only IDs are read: some Organization columns (contact_info, verify_ssl_certs) are not
    # created until after a migration calls this code, so full model rows can't be loaded.
    for org_id in Organization.objects.order_by('-id').values_list('id', flat=True):
        logger.debug(f'Start recalculating stats for Organization {org_id}')

        start_job_async_or_sync(
            redis_job_for_calculation,
            org_id,
            from_scratch,
            redis=redis,
            queue_name='critical',
            job_timeout=one_org_timeout,
            migration_name=migration_name,
        )

        # NOTE(review): if the job was queued rather than run inline, this only means "scheduled".
        logger.debug(f'Organization {org_id} stats were recalculated')

    logger.debug('All organizations were recalculated')
|
|
|
def redis_job_for_calculation(org_id, from_scratch, migration_name='0018_manual_migrate_counters'):
    """
    Recalculate counters for projects list

    Walks the organization's projects (most recently updated first), recomputes task counters
    with ``update_tasks_counters`` and records progress per project in an
    ``AsyncMigrationStatus`` row: created as STARTED, then saved as FINISHED with stats in ``meta``.

    :param org_id: ID of organization to recalculate
    :param from_scratch: Start calculation from scratch or skip calculated tasks
    :param migration_name: Name stored on the created AsyncMigrationStatus rows
    """
    # Route this job's DEBUG output to stdout through the root logger.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # This function can run many times in one process (e.g. calculate_stats_all_orgs in sync
    # mode calls it once per organization). Attach the stdout handler only once; otherwise
    # every call adds another handler and each log record is printed once per call so far.
    handler_marker = '_counters_job_stdout_handler'
    if not any(getattr(existing, handler_marker, False) for existing in root_logger.handlers):
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        setattr(handler, handler_marker, True)
        root_logger.addHandler(handler)

    project_dicts = (
        Project.objects.filter(organization_id=org_id)
        .order_by('-updated_at')
        .values(
            'id',
            'updated_at',
            'title',
        )
    )
    for project_dict in project_dicts:
        # NOTE(review): if update_tasks_counters raises, this row is left in STARTED status.
        migration = AsyncMigrationStatus.objects.create(
            project_id=project_dict['id'],
            name=migration_name,
            status=AsyncMigrationStatus.STATUS_STARTED,
        )
        project_tasks = Task.objects.filter(project_id=project_dict['id'])
        root_logger.debug(
            f'Start processing stats project <{project_dict["title"]}> ({project_dict["id"]}) '
            f'with task count {project_tasks.count()} and updated_at {project_dict["updated_at"]}'
        )

        task_count = update_tasks_counters(project_tasks, from_scratch=from_scratch)

        migration.status = AsyncMigrationStatus.STATUS_FINISHED
        migration.meta = {'tasks_processed': task_count, 'total_project_tasks': project_tasks.count()}
        migration.save()
        root_logger.debug(
            f'End processing counters for project <{project_dict["title"]}> ({project_dict["id"]}), '
            f'processed {str(task_count)} tasks'
        )
|
|
|
def export_project(project_id, export_format, path, serializer_context=None):
    """
    Export all tasks of a project to a file and return the file path.

    :param project_id: ID of the project to export
    :param export_format: Export format name (case-insensitive); must be one of the names
        returned by ``DataExport.get_export_formats(project)``
    :param path: Existing directory (the converter-provided filename is appended) or a file path
    :param serializer_context: Serializer context as a dict or as a JSON string
    :return: Path of the written export file
    :raises AssertionError: If ``export_format`` is not supported for this project
    """
    project = Project.objects.get(id=project_id)

    export_format = export_format.upper()
    supported_formats = [s['name'] for s in DataExport.get_export_formats(project)]
    # Explicit raise (not ``assert``) so validation still happens under ``python -O``;
    # AssertionError is kept so existing callers catching it keep working.
    if export_format not in supported_formats:
        raise AssertionError(f'Export format is not supported, please use {supported_formats}')

    tasks_qs = (
        Task.objects.filter(project=project).select_related('project').prefetch_related('annotations', 'predictions')
    )

    logger.debug(f'Start exporting project <{project.title}> ({project.id}) with task count {tasks_qs.count()}.')

    # serializer context may arrive JSON-encoded (e.g. from a CLI or job payload)
    if isinstance(serializer_context, str):
        serializer_context = json.loads(serializer_context)
    serializer_options = ExportMixin._get_export_serializer_option(serializer_context)

    # Serialize in chunks of 1000 tasks
    tasks = []
    for tasks_chunk in batch(tasks_qs, 1000):
        tasks.extend(ExportDataSerializer(tasks_chunk, many=True, **serializer_options).data)

    # convert to output format
    export_file, _, filename = DataExport.generate_export_file(
        project, tasks, export_format, settings.CONVERTER_DOWNLOAD_RESOURCES, {}
    )

    # write to file; always release the converter output, even if writing fails
    filepath = os.path.join(path, filename) if os.path.isdir(path) else path
    try:
        with open(filepath, 'wb') as file:
            shutil.copyfileobj(export_file, file)
    finally:
        export_file.close()

    logger.debug(f'End exporting project <{project.title}> ({project.id}) in {export_format} format.')

    return filepath
|
|
|
def _fill_annotations_project(project_id):
    """Set ``Annotation.project`` to ``project_id`` for every annotation of that project's tasks."""
    project_annotations = Annotation.objects.filter(task__project_id=project_id)
    project_annotations.update(project_id=project_id)
|
|
|
def fill_annotations_project():
    """
    Backfill ``Annotation.project`` for all projects.

    Starts one ``_fill_annotations_project`` job per project through ``start_job_async_or_sync``.
    """
    logger.info('Start filling project field for Annotation model')

    every_project_id = Project.objects.all().values_list('id', flat=True)
    for pid in every_project_id:
        start_job_async_or_sync(_fill_annotations_project, pid)

    logger.info('Finished filling project field for Annotation model')
|
|
|
def _fill_predictions_project(migration_name='0043_auto_20230825'):
    """
    Backfill ``Prediction.project`` from each prediction's task, one project at a time.

    For every project an AsyncMigrationStatus row named ``migration_name`` is created as
    STARTED, then saved as FINISHED with the number of predictions updated and the number
    of predictions now pointing at the project.

    :param migration_name: Name stored on the AsyncMigrationStatus rows
    """
    for pid in Project.objects.all().values_list('id', flat=True):
        status_row = AsyncMigrationStatus.objects.create(
            project_id=pid,
            name=migration_name,
            status=AsyncMigrationStatus.STATUS_STARTED,
        )

        processed = Prediction.objects.filter(task__project_id=pid).update(project_id=pid)
        total_for_project = Prediction.objects.filter(project_id=pid).count()

        status_row.status = AsyncMigrationStatus.STATUS_FINISHED
        status_row.meta = {
            'predictions_processed': processed,
            'total_project_predictions': total_for_project,
        }
        status_row.save()
|
|
|
def fill_predictions_project(migration_name):
    """
    Backfill ``Prediction.project`` by running ``_fill_predictions_project`` as one job.

    :param migration_name: Name stored on the per-project AsyncMigrationStatus rows
    """
    job_kwargs = {'migration_name': migration_name}

    logger.info('Start filling project field for Prediction model')
    start_job_async_or_sync(_fill_predictions_project, **job_kwargs)
    logger.info('Finished filling project field for Prediction model')
|
|
|
def update_tasks_counters(queryset, from_scratch=True):
    """
    Update tasks counters for the passed queryset of Tasks

    Recomputes ``total_annotations`` (non-cancelled annotations), ``cancelled_annotations``
    and ``total_predictions`` and stores them on the Task rows.

    :param queryset: Tasks to update: a Task queryset, a TaskQuerySet yielding task IDs
        (e.g. ``values_list('id', flat=True)``), or a list of Task objects
    :param from_scratch: If True, recalculate every task; if False, skip tasks that already
        have any non-zero counter
    :return: Number of tasks updated through ``bulk_update`` (tasks with at least one
        annotation or prediction); tasks reset to all-zero counters are not included
    """
    # An empty list has nothing to update. Without this guard it would skip the list->queryset
    # conversion below and crash on ``.filter``/``.exclude`` (AttributeError on list).
    if isinstance(queryset, list) and not queryset:
        return 0

    total_annotations = Count('annotations', distinct=True, filter=Q(annotations__was_cancelled=False))
    cancelled_annotations = Count('annotations', distinct=True, filter=Q(annotations__was_cancelled=True))
    total_predictions = Count('predictions', distinct=True)

    # construct QuerySet in case of list of Tasks
    if isinstance(queryset, list) and isinstance(queryset[0], Task):
        queryset = Task.objects.filter(id__in=[task.id for task in queryset])
    # construct QuerySet in case of a queryset that yields task IDs
    if isinstance(queryset, TaskQuerySet) and queryset.exists() and isinstance(queryset[0], int):
        queryset = Task.objects.filter(id__in=queryset)

    if not from_scratch:
        # Skip tasks whose counters were already calculated (any counter > 0)
        queryset = queryset.exclude(
            Q(total_annotations__gt=0) | Q(cancelled_annotations__gt=0) | Q(total_predictions__gt=0)
        )

    # Tasks with no annotations and no predictions: reset all counters to 0 in a single UPDATE.
    # order_by('id') ensures consistent row locking order to prevent deadlocks
    queryset.filter(annotations__isnull=True, predictions__isnull=True).order_by('id').update(
        total_annotations=0, cancelled_annotations=0, total_predictions=0
    )

    # Remaining tasks (at least one annotation or prediction) get freshly aggregated counts.
    queryset = queryset.filter(Q(annotations__isnull=False) | Q(predictions__isnull=False)).annotate(
        new_total_annotations=total_annotations,
        new_cancelled_annotations=cancelled_annotations,
        new_total_predictions=total_predictions,
    )

    counter_fields = ['total_annotations', 'cancelled_annotations', 'total_predictions']
    tasks_iterator = iterate_queryset(
        queryset.only('id', *counter_fields),
        chunk_size=settings.BATCH_SIZE,
    )

    updated_count = 0
    for tasks_batch in batched_iterator(tasks_iterator, settings.BATCH_SIZE):
        batch_list = []
        for task in tasks_batch:
            task.total_annotations = task.new_total_annotations
            task.cancelled_annotations = task.new_cancelled_annotations
            task.total_predictions = task.new_total_predictions
            batch_list.append(task)

        if batch_list:
            Task.objects.bulk_update(
                batch_list,
                counter_fields,
                batch_size=settings.BATCH_SIZE,
            )
            updated_count += len(batch_list)

    return updated_count
|
|
|
def bulk_update_is_labeled_by_overlap(tasks_ids, project):
    """
    Recompute ``is_labeled`` for the given tasks of ``project``, in batches of settings.BATCH_SIZE.

    A task is marked labeled when its completion count reaches its ``overlap``:
    - with the ``fflag_fix_fit_1082_overlap_use_distinct_annotators`` flag on, the count is the
      number of distinct annotators (``annotations__completed_by``);
    - otherwise it is the stored ``total_annotations`` counter.
    Skipped (cancelled) annotations count toward overlap only when the project's skip queue
    is ``IGNORE_SKIPPED``. All other given tasks are marked not labeled.

    :param tasks_ids: Iterable of task IDs (list, tuple, set, queryset, generator, ...)
    :param project: Project the tasks belong to; tasks of other projects are not touched
    """
    if not tasks_ids:
        return

    # Materialize once so any iterable (set, generator, queryset) can be sliced into batches.
    tasks_ids = list(tasks_ids)
    batch_size = settings.BATCH_SIZE
    count_skipped = project.skip_queue == project.SkipQueue.IGNORE_SKIPPED

    if flag_set('fflag_fix_fit_1082_overlap_use_distinct_annotators', user='auto'):
        # Use distinct annotator count for overlap comparison (loop-invariant, built once)
        if count_skipped:
            annotator_count_expr = Count('annotations__completed_by', distinct=True)
        else:
            annotator_count_expr = Count(
                'annotations__completed_by',
                distinct=True,
                filter=Q(annotations__was_cancelled=False),
            )

        for start in range(0, len(tasks_ids), batch_size):
            batch_ids = tasks_ids[start : start + batch_size]
            batch_qs = Task.objects.filter(id__in=batch_ids, project=project)

            # IDs of tasks in this batch that meet the overlap requirement
            finished_task_ids = list(
                batch_qs.annotate(annotator_count=annotator_count_expr)
                .filter(annotator_count__gte=F('overlap'))
                .values_list('id', flat=True)
            )

            Task.objects.filter(id__in=finished_task_ids, project=project).update(is_labeled=True)
            batch_qs.exclude(id__in=finished_task_ids).update(is_labeled=False)
    else:
        # Original behavior: use total annotation count
        completed_annotations_f_expr = F('total_annotations')
        if count_skipped:
            completed_annotations_f_expr += F('cancelled_annotations')
        finished_q = Q(GreaterThanOrEqual(completed_annotations_f_expr, F('overlap')))

        for start in range(0, len(tasks_ids), batch_size):
            batch_ids = tasks_ids[start : start + batch_size]
            batch_qs = Task.objects.filter(id__in=batch_ids, project=project)

            batch_qs.filter(finished_q).update(is_labeled=True)
            batch_qs.exclude(finished_q).update(is_labeled=False)
|