From d6506b640243cfd20d6fe3e58a8c40c141e32ace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 6 Jun 2017 15:05:18 +0200 Subject: [PATCH] Moved Celery CLI commands to 'manage.py celery' submodule + added extensions: Added a 'celery queue' command, which is supposed to show queued Celery tasks (but doesn't work quite as I'd expect). Added a 'celery purge' command, which purges queued Celery tasks. --- README.md | 4 ++- pillar/cli/__init__.py | 2 ++ pillar/cli/celery.py | 60 ++++++++++++++++++++++++++++++++++++++++ pillar/cli/operations.py | 20 -------------- 4 files changed, 65 insertions(+), 21 deletions(-) create mode 100644 pillar/cli/celery.py diff --git a/README.md b/README.md index af550103..bdc8a500 100644 --- a/README.md +++ b/README.md @@ -61,4 +61,6 @@ Pillar requires [Celery](http://www.celeryproject.org/) for background task proc turn requires a backend and a broker, for which the default Pillar configuration uses Redis and RabbitMQ. -You can run the Celery Worker using `manage.py operations worker`. +You can run the Celery Worker using `manage.py celery worker`. + +Find other Celery operations with the `manage.py celery` command. diff --git a/pillar/cli/__init__.py b/pillar/cli/__init__.py index fb243181..dcfc07a7 100644 --- a/pillar/cli/__init__.py +++ b/pillar/cli/__init__.py @@ -8,6 +8,7 @@ import logging from flask_script import Manager from pillar import current_app +from pillar.cli.celery import manager_celery from pillar.cli.maintenance import manager_maintenance from pillar.cli.operations import manager_operations from pillar.cli.setup import manager_setup @@ -15,6 +16,7 @@ from pillar.cli.setup import manager_setup log = logging.getLogger(__name__) manager = Manager(current_app) +manager.add_command('celery', manager_celery) manager.add_command("maintenance", manager_maintenance) manager.add_command("setup", manager_setup) manager.add_command("operations", manager_operations) diff --git a/pillar/cli/celery.py b/pillar/cli/celery.py new file mode 100644 index 00000000..85abe55f --- /dev/null +++ b/pillar/cli/celery.py @@ -0,0 +1,60 @@ +import logging + +from flask_script import Manager +from pillar import current_app + +log = logging.getLogger(__name__) + +manager_celery = Manager( + current_app, usage="Celery operations, like starting a worker or showing the queue") + + +@manager_celery.option('args', nargs='*') +def worker(args): + """Runs a Celery worker.""" + + import sys + + argv0 = f'{sys.argv[0]} operations worker' + argvother = [ + '-E', + '-l', 'INFO', + '--concurrency', '1', + '--pool', 'solo', # No preforking, as PyMongo can't handle connect-before-fork. + # We might get rid of this and go for the default Celery worker + # preforking concurrency model, *if* we can somehow reset the + # PyMongo client and reconnect after forking. + ] + list(args) + + current_app.celery.worker_main([argv0] + argvother) + + +@manager_celery.command +def queue(): + """Shows queued Celery tasks.""" + + from pprint import pprint + + # Inspect all nodes. + i = current_app.celery.control.inspect() + + print(50 * '=') + print('Tasks that have an ETA or are scheduled for later processing:') + pprint(i.scheduled()) + + print() + print('Tasks that are currently active:') + pprint(i.active()) + + print() + print('Tasks that have been claimed by workers:') + pprint(i.reserved()) + print(50 * '=') + + +@manager_celery.command +def purge(): + """Deletes queued Celery tasks.""" + + log.warning('Purging all pending Celery tasks.') + current_app.celery.control.purge() diff --git a/pillar/cli/operations.py b/pillar/cli/operations.py index bcd00c59..4eed3ca7 100644 --- a/pillar/cli/operations.py +++ b/pillar/cli/operations.py @@ -159,23 +159,3 @@ def index_users_update_settings(): 'unordered(roles)' ] }) - - -@manager_operations.option('args', nargs='*') -def worker(args): - """Runs a Celery worker.""" - - import sys - - argv0 = f'{sys.argv[0]} operations worker' - argvother = [ - '-E', - '-l', 'INFO', - '--concurrency', '1', - '--pool', 'solo', # No preforking, as PyMongo can't handle connect-before-fork. - # We might get rid of this and go for the default Celery worker - # preforking concurrency model, *if* we can somehow reset the - # PyMongo client and reconnect after forking. - ] + list(args) - - current_app.celery.worker_main([argv0] + argvother)