WIP for change file backends
@@ -4,11 +4,11 @@ import mimetypes
 import tempfile
 import uuid
 from hashlib import md5
+import os
+import requests
 import bson.tz_util
 import datetime
 import eve.utils
-import os
 import pymongo
 import werkzeug.exceptions as wz_exceptions
 from bson import ObjectId
@@ -627,7 +627,7 @@ def assert_file_size_allowed(file_size):

 @file_storage.route('/stream/<string:project_id>', methods=['POST', 'OPTIONS'])
 @require_login()
-def stream_to_gcs(project_id):
+def stream_to_storage(project_id):
     project_oid = utils.str2id(project_id)

     projects = current_app.data.driver.db['projects']
@@ -667,7 +667,8 @@ def stream_to_gcs(project_id):

     # Figure out the file size, as we need to pass this in explicitly to GCloud.
     # Otherwise it always uses os.fstat(file_obj.fileno()).st_size, which isn't
-    # supported by a BytesIO object (even though it does have a fileno attribute).
+    # supported by a BytesIO object (even though it does have a fileno
+    # attribute).
     if isinstance(stream_for_gcs, io.BytesIO):
         file_size = len(stream_for_gcs.getvalue())
     else:
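The reflowed comment above describes a real quirk of io.BytesIO: it does expose a fileno() method, but calling it raises io.UnsupportedOperation because there is no OS-level descriptor behind it, so os.fstat() cannot report a size. A minimal sketch of the fallback this route relies on (variable names here are illustrative only):

    import io
    import os

    buf = io.BytesIO(b'example payload')
    try:
        # Real files have a descriptor, so fstat() reports the on-disk size.
        file_size = os.fstat(buf.fileno()).st_size
    except io.UnsupportedOperation:
        # BytesIO's fileno() raises instead of returning a descriptor, so the
        # size has to come from the in-memory buffer itself.
        file_size = len(buf.getvalue())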
@@ -677,41 +678,22 @@ def stream_to_gcs(project_id):
     assert_file_size_allowed(file_size)

     # Create file document in MongoDB.
-    file_id, internal_fname, status = create_file_doc_for_upload(project_oid, uploaded_file)
+    file_id, internal_fname, status = create_file_doc_for_upload(project_oid,
+                                                                 uploaded_file)

     if current_app.config['TESTING']:
-        log.warning('NOT streaming to GCS because TESTING=%r', current_app.config['TESTING'])
+        log.warning('NOT streaming to GCS because TESTING=%r',
+                    current_app.config['TESTING'])
         # Fake a Blob object.
         gcs = None
         blob = type('Blob', (), {'size': file_size})
     else:
-        # Upload the file to GCS.
-        from gcloud.streaming import transfer
-
-        log.debug('Streaming file to GCS bucket; id=%s, fname=%s, size=%i',
-                  file_id, internal_fname, file_size)
-
-        # Files larger than this many bytes will be streamed directly from disk, smaller
-        # ones will be read into memory and then uploaded.
-        transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400
-        try:
-            gcs = GoogleCloudStorageBucket(project_id)
-            blob = gcs.bucket.blob('_/' + internal_fname, chunk_size=256 * 1024 * 2)
-            blob.upload_from_file(stream_for_gcs, size=file_size,
-                                  content_type=uploaded_file.mimetype)
-        except Exception:
-            log.exception('Error uploading file to Google Cloud Storage (GCS),'
-                          ' aborting handling of uploaded file (id=%s).', file_id)
-            update_file_doc(file_id, status='failed')
-            raise wz_exceptions.InternalServerError('Unable to stream file to Google Cloud Storage')
-
-        if stream_for_gcs.closed:
-            log.error('Eek, GCS closed its stream, Andy is not going to like this.')
-
-        # Reload the blob to get the file size according to Google.
-        blob.reload()
-
-    log.debug('Marking uploaded file id=%s, fname=%s, size=%i as "queued_for_processing"',
+        blob, gcs = stream_to_gcs(file_id, file_size, internal_fname,
+                                  project_id, stream_for_gcs,
+                                  uploaded_file.mimetype)
+
+    log.debug('Marking uploaded file id=%s, fname=%s, '
+              'size=%i as "queued_for_processing"',
               file_id, internal_fname, blob.size)
     update_file_doc(file_id,
                     status='queued_for_processing',
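A side note on the TESTING branch kept above: type('Blob', (), {'size': file_size}) builds a new class with size as a class attribute, and it is the class itself, not an instance, that gets bound to blob. That is sufficient here because the rest of the function only reads blob.size. A tiny sketch of the behaviour being relied on:

    file_size = 1024
    blob = type('Blob', (), {'size': file_size})  # a class, not an instance
    assert blob.size == 1024  # class-attribute lookup, no instantiation needed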
@@ -719,7 +701,8 @@ def stream_to_gcs(project_id):
                     length=blob.size,
                     content_type=uploaded_file.mimetype)

-    log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id, internal_fname, blob.size)
+    log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id,
+              internal_fname, blob.size)
     process_file(gcs, file_id, local_file)

     # Local processing is done, we can close the local file so it is removed.
@@ -729,7 +712,8 @@ def stream_to_gcs(project_id):
     log.debug('Handled uploaded file id=%s, fname=%s, size=%i, status=%i',
               file_id, internal_fname, blob.size, status)

-    # Status is 200 if the file already existed, and 201 if it was newly created.
+    # Status is 200 if the file already existed, and 201 if it was newly
+    # created.
     # TODO: add a link to a thumbnail in the response.
     resp = jsonify(status='ok', file_id=str(file_id))
     resp.status_code = status
@@ -737,6 +721,32 @@ def stream_to_gcs(project_id):
     return resp


+def stream_to_gcs(file_id, file_size, internal_fname, project_id,
+                  stream_for_gcs, content_type):
+    # Upload the file to GCS.
+    from gcloud.streaming import transfer
+    log.debug('Streaming file to GCS bucket; id=%s, fname=%s, size=%i',
+              file_id, internal_fname, file_size)
+    # Files larger than this many bytes will be streamed directly from disk,
+    # smaller ones will be read into memory and then uploaded.
+    transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400
+    try:
+        gcs = GoogleCloudStorageBucket(project_id)
+        blob = gcs.bucket.blob('_/' + internal_fname, chunk_size=256 * 1024 * 2)
+        blob.upload_from_file(stream_for_gcs, size=file_size,
+                              content_type=content_type)
+    except Exception:
+        log.exception('Error uploading file to Google Cloud Storage (GCS),'
+                      ' aborting handling of uploaded file (id=%s).', file_id)
+        update_file_doc(file_id, status='failed')
+        raise wz_exceptions.InternalServerError(
+            'Unable to stream file to Google Cloud Storage')
+
+    # Reload the blob to get the file size according to Google.
+    blob.reload()
+    return blob, gcs
+
+
 def add_access_control_headers(resp):
     """Allows cross-site requests from the configured domain."""

@@ -828,6 +838,74 @@ def compute_aggregate_length_items(file_docs):
         compute_aggregate_length(file_doc)


+def change_file_storage_backend(file_id, dest_backend):
+    """Given a file document, move it to the specified backend (if not already
+    there) and update the document to reflect that.
+    Files on the original backend are not deleted automatically.
+    """
+
+    # Fetch file
+    files_collection = current_app.data.driver.db['files']
+    f = files_collection.find_one(ObjectId(file_id))
+    if f is None:
+        log.warning('File with _id: {} not found'.format(file_id))
+        return
+    # Check that the new backend differs from the current one
+    if dest_backend == f['backend']:
+        log.warning('Destination backend ({}) matches the current backend, we '
+                    'are not moving the file'.format(dest_backend))
+        return
+    # TODO Check that the new backend is allowed (make conf var)
+    # Check that the file has a project
+    if 'project' not in f:
+        log.warning('File document does not have a project')
+        return
+    # Upload file and variations to the new backend
+    move_file_to_backend(f, dest_backend)
+    # Update document to reflect the changes
+
+
+def move_file_to_backend(file_doc, dest_backend):
+    # If the file is not local already, fetch it
+    if file_doc['backend'] != 'local':
+        # TODO ensure that file['link'] is up to date
+        local_file = fetch_file_from_link(file_doc['link'])
+
+    # Upload to GCS
+    if dest_backend == 'gcs':
+        # Filenames on GCS do not contain paths, by our convention
+        internal_fname = os.path.basename(file_doc['file_path'])
+        # TODO check for name collisions
+        stream_to_gcs(file_doc['_id'], local_file['file_size'],
+                      internal_fname=internal_fname,
+                      project_id=str(file_doc['project']),
+                      stream_for_gcs=local_file['local_file'],
+                      content_type=local_file['content_type'])
+
+
+def fetch_file_from_link(link):
+    """Utility to download a file from a remote location and return it with
+    additional info (for upload to a different storage backend).
+    """
+    r = requests.get(link, stream=True)
+    # If the file is not found we will use one from the variations. Original
+    # files might not exist because they were too large to keep.
+    if r.status_code == 404:
+        pass
+    local_file = tempfile.NamedTemporaryFile(
+        dir=current_app.config['STORAGE_DIR'])
+    # Write via the file's name; local_file itself is returned for streaming.
+    with open(local_file.name, 'wb') as f:
+        for chunk in r.iter_content(chunk_size=1024):
+            if chunk:
+                f.write(chunk)
+    file_dict = {
+        'file_size': os.fstat(local_file.fileno()).st_size,
+        'content_type': r.headers['content-type'],
+        'local_file': local_file
+    }
+    return file_dict
+
+
 def setup_app(app, url_prefix):
     app.on_pre_GET_files += on_pre_get_files

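The helpers added above all go through current_app, so they can only be called inside the application context of the project's Eve/Flask app. A minimal usage sketch; the app object and the file id below are placeholders, not part of this diff:

    from pillar.api.file_storage import change_file_storage_backend

    # Assumption: app is the project's configured Eve/Flask application;
    # how it is constructed is outside the scope of this commit.
    with app.app_context():
        file_id = '0' * 24  # placeholder: hex ObjectId of the file document
        # Moves the file to the 'gcs' backend; the copy on the original
        # backend is not deleted automatically.
        change_file_storage_backend(file_id, dest_backend='gcs')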
@@ -389,3 +389,14 @@ def check_cdnsun():

     print('Missing main: %i' % missing_main)
     print('Missing vars: %i' % missing_variation)
+
+
+@manager.command
+def file_change_backend(file_id, dest_backend='gcs'):
+    """Given a file document, move it to the specified backend (if not already
+    there) and update the document to reflect that.
+    Files on the original backend are not deleted automatically.
+    """
+
+    from pillar.api.file_storage import change_file_storage_backend
+    change_file_storage_backend(file_id, dest_backend)
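Since the command is registered with the same @manager.command decorator as check_cdnsun() and the other commands in this script, it becomes available on the management CLI. Assuming the usual manage.py entry point (the script name is not shown in this diff), an invocation would look roughly like "python manage.py file_change_backend <file_id>", with dest_backend defaulting to 'gcs'. The actual move is delegated to pillar.api.file_storage.change_file_storage_backend, which is imported inside the command body.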