FBX IO: Speed up parsing by multithreading array decompression #104739

Merged
Thomas Barlow merged 7 commits from Mysteryem/blender-addons:fbx_parse_multithread_pr into main 2024-01-12 21:32:36 +01:00
2 changed files with 65 additions and 22 deletions

io_scene_fbx/__init__.py

@@ -5,7 +5,7 @@
 bl_info = {
     "name": "FBX format",
     "author": "Campbell Barton, Bastien Montagne, Jens Restemeier, @Mysteryem",
-    "version": (5, 11, 2),
+    "version": (5, 11, 3),
     "blender": (4, 1, 0),
     "location": "File > Import-Export",
     "description": "FBX IO meshes, UVs, vertex colors, materials, textures, cameras, lamps and actions",

io_scene_fbx/parse_fbx.py

@@ -16,6 +16,7 @@ import zlib
 from io import BytesIO

 from . import data_types
+from .fbx_utils_threading import MultiThreadedTaskConsumer

 # at the end of each nested block, there is a NUL record to indicate
 # that the sub-scope exists (i.e. to distinguish between P: and P : {})
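
Note: MultiThreadedTaskConsumer lives in fbx_utils_threading, a module that is not among this PR's two changed files, so its implementation is not shown here. For orientation only, here is a minimal sketch of the contract the parser relies on, assuming (not quoting) an implementation along these lines: new_cpu_bound_cm() wraps a task function in a context manager that yields a callable, runs each call on a worker thread when spare CPUs exist, and blocks on exit until every scheduled task has finished.

from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor
import os


class MultiThreadedTaskConsumer:
    """Hypothetical stand-in for the real class; the name matches, the internals may not."""

    @staticmethod
    @contextmanager
    def new_cpu_bound_cm(task_func):
        cpu_count = os.cpu_count() or 1
        if cpu_count < 2:
            # No spare CPUs: run every task immediately on the calling thread.
            yield task_func
            return
        with ThreadPoolExecutor(max_workers=cpu_count - 1) as executor:
            # Each call schedules task_func(*args) onto a worker thread.
            yield lambda *args: executor.submit(task_func, *args)
        # Leaving the executor context joins all workers, so every scheduled
        # task has completed by the time the caller's "with" block exits.

Only those guarantees matter to the code below; the real module is free to differ in its details.
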
@@ -59,16 +60,10 @@ def read_elem_start64(read):
     return end_offset, prop_count, elem_id


-def unpack_array(read, array_type, array_stride, array_byteswap):
-    length, encoding, comp_len = read_array_params(read)
-
-    data = read(comp_len)
-
-    if encoding == 0:
-        pass
-    elif encoding == 1:
-        data = zlib.decompress(data)
-
+def _create_array(data, length, array_type, array_stride, array_byteswap):
+    """Create an array from FBX data."""
+    # If size of the data does not match the expected size of the array, then something is wrong with the code or the
+    # FBX file.
     assert(length * array_stride == len(data))
     data_array = array.array(array_type, data)
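
The rest of _create_array is unchanged and therefore folded out of the diff. Judging from the array_byteswap parameter, it presumably swaps byte order on big-endian hosts; array.array supports that directly, as in this illustrative snippet:

import array
import sys

values = array.array('i', [1, 2, 3])  # FBX stores arrays little-endian
if sys.byteorder != 'little':
    values.byteswap()  # in-place endianness flip on big-endian machines
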
@@ -77,6 +72,49 @@ def unpack_array(read, array_type, array_stride, array_byteswap):
     return data_array


+def _decompress_and_insert_array(elem_props_data, index_to_set, compressed_array_args):
+    """Decompress array data and insert the created array into the FBX tree being parsed.
+
+    This is usually called from a separate thread to the main thread."""
+    compressed_data, length, array_type, array_stride, array_byteswap = compressed_array_args
+
+    # zlib.decompress releases the Global Interpreter Lock, so another thread can run code while waiting for the
+    # decompression to complete.
+    data = zlib.decompress(compressed_data, bufsize=length * array_stride)
+
+    # Create and insert the array into the parsed FBX hierarchy.
+    elem_props_data[index_to_set] = _create_array(data, length, array_type, array_stride, array_byteswap)
+
+
+def unpack_array(read, array_type, array_stride, array_byteswap):
+    """Unpack an array from an FBX file being parsed.
+
+    If the array data is compressed, the compressed data is combined with the other arguments into a tuple to prepare
+    for decompressing on a separate thread if possible.
+    If the array data is not compressed, the array is created.
+
+    Returns (tuple, True) or (array, False)."""
+    length, encoding, comp_len = read_array_params(read)
+
+    data = read(comp_len)
+
+    if encoding == 1:
+        # Array data requires decompression, which is done in a separate thread if possible.
+        return (data, length, array_type, array_stride, array_byteswap), True
+    else:
+        return _create_array(data, length, array_type, array_stride, array_byteswap), False
+
+
+read_array_dict = {
+    b'b'[0]: lambda read: unpack_array(read, data_types.ARRAY_BOOL, 1, False),  # bool
+    b'c'[0]: lambda read: unpack_array(read, data_types.ARRAY_BYTE, 1, False),  # ubyte
+    b'i'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT32, 4, True),  # int
+    b'l'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT64, 8, True),  # long
+    b'f'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT32, 4, False),  # float
+    b'd'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT64, 8, False),  # double
+}
+
+
 read_data_dict = {
     b'Z'[0]: lambda read: unpack(b'<b', read(1))[0],  # byte
     b'Y'[0]: lambda read: unpack(b'<h', read(2))[0],  # 16 bit int
@@ -88,12 +126,6 @@ read_data_dict = {
     b'L'[0]: lambda read: unpack(b'<q', read(8))[0],  # 64 bit int
     b'R'[0]: lambda read: read(read_uint(read)),  # binary data
     b'S'[0]: lambda read: read(read_uint(read)),  # string data
-    b'f'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT32, 4, False),  # array (float)
-    b'i'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT32, 4, True),  # array (int)
-    b'd'[0]: lambda read: unpack_array(read, data_types.ARRAY_FLOAT64, 8, False),  # array (double)
-    b'l'[0]: lambda read: unpack_array(read, data_types.ARRAY_INT64, 8, True),  # array (long)
-    b'b'[0]: lambda read: unpack_array(read, data_types.ARRAY_BOOL, 1, False),  # array (bool)
-    b'c'[0]: lambda read: unpack_array(read, data_types.ARRAY_BYTE, 1, False),  # array (ubyte)
 }
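
The whole optimization hinges on CPython's zlib.decompress releasing the GIL while it inflates, which lets several arrays decompress in parallel on real threads. A self-contained illustration (the blob count and sizes are arbitrary test values, not anything from the FBX format):

import zlib
from concurrent.futures import ThreadPoolExecutor

# Eight compressed blobs standing in for the encoded arrays of a large mesh.
blobs = [zlib.compress(bytes(1_000_000)) for _ in range(8)]

# zlib.decompress runs outside the GIL, so on a multi-core machine this can
# finish several times faster than a plain sequential loop over the blobs.
with ThreadPoolExecutor() as pool:
    results = list(pool.map(zlib.decompress, blobs))

assert all(len(data) == 1_000_000 for data in results)
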
@@ -115,7 +147,7 @@ def init_version(fbx_version):
     _BLOCK_SENTINEL_DATA = (b'\0' * _BLOCK_SENTINEL_LENGTH)


-def read_elem(read, tell, use_namedtuple, tell_file_offset=0):
+def read_elem(read, tell, use_namedtuple, decompress_array_func, tell_file_offset=0):
     # [0] the offset at which this block ends
     # [1] the number of properties in the scope
     # [2] the length of the property list
@@ -133,7 +165,17 @@ def read_elem(read, tell, use_namedtuple, tell_file_offset=0):
     for i in range(prop_count):
         data_type = read(1)[0]
-        elem_props_data[i] = read_data_dict[data_type](read)
+        if data_type in read_array_dict:
+            val, needs_decompression = read_array_dict[data_type](read)
+            if needs_decompression:
+                # Array decompression releases the GIL, so can be multithreaded (if possible on the current system) for
+                # performance.
+                # After decompressing, the array is inserted into elem_props_data[i].
+                decompress_array_func(elem_props_data, i, val)
+            else:
+                elem_props_data[i] = val
+        else:
+            elem_props_data[i] = read_data_dict[data_type](read)
         elem_props_type[i] = data_type

     pos = tell()
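
Note that the worker fills elem_props_data[i] in place instead of returning the array, so the parser never has to collect results: every slot is guaranteed to be populated once the consumer context exits. A minimal sketch of this deferred-fill pattern with hypothetical names (_fill_slot, slots, payloads are not from the add-on):

from concurrent.futures import ThreadPoolExecutor
import zlib


def _fill_slot(slots, index, compressed):
    # Each index is written by exactly one task and Python list item
    # assignment needs no extra locking, so the writes cannot collide.
    slots[index] = zlib.decompress(compressed)


payloads = [zlib.compress(str(i).encode() * 10) for i in range(4)]
slots = [None] * len(payloads)

with ThreadPoolExecutor() as pool:
    for i, payload in enumerate(payloads):
        pool.submit(_fill_slot, slots, i, payload)
# The executor's __exit__ has joined the workers, so every slot is filled.
print(slots)
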
@@ -176,7 +218,7 @@ def read_elem(read, tell, use_namedtuple, tell_file_offset=0):
         sub_pos = start_sub_pos
         while sub_pos < sub_tree_end:
-            elem_subtree.append(read_elem(read, tell, use_namedtuple, tell_file_offset))
+            elem_subtree.append(read_elem(read, tell, use_namedtuple, decompress_array_func, tell_file_offset))
             sub_pos = tell()

     # At the end of each subtree there should be a sentinel (an empty element with all bytes set to zero).
@@ -211,7 +253,8 @@ def parse_version(fn):
 def parse(fn, use_namedtuple=True):
     root_elems = []

-    with open(fn, 'rb') as f:
+    multithread_decompress_array_cm = MultiThreadedTaskConsumer.new_cpu_bound_cm(_decompress_and_insert_array)
+    with open(fn, 'rb') as f, multithread_decompress_array_cm as decompress_array_func:
         read = f.read
         tell = f.tell
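
Listing both managers in one with statement also pins down the shutdown order: context managers exit in reverse, so the task consumer joins its worker threads before the file object is closed. A tiny demonstration of that ordering with a hypothetical traced() helper:

from contextlib import contextmanager


@contextmanager
def traced(name):
    print("enter", name)
    try:
        yield name
    finally:
        print("exit", name)


with traced("file") as f, traced("consumer") as c:
    print("parsing")
# Prints: enter file, enter consumer, parsing, exit consumer, exit file
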
@@ -222,7 +265,7 @@ def parse(fn, use_namedtuple=True):
         init_version(fbx_version)

         while True:
-            elem = read_elem(read, tell, use_namedtuple)
+            elem = read_elem(read, tell, use_namedtuple, decompress_array_func)
             if elem is None:
                 break
             root_elems.append(elem)