2016-04-26 11:04:28 +02:00
|
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
|
2016-04-15 16:27:24 +02:00
|
|
|
import datetime
|
2016-04-13 15:33:54 +02:00
|
|
|
import responses
|
2016-04-19 16:00:32 +02:00
|
|
|
import json
|
2016-04-26 11:04:28 +02:00
|
|
|
from bson import tz_util, ObjectId
|
2016-03-04 14:08:10 +01:00
|
|
|
|
2016-03-25 16:05:36 +01:00
|
|
|
from common_test_class import AbstractPillarTest, TEST_EMAIL_USER, TEST_EMAIL_ADDRESS
|
2016-03-25 12:22:31 +01:00
|
|
|
|
2016-04-26 17:27:56 +02:00
|
|
|
PUBLIC_USER_FIELDS = {'full_name', 'email'}
|
|
|
|
|
2016-03-04 14:47:48 +01:00
|
|
|
|
2016-03-25 15:57:17 +01:00
|
|
|
class AuthenticationTests(AbstractPillarTest):
|
2016-03-04 14:08:10 +01:00
|
|
|
def test_make_unique_username(self):
|
2016-03-25 15:57:17 +01:00
|
|
|
from application.utils import authentication as auth
|
2016-03-04 14:08:10 +01:00
|
|
|
|
2016-03-25 15:57:17 +01:00
|
|
|
with self.app.test_request_context():
|
2016-03-04 14:08:10 +01:00
|
|
|
# This user shouldn't exist yet.
|
2016-03-04 14:49:48 +01:00
|
|
|
self.assertEqual(TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
|
2016-03-04 14:08:10 +01:00
|
|
|
|
|
|
|
# Add a user, then test again.
|
2016-03-04 14:49:48 +01:00
|
|
|
auth.create_new_user(TEST_EMAIL_ADDRESS, TEST_EMAIL_USER, 'test1234')
|
2016-03-04 15:19:01 +01:00
|
|
|
self.assertEqual('%s1' % TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
|
2016-03-04 14:47:48 +01:00
|
|
|
|
2016-04-13 15:33:54 +02:00
|
|
|
@responses.activate
|
2016-03-04 18:43:20 +01:00
|
|
|
def test_validate_token__not_logged_in(self):
|
2016-03-25 15:57:17 +01:00
|
|
|
from application.utils import authentication as auth
|
|
|
|
|
|
|
|
with self.app.test_request_context():
|
2016-03-04 18:43:20 +01:00
|
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
|
2016-04-13 15:33:54 +02:00
|
|
|
@responses.activate
|
2016-03-04 14:47:48 +01:00
|
|
|
def test_validate_token__unknown_token(self):
|
2016-03-04 15:19:01 +01:00
|
|
|
"""Test validating of invalid token, unknown both to us and Blender ID."""
|
|
|
|
|
2016-03-25 15:57:17 +01:00
|
|
|
from application.utils import authentication as auth
|
|
|
|
|
2016-04-13 15:33:54 +02:00
|
|
|
self.mock_blenderid_validate_unhappy()
|
2016-04-12 15:24:50 +02:00
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('unknowntoken')}):
|
2016-03-04 14:49:48 +01:00
|
|
|
self.assertFalse(auth.validate_token())
|
2016-03-04 15:19:01 +01:00
|
|
|
|
2016-04-13 15:33:54 +02:00
|
|
|
@responses.activate
|
2016-03-04 15:19:01 +01:00
|
|
|
def test_validate_token__unknown_but_valid_token(self):
|
|
|
|
"""Test validating of valid token, unknown to us but known to Blender ID."""
|
|
|
|
|
2016-03-25 15:57:17 +01:00
|
|
|
from application.utils import authentication as auth
|
|
|
|
|
2016-04-13 15:33:54 +02:00
|
|
|
self.mock_blenderid_validate_happy()
|
2016-04-12 15:24:50 +02:00
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('knowntoken')}):
|
2016-03-04 15:19:01 +01:00
|
|
|
self.assertTrue(auth.validate_token())
|
2016-04-15 16:27:24 +02:00
|
|
|
|
|
|
|
@responses.activate
|
|
|
|
def test_find_token(self):
|
|
|
|
"""Test finding of various tokens."""
|
|
|
|
|
|
|
|
from application.utils import authentication as auth
|
|
|
|
|
|
|
|
user_id = self.create_user()
|
|
|
|
|
|
|
|
now = datetime.datetime.now(tz_util.utc)
|
|
|
|
future = now + datetime.timedelta(days=1)
|
|
|
|
past = now - datetime.timedelta(days=1)
|
|
|
|
subclient = self.app.config['BLENDER_ID_SUBCLIENT_ID']
|
|
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
|
auth.store_token(user_id, 'nonexpired-main', future, None)
|
|
|
|
auth.store_token(user_id, 'nonexpired-sub', future, subclient)
|
|
|
|
token3 = auth.store_token(user_id, 'expired-sub', past, subclient)
|
|
|
|
|
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-main')}):
|
|
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
|
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-main', subclient)}):
|
|
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
|
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-sub')}):
|
|
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
|
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-sub', subclient)}):
|
|
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
|
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('expired-sub', subclient)}):
|
|
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
|
|
|
|
self.mock_blenderid_validate_happy()
|
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('expired-sub', subclient)}):
|
|
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
|
|
|
|
# We now should be able to find a new token for this user.
|
|
|
|
found_token = auth.find_token('expired-sub', subclient)
|
|
|
|
self.assertIsNotNone(found_token)
|
|
|
|
self.assertNotEqual(token3['_id'], found_token['_id'])
|
2016-04-19 16:00:32 +02:00
|
|
|
|
|
|
|
@responses.activate
|
|
|
|
def test_save_own_user(self):
|
|
|
|
"""Tests that a user can't change their own fields."""
|
|
|
|
|
|
|
|
from application.utils import authentication as auth
|
|
|
|
from application.utils import PillarJSONEncoder, remove_private_keys
|
|
|
|
|
|
|
|
user_id = self.create_user(roles=[u'subscriber'])
|
|
|
|
|
|
|
|
now = datetime.datetime.now(tz_util.utc)
|
|
|
|
future = now + datetime.timedelta(days=1)
|
|
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
|
auth.store_token(user_id, 'nonexpired-main', future, None)
|
|
|
|
|
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-main')}):
|
|
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
|
|
|
|
users = self.app.data.driver.db['users']
|
|
|
|
db_user = users.find_one(user_id)
|
|
|
|
|
|
|
|
updated_fields = remove_private_keys(db_user)
|
|
|
|
updated_fields['roles'] = ['admin', 'subscriber', 'demo'] # Try to elevate our roles.
|
|
|
|
|
|
|
|
# POSTing updated info to a specific user URL is not allowed by Eve.
|
|
|
|
resp = self.client.post('/users/%s' % user_id,
|
|
|
|
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-main'),
|
|
|
|
'Content-Type': 'application/json'})
|
|
|
|
self.assertEqual(405, resp.status_code)
|
|
|
|
|
|
|
|
# PUT and PATCH should not be allowed.
|
|
|
|
resp = self.client.put('/users/%s' % user_id,
|
|
|
|
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-main'),
|
|
|
|
'Content-Type': 'application/json'})
|
|
|
|
self.assertEqual(403, resp.status_code)
|
|
|
|
|
|
|
|
updated_fields = {'roles': ['admin', 'subscriber', 'demo']}
|
|
|
|
resp = self.client.patch('/users/%s' % user_id,
|
|
|
|
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-main'),
|
|
|
|
'Content-Type': 'application/json'})
|
2016-04-26 11:04:28 +02:00
|
|
|
self.assertEqual(405, resp.status_code)
|
2016-04-19 16:00:32 +02:00
|
|
|
|
|
|
|
# After all of this, the roles should be the same.
|
|
|
|
with self.app.test_request_context(
|
|
|
|
headers={'Authorization': self.make_header('nonexpired-main')}):
|
|
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
|
|
|
|
users = self.app.data.driver.db['users']
|
|
|
|
db_user = users.find_one(user_id)
|
|
|
|
|
|
|
|
self.assertEqual([u'subscriber'], db_user['roles'])
|
2016-04-26 10:45:54 +02:00
|
|
|
|
|
|
|
|
|
|
|
class UserListTests(AbstractPillarTest):
|
|
|
|
"""Security-related tests."""
|
|
|
|
|
|
|
|
def setUp(self, **kwargs):
|
|
|
|
super(UserListTests, self).setUp()
|
|
|
|
|
|
|
|
self.create_user(roles=[u'subscriber'], user_id='123456789abc123456789abc')
|
|
|
|
self.create_user(roles=[u'admin'], user_id='223456789abc123456789abc')
|
|
|
|
self.create_user(roles=[u'subscriber'], user_id='323456789abc123456789abc')
|
|
|
|
|
|
|
|
self.create_valid_auth_token('123456789abc123456789abc', 'token')
|
|
|
|
self.create_valid_auth_token('223456789abc123456789abc', 'admin-token')
|
2016-04-26 11:04:28 +02:00
|
|
|
self.create_valid_auth_token('323456789abc123456789abc', 'other-token')
|
2016-04-26 10:45:54 +02:00
|
|
|
|
|
|
|
def test_list_all_users_anonymous(self):
|
2016-04-26 17:27:56 +02:00
|
|
|
# Listing all users should be forbidden
|
2016-04-26 10:45:54 +02:00
|
|
|
resp = self.client.get('/users')
|
|
|
|
self.assertEqual(403, resp.status_code)
|
|
|
|
|
|
|
|
def test_list_all_users_subscriber(self):
|
|
|
|
# Regular access should result in only your own info.
|
|
|
|
resp = self.client.get('/users', headers={'Authorization': self.make_header('token')})
|
|
|
|
users = json.loads(resp.data)
|
|
|
|
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
|
|
self.assertEqual(1, users['_meta']['total'])
|
|
|
|
|
|
|
|
# The 'auth' section should be removed.
|
|
|
|
user_info = users['_items'][0]
|
|
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
|
|
|
|
def test_list_all_users_admin(self):
|
|
|
|
# Admin access should result in all users
|
|
|
|
resp = self.client.get('/users', headers={'Authorization': self.make_header('admin-token')})
|
|
|
|
users = json.loads(resp.data)
|
|
|
|
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
|
|
self.assertEqual(3, users['_meta']['total'])
|
|
|
|
|
|
|
|
# The 'auth' section should be removed.
|
|
|
|
for user_info in users['_items']:
|
|
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
|
|
|
|
def test_list_all_users_admin_explicit_projection(self):
|
|
|
|
# Admin access should result in all users
|
|
|
|
projection = json.dumps({'auth': 1})
|
|
|
|
resp = self.client.get('/users?projection=%s' % projection,
|
|
|
|
headers={'Authorization': self.make_header('admin-token')})
|
|
|
|
users = json.loads(resp.data)
|
|
|
|
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
|
|
self.assertEqual(3, users['_meta']['total'])
|
|
|
|
|
|
|
|
# The 'auth' section should be removed.
|
|
|
|
for user_info in users['_items']:
|
|
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
|
2016-04-26 17:27:56 +02:00
|
|
|
def test_user_anonymous(self):
|
|
|
|
from application.utils import remove_private_keys
|
|
|
|
|
|
|
|
# Getting a user should be limited to certain fields
|
|
|
|
resp = self.client.get('/users/123456789abc123456789abc')
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
|
|
|
|
|
|
user_info = json.loads(resp.data)
|
|
|
|
regular_info = remove_private_keys(user_info)
|
|
|
|
self.assertEqual(PUBLIC_USER_FIELDS, set(regular_info.keys()))
|
|
|
|
|
2016-04-26 10:45:54 +02:00
|
|
|
def test_own_user_subscriber(self):
|
|
|
|
# Regular access should result in only your own info.
|
2016-04-26 17:27:56 +02:00
|
|
|
resp = self.client.get('/users/123456789abc123456789abc',
|
2016-04-26 10:45:54 +02:00
|
|
|
headers={'Authorization': self.make_header('token')})
|
|
|
|
user_info = json.loads(resp.data)
|
|
|
|
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
|
|
|
|
def test_own_user_subscriber_explicit_projection(self):
|
|
|
|
# With a custom projection requesting the auth list
|
|
|
|
projection = json.dumps({'auth': 1})
|
|
|
|
resp = self.client.get('/users/%s?projection=%s' % ('123456789abc123456789abc', projection),
|
|
|
|
headers={'Authorization': self.make_header('token')})
|
|
|
|
user_info = json.loads(resp.data)
|
|
|
|
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
|
|
|
|
def test_other_user_subscriber(self):
|
2016-04-26 17:27:56 +02:00
|
|
|
from application.utils import remove_private_keys
|
|
|
|
|
|
|
|
# Requesting another user should be limited to full name and email.
|
2016-04-26 10:45:54 +02:00
|
|
|
resp = self.client.get('/users/%s' % '223456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('token')})
|
|
|
|
user_info = json.loads(resp.data)
|
|
|
|
|
2016-04-26 17:27:56 +02:00
|
|
|
self.assertEqual(200, resp.status_code)
|
2016-04-26 10:45:54 +02:00
|
|
|
self.assertNotIn('auth', user_info)
|
2016-04-26 11:04:28 +02:00
|
|
|
|
2016-04-26 17:27:56 +02:00
|
|
|
regular_info = remove_private_keys(user_info)
|
|
|
|
self.assertEqual(PUBLIC_USER_FIELDS, set(regular_info.keys()))
|
|
|
|
|
2016-04-26 11:04:28 +02:00
|
|
|
def test_put_user(self):
|
|
|
|
from application.utils import remove_private_keys
|
|
|
|
|
|
|
|
# PUTting a user should work, and not mess up the auth field.
|
|
|
|
resp = self.client.get('/users/123456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('token')})
|
|
|
|
user_info = json.loads(resp.data)
|
|
|
|
put_user = remove_private_keys(user_info)
|
|
|
|
|
|
|
|
resp = self.client.put('/users/123456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('token'),
|
|
|
|
'Content-Type': 'application/json',
|
|
|
|
'If-Match': user_info['_etag']},
|
|
|
|
data=json.dumps(put_user))
|
|
|
|
self.assertEqual(200, resp.status_code, resp.data)
|
|
|
|
|
|
|
|
# Get directly from MongoDB, Eve blocks access to the auth field.
|
|
|
|
with self.app.test_request_context():
|
|
|
|
users = self.app.data.driver.db['users']
|
|
|
|
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
|
|
|
self.assertIn('auth', db_user)
|
|
|
|
|
|
|
|
def test_put_other_user(self):
|
|
|
|
from application.utils import remove_private_keys
|
|
|
|
|
|
|
|
# PUTting the user as another user should fail.
|
|
|
|
resp = self.client.get('/users/123456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('token')})
|
|
|
|
user_info = json.loads(resp.data)
|
|
|
|
put_user = remove_private_keys(user_info)
|
|
|
|
|
|
|
|
resp = self.client.put('/users/123456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('other-token'),
|
|
|
|
'Content-Type': 'application/json',
|
|
|
|
'If-Match': user_info['_etag']},
|
|
|
|
data=json.dumps(put_user))
|
|
|
|
self.assertEqual(403, resp.status_code, resp.data)
|
|
|
|
|
|
|
|
def test_put_admin(self):
|
|
|
|
from application.utils import remove_private_keys
|
|
|
|
|
|
|
|
# PUTting a user should work, and not mess up the auth field.
|
|
|
|
resp = self.client.get('/users/123456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('token')})
|
|
|
|
user_info = json.loads(resp.data)
|
|
|
|
put_user = remove_private_keys(user_info)
|
|
|
|
|
|
|
|
resp = self.client.put('/users/123456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('admin-token'),
|
|
|
|
'Content-Type': 'application/json',
|
|
|
|
'If-Match': user_info['_etag']},
|
|
|
|
data=json.dumps(put_user))
|
|
|
|
self.assertEqual(200, resp.status_code, resp.data)
|
|
|
|
|
|
|
|
# Get directly from MongoDB, Eve blocks access to the auth field.
|
|
|
|
with self.app.test_request_context():
|
|
|
|
users = self.app.data.driver.db['users']
|
|
|
|
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
|
|
|
self.assertIn('auth', db_user)
|
|
|
|
|
|
|
|
def test_post(self):
|
|
|
|
"""POSTing to /users should fail for subscribers and admins alike."""
|
|
|
|
|
|
|
|
post_user = {
|
|
|
|
'username': 'unique-user-name',
|
|
|
|
'groups': [],
|
|
|
|
'roles': ['subscriber'],
|
|
|
|
'settings': {'email_communications': 1},
|
|
|
|
'auth': [],
|
|
|
|
'full_name': u'คนรักของผัดไทย',
|
|
|
|
'email': TEST_EMAIL_ADDRESS,
|
|
|
|
}
|
|
|
|
|
|
|
|
resp = self.client.post('/users',
|
|
|
|
headers={'Authorization': self.make_header('token'),
|
|
|
|
'Content-Type': 'application/json'},
|
|
|
|
data=json.dumps(post_user))
|
|
|
|
self.assertEqual(405, resp.status_code, resp.data)
|
|
|
|
|
|
|
|
resp = self.client.post('/users',
|
|
|
|
headers={'Authorization': self.make_header('admin-token'),
|
|
|
|
'Content-Type': 'application/json'},
|
|
|
|
data=json.dumps(post_user))
|
|
|
|
self.assertEqual(405, resp.status_code, resp.data)
|
|
|
|
|
|
|
|
def test_delete(self):
|
|
|
|
"""DELETING a user should fail for subscribers and admins alike."""
|
|
|
|
|
|
|
|
resp = self.client.delete('/users/323456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('token')})
|
|
|
|
self.assertEqual(405, resp.status_code, resp.data)
|
|
|
|
|
|
|
|
resp = self.client.delete('/users/323456789abc123456789abc',
|
|
|
|
headers={'Authorization': self.make_header('admin-token')})
|
|
|
|
self.assertEqual(405, resp.status_code, resp.data)
|