Badger service: also manage group membership
For the subscriber, demo and admin roles, the badger service now also manages group membership for the role-specific groups.
This commit is contained in:
parent
ec7b3159ac
commit
ba1f8a4101
@ -12,6 +12,8 @@ from application.modules import local_auth
|
||||
blueprint = Blueprint('service', __name__)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
ROLES_WITH_GROUPS = {u'admin', u'demo', u'subscriber'}
|
||||
|
||||
|
||||
@blueprint.route('/badger', methods=['POST'])
|
||||
@authorization.require_login(require_roles={u'service', u'badger'}, require_all=True)
|
||||
@ -60,17 +62,55 @@ def badger():
|
||||
return 'User not found', 404
|
||||
|
||||
# Apply the action
|
||||
roles = set(db_user.get('roles', []) or [])
|
||||
roles = set(db_user.get('roles') or [])
|
||||
if action == 'grant':
|
||||
roles.add(role)
|
||||
else:
|
||||
roles.discard(role)
|
||||
|
||||
groups = manage_user_group_membership(db_user, role, action)
|
||||
|
||||
updates = {'roles': list(roles)}
|
||||
if groups is not None:
|
||||
updates['groups'] = list(groups)
|
||||
|
||||
users_coll.update_one({'_id': db_user['_id']},
|
||||
{'$set': {'roles': list(roles)}})
|
||||
{'$set': updates})
|
||||
|
||||
return '', 204
|
||||
|
||||
|
||||
def manage_user_group_membership(db_user, role, action):
|
||||
"""Some roles have associated groups; this function maintains group & role membership.
|
||||
|
||||
Does NOT alter the given user, nor the database.
|
||||
|
||||
:return: the new groups of the user, or None if the groups shouldn't be changed.
|
||||
:rtype: set
|
||||
"""
|
||||
|
||||
# Currently only three roles have associated groups.
|
||||
if role not in ROLES_WITH_GROUPS:
|
||||
return
|
||||
|
||||
# Find the group
|
||||
groups_coll = current_app.data.driver.db['groups']
|
||||
group = groups_coll.find_one({'name': role}, projection={'_id': 1})
|
||||
if group is None:
|
||||
log.warning('Group for role %r cannot be found, unable to %s members for user %s',
|
||||
role, action, db_user['_id'])
|
||||
return
|
||||
group_id = group['_id']
|
||||
|
||||
user_groups = set(db_user.get('groups') or [])
|
||||
if action == 'grant':
|
||||
user_groups.add(group_id)
|
||||
else:
|
||||
user_groups.discard(group_id)
|
||||
|
||||
return user_groups
|
||||
|
||||
|
||||
def create_service_account(email, roles, service):
|
||||
"""Creates a service account with the given roles + the role 'service'.
|
||||
|
||||
|
@ -11,7 +11,8 @@ class BadgerServiceTest(AbstractPillarTest):
|
||||
|
||||
with self.app.test_request_context():
|
||||
self.badger, token_doc = service.create_service_account(
|
||||
'serviceaccount@example.com', [u'badger'], {u'badger': [u'succubus']}
|
||||
'serviceaccount@example.com', [u'badger'],
|
||||
{u'badger': [u'succubus', u'subscriber', u'demo']}
|
||||
)
|
||||
self.badger_token = token_doc['token']
|
||||
|
||||
@ -49,3 +50,33 @@ class BadgerServiceTest(AbstractPillarTest):
|
||||
with self.app.test_request_context():
|
||||
user = self.app.data.driver.db['users'].find_one(self.user_id)
|
||||
self.assertNotIn(u'admin', user['roles'])
|
||||
|
||||
def test_group_membership(self):
|
||||
"""Certain roles are linked to certain groups."""
|
||||
|
||||
def test_for_group(group_name, test=self.assertIn):
|
||||
# Create the group
|
||||
with self.app.test_request_context():
|
||||
groups_coll = self.app.data.driver.db['groups']
|
||||
result = groups_coll.insert_one({'name': group_name})
|
||||
group_id = result.inserted_id
|
||||
|
||||
# Assign the 'subscriber' role
|
||||
resp = self._post({'action': 'grant',
|
||||
'user_email': self.user_email,
|
||||
'role': group_name})
|
||||
self.assertEqual(204, resp.status_code)
|
||||
|
||||
# Check that the user is actually member of that group.
|
||||
with self.app.test_request_context():
|
||||
user = self.app.data.driver.db['users'].find_one(self.user_id)
|
||||
test(group_id, user['groups'])
|
||||
|
||||
# There are special groups for those. Also for admin, but if
|
||||
# it works for those, it also works for admin, and another test
|
||||
# case requires admin to be ingrantable.
|
||||
test_for_group('demo')
|
||||
test_for_group('subscriber')
|
||||
|
||||
# This role isn't linked to group membership.
|
||||
test_for_group('succubus', test=self.assertNotIn)
|
||||
|
Loading…
x
Reference in New Issue
Block a user