From 441b39211b86d13c165ff2cdc0f57f9ad7bb63b6 Mon Sep 17 00:00:00 2001
From: Francesco Siddi
Date: Wed, 4 Nov 2015 12:59:08 +0100
Subject: [PATCH] Support for file upload to Google Cloud Storage

All other backends remain available and are still fully compatible with
the File storage infrastructure.
---
 pillar/application/__init__.py       | 22 ++++--
 .../modules/file_storage/__init__.py | 18 +++--
 pillar/application/utils/gcs.py      | 76 +++++++++++--------
 pillar/application/utils/storage.py  | 25 ++++++
 4 files changed, 95 insertions(+), 46 deletions(-)

diff --git a/pillar/application/__init__.py b/pillar/application/__init__.py
index e1bacd29..d15bd9c2 100644
--- a/pillar/application/__init__.py
+++ b/pillar/application/__init__.py
@@ -73,10 +73,10 @@ def validate_token():
         current_user = {}
 
     token = request.authorization.username
-    tokens = app.data.driver.db['tokens']
+    tokens_collection = app.data.driver.db['tokens']
 
     lookup = {'token': token, 'expire_time': {"$gt": datetime.now()}}
-    db_token = tokens.find_one(lookup)
+    db_token = tokens_collection.find_one(lookup)
     if not db_token:
         # If no valid token is found, we issue a new request to the Blender ID
         # to verify the validity of the token. We will get basic user info if
@@ -99,7 +99,6 @@ def validate_token():
                 'token': ''}]
         }
         r = post_internal('users', user_data)
-        print r
        user_id = r[0]['_id']
         groups = None
     else:
@@ -345,10 +344,15 @@ def post_POST_files(request, payload):
 app.on_post_POST_files += post_POST_files
 
 from utils.cdn import hash_file_path
+from application.utils.gcs import GoogleCloudStorageBucket
 # Hook to check the backend of a file resource, to build an appropriate link
 # that can be used by the client to retrieve the actual file.
-def generate_link(backend, path):
-    if backend == 'pillar':
+def generate_link(backend, path, project_id=None):
+    if backend == 'gcs':
+        storage = GoogleCloudStorageBucket(project_id)
+        blob = storage.Get(path)
+        link = blob['signed_url']
+    elif backend == 'pillar':
         link = url_for('file_storage.index', file_name=path, _external=True)
     elif backend == 'cdnsun':
         link = hash_file_path(path, None)
@@ -357,11 +361,15 @@ def generate_link(backend, path):
     return link
 
 def before_returning_file(response):
-    response['link'] = generate_link(response['backend'], response['path'])
+    # TODO: add project id to all files
+    project_id = None if 'project' not in response else str(response['project'])
+    response['link'] = generate_link(response['backend'], response['path'], project_id)
 
 def before_returning_files(response):
     for item in response['_items']:
-        item['link'] = generate_link(item['backend'], item['path'])
+        # TODO: add project id to all files
+        project_id = None if 'project' not in item else str(item['project'])
+        item['link'] = generate_link(item['backend'], item['path'], project_id)
 
 
 app.on_fetched_item_files += before_returning_file
diff --git a/pillar/application/modules/file_storage/__init__.py b/pillar/application/modules/file_storage/__init__.py
index 12fa3de6..dbb0f876 100644
--- a/pillar/application/modules/file_storage/__init__.py
+++ b/pillar/application/modules/file_storage/__init__.py
@@ -13,6 +13,7 @@ from application.utils.imaging import generate_local_thumbnails
 from application.utils.imaging import get_video_data
 from application.utils.imaging import ffmpeg_encode
 from application.utils.storage import remote_storage_sync
+from application.utils.storage import push_to_storage
 from application.utils.gcs import GoogleCloudStorageBucket
 
 file_storage = Blueprint('file_storage', __name__,
@@ -45,7 +45,7 @@ def browse_gcs(bucket_name, subdir, file_path=None):
     return jsonify(listing)
 
 
-@file_storage.route('/build_thumbnails/')
+#@file_storage.route('/build_thumbnails/')
 def build_thumbnails(file_path=None, file_id=None):
     files_collection = app.data.driver.db['files']
     if file_path:
@@ -59,7 +60,7 @@ def build_thumbnails(file_path=None, file_id=None):
 
     user = file_['user']
 
-    file_full_path = os.path.join(app.config['STORAGE_DIR'], file_path)
+    file_full_path = os.path.join(app.config['SHARED_DIR'], file_path[:2], file_path)
     # Does the original file exist?
     if not os.path.isfile(file_full_path):
         return "", 404
@@ -73,7 +74,6 @@ def build_thumbnails(file_path=None, file_id=None):
             continue
         basename = os.path.basename(thumbnail['path'])
         root, ext = os.path.splitext(basename)
-        path = os.path.join(basename[:2], basename)
         file_object = dict(
             name=root,
             #description="Preview of file {0}".format(file_['name']),
@@ -88,7 +88,8 @@ def build_thumbnails(file_path=None, file_id=None):
             md5=thumbnail['md5'],
             filename=basename,
             backend=file_['backend'],
-            path=path)
+            path=basename,
+            project=file_['project'])
         # Commit to database
         r = post_item('files', file_object)
         if r[0]['_status'] == 'ERR':
@@ -122,7 +123,7 @@ def process_file(src_file):
 
     files_collection = app.data.driver.db['files']
 
-    file_abs_path = os.path.join(app.config['SHARED_DIR'], src_file['name'])
+    file_abs_path = os.path.join(app.config['SHARED_DIR'], src_file['name'][:2], src_file['name'])
     src_file['length'] = os.stat(file_abs_path).st_size
     # Remove properties that do not belong in the collection
     src_file.pop('_status', None)
@@ -169,6 +170,7 @@ def process_file(src_file):
         file_object = dict(
             name=os.path.split(filename)[1],
             #description="Preview of file {0}".format(file_['name']),
+            project=src_file['project'],
             user=src_file['user'],
             parent=src_file['_id'],
             size="{0}p".format(res_y),
@@ -204,7 +206,7 @@ def process_file(src_file):
                 variation)
 
             # rsync the file (this is async)
-            remote_storage_sync(path)
+            #remote_storage_sync(path)
 
 
     # When all encodes are done, delete source file
@@ -215,8 +217,10 @@ def process_file(src_file):
         sync_path = os.path.split(file_abs_path)[0]
     else:
         sync_path = file_abs_path
-    remote_storage_sync(sync_path)
+    #remote_storage_sync(sync_path)
+    push_to_storage(str(src_file['project']), sync_path)
 
+    # Update the original file with additional info, e.g. image resolution
     file_asset = files_collection.find_and_modify(
         {'_id': src_file['_id']},
         src_file)
diff --git a/pillar/application/utils/gcs.py b/pillar/application/utils/gcs.py
index 65c4979b..6bb531c5 100644
--- a/pillar/application/utils/gcs.py
+++ b/pillar/application/utils/gcs.py
@@ -21,28 +21,28 @@ class GoogleCloudStorageBucket(object):
 
     :param subdir: The local entrypoint to browse the bucket.
""" + CGS_PROJECT_NAME = app.config['CGS_PROJECT_NAME'] + GCS_CLIENT_EMAIL = app.config['GCS_CLIENT_EMAIL'] + GCS_PRIVATE_KEY_PEM = app.config['GCS_PRIVATE_KEY_PEM'] + GCS_PRIVATE_KEY_P12 = app.config['GCS_PRIVATE_KEY_P12'] + + # Load private key in pem format (used by the API) + with open(GCS_PRIVATE_KEY_PEM) as f: + private_key_pem = f.read() + credentials_pem = SignedJwtAssertionCredentials(GCS_CLIENT_EMAIL, + private_key_pem, + 'https://www.googleapis.com/auth/devstorage.read_write') + + # Load private key in p12 format (used by the singed urls generator) + with open(GCS_PRIVATE_KEY_P12) as f: + private_key_pkcs12 = f.read() + credentials_p12 = SignedJwtAssertionCredentials(GCS_CLIENT_EMAIL, + private_key_pkcs12, + 'https://www.googleapis.com/auth/devstorage.read_write') + def __init__(self, bucket_name, subdir='_/'): - CGS_PROJECT_NAME = app.config['CGS_PROJECT_NAME'] - GCS_CLIENT_EMAIL = app.config['GCS_CLIENT_EMAIL'] - GCS_PRIVATE_KEY_PEM = app.config['GCS_PRIVATE_KEY_PEM'] - GCS_PRIVATE_KEY_P12 = app.config['GCS_PRIVATE_KEY_P12'] - - # Load private key in pem format (used by the API) - with open(GCS_PRIVATE_KEY_PEM) as f: - private_key_pem = f.read() - credentials_pem = SignedJwtAssertionCredentials(GCS_CLIENT_EMAIL, - private_key_pem, - 'https://www.googleapis.com/auth/devstorage.read_write') - - # Load private key in p12 format (used by the singed urls generator) - with open(GCS_PRIVATE_KEY_P12) as f: - private_key_pkcs12 = f.read() - self.credentials_p12 = SignedJwtAssertionCredentials(GCS_CLIENT_EMAIL, - private_key_pkcs12, - 'https://www.googleapis.com/auth/devstorage.read_write') - - gcs = Client(project=CGS_PROJECT_NAME, credentials=credentials_pem) + gcs = Client(project=self.CGS_PROJECT_NAME, credentials=self.credentials_pem) self.bucket = gcs.get_bucket(bucket_name) self.subdir = subdir @@ -89,6 +89,18 @@ class GoogleCloudStorageBucket(object): return list_dict + def blob_to_dict(self, blob): + blob.reload() + expiration = datetime.datetime.now() + datetime.timedelta(days=1) + expiration = int(time.mktime(expiration.timetuple())) + return dict( + updated=blob.updated, + name=os.path.basename(blob.name), + size=blob.size, + content_type=blob.content_type, + signed_url=blob.generate_signed_url(expiration, credentials=self.credentials_p12)) + + def Get(self, path): """Get selected file info if the path matches. @@ -96,17 +108,17 @@ class GoogleCloudStorageBucket(object): :param path: The relative path to the file. """ path = os.path.join(self.subdir, path) - f = self.bucket.blob(path) - if f.exists(): - f.reload() - expiration = datetime.datetime.now() + datetime.timedelta(days=1) - expiration = int(time.mktime(expiration.timetuple())) - file_dict = dict( - updated=f.updated, - name=os.path.basename(f.name), - size=f.size, - content_type=f.content_type, - signed_url=f.generate_signed_url(expiration, credentials=self.credentials_p12)) - return file_dict + blob = self.bucket.blob(path) + if blob.exists(): + return self.blob_to_dict(blob) else: return None + + + def Post(self, full_path, path=None): + """Create new blob and upload data to it. 
+ """ + path = path if path else os.path.join('_', os.path.basename(full_path)) + blob = self.bucket.blob(path) + blob.upload_from_filename(full_path) + return self.blob_to_dict(blob) diff --git a/pillar/application/utils/storage.py b/pillar/application/utils/storage.py index e880b335..123f5286 100644 --- a/pillar/application/utils/storage.py +++ b/pillar/application/utils/storage.py @@ -2,6 +2,7 @@ import os import subprocess #import logging from application import app +from application.utils.gcs import GoogleCloudStorageBucket BIN_FFPROBE = app.config['BIN_FFPROBE'] BIN_FFMPEG = app.config['BIN_FFMPEG'] @@ -55,3 +56,27 @@ def remote_storage_sync(path): #can be both folder and file else: raise IOError('ERROR: path not found') + +def push_to_storage(project_id, full_path, backend='cgs'): + """Move a file from temporary/processing local storage to a storage endpoint. + By default we store items in a Google Cloud Storage bucket named after the + project id. + """ + def push_single_file(project_id, full_path, backend): + if backend == 'cgs': + storage = GoogleCloudStorageBucket(project_id, subdir='_') + storage.Post(full_path) + os.remove(full_path) + + if os.path.isfile(full_path): + push_single_file(project_id, full_path, backend) + else: + if os.path.exists(full_path): + for root, dirs, files in os.walk(full_path): + for name in files: + push_single_file(project_id, os.path.join(root, name), backend) + + else: + raise IOError('ERROR: path not found') + + pass