FBX IO: Speed up parsing by multithreading array decompression #104739

Merged
Thomas Barlow merged 7 commits from Mysteryem/blender-addons:fbx_parse_multithread_pr into main 2024-01-12 21:32:36 +01:00
Showing only changes of commit 0fdf09a75b - Show all commits

View File

@@ -227,23 +227,23 @@ class _MultiThreadedArrayDecompressor:
                 # Decompress the array data, create the array and insert it into the FBX hierarchy.
                 self._decompress_and_insert_array(*task_args)
         finally:
-            # Either the thread has been told to shut down because it received _STOP_THREADS or an exception has
+            # Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS or an exception has
             # occurred.
-            # Add _STOP_THREADS to the queue so that the other worker threads will also shut down.
+            # Add _SHUT_DOWN_THREADS to the queue so that the other worker threads will also shut down.
             self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
 
     def _schedule_array_decompression(self, elem_props_data, index_to_set, compressed_array_args):
         """Some FBX files might not have any compressed arrays, so worker threads are only started as compressed arrays
         are found.
 
-        Note that this function must have the same signature as (or otherwise be compatible as a drop-in replacement
-        for) _decompress_and_insert_array, which is used when multithreading is not available.
+        Note that the signature of this function must be the same as, or otherwise be compatible with,
+        _decompress_and_insert_array, which is used instead of this function when multithreading is not available.
 
-        This function is a slight misuse of ThreadPoolExecutor, normally each task to be scheduled would be submitted
+        This function is a slight misuse of ThreadPoolExecutor. Normally, each task to be scheduled would be submitted
         through ThreadPoolExecutor.submit, but doing so is noticeably slower for these array decompression tasks,
-        perhaps because they can be quite quick and there can be a lot of them. We could start new Thread instances
-        manually without using ThreadPoolExecutor, but ThreadPoolExecutor gives us a higher level API for waiting for
-        threads to finish and handling exceptions without having to implement an API using Thread ourselves."""
+        perhaps because each task can be quick and there can be a lot of them. An alternative would be starting new
+        Thread instances manually, but then we would have to implement our own functions that can wait for threads to
+        finish and handle exceptions."""
         if self._shutting_down:
             # Shouldn't occur through normal usage.
             raise RuntimeError("Cannot schedule new tasks after shutdown")
@@ -254,10 +254,9 @@ class _MultiThreadedArrayDecompressor:
 
         # compared to the rate at which tasks are being consumed.
         current_worker_count = len(self._worker_futures)
         if current_worker_count < self._max_workers:
-            # The max queue size increases as new threads are added, otherwise, by the time the next task is added, it's
-            # likely that the queue size will still be over the max, causing another new thread to be added immediately.
-            # Increasing the max queue size whenever a new thread is started gives some time for the new thread to start
-            # up and begin consuming tasks before it's determined that another thread is needed.
+            # Increasing the max queue size whenever a new thread is started gives some time for new threads to start up
+            # and begin consuming tasks from the queue before it's determined that another new thread is needed. This
+            # helps account for lots of compressed arrays being read in quick succession.
             max_queue_size_for_current_workers = MAX_QUEUE_PER_DECOMPRESSION_THREAD * current_worker_count
             if self._shared_task_queue.qsize() > max_queue_size_for_current_workers:
@@ -269,8 +268,8 @@ class _MultiThreadedArrayDecompressor:
         """Wrap the executor's context manager to instead return _schedule_array_decompression and such that the threads
         automatically start shutting down before the executor starts shutting down."""
         # .__enter__()
-        # Exiting the context manager of the executor will wait for all threads to finish and prevent new
-        # threads from being created, as if its shutdown() method had been called.
+        # Exiting the context manager of the executor will wait for all threads to finish and prevent new threads from
+        # being created, as if its shutdown() method had been called.
         with self._executor:
             try:
                 yield self._schedule_array_decompression
@@ -278,9 +277,9 @@ class _MultiThreadedArrayDecompressor:
 
                 # .__exit__()
                 self._shutting_down = True
                 # Signal all worker threads to finish up and shut down so that the executor can shut down.
-                # Because this is run on the main thread and because decompression tasks are only scheduled from
-                # the main thread, it is guaranteed that no more decompression tasks will be scheduled after the
-                # worker threads start to shut down.
+                # Because this is run on the main thread and because decompression tasks are only scheduled from the
+                # main thread, it is guaranteed that no more decompression tasks will be scheduled after the worker
+                # threads start to shut down.
                 self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
                 # Because `self._executor` was entered with a context manager, it will wait for all the worker threads