Fix privilege escalation leak
A PUT request on /api/user/{user-id} by the user themselves would allow too much, and would allow self-granting of roles (including admin), group membership (so join any arbitrary project) and pretend to be service accounts.
This commit is contained in:
@@ -721,10 +721,6 @@ users = {
|
|||||||
'item_methods': ['GET', 'PUT', 'PATCH'],
|
'item_methods': ['GET', 'PUT', 'PATCH'],
|
||||||
'public_item_methods': ['GET'],
|
'public_item_methods': ['GET'],
|
||||||
|
|
||||||
# By default don't include the 'auth' field. It can still be obtained
|
|
||||||
# using projections, though, so we block that in hooks.
|
|
||||||
'datasource': {'projection': {'auth': 0}},
|
|
||||||
|
|
||||||
'schema': users_schema
|
'schema': users_schema
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -7,23 +7,50 @@ from pillar.api.users.routes import log
|
|||||||
from pillar.api.utils.authorization import user_has_role
|
from pillar.api.utils.authorization import user_has_role
|
||||||
from werkzeug.exceptions import Forbidden
|
from werkzeug.exceptions import Forbidden
|
||||||
|
|
||||||
|
USER_EDITABLE_FIELDS = {'full_name', 'username', 'email', 'settings'}
|
||||||
|
|
||||||
|
# These fields nobody is allowed to touch directly, not even admins.
|
||||||
|
USER_ALWAYS_RESTORE_FIELDS = {'auth'}
|
||||||
|
|
||||||
def before_replacing_user(request, lookup):
|
def before_replacing_user(request, lookup):
|
||||||
"""Loads the auth field from the database, preventing any changes."""
|
"""Prevents changes to any field of the user doc, except USER_EDITABLE_FIELDS."""
|
||||||
|
|
||||||
# Find the user that is being replaced
|
# Find the user that is being replaced
|
||||||
req = parse_request('users')
|
req = parse_request('users')
|
||||||
req.projection = json.dumps({'auth': 1})
|
req.projection = json.dumps({key: 0 for key in USER_EDITABLE_FIELDS})
|
||||||
original = current_app.data.find_one('users', req, **lookup)
|
original = current_app.data.find_one('users', req, **lookup)
|
||||||
|
|
||||||
# Make sure that the replacement has a valid auth field.
|
# Make sure that the replacement has a valid auth field.
|
||||||
updates = request.get_json()
|
put_data = request.get_json()
|
||||||
assert updates is request.get_json() # We should get a ref to the cached JSON, and not a copy.
|
|
||||||
|
|
||||||
if 'auth' in original:
|
# We should get a ref to the cached JSON, and not a copy. This will allow us to
|
||||||
updates['auth'] = copy.deepcopy(original['auth'])
|
# modify the cached JSON so that Eve sees our modifications.
|
||||||
else:
|
assert put_data is request.get_json()
|
||||||
updates.pop('auth', None)
|
|
||||||
|
# Reset fields that shouldn't be edited to their original values. This is only
|
||||||
|
# needed when users are editing themselves; admins are allowed to edit much more.
|
||||||
|
if not user_has_role('admin'):
|
||||||
|
for db_key, db_value in original.items():
|
||||||
|
if db_key[0] == '_' or db_key in USER_EDITABLE_FIELDS:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if db_key in original:
|
||||||
|
put_data[db_key] = copy.deepcopy(original[db_key])
|
||||||
|
|
||||||
|
# Remove fields added by this PUT request, except when they are user-editable.
|
||||||
|
for put_key in list(put_data.keys()):
|
||||||
|
if put_key[0] == '_' or put_key in USER_EDITABLE_FIELDS:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if put_key not in original:
|
||||||
|
del put_data[put_key]
|
||||||
|
|
||||||
|
# Always restore those fields
|
||||||
|
for db_key in USER_ALWAYS_RESTORE_FIELDS:
|
||||||
|
if db_key in original:
|
||||||
|
put_data[db_key] = copy.deepcopy(original[db_key])
|
||||||
|
else:
|
||||||
|
del put_data[db_key]
|
||||||
|
|
||||||
|
|
||||||
def push_updated_user_to_algolia(user, original):
|
def push_updated_user_to_algolia(user, original):
|
||||||
|
@@ -4,6 +4,7 @@ import base64
|
|||||||
import copy
|
import copy
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import typing
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import os
|
import os
|
||||||
@@ -200,14 +201,16 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
|
|
||||||
found = groups_coll.find_one(group_id)
|
found = groups_coll.find_one(group_id)
|
||||||
if found:
|
if found:
|
||||||
return
|
return group_id
|
||||||
|
|
||||||
result = groups_coll.insert_one({'_id': group_id, 'name': name})
|
result = groups_coll.insert_one({'_id': group_id, 'name': name})
|
||||||
assert result.inserted_id
|
assert result.inserted_id
|
||||||
|
return result.inserted_id
|
||||||
|
|
||||||
def create_user(self, user_id='cafef00dc379cf10c4aaceaf', roles=('subscriber',),
|
def create_user(self, user_id='cafef00dc379cf10c4aaceaf', roles=('subscriber',),
|
||||||
groups=None):
|
groups=None, *, token: str = None):
|
||||||
from pillar.api.utils.authentication import make_unique_username
|
from pillar.api.utils.authentication import make_unique_username
|
||||||
|
import uuid
|
||||||
|
|
||||||
with self.app.test_request_context():
|
with self.app.test_request_context():
|
||||||
users = self.app.data.driver.db['users']
|
users = self.app.data.driver.db['users']
|
||||||
@@ -217,6 +220,7 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
'_id': ObjectId(user_id),
|
'_id': ObjectId(user_id),
|
||||||
'_updated': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc),
|
'_updated': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc),
|
||||||
'_created': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc),
|
'_created': datetime.datetime(2016, 4, 15, 13, 15, 11, tzinfo=tz_util.utc),
|
||||||
|
'_etag': 'unittest-%s' % uuid.uuid4().hex,
|
||||||
'username': make_unique_username('tester'),
|
'username': make_unique_username('tester'),
|
||||||
'groups': groups or [],
|
'groups': groups or [],
|
||||||
'roles': list(roles),
|
'roles': list(roles),
|
||||||
@@ -228,7 +232,12 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
'email': TEST_EMAIL_ADDRESS
|
'email': TEST_EMAIL_ADDRESS
|
||||||
})
|
})
|
||||||
|
|
||||||
return result.inserted_id
|
user_id = result.inserted_id
|
||||||
|
|
||||||
|
if token:
|
||||||
|
self.create_valid_auth_token(user_id, token)
|
||||||
|
|
||||||
|
return user_id
|
||||||
|
|
||||||
def create_valid_auth_token(self, user_id, token='token'):
|
def create_valid_auth_token(self, user_id, token='token'):
|
||||||
now = datetime.datetime.now(tz_util.utc)
|
now = datetime.datetime.now(tz_util.utc)
|
||||||
@@ -381,7 +390,7 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
return urlencode(jsonified_params)
|
return urlencode(jsonified_params)
|
||||||
|
|
||||||
def client_request(self, method, path, qs=None, expected_status=200, auth_token=None, json=None,
|
def client_request(self, method, path, qs=None, expected_status=200, auth_token=None, json=None,
|
||||||
data=None, headers=None, files=None, content_type=None):
|
data=None, headers=None, files=None, content_type=None, etag=None):
|
||||||
"""Performs a HTTP request to the server."""
|
"""Performs a HTTP request to the server."""
|
||||||
|
|
||||||
from pillar.api.utils import dumps
|
from pillar.api.utils import dumps
|
||||||
@@ -395,6 +404,14 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
data = dumps(json)
|
data = dumps(json)
|
||||||
headers['Content-Type'] = 'application/json'
|
headers['Content-Type'] = 'application/json'
|
||||||
|
|
||||||
|
if etag is not None:
|
||||||
|
if method == 'PUT':
|
||||||
|
headers['If-Match'] = etag
|
||||||
|
elif method == 'GET':
|
||||||
|
headers['If-None-Match'] = etag
|
||||||
|
else:
|
||||||
|
raise ValueError('Not sure what to do with etag and method %s' % method)
|
||||||
|
|
||||||
if files:
|
if files:
|
||||||
data = data or {}
|
data = data or {}
|
||||||
content_type = 'multipart/form-data'
|
content_type = 'multipart/form-data'
|
||||||
|
@@ -309,6 +309,8 @@ class UserListTests(AbstractPillarTest):
|
|||||||
resp = self.client.get('/api/users/123456789abc123456789abc',
|
resp = self.client.get('/api/users/123456789abc123456789abc',
|
||||||
headers={'Authorization': self.make_header('token')})
|
headers={'Authorization': self.make_header('token')})
|
||||||
user_info = json.loads(resp.data)
|
user_info = json.loads(resp.data)
|
||||||
|
self.assertNotIn('auth', user_info)
|
||||||
|
|
||||||
put_user = remove_private_keys(user_info)
|
put_user = remove_private_keys(user_info)
|
||||||
|
|
||||||
resp = self.client.put('/api/users/123456789abc123456789abc',
|
resp = self.client.put('/api/users/123456789abc123456789abc',
|
||||||
@@ -324,6 +326,41 @@ class UserListTests(AbstractPillarTest):
|
|||||||
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
||||||
self.assertIn('auth', db_user)
|
self.assertIn('auth', db_user)
|
||||||
|
|
||||||
|
def test_put_user_restricted_fields(self):
|
||||||
|
from pillar.api.utils import remove_private_keys
|
||||||
|
|
||||||
|
gid_admin = self.ensure_group_exists(24 * '1', 'admin')
|
||||||
|
gid_subscriber = self.ensure_group_exists(24 * '2', 'subscriber')
|
||||||
|
gid_demo = self.ensure_group_exists(24 * '3', 'demo')
|
||||||
|
|
||||||
|
# A user should be able to change only some fields, but not all.
|
||||||
|
user_info = self.get('/api/users/me', auth_token='token').json()
|
||||||
|
|
||||||
|
# Alter all fields (except auth, another test already checks that that's uneditable).
|
||||||
|
put_user = remove_private_keys(user_info)
|
||||||
|
put_user['full_name'] = '¿new name?'
|
||||||
|
put_user['username'] = 'üniék'
|
||||||
|
put_user['email'] = 'new+email@example.com'
|
||||||
|
put_user['roles'] = ['subscriber', 'demo', 'admin', 'service', 'flamenco_manager']
|
||||||
|
put_user['groups'] = [gid_admin, gid_subscriber, gid_demo]
|
||||||
|
put_user['settings']['email_communications'] = 0
|
||||||
|
put_user['service'] = {'flamenco_manager': {}}
|
||||||
|
|
||||||
|
self.put('/api/users/%(_id)s' % user_info,
|
||||||
|
json=put_user,
|
||||||
|
auth_token='token',
|
||||||
|
etag=user_info['_etag'])
|
||||||
|
|
||||||
|
new_user_info = self.get('/api/users/me', auth_token='token').json()
|
||||||
|
self.assertEqual(new_user_info['full_name'], put_user['full_name'])
|
||||||
|
self.assertEqual(new_user_info['username'], put_user['username'])
|
||||||
|
self.assertEqual(new_user_info['email'], put_user['email'])
|
||||||
|
self.assertEqual(new_user_info['roles'], user_info['roles'])
|
||||||
|
self.assertEqual(new_user_info['groups'], user_info['groups'])
|
||||||
|
self.assertEqual(new_user_info['settings']['email_communications'],
|
||||||
|
put_user['settings']['email_communications'])
|
||||||
|
self.assertNotIn('service', new_user_info)
|
||||||
|
|
||||||
def test_put_other_user(self):
|
def test_put_other_user(self):
|
||||||
from pillar.api.utils import remove_private_keys
|
from pillar.api.utils import remove_private_keys
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user