Integrated Celery startup / management / config with PillarServer.
This commit is contained in:
parent
e7d268bde6
commit
5af54237b9
@ -60,6 +60,7 @@ class ConfigurationMissingError(SystemExit):
|
||||
class PillarServer(Eve):
|
||||
def __init__(self, app_root, **kwargs):
|
||||
from .extension import PillarExtension
|
||||
from celery import Celery
|
||||
|
||||
kwargs.setdefault('validator', custom_field_validation.ValidateCustomFields)
|
||||
super(PillarServer, self).__init__(settings=empty_settings, **kwargs)
|
||||
@ -106,6 +107,9 @@ class PillarServer(Eve):
|
||||
|
||||
self._config_caching()
|
||||
|
||||
# Celery itself is configured after all extensions have loaded.
|
||||
self.celery: Celery = None
|
||||
|
||||
self.before_first_request(self.setup_db_indices)
|
||||
|
||||
def _load_flask_config(self):
|
||||
@ -317,6 +321,30 @@ class PillarServer(Eve):
|
||||
'static_%s' % name,
|
||||
ext.static_path)
|
||||
|
||||
def _config_celery(self):
|
||||
from celery import Celery
|
||||
|
||||
self.log.info('Configuring Celery')
|
||||
|
||||
# Pillar-defined Celery task modules:
|
||||
celery_task_modules = [
|
||||
'pillar.celery.tasks',
|
||||
'pillar.celery.algolia_tasks',
|
||||
]
|
||||
|
||||
# Allow Pillar extensions from defining their own Celery tasks.
|
||||
for extension in self.pillar_extensions.values():
|
||||
celery_task_modules.extend(extension.celery_task_modules)
|
||||
|
||||
self.celery = Celery(
|
||||
'pillar.celery',
|
||||
backend=self.config['CELERY_BACKEND'],
|
||||
broker=self.config['CELERY_BROKER'],
|
||||
include=celery_task_modules,
|
||||
task_track_started=True,
|
||||
result_expires=3600,
|
||||
)
|
||||
|
||||
def register_static_file_endpoint(self, url_prefix, endpoint_name, static_folder):
|
||||
from pillar.web.staticfile import PillarStaticFile
|
||||
|
||||
@ -475,6 +503,8 @@ class PillarServer(Eve):
|
||||
def finish_startup(self):
|
||||
self.log.info('Using MongoDB database %r', self.config['MONGO_DBNAME'])
|
||||
|
||||
self._config_celery()
|
||||
|
||||
api.setup_app(self)
|
||||
web.setup_app(self)
|
||||
authentication.setup_app(self)
|
||||
|
@ -4,12 +4,10 @@ import bson
|
||||
|
||||
from pillar import current_app
|
||||
|
||||
from .celery_cfg import celery_cfg
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@celery_cfg.task(ignore_result=True)
|
||||
@current_app.celery.task(ignore_result=True)
|
||||
def push_updated_user_to_algolia(user_id: str):
|
||||
"""Push an update to the Algolia index when a user item is updated"""
|
||||
|
||||
|
@ -1,20 +0,0 @@
|
||||
from celery import Celery
|
||||
|
||||
task_modules = [
|
||||
'pillar.celery.tasks',
|
||||
'pillar.celery.algolia_tasks',
|
||||
]
|
||||
|
||||
celery_cfg = Celery('proj',
|
||||
backend='redis://redis/1',
|
||||
broker='amqp://guest:guest@rabbit//',
|
||||
include=task_modules,
|
||||
task_track_started=True)
|
||||
|
||||
# Optional configuration, see the application user guide.
|
||||
celery_cfg.conf.update(
|
||||
result_expires=3600,
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
celery_cfg.start()
|
@ -1,12 +1,12 @@
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from .celery_cfg import celery_cfg
|
||||
from pillar import current_app
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@celery_cfg.task(track_started=True)
|
||||
@current_app.celery.task(track_started=True)
|
||||
def long_task(numbers: typing.List[int]):
|
||||
_log = log.getChild('long_task')
|
||||
_log.info('Computing sum of %i items', len(numbers))
|
||||
@ -18,4 +18,3 @@ def long_task(numbers: typing.List[int]):
|
||||
_log.info('Computed sum of %i items', len(numbers))
|
||||
|
||||
return thesum
|
||||
|
||||
|
@ -178,6 +178,4 @@ def worker(args):
|
||||
# PyMongo client and reconnect after forking.
|
||||
] + list(args)
|
||||
|
||||
from pillar.celery.celery_cfg import celery_cfg
|
||||
|
||||
celery_cfg.worker_main([argv0] + argvother)
|
||||
current_app.celery.worker_main([argv0] + argvother)
|
||||
|
@ -155,3 +155,6 @@ EXTERNAL_SUBSCRIPTIONS_TIMEOUT_SECS = 10
|
||||
|
||||
# Certificate file for communication with other systems.
|
||||
TLS_CERT_FILE = requests.certs.where()
|
||||
|
||||
CELERY_BACKEND = 'redis://redis/1'
|
||||
CELERY_BROKER = 'amqp://guest:guest@rabbit//'
|
||||
|
@ -16,6 +16,7 @@ can then be registered to the application at app creation time:
|
||||
"""
|
||||
|
||||
import abc
|
||||
import typing
|
||||
|
||||
import flask
|
||||
import pillarsdk
|
||||
@ -25,6 +26,9 @@ class PillarExtension(object, metaclass=abc.ABCMeta):
|
||||
# Set to True when your extension implements the project_settings() method.
|
||||
has_project_settings = False
|
||||
|
||||
# List of Celery task modules introduced by this extension.
|
||||
celery_task_modules: typing.List[str] = []
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def name(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user