Added access control to organizations Eve endpoints

This commit is contained in:
Sybren A. Stüvel 2017-08-23 12:16:54 +02:00
parent cf51d1a280
commit b53d485960
4 changed files with 283 additions and 2 deletions

View File

@ -737,6 +737,11 @@ groups = {
organizations = {
'schema': organizations_schema,
'resource_methods': ['GET', 'POST'],
'item_methods': ['GET'],
'public_item_methods': [],
'public_methods': [],
'soft_delete': True,
}
projects = {

View File

@ -254,6 +254,7 @@ class OrgManager:
def setup_app(app):
from . import patch
from . import patch, hooks
hooks.setup_app(app)
patch.setup_app(app)

View File

@ -0,0 +1,27 @@
import werkzeug.exceptions as wz_exceptions
from pillar.api.utils.authentication import current_user
def pre_get_organizations(request, lookup):
user = current_user()
if user.is_anonymous:
raise wz_exceptions.Forbidden()
if user.has_cap('admin'):
# Allow all lookups to admins.
return
# Only allow users to see their own organizations.
lookup['$or'] = [{'admin_uid': user.user_id}, {'members': user.user_id}]
def pre_post_organizations(request):
user = current_user()
if user.is_anonymous or not user.has_cap('admin'):
raise wz_exceptions.Forbidden()
def setup_app(app):
app.on_pre_GET_organizations += pre_get_organizations
app.on_pre_POST_organizations += pre_post_organizations

View File

@ -1,7 +1,10 @@
from pillar.tests import AbstractPillarTest
import typing
import bson
from pillar.tests import AbstractPillarTest
from pillar.api.utils import remove_private_keys
class OrganizationCruTest(AbstractPillarTest):
"""Test creating and updating organizations."""
@ -327,3 +330,248 @@ class OrganizationPatchTest(AbstractPillarTest):
self.assertEqual('Blender Institute', db_org['name'])
self.assertEqual('Open Source animation studio', db_org['description'])
self.assertEqual('https://blender.institute/', db_org['website'])
class OrganizationResourceEveTest(AbstractPillarTest):
"""Test GET/POST/PUT access to Organization resource"""
def setUp(self, **kwargs):
super().setUp(**kwargs)
self.enter_app_context()
self.om = self.app.org_manager
# Pillar admin
self.create_user(24 * '1', roles={'admin'}, token='uberadmin')
# Create organizations with admin who is not organization member.
self.admin1_uid = self.create_user(24 * 'a', token='admin1-token')
self.admin2_uid = self.create_user(24 * 'b', token='admin2-token')
org_doc = self.om.create_new_org('Хакеры 1', self.admin1_uid, 25)
self.org1_id = org_doc['_id']
org_doc = self.om.create_new_org('Хакеры 2', self.admin2_uid, 10)
self.org2_id = org_doc['_id']
# Create members of the organizations.
self.member1_uid = self.create_user(24 * 'd',
email='member1@example.com',
token='member1-token')
self.member2_uid = self.create_user(24 * 'e',
email='member2@example.com',
token='member2-token')
self.om.assign_users(self.org1_id, ['member1@example.com', 'member2@example.com'])
self.om.assign_users(self.org2_id, ['member2@example.com'])
# Create an outside user.
self.outsider_uid = self.create_user(24 * '0', token='outsider-token')
# Re-fetch the organizations so that self.org_docx has the correct etag.
self.org1_doc = self._from_db(self.org1_id)
self.org2_doc = self._from_db(self.org2_id)
def _from_db(self, org_id) -> dict:
return self.om._get_org(org_id)
def test_list_as_pillar_admin(self):
"""Pillar Admins should see all orgs"""
resp = self.get('/api/organizations', auth_token='uberadmin')
docs = resp.get_json()
self.assertEqual(2, docs['_meta']['total'])
self.assertEqual({str(self.org1_id), str(self.org2_id)},
{doc['_id'] for doc in docs['_items']})
def test_list_as_admin1(self):
"""Admins should only see their own orgs"""
resp = self.get('/api/organizations', auth_token='admin1-token')
docs = resp.get_json()
self.assertEqual(1, docs['_meta']['total'])
self.assertEqual(str(self.org1_id), docs['_items'][0]['_id'])
def test_list_as_member(self):
"""Members should only see their own orgs"""
resp = self.get('/api/organizations', auth_token='member1-token')
docs = resp.get_json()
self.assertEqual(1, docs['_meta']['total'])
self.assertEqual(str(self.org1_id), docs['_items'][0]['_id'])
resp = self.get('/api/organizations', auth_token='member2-token')
docs = resp.get_json()
self.assertEqual(2, docs['_meta']['total'])
self.assertEqual({str(self.org1_id), str(self.org2_id)},
{doc['_id'] for doc in docs['_items']})
def test_list_as_outsider(self):
"""Outsiders shouldn't see any orgs"""
resp = self.get('/api/organizations', auth_token='outsider-token')
docs = resp.get_json()
self.assertEqual(0, docs['_meta']['total'])
self.assertEqual([], docs['_items'])
def test_list_as_anonymous(self):
"""Anonymous users should be denied"""
self.get('/api/organizations', expected_status=403)
def test_create_as_pillar_admin(self):
"""Pillar admins should be able to create a new organization"""
new_org = {
'name': 'Union of €-forgers',
'seat_count': 5,
'admin_uid': self.admin1_uid,
}
resp = self.post('/api/organizations', auth_token='uberadmin', json=new_org,
expected_status=201)
new_doc = resp.get_json()
org_id = bson.ObjectId(new_doc['_id'])
db_org = self._from_db(org_id)
self.assertEqual(new_org['name'], db_org['name'])
self.assertEqual(self.admin1_uid, db_org['admin_uid'])
def _create_test(self, auth_token):
"""Generic creation test for non-pillar-admin users"""
new_org = {
'name': 'Union of €-forgers',
'seat_count': 5,
'admin_uid': self.admin1_uid,
}
# Tests both as a POST and as a PUT request. Should have the same result (no creation).
self.post('/api/organizations', auth_token=auth_token, json=new_org, expected_status=403)
new_id = bson.ObjectId()
self.put(f'/api/organizations/{new_id}',
auth_token=auth_token, json=new_org, expected_status=405)
def test_create_as_admin1(self):
self._create_test('admin1-token')
def test_create_as_member(self):
self._create_test('member1-token')
self._create_test('member2-token')
def test_create_as_outsider(self):
self._create_test('outsider-token')
def test_create_as_anonymous(self):
self._create_test(None)
class OrganizationItemEveTest(AbstractPillarTest):
"""Test GET/PUT/DELETE access to Organization items"""
def setUp(self, **kwargs):
super().setUp(**kwargs)
self.enter_app_context()
self.om = self.app.org_manager
# Create an organization with admin who is not organization member.
self.admin_uid = self.create_user(24 * 'a', token='admin-token')
org_doc = self.om.create_new_org('Хакеры', self.admin_uid, 25)
self.org_id = org_doc['_id']
# Create a member of the organization.
self.member_uid = self.create_user(24 * 'b',
email='member@example.com',
token='member-token')
self.om.assign_users(self.org_id, ['member@example.com'])
# Create an outside user.
self.outsider_uid = self.create_user(24 * '0', token='outsider-token')
# Re-fetch the organization so that self.org_doc has the correct etag.
self.org_doc = self._from_db()
def _from_db(self) -> dict:
return self.om._get_org(self.org_id)
def test_get_admin(self):
resp = self.get(f'/api/organizations/{self.org_id}',
auth_token='admin-token')
org = resp.get_json()
self.assertEqual(str(self.org_id), org['_id'])
self.assertEqual(25, org['seat_count'])
def test_get_member(self):
resp = self.get(f'/api/organizations/{self.org_id}',
auth_token='member-token')
org = resp.get_json()
self.assertEqual(str(self.org_id), org['_id'])
self.assertEqual(25, org['seat_count'])
def test_get_outside_user(self):
# Eve pretends the organization doesn't even exist when you don't have access.
self.get(f'/api/organizations/{self.org_id}',
auth_token='outsider-token',
expected_status=404)
def test_get_anonymous(self):
self.get(f'/api/organizations/{self.org_id}',
expected_status=403)
def _put_test(self, auth_token: typing.Optional[str]):
"""Generic PUT test, should be same result for all cases."""
put_doc = remove_private_keys(self.org_doc)
put_doc['name'] = 'new name'
self.put(f'/api/organizations/{self.org_id}',
json=put_doc,
etag=self.org_doc['_etag'],
auth_token=auth_token,
expected_status=405)
# The name shouldn't have changed in the database.
db_org = self._from_db()
self.assertEqual(self.org_doc['name'], db_org['name'])
def test_put_admin(self):
self._put_test('admin-token')
def test_put_member(self):
self._put_test('member-token')
def test_put_outside_user(self):
self._put_test('outsider-token')
def test_put_anonymous(self):
self._put_test(None)
def _delete_test(self, auth_token: typing.Optional[str]):
"""Generic DELETE test, should be same result for all cases."""
self.delete(f'/api/organizations/{self.org_id}',
etag=self.org_doc['_etag'],
auth_token=auth_token,
expected_status=405)
# The organization shouldn't be deleted.
db_org = self._from_db()
self.assertFalse(db_org['_deleted'])
def test_delete_admin(self):
self._delete_test('admin-token')
def test_delete_member(self):
self._delete_test('member-token')
def test_delete_outside_user(self):
self._delete_test('outsider-token')
def test_delete_anonymous(self):
self._delete_test(None)