From 29230f09e780a951e1d2cc0e95f8a5d47cdad970 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 1 Apr 2016 18:47:06 +0200 Subject: [PATCH] More streamlined interface with Pillar. Using a semaphore to ensure requests to Pillar aren't too parallel, so that we can cancel requests faster. --- blender_cloud/async_loop.py | 7 ++++ blender_cloud/gui.py | 1 + blender_cloud/pillar.py | 80 ++++++++++++++++--------------------- 3 files changed, 42 insertions(+), 46 deletions(-) diff --git a/blender_cloud/async_loop.py b/blender_cloud/async_loop.py index 277160e..503e1e2 100644 --- a/blender_cloud/async_loop.py +++ b/blender_cloud/async_loop.py @@ -4,6 +4,7 @@ import asyncio import traceback import concurrent.futures import logging +import gc import bpy @@ -53,6 +54,9 @@ def kick_async_loop(*args) -> bool: len(all_tasks)) stop_after_this_kick = True + # Clean up circular references between tasks. + gc.collect() + for task_idx, task in enumerate(all_tasks): if not task.done(): continue @@ -68,6 +72,9 @@ def kick_async_loop(*args) -> bool: print('{}: resulted in exception'.format(task)) traceback.print_exc() + # for ref in gc.get_referrers(task): + # log.debug(' - referred by %s', ref) + loop.stop() loop.run_forever() diff --git a/blender_cloud/gui.py b/blender_cloud/gui.py index 509f0da..fad8cab 100644 --- a/blender_cloud/gui.py +++ b/blender_cloud/gui.py @@ -313,6 +313,7 @@ class BlenderCloudBrowser(bpy.types.Operator): return # Signal that we want to stop. + self.async_task.cancel() if not self.signalling_future.done(): self.log.info("Signalling that we want to cancel anything that's running.") self.signalling_future.cancel() diff --git a/blender_cloud/pillar.py b/blender_cloud/pillar.py index ba2b01a..46f421a 100644 --- a/blender_cloud/pillar.py +++ b/blender_cloud/pillar.py @@ -101,17 +101,26 @@ def pillar_api(pillar_endpoint: str = None) -> pillarsdk.Api: return _pillar_api +# No more than this many Pillar calls should be made simultaneously +pillar_semaphore = asyncio.Semaphore(3) + + +async def pillar_call(pillar_func, *args, **kwargs): + partial = functools.partial(pillar_func, *args, api=pillar_api(), **kwargs) + loop = asyncio.get_event_loop() + + async with pillar_semaphore: + return await loop.run_in_executor(None, partial) + + async def get_project_uuid(project_url: str) -> str: """Returns the UUID for the project, given its '/p/' string.""" - find_one = functools.partial(pillarsdk.Project.find_one, { - 'where': {'url': project_url}, - 'projection': {'permissions': 1}, - }, api=pillar_api()) - - loop = asyncio.get_event_loop() try: - project = await loop.run_in_executor(None, find_one) + project = await pillar_call(pillarsdk.Project.find_one, { + 'where': {'url': project_url}, + 'projection': {'permissions': 1}, + }) except pillarsdk.exceptions.ResourceNotFound: log.error('Project with URL %r does not exist', project_url) return None @@ -148,17 +157,14 @@ async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None, if node_type: where['node_type'] = node_type - node_all = functools.partial(pillarsdk.Node.all, { + children = await pillar_call(pillarsdk.Node.all, { 'projection': {'name': 1, 'parent': 1, 'node_type': 1, 'properties.order': 1, 'properties.status': 1, 'properties.files': 1, 'properties.content_type': 1, 'picture': 1}, 'where': where, 'sort': 'properties.order', - 'embed': ['parent']}, api=pillar_api()) - - loop = asyncio.get_event_loop() - children = await loop.run_in_executor(None, node_all) + 'embed': ['parent']}) return children['_items'] @@ -274,11 +280,7 @@ async def fetch_thumbnail_info(file: pillarsdk.File, directory: str, desired_siz finished. """ - api = pillar_api() - - loop = asyncio.get_event_loop() - thumb_link = await loop.run_in_executor(None, functools.partial( - file.thumbnail_file, desired_size, api=api)) + thumb_link = await pillar_call(file.thumbnail_file, desired_size) if thumb_link is None: raise ValueError("File {} has no thumbnail of size {}" @@ -320,24 +322,15 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str, log.warning('fetch_texture_thumbs: Texture downloading cancelled') return - # We don't want to gather too much in parallel, as it will make cancelling take more time. - # This is caused by HTTP requests going out in parallel, and once the socket is open and - # the GET request is sent, we can't cancel until the server starts streaming the response. - chunk_size = 2 - for i in range(0, len(texture_nodes), chunk_size): - chunk = texture_nodes[i:i + chunk_size] + coros = (download_texture_thumbnail(texture_node, desired_size, + thumbnail_directory, + thumbnail_loading=thumbnail_loading, + thumbnail_loaded=thumbnail_loaded, + future=future) + for texture_node in texture_nodes) - log.debug('fetch_texture_thumbs: Gathering texture[%i:%i] for parent node %r', - i, i + chunk_size, parent_node_uuid) - coros = (download_texture_thumbnail(texture_node, desired_size, - thumbnail_directory, - thumbnail_loading=thumbnail_loading, - thumbnail_loaded=thumbnail_loaded, - future=future) - for texture_node in chunk) - - # raises any exception from failed handle_texture_node() calls. - await asyncio.gather(*coros) + # raises any exception from failed handle_texture_node() calls. + await asyncio.gather(*coros) log.info('fetch_texture_thumbs: Done downloading texture thumbnails') @@ -357,17 +350,14 @@ async def download_texture_thumbnail(texture_node, desired_size: str, texture_node['_id']) return - api = pillar_api() loop = asyncio.get_event_loop() - file_find = functools.partial(pillarsdk.File.find, params={ - 'projection': {'filename': 1, 'variations': 1, 'width': 1, 'height': 1}, - }, api=api) - # Find the File that belongs to this texture node pic_uuid = texture_node['picture'] loop.call_soon_threadsafe(thumbnail_loading, texture_node, texture_node) - file_desc = await loop.run_in_executor(None, file_find, pic_uuid) + file_desc = await pillar_call(pillarsdk.File.find, pic_uuid, params={ + 'projection': {'filename': 1, 'variations': 1, 'width': 1, 'height': 1}, + }) if file_desc is None: log.warning('Unable to find file for texture node %s', pic_uuid) @@ -399,7 +389,7 @@ async def download_file_by_uuid(file_uuid, target_directory: str, metadata_directory: str, *, - map_type: str=None, + map_type: str = None, file_loading: callable, file_loaded: callable, future: asyncio.Future): @@ -410,11 +400,9 @@ async def download_file_by_uuid(file_uuid, loop = asyncio.get_event_loop() # Find the File document. - api = pillar_api() - file_find = functools.partial(pillarsdk.File.find, params={ + file_desc = await pillar_call(pillarsdk.File.find, file_uuid, params={ 'projection': {'link': 1, 'filename': 1}, - }, api=api) - file_desc = await loop.run_in_executor(None, file_find, file_uuid) + }) # Save the file document to disk metadata_file = os.path.join(metadata_directory, 'files', '%s.json' % file_uuid) @@ -455,7 +443,7 @@ async def download_texture(texture_node, future=future) for file_info in texture_node['properties']['files']) - return await asyncio.gather(*downloaders) + return await asyncio.gather(*downloaders, return_exceptions=True) def is_cancelled(future: asyncio.Future) -> bool: