281 lines
9.4 KiB
Python
281 lines
9.4 KiB
Python
"""Commandline interface entry points."""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import logging.config
|
|
import os
|
|
import pathlib
|
|
|
|
import requests
|
|
|
|
|
|
def main():
    """Run Flamenco Worker from the command line.

    Parses the command-line arguments, loads the configuration and sets up
    logging, looks up the Manager via UPnP/SSDP when no ``manager_url`` is
    configured, constructs the manager/queue/runner/worker/may-I-run
    components around a single asyncio loop, installs the signal handlers,
    and then runs the worker until it finishes or is interrupted.

    :raises SystemExit: after printing the version when ``--version`` is
        given, or with exit code 1 when the Manager cannot be discovered.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-c', '--config', type=pathlib.Path,
                            help='Load this configuration file instead of the default files.')

    # Every other option is a plain on/off switch. They are registered in
    # this order, which is also the order argparse lists them in.
    switches = (
        ('-v', '--verbose',
         'Show configuration before starting, '
         'and asyncio task status at shutdown.'),
        ('-V', '--version',
         'Show the version of Flamenco Worker and stops.'),
        ('-r', '--reregister',
         "Erases authentication information and re-registers this worker "
         "at the Manager. WARNING: this can cause duplicate worker information "
         "in the Manager's database."),
        ('-d', '--debug',
         "Enables debug logging for Flamenco Worker's own log entries. "
         "Edit the logging config in flamenco-worker.cfg "
         "for more powerful options."),
        ('-t', '--test',
         "Starts up in testing mode, in which only a handful of "
         "test-specific task types are accepted. This overrides the task_types "
         "in the configuration file."),
        ('-1', '--single',
         "Runs a single tasks, then exits."),
    )
    for short_opt, long_opt, description in switches:
        arg_parser.add_argument(short_opt, long_opt, action='store_true', help=description)
    args = arg_parser.parse_args()

    if args.version:
        from . import __version__
        print(__version__)
        raise SystemExit()

    # Configuration and logging come first, so everything below can log.
    from . import config
    settings = config.load_config(args.config, args.verbose, args.test)
    config.configure_logging(settings, enable_debug=args.debug)

    log = logging.getLogger(__name__)
    log.debug('Starting, pid=%d', os.getpid())

    log_startup()

    if args.test:
        log.warning('Test mode enabled, overriding task_types=%r',
                    settings.value('task_types'))

    if args.reregister:
        log.warning('Erasing worker_id and worker_secret so we can attempt re-registration.')
        for credential_key in ('worker_id', 'worker_secret'):
            settings.erase(credential_key)

    if args.single:
        log.info('Running in single-task mode, will stop after performing one task.')

    # Without a configured manager_url, fall back to UPnP/SSDP discovery.
    if not settings.value('manager_url'):
        from . import ssdp_discover

        try:
            discovered_url = ssdp_discover.find_flamenco_manager()
        except ssdp_discover.DiscoveryFailed:
            log.fatal('Unable to find Flamenco Manager via UPnP/SSDP.')
            raise SystemExit(1)
        else:
            log.info('Found Flamenco Manager at %s', discovered_url)
            settings.setvalue('manager_url', discovered_url)

    # Patch AsyncIO before the loop is constructed.
    from . import patch_asyncio
    patch_asyncio.patch_asyncio()

    loop = construct_asyncio_loop()
    if args.verbose:
        log.debug('Enabling AsyncIO debugging')
        loop.set_debug(True)
    shutdown_future = loop.create_future()

    # Piece all the components together.
    from . import runner, worker, upstream, upstream_update_queue, may_i_run, __version__

    manager = upstream.FlamencoManager(
        manager_url=settings.value('manager_url'),
        flamenco_worker_version=__version__,
    )
    update_queue = upstream_update_queue.TaskUpdateQueue(
        db_fname=settings.value('task_update_queue_db'),
        manager=manager,
        shutdown_future=shutdown_future,
    )
    task_runner = runner.TaskRunner(shutdown_future=shutdown_future)

    if args.test:
        initial_state = 'testing'
    else:
        initial_state = 'awake'

    flamenco_worker = worker.FlamencoWorker(
        manager=manager,
        trunner=task_runner,
        tuqueue=update_queue,
        task_types=settings.value('task_types').split(),
        worker_id=settings.value('worker_id'),
        worker_secret=settings.value('worker_secret'),
        loop=loop,
        shutdown_future=shutdown_future,
        push_log_max_interval=settings.interval_secs('push_log_max_interval_seconds'),
        push_log_max_entries=settings.value('push_log_max_entries', int),
        push_act_max_interval=settings.interval_secs('push_act_max_interval_seconds'),
        initial_state=initial_state,
        run_single_task=args.single,
    )

    poller = may_i_run.MayIRun(
        manager=manager,
        worker=flamenco_worker,
        poll_interval=settings.interval_secs('may_i_run_interval_seconds'),
        loop=loop,
    )

    def on_shutdown_signal(signum, stackframe):
        """Perform a clean shutdown."""

        # Raising makes the exception bubble upwards until the asyncio loop
        # stops executing the current task. Only then can we run things like
        # loop.run_until_complete(mir_work_task).
        log.warning('Shutting down due to signal %i', signum)
        raise KeyboardInterrupt()

    def on_sleep_signal(signum, stackframe):
        """Put the worker into its asleep state."""
        log.warning('Going asleep due to signal %i', signum)
        flamenco_worker.go_to_state_asleep()

    def on_wakeup_signal(signum, stackframe):
        """Put the worker into its awake state."""
        log.warning('Waking up due to signal %i', signum)
        flamenco_worker.go_to_state_awake()

    # Shut down cleanly upon TERM and INT signals.
    import signal
    for terminating_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(terminating_signal, on_shutdown_signal)

    if hasattr(signal, 'SIGUSR1'):
        # Windows doesn't have USR1/2 signals.
        signal.signal(signal.SIGUSR1, on_sleep_signal)
        signal.signal(signal.SIGUSR2, on_wakeup_signal)

    if hasattr(signal, 'SIGPOLL'):
        # Not sure how cross-platform SIGPOLL is.
        signal.signal(signal.SIGPOLL, asyncio_report_tasks)

    # Start asynchronous tasks.
    asyncio.ensure_future(update_queue.work(loop=loop))
    mir_work_task = asyncio.ensure_future(poller.work())

    async def stop_loop():
        """Wait a second, then stop the loop."""
        log.info('Waiting to give tasks the time to stop gracefully')
        await asyncio.sleep(1)
        loop.stop()

    def do_clean_shutdown():
        """Cancel the background work, shut the worker down, stop the loop."""
        shutdown_future.cancel()
        mir_work_task.cancel()
        try:
            loop.run_until_complete(asyncio.wait_for(mir_work_task, 5))
        except requests.exceptions.ConnectionError:
            log.warning("Unable to connect to HTTP server, but that's fine as we're shutting down.")

        flamenco_worker.shutdown()
        loop.run_until_complete(stop_loop())

    try:
        loop.run_until_complete(flamenco_worker.startup())
        flamenco_worker.mainloop()
    except worker.UnableToRegisterError:
        # The worker will have logged something, we'll just shut down cleanly.
        pass
    except KeyboardInterrupt:
        do_clean_shutdown()
    except BaseException:  # noqa: B036 - catch-all at the top-level boundary
        # Deliberately as broad as a bare ``except``: log whatever happened
        # and still fall through to closing the loop below.
        log.exception('Uncaught exception!')
    else:
        do_clean_shutdown()

    # Report on the asyncio task status
    if args.verbose:
        asyncio_report_tasks()

    log.warning('Closing asyncio loop')
    loop.close()
    log.warning('Flamenco Worker is shut down')
|
|
|
|
|
|
def asyncio_report_tasks(signum=0, stackframe=None):
    """Run the garbage collector, then report all AsyncIO tasks on the log.

    Can be used as signal handler; ``signum`` and ``stackframe`` are accepted
    for that purpose only and are not used.

    ``asyncio.Task.all_tasks()`` was removed in Python 3.9. When it is still
    available it is used, so finished tasks are reported as well. Otherwise
    ``asyncio.all_tasks()`` is called with the current event loop, which only
    returns tasks that are not yet done. When there is no current event loop
    at all, the report simply states that there are no scheduled tasks.
    """
    import gc
    import traceback

    log = logging.getLogger('%s.asyncio_report_tasks' % __name__)
    log.info('Logging all asyncio tasks.')

    legacy_all_tasks = getattr(asyncio.Task, 'all_tasks', None)
    if legacy_all_tasks is not None:
        all_tasks = legacy_all_tasks()
    else:
        # asyncio.all_tasks() without an explicit loop requires a *running*
        # loop; this function is also called after the loop has stopped, so
        # the loop is looked up and passed explicitly.
        try:
            current_loop = asyncio.get_event_loop()
        except RuntimeError:
            all_tasks = set()
        else:
            all_tasks = asyncio.all_tasks(current_loop)

    count_done = sum(task.done() for task in all_tasks)

    if not all_tasks:
        log.info('No scheduled tasks')
    elif len(all_tasks) == count_done:
        log.info('All %i tasks are done.', len(all_tasks))
    else:
        log.info('%i tasks, of which %i are done.', len(all_tasks), count_done)

    # Clean up circular references between tasks.
    gc.collect()

    for task_idx, task in enumerate(all_tasks):
        if not task.done():
            log.info('   task #%i: %s', task_idx, task)
            continue

        # noinspection PyBroadException
        try:
            res = task.result()
            log.info('   task #%i: %s result=%r', task_idx, task, res)
        except asyncio.CancelledError:
            # No problem, we want to stop anyway.
            log.info('   task #%i: %s cancelled', task_idx, task)
        except Exception:
            log.info('%s: resulted in exception: %s', task, traceback.format_exc())

    log.info('Done logging.')
|
|
|
|
|
|
def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Return a usable event loop and install it as the current one.

    On Windows, the default event loop is SelectorEventLoop which does
    not support subprocesses. ProactorEventLoop should be used instead.
    Source: https://docs.python.org/3.5/library/asyncio-subprocess.html

    On other platforms the current event loop is reused when there is one
    and it is still open. A new loop is created when the current loop is
    closed, or when there is no current loop at all.

    :return: the loop that was set as the current event loop.
    """
    import sys

    if sys.platform == 'win32':
        loop = asyncio.ProactorEventLoop()
    else:
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # There is no current event loop: it was explicitly unset (for
            # example by asyncio.run()), or this Python version no longer
            # creates one implicitly.
            loop = asyncio.new_event_loop()
        else:
            if loop.is_closed():
                loop = asyncio.new_event_loop()

    asyncio.set_event_loop(loop)
    return loop
|
|
|
|
|
|
def log_startup():
    """Announce the Flamenco Worker version on the module's logger.

    The logger's own level is switched to INFO for the duration of the
    message and put back to its previous value afterwards, also when
    logging raises.
    """
    from . import __version__

    logger = logging.getLogger(__name__)
    previous_level = logger.level
    logger.setLevel(logging.INFO)
    try:
        logger.info('Starting Flamenco Worker %s', __version__)
    finally:
        logger.setLevel(previous_level)
|
|
|
|
|
|
# Only start the worker when this module is executed as a script, not when
# it is imported.
if __name__ == '__main__':
    main()
|