From 27df603299eeacee49735ec9f18efe0edb5e4fce Mon Sep 17 00:00:00 2001 From: Francesco Siddi Date: Wed, 9 Nov 2016 03:08:41 +0100 Subject: [PATCH] Started moving processing function in subclasses --- pillar/api/eve_settings.py | 2 +- pillar/api/file_storage/__init__.py | 44 +++++---- pillar/api/utils/gcs.py | 9 ++ pillar/api/utils/storage.py | 146 +++++++++++++++++++++++++++- 4 files changed, 178 insertions(+), 23 deletions(-) diff --git a/pillar/api/eve_settings.py b/pillar/api/eve_settings.py index 544ea367..adf2ebd6 100644 --- a/pillar/api/eve_settings.py +++ b/pillar/api/eve_settings.py @@ -394,7 +394,7 @@ files_schema = { 'backend': { 'type': 'string', 'required': True, - 'allowed': ["attract-web", "pillar", "cdnsun", "gcs", "unittest"] + 'allowed': ["local", "pillar", "cdnsun", "gcs", "unittest"] }, # Where the file is in the backend storage itself. In the case of GCS, diff --git a/pillar/api/file_storage/__init__.py b/pillar/api/file_storage/__init__.py index a087929f..1adf3de7 100644 --- a/pillar/api/file_storage/__init__.py +++ b/pillar/api/file_storage/__init__.py @@ -1,26 +1,26 @@ +import datetime import io import logging import mimetypes +import os import tempfile import uuid from hashlib import md5 -import os -import requests + import bson.tz_util -import datetime import eve.utils import pymongo import werkzeug.exceptions as wz_exceptions from bson import ObjectId -from flask import Blueprint +from flask import Blueprint, current_app from flask import current_app from flask import g from flask import jsonify from flask import request from flask import send_from_directory from flask import url_for, helpers + from pillar.api import utils -from pillar.api.utils.imaging import generate_local_thumbnails from pillar.api.utils import remove_private_keys, authentication from pillar.api.utils.authorization import require_login, user_has_role, \ user_matches_roles @@ -28,7 +28,8 @@ from pillar.api.utils.cdn import hash_file_path from pillar.api.utils.encoding import Encoder from pillar.api.utils.gcs import GoogleCloudStorageBucket, \ GoogleCloudStorageBlob -from pillar.api.utils.storage import LocalBucket, LocalBlob, default_storage_backend +from pillar.api.utils.imaging import generate_local_thumbnails +from pillar.api.utils.storage import LocalBlob, default_storage_backend log = logging.getLogger(__name__) @@ -317,8 +318,9 @@ def generate_link(backend, file_path, project_id=None, is_public=False): return blob['public_url'] return blob['signed_url'] if backend == 'local': - f = LocalBlob(project_id, file_path) - return url_for('file_storage.index', file_name=f.partial_path, + bucket = default_storage_backend(project_id) + blob = bucket.get_blob(file_path) + return url_for('file_storage.index', file_name=blob.partial_path, _external=True, _scheme=current_app.config['SCHEME']) if backend == 'pillar': return url_for('file_storage.index', file_name=file_path, @@ -550,12 +552,15 @@ def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds): @require_login() def create_file_doc(name, filename, content_type, length, project, - backend='gcs', **extra_fields): + backend=None, **extra_fields): """Creates a minimal File document for storage in MongoDB. Doesn't save it to MongoDB yet. """ + if backend is None: + backend = current_app.config['STORAGE_BACKEND'] + current_user = g.get('current_user') file_doc = {'name': name, @@ -728,7 +733,8 @@ def stream_to_storage(project_id): log.debug('Processing uploaded file id=%s, fname=%s, size=%i', file_id, internal_fname, blob.size) - process_file(storage_backend, file_id, local_file) + # process_file(storage_backend, file_id, local_file) + blob.process_file(file_id) # Local processing is done, we can close the local file so it is removed. if local_file is not None: @@ -783,15 +789,6 @@ def add_access_control_headers(resp): return resp -def update_file_doc(file_id, **updates): - files = current_app.data.driver.db['files'] - res = files.update_one({'_id': ObjectId(file_id)}, - {'$set': updates}) - log.debug('update_file_doc(%s, %s): %i matched, %i updated.', - file_id, updates, res.matched_count, res.modified_count) - return res - - def create_file_doc_for_upload(project_id, uploaded_file): """Creates a secure filename and a document in MongoDB for the file. @@ -876,3 +873,12 @@ def setup_app(app, url_prefix): app.on_insert_files += compute_aggregate_length_items app.register_api_blueprint(file_storage, url_prefix=url_prefix) + + +def update_file_doc(file_id, **updates): + files = current_app.data.driver.db['files'] + res = files.update_one({'_id': ObjectId(file_id)}, + {'$set': updates}) + log.debug('update_file_doc(%s, %s): %i matched, %i updated.', + file_id, updates, res.matched_count, res.modified_count) + return res \ No newline at end of file diff --git a/pillar/api/utils/gcs.py b/pillar/api/utils/gcs.py index 620ce43a..6e39e2a4 100644 --- a/pillar/api/utils/gcs.py +++ b/pillar/api/utils/gcs.py @@ -196,6 +196,15 @@ class GoogleCloudStorageBlob(Blob): self.blob = bucket.gcs_bucket.blob('_/' + name, chunk_size=256 * 1024 * 2) + def create_from_file(self, uploaded_file, file_size): + pass + + def _process_image(self, file_doc): + pass + + def _process_video(self, file_doc): + pass + def update_file_name(node): """Assign to the CGS blob the same name of the asset node. This way when diff --git a/pillar/api/utils/storage.py b/pillar/api/utils/storage.py index 3601bce3..63ca608c 100644 --- a/pillar/api/utils/storage.py +++ b/pillar/api/utils/storage.py @@ -5,8 +5,13 @@ import logging import os import shutil +from bson import ObjectId from flask import current_app +from pillar.api import utils +from pillar.api.utils.authorization import user_has_role +from pillar.api.utils.imaging import generate_local_thumbnails + log = logging.getLogger(__name__) # Mapping from backend name to backend class @@ -86,6 +91,93 @@ class Blob(object): def create_from_file(self, uploaded_file, file_size): pass + @abc.abstractmethod + def _process_image(self, file_doc): + pass + + @abc.abstractmethod + def _process_video(self, file_doc): + pass + + def process_file(self, file_id): + """Generate image thumbnails or encode video. + + :type file_id: string + :param file_id: The document ID for the file processed. We need it to + update the document as we process the file. + """ + + def update_file_doc(file_id, **updates): + res = files.update_one({'_id': ObjectId(file_id)}, + {'$set': updates}) + + log.debug('update_file_doc(%s, %s): %i matched, %i updated.', + file_id, updates, res.matched_count, res.modified_count) + return res + + file_id = ObjectId(file_id) + + # Fetch the src_file document from MongoDB. + files = current_app.data.driver.db['files'] + src_file = files.find_one(file_id) + if not src_file: + log.warning( + 'process_file(%s): no such file document found, ignoring.') + return + + # Update the 'format' field from the content type. + # TODO: overrule the content type based on file extention & magic numbers. + mime_category, src_file['format'] = src_file['content_type'].split('/', + 1) + # Prevent video handling for non-admins. + if not user_has_role(u'admin') and mime_category == 'video': + if src_file['format'].startswith('x-'): + xified = src_file['format'] + else: + xified = 'x-' + src_file['format'] + + src_file['content_type'] = 'application/%s' % xified + mime_category = 'application' + log.info('Not processing video file %s for non-admin user', file_id) + + # Run the required processor, based on the MIME category. + processors = { + 'image': self._process_image, + 'video': self._process_video, + } + + try: + processor = processors[mime_category] + except KeyError: + log.info("POSTed file %s was of type %r, which isn't " + "thumbnailed/encoded.", file_id, + mime_category) + src_file['status'] = 'complete' + else: + log.debug('process_file(%s): marking file status as "processing"', + file_id) + src_file['status'] = 'processing' + update_file_doc(file_id, status='processing') + + try: + processor(src_file) + except Exception: + log.warning('process_file(%s): error when processing file, ' + 'resetting status to ' + '"queued_for_processing"', file_id, exc_info=True) + update_file_doc(file_id, status='queued_for_processing') + return + + src_file = utils.remove_private_keys(src_file) + # Update the original file with additional info, e.g. image resolution + r, _, _, status = current_app.put_internal('files', src_file, + _id=file_id) + if status not in (200, 201): + log.warning( + 'process_file(%s): status %i when saving processed file ' + 'info to MongoDB: %s', + file_id, status, r) + @register_backend('local') class LocalBucket(Bucket): @@ -96,8 +188,8 @@ class LocalBucket(Bucket): return LocalBlob(name=blob_name, bucket=self) def get_blob(self, blob_name): - # Check if file exists, otherwise None - return None + # TODO: Check if file exists, otherwise None + return self.blob(blob_name) class LocalBlob(Blob): @@ -112,16 +204,64 @@ class LocalBlob(Blob): def create_from_file(self, uploaded_file, file_size): # Ensure path exists before saving - os.makedirs(os.path.dirname(self.path)) + directory = os.path.dirname(self.path) + if not os.path.exists(directory): + os.makedirs(directory) with open(self.path, 'wb') as outfile: shutil.copyfileobj(uploaded_file, outfile) self._size_in_bytes = file_size + def _process_image(self, file_doc): + from PIL import Image + + im = Image.open(self.path) + res = im.size + file_doc['width'] = res[0] + file_doc['height'] = res[1] + + # Generate previews + log.info('Generating thumbnails for file %s', file_doc['_id']) + file_doc['variations'] = generate_local_thumbnails(file_doc['name'], + self.path) + + # Send those previews to Google Cloud Storage. + log.info('Uploading %i thumbnails for file %s to Google Cloud Storage ' + '(GCS)', len(file_doc['variations']), file_doc['_id']) + + # TODO: parallelize this at some point. + for variation in file_doc['variations']: + fname = variation['file_path'] + if current_app.config['TESTING']: + log.warning(' - NOT making thumbnails', fname) + else: + log.debug(' - Sending thumbnail %s to GCS', fname) + + blob = self.bucket.blob(fname) + blob.create_from_file(variation['local_path'], + variation['length']) + + try: + os.unlink(variation['local_path']) + except OSError: + log.warning( + 'Unable to unlink %s, ignoring this but it will need ' + 'cleanup later.', variation['local_path']) + + del variation['local_path'] + + log.info('Done processing file %s', file_doc['_id']) + file_doc['status'] = 'complete' + + def _process_video(self, file_doc): + pass + def default_storage_backend(name): from flask import current_app backend_cls = backends[current_app.config['STORAGE_BACKEND']] return backend_cls(name) + +