Bin
2025-12-17 1d710f844b65d9bfdf986a71a3b924cd70598a41
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
import base64
import io
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch
 
import pytest
from io_storages.proxy_api import (
    ProjectResolveStorageUri,
    ResolveStorageUriAPIMixin,
    TaskResolveStorageUri,
)
from projects.models import Project
from rest_framework import status
from rest_framework.response import Response
from rest_framework.test import APIRequestFactory, force_authenticate
from tasks.models import Task
 
 
class TestResolveStorageUriAPIMixin(unittest.TestCase):
    def setUp(self):
        self.mixin = ResolveStorageUriAPIMixin()
        self.user = MagicMock()
        self.project = MagicMock()
        self.task = MagicMock()
        self.task.project = self.project
        # Set the __class__.__name__ to "Task" for type checks
        type(self.task).__name__ = 'Task'
        self.task.has_permission.return_value = True
        self.request = MagicMock()
        self.request.user = self.user
        self.storage = MagicMock()
        self.storage.presign = True
 
    def test_resolve_with_permission_denied(self):
        self.task.has_permission.return_value = False
        result = self.mixin.resolve(self.request, 'test_fileuri', self.task)
        assert result.status_code == status.HTTP_403_FORBIDDEN
 
    @patch('io_storages.proxy_api.flag_set')
    @patch('io_storages.proxy_api.get_storage_by_url')
    def test_resolve_with_base64_decoding(self, mock_get_storage, mock_flag_set):
        mock_flag_set.return_value = True
        mock_get_storage.return_value = self.storage
        fileuri = base64.urlsafe_b64encode(b'test_uri').decode()
 
        with patch.object(self.mixin, 'redirect_to_presign_url') as mock_redirect:
            mock_redirect.return_value = Response()
            self.mixin.resolve(self.request, fileuri, self.task)
            mock_redirect.assert_called_once_with('test_uri', self.task, 'Task')
 
    @patch('io_storages.proxy_api.flag_set')
    @patch('io_storages.proxy_api.get_storage_by_url')
    def test_resolve_with_url_unquote_fallback(self, mock_get_storage, mock_flag_set):
        mock_flag_set.return_value = True
        mock_get_storage.return_value = self.storage
 
        with patch.object(self.mixin, 'redirect_to_presign_url') as mock_redirect:
            mock_redirect.return_value = Response()
            # Non-base64 uri to trigger fallback
            self.mixin.resolve(self.request, 's3://bucket/file.jpg', self.task)
            mock_redirect.assert_called_once_with('s3://bucket/file.jpg', self.task, 'Task')
 
    @patch('io_storages.proxy_api.flag_set')
    @patch('io_storages.proxy_api.get_storage_by_url')
    def test_resolve_storage_not_found(self, mock_get_storage, mock_flag_set):
        mock_flag_set.return_value = True
        mock_get_storage.return_value = None
        result = self.mixin.resolve(self.request, 'fileuri', self.task)
        assert result.status_code == status.HTTP_404_NOT_FOUND
 
    @patch('io_storages.proxy_api.flag_set')
    @patch('io_storages.proxy_api.get_storage_by_url')
    def test_resolve_storage_no_presign_support(self, mock_get_storage, mock_flag_set):
        mock_flag_set.return_value = True
        mock_storage = MagicMock()
        delattr(mock_storage, 'presign')
        mock_get_storage.return_value = mock_storage
        result = self.mixin.resolve(self.request, 'fileuri', self.task)
        assert result.status_code == status.HTTP_404_NOT_FOUND
 
    @patch('io_storages.proxy_api.flag_set')
    @patch('io_storages.proxy_api.get_storage_by_url')
    def test_resolve_with_presign_true(self, mock_get_storage, mock_flag_set):
        mock_flag_set.return_value = True
        mock_storage = MagicMock()
        mock_storage.presign = True
        mock_get_storage.return_value = mock_storage
 
        with patch.object(self.mixin, 'redirect_to_presign_url') as mock_redirect:
            mock_redirect.return_value = Response()
            self.mixin.resolve(self.request, 'fileuri', self.task)
            mock_redirect.assert_called_once()
 
    @patch('io_storages.proxy_api.flag_set')
    @patch('io_storages.proxy_api.get_storage_by_url')
    def test_resolve_with_presign_false(self, mock_get_storage, mock_flag_set):
        mock_flag_set.return_value = True
        mock_storage = MagicMock()
        mock_storage.presign = False
        mock_get_storage.return_value = mock_storage
        project = self.task.project
 
        with patch.object(self.mixin, 'proxy_data_from_storage') as mock_proxy:
            mock_proxy.return_value = Response()
            self.mixin.resolve(self.request, 'fileuri', self.task)
            mock_proxy.assert_called_once_with(self.request, 'fileuri', project, mock_storage)
 
    def test_redirect_to_presign_url_success(self):
        self.task.resolve_storage_uri.return_value = {'url': 'https://example.com/file.jpg', 'presign_ttl': 60}
        result = self.mixin.redirect_to_presign_url('fileuri', self.task, 'Task')
 
        assert result.status_code == status.HTTP_303_SEE_OTHER
        assert result.url == 'https://example.com/file.jpg'
        assert result.headers['Cache-Control'] == 'no-store, max-age=3600'
 
    def test_redirect_to_presign_url_no_url(self):
        self.task.resolve_storage_uri.return_value = {'url': None}
        result = self.mixin.redirect_to_presign_url('fileuri', self.task, 'Task')
        assert result.status_code == status.HTTP_404_NOT_FOUND
 
    def test_redirect_to_presign_url_exception(self):
        self.task.resolve_storage_uri.side_effect = Exception('Error resolving URL')
        result = self.mixin.redirect_to_presign_url('fileuri', self.task, 'Task')
        assert result.status_code == status.HTTP_404_NOT_FOUND
 
    def test_proxy_data_from_storage_success(self):
        mock_storage = MagicMock()
        # Ensure get_bytes_stream returns a three-tuple, metadata can be empty initially
        mock_storage.get_bytes_stream.return_value = (io.BytesIO(b'test data'), 'image/jpeg', {})
        mock_project = MagicMock()
 
        with patch('io_storages.proxy_api.StreamingHttpResponse') as mock_response_class, patch(
            'io_storages.proxy_api.settings'
        ) as mock_settings:
            # Configure mock settings
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024  # 1MB
            mock_settings.RESOLVER_PROXY_BUFFER_SIZE = 8192
            mock_settings.RESOLVER_PROXY_CACHE_TIMEOUT = 3600
 
            # Set up mock stream and response
            mock_stream = MagicMock()
            mock_metadata = {
                'StatusCode': 200,
                'ContentLength': 1000,
                'LastModified': datetime.now(),
                'ETag': '"abcdef123456"',
            }
            mock_storage.get_bytes_stream.return_value = (mock_stream, 'application/test', mock_metadata)
 
            # Set up mock response
            mock_response = MagicMock()
            mock_response.headers = {}
            mock_response_class.return_value = mock_response
 
            # Set up request with range header
            self.request.headers = {'Range': 'bytes=100-200'}
 
            # Call the method
            result = self.mixin.proxy_data_from_storage(self.request, 'uri', mock_project, mock_storage)
 
            # Verify the correct range header was passed
            mock_storage.get_bytes_stream.assert_called_once()
            args, kwargs = mock_storage.get_bytes_stream.call_args
            # First positional argument should be the URI
            self.assertEqual(args[0], 'uri')
            # Range header should be passed as a keyword argument and start with 'bytes='
            self.assertIn('range_header', kwargs)
            self.assertTrue(kwargs['range_header'].startswith('bytes='))
 
            # Verify the response was created with the stream
            mock_response_class.assert_called_once()
            # The first positional argument should be a generator returned by time_limited_chunker
            called_args, _ = mock_response_class.call_args
            streaming_generator = called_args[0]
            self.assertTrue(
                hasattr(streaming_generator, '__next__') and hasattr(streaming_generator, '__iter__'),
                'Expected a generator for streaming chunks',
            )
 
            # Verify correct headers are set
            self.assertEqual(result, mock_response)
            self.assertTrue('ETag' in mock_response.headers)
 
    def test_proxy_data_from_storage_no_data(self):
        mock_storage = MagicMock()
        # Return three-tuple with empty metadata when no data is available
        mock_storage.get_bytes_stream.return_value = (None, None, {})
        mock_project = MagicMock()
 
        result = self.mixin.proxy_data_from_storage(self.request, 'uri', mock_project, mock_storage)
        assert result.status_code == status.HTTP_424_FAILED_DEPENDENCY
 
    def test_proxy_data_from_storage_exception(self):
        mock_storage = MagicMock()
        mock_storage.get_bytes_stream.side_effect = Exception('Storage error')
        mock_project = MagicMock()
 
        result = self.mixin.proxy_data_from_storage(self.request, 'uri', mock_project, mock_storage)
        assert result.status_code == status.HTTP_424_FAILED_DEPENDENCY
 
    def test_time_limited_chunker_normal_case(self):
        """Test time_limited_chunker when all chunks are processed within timeout"""
        # Create a mock stream with iter_chunks method
        mock_stream = MagicMock()
        mock_stream.iter_chunks.return_value = [b'chunk1', b'chunk2', b'chunk3']
 
        # Set up settings
        with patch('io_storages.proxy_api.settings') as mock_settings, patch('time.monotonic') as mock_time:
            # Mock settings
            mock_settings.RESOLVER_PROXY_BUFFER_SIZE = 8192
            mock_settings.RESOLVER_PROXY_TIMEOUT = 20
 
            # Mock time to simulate being within timeout
            # Add an extra value for the 'finally' block
            mock_time.side_effect = [0, 1, 2, 3, 4]  # Start, three chunk iterations, finally
 
            # Run the chunker and collect all chunks
            chunks = list(self.mixin.time_limited_chunker(mock_stream))
 
            # Verify all chunks were yielded
            assert chunks == [b'chunk1', b'chunk2', b'chunk3']
            assert mock_stream.close.called
 
    def test_time_limited_chunker_timeout(self):
        """Test time_limited_chunker when timeout is reached during processing"""
        # Create a mock stream with iter_chunks method
        mock_stream = MagicMock()
        mock_stream.iter_chunks.return_value = [b'chunk1', b'chunk2', b'chunk3', b'chunk4', b'chunk5']
 
        # Set up settings
        with patch('io_storages.proxy_api.settings') as mock_settings, patch('time.monotonic') as mock_time:
            # Mock settings
            mock_settings.RESOLVER_PROXY_BUFFER_SIZE = 8192
            mock_settings.RESOLVER_PROXY_TIMEOUT = 10
 
            # Mock time to simulate exceeding timeout after second chunk
            # Add an extra value for the 'finally' block
            mock_time.side_effect = [0, 5, 9, 15, 20, 25]  # Start time, chunk checks, finally
 
            # Run the chunker and collect all chunks
            chunks = list(self.mixin.time_limited_chunker(mock_stream))
 
            # Verify only the chunks before timeout were yielded
            assert chunks == [b'chunk1', b'chunk2']
            assert mock_stream.close.called
 
    def test_time_limited_chunker_exception(self):
        """Test time_limited_chunker when an exception occurs during streaming"""
        # Create a mock stream with iter_chunks method that raises an exception
        mock_stream = MagicMock()
        mock_stream.iter_chunks.side_effect = Exception('Streaming error')
 
        # Set up settings
        with patch('io_storages.proxy_api.settings') as mock_settings, patch('time.monotonic') as mock_time:
            # Mock settings
            mock_settings.RESOLVER_PROXY_BUFFER_SIZE = 8192
            mock_settings.RESOLVER_PROXY_TIMEOUT = 20
 
            # Mock time - need two values: one for start and one for the finally block
            mock_time.side_effect = [0, 1]
 
            # Run the chunker and collect all chunks (should be empty due to exception)
            chunks = list(self.mixin.time_limited_chunker(mock_stream))
 
            # Verify no chunks were yielded and the stream was closed
            assert chunks == []
            assert mock_stream.close.called
 
    def test_override_range_header_no_header(self):
        """Test override_range_header when no Range header is present"""
        self.request.headers = {}
        result = self.mixin.override_range_header(self.request)
        assert result is None
 
    def test_override_range_header_header_probes(self):
        """Test override_range_header with header probe formats"""
        # Test bytes=0-
        self.request.headers = {'Range': 'bytes=0-'}
        with patch('io_storages.proxy_api.settings') as mock_settings, patch(
            'io_storages.proxy_api.parse_range'
        ) as mock_parse_range:
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024
            # Mock the parse_range function to return a known value
            mock_parse_range.return_value = (0, '')
 
            result = self.mixin.override_range_header(self.request)
            assert result == 'bytes=0-'
 
        # Test bytes=0-0
        self.request.headers = {'Range': 'bytes=0-0'}
        with patch('io_storages.proxy_api.settings') as mock_settings, patch(
            'io_storages.proxy_api.parse_range'
        ) as mock_parse_range:
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024
            # Mock the parse_range function to return a known value
            mock_parse_range.return_value = (0, 0)
 
            result = self.mixin.override_range_header(self.request)
            assert result == 'bytes=0-0'
 
    def test_override_range_header_start_no_end(self):
        """Test override_range_header with a start position but no end"""
        # Case: bytes=100-
        self.request.headers = {'Range': 'bytes=100-'}
        with patch('io_storages.proxy_api.settings') as mock_settings, patch(
            'io_storages.proxy_api.parse_range'
        ) as mock_parse_range:
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024  # 1MB
            # Mock the parse_range function to return a known value
            mock_parse_range.return_value = (100, '')
 
            result = self.mixin.override_range_header(self.request)
            # Should add MAX_RANGE_SIZE to start
            assert result == f'bytes=100-{100 + 1024*1024}'
 
        # Case: bytes=100-0 (treated like bytes=100-)
        self.request.headers = {'Range': 'bytes=100-0'}
        with patch('io_storages.proxy_api.settings') as mock_settings, patch(
            'io_storages.proxy_api.parse_range'
        ) as mock_parse_range:
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 1024 * 1024  # 1MB
            # Mock the parse_range function to return a known value
            mock_parse_range.return_value = (100, 0)
 
            result = self.mixin.override_range_header(self.request)
            # Should add MAX_RANGE_SIZE to start
            assert result == f'bytes=100-{100 + 1024*1024}'
 
    def test_override_range_header_start_and_end(self):
        """Test override_range_header with start and end positions"""
        # Case: Range within limit
        self.request.headers = {'Range': 'bytes=100-5000'}
        with patch('io_storages.proxy_api.settings') as mock_settings, patch(
            'io_storages.proxy_api.parse_range'
        ) as mock_parse_range:
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 10000  # 10KB
            # Mock the parse_range function to return a known value
            mock_parse_range.return_value = (100, 5000)
 
            result = self.mixin.override_range_header(self.request)
            # Should remain unchanged as it's within limit
            assert result == 'bytes=100-5000'
 
        # Case: Range exceeding limit
        self.request.headers = {'Range': 'bytes=100-20000'}
        with patch('io_storages.proxy_api.settings') as mock_settings, patch(
            'io_storages.proxy_api.parse_range'
        ) as mock_parse_range:
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 10000  # 10KB
            # Mock the parse_range function to return a known value
            mock_parse_range.return_value = (100, 20000)
 
            result = self.mixin.override_range_header(self.request)
            # Should limit the range to MAX_RANGE_SIZE from start
            assert result == f'bytes=100-{100 + 10000}'
 
    def test_override_range_header_negative_start(self):
        """Test override_range_header with negative start position"""
        self.request.headers = {'Range': 'bytes=-1024'}
        with patch('io_storages.proxy_api.settings') as mock_settings, patch(
            'io_storages.proxy_api.parse_range'
        ) as mock_parse_range:
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 10000  # 10KB
            # Mock the parse_range function to return a negative start
            mock_parse_range.return_value = (-1024, None)
 
            result = self.mixin.override_range_header(self.request)
            # Should reset to 0 and add MAX_RANGE_SIZE
            assert result == f'bytes=0-{10000}'
 
    def test_override_range_header_unsupported_format(self):
        """Test override_range_header with unsupported range format"""
        self.request.headers = {'Range': 'invalid-range-format'}
        with patch('io_storages.proxy_api.settings') as mock_settings, patch(
            'io_storages.proxy_api.parse_range'
        ) as mock_parse_range:
            mock_settings.RESOLVER_PROXY_MAX_RANGE_SIZE = 10000  # 10KB
            # Mock parse_range to simulate failure with invalid format
            mock_parse_range.return_value = (0, None)
 
            result = self.mixin.override_range_header(self.request)
            # Should reset to default
            assert result == 'bytes=0-'
 
 
class TestTaskResolveStorageUri:
    @pytest.fixture
    def setup(self):
        # Create the necessary objects for testing without database
        self.factory = APIRequestFactory()
        self.user = MagicMock()
        self.task = MagicMock(spec=Task)
        self.view = TaskResolveStorageUri.as_view()
 
    @patch('io_storages.proxy_api.Task.objects.get')
    def test_get_with_missing_params(self, mock_task_get, setup):
        # Mock the database query
        mock_task_get.return_value = self.task
 
        # Test missing fileuri parameter
        request = self.factory.get('/task/1/resolve/')
        force_authenticate(request, user=self.user)
        response = self.view(request, task_id=1)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
 
        # Test missing task_id parameter
        request = self.factory.get('/task/resolve/?fileuri=test')
        force_authenticate(request, user=self.user)
        response = self.view(request)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
 
    @patch('io_storages.proxy_api.Task.objects.get')
    def test_get_task_not_found(self, mock_task_get, setup):
        # Mock the database query to raise DoesNotExist
        mock_task_get.side_effect = Task.DoesNotExist
 
        request = self.factory.get('/task/999/resolve/?fileuri=test')
        force_authenticate(request, user=self.user)
        response = self.view(request, task_id=999)
 
        assert response.status_code == status.HTTP_404_NOT_FOUND
 
    @patch('io_storages.proxy_api.Task.objects.get')
    @patch.object(ResolveStorageUriAPIMixin, 'resolve')
    def test_get_success(self, mock_resolve, mock_task_get, setup):
        # Mock the database query and resolve method
        mock_task_get.return_value = self.task
        mock_resolve.return_value = Response(status=status.HTTP_200_OK)
 
        request = self.factory.get('/task/1/resolve/?fileuri=test')
        force_authenticate(request, user=self.user)
        response = self.view(request, task_id=1)
 
        mock_task_get.assert_called_once_with(pk=1)
        # Use any_call instead of assert_called_once_with to handle DRF request vs WSGIRequest
        assert mock_resolve.call_args is not None
        assert mock_resolve.call_args[0][1] == 'test'
        assert mock_resolve.call_args[0][2] == self.task
        assert response.status_code == status.HTTP_200_OK
 
 
@pytest.mark.django_db
class TestProjectResolveStorageUri:
    @pytest.fixture
    def setup(self):
        # Create the necessary objects for testing without database
        self.factory = APIRequestFactory()
        self.user = MagicMock()
        self.project = MagicMock()  # Avoid using spec=Project - it triggers database access
        self.view = ProjectResolveStorageUri.as_view()
 
    @patch('io_storages.proxy_api.Project.objects.get')
    def test_get_with_missing_params(self, mock_project_get, setup):
        # Mock the database query
        mock_project_get.return_value = self.project
 
        # Test missing fileuri parameter
        request = self.factory.get('/project/1/resolve/')
        force_authenticate(request, user=self.user)
        response = self.view(request, project_id=1)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
 
        # Test missing project_id parameter
        request = self.factory.get('/project/resolve/?fileuri=test')
        force_authenticate(request, user=self.user)
        response = self.view(request)
        assert response.status_code == status.HTTP_400_BAD_REQUEST
 
    @patch('io_storages.proxy_api.Project.objects.get')
    def test_get_project_not_found(self, mock_project_get, setup):
        # Mock the database query to raise DoesNotExist
        mock_project_get.side_effect = Project.DoesNotExist
 
        request = self.factory.get('/project/999/resolve/?fileuri=test')
        force_authenticate(request, user=self.user)
        response = self.view(request, project_id=999)
 
        assert response.status_code == status.HTTP_404_NOT_FOUND
 
    @patch('io_storages.proxy_api.Project.objects.get')
    @patch.object(ResolveStorageUriAPIMixin, 'resolve')
    def test_get_success(self, mock_resolve, mock_project_get, setup):
        # Mock the database query and resolve method
        mock_project_get.return_value = self.project
        mock_resolve.return_value = Response(status=status.HTTP_200_OK)
 
        request = self.factory.get('/project/1/resolve/?fileuri=test')
        force_authenticate(request, user=self.user)
        response = self.view(request, project_id=1)
 
        mock_project_get.assert_called_once_with(pk=1)
        # Use any_call instead of assert_called_once_with to handle DRF request vs WSGIRequest
        assert mock_resolve.call_args is not None
        assert mock_resolve.call_args[0][1] == 'test'
        assert mock_resolve.call_args[0][2] == self.project
        assert response.status_code == status.HTTP_200_OK