Bin
2025-12-17 d616898802dfe7e5dd648bcf53c6d1f86b6d3642
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import logging
from typing import Dict, Iterable, List, Union
 
from django.shortcuts import get_object_or_404
from io_storages.base_models import ImportStorage
from rest_framework.exceptions import PermissionDenied, ValidationError
 
from .azure_blob.api import AzureBlobExportStorageListAPI, AzureBlobImportStorageListAPI
from .gcs.api import GCSExportStorageListAPI, GCSImportStorageListAPI
from .redis.api import RedisExportStorageListAPI, RedisImportStorageListAPI
from .s3.api import S3ExportStorageListAPI, S3ImportStorageListAPI
 
logger = logging.getLogger(__name__)
 
 
def validate_storage_instance(request, serializer_class):
    """
    Preload and prepare a storage instance from request data.
 
    This function handles the common logic for loading existing storage instances
    or creating new ones from request data, including permission checks and
    serializer validation.
 
    Args:
        request: The HTTP request containing storage data
        serializer_class: The serializer class to use for validation
 
    Returns:
        The prepared storage instance
 
    Raises:
        PermissionDenied: If user doesn't have permission to access the storage
        ValidationError: If serializer validation fails
    """
    if not serializer_class or not hasattr(serializer_class, 'Meta'):
        raise ValidationError('Invalid or missing serializer class')
 
    storage_id = request.data.get('id')
    instance = None
 
    if storage_id:
        instance = get_object_or_404(serializer_class.Meta.model.objects.all(), pk=storage_id)
        if not instance.has_permission(request.user):
            raise PermissionDenied()
 
    # combine instance fields with request.data
    serializer = serializer_class(data=request.data)
    serializer.is_valid(raise_exception=True)
 
    # if storage exists, we have to use instance from DB,
    # because instance from serializer won't have credentials, they were popped intentionally
    if instance:
        instance = serializer.update(instance, serializer.validated_data)
    else:
        instance = serializer_class.Meta.model(**serializer.validated_data)
 
    # double check: not all storages validate connection in serializer, just make another explicit check here
    try:
        instance.validate_connection()
    except Exception as exc:
        logger.error(f'Error validating storage connection: {exc}')
        raise ValidationError('Error validating storage connection')
 
    return instance
 
 
def get_storage_list():
    return [
        {
            'name': 's3',
            'title': 'AWS S3',
            'import_list_api': S3ImportStorageListAPI,
            'export_list_api': S3ExportStorageListAPI,
        },
        {
            'name': 'gcs',
            'title': 'Google Cloud Storage',
            'import_list_api': GCSImportStorageListAPI,
            'export_list_api': GCSExportStorageListAPI,
        },
        {
            'name': 'azure',
            'title': 'Microsoft Azure',
            'import_list_api': AzureBlobImportStorageListAPI,
            'export_list_api': AzureBlobExportStorageListAPI,
        },
        {
            'name': 'redis',
            'title': 'Redis',
            'import_list_api': RedisImportStorageListAPI,
            'export_list_api': RedisExportStorageListAPI,
        },
    ]
 
 
def get_storage_by_url(url: Union[str, List, Dict], storage_objects: Iterable[ImportStorage]) -> ImportStorage:
    """Find the first compatible storage and returns storage that can emit pre-signed URL"""
 
    for storage_object in storage_objects:
        # check url is string because task can have int, float, dict, list
        # and 'can_resolve_url' will fail
        if isinstance(url, str) and storage_object.can_resolve_url(url):
            return storage_object
 
    # url is list or dict
    if isinstance(url, dict) or isinstance(url, list):
        for storage_object in storage_objects:
            if storage_object.can_resolve_url(url):
                # note: only first found storage_object will be used for link resolving
                # can_resolve_url now checks both the scheme and the bucket to ensure the correct storage is used
                return storage_object