Initial mfa support (for internal users) #93591
@ -3,10 +3,11 @@ import re
|
|||||||
|
|
||||||
from django.test.client import Client
|
from django.test.client import Client
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
|
from django.urls import reverse
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
from bid_main.tests.factories import UserFactory
|
from bid_main.tests.factories import UserFactory
|
||||||
from mfa.models import devices_for_user
|
from mfa.models import EncryptedTOTPDevice, devices_for_user
|
||||||
|
|
||||||
|
|
||||||
@patch(
|
@patch(
|
||||||
@ -43,7 +44,7 @@ class TestMfaRequredIfConfigured(TestCase):
|
|||||||
follow=True,
|
follow=True,
|
||||||
)
|
)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
response = client.get('/mfa/totp/')
|
response = client.get(reverse('bid_main:mfa_totp'))
|
||||||
match = re.search(
|
match = re.search(
|
||||||
r'input type="hidden" name="key" value="([^"]+)"', str(response.content)
|
r'input type="hidden" name="key" value="([^"]+)"', str(response.content)
|
||||||
)
|
)
|
||||||
@ -53,7 +54,7 @@ class TestMfaRequredIfConfigured(TestCase):
|
|||||||
)
|
)
|
||||||
signature = match.group(1)
|
signature = match.group(1)
|
||||||
response = client.post(
|
response = client.post(
|
||||||
'/mfa/totp/',
|
reverse('bid_main:mfa_totp'),
|
||||||
{'name': 'test totp device', 'code': '123456', 'key': key, 'signature': signature},
|
{'name': 'test totp device', 'code': '123456', 'key': key, 'signature': signature},
|
||||||
follow=True
|
follow=True
|
||||||
)
|
)
|
||||||
@ -84,3 +85,52 @@ class TestMfaRequredIfConfigured(TestCase):
|
|||||||
# have reached the account page
|
# have reached the account page
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertContains(response, '<h2>Account</h2>')
|
self.assertContains(response, '<h2>Account</h2>')
|
||||||
|
|
||||||
|
def test_recovery_codes(self):
|
||||||
|
# shortcut: create a totp device via a model
|
||||||
|
EncryptedTOTPDevice.objects.create(encrypted_key='00', key='', name='totp', user=self.user)
|
||||||
|
|
||||||
|
client = Client()
|
||||||
|
response = client.post(
|
||||||
|
'/login',
|
||||||
|
{'username': self.user.email, 'password': 'hunter2'},
|
||||||
|
follow=True,
|
||||||
|
)
|
||||||
|
match = re.search(
|
||||||
|
r'input type="hidden" name="otp_device" value="([^"]+)"', str(response.content)
|
||||||
|
)
|
||||||
|
otp_device = match.group(1)
|
||||||
|
response = client.post(
|
||||||
|
'/login',
|
||||||
|
{'otp_device': otp_device, 'otp_token': '123456'},
|
||||||
|
follow=True,
|
||||||
|
)
|
||||||
|
self.assertContains(response, '<h2>Account</h2>')
|
||||||
|
|
||||||
|
response = client.post(reverse('bid_main:mfa_generate_recovery'), follow=True)
|
||||||
|
self.assertContains(response, '10 recovery codes remaining')
|
||||||
|
|
||||||
|
# hope that we don't add any other code elements
|
||||||
|
match = re.search(
|
||||||
|
r'<code>([0-9A-F]{16})</code>', str(response.content)
|
||||||
|
)
|
||||||
|
recovery_code = match.group(1)
|
||||||
|
|
||||||
|
client = Client()
|
||||||
|
response = client.post(
|
||||||
|
'/login',
|
||||||
|
{'username': self.user.email, 'password': 'hunter2'},
|
||||||
|
follow=True,
|
||||||
|
)
|
||||||
|
response = client.get('/login?use_recovery=1')
|
||||||
|
match = re.search(
|
||||||
|
r'input type="hidden" name="otp_device" value="([^"]+)"', str(response.content)
|
||||||
|
)
|
||||||
|
otp_device = match.group(1)
|
||||||
|
response = client.post(
|
||||||
|
'/login?use_recovery=1',
|
||||||
|
{'otp_device': otp_device, 'otp_token': recovery_code},
|
||||||
|
)
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
response = client.get(reverse('bid_main:mfa'))
|
||||||
|
self.assertContains(response, '9 recovery codes remaining')
|
||||||
|
@ -28,11 +28,15 @@ class EncryptedRecoveryDevice(StaticDevice):
|
|||||||
def verify_token(self, token):
|
def verify_token(self, token):
|
||||||
"""Copy-pasted verbatim from StaticDevice, replacing token_set with encryptedtoken_set.
|
"""Copy-pasted verbatim from StaticDevice, replacing token_set with encryptedtoken_set.
|
||||||
|
|
||||||
|
ORM filter is rewritten to a loop, because looking up encrypted values doesn't work.
|
||||||
Also added a signal for email notification.
|
Also added a signal for email notification.
|
||||||
"""
|
"""
|
||||||
verify_allowed, _ = self.verify_is_allowed()
|
verify_allowed, _ = self.verify_is_allowed()
|
||||||
if verify_allowed:
|
if verify_allowed:
|
||||||
match = self.encryptedtoken_set.filter(encrypted_token=token).first()
|
match = None
|
||||||
|
for t in self.encryptedtoken_set.all():
|
||||||
|
if t.encrypted_token == token:
|
||||||
|
match = t
|
||||||
if match is not None:
|
if match is not None:
|
||||||
match.delete()
|
match.delete()
|
||||||
self.throttle_reset(commit=False)
|
self.throttle_reset(commit=False)
|
||||||
|
Loading…
Reference in New Issue
Block a user