"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import logging
import os
import re
import shutil
import tempfile
from copy import deepcopy
from datetime import datetime, timedelta
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock

import boto3
import mock
import pytest
import requests_mock
import ujson as json
from botocore.exceptions import ClientError
from django.conf import settings
from freezegun import freeze_time
from moto import mock_s3
from organizations.models import Organization
from projects.models import Project
from tasks.models import Task
from users.models import User

# if we haven't this package, pytest.ini::env doesn't work
try:
    import pytest_env.plugin  # noqa: F401
except ImportError:
    print('\n\n !!! Please, pip install pytest-env \n\n')
    exit(-100)

from label_studio.tests.sdk.fixtures import *  # noqa: F403

from .utils import (
    azure_client_mock,
    create_business,
    gcs_client_mock,
    import_from_url_mock,
    make_project,
    ml_backend_mock,
    redis_client_mock,
    register_ml_backend_mock,
    signin,
)

boto3.set_stream_logger('botocore.credentials', logging.DEBUG)


@pytest.fixture(autouse=True)
def set_test_password_hasher(settings):
    """
    Set the password hasher to less expensive MD5 for testing purposes.
    """
    settings.PASSWORD_HASHERS = ['django.contrib.auth.hashers.MD5PasswordHasher']


@pytest.fixture(autouse=False)
def enable_csrf(settings):
    """Opt-in fixture: set ``USE_ENFORCE_CSRF_CHECKS`` to True for the test."""
    settings.USE_ENFORCE_CSRF_CHECKS = True


@pytest.fixture(autouse=False)
def label_stream_history_limit(settings):
    """Opt-in fixture: set ``LABEL_STREAM_HISTORY_LIMIT`` to 1 for the test."""
    settings.LABEL_STREAM_HISTORY_LIMIT = 1


@pytest.fixture(autouse=True)
def disable_sentry(settings):
    """Zero the Sentry rate and clear the DSN for every test."""
    settings.SENTRY_RATE = 0
    settings.SENTRY_DSN = None


@pytest.fixture()
def debug_modal_exceptions_false(settings):
    """Set ``DEBUG_MODAL_EXCEPTIONS`` to False for the test."""
    settings.DEBUG_MODAL_EXCEPTIONS = False


@pytest.fixture(scope='function')
def enable_sentry():
    """Point ``SENTRY_DSN`` at a fixed DSN.

    This fixture takes no ``settings`` argument, so it writes to the
    module-level ``django.conf.settings`` object imported at the top of the
    file rather than to the ``settings`` fixture used by its siblings.
    """
    settings.SENTRY_RATE = 0
    # it's disabled key, but this is correct
    settings.SENTRY_DSN = 'https://44f7a50de5ab425ca6bc406ef69b2122@o227124.ingest.sentry.io/5820521'


@pytest.fixture(scope='function')
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
    os.environ['AWS_SECURITY_TOKEN'] = 'testing'
    os.environ['AWS_SESSION_TOKEN'] = 'testing'


@pytest.fixture(autouse=True, scope='session')
def azure_credentials():
    """Mocked Azure credentials"""
    os.environ['AZURE_BLOB_ACCOUNT_NAME'] = 'testing'
    os.environ['AZURE_BLOB_ACCOUNT_KEY'] = 'testing'


@pytest.fixture(scope='function')
def s3(aws_credentials):
    """Yield a boto3 S3 client (us-east-1) created inside ``mock_s3()``."""
    with mock_s3():
        yield boto3.client('s3', region_name='us-east-1')


@pytest.fixture(autouse=True)
def s3_with_images(s3):
    """
    Bucket structure:
    s3://pytest-s3-images/image1.jpg
    s3://pytest-s3-images/subdir/image1.jpg
    s3://pytest-s3-images/subdir/image2.jpg
    s3://pytest-s3-images/subdir/another/image2.jpg
    """
    bucket_name = 'pytest-s3-images'
    s3.create_bucket(Bucket=bucket_name)
    s3.put_object(Bucket=bucket_name, Key='image1.jpg', Body='123')
    s3.put_object(Bucket=bucket_name, Key='subdir/image1.jpg', Body='456')
    s3.put_object(Bucket=bucket_name, Key='subdir/image2.jpg', Body='789')
    s3.put_object(Bucket=bucket_name, Key='subdir/another/image2.jpg', Body='0ab')
    yield s3


def s3_remove_bucket():
    """
    Remove pytest-s3-images

    Deletes the four objects created by ``s3_with_images`` and then the
    bucket itself. Always returns an empty string.
    """
    bucket_name = 'pytest-s3-images'
    _s3 = boto3.client('s3', region_name='us-east-1')
    _s3.delete_object(Bucket=bucket_name, Key='image1.jpg')
    _s3.delete_object(Bucket=bucket_name, Key='subdir/image1.jpg')
    _s3.delete_object(Bucket=bucket_name, Key='subdir/image2.jpg')
    _s3.delete_object(Bucket=bucket_name, Key='subdir/another/image2.jpg')
    _s3.delete_bucket(Bucket=bucket_name)
    return ''


@pytest.fixture(autouse=True)
def s3_with_jsons(s3):
    """Create bucket ``pytest-s3-jsons`` holding one ``test.json`` task file."""
    bucket_name = 'pytest-s3-jsons'
    s3.create_bucket(Bucket=bucket_name)
    s3.put_object(Bucket=bucket_name, Key='test.json', Body=json.dumps({'image_url': 'http://ggg.com/image.jpg'}))
    yield s3


@pytest.fixture(autouse=True)
def s3_with_hypertext_s3_links(s3):
    """Create bucket ``pytest-s3-jsons-hypertext`` holding one ``test.json``."""
    bucket_name = 'pytest-s3-jsons-hypertext'
    s3.create_bucket(Bucket=bucket_name)
    # NOTE(review): the 'text' value is blank in this view of the file; the
    # fixture name suggests it should carry hypertext -- confirm.
    s3.put_object(
        Bucket=bucket_name,
        Key='test.json',
        Body=json.dumps({'text': ''}),
    )
    yield s3


@pytest.fixture(autouse=True)
def s3_with_partially_encoded_s3_links(s3):
    """Create bucket ``pytest-s3-json-partially-encoded`` holding one ``test.json``."""
    bucket_name = 'pytest-s3-json-partially-encoded'
    s3.create_bucket(Bucket=bucket_name)
    # NOTE(review): the 'text' value is blank in this view of the file -- confirm.
    s3.put_object(
        Bucket=bucket_name,
        Key='test.json',
        Body=json.dumps({'text': ''}),
    )
    yield s3


@pytest.fixture(autouse=True)
def s3_with_unexisted_links(s3):
    """Create bucket ``pytest-s3-jsons-unexisted_links`` with a single image object."""
    bucket_name = 'pytest-s3-jsons-unexisted_links'
    s3.create_bucket(Bucket=bucket_name)
    s3.put_object(Bucket=bucket_name, Key='some-existed-image.jpg', Body='qwerty')
    yield s3


@pytest.fixture(autouse=True)
def s3_export_bucket(s3):
    """Create the empty bucket ``pytest-export-s3-bucket``."""
    bucket_name = 'pytest-export-s3-bucket'
    s3.create_bucket(Bucket=bucket_name)
    yield s3


def _encryption_enforcing_policy(bucket_name, sse_algorithm):
    """Build the bucket policy shared by the SSE and KMS export buckets.

    The policy has three ``Deny`` statements: PutObject whose
    ``s3:x-amz-server-side-encryption`` differs from ``sse_algorithm``,
    PutObject without that header at all, and any S3 action when
    ``aws:SecureTransport`` is false.
    """
    resources = [f'arn:aws:s3:::{bucket_name}', f'arn:aws:s3:::{bucket_name}/*']
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Deny',
                'Principal': '*',
                'Action': 's3:PutObject',
                'Resource': resources,
                'Condition': {'StringNotEquals': {'s3:x-amz-server-side-encryption': sse_algorithm}},
            },
            {
                'Effect': 'Deny',
                'Principal': '*',
                'Action': 's3:PutObject',
                'Resource': resources,
                'Condition': {'Null': {'s3:x-amz-server-side-encryption': 'true'}},
            },
            {
                'Effect': 'Deny',
                'Principal': '*',
                'Action': 's3:*',
                'Resource': resources,
                'Condition': {'Bool': {'aws:SecureTransport': 'false'}},
            },
        ],
    }


@pytest.fixture(autouse=True)
def s3_export_bucket_sse(s3):
    """Create ``pytest-export-s3-bucket-with-sse`` with an AES256-enforcing policy."""
    bucket_name = 'pytest-export-s3-bucket-with-sse'
    s3.create_bucket(Bucket=bucket_name)
    # Set the bucket policy
    policy = _encryption_enforcing_policy(bucket_name, 'AES256')
    s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))
    yield s3


@pytest.fixture(autouse=True)
def s3_export_bucket_kms(s3):
    """Create ``pytest-export-s3-bucket-with-kms`` with an aws:kms-enforcing policy."""
    bucket_name = 'pytest-export-s3-bucket-with-kms'
    s3.create_bucket(Bucket=bucket_name)
    # Set the bucket policy
    policy = _encryption_enforcing_policy(bucket_name, 'aws:kms')
    s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))
    yield s3


def mock_put(*args, **kwargs):
    """Stand-in for an S3 object's ``put`` that rejects unencrypted uploads.

    The call is accepted (returns ``None``) when ``ServerSideEncryption`` is
    ``'AES256'``, or when it is ``'aws:kms'`` and ``SSEKMSKeyId`` is also
    supplied.

    :raises ClientError: ``AccessDenied`` for every other combination,
        including a call that omits ``ServerSideEncryption`` entirely.
    """
    # .get() so that a missing key is reported as AccessDenied instead of
    # escaping as a KeyError before the checks run.
    encryption = kwargs.get('ServerSideEncryption')
    if encryption == 'AES256':
        return
    if encryption == 'aws:kms' and 'SSEKMSKeyId' in kwargs:
        return
    raise ClientError(
        error_response={'Error': {'Code': 'AccessDenied', 'Message': 'Access Denied'}}, operation_name='PutObject'
    )


def _patch_s3_resource_with_mock_put(mocker):
    """Patch ``boto3.Session.resource`` so every ``Object(...).put`` is ``mock_put``."""
    mock_object = MagicMock()
    mock_object.put = mock_put

    mock_object_constructor = MagicMock()
    mock_object_constructor.return_value = mock_object

    mock_s3_resource = MagicMock()
    mock_s3_resource.Object = mock_object_constructor

    # Patch boto3.Session.resource to return the mock s3 resource
    mocker.patch('boto3.Session.resource', return_value=mock_s3_resource)


@pytest.fixture()
def mock_s3_resource_aes(mocker):
    """Replace the boto3 S3 resource with a mock whose ``put`` is ``mock_put``."""
    _patch_s3_resource_with_mock_put(mocker)


@pytest.fixture()
def mock_s3_resource_kms(mocker):
    """Replace the boto3 S3 resource with a mock whose ``put`` is ``mock_put``."""
    _patch_s3_resource_with_mock_put(mocker)


@pytest.fixture(autouse=True)
def gcs_client():
    # be careful, this is a global fixture and will affect all tests
    # because it will be applied to all tests that use gcs_client
    # and it may lead to flaky tests if the sample blob names are not deterministic
    with gcs_client_mock():
        yield


@pytest.fixture(autouse=True)
def azure_client():
    """Run every test inside ``azure_client_mock()``."""
    with azure_client_mock():
        yield


@pytest.fixture(autouse=True)
def redis_client():
    """Run every test inside ``redis_client_mock()``."""
    with redis_client_mock():
        yield


@pytest.fixture
def ml_backend_for_test_predict(ml_backend):
    """Register two mocked ML backends (ports 9092 and 9093) and yield the mocker."""
    # ML backend with single prediction per task
    register_ml_backend_mock(
        ml_backend,
        url='http://test.ml.backend.for.sdk.com:9092',
        predictions={
            'results': [
                {
                    'model_version': 'ModelSingle',
                    'score': 0.1,
                    'result': [
                        {'from_name': 'label', 'to_name': 'text', 'type': 'choices', 'value': {'choices': ['label_A']}}
                    ],
                },
            ]
        },
    )
    # ML backend with multiple predictions per task
    register_ml_backend_mock(
        ml_backend,
        url='http://test.ml.backend.for.sdk.com:9093',
        predictions={
            'results': [
                [
                    {
                        'model_version': 'ModelA',
                        'score': 0.2,
                        'result': [
                            {
                                'from_name': 'label',
                                'to_name': 'text',
                                'type': 'choices',
                                'value': {'choices': ['label_A']},
                            }
                        ],
                    },
                    {
                        'model_version': 'ModelB',
                        'score': 0.3,
                        'result': [
                            {
                                'from_name': 'label',
                                'to_name': 'text',
                                'type': 'choices',
                                'value': {'choices': ['label_B']},
                            }
                        ],
                    },
                ]
            ]
        },
    )
    yield ml_backend


@pytest.fixture(autouse=True)
def ml_backend():
    """Yield the object produced by ``ml_backend_mock()`` for every test."""
    with ml_backend_mock() as m:
        yield m


@pytest.fixture(name='import_from_url')
def import_from_url():
    """Yield the object produced by ``import_from_url_mock()``."""
    with import_from_url_mock() as m:
        yield m


@pytest.fixture(autouse=True)
def ml_backend_1(ml_backend):
    """Register the three default mocked ML backend URLs on ``ml_backend``."""
    register_ml_backend_mock(
        ml_backend, url='https://test.heartex.mlbackend.com:9090', setup_model_version='Fri Feb 19 17:10:44 2021'
    )
    register_ml_backend_mock(ml_backend, url='https://test.heartex.mlbackend.com:9091', health_connect_timeout=True)
    register_ml_backend_mock(ml_backend, url='http://localhost:8999', predictions={'results': []})
    yield ml_backend


def pytest_configure():
    """pytest hook: mark every configured RQ queue as synchronous."""
    for q in settings.RQ_QUEUES.values():
        q['ASYNC'] = False

    # Reload django-rq module to pick up the ASYNC=False changes in django-rq 3.x
    try:
        import importlib

        import django_rq.queues as dq

        importlib.reload(dq)
    except ImportError:
        # django_rq might not be installed or imported yet
        pass


class URLS:
    """This class keeps urls with api"""

    def __init__(self):
        self.project_create = '/api/projects/'
        self.task_bulk = None
        # Declared up front so the attribute exists before set_project() runs.
        self.plots = None

    def set_project(self, pk):
        """Fill in the project-specific URLs for project id ``pk``."""
        self.task_bulk = f'/api/projects/{pk}/tasks/bulk/'
        self.plots = f'/projects/{pk}/plots'


def project_ranker():
    """Return the project-creation payload for the "ranker" test project."""
    # NOTE(review): the label config literal is blank in this view -- confirm.
    label = """ """
    return {'label_config': label, 'title': 'test'}


def project_dialog():
    """Simple project with dialog configs

    :return: config of project with task
    """
    # NOTE(review): the label config literal is blank in this view -- confirm.
    label = """ """
    return {'label_config': label, 'title': 'test'}


def project_choices():
    """Return the project-creation payload for the "choices" test project."""
    # NOTE(review): the label config literal is blank in this view -- confirm.
    label = """ """
    return {'label_config': label, 'title': 'test'}


def setup_project(client, project_template, do_auth=True, legacy_api_tokens_enabled=False):
    """Create new test@gmail.com user, login via client, create test project.
    Project configs are thrown over params and automatically grabs from functions names started with 'project_'

    :param client: fixture with http client (from pytest-django package) and simulation of http server
    :param project_template: dict with project config
    :param do_auth: make authorization for creating user
    :param legacy_api_tokens_enabled: when true, enable legacy API tokens on the new organization
    :return: a deep copy of ``client`` carrying ``user``, ``urls``, ``project_config`` and ``org``
        (plus ``project`` when ``do_auth`` is true)
    """
    client = deepcopy(client)
    email = 'test@gmail.com'
    password = 'test'
    urls = URLS()
    project_config = project_template()

    # we work in empty database, so let's create business user and login
    user = User.objects.create(email=email)
    user.set_password(password)  # set password without hash

    create_business(user)
    org = Organization.create_organization(created_by=user, title=user.first_name)
    if legacy_api_tokens_enabled:
        org.jwt.legacy_api_tokens_enabled = True
        org.jwt.save()
    user.active_organization = org
    user.save()

    if do_auth:
        assert signin(client, email, password).status_code == 302
        # create project
        with requests_mock.Mocker() as m:
            m.register_uri('POST', re.compile(r'ml\.heartex\.net/\d+/validate'), text=json.dumps({'status': 'ok'}))
            m.register_uri('GET', re.compile(r'ml\.heartex\.net/\d+/health'), text=json.dumps({'status': 'UP'}))
            r = client.post(urls.project_create, data=project_config)
            print('Project create with status code:', r.status_code)
            assert r.status_code == 201, 'Create project result should be redirect to the next page'

        # get project id and prepare url
        project = Project.objects.filter(title=project_config['title']).first()
        urls.set_project(project.pk)
        print('Project id:', project.id)

        client.project = project

    client.user = user
    client.urls = urls
    client.project_config = project_config
    client.org = org
    return client


@pytest.fixture
def setup_project_dialog(client):
    """Logged-in client with a project built from ``project_dialog``."""
    return setup_project(client, project_dialog)


@pytest.fixture
def setup_project_for_token(client):
    """Client set up without sign-in and with legacy API tokens enabled."""
    return setup_project(client, project_dialog, do_auth=False, legacy_api_tokens_enabled=True)


@pytest.fixture
def setup_project_ranker(client):
    """Logged-in client with a project built from ``project_ranker``."""
    return setup_project(client, project_ranker)


@pytest.fixture
def setup_project_choices(client):
    """Logged-in client with a project built from ``project_choices``."""
    return setup_project(client, project_choices)


@pytest.fixture()
def contextlog_test_config(settings):
    """
    Configure settings for contextlog tests in CI.
    Be sure that responses is activated in any testcase where this fixture is used.
    """
    settings.COLLECT_ANALYTICS = True
    settings.CONTEXTLOG_SYNC = True
    settings.TEST_ENVIRONMENT = False
    settings.DEBUG_CONTEXTLOG = False


@pytest.fixture
def business_client(client):
    """Client signed in as ``business@pytest.net`` with legacy API tokens enabled.

    The returned client carries ``business``, ``team``, ``admin``,
    ``annotator``, ``user``, ``api_key`` and ``organization`` attributes.
    A failed sign-in is only printed, not raised.
    """
    # we work in empty database, so let's create business user and login
    client = deepcopy(client)
    email = 'business@pytest.net'
    password = 'pytest'
    user = User.objects.create(email=email)
    user.set_password(password)  # set password without hash
    business = create_business(user)
    user.save()
    org = Organization.create_organization(created_by=user, title=user.first_name)
    org.jwt.legacy_api_tokens_enabled = True
    org.jwt.save()
    client.business = business if business else SimpleNamespace(admin=user)
    client.team = None if business else SimpleNamespace(id=1)
    client.admin = user
    client.annotator = user
    client.user = user
    client.api_key = user.reset_token().key
    client.organization = org

    if signin(client, email, password).status_code != 302:
        print(f'User {user} failed to login!')
    return client


def _make_annotator_client(client, email):
    """Create a user for ``email``, sign in on a copy of ``client`` and return the copy."""
    # we work in empty database, so let's create business user and login
    client = deepcopy(client)
    password = 'pytest'
    user = User.objects.create(email=email)
    user.set_password(password)  # set password without hash
    user.save()
    create_business(user)
    Organization.create_organization(created_by=user, title=user.first_name)
    if signin(client, email, password).status_code != 302:
        print(f'User {user} failed to login!')
    client.user = user
    client.annotator = user
    return client


@pytest.fixture
def annotator_client(client):
    """Client signed in as ``annotator@pytest.net``."""
    return _make_annotator_client(client, 'annotator@pytest.net')


@pytest.fixture
def annotator2_client(client):
    """Client signed in as ``annotator2@pytest.net``."""
    return _make_annotator_client(client, 'annotator2@pytest.net')


@pytest.fixture(params=['business', 'annotator'])
def any_client(request, business_client, annotator_client):
    """Parametrized fixture: runs the test once per client kind."""
    if request.param == 'business':
        return business_client
    elif request.param == 'annotator':
        return annotator_client


@pytest.fixture
def configured_project(business_client, annotator_client):
    """Project owned by the business user, pre-loaded with two text tasks."""
    # NOTE(review): the label_config literal is blank in this view -- confirm.
    _project_for_text_choices_onto_A_B_classes = dict(
        title='Test',
        label_config=""" """,
    )
    _2_tasks_with_textA_and_textB = [
        {'meta_info': 'meta info A', 'text': 'text A'},
        {'meta_info': 'meta info B', 'text': 'text B'},
    ]

    # get user to be owner
    users = User.objects.filter(email='business@pytest.net')  # TODO(nik): how to get proper email for business here?
    project = make_project(_project_for_text_choices_onto_A_B_classes, users[0])

    assert project.ml_backends.first().url == 'http://localhost:8999'

    Task.objects.bulk_create([Task(data=task, project=project) for task in _2_tasks_with_textA_and_textB])
    return project


@pytest.fixture(name='django_live_url')
def get_server_url(live_server):
    """Yield the URL of pytest-django's ``live_server``."""
    yield live_server.url


def _force_feature_flag(target, flag_name, value):
    """Return a ``mock.patch`` for ``target`` that pins one feature flag.

    The patched callable returns ``value`` when its first positional
    argument equals ``flag_name`` and defers to the real ``flag_set``
    for everything else.
    """
    from core.feature_flags import flag_set

    def fake_flag_set(*args, **kwargs):
        if args[0] == flag_name:
            return value
        return flag_set(*args, **kwargs)

    return mock.patch(target, wraps=fake_flag_set)


@pytest.fixture(name='ff_front_dev_1682_model_version_dropdown_070622_short_off', autouse=True)
def ff_front_dev_1682_model_version_dropdown_070622_short_off():
    """Force the flag off in ``tasks.serializers`` for every test."""
    with _force_feature_flag(
        'tasks.serializers.flag_set', 'ff_front_dev_1682_model_version_dropdown_070622_short', False
    ):
        yield


@pytest.fixture(name='async_import_off', autouse=True)
def async_import_off():
    """Wrap ``data_import.api.flag_set`` with a pass-through to the real ``flag_set``."""
    from core.feature_flags import flag_set

    def fake_flag_set(*args, **kwargs):
        return flag_set(*args, **kwargs)

    with mock.patch('data_import.api.flag_set', wraps=fake_flag_set):
        yield


@pytest.fixture(autouse=True, scope='session')
def set_feature_flag_envvar():
    """
    Automatically set the environment variable for all tests, including Tavern tests.
    """
    os.environ['fflag_optic_all_optic_1938_storage_proxy'] = 'true'
    os.environ['fflag_feat_utc_210_prediction_validation_15082025'] = 'true'


@pytest.fixture(name='fflag_feat_back_lsdv_3958_server_side_encryption_for_target_storage_short_on')
def fflag_feat_back_lsdv_3958_server_side_encryption_for_target_storage_short_on():
    """Force the flag on in ``io_storages.s3.models``."""
    with _force_feature_flag(
        'io_storages.s3.models.flag_set',
        'fflag_feat_back_lsdv_3958_server_side_encryption_for_target_storage_short',
        True,
    ):
        yield


@pytest.fixture(name='fflag_fix_all_lsdv_4813_async_export_conversion_22032023_short_on')
def fflag_fix_all_lsdv_4813_async_export_conversion_22032023_short_on():
    """Force the flag on in ``data_export.api``."""
    with _force_feature_flag(
        'data_export.api.flag_set', 'fflag_fix_all_lsdv_4813_async_export_conversion_22032023_short', True
    ):
        yield


@pytest.fixture(name='ff_back_dev_4664_remove_storage_file_on_export_delete_29032023_short_on')
def ff_back_dev_4664_remove_storage_file_on_export_delete_29032023_short_on():
    """Force the flag on in ``data_export.api``."""
    with _force_feature_flag(
        'data_export.api.flag_set', 'ff_back_dev_4664_remove_storage_file_on_export_delete_29032023_short', True
    ):
        yield


@pytest.fixture(name='local_files_storage')
def local_files_storage(settings):
    """Enable local file serving and stage two copies of the sample image.

    Creates ``<tmp>/files/subdir`` and copies the sample PNG to
    ``<tmp>/files/test_image1.png`` and ``<tmp>/files/subdir/test_image2.png``.
    The source path is relative, so it is resolved against the current
    working directory.
    """
    settings.LOCAL_FILES_SERVING_ENABLED = True
    tempdir = Path(tempfile.gettempdir()) / Path('files')
    subdir = tempdir / Path('subdir')
    os.makedirs(str(subdir), exist_ok=True)
    test_image = Path(*'tests/test_suites/samples/test_image.png'.split('/'))
    shutil.copyfile(str(test_image), str(tempdir / Path('test_image1.png')))
    shutil.copyfile(str(test_image), str(subdir / Path('test_image2.png')))
@pytest.fixture(name='local_files_document_root_tempdir')
def local_files_document_root_tempdir(settings):
    """Set ``LOCAL_FILES_DOCUMENT_ROOT`` to the root component of the temp dir path.

    Note that the value is ``Path(...).root`` (the filesystem root part of
    the path), not the temp directory itself.
    """
    tempdir = Path(tempfile.gettempdir())
    settings.LOCAL_FILES_DOCUMENT_ROOT = tempdir.root


@pytest.fixture(name='local_files_document_root_subdir')
def local_files_document_root_subdir(settings):
    """Set ``LOCAL_FILES_DOCUMENT_ROOT`` to ``<tmp>/files``."""
    tempdir = Path(tempfile.gettempdir()) / Path('files')
    settings.LOCAL_FILES_DOCUMENT_ROOT = str(tempdir)


# The fixture name belongs in the decorator; it used to be declared as a
# default argument of the function, where pytest does not read it.
@pytest.fixture(name='mock_ml_auto_update')
def mock_ml_auto_update():
    """Mock ``http://localhost:9090`` so successive ``/setup`` calls change version.

    ``POST /setup`` answers ``version1`` three times, then ``version2``,
    then ``version3``. ``GET /health`` reports ``UP``. Requests to any other
    URL go to the real network (``real_http=True``).
    """
    url = 'http://localhost:9090'
    setup_versions = ['version1', 'version1', 'version1', 'version2', 'version3']
    with requests_mock.Mocker(real_http=True) as m:
        m.register_uri(
            'POST',
            f'{url}/setup',
            [
                {'json': {'model_version': version, 'status': 'ok'}, 'status_code': 200}
                for version in setup_versions
            ],
        )
        m.get(f'{url}/health', text=json.dumps({'status': 'UP'}))
        yield m


@pytest.fixture(name='mock_ml_backend_auto_update_disabled')
def mock_ml_backend_auto_update_disabled():
    """ML backend mock whose ``GET /setup`` answers ``''`` and then ``'2'`` as model version."""
    with ml_backend_mock(setup_model_version='version1') as m:
        m.register_uri(
            'GET',
            'http://localhost:9090/setup',
            [
                {'json': {'model_version': '', 'status': 'ok'}, 'status_code': 200},
                {'json': {'model_version': '2', 'status': 'ok'}, 'status_code': 200},
            ],
        )
        yield m


# Module-level state shared by freeze_clock, tick_clock and freeze_datetime.
freezer = None
now = None


@pytest.fixture(name='freeze_clock')
def freeze_clock():
    """Freeze time at the current moment for the duration of the test."""
    global freezer
    global now

    now = datetime.now()
    freezer = freeze_time(now)
    freezer.start()

    yield

    # teardown steps after yield
    freezer.stop()
    freezer = None
    now = None


def tick_clock(_, seconds: int = 1) -> None:
    """Advance the frozen clock by ``seconds``.

    Expects ``freeze_clock`` to be active: it stops the current module-level
    ``freezer`` and starts a new one at the advanced time. The first
    positional argument is ignored.
    """
    global freezer
    global now
    freezer.stop()
    now += timedelta(seconds=seconds)
    freezer = freeze_time(now)
    freezer.start()


def freeze_datetime(response, utc_time: str) -> None:
    """Re-freeze the clock at ``utc_time``.

    Expects ``freeze_clock`` to be active. ``response`` is unused. The
    module-level ``now`` is not updated here.
    """
    global freezer
    freezer.stop()
    freezer = freeze_time(utc_time)
    freezer.start()


def pytest_collection_modifyitems(config, items):
    """Move tests that use the KMS/AES S3 resource mocks to the end of the run."""
    # This function is called by pytest after the collection of tests has been completed to modify their order
    # it is being used as a workaround for the fact the kms and aes mocks resist teardown and cause other test failures

    def uses_s3_resource_mock(item):
        return 'mock_s3_resource_kms' in item.fixturenames or 'mock_s3_resource_aes' in item.fixturenames

    # sorted() is stable and False sorts before True, so relative order is
    # kept inside each group and the mock tests end up last.
    items[:] = sorted(items, key=uses_s3_resource_mock)