import base64 import io import unittest from datetime import datetime from unittest.mock import MagicMock, patch import pytest from io_storages.proxy_api import ( ProjectResolveStorageUri, ResolveStorageUriAPIMixin, TaskResolveStorageUri, ) from projects.models import Project from rest_framework import status from rest_framework.response import Response from rest_framework.test import APIRequestFactory, force_authenticate from tasks.models import Task class TestResolveStorageUriAPIMixin(unittest.TestCase): def setUp(self): self.mixin = ResolveStorageUriAPIMixin() self.user = MagicMock() self.project = MagicMock() self.task = MagicMock() self.task.project = self.project # Set the __class__.__name__ to "Task" for type checks type(self.task).__name__ = 'Task' self.task.has_permission.return_value = True self.request = MagicMock() self.request.user = self.user self.storage = MagicMock() self.storage.presign = True def test_resolve_with_permission_denied(self): self.task.has_permission.return_value = False result = self.mixin.resolve(self.request, 'test_fileuri', self.task) assert result.status_code == status.HTTP_403_FORBIDDEN @patch('io_storages.proxy_api.flag_set') @patch('io_storages.proxy_api.get_storage_by_url') def test_resolve_with_base64_decoding(self, mock_get_storage, mock_flag_set): mock_flag_set.return_value = True mock_get_storage.return_value = self.storage fileuri = base64.urlsafe_b64encode(b'test_uri').decode() with patch.object(self.mixin, 'redirect_to_presign_url') as mock_redirect: mock_redirect.return_value = Response() self.mixin.resolve(self.request, fileuri, self.task) mock_redirect.assert_called_once_with('test_uri', self.task, 'Task') @patch('io_storages.proxy_api.flag_set') @patch('io_storages.proxy_api.get_storage_by_url') def test_resolve_with_url_unquote_fallback(self, mock_get_storage, mock_flag_set): mock_flag_set.return_value = True mock_get_storage.return_value = self.storage with patch.object(self.mixin, 'redirect_to_presign_url') as mock_redirect: mock_redirect.return_value = Response() # Non-base64 uri to trigger fallback self.mixin.resolve(self.request, 's3://bucket/file.jpg', self.task) mock_redirect.assert_called_once_with('s3://bucket/file.jpg', self.task, 'Task') @patch('io_storages.proxy_api.flag_set') @patch('io_storages.proxy_api.get_storage_by_url') def test_resolve_storage_not_found(self, mock_get_storage, mock_flag_set): mock_flag_set.return_value = True mock_get_storage.return_value = None result = self.mixin.resolve(self.request, 'fileuri', self.task) assert result.status_code == status.HTTP_404_NOT_FOUND @patch('io_storages.proxy_api.flag_set') @patch('io_storages.proxy_api.get_storage_by_url') def test_resolve_storage_no_presign_support(self, mock_get_storage, mock_flag_set): mock_flag_set.return_value = True mock_storage = MagicMock() delattr(mock_storage, 'presign') mock_get_storage.return_value = mock_storage result = self.mixin.resolve(self.request, 'fileuri', self.task) assert result.status_code == status.HTTP_404_NOT_FOUND @patch('io_storages.proxy_api.flag_set') @patch('io_storages.proxy_api.get_storage_by_url') def test_resolve_with_presign_true(self, mock_get_storage, mock_flag_set): mock_flag_set.return_value = True mock_storage = MagicMock() mock_storage.presign = True mock_get_storage.return_value = mock_storage with patch.object(self.mixin, 'redirect_to_presign_url') as mock_redirect: mock_redirect.return_value = Response() self.mixin.resolve(self.request, 'fileuri', self.task) mock_redirect.assert_called_once() @patch('io_storages.proxy_api.flag_set') @patch('io_storages.proxy_api.get_storage_by_url') def test_resolve_with_presign_false(self, mock_get_storage, mock_flag_set): mock_flag_set.return_value = True mock_storage = MagicMock() mock_storage.presign = False mock_get_storage.return_value = mock_storage project = self.task.project with patch.object(self.mixin, 'proxy_data_from_storage') as mock_proxy: mock_proxy.return_value = Response() self.mixin.resolve(self.request, 'fileuri', self.task) mock_proxy.assert_called_once_with(self.request, 'fileuri', project, mock_storage) def test_redirect_to_presign_url_success(self): self.task.resolve_storage_uri.return_value = {'url': 'https://example.com/file.jpg', 'presign_ttl': 60} result = self.mixin.redirect_to_presign_url('fileuri', self.task, 'Task') assert result.status_code == status.HTTP_303_SEE_OTHER assert result.url == 'https://example.com/file.jpg' assert result.headers['Cache-Control'] == 'no-store, max-age=3600' def test_redirect_to_presign_url_no_url(self): self.task.resolve_storage_uri.return_value = {'url': None} result = self.mixin.redirect_to_presign_url('fileuri', self.task, 'Task') assert result.status_code == status.HTTP_404_NOT_FOUND def test_redirect_to_presign_url_exception(self): self.task.resolve_storage_uri.side_effect = Exception('Error resolving URL') result = self.mixin.redirect_to_presign_url('fileuri', self.task, 'Task') assert result.status_code == status.HTTP_404_NOT_FOUND def test_proxy_data_from_storage_success(self): mock_storage = MagicMock() # Ensure get_bytes_stream returns a three-tuple, metadata can be empty initially mock_storage.get_bytes_stream.return_value = (io.BytesIO(b'test data'), 'image/jpeg', {}) mock_project = MagicMock() with patch('io_storages.proxy_api.StreamingHttpResponse') as mock_response_class, patch( 'io_storages.proxy_api.settings' ) as mock_settings: # Configure mock settings mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024 # 1MB mock_settings.RESOLVER_PROXY_BUFFER_SIZE = 8192 mock_settings.RESOLVER_PROXY_CACHE_TIMEOUT = 3600 # Set up mock stream and response mock_stream = MagicMock() mock_metadata = { 'StatusCode': 200, 'ContentLength': 1000, 'LastModified': datetime.now(), 'ETag': '"abcdef123456"', } mock_storage.get_bytes_stream.return_value = (mock_stream, 'application/test', mock_metadata) # Set up mock response mock_response = MagicMock() mock_response.headers = {} mock_response_class.return_value = mock_response # Set up request with range header self.request.headers = {'Range': 'bytes=100-200'} # Call the method result = self.mixin.proxy_data_from_storage(self.request, 'uri', mock_project, mock_storage) # Verify the correct range header was passed mock_storage.get_bytes_stream.assert_called_once() args, kwargs = mock_storage.get_bytes_stream.call_args # First positional argument should be the URI self.assertEqual(args[0], 'uri') # Range header should be passed as a keyword argument and start with 'bytes=' self.assertIn('range_header', kwargs) self.assertTrue(kwargs['range_header'].startswith('bytes=')) # Verify the response was created with the stream mock_response_class.assert_called_once() # The first positional argument should be a generator returned by time_limited_chunker called_args, _ = mock_response_class.call_args streaming_generator = called_args[0] self.assertTrue( hasattr(streaming_generator, '__next__') and hasattr(streaming_generator, '__iter__'), 'Expected a generator for streaming chunks', ) # Verify correct headers are set self.assertEqual(result, mock_response) self.assertTrue('ETag' in mock_response.headers) def test_proxy_data_from_storage_no_data(self): mock_storage = MagicMock() # Return three-tuple with empty metadata when no data is available mock_storage.get_bytes_stream.return_value = (None, None, {}) mock_project = MagicMock() result = self.mixin.proxy_data_from_storage(self.request, 'uri', mock_project, mock_storage) assert result.status_code == status.HTTP_424_FAILED_DEPENDENCY def test_proxy_data_from_storage_exception(self): mock_storage = MagicMock() mock_storage.get_bytes_stream.side_effect = Exception('Storage error') mock_project = MagicMock() result = self.mixin.proxy_data_from_storage(self.request, 'uri', mock_project, mock_storage) assert result.status_code == status.HTTP_424_FAILED_DEPENDENCY def test_time_limited_chunker_normal_case(self): """Test time_limited_chunker when all chunks are processed within timeout""" # Create a mock stream with iter_chunks method mock_stream = MagicMock() mock_stream.iter_chunks.return_value = [b'chunk1', b'chunk2', b'chunk3'] # Set up settings with patch('io_storages.proxy_api.settings') as mock_settings, patch('time.monotonic') as mock_time: # Mock settings mock_settings.RESOLVER_PROXY_BUFFER_SIZE = 8192 mock_settings.RESOLVER_PROXY_TIMEOUT = 20 # Mock time to simulate being within timeout # Add an extra value for the 'finally' block mock_time.side_effect = [0, 1, 2, 3, 4] # Start, three chunk iterations, finally # Run the chunker and collect all chunks chunks = list(self.mixin.time_limited_chunker(mock_stream)) # Verify all chunks were yielded assert chunks == [b'chunk1', b'chunk2', b'chunk3'] assert mock_stream.close.called def test_time_limited_chunker_timeout(self): """Test time_limited_chunker when timeout is reached during processing""" # Create a mock stream with iter_chunks method mock_stream = MagicMock() mock_stream.iter_chunks.return_value = [b'chunk1', b'chunk2', b'chunk3', b'chunk4', b'chunk5'] # Set up settings with patch('io_storages.proxy_api.settings') as mock_settings, patch('time.monotonic') as mock_time: # Mock settings mock_settings.RESOLVER_PROXY_BUFFER_SIZE = 8192 mock_settings.RESOLVER_PROXY_TIMEOUT = 10 # Mock time to simulate exceeding timeout after second chunk # Add an extra value for the 'finally' block mock_time.side_effect = [0, 5, 9, 15, 20, 25] # Start time, chunk checks, finally # Run the chunker and collect all chunks chunks = list(self.mixin.time_limited_chunker(mock_stream)) # Verify only the chunks before timeout were yielded assert chunks == [b'chunk1', b'chunk2'] assert mock_stream.close.called def test_time_limited_chunker_exception(self): """Test time_limited_chunker when an exception occurs during streaming""" # Create a mock stream with iter_chunks method that raises an exception mock_stream = MagicMock() mock_stream.iter_chunks.side_effect = Exception('Streaming error') # Set up settings with patch('io_storages.proxy_api.settings') as mock_settings, patch('time.monotonic') as mock_time: # Mock settings mock_settings.RESOLVER_PROXY_BUFFER_SIZE = 8192 mock_settings.RESOLVER_PROXY_TIMEOUT = 20 # Mock time - need two values: one for start and one for the finally block mock_time.side_effect = [0, 1] # Run the chunker and collect all chunks (should be empty due to exception) chunks = list(self.mixin.time_limited_chunker(mock_stream)) # Verify no chunks were yielded and the stream was closed assert chunks == [] assert mock_stream.close.called def test_override_range_header_no_header(self): """Test override_range_header when no Range header is present""" self.request.headers = {} result = self.mixin.override_range_header(self.request) assert result is None def test_override_range_header_header_probes(self): """Test override_range_header with header probe formats""" # Test bytes=0- self.request.headers = {'Range': 'bytes=0-'} with patch('io_storages.proxy_api.settings') as mock_settings, patch( 'io_storages.proxy_api.parse_range' ) as mock_parse_range: mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024 # Mock the parse_range function to return a known value mock_parse_range.return_value = (0, '') result = self.mixin.override_range_header(self.request) assert result == 'bytes=0-' # Test bytes=0-0 self.request.headers = {'Range': 'bytes=0-0'} with patch('io_storages.proxy_api.settings') as mock_settings, patch( 'io_storages.proxy_api.parse_range' ) as mock_parse_range: mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024 # Mock the parse_range function to return a known value mock_parse_range.return_value = (0, 0) result = self.mixin.override_range_header(self.request) assert result == 'bytes=0-0' def test_override_range_header_start_no_end(self): """Test override_range_header with a start position but no end""" # Case: bytes=100- self.request.headers = {'Range': 'bytes=100-'} with patch('io_storages.proxy_api.settings') as mock_settings, patch( 'io_storages.proxy_api.parse_range' ) as mock_parse_range: mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024 # 1MB # Mock the parse_range function to return a known value mock_parse_range.return_value = (100, '') result = self.mixin.override_range_header(self.request) # Should add MAX_RANGE_SIZE to start assert result == f'bytes=100-{100 + 1024*1024}' # Case: bytes=100-0 (treated like bytes=100-) self.request.headers = {'Range': 'bytes=100-0'} with patch('io_storages.proxy_api.settings') as mock_settings, patch( 'io_storages.proxy_api.parse_range' ) as mock_parse_range: mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024 # 1MB # Mock the parse_range function to return a known value mock_parse_range.return_value = (100, 0) result = self.mixin.override_range_header(self.request) # Should add MAX_RANGE_SIZE to start assert result == f'bytes=100-{100 + 1024*1024}' def test_override_range_header_start_and_end(self): """Test override_range_header with start and end positions""" # Case: Range within limit self.request.headers = {'Range': 'bytes=100-5000'} with patch('io_storages.proxy_api.settings') as mock_settings, patch( 'io_storages.proxy_api.parse_range' ) as mock_parse_range: mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 10000 # 10KB # Mock the parse_range function to return a known value mock_parse_range.return_value = (100, 5000) result = self.mixin.override_range_header(self.request) # Should remain unchanged as it's within limit assert result == 'bytes=100-5000' # Case: Range exceeding limit self.request.headers = {'Range': 'bytes=100-20000'} with patch('io_storages.proxy_api.settings') as mock_settings, patch( 'io_storages.proxy_api.parse_range' ) as mock_parse_range: mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 10000 # 10KB # Mock the parse_range function to return a known value mock_parse_range.return_value = (100, 20000) result = self.mixin.override_range_header(self.request) # Should limit the range to MAX_RANGE_SIZE from start assert result == f'bytes=100-{100 + 10000}' def test_override_range_header_negative_start(self): """Test override_range_header with negative start position""" self.request.headers = {'Range': 'bytes=-1024'} with patch('io_storages.proxy_api.settings') as mock_settings, patch( 'io_storages.proxy_api.parse_range' ) as mock_parse_range: mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 10000 # 10KB # Mock the parse_range function to return a negative start mock_parse_range.return_value = (-1024, None) result = self.mixin.override_range_header(self.request) # Should reset to 0 and add MAX_RANGE_SIZE assert result == f'bytes=0-{10000}' def test_override_range_header_unsupported_format(self): """Test override_range_header with unsupported range format""" self.request.headers = {'Range': 'invalid-range-format'} with patch('io_storages.proxy_api.settings') as mock_settings, patch( 'io_storages.proxy_api.parse_range' ) as mock_parse_range: mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 10000 # 10KB # Mock parse_range to simulate failure with invalid format mock_parse_range.return_value = (0, None) result = self.mixin.override_range_header(self.request) # Should reset to default assert result == 'bytes=0-' class TestTaskResolveStorageUri: @pytest.fixture def setup(self): # Create the necessary objects for testing without database self.factory = APIRequestFactory() self.user = MagicMock() self.task = MagicMock(spec=Task) self.view = TaskResolveStorageUri.as_view() @patch('io_storages.proxy_api.Task.objects.get') def test_get_with_missing_params(self, mock_task_get, setup): # Mock the database query mock_task_get.return_value = self.task # Test missing fileuri parameter request = self.factory.get('/task/1/resolve/') force_authenticate(request, user=self.user) response = self.view(request, task_id=1) assert response.status_code == status.HTTP_400_BAD_REQUEST # Test missing task_id parameter request = self.factory.get('/task/resolve/?fileuri=test') force_authenticate(request, user=self.user) response = self.view(request) assert response.status_code == status.HTTP_400_BAD_REQUEST @patch('io_storages.proxy_api.Task.objects.get') def test_get_task_not_found(self, mock_task_get, setup): # Mock the database query to raise DoesNotExist mock_task_get.side_effect = Task.DoesNotExist request = self.factory.get('/task/999/resolve/?fileuri=test') force_authenticate(request, user=self.user) response = self.view(request, task_id=999) assert response.status_code == status.HTTP_404_NOT_FOUND @patch('io_storages.proxy_api.Task.objects.get') @patch.object(ResolveStorageUriAPIMixin, 'resolve') def test_get_success(self, mock_resolve, mock_task_get, setup): # Mock the database query and resolve method mock_task_get.return_value = self.task mock_resolve.return_value = Response(status=status.HTTP_200_OK) request = self.factory.get('/task/1/resolve/?fileuri=test') force_authenticate(request, user=self.user) response = self.view(request, task_id=1) mock_task_get.assert_called_once_with(pk=1) # Use any_call instead of assert_called_once_with to handle DRF request vs WSGIRequest assert mock_resolve.call_args is not None assert mock_resolve.call_args[0][1] == 'test' assert mock_resolve.call_args[0][2] == self.task assert response.status_code == status.HTTP_200_OK @pytest.mark.django_db class TestProjectResolveStorageUri: @pytest.fixture def setup(self): # Create the necessary objects for testing without database self.factory = APIRequestFactory() self.user = MagicMock() self.project = MagicMock() # Avoid using spec=Project - it triggers database access self.view = ProjectResolveStorageUri.as_view() @patch('io_storages.proxy_api.Project.objects.get') def test_get_with_missing_params(self, mock_project_get, setup): # Mock the database query mock_project_get.return_value = self.project # Test missing fileuri parameter request = self.factory.get('/project/1/resolve/') force_authenticate(request, user=self.user) response = self.view(request, project_id=1) assert response.status_code == status.HTTP_400_BAD_REQUEST # Test missing project_id parameter request = self.factory.get('/project/resolve/?fileuri=test') force_authenticate(request, user=self.user) response = self.view(request) assert response.status_code == status.HTTP_400_BAD_REQUEST @patch('io_storages.proxy_api.Project.objects.get') def test_get_project_not_found(self, mock_project_get, setup): # Mock the database query to raise DoesNotExist mock_project_get.side_effect = Project.DoesNotExist request = self.factory.get('/project/999/resolve/?fileuri=test') force_authenticate(request, user=self.user) response = self.view(request, project_id=999) assert response.status_code == status.HTTP_404_NOT_FOUND @patch('io_storages.proxy_api.Project.objects.get') @patch.object(ResolveStorageUriAPIMixin, 'resolve') def test_get_success(self, mock_resolve, mock_project_get, setup): # Mock the database query and resolve method mock_project_get.return_value = self.project mock_resolve.return_value = Response(status=status.HTTP_200_OK) request = self.factory.get('/project/1/resolve/?fileuri=test') force_authenticate(request, user=self.user) response = self.view(request, project_id=1) mock_project_get.assert_called_once_with(pk=1) # Use any_call instead of assert_called_once_with to handle DRF request vs WSGIRequest assert mock_resolve.call_args is not None assert mock_resolve.call_args[0][1] == 'test' assert mock_resolve.call_args[0][2] == self.project assert response.status_code == status.HTTP_200_OK