Unified user representation for web and API calls
Both approaches now use a pillar.auth.UserClass instance. g.current_user is now always set to that instance, even for web entry points. This UserClass instance can still be keyed like the old dict, but this is for temporary compatibility and shouldn't be relied on in new or touched code.
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
"""Authentication code common to the web and api modules."""
|
||||
|
||||
import collections
|
||||
import logging
|
||||
import typing
|
||||
|
||||
@@ -8,6 +9,8 @@ import flask_login
|
||||
import flask_oauthlib.client
|
||||
from werkzeug.local import LocalProxy
|
||||
|
||||
import bson
|
||||
|
||||
from ..api import utils
|
||||
from ..api.utils import authentication
|
||||
|
||||
@@ -20,11 +23,53 @@ class UserClass(flask_login.UserMixin):
|
||||
self.id = token
|
||||
self.username: str = None
|
||||
self.full_name: str = None
|
||||
self.user_id: bson.ObjectId = None
|
||||
self.objectid: str = None
|
||||
self.gravatar: str = None
|
||||
self.email: str = None
|
||||
self.roles: typing.List[str] = []
|
||||
self.groups: typing.List[str] = []
|
||||
self.groups: typing.List[str] = [] # NOTE: these are stringified object IDs.
|
||||
self.group_ids: typing.List[bson.ObjectId] = []
|
||||
|
||||
@classmethod
|
||||
def construct(cls, token: str, db_user: dict) -> 'UserClass':
|
||||
"""Constructs a new UserClass instance from a Mongo user document."""
|
||||
|
||||
user = UserClass(token)
|
||||
|
||||
user.user_id = db_user['_id']
|
||||
user.roles = db_user.get('roles') or []
|
||||
user.group_ids = db_user.get('groups') or []
|
||||
user.email = db_user.get('email') or ''
|
||||
user.username = db_user['username']
|
||||
user.full_name = db_user.get('full_name') or ''
|
||||
|
||||
# Derived properties
|
||||
user.objectid = str(db_user['_id'])
|
||||
user.gravatar = utils.gravatar(user.email)
|
||||
user.groups = [str(g) for g in user.group_ids]
|
||||
|
||||
return user
|
||||
|
||||
def __getitem__(self, item):
|
||||
"""Compatibility layer with old dict-based g.current_user object."""
|
||||
|
||||
if item == 'user_id':
|
||||
return self.user_id
|
||||
if item == 'groups':
|
||||
return self.group_ids
|
||||
if item == 'roles':
|
||||
return set(self.roles)
|
||||
|
||||
raise KeyError(f'No such key {item!r}')
|
||||
|
||||
def get(self, key, default=None):
|
||||
"""Compatibility layer with old dict-based g.current_user object."""
|
||||
|
||||
try:
|
||||
return self[key]
|
||||
except KeyError:
|
||||
return default
|
||||
|
||||
def has_role(self, *roles):
|
||||
"""Returns True iff the user has one or more of the given roles."""
|
||||
@@ -34,6 +79,32 @@ class UserClass(flask_login.UserMixin):
|
||||
|
||||
return bool(set(self.roles).intersection(set(roles)))
|
||||
|
||||
def matches_roles(self,
|
||||
require_roles=set(),
|
||||
require_all=False) -> bool:
|
||||
"""Returns True iff the user's roles matches the query.
|
||||
|
||||
:param require_roles: set of roles.
|
||||
:param require_all:
|
||||
When False (the default): if the user's roles have a
|
||||
non-empty intersection with the given roles, returns True.
|
||||
When True: require the user to have all given roles before
|
||||
returning True.
|
||||
"""
|
||||
|
||||
if not isinstance(require_roles, set):
|
||||
raise TypeError(f'require_roles param should be a set, but is {type(require_roles)!r}')
|
||||
|
||||
if require_all and not require_roles:
|
||||
raise ValueError('require_login(require_all=True) cannot be used with '
|
||||
'empty require_roles.')
|
||||
|
||||
intersection = require_roles.intersection(self.roles)
|
||||
if require_all:
|
||||
return len(intersection) == len(require_roles)
|
||||
|
||||
return not bool(require_roles) or bool(intersection)
|
||||
|
||||
|
||||
class AnonymousUser(flask_login.AnonymousUserMixin, UserClass):
|
||||
def __init__(self):
|
||||
@@ -43,27 +114,20 @@ class AnonymousUser(flask_login.AnonymousUserMixin, UserClass):
|
||||
return False
|
||||
|
||||
|
||||
def _load_user(token):
|
||||
|
||||
def _load_user(token) -> typing.Union[UserClass, AnonymousUser]:
|
||||
"""Loads a user by their token.
|
||||
|
||||
:returns: returns a UserClass instance if logged in, or an AnonymousUser() if not.
|
||||
:rtype: UserClass
|
||||
"""
|
||||
|
||||
db_user = authentication.validate_this_token(token)
|
||||
if not db_user:
|
||||
return AnonymousUser()
|
||||
|
||||
login_user = UserClass(token)
|
||||
login_user.email = db_user['email']
|
||||
login_user.objectid = str(db_user['_id'])
|
||||
login_user.username = db_user['username']
|
||||
login_user.gravatar = utils.gravatar(db_user['email'])
|
||||
login_user.roles = db_user.get('roles', [])
|
||||
login_user.groups = [str(g) for g in db_user['groups'] or ()]
|
||||
login_user.full_name = db_user.get('full_name', '')
|
||||
user = UserClass.construct(token, db_user)
|
||||
|
||||
return login_user
|
||||
return user
|
||||
|
||||
|
||||
def config_login_manager(app):
|
||||
@@ -83,11 +147,14 @@ def config_login_manager(app):
|
||||
def login_user(oauth_token: str, *, load_from_db=False):
|
||||
"""Log in the user identified by the given token."""
|
||||
|
||||
from flask import g
|
||||
|
||||
if load_from_db:
|
||||
user = _load_user(oauth_token)
|
||||
else:
|
||||
user = UserClass(oauth_token)
|
||||
flask_login.login_user(user)
|
||||
g.current_user = user
|
||||
|
||||
|
||||
def get_blender_id_oauth_token():
|
||||
|
Reference in New Issue
Block a user