FBX IO: Speed up parsing by multithreading array decompression #104739
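In short: compressed FBX arrays are now decompressed on worker threads while the main thread keeps parsing, since zlib.decompress releases the GIL. The local helper class in parse_fbx is removed and the reusable machinery appears to move into fbx_utils_threading.MultiThreadedTaskConsumer; parse_fbx now only supplies the _decompress_and_insert_array callback. Below is a rough, self-contained sketch of the idea only — the names decompression_cm and _decompress_and_insert are illustrative, not the add-on's actual API, and the real implementation grows the worker count on demand instead of starting all workers up front.

import zlib
from contextlib import contextmanager
from queue import SimpleQueue

_SHUT_DOWN = object()  # Sentinel telling worker threads to stop.


def _decompress_and_insert(elem_props_data, index, compressed, uncompressed_size):
    # zlib.decompress releases the GIL, so the main parsing thread keeps running while this works.
    elem_props_data[index] = zlib.decompress(compressed, bufsize=uncompressed_size)


@contextmanager
def decompression_cm(max_workers):
    """Yield a callable that schedules decompression tasks, falling back to inline execution."""
    try:
        from concurrent.futures import ThreadPoolExecutor
    except ModuleNotFoundError:
        # No threading available (e.g. WebAssembly builds): run each task immediately instead.
        yield _decompress_and_insert
        return

    task_queue = SimpleQueue()

    def worker():
        try:
            while True:
                task = task_queue.get()
                if task is _SHUT_DOWN:
                    break
                _decompress_and_insert(*task)
        finally:
            # Pass the sentinel on so the other workers (and shutdown) always complete,
            # even if this worker died from an exception.
            task_queue.put(_SHUT_DOWN)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(worker) for _ in range(max_workers)]
        try:
            # The caller schedules work simply by calling the yielded function.
            yield lambda *task: task_queue.put(task)
        finally:
            task_queue.put(_SHUT_DOWN)
            for future in futures:
                ex = future.exception()  # Waits for the worker; re-raise the first error.
                if ex is not None:
                    raise ex

With this shape, the caller can use the same "with decompression_cm(n) as decompress_array_func:" statement whether or not worker threads are actually available, which matches how parse() below wraps parsing in MultiThreadedTaskConsumer.new_cpu_bound_cm(_decompress_and_insert_array).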
@@ -14,9 +14,9 @@ from struct import unpack
import array
import zlib
from io import BytesIO
from contextlib import contextmanager, nullcontext

from . import data_types
from .fbx_utils_threading import MultiThreadedTaskConsumer

# at the end of each nested block, there is a NUL record to indicate
# that the sub-scope exists (i.e. to distinguish between P: and P : {})
@@ -28,15 +28,6 @@ _HEAD_MAGIC = b'Kaydara FBX Binary\x20\x20\x00\x1a\x00'
from collections import namedtuple
FBXElem = namedtuple("FBXElem", ("id", "props", "props_type", "elems"))
del namedtuple
# The maximum number of threads that can be started when decompressing arrays is dynamic and based on the number of
# CPUs, but has a hard max to limit resource costs. This hard max matches ThreadPoolExecutor's default behaviour.
HARD_MAX_ARRAY_DECOMPRESSION_THREADS = 32
# The maximum size of the task queue, per array decompression thread, before another thread is started. This magic
# has been determined experimentally to usually keep the queue quite small without starting an excessive number of
# threads when the file being parsed is small or when many tasks are added in quick succession. This tends to start more
# threads than would be most optimal, but ensures that the queue is close to empty by the time the main thread finishes
# parsing, so there will be little to no waiting for decompression tasks to finish.
MAX_QUEUE_PER_DECOMPRESSION_THREAD = 5


def read_uint(read):
@@ -81,6 +72,20 @@ def _create_array(data, length, array_type, array_stride, array_byteswap):
    return data_array


def _decompress_and_insert_array(elem_props_data, index_to_set, compressed_array_args):
    """Decompress array data and insert the created array into the FBX tree being parsed.

    This is usually called from a separate thread to the main thread."""
    compressed_data, length, array_type, array_stride, array_byteswap = compressed_array_args

    # zlib.decompress releases the Global Interpreter Lock, so another thread can run code while waiting for the
    # decompression to complete.
    data = zlib.decompress(compressed_data, bufsize=length * array_stride)

    # Create and insert the array into the parsed FBX hierarchy.
    elem_props_data[index_to_set] = _create_array(data, length, array_type, array_stride, array_byteswap)


def unpack_array(read, array_type, array_stride, array_byteswap):
    """Unpack an array from an FBX file being parsed.

@@ -124,176 +129,6 @@ read_data_dict = {
}


class _MultiThreadedArrayDecompressor:
    """Helper class that encapsulates everything needed to decompress array data on separate threads and then insert
    the arrays into the FBX hierarchy, with a single-threaded fallback if multithreading is not available."""
    # A special task value used to signal array decompression threads to shut down.
    _SHUT_DOWN_THREADS = object()

    __slots__ = "_shared_task_queue", "_worker_futures", "_executor", "_max_workers", "_shutting_down"

    def __init__(self, thread_pool_executor_cls, max_workers):
        from queue import SimpleQueue
        # All the threads share a single queue.
        self._shared_task_queue = SimpleQueue()
        # Reference to each thread is kept through the returned Future objects. This is used as part of determining when
        # new threads should be started and is used to be able to receive and handle exceptions from the threads.
        self._worker_futures = []
        # ThreadPoolExecutor might not be available on the current system. To ensure this class is only instantiated
        # in cases where ThreadPoolExecutor is available, the ThreadPoolExecutor class must be provided as an argument.
        self._executor = thread_pool_executor_cls(max_workers=max_workers)
        # Technically the max workers of the executor is accessible through its `._max_workers`, but since it's private
        # we'll store the max workers ourselves.
        self._max_workers = max_workers
        # When shutting down the threads, this is set to True as an extra safeguard to prevent new array decompression
        # tasks being scheduled.
        self._shutting_down = False

    @classmethod
    def new_cm(cls):
        """Return a context manager that, when entered, returns a function to schedule array decompression tasks on
        separate threads.

        If the system can't use multithreading, then the context manager's returned function will instead immediately
        perform array decompression on the calling thread.

        When exiting the context manager, it waits for all scheduled decompression tasks to complete."""
        max_threads = cls._get_max_threads()

        # The concurrent.futures module does not work or is not available on WebAssembly platforms wasm32-emscripten
        # and wasm32-wasi.
        # wasm32-emscripten raises ModuleNotFoundError, not sure about wasm32-wasi.
        try:
            from concurrent.futures import ThreadPoolExecutor
            thread_pool_executor_cls = ThreadPoolExecutor
        except ModuleNotFoundError:
            thread_pool_executor_cls = None

        # max_threads should always be greater than zero, but it can be useful for debugging and profiling to be able to
        # disable array decompression multithreading by setting MAX_ARRAY_DECOMPRESSION_THREADS to zero.
        if thread_pool_executor_cls and max_threads > 0:
            return cls(thread_pool_executor_cls, max_threads)._wrap_executor_cm()
        else:
            # Fall back to single-threaded.
            return nullcontext(cls._decompress_and_insert_array)

    @staticmethod
    def _get_max_threads():
        """Decompressing arrays is entirely CPU work that releases the GIL, so there shouldn't be any benefit in using
        more threads than there are CPUs.

        The current (main) thread is not counted when considering the maximum number of threads because it can spend
        some of its time waiting for File IO, so even a system with only a single CPU can see a benefit from having a
        separate thread for decompressing arrays."""
        import os

        # os.sched_getaffinity(0) gets the set of CPUs available to the current process, but is only available on some
        # Unix platforms.
        sched_getaffinity = getattr(os, "sched_getaffinity", None)
        if sched_getaffinity is not None:
            max_threads = len(sched_getaffinity(0))
        else:
            # Without sched_getaffinity being available, assume all CPUs are available to the current process.
            max_threads = os.cpu_count() or 1  # assume 1 if cpu_count is indeterminable

        # Cap the maximum number of threads to limit resource costs.
        return min(HARD_MAX_ARRAY_DECOMPRESSION_THREADS, max_threads)

    @staticmethod
    def _decompress_and_insert_array(elem_props_data, index_to_set, compressed_array_args):
        """Decompress array data and insert the created array into the FBX tree being parsed.

        This is usually called from a separate thread to the main thread."""
        compressed_data, length, array_type, array_stride, array_byteswap = compressed_array_args

        # zlib.decompress releases the Global Interpreter Lock, so another thread can run code while waiting for the
        # decompression to complete.
        data = zlib.decompress(compressed_data, bufsize=length * array_stride)

        # Create and insert the array into the parsed FBX hierarchy.
        elem_props_data[index_to_set] = _create_array(data, length, array_type, array_stride, array_byteswap)

    def _worker_callable(self):
"""Callable that is run by each worker thread.
|
||||
Signals the other worker threads to stop when stopped intentionally or when an exception occurs."""
|
||||
try:
|
||||
while True:
|
||||
# Blocks until it can get a task.
|
||||
task_args = self._shared_task_queue.get()
|
||||
|
||||
if task_args is self._SHUT_DOWN_THREADS:
|
||||
# This special value signals that it's time for all the threads to stop.
|
||||
break
|
||||
else:
|
||||
# Decompress the array data, create the array and insert it into the FBX hierarchy.
|
||||
self._decompress_and_insert_array(*task_args)
|
||||
finally:
|
||||
# Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS or an exception has
|
||||
# occurred.
|
||||
# Add _SHUT_DOWN_THREADS to the queue so that the other worker threads will also shut down.
|
||||
self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
|
||||
|
||||
def _schedule_array_decompression(self, elem_props_data, index_to_set, compressed_array_args):
|
||||
"""Some FBX files might not have any compressed arrays, so worker threads are only started as compressed arrays
|
||||
are found.
|
||||
|
||||
Note that the signature of this function must be the same as, or otherwise be compatible with,
|
||||
_decompress_and_insert_array, which is used instead of this function when multithreading is not available.
|
||||
|
||||
This function is a slight misuse of ThreadPoolExecutor. Normally, each task to be scheduled would be submitted
|
||||
through ThreadPoolExecutor.submit, but doing so is noticeably slower for these array decompression tasks,
|
||||
perhaps because each task can be quick and there can be a lot of them. An alternative would be starting new
|
||||
Thread instances manually, but then we would have to implement our own functions that can wait for threads to
|
||||
finish and handle exceptions."""
|
||||
if self._shutting_down:
|
||||
# Shouldn't occur through normal usage.
|
||||
raise RuntimeError("Cannot schedule new tasks after shutdown")
|
||||
# Schedule the task by adding it to the task queue.
|
||||
self._shared_task_queue.put((elem_props_data, index_to_set, compressed_array_args))
|
||||
|
||||
# Check if more worker threads need to be added to account for the rate at which tasks are being scheduled
|
||||
# compared to the rate at which tasks are being consumed.
|
||||
current_worker_count = len(self._worker_futures)
|
||||
if current_worker_count < self._max_workers:
|
||||
# Increasing the max queue size whenever a new thread is started gives some time for new threads to start up
|
||||
# and begin consuming tasks from the queue before it's determined that another new thread is needed. This
|
||||
# helps account for lots of compressed arrays being read in quick succession.
|
||||
max_queue_size_for_current_workers = MAX_QUEUE_PER_DECOMPRESSION_THREAD * current_worker_count
|
||||
|
||||
if self._shared_task_queue.qsize() > max_queue_size_for_current_workers:
|
||||
# Add a new worker thread because the queue has grown too large.
|
||||
self._worker_futures.append(self._executor.submit(self._worker_callable))
|
||||
|
||||
@contextmanager
|
||||
    def _wrap_executor_cm(self):
        """Wrap the executor's context manager to instead return _schedule_array_decompression and such that the threads
        automatically start shutting down before the executor starts shutting down."""
        # .__enter__()
        # Exiting the context manager of the executor will wait for all threads to finish and prevent new threads from
        # being created, as if its shutdown() method had been called.
        with self._executor:
            try:
                yield self._schedule_array_decompression
            finally:
                # .__exit__()
                self._shutting_down = True
                # Signal all worker threads to finish up and shut down so that the executor can shut down.
                # Because this is run on the main thread and because decompression tasks are only scheduled from the
                # main thread, it is guaranteed that no more decompression tasks will be scheduled after the worker
                # threads start to shut down.
                self._shared_task_queue.put(self._SHUT_DOWN_THREADS)

                # Because `self._executor` was entered with a context manager, it will wait for all the worker threads
                # to finish even if an exception is propagated from one of the threads.
                for future in self._worker_futures:
                    # .exception() waits for the future to finish and returns its raised exception or None.
                    ex = future.exception()
                    if ex is not None:
                        # If one of the threads raised an exception, propagate it to the main thread.
                        # Only the first exception will be propagated if there were multiple.
                        raise ex


# FBX 7500 (aka FBX2016) introduces incompatible changes at binary level:
# * The NULL block marking end of nested stuff switches from 13 bytes long to 25 bytes long.
# * The FBX element metadata (end_offset, prop_count and prop_length) switch from uint32 to uint64.
@@ -418,7 +253,8 @@ def parse_version(fn):
def parse(fn, use_namedtuple=True):
    root_elems = []

    with open(fn, 'rb') as f, _MultiThreadedArrayDecompressor.new_cm() as decompress_array_func:
    multithread_decompress_array_cm = MultiThreadedTaskConsumer.new_cpu_bound_cm(_decompress_and_insert_array)
    with open(fn, 'rb') as f, multithread_decompress_array_cm as decompress_array_func:
        read = f.read
        tell = f.tell
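For reference, a small illustration of the worker-scaling heuristic described by the removed MAX_QUEUE_PER_DECOMPRESSION_THREAD comments above. The function name should_start_new_worker is hypothetical; the real scheduling code compares the shared queue's qsize() against this threshold before submitting another worker to the executor.

# Hypothetical helper mirroring the check in _schedule_array_decompression above.
MAX_QUEUE_PER_DECOMPRESSION_THREAD = 5


def should_start_new_worker(queued_task_count, current_worker_count, max_workers):
    if current_worker_count >= max_workers:
        return False
    # Each already-running worker "covers" up to 5 queued tasks before another thread is justified.
    return queued_task_count > MAX_QUEUE_PER_DECOMPRESSION_THREAD * current_worker_count


# The first scheduled task starts the first worker (1 > 5 * 0); a second worker only starts
# once more than 5 tasks are waiting, a third once more than 10 are waiting, and so on.
assert should_start_new_worker(1, 0, 32)
assert not should_start_new_worker(5, 1, 32)
assert should_start_new_worker(6, 1, 32)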