"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. """ import inspect import logging import os import time from core.permissions import ViewClassPermission, all_permissions from core.utils.io import read_yaml from django.conf import settings from drf_spectacular.utils import extend_schema from io_storages.serializers import ExportStorageSerializer, ImportStorageSerializer from projects.models import Project from rest_framework import generics, status from rest_framework.exceptions import NotFound, ValidationError from rest_framework.parsers import FormParser, JSONParser, MultiPartParser from rest_framework.response import Response logger = logging.getLogger(__name__) class ImportStorageListAPI(generics.ListCreateAPIView): permission_required = ViewClassPermission( GET=all_permissions.storages_view, POST=all_permissions.storages_change, ) parser_classes = (JSONParser, FormParser, MultiPartParser) serializer_class = ImportStorageSerializer def get_queryset(self): project_pk = self.request.query_params.get('project') if not project_pk: raise ValidationError('query parameter "project" is required') project = generics.get_object_or_404(Project, pk=project_pk) self.check_object_permissions(self.request, project) StorageClass = self.serializer_class.Meta.model storages = StorageClass.objects.filter(project_id=project.id) # check failed jobs and sync their statuses StorageClass.ensure_storage_statuses(storages) return storages class ImportStorageDetailAPI(generics.RetrieveUpdateDestroyAPIView): """RUD storage by pk specified in URL""" permission_required = ViewClassPermission( GET=all_permissions.storages_view, PATCH=all_permissions.storages_change, PUT=all_permissions.storages_change, DELETE=all_permissions.storages_change, ) parser_classes = (JSONParser, FormParser, MultiPartParser) serializer_class = ImportStorageSerializer @extend_schema(exclude=True) def put(self, request, *args, **kwargs): return super(ImportStorageDetailAPI, self).put(request, *args, **kwargs) class ExportStorageListAPI(generics.ListCreateAPIView): permission_required = ViewClassPermission( GET=all_permissions.storages_view, POST=all_permissions.storages_change, ) parser_classes = (JSONParser, FormParser, MultiPartParser) serializer_class = ExportStorageSerializer def get_queryset(self): project_pk = self.request.query_params.get('project') if not project_pk: raise ValidationError('query parameter "project" is required') project = generics.get_object_or_404(Project, pk=project_pk) self.check_object_permissions(self.request, project) StorageClass = self.serializer_class.Meta.model storages = StorageClass.objects.filter(project_id=project.id) # check failed jobs and sync their statuses StorageClass.ensure_storage_statuses(storages) return storages def perform_create(self, serializer): # double check: not export storages don't validate connection in serializer, # just make another explicit check here, note: in this create API we have credentials in request.data instance = serializer.Meta.model(**serializer.validated_data) try: instance.validate_connection() except Exception as exc: raise ValidationError(exc) storage = serializer.save() if settings.SYNC_ON_TARGET_STORAGE_CREATION: storage.sync() class ExportStorageDetailAPI(generics.RetrieveUpdateDestroyAPIView): """RUD storage by pk specified in URL""" permission_required = ViewClassPermission( GET=all_permissions.storages_view, PATCH=all_permissions.storages_change, PUT=all_permissions.storages_change, DELETE=all_permissions.storages_change, ) parser_classes = (JSONParser, FormParser, MultiPartParser) serializer_class = ExportStorageSerializer @extend_schema(exclude=True) def put(self, request, *args, **kwargs): return super(ExportStorageDetailAPI, self).put(request, *args, **kwargs) class ImportStorageSyncAPI(generics.GenericAPIView): permission_required = ViewClassPermission( POST=all_permissions.storages_sync, ) parser_classes = (JSONParser, FormParser, MultiPartParser) serializer_class = ImportStorageSerializer def get_queryset(self): ImportStorageClass = self.serializer_class.Meta.model return ImportStorageClass.objects.all() def post(self, request, *args, **kwargs): storage = self.get_object() # check connectivity & access, raise an exception if not satisfied if not storage.synchronizable: response_data = {'message': f'Storage {str(storage.id)} is not synchronizable'} return Response(status=status.HTTP_400_BAD_REQUEST, data=response_data) storage.validate_connection() storage.sync() storage.refresh_from_db() return Response(self.serializer_class(storage).data) class ExportStorageSyncAPI(generics.GenericAPIView): permission_required = ViewClassPermission( POST=all_permissions.storages_sync, ) parser_classes = (JSONParser, FormParser, MultiPartParser) serializer_class = ExportStorageSerializer def get_queryset(self): ExportStorageClass = self.serializer_class.Meta.model return ExportStorageClass.objects.all() def post(self, request, *args, **kwargs): storage = self.get_object() # check connectivity & access, raise an exception if not satisfied if not storage.synchronizable: response_data = {'message': f'Storage {str(storage.id)} is not synchronizable'} return Response(status=status.HTTP_400_BAD_REQUEST, data=response_data) storage.validate_connection() storage.sync() storage.refresh_from_db() return Response(self.serializer_class(storage).data) class StorageValidateAPI(generics.CreateAPIView): permission_required = all_permissions.storages_change parser_classes = (JSONParser, FormParser, MultiPartParser) def create(self, request, *args, **kwargs): from .functions import validate_storage_instance validate_storage_instance(request, self.serializer_class) return Response() @extend_schema(exclude=True) class ImportStorageListFilesAPI(generics.CreateAPIView): permission_required = all_permissions.storages_change parser_classes = (JSONParser, FormParser, MultiPartParser) serializer_class = None # Default serializer def __init__(self, serializer_class=None, *args, **kwargs): self.serializer_class = serializer_class super().__init__(*args, **kwargs) @extend_schema(exclude=True) def create(self, request, *args, **kwargs): from .functions import validate_storage_instance instance = validate_storage_instance(request, self.serializer_class) limit = int(request.data.get('limit', settings.DEFAULT_STORAGE_LIST_LIMIT)) try: files = [] start_time = time.time() timeout_seconds = 30 for object in instance.iter_objects(): files.append(instance.get_unified_metadata(object)) # Check if we've reached the file limit if len(files) >= limit: files.append({'key': None, 'last_modified': None, 'size': None}) break # Check if we've exceeded the timeout if time.time() - start_time > timeout_seconds: files.append({'key': '... storage scan timeout reached ...', 'last_modified': None, 'size': None}) break return Response({'files': files}) except Exception as exc: logger.exception('Error listing storage files: %s', exc) raise ValidationError('Failed to list storage files') @extend_schema(exclude=True) class StorageFormLayoutAPI(generics.RetrieveAPIView): permission_required = all_permissions.storages_change parser_classes = (JSONParser, FormParser, MultiPartParser) storage_type = None @extend_schema(exclude=True) def get(self, request, *args, **kwargs): form_layout_file = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), 'form_layout.yml') if not os.path.exists(form_layout_file): raise NotFound(f'"form_layout.yml" is not found for {self.__class__.__name__}') form_layout = read_yaml(form_layout_file) form_layout = self.post_process_form(form_layout) return Response(form_layout[self.storage_type]) def post_process_form(self, form_layout): return form_layout class ImportStorageValidateAPI(StorageValidateAPI): serializer_class = ImportStorageSerializer class ExportStorageValidateAPI(StorageValidateAPI): serializer_class = ExportStorageSerializer class ImportStorageFormLayoutAPI(StorageFormLayoutAPI): storage_type = 'ImportStorage' class ExportStorageFormLayoutAPI(StorageFormLayoutAPI): storage_type = 'ExportStorage'