"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
|
"""
|
import logging
|
import re
|
from datetime import datetime
|
from functools import reduce
|
from typing import ClassVar
|
|
import ujson as json
|
from core.feature_flags import flag_set
|
from core.utils.db import fast_first
|
from data_manager.prepare_params import ConjunctionEnum
|
from django.conf import settings
|
from django.contrib.postgres.aggregates import ArrayAgg
|
from django.db import models
|
from django.db.models import (
|
Aggregate,
|
Avg,
|
Case,
|
DateTimeField,
|
Exists,
|
F,
|
FloatField,
|
OuterRef,
|
Q,
|
Subquery,
|
TextField,
|
Value,
|
When,
|
)
|
from django.db.models.fields.json import KeyTextTransform
|
from django.db.models.functions import Cast, Coalesce, Concat
|
from fsm.queryset_mixins import FSMStateQuerySetMixin
|
from fsm.registry import get_state_choices
|
from pydantic import BaseModel
|
from rest_framework.exceptions import ValidationError
|
|
from label_studio.core.utils.common import load_func
|
from label_studio.core.utils.params import cast_bool_from_str
|
|
logger = logging.getLogger(__name__)
|
|
DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
|
|
|
class _Operator(BaseModel):
|
EQUAL: ClassVar[str] = 'equal'
|
NOT_EQUAL: ClassVar[str] = 'not_equal'
|
LESS: ClassVar[str] = 'less'
|
GREATER: ClassVar[str] = 'greater'
|
LESS_OR_EQUAL: ClassVar[str] = 'less_or_equal'
|
GREATER_OR_EQUAL: ClassVar[str] = 'greater_or_equal'
|
IN: ClassVar[str] = 'in'
|
NOT_IN: ClassVar[str] = 'not_in'
|
IN_LIST: ClassVar[str] = 'in_list'
|
NOT_IN_LIST: ClassVar[str] = 'not_in_list'
|
EMPTY: ClassVar[str] = 'empty'
|
CONTAINS: ClassVar[str] = 'contains'
|
NOT_CONTAINS: ClassVar[str] = 'not_contains'
|
REGEX: ClassVar[str] = 'regex'
|
|
|
Operator = _Operator()
|
|
operators = {
|
Operator.EQUAL: '',
|
Operator.NOT_EQUAL: '',
|
Operator.LESS: '__lt',
|
Operator.GREATER: '__gt',
|
Operator.LESS_OR_EQUAL: '__lte',
|
Operator.GREATER_OR_EQUAL: '__gte',
|
Operator.IN: '',
|
Operator.NOT_IN: '',
|
Operator.IN_LIST: '',
|
Operator.NOT_IN_LIST: '',
|
Operator.EMPTY: '__isnull',
|
Operator.CONTAINS: '__icontains',
|
Operator.NOT_CONTAINS: '__icontains',
|
Operator.REGEX: '__regex',
|
}
|
|
|
def get_fields_for_filter_ordering(prepare_params):
|
result = []
|
if prepare_params is None:
|
return result
|
|
# collect fields from ordering
|
if prepare_params.ordering:
|
ordering_field_name = prepare_params.ordering[0].replace('tasks:', '').replace('-', '')
|
result.append(ordering_field_name)
|
|
# collect fields from filters
|
if prepare_params.filters:
|
for _filter in prepare_params.filters.items:
|
filter_field_name = _filter.filter.replace('filter:tasks:', '')
|
result.append(filter_field_name)
|
return result
|
|
|
def get_fields_for_evaluation(prepare_params, user, skip_regular=True):
|
"""Collecting field names to annotate them
|
|
:param prepare_params: structure with filters and ordering
|
:param user: user
|
:return: list of field names
|
"""
|
from projects.models import Project
|
from tasks.models import Task
|
|
result = []
|
result += get_fields_for_filter_ordering(prepare_params)
|
|
# visible fields calculation
|
fields = prepare_params.data.get('hiddenColumns', None)
|
if fields:
|
from label_studio.data_manager.functions import TASKS
|
|
GET_ALL_COLUMNS = load_func(settings.DATA_MANAGER_GET_ALL_COLUMNS)
|
all_columns = GET_ALL_COLUMNS(Project.objects.get(id=prepare_params.project), user)
|
all_columns = set(
|
[TASKS + ('data.' if c.get('parent', None) == 'data' else '') + c['id'] for c in all_columns['columns']]
|
)
|
hidden = set(fields['explore']) & set(fields['labeling'])
|
shown = all_columns - hidden
|
shown = {c[len(TASKS) :] for c in shown} - {'data'} # remove tasks:
|
result = set(result) | shown
|
|
# remove duplicates
|
result = set(result)
|
|
# we don't need to annotate regular model fields, so we skip them
|
if skip_regular:
|
skipped_fields = [field.attname for field in Task._meta.fields]
|
skipped_fields.append('id')
|
result = [f for f in result if f not in skipped_fields]
|
result = [f for f in result if not f.startswith('data.')]
|
|
return result
|
|
|
def apply_ordering(queryset, ordering, project, request, view_data=None):
|
if ordering:
|
|
preprocess_field_name = load_func(settings.PREPROCESS_FIELD_NAME)
|
raw_field_name = ordering[0]
|
numeric_ordering = False
|
unsigned_field_name = raw_field_name.lstrip('-+')
|
if (
|
view_data is not None
|
and 'columnsDisplayType' in view_data
|
and unsigned_field_name in view_data['columnsDisplayType']
|
and view_data['columnsDisplayType'][unsigned_field_name] == 'Number'
|
):
|
numeric_ordering = True
|
field_name, ascending = preprocess_field_name(raw_field_name, project=project)
|
|
if field_name.startswith('data__'):
|
# annotate task with data field for float/int/bool ordering support
|
json_field = field_name.replace('data__', '')
|
numeric_ordering_applied = False
|
if numeric_ordering is True:
|
queryset = queryset.annotate(
|
ordering_field=Cast(KeyTextTransform(json_field, 'data'), output_field=FloatField())
|
)
|
# for non numeric values we need fallback to string ordering
|
try:
|
queryset.first()
|
numeric_ordering_applied = True
|
except Exception as e:
|
logger.warning(f'Failed to apply numeric ordering for field {json_field}: {e}')
|
if not numeric_ordering_applied:
|
queryset = queryset.annotate(ordering_field=KeyTextTransform(json_field, 'data'))
|
f = F('ordering_field').asc(nulls_last=True) if ascending else F('ordering_field').desc(nulls_last=True)
|
|
elif field_name == 'state':
|
state_choices = get_state_choices('task')
|
whens = [When(current_state=state, then=Value(i + 1)) for i, state in enumerate(state_choices.values)]
|
queryset = queryset.annotate(
|
state_order=Case(*whens, default=Value(0), output_field=models.IntegerField())
|
)
|
f = F('state_order').asc(nulls_last=True) if ascending else F('state_order').desc(nulls_last=True)
|
else:
|
f = F(field_name).asc(nulls_last=True) if ascending else F(field_name).desc(nulls_last=True)
|
|
queryset = queryset.order_by(f)
|
else:
|
queryset = queryset.order_by('id')
|
|
return queryset
|
|
|
def cast_value(_filter):
|
# range (is between)
|
if hasattr(_filter.value, 'max'):
|
if _filter.type == 'Number':
|
_filter.value.min = float(_filter.value.min)
|
_filter.value.max = float(_filter.value.max)
|
elif _filter.type == 'Datetime':
|
_filter.value.min = datetime.strptime(_filter.value.min, DATETIME_FORMAT)
|
_filter.value.max = datetime.strptime(_filter.value.max, DATETIME_FORMAT)
|
# one value
|
else:
|
if _filter.type == 'Number':
|
_filter.value = float(_filter.value)
|
elif _filter.type == 'Datetime':
|
_filter.value = datetime.strptime(_filter.value, DATETIME_FORMAT)
|
elif _filter.type == 'Boolean':
|
_filter.value = cast_bool_from_str(_filter.value)
|
|
|
def add_result_filter(field_name, _filter, filter_expressions, project):
|
from django.db.models.expressions import RawSQL
|
from tasks.models import Annotation, Prediction
|
|
_class = Annotation if field_name == 'annotations_results' else Prediction
|
|
# Annotation
|
if field_name == 'annotations_results':
|
subquery = Q(
|
id__in=Annotation.objects.annotate(json_str=RawSQL('cast(result as text)', ''))
|
.filter(Q(project=project) & Q(json_str__contains=_filter.value))
|
.filter(task=OuterRef('pk'))
|
.values_list('task', flat=True)
|
)
|
# Predictions: they don't have `project` yet
|
else:
|
subquery = Exists(
|
_class.objects.annotate(json_str=RawSQL('cast(result as text)', '')).filter(
|
Q(task=OuterRef('pk')) & Q(json_str__contains=_filter.value)
|
)
|
)
|
|
if _filter.operator in [Operator.EQUAL, Operator.NOT_EQUAL]:
|
try:
|
value = json.loads(_filter.value)
|
except: # noqa: E722
|
return 'exit'
|
|
q = Exists(_class.objects.filter(Q(task=OuterRef('pk')) & Q(result=value)))
|
filter_expressions.append(q if _filter.operator == Operator.EQUAL else ~q)
|
return 'continue'
|
elif _filter.operator == Operator.CONTAINS:
|
filter_expressions.append(Q(subquery))
|
return 'continue'
|
elif _filter.operator == Operator.NOT_CONTAINS:
|
filter_expressions.append(~Q(subquery))
|
return 'continue'
|
elif _filter.operator == Operator.EMPTY:
|
if cast_bool_from_str(_filter.value):
|
q = Q(annotations__result__isnull=True) | Q(annotations__result=[])
|
else:
|
q = Q(annotations__result__isnull=False) & ~Q(annotations__result=[])
|
filter_expressions.append(q)
|
return 'continue'
|
|
|
def add_user_filter(enabled, key, _filter, filter_expressions):
|
if enabled and _filter.operator == Operator.CONTAINS:
|
filter_expressions.append(Q(**{key: int(_filter.value)}))
|
return 'continue'
|
elif enabled and _filter.operator == Operator.NOT_CONTAINS:
|
filter_expressions.append(~Q(**{key: int(_filter.value)}))
|
return 'continue'
|
elif enabled and _filter.operator == Operator.EMPTY:
|
value = cast_bool_from_str(_filter.value)
|
filter_expressions.append(Q(**{key + '__isnull': value}))
|
return 'continue'
|
|
|
def apply_filters(queryset, filters, project, request):
|
if not filters:
|
return queryset
|
|
# convert conjunction to orm statement
|
custom_filter_expressions = load_func(settings.DATA_MANAGER_CUSTOM_FILTER_EXPRESSIONS)
|
|
# combine child filters with their parent in the same filter expression
|
filter_line_expressions: list[list[Q]] = []
|
for parent_filter in filters.items:
|
filter_line = [parent_filter, parent_filter.child_filter] if parent_filter.child_filter else [parent_filter]
|
filter_expressions: list[Q] = []
|
|
for _filter in filter_line:
|
is_child_filter = parent_filter.child_filter is not None and _filter is parent_filter.child_filter
|
|
# we can also have annotations filters
|
if not _filter.filter.startswith('filter:tasks:') or _filter.value is None:
|
continue
|
|
# django orm loop expression attached to column name
|
preprocess_field_name = load_func(settings.PREPROCESS_FIELD_NAME)
|
field_name, _ = preprocess_field_name(_filter.filter, project)
|
|
# filter pre-processing, value type conversion, etc..
|
preprocess_filter = load_func(settings.DATA_MANAGER_PREPROCESS_FILTER)
|
_filter = preprocess_filter(_filter, field_name)
|
|
# custom expressions for enterprise
|
filter_expression = custom_filter_expressions(
|
_filter,
|
field_name,
|
project,
|
request=request,
|
is_child_filter=is_child_filter,
|
)
|
if filter_expression:
|
filter_expressions.append(filter_expression)
|
continue
|
|
# annotators
|
result = add_user_filter(
|
field_name == 'annotators', 'annotations__completed_by', _filter, filter_expressions
|
)
|
if result == 'continue':
|
continue
|
|
# updated_by
|
result = add_user_filter(field_name == 'updated_by', 'updated_by', _filter, filter_expressions)
|
if result == 'continue':
|
continue
|
|
# annotations results & predictions results
|
if field_name in ['annotations_results', 'predictions_results']:
|
result = add_result_filter(field_name, _filter, filter_expressions, project)
|
if result == 'exit':
|
return queryset.none()
|
elif result == 'continue':
|
continue
|
|
# annotation ids
|
if field_name == 'annotations_ids':
|
field_name = 'annotations__id'
|
if 'contains' in _filter.operator:
|
# convert string like "1 2,3" => [1,2,3]
|
_filter.value = [
|
int(value) for value in re.split(',|;| ', _filter.value) if value and value.isdigit()
|
]
|
_filter.operator = 'in_list' if _filter.operator == 'contains' else 'not_in_list'
|
elif 'equal' in _filter.operator:
|
if not _filter.value.isdigit():
|
_filter.value = 0
|
|
# predictions model versions
|
if field_name == 'predictions_model_versions' and _filter.operator == Operator.CONTAINS:
|
q = Q()
|
for value in _filter.value:
|
q |= Q(predictions__model_version__contains=value)
|
filter_expressions.append(q)
|
continue
|
elif field_name == 'predictions_model_versions' and _filter.operator == Operator.NOT_CONTAINS:
|
q = Q()
|
for value in _filter.value:
|
q &= ~Q(predictions__model_version__contains=value)
|
filter_expressions.append(q)
|
continue
|
elif field_name == 'predictions_model_versions' and _filter.operator == Operator.EMPTY:
|
value = cast_bool_from_str(_filter.value)
|
filter_expressions.append(Q(predictions__model_version__isnull=value))
|
continue
|
|
# use other name because of model names conflict
|
if field_name == 'file_upload':
|
field_name = 'file_upload_field'
|
|
# annotate with cast to number if need
|
if _filter.type == 'Number' and field_name.startswith('data__'):
|
json_field = field_name.replace('data__', '')
|
queryset = queryset.annotate(
|
**{
|
f'filter_{json_field.replace("$undefined$", "undefined")}': Cast(
|
KeyTextTransform(json_field, 'data'), output_field=FloatField()
|
)
|
}
|
)
|
clean_field_name = f'filter_{json_field.replace("$undefined$", "undefined")}'
|
else:
|
clean_field_name = field_name
|
|
# special case: predictions, annotations, cancelled --- for them 0 is equal to is_empty=True
|
if (
|
clean_field_name in ('total_predictions', 'total_annotations', 'cancelled_annotations')
|
and _filter.operator == 'empty'
|
):
|
_filter.operator = 'equal' if cast_bool_from_str(_filter.value) else 'not_equal'
|
_filter.value = 0
|
|
# get type of annotated field
|
value_type = 'str'
|
if queryset.exists():
|
value_type = type(queryset.values_list(field_name, flat=True)[0]).__name__
|
|
if (value_type == 'list' or value_type == 'tuple') and 'equal' in _filter.operator:
|
raise ValidationError('Not supported filter type')
|
|
# special case: for strings empty is "" or null=True
|
if _filter.type in ('String', 'Unknown') and _filter.operator == 'empty':
|
value = cast_bool_from_str(_filter.value)
|
if value: # empty = true
|
q = Q(Q(**{field_name: None}) | Q(**{field_name + '__isnull': True}))
|
if value_type == 'str':
|
q |= Q(**{field_name: ''})
|
if value_type == 'list':
|
q = Q(**{field_name: [None]})
|
|
else: # empty = false
|
q = Q(~Q(**{field_name: None}) & ~Q(**{field_name + '__isnull': True}))
|
if value_type == 'str':
|
q &= ~Q(**{field_name: ''})
|
if value_type == 'list':
|
q = ~Q(**{field_name: [None]})
|
|
filter_expressions.append(q)
|
continue
|
|
# regex pattern check
|
elif _filter.operator == 'regex':
|
try:
|
re.compile(pattern=str(_filter.value))
|
except Exception as e:
|
logger.info('Incorrect regex for filter: %s: %s', _filter.value, str(e))
|
return queryset.none()
|
|
# append operator
|
field_name = f"{clean_field_name}{operators.get(_filter.operator, '')}"
|
|
# in
|
if _filter.operator == 'in':
|
cast_value(_filter)
|
filter_expressions.append(
|
Q(
|
**{
|
f'{field_name}__gte': _filter.value.min,
|
f'{field_name}__lte': _filter.value.max,
|
}
|
),
|
)
|
|
# not in
|
elif _filter.operator == 'not_in':
|
cast_value(_filter)
|
filter_expressions.append(
|
~Q(
|
**{
|
f'{field_name}__gte': _filter.value.min,
|
f'{field_name}__lte': _filter.value.max,
|
}
|
),
|
)
|
|
# in list
|
elif _filter.operator == 'in_list':
|
filter_expressions.append(
|
Q(**{f'{field_name}__in': _filter.value}),
|
)
|
|
# not in list
|
elif _filter.operator == 'not_in_list':
|
filter_expressions.append(
|
~Q(**{f'{field_name}__in': _filter.value}),
|
)
|
|
# empty
|
elif _filter.operator == 'empty':
|
if cast_bool_from_str(_filter.value):
|
filter_expressions.append(Q(**{field_name: True}))
|
else:
|
filter_expressions.append(~Q(**{field_name: True}))
|
|
# starting from not_
|
elif _filter.operator.startswith('not_'):
|
cast_value(_filter)
|
filter_expressions.append(~Q(**{field_name: _filter.value}))
|
|
# all others
|
else:
|
cast_value(_filter)
|
filter_expressions.append(Q(**{field_name: _filter.value}))
|
|
filter_line_expressions.append(filter_expressions)
|
|
resolved_filter_lines = [reduce(lambda x, y: x & y, fle) for fle in filter_line_expressions]
|
|
"""WARNING: Stringifying filter_expressions will evaluate the (sub)queryset.
|
Do not use a log in the following manner:
|
logger.debug(f'Apply filter: {filter_expressions}')
|
Even in DEBUG mode, a subqueryset that has OuterRef will raise an error
|
if evaluated outside a parent queryset.
|
"""
|
if filters.conjunction == ConjunctionEnum.OR:
|
result_filter = Q()
|
for resolved_filter in resolved_filter_lines:
|
result_filter.add(resolved_filter, Q.OR)
|
queryset = queryset.filter(result_filter)
|
else:
|
for resolved_filter in resolved_filter_lines:
|
queryset = queryset.filter(resolved_filter)
|
return queryset
|
|
|
class TaskQuerySet(FSMStateQuerySetMixin, models.QuerySet):
|
"""QuerySet for Task model with Data Manager filters and ordering support."""
|
|
def prepared(self, prepare_params=None):
|
"""Apply filters, ordering and selected items to queryset
|
|
:param prepare_params: prepare params with project, filters, orderings, etc
|
:return: ordered and filtered queryset
|
|
Note: For multi-project queries, filters and ordering will use the first project's
|
configuration (label config, custom fields, etc.). This is backwards compatible
|
with single-project queries.
|
"""
|
from projects.models import Project
|
|
queryset = self
|
|
if prepare_params is None:
|
return queryset
|
|
# Get the project for filter/ordering configuration
|
# For multi-project queries, use the first project's configuration
|
if prepare_params.is_multi_project:
|
project = Project.objects.get(pk=prepare_params.projects[0])
|
else:
|
# Backwards compatible: prepare_params.project is an int
|
project = Project.objects.get(pk=prepare_params.project)
|
|
request = prepare_params.request
|
queryset = apply_filters(queryset, prepare_params.filters, project, request)
|
queryset = apply_ordering(queryset, prepare_params.ordering, project, request, view_data=prepare_params.data)
|
|
if not prepare_params.selectedItems:
|
return queryset
|
|
# included selected items
|
if prepare_params.selectedItems.all is False and prepare_params.selectedItems.included:
|
queryset = queryset.filter(id__in=prepare_params.selectedItems.included)
|
|
# excluded selected items
|
elif prepare_params.selectedItems.all is True and prepare_params.selectedItems.excluded:
|
queryset = queryset.exclude(id__in=prepare_params.selectedItems.excluded)
|
|
return queryset
|
|
|
class GroupConcat(Aggregate):
|
function = 'GROUP_CONCAT'
|
template = '%(function)s(%(distinct)s%(expressions)s)'
|
|
def __init__(self, expression, distinct=False, output_field=None, **extra):
|
output_field = models.JSONField() if output_field is None else output_field
|
super().__init__(expression, distinct='DISTINCT ' if distinct else '', output_field=output_field, **extra)
|
|
|
def newest_annotation_subquery() -> Subquery:
|
from tasks.models import Annotation
|
|
newest_annotations = Annotation.objects.filter(task=OuterRef('pk')).order_by('-id')[:1]
|
return Subquery(newest_annotations.values('created_at'))
|
|
|
def base_annotate_completed_at(queryset: TaskQuerySet) -> TaskQuerySet:
|
return queryset.annotate(completed_at=Case(When(is_labeled=True, then=newest_annotation_subquery())))
|
|
|
def annotate_completed_at(queryset: TaskQuerySet) -> TaskQuerySet:
|
LseProject = load_func(settings.LSE_PROJECT)
|
get_tasks_agreement_queryset = load_func(settings.GET_TASKS_AGREEMENT_QUERYSET)
|
|
is_lse_project = bool(LseProject)
|
has_custom_agreement_queryset = bool(get_tasks_agreement_queryset)
|
|
if is_lse_project and has_custom_agreement_queryset:
|
return annotated_completed_at_considering_agreement_threshold(queryset)
|
|
return base_annotate_completed_at(queryset)
|
|
|
def annotated_completed_at_considering_agreement_threshold(queryset):
|
LseProject = load_func(settings.LSE_PROJECT)
|
get_tasks_agreement_queryset = load_func(settings.GET_TASKS_AGREEMENT_QUERYSET)
|
|
is_lse_project = bool(LseProject)
|
has_custom_agreement_queryset = bool(get_tasks_agreement_queryset)
|
|
project_exists = is_lse_project and hasattr(queryset, 'project') and queryset.project is not None
|
|
project_id = queryset.project.id if project_exists else None
|
|
if project_id is None or not is_lse_project or not has_custom_agreement_queryset:
|
return base_annotate_completed_at(queryset)
|
|
lse_project = fast_first(
|
LseProject.objects.filter(project_id=project_id).values(
|
'agreement_threshold', 'max_additional_annotators_assignable'
|
)
|
)
|
|
agreement_threshold = lse_project['agreement_threshold'] if lse_project else None
|
if not lse_project or not agreement_threshold:
|
# This project doesn't use task_agreement so don't consider it when determining completed_at
|
return base_annotate_completed_at(queryset)
|
|
queryset = get_tasks_agreement_queryset(queryset)
|
max_additional_annotators_assignable = lse_project['max_additional_annotators_assignable']
|
|
if flag_set('fflag_fix_fit_1082_overlap_use_distinct_annotators', user='auto'):
|
completed_at_case = Case(
|
When(
|
# If agreement_threshold is set, evaluate all conditions
|
Q(is_labeled=True)
|
& (
|
Q(_agreement__gte=agreement_threshold)
|
| Q(annotator_count__gte=(F('overlap') + max_additional_annotators_assignable))
|
),
|
then=newest_annotation_subquery(),
|
),
|
default=Value(None),
|
output_field=DateTimeField(),
|
)
|
else:
|
completed_at_case = Case(
|
When(
|
# If agreement_threshold is set, evaluate all conditions
|
Q(is_labeled=True)
|
& (
|
Q(_agreement__gte=agreement_threshold)
|
| Q(annotation_count__gte=(F('overlap') + max_additional_annotators_assignable))
|
),
|
then=newest_annotation_subquery(),
|
),
|
default=Value(None),
|
output_field=DateTimeField(),
|
)
|
|
return queryset.annotate(completed_at=completed_at_case)
|
|
|
def annotate_storage_filename(queryset: TaskQuerySet) -> TaskQuerySet:
|
from label_studio.data_manager.functions import intersperse
|
|
storage_key_names = [F(s + '__key') for s in settings.IO_STORAGES_IMPORT_LINK_NAMES]
|
return queryset.annotate(
|
storage_filename=Concat(*intersperse(storage_key_names, Value(';')), output_field=TextField())
|
)
|
|
|
def annotate_annotations_results(queryset):
|
if settings.DJANGO_DB == settings.DJANGO_DB_SQLITE:
|
return queryset.annotate(
|
annotations_results=Coalesce(
|
GroupConcat('annotations__result'), Value(''), output_field=models.CharField()
|
)
|
)
|
else:
|
return queryset.annotate(annotations_results=ArrayAgg('annotations__result', distinct=True, default=Value([])))
|
|
|
def annotate_predictions_results(queryset):
|
if settings.DJANGO_DB == settings.DJANGO_DB_SQLITE:
|
return queryset.annotate(
|
predictions_results=Coalesce(
|
GroupConcat('predictions__result'), Value(''), output_field=models.CharField()
|
)
|
)
|
else:
|
return queryset.annotate(predictions_results=ArrayAgg('predictions__result', distinct=True, default=Value([])))
|
|
|
def annotate_annotators(queryset):
|
if settings.DJANGO_DB == settings.DJANGO_DB_SQLITE:
|
return queryset.annotate(
|
annotators=Coalesce(GroupConcat('annotations__completed_by'), Value(''), output_field=models.CharField())
|
)
|
else:
|
return queryset.annotate(annotators=ArrayAgg('annotations__completed_by', distinct=True, default=Value([])))
|
|
|
def annotate_predictions_score(queryset):
|
first_task = queryset.first()
|
if not first_task:
|
return queryset
|
|
# new approach with each ML backend contains it's version
|
if flag_set('ff_front_dev_1682_model_version_dropdown_070622_short', first_task.project.organization.created_by):
|
model_versions = list(
|
first_task.project.ml_backends.filter(project=first_task.project).values_list('model_version', flat=True)
|
)
|
if len(model_versions) == 0:
|
return queryset.annotate(predictions_score=Avg('predictions__score'))
|
|
else:
|
return queryset.annotate(
|
predictions_score=Avg('predictions__score', filter=Q(predictions__model_version__in=model_versions))
|
)
|
else:
|
model_version = first_task.project.model_version
|
if model_version is None:
|
return queryset.annotate(predictions_score=Avg('predictions__score'))
|
else:
|
return queryset.annotate(
|
predictions_score=Avg('predictions__score', filter=Q(predictions__model_version=model_version))
|
)
|
|
|
def annotate_annotations_ids(queryset):
|
if settings.DJANGO_DB == settings.DJANGO_DB_SQLITE:
|
return queryset.annotate(annotations_ids=GroupConcat('annotations__id', output_field=models.CharField()))
|
else:
|
return queryset.annotate(annotations_ids=ArrayAgg('annotations__id', default=Value([])))
|
|
|
def annotate_predictions_model_versions(queryset):
|
if settings.DJANGO_DB == settings.DJANGO_DB_SQLITE:
|
return queryset.annotate(
|
predictions_model_versions=GroupConcat('predictions__model_version', output_field=models.CharField())
|
)
|
else:
|
return queryset.annotate(predictions_model_versions=ArrayAgg('predictions__model_version', default=Value([])))
|
|
|
def annotate_avg_lead_time(queryset):
|
return queryset.annotate(avg_lead_time=Avg('annotations__lead_time'))
|
|
|
def annotate_draft_exists(queryset):
|
from tasks.models import AnnotationDraft
|
|
return queryset.annotate(draft_exists=Exists(AnnotationDraft.objects.filter(task=OuterRef('pk'))))
|
|
|
def file_upload(queryset):
|
return queryset.annotate(file_upload_field=F('file_upload__file'))
|
|
|
def dummy(queryset):
|
return queryset
|
|
|
def annotate_state(queryset):
|
"""
|
Annotate queryset with FSM state as 'state' field.
|
|
Uses FSMStateQuerySetMixin.with_state() to efficiently annotate
|
the current state without causing N+1 queries. Aliases 'current_state' to
|
'state' to match the Data Manager column name.
|
|
Note: Feature flag checks and user context validation are handled by
|
with_state() itself, so no additional checks are needed here.
|
"""
|
# Use the mixin's with_state() method which creates 'current_state' annotation
|
# (includes feature flag and user context checks)
|
queryset = queryset.with_state()
|
|
# Alias 'current_state' to 'state' for Data Manager column compatibility
|
# Only add the alias if current_state was actually added (feature flags enabled)
|
if 'current_state' in queryset.query.annotations:
|
return queryset.annotate(state=F('current_state'))
|
|
return queryset
|
|
|
settings.DATA_MANAGER_ANNOTATIONS_MAP = {
|
'avg_lead_time': annotate_avg_lead_time,
|
'completed_at': annotate_completed_at,
|
'annotations_results': annotate_annotations_results,
|
'predictions_results': annotate_predictions_results,
|
'predictions_model_versions': annotate_predictions_model_versions,
|
'predictions_score': annotate_predictions_score,
|
'annotators': annotate_annotators,
|
'annotations_ids': annotate_annotations_ids,
|
'file_upload': file_upload,
|
'draft_exists': annotate_draft_exists,
|
'storage_filename': annotate_storage_filename,
|
'state': annotate_state,
|
}
|
|
|
def get_annotations_map():
|
return settings.DATA_MANAGER_ANNOTATIONS_MAP
|
|
|
def update_annotation_map(obj):
|
settings.DATA_MANAGER_ANNOTATIONS_MAP.update(obj)
|
|
|
class PreparedTaskManager(models.Manager):
|
"""
|
Manager for Task model with Data Manager annotations.
|
|
Provides:
|
- Advanced query annotations for Data Manager
|
- Filter and ordering support
|
- FSM state annotation support (via TaskQuerySet)
|
|
Note: Overrides the base get_queryset() to return TaskQuerySet. Also has
|
a custom get_queryset(fields_for_evaluation, prepare_params, ...) method
|
for Data Manager-specific functionality.
|
"""
|
|
@staticmethod
|
def annotate_queryset(
|
queryset, fields_for_evaluation=None, all_fields=False, excluded_fields_for_evaluation=None, request=None
|
):
|
annotations_map = get_annotations_map()
|
|
if fields_for_evaluation is None:
|
fields_for_evaluation = []
|
|
if excluded_fields_for_evaluation is None:
|
excluded_fields_for_evaluation = []
|
|
first_task = queryset.first()
|
project = None if first_task is None else first_task.project
|
|
# db annotations applied only if we need them in ordering or filters
|
for field in annotations_map.keys():
|
# Include field if it's explicitly requested or all_fields=True, but exclude if it's in the exclusion list
|
if (field in fields_for_evaluation or all_fields) and field not in excluded_fields_for_evaluation:
|
queryset.project = project
|
queryset.request = request
|
function = annotations_map[field]
|
queryset = function(queryset)
|
|
return queryset
|
|
def get_queryset(
|
self, fields_for_evaluation=None, prepare_params=None, all_fields=False, excluded_fields_for_evaluation=None
|
):
|
"""
|
Get queryset with optional Data Manager annotations and filters.
|
|
When called without parameters (Django internal use), returns TaskQuerySet.
|
When called with parameters (Data Manager use), returns annotated and filtered queryset.
|
|
:param fields_for_evaluation: list of annotated fields in task
|
:param prepare_params: filters, ordering, selected items
|
:param all_fields: evaluate all fields for task
|
:param excluded_fields_for_evaluation: list of fields to exclude even when all_fields=True
|
:param request: request for user extraction
|
:return: task queryset with annotated fields
|
"""
|
# If called without parameters, return base TaskQuerySet (for Django internal use)
|
if prepare_params is None:
|
return TaskQuerySet(self.model, using=self._db)
|
|
# Otherwise, use Data Manager filtering and annotation
|
queryset = self.only_filtered(prepare_params=prepare_params)
|
# Expose view data to annotation functions for column-specific configuration
|
queryset.view_data = getattr(prepare_params, 'data', None)
|
return self.annotate_queryset(
|
queryset,
|
fields_for_evaluation=fields_for_evaluation,
|
all_fields=all_fields,
|
excluded_fields_for_evaluation=excluded_fields_for_evaluation,
|
request=prepare_params.request,
|
)
|
|
def only_filtered(self, prepare_params=None):
|
request = prepare_params.request
|
# Support both single and multiple projects
|
if prepare_params.is_multi_project:
|
queryset = TaskQuerySet(self.model).filter(project__in=prepare_params.projects)
|
else:
|
queryset = TaskQuerySet(self.model).filter(project=prepare_params.project)
|
fields_for_filter_ordering = get_fields_for_filter_ordering(prepare_params)
|
queryset = self.annotate_queryset(queryset, fields_for_evaluation=fields_for_filter_ordering, request=request)
|
return queryset.prepared(prepare_params=prepare_params)
|
|
|
class TaskManager(models.Manager):
|
"""
|
Default manager for Task model.
|
|
Provides:
|
- User-scoped filtering
|
- Custom QuerySet with FSM state support
|
|
Note: Overrides get_queryset() to return TaskQuerySet, which includes
|
FSMStateQuerySetMixin for state annotation support.
|
"""
|
|
def get_queryset(self):
|
"""Return TaskQuerySet which includes FSM state annotation support"""
|
return TaskQuerySet(self.model, using=self._db)
|
|
def for_user(self, user):
|
return self.get_queryset().filter(project__organization=user.active_organization)
|
|
def with_state(self):
|
"""Return queryset with FSM state annotated."""
|
return self.get_queryset().with_state()
|