import logging
|
from typing import Dict, Iterable, List, Union
|
|
from django.shortcuts import get_object_or_404
|
from io_storages.base_models import ImportStorage
|
from rest_framework.exceptions import PermissionDenied, ValidationError
|
|
from .azure_blob.api import AzureBlobExportStorageListAPI, AzureBlobImportStorageListAPI
|
from .gcs.api import GCSExportStorageListAPI, GCSImportStorageListAPI
|
from .redis.api import RedisExportStorageListAPI, RedisImportStorageListAPI
|
from .s3.api import S3ExportStorageListAPI, S3ImportStorageListAPI
|
|
logger = logging.getLogger(__name__)
|
|
|
def validate_storage_instance(request, serializer_class):
    """
    Preload and prepare a storage instance from request data.

    This function handles the common logic for loading existing storage instances
    or creating new ones from request data, including permission checks,
    serializer validation and an explicit connection check.

    Args:
        request: The HTTP request containing storage data. ``request.data`` is
            read for the payload (including the optional ``id``) and
            ``request.user`` is used for the permission check.
        serializer_class: The serializer class to use for validation. It must
            expose a ``Meta`` attribute; ``Meta.model`` is the storage model.

    Returns:
        The prepared storage instance: the existing instance updated through
        ``serializer.update`` when ``id`` is present in the request data,
        otherwise a new model object built from the validated data.

    Raises:
        Http404: If ``id`` is given but no storage with that primary key exists
            (raised by ``get_object_or_404``).
        PermissionDenied: If user doesn't have permission to access the storage
        ValidationError: If the serializer class is unusable, serializer
            validation fails, or the storage connection check fails.
    """
    if not serializer_class or not hasattr(serializer_class, 'Meta'):
        raise ValidationError('Invalid or missing serializer class')

    storage_id = request.data.get('id')
    instance = None

    if storage_id:
        instance = get_object_or_404(serializer_class.Meta.model.objects.all(), pk=storage_id)
        if not instance.has_permission(request.user):
            raise PermissionDenied()

    # Validate the request payload on its own: the serializer is built from
    # request.data only, the existing instance is not passed to it here.
    serializer = serializer_class(data=request.data)
    serializer.is_valid(raise_exception=True)

    # if storage exists, we have to use instance from DB,
    # because instance from serializer won't have credentials, they were popped intentionally
    if instance:
        instance = serializer.update(instance, serializer.validated_data)
    else:
        instance = serializer_class.Meta.model(**serializer.validated_data)

    # double check: not all storages validate connection in serializer, just make another explicit check here
    try:
        instance.validate_connection()
    except Exception as exc:
        # Any backend failure is reported to the client as a validation error;
        # the detail stays in the log and the original exception is kept as the cause.
        logger.error('Error validating storage connection: %s', exc)
        raise ValidationError('Error validating storage connection') from exc

    return instance
|
|
|
def get_storage_list():
    """Return the descriptors of the storage backends wired up in this module.

    Each descriptor is a dict with the keys ``name``, ``title``,
    ``import_list_api`` and ``export_list_api``. A fresh list is built on
    every call, in the order: s3, gcs, azure, redis.
    """
    # (name, title, import list API, export list API)
    backends = (
        ('s3', 'AWS S3', S3ImportStorageListAPI, S3ExportStorageListAPI),
        ('gcs', 'Google Cloud Storage', GCSImportStorageListAPI, GCSExportStorageListAPI),
        ('azure', 'Microsoft Azure', AzureBlobImportStorageListAPI, AzureBlobExportStorageListAPI),
        ('redis', 'Redis', RedisImportStorageListAPI, RedisExportStorageListAPI),
    )
    return [
        {
            'name': name,
            'title': title,
            'import_list_api': import_api,
            'export_list_api': export_api,
        }
        for name, title, import_api, export_api in backends
    ]
|
|
|
def get_storage_by_url(
    url: Union[str, List, Dict], storage_objects: Iterable['ImportStorage']
) -> Union['ImportStorage', None]:
    """Find the first compatible storage and return it so it can emit a pre-signed URL.

    Args:
        url: The task value to resolve. Only ``str``, ``dict`` and ``list``
            values are handed to the storages; task data can also hold other
            types (int, float, ...) for which ``can_resolve_url`` would fail.
        storage_objects: Candidate storages, checked in iteration order. The
            iterable is traversed at most once, so one-shot iterators and
            generators are supported.

    Returns:
        The first storage whose ``can_resolve_url(url)`` is truthy, or ``None``
        when ``url`` has an unsupported type or no storage matches.
    """
    # The type check does not depend on the storage, so do it once up front
    # and skip iterating the storages entirely for unsupported values.
    if not isinstance(url, (str, dict, list)):
        return None

    for storage_object in storage_objects:
        # note: only first found storage_object will be used for link resolving;
        # per the original note, can_resolve_url is expected to check both the
        # scheme and the bucket so that the correct storage is picked
        if storage_object.can_resolve_url(url):
            return storage_object

    return None
|