Introduced role-based capability system.

It's still rather limited and hard-coded, but it works.
This commit is contained in:
Sybren A. Stüvel 2017-08-18 14:47:42 +02:00
parent 566a23d3b6
commit 575a7ed1a7
14 changed files with 137 additions and 27 deletions

View File

@ -16,7 +16,7 @@ blueprint_api = Blueprint('projects_api', __name__)
@blueprint_api.route('/create', methods=['POST']) @blueprint_api.route('/create', methods=['POST'])
@authorization.require_login(require_roles={'admin', 'subscriber', 'demo'}) @authorization.require_login(require_cap='subscriber')
def create_project(overrides=None): def create_project(overrides=None):
"""Creates a new project.""" """Creates a new project."""

View File

@ -265,12 +265,19 @@ def merge_permissions(*args):
def require_login(require_roles=set(), def require_login(require_roles=set(),
require_cap='',
require_all=False): require_all=False):
"""Decorator that enforces users to authenticate. """Decorator that enforces users to authenticate.
Optionally only allows access to users with a certain role. Optionally only allows access to users with a certain role and/or capability.
Either check on roles or on a capability, but never on both. There is no
require_all check for capabilities; if you need to check for multiple
capabilities at once, it's a sign that you need to add another capability
and give it to everybody that needs it.
:param require_roles: set of roles. :param require_roles: set of roles.
:param require_cap: a capability.
:param require_all: :param require_all:
When False (the default): if the user's roles have a When False (the default): if the user's roles have a
non-empty intersection with the given roles, access is granted. non-empty intersection with the given roles, access is granted.
@ -279,7 +286,13 @@ def require_login(require_roles=set(),
""" """
if not isinstance(require_roles, set): if not isinstance(require_roles, set):
raise TypeError('require_roles param should be a set, but is a %r' % type(require_roles)) raise TypeError(f'require_roles param should be a set, but is {type(require_roles)!r}')
if not isinstance(require_cap, str):
raise TypeError(f'require_caps param should be a str, but is {type(require_cap)!r}')
if require_roles and require_cap:
raise ValueError('either use require_roles or require_cap, but not both')
if require_all and not require_roles: if require_all and not require_roles:
raise ValueError('require_login(require_all=True) cannot be used with empty require_roles.') raise ValueError('require_login(require_all=True) cannot be used with empty require_roles.')
@ -287,15 +300,21 @@ def require_login(require_roles=set(),
def decorator(func): def decorator(func):
@functools.wraps(func) @functools.wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
if not user_matches_roles(require_roles, require_all): if g.current_user is None:
if g.current_user is None: # We don't need to log at a higher level, as this is very common.
# We don't need to log at a higher level, as this is very common. # Many browsers first try to see whether authentication is needed
# Many browsers first try to see whether authentication is needed # at all, before sending the password.
# at all, before sending the password. log.debug('Unauthenticated acces to %s attempted.', func)
log.debug('Unauthenticated acces to %s attempted.', func) abort(403)
else:
log.warning('User %s is authenticated, but does not have required roles %s to ' if require_roles and not g.current_user.matches_roles(require_roles, require_all):
'access %s', g.current_user['user_id'], require_roles, func) log.warning('User %s is authenticated, but does not have required roles %s to '
'access %s', g.current_user['user_id'], require_roles, func)
abort(403)
if require_cap and not g.current_user.has_cap(require_cap):
log.warning('User %s is authenticated, but does not have required capability %s to '
'access %s', g.current_user.user_id, require_cap, func)
abort(403) abort(403)
return func(*args, **kwargs) return func(*args, **kwargs)
@ -352,6 +371,23 @@ def user_has_role(role, user: UserClass=None):
return user.has_role(role) return user.has_role(role)
def user_has_cap(capability: str, user: UserClass=None) -> bool:
"""Returns True iff the user is logged in and has the given capability."""
assert capability
if user is None:
user = g.get('current_user')
if user is None:
return False
if not isinstance(user, UserClass):
raise TypeError(f'user should be instance of UserClass, not {type(user)}')
return user.has_cap(capability)
def user_matches_roles(require_roles=set(), def user_matches_roles(require_roles=set(),
require_all=False): require_all=False):
"""Returns True iff the user's roles matches the query. """Returns True iff the user's roles matches the query.

View File

@ -16,6 +16,14 @@ from ..api.utils import authentication
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# Mapping from user role to capabilities obtained by users with that role.
CAPABILITIES = collections.defaultdict(**{
'subscriber': {'subscriber', 'home-project'},
'demo': {'subscriber', 'home-project'},
'admin': {'subscriber', 'home-project', 'video-encoding', 'admin',
'view-pending-nodes', 'edit-project-node-types'},
}, default_factory=frozenset)
class UserClass(flask_login.UserMixin): class UserClass(flask_login.UserMixin):
def __init__(self, token: typing.Optional[str]): def __init__(self, token: typing.Optional[str]):
@ -30,6 +38,7 @@ class UserClass(flask_login.UserMixin):
self.roles: typing.List[str] = [] self.roles: typing.List[str] = []
self.groups: typing.List[str] = [] # NOTE: these are stringified object IDs. self.groups: typing.List[str] = [] # NOTE: these are stringified object IDs.
self.group_ids: typing.List[bson.ObjectId] = [] self.group_ids: typing.List[bson.ObjectId] = []
self.capabilities: typing.Set[str] = set()
@classmethod @classmethod
def construct(cls, token: str, db_user: dict) -> 'UserClass': def construct(cls, token: str, db_user: dict) -> 'UserClass':
@ -48,6 +57,7 @@ class UserClass(flask_login.UserMixin):
user.objectid = str(db_user['_id']) user.objectid = str(db_user['_id'])
user.gravatar = utils.gravatar(user.email) user.gravatar = utils.gravatar(user.email)
user.groups = [str(g) for g in user.group_ids] user.groups = [str(g) for g in user.group_ids]
user.collect_capabilities()
return user return user
@ -71,6 +81,12 @@ class UserClass(flask_login.UserMixin):
except KeyError: except KeyError:
return default return default
def collect_capabilities(self):
"""Constructs the capabilities set given the user's current roles."""
self.capabilities = set().union(*(CAPABILITIES.get(role, frozenset())
for role in self.roles))
def has_role(self, *roles): def has_role(self, *roles):
"""Returns True iff the user has one or more of the given roles.""" """Returns True iff the user has one or more of the given roles."""
@ -79,6 +95,14 @@ class UserClass(flask_login.UserMixin):
return bool(set(self.roles).intersection(set(roles))) return bool(set(self.roles).intersection(set(roles)))
def has_cap(self, *capabilities: typing.Iterable[str]) -> bool:
"""Returns True iff the user has one or more of the given capabilities."""
if not self.capabilities:
return False
return bool(set(self.capabilities).intersection(set(capabilities)))
def matches_roles(self, def matches_roles(self,
require_roles=set(), require_roles=set(),
require_all=False) -> bool: require_all=False) -> bool:
@ -113,6 +137,8 @@ class AnonymousUser(flask_login.AnonymousUserMixin, UserClass):
def has_role(self, *roles): def has_role(self, *roles):
return False return False
def has_cap(self, *capabilities):
return False
def _load_user(token) -> typing.Union[UserClass, AnonymousUser]: def _load_user(token) -> typing.Union[UserClass, AnonymousUser]:

View File

@ -205,6 +205,8 @@ def settings_billing():
@blueprint.route('/u/<user_id>/edit', methods=['GET', 'POST']) @blueprint.route('/u/<user_id>/edit', methods=['GET', 'POST'])
@login_required @login_required
def users_edit(user_id): def users_edit(user_id):
from pillar.auth import UserClass
if not current_user.has_role('admin'): if not current_user.has_role('admin'):
return abort(403) return abort(403)
api = system_util.pillar_api() api = system_util.pillar_api()
@ -221,7 +223,9 @@ def users_edit(user_id):
else: else:
form.roles.data = user.roles form.roles.data = user.roles
form.email.data = user.email form.email.data = user.email
return render_template('users/edit_embed.html', user=user, form=form)
user_ob = UserClass.construct('', db_user=user.to_dict())
return render_template('users/edit_embed.html', user=user_ob, form=form)
def _users_edit(form, user, api): def _users_edit(form, user, api):

View File

@ -96,7 +96,7 @@ $search-hit-width_grid: 100px
#search-container #search-container
display: flex display: flex
border-radius: 0 border-radius: 0
min-height: 500px min-height: 600px
background-color: white background-color: white
+container-behavior +container-behavior
@ -782,4 +782,3 @@ $search-hit-width_grid: 100px
color: white color: white
background-color: $color-primary background-color: $color-primary
border-color: transparent border-color: transparent

View File

@ -4,7 +4,7 @@
| {% if current_user.has_role('demo') %} | {% if current_user.has_role('demo') %}
| {% set subscription = 'demo' %} | {% set subscription = 'demo' %}
| {% elif current_user.has_role('subscriber') %} | {% elif current_user.has_cap('subscriber') %}
| {% set subscription = 'subscriber' %} | {% set subscription = 'subscriber' %}
| {% else %} | {% else %}
| {% set subscription = 'none' %} | {% set subscription = 'none' %}

View File

@ -8,7 +8,7 @@
| {% if current_user.is_authenticated %} | {% if current_user.is_authenticated %}
| {% if current_user.has_role('demo') %} | {% if current_user.has_role('demo') %}
| {% set subscription = 'demo' %} | {% set subscription = 'demo' %}
| {% elif current_user.has_role('subscriber') %} | {% elif current_user.has_cap('subscriber') %}
| {% set subscription = 'subscriber' %} | {% set subscription = 'subscriber' %}
| {% else %} | {% else %}
| {% set subscription = 'none' %} | {% set subscription = 'none' %}

View File

@ -44,7 +44,7 @@
| {# * User is authenticated, but has no subscription or 'POST' permission #} | {# * User is authenticated, but has no subscription or 'POST' permission #}
.comment-reply-form .comment-reply-form
.comment-reply-field.sign-in .comment-reply-field.sign-in
| {% if current_user.has_role('subscriber') or current_user.has_role('demo') %} | {% if current_user.has_cap('subscriber') %}
i.pi-lock i.pi-lock
| Only project members can comment. | Only project members can comment.
| {% else %} | {% else %}

View File

@ -18,7 +18,7 @@ span#project-edit-title
When we add support for new node types in the future, it means we When we add support for new node types in the future, it means we
allow the creation of new items (such as textures). allow the creation of new items (such as textures).
| {% if current_user.has_role('admin') %} | {% if current_user.has_cap('edit-project-node-types') %}
ul.list-generic ul.list-generic
| {% for node_type in project.node_types %} | {% for node_type in project.node_types %}
li li

View File

@ -39,7 +39,7 @@ meta(name="twitter:image", content="{{ url_for('static', filename='assets/img/ba
span ({{ projects_shared|length }}) span ({{ projects_shared|length }})
| {% endif %} | {% endif %}
| {% if (current_user.has_role('subscriber') or current_user.has_role('admin')) %} | {% if current_user.has_cap('subscriber') %}
li.create( li.create(
data-url="{{ url_for('projects.create') }}") data-url="{{ url_for('projects.create') }}")
a#project-create( a#project-create(
@ -73,7 +73,7 @@ meta(name="twitter:image", content="{{ url_for('static', filename='assets/img/ba
li.when(title="{{ project._created }}") {{ project._created | pretty_date }} li.when(title="{{ project._created }}") {{ project._created | pretty_date }}
li.edit li.edit
a(href="{{ url_for('projects.edit', project_url=project.url) }}") Edit a(href="{{ url_for('projects.edit', project_url=project.url) }}") Edit
| {% if project.status == 'pending' and current_user.is_authenticated and current_user.has_role('admin') %} | {% if project.status == 'pending' and current_user.has_cap('view-pending-nodes') %}
li.pending Not Published li.pending Not Published
| {% endif %} | {% endif %}
@ -113,7 +113,7 @@ meta(name="twitter:image", content="{{ url_for('static', filename='assets/img/ba
li.who by {{ project.user.full_name }} li.who by {{ project.user.full_name }}
li.edit li.edit
a(href="{{ url_for('projects.edit', project_url=project.url) }}") Edit a(href="{{ url_for('projects.edit', project_url=project.url) }}") Edit
| {% if project.status == 'pending' and current_user.is_authenticated and current_user.has_role('admin') %} | {% if project.status == 'pending' and current_user.has_cap('view-pending-nodes') %}
li.pending Not Published li.pending Not Published
| {% endif %} | {% endif %}

View File

@ -11,7 +11,7 @@ span#project-edit-title
#node-edit-container #node-edit-container
#node-edit-form #node-edit-form
.col-md-6 .col-md-6
| {% if (project.user == current_user.objectid or current_user.has_role('admin')) %} | {% if (project.user == current_user.objectid or current_user.has_cap('admin')) %}
.sharing-users-search .sharing-users-search
.form-group .form-group
input#user-select.form-control( input#user-select.form-control(
@ -44,7 +44,7 @@ span#project-edit-title
span.sharing-users-extra {{user['username']}} span.sharing-users-extra {{user['username']}}
.sharing-users-action .sharing-users-action
| {# Only allow deletion if we are: admin, project owners, or current_user in the team #} | {# Only allow deletion if we are: admin, project owners, or current_user in the team #}
| {% if current_user.has_role('admin') or (project.user == current_user.objectid) or (current_user.objectid == user['_id']) %} | {% if current_user.has_cap('admin') or (project.user == current_user.objectid) or (current_user.objectid == user['_id']) %}
| {% if project.user == user['_id'] %} | {% if project.user == user['_id'] %}
span span
@ -70,7 +70,7 @@ span#project-edit-title
| {% endblock %} | {% endblock %}
| {% block footer_scripts %} | {% block footer_scripts %}
| {% if (project.user == current_user.objectid or current_user.has_role('admin')) %} | {% if (project.user == current_user.objectid or current_user.has_cap('admin')) %}
script(src="{{ url_for('static_pillar', filename='assets/js/vendor/jquery.autocomplete-0.22.0.min.js') }}", async=true) script(src="{{ url_for('static_pillar', filename='assets/js/vendor/jquery.autocomplete-0.22.0.min.js') }}", async=true)
script. script.
$(document).ready(function() { $(document).ready(function() {

View File

@ -177,7 +177,7 @@ link(href="{{ url_for('static_pillar', filename='assets/css/project-main.css', v
i.pi-more-vertical i.pi-more-vertical
ul.dropdown-menu ul.dropdown-menu
| {% if current_user.has_role('admin') %} | {% if current_user.has_cap('admin') %}
li.button-featured li.button-featured
a#item_featured( a#item_featured(
href="javascript:void(0);", href="javascript:void(0);",

View File

@ -59,6 +59,18 @@
| {% endfor %} | {% endfor %}
.form-group.capabilities
label Capabilities
| {% if user.capabilities %}
ul
| {% for cap in user.capabilities|sort %}
li {{ cap }}
| {% endfor %}
| {% else %}
p
i.pi-cancel
| none
| {% endif %}
a#button-cancel.btn.btn-default(href="#", data-user-id='{{user._id}}') Cancel a#button-cancel.btn.btn-default(href="#", data-user-id='{{user._id}}') Cancel

View File

@ -526,7 +526,6 @@ class PermissionComputationTest(AbstractPillarTest):
class RequireRolesTest(AbstractPillarTest): class RequireRolesTest(AbstractPillarTest):
def test_no_roles_required(self): def test_no_roles_required(self):
from flask import g
from pillar.api.utils.authorization import require_login from pillar.api.utils.authorization import require_login
called = [False] called = [False]
@ -604,6 +603,40 @@ class RequireRolesTest(AbstractPillarTest):
self.assertFalse(user_has_role('admin', make_user(None))) self.assertFalse(user_has_role('admin', make_user(None)))
self.assertFalse(user_has_role('admin', None)) self.assertFalse(user_has_role('admin', None))
def test_cap_required(self):
from pillar.api.utils.authorization import require_login
called = [False]
@require_login(require_cap='subscriber')
def call_me():
called[0] = True
with self.app.test_request_context():
self.login_api_as(ObjectId(24 * 'a'), ['succubus'])
self.assertRaises(Forbidden, call_me)
self.assertFalse(called[0])
with self.app.test_request_context():
self.login_api_as(ObjectId(24 * 'a'), ['admin'])
call_me()
self.assertTrue(called[0])
def test_invalid_combinations(self):
from pillar.api.utils.authorization import require_login
with self.assertRaises(TypeError):
require_login(require_roles=['abc', 'def'])
with self.assertRaises(TypeError):
require_login(require_cap={'multiple', 'caps'})
with self.assertRaises(ValueError):
require_login(require_roles=set(), require_all=True)
with self.assertRaises(ValueError):
require_login(require_roles={'admin'}, require_cap='hey')
class UserCreationTest(AbstractPillarTest): class UserCreationTest(AbstractPillarTest):
@responses.activate @responses.activate