# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from datetime import datetime, timedelta
from importlib import import_module
from multiprocessing.pool import ThreadPool
import logging
import os
import sys

from django.db.utils import OperationalError
from django.utils import timezone
from six import python_2_unicode_compatible

from background_task.exceptions import BackgroundTaskError
from background_task.models import Task
from background_task.settings import app_settings
from background_task import signals

logger = logging.getLogger(__name__)

def bg_runner(proxy_task, task=None, *args, **kwargs):
    """
    Executes the function attached to a task. Used to enable threads.

    If a Task instance is provided, args and kwargs are ignored and
    retrieved from the Task itself.
    """
    signals.task_started.send(Task)
    try:
        func = getattr(proxy_task, 'task_function', None)
        if isinstance(task, Task):
            args, kwargs = task.params()
        else:
            # No Task instance given: look up the matching task row by name,
            # arguments and (optionally) queue.
            task_name = getattr(proxy_task, 'name', None)
            task_queue = getattr(proxy_task, 'queue', None)
            task_qs = Task.objects.get_task(task_name=task_name, args=args, kwargs=kwargs)
            if task_queue:
                task_qs = task_qs.filter(queue=task_queue)
            if task_qs:
                task = task_qs[0]
        if func is None:
            raise BackgroundTaskError("Function is None, can't execute!")
        func(*args, **kwargs)

        if task:
            # Task ran successfully: record the attempt, archive it as a
            # completed task, schedule any repetition, then delete the row.
            task.increment_attempts()
            completed = task.create_completed_task()
            signals.task_successful.send(sender=task.__class__, task_id=task.id, completed_task=completed)
            task.create_repetition()
            task.delete()
            logger.info('Ran task and deleted %s', task)
    except Exception as ex:
        t, e, traceback = sys.exc_info()
        if task:
            # The task failed: log the error and reschedule a later attempt.
            logger.error('Rescheduling %s', task, exc_info=(t, e, traceback))
            signals.task_error.send(sender=ex.__class__, task=task)
            task.reschedule(t, e, traceback)
        del traceback
    signals.task_finished.send(Task)

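# Illustrative only: one way a project might listen to the signals emitted by
# bg_runner() above. ``on_task_error`` is a hypothetical receiver, not part of
# this module; task_error is sent with sender=ex.__class__ and a ``task``
# keyword argument, so the signature below matches that.
#
#   from django.dispatch import receiver
#   from background_task.signals import task_error
#
#   @receiver(task_error)
#   def on_task_error(sender, task=None, **kwargs):
#       logger.warning('Background task failed: %s', task)
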
class PoolRunner(object):
    """Runs tasks asynchronously on a lazily created thread pool."""

    def __init__(self, bg_runner, num_processes):
        self._bg_runner = bg_runner
        self._num_processes = num_processes

    _pool_instance = None

    @property
    def _pool(self):
        # Create the thread pool on first use.
        if not self._pool_instance:
            self._pool_instance = ThreadPool(processes=self._num_processes)
        return self._pool_instance

    def run(self, proxy_task, task=None, *args, **kwargs):
        self._pool.apply_async(func=self._bg_runner, args=(proxy_task, task) + tuple(args), kwds=kwargs)

    __call__ = run

class Tasks(object):
    def __init__(self):
        self._tasks = {}
        self._runner = DBTaskRunner()
        self._task_proxy_class = TaskProxy
        self._bg_runner = bg_runner
        self._pool_runner = PoolRunner(bg_runner, app_settings.BACKGROUND_TASK_ASYNC_THREADS)

    def background(self, name=None, schedule=None, queue=None,
                   remove_existing_tasks=False):
        '''
        Decorator to turn a regular function into something that gets
        run asynchronously in the background, at a later time.
        '''
        # See if used as a simple decorator,
        # where the first arg is the function to be decorated.
        fn = None
        if name and callable(name):
            fn = name
            name = None

        def _decorator(fn):
            _name = name
            if not _name:
                _name = '%s.%s' % (fn.__module__, fn.__name__)
            proxy = self._task_proxy_class(_name, fn, schedule, queue,
                                           remove_existing_tasks, self._runner)
            self._tasks[_name] = proxy
            return proxy

        if fn:
            return _decorator(fn)

        return _decorator

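    # Usage sketch (illustrative; ``notify_user`` and the 60-second schedule are
    # hypothetical, not defined in this module). The decorator registers the
    # function under its dotted path and returns a TaskProxy, so calling it
    # schedules a Task row instead of running the function immediately:
    #
    #   @tasks.background(schedule=60)
    #   def notify_user(user_id):
    #       ...
    #
    #   notify_user(some_user.id)  # creates a Task due ~60 seconds from now
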
    def run_task(self, task_name, args=None, kwargs=None):
        # task_name can be either the name of a task or a Task instance.
        if isinstance(task_name, Task):
            task = task_name
            task_name = task.task_name
            # When we have a Task instance we do not need args and kwargs,
            # but they are kept for backward compatibility.
            args = []
            kwargs = {}
        else:
            task = None
        proxy_task = self._tasks[task_name]
        if app_settings.BACKGROUND_TASK_RUN_ASYNC:
            self._pool_runner(proxy_task, task, *args, **kwargs)
        else:
            self._bg_runner(proxy_task, task, *args, **kwargs)

    def run_next_task(self, queue=None):
        return self._runner.run_next_task(self, queue)

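# Roughly how this class is driven in practice (simplified, illustrative
# sketch; the real ``process_tasks`` management command shipped with this
# package adds signal handling, duration limits and a configurable sleep):
#
#   while True:
#       worked = tasks.run_next_task()
#       if not worked:
#           time.sleep(5)  # idle wait before polling the database again
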
class TaskSchedule(object):
    # What to do when an equivalent task already exists in the queue.
    SCHEDULE = 0              # always schedule a new task
    RESCHEDULE_EXISTING = 1   # update the existing task's run_at/priority
    CHECK_EXISTING = 2        # keep the existing task and create nothing new

    def __init__(self, run_at=None, priority=None, action=None):
        self._run_at = run_at
        self._priority = priority
        self._action = action

    @classmethod
    def create(cls, schedule):
        if isinstance(schedule, TaskSchedule):
            return schedule
        priority = None
        run_at = None
        action = None

        if schedule:
            if isinstance(schedule, (int, timedelta, datetime)):
                run_at = schedule
            else:
                run_at = schedule.get('run_at', None)
                priority = schedule.get('priority', None)
                action = schedule.get('action', None)

        return TaskSchedule(run_at=run_at, priority=priority, action=action)

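    # Accepted ``schedule`` forms (illustrative values):
    #   TaskSchedule.create(60)                       # run ~60 seconds from now
    #   TaskSchedule.create(timedelta(minutes=5))     # run 5 minutes from now
    #   TaskSchedule.create(datetime(2030, 1, 1))     # run at a fixed datetime
    #   TaskSchedule.create({'run_at': 60, 'priority': 2,
    #                        'action': TaskSchedule.RESCHEDULE_EXISTING})
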
    def merge(self, schedule):
        params = {}
        for name in ['run_at', 'priority', 'action']:
            attr_name = '_%s' % name
            value = getattr(self, attr_name, None)
            if value is None:
                params[name] = getattr(schedule, attr_name, None)
            else:
                params[name] = value
        return TaskSchedule(**params)

    @property
    def run_at(self):
        run_at = self._run_at or timezone.now()
        if isinstance(run_at, int):
            run_at = timezone.now() + timedelta(seconds=run_at)
        if isinstance(run_at, timedelta):
            run_at = timezone.now() + run_at
        return run_at

    @property
    def priority(self):
        return self._priority or 0

    @property
    def action(self):
        return self._action or TaskSchedule.SCHEDULE

    def __repr__(self):
        return 'TaskSchedule(run_at=%s, priority=%s)' % (self._run_at,
                                                         self._priority)

    def __eq__(self, other):
        return self._run_at == other._run_at \
            and self._priority == other._priority \
            and self._action == other._action

class DBTaskRunner(object):
    '''
    Encapsulate the model-related logic in here, in case
    we want to support different queues in the future.
    '''

    def __init__(self):
        self.worker_name = str(os.getpid())

    def schedule(self, task_name, args, kwargs, run_at=None,
                 priority=0, action=TaskSchedule.SCHEDULE, queue=None,
                 verbose_name=None, creator=None,
                 repeat=None, repeat_until=None, remove_existing_tasks=False):
        '''Simply create a task object in the database.'''
        task = Task.objects.new_task(task_name, args, kwargs, run_at, priority,
                                     queue, verbose_name, creator, repeat,
                                     repeat_until, remove_existing_tasks)
        if action != TaskSchedule.SCHEDULE:
            # An equivalent task may already be queued; depending on the
            # requested action, update it or skip creating a duplicate.
            task_hash = task.task_hash
            now = timezone.now()
            unlocked = Task.objects.unlocked(now)
            existing = unlocked.filter(task_hash=task_hash)
            if queue:
                existing = existing.filter(queue=queue)
            if action == TaskSchedule.RESCHEDULE_EXISTING:
                updated = existing.update(run_at=run_at, priority=priority)
                if updated:
                    return
            elif action == TaskSchedule.CHECK_EXISTING:
                if existing.count():
                    return

        task.save()
        signals.task_created.send(sender=self.__class__, task=task)
        return task

    def get_task_to_run(self, tasks, queue=None):
        try:
            # Only consider tasks registered with this Tasks instance, and
            # limit the candidates to the first five available.
            available_tasks = [task for task in Task.objects.find_available(queue)
                               if task.task_name in tasks._tasks][:5]
            for task in available_tasks:
                # Try to lock the task; another worker may grab it first.
                locked_task = task.lock(self.worker_name)
                if locked_task:
                    return locked_task
            return None
        except OperationalError:
            logger.warning('Failed to retrieve tasks. Database unreachable.')

    def run_task(self, tasks, task):
        logger.info('Running %s', task)
        tasks.run_task(task)

    def run_next_task(self, tasks, queue=None):
        task = self.get_task_to_run(tasks, queue)
        if task:
            self.run_task(tasks, task)
            return True
        else:
            return False

@python_2_unicode_compatible
class TaskProxy(object):
    def __init__(self, name, task_function, schedule, queue, remove_existing_tasks, runner):
        self.name = name
        # ``proxy.now(...)`` calls the wrapped function synchronously.
        self.now = self.task_function = task_function
        self.runner = runner
        self.schedule = TaskSchedule.create(schedule)
        self.queue = queue
        self.remove_existing_tasks = remove_existing_tasks

    def __call__(self, *args, **kwargs):
        # Scheduling options are popped from kwargs so that only the task's
        # own arguments are stored on the Task row.
        schedule = kwargs.pop('schedule', None)
        schedule = TaskSchedule.create(schedule).merge(self.schedule)
        run_at = schedule.run_at
        priority = kwargs.pop('priority', schedule.priority)
        action = schedule.action
        queue = kwargs.pop('queue', self.queue)
        verbose_name = kwargs.pop('verbose_name', None)
        creator = kwargs.pop('creator', None)
        repeat = kwargs.pop('repeat', None)
        repeat_until = kwargs.pop('repeat_until', None)
        remove_existing_tasks = kwargs.pop('remove_existing_tasks', self.remove_existing_tasks)

        return self.runner.schedule(self.name, args, kwargs, run_at, priority,
                                    action, queue, verbose_name, creator,
                                    repeat, repeat_until,
                                    remove_existing_tasks)

    def __str__(self):
        return 'TaskProxy(%s)' % self.name

tasks = Tasks()

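# Per-call overrides recognised by TaskProxy.__call__ (illustrative only;
# ``notify_user`` is a hypothetical task registered via ``@tasks.background``,
# and the values shown are examples, not defaults):
#
#   notify_user(user.id,
#               schedule=timedelta(hours=1),     # overrides the decorator schedule
#               priority=2,
#               queue='emails',
#               verbose_name='Notify a user',
#               repeat=24 * 60 * 60,             # repeat interval in seconds
#               repeat_until=timezone.now() + timedelta(days=7))
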
def autodiscover():
    """
    Autodiscover tasks.py files in much the same way as the admin app does.
    """
    import imp
    from django.conf import settings

    for app in settings.INSTALLED_APPS:
        try:
            app_path = import_module(app).__path__
        except (AttributeError, ImportError):
            continue
        try:
            # Only import the app's tasks module if it actually exists.
            imp.find_module('tasks', app_path)
        except ImportError:
            continue

        import_module("%s.tasks" % app)
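
# Typical wiring (illustrative; ``MyAppConfig`` is hypothetical): call
# autodiscover() once at startup, e.g. from an AppConfig.ready() hook, so that
# each installed app's tasks.py is imported and its @background tasks register
# themselves with the module-level ``tasks`` registry above.
#
#   from django.apps import AppConfig
#
#   class MyAppConfig(AppConfig):
#       name = 'myapp'
#
#       def ready(self):
#           autodiscover()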