Anna Sirota
918f63ec4b
Otherwise profile image doesn't update when expected because CDN caches that URL.
158 lines
5.2 KiB
Python
158 lines
5.2 KiB
Python
from pathlib import Path
|
|
from typing import Optional
|
|
import logging
|
|
import time
|
|
|
|
from django.contrib.admin.utils import NestedObjects
|
|
from django.contrib.auth.models import AbstractUser
|
|
from django.db import models, DEFAULT_DB_ALIAS, transaction
|
|
from django.templatetags.static import static
|
|
from django.utils.dateparse import parse_datetime
|
|
|
|
from common.model_mixins import TrackChangesMixin
|
|
from files.utils import get_sha256_from_value
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def user_image_upload_to(instance, filename):
    """Build the storage path for a user's profile image.

    The result has the form ``avatars/<first 2 hash chars>/<hash><extension>``, where
    the hash is what ``get_sha256_from_value`` returns for the user's primary key
    concatenated with ``filename``, and the extension is taken from ``filename``.

    The instance must already have a primary key.
    """
    assert instance.pk
    # The file name comes from the Blender ID avatar URL, which is itself based on
    # the file hash, so combining it with the pk is unique enough.
    digest = get_sha256_from_value(str(instance.pk) + filename)
    hashed_location = Path('avatars/', digest[:2], digest)
    return hashed_location.with_suffix(Path(filename).suffix)
|
|
|
|
|
|
def shortuid() -> str:
    """Generate a 14-characters long hexadecimal string ID based on time.

    The ID is the current wall-clock time since the Unix epoch, counted in units
    of 100 nanoseconds and rendered as lowercase hex without the ``0x`` prefix.

    Wall-clock time is used instead of ``time.monotonic()`` because the reference
    point of the monotonic clock is undefined (usually time since boot): the length
    of the result would depend on uptime and values could repeat after a reboot
    or on another host.
    """
    # 100 ns ticks since the epoch fit exactly 14 hex digits (until year ~2198).
    return f'{time.time_ns() // 100:x}'
|
|
|
|
|
|
class User(TrackChangesMixin, AbstractUser):
    """User account with profile data and an account deletion/anonymization workflow."""

    # Names of the fields whose changes should be tracked
    # (presumably consumed by TrackChangesMixin -- confirm against the mixin).
    track_changes_to_fields = {
        'is_superuser',
        'is_staff',
        'date_deletion_requested',
        'confirmed_email_at',
        'full_name',
        'email',
        'is_subscribed_to_notification_emails',
    }

    class Meta:
        db_table = 'auth_user'

    email = models.EmailField(blank=False, null=False, unique=True)
    full_name = models.CharField(max_length=255, blank=True, default='')
    image = models.ImageField(upload_to=user_image_upload_to, blank=True, null=True)
    badges = models.JSONField(null=True, blank=True)

    date_deletion_requested = models.DateTimeField(null=True, blank=True)
    confirmed_email_at = models.DateTimeField(null=True, blank=True)

    is_subscribed_to_notification_emails = models.BooleanField(null=False, default=True)

    def __str__(self) -> str:
        return self.username

    @property
    def image_url(self) -> Optional[str]:
        """Return a URL of the Profile image, or of the blank placeholder if none is set."""
        if not self.image:
            return static('common/images/blank-profile-pic.png')

        return self.image.url

    def _get_nested_objects_collector(self) -> NestedObjects:
        """Collect the objects related to this user on the default database."""
        collector = NestedObjects(using=DEFAULT_DB_ALIAS)
        collector.collect([self])
        return collector

    @property
    def can_be_deleted(self) -> bool:
        """Fetch objects referencing this profile and determine if it can be deleted.

        Staff and superuser accounts can never be deleted, neither can accounts
        that have protected objects referencing them.
        """
        if self.is_staff or self.is_superuser:
            return False
        return not self._get_nested_objects_collector().protected

    def request_deletion(self, date_deletion_requested):
        """Store date of the deletion request and deactivate the user.

        Logs an error and does nothing if the account cannot be deleted.

        `date_deletion_requested` is a date/time string that is parsed with Django's
        `parse_datetime`. Raises ValueError if it cannot be parsed, in which case
        the account is left untouched.
        """
        if not self.can_be_deleted:
            logger.error('Deletion requested for a protected account pk=%s, ignoring', self.pk)
            return

        # Parse before changing anything: parse_datetime returns None for malformed
        # input, and saving that would deactivate the account without recording
        # the request, so anonymize_or_delete would never process it.
        requested_at = parse_datetime(date_deletion_requested)
        if requested_at is None:
            raise ValueError(
                f'Unable to parse deletion request date {date_deletion_requested!r}'
                f' for pk={self.pk}'
            )

        logger.warning(
            'Deletion of pk=%s requested on %s, deactivating this account',
            self.pk,
            date_deletion_requested,
        )
        self.is_active = False
        self.date_deletion_requested = requested_at
        self.is_subscribed_to_notification_emails = False
        self.save(
            update_fields=[
                'is_active',
                'date_deletion_requested',
                'is_subscribed_to_notification_emails',
            ]
        )

    @transaction.atomic
    def anonymize_or_delete(self):
        """Delete user completely if they don't have public records, otherwise anonymize.

        Does nothing if the account cannot be deleted or
        if deletion hasn't been explicitly requested earlier.
        """
        if not self.can_be_deleted:
            logger.error(
                'User.anonymize_or_delete called, but pk=%s cannot be deleted',
                self.pk,
            )
            return

        if not self.date_deletion_requested:
            logger.error(
                "User.anonymize_or_delete called, but deletion of pk=%s hasn't been requested",
                self.pk,
            )
            return

        if self.image:
            # Best effort: a failure to remove the file must not block the deletion.
            try:
                self.image.delete(save=False)
            except Exception:
                logger.exception(
                    'Unable to delete image %s for pk=%s',
                    self.image.name,
                    self.pk,
                )

        has_listed_records = self.extensions.listed.exists() or self.files.listed.exists()
        if not has_listed_records:
            # Simply delete the user if there are no publicly listed records linked to it
            user_pk = self.pk  # the instance loses its pk once deleted
            self.delete()
            logger.warning('Deleted user pk=%s', user_pk)
        else:
            # Replace personal data with placeholders, keeping the linked records intact
            username = f'del{shortuid()}'
            self.__class__.objects.filter(pk=self.pk).update(
                email=f'{username}@example.com',
                full_name='',
                username=username,
                badges=None,
                is_active=False,
                image=None,
            )
            logger.warning('Anonymized user pk=%s', self.pk)

    @property
    def is_moderator(self) -> bool:
        """Tell if the user belongs to the 'moderators' group.

        Used to review and approve extensions.
        """
        # Keeps iterating over groups.all() so that already prefetched groups
        # can be reused instead of issuing a separate filtered query.
        return any(group.name == 'moderators' for group in self.groups.all())
|