Started moving processing function into subclasses

Francesco Siddi 2016-11-09 03:08:41 +01:00
parent 4d6bf65a99
commit 27df603299
4 changed files with 178 additions and 23 deletions
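The shape of the refactor, condensed into a minimal sketch (class and method names come from the diffs below; the abc wiring is inferred from the @abc.abstractmethod decorators added in storage.py and is an assumption about the surrounding code):

import abc

class Blob(object):
    # Python 2 style ABC, matching the (object)-based classes in this codebase.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def _process_image(self, file_doc):
        pass

    @abc.abstractmethod
    def _process_video(self, file_doc):
        pass

    def process_file(self, file_id):
        # Implemented once on the base class: look up the file document,
        # derive its MIME category, and dispatch to the subclass hooks above.
        pass

class LocalBlob(Blob):
    def _process_image(self, file_doc):
        pass  # the real thumbnailing moved here; see storage.py below

So processing that used to live in a module-level process_file() in the file-storage module becomes a template method on Blob, with backend-specific behaviour in LocalBlob and GoogleCloudStorageBlob.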

View File

@@ -394,7 +394,7 @@ files_schema = {
     'backend': {
         'type': 'string',
         'required': True,
-        'allowed': ["attract-web", "pillar", "cdnsun", "gcs", "unittest"]
+        'allowed': ["local", "pillar", "cdnsun", "gcs", "unittest"]
     },
     # Where the file is in the backend storage itself. In the case of GCS,
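The practical effect: file documents may now declare the 'local' backend, while 'attract-web' is rejected. A quick check of the rule, assuming plain Cerberus semantics (the validation library Eve builds on) rather than Eve's full validator:

from cerberus import Validator

v = Validator({'backend': {
    'type': 'string',
    'required': True,
    'allowed': ['local', 'pillar', 'cdnsun', 'gcs', 'unittest'],
}})
assert v.validate({'backend': 'local'})            # newly accepted
assert not v.validate({'backend': 'attract-web'})  # no longer allowed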

View File

@@ -1,26 +1,26 @@
+import datetime
 import io
 import logging
 import mimetypes
+import os
 import tempfile
 import uuid
 from hashlib import md5
-import os

+import requests
 import bson.tz_util
-import datetime
 import eve.utils
 import pymongo
 import werkzeug.exceptions as wz_exceptions
 from bson import ObjectId
-from flask import Blueprint
+from flask import Blueprint, current_app
 from flask import current_app
 from flask import g
 from flask import jsonify
 from flask import request
 from flask import send_from_directory
 from flask import url_for, helpers
 from pillar.api import utils
-from pillar.api.utils.imaging import generate_local_thumbnails
 from pillar.api.utils import remove_private_keys, authentication
 from pillar.api.utils.authorization import require_login, user_has_role, \
     user_matches_roles
@@ -28,7 +28,8 @@ from pillar.api.utils.cdn import hash_file_path
 from pillar.api.utils.encoding import Encoder
 from pillar.api.utils.gcs import GoogleCloudStorageBucket, \
     GoogleCloudStorageBlob
-from pillar.api.utils.storage import LocalBucket, LocalBlob, default_storage_backend
+from pillar.api.utils.imaging import generate_local_thumbnails
+from pillar.api.utils.storage import LocalBlob, default_storage_backend

 log = logging.getLogger(__name__)
@@ -317,8 +318,9 @@ def generate_link(backend, file_path, project_id=None, is_public=False):
             return blob['public_url']
         return blob['signed_url']
     if backend == 'local':
-        f = LocalBlob(project_id, file_path)
-        return url_for('file_storage.index', file_name=f.partial_path,
+        bucket = default_storage_backend(project_id)
+        blob = bucket.get_blob(file_path)
+        return url_for('file_storage.index', file_name=blob.partial_path,
                        _external=True, _scheme=current_app.config['SCHEME'])
     if backend == 'pillar':
         return url_for('file_storage.index', file_name=file_path,
@@ -550,12 +552,15 @@ def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
 @require_login()
 def create_file_doc(name, filename, content_type, length, project,
-                    backend='gcs', **extra_fields):
+                    backend=None, **extra_fields):
     """Creates a minimal File document for storage in MongoDB.

     Doesn't save it to MongoDB yet.
     """

+    if backend is None:
+        backend = current_app.config['STORAGE_BACKEND']
+
     current_user = g.get('current_user')

     file_doc = {'name': name,
@@ -728,7 +733,8 @@ def stream_to_storage(project_id):
     log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id,
               internal_fname, blob.size)
-    process_file(storage_backend, file_id, local_file)
+    # process_file(storage_backend, file_id, local_file)
+    blob.process_file(file_id)

     # Local processing is done, we can close the local file so it is removed.
     if local_file is not None:
@@ -783,15 +789,6 @@ def add_access_control_headers(resp):
     return resp


-def update_file_doc(file_id, **updates):
-    files = current_app.data.driver.db['files']
-    res = files.update_one({'_id': ObjectId(file_id)},
-                           {'$set': updates})
-    log.debug('update_file_doc(%s, %s): %i matched, %i updated.',
-              file_id, updates, res.matched_count, res.modified_count)
-    return res
-
-
 def create_file_doc_for_upload(project_id, uploaded_file):
     """Creates a secure filename and a document in MongoDB for the file.
@@ -876,3 +873,12 @@ def setup_app(app, url_prefix):
     app.on_insert_files += compute_aggregate_length_items

     app.register_api_blueprint(file_storage, url_prefix=url_prefix)
+
+
+def update_file_doc(file_id, **updates):
+    files = current_app.data.driver.db['files']
+    res = files.update_one({'_id': ObjectId(file_id)},
+                           {'$set': updates})
+    log.debug('update_file_doc(%s, %s): %i matched, %i updated.',
+              file_id, updates, res.matched_count, res.modified_count)
+    return res
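With backend defaulting to None and resolved inside the function, callers can now omit it and get whatever STORAGE_BACKEND is configured, instead of the previously hard-coded 'gcs'. A hedged usage sketch (assumes an active Flask app context, a project document in project_doc, and that the function returns the document it builds, as the truncated hunk suggests):

# e.g. with app.config['STORAGE_BACKEND'] = 'local'
file_doc = create_file_doc(
    name='render.png',          # hypothetical values, for illustration only
    filename='render.png',
    content_type='image/png',
    length=1024,
    project=project_doc)
# file_doc['backend'] now carries the configured backend, not 'gcs'.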

View File

@@ -196,6 +196,15 @@ class GoogleCloudStorageBlob(Blob):
         self.blob = bucket.gcs_bucket.blob('_/' + name, chunk_size=256 * 1024 * 2)

+    def create_from_file(self, uploaded_file, file_size):
+        pass
+
+    def _process_image(self, file_doc):
+        pass
+
+    def _process_video(self, file_doc):
+        pass
+

 def update_file_name(node):
     """Assign to the CGS blob the same name of the asset node. This way when

View File

@@ -5,8 +5,13 @@ import logging
 import os
 import shutil

+from bson import ObjectId
 from flask import current_app

+from pillar.api import utils
+from pillar.api.utils.authorization import user_has_role
+from pillar.api.utils.imaging import generate_local_thumbnails
+
 log = logging.getLogger(__name__)

 # Mapping from backend name to backend class
@@ -86,6 +91,93 @@ class Blob(object):
     def create_from_file(self, uploaded_file, file_size):
         pass

+    @abc.abstractmethod
+    def _process_image(self, file_doc):
+        pass
+
+    @abc.abstractmethod
+    def _process_video(self, file_doc):
+        pass
+
+    def process_file(self, file_id):
+        """Generate image thumbnails or encode video.
+
+        :type file_id: string
+        :param file_id: The document ID for the file processed. We need it to
+            update the document as we process the file.
+        """
+
+        def update_file_doc(file_id, **updates):
+            res = files.update_one({'_id': ObjectId(file_id)},
+                                   {'$set': updates})
+            log.debug('update_file_doc(%s, %s): %i matched, %i updated.',
+                      file_id, updates, res.matched_count, res.modified_count)
+            return res
+
+        file_id = ObjectId(file_id)
+
+        # Fetch the src_file document from MongoDB.
+        files = current_app.data.driver.db['files']
+        src_file = files.find_one(file_id)
+        if not src_file:
+            log.warning('process_file(%s): no such file document found, '
+                        'ignoring.', file_id)
+            return
+
+        # Update the 'format' field from the content type.
+        # TODO: overrule the content type based on file extension & magic numbers.
+        mime_category, src_file['format'] = src_file['content_type'].split('/', 1)
+
+        # Prevent video handling for non-admins.
+        if not user_has_role(u'admin') and mime_category == 'video':
+            if src_file['format'].startswith('x-'):
+                xified = src_file['format']
+            else:
+                xified = 'x-' + src_file['format']
+
+            src_file['content_type'] = 'application/%s' % xified
+            mime_category = 'application'
+            log.info('Not processing video file %s for non-admin user', file_id)
+
+        # Run the required processor, based on the MIME category.
+        processors = {
+            'image': self._process_image,
+            'video': self._process_video,
+        }
+
+        try:
+            processor = processors[mime_category]
+        except KeyError:
+            log.info("POSTed file %s was of type %r, which isn't "
+                     "thumbnailed/encoded.", file_id, mime_category)
+            src_file['status'] = 'complete'
+        else:
+            log.debug('process_file(%s): marking file status as "processing"',
+                      file_id)
+            src_file['status'] = 'processing'
+            update_file_doc(file_id, status='processing')
+
+            try:
+                processor(src_file)
+            except Exception:
+                log.warning('process_file(%s): error when processing file, '
+                            'resetting status to "queued_for_processing"',
+                            file_id, exc_info=True)
+                update_file_doc(file_id, status='queued_for_processing')
+                return
+
+        src_file = utils.remove_private_keys(src_file)
+
+        # Update the original file with additional info, e.g. image resolution
+        r, _, _, status = current_app.put_internal('files', src_file,
+                                                   _id=file_id)
+        if status not in (200, 201):
+            log.warning('process_file(%s): status %i when saving processed file '
+                        'info to MongoDB: %s', file_id, status, r)
+

 @register_backend('local')
 class LocalBucket(Bucket):
@@ -96,8 +188,8 @@ class LocalBucket(Bucket):
         return LocalBlob(name=blob_name, bucket=self)

     def get_blob(self, blob_name):
-        # Check if file exists, otherwise None
-        return None
+        # TODO: Check if file exists, otherwise None
+        return self.blob(blob_name)


 class LocalBlob(Blob):
@@ -112,16 +204,64 @@ class LocalBlob(Blob):

     def create_from_file(self, uploaded_file, file_size):
         # Ensure path exists before saving
-        os.makedirs(os.path.dirname(self.path))
+        directory = os.path.dirname(self.path)
+        if not os.path.exists(directory):
+            os.makedirs(directory)

         with open(self.path, 'wb') as outfile:
             shutil.copyfileobj(uploaded_file, outfile)

         self._size_in_bytes = file_size

+    def _process_image(self, file_doc):
+        from PIL import Image
+
+        im = Image.open(self.path)
+        res = im.size
+        file_doc['width'] = res[0]
+        file_doc['height'] = res[1]
+
+        # Generate previews
+        log.info('Generating thumbnails for file %s', file_doc['_id'])
+        file_doc['variations'] = generate_local_thumbnails(file_doc['name'],
+                                                           self.path)
+
+        # Send those previews to Google Cloud Storage.
+        log.info('Uploading %i thumbnails for file %s to Google Cloud Storage '
+                 '(GCS)', len(file_doc['variations']), file_doc['_id'])
+
+        # TODO: parallelize this at some point.
+        for variation in file_doc['variations']:
+            fname = variation['file_path']
+            if current_app.config['TESTING']:
+                log.warning(' - NOT making thumbnail %s', fname)
+            else:
+                log.debug(' - Sending thumbnail %s to GCS', fname)
+                blob = self.bucket.blob(fname)
+                with open(variation['local_path'], 'rb') as f:
+                    blob.create_from_file(f, variation['length'])
+
+            try:
+                os.unlink(variation['local_path'])
+            except OSError:
+                log.warning('Unable to unlink %s, ignoring this but it will '
+                            'need cleanup later.', variation['local_path'])
+
+            del variation['local_path']
+
+        log.info('Done processing file %s', file_doc['_id'])
+        file_doc['status'] = 'complete'
+
+    def _process_video(self, file_doc):
+        pass
+

 def default_storage_backend(name):
     from flask import current_app

     backend_cls = backends[current_app.config['STORAGE_BACKEND']]
     return backend_cls(name)
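Taken together, the upload path now goes through a backend-agnostic blob API. A condensed sketch of the flow stitched from the hunks above (variable names follow stream_to_storage; this is not the full function body):

bucket = default_storage_backend(project_id)     # LocalBucket or GCS bucket
blob = bucket.blob(internal_fname)               # new blob handle in that bucket
blob.create_from_file(uploaded_file, file_size)  # write the uploaded bytes
blob.process_file(file_id)                       # base-class driver: updates the
                                                 # file doc in MongoDB and calls
                                                 # _process_image/_process_video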