diff --git a/pillar/__init__.py b/pillar/__init__.py index 43c5663b..c60da2ab 100644 --- a/pillar/__init__.py +++ b/pillar/__init__.py @@ -60,6 +60,7 @@ class ConfigurationMissingError(SystemExit): class PillarServer(Eve): def __init__(self, app_root, **kwargs): from .extension import PillarExtension + from celery import Celery kwargs.setdefault('validator', custom_field_validation.ValidateCustomFields) super(PillarServer, self).__init__(settings=empty_settings, **kwargs) @@ -106,6 +107,9 @@ class PillarServer(Eve): self._config_caching() + # Celery itself is configured after all extensions have loaded. + self.celery: Celery = None + self.before_first_request(self.setup_db_indices) def _load_flask_config(self): @@ -317,6 +321,30 @@ class PillarServer(Eve): 'static_%s' % name, ext.static_path) + def _config_celery(self): + from celery import Celery + + self.log.info('Configuring Celery') + + # Pillar-defined Celery task modules: + celery_task_modules = [ + 'pillar.celery.tasks', + 'pillar.celery.algolia_tasks', + ] + + # Allow Pillar extensions from defining their own Celery tasks. + for extension in self.pillar_extensions.values(): + celery_task_modules.extend(extension.celery_task_modules) + + self.celery = Celery( + 'pillar.celery', + backend=self.config['CELERY_BACKEND'], + broker=self.config['CELERY_BROKER'], + include=celery_task_modules, + task_track_started=True, + result_expires=3600, + ) + def register_static_file_endpoint(self, url_prefix, endpoint_name, static_folder): from pillar.web.staticfile import PillarStaticFile @@ -475,6 +503,8 @@ class PillarServer(Eve): def finish_startup(self): self.log.info('Using MongoDB database %r', self.config['MONGO_DBNAME']) + self._config_celery() + api.setup_app(self) web.setup_app(self) authentication.setup_app(self) diff --git a/pillar/celery/algolia_tasks.py b/pillar/celery/algolia_tasks.py index c8dad3d6..ccf51768 100644 --- a/pillar/celery/algolia_tasks.py +++ b/pillar/celery/algolia_tasks.py @@ -4,12 +4,10 @@ import bson from pillar import current_app -from .celery_cfg import celery_cfg - log = logging.getLogger(__name__) -@celery_cfg.task(ignore_result=True) +@current_app.celery.task(ignore_result=True) def push_updated_user_to_algolia(user_id: str): """Push an update to the Algolia index when a user item is updated""" diff --git a/pillar/celery/celery_cfg.py b/pillar/celery/celery_cfg.py deleted file mode 100644 index 505f58f1..00000000 --- a/pillar/celery/celery_cfg.py +++ /dev/null @@ -1,20 +0,0 @@ -from celery import Celery - -task_modules = [ - 'pillar.celery.tasks', - 'pillar.celery.algolia_tasks', -] - -celery_cfg = Celery('proj', - backend='redis://redis/1', - broker='amqp://guest:guest@rabbit//', - include=task_modules, - task_track_started=True) - -# Optional configuration, see the application user guide. -celery_cfg.conf.update( - result_expires=3600, -) - -if __name__ == '__main__': - celery_cfg.start() diff --git a/pillar/celery/tasks.py b/pillar/celery/tasks.py index 9fe8b42d..b4baa13d 100644 --- a/pillar/celery/tasks.py +++ b/pillar/celery/tasks.py @@ -1,12 +1,12 @@ import logging import typing -from .celery_cfg import celery_cfg +from pillar import current_app log = logging.getLogger(__name__) -@celery_cfg.task(track_started=True) +@current_app.celery.task(track_started=True) def long_task(numbers: typing.List[int]): _log = log.getChild('long_task') _log.info('Computing sum of %i items', len(numbers)) @@ -18,4 +18,3 @@ def long_task(numbers: typing.List[int]): _log.info('Computed sum of %i items', len(numbers)) return thesum - diff --git a/pillar/cli/operations.py b/pillar/cli/operations.py index 4c4b810a..bcd00c59 100644 --- a/pillar/cli/operations.py +++ b/pillar/cli/operations.py @@ -178,6 +178,4 @@ def worker(args): # PyMongo client and reconnect after forking. ] + list(args) - from pillar.celery.celery_cfg import celery_cfg - - celery_cfg.worker_main([argv0] + argvother) + current_app.celery.worker_main([argv0] + argvother) diff --git a/pillar/config.py b/pillar/config.py index 7f18eb3b..4cb1087f 100644 --- a/pillar/config.py +++ b/pillar/config.py @@ -155,3 +155,6 @@ EXTERNAL_SUBSCRIPTIONS_TIMEOUT_SECS = 10 # Certificate file for communication with other systems. TLS_CERT_FILE = requests.certs.where() + +CELERY_BACKEND = 'redis://redis/1' +CELERY_BROKER = 'amqp://guest:guest@rabbit//' diff --git a/pillar/extension.py b/pillar/extension.py index 900d3832..a114ea06 100644 --- a/pillar/extension.py +++ b/pillar/extension.py @@ -16,6 +16,7 @@ can then be registered to the application at app creation time: """ import abc +import typing import flask import pillarsdk @@ -25,6 +26,9 @@ class PillarExtension(object, metaclass=abc.ABCMeta): # Set to True when your extension implements the project_settings() method. has_project_settings = False + # List of Celery task modules introduced by this extension. + celery_task_modules: typing.List[str] = [] + @property @abc.abstractmethod def name(self):