Oleg Komarov
ce31207f36
## Motivation A user needs to know how their account is being accessed/used. At the very minimum, we need to display information about recent sign-ins and active sessions. This PR adds: - a new "Active Sessions" page that lists existing sessions linked to a user, with an option to terminate a particular session; - an email that is sent to a confirmed email address when a new login happens from a different IP address. ## Implementation Builtin django sessions are lacking some essential features: - it's impossible to efficiently list all sessions belonging to a user - there is not enough metadata: when and where a session was created This PR adds a cross table `bid_main_user_session` that links to both `django_session` and `bid_main_user` tables, and also stores info about sign-in timestamp, last activity timestamp, IP and User-Agent. A `UserSession` object is updated (or created, for new sessions and for active sessions existing before the rollout) on every authenticated request to update the `last_active_at` field. This is done in a new `user_session_middleware`. A further improvement (intentionally excluded from the PR): use geoip2 to display an IP-based location in the Active Sessions listing and the email. Reviewed-on: #93587 Reviewed-by: Anna Sirota <annasirota@noreply.localhost>
82 lines
3.0 KiB
Python
82 lines
3.0 KiB
Python
import logging
|
|
|
|
from django.contrib.auth.signals import user_logged_in
|
|
from django.core.signals import got_request_exception
|
|
from django.db.models import F
|
|
from django.db.models.signals import m2m_changed, post_delete
|
|
from django.dispatch import receiver
|
|
|
|
from . import models
|
|
import bid_main.utils as utils
|
|
import bid_main.file_utils
|
|
import bid_main.tasks
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@receiver(got_request_exception)
|
|
def log_exception(sender, **kwargs):
|
|
log.exception("uncaught exception occurred")
|
|
|
|
|
|
@receiver(user_logged_in)
|
|
def process_new_login(sender, request, user, **kwargs):
|
|
"""Updates user fields and creates a UserSession upon login. Sends and email if IP is new.
|
|
|
|
Only saves specific fields, so that the webhook trigger knows what changed.
|
|
"""
|
|
log.debug("User %s logged in, storing login information", user.email)
|
|
user.login_count = F("login_count") + 1
|
|
fields = {"login_count"}
|
|
|
|
user_session, _ = models.UserSession.update_or_create_from_request(request, user)
|
|
# Only move 'current' to 'last' login IP if the IP address is different.
|
|
request_ip = utils.get_client_ip(request)
|
|
if request_ip and user.current_login_ip != request_ip:
|
|
user.last_login_ip = F("current_login_ip")
|
|
user.current_login_ip = request_ip
|
|
fields.update({"last_login_ip", "current_login_ip"})
|
|
|
|
if user.has_confirmed_email:
|
|
bid_main.tasks.send_new_user_session_email(
|
|
user_pk=user.pk,
|
|
session_data={
|
|
'device': str(user_session.device or 'Unknown'),
|
|
'ip': user_session.ip,
|
|
},
|
|
)
|
|
user.save(update_fields=fields)
|
|
|
|
|
|
@receiver(m2m_changed)
|
|
def modified_user_role(sender, instance, action, reverse, model, **kwargs):
|
|
my_log = log.getChild("modified_user_role")
|
|
if not action.startswith("post_"):
|
|
my_log.debug("Ignoring m2m %r on %s - %s", action, type(instance), model)
|
|
return
|
|
if not isinstance(instance, models.User) or not issubclass(model, models.Role):
|
|
my_log.debug("Ignoring m2m %r on %s - %s", action, type(instance), model)
|
|
return
|
|
if not instance.id:
|
|
my_log.debug("Ignoring m2m %r on %s (no ID) - %s", action, type(instance), model)
|
|
return
|
|
|
|
# User's roles changed, so we have to update their public_roles_as_string.
|
|
new_roles = " ".join(sorted(instance.public_roles()))
|
|
if new_roles != instance.public_roles_as_string:
|
|
instance.public_roles_as_string = new_roles
|
|
my_log.debug(" saving user again for new roles %r", new_roles)
|
|
instance.save(update_fields=["public_roles_as_string"])
|
|
else:
|
|
my_log.debug(" new roles are old roles: %r", new_roles)
|
|
|
|
|
|
@receiver(post_delete, sender=models.User)
|
|
def delete_orphaned_avatar_files(sender, instance, **kwargs):
|
|
"""Delete avatar files from storage when User record is deleted."""
|
|
if not instance.avatar:
|
|
log.info('User pk=%d has no avatar, nothing to clean up', instance.pk)
|
|
return
|
|
|
|
bid_main.file_utils.delete_avatar_files(instance.avatar.name)
|