Limit project editing for subscribers.

Certain fields are limited for subscribers. Also, subscribers are checked
against the project permissions.

Users with the 'admin' role can edit all fields, on any project.
This commit is contained in:
2016-04-25 13:54:54 +02:00
parent 4edb8cfd39
commit a6258f5193
2 changed files with 231 additions and 32 deletions

View File

@@ -1,3 +1,4 @@
import copy
import logging import logging
import json import json
@@ -8,7 +9,7 @@ from flask import g, Blueprint, request, abort, current_app
from application.utils import remove_private_keys, authorization, PillarJSONEncoder from application.utils import remove_private_keys, authorization, PillarJSONEncoder
from application.utils.gcs import GoogleCloudStorageBucket from application.utils.gcs import GoogleCloudStorageBucket
from application.utils.authorization import user_has_role from application.utils.authorization import user_has_role, check_permissions
from manage_extra.node_types.asset import node_type_asset from manage_extra.node_types.asset import node_type_asset
from manage_extra.node_types.blog import node_type_blog from manage_extra.node_types.blog import node_type_blog
from manage_extra.node_types.comment import node_type_comment from manage_extra.node_types.comment import node_type_comment
@@ -35,6 +36,34 @@ def before_inserting_projects(items):
item.pop('url', None) item.pop('url', None)
def before_edit_check_permissions(document, original):
# Allow admin users to do whatever they want.
# TODO: possibly move this into the check_permissions function.
if user_has_role(u'admin'):
return
check_permissions(original, request.method)
def protect_sensitive_fields(document, original):
"""When not logged in as admin, prevents update to certain fields."""
# Allow admin users to do whatever they want.
if user_has_role(u'admin'):
return
def revert(name):
if name not in original:
return
document[name] = original[name]
revert('url')
revert('is_private')
revert('status')
revert('category')
revert('user')
def after_inserting_projects(items): def after_inserting_projects(items):
"""After inserting a project in the collection we do some processing such as: """After inserting a project in the collection we do some processing such as:
- apply the right permissions - apply the right permissions
@@ -86,19 +115,18 @@ def after_inserting_project(project, db_user):
] ]
} }
def with_permissions(node_type):
copied = copy.deepcopy(node_type)
copied['permissions'] = permissions
return copied
# Assign permissions to the project itself, as well as to the node_types # Assign permissions to the project itself, as well as to the node_types
project['permissions'] = permissions project['permissions'] = permissions
node_type_asset['permissions'] = permissions
node_type_group['permissions'] = permissions
node_type_page['permissions'] = permissions
node_type_comment['permissions'] = permissions
# Assign the basic 'group', 'asset' and 'page' node_types
project['node_types'] = [ project['node_types'] = [
node_type_group, with_permissions(node_type_group),
node_type_asset, with_permissions(node_type_asset),
node_type_page, with_permissions(node_type_page),
node_type_comment] with_permissions(node_type_comment)]
# Allow admin users to use whatever url they want. # Allow admin users to use whatever url they want.
if not user_has_role(u'admin') or not project.get('url'): if not user_has_role(u'admin') or not project.get('url'):
@@ -114,11 +142,14 @@ def after_inserting_project(project, db_user):
else: else:
log.warning('Unable to create CGS instance for project %s', project_id) log.warning('Unable to create CGS instance for project %s', project_id)
# Commit the changes # Commit the changes directly to the MongoDB; a PUT is not allowed yet,
result, _, _, status = put_internal('projects', remove_private_keys(project), _id=project_id) # as the project doesn't have a valid permission structure.
if status != 200: projects_collection = current_app.data.driver.db['projects']
log.warning('Unable to update project %s: %s', project_id, result) result = projects_collection.update_one({'_id': project_id},
abort_with_error(status) {'$set': remove_private_keys(project)})
if result.matched_count != 1:
log.warning('Unable to update project %s: %s', project_id, result.raw_result)
abort_with_error(500)
def _create_new_project(project_name, user_id, overrides): def _create_new_project(project_name, user_id, overrides):
@@ -188,6 +219,10 @@ def abort_with_error(status):
def setup_app(app, url_prefix): def setup_app(app, url_prefix):
app.on_replace_projects += before_edit_check_permissions
app.on_replace_projects += protect_sensitive_fields
app.on_update_projects += before_edit_check_permissions
app.on_update_projects += protect_sensitive_fields
app.on_insert_projects += before_inserting_projects app.on_insert_projects += before_inserting_projects
app.on_inserted_projects += after_inserting_projects app.on_inserted_projects += after_inserting_projects
app.register_blueprint(blueprint, url_prefix=url_prefix) app.register_blueprint(blueprint, url_prefix=url_prefix)

View File

@@ -2,23 +2,34 @@
"""Unit tests for creating and editing projects_blueprint.""" """Unit tests for creating and editing projects_blueprint."""
import functools
import json import json
import logging
import responses
from bson import ObjectId from bson import ObjectId
from common_test_class import AbstractPillarTest from common_test_class import AbstractPillarTest
log = logging.getLogger(__name__)
class ProjectCreationTest(AbstractPillarTest):
@responses.activate
def test_project_creation_wrong_role(self):
user_id = self.create_user(roles=[u'whatever'])
self.create_valid_auth_token(user_id, 'token')
class AbstractProjectTest(AbstractPillarTest):
def _create_user_with_token(self, roles, token, user_id='cafef00df00df00df00df00d'):
user_id = self.create_user(roles=roles, user_id=user_id)
self.create_valid_auth_token(user_id, token)
return user_id
def _create_project(self, project_name, token):
resp = self.client.post('/p/create', resp = self.client.post('/p/create',
headers={'Authorization': self.make_header('token')}, headers={'Authorization': self.make_header(token)},
data={'project_name': u'Prøject El Niño'}) data={'project_name': project_name})
return resp
class ProjectCreationTest(AbstractProjectTest):
def test_project_creation_wrong_role(self):
self._create_user_with_token([u'whatever'], 'token')
resp = self._create_project(u'Prøject El Niño', 'token')
self.assertEqual(403, resp.status_code) self.assertEqual(403, resp.status_code)
@@ -27,14 +38,9 @@ class ProjectCreationTest(AbstractPillarTest):
projects = self.app.data.driver.db['projects'] projects = self.app.data.driver.db['projects']
self.assertEqual(0, len(list(projects.find()))) self.assertEqual(0, len(list(projects.find())))
@responses.activate
def test_project_creation_good_role(self): def test_project_creation_good_role(self):
user_id = self.create_user(roles=[u'subscriber']) user_id = self._create_user_with_token([u'subscriber'], 'token')
self.create_valid_auth_token(user_id, 'token') resp = self._create_project(u'Prøject El Niño', 'token')
resp = self.client.post('/p/create',
headers={'Authorization': self.make_header('token')},
data={'project_name': u'Prøject El Niñö'})
self.assertEqual(201, resp.status_code) self.assertEqual(201, resp.status_code)
project = json.loads(resp.data.decode('utf-8')) project = json.loads(resp.data.decode('utf-8'))
@@ -45,7 +51,7 @@ class ProjectCreationTest(AbstractPillarTest):
resp.headers['Location']) resp.headers['Location'])
# Check some of the more complex/interesting fields. # Check some of the more complex/interesting fields.
self.assertEqual(u'Prøject El Niñö', project['name']) self.assertEqual(u'Prøject El Niño', project['name'])
self.assertEqual(str(user_id), project['user']) self.assertEqual(str(user_id), project['user'])
self.assertEqual('p-%s' % project_id, project['url']) self.assertEqual('p-%s' % project_id, project['url'])
self.assertEqual(1, len(project['permissions']['groups'])) self.assertEqual(1, len(project['permissions']['groups']))
@@ -68,3 +74,161 @@ class ProjectCreationTest(AbstractPillarTest):
self.assertEqual(str(project_id), group['name']) self.assertEqual(str(project_id), group['name'])
self.assertIn(group_id, db_user['groups']) self.assertIn(group_id, db_user['groups'])
class ProjectEditTest(AbstractProjectTest):
def test_editing_as_subscriber(self):
"""Test that we can set certain fields, but not all."""
from application.utils import remove_private_keys, PillarJSONEncoder
dumps = functools.partial(json.dumps, cls=PillarJSONEncoder)
project = self._create_user_and_project([u'subscriber'])
project_url = '/projects/%(_id)s' % project
# Create another user we can try and assign the project to.
other_user_id = 'f00dd00df00dd00df00dd00d'
self._create_user_with_token(['subscriber'], 'other-token', user_id=other_user_id)
# Unauthenticated should be forbidden
resp = self.client.put('/projects/%s' % project['_id'],
data=dumps(remove_private_keys(project)),
headers={'Content-Type': 'application/json'})
self.assertEqual(403, resp.status_code)
# Regular user should be able to PUT, but only be able to edit certain fields.
put_project = remove_private_keys(project)
put_project['url'] = u'very-offensive-url'
put_project['description'] = u'Blender je besplatan set alata za izradu interaktivnog 3D ' \
u'sadržaja pod različitim operativnim sustavima.'
put_project['name'] = u'โครงการปั่นเมฆ'
put_project['summary'] = u'Это переведена на Google'
put_project['is_private'] = False
put_project['status'] = 'deleted'
put_project['category'] = 'software'
put_project['user'] = other_user_id
resp = self.client.put(project_url,
data=dumps(put_project),
headers={'Authorization': self.make_header('token'),
'Content-Type': 'application/json',
'If-Match': project['_etag']})
self.assertEqual(200, resp.status_code, resp.data)
# Re-fetch from database to see which fields actually made it there.
# equal to put_project -> changed in DB
# equal to project -> not changed in DB
resp = self.client.get(project_url)
db_proj = json.loads(resp.data)
self.assertEqual(project['url'], db_proj['url'])
self.assertEqual(put_project['description'], db_proj['description'])
self.assertEqual(put_project['name'], db_proj['name'])
self.assertEqual(put_project['summary'], db_proj['summary'])
self.assertEqual(project['is_private'], db_proj['is_private'])
self.assertEqual(project['status'], db_proj['status'])
self.assertEqual(project['category'], db_proj['category'])
def test_editing_as_admin(self):
"""Test that we can set all fields as admin."""
from application.utils import remove_private_keys, PillarJSONEncoder
dumps = functools.partial(json.dumps, cls=PillarJSONEncoder)
project = self._create_user_and_project([u'subscriber', u'admin'])
project_url = '/projects/%(_id)s' % project
# Create another user we can try and assign the project to.
other_user_id = 'f00dd00df00dd00df00dd00d'
self._create_user_with_token(['subscriber'], 'other-token', user_id=other_user_id)
# Admin user should be able to PUT everything.
put_project = remove_private_keys(project)
put_project['url'] = u'very-offensive-url'
put_project['description'] = u'Blender je besplatan set alata za izradu interaktivnog 3D ' \
u'sadržaja pod različitim operativnim sustavima.'
put_project['name'] = u'โครงการปั่นเมฆ'
put_project['summary'] = u'Это переведена на Google'
put_project['is_private'] = False
put_project['status'] = 'deleted'
put_project['category'] = 'software'
put_project['user'] = other_user_id
resp = self.client.put(project_url,
data=dumps(put_project),
headers={'Authorization': self.make_header('token'),
'Content-Type': 'application/json',
'If-Match': project['_etag']})
self.assertEqual(200, resp.status_code, resp.data)
# Re-fetch from database to see which fields actually made it there.
# equal to put_project -> changed in DB
# equal to project -> not changed in DB
resp = self.client.get('/projects/%s' % project['_id'])
db_proj = json.loads(resp.data)
self.assertEqual(put_project['url'], db_proj['url'])
self.assertEqual(put_project['description'], db_proj['description'])
self.assertEqual(put_project['name'], db_proj['name'])
self.assertEqual(put_project['summary'], db_proj['summary'])
self.assertEqual(put_project['is_private'], db_proj['is_private'])
self.assertEqual(put_project['status'], db_proj['status'])
self.assertEqual(put_project['category'], db_proj['category'])
self.assertEqual(put_project['user'], db_proj['user'])
def test_edits_by_nonowner_admin(self):
"""Any admin should be able to edit any project."""
from application.utils import remove_private_keys, PillarJSONEncoder
dumps = functools.partial(json.dumps, cls=PillarJSONEncoder)
# Create test project.
project = self._create_user_and_project([u'subscriber'])
project_id = project['_id']
project_url = '/projects/%s' % project_id
# Create test user.
self._create_user_with_token(['admin'], 'admin-token', user_id='cafef00dbeef')
# Admin user should be able to PUT.
put_project = remove_private_keys(project)
put_project['name'] = u'โครงการปั่นเมฆ'
resp = self.client.put(project_url,
data=dumps(put_project),
headers={'Authorization': self.make_header('admin-token'),
'Content-Type': 'application/json',
'If-Match': project['_etag']})
self.assertEqual(200, resp.status_code, resp.data)
def test_edits_by_nonowner_subscriber(self):
"""A subscriber should only be able to edit their own projects."""
from application.utils import remove_private_keys, PillarJSONEncoder
dumps = functools.partial(json.dumps, cls=PillarJSONEncoder)
# Create test project.
project = self._create_user_and_project([u'subscriber'])
project_id = project['_id']
project_url = '/projects/%s' % project_id
# Create test user.
my_user_id = 'cafef00dbeefcafef00dbeef'
self._create_user_with_token(['subscriber'], 'mortal-token', user_id=my_user_id)
# Regular subscriber should not be able to do this.
put_project = remove_private_keys(project)
put_project['name'] = u'Болту́н -- нахо́дка для шпио́на.'
put_project['user'] = my_user_id
resp = self.client.put(project_url,
data=dumps(put_project),
headers={'Authorization': self.make_header('mortal-token'),
'Content-Type': 'application/json',
'If-Match': project['_etag']})
self.assertEqual(403, resp.status_code, resp.data)
def _create_user_and_project(self, roles):
self._create_user_with_token(roles, 'token')
resp = self._create_project(u'Prøject El Niño', 'token')
self.assertEqual(201, resp.status_code, resp.data)
project = json.loads(resp.data)
return project