Anna Sirota
8124bc62ae
When an extension or a version is deleted, create a `LogEntry` for each record linked to that extension or version. Each `LogEntry` has action flag `DELETION` and contains a blob with the last known field values of the records. These `LogEntry` can be viewed by admins in `/admin/admin/logentry/`, same as any other `LogEntry` created by Django admin, so this should be sufficient for archival purposes. Part of #82 Reviewed-on: #84
186 lines
6.3 KiB
Python
186 lines
6.3 KiB
Python
from typing import Set, Tuple, Mapping, Any
|
|
import copy
|
|
import logging
|
|
|
|
from django.contrib.admin.models import DELETION
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core import serializers
|
|
from django.db import models
|
|
from django.shortcuts import reverse
|
|
from django.utils import timezone
|
|
|
|
from .log_entries import attach_log_entry
|
|
from common import markdown
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
OldStateType = Mapping[str, Any]
|
|
"""Type declaration for the old state of a model instance.
|
|
|
|
See TrackChangesMixin.pre_save_record().
|
|
"""
|
|
|
|
|
|
class CreatedModifiedMixin(models.Model):
|
|
"""Add standard date fields to a model."""
|
|
|
|
class Meta:
|
|
abstract = True
|
|
|
|
date_created = models.DateTimeField(auto_now_add=True)
|
|
date_modified = models.DateTimeField(auto_now=True)
|
|
|
|
def get_logentries_url(self) -> str:
|
|
import utils
|
|
|
|
content_type_id = ContentType.objects.get_for_model(self.__class__).pk
|
|
object_id = self.pk
|
|
app_label = 'admin'
|
|
model = 'logentry'
|
|
path = reverse(f'admin:{app_label}_{model}_changelist')
|
|
query = utils.urlencode(
|
|
{
|
|
'content_type__id__exact': content_type_id,
|
|
'object_id__exact': object_id,
|
|
}
|
|
)
|
|
return f'{path}?{query}'
|
|
|
|
|
|
class RecordDeletionMixin:
|
|
def serialise(self) -> dict:
|
|
data = serializers.serialize('python', [self])[0]
|
|
data['fields']['pk'] = data['pk']
|
|
return data['fields']
|
|
|
|
def record_deletion(self):
|
|
"""Create a LogEntry describing a deletion of this object."""
|
|
msg_args = {'type': type(self), 'pk': self.pk}
|
|
logger.info('Deleting %(type)s pk=%(pk)s', msg_args)
|
|
if hasattr(self, 'cannot_be_deleted_reasons'):
|
|
cannot_be_deleted_reasons = self.cannot_be_deleted_reasons
|
|
if len(cannot_be_deleted_reasons) > 0:
|
|
# This shouldn't happen: prior validation steps should have taken care of this.
|
|
msg_args['reasons'] = cannot_be_deleted_reasons
|
|
logger.error("%(type)s pk=%(pk)s is being deleted but it %(reasons)s", msg_args)
|
|
state = self.serialise()
|
|
message = [
|
|
{
|
|
'deleted': {
|
|
'name': str(self._meta.verbose_name),
|
|
'object': repr(self),
|
|
'old_state': state,
|
|
},
|
|
}
|
|
]
|
|
attach_log_entry(self, message, action_flag=DELETION)
|
|
|
|
|
|
class TrackChangesMixin(RecordDeletionMixin, models.Model):
|
|
"""Tracks changes of Django models.
|
|
|
|
Tracks which fields have changed in the save() function, so that
|
|
the model can send signals upon changes in fields.
|
|
|
|
Only acts on fields listed in self.track_changes_to_fields.
|
|
"""
|
|
|
|
class Meta:
|
|
abstract = True
|
|
|
|
track_changes_to_fields: Set[str]
|
|
|
|
def _was_modified(self, old_instance: object, update_fields=None) -> bool:
|
|
"""Returns True if the record is modified.
|
|
|
|
Only checks fields listed in self.track_changes_to_fields.
|
|
"""
|
|
for field in self.track_changes_to_fields:
|
|
# If update_fields was given and this field was NOT in it,
|
|
# its value definitely won't be changed:
|
|
if update_fields is not None and field not in update_fields:
|
|
continue
|
|
old_val = getattr(old_instance, field, ...)
|
|
new_val = getattr(self, field, ...)
|
|
if old_val != new_val:
|
|
return True
|
|
return False
|
|
|
|
def pre_save_record(self, *args, **kwargs) -> Tuple[bool, OldStateType]:
|
|
"""Tracks the previous state of this object.
|
|
|
|
Only records fields listed in self.track_changes_to_fields.
|
|
|
|
:returns: (was changed, old state) tuple.
|
|
"""
|
|
if not self.pk:
|
|
return True, {}
|
|
|
|
try:
|
|
db_instance = type(self).objects.get(id=self.pk)
|
|
except type(self).DoesNotExist:
|
|
return True, {}
|
|
|
|
update_fields = kwargs.get('update_fields')
|
|
was_modified = self._was_modified(db_instance, update_fields=update_fields)
|
|
old_instance_data = {
|
|
attr: copy.deepcopy(getattr(db_instance, attr)) for attr in self.track_changes_to_fields
|
|
}
|
|
return was_modified, old_instance_data
|
|
|
|
def record_status_change(self, was_changed, old_state, **kwargs):
|
|
if not was_changed or not self.pk:
|
|
return
|
|
|
|
update_fields = kwargs.get('update_fields')
|
|
|
|
old_status = old_state.get('status', '')
|
|
now = timezone.now()
|
|
if old_status and old_status != self.status:
|
|
self.date_status_changed = now
|
|
if update_fields is not None:
|
|
kwargs['update_fields'] = kwargs['update_fields'].union({'date_status_changed'})
|
|
if self.status == self.STATUSES.APPROVED:
|
|
self.date_approved = now
|
|
if update_fields is not None:
|
|
kwargs['update_fields'] = kwargs['update_fields'].union({'date_approved'})
|
|
|
|
self.record_change(was_changed, old_state, **kwargs)
|
|
|
|
def record_change(self, was_changed, old_state, **kwargs):
|
|
if not was_changed or not self.pk:
|
|
return
|
|
|
|
changed_fields = {
|
|
field for field in old_state.keys() if getattr(self, field) != old_state[field]
|
|
}
|
|
message = [
|
|
{
|
|
'changed': {
|
|
'name': str(self._meta.verbose_name),
|
|
'object': repr(self),
|
|
'fields': list(changed_fields),
|
|
'old_state': old_state,
|
|
'new_state': {
|
|
field: getattr(self, f'get_{field}_display')()
|
|
if hasattr(self, f'get_{field}_display')
|
|
else getattr(self, field)
|
|
for field in changed_fields
|
|
},
|
|
},
|
|
}
|
|
]
|
|
attach_log_entry(self, message)
|
|
|
|
def sanitize(self, field_name, was_changed, old_state, **kwargs):
|
|
if not was_changed or not self.pk:
|
|
return
|
|
|
|
update_fields = kwargs.get('update_fields')
|
|
|
|
old_name = old_state.get(field_name)
|
|
if self.name and old_name != self.name:
|
|
self.name = markdown.sanitize(self.name)
|
|
if update_fields is not None:
|
|
kwargs['update_fields'] = kwargs['update_fields'].union({'name'})
|