This repository has been archived on 2023-02-28. You can view files and clone it, but cannot push or open issues or pull requests.
Files
blender-asset-manager/client/cli/bam

378 lines
9.9 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python3
# ***** BEGIN GPL LICENSE BLOCK *****
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# ***** END GPL LICENCE BLOCK *****
2014-10-30 22:38:05 +01:00
"""
Blender asset manager
"""
# TODO
2014-10-30 22:38:05 +01:00
# * checkout command (store some handy json info!)
# * commit command
# ** staging for svn commit
# ** svn commit
# ** handle errors
# * definition of project (.bam (like .git)) ... json
2014-10-31 16:46:52 +01:00
class bam_config:
# fake module
__slots__ = ()
def __new__(cls, *args, **kwargs):
raise RuntimeError("%s should not be instantiated" % cls)
CONFIG_DIR = ".bam"
@staticmethod
2014-11-04 17:56:34 +01:00
def find_basedir(cwd=None):
2014-10-31 16:46:52 +01:00
"""
Return the config path (or None when not found)
"""
import os
2014-11-04 17:56:34 +01:00
if cwd is None:
cwd = os.getcwd()
2014-10-31 16:46:52 +01:00
parent = (os.path.normpath(
os.path.abspath(
2014-11-04 17:56:34 +01:00
cwd)))
2014-10-31 16:46:52 +01:00
parent_prev = None
while parent != parent_prev:
test_dir = os.path.join(parent, bam_config.CONFIG_DIR)
if os.path.isdir(test_dir):
return test_dir
parent_prev = parent
parent = os.path.dirname(parent)
return None
@staticmethod
2014-11-04 17:56:34 +01:00
def load(id_, cwd=None):
basedir = bam_config.find_basedir(cwd=cwd)
filepath = os.path.join(basedir, id_)
with open(filepath, 'r') as f:
import json
return json.load(f)
2014-10-31 16:46:52 +01:00
@staticmethod
2014-11-04 17:56:34 +01:00
def write(id_, data, cwd=None):
bam_config.find_basedir(cwd=cwd)
filepath = os.path.join(basedir, id_)
with open(filepath) as f:
import json
json.dump(
data, f, ensure_ascii=False,
check_circular=False,
# optional (pretty)
sort_keys=True, indent=4, separators=(',', ': '),
)
2014-10-31 16:46:52 +01:00
2014-10-30 22:38:05 +01:00
class bam_utils:
# fake module
__slots__ = ()
def __new__(cls, *args, **kwargs):
raise RuntimeError("%s should not be instantiated" % cls)
2014-11-04 14:35:32 +01:00
# Copied from 'packer.py', TODO(cam) de-duplicate
@staticmethod
def sha1_for_file(fn, block_size=1 << 20):
with open(fn, 'rb') as f:
import hashlib
sha1 = hashlib.new('sha1')
while True:
data = f.read(block_size)
if not data:
break
sha1.update(data)
return sha1.hexdigest()
# end code duplication
# --------------------
@staticmethod
def session_find_url():
return "http://localhost:5000"
2014-10-30 22:38:05 +01:00
@staticmethod
def session_request_url(req_path):
# TODO, get from config
2014-10-31 16:46:52 +01:00
BAM_SERVER = bam_utils.session_find_url()
2014-10-30 22:38:05 +01:00
result = "%s/%s" % (BAM_SERVER, req_path)
return result
2014-10-30 22:38:05 +01:00
@staticmethod
def checkout(paths):
import sys
import requests
# TODO(cam) multiple paths
path = paths[0]
del paths
payload = {
"filepath": path,
"command": "checkout",
}
r = requests.get(
bam_utils.session_request_url("file"),
params=payload,
auth=("bam", "bam"),
stream=True,
)
if r.status_code not in {200, }:
2014-10-30 22:38:05 +01:00
# TODO(cam), make into reusable function?
print("Error %d:\n%s" % (r.status_code, next(r.iter_content(chunk_size=1024)).decode('utf-8')))
return
# TODO(cam) how to tell if we get back a message payload? or real data???
local_filename = payload['filepath'].split('/')[-1]
if 1:
local_filename += ".zip"
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
f.flush()
sys.stdout.write(".")
sys.stdout.flush()
print("Written:", local_filename)
2014-11-04 14:35:32 +01:00
@staticmethod
2014-11-04 14:46:57 +01:00
def commit(paths, message):
2014-11-04 14:35:32 +01:00
import sys
import os
import requests
# TODO(cam) ignore files
# TODO(cam) multiple paths
path = paths[0]
if not os.path.isdir(path):
print("Expected a directory (%r)" % path)
sys.exit(1)
# make a zipfile from session
import json
with open(os.path.join(path, ".bam_paths_uuid.json")) as f:
d = json.load(f)
paths_modified = []
for fn, sha1 in d.items():
fn_abs = os.path.join(path, fn)
if bam_utils.sha1_for_file(fn_abs) != sha1:
paths_modified.append((fn, fn_abs))
print(d)
print(paths_modified)
# -------------------------
print("Now make a zipfile")
import zipfile
temp_zip = os.path.join(path, ".bam_tmp.zip")
with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zip:
for (fn, fn_abs) in paths_modified:
print(" Archiving %r" % fn_abs)
zip.write(fn_abs,
arcname=fn)
2014-11-04 14:46:57 +01:00
# --------------
# Commit Request
2014-11-04 14:35:32 +01:00
args = {
2014-11-04 14:46:57 +01:00
'message': message,
2014-11-04 14:35:32 +01:00
}
payload = {
'command': 'commit',
'arguments': json.dumps(args),
}
files = {
'file': open(temp_zip, 'rb'),
}
r = requests.put(
bam_utils.session_request_url("file"),
params=payload,
auth=('bam', 'bam'),
files=files)
print("Return is:", r.text)
2014-11-04 14:35:32 +01:00
files['file'].close()
os.remove(temp_zip)
2014-10-30 22:38:05 +01:00
@staticmethod
def list_dir(paths):
import sys
import requests
2014-10-31 16:46:52 +01:00
# print(bam_config.find_config())
2014-10-30 22:38:05 +01:00
path = paths[0]
del paths
# TODO(cam) multiple paths
payload = {
"path": path,
}
r = requests.get(
bam_utils.session_request_url("file_list"),
params=payload,
auth=("bam", "bam"),
stream=True,
)
2014-10-30 23:03:39 +01:00
items = r.json().get("items_list", ())
for (name_short, name_full, file_type) in items:
if file_type == "dir":
print(" %s/" % name_short)
for (name_short, name_full, file_type) in items:
if file_type != "dir":
print(" %s" % name_short)
def subcommand_checkout_cb(args):
2014-10-30 22:38:05 +01:00
bam_utils.checkout(args.paths)
def subcommand_commit_cb(args):
bam_utils.commit(args.paths, args.message)
def subcommand_update_cb(args):
print(args)
def subcommand_revert_cb(args):
print(args)
def subcommand_list_cb(args):
2014-10-30 22:38:05 +01:00
bam_utils.list_dir(args.paths)
def subcommand_status_cb(args):
print(args)
def create_argparse_checkout(subparsers):
subparse = subparsers.add_parser("checkout", aliases=("co",))
subparse.add_argument(
"paths", nargs="+", help="Path(s) to operate on",
)
subparse.set_defaults(func=subcommand_checkout_cb)
def create_argparse_commit(subparsers):
subparse = subparsers.add_parser("commit", aliases=("ci",))
2014-10-30 22:38:05 +01:00
subparse.add_argument(
"-m", "--message", dest="message", metavar='MESSAGE',
help="Commit message",
)
subparse.add_argument(
"paths", nargs="+", help="paths to commit",
)
2014-10-30 22:38:05 +01:00
subparse.set_defaults(func=subcommand_commit_cb)
def create_argparse_update(subparsers):
subparse = subparsers.add_parser("update", aliases=("up",))
subparse.add_argument(
"paths", nargs="+", help="Path(s) to operate on",
)
subparse.set_defaults(func=subcommand_update_cb)
def create_argparse_revert(subparsers):
subparse = subparsers.add_parser("update", aliases=("up",))
subparse.add_argument(
"paths", nargs="+", help="Path(s) to operate on",
)
subparse.set_defaults(func=subcommand_revert_cb)
def create_argparse_status(subparsers):
subparse = subparsers.add_parser("status", aliases=("st",))
subparse.add_argument(
"paths", nargs="+", help="Path(s) to operate on",
)
subparse.set_defaults(func=subcommand_status_cb)
def create_argparse_list(subparsers):
subparse = subparsers.add_parser("list", aliases=("ls",))
subparse.add_argument(
"paths", nargs="+", help="Path(s) to operate on",
)
subparse.set_defaults(func=subcommand_list_cb)
def create_argparse():
import os
import argparse
usage_text = (
2014-10-30 22:38:05 +01:00
"BAM! (Blender Asset Manager)\n" +
__doc__
)
parser = argparse.ArgumentParser(description=usage_text)
subparsers = parser.add_subparsers(
title='subcommands',
description='valid subcommands',
help='additional help')
create_argparse_checkout(subparsers)
create_argparse_commit(subparsers)
create_argparse_update(subparsers)
create_argparse_revert(subparsers)
create_argparse_status(subparsers)
create_argparse_list(subparsers)
return parser
def main():
import sys
parser = create_argparse()
args = parser.parse_args(sys.argv[1:])
# call subparser callback
if not hasattr(args, "func"):
parser.print_help()
return
args.func(args)
if __name__ == "__main__":
main()