190 lines
5.9 KiB
Python
190 lines
5.9 KiB
Python
from typing import Set, Tuple, Mapping, Any
|
|
import copy
|
|
import logging
|
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.db import models, transaction
|
|
from django.shortcuts import reverse
|
|
from django.utils import timezone
|
|
|
|
from .log_entries import attach_log_entry
|
|
from common import markdown
|
|
|
|
|
|
logger = logging.getLogger(__name__)

#: Type declaration for the old state of a model instance: a mapping of
#: tracked field name to the value that field had before saving.
#:
#: See TrackChangesMixin.pre_save_record().
OldStateType = Mapping[str, Any]
|
|
|
|
|
|
class CreatedModifiedMixin(models.Model):
    """Abstract model providing creation and modification timestamps."""

    class Meta:
        abstract = True

    # Filled in automatically when the row is first created (auto_now_add).
    date_created = models.DateTimeField(auto_now_add=True)
    # Refreshed automatically on every save (auto_now).
    date_modified = models.DateTimeField(auto_now=True)

    def get_logentries_url(self) -> str:
        """Build the admin URL listing the log entries of this object.

        :returns: path of the admin ``logentry`` changelist, with a query
            string filtering on this object's content type and primary key.
        """
        # Imported at call time rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        import utils

        own_content_type = ContentType.objects.get_for_model(self.__class__)
        filters = {
            'content_type__id__exact': own_content_type.pk,
            'object_id__exact': self.pk,
        }

        admin_app, log_model = 'admin', 'logentry'
        changelist_path = reverse(f'admin:{admin_app}_{log_model}_changelist')
        encoded_filters = utils.urlencode(filters)
        return f'{changelist_path}?{encoded_filters}'
|
|
|
|
|
|
class TrackChangesMixin(models.Model):
    """Tracks changes of Django models.

    Tracks which fields have changed in the save() function, so that
    the model can send signals upon changes in fields.

    Only acts on fields listed in self.track_changes_to_fields.
    """

    class Meta:
        abstract = True

    # Names of the fields to watch. Only annotated here, so every concrete
    # model using this mixin has to assign it.
    track_changes_to_fields: Set[str]

    def _compare(self, old_instance: object) -> bool:
        """Returns True if model fields have changed.

        Only checks fields listed in self.track_changes_to_fields.

        :param old_instance: the object to compare this instance against.
        """
        # Ellipsis serves as the "attribute is missing" sentinel, so that a
        # missing attribute still differs from a real value such as None.
        return any(
            getattr(old_instance, field, ...) != getattr(self, field, ...)
            for field in self.track_changes_to_fields
        )

    def pre_save_record(self) -> Tuple[bool, OldStateType]:
        """Tracks the previous state of this object.

        Only records fields listed in self.track_changes_to_fields.

        An instance without a primary key, or whose row cannot be found in
        the database, is reported as changed with an empty old state.

        :returns: (was changed, old state) tuple.
        """
        if not self.pk:
            return True, {}

        model = type(self)
        try:
            # Query by `pk` rather than `id`, so that models declaring a
            # differently named primary key field work as well.
            db_instance = model.objects.get(pk=self.pk)
        except model.DoesNotExist:
            return True, {}

        was_modified = self._compare(db_instance)
        # Deep copies keep the snapshot independent of db_instance.
        old_instance_data = {
            attr: copy.deepcopy(getattr(db_instance, attr))
            for attr in self.track_changes_to_fields
        }
        return was_modified, old_instance_data

    def record_status_change(self, was_changed, old_state, **kwargs):
        """Update the status date fields, then record the change.

        Does nothing when nothing changed or the instance has no primary key.
        When the previous status was non-empty and differs from the current
        one, date_status_changed is set to the current time; if the new
        status is STATUSES.APPROVED, date_approved is set as well.

        Reads self.status and self.STATUSES, which this mixin does not
        define: the concrete model has to provide them.

        :param was_changed: first item of the pre_save_record() result.
        :param old_state: second item of the pre_save_record() result.
        :param kwargs: may carry `update_fields`; forwarded to record_change().
        """
        if not was_changed or not self.pk:
            return

        update_fields = kwargs.get('update_fields')

        old_status = old_state.get('status', '')
        now = timezone.now()
        if old_status and old_status != self.status:
            self.date_status_changed = now
            # NOTE(review): this rebinds the entry in this call's own kwargs
            # dict only; the caller's `update_fields` object is not modified.
            if update_fields is not None:
                kwargs['update_fields'] = kwargs['update_fields'].union({'date_status_changed'})
            if self.status == self.STATUSES.APPROVED:
                self.date_approved = now
                if update_fields is not None:
                    kwargs['update_fields'] = kwargs['update_fields'].union({'date_approved'})

        self.record_change(was_changed, old_state, **kwargs)

    def record_change(self, was_changed, old_state, **kwargs):
        """Attach a log entry describing which fields have changed.

        Does nothing when nothing changed or the instance has no primary key.

        :param was_changed: first item of the pre_save_record() result.
        :param old_state: second item of the pre_save_record() result; its
            keys are the candidate fields compared against this instance.
        :param kwargs: accepted for call compatibility, not used here.
        """
        if not was_changed or not self.pk:
            return

        # Sorted, so the logged field order does not depend on set ordering.
        changed_fields = sorted(
            field for field in old_state if getattr(self, field) != old_state[field]
        )

        new_state = {}
        for field in changed_fields:
            # Use the get_<field>_display() value when the instance has one.
            display_getter = getattr(self, f'get_{field}_display', None)
            if display_getter is not None:
                new_state[field] = display_getter()
            else:
                new_state[field] = getattr(self, field)

        message = [
            {
                'changed': {
                    'name': str(self._meta.verbose_name),
                    'object': repr(self),
                    'fields': changed_fields,
                    'old_state': old_state,
                    'new_state': new_state,
                },
            }
        ]
        attach_log_entry(self, message)

    def sanitize(self, field_name, was_changed, old_state, **kwargs):
        """Pass a changed field through markdown.sanitize().

        Does nothing when nothing changed or the instance has no primary key.
        The field is rewritten only when its current value is truthy and
        differs from the value recorded in old_state.

        :param field_name: name of the attribute to sanitize.
        :param was_changed: first item of the pre_save_record() result.
        :param old_state: second item of the pre_save_record() result.
        :param kwargs: may carry `update_fields`.
        """
        if not was_changed or not self.pk:
            return

        update_fields = kwargs.get('update_fields')

        old_value = old_state.get(field_name)
        new_value = getattr(self, field_name)
        if new_value and old_value != new_value:
            setattr(self, field_name, markdown.sanitize(new_value))
            # NOTE(review): this rebinds the entry in this call's own kwargs
            # dict only; the caller's `update_fields` object is not modified.
            if update_fields is not None:
                kwargs['update_fields'] = kwargs['update_fields'].union({field_name})
|
|
|
|
|
|
class SoftDeleteMixin(models.Model):
    """Model with soft-deletion functionality."""

    class Meta:
        abstract = True

    # NULL while the object is not deleted; delete() stores the deletion time.
    date_deleted = models.DateTimeField(null=True, blank=True, editable=False)

    @property
    def is_deleted(self) -> bool:
        """Tell whether this object is marked as deleted."""
        return self.date_deleted is not None

    @transaction.atomic
    def delete(self, hard=False):
        """Delete this object, softly unless `hard` is set.

        A soft deletion stores the current time in date_deleted, saves the
        object and, if the object has a `file` attribute, calls delete() on
        that too. A hard deletion defers to the parent class' delete().

        :param hard: when True, really delete instead of marking as deleted.
        """
        if hard:
            super().delete()
        else:
            self.date_deleted = timezone.now()
            self.save()
            if hasattr(self, 'file'):
                # .file should always exist but we don't want to break delete regardless
                self.file.delete()

        logger.warning('%r pk=%r deleted', self.__class__, self.pk)

    def delete_queryset(self, request, queryset):
        """Given a queryset, soft-delete it from the database.

        Sets date_deleted through a single queryset.update() call; the
        delete() method of the individual objects is not invoked.

        :param request: not used.
        :param queryset: the objects to mark as deleted.
        """
        queryset.update(date_deleted=timezone.now())

    def undelete(self, save=True):
        """Clear the deletion mark of a soft-deleted object.

        Logs a warning and does nothing when the object is not deleted.

        :param save: when True, save the object after clearing date_deleted.
        """
        if not self.date_deleted:
            logger.warning('%r pk=%r is not deleted, cannot undelete', self.__class__, self.pk)
            return
        self.date_deleted = None
        if save:
            self.save()

        logger.warning('%r pk=%r undeleted', self.__class__, self.pk)
|