Added sync_role_groups management command.

This ensures that group membership is consistent with the user's roles.
The roles are leading in this.
This commit is contained in:
Sybren A. Stüvel 2016-06-14 16:41:37 +02:00
parent ba1f8a4101
commit 36a2e028d4
3 changed files with 130 additions and 23 deletions

View File

@ -14,6 +14,27 @@ log = logging.getLogger(__name__)
ROLES_WITH_GROUPS = {u'admin', u'demo', u'subscriber'}
# Map of role name to group ID, for the above groups.
_role_to_group_id = {}
@blueprint.before_app_first_request
def fetch_role_to_group_id_map():
"""Fills the _role_to_group_id mapping upon application startup."""
global _role_to_group_id
groups_coll = current_app.data.driver.db['groups']
for role in ROLES_WITH_GROUPS:
group = groups_coll.find_one({'name': role}, projection={'_id': 1})
if group is None:
log.warning('Group for role %r not found', role)
continue
_role_to_group_id[role] = group['_id']
log.debug('Group IDs for roles: %s', _role_to_group_id)
@blueprint.route('/badger', methods=['POST'])
@authorization.require_login(require_roles={u'service', u'badger'}, require_all=True)
@ -89,18 +110,20 @@ def manage_user_group_membership(db_user, role, action):
:rtype: set
"""
if action not in {'grant', 'revoke'}:
raise ValueError('Action %r not supported' % action)
# Currently only three roles have associated groups.
if role not in ROLES_WITH_GROUPS:
return
# Find the group
groups_coll = current_app.data.driver.db['groups']
group = groups_coll.find_one({'name': role}, projection={'_id': 1})
if group is None:
try:
group_id = _role_to_group_id[role]
except KeyError:
log.warning('Group for role %r cannot be found, unable to %s members for user %s',
role, action, db_user['_id'])
return
group_id = group['_id']
user_groups = set(db_user.get('groups') or [])
if action == 'grant':

View File

@ -859,5 +859,85 @@ def find_duplicate_users():
projects_coll.count({'user': user['_id']}),
))
@manager.command
def sync_role_groups(do_revoke_groups):
"""For each user, synchronizes roles and group membership.
This ensures that everybody with the 'subscriber' role is also member of the 'subscriber'
group, and people without the 'subscriber' role are not member of that group. Same for
admin and demo groups.
When do_revoke_groups=False (the default), people are only added to groups.
when do_revoke_groups=True, people are also removed from groups.
"""
from application.modules import service
if do_revoke_groups not in {'true', 'false'}:
print('Use either "true" or "false" as first argument.')
print('When passing "false", people are only added to groups.')
print('when passing "true", people are also removed from groups.')
raise SystemExit()
service.fetch_role_to_group_id_map()
users_coll = app.data.driver.db['users']
groups_coll = app.data.driver.db['groups']
group_names = {}
def gname(gid):
try:
return group_names[gid]
except KeyError:
name = groups_coll.find_one(gid, projection={'name': 1})['name']
name = str(name)
group_names[gid] = name
return name
ok_users = bad_users = 0
for user in users_coll.find():
for role in service.ROLES_WITH_GROUPS:
action = 'grant' if role in user.get('roles', ()) else 'revoke'
groups = service.manage_user_group_membership(user, role, action)
if groups is None:
# No changes required
ok_users += 1
continue
current_groups = set(user.get('groups'))
if groups == current_groups:
ok_users += 1
continue
bad_users += 1
grant_groups = groups.difference(current_groups)
revoke_groups = current_groups.difference(groups)
print('Discrepancy for user %s/%s:' % (user['_id'], user['full_name']))
print(' - actual groups :', sorted(gname(gid) for gid in user.get('groups')))
print(' - expected groups:', sorted(gname(gid) for gid in groups))
print(' - will grant :', sorted(gname(gid) for gid in grant_groups))
print(' - might revoke :', sorted(gname(gid) for gid in revoke_groups))
if grant_groups and revoke_groups:
print(' ------ CAREFUL this one has BOTH grant AND revoke -----')
# Determine which changes we'll apply
if do_revoke_groups:
final_groups = groups
else:
final_groups = current_groups.union(grant_groups)
print(' - final groups :', sorted(gname(gid) for gid in final_groups))
# Perform the actual update
users_coll.update_one({'_id': user['_id']},
{'$set': {'groups': list(final_groups)}})
print('%i bad and %i ok users seen.' % (bad_users, ok_users))
if __name__ == '__main__':
manager.run()

View File

@ -54,29 +54,33 @@ class BadgerServiceTest(AbstractPillarTest):
def test_group_membership(self):
"""Certain roles are linked to certain groups."""
def test_for_group(group_name, test=self.assertIn):
# Create the group
with self.app.test_request_context():
from application.modules import service
with self.app.test_request_context():
# Create the groups
group_ids = {}
for group_name in ['admin', 'demo', 'subscriber', 'succubus']:
groups_coll = self.app.data.driver.db['groups']
result = groups_coll.insert_one({'name': group_name})
group_id = result.inserted_id
group_ids[group_name] = result.inserted_id
service.fetch_role_to_group_id_map()
# Assign the 'subscriber' role
resp = self._post({'action': 'grant',
'user_email': self.user_email,
'role': group_name})
self.assertEqual(204, resp.status_code)
def test_for_group(group_name, test=self.assertIn):
# Assign the 'subscriber' role
resp = self._post({'action': 'grant',
'user_email': self.user_email,
'role': group_name})
self.assertEqual(204, resp.status_code)
# Check that the user is actually member of that group.
with self.app.test_request_context():
# Check that the user is actually member of that group.
user = self.app.data.driver.db['users'].find_one(self.user_id)
test(group_id, user['groups'])
test(group_ids[group_name], user['groups'])
# There are special groups for those. Also for admin, but if
# it works for those, it also works for admin, and another test
# case requires admin to be ingrantable.
test_for_group('demo')
test_for_group('subscriber')
# There are special groups for those. Also for admin, but if
# it works for those, it also works for admin, and another test
# case requires admin to be ingrantable.
test_for_group('demo')
test_for_group('subscriber')
# This role isn't linked to group membership.
test_for_group('succubus', test=self.assertNotIn)
# This role isn't linked to group membership.
test_for_group('succubus', test=self.assertNotIn)