#!/usr/bin/env python3 # ***** BEGIN GPL LICENSE BLOCK ***** # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # ***** END GPL LICENCE BLOCK ***** """ This is the entry point for command line access. """ import os import sys import json # ------------------ # Ensure module path path = os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "modules")) if path not in sys.path: sys.path.append(path) del path # -------- import logging log = logging.getLogger("bam_cli") def fatal(msg): if __name__ == "__main__": sys.stderr.write("fatal: ") sys.stderr.write(msg) sys.stderr.write("\n") sys.exit(1) else: raise RuntimeError(msg) class bam_config: # fake module __slots__ = () def __new__(cls, *args, **kwargs): raise RuntimeError("%s should not be instantiated" % cls) CONFIG_DIR = ".bam" # can infact be any file in the session SESSION_FILE = ".bam_paths_remap.json" @staticmethod def find_basedir(cwd=None, path_suffix=None, abort=False, test_subpath=CONFIG_DIR, descr=""): """ Return the config path (or None when not found) Actually should raise an error? """ if cwd is None: cwd = os.getcwd() parent = (os.path.normpath( os.path.abspath( cwd))) parent_prev = None while parent != parent_prev: test_dir = os.path.join(parent, test_subpath) if os.path.exists(test_dir): if path_suffix is not None: test_dir = os.path.join(test_dir, path_suffix) return test_dir parent_prev = parent parent = os.path.dirname(parent) if abort is True: fatal("Not a %s (or any of the parent directories): %s" % (descr, test_subpath)) return None @staticmethod def find_rootdir(cwd=None, path_suffix=None, abort=False, test_subpath=CONFIG_DIR, descr=""): """ find_basedir(), without '.bam' suffix """ path = bam_config.find_basedir( cwd=cwd, path_suffix=path_suffix, abort=abort, test_subpath=test_subpath, ) return path[:-(len(test_subpath) + 1)] def find_sessiondir(cwd=None, abort=False): """ from: my_project/my_session/some/subdir to: my_project/my_session where: my_project/.bam/ (is the basedir) """ session_rootdir = bam_config.find_basedir( cwd=cwd, test_subpath=bam_config.SESSION_FILE, abort=abort, descr="bam session" ) if session_rootdir is not None: return session_rootdir[:-len(bam_config.SESSION_FILE)] else: if abort: if not os.path.isdir(session_rootdir): fatal("Expected a directory (%r)" % session_rootdir) return None @staticmethod def load(id_="config", cwd=None, abort=False): filepath = bam_config.find_basedir( cwd=cwd, path_suffix=id_, descr="bam repository", ) if abort is True: if filepath is None: fatal("Not a bam repository (or any of the parent directories): .bam") with open(filepath, 'r') as f: return json.load(f) @staticmethod def write(id_="config", data=None, cwd=None): filepath = bam_config.find_basedir( cwd=cwd, path_suffix=id_, descr="bam repository", ) from bam.utils.system import write_json_to_file write_json_to_file(filepath, data) @staticmethod def write_bamignore(cwd=None): path = bam_config.find_rootdir(cwd=cwd) if path: filepath = os.path.join(path, ".bamignore") with open(filepath, 'w') as f: f.write(r".*\.blend\d+$") @staticmethod def create_bamignore_filter(id_=".bamignore", cwd=None): path = bam_config.find_rootdir() bamignore = os.path.join(path, id_) if os.path.isfile(bamignore): with open(bamignore, 'r', encoding='utf-8') as f: compiled_patterns = [] import re for i, l in enumerate(f): l = l.rstrip() if l: try: p = re.compile(l) except re.error as e: fatal("%s:%d file contains an invalid regular expression, %s" % (bamignore, i + 1, str(e))) compiled_patterns.append(p) if compiled_patterns: def filter_ignore(f): for pattern in filter_ignore.compiled_patterns: if re.match(pattern, f): return False return True filter_ignore.compiled_patterns = compiled_patterns return filter_ignore return None class bam_session: # fake module __slots__ = () def __new__(cls, *args, **kwargs): raise RuntimeError("%s should not be instantiated" % cls) def session_path_to_cache( path, cachedir=None, session_rootdir=None, paths_remap_relbase=None, abort=True): """ Given an absolute path, give us the cache-path on disk. """ if session_rootdir is None: session_rootdir = bam_config.find_sessiondir(path, abort=abort) if paths_remap_relbase is None: with open(os.path.join(session_rootdir, ".bam_paths_remap.json")) as fp: paths_remap = json.load(fp) paths_remap_relbase = paths_remap.get(".", "") del fp, paths_remap cachedir = os.path.join(bam_config.find_rootdir(cwd=session_rootdir, abort=True), ".cache") path_rel = os.path.relpath(path, session_rootdir) if path_rel[0] == "_": path_cache = os.path.join(cachedir, path_rel[1:]) else: path_cache = os.path.join(cachedir, paths_remap_relbase, path_rel) path_cache = os.path.normpath(path_cache) return path_cache @staticmethod def request_url(req_path): cfg = bam_config.load() result = "%s/%s" % (cfg['url'], req_path) return result @staticmethod def status(session_rootdir, paths_uuid_update=None): paths_add = {} paths_remove = {} paths_modified = {} from bam.utils.system import uuid_from_file session_rootdir = os.path.abspath(session_rootdir) # don't commit metadata paths_used = { os.path.join(session_rootdir, ".bam_paths_uuid.json"), os.path.join(session_rootdir, ".bam_paths_remap.json"), os.path.join(session_rootdir, ".bam_deps_remap.json"), os.path.join(session_rootdir, ".bam_paths_edit.data"), os.path.join(session_rootdir, ".bam_tmp.zip"), } paths_uuid = bam_session.load_paths_uuid(session_rootdir) for f_rel, sha1 in paths_uuid.items(): f_abs = os.path.join(session_rootdir, f_rel) if os.path.exists(f_abs): sha1_modified = uuid_from_file(f_abs) if sha1_modified != sha1: paths_modified[f_rel] = f_abs if paths_uuid_update is not None: paths_uuid_update[f_rel] = sha1_modified paths_used.add(f_abs) else: paths_remove[f_rel] = f_abs # ---- # find new files def iter_files(path, filename_check=None): for dirpath, dirnames, filenames in os.walk(path): # skip '.svn' if dirpath.startswith(".") and dirpath != ".": continue for filename in filenames: filepath = os.path.join(dirpath, filename) if filename_check is None or filename_check(filepath): yield filepath bamignore_filter = bam_config.create_bamignore_filter() for f_abs in iter_files(session_rootdir, bamignore_filter): if f_abs not in paths_used: # we should be clever - add the file to a useful location based on some rules # (category, filetype & tags?) f_rel = os.path.relpath(f_abs, session_rootdir) paths_add[f_rel] = f_abs if paths_uuid_update is not None: paths_uuid_update[f_rel] = uuid_from_file(f_abs) return paths_add, paths_remove, paths_modified @staticmethod def load_paths_uuid(session_rootdir): with open(os.path.join(session_rootdir, ".bam_paths_uuid.json")) as f: return json.load(f) @staticmethod def is_dirty(session_rootdir): paths_add, paths_remove, paths_modified = bam_session.status(session_rootdir) return any((paths_add, paths_modified, paths_remove)) @staticmethod def binary_edits_apply_single( blendfile_abs, # str blendfile, # bytes binary_edits, session_rootdir, paths_uuid_update=None, ): sys.stdout.write(" operating on: %r\n" % blendfile_abs) sys.stdout.flush() # we don't want to read, just edit whats there. with open(blendfile_abs, 'rb+') as fh_blend: for ofs, data in binary_edits: # sys.stdout.write("\n%r\n" % data) sys.stdout.flush() # ensure we're writing to the correct location. # fh_blend.seek(ofs) # sys.stdout.write(repr(b'existing data: ' + fh_blend.read(len(data) + 1))) fh_blend.seek(ofs) fh_blend.write(data) sys.stdout.write("\n") sys.stdout.flush() if paths_uuid_update is not None: # update hash! # we could do later, but the file is fresh in cache, so do now from bam.utils.system import uuid_from_file f_rel = os.path.relpath(blendfile_abs, session_rootdir) paths_uuid_update[f_rel] = uuid_from_file(blendfile_abs) del uuid_from_file @staticmethod def binary_edits_apply_all( session_rootdir, # collection of local paths or None (to apply all binary edits) paths=None, update_uuid=False, ): # sanity check if paths is not None: for path in paths: assert(type(path) is bytes) assert(not os.path.isabs(path)) assert(os.path.exists(os.path.join(session_rootdir, path.decode('utf-8')))) with open(os.path.join(session_rootdir, ".bam_paths_remap.json")) as fp: paths_remap = json.load(fp) paths_remap_relbase = paths_remap.get(".", "") paths_remap_reverse = {v: k for k, v in paths_remap.items()} del paths_remap with open(os.path.join(session_rootdir, ".bam_paths_edit.data"), 'rb') as fh: import pickle binary_edits_all = pickle.load(fh) paths_uuid_update = {} if update_uuid else None for blendfile, binary_edits in binary_edits_all.items(): if binary_edits: if paths is not None and blendfile not in paths: continue # get the absolute path as it is in the main repo # then remap back to our local checkout blendfile_abs_remote = os.path.normpath(os.path.join(paths_remap_relbase, blendfile.decode('utf-8'))) blendfile_abs = os.path.join(session_rootdir, paths_remap_reverse[blendfile_abs_remote]) bam_session.binary_edits_apply_single( blendfile_abs, blendfile, binary_edits, session_rootdir, paths_uuid_update, ) del pickle del binary_edits_all if update_uuid and paths_uuid_update: # freshen the UUID's based on the replayed binary_edits from bam.utils.system import write_json_to_file paths_uuid = bam_session.load_paths_uuid(session_rootdir) paths_uuid.update(paths_uuid_update) write_json_to_file(os.path.join(session_rootdir, ".bam_paths_uuid.json"), paths_uuid) del write_json_to_file del paths_uuid @staticmethod def binary_edits_update_single( blendfile_abs, binary_edits, # callback, takes a filepath remap_filepath_cb, ): """ After committing a blend file, we need to re-create the binary edits. """ from bam.blend import blendfile_path_walker for fp, (rootdir, fp_blend_basename) in blendfile_path_walker.FilePath.visit_from_blend( blendfile_abs, readonly=True, recursive=False, ): f_rel_orig = fp.filepath f_rel = remap_filepath_cb(f_rel_orig) fp.filepath_assign_edits(f_rel, binary_edits) class bam_commands: """ Sub-commands from the command-line map directly to these methods. """ # fake module __slots__ = () def __new__(cls, *args, **kwargs): raise RuntimeError("%s should not be instantiated" % cls) @staticmethod def init(url, directory_name=None): import urllib.parse if "@" in url: # first & last :) username, url = url.rpartition('@')[0::2] else: import getpass username = getpass.getuser() print("Using username:", username) del getpass parsed_url = urllib.parse.urlsplit(url) proj_dirname = os.path.basename(parsed_url.path) if directory_name: proj_dirname = directory_name proj_dirname_abs = os.path.join(os.getcwd(), proj_dirname) if os.path.exists(proj_dirname_abs): fatal("Cannot create project %r already exists" % proj_dirname_abs) # Create the project directory inside the current directory os.mkdir(proj_dirname_abs) # Create the .bam directory bam_basedir = os.path.join(proj_dirname_abs, bam_config.CONFIG_DIR) os.mkdir(bam_basedir) # Add a config file with project url, username and password bam_config.write( data={ "url": url, "user": username, "password": "", "config_version": 1 }, cwd=proj_dirname_abs) # Create the default .bamignore # TODO (fsiddi) get this data from the project config on the server bam_config.write_bamignore(cwd=proj_dirname_abs) print("Project %r initialized" % proj_dirname) @staticmethod def create(session_name): rootdir = bam_config.find_rootdir(abort=True) session_rootdir = os.path.join(rootdir, session_name) if os.path.exists(session_rootdir): fatal("session path exists %r" % session_rootdir) if rootdir != bam_config.find_rootdir(cwd=session_rootdir): fatal("session is located outside %r" % rootdir) def write_empty(f, data): with open(os.path.join(session_rootdir, f), 'wb') as f: f.write(data) os.makedirs(session_rootdir) write_empty(".bam_paths_uuid.json", b'{}') write_empty(".bam_paths_remap.json", b'{}') write_empty(".bam_deps_remap.json", b'{}') print("Session %r created" % session_name) @staticmethod def checkout( path, output_dir=None, session_rootdir_partial=None, all_deps=False, ): # --------- # constants CHUNK_SIZE = 1024 cfg = bam_config.load(abort=True) if output_dir is None: # fallback to the basename session_rootdir = os.path.splitext(os.path.basename(path))[0] else: output_dir = os.path.realpath(output_dir) if os.sep in output_dir.rstrip(os.sep): # are we a subdirectory? # (we know this exists, since we have config already) project_rootdir = bam_config.find_rootdir(abort=True) if ".." in os.path.relpath(output_dir, project_rootdir).split(os.sep): fatal("Output %r is outside the project path %r" % (output_dir, project_rootdir)) del project_rootdir session_rootdir = output_dir del output_dir if bam_config.find_sessiondir(cwd=session_rootdir): fatal("Can't checkout in existing session. Use update.") payload = { "filepath": path, "command": "checkout", "arguments": json.dumps({ "all_deps": all_deps, }), } # -------------------------------------------------------------------- # First request we simply get a list of files to download # import requests r = requests.get( bam_session.request_url("file"), params=payload, auth=(cfg['user'], cfg['password']), stream=True, ) if r.status_code not in {200, }: # TODO(cam), make into reusable function? print("Error %d:\n%s" % (r.status_code, next(r.iter_content(chunk_size=1024)).decode('utf-8'))) return # TODO(cam) how to tell if we get back a message payload? or real data??? dst_dir_data = payload['filepath'].split('/')[-1] if 1: dst_dir_data += ".zip" with open(dst_dir_data, 'wb') as f: import struct ID_MESSAGE = 1 ID_PAYLOAD = 2 head = r.raw.read(4) if head != b'BAM\0': fatal("bad header from server") while True: msg_type, msg_size = struct.unpack("= chunk_size: size -= chunk_size yield r.raw.read(chunk_size) if size: yield r.raw.read(size) import struct ID_MESSAGE = 1 ID_PAYLOAD = 2 ID_PAYLOAD_APPEND = 3 ID_PAYLOAD_EMPTY = 4 ID_DONE = 5 head = r.raw.read(4) if head != b'BAM\0': fatal("bad header from server") file_index = 0 is_header_read = True while True: if is_header_read: msg_type, msg_size = struct.unpack("")) except Exception: print(r.text) # TODO, handle error cases ok = True if ok: # ---------- # paths_uuid paths_uuid.update(paths_uuid_update) write_json_to_file(os.path.join(session_rootdir, ".bam_paths_uuid.json"), paths_uuid_update) # ----------- # paths_remap paths_remap.update(paths_remap_subset) for k in paths_remove: del paths_remap[k] write_json_to_file(os.path.join(session_rootdir, ".bam_paths_remap.json"), paths_remap) del write_json_to_file # ------------------ # Update Local Cache # # We now have 'pristine' files in basedir_temp, the commit went fine. # So move these into local cache AND we have to remake the binary_edit data. # since files were modified, if we don't do this - we wont be able to revert or avoid # re-downloading the files later. binary_edits_all_update = {} binary_edits_all_remove = set() for paths_dict, op in ((paths_modified, 'M'), (paths_add, 'A')): for f_rel, f_abs in paths_dict.items(): print(" caching (%s): %r" % (op, f_abs)) f_dst_abs = os.path.join(cachedir, f_rel) os.makedirs(os.path.dirname(f_dst_abs), exist_ok=True) if f_abs.startswith(basedir_temp): os.rename(f_abs, f_dst_abs) else: import shutil shutil.copyfile(f_abs, f_dst_abs) del shutil binary_edits = binary_edits_all_update[f_rel.encode('utf-8')] = [] # update binary_edits if f_rel.endswith(".blend"): bam_session.binary_edits_update_single( f_dst_abs, binary_edits, remap_filepath_cb=remap_filepath_bytes, ) for f_rel, f_abs in paths_remove.items(): binary_edits_all_remove.add(f_rel) paths_edit_abs = os.path.join(session_rootdir, ".bam_paths_edit.data") if binary_edits_all_update or binary_edits_all_remove: if os.path.exists(paths_edit_abs): with open(paths_edit_abs, 'rb') as fh: import pickle binary_edits_all = pickle.load(fh) del pickle else: binary_edits_all = {} if binary_edits_all_remove and binary_edits_all: for f_rel in binary_edits_all_remove: if f_rel in binary_edits_all: try: del binary_edits_all[f_rel] except KeyError: pass if binary_edits_all_update: binary_edits_all.update(binary_edits_all_update) import pickle with open(paths_edit_abs, 'wb') as fh: print() pickle.dump(binary_edits_all, fh, pickle.HIGHEST_PROTOCOL) del binary_edits_all del paths_edit_abs del pickle # ------------------------------ # Cleanup temp dir to finish off if os.path.exists(basedir_temp): import shutil shutil.rmtree(basedir_temp) del shutil @staticmethod def status(paths, use_json=False): # TODO(cam) multiple paths path = paths[0] del paths session_rootdir = bam_config.find_sessiondir(path, abort=True) paths_add, paths_remove, paths_modified = bam_session.status(session_rootdir) if not use_json: for f in sorted(paths_add): print(" A: %s" % f) for f in sorted(paths_modified): print(" M: %s" % f) for f in sorted(paths_remove): print(" D: %s" % f) else: ret = [] for f in sorted(paths_add): ret.append(("A", f)) for f in sorted(paths_modified): ret.append(("M", f)) for f in sorted(paths_remove): ret.append(("D", f)) print(json.dumps(ret)) @staticmethod def list_dir(paths, use_full=False, use_json=False): import requests # Load project configuration cfg = bam_config.load(abort=True) # TODO(cam) multiple paths path = paths[0] del paths payload = { "path": path, } r = requests.get( bam_session.request_url("file_list"), params=payload, auth=(cfg['user'], cfg['password']), stream=True, ) r_json = r.json() items = r_json.get("items_list") if items is None: fatal(r_json.get("message", "")) items.sort() if use_json: ret = [] for (name_short, name_full, file_type) in items: ret.append((name_short, file_type)) print(json.dumps(ret)) else: def strip_dot_slash(f): return f[2:] if f.startswith("./") else f for (name_short, name_full, file_type) in items: if file_type == "dir": print(" %s/" % (strip_dot_slash(name_full) if use_full else name_short)) for (name_short, name_full, file_type) in items: if file_type != "dir": print(" %s" % (strip_dot_slash(name_full) if use_full else name_short)) @staticmethod def deps(paths, recursive=False, use_json=False): def deps_path_walker(): from bam.blend import blendfile_path_walker for blendfile_src in paths: blendfile_src = blendfile_src.encode('utf-8') yield from blendfile_path_walker.FilePath.visit_from_blend( blendfile_src, readonly=True, recursive=recursive, ) def status_walker(): for fp, (rootdir, fp_blend_basename) in deps_path_walker(): f_rel = fp.filepath f_abs = fp.filepath_absolute yield ( # blendfile-src os.path.join(fp.basedir, fp_blend_basename).decode('utf-8'), # fillepath-dst f_rel.decode('utf-8'), f_abs.decode('utf-8'), # filepath-status "OK" if os.path.exists(f_abs) else "MISSING FILE", ) if use_json: is_first = True # print in parts, so we don't block the output print("[") for f_src, f_dst, f_dst_abs, f_status in status_walker(): if is_first: is_first = False else: print(",") print(json.dumps((f_src, f_dst, f_dst_abs, f_status)), end="") print("]") else: for f_src, f_dst, f_dst_abs, f_status in status_walker(): print(" %r -> (%r = %r) %s" % (f_src, f_dst, f_dst_abs, f_status)) @staticmethod def pack( paths, output, mode, repository_base_path=None, all_deps=False, use_quiet=False, warn_remap_externals=False, compress_level=-1, filename_filter=None, ): # Local packing (don't use any project/session stuff) from .blend import blendfile_pack # TODO(cam) multiple paths path = paths[0] del paths if output is None: fatal("Output path must be given when packing with: --mode=FILE") if os.path.isdir(output): if mode == "ZIP": output = os.path.join(output, os.path.splitext(path)[0] + ".zip") else: # FILE output = os.path.join(output, os.path.basename(path)) if use_quiet: report = lambda msg: None else: report = lambda msg: print(msg, end="") if repository_base_path is not None: repository_base_path = repository_base_path.encode('utf-8') # replace var with a pattern matching callback filename_filter_cb = blendfile_pack.exclusion_filter(filename_filter) for msg in blendfile_pack.pack( path.encode('utf-8'), output.encode('utf-8'), mode=mode, all_deps=all_deps, repository_base_path=repository_base_path, compress_level=compress_level, report=report, warn_remap_externals=warn_remap_externals, use_variations=True, filename_filter=filename_filter_cb, ): pass @staticmethod def copy( paths, output, base, all_deps=False, use_quiet=False, filename_filter=None, ): # Local packing (don't use any project/session stuff) from .blend import blendfile_copy from bam.utils.system import is_subdir paths = [os.path.abspath(path) for path in paths] base = os.path.abspath(base) output = os.path.abspath(output) # check all blends are in the base path for path in paths: if not is_subdir(path, base): fatal("Input blend file %r is not a sub directory of %r" % (path, base)) if use_quiet: report = lambda msg: None else: report = lambda msg: print(msg, end="") # replace var with a pattern matching callback if filename_filter: # convert string into regex callback # "*.txt;*.png;*.rst" --> r".*\.txt$|.*\.png$|.*\.rst$" import re import fnmatch compiled_pattern = re.compile( b'|'.join(fnmatch.translate(f).encode('utf-8') for f in filename_filter.split(";") if f), re.IGNORECASE, ) def filename_filter(f): return (not filename_filter.compiled_pattern.match(f)) filename_filter.compiled_pattern = compiled_pattern del compiled_pattern del re, fnmatch for msg in blendfile_copy.copy_paths( [path.encode('utf-8') for path in paths], output.encode('utf-8'), base.encode('utf-8'), all_deps=all_deps, report=report, filename_filter=filename_filter, ): pass @staticmethod def remap_start( paths, use_json=False, ): filepath_remap = "bam_remap.data" for p in paths: if not os.path.exists(p): fatal("Path %r not found!" % p) paths = [p.encode('utf-8') for p in paths] if os.path.exists(filepath_remap): fatal("Remap in progress, run with 'finish' or remove %r" % filepath_remap) from bam.blend import blendfile_path_remap remap_data = blendfile_path_remap.start( paths, use_json=use_json, ) with open(filepath_remap, 'wb') as fh: import pickle pickle.dump(remap_data, fh, pickle.HIGHEST_PROTOCOL) del pickle @staticmethod def remap_finish( paths, force_relative=False, dry_run=False, use_json=False, ): filepath_remap = "bam_remap.data" for p in paths: if not os.path.exists(p): fatal("Path %r not found!" % p) # bytes needed for blendfile_path_remap API paths = [p.encode('utf-8') for p in paths] if not os.path.exists(filepath_remap): fatal("Remap not started, run with 'start', (%r not found)" % filepath_remap) with open(filepath_remap, 'rb') as fh: import pickle remap_data = pickle.load(fh) del pickle from bam.blend import blendfile_path_remap blendfile_path_remap.finish( paths, remap_data, force_relative=force_relative, dry_run=dry_run, use_json=use_json, ) if not dry_run: os.remove(filepath_remap) @staticmethod def remap_reset( use_json=False, ): filepath_remap = "bam_remap.data" if os.path.exists(filepath_remap): os.remove(filepath_remap) else: fatal("remapping not started, nothing to do!") # ----------------------------------------------------------------------------- # Argument Parser def init_argparse_common( subparse, use_json=False, use_all_deps=False, use_quiet=False, use_compress_level=False, use_exclude=False, ): import argparse if use_json: subparse.add_argument( "-j", "--json", dest="json", action='store_true', help="Generate JSON output", ) if use_all_deps: subparse.add_argument( "-a", "--all-deps", dest="all_deps", action='store_true', help="Follow all dependencies (unused indirect dependencies too)", ) if use_quiet: subparse.add_argument( "-q", "--quiet", dest="use_quiet", action='store_true', help="Suppress status output", ) if use_compress_level: class ChoiceToZlibLevel(argparse.Action): def __call__(self, parser, namespace, value, option_string=None): setattr(namespace, self.dest, {"default": -1, "fast": 1, "best": 9, "store": 0}[value[0]]) subparse.add_argument( "-c", "--compress", dest="compress_level", nargs=1, default=-1, metavar='LEVEL', action=ChoiceToZlibLevel, choices=('default', 'fast', 'best', 'store'), help="Compression level for resulting archive", ) if use_exclude: subparse.add_argument( "-e", "--exclude", dest="exclude", metavar='PATTERN(S)', required=False, default="", help=""" Optionally exclude files from the pack. Using Unix shell-style wildcards *(case insensitive)*. ``--exclude="*.png"`` Multiple patterns can be passed using the ``;`` separator. ``--exclude="*.txt;*.avi;*.wav"`` """ ) def create_argparse_init(subparsers): subparse = subparsers.add_parser("init", help="Initialize a new project directory") subparse.add_argument( dest="url", help="Project repository url", ) subparse.add_argument( dest="directory_name", nargs="?", help="Directory name", ) subparse.set_defaults( func=lambda args: bam_commands.init(args.url, args.directory_name), ) def create_argparse_create(subparsers): subparse = subparsers.add_parser( "create", aliases=("cr",), help="Create a new empty session directory", ) subparse.add_argument( dest="session_name", nargs=1, help="Name of session directory", ) subparse.set_defaults( func=lambda args: bam_commands.create(args.session_name[0]), ) def create_argparse_checkout(subparsers): subparse = subparsers.add_parser( "checkout", aliases=("co",), help="Checkout a remote path in an existing project", ) subparse.add_argument( dest="path", type=str, metavar='REMOTE_PATH', help="Path to checkout on the server", ) subparse.add_argument( "-o", "--output", dest="output", type=str, metavar='DIRNAME', help="Local name to checkout the session into (optional, falls back to path name)", ) init_argparse_common(subparse, use_all_deps=True) subparse.set_defaults( func=lambda args: bam_commands.checkout(args.path, args.output, args.all_deps), ) def create_argparse_update(subparsers): subparse = subparsers.add_parser( "update", aliases=("up",), help="Update a local session with changes from the remote project", ) subparse.add_argument( dest="paths", nargs="*", help="Path(s) to operate on", ) subparse.set_defaults( func=lambda args: bam_commands.update(args.paths or ["."]), ) def create_argparse_revert(subparsers): subparse = subparsers.add_parser( "revert", aliases=("rv",), help="Reset local changes back to the state at time of checkout", ) subparse.add_argument( dest="paths", nargs="+", help="Path(s) to operate on", ) subparse.set_defaults( func=lambda args: bam_commands.revert(args.paths or ["."]), ) def create_argparse_commit(subparsers): subparse = subparsers.add_parser( "commit", aliases=("ci",), help="Commit changes from a session to the remote project", ) subparse.add_argument( "-m", "--message", dest="message", metavar='MESSAGE', required=True, help="Commit message", ) subparse.add_argument( dest="paths", nargs="*", help="paths to commit", ) subparse.set_defaults( func=lambda args: bam_commands.commit(args.paths or ["."], args.message), ) def create_argparse_status(subparsers): subparse = subparsers.add_parser( "status", aliases=("st",), help="Show any edits made in the local session", ) subparse.add_argument( dest="paths", nargs="*", help="Path(s) to operate on", ) init_argparse_common(subparse, use_json=True) subparse.set_defaults( func=lambda args: bam_commands.status(args.paths or ["."], use_json=args.json), ) def create_argparse_list(subparsers): subparse = subparsers.add_parser( "list", aliases=("ls",), help="List the contents of a remote directory", ) subparse.add_argument( dest="paths", nargs="*", help="Path(s) to operate on", ) subparse.add_argument( "-f", "--full", dest="full", action='store_true', help="Show the full paths", ) init_argparse_common(subparse, use_json=True) subparse.set_defaults( func=lambda args: bam_commands.list_dir( args.paths or ["."], use_full=args.full, use_json=args.json, ), ) def create_argparse_deps(subparsers): subparse = subparsers.add_parser( "deps", aliases=("dp",), help="List dependencies for file(s)", ) subparse.add_argument( dest="paths", nargs="+", help="Path(s) to operate on", ) subparse.add_argument( "-r", "--recursive", dest="recursive", action='store_true', help="Scan dependencies recursively", ) init_argparse_common(subparse, use_json=True) subparse.set_defaults( func=lambda args: bam_commands.deps( args.paths, args.recursive, use_json=args.json), ) def create_argparse_pack(subparsers): import argparse subparse = subparsers.add_parser( "pack", aliases=("pk",), help="Pack a blend file and its dependencies into an archive", description= """ You can simply pack a blend file like this to create a zip-file of the same name. .. code-block:: sh bam pack /path/to/scene.blend You may also want to give an explicit output directory. This command is used for packing a ``.blend`` file into a ``.zip`` file for redistribution. .. code-block:: sh # pack a blend with maximum compression for online downloads bam pack /path/to/scene.blend --output my_scene.zip --compress=best You may also pack a .blend while keeping your whole repository hierarchy by passing the path to the top directory of the repository, and ask to be warned about dependencies paths outside of that base path: .. code-block:: sh bam pack --repo="/path/to/repo" --warn-external /path/to/repo/path/to/scene.blend """, formatter_class=argparse.RawDescriptionHelpFormatter, ) subparse.add_argument( dest="paths", nargs="+", help="Path(s) to operate on", ) subparse.add_argument( "-o", "--output", dest="output", metavar='FILE', required=False, help="Output file or a directory when multiple inputs are passed", ) subparse.add_argument( "-m", "--mode", dest="mode", metavar='MODE', required=False, default='ZIP', choices=('ZIP', 'FILE'), help="Output file or a directory when multiple inputs are passed", ) subparse.add_argument( "--repo", dest="repository_base_path", metavar='DIR', required=False, help="Base directory from which you want to keep existing hierarchy (usually to repository directory)," "will default to packed blend file's directory if not specified", ) subparse.add_argument( "--warn-external", dest="warn_remap_externals", action='store_true', help="Warn for every dependency outside of given repository base path", ) init_argparse_common(subparse, use_all_deps=True, use_quiet=True, use_compress_level=True, use_exclude=True) subparse.set_defaults( func=lambda args: bam_commands.pack( args.paths, args.output or ((os.path.splitext(args.paths[0])[0] + ".zip") if args.mode == 'ZIP' else None), args.mode, repository_base_path=args.repository_base_path or None, all_deps=args.all_deps, use_quiet=args.use_quiet, warn_remap_externals=args.warn_remap_externals, compress_level=args.compress_level, filename_filter=args.exclude, ), ) def create_argparse_copy(subparsers): import argparse subparse = subparsers.add_parser( "copy", aliases=("cp",), help="Copy blend file(s) and their dependencies to a new location (maintaining the directory structure).", description= """ The line below will copy ``scene.blend`` to ``/destination/to/scene.blend``. .. code-block:: sh bam copy /path/to/scene.blend --base=/path --output=/destination .. code-block:: sh # you can also copy multiple files bam copy /path/to/scene.blend /path/other/file.blend --base=/path --output /other/destination """, formatter_class=argparse.RawDescriptionHelpFormatter, ) subparse.add_argument( dest="paths", nargs="+", help="Path(s) to blend files to operate on", ) subparse.add_argument( "-o", "--output", dest="output", metavar='DIR', required=True, help="Output directory where where files will be copied to", ) subparse.add_argument( "-b", "--base", dest="base", metavar='DIR', required=True, help="Base directory for input paths (files outside this path will be omitted)", ) init_argparse_common(subparse, use_all_deps=True, use_quiet=True, use_exclude=True) subparse.set_defaults( func=lambda args: bam_commands.copy( args.paths, args.output, args.base, all_deps=args.all_deps, use_quiet=args.use_quiet, filename_filter=args.exclude, ), ) def create_argparse_remap(subparsers): import argparse subparse = subparsers.add_parser( "remap", help="Remap blend file paths", description= """ This command is a 3 step process: - first run ``bam remap start .`` which stores the current state of your project (recursively). - then re-arrange the files on the filesystem (rename, relocate). - finally run ``bam remap finish`` to apply the changes, updating the ``.blend`` files internal paths. .. code-block:: sh cd /my/project bam remap start . mv photos textures mv house_v14_library.blend house_libraray.blend bam remap finish .. note:: Remapping creates a file called ``bam_remap.data`` in the current directory. You can relocate the entire project to a new location but on executing ``finish``, this file must be accessible from the current directory. .. note:: This command depends on files unique contents, take care not to modify the files once remap is started. """, formatter_class=argparse.RawDescriptionHelpFormatter, ) subparse_remap_commands = subparse.add_subparsers( title="Remap commands", description='valid subcommands', help='additional help', ) sub_subparse = subparse_remap_commands.add_parser( "start", help="Start remapping the blend files", ) sub_subparse.add_argument( dest="paths", nargs="*", help="Path(s) to operate on", ) init_argparse_common(sub_subparse, use_json=True) sub_subparse.set_defaults( func=lambda args: bam_commands.remap_start( args.paths or ["."], use_json=args.json, ), ) sub_subparse = subparse_remap_commands.add_parser( "finish", help="Finish remapping the blend files", ) sub_subparse.add_argument( dest="paths", nargs="*", help="Path(s) to operate on", ) sub_subparse.add_argument( "-r", "--force-relative", dest="force_relative", action='store_true', help="Make all remapped paths relative (even if they were originally absolute)", ) sub_subparse.add_argument( "-d", "--dry-run", dest="dry_run", action='store_true', help="Just print output as if the paths are being run", ) init_argparse_common(sub_subparse, use_json=True) sub_subparse.set_defaults( func=lambda args: bam_commands.remap_finish( args.paths or ["."], force_relative=args.force_relative, dry_run=args.dry_run, use_json=args.json, ), ) sub_subparse = subparse_remap_commands.add_parser( "reset", help="Cancel path remapping", ) init_argparse_common(sub_subparse, use_json=True) sub_subparse.set_defaults( func=lambda args: bam_commands.remap_reset( use_json=args.json, ), ) def create_argparse(): import argparse usage_text = ( "BAM!\n" + __doc__ ) parser = argparse.ArgumentParser( prog="bam", description=usage_text, ) subparsers = parser.add_subparsers( title='subcommands', description='valid subcommands', help='additional help', ) create_argparse_init(subparsers) create_argparse_create(subparsers) create_argparse_checkout(subparsers) create_argparse_commit(subparsers) create_argparse_update(subparsers) create_argparse_revert(subparsers) create_argparse_status(subparsers) create_argparse_list(subparsers) # non-bam project commands create_argparse_deps(subparsers) create_argparse_pack(subparsers) create_argparse_copy(subparsers) create_argparse_remap(subparsers) return parser def main(argv=None): if argv is None: argv = sys.argv[1:] logging.basicConfig( level=logging.INFO, format='%(asctime)-15s %(levelname)8s %(name)s %(message)s', ) parser = create_argparse() args = parser.parse_args(argv) # call subparser callback if not hasattr(args, "func"): parser.print_help() return args.func(args) if __name__ == "__main__": raise Exception("This module can't be executed directly, Call '../bam_cli.py'")