Use threading when generating links & renaming files at GCS.

Once we move to Python 3.5 we should replace this with asyncio, so we have
more control over the total number of threads we spawn.
This commit is contained in:
Sybren A. Stüvel 2016-07-21 14:11:53 +02:00
parent d46eed7a34
commit 2fc89a728d
2 changed files with 43 additions and 8 deletions

View File

@ -1,4 +1,5 @@
import datetime import datetime
import functools
import logging import logging
import mimetypes import mimetypes
import os import os
@ -6,6 +7,7 @@ import tempfile
import uuid import uuid
import io import io
from hashlib import md5 from hashlib import md5
import threading
import bson.tz_util import bson.tz_util
import eve.utils import eve.utils
@ -376,12 +378,28 @@ def _generate_all_links(response, now):
project_id = str( project_id = str(
response['project']) if 'project' in response else None # TODO: add project id to all files response['project']) if 'project' in response else None # TODO: add project id to all files
backend = response['backend'] backend = response['backend']
response['link'] = generate_link(backend, response['file_path'], project_id)
variations = response.get('variations') def generate_link_for(store_in):
if variations: log.debug(' generating link for %s', store_in['file_path'])
for variation in variations: store_in['link'] = generate_link(backend, store_in['file_path'], project_id)
variation['link'] = generate_link(backend, variation['file_path'], project_id)
# Generate links in parallel, using one thread per link (no pool is used).
# TODO: use asyncio for this, once we're on Python 3.5, to limit the total number
# of concurrent threads.
threads = []
thread = threading.Thread(target=functools.partial(generate_link_for, response))
threads.append(thread)
variations = response.get('variations', ())
for variation in variations:
thread = threading.Thread(target=functools.partial(generate_link_for, variation))
threads.append(thread)
# Start & wait for all the parallel tasks.
for thread in threads:
thread.start()
for thread in threads:
thread.join(120)
# Construct the new expiry datetime. # Construct the new expiry datetime.
validity_secs = current_app.config['FILE_LINK_VALIDITY'][backend] validity_secs = current_app.config['FILE_LINK_VALIDITY'][backend]

View File

@ -1,7 +1,9 @@
import functools
import os import os
import time import time
import datetime import datetime
import logging import logging
import threading
from bson import ObjectId from bson import ObjectId
from gcloud.storage.client import Client from gcloud.storage.client import Client
@ -159,6 +161,8 @@ def update_file_name(node):
if node['properties'].get('status', '') == 'processing': if node['properties'].get('status', '') == 'processing':
return return
files_collection = current_app.data.driver.db['files']
def _format_name(name, override_ext, size=None, map_type=u''): def _format_name(name, override_ext, size=None, map_type=u''):
root, _ = os.path.splitext(name) root, _ = os.path.splitext(name)
size = u'-{}'.format(size) if size else u'' size = u'-{}'.format(size) if size else u''
@ -166,7 +170,6 @@ def update_file_name(node):
return u'{}{}{}{}'.format(root, size, map_type, override_ext) return u'{}{}{}{}'.format(root, size, map_type, override_ext)
def _update_name(file_id, file_props): def _update_name(file_id, file_props):
files_collection = current_app.data.driver.db['files']
file_doc = files_collection.find_one({'_id': ObjectId(file_id)}) file_doc = files_collection.find_one({'_id': ObjectId(file_id)})
if file_doc is None or file_doc['backend'] != 'gcs': if file_doc is None or file_doc['backend'] != 'gcs':
@ -193,12 +196,26 @@ def update_file_name(node):
continue continue
storage.update_name(blob, name) storage.update_name(blob, name)
# Rename the files in parallel, using one thread per file (no pool is used).
threads = []
log.debug('Creating _update_name tasks')
# Currently we search for 'file' and 'files' keys in the object properties. # Currently we search for 'file' and 'files' keys in the object properties.
# This could become a bit more flexible and rely on a true reference of the # This could become a bit more flexible and rely on a true reference of the
# file object type from the schema. # file object type from the schema.
if 'file' in node['properties']: if 'file' in node['properties']:
_update_name(node['properties']['file'], {}) thread = threading.Thread(target=functools.partial(
_update_name, node['properties']['file'], {}))
threads.append(thread)
if 'files' in node['properties']: if 'files' in node['properties']:
for file_props in node['properties']['files']: for file_props in node['properties']['files']:
_update_name(file_props['file'], file_props) thread = threading.Thread(target=functools.partial(
_update_name, file_props['file'], file_props))
threads.append(thread)
# Start & wait for all the parallel tasks.
for thread in threads:
thread.start()
for thread in threads:
thread.join(120)