"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
|
"""
|
import logging
|
import os
|
import threading
|
from urllib.parse import unquote, urlsplit, urlunsplit
|
|
import google.auth
|
from django.conf import settings
|
from django.contrib.staticfiles.storage import ManifestStaticFilesStorage
|
from storages.backends.azure_storage import AzureStorage
|
from storages.backends.gcloud import GoogleCloudStorage, _quote, clean_name
|
from storages.backends.s3boto3 import S3Boto3Storage
|
|
logger = logging.getLogger(__name__)
|
|
|
class SkipMissedManifestStaticFilesStorage(ManifestStaticFilesStorage):
|
"""We need this class to escape missing files from
|
django.contrib.staticfiles.finders.FileSystemFinder:
|
this class tries to find js/css/png/jpg/... inside of you js/css/...
|
"""
|
|
# Disable strict cache manifest checking
|
manifest_strict = False
|
|
def hashed_name(self, name, content=None, filename=None):
|
# `filename` is the name of file to hash if `content` isn't given.
|
# `name` is the base name to construct the new hashed filename from.
|
parsed_name = urlsplit(unquote(name))
|
clean_name = parsed_name.path.strip()
|
filename = (filename and urlsplit(unquote(filename)).path.strip()) or clean_name
|
opened = content is None
|
if opened:
|
if not self.exists(filename):
|
return ''
|
try:
|
content = self.open(filename)
|
except IOError:
|
# Handle directory paths and fragments
|
return name
|
try:
|
file_hash = self.file_hash(clean_name, content)
|
finally:
|
if opened:
|
content.close()
|
path, filename = os.path.split(clean_name)
|
root, ext = os.path.splitext(filename)
|
if file_hash is not None:
|
file_hash = '.%s' % file_hash
|
hashed_name = os.path.join(path, '%s%s%s' % (root, file_hash, ext))
|
unparsed_name = list(parsed_name)
|
unparsed_name[2] = hashed_name
|
# Special casing for a @font-face hack, like url(myfont.eot?#iefix")
|
# http://www.fontspring.com/blog/the-new-bulletproof-font-face-syntax
|
if '?#' in name and not unparsed_name[3]:
|
unparsed_name[2] += '?'
|
return urlunsplit(unparsed_name)
|
|
|
class StorageProxyMixin:
|
def url(self, name, storage_url=False, *args, **kwargs):
|
if storage_url is True:
|
return super().url(name, *args, **kwargs)
|
return f'{settings.HOSTNAME}/storage-data/uploaded/?filepath={name}'
|
|
|
class CustomS3Boto3Storage(StorageProxyMixin, S3Boto3Storage):
|
pass
|
|
|
class CustomAzureStorage(StorageProxyMixin, AzureStorage):
|
pass
|
|
|
class AlternativeGoogleCloudStorageBase(GoogleCloudStorage):
|
"""A subclass to force the use of the IAM signBlob API
|
This allows the signing of blob URLs without having to use a credential file.
|
The service account must have the iam.serviceAccounts.signBlob permission."""
|
|
def __init__(self, **settings):
|
super().__init__(**settings)
|
self._signing_credentials = None
|
self._signing_credentials_lock = threading.Lock()
|
|
def url(self, name):
|
"""
|
Return public url or a signed url for the Blob.
|
This DOES NOT check for existence of Blob - that makes codes too slow
|
for many use cases.
|
Overridden to force the use of the IAM signBlob API.
|
See https://github.com/googleapis/python-storage/blob/519074112775c19742522158f612b467cf590219/google/cloud/storage/_signing.py#L628 # NOQA
|
"""
|
name = self._normalize_name(clean_name(name))
|
blob = self.bucket.blob(name)
|
blob_params = self.get_object_parameters(name)
|
no_signed_url = blob_params.get('acl', self.default_acl) == 'publicRead' or not self.querystring_auth
|
|
if not self.custom_endpoint and no_signed_url:
|
return blob.public_url
|
elif no_signed_url:
|
out = '{storage_base_url}/{quoted_name}'.format(
|
storage_base_url=self.custom_endpoint,
|
quoted_name=_quote(name, safe=b'/~'),
|
)
|
return out
|
elif not self.custom_endpoint:
|
out2 = blob.generate_signed_url(expiration=self.expiration, version='v4', **self._get_signing_kwargs())
|
return out2
|
else:
|
out3 = blob.generate_signed_url(
|
bucket_bound_hostname=self.custom_endpoint,
|
expiration=self.expiration,
|
version='v4',
|
**self._get_signing_kwargs(),
|
)
|
return out3
|
|
def _get_signing_credentials(self):
|
with self._signing_credentials_lock:
|
if self._signing_credentials is None or self._signing_credentials.expired:
|
credentials, _ = google.auth.default(['https://www.googleapis.com/auth/cloud-platform'])
|
auth_req = google.auth.transport.requests.Request()
|
credentials.refresh(auth_req)
|
self._signing_credentials = credentials
|
return self._signing_credentials
|
|
def _get_signing_kwargs(self):
|
credentials = self._get_signing_credentials()
|
out = {
|
'service_account_email': credentials.service_account_email,
|
'access_token': credentials.token,
|
'credentials': credentials,
|
}
|
return out
|
|
|
class AlternativeGoogleCloudStorage(StorageProxyMixin, AlternativeGoogleCloudStorageBase):
|
pass
|