diff --git a/pillar/api/encoding.py b/pillar/api/encoding.py index e10cc40f..ae161b09 100644 --- a/pillar/api/encoding.py +++ b/pillar/api/encoding.py @@ -1,14 +1,15 @@ -import logging - import datetime +import logging import os + from bson import ObjectId, tz_util from flask import Blueprint from flask import abort from flask import current_app from flask import request + from pillar.api import utils -from pillar.api.utils.gcs import GoogleCloudStorageBucket +from pillar.api.file_storage_backends.gcs import GoogleCloudStorageBucket from pillar.api.utils import skip_when_testing encoding = Blueprint('encoding', __name__) diff --git a/pillar/api/file_storage/__init__.py b/pillar/api/file_storage/__init__.py index 637963fb..fc8ec449 100644 --- a/pillar/api/file_storage/__init__.py +++ b/pillar/api/file_storage/__init__.py @@ -3,7 +3,9 @@ import io import logging import mimetypes import os +import pathlib import tempfile +import typing import uuid from hashlib import md5 @@ -12,7 +14,7 @@ import eve.utils import pymongo import werkzeug.exceptions as wz_exceptions from bson import ObjectId -from flask import Blueprint, current_app +from flask import Blueprint from flask import current_app from flask import g from flask import jsonify @@ -21,15 +23,15 @@ from flask import send_from_directory from flask import url_for, helpers from pillar.api import utils +from pillar.api.file_storage_backends.gcs import GoogleCloudStorageBucket, \ + GoogleCloudStorageBlob from pillar.api.utils import remove_private_keys, authentication from pillar.api.utils.authorization import require_login, user_has_role, \ user_matches_roles from pillar.api.utils.cdn import hash_file_path from pillar.api.utils.encoding import Encoder -from pillar.api.utils.gcs import GoogleCloudStorageBucket, \ - GoogleCloudStorageBlob from pillar.api.utils.imaging import generate_local_thumbnails -from pillar.api.utils.storage import LocalBlob, default_storage_backend +from pillar.api.file_storage_backends import default_storage_backend, Bucket log = logging.getLogger(__name__) @@ -81,7 +83,8 @@ def index(file_name=None): return jsonify({'url': url_for('file_storage.index', file_name=file_name)}) -def _process_image(gcs, file_id, local_file, src_file): +def _process_image(bucket: Bucket, + file_id, local_file, src_file): from PIL import Image im = Image.open(local_file) @@ -102,12 +105,12 @@ def _process_image(gcs, file_id, local_file, src_file): for variation in src_file['variations']: fname = variation['file_path'] if current_app.config['TESTING']: - log.warning(' - NOT sending thumbnail %s to GCS', fname) + log.warning(' - NOT sending thumbnail %s to %s', fname, bucket) else: - log.debug(' - Sending thumbnail %s to GCS', fname) - blob = gcs.bucket.blob('_/' + fname, chunk_size=256 * 1024 * 2) - blob.upload_from_filename(variation['local_path'], - content_type=variation['content_type']) + blob = bucket.blob(fname) + log.debug(' - Sending thumbnail %s to %s', fname, blob) + blob.upload_from_path(pathlib.Path(variation['local_path']), + content_type=variation['content_type']) if variation.get('size') == 't': blob.make_public() @@ -172,7 +175,7 @@ def _process_video(gcs, file_id, local_file, src_file): 'backend': j['backend']} -def process_file(gcs, file_id, local_file): +def process_file(bucket: Bucket, file_id, local_file): """Process the file by creating thumbnails, sending to Zencoder, etc. 
:param file_id: '_id' key of the file @@ -227,7 +230,7 @@ def process_file(gcs, file_id, local_file): update_file_doc(file_id, status='processing') try: - processor(gcs, file_id, local_file, src_file) + processor(bucket, file_id, local_file, src_file) except Exception: log.warning('process_file(%s): error when processing file, ' 'resetting status to ' @@ -243,60 +246,31 @@ def process_file(gcs, file_id, local_file): file_id, status, r) -def delete_file(file_item): - def process_file_delete(file_item): - """Given a file item, delete the actual file from the storage backend. - This function can be probably made self-calling.""" - if file_item['backend'] == 'gcs': - storage = GoogleCloudStorageBucket(str(file_item['project'])) - storage.Delete(file_item['file_path']) - # Delete any file variation found in the file_item document - if 'variations' in file_item: - for v in file_item['variations']: - storage.Delete(v['file_path']) - return True - elif file_item['backend'] == 'pillar': - pass - elif file_item['backend'] == 'cdnsun': - pass - else: - pass - - files_collection = current_app.data.driver.db['files'] - # Collect children (variations) of the original file - children = files_collection.find({'parent': file_item['_id']}) - for child in children: - process_file_delete(child) - # Finally remove the original file - process_file_delete(file_item) - - -def generate_link(backend, file_path: str, project_id=None, is_public=False): +def generate_link(backend, file_path: str, project_id=None, is_public=False) -> str: """Hook to check the backend of a file resource, to build an appropriate link that can be used by the client to retrieve the actual file. """ - if backend == 'gcs': - if current_app.config['TESTING']: - log.info('Skipping GCS link generation, and returning a fake link ' - 'instead.') - return '/path/to/testing/gcs/%s' % file_path + # TODO: replace config['TESTING'] with mocking GCS. + if backend == 'gcs' and current_app.config['TESTING']: + log.info('Skipping GCS link generation, and returning a fake link ' + 'instead.') + return '/path/to/testing/gcs/%s' % file_path + + if backend in {'gcs', 'local'}: + from ..file_storage_backends import Bucket + + bucket_cls = Bucket.for_backend(backend) + storage = bucket_cls(project_id) + blob = storage.get_blob(file_path) - storage = GoogleCloudStorageBucket(project_id) - blob = storage.Get(file_path) if blob is None: log.warning('generate_link(%r, %r): unable to find blob for file' ' path, returning empty link.', backend, file_path) return '' - if is_public: - return blob['public_url'] - return blob['signed_url'] - if backend == 'local': - bucket = default_storage_backend(project_id) - blob = bucket.get_blob(file_path) - return url_for('file_storage.index', file_name=blob.partial_path, - _external=True, _scheme=current_app.config['SCHEME']) + return blob.get_url(is_public=is_public) + if backend == 'pillar': return url_for('file_storage.index', file_name=file_path, _external=True, _scheme=current_app.config['SCHEME']) @@ -410,10 +384,6 @@ def generate_all_links(response, now): response['_etag'] = etag_doc['_etag'] -def before_deleting_file(item): - delete_file(item) - - def on_pre_get_files(_, lookup): # Override the HTTP header, we always want to fetch the document from # MongoDB. 
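To illustrate the generate_link() change above: a minimal sketch of resolving a file URL through the new Bucket/Blob abstraction. The helper name link_for and the argument values are illustrative only (not part of the patch), and an active Flask application context is assumed.

from pillar.api.file_storage_backends import Bucket

def link_for(backend: str, project_id: str, file_path: str, *, is_public: bool = False) -> str:
    # Look up the Bucket subclass registered for this backend ('gcs' or 'local').
    bucket_cls = Bucket.for_backend(backend)
    # By convention, buckets are named after the project ID.
    bucket = bucket_cls(project_id)
    blob = bucket.get_blob(file_path)
    if blob is None:
        return ''
    # Signed/public URL on GCS, a file_storage.index URL on the local backend.
    return blob.get_url(is_public=is_public)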
@@ -587,10 +557,10 @@ def override_content_type(uploaded_file): del uploaded_file._parsed_content_type -def assert_file_size_allowed(file_size): +def assert_file_size_allowed(file_size: int): """Asserts that the current user is allowed to upload a file of the given size. - :raises + :raises wz_exceptions.RequestEntityTooLarge: """ roles = current_app.config['ROLES_FOR_UNLIMITED_UPLOADS'] @@ -674,28 +644,14 @@ def stream_to_storage(project_id): assert_file_size_allowed(file_size) # Create file document in MongoDB. - file_id, internal_fname, status = create_file_doc_for_upload(project_oid, - uploaded_file) - storage_backend = None - blob = None + file_id, internal_fname, status = create_file_doc_for_upload(project_oid, uploaded_file) - if current_app.config['TESTING']: - log.warning('NOT streaming to GCS because TESTING=%r', - current_app.config['TESTING']) - # Fake a Blob object. - blob = type('Blob', (), {'size': file_size}) - else: - bucket = default_storage_backend(project_id) - blob = bucket.blob(internal_fname) - blob.create_from_file(stream_for_gcs, file_size) - # if current_app.config['STORAGE_BACKEND'] == 'gcs': - # blob, storage_backend = stream_to_gcs( - # file_id, file_size, internal_fname, project_id, - # stream_for_gcs, uploaded_file.mimetype) - # elif current_app.config['STORAGE_BACKEND'] == 'local': - # storage_backend = LocalBucket(project_id) - # blob = LocalBlob(project_id, internal_fname) - # blob.create_from_file(stream_for_gcs, file_size) + # Copy the file into storage. + bucket = default_storage_backend(project_id) + blob = bucket.blob(internal_fname) + blob.create_from_file(stream_for_gcs, + file_size=file_size, + content_type=uploaded_file.mimetype) log.debug('Marking uploaded file id=%s, fname=%s, ' 'size=%i as "queued_for_processing"', @@ -708,8 +664,7 @@ def stream_to_storage(project_id): log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id, internal_fname, blob.size) - # process_file(storage_backend, file_id, local_file) - blob.process_file(file_id) + process_file(bucket, file_id, local_file) # Local processing is done, we can close the local file so it is removed. if local_file is not None: @@ -727,20 +682,17 @@ def stream_to_storage(project_id): return resp -def stream_to_gcs(file_id, file_size, internal_fname, project_id, - stream_for_gcs, content_type): +from ..file_storage_backends.abstract import FileType + + +def stream_to_gcs(file_id: ObjectId, file_size: int, internal_fname: str, project_id: ObjectId, + stream_for_gcs: FileType, content_type: str) \ + -> typing.Tuple[GoogleCloudStorageBlob, GoogleCloudStorageBucket]: # Upload the file to GCS. - from gcloud.streaming import transfer - log.debug('Streaming file to GCS bucket; id=%s, fname=%s, size=%i', - file_id, internal_fname, file_size) - # Files larger than this many bytes will be streamed directly from disk, - # smaller ones will be read into memory and then uploaded. 
- transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400 try: - gcs = GoogleCloudStorageBucket(project_id) - file_in_storage = GoogleCloudStorageBlob(gcs, internal_fname) - file_in_storage.blob.upload_from_file(stream_for_gcs, size=file_size, - content_type=content_type) + bucket = GoogleCloudStorageBucket(str(project_id)) + blob = bucket.blob(internal_fname) + blob.create_from_file(stream_for_gcs, file_size=file_size, content_type=content_type) except Exception: log.exception('Error uploading file to Google Cloud Storage (GCS),' ' aborting handling of uploaded file (id=%s).', file_id) @@ -748,9 +700,7 @@ def stream_to_gcs(file_id, file_size, internal_fname, project_id, raise wz_exceptions.InternalServerError( 'Unable to stream file to Google Cloud Storage') - # Reload the blob to get the file size according to Google. - file_in_storage.blob.reload() - return file_in_storage, gcs + return blob, bucket def add_access_control_headers(resp): @@ -841,8 +791,6 @@ def setup_app(app, url_prefix): app.on_fetched_item_files += before_returning_file app.on_fetched_resource_files += before_returning_files - app.on_delete_item_files += before_deleting_file - app.on_update_files += compute_aggregate_length app.on_replace_files += compute_aggregate_length app.on_insert_files += compute_aggregate_length_items diff --git a/pillar/api/file_storage/moving.py b/pillar/api/file_storage/moving.py index 592be437..5c37831a 100644 --- a/pillar/api/file_storage/moving.py +++ b/pillar/api/file_storage/moving.py @@ -5,14 +5,13 @@ import logging import os import tempfile -from bson import ObjectId import bson.tz_util -from flask import current_app import requests import requests.exceptions +from bson import ObjectId +from flask import current_app from . import stream_to_gcs, generate_all_links, ensure_valid_link -import pillar.api.utils.gcs __all__ = ['PrerequisiteNotMetError', 'change_file_storage_backend'] @@ -90,22 +89,23 @@ def copy_file_to_backend(file_id, project_id, file_or_var, src_backend, dest_bac else: local_finfo = fetch_file_from_link(file_or_var['link']) - # Upload to GCS - if dest_backend != 'gcs': - raise ValueError('Only dest_backend="gcs" is supported now.') + try: + # Upload to GCS + if dest_backend != 'gcs': + raise ValueError('Only dest_backend="gcs" is supported now.') - if current_app.config['TESTING']: - log.warning('Skipping actual upload to GCS due to TESTING') - else: - # TODO check for name collisions - stream_to_gcs(file_id, local_finfo['file_size'], - internal_fname=internal_fname, - project_id=str(project_id), - stream_for_gcs=local_finfo['local_file'], - content_type=local_finfo['content_type']) - - # No longer needed, so it can be closed & dispersed of. - local_finfo['local_file'].close() + if current_app.config['TESTING']: + log.warning('Skipping actual upload to GCS due to TESTING') + else: + # TODO check for name collisions + stream_to_gcs(file_id, local_finfo['file_size'], + internal_fname=internal_fname, + project_id=project_id, + stream_for_gcs=local_finfo['local_file'], + content_type=local_finfo['content_type']) + finally: + # No longer needed, so it can be closed & dispersed of. 
+ local_finfo['local_file'].close() def fetch_file_from_link(link): @@ -169,10 +169,12 @@ def gcs_move_to_bucket(file_id, dest_project_id, skip_gcs=False): if skip_gcs: log.warning('NOT ACTUALLY MOVING file %s on GCS, just updating MongoDB', file_id) else: + from pillar.api.file_storage_backends.gcs import GoogleCloudStorageBucket + src_project = f['project'] - pillar.api.utils.gcs.copy_to_bucket(f['file_path'], src_project, dest_project_id) + GoogleCloudStorageBucket.copy_to_bucket(f['file_path'], src_project, dest_project_id) for var in f.get('variations', []): - pillar.api.utils.gcs.copy_to_bucket(var['file_path'], src_project, dest_project_id) + GoogleCloudStorageBucket.copy_to_bucket(var['file_path'], src_project, dest_project_id) # Update the file document after moving was successful. log.info('Switching file %s to project %s', file_id, dest_project_id) diff --git a/pillar/api/file_storage_backends/__init__.py b/pillar/api/file_storage_backends/__init__.py new file mode 100644 index 00000000..3387bd9c --- /dev/null +++ b/pillar/api/file_storage_backends/__init__.py @@ -0,0 +1,25 @@ +"""Storage backends. + +To obtain a storage backend, use either of the two forms: + +>>> bucket = default_storage_backend('bucket_name') + +>>> BucketClass = Bucket.for_backend('backend_name') +>>> bucket = BucketClass('bucket_name') + +""" + +from .abstract import Bucket + +# Import the other backends so that they register. +from . import local +from . import gcs + + +def default_storage_backend(name: str) -> Bucket: + from flask import current_app + + backend_name = current_app.config['STORAGE_BACKEND'] + backend_cls = Bucket.for_backend(backend_name) + + return backend_cls(name) diff --git a/pillar/api/file_storage_backends/abstract.py b/pillar/api/file_storage_backends/abstract.py new file mode 100644 index 00000000..1195de71 --- /dev/null +++ b/pillar/api/file_storage_backends/abstract.py @@ -0,0 +1,152 @@ +import abc +import io +import logging +import typing + +import pathlib +from bson import ObjectId + +__all__ = ['Bucket', 'Blob', 'Path', 'FileType'] + +# Shorthand for the type of path we use. +Path = pathlib.PurePosixPath + +# This is a mess: typing.IO keeps mypy-0.501 happy, and io.FileIO keeps PyCharm-2017.1 happy. +FileType = typing.Union[typing.IO, io.FileIO] + + +class Bucket(metaclass=abc.ABCMeta): + """Can be a GCS bucket or simply a project folder in Pillar + + :type name: string + :param name: Name of the bucket. As a convention, we use the ID of + the project to name the bucket. + + """ + + # Mapping from backend name to Bucket class + backends: typing.Dict[str, typing.Type['Bucket']] = {} + + backend_name: str = None # define in subclass. + + def __init__(self, name: str) -> None: + self.name = name + + def __init_subclass__(cls): + assert cls.backend_name, '%s.backend_name must be non-empty string' % cls + cls.backends[cls.backend_name] = cls + + def __repr__(self): + return f'<{self.__class__.__name__} name={self.name!r}>' + + @classmethod + def for_backend(cls, backend_name: str) -> typing.Type['Bucket']: + """Returns the Bucket subclass for the given backend.""" + return cls.backends[backend_name] + + @abc.abstractmethod + def blob(self, blob_name: str) -> 'Blob': + """Factory constructor for blob object. + + :param blob_name: The path of the blob to be instantiated. + """ + + @abc.abstractmethod + def get_blob(self, blob_name: str) -> typing.Optional['Blob']: + """Get a blob object by name. + + If the blob exists return the object, otherwise None. 
+ """ + + @abc.abstractmethod + def copy_blob(self, blob: 'Blob', to_bucket: 'Bucket'): + """Copies a blob from the current bucket to the other bucket. + + Implementations only need to support copying between buckets of the + same storage backend. + """ + + @classmethod + def copy_to_bucket(cls, blob_name, src_project_id: ObjectId, dest_project_id: ObjectId): + """Copies a file from one bucket to the other.""" + + src_storage = cls(str(src_project_id)) + dest_storage = cls(str(dest_project_id)) + + blob = src_storage.get_blob(blob_name) + src_storage.copy_blob(blob, dest_storage) + + +Bu = typing.TypeVar('Bu', bound=Bucket) + + +class Blob(metaclass=abc.ABCMeta): + """A wrapper for file or blob objects.""" + + def __init__(self, name: str, bucket: Bucket) -> None: + self.name = name + self.bucket = bucket + self._size_in_bytes: typing.Optional[int] = None + + self.filename: str = None + """Name of the file for the Content-Disposition header when downloading it.""" + + self._log = logging.getLogger(f'{__name__}.Blob') + + def __repr__(self): + return f'<{self.__class__.__name__} bucket={self.bucket.name!r} name={self.name!r}>' + + @property + def size(self) -> typing.Optional[int]: + """Size of the object, in bytes. + + :returns: The size of the blob or ``None`` if the property + is not set locally. + """ + + size = self._size_in_bytes + if size is None: + return None + return int(size) + + @abc.abstractmethod + def create_from_file(self, file_obj: FileType, *, + content_type: str, + file_size: int = -1): + """Copies the file object to the storage. + + :param file_obj: The file object to send to storage. + :param content_type: The content type of the file. + :param file_size: The size of the file in bytes, or -1 if unknown + """ + + def upload_from_path(self, path: pathlib.Path, content_type: str): + file_size = path.stat().st_size + + with path.open('rb') as infile: + self.create_from_file(infile, content_type=content_type, + file_size=file_size) + + @abc.abstractmethod + def update_filename(self, filename: str): + """Sets the filename which is used when downloading the file. + + Not all storage backends support this, and will use the on-disk filename instead. + """ + + @abc.abstractmethod + def get_url(self, *, is_public: bool) -> str: + """Returns the URL to access this blob. + + Note that this may involve API calls to generate a signed URL. + """ + + @abc.abstractmethod + def make_public(self): + """Makes the blob publicly available. + + Only performs an actual action on backends that support temporary links. + """ + + +Bl = typing.TypeVar('Bl', bound=Blob) diff --git a/pillar/api/utils/gcs.py b/pillar/api/file_storage_backends/gcs.py similarity index 53% rename from pillar/api/utils/gcs.py rename to pillar/api/file_storage_backends/gcs.py index b243b713..8e1e2de3 100644 --- a/pillar/api/utils/gcs.py +++ b/pillar/api/file_storage_backends/gcs.py @@ -2,24 +2,24 @@ import os import time import datetime import logging +import typing from bson import ObjectId from gcloud.storage.client import Client +import gcloud.storage.blob from gcloud.exceptions import NotFound from flask import current_app, g from werkzeug.local import LocalProxy -from pillar.api.utils.storage import Bucket, Blob +from .abstract import Bucket, Blob, Path, FileType log = logging.getLogger(__name__) -def get_client(): +def get_client() -> Client: """Stores the GCS client on the global Flask object. The GCS client is not user-specific anyway. 
- - :rtype: Client """ _gcs = getattr(g, '_gcs_client', None) @@ -31,7 +31,7 @@ def get_client(): # This hides the specifics of how/where we store the GCS client, # and allows the rest of the code to use 'gcs' as a simple variable # that does the right thing. -gcs = LocalProxy(get_client) +gcs: Client = LocalProxy(get_client) class GoogleCloudStorageBucket(Bucket): @@ -52,8 +52,11 @@ class GoogleCloudStorageBucket(Bucket): backend_name = 'gcs' - def __init__(self, name, subdir='_/'): - super(GoogleCloudStorageBucket, self).__init__(name=name) + def __init__(self, name: str, subdir='_/') -> None: + super().__init__(name=name) + + self._log = logging.getLogger(f'{__name__}.GoogleCloudStorageBucket') + try: self._gcs_bucket = gcs.get_bucket(name) except NotFound: @@ -73,138 +76,110 @@ class GoogleCloudStorageBucket(Bucket): self.subdir = subdir - def blob(self, blob_name): + def blob(self, blob_name: str) -> 'GoogleCloudStorageBlob': return GoogleCloudStorageBlob(name=blob_name, bucket=self) - def List(self, path=None): - """Display the content of a subdir in the project bucket. If the path - points to a file the listing is simply empty. + def get_blob(self, internal_fname: str) -> typing.Optional['GoogleCloudStorageBlob']: + blob = self.blob(internal_fname) + if not blob.gblob.exists(): + return None + return blob - :type path: string - :param path: The relative path to the directory or asset. - """ - if path and not path.endswith('/'): - path += '/' - prefix = os.path.join(self.subdir, path) - - fields_to_return = 'nextPageToken,items(name,size,contentType),prefixes' - req = self._gcs_bucket.list_blobs(fields=fields_to_return, prefix=prefix, - delimiter='/') - - files = [] - for f in req: - filename = os.path.basename(f.name) - if filename != '': # Skip own folder name - files.append(dict( - path=os.path.relpath(f.name, self.subdir), - text=filename, - type=f.content_type)) - - directories = [] - for dir_path in req.prefixes: - directory_name = os.path.basename(os.path.normpath(dir_path)) - directories.append(dict( - text=directory_name, - path=os.path.relpath(dir_path, self.subdir), - type='group_storage', - children=True)) - # print os.path.basename(os.path.normpath(path)) - - list_dict = dict( - name=os.path.basename(os.path.normpath(path)), - type='group_storage', - children=files + directories - ) - - return list_dict - - def blob_to_dict(self, blob): - blob.reload() - expiration = datetime.datetime.now() + datetime.timedelta(days=1) - expiration = int(time.mktime(expiration.timetuple())) - return dict( - updated=blob.updated, - name=os.path.basename(blob.name), - size=blob.size, - content_type=blob.content_type, - signed_url=blob.generate_signed_url(expiration), - public_url=blob.public_url) - - def Get(self, path, to_dict=True): + def _gcs_get(self, path: str, *, chunk_size=None) -> gcloud.storage.Blob: """Get selected file info if the path matches. - :type path: string - :param path: The relative path to the file. - :type to_dict: bool - :param to_dict: Return the object as a dictionary. + :param path: The path to the file, relative to the bucket's subdir. """ path = os.path.join(self.subdir, path) - blob = self._gcs_bucket.blob(path) - if blob.exists(): - if to_dict: - return self.blob_to_dict(blob) - else: - return blob - else: - return None + blob = self._gcs_bucket.blob(path, chunk_size=chunk_size) + return blob - def Post(self, full_path, path=None): + def _gcs_post(self, full_path, *, path=None) -> typing.Optional[gcloud.storage.Blob]: """Create new blob and upload data to it. 
""" path = path if path else os.path.join('_', os.path.basename(full_path)) - blob = self._gcs_bucket.blob(path) - if blob.exists(): + gblob = self._gcs_bucket.blob(path) + if gblob.exists(): + self._log.error(f'Trying to upload to {path}, but that blob already exists. ' + f'Not uploading.') return None - blob.upload_from_filename(full_path) - return blob + + gblob.upload_from_filename(full_path) + return gblob # return self.blob_to_dict(blob) # Has issues with threading - def Delete(self, path): - """Delete blob (when removing an asset or replacing a preview)""" + def delete_blob(self, path: str) -> bool: + """Deletes the blob (when removing an asset or replacing a preview)""" # We want to get the actual blob to delete - blob = self.Get(path, to_dict=False) + gblob = self._gcs_get(path) try: - blob.delete() + gblob.delete() return True except NotFound: - return None + return False - def update_name(self, blob, name): - """Set the ContentDisposition metadata so that when a file is downloaded - it has a human-readable name. - """ - blob.content_disposition = 'attachment; filename="{0}"'.format(name) - blob.patch() - - def copy_blob(self, blob, to_bucket): + def copy_blob(self, blob: Blob, to_bucket: Bucket): """Copies the given blob from this bucket to the other bucket. Returns the new blob. """ + assert isinstance(blob, GoogleCloudStorageBlob) assert isinstance(to_bucket, GoogleCloudStorageBucket) - return self._gcs_bucket.copy_blob(blob, to_bucket._gcs_bucket) - def get_blob(self, internal_fname, chunk_size=256 * 1024 * 2): - return self._gcs_bucket.blob('_/' + internal_fname, chunk_size) + self._log.info('Copying %s to bucket %s', blob, to_bucket) + + return self._gcs_bucket.copy_blob(blob.gblob, to_bucket._gcs_bucket) class GoogleCloudStorageBlob(Blob): """GCS blob interface.""" - def __init__(self, name, bucket): - super(GoogleCloudStorageBlob, self).__init__(name, bucket) - self.blob = bucket.gcs_bucket.blob('_/' + name, chunk_size=256 * 1024 * 2) + def __init__(self, name: str, bucket: GoogleCloudStorageBucket) -> None: + super().__init__(name, bucket) - def create_from_file(self, uploaded_file, file_size): - raise NotImplementedError() + self._log = logging.getLogger(f'{__name__}.GoogleCloudStorageBlob') + self.gblob = bucket._gcs_get(name, chunk_size=256 * 1024 * 2) - def _process_image(self, file_doc): - raise NotImplementedError() + def create_from_file(self, file_obj: FileType, *, + content_type: str, + file_size: int = -1) -> None: + from gcloud.streaming import transfer - def _process_video(self, file_doc): - raise NotImplementedError() + self._log.debug('Streaming file to GCS bucket %r, size=%i', self, file_size) + + # Files larger than this many bytes will be streamed directly from disk, + # smaller ones will be read into memory and then uploaded. + transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400 + self.gblob.upload_from_file(file_obj, + size=file_size, + content_type=content_type) + + # Reload the blob to get the file size according to Google. + self.gblob.reload() + self._size_in_bytes = self.gblob.size + + def update_filename(self, filename: str): + """Set the ContentDisposition metadata so that when a file is downloaded + it has a human-readable name. 
+ """ + + if '"' in filename: + raise ValueError(f'Filename is not allowed to have double quote in it: {filename!r}') + + self.gblob.content_disposition = f'attachment; filename="{filename}"' + self.gblob.patch() + + def get_url(self, *, is_public: bool) -> str: + if is_public: + return self.gblob.public_url + + expiration = datetime.datetime.utcnow() + datetime.timedelta(days=1) + return self.gblob.generate_signed_url(expiration) + + def make_public(self): + self.gblob.make_public() def update_file_name(node): @@ -233,7 +208,7 @@ def update_file_name(node): map_type = file_props.get('map_type', '') storage = GoogleCloudStorageBucket(str(node['project'])) - blob = storage.Get(file_doc['file_path'], to_dict=False) + blob = storage.get_blob(file_doc['file_path']) if blob is None: log.warning('Unable to find blob for file %s in project %s', file_doc['file_path'], file_doc['project']) @@ -242,18 +217,18 @@ def update_file_name(node): # Pick file extension from original filename _, ext = os.path.splitext(file_doc['filename']) name = _format_name(node['name'], ext, map_type=map_type) - storage.update_name(blob, name) + blob.update_filename(name) # Assign the same name to variations for v in file_doc.get('variations', []): _, override_ext = os.path.splitext(v['file_path']) name = _format_name(node['name'], override_ext, v['size'], map_type=map_type) - blob = storage.Get(v['file_path'], to_dict=False) + blob = storage.get_blob(v['file_path']) if blob is None: log.info('Unable to find blob for file %s in project %s. This can happen if the ' 'video encoding is still processing.', v['file_path'], node['project']) continue - storage.update_name(blob, name) + blob.update_filename(name) # Currently we search for 'file' and 'files' keys in the object properties. # This could become a bit more flexible and realy on a true reference of the @@ -264,16 +239,3 @@ def update_file_name(node): if 'files' in node['properties']: for file_props in node['properties']['files']: _update_name(file_props['file'], file_props) - - -def copy_to_bucket(file_path, src_project_id, dest_project_id): - """Copies a file from one bucket to the other.""" - - log.info('Copying %s from project bucket %s to %s', - file_path, src_project_id, dest_project_id) - - src_storage = GoogleCloudStorageBucket(str(src_project_id)) - dest_storage = GoogleCloudStorageBucket(str(dest_project_id)) - - blob = src_storage.Get(file_path, to_dict=False) - src_storage.copy_blob(blob, dest_storage) diff --git a/pillar/api/file_storage_backends/local.py b/pillar/api/file_storage_backends/local.py new file mode 100644 index 00000000..3b2243de --- /dev/null +++ b/pillar/api/file_storage_backends/local.py @@ -0,0 +1,100 @@ +import logging +import typing + +import pathlib +from flask import current_app + +from pillar.api.utils.imaging import generate_local_thumbnails + +__all__ = ['LocalBucket', 'LocalBlob'] + +from .abstract import Bucket, Blob, FileType, Path + + +class LocalBucket(Bucket): + backend_name = 'local' + + def __init__(self, name: str) -> None: + super().__init__(name) + + self._log = logging.getLogger(f'{__name__}.LocalBucket') + + # For local storage, the name is actually a partial path, relative + # to the local storage root. 
+ self.root = pathlib.Path(current_app.config['STORAGE_DIR']) + self.abspath = self.root / name[:2] / name + + def blob(self, blob_name: str) -> 'LocalBlob': + return LocalBlob(name=blob_name, bucket=self) + + def get_blob(self, blob_name: str) -> typing.Optional['LocalBlob']: + # TODO: Check if file exists, otherwise None + return self.blob(blob_name) + + def copy_blob(self, blob: Blob, to_bucket: Bucket): + """Copies a blob from the current bucket to the other bucket. + + Implementations only need to support copying between buckets of the + same storage backend. + """ + + assert isinstance(blob, LocalBlob) + assert isinstance(to_bucket, LocalBucket) + + self._log.info('Copying %s to bucket %s', blob, to_bucket) + + dest_blob = to_bucket.blob(blob.name) + + # TODO: implement content type handling for local storage. + self._log.warning('Unable to set correct file content type for %s', dest_blob) + + with open(blob.abspath(), 'rb') as src_file: + dest_blob.create_from_file(src_file, content_type='application/x-octet-stream') + + +class LocalBlob(Blob): + """Blob representing a local file on the filesystem.""" + + bucket: LocalBucket + + def __init__(self, name: str, bucket: LocalBucket) -> None: + super().__init__(name, bucket) + + self._log = logging.getLogger(f'{__name__}.LocalBlob') + self.partial_path = Path(name[:2]) / name + + def abspath(self) -> pathlib.Path: + """Returns a concrete, absolute path to the local file.""" + + return pathlib.Path(self.bucket.abspath / self.partial_path) + + def get_url(self, *, is_public: bool) -> str: + from flask import url_for + + url = url_for('file_storage.index', file_name=str(self.partial_path), _external=True, + _scheme=current_app.config['SCHEME']) + return url + + def create_from_file(self, file_obj: FileType, *, + content_type: str, + file_size: int = -1): + assert hasattr(file_obj, 'read') + + import shutil + + # Ensure path exists before saving + my_path = self.abspath() + my_path.parent.mkdir(exist_ok=True, parents=True) + + with my_path.open('wb') as outfile: + shutil.copyfileobj(typing.cast(typing.IO, file_obj), outfile) + + self._size_in_bytes = file_size + + def update_filename(self, filename: str): + # TODO: implement this for local storage. + self._log.info('update_filename(%r) not supported', filename) + + def make_public(self): + # No-op on this storage backend. 
+ pass diff --git a/pillar/api/nodes/__init__.py b/pillar/api/nodes/__init__.py index f2e65db6..79a3aa30 100644 --- a/pillar/api/nodes/__init__.py +++ b/pillar/api/nodes/__init__.py @@ -9,13 +9,13 @@ from bson import ObjectId from flask import current_app, g, Blueprint, request import pillar.markdown -from pillar.api.node_types import PILLAR_NAMED_NODE_TYPES from pillar.api.activities import activity_subscribe, activity_object_add +from pillar.api.node_types import PILLAR_NAMED_NODE_TYPES +from pillar.api.file_storage_backends.gcs import update_file_name +from pillar.api.utils import str2id, jsonify from pillar.api.utils.algolia import algolia_index_node_delete from pillar.api.utils.algolia import algolia_index_node_save -from pillar.api.utils import str2id, jsonify from pillar.api.utils.authorization import check_permissions, require_login -from pillar.api.utils.gcs import update_file_name log = logging.getLogger(__name__) blueprint = Blueprint('nodes_api', __name__) diff --git a/pillar/api/projects/hooks.py b/pillar/api/projects/hooks.py index 4a4bdcf1..3464d606 100644 --- a/pillar/api/projects/hooks.py +++ b/pillar/api/projects/hooks.py @@ -3,12 +3,13 @@ import logging from flask import request, abort, current_app from gcloud import exceptions as gcs_exceptions + from pillar.api.node_types.asset import node_type_asset from pillar.api.node_types.comment import node_type_comment from pillar.api.node_types.group import node_type_group from pillar.api.node_types.group_texture import node_type_group_texture from pillar.api.node_types.texture import node_type_texture -from pillar.api.utils.gcs import GoogleCloudStorageBucket +from pillar.api.file_storage_backends.gcs import GoogleCloudStorageBucket from pillar.api.utils import authorization, authentication from pillar.api.utils import remove_private_keys from pillar.api.utils.authorization import user_has_role, check_permissions diff --git a/pillar/api/utils/imaging.py b/pillar/api/utils/imaging.py index af92480a..83883142 100644 --- a/pillar/api/utils/imaging.py +++ b/pillar/api/utils/imaging.py @@ -5,6 +5,7 @@ from PIL import Image from flask import current_app +# TODO: refactor to use pathlib.Path and f-strings. def generate_local_thumbnails(name_base, src): """Given a source image, use Pillow to generate thumbnails according to the application settings. diff --git a/pillar/api/utils/storage.py b/pillar/api/utils/storage.py index aaae96ae..e0a8855a 100644 --- a/pillar/api/utils/storage.py +++ b/pillar/api/utils/storage.py @@ -1,266 +1 @@ """Utility for managing storage backends and files.""" - -import abc -import logging -import os -import shutil -import typing - -from bson import ObjectId -from flask import current_app - -from pillar.api import utils -from pillar.api.utils.authorization import user_has_role -from pillar.api.utils.imaging import generate_local_thumbnails - -log = logging.getLogger(__name__) - - -class Bucket(metaclass=abc.ABCMeta): - """Can be a GCS bucket or simply a project folder in Pillar - - :type name: string - :param name: Name of the bucket. As a convention, we use the ID of - the project to name the bucket. - - """ - - # Mapping from backend name to Bucket class - backends = {} - - backend_name: str = None # define in subclass. 
- - def __init__(self, name): - self.name = name - - def __init_subclass__(cls, **kwargs): - super().__init_subclass__(**kwargs) - assert cls.backend_name, '%s.backend_name must be non-empty string' % cls - cls.backends[cls.backend_name] = cls - - @classmethod - def for_backend(cls, backend_name: str) -> type: - """Returns the Bucket subclass for the given backend.""" - return cls.backends[backend_name] - - @abc.abstractmethod - def blob(self, blob_name) -> 'Blob': - """Factory constructor for blob object. - - :type blob_name: string - :param blob_name: The name of the blob to be instantiated. - """ - return Blob(name=blob_name, bucket=self) - - @abc.abstractmethod - def get_blob(self, blob_name) -> typing.Optional['Blob']: - """Get a blob object by name. - - If the blob exists return the object, otherwise None. - """ - pass - - -class Blob(metaclass=abc.ABCMeta): - """A wrapper for file or blob objects. - - :type name: string - :param name: Name of the blob. - - """ - - def __init__(self, name: str, bucket: Bucket): - self.name = name - self.bucket = bucket - self._size_in_bytes = None - - @property - def size(self) -> typing.Optional[int]: - """Size of the object, in bytes. - - :returns: The size of the blob or ``None`` if the property - is not set locally. - """ - size = self._size_in_bytes - if size is not None: - return int(size) - return self._size_in_bytes - - @abc.abstractmethod - def create_from_file(self, uploaded_file, file_size): - pass - - @abc.abstractmethod - def _process_image(self, file_doc): - pass - - @abc.abstractmethod - def _process_video(self, file_doc): - pass - - # TODO Sybren: change file_id type to ObjectId? - def process_file(self, file_id: str): - """Generate image thumbnails or encode video. - - :type file_id: string - :param file_id: The document ID for the file processed. We need it to - update the document as we process the file. - """ - - def update_file_doc(file_id, **updates): - res = files.update_one({'_id': ObjectId(file_id)}, - {'$set': updates}) - - log.debug('update_file_doc(%s, %s): %i matched, %i updated.', - file_id, updates, res.matched_count, res.modified_count) - return res - - file_id = ObjectId(file_id) - - # Fetch the src_file document from MongoDB. - files = current_app.data.driver.db['files'] - src_file = files.find_one(file_id) - if not src_file: - log.warning( - 'process_file(%s): no such file document found, ignoring.') - return - - # Update the 'format' field from the content type. - # TODO: overrule the content type based on file extention & magic numbers. - mime_category, src_file['format'] = src_file['content_type'].split('/', 1) - - # Prevent video handling for non-admins. - if not user_has_role('admin') and mime_category == 'video': - if src_file['format'].startswith('x-'): - xified = src_file['format'] - else: - xified = 'x-' + src_file['format'] - - src_file['content_type'] = 'application/%s' % xified - mime_category = 'application' - log.info('Not processing video file %s for non-admin user', file_id) - - # Run the required processor, based on the MIME category. 
- processors = { - 'image': self._process_image, - 'video': self._process_video, - } - - try: - processor = processors[mime_category] - except KeyError: - log.info("POSTed file %s was of type %r, which isn't " - "thumbnailed/encoded.", file_id, - mime_category) - src_file['status'] = 'complete' - else: - log.debug('process_file(%s): marking file status as "processing"', - file_id) - src_file['status'] = 'processing' - update_file_doc(file_id, status='processing') - - try: - processor(src_file) - except Exception: - log.warning('process_file(%s): error when processing file, ' - 'resetting status to ' - '"queued_for_processing"', file_id, exc_info=True) - update_file_doc(file_id, status='queued_for_processing') - return - - src_file = utils.remove_private_keys(src_file) - # Update the original file with additional info, e.g. image resolution - r, _, _, status = current_app.put_internal('files', src_file, - _id=file_id) - if status not in (200, 201): - log.warning( - 'process_file(%s): status %i when saving processed file ' - 'info to MongoDB: %s', - file_id, status, r) - - -class LocalBucket(Bucket): - backend_name = 'local' - - def blob(self, blob_name: str) -> 'LocalBlob': - return LocalBlob(name=blob_name, bucket=self) - - def get_blob(self, blob_name: str) -> typing.Optional['LocalBlob']: - # TODO: Check if file exists, otherwise None - return self.blob(blob_name) - - -class LocalBlob(Blob): - def __init__(self, name: str, bucket: LocalBucket): - super().__init__(name=name, bucket=bucket) - - bucket_name = bucket.name - self.partial_path = os.path.join(bucket_name[:2], bucket_name, - name[:2], name) - self.path = os.path.join( - current_app.config['STORAGE_DIR'], self.partial_path) - - def create_from_file(self, uploaded_file: typing.io.BinaryIO, file_size: int): - assert hasattr(uploaded_file, 'read') - - # Ensure path exists before saving - directory = os.path.dirname(self.path) - if not os.path.exists(directory): - os.makedirs(directory) - - with open(self.path, 'wb') as outfile: - shutil.copyfileobj(uploaded_file, outfile) - - self._size_in_bytes = file_size - - def _process_image(self, file_doc: dict): - from PIL import Image - - im = Image.open(self.path) - res = im.size - file_doc['width'] = res[0] - file_doc['height'] = res[1] - - # Generate previews - log.info('Generating thumbnails for file %s', file_doc['_id']) - file_doc['variations'] = generate_local_thumbnails(file_doc['name'], - self.path) - - # Send those previews to Google Cloud Storage. - log.info('Uploading %i thumbnails for file %s to Google Cloud Storage ' - '(GCS)', len(file_doc['variations']), file_doc['_id']) - - # TODO: parallelize this at some point. 
- for variation in file_doc['variations']: - fname = variation['file_path'] - if current_app.config['TESTING']: - log.warning(' - NOT making thumbnails', fname) - else: - log.debug(' - Sending thumbnail %s to %s', fname, self.bucket) - - blob = self.bucket.blob(fname) - with open(variation['local_path'], 'rb') as local_file: - blob.create_from_file(local_file, variation['length']) - - try: - os.unlink(variation['local_path']) - except OSError: - log.warning( - 'Unable to unlink %s, ignoring this but it will need ' - 'cleanup later.', variation['local_path']) - - del variation['local_path'] - - log.info('Done processing file %s', file_doc['_id']) - file_doc['status'] = 'complete' - - def _process_video(self, file_doc): - pass - - -def default_storage_backend(name): - from flask import current_app - - backend_name = current_app.config['STORAGE_BACKEND'] - backend_cls = Bucket.for_backend(backend_name) - return backend_cls(name) diff --git a/pillar/tests/__init__.py b/pillar/tests/__init__.py index 704f62bc..1c44164b 100644 --- a/pillar/tests/__init__.py +++ b/pillar/tests/__init__.py @@ -29,15 +29,6 @@ import responses import pillar from . import common_test_data as ctd -# from six: -PY3 = sys.version_info[0] == 3 -if PY3: - string_type = str - text_type = str -else: - string_type = basestring - text_type = unicode - MY_PATH = os.path.dirname(os.path.abspath(__file__)) TEST_EMAIL_USER = 'koro' @@ -71,6 +62,32 @@ class PillarTestServer(pillar.PillarServer): class AbstractPillarTest(TestMinimal): pillar_server_class = PillarTestServer + @classmethod + def setUpClass(cls): + import tempfile + + # Store the global temporary directory location, as Pillar itself will + # change this into the config['STORAGE_DIR'] directory. If we don't + # restore that, mkdtemp() will keep trying to create inside its previously + # created temporary storage directory. + cls._orig_tempdir = tempfile.gettempdir() + + # Point the storage directory to something temporary. 
+ try: + cls._pillar_storage_dir = tempfile.mkdtemp(prefix='test-pillar-storage-') + except FileNotFoundError as ex: + raise FileNotFoundError(f'Error creating temp dir: {ex}') + os.environ['PILLAR_STORAGE_DIR'] = cls._pillar_storage_dir + + @classmethod + def tearDownClass(cls): + import tempfile + import shutil + + tempfile.tempdir = cls._orig_tempdir + shutil.rmtree(cls._pillar_storage_dir) + + def setUp(self, **kwargs): eve_settings_file = os.path.join(MY_PATH, 'eve_test_settings.py') kwargs['settings_file'] = eve_settings_file @@ -81,6 +98,8 @@ class AbstractPillarTest(TestMinimal): config.DEBUG = True self.app = self.pillar_server_class(os.path.dirname(os.path.dirname(__file__))) + self.assertEqual(self.app.config['STORAGE_DIR'], self._pillar_storage_dir) + self.app.process_extensions() assert self.app.config['MONGO_DBNAME'] == 'pillar_test' @@ -301,7 +320,7 @@ class AbstractPillarTest(TestMinimal): json=BLENDER_ID_USER_RESPONSE, status=200) - def make_header(self, username: str, subclient_id: str='') -> bytes: + def make_header(self, username: str, subclient_id: str = '') -> bytes: """Returns a Basic HTTP Authentication header value.""" content = '%s:%s' % (username, subclient_id) @@ -348,16 +367,16 @@ class AbstractPillarTest(TestMinimal): if not isinstance(params, dict): return params - def convert_to_string(param): + def convert_to_bytes(param): if isinstance(param, dict): return json.dumps(param, sort_keys=True) - if isinstance(param, text_type): + if isinstance(param, str): return param.encode('utf-8') return param # Pass as (key, value) pairs, so that the sorted order is maintained. jsonified_params = [ - (key, convert_to_string(params[key])) + (key, convert_to_bytes(params[key])) for key in sorted(params.keys())] return urlencode(jsonified_params) diff --git a/pillar/tests/config_testing.py b/pillar/tests/config_testing.py index f56facac..b27fcb1f 100644 --- a/pillar/tests/config_testing.py +++ b/pillar/tests/config_testing.py @@ -9,3 +9,6 @@ CDN_STORAGE_USER = 'u41508580125621' FILESIZE_LIMIT_BYTES_NONSUBS = 20 * 2 ** 10 ROLES_FOR_UNLIMITED_UPLOADS = {'subscriber', 'demo', 'admin'} + +GCLOUD_APP_CREDENTIALS = 'invalid-file-because-gcloud-storage-should-be-mocked-in-tests' +STORAGE_BACKEND = 'local' diff --git a/requirements-dev.txt b/requirements-dev.txt index f628b3fa..bf184130 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -8,6 +8,8 @@ responses==0.5.1 pytest-cov==2.4.0 mock==2.0.0 +# mypy-0.501 typed-ast-1.0.2 + # Secondary development requirements cookies==2.2.1 coverage==4.3.4 diff --git a/setup.cfg b/setup.cfg index 3c19782f..ebdcd747 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,11 @@ [tool:pytest] -addopts = -v --cov pillar --cov-report term-missing --ignore node_modules -x +addopts = -v --cov pillar --cov-report term-missing --ignore node_modules + +[mypy] +python_version = 3.6 +warn_unused_ignores = True +ignore_missing_imports = True +follow_imports = skip [pep8] max-line-length = 100
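A minimal end-to-end sketch of the storage API introduced by this change: pick the configured backend, stream a file into it, then build a client-facing URL. The bucket name, blob name, and content type below are illustrative, and an active Flask application context with STORAGE_BACKEND and STORAGE_DIR configured is assumed.

import io

from pillar.api.file_storage_backends import default_storage_backend

# STORAGE_BACKEND selects the 'local' or 'gcs' Bucket implementation.
bucket = default_storage_backend('5872f2cf61b9b31240e7d08b')  # bucket name == project ID

# Stream an (in-memory) file into storage; content_type and file_size are keyword-only.
payload = io.BytesIO(b'hello world')
blob = bucket.blob('abcdef123456.txt')
blob.create_from_file(payload, file_size=11, content_type='text/plain')

# Alternatively, upload straight from disk:
#   blob.upload_from_path(pathlib.Path('/tmp/example.png'), content_type='image/png')

# Build the URL clients use to fetch the file (a signed URL on GCS).
url = blob.get_url(is_public=False)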