diff --git a/tests/test_cli.py b/tests/test_cli.py index 38e21f8..155d9b4 100755 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3,8 +3,18 @@ """ Test bam command line client + +Run all tests: + + python3 test_cli.py + +Run a single test: + + python3 -m unittest test_cli.BamCommitTest.test_checkout """ +VERBOSE = 0 + # ------------------ # Ensure module path import os @@ -34,8 +44,9 @@ if 1: def exit(status): import io + globals().update(sys.exit.exit_data) if isinstance(sys.stdout, io.StringIO): - globals().update(sys.exit.exit_data) + _stdout.write("\nsys.exit(%d) with message:\n" % status) sys.stdout.seek(0) @@ -85,7 +96,8 @@ PROJECT_NAME = "test_project" def run(cmd, cwd=None): - # print(">>> ", " ".join(cmd)) + if VERBOSE: + print(">>> ", " ".join(cmd)) import subprocess kwargs = dict( stderr=subprocess.PIPE, @@ -147,8 +159,8 @@ def svn_repo_create(id_, dirname): run(["svnadmin", "create", id_], cwd=dirname) -def svn_repo_checkout(path): - run(["svn", "checkout", path]) +def svn_repo_checkout(repo, path): + run(["svn", "checkout", repo, path]) def svn_repo_populate(path): @@ -159,12 +171,11 @@ def svn_repo_populate(path): def bam_run(argv, cwd=None): - with CHDir(cwd): import bam - if 1: - sys.stdout.write(" running: ") + if VERBOSE: + sys.stdout.write("\n running: ") if cwd is not None: sys.stdout.write("cd %r ; " % cwd) sys.stdout.write("bam %s\n" % " ".join(argv)) @@ -174,11 +185,45 @@ def bam_run(argv, cwd=None): bam.main(argv) ret = fakeio.read() + if VERBOSE: + sys.stdout.write(" stdout: %s\n" % ret[0].strip()) + sys.stdout.write(" stderr: %s\n" % ret[1].strip()) + return ret -# if __name__ == "__main__": -# main() +def file_quick_write(path, filepart=None, data=None): + """Quick file creation utility. + """ + if data is None: + data = b'' + elif type(data) is bytes: + mode = 'wb' + elif type(data) is str: + mode = 'w' + else: + raise Exception("type %r not known" % type(data)) + + if filepart is not None: + path = os.path.join(path, filepart) + + with open(path, mode) as f: + f.write(data) + +def file_quick_read(path, filepart=None, mode='rb'): + + if filepart is not None: + path = os.path.join(path, filepart) + + with open(path, mode) as f: + return f.read(data) + + +def wait_for_input(): + """for debugging, + so we can inspect the state of the system before the test finished. + """ + input('press any key to continue:') # ------------------------------------------------------------------------------ # Server @@ -191,11 +236,12 @@ def server(mode='testing', debug=False): def run_testing_server(): from application import app + # If we run the server in testing mode (the default) we override sqlite database, # with a testing, disposable one (create TMP dir) if mode == 'testing': from application import db - from application.modules.projects.model import Project + from application.modules.projects.model import Project, ProjectSetting # Override sqlite database if not os.path.isdir(TEMP_SERVER): os.makedirs(TEMP_SERVER) @@ -210,10 +256,29 @@ def server(mode='testing', debug=False): name=PROJECT_NAME, repository_path="/tmp/bam_test/remote_store/svn_checkout", upload_path="/tmp/bam_test/remote_store/upload", - status="active" + status="active", ) db.session.add(project) db.session.commit() + + setting = ProjectSetting( + project_id=project.id, + name="svn_password", + value="my_password", + data_type="str", + ) + db.session.add(setting) + db.session.commit() + + setting = ProjectSetting( + project_id=project.id, + name="svn_default_user", + value="my_user", + data_type="str", + ) + db.session.add(setting) + db.session.commit() + # Run the app in production mode (prevents tests to run twice) app.run(port=PORT, debug=debug) @@ -225,6 +290,19 @@ def server(mode='testing', debug=False): return p +def global_setup(): + shutil.rmtree(TEMP_SERVER, ignore_errors=True) + p = server() + data = p + return data + + +def global_teardown(data): + p = data + p.terminate() + shutil.rmtree(TEMP_SERVER, ignore_errors=True) + + # ------------------------------------------------------------------------------ # Unit Tests @@ -234,6 +312,11 @@ import unittest class BamSessionTestCase(unittest.TestCase): def setUp(self): + + # for running single tests + if __name__ != "__main__": + self._data = global_setup() + if not os.path.isdir(TEMP): os.makedirs(TEMP) # Create local storage folder @@ -254,20 +337,22 @@ class BamSessionTestCase(unittest.TestCase): svn_repo_create(self.proj_name, path_svn_repo) # Check for SVN checkout - path_svn_checkout = os.path.join(self.path_remote_store, "svn_checkout", self.proj_name) - if not os.path.isdir(path_svn_checkout): - os.makedirs(path_svn_checkout) + path_svn_checkout = os.path.join(self.path_remote_store, "svn_checkout") # Create an SVN checkout of the freshly created repo - svn_repo_checkout(path_svn_checkout) + svn_repo_checkout("file://%s" % os.path.join(path_svn_repo, self.proj_name), path_svn_checkout) # Pupulate the repo with an empty file - svn_repo_populate(path_svn_checkout) + svn_repo_populate(os.path.join(path_svn_checkout, self.proj_name)) def tearDown(self): # input('Wait:') shutil.rmtree(TEMP) + # for running single tests + if __name__ != "__main__": + global_teardown(self._data) + def get_url(self): url_full = "%s@%s/%s" % (self.user_name, self.server_addr, self.proj_name) user_name, url = url_full.rpartition('@')[0::2] @@ -337,23 +422,66 @@ class BamCommitTest(BamSessionTestCase): self.init_defaults() super().__init__(*args) - def test_ls(self): + def test_commit(self): self.init_repo() + file_data = b"hello world!\n" - d = os.path.join(self.path_local_store, self.proj_name) + proj_path = os.path.join(self.path_local_store, self.proj_name) co_id = "mysession" + session_path = os.path.join(proj_path, co_id) - stdout, stderr = bam_run(["create", "--json"], d) - + stdout, stderr = bam_run(["create", co_id], proj_path) self.assertEqual("", stderr) - import json - ret = json.loads(stdout) + # check an empty commit fails gracefully + stdout, stderr = bam_run(["commit", "-m", "test message"], session_path) + self.assertEqual("", stderr) + self.assertEqual("Nothing to commit!\n", stdout) + + # now do a real commit + file_quick_write(session_path, "testfile.txt", file_data) + stdout, stderr = bam_run(["commit", "-m", "test message"], session_path) + self.assertEqual("", stderr) + + def test_checkout(self): + self.init_repo() + file_data = b"hello world!\n" + + proj_path = os.path.join(self.path_local_store, self.proj_name) + co_id = "mysession" + session_path = os.path.join(proj_path, co_id) + + stdout, stderr = bam_run(["create", co_id], proj_path) + self.assertEqual("", stderr) + + # now do a real commit + file_quick_write(session_path, "testfile.txt", file_data) + stdout, stderr = bam_run(["commit", "-m", "test message"], session_path) + self.assertEqual("", stderr) + + # remove the path + shutil.rmtree(session_path) + + # checkout the file again + stdout, stderr = bam_run(["checkout", "testfile.txt"], proj_path) + self.assertEqual("", stderr) + print(os.path.join(proj_path, "testfile.txt")) + return + wait_for_input() + self.assertEqual(True, os.path.exists(os.path.join(proj_path, "testfile.txt"))) + + file_data_test = file_quick_read(os.path.join(proj_path, "testfile.txt")) + self.assertEqual(file_data, file_data_test) if __name__ == '__main__': - p = server() - unittest.main(exit=False) - p.terminate() + if VERBOSE: + # for server + import logging + logging.basicConfig(level=logging.DEBUG) + del logging + + data = global_setup() + unittest.main(exit=False) + global_teardown(data) - shutil.rmtree(TEMP_SERVER, ignore_errors=True)