Support for file upload to Google Cloud Storage
All other backends remain available and are still fully compatible with the File storage infrastructure.
This commit is contained in:
@@ -73,10 +73,10 @@ def validate_token():
|
|||||||
current_user = {}
|
current_user = {}
|
||||||
|
|
||||||
token = request.authorization.username
|
token = request.authorization.username
|
||||||
tokens = app.data.driver.db['tokens']
|
tokens_collection = app.data.driver.db['tokens']
|
||||||
|
|
||||||
lookup = {'token': token, 'expire_time': {"$gt": datetime.now()}}
|
lookup = {'token': token, 'expire_time': {"$gt": datetime.now()}}
|
||||||
db_token = tokens.find_one(lookup)
|
db_token = tokens_collection.find_one(lookup)
|
||||||
if not db_token:
|
if not db_token:
|
||||||
# If no valid token is found, we issue a new request to the Blender ID
|
# If no valid token is found, we issue a new request to the Blender ID
|
||||||
# to verify the validity of the token. We will get basic user info if
|
# to verify the validity of the token. We will get basic user info if
|
||||||
@@ -99,7 +99,6 @@ def validate_token():
|
|||||||
'token': ''}]
|
'token': ''}]
|
||||||
}
|
}
|
||||||
r = post_internal('users', user_data)
|
r = post_internal('users', user_data)
|
||||||
print r
|
|
||||||
user_id = r[0]['_id']
|
user_id = r[0]['_id']
|
||||||
groups = None
|
groups = None
|
||||||
else:
|
else:
|
||||||
@@ -345,10 +344,15 @@ def post_POST_files(request, payload):
|
|||||||
app.on_post_POST_files += post_POST_files
|
app.on_post_POST_files += post_POST_files
|
||||||
|
|
||||||
from utils.cdn import hash_file_path
|
from utils.cdn import hash_file_path
|
||||||
|
from application.utils.gcs import GoogleCloudStorageBucket
|
||||||
# Hook to check the backend of a file resource, to build an appropriate link
|
# Hook to check the backend of a file resource, to build an appropriate link
|
||||||
# that can be used by the client to retrieve the actual file.
|
# that can be used by the client to retrieve the actual file.
|
||||||
def generate_link(backend, path):
|
def generate_link(backend, path, project_id=None):
|
||||||
if backend == 'pillar':
|
if backend == 'gcs':
|
||||||
|
storage = GoogleCloudStorageBucket(project_id)
|
||||||
|
blob = storage.Get(path)
|
||||||
|
link = blob['signed_url']
|
||||||
|
elif backend == 'pillar':
|
||||||
link = url_for('file_storage.index', file_name=path, _external=True)
|
link = url_for('file_storage.index', file_name=path, _external=True)
|
||||||
elif backend == 'cdnsun':
|
elif backend == 'cdnsun':
|
||||||
link = hash_file_path(path, None)
|
link = hash_file_path(path, None)
|
||||||
@@ -357,11 +361,15 @@ def generate_link(backend, path):
|
|||||||
return link
|
return link
|
||||||
|
|
||||||
def before_returning_file(response):
|
def before_returning_file(response):
|
||||||
response['link'] = generate_link(response['backend'], response['path'])
|
# TODO: add project id to all files
|
||||||
|
project_id = None if 'project' not in response else str(response['project'])
|
||||||
|
response['link'] = generate_link(response['backend'], response['path'], project_id)
|
||||||
|
|
||||||
def before_returning_files(response):
|
def before_returning_files(response):
|
||||||
for item in response['_items']:
|
for item in response['_items']:
|
||||||
item['link'] = generate_link(item['backend'], item['path'])
|
# TODO: add project id to all files
|
||||||
|
project_id = None if 'project' not in item else str(item['project'])
|
||||||
|
item['link'] = generate_link(item['backend'], item['path'], project_id)
|
||||||
|
|
||||||
|
|
||||||
app.on_fetched_item_files += before_returning_file
|
app.on_fetched_item_files += before_returning_file
|
||||||
|
@@ -13,6 +13,7 @@ from application.utils.imaging import generate_local_thumbnails
|
|||||||
from application.utils.imaging import get_video_data
|
from application.utils.imaging import get_video_data
|
||||||
from application.utils.imaging import ffmpeg_encode
|
from application.utils.imaging import ffmpeg_encode
|
||||||
from application.utils.storage import remote_storage_sync
|
from application.utils.storage import remote_storage_sync
|
||||||
|
from application.utils.storage import push_to_storage
|
||||||
from application.utils.gcs import GoogleCloudStorageBucket
|
from application.utils.gcs import GoogleCloudStorageBucket
|
||||||
|
|
||||||
file_storage = Blueprint('file_storage', __name__,
|
file_storage = Blueprint('file_storage', __name__,
|
||||||
@@ -45,7 +46,7 @@ def browse_gcs(bucket_name, subdir, file_path=None):
|
|||||||
return jsonify(listing)
|
return jsonify(listing)
|
||||||
|
|
||||||
|
|
||||||
@file_storage.route('/build_thumbnails/<path:file_path>')
|
#@file_storage.route('/build_thumbnails/<path:file_path>')
|
||||||
def build_thumbnails(file_path=None, file_id=None):
|
def build_thumbnails(file_path=None, file_id=None):
|
||||||
files_collection = app.data.driver.db['files']
|
files_collection = app.data.driver.db['files']
|
||||||
if file_path:
|
if file_path:
|
||||||
@@ -59,7 +60,7 @@ def build_thumbnails(file_path=None, file_id=None):
|
|||||||
|
|
||||||
user = file_['user']
|
user = file_['user']
|
||||||
|
|
||||||
file_full_path = os.path.join(app.config['STORAGE_DIR'], file_path)
|
file_full_path = os.path.join(app.config['SHARED_DIR'], file_path[:2], file_path)
|
||||||
# Does the original file exist?
|
# Does the original file exist?
|
||||||
if not os.path.isfile(file_full_path):
|
if not os.path.isfile(file_full_path):
|
||||||
return "", 404
|
return "", 404
|
||||||
@@ -73,7 +74,6 @@ def build_thumbnails(file_path=None, file_id=None):
|
|||||||
continue
|
continue
|
||||||
basename = os.path.basename(thumbnail['path'])
|
basename = os.path.basename(thumbnail['path'])
|
||||||
root, ext = os.path.splitext(basename)
|
root, ext = os.path.splitext(basename)
|
||||||
path = os.path.join(basename[:2], basename)
|
|
||||||
file_object = dict(
|
file_object = dict(
|
||||||
name=root,
|
name=root,
|
||||||
#description="Preview of file {0}".format(file_['name']),
|
#description="Preview of file {0}".format(file_['name']),
|
||||||
@@ -88,7 +88,8 @@ def build_thumbnails(file_path=None, file_id=None):
|
|||||||
md5=thumbnail['md5'],
|
md5=thumbnail['md5'],
|
||||||
filename=basename,
|
filename=basename,
|
||||||
backend=file_['backend'],
|
backend=file_['backend'],
|
||||||
path=path)
|
path=basename,
|
||||||
|
project=file_['project'])
|
||||||
# Commit to database
|
# Commit to database
|
||||||
r = post_item('files', file_object)
|
r = post_item('files', file_object)
|
||||||
if r[0]['_status'] == 'ERR':
|
if r[0]['_status'] == 'ERR':
|
||||||
@@ -122,7 +123,7 @@ def process_file(src_file):
|
|||||||
|
|
||||||
files_collection = app.data.driver.db['files']
|
files_collection = app.data.driver.db['files']
|
||||||
|
|
||||||
file_abs_path = os.path.join(app.config['SHARED_DIR'], src_file['name'])
|
file_abs_path = os.path.join(app.config['SHARED_DIR'], src_file['name'][:2], src_file['name'])
|
||||||
src_file['length'] = os.stat(file_abs_path).st_size
|
src_file['length'] = os.stat(file_abs_path).st_size
|
||||||
# Remove properties that do not belong in the collection
|
# Remove properties that do not belong in the collection
|
||||||
src_file.pop('_status', None)
|
src_file.pop('_status', None)
|
||||||
@@ -169,6 +170,7 @@ def process_file(src_file):
|
|||||||
file_object = dict(
|
file_object = dict(
|
||||||
name=os.path.split(filename)[1],
|
name=os.path.split(filename)[1],
|
||||||
#description="Preview of file {0}".format(file_['name']),
|
#description="Preview of file {0}".format(file_['name']),
|
||||||
|
project=src_file['project'],
|
||||||
user=src_file['user'],
|
user=src_file['user'],
|
||||||
parent=src_file['_id'],
|
parent=src_file['_id'],
|
||||||
size="{0}p".format(res_y),
|
size="{0}p".format(res_y),
|
||||||
@@ -204,7 +206,7 @@ def process_file(src_file):
|
|||||||
variation)
|
variation)
|
||||||
|
|
||||||
# rsync the file file (this is async)
|
# rsync the file file (this is async)
|
||||||
remote_storage_sync(path)
|
#remote_storage_sync(path)
|
||||||
# When all encodes are done, delete source file
|
# When all encodes are done, delete source file
|
||||||
|
|
||||||
|
|
||||||
@@ -215,8 +217,10 @@ def process_file(src_file):
|
|||||||
sync_path = os.path.split(file_abs_path)[0]
|
sync_path = os.path.split(file_abs_path)[0]
|
||||||
else:
|
else:
|
||||||
sync_path = file_abs_path
|
sync_path = file_abs_path
|
||||||
remote_storage_sync(sync_path)
|
#remote_storage_sync(sync_path)
|
||||||
|
push_to_storage(str(src_file['project']), sync_path)
|
||||||
|
|
||||||
|
# Update the original file with additional info, e.g. image resolution
|
||||||
file_asset = files_collection.find_and_modify(
|
file_asset = files_collection.find_and_modify(
|
||||||
{'_id': src_file['_id']},
|
{'_id': src_file['_id']},
|
||||||
src_file)
|
src_file)
|
||||||
|
@@ -21,8 +21,6 @@ class GoogleCloudStorageBucket(object):
|
|||||||
:param subdir: The local entrypoint to browse the bucket.
|
:param subdir: The local entrypoint to browse the bucket.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, bucket_name, subdir='_/'):
|
|
||||||
CGS_PROJECT_NAME = app.config['CGS_PROJECT_NAME']
|
CGS_PROJECT_NAME = app.config['CGS_PROJECT_NAME']
|
||||||
GCS_CLIENT_EMAIL = app.config['GCS_CLIENT_EMAIL']
|
GCS_CLIENT_EMAIL = app.config['GCS_CLIENT_EMAIL']
|
||||||
GCS_PRIVATE_KEY_PEM = app.config['GCS_PRIVATE_KEY_PEM']
|
GCS_PRIVATE_KEY_PEM = app.config['GCS_PRIVATE_KEY_PEM']
|
||||||
@@ -38,11 +36,13 @@ class GoogleCloudStorageBucket(object):
|
|||||||
# Load private key in p12 format (used by the singed urls generator)
|
# Load private key in p12 format (used by the singed urls generator)
|
||||||
with open(GCS_PRIVATE_KEY_P12) as f:
|
with open(GCS_PRIVATE_KEY_P12) as f:
|
||||||
private_key_pkcs12 = f.read()
|
private_key_pkcs12 = f.read()
|
||||||
self.credentials_p12 = SignedJwtAssertionCredentials(GCS_CLIENT_EMAIL,
|
credentials_p12 = SignedJwtAssertionCredentials(GCS_CLIENT_EMAIL,
|
||||||
private_key_pkcs12,
|
private_key_pkcs12,
|
||||||
'https://www.googleapis.com/auth/devstorage.read_write')
|
'https://www.googleapis.com/auth/devstorage.read_write')
|
||||||
|
|
||||||
gcs = Client(project=CGS_PROJECT_NAME, credentials=credentials_pem)
|
|
||||||
|
def __init__(self, bucket_name, subdir='_/'):
|
||||||
|
gcs = Client(project=self.CGS_PROJECT_NAME, credentials=self.credentials_pem)
|
||||||
self.bucket = gcs.get_bucket(bucket_name)
|
self.bucket = gcs.get_bucket(bucket_name)
|
||||||
self.subdir = subdir
|
self.subdir = subdir
|
||||||
|
|
||||||
@@ -89,6 +89,18 @@ class GoogleCloudStorageBucket(object):
|
|||||||
return list_dict
|
return list_dict
|
||||||
|
|
||||||
|
|
||||||
|
def blob_to_dict(self, blob):
|
||||||
|
blob.reload()
|
||||||
|
expiration = datetime.datetime.now() + datetime.timedelta(days=1)
|
||||||
|
expiration = int(time.mktime(expiration.timetuple()))
|
||||||
|
return dict(
|
||||||
|
updated=blob.updated,
|
||||||
|
name=os.path.basename(blob.name),
|
||||||
|
size=blob.size,
|
||||||
|
content_type=blob.content_type,
|
||||||
|
signed_url=blob.generate_signed_url(expiration, credentials=self.credentials_p12))
|
||||||
|
|
||||||
|
|
||||||
def Get(self, path):
|
def Get(self, path):
|
||||||
"""Get selected file info if the path matches.
|
"""Get selected file info if the path matches.
|
||||||
|
|
||||||
@@ -96,17 +108,17 @@ class GoogleCloudStorageBucket(object):
|
|||||||
:param path: The relative path to the file.
|
:param path: The relative path to the file.
|
||||||
"""
|
"""
|
||||||
path = os.path.join(self.subdir, path)
|
path = os.path.join(self.subdir, path)
|
||||||
f = self.bucket.blob(path)
|
blob = self.bucket.blob(path)
|
||||||
if f.exists():
|
if blob.exists():
|
||||||
f.reload()
|
return self.blob_to_dict(blob)
|
||||||
expiration = datetime.datetime.now() + datetime.timedelta(days=1)
|
|
||||||
expiration = int(time.mktime(expiration.timetuple()))
|
|
||||||
file_dict = dict(
|
|
||||||
updated=f.updated,
|
|
||||||
name=os.path.basename(f.name),
|
|
||||||
size=f.size,
|
|
||||||
content_type=f.content_type,
|
|
||||||
signed_url=f.generate_signed_url(expiration, credentials=self.credentials_p12))
|
|
||||||
return file_dict
|
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def Post(self, full_path, path=None):
|
||||||
|
"""Create new blob and upload data to it.
|
||||||
|
"""
|
||||||
|
path = path if path else os.path.join('_', os.path.basename(full_path))
|
||||||
|
blob = self.bucket.blob(path)
|
||||||
|
blob.upload_from_filename(full_path)
|
||||||
|
return self.blob_to_dict(blob)
|
||||||
|
@@ -2,6 +2,7 @@ import os
|
|||||||
import subprocess
|
import subprocess
|
||||||
#import logging
|
#import logging
|
||||||
from application import app
|
from application import app
|
||||||
|
from application.utils.gcs import GoogleCloudStorageBucket
|
||||||
|
|
||||||
BIN_FFPROBE = app.config['BIN_FFPROBE']
|
BIN_FFPROBE = app.config['BIN_FFPROBE']
|
||||||
BIN_FFMPEG = app.config['BIN_FFMPEG']
|
BIN_FFMPEG = app.config['BIN_FFMPEG']
|
||||||
@@ -55,3 +56,27 @@ def remote_storage_sync(path): #can be both folder and file
|
|||||||
else:
|
else:
|
||||||
raise IOError('ERROR: path not found')
|
raise IOError('ERROR: path not found')
|
||||||
|
|
||||||
|
|
||||||
|
def push_to_storage(project_id, full_path, backend='cgs'):
|
||||||
|
"""Move a file from temporary/processing local storage to a storage endpoint.
|
||||||
|
By default we store items in a Google Cloud Storage bucket named after the
|
||||||
|
project id.
|
||||||
|
"""
|
||||||
|
def push_single_file(project_id, full_path, backend):
|
||||||
|
if backend == 'cgs':
|
||||||
|
storage = GoogleCloudStorageBucket(project_id, subdir='_')
|
||||||
|
storage.Post(full_path)
|
||||||
|
os.remove(full_path)
|
||||||
|
|
||||||
|
if os.path.isfile(full_path):
|
||||||
|
push_single_file(project_id, full_path, backend)
|
||||||
|
else:
|
||||||
|
if os.path.exists(full_path):
|
||||||
|
for root, dirs, files in os.walk(full_path):
|
||||||
|
for name in files:
|
||||||
|
push_single_file(project_id, os.path.join(root, name), backend)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise IOError('ERROR: path not found')
|
||||||
|
|
||||||
|
pass
|
||||||
|
Reference in New Issue
Block a user