import base64
|
import logging
|
import time
|
from typing import Union
|
from urllib.parse import unquote
|
|
from core.feature_flags import flag_set
|
from django.conf import settings
|
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, StreamingHttpResponse
|
from drf_spectacular.utils import extend_schema
|
from projects.models import Project
|
from rest_framework import status
|
from rest_framework.permissions import IsAuthenticated
|
from rest_framework.response import Response
|
from rest_framework.views import APIView
|
from tasks.models import Task
|
|
from label_studio.io_storages.functions import get_storage_by_url
|
from label_studio.io_storages.utils import parse_range
|
|
logger = logging.getLogger(__name__)
|
|
|
class ResolveStorageUriAPIMixin:
|
def resolve(self, request: HttpRequest, fileuri: str, instance: Union[Task, Project]) -> Response:
|
model_name = type(instance).__name__
|
|
if not instance.has_permission(request.user):
|
return Response(status=status.HTTP_403_FORBIDDEN)
|
|
# Attempt to base64 decode the fileuri
|
try:
|
fileuri = base64.urlsafe_b64decode(fileuri.encode()).decode()
|
# For backwards compatibility, try unquote if this fails
|
except Exception as exc:
|
logger.debug(
|
f'Failed to decode base64 {fileuri} for {model_name} {instance.id}: {exc} falling back to unquote'
|
)
|
fileuri = unquote(fileuri)
|
|
# Try to find storage by URL
|
project = None
|
if flag_set('fflag_optic_all_optic_1938_storage_proxy', user='auto'):
|
project = instance if isinstance(instance, Project) else instance.project
|
storage_objects = project.get_all_import_storage_objects
|
storage = get_storage_by_url(fileuri, storage_objects)
|
if not storage:
|
logger.error(f'Could not find storage for URI {fileuri}')
|
return Response(status=status.HTTP_404_NOT_FOUND)
|
# Not all storages support presigned URLs
|
if not hasattr(storage, 'presign'):
|
logger.error(f'Storage {storage} does not support presign URLs')
|
return Response(status=status.HTTP_404_NOT_FOUND)
|
presign = storage.presign
|
else:
|
presign = True
|
|
# Check if storage should use presigned URLs;
|
# It's important to have this check here, because it increases security:
|
# If storage.presign is False, it means an admin doesn't want to expose presigned URLs anyhow,
|
# and all files are proxied through Label Studio using LS auth and RBAC control.
|
|
if presign:
|
# Redirect to presigned URL (original flow)
|
return self.redirect_to_presign_url(fileuri, instance, model_name)
|
else:
|
return self.proxy_data_from_storage(request, fileuri, project, storage)
|
|
def redirect_to_presign_url(self, fileuri: str, instance: Union[Task, Project], model_name: str) -> Response:
|
"""Generate and redirect to a presigned URL for the given file URI"""
|
try:
|
resolved = instance.resolve_storage_uri(fileuri)
|
except Exception as exc:
|
logger.error(f'Failed to resolve storage uri {fileuri} for {model_name} {instance.id}: {exc}')
|
return Response(status=status.HTTP_404_NOT_FOUND)
|
|
if resolved is None or resolved.get('url') is None:
|
return Response(status=status.HTTP_404_NOT_FOUND)
|
|
url = resolved['url']
|
max_age = 0
|
if resolved.get('presign_ttl'):
|
max_age = resolved.get('presign_ttl') * 60
|
|
# Proxy to presigned url
|
response = HttpResponseRedirect(redirect_to=url, status=status.HTTP_303_SEE_OTHER)
|
response.headers['Cache-Control'] = f'no-store, max-age={max_age}'
|
# Remove Sentry trace propagation headers to avoid CORS issues
|
response.headers.pop('baggage', None)
|
response.headers.pop('sentry-trace', None)
|
return response
|
|
def time_limited_chunker(self, stream_body):
|
"""
|
Generator that stops yielding chunks after timeout seconds.
|
"""
|
chunk_size = settings.RESOLVER_PROXY_BUFFER_SIZE
|
timeout = settings.RESOLVER_PROXY_TIMEOUT
|
start_time = time.monotonic()
|
deadline = start_time + timeout
|
chunks_yielded = 0
|
total_bytes = 0
|
|
try:
|
for chunk in stream_body.iter_chunks(chunk_size=chunk_size):
|
current_time = time.monotonic()
|
# Check if we've exceeded our time limit
|
if current_time >= deadline:
|
logger.warning(
|
f'Time limit ({timeout}s) reached after yielding {chunks_yielded} chunks ({total_bytes} bytes)'
|
)
|
break
|
|
# Track statistics
|
chunks_yielded += 1
|
total_bytes += len(chunk)
|
yield chunk
|
|
except Exception as e:
|
logger.error(f'Error during time-limited streaming: {e}', exc_info=True)
|
finally:
|
elapsed = time.monotonic() - start_time
|
try:
|
stream_body.close()
|
except Exception as e:
|
logger.debug(f"Couldn't close stream: {e}")
|
logger.debug(
|
f'Stream processing finished after {elapsed:.2f}s, yielded {chunks_yielded} chunks ({total_bytes} bytes)'
|
)
|
|
def override_range_header(self, request):
|
"""
|
Process and override Range header to limit stream size.
|
This function does a trick: limit stream chunk sizes to MAX_RANGE,
|
this way we free sync LSE workers for hanging too long,
|
because the connection will be closed after the MAX_RANGE chunk is over.
|
|
This function handles several range request formats:
|
1 'bytes=0-' and 'bytes=0-0': Passes through unchanged (header probes)
|
2 'bytes=123456-' and 'bytes=123456-0': Limits to MAX_RANGE bytes
|
3 'bytes=123456-789012': Limits the range if it exceeds MAX_RANGE
|
4 'bytes=-1024': Handles negative start (not supported)
|
|
Returns:
|
str: Modified range header in format "bytes=start-end" or None if no range header
|
"""
|
max_range_size = settings.RESOLVER_PROXY_MAX_RANGE_SIZE
|
range_header = None
|
|
if rng := request.headers.get('Range'):
|
start, end = parse_range(rng)
|
# Normalize None end to empty for consistent handling
|
if end is None:
|
end = ''
|
logger.debug(f'>> Range read from request: start: {start}, end: {end}')
|
|
"""
|
Pass this range as is to storage:
|
- 'bytes=0-' most likely, browser is requesting just headers
|
- 'bytes=0-0' most likely, browser is requesting just headers
|
Limit stream to MAX_RANGE bytes:
|
- 'bytes=123456-' browser is requesting from 123456 to the end of the file
|
- 'bytes=123456-0' browser is requesting from 123456 to the end of the file
|
- 'bytes=123456-789012' browser is requesting from 123456 to 789012
|
Not supported:
|
- 'bytes=-1024' browser is requesting last 1024 bytes - we don't support this
|
"""
|
# 'bytes=0-' + 'bytes=0-0'
|
if start == 0 and (end == '' or end == 0):
|
pass
|
# 'bytes=123456-' + 'bytes=123456-0'
|
elif start > 0 and (end == '' or end == 0):
|
end = start + max_range_size
|
# 'bytes=123456-' + 'bytes=123456-789012'
|
elif start >= 0 and end > 0:
|
end = start + max_range_size if end >= start + max_range_size else end
|
# 'bytes=-1024'
|
elif start < 0:
|
logger.warning(f'Start range is negative and not supported: {rng}')
|
start = 0
|
end = start + max_range_size
|
else:
|
logger.warning(f'Range is not covered by logic: {rng}')
|
start = 0
|
end = ''
|
|
range_header = f'bytes={start}-{end}'
|
logger.debug(f'>> stream > start: {int(start)/1024/1024} MB')
|
logger.debug(f'>> stream > end: {int(end or 0)/1024/1024} MB')
|
logger.debug(f'>> stream > range_header: {range_header}')
|
|
return range_header
|
|
def prepare_headers(self, response, metadata, request, project):
|
"""Prepare and set headers for the streaming response"""
|
# Copy important headers from storage
|
if metadata.get('ContentLength'):
|
response.headers['Content-Length'] = str(metadata['ContentLength'])
|
if metadata.get('ContentRange'):
|
response.headers['Content-Range'] = metadata['ContentRange']
|
if metadata.get('LastModified'):
|
last_mod = metadata['LastModified']
|
# Accept either datetime-like (has strftime) or preformatted string
|
if hasattr(last_mod, 'strftime'):
|
response.headers['Last-Modified'] = last_mod.strftime('%a, %d %b %Y %H:%M:%S GMT')
|
else:
|
response.headers['Last-Modified'] = str(last_mod)
|
|
# Always enable range requests
|
response.headers['Accept-Ranges'] = 'bytes'
|
|
# Cache control
|
max_age = settings.RESOLVER_PROXY_CACHE_TIMEOUT
|
response.headers['Cache-Control'] = f'private, max-age={max_age}, must-revalidate'
|
|
# Generate an ETag based on user ID and user is_active status
|
# This ensures cache is invalidated when user status changes
|
# "ETag" is a standard HTTP header defined in the HTTP/1.1 specification (RFC 7232)
|
# It stands for "Entity Tag" and is specifically designed for cache validation
|
user = request.user
|
has_access = int(project.has_permission(user))
|
user_status_tag = f'{user.id}{has_access}'
|
response.headers['ETag'] = f'{user_status_tag}'
|
if metadata.get('ETag'):
|
# use original ETag from storage
|
response.headers['ETag'] += metadata['ETag'].strip('"')
|
response.headers['ETag'] = f'"{response.headers["ETag"]}"'
|
|
return response
|
|
def proxy_data_from_storage(self, request, uri, project, storage):
|
"""
|
Proxy the data using iter_chunks directly from storage streaming object.
|
|
This implementation forwards Range headers to cloud storages and streams the response
|
directly using StreamingHttpResponse. It avoids any intermediate buffering
|
but doesn't support backward seeking.
|
"""
|
try:
|
# Process and limit the range header for downloaded files
|
range_header = self.override_range_header(request)
|
|
# Use the storage-specific method to get data stream and content type
|
stream, content_type, metadata = storage.get_bytes_stream(uri, range_header=range_header)
|
|
if stream is None:
|
logger.error(f'Failed to get direct stream from storage {storage}')
|
return Response(
|
{'error': 'Storage stream failed while proxying data', 'detail': 'Stream is None'},
|
status=status.HTTP_424_FAILED_DEPENDENCY,
|
)
|
|
# Create time-limited stream
|
time_limited_stream = self.time_limited_chunker(stream)
|
|
# Set up streaming response with storage's status code
|
status_code = metadata['StatusCode']
|
response = StreamingHttpResponse(
|
time_limited_stream, content_type=content_type or 'application/octet-stream', status=status_code
|
)
|
|
# Prepare response headers
|
response = self.prepare_headers(response, metadata, request, project)
|
|
# Process cached requests using ETag - with range-aware handling
|
if settings.RESOLVER_PROXY_ENABLE_ETAG_CACHE and 'Range' not in request.headers:
|
if request.headers.get('If-None-Match') == response.headers.get('ETag'):
|
return HttpResponse(status=status.HTTP_304_NOT_MODIFIED)
|
|
return response
|
|
except Exception as e:
|
logger.error(f'Error in direct proxy from storage: {e}', exc_info=True)
|
return Response(
|
{'error': 'Storage stream failed while proxying data', 'detail': str(e)},
|
status=status.HTTP_424_FAILED_DEPENDENCY,
|
)
|
|
|
@extend_schema(exclude=True)
|
class TaskResolveStorageUri(ResolveStorageUriAPIMixin, APIView):
|
"""A file proxy to presign storage urls at the task level.
|
|
If the storage has presign=False, it will proxy the data through Label Studio
|
instead of redirecting to presigned URLs.
|
"""
|
|
http_method_names = ['get']
|
permission_classes = (IsAuthenticated,)
|
|
def get(self, request, *args, **kwargs):
|
"""Get the presigned url for a given fileuri or proxy data through Label Studio"""
|
request = self.request
|
task_id = kwargs.get('task_id')
|
fileuri = request.GET.get('fileuri')
|
|
if fileuri is None or task_id is None:
|
return Response(status=status.HTTP_400_BAD_REQUEST)
|
|
try:
|
task = Task.objects.get(pk=task_id)
|
except Task.DoesNotExist:
|
return Response(status=status.HTTP_404_NOT_FOUND)
|
|
return self.resolve(request, fileuri, task)
|
|
|
@extend_schema(exclude=True)
|
class ProjectResolveStorageUri(ResolveStorageUriAPIMixin, APIView):
|
"""A file proxy to presign storage urls at the project level.
|
|
If the storage has presign=False, it will proxy the data through Label Studio
|
instead of redirecting to presigned URLs.
|
"""
|
|
http_method_names = ['get']
|
permission_classes = (IsAuthenticated,)
|
|
def get(self, request, *args, **kwargs):
|
"""Get the presigned url for a given fileuri or proxy data through Label Studio"""
|
request = self.request
|
project_id = kwargs.get('project_id')
|
fileuri = request.GET.get('fileuri')
|
|
if fileuri is None or project_id is None:
|
return Response(status=status.HTTP_400_BAD_REQUEST)
|
|
try:
|
project = Project.objects.get(pk=project_id)
|
except Project.DoesNotExist:
|
return Response(status=status.HTTP_404_NOT_FOUND)
|
|
return self.resolve(request, fileuri, project)
|