#!/usr/bin/env python3

# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****

"""
Blender asset manager
"""

# NOTE(review): this guard fires when the file is imported rather than run as
# a script, so the message text reads oddly; it is left unchanged here.
if __name__ != "__main__":
    raise Exception("must be imported directly")

# ------------------
# Ensure module path
import os
import sys
path = os.path.normpath(os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "..", "..", "modules"))
if path not in sys.path:
    sys.path.append(path)
# The names are removed again; every function below imports what it needs.
del os, sys, path
# --------


class bam_config:
    """Namespace for locating, reading and writing files in the ``.bam`` directory."""
    # fake module
    __slots__ = ()

    def __new__(cls, *args, **kwargs):
        raise RuntimeError("%s should not be instantiated" % cls)

    CONFIG_DIR = ".bam"

    @staticmethod
    def find_basedir(cwd=None):
        """
        Return the path of the nearest ``CONFIG_DIR`` directory, searching
        ``cwd`` (the current working directory when None) and then each of
        its parents in turn.

        Returns None when no such directory exists.
        """
        import os

        if cwd is None:
            cwd = os.getcwd()

        parent = os.path.normpath(os.path.abspath(cwd))
        parent_prev = None

        # ``dirname`` of the filesystem root is the root itself,
        # which is what ends this walk.
        while parent != parent_prev:
            test_dir = os.path.join(parent, bam_config.CONFIG_DIR)
            if os.path.isdir(test_dir):
                return test_dir

            parent_prev = parent
            parent = os.path.dirname(parent)

        return None

    @staticmethod
    def _filepath(id_, cwd=None):
        """
        Return the path of the file ``id_`` inside the config directory.

        Raises FileNotFoundError when ``find_basedir`` finds no config
        directory, instead of handing None on to ``os.path.join``.
        """
        import os

        basedir = bam_config.find_basedir(cwd=cwd)
        if basedir is None:
            start = os.getcwd() if cwd is None else cwd
            raise FileNotFoundError(
                "No %r directory found in %r or any of its parents" %
                (bam_config.CONFIG_DIR, os.path.abspath(start)))
        return os.path.join(basedir, id_)

    @staticmethod
    def load(id_="config", cwd=None):
        """
        Return the parsed JSON content of the file ``id_`` in the config directory.

        Raises FileNotFoundError when no config directory is found.
        """
        import json

        filepath = bam_config._filepath(id_, cwd=cwd)
        # Read as UTF-8 to match ``write``.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def write(id_, data, cwd=None):
        """
        Serialize ``data`` as JSON into the file ``id_`` in the config directory.

        Raises FileNotFoundError when no config directory is found.
        """
        import json

        filepath = bam_config._filepath(id_, cwd=cwd)
        # ``ensure_ascii=False`` emits non-ASCII characters as-is, so the
        # encoding is fixed rather than left to the platform default.
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(
                data, f, ensure_ascii=False,
                check_circular=False,
                # optional (pretty)
                sort_keys=True, indent=4, separators=(',', ': '),
            )


class bam_utils:
    """Namespace holding the implementation of the command line subcommands."""
    # fake module
    __slots__ = ()

    def __new__(cls, *args, **kwargs):
        raise RuntimeError("%s should not be instantiated" % cls)

    @staticmethod
    def session_find_url():
        """Return the hard-coded server URL."""
        return "http://localhost:5000"

    @staticmethod
    def session_request_url(req_path):
        """Return ``req_path`` appended to the ``url`` stored in the project config."""
        # TODO, get from config
        project_config = bam_config.load()
        result = "%s/%s" % (project_config['url'], req_path)
        return result

    @staticmethod
    def init(url, directory_name=None):
        """
        Create a project directory in the current working directory,
        containing a config directory with an initial ``config`` file.

        The directory is named ``directory_name`` when given, otherwise it
        is named after the last component of the path of ``url``.

        Raises ValueError when no name is given and none can be derived.
        """
        import os
        import urllib.parse

        parsed_url = urllib.parse.urlsplit(url)

        # Strip trailing slashes first: the basename of "/project/" is empty.
        project_directory_name = (
            directory_name or
            os.path.basename(parsed_url.path.rstrip("/")))
        if not project_directory_name:
            raise ValueError(
                "Unable to derive a directory name from %r, "
                "pass one explicitly" % url)

        project_directory_path = os.path.join(os.getcwd(), project_directory_name)

        # Create the project directory inside the current directory
        os.mkdir(project_directory_path)
        # Create the .bam folder
        bam_folder = os.path.join(project_directory_path, bam_config.CONFIG_DIR)
        os.mkdir(bam_folder)

        # Add a config file with project url, username and password
        bam_config.write(
            "config",
            {
                "url": url,
                "user": "bam",
                "password": "bam",
                "config_version": 1,
            },
            cwd=bam_folder)

        print("Project %s initialized" % project_directory_name)

    @staticmethod
    def checkout(paths):
        """
        Request a checkout of ``paths[0]`` from the server and start writing
        the response into ``<last path component>.zip`` in the current directory.

        Only the first path is used (see the TODO below).
        """
        import sys
        import os
        import requests

        # Load project configuration
        project_config = bam_config.load()

        # TODO(cam) multiple paths
        path = paths[0]
        del paths

        # TODO(cam) we may want to checkout a single file? how to handle this?
        # we may want to checkout a dir too
        dst_dir = os.path.basename(path)

        payload = {
            "filepath": path,
            "command": "checkout",
        }
        r = requests.get(
            bam_utils.session_request_url("file"),
            params=payload,
            auth=(project_config['user'], project_config['password']),
            stream=True,
        )

        if r.status_code not in {200, }:
            # TODO(cam), make into reusable function?
            print("Error %d:\n%s" % (r.status_code, next(r.iter_content(chunk_size=1024)).decode('utf-8')))
            return

        # TODO(cam) how to tell if we get back a message payload? or real data???
        dst_dir_data = payload['filepath'].split('/')[-1]

        if 1:
            dst_dir_data += ".zip"
            with open(dst_dir_data, 'wb') as f:
                import struct
                ID_MESSAGE = 1
                ID_PAYLOAD = 2
                head = r.raw.read(4)
                if head != b'BAM\0':
                    print("Bad header...")
                    return

                while True:
                    # NOTE(review): the statement below is reproduced verbatim
                    # from the reviewed source, where the text between
                    # `struct.unpack("` and ` %r" % (...)` appears to have been
                    # lost. The gap presumably held the rest of this method and
                    # the `commit`, `list_dir` and `deps` methods that the
                    # subcommand callbacks call - restore it from version
                    # control before relying on any of them.
                    msg_type, msg_size = struct.unpack(" %r" % (os.path.join(fp.basedir, fp_blend_basename), fp.filepath))


def subcommand_init_cb(args):
    """Run ``init`` with the parsed url and optional directory name."""
    bam_utils.init(args.url, args.directory_name)


def subcommand_checkout_cb(args):
    """Run ``checkout`` on the parsed paths."""
    bam_utils.checkout(args.paths)


def subcommand_commit_cb(args):
    """Run ``commit`` on the parsed paths, or on "." when none are given."""
    bam_utils.commit(args.paths or ["."], args.message)


def subcommand_update_cb(args):
    """Placeholder: print the parsed arguments."""
    print(args)


def subcommand_revert_cb(args):
    """Placeholder: print the parsed arguments."""
    print(args)


def subcommand_status_cb(args):
    """Placeholder: print the parsed arguments."""
    print(args)


def subcommand_list_cb(args):
    """Run ``list_dir`` on the parsed paths, or on "." when none are given."""
    bam_utils.list_dir(args.paths or ["."])


def subcommand_deps_cb(args):
    """Run ``deps`` on the parsed paths, or on "." when none are given."""
    bam_utils.deps(args.paths or ["."], args.recursive)


def create_argparse_init(subparsers):
    """Register the ``init`` subcommand."""
    subparse = subparsers.add_parser("init")
    subparse.add_argument(
        "url", help="Project repository url",
    )
    subparse.add_argument(
        "directory_name", nargs="?", help="Directory name",
    )
    subparse.set_defaults(func=subcommand_init_cb)


def create_argparse_checkout(subparsers):
    """Register the ``checkout`` subcommand (alias ``co``)."""
    subparse = subparsers.add_parser(
        "checkout", aliases=("co",),
        help="",
    )
    # At least one path is required: ``checkout`` reads ``paths[0]``.
    subparse.add_argument(
        "paths", nargs="+", help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_checkout_cb)


def create_argparse_commit(subparsers):
    """Register the ``commit`` subcommand (alias ``ci``)."""
    subparse = subparsers.add_parser(
        "commit", aliases=("ci",),
        help="",
    )
    subparse.add_argument(
        "-m", "--message", dest="message", metavar='MESSAGE',
        required=True,
        help="Commit message",
    )
    # Paths are optional: the callback falls back to "." when none are given.
    subparse.add_argument(
        "paths", nargs="*", help="paths to commit",
    )
    subparse.set_defaults(func=subcommand_commit_cb)


def create_argparse_update(subparsers):
    """Register the ``update`` subcommand (alias ``up``)."""
    subparse = subparsers.add_parser(
        "update", aliases=("up",),
        help="",
    )
    subparse.add_argument(
        "paths", nargs="+", help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_update_cb)


def create_argparse_revert(subparsers):
    """Register the ``revert`` subcommand (alias ``rv``)."""
    subparse = subparsers.add_parser(
        "revert", aliases=("rv",),
        help="",
    )
    subparse.add_argument(
        "paths", nargs="+", help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_revert_cb)


def create_argparse_status(subparsers):
    """Register the ``status`` subcommand (alias ``st``)."""
    subparse = subparsers.add_parser(
        "status", aliases=("st",),
        help="",
    )
    subparse.add_argument(
        "paths", nargs="+", help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_status_cb)


def create_argparse_list(subparsers):
    """Register the ``list`` subcommand (alias ``ls``)."""
    subparse = subparsers.add_parser(
        "list", aliases=("ls",),
        help="",
    )
    # Paths are optional: the callback falls back to "." when none are given.
    subparse.add_argument(
        "paths", nargs="*", help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_list_cb)


def create_argparse_deps(subparsers):
    """Register the ``deps`` subcommand (alias ``dp``)."""
    subparse = subparsers.add_parser(
        "deps", aliases=("dp",),
        help="",
    )
    # Paths are optional: the callback falls back to "." when none are given.
    subparse.add_argument(
        "paths", nargs="*", help="Path(s) to operate on",
    )
    subparse.add_argument(
        "-r", "--recursive", dest="recursive", action='store_true',
        help="Scan dependencies recursively",
    )
    subparse.set_defaults(func=subcommand_deps_cb)


def create_argparse():
    """Build and return the argument parser with every subcommand registered."""
    import argparse

    usage_text = (
        "BAM! (Blender Asset Manager)\n" +
        __doc__
    )

    parser = argparse.ArgumentParser(description=usage_text)

    subparsers = parser.add_subparsers(
        title='subcommands',
        description='valid subcommands',
        help='additional help')

    create_argparse_init(subparsers)
    create_argparse_checkout(subparsers)
    create_argparse_commit(subparsers)
    create_argparse_update(subparsers)
    create_argparse_revert(subparsers)
    create_argparse_status(subparsers)
    create_argparse_list(subparsers)
    create_argparse_deps(subparsers)

    return parser


def main():
    """Parse the command line and dispatch to the selected subcommand."""
    import sys

    parser = create_argparse()
    args = parser.parse_args(sys.argv[1:])

    # call subparser callback
    # (``func`` is only set once a subcommand was given)
    if not hasattr(args, "func"):
        parser.print_help()
        return

    args.func(args)


if __name__ == "__main__":
    main()