Improve tests

- enable logging when VERBOSE is set
- now possible to run single tests from the commandline
- include a test for commit and checkout
This commit is contained in:
2014-11-19 14:09:14 +01:00
parent b404a22843
commit 0a2b4886c1

View File

@@ -3,8 +3,18 @@
"""
Test bam command line client
Run all tests:
python3 test_cli.py
Run a single test:
python3 -m unittest test_cli.BamCommitTest.test_checkout
"""
VERBOSE = 0
# ------------------
# Ensure module path
import os
@@ -34,8 +44,9 @@ if 1:
def exit(status):
import io
globals().update(sys.exit.exit_data)
if isinstance(sys.stdout, io.StringIO):
globals().update(sys.exit.exit_data)
_stdout.write("\nsys.exit(%d) with message:\n" % status)
sys.stdout.seek(0)
@@ -85,7 +96,8 @@ PROJECT_NAME = "test_project"
def run(cmd, cwd=None):
# print(">>> ", " ".join(cmd))
if VERBOSE:
print(">>> ", " ".join(cmd))
import subprocess
kwargs = dict(
stderr=subprocess.PIPE,
@@ -147,8 +159,8 @@ def svn_repo_create(id_, dirname):
run(["svnadmin", "create", id_], cwd=dirname)
def svn_repo_checkout(path):
run(["svn", "checkout", path])
def svn_repo_checkout(repo, path):
run(["svn", "checkout", repo, path])
def svn_repo_populate(path):
@@ -159,12 +171,11 @@ def svn_repo_populate(path):
def bam_run(argv, cwd=None):
with CHDir(cwd):
import bam
if 1:
sys.stdout.write(" running: ")
if VERBOSE:
sys.stdout.write("\n running: ")
if cwd is not None:
sys.stdout.write("cd %r ; " % cwd)
sys.stdout.write("bam %s\n" % " ".join(argv))
@@ -174,11 +185,45 @@ def bam_run(argv, cwd=None):
bam.main(argv)
ret = fakeio.read()
if VERBOSE:
sys.stdout.write(" stdout: %s\n" % ret[0].strip())
sys.stdout.write(" stderr: %s\n" % ret[1].strip())
return ret
# if __name__ == "__main__":
# main()
def file_quick_write(path, filepart=None, data=None):
"""Quick file creation utility.
"""
if data is None:
data = b''
elif type(data) is bytes:
mode = 'wb'
elif type(data) is str:
mode = 'w'
else:
raise Exception("type %r not known" % type(data))
if filepart is not None:
path = os.path.join(path, filepart)
with open(path, mode) as f:
f.write(data)
def file_quick_read(path, filepart=None, mode='rb'):
if filepart is not None:
path = os.path.join(path, filepart)
with open(path, mode) as f:
return f.read(data)
def wait_for_input():
"""for debugging,
so we can inspect the state of the system before the test finished.
"""
input('press any key to continue:')
# ------------------------------------------------------------------------------
# Server
@@ -191,11 +236,12 @@ def server(mode='testing', debug=False):
def run_testing_server():
from application import app
# If we run the server in testing mode (the default) we override sqlite database,
# with a testing, disposable one (create TMP dir)
if mode == 'testing':
from application import db
from application.modules.projects.model import Project
from application.modules.projects.model import Project, ProjectSetting
# Override sqlite database
if not os.path.isdir(TEMP_SERVER):
os.makedirs(TEMP_SERVER)
@@ -210,10 +256,29 @@ def server(mode='testing', debug=False):
name=PROJECT_NAME,
repository_path="/tmp/bam_test/remote_store/svn_checkout",
upload_path="/tmp/bam_test/remote_store/upload",
status="active"
status="active",
)
db.session.add(project)
db.session.commit()
setting = ProjectSetting(
project_id=project.id,
name="svn_password",
value="my_password",
data_type="str",
)
db.session.add(setting)
db.session.commit()
setting = ProjectSetting(
project_id=project.id,
name="svn_default_user",
value="my_user",
data_type="str",
)
db.session.add(setting)
db.session.commit()
# Run the app in production mode (prevents tests to run twice)
app.run(port=PORT, debug=debug)
@@ -225,6 +290,19 @@ def server(mode='testing', debug=False):
return p
def global_setup():
shutil.rmtree(TEMP_SERVER, ignore_errors=True)
p = server()
data = p
return data
def global_teardown(data):
p = data
p.terminate()
shutil.rmtree(TEMP_SERVER, ignore_errors=True)
# ------------------------------------------------------------------------------
# Unit Tests
@@ -234,6 +312,11 @@ import unittest
class BamSessionTestCase(unittest.TestCase):
def setUp(self):
# for running single tests
if __name__ != "__main__":
self._data = global_setup()
if not os.path.isdir(TEMP):
os.makedirs(TEMP)
# Create local storage folder
@@ -254,20 +337,22 @@ class BamSessionTestCase(unittest.TestCase):
svn_repo_create(self.proj_name, path_svn_repo)
# Check for SVN checkout
path_svn_checkout = os.path.join(self.path_remote_store, "svn_checkout", self.proj_name)
if not os.path.isdir(path_svn_checkout):
os.makedirs(path_svn_checkout)
path_svn_checkout = os.path.join(self.path_remote_store, "svn_checkout")
# Create an SVN checkout of the freshly created repo
svn_repo_checkout(path_svn_checkout)
svn_repo_checkout("file://%s" % os.path.join(path_svn_repo, self.proj_name), path_svn_checkout)
# Pupulate the repo with an empty file
svn_repo_populate(path_svn_checkout)
svn_repo_populate(os.path.join(path_svn_checkout, self.proj_name))
def tearDown(self):
# input('Wait:')
shutil.rmtree(TEMP)
# for running single tests
if __name__ != "__main__":
global_teardown(self._data)
def get_url(self):
url_full = "%s@%s/%s" % (self.user_name, self.server_addr, self.proj_name)
user_name, url = url_full.rpartition('@')[0::2]
@@ -337,23 +422,66 @@ class BamCommitTest(BamSessionTestCase):
self.init_defaults()
super().__init__(*args)
def test_ls(self):
def test_commit(self):
self.init_repo()
file_data = b"hello world!\n"
d = os.path.join(self.path_local_store, self.proj_name)
proj_path = os.path.join(self.path_local_store, self.proj_name)
co_id = "mysession"
session_path = os.path.join(proj_path, co_id)
stdout, stderr = bam_run(["create", "--json"], d)
stdout, stderr = bam_run(["create", co_id], proj_path)
self.assertEqual("", stderr)
import json
ret = json.loads(stdout)
# check an empty commit fails gracefully
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
self.assertEqual("", stderr)
self.assertEqual("Nothing to commit!\n", stdout)
# now do a real commit
file_quick_write(session_path, "testfile.txt", file_data)
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
self.assertEqual("", stderr)
def test_checkout(self):
self.init_repo()
file_data = b"hello world!\n"
proj_path = os.path.join(self.path_local_store, self.proj_name)
co_id = "mysession"
session_path = os.path.join(proj_path, co_id)
stdout, stderr = bam_run(["create", co_id], proj_path)
self.assertEqual("", stderr)
# now do a real commit
file_quick_write(session_path, "testfile.txt", file_data)
stdout, stderr = bam_run(["commit", "-m", "test message"], session_path)
self.assertEqual("", stderr)
# remove the path
shutil.rmtree(session_path)
# checkout the file again
stdout, stderr = bam_run(["checkout", "testfile.txt"], proj_path)
self.assertEqual("", stderr)
print(os.path.join(proj_path, "testfile.txt"))
return
wait_for_input()
self.assertEqual(True, os.path.exists(os.path.join(proj_path, "testfile.txt")))
file_data_test = file_quick_read(os.path.join(proj_path, "testfile.txt"))
self.assertEqual(file_data, file_data_test)
if __name__ == '__main__':
p = server()
unittest.main(exit=False)
p.terminate()
if VERBOSE:
# for server
import logging
logging.basicConfig(level=logging.DEBUG)
del logging
data = global_setup()
unittest.main(exit=False)
global_teardown(data)
shutil.rmtree(TEMP_SERVER, ignore_errors=True)