From 5c04cdbd6ee875c5d9eb7ef1f3e9f7a730c961d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 26 Apr 2016 10:45:54 +0200 Subject: [PATCH] Secure read access to /users endpoint. - auth field is never returned - unauthenticated access is rejected - non-admin users can only access themselves --- pillar/application/modules/users.py | 42 ++++++++++++++ pillar/settings.py | 7 ++- tests/common_test_class.py | 4 +- tests/test_auth.py | 85 +++++++++++++++++++++++++++++ 4 files changed, 135 insertions(+), 3 deletions(-) diff --git a/pillar/application/modules/users.py b/pillar/application/modules/users.py index 3b8618b4..585a9436 100644 --- a/pillar/application/modules/users.py +++ b/pillar/application/modules/users.py @@ -3,6 +3,12 @@ import json import logging import urllib +from flask import g +from eve.auth import TokenAuth +from werkzeug.exceptions import Forbidden + +from application.utils.authorization import user_has_role + log = logging.getLogger(__name__) @@ -36,6 +42,42 @@ def after_replacing_user(item, original): item.get('username'), item.get('_id'), ex) +def before_getting_users(request, lookup): + """Modifies the lookup dict to limit returned user info.""" + + # No access when not logged in. + current_user = g.get('current_user') + if current_user is None: + raise Forbidden() + + # Admins can do anything and get everything, except the 'auth' block. + if user_has_role(u'admin'): + return + + # Only allow access to the current user. + if '_id' in lookup: + if str(lookup['_id']) != str(current_user['user_id']): + raise Forbidden() + return + + # Add a filter to only return the current user. + lookup['_id'] = current_user['user_id'] + + +def after_fetching_user(user): + # Deny access to auth block; authentication stuff is managed by + # custom end-points. + user.pop('auth', None) + + +def after_fetching_user_resource(response): + for user in response['_items']: + after_fetching_user(user) + + def setup_app(app): + app.on_pre_GET_users += before_getting_users app.on_post_GET_users += post_GET_user app.on_replace_users += after_replacing_user + app.on_fetched_item_users += after_fetching_user + app.on_fetched_resource_users += after_fetching_user_resource diff --git a/pillar/settings.py b/pillar/settings.py index 44581452..d343ae22 100644 --- a/pillar/settings.py +++ b/pillar/settings.py @@ -707,8 +707,11 @@ users = { 'resource_methods': ['GET', 'POST'], - 'public_methods': ['GET', 'POST'], - # 'public_item_methods': ['GET'], + 'public_methods': [], + + # By default don't include the 'auth' field. It can still be obtained + # using projections, though, so we block that in hooks. + 'datasource': {'projection': {u'auth': 0}}, 'schema': users_schema } diff --git a/tests/common_test_class.py b/tests/common_test_class.py index 07274356..6d87542c 100644 --- a/tests/common_test_class.py +++ b/tests/common_test_class.py @@ -105,6 +105,8 @@ class AbstractPillarTest(TestMinimal): return found['_id'], found def create_user(self, user_id='cafef00dc379cf10c4aaceaf', roles=('subscriber', )): + from application.utils.authentication import make_unique_username + with self.app.test_request_context(): users = self.app.data.driver.db['users'] assert isinstance(users, pymongo.collection.Collection) @@ -113,7 +115,7 @@ class AbstractPillarTest(TestMinimal): '_id': ObjectId(user_id), '_updated': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc), '_created': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc), - 'username': 'tester', + 'username': make_unique_username('tester'), 'groups': [], 'roles': list(roles), 'settings': {'email_communications': 1}, diff --git a/tests/test_auth.py b/tests/test_auth.py index eef250a8..9c0aa102 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -158,3 +158,88 @@ class AuthenticationTests(AbstractPillarTest): db_user = users.find_one(user_id) self.assertEqual([u'subscriber'], db_user['roles']) + + +class UserListTests(AbstractPillarTest): + """Security-related tests.""" + + def setUp(self, **kwargs): + super(UserListTests, self).setUp() + + self.create_user(roles=[u'subscriber'], user_id='123456789abc123456789abc') + self.create_user(roles=[u'admin'], user_id='223456789abc123456789abc') + self.create_user(roles=[u'subscriber'], user_id='323456789abc123456789abc') + + self.create_valid_auth_token('123456789abc123456789abc', 'token') + self.create_valid_auth_token('223456789abc123456789abc', 'admin-token') + + def test_list_all_users_anonymous(self): + # Anonymous access should be denied. + resp = self.client.get('/users') + self.assertEqual(403, resp.status_code) + + def test_list_all_users_subscriber(self): + # Regular access should result in only your own info. + resp = self.client.get('/users', headers={'Authorization': self.make_header('token')}) + users = json.loads(resp.data) + + self.assertEqual(200, resp.status_code) + self.assertEqual(1, users['_meta']['total']) + + # The 'auth' section should be removed. + user_info = users['_items'][0] + self.assertNotIn('auth', user_info) + + def test_list_all_users_admin(self): + # Admin access should result in all users + resp = self.client.get('/users', headers={'Authorization': self.make_header('admin-token')}) + users = json.loads(resp.data) + + self.assertEqual(200, resp.status_code) + self.assertEqual(3, users['_meta']['total']) + + # The 'auth' section should be removed. + for user_info in users['_items']: + self.assertNotIn('auth', user_info) + + def test_list_all_users_admin_explicit_projection(self): + # Admin access should result in all users + projection = json.dumps({'auth': 1}) + resp = self.client.get('/users?projection=%s' % projection, + headers={'Authorization': self.make_header('admin-token')}) + users = json.loads(resp.data) + + self.assertEqual(200, resp.status_code) + self.assertEqual(3, users['_meta']['total']) + + # The 'auth' section should be removed. + for user_info in users['_items']: + self.assertNotIn('auth', user_info) + + def test_own_user_subscriber(self): + # Regular access should result in only your own info. + resp = self.client.get('/users/%s' % '123456789abc123456789abc', + headers={'Authorization': self.make_header('token')}) + user_info = json.loads(resp.data) + + self.assertEqual(200, resp.status_code) + self.assertNotIn('auth', user_info) + + def test_own_user_subscriber_explicit_projection(self): + # With a custom projection requesting the auth list + projection = json.dumps({'auth': 1}) + resp = self.client.get('/users/%s?projection=%s' % ('123456789abc123456789abc', projection), + headers={'Authorization': self.make_header('token')}) + user_info = json.loads(resp.data) + + self.assertEqual(200, resp.status_code) + self.assertNotIn('auth', user_info) + + def test_other_user_subscriber(self): + # Requesting another user should be denied. + resp = self.client.get('/users/%s' % '223456789abc123456789abc', + headers={'Authorization': self.make_header('token')}) + user_info = json.loads(resp.data) + + self.assertEqual(403, resp.status_code) + self.assertNotIn('auth', user_info)