WIP introducing STORAGE_BACKEND

We introduce two new classes, StorageBackend and FileInStorage, which
are subclassed by the GCS and local Pillar storage implementations.
This makes it easier to support multiple storage solutions.
commit aecab0561e
parent 4570b4637b
Date: 2016-11-07 02:18:23 +01:00

4 changed files with 108 additions and 101 deletions

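For orientation before the per-file diffs: a minimal, runnable sketch of the
dispatch this commit works toward, where a STORAGE_BACKEND config value picks
the storage implementation at upload time. The stub classes and the upload()
helper are illustrative stand-ins for the real GoogleCloudStorageBucket/
GoogleCloudStorageBlob and PillarStorage/PillarStorageFile classes shown in
the diffs below; they are not part of the commit itself.

    class StorageBackend(object):
        """A storage location: a GCS bucket or a local project folder."""

        def __init__(self, backend):
            self.backend = backend  # name of the backend, e.g. 'gcs' or 'local'


    class FileInStorage(object):
        """A single file (or blob) living inside a StorageBackend."""

        def __init__(self, backend):
            self.backend = backend
            self.path = None
            self.size = None


    class LocalBackend(StorageBackend):  # stand-in for PillarStorage
        def __init__(self):
            super(LocalBackend, self).__init__(backend='local')


    class LocalFile(FileInStorage):  # stand-in for PillarStorageFile
        def __init__(self, fname):
            super(LocalFile, self).__init__(backend='local')
            self.path = fname


    def upload(config, fname):
        # Same shape as the new if/elif chain in stream_to_storage() below.
        if config['STORAGE_BACKEND'] == 'local':
            return LocalBackend(), LocalFile(fname)
        raise NotImplementedError('gcs would call stream_to_gcs()')


    backend, f = upload({'STORAGE_BACKEND': 'local'}, 'a1/a1b2c3.jpg')
    print('%s %s' % (backend.backend, f.path))  # prints: local a1/a1b2c3.jpg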
pillar/api/file_storage.py

@@ -26,7 +26,9 @@ from pillar.api.utils.authorization import require_login, user_has_role, \
     user_matches_roles
 from pillar.api.utils.cdn import hash_file_path
 from pillar.api.utils.encoding import Encoder
-from pillar.api.utils.gcs import GoogleCloudStorageBucket
+from pillar.api.utils.gcs import GoogleCloudStorageBucket, \
+    GoogleCloudStorageBlob
+from pillar.api.utils.storage import PillarStorage, PillarStorageFile


 log = logging.getLogger(__name__)
@@ -307,14 +309,17 @@ def generate_link(backend, file_path, project_id=None, is_public=False):
         storage = GoogleCloudStorageBucket(project_id)
         blob = storage.Get(file_path)
         if blob is None:
-            log.warning('generate_link(%r, %r): unable to find blob for file path,'
-                        ' returning empty link.', backend, file_path)
+            log.warning('generate_link(%r, %r): unable to find blob for file'
+                        ' path, returning empty link.', backend, file_path)
             return ''

         if is_public:
             return blob['public_url']
         return blob['signed_url']

+    if backend == 'local':
+        f = PillarStorageFile(project_id, file_path)
+        return url_for('file_storage.index', file_name=f.partial_path,
+                       _external=True, _scheme=current_app.config['SCHEME'])
     if backend == 'pillar':
         return url_for('file_storage.index', file_name=file_path,
                        _external=True, _scheme=current_app.config['SCHEME'])
@@ -323,7 +328,8 @@ def generate_link(backend, file_path, project_id=None, is_public=False):
     if backend == 'unittest':
         return 'https://unit.test/%s' % md5(file_path).hexdigest()

-    log.warning('generate_link(): Unknown backend %r, returning empty string as new link.',
-                backend)
+    log.warning('generate_link(): Unknown backend %r, returning empty string '
+                'as new link.',
+                backend)
     return ''
@@ -641,10 +647,10 @@ def stream_to_storage(project_id):
     log.info('request.headers[Origin] = %r', request.headers.get('Origin'))
     log.info('request.content_length = %r', request.content_length)

-    # Try a check for the content length before we access request.files[]. This allows us
-    # to abort the upload early. The entire body content length is always a bit larger than
-    # the actual file size, so if we accept here, we're sure it'll be accepted in subsequent
-    # checks as well.
+    # Try a check for the content length before we access request.files[].
+    # This allows us to abort the upload early. The entire body content length
+    # is always a bit larger than the actual file size, so if we accept here,
+    # we're sure it'll be accepted in subsequent checks as well.
     if request.content_length:
         assert_file_size_allowed(request.content_length)
@@ -659,15 +665,17 @@ def stream_to_storage(project_id):
     override_content_type(uploaded_file)
     if not uploaded_file.content_type:
-        log.warning('File uploaded to project %s without content type.', project_oid)
+        log.warning('File uploaded to project %s without content type.',
+                    project_oid)
         raise wz_exceptions.BadRequest('Missing content type.')

     if uploaded_file.content_type.startswith('image/'):
         # We need to do local thumbnailing, so we have to write the stream
         # both to Google Cloud Storage and to local storage.
-        local_file = tempfile.NamedTemporaryFile(dir=current_app.config['STORAGE_DIR'])
+        local_file = tempfile.NamedTemporaryFile(
+            dir=current_app.config['STORAGE_DIR'])
         uploaded_file.save(local_file)
-        local_file.seek(0)  # Make sure that a re-read starts from the beginning.
+        local_file.seek(0)  # Make sure that re-read starts from the beginning.
         stream_for_gcs = local_file
     else:
         local_file = None
@@ -688,37 +696,43 @@ def stream_to_storage(project_id):
     # Create file document in MongoDB.
     file_id, internal_fname, status = create_file_doc_for_upload(project_oid,
                                                                  uploaded_file)
+    storage_backend = None
+    file_in_storage = None

     if current_app.config['TESTING']:
         log.warning('NOT streaming to GCS because TESTING=%r',
                     current_app.config['TESTING'])
         # Fake a Blob object.
-        gcs = None
-        blob = type('Blob', (), {'size': file_size})
+        file_in_storage = type('Blob', (), {'size': file_size})
     else:
-        blob, gcs = stream_to_gcs(file_id, file_size, internal_fname,
-                                  project_id, stream_for_gcs,
-                                  uploaded_file.mimetype)
+        if current_app.config['STORAGE_BACKEND'] == 'gcs':
+            file_in_storage, storage_backend = stream_to_gcs(
+                file_id, file_size, internal_fname, project_id, stream_for_gcs,
+                uploaded_file.mimetype)
+        elif current_app.config['STORAGE_BACKEND'] == 'local':
+            storage_backend = PillarStorage(project_id)
+            file_in_storage = PillarStorageFile(project_id, internal_fname)
+            file_in_storage.create_from_file(uploaded_file, file_size)

     log.debug('Marking uploaded file id=%s, fname=%s, '
               'size=%i as "queued_for_processing"',
-              file_id, internal_fname, blob.size)
+              file_id, internal_fname, file_size)
     update_file_doc(file_id,
                     status='queued_for_processing',
                     file_path=internal_fname,
-                    length=blob.size,
+                    length=file_in_storage.size,
                     content_type=uploaded_file.mimetype)

     log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id,
-              internal_fname, blob.size)
-    process_file(gcs, file_id, local_file)
+              internal_fname, file_in_storage.size)
+    process_file(storage_backend, file_id, local_file)

     # Local processing is done, we can close the local file so it is removed.
     if local_file is not None:
         local_file.close()

     log.debug('Handled uploaded file id=%s, fname=%s, size=%i, status=%i',
-              file_id, internal_fname, blob.size, status)
+              file_id, internal_fname, file_in_storage.size, status)

     # Status is 200 if the file already existed, and 201 if it was newly
     # created.
@@ -740,9 +754,9 @@ def stream_to_gcs(file_id, file_size, internal_fname, project_id,
     transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400
     try:
         gcs = GoogleCloudStorageBucket(project_id)
-        blob = gcs.bucket.blob('_/' + internal_fname, chunk_size=256 * 1024 * 2)
-        blob.upload_from_file(stream_for_gcs, size=file_size,
-                              content_type=content_type)
+        file_in_storage = GoogleCloudStorageBlob(gcs, internal_fname)
+        file_in_storage.blob.upload_from_file(stream_for_gcs, size=file_size,
+                                              content_type=content_type)
     except Exception:
         log.exception('Error uploading file to Google Cloud Storage (GCS),'
                       ' aborting handling of uploaded file (id=%s).', file_id)
@@ -751,8 +765,8 @@ def stream_to_gcs(file_id, file_size, internal_fname, project_id,
                       'Unable to stream file to Google Cloud Storage')

     # Reload the blob to get the file size according to Google.
-    blob.reload()
-    return blob, gcs
+    file_in_storage.blob.reload()
+    return file_in_storage, gcs


 def add_access_control_headers(resp):

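A small aside on the TESTING branch above: type('Blob', (), {'size': file_size})
builds a throwaway class on the fly, and since size is a class attribute, the
later file_in_storage.size lookups work without ever instantiating it. A quick
demonstration of the idiom:

    file_size = 1234

    # type(name, bases, namespace) builds a new class; 'size' becomes a
    # class attribute, so it is readable on the class itself.
    FakeBlob = type('Blob', (), {'size': file_size})

    print(FakeBlob.size)    # 1234 - what the code above relies on
    print(FakeBlob().size)  # 1234 - instances see it too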
pillar/api/utils/gcs.py

@@ -9,6 +9,8 @@ from gcloud.exceptions import NotFound
 from flask import current_app, g
 from werkzeug.local import LocalProxy

+from pillar.api.utils.storage import StorageBackend, FileInStorage
+
 log = logging.getLogger(__name__)
@@ -32,7 +34,7 @@ def get_client():
 gcs = LocalProxy(get_client)


-class GoogleCloudStorageBucket(object):
+class GoogleCloudStorageBucket(StorageBackend):
     """Cloud Storage bucket interface. We create a bucket for every project. In
     the bucket we create first level subdirs as follows:
     - '_' (will contain hashed assets, and stays on top of default listing)
@@ -49,6 +51,7 @@ class GoogleCloudStorageBucket(object):
     """

     def __init__(self, bucket_name, subdir='_/'):
+        super(GoogleCloudStorageBucket, self).__init__(backend='gcs')
         try:
             self.bucket = gcs.get_bucket(bucket_name)
         except NotFound:
@@ -178,6 +181,21 @@ class GoogleCloudStorageBucket(object):
         assert isinstance(to_bucket, GoogleCloudStorageBucket)
         return self.bucket.copy_blob(blob, to_bucket.bucket)

+    def get_blob(self, internal_fname, chunk_size=256 * 1024 * 2):
+        return self.bucket.blob('_/' + internal_fname, chunk_size=chunk_size)
+
+
+class GoogleCloudStorageBlob(FileInStorage):
+    """GCS blob interface."""
+
+    def __init__(self, bucket, internal_fname):
+        super(GoogleCloudStorageBlob, self).__init__(backend='gcs')
+
+        # bucket is a GoogleCloudStorageBucket, so go through its get_blob().
+        self.blob = bucket.get_blob(internal_fname)
+        self.size = self.get_size()
+
+    def get_size(self):
+        return self.blob.size
+

 def update_file_name(node):
     """Assign to the GCS blob the same name of the asset node. This way when

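One constant worth decoding: the chunk_size=256 * 1024 * 2 used for blobs is
the resumable-upload chunk size, and gcloud requires it to be a multiple of
256 KiB, so the code uses exactly two such units (512 KiB). A trivial sanity
check of that arithmetic:

    CHUNK_SIZE = 256 * 1024 * 2  # as in get_blob() and GoogleCloudStorageBlob

    # gcloud rejects resumable-upload chunk sizes that are not a multiple
    # of 256 KiB; 256 * 1024 * 2 is exactly two such units.
    assert CHUNK_SIZE % (256 * 1024) == 0
    print('%d KiB per chunk' % (CHUNK_SIZE // 1024))  # 512 KiB per chunk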
pillar/api/utils/storage.py

@@ -1,83 +1,56 @@
-import subprocess
-import logging
+"""Utility for managing storage backends and files."""
+
 import os

 from flask import current_app

-from pillar.api.utils.gcs import GoogleCloudStorageBucket
-
-log = logging.getLogger(__name__)
-
-
-def get_sizedata(filepath):
-    outdata = dict(
-        size=int(os.stat(filepath).st_size)
-    )
-    return outdata
-
-
-def rsync(path, remote_dir=''):
-    BIN_SSH = current_app.config['BIN_SSH']
-    BIN_RSYNC = current_app.config['BIN_RSYNC']
-
-    DRY_RUN = False
-    arguments = ['--verbose', '--ignore-existing', '--recursive', '--human-readable']
-    logs_path = current_app.config['CDN_SYNC_LOGS']
-    storage_address = current_app.config['CDN_STORAGE_ADDRESS']
-    user = current_app.config['CDN_STORAGE_USER']
-    rsa_key_path = current_app.config['CDN_RSA_KEY']
-    known_hosts_path = current_app.config['CDN_KNOWN_HOSTS']
-
-    if DRY_RUN:
-        arguments.append('--dry-run')
-    folder_arguments = list(arguments)
-    if rsa_key_path:
-        folder_arguments.append(
-            '-e ' + BIN_SSH + ' -i ' + rsa_key_path + ' -o "StrictHostKeyChecking=no"')
-    # if known_hosts_path:
-    #     folder_arguments.append("-o UserKnownHostsFile " + known_hosts_path)
-    folder_arguments.append("--log-file=" + logs_path + "/rsync.log")
-    folder_arguments.append(path)
-    folder_arguments.append(user + "@" + storage_address + ":/public/" + remote_dir)
-    # print (folder_arguments)
-    devnull = open(os.devnull, 'wb')
-    # DEBUG CONFIG
-    # print folder_arguments
-    # proc = subprocess.Popen(['rsync'] + folder_arguments)
-    # stdout, stderr = proc.communicate()
-    subprocess.Popen(['nohup', BIN_RSYNC] + folder_arguments, stdout=devnull, stderr=devnull)
-
-
-def remote_storage_sync(path):  # can be both folder and file
-    if os.path.isfile(path):
-        filename = os.path.split(path)[1]
-        rsync(path, filename[:2] + '/')
-    else:
-        if os.path.exists(path):
-            rsync(path)
-        else:
-            raise IOError('ERROR: path not found')
-
-
-def push_to_storage(project_id, full_path, backend='cgs'):
-    """Move a file from temporary/processing local storage to a storage endpoint.
-    By default we store items in a Google Cloud Storage bucket named after the
-    project id.
+
+class StorageBackend(object):
+    """Can be a GCS bucket or simply a project folder in Pillar
+
+    :type backend: string
+    :param backend: Name of the storage backend (gcs, pillar, cdnsun).
+
     """

-    def push_single_file(project_id, full_path, backend):
-        if backend == 'cgs':
-            storage = GoogleCloudStorageBucket(project_id, subdir='_')
-            blob = storage.Post(full_path)
-            # XXX Make public on the fly if it's an image and small preview.
-            # This should happen by reading the database (push to storage
-            # should change to accomodate it).
-            if blob is not None and full_path.endswith('-t.jpg'):
-                blob.make_public()
-            os.remove(full_path)
+    def __init__(self, backend):
+        self.backend = backend
+
+
+class FileInStorage(object):
+    """A wrapper for file or blob objects.
+
+    :type backend: string
+    :param backend: Name of the storage backend (gcs, pillar, cdnsun).
+
+    """
+
+    def __init__(self, backend):
+        self.backend = backend
+        self.path = None
+        self.size = None
+
+
+class PillarStorage(StorageBackend):
+    def __init__(self, project_id):
+        super(PillarStorage, self).__init__(backend='local')
+
+
+class PillarStorageFile(FileInStorage):
+    def __init__(self, project_id, internal_fname):
+        super(PillarStorageFile, self).__init__(backend='local')
+
+        self.size = None
+        self.partial_path = os.path.join(project_id[:2], project_id,
+                                         internal_fname[:2], internal_fname)
+        self.path = os.path.join(
+            current_app.config['STORAGE_DIR'], self.partial_path)

-    if os.path.isfile(full_path):
-        push_single_file(project_id, full_path, backend)
-    else:
-        if os.path.exists(full_path):
-            for root, dirs, files in os.walk(full_path):
-                for name in files:
-                    push_single_file(project_id, os.path.join(root, name), backend)
-        else:
-            raise IOError('ERROR: path not found')
+    def create_from_file(self, uploaded_file, file_size):
+        # Make sure the destination directory exists before saving.
+        dir_path = os.path.dirname(self.path)
+        if not os.path.exists(dir_path):
+            os.makedirs(dir_path)
+
+        uploaded_file.save(self.path)
+        self.size = file_size

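The two-level sharding in PillarStorageFile.partial_path (first two characters
of the project ID, then of the internal file name) keeps any single directory
from accumulating too many entries. A quick illustration, using made-up IDs
(POSIX path separators shown):

    import os

    project_id = '572761099837730efe8e120d'  # made-up example project ObjectId
    internal_fname = 'a1b2c3d4e5.jpg'        # made-up example internal file name

    partial_path = os.path.join(project_id[:2], project_id,
                                internal_fname[:2], internal_fname)
    print(partial_path)
    # 57/572761099837730efe8e120d/a1/a1b2c3d4e5.jpg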
pillar/config.py

@@ -71,6 +71,8 @@ ZENCODER_NOTIFICATIONS_URL = 'http://zencoderfetcher/'
 ENCODING_BACKEND = 'zencoder'  # local, flamenco

+STORAGE_BACKEND = 'local'  # gcs
+
 # Validity period of links, per file storage backend. Expressed in seconds.
 # Shouldn't be more than a year, as this isn't supported by HTTP/1.1.
 FILE_LINK_VALIDITY = defaultdict(
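Because FILE_LINK_VALIDITY is a defaultdict, the new 'local' backend needs no
extra entry: unknown backend names fall through to the default factory. A
sketch of the pattern, with invented numbers rather than Pillar's actual
defaults:

    from collections import defaultdict

    # Invented values, for illustration only.
    FILE_LINK_VALIDITY = defaultdict(
        lambda: 3600 * 24 * 30,  # fallback validity: 30 days
        gcs=3600 * 23,           # shorter validity for GCS signed URLs
    )

    print(FILE_LINK_VALIDITY['gcs'])    # 82800
    print(FILE_LINK_VALIDITY['local'])  # 2592000, via the default factory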