Load user capabilities from Pillar config and allow extensions to extend.

Default caps can be overridden using the USER_CAPABILITIES name in
config_local.py. These can be extended by Pillar Extensions.
This commit is contained in:
Sybren A. Stüvel 2017-08-22 11:31:17 +02:00
parent 566f2a4835
commit 2b09711eb0
6 changed files with 123 additions and 4 deletions

View File

@ -32,7 +32,6 @@ def _get_current_app():
current_app: 'PillarServer' = LocalProxy(_get_current_app)
"""the current app, annotated as PillarServer"""
from pillar.api import custom_field_validation
from pillar.api.utils import authentication
import pillar.web.jinja
@ -78,6 +77,10 @@ class PillarServer(Eve):
}
self._user_roles_indexable: typing.Set[str] = {'demo', 'admin', 'subscriber'}
# Mapping from role name to capabilities given to that role.
self._user_caps: typing.MutableMapping[str, typing.FrozenSet[str]] = \
collections.defaultdict(frozenset)
self.app_root = os.path.abspath(app_root)
self._load_flask_config()
self._config_logging()
@ -382,6 +385,24 @@ class PillarServer(Eve):
self.log.info('Loaded %i user roles from extensions, %i of which are indexable',
len(self._user_roles), len(self._user_roles_indexable))
def _config_user_caps(self):
"""Merges all capability settings from app config and extensions."""
app_caps = collections.defaultdict(frozenset, **self.config['USER_CAPABILITIES'])
for extension in self.pillar_extensions.values():
ext_caps = extension.user_caps
for role, caps in ext_caps.items():
union_caps = frozenset(app_caps[role] | caps)
app_caps[role] = union_caps
self._user_caps = app_caps
if self.log.isEnabledFor(logging.INFO):
import pprint
self.log.info('Configured user capabilities: %s', pprint.pformat(self._user_caps))
def register_static_file_endpoint(self, url_prefix, endpoint_name, static_folder):
from pillar.web.staticfile import PillarStaticFile
@ -555,6 +576,7 @@ class PillarServer(Eve):
self._config_jinja_env()
self._config_static_dirs()
self._config_user_roles()
self._config_user_caps()
# Only enable this when debugging.
# self._list_routes()
@ -715,3 +737,7 @@ class PillarServer(Eve):
@property
def user_roles_indexable(self) -> typing.FrozenSet[str]:
return frozenset(self._user_roles_indexable)
@property
def user_caps(self) -> typing.Mapping[str, typing.FrozenSet[str]]:
return self._user_caps

View File

@ -17,6 +17,10 @@ from flask import current_app
log = logging.getLogger(__name__)
# Construction is done when requested, since constructing a UserClass instance
# requires an application context to look up capabilities. We set the initial
# value to a not-None singleton to be able to differentiate between
# g.current_user set to "not logged in" or "uninitialised CLI_USER".
CLI_USER = ...

View File

@ -9,6 +9,8 @@ import flask_login
import flask_oauthlib.client
from werkzeug.local import LocalProxy
from pillar import current_app
import bson
from ..api import utils
@ -85,10 +87,14 @@ class UserClass(flask_login.UserMixin):
return default
def collect_capabilities(self):
"""Constructs the capabilities set given the user's current roles."""
"""Constructs the capabilities set given the user's current roles.
self.capabilities = set().union(*(CAPABILITIES.get(role, frozenset())
for role in self.roles))
Requires an application context to be active.
"""
app_caps = current_app.user_caps
self.capabilities = set().union(*(app_caps[role] for role in self.roles))
def has_role(self, *roles):
"""Returns True iff the user has one or more of the given roles."""

View File

@ -160,3 +160,12 @@ TLS_CERT_FILE = requests.certs.where()
CELERY_BACKEND = 'redis://redis/1'
CELERY_BROKER = 'amqp://guest:guest@rabbit//'
# Mapping from user role to capabilities obtained by users with that role.
USER_CAPABILITIES = defaultdict(**{
'subscriber': {'subscriber', 'home-project'},
'demo': {'subscriber', 'home-project'},
'admin': {'subscriber', 'home-project', 'video-encoding', 'admin',
'view-pending-nodes', 'edit-project-node-types'},
}, default_factory=frozenset)

View File

@ -36,6 +36,10 @@ class PillarExtension(object, metaclass=abc.ABCMeta):
user_roles: typing.Set[str] = set()
user_roles_indexable: typing.Set[str] = set()
# User capabilities introduced by this extension. The final set of
# capabilities is the union of all app-level and extension-level caps.
user_caps: typing.Mapping[str, typing.FrozenSet] = {}
@property
@abc.abstractmethod
def name(self):

View File

@ -0,0 +1,70 @@
from pillar.tests import PillarTestServer, AbstractPillarTest
class UserCapsTestServer(PillarTestServer):
def __init__(self, *args, **kwargs):
PillarTestServer.__init__(self, *args, **kwargs)
from pillar.extension import PillarExtension
# Late-declare this class, so that it is recreated for each unit test.
class UserCapsTestExtension(PillarExtension):
user_roles = {
'test-user',
'test-မျောက်',
}
user_caps = {
'subscriber': {'extra-sub-cap', 'another-cap'},
'test-user': {'test-user-cap-1', 'နဂါးမောက်သီး'},
'test-မျောက်': {'test-monkey-cap-1', 'နဂါးမောက်သီး'},
}
@property
def name(self):
return 'test_user_caps'
def flask_config(self):
return {}
def eve_settings(self):
return {}
def blueprints(self):
return []
self.load_extension(UserCapsTestExtension(), '/user-caps-test')
class UserCapsTest(AbstractPillarTest):
pillar_server_class = UserCapsTestServer
def setUp(self, **kwargs):
super().setUp(**kwargs)
def tearDown(self):
super().tearDown()
def test_default_caps(self):
app_caps = self.app.user_caps
self.assertEqual(app_caps['demo'], frozenset({
'subscriber', 'home-project'
}))
def test_aggr_caps_merged_subscriber(self):
app_caps = self.app.user_caps
self.assertEqual(app_caps['subscriber'], frozenset({
'subscriber', 'home-project', 'extra-sub-cap', 'another-cap'
}))
def test_aggr_caps_new_roles(self):
app_caps = self.app.user_caps
self.assertEqual(app_caps['test-user'], frozenset({
'test-user-cap-1', 'နဂါးမောက်သီး'
}))
self.assertEqual(app_caps['test-မျောက်'], frozenset({
'test-monkey-cap-1', 'နဂါးမောက်သီး'
}))