170 lines
5.3 KiB
Python
170 lines
5.3 KiB
Python
"""Custom file storage classes."""
|
|
import logging
|
|
|
|
from django.conf import settings
|
|
from django.db import models
|
|
from django.db.models.fields.files import FieldFile
|
|
|
|
from botocore.client import Config
|
|
import boto3
|
|
import botocore.exceptions
|
|
|
|
import nginx_secure_links.storages
|
|
|
|
from storages.backends.s3boto3 import S3Boto3Storage
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
# Shared boto3 S3 client. Starts unset; the S3 helper functions in this module
# assign it the first time one of them needs a client.
_s3_client = None
|
|
|
|
|
|
class S3PublicStorage(S3Boto3Storage):
    """S3 storage whose generated URLs are not signed.

    Meant for files that are publicly readable, such as thumbnails, and therefore
    do not need signed URLs.
    """

    # Turn off query-string signing for URLs built by this storage.
    querystring_auth = False
|
|
|
|
|
|
class S3Boto3CustomStorage(S3Boto3Storage):
    """Override some upload parameters, such as ContentDisposition header."""

    def _get_write_parameters(self, name, content):
        """Set ContentDisposition header using original file name.

        While docstring recommends overriding `get_object_parameters` for this purpose,
        `get_object_parameters` only gets a `name` which is not the original file name,
        but the result of `upload_to`.

        :param name: name the file is being stored under
        :param content: object being uploaded; its `name` attribute, when present and
            non-empty, is treated as the original file name
        :return: the parent's write parameters, with `ContentDisposition` added when the
            original file name is known and differs from `name`
        """
        params = super()._get_write_parameters(name, content)
        original_name = getattr(content, 'name', None)
        if original_name and name != original_name:
            # The file name is placed inside a quoted-string, so backslashes and double
            # quotes must be escaped; otherwise a name containing `"` terminates the
            # value early and yields a malformed header.
            escaped_name = str(original_name).replace('\\', '\\\\').replace('"', '\\"')
            params['ContentDisposition'] = f'attachment; filename="{escaped_name}"'
        return params
|
|
|
|
|
|
def _get_s3_client():
    """Build a boto3 S3 client from the AWS credentials and region in Django settings.

    The client is configured with signature version `s3v4`.
    """
    client_config = Config(signature_version='s3v4')
    client_kwargs = {
        'config': client_config,
        'aws_access_key_id': settings.AWS_ACCESS_KEY_ID,
        'aws_secret_access_key': settings.AWS_SECRET_ACCESS_KEY,
        'region_name': settings.AWS_S3_REGION_NAME,
    }
    return boto3.client('s3', **client_kwargs)
|
|
|
|
|
|
def get_s3_url(path, expires_in_seconds=settings.FILE_LINK_EXPIRE_SECONDS):
    """Return a pre-signed GET URL for `path` in the configured storage bucket.

    :param path: key of the object inside `settings.AWS_STORAGE_BUCKET_NAME`
    :param expires_in_seconds: value passed as `ExpiresIn` when signing; defaults to
        `settings.FILE_LINK_EXPIRE_SECONDS` as read when this module is imported
    :return: the result of the client's `generate_presigned_url` call
    """
    global _s3_client
    # Create the shared client the first time it is needed.
    _s3_client = _s3_client or _get_s3_client()

    object_params = {'Bucket': settings.AWS_STORAGE_BUCKET_NAME, 'Key': path}
    return _s3_client.generate_presigned_url(
        'get_object',
        Params=object_params,
        HttpMethod='GET',
        ExpiresIn=expires_in_seconds,
    )
|
|
|
|
|
|
def file_exists(path, bucket=settings.AWS_STORAGE_BUCKET_NAME):
    """Check if given file exists in S3.

    :param path: key of the object to look up
    :param bucket: bucket to look in; defaults to `settings.AWS_STORAGE_BUCKET_NAME` as
        read when this module is imported
    :return: `True` when the HEAD request succeeds, `False` when S3 reports the object
        as not found. Any other client error (for example a 403) still yields `True`,
        which is how this function has always treated non-404 errors.
    """
    global _s3_client
    if not _s3_client:
        _s3_client = _get_s3_client()

    try:
        _s3_client.head_object(Bucket=bucket, Key=path)
    except botocore.exceptions.ClientError as e:
        # Error codes are not guaranteed to be numeric (e.g. 'NoSuchKey', 'AccessDenied'),
        # so compare them as strings rather than through int(), which raised ValueError
        # on such codes. The HTTP status is checked as well in case the code is absent.
        error_code = str(e.response.get('Error', {}).get('Code', ''))
        http_status = e.response.get('ResponseMetadata', {}).get('HTTPStatusCode')
        is_missing = error_code in ('404', 'NoSuchKey', 'NotFound') or http_status == 404
        return not is_missing
    return True
|
|
|
|
|
|
def file_head(path, bucket=settings.AWS_STORAGE_BUCKET_NAME) -> object:
    """Return the S3 `head_object` response for the given file.

    :param path: key of the object to look up
    :param bucket: bucket to look in; defaults to `settings.AWS_STORAGE_BUCKET_NAME` as
        read when this module is imported
    :return: the `head_object` response, or an empty dict when the request fails with a
        client error (the error is logged with its traceback)
    """
    global _s3_client
    # Create the shared client the first time it is needed.
    _s3_client = _s3_client or _get_s3_client()

    try:
        head = _s3_client.head_object(Bucket=bucket, Key=path)
    except botocore.exceptions.ClientError:
        logger.exception(f'Unable to get header for {path}')
        return {}
    return head
|
|
|
|
|
|
def get_s3_post_url_and_fields(
    path,
    bucket=settings.AWS_STORAGE_BUCKET_NAME,
    fields=None,
    conditions=None,
    expires_in_seconds=settings.FILE_LINK_EXPIRE_SECONDS,
):
    """Generate a presigned URL S3 POST request to upload a file to a given bucket and path.

    :param path: string (converted with `str()` before signing)
    :param bucket: string
    :param fields: Dictionary of prefilled form fields
    :param conditions: List of conditions to include in the policy
    :param expires_in_seconds: Time in seconds for the presigned URL to remain valid
    :return: Dictionary with the following keys:
        url: URL to post to
        fields: Dictionary of form fields and values to submit with the POST
    :return: None if error.
    """
    global _s3_client
    # Reuse the shared, fully configured client. It must not be replaced here with a bare
    # `boto3.client('s3')`: that would drop the signature version, credentials and region
    # taken from settings, for this call and for every later user of the shared client.
    if not _s3_client:
        _s3_client = _get_s3_client()

    try:
        response = _s3_client.generate_presigned_post(
            bucket, str(path), Fields=fields, Conditions=conditions, ExpiresIn=expires_in_seconds,
        )
    except botocore.exceptions.ClientError as e:
        logger.error(e)
        return None

    # The response contains the presigned URL and required fields
    return response
|
|
|
|
|
|
# Storage backends a file can live in, created once when this module is imported.
# 's3' is used when a model's `source_storage` is None, 'fs' when it equals 'fs'.
_storages = {
    's3': S3Boto3CustomStorage(),
    'fs': nginx_secure_links.storages.FileStorage(),
}
|
|
|
|
|
|
class DynamicStorageFieldFile(FieldFile):
    """Defines which storage the file is located at."""

    def __init__(self, instance, *args, **kwargs):
        """Choose between S3 and file system storage depending on `source_storage`.

        :param instance: model instance owning the file; its `source_storage` attribute
            is `None` for S3 (the default) or `'fs'` for file system storage
        :raises ValueError: if `source_storage` holds any other value
        """
        super().__init__(instance, *args, **kwargs)
        source_storage = instance.source_storage
        if source_storage is None:  # S3 is default
            self.storage = _storages['s3']
        elif source_storage == 'fs':
            self.storage = _storages['fs']
        else:
            # A bare `raise` has no active exception to re-raise at this point and only
            # produced "RuntimeError: No active exception to reraise"; report the
            # offending value explicitly instead.
            raise ValueError(f'Unsupported source_storage: {source_storage!r}')
|
|
|
|
|
|
class CustomFileField(models.FileField):
    """Defines which storage the file field is located at."""

    # File wrapper class that picks its storage per model instance.
    attr_class = DynamicStorageFieldFile

    def pre_save(self, model_instance, add):
        """Choose between S3 and file system storage depending on `source_storage`.

        :param model_instance: instance being saved; its `source_storage` attribute is
            `None` for S3 (the default) or `'fs'` for file system storage
        :param add: passed through unchanged to the parent `pre_save`
        :return: the result of the parent `pre_save`
        :raises ValueError: if `source_storage` holds any other value
        """
        source_storage = model_instance.source_storage
        if source_storage is None:
            storage = _storages['s3']
        elif source_storage == 'fs':
            storage = _storages['fs']
        else:
            # A bare `raise` has no active exception to re-raise at this point and only
            # produced "RuntimeError: No active exception to reraise"; report the
            # offending value explicitly instead.
            raise ValueError(f'Unsupported source_storage: {source_storage!r}')
        # The chosen storage is set both on the field itself and on the instance's
        # `source` file before the parent saves it.
        self.storage = storage
        model_instance.source.storage = storage
        # TODO: do the same for thumbnail?
        return super().pre_save(model_instance, add)
|