extensions-website/common/model_mixins.py
Anna Sirota b80b49645d Record changes: better way to compare object states
Using plain `getattr` for comparing model instance fields results in
hard-to-debug issues, because some field values compare as unequal even
though they are effectively equal (one empty `ImageFileField` != another
instance of an empty `ImageFileField`). Using Django's built-in
serialiser doesn't lead to this kind of problem.

The change to how `date_deletion_requested` is set is due to the above
introducing a situation where a `DateTimeField` is used **before**
it's saved to the DB, read back from it and deserialised to a Python `datetime`,
which causes `AttributeError: 'str' object has no attribute 'isoformat'`.
See https://code.djangoproject.com/ticket/28356
2024-04-23 18:38:41 +02:00

186 lines
6.4 KiB
Python

from typing import Set, Tuple, Mapping, Any
import logging
from django.contrib.admin.models import DELETION
from django.contrib.contenttypes.models import ContentType
from django.core import serializers
from django.db import models
from django.shortcuts import reverse
from django.utils import timezone
from .log_entries import attach_log_entry
from common import markdown
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Alias for a mapping of field name -> field value; it is the second element
# of the tuple returned by TrackChangesMixin.pre_save_record().
OldStateType = Mapping[str, Any]
"""Type declaration for the old state of a model instance.
See TrackChangesMixin.pre_save_record().
"""
def _get_object_state(obj: object, fields=None, include_pk=False) -> dict:
    """Return the field values of ``obj`` as a plain dictionary.

    The instance is passed through Django's ``python`` serializer and the
    ``fields`` mapping of the resulting record is returned.

    :param obj: model instance to take a snapshot of.
    :param fields: forwarded to the serializer as its ``fields`` option.
    :param include_pk: when true, the record's primary key is additionally
        stored in the result under the ``'pk'`` key.
    :returns: mapping of field name to serialized value.
    """
    records = serializers.serialize('python', [obj], fields=fields)
    record = records[0]
    state = record['fields']
    if include_pk:
        state['pk'] = record['pk']
    return state
class CreatedModifiedMixin(models.Model):
    """Abstract base model that carries creation/modification timestamps."""

    class Meta:
        abstract = True

    # Set automatically when the object is first created (auto_now_add).
    date_created = models.DateTimeField(auto_now_add=True)
    # Set automatically every time the object is saved (auto_now).
    date_modified = models.DateTimeField(auto_now=True)

    def get_logentries_url(self) -> str:
        """Return the admin changelist URL of log entries for this object.

        The URL is the admin ``logentry`` changelist with a query string
        that filters on this object's content type id and primary key.
        """
        # Imported here rather than at module level, as in the original code.
        import utils

        filters = {
            'content_type__id__exact': ContentType.objects.get_for_model(type(self)).pk,
            'object_id__exact': self.pk,
        }
        app_label, model = 'admin', 'logentry'
        changelist_url = reverse(f'admin:{app_label}_{model}_changelist')
        encoded_filters = utils.urlencode(filters)
        return f'{changelist_url}?{encoded_filters}'
class RecordDeletionMixin:
    """Mixin that can log the deletion of the object it is mixed into."""

    def record_deletion(self):
        """Create a LogEntry describing a deletion of this object.

        The entry stores the verbose name, the ``repr()`` and the full
        serialized state (primary key included) of the object.
        """
        log_args = {'type': type(self), 'pk': self.pk}
        logger.info('Deleting %(type)s pk=%(pk)s', log_args)

        if hasattr(self, 'cannot_be_deleted_reasons'):
            reasons = self.cannot_be_deleted_reasons
            if len(reasons) > 0:
                # This shouldn't happen: prior validation steps should have taken care of this.
                log_args['reasons'] = reasons
                logger.error("%(type)s pk=%(pk)s is being deleted but it %(reasons)s", log_args)

        # Snapshot the state first, then describe the object.
        snapshot = _get_object_state(self, include_pk=True)
        details = {
            'name': str(self._meta.verbose_name),
            'object': repr(self),
            'old_state': snapshot,
        }
        attach_log_entry(self, [{'deleted': details}], action_flag=DELETION)
class TrackChangesMixin(RecordDeletionMixin, models.Model):
    """Tracks changes of Django models.

    Tracks which fields have changed in the save() function, so that
    the model can send signals upon changes in fields.

    Only acts on fields listed in self.track_changes_to_fields.
    """

    class Meta:
        abstract = True

    # Names of the fields that are watched for changes.
    track_changes_to_fields: Set[str]

    def _was_modified(self, old_instance: object, update_fields=None) -> bool:
        """Returns True if the record is modified.

        Only checks fields listed in self.track_changes_to_fields.

        :param old_instance: the instance to compare this one against.
        :param update_fields: optional collection of field names; tracked
            fields not contained in it are skipped.
        """
        # NOTE(review): this compares raw attribute values, while
        # record_change() compares serialized states; the two may disagree
        # for some field types -- confirm this is intended.
        for field in self.track_changes_to_fields:
            # If update_fields was given and this field was NOT in it,
            # its value definitely won't be changed:
            if update_fields is not None and field not in update_fields:
                continue
            # Ellipsis is the stand-in value for a missing attribute.
            old_val = getattr(old_instance, field, ...)
            new_val = getattr(self, field, ...)
            if old_val != new_val:
                return True
        return False

    def pre_save_record(self, *args, **kwargs) -> Tuple[bool, OldStateType]:
        """Tracks the previous state of this object.

        Only records fields listed in self.track_changes_to_fields.

        :param kwargs: only ``update_fields`` is read; it is forwarded to
            the modification check.
        :returns: (was changed, old state) tuple. An object without a
            primary key, or one that cannot be found in the database, is
            reported as changed with an empty old state.
        """
        if not self.pk:
            return True, {}

        try:
            # Look up by `pk` rather than `id`, so that models with a
            # differently named primary key field work too.
            db_instance = type(self).objects.get(pk=self.pk)
        except type(self).DoesNotExist:
            return True, {}

        update_fields = kwargs.get('update_fields')
        was_modified = self._was_modified(db_instance, update_fields=update_fields)
        old_instance_data = _get_object_state(db_instance, fields=self.track_changes_to_fields)
        return was_modified, old_instance_data

    def record_status_change(self, was_changed, old_state, **kwargs):
        """Update status-related dates, then record the change.

        Does nothing when ``was_changed`` is false or the object has no
        primary key.

        :param was_changed: first element of the pre_save_record() result.
        :param old_state: second element of the pre_save_record() result.
        :param kwargs: only ``update_fields`` is read; everything is
            forwarded to record_change().
        """
        if not was_changed or not self.pk:
            return

        update_fields = kwargs.get('update_fields')
        old_status = old_state.get('status', '')
        now = timezone.now()
        touched_fields = set()

        if old_status and old_status != self.status:
            self.date_status_changed = now
            touched_fields.add('date_status_changed')

        # NOTE(review): this runs for every recorded change while the status
        # is APPROVED, not only on the transition to APPROVED -- confirm.
        if self.status == self.STATUSES.APPROVED:
            self.date_approved = now
            touched_fields.add('date_approved')

        if update_fields is not None and touched_fields:
            # `update_fields` may be any iterable (e.g. a list), so it is
            # converted to a set instead of calling `.union()` on it.
            # NOTE: this rebinds the entry in this method's own kwargs only;
            # the caller's `update_fields` is not affected.
            kwargs['update_fields'] = set(update_fields) | touched_fields

        self.record_change(was_changed, old_state, **kwargs)

    def record_change(self, was_changed, old_state, **kwargs):
        """Attach a log entry describing which tracked fields changed.

        Does nothing when ``was_changed`` is false or the object has no
        primary key.

        :param was_changed: first element of the pre_save_record() result.
        :param old_state: second element of the pre_save_record() result;
            only its keys are considered when looking for changes.
        :param kwargs: accepted for call compatibility; not used.
        """
        if not was_changed or not self.pk:
            return

        new_state = _get_object_state(self, fields=self.track_changes_to_fields)
        # Sorted, so that the logged list of fields has a stable order.
        changed_fields = sorted(
            field for field in old_state if new_state.get(field) != old_state.get(field)
        )

        message = [
            {
                'changed': {
                    'name': str(self._meta.verbose_name),
                    'object': repr(self),
                    'fields': changed_fields,
                    'old_state': old_state,
                    # Use the get_<field>_display() value when the model
                    # has such a method, the raw attribute otherwise.
                    'new_state': {
                        field: getattr(self, f'get_{field}_display')()
                        if hasattr(self, f'get_{field}_display')
                        else getattr(self, field)
                        for field in changed_fields
                    },
                },
            }
        ]
        attach_log_entry(self, message)

    def sanitize(self, field_name, was_changed, old_state, **kwargs):
        """Sanitize the value of ``field_name`` if it differs from the old one.

        Does nothing when ``was_changed`` is false or the object has no
        primary key. Otherwise, a non-empty value that differs from
        ``old_state[field_name]`` is replaced by ``markdown.sanitize(value)``.

        :param field_name: name of the attribute to sanitize.
        :param was_changed: first element of the pre_save_record() result.
        :param old_state: second element of the pre_save_record() result.
        :param kwargs: only ``update_fields`` is read.
        :raises AttributeError: if this object has no ``field_name`` attribute.
        """
        if not was_changed or not self.pk:
            return

        update_fields = kwargs.get('update_fields')
        old_value = old_state.get(field_name)
        new_value = getattr(self, field_name)
        if new_value and old_value != new_value:
            setattr(self, field_name, markdown.sanitize(new_value))
            if update_fields is not None:
                # NOTE: this rebinds the entry in this method's own kwargs
                # only; the caller's `update_fields` is not affected.
                kwargs['update_fields'] = set(update_fields) | {field_name}