No longer importing, then deleting, then re-importing modules all over the place. Just import at the top of the file. Also removed some superfluous return statements.
1773 lines
58 KiB
Python
Executable File
1773 lines
58 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Apache License, Version 2.0
|
|
|
|
"""
|
|
Test bam command line client
|
|
|
|
Run all tests:
|
|
|
|
python3 test_cli.py
|
|
|
|
Run a single test:
|
|
|
|
python3 -m unittest test_cli.BamCheckoutTest.test_checkout
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import unittest
|
|
|
|
VERBOSE = os.environ.get("VERBOSE", False)
|
|
if VERBOSE == "0":
|
|
VERBOSE = None
|
|
if VERBOSE:
|
|
# for the server subprocess
|
|
os.environ["BAM_VERBOSE"] = "1"
|
|
|
|
|
|
# ------------------
|
|
# Ensure module path
|
|
path = os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
|
|
if path not in sys.path:
|
|
sys.path.append(path)
|
|
del path
|
|
# --------
|
|
|
|
|
|
# ------------------
|
|
# Ensure module path
|
|
path = os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "webservice", "bam"))
|
|
if path not in sys.path:
|
|
sys.path.append(path)
|
|
del path
|
|
# --------
|
|
|
|
|
|
# -------------------------
|
|
# Quiet the werkzeug logger
|
|
if not VERBOSE:
|
|
import werkzeug
|
|
import werkzeug._internal
|
|
werkzeug._internal._log = lambda *a, **b: None
|
|
del werkzeug
|
|
# --------
|
|
|
|
|
|
# -----------------------------------------
|
|
# Ensure we get stdout & stderr on sys.exit
|
|
#
|
|
# We have this because we override the standard output,
|
|
# _AND_ during this state a command may call `sys.exit`
|
|
# In this case, we the output is hidden which is very annoying.
|
|
#
|
|
# So override `sys.exit` with one that prints to the original
|
|
# stdout/stderr on exit.
|
|
if 1:
|
|
def exit(status):
|
|
import io
|
|
globals().update(sys.exit.exit_data)
|
|
if isinstance(sys.stdout, io.StringIO):
|
|
|
|
_stdout.write("\nsys.exit(%d) with message:\n" % status)
|
|
|
|
sys.stdout.seek(0)
|
|
sys.stderr.seek(0)
|
|
_stdout.write(sys.stdout.read())
|
|
_stderr.write(sys.stderr.read())
|
|
|
|
_stdout.write("\n")
|
|
|
|
_stdout.flush()
|
|
_stderr.flush()
|
|
_exit(status)
|
|
|
|
exit.exit_data = {
|
|
"_exit": sys.exit,
|
|
"_stdout": sys.stdout,
|
|
"_stderr": sys.stderr,
|
|
}
|
|
sys.exit = exit
|
|
del exit
|
|
# --------
|
|
|
|
# --------------------------------------------
|
|
# Don't Exit when argparse fails to parse args
|
|
#
|
|
# Argparse can call `sys.exit` if the wrong args are given.
|
|
# This messes with testing, which we want to keep the process running.
|
|
#
|
|
# This monkey-patches in an exist function which simply raises an exception.
|
|
# We could do something a bit nicer here,
|
|
# but for now just use a basic exception.
|
|
|
|
|
|
def argparse_fake_exit(self, status, message):
|
|
sys.__stdout__.write(message)
|
|
raise Exception(message)
|
|
|
|
argparse.ArgumentParser.exit = argparse_fake_exit
|
|
del argparse_fake_exit
|
|
# --------
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Real beginning of code!
|
|
|
|
TEMP_LOCAL = "/tmp/bam_test"
|
|
# Separate tmp directory for server, since we don't reset the server at every test
|
|
TEMP_SERVER = "/tmp/bam_test_server"
|
|
PORT = 5555
|
|
PROJECT_NAME = "test_project"
|
|
|
|
# running scripts next to this one!
|
|
CURRENT_DIR = os.path.dirname(__file__)
|
|
|
|
|
|
def args_as_string(args):
|
|
"""
|
|
Print args so we can paste them to run them again.
|
|
"""
|
|
import shlex
|
|
return " ".join([shlex.quote(c) for c in args])
|
|
|
|
|
|
def run(cmd, cwd=None):
|
|
if VERBOSE:
|
|
print(">>> ", args_as_string(cmd))
|
|
import subprocess
|
|
kwargs = {
|
|
"stderr": subprocess.PIPE,
|
|
"stdout": subprocess.PIPE,
|
|
}
|
|
if cwd is not None:
|
|
kwargs["cwd"] = cwd
|
|
|
|
proc = subprocess.Popen(cmd, **kwargs)
|
|
stdout, stderr = proc.communicate()
|
|
returncode = proc.returncode
|
|
|
|
if VERBOSE:
|
|
sys.stdout.write(" stdout: %s\n" % stdout.strip())
|
|
sys.stdout.write(" stderr: %s\n" % stderr.strip())
|
|
sys.stdout.write(" return: %d\n" % returncode)
|
|
|
|
return stdout, stderr, returncode
|
|
|
|
|
|
def run_check(cmd, cwd=None, returncode_ok=(0,)):
|
|
stdout, stderr, returncode = run(cmd, cwd)
|
|
if returncode in returncode_ok:
|
|
return True
|
|
|
|
# verbose will have already printed
|
|
if not VERBOSE:
|
|
print(">>> ", args_as_string(cmd))
|
|
sys.stdout.write(" stdout: %s\n" % stdout.strip())
|
|
sys.stdout.write(" stderr: %s\n" % stderr.strip())
|
|
sys.stdout.write(" return: %d\n" % returncode)
|
|
return False
|
|
|
|
|
|
class CHDir:
|
|
__slots__ = (
|
|
"dir_old",
|
|
"dir_new",
|
|
)
|
|
|
|
def __init__(self, directory):
|
|
self.dir_old = os.getcwd()
|
|
self.dir_new = directory
|
|
|
|
def __enter__(self):
|
|
os.chdir(self.dir_new)
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
os.chdir(self.dir_old)
|
|
|
|
|
|
class StdIO:
|
|
__slots__ = (
|
|
"stdout",
|
|
"stderr",
|
|
)
|
|
|
|
def __init__(self):
|
|
self.stdout = sys.stdout
|
|
self.stderr = sys.stderr
|
|
|
|
def read(self):
|
|
sys.stdout.seek(0)
|
|
sys.stderr.seek(0)
|
|
return sys.stdout.read(), sys.stderr.read()
|
|
|
|
def __enter__(self):
|
|
import io
|
|
sys.stdout = io.StringIO()
|
|
sys.stderr = io.StringIO()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
if exc_type is not None:
|
|
self.stdout.write("\n".join(self.read()))
|
|
|
|
sys.stdout = self.stdout
|
|
sys.stderr = self.stderr
|
|
|
|
|
|
def svn_repo_create(id_, dirname):
|
|
return run_check(["svnadmin", "create", id_], cwd=dirname)
|
|
|
|
|
|
def svn_repo_checkout(repo, path):
|
|
return run_check(["svn", "checkout", repo, path])
|
|
|
|
|
|
def bam_run(argv, cwd=None):
|
|
with CHDir(cwd):
|
|
import bam.cli
|
|
|
|
if VERBOSE:
|
|
sys.stdout.write("\n running: ")
|
|
if cwd is not None:
|
|
sys.stdout.write("cd %r ; " % cwd)
|
|
import shlex
|
|
sys.stdout.write("bam %s\n" % " ".join([shlex.quote(c) for c in argv]))
|
|
|
|
# input('press_key!:')
|
|
|
|
with StdIO() as fakeio:
|
|
bam.cli.main(argv)
|
|
ret = fakeio.read()
|
|
|
|
if VERBOSE:
|
|
sys.stdout.write(" stdout: %s\n" % ret[0].strip())
|
|
sys.stdout.write(" stderr: %s\n" % ret[1].strip())
|
|
|
|
return ret
|
|
|
|
|
|
def bam_run_as_json(argv, cwd=None):
|
|
stdout, stderr = bam_run(argv, cwd=cwd)
|
|
if stderr:
|
|
raise Exception(stderr)
|
|
|
|
try:
|
|
return json.loads(stdout)
|
|
except Exception as e:
|
|
print("---- JSON BEGIN (invalid) ----")
|
|
print(stdout)
|
|
print("---- JSON END ----")
|
|
raise e
|
|
|
|
|
|
def file_quick_write(path, filepart=None, data=None, append=False):
|
|
"""
|
|
Quick file creation utility.
|
|
"""
|
|
|
|
if data is None:
|
|
data = b''
|
|
|
|
mode = 'a' if append else 'w'
|
|
if type(data) is bytes:
|
|
mode = mode + 'b'
|
|
elif type(data) is str:
|
|
pass
|
|
else:
|
|
raise Exception("type %r not known" % type(data))
|
|
|
|
if filepart is not None:
|
|
path = os.path.join(path, filepart)
|
|
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
with open(path, mode) as f:
|
|
f.write(data)
|
|
|
|
|
|
def file_quick_read(path, filepart=None, mode='rb'):
|
|
|
|
if filepart is not None:
|
|
path = os.path.join(path, filepart)
|
|
|
|
with open(path, mode) as f:
|
|
return f.read()
|
|
|
|
|
|
def file_quick_touch(path, filepart=None, times=None):
|
|
if filepart is not None:
|
|
path = os.path.join(path, filepart)
|
|
with open(path, 'a'):
|
|
os.utime(path, times)
|
|
|
|
|
|
def file_quick_touch_blend(path, filepart=None, times=None):
|
|
if filepart is not None:
|
|
path = os.path.join(path, filepart)
|
|
|
|
if not os.path.exists(path):
|
|
Exception("Path not found %r" % path)
|
|
|
|
# we can write junk data into the end of the blend file
|
|
with open(path, 'a') as fh:
|
|
fh.write("_")
|
|
os.utime(path, times)
|
|
|
|
|
|
def file_quick_image(path, filepart=None, fill_color=b'\xff' * 4):
|
|
def write_png(buf, width, height):
|
|
"""
|
|
buf: must be bytes or a bytearray in py3, a regular string in py2. formatted RGBARGBA...
|
|
"""
|
|
import zlib
|
|
import struct
|
|
|
|
width_byte_4 = width * 4
|
|
raw_data = b''.join(b'\x00' + buf[span:span + width_byte_4]
|
|
for span in range((height - 1) * width * 4, -1, - width_byte_4))
|
|
|
|
def png_pack(png_tag, data):
|
|
chunk_head = png_tag + data
|
|
return (struct.pack("!I", len(data)) +
|
|
chunk_head +
|
|
struct.pack("!I", 0xFFFFFFFF & zlib.crc32(chunk_head)))
|
|
|
|
return b''.join([
|
|
b'\x89PNG\r\n\x1a\n',
|
|
png_pack(b'IHDR', struct.pack("!2I5B", width, height, 8, 6, 0, 0, 0)),
|
|
png_pack(b'IDAT', zlib.compress(raw_data, 9)),
|
|
png_pack(b'IEND', b'')])
|
|
|
|
if filepart is not None:
|
|
path = os.path.join(path, filepart)
|
|
with open(path, 'wb') as f:
|
|
f.write(write_png(fill_color * 4, 2, 2))
|
|
|
|
|
|
def _dbg_dump_path(path):
|
|
stdout, stderr, returncode = run(["find", path], path)
|
|
print("Contents of: %r" % path)
|
|
print("\n".join(sorted(stdout.decode('utf-8').split("\n"))))
|
|
|
|
|
|
def blendfile_template_create(blendfile, blendfile_root, create_id, create_data, deps):
|
|
returncode_test = 123
|
|
blendfile_deps_json = os.path.join(TEMP_LOCAL, "blend_template_deps.json")
|
|
os.makedirs(os.path.dirname(blendfile), exist_ok=True)
|
|
|
|
if create_data is not None:
|
|
blendfile_create_data_json = os.path.join(TEMP_LOCAL, "blendfile_create_data.json")
|
|
with open(blendfile_create_data_json, 'w') as f:
|
|
import json
|
|
json.dump(
|
|
create_data, f, ensure_ascii=False,
|
|
check_circular=False,
|
|
# optional (pretty)
|
|
sort_keys=True, indent=4, separators=(',', ': '),
|
|
)
|
|
del json
|
|
else:
|
|
blendfile_create_data_json = None
|
|
|
|
blender = os.getenv('BLENDER_BIN', "blender")
|
|
cmd = (
|
|
blender,
|
|
"--background",
|
|
"--factory-startup",
|
|
"-noaudio",
|
|
"--python",
|
|
os.path.join(CURRENT_DIR, "blendfile_templates.py"),
|
|
"--",
|
|
blendfile,
|
|
blendfile_root,
|
|
blendfile_deps_json,
|
|
create_id,
|
|
"NONE" if blendfile_create_data_json is None else blendfile_create_data_json,
|
|
str(returncode_test),
|
|
)
|
|
stdout, stderr, returncode = run(cmd)
|
|
|
|
if os.path.exists(blendfile_deps_json):
|
|
with open(blendfile_deps_json, 'r') as f:
|
|
import json
|
|
deps[:] = json.load(f)
|
|
del json
|
|
os.remove(blendfile_deps_json)
|
|
else:
|
|
deps.clear()
|
|
|
|
if blendfile_create_data_json is not None:
|
|
os.remove(blendfile_create_data_json)
|
|
|
|
if returncode != returncode_test:
|
|
# verbose will have already printed
|
|
if not VERBOSE:
|
|
print(">>> ", args_as_string(cmd))
|
|
sys.stdout.write(" stdout: %s\n" % stdout.strip())
|
|
sys.stdout.write(" stderr: %s\n" % stderr.strip())
|
|
sys.stdout.write(" return: %d\n" % returncode)
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
def blendfile_template_create_from_files(proj_path, session_path, blendfile, images):
|
|
|
|
for i, f_proj in enumerate(images):
|
|
f_abs = os.path.join(session_path, f_proj)
|
|
os.makedirs(os.path.dirname(f_abs), exist_ok=True)
|
|
file_quick_image(f_abs, fill_color=bytes([i]))
|
|
|
|
blendfile_abs = os.path.join(session_path, blendfile)
|
|
deps = []
|
|
if not blendfile_template_create(blendfile_abs, session_path, "create_from_files", None, deps):
|
|
return False
|
|
|
|
# not essential but we need to be sure what we made has correct deps
|
|
# otherwise further tests will fail
|
|
ret = bam_run_as_json(["deps", blendfile_abs, "--json"], proj_path)
|
|
|
|
# not real test since we don't use static method,
|
|
# just check we at least account for all deps
|
|
assert(len(ret) == len(images))
|
|
|
|
|
|
def blendfile_template_create_from_file_liblinks(proj_path, session_path, blendfile, links):
|
|
|
|
blendfile_abs = os.path.join(session_path, blendfile)
|
|
deps = []
|
|
|
|
links_abs = []
|
|
for f, f_id, f_links in links:
|
|
f_abs = os.path.join(session_path, f)
|
|
f_abs_dir = os.path.dirname(f_abs)
|
|
os.makedirs(f_abs_dir, exist_ok=True)
|
|
links_abs.append((
|
|
f_abs,
|
|
f_id,
|
|
[os.path.join(session_path, l) for l in f_links],
|
|
))
|
|
|
|
if not blendfile_template_create(blendfile_abs, session_path, "create_from_file_liblinks", links_abs, deps):
|
|
return False
|
|
|
|
# not essential but we need to be sure what we made has correct deps
|
|
# otherwise further tests will fail
|
|
ret = bam_run_as_json(["deps", blendfile_abs, "--json"], proj_path)
|
|
|
|
# not real test since we don't use static method,
|
|
# just check we at least account for all deps
|
|
# assert(len(ret) == len(links))
|
|
return True
|
|
|
|
|
|
def wait_for_input():
|
|
"""
|
|
For debugging, so we can inspect the state of the system before the test finished.
|
|
"""
|
|
input('press any key to continue:')
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Server
|
|
|
|
|
|
def server(mode='testing', debug=False):
|
|
"""
|
|
Start development server via Flask app.run() in a separate thread. We need server
|
|
to run in order to check most of the client commands.
|
|
"""
|
|
|
|
def run_testing_server():
|
|
from application import app
|
|
|
|
# If we run the server in testing mode (the default) we override sqlite database,
|
|
# with a testing, disposable one (create TMP dir)
|
|
if mode == 'testing':
|
|
from application import db
|
|
from application.modules.projects.model import Project, ProjectSetting
|
|
# Override sqlite database
|
|
if not os.path.isdir(TEMP_SERVER):
|
|
os.makedirs(TEMP_SERVER)
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + TEMP_SERVER + '/bam_test.db'
|
|
# Use the model definitions to create all the tables
|
|
db.create_all()
|
|
# Create a testing project, based on the global configuration (depends on a
|
|
# correct initialization of the SVN repo and on the creation of a checkout)
|
|
|
|
# TODO(fsiddi): turn these values in variables
|
|
project = Project(
|
|
name=PROJECT_NAME,
|
|
repository_path=os.path.join(TEMP_LOCAL, "remote_store/svn_checkout"),
|
|
upload_path=os.path.join(TEMP_LOCAL, "remote_store/upload"),
|
|
status="active",
|
|
)
|
|
db.session.add(project)
|
|
db.session.commit()
|
|
|
|
setting = ProjectSetting(
|
|
project_id=project.id,
|
|
name="svn_password",
|
|
value="my_password",
|
|
data_type="str",
|
|
)
|
|
db.session.add(setting)
|
|
db.session.commit()
|
|
|
|
setting = ProjectSetting(
|
|
project_id=project.id,
|
|
name="svn_default_user",
|
|
value="my_user",
|
|
data_type="str",
|
|
)
|
|
db.session.add(setting)
|
|
db.session.commit()
|
|
|
|
# Run the app in production mode (prevents tests to run twice)
|
|
app.run(port=PORT, debug=debug)
|
|
|
|
from multiprocessing import Process
|
|
p = Process(target=run_testing_server, args=())
|
|
p.start()
|
|
|
|
os.system("sleep 1")
|
|
return p
|
|
|
|
|
|
def global_setup(use_server=True):
|
|
data = []
|
|
|
|
if VERBOSE:
|
|
# for server
|
|
import logging
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
del logging
|
|
|
|
shutil.rmtree(TEMP_SERVER, ignore_errors=True)
|
|
shutil.rmtree(TEMP_LOCAL, ignore_errors=True)
|
|
|
|
if use_server:
|
|
p = server()
|
|
data.append(p)
|
|
|
|
return data
|
|
|
|
|
|
def global_teardown(data, use_server=True):
|
|
|
|
if use_server:
|
|
p = data.pop(0)
|
|
p.terminate()
|
|
|
|
shutil.rmtree(TEMP_SERVER, ignore_errors=True)
|
|
shutil.rmtree(TEMP_LOCAL, ignore_errors=True)
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Unit Tests
|
|
|
|
|
|
class BamSimpleTestCase(unittest.TestCase):
|
|
"""
|
|
Basic testcase, only make temp dirs.
|
|
"""
|
|
def setUp(self):
|
|
|
|
# for running single tests
|
|
if __name__ != "__main__":
|
|
self._data = global_setup(use_server=False)
|
|
|
|
if not os.path.isdir(TEMP_LOCAL):
|
|
os.makedirs(TEMP_LOCAL)
|
|
|
|
def tearDown(self):
|
|
# input('Wait:')
|
|
shutil.rmtree(TEMP_LOCAL)
|
|
|
|
# for running single tests
|
|
if __name__ != "__main__":
|
|
global_teardown(self._data, use_server=False)
|
|
|
|
|
|
class BamSessionTestCase(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
|
|
# for running single tests
|
|
if __name__ != "__main__":
|
|
self._data = global_setup()
|
|
|
|
if not os.path.isdir(TEMP_LOCAL):
|
|
os.makedirs(TEMP_LOCAL)
|
|
# Create local storage directory
|
|
if not os.path.isdir(self.path_local_store):
|
|
os.makedirs(self.path_local_store)
|
|
|
|
# Create remote storage (usually is on the server).
|
|
# SVN repo and SVN checkout will live here
|
|
if not os.path.isdir(self.path_remote_store):
|
|
os.makedirs(self.path_remote_store)
|
|
|
|
# Check for SVN repo directory
|
|
path_svn_repo = os.path.join(self.path_remote_store, "svn_repo")
|
|
if not os.path.isdir(path_svn_repo):
|
|
os.makedirs(path_svn_repo)
|
|
|
|
# Create a fresh SVN repository
|
|
if not svn_repo_create(self.proj_name, path_svn_repo):
|
|
self.fail("svn_repo: create")
|
|
|
|
# Check for SVN checkout
|
|
path_svn_checkout = os.path.join(self.path_remote_store, "svn_checkout")
|
|
|
|
# Create an SVN checkout of the freshly created repo
|
|
path_svn_repo_url = "file://%s" % os.path.join(path_svn_repo, self.proj_name)
|
|
if not svn_repo_checkout(path_svn_repo_url, path_svn_checkout):
|
|
self.fail("svn_repo: checkout %r" % path_svn_repo_url)
|
|
|
|
def tearDown(self):
|
|
# input('Wait:')
|
|
shutil.rmtree(TEMP_LOCAL)
|
|
|
|
# for running single tests
|
|
if __name__ != "__main__":
|
|
global_teardown(self._data)
|
|
|
|
def get_url(self):
|
|
url_full = "%s@%s/%s" % (self.user_name, self.server_addr, self.proj_name)
|
|
user_name, url = url_full.rpartition('@')[0::2]
|
|
return url_full, user_name, url
|
|
|
|
def init_defaults(self):
|
|
self.path_local_store = os.path.join(TEMP_LOCAL, "local_store")
|
|
self.path_remote_store = os.path.join(TEMP_LOCAL, "remote_store")
|
|
|
|
self.proj_name = PROJECT_NAME
|
|
self.user_name = "user"
|
|
self.server_addr = "http://localhost:%s" % PORT
|
|
|
|
def init_repo(self):
|
|
url_full, user_name, url = self.get_url()
|
|
stdout, stderr = bam_run(["init", url_full], self.path_local_store)
|
|
self.assertEqual("", stderr)
|
|
proj_path = os.path.join(self.path_local_store, self.proj_name)
|
|
return proj_path
|
|
|
|
def init_session(self, session_name):
|
|
"""
|
|
Initialize the project and create a new session.
|
|
"""
|
|
|
|
proj_path = self.init_repo()
|
|
session_path = os.path.join(proj_path, session_name)
|
|
|
|
stdout, stderr = bam_run(["create", session_name], proj_path)
|
|
self.assertEqual("", stderr)
|
|
return proj_path, session_path
|
|
|
|
|
|
class BamInitTest(BamSessionTestCase):
|
|
"""
|
|
Test the `bam init user@http://bamserver/projectname` command.
|
|
We verify that a project directory is created, and that it contains a .bam subdirectory
|
|
with a config file, with the right url and user values (given in the command)
|
|
"""
|
|
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def test_init(self):
|
|
proj_path = self.init_repo()
|
|
|
|
url_full, user_name, url = self.get_url()
|
|
with open(os.path.join(proj_path, ".bam", "config")) as f:
|
|
cfg = json.load(f)
|
|
self.assertEqual(url, cfg["url"])
|
|
self.assertEqual(user_name, cfg["user"])
|
|
|
|
with open(os.path.join(proj_path, ".bamignore")) as f:
|
|
self.assertEqual(f.readline(), ".*\.blend\d+$")
|
|
|
|
|
|
class BamListTest(BamSessionTestCase):
|
|
"""
|
|
Test for the `bam ls --json` command. We run it with --json for easier command
|
|
output parsing.
|
|
"""
|
|
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def test_ls(self):
|
|
proj_path = self.init_repo()
|
|
|
|
ret = bam_run_as_json(["ls", "--json"], proj_path)
|
|
|
|
self.assertEqual([], [])
|
|
|
|
|
|
class BamCommitTest(BamSessionTestCase):
|
|
"""
|
|
Test for the `bam create` command. We run it with --json for easier command
|
|
output parsing.
|
|
"""
|
|
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def test_commit(self):
|
|
session_name = "mysession"
|
|
file_name = "testfile.txt"
|
|
file_data = b"hello world!\n"
|
|
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# check an empty commit fails gracefully
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
self.assertEqual("Nothing to commit!\n", stdout)
|
|
|
|
# now do a real commit
|
|
file_quick_write(session_path, file_name, file_data)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
def test_commit_partial(self):
|
|
"""
|
|
Checks the commit is only writing the modified files,
|
|
across multiple commits and changes.
|
|
"""
|
|
session_name = "mysession"
|
|
files = (
|
|
"a.data",
|
|
os.path.join("b_dir", "b.data"),
|
|
os.path.join("c_dir", "c_subdir", "c.data"),
|
|
os.path.join("d_dir", "d_subdir", "d_nested", "d.data"),
|
|
)
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# ------
|
|
# Commit
|
|
# arbitrary data so this is seen as binary data
|
|
file_binary_chunk = b'\x89'
|
|
for f in files:
|
|
file_quick_write(session_path, f, file_binary_chunk + f.encode('ascii'))
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "test 1"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# now check that status reads there are no changes
|
|
ret = bam_run_as_json(["status", "--json"], session_path)
|
|
self.assertEqual([], ret)
|
|
|
|
# ------
|
|
# Modify
|
|
# check the status now shows modified
|
|
for f in files:
|
|
file_quick_write(session_path, f, b'_foo', append=True)
|
|
|
|
ret = bam_run_as_json(["status", "--json"], session_path)
|
|
ret.sort()
|
|
self.assertEqual(
|
|
[["M", "a.data"],
|
|
["M", "b_dir/b.data"],
|
|
["M", "c_dir/c_subdir/c.data"],
|
|
["M", "d_dir/d_subdir/d_nested/d.data"],
|
|
], ret)
|
|
|
|
def test_create_commit_update(self):
|
|
"""
|
|
After creating a new session, we commit its content. Then, we do some
|
|
edits and we commit again.
|
|
"""
|
|
|
|
session_name = "mysession"
|
|
file_name = "testfile.txt"
|
|
file_data = b"hello world!\n"
|
|
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# now do a real commit
|
|
file_quick_write(session_path, file_name, file_data)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# edit the file within the same session
|
|
new_file_data = b"goodbye cruel world!\n"
|
|
file_quick_write(session_path, file_name, new_file_data)
|
|
stdout, stderr = bam_run(["commit", "-m", "session second commit"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# remove the path
|
|
shutil.rmtree(session_path)
|
|
|
|
# checkout the file again and compare its content with the committed change
|
|
stdout, stderr = bam_run(["checkout", file_name, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
self.assertTrue(os.path.exists(os.path.join(session_path, file_name)))
|
|
|
|
|
|
class BamCheckoutTest(BamSessionTestCase):
|
|
"""
|
|
Test for the `bam checkout` command.
|
|
"""
|
|
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def test_checkout(self):
|
|
session_name = "mysession"
|
|
file_name = "other_file.txt"
|
|
file_data = b"yo world!\n"
|
|
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# now do a real commit
|
|
file_quick_write(session_path, file_name, file_data)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# remove the path
|
|
shutil.rmtree(session_path)
|
|
|
|
# checkout the file again
|
|
stdout, stderr = bam_run(["checkout", file_name, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
# wait_for_input()
|
|
self.assertTrue(os.path.exists(os.path.join(session_path, file_name)))
|
|
|
|
file_data_test = file_quick_read(os.path.join(session_path, file_name))
|
|
self.assertEqual(file_data, file_data_test)
|
|
|
|
def test_checkout_in_existing_session(self):
|
|
session_name = "mysession"
|
|
file_name = "other_file.txt"
|
|
file_data = b"yo world!\n"
|
|
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# now do a real commit
|
|
file_quick_write(session_path, file_name, file_data)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# checkout inside of the existing session, should raise exception
|
|
self.assertRaises(RuntimeError, bam_run, ["checkout", file_name, "--output", session_path], session_path)
|
|
|
|
def test_checkout_variation(self):
|
|
"""
|
|
Checks that we can swap out a blend library with a variation.
|
|
"""
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
variation_path = os.path.join(session_path, "variations")
|
|
|
|
if 1:
|
|
# path cant already exist, ugh
|
|
shutil.copytree(
|
|
os.path.join(CURRENT_DIR, "blends", "variations"),
|
|
variation_path,
|
|
)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
|
|
listing = bam_run_as_json(["ls", "variations", "--json"], session_path)
|
|
listing_expect = [
|
|
["cone.blend", "file"],
|
|
["cone.blue.blend", "file"],
|
|
["lib_endpoint.blend", "file"],
|
|
["lib_user.blend", "file"],
|
|
]
|
|
|
|
self.assertEqual(listing, listing_expect)
|
|
|
|
# now create variation file & commit it
|
|
f_variation = os.path.join(variation_path, "lib_user.json")
|
|
file_quick_write(
|
|
f_variation,
|
|
data='{"variations": ["cone.blue.blend"]}',
|
|
)
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "add variation"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
listing = bam_run_as_json(["ls", "variations", "--json"], session_path)
|
|
listing_expect.append(["lib_user.json", "file"])
|
|
listing_expect.sort()
|
|
self.assertEqual(listing, listing_expect)
|
|
|
|
# now clear the repo and do a fresh checkout, and see that the variation is applied.
|
|
# remove the path
|
|
shutil.rmtree(session_path)
|
|
|
|
# checkout the file again
|
|
file_name = "variations/lib_endpoint.blend"
|
|
session_path = session_path + "_a"
|
|
stdout, stderr = bam_run(["checkout", file_name, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
if 1:
|
|
# try to commit (ensure UUID's are correct)
|
|
# (not mismatch since the variations are applied)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
self.assertEqual("Nothing to commit!\n", stdout)
|
|
|
|
ret = bam_run_as_json(["deps", "lib_endpoint.blend", "--json", "--recursive"], session_path)
|
|
ret.sort()
|
|
|
|
self.assertEqual(ret[0][1], "//lib_user.blend")
|
|
self.assertEqual(ret[0][3], "OK")
|
|
self.assertEqual(ret[1][1], "//cone.blue.blend")
|
|
self.assertEqual(ret[1][3], "OK")
|
|
|
|
def test_checkout_variations_image(self):
|
|
# absolute path: (project relative) -->
|
|
# checkout path: (relative to blend)
|
|
blendfile = "image_user.blend"
|
|
images = ("maps/generic.png",)
|
|
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
os.makedirs(os.path.join(session_path, "maps"))
|
|
|
|
blendfile_template_create_from_files(
|
|
proj_path, session_path,
|
|
blendfile, images)
|
|
|
|
# add image
|
|
file_quick_image(
|
|
session_path,
|
|
"maps/generic.blue.png",
|
|
fill_color=b'\x00\x00\xff\xff',
|
|
)
|
|
|
|
# now create variation file & commit it
|
|
f_variation = os.path.join(session_path, "image_user.json")
|
|
file_quick_write(
|
|
f_variation,
|
|
data='{"variations": ["maps/generic.blue.png"]}',
|
|
)
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "variation"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# we could commit and checkout, but instead pack
|
|
stdout, stderr = bam_run(["pack", blendfile, "--output", "out.zip", "--compress", "store"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
import zipfile
|
|
with zipfile.ZipFile(os.path.join(session_path, "out.zip"), 'r') as z_handle:
|
|
ret = z_handle.namelist()
|
|
ret.sort()
|
|
self.assertEqual(
|
|
z_handle.namelist(),
|
|
["image_user.blend", "maps/generic.blue.png"])
|
|
|
|
def test_checkout_missing_image(self):
|
|
"""
|
|
Check we can checkout a blend that has a missing path.
|
|
"""
|
|
blendfile = "image_user.blend"
|
|
images = ("maps/generic.png",)
|
|
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
os.makedirs(os.path.join(session_path, "maps"))
|
|
|
|
blendfile_template_create_from_files(
|
|
proj_path, session_path,
|
|
blendfile, images)
|
|
|
|
# we are going to remove the maps directory, getting bam to handle a missing path
|
|
shutil.rmtree(os.path.join(session_path, "maps"))
|
|
|
|
# commit and checkout
|
|
stdout, stderr = bam_run(["commit", "-m", "blend with missing files"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
shutil.rmtree(session_path)
|
|
|
|
session_path = session_path
|
|
stdout, stderr = bam_run(["checkout", blendfile, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
self.assertIn("source missing", stdout)
|
|
self.assertIn(images[0], stdout)
|
|
|
|
|
|
class BamUpdateTest(BamSessionTestCase):
|
|
"""
|
|
Test for the `bam update` command.
|
|
"""
|
|
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def test_update_blank(self):
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
stdout, stderr = bam_run(["update"], session_path)
|
|
# Empty and new session should not update at all
|
|
self.assertEqual("", stderr)
|
|
self.assertEqual("Nothing to update!\n", stdout)
|
|
|
|
def test_update_simple(self):
|
|
session_name = "mysession"
|
|
file_name = "other_file.txt"
|
|
file_data = b"initial data!\n"
|
|
file_data_append = b"appended data!\n"
|
|
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# now do a real commit
|
|
file_quick_write(session_path, file_name, file_data)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# remove the path
|
|
shutil.rmtree(session_path)
|
|
|
|
# checkout the file again
|
|
session_path_a = session_path + "_a"
|
|
stdout, stderr = bam_run(["checkout", file_name, "--output", session_path_a], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
session_path_b = session_path + "_b"
|
|
stdout, stderr = bam_run(["checkout", file_name, "--output", session_path_b], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
file_quick_write(session_path_a, file_name, file_data_append, append=True)
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "commit appended data"], session_path_a)
|
|
self.assertEqual("", stderr)
|
|
|
|
stdout, stderr = bam_run(["update"], session_path_b)
|
|
self.assertEqual("", stderr)
|
|
|
|
with open(os.path.join(session_path_b, file_name), 'rb') as f:
|
|
self.assertEqual(f.read(), file_data + file_data_append)
|
|
|
|
|
|
class BamRevertTest(BamSessionTestCase):
|
|
"""
|
|
Test for the `bam revert` command.
|
|
"""
|
|
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def test_revert_simple(self):
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
file_quick_write(session_path, "test.txt", data="test123")
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "commit test"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
stdout, stderr = bam_run(["status", ], session_path)
|
|
self.assertEqual("", stdout)
|
|
self.assertEqual("", stderr)
|
|
|
|
# remove and revert it
|
|
os.remove(os.path.join(session_path, "test.txt"))
|
|
|
|
ret = bam_run_as_json(["status", "--json"], session_path)
|
|
ret.sort()
|
|
self.assertEqual(
|
|
[["D", "test.txt"],
|
|
], ret)
|
|
stdout, stderr = bam_run(["revert", "test.txt"], session_path)
|
|
|
|
ret = bam_run_as_json(["status", "--json"], session_path)
|
|
ret.sort()
|
|
self.assertEqual([
|
|
], ret)
|
|
|
|
with open(os.path.join(session_path, "test.txt"), 'rb') as fh:
|
|
self.assertEqual(b'test123', fh.read())
|
|
|
|
def test_revert_blend(self):
|
|
"""
|
|
This replays binary edits.
|
|
"""
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
os.makedirs(os.path.join(session_path, "dir"))
|
|
# just a way to quickly get a lot of files.
|
|
for d in ("abs", "subdir"):
|
|
# path cant already exist, ugh
|
|
shutil.copytree(
|
|
os.path.join(CURRENT_DIR, "blends", "multi_level", d),
|
|
os.path.join(session_path, "dir", d),
|
|
)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
shutil.rmtree(os.path.join(session_path))
|
|
|
|
file_name = os.path.join("dir", "subdir", "house_lib_user.blend")
|
|
stdout, stderr = bam_run(["checkout", file_name, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
for i in range(2):
|
|
if i == 1:
|
|
blends = [
|
|
os.path.join(session_path, "house_lib_user.blend"),
|
|
os.path.join(session_path, "_dir", "abs", "path", "house_abs.blend"),
|
|
os.path.join(session_path, "rel", "path", "house_rel.blend"),
|
|
]
|
|
for f in blends:
|
|
os.remove(f)
|
|
stdout, stderr = bam_run(["revert"] + blends, session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
ret = bam_run_as_json(["deps", "house_lib_user.blend", "--json", "--recursive"], session_path)
|
|
ret.sort()
|
|
self.assertEqual(ret[0][1], "//_dir/abs/path/house_abs.blend")
|
|
self.assertEqual(ret[0][3], "OK")
|
|
self.assertEqual(ret[1][1], "//rel/path/house_rel.blend")
|
|
self.assertEqual(ret[1][3], "OK")
|
|
|
|
ret = bam_run_as_json(["status", "--json"], session_path)
|
|
ret.sort()
|
|
self.assertEqual([
|
|
], ret)
|
|
|
|
class BamBlendTest(BamSimpleTestCase):
|
|
|
|
def test_create_all(self):
|
|
"""
|
|
This simply tests all the create functions run without error.
|
|
"""
|
|
import blendfile_templates
|
|
TEMP_SESSION = os.path.join(TEMP_LOCAL, "blend_file_template")
|
|
|
|
def iter_files_session():
|
|
for dirpath, dirnames, filenames in os.walk(TEMP_SESSION):
|
|
for filename in filenames:
|
|
filepath = os.path.join(dirpath, filename)
|
|
yield filepath
|
|
|
|
for create_id, create_fn in blendfile_templates.__dict__.items():
|
|
if (create_id.startswith("create_") and create_fn.__class__.__name__ == "function"):
|
|
|
|
# ignore create functions which need data
|
|
if create_id in {"create_from_file_liblinks"}:
|
|
continue
|
|
|
|
os.makedirs(TEMP_SESSION)
|
|
|
|
blendfile = os.path.join(TEMP_SESSION, create_id + ".blend")
|
|
deps = []
|
|
|
|
if not blendfile_template_create(blendfile, TEMP_SESSION, create_id, None, deps):
|
|
# self.fail("blend file couldn't be create")
|
|
# ... we want to keep running
|
|
self.assertTrue(False, True) # GRR, a better way?
|
|
shutil.rmtree(TEMP_SESSION)
|
|
continue
|
|
|
|
self.assertTrue(os.path.exists(blendfile))
|
|
with open(blendfile, 'rb') as blendfile_handle:
|
|
self.assertEqual(b'BLENDER', blendfile_handle.read(7))
|
|
os.remove(blendfile)
|
|
|
|
# check all deps are accounted for
|
|
for f in deps:
|
|
self.assertTrue(os.path.exists(f))
|
|
for f in iter_files_session():
|
|
self.assertIn(f, deps)
|
|
|
|
shutil.rmtree(TEMP_SESSION)
|
|
|
|
def test_empty(self):
|
|
file_name = "testfile.blend"
|
|
blendfile = os.path.join(TEMP_LOCAL, file_name)
|
|
if not blendfile_template_create(blendfile, TEMP_LOCAL, "create_blank", None, []):
|
|
self.fail("blend file couldn't be created")
|
|
return
|
|
|
|
self.assertTrue(os.path.exists(blendfile))
|
|
|
|
|
|
class BamDeleteTest(BamSessionTestCase):
|
|
"""
|
|
Test for the `bam commit` command when files are being deleted.
|
|
"""
|
|
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def test_delete(self):
|
|
session_name = "mysession"
|
|
file_name = "testfile.blend"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# now do a real commit
|
|
blendfile = os.path.join(session_path, file_name)
|
|
if not blendfile_template_create(blendfile, session_path, "create_blank", None, []):
|
|
self.fail("blend file couldn't be created")
|
|
return
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "tests message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# remove the path
|
|
shutil.rmtree(session_path)
|
|
del session_path
|
|
|
|
# -----------
|
|
# New Session
|
|
|
|
# checkout the file again
|
|
stdout, stderr = bam_run(["checkout", file_name, "--output", "new_out"], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# now delete the file we just checked out
|
|
session_path = os.path.join(proj_path, "new_out")
|
|
os.remove(os.path.join(session_path, file_name))
|
|
stdout, stderr = bam_run(["commit", "-m", "test deletion"], session_path)
|
|
self.assertEqual("", stderr)
|
|
# check if deletion of the file has happened
|
|
|
|
listing = bam_run_as_json(["ls", "--json"], session_path)
|
|
|
|
# parse the response searching for the file. If it fails it means the file has
|
|
# not been removed
|
|
for e in listing:
|
|
self.assertNotEqual(e[0], file_name)
|
|
|
|
|
|
class BamRelativeAbsoluteTest(BamSessionTestCase):
|
|
"""Create a checkout and commit it into the repository,
|
|
using both absolute & relative paths.
|
|
"""
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def helper_test_from_files(self, blendfile_pair, images):
|
|
"""
|
|
"""
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# create the image files we need
|
|
blendfile_template_create_from_files(
|
|
proj_path, session_path,
|
|
blendfile_pair[0], [f[0] for f in images])
|
|
|
|
# now commit the files
|
|
stdout, stderr = bam_run(["commit", "-m", "commit shot_01"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# remove the path
|
|
shutil.rmtree(session_path)
|
|
del session_path
|
|
|
|
# -----------
|
|
# New Session
|
|
|
|
# checkout the file again
|
|
stdout, stderr = bam_run(["checkout", blendfile_pair[0], "--output", "new_out"], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# now delete the file we just checked out
|
|
session_path = os.path.join(proj_path, "new_out")
|
|
# _dbg_dump_path(session_path)
|
|
|
|
# Now check if all the paths we expected are found!
|
|
for f_proj, f_local in images:
|
|
f_abs = os.path.join(session_path, f_local)
|
|
# assert message isn't so useful
|
|
if VERBOSE:
|
|
print("Exists?", f_abs)
|
|
self.assertTrue(os.path.exists(f_abs))
|
|
|
|
def helper_test_from_liblinks(self, blendfile, liblinks_src, liblinks_dst):
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# create the image files we need
|
|
blendfile_template_create_from_file_liblinks(proj_path, session_path, blendfile, liblinks_src)
|
|
|
|
# now commit the files
|
|
stdout, stderr = bam_run(["commit", "-m", "commit shot_01"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# remove the path
|
|
shutil.rmtree(session_path)
|
|
del session_path
|
|
|
|
# -----------
|
|
# New Session
|
|
|
|
# checkout the file again
|
|
stdout, stderr = bam_run(["checkout", blendfile, "--output", "new_out"], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# now delete the file we just checked out
|
|
session_path = os.path.join(proj_path, "new_out")
|
|
|
|
# _dbg_dump_path(session_path)
|
|
|
|
# Now check if all the paths we expected are found!
|
|
for f_rel in liblinks_dst:
|
|
f_abs = os.path.join(session_path, f_rel)
|
|
# assert message isn't so useful
|
|
if VERBOSE:
|
|
print("Exists?", f_abs)
|
|
self.assertTrue(os.path.exists(f_abs))
|
|
return proj_path, session_path
|
|
|
|
def test_absolute_relative_images(self):
|
|
"""
|
|
Layout is as follows.
|
|
|
|
- ./shots/01/shot_01.blend
|
|
- ./shots/01/maps/special.png
|
|
- ./maps/generic.png
|
|
|
|
Maps to...
|
|
- ./shot_01.blend
|
|
- ./_maps/special.png
|
|
- ./maps/generic.png
|
|
"""
|
|
|
|
# absolute path: (project relative) -->
|
|
# checkout path: (relative to blend)
|
|
blendfile_pair = ("shots/01/shot_01.blend", "shot_01.blend")
|
|
if 1:
|
|
images = (
|
|
("shots/01/maps/special.png", "maps/special.png"),
|
|
("maps/generic.png", "_maps/generic.png"),
|
|
)
|
|
else:
|
|
images = (
|
|
("shots/01/maps/special.png", "maps/special.png"),
|
|
("maps/generic.png", "__/__/maps/generic.png"),
|
|
)
|
|
|
|
self.helper_test_from_files(blendfile_pair, images)
|
|
|
|
def test_absolute_relative_liblinks(self):
|
|
"""
|
|
Layout is as follows.
|
|
|
|
- ./shots/01/shot_01.blend
|
|
- ./shots/01/maps/special.blend
|
|
- ./maps/generic.blend
|
|
|
|
Maps to...
|
|
- ./shot_01.blend
|
|
- ./maps/special.blend
|
|
- ./_maps/generic.blend
|
|
"""
|
|
# NOTE: test_absolute_relative_from_subdir() test calls this one.
|
|
blendfile = "shots/01/shot_01.blend"
|
|
|
|
blend_shot = "shots/01/shot_01.blend"
|
|
blend_special = "shots/01/maps/special.blend"
|
|
blend_generic = "maps/generic.blend"
|
|
|
|
# absolute path: (project relative) -->
|
|
# checkout path: (relative to blend)
|
|
liblinks_src = (
|
|
(blend_shot, "Scene10", (blend_special,)),
|
|
(blend_special, "MySpecial", (blend_generic,)),
|
|
(blend_generic, "MyGeneric", ()),
|
|
)
|
|
liblinks_dst = (
|
|
"shot_01.blend",
|
|
"maps/special.blend",
|
|
"_maps/generic.blend",
|
|
)
|
|
|
|
return self.helper_test_from_liblinks(blendfile, liblinks_src, liblinks_dst)
|
|
|
|
def test_absolute_relative_from_subdir(self):
|
|
"""
|
|
Layout is as follows.
|
|
|
|
- ./shots/01/shot_01.blend
|
|
- ./shots/01/maps/special.blend
|
|
- ./maps/generic.blend
|
|
|
|
Maps to...
|
|
- ./shot_01.blend
|
|
- ./_maps/special.blend
|
|
- ./maps/generic.blend
|
|
|
|
Now add a file to these directory,
|
|
- ./maps_more/rel.txt
|
|
- ./_maps_more/abs.txt
|
|
|
|
Maps to...
|
|
- ./shots/01/maps_more/rel.txt
|
|
- ./maps_more/abs.txt
|
|
"""
|
|
|
|
# WEAK, set in the test called next
|
|
blendfile = "shots/01/shot_01.blend"
|
|
proj_path, session_path = self.test_absolute_relative_liblinks()
|
|
|
|
shutil.rmtree(session_path)
|
|
|
|
stdout, stderr = bam_run(["checkout", blendfile, "--output", "new_out"], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
session_path = os.path.join(proj_path, "new_out")
|
|
|
|
# create these new
|
|
file_quick_write(session_path, os.path.join("maps_more", "rel.txt"))
|
|
file_quick_write(session_path, os.path.join("_maps_more", "abs.txt"))
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "new abs and rel files"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
ret = bam_run_as_json(["ls", "--json"], proj_path)
|
|
|
|
ret = bam_run_as_json(["ls", "shots/01/maps_more", "--json"], proj_path)
|
|
self.assertIn(["rel.txt", "file"], ret)
|
|
|
|
ret = bam_run_as_json(["ls", "maps_more", "--json"], proj_path)
|
|
self.assertIn(["abs.txt", "file"], ret)
|
|
|
|
def _test_absolute_relative_from_blendfiles__structure(self, proj_path, session_path):
|
|
# used by both
|
|
# - test_absolute_relative_from_blendfiles()
|
|
# - test_absolute_relative_from_blendfiles_partial()
|
|
#
|
|
shutil.rmtree(session_path)
|
|
|
|
blendfile = os.path.join("subdir", "house_lib_user.blend")
|
|
|
|
# ---- make a new checkout
|
|
|
|
def _check():
|
|
ret = bam_run_as_json(["deps", os.path.join(session_path, os.path.basename(blendfile)), "--json"], proj_path)
|
|
ret.sort()
|
|
|
|
self.assertEqual(ret[0][1], "//" + os.path.join("_abs", "path", "house_abs.blend"))
|
|
self.assertEqual(ret[0][3], "OK")
|
|
self.assertEqual(ret[1][1], "//" + os.path.join("rel", "path", "house_rel.blend"))
|
|
self.assertEqual(ret[1][3], "OK")
|
|
|
|
stdout, stderr = bam_run(["checkout", blendfile, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
_check()
|
|
|
|
# ---- touch and commit
|
|
|
|
file_quick_touch_blend(os.path.join(os.path.join(session_path, "_abs", "path", "house_abs.blend")))
|
|
file_quick_touch_blend(os.path.join(os.path.join(session_path, "rel", "path", "house_rel.blend")))
|
|
file_quick_touch_blend(os.path.join(os.path.join(session_path, os.path.basename(blendfile))))
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "just touched"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
shutil.rmtree(session_path)
|
|
|
|
stdout, stderr = bam_run(["checkout", blendfile, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
_check()
|
|
# _dbg_dump_path(session_path)
|
|
|
|
def test_absolute_relative_from_blendfiles(self):
|
|
"""
|
|
This uses 3x blend files to test multi-level commit, checkout.
|
|
|
|
- ./subdir/house_lib_user.blend
|
|
- ./subdir/rel/path/house_rel.blend
|
|
- ./abs/path/house_abs.blend
|
|
|
|
Maps to...
|
|
- ./house_lib_user.blend
|
|
- ./rel/path/house_rel.blend
|
|
- ./_abs/path/house_abs.blend
|
|
"""
|
|
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
if 1:
|
|
for d in ("abs", "subdir"):
|
|
# path cant already exist, ugh
|
|
shutil.copytree(
|
|
os.path.join(CURRENT_DIR, "blends", "multi_level", d),
|
|
os.path.join(session_path, d),
|
|
)
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
self._test_absolute_relative_from_blendfiles__structure(proj_path, session_path)
|
|
|
|
def test_absolute_relative_from_blendfiles_partial(self):
|
|
"""
|
|
Same as test_absolute_relative_from_blendfiles(),
|
|
but start from a single file commit
|
|
"""
|
|
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
blendfile = os.path.join("subdir", "house_lib_user.blend")
|
|
blendfile_abs = os.path.join(session_path, blendfile)
|
|
|
|
# --------------------------------------------------------------------
|
|
# now do the same test, on a checkout which already _has_ 'subdir/house_lib_user.blend'
|
|
# to begin with, then add in the linked libs afterwards.
|
|
|
|
if not blendfile_template_create(blendfile_abs, session_path, "create_blank", None, []):
|
|
self.fail("blend file couldn't be created")
|
|
self.assertTrue(os.path.exists(blendfile_abs))
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
shutil.rmtree(session_path)
|
|
|
|
stdout, stderr = bam_run(["checkout", blendfile, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# Now write the relative paths into the current checkout,
|
|
# they must now map back to the correct paths.
|
|
|
|
if 1:
|
|
shutil.copytree(
|
|
os.path.join(CURRENT_DIR, "blends", "multi_level", "abs", "path"),
|
|
os.path.join(session_path, "_abs", "path"),
|
|
)
|
|
shutil.copytree(
|
|
os.path.join(CURRENT_DIR, "blends", "multi_level", "subdir", "rel", "path"),
|
|
os.path.join(session_path, "rel", "path"),
|
|
)
|
|
shutil.copy(
|
|
os.path.join(CURRENT_DIR, "blends", "multi_level", "subdir", os.path.basename(blendfile)),
|
|
session_path,
|
|
)
|
|
|
|
# Now store the link as if we made the path locally, manually adding "./_abs/"
|
|
# This test is to show that BAM can remap this back to an absolute dir.
|
|
#
|
|
# XXX, binary search & replace, WEAK!
|
|
with open(os.path.join(session_path, os.path.basename(blendfile)), 'rb') as f:
|
|
data = f.read()
|
|
data = data.replace(
|
|
b'//../abs/path/house_abs.blend\x00',
|
|
b'//_abs/path/house_abs.blend\x00__',
|
|
1)
|
|
with open(os.path.join(session_path, os.path.basename(blendfile)), 'wb') as f:
|
|
f.write(data)
|
|
del data, f
|
|
# XXX (end hack!)
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "new house to remap"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
self._test_absolute_relative_from_blendfiles__structure(proj_path, session_path)
|
|
|
|
# ret = bam_run_as_json(["ls", "abs/path", "--json"], proj_path)
|
|
# self.assertEqual(ret[0], ["house_abs.blend", "file"])
|
|
# ret = bam_run_as_json(["ls", "subdir/rel/path", "--json"], proj_path)
|
|
# self.assertEqual(ret[0], ["house_rel.blend", "file"])
|
|
|
|
def test_absolute_relative_from_blendfiles_texture(self):
|
|
"""
|
|
Texture on library
|
|
"""
|
|
|
|
session_name = "mysession"
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
blendfile = os.path.join("root", "level1", "level2", "level3", "level3.blend")
|
|
|
|
shutil.copytree(
|
|
os.path.join(CURRENT_DIR, "blends", "multi_level_link"),
|
|
os.path.join(session_path, "root"),
|
|
)
|
|
|
|
stdout, stderr = bam_run(["commit", "-m", "multi_level_link"], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
shutil.rmtree(session_path)
|
|
|
|
stdout, stderr = bam_run(["checkout", blendfile, "--output", session_path], proj_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# finally run deps to see the paths are as we expect
|
|
ret = bam_run_as_json(["deps", "level3.blend", "--recursive", "--json"], session_path)
|
|
|
|
self.assertEqual(ret[0][1], "//" + os.path.join("_root", "level1_lib", "level1_lib.blend"))
|
|
self.assertEqual(ret[0][3], "OK")
|
|
self.assertEqual(ret[1][1], "//" + os.path.join("..", "..", "_root", "level1_lib", "level2_lib", "texture.png"))
|
|
self.assertEqual(ret[1][3], "OK")
|
|
|
|
import re
|
|
stdout, stderr = bam_run(["status"], session_path)
|
|
pattern = re.compile("D:|M:")
|
|
changes = pattern.search(stdout) != None
|
|
self.assertEqual(False, changes)
|
|
|
|
shutil.rmtree(session_path)
|
|
|
|
class BamIgnoreTest(BamSessionTestCase):
|
|
"""
|
|
Checks out a project, creates a .bamignore file with a few rules
|
|
and tries to commit files that violate them.
|
|
"""
|
|
def __init__(self, *args):
|
|
self.init_defaults()
|
|
super().__init__(*args)
|
|
|
|
def test_ignore(self):
|
|
session_name = "mysession"
|
|
file_name = "testfile.txt"
|
|
file_data = b"hello world!\n"
|
|
|
|
# Regular expressions for smart people
|
|
file_data_bamignore = (
|
|
r".*\.txt$",
|
|
r".*/subdirectory/.*",
|
|
)
|
|
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# write the .bamignore in the session root
|
|
file_quick_write(proj_path, ".bamignore", "\n".join(file_data_bamignore))
|
|
|
|
# create some files
|
|
file_quick_write(session_path, file_name, file_data)
|
|
|
|
subdir_path = os.path.join(session_path, "subdirectory")
|
|
os.makedirs(subdir_path)
|
|
file_quick_write(subdir_path, "testfile.blend1", file_data)
|
|
|
|
# now check for status
|
|
stdout, stderr = bam_run(["status", ], session_path)
|
|
self.assertEqual("", stderr)
|
|
|
|
# try to commit
|
|
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
|
|
self.assertEqual("", stderr)
|
|
self.assertEqual("Nothing to commit!\n", stdout)
|
|
|
|
def test_invalid_ignore(self):
|
|
session_name = "mysession"
|
|
file_name = "testfile.txt"
|
|
file_data = b"hello world!\n"
|
|
|
|
# A failing regex that breaks syntax highlight as nice side effect
|
|
file_data_bamignore = (
|
|
r".*\.txt$",
|
|
r".*\.($", # invalid!
|
|
)
|
|
|
|
proj_path, session_path = self.init_session(session_name)
|
|
|
|
# write the .bamignore in the session root
|
|
file_quick_write(proj_path, ".bamignore", "\n".join(file_data_bamignore))
|
|
|
|
# create some files
|
|
file_quick_write(session_path, file_name, file_data)
|
|
|
|
# now check for status
|
|
self.assertRaises(RuntimeError, bam_run, ["status", ], session_path)
|
|
|
|
|
|
class BamRemapTest(BamSimpleTestCase):
|
|
"""
|
|
Test remapping existing blend files via the 'bam remap' command.
|
|
|
|
note: this doesn't need any bam-session. simply a directory to work in.
|
|
"""
|
|
|
|
@staticmethod
|
|
def remap_path_pair(base, src_dst):
|
|
return ("%s -> %s" % (
|
|
os.path.join(base, src_dst[0]).encode('utf-8'),
|
|
os.path.join(base, src_dst[1]).encode('utf-8'),
|
|
))
|
|
|
|
def test_remap_empty(self):
|
|
subdir_path = os.path.join(TEMP_LOCAL, "my_remap")
|
|
os.makedirs(subdir_path)
|
|
|
|
ret = bam_run_as_json(["remap", "start", "--json"], subdir_path)
|
|
self.assertEqual(["nothing to remap!"], ret)
|
|
|
|
def test_remap_simple(self):
|
|
subdir_path = os.path.join(TEMP_LOCAL, "my_remap")
|
|
subdir_path_sub = os.path.join(TEMP_LOCAL, "my_remap", "sub")
|
|
os.makedirs(subdir_path_sub)
|
|
|
|
# DUMMY VALUES (we don't really need)
|
|
proj_path = subdir_path
|
|
session_path = subdir_path_sub
|
|
|
|
# absolute path: (project relative) -->
|
|
# checkout path: (relative to blend)
|
|
blendfile_pair = ("shots/01/shot_01.blend", "new/deeply/nested/path/testme.blend")
|
|
images = (
|
|
("maps/generic.png", "foobar/another.png"),
|
|
("shots/01/maps/special.png", "blah/image.png"),
|
|
)
|
|
|
|
blendfile_template_create_from_files(
|
|
proj_path, session_path,
|
|
blendfile_pair[0], [f[0] for f in images])
|
|
|
|
blendfile_pair_abs = (
|
|
os.path.join(session_path, blendfile_pair[0]),
|
|
os.path.join(session_path, blendfile_pair[1]),
|
|
)
|
|
|
|
ret_expect = [
|
|
['info', "blend read: %s" % blendfile_pair_abs[0].encode()],
|
|
'complete',
|
|
]
|
|
ret = bam_run_as_json(["remap", "start", "--json"], session_path)
|
|
self.assertEqual(ret_expect, ret)
|
|
|
|
for f_pair in ((blendfile_pair,) + images):
|
|
f_src = os.path.join(session_path, f_pair[0])
|
|
f_dst = os.path.join(session_path, f_pair[1])
|
|
os.makedirs(os.path.dirname(f_dst), exist_ok=True)
|
|
shutil.move(f_src, f_dst)
|
|
|
|
ret_expect = [
|
|
["info", "blend write: %s" % BamRemapTest.remap_path_pair(session_path, blendfile_pair)],
|
|
["info", "remap %s" % BamRemapTest.remap_path_pair(session_path, images[0])],
|
|
["info", "remap %s" % BamRemapTest.remap_path_pair(session_path, images[1])],
|
|
"complete",
|
|
]
|
|
ret = bam_run_as_json(["remap", "finish", "--json", "--dry-run"], session_path)
|
|
self.assertEqual(ret_expect, ret)
|
|
ret = bam_run_as_json(["remap", "finish", "--json"], session_path)
|
|
self.assertEqual(ret_expect, ret)
|
|
|
|
# finally run deps to see the paths are as we expect
|
|
ret = bam_run_as_json(["deps", blendfile_pair_abs[1], "--json"], session_path)
|
|
self.assertEqual(ret[0][1], "//" + os.path.join("..", "..", "..", "..", images[0][1]))
|
|
self.assertEqual(ret[0][3], "OK")
|
|
self.assertEqual(ret[1][1], "//" + os.path.join("..", "..", "..", "..", images[1][1]))
|
|
self.assertEqual(ret[1][3], "OK")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
data = global_setup()
|
|
unittest.main(exit=False)
|
|
global_teardown(data)
|