Bin
2025-12-17 1d710f844b65d9bfdf986a71a3b924cd70598a41
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import base64
import logging
import time
from typing import Union
from urllib.parse import unquote
 
from core.feature_flags import flag_set
from django.conf import settings
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, StreamingHttpResponse
from drf_spectacular.utils import extend_schema
from projects.models import Project
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from tasks.models import Task
 
from label_studio.io_storages.functions import get_storage_by_url
from label_studio.io_storages.utils import parse_range
 
logger = logging.getLogger(__name__)
 
 
class ResolveStorageUriAPIMixin:
    def resolve(self, request: HttpRequest, fileuri: str, instance: Union[Task, Project]) -> Response:
        model_name = type(instance).__name__
 
        if not instance.has_permission(request.user):
            return Response(status=status.HTTP_403_FORBIDDEN)
 
        # Attempt to base64 decode the fileuri
        try:
            fileuri = base64.urlsafe_b64decode(fileuri.encode()).decode()
        # For backwards compatibility, try unquote if this fails
        except Exception as exc:
            logger.debug(
                f'Failed to decode base64 {fileuri} for {model_name} {instance.id}: {exc} falling back to unquote'
            )
            fileuri = unquote(fileuri)
 
        # Try to find storage by URL
        project = None
        if flag_set('fflag_optic_all_optic_1938_storage_proxy', user='auto'):
            project = instance if isinstance(instance, Project) else instance.project
            storage_objects = project.get_all_import_storage_objects
            storage = get_storage_by_url(fileuri, storage_objects)
            if not storage:
                logger.error(f'Could not find storage for URI {fileuri}')
                return Response(status=status.HTTP_404_NOT_FOUND)
            # Not all storages support presigned URLs
            if not hasattr(storage, 'presign'):
                logger.error(f'Storage {storage} does not support presign URLs')
                return Response(status=status.HTTP_404_NOT_FOUND)
            presign = storage.presign
        else:
            presign = True
 
        # Check if storage should use presigned URLs;
        # It's important to have this check here, because it increases security:
        # If storage.presign is False, it means an admin doesn't want to expose presigned URLs anyhow,
        # and all files are proxied through Label Studio using LS auth and RBAC control.
 
        if presign:
            # Redirect to presigned URL (original flow)
            return self.redirect_to_presign_url(fileuri, instance, model_name)
        else:
            return self.proxy_data_from_storage(request, fileuri, project, storage)
 
    def redirect_to_presign_url(self, fileuri: str, instance: Union[Task, Project], model_name: str) -> Response:
        """Generate and redirect to a presigned URL for the given file URI"""
        try:
            resolved = instance.resolve_storage_uri(fileuri)
        except Exception as exc:
            logger.error(f'Failed to resolve storage uri {fileuri} for {model_name} {instance.id}: {exc}')
            return Response(status=status.HTTP_404_NOT_FOUND)
 
        if resolved is None or resolved.get('url') is None:
            return Response(status=status.HTTP_404_NOT_FOUND)
 
        url = resolved['url']
        max_age = 0
        if resolved.get('presign_ttl'):
            max_age = resolved.get('presign_ttl') * 60
 
        # Proxy to presigned url
        response = HttpResponseRedirect(redirect_to=url, status=status.HTTP_303_SEE_OTHER)
        response.headers['Cache-Control'] = f'no-store, max-age={max_age}'
        # Remove Sentry trace propagation headers to avoid CORS issues
        response.headers.pop('baggage', None)
        response.headers.pop('sentry-trace', None)
        return response
 
    def time_limited_chunker(self, stream_body):
        """
        Generator that stops yielding chunks after timeout seconds.
        """
        chunk_size = settings.RESOLVER_PROXY_BUFFER_SIZE
        timeout = settings.RESOLVER_PROXY_TIMEOUT
        start_time = time.monotonic()
        deadline = start_time + timeout
        chunks_yielded = 0
        total_bytes = 0
 
        try:
            for chunk in stream_body.iter_chunks(chunk_size=chunk_size):
                current_time = time.monotonic()
                # Check if we've exceeded our time limit
                if current_time >= deadline:
                    logger.warning(
                        f'Time limit ({timeout}s) reached after yielding {chunks_yielded} chunks ({total_bytes} bytes)'
                    )
                    break
 
                # Track statistics
                chunks_yielded += 1
                total_bytes += len(chunk)
                yield chunk
 
        except Exception as e:
            logger.error(f'Error during time-limited streaming: {e}', exc_info=True)
        finally:
            elapsed = time.monotonic() - start_time
            try:
                stream_body.close()
            except Exception as e:
                logger.debug(f"Couldn't close stream: {e}")
            logger.debug(
                f'Stream processing finished after {elapsed:.2f}s, yielded {chunks_yielded} chunks ({total_bytes} bytes)'
            )
 
    def override_range_header(self, request):
        """
        Process and override Range header to limit stream size.
        This function does a trick: limit stream chunk sizes to MAX_RANGE,
        this way we free sync LSE workers for hanging too long,
        because the connection will be closed after the MAX_RANGE chunk is over.
 
        This function handles several range request formats:
        1 'bytes=0-' and 'bytes=0-0': Passes through unchanged (header probes)
        2 'bytes=123456-' and 'bytes=123456-0': Limits to MAX_RANGE bytes
        3 'bytes=123456-789012': Limits the range if it exceeds MAX_RANGE
        4 'bytes=-1024': Handles negative start (not supported)
 
        Returns:
            str: Modified range header in format "bytes=start-end" or None if no range header
        """
        max_range_size = settings.RESOLVER_PROXY_MAX_RANGE_SIZE
        range_header = None
 
        if rng := request.headers.get('Range'):
            start, end = parse_range(rng)
            # Normalize None end to empty for consistent handling
            if end is None:
                end = ''
            logger.debug(f'>> Range read from request: start: {start}, end: {end}')
 
            """
            Pass this range as is to storage:
              - 'bytes=0-'  most likely, browser is requesting just headers
              - 'bytes=0-0'  most likely, browser is requesting just headers
            Limit stream to MAX_RANGE bytes:
              - 'bytes=123456-'  browser is requesting from 123456 to the end of the file
              - 'bytes=123456-0'  browser is requesting from 123456 to the end of the file
              - 'bytes=123456-789012'  browser is requesting from 123456 to 789012
            Not supported:
              - 'bytes=-1024'  browser is requesting last 1024 bytes - we don't support this
            """
            # 'bytes=0-' + 'bytes=0-0'
            if start == 0 and (end == '' or end == 0):
                pass
            # 'bytes=123456-' + 'bytes=123456-0'
            elif start > 0 and (end == '' or end == 0):
                end = start + max_range_size
            # 'bytes=123456-' + 'bytes=123456-789012'
            elif start >= 0 and end > 0:
                end = start + max_range_size if end >= start + max_range_size else end
            # 'bytes=-1024'
            elif start < 0:
                logger.warning(f'Start range is negative and not supported: {rng}')
                start = 0
                end = start + max_range_size
            else:
                logger.warning(f'Range is not covered by logic: {rng}')
                start = 0
                end = ''
 
            range_header = f'bytes={start}-{end}'
            logger.debug(f'>> stream > start: {int(start)/1024/1024} MB')
            logger.debug(f'>> stream > end: {int(end or 0)/1024/1024} MB')
            logger.debug(f'>> stream > range_header: {range_header}')
 
        return range_header
 
    def prepare_headers(self, response, metadata, request, project):
        """Prepare and set headers for the streaming response"""
        # Copy important headers from storage
        if metadata.get('ContentLength'):
            response.headers['Content-Length'] = str(metadata['ContentLength'])
        if metadata.get('ContentRange'):
            response.headers['Content-Range'] = metadata['ContentRange']
        if metadata.get('LastModified'):
            last_mod = metadata['LastModified']
            # Accept either datetime-like (has strftime) or preformatted string
            if hasattr(last_mod, 'strftime'):
                response.headers['Last-Modified'] = last_mod.strftime('%a, %d %b %Y %H:%M:%S GMT')
            else:
                response.headers['Last-Modified'] = str(last_mod)
 
        # Always enable range requests
        response.headers['Accept-Ranges'] = 'bytes'
 
        # Cache control
        max_age = settings.RESOLVER_PROXY_CACHE_TIMEOUT
        response.headers['Cache-Control'] = f'private, max-age={max_age}, must-revalidate'
 
        # Generate an ETag based on user ID and user is_active status
        # This ensures cache is invalidated when user status changes
        # "ETag" is a standard HTTP header defined in the HTTP/1.1 specification (RFC 7232)
        #  It stands for "Entity Tag" and is specifically designed for cache validation
        user = request.user
        has_access = int(project.has_permission(user))
        user_status_tag = f'{user.id}{has_access}'
        response.headers['ETag'] = f'{user_status_tag}'
        if metadata.get('ETag'):
            # use original ETag from storage
            response.headers['ETag'] += metadata['ETag'].strip('"')
        response.headers['ETag'] = f'"{response.headers["ETag"]}"'
 
        return response
 
    def proxy_data_from_storage(self, request, uri, project, storage):
        """
        Proxy the data using iter_chunks directly from storage streaming object.
 
        This implementation forwards Range headers to cloud storages and streams the response
        directly using StreamingHttpResponse. It avoids any intermediate buffering
        but doesn't support backward seeking.
        """
        try:
            # Process and limit the range header for downloaded files
            range_header = self.override_range_header(request)
 
            # Use the storage-specific method to get data stream and content type
            stream, content_type, metadata = storage.get_bytes_stream(uri, range_header=range_header)
 
            if stream is None:
                logger.error(f'Failed to get direct stream from storage {storage}')
                return Response(
                    {'error': 'Storage stream failed while proxying data', 'detail': 'Stream is None'},
                    status=status.HTTP_424_FAILED_DEPENDENCY,
                )
 
            # Create time-limited stream
            time_limited_stream = self.time_limited_chunker(stream)
 
            # Set up streaming response with storage's status code
            status_code = metadata['StatusCode']
            response = StreamingHttpResponse(
                time_limited_stream, content_type=content_type or 'application/octet-stream', status=status_code
            )
 
            # Prepare response headers
            response = self.prepare_headers(response, metadata, request, project)
 
            # Process cached requests using ETag - with range-aware handling
            if settings.RESOLVER_PROXY_ENABLE_ETAG_CACHE and 'Range' not in request.headers:
                if request.headers.get('If-None-Match') == response.headers.get('ETag'):
                    return HttpResponse(status=status.HTTP_304_NOT_MODIFIED)
 
            return response
 
        except Exception as e:
            logger.error(f'Error in direct proxy from storage: {e}', exc_info=True)
            return Response(
                {'error': 'Storage stream failed while proxying data', 'detail': str(e)},
                status=status.HTTP_424_FAILED_DEPENDENCY,
            )
 
 
@extend_schema(exclude=True)
class TaskResolveStorageUri(ResolveStorageUriAPIMixin, APIView):
    """A file proxy to presign storage urls at the task level.
 
    If the storage has presign=False, it will proxy the data through Label Studio
    instead of redirecting to presigned URLs.
    """
 
    http_method_names = ['get']
    permission_classes = (IsAuthenticated,)
 
    def get(self, request, *args, **kwargs):
        """Get the presigned url for a given fileuri or proxy data through Label Studio"""
        request = self.request
        task_id = kwargs.get('task_id')
        fileuri = request.GET.get('fileuri')
 
        if fileuri is None or task_id is None:
            return Response(status=status.HTTP_400_BAD_REQUEST)
 
        try:
            task = Task.objects.get(pk=task_id)
        except Task.DoesNotExist:
            return Response(status=status.HTTP_404_NOT_FOUND)
 
        return self.resolve(request, fileuri, task)
 
 
@extend_schema(exclude=True)
class ProjectResolveStorageUri(ResolveStorageUriAPIMixin, APIView):
    """A file proxy to presign storage urls at the project level.
 
    If the storage has presign=False, it will proxy the data through Label Studio
    instead of redirecting to presigned URLs.
    """
 
    http_method_names = ['get']
    permission_classes = (IsAuthenticated,)
 
    def get(self, request, *args, **kwargs):
        """Get the presigned url for a given fileuri or proxy data through Label Studio"""
        request = self.request
        project_id = kwargs.get('project_id')
        fileuri = request.GET.get('fileuri')
 
        if fileuri is None or project_id is None:
            return Response(status=status.HTTP_400_BAD_REQUEST)
 
        try:
            project = Project.objects.get(pk=project_id)
        except Project.DoesNotExist:
            return Response(status=status.HTTP_404_NOT_FOUND)
 
        return self.resolve(request, fileuri, project)