blender-id/bid_main/models.py
Oleg Komarov 2fec3f32cf UserSession: add terminate method
use method docstring to clarify the relation between session objects
2024-08-05 13:45:32 +02:00

697 lines
23 KiB
Python

from typing import Optional, Set
import itertools
import logging
import os.path
import re
from django import urls
from django.conf import settings
from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager
from django.contrib.auth.models import PermissionsMixin
from django.contrib.sessions.models import Session
from django.core import validators
from django.core.exceptions import ValidationError
from django.core.mail import send_mail
from django.db import models, transaction
from django.db.models import Q
from django.templatetags.static import static
from django.utils import timezone
from django.utils.deconstruct import deconstructible
from django.utils.translation import gettext_lazy as _
import oauth2_provider.models as oa2_models
import user_agents
from . import fields
from . import hashers
import bid_main.file_utils
import bid_main.utils
log = logging.getLogger(__name__)
nickname_illegal_chars = re.compile(r"[^\w.+-]")
strip_email = re.compile("@.*$")
sizes = {
"s": 64,
"m": 128,
"l": 256,
}
def random_nums():
"""Increasingly larger random number generator."""
import random
lower, upper = 1, 5
while True:
yield random.randint(lower, upper - 1)
lower, upper = upper, upper * 3
class RoleManager(models.Manager):
def badges(self) -> models.QuerySet:
"""Query for those roles that are considered badges."""
return (
self.filter(is_public=True, is_active=True, is_badge=True)
.exclude(badge_img__isnull=True)
.exclude(badge_img="")
)
class Role(models.Model):
name = models.CharField(max_length=80)
description = models.CharField(
max_length=255,
blank=True,
null=False,
help_text="Note that this is shown for badges on users' dashboard page.",
)
is_active = models.BooleanField(default=True, null=False)
is_badge = models.BooleanField(
default=False,
null=False,
help_text="Note that a roles is only actually used as a badge when this checkbox "
"is enabled <strong>and</strong> it has a badge image.",
)
is_public = models.BooleanField(
default=True,
null=False,
help_text="When enabled, this role/badge will be readable through the userinfo API.",
)
prevent_user_deletion = models.BooleanField(
default=False,
null=False,
help_text="When enabled, user accounts with this role/badge cannot be deleted.",
)
may_manage_roles = models.ManyToManyField(
"Role",
related_name="managers",
blank=True,
help_text="Users with this role will be able to grant or revoke these roles to "
"any other user.",
)
# For Badges:
label = models.CharField(
max_length=255,
blank=True,
null=False,
help_text="Human-readable name for a badge. Required for badges, not for roles.",
)
badge_img = models.ImageField(
verbose_name="Badge image",
help_text="Visual representation of a badge.",
upload_to="badges",
height_field="badge_img_height",
width_field="badge_img_width",
null=True,
blank=True,
)
badge_img_height = models.IntegerField(null=True, blank=True)
badge_img_width = models.IntegerField(null=True, blank=True)
link = models.URLField(
null=True,
blank=True,
help_text="Clicking on a badge image will lead to this link.",
)
objects = RoleManager()
class Meta:
ordering = ["-is_active", "name"]
def __str__(self):
if self.is_active:
return self.name
return "%s [inactive]" % self.name
@property
def admin_url(self) -> str:
view_name = f"admin:{self._meta.app_label}_{self._meta.model_name}_change"
return urls.reverse(view_name, args=(self.id,))
def clean(self):
# Labels are required for badges.
if self.is_badge and not self.label:
raise ValidationError({"label": _("Badges must have a label.")})
@property
def _thumbnail_size_list(self) -> list:
return [(px, px) for px in sizes.values()]
@property
def thumbnails(self) -> dict:
"""Return a dictionary with relative URLs of a set of predefined thumbnails."""
base_url, ext = os.path.splitext(self.badge_img.url)
return {
size_string: {
'url': f'{base_url}_{size_in_px}x{size_in_px}.png',
'width': size_in_px,
'height': size_in_px,
}
for size_string, size_in_px in sizes.items()
}
class UserManager(BaseUserManager):
"""UserManager that doesn't use a username, but an email instead."""
use_in_migrations = True
def _create_user(self, email, password, **extra_fields):
"""Creates and saves a User with the given email and password."""
if not email:
raise ValueError("The given email must be set")
email = self.normalize_email(email)
user = self.model(email=email, **extra_fields)
user.set_password(password)
user.save(using=self._db)
return user
def create_user(self, email, password=None, **extra_fields):
extra_fields.setdefault("is_staff", False)
extra_fields.setdefault("is_superuser", False)
return self._create_user(email, password, **extra_fields)
def create_superuser(self, email, password, **extra_fields):
extra_fields.setdefault("is_staff", True)
extra_fields.setdefault("is_superuser", True)
if extra_fields.get("is_staff") is not True:
raise ValueError("Superuser must have is_staff=True.")
if extra_fields.get("is_superuser") is not True:
raise ValueError("Superuser must have is_superuser=True.")
return self._create_user(email, password, **extra_fields)
def get_by_natural_key(self, username):
"""Look up case-insensitively otherwise people can't login with their mixed-case emails."""
return self.get(**{f'{self.model.USERNAME_FIELD}__iexact': username.lower()})
@deconstructible
class UnicodeNicknameValidator(validators.RegexValidator):
regex = r"^[\w.+-]+$"
message = _(
"Enter a valid nickname. This value may contain only letters, "
"numbers, and ./+/-/_ characters."
)
def avatar_upload_to(instance, filename):
"""Generate path for storing an avatar image based on its hash."""
file_field = instance.avatar
return bid_main.file_utils.get_upload_path('avatar', file_field)
class User(AbstractBaseUser, PermissionsMixin):
"""Implementing a fully featured User model with admin-compliant permissions.
Email and password are required. Other fields are optional.
"""
email = models.EmailField(
_("email address"),
max_length=64,
unique=True,
help_text=_("Required. 64 characters or fewer."),
error_messages={
"unique": _("A user with that email address already exists."),
},
db_index=True,
)
full_name = models.CharField(_("full name"), max_length=80, blank=True, db_index=True)
# Named 'nickname' and not 'username', because the latter already is a
# 'thing' in Django. Not having one, and identifying users by their email
# address, was our own customisation.
# Bringing back a field 'username' with different semantics would be
# confusing. For example, there still is a field 'username' in the default
# Django auth form that is used for 'the thing used to log in', which is
# the email address in our case.
nickname = models.CharField(
"nickname",
max_length=80,
unique=True,
help_text=_(
"A short (one-word) name that can be used to address you. "
"80 characters or fewer. Letters, digits, and ./+/-/_ only."
),
validators=[UnicodeNicknameValidator()],
error_messages={
"unique": _("That name is already used by someone else."),
},
)
avatar = models.ImageField(
upload_to=avatar_upload_to,
null=True,
blank=True,
)
roles = models.ManyToManyField(Role, related_name="users", blank=True)
public_roles_as_string = models.CharField(
"Public roles as string",
max_length=255,
blank=True,
default="",
help_text=_("String representation of public roles, for comparison in webhooks"),
)
private_badges = models.ManyToManyField(
Role,
blank=True,
limit_choices_to={"is_badge": True},
help_text='Badges marked as "private" by the user. Can contain badges '
"they do not have at the moment; this is fine.",
)
confirmed_email_at = models.DateTimeField(
null=True,
blank=True,
help_text=_(
"Designates the date & time at which the user confirmed their email address. "
"None if not yet confirmed."
),
)
last_login_ip = models.GenericIPAddressField(
null=True,
blank=True,
help_text=_("IP address (IPv4 or IPv6) used for previous login, if any."),
)
current_login_ip = models.GenericIPAddressField(
null=True,
blank=True,
help_text=_("IP address (IPv4 or IPv6) used for current login, if any."),
)
login_count = models.PositiveIntegerField(
default=0, blank=True, help_text=_("Number of times this user logged in.")
)
is_staff = models.BooleanField(
_("staff status"),
default=False,
help_text=_("Designates whether the user can log into this admin site."),
)
is_active = models.BooleanField(
_("active"),
default=True,
help_text=_(
"Designates whether this user should be treated as active. "
"Unselect this instead of deleting accounts."
),
)
date_deletion_requested = models.DateTimeField(
null=True,
blank=True,
help_text=_(
"Indicates when deletion of this account was requested, if ever."
" Once set, it should not be changed. Can be set on the users list."
),
)
date_joined = models.DateTimeField(_("date joined"), default=timezone.now)
last_update = models.DateTimeField(_("last update"), default=timezone.now)
privacy_policy_agreed = models.DateTimeField(
_("privacy policy agreed"),
null=True,
blank=True,
help_text=_("Date when this user agreed to our privacy policy."),
)
email_change_preconfirm = models.EmailField(
_("email address to change to"),
blank=True,
max_length=64,
help_text=_("New address for the user, set while in the confirmation flow."),
)
objects = UserManager()
USERNAME_FIELD = "email"
REQUIRED_FIELDS = []
class Meta:
verbose_name = _("user")
verbose_name_plural = _("users")
def __repr__(self) -> str:
return f"<User id={self.id} email={self.email!r}>"
def save(self, *args, **kwargs):
self.last_update = timezone.now()
updated_fields = {"last_update"}
if kwargs.get("update_fields") is not None:
kwargs["update_fields"] = set(kwargs["update_fields"]).union(updated_fields)
return super().save(*args, **kwargs)
def public_roles(self) -> set:
"""Returns public role names.
Used in the bid_api.signals module to detect role changes without
using more lookups in the database.
"""
return {role.name for role in self.roles.filter(is_public=True, is_active=True)}
def public_badges(self) -> models.query.QuerySet:
"""Returns a QuerySet of public badges.
Public badges are those badges that are marked as public in the database
and not marked private by the user.
"""
private_badge_ids = {role.id for role in self.private_badges.all()}
return self.all_badges().exclude(id__in=private_badge_ids)
def all_badges(self) -> models.query.QuerySet:
"""Returns a QuerySet of all badges.
Returned are those badges that are marked as public in the database,
regardless of which ones the user marked as private.
"""
return self.roles.badges()
def get_full_name(self):
"""Returns the full name."""
return self.full_name.strip()
def get_short_name(self):
"""Returns the short name for the user."""
parts = self.full_name.split(" ", 1)
return parts[0]
def email_user(self, subject, message, from_email=None, **kwargs):
"""Sends an email to this User."""
send_mail(subject, message, from_email, [self.email], **kwargs)
@property
def has_confirmed_email(self):
return self.confirmed_email_at is not None
@property
def email_to_confirm(self):
"""The email to confirm, either 'email_change_preconfirm' when set, or 'email'."""
return self.email_change_preconfirm or self.email
@property
def role_names(self) -> Set[str]:
return {role.name for role in self.roles.all()}
@property
def must_pp_agree(self) -> bool:
"""Return True when user needs to agree to new privacy policy."""
return self.privacy_policy_agreed is None or self.privacy_policy_agreed < settings.PPDATE
def revoke_all_tokens(self, date=None):
"""Revoke all access and refresh tokens belonging to this user."""
if not date:
date = timezone.now()
self.bid_main_oauth2accesstoken.filter(
Q(expires__isnull=True) | Q(expires__gt=date)
).update(expires=date)
self.bid_main_oauth2refreshtoken.filter(
Q(revoked__isnull=True) | Q(revoked__gt=date)
).update(revoked=date)
@property
def can_be_deleted(self) -> bool:
"""Determine if deletion of this account can be requested or not."""
if self.is_staff or self.is_superuser:
return False
if self.bid_main_oauth2application.count():
return False
if any(role.prevent_user_deletion for role in self.roles.all()):
return False
return True
def request_deletion(self):
"""Store date of the deletion request and deactivate the user."""
_log = log.getChild('delete')
if not self.can_be_deleted:
_log.error(
'Ignoring deletion request for pk=%s which cannot be deleted',
self.pk,
)
return
if not self.date_deletion_requested:
self.date_deletion_requested = timezone.now()
else:
# This should never happen, but if it does, keep the original date
log.error(
"Received a deletion request for pk=%s who already requested it at %s",
self.pk,
self.date_deletion_requested,
)
self.revoke_all_tokens(date=self.date_deletion_requested)
self.is_active = False
self.save(update_fields=["date_deletion_requested", "is_active"])
@transaction.atomic
def anonymize(self, *args, using=None, **kwargs) -> None:
"""Anonymize user data and delete user tokens without triggering webhooks.
Does nothing if deletion hasn't been explicitly requested earlier.
"""
_log = log.getChild('delete')
if not self.can_be_deleted:
_log.error(
'User.anonymize called, but pk=%s cannot be deleted',
self.pk,
)
return
if not self.date_deletion_requested:
_log.error(
"User.anonymize called, but deletion of pk=%s hasn't been requested",
self.pk,
)
return
if self.avatar:
try:
# No save to avoid triggering a signal that sends out webhook calls.
self.avatar.delete(save=False)
except Exception:
_log.exception(
'Unable to delete avatar %s for pk=%s',
self.avatar.name,
self.pk,
)
shortuid = hashers.shortuid()
# Use an update to avoid triggering a signal that sends out webhook calls.
# All apps that handle deletion should have already handled it at `request_deletion`.
nickname = f'del{shortuid}'
self.__class__.objects.filter(pk=self.pk).update(
email=f'{nickname}@example.com',
full_name='<deleted>',
nickname=nickname,
last_login_ip=None,
current_login_ip=None,
is_active=False,
avatar=None,
)
self.sessions.all().delete()
@classmethod
def generate_nickname(cls, email: Optional[str] = '', full_name: Optional[str] = '') -> str:
"""Return a unique nickname based on given full name or email."""
full_name = nickname_illegal_chars.sub("", full_name.replace(" ", "-"))
email = nickname_illegal_chars.sub("", strip_email.sub("", email))
base = full_name or email
log.debug("Generating unique nickname for base %r", base)
def acceptable_nickname(name: str) -> bool:
"""Return True if the nickname is unique."""
return not cls.objects.filter(nickname=name).exists()
if acceptable_nickname(base):
return base
# Try increasingly larger random numbers as a suffix.
for num in itertools.islice(random_nums(), 1000):
nickname = f"{base}-{num}"
if acceptable_nickname(nickname):
return nickname
raise ValueError(f"Unable to find unique name for base {base!r} after trying 1000 names")
@property
def avatar_thumbnail_paths(self) -> dict:
"""Return a dictionary of predefined thumbnails sizes and their storage paths."""
field = self.avatar
if not field:
return {}
return bid_main.file_utils.get_avatar_thumbnails_paths(field.name)
def avatar_thumbnail_url(self) -> str:
"""Return the absolute URL of the thumbnailed avatar.
Returns either the URL of the generated thumbnail, or the URL
of the default avatar (if this one is empty).
"""
thumbnail_paths = self.avatar_thumbnail_paths
size = settings.AVATAR_DEFAULT_SIZE_PIXELS
default_thumbnail_path = thumbnail_paths.get(size)
if not self or not default_thumbnail_path:
return bid_main.file_utils.get_absolute_url(static(settings.AVATAR_DEFAULT_FILENAME))
return bid_main.file_utils.get_absolute_url(self.avatar.storage.url(default_thumbnail_path))
class SettingValueField(models.CharField):
def __init__(self, *args, **kwargs): # noqa: D107
kwargs["max_length"] = 128
super().__init__(*args, **kwargs)
SETTING_DATA_TYPE_CHOICES = [
("bool", "Boolean"),
]
class Setting(models.Model):
name = models.CharField(max_length=128)
description = models.CharField(max_length=128)
data_type = models.CharField(max_length=32, choices=SETTING_DATA_TYPE_CHOICES, default="bool")
default = SettingValueField()
users = models.ManyToManyField(settings.AUTH_USER_MODEL, through="UserSetting")
def __str__(self):
return self.name.replace("_", " ").title()
class UserSetting(models.Model):
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
setting = models.ForeignKey(Setting, on_delete=models.CASCADE)
unconstrained_value = SettingValueField()
def __str__(self):
return "Setting %r of %r" % (self.setting.name, self.user.email)
class OAuth2AccessToken(oa2_models.AbstractAccessToken):
class Meta:
verbose_name = "OAuth2 access token"
host_label = models.CharField(max_length=255, unique=False, blank=True)
subclient = models.CharField(max_length=255, unique=False, blank=True)
class OAuth2RefreshToken(oa2_models.AbstractRefreshToken):
class Meta:
verbose_name = "OAuth2 refresh token"
class OAuth2Application(oa2_models.AbstractApplication):
class Meta:
verbose_name = "OAuth2 application"
url = models.URLField(
unique=False, blank=True, help_text="URL displayed to users at 'Applications' page."
)
icon = models.ImageField(
blank=True,
upload_to="app-icons",
help_text="Icon displayed to users at 'Applications' page.",
)
logo = fields.SVGAndImageField(
blank=True,
upload_to="app-logos",
help_text="Logo displayed to users at the OAuth login page.",
)
background = models.ImageField(
blank=True,
upload_to="app-backgrounds",
help_text="Background displayed to users at the OAuth login page.",
)
summary = models.CharField(
blank=True,
max_length=128,
help_text="Short text displayed to users under the logo at the OAuth login page.",
)
notes = models.TextField(
blank=True,
help_text="Information about this application, for staff only, not to present on frontend.",
)
notes_public = models.TextField(
blank=True, help_text="Message to be displayed to the users logging in. HTML is allowed."
)
@property
def has_custom_background(self):
"""Whether or not this app has everything set for a custom background."""
return self.background and self.summary
class UserNote(models.Model):
"""CRM-like note added to a user."""
class Meta:
verbose_name = "Note"
ordering = ("-created",)
user = models.ForeignKey(User, related_name="notes", on_delete=models.CASCADE)
creator = models.ForeignKey(
User,
null=True,
blank=True,
related_name="created_notes",
on_delete=models.PROTECT,
limit_choices_to={"is_staff": True},
)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
note = models.TextField(blank=False)
def __str__(self):
return "Note"
class UserSession(models.Model):
id = models.BigAutoField(primary_key=True)
created_at = models.DateTimeField(auto_now_add=True)
last_active_at = models.DateTimeField()
ip = models.GenericIPAddressField(null=True, blank=True)
session = models.OneToOneField(Session, on_delete=models.CASCADE, editable=False)
user = models.ForeignKey(User, related_name="sessions", on_delete=models.CASCADE)
user_agent = models.CharField(
max_length=255,
blank=True,
null=False,
)
@classmethod
def update_or_create_from_request(cls, request, user=None):
if not user:
user = request.user
return cls.objects.update_or_create(
session_id=request.session.session_key,
defaults={
'ip': bid_main.utils.get_client_ip(request),
'user': user,
'last_active_at': timezone.now(),
'user_agent': request.headers.get("User-Agent", "")[:255],
},
)
def __str__(self):
return f'UserSession pk={self.pk} for {self.user}'
@property
def device(self):
if self.user_agent:
return user_agents.parse(self.user_agent)
return None
def terminate(self):
"""Terminate a user session and its corresponding django_session.
Calling a delete on django_session is enough because of the cascading delete.
"""
self.session.delete()