Store IP-based org-given roles in the user document.
This is a two-stage approach that happens when a new token is verified with Blender ID and stored in our local MongoDB: - Given the remote IP address of the HTTP request, compute and store the org roles in the token document. - Recompute the user's roles based on their own roles, regular org roles, and the roles stored in non-expired token documents. This happens once per hour, since that's how long we store tokens in our database.
This commit is contained in:
@@ -10,9 +10,11 @@ import logging
|
|||||||
import requests
|
import requests
|
||||||
from bson import tz_util
|
from bson import tz_util
|
||||||
from rauth import OAuth2Session
|
from rauth import OAuth2Session
|
||||||
from flask import Blueprint, request, current_app, jsonify, session
|
from flask import Blueprint, request, jsonify, session
|
||||||
from requests.adapters import HTTPAdapter
|
from requests.adapters import HTTPAdapter
|
||||||
|
|
||||||
|
from pillar import current_app
|
||||||
|
from pillar.api import service
|
||||||
from pillar.api.utils import authentication
|
from pillar.api.utils import authentication
|
||||||
from pillar.api.utils.authentication import find_user_in_db, upsert_user
|
from pillar.api.utils.authentication import find_user_in_db, upsert_user
|
||||||
|
|
||||||
@@ -79,7 +81,13 @@ def validate_create_user(blender_id_user_id, token, oauth_subclient_id):
|
|||||||
db_id, status = upsert_user(db_user)
|
db_id, status = upsert_user(db_user)
|
||||||
|
|
||||||
# Store the token in MongoDB.
|
# Store the token in MongoDB.
|
||||||
authentication.store_token(db_id, token, token_expiry, oauth_subclient_id)
|
ip_based_roles = current_app.org_manager.roles_for_request()
|
||||||
|
authentication.store_token(db_id, token, token_expiry, oauth_subclient_id,
|
||||||
|
org_roles=ip_based_roles)
|
||||||
|
|
||||||
|
if current_app.org_manager is not None:
|
||||||
|
roles = current_app.org_manager.refresh_roles(db_id)
|
||||||
|
db_user['roles'] = list(roles)
|
||||||
|
|
||||||
return db_user, status
|
return db_user, status
|
||||||
|
|
||||||
|
@@ -354,7 +354,16 @@ tokens_schema = {
|
|||||||
'is_subclient_token': {
|
'is_subclient_token': {
|
||||||
'type': 'boolean',
|
'type': 'boolean',
|
||||||
'required': False,
|
'required': False,
|
||||||
}
|
},
|
||||||
|
|
||||||
|
# Roles this user gets while this token is valid.
|
||||||
|
'org_roles': {
|
||||||
|
'type': 'list',
|
||||||
|
'default': [],
|
||||||
|
'schema': {
|
||||||
|
'type': 'string',
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
files_schema = {
|
files_schema = {
|
||||||
|
@@ -4,11 +4,13 @@ Assumes role names that are given to users by organization membership
|
|||||||
start with the string "org-".
|
start with the string "org-".
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
import bson
|
import bson
|
||||||
|
import flask
|
||||||
import werkzeug.exceptions as wz_exceptions
|
import werkzeug.exceptions as wz_exceptions
|
||||||
|
|
||||||
from pillar import attrs_extra, current_app
|
from pillar import attrs_extra, current_app
|
||||||
@@ -244,8 +246,11 @@ class OrgManager:
|
|||||||
for uid in members:
|
for uid in members:
|
||||||
self.refresh_roles(uid)
|
self.refresh_roles(uid)
|
||||||
|
|
||||||
def refresh_roles(self, user_id: bson.ObjectId):
|
def refresh_roles(self, user_id: bson.ObjectId) -> typing.Set[str]:
|
||||||
"""Refreshes the user's roles to own roles + organizations' roles."""
|
"""Refreshes the user's roles to own roles + organizations' roles.
|
||||||
|
|
||||||
|
:returns: the applied set of roles.
|
||||||
|
"""
|
||||||
|
|
||||||
assert isinstance(user_id, bson.ObjectId)
|
assert isinstance(user_id, bson.ObjectId)
|
||||||
|
|
||||||
@@ -254,30 +259,41 @@ class OrgManager:
|
|||||||
self._log.info('Refreshing roles for user %s', user_id)
|
self._log.info('Refreshing roles for user %s', user_id)
|
||||||
|
|
||||||
org_coll = current_app.db('organizations')
|
org_coll = current_app.db('organizations')
|
||||||
|
tokens_coll = current_app.db('tokens')
|
||||||
|
|
||||||
# Aggregate all org-given roles for this user.
|
def aggr_roles(coll, match: dict) -> typing.Set[str]:
|
||||||
query = org_coll.aggregate([
|
query = coll.aggregate([
|
||||||
{'$match': {'members': user_id}},
|
{'$match': match},
|
||||||
{'$project': {'org_roles': 1}},
|
{'$project': {'org_roles': 1}},
|
||||||
{'$unwind': {'path': '$org_roles'}},
|
{'$unwind': {'path': '$org_roles'}},
|
||||||
{'$group': {
|
{'$group': {
|
||||||
'_id': None,
|
'_id': None,
|
||||||
'org_roles': {'$addToSet': '$org_roles'},
|
'org_roles': {'$addToSet': '$org_roles'},
|
||||||
}}])
|
}}])
|
||||||
|
|
||||||
# If the user has no organizations at all, the query will have no results.
|
# If the user has no organizations/tokens at all, the query will have no results.
|
||||||
try:
|
try:
|
||||||
org_roles_doc = query.next()
|
org_roles_doc = query.next()
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
org_roles = set()
|
return set()
|
||||||
else:
|
return set(org_roles_doc['org_roles'])
|
||||||
org_roles = set(org_roles_doc['org_roles'])
|
|
||||||
|
# Join all organization-given roles and roles from the tokens collection.
|
||||||
|
org_roles = aggr_roles(org_coll, {'members': user_id})
|
||||||
|
self._log.debug('Organization-given roles for user %s: %s', user_id, org_roles)
|
||||||
|
now = datetime.datetime.now(bson.tz_util.utc)
|
||||||
|
token_roles = aggr_roles(tokens_coll, {
|
||||||
|
'user': user_id,
|
||||||
|
'expire_time': {"$gt": now},
|
||||||
|
})
|
||||||
|
self._log.debug('Token-given roles for user %s: %s', user_id, token_roles)
|
||||||
|
org_roles.update(token_roles)
|
||||||
|
|
||||||
users_coll = current_app.db('users')
|
users_coll = current_app.db('users')
|
||||||
user_doc = users_coll.find_one(user_id, projection={'roles': 1})
|
user_doc = users_coll.find_one(user_id, projection={'roles': 1})
|
||||||
if not user_doc:
|
if not user_doc:
|
||||||
self._log.warning('Trying refresh roles of non-existing user %s, ignoring', user_id)
|
self._log.warning('Trying refresh roles of non-existing user %s, ignoring', user_id)
|
||||||
return
|
return set()
|
||||||
|
|
||||||
all_user_roles = set(user_doc.get('roles') or [])
|
all_user_roles = set(user_doc.get('roles') or [])
|
||||||
existing_org_roles = {role for role in all_user_roles
|
existing_org_roles = {role for role in all_user_roles
|
||||||
@@ -291,6 +307,8 @@ class OrgManager:
|
|||||||
if revoke_roles:
|
if revoke_roles:
|
||||||
do_badger('revoke', roles=revoke_roles, user_id=user_id)
|
do_badger('revoke', roles=revoke_roles, user_id=user_id)
|
||||||
|
|
||||||
|
return all_user_roles.union(grant_roles) - revoke_roles
|
||||||
|
|
||||||
def user_is_admin(self, org_id: bson.ObjectId) -> bool:
|
def user_is_admin(self, org_id: bson.ObjectId) -> bool:
|
||||||
"""Returns whether the currently logged in user is the admin of the organization."""
|
"""Returns whether the currently logged in user is the admin of the organization."""
|
||||||
|
|
||||||
@@ -389,14 +407,37 @@ class OrgManager:
|
|||||||
from . import ip_ranges
|
from . import ip_ranges
|
||||||
|
|
||||||
org_coll = current_app.db('organizations')
|
org_coll = current_app.db('organizations')
|
||||||
|
try:
|
||||||
|
q = ip_ranges.query(remote_addr)
|
||||||
|
except ValueError as ex:
|
||||||
|
self._log.warning('Invalid remote address %s, ignoring IP-based roles: %s',
|
||||||
|
remote_addr, ex)
|
||||||
|
return set()
|
||||||
|
|
||||||
orgs = org_coll.find(
|
orgs = org_coll.find(
|
||||||
{'ip_ranges': ip_ranges.query(remote_addr)},
|
{'ip_ranges': q},
|
||||||
projection={'org_roles': True},
|
projection={'org_roles': True},
|
||||||
)
|
)
|
||||||
return set(role
|
return set(role
|
||||||
for org in orgs
|
for org in orgs
|
||||||
for role in org.get('org_roles', []))
|
for role in org.get('org_roles', []))
|
||||||
|
|
||||||
|
def roles_for_request(self) -> typing.Set[str]:
|
||||||
|
"""Find roles for user via the request's remote IP address."""
|
||||||
|
|
||||||
|
try:
|
||||||
|
remote_addr = flask.request.access_route[0]
|
||||||
|
except IndexError:
|
||||||
|
return set()
|
||||||
|
|
||||||
|
if not remote_addr:
|
||||||
|
return set()
|
||||||
|
|
||||||
|
roles = self.roles_for_ip_address(remote_addr)
|
||||||
|
self._log.debug('Roles for IP address %s: %s', remote_addr, roles)
|
||||||
|
|
||||||
|
return roles
|
||||||
|
|
||||||
|
|
||||||
def setup_app(app):
|
def setup_app(app):
|
||||||
from . import patch, hooks
|
from . import patch, hooks
|
||||||
|
@@ -55,7 +55,7 @@ def force_cli_user():
|
|||||||
g.current_user = CLI_USER
|
g.current_user = CLI_USER
|
||||||
|
|
||||||
|
|
||||||
def find_user_in_db(user_info: dict, provider='blender-id'):
|
def find_user_in_db(user_info: dict, provider='blender-id') -> dict:
|
||||||
"""Find the user in our database, creating/updating the returned document where needed.
|
"""Find the user in our database, creating/updating the returned document where needed.
|
||||||
|
|
||||||
First, search for the user using its id from the provider, then try to look the user up via the
|
First, search for the user using its id from the provider, then try to look the user up via the
|
||||||
@@ -222,7 +222,8 @@ def hash_auth_token(token: str) -> str:
|
|||||||
return base64.b64encode(digest).decode('ascii')
|
return base64.b64encode(digest).decode('ascii')
|
||||||
|
|
||||||
|
|
||||||
def store_token(user_id, token: str, token_expiry, oauth_subclient_id=False):
|
def store_token(user_id, token: str, token_expiry, oauth_subclient_id=False,
|
||||||
|
org_roles: typing.Set[str]=frozenset()):
|
||||||
"""Stores an authentication token.
|
"""Stores an authentication token.
|
||||||
|
|
||||||
:returns: the token document from MongoDB
|
:returns: the token document from MongoDB
|
||||||
@@ -237,6 +238,8 @@ def store_token(user_id, token: str, token_expiry, oauth_subclient_id=False):
|
|||||||
}
|
}
|
||||||
if oauth_subclient_id:
|
if oauth_subclient_id:
|
||||||
token_data['is_subclient_token'] = True
|
token_data['is_subclient_token'] = True
|
||||||
|
if org_roles:
|
||||||
|
token_data['org_roles'] = sorted(org_roles)
|
||||||
|
|
||||||
r, _, _, status = current_app.post_internal('tokens', token_data)
|
r, _, _, status = current_app.post_internal('tokens', token_data)
|
||||||
|
|
||||||
|
@@ -67,6 +67,9 @@ class UserClass(flask_login.UserMixin):
|
|||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f'UserClass(user_id={self.user_id})'
|
return f'UserClass(user_id={self.user_id})'
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f'{self.__class__.__name__}(id={self.user_id}, email={self.email!r}'
|
||||||
|
|
||||||
def __getitem__(self, item):
|
def __getitem__(self, item):
|
||||||
"""Compatibility layer with old dict-based g.current_user object."""
|
"""Compatibility layer with old dict-based g.current_user object."""
|
||||||
|
|
||||||
|
@@ -477,13 +477,15 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
return urlencode(jsonified_params)
|
return urlencode(jsonified_params)
|
||||||
|
|
||||||
def client_request(self, method, path, qs=None, expected_status=200, auth_token=None, json=None,
|
def client_request(self, method, path, qs=None, expected_status=200, auth_token=None, json=None,
|
||||||
data=None, headers=None, files=None, content_type=None, etag=None):
|
data=None, headers=None, files=None, content_type=None, etag=None,
|
||||||
|
environ_overrides=None):
|
||||||
"""Performs a HTTP request to the server."""
|
"""Performs a HTTP request to the server."""
|
||||||
|
|
||||||
from pillar.api.utils import dumps
|
from pillar.api.utils import dumps
|
||||||
import json as mod_json
|
import json as mod_json
|
||||||
|
|
||||||
headers = headers or {}
|
headers = headers or {}
|
||||||
|
environ_overrides = environ_overrides or {}
|
||||||
if auth_token is not None:
|
if auth_token is not None:
|
||||||
headers['Authorization'] = self.make_header(auth_token)
|
headers['Authorization'] = self.make_header(auth_token)
|
||||||
|
|
||||||
@@ -506,7 +508,8 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
|
|
||||||
resp = self.client.open(path=path, method=method, data=data, headers=headers,
|
resp = self.client.open(path=path, method=method, data=data, headers=headers,
|
||||||
content_type=content_type,
|
content_type=content_type,
|
||||||
query_string=self.join_url_params(qs))
|
query_string=self.join_url_params(qs),
|
||||||
|
environ_overrides=environ_overrides)
|
||||||
self.assertEqual(expected_status, resp.status_code,
|
self.assertEqual(expected_status, resp.status_code,
|
||||||
'Expected status %i but got %i. Response: %s' % (
|
'Expected status %i but got %i. Response: %s' % (
|
||||||
expected_status, resp.status_code, resp.data
|
expected_status, resp.status_code, resp.data
|
||||||
|
@@ -1,10 +1,11 @@
|
|||||||
|
import datetime
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
import bson
|
import bson
|
||||||
|
from bson import tz_util
|
||||||
import responses
|
import responses
|
||||||
|
|
||||||
from pillar.tests import AbstractPillarTest
|
from pillar.tests import AbstractPillarTest
|
||||||
from pillar.api.utils import remove_private_keys
|
|
||||||
|
|
||||||
|
|
||||||
class AbstractOrgTest(AbstractPillarTest):
|
class AbstractOrgTest(AbstractPillarTest):
|
||||||
@@ -669,6 +670,7 @@ class OrganizationItemEveTest(AbstractPillarTest):
|
|||||||
|
|
||||||
def _put_test(self, auth_token: typing.Optional[str]):
|
def _put_test(self, auth_token: typing.Optional[str]):
|
||||||
"""Generic PUT test, should be same result for all cases."""
|
"""Generic PUT test, should be same result for all cases."""
|
||||||
|
from pillar.api.utils import remove_private_keys
|
||||||
|
|
||||||
put_doc = remove_private_keys(self.org_doc)
|
put_doc = remove_private_keys(self.org_doc)
|
||||||
put_doc['name'] = 'new name'
|
put_doc['name'] = 'new name'
|
||||||
@@ -771,11 +773,10 @@ class UserCreationTest(AbstractPillarTest):
|
|||||||
self.assertEqual([my_id], db_org['members'])
|
self.assertEqual([my_id], db_org['members'])
|
||||||
|
|
||||||
|
|
||||||
class IPRangeTest(AbstractOrgTest):
|
class AbstractIPRangeSingleOrgTest(AbstractOrgTest):
|
||||||
|
|
||||||
def setUp(self, **kwargs):
|
def setUp(self, **kwargs):
|
||||||
super().setUp(**kwargs)
|
super().setUp(**kwargs)
|
||||||
self.uid = self.create_user(24 * 'a', token='token')
|
self.uid = self.create_user(24 * 'a', roles={'subscriber'}, token='token')
|
||||||
self.org_roles = {'org-subscriber', 'org-phabricator'}
|
self.org_roles = {'org-subscriber', 'org-phabricator'}
|
||||||
self.org = self.app.org_manager.create_new_org('Хакеры', self.uid, 25,
|
self.org = self.app.org_manager.create_new_org('Хакеры', self.uid, 25,
|
||||||
org_roles=self.org_roles)
|
org_roles=self.org_roles)
|
||||||
@@ -793,6 +794,9 @@ class IPRangeTest(AbstractOrgTest):
|
|||||||
db_org = self.om._get_org(self.org_id)
|
db_org = self.om._get_org(self.org_id)
|
||||||
return db_org
|
return db_org
|
||||||
|
|
||||||
|
|
||||||
|
class IPRangeTest(AbstractIPRangeSingleOrgTest):
|
||||||
|
|
||||||
def test_ipranges_doc(self):
|
def test_ipranges_doc(self):
|
||||||
from pillar.api.organizations import ip_ranges
|
from pillar.api.organizations import ip_ranges
|
||||||
|
|
||||||
@@ -922,7 +926,7 @@ class IPRangeQueryTest(AbstractOrgTest):
|
|||||||
db_org = self.om._get_org(org_id)
|
db_org = self.om._get_org(org_id)
|
||||||
return db_org
|
return db_org
|
||||||
|
|
||||||
def test_happy(self):
|
def test_query(self):
|
||||||
# Set up a few organisations. A and B have overlapping IPv4 ranges, B and C on IPv6.
|
# Set up a few organisations. A and B have overlapping IPv4 ranges, B and C on IPv6.
|
||||||
org_roles_a = {'org-roleA1', 'org-roleA2'}
|
org_roles_a = {'org-roleA1', 'org-roleA2'}
|
||||||
org_a = self.app.org_manager.create_new_org('Хакеры', self.uid, 25, org_roles=org_roles_a)
|
org_a = self.app.org_manager.create_new_org('Хакеры', self.uid, 25, org_roles=org_roles_a)
|
||||||
@@ -961,3 +965,128 @@ class IPRangeQueryTest(AbstractOrgTest):
|
|||||||
|
|
||||||
self.assertEqual(set(), self.om.roles_for_ip_address('1111:ffff::1'))
|
self.assertEqual(set(), self.om.roles_for_ip_address('1111:ffff::1'))
|
||||||
self.assertEqual(set(), self.om.roles_for_ip_address('::1'))
|
self.assertEqual(set(), self.om.roles_for_ip_address('::1'))
|
||||||
|
|
||||||
|
|
||||||
|
class IPRangeLoginRolesTest(AbstractIPRangeSingleOrgTest):
|
||||||
|
def setUp(self, **kwargs):
|
||||||
|
super().setUp(**kwargs)
|
||||||
|
self.user_roles = {'subscriber'}
|
||||||
|
self._patch({
|
||||||
|
'name': 'Хакеры',
|
||||||
|
'ip_ranges': [
|
||||||
|
'192.168.0.0/16',
|
||||||
|
'2a03:b0c0:0:1010::8fe:6ef1/120',
|
||||||
|
]})
|
||||||
|
|
||||||
|
def _test_api(self, headers: dict, env: dict):
|
||||||
|
from pillar.api.utils.authentication import hash_auth_token
|
||||||
|
|
||||||
|
self.mock_blenderid_validate_happy()
|
||||||
|
# This should check the IP of the user agains the organization IP ranges and update the
|
||||||
|
# user in the database.
|
||||||
|
resp = self.get('/api/users/me', auth_token='usertoken',
|
||||||
|
headers=headers, environ_overrides=env)
|
||||||
|
me = resp.json()
|
||||||
|
|
||||||
|
# The IP-based roles should be stored in the token document.
|
||||||
|
tokens_coll = self.app.db('tokens')
|
||||||
|
token = tokens_coll.find_one({
|
||||||
|
'user': bson.ObjectId(me['_id']),
|
||||||
|
'token_hashed': hash_auth_token('usertoken'),
|
||||||
|
})
|
||||||
|
self.assertEqual(self.org_roles, set(token['org_roles']))
|
||||||
|
|
||||||
|
# The IP-based roles should also be persisted in the user document.
|
||||||
|
self.assertEqual({'subscriber', *self.org_roles}, set(me['roles']))
|
||||||
|
|
||||||
|
def _test_api_forwarded_for(self, ip_addr: str):
|
||||||
|
self._test_api({'X-Forwarded-For': ip_addr}, {})
|
||||||
|
|
||||||
|
def _test_api_remote_addr(self, ip_addr: str):
|
||||||
|
self._test_api({}, {'REMOTE_ADDR': ip_addr})
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_api_forwarded_for_ipv6(self):
|
||||||
|
self._test_api_forwarded_for('2a03:b0c0:0:1010::8fe:6ede')
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_api_forwarded_for_ipv4(self):
|
||||||
|
self._test_api_forwarded_for('192.168.3.254, 4.3.4.4, 5.6.7.8')
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_api_remote_addr_ipv6(self):
|
||||||
|
self._test_api_remote_addr('2a03:b0c0:0:1010::8fe:6ede')
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_api_remote_addr_ipv4(self):
|
||||||
|
self._test_api_remote_addr('192.168.3.254')
|
||||||
|
|
||||||
|
def _test_web_forwarded_for(self, ip_addr: str, ip_roles: typing.Set[str]):
|
||||||
|
from pillar.api.utils.authentication import hash_auth_token
|
||||||
|
from pillar import auth
|
||||||
|
self.mock_blenderid_validate_happy()
|
||||||
|
|
||||||
|
expect_roles = {*self.user_roles, *ip_roles}
|
||||||
|
|
||||||
|
with self.app.test_request_context(headers={'X-Forwarded-For': ip_addr}):
|
||||||
|
# This should check the IP of the user agains the organization IP ranges and update the
|
||||||
|
# user in the database.
|
||||||
|
auth.login_user('usertoken', load_from_db=True)
|
||||||
|
my_id = auth.current_user.user_id
|
||||||
|
|
||||||
|
# The roles should be reflected in the current user object.
|
||||||
|
self.assertEqual(expect_roles, set(auth.current_user.roles))
|
||||||
|
|
||||||
|
# The IP-based roles should be stored in the token document.
|
||||||
|
tokens_coll = self.app.db('tokens')
|
||||||
|
token = tokens_coll.find_one({
|
||||||
|
'user': bson.ObjectId(my_id),
|
||||||
|
'token_hashed': hash_auth_token('usertoken'),
|
||||||
|
'expire_time': {'$gt': datetime.datetime.now(tz_util.utc)},
|
||||||
|
})
|
||||||
|
self.assertEqual(ip_roles, set(token['org_roles']))
|
||||||
|
|
||||||
|
# The IP-based roles should also be persisted in the user document.
|
||||||
|
users_coll = self.app.db('users')
|
||||||
|
me = users_coll.find_one({'_id': my_id})
|
||||||
|
self.assertEqual(expect_roles, set(me['roles']))
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_web_forwarded_for_ipv6(self):
|
||||||
|
self._test_web_forwarded_for('2a03:b0c0:0:1010::8fe:6ede', self.org_roles)
|
||||||
|
|
||||||
|
# Even though this request is outside of the IP range, the user should
|
||||||
|
# still maintain their organization's roles because it's stored in the
|
||||||
|
# token document created for the previous request.
|
||||||
|
self._test_web_forwarded_for('2a03:d00d:0:1010::8fe:6ede', self.org_roles)
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_web_forwarded_for_ipv6_outside_range(self):
|
||||||
|
self._test_web_forwarded_for('2a03:d00d:0:1010::8fe:6ede', set())
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_web_forwarded_for_ipv4(self):
|
||||||
|
self._test_web_forwarded_for('192.168.3.254', self.org_roles)
|
||||||
|
self._test_web_forwarded_for('123.123.123.123', self.org_roles)
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_web_forwarded_for_ipv4_outside_range(self):
|
||||||
|
self._test_web_forwarded_for('123.123.123.123', set())
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_web_forwarded_for_invalid_addr(self):
|
||||||
|
# This shouldn't produce any exception, but be ignored instead.
|
||||||
|
self._test_web_forwarded_for('317.518.1.7', set())
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_web_expire_Roles(self):
|
||||||
|
# This gives the roles until the token expires.
|
||||||
|
self._test_web_forwarded_for('2a03:b0c0:0:1010::8fe:6ede', self.org_roles)
|
||||||
|
|
||||||
|
# Force all tokens to expire.
|
||||||
|
tokens_coll = self.app.db('tokens')
|
||||||
|
expire = datetime.datetime.now(tz=tz_util.utc) - datetime.timedelta(hours=1)
|
||||||
|
tokens_coll.update_many({}, {'$set': {'expire_time': expire}})
|
||||||
|
|
||||||
|
# A new request from outside the IP range should now not result in the org roles.
|
||||||
|
self._test_web_forwarded_for('2a03:d00d:0:1010::8fe:6ede', set())
|
||||||
|
Reference in New Issue
Block a user