diff --git a/pillar/application/modules/file_storage.py b/pillar/application/modules/file_storage.py
index 08b37ba9..256dbadf 100644
--- a/pillar/application/modules/file_storage.py
+++ b/pillar/application/modules/file_storage.py
@@ -1,4 +1,5 @@
 import datetime
+import functools
 import logging
 import mimetypes
 import os
@@ -6,6 +7,7 @@ import tempfile
 import uuid
 import io
 from hashlib import md5
+import threading
 
 import bson.tz_util
 import eve.utils
@@ -376,12 +378,28 @@ def _generate_all_links(response, now):
     project_id = str(
         response['project']) if 'project' in response else None  # TODO: add project id to all files
     backend = response['backend']
-    response['link'] = generate_link(backend, response['file_path'], project_id)
-
-    variations = response.get('variations')
-    if variations:
-        for variation in variations:
-            variation['link'] = generate_link(backend, variation['file_path'], project_id)
+    def generate_link_for(store_in):
+        log.debug(' generating link for %s', store_in['file_path'])
+        store_in['link'] = generate_link(backend, store_in['file_path'], project_id)
+
+    # Generate links in parallel using threads.
+    # TODO: use asyncio for this, once we're on Python 3.5, to limit the total number
+    # of concurrent threads.
+    threads = []
+
+    thread = threading.Thread(target=functools.partial(generate_link_for, response))
+    threads.append(thread)
+
+    variations = response.get('variations', ())
+    for variation in variations:
+        thread = threading.Thread(target=functools.partial(generate_link_for, variation))
+        threads.append(thread)
+
+    # Start & wait for all the parallel tasks.
+    for thread in threads:
+        thread.start()
+    for thread in threads:
+        thread.join(120)
 
     # Construct the new expiry datetime.
     validity_secs = current_app.config['FILE_LINK_VALIDITY'][backend]
diff --git a/pillar/application/utils/gcs.py b/pillar/application/utils/gcs.py
index 23b336c7..83e5f074 100644
--- a/pillar/application/utils/gcs.py
+++ b/pillar/application/utils/gcs.py
@@ -1,7 +1,9 @@
+import functools
 import os
 import time
 import datetime
 import logging
+import threading
 
 from bson import ObjectId
 from gcloud.storage.client import Client
@@ -159,6 +161,8 @@ def update_file_name(node):
     if node['properties'].get('status', '') == 'processing':
         return
 
+    files_collection = current_app.data.driver.db['files']
+
     def _format_name(name, override_ext, size=None, map_type=u''):
         root, _ = os.path.splitext(name)
         size = u'-{}'.format(size) if size else u''
@@ -166,7 +170,6 @@ def update_file_name(node):
         return u'{}{}{}{}'.format(root, size, map_type, override_ext)
 
     def _update_name(file_id, file_props):
-        files_collection = current_app.data.driver.db['files']
         file_doc = files_collection.find_one({'_id': ObjectId(file_id)})
 
         if file_doc is None or file_doc['backend'] != 'gcs':
@@ -193,12 +196,26 @@ def update_file_name(node):
                 continue
             storage.update_name(blob, name)
 
+    # Update the file names in parallel using threads.
+    threads = []
+    log.debug('Creating _update_name tasks')
+
     # Currently we search for 'file' and 'files' keys in the object properties.
     # This could become a bit more flexible and realy on a true reference of the
     # file object type from the schema.
     if 'file' in node['properties']:
-        _update_name(node['properties']['file'], {})
+        thread = threading.Thread(target=functools.partial(
+            _update_name, node['properties']['file'], {}))
+        threads.append(thread)
 
     if 'files' in node['properties']:
         for file_props in node['properties']['files']:
-            _update_name(file_props['file'], file_props)
+            thread = threading.Thread(target=functools.partial(
+                _update_name, file_props['file'], file_props))
+            threads.append(thread)
+
+    # Start & wait for all the parallel tasks.
+    for thread in threads:
+        thread.start()
+    for thread in threads:
+        thread.join(120)
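
Note (not part of the patch): the TODO in _generate_all_links mentions limiting
the total number of concurrent threads. A minimal sketch of a bounded
alternative, assuming the `futures` backport on Python 2.7 (concurrent.futures
is in the standard library from Python 3.2 onwards); generate_links_bounded and
the max_workers value are illustrative:

    from concurrent.futures import ThreadPoolExecutor

    def generate_links_bounded(response, generate_link_for, max_workers=8):
        # The pool caps concurrency at max_workers threads. map() blocks until
        # every call has finished and re-raises the first exception when its
        # result is consumed, unlike bare Thread objects, whose target
        # exceptions only end up on stderr.
        targets = [response] + list(response.get('variations', ()))
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            list(pool.map(generate_link_for, targets))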
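
Note (not part of the patch): Thread.join(120) returns silently once the
timeout expires, whether or not the thread has finished, so a hung GCS call
would go unnoticed. A small sketch that logs stragglers instead:

    for thread in threads:
        thread.join(120)
        if thread.is_alive():
            # join() timed out; the thread is still running in the background.
            log.warning('thread %s is still busy after 120 seconds', thread.name)

As an aside, threading.Thread(target=generate_link_for, args=(response,)) is
equivalent to the functools.partial construction used in the patch.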