From 3308751ed4af93db3309ea70401e5f8a1ae106f8 Mon Sep 17 00:00:00 2001 From: Francesco Siddi Date: Mon, 22 Feb 2016 16:48:53 +0100 Subject: [PATCH] Introducing external video encoding It is now possible to specify an encoding backend (at the moment only zencoder) to take care of video variations encoding. Files transfer happens directly on CGS (although any storage backend can be supported). New requirements is the Zencoder Python library. --- pillar/application/__init__.py | 11 ++++ pillar/application/modules/encoding.py | 57 +++++++++++++++++++ .../modules/file_storage/__init__.py | 45 +++++++++++---- pillar/application/utils/encoding.py | 43 ++++++++++++++ pillar/application/utils/imaging.py | 1 - pillar/application/utils/storage.py | 1 - pillar/settings.py | 40 +++++-------- requirements.txt | 1 + 8 files changed, 159 insertions(+), 40 deletions(-) create mode 100644 pillar/application/modules/encoding.py create mode 100644 pillar/application/utils/encoding.py diff --git a/pillar/application/__init__.py b/pillar/application/__init__.py index 3f880115..a040fa38 100644 --- a/pillar/application/__init__.py +++ b/pillar/application/__init__.py @@ -5,11 +5,13 @@ from datetime import datetime import bugsnag from bugsnag.flask import handle_exceptions from algoliasearch import algoliasearch +from zencoder import Zencoder from flask import g from flask import request from flask import url_for from flask import abort from eve import Eve + from eve.auth import TokenAuth from eve.io.mongo import Validator @@ -105,6 +107,12 @@ if 'ALGOLIA_USER' in app.config: else: algolia_index_users = None +# Encoding backend +if app.config['ENCODING_BACKEND'] == 'zencoder': + encoding_service_client = Zencoder(app.config['ZENCODER_API_KEY']) +else: + encoding_service_client = None + from application.utils.authentication import validate_token from application.utils.authorization import check_permissions from application.utils.cdn import hash_file_path @@ -302,3 +310,6 @@ app.on_delete_item_files += before_deleting_file from modules.file_storage import file_storage #from modules.file_storage.serve import * app.register_blueprint(file_storage, url_prefix='/storage') +# The encoding module (receive notification and report progress) +from modules.encoding import encoding +app.register_blueprint(encoding, url_prefix='/encoding') diff --git a/pillar/application/modules/encoding.py b/pillar/application/modules/encoding.py new file mode 100644 index 00000000..a7714954 --- /dev/null +++ b/pillar/application/modules/encoding.py @@ -0,0 +1,57 @@ +from bson import ObjectId +from eve.methods.put import put_internal +from flask import Blueprint +from flask import abort +from flask import request +from application import app + +encoding = Blueprint('encoding', __name__) + +@encoding.route('/zencoder/notifications', methods=['POST']) +def zencoder_notifications(): + if app.config['ENCODING_BACKEND'] == 'zencoder': + if not app.config['DEBUG']: + # If we are in production, look for the Zencoder header secret + try: + notification_secret_request = request.headers[ + 'X-Zencoder-Notification-Secret'] + except KeyError: + return abort(401) + # If the header is found, check it agains the one in the config + notification_secret = app.config['ZENCODER_NOTIFICATIONS_SECRET'] + if notification_secret_request != notification_secret: + return abort(401) + # Cast request data into a dict + data = request.get_json() + files_collection = app.data.driver.db['files'] + # Find the file object based on processing backend and job_id + lookup = {'processing.backend': 'zencoder', 'processing.job_id': str( + data['job']['id'])} + f = files_collection.find_one(lookup) + if f: + file_id = f['_id'] + # Remove internal keys (so that we can run put internal) + internal_fields = ['_id', '_etag', '_updated', '_created', '_status'] + for field in internal_fields: + f.pop(field, None) + # Update processing status + f['processing']['status'] = data['job']['state'] + # For every variation encoded, try to update the file object + for output in data['outputs']: + format = output['format'] + # Change the zencoder 'mpeg4' format to 'mp4' used internally + format = 'mp4' if format == 'mpeg4' else format + # Find a variation matching format and resolution + variation = next((v for v in f['variations'] if v['format'] == format \ + and v['width'] == output['width']), None) + # If found, update with delivered file size + # TODO: calculate md5 on the storage + if variation: + variation['length'] = output['file_size_in_bytes'] + + r = put_internal('files', f, **{'_id': ObjectId(file_id)}) + return '' + else: + return abort(404) + else: + return abort(403) diff --git a/pillar/application/modules/file_storage/__init__.py b/pillar/application/modules/file_storage/__init__.py index 6c85d2fa..e71d23b4 100644 --- a/pillar/application/modules/file_storage/__init__.py +++ b/pillar/application/modules/file_storage/__init__.py @@ -15,6 +15,7 @@ from application.utils.imaging import ffmpeg_encode from application.utils.storage import remote_storage_sync from application.utils.storage import push_to_storage from application.utils.gcs import GoogleCloudStorageBucket +from application.utils.encoding import Encoder file_storage = Blueprint('file_storage', __name__, template_folder='templates', @@ -123,7 +124,8 @@ def process_file(src_file): src_file.pop(field, None) files_collection = app.data.driver.db['files'] - file_abs_path = os.path.join(app.config['SHARED_DIR'], src_file['name'][:2], src_file['name']) + file_abs_path = os.path.join( + app.config['SHARED_DIR'], src_file['name'][:2], src_file['name']) src_file['length'] = os.stat(file_abs_path).st_size content_type = src_file['content_type'].split('/') @@ -180,26 +182,45 @@ def process_file(src_file): # Append file variation src_file['variations'].append(file_variation) - def encode(src, variations, res_y): + def encode(src_path, src_file, res_y): # For every variation in the list call video_encode # print "encoding {0}".format(variations) - for v in variations: - path = ffmpeg_encode(file_abs_path, v['format'], res_y) - # Update size data after encoding - v['length'] = os.stat(path).st_size + if app.config['ENCODING_BACKEND'] == 'zencoder': + # Move the source file in place on the remote storage (which can + # be accessed from zencoder) + push_to_storage(str(src_file['project']), src_path) + j = Encoder.job_create(src_file) + try: + if j: + src_file['processing'] = dict( + status='pending', + job_id="{0}".format(j['process_id']), + backend=j['backend']) + # Add the processing status to the file object + r = put_internal('files', + src_file, **{'_id': ObjectId(file_id)}) + pass + except KeyError: + pass + elif app.config['ENCODING_BACKEND'] == 'local': + for v in src_file['variations']: + path = ffmpeg_encode(src_path, v['format'], res_y) + # Update size data after encoding + v['length'] = os.stat(path).st_size - r = put_internal('files', src_file, **{'_id': ObjectId(file_id)}) - # When all encodes are done, delete source file - sync_path = os.path.split(file_abs_path)[0] - push_to_storage(str(src_file['project']), sync_path) + r = put_internal('files', src_file, **{'_id': ObjectId(file_id)}) + # When all encodes are done, delete source file + sync_path = os.path.split(src_path)[0] + push_to_storage(str(src_file['project']), sync_path) - p = Process(target=encode, args=(file_abs_path, src_file['variations'], res_y)) + p = Process(target=encode, args=(file_abs_path, src_file, res_y)) p.start() if mime_type != 'video': # Sync the whole subdir sync_path = os.path.split(file_abs_path)[0] # push_to_storage(str(src_file['project']), sync_path) - p = Process(target=push_to_storage, args=(str(src_file['project']), sync_path)) + p = Process(target=push_to_storage, args=( + str(src_file['project']), sync_path)) p.start() else: sync_path = file_abs_path diff --git a/pillar/application/utils/encoding.py b/pillar/application/utils/encoding.py new file mode 100644 index 00000000..2b5c1420 --- /dev/null +++ b/pillar/application/utils/encoding.py @@ -0,0 +1,43 @@ +import os +from zencoder import Zencoder +from application import encoding_service_client +from application import app + +class Encoder: + """Generic Encoder wrapper. Provides a consistent API, independent from + the encoding backend enabled. + """ + + @staticmethod + def job_create(src_file): + """Create an encoding job. Return the backend used as well as an id. + """ + if isinstance(encoding_service_client, Zencoder): + if src_file['backend'] == 'gcs': + # Build the specific GCS input url, assuming the file is stored + # in the _ subdirectory + storage_base = "gcs://{0}/_/".format(src_file['project']) + file_input = os.path.join(storage_base, src_file['file_path']) + outputs = [] + options = dict(notifications=app.config['ZENCODER_NOTIFICATIONS_URL']) + for v in src_file['variations']: + outputs.append({ + 'format': v['format'], + 'witdh': v['width'], + 'url': os.path.join(storage_base, v['file_path'])}) + r = encoding_service_client.job.create(file_input, outputs=outputs, + options=options) + if r.code == 201: + return dict(process_id=r.body['id'], backend='zencoder') + else: + return None + else: + return None + + @staticmethod + def job_progress(job_id): + if isinstance(encoding_service_client, Zencoder): + r = encoding_service_client.job.progress(int(job_id)) + return r.body + else: + return None diff --git a/pillar/application/utils/imaging.py b/pillar/application/utils/imaging.py index 73d1d747..c3885f17 100644 --- a/pillar/application/utils/imaging.py +++ b/pillar/application/utils/imaging.py @@ -204,4 +204,3 @@ def ffmpeg_encode(src, format, res_y=720): # return path of the encoded video return dst - diff --git a/pillar/application/utils/storage.py b/pillar/application/utils/storage.py index 9328394d..73621861 100644 --- a/pillar/application/utils/storage.py +++ b/pillar/application/utils/storage.py @@ -78,4 +78,3 @@ def push_to_storage(project_id, full_path, backend='cgs'): else: raise IOError('ERROR: path not found') - pass diff --git a/pillar/settings.py b/pillar/settings.py index 5d9b0a18..48322733 100644 --- a/pillar/settings.py +++ b/pillar/settings.py @@ -349,20 +349,6 @@ files_schema = { 'description': { 'type': 'string', }, - # If the object has a parent, it is a variation of its parent. When querying - # for a file we are going to check if the object does NOT have a parent. In - # this case we will query for all files with the ObjectID as parent and we - # will aggregate them according of the type (if it's an image we will use - # some prefix, if it's a video we will combine the contentType and a custom - # prefix, such as 720p) - 'parent': { - 'type': 'objectid', - 'data_relation': { - 'resource': 'files', - 'field': '_id', - 'embeddable': True - }, - }, 'content_type': { # MIME type image/png video/mp4 'type': 'string', 'required': True, @@ -459,20 +445,22 @@ files_schema = { } } }, - 'previews': { # Deprecated (see comments above) - 'type': 'list', + 'processing': { + 'type': 'dict', 'schema': { - 'type': 'objectid', - 'data_relation': { - 'resource': 'files', - 'field': '_id', - 'embeddable': True - } + 'job_id': { + 'type': 'string' # can be int, depending on the backend + }, + 'backend': { + 'type': 'string', + 'allowed': ["zencoder", "local"] + }, + 'status': { + 'type': 'string', + 'allowed': ["pending", "waiting", "processing", "finished", + "failed", "cancelled"] + }, } - }, - # Preview parameters: - 'is_preview': { # Deprecated - 'type': 'boolean' } } diff --git a/requirements.txt b/requirements.txt index 1e283414..8ed3795b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,3 +37,4 @@ six==1.9.0 WebOb==1.5.0 Werkzeug==0.10.4 wheel==0.24.0 +zencoder==0.6.5