More streamlined interface with Pillar.

Using a semaphore to ensure requests to Pillar aren't too parallel,
so that we can cancel requests faster.
This commit is contained in:
Sybren A. Stüvel 2016-04-01 18:47:06 +02:00
parent 2c4c102302
commit 29230f09e7
3 changed files with 42 additions and 46 deletions

View File

@ -4,6 +4,7 @@ import asyncio
import traceback import traceback
import concurrent.futures import concurrent.futures
import logging import logging
import gc
import bpy import bpy
@ -53,6 +54,9 @@ def kick_async_loop(*args) -> bool:
len(all_tasks)) len(all_tasks))
stop_after_this_kick = True stop_after_this_kick = True
# Clean up circular references between tasks.
gc.collect()
for task_idx, task in enumerate(all_tasks): for task_idx, task in enumerate(all_tasks):
if not task.done(): if not task.done():
continue continue
@ -68,6 +72,9 @@ def kick_async_loop(*args) -> bool:
print('{}: resulted in exception'.format(task)) print('{}: resulted in exception'.format(task))
traceback.print_exc() traceback.print_exc()
# for ref in gc.get_referrers(task):
# log.debug(' - referred by %s', ref)
loop.stop() loop.stop()
loop.run_forever() loop.run_forever()

View File

@ -313,6 +313,7 @@ class BlenderCloudBrowser(bpy.types.Operator):
return return
# Signal that we want to stop. # Signal that we want to stop.
self.async_task.cancel()
if not self.signalling_future.done(): if not self.signalling_future.done():
self.log.info("Signalling that we want to cancel anything that's running.") self.log.info("Signalling that we want to cancel anything that's running.")
self.signalling_future.cancel() self.signalling_future.cancel()

View File

@ -101,17 +101,26 @@ def pillar_api(pillar_endpoint: str = None) -> pillarsdk.Api:
return _pillar_api return _pillar_api
# No more than this many Pillar calls should be made simultaneously
pillar_semaphore = asyncio.Semaphore(3)
async def pillar_call(pillar_func, *args, **kwargs):
partial = functools.partial(pillar_func, *args, api=pillar_api(), **kwargs)
loop = asyncio.get_event_loop()
async with pillar_semaphore:
return await loop.run_in_executor(None, partial)
async def get_project_uuid(project_url: str) -> str: async def get_project_uuid(project_url: str) -> str:
"""Returns the UUID for the project, given its '/p/<project_url>' string.""" """Returns the UUID for the project, given its '/p/<project_url>' string."""
find_one = functools.partial(pillarsdk.Project.find_one, { try:
project = await pillar_call(pillarsdk.Project.find_one, {
'where': {'url': project_url}, 'where': {'url': project_url},
'projection': {'permissions': 1}, 'projection': {'permissions': 1},
}, api=pillar_api()) })
loop = asyncio.get_event_loop()
try:
project = await loop.run_in_executor(None, find_one)
except pillarsdk.exceptions.ResourceNotFound: except pillarsdk.exceptions.ResourceNotFound:
log.error('Project with URL %r does not exist', project_url) log.error('Project with URL %r does not exist', project_url)
return None return None
@ -148,17 +157,14 @@ async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None,
if node_type: if node_type:
where['node_type'] = node_type where['node_type'] = node_type
node_all = functools.partial(pillarsdk.Node.all, { children = await pillar_call(pillarsdk.Node.all, {
'projection': {'name': 1, 'parent': 1, 'node_type': 1, 'projection': {'name': 1, 'parent': 1, 'node_type': 1,
'properties.order': 1, 'properties.status': 1, 'properties.order': 1, 'properties.status': 1,
'properties.files': 1, 'properties.files': 1,
'properties.content_type': 1, 'picture': 1}, 'properties.content_type': 1, 'picture': 1},
'where': where, 'where': where,
'sort': 'properties.order', 'sort': 'properties.order',
'embed': ['parent']}, api=pillar_api()) 'embed': ['parent']})
loop = asyncio.get_event_loop()
children = await loop.run_in_executor(None, node_all)
return children['_items'] return children['_items']
@ -274,11 +280,7 @@ async def fetch_thumbnail_info(file: pillarsdk.File, directory: str, desired_siz
finished. finished.
""" """
api = pillar_api() thumb_link = await pillar_call(file.thumbnail_file, desired_size)
loop = asyncio.get_event_loop()
thumb_link = await loop.run_in_executor(None, functools.partial(
file.thumbnail_file, desired_size, api=api))
if thumb_link is None: if thumb_link is None:
raise ValueError("File {} has no thumbnail of size {}" raise ValueError("File {} has no thumbnail of size {}"
@ -320,21 +322,12 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str,
log.warning('fetch_texture_thumbs: Texture downloading cancelled') log.warning('fetch_texture_thumbs: Texture downloading cancelled')
return return
# We don't want to gather too much in parallel, as it will make cancelling take more time.
# This is caused by HTTP requests going out in parallel, and once the socket is open and
# the GET request is sent, we can't cancel until the server starts streaming the response.
chunk_size = 2
for i in range(0, len(texture_nodes), chunk_size):
chunk = texture_nodes[i:i + chunk_size]
log.debug('fetch_texture_thumbs: Gathering texture[%i:%i] for parent node %r',
i, i + chunk_size, parent_node_uuid)
coros = (download_texture_thumbnail(texture_node, desired_size, coros = (download_texture_thumbnail(texture_node, desired_size,
thumbnail_directory, thumbnail_directory,
thumbnail_loading=thumbnail_loading, thumbnail_loading=thumbnail_loading,
thumbnail_loaded=thumbnail_loaded, thumbnail_loaded=thumbnail_loaded,
future=future) future=future)
for texture_node in chunk) for texture_node in texture_nodes)
# raises any exception from failed handle_texture_node() calls. # raises any exception from failed handle_texture_node() calls.
await asyncio.gather(*coros) await asyncio.gather(*coros)
@ -357,17 +350,14 @@ async def download_texture_thumbnail(texture_node, desired_size: str,
texture_node['_id']) texture_node['_id'])
return return
api = pillar_api()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
file_find = functools.partial(pillarsdk.File.find, params={
'projection': {'filename': 1, 'variations': 1, 'width': 1, 'height': 1},
}, api=api)
# Find the File that belongs to this texture node # Find the File that belongs to this texture node
pic_uuid = texture_node['picture'] pic_uuid = texture_node['picture']
loop.call_soon_threadsafe(thumbnail_loading, texture_node, texture_node) loop.call_soon_threadsafe(thumbnail_loading, texture_node, texture_node)
file_desc = await loop.run_in_executor(None, file_find, pic_uuid) file_desc = await pillar_call(pillarsdk.File.find, pic_uuid, params={
'projection': {'filename': 1, 'variations': 1, 'width': 1, 'height': 1},
})
if file_desc is None: if file_desc is None:
log.warning('Unable to find file for texture node %s', pic_uuid) log.warning('Unable to find file for texture node %s', pic_uuid)
@ -399,7 +389,7 @@ async def download_file_by_uuid(file_uuid,
target_directory: str, target_directory: str,
metadata_directory: str, metadata_directory: str,
*, *,
map_type: str=None, map_type: str = None,
file_loading: callable, file_loading: callable,
file_loaded: callable, file_loaded: callable,
future: asyncio.Future): future: asyncio.Future):
@ -410,11 +400,9 @@ async def download_file_by_uuid(file_uuid,
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
# Find the File document. # Find the File document.
api = pillar_api() file_desc = await pillar_call(pillarsdk.File.find, file_uuid, params={
file_find = functools.partial(pillarsdk.File.find, params={
'projection': {'link': 1, 'filename': 1}, 'projection': {'link': 1, 'filename': 1},
}, api=api) })
file_desc = await loop.run_in_executor(None, file_find, file_uuid)
# Save the file document to disk # Save the file document to disk
metadata_file = os.path.join(metadata_directory, 'files', '%s.json' % file_uuid) metadata_file = os.path.join(metadata_directory, 'files', '%s.json' % file_uuid)
@ -455,7 +443,7 @@ async def download_texture(texture_node,
future=future) future=future)
for file_info in texture_node['properties']['files']) for file_info in texture_node['properties']['files'])
return await asyncio.gather(*downloaders) return await asyncio.gather(*downloaders, return_exceptions=True)
def is_cancelled(future: asyncio.Future) -> bool: def is_cancelled(future: asyncio.Future) -> bool: