Moved Celery CLI commands to a 'manage.py celery' submodule and added extensions:
Added a 'celery queue' command, which is supposed to show queued Celery tasks (but doesn't work quite as I'd expect). Added a 'celery purge' command, which purges queued Celery tasks.
@@ -61,4 +61,6 @@ Pillar requires [Celery](http://www.celeryproject.org/) for background task proc
 turn requires a backend and a broker, for which the default Pillar configuration uses Redis and
 RabbitMQ.
 
-You can run the Celery Worker using `manage.py operations worker`.
+You can run the Celery Worker using `manage.py celery worker`.
+
+Find other Celery operations with the `manage.py celery` command.
@@ -8,6 +8,7 @@ import logging
 from flask_script import Manager
 
 from pillar import current_app
+from pillar.cli.celery import manager_celery
 from pillar.cli.maintenance import manager_maintenance
 from pillar.cli.operations import manager_operations
 from pillar.cli.setup import manager_setup
@@ -15,6 +16,7 @@ from pillar.cli.setup import manager_setup
 log = logging.getLogger(__name__)
 manager = Manager(current_app)
 
+manager.add_command('celery', manager_celery)
 manager.add_command("maintenance", manager_maintenance)
 manager.add_command("setup", manager_setup)
 manager.add_command("operations", manager_operations)
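For context, the root manager that gains the 'celery' sub-command above is what manage.py ultimately runs. A minimal sketch of such a flask_script entry point is shown below; the file name and import path are assumptions, since the manage.py wiring itself is not part of this diff:

#!/usr/bin/env python
# Hypothetical manage.py -- not part of this commit. Only Manager.run() and
# the add_command() registrations shown in the diff are actual flask_script API.
import logging

from pillar.cli import manager  # root Manager with the sub-managers registered above

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # flask_script parses sys.argv, so `manage.py celery worker` dispatches to
    # the worker() command on the 'celery' sub-manager; `manage.py celery queue`
    # and `manage.py celery purge` are resolved the same way.
    manager.run()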
pillar/cli/celery.py (new file, 60 lines)
@@ -0,0 +1,60 @@
import logging

from flask_script import Manager
from pillar import current_app

log = logging.getLogger(__name__)

manager_celery = Manager(
    current_app, usage="Celery operations, like starting a worker or showing the queue")


@manager_celery.option('args', nargs='*')
def worker(args):
    """Runs a Celery worker."""

    import sys

    argv0 = f'{sys.argv[0]} operations worker'
    argvother = [
        '-E',
        '-l', 'INFO',
        '--concurrency', '1',
        '--pool', 'solo',  # No preforking, as PyMongo can't handle connect-before-fork.
        # We might get rid of this and go for the default Celery worker
        # preforking concurrency model, *if* we can somehow reset the
        # PyMongo client and reconnect after forking.
    ] + list(args)

    current_app.celery.worker_main([argv0] + argvother)


@manager_celery.command
def queue():
    """Shows queued Celery tasks."""

    from pprint import pprint

    # Inspect all nodes.
    i = current_app.celery.control.inspect()

    print(50 * '=')
    print('Tasks that have an ETA or are scheduled for later processing:')
    pprint(i.scheduled())

    print()
    print('Tasks that are currently active:')
    pprint(i.active())

    print()
    print('Tasks that have been claimed by workers:')
    pprint(i.reserved())
    print(50 * '=')


@manager_celery.command
def purge():
    """Deletes queued Celery tasks."""

    log.warning('Purging all pending Celery tasks.')
    current_app.celery.control.purge()
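A note on the 'celery queue' command added above: Celery's control.inspect() broadcasts to the currently running workers and reports only tasks those workers have already received (scheduled, active, reserved). Messages still waiting in the broker queue, or anything at all when no worker is online, will not appear, which is plausibly why the commit message says the command doesn't work quite as expected. Below is a hedged sketch of counting messages directly on the broker; it assumes the default 'celery' queue name and a transport that supports passive queue_declare (e.g. RabbitMQ), and is not part of this commit:

# Hypothetical helper, not part of this commit: counts messages still sitting
# in the broker queue, which control.inspect() cannot see.
from pillar import current_app


def broker_queue_length(queue_name='celery'):
    """Returns the number of messages waiting in the named broker queue."""
    with current_app.celery.connection_or_acquire() as conn:
        # passive=True only inspects the queue and never creates it.
        declared = conn.default_channel.queue_declare(queue=queue_name, passive=True)
        return declared.message_count

With RabbitMQ as the broker, the same count could be cross-checked with `rabbitmqctl list_queues`.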
@@ -159,23 +159,3 @@ def index_users_update_settings():
             'unordered(roles)'
         ]
     })
-
-
-@manager_operations.option('args', nargs='*')
-def worker(args):
-    """Runs a Celery worker."""
-
-    import sys
-
-    argv0 = f'{sys.argv[0]} operations worker'
-    argvother = [
-        '-E',
-        '-l', 'INFO',
-        '--concurrency', '1',
-        '--pool', 'solo',  # No preforking, as PyMongo can't handle connect-before-fork.
-        # We might get rid of this and go for the default Celery worker
-        # preforking concurrency model, *if* we can somehow reset the
-        # PyMongo client and reconnect after forking.
-    ] + list(args)
-
-    current_app.celery.worker_main([argv0] + argvother)