Automatically re-register when the Manager does not accept credentials
Fixes T54174, but rather than making a distinction between the username not being found and the password being wrong (and only re-registering in the former case), we now just always re-register. This could potentially hide certain erroneous situations, but it does make the worker operational in more cases, which I assume is generally preferred.
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
This file logs the changes that are actually interesting to users (new features,
|
||||
changed functionality, fixed bugs).
|
||||
|
||||
|
||||
## Version 2.3 (in development)
|
||||
|
||||
- Changed how progressive rendering works. Nonuniform tasks are now supported. This requires
|
||||
@@ -12,6 +13,8 @@ changed functionality, fixed bugs).
|
||||
EXR files to JPEG files. This is used in progressive rendering to get intermediary previews.
|
||||
- Added the `merge_progressive_render_sequence` for sample-merging sequences of EXR files. The
|
||||
already-existing `merge_progressive_renders` command only performed on one frame at a time.
|
||||
- The Worker now automatically re-registers when the Manager does not accept its credentials.
|
||||
This makes it easier to handle erasure of the Manager's database.
|
||||
|
||||
|
||||
## Version 2.2.1 (2019-01-14)
|
||||
|
||||
@@ -213,9 +213,9 @@ class FlamencoWorker:
|
||||
resp = await self.manager.post(url, **post_kwargs)
|
||||
resp.raise_for_status()
|
||||
except requests.RequestException as ex:
|
||||
if not may_retry_loop:
|
||||
self._log.error('Unable to POST to manager %s: %s', url, ex)
|
||||
raise UnableToRegisterError()
|
||||
if not may_retry_loop or ex.response.status_code == 401:
|
||||
self._log.debug('Unable to POST to manager %s: %s', url, ex)
|
||||
raise
|
||||
|
||||
self._log.warning('Unable to POST to manager %s, retrying in %i seconds: %s',
|
||||
url, REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex)
|
||||
@@ -223,22 +223,42 @@ class FlamencoWorker:
|
||||
else:
|
||||
return resp
|
||||
|
||||
async def signon(self, *, may_retry_loop: bool):
|
||||
async def signon(self, *, may_retry_loop: bool,
|
||||
autoregister_already_tried: bool=False):
|
||||
"""Signs on at the manager.
|
||||
|
||||
Only needed when we didn't just register.
|
||||
"""
|
||||
|
||||
self._log.info('Signing on at manager.')
|
||||
await self._keep_posting_to_manager(
|
||||
'/sign-on',
|
||||
json={
|
||||
'supported_task_types': self.task_types,
|
||||
'nickname': self.hostname(),
|
||||
},
|
||||
may_retry_loop=may_retry_loop,
|
||||
)
|
||||
self._log.info('Manager accepted sign-on.')
|
||||
try:
|
||||
await self._keep_posting_to_manager(
|
||||
'/sign-on',
|
||||
json={
|
||||
'supported_task_types': self.task_types,
|
||||
'nickname': self.hostname(),
|
||||
},
|
||||
may_retry_loop=may_retry_loop,
|
||||
)
|
||||
except requests.exceptions.HTTPError as ex:
|
||||
if ex.response.status_code != 401:
|
||||
self._log.error('Unable to sign on at Manager: %s', ex)
|
||||
raise UnableToRegisterError()
|
||||
|
||||
if autoregister_already_tried:
|
||||
self._log.error('Manager did not accept our credentials, and re-registration '
|
||||
'was already attempted. Giving up.')
|
||||
raise UnableToRegisterError()
|
||||
|
||||
self._log.warning('Manager did not accept our credentials, going to re-register')
|
||||
await self.register_at_manager(may_retry_loop=may_retry_loop)
|
||||
|
||||
self._log.warning('Re-registration was fine, going to re-try sign-on')
|
||||
await self.signon(may_retry_loop=may_retry_loop, autoregister_already_tried=True)
|
||||
else:
|
||||
# Expected flow: no exception, manager accepts credentials.
|
||||
self._log.info('Manager accepted sign-on.')
|
||||
|
||||
|
||||
async def register_at_manager(self, *, may_retry_loop: bool):
|
||||
self._log.info('Registering at manager')
|
||||
@@ -261,6 +281,7 @@ class FlamencoWorker:
|
||||
result = resp.json()
|
||||
self._log.info('Response: %s', result)
|
||||
self.worker_id = result['_id']
|
||||
self.manager.auth = (self.worker_id, self.worker_secret)
|
||||
|
||||
self.write_registration_info()
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ class EmptyResponse:
|
||||
pass
|
||||
|
||||
|
||||
def CoroMock(return_value=None):
|
||||
def CoroMock(return_value=None, side_effect=...):
|
||||
"""Corountine mocking object.
|
||||
|
||||
For an example, see test_coro_mock.py.
|
||||
@@ -63,8 +63,12 @@ def CoroMock(return_value=None):
|
||||
Source: http://stackoverflow.com/a/32505333/875379
|
||||
|
||||
:param return_value: whatever you want to have set as return value.
|
||||
This must always be set. Pass the ellipsis object ... to not set this; in that case
|
||||
you are responsible yourself to set coromock.coro.return_value.
|
||||
:param side_effect: whatever you want to have set as mock side-effect.
|
||||
|
||||
Either return_value or side_effect must always be set. Pass the ellipsis
|
||||
object to either parameter ... to not set them. When passing ellipsis to
|
||||
both parameters you are responsible yourself to set
|
||||
coromock.coro.return_value or coromock.coro.side_effect.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -76,5 +80,7 @@ def CoroMock(return_value=None):
|
||||
|
||||
if return_value is not ...:
|
||||
corofunc.coro.return_value = return_value
|
||||
if side_effect is not ...:
|
||||
corofunc.coro.side_effect = side_effect
|
||||
|
||||
return corofunc
|
||||
|
||||
@@ -19,3 +19,21 @@ class CoroMockTest(unittest.TestCase):
|
||||
|
||||
cm.assert_called_once_with(3, 4)
|
||||
self.assertEqual('123', result)
|
||||
|
||||
def test_setting_side_effect(self):
|
||||
from tests.mock_responses import CoroMock
|
||||
|
||||
cm = CoroMock()
|
||||
cm.coro.side_effect = ['123', '456', IOError('oops')]
|
||||
|
||||
self.assertEqual('123', self.loop.run_until_complete(cm(3, 4)))
|
||||
self.assertEqual('456', self.loop.run_until_complete(cm(3, 4)))
|
||||
|
||||
with self.assertRaises(IOError):
|
||||
self.loop.run_until_complete(cm(3, 4))
|
||||
|
||||
# A generator is not allowed to raise StopIteration by itself,
|
||||
# so the StopIteration caused by side_effect being exhausted
|
||||
# results in a RuntimeError.
|
||||
with self.assertRaises(RuntimeError):
|
||||
self.loop.run_until_complete(cm(3, 4))
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import concurrent.futures
|
||||
import unittest
|
||||
import unittest.mock
|
||||
from unittest.mock import Mock
|
||||
from unittest.mock import Mock, call
|
||||
|
||||
import asyncio
|
||||
import requests
|
||||
@@ -157,6 +157,58 @@ class WorkerStartupTest(AbstractFWorkerTest):
|
||||
loop=self.asyncio_loop,
|
||||
)
|
||||
|
||||
# Mock merge_with_home_config() so that it doesn't overwrite actual config.
|
||||
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
|
||||
def test_reregister_if_forbidden(self, mock_merge_with_home_config):
|
||||
from tests.mock_responses import CoroMock, EmptyResponse, JsonResponse, TextResponse
|
||||
from flamenco_worker.worker import detect_platform
|
||||
|
||||
self.manager.post = CoroMock(side_effect=[
|
||||
# First sign-on fails:
|
||||
requests.exceptions.HTTPError(
|
||||
response=TextResponse(text='401 Unauthorized', status_code=401)),
|
||||
# Automatic re-register response:
|
||||
JsonResponse({'_id': '47327'}),
|
||||
# Subsequent sign-on is OK:
|
||||
EmptyResponse(),
|
||||
])
|
||||
|
||||
self.assertEqual(self.worker.worker_id, '1234')
|
||||
old_worker_secret = self.worker.worker_secret
|
||||
|
||||
self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_loop=False))
|
||||
|
||||
mock_merge_with_home_config.assert_called_once_with(
|
||||
{'worker_id': '47327',
|
||||
'worker_secret': self.worker.worker_secret})
|
||||
|
||||
self.assertEqual(self.worker.worker_id, '47327')
|
||||
self.assertNotEqual(old_worker_secret, self.worker.worker_secret)
|
||||
self.assertEqual(('47327', self.worker.worker_secret), self.worker.manager.auth)
|
||||
|
||||
self.manager.post.assert_has_calls([
|
||||
call('/sign-on',
|
||||
json={
|
||||
'supported_task_types': ['sleep', 'unittest'],
|
||||
'nickname': 'ws-unittest',
|
||||
},
|
||||
loop=self.asyncio_loop),
|
||||
call('/register-worker',
|
||||
json={'secret': self.worker.worker_secret,
|
||||
'platform': detect_platform(),
|
||||
'supported_task_types': ['sleep', 'unittest'],
|
||||
'nickname': 'ws-unittest'},
|
||||
auth=None,
|
||||
loop=self.asyncio_loop),
|
||||
call('/sign-on',
|
||||
json={
|
||||
'supported_task_types': ['sleep', 'unittest'],
|
||||
'nickname': 'ws-unittest',
|
||||
},
|
||||
loop=self.asyncio_loop),
|
||||
])
|
||||
self.tuqueue.queue.assert_not_called()
|
||||
|
||||
|
||||
class TestWorkerTaskExecution(AbstractFWorkerTest):
|
||||
def setUp(self):
|
||||
|
||||
Reference in New Issue
Block a user