Blender Kitsu: Fix Gazu Module out of sync #119

Merged
Nick Alberelli merged 11 commits from :fix/restore-gazu-out-of-sync into main 2023-07-13 19:39:30 +02:00
45 changed files with 302 additions and 6623 deletions

1
.gitattributes vendored
View File

@ -3,3 +3,4 @@
*.mp4 filter=lfs diff=lfs merge=lfs -text *.mp4 filter=lfs diff=lfs merge=lfs -text
*.png filter=lfs diff=lfs merge=lfs -text *.png filter=lfs diff=lfs merge=lfs -text
*.jpg filter=lfs diff=lfs merge=lfs -text *.jpg filter=lfs diff=lfs merge=lfs -text
*.whl filter=lfs diff=lfs merge=lfs -text

View File

@ -263,6 +263,14 @@ blender-kitsu has different checks that are performed during file load or during
![image info](/media/addons/blender_kitsu/error_animation.jpg) ![image info](/media/addons/blender_kitsu/error_animation.jpg)
## Development
### Update Dependencies
To update the dependencies of `Blender_Kitsu` please follow these steps.
1. `cd scripts-blender/addons/blender_kitsu/wheels` To enter the directory of dependant modules
2. `rm -r *.whl` To remove any existing packages (or manually remove .whl files if you are on windows)
3. `pip download gazu` to get the latest gazu and it's dependencies as wheels
4. `rm certifi* charset_normalizer* idna* requests* urllib3* websocket_client*` to remove the modules that are already included in blender
## Troubleshoot ## Troubleshoot
blender-kitsu makes good use of logging and status reports. Most of the operators report information in the blender info bar. More detailed logs can be found in the blender system console. If you feel like anything went wrong, consider opening a console and check the logs. blender-kitsu makes good use of logging and status reports. Most of the operators report information in the blender info bar. More detailed logs can be found in the blender system console. If you feel like anything went wrong, consider opening a console and check the logs.

View File

@ -18,7 +18,10 @@
# #
# (c) 2021, Blender Foundation - Paul Golter # (c) 2021, Blender Foundation - Paul Golter
import bpy from . import dependencies
dependencies.preload_modules()
from blender_kitsu import ( from blender_kitsu import (
shot_builder, shot_builder,
lookdev, lookdev,
@ -41,7 +44,6 @@ from blender_kitsu import (
) )
from blender_kitsu.logger import LoggerFactory, LoggerLevelManager from blender_kitsu.logger import LoggerFactory, LoggerLevelManager
logger = LoggerFactory.getLogger(__name__) logger = LoggerFactory.getLogger(__name__)
@ -96,7 +98,6 @@ def register():
playblast.register() playblast.register()
anim.register() anim.register()
shot_builder.register() shot_builder.register()
LoggerLevelManager.configure_levels() LoggerLevelManager.configure_levels()
logger.info("Registered blender-kitsu") logger.info("Registered blender-kitsu")

View File

@ -22,8 +22,8 @@ from typing import Dict, List, Set, Optional, Tuple, Any
import bpy import bpy
import threading import threading
import gazu
from blender_kitsu import cache, prefs, gazu from blender_kitsu import cache, prefs
# TODO: restructure this to not access ops_playblast_data. # TODO: restructure this to not access ops_playblast_data.
from blender_kitsu.playblast import opsdata as ops_playblast_data from blender_kitsu.playblast import opsdata as ops_playblast_data
@ -34,6 +34,7 @@ logger = LoggerFactory.getLogger()
active_thread = False active_thread = False
class KITSU_OT_session_start(bpy.types.Operator): class KITSU_OT_session_start(bpy.types.Operator):
""" """
Starts the Session, which is stored in blender_kitsu addon preferences. Starts the Session, which is stored in blender_kitsu addon preferences.
@ -135,6 +136,7 @@ def auto_login_on_file_open():
if not session.is_auth(): if not session.is_auth():
bpy.ops.kitsu.session_start() bpy.ops.kitsu.session_start()
# ---------REGISTER ----------. # ---------REGISTER ----------.
classes = [ classes = [

View File

@ -38,7 +38,7 @@ from blender_kitsu.types import (
User, User,
) )
from blender_kitsu.logger import LoggerFactory from blender_kitsu.logger import LoggerFactory
from blender_kitsu.gazu.exception import RouteNotFoundException import gazu
logger = LoggerFactory.getLogger() logger = LoggerFactory.getLogger()
@ -411,7 +411,6 @@ def get_user_all_tasks() -> List[Task]:
def _init_cache_entity( def _init_cache_entity(
entity_id: str, entity_type: Any, cache_variable_name: Any, cache_name: str entity_id: str, entity_type: Any, cache_variable_name: Any, cache_name: str
) -> None: ) -> None:
if entity_id: if entity_id:
try: try:
globals()[cache_variable_name] = entity_type.by_id(entity_id) globals()[cache_variable_name] = entity_type.by_id(entity_id)
@ -420,7 +419,7 @@ def _init_cache_entity(
cache_name, cache_name,
globals()[cache_variable_name].name, globals()[cache_variable_name].name,
) )
except RouteNotFoundException: except gazu.exception.RouteNotFoundException:
logger.error( logger.error(
"Failed to initialize active %s cache. ID not found on server: %s", "Failed to initialize active %s cache. ID not found on server: %s",
cache_name, cache_name,

View File

@ -0,0 +1,16 @@
# SPDX-License-Identifier: GPL-3.0-or-later
def preload_modules() -> None:
"""Pre-load the datetime module from a wheel so that the API can find it."""
import sys
if "gazu" in sys.modules:
return
from . import wheels
wheels.load_wheel_global("bidict", "bidict")
wheels.load_wheel_global("engineio", "python_engineio")
wheels.load_wheel_global("socketio", "python_socketio")
wheels.load_wheel_global("gazu", "gazu")

View File

@ -1,165 +0,0 @@
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.

View File

@ -1,64 +0,0 @@
from . import client as raw
from . import cache
from . import helpers
from . import asset
from . import casting
from . import context
from . import entity
from . import edit
from . import files
from . import project
from . import person
from . import shot
from . import sync
from . import task
from . import user
from . import playlist
from .exception import AuthFailedException, ParameterException
from .__version__ import __version__
def get_host(client=raw.default_client):
return raw.get_host(client=client)
def set_host(url, client=raw.default_client):
raw.set_host(url, client=client)
def log_in(email, password, client=raw.default_client):
tokens = {}
try:
tokens = raw.post(
"auth/login", {"email": email, "password": password}, client=client
)
except ParameterException:
pass
if not tokens or (
"login" in tokens and tokens.get("login", False) == False
):
raise AuthFailedException
else:
raw.set_tokens(tokens, client=client)
return tokens
def log_out(client=raw.default_client):
tokens = {}
try:
raw.get("auth/logout", client=client)
except ParameterException:
pass
raw.set_tokens(tokens, client=client)
return tokens
def get_event_host(client=raw.default_client):
return raw.get_event_host(client=client)
def set_event_host(url, client=raw.default_client):
raw.set_event_host(url, client=client)

View File

@ -1 +0,0 @@
__version__ = "0.8.30"

View File

@ -1,530 +0,0 @@
from .helpers import normalize_model_parameter
from . import client as raw
from . import project as gazu_project
from .sorting import sort_by_name
from .cache import cache
from .shot import get_episode
default = raw.default_client
@cache
def all_assets_for_open_projects(client=default):
"""
Returns:
list: Assets stored in the database for open projects.
"""
all_assets = []
for project in gazu_project.all_open_projects(client=default):
all_assets.extend(all_assets_for_project(project, client))
return sort_by_name(all_assets)
@cache
def all_assets_for_project(project, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
Returns:
list: Assets stored in the database for given project.
"""
project = normalize_model_parameter(project)
if project is None:
return sort_by_name(raw.fetch_all("assets/all", client=client))
else:
path = "projects/%s/assets" % project["id"]
return sort_by_name(raw.fetch_all(path, client=client))
@cache
def all_assets_for_episode(episode, client=default):
"""
Args:
episode (str / dict): The episode dict or the episode ID.
Returns:
list: Assets stored in the database for given episode.
"""
episode = normalize_model_parameter(episode)
return sort_by_name(
raw.fetch_all("assets", {"source_id": episode["id"]}, client=client)
)
@cache
def all_assets_for_shot(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
list: Assets stored in the database for given shot.
"""
shot = normalize_model_parameter(shot)
path = "shots/%s/assets" % shot["id"]
return sort_by_name(raw.fetch_all(path, client=client))
@cache
def all_assets_for_project_and_type(project, asset_type, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
asset_type (str / dict): The asset type dict or the asset type ID.
Returns:
list: Assets stored in the database for given project and asset type.
"""
project = normalize_model_parameter(project)
asset_type = normalize_model_parameter(asset_type)
project_id = project["id"]
asset_type_id = asset_type["id"]
path = "projects/{project_id}/asset-types/{asset_type_id}/assets"
path = path.format(project_id=project_id, asset_type_id=asset_type_id)
assets = raw.fetch_all(path, client=client)
return sort_by_name(assets)
@cache
def get_asset_by_name(project, name, asset_type=None, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
name (str): The asset name
asset_type (str / dict): Asset type dict or ID (optional).
Returns:
dict: Asset matching given name for given project and asset type.
"""
project = normalize_model_parameter(project)
path = "assets/all"
if asset_type is None:
params = {"project_id": project["id"], "name": name}
else:
asset_type = normalize_model_parameter(asset_type)
params = {
"project_id": project["id"],
"name": name,
"entity_type_id": asset_type["id"],
}
return raw.fetch_first(path, params, client=client)
@cache
def get_asset(asset_id, client=default):
"""
Args:
asset_id (str): Id of claimed asset.
Returns:
dict: Asset matching given ID.
"""
return raw.fetch_one("assets", asset_id, client=client)
@cache
def get_asset_url(asset, client=default):
"""
Args:
asset (str / dict): The asset dict or the asset ID.
Returns:
url (str): Web url associated to the given asset
"""
asset = normalize_model_parameter(asset)
asset = get_asset(asset["id"])
project = gazu_project.get_project(asset["project_id"])
episode_id = "main"
path = "{host}/productions/{project_id}/"
if project["production_type"] != "tvshow":
path += "assets/{asset_id}/"
else:
path += "episodes/{episode_id}/assets/{asset_id}/"
if len(asset["episode_id"]) > 0:
episode_id = asset["episode_id"]
return path.format(
host=raw.get_api_url_from_host(),
project_id=asset["project_id"],
asset_id=asset["id"],
episode_id=episode_id,
client=client,
)
def new_asset(
project,
asset_type,
name,
description="",
extra_data={},
episode=None,
client=default,
):
"""
Create a new asset in the database for given project and asset type.
Args:
project (str / dict): The project dict or the project ID.
asset_type (str / dict): The asset type dict or the asset type ID.
name (str): Asset name.
description (str): Additional information.
extra_data (dict): Free field to add any kind of metadata.
episode (str / dict): The episode this asset is linked to.
Returns:
dict: Created asset.
"""
project = normalize_model_parameter(project)
asset_type = normalize_model_parameter(asset_type)
episode = normalize_model_parameter(episode)
data = {"name": name, "description": description, "data": extra_data}
if episode is not None:
data["episode_id"] = episode["id"]
asset = get_asset_by_name(project, name, asset_type, client=client)
if asset is None:
asset = raw.post(
"data/projects/%s/asset-types/%s/assets/new"
% (project["id"], asset_type["id"]),
data,
client=client,
)
return asset
def update_asset(asset, client=default):
"""
Save given asset data into the API. It assumes that the asset already
exists.
Args:
asset (dict): Asset to save.
"""
if "episode_id" in asset:
asset["source_id"] = asset["episode_id"]
return raw.put("data/entities/%s" % asset["id"], asset, client=client)
def update_asset_data(asset, data={}, client=default):
"""
Update the metadata for the provided asset. Keys that are not provided are
not changed.
Args:
asset (dict / ID): The asset dict or ID to save in database.
data (dict): Free field to set metadata of any kind.
Returns:
dict: Updated asset.
"""
asset = normalize_model_parameter(asset)
current_asset = get_asset(asset["id"], client=client)
updated_asset = {"id": current_asset["id"], "data": current_asset["data"]}
updated_asset["data"].update(data)
return update_asset(updated_asset, client=client)
def remove_asset(asset, force=False, client=default):
"""
Remove given asset from database.
Args:
asset (dict): Asset to remove.
"""
asset = normalize_model_parameter(asset)
path = "data/assets/%s" % asset["id"]
params = {}
if force:
params = {"force": "true"}
return raw.delete(path, params, client=client)
@cache
def all_asset_types(client=default):
"""
Returns:
list: Asset types stored in the database.
"""
return sort_by_name(raw.fetch_all("asset-types", client=client))
@cache
def all_asset_types_for_project(project, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
Returns:
list: Asset types from assets listed in given project.
"""
path = "projects/%s/asset-types" % project["id"]
return sort_by_name(raw.fetch_all(path, client=client))
@cache
def all_asset_types_for_shot(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
list: Asset types from assets casted in given shot.
"""
path = "shots/%s/asset-types" % shot["id"]
return sort_by_name(raw.fetch_all(path, client=client))
@cache
def get_asset_type(asset_type_id, client=default):
"""
Args:
asset_type_id (str/): Id of claimed asset type.
Returns:
dict: Asset Type matching given ID.
"""
asset_type_id = normalize_model_parameter(asset_type_id)["id"]
return raw.fetch_one("asset-types", asset_type_id, client=client)
@cache
def get_asset_type_by_name(name, client=default):
"""
Args:
name (str): name of asset type.
Returns:
dict: Asset Type matching given name.
"""
return raw.fetch_first("entity-types", {"name": name}, client=client)
def new_asset_type(name, client=default):
"""
Create a new asset type in the database.
Args:
name (str): The name of asset type to create.
Returns:
(dict): Created asset type.
"""
data = {"name": name}
asset_type = raw.fetch_first("entity-types", {"name": name}, client=client)
if asset_type is None:
asset_type = raw.create("entity-types", data, client=client)
return asset_type
def update_asset_type(asset_type, client=default):
"""
Save given asset type data into the API. It assumes that the asset type
already exists.
Args:
asset_type (dict): Asset Type to save.
"""
data = {"name": asset_type["name"]}
path = "data/asset-types/%s" % asset_type["id"]
return raw.put(path, data, client=client)
def remove_asset_type(asset_type, client=default):
"""
Remove given asset type from database.
Args:
asset_type (dict): Asset type to remove.
"""
asset_type = normalize_model_parameter(asset_type)
path = "data/asset-types/%s" % asset_type["id"]
return raw.delete(path, client=client)
@cache
def get_asset_instance(asset_instance_id, client=default):
"""
Args:
asset_instance_id (str): Id of claimed asset instance.
Returns:
dict: Asset Instance matching given ID.
"""
return raw.fetch_one("asset-instances", asset_instance_id, client=client)
@cache
def all_shot_asset_instances_for_asset(asset, client=default):
"""
Args:
asset (str / dict): The asset dict or the asset ID.
Returns:
list: Asset instances existing for a given asset.
"""
asset = normalize_model_parameter(asset)
path = "assets/%s/shot-asset-instances" % asset["id"]
return raw.fetch_all(path, client=client)
def enable_asset_instance(asset_instance, client=default):
"""
Set active flag of given asset instance to True.
Args:
asset_instance (str / dict): The asset instance dict or ID.
"""
asset_instance = normalize_model_parameter(asset_instance)
data = {"active": True}
path = "asset-instances/%s" % asset_instance["id"]
return raw.put(path, data, client=client)
def disable_asset_instance(asset_instance, client=default):
"""
Set active flag of given asset instance to False.
Args:
asset_instance (str / dict): The asset instance dict or ID.
"""
asset_instance = normalize_model_parameter(asset_instance)
data = {"active": False}
path = "asset-instances/%s" % asset_instance["id"]
return raw.put(path, data, client=client)
@cache
def all_scene_asset_instances_for_asset(asset, client=default):
"""
Args:
asset (str / dict): The asset dict or the asset ID.
Returns:
list: Scene asset instances existing for a given asset.
"""
asset = normalize_model_parameter(asset)
path = "assets/%s/scene-asset-instances" % asset["id"]
return raw.fetch_all(path, client=client)
@cache
def all_asset_instances_for_shot(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
list: Asset instances existing for a given shot.
"""
path = "shots/%s/asset-instances" % shot["id"]
return raw.fetch_all(path, client=client)
@cache
def all_asset_instances_for_asset(asset, client=default):
"""
Args:
asset (str / dict): The asset dict or the asset ID.
Returns:
list: Asset instances existing for a given asset.
"""
asset = normalize_model_parameter(asset)
path = "assets/%s/asset-asset-instances" % asset["id"]
return raw.fetch_all(path, client=client)
def new_asset_asset_instance(
asset, asset_to_instantiate, description="", client=default
):
"""
Creates a new asset instance for given asset. The instance number is
automatically generated (increment highest number).
Args:
asset (str / dict): The asset dict or the shot ID.
asset_instance (str / dict): The asset instance dict or ID.
description (str): Additional information (optional)
Returns:
(dict): Created asset instance.
"""
asset = normalize_model_parameter(asset)
asset_to_instantiate = normalize_model_parameter(asset_to_instantiate)
data = {
"asset_to_instantiate_id": asset_to_instantiate["id"],
"description": description,
}
return raw.post(
"data/assets/%s/asset-asset-instances" % asset["id"],
data,
client=client,
)
def import_assets_with_csv(project, csv_file_path, client=default):
project = normalize_model_parameter(project)
return raw.upload(
"import/csv/projects/%s/assets" % project["id"],
csv_file_path,
client=client,
)
def export_assets_with_csv(
project, csv_file_path, episode=None, assigned_to=None, client=default
):
project = normalize_model_parameter(project)
episode = normalize_model_parameter(episode)
assigned_to = normalize_model_parameter(assigned_to)
params = {}
if episode:
params["episode_id"] = episode["id"]
if assigned_to:
params["assigned_to"] = assigned_to["id"]
return raw.download(
"export/csv/projects/%s/assets.csv" % project["id"],
csv_file_path,
params=params,
client=client,
)
@cache
def get_episode_from_asset(asset, client=default):
"""
Args:
asset (dict): The asset dict.
Returns:
dict: Episode which is parent of given asset.
"""
if asset["parent_id"] is None:
return None
else:
return get_episode(asset["parent_id"], client=client)
@cache
def get_asset_type_from_asset(asset, client=default):
"""
Args:
asset (dict): The asset dict.
Returns:
dict: Asset type which is the type of given asset.
"""
return get_asset_type(asset["entity_type_id"], client=client)

View File

@ -1,215 +0,0 @@
import copy
import datetime
import json
from functools import wraps
cache_settings = {"enabled": False}
cached_functions = []
def enable():
"""
Enable caching on all decorated functions.
"""
cache_settings["enabled"] = True
return cache_settings["enabled"]
def disable():
"""
Disable caching on all decorated functions.
"""
cache_settings["enabled"] = False
return cache_settings["enabled"]
def clear_all():
"""
Clear all cached functions.
"""
for function in cached_functions:
function.clear_cache()
def remove_oldest_entry(memo, maxsize):
"""
Remove the oldest cache entry if there is more value stored than allowed.
Params:
memo (dict): Cache used for function memoization.
maxsize (int): Maximum number of entries for the cache.
Returns:
Oldest entry for given cache.
"""
oldest_entry = None
if maxsize > 0 and len(memo) > maxsize:
oldest_entry_key = list(memo.keys())[0]
for entry_key in memo.keys():
oldest_date = memo[oldest_entry_key]["date_accessed"]
if memo[entry_key]["date_accessed"] < oldest_date:
oldest_entry_key = entry_key
memo.pop(oldest_entry_key)
return oldest_entry
def get_cache_key(args, kwargs):
"""
Serialize arguments to get a cache key. It will be used to store function
results.
Returns:
str: generated key
"""
kwargscopy = kwargs.copy()
if "client" in kwargscopy:
kwargscopy["client"] = kwargscopy["client"].host
if len(args) == 0 and len(kwargscopy) == 0:
return ""
elif len(args) == 0:
return json.dumps(kwargscopy)
elif len(kwargscopy.keys()) == 0:
return json.dumps(args)
else:
return json.dumps([args, kwargscopy])
def insert_value(function, cache_store, args, kwargs):
"""
Serialize function call arguments and store function result in given cache
store.
Args:
function (func): The function to cache value for.
cache_store (dict): The cache which will contain the value to cache.
args, kwargs: The arguments for which a cache must be set.
Returns:
The cached value.
"""
returned_value = function(*args, **kwargs)
key = get_cache_key(args, kwargs)
cache_store[key] = {
"date_accessed": datetime.datetime.now(),
"value": returned_value,
}
return get_value(cache_store, key)
def get_value(cache_store, key):
"""
It generates a deep copy of the requested value. It's needed because if a
pointer is returned, the value can be changed. Which leads to a modified
cache and unexpected results.
Returns:
Value matching given key inside given cache store
"""
value = cache_store[key]["value"]
return copy.deepcopy(value)
def is_cache_enabled(state):
"""
Args:
state: The state describing the cache state.
Returns:
True if cache is enabled for given state.
"""
return cache_settings["enabled"] and state["enabled"]
def is_cache_expired(memo, state, key):
"""
Check if cache is expired (outdated) for given wrapper state and cache key.
Args:
memo (dict): The function cache
state (dict): The parameters of the cache (enabled, expire, maxsize)
key: The key to check
Returns:
True if cache value is expired.
"""
date = memo[key]["date_accessed"]
expire = state["expire"]
date_to_check = date + datetime.timedelta(seconds=expire)
return expire > 0 and date_to_check < datetime.datetime.now()
def cache(function, maxsize=300, expire=120):
"""
Decorator that generate cache wrapper and that adds cache feature to
target function. A max cache size and and expiration time (in seconds) can
be set too.
Args:
function (func): Decorated function:
maxsize: Number of value stored in cache (300 by default).
expire: Time to live in seconds of stored value (disabled by default)
"""
cache_store = {}
state = {"enabled": True, "expire": expire, "maxsize": maxsize}
statistics = {"hits": 0, "misses": 0, "expired_hits": 0}
def clear_cache():
cache_store.clear()
def get_cache_infos():
size = {"current_size": len(cache_store)}
infos = {}
for d in [state, statistics, size]:
infos.update(d)
return infos
def set_expire(new_expire):
state["expire"] = new_expire
def set_max_size(maxsize):
state["maxsize"] = maxsize
def enable_cache():
state["enabled"] = True
def disable_cache():
state["enabled"] = False
@wraps(function)
def wrapper(*args, **kwargs):
if is_cache_enabled(state):
key = get_cache_key(args, kwargs)
if key in cache_store:
if is_cache_expired(cache_store, state, key):
statistics["expired_hits"] += 1
return insert_value(function, cache_store, args, kwargs)
else:
statistics["hits"] += 1
return get_value(cache_store, key)
else:
statistics["misses"] += 1
returned_value = insert_value(
function, cache_store, args, kwargs
)
remove_oldest_entry(cache_store, state["maxsize"])
return returned_value
else:
return function(*args, **kwargs)
wrapper.set_cache_expire = set_expire
wrapper.set_cache_max_size = set_max_size
wrapper.clear_cache = clear_cache
wrapper.enable_cache = enable_cache
wrapper.disable_cache = disable_cache
wrapper.get_cache_infos = get_cache_infos
cached_functions.append(wrapper)
return wrapper

View File

@ -1,153 +0,0 @@
from . import client as raw
from .helpers import normalize_model_parameter
default = raw.default_client
def update_shot_casting(project, shot, casting, client=default):
"""
Change casting of given shot with given casting (list of asset ids displayed
into the shot).
Args:
shot (str / dict): The shot dict or the shot ID.
casting (dict): The casting description.
Ex: `casting = [{"asset_id": "asset-1", "nb_occurences": 3}]`
Returns:
dict: Related shot.
"""
shot = normalize_model_parameter(shot)
project = normalize_model_parameter(project)
path = "data/projects/%s/entities/%s/casting" % (project["id"], shot["id"])
return raw.put(path, casting, client=client)
def update_asset_casting(project, asset, casting, client=default):
"""
Change casting of given asset with given casting (list of asset ids
displayed into the asset).
Args:
asset (str / dict): The asset dict or the asset ID.
casting (dict): The casting description.
Ex: `casting = [{"asset_id": "asset-1", "nb_occurences": 3}]`
Returns:
dict: Related asset.
"""
asset = normalize_model_parameter(asset)
project = normalize_model_parameter(project)
path = "data/projects/%s/entities/%s/casting" % (
project["id"],
asset["id"],
)
return raw.put(path, casting, client=client)
def get_asset_type_casting(project, asset_type, client=default):
"""
Return casting for given asset_type.
`casting = {
"asset-id": [{"asset_id": "asset-1", "nb_occurences": 3}],
...
}
`
Args:
project (str / dict): The project dict or the project ID.
asset_type (str / dict): The asset_type dict or the asset_type ID.
Returns:
dict: Casting of the given asset_type.
"""
project = normalize_model_parameter(project)
asset_type = normalize_model_parameter(asset_type)
path = "/data/projects/%s/asset-types/%s/casting" % (
project["id"],
asset_type["id"],
)
return raw.get(path, client=client)
def get_sequence_casting(sequence, client=default):
"""
Return casting for given sequence.
`casting = {
"shot-id": [{"asset_id": "asset-1", "nb_occurences": 3}]},
...
}
`
Args:
sequence (dict): The sequence dict
Returns:
dict: Casting of the given sequence.
"""
path = "/data/projects/%s/sequences/%s/casting" % (
sequence["project_id"],
sequence["id"],
)
return raw.get(path, client=client)
def get_shot_casting(shot, client=default):
"""
Return casting for given shot.
`[{"asset_id": "asset-1", "nb_occurences": 3}]}`
Args:
shot (dict): The shot dict
Returns:
dict: Casting of the given shot.
"""
path = "/data/projects/%s/entities/%s/casting" % (
shot["project_id"],
shot["id"],
)
return raw.get(path, client=client)
def get_asset_casting(asset, client=default):
"""
Return casting for given asset.
`[{"asset_id": "asset-1", "nb_occurences": 3}]}`
Args:
asset (dict): The asset dict
Returns:
dict: Casting for given asset.
"""
path = "/data/projects/%s/entities/%s/casting" % (
asset["project_id"],
asset["id"],
)
return raw.get(path, client=client)
def get_asset_cast_in(asset, client=default):
"""
Return entity list where given asset is casted.
Args:
asset (dict): The asset dict
Returns:
dict: Entity list where given asset is casted.
"""
asset = normalize_model_parameter(asset)
path = "/data/assets/%s/cast-in" % asset["id"]
return raw.get(path, client=client)
def all_entity_links_for_project(project, client=default):
"""
Args:
project (dict): The project
Returns:
dict: Entity links for given project.
"""
project = normalize_model_parameter(project)
path = "/data/projects/%s/entity-links" % project["id"]
return raw.get(path, client=client)

View File

@ -1,488 +0,0 @@
import sys
import functools
import json
import shutil
import urllib
from .encoder import CustomJSONEncoder
if sys.version_info[0] == 3:
from json import JSONDecodeError
else:
JSONDecodeError = ValueError
from .__version__ import __version__
from .exception import (
TooBigFileException,
NotAuthenticatedException,
NotAllowedException,
MethodNotAllowedException,
ParameterException,
RouteNotFoundException,
ServerErrorException,
UploadFailedException,
)
class KitsuClient(object):
def __init__(self, host, ssl_verify=True, cert=None):
self.tokens = {"access_token": "", "refresh_token": ""}
self.session = requests.Session()
self.session.verify = ssl_verify
self.session.cert = cert
self.host = host
self.event_host = host
def create_client(host, ssl_verify=True, cert=None):
return KitsuClient(host, ssl_verify, cert=None)
default_client = None
try:
import requests
# Little hack to allow json encoder to manage dates.
requests.models.complexjson.dumps = functools.partial(
json.dumps, cls=CustomJSONEncoder
)
# Set host to "" otherwise requests.Session() takes a long time during Blender startup
# Whyever that is.
# host = "http://gazu.change.serverhost/api"
host = ""
default_client = create_client(host)
except Exception:
print("Warning, running in setup mode!")
def host_is_up(client=default_client):
"""
Returns:
True if the host is up.
"""
try:
response = client.session.head(client.host)
except Exception:
return False
return response.status_code == 200
def host_is_valid(client=default_client):
"""
Check if the host is valid by simulating a fake login.
Returns:
True if the host is valid.
"""
if not host_is_up(client):
return False
try:
post("auth/login", {"email": "", "password": ""})
except Exception as exc:
return type(exc) == ParameterException
def get_host(client=default_client):
"""
Returns:
Host on which requests are sent.
"""
return client.host
def get_api_url_from_host(client=default_client):
"""
Returns:
Zou url, retrieved from host.
"""
return client.host[:-4]
def set_host(new_host, client=default_client):
"""
Returns:
Set currently configured host on which requests are sent.
"""
client.host = new_host
return client.host
def get_event_host(client=default_client):
"""
Returns:
Host on which listening for events.
"""
return client.event_host or client.host
def set_event_host(new_host, client=default_client):
"""
Returns:
Set currently configured host on which listening for events.
"""
client.event_host = new_host
return client.event_host
def set_tokens(new_tokens, client=default_client):
"""
Store authentication token to reuse them for all requests.
Args:
new_tokens (dict): Tokens to use for authentication.
"""
client.tokens = new_tokens
return client.tokens
def make_auth_header(client=default_client):
"""
Returns:
Headers required to authenticate.
"""
headers = {"User-Agent": "CGWire Gazu %s" % __version__}
if "access_token" in client.tokens:
headers["Authorization"] = "Bearer %s" % client.tokens["access_token"]
return headers
def url_path_join(*items):
"""
Make it easier to build url path by joining every arguments with a '/'
character.
Args:
items (list): Path elements
"""
return "/".join([item.lstrip("/").rstrip("/") for item in items])
def get_full_url(path, client=default_client):
"""
Args:
path (str): The path to integrate to host url.
Returns:
The result of joining configured host url with given path.
"""
return url_path_join(get_host(client), path)
def build_path_with_params(path, params):
"""
Add params to a path using urllib encoding
Args:
path (str): The url base path
params (dict): The parameters to add as a dict
Returns:
str: the builded path
"""
if not params:
return path
if hasattr(urllib, "urlencode"):
path = "%s?%s" % (path, urllib.urlencode(params))
else:
path = "%s?%s" % (path, urllib.parse.urlencode(params))
return path
def get(path, json_response=True, params=None, client=default_client):
"""
Run a get request toward given path for configured host.
Returns:
The request result.
"""
path = build_path_with_params(path, params)
response = client.session.get(
get_full_url(path, client=client),
headers=make_auth_header(client=client),
)
check_status(response, path)
if json_response:
return response.json()
else:
return response.text
def post(path, data, client=default_client):
"""
Run a post request toward given path for configured host.
Returns:
The request result.
"""
response = client.session.post(
get_full_url(path, client),
json=data,
headers=make_auth_header(client=client),
)
check_status(response, path)
try:
result = response.json()
except JSONDecodeError:
print(response.text)
raise
return result
def put(path, data, client=default_client):
"""
Run a put request toward given path for configured host.
Returns:
The request result.
"""
response = client.session.put(
get_full_url(path, client),
json=data,
headers=make_auth_header(client=client),
)
check_status(response, path)
return response.json()
def delete(path, params=None, client=default_client):
"""
Run a delete request toward given path for configured host.
Returns:
The request result.
"""
path = build_path_with_params(path, params)
response = client.session.delete(
get_full_url(path, client), headers=make_auth_header(client=client)
)
check_status(response, path)
return response.text
def check_status(request, path):
"""
Raise an exception related to status code, if the status code does not
match a success code. Print error message when it's relevant.
Args:
request (Request): The request to validate.
Returns:
int: Status code
Raises:
ParameterException: when 400 response occurs
NotAuthenticatedException: when 401 response occurs
RouteNotFoundException: when 404 response occurs
NotAllowedException: when 403 response occurs
MethodNotAllowedException: when 405 response occurs
TooBigFileException: when 413 response occurs
ServerErrorException: when 500 response occurs
"""
status_code = request.status_code
if status_code == 404:
raise RouteNotFoundException(path)
elif status_code == 403:
raise NotAllowedException(path)
elif status_code == 400:
text = request.json().get("message", "No additional information")
raise ParameterException(path, text)
elif status_code == 405:
raise MethodNotAllowedException(path)
elif status_code == 413:
raise TooBigFileException(
"%s: You send a too big file. "
"Change your proxy configuration to allow bigger files." % path
)
elif status_code in [401, 422]:
raise NotAuthenticatedException(path)
elif status_code in [500, 502]:
try:
stacktrace = request.json().get(
"stacktrace", "No stacktrace sent by the server"
)
message = request.json().get(
"message", "No message sent by the server"
)
print("A server error occured!\n")
print("Server stacktrace:\n%s" % stacktrace)
print("Error message:\n%s\n" % message)
except Exception:
print(request.text)
raise ServerErrorException(path)
return status_code
def fetch_all(path, params=None, client=default_client):
"""
Args:
path (str): The path for which we want to retrieve all entries.
Returns:
list: All entries stored in database for a given model. You can add a
filter to the model name like this: "tasks?project_id=project-id"
"""
return get(url_path_join("data", path), params=params, client=client)
def fetch_first(path, params=None, client=default_client):
"""
Args:
path (str): The path for which we want to retrieve the first entry.
Returns:
dict: The first entry for which a model is required.
"""
entries = get(url_path_join("data", path), params=params, client=client)
if len(entries) > 0:
return entries[0]
else:
return None
def fetch_one(model_name, id, client=default_client):
"""
Function dedicated at targeting routes that returns a single model
instance.
Args:
model_name (str): Model type name.
id (str): Model instance ID.
Returns:
dict: The model instance matching id and model name.
"""
return get(url_path_join("data", model_name, id), client=client)
def create(model_name, data, client=default_client):
"""
Create an entry for given model and data.
Args:
model (str): The model type involved
data (str): The data to use for creation
Returns:
dict: Created entry
"""
return post(url_path_join("data", model_name), data, client=client)
def update(model_name, model_id, data, client=default_client):
"""
Update an entry for given model, id and data.
Args:
model (str): The model type involved
id (str): The target model id
data (dict): The data to update
Returns:
dict: Updated entry
"""
return put(
url_path_join("data", model_name, model_id), data, client=client
)
def upload(path, file_path, data={}, extra_files=[], client=default_client):
"""
Upload file located at *file_path* to given url *path*.
Args:
path (str): The url path to upload file.
file_path (str): The file location on the hard drive.
Returns:
Response: Request response object.
"""
url = get_full_url(path, client)
files = _build_file_dict(file_path, extra_files)
response = client.session.post(
url, data=data, headers=make_auth_header(client=client), files=files
)
check_status(response, path)
try:
result = response.json()
except JSONDecodeError:
print(response.text)
raise
if "message" in result:
raise UploadFailedException(result["message"])
return result
def _build_file_dict(file_path, extra_files):
files = {"file": open(file_path, "rb")}
i = 2
for file_path in extra_files:
files["file-%s" % i] = open(file_path, "rb")
i += 1
return files
def download(path, file_path, params=None, client=default_client):
"""
Download file located at *file_path* to given url *path*.
Args:
path (str): The url path to download file from.
file_path (str): The location to store the file on the hard drive.
Returns:
Response: Request response object.
"""
path = build_path_with_params(path, params)
with client.session.get(
get_full_url(path, client),
headers=make_auth_header(client=client),
stream=True,
) as response:
with open(file_path, "wb") as target_file:
shutil.copyfileobj(response.raw, target_file)
return response
def get_file_data_from_url(url, full=False, client=default_client):
"""
Return data found at given url.
"""
if not full:
url = get_full_url(url)
response = requests.get(
url,
stream=True,
headers=make_auth_header(client=client),
)
check_status(response, url)
return response.content
def import_data(model_name, data, client=default_client):
"""
Args:
model_name (str): The data model to import
data (dict): The data to import
"""
return post("/import/kitsu/%s" % model_name, data, client=client)
def get_api_version(client=default_client):
"""
Returns:
str: Current version of the API.
"""
return get("", client=client)["version"]
def get_current_user(client=default_client):
"""
Returns:
dict: User database information for user linked to auth tokens.
"""
return get("auth/authenticated", client=client)["user"]

View File

@ -1,151 +0,0 @@
from . import user as gazu_user
from . import project as gazu_project
from . import asset as gazu_asset
from . import task as gazu_task
from . import shot as gazu_shot
from . import scene as gazu_scene
def all_open_projects(user_context=False):
"""
Return the list of projects for which the user has a task.
"""
if user_context:
return gazu_user.all_open_projects()
else:
return gazu_project.all_open_projects()
def all_assets_for_project(project, user_context=False):
"""
Return the list of assets for which the user has a task.
"""
if user_context:
return gazu_user.all_assets_for_project(project)
else:
return gazu_asset.all_assets_for_project(project)
def all_asset_types_for_project(project, user_context=False):
"""
Return the list of asset types for which the user has a task.
"""
if user_context:
return gazu_user.all_asset_types_for_project(project)
else:
return gazu_asset.all_asset_types_for_project(project)
def all_assets_for_asset_type_and_project(
project, asset_type, user_context=False
):
"""
Return the list of assets for given project and asset_type and for which
the user has a task.
"""
if user_context:
return gazu_user.all_assets_for_asset_type_and_project(
project, asset_type
)
else:
return gazu_asset.all_assets_for_project_and_type(project, asset_type)
def all_task_types_for_asset(asset, user_context=False):
"""
Return the list of tasks for given asset and current user.
"""
if user_context:
return gazu_user.all_task_types_for_asset(asset)
else:
return gazu_task.all_task_types_for_asset(asset)
def all_task_types_for_shot(shot, user_context=False):
"""
Return the list of tasks for given shot and current user.
"""
if user_context:
return gazu_user.all_task_types_for_shot(shot)
else:
return gazu_task.all_task_types_for_shot(shot)
def all_task_types_for_scene(scene, user_context=False):
"""
Return the list of tasks for given scene and current user.
"""
if user_context:
return gazu_user.all_task_types_for_scene(scene)
else:
return gazu_task.all_task_types_for_scene(scene)
def all_task_types_for_sequence(sequence, user_context=False):
"""
Return the list of tasks for given sequence and current user.
"""
if user_context:
return gazu_user.all_task_types_for_sequence(sequence)
else:
return gazu_task.all_task_types_for_sequence(sequence)
def all_sequences_for_project(project, user_context=False):
"""
Return the list of sequences for given project and current user.
"""
if user_context:
return gazu_user.all_sequences_for_project(project)
else:
return gazu_shot.all_sequences_for_project(project)
def all_scenes_for_project(project, user_context=False):
"""
Return the list of scenes for given project and current user.
"""
if user_context:
return gazu_user.all_scenes_for_project(project)
else:
return gazu_scene.all_scenes(project)
def all_shots_for_sequence(sequence, user_context=False):
"""
Return the list of shots for given sequence and current user.
"""
if user_context:
return gazu_user.all_shots_for_sequence(sequence)
else:
return gazu_shot.all_shots_for_sequence(sequence)
def all_scenes_for_sequence(sequence, user_context=False):
"""
Return the list of scenes for given sequence and current user.
"""
if user_context:
return gazu_user.all_scenes_for_sequence(sequence)
else:
return gazu_scene.all_scenes_for_sequence(sequence)
def all_sequences_for_episode(episode, user_context=False):
"""
Return the list of shots for given sequence and current user.
"""
if user_context:
return gazu_user.all_sequences_for_episode(episode)
else:
return gazu_shot.all_sequences_for_episode(episode)
def all_episodes_for_project(project, user_context=False):
"""
Return the list of shots for given sequence and current user.
"""
if user_context:
return gazu_user.all_episodes_for_project(project)
else:
return gazu_shot.all_episodes_for_project(project)

View File

@ -1,59 +0,0 @@
from blender_kitsu import gazu
from . import client as raw
from .sorting import sort_by_name
from .cache import cache
from .helpers import normalize_model_parameter
default = raw.default_client
@cache
def get_all_edits(relations=False, client=default):
"""
Retrieve all edit entries.
"""
params = {}
if relations:
params = {"relations": "true"}
path = "edits/all"
edits = raw.fetch_all(path, params, client=client)
return sort_by_name(edits)
@cache
def get_edit(edit_id, relations=False, client=default):
"""
Retrieve all edit entries.
"""
edit_entry = normalize_model_parameter(edit_id)
params = {}
if relations:
params = {"relations": "true"}
path = f"edits/{edit_entry['id']}"
edit_entry = raw.fetch_all(path, params, client=client)
return edit_entry
@cache
def get_all_edits_with_tasks(relations=False, client=default):
"""
Retrieve all edit entries.
"""
params = {}
if relations:
params = {"relations": "true"}
path = "edits/with-tasks"
edits_with_tasks = raw.fetch_all(path, params, client=client)
return sort_by_name(edits_with_tasks)
@cache
def get_all_previews_for_edit(edit, client=default):
"""
Args:
episode (str / dict): The episode dict or the episode ID.
Returns:
list: Shots which are children of given episode.
"""
edit = normalize_model_parameter(edit)
edit_previews = (raw.fetch_all(f"edits/{edit['id']}/preview-files", client=client))
for key in [key for key in enumerate(edit_previews.keys())]:
return edit_previews[key[1]]

View File

@ -1,15 +0,0 @@
import json
import datetime
class CustomJSONEncoder(json.JSONEncoder):
"""
This JSON encoder is here to handle dates which are not handled by default.
The standard does not want to assum how you handle dates.
"""
def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
return json.JSONEncoder.default(self, obj)

View File

@ -1,119 +0,0 @@
from . import client as raw
from .cache import cache
from .sorting import sort_by_name
from .helpers import normalize_model_parameter
default = raw.default_client
@cache
def all_entities(client=default):
"""
Returns:
list: Retrieve all entities
"""
return raw.fetch_all("entities", client=client)
@cache
def all_entity_types(client=default):
"""
Returns:
list: Entity types listed in database.
"""
return sort_by_name(raw.fetch_all("entity-types", client=client))
@cache
def get_entity(entity_id, client=default):
"""
Args:
id (str, client=default): ID of claimed entity.
Returns:
dict: Retrieve entity matching given ID (It can be an entity of any
kind: asset, shot, sequence or episode).
"""
return raw.fetch_one("entities", entity_id, client=client)
@cache
def get_entity_by_name(entity_name, client=default):
"""
Args:
name (str, client=default): The name of the claimed entity.
Returns:
Retrieve entity matching given name.
"""
return raw.fetch_first("entities", {"name": entity_name}, client=client)
@cache
def get_entity_type(entity_type_id, client=default):
"""
Args:
id (str, client=default): ID of claimed entity type.
, client=client
Returns:
Retrieve entity type matching given ID (It can be an entity type of any
kind).
"""
return raw.fetch_one("entity-types", entity_type_id, client=client)
@cache
def get_entity_type_by_name(entity_type_name, client=default):
"""
Args:
name (str, client=default): The name of the claimed entity type
Returns:
Retrieve entity type matching given name.
"""
return raw.fetch_first(
"entity-types", {"name": entity_type_name}, client=client
)
def new_entity_type(name, client=default):
"""
Creates an entity type with the given name.
Args:
name (str, client=default): The name of the entity type
Returns:
dict: The created entity type
"""
data = {"name": name}
return raw.create("entity-types", data, client=client)
def remove_entity(entity, force=False, client=default):
"""
Remove given entity from database.
Args:
entity (dict): Entity to remove.
"""
entity = normalize_model_parameter(entity)
path = "data/entities/%s" % entity["id"]
params = {}
if force:
params = {"force": "true"}
return raw.delete(path, params, client=client)
def update_entity(entity, client=default):
"""
Save given shot data into the API. Metadata are fully replaced by the ones
set on given shot.
Args:
Entity (dict): The shot dict to update.
Returns:
dict: Updated entity.
"""
return raw.put(f"data/entities/{entity['id']}", entity, client=client)

View File

@ -1,93 +0,0 @@
class HostException(Exception):
"""
Error raised when host is not valid.
"""
pass
class AuthFailedException(Exception):
"""
Error raised when user credentials are wrong.
"""
pass
class NotAuthenticatedException(Exception):
"""
Error raised when a 401 error (not authenticated) is sent by the API.
"""
pass
class NotAllowedException(Exception):
"""
Error raised when a 403 error (not authorized) is sent by the API.
"""
pass
class MethodNotAllowedException(Exception):
"""
Error raised when a 405 error (method not handled) is sent by the API.
"""
pass
class RouteNotFoundException(Exception):
"""
Error raised when a 404 error (not found) is sent by the API.
"""
pass
class ServerErrorException(Exception):
"""
Error raised when a 500 error (server error) is sent by the API.
"""
pass
class ParameterException(Exception):
"""
Error raised when a 400 error (argument error) is sent by the API.
"""
pass
class UploadFailedException(Exception):
"""
Error raised when an error while uploading a file, mainly to handle cases
where processing that occurs on the remote server fails.
"""
pass
class TooBigFileException(Exception):
"""
Error raised when a 413 error (payload too big error) is sent by the API.
"""
pass
class TaskStatusNotFound(Exception):
"""
Error raised when a task status is not found.
"""
pass
class DownloadFileException(Exception):
"""
Error raised when a file can't be downloaded.
"""

File diff suppressed because it is too large Load Diff

View File

@ -1,140 +0,0 @@
import os
import sys
import re
import datetime
import shutil
import requests
import tempfile
import mimetypes
from .exception import DownloadFileException
if sys.version_info[0] == 3:
import urllib.parse as urlparse
else:
import urlparse
_UUID_RE = re.compile(
"([a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}){1}"
)
def normalize_model_parameter(model_parameter):
"""
Args:
model_parameter (str / dict): The parameter to convert.
Returns:
dict: If `model_parameter` is an ID (a string), it turns it into a model
dict. If it's already a dict, the `model_parameter` is returned as it
is. It returns None if the paramater is None.
"""
if model_parameter is None:
return None
elif isinstance(model_parameter, dict):
return model_parameter
else:
try:
id_str = str(model_parameter)
except Exception:
raise ValueError("Failed to cast argument to str")
if _UUID_RE.match(id_str):
return {"id": id_str}
else:
raise ValueError("Wrong format: expected ID string or Data dict")
def normalize_list_of_models_for_links(models=[]):
"""
Args:
models (list): The models to convert.
Returns:
list: A list of ids of the models.
"""
if not isinstance(models, list):
models = [models]
return [normalize_model_parameter(model)["id"] for model in models]
def validate_date_format(date_text):
try:
datetime.datetime.strptime(date_text, "%Y-%m-%dT%H:%M:%S")
except ValueError:
try:
datetime.datetime.strptime(date_text, "%Y-%m-%d")
except ValueError:
raise ValueError(
"Incorrect date format for %s, should be YYYY-mm-dd or YYYY-mm-ddTHH:MM:SS"
% date_text
)
return date_text
def sanitize_filename(filename):
forbidden = "@|():%/,\\[]<>*?;`\n"
return "".join(
[c for c in filename.replace("..", "_") if c not in forbidden]
).strip()
def download_file(url, file_path=None, headers={}):
"""
Download file located at *file_path* to given url *url*.
Args:
url (str): The url path to download file from.
file_path (str): The location to store the file on the hard drive.
headers (dict): The headers to pass to requests
Returns:
str: The location where the file is stored.
"""
with requests.get(
url,
headers=headers,
stream=True,
) as response:
if response.ok:
if file_path is None:
file_path = tempfile.gettempdir()
if os.path.isdir(file_path):
file_path = os.path.join(file_path, "")
(dir, filename) = os.path.split(file_path)
if not filename:
url_parts = urlparse.urlparse(url)
filename = url_parts.path.split("/")[-1]
if not dir:
dir = os.getcwd()
name, ext = os.path.splitext(filename)
if ext == "":
if "Content-Type" in response.headers:
guessed_ext = mimetypes.guess_extension(
response.headers["Content-Type"]
)
if guessed_ext is not None:
ext = guessed_ext
if name == "":
name = "file"
filename = sanitize_filename(name + ext)
file_path = os.path.join(dir, filename)
with open(file_path, "wb") as target_file:
shutil.copyfileobj(response.raw, target_file)
return file_path
else:
raise DownloadFileException(
"File (%s) can't be downloaded (%i %s)."
% (url, response.status_code, response.reason)
)

View File

@ -1,224 +0,0 @@
from . import client as raw
from .sorting import sort_by_name
from .helpers import (
normalize_model_parameter,
normalize_list_of_models_for_links,
)
from .cache import cache
default = raw.default_client
@cache
def all_organisations(client=default):
"""
Returns:
list: Organisations listed in database.
"""
return sort_by_name(raw.fetch_all("organisations", client=client))
@cache
def all_departments(client=default):
"""
Returns:
list: Departments listed in database.
"""
return sort_by_name(raw.fetch_all("departments", client=client))
@cache
def all_persons(client=default):
"""
Returns:
list: Persons listed in database.
"""
return sort_by_name(raw.fetch_all("persons", client=client))
@cache
def get_person(id, client=default):
"""
Args:
id (str): An uuid identifying a person.
Returns:
dict: Person corresponding to given id.
"""
return raw.fetch_one("persons", id, client=client)
@cache
def get_person_by_desktop_login(desktop_login, client=default):
"""
Args:
desktop_login (str): Login used to sign in on the desktop computer.
Returns:
dict: Person corresponding to given desktop computer login.
"""
return raw.fetch_first(
"persons", {"desktop_login": desktop_login}, client=client
)
@cache
def get_person_by_email(email, client=default):
"""
Args:
email (str): User's email.
Returns:
dict: Person corresponding to given email.
"""
return raw.fetch_first("persons", {"email": email}, client=client)
@cache
def get_person_by_full_name(full_name, client=default):
"""
Args:
full_name (str): User's full name
Returns:
dict: Person corresponding to given name.
"""
if " " in full_name:
first_name, last_name = full_name.lower().split(" ")
else:
first_name, last_name = full_name.lower().strip(), ""
for person in all_persons():
is_right_first_name = (
first_name == person["first_name"].lower().strip()
)
is_right_last_name = (
len(last_name) == 0 or last_name == person["last_name"].lower()
)
if is_right_first_name and is_right_last_name:
return person
return None
@cache
def get_person_url(person, client=default):
"""
Args:
person (str / dict): The person dict or the person ID.
Returns:
url (str): Web url associated to the given person
"""
person = normalize_model_parameter(person)
path = "{host}/people/{person_id}/"
return path.format(
host=raw.get_api_url_from_host(client=client),
person_id=person["id"],
)
@cache
def get_organisation(client=default):
"""
Returns:
dict: Database information for organisation linked to auth tokens.
"""
return raw.get("auth/authenticated", client=client)["organisation"]
def new_person(
first_name,
last_name,
email,
phone="",
role="user",
desktop_login="",
departments=[],
client=default,
):
"""
Create a new person based on given parameters. His/her password will is
set automatically to default.
Args:
first_name (str):
last_name (str):
email (str):
phone (str):
role (str): user, manager, admin (wich match CG artist, Supervisor
and studio manager)
desktop_login (str): The login the users uses to log on its computer.
departments (list): The departments for the person.
Returns:
dict: Created person.
"""
person = get_person_by_email(email)
if person is None:
person = raw.post(
"data/persons/new",
{
"first_name": first_name,
"last_name": last_name,
"email": email,
"phone": phone,
"role": role,
"desktop_login": desktop_login,
"departments": normalize_list_of_models_for_links(departments),
},
client=client,
)
return person
def update_person(person, client=default):
"""
Update a person.
Args:
person (dict): The person dict that needs to be upgraded.
Returns:
dict: The updated person.
"""
if "departments" in person:
person["departments"] = normalize_list_of_models_for_links(
person["departments"]
)
person = normalize_model_parameter(person)
return raw.put(
"data/persons/%s" % (person["id"]),
person,
client=client,
)
def set_avatar(person, file_path, client=default):
"""
Upload picture and set it as avatar for given person.
Args:
person (str / dict): The person dict or the person ID.
file_path (str): Path where the avatar file is located on the hard
drive.
"""
person = normalize_model_parameter(person)
return raw.upload(
"/pictures/thumbnails/persons/%s" % person["id"],
file_path,
client=client,
)
def get_presence_log(year, month, client=default):
"""
Args:
year (int):
month (int):
Returns:
The presence log table for given month and year.
"""
path = "data/persons/presence-logs/%s-%s" % (year, str(month).zfill(2))
return raw.get(path, json_response=False, client=client)

View File

@ -1,153 +0,0 @@
from . import client as raw
from .helpers import normalize_model_parameter
from .sorting import sort_by_name
from .cache import cache
default = raw.default_client
@cache
def all_playlists(client=default):
"""
Returns:
list: All playlists for all projects.
"""
return sort_by_name(raw.fetch_all("playlists", client=client))
@cache
def all_shots_for_playlist(playlist, client=default):
"""
Args:
playlist (str / dict): The playlist dict or the playlist ID.
Returns:
list: All shots linked to the given playlist
"""
playlist = normalize_model_parameter(playlist)
playlist = raw.fetch_one("playlists", playlist["id"], client=client)
return sort_by_name(playlist["shots"])
@cache
def all_playlists_for_project(project, client=default, page=1):
"""
Args:
project (str / dict): The project dict or the project ID.
Returns:
list: All playlists for the given project
"""
project = normalize_model_parameter(project)
return sort_by_name(
raw.fetch_all(
"projects/%s/playlists" % project["id"],
params={"page": page},
client=client,
)
)
@cache
def all_playlists_for_episode(episode, client=default):
"""
Args:
episode (str / dict): The episode dict or the episode ID.
Returns:
list: All playlists for the given episode.
"""
project = normalize_model_parameter(episode["project_id"])
return sort_by_name(
raw.fetch_all(
"projects/%s/episodes/%s/playlists"
% (
project["id"],
episode["id"],
),
client=client,
)
)
@cache
def get_playlist(playlist, client=default):
"""
Args:
playlist (str / dict): The playlist dict or the playlist ID.
Returns:
dict: playlist object for given id.
"""
playlist = normalize_model_parameter(playlist)
return raw.fetch_one("playlists", playlist["id"], client=client)
@cache
def get_playlist_by_name(project, name, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
name (str): The playlist name
Returns:
dict: Playlist matching given name for given project.
"""
project = normalize_model_parameter(project)
params = {"project_id": project["id"], "name": name}
return raw.fetch_first("playlists", params=params, client=client)
def new_playlist(
project,
name,
episode=None,
for_entity="shot",
for_client=False,
client=default,
):
"""
Create a new playlist in the database for given project.
Args:
project (str / dict): The project dict or the project ID.
name (str): Playlist name.
Returns:
dict: Created playlist.
"""
project = normalize_model_parameter(project)
data = {
"name": name,
"project_id": project["id"],
"for_entity": for_entity,
"for_client": for_client,
}
if episode is not None:
episode = normalize_model_parameter(episode)
data["episode_id"] = episode["id"]
playlist = get_playlist_by_name(project, name, client=client)
if playlist is None:
playlist = raw.post("data/playlists/", data, client=client)
return playlist
def update_playlist(playlist, client=default):
"""
Save given playlist data into the API. Metadata are fully replaced by
the ones set on given playlist.
Args:
playlist (dict): The playlist dict to update.
Returns:
dict: Updated playlist.
"""
return raw.put(
"data/playlists/%s" % playlist["id"], playlist, client=client
)

View File

@ -1,377 +0,0 @@
from . import client as raw
from .sorting import sort_by_name
from .cache import cache
from .helpers import (
normalize_model_parameter,
normalize_list_of_models_for_links,
)
default = raw.default_client
@cache
def all_project_status(client=default):
"""
Returns:
list: Project status listed in database.
"""
return sort_by_name(raw.fetch_all("project-status", client=client))
@cache
def get_project_status_by_name(project_status_name, client=default):
"""
Args:
project_status_name (str): Name of claimed project status.
Returns:
dict: Project status corresponding to given name.
"""
return raw.fetch_first(
"project-status", {"name": project_status_name}, client=client
)
@cache
def all_projects(client=default):
"""
Returns:
list: Projects stored in the database.
"""
return sort_by_name(raw.fetch_all("projects", client=client))
@cache
def all_open_projects(client=default):
"""
Returns:
Open projects stored in the database.
"""
return sort_by_name(raw.fetch_all("projects/open", client=client))
@cache
def get_project(project_id, client=default):
"""
Args:
project_id (str): ID of claimed project.
Returns:
dict: Project corresponding to given id.
"""
return raw.fetch_one("projects", project_id, client=client)
@cache
def get_project_url(project, section="assets", client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
section (str): The section we want to open in the browser.
Returns:
url (str): Web url associated to the given project
"""
project = normalize_model_parameter(project)
path = "{host}/productions/{project_id}/{section}/"
return path.format(
host=raw.get_api_url_from_host(),
project_id=project["id"],
section=section,
)
@cache
def get_project_by_name(project_name, client=default):
"""
Args:
project_name (str): Name of claimed project.
Returns:
dict: Project corresponding to given name.
"""
return raw.fetch_first("projects", {"name": project_name}, client=client)
def new_project(
name,
production_type="short",
team=[],
asset_types=[],
task_statuses=[],
task_types=[],
client=default,
):
"""
Creates a new project.
Args:
name (str): Name of the project to create.
production_type (str): short, featurefilm, tvshow.
team (list): Team of the project.
asset_types (list): Asset types of the project.
task_statuses (list): Task statuses of the project.
task_types (list): Task types of the project.
Returns:
dict: Created project.
"""
project = get_project_by_name(name, client=client)
if project is None:
project = raw.create(
"projects",
{
"name": name,
"production_type": production_type,
"team": normalize_list_of_models_for_links(team),
"asset_types": normalize_list_of_models_for_links(asset_types),
"task_statuses": normalize_list_of_models_for_links(
task_statuses
),
"task_types": normalize_list_of_models_for_links(task_types),
},
client=client,
)
return project
def remove_project(project, force=False, client=default):
"""
Remove given project from database. (Prior to do that, make sure, there
is no asset or shot left).
Args:
project (dict / str): Project to remove.
"""
project = normalize_model_parameter(project)
path = "data/projects/%s" % project["id"]
if force:
path += "?force=true"
return raw.delete(path, client=client)
def update_project(project, client=default):
"""
Save given project data into the API. Metadata are fully replaced by the
ones set on given project.
Args:
project (dict): The project to update.
Returns:
dict: Updated project.
"""
if "team" in project:
project["team"] = normalize_list_of_models_for_links(project["team"])
if "asset_types" in project:
project["asset_types"] = normalize_list_of_models_for_links(
project["asset_types"]
)
if "task_statuses" in project:
project["task_statuses"] = normalize_list_of_models_for_links(
project["task_statuses"]
)
if "task_types" in project:
project["task_types"] = normalize_list_of_models_for_links(
project["task_types"]
)
return raw.put("data/projects/%s" % project["id"], project, client=client)
def update_project_data(project, data={}, client=default):
"""
Update the metadata for the provided project. Keys that are not provided
are not changed.
Args:
project (dict / ID): The project dict or id to save in database.
data (dict): Free field to set metadata of any kind.
Returns:
dict: Updated project.
"""
project = normalize_model_parameter(project)
project = get_project(project["id"], client=client)
if "data" not in project or project["data"] is None:
project["data"] = {}
project["data"].update(data)
return update_project(project, client=client)
def close_project(project, client=default):
"""
Closes the provided project.
Args:
project (dict / ID): The project dict or id to save in database.
Returns:
dict: Updated project.
"""
project = normalize_model_parameter(project)
closed_status_id = None
for status in all_project_status(client=client):
if status["name"].lower() == "closed":
closed_status_id = status["id"]
project["project_status_id"] = closed_status_id
update_project(project, client=client)
return project
def add_asset_type(project, asset_type, client=default):
project = normalize_model_parameter(project)
asset_type = normalize_model_parameter(asset_type)
data = {"asset_type_id": asset_type["id"]}
return raw.post(
"data/projects/%s/settings/asset-types" % project["id"],
data,
client=client,
)
def add_task_type(project, task_type, priority, client=default):
project = normalize_model_parameter(project)
task_type = normalize_model_parameter(task_type)
data = {"task_type_id": task_type["id"], "priority": priority}
return raw.post(
"data/projects/%s/settings/task-types" % project["id"],
data,
client=client,
)
def add_task_status(project, task_status, client=default):
project = normalize_model_parameter(project)
task_status = normalize_model_parameter(task_status)
data = {"task_status_id": task_status["id"]}
return raw.post(
"data/projects/%s/settings/task-status" % project["id"],
data,
client=client,
)
def add_metadata_descriptor(
project,
name,
entity_type,
choices=[],
for_client=False,
departments=[],
client=default,
):
"""
Create a new metadata descriptor for a project.
Args:
project (dict / ID): The project dict or id.
name (str): The name of the metadata descriptor
entity_type (str): asset, shot or scene.
choices (list): A list of possible values, empty list for free values.
for_client (bool) : Wheter it should be displayed in Kitsu or not.
departments (list): A list of departments dict or id.
Returns:
dict: Created metadata descriptor.
"""
project = normalize_model_parameter(project)
data = {
"name": name,
"choices": choices,
"for_client": for_client,
"entity_type": entity_type,
"departments": normalize_list_of_models_for_links(departments),
}
return raw.post(
"data/projects/%s/metadata-descriptors" % project["id"],
data,
client=client,
)
def get_metadata_descriptor(project, metadata_descriptor_id, client=default):
"""
Get a metadata descriptor matchind it's ID.
Args:
project (dict / ID): The project dict or id.
metadata_descriptor_id (dict / ID): The metadata descriptor dict or id.
Returns:
dict: The metadata descriptor matchind the ID.
"""
project = normalize_model_parameter(project)
metadata_descriptor = normalize_model_parameter(metadata_descriptor_id)
return raw.fetch_one(
"projects/%s/metadata-descriptors" % project["id"],
metadata_descriptor["id"],
client=client,
)
def all_metadata_descriptors(project, client=default):
"""
Get all the metadata descriptors.
Args:
project (dict / ID): The project dict or id.
Returns:
list: The metadata descriptors.
"""
project = normalize_model_parameter(project)
return raw.fetch_all(
"projects/%s/metadata-descriptors" % project["id"],
client=client,
)
def update_metadata_descriptor(project, metadata_descriptor, client=default):
"""
Update a metadata descriptor.
Args:
project (dict / ID): The project dict or id.
metadata_descriptor (dict): The metadata descriptor that needs to be updated.
Returns:
dict: The updated metadata descriptor.
"""
if "departments" in metadata_descriptor:
metadata_descriptor[
"departments"
] = normalize_list_of_models_for_links(
metadata_descriptor["departments"]
)
project = normalize_model_parameter(project)
return raw.put(
"data/projects/%s/metadata-descriptors/%s"
% (project["id"], metadata_descriptor["id"]),
metadata_descriptor,
client=client,
)
def remove_metadata_descriptor(
project, metadata_descriptor_id, force=False, client=default
):
"""
Remove a metadata descriptor.
Args:
project (dict / ID): The project dict or id.
metadata_descriptor_id (dict / ID): The metadata descriptor dict or id.
"""
project = normalize_model_parameter(project)
metadata_descriptor = normalize_model_parameter(metadata_descriptor_id)
params = {}
if force:
params = {"force": "true"}
return raw.delete(
"data/projects/%s/metadata-descriptors/%s"
% (project["id"], metadata_descriptor["id"]),
params,
client=client,
)

View File

@ -1,189 +0,0 @@
from . import client as raw
from .sorting import sort_by_name
from .cache import cache
from .helpers import normalize_model_parameter
from .shot import get_sequence
default = raw.default_client
def new_scene(project, sequence, name, client=default):
"""
Create a scene for given sequence.
"""
project = normalize_model_parameter(project)
sequence = normalize_model_parameter(sequence)
shot = {"name": name, "sequence_id": sequence["id"]}
return raw.post(
"data/projects/%s/scenes" % project["id"], shot, client=client
)
@cache
def all_scenes(project=None, client=default):
"""
Retrieve all scenes.
"""
project = normalize_model_parameter(project)
if project is not None:
scenes = raw.fetch_all(
"projects/%s/scenes" % project["id"], client=client
)
else:
scenes = raw.fetch_all("scenes", client=client)
return sort_by_name(scenes)
@cache
def all_scenes_for_project(project, client=default):
"""
Retrieve all scenes for given project.
"""
project = normalize_model_parameter(project)
scenes = raw.fetch_all("projects/%s/scenes" % project["id"], client=client)
return sort_by_name(scenes)
@cache
def all_scenes_for_sequence(sequence, client=default):
"""
Retrieve all scenes which are children from given sequence.
"""
sequence = normalize_model_parameter(sequence)
return sort_by_name(
raw.fetch_all("sequences/%s/scenes" % sequence["id"], client=client),
)
@cache
def get_scene(scene_id, client=default):
"""
Return scene corresponding to given scene ID.
"""
return raw.fetch_one("scenes", scene_id, client=client)
@cache
def get_scene_by_name(sequence, scene_name, client=default):
"""
Returns scene corresponding to given sequence and name.
"""
sequence = normalize_model_parameter(sequence)
result = raw.fetch_all(
"scenes/all",
{"parent_id": sequence["id"], "name": scene_name},
client=client,
)
return next(iter(result or []), None)
def update_scene(scene, client=default):
"""
Save given scene data into the API.
"""
return raw.put("data/entities/%s" % scene["id"], scene, client=client)
def new_scene_asset_instance(scene, asset, description="", client=default):
"""
Creates a new asset instance on given scene. The instance number is
automatically generated (increment highest number).
"""
scene = normalize_model_parameter(scene)
asset = normalize_model_parameter(asset)
data = {"asset_id": asset["id"], "description": description}
return raw.post(
"data/scenes/%s/asset-instances" % scene["id"], data, client=client
)
@cache
def all_asset_instances_for_scene(scene, client=default):
"""
Return the list of asset instances listed in a scene.
"""
scene = normalize_model_parameter(scene)
return raw.get(
"data/scenes/%s/asset-instances" % scene["id"], client=client
)
@cache
def get_asset_instance_by_name(scene, name, client=default):
"""
Returns the asset instance of the scene that has the given name.
"""
return raw.fetch_first(
"asset-instances",
{"name": name, "scene_id": scene["id"]},
client=client,
)
@cache
def all_camera_instances_for_scene(scene, client=default):
"""
Return the list of camera instances listed in a scene.
"""
scene = normalize_model_parameter(scene)
return raw.get(
"data/scenes/%s/camera-instances" % scene["id"], client=client
)
@cache
def all_shots_for_scene(scene, client=default):
"""
Return the list of shots issued from given scene.
"""
scene = normalize_model_parameter(scene)
return raw.get("data/scenes/%s/shots" % scene["id"], client=client)
def add_shot_to_scene(scene, shot, client=default):
"""
Link a shot to a scene to mark the fact it was generated out from that
scene.
"""
scene = normalize_model_parameter(scene)
shot = normalize_model_parameter(shot)
data = {"shot_id": shot["id"]}
return raw.post("data/scenes/%s/shots" % scene["id"], data, client=client)
def remove_shot_from_scene(scene, shot, client=default):
"""
Remove link between a shot and a scene.
"""
scene = normalize_model_parameter(scene)
shot = normalize_model_parameter(shot)
return raw.delete(
"data/scenes/%s/shots/%s" % (scene["id"], shot["id"]), client=client
)
def update_asset_instance_name(asset_instance, name, client=default):
"""
Update the name of given asset instance.
"""
path = "/data/asset-instances/%s" % asset_instance["id"]
return raw.put(path, {"name": name}, client=client)
def update_asset_instance_data(asset_instance, data, client=default):
"""
Update the extra data of given asset instance.
"""
asset_instance = normalize_model_parameter(asset_instance)
path = "/data/asset-instances/%s" % asset_instance["id"]
return raw.put(path, {"data": data}, client=client)
@cache
def get_sequence_from_scene(scene, client=default):
"""
Return sequence which is parent of given shot.
"""
scene = normalize_model_parameter(scene)
return get_sequence(scene["parent_id"], client=client)

View File

@ -1,628 +0,0 @@
from . import client as raw
from .sorting import sort_by_name
from .cache import cache
from .helpers import normalize_model_parameter
default = raw.default_client
@cache
def all_previews_for_shot(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
list: Previews from database for given shot.
"""
shot = normalize_model_parameter(shot)
return raw.fetch_all("shots/%s/preview-files" % shot["id"], client=client)
@cache
def all_shots_for_project(project, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
Returns:
list: Shots from database or for given project.
"""
project = normalize_model_parameter(project)
shots = raw.fetch_all("projects/%s/shots" % project["id"], client=client)
return sort_by_name(shots)
@cache
def all_shots_for_episode(episode, client=default):
"""
Args:
episode (str / dict): The episode dict or the episode ID.
Returns:
list: Shots which are children of given episode.
"""
episode = normalize_model_parameter(episode)
return sort_by_name(
raw.fetch_all("episodes/%s/shots" % episode["id"], client=client)
)
@cache
def all_shots_for_sequence(sequence, client=default):
"""
Args:
sequence (str / dict): The sequence dict or the sequence ID.
Returns:
list: Shots which are children of given sequence.
"""
sequence = normalize_model_parameter(sequence)
return sort_by_name(
raw.fetch_all("sequences/%s/shots" % sequence["id"], client=client)
)
@cache
def all_sequences_for_project(project, client=default):
"""
Args:
sequence (str / dict): The sequence dict or the sequence ID.
Returns:
list: Sequences from database for given project.
"""
project = normalize_model_parameter(project)
path = "projects/%s/sequences" % project["id"]
sequences = raw.fetch_all(path, client=client)
return sort_by_name(sequences)
@cache
def all_sequences_for_episode(episode, client=default):
"""
Args:
sequence (str / dict): The sequence dict or the sequence ID.
Returns:
list: Sequences which are children of given episode.
"""
episode = normalize_model_parameter(episode)
path = "episodes/%s/sequences" % episode["id"]
sequences = raw.fetch_all(path, client=client)
return sort_by_name(sequences)
@cache
def all_episodes_for_project(project, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
Returns:
list: Episodes from database for given project.
"""
project = normalize_model_parameter(project)
path = "projects/%s/episodes" % project["id"]
episodes = raw.fetch_all(path, client=client)
return sort_by_name(episodes)
@cache
def get_episode(episode_id, client=default):
"""
Args:
episode_id (str): Id of claimed episode.
Returns:
dict: Episode corresponding to given episode ID.
"""
return raw.fetch_one("episodes", episode_id, client=client)
@cache
def get_episode_by_name(project, episode_name, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
episode_name (str): Name of claimed episode.
Returns:
dict: Episode corresponding to given name and project.
"""
project = normalize_model_parameter(project)
return raw.fetch_first(
"episodes",
{"project_id": project["id"], "name": episode_name},
client=client,
)
@cache
def get_episode_from_sequence(sequence, client=default):
"""
Args:
sequence (dict): The sequence dict.
Returns:
dict: Episode which is parent of given sequence.
"""
if sequence["parent_id"] is None:
return None
else:
return get_episode(sequence["parent_id"], client=client)
@cache
def get_sequence(sequence_id, client=default):
"""
Args:
sequence_id (str): ID of claimed sequence.
Returns:
dict: Sequence corresponding to given sequence ID.
"""
return raw.fetch_one("sequences", sequence_id, client=client)
@cache
def get_sequence_by_name(project, sequence_name, episode=None, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
sequence_name (str): Name of claimed sequence.
episode (str / dict): The episode dict or the episode ID (optional).
Returns:
dict: Seqence corresponding to given name and project (and episode in
case of TV Show).
"""
project = normalize_model_parameter(project)
if episode is None:
params = {"project_id": project["id"], "name": sequence_name}
else:
episode = normalize_model_parameter(episode)
params = {"episode_id": episode["id"], "name": sequence_name}
return raw.fetch_first("sequences", params, client=client)
@cache
def get_sequence_from_shot(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
dict: Sequence which is parent of given shot.
"""
shot = normalize_model_parameter(shot)
return get_sequence(shot["parent_id"], client=client)
@cache
def get_shot(shot_id, client=default):
"""
Args:
shot_id (str): Id of claimed shot.
Returns:
dict: Shot corresponding to given shot ID.
"""
return raw.fetch_one("shots", shot_id, client=client)
@cache
def get_shot_by_name(sequence, shot_name, client=default):
"""
Args:
sequence (str / dict): The sequence dict or the sequence ID.
shot_name (str): Name of claimed shot.
Returns:
dict: Shot corresponding to given name and sequence.
"""
sequence = normalize_model_parameter(sequence)
return raw.fetch_first(
"shots/all",
{"sequence_id": sequence["id"], "name": shot_name},
client=client,
)
@cache
def get_episode_url(episode, client=default):
"""
Args:
episode (str / dict): The episode dict or the episode ID.
Returns:
url (str): Web url associated to the given episode
"""
episode = normalize_model_parameter(episode)
episode = get_episode(episode["id"])
path = "{host}/productions/{project_id}/episodes/{episode_id}/shots"
return path.format(
host=raw.get_api_url_from_host(client=client),
project_id=episode["project_id"],
episode_id=episode["id"],
)
@cache
def get_shot_url(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
url (str): Web url associated to the given shot
"""
shot = normalize_model_parameter(shot)
shot = get_shot(shot["id"])
path = "{host}/productions/{project_id}/"
if shot["episode_id"] is None:
path += "shots/{shot_id}/"
else:
path += "episodes/{episode_id}/shots/{shot_id}/"
return path.format(
host=raw.get_api_url_from_host(client=client),
project_id=shot["project_id"],
shot_id=shot["id"],
episode_id=shot["episode_id"],
)
def new_sequence(project, name, episode=None, client=default):
"""
Create a sequence for given project and episode.
Args:
project (str / dict): The project dict or the project ID.
episode (str / dict): The episode dict or the episode ID.
name (str): The name of the sequence to create.
Returns:
Created sequence.
"""
project = normalize_model_parameter(project)
data = {"name": name}
if episode is not None:
episode = normalize_model_parameter(episode)
data["episode_id"] = episode["id"]
sequence = get_sequence_by_name(
project, name, episode=episode, client=client
)
if sequence is None:
path = "data/projects/%s/sequences" % project["id"]
return raw.post(path, data, client=client)
else:
return sequence
def new_shot(
project,
sequence,
name,
nb_frames=None,
frame_in=None,
frame_out=None,
description=None,
data={},
client=default,
):
"""
Create a shot for given sequence and project. Add frame in and frame out
parameters to shot extra data. Allow to set metadata too.
Args:
project (str / dict): The project dict or the project ID.
sequence (str / dict): The sequence dict or the sequence ID.
name (str): The name of the shot to create.
frame_in (int):
frame_out (int):
data (dict): Free field to set metadata of any kind.
Returns:
Created shot.
"""
project = normalize_model_parameter(project)
sequence = normalize_model_parameter(sequence)
if frame_in is not None:
data["frame_in"] = frame_in
if frame_out is not None:
data["frame_out"] = frame_out
data = {"name": name, "data": data, "sequence_id": sequence["id"]}
if nb_frames is not None:
data["nb_frames"] = nb_frames
if description is not None:
data["description"] = description
shot = get_shot_by_name(sequence, name, client=client)
if shot is None:
path = "data/projects/%s/shots" % project["id"]
return raw.post(path, data, client=client)
else:
return shot
def update_shot(shot, client=default):
"""
Save given shot data into the API. Metadata are fully replaced by the ones
set on given shot.
Args:
shot (dict): The shot dict to update.
Returns:
dict: Updated shot.
"""
return raw.put("data/entities/%s" % shot["id"], shot, client=client)
def update_sequence(sequence, client=default):
"""
Save given sequence data into the API. Metadata are fully replaced by the
ones set on given sequence.
Args:
sequence (dict): The sequence dict to update.
Returns:
dict: Updated sequence.
"""
return raw.put(
"data/entities/%s" % sequence["id"], sequence, client=client
)
@cache
def get_asset_instances_for_shot(shot, client=default):
"""
Return the list of asset instances linked to given shot.
"""
shot = normalize_model_parameter(shot)
return raw.get("data/shots/%s/asset-instances" % shot["id"], client=client)
def update_shot_data(shot, data={}, client=default):
"""
Update the metadata for the provided shot. Keys that are not provided are
not changed.
Args:
shot (dict / ID): The shot dict or ID to save in database.
data (dict): Free field to set metadata of any kind.
Returns:
dict: Updated shot.
"""
shot = normalize_model_parameter(shot)
current_shot = get_shot(shot["id"], client=client)
updated_shot = {"id": current_shot["id"], "data": current_shot["data"]}
updated_shot["data"].update(data)
return update_shot(updated_shot, client=client)
def update_sequence_data(sequence, data={}, client=default):
"""
Update the metadata for the provided sequence. Keys that are not provided
are not changed.
Args:
sequence (dict / ID): The sequence dicto or ID to save in database.
data (dict): Free field to set metadata of any kind.
Returns:
dict: Updated sequence.
"""
sequence = normalize_model_parameter(sequence)
current_sequence = get_sequence(sequence["id"], client=client)
if not current_sequence.get("data"):
current_sequence["data"] = {}
updated_sequence = {
"id": current_sequence["id"],
"data": current_sequence["data"],
}
updated_sequence["data"].update(data)
return update_sequence(updated_sequence, client)
def remove_shot(shot, force=False, client=default):
"""
Remove given shot from database.
Args:
shot (dict / str): Shot to remove.
"""
shot = normalize_model_parameter(shot)
path = "data/shots/%s" % shot["id"]
params = {}
if force:
params = {"force": "true"}
return raw.delete(path, params, client=client)
def restore_shot(shot, client=default):
"""
Restore given shot into database (uncancel it).
Args:
shot (dict / str): Shot to restore.
"""
shot = normalize_model_parameter(shot)
path = "data/shots/%s" % shot["id"]
data = {"canceled": False}
return raw.put(path, data, client=client)
def new_episode(project, name, client=default):
"""
Create an episode for given project.
Args:
project (str / dict): The project dict or the project ID.
name (str): The name of the episode to create.
Returns:
dict: Created episode.
"""
project = normalize_model_parameter(project)
data = {"name": name}
episode = get_episode_by_name(project, name, client=client)
if episode is None:
return raw.post(
"data/projects/%s/episodes" % project["id"], data, client=client
)
else:
return episode
def update_episode(episode, client=default):
"""
Save given episode data into the API. Metadata are fully replaced by the
ones set on given episode.
Args:
episode (dict): The episode dict to update.
Returns:
dict: Updated episode.
"""
return raw.put("data/entities/%s" % episode["id"], episode, client=client)
def update_episode_data(episode, data={}, client=default):
"""
Update the metadata for the provided episode. Keys that are not provided
are not changed.
Args:
episode (dict / ID): The episode dict or ID to save in database.
data (dict): Free field to set metadata of any kind.
Returns:
dict: Updated episode.
"""
episode = normalize_model_parameter(episode)
current_episode = get_sequence(episode["id"], client=client)
updated_episode = {
"id": current_episode["id"],
"data": current_episode["data"],
}
updated_episode["data"].update(data)
return update_episode(updated_episode, client=client)
def remove_episode(episode, force=False, client=default):
"""
Remove given episode and related from database.
Args:
episode (dict / str): Episode to remove.
"""
episode = normalize_model_parameter(episode)
path = "data/episodes/%s" % episode["id"]
params = {}
if force:
params = {"force": "true"}
return raw.delete(path, params=params, client=client)
def remove_sequence(sequence, force=False, client=default):
"""
Remove given sequence and related from database.
Args:
sequence (dict / str): Sequence to remove.
"""
sequence = normalize_model_parameter(sequence)
path = "data/sequences/%s" % sequence["id"]
params = {}
if force:
params = {"force": "true"}
return raw.delete(path, params=params, client=client)
@cache
def all_asset_instances_for_shot(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
list: Asset instances linked to given shot.
"""
shot = normalize_model_parameter(shot)
return raw.get("data/shots/%s/asset-instances" % shot["id"], client=client)
def add_asset_instance_to_shot(shot, asset_instance, client=default):
"""
Link a new asset instance to given shot.
Args:
shot (str / dict): The shot dict or the shot ID.
asset_instance (str / dict): The asset instance dict or ID.
Returns:
dict: Related shot.
"""
shot = normalize_model_parameter(shot)
asset_instance = normalize_model_parameter(asset_instance)
data = {"asset_instance_id": asset_instance["id"]}
path = "data/shots/%s/asset-instances" % shot["id"]
return raw.post(path, data, client=client)
def remove_asset_instance_from_shot(shot, asset_instance, client=default):
"""
Remove link between an asset instance and given shot.
Args:
shot (str / dict): The shot dict or the shot ID.
asset_instance (str / dict): The asset instance dict or ID.
"""
shot = normalize_model_parameter(shot)
asset_instance = normalize_model_parameter(asset_instance)
path = "data/shots/%s/asset-instances/%s" % (
shot["id"],
asset_instance["id"],
)
return raw.delete(path, client=client)
def import_shots_with_csv(project, csv_file_path, client=default):
project = normalize_model_parameter(project)
return raw.upload(
"import/csv/projects/%s/shots" % project["id"],
csv_file_path,
client=client,
)
def export_shots_with_csv(
project, csv_file_path, episode=None, assigned_to=None, client=default
):
project = normalize_model_parameter(project)
episode = normalize_model_parameter(episode)
assigned_to = normalize_model_parameter(assigned_to)
params = {}
if episode:
params["episode_id"] = episode["id"]
if assigned_to:
params["assigned_to"] = assigned_to["id"]
return raw.download(
"export/csv/projects/%s/shots.csv" % project["id"],
csv_file_path,
params=params,
client=client,
)

View File

@ -1,11 +0,0 @@
def sort_by_name(dicts):
"""
Sorting of a list of dicts. The sorting is based on the name field.
Args:
list: The list of dicts to sort.
Returns:
Sorted list.
"""
return sorted(dicts, key=lambda k: k.get("name", "").lower())

View File

@ -1,172 +0,0 @@
from . import client as raw
from .helpers import normalize_model_parameter, validate_date_format
default = raw.default_client
def get_last_events(
page_size=20000, project=None, after=None, before=None, client=default
):
"""
Get last events that occured on the machine.
Args:
page_size (int): Number of events to retrieve.
project (dict/id): Get only events related to this project.
after (dict/id): Get only events occuring after given date.
before (dict/id): Get only events occuring before given date.
Returns:
dict: Last events matching criterions.
"""
path = "/data/events/last"
params = {"page_size": page_size}
if project is not None:
project = normalize_model_parameter(project)
params["project_id"] = project["id"]
if after is not None:
params["after"] = validate_date_format(after)
if before is not None:
params["before"] = validate_date_format(before)
return raw.get(path, params=params, client=client)
def import_entities(entities, client=default):
"""
Import entities from another instance to target instance (keep id and audit
dates).
Args:
entities (list): Entities to import.
Returns:
dict: Entities created.
"""
return raw.post("import/kitsu/entities", entities, client=client)
def import_tasks(tasks, client=default):
"""
Import tasks from another instance to target instance (keep id and audit
dates).
Args:
tasks (list): Tasks to import.
Returns:
dict: Tasks created.
"""
return raw.post("import/kitsu/tasks", tasks, client=client)
def import_entity_links(links, client=default):
"""
Import enitity links from another instance to target instance (keep id and
audit dates).
Args:
links (list): Entity links to import.
Returns:
dict: Entity links created.
"""
return raw.post("import/kitsu/entity-links", links, client=client)
def get_model_list_diff(source_list, target_list):
"""
Args:
source_list (list): List of models to compare.
target_list (list): List of models for which we want a diff.
Returns:
tuple: Two lists, one containing the missing models in the target list
and one containing the models that should not be in the target list.
"""
missing = []
source_ids = {m["id"]: True for m in source_list}
target_ids = {m["id"]: True for m in target_list}
for model in source_list:
if model["id"] not in target_ids:
missing.append(model)
unexpected = [
model for model in target_list if model["id"] not in source_ids
]
return (missing, unexpected)
def get_link_list_diff(source_list, target_list):
"""
Args:
source_list (list): List of links to compare.
target_list (list): List of links for which we want a diff.
Returns:
tuple: Two lists, one containing the missing links in the target list
and one containing the links that should not be in the target list.
Links are identified by their in ID and their out ID.
"""
def get_link_key(l):
return l["entity_in_id"] + "-" + l["entity_out_id"]
missing = []
unexpected = []
source_ids = {get_link_key(m): True for m in source_list}
target_ids = {get_link_key(m): True for m in target_list}
for link in source_list:
if get_link_key(link) not in target_ids:
missing.append(link)
for link in target_list:
if get_link_key(link) not in source_ids:
unexpected.append(link)
return (missing, unexpected)
def get_id_map_by_name(source_list, target_list):
"""
Args:
source_list (list): List of links to compare.
target_list (list): List of links for which we want a diff.
Returns:
dict: A dict where keys are the source model names and the values are
the IDs of the target models with same name.
It's useful to match a model from the source list to its relative in
the target list based on its name.
"""
link_map = {}
name_map = {}
for model in target_list:
name_map[model["name"].lower()] = model["id"]
for model in source_list:
if model["name"].lower() in name_map:
link_map[model["name"]] = name_map[model["name"].lower()]
return link_map
def get_id_map_by_id(source_list, target_list, field="name"):
"""
Args:
source_list (list): List of links to compare.
target_list (list): List of links for which we want a diff.
Returns:
dict: A dict where keys are the source model names and the values are
the IDs of the target models with same name.
It's useful to match a model from the source list to its relative in
the target list based on its name.
"""
link_map = {}
name_map = {}
for model in target_list:
name_map[model[field].lower()] = model["id"]
for model in source_list:
if model[field].lower() in name_map:
link_map[model["id"]] = name_map[model[field].lower()]
return link_map
def is_changed(source_model, target_model):
source_date = source_model["updated_at"]
target_date = target_model["updated_at"]
return source_date > target_date

File diff suppressed because it is too large Load Diff

View File

@ -1,270 +0,0 @@
import datetime
from .exception import NotAuthenticatedException
from . import client as raw
from .sorting import sort_by_name
from .helpers import normalize_model_parameter
from .cache import cache
default = raw.default_client
@cache
def all_open_projects(client=default):
"""
Returns:
list: Projects for which the user is part of the team. Admins see all
projects
"""
projects = raw.fetch_all("user/projects/open", client=client)
return sort_by_name(projects)
@cache
def all_asset_types_for_project(project, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
Returns:
list: Asset types for which the user has a task assigned for given
project.
"""
project = normalize_model_parameter(project)
path = "user/projects/%s/asset-types" % project["id"]
asset_types = raw.fetch_all(path, client=client)
return sort_by_name(asset_types)
@cache
def all_assets_for_asset_type_and_project(project, asset_type, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
asset_type (str / dict): The asset type dict or ID.
Returns:
list: Assets for given project and asset type and for which the user has
a task assigned.
"""
project = normalize_model_parameter(project)
asset_type = normalize_model_parameter(asset_type)
path = "user/projects/%s/asset-types/%s/assets" % (
project["id"],
asset_type["id"],
)
assets = raw.fetch_all(path, client=client)
return sort_by_name(assets)
@cache
def all_tasks_for_asset(asset, client=default):
"""
Args:
asset (str / dict): The asset dict or the asset ID.
Returns:
list: Tasks for given asset and current user.
"""
asset = normalize_model_parameter(asset)
path = "user/assets/%s/tasks" % asset["id"]
tasks = raw.fetch_all(path, client=client)
return sort_by_name(tasks)
@cache
def all_tasks_for_shot(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
list: Tasks assigned to current user for given shot.
"""
shot = normalize_model_parameter(shot)
path = "user/shots/%s/tasks" % shot["id"]
tasks = raw.fetch_all(path, client=client)
return sort_by_name(tasks)
@cache
def all_tasks_for_scene(scene, client=default):
"""
Args:
scene (str / dict): The scene dict or the scene ID.
Returns:
list: Tasks assigned to current user for given scene.
"""
scene = normalize_model_parameter(scene)
path = "user/scene/%s/tasks" % scene["id"]
tasks = raw.fetch_all(path, client=client)
return sort_by_name(tasks)
@cache
def all_tasks_for_sequence(sequence, client=default):
"""
Return the list of tasks for given asset and current user.
"""
sequence = normalize_model_parameter(sequence)
path = "user/sequences/%s/tasks" % sequence["id"]
tasks = raw.fetch_all(path, client=client)
return sort_by_name(tasks)
@cache
def all_task_types_for_asset(asset, client=default):
"""
Args:
asset (str / dict): The asset dict or the asset ID.
Returns:
list: Task Types of tasks assigned to current user for given asset.
"""
asset = normalize_model_parameter(asset)
path = "user/assets/%s/task-types" % asset["id"]
tasks = raw.fetch_all(path, client=client)
return sort_by_name(tasks)
@cache
def all_task_types_for_shot(shot, client=default):
"""
Args:
shot (str / dict): The shot dict or the shot ID.
Returns:
list: Task Types of tasks assigned to current user for given shot.
"""
shot = normalize_model_parameter(shot)
path = "user/shots/%s/task-types" % shot["id"]
task_types = raw.fetch_all(path, client=client)
return sort_by_name(task_types)
@cache
def all_task_types_for_scene(scene, client=default):
"""
Args:
scene (str / dict): The scene dict or the scene ID.
Returns:
list: Task Types of tasks assigned to current user for given scene.
"""
scene = normalize_model_parameter(scene)
path = "user/scenes/%s/task-types" % scene["id"]
task_types = raw.fetch_all(path, client=client)
return sort_by_name(task_types)
@cache
def all_task_types_for_sequence(sequence, client=default):
"""
return the list of task_tyes for given asset and current user.
"""
sequence = normalize_model_parameter(sequence)
path = "user/sequences/%s/task-types" % sequence["id"]
task_types = raw.fetch_all(path, client=client)
return sort_by_name(task_types)
@cache
def all_sequences_for_project(project, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
Returns:
list: Sequences for which user has tasks assigned for given project.
"""
project = normalize_model_parameter(project)
path = "user/projects/%s/sequences" % project["id"]
sequences = raw.fetch_all(path, client=client)
return sort_by_name(sequences)
@cache
def all_episodes_for_project(project, client=default):
"""
Args:
project (str / dict): The project dict or the project ID.
Returns:
list: Episodes for which user has tasks assigned for given project.
"""
path = "user/projects/%s/episodes" % project["id"]
asset_types = raw.fetch_all(path, client=client)
return sort_by_name(asset_types)
@cache
def all_shots_for_sequence(sequence, client=default):
"""
Args:
sequence (str / dict): The sequence dict or the sequence ID.
Returns:
list: Shots for which user has tasks assigned for given sequence.
"""
sequence = normalize_model_parameter(sequence)
path = "user/sequences/%s/shots" % sequence["id"]
shots = raw.fetch_all(path, client=client)
return sort_by_name(shots)
@cache
def all_scenes_for_sequence(sequence, client=default):
"""
Args:
sequence (str / dict): The sequence dict or the sequence ID.
Returns:
list: Scenes for which user has tasks assigned for given sequence.
"""
sequence = normalize_model_parameter(sequence)
path = "user/sequences/%s/scenes" % sequence["id"]
scenes = raw.fetch_all(path, client=client)
return sort_by_name(scenes)
@cache
def all_tasks_to_do(client=default):
"""
Returns:
list: Tasks assigned to current user which are not complete.
"""
return raw.fetch_all("user/tasks", client=client)
@cache
def all_done_tasks(client=default):
"""
Returns:
list: Tasks assigned to current user which are done.
"""
return raw.fetch_all("user/done-tasks", client=client)
def log_desktop_session_log_in(client=default):
"""
Add a log entry to mention that the user logged in his computer.
Returns:
dict: Desktop session log entry.
"""
path = "/data/user/desktop-login-logs"
data = {"date": datetime.datetime.now().isoformat()}
return raw.post(path, data, client=client)
def is_authenticated(client=default):
"""
Returns:
boolean: Current user authenticated or not
"""
try:
return raw.get("auth/authenticated")["authenticated"]
except NotAuthenticatedException:
return False

View File

@ -26,8 +26,7 @@ from blender_kitsu.shot_builder.render_settings import RenderSettings
from blender_kitsu.shot_builder.connectors.connector import Connector from blender_kitsu.shot_builder.connectors.connector import Connector
import requests import requests
from blender_kitsu import cache from blender_kitsu import cache
from blender_kitsu.gazu.asset import all_assets_for_shot import gazu
from blender_kitsu.gazu.shot import all_shots_for_project, all_sequences_for_project
import typing import typing
import logging import logging
@ -168,7 +167,7 @@ class KitsuConnector(Connector):
def get_shots(self) -> typing.List[ShotRef]: def get_shots(self) -> typing.List[ShotRef]:
project = cache.project_active_get() project = cache.project_active_get()
kitsu_sequences = all_sequences_for_project(project.id) kitsu_sequences = gazu.shot.all_sequences_for_project(project.id)
sequence_lookup = { sequence_lookup = {
sequence_data['id']: KitsuSequenceRef( sequence_data['id']: KitsuSequenceRef(
@ -179,7 +178,7 @@ class KitsuConnector(Connector):
for sequence_data in kitsu_sequences for sequence_data in kitsu_sequences
} }
kitsu_shots = all_shots_for_project(project.id) kitsu_shots = gazu.shot.all_shots_for_project(project.id)
shots: typing.List[ShotRef] = [] shots: typing.List[ShotRef] = []
for shot_data in kitsu_shots: for shot_data in kitsu_shots:
@ -230,7 +229,7 @@ class KitsuConnector(Connector):
return shots return shots
def get_assets_for_shot(self, shot: Shot) -> typing.List[AssetRef]: def get_assets_for_shot(self, shot: Shot) -> typing.List[AssetRef]:
kitsu_assets = all_assets_for_shot(shot.kitsu_id) kitsu_assets = gazu.asset.all_assets_for_shot(shot.kitsu_id)
return [ return [
AssetRef(name=asset_data['name'], code=asset_data['code']) AssetRef(name=asset_data['name'], code=asset_data['code'])

View File

@ -1,7 +1,8 @@
import bpy import bpy
from typing import Set from typing import Set
from blender_kitsu.shot_builder.editorial.core import editorial_export_get_latest from blender_kitsu.shot_builder.editorial.core import editorial_export_get_latest
from blender_kitsu import cache, gazu from blender_kitsu import cache
import gazu
class ANIM_SETUP_OT_load_latest_editorial(bpy.types.Operator): class ANIM_SETUP_OT_load_latest_editorial(bpy.types.Operator):

View File

@ -20,37 +20,59 @@
import pathlib import pathlib
from typing import * from typing import *
import bpy import bpy
import gazu
from blender_kitsu.shot_builder.shot import ShotRef from blender_kitsu.shot_builder.shot import ShotRef
from blender_kitsu.shot_builder.project import ensure_loaded_production, get_active_production from blender_kitsu.shot_builder.project import (
ensure_loaded_production,
get_active_production,
)
from blender_kitsu.shot_builder.builder import ShotBuilder from blender_kitsu.shot_builder.builder import ShotBuilder
from blender_kitsu.shot_builder.task_type import TaskType from blender_kitsu.shot_builder.task_type import TaskType
from blender_kitsu import prefs, cache, gazu from blender_kitsu import prefs, cache
from blender_kitsu.shot_builder.anim_setup.core import animation_workspace_delete_others, animation_workspace_vse_area_add from blender_kitsu.shot_builder.anim_setup.core import (
animation_workspace_delete_others,
animation_workspace_vse_area_add,
)
from blender_kitsu.shot_builder.editorial.core import editorial_export_get_latest from blender_kitsu.shot_builder.editorial.core import editorial_export_get_latest
from blender_kitsu.shot_builder.builder.save_file import save_shot_builder_file from blender_kitsu.shot_builder.builder.save_file import save_shot_builder_file
_production_task_type_items: List[Tuple[str, str, str]] = [] _production_task_type_items: List[Tuple[str, str, str]] = []
def production_task_type_items(self: Any, context: bpy.types.Context) -> List[Tuple[str, str, str]]:
def production_task_type_items(
self: Any, context: bpy.types.Context
) -> List[Tuple[str, str, str]]:
global _production_task_type_items global _production_task_type_items
return _production_task_type_items return _production_task_type_items
_production_seq_id_items: List[Tuple[str, str, str]] = [] _production_seq_id_items: List[Tuple[str, str, str]] = []
def production_seq_id_items(self: Any, context: bpy.types.Context) -> List[Tuple[str, str, str]]:
def production_seq_id_items(
self: Any, context: bpy.types.Context
) -> List[Tuple[str, str, str]]:
global _production_seq_id_items global _production_seq_id_items
return _production_seq_id_items return _production_seq_id_items
_production_shots: List[ShotRef] = [] _production_shots: List[ShotRef] = []
def production_shots(self: Any, context: bpy.types.Context) -> List[Tuple[str, str, str]]:
def production_shots(
self: Any, context: bpy.types.Context
) -> List[Tuple[str, str, str]]:
global _production_shots global _production_shots
return _production_shots return _production_shots
_production_shot_id_items_for_seq: List[Tuple[str, str, str]] = [] _production_shot_id_items_for_seq: List[Tuple[str, str, str]] = []
def production_shot_id_items_for_seq(self: Any, context: bpy.types.Context) -> List[Tuple[str, str, str]]:
def production_shot_id_items_for_seq(
self: Any, context: bpy.types.Context
) -> List[Tuple[str, str, str]]:
global _production_shot_id_items_for_seq global _production_shot_id_items_for_seq
global _production_shot_id_items global _production_shot_id_items
@ -58,23 +80,27 @@ def production_shot_id_items_for_seq(self: Any, context: bpy.types.Context) -> L
return [] return []
shots_for_seq: List[Tuple(str, str, str)] = [ shots_for_seq: List[Tuple(str, str, str)] = [
(s.name, s.name, "") for s in _production_shots (s.name, s.name, "")
for s in _production_shots
if s.sequence.name == self.seq_id if s.sequence.name == self.seq_id
] ]
_production_shot_id_items_for_seq.clear() _production_shot_id_items_for_seq.clear()
_production_shot_id_items_for_seq.extend(shots_for_seq) _production_shot_id_items_for_seq.extend(shots_for_seq)
return _production_shot_id_items_for_seq return _production_shot_id_items_for_seq
def reset_shot_id_enum(self : Any, context: bpy.types.Context) -> None:
def reset_shot_id_enum(self: Any, context: bpy.types.Context) -> None:
production_shot_id_items_for_seq(self, context) production_shot_id_items_for_seq(self, context)
global _production_shot_id_items_for_seq global _production_shot_id_items_for_seq
if _production_shot_id_items_for_seq: if _production_shot_id_items_for_seq:
self.shot_id = _production_shot_id_items_for_seq[0][0] self.shot_id = _production_shot_id_items_for_seq[0][0]
class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator): class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
"""Build a new shot file""" """Build a new shot file"""
bl_idname = "shotbuilder.new_shot_file" bl_idname = "shotbuilder.new_shot_file"
bl_label = "New Production Shot File" bl_label = "New Production Shot File"
@ -84,14 +110,13 @@ class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
_file_path = '' _file_path = ''
production_root: bpy.props.StringProperty( # type: ignore production_root: bpy.props.StringProperty( # type: ignore
name="Production Root", name="Production Root", description="Root of the production", subtype='DIR_PATH'
description="Root of the production", )
subtype='DIR_PATH')
production_name: bpy.props.StringProperty( # type: ignore production_name: bpy.props.StringProperty( # type: ignore
name="Production", name="Production",
description="Name of the production to create a shot file for", description="Name of the production to create a shot file for",
options=set() options=set(),
) )
seq_id: bpy.props.EnumProperty( # type: ignore seq_id: bpy.props.EnumProperty( # type: ignore
@ -110,7 +135,7 @@ class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
task_type: bpy.props.EnumProperty( # type: ignore task_type: bpy.props.EnumProperty( # type: ignore
name="Task", name="Task",
description="Task to create the shot file for", description="Task to create the shot file for",
items=production_task_type_items items=production_task_type_items,
) )
auto_save: bpy.props.BoolProperty( auto_save: bpy.props.BoolProperty(
name="Save after building.", name="Save after building.",
@ -119,16 +144,16 @@ class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
) )
def modal(self, context, event): def modal(self, context, event):
if event.type == 'TIMER' and not self._add_vse_area: if event.type == 'TIMER' and not self._add_vse_area:
# Show Storyboard/Animatic from VSE # Show Storyboard/Animatic from VSE
"""Running as Modal Event because functions within execute() function like """Running as Modal Event because functions within execute() function like
animation_workspace_delete_others() changed UI context that needs to be refreshed. animation_workspace_delete_others() changed UI context that needs to be refreshed.
https://docs.blender.org/api/current/info_gotcha.html#no-updates-after-changing-ui-context""" https://docs.blender.org/api/current/info_gotcha.html#no-updates-after-changing-ui-context
#TODO this is a hack, should be inherient to above builder """
#TODO fix during refactor # TODO this is a hack, should be inherient to above builder
# TODO fix during refactor
if self.task_type == 'anim': if self.task_type == 'anim':
animation_workspace_vse_area_add(context) animation_workspace_vse_area_add(context)
self._add_vse_area = True self._add_vse_area = True
if self._built_shot and self._add_vse_area: if self._built_shot and self._add_vse_area:
@ -136,14 +161,19 @@ class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
file_path = pathlib.Path() file_path = pathlib.Path()
try: try:
save_shot_builder_file(self._file_path) save_shot_builder_file(self._file_path)
self.report({"INFO"}, f"Saved Shot{self.shot_id} at {self._file_path}") self.report(
{"INFO"}, f"Saved Shot{self.shot_id} at {self._file_path}"
)
return {'FINISHED'} return {'FINISHED'}
except FileExistsError: except FileExistsError:
self.report({"ERROR"}, f"Cannot create a file/folder when that file/folder already exists {file_path}") self.report(
{"ERROR"},
f"Cannot create a file/folder when that file/folder already exists {file_path}",
)
return {'CANCELLED'} return {'CANCELLED'}
self.report({"INFO"}, f"Built Shot {self.shot_id}, file is not saved!") self.report({"INFO"}, f"Built Shot {self.shot_id}, file is not saved!")
return {'FINISHED'} return {'FINISHED'}
return {'PASS_THROUGH'} return {'PASS_THROUGH'}
def invoke(self, context: bpy.types.Context, event: bpy.types.Event) -> Set[str]: def invoke(self, context: bpy.types.Context, event: bpy.types.Event) -> Set[str]:
@ -152,35 +182,39 @@ class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
if addon_prefs.session.is_auth() is False: if addon_prefs.session.is_auth() is False:
self.report( self.report(
{'ERROR'}, "Must be logged into Kitsu to continue. \nCheck login status in 'Blender Kitsu' addon preferences.") {'ERROR'},
"Must be logged into Kitsu to continue. \nCheck login status in 'Blender Kitsu' addon preferences.",
)
return {'CANCELLED'} return {'CANCELLED'}
if project.id == "": if project.id == "":
self.report( self.report(
{'ERROR'}, "Operator is not able to determine the Kitsu production's name. \nCheck project is selected in 'Blender Kitsu' addon preferences.") {'ERROR'},
"Operator is not able to determine the Kitsu production's name. \nCheck project is selected in 'Blender Kitsu' addon preferences.",
)
return {'CANCELLED'} return {'CANCELLED'}
if not addon_prefs.is_project_root_valid: if not addon_prefs.is_project_root_valid:
self.report( self.report(
{'ERROR'}, "Operator is not able to determine the project root directory. \nCheck project root directiory is configured in 'Blender Kitsu' addon preferences.") {'ERROR'},
"Operator is not able to determine the project root directory. \nCheck project root directiory is configured in 'Blender Kitsu' addon preferences.",
)
return {'CANCELLED'} return {'CANCELLED'}
self.production_root = addon_prefs.project_root_dir self.production_root = addon_prefs.project_root_dir
self.production_name = project.name self.production_name = project.name
if not ensure_loaded_production(context): if not ensure_loaded_production(context):
self.report( self.report(
{'ERROR'}, "Shot builder configuration files not found in current project directory. \nCheck addon preferences to ensure project root contains shot_builder config.") {'ERROR'},
"Shot builder configuration files not found in current project directory. \nCheck addon preferences to ensure project root contains shot_builder config.",
)
return {'CANCELLED'} return {'CANCELLED'}
production = get_active_production() production = get_active_production()
global _production_task_type_items global _production_task_type_items
_production_task_type_items = production.get_task_type_items( _production_task_type_items = production.get_task_type_items(context=context)
context=context)
global _production_seq_id_items global _production_seq_id_items
_production_seq_id_items = production.get_seq_items(context=context) _production_seq_id_items = production.get_seq_items(context=context)
@ -188,7 +222,9 @@ class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
global _production_shots global _production_shots
_production_shots = production.get_shots(context=context) _production_shots = production.get_shots(context=context)
return cast(Set[str], context.window_manager.invoke_props_dialog(self, width=400)) return cast(
Set[str], context.window_manager.invoke_props_dialog(self, width=400)
)
def execute(self, context: bpy.types.Context) -> Set[str]: def execute(self, context: bpy.types.Context) -> Set[str]:
addon_prefs = bpy.context.preferences.addons["blender_kitsu"].preferences addon_prefs = bpy.context.preferences.addons["blender_kitsu"].preferences
@ -197,36 +233,44 @@ class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
wm.modal_handler_add(self) wm.modal_handler_add(self)
if not self.production_root: if not self.production_root:
self.report( self.report(
{'ERROR'}, "Shot builder can only be started from the File menu. Shortcuts like CTRL-N don't work") {'ERROR'},
"Shot builder can only be started from the File menu. Shortcuts like CTRL-N don't work",
)
return {'CANCELLED'} return {'CANCELLED'}
if self._built_shot: if self._built_shot:
return {'RUNNING_MODAL'} return {'RUNNING_MODAL'}
ensure_loaded_production(context) ensure_loaded_production(context)
production = get_active_production() production = get_active_production()
shot_builder = ShotBuilder( shot_builder = ShotBuilder(
context=context, production=production, shot_name=self.shot_id, task_type=TaskType(self.task_type)) context=context,
production=production,
shot_name=self.shot_id,
task_type=TaskType(self.task_type),
)
shot_builder.create_build_steps() shot_builder.create_build_steps()
shot_builder.build() shot_builder.build()
# Build Kitsu Context # Build Kitsu Context
sequence = gazu.shot.get_sequence_by_name(production.config['KITSU_PROJECT_ID'], self.seq_id) sequence = gazu.shot.get_sequence_by_name(
production.config['KITSU_PROJECT_ID'], self.seq_id
)
shot = gazu.shot.get_shot_by_name(sequence, self.shot_id) shot = gazu.shot.get_shot_by_name(sequence, self.shot_id)
#TODO this is a hack, should be inherient to above builder # TODO this is a hack, should be inherient to above builder
#TODO fix during refactor # TODO fix during refactor
if self.task_type == 'anim': if self.task_type == 'anim':
#Load EDIT # Load EDIT
editorial_export_get_latest(context, shot) editorial_export_get_latest(context, shot)
# Load Anim Workspace # Load Anim Workspace
animation_workspace_delete_others() animation_workspace_delete_others()
# Initilize armatures # Initilize armatures
for obj in [obj for obj in bpy.data.objects if obj.type == "ARMATURE"]: for obj in [obj for obj in bpy.data.objects if obj.type == "ARMATURE"]:
base_name = obj.name.split( base_name = obj.name.split(addon_prefs.shot_builder_armature_prefix)[-1]
addon_prefs.shot_builder_armature_prefix)[-1]
new_action = bpy.data.actions.new( new_action = bpy.data.actions.new(
f"{addon_prefs.shot_builder_action_prefix}{base_name}.{self.shot_id}.v001") f"{addon_prefs.shot_builder_action_prefix}{base_name}.{self.shot_id}.v001"
)
new_action.use_fake_user = True new_action.use_fake_user = True
obj.animation_data.action = new_action obj.animation_data.action = new_action
@ -239,11 +283,10 @@ class SHOTBUILDER_OT_NewShotFile(bpy.types.Operator):
# Run User Script # Run User Script
exec(addon_prefs.user_exec_code) exec(addon_prefs.user_exec_code)
self._file_path = shot_builder.build_context.shot.file_path self._file_path = shot_builder.build_context.shot.file_path
self._built_shot = True self._built_shot = True
return {'RUNNING_MODAL'} return {'RUNNING_MODAL'}
def draw(self, context: bpy.types.Context) -> None: def draw(self, context: bpy.types.Context) -> None:
layout = self.layout layout = self.layout
row = layout.row() row = layout.row()

View File

@ -22,7 +22,7 @@ from typing import Optional
import bpy import bpy
from blender_kitsu import gazu import gazu
from blender_kitsu.types import Sequence, Project, Shot, Cache from blender_kitsu.types import Sequence, Project, Shot, Cache
from blender_kitsu.logger import LoggerFactory from blender_kitsu.logger import LoggerFactory

View File

@ -26,8 +26,8 @@ from pathlib import Path
from typing import Dict, List, Set, Optional, Tuple, Any from typing import Dict, List, Set, Optional, Tuple, Any
import datetime import datetime
import bpy import bpy
import gazu
from blender_kitsu import gazu, cache, util, prefs, bkglobals from blender_kitsu import cache, util, prefs, bkglobals
from blender_kitsu.sqe import push, pull, checkstrip, opsdata, checksqe from blender_kitsu.sqe import push, pull, checkstrip, opsdata, checksqe
from blender_kitsu.logger import LoggerFactory from blender_kitsu.logger import LoggerFactory
@ -2407,9 +2407,8 @@ class KITSU_OT_vse_publish_edit_revision(bpy.types.Operator):
sorted_edits = [] sorted_edits = []
active_project = cache.project_active_get() active_project = cache.project_active_get()
for edit in gazu.edit.get_all_edits_with_tasks(): for edit in gazu.edit.all_edits_for_project(active_project.id):
if (edit["project_id"] == active_project.id) and not edit['canceled']: sorted_edits.append(edit)
sorted_edits.append(edit)
return [ return [
( (
@ -2456,7 +2455,8 @@ class KITSU_OT_vse_publish_edit_revision(bpy.types.Operator):
def invoke(self, context, event): def invoke(self, context, event):
# Ensure user has permissions to access edit data # Ensure user has permissions to access edit data
try: try:
edits = gazu.edit.get_all_edits_with_tasks() active_project = cache.project_active_get()
edits = gazu.edit.all_edits_for_project(active_project.id)
except gazu.exception.NotAllowedException: except gazu.exception.NotAllowedException:
self.report( self.report(
{"ERROR"}, "Kitsu User doesn't have permissions to access edit data." {"ERROR"}, "Kitsu User doesn't have permissions to access edit data."
@ -2499,7 +2499,7 @@ class KITSU_OT_vse_publish_edit_revision(bpy.types.Operator):
active_project = cache.project_active_get() active_project = cache.project_active_get()
existing_previews = gazu.edit.get_all_previews_for_edit(self.edit_entry) existing_previews = gazu.edit.all_previews_for_edit(self.edit_entry)
len_previews = get_dict_len(existing_previews) len_previews = get_dict_len(existing_previews)
revision = str(set_revision_int(len_previews)).zfill(3) revision = str(set_revision_int(len_previews)).zfill(3)
@ -2538,7 +2538,7 @@ class KITSU_OT_vse_publish_edit_revision(bpy.types.Operator):
edit_entity_update = set_entity_data( edit_entity_update = set_entity_data(
edit_entry, 'frame_start', self.frame_start edit_entry, 'frame_start', self.frame_start
) )
updated_edit_entity = gazu.entity.update_entity( updated_edit_entity = gazu.edit.update_edit(
edit_entity_update edit_entity_update
) # TODO add a generic function to update entites ) # TODO add a generic function to update entites

View File

@ -22,8 +22,8 @@ from typing import Dict, List, Set, Optional, Tuple, Any
from blender_kitsu import tasks from blender_kitsu import tasks
import bpy import bpy
import gazu
from blender_kitsu import cache, prefs, gazu, util from blender_kitsu import cache, prefs, util
from blender_kitsu.tasks import opsdata from blender_kitsu.tasks import opsdata
from blender_kitsu.logger import LoggerFactory from blender_kitsu.logger import LoggerFactory

View File

@ -23,7 +23,7 @@ import inspect
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional, Union, Tuple, TypeVar from typing import Any, Dict, List, Optional, Union, Tuple, TypeVar
from blender_kitsu import gazu import gazu
from blender_kitsu.logger import LoggerFactory from blender_kitsu.logger import LoggerFactory
logger = LoggerFactory.getLogger() logger = LoggerFactory.getLogger()
@ -619,7 +619,6 @@ class Asset(Entity):
asset_name: str, asset_name: str,
asset_type: Optional[AssetType] = None, asset_type: Optional[AssetType] = None,
) -> Optional[Asset]: ) -> Optional[Asset]:
# Convert args to dict for api call. # Convert args to dict for api call.
project_dict = asdict(project) project_dict = asdict(project)
asset_type_dict = asdict(asset_type) if asset_type else asset_type asset_type_dict = asdict(asset_type) if asset_type else asset_type
@ -693,12 +692,19 @@ class TaskType(Entity):
@classmethod @classmethod
def all_shot_task_types(cls) -> List[TaskType]: def all_shot_task_types(cls) -> List[TaskType]:
return [cls.from_dict(t) for t in gazu.task.all_task_types() if t["for_entity"] == "Shot"] return [
cls.from_dict(t)
for t in gazu.task.all_task_types()
if t["for_entity"] == "Shot"
]
@classmethod @classmethod
def all_asset_task_types(cls) -> List[TaskType]: def all_asset_task_types(cls) -> List[TaskType]:
return [ return [
cls.from_dict(t) for t in gazu.task.all_task_types() if t["for_entity"] == "Asset"] cls.from_dict(t)
for t in gazu.task.all_task_types()
if t["for_entity"] == "Asset"
]
def __bool__(self) -> bool: def __bool__(self) -> bool:
return bool(self.id) return bool(self.id)
@ -777,7 +783,6 @@ class Task(Entity):
task_type: TaskType, task_type: TaskType,
name: str = "main", name: str = "main",
) -> Optional[Task]: ) -> Optional[Task]:
# Convert args to dict for api call. # Convert args to dict for api call.
asset_shotdict = asdict(asset_shot) asset_shotdict = asdict(asset_shot)
task_type_dict = asdict(task_type) task_type_dict = asdict(task_type)
@ -804,7 +809,6 @@ class Task(Entity):
assigner: Optional[Person] = None, assigner: Optional[Person] = None,
assignees: Optional[List[Person]] = None, assignees: Optional[List[Person]] = None,
) -> Task: ) -> Task:
# Convert args. # Convert args.
assigner = asdict(assigner) if assigner else assigner assigner = asdict(assigner) if assigner else assigner
task_status = asdict(task_status) if task_status else task_status task_status = asdict(task_status) if task_status else task_status
@ -857,7 +861,6 @@ class Task(Entity):
# I think attachements is equal to attachment_files in Comment class. # I think attachements is equal to attachment_files in Comment class.
created_at: Optional[str] = None, created_at: Optional[str] = None,
) -> Comment: ) -> Comment:
# Convert args. # Convert args.
person = asdict(user) if user else user person = asdict(user) if user else user
@ -909,7 +912,6 @@ class TaskStatus(Entity):
@classmethod @classmethod
def by_short_name(cls, short_name: str) -> Optional[TaskStatus]: def by_short_name(cls, short_name: str) -> Optional[TaskStatus]:
# Can return None if task status does not exist. # Can return None if task status does not exist.
task_status_dict = gazu.task.get_task_status_by_short_name(short_name) task_status_dict = gazu.task.get_task_status_by_short_name(short_name)
@ -919,7 +921,6 @@ class TaskStatus(Entity):
@classmethod @classmethod
def by_name(cls, name: str) -> Optional[TaskStatus]: def by_name(cls, name: str) -> Optional[TaskStatus]:
# Can return None if task status does not exist. # Can return None if task status does not exist.
task_status_dict = gazu.task.get_task_status_by_name(name) task_status_dict = gazu.task.get_task_status_by_name(name)

View File

@ -0,0 +1,135 @@
"""External dependencies loader."""
import contextlib
import importlib
from pathlib import Path
import sys
import logging
from types import ModuleType
from typing import Iterator, Iterable
_my_dir = Path(__file__).parent
_log = logging.getLogger(__name__)
def load_wheel(module_name: str, submodules: Iterable[str]) -> list[ModuleType]:
"""Loads modules from a wheel file 'module_name*.whl'.
Loads `module_name`, and if submodules are given, loads
`module_name.submodule` for each of the submodules. This allows loading all
required modules from the same wheel in one session, ensuring that
inter-submodule references are correct.
Returns the loaded modules, so [module, submodule, submodule, ...].
"""
fname_prefix = _fname_prefix_from_module_name(module_name)
wheel = _wheel_filename(fname_prefix)
loaded_modules: list[ModuleType] = []
to_load = [module_name] + [f"{module_name}.{submodule}" for submodule in submodules]
# Load the module from the wheel file. Keep a backup of sys.path so that it
# can be restored later. This should ensure that future import statements
# cannot find this wheel file, increasing the separation of dependencies of
# this add-on from other add-ons.
with _sys_path_mod_backup(wheel):
for modname in to_load:
try:
module = importlib.import_module(modname)
except ImportError as ex:
raise ImportError(
"Unable to load %r from %s: %s" % (modname, wheel, ex)
) from None
assert isinstance(module, ModuleType)
loaded_modules.append(module)
_log.info("Loaded %s from %s", modname, module.__file__)
assert len(loaded_modules) == len(
to_load
), f"expecting to load {len(to_load)} modules, but only have {len(loaded_modules)}: {loaded_modules}"
return loaded_modules
def load_wheel_global(module_name: str, fname_prefix: str = "") -> ModuleType:
"""Loads a wheel from 'fname_prefix*.whl', unless the named module can be imported.
This allows us to use system-installed packages before falling back to the shipped wheels.
This is useful for development, less so for deployment.
If `fname_prefix` is the empty string, it will use the first package from `module_name`.
In other words, `module_name="pkg.subpkg"` will result in `fname_prefix="pkg"`.
"""
if not fname_prefix:
fname_prefix = _fname_prefix_from_module_name(module_name)
try:
module = importlib.import_module(module_name)
except ImportError as ex:
_log.debug("Unable to import %s directly, will try wheel: %s", module_name, ex)
else:
_log.debug(
"Was able to load %s from %s, no need to load wheel %s",
module_name,
module.__file__,
fname_prefix,
)
return module
wheel = _wheel_filename(fname_prefix)
wheel_filepath = str(wheel)
if wheel_filepath not in sys.path:
sys.path.insert(0, wheel_filepath)
try:
module = importlib.import_module(module_name)
except ImportError as ex:
raise ImportError(
"Unable to load %r from %s: %s" % (module_name, wheel, ex)
) from None
_log.debug("Globally loaded %s from %s", module_name, module.__file__)
return module
@contextlib.contextmanager
def _sys_path_mod_backup(wheel_file: Path) -> Iterator[None]:
"""Temporarily inserts a wheel onto sys.path.
When the context exits, it restores sys.path and sys.modules, so that
anything that was imported within the context remains unimportable by other
modules.
"""
old_syspath = sys.path[:]
old_sysmod = sys.modules.copy()
try:
sys.path.insert(0, str(wheel_file))
yield
finally:
# Restore without assigning a new list instance. That way references
# held by other code will stay valid.
sys.path[:] = old_syspath
sys.modules.clear()
sys.modules.update(old_sysmod)
def _wheel_filename(fname_prefix: str) -> Path:
path_pattern = "%s*.whl" % fname_prefix
wheels: list[Path] = list(_my_dir.glob(path_pattern))
if not wheels:
raise RuntimeError("Unable to find wheel at %r" % path_pattern)
# If there are multiple wheels that match, load the last-modified one.
# Alphabetical sorting isn't going to cut it since BAT 1.10 was released.
def modtime(filepath: Path) -> float:
return filepath.stat().st_mtime
wheels.sort(key=modtime)
return wheels[-1]
def _fname_prefix_from_module_name(module_name: str) -> str:
return module_name.split(".", 1)[0]

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.