@encoding.route('/zencoder/notifications', methods=['POST'])
def zencoder_notifications():
    """Receive a Zencoder job notification and update the matching file.

    The endpoint is only active when ``ENCODING_BACKEND`` is ``'zencoder'``.
    Outside of ``DEBUG`` mode the request must carry the
    ``X-Zencoder-Notification-Secret`` header with the value configured in
    ``ZENCODER_NOTIFICATIONS_SECRET``.

    The file document is looked up by ``processing.backend`` and
    ``processing.job_id``; its ``processing.status`` is set to the job state
    and every variation matching an output (by format and width) gets its
    ``length`` set to the reported file size.

    Returns:
        An empty body on success.

    Aborts with:
        400 if the payload is missing or does not contain ``job.id`` and
            ``job.state``.
        401 if the notification secret is missing or wrong (non-DEBUG only).
        403 if the zencoder backend is not enabled.
        404 if no file matches the job id.
    """
    if app.config['ENCODING_BACKEND'] != 'zencoder':
        return abort(403)

    if not app.config['DEBUG']:
        # If we are in production, look for the Zencoder header secret
        notification_secret_request = request.headers.get(
            'X-Zencoder-Notification-Secret')
        if notification_secret_request is None:
            return abort(401)
        # If the header is found, check it against the one in the config
        notification_secret = app.config['ZENCODER_NOTIFICATIONS_SECRET']
        # NOTE(review): this is not a constant-time comparison; consider
        # hmac.compare_digest once both values are known to share a type.
        if notification_secret_request != notification_secret:
            return abort(401)

    # Cast request data into a dict (None when the body is not JSON)
    data = request.get_json()
    try:
        job_id = str(data['job']['id'])
        job_state = data['job']['state']
    except (KeyError, TypeError):
        # Missing body, or a payload without the expected job description
        return abort(400)
    outputs = data.get('outputs') or []

    files_collection = app.data.driver.db['files']
    # Find the file object based on processing backend and job_id
    lookup = {'processing.backend': 'zencoder', 'processing.job_id': job_id}
    f = files_collection.find_one(lookup)
    if not f:
        return abort(404)

    file_id = f['_id']
    # Remove internal keys (so that we can run put internal)
    for field in ('_id', '_etag', '_updated', '_created', '_status'):
        f.pop(field, None)

    # Update processing status
    f['processing']['status'] = job_state

    # For every variation encoded, try to update the file object
    variations = f.get('variations') or []
    for output in outputs:
        # Change the zencoder 'mpeg4' format to 'mp4' used internally
        output_format = output.get('format')
        if output_format == 'mpeg4':
            output_format = 'mp4'
        output_width = output.get('width')
        # Find a variation matching format and resolution
        variation = next(
            (v for v in variations
             if v.get('format') == output_format
             and v.get('width') == output_width),
            None)
        # If found, update with delivered file size (skipped when the
        # notification carries no size for this output)
        # TODO: calculate md5 on the storage
        file_size = output.get('file_size_in_bytes')
        if variation and file_size is not None:
            variation['length'] = file_size

    put_internal('files', f, **{'_id': ObjectId(file_id)})
    return ''
application.utils.imaging import ffmpeg_encode from application.utils.storage import remote_storage_sync from application.utils.storage import push_to_storage from application.utils.gcs import GoogleCloudStorageBucket +from application.utils.encoding import Encoder file_storage = Blueprint('file_storage', __name__, template_folder='templates', @@ -123,7 +124,8 @@ def process_file(src_file): src_file.pop(field, None) files_collection = app.data.driver.db['files'] - file_abs_path = os.path.join(app.config['SHARED_DIR'], src_file['name'][:2], src_file['name']) + file_abs_path = os.path.join( + app.config['SHARED_DIR'], src_file['name'][:2], src_file['name']) src_file['length'] = os.stat(file_abs_path).st_size content_type = src_file['content_type'].split('/') @@ -180,26 +182,45 @@ def process_file(src_file): # Append file variation src_file['variations'].append(file_variation) - def encode(src, variations, res_y): + def encode(src_path, src_file, res_y): # For every variation in the list call video_encode # print "encoding {0}".format(variations) - for v in variations: - path = ffmpeg_encode(file_abs_path, v['format'], res_y) - # Update size data after encoding - v['length'] = os.stat(path).st_size + if app.config['ENCODING_BACKEND'] == 'zencoder': + # Move the source file in place on the remote storage (which can + # be accessed from zencoder) + push_to_storage(str(src_file['project']), src_path) + j = Encoder.job_create(src_file) + try: + if j: + src_file['processing'] = dict( + status='pending', + job_id="{0}".format(j['process_id']), + backend=j['backend']) + # Add the processing status to the file object + r = put_internal('files', + src_file, **{'_id': ObjectId(file_id)}) + pass + except KeyError: + pass + elif app.config['ENCODING_BACKEND'] == 'local': + for v in src_file['variations']: + path = ffmpeg_encode(src_path, v['format'], res_y) + # Update size data after encoding + v['length'] = os.stat(path).st_size - r = put_internal('files', src_file, **{'_id': 
class Encoder(object):
    """Generic Encoder wrapper. Provides a consistent API, independent from
    the encoding backend enabled.
    """

    @staticmethod
    def job_create(src_file):
        """Create an encoding job. Return the backend used as well as an id.

        Args:
            src_file: file document; reads its ``backend``, ``project``,
                ``file_path`` and ``variations`` keys (each variation
                provides ``format``, ``width`` and ``file_path``).

        Returns:
            ``dict(process_id=..., backend='zencoder')`` when the job was
            created (response code 201). ``None`` when the Zencoder client
            is not configured, the file is not stored on the ``gcs``
            backend, or the service did not answer with 201.
        """
        if not isinstance(encoding_service_client, Zencoder):
            return None
        if src_file['backend'] != 'gcs':
            # Input/output URLs can only be built for GCS-hosted files;
            # bail out instead of failing later on undefined names.
            return None

        # Build the specific GCS input url, assuming the file is stored
        # in the _ subdirectory
        storage_base = "gcs://{0}/_/".format(src_file['project'])
        file_input = os.path.join(storage_base, src_file['file_path'])
        outputs = [
            {
                'format': v['format'],
                'width': v['width'],
                'url': os.path.join(storage_base, v['file_path']),
            }
            for v in src_file['variations']
        ]
        options = dict(notifications=app.config['ZENCODER_NOTIFICATIONS_URL'])
        r = encoding_service_client.job.create(
            file_input, outputs=outputs, options=options)
        if r.code == 201:
            return dict(process_id=r.body['id'], backend='zencoder')
        return None

    @staticmethod
    def job_progress(job_id):
        """Return the progress report body for ``job_id``.

        Returns ``None`` when the Zencoder client is not configured.
        """
        if isinstance(encoding_service_client, Zencoder):
            r = encoding_service_client.job.progress(int(job_id))
            return r.body
        return None
In - # this case we will query for all files with the ObjectID as parent and we - # will aggregate them according of the type (if it's an image we will use - # some prefix, if it's a video we will combine the contentType and a custom - # prefix, such as 720p) - 'parent': { - 'type': 'objectid', - 'data_relation': { - 'resource': 'files', - 'field': '_id', - 'embeddable': True - }, - }, 'content_type': { # MIME type image/png video/mp4 'type': 'string', 'required': True, @@ -459,20 +445,22 @@ files_schema = { } } }, - 'previews': { # Deprecated (see comments above) - 'type': 'list', + 'processing': { + 'type': 'dict', 'schema': { - 'type': 'objectid', - 'data_relation': { - 'resource': 'files', - 'field': '_id', - 'embeddable': True - } + 'job_id': { + 'type': 'string' # can be int, depending on the backend + }, + 'backend': { + 'type': 'string', + 'allowed': ["zencoder", "local"] + }, + 'status': { + 'type': 'string', + 'allowed': ["pending", "waiting", "processing", "finished", + "failed", "cancelled"] + }, } - }, - # Preview parameters: - 'is_preview': { # Deprecated - 'type': 'boolean' } } diff --git a/requirements.txt b/requirements.txt index 1e283414..8ed3795b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,3 +37,4 @@ six==1.9.0 WebOb==1.5.0 Werkzeug==0.10.4 wheel==0.24.0 +zencoder==0.6.5