From 00c58e0d917c9143b1b7fb21180bd48283817ef4 Mon Sep 17 00:00:00 2001 From: Thomas Barlow Date: Sun, 2 Jul 2023 23:53:58 +0100 Subject: [PATCH 1/5] FBX Parsing: Multithread array decompression Because `zlib.decompress` releases the GIL, the arrays are now decompressed on separate threads. Given enough logical CPUs on the current system, decompressing arrays and parsing the rest of the file is now done simultaneously. All the functions for managing the multithreading have been encapsulated in a helper class that gets exposed through a context manager. If the current platform does not support multithreading (wasm32-emscripten/wasm32-wasi), then the code falls back to being single-threaded. Aside from .fbx files without any compressed arrays, array decompression usually takes just under 50% of the parsing duration on average, though commonly varies between 40% to 60% depending on the contents of the file. I was only able to get an average of a 35% reduction in parsing duration because the main thread ends up reading from the file more often and appears to end up spending more time waiting for IO than before. Though this is likely to vary depending on the file system that is being read from and the time taken to read from IO is expected to be longer in real use cases because the file being read won't have been accessed recently. For the smallest files, e.g. a single cube mesh, this can be slightly slower because starting a new thread takes more time than is gained by starting that thread. Because the main thread spends some time waiting on IO, even systems with a single CPU can see a small speedup from this patch. I get about a 6% reduction in parsing duration in this case. Parsing fbx files takes about 16% of the total import duration on average, so the overall import duration would be expected to reduce by about 5.6% on average. However, from timing imports before and after this patch, I get an actual average reduction of 3.5%. --- io_scene_fbx/parse_fbx.py | 250 ++++++++++++++++++++++++++++++++++---- 1 file changed, 229 insertions(+), 21 deletions(-) diff --git a/io_scene_fbx/parse_fbx.py b/io_scene_fbx/parse_fbx.py index 6639be1da..1bd95131c 100644 --- a/io_scene_fbx/parse_fbx.py +++ b/io_scene_fbx/parse_fbx.py @@ -14,6 +14,7 @@ from struct import unpack import array import zlib from io import BytesIO +from contextlib import contextmanager, nullcontext from . import data_types @@ -27,6 +28,15 @@ _HEAD_MAGIC = b'Kaydara FBX Binary\x20\x20\x00\x1a\x00' from collections import namedtuple FBXElem = namedtuple("FBXElem", ("id", "props", "props_type", "elems")) del namedtuple +# The maximum number of threads that can be started when decompressing arrays is dynamic and based on the number of +# CPUs, but has a hard max to limit resource costs. This hard max matches ThreadPoolExecutor's default behaviour. +HARD_MAX_ARRAY_DECOMPRESSION_THREADS = 32 +# The maximum size of the task queue, per array decompression thread, before another thread is started. This magic +# has been determined experimentally to usually keep the queue quite small without starting an excessive number of +# threads when the file being parsed is small or when many tasks are added in quick succession. This tends to start more +# threads than would be most optimal, but ensures that the queue is close to empty by the time the main thread finishes +# parsing, so there will be little to no waiting for decompression tasks to finish. +MAX_QUEUE_PER_DECOMPRESSION_THREAD = 5 def read_uint(read): @@ -59,16 +69,10 @@ def read_elem_start64(read): return end_offset, prop_count, elem_id -def unpack_array(read, array_type, array_stride, array_byteswap): - length, encoding, comp_len = read_array_params(read) - - data = read(comp_len) - - if encoding == 0: - pass - elif encoding == 1: - data = zlib.decompress(data) - +def _create_array(data, length, array_type, array_stride, array_byteswap): + """Create an array from FBX data.""" + # If size of the data does not match the expected size of the array, then something is wrong with the code or the + # FBX file. assert(length * array_stride == len(data)) data_array = array.array(array_type, data) @@ -77,6 +81,35 @@ def unpack_array(read, array_type, array_stride, array_byteswap): return data_array +def unpack_array(read, array_type, array_stride, array_byteswap): + """Unpack an array from an FBX file being parsed. + + If the array data is compressed, the compressed data is combined with the other arguments into a tuple to prepare + for decompressing on a separate thread if possible. + + If the array data is not compressed, the array is created. + + Returns (tuple, True) or (array, False).""" + length, encoding, comp_len = read_array_params(read) + + data = read(comp_len) + + if encoding == 1: + # Array data requires decompression, which is done in a separate thread if possible. + return (data, length, array_type, array_stride, array_byteswap), True + else: + return _create_array(data, length, array_type, array_stride, array_byteswap), False + + +read_array_dict = { + b'b'[0]: lambda read: unpack_array(read, data_types.ARRAY_BOOL, 1, False), # bool + b'c'[0]: lambda read: unpack_array(read, data_types.ARRAY_BYTE, 1, False), # ubyte + b'i'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT32, 4, True), # int + b'l'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT64, 8, True), # long + b'f'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT32, 4, False), # float + b'd'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT64, 8, False), # double +} + read_data_dict = { b'Z'[0]: lambda read: unpack(b' 0: + return cls(thread_pool_executor_cls, max_threads)._wrap_executor_cm() + else: + # Fall back to single-threaded. + return nullcontext(cls._decompress_and_insert_array) + + @staticmethod + def _get_max_threads(): + """Decompressing arrays is entirely CPU work that releases the GIL, so there shouldn't be any benefit in using + more threads than there are CPUs. + + The current (main) thread is not counted when considering the maximum number of threads because it can spend + some of its time waiting for File IO, so even a system with only a single CPU can see a benefit from having a + separate thread for decompressing arrays.""" + import os + + # os.sched_getaffinity(0) gets the set of CPUs available to the current process, but is only available on some + # Unix platforms. + sched_getaffinity = getattr(os, "sched_getaffinity", None) + if sched_getaffinity is not None: + max_threads = len(sched_getaffinity(0)) + else: + # Without sched_getaffinity being available, assume all CPUs are available to the current process. + max_threads = os.cpu_count() or 1 # assume 1 if cpu_count is indeterminable + + # Cap the maximum number of threads to limit resource costs. + return min(HARD_MAX_ARRAY_DECOMPRESSION_THREADS, max_threads) + + @staticmethod + def _decompress_and_insert_array(elem_props_data, index_to_set, compressed_array_args): + """Decompress array data and insert the created array into the FBX tree being parsed. + + This is usually called from a separate thread to the main thread.""" + compressed_data, length, array_type, array_stride, array_byteswap = compressed_array_args + + # zlib.decompress releases the Global Interpreter Lock, so another thread can run code while waiting for the + # decompression to complete. + data = zlib.decompress(compressed_data) + + # Create and insert the array into the parsed FBX hierarchy. + elem_props_data[index_to_set] = _create_array(data, length, array_type, array_stride, array_byteswap) + + def _worker_callable(self): + """Callable that is run by each worker thread. + Signals the other worker threads to stop when stopped intentionally or when an exception occurs.""" + try: + while True: + # Blocks until it can get a task. + task_args = self._shared_task_queue.get() + + if task_args is self._SHUT_DOWN_THREADS: + # This special value signals that it's time for all the threads to stop. + break + else: + # Decompress the array data, create the array and insert it into the FBX hierarchy. + self._decompress_and_insert_array(*task_args) + finally: + # Either the thread has been told to shut down because it received _STOP_THREADS or an exception has + # occurred. + # Add _STOP_THREADS to the queue so that the other worker threads will also shut down. + self._shared_task_queue.put(self._SHUT_DOWN_THREADS) + + def _schedule_array_decompression(self, elem_props_data, index_to_set, compressed_array_args): + """Some FBX files might not have any compressed arrays, so worker threads are only started as compressed arrays + are found. + + Note that this function must have the same signature as (or otherwise be compatible as a drop-in replacement + for) _decompress_and_insert_array, which is used when multithreading is not available. + + This function is a slight misuse of ThreadPoolExecutor, normally each task to be scheduled would be submitted + through ThreadPoolExecutor.submit, but doing so is noticeably slower for these array decompression tasks, + perhaps because they can be quite quick and there can be a lot of them. We could start new Thread instances + manually without using ThreadPoolExecutor, but ThreadPoolExecutor gives us a higher level API for waiting for + threads to finish and handling exceptions without having to implement an API using Thread ourselves.""" + if self._shutting_down: + # Shouldn't occur through normal usage. + raise RuntimeError("Cannot schedule new tasks after shutdown") + # Schedule the task by adding it to the task queue. + self._shared_task_queue.put((elem_props_data, index_to_set, compressed_array_args)) + + # Check if more worker threads need to be added to account for the rate at which tasks are being scheduled + # compared to the rate at which tasks are being consumed. + current_worker_count = len(self._worker_futures) + if current_worker_count < self._max_workers: + # The max queue size increases as new threads are added, otherwise, by the time the next task is added, it's + # likely that the queue size will still be over the max, causing another new thread to be added immediately. + # Increasing the max queue size whenever a new thread is started gives some time for the new thread to start + # up and begin consuming tasks before it's determined that another thread is needed. + max_queue_size_for_current_workers = MAX_QUEUE_PER_DECOMPRESSION_THREAD * current_worker_count + + if self._shared_task_queue.qsize() > max_queue_size_for_current_workers: + # Add a new worker thread because the queue has grown too large. + self._worker_futures.append(self._executor.submit(self._worker_callable)) + + @contextmanager + def _wrap_executor_cm(self): + """Wrap the executor's context manager to instead return _schedule_array_decompression and such that the threads + automatically start shutting down before the executor starts shutting down.""" + # .__enter__() + # Exiting the context manager of the executor will wait for all threads to finish and prevent new + # threads from being created, as if its shutdown() method had been called. + with self._executor: + try: + yield self._schedule_array_decompression + finally: + # .__exit__() + self._shutting_down = True + # Signal all worker threads to finish up and shut down so that the executor can shut down. + # Because this is run on the main thread and because decompression tasks are only scheduled from + # the main thread, it is guaranteed that no more decompression tasks will be scheduled after the + # worker threads start to shut down. + self._shared_task_queue.put(self._SHUT_DOWN_THREADS) + + # Because `self._executor` was entered with a context manager, it will wait for all the worker threads + # to finish even if an exception is propagated from one of the threads. + for future in self._worker_futures: + # .exception() waits for the future to finish and returns its raised exception or None. + ex = future.exception() + if ex is not None: + # If one of the threads raised an exception, propagate it to the main thread. + # Only the first exception will be propagated if there were multiple. + raise ex + + # FBX 7500 (aka FBX2016) introduces incompatible changes at binary level: # * The NULL block marking end of nested stuff switches from 13 bytes long to 25 bytes long. # * The FBX element metadata (end_offset, prop_count and prop_length) switch from uint32 to uint64. @@ -114,7 +312,7 @@ def init_version(fbx_version): _BLOCK_SENTINEL_DATA = (b'\0' * _BLOCK_SENTINEL_LENGTH) -def read_elem(read, tell, use_namedtuple, tell_file_offset=0): +def read_elem(read, tell, use_namedtuple, decompress_array_func, tell_file_offset=0): # [0] the offset at which this block ends # [1] the number of properties in the scope # [2] the length of the property list @@ -132,7 +330,17 @@ def read_elem(read, tell, use_namedtuple, tell_file_offset=0): for i in range(prop_count): data_type = read(1)[0] - elem_props_data[i] = read_data_dict[data_type](read) + if data_type in read_array_dict: + val, needs_decompression = read_array_dict[data_type](read) + if needs_decompression: + # Array decompression releases the GIL, so can be multithreaded (if possible on the current system) for + # performance. + # After decompressing, the array is inserted into elem_props_data[i]. + decompress_array_func(elem_props_data, i, val) + else: + elem_props_data[i] = val + else: + elem_props_data[i] = read_data_dict[data_type](read) elem_props_type[i] = data_type pos = tell() @@ -175,7 +383,7 @@ def read_elem(read, tell, use_namedtuple, tell_file_offset=0): sub_pos = start_sub_pos while sub_pos < sub_tree_end: - elem_subtree.append(read_elem(read, tell, use_namedtuple, tell_file_offset)) + elem_subtree.append(read_elem(read, tell, use_namedtuple, decompress_array_func, tell_file_offset)) sub_pos = tell() # At the end of each subtree there should be a sentinel (an empty element with all bytes set to zero). @@ -210,7 +418,7 @@ def parse_version(fn): def parse(fn, use_namedtuple=True): root_elems = [] - with open(fn, 'rb') as f: + with open(fn, 'rb') as f, _MultiThreadedArrayDecompressor.new_cm() as decompress_array_func: read = f.read tell = f.tell @@ -221,7 +429,7 @@ def parse(fn, use_namedtuple=True): init_version(fbx_version) while True: - elem = read_elem(read, tell, use_namedtuple) + elem = read_elem(read, tell, use_namedtuple, decompress_array_func) if elem is None: break root_elems.append(elem) -- 2.30.2 From 93b00cdb76fa56b4ad4b334009c92d8172d991c9 Mon Sep 17 00:00:00 2001 From: Thomas Barlow Date: Sat, 2 Sep 2023 12:43:08 +0100 Subject: [PATCH 2/5] Minor speedup to array decompression Since the expected decompressed size is known ahead of time, the output buffer can be created at the full size in advance, reducing the number of memory allocations while decompressing. --- io_scene_fbx/parse_fbx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io_scene_fbx/parse_fbx.py b/io_scene_fbx/parse_fbx.py index 1bd95131c..95ecc2d39 100644 --- a/io_scene_fbx/parse_fbx.py +++ b/io_scene_fbx/parse_fbx.py @@ -207,7 +207,7 @@ class _MultiThreadedArrayDecompressor: # zlib.decompress releases the Global Interpreter Lock, so another thread can run code while waiting for the # decompression to complete. - data = zlib.decompress(compressed_data) + data = zlib.decompress(compressed_data, bufsize=length * array_stride) # Create and insert the array into the parsed FBX hierarchy. elem_props_data[index_to_set] = _create_array(data, length, array_type, array_stride, array_byteswap) -- 2.30.2 From 0fdf09a75b86e04facb3e4024b962a87ca00b19c Mon Sep 17 00:00:00 2001 From: Thomas Barlow Date: Mon, 18 Sep 2023 00:13:11 +0100 Subject: [PATCH 3/5] Comment updates --- io_scene_fbx/parse_fbx.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/io_scene_fbx/parse_fbx.py b/io_scene_fbx/parse_fbx.py index 95ecc2d39..10b94bddb 100644 --- a/io_scene_fbx/parse_fbx.py +++ b/io_scene_fbx/parse_fbx.py @@ -227,23 +227,23 @@ class _MultiThreadedArrayDecompressor: # Decompress the array data, create the array and insert it into the FBX hierarchy. self._decompress_and_insert_array(*task_args) finally: - # Either the thread has been told to shut down because it received _STOP_THREADS or an exception has + # Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS or an exception has # occurred. - # Add _STOP_THREADS to the queue so that the other worker threads will also shut down. + # Add _SHUT_DOWN_THREADS to the queue so that the other worker threads will also shut down. self._shared_task_queue.put(self._SHUT_DOWN_THREADS) def _schedule_array_decompression(self, elem_props_data, index_to_set, compressed_array_args): """Some FBX files might not have any compressed arrays, so worker threads are only started as compressed arrays are found. - Note that this function must have the same signature as (or otherwise be compatible as a drop-in replacement - for) _decompress_and_insert_array, which is used when multithreading is not available. + Note that the signature of this function must be the same as, or otherwise be compatible with, + _decompress_and_insert_array, which is used instead of this function when multithreading is not available. - This function is a slight misuse of ThreadPoolExecutor, normally each task to be scheduled would be submitted + This function is a slight misuse of ThreadPoolExecutor. Normally, each task to be scheduled would be submitted through ThreadPoolExecutor.submit, but doing so is noticeably slower for these array decompression tasks, - perhaps because they can be quite quick and there can be a lot of them. We could start new Thread instances - manually without using ThreadPoolExecutor, but ThreadPoolExecutor gives us a higher level API for waiting for - threads to finish and handling exceptions without having to implement an API using Thread ourselves.""" + perhaps because each task can be quick and there can be a lot of them. An alternative would be starting new + Thread instances manually, but then we would have to implement our own functions that can wait for threads to + finish and handle exceptions.""" if self._shutting_down: # Shouldn't occur through normal usage. raise RuntimeError("Cannot schedule new tasks after shutdown") @@ -254,10 +254,9 @@ class _MultiThreadedArrayDecompressor: # compared to the rate at which tasks are being consumed. current_worker_count = len(self._worker_futures) if current_worker_count < self._max_workers: - # The max queue size increases as new threads are added, otherwise, by the time the next task is added, it's - # likely that the queue size will still be over the max, causing another new thread to be added immediately. - # Increasing the max queue size whenever a new thread is started gives some time for the new thread to start - # up and begin consuming tasks before it's determined that another thread is needed. + # Increasing the max queue size whenever a new thread is started gives some time for new threads to start up + # and begin consuming tasks from the queue before it's determined that another new thread is needed. This + # helps account for lots of compressed arrays being read in quick succession. max_queue_size_for_current_workers = MAX_QUEUE_PER_DECOMPRESSION_THREAD * current_worker_count if self._shared_task_queue.qsize() > max_queue_size_for_current_workers: @@ -269,8 +268,8 @@ class _MultiThreadedArrayDecompressor: """Wrap the executor's context manager to instead return _schedule_array_decompression and such that the threads automatically start shutting down before the executor starts shutting down.""" # .__enter__() - # Exiting the context manager of the executor will wait for all threads to finish and prevent new - # threads from being created, as if its shutdown() method had been called. + # Exiting the context manager of the executor will wait for all threads to finish and prevent new threads from + # being created, as if its shutdown() method had been called. with self._executor: try: yield self._schedule_array_decompression @@ -278,9 +277,9 @@ class _MultiThreadedArrayDecompressor: # .__exit__() self._shutting_down = True # Signal all worker threads to finish up and shut down so that the executor can shut down. - # Because this is run on the main thread and because decompression tasks are only scheduled from - # the main thread, it is guaranteed that no more decompression tasks will be scheduled after the - # worker threads start to shut down. + # Because this is run on the main thread and because decompression tasks are only scheduled from the + # main thread, it is guaranteed that no more decompression tasks will be scheduled after the worker + # threads start to shut down. self._shared_task_queue.put(self._SHUT_DOWN_THREADS) # Because `self._executor` was entered with a context manager, it will wait for all the worker threads -- 2.30.2 From f5e286a94ecfc7c0bffb5a4516169522b3abade9 Mon Sep 17 00:00:00 2001 From: Thomas Barlow Date: Sun, 3 Dec 2023 00:31:25 +0000 Subject: [PATCH 4/5] Update for new multithreading utils --- io_scene_fbx/parse_fbx.py | 198 ++++---------------------------------- 1 file changed, 17 insertions(+), 181 deletions(-) diff --git a/io_scene_fbx/parse_fbx.py b/io_scene_fbx/parse_fbx.py index 9ba6e4cf0..1af5994e2 100644 --- a/io_scene_fbx/parse_fbx.py +++ b/io_scene_fbx/parse_fbx.py @@ -14,9 +14,9 @@ from struct import unpack import array import zlib from io import BytesIO -from contextlib import contextmanager, nullcontext from . import data_types +from .fbx_utils_threading import MultiThreadedTaskConsumer # at the end of each nested block, there is a NUL record to indicate # that the sub-scope exists (i.e. to distinguish between P: and P : {}) @@ -28,15 +28,6 @@ _HEAD_MAGIC = b'Kaydara FBX Binary\x20\x20\x00\x1a\x00' from collections import namedtuple FBXElem = namedtuple("FBXElem", ("id", "props", "props_type", "elems")) del namedtuple -# The maximum number of threads that can be started when decompressing arrays is dynamic and based on the number of -# CPUs, but has a hard max to limit resource costs. This hard max matches ThreadPoolExecutor's default behaviour. -HARD_MAX_ARRAY_DECOMPRESSION_THREADS = 32 -# The maximum size of the task queue, per array decompression thread, before another thread is started. This magic -# has been determined experimentally to usually keep the queue quite small without starting an excessive number of -# threads when the file being parsed is small or when many tasks are added in quick succession. This tends to start more -# threads than would be most optimal, but ensures that the queue is close to empty by the time the main thread finishes -# parsing, so there will be little to no waiting for decompression tasks to finish. -MAX_QUEUE_PER_DECOMPRESSION_THREAD = 5 def read_uint(read): @@ -81,6 +72,20 @@ def _create_array(data, length, array_type, array_stride, array_byteswap): return data_array +def _decompress_and_insert_array(elem_props_data, index_to_set, compressed_array_args): + """Decompress array data and insert the created array into the FBX tree being parsed. + + This is usually called from a separate thread to the main thread.""" + compressed_data, length, array_type, array_stride, array_byteswap = compressed_array_args + + # zlib.decompress releases the Global Interpreter Lock, so another thread can run code while waiting for the + # decompression to complete. + data = zlib.decompress(compressed_data, bufsize=length * array_stride) + + # Create and insert the array into the parsed FBX hierarchy. + elem_props_data[index_to_set] = _create_array(data, length, array_type, array_stride, array_byteswap) + + def unpack_array(read, array_type, array_stride, array_byteswap): """Unpack an array from an FBX file being parsed. @@ -124,176 +129,6 @@ read_data_dict = { } -class _MultiThreadedArrayDecompressor: - """Helper class that encapsulates everything needed to decompress array data on separate threads and then insert - the arrays into the FBX hierarchy, with a single-threaded fallback if multithreading is not available.""" - # A special task value used to signal array decompression threads to shut down. - _SHUT_DOWN_THREADS = object() - - __slots__ = "_shared_task_queue", "_worker_futures", "_executor", "_max_workers", "_shutting_down" - - def __init__(self, thread_pool_executor_cls, max_workers): - from queue import SimpleQueue - # All the threads share a single queue. - self._shared_task_queue = SimpleQueue() - # Reference to each thread is kept through the returned Future objects. This is used as part of determining when - # new threads should be started and is used to be able to receive and handle exceptions from the threads. - self._worker_futures = [] - # ThreadPoolExecutor might not be available on the current system. To ensure this class is only instantiated - # in cases where ThreadPoolExecutor is available, the ThreadPoolExecutor class must be provided as an argument. - self._executor = thread_pool_executor_cls(max_workers=max_workers) - # Technically the max workers of the executor is accessible through its `._max_workers`, but since it's private - # we'll store the max workers ourselves. - self._max_workers = max_workers - # When shutting down the threads, this is set to True as an extra safeguard to prevent new array decompression - # tasks being scheduled. - self._shutting_down = False - - @classmethod - def new_cm(cls): - """Return a context manager that, when entered, returns a function to schedule array decompression tasks on - separate threads. - - If the system can't use multithreading, then the context manager's returned function will instead immediately - perform array decompression on the calling thread. - - When exiting the context manager, it waits for all scheduled decompression tasks to complete.""" - max_threads = cls._get_max_threads() - - # The concurrent.futures module does not work or is not available on WebAssembly platforms wasm32-emscripten - # and wasm32-wasi. - # wasm32-emscripten raises ModuleNotFoundError, not sure about wasm32-wasi. - try: - from concurrent.futures import ThreadPoolExecutor - thread_pool_executor_cls = ThreadPoolExecutor - except ModuleNotFoundError: - thread_pool_executor_cls = None - - # max_threads should always be greater than zero, but it can be useful for debugging and profiling to be able to - # disable array decompression multithreading by setting MAX_ARRAY_DECOMPRESSION_THREADS to zero. - if thread_pool_executor_cls and max_threads > 0: - return cls(thread_pool_executor_cls, max_threads)._wrap_executor_cm() - else: - # Fall back to single-threaded. - return nullcontext(cls._decompress_and_insert_array) - - @staticmethod - def _get_max_threads(): - """Decompressing arrays is entirely CPU work that releases the GIL, so there shouldn't be any benefit in using - more threads than there are CPUs. - - The current (main) thread is not counted when considering the maximum number of threads because it can spend - some of its time waiting for File IO, so even a system with only a single CPU can see a benefit from having a - separate thread for decompressing arrays.""" - import os - - # os.sched_getaffinity(0) gets the set of CPUs available to the current process, but is only available on some - # Unix platforms. - sched_getaffinity = getattr(os, "sched_getaffinity", None) - if sched_getaffinity is not None: - max_threads = len(sched_getaffinity(0)) - else: - # Without sched_getaffinity being available, assume all CPUs are available to the current process. - max_threads = os.cpu_count() or 1 # assume 1 if cpu_count is indeterminable - - # Cap the maximum number of threads to limit resource costs. - return min(HARD_MAX_ARRAY_DECOMPRESSION_THREADS, max_threads) - - @staticmethod - def _decompress_and_insert_array(elem_props_data, index_to_set, compressed_array_args): - """Decompress array data and insert the created array into the FBX tree being parsed. - - This is usually called from a separate thread to the main thread.""" - compressed_data, length, array_type, array_stride, array_byteswap = compressed_array_args - - # zlib.decompress releases the Global Interpreter Lock, so another thread can run code while waiting for the - # decompression to complete. - data = zlib.decompress(compressed_data, bufsize=length * array_stride) - - # Create and insert the array into the parsed FBX hierarchy. - elem_props_data[index_to_set] = _create_array(data, length, array_type, array_stride, array_byteswap) - - def _worker_callable(self): - """Callable that is run by each worker thread. - Signals the other worker threads to stop when stopped intentionally or when an exception occurs.""" - try: - while True: - # Blocks until it can get a task. - task_args = self._shared_task_queue.get() - - if task_args is self._SHUT_DOWN_THREADS: - # This special value signals that it's time for all the threads to stop. - break - else: - # Decompress the array data, create the array and insert it into the FBX hierarchy. - self._decompress_and_insert_array(*task_args) - finally: - # Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS or an exception has - # occurred. - # Add _SHUT_DOWN_THREADS to the queue so that the other worker threads will also shut down. - self._shared_task_queue.put(self._SHUT_DOWN_THREADS) - - def _schedule_array_decompression(self, elem_props_data, index_to_set, compressed_array_args): - """Some FBX files might not have any compressed arrays, so worker threads are only started as compressed arrays - are found. - - Note that the signature of this function must be the same as, or otherwise be compatible with, - _decompress_and_insert_array, which is used instead of this function when multithreading is not available. - - This function is a slight misuse of ThreadPoolExecutor. Normally, each task to be scheduled would be submitted - through ThreadPoolExecutor.submit, but doing so is noticeably slower for these array decompression tasks, - perhaps because each task can be quick and there can be a lot of them. An alternative would be starting new - Thread instances manually, but then we would have to implement our own functions that can wait for threads to - finish and handle exceptions.""" - if self._shutting_down: - # Shouldn't occur through normal usage. - raise RuntimeError("Cannot schedule new tasks after shutdown") - # Schedule the task by adding it to the task queue. - self._shared_task_queue.put((elem_props_data, index_to_set, compressed_array_args)) - - # Check if more worker threads need to be added to account for the rate at which tasks are being scheduled - # compared to the rate at which tasks are being consumed. - current_worker_count = len(self._worker_futures) - if current_worker_count < self._max_workers: - # Increasing the max queue size whenever a new thread is started gives some time for new threads to start up - # and begin consuming tasks from the queue before it's determined that another new thread is needed. This - # helps account for lots of compressed arrays being read in quick succession. - max_queue_size_for_current_workers = MAX_QUEUE_PER_DECOMPRESSION_THREAD * current_worker_count - - if self._shared_task_queue.qsize() > max_queue_size_for_current_workers: - # Add a new worker thread because the queue has grown too large. - self._worker_futures.append(self._executor.submit(self._worker_callable)) - - @contextmanager - def _wrap_executor_cm(self): - """Wrap the executor's context manager to instead return _schedule_array_decompression and such that the threads - automatically start shutting down before the executor starts shutting down.""" - # .__enter__() - # Exiting the context manager of the executor will wait for all threads to finish and prevent new threads from - # being created, as if its shutdown() method had been called. - with self._executor: - try: - yield self._schedule_array_decompression - finally: - # .__exit__() - self._shutting_down = True - # Signal all worker threads to finish up and shut down so that the executor can shut down. - # Because this is run on the main thread and because decompression tasks are only scheduled from the - # main thread, it is guaranteed that no more decompression tasks will be scheduled after the worker - # threads start to shut down. - self._shared_task_queue.put(self._SHUT_DOWN_THREADS) - - # Because `self._executor` was entered with a context manager, it will wait for all the worker threads - # to finish even if an exception is propagated from one of the threads. - for future in self._worker_futures: - # .exception() waits for the future to finish and returns its raised exception or None. - ex = future.exception() - if ex is not None: - # If one of the threads raised an exception, propagate it to the main thread. - # Only the first exception will be propagated if there were multiple. - raise ex - - # FBX 7500 (aka FBX2016) introduces incompatible changes at binary level: # * The NULL block marking end of nested stuff switches from 13 bytes long to 25 bytes long. # * The FBX element metadata (end_offset, prop_count and prop_length) switch from uint32 to uint64. @@ -418,7 +253,8 @@ def parse_version(fn): def parse(fn, use_namedtuple=True): root_elems = [] - with open(fn, 'rb') as f, _MultiThreadedArrayDecompressor.new_cm() as decompress_array_func: + multithread_decompress_array_cm = MultiThreadedTaskConsumer.new_cpu_bound_cm(_decompress_and_insert_array) + with open(fn, 'rb') as f, multithread_decompress_array_cm as decompress_array_func: read = f.read tell = f.tell -- 2.30.2 From 726ddf588ec403c429aa919a2970fa8ce42c5b85 Mon Sep 17 00:00:00 2001 From: Thomas Barlow Date: Mon, 4 Dec 2023 02:54:14 +0000 Subject: [PATCH 5/5] Increase FBX IO version --- io_scene_fbx/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io_scene_fbx/__init__.py b/io_scene_fbx/__init__.py index 9cfebeae6..7087f1f9a 100644 --- a/io_scene_fbx/__init__.py +++ b/io_scene_fbx/__init__.py @@ -5,7 +5,7 @@ bl_info = { "name": "FBX format", "author": "Campbell Barton, Bastien Montagne, Jens Restemeier, @Mysteryem", - "version": (5, 11, 2), + "version": (5, 11, 3), "blender": (4, 1, 0), "location": "File > Import-Export", "description": "FBX IO meshes, UVs, vertex colors, materials, textures, cameras, lamps and actions", -- 2.30.2