From 1b2c64188416d789dcc02c7e94e9bc04b2e3fa96 Mon Sep 17 00:00:00 2001 From: Francesco Siddi Date: Sat, 20 Dec 2014 18:21:39 +0100 Subject: [PATCH] Web service tests and bundle command --- tests/test_webserver.py | 204 ++++++++++++++++++ .../application/modules/resources/__init__.py | 54 ++++- 2 files changed, 255 insertions(+), 3 deletions(-) create mode 100644 tests/test_webserver.py diff --git a/tests/test_webserver.py b/tests/test_webserver.py new file mode 100644 index 0000000..b1a7c22 --- /dev/null +++ b/tests/test_webserver.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 + +""" +Welcome to the webservice test suite. Simply run python test_webservice.py and check +that all tests pass. + +Individual tests can be run with the following syntax: + + python tests.py ServerTestCase.test_job_delete + +""" + +import os +import sys +from base64 import b64encode +from werkzeug import Headers + +# ------------------ +# Ensure module path +import os +import sys +path = os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "webservice", "bam")) +if path not in sys.path: + sys.path.append(path) +del path + +TMP_DIR = "/tmp/bam_webservice_test" +PROJECT_NAME = "mytestproject" + +from application import app +from application import db + +from test_cli import svn_repo_create +from test_cli import svn_repo_checkout +from test_cli import file_quick_touch +from test_cli import run_check +from test_cli import wait_for_input + +from application.modules.projects.model import Project +from application.modules.projects.model import ProjectSetting + +import unittest +import tempfile +import json +import shutil + + +class ServerTestingUtils: + + def add_project(self, is_active=True): + project = Project( + name='Caminandes') + db.session.add(project) + db.session.commit() + + if is_active: + setting = Setting( + name='active_project', + value=str(project.id)) + db.session.add(setting) + db.session.commit() + return project.id + + +class ServerTestCase(unittest.TestCase): + + utils = ServerTestingUtils() + + def setUp(self): + + if not os.path.isdir(TMP_DIR): + os.makedirs(TMP_DIR) + + # Create remote storage (usually is on the server). + # SVN repo and SVN checkout will live here + if not os.path.isdir(self.path_remote_store): + os.makedirs(self.path_remote_store) + + # Check for SVN repo folder + path_svn_repo = os.path.join(self.path_remote_store, "svn_repo") + if not os.path.isdir(path_svn_repo): + os.makedirs(path_svn_repo) + + # Create a fresh SVN repository + if not svn_repo_create(self.proj_name, path_svn_repo): + self.fail("svn_repo: create") + + # Check for SVN checkout + path_svn_checkout = os.path.join(self.path_remote_store, "svn_checkout") + + # Create an SVN checkout of the freshly created repo + path_svn_repo_url = "file://%s" % os.path.join(path_svn_repo, self.proj_name) + if not svn_repo_checkout(path_svn_repo_url, path_svn_checkout): + self.fail("svn_repo: checkout %r" % path_svn_repo_url) + + dummy_file = os.path.join(path_svn_checkout, "file1") + file_quick_touch(dummy_file) + + # adds all files recursively + if not run_check(["svn", "add", dummy_file]): + return False + + if not run_check(["svn", "commit", "-m", "First commit"], path_svn_checkout): + return False + + + app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///'+ TMP_DIR +'/test.sqlite' + app.config['TESTING'] = True + db.create_all() + # Create a testing project, based on the global configuration (depends on a + # correct initialization of the SVN repo and on the creation of a checkout) + + # TODO(fsiddi): turn these values in variables + project = Project( + name=PROJECT_NAME, + repository_path=os.path.join(TMP_DIR, "remote_store/svn_checkout"), + upload_path=os.path.join(TMP_DIR, "remote_store/upload"), + status="active", + ) + db.session.add(project) + db.session.commit() + + setting = ProjectSetting( + project_id=project.id, + name="svn_password", + value="my_password", + data_type="str", + ) + db.session.add(setting) + db.session.commit() + + setting = ProjectSetting( + project_id=project.id, + name="svn_default_user", + value="my_user", + data_type="str", + ) + db.session.add(setting) + db.session.commit() + self.app = app.test_client() + + def tearDown(self): + shutil.rmtree(TMP_DIR) + + def init_defaults(self): + self.path_remote_store = os.path.join(TMP_DIR, "remote_store") + self.proj_name = PROJECT_NAME + + + def open_with_auth(self, url, method, data=None): + a = b64encode(b"username:").decode('ascii') + + if method == 'GET': + args = ['?',] + for k, v in data.items(): + args.append(k + '=' + v + '&') + url = url + ''.join(args) + + return self.app.open(url, + method=method, + headers={ + 'Authorization': 'Basic ' + str(a) + }, + data=data, + ) + + +class ServerUsageTest(ServerTestCase): + def __init__(self, *args): + self.init_defaults() + super().__init__(*args) + + def test_directory_browse(self): + res = self.open_with_auth('/{0}/file_list'.format(PROJECT_NAME), + 'GET', + data=dict(path='')) + + assert res.status_code == 200 + d = json.loads(res.data.decode('utf-8')) + # print(d) + + + def test_file_info(self): + res = self.open_with_auth('/{0}/file'.format(PROJECT_NAME), + 'GET', + data=dict(filepath='file1', command='info')) + + assert res.status_code == 200 + f = json.loads(res.data.decode('utf-8')) + # print(f['size']) + + + def test_file_bundle(self): + res = self.open_with_auth('/{0}/file'.format(PROJECT_NAME), + 'GET', + data=dict(filepath='file1', command='bundle')) + + assert res.status_code == 200 + f = json.loads(res.data.decode('utf-8')) + print(f['filepath']) + + +if __name__ == '__main__': + unittest.main() diff --git a/webservice/bam/application/modules/resources/__init__.py b/webservice/bam/application/modules/resources/__init__.py index ec41fb1..dda909f 100644 --- a/webservice/bam/application/modules/resources/__init__.py +++ b/webservice/bam/application/modules/resources/__init__.py @@ -30,7 +30,6 @@ from application.modules.projects.model import Project from application.modules.projects.model import ProjectSetting - class DirectoryAPI(Resource): """Displays list of files.""" @@ -118,14 +117,63 @@ class FileAPI(Resource): project = Project.query.filter_by(name=project_name).first() if command == 'info': - r = svn.local.LocalClient(project.repository_path) + r = svn.local.LocalClient(project.repository_path) svn_log = r.log_default(None, None, 5, filepath) svn_log = [l for l in svn_log] + size = os.path.getsize(os.path.join(r.path, filepath)) + return jsonify( filepath=filepath, - log=svn_log) + log=svn_log, + size=size) + + elif command == 'bundle': + filepath = os.path.join(project.repository_path, filepath) + + if not os.path.exists(filepath): + return jsonify(message="Path not found %r" % filepath) + elif os.path.isdir(filepath): + return jsonify(message="Path is a directory %r" % filepath) + + def bundle(): + def report(txt): + pass + + # pack the file! + import tempfile + + # weak! (ignore original opened file) + filepath_zip = tempfile.mkstemp(suffix=".zip") + os.close(filepath_zip[0]) + filepath_zip = filepath_zip[1] + + # subprocess here + for r in self.pack_fn( + filepath, filepath_zip, + project.repository_path, + True, + report, + ): + pass + # once done, send a message to the cloud and mark the download as available + # we will send the download form the cloud server + # return jsonify(filepath=filepath_zip) + + # Check in database if file has been requested already + + # Check if archive is available on the filesystem + + # If file not avaliable, start the bundling and return a None filepath, + # which the cloud will interpret as, no file is available at the moment + + p = Process(target=bundle,) + p.start() + + filepath=None + + return jsonify(filepath=filepath, status="building") elif command == 'checkout': filepath = os.path.join(project.repository_path, filepath)