Added Celery task for queued email sending.
Upon IOError or OSError (which includes SMTP protocol errors) the mail sending task is retried after MAIL_RETRY seconds. It is retried three times (default setting of Celery) only.
This commit is contained in:
parent
01f81ce4d5
commit
8ca6b4cdb0
@ -462,6 +462,7 @@ class PillarServer(BlinkerCompatibleEve):
|
|||||||
'pillar.celery.tasks',
|
'pillar.celery.tasks',
|
||||||
'pillar.celery.algolia_tasks',
|
'pillar.celery.algolia_tasks',
|
||||||
'pillar.celery.file_link_tasks',
|
'pillar.celery.file_link_tasks',
|
||||||
|
'pillar.celery.email_tasks',
|
||||||
]
|
]
|
||||||
|
|
||||||
# Allow Pillar extensions from defining their own Celery tasks.
|
# Allow Pillar extensions from defining their own Celery tasks.
|
||||||
|
48
pillar/celery/email_tasks.py
Normal file
48
pillar/celery/email_tasks.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
"""Deferred email support.
|
||||||
|
|
||||||
|
Note that this module can only be imported when an application context is
|
||||||
|
active. Best to late-import this in the functions where it's needed.
|
||||||
|
"""
|
||||||
|
from email.message import EmailMessage
|
||||||
|
from email.headerregistry import Address
|
||||||
|
import logging
|
||||||
|
import smtplib
|
||||||
|
|
||||||
|
import celery
|
||||||
|
|
||||||
|
from pillar import current_app
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@current_app.celery.task(bind=True, ignore_result=True, acks_late=True)
|
||||||
|
def send_email(self: celery.Task, to_name: str, to_addr: str, subject: str, text: str, html: str):
|
||||||
|
"""Send an email to a single address."""
|
||||||
|
# WARNING: when changing the signature of this function, also change the
|
||||||
|
# self.retry() call below.
|
||||||
|
cfg = current_app.config
|
||||||
|
|
||||||
|
# Construct the message
|
||||||
|
msg = EmailMessage()
|
||||||
|
msg['Subject'] = subject
|
||||||
|
msg['From'] = Address(cfg['MAIL_DEFAULT_FROM_NAME'], addr_spec=cfg['MAIL_DEFAULT_FROM_ADDR'])
|
||||||
|
msg['To'] = (Address(to_name, addr_spec=to_addr),)
|
||||||
|
msg.set_content(text)
|
||||||
|
msg.add_alternative(html, subtype='html')
|
||||||
|
|
||||||
|
# Refuse to send mail when we're testing.
|
||||||
|
if cfg['TESTING']:
|
||||||
|
log.warning('not sending mail to %s <%s> because we are TESTING', to_name, to_addr)
|
||||||
|
return
|
||||||
|
log.info('sending email to %s <%s>', to_name, to_addr)
|
||||||
|
|
||||||
|
# Send the message via local SMTP server.
|
||||||
|
try:
|
||||||
|
with smtplib.SMTP(cfg['SMTP_HOST'], cfg['SMTP_PORT'], timeout=cfg['SMTP_TIMEOUT']) as smtp:
|
||||||
|
smtp.send_message(msg)
|
||||||
|
except (IOError, OSError) as ex:
|
||||||
|
log.exception('error sending email to %s <%s>, will retry later: %s',
|
||||||
|
to_name, to_addr, ex)
|
||||||
|
self.retry((to_name, to_addr, subject, text, html), countdown=cfg['MAIL_RETRY'])
|
||||||
|
else:
|
||||||
|
log.info('mail to %s <%s> successfully sent', to_name, to_addr)
|
@ -230,3 +230,11 @@ DEFAULT_LOCALE = 'en_US'
|
|||||||
# never show the site in English.
|
# never show the site in English.
|
||||||
SUPPORT_ENGLISH = True
|
SUPPORT_ENGLISH = True
|
||||||
|
|
||||||
|
|
||||||
|
# Mail options, see pillar.celery.email_tasks.
|
||||||
|
SMTP_HOST = 'localhost'
|
||||||
|
SMTP_PORT = 2525
|
||||||
|
SMTP_TIMEOUT = 30 # timeout in seconds, https://docs.python.org/3/library/smtplib.html#smtplib.SMTP
|
||||||
|
MAIL_RETRY = 180 # in seconds, delay until trying to send an email again.
|
||||||
|
MAIL_DEFAULT_FROM_NAME = 'Blender Cloud'
|
||||||
|
MAIL_DEFAULT_FROM_ADDR = 'cloudsupport@localhost'
|
||||||
|
@ -67,20 +67,26 @@ class PillarTestServer(pillar.PillarServer):
|
|||||||
Without this, actual Celery tasks will be created while the tests are running.
|
Without this, actual Celery tasks will be created while the tests are running.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from celery import Celery
|
from celery import Celery, Task
|
||||||
|
|
||||||
self.celery = unittest.mock.MagicMock(Celery)
|
self.celery = unittest.mock.MagicMock(Celery)
|
||||||
|
|
||||||
def fake_task(*task_args, **task_kwargs):
|
def fake_task(*task_args, bind=False, **task_kwargs):
|
||||||
def decorator(f):
|
def decorator(f):
|
||||||
def delay(*args, **kwargs):
|
def delay(*args, **kwargs):
|
||||||
return f(*args, **kwargs)
|
if bind:
|
||||||
|
return f(decorator.sender, *args, **kwargs)
|
||||||
|
else:
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
|
||||||
f.delay = delay
|
f.delay = delay
|
||||||
f.si = unittest.mock.MagicMock()
|
f.si = unittest.mock.MagicMock()
|
||||||
f.s = unittest.mock.MagicMock()
|
f.s = unittest.mock.MagicMock()
|
||||||
return f
|
return f
|
||||||
|
|
||||||
|
if bind:
|
||||||
|
decorator.sender = unittest.mock.MagicMock(Task)
|
||||||
|
|
||||||
return decorator
|
return decorator
|
||||||
|
|
||||||
self.celery.task = fake_task
|
self.celery.task = fake_task
|
||||||
|
Loading…
x
Reference in New Issue
Block a user