Asset Pipeline v2 #145
@ -9,7 +9,15 @@ from .asset_mapping import AssetTransferMapping
|
||||
from . import constants
|
||||
|
||||
|
||||
def ownership_cleanup(obj, task_layer_name):
|
||||
def ownership_transfer_data_cleanup(
|
||||
obj: bpy.types.Object, task_layer_name: str
|
||||
) -> None:
|
||||
"""Remove Transfer Data ownership items if the corrisponding data is missing
|
||||
|
||||
Args:
|
||||
obj (bpy.types.Object): Object that contains the transfer data
|
||||
task_layer_name (str): Name of the current task layer that owns the data
|
||||
"""
|
||||
ownership = obj.transfer_data_ownership
|
||||
to_remove = []
|
||||
for item in ownership:
|
||||
@ -25,9 +33,29 @@ def ownership_cleanup(obj, task_layer_name):
|
||||
ownership.remove(ownership.keys().index(name))
|
||||
|
||||
|
||||
def ownership_get(local_col: str, task_layer_name: str, new_transfer_data):
|
||||
def ownership_get(
|
||||
local_col: bpy.types.Collection,
|
||||
task_layer_name: str,
|
||||
temp_transfer_data: bpy.types.CollectionProperty,
|
||||
) -> list[bpy.types.Object]:
|
||||
"""Find new transfer data owned by the local task layer.
|
||||
Marks items as owned by the local task layer if they are in the
|
||||
corrisponding task layer collection and have no owner.
|
||||
|
||||
Args:
|
||||
local_col (bpy.types.Collection): The top level asset collection that is local to the file
|
||||
task_layer_name (str): Name of the current task layer that will be the owner of the data
|
||||
temp_transfer_data (bpy.types.CollectionProperty): Collection property containing newly found
|
||||
data and the object that contains this data.
|
||||
|
||||
Returns:
|
||||
list[bpy.types.Object]: Returns a list of objects that have no owner and will not be included
|
||||
in the merge process
|
||||
"""
|
||||
invalid_objs = []
|
||||
task_layer_col_name = get_enum_item(constants.TASK_LAYER_ITEMS, task_layer_name)[1]
|
||||
task_layer_col_name = get_dict_tuple_item(
|
||||
constants.TASK_LAYER_ITEMS, task_layer_name
|
||||
)[1]
|
||||
task_layer_col = local_col.children.get(task_layer_col_name)
|
||||
for obj in local_col.all_objects:
|
||||
# Mark Asset ID Owner for objects in the current task layers collection
|
||||
@ -37,45 +65,61 @@ def ownership_get(local_col: str, task_layer_name: str, new_transfer_data):
|
||||
if obj.asset_id_owner == "NONE":
|
||||
invalid_objs.append(obj)
|
||||
continue
|
||||
ownership_cleanup(obj, task_layer_name)
|
||||
transfer_functions.get_vertex_groups(obj, task_layer_name, new_transfer_data)
|
||||
transfer_functions.get_material_slots(obj, task_layer_name, new_transfer_data)
|
||||
transfer_functions.get_modifiers(obj, task_layer_name, new_transfer_data)
|
||||
ownership_transfer_data_cleanup(obj, task_layer_name)
|
||||
transfer_functions.get_vertex_groups(obj, task_layer_name, temp_transfer_data)
|
||||
transfer_functions.get_material_slots(obj, task_layer_name, temp_transfer_data)
|
||||
transfer_functions.get_modifiers(obj, task_layer_name, temp_transfer_data)
|
||||
|
||||
return invalid_objs
|
||||
|
||||
|
||||
def ownership_set(new_transfer_data):
|
||||
for item in new_transfer_data:
|
||||
def ownership_set(temp_transfer_data: bpy.types.CollectionProperty) -> None:
|
||||
"""Add new transfer data items on each object found in the
|
||||
temp transfer data collection property
|
||||
|
||||
Args:
|
||||
temp_transfer_data (bpy.types.CollectionProperty): Collection property containing newly found
|
||||
data and the object that contains this data.
|
||||
"""
|
||||
for item in temp_transfer_data:
|
||||
ownership = item.obj.transfer_data_ownership
|
||||
transfer_core.transfer_data_add_entry(
|
||||
ownership, item.name, item.type, item.owner
|
||||
)
|
||||
|
||||
|
||||
def remap_user(source_datablock: bpy.data, target_datablock: bpy.data):
|
||||
"""Remap datablock and append name to datablock that has been remapped"""
|
||||
def remap_user(source_datablock: bpy.data, target_datablock: bpy.data) -> None:
|
||||
"""Remap datablock and append name to datablock that has been remapped
|
||||
|
||||
Args:
|
||||
source_datablock (bpy.data): datablock that will be replaced by the target
|
||||
target_datablock (bpy.data): datablock that will replace the source
|
||||
"""
|
||||
print(f"REMAPPING {source_datablock.name} to {target_datablock.name}")
|
||||
source_datablock.user_remap(target_datablock)
|
||||
source_datablock.name += "_Users_Remapped"
|
||||
|
||||
|
||||
def update_task_layer_objects(
|
||||
target_col: bpy.types.Collection,
|
||||
transfer_objs: list[bpy.types.Object],
|
||||
):
|
||||
# Link new obj to collection
|
||||
for transfer_obj in transfer_objs:
|
||||
obj_root_name = transfer_obj.name
|
||||
transfer_obj.name = f"{obj_root_name}"
|
||||
target_col.objects.link(transfer_obj)
|
||||
|
||||
|
||||
def merge_task_layer(
|
||||
context: bpy.types.Context,
|
||||
local_tls: list[str],
|
||||
target_file: Path,
|
||||
):
|
||||
external_file: Path,
|
||||
) -> None:
|
||||
"""Combines data from an external task layer collection in the local
|
||||
task layer collection. By finding the owner of each collection,
|
||||
object and transfer data item and keeping each layer of data via a copy
|
||||
from it's respective owners.
|
||||
|
||||
This ensures that objects owned by an external task layer will always be kept
|
||||
linked into the scene, and any local transfer data like a modifier will be applied
|
||||
ontop of that external object of vice versa. Ownership is stored in an objects properties,
|
||||
and map is created to match each object to it's respective owner.
|
||||
|
||||
Args:
|
||||
context: (bpy.types.Context): context of current .blend
|
||||
local_tls: (list[str]): list of task layers that are local to the current file
|
||||
external_file (Path): external file to pull data into the current file from
|
||||
"""
|
||||
local_col = context.scene.asset_status.asset_collection
|
||||
if not local_col:
|
||||
return "Unable to find Asset Collection"
|
||||
@ -84,7 +128,7 @@ def merge_task_layer(
|
||||
external_suffix = constants.EXTERNAL_SUFFIX
|
||||
asset_suffix.add_suffix_to_hierarchy(local_col, local_suffix)
|
||||
|
||||
appended_col = import_data_from_lib(target_file, "collections", col_base_name)
|
||||
appended_col = import_data_from_lib(external_file, "collections", col_base_name)
|
||||
asset_suffix.add_suffix_to_hierarchy(appended_col, external_suffix)
|
||||
|
||||
local_col = bpy.data.collections[f"{col_base_name}.{local_suffix}"]
|
||||
@ -107,13 +151,31 @@ def merge_task_layer(
|
||||
asset_suffix.remove_suffix_from_hierarchy(local_col)
|
||||
|
||||
|
||||
def find_file_version(file):
|
||||
return int(file.name.split(".")[1].replace("v", ""))
|
||||
def find_file_version(published_file: Path) -> int:
|
||||
"""Returns the version number from a published file's name
|
||||
|
||||
Args:
|
||||
file (Path): Path to a publish file, naming convention is
|
||||
asset_name.v{3-digit_version}.blend`
|
||||
|
||||
Returns:
|
||||
int: returns current version in filename as integer
|
||||
"""
|
||||
return int(published_file.name.split(".")[1].replace("v", ""))
|
||||
|
||||
|
||||
def get_next_published_file(
|
||||
current_file: Path, publish_type=constants.ACTIVE_PUBLISH_KEY
|
||||
):
|
||||
) -> Path:
|
||||
"""Returns the path where the next published file version should be saved to
|
||||
|
||||
Args:
|
||||
current_file (Path): Current file, which must be a task file at root of asset directory
|
||||
publish_type (_type_, optional): Publish type, 'publish', 'staged', 'review'. Defaults to 'publish'.
|
||||
|
||||
Returns:
|
||||
Path: Path where the next published file should be saved to, path doesn't exist yet
|
||||
""" """"""
|
||||
last_publish = find_latest_publish(current_file, publish_type)
|
||||
base_name = current_file.name.split(".")[0]
|
||||
publish_dir = current_file.parent.joinpath(publish_type)
|
||||
@ -126,7 +188,17 @@ def get_next_published_file(
|
||||
return publish_dir.joinpath(base_name + f".v" + new_version + ".blend")
|
||||
|
||||
|
||||
def find_all_published(current_file, publish_type):
|
||||
def find_all_published(current_file: Path, publish_type: str) -> list[Path]:
|
||||
"""Retuns a list of published files of a given type,
|
||||
each publish type is seperated into it's own folder at the
|
||||
root of the asset's directory
|
||||
Args:
|
||||
current_file (Path): Current file, which must be a task file at root of asset directory
|
||||
publish_type (_type_, optional): Publish type, 'publish', 'staged', 'review'. Defaults to 'publish'.
|
||||
|
||||
Returns:
|
||||
list[Path]: list of published files of a given publish type
|
||||
"""
|
||||
publish_dir = current_file.parent.joinpath(publish_type)
|
||||
if not publish_dir.exists():
|
||||
return
|
||||
@ -135,13 +207,34 @@ def find_all_published(current_file, publish_type):
|
||||
return published_files
|
||||
|
||||
|
||||
def find_latest_publish(current_file: Path, publish_type=constants.ACTIVE_PUBLISH_KEY):
|
||||
def find_latest_publish(
|
||||
current_file: Path, publish_type=constants.ACTIVE_PUBLISH_KEY
|
||||
) -> Path:
|
||||
"""Returns the path to the latest published file in a given folder
|
||||
|
||||
Args:
|
||||
current_file (Path): Current file, which must be a task file at root of asset directory
|
||||
publish_type (_type_, optional): Publish type, 'publish', 'staged', 'review'. Defaults to 'publish'.
|
||||
|
||||
Returns:
|
||||
Path: Path to latest publish file of a given publish type
|
||||
"""
|
||||
published_files = find_all_published(current_file, publish_type)
|
||||
if published_files:
|
||||
return published_files[-1]
|
||||
|
||||
|
||||
def find_sync_target(current_file: Path):
|
||||
def find_sync_target(current_file: Path) -> Path:
|
||||
"""Returns the latest published file to use as push/pull a.k.a sync target
|
||||
this will either be the latest active publish, or the latest staged asset if
|
||||
any asset is staged
|
||||
|
||||
Args:
|
||||
current_file (Path): Current file, which must be a task file at root of asset directory
|
||||
|
||||
Returns:
|
||||
Path: Path to latest active or staged publish file
|
||||
""" """"""
|
||||
latest_staged = find_latest_publish(
|
||||
current_file, publish_type=constants.STAGED_PUBLISH_KEY
|
||||
)
|
||||
@ -155,7 +248,19 @@ def import_data_from_lib(
|
||||
data_category: str,
|
||||
data_name: str,
|
||||
link: bool = False,
|
||||
):
|
||||
) -> bpy.data:
|
||||
"""Appends/Links data from an external file into the current file.
|
||||
|
||||
Args:
|
||||
libpath (Path): path to .blend file that contains library
|
||||
data_category (str): bpy.types, like object or collection
|
||||
data_name (str): name of datablock to link/append
|
||||
link (bool, optional): Set to link library otherwise append. Defaults to False.
|
||||
|
||||
Returns:
|
||||
bpy.data: returns whichever data_category/type that was linked/appended
|
||||
"""
|
||||
|
||||
noun = "Appended"
|
||||
if link:
|
||||
noun = "Linked"
|
||||
@ -191,7 +296,8 @@ def import_data_from_lib(
|
||||
return eval(f"bpy.data.{data_category}['{data_name}']")
|
||||
|
||||
|
||||
def get_task_layer_name_from_file():
|
||||
def get_task_layer_name_from_file() -> str:
|
||||
"""Returns task layer name found task's file name"""
|
||||
file_name = bpy.path.basename(bpy.context.blend_data.filepath)
|
||||
task_layer_name = file_name.split(".")[-2]
|
||||
if task_layer_name in constants.TASK_LAYER_KEYS:
|
||||
@ -199,6 +305,7 @@ def get_task_layer_name_from_file():
|
||||
|
||||
|
||||
def get_dict_tuple_item(dict: dict, key: str) -> tuple:
|
||||
"""For a dict of tuples, returns a dict item based on it's key"""
|
||||
for item in dict:
|
||||
if item[0] == key:
|
||||
return item
|
||||
|
Loading…
Reference in New Issue
Block a user