django-background-tasks/background_task/models.py
2022-03-15 14:23:03 -07:00

448 lines
15 KiB
Python

# -*- coding: utf-8 -*-
import errno
import json
import logging
import os
import traceback
from datetime import timedelta
from hashlib import sha1
from io import StringIO

from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models import Q
from django.utils import timezone
from six import python_2_unicode_compatible

from background_task.exceptions import InvalidTaskError
from background_task.settings import app_settings
from background_task.signals import task_failed
from background_task.signals import task_rescheduled
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TaskQuerySet(models.QuerySet):
    """QuerySet for ``Task`` rows that can be narrowed down by creator."""

    def created_by(self, creator):
        """
        :param creator: the model instance stored as creator of the tasks
        :return: A Task queryset filtered by creator
        """
        # The creator is a generic relation, so it is matched on the pair
        # (content type of the instance, id of the instance).
        creator_lookup = {
            'creator_content_type': ContentType.objects.get_for_model(creator),
            'creator_object_id': creator.id,
        }
        return self.filter(**creator_lookup)
class TaskManager(models.Manager):
    """Manager for ``Task``: creation, lookup by hash and lock-aware queries."""

    def get_queryset(self):
        """
        :return: A ``TaskQuerySet`` for this manager's model and database
        """
        return TaskQuerySet(self.model, using=self._db)

    def created_by(self, creator):
        """
        :return: A Task queryset filtered by creator
        """
        return self.get_queryset().created_by(creator)

    def find_available(self, queue=None):
        """
        Find the tasks that can be run right now.

        A task qualifies when it is unlocked (see ``unlocked``), its
        ``run_at`` is not in the future and ``failed_at`` is not set.
        Results are ordered by priority (direction prefix taken from
        ``BACKGROUND_TASK_PRIORITY_ORDERING``) and then by ``run_at``.

        When ``BACKGROUND_TASK_RUN_ASYNC`` is enabled the result is limited
        to the number of free slots, computed from
        ``BACKGROUND_TASK_ASYNC_THREADS``; an empty queryset is returned
        when no slot is free.

        :param queue: optional queue name to restrict the search to
        :return: A Task queryset (possibly sliced)
        """
        now = timezone.now()
        qs = self.unlocked(now)
        if queue:
            qs = qs.filter(queue=queue)
        ready = qs.filter(run_at__lte=now, failed_at=None)
        _priority_ordering = '{}priority'.format(
            app_settings.BACKGROUND_TASK_PRIORITY_ORDERING)
        ready = ready.order_by(_priority_ordering, 'run_at')

        if app_settings.BACKGROUND_TASK_RUN_ASYNC:
            currently_failed = self.failed().count()
            currently_locked = self.locked(now).count()
            count = app_settings.BACKGROUND_TASK_ASYNC_THREADS - \
                (currently_locked - currently_failed)
            if count > 0:
                ready = ready[:count]
            else:
                ready = self.none()
        return ready

    def unlocked(self, now):
        """
        :param now: the reference time used to decide whether a lock expired
        :return: Tasks that are not locked, or whose lock is older than
                 ``BACKGROUND_TASK_MAX_RUN_TIME`` seconds
        """
        max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME
        qs = self.get_queryset()
        expires_at = now - timedelta(seconds=max_run_time)
        unlocked = Q(locked_by=None) | Q(locked_at__lt=expires_at)
        return qs.filter(unlocked)

    def locked(self, now):
        """
        :param now: the reference time used to decide whether a lock expired
        :return: Tasks that hold a lock which has not expired yet, i.e.
                 ``locked_by`` is set and ``locked_at`` is more recent than
                 ``BACKGROUND_TASK_MAX_RUN_TIME`` seconds ago
        """
        max_run_time = app_settings.BACKGROUND_TASK_MAX_RUN_TIME
        qs = self.get_queryset()
        expires_at = now - timedelta(seconds=max_run_time)
        # Both conditions must hold: a task whose lock has expired is
        # reported by `unlocked` and must not be counted as locked as well,
        # otherwise stale locks keep occupying slots in `find_available`.
        locked = Q(locked_by__isnull=False) & Q(locked_at__gt=expires_at)
        return qs.filter(locked)

    def failed(self):
        """
        `currently_locked - currently_failed` in `find_available` assumes that
        tasks marked as failed are also in processing by the running PID.

        :return: Tasks whose ``failed_at`` is set
        """
        qs = self.get_queryset()
        return qs.filter(failed_at__isnull=False)

    @staticmethod
    def _serialize_call(task_name, args=None, kwargs=None):
        """
        Build the stored parameters and the lookup hash of a task call.

        :return: A tuple ``(task_params, task_hash)``: the JSON encoded
                 ``(args, kwargs)`` pair and the sha1 hex digest of the task
                 name followed by that JSON string
        """
        args = args or ()
        kwargs = kwargs or {}
        # sort_keys makes the JSON (and therefore the hash) independent of
        # the order in which the kwargs were given.
        task_params = json.dumps((args, kwargs), sort_keys=True)
        s = "%s%s" % (task_name, task_params)
        task_hash = sha1(s.encode('utf-8')).hexdigest()
        return task_params, task_hash

    def new_task(self, task_name, args=None, kwargs=None,
                 run_at=None, priority=0, queue=None, verbose_name=None,
                 creator=None, repeat=None, repeat_until=None,
                 remove_existing_tasks=False):
        """
        Build a new, unsaved ``Task``.

        If `remove_existing_tasks` is True, all unlocked tasks with the identical task hash will be removed.
        The attributes `repeat` and `repeat_until` are not supported at the moment.

        :param run_at: when the task should run; defaults to the current time
        :return: A ``Task`` instance that has not been saved yet
        """
        if run_at is None:
            run_at = timezone.now()
        task_params, task_hash = self._serialize_call(task_name, args, kwargs)
        if remove_existing_tasks:
            Task.objects.filter(task_hash=task_hash,
                                locked_at__isnull=True).delete()
        return Task(task_name=task_name,
                    task_params=task_params,
                    task_hash=task_hash,
                    priority=priority,
                    run_at=run_at,
                    queue=queue,
                    verbose_name=verbose_name,
                    creator=creator,
                    repeat=repeat or Task.NEVER,
                    repeat_until=repeat_until,
                    )

    def get_task(self, task_name, args=None, kwargs=None):
        """
        :return: A Task queryset of the tasks whose hash matches the given
                 task name and parameters
        """
        _, task_hash = self._serialize_call(task_name, args, kwargs)
        qs = self.get_queryset()
        return qs.filter(task_hash=task_hash)

    def drop_task(self, task_name, args=None, kwargs=None):
        """
        Delete the tasks matching the given task name and parameters.

        :return: The result of the queryset ``delete()`` call
        """
        return self.get_task(task_name, args, kwargs).delete()
@python_2_unicode_compatible
class Task(models.Model):
    """
    A task waiting to be run (or currently being run) by a worker.

    Stores the task name, its JSON encoded parameters, scheduling and
    repetition settings, the failure history and the current lock.
    """
    # the "name" of the task/function to be run
    task_name = models.CharField(max_length=190, db_index=True)
    # the json encoded parameters to pass to the task
    task_params = models.TextField()
    # a sha1 hash of the name and params, to lookup already scheduled tasks
    task_hash = models.CharField(max_length=40, db_index=True)

    verbose_name = models.CharField(max_length=255, null=True, blank=True)

    # what priority the task has
    priority = models.IntegerField(default=0, db_index=True)
    # when the task should be run
    run_at = models.DateTimeField(db_index=True)

    # Repeat choices are encoded as number of seconds
    # The repeat implementation is based on this encoding
    HOURLY = 3600
    DAILY = 24 * HOURLY
    WEEKLY = 7 * DAILY
    EVERY_2_WEEKS = 2 * WEEKLY
    EVERY_4_WEEKS = 4 * WEEKLY
    NEVER = 0
    REPEAT_CHOICES = (
        (HOURLY, 'hourly'),
        (DAILY, 'daily'),
        (WEEKLY, 'weekly'),
        (EVERY_2_WEEKS, 'every 2 weeks'),
        (EVERY_4_WEEKS, 'every 4 weeks'),
        (NEVER, 'never'),
    )
    repeat = models.BigIntegerField(choices=REPEAT_CHOICES, default=NEVER)
    repeat_until = models.DateTimeField(null=True, blank=True)

    # the "name" of the queue this is to be run on
    queue = models.CharField(max_length=190, db_index=True,
                             null=True, blank=True)

    # how many times the task has been tried
    attempts = models.IntegerField(default=0, db_index=True)
    # when the task last failed
    failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
    # details of the error that occurred
    last_error = models.TextField(blank=True)

    # details of who's trying to run the task at the moment
    locked_by = models.CharField(max_length=64, db_index=True,
                                 null=True, blank=True)
    locked_at = models.DateTimeField(db_index=True, null=True, blank=True)

    creator_content_type = models.ForeignKey(
        ContentType, null=True, blank=True,
        related_name='background_task', on_delete=models.CASCADE
    )
    creator_object_id = models.PositiveIntegerField(null=True, blank=True)
    creator = GenericForeignKey('creator_content_type', 'creator_object_id')

    objects = TaskManager()

    def locked_by_pid_running(self):
        """
        Check if the locked_by process is still running.

        :return: ``None`` if the task is not locked, ``True`` if a process
                 with the PID stored in ``locked_by`` exists, ``False``
                 otherwise (also when ``locked_by`` is not a usable PID)
        """
        if not self.locked_by:
            return None
        try:
            # won't kill the process: signal 0 only performs the existence
            # and permission checks, nothing is delivered
            os.kill(int(self.locked_by), 0)
        except OSError as exc:
            # EPERM: the process exists but we may not signal it (e.g. it
            # runs as another user), so it is running. Any other error
            # (typically ESRCH) means there is no such process.
            return exc.errno == errno.EPERM
        except (TypeError, ValueError, OverflowError):
            # locked_by does not hold a valid PID
            return False
        return True
    locked_by_pid_running.boolean = True

    def has_error(self):
        """
        Check if the last_error field is non-empty.

        :return: ``True`` if an error has been recorded, ``False`` otherwise
        """
        return bool(self.last_error)
    has_error.boolean = True

    def params(self):
        """
        Decode the stored task parameters.

        :return: A tuple ``(args, kwargs)`` decoded from ``task_params``
        """
        args, kwargs = json.loads(self.task_params)
        # need to coerce kwargs keys to str
        kwargs = dict((str(k), v) for k, v in kwargs.items())
        return args, kwargs

    def lock(self, locked_by):
        """
        Try to acquire the lock on this task for ``locked_by``.

        The lock is taken with a single UPDATE restricted to the rows that
        are currently unlocked, so it only succeeds if nobody else holds a
        non-expired lock.

        :param locked_by: identifier of the worker taking the lock
        :return: A freshly loaded ``Task`` if the lock was acquired,
                 ``None`` otherwise
        """
        now = timezone.now()
        unlocked = Task.objects.unlocked(now).filter(pk=self.pk)
        updated = unlocked.update(locked_by=locked_by, locked_at=now)
        if updated:
            return Task.objects.get(pk=self.pk)
        return None

    def _extract_error(self, type, err, tb):
        """
        :return: The formatted exception (including traceback) as a string
        """
        buf = StringIO()
        traceback.print_exception(type, err, tb, None, buf)
        return buf.getvalue()

    def increment_attempts(self):
        """Increase the attempt counter by one and save the task."""
        self.attempts += 1
        self.save()

    def has_reached_max_attempts(self):
        """
        :return: ``True`` if ``attempts`` is at least
                 ``BACKGROUND_TASK_MAX_ATTEMPTS``
        """
        max_attempts = app_settings.BACKGROUND_TASK_MAX_ATTEMPTS
        return self.attempts >= max_attempts

    def is_repeating_task(self):
        """
        :return: ``True`` if ``repeat`` is greater than ``NEVER``
        """
        return self.repeat > self.NEVER

    def reschedule(self, type, err, traceback):
        '''
        Set a new time to run the task in future, or create a CompletedTask and delete the Task
        if it has reached the maximum of allowed attempts

        :param type: the exception type
        :param err: the exception instance; an ``InvalidTaskError`` marks the
                    task as failed regardless of the number of attempts
        :param traceback: the traceback of the exception
        '''
        self.last_error = self._extract_error(type, err, traceback)
        self.increment_attempts()
        if self.has_reached_max_attempts() or isinstance(err, InvalidTaskError):
            self.failed_at = timezone.now()
            logger.warning('Marking task %s as failed', self)
            completed = self.create_completed_task()
            task_failed.send(sender=self.__class__,
                             task_id=self.id, completed_task=completed)
            self.delete()
        else:
            # The delay grows with the fourth power of the attempt count.
            backoff = timedelta(seconds=(self.attempts ** 4) + 5)
            self.run_at = timezone.now() + backoff
            logger.warning('Rescheduling task %s for %s later at %s', self,
                           backoff, self.run_at)
            task_rescheduled.send(sender=self.__class__, task=self)
            # Release the lock so the task can be picked up again. This only
            # happens on this branch: the other one has deleted the task.
            self.locked_by = None
            self.locked_at = None
            self.save()

    def create_completed_task(self):
        '''
        Returns a new CompletedTask instance with the same values

        The CompletedTask is saved before it is returned; its ``run_at``
        is set to the current time.
        '''
        completed_task = CompletedTask(
            task_name=self.task_name,
            task_params=self.task_params,
            task_hash=self.task_hash,
            priority=self.priority,
            run_at=timezone.now(),
            queue=self.queue,
            attempts=self.attempts,
            failed_at=self.failed_at,
            last_error=self.last_error,
            locked_by=self.locked_by,
            locked_at=self.locked_at,
            verbose_name=self.verbose_name,
            creator=self.creator,
            repeat=self.repeat,
            repeat_until=self.repeat_until,
        )
        completed_task.save()
        return completed_task

    def create_repetition(self):
        """
        :return: A new Task with an offset of self.repeat, or None if the self.repeat_until is reached
        """
        if not self.is_repeating_task():
            return None

        now = timezone.now()
        if self.repeat_until and self.repeat_until <= now:
            # Repeat chain completed
            return None

        args, kwargs = self.params()
        interval = timedelta(seconds=self.repeat)
        new_run_at = self.run_at + interval
        # Skip the occurrences that already lie in the past so that the
        # repetition is scheduled at the next upcoming slot.
        while new_run_at < now:
            new_run_at += interval

        new_task = TaskManager().new_task(
            task_name=self.task_name,
            args=args,
            kwargs=kwargs,
            run_at=new_run_at,
            priority=self.priority,
            queue=self.queue,
            verbose_name=self.verbose_name,
            creator=self.creator,
            repeat=self.repeat,
            repeat_until=self.repeat_until,
        )
        new_task.save()
        return new_task

    def save(self, *arg, **kw):
        """Save the task, storing an empty ``locked_by`` as NULL."""
        # force NULL rather than empty string
        self.locked_by = self.locked_by or None
        return super(Task, self).save(*arg, **kw)

    def __str__(self):
        return u'{}'.format(self.verbose_name or self.task_name)

    class Meta:
        db_table = 'background_task'
class CompletedTaskQuerySet(models.QuerySet):
    """QuerySet for ``CompletedTask`` rows with creator and outcome filters."""

    def created_by(self, creator):
        """
        :param creator: the model instance stored as creator of the tasks
        :return: A CompletedTask queryset filtered by creator
        """
        creator_lookup = {
            'creator_content_type': ContentType.objects.get_for_model(creator),
            'creator_object_id': creator.id,
        }
        return self.filter(**creator_lookup)

    def failed(self, within=None):
        """
        :param within: A timedelta object
        :return: A queryset of CompletedTasks that failed within the given timeframe (e.g. less than 1h ago)
        """
        failed_tasks = self.filter(failed_at__isnull=False)
        if not within:
            return failed_tasks
        # Only keep the tasks that failed after the start of the timeframe.
        earliest = timezone.now() - within
        return failed_tasks.filter(failed_at__gt=earliest)

    def succeeded(self, within=None):
        """
        :param within: A timedelta object
        :return: A queryset of CompletedTasks that completed successfully within the given timeframe
        (e.g. less than 1h ago)
        """
        succeeded_tasks = self.filter(failed_at__isnull=True)
        if not within:
            return succeeded_tasks
        # Successful tasks carry no failure time, so the timeframe is
        # applied to run_at instead.
        earliest = timezone.now() - within
        return succeeded_tasks.filter(run_at__gt=earliest)
@python_2_unicode_compatible
class CompletedTask(models.Model):
    """
    Record of a task that is no longer scheduled.

    Carries the same fields as ``Task``; ``failed_at`` is set when the task
    ended in failure.
    """
    # the "name" of the task/function to be run
    task_name = models.CharField(max_length=190, db_index=True)
    # the json encoded parameters to pass to the task
    task_params = models.TextField()
    # a sha1 hash of the name and params, to lookup already scheduled tasks
    task_hash = models.CharField(max_length=40, db_index=True)

    verbose_name = models.CharField(max_length=255, null=True, blank=True)

    # what priority the task has
    priority = models.IntegerField(default=0, db_index=True)
    # when the task should be run
    run_at = models.DateTimeField(db_index=True)

    repeat = models.BigIntegerField(
        choices=Task.REPEAT_CHOICES, default=Task.NEVER)
    repeat_until = models.DateTimeField(null=True, blank=True)

    # the "name" of the queue this is to be run on
    queue = models.CharField(max_length=190, db_index=True,
                             null=True, blank=True)

    # how many times the task has been tried
    attempts = models.IntegerField(default=0, db_index=True)
    # when the task last failed
    failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
    # details of the error that occurred
    last_error = models.TextField(blank=True)

    # details of who's trying to run the task at the moment
    locked_by = models.CharField(max_length=64, db_index=True,
                                 null=True, blank=True)
    locked_at = models.DateTimeField(db_index=True, null=True, blank=True)

    creator_content_type = models.ForeignKey(
        ContentType, null=True, blank=True,
        related_name='completed_background_task', on_delete=models.CASCADE
    )
    creator_object_id = models.PositiveIntegerField(null=True, blank=True)
    creator = GenericForeignKey('creator_content_type', 'creator_object_id')

    objects = CompletedTaskQuerySet.as_manager()

    def locked_by_pid_running(self):
        """
        Check if the locked_by process is still running.

        :return: ``None`` if no ``locked_by`` is recorded, ``True`` if a
                 process with the PID stored in ``locked_by`` exists,
                 ``False`` otherwise (also when ``locked_by`` is not a
                 usable PID)
        """
        if not self.locked_by:
            return None
        try:
            # won't kill the process: signal 0 only performs the existence
            # and permission checks, nothing is delivered
            os.kill(int(self.locked_by), 0)
        except OSError as exc:
            # EPERM: the process exists but we may not signal it (e.g. it
            # runs as another user), so it is running. Any other error
            # (typically ESRCH) means there is no such process.
            return exc.errno == errno.EPERM
        except (TypeError, ValueError, OverflowError):
            # locked_by does not hold a valid PID
            return False
        return True
    locked_by_pid_running.boolean = True

    def has_error(self):
        """
        Check if the last_error field is non-empty.

        :return: ``True`` if an error has been recorded, ``False`` otherwise
        """
        return bool(self.last_error)
    has_error.boolean = True

    def __str__(self):
        return u'{} - {}'.format(
            self.verbose_name or self.task_name,
            self.run_at,
        )