Nathan Craddock
874024a367
Update the blendfile module to support iteration through array theme data. This removes the need to hard code the theme color values for bone color sets and collection colors. Differential Revision: https://developer.blender.org/D9482
964 lines
31 KiB
Python
964 lines
31 KiB
Python
# ***** BEGIN GPL LICENSE BLOCK *****
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
#
|
|
# ***** END GPL LICENCE BLOCK *****
|
|
#
|
|
# (c) 2009, At Mind B.V. - Jeroen Bakker
|
|
# (c) 2014, Blender Foundation - Campbell Barton
|
|
|
|
import gzip
|
|
import logging
|
|
import os
|
|
import struct
|
|
import tempfile
|
|
|
|
log = logging.getLogger("blendfile")
|
|
|
|
FILE_BUFFER_SIZE = 1024 * 1024
|
|
|
|
|
|
class BlendFileError(Exception):
|
|
"""Raised when there was an error reading/parsing a blend file."""
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# module global routines
|
|
#
|
|
# read routines
|
|
# open a filename
|
|
# determine if the file is compressed
|
|
# and returns a handle
|
|
def open_blend(filename, access="rb"):
|
|
"""Opens a blend file for reading or writing pending on the access
|
|
supports 2 kind of blend files. Uncompressed and compressed.
|
|
Known issue: does not support packaged blend files
|
|
"""
|
|
handle = open(filename, access)
|
|
magic_test = b"BLENDER"
|
|
magic = handle.read(len(magic_test))
|
|
if magic == magic_test:
|
|
log.debug("normal blendfile detected")
|
|
handle.seek(0, os.SEEK_SET)
|
|
bfile = BlendFile(handle)
|
|
bfile.is_compressed = False
|
|
bfile.filepath_orig = filename
|
|
return bfile
|
|
elif magic[:2] == b'\x1f\x8b':
|
|
log.debug("gzip blendfile detected")
|
|
handle.close()
|
|
log.debug("decompressing started")
|
|
fs = gzip.open(filename, "rb")
|
|
data = fs.read(FILE_BUFFER_SIZE)
|
|
magic = data[:len(magic_test)]
|
|
if magic == magic_test:
|
|
handle = tempfile.TemporaryFile()
|
|
while data:
|
|
handle.write(data)
|
|
data = fs.read(FILE_BUFFER_SIZE)
|
|
log.debug("decompressing finished")
|
|
fs.close()
|
|
log.debug("resetting decompressed file")
|
|
handle.seek(os.SEEK_SET, 0)
|
|
bfile = BlendFile(handle)
|
|
bfile.is_compressed = True
|
|
bfile.filepath_orig = filename
|
|
return bfile
|
|
else:
|
|
raise BlendFileError("filetype inside gzip not a blend")
|
|
else:
|
|
raise BlendFileError("filetype not a blend or a gzip blend")
|
|
|
|
|
|
def pad_up_4(offset):
|
|
return (offset + 3) & ~3
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# module classes
|
|
|
|
|
|
class BlendFile:
|
|
"""
|
|
Blend file.
|
|
"""
|
|
__slots__ = (
|
|
# file (result of open())
|
|
"handle",
|
|
# str (original name of the file path)
|
|
"filepath_orig",
|
|
# BlendFileHeader
|
|
"header",
|
|
# struct.Struct
|
|
"block_header_struct",
|
|
# BlendFileBlock
|
|
"blocks",
|
|
# [DNAStruct, ...]
|
|
"structs",
|
|
# dict {b'StructName': sdna_index}
|
|
# (where the index is an index into 'structs')
|
|
"sdna_index_from_id",
|
|
# dict {addr_old: block}
|
|
"block_from_offset",
|
|
# int
|
|
"code_index",
|
|
# bool (did we make a change)
|
|
"is_modified",
|
|
# bool (is file gzipped)
|
|
"is_compressed",
|
|
)
|
|
|
|
def __init__(self, handle):
|
|
log.debug("initializing reading blend-file")
|
|
self.handle = handle
|
|
self.header = BlendFileHeader(handle)
|
|
self.block_header_struct = self.header.create_block_header_struct()
|
|
self.blocks = []
|
|
self.code_index = {}
|
|
self.structs = []
|
|
self.sdna_index_from_id = {}
|
|
|
|
block = BlendFileBlock(handle, self)
|
|
while block.code != b'ENDB':
|
|
if block.code == b'DNA1':
|
|
(self.structs,
|
|
self.sdna_index_from_id,
|
|
) = BlendFile.decode_structs(self.header, block, handle)
|
|
else:
|
|
handle.seek(block.size, os.SEEK_CUR)
|
|
|
|
self.blocks.append(block)
|
|
self.code_index.setdefault(block.code, []).append(block)
|
|
|
|
block = BlendFileBlock(handle, self)
|
|
self.is_modified = False
|
|
self.blocks.append(block)
|
|
|
|
if not self.structs:
|
|
raise BlendFileError("No DNA1 block in file, this is not a valid .blend file!")
|
|
|
|
# Cache (could lazy init, in case we never use?).
|
|
self.block_from_offset = {block.addr_old: block for block in self.blocks if block.code != b'ENDB'}
|
|
|
|
def __repr__(self):
|
|
return '<%s %r>' % (self.__class__.__qualname__, self.handle)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
self.close()
|
|
|
|
def find_blocks_from_code(self, code):
|
|
assert(type(code) == bytes)
|
|
if code not in self.code_index:
|
|
return []
|
|
return self.code_index[code]
|
|
|
|
def find_block_from_offset(self, offset):
|
|
# same as looking looping over all blocks,
|
|
# then checking ``block.addr_old == offset``
|
|
assert(type(offset) is int)
|
|
return self.block_from_offset.get(offset)
|
|
|
|
def close(self):
|
|
"""
|
|
Close the blend file
|
|
writes the blend file to disk if changes has happened
|
|
"""
|
|
handle = self.handle
|
|
|
|
if self.is_modified:
|
|
if self.is_compressed:
|
|
log.debug("close compressed blend file")
|
|
handle.seek(os.SEEK_SET, 0)
|
|
log.debug("compressing started")
|
|
fs = gzip.open(self.filepath_orig, "wb")
|
|
data = handle.read(FILE_BUFFER_SIZE)
|
|
while data:
|
|
fs.write(data)
|
|
data = handle.read(FILE_BUFFER_SIZE)
|
|
fs.close()
|
|
log.debug("compressing finished")
|
|
|
|
handle.close()
|
|
|
|
def ensure_subtype_smaller(self, sdna_index_curr, sdna_index_next):
|
|
# never refine to a smaller type
|
|
if (self.structs[sdna_index_curr].size >
|
|
self.structs[sdna_index_next].size):
|
|
|
|
raise RuntimeError("cant refine to smaller type (%s -> %s)" %
|
|
(self.structs[sdna_index_curr].dna_type_id.decode('ascii'),
|
|
self.structs[sdna_index_next].dna_type_id.decode('ascii')))
|
|
|
|
@staticmethod
|
|
def decode_structs(header, block, handle):
|
|
"""
|
|
DNACatalog is a catalog of all information in the DNA1 file-block
|
|
"""
|
|
log.debug("building DNA catalog")
|
|
shortstruct = DNA_IO.USHORT[header.endian_index]
|
|
shortstruct2 = struct.Struct(header.endian_str + b'HH')
|
|
intstruct = DNA_IO.UINT[header.endian_index]
|
|
|
|
data = handle.read(block.size)
|
|
types = []
|
|
names = []
|
|
|
|
structs = []
|
|
sdna_index_from_id = {}
|
|
|
|
offset = 8
|
|
names_len = intstruct.unpack_from(data, offset)[0]
|
|
offset += 4
|
|
|
|
log.debug("building #%d names" % names_len)
|
|
for i in range(names_len):
|
|
tName = DNA_IO.read_data0_offset(data, offset)
|
|
offset = offset + len(tName) + 1
|
|
names.append(DNAName(tName))
|
|
del names_len
|
|
|
|
offset = pad_up_4(offset)
|
|
offset += 4
|
|
types_len = intstruct.unpack_from(data, offset)[0]
|
|
offset += 4
|
|
log.debug("building #%d types" % types_len)
|
|
for i in range(types_len):
|
|
dna_type_id = DNA_IO.read_data0_offset(data, offset)
|
|
# None will be replaced by the DNAStruct, below
|
|
types.append(DNAStruct(dna_type_id))
|
|
offset += len(dna_type_id) + 1
|
|
|
|
offset = pad_up_4(offset)
|
|
offset += 4
|
|
log.debug("building #%d type-lengths" % types_len)
|
|
for i in range(types_len):
|
|
tLen = shortstruct.unpack_from(data, offset)[0]
|
|
offset = offset + 2
|
|
types[i].size = tLen
|
|
del types_len
|
|
|
|
offset = pad_up_4(offset)
|
|
offset += 4
|
|
|
|
structs_len = intstruct.unpack_from(data, offset)[0]
|
|
offset += 4
|
|
log.debug("building #%d structures" % structs_len)
|
|
for sdna_index in range(structs_len):
|
|
d = shortstruct2.unpack_from(data, offset)
|
|
struct_type_index = d[0]
|
|
offset += 4
|
|
dna_struct = types[struct_type_index]
|
|
sdna_index_from_id[dna_struct.dna_type_id] = sdna_index
|
|
structs.append(dna_struct)
|
|
|
|
fields_len = d[1]
|
|
dna_offset = 0
|
|
|
|
for field_index in range(fields_len):
|
|
d2 = shortstruct2.unpack_from(data, offset)
|
|
field_type_index = d2[0]
|
|
field_name_index = d2[1]
|
|
offset += 4
|
|
dna_type = types[field_type_index]
|
|
dna_name = names[field_name_index]
|
|
if dna_name.is_pointer or dna_name.is_method_pointer:
|
|
dna_size = header.pointer_size * dna_name.array_size
|
|
else:
|
|
dna_size = dna_type.size * dna_name.array_size
|
|
|
|
field = DNAField(dna_type, dna_name, dna_size, dna_offset)
|
|
dna_struct.fields.append(field)
|
|
dna_struct.field_from_name[dna_name.name_only] = field
|
|
dna_offset += dna_size
|
|
|
|
return structs, sdna_index_from_id
|
|
|
|
|
|
class BlendFileBlock:
|
|
"""
|
|
Instance of a struct.
|
|
"""
|
|
__slots__ = (
|
|
# BlendFile
|
|
"file",
|
|
"code",
|
|
"size",
|
|
"addr_old",
|
|
"sdna_index",
|
|
"count",
|
|
"file_offset",
|
|
"user_data",
|
|
)
|
|
|
|
def __str__(self):
|
|
return ("<%s.%s (%s), size=%d at %s>" %
|
|
# fields=[%s]
|
|
(self.__class__.__name__,
|
|
self.dna_type_name,
|
|
self.code.decode(),
|
|
self.size,
|
|
# b", ".join(f.dna_name.name_only for f in self.dna_type.fields).decode('ascii'),
|
|
hex(self.addr_old),
|
|
))
|
|
|
|
def __init__(self, handle, bfile):
|
|
OLDBLOCK = struct.Struct(b'4sI')
|
|
|
|
self.file = bfile
|
|
self.user_data = None
|
|
|
|
data = handle.read(bfile.block_header_struct.size)
|
|
|
|
if len(data) != bfile.block_header_struct.size:
|
|
print("WARNING! Blend file seems to be badly truncated!")
|
|
self.code = b'ENDB'
|
|
self.size = 0
|
|
self.addr_old = 0
|
|
self.sdna_index = 0
|
|
self.count = 0
|
|
self.file_offset = 0
|
|
return
|
|
# header size can be 8, 20, or 24 bytes long
|
|
# 8: old blend files ENDB block (exception)
|
|
# 20: normal headers 32 bit platform
|
|
# 24: normal headers 64 bit platform
|
|
if len(data) > 15:
|
|
blockheader = bfile.block_header_struct.unpack(data)
|
|
self.code = blockheader[0].partition(b'\0')[0]
|
|
if self.code != b'ENDB':
|
|
self.size = blockheader[1]
|
|
self.addr_old = blockheader[2]
|
|
self.sdna_index = blockheader[3]
|
|
self.count = blockheader[4]
|
|
self.file_offset = handle.tell()
|
|
else:
|
|
self.size = 0
|
|
self.addr_old = 0
|
|
self.sdna_index = 0
|
|
self.count = 0
|
|
self.file_offset = 0
|
|
else:
|
|
blockheader = OLDBLOCK.unpack(data)
|
|
self.code = blockheader[0].partition(b'\0')[0]
|
|
self.code = DNA_IO.read_data0(blockheader[0])
|
|
self.size = 0
|
|
self.addr_old = 0
|
|
self.sdna_index = 0
|
|
self.count = 0
|
|
self.file_offset = 0
|
|
|
|
@property
|
|
def dna_type(self):
|
|
return self.file.structs[self.sdna_index]
|
|
|
|
@property
|
|
def dna_type_name(self):
|
|
return self.dna_type.dna_type_id.decode('ascii')
|
|
|
|
def refine_type_from_index(self, sdna_index_next):
|
|
assert(type(sdna_index_next) is int)
|
|
sdna_index_curr = self.sdna_index
|
|
self.file.ensure_subtype_smaller(sdna_index_curr, sdna_index_next)
|
|
self.sdna_index = sdna_index_next
|
|
|
|
def refine_type(self, dna_type_id):
|
|
assert(type(dna_type_id) is bytes)
|
|
self.refine_type_from_index(self.file.sdna_index_from_id[dna_type_id])
|
|
|
|
def get_file_offset(self, path,
|
|
default=...,
|
|
sdna_index_refine=None,
|
|
base_index=0,
|
|
):
|
|
"""
|
|
Return (offset, length)
|
|
"""
|
|
assert(type(path) is bytes)
|
|
|
|
ofs = self.file_offset
|
|
if base_index != 0:
|
|
assert(base_index < self.count)
|
|
ofs += (self.size // self.count) * base_index
|
|
self.file.handle.seek(ofs, os.SEEK_SET)
|
|
|
|
if sdna_index_refine is None:
|
|
sdna_index_refine = self.sdna_index
|
|
else:
|
|
self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
|
|
|
|
dna_struct = self.file.structs[sdna_index_refine]
|
|
field = dna_struct.field_from_path(
|
|
self.file.header, self.file.handle, path)
|
|
|
|
return (self.file.handle.tell(), field.dna_name.array_size)
|
|
|
|
def get(self, path,
|
|
default=...,
|
|
sdna_index_refine=None,
|
|
use_nil=True, use_str=True,
|
|
base_index=0,
|
|
):
|
|
|
|
ofs = self.file_offset
|
|
if base_index != 0:
|
|
assert(base_index < self.count)
|
|
ofs += (self.size // self.count) * base_index
|
|
self.file.handle.seek(ofs, os.SEEK_SET)
|
|
|
|
if sdna_index_refine is None:
|
|
sdna_index_refine = self.sdna_index
|
|
else:
|
|
self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
|
|
|
|
dna_struct = self.file.structs[sdna_index_refine]
|
|
return dna_struct.field_get(
|
|
self.file.header, self.file.handle, path,
|
|
default=default,
|
|
use_nil=use_nil, use_str=use_str,
|
|
)
|
|
|
|
def get_recursive_iter(self, path, path_root=b"",
|
|
default=...,
|
|
sdna_index_refine=None,
|
|
use_nil=True, use_str=True,
|
|
base_index=0,
|
|
):
|
|
if path_root:
|
|
path_full = (
|
|
(path_root if type(path_root) is tuple else (path_root, )) +
|
|
(path if type(path) is tuple else (path, )))
|
|
else:
|
|
path_full = path
|
|
|
|
try:
|
|
yield (path_full, self.get(path_full, default, sdna_index_refine, use_nil, use_str, base_index))
|
|
except NotImplementedError as ex:
|
|
msg, dna_name, dna_type = ex.args
|
|
struct_index = self.file.sdna_index_from_id.get(dna_type.dna_type_id, None)
|
|
if struct_index is None:
|
|
yield (path_full, "<%s>" % dna_type.dna_type_id.decode('ascii'))
|
|
else:
|
|
struct = self.file.structs[struct_index]
|
|
if dna_name.array_size > 1:
|
|
for index in range(dna_name.array_size):
|
|
for f in struct.fields:
|
|
yield from self.get_recursive_iter(
|
|
(index, f.dna_name.name_only), path_full,
|
|
default, None, use_nil, use_str, 0)
|
|
else:
|
|
for f in struct.fields:
|
|
yield from self.get_recursive_iter(
|
|
f.dna_name.name_only, path_full,
|
|
default, None, use_nil, use_str, 0)
|
|
def items_recursive_iter(self, use_nil=True):
|
|
for k in self.keys():
|
|
yield from self.get_recursive_iter(k, use_nil=use_nil, use_str=False)
|
|
|
|
def get_data_hash(self):
|
|
"""
|
|
Generates a 'hash' that can be used instead of addr_old as block id, and that should be 'stable' across .blend
|
|
file load & save (i.e. it does not changes due to pointer addresses variations).
|
|
"""
|
|
# TODO This implementation is most likely far from optimal... and CRC32 is not renown as the best hashing
|
|
# algo either. But for now does the job!
|
|
import zlib
|
|
def _is_pointer(self, k):
|
|
return self.file.structs[self.sdna_index].field_from_path(
|
|
self.file.header, self.file.handle, k).dna_name.is_pointer
|
|
|
|
hsh = 1
|
|
for k, v in self.items_recursive_iter():
|
|
if not _is_pointer(self, k):
|
|
hsh = zlib.adler32(str(v).encode(), hsh)
|
|
return hsh
|
|
|
|
def set(self, path, value,
|
|
sdna_index_refine=None,
|
|
):
|
|
|
|
if sdna_index_refine is None:
|
|
sdna_index_refine = self.sdna_index
|
|
else:
|
|
self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine)
|
|
|
|
dna_struct = self.file.structs[sdna_index_refine]
|
|
self.file.handle.seek(self.file_offset, os.SEEK_SET)
|
|
self.file.is_modified = True
|
|
return dna_struct.field_set(
|
|
self.file.header, self.file.handle, path, value)
|
|
|
|
# ---------------
|
|
# Utility get/set
|
|
#
|
|
# avoid inline pointer casting
|
|
def get_pointer(
|
|
self, path,
|
|
default=...,
|
|
sdna_index_refine=None,
|
|
base_index=0,
|
|
):
|
|
if sdna_index_refine is None:
|
|
sdna_index_refine = self.sdna_index
|
|
result = self.get(path, default, sdna_index_refine=sdna_index_refine, base_index=base_index)
|
|
|
|
# default
|
|
if type(result) is not int:
|
|
return result
|
|
|
|
assert(self.file.structs[sdna_index_refine].field_from_path(
|
|
self.file.header, self.file.handle, path).dna_name.is_pointer)
|
|
if result != 0:
|
|
# possible (but unlikely)
|
|
# that this fails and returns None
|
|
# maybe we want to raise some exception in this case
|
|
return self.file.find_block_from_offset(result)
|
|
else:
|
|
return None
|
|
|
|
# ----------------------
|
|
# Python convenience API
|
|
|
|
# dict like access
|
|
def __getitem__(self, item):
|
|
return self.get(item, use_str=False)
|
|
|
|
def __setitem__(self, item, value):
|
|
self.set(item, value)
|
|
|
|
def keys(self):
|
|
return (f.dna_name.name_only for f in self.dna_type.fields)
|
|
|
|
def values(self):
|
|
for k in self.keys():
|
|
try:
|
|
yield self[k]
|
|
except NotImplementedError as ex:
|
|
msg, dna_name, dna_type = ex.args
|
|
yield "<%s>" % dna_type.dna_type_id.decode('ascii')
|
|
|
|
def items(self):
|
|
for k in self.keys():
|
|
try:
|
|
yield (k, self[k])
|
|
except NotImplementedError as ex:
|
|
msg, dna_name, dna_type = ex.args
|
|
yield (k, "<%s>" % dna_type.dna_type_id.decode('ascii'))
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Read Magic
|
|
#
|
|
# magic = str
|
|
# pointer_size = int
|
|
# is_little_endian = bool
|
|
# version = int
|
|
|
|
|
|
class BlendFileHeader:
|
|
"""
|
|
BlendFileHeader allocates the first 12 bytes of a blend file
|
|
it contains information about the hardware architecture
|
|
"""
|
|
__slots__ = (
|
|
# str
|
|
"magic",
|
|
# int 4/8
|
|
"pointer_size",
|
|
# bool
|
|
"is_little_endian",
|
|
# int
|
|
"version",
|
|
# str, used to pass to 'struct'
|
|
"endian_str",
|
|
# int, used to index common types
|
|
"endian_index",
|
|
)
|
|
|
|
def __init__(self, handle):
|
|
FILEHEADER = struct.Struct(b'7s1s1s3s')
|
|
|
|
log.debug("reading blend-file-header")
|
|
values = FILEHEADER.unpack(handle.read(FILEHEADER.size))
|
|
self.magic = values[0]
|
|
pointer_size_id = values[1]
|
|
if pointer_size_id == b'-':
|
|
self.pointer_size = 8
|
|
elif pointer_size_id == b'_':
|
|
self.pointer_size = 4
|
|
else:
|
|
assert(0)
|
|
endian_id = values[2]
|
|
if endian_id == b'v':
|
|
self.is_little_endian = True
|
|
self.endian_str = b'<'
|
|
self.endian_index = 0
|
|
elif endian_id == b'V':
|
|
self.is_little_endian = False
|
|
self.endian_index = 1
|
|
self.endian_str = b'>'
|
|
else:
|
|
assert(0)
|
|
|
|
version_id = values[3]
|
|
self.version = int(version_id)
|
|
|
|
def create_block_header_struct(self):
|
|
return struct.Struct(b''.join((
|
|
self.endian_str,
|
|
b'4sI',
|
|
b'I' if self.pointer_size == 4 else b'Q',
|
|
b'II',
|
|
)))
|
|
|
|
|
|
class DNAName:
|
|
"""
|
|
DNAName is a C-type name stored in the DNA
|
|
"""
|
|
__slots__ = (
|
|
"name_full",
|
|
"name_only",
|
|
"is_pointer",
|
|
"is_method_pointer",
|
|
"array_size",
|
|
)
|
|
|
|
def __init__(self, name_full):
|
|
self.name_full = name_full
|
|
self.name_only = self.calc_name_only()
|
|
self.is_pointer = self.calc_is_pointer()
|
|
self.is_method_pointer = self.calc_is_method_pointer()
|
|
self.array_size = self.calc_array_size()
|
|
|
|
def __repr__(self):
|
|
return '%s(%r)' % (type(self).__qualname__, self.name_full)
|
|
|
|
def as_reference(self, parent):
|
|
if parent is None:
|
|
result = b''
|
|
else:
|
|
result = parent + b'.'
|
|
|
|
result = result + self.name_only
|
|
return result
|
|
|
|
def calc_name_only(self):
|
|
result = self.name_full.strip(b'*()')
|
|
index = result.find(b'[')
|
|
if index != -1:
|
|
result = result[:index]
|
|
return result
|
|
|
|
def calc_is_pointer(self):
|
|
return (b'*' in self.name_full)
|
|
|
|
def calc_is_method_pointer(self):
|
|
return (b'(*' in self.name_full)
|
|
|
|
def calc_array_size(self):
|
|
result = 1
|
|
temp = self.name_full
|
|
index = temp.find(b'[')
|
|
|
|
while index != -1:
|
|
index_2 = temp.find(b']')
|
|
result *= int(temp[index + 1:index_2])
|
|
temp = temp[index_2 + 1:]
|
|
index = temp.find(b'[')
|
|
|
|
return result
|
|
|
|
|
|
class DNAField:
|
|
"""
|
|
DNAField is a coupled DNAStruct and DNAName
|
|
and cache offset for reuse
|
|
"""
|
|
__slots__ = (
|
|
# DNAName
|
|
"dna_name",
|
|
# tuple of 3 items
|
|
# [bytes (struct name), int (struct size), DNAStruct]
|
|
"dna_type",
|
|
# size on-disk
|
|
"dna_size",
|
|
# cached info (avoid looping over fields each time)
|
|
"dna_offset",
|
|
)
|
|
|
|
def __init__(self, dna_type, dna_name, dna_size, dna_offset):
|
|
self.dna_type = dna_type
|
|
self.dna_name = dna_name
|
|
self.dna_size = dna_size
|
|
self.dna_offset = dna_offset
|
|
|
|
|
|
class DNAStruct:
|
|
"""
|
|
DNAStruct is a C-type structure stored in the DNA
|
|
"""
|
|
__slots__ = (
|
|
"dna_type_id",
|
|
"size",
|
|
"fields",
|
|
"field_from_name",
|
|
"user_data",
|
|
)
|
|
|
|
def __init__(self, dna_type_id):
|
|
self.dna_type_id = dna_type_id
|
|
self.fields = []
|
|
self.field_from_name = {}
|
|
self.user_data = None
|
|
|
|
def __repr__(self):
|
|
return '%s(%r)' % (type(self).__qualname__, self.dna_type_id)
|
|
|
|
def field_from_path(self, header, handle, path):
|
|
"""
|
|
Support lookups as bytes or a tuple of bytes and optional index.
|
|
|
|
C style 'id.name' --> (b'id', b'name')
|
|
C style 'array[4]' --> ('array', 4)
|
|
"""
|
|
if type(path) is tuple:
|
|
name = path[0]
|
|
if len(path) >= 2 and type(path[1]) is not bytes:
|
|
name_tail = path[2:]
|
|
index = path[1]
|
|
assert(type(index) is int)
|
|
else:
|
|
name_tail = path[1:]
|
|
index = 0
|
|
else:
|
|
name = path
|
|
name_tail = None
|
|
index = 0
|
|
|
|
assert(type(name) is bytes)
|
|
|
|
field = self.field_from_name.get(name)
|
|
|
|
if field is not None:
|
|
handle.seek(field.dna_offset, os.SEEK_CUR)
|
|
if index != 0:
|
|
if field.dna_name.is_pointer:
|
|
index_offset = header.pointer_size * index
|
|
else:
|
|
index_offset = field.dna_type.size * index
|
|
assert(index_offset < field.dna_size)
|
|
handle.seek(index_offset, os.SEEK_CUR)
|
|
if not name_tail: # None or ()
|
|
return field
|
|
else:
|
|
return field.dna_type.field_from_path(header, handle, name_tail)
|
|
|
|
def field_get(self, header, handle, path,
|
|
default=...,
|
|
use_nil=True, use_str=True,
|
|
):
|
|
field = self.field_from_path(header, handle, path)
|
|
if field is None:
|
|
if default is not ...:
|
|
return default
|
|
else:
|
|
raise KeyError("%r not found in %r (%r)" %
|
|
(path, [f.dna_name.name_only for f in self.fields], self.dna_type_id))
|
|
|
|
dna_type = field.dna_type
|
|
dna_name = field.dna_name
|
|
dna_size = field.dna_size
|
|
|
|
if dna_name.is_pointer:
|
|
return DNA_IO.read_pointer(handle, header)
|
|
elif dna_type.dna_type_id == b'int':
|
|
if dna_name.array_size > 1:
|
|
return [DNA_IO.read_int(handle, header) for i in range(dna_name.array_size)]
|
|
return DNA_IO.read_int(handle, header)
|
|
elif dna_type.dna_type_id == b'short':
|
|
if dna_name.array_size > 1:
|
|
return [DNA_IO.read_short(handle, header) for i in range(dna_name.array_size)]
|
|
return DNA_IO.read_short(handle, header)
|
|
elif dna_type.dna_type_id == b'uint64_t':
|
|
if dna_name.array_size > 1:
|
|
return [DNA_IO.read_ulong(handle, header) for i in range(dna_name.array_size)]
|
|
return DNA_IO.read_ulong(handle, header)
|
|
elif dna_type.dna_type_id == b'float':
|
|
if dna_name.array_size > 1:
|
|
return [DNA_IO.read_float(handle, header) for i in range(dna_name.array_size)]
|
|
return DNA_IO.read_float(handle, header)
|
|
elif dna_type.dna_type_id == b'char':
|
|
if dna_size == 1:
|
|
# Single char, assume it's bitflag or int value, and not a string/bytes data...
|
|
return DNA_IO.read_char(handle, header)
|
|
if use_str:
|
|
if use_nil:
|
|
return DNA_IO.read_string0(handle, dna_name.array_size)
|
|
else:
|
|
return DNA_IO.read_string(handle, dna_name.array_size)
|
|
else:
|
|
if use_nil:
|
|
return DNA_IO.read_bytes0(handle, dna_name.array_size)
|
|
else:
|
|
return DNA_IO.read_bytes(handle, dna_name.array_size)
|
|
else:
|
|
raise NotImplementedError("%r exists but isn't pointer, can't resolve field %r" %
|
|
(path, dna_name.name_only), dna_name, dna_type)
|
|
|
|
def field_set(self, header, handle, path, value):
|
|
assert(type(path) == bytes)
|
|
|
|
field = self.field_from_path(header, handle, path)
|
|
if field is None:
|
|
raise KeyError("%r not found in %r" %
|
|
(path, [f.dna_name.name_only for f in self.fields]))
|
|
|
|
dna_type = field.dna_type
|
|
dna_name = field.dna_name
|
|
|
|
if dna_type.dna_type_id == b'char':
|
|
if type(value) is str:
|
|
return DNA_IO.write_string(handle, value, dna_name.array_size)
|
|
else:
|
|
return DNA_IO.write_bytes(handle, value, dna_name.array_size)
|
|
else:
|
|
raise NotImplementedError("Setting %r is not yet supported for %r" %
|
|
(dna_type, dna_name), dna_name, dna_type)
|
|
|
|
|
|
class DNA_IO:
|
|
"""
|
|
Module like class, for read-write utility functions.
|
|
|
|
Only stores static methods & constants.
|
|
"""
|
|
|
|
__slots__ = ()
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
raise RuntimeError("%s should not be instantiated" % cls)
|
|
|
|
@staticmethod
|
|
def write_string(handle, astring, fieldlen):
|
|
assert(isinstance(astring, str))
|
|
if len(astring) >= fieldlen:
|
|
stringw = astring[0:fieldlen]
|
|
else:
|
|
stringw = astring + '\0'
|
|
handle.write(stringw.encode('utf-8'))
|
|
|
|
@staticmethod
|
|
def write_bytes(handle, astring, fieldlen):
|
|
assert(isinstance(astring, (bytes, bytearray)))
|
|
if len(astring) >= fieldlen:
|
|
stringw = astring[0:fieldlen]
|
|
else:
|
|
stringw = astring + b'\0'
|
|
|
|
handle.write(stringw)
|
|
|
|
@staticmethod
|
|
def read_bytes(handle, length):
|
|
data = handle.read(length)
|
|
return data
|
|
|
|
@staticmethod
|
|
def read_bytes0(handle, length):
|
|
data = handle.read(length)
|
|
return DNA_IO.read_data0(data)
|
|
|
|
@staticmethod
|
|
def read_string(handle, length):
|
|
return DNA_IO.read_bytes(handle, length).decode('utf-8')
|
|
|
|
@staticmethod
|
|
def read_string0(handle, length):
|
|
return DNA_IO.read_bytes0(handle, length).decode('utf-8')
|
|
|
|
@staticmethod
|
|
def read_data0_offset(data, offset):
|
|
add = data.find(b'\0', offset) - offset
|
|
return data[offset:offset + add]
|
|
|
|
@staticmethod
|
|
def read_data0(data):
|
|
add = data.find(b'\0')
|
|
return data[:add]
|
|
|
|
UCHAR = struct.Struct(b'<b'), struct.Struct(b'>b')
|
|
|
|
@staticmethod
|
|
def read_char(handle, fileheader):
|
|
st = DNA_IO.UCHAR[fileheader.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|
|
|
|
USHORT = struct.Struct(b'<H'), struct.Struct(b'>H')
|
|
|
|
@staticmethod
|
|
def read_ushort(handle, fileheader):
|
|
st = DNA_IO.USHORT[fileheader.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|
|
|
|
SSHORT = struct.Struct(b'<h'), struct.Struct(b'>h')
|
|
|
|
@staticmethod
|
|
def read_short(handle, fileheader):
|
|
st = DNA_IO.SSHORT[fileheader.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|
|
|
|
UINT = struct.Struct(b'<I'), struct.Struct(b'>I')
|
|
|
|
@staticmethod
|
|
def read_uint(handle, fileheader):
|
|
st = DNA_IO.UINT[fileheader.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|
|
|
|
SINT = struct.Struct(b'<i'), struct.Struct(b'>i')
|
|
|
|
@staticmethod
|
|
def read_int(handle, fileheader):
|
|
st = DNA_IO.SINT[fileheader.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|
|
|
|
FLOAT = struct.Struct(b'<f'), struct.Struct(b'>f')
|
|
|
|
@staticmethod
|
|
def read_float(handle, fileheader):
|
|
st = DNA_IO.FLOAT[fileheader.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|
|
|
|
ULONG = struct.Struct(b'<Q'), struct.Struct(b'>Q')
|
|
|
|
@staticmethod
|
|
def read_ulong(handle, fileheader):
|
|
st = DNA_IO.ULONG[fileheader.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|
|
|
|
@staticmethod
|
|
def read_pointer(handle, header):
|
|
"""
|
|
reads an pointer from a file handle
|
|
the pointer size is given by the header (BlendFileHeader)
|
|
"""
|
|
if header.pointer_size == 4:
|
|
st = DNA_IO.UINT[header.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|
|
if header.pointer_size == 8:
|
|
st = DNA_IO.ULONG[header.endian_index]
|
|
return st.unpack(handle.read(st.size))[0]
|