Added 'operations hash_auth_tokens' CLI command.
This commit is contained in:
@@ -193,6 +193,58 @@ class AuthenticationTests(AbstractPillarTest):
|
||||
self.assertEqual({tokdat_se['token_hashed'], tokdat_ne['token_hashed']},
|
||||
{item['token_hashed'] for item in token_coll.find()})
|
||||
|
||||
def test_token_hashing_cli(self):
|
||||
from dateutil.parser import parse
|
||||
self.enter_app_context()
|
||||
|
||||
user_id1 = self.create_user(24 * 'a')
|
||||
user_id2 = self.create_user(24 * 'b')
|
||||
now = datetime.datetime.now(tz_util.utc)
|
||||
|
||||
# Force unhashed tokens into our database.
|
||||
tokens_coll = self.app.db('tokens')
|
||||
tokens_coll.insert_one({
|
||||
'_id': ObjectId('59c0f8ef98377327e1525cd1'),
|
||||
'_etag': '3b8fffa5177e87555acd95e49e6764cb81de5d70',
|
||||
'_created': parse('2017-09-19T13:01:03.000+0200'),
|
||||
'_updated': parse('2017-09-19T13:01:03.000+0200'),
|
||||
'user': user_id1,
|
||||
'token': 'unhashed-token',
|
||||
'expire_time': now + datetime.timedelta(hours=1),
|
||||
})
|
||||
tokens_coll.insert_one({
|
||||
'_id': ObjectId(24 * 'c'),
|
||||
'_etag': '3b8fffa5177e87555acd95e49e6764cb81de5d70',
|
||||
'_created': parse('2017-09-20T13:01:03.000+0200'),
|
||||
'_updated': parse('2017-09-20T13:01:03.000+0200'),
|
||||
'user': user_id2,
|
||||
'token': 'some-other-token',
|
||||
'expire_time': now + datetime.timedelta(hours=1),
|
||||
})
|
||||
|
||||
# This token should work.
|
||||
me1 = self.get('/api/users/me', auth_token='unhashed-token').get_json()
|
||||
self.assertEqual(str(user_id1), me1['_id'])
|
||||
me2 = self.get('/api/users/me', auth_token='some-other-token').get_json()
|
||||
self.assertEqual(str(user_id2), me2['_id'])
|
||||
|
||||
from pillar.cli.operations import hash_auth_tokens
|
||||
hash_auth_tokens()
|
||||
|
||||
# The same token should still work, but be hashed in the DB.
|
||||
me1 = self.get('/api/users/me', auth_token='unhashed-token').get_json()
|
||||
self.assertEqual(str(user_id1), me1['_id'])
|
||||
me2 = self.get('/api/users/me', auth_token='some-other-token').get_json()
|
||||
self.assertEqual(str(user_id2), me2['_id'])
|
||||
|
||||
db_token = tokens_coll.find_one({'_id': ObjectId('59c0f8ef98377327e1525cd1')})
|
||||
self.assertNotIn('token', db_token)
|
||||
self.assertIn('token_hashed', db_token)
|
||||
|
||||
db_token = tokens_coll.find_one({'_id': ObjectId('59c0f8ef98377327e1525cd1')})
|
||||
self.assertNotIn('token', db_token)
|
||||
self.assertIn('token_hashed', db_token)
|
||||
|
||||
|
||||
class UserListTests(AbstractPillarTest):
|
||||
"""Security-related tests."""
|
||||
|
Reference in New Issue
Block a user