FBX IO: Add utility to schedule tasks to run on separate threads #105017
@@ -8,8 +8,11 @@ import time
from collections import namedtuple
from collections.abc import Iterable
from contextlib import contextmanager, nullcontext
from itertools import zip_longest, chain
from dataclasses import dataclass, field
import os
from queue import SimpleQueue
from typing import Callable
import numpy as np
@@ -787,6 +790,185 @@ MESH_ATTRIBUTE_CORNER_EDGE = AttributeDescription(".corner_edge", 'INT', 'CORNER
MESH_ATTRIBUTE_SHARP_FACE = AttributeDescription("sharp_face", 'BOOLEAN', 'FACE')


# ##### Multithreading utils. #####

# For debugging/profiling purposes, can be modified at runtime to force single-threaded execution.
_MULTITHREADING_ENABLED = True
try:
    from concurrent.futures import ThreadPoolExecutor
except ModuleNotFoundError:
    # The concurrent.futures module does not work or is not available on WebAssembly platforms wasm32-emscripten
    # and wasm32-wasi.
    # wasm32-emscripten raises ModuleNotFoundError; whether wasm32-wasi does the same is unconfirmed.
    _MULTITHREADING_ENABLED = False
    ThreadPoolExecutor = None


def get_cpu_count():
    """Get the number of cpus assigned to the current process if that information is available on this system.
    If not available, get the total number of cpus.
    If the cpu count is indeterminable, it is assumed that there is only 1 cpu available."""
    sched_getaffinity = getattr(os, "sched_getaffinity", None)
    if sched_getaffinity is not None:
        # Return the number of cpus assigned to the current process.
        return len(sched_getaffinity(0))
    count = os.cpu_count()
    return count if count is not None else 1
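# For example, in a process whose affinity is restricted to 4 of a machine's 16 logical cpus,
# os.sched_getaffinity(0) has 4 entries, so get_cpu_count() returns 4, whereas os.cpu_count()
# would report all 16.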


class MultiThreadedTaskConsumer:
    """Helper class that encapsulates everything needed to run a function on separate threads, with a single-threaded
    fallback if multithreading is not available.

    Lower overhead than typical use of ThreadPoolExecutor because no Future objects are returned, which makes this
    class more suitable for running many smaller tasks.

    As with any threaded parallelization, because of Python's Global Interpreter Lock, only one thread can execute
    Python code at a time, so threaded parallelization is only useful when the functions used release the GIL, such as
    many IO related functions."""
    # A special task value used to signal task consumer threads to shut down.
    _SHUT_DOWN_THREADS = object()

    __slots__ = ("_consumer_function", "_shared_task_queue", "_task_consumer_futures", "_executor",
                 "_max_consumer_threads", "_shutting_down", "_max_queue_per_consumer")

    def __init__(self, consumer_function, max_consumer_threads, max_queue_per_consumer=5):
        # It's recommended to use MultiThreadedTaskConsumer.new_cpu_bound_cm() instead of creating new instances
        # directly.
        # __init__ should only be called after checking _MULTITHREADING_ENABLED.
        assert _MULTITHREADING_ENABLED
        # The function that will be called on separate threads to consume tasks.
        self._consumer_function = consumer_function
        # All the threads share a single queue. This is a simplistic approach, but it is unlikely to be problematic
        # unless the main thread is expected to wait a long time for the consumer threads to finish.
        self._shared_task_queue = SimpleQueue()
        # A reference to each thread is kept through the returned Future objects. These are used to determine when
        # new threads should be started and to receive and handle exceptions from the threads.
        self._task_consumer_futures = []
        # Create the executor.
        self._executor = ThreadPoolExecutor(max_workers=max_consumer_threads)
        # Technically, the executor's max workers is accessible through its private `._max_workers` attribute, but
        # since that could change without warning, we store the max number of consumer threads ourselves.
        self._max_consumer_threads = max_consumer_threads
        # The maximum task queue size (before another consumer thread is started) increases by this amount with every
        # additional consumer thread.
        self._max_queue_per_consumer = max_queue_per_consumer
        # When shutting down the threads, this is set to True as an extra safeguard to prevent new tasks from being
        # scheduled.
        self._shutting_down = False

    @classmethod
    def new_cpu_bound_cm(cls, consumer_function, other_cpu_bound_threads_in_use=1, hard_max_threads=32):
        """Return a context manager that, when entered, returns a wrapper around `consumer_function` that schedules
        `consumer_function` to be run on a separate thread.

        If the system can't use multithreading, then the context manager's returned function will instead be the
        input `consumer_function` argument, causing tasks to be run immediately on the calling thread.

        When exiting the context manager, it waits for all scheduled tasks to complete and prevents the creation of
        new tasks, similar to calling ThreadPoolExecutor.shutdown(). For these reasons, the wrapped function should
        only be called from the thread that entered the context manager; otherwise, there is no guarantee that all
        tasks will get scheduled before the context manager exits.

        Any task that fails with an exception will cause all task consumer threads to stop.

        The maximum number of threads used matches the number of cpus available, up to a maximum of
        `hard_max_threads`. The default `hard_max_threads` of 32 matches ThreadPoolExecutor's default behaviour.

        The maximum number of threads used is decreased by `other_cpu_bound_threads_in_use`, which defaults to `1`
        on the assumption that the calling thread will also be doing CPU-bound work.

        Most IO-bound tasks can probably use a ThreadPoolExecutor directly instead, because there will typically be
        fewer tasks and, on average, each individual task will take longer.
        If needed, `cls.new_cpu_bound_cm(consumer_function, -4)` could be suitable for lots of small IO-bound tasks,
        because it ensures a minimum of 5 threads, like the default ThreadPoolExecutor."""
        if _MULTITHREADING_ENABLED:
            max_threads = get_cpu_count() - other_cpu_bound_threads_in_use
            max_threads = min(max_threads, hard_max_threads)
            if max_threads > 0:
                return cls(consumer_function, max_threads)._wrap_executor_cm()
        # Fall back to single-threaded.
        return nullcontext(consumer_function)
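    # Worked example of the thread-count logic above, with illustrative numbers: with the default
    # `other_cpu_bound_threads_in_use=1` on a 16-cpu system, up to 16 - 1 = 15 consumer threads are
    # used. With `other_cpu_bound_threads_in_use=-4`, even a 1-cpu system gets min(1 + 4, 32) = 5
    # threads, the minimum of 5 mentioned in the docstring.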

    def _task_consumer_callable(self):
        """Callable that is run by each task consumer thread.
        Signals the other task consumer threads to stop when stopped intentionally or when an exception occurs."""
        try:
            while True:
                # Blocks until it can get a task.
                task_args = self._shared_task_queue.get()

                if task_args is self._SHUT_DOWN_THREADS:
                    # This special value signals that it's time for all the threads to stop.
                    break
                else:
                    # Call the task consumer function.
                    self._consumer_function(*task_args)
        finally:
            # Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS or an exception has
            # occurred.
            # Add _SHUT_DOWN_THREADS to the queue so that the other consumer threads will also shut down.
            self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
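    # Note how the shutdown sentinel cascades: the thread that receives _SHUT_DOWN_THREADS breaks
    # out of the loop and, in the `finally` block, puts the sentinel back, so a single sentinel put
    # by _wrap_executor_cm is enough to stop every consumer thread one after another.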

    def _schedule_task(self, *args):
        """Task consumer threads are only started as tasks are added.

        To mitigate starting lots of threads if many tasks are scheduled in quick succession, new threads are only
        started if the number of queued tasks grows too large.

        This function is a slight misuse of ThreadPoolExecutor. Normally each task to be scheduled would be submitted
        through ThreadPoolExecutor.submit, but doing so is noticeably slower for small tasks. We could start new
        Thread instances manually without using ThreadPoolExecutor, but ThreadPoolExecutor gives us a higher level
        API for waiting for threads to finish and handling exceptions, without having to implement such an API using
        Thread ourselves.
        """
        if self._shutting_down:
            # Shouldn't occur through normal usage.
            raise RuntimeError("Cannot schedule new tasks after shutdown")
        # Schedule the task by adding it to the task queue.
        self._shared_task_queue.put(args)
        # Check if more consumer threads need to be added to account for the rate at which tasks are being scheduled
        # compared to the rate at which tasks are being consumed.
        current_consumer_count = len(self._task_consumer_futures)
        if current_consumer_count < self._max_consumer_threads:
            # The max queue size increases as new threads are added; otherwise, by the time the next task is added,
            # it's likely that the queue size will still be over the max, causing another new thread to be added
            # immediately. Increasing the max queue size whenever a new thread is started gives the new thread some
            # time to start up and begin consuming tasks before it's determined that another thread is needed.
            max_queue_size_for_current_consumers = self._max_queue_per_consumer * current_consumer_count

            if self._shared_task_queue.qsize() > max_queue_size_for_current_consumers:
                # Add a new consumer thread because the queue has grown too large.
                self._task_consumer_futures.append(self._executor.submit(self._task_consumer_callable))
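    # A worked example of the heuristic above, using the default max_queue_per_consumer of 5: the
    # first scheduled task makes qsize() == 1 > 5 * 0, starting the first consumer thread. A second
    # thread is only started once tasks are queued faster than they are consumed and qsize() exceeds
    # 5 * 1, a third once qsize() exceeds 5 * 2, and so on, up to _max_consumer_threads in total.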

    @contextmanager
    def _wrap_executor_cm(self):
        """Wrap the executor's context manager to instead return self._schedule_task, and to ensure that the threads
        automatically start shutting down before the executor itself starts shutting down."""
        # .__enter__()
        # Exiting the context manager of the executor will wait for all threads to finish and prevent new
        # threads from being created, as if its shutdown() method had been called.
        with self._executor:
            try:
                yield self._schedule_task
            finally:
                # .__exit__()
                self._shutting_down = True
                # Signal all consumer threads to finish up and shut down so that the executor can shut down.
                # When this is run on the same thread that schedules new tasks, this guarantees that no more tasks
                # will be scheduled after the consumer threads start to shut down.
                self._shared_task_queue.put(self._SHUT_DOWN_THREADS)

                # Because `self._executor` was entered with a context manager, it will wait for all the consumer
                # threads to finish even if we propagate an exception from one of the threads here.
                for future in self._task_consumer_futures:
                    # .exception() waits for the future to finish and returns its raised exception or None.
                    ex = future.exception()
                    if ex is not None:
                        # If one of the threads raised an exception, propagate it to the main thread.
                        # Only the first exception will be propagated if there were multiple.
                        raise ex
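

# A minimal usage sketch (illustrative only: `write_chunk` and `chunks` are hypothetical names, not
# part of this module). The wrapped function is called with the same arguments as the consumer
# function, but each call only schedules a task; exiting the `with` block waits for every scheduled
# task to finish and re-raises the first exception raised by a consumer thread, if any:
#
#     with MultiThreadedTaskConsumer.new_cpu_bound_cm(write_chunk) as schedule_task:
#         for chunk in chunks:
#             schedule_task(chunk)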
# ##### UIDs code. #####

# ID class (mere int).