"""Tests for webhook delivery triggered by project, task, annotation and ML actions."""
import json
from unittest import TestCase

import pytest
import requests
import requests_mock
from django.urls import reverse
from projects.models import Project
from webhooks.models import Webhook, WebhookAction
from webhooks.utils import emit_webhooks, emit_webhooks_for_instance, run_webhook


def _requests_to(mocker, url):
    """Return the requests recorded by ``mocker`` whose URL equals ``url``.

    The mockers in this module are created with ``real_http=True``, so the
    history may also contain unrelated requests; filtering by URL keeps the
    assertions focused on the webhook under test.
    """
    return [request for request in mocker.request_history if request.url == url]


@pytest.fixture
def organization_webhook(configured_project):
    """Create a webhook bound to the project's organization only (``project=None``)."""
    organization = configured_project.organization
    uri = 'http://127.0.0.1:8000/api/organization/'
    return Webhook.objects.create(
        organization=organization,
        project=None,
        url=uri,
    )


@pytest.fixture
def project_webhook(configured_project):
    """Create a webhook bound to both the organization and the configured project."""
    organization = configured_project.organization
    uri = 'http://127.0.0.1:8000/api/project/'
    return Webhook.objects.create(
        organization=organization,
        project=configured_project,
        url=uri,
    )


@pytest.fixture
def ml_start_training_webhook(configured_project):
    """Create a project webhook pointing at the ``/webhook`` URL used by the training test."""
    organization = configured_project.organization
    uri = 'http://0.0.0.0:9090/webhook'
    return Webhook.objects.create(
        organization=organization,
        project=configured_project,
        url=uri,
    )


@pytest.mark.django_db
def test_run_webhook(setup_project_dialog, organization_webhook):
    """``run_webhook`` sends a single POST whose body is the payload plus the action."""
    webhook = organization_webhook
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        run_webhook(organization_webhook, WebhookAction.PROJECT_CREATED, {'data': 'test'})

    request_history = m.request_history
    assert len(request_history) == 1
    assert request_history[0].method == 'POST'
    assert request_history[0].url == organization_webhook.url
    TestCase().assertDictEqual(request_history[0].json(), {'action': WebhookAction.PROJECT_CREATED, 'data': 'test'})


@pytest.mark.django_db
def test_emit_webhooks(setup_project_dialog, organization_webhook):
    """``emit_webhooks`` delivers the payload to the organization webhook exactly once."""
    webhook = organization_webhook
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        emit_webhooks(webhook.organization, webhook.project, WebhookAction.PROJECT_CREATED, {'data': 'test'})

    request_history = m.request_history
    assert len(request_history) == 1
    assert request_history[0].method == 'POST'
    assert request_history[0].url == webhook.url
    TestCase().assertDictEqual(request_history[0].json(), {'action': WebhookAction.PROJECT_CREATED, 'data': 'test'})


@pytest.mark.django_db
def test_emit_webhooks_for_instance(setup_project_dialog, organization_webhook):
    """``emit_webhooks_for_instance`` includes the instance under the ``project`` key."""
    webhook = organization_webhook
    project_title = 'Projects 1'
    project = Project.objects.create(title=project_title)
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        emit_webhooks_for_instance(
            webhook.organization, webhook.project, WebhookAction.PROJECT_CREATED, instance=project
        )

    assert len(m.request_history) == 1
    assert m.request_history[0].method == 'POST'
    data = m.request_history[0].json()
    assert 'action' in data
    assert 'project' in data
    assert project_title == data['project']['title']


@pytest.mark.django_db
def test_exception_catch(organization_webhook):
    """``run_webhook`` returns ``None`` when the request raises ``ConnectTimeout``."""
    webhook = organization_webhook
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url, exc=requests.exceptions.ConnectTimeout)
        result = run_webhook(webhook, WebhookAction.PROJECT_CREATED)

    assert result is None


# PROJECT CREATE/UPDATE/DELETE API
@pytest.mark.django_db
def test_webhooks_for_projects(configured_project, business_client, organization_webhook):
    """Creating, updating and deleting a project through the API each emit one webhook."""
    webhook = organization_webhook

    # create/update/delete project through API
    # PROJECT_CREATED
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.post(reverse('projects:api:project-list'))

    assert response.status_code == 201
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    assert sent[0].json()['action'] == WebhookAction.PROJECT_CREATED

    project_id = response.json()['id']

    # PROJECT_UPDATED
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.patch(
            reverse('projects:api:project-detail', kwargs={'pk': project_id}),
            data=json.dumps({'title': 'Test title'}),
            content_type='application/json',
        )

    assert response.status_code == 200
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    payload = sent[0].json()
    assert payload['action'] == WebhookAction.PROJECT_UPDATED
    assert payload['project']['title'] == 'Test title'

    # PROJECT_DELETED
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.delete(
            reverse('projects:api:project-detail', kwargs={'pk': project_id}),
        )

    assert response.status_code == 204
    sent = _requests_to(m, organization_webhook.url)
    assert len(sent) == 1
    payload = sent[0].json()
    assert payload['action'] == WebhookAction.PROJECT_DELETED
    assert payload['project']['id'] == project_id


# TASK CREATE/DELETE API
# WE DON'T SUPPORT UPDATE FOR TASK
@pytest.mark.django_db
def test_webhooks_for_tasks(configured_project, business_client, organization_webhook):
    """Creating and deleting a task through the API each emit one webhook."""
    webhook = organization_webhook

    # CREATE
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.post(
            reverse('tasks:api:task-list'),
            data=json.dumps(
                {
                    'project': configured_project.id,
                    'data': {'meta_info': 'meta info A', 'text': 'text A'},
                }
            ),
            content_type='application/json',
        )

    assert response.status_code == 201
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    payload = sent[0].json()
    assert payload['action'] == WebhookAction.TASKS_CREATED
    assert 'tasks' in payload
    assert 'project' in payload

    # DELETE
    task_id = response.json()['id']
    url = webhook.url
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', url)
        response = business_client.delete(reverse('tasks:api:task-detail', kwargs={'pk': task_id}))

    assert response.status_code == 204
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    payload = sent[0].json()
    assert payload['action'] == WebhookAction.TASKS_DELETED
    assert 'tasks' in payload
    assert 'project' in payload


# TASK CREATE on IMPORT
@pytest.mark.django_db
def test_webhooks_for_tasks_import(configured_project, business_client, organization_webhook):
    """Importing a CSV emits a single TASKS_CREATED webhook listing every imported task."""
    from django.core.files.uploadedfile import SimpleUploadedFile

    webhook = organization_webhook

    IMPORT_CSV = 'tests/test_suites/samples/test_5.csv'

    with open(IMPORT_CSV, 'rb') as file_:
        data = SimpleUploadedFile('test_5.csv', file_.read(), content_type='multipart/form-data')

    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.post(
            f'/api/projects/{configured_project.id}/import',
            data={'csv_1': data},
            format='multipart',
        )

    assert response.status_code == 201
    assert response.json()['task_count'] == 3

    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    payload = sent[0].json()
    assert payload['action'] == WebhookAction.TASKS_CREATED
    assert 'tasks' in payload
    assert 'project' in payload
    assert len(payload['tasks']) == response.json()['task_count'] == 3


# ANNOTATION CREATE/UPDATE/DELETE
@pytest.mark.django_db
def test_webhooks_for_annotation(configured_project, business_client, organization_webhook):
    """Annotation create, update (PUT and PATCH) and delete each emit the matching webhook."""
    webhook = organization_webhook
    task = configured_project.tasks.all().first()

    # CREATE
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.post(
            f'/api/tasks/{task.id}/annotations?project={configured_project.id}',
            data=json.dumps(
                {
                    'result': [
                        {
                            'value': {'choices': ['class_A']},
                            'id': 'nJS76J03pi',
                            'from_name': 'text_class',
                            'to_name': 'text',
                            'type': 'choices',
                            'origin': 'manual',
                        }
                    ],
                    'draft_id': 0,
                    'parent_prediction': None,
                    'parent_annotation': None,
                    'project': configured_project.id,
                }
            ),
            content_type='application/json',
        )

    assert response.status_code == 201
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    assert sent[0].json()['action'] == WebhookAction.ANNOTATION_CREATED

    annotation_id = response.json()['id']

    # UPDATE POST
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.put(
            f'/api/annotations/{annotation_id}?project={configured_project.id}&taskId={task.id}',
            data=json.dumps(
                {
                    'result': [],
                }
            ),
            content_type='application/json',
        )
        assert response.status_code == 200

        response = business_client.patch(
            f'/api/annotations/{annotation_id}?project={configured_project.id}&taskId={task.id}',
            data=json.dumps(
                {
                    'result': [
                        {
                            'value': {'choices': ['class_B']},
                            'id': 'nJS76J03pi',
                            'from_name': 'text_class',
                            'to_name': 'text',
                            'type': 'choices',
                            'origin': 'manual',
                        }
                    ],
                }
            ),
            content_type='application/json',
        )
        assert response.status_code == 200

    # One webhook per update request (PUT + PATCH).
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 2
    for r in sent:
        payload = r.json()
        assert payload['action'] == WebhookAction.ANNOTATION_UPDATED
        assert 'task' in payload
        assert 'annotation' in payload
        assert 'project' in payload

    # DELETE
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.delete(
            f'/api/annotations/{annotation_id}',
            content_type='application/json',
        )

    assert response.status_code == 204
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    payload = sent[0].json()
    assert payload['action'] == WebhookAction.ANNOTATIONS_DELETED
    assert 'annotations' in payload
    assert annotation_id == payload['annotations'][0]['id']


# ACTION: DELETE ANNOTATIONS
@pytest.mark.django_db
def test_webhooks_for_action_delete_tasks_annotations(configured_project, business_client, organization_webhook):
    """The ``delete_tasks_annotations`` data-manager action emits one ANNOTATIONS_DELETED webhook."""
    webhook = organization_webhook

    # create annotations for tasks
    for task in configured_project.tasks.all():
        response = business_client.post(
            f'/api/tasks/{task.id}/annotations?project={configured_project.id}',
            data=json.dumps({'result': [{'value': {'choices': ['class_B']}}]}),
            content_type='application/json',
        )
        assert response.status_code == 201

    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.post(
            f'/api/dm/actions?id=delete_tasks_annotations&project={configured_project.id}',
            data=json.dumps(
                {
                    'project': str(configured_project.id),
                    'selectedItems': {'all': True},
                }
            ),
            content_type='application/json',
        )

    assert response.status_code == 200
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    assert sent[0].json()['action'] == WebhookAction.ANNOTATIONS_DELETED


# ACTION: DELETE TASKS
@pytest.mark.django_db
def test_webhooks_for_action_delete_tasks(configured_project, business_client, organization_webhook):
    """The ``delete_tasks`` data-manager action emits one TASKS_DELETED webhook."""
    webhook = organization_webhook

    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.post(
            f'/api/dm/actions?id=delete_tasks&project={configured_project.id}',
            data=json.dumps(
                {
                    'project': str(configured_project.id),
                    'selectedItems': {'all': True},
                }
            ),
            content_type='application/json',
        )

    assert response.status_code == 200
    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    assert sent[0].json()['action'] == WebhookAction.TASKS_DELETED


# CREATE TASKS FROM STORAGES
@pytest.mark.django_db
def test_webhooks_for_tasks_from_storages(configured_project, business_client, organization_webhook):
    """Syncing an S3 import storage emits a single TASKS_CREATED webhook."""
    webhook = organization_webhook

    # CREATE
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        add_url = '/api/storages/s3'
        payload = {
            'bucket': 'pytest-s3-images',
            'project': configured_project.id,
            'title': 'Testing S3 storage (bucket from conftest.py)',
            'use_blob_urls': True,
            'presign_ttl': 3600,
        }
        add_response = business_client.post(add_url, data=json.dumps(payload), content_type='application/json')
        storage_pk = add_response.json()['id']

        # Sync S3 Storage
        sync_url = f'/api/storages/s3/{storage_pk}/sync'
        # NOTE(review): the status of the sync response is deliberately not
        # asserted here (a `== 201` check used to be commented out) — confirm
        # the expected status code before enabling a check.
        business_client.post(sync_url)

    sent = _requests_to(m, webhook.url)
    assert len(sent) == 1
    body = sent[0].json()
    assert body['action'] == WebhookAction.TASKS_CREATED
    assert 'tasks' in body
    assert 'project' in body


@pytest.mark.django_db
def test_start_training_webhook(setup_project_dialog, ml_start_training_webhook, business_client):
    """
    1. Setup: The test uses the ml_start_training_webhook fixture, which creates a
       webhook for the configured project, and registers an MLBackend for that project.
    2. Mocking the POST Request: The requests_mock.Mocker is used to mock the POST
       request to the webhook URL. This is where the START_TRAINING action is
       expected to be sent.
    3. Making the Request: The test makes a POST request to the /api/ml/{id}/train endpoint.
    Assertions:
    - The response status code is checked to ensure the request was successful.
    - It verifies that exactly one request was recorded by the mocker.
    - It checks that the request method was POST.
    - The request URL and the JSON payload are validated against expected values.
    """
    from ml.models import MLBackend

    webhook = ml_start_training_webhook
    project = webhook.project
    ml = MLBackend.objects.create(project=project, url='http://0.0.0.0:9090')

    # Mock the webhook URL that should receive the START_TRAINING event
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri('POST', webhook.url)
        response = business_client.post(
            f'/api/ml/{ml.id}/train',
            data=json.dumps({'action': 'START_TRAINING'}),
            content_type='application/json',
        )

    assert response.status_code == 200
    request_history = m.request_history
    assert len(request_history) == 1
    assert request_history[0].method == 'POST'
    assert request_history[0].url == webhook.url
    assert 'project' in request_history[0].json()
    assert request_history[0].json()['action'] == 'START_TRAINING'


@pytest.mark.django_db
def test_webhook_batching_with_feature_flag(configured_project, organization_webhook):
    """Test that webhooks are sent in batches when feature flag is enabled."""
    from unittest.mock import patch

    from django.test import override_settings
    from tasks.models import Task
    from webhooks.utils import emit_webhooks_for_instance_sync

    webhook = organization_webhook
    project = configured_project

    # Create multiple tasks to test batching (more than WEBHOOK_BATCH_SIZE)
    tasks = [Task.objects.create(data={'text': f'Test task {i}'}, project=project) for i in range(250)]

    # Test with feature flag enabled
    with patch('webhooks.utils.flag_set', return_value=True):
        # override_settings restores the previous state on exit, including the
        # case where WEBHOOK_BATCH_SIZE was not defined at all, so the batch
        # size chosen here cannot leak into other tests.
        with override_settings(WEBHOOK_BATCH_SIZE=100), requests_mock.Mocker(real_http=True) as m:
            m.register_uri('POST', webhook.url)
            emit_webhooks_for_instance_sync(
                webhook.organization, webhook.project, WebhookAction.TASKS_CREATED, instance=tasks
            )

    # Should have 3 requests (250 tasks / 100 batch size = 3 batches)
    webhook_requests = _requests_to(m, webhook.url)
    assert len(webhook_requests) == 3

    # Check first batch has 100 tasks
    first_batch = webhook_requests[0].json()
    assert 'tasks' in first_batch
    assert len(first_batch['tasks']) == 100

    # Check second batch has 100 tasks
    second_batch = webhook_requests[1].json()
    assert len(second_batch['tasks']) == 100

    # Check third batch has 50 tasks (remaining)
    third_batch = webhook_requests[2].json()
    assert len(third_batch['tasks']) == 50


@pytest.mark.django_db
def test_webhook_no_batching_without_feature_flag(configured_project, organization_webhook):
    """Test that webhooks are sent in single request when feature flag is disabled."""
    from unittest.mock import patch

    from tasks.models import Task
    from webhooks.utils import emit_webhooks_for_instance_sync

    webhook = organization_webhook
    project = configured_project

    # Create multiple tasks
    tasks = [Task.objects.create(data={'text': f'Test task {i}'}, project=project) for i in range(150)]

    # Test with feature flag disabled
    with patch('webhooks.utils.flag_set', return_value=False):
        with requests_mock.Mocker(real_http=True) as m:
            m.register_uri('POST', webhook.url)
            emit_webhooks_for_instance_sync(
                webhook.organization, webhook.project, WebhookAction.TASKS_CREATED, instance=tasks
            )

    # Should have only 1 request (no batching)
    webhook_requests = _requests_to(m, webhook.url)
    assert len(webhook_requests) == 1

    # Check all 150 tasks are in single request
    request_data = webhook_requests[0].json()
    assert 'tasks' in request_data
    assert len(request_data['tasks']) == 150