From aecab0561ecd09577ef3614a2b5cce3deb683030 Mon Sep 17 00:00:00 2001
From: Francesco Siddi
Date: Mon, 7 Nov 2016 02:18:23 +0100
Subject: [PATCH] WIP introducing STORAGE_BACKEND

We introduce two new classes, StorageBackend and FileInStorage, which are
subclassed by the GCS and local Pillar backends. This makes supporting
multiple storage solutions easier.
---
 pillar/api/file_storage/__init__.py |  68 +++++++++-------
 pillar/api/utils/gcs.py             |  20 ++++-
 pillar/api/utils/storage.py         | 119 +++++++++++-----------------
 pillar/config.py                    |   2 +
 4 files changed, 108 insertions(+), 101 deletions(-)

diff --git a/pillar/api/file_storage/__init__.py b/pillar/api/file_storage/__init__.py
index 6705ebd3..d1f4a48d 100644
--- a/pillar/api/file_storage/__init__.py
+++ b/pillar/api/file_storage/__init__.py
@@ -26,7 +26,9 @@ from pillar.api.utils.authorization import require_login, user_has_role, \
     user_matches_roles
 from pillar.api.utils.cdn import hash_file_path
 from pillar.api.utils.encoding import Encoder
-from pillar.api.utils.gcs import GoogleCloudStorageBucket
+from pillar.api.utils.gcs import GoogleCloudStorageBucket, \
+    GoogleCloudStorageBlob
+from pillar.api.utils.storage import PillarStorage, PillarStorageFile

 log = logging.getLogger(__name__)

@@ -307,14 +309,17 @@ def generate_link(backend, file_path, project_id=None, is_public=False):
         storage = GoogleCloudStorageBucket(project_id)
         blob = storage.Get(file_path)
         if blob is None:
-            log.warning('generate_link(%r, %r): unable to find blob for file path,'
-                        ' returning empty link.', backend, file_path)
+            log.warning('generate_link(%r, %r): unable to find blob for file'
+                        ' path, returning empty link.', backend, file_path)
             return ''

         if is_public:
             return blob['public_url']
         return blob['signed_url']

-
+    if backend == 'local':
+        f = PillarStorageFile(project_id, file_path)
+        return url_for('file_storage.index', file_name=f.partial_path,
+                       _external=True, _scheme=current_app.config['SCHEME'])
     if backend == 'pillar':
         return url_for('file_storage.index', file_name=file_path,
                        _external=True, _scheme=current_app.config['SCHEME'])
@@ -323,7 +328,8 @@ def generate_link(backend, file_path, project_id=None, is_public=False):
     if backend == 'unittest':
         return 'https://unit.test/%s' % md5(file_path).hexdigest()

-    log.warning('generate_link(): Unknown backend %r, returning empty string as new link.',
+    log.warning('generate_link(): Unknown backend %r, returning empty string '
+                'as new link.',
                 backend)
     return ''

@@ -641,10 +647,10 @@ def stream_to_storage(project_id):
     log.info('request.headers[Origin] = %r', request.headers.get('Origin'))
     log.info('request.content_length = %r', request.content_length)

-    # Try a check for the content length before we access request.files[]. This allows us
-    # to abort the upload early. The entire body content length is always a bit larger than
-    # the actual file size, so if we accept here, we're sure it'll be accepted in subsequent
-    # checks as well.
+    # Try a check for the content length before we access request.files[].
+    # This allows us to abort the upload early. The entire body content length
+    # is always a bit larger than the actual file size, so if we accept here,
+    # we're sure it'll be accepted in subsequent checks as well.
     if request.content_length:
         assert_file_size_allowed(request.content_length)

@@ -659,15 +665,17 @@ def stream_to_storage(project_id):
         override_content_type(uploaded_file)

     if not uploaded_file.content_type:
-        log.warning('File uploaded to project %s without content type.', project_oid)
+        log.warning('File uploaded to project %s without content type.',
+                    project_oid)
         raise wz_exceptions.BadRequest('Missing content type.')

     if uploaded_file.content_type.startswith('image/'):
         # We need to do local thumbnailing, so we have to write the stream
         # both to Google Cloud Storage and to local storage.
-        local_file = tempfile.NamedTemporaryFile(dir=current_app.config['STORAGE_DIR'])
+        local_file = tempfile.NamedTemporaryFile(
+            dir=current_app.config['STORAGE_DIR'])
         uploaded_file.save(local_file)
-        local_file.seek(0)  # Make sure that a re-read starts from the beginning.
+        local_file.seek(0)  # Make sure that re-read starts from the beginning.
         stream_for_gcs = local_file
     else:
         local_file = None
@@ -688,37 +696,43 @@ def stream_to_storage(project_id):
     # Create file document in MongoDB.
     file_id, internal_fname, status = create_file_doc_for_upload(project_oid,
                                                                  uploaded_file)
+    storage_backend = None
+    file_in_storage = None

     if current_app.config['TESTING']:
         log.warning('NOT streaming to GCS because TESTING=%r',
                     current_app.config['TESTING'])
         # Fake a Blob object.
-        gcs = None
-        blob = type('Blob', (), {'size': file_size})
+        file_in_storage = type('Blob', (), {'size': file_size})
     else:
-        blob, gcs = stream_to_gcs(file_id, file_size, internal_fname,
-                                  project_id, stream_for_gcs,
-                                  uploaded_file.mimetype)
+        if current_app.config['STORAGE_BACKEND'] == 'gcs':
+            file_in_storage, storage_backend = stream_to_gcs(
+                file_id, file_size, internal_fname, project_id, stream_for_gcs,
+                uploaded_file.mimetype)
+        elif current_app.config['STORAGE_BACKEND'] == 'local':
+            storage_backend = PillarStorage(project_id)
+            file_in_storage = PillarStorageFile(project_id, internal_fname)
+            file_in_storage.create_from_file(uploaded_file, file_size)

     log.debug('Marking uploaded file id=%s, fname=%s, '
               'size=%i as "queued_for_processing"',
-              file_id, internal_fname, blob.size)
+              file_id, internal_fname, file_size)
     update_file_doc(file_id,
                     status='queued_for_processing',
                     file_path=internal_fname,
-                    length=blob.size,
+                    length=file_in_storage.size,
                     content_type=uploaded_file.mimetype)

     log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id,
-              internal_fname, blob.size)
-    process_file(gcs, file_id, local_file)
+              internal_fname, file_in_storage.size)
+    process_file(storage_backend, file_id, local_file)

     # Local processing is done, we can close the local file so it is removed.
     if local_file is not None:
         local_file.close()

     log.debug('Handled uploaded file id=%s, fname=%s, size=%i, status=%i',
-              file_id, internal_fname, blob.size, status)
+              file_id, internal_fname, file_in_storage.size, status)

     # Status is 200 if the file already existed, and 201 if it was newly
     # created.
@@ -740,9 +754,9 @@ def stream_to_gcs(file_id, file_size, internal_fname, project_id,
     transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400
     try:
         gcs = GoogleCloudStorageBucket(project_id)
-        blob = gcs.bucket.blob('_/' + internal_fname, chunk_size=256 * 1024 * 2)
-        blob.upload_from_file(stream_for_gcs, size=file_size,
-                              content_type=content_type)
+        file_in_storage = GoogleCloudStorageBlob(gcs, internal_fname)
+        file_in_storage.blob.upload_from_file(stream_for_gcs, size=file_size,
+                                              content_type=content_type)
     except Exception:
         log.exception('Error uploading file to Google Cloud Storage (GCS),'
                       ' aborting handling of uploaded file (id=%s).', file_id)
@@ -751,8 +765,8 @@
             'Unable to stream file to Google Cloud Storage')

     # Reload the blob to get the file size according to Google.
-    blob.reload()
-    return blob, gcs
+    file_in_storage.blob.reload()
+    return file_in_storage, gcs


 def add_access_control_headers(resp):
diff --git a/pillar/api/utils/gcs.py b/pillar/api/utils/gcs.py
index 60c2f5c6..e6c1cc10 100644
--- a/pillar/api/utils/gcs.py
+++ b/pillar/api/utils/gcs.py
@@ -9,6 +9,8 @@ from gcloud.exceptions import NotFound
 from flask import current_app, g
 from werkzeug.local import LocalProxy

+from pillar.api.utils.storage import StorageBackend, FileInStorage
+
 log = logging.getLogger(__name__)

@@ -32,7 +34,7 @@ def get_client():
 gcs = LocalProxy(get_client)


-class GoogleCloudStorageBucket(object):
+class GoogleCloudStorageBucket(StorageBackend):
     """Cloud Storage bucket interface. We create a bucket for every project.
     In the bucket we create first level subdirs as follows:
     - '_' (will contain hashed assets, and stays on top of default listing)
@@ -49,6 +51,7 @@ class GoogleCloudStorageBucket(object):
     """

     def __init__(self, bucket_name, subdir='_/'):
+        super(GoogleCloudStorageBucket, self).__init__(backend='gcs')
         try:
             self.bucket = gcs.get_bucket(bucket_name)
         except NotFound:
@@ -178,6 +181,21 @@ class GoogleCloudStorageBucket(object):
         assert isinstance(to_bucket, GoogleCloudStorageBucket)
         return self.bucket.copy_blob(blob, to_bucket.bucket)

+    def get_blob(self, internal_fname, chunk_size=256 * 1024 * 2):
+        return self.bucket.blob('_/' + internal_fname, chunk_size=chunk_size)
+
+
+class GoogleCloudStorageBlob(FileInStorage):
+    """GCS blob interface."""
+    def __init__(self, bucket, internal_fname):
+        super(GoogleCloudStorageBlob, self).__init__(backend='gcs')
+
+        # 'bucket' is a GoogleCloudStorageBucket; get_blob() adds the '_/'
+        # prefix and the chunk size.
+        self.blob = bucket.get_blob(internal_fname)
+        self.size = self.get_size()
+
+    def get_size(self):
+        return self.blob.size
+

 def update_file_name(node):
     """Assign to the CGS blob the same name of the asset node. This way when
diff --git a/pillar/api/utils/storage.py b/pillar/api/utils/storage.py
index f5e44a5e..f6c86644 100644
--- a/pillar/api/utils/storage.py
+++ b/pillar/api/utils/storage.py
@@ -1,83 +1,56 @@
-import subprocess
+"""Utility for managing storage backends and files."""
+import logging
 import os

 from flask import current_app
-from pillar.api.utils.gcs import GoogleCloudStorageBucket
+
+log = logging.getLogger(__name__)


-def get_sizedata(filepath):
-    outdata = dict(
-        size=int(os.stat(filepath).st_size)
-    )
-    return outdata
+class StorageBackend(object):
+    """Can be a GCS bucket or simply a project folder in Pillar.
+
+    :type backend: string
+    :param backend: Name of the storage backend (gcs, pillar, cdnsun).
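+
+    Example (PillarStorage, the local-disk subclass defined later in this
+    module, simply tags itself with the 'local' backend name):
+
+        storage = PillarStorage(project_id)
+        assert storage.backend == 'local'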
-def rsync(path, remote_dir=''):
-    BIN_SSH = current_app.config['BIN_SSH']
-    BIN_RSYNC = current_app.config['BIN_RSYNC']
-
-    DRY_RUN = False
-    arguments = ['--verbose', '--ignore-existing', '--recursive', '--human-readable']
-    logs_path = current_app.config['CDN_SYNC_LOGS']
-    storage_address = current_app.config['CDN_STORAGE_ADDRESS']
-    user = current_app.config['CDN_STORAGE_USER']
-    rsa_key_path = current_app.config['CDN_RSA_KEY']
-    known_hosts_path = current_app.config['CDN_KNOWN_HOSTS']
-
-    if DRY_RUN:
-        arguments.append('--dry-run')
-    folder_arguments = list(arguments)
-    if rsa_key_path:
-        folder_arguments.append(
-            '-e ' + BIN_SSH + ' -i ' + rsa_key_path + ' -o "StrictHostKeyChecking=no"')
-    # if known_hosts_path:
-    #     folder_arguments.append("-o UserKnownHostsFile " + known_hosts_path)
-    folder_arguments.append("--log-file=" + logs_path + "/rsync.log")
-    folder_arguments.append(path)
-    folder_arguments.append(user + "@" + storage_address + ":/public/" + remote_dir)
-    # print (folder_arguments)
-    devnull = open(os.devnull, 'wb')
-    # DEBUG CONFIG
-    # print folder_arguments
-    # proc = subprocess.Popen(['rsync'] + folder_arguments)
-    # stdout, stderr = proc.communicate()
-    subprocess.Popen(['nohup', BIN_RSYNC] + folder_arguments, stdout=devnull, stderr=devnull)
-
-
-def remote_storage_sync(path):  # can be both folder and file
-    if os.path.isfile(path):
-        filename = os.path.split(path)[1]
-        rsync(path, filename[:2] + '/')
-    else:
-        if os.path.exists(path):
-            rsync(path)
-        else:
-            raise IOError('ERROR: path not found')
-
-
-def push_to_storage(project_id, full_path, backend='cgs'):
-    """Move a file from temporary/processing local storage to a storage endpoint.
-    By default we store items in a Google Cloud Storage bucket named after the
-    project id.
     """
-    def push_single_file(project_id, full_path, backend):
-        if backend == 'cgs':
-            storage = GoogleCloudStorageBucket(project_id, subdir='_')
-            blob = storage.Post(full_path)
-            # XXX Make public on the fly if it's an image and small preview.
-            # This should happen by reading the database (push to storage
-            # should change to accomodate it).
-            if blob is not None and full_path.endswith('-t.jpg'):
-                blob.make_public()
-            os.remove(full_path)
+    def __init__(self, backend):
+        self.backend = backend
+
+
+class FileInStorage(object):
+    """A wrapper for file or blob objects.
+
+    :type backend: string
+    :param backend: Name of the storage backend (gcs, pillar, cdnsun).
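+
+    Example (PillarStorageFile is the local subclass defined below;
+    uploaded_file stands for any werkzeug-style object with a save() method):
+
+        f = PillarStorageFile(project_id, internal_fname)
+        f.create_from_file(uploaded_file, file_size)
+        assert f.size == file_size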
+
+    """
+
+    def __init__(self, backend):
+        self.backend = backend
+        self.path = None
+        self.size = None
+
+
+class PillarStorage(StorageBackend):
+    def __init__(self, project_id):
+        super(PillarStorage, self).__init__(backend='local')
+
+
+class PillarStorageFile(FileInStorage):
+    def __init__(self, project_id, internal_fname):
+        super(PillarStorageFile, self).__init__(backend='local')
+
+        self.size = None
+        self.partial_path = os.path.join(project_id[:2], project_id,
+                                         internal_fname[:2], internal_fname)
+        self.path = os.path.join(
+            current_app.config['STORAGE_DIR'], self.partial_path)
+
+    def create_from_file(self, uploaded_file, file_size):
+        # Ensure the destination directory exists before saving.
+        directory = os.path.dirname(self.path)
+        if not os.path.isdir(directory):
+            os.makedirs(directory)
+        uploaded_file.save(self.path)
+        self.size = file_size
-    if os.path.isfile(full_path):
-        push_single_file(project_id, full_path, backend)
-    else:
-        if os.path.exists(full_path):
-            for root, dirs, files in os.walk(full_path):
-                for name in files:
-                    push_single_file(project_id, os.path.join(root, name), backend)
-        else:
-            raise IOError('ERROR: path not found')
diff --git a/pillar/config.py b/pillar/config.py
index 4314a70b..3f7696c2 100644
--- a/pillar/config.py
+++ b/pillar/config.py
@@ -71,6 +71,8 @@ ZENCODER_NOTIFICATIONS_URL = 'http://zencoderfetcher/'

 ENCODING_BACKEND = 'zencoder'  # local, flamenco

+STORAGE_BACKEND = 'local'  # gcs
+
 # Validity period of links, per file storage backend. Expressed in seconds.
 # Shouldn't be more than a year, as this isn't supported by HTTP/1.1.
 FILE_LINK_VALIDITY = defaultdict(
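
A minimal sketch of the dispatch this patch adds to stream_to_storage(), for
reference; save_upload() is a hypothetical helper, every other name is taken
from the patch:

    from flask import current_app

    from pillar.api.file_storage import stream_to_gcs
    from pillar.api.utils.storage import PillarStorage, PillarStorageFile

    def save_upload(file_id, file_size, internal_fname, project_id,
                    stream_for_gcs, uploaded_file):
        """Store an upload on the backend named by STORAGE_BACKEND."""
        backend = current_app.config['STORAGE_BACKEND']
        if backend == 'gcs':
            # Streams to the per-project GCS bucket; returns (blob, bucket).
            return stream_to_gcs(file_id, file_size, internal_fname,
                                 project_id, stream_for_gcs,
                                 uploaded_file.mimetype)
        if backend == 'local':
            # Saves under STORAGE_DIR, sharded as in PillarStorageFile.
            storage_backend = PillarStorage(project_id)
            file_in_storage = PillarStorageFile(project_id, internal_fname)
            file_in_storage.create_from_file(uploaded_file, file_size)
            return file_in_storage, storage_backend
        raise ValueError('Unknown STORAGE_BACKEND %r' % backend)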