diff --git a/pillar/application/modules/projects.py b/pillar/application/modules/projects.py index f267cc5c..516e2c0d 100644 --- a/pillar/application/modules/projects.py +++ b/pillar/application/modules/projects.py @@ -262,18 +262,13 @@ def project_manage_users(): projects_collection = current_app.data.driver.db['projects'] project = projects_collection.find_one({'_id': ObjectId(project_id)}) + # Check if the current_user is owner of the project # TODO: check based on permissions if project['user'] != user_id: return abort_with_error(403) - # Get admin group - # TODO: improve this by checking actual permissions - admin_group_id = project['permissions']['groups'][0]['group'] - # Additional check for admin group (if group name is the same as project id) - groups_collection = current_app.data.driver.db['groups'] - group = groups_collection.find_one({'_id': admin_group_id}) - if group['name'] != project_id: - return abort_with_error(403) + + admin_group = get_admin_group(project) # Get the user and add the admin group to it if action == 'add': @@ -285,7 +280,7 @@ def project_manage_users(): users_collection = current_app.data.driver.db['users'] users_collection.update({'_id': ObjectId(target_user_id)}, - {operation: {'groups': admin_group_id}}) + {operation: {'groups': admin_group['_id']}}) user = users_collection.find_one({'_id': ObjectId(target_user_id)}, {'username': 1, 'email': 1, 'full_name': 1}) @@ -294,6 +289,24 @@ def project_manage_users(): return jsonify(user) +def get_admin_group(project): + """Returns the admin group for the project.""" + + groups_collection = current_app.data.driver.db['groups'] + + # TODO: search through all groups to find the one with the project ID as its name. + admin_group_id = ObjectId(project['permissions']['groups'][0]['group']) + group = groups_collection.find_one({'_id': admin_group_id}) + + if group is None: + raise ValueError('Unable to handle project without admin group.') + + if group['name'] != str(project['_id']): + return abort_with_error(403) + + return group + + def abort_with_error(status): """Aborts with the given status, or 500 if the status doesn't indicate an error. diff --git a/tests/test_project_management.py b/tests/test_project_management.py index bc5dd87b..1a425152 100644 --- a/tests/test_project_management.py +++ b/tests/test_project_management.py @@ -433,6 +433,9 @@ class ProjectNodeAccess(AbstractProjectTest): self.assertTrue(db_proj['is_private']) def test_add_remove_user(self): + from application.modules import projects + from application.utils import dumps + project_add_user_url = '/p/users' # Create another user we can try to share the project with @@ -447,36 +450,36 @@ class ProjectNodeAccess(AbstractProjectTest): 'action': 'add'} resp = self.client.post(project_add_user_url, - data=json.dumps(payload), + data=dumps(payload), content_type='application/json', headers={ 'Authorization': self.make_header('token'), 'If-Match': self.project['_etag']}) self.assertEqual(200, resp.status_code, resp.data) + # Check if the user is now actually member of the group. with self.app.test_request_context(): - groups = self.app.data.driver.db['groups'] users = self.app.data.driver.db['users'] - # Get other_user document db_user = users.find_one(ObjectId(other_user_id)) + admin_group = projects.get_admin_group(self.project) - # Get the admin group (has same name as project id) - # TODO: handle case when user has multiple groups - admin_group_id = db_user['groups'][0] - admin_group = groups.find_one({'_id': admin_group_id}) - - # Check if admin group name matches - self.assertEqual(admin_group['name'], str(self.project['_id'])) + self.assertIn(admin_group['_id'], db_user['groups']) # Update payload to remove the user we just added payload['action'] = 'remove' resp = self.client.post(project_add_user_url, - data=json.dumps(payload), + data=dumps(payload), content_type='application/json', headers={ 'Authorization': self.make_header('token'), 'If-Match': self.project['_etag']}) self.assertEqual(200, resp.status_code, resp.data) + # Check if the user is now actually removed from the group. + with self.app.test_request_context(): + users = self.app.data.driver.db['users'] + + db_user = users.find_one(ObjectId(other_user_id)) + self.assertNotIn(admin_group['_id'], db_user['groups'])