diff --git a/io_scene_fbx/fbx_utils_threading.py b/io_scene_fbx/fbx_utils_threading.py
new file mode 100644
index 000000000..bf7631b51
--- /dev/null
+++ b/io_scene_fbx/fbx_utils_threading.py
@@ -0,0 +1,194 @@
+# SPDX-FileCopyrightText: 2023 Blender Foundation
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+from contextlib import contextmanager, nullcontext
+import os
+from queue import SimpleQueue
+
+# Note: `bpy` cannot be imported here because this module is also used by the fbx2json.py and json2fbx.py scripts.
+
+# For debugging/profiling purposes, can be modified at runtime to force single-threaded execution.
+_MULTITHREADING_ENABLED = True
+# The concurrent.futures module may not work or may not be available on WebAssembly platforms wasm32-emscripten and
+# wasm32-wasi.
+try:
+    from concurrent.futures import ThreadPoolExecutor
+except ModuleNotFoundError:
+    _MULTITHREADING_ENABLED = False
+    ThreadPoolExecutor = None
+else:
+    try:
+        # The module may be available, but not be fully functional. An error may be raised when attempting to start a
+        # new thread.
+        with ThreadPoolExecutor() as tpe:
+            # Attempt to start a thread by submitting a callable.
+            tpe.submit(lambda: None)
+    except Exception:
+        # Assume that multithreading is not supported and fall back to single-threaded execution.
+        _MULTITHREADING_ENABLED = False
+
+
+def get_cpu_count():
+    """Get the number of CPUs assigned to the current process, if that information is available on this system.
+    If not available, get the total number of CPUs.
+    If the CPU count is indeterminable, it is assumed that there is only 1 CPU available."""
+    sched_getaffinity = getattr(os, "sched_getaffinity", None)
+    if sched_getaffinity is not None:
+        # Return the number of CPUs assigned to the current process.
+        return len(sched_getaffinity(0))
+    count = os.cpu_count()
+    return count if count is not None else 1
+
+
+class MultiThreadedTaskConsumer:
+    """Helper class that encapsulates everything needed to run a function on separate threads, with a single-threaded
+    fallback if multithreading is not available.
+
+    Lower overhead than typical use of ThreadPoolExecutor because no Future objects are returned, which makes this
+    class more suitable for running many smaller tasks.
+
+    As with any threaded parallelization, because of Python's Global Interpreter Lock, only one thread can execute
+    Python code at a time, so threaded parallelization is only useful when the functions used release the GIL, such
+    as many IO-related functions."""
+    # A special task value used to signal task consumer threads to shut down.
+    _SHUT_DOWN_THREADS = object()
+
+    __slots__ = ("_consumer_function", "_shared_task_queue", "_task_consumer_futures", "_executor",
+                 "_max_consumer_threads", "_shutting_down", "_max_queue_per_consumer")
+
+    def __init__(self, consumer_function, max_consumer_threads, max_queue_per_consumer=5):
+        # It's recommended to use MultiThreadedTaskConsumer.new_cpu_bound_cm() instead of creating new instances
+        # directly.
+        # __init__ should only be called after checking _MULTITHREADING_ENABLED.
+        assert _MULTITHREADING_ENABLED
+        # The function that will be called on separate threads to consume tasks.
+        self._consumer_function = consumer_function
+        # All the threads share a single queue. This is a simplistic approach, but it is unlikely to be problematic
+        # unless the main thread is expected to wait a long time for the consumer threads to finish.
+        self._shared_task_queue = SimpleQueue()
+        # A reference to each thread is kept through the returned Future objects. This is used as part of determining
+        # when new threads should be started, and to be able to receive and handle exceptions from the threads.
+        self._task_consumer_futures = []
+        # Create the executor.
+        self._executor = ThreadPoolExecutor(max_workers=max_consumer_threads)
+        # Technically the max workers of the executor is accessible through its `._max_workers`, but since that
+        # attribute is private, meaning it could be changed without warning, we'll store the max workers/consumers
+        # ourselves.
+        self._max_consumer_threads = max_consumer_threads
+        # The maximum task queue size (before another consumer thread is started) increases by this amount with
+        # every additional consumer thread.
+        self._max_queue_per_consumer = max_queue_per_consumer
+        # When shutting down the threads, this is set to True as an extra safeguard to prevent new tasks being
+        # scheduled.
+        self._shutting_down = False
+
+    @classmethod
+    def new_cpu_bound_cm(cls, consumer_function, other_cpu_bound_threads_in_use=1, hard_max_threads=32):
+        """Return a context manager that, when entered, returns a wrapper around `consumer_function` that schedules
+        `consumer_function` to be run on a separate thread.
+
+        If the system can't use multithreading, then the context manager's returned function will instead be the
+        input `consumer_function` argument, causing tasks to be run immediately on the calling thread.
+
+        When exiting the context manager, it waits for all scheduled tasks to complete and prevents the creation of
+        new tasks, similar to calling ThreadPoolExecutor.shutdown(). For these reasons, the wrapped function should
+        only be called from the thread that entered the context manager, otherwise there is no guarantee that all
+        tasks will get scheduled before the context manager exits.
+
+        Any task that fails with an exception will cause all task consumer threads to stop.
+
+        The maximum number of threads used matches the number of CPUs available, up to a maximum of
+        `hard_max_threads`. `hard_max_threads`'s default of 32 matches ThreadPoolExecutor's default behaviour.
+
+        The maximum number of threads used is decreased by `other_cpu_bound_threads_in_use`, which defaults to `1` on
+        the assumption that the calling thread will also be doing CPU-bound work.
+
+        Most IO-bound tasks can probably use a ThreadPoolExecutor directly instead because there will typically be
+        fewer tasks and, on average, each individual task will take longer.
+        If needed, `cls.new_cpu_bound_cm(consumer_function, -4)` could be suitable for lots of small IO-bound tasks,
+        because it ensures a minimum of 5 threads, like the default ThreadPoolExecutor."""
+        if _MULTITHREADING_ENABLED:
+            max_threads = get_cpu_count() - other_cpu_bound_threads_in_use
+            max_threads = min(max_threads, hard_max_threads)
+            if max_threads > 0:
+                return cls(consumer_function, max_threads)._wrap_executor_cm()
+        # Fall back to single-threaded.
+        return nullcontext(consumer_function)
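+
+    # A minimal usage sketch of the context manager returned by new_cpu_bound_cm(); `write_chunk`, `file` and
+    # `chunks` are illustrative names, not part of this module:
+    #
+    #     with MultiThreadedTaskConsumer.new_cpu_bound_cm(write_chunk) as schedule_task:
+    #         for chunk in chunks:
+    #             schedule_task(file, chunk)
+    #     # All scheduled write_chunk(file, chunk) calls have completed here (or each ran
+    #     # immediately on the calling thread if multithreading is unavailable).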
+
+    def _task_consumer_callable(self):
+        """Callable that is run by each task consumer thread.
+        Signals the other task consumer threads to stop when stopped intentionally or when an exception occurs."""
+        try:
+            while True:
+                # Blocks until it can get a task.
+                task_args = self._shared_task_queue.get()
+
+                if task_args is self._SHUT_DOWN_THREADS:
+                    # This special value signals that it's time for all the threads to stop.
+                    break
+                else:
+                    # Call the task consumer function.
+                    self._consumer_function(*task_args)
+        finally:
+            # Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS, or an exception
+            # has occurred.
+            # Add _SHUT_DOWN_THREADS to the queue so that the other consumer threads will also shut down.
+            self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
+
+    def _schedule_task(self, *args):
+        """Task consumer threads are only started as tasks are added.
+
+        To mitigate starting lots of threads if many tasks are scheduled in quick succession, new threads are only
+        started if the number of queued tasks grows too large.
+
+        This function is a slight misuse of ThreadPoolExecutor. Normally each task to be scheduled would be submitted
+        through ThreadPoolExecutor.submit, but doing so is noticeably slower for small tasks. We could start new
+        Thread instances manually without using ThreadPoolExecutor, but ThreadPoolExecutor gives us a higher level
+        API for waiting for threads to finish and handling exceptions without having to implement an API using
+        Thread ourselves.
+        """
+        if self._shutting_down:
+            # Shouldn't occur through normal usage.
+            raise RuntimeError("Cannot schedule new tasks after shutdown")
+        # Schedule the task by adding it to the task queue.
+        self._shared_task_queue.put(args)
+        # Check if more consumer threads need to be added to account for the rate at which tasks are being scheduled
+        # compared to the rate at which tasks are being consumed.
+        current_consumer_count = len(self._task_consumer_futures)
+        if current_consumer_count < self._max_consumer_threads:
+            # The max queue size increases as new threads are added, otherwise, by the time the next task is added,
+            # it's likely that the queue size will still be over the max, causing another new thread to be added
+            # immediately. Increasing the max queue size whenever a new thread is started gives some time for the
+            # new thread to start up and begin consuming tasks before it's determined that another thread is needed.
+            max_queue_size_for_current_consumers = self._max_queue_per_consumer * current_consumer_count
+
+            if self._shared_task_queue.qsize() > max_queue_size_for_current_consumers:
+                # Add a new consumer thread because the queue has grown too large.
+                self._task_consumer_futures.append(self._executor.submit(self._task_consumer_callable))
+
+    @contextmanager
+    def _wrap_executor_cm(self):
+        """Wrap the executor's context manager to instead return self._schedule_task, and to start shutting down the
+        threads automatically before the executor itself starts shutting down."""
+        # .__enter__()
+        # Exiting the context manager of the executor will wait for all threads to finish and prevent new
+        # threads from being created, as if its shutdown() method had been called.
+        with self._executor:
+            try:
+                yield self._schedule_task
+            finally:
+                # .__exit__()
+                self._shutting_down = True
+                # Signal all consumer threads to finish up and shut down so that the executor can shut down.
+                # When this is run on the same thread that schedules new tasks, this guarantees that no more tasks
+                # will be scheduled after the consumer threads start to shut down.
+                self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
+
+                # Because `self._executor` was entered with a context manager, it will wait for all the consumer
+                # threads to finish even if we propagate an exception from one of the threads here.
+                for future in self._task_consumer_futures:
+                    # .exception() waits for the future to finish and returns its raised exception or None.
+                    ex = future.exception()
+                    if ex is not None:
+                        # If one of the threads raised an exception, propagate it to the main thread.
+                        # Only the first exception will be propagated if there were multiple.
+                        raise ex
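
For debugging or profiling, the module-level `_MULTITHREADING_ENABLED` flag can be set to `False` at runtime to force the single-threaded fallback, as noted at the top of the module. A minimal sketch (assuming the module is imported from within the addon):

    from io_scene_fbx import fbx_utils_threading

    # After this, new_cpu_bound_cm() returns nullcontext(consumer_function), so every
    # scheduled task runs immediately on the calling thread.
    fbx_utils_threading._MULTITHREADING_ENABLED = False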