diff --git a/pillar/api/file_storage.py b/pillar/api/file_storage.py
index 0a4f6ca3..80a34ad3 100644
--- a/pillar/api/file_storage.py
+++ b/pillar/api/file_storage.py
@@ -4,11 +4,11 @@ import mimetypes
 import tempfile
 import uuid
 from hashlib import md5
-
+import os
+import requests
 import bson.tz_util
 import datetime
 import eve.utils
-import os
 import pymongo
 import werkzeug.exceptions as wz_exceptions
 from bson import ObjectId
@@ -627,7 +627,7 @@ def assert_file_size_allowed(file_size):
 
 @file_storage.route('/stream/<string:project_id>', methods=['POST', 'OPTIONS'])
 @require_login()
-def stream_to_gcs(project_id):
+def stream_to_storage(project_id):
     project_oid = utils.str2id(project_id)
 
     projects = current_app.data.driver.db['projects']
@@ -667,7 +667,8 @@ def stream_to_gcs(project_id):
     # Figure out the file size, as we need to pass this in explicitly to GCloud.
     # Otherwise it always uses os.fstat(file_obj.fileno()).st_size, which isn't
-    # supported by a BytesIO object (even though it does have a fileno attribute).
+    # supported by a BytesIO object (even though it does have a fileno
+    # attribute).
     if isinstance(stream_for_gcs, io.BytesIO):
         file_size = len(stream_for_gcs.getvalue())
     else:
@@ -677,41 +678,22 @@ def stream_to_gcs(project_id):
     assert_file_size_allowed(file_size)
 
     # Create file document in MongoDB.
-    file_id, internal_fname, status = create_file_doc_for_upload(project_oid, uploaded_file)
+    file_id, internal_fname, status = create_file_doc_for_upload(project_oid,
+                                                                 uploaded_file)
 
     if current_app.config['TESTING']:
-        log.warning('NOT streaming to GCS because TESTING=%r', current_app.config['TESTING'])
+        log.warning('NOT streaming to GCS because TESTING=%r',
+                    current_app.config['TESTING'])
         # Fake a Blob object.
         gcs = None
         blob = type('Blob', (), {'size': file_size})
     else:
-        # Upload the file to GCS.
-        from gcloud.streaming import transfer
+        blob, gcs = stream_to_gcs(file_id, file_size, internal_fname,
+                                  project_id, stream_for_gcs,
+                                  uploaded_file.mimetype)
-        log.debug('Streaming file to GCS bucket; id=%s, fname=%s, size=%i',
-                  file_id, internal_fname, file_size)
-
-        # Files larger than this many bytes will be streamed directly from disk, smaller
-        # ones will be read into memory and then uploaded.
-        transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400
-        try:
-            gcs = GoogleCloudStorageBucket(project_id)
-            blob = gcs.bucket.blob('_/' + internal_fname, chunk_size=256 * 1024 * 2)
-            blob.upload_from_file(stream_for_gcs, size=file_size,
-                                  content_type=uploaded_file.mimetype)
-        except Exception:
-            log.exception('Error uploading file to Google Cloud Storage (GCS),'
-                          ' aborting handling of uploaded file (id=%s).', file_id)
-            update_file_doc(file_id, status='failed')
-            raise wz_exceptions.InternalServerError('Unable to stream file to Google Cloud Storage')
-
-        if stream_for_gcs.closed:
-            log.error('Eek, GCS closed its stream, Andy is not going to like this.')
-
-        # Reload the blob to get the file size according to Google.
-        blob.reload()
-
-    log.debug('Marking uploaded file id=%s, fname=%s, size=%i as "queued_for_processing"',
+    log.debug('Marking uploaded file id=%s, fname=%s, '
+              'size=%i as "queued_for_processing"',
               file_id, internal_fname, blob.size)
     update_file_doc(file_id, status='queued_for_processing',
@@ -719,7 +701,8 @@ def stream_to_gcs(project_id):
                     length=blob.size, content_type=uploaded_file.mimetype)
 
-    log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id, internal_fname, blob.size)
+    log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id,
+              internal_fname, blob.size)
     process_file(gcs, file_id, local_file)
 
     # Local processing is done, we can close the local file so it is removed.
@@ -729,7 +712,8 @@ def stream_to_gcs(project_id):
     log.debug('Handled uploaded file id=%s, fname=%s, size=%i, status=%i',
               file_id, internal_fname, blob.size, status)
 
-    # Status is 200 if the file already existed, and 201 if it was newly created.
+    # Status is 200 if the file already existed, and 201 if it was newly
+    # created.
     # TODO: add a link to a thumbnail in the response.
     resp = jsonify(status='ok', file_id=str(file_id))
     resp.status_code = status
@@ -737,6 +721,32 @@ def stream_to_gcs(project_id):
     return resp
 
 
+def stream_to_gcs(file_id, file_size, internal_fname, project_id,
+                  stream_for_gcs, content_type):
+    # Upload the file to GCS.
+    from gcloud.streaming import transfer
+    log.debug('Streaming file to GCS bucket; id=%s, fname=%s, size=%i',
+              file_id, internal_fname, file_size)
+    # Files larger than this many bytes will be streamed directly from disk,
+    # smaller ones will be read into memory and then uploaded.
+    transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400
+    try:
+        gcs = GoogleCloudStorageBucket(project_id)
+        blob = gcs.bucket.blob('_/' + internal_fname, chunk_size=256 * 1024 * 2)
+        blob.upload_from_file(stream_for_gcs, size=file_size,
+                              content_type=content_type)
+    except Exception:
+        log.exception('Error uploading file to Google Cloud Storage (GCS),'
+                      ' aborting handling of uploaded file (id=%s).', file_id)
+        update_file_doc(file_id, status='failed')
+        raise wz_exceptions.InternalServerError(
+            'Unable to stream file to Google Cloud Storage')
+
+    # Reload the blob to get the file size according to Google.
+    blob.reload()
+    return blob, gcs
+
+
 def add_access_control_headers(resp):
     """Allows cross-site requests from the configured domain."""
@@ -828,6 +838,74 @@ def compute_aggregate_length_items(file_docs):
         compute_aggregate_length(file_doc)
 
 
+def change_file_storage_backend(file_id, dest_backend):
+    """Given a file document, move it to the specified backend (if not already
+    there) and update the document to reflect that.
+    Files on the original backend are not deleted automatically.
+    """
+
+    # Fetch file
+    files_collection = current_app.data.driver.db['files']
+    f = files_collection.find_one(ObjectId(file_id))
+    if f is None:
+        log.warning('File with _id: {} not found'.format(file_id))
+        return
+    # Check that new backend differs from current one
+    if dest_backend == f['backend']:
+        log.warning('Destination backend ({}) matches the current backend, we '
+                    'are not moving the file'.format(dest_backend))
+        return
+    # TODO Check that new backend is allowed (make conf var)
+    # Check that the file has a project
+    if 'project' not in f:
+        log.warning('File document does not have a project')
+        return
+    # Upload file and variations to the new backend
+    move_file_to_backend(f, dest_backend)
+    # Update document to reflect the changes
+
+
+def move_file_to_backend(file_doc, dest_backend):
+    # If the file is not local already, fetch it
+    if file_doc['backend'] != 'local':
+        # TODO ensure that file['link'] is up to date
+        local_file = fetch_file_from_link(file_doc['link'])
+    else:
+        # TODO support files that are already on the local backend; bail out
+        # for now so we do not use an undefined local_file below.
+        log.warning('Moving files from the local backend is not supported yet')
+        return
+
+    # Upload to GCS
+    if dest_backend == 'gcs':
+        # Filenames on GCS do not contain paths, by our convention
+        internal_fname = os.path.basename(file_doc['file_path'])
+        # TODO check for name collisions
+        stream_to_gcs(file_doc['_id'], local_file['file_size'],
+                      internal_fname=internal_fname,
+                      project_id=str(file_doc['project']),
+                      stream_for_gcs=local_file['local_file'],
+                      content_type=local_file['content_type'])
+
+
+def fetch_file_from_link(link):
+    """Utility to download a file from a remote location and return it with
+    additional info (for upload to a different storage backend).
+    """
+    r = requests.get(link, stream=True)
+    # If the file is not found we will use one from the variations. Original
+    # files might not exist because they were too large to keep.
+    if r.status_code == 404:
+        pass
+    local_file = tempfile.NamedTemporaryFile(
+        dir=current_app.config['STORAGE_DIR'])
+    # The temporary file is already open for writing; stream the download into
+    # it directly, then rewind so it can be re-uploaded to the new backend.
+    for chunk in r.iter_content(chunk_size=1024):
+        if chunk:
+            local_file.write(chunk)
+    local_file.seek(0)
+    file_dict = {
+        'file_size': os.fstat(local_file.fileno()).st_size,
+        'content_type': r.headers['content-type'],
+        'local_file': local_file
+    }
+    return file_dict
+
+
 def setup_app(app, url_prefix):
     app.on_pre_GET_files += on_pre_get_files
diff --git a/pillar/cli.py b/pillar/cli.py
index 71b35aba..c36e6849 100644
--- a/pillar/cli.py
+++ b/pillar/cli.py
@@ -389,3 +389,14 @@ def check_cdnsun():
 
     print('Missing main: %i' % missing_main)
     print('Missing vars: %i' % missing_variation)
+
+
+@manager.command
+def file_change_backend(file_id, dest_backend='gcs'):
+    """Given a file document, move it to the specified backend (if not already
+    there) and update the document to reflect that.
+    Files on the original backend are not deleted automatically.
+    """
+
+    from pillar.api.file_storage import change_file_storage_backend
+    change_file_storage_backend(file_id, dest_backend)
\ No newline at end of file
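
Review note: a minimal usage sketch for the new command. The manage.py entry-point name, the option spelling Flask-Script generates for the dest_backend argument, and the helper below are assumptions for illustration, not part of this patch.

    # Hypothetical CLI invocation (entry-point name assumed):
    #   python manage.py file_change_backend <file_id> --dest_backend gcs
    #
    # Equivalent call from a Python shell; change_file_storage_backend uses
    # current_app, so it must run inside an application context.
    from pillar.api.file_storage import change_file_storage_backend

    def move_one_file(app, file_id, backend='gcs'):
        # 'app' is the configured Pillar/Flask application object.
        with app.app_context():
            change_file_storage_backend(file_id, backend)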