Integrated Celery startup / management / config with PillarServer.
This commit is contained in:
@@ -60,6 +60,7 @@ class ConfigurationMissingError(SystemExit):
|
||||
class PillarServer(Eve):
|
||||
def __init__(self, app_root, **kwargs):
|
||||
from .extension import PillarExtension
|
||||
from celery import Celery
|
||||
|
||||
kwargs.setdefault('validator', custom_field_validation.ValidateCustomFields)
|
||||
super(PillarServer, self).__init__(settings=empty_settings, **kwargs)
|
||||
@@ -106,6 +107,9 @@ class PillarServer(Eve):
|
||||
|
||||
self._config_caching()
|
||||
|
||||
# Celery itself is configured after all extensions have loaded.
|
||||
self.celery: Celery = None
|
||||
|
||||
self.before_first_request(self.setup_db_indices)
|
||||
|
||||
def _load_flask_config(self):
|
||||
@@ -317,6 +321,30 @@ class PillarServer(Eve):
|
||||
'static_%s' % name,
|
||||
ext.static_path)
|
||||
|
||||
def _config_celery(self):
|
||||
from celery import Celery
|
||||
|
||||
self.log.info('Configuring Celery')
|
||||
|
||||
# Pillar-defined Celery task modules:
|
||||
celery_task_modules = [
|
||||
'pillar.celery.tasks',
|
||||
'pillar.celery.algolia_tasks',
|
||||
]
|
||||
|
||||
# Allow Pillar extensions from defining their own Celery tasks.
|
||||
for extension in self.pillar_extensions.values():
|
||||
celery_task_modules.extend(extension.celery_task_modules)
|
||||
|
||||
self.celery = Celery(
|
||||
'pillar.celery',
|
||||
backend=self.config['CELERY_BACKEND'],
|
||||
broker=self.config['CELERY_BROKER'],
|
||||
include=celery_task_modules,
|
||||
task_track_started=True,
|
||||
result_expires=3600,
|
||||
)
|
||||
|
||||
def register_static_file_endpoint(self, url_prefix, endpoint_name, static_folder):
|
||||
from pillar.web.staticfile import PillarStaticFile
|
||||
|
||||
@@ -475,6 +503,8 @@ class PillarServer(Eve):
|
||||
def finish_startup(self):
|
||||
self.log.info('Using MongoDB database %r', self.config['MONGO_DBNAME'])
|
||||
|
||||
self._config_celery()
|
||||
|
||||
api.setup_app(self)
|
||||
web.setup_app(self)
|
||||
authentication.setup_app(self)
|
||||
|
Reference in New Issue
Block a user