extensions-website/common/model_mixins.py

190 lines
5.9 KiB
Python

from typing import Set, Tuple, Mapping, Any
import copy
import logging
from django.contrib.contenttypes.models import ContentType
from django.db import models, transaction
from django.shortcuts import reverse
from django.utils import timezone
from .log_entries import attach_log_entry
from common import markdown
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Maps a tracked field name to the value that field had on the database copy
# of the instance (see how TrackChangesMixin.pre_save_record() builds it).
OldStateType = Mapping[str, Any]
"""Type declaration for the old state of a model instance.
See TrackChangesMixin.pre_save_record().
"""
class CreatedModifiedMixin(models.Model):
    """Abstract model providing creation and modification timestamps."""

    class Meta:
        abstract = True

    # Filled in once, when the row is first created (auto_now_add).
    date_created = models.DateTimeField(auto_now_add=True)
    # Refreshed on every save (auto_now).
    date_modified = models.DateTimeField(auto_now=True)

    def get_logentries_url(self) -> str:
        """Return the admin log entry changelist URL filtered to this object.

        The URL carries two query filters: the content type of this
        instance's class and this instance's primary key.
        """
        # Function-scope import kept from the original; presumably it avoids
        # an import cycle -- TODO confirm.
        import utils

        filters = {
            'content_type__id__exact': ContentType.objects.get_for_model(self.__class__).pk,
            'object_id__exact': self.pk,
        }
        app_label, model = 'admin', 'logentry'
        changelist_path = reverse(f'admin:{app_label}_{model}_changelist')
        return '?'.join((changelist_path, utils.urlencode(filters)))
class TrackChangesMixin(models.Model):
    """Tracks changes of Django models.

    Tracks which fields have changed in the save() function, so that
    the model can send signals upon changes in fields.

    Only acts on fields listed in self.track_changes_to_fields.
    """

    class Meta:
        abstract = True

    # Names of the fields whose changes are tracked. Only annotated here, so
    # concrete models are expected to assign it.
    track_changes_to_fields: Set[str]

    def _compare(self, old_instance: object) -> bool:
        """Returns True if model fields have changed.

        Only checks fields listed in self.track_changes_to_fields.

        :param old_instance: object to compare this instance against.
        """
        # Ellipsis is the "attribute missing" marker: a field present on only
        # one side compares unequal, a field missing on both compares equal.
        return any(
            getattr(old_instance, field, ...) != getattr(self, field, ...)
            for field in self.track_changes_to_fields
        )

    def pre_save_record(self) -> Tuple[bool, OldStateType]:
        """Tracks the previous state of this object.

        Only records fields listed in self.track_changes_to_fields.

        :returns: (was changed, old state) tuple. The old state is empty when
            the instance has no primary key or no matching database row.
        """
        if not self.pk:
            return True, {}

        model = type(self)
        try:
            # Look up by ``pk`` rather than ``id`` so that models whose primary
            # key field is not named ``id`` are supported as well.
            db_instance = model.objects.get(pk=self.pk)
        except model.DoesNotExist:
            return True, {}

        was_modified = self._compare(db_instance)
        # Deep-copy so later mutation of the values cannot alter the snapshot.
        old_instance_data = {
            attr: copy.deepcopy(getattr(db_instance, attr))
            for attr in self.track_changes_to_fields
        }
        return was_modified, old_instance_data

    def record_status_change(self, was_changed, old_state, **kwargs):
        """Stamp status-related dates and record the change.

        When the previous status is known and differs from ``self.status``,
        ``date_status_changed`` is set to the current time; if the new status
        is ``self.STATUSES.APPROVED``, ``date_approved`` is set as well.
        Finally the change is handed to record_change().

        Does nothing when ``was_changed`` is false or the instance has no pk.

        :param was_changed: first item of the pre_save_record() result.
        :param old_state: second item of the pre_save_record() result.
        :param kwargs: may carry ``update_fields``.
        """
        if not was_changed or not self.pk:
            return

        # NOTE(review): rebinding kwargs['update_fields'] below only changes
        # this method's own kwargs dict; the caller's set is not modified, and
        # record_change() in this class does not read it.
        update_fields = kwargs.get('update_fields')
        old_status = old_state.get('status', '')
        now = timezone.now()
        if old_status and old_status != self.status:
            self.date_status_changed = now
            if update_fields is not None:
                kwargs['update_fields'] = kwargs['update_fields'].union({'date_status_changed'})
            if self.status == self.STATUSES.APPROVED:
                self.date_approved = now
                if update_fields is not None:
                    kwargs['update_fields'] = kwargs['update_fields'].union({'date_approved'})

        self.record_change(was_changed, old_state, **kwargs)

    def record_change(self, was_changed, old_state, **kwargs):
        """Attach a log entry describing which tracked fields changed.

        Does nothing when ``was_changed`` is false or the instance has no pk.

        :param was_changed: first item of the pre_save_record() result.
        :param old_state: mapping of field name to its previous value.
        :param kwargs: accepted for signature compatibility; not used here.
        """
        if not was_changed or not self.pk:
            return

        # Sorted so that the logged field order is deterministic.
        changed_fields = sorted(
            field for field in old_state if getattr(self, field) != old_state[field]
        )

        new_state = {}
        for field in changed_fields:
            # Use the get_<field>_display() value when the model provides one.
            display = getattr(self, f'get_{field}_display', None)
            new_state[field] = display() if display is not None else getattr(self, field)

        message = [
            {
                'changed': {
                    'name': str(self._meta.verbose_name),
                    'object': repr(self),
                    'fields': changed_fields,
                    'old_state': old_state,
                    'new_state': new_state,
                },
            }
        ]
        attach_log_entry(self, message)

    def sanitize(self, field_name, was_changed, old_state, **kwargs):
        """Sanitize the value of ``field_name`` if it is set and has changed.

        The current value is passed through markdown.sanitize() and written
        back to the same attribute.

        Does nothing when ``was_changed`` is false or the instance has no pk.

        :param field_name: name of the attribute to sanitize.
        :param was_changed: first item of the pre_save_record() result.
        :param old_state: mapping of field name to its previous value.
        :param kwargs: may carry ``update_fields``.
        """
        if not was_changed or not self.pk:
            return

        update_fields = kwargs.get('update_fields')
        old_value = old_state.get(field_name)
        # Operate on the requested field instead of a hard-coded ``name``.
        new_value = getattr(self, field_name)
        if new_value and old_value != new_value:
            setattr(self, field_name, markdown.sanitize(new_value))
            if update_fields is not None:
                # NOTE(review): this only rebinds the local kwargs entry; the
                # caller's update_fields is left untouched.
                kwargs['update_fields'] = kwargs['update_fields'].union({field_name})
class SoftDeleteMixin(models.Model):
    """Model with soft-deletion functionality."""

    class Meta:
        abstract = True

    # Time of the soft-deletion; None while the object is not deleted.
    date_deleted = models.DateTimeField(null=True, blank=True, editable=False)

    @property
    def is_deleted(self) -> bool:
        """Return True if ``date_deleted`` is set."""
        return self.date_deleted is not None

    @transaction.atomic
    def delete(self, hard=False):
        """Soft-delete this object, or delete it for real when ``hard`` is set.

        A soft delete stamps ``date_deleted`` with the current time, saves the
        instance and, if the instance has a ``file`` attribute, calls its
        ``delete()`` too. A hard delete defers to the parent ``delete()``.

        :param hard: when true, delete via the parent class instead of
            soft-deleting.
        """
        # Django's Model.delete() resets the instance's primary key to None,
        # so remember it up front for the log line below.
        pk = self.pk
        if hard:
            super().delete()
        else:
            self.date_deleted = timezone.now()
            self.save()
            if hasattr(self, 'file'):
                # .file should always exist but we don't want to break delete regardless
                self.file.delete()
        logger.warning('%r pk=%r deleted', self.__class__, pk)

    def delete_queryset(self, request, queryset):
        """Given a queryset, soft-delete it from the database.

        Sets ``date_deleted`` through a single ``queryset.update()``; the
        per-instance delete() above is not called.

        :param request: not used by this method.
        :param queryset: queryset of objects to mark as deleted.
        """
        queryset.update(date_deleted=timezone.now())

    def undelete(self, save=True):
        """Clear the soft-deletion mark of this object.

        Logs a warning and returns without changes if the object is not
        deleted.

        :param save: when true, save the instance after clearing the mark.
        """
        if not self.date_deleted:
            logger.warning('%r pk=%r is not deleted, cannot undelete', self.__class__, self.pk)
            return

        self.date_deleted = None
        if save:
            self.save()
        # Report the operation that actually happened ("undeleted").
        logger.warning('%r pk=%r undeleted', self.__class__, self.pk)