Explicitly support cancelling tasks.

The 'cancelled' status is now tracked by a Future that's passed to
different asynchronous tasks. That way it is possible to cancel all
running tasks before browsing another Pillar node.
This commit is contained in:
2016-03-15 14:05:54 +01:00
parent 0174c28075
commit 5e237bea22
4 changed files with 174 additions and 56 deletions

View File

@@ -3,6 +3,9 @@ import sys
import os
import functools
import logging
from contextlib import closing
import requests
# Add our shipped Pillar SDK wheel to the Python path
if not any('pillar_sdk' in path for path in sys.path):
@@ -84,7 +87,8 @@ async def get_project_uuid(project_url: str) -> str:
return project['_id']
async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None) -> list:
async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None,
node_type: str = None) -> list:
"""Gets nodes for either a project or given a parent node.
@param project_uuid: the UUID of the project, or None if only querying by parent_node_uuid.
@@ -108,6 +112,9 @@ async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None) -> l
if project_uuid:
where['project'] = project_uuid
if node_type:
where['node_type'] = node_type
node_all = functools.partial(pillarsdk.Node.all, {
'projection': {'name': 1, 'parent': 1, 'node_type': 1,
'properties.order': 1, 'properties.status': 1,
@@ -121,24 +128,59 @@ async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None) -> l
return children['_items']
async def download_to_file(url, filename, chunk_size=10 * 1024):
async def download_to_file(url, filename, chunk_size=10 * 1024, *, future: asyncio.Future = None):
"""Downloads a file via HTTP(S) directly to the filesystem."""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, pillarsdk.utils.download_to_file, url, filename, chunk_size)
def perform_get_request():
return requests.get(url, stream=True, verify=True)
# Download the file in a different thread.
def download_loop():
with closing(req), open(filename, 'wb') as outfile:
for block in req.iter_content(chunk_size=chunk_size):
if is_cancelled(future):
raise asyncio.CancelledError('Downloading was cancelled')
outfile.write(block)
# Check for cancellation even before we start our GET request
if is_cancelled(future):
log.debug('Downloading was cancelled before doing the GET')
raise asyncio.CancelledError('Downloading was cancelled')
log.debug('Performing GET %s', url)
req = await loop.run_in_executor(None, perform_get_request)
log.debug('Done with GET %s', url)
# After we performed the GET request, we should check whether we should start
# the download at all.
if is_cancelled(future):
log.debug('Downloading was cancelled before downloading the GET response')
raise asyncio.CancelledError('Downloading was cancelled')
log.debug('Downloading response of GET %s', url)
await loop.run_in_executor(None, download_loop)
log.debug('Done downloading response of GET %s', url)
async def stream_thumb_to_file(file: pillarsdk.File, directory: str, desired_size: str):
async def stream_thumb_to_file(file: pillarsdk.File, directory: str, desired_size: str, *,
future: asyncio.Future = None):
"""Streams a thumbnail to a file.
@param file: the pillar File object that represents the image whose thumbnail to download.
@param directory: the directory to save the file to.
@param desired_size: thumbnail size
@return: the absolute path of the downloaded file.
@return: the absolute path of the downloaded file, or None if the task was cancelled before
downloading finished.
"""
api = pillar_api()
if is_cancelled(future):
log.debug('stream_thumb_to_file(): cancelled before fetching thumbnail URL from Pillar')
return None
loop = asyncio.get_event_loop()
thumb_link = await loop.run_in_executor(None, functools.partial(
file.thumbnail_file, desired_size, api=api))
@@ -147,11 +189,15 @@ async def stream_thumb_to_file(file: pillarsdk.File, directory: str, desired_siz
raise ValueError("File {} has no thumbnail of size {}"
.format(file['_id'], desired_size))
if is_cancelled(future):
log.debug('stream_thumb_to_file(): cancelled before downloading file')
return None
root, ext = os.path.splitext(file['file_path'])
thumb_fname = "{0}-{1}.jpg".format(root, desired_size)
thumb_path = os.path.abspath(os.path.join(directory, thumb_fname))
await download_to_file(thumb_link, thumb_path)
await download_to_file(thumb_link, thumb_path, future=future)
return thumb_path
@@ -160,7 +206,8 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str,
thumbnail_directory: str,
*,
thumbnail_loading: callable,
thumbnail_loaded: callable):
thumbnail_loaded: callable,
future: asyncio.Future = None):
"""Generator, fetches all texture thumbnails in a certain parent node.
@param parent_node_uuid: the UUID of the parent node. All sub-nodes will be downloaded.
@@ -171,6 +218,8 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str,
show a "downloading" indicator.
@param thumbnail_loaded: callback function that takes (node_id, pillarsdk.File object,
thumbnail path) parameters, which is called for every thumbnail after it's been downloaded.
@param future: Future that's inspected; if it is not None and cancelled, texture downloading
is aborted.
"""
api = pillar_api()
@@ -185,6 +234,11 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str,
if texture_node['node_type'] != 'texture':
return
if is_cancelled(future):
log.debug('fetch_texture_thumbs cancelled before finding File for texture %r',
texture_node['_id'])
return
# Find the File that belongs to this texture node
pic_uuid = texture_node['picture']
loop.call_soon_threadsafe(functools.partial(thumbnail_loading,
@@ -193,25 +247,59 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str,
file_desc = await loop.run_in_executor(None, file_find, pic_uuid)
if file_desc is None:
print('Unable to find file for texture node {}'.format(pic_uuid))
log.warning('Unable to find file for texture node %s', pic_uuid)
thumb_path = None
else:
if is_cancelled(future):
log.debug('fetch_texture_thumbs cancelled before downloading file %r',
file_desc['_id'])
return
# Save the thumbnail
thumb_path = await stream_thumb_to_file(file_desc, thumbnail_directory, desired_size)
# print('Texture node {} has file {}'.format(texture_node['_id'], thumb_path))
thumb_path = await stream_thumb_to_file(file_desc, thumbnail_directory, desired_size,
future=future)
if thumb_path is None:
# The task got cancelled, we should abort too.
log.debug('fetch_texture_thumbs cancelled while downloading file %r',
file_desc['_id'])
return
# print('Texture node {} has file {}'.format(texture_node['_id'], thumb_path))
loop.call_soon_threadsafe(functools.partial(thumbnail_loaded,
texture_node['_id'],
file_desc, thumb_path))
# Download all texture nodes in parallel.
texture_nodes = await get_nodes(parent_node_uuid=parent_node_uuid)
log.debug('Getting child nodes of node %r', parent_node_uuid)
texture_nodes = await get_nodes(parent_node_uuid=parent_node_uuid,
node_type='texture')
# raises any exception from failed handle_texture_node() calls.
await asyncio.gather(*(handle_texture_node(texture_node)
for texture_node in texture_nodes))
if is_cancelled(future):
log.warning('fetch_texture_thumbs: Texture downloading cancelled')
return
print('Done downloading texture thumbnails')
# We don't want to gather too much in parallel, as it will make cancelling take more time.
# This is caused by HTTP requests going out in parallel, and once the socket is open and
# the GET request is sent, we can't cancel until the server starts streaming the response.
chunk_size = 2
for i in range(0, len(texture_nodes), chunk_size):
chunk = texture_nodes[i:i + chunk_size]
log.debug('fetch_texture_thumbs: Gathering texture[%i:%i] for parent node %r',
i, i + chunk_size, parent_node_uuid)
coros = (handle_texture_node(texture_node)
for texture_node in chunk)
# raises any exception from failed handle_texture_node() calls.
await asyncio.gather(*coros)
log.info('fetch_texture_thumbs: Done downloading texture thumbnails')
def is_cancelled(future: asyncio.Future) -> bool:
    """Returns whether the given future has been cancelled.

    @param future: the Future to inspect, or None when the caller did not
        pass one (the functions in this module default to future=None).
    @return: True only when a future was given and it is cancelled;
        False when future is None or not cancelled.
    """
    # Guard against None *before* touching future.cancelled(); the debug log
    # used to call it unconditionally, which raised AttributeError for None.
    cancelled = future is not None and future.cancelled()
    log.debug('%s.cancelled() = %s', future, cancelled)
    return cancelled
async def parent_node_uuid(node_uuid: str) -> str:
@@ -228,8 +316,6 @@ async def parent_node_uuid(node_uuid: str) -> str:
log.debug('Unable to find node %r, returning empty parent', node_uuid)
return ''
print('Found node {}'.format(node))
try:
return node['parent']
except KeyError:
return ''
parent_uuid = node.parent or ''
log.debug('Parent node of %r is %r', node_uuid, parent_uuid)
return parent_uuid