649 lines
22 KiB
Python
649 lines
22 KiB
Python
from typing import Optional, Set
|
|
import itertools
|
|
import logging
|
|
import os.path
|
|
import re
|
|
|
|
from django import urls
|
|
from django.conf import settings
|
|
from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager
|
|
from django.contrib.auth.models import PermissionsMixin
|
|
from django.core import validators
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.mail import send_mail
|
|
from django.db import models, transaction
|
|
from django.db.models import Q
|
|
from django.templatetags.static import static
|
|
from django.utils import timezone
|
|
from django.utils.deconstruct import deconstructible
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
import oauth2_provider.models as oa2_models
|
|
from . import fields
|
|
from . import hashers
|
|
import bid_main.file_utils
|
|
|
|
log = logging.getLogger(__name__)
|
|
nickname_illegal_chars = re.compile(r"[^\w.+-]")
|
|
strip_email = re.compile("@.*$")
|
|
|
|
|
|
sizes = {
|
|
"s": 64,
|
|
"m": 128,
|
|
"l": 256,
|
|
}
|
|
|
|
|
|
def random_nums():
|
|
"""Increasingly larger random number generator."""
|
|
import random
|
|
|
|
lower, upper = 1, 5
|
|
while True:
|
|
yield random.randint(lower, upper - 1)
|
|
lower, upper = upper, upper * 3
|
|
|
|
|
|
class RoleManager(models.Manager):
|
|
def badges(self) -> models.QuerySet:
|
|
"""Query for those roles that are considered badges."""
|
|
return (
|
|
self.filter(is_public=True, is_active=True, is_badge=True)
|
|
.exclude(badge_img__isnull=True)
|
|
.exclude(badge_img="")
|
|
)
|
|
|
|
|
|
class Role(models.Model):
|
|
name = models.CharField(max_length=80)
|
|
description = models.CharField(
|
|
max_length=255,
|
|
blank=True,
|
|
null=False,
|
|
help_text="Note that this is shown for badges on users' dashboard page.",
|
|
)
|
|
is_active = models.BooleanField(default=True, null=False)
|
|
is_badge = models.BooleanField(
|
|
default=False,
|
|
null=False,
|
|
help_text="Note that a roles is only actually used as a badge when this checkbox "
|
|
"is enabled <strong>and</strong> it has a badge image.",
|
|
)
|
|
is_public = models.BooleanField(
|
|
default=True,
|
|
null=False,
|
|
help_text="When enabled, this role/badge will be readable through the userinfo API.",
|
|
)
|
|
prevent_user_deletion = models.BooleanField(
|
|
default=False,
|
|
null=False,
|
|
help_text="When enabled, user accounts with this role/badge cannot be deleted.",
|
|
)
|
|
|
|
may_manage_roles = models.ManyToManyField(
|
|
"Role",
|
|
related_name="managers",
|
|
blank=True,
|
|
help_text="Users with this role will be able to grant or revoke these roles to "
|
|
"any other user.",
|
|
)
|
|
|
|
# For Badges:
|
|
label = models.CharField(
|
|
max_length=255,
|
|
blank=True,
|
|
null=False,
|
|
help_text="Human-readable name for a badge. Required for badges, not for roles.",
|
|
)
|
|
badge_img = models.ImageField(
|
|
verbose_name="Badge image",
|
|
help_text="Visual representation of a badge.",
|
|
upload_to="badges",
|
|
height_field="badge_img_height",
|
|
width_field="badge_img_width",
|
|
null=True,
|
|
blank=True,
|
|
)
|
|
badge_img_height = models.IntegerField(null=True, blank=True)
|
|
badge_img_width = models.IntegerField(null=True, blank=True)
|
|
link = models.URLField(
|
|
null=True,
|
|
blank=True,
|
|
help_text="Clicking on a badge image will lead to this link.",
|
|
)
|
|
|
|
objects = RoleManager()
|
|
|
|
class Meta:
|
|
ordering = ["-is_active", "name"]
|
|
|
|
def __str__(self):
|
|
if self.is_active:
|
|
return self.name
|
|
return "%s [inactive]" % self.name
|
|
|
|
@property
|
|
def admin_url(self) -> str:
|
|
view_name = f"admin:{self._meta.app_label}_{self._meta.model_name}_change"
|
|
return urls.reverse(view_name, args=(self.id,))
|
|
|
|
def clean(self):
|
|
# Labels are required for badges.
|
|
if self.is_badge and not self.label:
|
|
raise ValidationError({"label": _("Badges must have a label.")})
|
|
|
|
@property
|
|
def _thumbnail_size_list(self) -> list:
|
|
return [(px, px) for px in sizes.values()]
|
|
|
|
@property
|
|
def thumbnails(self) -> dict:
|
|
"""Return a dictionary with relative URLs of a set of predefined thumbnails."""
|
|
base_url, ext = os.path.splitext(self.badge_img.url)
|
|
return {
|
|
size_string: {
|
|
'url': f'{base_url}_{size_in_px}x{size_in_px}.png',
|
|
'width': size_in_px,
|
|
'height': size_in_px,
|
|
}
|
|
for size_string, size_in_px in sizes.items()
|
|
}
|
|
|
|
|
|
class UserManager(BaseUserManager):
|
|
"""UserManager that doesn't use a username, but an email instead."""
|
|
|
|
use_in_migrations = True
|
|
|
|
def _create_user(self, email, password, **extra_fields):
|
|
"""Creates and saves a User with the given email and password."""
|
|
if not email:
|
|
raise ValueError("The given email must be set")
|
|
email = self.normalize_email(email)
|
|
user = self.model(email=email, **extra_fields)
|
|
user.set_password(password)
|
|
user.save(using=self._db)
|
|
return user
|
|
|
|
def create_user(self, email, password=None, **extra_fields):
|
|
extra_fields.setdefault("is_staff", False)
|
|
extra_fields.setdefault("is_superuser", False)
|
|
return self._create_user(email, password, **extra_fields)
|
|
|
|
def create_superuser(self, email, password, **extra_fields):
|
|
extra_fields.setdefault("is_staff", True)
|
|
extra_fields.setdefault("is_superuser", True)
|
|
|
|
if extra_fields.get("is_staff") is not True:
|
|
raise ValueError("Superuser must have is_staff=True.")
|
|
if extra_fields.get("is_superuser") is not True:
|
|
raise ValueError("Superuser must have is_superuser=True.")
|
|
|
|
return self._create_user(email, password, **extra_fields)
|
|
|
|
def get_by_natural_key(self, username):
|
|
"""Look up case-insensitively otherwise people can't login with their mixed-case emails."""
|
|
return self.get(**{f'{self.model.USERNAME_FIELD}__iexact': username.lower()})
|
|
|
|
|
|
@deconstructible
|
|
class UnicodeNicknameValidator(validators.RegexValidator):
|
|
regex = r"^[\w.+-]+$"
|
|
message = _(
|
|
"Enter a valid nickname. This value may contain only letters, "
|
|
"numbers, and ./+/-/_ characters."
|
|
)
|
|
|
|
|
|
def avatar_upload_to(instance, filename):
|
|
"""Generate path for storing an avatar image based on its hash."""
|
|
file_field = instance.avatar
|
|
return bid_main.file_utils.get_upload_path('avatar', file_field)
|
|
|
|
|
|
class User(AbstractBaseUser, PermissionsMixin):
|
|
"""Implementing a fully featured User model with admin-compliant permissions.
|
|
|
|
Email and password are required. Other fields are optional.
|
|
"""
|
|
|
|
email = models.EmailField(
|
|
_("email address"),
|
|
max_length=64,
|
|
unique=True,
|
|
help_text=_("Required. 64 characters or fewer."),
|
|
error_messages={
|
|
"unique": _("A user with that email address already exists."),
|
|
},
|
|
db_index=True,
|
|
)
|
|
full_name = models.CharField(_("full name"), max_length=80, blank=True, db_index=True)
|
|
|
|
# Named 'nickname' and not 'username', because the latter already is a
|
|
# 'thing' in Django. Not having one, and identifying users by their email
|
|
# address, was our own customisation.
|
|
# Bringing back a field 'username' with different semantics would be
|
|
# confusing. For example, there still is a field 'username' in the default
|
|
# Django auth form that is used for 'the thing used to log in', which is
|
|
# the email address in our case.
|
|
nickname = models.CharField(
|
|
"nickname",
|
|
max_length=80,
|
|
unique=True,
|
|
help_text=_(
|
|
"A short (one-word) name that can be used to address you. "
|
|
"80 characters or fewer. Letters, digits, and ./+/-/_ only."
|
|
),
|
|
validators=[UnicodeNicknameValidator()],
|
|
error_messages={
|
|
"unique": _("That name is already used by someone else."),
|
|
},
|
|
)
|
|
|
|
avatar = models.ImageField(
|
|
upload_to=avatar_upload_to,
|
|
null=True,
|
|
blank=True,
|
|
)
|
|
|
|
roles = models.ManyToManyField(Role, related_name="users", blank=True)
|
|
public_roles_as_string = models.CharField(
|
|
"Public roles as string",
|
|
max_length=255,
|
|
blank=True,
|
|
default="",
|
|
help_text=_("String representation of public roles, for comparison in webhooks"),
|
|
)
|
|
private_badges = models.ManyToManyField(
|
|
Role,
|
|
blank=True,
|
|
limit_choices_to={"is_badge": True},
|
|
help_text='Badges marked as "private" by the user. Can contain badges '
|
|
"they do not have at the moment; this is fine.",
|
|
)
|
|
|
|
confirmed_email_at = models.DateTimeField(
|
|
null=True,
|
|
blank=True,
|
|
help_text=_(
|
|
"Designates the date & time at which the user confirmed their email address. "
|
|
"None if not yet confirmed."
|
|
),
|
|
)
|
|
last_login_ip = models.GenericIPAddressField(
|
|
null=True,
|
|
blank=True,
|
|
help_text=_("IP address (IPv4 or IPv6) used for previous login, if any."),
|
|
)
|
|
current_login_ip = models.GenericIPAddressField(
|
|
null=True,
|
|
blank=True,
|
|
help_text=_("IP address (IPv4 or IPv6) used for current login, if any."),
|
|
)
|
|
login_count = models.PositiveIntegerField(
|
|
default=0, blank=True, help_text=_("Number of times this user logged in.")
|
|
)
|
|
|
|
is_staff = models.BooleanField(
|
|
_("staff status"),
|
|
default=False,
|
|
help_text=_("Designates whether the user can log into this admin site."),
|
|
)
|
|
is_active = models.BooleanField(
|
|
_("active"),
|
|
default=True,
|
|
help_text=_(
|
|
"Designates whether this user should be treated as active. "
|
|
"Unselect this instead of deleting accounts."
|
|
),
|
|
)
|
|
date_deletion_requested = models.DateTimeField(
|
|
null=True,
|
|
blank=True,
|
|
help_text=_(
|
|
"Indicates when deletion of this account was requested, if ever."
|
|
" Once set, it should not be changed. Can be set on the users list."
|
|
),
|
|
)
|
|
|
|
date_joined = models.DateTimeField(_("date joined"), default=timezone.now)
|
|
last_update = models.DateTimeField(_("last update"), default=timezone.now)
|
|
privacy_policy_agreed = models.DateTimeField(
|
|
_("privacy policy agreed"),
|
|
null=True,
|
|
blank=True,
|
|
help_text=_("Date when this user agreed to our privacy policy."),
|
|
)
|
|
|
|
email_change_preconfirm = models.EmailField(
|
|
_("email address to change to"),
|
|
blank=True,
|
|
max_length=64,
|
|
help_text=_("New address for the user, set while in the confirmation flow."),
|
|
)
|
|
|
|
objects = UserManager()
|
|
|
|
USERNAME_FIELD = "email"
|
|
REQUIRED_FIELDS = []
|
|
|
|
class Meta:
|
|
verbose_name = _("user")
|
|
verbose_name_plural = _("users")
|
|
|
|
def __repr__(self) -> str:
|
|
return f"<User id={self.id} email={self.email!r}>"
|
|
|
|
def save(self, *args, **kwargs):
|
|
self.last_update = timezone.now()
|
|
updated_fields = {"last_update"}
|
|
|
|
if kwargs.get("update_fields") is not None:
|
|
kwargs["update_fields"] = set(kwargs["update_fields"]).union(updated_fields)
|
|
|
|
return super().save(*args, **kwargs)
|
|
|
|
def public_roles(self) -> set:
|
|
"""Returns public role names.
|
|
|
|
Used in the bid_api.signals module to detect role changes without
|
|
using more lookups in the database.
|
|
"""
|
|
return {role.name for role in self.roles.filter(is_public=True, is_active=True)}
|
|
|
|
def public_badges(self) -> models.query.QuerySet:
|
|
"""Returns a QuerySet of public badges.
|
|
|
|
Public badges are those badges that are marked as public in the database
|
|
and not marked private by the user.
|
|
"""
|
|
private_badge_ids = {role.id for role in self.private_badges.all()}
|
|
return self.all_badges().exclude(id__in=private_badge_ids)
|
|
|
|
def all_badges(self) -> models.query.QuerySet:
|
|
"""Returns a QuerySet of all badges.
|
|
|
|
Returned are those badges that are marked as public in the database,
|
|
regardless of which ones the user marked as private.
|
|
"""
|
|
return self.roles.badges()
|
|
|
|
def get_full_name(self):
|
|
"""Returns the full name."""
|
|
return self.full_name.strip()
|
|
|
|
def get_short_name(self):
|
|
"""Returns the short name for the user."""
|
|
parts = self.full_name.split(" ", 1)
|
|
return parts[0]
|
|
|
|
def email_user(self, subject, message, from_email=None, **kwargs):
|
|
"""Sends an email to this User."""
|
|
send_mail(subject, message, from_email, [self.email], **kwargs)
|
|
|
|
@property
|
|
def has_confirmed_email(self):
|
|
return self.confirmed_email_at is not None
|
|
|
|
@property
|
|
def email_to_confirm(self):
|
|
"""The email to confirm, either 'email_change_preconfirm' when set, or 'email'."""
|
|
return self.email_change_preconfirm or self.email
|
|
|
|
@property
|
|
def role_names(self) -> Set[str]:
|
|
return {role.name for role in self.roles.all()}
|
|
|
|
@property
|
|
def must_pp_agree(self) -> bool:
|
|
"""Return True when user needs to agree to new privacy policy."""
|
|
return self.privacy_policy_agreed is None or self.privacy_policy_agreed < settings.PPDATE
|
|
|
|
def revoke_all_tokens(self, date=None):
|
|
"""Revoke all access and refresh tokens belonging to this user."""
|
|
if not date:
|
|
date = timezone.now()
|
|
self.bid_main_oauth2accesstoken.filter(
|
|
Q(expires__isnull=True) | Q(expires__gt=date)
|
|
).update(expires=date)
|
|
self.bid_main_oauth2refreshtoken.filter(
|
|
Q(revoked__isnull=True) | Q(revoked__gt=date)
|
|
).update(revoked=date)
|
|
|
|
@property
|
|
def can_be_deleted(self) -> bool:
|
|
"""Determine if deletion of this account can be requested or not."""
|
|
if self.is_staff or self.is_superuser:
|
|
return False
|
|
|
|
if self.bid_main_oauth2application.count():
|
|
return False
|
|
|
|
if any(role.prevent_user_deletion for role in self.roles.all()):
|
|
return False
|
|
return True
|
|
|
|
def request_deletion(self):
|
|
"""Store date of the deletion request and deactivate the user."""
|
|
_log = log.getChild('delete')
|
|
if not self.can_be_deleted:
|
|
_log.error(
|
|
'Ignoring deletion request for pk=%s which cannot be deleted',
|
|
self.pk,
|
|
)
|
|
return
|
|
|
|
if not self.date_deletion_requested:
|
|
self.date_deletion_requested = timezone.now()
|
|
else:
|
|
# This should never happen, but if it does, keep the original date
|
|
log.error(
|
|
"Received a deletion request for pk=%s who already requested it at %s",
|
|
self.pk,
|
|
self.date_deletion_requested,
|
|
)
|
|
self.revoke_all_tokens(date=self.date_deletion_requested)
|
|
self.is_active = False
|
|
self.save(update_fields=["date_deletion_requested", "is_active"])
|
|
|
|
@transaction.atomic
|
|
def anonymize(self, *args, using=None, **kwargs) -> None:
|
|
"""Anonymize user data and delete user tokens without triggering webhooks.
|
|
|
|
Does nothing if deletion hasn't been explicitly requested earlier.
|
|
"""
|
|
_log = log.getChild('delete')
|
|
if not self.can_be_deleted:
|
|
_log.error(
|
|
'User.anonymize called, but pk=%s cannot be deleted',
|
|
self.pk,
|
|
)
|
|
return
|
|
|
|
if not self.date_deletion_requested:
|
|
_log.error(
|
|
"User.anonymize called, but deletion of pk=%s hasn't been requested",
|
|
self.pk,
|
|
)
|
|
return
|
|
|
|
if self.avatar:
|
|
try:
|
|
# No save to avoid triggering a signal that sends out webhook calls.
|
|
self.avatar.delete(save=False)
|
|
except Exception:
|
|
_log.exception(
|
|
'Unable to delete avatar %s for pk=%s',
|
|
self.avatar.name,
|
|
self.pk,
|
|
)
|
|
|
|
shortuid = hashers.shortuid()
|
|
# Use an update to avoid triggering a signal that sends out webhook calls.
|
|
# All apps that handle deletion should have already handled it at `request_deletion`.
|
|
nickname = f'del{shortuid}'
|
|
self.__class__.objects.filter(pk=self.pk).update(
|
|
email=f'{nickname}@example.com',
|
|
full_name='<deleted>',
|
|
nickname=nickname,
|
|
last_login_ip=None,
|
|
current_login_ip=None,
|
|
is_active=False,
|
|
avatar=None,
|
|
)
|
|
|
|
@classmethod
|
|
def generate_nickname(cls, email: Optional[str] = '', full_name: Optional[str] = '') -> str:
|
|
"""Return a unique nickname based on given full name or email."""
|
|
full_name = nickname_illegal_chars.sub("", full_name.replace(" ", "-"))
|
|
email = nickname_illegal_chars.sub("", strip_email.sub("", email))
|
|
base = full_name or email
|
|
log.debug("Generating unique nickname for base %r", base)
|
|
|
|
def acceptable_nickname(name: str) -> bool:
|
|
"""Return True if the nickname is unique."""
|
|
return not cls.objects.filter(nickname=name).exists()
|
|
|
|
if acceptable_nickname(base):
|
|
return base
|
|
|
|
# Try increasingly larger random numbers as a suffix.
|
|
for num in itertools.islice(random_nums(), 1000):
|
|
nickname = f"{base}-{num}"
|
|
if acceptable_nickname(nickname):
|
|
return nickname
|
|
|
|
raise ValueError(f"Unable to find unique name for base {base!r} after trying 1000 names")
|
|
|
|
@property
|
|
def avatar_thumbnail_paths(self) -> dict:
|
|
"""Return a dictionary of predefined thumbnails sizes and their storage paths."""
|
|
field = self.avatar
|
|
if not field:
|
|
return {}
|
|
return bid_main.file_utils.get_avatar_thumbnails_paths(field.name)
|
|
|
|
def avatar_thumbnail_url(self) -> str:
|
|
"""Return the absolute URL of the thumbnailed avatar.
|
|
|
|
Returns either the URL of the generated thumbnail, or the URL
|
|
of the default avatar (if this one is empty).
|
|
"""
|
|
thumbnail_paths = self.avatar_thumbnail_paths
|
|
size = settings.AVATAR_DEFAULT_SIZE_PIXELS
|
|
default_thumbnail_path = thumbnail_paths.get(size)
|
|
if not self or not default_thumbnail_path:
|
|
return bid_main.file_utils.get_absolute_url(static(settings.AVATAR_DEFAULT_FILENAME))
|
|
return bid_main.file_utils.get_absolute_url(self.avatar.storage.url(default_thumbnail_path))
|
|
|
|
|
|
class SettingValueField(models.CharField):
|
|
def __init__(self, *args, **kwargs): # noqa: D107
|
|
kwargs["max_length"] = 128
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
SETTING_DATA_TYPE_CHOICES = [
|
|
("bool", "Boolean"),
|
|
]
|
|
|
|
|
|
class Setting(models.Model):
|
|
name = models.CharField(max_length=128)
|
|
description = models.CharField(max_length=128)
|
|
data_type = models.CharField(max_length=32, choices=SETTING_DATA_TYPE_CHOICES, default="bool")
|
|
default = SettingValueField()
|
|
users = models.ManyToManyField(settings.AUTH_USER_MODEL, through="UserSetting")
|
|
|
|
def __str__(self):
|
|
return self.name.replace("_", " ").title()
|
|
|
|
|
|
class UserSetting(models.Model):
|
|
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
|
|
setting = models.ForeignKey(Setting, on_delete=models.CASCADE)
|
|
unconstrained_value = SettingValueField()
|
|
|
|
def __str__(self):
|
|
return "Setting %r of %r" % (self.setting.name, self.user.email)
|
|
|
|
|
|
class OAuth2AccessToken(oa2_models.AbstractAccessToken):
|
|
class Meta:
|
|
verbose_name = "OAuth2 access token"
|
|
|
|
host_label = models.CharField(max_length=255, unique=False, blank=True)
|
|
subclient = models.CharField(max_length=255, unique=False, blank=True)
|
|
|
|
|
|
class OAuth2RefreshToken(oa2_models.AbstractRefreshToken):
|
|
class Meta:
|
|
verbose_name = "OAuth2 refresh token"
|
|
|
|
|
|
class OAuth2Application(oa2_models.AbstractApplication):
|
|
class Meta:
|
|
verbose_name = "OAuth2 application"
|
|
|
|
url = models.URLField(
|
|
unique=False, blank=True, help_text="URL displayed to users at 'Applications' page."
|
|
)
|
|
icon = models.ImageField(
|
|
blank=True,
|
|
upload_to="app-icons",
|
|
help_text="Icon displayed to users at 'Applications' page.",
|
|
)
|
|
logo = fields.SVGAndImageField(
|
|
blank=True,
|
|
upload_to="app-logos",
|
|
help_text="Logo displayed to users at the OAuth login page.",
|
|
)
|
|
background = models.ImageField(
|
|
blank=True,
|
|
upload_to="app-backgrounds",
|
|
help_text="Background displayed to users at the OAuth login page.",
|
|
)
|
|
summary = models.CharField(
|
|
blank=True,
|
|
max_length=128,
|
|
help_text="Short text displayed to users under the logo at the OAuth login page.",
|
|
)
|
|
notes = models.TextField(
|
|
blank=True,
|
|
help_text="Information about this application, for staff only, not to present on frontend.",
|
|
)
|
|
|
|
notes_public = models.TextField(
|
|
blank=True, help_text="Message to be displayed to the users logging in. HTML is allowed."
|
|
)
|
|
|
|
@property
|
|
def has_custom_background(self):
|
|
"""Whether or not this app has everything set for a custom background."""
|
|
return self.background and self.summary
|
|
|
|
|
|
class UserNote(models.Model):
|
|
"""CRM-like note added to a user."""
|
|
|
|
class Meta:
|
|
verbose_name = "Note"
|
|
ordering = ("-created",)
|
|
|
|
user = models.ForeignKey(User, related_name="notes", on_delete=models.CASCADE)
|
|
creator = models.ForeignKey(
|
|
User,
|
|
null=True,
|
|
blank=True,
|
|
related_name="created_notes",
|
|
on_delete=models.PROTECT,
|
|
limit_choices_to={"is_staff": True},
|
|
)
|
|
created = models.DateTimeField(auto_now_add=True)
|
|
updated = models.DateTimeField(auto_now=True)
|
|
note = models.TextField(blank=False)
|
|
|
|
def __str__(self):
|
|
return "Note"
|