Introducing external video encoding

It is now possible to specify an encoding backend (at the moment only
Zencoder) to take care of encoding video variations. File transfers
happen directly on GCS (although any storage backend can be
supported). A new requirement is the Zencoder Python library.
Francesco Siddi 2016-02-22 16:48:53 +01:00
parent 774bc35206
commit 3308751ed4
8 changed files with 159 additions and 40 deletions
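
For reference, a minimal configuration sketch. The key names below are the ones read by the code in this commit; the file name and all values are placeholders, not the project's actual settings:

# settings sketch (illustrative only; every value is a placeholder)
ENCODING_BACKEND = 'zencoder'   # or 'local' to keep the ffmpeg-based path
ZENCODER_API_KEY = '<api key>'
ZENCODER_NOTIFICATIONS_SECRET = '<shared secret>'
ZENCODER_NOTIFICATIONS_URL = 'https://example.com/encoding/zencoder/notifications'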

View File

@@ -5,11 +5,13 @@ from datetime import datetime
import bugsnag
from bugsnag.flask import handle_exceptions
from algoliasearch import algoliasearch
from zencoder import Zencoder
from flask import g
from flask import request
from flask import url_for
from flask import abort
from eve import Eve
from eve.auth import TokenAuth
from eve.io.mongo import Validator
@@ -105,6 +107,12 @@ if 'ALGOLIA_USER' in app.config:
else:
algolia_index_users = None
# Encoding backend
if app.config['ENCODING_BACKEND'] == 'zencoder':
encoding_service_client = Zencoder(app.config['ZENCODER_API_KEY'])
else:
encoding_service_client = None
from application.utils.authentication import validate_token
from application.utils.authorization import check_permissions
from application.utils.cdn import hash_file_path
@@ -302,3 +310,6 @@ app.on_delete_item_files += before_deleting_file
from modules.file_storage import file_storage
#from modules.file_storage.serve import *
app.register_blueprint(file_storage, url_prefix='/storage')
# The encoding module (receive notification and report progress)
from modules.encoding import encoding
app.register_blueprint(encoding, url_prefix='/encoding')

View File

@@ -0,0 +1,57 @@
from bson import ObjectId
from eve.methods.put import put_internal
from flask import Blueprint
from flask import abort
from flask import request
from application import app

encoding = Blueprint('encoding', __name__)


@encoding.route('/zencoder/notifications', methods=['POST'])
def zencoder_notifications():
    if app.config['ENCODING_BACKEND'] == 'zencoder':
        if not app.config['DEBUG']:
            # If we are in production, look for the Zencoder header secret
            try:
                notification_secret_request = request.headers[
                    'X-Zencoder-Notification-Secret']
            except KeyError:
                return abort(401)
            # If the header is found, check it against the one in the config
            notification_secret = app.config['ZENCODER_NOTIFICATIONS_SECRET']
            if notification_secret_request != notification_secret:
                return abort(401)
        # Cast request data into a dict
        data = request.get_json()
        files_collection = app.data.driver.db['files']
        # Find the file object based on processing backend and job_id
        lookup = {'processing.backend': 'zencoder',
                  'processing.job_id': str(data['job']['id'])}
        f = files_collection.find_one(lookup)
        if f:
            file_id = f['_id']
            # Remove internal keys (so that we can run put_internal)
            internal_fields = ['_id', '_etag', '_updated', '_created', '_status']
            for field in internal_fields:
                f.pop(field, None)
            # Update processing status
            f['processing']['status'] = data['job']['state']
            # For every variation encoded, try to update the file object
            for output in data['outputs']:
                format = output['format']
                # Change the Zencoder 'mpeg4' format to the 'mp4' used internally
                format = 'mp4' if format == 'mpeg4' else format
                # Find a variation matching format and resolution
                variation = next((v for v in f['variations']
                                  if v['format'] == format
                                  and v['width'] == output['width']), None)
                # If found, update with the delivered file size
                # TODO: calculate md5 on the storage
                if variation:
                    variation['length'] = output['file_size_in_bytes']
            r = put_internal('files', f, **{'_id': ObjectId(file_id)})
            return ''
        else:
            return abort(404)
    else:
        return abort(403)
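
For context, the handler above only reads a handful of fields from the notification body. A minimal payload sketch, inferred from the code above (a real Zencoder notification carries more data):

{
    "job": {"id": 12345, "state": "finished"},
    "outputs": [
        {"format": "mpeg4", "width": 1280, "file_size_in_bytes": 1048576}
    ]
}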

View File

@@ -15,6 +15,7 @@ from application.utils.imaging import ffmpeg_encode
from application.utils.storage import remote_storage_sync
from application.utils.storage import push_to_storage
from application.utils.gcs import GoogleCloudStorageBucket
from application.utils.encoding import Encoder
file_storage = Blueprint('file_storage', __name__,
template_folder='templates',
@@ -123,7 +124,8 @@ def process_file(src_file):
src_file.pop(field, None)
files_collection = app.data.driver.db['files']
file_abs_path = os.path.join(app.config['SHARED_DIR'], src_file['name'][:2], src_file['name'])
file_abs_path = os.path.join(
app.config['SHARED_DIR'], src_file['name'][:2], src_file['name'])
src_file['length'] = os.stat(file_abs_path).st_size
content_type = src_file['content_type'].split('/')
@@ -180,26 +182,45 @@ def process_file(src_file):
# Append file variation
src_file['variations'].append(file_variation)
def encode(src, variations, res_y):
def encode(src_path, src_file, res_y):
# For every variation in the list call video_encode
# print "encoding {0}".format(variations)
for v in variations:
path = ffmpeg_encode(file_abs_path, v['format'], res_y)
if app.config['ENCODING_BACKEND'] == 'zencoder':
# Move the source file in place on the remote storage (which can
# be accessed from zencoder)
push_to_storage(str(src_file['project']), src_path)
j = Encoder.job_create(src_file)
try:
if j:
src_file['processing'] = dict(
status='pending',
job_id="{0}".format(j['process_id']),
backend=j['backend'])
# Add the processing status to the file object
r = put_internal('files',
src_file, **{'_id': ObjectId(file_id)})
pass
except KeyError:
pass
elif app.config['ENCODING_BACKEND'] == 'local':
for v in src_file['variations']:
path = ffmpeg_encode(src_path, v['format'], res_y)
# Update size data after encoding
v['length'] = os.stat(path).st_size
r = put_internal('files', src_file, **{'_id': ObjectId(file_id)})
# When all encodes are done, delete source file
sync_path = os.path.split(file_abs_path)[0]
sync_path = os.path.split(src_path)[0]
push_to_storage(str(src_file['project']), sync_path)
p = Process(target=encode, args=(file_abs_path, src_file['variations'], res_y))
p = Process(target=encode, args=(file_abs_path, src_file, res_y))
p.start()
if mime_type != 'video':
# Sync the whole subdir
sync_path = os.path.split(file_abs_path)[0]
# push_to_storage(str(src_file['project']), sync_path)
p = Process(target=push_to_storage, args=(str(src_file['project']), sync_path))
p = Process(target=push_to_storage, args=(
str(src_file['project']), sync_path))
p.start()
else:
sync_path = file_abs_path
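
With the Zencoder backend, the encode() closure above leaves a processing sub-document on the file record along these lines (values illustrative; the shape matches the schema change further down):

src_file['processing'] = {
    'status': 'pending',
    'job_id': '12345',      # stored as a string, whatever the backend returns
    'backend': 'zencoder',
}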

View File

@@ -0,0 +1,43 @@
import os
from zencoder import Zencoder
from application import encoding_service_client
from application import app


class Encoder:
    """Generic Encoder wrapper. Provides a consistent API, independent from
    the encoding backend enabled.
    """

    @staticmethod
    def job_create(src_file):
        """Create an encoding job. Return the backend used as well as an id.
        """
        if isinstance(encoding_service_client, Zencoder):
            if src_file['backend'] == 'gcs':
                # Build the specific GCS input url, assuming the file is
                # stored in the _ subdirectory
                storage_base = "gcs://{0}/_/".format(src_file['project'])
                file_input = os.path.join(storage_base, src_file['file_path'])
                outputs = []
                options = dict(
                    notifications=app.config['ZENCODER_NOTIFICATIONS_URL'])
                for v in src_file['variations']:
                    outputs.append({
                        'format': v['format'],
                        'width': v['width'],
                        'url': os.path.join(storage_base, v['file_path'])})
                r = encoding_service_client.job.create(
                    file_input, outputs=outputs, options=options)
                if r.code == 201:
                    return dict(process_id=r.body['id'], backend='zencoder')
                else:
                    return None
        else:
            return None

    @staticmethod
    def job_progress(job_id):
        if isinstance(encoding_service_client, Zencoder):
            r = encoding_service_client.job.progress(int(job_id))
            return r.body
        else:
            return None
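
A usage sketch for the wrapper above (illustrative only; src_file stands for a file document as handled in file_storage, not a new API):

# Create a job, then check on it later (sketch, not part of the commit)
job = Encoder.job_create(src_file)
if job:
    # job looks like {'process_id': 12345, 'backend': 'zencoder'}
    progress = Encoder.job_progress(job['process_id'])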

View File

@@ -204,4 +204,3 @@ def ffmpeg_encode(src, format, res_y=720):
# return path of the encoded video
return dst

View File

@@ -78,4 +78,3 @@ def push_to_storage(project_id, full_path, backend='cgs'):
else:
raise IOError('ERROR: path not found')
pass

View File

@@ -349,20 +349,6 @@ files_schema = {
'description': {
'type': 'string',
},
# If the object has a parent, it is a variation of its parent. When querying
# for a file we are going to check if the object does NOT have a parent. In
# this case we will query for all files with the ObjectID as parent and we
# will aggregate them according of the type (if it's an image we will use
# some prefix, if it's a video we will combine the contentType and a custom
# prefix, such as 720p)
'parent': {
'type': 'objectid',
'data_relation': {
'resource': 'files',
'field': '_id',
'embeddable': True
},
},
'content_type': { # MIME type image/png video/mp4
'type': 'string',
'required': True,
@@ -459,20 +445,22 @@ files_schema = {
}
}
},
# Removed by this commit (deprecated):
'previews': {  # Deprecated (see comments above)
    'type': 'list',
    'schema': {
        'type': 'objectid',
        'data_relation': {
            'resource': 'files',
            'field': '_id',
            'embeddable': True
        }
    }
},
# Preview parameters:
'is_preview': {  # Deprecated
    'type': 'boolean'
},
# Added in their place: processing status reported by the encoding backend
'processing': {
    'type': 'dict',
    'schema': {
        'job_id': {
            'type': 'string'  # can be int, depending on the backend
        },
        'backend': {
            'type': 'string',
            'allowed': ["zencoder", "local"]
        },
        'status': {
            'type': 'string',
            'allowed': ["pending", "waiting", "processing", "finished",
                        "failed", "cancelled"]
        },
    }
}
}

View File

@@ -37,3 +37,4 @@ six==1.9.0
WebOb==1.5.0
Werkzeug==0.10.4
wheel==0.24.0
zencoder==0.6.5