Fixed bug in validate_token(), it now returns the validation status.
Also separated on-the-fly creation of user in our database into its own function.
This commit is contained in:
@@ -17,13 +17,11 @@ from eve.io.mongo import Validator
|
||||
|
||||
RFC1123_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
|
||||
|
||||
|
||||
class NewAuth(TokenAuth):
|
||||
def check_auth(self, token, allowed_roles, resource, method):
|
||||
if not token:
|
||||
return False
|
||||
else:
|
||||
validate_token()
|
||||
return True
|
||||
return validate_token()
|
||||
|
||||
|
||||
class ValidateCustomFields(Validator):
|
||||
def convert_properties(self, properties, node_schema):
|
||||
|
@@ -54,13 +54,17 @@ def validate_token():
|
||||
"""Validate the token provided in the request and populate the current_user
|
||||
flask.g object, so that permissions and access to a resource can be defined
|
||||
from it.
|
||||
|
||||
When the token is succesfully validated, sets `g.current_user` to contain
|
||||
the user information.
|
||||
|
||||
@returns True iff the user is logged in with a valid Blender ID token.
|
||||
"""
|
||||
|
||||
if not request.authorization:
|
||||
# If no authorization headers are provided, we are getting a request
|
||||
# from a non logged in user. Proceed accordingly.
|
||||
return None
|
||||
|
||||
current_user = {}
|
||||
return False
|
||||
|
||||
token = request.authorization.username
|
||||
tokens_collection = app.data.driver.db['tokens']
|
||||
@@ -68,12 +72,12 @@ def validate_token():
|
||||
lookup = {'token': token, 'expire_time': {"$gt": datetime.now()}}
|
||||
db_token = tokens_collection.find_one(lookup)
|
||||
if not db_token:
|
||||
# If no valid token is found, we issue a new request to the Blender ID
|
||||
# to verify the validity of the token. We will get basic user info if
|
||||
# the user is authorized and we will make a new token.
|
||||
# If no valid token is found in our local database, we issue a new request to the Blender ID
|
||||
# server to verify the validity of the token passed via the HTTP header. We will get basic user info if
|
||||
# the user is authorized, and we will store the token in our local database.
|
||||
validation = validate(token)
|
||||
if validation is None or validation['status'] != 'success':
|
||||
return None
|
||||
if validation is None or validation.get('status', '') != 'success':
|
||||
return False
|
||||
|
||||
users = app.data.driver.db['users']
|
||||
email = validation['data']['user']['email']
|
||||
@@ -81,20 +85,8 @@ def validate_token():
|
||||
username = make_unique_username(email)
|
||||
|
||||
if not db_user:
|
||||
user_data = {
|
||||
'full_name': username,
|
||||
'username': username,
|
||||
'email': email,
|
||||
'auth': [{
|
||||
'provider': 'blender-id',
|
||||
'user_id': str(validation['data']['user']['id']),
|
||||
'token': ''}],
|
||||
'settings': {
|
||||
'email_communications': 1
|
||||
}
|
||||
}
|
||||
r = post_internal('users', user_data)
|
||||
user_id = r[0]['_id']
|
||||
# We don't even know this user; create it on the fly.
|
||||
user_id = create_new_user(email, username, validation['data']['user']['id'])
|
||||
groups = None
|
||||
else:
|
||||
user_id = db_user['_id']
|
||||
@@ -111,7 +103,6 @@ def validate_token():
|
||||
token=token,
|
||||
groups=groups,
|
||||
token_expire_time=datetime.now() + timedelta(hours=1))
|
||||
#return token_data
|
||||
else:
|
||||
users = app.data.driver.db['users']
|
||||
db_user = users.find_one(db_token['user'])
|
||||
@@ -123,6 +114,34 @@ def validate_token():
|
||||
|
||||
g.current_user = current_user
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def create_new_user(email, username, user_id):
|
||||
"""Creates a new user in our local database.
|
||||
|
||||
@param email: the user's email
|
||||
@param username: the username, which is also used as full name.
|
||||
@param user_id: the user ID from the Blender ID server.
|
||||
@returns: the user ID from our local database.
|
||||
"""
|
||||
|
||||
user_data = {
|
||||
'full_name': username,
|
||||
'username': username,
|
||||
'email': email,
|
||||
'auth': [{
|
||||
'provider': 'blender-id',
|
||||
'user_id': str(user_id),
|
||||
'token': ''}],
|
||||
'settings': {
|
||||
'email_communications': 1
|
||||
}
|
||||
}
|
||||
r = post_internal('users', user_data)
|
||||
user_id = r[0]['_id']
|
||||
return user_id
|
||||
|
||||
|
||||
def make_unique_username(email):
|
||||
"""Creates a unique username from the email address.
|
||||
|
@@ -8,9 +8,8 @@ TEST_EMAIL_ADDRESS = '%s@testing.blender.org' % TEST_EMAIL_USER
|
||||
os.environ['MONGO_DBNAME'] = 'unittest'
|
||||
os.environ['EVE_SETTINGS'] = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'settings.py')
|
||||
|
||||
|
||||
from application import app
|
||||
from application.utils.authentication import make_unique_username, validate_token
|
||||
from application.utils import authentication as auth
|
||||
|
||||
|
||||
def make_header(username, password=''):
|
||||
@@ -34,33 +33,19 @@ class FlaskrTestCase(unittest.TestCase):
|
||||
users.delete_many({'username': TEST_EMAIL_USER})
|
||||
|
||||
# This user shouldn't exist yet.
|
||||
self.assertEqual(TEST_EMAIL_USER, make_unique_username(TEST_EMAIL_ADDRESS))
|
||||
self.assertEqual(TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
|
||||
|
||||
# Add a user, then test again.
|
||||
user_data = {
|
||||
'full_name': 'Coro the Llama',
|
||||
'username': TEST_EMAIL_USER,
|
||||
'email': TEST_EMAIL_ADDRESS,
|
||||
'auth': [{
|
||||
'provider': 'unit-test',
|
||||
'user_id': 'test123',
|
||||
'token': ''}],
|
||||
'settings': {
|
||||
'email_communications': 0
|
||||
}
|
||||
}
|
||||
|
||||
users.insert_one(user_data)
|
||||
auth.create_new_user(TEST_EMAIL_ADDRESS, TEST_EMAIL_USER, 'test1234')
|
||||
try:
|
||||
self.assertIsNotNone(users.find_one({'username': TEST_EMAIL_USER}))
|
||||
self.assertEqual('%s1' % TEST_EMAIL_USER, make_unique_username(TEST_EMAIL_ADDRESS))
|
||||
self.assertEqual('%s1' % TEST_EMAIL_USER, auth.make_unique_username(TEST_EMAIL_ADDRESS))
|
||||
finally:
|
||||
users.delete_many({'username': TEST_EMAIL_USER})
|
||||
|
||||
def test_validate_token__not_logged_in(self):
|
||||
with app.test_request_context():
|
||||
self.assertFalse(validate_token())
|
||||
self.assertFalse(auth.validate_token())
|
||||
|
||||
def test_validate_token__unknown_token(self):
|
||||
with app.test_request_context(headers={'Authorization': make_header('unknowntoken')}):
|
||||
self.assertFalse(validate_token())
|
||||
self.assertFalse(auth.validate_token())
|
||||
|
Reference in New Issue
Block a user