Slight improvement to project group mgmnt tests
This commit is contained in:
@@ -262,18 +262,13 @@ def project_manage_users():
|
|||||||
|
|
||||||
projects_collection = current_app.data.driver.db['projects']
|
projects_collection = current_app.data.driver.db['projects']
|
||||||
project = projects_collection.find_one({'_id': ObjectId(project_id)})
|
project = projects_collection.find_one({'_id': ObjectId(project_id)})
|
||||||
|
|
||||||
# Check if the current_user is owner of the project
|
# Check if the current_user is owner of the project
|
||||||
# TODO: check based on permissions
|
# TODO: check based on permissions
|
||||||
if project['user'] != user_id:
|
if project['user'] != user_id:
|
||||||
return abort_with_error(403)
|
return abort_with_error(403)
|
||||||
# Get admin group
|
|
||||||
# TODO: improve this by checking actual permissions
|
admin_group = get_admin_group(project)
|
||||||
admin_group_id = project['permissions']['groups'][0]['group']
|
|
||||||
# Additional check for admin group (if group name is the same as project id)
|
|
||||||
groups_collection = current_app.data.driver.db['groups']
|
|
||||||
group = groups_collection.find_one({'_id': admin_group_id})
|
|
||||||
if group['name'] != project_id:
|
|
||||||
return abort_with_error(403)
|
|
||||||
|
|
||||||
# Get the user and add the admin group to it
|
# Get the user and add the admin group to it
|
||||||
if action == 'add':
|
if action == 'add':
|
||||||
@@ -285,7 +280,7 @@ def project_manage_users():
|
|||||||
|
|
||||||
users_collection = current_app.data.driver.db['users']
|
users_collection = current_app.data.driver.db['users']
|
||||||
users_collection.update({'_id': ObjectId(target_user_id)},
|
users_collection.update({'_id': ObjectId(target_user_id)},
|
||||||
{operation: {'groups': admin_group_id}})
|
{operation: {'groups': admin_group['_id']}})
|
||||||
user = users_collection.find_one({'_id': ObjectId(target_user_id)},
|
user = users_collection.find_one({'_id': ObjectId(target_user_id)},
|
||||||
{'username': 1, 'email': 1,
|
{'username': 1, 'email': 1,
|
||||||
'full_name': 1})
|
'full_name': 1})
|
||||||
@@ -294,6 +289,24 @@ def project_manage_users():
|
|||||||
return jsonify(user)
|
return jsonify(user)
|
||||||
|
|
||||||
|
|
||||||
|
def get_admin_group(project):
|
||||||
|
"""Returns the admin group for the project."""
|
||||||
|
|
||||||
|
groups_collection = current_app.data.driver.db['groups']
|
||||||
|
|
||||||
|
# TODO: search through all groups to find the one with the project ID as its name.
|
||||||
|
admin_group_id = ObjectId(project['permissions']['groups'][0]['group'])
|
||||||
|
group = groups_collection.find_one({'_id': admin_group_id})
|
||||||
|
|
||||||
|
if group is None:
|
||||||
|
raise ValueError('Unable to handle project without admin group.')
|
||||||
|
|
||||||
|
if group['name'] != str(project['_id']):
|
||||||
|
return abort_with_error(403)
|
||||||
|
|
||||||
|
return group
|
||||||
|
|
||||||
|
|
||||||
def abort_with_error(status):
|
def abort_with_error(status):
|
||||||
"""Aborts with the given status, or 500 if the status doesn't indicate an error.
|
"""Aborts with the given status, or 500 if the status doesn't indicate an error.
|
||||||
|
|
||||||
|
@@ -433,6 +433,9 @@ class ProjectNodeAccess(AbstractProjectTest):
|
|||||||
self.assertTrue(db_proj['is_private'])
|
self.assertTrue(db_proj['is_private'])
|
||||||
|
|
||||||
def test_add_remove_user(self):
|
def test_add_remove_user(self):
|
||||||
|
from application.modules import projects
|
||||||
|
from application.utils import dumps
|
||||||
|
|
||||||
project_add_user_url = '/p/users'
|
project_add_user_url = '/p/users'
|
||||||
|
|
||||||
# Create another user we can try to share the project with
|
# Create another user we can try to share the project with
|
||||||
@@ -447,36 +450,36 @@ class ProjectNodeAccess(AbstractProjectTest):
|
|||||||
'action': 'add'}
|
'action': 'add'}
|
||||||
|
|
||||||
resp = self.client.post(project_add_user_url,
|
resp = self.client.post(project_add_user_url,
|
||||||
data=json.dumps(payload),
|
data=dumps(payload),
|
||||||
content_type='application/json',
|
content_type='application/json',
|
||||||
headers={
|
headers={
|
||||||
'Authorization': self.make_header('token'),
|
'Authorization': self.make_header('token'),
|
||||||
'If-Match': self.project['_etag']})
|
'If-Match': self.project['_etag']})
|
||||||
self.assertEqual(200, resp.status_code, resp.data)
|
self.assertEqual(200, resp.status_code, resp.data)
|
||||||
|
|
||||||
|
# Check if the user is now actually member of the group.
|
||||||
with self.app.test_request_context():
|
with self.app.test_request_context():
|
||||||
groups = self.app.data.driver.db['groups']
|
|
||||||
users = self.app.data.driver.db['users']
|
users = self.app.data.driver.db['users']
|
||||||
|
|
||||||
# Get other_user document
|
|
||||||
db_user = users.find_one(ObjectId(other_user_id))
|
db_user = users.find_one(ObjectId(other_user_id))
|
||||||
|
admin_group = projects.get_admin_group(self.project)
|
||||||
|
|
||||||
# Get the admin group (has same name as project id)
|
self.assertIn(admin_group['_id'], db_user['groups'])
|
||||||
# TODO: handle case when user has multiple groups
|
|
||||||
admin_group_id = db_user['groups'][0]
|
|
||||||
admin_group = groups.find_one({'_id': admin_group_id})
|
|
||||||
|
|
||||||
# Check if admin group name matches
|
|
||||||
self.assertEqual(admin_group['name'], str(self.project['_id']))
|
|
||||||
|
|
||||||
# Update payload to remove the user we just added
|
# Update payload to remove the user we just added
|
||||||
payload['action'] = 'remove'
|
payload['action'] = 'remove'
|
||||||
|
|
||||||
resp = self.client.post(project_add_user_url,
|
resp = self.client.post(project_add_user_url,
|
||||||
data=json.dumps(payload),
|
data=dumps(payload),
|
||||||
content_type='application/json',
|
content_type='application/json',
|
||||||
headers={
|
headers={
|
||||||
'Authorization': self.make_header('token'),
|
'Authorization': self.make_header('token'),
|
||||||
'If-Match': self.project['_etag']})
|
'If-Match': self.project['_etag']})
|
||||||
self.assertEqual(200, resp.status_code, resp.data)
|
self.assertEqual(200, resp.status_code, resp.data)
|
||||||
|
|
||||||
|
# Check if the user is now actually removed from the group.
|
||||||
|
with self.app.test_request_context():
|
||||||
|
users = self.app.data.driver.db['users']
|
||||||
|
|
||||||
|
db_user = users.find_one(ObjectId(other_user_id))
|
||||||
|
self.assertNotIn(admin_group['_id'], db_user['groups'])
|
||||||
|
Reference in New Issue
Block a user