Big refactoring of file storage handling
- Moved pillar.api.utils.{gcs,storage} to pillar.api.file_storage_backends
- Implemented GCS and local storage using abstract Bucket and Blob classes
- Removed file processing from the Blob class and kept it in
  file_storage/__init__.py, so that storage and processing stay separate.
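The new layout can be exercised roughly as follows. This is a minimal sketch, assuming the Bucket/Blob interface and default_storage_backend() shown in the diff below survive the move to pillar.api.file_storage_backends unchanged; the caller-side names (store_upload, fileobj) are purely illustrative.

    # Illustrative caller code, not part of this commit.
    from pillar.api.file_storage_backends import Bucket, default_storage_backend  # assumed exports

    def store_upload(project_id: str, blob_name: str, fileobj, size: int):
        # Resolve the bucket class from app.config['STORAGE_BACKEND'] ('gcs' or 'local').
        bucket = default_storage_backend(project_id)

        # ...or pick a backend explicitly via the registry:
        # bucket = Bucket.for_backend('local')(project_id)

        blob = bucket.blob(blob_name)          # factory method defined on Bucket
        blob.create_from_file(fileobj, size)   # storage only; processing lives elsewhere
        return blob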
@@ -1,279 +0,0 @@
import os
import time
import datetime
import logging

from bson import ObjectId
from gcloud.storage.client import Client
from gcloud.exceptions import NotFound
from flask import current_app, g
from werkzeug.local import LocalProxy

from pillar.api.utils.storage import Bucket, Blob

log = logging.getLogger(__name__)


def get_client():
    """Stores the GCS client on the global Flask object.

    The GCS client is not user-specific anyway.

    :rtype: Client
    """

    _gcs = getattr(g, '_gcs_client', None)
    if _gcs is None:
        _gcs = g._gcs_client = Client()
    return _gcs


# This hides the specifics of how/where we store the GCS client,
# and allows the rest of the code to use 'gcs' as a simple variable
# that does the right thing.
gcs = LocalProxy(get_client)

class GoogleCloudStorageBucket(Bucket):
    """Cloud Storage bucket interface. We create a bucket for every project. In
    the bucket we create first-level subdirs as follows:

    - '_' (will contain hashed assets, and stays on top of the default listing)
    - 'svn' (svn checkout mirror)
    - 'shared' (any additional folder of static assets that is accessed via a
      node of the 'storage' node_type)

    :type bucket_name: string
    :param bucket_name: Name of the bucket.

    :type subdir: string
    :param subdir: The local entry point to browse the bucket.
    """

    backend_name = 'gcs'

    def __init__(self, name, subdir='_/'):
        super(GoogleCloudStorageBucket, self).__init__(name=name)
        try:
            self._gcs_bucket = gcs.get_bucket(name)
        except NotFound:
            self._gcs_bucket = gcs.bucket(name)
            # Hardcode the bucket location to EU
            self._gcs_bucket.location = 'EU'
            # Optionally enable CORS from * (currently only used for vrview)
            # self.gcs_bucket.cors = [
            #     {
            #         "origin": ["*"],
            #         "responseHeader": ["Content-Type"],
            #         "method": ["GET", "HEAD", "DELETE"],
            #         "maxAgeSeconds": 3600
            #     }
            # ]
            self._gcs_bucket.create()

        self.subdir = subdir

    def blob(self, blob_name):
        return GoogleCloudStorageBlob(name=blob_name, bucket=self)

    def List(self, path=None):
        """Display the content of a subdir in the project bucket. If the path
        points to a file the listing is simply empty.

        :type path: string
        :param path: The relative path to the directory or asset.
        """
        if path and not path.endswith('/'):
            path += '/'
        prefix = os.path.join(self.subdir, path)

        fields_to_return = 'nextPageToken,items(name,size,contentType),prefixes'
        req = self._gcs_bucket.list_blobs(fields=fields_to_return, prefix=prefix,
                                          delimiter='/')

        files = []
        for f in req:
            filename = os.path.basename(f.name)
            if filename != '':  # Skip own folder name
                files.append(dict(
                    path=os.path.relpath(f.name, self.subdir),
                    text=filename,
                    type=f.content_type))

        directories = []
        for dir_path in req.prefixes:
            directory_name = os.path.basename(os.path.normpath(dir_path))
            directories.append(dict(
                text=directory_name,
                path=os.path.relpath(dir_path, self.subdir),
                type='group_storage',
                children=True))
            # print os.path.basename(os.path.normpath(path))

        list_dict = dict(
            name=os.path.basename(os.path.normpath(path)),
            type='group_storage',
            children=files + directories
        )

        return list_dict

    def blob_to_dict(self, blob):
        blob.reload()
        expiration = datetime.datetime.now() + datetime.timedelta(days=1)
        expiration = int(time.mktime(expiration.timetuple()))
        return dict(
            updated=blob.updated,
            name=os.path.basename(blob.name),
            size=blob.size,
            content_type=blob.content_type,
            signed_url=blob.generate_signed_url(expiration),
            public_url=blob.public_url)

    def Get(self, path, to_dict=True):
        """Get selected file info if the path matches.

        :type path: string
        :param path: The relative path to the file.
        :type to_dict: bool
        :param to_dict: Return the object as a dictionary.
        """
        path = os.path.join(self.subdir, path)
        blob = self._gcs_bucket.blob(path)
        if blob.exists():
            if to_dict:
                return self.blob_to_dict(blob)
            else:
                return blob
        else:
            return None

    def Post(self, full_path, path=None):
        """Create new blob and upload data to it."""
        path = path if path else os.path.join('_', os.path.basename(full_path))
        blob = self._gcs_bucket.blob(path)
        if blob.exists():
            return None
        blob.upload_from_filename(full_path)
        return blob
        # return self.blob_to_dict(blob)  # Has issues with threading

    def Delete(self, path):
        """Delete blob (when removing an asset or replacing a preview)"""

        # We want to get the actual blob to delete
        blob = self.Get(path, to_dict=False)
        try:
            blob.delete()
            return True
        except NotFound:
            return None

    def update_name(self, blob, name):
        """Set the ContentDisposition metadata so that when a file is downloaded
        it has a human-readable name.
        """
        blob.content_disposition = 'attachment; filename="{0}"'.format(name)
        blob.patch()

    def copy_blob(self, blob, to_bucket):
        """Copies the given blob from this bucket to the other bucket.

        Returns the new blob.
        """

        assert isinstance(to_bucket, GoogleCloudStorageBucket)
        return self._gcs_bucket.copy_blob(blob, to_bucket._gcs_bucket)

    def get_blob(self, internal_fname, chunk_size=256 * 1024 * 2):
        return self._gcs_bucket.blob('_/' + internal_fname, chunk_size)

class GoogleCloudStorageBlob(Blob):
    """GCS blob interface."""

    def __init__(self, name, bucket):
        super(GoogleCloudStorageBlob, self).__init__(name, bucket)

        self.blob = bucket._gcs_bucket.blob('_/' + name, chunk_size=256 * 1024 * 2)

    def create_from_file(self, uploaded_file, file_size):
        raise NotImplementedError()

    def _process_image(self, file_doc):
        raise NotImplementedError()

    def _process_video(self, file_doc):
        raise NotImplementedError()

def update_file_name(node):
    """Assign to the GCS blob the same name as the asset node. This way when
    downloading an asset we get a human-readable name.
    """

    # Process only files that are not processing
    if node['properties'].get('status', '') == 'processing':
        return

    def _format_name(name, override_ext, size=None, map_type=''):
        root, _ = os.path.splitext(name)
        size = '-{}'.format(size) if size else ''
        map_type = '-{}'.format(map_type) if map_type else ''
        return '{}{}{}{}'.format(root, size, map_type, override_ext)

    def _update_name(file_id, file_props):
        files_collection = current_app.data.driver.db['files']
        file_doc = files_collection.find_one({'_id': ObjectId(file_id)})

        if file_doc is None or file_doc.get('backend') != 'gcs':
            return

        # For textures -- the map type should be part of the name.
        map_type = file_props.get('map_type', '')

        storage = GoogleCloudStorageBucket(str(node['project']))
        blob = storage.Get(file_doc['file_path'], to_dict=False)
        if blob is None:
            log.warning('Unable to find blob for file %s in project %s',
                        file_doc['file_path'], file_doc['project'])
            return

        # Pick file extension from original filename
        _, ext = os.path.splitext(file_doc['filename'])
        name = _format_name(node['name'], ext, map_type=map_type)
        storage.update_name(blob, name)

        # Assign the same name to variations
        for v in file_doc.get('variations', []):
            _, override_ext = os.path.splitext(v['file_path'])
            name = _format_name(node['name'], override_ext, v['size'], map_type=map_type)
            blob = storage.Get(v['file_path'], to_dict=False)
            if blob is None:
                log.info('Unable to find blob for file %s in project %s. This can happen if the '
                         'video encoding is still processing.', v['file_path'], node['project'])
                continue
            storage.update_name(blob, name)

    # Currently we search for 'file' and 'files' keys in the object properties.
    # This could become a bit more flexible and rely on a proper reference to the
    # file object type from the schema.
    if 'file' in node['properties']:
        _update_name(node['properties']['file'], {})

    if 'files' in node['properties']:
        for file_props in node['properties']['files']:
            _update_name(file_props['file'], file_props)


def copy_to_bucket(file_path, src_project_id, dest_project_id):
    """Copies a file from one bucket to the other."""

    log.info('Copying %s from project bucket %s to %s',
             file_path, src_project_id, dest_project_id)

    src_storage = GoogleCloudStorageBucket(str(src_project_id))
    dest_storage = GoogleCloudStorageBucket(str(dest_project_id))

    blob = src_storage.Get(file_path, to_dict=False)
    src_storage.copy_blob(blob, dest_storage)
@@ -5,6 +5,7 @@ from PIL import Image
from flask import current_app


# TODO: refactor to use pathlib.Path and f-strings.
def generate_local_thumbnails(name_base, src):
    """Given a source image, use Pillow to generate thumbnails according to the
    application settings.
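A hedged sketch of how this helper is consumed by the storage code later in this commit (see LocalBlob._process_image below); the file names are made up, and the listed dict keys are the ones that caller relies on.

    # Illustrative only, not part of this commit.
    from pillar.api.utils.imaging import generate_local_thumbnails

    variations = generate_local_thumbnails('my-image.jpg', '/tmp/src.jpg')
    for v in variations:
        # Each variation carries at least 'file_path', 'local_path' and 'length';
        # the caller uploads the local file to storage and then deletes 'local_path'.
        print(v['file_path'], v['length'])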
@@ -1,266 +1 @@
"""Utility for managing storage backends and files."""

import abc
import logging
import os
import shutil
import typing

from bson import ObjectId
from flask import current_app

from pillar.api import utils
from pillar.api.utils.authorization import user_has_role
from pillar.api.utils.imaging import generate_local_thumbnails

log = logging.getLogger(__name__)


class Bucket(metaclass=abc.ABCMeta):
    """Can be a GCS bucket or simply a project folder in Pillar.

    :type name: string
    :param name: Name of the bucket. As a convention, we use the ID of
        the project to name the bucket.
    """

    # Mapping from backend name to Bucket class
    backends = {}

    backend_name: str = None  # define in subclass.

    def __init__(self, name):
        self.name = name

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        assert cls.backend_name, '%s.backend_name must be non-empty string' % cls
        cls.backends[cls.backend_name] = cls

    @classmethod
    def for_backend(cls, backend_name: str) -> type:
        """Returns the Bucket subclass for the given backend."""
        return cls.backends[backend_name]

    @abc.abstractmethod
    def blob(self, blob_name) -> 'Blob':
        """Factory constructor for blob object.

        :type blob_name: string
        :param blob_name: The name of the blob to be instantiated.
        """
        return Blob(name=blob_name, bucket=self)

    @abc.abstractmethod
    def get_blob(self, blob_name) -> typing.Optional['Blob']:
        """Get a blob object by name.

        If the blob exists return the object, otherwise None.
        """
        pass

class Blob(metaclass=abc.ABCMeta):
    """A wrapper for file or blob objects.

    :type name: string
    :param name: Name of the blob.
    """

    def __init__(self, name: str, bucket: Bucket):
        self.name = name
        self.bucket = bucket
        self._size_in_bytes = None

    @property
    def size(self) -> typing.Optional[int]:
        """Size of the object, in bytes.

        :returns: The size of the blob or ``None`` if the property
            is not set locally.
        """
        size = self._size_in_bytes
        if size is not None:
            return int(size)
        return self._size_in_bytes

    @abc.abstractmethod
    def create_from_file(self, uploaded_file, file_size):
        pass

    @abc.abstractmethod
    def _process_image(self, file_doc):
        pass

    @abc.abstractmethod
    def _process_video(self, file_doc):
        pass

    # TODO Sybren: change file_id type to ObjectId?
    def process_file(self, file_id: str):
        """Generate image thumbnails or encode video.

        :type file_id: string
        :param file_id: The document ID for the file processed. We need it to
            update the document as we process the file.
        """

        def update_file_doc(file_id, **updates):
            res = files.update_one({'_id': ObjectId(file_id)},
                                   {'$set': updates})

            log.debug('update_file_doc(%s, %s): %i matched, %i updated.',
                      file_id, updates, res.matched_count, res.modified_count)
            return res

        file_id = ObjectId(file_id)

        # Fetch the src_file document from MongoDB.
        files = current_app.data.driver.db['files']
        src_file = files.find_one(file_id)
        if not src_file:
            log.warning(
                'process_file(%s): no such file document found, ignoring.', file_id)
            return

        # Update the 'format' field from the content type.
        # TODO: overrule the content type based on file extension & magic numbers.
        mime_category, src_file['format'] = src_file['content_type'].split('/', 1)

        # Prevent video handling for non-admins.
        if not user_has_role('admin') and mime_category == 'video':
            if src_file['format'].startswith('x-'):
                xified = src_file['format']
            else:
                xified = 'x-' + src_file['format']

            src_file['content_type'] = 'application/%s' % xified
            mime_category = 'application'
            log.info('Not processing video file %s for non-admin user', file_id)

        # Run the required processor, based on the MIME category.
        processors = {
            'image': self._process_image,
            'video': self._process_video,
        }

        try:
            processor = processors[mime_category]
        except KeyError:
            log.info("POSTed file %s was of type %r, which isn't "
                     "thumbnailed/encoded.", file_id,
                     mime_category)
            src_file['status'] = 'complete'
        else:
            log.debug('process_file(%s): marking file status as "processing"',
                      file_id)
            src_file['status'] = 'processing'
            update_file_doc(file_id, status='processing')

            try:
                processor(src_file)
            except Exception:
                log.warning('process_file(%s): error when processing file, '
                            'resetting status to '
                            '"queued_for_processing"', file_id, exc_info=True)
                update_file_doc(file_id, status='queued_for_processing')
                return

        src_file = utils.remove_private_keys(src_file)
        # Update the original file with additional info, e.g. image resolution
        r, _, _, status = current_app.put_internal('files', src_file,
                                                   _id=file_id)
        if status not in (200, 201):
            log.warning(
                'process_file(%s): status %i when saving processed file '
                'info to MongoDB: %s',
                file_id, status, r)

class LocalBucket(Bucket):
    backend_name = 'local'

    def blob(self, blob_name: str) -> 'LocalBlob':
        return LocalBlob(name=blob_name, bucket=self)

    def get_blob(self, blob_name: str) -> typing.Optional['LocalBlob']:
        # TODO: Check if file exists, otherwise None
        return self.blob(blob_name)


class LocalBlob(Blob):
    def __init__(self, name: str, bucket: LocalBucket):
        super().__init__(name=name, bucket=bucket)

        bucket_name = bucket.name
        self.partial_path = os.path.join(bucket_name[:2], bucket_name,
                                         name[:2], name)
        self.path = os.path.join(
            current_app.config['STORAGE_DIR'], self.partial_path)

    def create_from_file(self, uploaded_file: typing.io.BinaryIO, file_size: int):
        assert hasattr(uploaded_file, 'read')

        # Ensure path exists before saving
        directory = os.path.dirname(self.path)
        if not os.path.exists(directory):
            os.makedirs(directory)

        with open(self.path, 'wb') as outfile:
            shutil.copyfileobj(uploaded_file, outfile)

        self._size_in_bytes = file_size

    def _process_image(self, file_doc: dict):
        from PIL import Image

        im = Image.open(self.path)
        res = im.size
        file_doc['width'] = res[0]
        file_doc['height'] = res[1]

        # Generate previews
        log.info('Generating thumbnails for file %s', file_doc['_id'])
        file_doc['variations'] = generate_local_thumbnails(file_doc['name'],
                                                           self.path)

        # Send those previews to Google Cloud Storage.
        log.info('Uploading %i thumbnails for file %s to Google Cloud Storage '
                 '(GCS)', len(file_doc['variations']), file_doc['_id'])

        # TODO: parallelize this at some point.
        for variation in file_doc['variations']:
            fname = variation['file_path']
            if current_app.config['TESTING']:
                log.warning(' - NOT making thumbnail %s', fname)
            else:
                log.debug(' - Sending thumbnail %s to %s', fname, self.bucket)

                blob = self.bucket.blob(fname)
                with open(variation['local_path'], 'rb') as local_file:
                    blob.create_from_file(local_file, variation['length'])

            try:
                os.unlink(variation['local_path'])
            except OSError:
                log.warning(
                    'Unable to unlink %s, ignoring this but it will need '
                    'cleanup later.', variation['local_path'])

            del variation['local_path']

        log.info('Done processing file %s', file_doc['_id'])
        file_doc['status'] = 'complete'

    def _process_video(self, file_doc):
        pass


def default_storage_backend(name):
    from flask import current_app

    backend_name = current_app.config['STORAGE_BACKEND']
    backend_cls = Bucket.for_backend(backend_name)
    return backend_cls(name)
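To make the backend registry explicit: any concrete Bucket subclass registers itself via __init_subclass__ under its backend_name, which is what Bucket.for_backend() and default_storage_backend() look up. A minimal sketch, where DummyBucket is purely hypothetical and the import path assumes the module's new location.

    # Illustrative only, not part of this commit.
    import typing

    from pillar.api.file_storage_backends import Bucket, Blob  # assumed new location

    class DummyBucket(Bucket):
        backend_name = 'dummy'  # __init_subclass__ adds this key to Bucket.backends

        def blob(self, blob_name) -> 'Blob':
            raise NotImplementedError()

        def get_blob(self, blob_name) -> typing.Optional['Blob']:
            return None

    assert Bucket.for_backend('dummy') is DummyBucket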