Integrated Celery startup / management / config with PillarServer.
This commit is contained in:
parent
e7d268bde6
commit
5af54237b9
@ -60,6 +60,7 @@ class ConfigurationMissingError(SystemExit):
|
|||||||
class PillarServer(Eve):
|
class PillarServer(Eve):
|
||||||
def __init__(self, app_root, **kwargs):
|
def __init__(self, app_root, **kwargs):
|
||||||
from .extension import PillarExtension
|
from .extension import PillarExtension
|
||||||
|
from celery import Celery
|
||||||
|
|
||||||
kwargs.setdefault('validator', custom_field_validation.ValidateCustomFields)
|
kwargs.setdefault('validator', custom_field_validation.ValidateCustomFields)
|
||||||
super(PillarServer, self).__init__(settings=empty_settings, **kwargs)
|
super(PillarServer, self).__init__(settings=empty_settings, **kwargs)
|
||||||
@ -106,6 +107,9 @@ class PillarServer(Eve):
|
|||||||
|
|
||||||
self._config_caching()
|
self._config_caching()
|
||||||
|
|
||||||
|
# Celery itself is configured after all extensions have loaded.
|
||||||
|
self.celery: Celery = None
|
||||||
|
|
||||||
self.before_first_request(self.setup_db_indices)
|
self.before_first_request(self.setup_db_indices)
|
||||||
|
|
||||||
def _load_flask_config(self):
|
def _load_flask_config(self):
|
||||||
@ -317,6 +321,30 @@ class PillarServer(Eve):
|
|||||||
'static_%s' % name,
|
'static_%s' % name,
|
||||||
ext.static_path)
|
ext.static_path)
|
||||||
|
|
||||||
|
def _config_celery(self):
|
||||||
|
from celery import Celery
|
||||||
|
|
||||||
|
self.log.info('Configuring Celery')
|
||||||
|
|
||||||
|
# Pillar-defined Celery task modules:
|
||||||
|
celery_task_modules = [
|
||||||
|
'pillar.celery.tasks',
|
||||||
|
'pillar.celery.algolia_tasks',
|
||||||
|
]
|
||||||
|
|
||||||
|
# Allow Pillar extensions from defining their own Celery tasks.
|
||||||
|
for extension in self.pillar_extensions.values():
|
||||||
|
celery_task_modules.extend(extension.celery_task_modules)
|
||||||
|
|
||||||
|
self.celery = Celery(
|
||||||
|
'pillar.celery',
|
||||||
|
backend=self.config['CELERY_BACKEND'],
|
||||||
|
broker=self.config['CELERY_BROKER'],
|
||||||
|
include=celery_task_modules,
|
||||||
|
task_track_started=True,
|
||||||
|
result_expires=3600,
|
||||||
|
)
|
||||||
|
|
||||||
def register_static_file_endpoint(self, url_prefix, endpoint_name, static_folder):
|
def register_static_file_endpoint(self, url_prefix, endpoint_name, static_folder):
|
||||||
from pillar.web.staticfile import PillarStaticFile
|
from pillar.web.staticfile import PillarStaticFile
|
||||||
|
|
||||||
@ -475,6 +503,8 @@ class PillarServer(Eve):
|
|||||||
def finish_startup(self):
|
def finish_startup(self):
|
||||||
self.log.info('Using MongoDB database %r', self.config['MONGO_DBNAME'])
|
self.log.info('Using MongoDB database %r', self.config['MONGO_DBNAME'])
|
||||||
|
|
||||||
|
self._config_celery()
|
||||||
|
|
||||||
api.setup_app(self)
|
api.setup_app(self)
|
||||||
web.setup_app(self)
|
web.setup_app(self)
|
||||||
authentication.setup_app(self)
|
authentication.setup_app(self)
|
||||||
|
@ -4,12 +4,10 @@ import bson
|
|||||||
|
|
||||||
from pillar import current_app
|
from pillar import current_app
|
||||||
|
|
||||||
from .celery_cfg import celery_cfg
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@celery_cfg.task(ignore_result=True)
|
@current_app.celery.task(ignore_result=True)
|
||||||
def push_updated_user_to_algolia(user_id: str):
|
def push_updated_user_to_algolia(user_id: str):
|
||||||
"""Push an update to the Algolia index when a user item is updated"""
|
"""Push an update to the Algolia index when a user item is updated"""
|
||||||
|
|
||||||
|
@ -1,20 +0,0 @@
|
|||||||
from celery import Celery
|
|
||||||
|
|
||||||
task_modules = [
|
|
||||||
'pillar.celery.tasks',
|
|
||||||
'pillar.celery.algolia_tasks',
|
|
||||||
]
|
|
||||||
|
|
||||||
celery_cfg = Celery('proj',
|
|
||||||
backend='redis://redis/1',
|
|
||||||
broker='amqp://guest:guest@rabbit//',
|
|
||||||
include=task_modules,
|
|
||||||
task_track_started=True)
|
|
||||||
|
|
||||||
# Optional configuration, see the application user guide.
|
|
||||||
celery_cfg.conf.update(
|
|
||||||
result_expires=3600,
|
|
||||||
)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
celery_cfg.start()
|
|
@ -1,12 +1,12 @@
|
|||||||
import logging
|
import logging
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
from .celery_cfg import celery_cfg
|
from pillar import current_app
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@celery_cfg.task(track_started=True)
|
@current_app.celery.task(track_started=True)
|
||||||
def long_task(numbers: typing.List[int]):
|
def long_task(numbers: typing.List[int]):
|
||||||
_log = log.getChild('long_task')
|
_log = log.getChild('long_task')
|
||||||
_log.info('Computing sum of %i items', len(numbers))
|
_log.info('Computing sum of %i items', len(numbers))
|
||||||
@ -18,4 +18,3 @@ def long_task(numbers: typing.List[int]):
|
|||||||
_log.info('Computed sum of %i items', len(numbers))
|
_log.info('Computed sum of %i items', len(numbers))
|
||||||
|
|
||||||
return thesum
|
return thesum
|
||||||
|
|
||||||
|
@ -178,6 +178,4 @@ def worker(args):
|
|||||||
# PyMongo client and reconnect after forking.
|
# PyMongo client and reconnect after forking.
|
||||||
] + list(args)
|
] + list(args)
|
||||||
|
|
||||||
from pillar.celery.celery_cfg import celery_cfg
|
current_app.celery.worker_main([argv0] + argvother)
|
||||||
|
|
||||||
celery_cfg.worker_main([argv0] + argvother)
|
|
||||||
|
@ -155,3 +155,6 @@ EXTERNAL_SUBSCRIPTIONS_TIMEOUT_SECS = 10
|
|||||||
|
|
||||||
# Certificate file for communication with other systems.
|
# Certificate file for communication with other systems.
|
||||||
TLS_CERT_FILE = requests.certs.where()
|
TLS_CERT_FILE = requests.certs.where()
|
||||||
|
|
||||||
|
CELERY_BACKEND = 'redis://redis/1'
|
||||||
|
CELERY_BROKER = 'amqp://guest:guest@rabbit//'
|
||||||
|
@ -16,6 +16,7 @@ can then be registered to the application at app creation time:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
|
import typing
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
import pillarsdk
|
import pillarsdk
|
||||||
@ -25,6 +26,9 @@ class PillarExtension(object, metaclass=abc.ABCMeta):
|
|||||||
# Set to True when your extension implements the project_settings() method.
|
# Set to True when your extension implements the project_settings() method.
|
||||||
has_project_settings = False
|
has_project_settings = False
|
||||||
|
|
||||||
|
# List of Celery task modules introduced by this extension.
|
||||||
|
celery_task_modules: typing.List[str] = []
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def name(self):
|
def name(self):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user