Introducing a real testing server
This commit is contained in:
@@ -35,8 +35,11 @@ import sys
|
||||
import shutil
|
||||
import json
|
||||
|
||||
TEMP = "/tmp/test"
|
||||
TEMP = "/tmp/bam_test"
|
||||
# Separate tmp folder for server, since we don't reset the server at every test
|
||||
TEMP_SERVER = "/tmp/bam_test_server"
|
||||
PORT = 5555
|
||||
PROJECT_NAME = "test_project"
|
||||
|
||||
def run(cmd, cwd=None):
|
||||
# print(">>> ", " ".join(cmd))
|
||||
@@ -98,6 +101,8 @@ class StdIO:
|
||||
def svn_repo_create(id_, dirname):
|
||||
run(["svnadmin", "create", id_], cwd=dirname)
|
||||
|
||||
def svn_repo_checkout(path, dirname):
|
||||
run(["svnadmin", "checkout", path, dirname])
|
||||
|
||||
def bam_run(argv, cwd=None):
|
||||
|
||||
@@ -117,16 +122,42 @@ def bam_run(argv, cwd=None):
|
||||
# Server
|
||||
|
||||
|
||||
def server():
|
||||
def server(mode='testing', debug=False):
|
||||
"""Start development server via Flask app.run() in a separate thread. We need server
|
||||
to run in order to check most of the client commands.
|
||||
"""
|
||||
import threading
|
||||
|
||||
def _():
|
||||
def run_testing_server():
|
||||
from application import app
|
||||
app.run(port=PORT, debug=False)
|
||||
# If we run the server in testing mode (the default) we override sqlite database,
|
||||
# with a testing, disposable one (create TMP dir)
|
||||
if mode == 'testing':
|
||||
from application import db
|
||||
from application.modules.projects.model import Project, ProjectSetting
|
||||
# Override sqlite database
|
||||
if not os.path.isdir(TEMP_SERVER):
|
||||
os.makedirs(TEMP_SERVER)
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + TEMP_SERVER + '/bam_test.db'
|
||||
# Use the model definitions to create all the tables
|
||||
db.create_all()
|
||||
# Create a testing project, based on the global configuration (depends on a
|
||||
# correct initialization of the SVN repo and on the creation of a checkout)
|
||||
|
||||
# TODO(fsiddi): turn these values in variables
|
||||
project = Project(
|
||||
name=PROJECT_NAME,
|
||||
repository_path="/tmp/bam_test/remote_store/svn_checkout",
|
||||
upload_path="/tmp/bam_test/remote_store/upload",
|
||||
status="active"
|
||||
)
|
||||
db.session.add(project)
|
||||
db.session.commit()
|
||||
# Run the app in production mode (prevents tests to run twice)
|
||||
app.run(port=PORT, debug=debug)
|
||||
|
||||
from multiprocessing import Process
|
||||
p = Process(target=_, args=())
|
||||
p = Process(target=run_testing_server, args=())
|
||||
p.start()
|
||||
|
||||
os.system("sleep 1")
|
||||
@@ -144,14 +175,30 @@ class BamSessionTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
if not os.path.isdir(TEMP):
|
||||
os.makedirs(TEMP)
|
||||
# Create local storage folder
|
||||
if not os.path.isdir(self.path_local_store):
|
||||
os.makedirs(self.path_local_store)
|
||||
|
||||
if not os.path.isdir(self.path_repo):
|
||||
os.makedirs(self.path_repo)
|
||||
# Create remote storage (usually is on the server).
|
||||
# SVN repo and SVN checkout will live here
|
||||
if not os.path.isdir(self.path_remote_store):
|
||||
os.makedirs(self.path_remote_store)
|
||||
|
||||
if not os.path.isdir(self.path_remote):
|
||||
os.makedirs(self.path_remote)
|
||||
# Check for SVN repo folder
|
||||
path_svn_repo = os.path.join(self.path_remote_store, "svn_repo")
|
||||
if not os.path.isdir(path_svn_repo):
|
||||
os.makedirs(path_svn_repo)
|
||||
|
||||
svn_repo_create(self.proj_name, self.path_repo)
|
||||
# Create a fresh SVN repository
|
||||
svn_repo_create(self.proj_name, path_svn_repo)
|
||||
|
||||
# Check for SVN checkout
|
||||
path_svn_checkout = os.path.join(self.path_remote_store, "svn_checkout")
|
||||
if not os.path.isdir(path_svn_checkout):
|
||||
os.makedirs(path_svn_checkout)
|
||||
|
||||
# Create an SVN checkout of the freshly created repo
|
||||
svn_repo_checkout(path_svn_checkout, self.proj_name)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(TEMP)
|
||||
@@ -162,16 +209,16 @@ class BamSessionTestCase(unittest.TestCase):
|
||||
return url_full, user_name, url
|
||||
|
||||
def init_defaults(self):
|
||||
self.path_repo = os.path.join(TEMP, "remote_store")
|
||||
self.path_remote = os.path.join(TEMP, "local_store")
|
||||
self.path_local_store = os.path.join(TEMP, "local_store")
|
||||
self.path_remote_store = os.path.join(TEMP, "remote_store")
|
||||
|
||||
self.proj_name = "test"
|
||||
self.proj_name = PROJECT_NAME
|
||||
self.user_name = "user"
|
||||
self.server_addr = "http://localhost:%s" % PORT
|
||||
|
||||
def init_repo(self):
|
||||
url_full, user_name, url = self.get_url()
|
||||
bam_run(["init", url_full], self.path_remote)
|
||||
bam_run(["init", url_full], self.path_local_store)
|
||||
|
||||
|
||||
class BamInitTest(BamSessionTestCase):
|
||||
@@ -188,7 +235,7 @@ class BamInitTest(BamSessionTestCase):
|
||||
self.init_repo()
|
||||
|
||||
url_full, user_name, url = self.get_url()
|
||||
with open(os.path.join(self.path_remote, self.proj_name, ".bam", "config")) as f:
|
||||
with open(os.path.join(self.path_local_store, self.proj_name, ".bam", "config")) as f:
|
||||
cfg = json.load(f)
|
||||
self.assertEqual(url, cfg["url"])
|
||||
self.assertEqual(user_name, cfg["user"])
|
||||
@@ -206,7 +253,8 @@ class BamListTest(BamSessionTestCase):
|
||||
def test_ls(self):
|
||||
self.init_repo()
|
||||
|
||||
d = os.path.join(self.path_remote, self.proj_name)
|
||||
d = os.path.join(self.path_local_store, self.proj_name)
|
||||
|
||||
stdout, stderr = bam_run(["ls", "--json"], d)
|
||||
|
||||
self.assertEqual("", stderr)
|
||||
@@ -219,4 +267,5 @@ if __name__ == '__main__':
|
||||
p = server()
|
||||
unittest.main(exit=False)
|
||||
p.terminate()
|
||||
shutil.rmtree(TEMP_SERVER)
|
||||
|
||||
|
Reference in New Issue
Block a user