Introducing Pillar Framework
Refactor of pillar-server and pillar-web into a single python package. This simplifies the overall architecture of pillar applications. Special thanks @sybren and @venomgfx
This commit is contained in:
601
tests/test_api/test_auth.py
Normal file
601
tests/test_api/test_auth.py
Normal file
@@ -0,0 +1,601 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import json
|
||||
|
||||
import pillar.tests.common_test_data as ctd
|
||||
import responses
|
||||
from bson import tz_util, ObjectId
|
||||
from pillar.tests import AbstractPillarTest, TEST_EMAIL_USER, TEST_EMAIL_ADDRESS
|
||||
from pillar.tests.common_test_data import EXAMPLE_NODE
|
||||
from werkzeug.exceptions import Forbidden
|
||||
|
||||
PUBLIC_USER_FIELDS = {'full_name', 'email'}
|
||||
|
||||
# Use the example project with some additional permissions for these tests.
|
||||
EXAMPLE_PROJECT = copy.deepcopy(ctd.EXAMPLE_PROJECT)
|
||||
|
||||
_texture_nt = next(nt for nt in EXAMPLE_PROJECT['node_types']
|
||||
if nt['name'] == 'texture')
|
||||
_texture_nt['permissions']['groups'] = [
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'), u'methods': [u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'), u'methods': [u'GET']}, ]
|
||||
|
||||
_asset_nt = next(nt for nt in EXAMPLE_PROJECT['node_types']
|
||||
if nt['name'] == 'asset')
|
||||
_asset_nt['permissions']['groups'] = [
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'), u'methods': [u'DELETE', u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'), u'methods': [u'GET']}]
|
||||
|
||||
|
||||
class AuthenticationTests(AbstractPillarTest):
|
||||
def test_make_unique_username(self):
|
||||
from pillar.api.utils import authentication as auth
|
||||
|
||||
with self.app.test_request_context():
|
||||
# This user shouldn't exist yet.
|
||||
self.assertEqual(TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
|
||||
|
||||
# Add a user, then test again.
|
||||
auth.create_new_user(TEST_EMAIL_ADDRESS, TEST_EMAIL_USER, 'test1234')
|
||||
self.assertEqual('%s1' % TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
|
||||
|
||||
@responses.activate
|
||||
def test_validate_token__not_logged_in(self):
|
||||
from pillar.api.utils import authentication as auth
|
||||
|
||||
with self.app.test_request_context():
|
||||
self.assertFalse(auth.validate_token())
|
||||
|
||||
@responses.activate
|
||||
def test_validate_token__unknown_token(self):
|
||||
"""Test validating of invalid token, unknown both to us and Blender ID."""
|
||||
|
||||
from pillar.api.utils import authentication as auth
|
||||
|
||||
self.mock_blenderid_validate_unhappy()
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('unknowntoken')}):
|
||||
self.assertFalse(auth.validate_token())
|
||||
|
||||
@responses.activate
|
||||
def test_validate_token__unknown_but_valid_token(self):
|
||||
"""Test validating of valid token, unknown to us but known to Blender ID."""
|
||||
|
||||
from pillar.api.utils import authentication as auth
|
||||
|
||||
self.mock_blenderid_validate_happy()
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('knowntoken')}):
|
||||
self.assertTrue(auth.validate_token())
|
||||
|
||||
@responses.activate
|
||||
def test_find_token(self):
|
||||
"""Test finding of various tokens."""
|
||||
|
||||
from pillar.api.utils import authentication as auth
|
||||
|
||||
user_id = self.create_user()
|
||||
|
||||
now = datetime.datetime.now(tz_util.utc)
|
||||
future = now + datetime.timedelta(days=1)
|
||||
past = now - datetime.timedelta(days=1)
|
||||
subclient = self.app.config['BLENDER_ID_SUBCLIENT_ID']
|
||||
|
||||
with self.app.test_request_context():
|
||||
auth.store_token(user_id, 'nonexpired-main', future, None)
|
||||
auth.store_token(user_id, 'nonexpired-sub', future, subclient)
|
||||
token3 = auth.store_token(user_id, 'expired-sub', past, subclient)
|
||||
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('nonexpired-main')}):
|
||||
self.assertTrue(auth.validate_token())
|
||||
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('nonexpired-main', subclient)}):
|
||||
self.assertFalse(auth.validate_token())
|
||||
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('nonexpired-sub')}):
|
||||
self.assertFalse(auth.validate_token())
|
||||
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('nonexpired-sub', subclient)}):
|
||||
self.assertTrue(auth.validate_token())
|
||||
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('expired-sub', subclient)}):
|
||||
self.assertFalse(auth.validate_token())
|
||||
|
||||
self.mock_blenderid_validate_happy()
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('expired-sub', subclient)}):
|
||||
self.assertTrue(auth.validate_token())
|
||||
|
||||
# We now should be able to find a new token for this user.
|
||||
found_token = auth.find_token('expired-sub', subclient)
|
||||
self.assertIsNotNone(found_token)
|
||||
self.assertNotEqual(token3['_id'], found_token['_id'])
|
||||
|
||||
@responses.activate
|
||||
def test_save_own_user(self):
|
||||
"""Tests that a user can't change their own fields."""
|
||||
|
||||
from pillar.api.utils import authentication as auth
|
||||
from pillar.api.utils import PillarJSONEncoder, remove_private_keys
|
||||
|
||||
user_id = self.create_user(roles=[u'subscriber'])
|
||||
|
||||
now = datetime.datetime.now(tz_util.utc)
|
||||
future = now + datetime.timedelta(days=1)
|
||||
|
||||
with self.app.test_request_context():
|
||||
auth.store_token(user_id, 'nonexpired-main', future, None)
|
||||
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('nonexpired-main')}):
|
||||
self.assertTrue(auth.validate_token())
|
||||
|
||||
users = self.app.data.driver.db['users']
|
||||
db_user = users.find_one(user_id)
|
||||
|
||||
updated_fields = remove_private_keys(db_user)
|
||||
updated_fields['roles'] = ['admin', 'subscriber', 'demo'] # Try to elevate our roles.
|
||||
|
||||
# POSTing updated info to a specific user URL is not allowed by Eve.
|
||||
resp = self.client.post('/api/users/%s' % user_id,
|
||||
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
||||
headers={'Authorization': self.make_header('nonexpired-main'),
|
||||
'Content-Type': 'application/json'})
|
||||
self.assertEqual(405, resp.status_code)
|
||||
|
||||
# PUT and PATCH should not be allowed.
|
||||
resp = self.client.put('/api/users/%s' % user_id,
|
||||
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
||||
headers={'Authorization': self.make_header('nonexpired-main'),
|
||||
'Content-Type': 'application/json'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
updated_fields = {'roles': ['admin', 'subscriber', 'demo']}
|
||||
resp = self.client.patch('/api/users/%s' % user_id,
|
||||
data=json.dumps(updated_fields, cls=PillarJSONEncoder),
|
||||
headers={'Authorization': self.make_header('nonexpired-main'),
|
||||
'Content-Type': 'application/json'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
# After all of this, the roles should be the same.
|
||||
with self.app.test_request_context(
|
||||
headers={'Authorization': self.make_header('nonexpired-main')}):
|
||||
self.assertTrue(auth.validate_token())
|
||||
|
||||
users = self.app.data.driver.db['users']
|
||||
db_user = users.find_one(user_id)
|
||||
|
||||
self.assertEqual([u'subscriber'], db_user['roles'])
|
||||
|
||||
def test_token_expiry(self):
|
||||
"""Expired tokens should be deleted from the database."""
|
||||
|
||||
# Insert long-expired, almost-expired and not-expired token.
|
||||
user_id = self.create_user()
|
||||
now = datetime.datetime.now(tz_util.utc)
|
||||
|
||||
with self.app.test_request_context():
|
||||
from pillar.api.utils import authentication as auth
|
||||
|
||||
auth.store_token(user_id, 'long-expired',
|
||||
now - datetime.timedelta(days=365), None)
|
||||
auth.store_token(user_id, 'short-expired',
|
||||
now - datetime.timedelta(seconds=5), None)
|
||||
auth.store_token(user_id, 'not-expired',
|
||||
now + datetime.timedelta(days=1), None)
|
||||
|
||||
# Validation should clean up old tokens.
|
||||
auth.validate_this_token('je', 'moeder')
|
||||
|
||||
token_coll = self.app.data.driver.db['tokens']
|
||||
self.assertEqual({'short-expired', 'not-expired'},
|
||||
{item['token'] for item in token_coll.find()})
|
||||
|
||||
|
||||
class UserListTests(AbstractPillarTest):
|
||||
"""Security-related tests."""
|
||||
|
||||
def setUp(self, **kwargs):
|
||||
super(UserListTests, self).setUp()
|
||||
|
||||
self.create_user(roles=[u'subscriber'], user_id='123456789abc123456789abc')
|
||||
self.create_user(roles=[u'admin'], user_id='223456789abc123456789abc')
|
||||
self.create_user(roles=[u'subscriber'], user_id='323456789abc123456789abc')
|
||||
|
||||
self.create_valid_auth_token('123456789abc123456789abc', 'token')
|
||||
self.create_valid_auth_token('223456789abc123456789abc', 'admin-token')
|
||||
self.create_valid_auth_token('323456789abc123456789abc', 'other-token')
|
||||
|
||||
def test_list_all_users_anonymous(self):
|
||||
# Listing all users should be forbidden
|
||||
resp = self.client.get('/api/users')
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_list_all_users_subscriber(self):
|
||||
# Regular access should result in only your own info.
|
||||
resp = self.client.get('/api/users', headers={'Authorization': self.make_header('token')})
|
||||
users = json.loads(resp.data)
|
||||
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertEqual(1, users['_meta']['total'])
|
||||
|
||||
# The 'auth' section should be removed.
|
||||
user_info = users['_items'][0]
|
||||
self.assertNotIn('auth', user_info)
|
||||
|
||||
def test_list_all_users_admin(self):
|
||||
# Admin access should result in all users
|
||||
resp = self.client.get('/api/users', headers={'Authorization': self.make_header('admin-token')})
|
||||
users = json.loads(resp.data)
|
||||
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertEqual(3, users['_meta']['total'])
|
||||
|
||||
# The 'auth' section should be removed.
|
||||
for user_info in users['_items']:
|
||||
self.assertNotIn('auth', user_info)
|
||||
|
||||
def test_list_all_users_admin_explicit_projection(self):
|
||||
# Admin access should result in all users
|
||||
projection = json.dumps({'auth': 1})
|
||||
resp = self.client.get('/api/users?projection=%s' % projection,
|
||||
headers={'Authorization': self.make_header('admin-token')})
|
||||
users = json.loads(resp.data)
|
||||
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertEqual(3, users['_meta']['total'])
|
||||
|
||||
# The 'auth' section should be removed.
|
||||
for user_info in users['_items']:
|
||||
self.assertNotIn('auth', user_info)
|
||||
|
||||
def test_user_anonymous(self):
|
||||
from pillar.api.utils import remove_private_keys
|
||||
|
||||
# Getting a user should be limited to certain fields
|
||||
resp = self.client.get('/api/users/123456789abc123456789abc')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
user_info = json.loads(resp.data)
|
||||
regular_info = remove_private_keys(user_info)
|
||||
self.assertEqual(PUBLIC_USER_FIELDS, set(regular_info.keys()))
|
||||
|
||||
def test_own_user_subscriber(self):
|
||||
# Regular access should result in only your own info.
|
||||
resp = self.client.get('/api/users/123456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
user_info = json.loads(resp.data)
|
||||
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertNotIn('auth', user_info)
|
||||
|
||||
def test_own_user_subscriber_explicit_projection(self):
|
||||
# With a custom projection requesting the auth list
|
||||
projection = json.dumps({'auth': 1})
|
||||
resp = self.client.get('/api/users/%s?projection=%s' % ('123456789abc123456789abc', projection),
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
user_info = json.loads(resp.data)
|
||||
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertNotIn('auth', user_info)
|
||||
|
||||
def test_other_user_subscriber(self):
|
||||
from pillar.api.utils import remove_private_keys
|
||||
|
||||
# Requesting another user should be limited to full name and email.
|
||||
resp = self.client.get('/api/users/%s' % '223456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
user_info = json.loads(resp.data)
|
||||
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertNotIn('auth', user_info)
|
||||
|
||||
regular_info = remove_private_keys(user_info)
|
||||
self.assertEqual(PUBLIC_USER_FIELDS, set(regular_info.keys()))
|
||||
|
||||
def test_put_user(self):
|
||||
from pillar.api.utils import remove_private_keys
|
||||
|
||||
# PUTting a user should work, and not mess up the auth field.
|
||||
resp = self.client.get('/api/users/123456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
user_info = json.loads(resp.data)
|
||||
put_user = remove_private_keys(user_info)
|
||||
|
||||
resp = self.client.put('/api/users/123456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': user_info['_etag']},
|
||||
data=json.dumps(put_user))
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Get directly from MongoDB, Eve blocks access to the auth field.
|
||||
with self.app.test_request_context():
|
||||
users = self.app.data.driver.db['users']
|
||||
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
||||
self.assertIn('auth', db_user)
|
||||
|
||||
def test_put_other_user(self):
|
||||
from pillar.api.utils import remove_private_keys
|
||||
|
||||
# PUTting the user as another user should fail.
|
||||
resp = self.client.get('/api/users/123456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
user_info = json.loads(resp.data)
|
||||
put_user = remove_private_keys(user_info)
|
||||
|
||||
resp = self.client.put('/api/users/123456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('other-token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': user_info['_etag']},
|
||||
data=json.dumps(put_user))
|
||||
self.assertEqual(403, resp.status_code, resp.data)
|
||||
|
||||
def test_put_admin(self):
|
||||
from pillar.api.utils import remove_private_keys
|
||||
|
||||
# PUTting a user should work, and not mess up the auth field.
|
||||
resp = self.client.get('/api/users/123456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
user_info = json.loads(resp.data)
|
||||
put_user = remove_private_keys(user_info)
|
||||
|
||||
resp = self.client.put('/api/users/123456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('admin-token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': user_info['_etag']},
|
||||
data=json.dumps(put_user))
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Get directly from MongoDB, Eve blocks access to the auth field.
|
||||
with self.app.test_request_context():
|
||||
users = self.app.data.driver.db['users']
|
||||
db_user = users.find_one(ObjectId('123456789abc123456789abc'))
|
||||
self.assertIn('auth', db_user)
|
||||
|
||||
def test_post(self):
|
||||
"""POSTing to /users should fail for subscribers and admins alike."""
|
||||
|
||||
post_user = {
|
||||
'username': 'unique-user-name',
|
||||
'groups': [],
|
||||
'roles': ['subscriber'],
|
||||
'settings': {'email_communications': 1},
|
||||
'auth': [],
|
||||
'full_name': u'คนรักของผัดไทย',
|
||||
'email': TEST_EMAIL_ADDRESS,
|
||||
}
|
||||
|
||||
resp = self.client.post('/api/users',
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json'},
|
||||
data=json.dumps(post_user))
|
||||
self.assertEqual(405, resp.status_code, resp.data)
|
||||
|
||||
resp = self.client.post('/api/users',
|
||||
headers={'Authorization': self.make_header('admin-token'),
|
||||
'Content-Type': 'application/json'},
|
||||
data=json.dumps(post_user))
|
||||
self.assertEqual(405, resp.status_code, resp.data)
|
||||
|
||||
def test_delete(self):
|
||||
"""DELETING a user should fail for subscribers and admins alike."""
|
||||
|
||||
resp = self.client.delete('/api/users/323456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(405, resp.status_code, resp.data)
|
||||
|
||||
resp = self.client.delete('/api/users/323456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('admin-token')})
|
||||
self.assertEqual(405, resp.status_code, resp.data)
|
||||
|
||||
|
||||
class PermissionComputationTest(AbstractPillarTest):
|
||||
maxDiff = None
|
||||
|
||||
def test_merge_permissions(self):
|
||||
from pillar.api.utils.authorization import merge_permissions
|
||||
|
||||
with self.app.test_request_context():
|
||||
self.assertEqual({}, merge_permissions())
|
||||
self.assertEqual({}, merge_permissions({}))
|
||||
self.assertEqual({}, merge_permissions({}, {}, {}))
|
||||
|
||||
# Merge one level deep
|
||||
self.assertEqual(
|
||||
{},
|
||||
merge_permissions({'users': []}, {'groups': []}, {'world': []}))
|
||||
self.assertEqual(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}],
|
||||
'groups': [{'group': 'manatees', 'methods': ['DELETE', 'GET']}],
|
||||
'world': ['GET']},
|
||||
merge_permissions(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}]},
|
||||
{'groups': [{'group': 'manatees', 'methods': ['DELETE', 'GET']}]},
|
||||
{'world': ['GET']}))
|
||||
|
||||
# Merge two levels deep.
|
||||
self.assertEqual(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET']},
|
||||
{'group': 'manatees', 'methods': ['GET', 'POST', 'PUT']}],
|
||||
'world': ['GET']},
|
||||
merge_permissions(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'PUT', 'POST']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET']}]},
|
||||
{'groups': [{'group': 'manatees', 'methods': ['GET', 'PUT', 'POST']}]},
|
||||
{'world': ['GET']}))
|
||||
|
||||
# Merge three levels deep
|
||||
self.assertEqual(
|
||||
{'users': [{'user': 'micak', 'methods': ['DELETE', 'GET', 'POST', 'PUT']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET', 'PUT', 'SCRATCH']},
|
||||
{'group': 'manatees', 'methods': ['GET', 'POST', 'PUT']}],
|
||||
'world': ['GET']},
|
||||
merge_permissions(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'PUT', 'POST']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET']},
|
||||
{'group': 'manatees', 'methods': ['GET', 'PUT', 'POST']}],
|
||||
'world': ['GET']},
|
||||
{'users': [{'user': 'micak', 'methods': ['DELETE']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET', 'PUT', 'SCRATCH']}],
|
||||
}
|
||||
))
|
||||
|
||||
def sort(self, permissions):
|
||||
"""Returns a sorted copy of the permissions."""
|
||||
|
||||
from pillar.api.utils.authorization import merge_permissions
|
||||
return merge_permissions(permissions, {})
|
||||
|
||||
def test_effective_permissions(self):
|
||||
from pillar.api.utils.authorization import compute_aggr_permissions
|
||||
|
||||
with self.app.test_request_context():
|
||||
# Test project permissions.
|
||||
self.assertEqual(
|
||||
{
|
||||
u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'DELETE', u'GET', u'POST', u'PUT']}],
|
||||
u'world': [u'GET']
|
||||
},
|
||||
self.sort(compute_aggr_permissions('projects', EXAMPLE_PROJECT, None)))
|
||||
|
||||
# Test node type permissions.
|
||||
self.assertEqual(
|
||||
{
|
||||
u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'DELETE', u'GET', u'POST', u'PUT']},
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'),
|
||||
u'methods': [u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'),
|
||||
u'methods': [u'GET']}],
|
||||
u'world': [u'GET']
|
||||
},
|
||||
self.sort(compute_aggr_permissions('projects', EXAMPLE_PROJECT, 'texture')))
|
||||
|
||||
with self.app.test_request_context():
|
||||
# Test node permissions with non-existing project.
|
||||
node = copy.deepcopy(EXAMPLE_NODE)
|
||||
self.assertRaises(Forbidden, compute_aggr_permissions, 'nodes', node, None)
|
||||
|
||||
with self.app.test_request_context():
|
||||
# Test node permissions without embedded project.
|
||||
self.ensure_project_exists(project_overrides=EXAMPLE_PROJECT)
|
||||
self.assertEqual(
|
||||
{u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'DELETE', u'GET', u'POST', u'PUT']},
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'),
|
||||
u'methods': [u'DELETE', u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'),
|
||||
u'methods': [u'GET']}],
|
||||
u'world': [u'GET']},
|
||||
self.sort(compute_aggr_permissions('nodes', node, None)))
|
||||
|
||||
with self.app.test_request_context():
|
||||
# Test node permissions with embedded project.
|
||||
node = copy.deepcopy(EXAMPLE_NODE)
|
||||
node['project'] = EXAMPLE_PROJECT
|
||||
self.assertEqual(
|
||||
{u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'DELETE', u'GET', u'POST', u'PUT']},
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'),
|
||||
u'methods': [u'DELETE', u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'),
|
||||
u'methods': [u'GET']}],
|
||||
u'world': [u'GET']},
|
||||
self.sort(compute_aggr_permissions('nodes', node, None)))
|
||||
|
||||
|
||||
class RequireRolesTest(AbstractPillarTest):
|
||||
def test_no_roles_required(self):
|
||||
from flask import g
|
||||
from pillar.api.utils.authorization import require_login
|
||||
|
||||
called = [False]
|
||||
|
||||
@require_login()
|
||||
def call_me():
|
||||
called[0] = True
|
||||
|
||||
with self.app.test_request_context():
|
||||
g.current_user = {'user_id': ObjectId(24 * 'a'),
|
||||
'roles': [u'succubus']}
|
||||
call_me()
|
||||
|
||||
self.assertTrue(called[0])
|
||||
|
||||
def test_some_roles_required(self):
|
||||
from flask import g
|
||||
from pillar.api.utils.authorization import require_login
|
||||
|
||||
called = [False]
|
||||
|
||||
@require_login(require_roles={u'admin'})
|
||||
def call_me():
|
||||
called[0] = True
|
||||
|
||||
with self.app.test_request_context():
|
||||
g.current_user = {'user_id': ObjectId(24 * 'a'),
|
||||
'roles': [u'succubus']}
|
||||
self.assertRaises(Forbidden, call_me)
|
||||
self.assertFalse(called[0])
|
||||
|
||||
with self.app.test_request_context():
|
||||
g.current_user = {'user_id': ObjectId(24 * 'a'),
|
||||
'roles': [u'admin']}
|
||||
call_me()
|
||||
self.assertTrue(called[0])
|
||||
|
||||
def test_all_roles_required(self):
|
||||
from flask import g
|
||||
from pillar.api.utils.authorization import require_login
|
||||
|
||||
called = [False]
|
||||
|
||||
@require_login(require_roles={u'service', u'badger'},
|
||||
require_all=True)
|
||||
def call_me():
|
||||
called[0] = True
|
||||
|
||||
with self.app.test_request_context():
|
||||
g.current_user = {'user_id': ObjectId(24 * 'a'),
|
||||
'roles': [u'admin']}
|
||||
self.assertRaises(Forbidden, call_me)
|
||||
self.assertFalse(called[0])
|
||||
|
||||
with self.app.test_request_context():
|
||||
g.current_user = {'user_id': ObjectId(24 * 'a'),
|
||||
'roles': [u'service']}
|
||||
self.assertRaises(Forbidden, call_me)
|
||||
self.assertFalse(called[0])
|
||||
|
||||
with self.app.test_request_context():
|
||||
g.current_user = {'user_id': ObjectId(24 * 'a'),
|
||||
'roles': [u'badger']}
|
||||
self.assertRaises(Forbidden, call_me)
|
||||
self.assertFalse(called[0])
|
||||
|
||||
with self.app.test_request_context():
|
||||
g.current_user = {'user_id': ObjectId(24 * 'a'),
|
||||
'roles': [u'service', u'badger']}
|
||||
call_me()
|
||||
self.assertTrue(called[0])
|
||||
|
||||
def test_user_has_role(self):
|
||||
from pillar.api.utils.authorization import user_has_role
|
||||
|
||||
with self.app.test_request_context():
|
||||
self.assertTrue(user_has_role('subscriber', {'roles': ['aap', 'noot', 'subscriber']}))
|
||||
self.assertTrue(user_has_role('subscriber', {'roles': [u'aap', u'subscriber']}))
|
||||
self.assertFalse(user_has_role('admin', {'roles': [u'aap', u'noot', u'subscriber']}))
|
||||
self.assertFalse(user_has_role('admin', {'roles': []}))
|
||||
self.assertFalse(user_has_role('admin', {'roles': None}))
|
||||
self.assertFalse(user_has_role('admin', {}))
|
595
tests/test_api/test_bcloud_home_project.py
Normal file
595
tests/test_api/test_bcloud_home_project.py
Normal file
@@ -0,0 +1,595 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
"""Unit tests for the Blender Cloud home project module."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import responses
|
||||
from bson import ObjectId
|
||||
from flask import url_for
|
||||
from pillar.tests import AbstractPillarTest, TEST_EMAIL_ADDRESS
|
||||
from werkzeug import exceptions as wz_exceptions
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AbstractHomeProjectTest(AbstractPillarTest):
|
||||
def setUp(self, **kwargs):
|
||||
AbstractPillarTest.setUp(self, **kwargs)
|
||||
self.create_standard_groups()
|
||||
|
||||
def _create_user_with_token(self, roles, token, user_id='cafef00df00df00df00df00d'):
|
||||
"""Creates a user directly in MongoDB, so that it doesn't trigger any Eve hooks.
|
||||
|
||||
Adds the 'homeproject' role too, which we need to get past the AB-testing.
|
||||
"""
|
||||
user_id = self.create_user(roles=roles.union({u'homeproject'}), user_id=user_id)
|
||||
self.create_valid_auth_token(user_id, token)
|
||||
return user_id
|
||||
|
||||
|
||||
class HomeProjectTest(AbstractHomeProjectTest):
|
||||
def test_create_home_project(self):
|
||||
from pillar.api.blender_cloud import home_project
|
||||
from pillar.api.utils.authentication import validate_token
|
||||
|
||||
user_id = self._create_user_with_token(roles={u'subscriber'}, token='token')
|
||||
|
||||
# Test home project creation
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
validate_token()
|
||||
|
||||
proj = home_project.create_home_project(user_id, write_access=True)
|
||||
self.assertEqual('home', proj['category'])
|
||||
self.assertEqual({u'group', u'asset', u'comment'},
|
||||
set(nt['name'] for nt in proj['node_types']))
|
||||
|
||||
endpoint = url_for('blender_cloud.home_project.home_project')
|
||||
db_proj = self.app.data.driver.db['projects'].find_one(proj['_id'])
|
||||
|
||||
# Test availability at end-point
|
||||
resp = self.client.get(endpoint)
|
||||
# While we're still AB-testing, unauthenticated users should get a 404.
|
||||
# When that's over, it should result in a 403.
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
resp = self.client.get(endpoint, headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_proj = json.loads(resp.data)
|
||||
self.assertEqual(ObjectId(json_proj['_id']), proj['_id'])
|
||||
self.assertEqual(json_proj['_etag'], db_proj['_etag'])
|
||||
|
||||
@responses.activate
|
||||
def test_autocreate_home_project_with_subscriber_role(self):
|
||||
# Implicitly create user by token validation.
|
||||
self.mock_blenderid_validate_happy()
|
||||
resp = self.client.get('/api/users/me', headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp)
|
||||
|
||||
# Grant subscriber and homeproject roles, and fetch the home project.
|
||||
self.badger(TEST_EMAIL_ADDRESS, {'subscriber', 'homeproject'}, 'grant')
|
||||
|
||||
resp = self.client.get('/api/bcloud/home-project',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_proj = json.loads(resp.data)
|
||||
self.assertEqual('home', json_proj['category'])
|
||||
self.assertEqual('home', json_proj['url'])
|
||||
|
||||
# Check that a Blender Sync node was created automatically.
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
nodes_coll = self.app.data.driver.db['nodes']
|
||||
node = nodes_coll.find_one({
|
||||
'project': ObjectId(json_proj['_id']),
|
||||
'node_type': 'group',
|
||||
'name': 'Blender Sync',
|
||||
})
|
||||
self.assertIsNotNone(node)
|
||||
|
||||
@responses.activate
|
||||
def test_autocreate_home_project_with_demo_role(self):
|
||||
# Implicitly create user by token validation.
|
||||
self.mock_blenderid_validate_happy()
|
||||
resp = self.client.get('/api/users/me', headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp)
|
||||
|
||||
# Grant demo and homeproject role, which should allow creation of the home project.
|
||||
self.badger(TEST_EMAIL_ADDRESS, {'demo', 'homeproject'}, 'grant')
|
||||
|
||||
resp = self.client.get('/api/bcloud/home-project',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_proj = json.loads(resp.data)
|
||||
self.assertEqual('home', json_proj['category'])
|
||||
self.assertEqual('home', json_proj['url'])
|
||||
|
||||
# Check that a Blender Sync node was created automatically.
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
nodes_coll = self.app.data.driver.db['nodes']
|
||||
node = nodes_coll.find_one({
|
||||
'project': ObjectId(json_proj['_id']),
|
||||
'node_type': 'group',
|
||||
'name': 'Blender Sync',
|
||||
})
|
||||
self.assertIsNotNone(node)
|
||||
|
||||
@responses.activate
|
||||
def test_autocreate_home_project_with_succubus_role(self):
|
||||
from pillar.api.utils import dumps
|
||||
|
||||
# Implicitly create user by token validation.
|
||||
self.mock_blenderid_validate_happy()
|
||||
resp = self.client.get('/api/users/me', headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
user_id = ObjectId(json.loads(resp.data)['_id'])
|
||||
|
||||
# Grant succubus role, which should allow creation of a read-only home project.
|
||||
self.badger(TEST_EMAIL_ADDRESS, {'succubus', 'homeproject'}, 'grant')
|
||||
|
||||
resp = self.client.get('/api/bcloud/home-project',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_proj = json.loads(resp.data)
|
||||
self.assertEqual('home', json_proj['category'])
|
||||
self.assertEqual('home', json_proj['url'])
|
||||
|
||||
# Check that the admin group of the project only has GET permissions.
|
||||
self.assertEqual({'GET'}, set(json_proj['permissions']['groups'][0]['methods']))
|
||||
project_id = ObjectId(json_proj['_id'])
|
||||
admin_group_id = json_proj['permissions']['groups'][0]['group']
|
||||
|
||||
# Check that a Blender Sync node was created automatically.
|
||||
expected_node_permissions = {u'users': [],
|
||||
u'groups': [
|
||||
{u'group': ObjectId(admin_group_id),
|
||||
u'methods': [u'GET', u'PUT', u'POST', u'DELETE']}, ],
|
||||
u'world': []}
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
nodes_coll = self.app.data.driver.db['nodes']
|
||||
node = nodes_coll.find_one({
|
||||
'project': project_id,
|
||||
'node_type': 'group',
|
||||
'name': 'Blender Sync',
|
||||
})
|
||||
self.assertIsNotNone(node)
|
||||
|
||||
# Check that the node itself has write permissions for the admin group.
|
||||
node_perms = node['permissions']
|
||||
self.assertEqual(node_perms, expected_node_permissions)
|
||||
sync_node_id = node['_id']
|
||||
|
||||
# Check that we can create a group node inside the sync node.
|
||||
sub_sync_node = {'project': project_id,
|
||||
'node_type': 'group',
|
||||
'parent': sync_node_id,
|
||||
'name': '2.77',
|
||||
'user': user_id,
|
||||
'description': 'Sync folder for Blender 2.77',
|
||||
'properties': {'status': 'published'},
|
||||
}
|
||||
resp = self.client.post('/api/nodes', data=dumps(sub_sync_node),
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json'}
|
||||
)
|
||||
self.assertEqual(201, resp.status_code, resp.data)
|
||||
sub_node_info = json.loads(resp.data)
|
||||
|
||||
# Check the explicit node-level permissions are copied.
|
||||
# These aren't returned by the POST to Eve, so we have to check them in the DB manually.
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
nodes_coll = self.app.data.driver.db['nodes']
|
||||
sub_node = nodes_coll.find_one(ObjectId(sub_node_info['_id']))
|
||||
|
||||
node_perms = sub_node['permissions']
|
||||
self.assertEqual(node_perms, expected_node_permissions)
|
||||
|
||||
def test_has_home_project(self):
|
||||
from pillar.api.blender_cloud import home_project
|
||||
from pillar.api.utils.authentication import validate_token
|
||||
|
||||
user_id = self._create_user_with_token(roles={u'subscriber'}, token='token')
|
||||
|
||||
# Test home project creation
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
validate_token()
|
||||
|
||||
self.assertFalse(home_project.has_home_project(user_id))
|
||||
proj = home_project.create_home_project(user_id, write_access=True)
|
||||
self.assertTrue(home_project.has_home_project(user_id))
|
||||
|
||||
# Delete the project.
|
||||
resp = self.client.delete('/api/projects/%s' % proj['_id'],
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'If-Match': proj['_etag']})
|
||||
self.assertEqual(204, resp.status_code, resp.data)
|
||||
self.assertFalse(home_project.has_home_project(user_id))
|
||||
|
||||
@responses.activate
|
||||
def test_home_project_projections(self):
|
||||
"""Getting the home project should support projections."""
|
||||
|
||||
# Implicitly create user by token validation.
|
||||
self.mock_blenderid_validate_happy()
|
||||
resp = self.client.get('/api/users/me', headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp)
|
||||
|
||||
# Grant subscriber role, and fetch the home project.
|
||||
self.badger(TEST_EMAIL_ADDRESS, {'subscriber', 'homeproject'}, 'grant')
|
||||
|
||||
resp = self.client.get('/api/bcloud/home-project',
|
||||
query_string={'projection': json.dumps(
|
||||
{'permissions': 1,
|
||||
'category': 1,
|
||||
'user': 1})},
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
json_proj = json.loads(resp.data)
|
||||
self.assertNotIn('name', json_proj)
|
||||
self.assertNotIn('node_types', json_proj)
|
||||
self.assertEqual('home', json_proj['category'])
|
||||
|
||||
@responses.activate
|
||||
def test_home_project_url(self):
|
||||
"""The home project should have 'home' as URL."""
|
||||
|
||||
# Implicitly create user by token validation.
|
||||
self.mock_blenderid_validate_happy()
|
||||
resp = self.client.get('/api/users/me', headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp)
|
||||
|
||||
# Grant subscriber role, and fetch the home project.
|
||||
self.badger(TEST_EMAIL_ADDRESS, {'subscriber', 'homeproject'}, 'grant')
|
||||
|
||||
resp = self.client.get('/api/bcloud/home-project',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
json_proj = json.loads(resp.data)
|
||||
self.assertEqual('home', json_proj['url'])
|
||||
|
||||
@responses.activate
|
||||
def test_multiple_users_with_home_project(self):
|
||||
from pillar.api.blender_cloud import home_project
|
||||
from pillar.api.utils.authentication import validate_token
|
||||
|
||||
uid1 = self._create_user_with_token(roles={u'subscriber'}, token='token1', user_id=24 * 'a')
|
||||
uid2 = self._create_user_with_token(roles={u'subscriber'}, token='token2', user_id=24 * 'b')
|
||||
|
||||
# Create home projects
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token1')}):
|
||||
validate_token()
|
||||
proj1 = home_project.create_home_project(uid1, write_access=True)
|
||||
db_proj1 = self.app.data.driver.db['projects'].find_one(proj1['_id'])
|
||||
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token2')}):
|
||||
validate_token()
|
||||
proj2 = home_project.create_home_project(uid2, write_access=True)
|
||||
db_proj2 = self.app.data.driver.db['projects'].find_one(proj2['_id'])
|
||||
|
||||
# Test availability at end-point
|
||||
resp1 = self.client.get('/api/bcloud/home-project',
|
||||
headers={'Authorization': self.make_header('token1')})
|
||||
resp2 = self.client.get('/api/bcloud/home-project',
|
||||
headers={'Authorization': self.make_header('token2')})
|
||||
self.assertEqual(200, resp1.status_code)
|
||||
self.assertEqual(200, resp2.status_code)
|
||||
|
||||
json_proj1 = json.loads(resp1.data)
|
||||
json_proj2 = json.loads(resp2.data)
|
||||
|
||||
self.assertEqual(ObjectId(json_proj1['_id']), proj1['_id'])
|
||||
self.assertEqual(ObjectId(json_proj2['_id']), proj2['_id'])
|
||||
self.assertEqual(json_proj1['_etag'], db_proj1['_etag'])
|
||||
self.assertEqual(json_proj2['_etag'], db_proj2['_etag'])
|
||||
self.assertNotEqual(db_proj1['_etag'], db_proj2['_etag'])
|
||||
self.assertNotEqual(db_proj1['_id'], db_proj2['_id'])
|
||||
|
||||
def test_delete_restore(self):
|
||||
"""Deleting and then recreating a home project should restore the deleted project."""
|
||||
|
||||
self._create_user_with_token(roles={u'subscriber'}, token='token')
|
||||
|
||||
# Create home project by getting it.
|
||||
resp = self.client.get('/api/bcloud/home-project',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
before_delete_json_proj = json.loads(resp.data)
|
||||
|
||||
# Delete the project.
|
||||
resp = self.client.delete('/api/projects/%s' % before_delete_json_proj['_id'],
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'If-Match': before_delete_json_proj['_etag']})
|
||||
self.assertEqual(204, resp.status_code, resp.data)
|
||||
|
||||
# Recreate home project by getting it.
|
||||
resp = self.client.get('/api/bcloud/home-project',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
after_delete_json_proj = json.loads(resp.data)
|
||||
|
||||
self.assertEqual(before_delete_json_proj['_id'],
|
||||
after_delete_json_proj['_id'])
|
||||
|
||||
|
||||
class HomeProjectUserChangedRoleTest(AbstractPillarTest):
|
||||
def setUp(self, **kwargs):
|
||||
AbstractPillarTest.setUp(self, **kwargs)
|
||||
self.create_standard_groups()
|
||||
|
||||
def test_without_home_project(self):
|
||||
from pillar.api.blender_cloud import home_project
|
||||
|
||||
self.user_id = self.create_user()
|
||||
|
||||
with self.app.test_request_context():
|
||||
changed = home_project.user_changed_role(None, {'_id': self.user_id})
|
||||
self.assertFalse(changed)
|
||||
|
||||
# Shouldn't do anything, shouldn't crash either.
|
||||
|
||||
def test_already_subscriber_role(self):
|
||||
from pillar.api.blender_cloud import home_project
|
||||
from pillar.api.utils.authentication import validate_token
|
||||
|
||||
self.user_id = self.create_user(roles=set('subscriber'))
|
||||
self.create_valid_auth_token(self.user_id, 'token')
|
||||
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
validate_token()
|
||||
|
||||
home_proj = home_project.create_home_project(self.user_id, write_access=True)
|
||||
changed = home_project.user_changed_role(None, {'_id': self.user_id,
|
||||
'roles': ['subscriber']})
|
||||
self.assertFalse(changed)
|
||||
|
||||
# The home project should still be writable, so we should be able to create a node.
|
||||
self.create_test_node(home_proj['_id'])
|
||||
|
||||
def test_granting_subscriber_role(self):
|
||||
from pillar.api.blender_cloud import home_project
|
||||
from pillar.api.utils.authentication import validate_token
|
||||
|
||||
self.user_id = self.create_user(roles=set())
|
||||
self.create_valid_auth_token(self.user_id, 'token')
|
||||
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
validate_token()
|
||||
|
||||
home_proj = home_project.create_home_project(self.user_id, write_access=False)
|
||||
changed = home_project.user_changed_role(None, {'_id': self.user_id,
|
||||
'roles': ['subscriber']})
|
||||
self.assertTrue(changed)
|
||||
|
||||
# The home project should be writable, so we should be able to create a node.
|
||||
self.create_test_node(home_proj['_id'])
|
||||
|
||||
def test_revoking_subscriber_role(self):
|
||||
from pillar.api.blender_cloud import home_project
|
||||
from pillar.api.utils.authentication import validate_token
|
||||
|
||||
self.user_id = self.create_user(roles=set('subscriber'))
|
||||
self.create_valid_auth_token(self.user_id, 'token')
|
||||
|
||||
with self.app.test_request_context(headers={'Authorization': self.make_header('token')}):
|
||||
validate_token()
|
||||
|
||||
home_proj = home_project.create_home_project(self.user_id, write_access=True)
|
||||
changed = home_project.user_changed_role(None, {'_id': self.user_id,
|
||||
'roles': []})
|
||||
self.assertTrue(changed)
|
||||
|
||||
# The home project should NOT be writable, so we should NOT be able to create a node.
|
||||
self.create_test_node(home_proj['_id'], 403)
|
||||
|
||||
def create_test_node(self, project_id, status_code=201):
|
||||
from pillar.api.utils import dumps
|
||||
|
||||
node = {
|
||||
'project': project_id,
|
||||
'node_type': 'group',
|
||||
'name': 'test group node',
|
||||
'user': self.user_id,
|
||||
'properties': {},
|
||||
}
|
||||
|
||||
resp = self.client.post('/api/nodes', data=dumps(node),
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json'})
|
||||
self.assertEqual(status_code, resp.status_code, resp.data)
|
||||
|
||||
|
||||
class TextureLibraryTest(AbstractHomeProjectTest):
|
||||
def create_node(self, node_doc):
|
||||
with self.app.test_request_context():
|
||||
nodes_coll = self.app.data.driver.db['nodes']
|
||||
result = nodes_coll.insert_one(node_doc)
|
||||
return result.inserted_id
|
||||
|
||||
def setUp(self, **kwargs):
|
||||
AbstractHomeProjectTest.setUp(self, **kwargs)
|
||||
|
||||
user_id = self._create_user_with_token(set(), 'token')
|
||||
self.hdri_proj_id, proj = self.ensure_project_exists(project_overrides={'_id': 24 * 'a'})
|
||||
self.tex_proj_id, proj2 = self.ensure_project_exists(project_overrides={'_id': 24 * 'b'})
|
||||
|
||||
self.create_node({'description': '',
|
||||
'project': self.hdri_proj_id,
|
||||
'node_type': 'group_hdri',
|
||||
'user': user_id,
|
||||
'properties': {'status': 'published',
|
||||
'tags': [],
|
||||
'order': 0,
|
||||
'categories': '',
|
||||
'files': ''},
|
||||
'name': 'HDRi test node'}
|
||||
)
|
||||
|
||||
self.create_node({'description': '',
|
||||
'project': self.tex_proj_id,
|
||||
'node_type': 'group_texture',
|
||||
'user': user_id,
|
||||
'properties': {'status': 'published',
|
||||
'tags': [],
|
||||
'order': 0,
|
||||
'categories': '',
|
||||
'files': ''},
|
||||
'name': 'Group texture test node'}
|
||||
)
|
||||
|
||||
def test_blender_cloud_addon_version(self):
|
||||
from pillar.api.blender_cloud import blender_cloud_addon_version
|
||||
|
||||
# Three-digit version
|
||||
with self.app.test_request_context(headers={'Blender-Cloud-Addon': '1.3.3'}):
|
||||
self.assertEqual((1, 3, 3), blender_cloud_addon_version())
|
||||
|
||||
# Two-digit version
|
||||
with self.app.test_request_context(headers={'Blender-Cloud-Addon': '1.5'}):
|
||||
self.assertEqual((1, 5), blender_cloud_addon_version())
|
||||
|
||||
# No version
|
||||
with self.app.test_request_context():
|
||||
self.assertEqual(None, blender_cloud_addon_version())
|
||||
|
||||
# Malformed version
|
||||
with self.app.test_request_context(headers={'Blender-Cloud-Addon': 'je moeder'}):
|
||||
self.assertRaises(wz_exceptions.BadRequest, blender_cloud_addon_version)
|
||||
|
||||
def test_hdri_library__no_bcloud_version(self):
|
||||
resp = self.get('/api/bcloud/texture-libraries', auth_token='token')
|
||||
libs = resp.json()['_items']
|
||||
library_project_ids = {proj['_id'] for proj in libs}
|
||||
|
||||
self.assertNotIn(unicode(self.hdri_proj_id), library_project_ids)
|
||||
self.assertIn(unicode(self.tex_proj_id), library_project_ids)
|
||||
|
||||
def test_hdri_library__old_bcloud_addon(self):
|
||||
resp = self.get('/api/bcloud/texture-libraries',
|
||||
auth_token='token',
|
||||
headers={'Blender-Cloud-Addon': '1.3.3'})
|
||||
libs = resp.json()['_items']
|
||||
library_project_ids = {proj['_id'] for proj in libs}
|
||||
self.assertNotIn(unicode(self.hdri_proj_id), library_project_ids)
|
||||
self.assertIn(unicode(self.tex_proj_id), library_project_ids)
|
||||
|
||||
def test_hdri_library__new_bcloud_addon(self):
|
||||
resp = self.get('/api/bcloud/texture-libraries',
|
||||
auth_token='token',
|
||||
headers={'Blender-Cloud-Addon': '1.4.0'})
|
||||
libs = resp.json()['_items']
|
||||
library_project_ids = {proj['_id'] for proj in libs}
|
||||
self.assertIn(unicode(self.hdri_proj_id), library_project_ids)
|
||||
self.assertIn(unicode(self.tex_proj_id), library_project_ids)
|
||||
|
||||
|
||||
class HdriSortingTest(AbstractHomeProjectTest):
|
||||
def setUp(self, **kwargs):
|
||||
from pillar.api.node_types.hdri import node_type_hdri
|
||||
|
||||
super(HdriSortingTest, self).setUp(**kwargs)
|
||||
|
||||
self.user_id = self._create_user_with_token({u'subscriber'}, 'token')
|
||||
self.hdri_proj_id, proj = self.ensure_project_exists(project_overrides={
|
||||
'user': self.user_id,
|
||||
'permissions': {'world': ['DELETE', 'GET', 'POST', 'PUT']},
|
||||
'node_types': [node_type_hdri],
|
||||
})
|
||||
self.hdri_proj_id = ObjectId(self.hdri_proj_id)
|
||||
self.assertIsNotNone(self.hdri_proj_id)
|
||||
|
||||
# Add some test files.
|
||||
with self.app.test_request_context():
|
||||
files_coll = self.app.data.driver.db['files']
|
||||
self.file_256p = files_coll.insert_one({
|
||||
'name': '96f1adf5330a4cadbf73c13d718ac163.hdr',
|
||||
'format': 'radiance-hdr',
|
||||
'filename': 'symmetrical_garden_256p.hdr',
|
||||
'project': self.hdri_proj_id,
|
||||
'length': 106435,
|
||||
'user': self.user_id,
|
||||
'content_type': 'application/octet-stream',
|
||||
'file_path': '96f1adf5330a4cadbf73c13d718ac163.hdr'}).inserted_id
|
||||
self.file_1k = files_coll.insert_one({
|
||||
'name': '96f1adf5330a4cadbf73qweqwecqwev142v.hdr',
|
||||
'format': 'radiance-hdr',
|
||||
'filename': 'symmetrical_garden_1k.hdr',
|
||||
'project': self.hdri_proj_id,
|
||||
'length': 1431435,
|
||||
'user': self.user_id,
|
||||
'content_type': 'application/octet-stream',
|
||||
'file_path': 'd13rfc13r1evadbf73c13d718ac163.hdr'}).inserted_id
|
||||
self.file_2k = files_coll.insert_one({
|
||||
'name': '34134df5330a4cadbf73qweqwecqwev142v.hdr',
|
||||
'format': 'radiance-hdr',
|
||||
'filename': 'symmetrical_garden_2k.hdr',
|
||||
'project': self.hdri_proj_id,
|
||||
'length': 2431435,
|
||||
'user': self.user_id,
|
||||
'content_type': 'application/octet-stream',
|
||||
'file_path': 'd13rfc13r1evadbf73c13d718ac163.hdr'}).inserted_id
|
||||
|
||||
def test_hdri_sorting_on_create(self):
|
||||
# Create node with files in wrong order
|
||||
node = {'project': self.hdri_proj_id,
|
||||
'node_type': 'hdri',
|
||||
'user': self.user_id,
|
||||
'properties': {
|
||||
'files': [
|
||||
{'resolution': '2k', 'file': self.file_2k},
|
||||
{'resolution': '256p', 'file': self.file_256p},
|
||||
{'resolution': '1k', 'file': self.file_1k},
|
||||
],
|
||||
'status': 'published',
|
||||
},
|
||||
'name': 'Symmetrical Garden'
|
||||
}
|
||||
resp = self.post('/api/nodes', json=node, expected_status=201, auth_token='token')
|
||||
node_info = resp.json()
|
||||
|
||||
# Check that the node's files are in the right order
|
||||
resp = self.get('/api/nodes/%s' % node_info['_id'], auth_token='token')
|
||||
get_node = resp.json()
|
||||
|
||||
self.assertEqual(['256p', '1k', '2k'],
|
||||
[file_info['resolution']
|
||||
for file_info in get_node['properties']['files']])
|
||||
|
||||
def test_hdri_sorting_on_update(self):
|
||||
# Create node with files in correct order
|
||||
node = {'project': self.hdri_proj_id,
|
||||
'node_type': 'hdri',
|
||||
'user': self.user_id,
|
||||
'properties': {
|
||||
'files': [
|
||||
{'resolution': '256p', 'file': self.file_256p},
|
||||
{'resolution': '1k', 'file': self.file_1k},
|
||||
{'resolution': '2k', 'file': self.file_2k},
|
||||
],
|
||||
'status': 'published',
|
||||
},
|
||||
'name': 'Symmetrical Garden'
|
||||
}
|
||||
resp = self.post('/api/nodes', json=node, expected_status=201, auth_token='token')
|
||||
node_info = resp.json()
|
||||
|
||||
# Mess up the node's order
|
||||
node['properties']['files'] = [
|
||||
{'resolution': '2k', 'file': self.file_2k},
|
||||
{'resolution': '1k', 'file': self.file_1k},
|
||||
{'resolution': '256p', 'file': self.file_256p},
|
||||
]
|
||||
self.put('/api/nodes/%s' % node_info['_id'], json=node, auth_token='token',
|
||||
headers={'If-Match': node_info['_etag']})
|
||||
|
||||
# Check that the node's files are in the right order
|
||||
resp = self.get('/api/nodes/%s' % node_info['_id'], auth_token='token')
|
||||
get_node = resp.json()
|
||||
|
||||
self.assertEqual(['256p', '1k', '2k'],
|
||||
[file_info['resolution']
|
||||
for file_info in get_node['properties']['files']])
|
109
tests/test_api/test_blender_id_subclient.py
Normal file
109
tests/test_api/test_blender_id_subclient.py
Normal file
@@ -0,0 +1,109 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import json
|
||||
|
||||
import responses
|
||||
from bson import ObjectId
|
||||
from flask import g
|
||||
from pillar.tests import (AbstractPillarTest, TEST_EMAIL_ADDRESS, BLENDER_ID_TEST_USERID,
|
||||
TEST_SUBCLIENT_TOKEN, TEST_EMAIL_USER, TEST_FULL_NAME)
|
||||
|
||||
|
||||
class BlenderIdSubclientTest(AbstractPillarTest):
|
||||
@responses.activate
|
||||
def test_store_scst_new_user(self):
|
||||
self._common_user_test(201)
|
||||
|
||||
@responses.activate
|
||||
def test_store_scst_new_user_without_full_name(self):
|
||||
|
||||
responses.add(responses.POST,
|
||||
'%s/u/validate_token' % self.app.config['BLENDER_ID_ENDPOINT'],
|
||||
json={'status': 'success',
|
||||
'user': {'email': TEST_EMAIL_ADDRESS,
|
||||
'full_name': None,
|
||||
'id': BLENDER_ID_TEST_USERID},
|
||||
'token_expires': 'Mon, 1 Jan 2218 01:02:03 GMT'},
|
||||
status=200)
|
||||
|
||||
self._common_user_test(201,
|
||||
expected_full_name=TEST_EMAIL_USER,
|
||||
mock_happy_blender_id=False)
|
||||
|
||||
@responses.activate
|
||||
def test_store_scst_existing_user(self):
|
||||
# Make sure the user exists in our database.
|
||||
from pillar.api.utils.authentication import create_new_user
|
||||
with self.app.test_request_context():
|
||||
create_new_user(TEST_EMAIL_ADDRESS, 'apekoppie', BLENDER_ID_TEST_USERID)
|
||||
|
||||
self._common_user_test(200, expected_full_name='apekoppie')
|
||||
|
||||
@responses.activate
|
||||
def test_store_multiple_tokens(self):
|
||||
scst1 = '%s-1' % TEST_SUBCLIENT_TOKEN
|
||||
scst2 = '%s-2' % TEST_SUBCLIENT_TOKEN
|
||||
db_user1 = self._common_user_test(201, scst=scst1)
|
||||
db_user2 = self._common_user_test(200, scst=scst2)
|
||||
self.assertEqual(db_user1['_id'], db_user2['_id'])
|
||||
|
||||
# Now there should be two tokens.
|
||||
with self.app.test_request_context():
|
||||
tokens = self.app.data.driver.db['tokens']
|
||||
self.assertIsNotNone(tokens.find_one({'user': db_user1['_id'], 'token': scst1}))
|
||||
self.assertIsNotNone(tokens.find_one({'user': db_user1['_id'], 'token': scst2}))
|
||||
|
||||
# There should still be only one auth element for blender-id in the user doc.
|
||||
self.assertEqual(1, len(db_user1['auth']))
|
||||
|
||||
@responses.activate
|
||||
def test_authenticate_with_scst(self):
|
||||
# Make sure there is a user and SCST.
|
||||
db_user = self._common_user_test(201)
|
||||
|
||||
# Make a call that's authenticated with the SCST
|
||||
from pillar.api.utils import authentication as auth
|
||||
|
||||
subclient_id = self.app.config['BLENDER_ID_SUBCLIENT_ID']
|
||||
auth_header = self.make_header(TEST_SUBCLIENT_TOKEN, subclient_id)
|
||||
|
||||
with self.app.test_request_context(headers={'Authorization': auth_header}):
|
||||
self.assertTrue(auth.validate_token())
|
||||
self.assertIsNotNone(g.current_user)
|
||||
self.assertEqual(db_user['_id'], g.current_user['user_id'])
|
||||
|
||||
def _common_user_test(self, expected_status_code, scst=TEST_SUBCLIENT_TOKEN,
|
||||
expected_full_name=TEST_FULL_NAME,
|
||||
mock_happy_blender_id=True):
|
||||
if mock_happy_blender_id:
|
||||
self.mock_blenderid_validate_happy()
|
||||
|
||||
subclient_id = self.app.config['BLENDER_ID_SUBCLIENT_ID']
|
||||
resp = self.client.post('/api/blender_id/store_scst',
|
||||
data={'user_id': BLENDER_ID_TEST_USERID,
|
||||
'subclient_id': subclient_id,
|
||||
'token': scst})
|
||||
self.assertEqual(expected_status_code, resp.status_code, resp.data)
|
||||
|
||||
user_info = json.loads(resp.data) # {'status': 'success', 'subclient_user_id': '...'}
|
||||
self.assertEqual('success', user_info['status'])
|
||||
|
||||
with self.app.test_request_context():
|
||||
# Check that the user was correctly updated
|
||||
users = self.app.data.driver.db['users']
|
||||
db_user = users.find_one(ObjectId(user_info['subclient_user_id']))
|
||||
self.assertIsNotNone(db_user, 'user %r not found' % user_info['subclient_user_id'])
|
||||
|
||||
self.assertEqual(TEST_EMAIL_ADDRESS, db_user['email'])
|
||||
self.assertEqual(expected_full_name, db_user['full_name'])
|
||||
# self.assertEqual(TEST_SUBCLIENT_TOKEN, db_user['auth'][0]['token'])
|
||||
self.assertEqual(str(BLENDER_ID_TEST_USERID), db_user['auth'][0]['user_id'])
|
||||
self.assertEqual('blender-id', db_user['auth'][0]['provider'])
|
||||
|
||||
# Check that the token was succesfully stored.
|
||||
tokens = self.app.data.driver.db['tokens']
|
||||
db_token = tokens.find_one({'user': db_user['_id'],
|
||||
'token': scst})
|
||||
self.assertIsNotNone(db_token)
|
||||
|
||||
return db_user
|
46
tests/test_api/test_encoding.py
Normal file
46
tests/test_api/test_encoding.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""Test cases for the zencoder notifications."""
|
||||
import json
|
||||
|
||||
from pillar.tests import AbstractPillarTest
|
||||
|
||||
|
||||
class ZencoderNotificationTest(AbstractPillarTest):
|
||||
|
||||
def test_missing_secret(self):
|
||||
with self.app.test_request_context():
|
||||
resp = self.client.post('/api/encoding/zencoder/notifications')
|
||||
self.assertEqual(401, resp.status_code)
|
||||
|
||||
def test_wrong_secret(self):
|
||||
with self.app.test_request_context():
|
||||
resp = self.client.post('/api/encoding/zencoder/notifications',
|
||||
headers={'X-Zencoder-Notification-Secret': 'koro'})
|
||||
self.assertEqual(401, resp.status_code)
|
||||
|
||||
def test_good_secret_existing_file(self):
|
||||
self.ensure_file_exists(file_overrides={
|
||||
'processing': {'backend': 'zencoder',
|
||||
'job_id': 'koro-007',
|
||||
'status': 'processing'}
|
||||
})
|
||||
|
||||
with self.app.test_request_context():
|
||||
secret = self.app.config['ZENCODER_NOTIFICATIONS_SECRET']
|
||||
resp = self.client.post('/api/encoding/zencoder/notifications',
|
||||
data=json.dumps({'job': {'id': 'koro-007',
|
||||
'state': 'done'},
|
||||
'outputs': [{
|
||||
'format': 'jpg',
|
||||
'height': 1080,
|
||||
'width': 2048,
|
||||
'file_size_in_bytes': 15,
|
||||
'md5_checksum': None,
|
||||
}],
|
||||
'input': {
|
||||
'duration_in_ms': 5000,
|
||||
}}),
|
||||
headers={'X-Zencoder-Notification-Secret': secret,
|
||||
'Content-Type': 'application/json'})
|
||||
|
||||
# TODO: check that the file in MongoDB is actually updated properly.
|
||||
self.assertEqual(204, resp.status_code)
|
58
tests/test_api/test_file_caching.py
Normal file
58
tests/test_api/test_file_caching.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Test cases for the /files/{id} entrypoint, testing cache behaviour."""
|
||||
|
||||
import bson.tz_util
|
||||
import datetime
|
||||
from eve import RFC1123_DATE_FORMAT
|
||||
|
||||
from pillar.tests import AbstractPillarTest
|
||||
|
||||
|
||||
class FileCachingTest(AbstractPillarTest):
|
||||
|
||||
def test_nonexistant_file(self):
|
||||
with self.app.test_request_context():
|
||||
resp = self.client.get('/api/files/12345')
|
||||
self.assertEqual(404, resp.status_code)
|
||||
|
||||
def test_existing_file(self):
|
||||
file_id, _ = self.ensure_file_exists()
|
||||
|
||||
resp = self.client.get('/api/files/%s' % file_id)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
def test_if_modified_304(self):
|
||||
with self.app.test_request_context():
|
||||
# Make sure the file link has not expired.
|
||||
expires = datetime.datetime.now(tz=bson.tz_util.utc) + datetime.timedelta(minutes=1)
|
||||
file_id, file_doc = self.ensure_file_exists(file_overrides={
|
||||
u'link_expires': expires
|
||||
})
|
||||
|
||||
updated = file_doc['_updated'].strftime(RFC1123_DATE_FORMAT)
|
||||
resp = self.client.get('/api/files/%s' % file_id,
|
||||
headers={'if_modified_since': updated})
|
||||
self.assertEqual(304, resp.status_code)
|
||||
|
||||
def test_if_modified_200(self):
|
||||
file_id, file_doc = self.ensure_file_exists()
|
||||
|
||||
delta = datetime.timedelta(days=-1)
|
||||
|
||||
with self.app.test_request_context():
|
||||
updated = (file_doc['_updated'] + delta).strftime(RFC1123_DATE_FORMAT)
|
||||
resp = self.client.get('/api/files/%s' % file_id,
|
||||
headers={'if_modified_since': updated})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
def test_if_modified_link_expired(self):
|
||||
with self.app.test_request_context():
|
||||
# Make sure the file link has expired.
|
||||
expires = datetime.datetime.now(tz=bson.tz_util.utc) - datetime.timedelta(seconds=1)
|
||||
file_id, file_doc = self.ensure_file_exists(file_overrides={
|
||||
u'link_expires': expires
|
||||
})
|
||||
|
||||
updated = file_doc['_updated'].strftime(RFC1123_DATE_FORMAT)
|
||||
resp = self.client.get('/api/files/%s' % file_id,
|
||||
headers={'if_modified_since': updated})
|
||||
self.assertEqual(200, resp.status_code)
|
235
tests/test_api/test_file_storage.py
Normal file
235
tests/test_api/test_file_storage.py
Normal file
@@ -0,0 +1,235 @@
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pillar.tests.common_test_data as ctd
|
||||
import rsa.randnum
|
||||
from pillar.tests import AbstractPillarTest, TEST_EMAIL_ADDRESS
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
|
||||
class FileStorageTest(AbstractPillarTest):
|
||||
def fake_file(self, filename, content_type):
|
||||
return FileStorage(filename=filename,
|
||||
name='file', # form field name
|
||||
content_type=content_type)
|
||||
|
||||
def test_override_content_type(self):
|
||||
from pillar.api.file_storage import override_content_type
|
||||
|
||||
fake = self.fake_file('compressed.blend', 'jemoeder')
|
||||
override_content_type(fake)
|
||||
self.assertEqual('application/x-blender', fake.content_type)
|
||||
self.assertEqual('application/x-blender', fake.mimetype)
|
||||
|
||||
fake = self.fake_file('blend.mp3', 'application/octet-stream')
|
||||
override_content_type(fake)
|
||||
self.assertEqual('audio/mpeg', fake.content_type)
|
||||
self.assertEqual('audio/mpeg', fake.mimetype)
|
||||
|
||||
# Official one is audio/mpeg, but if the browser gives audio/XXX, it should
|
||||
# just be used.
|
||||
fake = self.fake_file('blend.mp3', 'audio/mp3')
|
||||
override_content_type(fake)
|
||||
self.assertEqual('audio/mp3', fake.content_type)
|
||||
self.assertEqual('audio/mp3', fake.mimetype)
|
||||
|
||||
fake = self.fake_file('mp3.mkv', 'application/octet-stream')
|
||||
override_content_type(fake)
|
||||
self.assertEqual('video/x-matroska', fake.content_type)
|
||||
self.assertEqual('video/x-matroska', fake.mimetype)
|
||||
|
||||
fake = self.fake_file('mkv.mp3.avi.mp4', 'application/octet-stream')
|
||||
override_content_type(fake)
|
||||
self.assertEqual('video/mp4', fake.content_type)
|
||||
self.assertEqual('video/mp4', fake.mimetype)
|
||||
|
||||
fake = self.fake_file('mkv.mp3.avi.mp4.unknown', 'application/awesome-type')
|
||||
override_content_type(fake)
|
||||
self.assertEqual('application/awesome-type', fake.content_type)
|
||||
self.assertEqual('application/awesome-type', fake.mimetype)
|
||||
|
||||
|
||||
class TempDirTest(AbstractPillarTest):
|
||||
def test_tempfiles_location(self):
|
||||
# After importing the application, tempfiles should be created in the STORAGE_DIR
|
||||
storage = self.app.config['STORAGE_DIR']
|
||||
self.assertEqual(os.environ['TMP'], storage)
|
||||
self.assertNotIn('TEMP', os.environ)
|
||||
self.assertNotIn('TMPDIR', os.environ)
|
||||
|
||||
handle, filename = tempfile.mkstemp()
|
||||
os.close(handle)
|
||||
dirname = os.path.dirname(filename)
|
||||
self.assertEqual(dirname, storage)
|
||||
|
||||
tmpfile = tempfile.NamedTemporaryFile()
|
||||
dirname = os.path.dirname(tmpfile.name)
|
||||
self.assertEqual(dirname, storage)
|
||||
|
||||
|
||||
class FileAccessTest(AbstractPillarTest):
|
||||
def __test_link_stripping(self):
|
||||
"""Subscribers should get all links, but non-subscribers only a subset."""
|
||||
|
||||
img_file_id, _ = self.ensure_file_exists()
|
||||
video_file_id, _ = self.ensure_file_exists({
|
||||
u'_id': None,
|
||||
u'content_type': u'video/matroska',
|
||||
'variations': [
|
||||
{
|
||||
'format': 'mp4',
|
||||
'height': 446,
|
||||
'width': 1064,
|
||||
'length': 219399183,
|
||||
'link': 'https://hosting/filename.mp4',
|
||||
'content_type': 'video/mp4',
|
||||
'duration': 44,
|
||||
'size': '446p',
|
||||
'file_path': 'c1/c1f7b71c248c03468b2bb3e7c9f0c4e5cdb9d6d0.mp4',
|
||||
'md5': 'c1f7b71c248c03468b2bb3e7c9f0c4e5cdb9d6d0'
|
||||
},
|
||||
{
|
||||
'format': 'webm',
|
||||
'height': 446,
|
||||
'width': 1064,
|
||||
'length': 31219520,
|
||||
'link': 'https://hosting/filename.webm',
|
||||
'content_type': 'video/webm',
|
||||
'duration': 44,
|
||||
'md5': 'c1f7b71c248c03468b2bb3e7c9f0c4e5cdb9d6d0',
|
||||
'file_path': 'c1/c1f7b71c248c03468b2bb3e7c9f0c4e5cdb9d6d0.webm',
|
||||
'size': '446p'
|
||||
}
|
||||
]
|
||||
|
||||
})
|
||||
blend_file_id, _ = self.ensure_file_exists({u'_id': None,
|
||||
u'content_type': u'application/x-blender',
|
||||
u'variations': None})
|
||||
|
||||
nonsub_user_id = self.create_user(user_id='cafef00dcafef00d00000000', roles=())
|
||||
sub_user_id = self.create_user(user_id='cafef00dcafef00dcafef00d', roles=(u'subscriber',))
|
||||
demo_user_id = self.create_user(user_id='cafef00dcafef00ddeadbeef', roles=(u'demo',))
|
||||
admin_user_id = self.create_user(user_id='aaaaaaaaaaaaaaaaaaaaaaaa', roles=(u'admin',))
|
||||
|
||||
self.create_valid_auth_token(nonsub_user_id, 'nonsub-token')
|
||||
self.create_valid_auth_token(sub_user_id, 'sub-token')
|
||||
self.create_valid_auth_token(demo_user_id, 'demo-token')
|
||||
self.create_valid_auth_token(admin_user_id, 'admin-token')
|
||||
|
||||
def assert_variations(file_id, has_access, token=None):
|
||||
if token:
|
||||
headers = {'Authorization': self.make_header(token)}
|
||||
else:
|
||||
headers = None
|
||||
resp = self.client.get('/api/files/%s' % file_id, headers=headers)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
file_info = json.loads(resp.data)
|
||||
|
||||
self.assertEqual(has_access, 'link' in file_info)
|
||||
self.assertEqual(has_access, 'link_expires' in file_info)
|
||||
return file_info
|
||||
|
||||
# Unauthenticated user and non-subscriber should still get the file, but limited.
|
||||
file_info = assert_variations(img_file_id, False)
|
||||
self.assertEqual({'t', 'h', 'b'}, {var['size'] for var in file_info['variations']})
|
||||
file_info = assert_variations(img_file_id, False, 'nonsub-token')
|
||||
self.assertEqual({'t', 'h', 'b'}, {var['size'] for var in file_info['variations']})
|
||||
|
||||
# Authenticated subscribers, demos and admins should get the full file.
|
||||
file_info = assert_variations(img_file_id, True, 'sub-token')
|
||||
self.assertEqual({'t', 'h', 'b'}, {var['size'] for var in file_info['variations']})
|
||||
file_info = assert_variations(img_file_id, True, 'demo-token')
|
||||
self.assertEqual({'t', 'h', 'b'}, {var['size'] for var in file_info['variations']})
|
||||
file_info = assert_variations(img_file_id, True, 'admin-token')
|
||||
self.assertEqual({'t', 'h', 'b'}, {var['size'] for var in file_info['variations']})
|
||||
|
||||
# Unauthenticated user and non-subscriber should get no links what so ever.
|
||||
file_info = assert_variations(video_file_id, False)
|
||||
self.assertEqual([], file_info['variations'])
|
||||
file_info = assert_variations(video_file_id, False, 'nonsub-token')
|
||||
self.assertEqual([], file_info['variations'])
|
||||
|
||||
# Authenticated subscribers, demos and admins should get the full file.
|
||||
file_info = assert_variations(video_file_id, True, 'sub-token')
|
||||
self.assertEqual({'mp4', 'webm'}, {var['format'] for var in file_info['variations']})
|
||||
file_info = assert_variations(video_file_id, True, 'demo-token')
|
||||
self.assertEqual({'mp4', 'webm'}, {var['format'] for var in file_info['variations']})
|
||||
file_info = assert_variations(video_file_id, True, 'admin-token')
|
||||
self.assertEqual({'mp4', 'webm'}, {var['format'] for var in file_info['variations']})
|
||||
|
||||
# Unauthenticated user and non-subscriber should get no links what so ever.
|
||||
file_info = assert_variations(blend_file_id, False)
|
||||
self.assertIsNone(file_info['variations'])
|
||||
file_info = assert_variations(blend_file_id, False, 'nonsub-token')
|
||||
self.assertIsNone(file_info['variations'])
|
||||
|
||||
# Authenticated subscribers, demos and admins should get the full file.
|
||||
file_info = assert_variations(blend_file_id, True, 'sub-token')
|
||||
self.assertIsNone(file_info['variations'])
|
||||
file_info = assert_variations(blend_file_id, True, 'demo-token')
|
||||
self.assertIsNone(file_info['variations'])
|
||||
file_info = assert_variations(blend_file_id, True, 'admin-token')
|
||||
self.assertIsNone(file_info['variations'])
|
||||
|
||||
|
||||
class FileMaxSizeTest(AbstractPillarTest):
|
||||
def setUp(self, **kwargs):
|
||||
AbstractPillarTest.setUp(self, **kwargs)
|
||||
|
||||
self.project_id, _ = self.ensure_project_exists()
|
||||
self.user_id = self.create_user(groups=[ctd.EXAMPLE_ADMIN_GROUP_ID],
|
||||
roles=set())
|
||||
self.create_valid_auth_token(self.user_id, 'token')
|
||||
|
||||
def test_upload_small_file(self):
|
||||
file_size = 10 * 2 ** 10
|
||||
test_file = self.create_test_file(file_size)
|
||||
|
||||
resp = self.post('/api/storage/stream/%s' % self.project_id,
|
||||
expected_status=201,
|
||||
auth_token='token',
|
||||
files={'file': (test_file, 'test_file.bin')})
|
||||
stream_info = resp.json()
|
||||
file_id = stream_info['file_id']
|
||||
|
||||
self.assert_file_doc_ok(file_id, file_size)
|
||||
|
||||
def test_upload_too_large_file(self):
|
||||
file_size = 30 * 2 ** 10
|
||||
test_file = self.create_test_file(file_size)
|
||||
|
||||
self.post('/api/storage/stream/%s' % self.project_id,
|
||||
expected_status=413,
|
||||
auth_token='token',
|
||||
files={'file': (test_file, 'test_file.bin')})
|
||||
|
||||
def test_upload_large_file_subscriber(self):
|
||||
self.badger(TEST_EMAIL_ADDRESS, 'subscriber', 'grant')
|
||||
|
||||
file_size = 30 * 2 ** 10
|
||||
test_file = self.create_test_file(file_size)
|
||||
|
||||
resp = self.post('/api/storage/stream/%s' % self.project_id,
|
||||
expected_status=201,
|
||||
auth_token='token',
|
||||
files={'file': (test_file, 'test_file.bin')})
|
||||
stream_info = resp.json()
|
||||
file_id = stream_info['file_id']
|
||||
|
||||
self.assert_file_doc_ok(file_id, file_size)
|
||||
|
||||
def assert_file_doc_ok(self, file_id, file_size):
|
||||
with self.app.test_request_context():
|
||||
from pillar.api.utils import str2id
|
||||
|
||||
# Check that the file exists in MongoDB
|
||||
files_coll = self.app.data.driver.db['files']
|
||||
db_file = files_coll.find_one({'_id': str2id(file_id)})
|
||||
self.assertEqual(file_size, db_file['length'])
|
||||
|
||||
def create_test_file(self, file_size_bytes):
|
||||
fileob = io.BytesIO(rsa.randnum.read_random_bits(file_size_bytes * 8))
|
||||
return fileob
|
118
tests/test_api/test_link_refresh.py
Normal file
118
tests/test_api/test_link_refresh.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""Tests chunked refreshing of links."""
|
||||
import json
|
||||
|
||||
from bson import ObjectId, tz_util
|
||||
from pillar.tests import AbstractPillarTest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
class LinkRefreshTest(AbstractPillarTest):
|
||||
# noinspection PyMethodOverriding
|
||||
def ensure_file_exists(self, file_overrides):
|
||||
file_id = file_overrides[u'_id']
|
||||
|
||||
file_overrides.update({
|
||||
u'_id': ObjectId(file_id),
|
||||
u'name': '%s.png' % file_id,
|
||||
u'file_path': '%s.png' % file_id,
|
||||
u'backend': 'unittest',
|
||||
})
|
||||
|
||||
return super(LinkRefreshTest, self).ensure_file_exists(file_overrides)
|
||||
|
||||
def setUp(self, **kwargs):
|
||||
super(LinkRefreshTest, self).setUp(**kwargs)
|
||||
|
||||
self.project_id, self.project = self.ensure_project_exists()
|
||||
self.now = datetime.now(tz=tz_util.utc)
|
||||
|
||||
# All expired
|
||||
|
||||
expiry = [datetime(2016, 3, 22, 9, 28, 1, tzinfo=tz_util.utc),
|
||||
datetime(2016, 3, 22, 9, 28, 2, tzinfo=tz_util.utc),
|
||||
datetime(2016, 3, 22, 9, 28, 3, tzinfo=tz_util.utc),
|
||||
self.now + timedelta(minutes=30), self.now + timedelta(minutes=90), ]
|
||||
ids_and_files = [self.ensure_file_exists(file_overrides={
|
||||
u'_id': 'cafef00ddeadbeef0000000%i' % file_idx,
|
||||
u'link_expires': expiry})
|
||||
for file_idx, expiry in enumerate(expiry)]
|
||||
|
||||
self.file_id, self.file = zip(*ids_and_files)
|
||||
self.file = list(self.file) # otherwise it's a tuple, which is immutable.
|
||||
|
||||
# Get initial expiries from the database (it has a different precision than datetime).
|
||||
self.expiry = [file_doc['link_expires'] for file_doc in self.file]
|
||||
|
||||
# Should be ordered by link expiry
|
||||
assert self.file[0]['link_expires'] < self.file[1]['link_expires']
|
||||
assert self.file[1]['link_expires'] < self.file[2]['link_expires']
|
||||
assert self.file[2]['link_expires'] < self.file[3]['link_expires']
|
||||
assert self.file[3]['link_expires'] < self.file[4]['link_expires']
|
||||
|
||||
# Files 0-2 should be expired already
|
||||
assert self.file[2]['link_expires'] < self.now
|
||||
|
||||
# Files 3-4 should not be expired yet
|
||||
assert self.now < self.file[3]['link_expires']
|
||||
|
||||
def _reload_from_db(self):
|
||||
files_collection = self.app.data.driver.db['files']
|
||||
|
||||
for idx, file_id in enumerate(self.file_id):
|
||||
self.file[idx] = files_collection.find_one(file_id)
|
||||
|
||||
def test_link_refresh(self):
|
||||
hour_from_now = 3600
|
||||
validity_seconds = self.app.config['FILE_LINK_VALIDITY']['unittest']
|
||||
refreshed_lower_limit = self.now + timedelta(seconds=0.9 * validity_seconds)
|
||||
|
||||
with self.app.test_request_context():
|
||||
from pillar.api import file_storage
|
||||
|
||||
# First run: refresh files 0 and 1, don't touch 2-4 (due to chunking).
|
||||
file_storage.refresh_links_for_project(self.project_id, 2, hour_from_now)
|
||||
self._reload_from_db()
|
||||
self.assertLess(refreshed_lower_limit, self.file[0]['link_expires'])
|
||||
self.assertLess(refreshed_lower_limit, self.file[1]['link_expires'])
|
||||
self.assertEqual(self.expiry[2], self.file[2]['link_expires'])
|
||||
self.assertEqual(self.expiry[3], self.file[3]['link_expires'])
|
||||
self.assertEqual(self.expiry[4], self.file[4]['link_expires'])
|
||||
|
||||
# Second run: refresh files 2 (expired) and 3 (within timedelta).
|
||||
file_storage.refresh_links_for_project(self.project_id, 2, hour_from_now)
|
||||
self._reload_from_db()
|
||||
self.assertLess(refreshed_lower_limit, self.file[0]['link_expires'])
|
||||
self.assertLess(refreshed_lower_limit, self.file[1]['link_expires'])
|
||||
self.assertLess(refreshed_lower_limit, self.file[2]['link_expires'])
|
||||
self.assertLess(refreshed_lower_limit, self.file[3]['link_expires'])
|
||||
self.assertEqual(self.expiry[4], self.file[4]['link_expires'])
|
||||
|
||||
# Third run: refresh nothing, file 4 is out of timedelta.
|
||||
file_storage.refresh_links_for_project(self.project_id, 2, hour_from_now)
|
||||
self._reload_from_db()
|
||||
self.assertLess(refreshed_lower_limit, self.file[0]['link_expires'])
|
||||
self.assertLess(refreshed_lower_limit, self.file[1]['link_expires'])
|
||||
self.assertLess(refreshed_lower_limit, self.file[2]['link_expires'])
|
||||
self.assertLess(refreshed_lower_limit, self.file[3]['link_expires'])
|
||||
self.assertEqual(self.expiry[4], self.file[4]['link_expires'])
|
||||
|
||||
def test_refresh_upon_fetch(self):
|
||||
"""Test that expired links are refreshed when we fetch a file document."""
|
||||
|
||||
validity_seconds = self.app.config['FILE_LINK_VALIDITY']['unittest']
|
||||
refreshed_lower_limit = self.now + timedelta(seconds=0.9 * validity_seconds)
|
||||
|
||||
resp = self.client.get('/api/files/%s' % self.file_id[0])
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
# Test the returned document.
|
||||
file_doc = json.loads(resp.data)
|
||||
expires = datetime.strptime(file_doc['link_expires'],
|
||||
self.app.config['RFC1123_DATE_FORMAT'])
|
||||
expires = expires.replace(tzinfo=tz_util.utc)
|
||||
self.assertLess(refreshed_lower_limit, expires)
|
||||
|
||||
# Test the database.
|
||||
with self.app.test_request_context():
|
||||
self._reload_from_db()
|
||||
self.assertLess(refreshed_lower_limit, self.file[0]['link_expires'])
|
77
tests/test_api/test_local_auth.py
Normal file
77
tests/test_api/test_local_auth.py
Normal file
@@ -0,0 +1,77 @@
|
||||
import json
|
||||
import datetime
|
||||
|
||||
from bson import tz_util
|
||||
|
||||
from pillar.tests import AbstractPillarTest
|
||||
|
||||
|
||||
class LocalAuthTest(AbstractPillarTest):
|
||||
def create_test_user(self):
|
||||
from pillar.api import local_auth
|
||||
with self.app.test_request_context():
|
||||
user_id = local_auth.create_local_user('koro@example.com', 'oti')
|
||||
return user_id
|
||||
|
||||
def test_create_local_user(self):
|
||||
user_id = self.create_test_user()
|
||||
|
||||
with self.app.test_request_context():
|
||||
users = self.app.data.driver.db['users']
|
||||
db_user = users.find_one(user_id)
|
||||
self.assertIsNotNone(db_user)
|
||||
|
||||
def test_login_existing_user(self):
|
||||
user_id = self.create_test_user()
|
||||
|
||||
resp = self.client.post('/api/auth/make-token',
|
||||
data={'username': 'koro',
|
||||
'password': 'oti'})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
token_info = json.loads(resp.data)
|
||||
token = token_info['token']
|
||||
|
||||
headers = {'Authorization': self.make_header(token)}
|
||||
resp = self.client.get('/api/users/%s' % user_id,
|
||||
headers=headers)
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
def test_login_expired_token(self):
|
||||
user_id = self.create_test_user()
|
||||
|
||||
resp = self.client.post('/api/auth/make-token',
|
||||
data={'username': 'koro',
|
||||
'password': 'oti'})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
token_info = json.loads(resp.data)
|
||||
token = token_info['token']
|
||||
|
||||
with self.app.test_request_context():
|
||||
tokens = self.app.data.driver.db['tokens']
|
||||
|
||||
exp = datetime.datetime.now(tz=tz_util.utc) - datetime.timedelta(1)
|
||||
result = tokens.update_one({'token': token},
|
||||
{'$set': {'expire_time': exp}})
|
||||
self.assertEqual(1, result.modified_count)
|
||||
|
||||
# Do something restricted.
|
||||
headers = {'Authorization': self.make_header(token)}
|
||||
resp = self.client.put('/api/users/%s' % user_id,
|
||||
headers=headers)
|
||||
self.assertEqual(403, resp.status_code, resp.data)
|
||||
|
||||
def test_login_nonexistant_user(self):
|
||||
resp = self.client.post('/api/auth/make-token',
|
||||
data={'username': 'proog',
|
||||
'password': 'oti'})
|
||||
|
||||
self.assertEqual(403, resp.status_code, resp.data)
|
||||
|
||||
def test_login_bad_pwd(self):
|
||||
resp = self.client.post('/api/auth/make-token',
|
||||
data={'username': 'koro',
|
||||
'password': 'koro'})
|
||||
|
||||
self.assertEqual(403, resp.status_code, resp.data)
|
381
tests/test_api/test_nodes.py
Normal file
381
tests/test_api/test_nodes.py
Normal file
@@ -0,0 +1,381 @@
|
||||
import json
|
||||
|
||||
import pillar.tests.common_test_data as ctd
|
||||
from bson import ObjectId
|
||||
from eve.methods.post import post_internal
|
||||
from eve.methods.put import put_internal
|
||||
from flask import g
|
||||
from mock import mock
|
||||
from pillar.tests import AbstractPillarTest
|
||||
from werkzeug.exceptions import UnprocessableEntity
|
||||
|
||||
|
||||
class NodeContentTypeTest(AbstractPillarTest):
|
||||
def mkfile(self, file_id, content_type):
|
||||
file_id, _ = self.ensure_file_exists(file_overrides={
|
||||
'_id': ObjectId(file_id),
|
||||
'content_type': content_type})
|
||||
return file_id
|
||||
|
||||
def test_node_types(self):
|
||||
"""Tests that the node's content_type properties is updated correctly from its file."""
|
||||
|
||||
file_id_image = self.mkfile('cafef00dcafef00dcafef00d', 'image/jpeg')
|
||||
file_id_video = self.mkfile('cafef00dcafef00dcafecafe', 'video/matroska')
|
||||
file_id_blend = self.mkfile('cafef00dcafef00ddeadbeef', 'application/x-blender')
|
||||
|
||||
user_id = self.create_user()
|
||||
project_id, _ = self.ensure_project_exists()
|
||||
|
||||
def perform_test(file_id, expected_type):
|
||||
node_doc = {'picture': file_id_image,
|
||||
'description': '',
|
||||
'project': project_id,
|
||||
'node_type': 'asset',
|
||||
'user': user_id,
|
||||
'properties': {'status': 'published',
|
||||
'tags': [],
|
||||
'order': 0,
|
||||
'categories': ''},
|
||||
'name': 'My first test node'}
|
||||
|
||||
with self.app.test_request_context():
|
||||
g.current_user = {'user_id': user_id,
|
||||
# This group is hardcoded in the EXAMPLE_PROJECT.
|
||||
'groups': [ObjectId('5596e975ea893b269af85c0e')],
|
||||
'roles': {u'subscriber', u'admin'}}
|
||||
nodes = self.app.data.driver.db['nodes']
|
||||
|
||||
# Create the node.
|
||||
r, _, _, status = post_internal('nodes', node_doc)
|
||||
self.assertEqual(status, 201, r)
|
||||
node_id = r['_id']
|
||||
|
||||
# Get from database to check its default content type.
|
||||
db_node = nodes.find_one(node_id)
|
||||
self.assertNotIn('content_type', db_node['properties'])
|
||||
|
||||
# PUT it again, without a file -- should be blocked.
|
||||
self.assertRaises(UnprocessableEntity, put_internal, 'nodes', node_doc,
|
||||
_id=node_id)
|
||||
|
||||
# PUT it with a file.
|
||||
node_doc['properties']['file'] = str(file_id)
|
||||
r, _, _, status = put_internal('nodes', node_doc, _id=node_id)
|
||||
self.assertEqual(status, 200, r)
|
||||
|
||||
# Get from database to test the final node.
|
||||
db_node = nodes.find_one(node_id)
|
||||
self.assertEqual(expected_type, db_node['properties']['content_type'])
|
||||
|
||||
perform_test(file_id_image, 'image')
|
||||
perform_test(file_id_video, 'video')
|
||||
perform_test(file_id_blend, 'file')
|
||||
|
||||
def test_get_project_node_type(self):
|
||||
user_id = self.create_user()
|
||||
self.create_valid_auth_token(user_id, 'token')
|
||||
project_id, _ = self.ensure_project_exists()
|
||||
|
||||
resp = self.client.get('/api/projects/%s?node_type=asset' % project_id)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
data = json.loads(resp.data)
|
||||
self.assertEqual([u'GET'], data['allowed_methods'])
|
||||
|
||||
def test_default_picture_image_asset(self):
|
||||
from pillar.api.utils import dumps
|
||||
|
||||
file_id_image = self.mkfile(24 * 'a', 'image/jpeg')
|
||||
file_id_video = self.mkfile(24 * 'b', 'video/matroska')
|
||||
file_id_image_spec = self.mkfile(24 * 'c', 'image/jpeg')
|
||||
file_id_image_bump = self.mkfile(24 * 'd', 'image/jpeg')
|
||||
|
||||
user_id = self.create_user(groups=[ctd.EXAMPLE_ADMIN_GROUP_ID])
|
||||
self.create_valid_auth_token(user_id, 'token')
|
||||
project_id, _ = self.ensure_project_exists()
|
||||
|
||||
def test_for(node, expected_picture_id):
|
||||
# Create the node
|
||||
resp = self.client.post('/api/nodes',
|
||||
data=dumps(node),
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json'})
|
||||
self.assertEqual(resp.status_code, 201, resp.data)
|
||||
node_id = json.loads(resp.data)['_id']
|
||||
|
||||
# Test that the node has the attached file as picture.
|
||||
resp = self.client.get('/api/nodes/%s' % node_id,
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(resp.status_code, 200, resp.data)
|
||||
json_node = json.loads(resp.data)
|
||||
|
||||
if expected_picture_id:
|
||||
self.assertEqual(ObjectId(json_node['picture']), expected_picture_id)
|
||||
else:
|
||||
self.assertNotIn('picture', json_node)
|
||||
|
||||
# Image asset node
|
||||
test_for({'description': '',
|
||||
'project': project_id,
|
||||
'node_type': 'asset',
|
||||
'user': user_id,
|
||||
'properties': {'status': 'published',
|
||||
'tags': [],
|
||||
'order': 0,
|
||||
'categories': '',
|
||||
'file': file_id_image},
|
||||
'name': 'Image asset'},
|
||||
file_id_image)
|
||||
|
||||
# Video asset node, should not get default picture
|
||||
test_for({'description': '',
|
||||
'project': project_id,
|
||||
'node_type': 'asset',
|
||||
'user': user_id,
|
||||
'properties': {'status': 'published',
|
||||
'tags': [],
|
||||
'order': 0,
|
||||
'categories': '',
|
||||
'file': file_id_video},
|
||||
'name': 'Video asset'},
|
||||
None)
|
||||
|
||||
# Texture node, should default to colour map.
|
||||
test_for({'description': '',
|
||||
'project': project_id,
|
||||
'node_type': 'texture',
|
||||
'user': user_id,
|
||||
'properties': {'status': 'published',
|
||||
'tags': [],
|
||||
'order': 0,
|
||||
'categories': '',
|
||||
'files': [
|
||||
{'file': file_id_image_bump, 'map_type': 'bump'},
|
||||
{'file': file_id_image_spec, 'map_type': 'specular'},
|
||||
{'file': file_id_image, 'map_type': 'color'},
|
||||
],
|
||||
'is_tileable': False,
|
||||
'aspect_ratio': 0.0,
|
||||
'is_landscape': False,
|
||||
'resolution': '',
|
||||
},
|
||||
'name': 'Texture node'},
|
||||
file_id_image)
|
||||
|
||||
# Texture node, should default to first image if there is no colour map.
|
||||
test_for({'description': '',
|
||||
'project': project_id,
|
||||
'node_type': 'texture',
|
||||
'user': user_id,
|
||||
'properties': {'status': 'published',
|
||||
'tags': [],
|
||||
'order': 0,
|
||||
'categories': '',
|
||||
'files': [
|
||||
{'file': file_id_image_bump, 'map_type': 'bump'},
|
||||
{'file': file_id_image_spec, 'map_type': 'specular'},
|
||||
],
|
||||
'is_tileable': False,
|
||||
'aspect_ratio': 0.0,
|
||||
'is_landscape': False,
|
||||
'resolution': '',
|
||||
},
|
||||
'name': 'Texture node'},
|
||||
file_id_image_bump)
|
||||
|
||||
|
||||
class NodeOwnerTest(AbstractPillarTest):
|
||||
def setUp(self, **kwargs):
|
||||
AbstractPillarTest.setUp(self, **kwargs)
|
||||
|
||||
self.user_id = self.create_user()
|
||||
self.create_valid_auth_token(self.user_id, 'token')
|
||||
self.project_id, _ = self.ensure_project_exists(
|
||||
project_overrides={'permissions': {
|
||||
'users': [
|
||||
{'user': self.user_id,
|
||||
'methods': ['GET', 'PUT', 'POST', 'DELETE']}
|
||||
]
|
||||
}}
|
||||
)
|
||||
|
||||
def test_create_with_explicit_owner(self):
|
||||
test_node = {
|
||||
'project': self.project_id,
|
||||
'node_type': 'asset',
|
||||
'name': 'test with user',
|
||||
'user': self.user_id,
|
||||
'properties': {},
|
||||
}
|
||||
self._test_user(test_node)
|
||||
|
||||
def test_create_with_implicit_owner(self):
|
||||
test_node = {
|
||||
'project': self.project_id,
|
||||
'node_type': 'asset',
|
||||
'name': 'test with user',
|
||||
'properties': {},
|
||||
}
|
||||
self._test_user(test_node)
|
||||
|
||||
def _test_user(self, test_node):
|
||||
from pillar.api.utils import dumps
|
||||
|
||||
resp = self.client.post('/api/nodes', data=dumps(test_node),
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json'})
|
||||
self.assertEqual(201, resp.status_code, resp.data)
|
||||
created = json.loads(resp.data)
|
||||
resp = self.client.get('/api/nodes/%s' % created['_id'],
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
json_node = json.loads(resp.data)
|
||||
self.assertEqual(str(self.user_id), json_node['user'])
|
||||
|
||||
|
||||
class NodeSharingTest(AbstractPillarTest):
|
||||
def setUp(self, **kwargs):
|
||||
AbstractPillarTest.setUp(self, **kwargs)
|
||||
|
||||
self.project_id, _ = self.ensure_project_exists(
|
||||
project_overrides={
|
||||
u'category': 'home',
|
||||
u'permissions':
|
||||
{u'groups': [{u'group': ctd.EXAMPLE_ADMIN_GROUP_ID,
|
||||
u'methods': [u'GET', u'POST', u'PUT', u'DELETE']}],
|
||||
u'users': [],
|
||||
u'world': []}}
|
||||
)
|
||||
self.user_id = self.create_user(groups=[ctd.EXAMPLE_ADMIN_GROUP_ID])
|
||||
self.create_valid_auth_token(self.user_id, 'token')
|
||||
|
||||
# Create a node to share
|
||||
resp = self.post('/api/nodes', expected_status=201, auth_token='token', json={
|
||||
'project': self.project_id,
|
||||
'node_type': 'asset',
|
||||
'name': str(self),
|
||||
'properties': {},
|
||||
})
|
||||
self.node_id = resp.json()['_id']
|
||||
|
||||
def _check_share_data(self, share_data):
|
||||
base_url = self.app.config['SHORT_LINK_BASE_URL']
|
||||
|
||||
self.assertEqual(6, len(share_data['short_code']))
|
||||
self.assertTrue(share_data['short_link'].startswith(base_url))
|
||||
|
||||
def test_share_node(self):
|
||||
# Share the node
|
||||
resp = self.post('/api/nodes/%s/share' % self.node_id, auth_token='token',
|
||||
expected_status=201)
|
||||
share_data = resp.json()
|
||||
|
||||
self._check_share_data(share_data)
|
||||
|
||||
def test_anonymous_access_shared_node(self):
|
||||
# Anonymous user should not have access
|
||||
self.get('/api/nodes/%s' % self.node_id, expected_status=403)
|
||||
|
||||
# Share the node
|
||||
self.post('/api/nodes/%s/share' % self.node_id, auth_token='token',
|
||||
expected_status=201)
|
||||
|
||||
# Check that an anonymous user has acces.
|
||||
resp = self.get('/api/nodes/%s' % self.node_id)
|
||||
self.assertEqual(str(self.node_id), resp.json()['_id'])
|
||||
|
||||
def test_other_user_access_shared_node(self):
|
||||
# Share the node
|
||||
self.post('/api/nodes/%s/share' % self.node_id, auth_token='token',
|
||||
expected_status=201)
|
||||
|
||||
# Check that another user has access
|
||||
other_user_id = self.create_user(user_id=24 * 'a')
|
||||
self.create_valid_auth_token(other_user_id, 'other-token')
|
||||
resp = self.get('/api/nodes/%s' % self.node_id, auth_token='other-token')
|
||||
self.assertEqual(str(self.node_id), resp.json()['_id'])
|
||||
|
||||
def test_get_share_data__unshared_node(self):
|
||||
self.get('/api/nodes/%s/share' % self.node_id,
|
||||
expected_status=204,
|
||||
auth_token='token')
|
||||
|
||||
def test_get_share_data__shared_node(self):
|
||||
# Share the node first.
|
||||
self.post('/api/nodes/%s/share' % self.node_id, auth_token='token',
|
||||
expected_status=201)
|
||||
|
||||
# Then get its share info.
|
||||
resp = self.get('/api/nodes/%s/share' % self.node_id, auth_token='token')
|
||||
share_data = resp.json()
|
||||
|
||||
self._check_share_data(share_data)
|
||||
|
||||
def test_unauthenticated(self):
|
||||
self.post('/api/nodes/%s/share' % self.node_id,
|
||||
expected_status=403)
|
||||
|
||||
def test_other_user(self):
|
||||
other_user_id = self.create_user(user_id=24 * 'a')
|
||||
self.create_valid_auth_token(other_user_id, 'other-token')
|
||||
|
||||
self.post('/api/nodes/%s/share' % self.node_id,
|
||||
auth_token='other-token',
|
||||
expected_status=403)
|
||||
|
||||
def test_create_short_link(self):
|
||||
from pillar.api.nodes import create_short_code
|
||||
|
||||
with self.app.test_request_context():
|
||||
length = self.app.config['SHORT_CODE_LENGTH']
|
||||
|
||||
# We're testing a random process, so we have to repeat it
|
||||
# a few times to see if it really works.
|
||||
for _ in range(10000):
|
||||
short_code = create_short_code({})
|
||||
self.assertEqual(length, len(short_code))
|
||||
|
||||
def test_short_code_collision(self):
|
||||
# Create a second node that has already been shared.
|
||||
self.post('/api/nodes', expected_status=201, auth_token='token', json={
|
||||
'project': self.project_id,
|
||||
'node_type': 'asset',
|
||||
'name': 'collider',
|
||||
'properties': {},
|
||||
'short_code': 'takenX',
|
||||
})
|
||||
|
||||
# Mock create_short_code so that it returns predictable short codes.
|
||||
codes = ['takenX', 'takenX', 'freeXX']
|
||||
with mock.patch('pillar.api.nodes.create_short_code',
|
||||
side_effect=codes) as create_short_link:
|
||||
resp = self.post('/api/nodes/%s/share' % self.node_id, auth_token='token',
|
||||
expected_status=201)
|
||||
|
||||
share_data = resp.json()
|
||||
|
||||
self._check_share_data(share_data)
|
||||
self.assertEqual(3, create_short_link.call_count)
|
||||
|
||||
def test_projections(self):
|
||||
"""Projecting short_code should get us short_link as well."""
|
||||
|
||||
# Share the node
|
||||
resp = self.post('/api/nodes/%s/share' % self.node_id, auth_token='token',
|
||||
expected_status=201)
|
||||
share_data = resp.json()
|
||||
|
||||
# Get the node with short_code
|
||||
resp = self.get('/api/nodes/%s' % self.node_id,
|
||||
json={'projection': {'short_code': 1}})
|
||||
node = resp.json()
|
||||
self.assertEqual(node['short_code'], share_data['short_code'])
|
||||
self.assertTrue(node['short_link'].endswith(share_data['short_code']))
|
||||
|
||||
# Get the node without short_code
|
||||
resp = self.get('/api/nodes/%s' % self.node_id,
|
||||
qs={'projection': {'short_code': 0}})
|
||||
node = resp.json()
|
||||
self.assertNotIn('short_code', node)
|
||||
self.assertNotIn('short_link', node)
|
163
tests/test_api/test_patch.py
Normal file
163
tests/test_api/test_patch.py
Normal file
@@ -0,0 +1,163 @@
|
||||
from pillar.tests import AbstractPillarTest
|
||||
|
||||
|
||||
class PatchCommentTest(AbstractPillarTest):
|
||||
def setUp(self, **kwargs):
|
||||
AbstractPillarTest.setUp(self, **kwargs)
|
||||
|
||||
self.project_id, proj = self.ensure_project_exists()
|
||||
admin_group_id = proj['permissions']['groups'][0]['group']
|
||||
|
||||
self.user_id = self.create_user(user_id=24 * 'a')
|
||||
self.owner_id = self.create_user(user_id=24 * 'b', groups=[admin_group_id])
|
||||
self.create_valid_auth_token(self.user_id, 'token')
|
||||
self.create_valid_auth_token(self.owner_id, 'owner-token')
|
||||
|
||||
# Create a node to attach the comment to
|
||||
asset = {'description': '',
|
||||
'project': self.project_id,
|
||||
'node_type': 'asset',
|
||||
'user': self.owner_id,
|
||||
'properties': {'status': 'published'},
|
||||
'name': 'Test asset'}
|
||||
|
||||
resp = self.post('/api/nodes', json=asset,
|
||||
auth_token='owner-token',
|
||||
expected_status=201)
|
||||
asset_id = resp.json()['_id']
|
||||
|
||||
# Create the comment
|
||||
comment = {'description': '',
|
||||
'project': self.project_id,
|
||||
'parent': asset_id,
|
||||
'node_type': 'comment',
|
||||
'user': self.owner_id,
|
||||
'properties': {'rating_positive': 0,
|
||||
'rating_negative': 0,
|
||||
'status': 'published',
|
||||
'confidence': 0,
|
||||
'content': 'Purrrr kittycat',
|
||||
},
|
||||
'name': 'Test comment'}
|
||||
|
||||
resp = self.post('/api/nodes', json=comment,
|
||||
auth_token='owner-token',
|
||||
expected_status=201)
|
||||
comment_info = resp.json()
|
||||
self.node_url = '/api/nodes/%s' % comment_info['_id']
|
||||
|
||||
def test_upvote_other_comment(self):
|
||||
# Patch the node
|
||||
res = self.patch(self.node_url,
|
||||
json={'op': 'upvote'},
|
||||
auth_token='token').json()
|
||||
self.assertEqual(1, res['properties']['rating_positive'])
|
||||
self.assertEqual(0, res['properties']['rating_negative'])
|
||||
|
||||
# Get the node again, to inspect its changed state.
|
||||
patched_node = self.get(self.node_url, auth_token='token').json()
|
||||
self.assertEqual(1, patched_node['properties']['rating_positive'])
|
||||
self.assertEqual(0, patched_node['properties']['rating_negative'])
|
||||
self.assertEqual({u'user': str(self.user_id), u'is_positive': True},
|
||||
patched_node['properties']['ratings'][0])
|
||||
self.assertEqual(1, len(patched_node['properties']['ratings']))
|
||||
|
||||
def test_upvote_twice(self):
|
||||
# Both tests check for rating_positive=1
|
||||
self.test_upvote_other_comment()
|
||||
self.test_upvote_other_comment()
|
||||
|
||||
def test_downvote_other_comment(self):
|
||||
# Patch the node
|
||||
res = self.patch(self.node_url,
|
||||
json={'op': 'downvote'},
|
||||
auth_token='token').json()
|
||||
self.assertEqual(0, res['properties']['rating_positive'])
|
||||
self.assertEqual(1, res['properties']['rating_negative'])
|
||||
|
||||
# Get the node again, to inspect its changed state.
|
||||
patched_node = self.get(self.node_url, auth_token='token').json()
|
||||
self.assertEqual(0, patched_node['properties']['rating_positive'])
|
||||
self.assertEqual(1, patched_node['properties']['rating_negative'])
|
||||
self.assertEqual({u'user': str(self.user_id), u'is_positive': False},
|
||||
patched_node['properties']['ratings'][0])
|
||||
self.assertEqual(1, len(patched_node['properties']['ratings']))
|
||||
|
||||
def test_downvote_twice(self):
|
||||
# Both tests check for rating_negative=1
|
||||
self.test_downvote_other_comment()
|
||||
self.test_downvote_other_comment()
|
||||
|
||||
def test_up_then_downvote(self):
|
||||
self.test_upvote_other_comment()
|
||||
self.test_downvote_other_comment()
|
||||
|
||||
def test_down_then_upvote(self):
|
||||
self.test_downvote_other_comment()
|
||||
self.test_upvote_other_comment()
|
||||
|
||||
def test_down_then_up_then_downvote(self):
|
||||
self.test_downvote_other_comment()
|
||||
self.test_upvote_other_comment()
|
||||
self.test_downvote_other_comment()
|
||||
|
||||
def test_revoke_noop(self):
|
||||
# Patch the node
|
||||
self.patch(self.node_url,
|
||||
json={'op': 'revoke'},
|
||||
auth_token='token')
|
||||
|
||||
# Get the node again, to inspect its changed state.
|
||||
patched_node = self.get(self.node_url, auth_token='token').json()
|
||||
self.assertEqual(0, patched_node['properties']['rating_positive'])
|
||||
self.assertEqual(0, patched_node['properties']['rating_negative'])
|
||||
self.assertEqual([], patched_node['properties'].get('ratings', []))
|
||||
|
||||
def test_revoke_upvote(self):
|
||||
self.test_upvote_other_comment()
|
||||
self.test_revoke_noop()
|
||||
|
||||
def test_revoke_downvote(self):
|
||||
self.test_downvote_other_comment()
|
||||
self.test_revoke_noop()
|
||||
|
||||
def test_with_other_users(self):
|
||||
# Generate a bunch of users
|
||||
other_user_ids = []
|
||||
for idx in range(5):
|
||||
uid = self.create_user(user_id=24 * str(idx))
|
||||
other_user_ids.append(uid)
|
||||
self.create_valid_auth_token(uid, 'other-token-%i' % idx)
|
||||
|
||||
# Let them all vote positive.
|
||||
for idx in range(5):
|
||||
self.patch(self.node_url,
|
||||
json={'op': 'upvote'},
|
||||
auth_token='other-token-%i' % idx)
|
||||
|
||||
# Use our standard user to downvote (the negative nancy)
|
||||
self.patch(self.node_url,
|
||||
json={'op': 'downvote'},
|
||||
auth_token='token')
|
||||
|
||||
# Let one of the other users revoke
|
||||
self.patch(self.node_url,
|
||||
json={'op': 'revoke'},
|
||||
auth_token='other-token-2')
|
||||
|
||||
# And another user downvotes to override their previous upvote
|
||||
self.patch(self.node_url,
|
||||
json={'op': 'downvote'},
|
||||
auth_token='other-token-4')
|
||||
|
||||
# Inspect the result
|
||||
patched_node = self.get(self.node_url, auth_token='token').json()
|
||||
self.assertEqual(3, patched_node['properties']['rating_positive'])
|
||||
self.assertEqual(2, patched_node['properties']['rating_negative'])
|
||||
self.assertEqual([
|
||||
{u'user': unicode(other_user_ids[0]), u'is_positive': True},
|
||||
{u'user': unicode(other_user_ids[1]), u'is_positive': True},
|
||||
{u'user': unicode(other_user_ids[3]), u'is_positive': True},
|
||||
{u'user': unicode(other_user_ids[4]), u'is_positive': False},
|
||||
{u'user': unicode(self.user_id), u'is_positive': False},
|
||||
], patched_node['properties'].get('ratings', []))
|
573
tests/test_api/test_project_management.py
Normal file
573
tests/test_api/test_project_management.py
Normal file
@@ -0,0 +1,573 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
"""Unit tests for creating and editing projects_blueprint."""
|
||||
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import urllib
|
||||
|
||||
from bson import ObjectId
|
||||
from pillar.tests import AbstractPillarTest
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AbstractProjectTest(AbstractPillarTest):
|
||||
def _create_user_with_token(self, roles, token, user_id='cafef00df00df00df00df00d'):
|
||||
user_id = self.create_user(roles=roles, user_id=user_id)
|
||||
self.create_valid_auth_token(user_id, token)
|
||||
return user_id
|
||||
|
||||
def _create_project(self, project_name, token):
|
||||
resp = self.client.post('/api/p/create',
|
||||
headers={'Authorization': self.make_header(token)},
|
||||
data={'project_name': project_name})
|
||||
return resp
|
||||
|
||||
def _create_user_and_project(self, roles, user_id='cafef00df00df00df00df00d', token='token',
|
||||
project_name=u'Prøject El Niño'):
|
||||
self._create_user_with_token(roles, token, user_id=user_id)
|
||||
resp = self._create_project(project_name, token)
|
||||
|
||||
self.assertEqual(201, resp.status_code, resp.data)
|
||||
project = json.loads(resp.data)
|
||||
|
||||
return project
|
||||
|
||||
|
||||
class ProjectCreationTest(AbstractProjectTest):
|
||||
def test_project_creation_wrong_role(self):
|
||||
self._create_user_with_token([u'whatever'], 'token')
|
||||
resp = self._create_project(u'Prøject El Niño', 'token')
|
||||
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
# Test that the project wasn't created.
|
||||
with self.app.test_request_context():
|
||||
projects = self.app.data.driver.db['projects']
|
||||
self.assertEqual(0, len(list(projects.find())))
|
||||
|
||||
def test_project_creation_good_role(self):
|
||||
user_id = self._create_user_with_token([u'subscriber'], 'token')
|
||||
resp = self._create_project(u'Prøject El Niño', 'token')
|
||||
self.assertEqual(201, resp.status_code)
|
||||
|
||||
# The response of a POST is the entire project, but we'll test a GET on
|
||||
# the returned Location nevertheless.
|
||||
project_info = json.loads(resp.data.decode('utf-8'))
|
||||
project_id = project_info['_id']
|
||||
|
||||
# Test that the Location header contains the location of the project document.
|
||||
self.assertEqual('http://localhost/api/projects/%s' % project_id,
|
||||
resp.headers['Location'])
|
||||
|
||||
# GET the project from the URL in the Location header to see if that works too.
|
||||
auth_header = {'Authorization': self.make_header('token')}
|
||||
resp = self.client.get(resp.headers['Location'], headers=auth_header)
|
||||
project = json.loads(resp.data.decode('utf-8'))
|
||||
project_id = project['_id']
|
||||
|
||||
# Check some of the more complex/interesting fields.
|
||||
self.assertEqual(u'Prøject El Niño', project['name'])
|
||||
self.assertEqual(str(user_id), project['user'])
|
||||
self.assertEqual('p-%s' % project_id, project['url'])
|
||||
self.assertEqual(1, len(project['permissions']['groups']))
|
||||
|
||||
# Check the etag
|
||||
resp = self.client.get('/api/projects/%s' % project_id, headers=auth_header)
|
||||
from_db = json.loads(resp.data)
|
||||
self.assertEqual(from_db['_etag'], project['_etag'])
|
||||
|
||||
group_id = ObjectId(project['permissions']['groups'][0]['group'])
|
||||
|
||||
# Check that there is a group for the project, and that the user is member of it.
|
||||
with self.app.test_request_context():
|
||||
groups = self.app.data.driver.db['groups']
|
||||
users = self.app.data.driver.db['users']
|
||||
|
||||
group = groups.find_one(group_id)
|
||||
db_user = users.find_one(user_id)
|
||||
|
||||
self.assertEqual(str(project_id), group['name'])
|
||||
self.assertIn(group_id, db_user['groups'])
|
||||
|
||||
def test_project_creation_access_admin(self):
|
||||
"""Admin-created projects should be public"""
|
||||
|
||||
proj = self._create_user_and_project(roles={u'admin'})
|
||||
self.assertEqual(['GET'], proj['permissions']['world'])
|
||||
|
||||
def test_project_creation_access_subscriber(self):
|
||||
"""Subscriber-created projects should be private"""
|
||||
|
||||
proj = self._create_user_and_project(roles={u'subscriber'})
|
||||
self.assertEqual([], proj['permissions']['world'])
|
||||
self.assertTrue(proj['is_private'])
|
||||
|
||||
# Also check the database contents
|
||||
with self.app.test_request_context():
|
||||
project_id = ObjectId(proj['_id'])
|
||||
db_proj = self.app.data.driver.db['projects'].find_one(project_id)
|
||||
self.assertEqual([], db_proj['permissions']['world'])
|
||||
self.assertTrue(db_proj['is_private'])
|
||||
|
||||
def test_project_list(self):
|
||||
"""Test that we get an empty list when querying for non-existing projects, instead of 403"""
|
||||
|
||||
proj_a = self._create_user_and_project(user_id=24 * 'a',
|
||||
roles={u'subscriber'},
|
||||
project_name=u'Prøject A',
|
||||
token='token-a')
|
||||
proj_b = self._create_user_and_project(user_id=24 * 'b',
|
||||
roles={u'subscriber'},
|
||||
project_name=u'Prøject B',
|
||||
token='token-b')
|
||||
|
||||
# Assertion: each user must have access to their own project.
|
||||
resp = self.client.get('/api/projects/%s' % proj_a['_id'],
|
||||
headers={'Authorization': self.make_header('token-a')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
resp = self.client.get('/api/projects/%s' % proj_b['_id'],
|
||||
headers={'Authorization': self.make_header('token-b')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Getting a project list should return projects you have access to.
|
||||
resp = self.client.get('/api/projects',
|
||||
headers={'Authorization': self.make_header('token-a')})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
proj_list = json.loads(resp.data)
|
||||
self.assertEqual({u'Prøject A'}, {p['name'] for p in proj_list['_items']})
|
||||
|
||||
resp = self.client.get('/api/projects',
|
||||
headers={'Authorization': self.make_header('token-b')})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
proj_list = json.loads(resp.data)
|
||||
self.assertEqual({u'Prøject B'}, {p['name'] for p in proj_list['_items']})
|
||||
|
||||
# No access to anything for user C, should result in empty list.
|
||||
self._create_user_with_token(roles={u'subscriber'}, token='token-c', user_id=12 * 'c')
|
||||
resp = self.client.get('/api/projects',
|
||||
headers={'Authorization': self.make_header('token-c')})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
proj_list = json.loads(resp.data)
|
||||
self.assertEqual([], proj_list['_items'])
|
||||
|
||||
|
||||
class ProjectEditTest(AbstractProjectTest):
|
||||
def test_editing_as_subscriber(self):
|
||||
"""Test that we can set certain fields, but not all."""
|
||||
|
||||
from pillar.api.utils import remove_private_keys, PillarJSONEncoder
|
||||
dumps = functools.partial(json.dumps, cls=PillarJSONEncoder)
|
||||
|
||||
project_info = self._create_user_and_project([u'subscriber'])
|
||||
project_url = '/api/projects/%(_id)s' % project_info
|
||||
|
||||
resp = self.client.get(project_url,
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
project = json.loads(resp.data.decode('utf-8'))
|
||||
|
||||
# Create another user we can try and assign the project to.
|
||||
other_user_id = 'f00dd00df00dd00df00dd00d'
|
||||
self._create_user_with_token(['subscriber'], 'other-token', user_id=other_user_id)
|
||||
|
||||
# Unauthenticated should be forbidden
|
||||
resp = self.client.put('/api/projects/%s' % project['_id'],
|
||||
data=dumps(remove_private_keys(project)),
|
||||
headers={'Content-Type': 'application/json'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
# Regular user should be able to PUT, but only be able to edit certain fields.
|
||||
put_project = remove_private_keys(project)
|
||||
put_project['url'] = u'very-offensive-url'
|
||||
put_project['description'] = u'Blender je besplatan set alata za izradu interaktivnog 3D ' \
|
||||
u'sadržaja pod različitim operativnim sustavima.'
|
||||
put_project['name'] = u'โครงการปั่นเมฆ'
|
||||
put_project['summary'] = u'Это переведена на Google'
|
||||
put_project['status'] = 'pending'
|
||||
put_project['category'] = 'software'
|
||||
put_project['user'] = other_user_id
|
||||
|
||||
# Try making the project public. This should update is_private as well.
|
||||
put_project['permissions']['world'] = ['GET']
|
||||
|
||||
resp = self.client.put(project_url,
|
||||
data=dumps(put_project),
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': project['_etag']})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Re-fetch from database to see which fields actually made it there.
|
||||
# equal to put_project -> changed in DB
|
||||
# equal to project -> not changed in DB
|
||||
resp = self.client.get(project_url,
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
db_proj = json.loads(resp.data)
|
||||
self.assertEqual(project['url'], db_proj['url'])
|
||||
self.assertEqual(put_project['description'], db_proj['description'])
|
||||
self.assertEqual(put_project['name'], db_proj['name'])
|
||||
self.assertEqual(put_project['summary'], db_proj['summary'])
|
||||
self.assertEqual(project['status'], db_proj['status'])
|
||||
self.assertEqual(project['category'], db_proj['category'])
|
||||
|
||||
# Project should be consistent.
|
||||
self.assertEqual(False, db_proj['is_private'])
|
||||
self.assertEqual(['GET'], db_proj['permissions']['world'])
|
||||
|
||||
def test_editing_as_admin(self):
|
||||
"""Test that we can set all fields as admin."""
|
||||
|
||||
from pillar.api.utils import remove_private_keys, PillarJSONEncoder
|
||||
dumps = functools.partial(json.dumps, cls=PillarJSONEncoder)
|
||||
|
||||
project_info = self._create_user_and_project([u'subscriber', u'admin'])
|
||||
project_url = '/api/projects/%(_id)s' % project_info
|
||||
|
||||
resp = self.client.get(project_url)
|
||||
project = json.loads(resp.data.decode('utf-8'))
|
||||
|
||||
# Create another user we can try and assign the project to.
|
||||
other_user_id = 'f00dd00df00dd00df00dd00d'
|
||||
self._create_user_with_token(['subscriber'], 'other-token', user_id=other_user_id)
|
||||
|
||||
# Admin user should be able to PUT everything.
|
||||
put_project = remove_private_keys(project)
|
||||
put_project['url'] = u'very-offensive-url'
|
||||
put_project['description'] = u'Blender je besplatan set alata za izradu interaktivnog 3D ' \
|
||||
u'sadržaja pod različitim operativnim sustavima.'
|
||||
put_project['name'] = u'โครงการปั่นเมฆ'
|
||||
put_project['summary'] = u'Это переведена на Google'
|
||||
put_project['is_private'] = False
|
||||
put_project['status'] = 'pending'
|
||||
put_project['category'] = 'software'
|
||||
put_project['user'] = other_user_id
|
||||
|
||||
resp = self.client.put(project_url,
|
||||
data=dumps(put_project),
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': project['_etag']})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Re-fetch from database to see which fields actually made it there.
|
||||
# equal to put_project -> changed in DB
|
||||
# equal to project -> not changed in DB
|
||||
resp = self.client.get('/api/projects/%s' % project['_id'])
|
||||
db_proj = json.loads(resp.data)
|
||||
self.assertEqual(put_project['url'], db_proj['url'])
|
||||
self.assertEqual(put_project['description'], db_proj['description'])
|
||||
self.assertEqual(put_project['name'], db_proj['name'])
|
||||
self.assertEqual(put_project['summary'], db_proj['summary'])
|
||||
self.assertEqual(put_project['is_private'], db_proj['is_private'])
|
||||
self.assertEqual(put_project['status'], db_proj['status'])
|
||||
self.assertEqual(put_project['category'], db_proj['category'])
|
||||
self.assertEqual(put_project['user'], db_proj['user'])
|
||||
|
||||
def test_edits_by_nonowner_admin(self):
|
||||
"""Any admin should be able to edit any project."""
|
||||
|
||||
from pillar.api.utils import remove_private_keys, PillarJSONEncoder
|
||||
dumps = functools.partial(json.dumps, cls=PillarJSONEncoder)
|
||||
|
||||
# Create test project.
|
||||
project = self._create_user_and_project([u'subscriber'])
|
||||
project_id = project['_id']
|
||||
project_url = '/api/projects/%s' % project_id
|
||||
|
||||
# Create test user.
|
||||
self._create_user_with_token(['admin'], 'admin-token', user_id='cafef00dbeef')
|
||||
|
||||
# Admin user should be able to PUT.
|
||||
put_project = remove_private_keys(project)
|
||||
put_project['name'] = u'โครงการปั่นเมฆ'
|
||||
|
||||
resp = self.client.put(project_url,
|
||||
data=dumps(put_project),
|
||||
headers={'Authorization': self.make_header('admin-token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': project['_etag']})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
def test_edits_by_nonowner_subscriber(self):
|
||||
"""A subscriber should only be able to edit their own projects."""
|
||||
|
||||
from pillar.api.utils import remove_private_keys, PillarJSONEncoder
|
||||
dumps = functools.partial(json.dumps, cls=PillarJSONEncoder)
|
||||
|
||||
# Create test project.
|
||||
project = self._create_user_and_project([u'subscriber'])
|
||||
project_id = project['_id']
|
||||
project_url = '/api/projects/%s' % project_id
|
||||
|
||||
# Create test user.
|
||||
my_user_id = 'cafef00dbeefcafef00dbeef'
|
||||
self._create_user_with_token(['subscriber'], 'mortal-token', user_id=my_user_id)
|
||||
|
||||
# Regular subscriber should not be able to do this.
|
||||
put_project = remove_private_keys(project)
|
||||
put_project['name'] = u'Болту́н -- нахо́дка для шпио́на.'
|
||||
put_project['user'] = my_user_id
|
||||
resp = self.client.put(project_url,
|
||||
data=dumps(put_project),
|
||||
headers={'Authorization': self.make_header('mortal-token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': project['_etag']})
|
||||
self.assertEqual(403, resp.status_code, resp.data)
|
||||
|
||||
def test_delete_by_admin(self):
|
||||
# Create public test project.
|
||||
project_info = self._create_user_and_project([u'admin'])
|
||||
project_id = project_info['_id']
|
||||
project_url = '/api/projects/%s' % project_id
|
||||
|
||||
# Create admin user that doesn't own the project, to check that
|
||||
# non-owner admins can delete projects too.
|
||||
self._create_user_with_token(['admin'], 'admin-token', user_id='cafef00dbeef')
|
||||
|
||||
# Admin user should be able to DELETE.
|
||||
resp = self.client.delete(project_url,
|
||||
headers={'Authorization': self.make_header('admin-token'),
|
||||
'If-Match': project_info['_etag']})
|
||||
self.assertEqual(204, resp.status_code, resp.data)
|
||||
|
||||
# Check that the project is gone.
|
||||
resp = self.client.get(project_url)
|
||||
self.assertEqual(404, resp.status_code, resp.data)
|
||||
|
||||
# ... but we should still get it in the body.
|
||||
db_proj = json.loads(resp.data)
|
||||
self.assertEqual(u'Prøject El Niño', db_proj['name'])
|
||||
self.assertTrue(db_proj['_deleted'])
|
||||
|
||||
# Querying for deleted projects should include it.
|
||||
# TODO: limit this to admin users only.
|
||||
# Also see http://python-eve.org/features.html#soft-delete
|
||||
projection = json.dumps({'name': 1, 'permissions': 1})
|
||||
where = json.dumps({'_deleted': True}) # MUST be True, 1 does not work.
|
||||
resp = self.client.get('/api/projects?where=%s&projection=%s' %
|
||||
(urllib.quote(where), urllib.quote(projection)))
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
projlist = json.loads(resp.data)
|
||||
self.assertEqual(1, projlist['_meta']['total'])
|
||||
self.assertEqual(u'Prøject El Niño', projlist['_items'][0]['name'])
|
||||
|
||||
def test_delete_by_subscriber(self):
|
||||
# Create test project.
|
||||
project_info = self._create_user_and_project([u'subscriber'])
|
||||
project_id = project_info['_id']
|
||||
project_url = '/api/projects/%s' % project_id
|
||||
|
||||
# Create test user.
|
||||
self._create_user_with_token(['subscriber'], 'mortal-token', user_id='cafef00dbeef')
|
||||
|
||||
# Other user should NOT be able to DELETE.
|
||||
resp = self.client.delete(project_url,
|
||||
headers={'Authorization': self.make_header('mortal-token'),
|
||||
'If-Match': project_info['_etag']})
|
||||
self.assertEqual(403, resp.status_code, resp.data)
|
||||
|
||||
# Owner should be able to DELETE
|
||||
resp = self.client.delete(project_url,
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'If-Match': project_info['_etag']})
|
||||
self.assertEqual(204, resp.status_code, resp.data)
|
||||
|
||||
|
||||
class ProjectNodeAccess(AbstractProjectTest):
|
||||
def setUp(self, **kwargs):
|
||||
super(ProjectNodeAccess, self).setUp(**kwargs)
|
||||
|
||||
from pillar.api.utils import PillarJSONEncoder
|
||||
|
||||
# Project is created by regular subscriber, so should be private.
|
||||
self.user_id = self._create_user_with_token([u'subscriber'], 'token')
|
||||
resp = self._create_project(u'Prøject El Niño', 'token')
|
||||
self.assertEqual(201, resp.status_code)
|
||||
self.assertEqual('application/json', resp.mimetype)
|
||||
self.project = json.loads(resp.data)
|
||||
self.project_id = ObjectId(self.project['_id'])
|
||||
|
||||
self.other_user_id = self._create_user_with_token([u'subscriber'], 'other-token',
|
||||
user_id='deadbeefdeadbeefcafef00d')
|
||||
|
||||
self.test_node = {
|
||||
'description': '',
|
||||
'node_type': 'asset',
|
||||
'user': self.user_id,
|
||||
'properties': {
|
||||
'status': 'published',
|
||||
'content_type': 'image',
|
||||
},
|
||||
'name': 'Micak is a cool cat',
|
||||
'project': self.project_id,
|
||||
}
|
||||
|
||||
# Add a node to the project
|
||||
resp = self.client.post('/api/nodes',
|
||||
headers={'Authorization': self.make_header('token'),
|
||||
'Content-Type': 'application/json'},
|
||||
data=json.dumps(self.test_node, cls=PillarJSONEncoder),
|
||||
)
|
||||
self.assertEqual(201, resp.status_code, (resp.status_code, resp.data))
|
||||
self.node_info = json.loads(resp.data)
|
||||
self.node_id = self.node_info['_id']
|
||||
self.node_url = '/api/nodes/%s' % self.node_id
|
||||
|
||||
def test_node_access(self):
|
||||
"""Getting nodes should adhere to project access rules."""
|
||||
|
||||
# Getting the node as the project owner should work.
|
||||
resp = self.client.get(self.node_url,
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, (resp.status_code, resp.data))
|
||||
|
||||
# Getting the node as an outsider should not work.
|
||||
resp = self.client.get(self.node_url,
|
||||
headers={'Authorization': self.make_header('other-token')})
|
||||
self.assertEqual(403, resp.status_code, (resp.status_code, resp.data))
|
||||
|
||||
def test_node_resource_access(self):
|
||||
# The owner of the project should get the node.
|
||||
resp = self.client.get('/api/nodes',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, (resp.status_code, resp.data))
|
||||
listed_nodes = json.loads(resp.data)['_items']
|
||||
self.assertEquals(self.node_id, listed_nodes[0]['_id'])
|
||||
|
||||
# Listing all nodes should not include nodes from private projects.
|
||||
resp = self.client.get('/api/nodes',
|
||||
headers={'Authorization': self.make_header('other-token')})
|
||||
self.assertEqual(403, resp.status_code, (resp.status_code, resp.data))
|
||||
|
||||
def test_is_private_updated_by_world_permissions(self):
|
||||
"""For backward compatibility, is_private should reflect absence of world-GET"""
|
||||
|
||||
from pillar.api.utils import remove_private_keys, dumps
|
||||
|
||||
project_url = '/api/projects/%s' % self.project_id
|
||||
put_project = remove_private_keys(self.project)
|
||||
|
||||
# Create admin user.
|
||||
self._create_user_with_token(['admin'], 'admin-token', user_id='cafef00dbeef')
|
||||
|
||||
# Make the project public
|
||||
put_project['permissions']['world'] = ['GET'] # make public
|
||||
put_project['is_private'] = True # This should be overridden.
|
||||
|
||||
resp = self.client.put(project_url,
|
||||
data=dumps(put_project),
|
||||
headers={'Authorization': self.make_header('admin-token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': self.project['_etag']})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
with self.app.test_request_context():
|
||||
projects = self.app.data.driver.db['projects']
|
||||
db_proj = projects.find_one(self.project_id)
|
||||
self.assertEqual(['GET'], db_proj['permissions']['world'])
|
||||
self.assertFalse(db_proj['is_private'])
|
||||
|
||||
# Make the project private
|
||||
put_project['permissions']['world'] = []
|
||||
|
||||
resp = self.client.put(project_url,
|
||||
data=dumps(put_project),
|
||||
headers={'Authorization': self.make_header('admin-token'),
|
||||
'Content-Type': 'application/json',
|
||||
'If-Match': db_proj['_etag']})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
with self.app.test_request_context():
|
||||
projects = self.app.data.driver.db['projects']
|
||||
db_proj = projects.find_one(self.project_id)
|
||||
self.assertEqual([], db_proj['permissions']['world'])
|
||||
self.assertTrue(db_proj['is_private'])
|
||||
|
||||
def test_add_remove_user(self):
|
||||
from pillar.api.projects import utils as proj_utils
|
||||
from pillar.api.utils import dumps
|
||||
|
||||
project_mng_user_url = '/api/p/users'
|
||||
|
||||
# Use our API to add user to group
|
||||
payload = {
|
||||
'project_id': self.project_id,
|
||||
'user_id': self.other_user_id,
|
||||
'action': 'add'}
|
||||
|
||||
resp = self.client.post(project_mng_user_url,
|
||||
data=dumps(payload),
|
||||
content_type='application/json',
|
||||
headers={
|
||||
'Authorization': self.make_header('token'),
|
||||
'If-Match': self.project['_etag']})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Check if the user is now actually member of the group.
|
||||
with self.app.test_request_context():
|
||||
users = self.app.data.driver.db['users']
|
||||
|
||||
db_user = users.find_one(self.other_user_id)
|
||||
admin_group = proj_utils.get_admin_group(self.project)
|
||||
|
||||
self.assertIn(admin_group['_id'], db_user['groups'])
|
||||
|
||||
# Update payload to remove the user we just added
|
||||
payload['action'] = 'remove'
|
||||
|
||||
resp = self.client.post(project_mng_user_url,
|
||||
data=dumps(payload),
|
||||
content_type='application/json',
|
||||
headers={
|
||||
'Authorization': self.make_header('token'),
|
||||
'If-Match': self.project['_etag']})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Check if the user is now actually removed from the group.
|
||||
with self.app.test_request_context():
|
||||
users = self.app.data.driver.db['users']
|
||||
|
||||
db_user = users.find_one(self.other_user_id)
|
||||
self.assertNotIn(admin_group['_id'], db_user['groups'])
|
||||
|
||||
def test_remove_self(self):
|
||||
"""Every user should be able to remove themselves from a project,
|
||||
regardless of permissions.
|
||||
"""
|
||||
|
||||
from pillar.api.projects import utils as proj_utils
|
||||
from pillar.api.utils import dumps
|
||||
|
||||
project_mng_user_url = '/api/p/users'
|
||||
|
||||
# Use our API to add user to group
|
||||
payload = {
|
||||
'project_id': self.project_id,
|
||||
'user_id': self.other_user_id,
|
||||
'action': 'add'}
|
||||
|
||||
resp = self.client.post(project_mng_user_url,
|
||||
data=dumps(payload),
|
||||
content_type='application/json',
|
||||
headers={'Authorization': self.make_header('token')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Update payload to remove the user we just added, and call it as that user.
|
||||
payload['action'] = 'remove'
|
||||
|
||||
resp = self.client.post(project_mng_user_url,
|
||||
data=dumps(payload),
|
||||
content_type='application/json',
|
||||
headers={'Authorization': self.make_header('other-token')})
|
||||
self.assertEqual(200, resp.status_code, resp.data)
|
||||
|
||||
# Check if the user is now actually removed from the group.
|
||||
with self.app.test_request_context():
|
||||
users = self.app.data.driver.db['users']
|
||||
|
||||
db_user = users.find_one(self.other_user_id)
|
||||
admin_group = proj_utils.get_admin_group(self.project)
|
||||
self.assertNotIn(admin_group['_id'], db_user['groups'])
|
105
tests/test_api/test_service_badger.py
Normal file
105
tests/test_api/test_service_badger.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""Test badger service."""
|
||||
|
||||
from pillar.tests import AbstractPillarTest, TEST_EMAIL_ADDRESS
|
||||
|
||||
|
||||
class BadgerServiceTest(AbstractPillarTest):
|
||||
def setUp(self, **kwargs):
|
||||
AbstractPillarTest.setUp(self, **kwargs)
|
||||
|
||||
from pillar.api import service
|
||||
|
||||
with self.app.test_request_context():
|
||||
self.badger, token_doc = service.create_service_account(
|
||||
'serviceaccount@example.com', [u'badger'],
|
||||
{u'badger': [u'succubus', u'subscriber', u'demo']}
|
||||
)
|
||||
self.badger_token = token_doc['token']
|
||||
|
||||
self.user_id = self.create_user()
|
||||
self.user_email = TEST_EMAIL_ADDRESS
|
||||
|
||||
def _post(self, data):
|
||||
from pillar.api.utils import dumps
|
||||
return self.client.post('/api/service/badger',
|
||||
data=dumps(data),
|
||||
headers={'Authorization': self.make_header(self.badger_token),
|
||||
'Content-Type': 'application/json'})
|
||||
|
||||
def test_grant_revoke_badge(self):
|
||||
# Grant the badge
|
||||
resp = self._post({'action': 'grant', 'user_email': self.user_email, 'role': 'succubus'})
|
||||
self.assertEqual(204, resp.status_code)
|
||||
|
||||
with self.app.test_request_context():
|
||||
user = self.app.data.driver.db['users'].find_one(self.user_id)
|
||||
self.assertIn(u'succubus', user['roles'])
|
||||
|
||||
# Aaaahhhw it's gone again
|
||||
resp = self._post({'action': 'revoke', 'user_email': self.user_email, 'role': 'succubus'})
|
||||
self.assertEqual(204, resp.status_code)
|
||||
|
||||
with self.app.test_request_context():
|
||||
user = self.app.data.driver.db['users'].find_one(self.user_id)
|
||||
self.assertNotIn(u'succubus', user['roles'])
|
||||
|
||||
def test_grant_not_allowed_badge(self):
|
||||
resp = self._post({'action': 'grant', 'user_email': self.user_email, 'role': 'admin'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
with self.app.test_request_context():
|
||||
user = self.app.data.driver.db['users'].find_one(self.user_id)
|
||||
self.assertNotIn(u'admin', user['roles'])
|
||||
|
||||
def test_group_membership(self):
|
||||
"""Certain roles are linked to certain groups."""
|
||||
|
||||
group_ids = self.create_standard_groups(additional_groups=['succubus'])
|
||||
|
||||
with self.app.test_request_context():
|
||||
def test_for_group(group_name, test=self.assertIn):
|
||||
# Assign the 'subscriber' role
|
||||
resp = self._post({'action': 'grant',
|
||||
'user_email': self.user_email,
|
||||
'role': group_name})
|
||||
self.assertEqual(204, resp.status_code)
|
||||
|
||||
# Check that the user is actually member of that group.
|
||||
user = self.app.data.driver.db['users'].find_one(self.user_id)
|
||||
test(group_ids[group_name], user['groups'])
|
||||
|
||||
# There are special groups for those. Also for admin, but if
|
||||
# it works for those, it also works for admin, and another test
|
||||
# case requires admin to be ingrantable.
|
||||
test_for_group('demo')
|
||||
test_for_group('subscriber')
|
||||
|
||||
# This role isn't linked to group membership.
|
||||
test_for_group('succubus', test=self.assertNotIn)
|
||||
|
||||
def test_project_groups(self):
|
||||
"""Projects groups should be maintained."""
|
||||
|
||||
group_ids = self.create_standard_groups()
|
||||
|
||||
with self.app.test_request_context():
|
||||
user_coll = self.app.data.driver.db['users']
|
||||
|
||||
def test_group_membership(expected_groups):
|
||||
user = user_coll.find_one(self.user_id)
|
||||
self.assertEqual(expected_groups, set(user['groups']))
|
||||
|
||||
# Fresh user, no roles.
|
||||
test_group_membership(set())
|
||||
|
||||
# Add some groups
|
||||
user_coll.update_one({'_id': self.user_id},
|
||||
{'$set': {'groups': ['project1', 'project2']}})
|
||||
test_group_membership({'project1', 'project2'})
|
||||
|
||||
# Grant subscriber role.
|
||||
resp = self._post({'action': 'grant',
|
||||
'user_email': self.user_email,
|
||||
'role': 'subscriber'})
|
||||
self.assertEqual(204, resp.status_code)
|
||||
test_group_membership({'project1', 'project2', group_ids['subscriber']})
|
30
tests/test_api/test_utils.py
Normal file
30
tests/test_api/test_utils.py
Normal file
@@ -0,0 +1,30 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
from bson import ObjectId
|
||||
from pillar.tests import AbstractPillarTest
|
||||
from werkzeug.exceptions import BadRequest
|
||||
|
||||
|
||||
class Str2idTest(AbstractPillarTest):
|
||||
def test_happy(self):
|
||||
from pillar.api.utils import str2id
|
||||
|
||||
def happy(str_id):
|
||||
self.assertEqual(ObjectId(str_id), str2id(str_id))
|
||||
|
||||
happy(24 * 'a')
|
||||
happy(12 * 'a')
|
||||
happy(u'577e23ad98377323f74c368c')
|
||||
|
||||
def test_unhappy(self):
|
||||
from pillar.api.utils import str2id
|
||||
|
||||
def unhappy(str_id):
|
||||
self.assertRaises(BadRequest, str2id, str_id)
|
||||
|
||||
unhappy(13 * 'a')
|
||||
unhappy(u'577e23ad 8377323f74c368c')
|
||||
unhappy(u'김치') # Kimchi
|
||||
unhappy('')
|
||||
unhappy(u'')
|
||||
unhappy(None)
|
Reference in New Issue
Block a user