From 401bfeea9812cc393a88a78297dd4a8853ced0f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 2 May 2016 10:33:23 +0200 Subject: [PATCH] File streaming to Google Cloud Storage Also simplifies some code since we're only going to support GCS. --- pillar/application/modules/encoding.py | 116 +++++- pillar/application/modules/file_storage.py | 439 ++++++++++++--------- pillar/application/utils/authorization.py | 3 + pillar/application/utils/encoding.py | 49 ++- pillar/application/utils/imaging.py | 62 ++- pillar/settings.py | 19 +- tests/common_test_data.py | 3 +- tests/test_encoding.py | 11 +- 8 files changed, 442 insertions(+), 260 deletions(-) diff --git a/pillar/application/modules/encoding.py b/pillar/application/modules/encoding.py index 3a44e622..cb23263a 100644 --- a/pillar/application/modules/encoding.py +++ b/pillar/application/modules/encoding.py @@ -12,8 +12,40 @@ encoding = Blueprint('encoding', __name__) log = logging.getLogger(__name__) +def size_descriptor(width, height): + """Returns the size descriptor (like '1080p') for the given width. + + >>> size_descriptor(720, 480) + '576p' + >>> size_descriptor(1920, 1080) + '1080p' + >>> size_descriptor(1920, 751) # 23:9 + '1080p' + """ + + widths = { + 720: '576p', + 640: '480p', + 1280: '720p', + 1920: '1080p', + 2048: '2k', + 4096: '4k', + } + + # If it is a known width, use it. Otherwise just return '{height}p' + if width in widths: + return widths[width] + + return '%ip' % height + + @encoding.route('/zencoder/notifications', methods=['POST']) def zencoder_notifications(): + """ + + See: https://app.zencoder.com/docs/guides/getting-started/notifications#api_version_2 + + """ if current_app.config['ENCODING_BACKEND'] != 'zencoder': log.warning('Received notification from Zencoder but app not configured for Zencoder.') return abort(403) @@ -34,32 +66,78 @@ def zencoder_notifications(): # Cast request data into a dict data = request.get_json() + + if log.isEnabledFor(logging.DEBUG): + from pprint import pformat + log.debug('Zencoder job JSON: %s', pformat(data)) + files_collection = current_app.data.driver.db['files'] # Find the file object based on processing backend and job_id - lookup = {'processing.backend': 'zencoder', 'processing.job_id': str(data['job']['id'])} - f = files_collection.find_one(lookup) - if not f: - log.warning('Unknown Zencoder job id %r', data['job']['id']) - return abort(404) + zencoder_job_id = data['job']['id'] + lookup = {'processing.backend': 'zencoder', + 'processing.job_id': str(zencoder_job_id)} + file_doc = files_collection.find_one(lookup) + if not file_doc: + log.warning('Unknown Zencoder job id %r', zencoder_job_id) + # Return 200 OK when debugging, or Zencoder will keep trying and trying and trying... + # which is what we want in production. + return "Not found, but that's okay.", 200 if current_app.config['DEBUG'] else 404 - file_id = f['_id'] + file_id = ObjectId(file_doc['_id']) # Remove internal keys (so that we can run put internal) - f = utils.remove_private_keys(f) + file_doc = utils.remove_private_keys(file_doc) # Update processing status - f['processing']['status'] = data['job']['state'] + job_state = data['job']['state'] + file_doc['processing']['status'] = job_state + + if job_state == 'failed': + log.warning('Zencoder job %i for file %s failed.', zencoder_job_id, file_id) + # Log what Zencoder told us went wrong. 
+        for output in data['outputs']:
+            if not any('error' in key for key in output):
+                continue
+            log.warning('Errors for output %s:', output['url'])
+            for key in output:
+                if 'error' in key:
+                    log.info('    %s: %s', key, output[key])
+
+        file_doc['status'] = 'failed'
+        put_internal('files', file_doc, _id=file_id)
+        return "You failed, but that's okay.", 200
+
+    log.info('Zencoder job %s for file %s completed with status %s.', zencoder_job_id, file_id,
+             job_state)
+
     # For every variation encoded, try to update the file object
     for output in data['outputs']:
-        format = output['format']
+        video_format = output['format']
         # Change the zencoder 'mpeg4' format to 'mp4' used internally
-        format = 'mp4' if format == 'mpeg4' else format
-        # Find a variation matching format and resolution
-        variation = next((v for v in f['variations'] if v['format'] == format \
-            and v['width'] == output['width']), None)
-        # If found, update with delivered file size
-        # TODO: calculate md5 on the storage
-        if variation:
-            variation['length'] = output['file_size_in_bytes']
+        video_format = 'mp4' if video_format == 'mpeg4' else video_format
 
-    put_internal('files', f, _id=ObjectId(file_id))
-    return ''
+        # Find a variation matching format and resolution
+        variation = next((v for v in file_doc['variations']
+                          if v['format'] == video_format and v['width'] == output['width']), None)
+        # Fall back to a variation matching just the format
+        if variation is None:
+            variation = next((v for v in file_doc['variations']
+                              if v['format'] == video_format), None)
+        if variation is None:
+            log.warning('Unable to find variation for video format %s for file %s',
+                        video_format, file_id)
+            continue
+
+        # TODO: calculate md5 on the storage
+        variation.update({
+            'height': output['height'],
+            'width': output['width'],
+            'length': output['file_size_in_bytes'],
+            'duration': data['input']['duration_in_ms'] / 1000,
+            'md5': output['md5_checksum'] or '',  # they don't do MD5 for GCS...
+ 'size': size_descriptor(output['width'], output['height']), + }) + + file_doc['status'] = 'complete' + put_internal('files', file_doc, _id=file_id) + + return '', 204 diff --git a/pillar/application/modules/file_storage.py b/pillar/application/modules/file_storage.py index 58e2e328..73cffcef 100644 --- a/pillar/application/modules/file_storage.py +++ b/pillar/application/modules/file_storage.py @@ -1,7 +1,8 @@ import datetime import logging import os -from multiprocessing import Process +import tempfile +import multiprocessing from hashlib import md5 import bson.tz_util @@ -9,17 +10,20 @@ import eve.utils import pymongo from bson import ObjectId from eve.methods.patch import patch_internal +from eve.methods.post import post_internal from eve.methods.put import put_internal from flask import Blueprint, safe_join from flask import jsonify from flask import request -from flask import abort from flask import send_from_directory from flask import url_for, helpers from flask import current_app +from flask import g +from werkzeug.exceptions import UnprocessableEntity, NotFound, InternalServerError, Forbidden from application import utils from application.utils import remove_private_keys +from application.utils.authorization import require_login from application.utils.cdn import hash_file_path from application.utils.encoding import Encoder from application.utils.gcs import GoogleCloudStorageBucket @@ -60,66 +64,9 @@ def browse_gcs(bucket_name, subdir, file_path=None): return jsonify(listing) -# @file_storage.route('/build_thumbnails/') -def build_thumbnails(file_path=None, file_id=None): - """Given a file path or file ObjectId pointing to an image file, fetch it - and generate a set of predefined variations (using generate_local_thumbnails). - Return a list of dictionaries containing the various image properties and - variation properties. - """ - - files_collection = current_app.data.driver.db['files'] - if file_path: - # Search file with backend "pillar" and path=file_path - file_ = files_collection.find({"file_path": "{0}".format(file_path)}) - file_ = file_[0] - - if file_id: - file_ = files_collection.find_one({"_id": ObjectId(file_id)}) - file_path = file_['name'] - - file_full_path = safe_join(safe_join(current_app.config['SHARED_DIR'], file_path[:2]), - file_path) - # Does the original file exist? - if not os.path.isfile(file_full_path): - return "", 404 - else: - thumbnails = generate_local_thumbnails(file_full_path, - return_image_stats=True) - - file_variations = [] - for size, thumbnail in thumbnails.iteritems(): - if thumbnail.get('exists'): - # If a thumbnail was already made, we just continue - continue - basename = os.path.basename(thumbnail['file_path']) - root, ext = os.path.splitext(basename) - file_variation = dict( - size=size, - format=ext[1:], - width=thumbnail['width'], - height=thumbnail['height'], - content_type=thumbnail['content_type'], - length=thumbnail['length'], - md5=thumbnail['md5'], - file_path=basename, - ) - # XXX Inject is_public for size 't' (should be part of the upload), - # and currently we set it here and then on the fly during blob - # creation by simply parsing the extension of the filename. This is - # bad. 
- if size == 't': - file_variation['is_public'] = True - - file_variations.append(file_variation) - - return file_variations - - @file_storage.route('/file', methods=['POST']) @file_storage.route('/file/', methods=['GET', 'POST']) def index(file_name=None): - # GET file -> read it if request.method == 'GET': return send_from_directory(current_app.config['STORAGE_DIR'], file_name) @@ -149,124 +96,135 @@ def index(file_name=None): return jsonify({'url': url_for('file_storage.index', file_name=file_name)}) -def process_file(file_id, src_file): - """Process the file. +def _process_image(gcs, file_id, local_file, src_file): + from PIL import Image + + im = Image.open(local_file) + res = im.size + src_file['width'] = res[0] + src_file['height'] = res[1] + + # Generate previews + log.info('Generating thumbnails for file %s', file_id) + src_file['variations'] = generate_local_thumbnails(src_file['name'], + local_file.name) + + # Send those previews to Google Cloud Storage. + log.info('Uploading %i thumbnails for file %s to Google Cloud Storage (GCS)', + len(src_file['variations']), file_id) + + # TODO: parallelize this at some point. + for variation in src_file['variations']: + fname = variation['file_path'] + log.debug(' - Sending thumbnail %s to GCS', fname) + blob = gcs.bucket.blob('_/' + fname, chunk_size=256 * 1024 * 2) + blob.upload_from_filename(variation['local_path'], + content_type=variation['content_type']) + + try: + os.unlink(variation['local_path']) + except OSError: + log.warning('Unable to unlink %s, ignoring this but it will need cleanup later.', + variation['local_path']) + + del variation['local_path'] + + log.info('Done processing file %s', file_id) + src_file['status'] = 'complete' + + +def _process_video(gcs, file_id, local_file, src_file): + """Video is processed by Zencoder; the file isn't even stored locally.""" + + log.info('Processing video for file %s', file_id) + + # Create variations + root, _ = os.path.splitext(src_file['filename']) + src_file['variations'] = [] + + for v in ('mp4', 'webm'): + # Most of these properties will be available after encode. + file_variation = dict( + format=v, + content_type='video/{}'.format(v), + file_path='{}.{}'.format(root, v), + size='', + duration=0, + width=0, + height=0, + length=0, + md5='', + ) + # Append file variation + src_file['variations'].append(file_variation) + + j = Encoder.job_create(src_file) + if j is None: + log.warning('_process_video: unable to create encoder job for file %s.', file_id) + return + + log.info('Created asynchronous Zencoder job %s for file %s', j['process_id'], file_id) + + # Add the processing status to the file object + src_file['processing'] = { + 'status': 'pending', + 'job_id': str(j['process_id']), + 'backend': j['backend']} + + +def process_file(gcs, file_id, local_file): + """Process the file by creating thumbnails, sending to Zencoder, etc. :param file_id: '_id' key of the file - :param src_file: POSTed data of the file, lacks private properties. + :type file_id: ObjectId or str + :param local_file: locally stored file, or None if no local processing is needed. + :type local_file: file """ + file_id = ObjectId(file_id) + + # Fetch the src_file document from MongoDB. 
+    files = current_app.data.driver.db['files']
+    src_file = files.find_one(file_id)
+    if not src_file:
+        log.warning('process_file(%s): no such file document found, ignoring.', file_id)
+        return
     src_file = utils.remove_private_keys(src_file)
 
-    filename = src_file['name']
-    file_abs_path = safe_join(safe_join(current_app.config['SHARED_DIR'], filename[:2]), filename)
+    # Update the 'format' field from the content type.
+    # TODO: override the content type based on file extension & magic numbers.
+    mime_category, src_file['format'] = src_file['content_type'].split('/', 1)
 
-    if not os.path.exists(file_abs_path):
-        log.warning("POSTed file document %r refers to non-existant file on file system %s!",
-                    file_id, file_abs_path)
-        abort(422, "POSTed file document refers to non-existant file on file system!")
+    # Run the required processor, based on the MIME category.
+    processors = {
+        'image': _process_image,
+        'video': _process_video,
+    }
 
-    src_file['length'] = os.stat(file_abs_path).st_size
-    content_type = src_file['content_type'].split('/')
-    src_file['format'] = content_type[1]
-    mime_type = content_type[0]
-    src_file['file_path'] = filename
-
-    if mime_type == 'image':
-        from PIL import Image
-        im = Image.open(file_abs_path)
-        res = im.size
-        src_file['width'] = res[0]
-        src_file['height'] = res[1]
-        # Generate previews
-        src_file['variations'] = build_thumbnails(file_id=file_id)
-    elif mime_type == 'video':
-        pass
-        # Generate variations
-        src_video_data = get_video_data(file_abs_path)
-        variations = {
-            'mp4': None,
-            'webm': None
-        }
-        if src_video_data['duration']:
-            src_file['duration'] = src_video_data['duration']
-
-        # Properly resize the video according to 720p and 1080p resolutions
-        if src_video_data['res_y'] < 1080:
-            res_y = 720
-        elif src_video_data['res_y'] >= 1080:
-            res_y = 1080
-
-        # Add variations property to the file
-        src_file['variations'] = []
-        # Create variations
-        for v in variations:
-            root, ext = os.path.splitext(filename)
-            filename = "{0}-{1}p.{2}".format(root, res_y, v)
-            video_duration = None
-            if src_video_data['duration']:
-                video_duration = src_video_data['duration']
-
-            file_variation = dict(
-                size="{0}p".format(res_y),
-                duration=video_duration,
-                format=v,
-                width=src_video_data['res_x'],
-                height=src_video_data['res_y'],
-                content_type="video/{0}".format(v),
-                length=0,  # Available after encode
-                md5="",  # Available after encode
-                file_path=filename,
-            )
-            # Append file variation
-            src_file['variations'].append(file_variation)
-
-        def encode(src_path, src_file, res_y):
-            # For every variation in the list call video_encode
-            # print "encoding {0}".format(variations)
-            if current_app.config['ENCODING_BACKEND'] == 'zencoder':
-                # Move the source file in place on the remote storage (which can
-                # be accessed from zencoder)
-                push_to_storage(str(src_file['project']), src_path)
-                j = Encoder.job_create(src_file)
-                try:
-                    if j:
-                        src_file['processing'] = dict(
-                            status='pending',
-                            job_id="{0}".format(j['process_id']),
-                            backend=j['backend'])
-                        # Add the processing status to the file object
-                        r = put_internal('files',
-                                         src_file, **{'_id': ObjectId(file_id)})
-                        pass
-                except KeyError:
-                    pass
-            elif current_app.config['ENCODING_BACKEND'] == 'local':
-                for v in src_file['variations']:
-                    path = ffmpeg_encode(src_path, v['format'], res_y)
-                    # Update size data after encoding
-                    v['length'] = os.stat(path).st_size
-
-            r = put_internal('files', src_file, **{'_id': ObjectId(file_id)})
-            # When all encodes are done, delete source file
-            sync_path = os.path.split(src_path)[0]
-            push_to_storage(str(src_file['project']), sync_path)
-
-        p = Process(target=encode, args=(file_abs_path, src_file, res_y))
-        p.start()
+    try:
+        processor = processors[mime_category]
+    except KeyError:
+        log.info("POSTed file %s was of type %r, which isn't thumbnailed/encoded.", file_id,
+                 mime_category)
+        src_file['status'] = 'complete'
     else:
-        log.info("POSTed file was of type %r, which isn't thumbnailed/encoded.", mime_type)
+        log.debug('process_file(%s): marking file status as "processing"', file_id)
+        src_file['status'] = 'processing'
+        update_file_doc(file_id, status='processing')
 
-    if mime_type != 'video':
-        # Sync the whole subdir
-        sync_path = os.path.split(file_abs_path)[0]
-        # push_to_storage(str(src_file['project']), sync_path)
-        p = Process(target=push_to_storage, args=(
-            str(src_file['project']), sync_path))
-        p.start()
+        try:
+            processor(gcs, file_id, local_file, src_file)
+        except Exception:
+            log.warning('process_file(%s): error when processing file, resetting status to '
+                        '"queued_for_processing"', file_id, exc_info=True)
+            update_file_doc(file_id, status='queued_for_processing')
+            return
 
     # Update the original file with additional info, e.g. image resolution
-    put_internal('files', src_file, _id=ObjectId(file_id))
+    r, _, _, status = put_internal('files', src_file, _id=file_id)
+    if status not in (200, 201):
+        log.warning('process_file(%s): status %i when saving processed file info to MongoDB: %s',
+                    file_id, status, r)
 
 
 def delete_file(file_item):
@@ -383,20 +341,6 @@ def _generate_all_links(response, now):
         response['_updated'] = patch_resp['_updated']
 
 
-def post_POST_files(request, payload):
-    """After an file object has been created, we do the necessary processing
-    and further update it.
-    """
-
-    if 200 <= payload.status_code < 300:
-        import json
-        posted_properties = json.loads(request.data)
-        private_properties = json.loads(payload.data)
-        file_id = private_properties['_id']
-
-        process_file(file_id, posted_properties)
-
-
 def before_deleting_file(item):
     delete_file(item)
 
@@ -460,7 +404,7 @@ def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
          {'$or': [{'backend': backend_name, 'link_expires': None},
                   {'backend': backend_name, 'link_expires': {'$lt': expire_before}},
                   {'backend': backend_name, 'link': None}]
-          }).sort([('link_expires', pymongo.ASCENDING)]).limit(chunk_size)
+         }).sort([('link_expires', pymongo.ASCENDING)]).limit(chunk_size)
 
     if to_refresh.count() == 0:
         log.info('No links to refresh.')
@@ -473,9 +417,142 @@ def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
     log.info('Refreshed %i links', min(chunk_size, to_refresh.count()))
 
 
+@require_login({u'subscriber', u'admin'})
+def create_file_doc(name, filename, content_type, length, project, backend='gcs',
+                    **extra_fields):
+    """Creates a minimal File document for storage in MongoDB.
+
+    Doesn't save it to MongoDB yet.
+ """ + + current_user = g.get('current_user') + + file_doc = {'name': name, + 'filename': filename, + 'file_path': '', + 'user': current_user['user_id'], + 'backend': backend, + 'md5': '', + 'content_type': content_type, + 'length': length, + 'project': project} + file_doc.update(extra_fields) + + return file_doc + + +@file_storage.route('/stream/', methods=['POST']) +def stream_to_gcs(project_id): + log.info('Streaming file to bucket for project %s', project_id) + + uploaded_file = request.files['file'] + + projects = current_app.data.driver.db['projects'] + project = projects.find_one(ObjectId(project_id), projection={'_id': 1}) + if not project: + raise NotFound('Project %s does not exist' % project_id) + + file_id, fname, status = create_file_doc_for_upload(project['_id'], uploaded_file) + + if uploaded_file.content_type.startswith('image/'): + # We need to do local thumbnailing, so we have to write the stream + # both to Google Cloud Storage and to local storage. + local_file = tempfile.NamedTemporaryFile(dir=current_app.config['STORAGE_DIR']) + uploaded_file.save(local_file) + local_file.seek(0) # Make sure that a re-read starts from the beginning. + stream_for_gcs = local_file + else: + local_file = None + stream_for_gcs = uploaded_file.stream + + # Upload the file to GCS. + try: + gcs = GoogleCloudStorageBucket(project_id) + blob = gcs.bucket.blob('_/' + fname, chunk_size=256 * 1024 * 2) + blob.upload_from_file(stream_for_gcs, + content_type=uploaded_file.mimetype, + size=uploaded_file.content_length) + except Exception: + log.exception('Error uploading file to Google Cloud Storage (GCS),' + ' aborting handling of uploaded file (id=%s).', file_id) + update_file_doc(file_id, status='failed') + raise InternalServerError('Unable to stream file to Google Cloud Storage') + + # Reload the blob to get the file size according to Google. + blob.reload() + update_file_doc(file_id, + status='queued_for_processing', + file_path=fname, + length=blob.size) + + process_file(gcs, file_id, local_file) + + # Local processing is done, we can close the local file so it is removed. + if local_file is not None: + local_file.close() + + log.debug('Handled uploaded file id=%s, fname=%s, size=%i', file_id, fname, blob.size) + + # Status is 200 if the file already existed, and 201 if it was newly created. + return jsonify(status='ok', file_id=str(file_id)), status + + +def update_file_doc(file_id, **updates): + files = current_app.data.driver.db['files'] + res = files.update_one({'_id': ObjectId(file_id)}, + {'$set': updates}) + log.debug('update_file_doc(%s, %s): %i matched, %i updated.', + file_id, updates, res.matched_count, res.modified_count) + return res + + +def create_file_doc_for_upload(project_id, uploaded_file): + """Creates a secure filename and a document in MongoDB for the file. + + The (project_id, filename) tuple should be unique. If such a document already + exists, it is updated with the new file. + + :param uploaded_file: file from request.files['form-key'] + :type uploaded_file: werkzeug.datastructures.FileStorage + :returns: a tuple (file_id, filename, status), where 'filename' is the secured + name stored in file_doc['filename']. + """ + + project_id = ObjectId(project_id) + + # TODO: hash the filename with path info to get the internal name. 
+ internal_filename = uploaded_file.filename + + # See if we can find a pre-existing file doc + files = current_app.data.driver.db['files'] + file_doc = files.find_one({'project': project_id, + 'name': internal_filename}) + + # TODO: at some point do name-based and content-based content-type sniffing. + new_props = {'filename': uploaded_file.filename, + 'content_type': uploaded_file.mimetype, + 'length': uploaded_file.content_length, + 'project': project_id, + 'status': 'uploading'} + + if file_doc is None: + # Create a file document on MongoDB for this file. + file_doc = create_file_doc(name=internal_filename, **new_props) + file_fields, _, _, status = post_internal('files', file_doc) + else: + file_doc.update(new_props) + file_fields, _, _, status = put_internal('files', remove_private_keys(file_doc)) + + if status not in (200, 201): + log.error('Unable to create new file document in MongoDB, status=%i: %s', + status, file_fields) + raise InternalServerError() + + return file_fields['_id'], uploaded_file.filename, status + + def setup_app(app, url_prefix): app.on_pre_GET_files += on_pre_get_files - app.on_post_POST_files += post_POST_files app.on_fetched_item_files += before_returning_file app.on_fetched_resource_files += before_returning_files diff --git a/pillar/application/utils/authorization.py b/pillar/application/utils/authorization.py index 99eff9dd..d03601f4 100644 --- a/pillar/application/utils/authorization.py +++ b/pillar/application/utils/authorization.py @@ -90,6 +90,9 @@ def require_login(require_roles=set()): Optionally only allows access to users with a certain role./ """ + if not isinstance(require_roles, set): + raise TypeError('require_roles param should be a set, but is a %r' % type(require_roles)) + def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): diff --git a/pillar/application/utils/encoding.py b/pillar/application/utils/encoding.py index 9522a6a5..e82e1cd4 100644 --- a/pillar/application/utils/encoding.py +++ b/pillar/application/utils/encoding.py @@ -1,3 +1,4 @@ +import logging import os from flask import current_app @@ -5,6 +6,8 @@ from zencoder import Zencoder from application import encoding_service_client +log = logging.getLogger(__name__) + class Encoder: """Generic Encoder wrapper. Provides a consistent API, independent from @@ -15,28 +18,34 @@ class Encoder: def job_create(src_file): """Create an encoding job. Return the backend used as well as an id. 
""" - if isinstance(encoding_service_client, Zencoder): - if src_file['backend'] == 'gcs': - # Build the specific GCS input url, assuming the file is stored - # in the _ subdirectory - storage_base = "gcs://{0}/_/".format(src_file['project']) - file_input = os.path.join(storage_base, src_file['file_path']) - outputs = [] - options = dict(notifications=current_app.config['ZENCODER_NOTIFICATIONS_URL']) - for v in src_file['variations']: - outputs.append({ - 'format': v['format'], - 'witdh': v['width'], - 'url': os.path.join(storage_base, v['file_path'])}) - r = encoding_service_client.job.create(file_input, outputs=outputs, - options=options) - if r.code == 201: - return dict(process_id=r.body['id'], backend='zencoder') - else: - return None - else: + + if not isinstance(encoding_service_client, Zencoder): + log.error('I can only work with Zencoder, not with %r', encoding_service_client) return None + if src_file['backend'] != 'gcs': + log.error("Unable to work with storage backend %r", src_file['backend']) + return None + + # Build the specific GCS input url, assuming the file is stored + # in the _ subdirectory + storage_base = "gcs://{0}/_/".format(src_file['project']) + file_input = os.path.join(storage_base, src_file['file_path']) + options = dict(notifications=current_app.config['ZENCODER_NOTIFICATIONS_URL']) + + outputs = [{'format': v['format'], + 'url': os.path.join(storage_base, v['file_path'])} + for v in src_file['variations']] + r = encoding_service_client.job.create(file_input, + outputs=outputs, + options=options) + if r.code != 201: + log.error('Error %i creating Zencoder job: %s', r.code, r.body) + return None + + return {'process_id': r.body['id'], + 'backend': 'zencoder'} + @staticmethod def job_progress(job_id): if isinstance(encoding_service_client, Zencoder): diff --git a/pillar/application/utils/imaging.py b/pillar/application/utils/imaging.py index beb8f56f..e0b10bcc 100644 --- a/pillar/application/utils/imaging.py +++ b/pillar/application/utils/imaging.py @@ -5,54 +5,50 @@ from PIL import Image from flask import current_app -def generate_local_thumbnails(src, return_image_stats=False): +def generate_local_thumbnails(name_base, src): """Given a source image, use Pillow to generate thumbnails according to the application settings. 
- args: - src: the path of the image to be thumbnailed - return_image_stats: if True, return a dict object which contains length, - resolution, format and path of the thumbnailed image + :param name_base: the thumbnail will get a field 'name': '{basename}-{thumbsize}.jpg' + :type name_base: str + :param src: the path of the image to be thumbnailed + :type src: str """ thumbnail_settings = current_app.config['UPLOADS_LOCAL_STORAGE_THUMBNAILS'] - thumbnails = {} + thumbnails = [] + + save_to_base, _ = os.path.splitext(src) + name_base, _ = os.path.splitext(name_base) + for size, settings in thumbnail_settings.iteritems(): - root, ext = os.path.splitext(src) - dst = "{0}-{1}{2}".format(root, size, '.jpg') - if os.path.isfile(dst): - # If the thumbnail already exists we require stats about it - if return_image_stats: - thumbnails[size] = dict(exists=True) - continue + dst = '{0}-{1}{2}'.format(save_to_base, size, '.jpg') + name = '{0}-{1}{2}'.format(name_base, size, '.jpg') + if settings['crop']: resize_and_crop(src, dst, settings['size']) + width, height = settings['size'] else: im = Image.open(src) im.thumbnail(settings['size']) im.save(dst, "JPEG") + width, height = im.size - if return_image_stats: - # Get file size - st = os.stat(dst) - length = st.st_size - # Get resolution - im = Image.open(dst) - width = im.size[0] - height = im.size[1] - format = im.format.lower() - # Get format - thumbnails[size] = dict( - file_path=dst, # Full path, to be processed before storage - length=length, - width=width, - height=height, - md5='--', - content_type='image/' + format, - ) + thumb_info = {'size': size, + 'file_path': name, + 'local_path': dst, + 'length': os.stat(dst).st_size, + 'width': width, + 'height': height, + 'md5': '', + 'content_type': 'image/jpeg'} - if return_image_stats: - return thumbnails + if size == 't': + thumb_info['is_public'] = True + + thumbnails.append(thumb_info) + + return thumbnails def resize_and_crop(img_path, modified_path, size, crop_type='middle'): diff --git a/pillar/settings.py b/pillar/settings.py index 571a9f62..b020a8db 100644 --- a/pillar/settings.py +++ b/pillar/settings.py @@ -366,6 +366,7 @@ tokens_schema = { } files_schema = { + # Name of the file after processing, possibly hashed. 'name': { 'type': 'string', 'required': True, @@ -405,6 +406,8 @@ files_schema = { 'type': 'string', 'required': True, }, + + # Original filename as given by the user, possibly cleaned-up to make it safe. 'filename': { 'type': 'string', 'required': True, @@ -414,10 +417,14 @@ files_schema = { 'required': True, 'allowed': ["attract-web", "pillar", "cdnsun", "gcs", "unittest"] }, + + # Where the file is in the backend storage itself. In the case of GCS, + # it is relative to the /_ folder. In the other cases, it is relative + # to the root of that storage backend. required=False to allow creation + # before uploading to a storage, in case the final path is determined + # by that storage backend. 'file_path': { 'type': 'string', - #'required': True, - 'unique': True, }, 'link': { 'type': 'string', @@ -497,7 +504,13 @@ files_schema = { "failed", "cancelled"] }, } - } + }, + 'status': { + 'type': 'string', + 'allowed': ['uploading', 'queued_for_processing', 'processing', 'complete', 'failed'], + 'required': False, + 'default': 'complete', # default value for backward compatibility. 
+ }, } groups_schema = { diff --git a/tests/common_test_data.py b/tests/common_test_data.py index aedf0d8a..bd41944c 100644 --- a/tests/common_test_data.py +++ b/tests/common_test_data.py @@ -23,7 +23,8 @@ EXAMPLE_FILE = {u'_id': ObjectId('5672e2c1c379cf0007b31995'), u'content_type': 'image/png', u'_etag': '044ce3aede2e123e261c0d8bd77212f264d4f7b0', u'_created': datetime.datetime(2015, 12, 17, 16, 28, 49, tzinfo=tz_util.utc), u'md5': '', - u'file_path': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4.png', u'backend': 'gcs', + u'file_path': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4.png', + u'backend': 'pillar', u'link': 'http://localhost:8002/file', u'link_expires': datetime.datetime(2016, 3, 22, 9, 28, 22, tzinfo=tz_util.utc)} diff --git a/tests/test_encoding.py b/tests/test_encoding.py index 18e8a843..d1b3afd2 100644 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -24,7 +24,7 @@ class ZencoderNotificationTest(AbstractPillarTest): data=json.dumps({'job': {'id': 'koro-007'}}), headers={'X-Zencoder-Notification-Secret': secret, 'Content-Type': 'application/json'}) - self.assertEqual(404, resp.status_code) + self.assertEqual(200, resp.status_code) def test_good_secret_existing_file(self): self.ensure_file_exists(file_overrides={ @@ -40,11 +40,16 @@ class ZencoderNotificationTest(AbstractPillarTest): 'state': 'done'}, 'outputs': [{ 'format': 'jpg', + 'height': 1080, 'width': 2048, 'file_size_in_bytes': 15, - }]}), + 'md5_checksum': None, + }], + 'input': { + 'duration_in_ms': 5000, + }}), headers={'X-Zencoder-Notification-Secret': secret, 'Content-Type': 'application/json'}) # TODO: check that the file in MongoDB is actually updated properly. - self.assertEqual(200, resp.status_code) + self.assertEqual(204, resp.status_code)