From ed4ee5228a216b29a138231b7a7ed364e1d92fd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 1 Jun 2017 15:41:21 +0200 Subject: [PATCH] Added Celery for background tasks. The implementation is still rather simple, using hard-coded configuration values. This will change in subsequent commits. The worker can be started with "manage.py operations worker". Celery Worker CLI options can be passed after a double dash, like this: ./manage.py operations worker -- -C -E --- pillar/celery/__init__.py | 0 pillar/celery/celery_cfg.py | 19 +++++++++++++++++++ pillar/celery/tasks.py | 21 +++++++++++++++++++++ pillar/cli/operations.py | 22 ++++++++++++++++++++++ requirements.txt | 6 ++++++ 5 files changed, 68 insertions(+) create mode 100644 pillar/celery/__init__.py create mode 100644 pillar/celery/celery_cfg.py create mode 100644 pillar/celery/tasks.py diff --git a/pillar/celery/__init__.py b/pillar/celery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pillar/celery/celery_cfg.py b/pillar/celery/celery_cfg.py new file mode 100644 index 00000000..377932e8 --- /dev/null +++ b/pillar/celery/celery_cfg.py @@ -0,0 +1,19 @@ +from celery import Celery + +task_modules = [ + 'pillar.celery.tasks', +] + +celery_cfg = Celery('proj', + backend='redis://redis/1', + broker='amqp://guest:guest@rabbit//', + include=task_modules, + task_track_started=True) + +# Optional configuration, see the application user guide. +celery_cfg.conf.update( + result_expires=3600, +) + +if __name__ == '__main__': + celery_cfg.start() diff --git a/pillar/celery/tasks.py b/pillar/celery/tasks.py new file mode 100644 index 00000000..9fe8b42d --- /dev/null +++ b/pillar/celery/tasks.py @@ -0,0 +1,21 @@ +import logging +import typing + +from .celery_cfg import celery_cfg + +log = logging.getLogger(__name__) + + +@celery_cfg.task(track_started=True) +def long_task(numbers: typing.List[int]): + _log = log.getChild('long_task') + _log.info('Computing sum of %i items', len(numbers)) + + import time + time.sleep(6) + thesum = sum(numbers) + + _log.info('Computed sum of %i items', len(numbers)) + + return thesum + diff --git a/pillar/cli/operations.py b/pillar/cli/operations.py index 4eed3ca7..4c4b810a 100644 --- a/pillar/cli/operations.py +++ b/pillar/cli/operations.py @@ -159,3 +159,25 @@ def index_users_update_settings(): 'unordered(roles)' ] }) + + +@manager_operations.option('args', nargs='*') +def worker(args): + """Runs a Celery worker.""" + + import sys + + argv0 = f'{sys.argv[0]} operations worker' + argvother = [ + '-E', + '-l', 'INFO', + '--concurrency', '1', + '--pool', 'solo', # No preforking, as PyMongo can't handle connect-before-fork. + # We might get rid of this and go for the default Celery worker + # preforking concurrency model, *if* we can somehow reset the + # PyMongo client and reconnect after forking. + ] + list(args) + + from pillar.celery.celery_cfg import celery_cfg + + celery_cfg.worker_main([argv0] + argvother) diff --git a/requirements.txt b/requirements.txt index 334b15ff..a1b80386 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ bcrypt==3.1.3 blinker==1.4 bugsnag==2.3.1 bleach==1.4.3 +celery[redis]==4.0.2 commonmark==0.7.2 Eve==0.7.3 Flask==0.12.2 @@ -29,6 +30,8 @@ wheel==0.29.0 zencoder==0.6.5 # Secondary requirements +amqp==2.1.4 +billiard==3.5.0.2 Flask-PyMongo==0.4.1 CommonMark==0.7.2 cerberus==0.9.2 @@ -38,15 +41,18 @@ html5lib==0.9999999 googleapis-common-protos==1.1.0 itsdangerous==0.24 jinja2==2.9.6 +kombu==4.0.2 oauth2client==2.0.2 oauthlib==2.0.1 protobuf==3.0.0b2.post2 protorpc==0.11.1 pyasn1-modules==0.0.8 pymongo==3.4.0 +pytz==2017.2 requests-oauthlib==0.7.0 rsa==3.4.2 simplejson==3.10.0 six==1.10.0 +vine==1.1.3 WTForms==2.1 werkzeug==0.11.15