This repository has been archived on 2023-02-07. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
flamenco-worker/tests/test_commands_subprocess.py
Sybren A. Stüvel 9cb509b10e Panic when older subprocess is detected running
Panic when an older subprocess is already running when trying to start a
new subprocess. This situation was already detected, but instead of
failing the current task (and keeping the Worker running), the Worker
now shuts down completely. When running as a systemd service, the Worker
will automatically be restarted.

When this panic occurs, the Worker still signs off at the Manager,
causing its current task to be returned to the queue. That way another
Worker (or this one, after the restart) can retry it.

This is basically a hack to work around the issue that sometimes Blender
is running twice on the same machine.
2021-07-09 14:26:50 +02:00

160 lines
5.3 KiB
Python

import asyncio
import os
from pathlib import Path
import random
import shlex
import sys
import tempfile
import time
import psutil
from tests.test_runner import AbstractCommandTest
class PIDFileTest(AbstractCommandTest):
    """Tests for the subprocess PID-file checks performed by ExecCommand.

    Each test points ``self.cmd.worker.trunner.subprocess_pid_file`` at a
    path inside a temporary directory and checks how
    ``ExecCommand.validate()`` (and, in the race test,
    ``ExecCommand.execute()``) reacts to the state of that file.
    """

    def setUp(self):
        super().setUp()

        from flamenco_worker.commands import ExecCommand

        self.cmd = ExecCommand(
            worker=self.fworker,
            task_id="12345",
            command_idx=0,
        )

    def _assert_panic_error(self, command_settings) -> str:
        """Assert that validating *command_settings* raises PanicError.

        :param command_settings: settings dict passed to ``self.cmd.validate()``.
        :returns: the string form of the raised PanicError, so callers can
            inspect its message.
        """
        from flamenco_worker.worker import PanicError

        with self.assertRaises(PanicError) as ctx:
            self.cmd.validate(command_settings)
        return str(ctx.exception)

    def _assert_alive_pidfile(self, contents_template: str) -> None:
        """Assert that a PID file naming a running process causes a panic.

        :param contents_template: text written into the PID file; ``{pid}``
            is replaced by this test process's own PID, which is guaranteed
            to be alive while the test runs.
        """
        my_pid = os.getpid()
        with tempfile.TemporaryDirectory(suffix=".pid") as tmpname:
            pidfile = Path(tmpname) / "pidfile.pid"
            pidfile.write_text(contents_template.format(pid=my_pid))
            self.cmd.worker.trunner.subprocess_pid_file = pidfile

            msg = self._assert_panic_error({"cmd": "echo"})
            # The panic message should mention both the PID file and the
            # process it refers to.
            self.assertIn(str(pidfile), msg)
            self.assertIn(str(psutil.Process(my_pid)), msg)

    def test_alive(self):
        self._assert_alive_pidfile("{pid}")

    def test_alive_newlines(self):
        # Whitespace around the PID must not hide the still-running process.
        self._assert_alive_pidfile("\n{pid}\n")

    def test_dead(self):
        # Find a PID that doesn't exist.
        for _ in range(1000):
            pid = random.randint(1, 2 ** 16)
            try:
                psutil.Process(pid)
            except psutil.NoSuchProcess:
                break
        else:
            self.fail("Unable to find unused PID")

        with tempfile.TemporaryDirectory(suffix=".pid") as tmpname:
            pidfile = Path(tmpname) / "stale.pid"
            pidfile.write_text(str(pid))
            self.cmd.worker.trunner.subprocess_pid_file = pidfile

            msg = self.cmd.validate({"cmd": "echo"})
            self.assertFalse(msg)
            # Checked inside the 'with' so the temporary directory still exists.
            self.assertFalse(
                pidfile.exists(), "Stale PID file should have been deleted"
            )

    def test_nonexistant(self):
        with tempfile.TemporaryDirectory(suffix=".pid") as tmpname:
            pidfile = Path(tmpname) / "nonexistant.pid"
            self.cmd.worker.trunner.subprocess_pid_file = pidfile

            msg = self.cmd.validate({"cmd": "echo"})
            self.assertFalse(msg)

    def test_empty(self):
        with tempfile.TemporaryDirectory(suffix=".pid") as tmpname:
            pidfile = Path(tmpname) / "empty.pid"
            pidfile.write_bytes(b"")
            self.cmd.worker.trunner.subprocess_pid_file = pidfile

            msg = self.cmd.validate({"cmd": "echo"})
            self.assertTrue(msg, "Empty PID file should be treated as 'alive'")
            self.assertTrue(
                pidfile.exists(), "Empty PID file should not have been deleted"
            )

    def test_not_configured(self):
        self.cmd.worker.trunner.subprocess_pid_file = None

        msg = self.cmd.validate({"cmd": "echo"})
        self.assertFalse(msg)

    def test_race_open_exclusive(self):
        # When there is a race condition such that the exclusive open() of the
        # subprocess PID file fails, the new subprocess should be killed.

        # Use shlex to quote strings like this, so we're sure it's done well.
        args = [sys.executable, "-c", "import time; time.sleep(1)"]
        cmd = " ".join(shlex.quote(s) for s in args)

        with tempfile.TemporaryDirectory() as tmpname:
            pidfile = Path(tmpname) / "race.pid"
            my_pid = os.getpid()

            # Set up the race condition: at validation time the PID file doesn't exist yet,
            # but at execute time it does.
            self.cmd.worker.trunner.subprocess_pid_file = pidfile
            msg = self.cmd.validate({"cmd": cmd})
            self.assertIsNone(msg)

            # Mock an already-running process by writing our own PID.
            pidfile.write_text(str(my_pid))

            # Use a monotonic clock: time.time() can jump when the system
            # clock is adjusted, which would make the duration check flaky.
            start_time = time.monotonic()
            with self.assertRaises(FileExistsError):
                self.loop.run_until_complete(
                    asyncio.wait_for(
                        self.cmd.execute({"cmd": cmd}),
                        1.3,  # no more than 300 ms longer than the actual sleep
                    )
                )
            duration = time.monotonic() - start_time

        # This shouldn't take anywhere near the entire sleep time, as that would
        # mean the command was executed while there was already another one running.
        self.assertLess(
            duration,
            0.8,
            "Checking the PID file and killing the process should be fast",
        )

        # The subprocess that lost the race must no longer exist. If it does,
        # self.fail() raises AssertionError, which assertRaises does not catch.
        pid = self.cmd.proc.pid
        with self.assertRaises(psutil.NoSuchProcess):
            process = psutil.Process(pid)
            self.fail(f"Process {process} is still running")