diff --git a/pillar/api/eve_settings.py b/pillar/api/eve_settings.py index 050eab8c..fb51cc6b 100644 --- a/pillar/api/eve_settings.py +++ b/pillar/api/eve_settings.py @@ -737,6 +737,11 @@ groups = { organizations = { 'schema': organizations_schema, + 'resource_methods': ['GET', 'POST'], + 'item_methods': ['GET'], + 'public_item_methods': [], + 'public_methods': [], + 'soft_delete': True, } projects = { diff --git a/pillar/api/organizations/__init__.py b/pillar/api/organizations/__init__.py index 7345b4ea..18b816ef 100644 --- a/pillar/api/organizations/__init__.py +++ b/pillar/api/organizations/__init__.py @@ -254,6 +254,7 @@ class OrgManager: def setup_app(app): - from . import patch + from . import patch, hooks + hooks.setup_app(app) patch.setup_app(app) diff --git a/pillar/api/organizations/hooks.py b/pillar/api/organizations/hooks.py new file mode 100644 index 00000000..fb843d42 --- /dev/null +++ b/pillar/api/organizations/hooks.py @@ -0,0 +1,27 @@ +import werkzeug.exceptions as wz_exceptions + +from pillar.api.utils.authentication import current_user + + +def pre_get_organizations(request, lookup): + user = current_user() + if user.is_anonymous: + raise wz_exceptions.Forbidden() + + if user.has_cap('admin'): + # Allow all lookups to admins. + return + + # Only allow users to see their own organizations. + lookup['$or'] = [{'admin_uid': user.user_id}, {'members': user.user_id}] + + +def pre_post_organizations(request): + user = current_user() + if user.is_anonymous or not user.has_cap('admin'): + raise wz_exceptions.Forbidden() + + +def setup_app(app): + app.on_pre_GET_organizations += pre_get_organizations + app.on_pre_POST_organizations += pre_post_organizations diff --git a/tests/test_api/test_organizations.py b/tests/test_api/test_organizations.py index 637181cd..75ed4689 100644 --- a/tests/test_api/test_organizations.py +++ b/tests/test_api/test_organizations.py @@ -1,7 +1,10 @@ -from pillar.tests import AbstractPillarTest +import typing import bson +from pillar.tests import AbstractPillarTest +from pillar.api.utils import remove_private_keys + class OrganizationCruTest(AbstractPillarTest): """Test creating and updating organizations.""" @@ -327,3 +330,248 @@ class OrganizationPatchTest(AbstractPillarTest): self.assertEqual('Blender Institute', db_org['name']) self.assertEqual('Open Source animation studio', db_org['description']) self.assertEqual('https://blender.institute/', db_org['website']) + + +class OrganizationResourceEveTest(AbstractPillarTest): + """Test GET/POST/PUT access to Organization resource""" + + def setUp(self, **kwargs): + super().setUp(**kwargs) + + self.enter_app_context() + self.om = self.app.org_manager + + # Pillar admin + self.create_user(24 * '1', roles={'admin'}, token='uberadmin') + + # Create organizations with admin who is not organization member. + self.admin1_uid = self.create_user(24 * 'a', token='admin1-token') + self.admin2_uid = self.create_user(24 * 'b', token='admin2-token') + + org_doc = self.om.create_new_org('Хакеры 1', self.admin1_uid, 25) + self.org1_id = org_doc['_id'] + org_doc = self.om.create_new_org('Хакеры 2', self.admin2_uid, 10) + self.org2_id = org_doc['_id'] + + # Create members of the organizations. + self.member1_uid = self.create_user(24 * 'd', + email='member1@example.com', + token='member1-token') + self.member2_uid = self.create_user(24 * 'e', + email='member2@example.com', + token='member2-token') + self.om.assign_users(self.org1_id, ['member1@example.com', 'member2@example.com']) + self.om.assign_users(self.org2_id, ['member2@example.com']) + + # Create an outside user. + self.outsider_uid = self.create_user(24 * '0', token='outsider-token') + + # Re-fetch the organizations so that self.org_docx has the correct etag. + self.org1_doc = self._from_db(self.org1_id) + self.org2_doc = self._from_db(self.org2_id) + + def _from_db(self, org_id) -> dict: + return self.om._get_org(org_id) + + def test_list_as_pillar_admin(self): + """Pillar Admins should see all orgs""" + + resp = self.get('/api/organizations', auth_token='uberadmin') + docs = resp.get_json() + + self.assertEqual(2, docs['_meta']['total']) + self.assertEqual({str(self.org1_id), str(self.org2_id)}, + {doc['_id'] for doc in docs['_items']}) + + def test_list_as_admin1(self): + """Admins should only see their own orgs""" + + resp = self.get('/api/organizations', auth_token='admin1-token') + docs = resp.get_json() + + self.assertEqual(1, docs['_meta']['total']) + self.assertEqual(str(self.org1_id), docs['_items'][0]['_id']) + + def test_list_as_member(self): + """Members should only see their own orgs""" + + resp = self.get('/api/organizations', auth_token='member1-token') + docs = resp.get_json() + + self.assertEqual(1, docs['_meta']['total']) + self.assertEqual(str(self.org1_id), docs['_items'][0]['_id']) + + resp = self.get('/api/organizations', auth_token='member2-token') + docs = resp.get_json() + + self.assertEqual(2, docs['_meta']['total']) + self.assertEqual({str(self.org1_id), str(self.org2_id)}, + {doc['_id'] for doc in docs['_items']}) + + def test_list_as_outsider(self): + """Outsiders shouldn't see any orgs""" + + resp = self.get('/api/organizations', auth_token='outsider-token') + docs = resp.get_json() + + self.assertEqual(0, docs['_meta']['total']) + self.assertEqual([], docs['_items']) + + def test_list_as_anonymous(self): + """Anonymous users should be denied""" + + self.get('/api/organizations', expected_status=403) + + def test_create_as_pillar_admin(self): + """Pillar admins should be able to create a new organization""" + + new_org = { + 'name': 'Union of €-forgers', + 'seat_count': 5, + 'admin_uid': self.admin1_uid, + } + resp = self.post('/api/organizations', auth_token='uberadmin', json=new_org, + expected_status=201) + new_doc = resp.get_json() + + org_id = bson.ObjectId(new_doc['_id']) + db_org = self._from_db(org_id) + self.assertEqual(new_org['name'], db_org['name']) + self.assertEqual(self.admin1_uid, db_org['admin_uid']) + + def _create_test(self, auth_token): + """Generic creation test for non-pillar-admin users""" + + new_org = { + 'name': 'Union of €-forgers', + 'seat_count': 5, + 'admin_uid': self.admin1_uid, + } + + # Tests both as a POST and as a PUT request. Should have the same result (no creation). + self.post('/api/organizations', auth_token=auth_token, json=new_org, expected_status=403) + + new_id = bson.ObjectId() + self.put(f'/api/organizations/{new_id}', + auth_token=auth_token, json=new_org, expected_status=405) + + def test_create_as_admin1(self): + self._create_test('admin1-token') + + def test_create_as_member(self): + self._create_test('member1-token') + self._create_test('member2-token') + + def test_create_as_outsider(self): + self._create_test('outsider-token') + + def test_create_as_anonymous(self): + self._create_test(None) + + +class OrganizationItemEveTest(AbstractPillarTest): + """Test GET/PUT/DELETE access to Organization items""" + + def setUp(self, **kwargs): + super().setUp(**kwargs) + + self.enter_app_context() + self.om = self.app.org_manager + + # Create an organization with admin who is not organization member. + self.admin_uid = self.create_user(24 * 'a', token='admin-token') + org_doc = self.om.create_new_org('Хакеры', self.admin_uid, 25) + self.org_id = org_doc['_id'] + + # Create a member of the organization. + self.member_uid = self.create_user(24 * 'b', + email='member@example.com', + token='member-token') + self.om.assign_users(self.org_id, ['member@example.com']) + + # Create an outside user. + self.outsider_uid = self.create_user(24 * '0', token='outsider-token') + + # Re-fetch the organization so that self.org_doc has the correct etag. + self.org_doc = self._from_db() + + def _from_db(self) -> dict: + return self.om._get_org(self.org_id) + + def test_get_admin(self): + resp = self.get(f'/api/organizations/{self.org_id}', + auth_token='admin-token') + org = resp.get_json() + + self.assertEqual(str(self.org_id), org['_id']) + self.assertEqual(25, org['seat_count']) + + def test_get_member(self): + resp = self.get(f'/api/organizations/{self.org_id}', + auth_token='member-token') + org = resp.get_json() + + self.assertEqual(str(self.org_id), org['_id']) + self.assertEqual(25, org['seat_count']) + + def test_get_outside_user(self): + # Eve pretends the organization doesn't even exist when you don't have access. + self.get(f'/api/organizations/{self.org_id}', + auth_token='outsider-token', + expected_status=404) + + def test_get_anonymous(self): + self.get(f'/api/organizations/{self.org_id}', + expected_status=403) + + def _put_test(self, auth_token: typing.Optional[str]): + """Generic PUT test, should be same result for all cases.""" + + put_doc = remove_private_keys(self.org_doc) + put_doc['name'] = 'new name' + + self.put(f'/api/organizations/{self.org_id}', + json=put_doc, + etag=self.org_doc['_etag'], + auth_token=auth_token, + expected_status=405) + + # The name shouldn't have changed in the database. + db_org = self._from_db() + self.assertEqual(self.org_doc['name'], db_org['name']) + + def test_put_admin(self): + self._put_test('admin-token') + + def test_put_member(self): + self._put_test('member-token') + + def test_put_outside_user(self): + self._put_test('outsider-token') + + def test_put_anonymous(self): + self._put_test(None) + + def _delete_test(self, auth_token: typing.Optional[str]): + """Generic DELETE test, should be same result for all cases.""" + + self.delete(f'/api/organizations/{self.org_id}', + etag=self.org_doc['_etag'], + auth_token=auth_token, + expected_status=405) + + # The organization shouldn't be deleted. + db_org = self._from_db() + self.assertFalse(db_org['_deleted']) + + def test_delete_admin(self): + self._delete_test('admin-token') + + def test_delete_member(self): + self._delete_test('member-token') + + def test_delete_outside_user(self): + self._delete_test('outsider-token') + + def test_delete_anonymous(self): + self._delete_test(None)