"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import json
import time
from unittest import mock
import pytest
from core.redis import redis_healthcheck
from django.apps import apps
from django.db.models import Q
from projects.models import Project
from tasks.models import Annotation, Prediction, Task
from .utils import (
_client_is_annotator,
invite_client_to_project,
make_annotation,
make_annotator,
make_project,
make_task,
)
# Base project config shared by the next-task tests below: a published
# project using uncertainty sampling over text choices (classes A/B).
# NOTE(review): label_config looks empty/truncated here — confirm the
# original XML labeling config.
_project_for_text_choices_onto_A_B_classes = {
    'title': 'Test',
    'is_published': True,
    'sampling': Project.UNCERTAINTY,
    'label_config': """
""",
}
@pytest.mark.parametrize(
    'project_config, tasks, status_code, expected_response_value_set',
    [
        # two unlabeled tasks: /next serves one of them
        (
            _project_for_text_choices_onto_A_B_classes,
            [
                {'data': {'meta_info': 'meta info A', 'text': 'text A'}},
                {'data': {'meta_info': 'meta info B', 'text': 'text B'}},
            ],
            200,
            # 'uncompleted_task_ids' is a placeholder resolved inside the test body
            {'id': 'uncompleted_task_ids'},
        ),
        # first task already annotated: only the remaining one may be served
        (
            _project_for_text_choices_onto_A_B_classes,
            [
                {
                    'data': {'meta_info': 'meta info A', 'text': 'text A'},
                    'annotations': [{'result': [{'r': 1}], 'ground_truth': False}],
                },
                {'data': {'meta_info': 'meta info B', 'text': 'text B'}},
            ],
            200,
            {'id': 'uncompleted_task_ids'},
        ),
        # all tasks annotated: nothing left, /next returns 404
        (
            _project_for_text_choices_onto_A_B_classes,
            [
                {
                    'data': {'meta_info': 'meta info A', 'text': 'text A'},
                    'annotations': [{'result': [{'r': 1}], 'ground_truth': False}],
                },
                {
                    'data': {'meta_info': 'meta info B', 'text': 'text B'},
                    'annotations': [{'result': [{'r': 2}], 'ground_truth': False}],
                },
            ],
            404,
            # NOTE(review): value is a set inside the dict, unlike the other cases;
            # harmless since the 404 branch below skips response-value checks
            {'detail': {'Not found.'}},
        ),
        # ground truth task still should be sampled regardless of who is a creator
        # NOTE(review): expected status is 404 despite the comment above — confirm
        # whether annotators are meant to be excluded from their own ground-truth tasks
        (
            _project_for_text_choices_onto_A_B_classes,
            [
                {
                    'data': {'meta_info': 'meta info A', 'text': 'text A'},
                    'annotations': [{'result': [{'r': 1}], 'ground_truth': True}],
                },
                {
                    'data': {'meta_info': 'meta info B', 'text': 'text B'},
                    'annotations': [{'result': [{'r': 2}], 'ground_truth': False}],
                },
            ],
            404,
            {'id': 'uncompleted_task_ids'},
        ),
        # inline config variant with different task data keys
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                label_config="""
""",
            ),
            [{'data': {'location': 'London', 'text': 'text A'}}, {'data': {'location': 'London', 'text': 'text B'}}],
            200,
            {'id': 'uncompleted_task_ids'},
        ),
    ],
)
@pytest.mark.django_db
def test_next_task(business_client, any_client, project_config, tasks, status_code, expected_response_value_set):
    """Check /api/projects/<id>/next serves only tasks without (non-ground-truth) annotations.

    Tasks are bulk-uploaded (optionally with annotations), annotations are
    reassigned to the requesting client, and the response of /next is compared
    against the set of uncompleted task ids (or the expected status code).
    """
    project = make_project(project_config, business_client.user)
    if _client_is_annotator(any_client):
        invite_client_to_project(any_client, project)
    # upload tasks with annotations
    r = business_client.post(
        f'/api/projects/{project.id}/tasks/bulk', data=json.dumps(tasks), content_type='application/json'
    )
    assert r.status_code == 201
    # make sure any annotation was made by current client
    Annotation.objects.all().update(completed_by=any_client.annotator)
    # collect uncompleted task ids to verify that only them are seen in the next labeling steps
    uncompleted_task_ids = set()
    for t in Task.objects.all():
        # a task counts as "uncompleted" if it has no non-ground-truth annotations
        if not t.annotations.filter(ground_truth=False).exists():
            uncompleted_task_ids.add(t.id)
    r = any_client.get(f'/api/projects/{project.id}/next')
    assert r.status_code == status_code
    rdata = json.loads(r.content)
    if r.status_code != 404:
        for response_key, expected_value_set in expected_response_value_set.items():
            # resolve the placeholder to the actual ids collected above
            if expected_value_set == 'uncompleted_task_ids':
                expected_value_set = uncompleted_task_ids
            assert (
                rdata[response_key] in expected_value_set
            ), f'Failed on response {rdata}: expecting value set "{expected_value_set}" for key "{response_key}"'
@pytest.mark.parametrize(
    'project_config, tasks, predictions, annotations, num_annotators, status_code, prelabeling_result',
    [
        # no annotations, second task is chosen due to active learning
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                model_version='12345',
                label_config="""
""",
            ),
            [{'data': {'location': 'London', 'text': 'text A'}}, {'data': {'location': 'London', 'text': 'text B'}}],
            [
                {'result': [{'some': 'prediction A'}], 'score': 0.9, 'cluster': 0},
                {'result': [{'some': 'prediction B'}], 'score': 0.5, 'cluster': 0},
            ],
            [
                None,
                None,
            ],
            1,
            200,
            # expected prelabeling attached to the served task (lowest score wins)
            [{'some': 'prediction B'}],
        ),
        # no annotations, first task is chosen due to active learning
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                model_version='12345',
                label_config="""
""",
            ),
            [{'data': {'location': 'London', 'text': 'text A'}}, {'data': {'location': 'London', 'text': 'text B'}}],
            [
                {'result': [{'some': 'prediction A'}], 'score': 0.5, 'cluster': 0},
                {'result': [{'some': 'prediction B'}], 'score': 0.9, 'cluster': 0},
            ],
            [
                None,
                None,
            ],
            1,
            200,
            [{'some': 'prediction A'}],
        ),
        # first task annotation, third task is chosen due to active learning
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                maximum_annotations=1,
                model_version='12345',
                label_config="""
""",
            ),
            [
                {'data': {'location': 'London', 'text': 'text A'}},
                {'data': {'location': 'London', 'text': 'text B'}},
                {'data': {'location': 'London', 'text': 'text C'}},
            ],
            [
                {'result': [{'some': 'prediction A'}], 'score': 0.5, 'cluster': 0},
                {'result': [{'some': 'prediction B'}], 'score': 0.9, 'cluster': 1},
                {'result': [{'some': 'prediction C'}], 'score': 0.8, 'cluster': 1},
            ],
            [
                {'result': [{'some': 'prediction A'}]},
                None,
                None,
            ],
            1,
            200,
            [{'some': 'prediction C'}],
        ),
        # first task annotation, forth task is chosen due to active learning (though task with lowest score exists but in the same cluster)
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                model_version='12345',
                label_config="""
""",
            ),
            [
                {'data': {'location': 'London', 'text': 'text A'}},
                {'data': {'location': 'London', 'text': 'text A 2'}},
                {'data': {'location': 'London', 'text': 'text B'}},
                {'data': {'location': 'London', 'text': 'text C'}},
            ],
            [
                {'result': [{'some': 'prediction A'}], 'score': 0.5, 'cluster': 0},
                {'result': [{'some': 'prediction A'}], 'score': 0.1, 'cluster': 0},
                {'result': [{'some': 'prediction B'}], 'score': 0.9, 'cluster': 1},
                {'result': [{'some': 'prediction C'}], 'score': 0.8, 'cluster': 1},
            ],
            [
                {'result': [{'some': 'prediction A'}]},
                None,
                None,
                None,
            ],
            1,
            200,
            [{'some': 'prediction C'}],
        ),
        # lowest prediction is chosen from least solved cluster
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                model_version='12345',
                label_config="""
""",
            ),
            [
                {'data': {'location': 'London', 'text': 'text A'}},
                {'data': {'location': 'London', 'text': 'text A 2'}},
                {'data': {'location': 'London', 'text': 'text A 3'}},
                {'data': {'location': 'London', 'text': 'text B'}},
                {'data': {'location': 'London', 'text': 'text C'}},
                {'data': {'location': 'London', 'text': 'text C 2'}},
            ],
            [
                {'result': [{'some': 'prediction A'}], 'score': 0.5, 'cluster': 0},
                {'result': [{'some': 'prediction A'}], 'score': 0.2, 'cluster': 0},
                {'result': [{'some': 'prediction A1'}], 'score': 0.1, 'cluster': 0},
                {'result': [{'some': 'prediction B'}], 'score': 0.9, 'cluster': 1},
                {'result': [{'some': 'prediction C'}], 'score': 0.8, 'cluster': 1},
                {'result': [{'some': 'prediction C'}], 'score': 0.8, 'cluster': 1},
            ],
            [
                {'result': [{'some': 'prediction A'}]},
                None,
                None,
                None,
                {'result': [{'some': 'prediction C'}]},
                {'result': [{'some': 'prediction C'}]},
            ],
            1,
            200,
            [{'some': 'prediction A1'}],
        ),
        # first task annotation, labeling is continued with the same cluster
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                model_version='12345',
                label_config="""
""",
            ),
            [
                {'data': {'location': 'London', 'text': 'text A'}},
                {'data': {'location': 'London', 'text': 'text B'}},
                {'data': {'location': 'London', 'text': 'text C'}},
            ],
            [
                {'result': [{'some': 'prediction A'}], 'score': 0.5, 'cluster': 0},
                {'result': [{'some': 'prediction B'}], 'score': 0.9, 'cluster': 0},
                {'result': [{'some': 'prediction C'}], 'score': 0.8, 'cluster': 0},
            ],
            [
                {'result': [{'some': 'prediction A'}]},
                None,
                None,
            ],
            1,
            200,
            [{'some': 'prediction C'}],
        ),
        # first task annotation, third task is chosen since cluster is marked as None (no clustering)
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                model_version='12345',
                label_config="""
""",
            ),
            [
                {'data': {'location': 'London', 'text': 'text A'}},
                {'data': {'location': 'London', 'text': 'text B'}},
                {'data': {'location': 'London', 'text': 'text C'}},
            ],
            [
                {'result': [{'some': 'prediction A'}], 'score': 0.5, 'cluster': None},
                {'result': [{'some': 'prediction B'}], 'score': 0.9, 'cluster': None},
                {'result': [{'some': 'prediction C'}], 'score': 0.8, 'cluster': None},
            ],
            [
                {'result': [{'some': 'prediction A'}]},
                None,
                None,
            ],
            1,
            200,
            [{'some': 'prediction C'}],
        ),
        # when some of the tasks are partially labeled, regardless scores sampling operates on depth-first (try to complete all tasks asap)
        (
            dict(
                title='Test',
                is_published=True,
                sampling=Project.UNCERTAINTY,
                model_version='12345',
                maximum_annotations=2,
                label_config="""
""",
            ),
            [
                {'data': {'location': 'London', 'text': 'text A'}},
                {'data': {'location': 'London', 'text': 'text B'}},
                {'data': {'location': 'London', 'text': 'text C'}},
                {'data': {'location': 'London', 'text': 'text D'}},
                {'data': {'location': 'London', 'text': 'text E'}},
            ],
            [
                {'result': [{'some': 'prediction A'}], 'score': 0.6, 'cluster': None},
                {'result': [{'some': 'prediction B'}], 'score': 0.5, 'cluster': None},
                {'result': [{'some': 'prediction C'}], 'score': 0.8, 'cluster': None},
                {'result': [{'some': 'prediction D'}], 'score': 0.4, 'cluster': None},
                {'result': [{'some': 'prediction E'}], 'score': 0.2, 'cluster': None},
            ],
            [{'result': [{'some': 'prediction A'}]}, None, None, None, None],
            2,
            200,
            [{'some': 'prediction A'}],
        ),
    ],
    ids=[
        'no annotations, second task is chosen due to active learning',
        'no annotations, first task is chosen due to active learning',
        'first task annotation, third task is chosen due to active learning',
        'first task annotation, forth task is chosen due to active learning (though task with lowest score exists but in the same cluster)',
        'lowest prediction is chosen from least solved cluster',
        'first task annotation, labeling is continued with the same cluster',
        'first task annotation, third task is chosen since cluster is marked as None (no clustering)',
        'when some of the tasks are partially labeled, regardless scores sampling operates on depth-first (try to complete all tasks asap)',
    ],
)
@pytest.mark.django_db
def test_next_task_with_active_learning(
    mocker,
    business_client,
    any_client,
    annotator2_client,
    project_config,
    tasks,
    predictions,
    annotations,
    num_annotators,
    status_code,
    prelabeling_result,
):
    """Check uncertainty (active-learning) sampling on /next.

    Tasks are created with one Prediction each (score + optional cluster) and
    optional pre-existing Annotations; the served task must carry the expected
    prelabeling result in its 'predictions' payload.
    """
    project = make_project(project_config, business_client.user, use_ml_backend=False)
    if _client_is_annotator(any_client):
        invite_client_to_project(any_client, project)
    if _client_is_annotator(annotator2_client):
        invite_client_to_project(annotator2_client, project)

    class MockAnnotatorCount:
        # stand-in for the annotators queryset: only .count() is used
        def count(self):
            return num_annotators

    # make the project report the parametrized number of annotators.
    # NOTE(review): patch.object(..., return_value=...) replaces Project.annotators
    # with a mock whose *call* returns MockAnnotatorCount() — confirm callers
    # invoke it as project.annotators()
    mocker.patch.object(Project, 'annotators', return_value=MockAnnotatorCount())
    for task, prediction, annotation in zip(tasks, predictions, annotations):
        task = make_task(task, project)
        Prediction.objects.create(task=task, project=task.project, model_version=project.model_version, **prediction)
        if annotation is not None:
            # single-annotator cases annotate as the requesting client;
            # multi-annotator cases attribute existing annotations to annotator #2
            completed_by = any_client.annotator if num_annotators == 1 else annotator2_client.annotator
            Annotation.objects.create(task=task, completed_by=completed_by, project=project, **annotation)
    r = any_client.get(f'/api/projects/{project.id}/next')
    assert r.status_code == status_code
    rdata = json.loads(r.content)
    if r.status_code == 200:
        # the served task must be pre-labeled with the expected prediction result
        assert rdata['predictions'][0]['result'] == prelabeling_result
@pytest.mark.django_db
def test_active_learning_with_uploaded_predictions(business_client):
    """Uncertainty sampling over pre-uploaded prediction scores.

    With no ML backend attached (empty model_version), /next serves tasks in
    ascending order of their uploaded prediction scores.
    """
    project = make_project(
        dict(
            title='Test',
            is_published=True,
            sampling=Project.UNCERTAINTY,
            label_config="""
""",
        ),
        business_client.user,
        use_ml_backend=False,
    )
    result = [{'from_name': 'text_class', 'to_name': 'location', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    # one task per score, deliberately out of order
    scores = (0.5, 0.1, 0.3, 0.2, 0.4)
    tasks = [
        {'data': {'text': f'score = {score}'}, 'predictions': [{'result': result, 'score': score}]}
        for score in scores
    ]
    # upload tasks together with their predictions
    upload = business_client.post(
        f'/api/projects/{project.id}/tasks/bulk/', data=json.dumps(tasks), content_type='application/json'
    )
    assert upload.status_code == 201

    def label_next_task():
        # take the next task and immediately complete it, returning its text
        resp = business_client.get(f'/api/projects/{project.id}/next')
        assert resp.status_code == 200
        task = json.loads(resp.content)
        resp = business_client.post(
            f'/api/tasks/{task["id"]}/annotations/', data={'task': task['id'], 'result': json.dumps(result)}
        )
        assert resp.status_code == 201
        return task['data']['text']

    # no ML backend connected, so the project has no model version
    assert project.model_version == ''
    # tasks must be served in ascending score order
    for expected_text in ('score = 0.1', 'score = 0.2', 'score = 0.3', 'score = 0.4', 'score = 0.5'):
        assert label_next_task() == expected_text
@pytest.mark.skipif(not redis_healthcheck(), reason='Multi user locks only supported with redis enabled')
@pytest.mark.parametrize('sampling', (Project.UNIFORM, Project.UNCERTAINTY, Project.SEQUENCE))
@pytest.mark.django_db
def test_label_races(configured_project, business_client, sampling):
    """Two annotators racing on two tasks never receive the same (locked) task."""
    project = make_project(
        dict(
            title='test_label_races',
            is_published=True,
            label_config="""
""",
        ),
        business_client.user,
    )
    project.sampling = sampling
    project.save()

    id1 = make_task({'data': {'text': 'aaa'}}, project).id
    id2 = make_task({'data': {'text': 'bbb'}}, project).id
    first = make_annotator({'email': 'ann1@testlabelraces.com'}, project, True)
    second = make_annotator({'email': 'ann2@testlabelraces.com'}, project, True)

    # the first annotator takes (and thereby locks) a task
    resp = first.get(f'/api/projects/{project.id}/next')
    assert resp.status_code == 200
    taken_id = json.loads(resp.content)['id']
    if project.sampling in (Project.UNIFORM, Project.UNCERTAINTY):
        # random/uncertainty sampling may pick either task
        assert taken_id in (id1, id2)
        id2 = ({id1, id2} - {taken_id}).pop()
    else:
        # sequential sampling always starts with the first task
        assert taken_id == id1

    # the second annotator must get the other task, since the first is locked
    resp = second.get(f'/api/projects/{project.id}/next')
    assert resp.status_code == 200
    assert json.loads(resp.content)['id'] == id2
@pytest.mark.skipif(not redis_healthcheck(), reason='Multi user locks only supported with redis enabled')
@pytest.mark.parametrize('sampling', (Project.UNIFORM, Project.UNCERTAINTY, Project.SEQUENCE))
@pytest.mark.django_db
def test_label_races_after_all_taken(configured_project, business_client, sampling):
    """Once both tasks are locked, repeated /next calls return each annotator's own lock."""
    project = make_project(
        dict(
            title='test_label_races',
            is_published=True,
            label_config="""
""",
        ),
        business_client.user,
    )
    project.sampling = sampling
    project.save()

    id1 = make_task({'data': {'text': 'aaa'}}, project).id
    id2 = make_task({'data': {'text': 'bbb'}}, project).id
    ann1 = make_annotator({'email': 'ann1@testlabelracesalltaken.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@testlabelracesalltaken.com'}, project, True)

    def next_id(annotator):
        # request the next task and return its id
        resp = annotator.get(f'/api/projects/{project.id}/next')
        assert resp.status_code == 200
        return json.loads(resp.content)['id']

    # ann1 takes (locks) one of the tasks
    selected_id = next_id(ann1)
    if project.sampling in (Project.UNIFORM, Project.UNCERTAINTY):
        assert selected_id in (id1, id2)
        id2 = ({id1, id2} - {selected_id}).pop()
    else:
        assert selected_id == id1
    id1 = selected_id

    # ann2 is routed to the other task because id1 is locked by ann1
    assert next_id(ann2) == id2
    # asking again returns ann2's own locked task
    assert next_id(ann2) == id2
    # and ann1 keeps its own lock on id1
    assert next_id(ann1) == id1
@pytest.mark.django_db
def test_breadth_first_simple(business_client):
    """With maximum_annotations=2 and sequential sampling, a task is served to
    both annotators (depth-first completion) before the next task is offered."""
    annotation_result = json.dumps(
        [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    )
    project = make_project(
        dict(
            title='test_label_races',
            is_published=True,
            maximum_annotations=2,
            label_config="""
""",
        ),
        business_client.user,
    )
    project.sampling = Project.SEQUENCE
    project.save()

    id1 = make_task({'data': {'text': 'aaa'}}, project).id
    id2 = make_task({'data': {'text': 'bbb'}}, project).id
    ann1 = make_annotator({'email': 'ann1@testbreadthfirst.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@testbreadthfirst.com'}, project, True)

    def take_next(annotator):
        # request /next and return the served task id
        resp = annotator.get(f'/api/projects/{project.id}/next')
        assert resp.status_code == 200
        return json.loads(resp.content)['id']

    def submit(annotator, task_id):
        # submit an annotation for the given task
        resp = annotator.post(f'/api/tasks/{task_id}/annotations/', data={'task': task_id, 'result': annotation_result})
        assert resp.status_code == 201

    # ann1 labels the first task
    assert take_next(ann1) == id1
    submit(ann1, id1)
    # ann2 gets the same task: it still needs a second annotation
    assert take_next(ann2) == id1
    submit(ann2, id1)

    finished = Task.objects.get(id=id1)
    assert finished.is_labeled
    if apps.is_installed('businesses'):
        assert finished.accuracy == 1.0

    # only the second task remains unlabeled
    assert take_next(ann2) == id2
@pytest.mark.django_db
def test_breadth_first_overlap_3(business_client):
    """With maximum_annotations=3 and uniform sampling, three annotators follow
    the breadth-first scheme: they converge on the same tasks to complete each
    one as fast as possible before moving on.
    """
    config = dict(
        title='test_label_races',
        is_published=True,
        maximum_annotations=3,
        label_config="""
""",
    )
    annotation_result = json.dumps(
        [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    )
    project = make_project(config, business_client.user)
    project.sampling = Project.UNIFORM
    project.save()

    def complete_task(annotator):
        # take the next task, annotate it, and return its id
        _r = annotator.get(f'/api/projects/{project.id}/next')
        assert _r.status_code == 200
        task_id = json.loads(_r.content)['id']
        _r = annotator.post(f'/api/tasks/{task_id}/annotations/', data={'task': task_id, 'result': annotation_result})
        # fix: the POST result used to be ignored — fail fast if the annotation is rejected
        assert _r.status_code == 201
        return task_id

    make_task({'data': {'text': 'aaa'}}, project)
    make_task({'data': {'text': 'bbb'}}, project)
    make_task({'data': {'text': 'ccc'}}, project)
    ann1 = make_annotator({'email': 'ann1@testbreadthfirstoverlap3.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@testbreadthfirstoverlap3.com'}, project, True)
    ann3 = make_annotator({'email': 'ann3@testbreadthfirstoverlap3.com'}, project, True)

    # ann1, ann2, ann3 should follow breadth-first scheme: trying to complete the tasks as fast as possible
    task_id_ann1 = complete_task(ann1)
    task_id_ann2 = complete_task(ann2)
    assert task_id_ann2 == task_id_ann1
    complete_task(ann1)
    complete_task(ann1)
    task_id_ann3 = complete_task(ann3)
    assert task_id_ann2 == task_id_ann3
    task_id_ann2 = complete_task(ann2)
    task_id_ann3 = complete_task(ann3)
    assert task_id_ann2 == task_id_ann3
@pytest.mark.skipif(not redis_healthcheck(), reason='Multi user locks only supported with redis enabled')
@pytest.mark.django_db
def test_try_take_last_task_at_the_same_time(business_client):
    """Only one annotator can acquire the lock on the last remaining task:
    the first /next request (ann1) locks it, the others get 404."""
    config = dict(
        title='test_try_take_last_task_at_the_same_time',
        is_published=True,
        maximum_annotations=2,
        label_config="""
""",
    )
    annotation_result = json.dumps(
        [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    )
    project = make_project(config, business_client.user)
    project.sampling = Project.SEQUENCE
    project.save()

    def complete_task(annotator):
        # take the next task, annotate it, and return its id
        _r = annotator.get(f'/api/projects/{project.id}/next')
        assert _r.status_code == 200
        task_id = json.loads(_r.content)['id']
        _r = annotator.post(f'/api/tasks/{task_id}/annotations/', data={'task': task_id, 'result': annotation_result})
        # fix: the POST result used to be ignored — fail fast if the annotation is rejected
        assert _r.status_code == 201
        return task_id

    make_task({'data': {'text': 'aaa'}}, project)
    make_task({'data': {'text': 'bbb'}}, project)
    ann1 = make_annotator({'email': 'ann1@lasttask.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@lasttask.com'}, project, True)
    ann3 = make_annotator({'email': 'ann3@lasttask.com'}, project, True)
    # ann1, ann2 complete first task, then ann3 completes last task
    complete_task(ann1)
    complete_task(ann2)
    complete_task(ann3)
    # only one annotator can take the last task
    _r = ann1.get(f'/api/projects/{project.id}/next')
    assert _r.status_code == 200
    _r = ann2.get(f'/api/projects/{project.id}/next')
    assert _r.status_code == 404
    _r = ann3.get(f'/api/projects/{project.id}/next')
    assert _r.status_code == 404
@pytest.mark.skipif(not redis_healthcheck(), reason='Multi user locks only supported with redis enabled')
@pytest.mark.django_db
def test_breadth_first_with_label_race(configured_project, business_client):
    """Breadth-first assignment survives a label race: while ann1 holds a lock
    on the second task, ann2 still receives the first task (which needs its
    second annotation) before moving on."""
    annotation_result = json.dumps(
        [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    )
    project = make_project(
        dict(
            title='test_label_races',
            is_published=True,
            maximum_annotations=2,
            label_config="""
""",
        ),
        business_client.user,
    )
    project.sampling = Project.SEQUENCE
    project.save()

    id1 = make_task({'data': {'text': 'aaa'}}, project).id
    id2 = make_task({'data': {'text': 'bbb'}}, project).id
    ann1 = make_annotator({'email': 'ann1@testbreadthlabelraces.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@testbreadthlabelraces.com'}, project, True)

    def take_next(annotator):
        # request /next and return the served task id
        resp = annotator.get(f'/api/projects/{project.id}/next')
        assert resp.status_code == 200
        return json.loads(resp.content)['id']

    def submit(annotator, task_id):
        # submit an annotation for the given task
        resp = annotator.post(f'/api/tasks/{task_id}/annotations/', data={'task': task_id, 'result': annotation_result})
        assert resp.status_code == 201

    # ann1 labels the first task
    assert take_next(ann1) == id1
    submit(ann1, id1)
    # ann1 locks the second task and freezes
    assert take_next(ann1) == id2
    # ann2 gets the first task: maximum_annotations=2 means it needs one more
    assert take_next(ann2) == id1
    submit(ann2, id1)

    finished = Task.objects.get(id=id1)
    assert finished.is_labeled
    if apps.is_installed('businesses'):
        assert finished.accuracy == 1.0

    # ann2 proceeds to the second task (maximum_annotations=2)
    assert take_next(ann2) == id2
    # ann1 re-requests and is given the second task again
    assert take_next(ann1) == id2
@pytest.mark.skipif(not redis_healthcheck(), reason='Multi user locks only supported with redis enabled')
@pytest.mark.django_db
def test_label_race_with_overlap(configured_project, business_client):
    """Two annotators take and finish annotations one by one on a project with
    partial overlap.

    Two tasks are bulk-created; after patching the project with
    maximum_annotations=2, overlap_cohort_percentage=50 and
    show_overlap_first=True, exactly one task carries overlap=2. That task must
    be served to both annotators (and completed by both) before the
    non-overlapped task is offered.
    """
    annotation_result = json.dumps(
        [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    )
    project = make_project(
        dict(
            title='test_label_races',
            is_published=True,
            label_config="""
""",
        ),
        business_client.user,
    )
    project.sampling = Project.SEQUENCE
    project.save()
    ann1 = make_annotator({'email': 'ann1@testlabelracewithoverlap.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@testlabelracewithoverlap.com'}, project, True)

    # bulk-create the tasks
    payload = [{'data': {'text': f'this is {str(i)}'}} for i in range(2)]
    resp = business_client.post(
        f'/api/projects/{project.id}/tasks/bulk/', data=json.dumps(payload), content_type='application/json'
    )
    assert resp.status_code == 201
    # configure the overlap
    resp = business_client.patch(
        f'/api/projects/{project.id}/',
        data=json.dumps({'maximum_annotations': 2, 'overlap_cohort_percentage': 50, 'show_overlap_first': True}),
        content_type='application/json',
    )
    assert resp.status_code == 200

    # exactly one task is overlapped (overlap=2), the other is not
    overlapped = Task.objects.filter(project=project.id).filter(overlap=2)
    assert overlapped.count() == 1
    plain = Task.objects.filter(project=project.id).filter(overlap=1)
    assert plain.count() == 1
    overlap_id = overlapped.first().id
    other_id = plain.first().id

    def next_id(annotator):
        # request /next and return the served task id
        r = annotator.get(f'/api/projects/{project.id}/next')
        assert r.status_code == 200
        return json.loads(r.content)['id']

    # ann1 takes the overlapped task first
    assert next_id(ann1) == overlap_id
    # ann2 takes the same task, since overlap = 2
    assert next_id(ann2) == overlap_id
    assert Task.objects.get(id=overlap_id).has_lock()
    # ann1 asks again: still the overlapped task because show_overlap_first=True
    assert next_id(ann1) == overlap_id
    # ann2 completes the overlapped task
    resp = ann2.post(f'/api/tasks/{overlap_id}/annotations/', data={'task': overlap_id, 'result': annotation_result})
    assert resp.status_code == 201
    # ann1 still gets the overlapped task: ann2's annotation released the lock
    assert next_id(ann1) == overlap_id
    # ann1 completes the overlapped task as well
    resp = ann1.post(f'/api/tasks/{overlap_id}/annotations/', data={'task': overlap_id, 'result': annotation_result})
    assert resp.status_code == 201
    # the overlapped task is now fully labeled, so the other task is served
    assert next_id(ann1) == other_id
@pytest.mark.skipif(not redis_healthcheck(), reason='Multi user locks only supported with redis enabled')
@pytest.mark.django_db
def test_label_w_drafts_race_with_overlap(configured_project, business_client):
    """Two annotators take tasks on an overlapped project; one submits a draft
    (empty result) instead of a completed annotation.

    Setup mirrors test_label_race_with_overlap: two tasks, one with overlap=2
    and show_overlap_first=True. ann2 posts a draft-only annotation for the
    overlapped task; ann1 then still receives and completes the overlapped task
    before being routed to the remaining one.
    """
    config = dict(
        title='test_label_races',
        is_published=True,
        label_config="""
""",
    )
    annotation_result = json.dumps(
        [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    )
    project = make_project(config, business_client.user)
    project.sampling = Project.SEQUENCE
    project.save()
    ann1 = make_annotator({'email': 'ann1@testlabelracewdrafts.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@testlabelracewdrafts.com'}, project, True)
    # create tasks
    tasks = []
    num_tasks = 2
    for i in range(num_tasks):
        tasks.append({'data': {'text': f'this is {str(i)}'}})
    r = business_client.post(
        f'/api/projects/{project.id}/tasks/bulk/', data=json.dumps(tasks), content_type='application/json'
    )
    assert r.status_code == 201
    # set overlap
    r = business_client.patch(
        f'/api/projects/{project.id}/',
        data=json.dumps({'maximum_annotations': 2, 'overlap_cohort_percentage': 50, 'show_overlap_first': True}),
        content_type='application/json',
    )
    assert r.status_code == 200
    # exactly one task is overlapped (overlap=2), the other is not
    t = Task.objects.filter(project=project.id).filter(overlap=2)
    assert t.count() == 1
    t1 = Task.objects.filter(project=project.id).filter(overlap=1)
    assert t1.count() == 1
    overlap_id = t.first().id
    other_id = t1.first().id
    # a draft-only annotation payload: result is empty, the work lives in 'draft'
    annotation_draft_result = {
        'task': overlap_id,
        'lead_time': 640.279,
        'draft': json.dumps(
            [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
        ),
        'result': json.dumps([]),
    }
    # ann1 takes first task
    r = ann1.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
    assert json.loads(r.content)['id'] == overlap_id
    # ann2 takes the same task, since overlap = 2
    r = ann2.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
    assert json.loads(r.content)['id'] == overlap_id
    assert Task.objects.get(id=overlap_id).has_lock()
    # ann1 takes next task, it is also overlapped because we force show_overlapped_first=True
    r = ann1.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
    assert json.loads(r.content)['id'] == overlap_id
    # ann2 sends a draft for the overlapped task
    r = ann2.post(f'/api/tasks/{overlap_id}/annotations/', data=annotation_draft_result)
    assert r.status_code == 201
    # ann1 takes next task, and now it is overlapped, since lock was released by ann2 annotation
    # NOTE(review): was the lock actually released by the draft submission? confirm
    r = ann1.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
    assert json.loads(r.content)['id'] == overlap_id
    # ann1 completes overlapped task
    r = ann1.post(f'/api/tasks/{overlap_id}/annotations/', data={'task': overlap_id, 'result': annotation_result})
    assert r.status_code == 201
    # ann1 takes next task, now it is another one since overlapped is labeled
    # (fix: removed stray debug print of overlap_id/other_id)
    r = ann1.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
    assert json.loads(r.content)['id'] == other_id
    # try again
    r = ann1.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
@pytest.mark.django_db
def test_fetch_final_taken_task(business_client):
    """An annotator never receives a task they have already completed.

    With maximum_annotations=2 and sequential sampling: ann1 completes a task,
    ann2 locks (but does not complete) the same task, and every subsequent
    /next request from ann1 must return only the other task.
    """
    config = dict(
        title='test_label_races',
        is_published=True,
        label_config="""
""",
    )
    annotation_result = json.dumps(
        [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    )
    project = make_project(config, business_client.user)
    project.sampling = Project.SEQUENCE
    project.save()
    ann1 = make_annotator({'email': 'ann1@testfetchfinal.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@testfetchfinal.com'}, project, True)
    # create tasks
    tasks = []
    num_tasks = 2
    for i in range(num_tasks):
        tasks.append({'data': {'text': f'this is {str(i)}'}})
    r = business_client.post(
        f'/api/projects/{project.id}/tasks/bulk/', data=json.dumps(tasks), content_type='application/json'
    )
    assert r.status_code == 201
    # set max annotations
    r = business_client.patch(
        f'/api/projects/{project.id}/', data=json.dumps({'maximum_annotations': 2}), content_type='application/json'
    )
    assert r.status_code == 200
    # ann1 takes any task and completes it (fix: debug prints replaced with comments,
    # and previously-unchecked status codes are now asserted)
    r = ann1.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
    task_id = json.loads(r.content)['id']
    r = ann1.post(f'/api/tasks/{task_id}/annotations/', data={'task': task_id, 'result': annotation_result})
    assert r.status_code == 201
    # ann2 takes the same task (because of depth-first) but just locks it - doesn't complete
    r = ann2.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
    assert json.loads(r.content)['id'] == task_id
    # ann1 takes another task
    r = ann1.get(f'/api/projects/{project.id}/next')
    assert r.status_code == 200
    another_task_id = json.loads(r.content)['id']
    assert another_task_id != task_id
    # ann1 should never take task_id since he has completed it
    for i in range(3):
        r = ann1.get(f'/api/projects/{project.id}/next')
        assert r.status_code == 200
        assert json.loads(r.content)['id'] == another_task_id
@pytest.mark.skipif(not redis_healthcheck(), reason='Multi user locks only supported with redis enabled')
@pytest.mark.django_db
def test_with_bad_annotation_result(business_client):
    """Annotation submit time must not grow with the rest of the project's annotations.

    Builds a project where some tasks carry many heavy polygon annotations
    (created with statistics recalculation mocked out), plus one uncompleted
    task. Submitting an annotation for the uncompleted task is then timed:
    a slow response would indicate redundant statistics recomputation over
    the entire project instead of just the affected task. Skipped without
    redis because multi-user task locks require it.
    """
    config = dict(
        title='test_with_failed_matching_score',
        is_published=True,
        sampling=Project.SEQUENCE,
        maximum_annotations=1,
        label_config="""
    """,
    )
    project = make_project(config, business_client.user, use_ml_backend=False)
    # "bad" presumably because the polygon has a single point — TODO confirm
    # what downstream validation/scoring treats as bad; here both results are
    # only used as bulk payload for the heavy annotations below.
    bad_result = {
        'id': 'Yv_lLEp_8I',
        'type': 'polygonlabels',
        'value': {'points': [[65.99824119670821, 73.11598603746282]], 'polygonlabels': ['t11']},
        'source': '$image',
        'to_name': 'img',
        'from_name': 'tag',
        'parent_id': None,
        'image_rotation': 0,
        'original_width': 4032,
        'original_height': 3024,
    }
    # a full many-point polygon for the same label/region pair
    good_result = {
        'id': 'NsccF-AYMT',
        'from_name': 'tag',
        'to_name': 'img',
        'source': '$image',
        'type': 'polygonlabels',
        'parent_id': None,
        'value': {
            'points': [
                [35.48487164486663, 15.14455036952532],
                [34.47935635946919, 13.997479425768038],
                [33.617486114842826, 13.997479425768038],
                [31.462810503276884, 15.20827653306739],
                [30.170005136337327, 16.865156785161243],
                [29.308134891710946, 18.64948936433924],
                [29.02084481016883, 20.943631251853805],
                [28.781436408883717, 23.174046975826304],
                [29.403898252224984, 25.022105718546374],
                [30.409413537622427, 25.65936735396709],
                [31.893745625590064, 25.27701037271466],
                [32.755615870216445, 24.958379555004303],
                [34.28782963844111, 24.12993942895737],
                [35.43698996460961, 23.110320812284233],
                [36.442505250007045, 22.53678534040559],
                [37.112848773605336, 21.32598823310624],
                [36.873440372320225, 19.22302483621788],
                [36.63403197103513, 17.69359691120817],
                [36.25097852897896, 16.737704458077104],
            ],
            'polygonlabels': ['t11'],
        },
        'original_width': 4032,
        'original_height': 3024,
        'image_rotation': 0,
    }
    num_annotators = 30
    anns = []
    for i in range(num_annotators):
        anns.append(make_annotator({'email': f'ann{i}@testwithbadannotationresult.com'}, project, True))
    # create one heavy task with many annotations - it's statistic recalculation should not be done after completing another task
    # turn off statistics calculations for now
    with mock.patch('tasks.models.update_project_summary_annotations_and_is_labeled'):
        for i in range(10):
            task = make_task({'data': {'image': f'https://data.s3.amazonaws.com/image/{i}.jpg'}}, project)
        # NOTE(review): despite the "one heavy task" comment, 10 tasks are
        # created above and only the LAST one (`task` after the loop) gets the
        # 30 heavy annotations below — each annotator contributes one 20-item
        # result. Confirm whether annotating every task was intended.
        for i in range(num_annotators):
            make_annotation(
                {'result': [bad_result] * 10 + [good_result] * 10, 'completed_by': anns[i].annotator}, task.id
            )
    # create uncompleted task
    uncompleted_task = make_task({'data': {'image': 'https://data.s3.amazonaws.com/image/uncompleted.jpg'}}, project)
    print('ann1 takes any task with bad annotation and complete it')
    r = anns[0].get(f'/api/projects/{project.id}/next')
    task_id = json.loads(r.content)['id']
    # presumably the only task still needing annotations is served — verify
    # this holds for SEQUENCE sampling with is_labeled mocked off above
    assert task_id == uncompleted_task.id

    def make_async_annotation_submit(new_ann=None):
        # Submit one good annotation for `task_id`; creates a fresh annotator
        # when none is supplied, then checks the task ended up locked.
        print('Async annotation submit')
        if new_ann is None:
            new_ann = make_annotator({'email': 'new_ann@testwithbadannotationresult.com'}, project, True)
        new_ann.post(
            f'/api/tasks/{task_id}/annotations/',
            data={'task': task_id, 'result': json.dumps([good_result])},
        )
        assert uncompleted_task.has_lock()

    # we are checking here that if we submit annotation for the current task,
    # there is no any additional computational costs implied by statistics
    # recalculation over the entire project
    t = time.time()
    make_async_annotation_submit(anns[0])
    # TODO: measuring response time is not a good way to do that,
    # but dunno how to emulate async requests or timeouts for Django test client
    assert (
        time.time() - t
    ) < 1, 'Time of annotation.submit() increases - that might be caused by redundant computations over the rest of the tasks - check that only a single task is affected by /api/tasks//annotations'
    assert uncompleted_task.has_lock()  # Task has lock since it has annotation
@pytest.mark.parametrize('setup_before_upload', (False, True))
@pytest.mark.parametrize('show_overlap_first', (False, True))
@pytest.mark.django_db
def test_overlap_first(business_client, setup_before_upload, show_overlap_first):
    """Overlap-cohort tasks are served first iff show_overlap_first is set.

    overlap_cohort_percentage=1 over 1000 tasks marks ~10 tasks with overlap > 1.
    Two annotators then each label exactly that many tasks. With overlap-first
    serving, all (and only) the overlap tasks must end up labeled; without it,
    labeling spreads over the rest of the tasks too. The cohort is configured
    either before or after task upload to exercise both code paths
    (setup_before_upload).
    """
    c = business_client
    config = dict(
        title='test_overlap_first',
        is_published=True,
        maximum_annotations=1,
        show_overlap_first=show_overlap_first,
        sampling='Uniform sampling',
        label_config="""
    """,
    )
    project = make_project(config, business_client.user)
    annotation_result = json.dumps(
        [{'from_name': 'text_class', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['class_A']}}]
    )
    num_tasks = 1000
    overlap_cohort_percentage = 1

    # set up tasks overlap
    setup_after_upload = True
    if setup_before_upload:
        r = c.patch(
            f'/api/projects/{project.id}/',
            data=json.dumps({'maximum_annotations': 2, 'overlap_cohort_percentage': overlap_cohort_percentage}),
            content_type='application/json',
        )
        assert r.status_code == 200
        setup_after_upload = False

    # create tasks
    tasks = []
    for i in range(num_tasks):
        tasks.append({'data': {'text': f'this is {str(i)}'}})
    r = business_client.post(
        f'/api/projects/{project.id}/tasks/bulk/', data=json.dumps(tasks), content_type='application/json'
    )
    assert r.status_code == 201

    if setup_after_upload:
        r = c.patch(
            f'/api/projects/{project.id}/',
            data=json.dumps({'maximum_annotations': 2, 'overlap_cohort_percentage': overlap_cohort_percentage}),
            content_type='application/json',
        )
        assert r.status_code == 200

    expected_tasks_with_overlap = int(overlap_cohort_percentage / 100.0 * num_tasks)
    assert Task.objects.filter(Q(project_id=project.id) & Q(overlap__gt=1)).count() == expected_tasks_with_overlap

    def complete_task(annotator):
        # Fetch the next task for this annotator and submit an annotation for it.
        _r = annotator.get(f'/api/projects/{project.id}/next')
        assert _r.status_code == 200
        task_id = json.loads(_r.content)['id']
        annotator.post(f'/api/tasks/{task_id}/annotations/', data={'task': task_id, 'result': annotation_result})

    ann1 = make_annotator({'email': 'ann1@testoverlapfirst.com'}, project, True)
    ann2 = make_annotator({'email': 'ann2@testoverlapfirst.com'}, project, True)
    for i in range(expected_tasks_with_overlap):
        # was `complete_task(ann1), complete_task(ann2)` — a throwaway tuple
        # expression used as a statement; plain sequential calls are clearer
        complete_task(ann1)
        complete_task(ann2)

    all_tasks_with_overlap_are_labeled = all(
        t.is_labeled for t in Task.objects.filter(Q(project_id=project.id) & Q(overlap__gt=1))
    )
    all_tasks_without_overlap_are_not_labeled = all(
        not t.is_labeled for t in Task.objects.filter(Q(project_id=project.id) & Q(overlap=1))
    )
    if show_overlap_first:
        # the overlap cohort was exhausted first, so nothing else was touched
        assert all_tasks_with_overlap_are_labeled
        assert all_tasks_without_overlap_are_not_labeled
    else:
        assert not all_tasks_with_overlap_are_labeled
        assert not all_tasks_without_overlap_are_not_labeled