Added node_type_utils to assign permissions to certain node types.

This separates "mechanism" from "policy".
This commit is contained in:
Sybren A. Stüvel 2016-09-23 17:13:26 +02:00
parent 91e3ec659f
commit 7968c6ca37
2 changed files with 203 additions and 0 deletions

View File

@ -0,0 +1,79 @@
import copy
import logging
log = logging.getLogger(__name__)
def assign_permissions(project, node_types, permission_callback):
"""Generator, yields the node types with certain permissions set.
The permission_callback is called for each node type, and each user
and group permission in the project, and should return the appropriate
extra permissions for that node type.
Yields copies of the given node types with new permissions.
permission_callback(node_type, uwg, ident, proj_methods) is returned, where
- 'node_type' is the node type dict
- 'ugw' is either 'user', 'group', or 'world',
- 'ident' is the group or user ID, or None when ugw is 'world',
- 'proj_methods' is the list of already-allowed project methods.
"""
proj_perms = project['permissions']
for nt in node_types:
permissions = {}
for key in ('users', 'groups'):
perms = proj_perms[key]
singular = key.rstrip('s')
for perm in perms:
assert isinstance(perm, dict), 'perm should be dict, but is %r' % perm
ident = perm[singular] # group or user ID.
methods_to_allow = permission_callback(nt, singular, ident, perm['methods'])
if not methods_to_allow:
continue
permissions.setdefault(key, []).append(
{singular: ident,
'methods': methods_to_allow}
)
# World permissions are simpler.
world_methods_to_allow = permission_callback(nt, 'world', None,
permissions.get('world', []))
if world_methods_to_allow:
permissions.setdefault('world', []).extend(world_methods_to_allow)
node_type = copy.deepcopy(nt)
if permissions:
node_type['permissions'] = permissions
yield node_type
def add_to_project(project, node_types, replace_existing):
"""Adds the given node types to the project.
Overwrites any existing by the same name when replace_existing=True.
"""
project_id = project['_id']
for node_type in node_types:
found = [nt for nt in project['node_types']
if nt['name'] == node_type['name']]
if found:
assert len(found) == 1, 'node type name should be unique (found %ix)' % len(found)
# TODO: validate that the node type contains all the properties Attract needs.
if replace_existing:
log.info('Replacing existing node type %s on project %s',
node_type['name'], project_id)
project['node_types'].remove(found[0])
else:
continue
project['node_types'].append(node_type)

View File

@ -0,0 +1,124 @@
import unittest
from pillar.api.utils import node_type_utils
class NodeTypeUtilsTest(unittest.TestCase):
def setUp(self):
self.proj = {
'permissions': {
'users': [
{'user': 41,
'methods': ['GET', 'POST']},
],
'groups': [
{'group': 1,
'methods': ['GET', 'PUT']},
{'group': 2,
'methods': ['DELETE']},
]
}
}
self.node_type_1 = {'name': 'node-type-1'}
self.node_type_2 = {'name': 'node-type-2'}
self.node_types = [self.node_type_1, self.node_type_2]
def test_trivial(self):
def callback(*args):
self.fail('Callback should not be called.')
gen = node_type_utils.assign_permissions(self.proj, [], callback)
self.assertEqual([], list(gen))
def test_not_modified(self):
def callback(*args):
return []
gen = node_type_utils.assign_permissions(self.proj, self.node_types, callback)
new_types = list(gen)
# They should be equal, but be copies, not references.
self.assertEqual(self.node_types, new_types)
self.assertIsNot(self.node_type_1, new_types[0])
self.assertIsNot(self.node_type_2, new_types[1])
def test_modified(self):
def callback(node_type, ugw, ident, proj_methods):
if node_type['name'] == 'node-type-1' and ugw == 'user':
self.assertEqual(ident, 41)
self.assertEqual(proj_methods, ['GET', 'POST'])
return ['SPLOOSH']
if node_type['name'] == 'node-type-2' and ugw == 'group':
if ident == 1:
self.assertEqual(proj_methods, ['GET', 'PUT'])
return ['SPLASH', 'EEK']
self.assertEqual(proj_methods, ['DELETE'])
return ['OOF']
if node_type['name'] == 'node-type-2' and ugw == 'world':
self.assertEqual(proj_methods, [])
return ['ICECREAM']
return None
gen = node_type_utils.assign_permissions(self.proj, self.node_types, callback)
new_types = list(gen)
# Only the additional permissions should be included in the node type.
self.assertEqual({'name': 'node-type-1',
'permissions': {
'users': [
{'user': 41,
'methods': ['SPLOOSH']},
],
}},
new_types[0])
self.assertEqual({'name': 'node-type-2',
'permissions': {
'groups': [
{'group': 1,
'methods': ['SPLASH', 'EEK']},
{'group': 2,
'methods': ['OOF']},
],
'world': ['ICECREAM']
}},
new_types[1])
def test_already_existing(self):
def callback(node_type, ugw, ident, proj_methods):
if node_type['name'] == 'node-type-1' and ugw == 'user':
return ['POST']
if node_type['name'] == 'node-type-1' and ugw == 'world':
self.assertIsNone(ident)
return ['GET']
return None
self.node_type_1['permissions'] = {
'users': [
{'user': 41,
'methods': ['GET', 'POST']},
],
'world': ['GET']
}
gen = node_type_utils.assign_permissions(self.proj, self.node_types, callback)
new_types = list(gen)
# These permissions are explicitly given to this node type, and even though are already
# present on the project, should still be included here.
self.assertEqual({'name': 'node-type-1',
'permissions': {
'users': [
{'user': 41,
'methods': ['POST']},
],
'world': ['GET']
}},
new_types[0])
self.assertEqual({'name': 'node-type-2'}, new_types[1])