Reformat with Black

No functional changes.
This commit is contained in:
2021-02-16 11:21:06 +01:00
parent 883f125722
commit 8b49c5505e
27 changed files with 3094 additions and 2288 deletions

View File

@@ -52,13 +52,13 @@ def open_blend(filename, access="rb"):
bfile.is_compressed = False
bfile.filepath_orig = filename
return bfile
elif magic[:2] == b'\x1f\x8b':
elif magic[:2] == b"\x1f\x8b":
log.debug("gzip blendfile detected")
handle.close()
log.debug("decompressing started")
fs = gzip.open(filename, "rb")
data = fs.read(FILE_BUFFER_SIZE)
magic = data[:len(magic_test)]
magic = data[: len(magic_test)]
if magic == magic_test:
handle = tempfile.TemporaryFile()
while data:
@@ -90,6 +90,7 @@ class BlendFile:
"""
Blend file.
"""
__slots__ = (
# file (result of open())
"handle",
@@ -114,7 +115,7 @@ class BlendFile:
"is_modified",
# bool (is file gzipped)
"is_compressed",
)
)
def __init__(self, handle):
log.debug("initializing reading blend-file")
@@ -125,11 +126,12 @@ class BlendFile:
self.code_index = {}
block = BlendFileBlock(handle, self)
while block.code != b'ENDB':
if block.code == b'DNA1':
(self.structs,
self.sdna_index_from_id,
) = BlendFile.decode_structs(self.header, block, handle)
while block.code != b"ENDB":
if block.code == b"DNA1":
(
self.structs,
self.sdna_index_from_id,
) = BlendFile.decode_structs(self.header, block, handle)
else:
handle.seek(block.size, os.SEEK_CUR)
@@ -141,7 +143,9 @@ class BlendFile:
self.blocks.append(block)
# cache (could lazy init, incase we never use?)
self.block_from_offset = {block.addr_old: block for block in self.blocks if block.code != b'ENDB'}
self.block_from_offset = {
block.addr_old: block for block in self.blocks if block.code != b"ENDB"
}
def __enter__(self):
return self
@@ -150,7 +154,7 @@ class BlendFile:
self.close()
def find_blocks_from_code(self, code):
assert(type(code) == bytes)
assert type(code) == bytes
if code not in self.code_index:
return []
return self.code_index[code]
@@ -158,7 +162,7 @@ class BlendFile:
def find_block_from_offset(self, offset):
# same as looking looping over all blocks,
# then checking ``block.addr_old == offset``
assert(type(offset) is int)
assert type(offset) is int
return self.block_from_offset.get(offset)
def close(self):
@@ -185,12 +189,15 @@ class BlendFile:
def ensure_subtype_smaller(self, sdna_index_curr, sdna_index_next):
# never refine to a smaller type
if (self.structs[sdna_index_curr].size >
self.structs[sdna_index_next].size):
if self.structs[sdna_index_curr].size > self.structs[sdna_index_next].size:
raise RuntimeError("cant refine to smaller type (%s -> %s)" %
(self.structs[sdna_index_curr].dna_type_id.decode('ascii'),
self.structs[sdna_index_next].dna_type_id.decode('ascii')))
raise RuntimeError(
"cant refine to smaller type (%s -> %s)"
% (
self.structs[sdna_index_curr].dna_type_id.decode("ascii"),
self.structs[sdna_index_next].dna_type_id.decode("ascii"),
)
)
@staticmethod
def decode_structs(header, block, handle):
@@ -199,7 +206,7 @@ class BlendFile:
"""
log.debug("building DNA catalog")
shortstruct = DNA_IO.USHORT[header.endian_index]
shortstruct2 = struct.Struct(header.endian_str + b'HH')
shortstruct2 = struct.Struct(header.endian_str + b"HH")
intstruct = DNA_IO.UINT[header.endian_index]
data = handle.read(block.size)
@@ -281,6 +288,7 @@ class BlendFileBlock:
"""
Instance of a struct.
"""
__slots__ = (
# BlendFile
"file",
@@ -291,21 +299,25 @@ class BlendFileBlock:
"count",
"file_offset",
"user_data",
)
)
def __str__(self):
return ("<%s.%s (%s), size=%d at %s>" %
# fields=[%s]
(self.__class__.__name__,
self.dna_type.dna_type_id.decode('ascii'),
self.code.decode(),
self.size,
# b", ".join(f.dna_name.name_only for f in self.dna_type.fields).decode('ascii'),
hex(self.addr_old),
))
return (
"<%s.%s (%s), size=%d at %s>"
%
# fields=[%s]
(
self.__class__.__name__,
self.dna_type.dna_type_id.decode("ascii"),
self.code.decode(),
self.size,
# b", ".join(f.dna_name.name_only for f in self.dna_type.fields).decode('ascii'),
hex(self.addr_old),
)
)
def __init__(self, handle, bfile):
OLDBLOCK = struct.Struct(b'4sI')
OLDBLOCK = struct.Struct(b"4sI")
self.file = bfile
self.user_data = None
@@ -318,8 +330,8 @@ class BlendFileBlock:
if len(data) > 15:
blockheader = bfile.block_header_struct.unpack(data)
self.code = blockheader[0].partition(b'\0')[0]
if self.code != b'ENDB':
self.code = blockheader[0].partition(b"\0")[0]
if self.code != b"ENDB":
self.size = blockheader[1]
self.addr_old = blockheader[2]
self.sdna_index = blockheader[3]
@@ -333,7 +345,7 @@ class BlendFileBlock:
self.file_offset = 0
else:
blockheader = OLDBLOCK.unpack(data)
self.code = blockheader[0].partition(b'\0')[0]
self.code = blockheader[0].partition(b"\0")[0]
self.code = DNA_IO.read_data0(blockheader[0])
self.size = 0
self.addr_old = 0
@@ -346,28 +358,30 @@ class BlendFileBlock:
return self.file.structs[self.sdna_index]
def refine_type_from_index(self, sdna_index_next):
assert(type(sdna_index_next) is int)
assert type(sdna_index_next) is int
sdna_index_curr = self.sdna_index
self.file.ensure_subtype_smaller(sdna_index_curr, sdna_index_next)
self.sdna_index = sdna_index_next
def refine_type(self, dna_type_id):
assert(type(dna_type_id) is bytes)
assert type(dna_type_id) is bytes
self.refine_type_from_index(self.file.sdna_index_from_id[dna_type_id])
def get_file_offset(self, path,
default=...,
sdna_index_refine=None,
base_index=0,
):
def get_file_offset(
self,
path,
default=...,
sdna_index_refine=None,
base_index=0,
):
"""
Return (offset, length)
"""
assert(type(path) is bytes)
assert type(path) is bytes
ofs = self.file_offset
if base_index != 0:
assert(base_index < self.count)
assert base_index < self.count
ofs += (self.size // self.count) * base_index
self.file.handle.seek(ofs, os.SEEK_SET)
@@ -377,21 +391,23 @@ class BlendFileBlock:
self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
dna_struct = self.file.structs[sdna_index_refine]
field = dna_struct.field_from_path(
self.file.header, self.file.handle, path)
field = dna_struct.field_from_path(self.file.header, self.file.handle, path)
return (self.file.handle.tell(), field.dna_name.array_size)
def get(self, path,
default=...,
sdna_index_refine=None,
use_nil=True, use_str=True,
base_index=0,
):
def get(
self,
path,
default=...,
sdna_index_refine=None,
use_nil=True,
use_str=True,
base_index=0,
):
ofs = self.file_offset
if base_index != 0:
assert(base_index < self.count)
assert base_index < self.count
ofs += (self.size // self.count) * base_index
self.file.handle.seek(ofs, os.SEEK_SET)
@@ -402,36 +418,55 @@ class BlendFileBlock:
dna_struct = self.file.structs[sdna_index_refine]
return dna_struct.field_get(
self.file.header, self.file.handle, path,
default=default,
use_nil=use_nil, use_str=use_str,
)
self.file.header,
self.file.handle,
path,
default=default,
use_nil=use_nil,
use_str=use_str,
)
def get_recursive_iter(self, path, path_root=b"",
default=...,
sdna_index_refine=None,
use_nil=True, use_str=True,
base_index=0,
):
def get_recursive_iter(
self,
path,
path_root=b"",
default=...,
sdna_index_refine=None,
use_nil=True,
use_str=True,
base_index=0,
):
if path_root:
path_full = (
(path_root if type(path_root) is tuple else (path_root, )) +
(path if type(path) is tuple else (path, )))
path_full = (path_root if type(path_root) is tuple else (path_root,)) + (
path if type(path) is tuple else (path,)
)
else:
path_full = path
try:
yield (path_full, self.get(path_full, default, sdna_index_refine, use_nil, use_str, base_index))
yield (
path_full,
self.get(
path_full, default, sdna_index_refine, use_nil, use_str, base_index
),
)
except NotImplementedError as ex:
msg, dna_name, dna_type = ex.args
struct_index = self.file.sdna_index_from_id.get(dna_type.dna_type_id, None)
if struct_index is None:
yield (path_full, "<%s>" % dna_type.dna_type_id.decode('ascii'))
yield (path_full, "<%s>" % dna_type.dna_type_id.decode("ascii"))
else:
struct = self.file.structs[struct_index]
for f in struct.fields:
yield from self.get_recursive_iter(
f.dna_name.name_only, path_full, default, None, use_nil, use_str, 0)
f.dna_name.name_only,
path_full,
default,
None,
use_nil,
use_str,
0,
)
def items_recursive_iter(self):
for k in self.keys():
@@ -445,9 +480,13 @@ class BlendFileBlock:
# TODO This implementation is most likely far from optimal... and CRC32 is not renown as the best hashing
# algo either. But for now does the job!
import zlib
def _is_pointer(self, k):
return self.file.structs[self.sdna_index].field_from_path(
self.file.header, self.file.handle, k).dna_name.is_pointer
return (
self.file.structs[self.sdna_index]
.field_from_path(self.file.header, self.file.handle, k)
.dna_name.is_pointer
)
hsh = 1
for k, v in self.items_recursive_iter():
@@ -455,9 +494,12 @@ class BlendFileBlock:
hsh = zlib.adler32(str(v).encode(), hsh)
return hsh
def set(self, path, value,
sdna_index_refine=None,
):
def set(
self,
path,
value,
sdna_index_refine=None,
):
if sdna_index_refine is None:
sdna_index_refine = self.sdna_index
@@ -467,29 +509,34 @@ class BlendFileBlock:
dna_struct = self.file.structs[sdna_index_refine]
self.file.handle.seek(self.file_offset, os.SEEK_SET)
self.file.is_modified = True
return dna_struct.field_set(
self.file.header, self.file.handle, path, value)
return dna_struct.field_set(self.file.header, self.file.handle, path, value)
# ---------------
# Utility get/set
#
# avoid inline pointer casting
def get_pointer(
self, path,
default=...,
sdna_index_refine=None,
base_index=0,
):
self,
path,
default=...,
sdna_index_refine=None,
base_index=0,
):
if sdna_index_refine is None:
sdna_index_refine = self.sdna_index
result = self.get(path, default, sdna_index_refine=sdna_index_refine, base_index=base_index)
result = self.get(
path, default, sdna_index_refine=sdna_index_refine, base_index=base_index
)
# default
if type(result) is not int:
return result
assert(self.file.structs[sdna_index_refine].field_from_path(
self.file.header, self.file.handle, path).dna_name.is_pointer)
assert (
self.file.structs[sdna_index_refine]
.field_from_path(self.file.header, self.file.handle, path)
.dna_name.is_pointer
)
if result != 0:
# possible (but unlikely)
# that this fails and returns None
@@ -517,7 +564,7 @@ class BlendFileBlock:
yield self[k]
except NotImplementedError as ex:
msg, dna_name, dna_type = ex.args
yield "<%s>" % dna_type.dna_type_id.decode('ascii')
yield "<%s>" % dna_type.dna_type_id.decode("ascii")
def items(self):
for k in self.keys():
@@ -525,7 +572,7 @@ class BlendFileBlock:
yield (k, self[k])
except NotImplementedError as ex:
msg, dna_name, dna_type = ex.args
yield (k, "<%s>" % dna_type.dna_type_id.decode('ascii'))
yield (k, "<%s>" % dna_type.dna_type_id.decode("ascii"))
# -----------------------------------------------------------------------------
@@ -542,6 +589,7 @@ class BlendFileHeader:
BlendFileHeader allocates the first 12 bytes of a blend file
it contains information about the hardware architecture
"""
__slots__ = (
# str
"magic",
@@ -555,56 +603,61 @@ class BlendFileHeader:
"endian_str",
# int, used to index common types
"endian_index",
)
)
def __init__(self, handle):
FILEHEADER = struct.Struct(b'7s1s1s3s')
FILEHEADER = struct.Struct(b"7s1s1s3s")
log.debug("reading blend-file-header")
values = FILEHEADER.unpack(handle.read(FILEHEADER.size))
self.magic = values[0]
pointer_size_id = values[1]
if pointer_size_id == b'-':
if pointer_size_id == b"-":
self.pointer_size = 8
elif pointer_size_id == b'_':
elif pointer_size_id == b"_":
self.pointer_size = 4
else:
assert(0)
assert 0
endian_id = values[2]
if endian_id == b'v':
if endian_id == b"v":
self.is_little_endian = True
self.endian_str = b'<'
self.endian_str = b"<"
self.endian_index = 0
elif endian_id == b'V':
elif endian_id == b"V":
self.is_little_endian = False
self.endian_index = 1
self.endian_str = b'>'
self.endian_str = b">"
else:
assert(0)
assert 0
version_id = values[3]
self.version = int(version_id)
def create_block_header_struct(self):
return struct.Struct(b''.join((
self.endian_str,
b'4sI',
b'I' if self.pointer_size == 4 else b'Q',
b'II',
)))
return struct.Struct(
b"".join(
(
self.endian_str,
b"4sI",
b"I" if self.pointer_size == 4 else b"Q",
b"II",
)
)
)
class DNAName:
"""
DNAName is a C-type name stored in the DNA
"""
__slots__ = (
"name_full",
"name_only",
"is_pointer",
"is_method_pointer",
"array_size",
)
)
def __init__(self, name_full):
self.name_full = name_full
@@ -614,40 +667,40 @@ class DNAName:
self.array_size = self.calc_array_size()
def __repr__(self):
return '%s(%r)' % (type(self).__qualname__, self.name_full)
return "%s(%r)" % (type(self).__qualname__, self.name_full)
def as_reference(self, parent):
if parent is None:
result = b''
result = b""
else:
result = parent + b'.'
result = parent + b"."
result = result + self.name_only
return result
def calc_name_only(self):
result = self.name_full.strip(b'*()')
index = result.find(b'[')
result = self.name_full.strip(b"*()")
index = result.find(b"[")
if index != -1:
result = result[:index]
return result
def calc_is_pointer(self):
return (b'*' in self.name_full)
return b"*" in self.name_full
def calc_is_method_pointer(self):
return (b'(*' in self.name_full)
return b"(*" in self.name_full
def calc_array_size(self):
result = 1
temp = self.name_full
index = temp.find(b'[')
index = temp.find(b"[")
while index != -1:
index_2 = temp.find(b']')
result *= int(temp[index + 1:index_2])
temp = temp[index_2 + 1:]
index = temp.find(b'[')
index_2 = temp.find(b"]")
result *= int(temp[index + 1 : index_2])
temp = temp[index_2 + 1 :]
index = temp.find(b"[")
return result
@@ -657,6 +710,7 @@ class DNAField:
DNAField is a coupled DNAStruct and DNAName
and cache offset for reuse
"""
__slots__ = (
# DNAName
"dna_name",
@@ -667,7 +721,7 @@ class DNAField:
"dna_size",
# cached info (avoid looping over fields each time)
"dna_offset",
)
)
def __init__(self, dna_type, dna_name, dna_size, dna_offset):
self.dna_type = dna_type
@@ -680,13 +734,14 @@ class DNAStruct:
"""
DNAStruct is a C-type structure stored in the DNA
"""
__slots__ = (
"dna_type_id",
"size",
"fields",
"field_from_name",
"user_data",
)
)
def __init__(self, dna_type_id):
self.dna_type_id = dna_type_id
@@ -695,7 +750,7 @@ class DNAStruct:
self.user_data = None
def __repr__(self):
return '%s(%r)' % (type(self).__qualname__, self.dna_type_id)
return "%s(%r)" % (type(self).__qualname__, self.dna_type_id)
def field_from_path(self, header, handle, path):
"""
@@ -709,7 +764,7 @@ class DNAStruct:
if len(path) >= 2 and type(path[1]) is not bytes:
name_tail = path[2:]
index = path[1]
assert(type(index) is int)
assert type(index) is int
else:
name_tail = path[1:]
index = 0
@@ -718,7 +773,7 @@ class DNAStruct:
name_tail = None
index = 0
assert(type(name) is bytes)
assert type(name) is bytes
field = self.field_from_name.get(name)
@@ -729,47 +784,69 @@ class DNAStruct:
index_offset = header.pointer_size * index
else:
index_offset = field.dna_type.size * index
assert(index_offset < field.dna_size)
assert index_offset < field.dna_size
handle.seek(index_offset, os.SEEK_CUR)
if not name_tail: # None or ()
return field
else:
return field.dna_type.field_from_path(header, handle, name_tail)
def field_get(self, header, handle, path,
default=...,
use_nil=True, use_str=True,
):
def field_get(
self,
header,
handle,
path,
default=...,
use_nil=True,
use_str=True,
):
field = self.field_from_path(header, handle, path)
if field is None:
if default is not ...:
return default
else:
raise KeyError("%r not found in %r (%r)" %
(path, [f.dna_name.name_only for f in self.fields], self.dna_type_id))
raise KeyError(
"%r not found in %r (%r)"
% (
path,
[f.dna_name.name_only for f in self.fields],
self.dna_type_id,
)
)
dna_type = field.dna_type
dna_name = field.dna_name
if dna_name.is_pointer:
return DNA_IO.read_pointer(handle, header)
elif dna_type.dna_type_id == b'int':
elif dna_type.dna_type_id == b"int":
if dna_name.array_size > 1:
return [DNA_IO.read_int(handle, header) for i in range(dna_name.array_size)]
return [
DNA_IO.read_int(handle, header) for i in range(dna_name.array_size)
]
return DNA_IO.read_int(handle, header)
elif dna_type.dna_type_id == b'short':
elif dna_type.dna_type_id == b"short":
if dna_name.array_size > 1:
return [DNA_IO.read_short(handle, header) for i in range(dna_name.array_size)]
return [
DNA_IO.read_short(handle, header)
for i in range(dna_name.array_size)
]
return DNA_IO.read_short(handle, header)
elif dna_type.dna_type_id == b'uint64_t':
elif dna_type.dna_type_id == b"uint64_t":
if dna_name.array_size > 1:
return [DNA_IO.read_ulong(handle, header) for i in range(dna_name.array_size)]
return [
DNA_IO.read_ulong(handle, header)
for i in range(dna_name.array_size)
]
return DNA_IO.read_ulong(handle, header)
elif dna_type.dna_type_id == b'float':
elif dna_type.dna_type_id == b"float":
if dna_name.array_size > 1:
return [DNA_IO.read_float(handle, header) for i in range(dna_name.array_size)]
return [
DNA_IO.read_float(handle, header)
for i in range(dna_name.array_size)
]
return DNA_IO.read_float(handle, header)
elif dna_type.dna_type_id == b'char':
elif dna_type.dna_type_id == b"char":
if use_str:
if use_nil:
return DNA_IO.read_string0(handle, dna_name.array_size)
@@ -781,30 +858,39 @@ class DNAStruct:
else:
return DNA_IO.read_bytes(handle, dna_name.array_size)
else:
raise NotImplementedError("%r exists but isn't pointer, can't resolve field %r" %
(path, dna_name.name_only), dna_name, dna_type)
raise NotImplementedError(
"%r exists but isn't pointer, can't resolve field %r"
% (path, dna_name.name_only),
dna_name,
dna_type,
)
def field_set(self, header, handle, path, value):
assert(type(path) == bytes)
assert type(path) == bytes
field = self.field_from_path(header, handle, path)
if field is None:
raise KeyError("%r not found in %r" %
(path, [f.dna_name.name_only for f in self.fields]))
raise KeyError(
"%r not found in %r"
% (path, [f.dna_name.name_only for f in self.fields])
)
dna_type = field.dna_type
dna_name = field.dna_name
if dna_type.dna_type_id == b'char':
if dna_type.dna_type_id == b"char":
if type(value) is str:
return DNA_IO.write_string(handle, value, dna_name.array_size)
else:
return DNA_IO.write_bytes(handle, value, dna_name.array_size)
elif dna_type.dna_type_id == b'int':
elif dna_type.dna_type_id == b"int":
DNA_IO.write_int(handle, header, value)
else:
raise NotImplementedError("Setting %r is not yet supported for %r" %
(dna_type, dna_name), dna_name, dna_type)
raise NotImplementedError(
"Setting %r is not yet supported for %r" % (dna_type, dna_name),
dna_name,
dna_type,
)
class DNA_IO:
@@ -821,20 +907,20 @@ class DNA_IO:
@staticmethod
def write_string(handle, astring, fieldlen):
assert(isinstance(astring, str))
assert isinstance(astring, str)
if len(astring) >= fieldlen:
stringw = astring[0:fieldlen]
else:
stringw = astring + '\0'
handle.write(stringw.encode('utf-8'))
stringw = astring + "\0"
handle.write(stringw.encode("utf-8"))
@staticmethod
def write_bytes(handle, astring, fieldlen):
assert(isinstance(astring, (bytes, bytearray)))
assert isinstance(astring, (bytes, bytearray))
if len(astring) >= fieldlen:
stringw = astring[0:fieldlen]
else:
stringw = astring + b'\0'
stringw = astring + b"\0"
handle.write(stringw)
@@ -850,44 +936,44 @@ class DNA_IO:
@staticmethod
def read_string(handle, length):
return DNA_IO.read_bytes(handle, length).decode('utf-8')
return DNA_IO.read_bytes(handle, length).decode("utf-8")
@staticmethod
def read_string0(handle, length):
return DNA_IO.read_bytes0(handle, length).decode('utf-8')
return DNA_IO.read_bytes0(handle, length).decode("utf-8")
@staticmethod
def read_data0_offset(data, offset):
add = data.find(b'\0', offset) - offset
return data[offset:offset + add]
add = data.find(b"\0", offset) - offset
return data[offset : offset + add]
@staticmethod
def read_data0(data):
add = data.find(b'\0')
add = data.find(b"\0")
return data[:add]
USHORT = struct.Struct(b'<H'), struct.Struct(b'>H')
USHORT = struct.Struct(b"<H"), struct.Struct(b">H")
@staticmethod
def read_ushort(handle, fileheader):
st = DNA_IO.USHORT[fileheader.endian_index]
return st.unpack(handle.read(st.size))[0]
SSHORT = struct.Struct(b'<h'), struct.Struct(b'>h')
SSHORT = struct.Struct(b"<h"), struct.Struct(b">h")
@staticmethod
def read_short(handle, fileheader):
st = DNA_IO.SSHORT[fileheader.endian_index]
return st.unpack(handle.read(st.size))[0]
UINT = struct.Struct(b'<I'), struct.Struct(b'>I')
UINT = struct.Struct(b"<I"), struct.Struct(b">I")
@staticmethod
def read_uint(handle, fileheader):
st = DNA_IO.UINT[fileheader.endian_index]
return st.unpack(handle.read(st.size))[0]
SINT = struct.Struct(b'<i'), struct.Struct(b'>i')
SINT = struct.Struct(b"<i"), struct.Struct(b">i")
@staticmethod
def read_int(handle, fileheader):
@@ -896,19 +982,22 @@ class DNA_IO:
@staticmethod
def write_int(handle, fileheader, value):
assert isinstance(value, int), 'value must be int, but is %r: %r' % (type(value), value)
assert isinstance(value, int), "value must be int, but is %r: %r" % (
type(value),
value,
)
st = DNA_IO.SINT[fileheader.endian_index]
to_write = st.pack(value)
handle.write(to_write)
FLOAT = struct.Struct(b'<f'), struct.Struct(b'>f')
FLOAT = struct.Struct(b"<f"), struct.Struct(b">f")
@staticmethod
def read_float(handle, fileheader):
st = DNA_IO.FLOAT[fileheader.endian_index]
return st.unpack(handle.read(st.size))[0]
ULONG = struct.Struct(b'<Q'), struct.Struct(b'>Q')
ULONG = struct.Struct(b"<Q"), struct.Struct(b">Q")
@staticmethod
def read_ulong(handle, fileheader):