891 lines
36 KiB
Python
891 lines
36 KiB
Python
# -*- encoding: utf-8 -*-
|
|
|
|
import copy
|
|
import datetime
|
|
import json
|
|
|
|
import pillar.tests.common_test_data as ctd
|
|
import responses
|
|
from bson import tz_util, ObjectId
|
|
from pillar.tests import AbstractPillarTest, TEST_EMAIL_USER, TEST_EMAIL_ADDRESS
|
|
from pillar.tests.common_test_data import EXAMPLE_NODE
|
|
from werkzeug.exceptions import Forbidden
|
|
|
|
PUBLIC_USER_FIELDS = {'full_name', 'email', 'username'}
|
|
|
|
# Use the example project with some additional permissions for these tests.
|
|
EXAMPLE_PROJECT = copy.deepcopy(ctd.EXAMPLE_PROJECT)
|
|
|
|
_texture_nt = next(nt for nt in EXAMPLE_PROJECT['node_types']
|
|
if nt['name'] == 'texture')
|
|
_texture_nt['permissions'] = {'groups': [
|
|
{'group': ObjectId('5596e975ea893b269af85c0f'), 'methods': ['GET']},
|
|
{'group': ObjectId('564733b56dcaf85da2faee8a'), 'methods': ['GET']}
|
|
]}
|
|
|
|
_asset_nt = next(nt for nt in EXAMPLE_PROJECT['node_types']
|
|
if nt['name'] == 'asset')
|
|
_asset_nt['permissions'] = {'groups': [
|
|
{'group': ObjectId('5596e975ea893b269af85c0f'), 'methods': ['DELETE', 'GET']},
|
|
{'group': ObjectId('564733b56dcaf85da2faee8a'), 'methods': ['GET']}
|
|
]}
|
|
|
|
|
|
class AuthenticationTests(AbstractPillarTest):
|
|
def test_make_unique_username(self):
|
|
from pillar.api.utils import authentication as auth
|
|
|
|
with self.app.test_request_context():
|
|
# This user shouldn't exist yet.
|
|
self.assertEqual(TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
|
|
|
|
# Add a user, then test again.
|
|
auth.create_new_user(TEST_EMAIL_ADDRESS, TEST_EMAIL_USER, 'test1234')
|
|
self.assertEqual('%s1' % TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
|
|
|
|
@responses.activate
|
|
def test_validate_token__not_logged_in(self):
|
|
from pillar.api.utils import authentication as auth
|
|
|
|
with self.app.test_request_context():
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
@responses.activate
|
|
def test_validate_token__force_logged_in(self):
|
|
from pillar.api.utils import authentication as auth
|
|
from pillar.auth import UserClass
|
|
|
|
self.mock_blenderid_validate_happy()
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('knowntoken')}):
|
|
from flask import g
|
|
g.current_user = UserClass('12345')
|
|
self.assertTrue(g.current_user.is_authenticated)
|
|
|
|
self.assertTrue(auth.validate_token(force=True))
|
|
self.assertTrue(g.current_user.is_authenticated)
|
|
|
|
@responses.activate
|
|
def test_validate_token__force_not_logged_in(self):
|
|
from pillar.api.utils import authentication as auth
|
|
from pillar.auth import UserClass, current_user
|
|
|
|
with self.app.test_request_context():
|
|
from flask import g
|
|
g.current_user = UserClass('12345')
|
|
self.assertTrue(g.current_user.is_authenticated)
|
|
|
|
self.assertFalse(auth.validate_token(force=True))
|
|
self.assertFalse(g.current_user.is_authenticated)
|
|
|
|
@responses.activate
|
|
def test_validate_token__unknown_token(self):
|
|
"""Test validating of invalid token, unknown both to us and Blender ID."""
|
|
|
|
from pillar.api.utils import authentication as auth
|
|
|
|
self.mock_blenderid_validate_unhappy()
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('unknowntoken')}):
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
@responses.activate
|
|
def test_validate_token__unknown_but_valid_token(self):
|
|
"""Test validating of valid token, unknown to us but known to Blender ID."""
|
|
|
|
from pillar.api.utils import authentication as auth
|
|
|
|
self.mock_blenderid_validate_happy()
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('knowntoken')}):
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
@responses.activate
|
|
def test_find_token(self):
|
|
"""Test finding of various tokens."""
|
|
|
|
from pillar.api.utils import authentication as auth
|
|
|
|
user_id = self.create_user()
|
|
|
|
with self.app.app_context():
|
|
now = datetime.datetime.now(tz_util.utc)
|
|
future = now + datetime.timedelta(days=1)
|
|
past = now - datetime.timedelta(days=1)
|
|
subclient = self.app.config['BLENDER_ID_SUBCLIENT_ID']
|
|
|
|
auth.store_token(user_id, 'nonexpired-main', future, None)
|
|
auth.store_token(user_id, 'nonexpired-sub', future, subclient)
|
|
token3 = auth.store_token(user_id, 'expired-sub', past, subclient)
|
|
|
|
# We should not find the given tokens as unhashed tokens.
|
|
tokens_coll = self.app.db('tokens')
|
|
self.assertIsNone(tokens_coll.find_one({'token': 'nonespired-main'}))
|
|
self.assertIsNone(tokens_coll.find_one({'token': 'nonespired-sub'}))
|
|
self.assertIsNone(tokens_coll.find_one({'token': 'expired-sub'}))
|
|
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('nonexpired-main')}):
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('nonexpired-main', subclient)}):
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('nonexpired-sub')}):
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('nonexpired-sub', subclient)}):
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('expired-sub', subclient)}):
|
|
self.assertFalse(auth.validate_token())
|
|
|
|
self.mock_blenderid_validate_happy()
|
|
with self.app.test_request_context(
|
|
headers={'Authorization': self.make_header('expired-sub', subclient)}):
|
|
self.assertTrue(auth.validate_token())
|
|
|
|
# We now should be able to find a new token for this user.
|
|
found_token = auth.find_token('expired-sub', subclient)
|
|
self.assertIsNotNone(found_token)
|
|
self.assertNotEqual(token3['_id'], found_token['_id'])
|
|
|
|
@responses.activate
|
|
def test_save_own_user(self):
|
|
"""Tests that a user can't change their own fields."""
|
|
|
|
from pillar.api.utils import authentication as auth
|
|
from pillar.api.utils import PillarJSONEncoder, remove_private_keys
|
|
|
|
user_id = self.create_user(roles=['subscriber'], token='token')
|
|
|
|
def fetch_user():
|
|
with self.app.test_request_context():
|
|
users_coll = self.app.db('users')
|
|
return users_coll.find_one(user_id)
|
|
|
|
db_user = fetch_user()
|
|
updated_fields = remove_private_keys(db_user)
|
|
updated_fields['roles'] = ['admin', 'subscriber', 'demo'] # Try to elevate our roles.
|
|
|
|
# POSTing updated info to a specific user URL is not allowed by Eve.
|
|
self.post('/api/users/%s' % user_id,
|
|
json=updated_fields,
|
|
auth_token='token',
|
|
expected_status=405)
|
|
|
|
# PUT is allowed, but shouldn't change roles.
|
|
self.put('/api/users/%s' % user_id,
|
|
json=updated_fields,
|
|
auth_token='token',
|
|
etag=db_user['_etag'])
|
|
db_user = fetch_user()
|
|
self.assertEqual(['subscriber'], db_user['roles'])
|
|
|
|
# PATCH should not be allowed.
|
|
updated_fields = {'roles': ['admin', 'subscriber', 'demo']}
|
|
self.patch('/api/users/%s' % user_id,
|
|
json=updated_fields,
|
|
auth_token='token',
|
|
etag=db_user['_etag'],
|
|
expected_status=405)
|
|
db_user = fetch_user()
|
|
self.assertEqual(['subscriber'], db_user['roles'])
|
|
|
|
def test_token_expiry(self):
|
|
"""Expired tokens should be deleted from the database."""
|
|
|
|
# Insert long-expired, almost-expired and not-expired token.
|
|
user_id = self.create_user()
|
|
now = datetime.datetime.now(tz_util.utc)
|
|
|
|
with self.app.test_request_context():
|
|
from pillar.api.utils import authentication as auth
|
|
|
|
tokdat_le = auth.store_token(user_id, 'long-expired',
|
|
now - datetime.timedelta(days=365), None)
|
|
tokdat_se = auth.store_token(user_id, 'short-expired',
|
|
now - datetime.timedelta(seconds=5), None)
|
|
tokdat_ne = auth.store_token(user_id, 'not-expired',
|
|
now + datetime.timedelta(days=1), None)
|
|
|
|
# Validation should clean up old tokens.
|
|
auth.validate_this_token('je', 'moeder')
|
|
|
|
token_coll = self.app.data.driver.db['tokens']
|
|
self.assertEqual({tokdat_se['token_hashed'], tokdat_ne['token_hashed']},
|
|
{item['token_hashed'] for item in token_coll.find()})
|
|
|
|
def test_token_hashing_cli(self):
|
|
from dateutil.parser import parse
|
|
|
|
with self.app.app_context():
|
|
user_id1 = self.create_user(24 * 'a')
|
|
user_id2 = self.create_user(24 * 'b')
|
|
now = datetime.datetime.now(tz_util.utc)
|
|
|
|
# Force unhashed tokens into our database.
|
|
tokens_coll = self.app.db('tokens')
|
|
tokens_coll.insert_one({
|
|
'_id': ObjectId('59c0f8ef98377327e1525cd1'),
|
|
'_etag': '3b8fffa5177e87555acd95e49e6764cb81de5d70',
|
|
'_created': parse('2017-09-19T13:01:03.000+0200'),
|
|
'_updated': parse('2017-09-19T13:01:03.000+0200'),
|
|
'user': user_id1,
|
|
'token': 'unhashed-token',
|
|
'expire_time': now + datetime.timedelta(hours=1),
|
|
})
|
|
tokens_coll.insert_one({
|
|
'_id': ObjectId(24 * 'c'),
|
|
'_etag': '3b8fffa5177e87555acd95e49e6764cb81de5d70',
|
|
'_created': parse('2017-09-20T13:01:03.000+0200'),
|
|
'_updated': parse('2017-09-20T13:01:03.000+0200'),
|
|
'user': user_id2,
|
|
'token': 'some-other-token',
|
|
'expire_time': now + datetime.timedelta(hours=1),
|
|
})
|
|
|
|
# This token should work.
|
|
me1 = self.get('/api/users/me', auth_token='unhashed-token').get_json()
|
|
self.assertEqual(str(user_id1), me1['_id'])
|
|
me2 = self.get('/api/users/me', auth_token='some-other-token').get_json()
|
|
self.assertEqual(str(user_id2), me2['_id'])
|
|
|
|
with self.app.app_context():
|
|
from pillar.cli.operations import hash_auth_tokens
|
|
hash_auth_tokens()
|
|
|
|
# The same token should still work, but be hashed in the DB.
|
|
me1 = self.get('/api/users/me', auth_token='unhashed-token').get_json()
|
|
self.assertEqual(str(user_id1), me1['_id'])
|
|
me2 = self.get('/api/users/me', auth_token='some-other-token').get_json()
|
|
self.assertEqual(str(user_id2), me2['_id'])
|
|
|
|
db_token = tokens_coll.find_one({'_id': ObjectId('59c0f8ef98377327e1525cd1')})
|
|
self.assertNotIn('token', db_token)
|
|
self.assertIn('token_hashed', db_token)
|
|
|
|
db_token = tokens_coll.find_one({'_id': ObjectId('59c0f8ef98377327e1525cd1')})
|
|
self.assertNotIn('token', db_token)
|
|
self.assertIn('token_hashed', db_token)
|
|
|
|
|
|
class UserListTests(AbstractPillarTest):
|
|
"""Security-related tests."""
|
|
|
|
def setUp(self, **kwargs):
|
|
super(UserListTests, self).setUp()
|
|
|
|
self.create_user(roles=['subscriber'], user_id='123456789abc123456789abc')
|
|
self.create_user(roles=['admin'], user_id='223456789abc123456789abc')
|
|
self.create_user(roles=['subscriber'], user_id='323456789abc123456789abc')
|
|
|
|
self.create_valid_auth_token('123456789abc123456789abc', 'token')
|
|
self.create_valid_auth_token('223456789abc123456789abc', 'admin-token')
|
|
self.create_valid_auth_token('323456789abc123456789abc', 'other-token')
|
|
|
|
def test_list_all_users_anonymous(self):
|
|
# Listing all users should be forbidden
|
|
self.get('/api/users', expected_status=403)
|
|
|
|
def test_list_all_users_subscriber(self):
|
|
# Regular access should result in only your own info.
|
|
users = self.get('/api/users', auth_token='token').json()
|
|
self.assertEqual(1, users['_meta']['total'])
|
|
|
|
# The 'auth' section should be removed.
|
|
user_info = users['_items'][0]
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
def test_list_all_users_admin(self):
|
|
# Admin access should result in all users
|
|
users = self.get('/api/users', auth_token='admin-token').json()
|
|
self.assertEqual(3, users['_meta']['total'])
|
|
|
|
# The 'auth' section should be removed.
|
|
for user_info in users['_items']:
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
def test_list_all_users_admin_explicit_projection(self):
|
|
"""Even admins shouldn't be able to GET auth info."""
|
|
|
|
projection = json.dumps({'auth': 1})
|
|
users = self.get(f'/api/users?projection={projection}', auth_token='admin-token').json()
|
|
self.assertEqual(3, users['_meta']['total'])
|
|
|
|
# The 'auth' section should be removed.
|
|
for user_info in users['_items']:
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
def test_user_anonymous(self):
|
|
from pillar.api.utils import remove_private_keys
|
|
|
|
# Getting a user should be limited to certain fields
|
|
resp = self.get('/api/users/123456789abc123456789abc')
|
|
|
|
user_info = json.loads(resp.data)
|
|
regular_info = remove_private_keys(user_info)
|
|
self.assertEqual(PUBLIC_USER_FIELDS, set(regular_info.keys()))
|
|
|
|
def test_own_user_subscriber(self):
|
|
# Regular access should result in only your own info.
|
|
user_info = self.get('/api/users/123456789abc123456789abc', auth_token='token').json()
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
def test_own_user_subscriber_explicit_projection(self):
|
|
# With a custom projection requesting the auth list
|
|
projection = json.dumps({'auth': 1})
|
|
user_info = self.get(f'/api/users/123456789abc123456789abc?projection={projection}',
|
|
auth_token='token').json()
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
def test_other_user_subscriber(self):
|
|
from pillar.api.utils import remove_private_keys
|
|
|
|
# Requesting another user should be limited to full name and email.
|
|
user_info = self.get('/api/users/223456789abc123456789abc', auth_token='token').json()
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
regular_info = remove_private_keys(user_info)
|
|
self.assertEqual(PUBLIC_USER_FIELDS, set(regular_info.keys()))
|
|
|
|
def test_put_user(self):
|
|
from pillar.api.utils import remove_private_keys
|
|
|
|
# PUTting a user should work, and not mess up the auth field.
|
|
user_info = self.get('/api/users/123456789abc123456789abc', auth_token='token').json()
|
|
self.assertNotIn('auth', user_info)
|
|
|
|
put_user = remove_private_keys(user_info)
|
|
self.put('/api/users/123456789abc123456789abc',
|
|
auth_token='token',
|
|
etag=user_info['_etag'],
|
|
json=put_user)
|
|
|
|
# Get directly from MongoDB, Eve blocks access to the auth field.
|
|
with self.app.test_request_context():
|
|
users = self.app.data.driver.db['users']
|
|
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
|
self.assertIn('auth', db_user)
|
|
|
|
def test_put_user_restricted_fields(self):
|
|
from pillar.api.utils import remove_private_keys
|
|
|
|
group_ids = self.create_standard_groups()
|
|
|
|
# A user should be able to change only some fields, but not all.
|
|
user_info = self.get('/api/users/me', auth_token='token').json()
|
|
|
|
# Alter all fields (except auth, another test already checks that that's uneditable).
|
|
put_user = remove_private_keys(user_info)
|
|
put_user['full_name'] = '¿new name?'
|
|
put_user['username'] = 'üniék'
|
|
put_user['email'] = 'new+email@example.com'
|
|
put_user['roles'] = ['subscriber', 'demo', 'admin', 'service', 'flamenco_manager']
|
|
put_user['groups'] = list(group_ids.keys())
|
|
put_user['settings']['email_communications'] = 0
|
|
put_user['service'] = {'flamenco_manager': {}}
|
|
|
|
self.put(f'/api/users/{user_info["_id"]}',
|
|
json=put_user,
|
|
auth_token='token',
|
|
etag=user_info['_etag'])
|
|
|
|
new_user_info = self.get('/api/users/me', auth_token='token').json()
|
|
self.assertEqual(new_user_info['full_name'], put_user['full_name'])
|
|
self.assertEqual(new_user_info['username'], put_user['username'])
|
|
self.assertEqual(new_user_info['email'], put_user['email'])
|
|
self.assertEqual(new_user_info['roles'], user_info['roles'])
|
|
self.assertEqual(new_user_info['groups'], user_info['groups'])
|
|
self.assertEqual(new_user_info['settings']['email_communications'],
|
|
put_user['settings']['email_communications'])
|
|
self.assertNotIn('service', new_user_info)
|
|
|
|
def test_put_other_user(self):
|
|
from pillar.api.utils import remove_private_keys
|
|
|
|
# PUTting the user as another user should fail.
|
|
user_info = self.get('/api/users/123456789abc123456789abc', auth_token='token').json()
|
|
put_user = remove_private_keys(user_info)
|
|
|
|
self.put('/api/users/123456789abc123456789abc', auth_token='other-token',
|
|
json=put_user, etag=user_info['_etag'],
|
|
expected_status=403)
|
|
|
|
def test_put_admin(self):
|
|
from pillar.api.utils import remove_private_keys
|
|
|
|
# PUTting a user should work, and not mess up the auth field.
|
|
user_info = self.get('/api/users/123456789abc123456789abc', auth_token='token').json()
|
|
put_user = remove_private_keys(user_info)
|
|
|
|
self.put('/api/users/123456789abc123456789abc', auth_token='admin-token',
|
|
json=put_user, etag=user_info['_etag'])
|
|
|
|
# Get directly from MongoDB, Eve blocks access to the auth field.
|
|
with self.app.test_request_context():
|
|
users = self.app.data.driver.db['users']
|
|
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
|
self.assertIn('auth', db_user)
|
|
|
|
def test_post(self):
|
|
"""POSTing to /users should fail for subscribers and admins alike."""
|
|
|
|
post_user = {
|
|
'username': 'unique-user-name',
|
|
'groups': [],
|
|
'roles': ['subscriber'],
|
|
'settings': {'email_communications': 1},
|
|
'auth': [],
|
|
'full_name': 'คนรักของผัดไทย',
|
|
'email': TEST_EMAIL_ADDRESS,
|
|
}
|
|
|
|
self.post('/api/users', auth_token='token', json=post_user, expected_status=405)
|
|
self.post('/api/users', auth_token='admin-token', json=post_user, expected_status=405)
|
|
|
|
def test_delete(self):
|
|
"""DELETING a user should fail for subscribers and admins alike."""
|
|
|
|
self.delete('/api/users', auth_token='token', expected_status=405)
|
|
self.delete('/api/users', auth_token='admin-token', expected_status=405)
|
|
|
|
|
|
class PermissionComputationTest(AbstractPillarTest):
|
|
maxDiff = None
|
|
|
|
def test_merge_permissions(self):
|
|
from pillar.api.utils.authorization import merge_permissions
|
|
|
|
with self.app.test_request_context():
|
|
self.assertEqual({}, merge_permissions())
|
|
self.assertEqual({}, merge_permissions({}))
|
|
self.assertEqual({}, merge_permissions({}, {}, {}))
|
|
|
|
# Merge one level deep
|
|
self.assertEqual(
|
|
{},
|
|
merge_permissions({'users': []}, {'groups': []}, {'world': []}))
|
|
self.assertEqual(
|
|
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}],
|
|
'groups': [{'group': 'manatees', 'methods': ['DELETE', 'GET']}],
|
|
'world': ['GET']},
|
|
merge_permissions(
|
|
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}]},
|
|
{'groups': [{'group': 'manatees', 'methods': ['DELETE', 'GET']}]},
|
|
{'world': ['GET']}))
|
|
|
|
# Merge two levels deep.
|
|
self.assertEqual(
|
|
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}],
|
|
'groups': [{'group': 'lions', 'methods': ['GET']},
|
|
{'group': 'manatees', 'methods': ['GET', 'POST', 'PUT']}],
|
|
'world': ['GET']},
|
|
merge_permissions(
|
|
{'users': [{'user': 'micak', 'methods': ['GET', 'PUT', 'POST']}],
|
|
'groups': [{'group': 'lions', 'methods': ['GET']}]},
|
|
{'groups': [{'group': 'manatees', 'methods': ['GET', 'PUT', 'POST']}]},
|
|
{'world': ['GET']}))
|
|
|
|
# Merge three levels deep
|
|
self.assertEqual(
|
|
{'users': [{'user': 'micak', 'methods': ['DELETE', 'GET', 'POST', 'PUT']}],
|
|
'groups': [{'group': 'lions', 'methods': ['GET', 'PUT', 'SCRATCH']},
|
|
{'group': 'manatees', 'methods': ['GET', 'POST', 'PUT']}],
|
|
'world': ['GET']},
|
|
merge_permissions(
|
|
{'users': [{'user': 'micak', 'methods': ['GET', 'PUT', 'POST']}],
|
|
'groups': [{'group': 'lions', 'methods': ['GET']},
|
|
{'group': 'manatees', 'methods': ['GET', 'PUT', 'POST']}],
|
|
'world': ['GET']},
|
|
{'users': [{'user': 'micak', 'methods': ['DELETE']}],
|
|
'groups': [{'group': 'lions', 'methods': ['GET', 'PUT', 'SCRATCH']}],
|
|
}
|
|
))
|
|
|
|
def sort(self, permissions):
|
|
"""Returns a sorted copy of the permissions."""
|
|
|
|
from pillar.api.utils.authorization import merge_permissions
|
|
return merge_permissions(permissions, {})
|
|
|
|
def test_effective_permissions(self):
|
|
from pillar.api.utils.authorization import compute_aggr_permissions
|
|
|
|
with self.app.test_request_context():
|
|
# Test project permissions.
|
|
self.assertEqual(
|
|
{
|
|
'groups': [{'group': ObjectId('5596e975ea893b269af85c0e'),
|
|
'methods': ['DELETE', 'GET', 'POST', 'PUT']}],
|
|
'world': ['GET']
|
|
},
|
|
self.sort(compute_aggr_permissions('projects', EXAMPLE_PROJECT, None)))
|
|
|
|
# Test node type permissions.
|
|
self.assertEqual(
|
|
{
|
|
'groups': [{'group': ObjectId('5596e975ea893b269af85c0e'),
|
|
'methods': ['DELETE', 'GET', 'POST', 'PUT']},
|
|
{'group': ObjectId('5596e975ea893b269af85c0f'),
|
|
'methods': ['GET']},
|
|
{'group': ObjectId('564733b56dcaf85da2faee8a'),
|
|
'methods': ['GET']}],
|
|
'world': ['GET']
|
|
},
|
|
self.sort(compute_aggr_permissions('projects', EXAMPLE_PROJECT, 'texture')))
|
|
|
|
with self.app.test_request_context():
|
|
# Test node permissions with non-existing project.
|
|
node = copy.deepcopy(EXAMPLE_NODE)
|
|
self.assertRaises(Forbidden, compute_aggr_permissions, 'nodes', node, None)
|
|
|
|
with self.app.test_request_context():
|
|
# Test node permissions without embedded project.
|
|
self.ensure_project_exists(project_overrides=EXAMPLE_PROJECT)
|
|
self.assertEqual(
|
|
{'groups': [{'group': ObjectId('5596e975ea893b269af85c0e'),
|
|
'methods': ['DELETE', 'GET', 'POST', 'PUT']},
|
|
{'group': ObjectId('5596e975ea893b269af85c0f'),
|
|
'methods': ['DELETE', 'GET']},
|
|
{'group': ObjectId('564733b56dcaf85da2faee8a'),
|
|
'methods': ['GET']}],
|
|
'world': ['GET']},
|
|
self.sort(compute_aggr_permissions('nodes', node, None)))
|
|
|
|
with self.app.test_request_context():
|
|
# Test node permissions with embedded project.
|
|
node = copy.deepcopy(EXAMPLE_NODE)
|
|
node['project'] = EXAMPLE_PROJECT
|
|
self.assertEqual(
|
|
{'groups': [{'group': ObjectId('5596e975ea893b269af85c0e'),
|
|
'methods': ['DELETE', 'GET', 'POST', 'PUT']},
|
|
{'group': ObjectId('5596e975ea893b269af85c0f'),
|
|
'methods': ['DELETE', 'GET']},
|
|
{'group': ObjectId('564733b56dcaf85da2faee8a'),
|
|
'methods': ['GET']}],
|
|
'world': ['GET']},
|
|
self.sort(compute_aggr_permissions('nodes', node, None)))
|
|
|
|
def test_delete_node(self):
|
|
self.enter_app_context()
|
|
|
|
proj_id, proj = self.ensure_project_exists()
|
|
self.create_user(user_id=24 * 'a', roles={'subscriber'},
|
|
groups=[ctd.EXAMPLE_PROJECT_OWNER_ID])
|
|
|
|
node = copy.deepcopy(ctd.EXAMPLE_NODE)
|
|
node['project'] = proj_id
|
|
node_id = self.create_node(node)
|
|
|
|
# Try deletion by a user who is not part of the project.
|
|
self.create_user(user_id=6 * 'dafe', roles={'subscriber'}, token='dafe-token')
|
|
self.delete(f'/api/nodes/{node_id}',
|
|
auth_token='dafe-token',
|
|
etag=node['_etag'],
|
|
expected_status=403)
|
|
|
|
found = self.app.db('nodes').find_one(node_id)
|
|
self.assertFalse(found.get('_deleted', False))
|
|
|
|
def test_delete_project(self):
|
|
self.enter_app_context()
|
|
|
|
proj_id, proj = self.ensure_project_exists()
|
|
self.create_user(user_id=24 * 'a', roles={'subscriber'},
|
|
groups=[ctd.EXAMPLE_PROJECT_OWNER_ID])
|
|
|
|
# Try deletion by a user who is not part of the project.
|
|
self.create_user(user_id=6 * 'dafe', roles={'subscriber'}, token='dafe-token')
|
|
self.delete(f'/api/projects/{proj_id}',
|
|
auth_token='dafe-token',
|
|
etag=proj['_etag'],
|
|
expected_status=403)
|
|
|
|
found = self.app.db('projects').find_one(proj_id)
|
|
self.assertIsNotNone(found)
|
|
self.assertFalse(found.get('_deleted', False))
|
|
|
|
|
|
class RequireRolesTest(AbstractPillarTest):
|
|
def test_no_roles_required(self):
|
|
from pillar.api.utils.authorization import require_login
|
|
|
|
called = [False]
|
|
|
|
@require_login()
|
|
def call_me():
|
|
called[0] = True
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), roles=['succubus'])
|
|
call_me()
|
|
|
|
self.assertTrue(called[0])
|
|
|
|
def test_some_roles_required(self):
|
|
from pillar.api.utils.authorization import require_login
|
|
|
|
called = [False]
|
|
|
|
@require_login(require_roles={'admin'})
|
|
def call_me():
|
|
called[0] = True
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), ['succubus'])
|
|
self.assertRaises(Forbidden, call_me)
|
|
self.assertFalse(called[0])
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), ['admin'])
|
|
call_me()
|
|
self.assertTrue(called[0])
|
|
|
|
def test_all_roles_required(self):
|
|
from pillar.api.utils.authorization import require_login
|
|
|
|
called = [False]
|
|
|
|
@require_login(require_roles={'service', 'badger'},
|
|
require_all=True)
|
|
def call_me():
|
|
called[0] = True
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), ['admin'])
|
|
self.assertRaises(Forbidden, call_me)
|
|
self.assertFalse(called[0])
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), ['service'])
|
|
self.assertRaises(Forbidden, call_me)
|
|
self.assertFalse(called[0])
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), ['badger'])
|
|
self.assertRaises(Forbidden, call_me)
|
|
self.assertFalse(called[0])
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), ['service', 'badger'])
|
|
call_me()
|
|
self.assertTrue(called[0])
|
|
|
|
def test_user_has_role(self):
|
|
from pillar.api.utils.authorization import user_has_role
|
|
|
|
def make_user(roles):
|
|
return self.create_user_object(ObjectId(), roles=roles)
|
|
|
|
with self.app.test_request_context():
|
|
self.assertTrue(user_has_role('subscriber', make_user(['aap', 'noot', 'subscriber'])))
|
|
self.assertTrue(user_has_role('subscriber', make_user(['aap', 'subscriber'])))
|
|
self.assertFalse(user_has_role('admin', make_user(['aap', 'noot', 'subscriber'])))
|
|
self.assertFalse(user_has_role('admin', make_user([])))
|
|
self.assertFalse(user_has_role('admin', make_user(None)))
|
|
self.assertFalse(user_has_role('admin', None))
|
|
|
|
def test_cap_required(self):
|
|
from pillar.api.utils.authorization import require_login
|
|
|
|
called = [False]
|
|
|
|
@require_login(require_cap='subscriber')
|
|
def call_me():
|
|
called[0] = True
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), ['succubus'])
|
|
self.assertRaises(Forbidden, call_me)
|
|
self.assertFalse(called[0])
|
|
|
|
with self.app.test_request_context():
|
|
self.login_api_as(ObjectId(24 * 'a'), ['demo'])
|
|
call_me()
|
|
self.assertTrue(called[0])
|
|
|
|
def test_invalid_combinations(self):
|
|
from pillar.api.utils.authorization import require_login
|
|
|
|
with self.assertRaises(TypeError):
|
|
require_login(require_roles=['abc', 'def'])
|
|
|
|
with self.assertRaises(TypeError):
|
|
require_login(require_cap={'multiple', 'caps'})
|
|
|
|
with self.assertRaises(ValueError):
|
|
require_login(require_roles=set(), require_all=True)
|
|
|
|
with self.assertRaises(ValueError):
|
|
require_login(require_roles={'admin'}, require_cap='hey')
|
|
|
|
|
|
class UserCreationTest(AbstractPillarTest):
|
|
@responses.activate
|
|
def test_create_by_auth(self):
|
|
"""Create user by authenticating against Blender ID."""
|
|
|
|
with self.app.test_request_context():
|
|
users_coll = self.app.db().users
|
|
self.assertEqual(0, users_coll.count())
|
|
|
|
self.mock_blenderid_validate_happy()
|
|
token = 'this is my life now'
|
|
self.get('/api/users/me', auth_token=token)
|
|
|
|
with self.app.test_request_context():
|
|
users_coll = self.app.db().users
|
|
self.assertEqual(1, users_coll.count())
|
|
|
|
db_user = users_coll.find()[0]
|
|
self.assertEqual(db_user['email'], TEST_EMAIL_ADDRESS)
|
|
|
|
@responses.activate
|
|
def test_create_by_auth_no_full_name(self):
|
|
"""Blender ID does not require full name, we do."""
|
|
|
|
with self.app.test_request_context():
|
|
users_coll = self.app.db().users
|
|
self.assertEqual(0, users_coll.count())
|
|
|
|
bid_resp = {'status': 'success',
|
|
'user': {'email': TEST_EMAIL_ADDRESS,
|
|
'full_name': '',
|
|
'id': ctd.BLENDER_ID_TEST_USERID},
|
|
'token_expires': 'Mon, 1 Jan 2218 01:02:03 GMT'}
|
|
|
|
responses.add(responses.POST,
|
|
'%s/u/validate_token' % self.app.config['BLENDER_ID_ENDPOINT'],
|
|
json=bid_resp,
|
|
status=200)
|
|
|
|
token = 'this is my life now'
|
|
self.get('/api/users/me', auth_token=token)
|
|
|
|
with self.app.test_request_context():
|
|
users_coll = self.app.db().users
|
|
self.assertEqual(1, users_coll.count())
|
|
|
|
db_user = users_coll.find()[0]
|
|
self.assertEqual(db_user['email'], TEST_EMAIL_ADDRESS)
|
|
self.assertNotEqual('', db_user['full_name'])
|
|
|
|
@responses.activate
|
|
def test_update_by_auth_no_full_name(self):
|
|
"""Blender ID does not require full name, we do."""
|
|
with self.app.app_context():
|
|
users_coll = self.app.db().users
|
|
self.assertEqual(0, users_coll.count())
|
|
|
|
# First request will create the user, the 2nd request will update.
|
|
self.mock_blenderid_validate_happy()
|
|
bid_resp = {'status': 'success',
|
|
'user': {'email': TEST_EMAIL_ADDRESS,
|
|
'full_name': '',
|
|
'id': ctd.BLENDER_ID_TEST_USERID},
|
|
'token_expires': 'Mon, 1 Jan 2218 01:02:03 GMT'}
|
|
responses.add(responses.POST,
|
|
'%s/u/validate_token' % self.app.config['BLENDER_ID_ENDPOINT'],
|
|
json=bid_resp,
|
|
status=200)
|
|
|
|
token = 'this is my life now'
|
|
self.get('/api/users/me', auth_token=token)
|
|
|
|
with self.app.app_context():
|
|
# Clear out the full name of the user. This could happen for some
|
|
# reason, and it shouldn't break the login flow.
|
|
users_coll.update_many({}, {'$set': {'full_name': ''}})
|
|
|
|
# Delete all tokens to force a re-check with Blender ID
|
|
tokens_coll = self.app.db('tokens')
|
|
tokens_coll.delete_many({})
|
|
|
|
self.get('/api/users/me', auth_token=token)
|
|
|
|
with self.app.app_context():
|
|
self.assertEqual(1, users_coll.count())
|
|
|
|
db_user = users_coll.find()[0]
|
|
self.assertEqual(db_user['email'], TEST_EMAIL_ADDRESS)
|
|
self.assertNotEqual('', db_user['full_name'])
|
|
|
|
def test_user_without_email_address(self):
|
|
"""Regular users should always have an email address.
|
|
|
|
Regular users are created by authentication with Blender ID, so we do not
|
|
have to test that (Blender ID ensures there is an email address). We do need
|
|
to test PUT access to erase the email address, though.
|
|
"""
|
|
|
|
from pillar.api.utils import remove_private_keys
|
|
|
|
user_id = self.create_user(24 * 'd', token='user-token')
|
|
|
|
with self.app.test_request_context():
|
|
users_coll = self.app.db().users
|
|
db_user = users_coll.find_one(user_id)
|
|
|
|
puttable = remove_private_keys(db_user)
|
|
|
|
empty_email = copy.deepcopy(puttable)
|
|
empty_email['email'] = ''
|
|
|
|
without_email = copy.deepcopy(puttable)
|
|
del without_email['email']
|
|
|
|
etag = db_user['_etag']
|
|
resp = self.put(f'/api/users/{user_id}', json=puttable, etag=etag,
|
|
auth_token='user-token', expected_status=200).json()
|
|
etag = resp['_etag']
|
|
self.put(f'/api/users/{user_id}', json=empty_email, etag=etag,
|
|
auth_token='user-token', expected_status=422)
|
|
self.put(f'/api/users/{user_id}', json=without_email, etag=etag,
|
|
auth_token='user-token', expected_status=422)
|
|
|
|
# An admin should be able to edit this user, but also not clear the email address.
|
|
self.create_user(24 * 'a', roles={'admin'}, token='admin-token')
|
|
resp = self.put(f'/api/users/{user_id}', json=puttable, etag=etag,
|
|
auth_token='admin-token', expected_status=200).json()
|
|
etag = resp['_etag']
|
|
self.put(f'/api/users/{user_id}', json=empty_email, etag=etag,
|
|
auth_token='admin-token', expected_status=422)
|
|
self.put(f'/api/users/{user_id}', json=without_email, etag=etag,
|
|
auth_token='admin-token', expected_status=422)
|
|
|
|
|
|
class CurrentUserTest(AbstractPillarTest):
|
|
def test_current_user_logged_in(self):
|
|
self.enter_app_context()
|
|
|
|
from flask import g
|
|
from pillar.auth import UserClass
|
|
from pillar.api.utils.authentication import current_user
|
|
|
|
with self.app.test_request_context():
|
|
g.current_user = UserClass.construct('the token', ctd.EXAMPLE_USER)
|
|
|
|
user = current_user()
|
|
self.assertIs(g.current_user, user)
|
|
|
|
def test_current_user_anonymous(self):
|
|
self.enter_app_context()
|
|
|
|
from flask import g
|
|
from pillar.auth import AnonymousUser
|
|
from pillar.api.utils.authentication import current_user
|
|
|
|
with self.app.test_request_context():
|
|
g.current_user = None
|
|
|
|
user = current_user()
|
|
self.assertIsInstance(user, AnonymousUser)
|
|
self.assertIsNone(user.user_id)
|
|
self.assertTrue(user.is_anonymous)
|
|
self.assertFalse(user.is_authenticated)
|