diff --git a/pillar/application/modules/users.py b/pillar/application/modules/users.py index 636d18ea..6c0ca4e7 100644 --- a/pillar/application/modules/users.py +++ b/pillar/application/modules/users.py @@ -66,21 +66,32 @@ def check_user_access(request, lookup): # No access when not logged in. current_user = g.get('current_user') - if current_user is None: - raise Forbidden() + current_user_id = current_user['user_id'] if current_user else None # Admins can do anything and get everything, except the 'auth' block. if user_has_role(u'admin'): return - # Only allow access to the current user. - if '_id' in lookup: - if str(lookup['_id']) != str(current_user['user_id']): - raise Forbidden() - return + if not lookup and not current_user: + raise Forbidden() # Add a filter to only return the current user. - lookup['_id'] = current_user['user_id'] + if '_id' not in lookup: + lookup['_id'] = current_user['user_id'] + + +def check_put_access(request, lookup): + """Only allow PUT to the current user, or all users if admin.""" + + if user_has_role(u'admin'): + return + + current_user = g.get('current_user') + if not current_user: + raise Forbidden() + + if str(lookup['_id']) != str(current_user['user_id']): + raise Forbidden() def after_fetching_user(user): @@ -88,6 +99,23 @@ def after_fetching_user(user): # custom end-points. user.pop('auth', None) + current_user = g.get('current_user') + current_user_id = current_user['user_id'] if current_user else None + + # Admins can do anything and get everything, except the 'auth' block. + if user_has_role(u'admin'): + return + + # Only allow full access to the current user. + if str(user['_id']) == str(current_user_id): + return + + # Remove all fields except public ones. + public_fields = {'full_name', 'email'} + for field in list(user.keys()): + if field not in public_fields: + del user[field] + def after_fetching_user_resource(response): for user in response['_items']: @@ -97,7 +125,7 @@ def after_fetching_user_resource(response): def setup_app(app): app.on_pre_GET_users += check_user_access app.on_post_GET_users += post_GET_user - app.on_pre_PUT_users += check_user_access + app.on_pre_PUT_users += check_put_access app.on_pre_PUT_users += before_replacing_user app.on_replaced_users += after_replacing_user app.on_fetched_item_users += after_fetching_user diff --git a/pillar/settings.py b/pillar/settings.py index a450445c..571a9f62 100644 --- a/pillar/settings.py +++ b/pillar/settings.py @@ -707,7 +707,7 @@ users = { 'resource_methods': ['GET'], 'item_methods': ['GET', 'PUT'], - 'public_methods': [], + 'public_item_methods': ['GET'], # By default don't include the 'auth' field. It can still be obtained # using projections, though, so we block that in hooks. diff --git a/tests/test_auth.py b/tests/test_auth.py index c754a25a..623735ec 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -7,6 +7,8 @@ from bson import tz_util, ObjectId from common_test_class import AbstractPillarTest, TEST_EMAIL_USER, TEST_EMAIL_ADDRESS +PUBLIC_USER_FIELDS = {'full_name', 'email'} + class AuthenticationTests(AbstractPillarTest): def test_make_unique_username(self): @@ -169,7 +171,7 @@ class UserListTests(AbstractPillarTest): self.create_valid_auth_token('323456789abc123456789abc', 'other-token') def test_list_all_users_anonymous(self): - # Anonymous access should be denied. + # Listing all users should be forbidden resp = self.client.get('/users') self.assertEqual(403, resp.status_code) @@ -211,9 +213,20 @@ class UserListTests(AbstractPillarTest): for user_info in users['_items']: self.assertNotIn('auth', user_info) + def test_user_anonymous(self): + from application.utils import remove_private_keys + + # Getting a user should be limited to certain fields + resp = self.client.get('/users/123456789abc123456789abc') + self.assertEqual(200, resp.status_code) + + user_info = json.loads(resp.data) + regular_info = remove_private_keys(user_info) + self.assertEqual(PUBLIC_USER_FIELDS, set(regular_info.keys())) + def test_own_user_subscriber(self): # Regular access should result in only your own info. - resp = self.client.get('/users/%s' % '123456789abc123456789abc', + resp = self.client.get('/users/123456789abc123456789abc', headers={'Authorization': self.make_header('token')}) user_info = json.loads(resp.data) @@ -231,14 +244,19 @@ class UserListTests(AbstractPillarTest): self.assertNotIn('auth', user_info) def test_other_user_subscriber(self): - # Requesting another user should be denied. + from application.utils import remove_private_keys + + # Requesting another user should be limited to full name and email. resp = self.client.get('/users/%s' % '223456789abc123456789abc', headers={'Authorization': self.make_header('token')}) user_info = json.loads(resp.data) - self.assertEqual(403, resp.status_code) + self.assertEqual(200, resp.status_code) self.assertNotIn('auth', user_info) + regular_info = remove_private_keys(user_info) + self.assertEqual(PUBLIC_USER_FIELDS, set(regular_info.keys())) + def test_put_user(self): from application.utils import remove_private_keys diff --git a/tests/test_local_auth.py b/tests/test_local_auth.py index 62301cf4..9fa3e9a6 100644 --- a/tests/test_local_auth.py +++ b/tests/test_local_auth.py @@ -53,11 +53,12 @@ class LocalAuthTest(AbstractPillarTest): exp = datetime.datetime.now(tz=tz_util.utc) - datetime.timedelta(1) result = tokens.update_one({'token': token}, - {'$set': {'expire_time': exp}}) + {'$set': {'expire_time': exp}}) self.assertEqual(1, result.modified_count) + # Do something restricted. headers = {'Authorization': self.make_header(token)} - resp = self.client.get('/users/%s' % user_id, + resp = self.client.put('/users/%s' % user_id, headers=headers) self.assertEqual(403, resp.status_code, resp.data)