Bin
2025-12-17 262fecaa75b2909ad244f12c3b079ed3ff4ae329
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import logging
import os
import threading
from urllib.parse import unquote, urlsplit, urlunsplit
 
import google.auth
from django.conf import settings
from django.contrib.staticfiles.storage import ManifestStaticFilesStorage
from storages.backends.azure_storage import AzureStorage
from storages.backends.gcloud import GoogleCloudStorage, _quote, clean_name
from storages.backends.s3boto3 import S3Boto3Storage
 
logger = logging.getLogger(__name__)
 
 
class SkipMissedManifestStaticFilesStorage(ManifestStaticFilesStorage):
    """Manifest static files storage that skips files it cannot find.

    We need this class to escape missing files from
    django.contrib.staticfiles.finders.FileSystemFinder:
    the parent class tries to find js/css/png/jpg/... referenced inside of
    your js/css/... files, and some of those references may not exist.
    """

    # Disable strict cache manifest checking
    manifest_strict = False

    def hashed_name(self, name, content=None, filename=None):
        """Return ``name`` with a content hash inserted before its extension.

        Args:
            name: Base name used to construct the new hashed filename; any
                query string or fragment it carries is kept in the result.
            content: File content to hash. When ``None`` the file is opened
                from this storage (and closed again) by this method.
            filename: Name of the file to hash if ``content`` isn't given.
                Defaults to the path component of ``name``.

        Returns:
            The hashed name; ``''`` when ``content`` is not given and the
            file does not exist in this storage; ``name`` unchanged when the
            file exists but cannot be opened.
        """
        parsed_name = urlsplit(unquote(name))
        # Named `name_path` so it does not shadow the module-level
        # `clean_name` helper imported from storages.backends.gcloud.
        name_path = parsed_name.path.strip()
        filename = (filename and urlsplit(unquote(filename)).path.strip()) or name_path
        opened = content is None
        if opened:
            # Missing file: skip it instead of failing.
            if not self.exists(filename):
                return ''
            try:
                content = self.open(filename)
            except IOError:
                # Handle directory paths and fragments
                return name
        try:
            file_hash = self.file_hash(name_path, content)
        finally:
            # Only close what this method opened itself.
            if opened:
                content.close()
        path, filename = os.path.split(name_path)
        root, ext = os.path.splitext(filename)
        # A missing/empty hash contributes nothing to the name; formatting
        # `None` directly would produce names such as "appNone.css".
        hash_suffix = '.%s' % file_hash if file_hash else ''
        hashed_name = os.path.join(path, '%s%s%s' % (root, hash_suffix, ext))
        unparsed_name = list(parsed_name)
        unparsed_name[2] = hashed_name
        # Special casing for a @font-face hack, like url(myfont.eot?#iefix")
        # http://www.fontspring.com/blog/the-new-bulletproof-font-face-syntax
        if '?#' in name and not unparsed_name[3]:
            unparsed_name[2] += '?'
        return urlunsplit(unparsed_name)
 
 
class StorageProxyMixin:
    """Mixin that makes a storage backend return proxy URLs by default."""

    def url(self, name, storage_url=False, *args, **kwargs):
        """Return a URL for the stored file ``name``.

        Only an explicit ``storage_url=True`` delegates to the next ``url()``
        implementation in the MRO, forwarding any extra arguments. In every
        other case the result points at the ``/storage-data/uploaded/``
        endpoint under ``settings.HOSTNAME`` with ``name`` as ``filepath``.
        """
        if storage_url is not True:
            return f'{settings.HOSTNAME}/storage-data/uploaded/?filepath={name}'
        return super().url(name, *args, **kwargs)
 
 
class CustomS3Boto3Storage(StorageProxyMixin, S3Boto3Storage):
    """``S3Boto3Storage`` whose ``url()`` is provided by ``StorageProxyMixin``."""
 
 
class CustomAzureStorage(StorageProxyMixin, AzureStorage):
    """``AzureStorage`` whose ``url()`` is provided by ``StorageProxyMixin``."""
 
 
class AlternativeGoogleCloudStorageBase(GoogleCloudStorage):
    """A subclass to force the use of the IAM signBlob API.

    This allows the signing of blob URLs without having to use a credential file.
    The service account must have the iam.serviceAccounts.signBlob permission.
    """

    def __init__(self, **settings):
        """Initialise the parent storage and an empty signing-credentials cache."""
        super().__init__(**settings)
        # Credentials used for URL signing; fetched on first use and
        # replaced once they report themselves as expired.
        self._signing_credentials = None
        # Serialises the fetch/refresh of the cached credentials.
        self._signing_credentials_lock = threading.Lock()

    def url(self, name):
        """
        Return public url or a signed url for the Blob.
        This DOES NOT check for existence of Blob - that makes codes too slow
        for many use cases.
        Overridden to force the use of the IAM signBlob API.
        See https://github.com/googleapis/python-storage/blob/519074112775c19742522158f612b467cf590219/google/cloud/storage/_signing.py#L628  # NOQA
        """
        name = self._normalize_name(clean_name(name))
        blob = self.bucket.blob(name)
        blob_params = self.get_object_parameters(name)
        no_signed_url = blob_params.get('acl', self.default_acl) == 'publicRead' or not self.querystring_auth

        if no_signed_url:
            if not self.custom_endpoint:
                return blob.public_url
            # Unsigned URL served from the custom endpoint.
            return '{storage_base_url}/{quoted_name}'.format(
                storage_base_url=self.custom_endpoint,
                quoted_name=_quote(name, safe=b'/~'),
            )

        # Signed URL: same v4 signing call in both cases; the custom endpoint,
        # when configured, is passed as the bucket-bound hostname.
        signing_kwargs = self._get_signing_kwargs()
        if self.custom_endpoint:
            signing_kwargs['bucket_bound_hostname'] = self.custom_endpoint
        return blob.generate_signed_url(expiration=self.expiration, version='v4', **signing_kwargs)

    def _get_signing_credentials(self):
        """Return cached default credentials, fetching and refreshing them when absent or expired."""
        # Imported explicitly: `import google.auth` alone does not load the
        # `google.auth.transport.requests` submodule, so attribute access on
        # it only works if some other package happened to import it first.
        from google.auth.transport.requests import Request

        with self._signing_credentials_lock:
            if self._signing_credentials is None or self._signing_credentials.expired:
                credentials, _ = google.auth.default(['https://www.googleapis.com/auth/cloud-platform'])
                credentials.refresh(Request())
                self._signing_credentials = credentials
            return self._signing_credentials

    def _get_signing_kwargs(self):
        """Build the credential-related keyword arguments passed to ``generate_signed_url``."""
        credentials = self._get_signing_credentials()
        return {
            'service_account_email': credentials.service_account_email,
            'access_token': credentials.token,
            'credentials': credentials,
        }
 
 
class AlternativeGoogleCloudStorage(StorageProxyMixin, AlternativeGoogleCloudStorageBase):
    """``AlternativeGoogleCloudStorageBase`` with ``url()`` routed through ``StorageProxyMixin``."""