Integrated Celery startup / management / config with PillarServer.

This commit is contained in:
Sybren A. Stüvel 2017-06-01 16:15:44 +02:00
parent e7d268bde6
commit 5af54237b9
7 changed files with 41 additions and 29 deletions

View File

@ -60,6 +60,7 @@ class ConfigurationMissingError(SystemExit):
class PillarServer(Eve):
def __init__(self, app_root, **kwargs):
from .extension import PillarExtension
from celery import Celery
kwargs.setdefault('validator', custom_field_validation.ValidateCustomFields)
super(PillarServer, self).__init__(settings=empty_settings, **kwargs)
@ -106,6 +107,9 @@ class PillarServer(Eve):
self._config_caching()
# Celery itself is configured after all extensions have loaded.
self.celery: Celery = None
self.before_first_request(self.setup_db_indices)
def _load_flask_config(self):
@ -317,6 +321,30 @@ class PillarServer(Eve):
'static_%s' % name,
ext.static_path)
def _config_celery(self):
from celery import Celery
self.log.info('Configuring Celery')
# Pillar-defined Celery task modules:
celery_task_modules = [
'pillar.celery.tasks',
'pillar.celery.algolia_tasks',
]
# Allow Pillar extensions from defining their own Celery tasks.
for extension in self.pillar_extensions.values():
celery_task_modules.extend(extension.celery_task_modules)
self.celery = Celery(
'pillar.celery',
backend=self.config['CELERY_BACKEND'],
broker=self.config['CELERY_BROKER'],
include=celery_task_modules,
task_track_started=True,
result_expires=3600,
)
def register_static_file_endpoint(self, url_prefix, endpoint_name, static_folder):
from pillar.web.staticfile import PillarStaticFile
@ -475,6 +503,8 @@ class PillarServer(Eve):
def finish_startup(self):
self.log.info('Using MongoDB database %r', self.config['MONGO_DBNAME'])
self._config_celery()
api.setup_app(self)
web.setup_app(self)
authentication.setup_app(self)

View File

@ -4,12 +4,10 @@ import bson
from pillar import current_app
from .celery_cfg import celery_cfg
log = logging.getLogger(__name__)
@celery_cfg.task(ignore_result=True)
@current_app.celery.task(ignore_result=True)
def push_updated_user_to_algolia(user_id: str):
"""Push an update to the Algolia index when a user item is updated"""

View File

@ -1,20 +0,0 @@
from celery import Celery
task_modules = [
'pillar.celery.tasks',
'pillar.celery.algolia_tasks',
]
celery_cfg = Celery('proj',
backend='redis://redis/1',
broker='amqp://guest:guest@rabbit//',
include=task_modules,
task_track_started=True)
# Optional configuration, see the application user guide.
celery_cfg.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
celery_cfg.start()

View File

@ -1,12 +1,12 @@
import logging
import typing
from .celery_cfg import celery_cfg
from pillar import current_app
log = logging.getLogger(__name__)
@celery_cfg.task(track_started=True)
@current_app.celery.task(track_started=True)
def long_task(numbers: typing.List[int]):
_log = log.getChild('long_task')
_log.info('Computing sum of %i items', len(numbers))
@ -18,4 +18,3 @@ def long_task(numbers: typing.List[int]):
_log.info('Computed sum of %i items', len(numbers))
return thesum

View File

@ -178,6 +178,4 @@ def worker(args):
# PyMongo client and reconnect after forking.
] + list(args)
from pillar.celery.celery_cfg import celery_cfg
celery_cfg.worker_main([argv0] + argvother)
current_app.celery.worker_main([argv0] + argvother)

View File

@ -155,3 +155,6 @@ EXTERNAL_SUBSCRIPTIONS_TIMEOUT_SECS = 10
# Certificate file for communication with other systems.
TLS_CERT_FILE = requests.certs.where()
CELERY_BACKEND = 'redis://redis/1'
CELERY_BROKER = 'amqp://guest:guest@rabbit//'

View File

@ -16,6 +16,7 @@ can then be registered to the application at app creation time:
"""
import abc
import typing
import flask
import pillarsdk
@ -25,6 +26,9 @@ class PillarExtension(object, metaclass=abc.ABCMeta):
# Set to True when your extension implements the project_settings() method.
has_project_settings = False
# List of Celery task modules introduced by this extension.
celery_task_modules: typing.List[str] = []
@property
@abc.abstractmethod
def name(self):