import responses
from bson import ObjectId

import pillarsdk
import pillar.tests
import pillar.auth
import pillar.tests.common_test_data as ctd

from abstract_attract_test import AbstractAttractTest


class TaskWorkflowTest(AbstractAttractTest):
    """Exercise task creation through the 'attract' extension's task manager."""

    def setUp(self, **kwargs):
        """Set up the base test case, then cache the objects the tests need.

        Stores the task manager of the 'attract' Pillar extension, the
        project ID and project document returned by
        ``ensure_project_exists()``, and a ``pillarsdk.Project`` built from
        that project document.
        """
        super(TaskWorkflowTest, self).setUp(**kwargs)

        attract_extension = self.app.pillar_extensions['attract']
        self.mngr = attract_extension.task_manager

        self.proj_id, self.project = self.ensure_project_exists()

        # The task manager is called with an SDK project object, so wrap the
        # project document in one.
        sdk_document = pillar.tests.mongo_to_sdk(self.project)
        self.sdk_project = pillarsdk.Project(sdk_document)

    @responses.activate
    def test_create_task(self):
        """Create a task via the task manager and check it exists in MongoDB."""
        with self.app.test_request_context():
            # Log in as project admin user
            pillar.auth.login_user(ctd.EXAMPLE_PROJECT_OWNER_ID)

            # NOTE(review): presumably registers a mocked successful
            # Blender ID validation response -- confirm in the base class.
            self.mock_blenderid_validate_happy()

            task = self.mngr.create_task(self.sdk_project)
            self.assertIsNotNone(task)

        # Test directly with MongoDB
        with self.app.test_request_context():
            nodes_collection = self.app.data.driver.db['nodes']
            task_oid = ObjectId(task['_id'])
            stored_node = nodes_collection.find_one(task_oid)
            self.assertIsNotNone(stored_node)