diff --git a/pillar/api/nodes/__init__.py b/pillar/api/nodes/__init__.py index b55f0c31..cddf1f8d 100644 --- a/pillar/api/nodes/__init__.py +++ b/pillar/api/nodes/__init__.py @@ -365,6 +365,10 @@ def nodes_set_default_picture(nodes): node_set_default_picture(node) +def before_deleting_node(node: dict): + check_permissions('nodes', node, 'DELETE') + + def after_deleting_node(item): from pillar.celery import algolia_tasks algolia_tasks.algolia_index_node_delete.delay(str(item['_id'])) @@ -415,6 +419,7 @@ def setup_app(app, url_prefix): app.on_update_nodes += convert_markdown + app.on_delete_item_nodes += before_deleting_node app.on_deleted_item_nodes += after_deleting_node app.register_api_blueprint(blueprint, url_prefix=url_prefix) diff --git a/pillar/auth/__init__.py b/pillar/auth/__init__.py index f3962649..732ed85f 100644 --- a/pillar/auth/__init__.py +++ b/pillar/auth/__init__.py @@ -80,7 +80,7 @@ def config_login_manager(app): return login_manager -def login_user(oauth_token, *, load_from_db=False): +def login_user(oauth_token: str, *, load_from_db=False): """Log in the user identified by the given token.""" if load_from_db: diff --git a/tests/test_api/test_auth.py b/tests/test_api/test_auth.py index f8328698..51a0d52d 100644 --- a/tests/test_api/test_auth.py +++ b/tests/test_api/test_auth.py @@ -484,6 +484,45 @@ class PermissionComputationTest(AbstractPillarTest): 'world': ['GET']}, self.sort(compute_aggr_permissions('nodes', node, None))) + def test_delete_node(self): + self.enter_app_context() + + proj_id, proj = self.ensure_project_exists() + self.create_user(user_id=24 * 'a', roles={'subscriber'}, + groups=[ctd.EXAMPLE_PROJECT_OWNER_ID]) + + node = copy.deepcopy(ctd.EXAMPLE_NODE) + node['project'] = proj_id + node_id = self.create_node(node) + + # Try deletion by a user who is not part of the project. + self.create_user(user_id=6 * 'dafe', roles={'subscriber'}, token='dafe-token') + self.delete(f'/api/nodes/{node_id}', + auth_token='dafe-token', + etag=node['_etag'], + expected_status=403) + + found = self.app.db('nodes').find_one(node_id) + self.assertFalse(found.get('_deleted', False)) + + def test_delete_project(self): + self.enter_app_context() + + proj_id, proj = self.ensure_project_exists() + self.create_user(user_id=24 * 'a', roles={'subscriber'}, + groups=[ctd.EXAMPLE_PROJECT_OWNER_ID]) + + # Try deletion by a user who is not part of the project. + self.create_user(user_id=6 * 'dafe', roles={'subscriber'}, token='dafe-token') + self.delete(f'/api/projects/{proj_id}', + auth_token='dafe-token', + etag=proj['_etag'], + expected_status=403) + + found = self.app.db('projects').find_one(proj_id) + self.assertIsNotNone(found) + self.assertFalse(found.get('_deleted', False)) + class RequireRolesTest(AbstractPillarTest): def test_no_roles_required(self):