diff --git a/blender_cloud/__init__.py b/blender_cloud/__init__.py index 126f76d..4717cd4 100644 --- a/blender_cloud/__init__.py +++ b/blender_cloud/__init__.py @@ -149,6 +149,7 @@ def register(): logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(levelname)8s %(name)s %(message)s') + async_loop.setup_asyncio_executor() gui.register() diff --git a/blender_cloud/async_loop.py b/blender_cloud/async_loop.py index 9930478..438d375 100644 --- a/blender_cloud/async_loop.py +++ b/blender_cloud/async_loop.py @@ -2,6 +2,7 @@ import asyncio import traceback +import concurrent.futures import logging import bpy @@ -9,6 +10,20 @@ import bpy log = logging.getLogger(__name__) +def setup_asyncio_executor(): + """Sets up AsyncIO to run on a single thread. + + This ensures that only one Pillar HTTP call is performed at the same time. Other + calls that could be performed in parallel are queued, and thus we can + reliably cancel them. + """ + + executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + loop = asyncio.get_event_loop() + loop.set_default_executor(executor) + # loop.set_debug(True) + + def kick_async_loop(*args): loop = asyncio.get_event_loop() diff --git a/blender_cloud/gui.py b/blender_cloud/gui.py index a60e299..6f07d2e 100644 --- a/blender_cloud/gui.py +++ b/blender_cloud/gui.py @@ -65,6 +65,7 @@ class MenuItem: self.label_text = label_text self._thumb_path = '' self.icon = None + self._is_folder = file_desc is None and thumb_path == 'FOLDER' self.thumb_path = thumb_path @@ -93,7 +94,7 @@ class MenuItem: @property def is_folder(self) -> bool: - return self.file_desc is None + return self._is_folder def update_placement(self, x, y, width, height): """Use OpenGL to draw this one menu item.""" @@ -161,6 +162,7 @@ class BlenderCloudBrowser(bpy.types.Operator): project_uuid = '5672beecc0261b2005ed1a33' # Blender Cloud project UUID node_uuid = '' # Blender Cloud node UUID async_task = None # asyncio task for fetching thumbnails + signalling_future = None # asyncio future for signalling that we want to cancel everything. timer = None log = logging.getLogger('%s.BlenderCloudBrowser' % __name__) @@ -193,7 +195,7 @@ class BlenderCloudBrowser(bpy.types.Operator): self.current_display_content = [] self.loaded_images = set() - self.browse_assets(context) + self.browse_assets() context.window_manager.modal_handler_add(self) self.timer = context.window_manager.event_timer_add(1/30, context.window) @@ -219,7 +221,7 @@ class BlenderCloudBrowser(bpy.types.Operator): if selected.is_folder: self.node_uuid = selected.node_uuid - self.browse_assets(context) + self.browse_assets() else: self.handle_item_selection(selected) self._finish(context) @@ -232,14 +234,33 @@ class BlenderCloudBrowser(bpy.types.Operator): return {'RUNNING_MODAL'} def _stop_async_task(self): + self.log.debug('Stopping async task') if self.async_task is None: + self.log.debug('No async task, trivially stopped') return + # Signal that we want to stop. + if not self.signalling_future.done(): + self.log.info("Signalling that we want to cancel anything that's running.") + self.signalling_future.cancel() + + # Wait until the asynchronous task is done. if not self.async_task.done(): - print('Cancelling running async download task {}'.format(self.async_task)) - self.async_task.cancel() - else: + self.log.info("blocking until async task is done.") + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(self.async_task) + except asyncio.CancelledError: + self.log.info('Asynchronous task was cancelled') + return + + # noinspection PyBroadException + try: self.async_task.result() # This re-raises any exception of the task. + except asyncio.CancelledError: + self.log.info('Asynchronous task was cancelled') + except Exception: + self.log.exception("Exception from asynchronous task") def _finish(self, context): self.log.debug('Finishing the modal operator') @@ -283,74 +304,69 @@ class BlenderCloudBrowser(bpy.types.Operator): else: raise ValueError('Unable to find MenuItem(node_uuid=%r)' % node_uuid) - async def async_download_previews(self, context, thumbnails_directory): - # If we have a node UUID, we fetch the textures - # FIXME: support mixture of sub-nodes and textures under one node. + async def async_download_previews(self, thumbnails_directory): + self.log.info('Asynchronously downloading previews to %r', thumbnails_directory) self.clear_images() - def redraw(): - # region = context.region - # if region is None: - # print('Unable to redraw, region is %s' % region) - # print(' (context is %s)' % context) - # return - # region.tag_redraw() - pass - def thumbnail_loading(node_uuid, texture_node): self.add_menu_item(node_uuid, None, 'SPINNER', texture_node['name']) - redraw() def thumbnail_loaded(node_uuid, file_desc, thumb_path): - # update MenuItem added above self.update_menu_item(node_uuid, file_desc, thumb_path, file_desc['filename']) - redraw() if self.node_uuid: + self.log.debug('Getting subnodes for parent node %r', self.node_uuid) + children = await pillar.get_nodes(parent_node_uuid=self.node_uuid, + node_type='group_textures') + + self.log.debug('Finding parent of node %r', self.node_uuid) # Make sure we can go up again. parent_uuid = await pillar.parent_node_uuid(self.node_uuid) self.add_menu_item(parent_uuid, None, 'FOLDER', '.. up ..') + self.log.debug('Iterating over child nodes of %r', self.node_uuid) + for child in children: + # print(' - %(_id)s = %(name)s' % child) + self.add_menu_item(child['_id'], None, 'FOLDER', child['name']) + directory = os.path.join(thumbnails_directory, self.project_uuid, self.node_uuid) os.makedirs(directory, exist_ok=True) + self.log.debug('Fetching texture thumbnails for node %r', self.node_uuid) await pillar.fetch_texture_thumbs(self.node_uuid, 's', directory, thumbnail_loading=thumbnail_loading, - thumbnail_loaded=thumbnail_loaded) + thumbnail_loaded=thumbnail_loaded, + future=self.signalling_future) elif self.project_uuid: + self.log.debug('Getting subnodes for project node %r', self.project_uuid) children = await pillar.get_nodes(self.project_uuid, '') + self.log.debug('Iterating over child nodes of project %r', self.project_uuid) for child in children: - print(' - %(_id)s = %(name)s' % child) + # print(' - %(_id)s = %(name)s' % child) self.add_menu_item(child['_id'], None, 'FOLDER', child['name']) - redraw() else: # TODO: add "nothing here" icon and trigger re-draw - redraw() + self.log.warning("Not node UUID and no project UUID, I can't do anything!") + pass - # Call the 'done' callback. loop = asyncio.get_event_loop() loop.call_soon_threadsafe(self.downloading_done) - def browse_assets(self, context): + def browse_assets(self): + self.log.debug('Browsing assets at project %r node %r', self.project_uuid, self.node_uuid) self._stop_async_task() - self.clear_images() # Download the previews asynchronously. + self.signalling_future = asyncio.Future() self.async_task = asyncio.ensure_future( - self.async_download_previews(context, self.thumbnails_cache)) + self.async_download_previews(self.thumbnails_cache)) # Start the async manager so everything happens. async_loop.ensure_async_loop() def downloading_done(self): - # if not self.async_task.done(): - # print('%s: aborting download task' % self) - # self._stop_async_task() - # else: - # print('%s: downloading done' % self) - # self.async_task.result() - pass + self.log.info('Done downloading thumbnails.') def draw_menu(self, context): margin_x = 20 diff --git a/blender_cloud/pillar.py b/blender_cloud/pillar.py index 662c3e2..a0e0b9d 100644 --- a/blender_cloud/pillar.py +++ b/blender_cloud/pillar.py @@ -3,6 +3,9 @@ import sys import os import functools import logging +from contextlib import closing + +import requests # Add our shipped Pillar SDK wheel to the Python path if not any('pillar_sdk' in path for path in sys.path): @@ -84,7 +87,8 @@ async def get_project_uuid(project_url: str) -> str: return project['_id'] -async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None) -> list: +async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None, + node_type: str = None) -> list: """Gets nodes for either a project or given a parent node. @param project_uuid: the UUID of the project, or None if only querying by parent_node_uuid. @@ -108,6 +112,9 @@ async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None) -> l if project_uuid: where['project'] = project_uuid + if node_type: + where['node_type'] = node_type + node_all = functools.partial(pillarsdk.Node.all, { 'projection': {'name': 1, 'parent': 1, 'node_type': 1, 'properties.order': 1, 'properties.status': 1, @@ -121,24 +128,59 @@ async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None) -> l return children['_items'] -async def download_to_file(url, filename, chunk_size=10 * 1024): +async def download_to_file(url, filename, chunk_size=10 * 1024, *, future: asyncio.Future = None): """Downloads a file via HTTP(S) directly to the filesystem.""" loop = asyncio.get_event_loop() - await loop.run_in_executor(None, pillarsdk.utils.download_to_file, url, filename, chunk_size) + + def perform_get_request(): + return requests.get(url, stream=True, verify=True) + + # Download the file in a different thread. + def download_loop(): + with closing(req), open(filename, 'wb') as outfile: + for block in req.iter_content(chunk_size=chunk_size): + if is_cancelled(future): + raise asyncio.CancelledError('Downloading was cancelled') + outfile.write(block) + + # Check for cancellation even before we start our GET request + if is_cancelled(future): + log.debug('Downloading was cancelled before doing the GET') + raise asyncio.CancelledError('Downloading was cancelled') + + log.debug('Performing GET %s', url) + req = await loop.run_in_executor(None, perform_get_request) + log.debug('Done with GET %s', url) + + # After we performed the GET request, we should check whether we should start + # the download at all. + if is_cancelled(future): + log.debug('Downloading was cancelled before downloading the GET response') + raise asyncio.CancelledError('Downloading was cancelled') + + log.debug('Downloading response of GET %s', url) + await loop.run_in_executor(None, download_loop) + log.debug('Done downloading response of GET %s', url) -async def stream_thumb_to_file(file: pillarsdk.File, directory: str, desired_size: str): +async def stream_thumb_to_file(file: pillarsdk.File, directory: str, desired_size: str, *, + future: asyncio.Future = None): """Streams a thumbnail to a file. @param file: the pillar File object that represents the image whose thumbnail to download. @param directory: the directory to save the file to. @param desired_size: thumbnail size - @return: the absolute path of the downloaded file. + @return: the absolute path of the downloaded file, or None if the task was cancelled before + downloading finished. """ api = pillar_api() + if is_cancelled(future): + log.debug('stream_thumb_to_file(): cancelled before fetching thumbnail URL from Pillar') + return None + loop = asyncio.get_event_loop() thumb_link = await loop.run_in_executor(None, functools.partial( file.thumbnail_file, desired_size, api=api)) @@ -147,11 +189,15 @@ async def stream_thumb_to_file(file: pillarsdk.File, directory: str, desired_siz raise ValueError("File {} has no thumbnail of size {}" .format(file['_id'], desired_size)) + if is_cancelled(future): + log.debug('stream_thumb_to_file(): cancelled before downloading file') + return None + root, ext = os.path.splitext(file['file_path']) thumb_fname = "{0}-{1}.jpg".format(root, desired_size) thumb_path = os.path.abspath(os.path.join(directory, thumb_fname)) - await download_to_file(thumb_link, thumb_path) + await download_to_file(thumb_link, thumb_path, future=future) return thumb_path @@ -160,7 +206,8 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str, thumbnail_directory: str, *, thumbnail_loading: callable, - thumbnail_loaded: callable): + thumbnail_loaded: callable, + future: asyncio.Future = None): """Generator, fetches all texture thumbnails in a certain parent node. @param parent_node_uuid: the UUID of the parent node. All sub-nodes will be downloaded. @@ -171,6 +218,8 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str, show a "downloading" indicator. @param thumbnail_loaded: callback function that takes (node_id, pillarsdk.File object, thumbnail path) parameters, which is called for every thumbnail after it's been downloaded. + @param future: Future that's inspected; if it is not None and cancelled, texture downloading + is aborted. """ api = pillar_api() @@ -185,6 +234,11 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str, if texture_node['node_type'] != 'texture': return + if is_cancelled(future): + log.debug('fetch_texture_thumbs cancelled before finding File for texture %r', + texture_node['_id']) + return + # Find the File that belongs to this texture node pic_uuid = texture_node['picture'] loop.call_soon_threadsafe(functools.partial(thumbnail_loading, @@ -193,25 +247,59 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str, file_desc = await loop.run_in_executor(None, file_find, pic_uuid) if file_desc is None: - print('Unable to find file for texture node {}'.format(pic_uuid)) + log.warning('Unable to find file for texture node %s', pic_uuid) thumb_path = None else: + if is_cancelled(future): + log.debug('fetch_texture_thumbs cancelled before downloading file %r', + file_desc['_id']) + return + # Save the thumbnail - thumb_path = await stream_thumb_to_file(file_desc, thumbnail_directory, desired_size) - # print('Texture node {} has file {}'.format(texture_node['_id'], thumb_path)) + thumb_path = await stream_thumb_to_file(file_desc, thumbnail_directory, desired_size, + future=future) + if thumb_path is None: + # The task got cancelled, we should abort too. + log.debug('fetch_texture_thumbs cancelled while downloading file %r', + file_desc['_id']) + return + + # print('Texture node {} has file {}'.format(texture_node['_id'], thumb_path)) loop.call_soon_threadsafe(functools.partial(thumbnail_loaded, texture_node['_id'], file_desc, thumb_path)) # Download all texture nodes in parallel. - texture_nodes = await get_nodes(parent_node_uuid=parent_node_uuid) + log.debug('Getting child nodes of node %r', parent_node_uuid) + texture_nodes = await get_nodes(parent_node_uuid=parent_node_uuid, + node_type='texture') - # raises any exception from failed handle_texture_node() calls. - await asyncio.gather(*(handle_texture_node(texture_node) - for texture_node in texture_nodes)) + if is_cancelled(future): + log.warning('fetch_texture_thumbs: Texture downloading cancelled') + return - print('Done downloading texture thumbnails') + # We don't want to gather too much in parallel, as it will make cancelling take more time. + # This is caused by HTTP requests going out in parallel, and once the socket is open and + # the GET request is sent, we can't cancel until the server starts streaming the response. + chunk_size = 2 + for i in range(0, len(texture_nodes), chunk_size): + chunk = texture_nodes[i:i + chunk_size] + + log.debug('fetch_texture_thumbs: Gathering texture[%i:%i] for parent node %r', + i, i + chunk_size, parent_node_uuid) + coros = (handle_texture_node(texture_node) + for texture_node in chunk) + + # raises any exception from failed handle_texture_node() calls. + await asyncio.gather(*coros) + + log.info('fetch_texture_thumbs: Done downloading texture thumbnails') + + +def is_cancelled(future: asyncio.Future) -> bool: + log.debug('%s.cancelled() = %s', future, future.cancelled()) + return future is not None and future.cancelled() async def parent_node_uuid(node_uuid: str) -> str: @@ -228,8 +316,6 @@ async def parent_node_uuid(node_uuid: str) -> str: log.debug('Unable to find node %r, returning empty parent', node_uuid) return '' - print('Found node {}'.format(node)) - try: - return node['parent'] - except KeyError: - return '' + parent_uuid = node.parent or '' + log.debug('Parent node of %r is %r', node_uuid, parent_uuid) + return parent_uuid