"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. """ import logging import os import traceback as tb from datetime import datetime from urllib.parse import urlparse from core.feature_flags import flag_set from core.permissions import all_permissions from core.redis import start_job_async_or_sync from core.utils.common import batch from django.conf import settings from django.core.files import File from django.core.files.storage import FileSystemStorage from django.db import transaction from django.db.models import Prefetch from django.http import FileResponse, HttpResponse from django.utils.decorators import method_decorator from drf_spectacular.types import OpenApiTypes from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema from projects.models import Project from ranged_fileresponse import RangedFileResponse from rest_framework import generics, status from rest_framework.exceptions import NotFound, ValidationError from rest_framework.response import Response from rest_framework.views import APIView from tasks.models import Task from .models import ConvertedFormat, DataExport, Export from .serializers import ( ExportConvertSerializer, ExportCreateSerializer, ExportDataSerializer, ExportParamSerializer, ExportSerializer, ) logger = logging.getLogger(__name__) @method_decorator( name='get', decorator=extend_schema( tags=['Export'], summary='Get export formats', description='Retrieve the available export formats for the current project by ID.', parameters=[ OpenApiParameter( name='id', type=OpenApiTypes.INT, location='path', description='A unique integer value identifying this project.', ), ], responses={ 200: OpenApiResponse( description='Export formats', response={ 'type': 'array', 'items': {'type': 'string', 'title': 'Export format'}, 'description': 'List of available formats', 'title': 'Format list', }, ) }, extensions={ 'x-fern-sdk-group-name': ['projects', 'exports'], 'x-fern-sdk-method-name': 'list_formats', 'x-fern-audiences': ['public'], }, ), ) class ExportFormatsListAPI(generics.RetrieveAPIView): permission_required = all_permissions.projects_view def get_queryset(self): return Project.objects.filter(organization=self.request.user.active_organization) def get(self, request, *args, **kwargs): project = self.get_object() formats = DataExport.get_export_formats(project) return Response(formats) @method_decorator( name='get', decorator=extend_schema( parameters=[ OpenApiParameter( name='export_type', type=OpenApiTypes.STR, location='query', description='Selected export format (JSON by default)', ), OpenApiParameter( name='download_all_tasks', type=OpenApiTypes.BOOL, location='query', description='If true, download all tasks regardless of status. If false, download only annotated tasks.', ), OpenApiParameter( name='download_resources', type=OpenApiTypes.BOOL, location='query', description='If true, download all resource files such as images, audio, and others relevant to the tasks.', ), OpenApiParameter( name='ids', many=True, location='query', description='Specify a list of task IDs to retrieve only the details for those tasks.', ), OpenApiParameter( name='id', type=OpenApiTypes.INT, location='path', description='A unique integer value identifying this project.', ), ], tags=['Export'], summary='Easy export of tasks and annotations', description=""" Note: if you have a large project it's recommended to use export snapshots, this easy export endpoint might have timeouts.

Export annotated tasks as a file in a specific format. For example, to export JSON annotations for a project to a file called `annotations.json`, run the following from the command line: ```bash curl -X GET {}/api/projects/{{id}}/export?exportType=JSON -H \'Authorization: Token abc123\' --output 'annotations.json' ``` To export all tasks, including skipped tasks and others without annotations, run the following from the command line: ```bash curl -X GET {}/api/projects/{{id}}/export?exportType=JSON&download_all_tasks=true -H \'Authorization: Token abc123\' --output 'annotations.json' ``` To export specific tasks with IDs of 123 and 345, run the following from the command line: ```bash curl -X GET '{}/api/projects/{{id}}/export?ids[]=123&ids[]=345' -H 'Authorization: Token abc123' --output 'annotations.json' ``` """.format( settings.HOSTNAME or 'https://localhost:8080', settings.HOSTNAME or 'https://localhost:8080', settings.HOSTNAME or 'https://localhost:8080', ), responses={ 200: OpenApiResponse( description='Exported data', response={ 'title': 'Export file', 'description': 'Export file with results', 'type': 'string', 'format': 'binary', }, ) }, extensions={ 'x-fern-sdk-group-name': ['projects', 'exports'], 'x-fern-sdk-method-name': 'download_sync', 'x-fern-audiences': ['public'], }, ), ) class ExportAPI(generics.RetrieveAPIView): permission_required = all_permissions.projects_change def get_queryset(self): return Project.objects.filter(organization=self.request.user.active_organization) def get_task_queryset(self, queryset): # Import here to avoid circular dependencies from core.feature_flags import flag_set from tasks.models import Annotation # Create a prefetch for annotations with FSM state annotations_qs = Annotation.objects.all() # Only annotate FSM state if both feature flags are enabled user = getattr(self.request, 'user', None) if ( flag_set('fflag_feat_fit_568_finite_state_management', user=user) and flag_set('fflag_feat_fit_710_fsm_state_fields', user=user) and hasattr(annotations_qs, 'with_state') ): annotations_qs = annotations_qs.with_state() qs = queryset.select_related('project').prefetch_related( Prefetch('annotations', queryset=annotations_qs), 'predictions' ) # Add FSM state annotation to tasks as well to avoid N+1 queries during export if ( flag_set('fflag_feat_fit_568_finite_state_management', user=user) and flag_set('fflag_feat_fit_710_fsm_state_fields', user=user) and hasattr(qs, 'with_state') ): qs = qs.with_state() return qs def get(self, request, *args, **kwargs): project = self.get_object() query_serializer = ExportParamSerializer(data=request.GET) query_serializer.is_valid(raise_exception=True) export_type = ( query_serializer.validated_data.get('exportType') or query_serializer.validated_data['export_type'] ) only_finished = not query_serializer.validated_data['download_all_tasks'] download_resources = query_serializer.validated_data['download_resources'] interpolate_key_frames = query_serializer.validated_data['interpolate_key_frames'] # 调试日志: API 层参数 logger.info(f'[Export API Debug] ========== Export Request Received ==========') logger.info(f'[Export API Debug] Project ID: {project.id}') logger.info(f'[Export API Debug] export_type: {export_type}') logger.info(f'[Export API Debug] download_resources: {download_resources}') logger.info(f'[Export API Debug] only_finished: {only_finished}') logger.info(f'[Export API Debug] Request GET params: {dict(request.GET)}') tasks_ids = request.GET.getlist('ids[]') logger.debug('Get tasks') query = Task.objects.filter(project=project) if tasks_ids and len(tasks_ids) > 0: logger.debug(f'Select only subset of {len(tasks_ids)} tasks') query = query.filter(id__in=tasks_ids) if only_finished: query = query.filter(annotations__isnull=False).distinct() task_ids = query.values_list('id', flat=True) logger.info(f'[Export API Debug] Total tasks to export: {len(task_ids)}') logger.debug('Serialize tasks for export') tasks = [] for _task_ids in batch(task_ids, 1000): tasks += ExportDataSerializer( self.get_task_queryset(query.filter(id__in=_task_ids)), many=True, expand=['drafts'], context={'interpolate_key_frames': interpolate_key_frames}, ).data logger.debug('Prepare export files') export_file, content_type, filename = DataExport.generate_export_file( project, tasks, export_type, download_resources, request.GET, hostname=request.build_absolute_uri('/') ) r = FileResponse(export_file, as_attachment=True, content_type=content_type, filename=filename) r['filename'] = filename return r # @method_decorator( # name='get', # decorator=extend_schema( # tags=['Export'], # summary='List exported files', # description=""" # Retrieve a list of files exported from the Label Studio UI using the Export button on the Data Manager page. # To retrieve the files themselves, see [Download export file](/api#operation/api_projects_exports_download_read). # """, # ), # ) just in case we put it back in swagger API docs @extend_schema(exclude=True) class ProjectExportFiles(generics.RetrieveAPIView): permission_required = all_permissions.projects_change def get_queryset(self): return Project.objects.filter(organization=self.request.user.active_organization) def get(self, request, *args, **kwargs): # project permission check self.get_object() paths = [] for name in os.listdir(settings.EXPORT_DIR): if name.endswith('.json') and not name.endswith('-info.json'): project_id = name.split('-')[0] if str(kwargs['pk']) == project_id: paths.append(settings.EXPORT_URL_ROOT + name) items = [{'name': p.split('/')[2].split('.')[0], 'url': p} for p in sorted(paths)[::-1]] return Response({'export_files': items}, status=status.HTTP_200_OK) @extend_schema(exclude=True) class ProjectExportFilesAuthCheck(APIView): """Check auth for nginx auth_request (/api/auth/export/)""" http_method_names = ['get'] permission_required = all_permissions.projects_change def get(self, request, *args, **kwargs): """Get export files list""" original_url = request.META['HTTP_X_ORIGINAL_URI'] filename = original_url.replace('/export/', '') project_id = filename.split('-')[0] try: pk = int(project_id) except ValueError: return Response({'detail': 'Incorrect filename in export'}, status=status.HTTP_422_UNPROCESSABLE_ENTITY) generics.get_object_or_404(Project.objects.filter(organization=self.request.user.active_organization), pk=pk) return Response({'detail': 'auth ok'}, status=status.HTTP_200_OK) @method_decorator( name='get', decorator=extend_schema( tags=['Export'], summary='List all export snapshots', description='Returns a list of exported files for a specific project by ID.', parameters=[ OpenApiParameter( name='id', type=OpenApiTypes.INT, location='path', description='A unique integer value identifying this project.', ) ], extensions={ 'x-fern-sdk-group-name': ['projects', 'exports'], 'x-fern-sdk-method-name': 'list', 'x-fern-audiences': ['public'], }, ), ) @method_decorator( name='post', decorator=extend_schema( tags=['Export'], summary='Create new export snapshot', description='Create a new export request to start a background task and generate an export file for a specific project by ID.', parameters=[ OpenApiParameter( name='id', type=OpenApiTypes.INT, location='path', description='A unique integer value identifying this project.', ) ], extensions={ 'x-fern-sdk-group-name': ['projects', 'exports'], 'x-fern-sdk-method-name': 'create', 'x-fern-audiences': ['public'], }, ), ) class ExportListAPI(generics.ListCreateAPIView): queryset = Export.objects.all().order_by('-created_at') project_model = Project serializer_class = ExportSerializer permission_required = all_permissions.projects_change def get_serializer_class(self): if self.request.method == 'GET': return ExportSerializer if self.request.method == 'POST': return ExportCreateSerializer return super().get_serializer_class() def get_serializer_context(self): context = super(ExportListAPI, self).get_serializer_context() context['user'] = self.request.user return context def _get_project(self): project_pk = self.kwargs.get('pk') project = generics.get_object_or_404( self.project_model.objects.for_user(self.request.user), pk=project_pk, ) return project def perform_create(self, serializer): task_filter_options = serializer.validated_data.pop('task_filter_options') annotation_filter_options = serializer.validated_data.pop('annotation_filter_options') serialization_options = serializer.validated_data.pop('serialization_options') project = self._get_project() serializer.save(project=project, created_by=self.request.user) instance = serializer.instance instance.run_file_exporting( task_filter_options=task_filter_options, annotation_filter_options=annotation_filter_options, serialization_options=serialization_options, ) def get_queryset(self): project = self._get_project() return super().get_queryset().filter(project=project) def filter_queryset(self, queryset): queryset = super().filter_queryset(queryset) return queryset.order_by('-created_at')[:100] @method_decorator( name='get', decorator=extend_schema( tags=['Export'], summary='Get export snapshot by ID', description='Retrieve information about an export file by export ID for a specific project.', parameters=[ OpenApiParameter( name='id', type=OpenApiTypes.INT, location='path', description='A unique integer value identifying this project.', ), OpenApiParameter( name='export_pk', type=OpenApiTypes.INT, location='path', description='Primary key identifying the export file.', ), ], extensions={ 'x-fern-sdk-group-name': ['projects', 'exports'], 'x-fern-sdk-method-name': 'get', 'x-fern-audiences': ['public'], }, ), ) @method_decorator( name='delete', decorator=extend_schema( tags=['Export'], summary='Delete export snapshot', description='Delete an export file by specified export ID.', parameters=[ OpenApiParameter( name='id', type=OpenApiTypes.INT, location='path', description='A unique integer value identifying this project.', ), OpenApiParameter( name='export_pk', type=OpenApiTypes.INT, location='path', description='Primary key identifying the export file.', ), ], extensions={ 'x-fern-sdk-group-name': ['projects', 'exports'], 'x-fern-sdk-method-name': 'delete', 'x-fern-audiences': ['public'], }, ), ) class ExportDetailAPI(generics.RetrieveDestroyAPIView): queryset = Export.objects.all() project_model = Project serializer_class = ExportSerializer lookup_url_kwarg = 'export_pk' permission_required = all_permissions.projects_change def delete(self, *args, **kwargs): if flag_set('ff_back_dev_4664_remove_storage_file_on_export_delete_29032023_short'): try: export = self.get_object() export.file.delete() for converted_format in export.converted_formats.all(): if converted_format.file: converted_format.file.delete() except Exception as e: return Response( status=status.HTTP_500_INTERNAL_SERVER_ERROR, data={ 'detail': 'Could not delete file from storage. Check that your user has permissions to delete files: %s' % str(e) }, ) return super().delete(*args, **kwargs) def _get_project(self): project_pk = self.kwargs.get('pk') project = generics.get_object_or_404( self.project_model.objects.for_user(self.request.user), pk=project_pk, ) return project def get_queryset(self): project = self._get_project() return super().get_queryset().filter(project=project) @method_decorator( name='get', decorator=extend_schema( tags=['Export'], summary='Download export snapshot as file in specified format', description=""" Download an export file in the specified format for a specific project. Specify the project ID with the `id` parameter in the path and the ID of the export file you want to download using the `export_pk` parameter in the path. Get the `export_pk` from the response of the request to [Create new export](/api#operation/api_projects_exports_create) or after [listing export files](/api#operation/api_projects_exports_list). """, parameters=[ OpenApiParameter( name='exportType', type=OpenApiTypes.STR, location='query', description='Selected export format', ), OpenApiParameter( name='id', type=OpenApiTypes.INT, location='path', description='A unique integer value identifying this project.', ), OpenApiParameter( name='export_pk', type=OpenApiTypes.INT, location='path', description='Primary key identifying the export file.', ), ], responses={ (200, 'application/*'): OpenApiResponse( description='Export file', response={ 'type': 'string', 'format': 'binary', }, ), }, extensions={ 'x-fern-sdk-group-name': ['projects', 'exports'], 'x-fern-sdk-method-name': 'download', 'x-fern-audiences': ['public'], }, ), ) class ExportDownloadAPI(generics.RetrieveAPIView): queryset = Export.objects.all() project_model = Project serializer_class = None lookup_url_kwarg = 'export_pk' permission_required = all_permissions.projects_change def _get_project(self): project_pk = self.kwargs.get('pk') project = generics.get_object_or_404( self.project_model.objects.for_user(self.request.user), pk=project_pk, ) return project def get_queryset(self): project = self._get_project() return super().get_queryset().filter(project=project) def get(self, request, *args, **kwargs): snapshot = self.get_object() export_type = request.GET.get('exportType') if snapshot.status != Export.Status.COMPLETED: return HttpResponse('Export is not completed', status=404) if flag_set('fflag_fix_all_lsdv_4813_async_export_conversion_22032023_short', request.user): file = snapshot.file if export_type is not None and export_type != 'JSON': converted_file = snapshot.converted_formats.filter(export_type=export_type).first() if converted_file is None: raise NotFound(f'{export_type} format is not converted yet') file = converted_file.file if isinstance(file.storage, FileSystemStorage): url = file.storage.url(file.name) else: url = file.storage.url(file.name, storage_url=True) protocol = urlparse(url).scheme # NGINX downloads are a solid way to make uwsgi workers free if settings.USE_NGINX_FOR_EXPORT_DOWNLOADS: # let NGINX handle it response = HttpResponse() # below header tells NGINX to catch it and serve, see docker-config/nginx-app.conf redirect = '/file_download/' + protocol + '/' + url.replace(protocol + '://', '') response['X-Accel-Redirect'] = redirect response['Content-Disposition'] = 'attachment; filename="{}"'.format(file.name) response['filename'] = os.path.basename(file.name) return response # No NGINX: standard way for export downloads in the community edition else: ext = file.name.split('.')[-1] response = RangedFileResponse(request, file, content_type=f'application/{ext}') response['Content-Disposition'] = f'attachment; filename="{file.name}"' response['filename'] = os.path.basename(file.name) return response else: if export_type is None: file_ = snapshot.file else: file_ = snapshot.convert_file(export_type) if file_ is None: return HttpResponse("Can't get file", status=404) ext = file_.name.split('.')[-1] response = RangedFileResponse(request, file_, content_type=f'application/{ext}') response['Content-Disposition'] = f'attachment; filename="{file_.name}"' response['filename'] = file_.name return response def async_convert(converted_format_id, export_type, project, hostname, download_resources=False, **kwargs): with transaction.atomic(): try: converted_format = ConvertedFormat.objects.get(id=converted_format_id) except ConvertedFormat.DoesNotExist: logger.error(f'ConvertedFormat with id {converted_format_id} not found, conversion failed') return if converted_format.status != ConvertedFormat.Status.CREATED: logger.error(f'Conversion for export id {converted_format.export.id} to {export_type} already started') return converted_format.status = ConvertedFormat.Status.IN_PROGRESS converted_format.save(update_fields=['status']) snapshot = converted_format.export converted_file = snapshot.convert_file(export_type, download_resources=download_resources, hostname=hostname) if converted_file is None: raise ValidationError('No converted file found, probably there are no annotations in the export snapshot') md5 = Export.eval_md5(converted_file) ext = converted_file.name.split('.')[-1] now = datetime.now() file_name = f'project-{project.id}-at-{now.strftime("%Y-%m-%d-%H-%M")}-{md5[0:8]}.{ext}' file_path = f'{project.id}/{file_name}' # finally file will be in settings.DELAYED_EXPORT_DIR/project.id/file_name file_ = File(converted_file, name=file_path) converted_format.file.save(file_path, file_) converted_format.status = ConvertedFormat.Status.COMPLETED converted_format.save(update_fields=['file', 'status']) def set_convert_background_failure(job, connection, type, value, traceback_obj): from data_export.models import ConvertedFormat convert_id = job.args[0] try: trace = ''.join(tb.format_exception(type, value, traceback_obj)) except Exception as e: if flag_set('fflag_fix_back_leap_1818_set_convert_background_failure_logging_02062025_short'): logger.error(f'Failed to format traceback: {job=} {type=} {value=} {traceback_obj=} {e=}', exc_info=True) trace = 'Exception while processing traceback. See stderr for details' ConvertedFormat.objects.filter(id=convert_id).update(status=Export.Status.FAILED, traceback=trace) @method_decorator( name='post', decorator=extend_schema( tags=['Export'], summary='Export conversion', description='Convert export snapshot to selected format', request=ExportConvertSerializer, parameters=[ OpenApiParameter( name='id', type=OpenApiTypes.INT, location='path', description='A unique integer value identifying this project.', ), OpenApiParameter( name='export_pk', type=OpenApiTypes.INT, location='path', description='Primary key identifying the export file.', ), ], responses={ 200: OpenApiResponse( response={ 'type': 'object', 'properties': { 'export_type': {'type': 'string'}, 'converted_format': {'type': 'integer'}, }, }, ), }, extensions={ 'x-fern-sdk-group-name': ['projects', 'exports'], 'x-fern-sdk-method-name': 'convert', 'x-fern-audiences': ['public'], }, ), ) class ExportConvertAPI(generics.CreateAPIView): queryset = Export.objects.all() lookup_url_kwarg = 'export_pk' permission_required = all_permissions.projects_change def post(self, request, *args, **kwargs): snapshot = self.get_object() serializer = ExportConvertSerializer(data=request.data, context={'project': snapshot.project}) serializer.is_valid(raise_exception=True) export_type = serializer.validated_data['export_type'] download_resources = serializer.validated_data.get('download_resources') converted_format, created = ConvertedFormat.objects.exclude( status=ConvertedFormat.Status.FAILED ).get_or_create(export=snapshot, export_type=export_type) if not created: raise ValidationError(f'Conversion to {export_type} already started') start_job_async_or_sync( async_convert, converted_format.id, export_type, snapshot.project, request.build_absolute_uri('/'), download_resources=download_resources, on_failure=set_convert_background_failure, ) return Response({'export_type': export_type, 'converted_format': converted_format.id})