FBX IO: Speed up parsing by multithreading array decompression #104739

Merged
Thomas Barlow merged 7 commits from Mysteryem/blender-addons:fbx_parse_multithread_pr into main 2024-01-12 21:32:36 +01:00
Showing only changes of commit 00c58e0d91


@@ -14,6 +14,7 @@ from struct import unpack
import array
import zlib
from io import BytesIO
from contextlib import contextmanager, nullcontext
from . import data_types
@@ -27,6 +28,15 @@ _HEAD_MAGIC = b'Kaydara FBX Binary\x20\x20\x00\x1a\x00'
from collections import namedtuple
FBXElem = namedtuple("FBXElem", ("id", "props", "props_type", "elems"))
del namedtuple
# The maximum number of threads that can be started when decompressing arrays is dynamic and based on the number of
# CPUs, but has a hard max to limit resource costs. This hard max matches ThreadPoolExecutor's default behaviour.
HARD_MAX_ARRAY_DECOMPRESSION_THREADS = 32
# The maximum size of the task queue, per array decompression thread, before another thread is started. This magic
# number has been determined experimentally to usually keep the queue quite small, without starting an excessive
# number of threads when the file being parsed is small or when many tasks are added in quick succession. This tends
# to start more threads than would be strictly optimal, but ensures that the queue is close to empty by the time the
# main thread finishes parsing, so there is little to no waiting for decompression tasks to finish.
MAX_QUEUE_PER_DECOMPRESSION_THREAD = 5
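# For example, with two worker threads already running, a third is only started once more than 2 * 5 = 10
# decompression tasks are waiting in the shared queue.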
def read_uint(read):
@@ -59,16 +69,10 @@ def read_elem_start64(read):
return end_offset, prop_count, elem_id
def unpack_array(read, array_type, array_stride, array_byteswap):
length, encoding, comp_len = read_array_params(read)
data = read(comp_len)
if encoding == 0:
pass
elif encoding == 1:
data = zlib.decompress(data)
def _create_array(data, length, array_type, array_stride, array_byteswap):
"""Create an array from FBX data."""
# If the size of the data does not match the expected size of the array, then something is wrong with the code or
# the FBX file.
assert(length * array_stride == len(data))
data_array = array.array(array_type, data)
@@ -77,6 +81,35 @@ def unpack_array(read, array_type, array_stride, array_byteswap):
return data_array
def unpack_array(read, array_type, array_stride, array_byteswap):
"""Unpack an array from an FBX file being parsed.
If the array data is compressed, the compressed data is combined with the other arguments into a tuple to prepare
for decompressing on a separate thread if possible.
If the array data is not compressed, the array is created.
Returns (tuple, True) or (array, False)."""
length, encoding, comp_len = read_array_params(read)
data = read(comp_len)
if encoding == 1:
# Array data requires decompression, which is done in a separate thread if possible.
return (data, length, array_type, array_stride, array_byteswap), True
else:
return _create_array(data, length, array_type, array_stride, array_byteswap), False
read_array_dict = {
b'b'[0]: lambda read: unpack_array(read, data_types.ARRAY_BOOL, 1, False), # bool
b'c'[0]: lambda read: unpack_array(read, data_types.ARRAY_BYTE, 1, False), # ubyte
b'i'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT32, 4, True), # int
b'l'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT64, 8, True), # long
b'f'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT32, 4, False), # float
b'd'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT64, 8, False), # double
}
read_data_dict = {
b'Z'[0]: lambda read: unpack(b'<b', read(1))[0], # byte
b'Y'[0]: lambda read: unpack(b'<h', read(2))[0], # 16 bit int
@@ -87,15 +120,180 @@ read_data_dict = {
b'L'[0]: lambda read: unpack(b'<q', read(8))[0], # 64 bit int
b'R'[0]: lambda read: read(read_uint(read)), # binary data
b'S'[0]: lambda read: read(read_uint(read)), # string data
b'f'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT32, 4, False), # array (float)
b'i'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT32, 4, True), # array (int)
b'd'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT64, 8, False), # array (double)
b'l'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT64, 8, True), # array (long)
b'b'[0]: lambda read: unpack_array(read, data_types.ARRAY_BOOL, 1, False), # array (bool)
b'c'[0]: lambda read: unpack_array(read, data_types.ARRAY_BYTE, 1, False), # array (ubyte)
}
class _MultiThreadedArrayDecompressor:
"""Helper class that encapsulates everything needed to decompress array data on separate threads and then insert
the arrays into the FBX hierarchy, with a single-threaded fallback if multithreading is not available."""
# A special task value used to signal array decompression threads to shut down.
_SHUT_DOWN_THREADS = object()
__slots__ = "_shared_task_queue", "_worker_futures", "_executor", "_max_workers", "_shutting_down"
def __init__(self, thread_pool_executor_cls, max_workers):
from queue import SimpleQueue
# All the threads share a single queue.
self._shared_task_queue = SimpleQueue()
# A reference to each worker thread is kept through the returned Future objects. These are used when determining
# whether new threads should be started, and to receive and handle exceptions raised in the threads.
self._worker_futures = []
# ThreadPoolExecutor might not be available on the current system. To ensure this class is only instantiated
# in cases where ThreadPoolExecutor is available, the ThreadPoolExecutor class must be provided as an argument.
self._executor = thread_pool_executor_cls(max_workers=max_workers)
# Technically the max workers of the executor is accessible through its `._max_workers`, but since it's private
# we'll store the max workers ourselves.
self._max_workers = max_workers
# When shutting down the threads, this is set to True as an extra safeguard to prevent new array decompression
# tasks being scheduled.
self._shutting_down = False
@classmethod
def new_cm(cls):
"""Return a context manager that, when entered, returns a function to schedule array decompression tasks on
separate threads.
If the system can't use multithreading, then the context manager's returned function will instead immediately
perform array decompression on the calling thread.
When exiting the context manager, it waits for all scheduled decompression tasks to complete."""
max_threads = cls._get_max_threads()
# The concurrent.futures module does not work or is not available on WebAssembly platforms wasm32-emscripten
# and wasm32-wasi.
# wasm32-emscripten raises ModuleNotFoundError, not sure about wasm32-wasi.
try:
from concurrent.futures import ThreadPoolExecutor
thread_pool_executor_cls = ThreadPoolExecutor
except ModuleNotFoundError:
thread_pool_executor_cls = None
# max_threads should always be greater than zero, but it can be useful for debugging and profiling to be able to
# disable array decompression multithreading by setting HARD_MAX_ARRAY_DECOMPRESSION_THREADS to zero.
if thread_pool_executor_cls and max_threads > 0:
return cls(thread_pool_executor_cls, max_threads)._wrap_executor_cm()
else:
# Fall back to single-threaded.
return nullcontext(cls._decompress_and_insert_array)
@staticmethod
def _get_max_threads():
"""Decompressing arrays is entirely CPU work that releases the GIL, so there shouldn't be any benefit in using
more threads than there are CPUs.
The current (main) thread is not counted when considering the maximum number of threads because it can spend
some of its time waiting for File IO, so even a system with only a single CPU can see a benefit from having a
separate thread for decompressing arrays."""
import os
# os.sched_getaffinity(0) gets the set of CPUs available to the current process, but is only available on some
# Unix platforms.
sched_getaffinity = getattr(os, "sched_getaffinity", None)
if sched_getaffinity is not None:
max_threads = len(sched_getaffinity(0))
else:
# Without sched_getaffinity being available, assume all CPUs are available to the current process.
max_threads = os.cpu_count() or 1 # assume 1 if cpu_count is indeterminable
# Cap the maximum number of threads to limit resource costs.
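# For example, on a machine where 8 CPUs are available to this process, this returns min(32, 8) == 8.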
return min(HARD_MAX_ARRAY_DECOMPRESSION_THREADS, max_threads)
@staticmethod
def _decompress_and_insert_array(elem_props_data, index_to_set, compressed_array_args):
"""Decompress array data and insert the created array into the FBX tree being parsed.
This is usually called from a separate thread to the main thread."""
compressed_data, length, array_type, array_stride, array_byteswap = compressed_array_args
# zlib.decompress releases the Global Interpreter Lock, so another thread can run code while waiting for the
# decompression to complete.
data = zlib.decompress(compressed_data)
# Create and insert the array into the parsed FBX hierarchy.
elem_props_data[index_to_set] = _create_array(data, length, array_type, array_stride, array_byteswap)
def _worker_callable(self):
"""Callable that is run by each worker thread.
Signals the other worker threads to stop when stopped intentionally or when an exception occurs."""
try:
while True:
# Blocks until it can get a task.
task_args = self._shared_task_queue.get()
if task_args is self._SHUT_DOWN_THREADS:
# This special value signals that it's time for all the threads to stop.
break
else:
# Decompress the array data, create the array and insert it into the FBX hierarchy.
self._decompress_and_insert_array(*task_args)
finally:
# Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS, or an exception has
# occurred.
# Add _SHUT_DOWN_THREADS to the queue so that the other worker threads will also shut down.
self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
def _schedule_array_decompression(self, elem_props_data, index_to_set, compressed_array_args):
"""Some FBX files might not have any compressed arrays, so worker threads are only started as compressed arrays
are found.
Note that this function must have the same signature as (or otherwise be compatible as a drop-in replacement
for) _decompress_and_insert_array, which is used when multithreading is not available.
This function is a slight misuse of ThreadPoolExecutor: normally each task to be scheduled would be submitted
through ThreadPoolExecutor.submit, but doing so is noticeably slower for these array decompression tasks,
perhaps because they can be quite quick and there can be a lot of them. We could start new Thread instances
manually without using ThreadPoolExecutor, but ThreadPoolExecutor gives us a higher level API for waiting for
threads to finish and handling exceptions without having to implement an API using Thread ourselves."""
if self._shutting_down:
# Shouldn't occur through normal usage.
raise RuntimeError("Cannot schedule new tasks after shutdown")
# Schedule the task by adding it to the task queue.
self._shared_task_queue.put((elem_props_data, index_to_set, compressed_array_args))
# Check if more worker threads need to be added to account for the rate at which tasks are being scheduled
# compared to the rate at which tasks are being consumed.
current_worker_count = len(self._worker_futures)
if current_worker_count < self._max_workers:
# The max queue size increases as new threads are added; otherwise, by the time the next task is added, the queue
# size would likely still be over the max, causing yet another new thread to be added immediately.
# Increasing the max queue size whenever a new thread is started gives some time for the new thread to start
# up and begin consuming tasks before it's determined that another thread is needed.
max_queue_size_for_current_workers = MAX_QUEUE_PER_DECOMPRESSION_THREAD * current_worker_count
if self._shared_task_queue.qsize() > max_queue_size_for_current_workers:
# Add a new worker thread because the queue has grown too large.
self._worker_futures.append(self._executor.submit(self._worker_callable))
@contextmanager
def _wrap_executor_cm(self):
"""Wrap the executor's context manager to instead return _schedule_array_decompression and such that the threads
automatically start shutting down before the executor starts shutting down."""
# .__enter__()
# Exiting the context manager of the executor will wait for all threads to finish and prevent new
# threads from being created, as if its shutdown() method had been called.
with self._executor:
try:
yield self._schedule_array_decompression
finally:
# .__exit__()
self._shutting_down = True
# Signal all worker threads to finish up and shut down so that the executor can shut down.
# Because this is run on the main thread and because decompression tasks are only scheduled from
# the main thread, it is guaranteed that no more decompression tasks will be scheduled after the
# worker threads start to shut down.
self._shared_task_queue.put(self._SHUT_DOWN_THREADS)
# Because `self._executor` was entered with a context manager, it will wait for all the worker threads
# to finish even if an exception is propagated from one of the threads.
for future in self._worker_futures:
# .exception() waits for the future to finish and returns its raised exception or None.
ex = future.exception()
if ex is not None:
# If one of the threads raised an exception, propagate it to the main thread.
# Only the first exception will be propagated if there were multiple.
raise ex
# FBX 7500 (aka FBX2016) introduces incompatible changes at binary level:
# * The NULL block marking end of nested stuff switches from 13 bytes long to 25 bytes long.
# * The FBX element metadata (end_offset, prop_count and prop_length) switch from uint32 to uint64.
@@ -114,7 +312,7 @@ def init_version(fbx_version):
_BLOCK_SENTINEL_DATA = (b'\0' * _BLOCK_SENTINEL_LENGTH)
def read_elem(read, tell, use_namedtuple, tell_file_offset=0):
def read_elem(read, tell, use_namedtuple, decompress_array_func, tell_file_offset=0):
# [0] the offset at which this block ends
# [1] the number of properties in the scope
# [2] the length of the property list
@@ -132,7 +330,17 @@ def read_elem(read, tell, use_namedtuple, tell_file_offset=0):
for i in range(prop_count):
data_type = read(1)[0]
elem_props_data[i] = read_data_dict[data_type](read)
if data_type in read_array_dict:
val, needs_decompression = read_array_dict[data_type](read)
if needs_decompression:
# Array decompression releases the GIL, so it can be multithreaded (if possible on the current system) for
# performance.
# After decompressing, the array is inserted into elem_props_data[i].
decompress_array_func(elem_props_data, i, val)
else:
elem_props_data[i] = val
else:
elem_props_data[i] = read_data_dict[data_type](read)
elem_props_type[i] = data_type
pos = tell()
@@ -175,7 +383,7 @@ def read_elem(read, tell, use_namedtuple, tell_file_offset=0):
sub_pos = start_sub_pos
while sub_pos < sub_tree_end:
elem_subtree.append(read_elem(read, tell, use_namedtuple, tell_file_offset))
elem_subtree.append(read_elem(read, tell, use_namedtuple, decompress_array_func, tell_file_offset))
sub_pos = tell()
# At the end of each subtree there should be a sentinel (an empty element with all bytes set to zero).
@@ -210,7 +418,7 @@ def parse_version(fn):
def parse(fn, use_namedtuple=True):
root_elems = []
with open(fn, 'rb') as f:
with open(fn, 'rb') as f, _MultiThreadedArrayDecompressor.new_cm() as decompress_array_func:
read = f.read
tell = f.tell
@@ -221,7 +429,7 @@
init_version(fbx_version)
while True:
elem = read_elem(read, tell, use_namedtuple)
elem = read_elem(read, tell, use_namedtuple, decompress_array_func)
if elem is None:
break
root_elems.append(elem)
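
For local testing, a minimal sketch of exercising the new code path from Blender's Python console (the import path and file path are illustrative, and this assumes parse() still returns the root element and FBX version as before). parse() enters _MultiThreadedArrayDecompressor.new_cm() internally, so the multithreaded path needs no extra setup:

from io_scene_fbx import parse_fbx

# Hypothetical file path; any binary FBX file containing compressed arrays will exercise the worker threads.
elem_root, fbx_version = parse_fbx.parse("/path/to/model.fbx", use_namedtuple=True)
print(fbx_version)
# Each child is an FBXElem namedtuple with fields (id, props, props_type, elems).
for child in elem_root.elems:
    print(child.id)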