import asyncio import copy import datetime import logging import tempfile from unittest.mock import Mock import requests from tests.abstract_worker_test import AbstractWorkerTest class TaskUpdateQueueTest(AbstractWorkerTest): def setUp(self): from flamenco_worker.upstream import FlamencoManager from flamenco_worker.upstream_update_queue import TaskUpdateQueue from flamenco_worker.cli import construct_asyncio_loop from tests.mock_responses import CoroMock logging.getLogger('flamenco_worker.upstream_update_queue').setLevel(logging.DEBUG) self.asyncio_loop = construct_asyncio_loop() self.shutdown_future = self.asyncio_loop.create_future() self.manager = Mock(spec=FlamencoManager) self.manager.post = CoroMock() self.tmpdir = tempfile.TemporaryDirectory() self.tuqueue = TaskUpdateQueue( db_fname='%s/unittest.db' % self.tmpdir.name, manager=self.manager, shutdown_future=self.shutdown_future, backoff_time=0.3, # faster retry to keep the unittest speedy. ) def tearDown(self): self.tuqueue._disconnect_db() super().tearDown() self.tmpdir.cleanup() def test_queue_push(self): """Test that a queue() is followed by an actual push to Flamenco Manager. Also tests connection errors and other HTTP error statuses. """ from tests.mock_responses import JsonResponse, EmptyResponse # Try different value types payload = {'key': 'value', 'sub': {'some': 13, 'values': datetime.datetime.now()}} tries = 0 received_payload = None received_url = None received_loop = None async def push_callback(url, *, json, loop): nonlocal tries nonlocal received_url nonlocal received_payload nonlocal received_loop tries += 1 if tries < 3: raise requests.ConnectionError() if tries == 3: return JsonResponse({}, status_code=500) # Shut down after handling this push. self.shutdown_future.cancel() # Remember what we received. Calling self.assertEqual() here doesn't stop the unittest, # since the work loop is designed to keep running, even when exceptions are thrown. received_url = url received_payload = copy.deepcopy(json) received_loop = loop return EmptyResponse() self.manager.post.side_effect = push_callback self.tuqueue.queue('/push/here', payload) # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling # the actual payload. self.asyncio_loop.run_until_complete( asyncio.wait_for( self.tuqueue.work(loop=self.asyncio_loop), timeout=2 ) ) # Check the payload. self.assertEqual(received_url, '/push/here') self.assertEqual(received_payload, payload) self.assertEqual(received_loop, self.asyncio_loop) def test_queue_persistence(self): """Check that updates are pushed, even when the process is stopped & restarted.""" from tests.mock_responses import EmptyResponse from flamenco_worker.upstream_update_queue import TaskUpdateQueue # Try different value types payload = {'key': 'value', 'sub': {'some': 13, 'values': datetime.datetime.now()}} self.tuqueue.queue('/push/there', payload) self.manager.post.assert_not_called() self.tuqueue._disconnect_db() # Create a new tuqueue to handle the push, using the same database. # Note that we don't have to stop self.tuqueue because we never ran self.tuqueue.work(). new_tuqueue = TaskUpdateQueue( db_fname=self.tuqueue.db_fname, manager=self.manager, shutdown_future=self.shutdown_future, backoff_time=5, # no retry in this test, so any retry should cause a timeout. ) received_payload = None received_url = None received_loop = None async def push_callback(url, *, json, loop): nonlocal received_url nonlocal received_payload nonlocal received_loop # Shut down after handling this push. self.shutdown_future.cancel() received_url = url received_payload = copy.deepcopy(json) received_loop = loop return EmptyResponse() self.manager.post.side_effect = push_callback # This should pick up on the pushed data. self.asyncio_loop.run_until_complete( asyncio.wait_for( new_tuqueue.work(loop=self.asyncio_loop), timeout=2 ) ) # Check the payload self.assertEqual(received_url, '/push/there') self.assertEqual(received_payload, payload) self.assertEqual(received_loop, self.asyncio_loop) def test_conflict(self): """A 409 Conflict response should discard a queued task update. """ from tests.mock_responses import TextResponse # Try different value types payload = {'key': 'value', 'sub': {'some': 13, 'values': datetime.datetime.now()}} tries = 0 async def push_callback(url, *, json, loop): nonlocal tries tries += 1 self.shutdown_future.cancel() return TextResponse("no", status_code=409) self.manager.post.side_effect = push_callback self.tuqueue.queue('/push/here', payload) # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling # the actual payload. self.asyncio_loop.run_until_complete( asyncio.wait_for( self.tuqueue.work(loop=self.asyncio_loop), timeout=2 ) ) # There should only be one attempt at delivering this payload. self.assertEqual(1, tries) self.assertEqual([], list(self.tuqueue._queue())) def test_task_gone(self): """A 404 Not Found response should discard a queued task update. This can happen in a race condition, when a task is deleted/archived while there are still updates in the local queue. """ from tests.mock_responses import TextResponse # Try different value types payload = {'key': 'value', 'sub': {'some': 13, 'values': datetime.datetime.now()}} tries = 0 async def push_callback(url, *, json, loop): nonlocal tries tries += 1 self.shutdown_future.cancel() return TextResponse("no", status_code=404) self.manager.post.side_effect = push_callback self.tuqueue.queue('/push/here', payload) # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling # the actual payload. self.asyncio_loop.run_until_complete( asyncio.wait_for( self.tuqueue.work(loop=self.asyncio_loop), timeout=2 ) ) # There should only be one attempt at delivering this payload. self.assertEqual(1, tries) self.assertEqual([], list(self.tuqueue._queue()))