167 lines
5.7 KiB
Python
167 lines
5.7 KiB
Python
import asyncio
|
|
from unittest.mock import Mock, call
|
|
|
|
from abstract_worker_test import AbstractWorkerTest
|
|
|
|
|
|
class AbstractCommandTest(AbstractWorkerTest):
|
|
def setUp(self):
|
|
from mock_responses import CoroMock
|
|
from flamenco_worker.worker import FlamencoWorker
|
|
from flamenco_worker.cli import construct_asyncio_loop
|
|
|
|
self.loop = construct_asyncio_loop()
|
|
self.fworker = Mock(spec=FlamencoWorker)
|
|
self.fworker.register_log = CoroMock()
|
|
self.fworker.register_task_update = CoroMock()
|
|
|
|
def tearDown(self):
|
|
# This is required for subprocesses, otherwise unregistering signal handlers goes wrong.
|
|
self.loop.close()
|
|
|
|
|
|
class SleepCommandTest(AbstractCommandTest):
|
|
def test_sleep(self):
|
|
import time
|
|
from flamenco_worker.commands import SleepCommand
|
|
|
|
cmd = SleepCommand(
|
|
worker=self.fworker,
|
|
task_id='12345',
|
|
command_idx=0,
|
|
)
|
|
|
|
time_before = time.time()
|
|
ok = self.loop.run_until_complete(asyncio.wait_for(
|
|
cmd.run({'time_in_seconds': 0.5}),
|
|
0.6 # the 'sleep' should be over in not more than 0.1 seconds extra
|
|
))
|
|
duration = time.time() - time_before
|
|
self.assertGreaterEqual(duration, 0.5)
|
|
self.assertTrue(ok)
|
|
|
|
|
|
class ExecCommandTest(AbstractCommandTest):
|
|
def construct(self):
|
|
from flamenco_worker.commands import ExecCommand
|
|
cmd = ExecCommand(
|
|
worker=self.fworker,
|
|
task_id='12345',
|
|
command_idx=0,
|
|
)
|
|
return cmd
|
|
|
|
def test_bad_settings(self):
|
|
cmd = self.construct()
|
|
|
|
settings = {'cmd': [1, 2, 3]}
|
|
|
|
ok = self.loop.run_until_complete(asyncio.wait_for(
|
|
cmd.run(settings),
|
|
0.6
|
|
))
|
|
|
|
self.assertFalse(ok)
|
|
self.fworker.register_task_update.assert_called_once_with(
|
|
task_status='failed',
|
|
activity='exec.(task_id=12345, command_idx=0): Invalid settings: "cmd" must be a string'
|
|
)
|
|
|
|
def test_exec_python(self):
|
|
import shlex
|
|
import sys
|
|
cmd = self.construct()
|
|
|
|
# Use shlex to quote strings like this, so we're sure it's done well.
|
|
args = [sys.executable, '-c', r'print("hello, this is two lines\nYes, really.")']
|
|
settings = {
|
|
'cmd': ' '.join(shlex.quote(s) for s in args)
|
|
}
|
|
|
|
ok = self.loop.run_until_complete(asyncio.wait_for(
|
|
cmd.run(settings),
|
|
0.6
|
|
))
|
|
self.assertTrue(ok)
|
|
|
|
# Check that both lines have been reported.
|
|
self.fworker.register_log.assert_has_calls([
|
|
call('exec: Starting'),
|
|
call('Executing %s',
|
|
'%s -c \'print("hello, this is two lines\\nYes, really.")\'' % sys.executable),
|
|
call('> hello, this is two lines'), # note the logged line doesn't end in a newline
|
|
call('> Yes, really.'), # note the logged line doesn't end in a newline
|
|
call('exec: Finished'),
|
|
])
|
|
|
|
self.fworker.register_task_update.assert_called_with(
|
|
activity='finished exec',
|
|
command_progress_percentage=100,
|
|
current_command_idx=0
|
|
)
|
|
|
|
def test_exec_invalid_utf(self):
|
|
import shlex
|
|
import sys
|
|
cmd = self.construct()
|
|
|
|
# Use shlex to quote strings like this, so we're sure it's done well.
|
|
# Writes an invalid sequence of continuation bytes.
|
|
args = [sys.executable, '-c', r'import sys; sys.stdout.buffer.write(b"\x80\x80\x80")']
|
|
settings = {
|
|
'cmd': ' '.join(shlex.quote(s) for s in args)
|
|
}
|
|
|
|
ok = self.loop.run_until_complete(asyncio.wait_for(
|
|
cmd.run(settings),
|
|
0.6
|
|
))
|
|
self.assertFalse(ok)
|
|
|
|
# Check that the error has been reported.
|
|
decode_err = "exec.(task_id=12345, command_idx=0): Error executing: Command produced " \
|
|
"non-UTF8 output, aborting: 'utf-8' codec can't decode byte 0x80 in " \
|
|
"position 0: invalid start byte"
|
|
self.fworker.register_log.assert_has_calls([
|
|
call('exec: Starting'),
|
|
call('Executing %s',
|
|
'%s -c \'import sys; sys.stdout.buffer.write(b"\\x80\\x80\\x80")\'' % sys.executable),
|
|
call(decode_err),
|
|
])
|
|
|
|
# The update should NOT contain a new task status -- that is left to the Worker.
|
|
self.fworker.register_task_update.assert_called_with(activity=decode_err)
|
|
|
|
def test_exec_python_fails(self):
|
|
import shlex
|
|
import sys
|
|
cmd = self.construct()
|
|
|
|
# Use shlex to quote strings like this, so we're sure it's done well.
|
|
args = [sys.executable, '-c', r'raise SystemExit("¡FAIL!")']
|
|
settings = {
|
|
'cmd': ' '.join(shlex.quote(s) for s in args)
|
|
}
|
|
|
|
ok = self.loop.run_until_complete(asyncio.wait_for(
|
|
cmd.run(settings),
|
|
0.6
|
|
))
|
|
self.assertFalse(ok)
|
|
|
|
# Check that the execution error has been reported.
|
|
self.fworker.register_log.assert_has_calls([
|
|
call('exec: Starting'),
|
|
call('Executing %s',
|
|
'%s -c \'raise SystemExit("¡FAIL!")\'' % sys.executable),
|
|
call('> ¡FAIL!'), # note the logged line doesn't end in a newline
|
|
call('exec.(task_id=12345, command_idx=0): Error executing: '
|
|
'Command failed with status 1')
|
|
])
|
|
|
|
# The update should NOT contain a new task status -- that is left to the Worker.
|
|
self.fworker.register_task_update.assert_called_with(
|
|
activity='exec.(task_id=12345, command_idx=0): Error executing: '
|
|
'Command failed with status 1',
|
|
)
|