pillar/tests/test_auth.py
Sybren A. Stüvel e600d87592 Secure write access to /users endpoint
- Admins can PUT everything
- Users can only PUT themselves
- The 'auth' field is always taken from the original, and never overwritten
  by the PUT. It can be missing from the request, so you can GET and then
  PUT the same data.
- Nobody can POST or DELETE users
2016-04-26 12:38:44 +02:00

337 lines
15 KiB
Python

# -*- encoding: utf-8 -*-
import datetime
import responses
import json
from bson import tz_util, ObjectId
from common_test_class import AbstractPillarTest, TEST_EMAIL_USER, TEST_EMAIL_ADDRESS
class AuthenticationTests(AbstractPillarTest):
def test_make_unique_username(self):
from application.utils import authentication as auth
with self.app.test_request_context():
# This user shouldn't exist yet.
self.assertEqual(TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
# Add a user, then test again.
auth.create_new_user(TEST_EMAIL_ADDRESS, TEST_EMAIL_USER, 'test1234')
self.assertEqual('%s1' % TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
@responses.activate
def test_validate_token__not_logged_in(self):
from application.utils import authentication as auth
with self.app.test_request_context():
self.assertFalse(auth.validate_token())
@responses.activate
def test_validate_token__unknown_token(self):
"""Test validating of invalid token, unknown both to us and Blender ID."""
from application.utils import authentication as auth
self.mock_blenderid_validate_unhappy()
with self.app.test_request_context(
headers={'Authorization': self.make_header('unknowntoken')}):
self.assertFalse(auth.validate_token())
@responses.activate
def test_validate_token__unknown_but_valid_token(self):
"""Test validating of valid token, unknown to us but known to Blender ID."""
from application.utils import authentication as auth
self.mock_blenderid_validate_happy()
with self.app.test_request_context(
headers={'Authorization': self.make_header('knowntoken')}):
self.assertTrue(auth.validate_token())
@responses.activate
def test_find_token(self):
"""Test finding of various tokens."""
from application.utils import authentication as auth
user_id = self.create_user()
now = datetime.datetime.now(tz_util.utc)
future = now + datetime.timedelta(days=1)
past = now - datetime.timedelta(days=1)
subclient = self.app.config['BLENDER_ID_SUBCLIENT_ID']
with self.app.test_request_context():
auth.store_token(user_id, 'nonexpired-main', future, None)
auth.store_token(user_id, 'nonexpired-sub', future, subclient)
token3 = auth.store_token(user_id, 'expired-sub', past, subclient)
with self.app.test_request_context(
headers={'Authorization': self.make_header('nonexpired-main')}):
self.assertTrue(auth.validate_token())
with self.app.test_request_context(
headers={'Authorization': self.make_header('nonexpired-main', subclient)}):
self.assertFalse(auth.validate_token())
with self.app.test_request_context(
headers={'Authorization': self.make_header('nonexpired-sub')}):
self.assertFalse(auth.validate_token())
with self.app.test_request_context(
headers={'Authorization': self.make_header('nonexpired-sub', subclient)}):
self.assertTrue(auth.validate_token())
with self.app.test_request_context(
headers={'Authorization': self.make_header('expired-sub', subclient)}):
self.assertFalse(auth.validate_token())
self.mock_blenderid_validate_happy()
with self.app.test_request_context(
headers={'Authorization': self.make_header('expired-sub', subclient)}):
self.assertTrue(auth.validate_token())
# We now should be able to find a new token for this user.
found_token = auth.find_token('expired-sub', subclient)
self.assertIsNotNone(found_token)
self.assertNotEqual(token3['_id'], found_token['_id'])
@responses.activate
def test_save_own_user(self):
"""Tests that a user can't change their own fields."""
from application.utils import authentication as auth
from application.utils import PillarJSONEncoder, remove_private_keys
user_id = self.create_user(roles=[u'subscriber'])
now = datetime.datetime.now(tz_util.utc)
future = now + datetime.timedelta(days=1)
with self.app.test_request_context():
auth.store_token(user_id, 'nonexpired-main', future, None)
with self.app.test_request_context(
headers={'Authorization': self.make_header('nonexpired-main')}):
self.assertTrue(auth.validate_token())
users = self.app.data.driver.db['users']
db_user = users.find_one(user_id)
updated_fields = remove_private_keys(db_user)
updated_fields['roles'] = ['admin', 'subscriber', 'demo'] # Try to elevate our roles.
# POSTing updated info to a specific user URL is not allowed by Eve.
resp = self.client.post('/users/%s' % user_id,
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
headers={'Authorization': self.make_header('nonexpired-main'),
'Content-Type': 'application/json'})
self.assertEqual(405, resp.status_code)
# PUT and PATCH should not be allowed.
resp = self.client.put('/users/%s' % user_id,
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
headers={'Authorization': self.make_header('nonexpired-main'),
'Content-Type': 'application/json'})
self.assertEqual(403, resp.status_code)
updated_fields = {'roles': ['admin', 'subscriber', 'demo']}
resp = self.client.patch('/users/%s' % user_id,
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
headers={'Authorization': self.make_header('nonexpired-main'),
'Content-Type': 'application/json'})
self.assertEqual(405, resp.status_code)
# After all of this, the roles should be the same.
with self.app.test_request_context(
headers={'Authorization': self.make_header('nonexpired-main')}):
self.assertTrue(auth.validate_token())
users = self.app.data.driver.db['users']
db_user = users.find_one(user_id)
self.assertEqual([u'subscriber'], db_user['roles'])
class UserListTests(AbstractPillarTest):
"""Security-related tests."""
def setUp(self, **kwargs):
super(UserListTests, self).setUp()
self.create_user(roles=[u'subscriber'], user_id='123456789abc123456789abc')
self.create_user(roles=[u'admin'], user_id='223456789abc123456789abc')
self.create_user(roles=[u'subscriber'], user_id='323456789abc123456789abc')
self.create_valid_auth_token('123456789abc123456789abc', 'token')
self.create_valid_auth_token('223456789abc123456789abc', 'admin-token')
self.create_valid_auth_token('323456789abc123456789abc', 'other-token')
def test_list_all_users_anonymous(self):
# Anonymous access should be denied.
resp = self.client.get('/users')
self.assertEqual(403, resp.status_code)
def test_list_all_users_subscriber(self):
# Regular access should result in only your own info.
resp = self.client.get('/users', headers={'Authorization': self.make_header('token')})
users = json.loads(resp.data)
self.assertEqual(200, resp.status_code)
self.assertEqual(1, users['_meta']['total'])
# The 'auth' section should be removed.
user_info = users['_items'][0]
self.assertNotIn('auth', user_info)
def test_list_all_users_admin(self):
# Admin access should result in all users
resp = self.client.get('/users', headers={'Authorization': self.make_header('admin-token')})
users = json.loads(resp.data)
self.assertEqual(200, resp.status_code)
self.assertEqual(3, users['_meta']['total'])
# The 'auth' section should be removed.
for user_info in users['_items']:
self.assertNotIn('auth', user_info)
def test_list_all_users_admin_explicit_projection(self):
# Admin access should result in all users
projection = json.dumps({'auth': 1})
resp = self.client.get('/users?projection=%s' % projection,
headers={'Authorization': self.make_header('admin-token')})
users = json.loads(resp.data)
self.assertEqual(200, resp.status_code)
self.assertEqual(3, users['_meta']['total'])
# The 'auth' section should be removed.
for user_info in users['_items']:
self.assertNotIn('auth', user_info)
def test_own_user_subscriber(self):
# Regular access should result in only your own info.
resp = self.client.get('/users/%s' % '123456789abc123456789abc',
headers={'Authorization': self.make_header('token')})
user_info = json.loads(resp.data)
self.assertEqual(200, resp.status_code)
self.assertNotIn('auth', user_info)
def test_own_user_subscriber_explicit_projection(self):
# With a custom projection requesting the auth list
projection = json.dumps({'auth': 1})
resp = self.client.get('/users/%s?projection=%s' % ('123456789abc123456789abc', projection),
headers={'Authorization': self.make_header('token')})
user_info = json.loads(resp.data)
self.assertEqual(200, resp.status_code)
self.assertNotIn('auth', user_info)
def test_other_user_subscriber(self):
# Requesting another user should be denied.
resp = self.client.get('/users/%s' % '223456789abc123456789abc',
headers={'Authorization': self.make_header('token')})
user_info = json.loads(resp.data)
self.assertEqual(403, resp.status_code)
self.assertNotIn('auth', user_info)
def test_put_user(self):
from application.utils import remove_private_keys
# PUTting a user should work, and not mess up the auth field.
resp = self.client.get('/users/123456789abc123456789abc',
headers={'Authorization': self.make_header('token')})
user_info = json.loads(resp.data)
put_user = remove_private_keys(user_info)
resp = self.client.put('/users/123456789abc123456789abc',
headers={'Authorization': self.make_header('token'),
'Content-Type': 'application/json',
'If-Match': user_info['_etag']},
data=json.dumps(put_user))
self.assertEqual(200, resp.status_code, resp.data)
# Get directly from MongoDB, Eve blocks access to the auth field.
with self.app.test_request_context():
users = self.app.data.driver.db['users']
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
self.assertIn('auth', db_user)
def test_put_other_user(self):
from application.utils import remove_private_keys
# PUTting the user as another user should fail.
resp = self.client.get('/users/123456789abc123456789abc',
headers={'Authorization': self.make_header('token')})
user_info = json.loads(resp.data)
put_user = remove_private_keys(user_info)
resp = self.client.put('/users/123456789abc123456789abc',
headers={'Authorization': self.make_header('other-token'),
'Content-Type': 'application/json',
'If-Match': user_info['_etag']},
data=json.dumps(put_user))
self.assertEqual(403, resp.status_code, resp.data)
def test_put_admin(self):
from application.utils import remove_private_keys
# PUTting a user should work, and not mess up the auth field.
resp = self.client.get('/users/123456789abc123456789abc',
headers={'Authorization': self.make_header('token')})
user_info = json.loads(resp.data)
put_user = remove_private_keys(user_info)
resp = self.client.put('/users/123456789abc123456789abc',
headers={'Authorization': self.make_header('admin-token'),
'Content-Type': 'application/json',
'If-Match': user_info['_etag']},
data=json.dumps(put_user))
self.assertEqual(200, resp.status_code, resp.data)
# Get directly from MongoDB, Eve blocks access to the auth field.
with self.app.test_request_context():
users = self.app.data.driver.db['users']
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
self.assertIn('auth', db_user)
def test_post(self):
"""POSTing to /users should fail for subscribers and admins alike."""
post_user = {
'username': 'unique-user-name',
'groups': [],
'roles': ['subscriber'],
'settings': {'email_communications': 1},
'auth': [],
'full_name': u'คนรักของผัดไทย',
'email': TEST_EMAIL_ADDRESS,
}
resp = self.client.post('/users',
headers={'Authorization': self.make_header('token'),
'Content-Type': 'application/json'},
data=json.dumps(post_user))
self.assertEqual(405, resp.status_code, resp.data)
resp = self.client.post('/users',
headers={'Authorization': self.make_header('admin-token'),
'Content-Type': 'application/json'},
data=json.dumps(post_user))
self.assertEqual(405, resp.status_code, resp.data)
def test_delete(self):
"""DELETING a user should fail for subscribers and admins alike."""
resp = self.client.delete('/users/323456789abc123456789abc',
headers={'Authorization': self.make_header('token')})
self.assertEqual(405, resp.status_code, resp.data)
resp = self.client.delete('/users/323456789abc123456789abc',
headers={'Authorization': self.make_header('admin-token')})
self.assertEqual(405, resp.status_code, resp.data)