Fixed issue with multiple asyncio loops on Windows.

The biggest issue was the construction of an asyncio.Semaphore() while the
default loop is alive, and then creating a new loop on win32.

I've also taken the opportunity to explicitly pass our loop to some calls,
rather than expecting them to use the correct one automagically, and added
some more explicit timeout handling to the semaphore usage.
This commit is contained in:
Sybren A. Stüvel 2017-06-13 13:34:15 +02:00
parent ec5f317dac
commit 7b5613ce77
2 changed files with 24 additions and 5 deletions

View File

@ -44,6 +44,7 @@ def setup_asyncio_executor():
executor = concurrent.futures.ThreadPoolExecutor() executor = concurrent.futures.ThreadPoolExecutor()
if sys.platform == 'win32': if sys.platform == 'win32':
asyncio.get_event_loop().close()
# On Windows, the default event loop is SelectorEventLoop, which does # On Windows, the default event loop is SelectorEventLoop, which does
# not support subprocesses. ProactorEventLoop should be used instead. # not support subprocesses. ProactorEventLoop should be used instead.
# Source: https://docs.python.org/3/library/asyncio-subprocess.html # Source: https://docs.python.org/3/library/asyncio-subprocess.html
@ -54,6 +55,10 @@ def setup_asyncio_executor():
loop.set_default_executor(executor) loop.set_default_executor(executor)
# loop.set_debug(True) # loop.set_debug(True)
from . import pillar
# No more than this many Pillar calls should be made simultaneously
pillar.pillar_semaphore = asyncio.Semaphore(3, loop=loop)
def kick_async_loop(*args) -> bool: def kick_async_loop(*args) -> bool:
"""Performs a single iteration of the asyncio event loop. """Performs a single iteration of the asyncio event loop.

24
blender_cloud/pillar.py Normal file → Executable file
View File

@ -200,8 +200,11 @@ def pillar_api(pillar_endpoint: str = None, caching=True) -> pillarsdk.Api:
return _pillar_api[caching] return _pillar_api[caching]
# No more than this many Pillar calls should be made simultaneously # This is an asyncio.Semaphore object, which is late-instantiated to be sure
pillar_semaphore = asyncio.Semaphore(3) # the asyncio loop has been created properly. On Windows we create a new one,
# which can cause this semaphore to still be linked against the old default
# loop.
pillar_semaphore = None
async def pillar_call(pillar_func, *args, caching=True, **kwargs): async def pillar_call(pillar_func, *args, caching=True, **kwargs):
@ -214,8 +217,17 @@ async def pillar_call(pillar_func, *args, caching=True, **kwargs):
partial = functools.partial(pillar_func, *args, api=pillar_api(caching=caching), **kwargs) partial = functools.partial(pillar_func, *args, api=pillar_api(caching=caching), **kwargs)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
async with pillar_semaphore: # Use explicit calls to acquire() and release() so that we have more control over
# how long we wait and how we handle timeouts.
try:
asyncio.wait_for(pillar_semaphore.acquire(), timeout=60, loop=loop)
except asyncio.TimeoutError:
raise RuntimeError('Timeout waiting for Pillar Semaphore!')
try:
return await loop.run_in_executor(None, partial) return await loop.run_in_executor(None, partial)
finally:
pillar_semaphore.release()
def sync_call(pillar_func, *args, caching=True, **kwargs): def sync_call(pillar_func, *args, caching=True, **kwargs):
@ -534,7 +546,8 @@ async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str,
for texture_node in texture_nodes) for texture_node in texture_nodes)
# raises any exception from failed handle_texture_node() calls. # raises any exception from failed handle_texture_node() calls.
await asyncio.gather(*coros) loop = asyncio.get_event_loop()
await asyncio.gather(*coros, loop=loop)
log.info('fetch_texture_thumbs: Done downloading texture thumbnails') log.info('fetch_texture_thumbs: Done downloading texture thumbnails')
@ -747,7 +760,8 @@ async def download_texture(texture_node,
future=future) future=future)
downloaders.append(dlr) downloaders.append(dlr)
return await asyncio.gather(*downloaders, return_exceptions=True) loop = asyncio.get_event_loop()
return await asyncio.gather(*downloaders, return_exceptions=True, loop=loop)
async def upload_file(project_id: str, file_path: pathlib.Path, *, async def upload_file(project_id: str, file_path: pathlib.Path, *,