Implemented merging of permissions.
Permissions are now merged between project, node type and node, instead of having the lower-level permissions override the higher-level permissions.
This commit is contained in:
parent
4dc5b4dbbf
commit
899497b3b1
@ -178,36 +178,6 @@ def validate_token_at_every_request():
|
||||
validate_token()
|
||||
|
||||
|
||||
def before_returning_item_permissions(response):
|
||||
# Run validation process, since GET on nodes entry point is public
|
||||
check_permissions(response, 'GET', append_allowed_methods=True)
|
||||
|
||||
|
||||
def before_returning_resource_permissions(response):
|
||||
for item in response['_items']:
|
||||
check_permissions(item, 'GET', append_allowed_methods=True)
|
||||
|
||||
|
||||
def project_node_type_has_method(response):
|
||||
"""Check for a specific request arg, and check generate the allowed_methods
|
||||
list for the required node_type.
|
||||
"""
|
||||
try:
|
||||
node_type_name = request.args['node_type']
|
||||
except KeyError:
|
||||
return
|
||||
# Proceed only node_type has been requested
|
||||
if node_type_name:
|
||||
# Look up the node type in the project document
|
||||
node_type = next(
|
||||
(item for item in response['node_types'] if item.get('name') \
|
||||
and item['name'] == node_type_name), None)
|
||||
if not node_type:
|
||||
return abort(404)
|
||||
# Check permissions and append the allowed_methods to the node_type
|
||||
check_permissions(node_type, 'GET', append_allowed_methods=True)
|
||||
|
||||
|
||||
def before_returning_item_notifications(response):
|
||||
if request.args.get('parse'):
|
||||
notification_parse(response)
|
||||
@ -221,9 +191,6 @@ def before_returning_resource_notifications(response):
|
||||
|
||||
app.on_fetched_item_notifications += before_returning_item_notifications
|
||||
app.on_fetched_resource_notifications += before_returning_resource_notifications
|
||||
app.on_fetched_item_projects += before_returning_item_permissions
|
||||
app.on_fetched_item_projects += project_node_type_has_method
|
||||
app.on_fetched_resource_projects += before_returning_resource_permissions
|
||||
|
||||
|
||||
# The encoding module (receive notification and report progress)
|
||||
|
@ -66,7 +66,7 @@ def resource_parse_attachments(response):
|
||||
item_parse_attachments(item)
|
||||
|
||||
def before_replacing_node(item, original):
|
||||
check_permissions(original, 'PUT')
|
||||
check_permissions('nodes', original, 'PUT')
|
||||
update_file_name(item)
|
||||
|
||||
|
||||
@ -108,7 +108,7 @@ def before_inserting_nodes(items):
|
||||
return None
|
||||
|
||||
for item in items:
|
||||
check_permissions(item, 'POST')
|
||||
check_permissions('nodes', item, 'POST')
|
||||
if 'parent' in item and 'project' not in item:
|
||||
parent = nodes_collection.find_one({'_id': item['parent']})
|
||||
project = find_parent_project(parent)
|
||||
@ -188,12 +188,20 @@ def deduct_content_type(node_doc, original):
|
||||
node_doc['properties']['content_type'] = content_type
|
||||
|
||||
|
||||
def setup_app(app):
|
||||
from application import before_returning_item_permissions, before_returning_resource_permissions
|
||||
def before_returning_node_permissions(response):
|
||||
# Run validation process, since GET on nodes entry point is public
|
||||
check_permissions('nodes', response, 'GET', append_allowed_methods=True)
|
||||
|
||||
|
||||
def before_returning_node_resource_permissions(response):
|
||||
for item in response['_items']:
|
||||
check_permissions('nodes', item, 'GET', append_allowed_methods=True)
|
||||
|
||||
|
||||
def setup_app(app):
|
||||
# Permission hooks
|
||||
app.on_fetched_item_nodes += before_returning_item_permissions
|
||||
app.on_fetched_resource_nodes += before_returning_resource_permissions
|
||||
app.on_fetched_item_nodes += before_returning_node_permissions
|
||||
app.on_fetched_resource_nodes += before_returning_node_resource_permissions
|
||||
|
||||
app.on_fetched_item_nodes += item_parse_attachments
|
||||
app.on_fetched_resource_nodes += resource_parse_attachments
|
||||
|
@ -55,7 +55,7 @@ def before_edit_check_permissions(document, original):
|
||||
if user_has_role(u'admin'):
|
||||
return
|
||||
|
||||
check_permissions(original, request.method)
|
||||
check_permissions('projects', original, request.method)
|
||||
|
||||
|
||||
def before_delete_project(document):
|
||||
@ -66,7 +66,7 @@ def before_delete_project(document):
|
||||
if user_has_role(u'admin'):
|
||||
return
|
||||
|
||||
check_permissions(document, request.method)
|
||||
check_permissions('projects', document, request.method)
|
||||
|
||||
|
||||
def protect_sensitive_fields(document, original):
|
||||
@ -323,9 +323,9 @@ def project_quotas(project_id):
|
||||
|
||||
# Check that the user has GET permissions on the project itself.
|
||||
project = mongo_utils.find_one_or_404('projects', project_id)
|
||||
check_permissions(project, 'GET')
|
||||
check_permissions('projects', project, 'GET')
|
||||
|
||||
file_size_used = _project_total_file_size(project_id)
|
||||
file_size_used = project_total_file_size(project_id)
|
||||
|
||||
info = {
|
||||
'file_size_quota': None, # TODO: implement this later.
|
||||
@ -335,7 +335,7 @@ def project_quotas(project_id):
|
||||
return jsonify(info)
|
||||
|
||||
|
||||
def _project_total_file_size(project_id):
|
||||
def project_total_file_size(project_id):
|
||||
"""Returns the total number of bytes used by files of this project."""
|
||||
|
||||
files = current_app.data.driver.db['files']
|
||||
@ -347,7 +347,42 @@ def _project_total_file_size(project_id):
|
||||
])
|
||||
|
||||
# The aggregate function returns a cursor, not a document.
|
||||
try:
|
||||
return next(file_size_used)['all_files']
|
||||
except StopIteration:
|
||||
# No files used at all.
|
||||
return 0
|
||||
|
||||
|
||||
def before_returning_project_permissions(response):
|
||||
# Run validation process, since GET on nodes entry point is public
|
||||
check_permissions('projects', response, 'GET', append_allowed_methods=True)
|
||||
|
||||
|
||||
def before_returning_project_resource_permissions(response):
|
||||
for item in response['_items']:
|
||||
check_permissions('projects', item, 'GET', append_allowed_methods=True)
|
||||
|
||||
|
||||
def project_node_type_has_method(response):
|
||||
"""Check for a specific request arg, and check generate the allowed_methods
|
||||
list for the required node_type.
|
||||
"""
|
||||
|
||||
node_type_name = request.args.get('node_type', '')
|
||||
|
||||
# Proceed only node_type has been requested
|
||||
if not node_type_name:
|
||||
return
|
||||
|
||||
# Look up the node type in the project document
|
||||
if not any(node_type.get('name') == node_type_name
|
||||
for node_type in response['node_types']):
|
||||
return abort(404)
|
||||
|
||||
# Check permissions and append the allowed_methods to the node_type
|
||||
check_permissions('project', response, 'GET', append_allowed_methods=True,
|
||||
check_node_type=node_type_name)
|
||||
|
||||
|
||||
def setup_app(app, url_prefix):
|
||||
@ -361,4 +396,9 @@ def setup_app(app, url_prefix):
|
||||
app.on_insert_projects += before_inserting_override_is_private_field
|
||||
app.on_insert_projects += before_inserting_projects
|
||||
app.on_inserted_projects += after_inserting_projects
|
||||
|
||||
app.on_fetched_item_projects += before_returning_project_permissions
|
||||
app.on_fetched_resource_projects += before_returning_project_resource_permissions
|
||||
app.on_fetched_item_projects += project_node_type_has_method
|
||||
|
||||
app.register_blueprint(blueprint, url_prefix=url_prefix)
|
||||
|
@ -1,54 +1,45 @@
|
||||
import logging
|
||||
import functools
|
||||
|
||||
from bson import ObjectId
|
||||
from flask import g
|
||||
from flask import abort
|
||||
from flask import current_app
|
||||
from werkzeug.exceptions import Forbidden
|
||||
|
||||
CHECK_PERMISSIONS_IMPLEMENTED_FOR = {'projects', 'nodes'}
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def check_permissions(resource, method, append_allowed_methods=False):
|
||||
def check_permissions(collection_name, resource, method, append_allowed_methods=False,
|
||||
check_node_type=None):
|
||||
"""Check user permissions to access a node. We look up node permissions from
|
||||
world to groups to users and match them with the computed user permissions.
|
||||
If there is not match, we raise 403.
|
||||
|
||||
:param collection_name: name of the collection the resource comes from.
|
||||
:param resource: resource from MongoDB
|
||||
:type resource: dict
|
||||
:param method: name of the requested HTTP method
|
||||
:param append_allowed_methods: whether to return the list of allowed methods
|
||||
in the resource. Only valid when method='GET'.
|
||||
:param check_node_type: node type to check. Only valid when collection_name='projects'.
|
||||
:type check_node_type: str
|
||||
"""
|
||||
if method != 'GET' and append_allowed_methods:
|
||||
|
||||
# Check some input values.
|
||||
if collection_name not in CHECK_PERMISSIONS_IMPLEMENTED_FOR:
|
||||
raise ValueError('check_permission only implemented for %s, not for %s',
|
||||
CHECK_PERMISSIONS_IMPLEMENTED_FOR, collection_name)
|
||||
|
||||
if append_allowed_methods and method != 'GET':
|
||||
raise ValueError("append_allowed_methods only allowed with 'GET' method")
|
||||
|
||||
current_user = g.current_user
|
||||
if check_node_type is not None and collection_name != 'projects':
|
||||
raise ValueError('check_node_type parameter is only valid for checking projects.')
|
||||
|
||||
if 'permissions' in resource:
|
||||
# If permissions are embedded in the node (this overrides any other
|
||||
# matching permission originally set at node_type level)
|
||||
resource_permissions = resource['permissions']
|
||||
else:
|
||||
resource_permissions = {}
|
||||
if 'node_type' in resource:
|
||||
if type(resource['node_type']) is dict:
|
||||
# If the node_type is embedded in the document, extract permissions
|
||||
# from there
|
||||
computed_permissions = resource['node_type']['permissions']
|
||||
else:
|
||||
# If the node_type is referenced with an ObjectID (was not embedded
|
||||
# on request) query for if from the database and get the permissions
|
||||
|
||||
# node_types_collection = app.data.driver.db['node_types']
|
||||
# node_type = node_types_collection.find_one(resource['node_type'])
|
||||
|
||||
if type(resource['project']) is dict:
|
||||
project = resource['project']
|
||||
else:
|
||||
projects_collection = current_app.data.driver.db['projects']
|
||||
project = projects_collection.find_one(resource['project'])
|
||||
node_type = next(
|
||||
(item for item in project['node_types'] if item.get('name') \
|
||||
and item['name'] == resource['node_type']), None)
|
||||
computed_permissions = node_type['permissions']
|
||||
else:
|
||||
computed_permissions = {}
|
||||
|
||||
# Override computed_permissions if override is provided
|
||||
computed_permissions.update(resource_permissions)
|
||||
computed_permissions = compute_aggr_permissions(collection_name, resource, check_node_type)
|
||||
|
||||
if not computed_permissions:
|
||||
log.info('No permissions available to compute for %s on resource %r',
|
||||
@ -58,13 +49,14 @@ def check_permissions(resource, method, append_allowed_methods=False):
|
||||
# Accumulate allowed methods from the user, group and world level.
|
||||
allowed_methods = set()
|
||||
|
||||
current_user = g.current_user
|
||||
if current_user:
|
||||
# If the user is authenticated, proceed to compare the group permissions
|
||||
for permission in computed_permissions['groups']:
|
||||
for permission in computed_permissions.get('groups', []):
|
||||
if permission['group'] in current_user['groups']:
|
||||
allowed_methods.update(permission['methods'])
|
||||
|
||||
for permission in computed_permissions['users']:
|
||||
for permission in computed_permissions.get('users', []):
|
||||
if current_user['user_id'] == permission['user']:
|
||||
allowed_methods.update(permission['methods'])
|
||||
|
||||
@ -84,6 +76,112 @@ def check_permissions(resource, method, append_allowed_methods=False):
|
||||
abort(403)
|
||||
|
||||
|
||||
def compute_aggr_permissions(collection_name, resource, check_node_type):
|
||||
"""Returns a permissions dict."""
|
||||
|
||||
projects_collection = current_app.data.driver.db['projects']
|
||||
|
||||
# We always need the know the project.
|
||||
if collection_name == 'projects':
|
||||
project = resource
|
||||
if check_node_type is None:
|
||||
return project['permissions']
|
||||
node_type_name = check_node_type
|
||||
else:
|
||||
# Not a project, so it's a node.
|
||||
assert 'project' in resource
|
||||
assert 'node_type' in resource
|
||||
|
||||
node_type_name = resource['node_type']
|
||||
|
||||
if isinstance(resource['project'], dict):
|
||||
# embedded project
|
||||
project = resource['project']
|
||||
else:
|
||||
project = projects_collection.find_one(
|
||||
ObjectId(resource['project']),
|
||||
{'permissions': 1,
|
||||
'node_types': {'$elemMatch': {'name': node_type_name}},
|
||||
'node_types.name': 1,
|
||||
'node_types.permissions': 1})
|
||||
|
||||
# Every node should have a project.
|
||||
if project is None:
|
||||
log.warning('Resource %s from "%s" refers to a project that does not exist.',
|
||||
resource['_id'], collection_name)
|
||||
raise Forbidden()
|
||||
|
||||
project_permissions = project['permissions']
|
||||
|
||||
# Find the node type from the project.
|
||||
node_type = next((node_type for node_type in project['node_types']
|
||||
if node_type['name'] == node_type_name), None)
|
||||
if node_type is None: # This node type is not known, so doesn't give permissions.
|
||||
node_type_permissions = {}
|
||||
else:
|
||||
node_type_permissions = node_type.get('permissions', {})
|
||||
|
||||
# For projects or specific node types in projects, we're done now.
|
||||
if collection_name == 'projects':
|
||||
return merge_permissions(project_permissions, node_type_permissions)
|
||||
|
||||
node_permissions = resource.get('permissions', {})
|
||||
return merge_permissions(project_permissions, node_type_permissions, node_permissions)
|
||||
|
||||
|
||||
def merge_permissions(*args):
|
||||
"""Merges all given permissions.
|
||||
|
||||
:param args: list of {'user': ..., 'group': ..., 'world': ...} dicts.
|
||||
:returns: combined list of permissions.
|
||||
"""
|
||||
|
||||
if not args:
|
||||
return {}
|
||||
|
||||
if len(args) == 1:
|
||||
return args[0]
|
||||
|
||||
effective = {}
|
||||
|
||||
# When testing we want stable results, and not be dependent on PYTHONHASH values etc.
|
||||
if current_app.config['TESTING']:
|
||||
maybe_sorted = sorted
|
||||
else:
|
||||
def maybe_sorted(arg):
|
||||
return arg
|
||||
|
||||
def merge(field_name):
|
||||
plural_name = field_name + 's'
|
||||
|
||||
from0 = args[0].get(plural_name, [])
|
||||
from1 = args[1].get(plural_name, [])
|
||||
|
||||
asdict0 = {permission[field_name]: permission['methods'] for permission in from0}
|
||||
asdict1 = {permission[field_name]: permission['methods'] for permission in from1}
|
||||
|
||||
keys = set(asdict0.keys() + asdict1.keys())
|
||||
for user_id in maybe_sorted(keys):
|
||||
methods = maybe_sorted(set(asdict0.get(user_id, []) + asdict1.get(user_id, [])))
|
||||
effective.setdefault(plural_name, []).append({field_name: user_id, u'methods': methods})
|
||||
|
||||
merge(u'user')
|
||||
merge(u'group')
|
||||
|
||||
# Gather permissions for world
|
||||
world0 = args[0].get('world', [])
|
||||
world1 = args[1].get('world', [])
|
||||
world_methods = set(world0 + world1)
|
||||
if world_methods:
|
||||
effective[u'world'] = maybe_sorted(world_methods)
|
||||
|
||||
# Recurse for longer merges
|
||||
if len(args) > 2:
|
||||
return merge_permissions(effective, *args[2:])
|
||||
|
||||
return effective
|
||||
|
||||
|
||||
def require_login(require_roles=set()):
|
||||
"""Decorator that enforces users to authenticate.
|
||||
|
||||
@ -108,7 +206,9 @@ def require_login(require_roles=set()):
|
||||
abort(403)
|
||||
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
|
@ -28,7 +28,6 @@ EXAMPLE_FILE = {u'_id': ObjectId('5672e2c1c379cf0007b31995'),
|
||||
u'link': 'http://localhost:8002/file',
|
||||
u'link_expires': datetime.datetime(2016, 3, 22, 9, 28, 22, tzinfo=tz_util.utc)}
|
||||
|
||||
|
||||
EXAMPLE_PROJECT = {
|
||||
u'_created': datetime.datetime(2015, 12, 17, 13, 22, 56, tzinfo=tz_util.utc),
|
||||
u'_etag': u'cc4643e98d3606f87bbfaaa200bfbae941b642f3',
|
||||
@ -116,7 +115,7 @@ EXAMPLE_PROJECT = {
|
||||
u'permissions': {u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'GET', u'PUT', u'POST']},
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'),
|
||||
u'methods': [u'GET']},
|
||||
u'methods': [u'DELETE', u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'),
|
||||
u'methods': [u'GET']}],
|
||||
u'users': [],
|
||||
@ -270,7 +269,7 @@ EXAMPLE_PROJECT = {
|
||||
u'organization': ObjectId('55a99fb43004867fb9934f01'),
|
||||
u'owners': {u'groups': [], u'users': []},
|
||||
u'permissions': {u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'GET', u'PUT', u'POST']}],
|
||||
u'methods': [u'GET', u'POST', u'PUT']}],
|
||||
u'users': [],
|
||||
u'world': [u'GET']},
|
||||
u'picture_header': ObjectId('5673f260c379cf0007b31bc4'),
|
||||
@ -279,3 +278,21 @@ EXAMPLE_PROJECT = {
|
||||
u'summary': u'Texture collection from all Blender Institute open projects.',
|
||||
u'url': u'textures',
|
||||
u'user': ObjectId('552b066b41acdf5dec4436f2')}
|
||||
|
||||
EXAMPLE_NODE = {
|
||||
u'_id': ObjectId('572761099837730efe8e120d'),
|
||||
u'picture': ObjectId('572761f39837730efe8e1210'),
|
||||
u'description': u'',
|
||||
u'node_type': u'asset',
|
||||
u'user': ObjectId('57164ca1983773118cbaf779'),
|
||||
u'properties': {
|
||||
u'status': u'published',
|
||||
u'content_type': u'image',
|
||||
u'file': ObjectId('572761129837730efe8e120e')
|
||||
},
|
||||
u'_updated': datetime.datetime(2016, 5, 2, 14, 19, 58, 0, tzinfo=tz_util.utc),
|
||||
u'name': u'Image test',
|
||||
u'project': EXAMPLE_PROJECT_ID,
|
||||
u'_created': datetime.datetime(2016, 5, 2, 14, 19, 37, 0, tzinfo=tz_util.utc),
|
||||
u'_etag': u'6b8589b42c880e3626f43f3e82a5c5b946742687'
|
||||
}
|
||||
|
@ -1,11 +1,15 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import responses
|
||||
import json
|
||||
|
||||
from bson import tz_util, ObjectId
|
||||
from werkzeug.exceptions import Forbidden
|
||||
|
||||
from common_test_class import AbstractPillarTest, TEST_EMAIL_USER, TEST_EMAIL_ADDRESS
|
||||
from common_test_data import EXAMPLE_PROJECT, EXAMPLE_NODE
|
||||
|
||||
PUBLIC_USER_FIELDS = {'full_name', 'email'}
|
||||
|
||||
@ -352,3 +356,117 @@ class UserListTests(AbstractPillarTest):
|
||||
resp = self.client.delete('/users/323456789abc123456789abc',
|
||||
headers={'Authorization': self.make_header('admin-token')})
|
||||
self.assertEqual(405, resp.status_code, resp.data)
|
||||
|
||||
|
||||
class PermissionComputationTest(AbstractPillarTest):
|
||||
maxDiff = None
|
||||
|
||||
def test_merge_permissions(self):
|
||||
from application.utils.authorization import merge_permissions
|
||||
|
||||
with self.app.test_request_context():
|
||||
self.assertEqual({}, merge_permissions())
|
||||
self.assertEqual({}, merge_permissions({}))
|
||||
self.assertEqual({}, merge_permissions({}, {}, {}))
|
||||
|
||||
# Merge one level deep
|
||||
self.assertEqual(
|
||||
{},
|
||||
merge_permissions({'users': []}, {'groups': []}, {'world': []}))
|
||||
self.assertEqual(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}],
|
||||
'groups': [{'group': 'manatees', 'methods': ['DELETE', 'GET']}],
|
||||
'world': ['GET']},
|
||||
merge_permissions(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}]},
|
||||
{'groups': [{'group': 'manatees', 'methods': ['DELETE', 'GET']}]},
|
||||
{'world': ['GET']}))
|
||||
|
||||
# Merge two levels deep.
|
||||
self.assertEqual(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'POST', 'PUT']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET']},
|
||||
{'group': 'manatees', 'methods': ['GET', 'POST', 'PUT']}],
|
||||
'world': ['GET']},
|
||||
merge_permissions(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'PUT', 'POST']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET']}]},
|
||||
{'groups': [{'group': 'manatees', 'methods': ['GET', 'PUT', 'POST']}]},
|
||||
{'world': ['GET']}))
|
||||
|
||||
# Merge three levels deep
|
||||
self.assertEqual(
|
||||
{'users': [{'user': 'micak', 'methods': ['DELETE', 'GET', 'POST', 'PUT']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET', 'PUT', 'SCRATCH']},
|
||||
{'group': 'manatees', 'methods': ['GET', 'POST', 'PUT']}],
|
||||
'world': ['GET']},
|
||||
merge_permissions(
|
||||
{'users': [{'user': 'micak', 'methods': ['GET', 'PUT', 'POST']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET']},
|
||||
{'group': 'manatees', 'methods': ['GET', 'PUT', 'POST']}],
|
||||
'world': ['GET']},
|
||||
{'users': [{'user': 'micak', 'methods': ['DELETE']}],
|
||||
'groups': [{'group': 'lions', 'methods': ['GET', 'PUT', 'SCRATCH']}],
|
||||
}
|
||||
))
|
||||
|
||||
def sort(self, permissions):
|
||||
"""Returns a sorted copy of the permissions."""
|
||||
|
||||
from application.utils.authorization import merge_permissions
|
||||
return merge_permissions(permissions, {})
|
||||
|
||||
def test_effective_permissions(self):
|
||||
from application.utils.authorization import compute_aggr_permissions
|
||||
|
||||
with self.app.test_request_context():
|
||||
# Test project permissions.
|
||||
self.assertEqual(
|
||||
{
|
||||
u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'GET', u'POST', u'PUT']}],
|
||||
u'world': [u'GET']
|
||||
},
|
||||
self.sort(compute_aggr_permissions('projects', EXAMPLE_PROJECT, None)))
|
||||
|
||||
# Test node type permissions.
|
||||
self.assertEqual(
|
||||
{
|
||||
u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'GET', u'POST', u'PUT']},
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'),
|
||||
u'methods': [u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'),
|
||||
u'methods': [u'GET']}],
|
||||
u'world': [u'GET']
|
||||
},
|
||||
self.sort(compute_aggr_permissions('projects', EXAMPLE_PROJECT, 'texture')))
|
||||
|
||||
# Test node permissions with non-existing project.
|
||||
node = copy.deepcopy(EXAMPLE_NODE)
|
||||
self.assertRaises(Forbidden, compute_aggr_permissions, 'nodes', node, None)
|
||||
|
||||
# Test node permissions without embedded project.
|
||||
self.ensure_project_exists()
|
||||
self.assertEqual(
|
||||
{u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'GET', u'POST', u'PUT']},
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'),
|
||||
u'methods': [u'DELETE', u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'),
|
||||
u'methods': [u'GET']}],
|
||||
u'world': [u'GET']},
|
||||
self.sort(compute_aggr_permissions('nodes', node, None)))
|
||||
|
||||
# Test node permissions with embedded project.
|
||||
node = copy.deepcopy(EXAMPLE_NODE)
|
||||
node['project'] = EXAMPLE_PROJECT
|
||||
self.assertEqual(
|
||||
{u'groups': [{u'group': ObjectId('5596e975ea893b269af85c0e'),
|
||||
u'methods': [u'GET', u'POST', u'PUT']},
|
||||
{u'group': ObjectId('5596e975ea893b269af85c0f'),
|
||||
u'methods': [u'DELETE', u'GET']},
|
||||
{u'group': ObjectId('564733b56dcaf85da2faee8a'),
|
||||
u'methods': [u'GET']}],
|
||||
u'world': [u'GET']},
|
||||
self.sort(compute_aggr_permissions('nodes', node, None)))
|
||||
|
Loading…
x
Reference in New Issue
Block a user