Added Celery for background tasks.

The implementation is still rather simple, using hard-coded configuration
values. This will change in subsequent commits.

The worker can be started with "manage.py operations worker". Celery
Worker CLI options can be passed after a double dash, like this:

    ./manage.py operations worker -- -C -E

commit ed4ee5228a
parent f152521041
Date: 2017-06-01 15:41:21 +02:00

5 changed files with 68 additions and 0 deletions

pillar/celery/celery_cfg.py Normal file
@@ -0,0 +1,19 @@
from celery import Celery

task_modules = [
    'pillar.celery.tasks',
]

celery_cfg = Celery('proj',
                    backend='redis://redis/1',
                    broker='amqp://guest:guest@rabbit//',
                    include=task_modules,
                    task_track_started=True)

# Optional configuration, see the application user guide.
celery_cfg.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    celery_cfg.start()
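
In other words: the broker is a RabbitMQ host named `rabbit` (default guest credentials), results land in Redis database 1, and `result_expires=3600` keeps task results for an hour before they are cleaned up. Thanks to the `__main__` guard the module can also be run directly, since `celery_cfg.start()` hands `sys.argv` to Celery's own command-line machinery; the usual entry point, though, is the `manage.py` command added below.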

pillar/celery/tasks.py Normal file

@@ -0,0 +1,21 @@
import logging
import typing

from .celery_cfg import celery_cfg

log = logging.getLogger(__name__)


@celery_cfg.task(track_started=True)
def long_task(numbers: typing.List[int]):
    _log = log.getChild('long_task')
    _log.info('Computing sum of %i items', len(numbers))

    import time
    time.sleep(6)

    thesum = sum(numbers)
    _log.info('Computed sum of %i items', len(numbers))
    return thesum
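
A minimal sketch (not from this commit) of how such a task is dispatched from application code, assuming a worker is running and the broker/backend from celery_cfg.py are reachable:

    # Illustration only; assumes a running worker and a reachable broker/result backend.
    from pillar.celery.tasks import long_task

    result = long_task.delay(list(range(10)))  # queues the task, returns an AsyncResult
    print(result.id)                           # task UUID; STARTED state is reported due to track_started
    print(result.get(timeout=30))              # waits for the worker and prints 45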


@@ -159,3 +159,25 @@ def index_users_update_settings():
            'unordered(roles)'
        ]
    })


@manager_operations.option('args', nargs='*')
def worker(args):
    """Runs a Celery worker."""

    import sys

    argv0 = f'{sys.argv[0]} operations worker'
    argvother = [
        '-E',
        '-l', 'INFO',
        '--concurrency', '1',
        '--pool', 'solo',  # No preforking, as PyMongo can't handle connect-before-fork.
        # We might get rid of this and go for the default Celery worker
        # preforking concurrency model, *if* we can somehow reset the
        # PyMongo client and reconnect after forking.
    ] + list(args)

    from pillar.celery.celery_cfg import celery_cfg

    celery_cfg.worker_main([argv0] + argvother)
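
Tying this back to the commit message: for `./manage.py operations worker -- -C -E`, the double dash stops option parsing in the manager, so `-C -E` end up in `args`, and a rough sketch of the argv handed to `worker_main()` looks like this:

    # Illustration only, for "./manage.py operations worker -- -C -E".
    args = ['-C', '-E']                      # collected by the 'args' option (nargs='*')
    argv0 = './manage.py operations worker'  # f'{sys.argv[0]} operations worker'
    argv = [argv0,
            '-E', '-l', 'INFO', '--concurrency', '1', '--pool', 'solo',  # defaults set above
            '-C', '-E']                                                  # pass-through options
    # celery_cfg.worker_main(argv) then starts the worker with these options.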