File streaming to Google Cloud Storage
Also simplifies some code since we're only going to support GCS.
parent 7aefed22d4
commit 401bfeea98
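Note: a minimal, hypothetical client-side sketch of exercising the new streaming endpoint added in this commit. Only the '/stream/<project_id>' route and the 'file' form field come from the diff below; the host, URL prefix and authentication are assumptions and will differ per deployment.

# Hypothetical client call against the new streaming endpoint (see stream_to_gcs below).
# The http://localhost:5000/storage prefix is an assumption, not part of this commit.
import requests

project_id = '5672e2c1c379cf0007b31995'  # example project ObjectId
with open('render.png', 'rb') as image:
    resp = requests.post(
        'http://localhost:5000/storage/stream/%s' % project_id,
        files={'file': ('render.png', image, 'image/png')})

resp.raise_for_status()
print(resp.json())  # e.g. {'status': 'ok', 'file_id': '...'}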
@@ -12,8 +12,40 @@ encoding = Blueprint('encoding', __name__)
log = logging.getLogger(__name__)


def size_descriptor(width, height):
    """Returns the size descriptor (like '1080p') for the given width.

    >>> size_descriptor(720, 480)
    '576p'
    >>> size_descriptor(1920, 1080)
    '1080p'
    >>> size_descriptor(1920, 751)  # 23:9
    '1080p'
    """

    widths = {
        720: '576p',
        640: '480p',
        1280: '720p',
        1920: '1080p',
        2048: '2k',
        4096: '4k',
    }

    # If it is a known width, use it. Otherwise just return '{height}p'
    if width in widths:
        return widths[width]

    return '%ip' % height


@encoding.route('/zencoder/notifications', methods=['POST'])
def zencoder_notifications():
    """

    See: https://app.zencoder.com/docs/guides/getting-started/notifications#api_version_2

    """
    if current_app.config['ENCODING_BACKEND'] != 'zencoder':
        log.warning('Received notification from Zencoder but app not configured for Zencoder.')
        return abort(403)
@@ -34,32 +66,78 @@ def zencoder_notifications():

    # Cast request data into a dict
    data = request.get_json()

    if log.isEnabledFor(logging.DEBUG):
        from pprint import pformat
        log.debug('Zencoder job JSON: %s', pformat(data))

    files_collection = current_app.data.driver.db['files']
    # Find the file object based on processing backend and job_id
    lookup = {'processing.backend': 'zencoder', 'processing.job_id': str(data['job']['id'])}
    f = files_collection.find_one(lookup)
    if not f:
        log.warning('Unknown Zencoder job id %r', data['job']['id'])
        return abort(404)
    zencoder_job_id = data['job']['id']
    lookup = {'processing.backend': 'zencoder',
              'processing.job_id': str(zencoder_job_id)}
    file_doc = files_collection.find_one(lookup)
    if not file_doc:
        log.warning('Unknown Zencoder job id %r', zencoder_job_id)
        # Return 200 OK when debugging, or Zencoder will keep trying and trying and trying...
        # which is what we want in production.
        return "Not found, but that's okay.", 200 if current_app.config['DEBUG'] else 404

    file_id = f['_id']
    file_id = ObjectId(file_doc['_id'])
    # Remove internal keys (so that we can run put internal)
    f = utils.remove_private_keys(f)
    file_doc = utils.remove_private_keys(file_doc)

    # Update processing status
    f['processing']['status'] = data['job']['state']
    job_state = data['job']['state']
    file_doc['processing']['status'] = job_state

    if job_state == 'failed':
        log.warning('Zencoder job %i for file %s failed.', zencoder_job_id, file_id)
        # Log what Zencoder told us went wrong.
        for output in data['outputs']:
            if not any('error' in key for key in output):
                continue
            log.warning('Errors for output %s:', output['url'])
            for key in output:
                if 'error' in key:
                    log.info(' %s: %s', key, output[key])

        file_doc['status'] = 'failed'
        put_internal('files', file_doc, _id=file_id)
        return "You failed, but that's okay.", 200

    log.info('Zencoder job %s for file %s completed with status %s.', zencoder_job_id, file_id,
             job_state)

    # For every variation encoded, try to update the file object
    for output in data['outputs']:
        format = output['format']
        video_format = output['format']
        # Change the zencoder 'mpeg4' format to 'mp4' used internally
        format = 'mp4' if format == 'mpeg4' else format
        # Find a variation matching format and resolution
        variation = next((v for v in f['variations'] if v['format'] == format \
            and v['width'] == output['width']), None)
        # If found, update with delivered file size
        # TODO: calculate md5 on the storage
        if variation:
            variation['length'] = output['file_size_in_bytes']
        video_format = 'mp4' if video_format == 'mpeg4' else video_format

    put_internal('files', f, _id=ObjectId(file_id))
    return ''
        # Find a variation matching format and resolution
        variation = next((v for v in file_doc['variations']
                          if v['format'] == format and v['width'] == output['width']), None)
        # Fall back to a variation matching just the format
        if variation is None:
            variation = next((v for v in file_doc['variations']
                              if v['format'] == video_format), None)
        if variation is None:
            log.warning('Unable to find variation for video format %s for file %s',
                        video_format, file_id)
            continue

        # TODO: calculate md5 on the storage
        variation.update({
            'height': output['height'],
            'width': output['width'],
            'length': output['file_size_in_bytes'],
            'duration': data['input']['duration_in_ms'] / 1000,
            'md5': output['md5_checksum'] or '',  # they don't do MD5 for GCS...
            'size': size_descriptor(output['width'], output['height']),
        })

    file_doc['status'] = 'complete'
    put_internal('files', file_doc, _id=file_id)

    return '', 204
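Note: for reference, a trimmed sketch of the Zencoder notification body that zencoder_notifications() consumes. Field names are taken from the handler above and the updated test at the bottom of this commit; the values are illustrative only.

# Illustrative notification payload; only the fields read by the handler are shown.
example_notification = {
    'job': {
        'id': 1234567,        # matched against processing.job_id
        'state': 'done',      # 'failed' triggers the failure branch
    },
    'input': {
        'duration_in_ms': 5000,
    },
    'outputs': [{
        'format': 'mpeg4',            # mapped to 'mp4' internally
        'width': 1920,
        'height': 1080,
        'file_size_in_bytes': 15,
        'md5_checksum': None,         # Zencoder doesn't compute MD5 for GCS outputs
        'url': 'gcs://example-project/_/video-1080p.mp4',
    }],
}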
@@ -1,7 +1,8 @@
import datetime
import logging
import os
from multiprocessing import Process
import tempfile
import multiprocessing
from hashlib import md5

import bson.tz_util
@@ -9,17 +10,20 @@ import eve.utils
import pymongo
from bson import ObjectId
from eve.methods.patch import patch_internal
from eve.methods.post import post_internal
from eve.methods.put import put_internal
from flask import Blueprint, safe_join
from flask import jsonify
from flask import request
from flask import abort
from flask import send_from_directory
from flask import url_for, helpers
from flask import current_app
from flask import g
from werkzeug.exceptions import UnprocessableEntity, NotFound, InternalServerError, Forbidden

from application import utils
from application.utils import remove_private_keys
from application.utils.authorization import require_login
from application.utils.cdn import hash_file_path
from application.utils.encoding import Encoder
from application.utils.gcs import GoogleCloudStorageBucket
@@ -60,66 +64,9 @@ def browse_gcs(bucket_name, subdir, file_path=None):
    return jsonify(listing)


# @file_storage.route('/build_thumbnails/<path:file_path>')
def build_thumbnails(file_path=None, file_id=None):
    """Given a file path or file ObjectId pointing to an image file, fetch it
    and generate a set of predefined variations (using generate_local_thumbnails).
    Return a list of dictionaries containing the various image properties and
    variation properties.
    """

    files_collection = current_app.data.driver.db['files']
    if file_path:
        # Search file with backend "pillar" and path=file_path
        file_ = files_collection.find({"file_path": "{0}".format(file_path)})
        file_ = file_[0]

    if file_id:
        file_ = files_collection.find_one({"_id": ObjectId(file_id)})
        file_path = file_['name']

    file_full_path = safe_join(safe_join(current_app.config['SHARED_DIR'], file_path[:2]),
                               file_path)
    # Does the original file exist?
    if not os.path.isfile(file_full_path):
        return "", 404
    else:
        thumbnails = generate_local_thumbnails(file_full_path,
                                               return_image_stats=True)

    file_variations = []
    for size, thumbnail in thumbnails.iteritems():
        if thumbnail.get('exists'):
            # If a thumbnail was already made, we just continue
            continue
        basename = os.path.basename(thumbnail['file_path'])
        root, ext = os.path.splitext(basename)
        file_variation = dict(
            size=size,
            format=ext[1:],
            width=thumbnail['width'],
            height=thumbnail['height'],
            content_type=thumbnail['content_type'],
            length=thumbnail['length'],
            md5=thumbnail['md5'],
            file_path=basename,
        )
        # XXX Inject is_public for size 't' (should be part of the upload),
        # and currently we set it here and then on the fly during blob
        # creation by simply parsing the extension of the filename. This is
        # bad.
        if size == 't':
            file_variation['is_public'] = True

        file_variations.append(file_variation)

    return file_variations


@file_storage.route('/file', methods=['POST'])
@file_storage.route('/file/<path:file_name>', methods=['GET', 'POST'])
def index(file_name=None):

    # GET file -> read it
    if request.method == 'GET':
        return send_from_directory(current_app.config['STORAGE_DIR'], file_name)
@@ -149,124 +96,135 @@ def index(file_name=None):
    return jsonify({'url': url_for('file_storage.index', file_name=file_name)})


def process_file(file_id, src_file):
    """Process the file.
def _process_image(gcs, file_id, local_file, src_file):
    from PIL import Image

    im = Image.open(local_file)
    res = im.size
    src_file['width'] = res[0]
    src_file['height'] = res[1]

    # Generate previews
    log.info('Generating thumbnails for file %s', file_id)
    src_file['variations'] = generate_local_thumbnails(src_file['name'],
                                                       local_file.name)

    # Send those previews to Google Cloud Storage.
    log.info('Uploading %i thumbnails for file %s to Google Cloud Storage (GCS)',
             len(src_file['variations']), file_id)

    # TODO: parallelize this at some point.
    for variation in src_file['variations']:
        fname = variation['file_path']
        log.debug(' - Sending thumbnail %s to GCS', fname)
        blob = gcs.bucket.blob('_/' + fname, chunk_size=256 * 1024 * 2)
        blob.upload_from_filename(variation['local_path'],
                                  content_type=variation['content_type'])

        try:
            os.unlink(variation['local_path'])
        except OSError:
            log.warning('Unable to unlink %s, ignoring this but it will need cleanup later.',
                        variation['local_path'])

        del variation['local_path']

    log.info('Done processing file %s', file_id)
    src_file['status'] = 'complete'


def _process_video(gcs, file_id, local_file, src_file):
    """Video is processed by Zencoder; the file isn't even stored locally."""

    log.info('Processing video for file %s', file_id)

    # Create variations
    root, _ = os.path.splitext(src_file['filename'])
    src_file['variations'] = []

    for v in ('mp4', 'webm'):
        # Most of these properties will be available after encode.
        file_variation = dict(
            format=v,
            content_type='video/{}'.format(v),
            file_path='{}.{}'.format(root, v),
            size='',
            duration=0,
            width=0,
            height=0,
            length=0,
            md5='',
        )
        # Append file variation
        src_file['variations'].append(file_variation)

    j = Encoder.job_create(src_file)
    if j is None:
        log.warning('_process_video: unable to create encoder job for file %s.', file_id)
        return

    log.info('Created asynchronous Zencoder job %s for file %s', j['process_id'], file_id)

    # Add the processing status to the file object
    src_file['processing'] = {
        'status': 'pending',
        'job_id': str(j['process_id']),
        'backend': j['backend']}


def process_file(gcs, file_id, local_file):
    """Process the file by creating thumbnails, sending to Zencoder, etc.

    :param file_id: '_id' key of the file
    :param src_file: POSTed data of the file, lacks private properties.
    :type file_id: ObjectId or str
    :param local_file: locally stored file, or None if no local processing is needed.
    :type local_file: file
    """

    file_id = ObjectId(file_id)

    # Fetch the src_file document from MongoDB.
    files = current_app.data.driver.db['files']
    src_file = files.find_one(file_id)
    if not src_file:
        log.warning('process_file(%s): no such file document found, ignoring.')
        return
    src_file = utils.remove_private_keys(src_file)

    filename = src_file['name']
    file_abs_path = safe_join(safe_join(current_app.config['SHARED_DIR'], filename[:2]), filename)
    # Update the 'format' field from the content type.
    # TODO: overrule the content type based on file extention & magic numbers.
    mime_category, src_file['format'] = src_file['content_type'].split('/', 1)

    if not os.path.exists(file_abs_path):
        log.warning("POSTed file document %r refers to non-existant file on file system %s!",
                    file_id, file_abs_path)
        abort(422, "POSTed file document refers to non-existant file on file system!")
    # Run the required processor, based on the MIME category.
    processors = {
        'image': _process_image,
        'video': _process_video,
    }

    src_file['length'] = os.stat(file_abs_path).st_size
    content_type = src_file['content_type'].split('/')
    src_file['format'] = content_type[1]
    mime_type = content_type[0]
    src_file['file_path'] = filename

    if mime_type == 'image':
        from PIL import Image
        im = Image.open(file_abs_path)
        res = im.size
        src_file['width'] = res[0]
        src_file['height'] = res[1]
        # Generate previews
        src_file['variations'] = build_thumbnails(file_id=file_id)
    elif mime_type == 'video':
        pass
        # Generate variations
        src_video_data = get_video_data(file_abs_path)
        variations = {
            'mp4': None,
            'webm': None
        }
        if src_video_data['duration']:
            src_file['duration'] = src_video_data['duration']

        # Properly resize the video according to 720p and 1080p resolutions
        if src_video_data['res_y'] < 1080:
            res_y = 720
        elif src_video_data['res_y'] >= 1080:
            res_y = 1080

        # Add variations property to the file
        src_file['variations'] = []
        # Create variations
        for v in variations:
            root, ext = os.path.splitext(filename)
            filename = "{0}-{1}p.{2}".format(root, res_y, v)
            video_duration = None
            if src_video_data['duration']:
                video_duration = src_video_data['duration']

            file_variation = dict(
                size="{0}p".format(res_y),
                duration=video_duration,
                format=v,
                width=src_video_data['res_x'],
                height=src_video_data['res_y'],
                content_type="video/{0}".format(v),
                length=0,  # Available after encode
                md5="",  # Available after encode
                file_path=filename,
            )
            # Append file variation
            src_file['variations'].append(file_variation)

        def encode(src_path, src_file, res_y):
            # For every variation in the list call video_encode
            # print "encoding {0}".format(variations)
            if current_app.config['ENCODING_BACKEND'] == 'zencoder':
                # Move the source file in place on the remote storage (which can
                # be accessed from zencoder)
                push_to_storage(str(src_file['project']), src_path)
                j = Encoder.job_create(src_file)
                try:
                    if j:
                        src_file['processing'] = dict(
                            status='pending',
                            job_id="{0}".format(j['process_id']),
                            backend=j['backend'])
                        # Add the processing status to the file object
                        r = put_internal('files',
                                         src_file, **{'_id': ObjectId(file_id)})
                        pass
                except KeyError:
                    pass
            elif current_app.config['ENCODING_BACKEND'] == 'local':
                for v in src_file['variations']:
                    path = ffmpeg_encode(src_path, v['format'], res_y)
                    # Update size data after encoding
                    v['length'] = os.stat(path).st_size

                r = put_internal('files', src_file, **{'_id': ObjectId(file_id)})
                # When all encodes are done, delete source file
                sync_path = os.path.split(src_path)[0]
                push_to_storage(str(src_file['project']), sync_path)

        p = Process(target=encode, args=(file_abs_path, src_file, res_y))
        p.start()
    try:
        processor = processors[mime_category]
    except KeyError:
        log.info("POSTed file %s was of type %r, which isn't thumbnailed/encoded.", file_id,
                 mime_category)
        src_file['status'] = 'complete'
    else:
        log.info("POSTed file was of type %r, which isn't thumbnailed/encoded.", mime_type)
        log.debug('process_file(%s): marking file status as "processing"', file_id)
        src_file['status'] = 'processing'
        update_file_doc(file_id, status='processing')

    if mime_type != 'video':
        # Sync the whole subdir
        sync_path = os.path.split(file_abs_path)[0]
        # push_to_storage(str(src_file['project']), sync_path)
        p = Process(target=push_to_storage, args=(
            str(src_file['project']), sync_path))
        p.start()
        try:
            processor(gcs, file_id, local_file, src_file)
        except Exception:
            log.warning('process_file(%s): error when processing file, resetting status to '
                        '"queued_for_processing"', file_id, exc_info=True)
            update_file_doc(file_id, status='queued_for_processing')
            return

    # Update the original file with additional info, e.g. image resolution
    put_internal('files', src_file, _id=ObjectId(file_id))
    r, _, _, status = put_internal('files', src_file, _id=file_id)
    if status not in (200, 201):
        log.warning('process_file(%s): status %i when saving processed file info to MongoDB: %s',
                    file_id, status, r)


def delete_file(file_item):
@@ -383,20 +341,6 @@ def _generate_all_links(response, now):
    response['_updated'] = patch_resp['_updated']


def post_POST_files(request, payload):
    """After an file object has been created, we do the necessary processing
    and further update it.
    """

    if 200 <= payload.status_code < 300:
        import json
        posted_properties = json.loads(request.data)
        private_properties = json.loads(payload.data)
        file_id = private_properties['_id']

        process_file(file_id, posted_properties)


def before_deleting_file(item):
    delete_file(item)
@@ -460,7 +404,7 @@ def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
        {'$or': [{'backend': backend_name, 'link_expires': None},
                 {'backend': backend_name, 'link_expires': {'$lt': expire_before}},
                 {'backend': backend_name, 'link': None}]
         }).sort([('link_expires', pymongo.ASCENDING)]).limit(chunk_size)
        }).sort([('link_expires', pymongo.ASCENDING)]).limit(chunk_size)

    if to_refresh.count() == 0:
        log.info('No links to refresh.')
@@ -473,9 +417,142 @@ def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
    log.info('Refreshed %i links', min(chunk_size, to_refresh.count()))


@require_login({u'subscriber', u'admin'})
def create_file_doc(name, filename, content_type, length, project, backend='gcs',
                    **extra_fields):
    """Creates a minimal File document for storage in MongoDB.

    Doesn't save it to MongoDB yet.
    """

    current_user = g.get('current_user')

    file_doc = {'name': name,
                'filename': filename,
                'file_path': '',
                'user': current_user['user_id'],
                'backend': backend,
                'md5': '',
                'content_type': content_type,
                'length': length,
                'project': project}
    file_doc.update(extra_fields)

    return file_doc


@file_storage.route('/stream/<string:project_id>', methods=['POST'])
def stream_to_gcs(project_id):
    log.info('Streaming file to bucket for project %s', project_id)

    uploaded_file = request.files['file']

    projects = current_app.data.driver.db['projects']
    project = projects.find_one(ObjectId(project_id), projection={'_id': 1})
    if not project:
        raise NotFound('Project %s does not exist' % project_id)

    file_id, fname, status = create_file_doc_for_upload(project['_id'], uploaded_file)

    if uploaded_file.content_type.startswith('image/'):
        # We need to do local thumbnailing, so we have to write the stream
        # both to Google Cloud Storage and to local storage.
        local_file = tempfile.NamedTemporaryFile(dir=current_app.config['STORAGE_DIR'])
        uploaded_file.save(local_file)
        local_file.seek(0)  # Make sure that a re-read starts from the beginning.
        stream_for_gcs = local_file
    else:
        local_file = None
        stream_for_gcs = uploaded_file.stream

    # Upload the file to GCS.
    try:
        gcs = GoogleCloudStorageBucket(project_id)
        blob = gcs.bucket.blob('_/' + fname, chunk_size=256 * 1024 * 2)
        blob.upload_from_file(stream_for_gcs,
                              content_type=uploaded_file.mimetype,
                              size=uploaded_file.content_length)
    except Exception:
        log.exception('Error uploading file to Google Cloud Storage (GCS),'
                      ' aborting handling of uploaded file (id=%s).', file_id)
        update_file_doc(file_id, status='failed')
        raise InternalServerError('Unable to stream file to Google Cloud Storage')

    # Reload the blob to get the file size according to Google.
    blob.reload()
    update_file_doc(file_id,
                    status='queued_for_processing',
                    file_path=fname,
                    length=blob.size)

    process_file(gcs, file_id, local_file)

    # Local processing is done, we can close the local file so it is removed.
    if local_file is not None:
        local_file.close()

    log.debug('Handled uploaded file id=%s, fname=%s, size=%i', file_id, fname, blob.size)

    # Status is 200 if the file already existed, and 201 if it was newly created.
    return jsonify(status='ok', file_id=str(file_id)), status


def update_file_doc(file_id, **updates):
    files = current_app.data.driver.db['files']
    res = files.update_one({'_id': ObjectId(file_id)},
                           {'$set': updates})
    log.debug('update_file_doc(%s, %s): %i matched, %i updated.',
              file_id, updates, res.matched_count, res.modified_count)
    return res


def create_file_doc_for_upload(project_id, uploaded_file):
    """Creates a secure filename and a document in MongoDB for the file.

    The (project_id, filename) tuple should be unique. If such a document already
    exists, it is updated with the new file.

    :param uploaded_file: file from request.files['form-key']
    :type uploaded_file: werkzeug.datastructures.FileStorage
    :returns: a tuple (file_id, filename, status), where 'filename' is the secured
        name stored in file_doc['filename'].
    """

    project_id = ObjectId(project_id)

    # TODO: hash the filename with path info to get the internal name.
    internal_filename = uploaded_file.filename

    # See if we can find a pre-existing file doc
    files = current_app.data.driver.db['files']
    file_doc = files.find_one({'project': project_id,
                               'name': internal_filename})

    # TODO: at some point do name-based and content-based content-type sniffing.
    new_props = {'filename': uploaded_file.filename,
                 'content_type': uploaded_file.mimetype,
                 'length': uploaded_file.content_length,
                 'project': project_id,
                 'status': 'uploading'}

    if file_doc is None:
        # Create a file document on MongoDB for this file.
        file_doc = create_file_doc(name=internal_filename, **new_props)
        file_fields, _, _, status = post_internal('files', file_doc)
    else:
        file_doc.update(new_props)
        file_fields, _, _, status = put_internal('files', remove_private_keys(file_doc))

    if status not in (200, 201):
        log.error('Unable to create new file document in MongoDB, status=%i: %s',
                  status, file_fields)
        raise InternalServerError()

    return file_fields['_id'], uploaded_file.filename, status


def setup_app(app, url_prefix):
    app.on_pre_GET_files += on_pre_get_files
    app.on_post_POST_files += post_POST_files

    app.on_fetched_item_files += before_returning_file
    app.on_fetched_resource_files += before_returning_files
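Note: stream_to_gcs() above leans on the google-cloud-storage blob API. A standalone sketch of the same upload pattern outside Flask; the bucket name, file name and credentials are assumptions, while the chunk size and the upload_from_file/reload calls mirror the code above.

# Minimal sketch of the GCS upload used by stream_to_gcs(); names are placeholders.
from google.cloud import storage

client = storage.Client()                        # assumes default application credentials
bucket = client.bucket('example-project-bucket')

with open('render.png', 'rb') as fileobj:
    # Small chunk size keeps memory bounded while streaming the request body.
    blob = bucket.blob('_/render.png', chunk_size=256 * 1024 * 2)
    blob.upload_from_file(fileobj, content_type='image/png')

blob.reload()    # refresh metadata so blob.size reflects what GCS actually stored
print(blob.size)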
@@ -90,6 +90,9 @@ def require_login(require_roles=set()):
    Optionally only allows access to users with a certain role./
    """

    if not isinstance(require_roles, set):
        raise TypeError('require_roles param should be a set, but is a %r' % type(require_roles))

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
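Note: the new isinstance() check makes a wrong argument type fail fast at decoration time rather than at request time. A small illustration; the view function names are made up, the role set mirrors the create_file_doc decoration above.

# Fine: roles passed as a set, as require_login() now requires.
@require_login({u'subscriber', u'admin'})
def create_something():
    pass

# Raises TypeError as soon as the module is imported:
#   require_roles param should be a set, but is a <type 'list'>
@require_login([u'subscriber', u'admin'])
def create_something_else():
    pass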
@@ -1,3 +1,4 @@
import logging
import os

from flask import current_app
@@ -5,6 +6,8 @@ from zencoder import Zencoder

from application import encoding_service_client

log = logging.getLogger(__name__)


class Encoder:
    """Generic Encoder wrapper. Provides a consistent API, independent from
@@ -15,28 +18,34 @@ class Encoder:
    def job_create(src_file):
        """Create an encoding job. Return the backend used as well as an id.
        """
        if isinstance(encoding_service_client, Zencoder):
            if src_file['backend'] == 'gcs':
                # Build the specific GCS input url, assuming the file is stored
                # in the _ subdirectory
                storage_base = "gcs://{0}/_/".format(src_file['project'])
                file_input = os.path.join(storage_base, src_file['file_path'])
                outputs = []
                options = dict(notifications=current_app.config['ZENCODER_NOTIFICATIONS_URL'])
                for v in src_file['variations']:
                    outputs.append({
                        'format': v['format'],
                        'witdh': v['width'],
                        'url': os.path.join(storage_base, v['file_path'])})
                r = encoding_service_client.job.create(file_input, outputs=outputs,
                                                       options=options)
                if r.code == 201:
                    return dict(process_id=r.body['id'], backend='zencoder')
                else:
                    return None
        else:

        if not isinstance(encoding_service_client, Zencoder):
            log.error('I can only work with Zencoder, not with %r', encoding_service_client)
            return None

        if src_file['backend'] != 'gcs':
            log.error("Unable to work with storage backend %r", src_file['backend'])
            return None

        # Build the specific GCS input url, assuming the file is stored
        # in the _ subdirectory
        storage_base = "gcs://{0}/_/".format(src_file['project'])
        file_input = os.path.join(storage_base, src_file['file_path'])
        options = dict(notifications=current_app.config['ZENCODER_NOTIFICATIONS_URL'])

        outputs = [{'format': v['format'],
                    'url': os.path.join(storage_base, v['file_path'])}
                   for v in src_file['variations']]
        r = encoding_service_client.job.create(file_input,
                                               outputs=outputs,
                                               options=options)
        if r.code != 201:
            log.error('Error %i creating Zencoder job: %s', r.code, r.body)
            return None

        return {'process_id': r.body['id'],
                'backend': 'zencoder'}

    @staticmethod
    def job_progress(job_id):
        if isinstance(encoding_service_client, Zencoder):
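Note: to make the reworked job_create() concrete, a sketch of the structures it assembles for a GCS-backed file with two variations. The project id and file names are placeholders; the gcs:// URL scheme and the '_' subdirectory come from the code above.

# Illustrative input for Encoder.job_create(); values are placeholders.
src_file = {
    'backend': 'gcs',
    'project': '5672e2c1c379cf0007b31995',
    'file_path': 'video.mov',
    'variations': [
        {'format': 'mp4', 'file_path': 'video.mp4'},
        {'format': 'webm', 'file_path': 'video.webm'},
    ],
}

storage_base = "gcs://{0}/_/".format(src_file['project'])
file_input = storage_base + src_file['file_path']
# -> 'gcs://5672e2c1c379cf0007b31995/_/video.mov'

outputs = [{'format': v['format'],
            'url': storage_base + v['file_path']}
           for v in src_file['variations']]
# -> [{'format': 'mp4',  'url': 'gcs://5672.../_/video.mp4'},
#     {'format': 'webm', 'url': 'gcs://5672.../_/video.webm'}]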
@@ -5,54 +5,50 @@ from PIL import Image
from flask import current_app


def generate_local_thumbnails(src, return_image_stats=False):
def generate_local_thumbnails(name_base, src):
    """Given a source image, use Pillow to generate thumbnails according to the
    application settings.

    args:
        src: the path of the image to be thumbnailed
        return_image_stats: if True, return a dict object which contains length,
            resolution, format and path of the thumbnailed image
    :param name_base: the thumbnail will get a field 'name': '{basename}-{thumbsize}.jpg'
    :type name_base: str
    :param src: the path of the image to be thumbnailed
    :type src: str
    """

    thumbnail_settings = current_app.config['UPLOADS_LOCAL_STORAGE_THUMBNAILS']
    thumbnails = {}
    thumbnails = []

    save_to_base, _ = os.path.splitext(src)
    name_base, _ = os.path.splitext(name_base)

    for size, settings in thumbnail_settings.iteritems():
        root, ext = os.path.splitext(src)
        dst = "{0}-{1}{2}".format(root, size, '.jpg')
        if os.path.isfile(dst):
            # If the thumbnail already exists we require stats about it
            if return_image_stats:
                thumbnails[size] = dict(exists=True)
            continue
        dst = '{0}-{1}{2}'.format(save_to_base, size, '.jpg')
        name = '{0}-{1}{2}'.format(name_base, size, '.jpg')

        if settings['crop']:
            resize_and_crop(src, dst, settings['size'])
            width, height = settings['size']
        else:
            im = Image.open(src)
            im.thumbnail(settings['size'])
            im.save(dst, "JPEG")
            width, height = im.size

        if return_image_stats:
            # Get file size
            st = os.stat(dst)
            length = st.st_size
            # Get resolution
            im = Image.open(dst)
            width = im.size[0]
            height = im.size[1]
            format = im.format.lower()
            # Get format
            thumbnails[size] = dict(
                file_path=dst,  # Full path, to be processed before storage
                length=length,
                width=width,
                height=height,
                md5='--',
                content_type='image/' + format,
            )
        thumb_info = {'size': size,
                      'file_path': name,
                      'local_path': dst,
                      'length': os.stat(dst).st_size,
                      'width': width,
                      'height': height,
                      'md5': '',
                      'content_type': 'image/jpeg'}

    if return_image_stats:
        return thumbnails
        if size == 't':
            thumb_info['is_public'] = True

        thumbnails.append(thumb_info)

    return thumbnails


def resize_and_crop(img_path, modified_path, size, crop_type='middle'):
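Note: after this change generate_local_thumbnails() returns a list of plain dicts instead of a dict keyed by size. A sketch of the new return shape; the available sizes depend on UPLOADS_LOCAL_STORAGE_THUMBNAILS and all values here are placeholders.

# Illustrative return value of the reworked generate_local_thumbnails().
thumbnails = [
    {'size': 't',
     'file_path': 'image-t.jpg',              # name as stored with the file document
     'local_path': '/shared/im/image-t.jpg',  # placeholder; uploaded to GCS, then unlinked
     'length': 12345,
     'width': 160, 'height': 90,
     'md5': '',
     'content_type': 'image/jpeg',
     'is_public': True},                      # only the 't' size is marked public
    {'size': 'b',
     'file_path': 'image-b.jpg',
     'local_path': '/shared/im/image-b.jpg',
     'length': 98765,
     'width': 1920, 'height': 1080,
     'md5': '',
     'content_type': 'image/jpeg'},
]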
@@ -366,6 +366,7 @@ tokens_schema = {
}

files_schema = {
    # Name of the file after processing, possibly hashed.
    'name': {
        'type': 'string',
        'required': True,
@@ -405,6 +406,8 @@ files_schema = {
        'type': 'string',
        'required': True,
    },

    # Original filename as given by the user, possibly cleaned-up to make it safe.
    'filename': {
        'type': 'string',
        'required': True,
@@ -414,10 +417,14 @@ files_schema = {
        'required': True,
        'allowed': ["attract-web", "pillar", "cdnsun", "gcs", "unittest"]
    },

    # Where the file is in the backend storage itself. In the case of GCS,
    # it is relative to the /_ folder. In the other cases, it is relative
    # to the root of that storage backend. required=False to allow creation
    # before uploading to a storage, in case the final path is determined
    # by that storage backend.
    'file_path': {
        'type': 'string',
        #'required': True,
        'unique': True,
    },
    'link': {
        'type': 'string',
@@ -497,7 +504,13 @@ files_schema = {
                            "failed", "cancelled"]
            },
        }
    }
    },
    'status': {
        'type': 'string',
        'allowed': ['uploading', 'queued_for_processing', 'processing', 'complete', 'failed'],
        'required': False,
        'default': 'complete',  # default value for backward compatibility.
    },
}

groups_schema = {
@@ -23,7 +23,8 @@ EXAMPLE_FILE = {u'_id': ObjectId('5672e2c1c379cf0007b31995'),
                u'content_type': 'image/png', u'_etag': '044ce3aede2e123e261c0d8bd77212f264d4f7b0',
                u'_created': datetime.datetime(2015, 12, 17, 16, 28, 49, tzinfo=tz_util.utc),
                u'md5': '',
                u'file_path': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4.png', u'backend': 'gcs',
                u'file_path': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4.png',
                u'backend': 'pillar',
                u'link': 'http://localhost:8002/file',
                u'link_expires': datetime.datetime(2016, 3, 22, 9, 28, 22, tzinfo=tz_util.utc)}
@@ -24,7 +24,7 @@ class ZencoderNotificationTest(AbstractPillarTest):
                                data=json.dumps({'job': {'id': 'koro-007'}}),
                                headers={'X-Zencoder-Notification-Secret': secret,
                                         'Content-Type': 'application/json'})
        self.assertEqual(404, resp.status_code)
        self.assertEqual(200, resp.status_code)

    def test_good_secret_existing_file(self):
        self.ensure_file_exists(file_overrides={
@@ -40,11 +40,16 @@ class ZencoderNotificationTest(AbstractPillarTest):
                                    'state': 'done'},
                                'outputs': [{
                                    'format': 'jpg',
                                    'height': 1080,
                                    'width': 2048,
                                    'file_size_in_bytes': 15,
                                }]}),
                                    'md5_checksum': None,
                                }],
                                'input': {
                                    'duration_in_ms': 5000,
                                }}),
                                headers={'X-Zencoder-Notification-Secret': secret,
                                         'Content-Type': 'application/json'})

        # TODO: check that the file in MongoDB is actually updated properly.
        self.assertEqual(200, resp.status_code)
        self.assertEqual(204, resp.status_code)