extensions-website/common/model_mixins.py
Anna Sirota 8124bc62ae Log deleted extension data (#84)
When an extension or a version is deleted, create a `LogEntry` for each record linked to that extension or version.
Each `LogEntry` has action flag `DELETION` and contains a blob with the last known field values of the records.
These `LogEntry` records can be viewed by admins at `/admin/admin/logentry/`, like any other `LogEntry` created by Django admin, so this should be sufficient for archival purposes.

Part of #82

Reviewed-on: #84
2024-04-19 16:25:49 +02:00

186 lines
6.3 KiB
Python

from typing import Set, Tuple, Mapping, Any
import copy
import logging
from django.contrib.admin.models import DELETION
from django.contrib.contenttypes.models import ContentType
from django.core import serializers
from django.db import models
from django.shortcuts import reverse
from django.utils import timezone
from .log_entries import attach_log_entry
from common import markdown
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Alias for a mapping from a field name to that field's previous value.
OldStateType = Mapping[str, Any]
"""Type declaration for the old state of a model instance.
See TrackChangesMixin.pre_save_record().
"""
class CreatedModifiedMixin(models.Model):
    """Abstract model contributing creation and modification timestamps.

    Also provides a link to the admin list of log entries recorded for an instance.
    """

    class Meta:
        abstract = True

    # Set once, when the record is first created (auto_now_add).
    date_created = models.DateTimeField(auto_now_add=True)
    # Refreshed every time the record is saved (auto_now).
    date_modified = models.DateTimeField(auto_now=True)

    def get_logentries_url(self) -> str:
        """Return the admin LogEntry changelist URL filtered down to this object.

        The filter matches on this model's content type and on this instance's
        primary key.
        """
        # Imported at call time rather than at module level
        # (presumably to avoid an import cycle -- verify).
        import utils

        app_label = 'admin'
        model = 'logentry'
        filters = {
            'content_type__id__exact': ContentType.objects.get_for_model(self.__class__).pk,
            'object_id__exact': self.pk,
        }
        path = reverse(f'admin:{app_label}_{model}_changelist')
        query = utils.urlencode(filters)
        return f'{path}?{query}'
class RecordDeletionMixin:
    """Mixin that records a deletion LogEntry holding the object's last field values."""

    def serialise(self) -> dict:
        """Return this object's serialised fields, with its primary key added under 'pk'."""
        record = serializers.serialize('python', [self])[0]
        fields = record['fields']
        fields['pk'] = record['pk']
        return fields

    def record_deletion(self):
        """Create a LogEntry describing a deletion of this object.

        Logs the deletion, logs an error if the object reports reasons why it
        cannot be deleted, then attaches a log entry flagged as DELETION that
        contains the serialised state of the object.
        """
        msg_args = {'type': type(self), 'pk': self.pk}
        logger.info('Deleting %(type)s pk=%(pk)s', msg_args)

        if hasattr(self, 'cannot_be_deleted_reasons'):
            reasons = self.cannot_be_deleted_reasons
            if len(reasons) > 0:
                # Not expected to happen: earlier validation should have
                # prevented the deletion. The deletion is still recorded below.
                msg_args['reasons'] = reasons
                logger.error("%(type)s pk=%(pk)s is being deleted but it %(reasons)s", msg_args)

        # Capture the field values before building the log message.
        old_state = self.serialise()
        details = {
            'name': str(self._meta.verbose_name),
            'object': repr(self),
            'old_state': old_state,
        }
        attach_log_entry(self, [{'deleted': details}], action_flag=DELETION)
class TrackChangesMixin(RecordDeletionMixin, models.Model):
    """Tracks changes of Django models.

    Tracks which fields have changed in the save() function, so that
    the model can send signals upon changes in fields.

    Only acts on fields listed in self.track_changes_to_fields.
    """

    class Meta:
        abstract = True

    # Names of the fields to watch; only annotated here, so the concrete
    # model is expected to provide the actual value.
    track_changes_to_fields: Set[str]

    def _was_modified(self, old_instance: object, update_fields=None) -> bool:
        """Returns True if the record is modified.

        Only checks fields listed in self.track_changes_to_fields.

        :param old_instance: object carrying the previous field values.
        :param update_fields: optional collection of field names being saved;
            tracked fields that are not in it are skipped.
        """
        for field in self.track_changes_to_fields:
            # If update_fields was given and this field was NOT in it,
            # its value definitely won't be changed:
            if update_fields is not None and field not in update_fields:
                continue
            # Ellipsis is the placeholder for an attribute missing on either side.
            old_val = getattr(old_instance, field, ...)
            new_val = getattr(self, field, ...)
            if old_val != new_val:
                return True
        return False

    def pre_save_record(self, *args, **kwargs) -> Tuple[bool, OldStateType]:
        """Tracks the previous state of this object.

        Only records fields listed in self.track_changes_to_fields.

        An object without a primary key, or one that is not found in the
        database, is reported as changed with an empty old state.

        :param kwargs: may contain ``update_fields``, which narrows the
            comparison to the listed fields.
        :returns: (was changed, old state) tuple.
        """
        if not self.pk:
            return True, {}
        try:
            # Look up by ``pk`` so this also works when the primary key
            # field is not named ``id``.
            db_instance = type(self).objects.get(pk=self.pk)
        except type(self).DoesNotExist:
            return True, {}
        update_fields = kwargs.get('update_fields')
        was_modified = self._was_modified(db_instance, update_fields=update_fields)
        # Deep copies, so later mutation of the stored values cannot alter
        # the recorded old state.
        old_instance_data = {
            attr: copy.deepcopy(getattr(db_instance, attr)) for attr in self.track_changes_to_fields
        }
        return was_modified, old_instance_data

    def record_status_change(self, was_changed, old_state, **kwargs):
        """Stamp status-related dates on this object, then record the change.

        Does nothing when the object was not changed or has no primary key.

        :param was_changed: first item of the tuple returned by pre_save_record().
        :param old_state: second item of the tuple returned by pre_save_record().
        :param kwargs: may contain ``update_fields`` (any iterable of names).
        """
        if not was_changed or not self.pk:
            return
        update_fields = kwargs.get('update_fields')
        old_status = old_state.get('status', '')
        now = timezone.now()
        # NOTE(review): ``kwargs`` is this call's own dict, so the additions
        # to ``update_fields`` below are not visible to the caller -- confirm
        # that callers include these date fields when saving with update_fields.
        if old_status and old_status != self.status:
            self.date_status_changed = now
            if update_fields is not None:
                # set() first: update_fields may be a list or tuple, which has no .union().
                kwargs['update_fields'] = set(kwargs['update_fields']) | {'date_status_changed'}
        # NOTE(review): as written, date_approved is refreshed on every recorded
        # change while the status is APPROVED -- confirm this is intended
        # (as opposed to only on the transition to APPROVED).
        if self.status == self.STATUSES.APPROVED:
            self.date_approved = now
            if update_fields is not None:
                kwargs['update_fields'] = set(kwargs['update_fields']) | {'date_approved'}
        self.record_change(was_changed, old_state, **kwargs)

    def record_change(self, was_changed, old_state, **kwargs):
        """Attach a log entry listing the tracked fields whose values changed.

        Does nothing when the object was not changed or has no primary key.
        The entry carries the names of the changed fields, the whole old state
        and the new values; a new value is taken from ``get_<field>_display()``
        when the object has such a method.
        """
        if not was_changed or not self.pk:
            return
        # Sorted so that the logged field order is the same on every run.
        changed_fields = sorted(
            field for field in old_state if getattr(self, field) != old_state[field]
        )
        message = [
            {
                'changed': {
                    'name': str(self._meta.verbose_name),
                    'object': repr(self),
                    'fields': changed_fields,
                    'old_state': old_state,
                    'new_state': {
                        field: getattr(self, f'get_{field}_display')()
                        if hasattr(self, f'get_{field}_display')
                        else getattr(self, field)
                        for field in changed_fields
                    },
                },
            }
        ]
        attach_log_entry(self, message)

    def sanitize(self, field_name, was_changed, old_state, **kwargs):
        """Sanitize the value of ``field_name`` if it is set and differs from its old value.

        Does nothing when the object was not changed or has no primary key.
        The cleaned value, produced by ``markdown.sanitize``, is written back
        to the same attribute.

        :param field_name: name of the attribute to sanitize.
        :param was_changed: first item of the tuple returned by pre_save_record().
        :param old_state: second item of the tuple returned by pre_save_record().
        :param kwargs: may contain ``update_fields`` (any iterable of names).
        """
        if not was_changed or not self.pk:
            return
        update_fields = kwargs.get('update_fields')
        old_value = old_state.get(field_name)
        # Operate on the requested field rather than on a fixed attribute.
        new_value = getattr(self, field_name)
        if new_value and old_value != new_value:
            setattr(self, field_name, markdown.sanitize(new_value))
            if update_fields is not None:
                # NOTE(review): only this call's own ``kwargs`` is updated;
                # the caller's update_fields is unaffected -- confirm callers
                # do not rely on it.
                kwargs['update_fields'] = set(kwargs['update_fields']) | {field_name}