#!/usr/bin/env python3

# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****

"""
Blender asset manager
"""

# TODO
# * checkout command (store some handy json info!)
# * commit command
# ** staging for svn commit
# ** svn commit
# ** handle errors
# * definition of project (.bam (like .git)) ... json


class bam_config:
    """Namespace for locating and (eventually) reading/writing the config."""
    # fake module
    __slots__ = ()

    def __new__(cls, *args, **kwargs):
        raise RuntimeError("%s should not be instantiated" % cls)

    CONFIG_DIR = ".bam"

    @staticmethod
    def find_basedir():
        """
        Return the config path (or None when not found)

        Starts at the current working directory and walks up through its
        parents, returning the first existing ``CONFIG_DIR`` directory.
        """
        import os

        parent = os.path.normpath(os.path.abspath(os.getcwd()))
        parent_prev = None

        # 'dirname' of the filesystem root is the root itself,
        # which is what terminates this loop.
        while parent != parent_prev:
            test_dir = os.path.join(parent, bam_config.CONFIG_DIR)
            if os.path.isdir(test_dir):
                return test_dir
            parent_prev = parent
            parent = os.path.dirname(parent)

        return None

    @staticmethod
    def load(id_):
        """Load config data for ``id_`` (not implemented yet, returns None)."""
        # bam_config.find_basedir()
        # data =
        pass

    @staticmethod
    def write(id_, data):
        """Write config ``data`` for ``id_`` (not implemented yet)."""
        pass


class bam_utils:
    """Namespace for the client side implementation of each sub-command."""
    # fake module
    __slots__ = ()

    def __new__(cls, *args, **kwargs):
        raise RuntimeError("%s should not be instantiated" % cls)

    # Copied from 'packer.py', TODO(cam) de-duplicate
    @staticmethod
    def sha1_for_file(fn, block_size=1 << 20):
        """
        Return the SHA1 hex digest of the file at path ``fn``,
        reading it ``block_size`` bytes at a time.
        """
        import hashlib

        sha1 = hashlib.sha1()
        with open(fn, 'rb') as f:
            # 'read' returns b"" at the end of the file.
            for data in iter(lambda: f.read(block_size), b""):
                sha1.update(data)
        return sha1.hexdigest()
    # end code duplication
    # --------------------

    @staticmethod
    def session_find_url():
        """Return the base URL of the server (currently hard coded)."""
        return "http://localhost:5000"

    @staticmethod
    def session_request_url(req_path):
        """Return the full URL for the server end-point ``req_path``."""
        # TODO, get from config
        BAM_SERVER = bam_utils.session_find_url()
        result = "%s/%s" % (BAM_SERVER, req_path)
        return result

    @staticmethod
    def checkout(paths):
        """
        Request a checkout of ``paths[0]`` from the server and write the
        response into the current directory as ``<basename>.zip``.

        On a non 200 response the start of the response body is printed
        and nothing is written.
        """
        import sys
        import requests

        # TODO(cam) multiple paths
        path = paths[0]
        del paths

        payload = {
            "filepath": path,
            "command": "checkout",
        }
        r = requests.get(
            bam_utils.session_request_url("file"),
            params=payload,
            auth=("bam", "bam"),
            stream=True,
        )

        if r.status_code not in {200, }:
            # TODO(cam), make into reusable function?
            # An empty body must not raise 'StopIteration', and a chunk cut
            # in the middle of a multi-byte character must not raise either.
            body_head = next(r.iter_content(chunk_size=1024), b"")
            print("Error %d:\n%s" % (
                r.status_code,
                body_head.decode('utf-8', errors='replace'),
            ))
            return

        # TODO(cam) how to tell if we get back a message payload? or real data???
        local_filename = payload['filepath'].split('/')[-1] + ".zip"

        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
                    f.flush()

                    # progress indicator
                    sys.stdout.write(".")
                    sys.stdout.flush()
        print("Written:", local_filename)

    @staticmethod
    def commit(paths, message):
        """
        Commit the modified files of the session directory ``paths[0]``.

        Files are considered modified when their SHA1 differs from the one
        stored in the session's ``.bam_paths_uuid.json``. They are archived
        into a temporary zip which is uploaded together with ``message``.

        Exits with status 1 when ``paths[0]`` is not a directory.
        """
        import sys
        import os

        # TODO(cam) ignore files

        # TODO(cam) multiple paths
        path = paths[0]

        if not os.path.isdir(path):
            print("Expected a directory (%r)" % path)
            sys.exit(1)

        # Only needed once the arguments are known to be usable.
        import json
        import zipfile
        import requests

        # make a zipfile from session
        with open(os.path.join(path, ".bam_paths_uuid.json")) as f:
            d = json.load(f)

        paths_modified = []
        for fn, sha1 in d.items():
            fn_abs = os.path.join(path, fn)
            if bam_utils.sha1_for_file(fn_abs) != sha1:
                paths_modified.append((fn, fn_abs))

        print(d)
        print(paths_modified)

        # -------------------------
        print("Now make a zipfile")
        temp_zip = os.path.join(path, ".bam_tmp.zip")
        try:
            with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zip_handle:
                for (fn, fn_abs) in paths_modified:
                    print("  Archiving %r" % fn_abs)
                    zip_handle.write(fn_abs, arcname=fn)

            # --------------
            # Commit Request
            args = {
                'message': message,
            }
            payload = {
                'command': 'commit',
                'arguments': json.dumps(args),
            }

            # The handle is closed even when the request raises.
            with open(temp_zip, 'rb') as zip_file:
                r = requests.put(
                    bam_utils.session_request_url("file"),
                    params=payload,
                    auth=('bam', 'bam'),
                    files={'file': zip_file},
                )
            print(r.text)
        finally:
            # Never leave the temporary archive in the session directory.
            if os.path.exists(temp_zip):
                os.remove(temp_zip)

    @staticmethod
    def list_dir(paths):
        """
        Print the server side contents of ``paths[0]``,
        directories first (with a trailing slash), then files.
        """
        import requests

        # print(bam_config.find_config())

        # TODO(cam) multiple paths
        path = paths[0]
        del paths

        payload = {
            "path": path,
        }
        r = requests.get(
            bam_utils.session_request_url("file_list"),
            params=payload,
            auth=("bam", "bam"),
            stream=True,
        )

        items = r.json().get("items_list", ())

        for (name_short, name_full, file_type) in items:
            if file_type == "dir":
                print("  %s/" % name_short)
        for (name_short, name_full, file_type) in items:
            if file_type != "dir":
                print("  %s" % name_short)


# -----------------------------------------------------------------------------
# Sub-command callbacks, each receives the parsed 'argparse' namespace.

def subcommand_checkout_cb(args):
    """Run the 'checkout' sub-command."""
    bam_utils.checkout(args.paths)


def subcommand_commit_cb(args):
    """Run the 'commit' sub-command, forwarding the ``--message`` option."""
    bam_utils.commit(args.paths, args.message)


def subcommand_update_cb(args):
    """Placeholder for the 'update' sub-command (prints its arguments)."""
    print(args)


def subcommand_revert_cb(args):
    """Placeholder for the 'revert' sub-command (prints its arguments)."""
    print(args)


def subcommand_list_cb(args):
    """Run the 'list' sub-command."""
    bam_utils.list_dir(args.paths)


def subcommand_status_cb(args):
    """Placeholder for the 'status' sub-command (prints its arguments)."""
    print(args)


# -----------------------------------------------------------------------------
# Argument parser creation, one function per sub-command.

def create_argparse_checkout(subparsers):
    """Register the 'checkout' (alias 'co') sub-command."""
    subparse = subparsers.add_parser("checkout", aliases=("co",))
    subparse.add_argument(
        "paths", nargs="+",
        help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_checkout_cb)


def create_argparse_commit(subparsers):
    """Register the 'commit' (alias 'ci') sub-command."""
    subparse = subparsers.add_parser("commit", aliases=("ci",))
    subparse.add_argument(
        "-m", "--message", dest="message", metavar='MESSAGE',
        help="Commit message",
    )
    subparse.add_argument(
        "paths", nargs="+",
        help="paths to commit",
    )
    subparse.set_defaults(func=subcommand_commit_cb)


def create_argparse_update(subparsers):
    """Register the 'update' (alias 'up') sub-command."""
    subparse = subparsers.add_parser("update", aliases=("up",))
    subparse.add_argument(
        "paths", nargs="+",
        help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_update_cb)


def create_argparse_revert(subparsers):
    """Register the 'revert' sub-command."""
    # Must not re-use the "update"/"up" names: that either raises a
    # conflicting sub-parser error or silently replaces 'update'.
    subparse = subparsers.add_parser("revert")
    subparse.add_argument(
        "paths", nargs="+",
        help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_revert_cb)


def create_argparse_status(subparsers):
    """Register the 'status' (alias 'st') sub-command."""
    subparse = subparsers.add_parser("status", aliases=("st",))
    subparse.add_argument(
        "paths", nargs="+",
        help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_status_cb)


def create_argparse_list(subparsers):
    """Register the 'list' (alias 'ls') sub-command."""
    subparse = subparsers.add_parser("list", aliases=("ls",))
    subparse.add_argument(
        "paths", nargs="+",
        help="Path(s) to operate on",
    )
    subparse.set_defaults(func=subcommand_list_cb)


def create_argparse():
    """Return the top level argument parser with every sub-command added."""
    import argparse

    usage_text = (
        "BAM! (Blender Asset Manager)\n" +
        # '__doc__' is None when doc-strings are stripped (python -OO).
        (__doc__ or "")
    )

    parser = argparse.ArgumentParser(description=usage_text)

    subparsers = parser.add_subparsers(
        title='subcommands',
        description='valid subcommands',
        help='additional help')

    create_argparse_checkout(subparsers)
    create_argparse_commit(subparsers)
    create_argparse_update(subparsers)
    create_argparse_revert(subparsers)
    create_argparse_status(subparsers)
    create_argparse_list(subparsers)

    return parser


def main():
    """Parse the command line and run the selected sub-command."""
    import sys

    parser = create_argparse()
    args = parser.parse_args(sys.argv[1:])

    # call subparser callback
    if not hasattr(args, "func"):
        # no sub-command given
        parser.print_help()
        return

    args.func(args)


if __name__ == "__main__":
    main()