Secure read access to /users endpoint.
- auth field is never returned - unauthenticated access is rejected - non-admin users can only access themselves
This commit is contained in:
@@ -3,6 +3,12 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import urllib
|
import urllib
|
||||||
|
|
||||||
|
from flask import g
|
||||||
|
from eve.auth import TokenAuth
|
||||||
|
from werkzeug.exceptions import Forbidden
|
||||||
|
|
||||||
|
from application.utils.authorization import user_has_role
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -36,6 +42,42 @@ def after_replacing_user(item, original):
|
|||||||
item.get('username'), item.get('_id'), ex)
|
item.get('username'), item.get('_id'), ex)
|
||||||
|
|
||||||
|
|
||||||
|
def before_getting_users(request, lookup):
|
||||||
|
"""Modifies the lookup dict to limit returned user info."""
|
||||||
|
|
||||||
|
# No access when not logged in.
|
||||||
|
current_user = g.get('current_user')
|
||||||
|
if current_user is None:
|
||||||
|
raise Forbidden()
|
||||||
|
|
||||||
|
# Admins can do anything and get everything, except the 'auth' block.
|
||||||
|
if user_has_role(u'admin'):
|
||||||
|
return
|
||||||
|
|
||||||
|
# Only allow access to the current user.
|
||||||
|
if '_id' in lookup:
|
||||||
|
if str(lookup['_id']) != str(current_user['user_id']):
|
||||||
|
raise Forbidden()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Add a filter to only return the current user.
|
||||||
|
lookup['_id'] = current_user['user_id']
|
||||||
|
|
||||||
|
|
||||||
|
def after_fetching_user(user):
|
||||||
|
# Deny access to auth block; authentication stuff is managed by
|
||||||
|
# custom end-points.
|
||||||
|
user.pop('auth', None)
|
||||||
|
|
||||||
|
|
||||||
|
def after_fetching_user_resource(response):
|
||||||
|
for user in response['_items']:
|
||||||
|
after_fetching_user(user)
|
||||||
|
|
||||||
|
|
||||||
def setup_app(app):
|
def setup_app(app):
|
||||||
|
app.on_pre_GET_users += before_getting_users
|
||||||
app.on_post_GET_users += post_GET_user
|
app.on_post_GET_users += post_GET_user
|
||||||
app.on_replace_users += after_replacing_user
|
app.on_replace_users += after_replacing_user
|
||||||
|
app.on_fetched_item_users += after_fetching_user
|
||||||
|
app.on_fetched_resource_users += after_fetching_user_resource
|
||||||
|
@@ -707,8 +707,11 @@ users = {
|
|||||||
|
|
||||||
'resource_methods': ['GET', 'POST'],
|
'resource_methods': ['GET', 'POST'],
|
||||||
|
|
||||||
'public_methods': ['GET', 'POST'],
|
'public_methods': [],
|
||||||
# 'public_item_methods': ['GET'],
|
|
||||||
|
# By default don't include the 'auth' field. It can still be obtained
|
||||||
|
# using projections, though, so we block that in hooks.
|
||||||
|
'datasource': {'projection': {u'auth': 0}},
|
||||||
|
|
||||||
'schema': users_schema
|
'schema': users_schema
|
||||||
}
|
}
|
||||||
|
@@ -105,6 +105,8 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
return found['_id'], found
|
return found['_id'], found
|
||||||
|
|
||||||
def create_user(self, user_id='cafef00dc379cf10c4aaceaf', roles=('subscriber', )):
|
def create_user(self, user_id='cafef00dc379cf10c4aaceaf', roles=('subscriber', )):
|
||||||
|
from application.utils.authentication import make_unique_username
|
||||||
|
|
||||||
with self.app.test_request_context():
|
with self.app.test_request_context():
|
||||||
users = self.app.data.driver.db['users']
|
users = self.app.data.driver.db['users']
|
||||||
assert isinstance(users, pymongo.collection.Collection)
|
assert isinstance(users, pymongo.collection.Collection)
|
||||||
@@ -113,7 +115,7 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
'_id': ObjectId(user_id),
|
'_id': ObjectId(user_id),
|
||||||
'_updated': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc),
|
'_updated': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc),
|
||||||
'_created': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc),
|
'_created': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc),
|
||||||
'username': 'tester',
|
'username': make_unique_username('tester'),
|
||||||
'groups': [],
|
'groups': [],
|
||||||
'roles': list(roles),
|
'roles': list(roles),
|
||||||
'settings': {'email_communications': 1},
|
'settings': {'email_communications': 1},
|
||||||
|
@@ -158,3 +158,88 @@ class AuthenticationTests(AbstractPillarTest):
|
|||||||
db_user = users.find_one(user_id)
|
db_user = users.find_one(user_id)
|
||||||
|
|
||||||
self.assertEqual([u'subscriber'], db_user['roles'])
|
self.assertEqual([u'subscriber'], db_user['roles'])
|
||||||
|
|
||||||
|
|
||||||
|
class UserListTests(AbstractPillarTest):
|
||||||
|
"""Security-related tests."""
|
||||||
|
|
||||||
|
def setUp(self, **kwargs):
|
||||||
|
super(UserListTests, self).setUp()
|
||||||
|
|
||||||
|
self.create_user(roles=[u'subscriber'], user_id='123456789abc123456789abc')
|
||||||
|
self.create_user(roles=[u'admin'], user_id='223456789abc123456789abc')
|
||||||
|
self.create_user(roles=[u'subscriber'], user_id='323456789abc123456789abc')
|
||||||
|
|
||||||
|
self.create_valid_auth_token('123456789abc123456789abc', 'token')
|
||||||
|
self.create_valid_auth_token('223456789abc123456789abc', 'admin-token')
|
||||||
|
|
||||||
|
def test_list_all_users_anonymous(self):
|
||||||
|
# Anonymous access should be denied.
|
||||||
|
resp = self.client.get('/users')
|
||||||
|
self.assertEqual(403, resp.status_code)
|
||||||
|
|
||||||
|
def test_list_all_users_subscriber(self):
|
||||||
|
# Regular access should result in only your own info.
|
||||||
|
resp = self.client.get('/users', headers={'Authorization': self.make_header('token')})
|
||||||
|
users = json.loads(resp.data)
|
||||||
|
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
self.assertEqual(1, users['_meta']['total'])
|
||||||
|
|
||||||
|
# The 'auth' section should be removed.
|
||||||
|
user_info = users['_items'][0]
|
||||||
|
self.assertNotIn('auth', user_info)
|
||||||
|
|
||||||
|
def test_list_all_users_admin(self):
|
||||||
|
# Admin access should result in all users
|
||||||
|
resp = self.client.get('/users', headers={'Authorization': self.make_header('admin-token')})
|
||||||
|
users = json.loads(resp.data)
|
||||||
|
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
self.assertEqual(3, users['_meta']['total'])
|
||||||
|
|
||||||
|
# The 'auth' section should be removed.
|
||||||
|
for user_info in users['_items']:
|
||||||
|
self.assertNotIn('auth', user_info)
|
||||||
|
|
||||||
|
def test_list_all_users_admin_explicit_projection(self):
|
||||||
|
# Admin access should result in all users
|
||||||
|
projection = json.dumps({'auth': 1})
|
||||||
|
resp = self.client.get('/users?projection=%s' % projection,
|
||||||
|
headers={'Authorization': self.make_header('admin-token')})
|
||||||
|
users = json.loads(resp.data)
|
||||||
|
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
self.assertEqual(3, users['_meta']['total'])
|
||||||
|
|
||||||
|
# The 'auth' section should be removed.
|
||||||
|
for user_info in users['_items']:
|
||||||
|
self.assertNotIn('auth', user_info)
|
||||||
|
|
||||||
|
def test_own_user_subscriber(self):
|
||||||
|
# Regular access should result in only your own info.
|
||||||
|
resp = self.client.get('/users/%s' % '123456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('token')})
|
||||||
|
user_info = json.loads(resp.data)
|
||||||
|
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
self.assertNotIn('auth', user_info)
|
||||||
|
|
||||||
|
def test_own_user_subscriber_explicit_projection(self):
|
||||||
|
# With a custom projection requesting the auth list
|
||||||
|
projection = json.dumps({'auth': 1})
|
||||||
|
resp = self.client.get('/users/%s?projection=%s' % ('123456789abc123456789abc', projection),
|
||||||
|
headers={'Authorization': self.make_header('token')})
|
||||||
|
user_info = json.loads(resp.data)
|
||||||
|
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
self.assertNotIn('auth', user_info)
|
||||||
|
|
||||||
|
def test_other_user_subscriber(self):
|
||||||
|
# Requesting another user should be denied.
|
||||||
|
resp = self.client.get('/users/%s' % '223456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('token')})
|
||||||
|
user_info = json.loads(resp.data)
|
||||||
|
|
||||||
|
self.assertEqual(403, resp.status_code)
|
||||||
|
self.assertNotIn('auth', user_info)
|
||||||
|
Reference in New Issue
Block a user