Secure write access to /users endpoint
- Admins can PUT everything - Users can only PUT themselves - The 'auth' field is always taken from the original, and never overwritten by the PUT. It can be missing from the request, so you can GET and then PUT the same data. - Nobody can POST or DELETE users
This commit is contained in:
@@ -1,11 +1,12 @@
|
|||||||
|
import copy
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import urllib
|
import urllib
|
||||||
|
|
||||||
from flask import g
|
from flask import g, current_app
|
||||||
from eve.auth import TokenAuth
|
|
||||||
from werkzeug.exceptions import Forbidden
|
from werkzeug.exceptions import Forbidden
|
||||||
|
from eve.utils import parse_request
|
||||||
|
|
||||||
from application.utils.authorization import user_has_role
|
from application.utils.authorization import user_has_role
|
||||||
|
|
||||||
@@ -29,6 +30,24 @@ def post_GET_user(request, payload):
|
|||||||
payload.data = json.dumps(json_data)
|
payload.data = json.dumps(json_data)
|
||||||
|
|
||||||
|
|
||||||
|
def before_replacing_user(request, lookup):
|
||||||
|
"""Loads the auth field from the database, preventing any changes."""
|
||||||
|
|
||||||
|
# Find the user that is being replaced
|
||||||
|
req = parse_request('users')
|
||||||
|
req.projection = json.dumps({'auth': 1})
|
||||||
|
original = current_app.data.find_one('users', req, **lookup)
|
||||||
|
|
||||||
|
# Make sure that the replacement has a valid auth field.
|
||||||
|
updates = request.get_json()
|
||||||
|
assert updates is request.get_json() # We should get a ref to the cached JSON, and not a copy.
|
||||||
|
|
||||||
|
if 'auth' in original:
|
||||||
|
updates['auth'] = copy.deepcopy(original['auth'])
|
||||||
|
else:
|
||||||
|
updates.pop('auth', None)
|
||||||
|
|
||||||
|
|
||||||
def after_replacing_user(item, original):
|
def after_replacing_user(item, original):
|
||||||
"""Push an update to the Algolia index when a user item is updated"""
|
"""Push an update to the Algolia index when a user item is updated"""
|
||||||
|
|
||||||
@@ -42,7 +61,7 @@ def after_replacing_user(item, original):
|
|||||||
item.get('username'), item.get('_id'), ex)
|
item.get('username'), item.get('_id'), ex)
|
||||||
|
|
||||||
|
|
||||||
def before_getting_users(request, lookup):
|
def check_user_access(request, lookup):
|
||||||
"""Modifies the lookup dict to limit returned user info."""
|
"""Modifies the lookup dict to limit returned user info."""
|
||||||
|
|
||||||
# No access when not logged in.
|
# No access when not logged in.
|
||||||
@@ -76,8 +95,10 @@ def after_fetching_user_resource(response):
|
|||||||
|
|
||||||
|
|
||||||
def setup_app(app):
|
def setup_app(app):
|
||||||
app.on_pre_GET_users += before_getting_users
|
app.on_pre_GET_users += check_user_access
|
||||||
app.on_post_GET_users += post_GET_user
|
app.on_post_GET_users += post_GET_user
|
||||||
app.on_replace_users += after_replacing_user
|
app.on_pre_PUT_users += check_user_access
|
||||||
|
app.on_pre_PUT_users += before_replacing_user
|
||||||
|
app.on_replaced_users += after_replacing_user
|
||||||
app.on_fetched_item_users += after_fetching_user
|
app.on_fetched_item_users += after_fetching_user
|
||||||
app.on_fetched_resource_users += after_fetching_user_resource
|
app.on_fetched_resource_users += after_fetching_user_resource
|
||||||
|
@@ -705,8 +705,8 @@ users = {
|
|||||||
'cache_control': 'max-age=10,must-revalidate',
|
'cache_control': 'max-age=10,must-revalidate',
|
||||||
'cache_expires': 10,
|
'cache_expires': 10,
|
||||||
|
|
||||||
'resource_methods': ['GET', 'POST'],
|
'resource_methods': ['GET'],
|
||||||
|
'item_methods': ['GET', 'PUT'],
|
||||||
'public_methods': [],
|
'public_methods': [],
|
||||||
|
|
||||||
# By default don't include the 'auth' field. It can still be obtained
|
# By default don't include the 'auth' field. It can still be obtained
|
||||||
|
@@ -1,7 +1,9 @@
|
|||||||
|
# -*- encoding: utf-8 -*-
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import responses
|
import responses
|
||||||
import json
|
import json
|
||||||
from bson import tz_util
|
from bson import tz_util, ObjectId
|
||||||
|
|
||||||
from common_test_class import AbstractPillarTest, TEST_EMAIL_USER, TEST_EMAIL_ADDRESS
|
from common_test_class import AbstractPillarTest, TEST_EMAIL_USER, TEST_EMAIL_ADDRESS
|
||||||
|
|
||||||
@@ -127,14 +129,6 @@ class AuthenticationTests(AbstractPillarTest):
|
|||||||
'Content-Type': 'application/json'})
|
'Content-Type': 'application/json'})
|
||||||
self.assertEqual(405, resp.status_code)
|
self.assertEqual(405, resp.status_code)
|
||||||
|
|
||||||
# POSTing with our _id to update shouldn't work either, as POST always creates new users.
|
|
||||||
updated_fields_with_id = dict(_id=user_id, **updated_fields)
|
|
||||||
resp = self.client.post('/users',
|
|
||||||
data=json.dumps(updated_fields_with_id, cls=PillarJSONEncoder),
|
|
||||||
headers={'Authorization': self.make_header('nonexpired-main'),
|
|
||||||
'Content-Type': 'application/json'})
|
|
||||||
self.assertEqual(422, resp.status_code)
|
|
||||||
|
|
||||||
# PUT and PATCH should not be allowed.
|
# PUT and PATCH should not be allowed.
|
||||||
resp = self.client.put('/users/%s' % user_id,
|
resp = self.client.put('/users/%s' % user_id,
|
||||||
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
||||||
@@ -147,7 +141,7 @@ class AuthenticationTests(AbstractPillarTest):
|
|||||||
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
||||||
headers={'Authorization': self.make_header('nonexpired-main'),
|
headers={'Authorization': self.make_header('nonexpired-main'),
|
||||||
'Content-Type': 'application/json'})
|
'Content-Type': 'application/json'})
|
||||||
self.assertEqual(403, resp.status_code)
|
self.assertEqual(405, resp.status_code)
|
||||||
|
|
||||||
# After all of this, the roles should be the same.
|
# After all of this, the roles should be the same.
|
||||||
with self.app.test_request_context(
|
with self.app.test_request_context(
|
||||||
@@ -172,6 +166,7 @@ class UserListTests(AbstractPillarTest):
|
|||||||
|
|
||||||
self.create_valid_auth_token('123456789abc123456789abc', 'token')
|
self.create_valid_auth_token('123456789abc123456789abc', 'token')
|
||||||
self.create_valid_auth_token('223456789abc123456789abc', 'admin-token')
|
self.create_valid_auth_token('223456789abc123456789abc', 'admin-token')
|
||||||
|
self.create_valid_auth_token('323456789abc123456789abc', 'other-token')
|
||||||
|
|
||||||
def test_list_all_users_anonymous(self):
|
def test_list_all_users_anonymous(self):
|
||||||
# Anonymous access should be denied.
|
# Anonymous access should be denied.
|
||||||
@@ -243,3 +238,99 @@ class UserListTests(AbstractPillarTest):
|
|||||||
|
|
||||||
self.assertEqual(403, resp.status_code)
|
self.assertEqual(403, resp.status_code)
|
||||||
self.assertNotIn('auth', user_info)
|
self.assertNotIn('auth', user_info)
|
||||||
|
|
||||||
|
def test_put_user(self):
|
||||||
|
from application.utils import remove_private_keys
|
||||||
|
|
||||||
|
# PUTting a user should work, and not mess up the auth field.
|
||||||
|
resp = self.client.get('/users/123456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('token')})
|
||||||
|
user_info = json.loads(resp.data)
|
||||||
|
put_user = remove_private_keys(user_info)
|
||||||
|
|
||||||
|
resp = self.client.put('/users/123456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('token'),
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'If-Match': user_info['_etag']},
|
||||||
|
data=json.dumps(put_user))
|
||||||
|
self.assertEqual(200, resp.status_code, resp.data)
|
||||||
|
|
||||||
|
# Get directly from MongoDB, Eve blocks access to the auth field.
|
||||||
|
with self.app.test_request_context():
|
||||||
|
users = self.app.data.driver.db['users']
|
||||||
|
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
||||||
|
self.assertIn('auth', db_user)
|
||||||
|
|
||||||
|
def test_put_other_user(self):
|
||||||
|
from application.utils import remove_private_keys
|
||||||
|
|
||||||
|
# PUTting the user as another user should fail.
|
||||||
|
resp = self.client.get('/users/123456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('token')})
|
||||||
|
user_info = json.loads(resp.data)
|
||||||
|
put_user = remove_private_keys(user_info)
|
||||||
|
|
||||||
|
resp = self.client.put('/users/123456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('other-token'),
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'If-Match': user_info['_etag']},
|
||||||
|
data=json.dumps(put_user))
|
||||||
|
self.assertEqual(403, resp.status_code, resp.data)
|
||||||
|
|
||||||
|
def test_put_admin(self):
|
||||||
|
from application.utils import remove_private_keys
|
||||||
|
|
||||||
|
# PUTting a user should work, and not mess up the auth field.
|
||||||
|
resp = self.client.get('/users/123456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('token')})
|
||||||
|
user_info = json.loads(resp.data)
|
||||||
|
put_user = remove_private_keys(user_info)
|
||||||
|
|
||||||
|
resp = self.client.put('/users/123456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('admin-token'),
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'If-Match': user_info['_etag']},
|
||||||
|
data=json.dumps(put_user))
|
||||||
|
self.assertEqual(200, resp.status_code, resp.data)
|
||||||
|
|
||||||
|
# Get directly from MongoDB, Eve blocks access to the auth field.
|
||||||
|
with self.app.test_request_context():
|
||||||
|
users = self.app.data.driver.db['users']
|
||||||
|
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
||||||
|
self.assertIn('auth', db_user)
|
||||||
|
|
||||||
|
def test_post(self):
|
||||||
|
"""POSTing to /users should fail for subscribers and admins alike."""
|
||||||
|
|
||||||
|
post_user = {
|
||||||
|
'username': 'unique-user-name',
|
||||||
|
'groups': [],
|
||||||
|
'roles': ['subscriber'],
|
||||||
|
'settings': {'email_communications': 1},
|
||||||
|
'auth': [],
|
||||||
|
'full_name': u'คนรักของผัดไทย',
|
||||||
|
'email': TEST_EMAIL_ADDRESS,
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = self.client.post('/users',
|
||||||
|
headers={'Authorization': self.make_header('token'),
|
||||||
|
'Content-Type': 'application/json'},
|
||||||
|
data=json.dumps(post_user))
|
||||||
|
self.assertEqual(405, resp.status_code, resp.data)
|
||||||
|
|
||||||
|
resp = self.client.post('/users',
|
||||||
|
headers={'Authorization': self.make_header('admin-token'),
|
||||||
|
'Content-Type': 'application/json'},
|
||||||
|
data=json.dumps(post_user))
|
||||||
|
self.assertEqual(405, resp.status_code, resp.data)
|
||||||
|
|
||||||
|
def test_delete(self):
|
||||||
|
"""DELETING a user should fail for subscribers and admins alike."""
|
||||||
|
|
||||||
|
resp = self.client.delete('/users/323456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('token')})
|
||||||
|
self.assertEqual(405, resp.status_code, resp.data)
|
||||||
|
|
||||||
|
resp = self.client.delete('/users/323456789abc123456789abc',
|
||||||
|
headers={'Authorization': self.make_header('admin-token')})
|
||||||
|
self.assertEqual(405, resp.status_code, resp.data)
|
||||||
|
Reference in New Issue
Block a user