This repository has been archived on 2023-02-07. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Sybren A. Stüvel 05408dc8c1 Cleanup: reformat with Black
Reformat the entire project with Black.

No functional changes.
2021-07-09 14:26:13 +02:00

368 lines
11 KiB
Python

"""Commandline interface entry points."""
import argparse
import asyncio
import logging
import logging.config
import os
import pathlib
import platform
import typing
import requests
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
"--config",
type=pathlib.Path,
help="Load this configuration file instead of the default files.",
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Show configuration before starting, "
"and asyncio task status at shutdown.",
)
parser.add_argument(
"-V",
"--version",
action="store_true",
help="Show the version of Flamenco Worker and stops.",
)
parser.add_argument(
"-r",
"--reregister",
action="store_true",
help="Erases authentication information and re-registers this worker "
"at the Manager. WARNING: this can cause duplicate worker information "
"in the Manager's database.",
)
parser.add_argument(
"-d",
"--debug",
action="store_true",
help="Enables debug logging for Flamenco Worker's own log entries. "
"Edit the logging config in flamenco-worker.cfg "
"for more powerful options.",
)
parser.add_argument(
"-t",
"--test",
action="store_true",
help="Starts up in testing mode, in which only a handful of "
"test-specific task types are accepted. This overrides the task_types "
"in the configuration file.",
)
parser.add_argument(
"-1", "--single", action="store_true", help="Runs a single tasks, then exits."
)
args = parser.parse_args()
if args.version:
from . import __version__
print(__version__)
raise SystemExit()
# Load configuration
from . import config
confparser = config.load_config(args.config, args.verbose, args.test)
config.configure_logging(confparser, enable_debug=args.debug)
log = logging.getLogger(__name__)
log.debug("Starting, pid=%d", os.getpid())
log_startup()
if args.test:
log.warning(
"Test mode enabled, overriding task_types=%r",
confparser.value("task_types"),
)
if args.reregister:
log.warning(
"Erasing worker_id and worker_secret so we can attempt re-registration."
)
confparser.erase("worker_id")
confparser.erase("worker_secret")
if args.single:
log.info("Running in single-task mode, will stop after performing one task.")
# Find the Manager using UPnP/SSDP if we have no manager_url.
if not confparser.value("manager_url"):
from . import ssdp_discover
try:
manager_url = ssdp_discover.find_flamenco_manager()
except ssdp_discover.DiscoveryFailed:
log.fatal("Unable to find Flamenco Manager via UPnP/SSDP.")
raise SystemExit(1)
log.info("Found Flamenco Manager at %s", manager_url)
confparser.setvalue("manager_url", manager_url)
# Patch AsyncIO
from . import patch_asyncio
patch_asyncio.patch_asyncio()
# Construct the AsyncIO loop
loop = construct_asyncio_loop()
if args.verbose:
log.debug("Enabling AsyncIO debugging")
loop.set_debug(True)
shutdown_future = loop.create_future()
# Piece all the components together.
from . import (
runner,
worker,
upstream,
upstream_update_queue,
may_i_run,
__version__,
)
fmanager = upstream.FlamencoManager(
manager_url=confparser.value("manager_url"),
flamenco_worker_version=__version__,
)
tuqueue = upstream_update_queue.TaskUpdateQueue(
db_fname=confparser.value("task_update_queue_db"),
manager=fmanager,
shutdown_future=shutdown_future,
)
trunner = runner.TaskRunner(
shutdown_future=shutdown_future,
subprocess_pid_file=confparser.value("subprocess_pid_file"),
)
pretask_check_params = parse_pretask_check_config(confparser, log)
fworker = worker.FlamencoWorker(
manager=fmanager,
trunner=trunner,
tuqueue=tuqueue,
task_types=confparser.value("task_types").split(),
worker_id=confparser.value("worker_id"),
worker_secret=confparser.value("worker_secret"),
worker_registration_secret=confparser.value("worker_registration_secret"),
loop=loop,
shutdown_future=shutdown_future,
push_log_max_interval=confparser.interval_secs("push_log_max_interval_seconds"),
push_log_max_entries=confparser.value("push_log_max_entries", int),
push_act_max_interval=confparser.interval_secs("push_act_max_interval_seconds"),
initial_state="testing" if args.test else "awake",
run_single_task=args.single,
pretask_check_params=pretask_check_params,
)
mir = may_i_run.MayIRun(
manager=fmanager,
worker=fworker,
poll_interval=confparser.interval_secs("may_i_run_interval_seconds"),
loop=loop,
)
def shutdown(signum, stackframe):
"""Perform a clean shutdown."""
# Raise an exception, so that the exception is bubbled upwards, until
# the asyncio loop stops executing the current task. Only then can we
# run things like loop.run_until_complete(mir_work_task).
log.warning("Shutting down due to signal %i", signum)
raise KeyboardInterrupt()
def sleep(signum, stackframe):
log.warning("Going asleep due to signal %i", signum)
fworker.go_to_state_asleep()
def wakeup(signum, stackframe):
log.warning("Waking up due to signal %i", signum)
fworker.go_to_state_awake()
# Shut down cleanly upon TERM signal
import signal
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
if hasattr(signal, "SIGUSR1"):
# Windows doesn't have USR1/2 signals.
signal.signal(signal.SIGUSR1, sleep)
signal.signal(signal.SIGUSR2, wakeup)
if hasattr(signal, "SIGPOLL"):
# Not sure how cross-platform SIGPOLL is.
signal.signal(signal.SIGPOLL, asyncio_report_tasks)
# Start asynchronous tasks.
asyncio.ensure_future(tuqueue.work(loop=loop))
mir_work_task = asyncio.ensure_future(mir.work())
def do_clean_shutdown():
shutdown_future.cancel()
mir_work_task.cancel()
try:
loop.run_until_complete(asyncio.wait_for(mir_work_task, 5))
except requests.exceptions.ConnectionError:
log.warning(
"Unable to connect to HTTP server, but that's fine as we're shutting down."
)
except asyncio.TimeoutError:
log.debug(
"Timeout waiting for may-I-run task, "
"but that's fine as we're shutting down."
)
except KeyboardInterrupt:
log.info(
"Keyboard interrupt while shutting down, ignoring as we are shutting down."
)
fworker.shutdown()
async def stop_loop():
log.info("Waiting to give tasks the time to stop gracefully")
await asyncio.sleep(1)
loop.stop()
loop.run_until_complete(stop_loop())
try:
loop.run_until_complete(fworker.startup())
fworker.mainloop()
except worker.UnableToRegisterError:
# The worker will have logged something, we'll just shut down cleanly.
pass
except KeyboardInterrupt:
do_clean_shutdown()
except:
log.exception("Uncaught exception!")
else:
do_clean_shutdown()
# Report on the asyncio task status
if args.verbose:
asyncio_report_tasks()
log.warning("Closing asyncio loop")
loop.close()
log.warning("Flamenco Worker is shut down")
def parse_pretask_check_config(confparser, log):
"""Parse the [pre_task_check] config section.
:rtype: flamenco.worker.PreTaskCheckParams
"""
from . import worker
check_read = []
check_write = []
for name, value in confparser.items(section="pre_task_check"):
if name.startswith("write"):
check_write.append(pathlib.Path(value))
elif name.startswith("read"):
check_read.append(pathlib.Path(value))
else:
log.fatal(
'Config section "pre_task_check" should only have keys starting with '
'"read" or "write"; found %r',
value,
)
raise SystemExit(47)
pretask_check_params = worker.PreTaskCheckParams(
pre_task_check_write=tuple(check_write),
pre_task_check_read=tuple(check_read),
)
return pretask_check_params
def asyncio_report_tasks(signum=0, stackframe=None):
"""Runs the garbage collector, then reports all AsyncIO tasks on the log.
Can be used as signal handler.
"""
log = logging.getLogger("%s.asyncio_report_tasks" % __name__)
log.info("Logging all asyncio tasks.")
all_tasks = asyncio.Task.all_tasks()
count_done = sum(task.done() for task in all_tasks)
if not len(all_tasks):
log.info("No scheduled tasks")
elif len(all_tasks) == count_done:
log.info("All %i tasks are done.", len(all_tasks))
else:
log.info("%i tasks, of which %i are done.", len(all_tasks), count_done)
import gc
import traceback
# Clean up circular references between tasks.
gc.collect()
for task_idx, task in enumerate(all_tasks):
if not task.done():
log.info(" task #%i: %s", task_idx, task)
continue
# noinspection PyBroadException
try:
res = task.result()
log.info(" task #%i: %s result=%r", task_idx, task, res)
except asyncio.CancelledError:
# No problem, we want to stop anyway.
log.info(" task #%i: %s cancelled", task_idx, task)
except Exception:
log.info("%s: resulted in exception: %s", task, traceback.format_exc())
# for ref in gc.get_referrers(task):
# log.info(' - referred by %s', ref)
log.info("Done logging.")
def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
# On Windows, the default event loop is SelectorEventLoop which does
# not support subprocesses. ProactorEventLoop should be used instead.
# Source: https://docs.python.org/3.5/library/asyncio-subprocess.html
if platform.system() == "Windows":
# Silly MyPy doesn't understand this only runs on Windows.
if not isinstance(loop, asyncio.ProactorEventLoop): # type: ignore
loop = asyncio.ProactorEventLoop() # type: ignore
asyncio.set_event_loop(loop)
return loop
def log_startup():
"""Log the version of Flamenco Worker."""
from . import __version__
log = logging.getLogger(__name__)
old_level = log.level
try:
log.setLevel(logging.INFO)
log.info("Starting Flamenco Worker %s", __version__)
finally:
log.setLevel(old_level)
if __name__ == "__main__":
main()