From a6258f5193821a2b65fcc1b5cdcaea3b77181ef2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 25 Apr 2016 13:54:54 +0200 Subject: [PATCH] Limit project editing for subscribers. Certain fields are limited for subscribers. Also, subscribers are checked against the project permissions. Users with the 'admin' role can edit all fields, on any project. --- pillar/application/modules/projects.py | 67 +++++++-- tests/test_project_management.py | 196 +++++++++++++++++++++++-- 2 files changed, 231 insertions(+), 32 deletions(-) diff --git a/pillar/application/modules/projects.py b/pillar/application/modules/projects.py index 3bb5dbcb..45bca6c5 100644 --- a/pillar/application/modules/projects.py +++ b/pillar/application/modules/projects.py @@ -1,3 +1,4 @@ +import copy import logging import json @@ -8,7 +9,7 @@ from flask import g, Blueprint, request, abort, current_app from application.utils import remove_private_keys, authorization, PillarJSONEncoder from application.utils.gcs import GoogleCloudStorageBucket -from application.utils.authorization import user_has_role +from application.utils.authorization import user_has_role, check_permissions from manage_extra.node_types.asset import node_type_asset from manage_extra.node_types.blog import node_type_blog from manage_extra.node_types.comment import node_type_comment @@ -35,6 +36,34 @@ def before_inserting_projects(items): item.pop('url', None) +def before_edit_check_permissions(document, original): + # Allow admin users to do whatever they want. + # TODO: possibly move this into the check_permissions function. + if user_has_role(u'admin'): + return + + check_permissions(original, request.method) + + +def protect_sensitive_fields(document, original): + """When not logged in as admin, prevents update to certain fields.""" + + # Allow admin users to do whatever they want. + if user_has_role(u'admin'): + return + + def revert(name): + if name not in original: + return + document[name] = original[name] + + revert('url') + revert('is_private') + revert('status') + revert('category') + revert('user') + + def after_inserting_projects(items): """After inserting a project in the collection we do some processing such as: - apply the right permissions @@ -86,19 +115,18 @@ def after_inserting_project(project, db_user): ] } + def with_permissions(node_type): + copied = copy.deepcopy(node_type) + copied['permissions'] = permissions + return copied + # Assign permissions to the project itself, as well as to the node_types project['permissions'] = permissions - node_type_asset['permissions'] = permissions - node_type_group['permissions'] = permissions - node_type_page['permissions'] = permissions - node_type_comment['permissions'] = permissions - - # Assign the basic 'group', 'asset' and 'page' node_types project['node_types'] = [ - node_type_group, - node_type_asset, - node_type_page, - node_type_comment] + with_permissions(node_type_group), + with_permissions(node_type_asset), + with_permissions(node_type_page), + with_permissions(node_type_comment)] # Allow admin users to use whatever url they want. if not user_has_role(u'admin') or not project.get('url'): @@ -114,11 +142,14 @@ def after_inserting_project(project, db_user): else: log.warning('Unable to create CGS instance for project %s', project_id) - # Commit the changes - result, _, _, status = put_internal('projects', remove_private_keys(project), _id=project_id) - if status != 200: - log.warning('Unable to update project %s: %s', project_id, result) - abort_with_error(status) + # Commit the changes directly to the MongoDB; a PUT is not allowed yet, + # as the project doesn't have a valid permission structure. + projects_collection = current_app.data.driver.db['projects'] + result = projects_collection.update_one({'_id': project_id}, + {'$set': remove_private_keys(project)}) + if result.matched_count != 1: + log.warning('Unable to update project %s: %s', project_id, result.raw_result) + abort_with_error(500) def _create_new_project(project_name, user_id, overrides): @@ -188,6 +219,10 @@ def abort_with_error(status): def setup_app(app, url_prefix): + app.on_replace_projects += before_edit_check_permissions + app.on_replace_projects += protect_sensitive_fields + app.on_update_projects += before_edit_check_permissions + app.on_update_projects += protect_sensitive_fields app.on_insert_projects += before_inserting_projects app.on_inserted_projects += after_inserting_projects app.register_blueprint(blueprint, url_prefix=url_prefix) diff --git a/tests/test_project_management.py b/tests/test_project_management.py index e905a9de..ace05782 100644 --- a/tests/test_project_management.py +++ b/tests/test_project_management.py @@ -2,23 +2,34 @@ """Unit tests for creating and editing projects_blueprint.""" +import functools import json +import logging -import responses from bson import ObjectId from common_test_class import AbstractPillarTest +log = logging.getLogger(__name__) -class ProjectCreationTest(AbstractPillarTest): - @responses.activate - def test_project_creation_wrong_role(self): - user_id = self.create_user(roles=[u'whatever']) - self.create_valid_auth_token(user_id, 'token') +class AbstractProjectTest(AbstractPillarTest): + def _create_user_with_token(self, roles, token, user_id='cafef00df00df00df00df00d'): + user_id = self.create_user(roles=roles, user_id=user_id) + self.create_valid_auth_token(user_id, token) + return user_id + + def _create_project(self, project_name, token): resp = self.client.post('/p/create', - headers={'Authorization': self.make_header('token')}, - data={'project_name': u'Prøject El Niño'}) + headers={'Authorization': self.make_header(token)}, + data={'project_name': project_name}) + return resp + + +class ProjectCreationTest(AbstractProjectTest): + def test_project_creation_wrong_role(self): + self._create_user_with_token([u'whatever'], 'token') + resp = self._create_project(u'Prøject El Niño', 'token') self.assertEqual(403, resp.status_code) @@ -27,14 +38,9 @@ class ProjectCreationTest(AbstractPillarTest): projects = self.app.data.driver.db['projects'] self.assertEqual(0, len(list(projects.find()))) - @responses.activate def test_project_creation_good_role(self): - user_id = self.create_user(roles=[u'subscriber']) - self.create_valid_auth_token(user_id, 'token') - - resp = self.client.post('/p/create', - headers={'Authorization': self.make_header('token')}, - data={'project_name': u'Prøject El Niñö'}) + user_id = self._create_user_with_token([u'subscriber'], 'token') + resp = self._create_project(u'Prøject El Niño', 'token') self.assertEqual(201, resp.status_code) project = json.loads(resp.data.decode('utf-8')) @@ -45,7 +51,7 @@ class ProjectCreationTest(AbstractPillarTest): resp.headers['Location']) # Check some of the more complex/interesting fields. - self.assertEqual(u'Prøject El Niñö', project['name']) + self.assertEqual(u'Prøject El Niño', project['name']) self.assertEqual(str(user_id), project['user']) self.assertEqual('p-%s' % project_id, project['url']) self.assertEqual(1, len(project['permissions']['groups'])) @@ -68,3 +74,161 @@ class ProjectCreationTest(AbstractPillarTest): self.assertEqual(str(project_id), group['name']) self.assertIn(group_id, db_user['groups']) + +class ProjectEditTest(AbstractProjectTest): + def test_editing_as_subscriber(self): + """Test that we can set certain fields, but not all.""" + + from application.utils import remove_private_keys, PillarJSONEncoder + dumps = functools.partial(json.dumps, cls=PillarJSONEncoder) + + project = self._create_user_and_project([u'subscriber']) + project_url = '/projects/%(_id)s' % project + + # Create another user we can try and assign the project to. + other_user_id = 'f00dd00df00dd00df00dd00d' + self._create_user_with_token(['subscriber'], 'other-token', user_id=other_user_id) + + # Unauthenticated should be forbidden + resp = self.client.put('/projects/%s' % project['_id'], + data=dumps(remove_private_keys(project)), + headers={'Content-Type': 'application/json'}) + self.assertEqual(403, resp.status_code) + + # Regular user should be able to PUT, but only be able to edit certain fields. + put_project = remove_private_keys(project) + put_project['url'] = u'very-offensive-url' + put_project['description'] = u'Blender je besplatan set alata za izradu interaktivnog 3D ' \ + u'sadržaja pod različitim operativnim sustavima.' + put_project['name'] = u'โครงการปั่นเมฆ' + put_project['summary'] = u'Это переведена на Google' + put_project['is_private'] = False + put_project['status'] = 'deleted' + put_project['category'] = 'software' + put_project['user'] = other_user_id + + resp = self.client.put(project_url, + data=dumps(put_project), + headers={'Authorization': self.make_header('token'), + 'Content-Type': 'application/json', + 'If-Match': project['_etag']}) + self.assertEqual(200, resp.status_code, resp.data) + + # Re-fetch from database to see which fields actually made it there. + # equal to put_project -> changed in DB + # equal to project -> not changed in DB + resp = self.client.get(project_url) + db_proj = json.loads(resp.data) + self.assertEqual(project['url'], db_proj['url']) + self.assertEqual(put_project['description'], db_proj['description']) + self.assertEqual(put_project['name'], db_proj['name']) + self.assertEqual(put_project['summary'], db_proj['summary']) + self.assertEqual(project['is_private'], db_proj['is_private']) + self.assertEqual(project['status'], db_proj['status']) + self.assertEqual(project['category'], db_proj['category']) + + def test_editing_as_admin(self): + """Test that we can set all fields as admin.""" + + from application.utils import remove_private_keys, PillarJSONEncoder + dumps = functools.partial(json.dumps, cls=PillarJSONEncoder) + + project = self._create_user_and_project([u'subscriber', u'admin']) + project_url = '/projects/%(_id)s' % project + + # Create another user we can try and assign the project to. + other_user_id = 'f00dd00df00dd00df00dd00d' + self._create_user_with_token(['subscriber'], 'other-token', user_id=other_user_id) + + # Admin user should be able to PUT everything. + put_project = remove_private_keys(project) + put_project['url'] = u'very-offensive-url' + put_project['description'] = u'Blender je besplatan set alata za izradu interaktivnog 3D ' \ + u'sadržaja pod različitim operativnim sustavima.' + put_project['name'] = u'โครงการปั่นเมฆ' + put_project['summary'] = u'Это переведена на Google' + put_project['is_private'] = False + put_project['status'] = 'deleted' + put_project['category'] = 'software' + put_project['user'] = other_user_id + + resp = self.client.put(project_url, + data=dumps(put_project), + headers={'Authorization': self.make_header('token'), + 'Content-Type': 'application/json', + 'If-Match': project['_etag']}) + self.assertEqual(200, resp.status_code, resp.data) + + # Re-fetch from database to see which fields actually made it there. + # equal to put_project -> changed in DB + # equal to project -> not changed in DB + resp = self.client.get('/projects/%s' % project['_id']) + db_proj = json.loads(resp.data) + self.assertEqual(put_project['url'], db_proj['url']) + self.assertEqual(put_project['description'], db_proj['description']) + self.assertEqual(put_project['name'], db_proj['name']) + self.assertEqual(put_project['summary'], db_proj['summary']) + self.assertEqual(put_project['is_private'], db_proj['is_private']) + self.assertEqual(put_project['status'], db_proj['status']) + self.assertEqual(put_project['category'], db_proj['category']) + self.assertEqual(put_project['user'], db_proj['user']) + + def test_edits_by_nonowner_admin(self): + """Any admin should be able to edit any project.""" + + from application.utils import remove_private_keys, PillarJSONEncoder + dumps = functools.partial(json.dumps, cls=PillarJSONEncoder) + + # Create test project. + project = self._create_user_and_project([u'subscriber']) + project_id = project['_id'] + project_url = '/projects/%s' % project_id + + # Create test user. + self._create_user_with_token(['admin'], 'admin-token', user_id='cafef00dbeef') + + # Admin user should be able to PUT. + put_project = remove_private_keys(project) + put_project['name'] = u'โครงการปั่นเมฆ' + + resp = self.client.put(project_url, + data=dumps(put_project), + headers={'Authorization': self.make_header('admin-token'), + 'Content-Type': 'application/json', + 'If-Match': project['_etag']}) + self.assertEqual(200, resp.status_code, resp.data) + + def test_edits_by_nonowner_subscriber(self): + """A subscriber should only be able to edit their own projects.""" + + from application.utils import remove_private_keys, PillarJSONEncoder + dumps = functools.partial(json.dumps, cls=PillarJSONEncoder) + + # Create test project. + project = self._create_user_and_project([u'subscriber']) + project_id = project['_id'] + project_url = '/projects/%s' % project_id + + # Create test user. + my_user_id = 'cafef00dbeefcafef00dbeef' + self._create_user_with_token(['subscriber'], 'mortal-token', user_id=my_user_id) + + # Regular subscriber should not be able to do this. + put_project = remove_private_keys(project) + put_project['name'] = u'Болту́н -- нахо́дка для шпио́на.' + put_project['user'] = my_user_id + resp = self.client.put(project_url, + data=dumps(put_project), + headers={'Authorization': self.make_header('mortal-token'), + 'Content-Type': 'application/json', + 'If-Match': project['_etag']}) + self.assertEqual(403, resp.status_code, resp.data) + + def _create_user_and_project(self, roles): + self._create_user_with_token(roles, 'token') + resp = self._create_project(u'Prøject El Niño', 'token') + + self.assertEqual(201, resp.status_code, resp.data) + project = json.loads(resp.data) + + return project