Revert "Use threading when generating links & renaming files at GCS."
This reverts commit 2fc89a728d
.
This commit is contained in:
@@ -1,5 +1,4 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import functools
|
|
||||||
import logging
|
import logging
|
||||||
import mimetypes
|
import mimetypes
|
||||||
import os
|
import os
|
||||||
@@ -7,7 +6,6 @@ import tempfile
|
|||||||
import uuid
|
import uuid
|
||||||
import io
|
import io
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
import threading
|
|
||||||
|
|
||||||
import bson.tz_util
|
import bson.tz_util
|
||||||
import eve.utils
|
import eve.utils
|
||||||
@@ -378,28 +376,12 @@ def _generate_all_links(response, now):
|
|||||||
project_id = str(
|
project_id = str(
|
||||||
response['project']) if 'project' in response else None # TODO: add project id to all files
|
response['project']) if 'project' in response else None # TODO: add project id to all files
|
||||||
backend = response['backend']
|
backend = response['backend']
|
||||||
|
response['link'] = generate_link(backend, response['file_path'], project_id)
|
||||||
|
|
||||||
def generate_link_for(store_in):
|
variations = response.get('variations')
|
||||||
log.debug(' generating link for %s', store_in['file_path'])
|
if variations:
|
||||||
store_in['link'] = generate_link(backend, store_in['file_path'], project_id)
|
for variation in variations:
|
||||||
|
variation['link'] = generate_link(backend, variation['file_path'], project_id)
|
||||||
# Generate links in parallel using the thread pool.
|
|
||||||
# TODO: use asyncio for this, once we're on Python 3.5, to limit the total number
|
|
||||||
# of concurrent threads.
|
|
||||||
threads = []
|
|
||||||
thread = threading.Thread(target=functools.partial(generate_link_for, response))
|
|
||||||
threads.append(thread)
|
|
||||||
|
|
||||||
variations = response.get('variations', ())
|
|
||||||
for variation in variations:
|
|
||||||
thread = threading.Thread(target=functools.partial(generate_link_for, variation))
|
|
||||||
threads.append(thread)
|
|
||||||
|
|
||||||
# Start & wait for all the parallel tasks.
|
|
||||||
for thread in threads:
|
|
||||||
thread.start()
|
|
||||||
for thread in threads:
|
|
||||||
thread.join(120)
|
|
||||||
|
|
||||||
# Construct the new expiry datetime.
|
# Construct the new expiry datetime.
|
||||||
validity_secs = current_app.config['FILE_LINK_VALIDITY'][backend]
|
validity_secs = current_app.config['FILE_LINK_VALIDITY'][backend]
|
||||||
|
@@ -1,9 +1,7 @@
|
|||||||
import functools
|
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import threading
|
|
||||||
|
|
||||||
from bson import ObjectId
|
from bson import ObjectId
|
||||||
from gcloud.storage.client import Client
|
from gcloud.storage.client import Client
|
||||||
@@ -161,8 +159,6 @@ def update_file_name(node):
|
|||||||
if node['properties'].get('status', '') == 'processing':
|
if node['properties'].get('status', '') == 'processing':
|
||||||
return
|
return
|
||||||
|
|
||||||
files_collection = current_app.data.driver.db['files']
|
|
||||||
|
|
||||||
def _format_name(name, override_ext, size=None, map_type=u''):
|
def _format_name(name, override_ext, size=None, map_type=u''):
|
||||||
root, _ = os.path.splitext(name)
|
root, _ = os.path.splitext(name)
|
||||||
size = u'-{}'.format(size) if size else u''
|
size = u'-{}'.format(size) if size else u''
|
||||||
@@ -170,6 +166,7 @@ def update_file_name(node):
|
|||||||
return u'{}{}{}{}'.format(root, size, map_type, override_ext)
|
return u'{}{}{}{}'.format(root, size, map_type, override_ext)
|
||||||
|
|
||||||
def _update_name(file_id, file_props):
|
def _update_name(file_id, file_props):
|
||||||
|
files_collection = current_app.data.driver.db['files']
|
||||||
file_doc = files_collection.find_one({'_id': ObjectId(file_id)})
|
file_doc = files_collection.find_one({'_id': ObjectId(file_id)})
|
||||||
|
|
||||||
if file_doc is None or file_doc['backend'] != 'gcs':
|
if file_doc is None or file_doc['backend'] != 'gcs':
|
||||||
@@ -196,26 +193,12 @@ def update_file_name(node):
|
|||||||
continue
|
continue
|
||||||
storage.update_name(blob, name)
|
storage.update_name(blob, name)
|
||||||
|
|
||||||
# Generate links in parallel using the thread pool.
|
|
||||||
threads = []
|
|
||||||
log.debug('Creating _update_name tasks')
|
|
||||||
|
|
||||||
# Currently we search for 'file' and 'files' keys in the object properties.
|
# Currently we search for 'file' and 'files' keys in the object properties.
|
||||||
# This could become a bit more flexible and realy on a true reference of the
|
# This could become a bit more flexible and realy on a true reference of the
|
||||||
# file object type from the schema.
|
# file object type from the schema.
|
||||||
if 'file' in node['properties']:
|
if 'file' in node['properties']:
|
||||||
thread = threading.Thread(target=functools.partial(
|
_update_name(node['properties']['file'], {})
|
||||||
_update_name, node['properties']['file'], {}))
|
|
||||||
threads.append(thread)
|
|
||||||
|
|
||||||
if 'files' in node['properties']:
|
if 'files' in node['properties']:
|
||||||
for file_props in node['properties']['files']:
|
for file_props in node['properties']['files']:
|
||||||
thread = threading.Thread(target=functools.partial(
|
_update_name(file_props['file'], file_props)
|
||||||
_update_name, file_props['file'], file_props))
|
|
||||||
threads.append(thread)
|
|
||||||
|
|
||||||
# Start & wait for all the parallel tasks.
|
|
||||||
for thread in threads:
|
|
||||||
thread.start()
|
|
||||||
for thread in threads:
|
|
||||||
thread.join(120)
|
|
||||||
|
Reference in New Issue
Block a user