From 172e8a8ba032db315a9ce49457c032c7b78403e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 4 Mar 2016 14:49:48 +0100 Subject: [PATCH] Fixed bug in validate_token(), it now returns the validation status. Also separated on-the-fly creation of user in our database into its own function. --- pillar/application/__init__.py | 8 +-- pillar/application/utils/authentication.py | 65 ++++++++++++++-------- pillar/tests/test_auth.py | 27 ++------- 3 files changed, 51 insertions(+), 49 deletions(-) diff --git a/pillar/application/__init__.py b/pillar/application/__init__.py index 379d2a26..8f5813de 100644 --- a/pillar/application/__init__.py +++ b/pillar/application/__init__.py @@ -17,13 +17,11 @@ from eve.io.mongo import Validator RFC1123_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S GMT' + class NewAuth(TokenAuth): def check_auth(self, token, allowed_roles, resource, method): - if not token: - return False - else: - validate_token() - return True + return validate_token() + class ValidateCustomFields(Validator): def convert_properties(self, properties, node_schema): diff --git a/pillar/application/utils/authentication.py b/pillar/application/utils/authentication.py index 4c01871e..3d5021a2 100644 --- a/pillar/application/utils/authentication.py +++ b/pillar/application/utils/authentication.py @@ -54,13 +54,17 @@ def validate_token(): """Validate the token provided in the request and populate the current_user flask.g object, so that permissions and access to a resource can be defined from it. + + When the token is succesfully validated, sets `g.current_user` to contain + the user information. + + @returns True iff the user is logged in with a valid Blender ID token. """ + if not request.authorization: # If no authorization headers are provided, we are getting a request # from a non logged in user. Proceed accordingly. - return None - - current_user = {} + return False token = request.authorization.username tokens_collection = app.data.driver.db['tokens'] @@ -68,12 +72,12 @@ def validate_token(): lookup = {'token': token, 'expire_time': {"$gt": datetime.now()}} db_token = tokens_collection.find_one(lookup) if not db_token: - # If no valid token is found, we issue a new request to the Blender ID - # to verify the validity of the token. We will get basic user info if - # the user is authorized and we will make a new token. + # If no valid token is found in our local database, we issue a new request to the Blender ID + # server to verify the validity of the token passed via the HTTP header. We will get basic user info if + # the user is authorized, and we will store the token in our local database. validation = validate(token) - if validation is None or validation['status'] != 'success': - return None + if validation is None or validation.get('status', '') != 'success': + return False users = app.data.driver.db['users'] email = validation['data']['user']['email'] @@ -81,20 +85,8 @@ def validate_token(): username = make_unique_username(email) if not db_user: - user_data = { - 'full_name': username, - 'username': username, - 'email': email, - 'auth': [{ - 'provider': 'blender-id', - 'user_id': str(validation['data']['user']['id']), - 'token': ''}], - 'settings': { - 'email_communications': 1 - } - } - r = post_internal('users', user_data) - user_id = r[0]['_id'] + # We don't even know this user; create it on the fly. + user_id = create_new_user(email, username, validation['data']['user']['id']) groups = None else: user_id = db_user['_id'] @@ -111,7 +103,6 @@ def validate_token(): token=token, groups=groups, token_expire_time=datetime.now() + timedelta(hours=1)) - #return token_data else: users = app.data.driver.db['users'] db_user = users.find_one(db_token['user']) @@ -123,6 +114,34 @@ def validate_token(): g.current_user = current_user + return True + + +def create_new_user(email, username, user_id): + """Creates a new user in our local database. + + @param email: the user's email + @param username: the username, which is also used as full name. + @param user_id: the user ID from the Blender ID server. + @returns: the user ID from our local database. + """ + + user_data = { + 'full_name': username, + 'username': username, + 'email': email, + 'auth': [{ + 'provider': 'blender-id', + 'user_id': str(user_id), + 'token': ''}], + 'settings': { + 'email_communications': 1 + } + } + r = post_internal('users', user_data) + user_id = r[0]['_id'] + return user_id + def make_unique_username(email): """Creates a unique username from the email address. diff --git a/pillar/tests/test_auth.py b/pillar/tests/test_auth.py index 0362822a..0889b1a1 100644 --- a/pillar/tests/test_auth.py +++ b/pillar/tests/test_auth.py @@ -8,9 +8,8 @@ TEST_EMAIL_ADDRESS = '%s@testing.blender.org' % TEST_EMAIL_USER os.environ['MONGO_DBNAME'] = 'unittest' os.environ['EVE_SETTINGS'] = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'settings.py') - from application import app -from application.utils.authentication import make_unique_username, validate_token +from application.utils import authentication as auth def make_header(username, password=''): @@ -34,33 +33,19 @@ class FlaskrTestCase(unittest.TestCase): users.delete_many({'username': TEST_EMAIL_USER}) # This user shouldn't exist yet. - self.assertEqual(TEST_EMAIL_USER, make_unique_username(TEST_EMAIL_ADDRESS)) + self.assertEqual(TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS)) # Add a user, then test again. - user_data = { - 'full_name': 'Coro the Llama', - 'username': TEST_EMAIL_USER, - 'email': TEST_EMAIL_ADDRESS, - 'auth': [{ - 'provider': 'unit-test', - 'user_id': 'test123', - 'token': ''}], - 'settings': { - 'email_communications': 0 - } - } - - users.insert_one(user_data) + auth.create_new_user(TEST_EMAIL_ADDRESS, TEST_EMAIL_USER, 'test1234') try: - self.assertIsNotNone(users.find_one({'username': TEST_EMAIL_USER})) - self.assertEqual('%s1' % TEST_EMAIL_USER, make_unique_username(TEST_EMAIL_ADDRESS)) + self.assertEqual('%s1' % TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS)) finally: users.delete_many({'username': TEST_EMAIL_USER}) def test_validate_token__not_logged_in(self): with app.test_request_context(): - self.assertFalse(validate_token()) + self.assertFalse(auth.validate_token()) def test_validate_token__unknown_token(self): with app.test_request_context(headers={'Authorization': make_header('unknowntoken')}): - self.assertFalse(validate_token()) + self.assertFalse(auth.validate_token())