extensions-website/notifications/signals.py
Oleg-Komarov cf1c0315e9 Upload new version: generate approval activity item (#95)
See #89: for new version uploads of a listed (previously approved) extension, add a comment to the approval queue.

Don't merge before #94: there, the comment form is refactored to avoid listing all possible activity types.

Reviewed-on: #95
Reviewed-by: Anna Sirota <railla@noreply.localhost>
2024-04-30 10:49:03 +02:00

59 lines
1.7 KiB
Python

import logging
from actstream.models import Action, Follow
from django.contrib.auth import get_user_model
from django.db.models.signals import post_save
from django.dispatch import receiver
from constants.activity import Flag, Verb
from notifications.models import Notification
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maps an Action verb to the Follow flags whose followers are notified about it.
# _create_notifications looks the verb up here and filters Follow rows with
# flag__in=<list>: an empty list means the verb is known but selects no
# followers, while a verb missing from this mapping is logged as a warning and
# likewise notifies nobody.
VERB2FLAGS = {
    Verb.APPROVED: [Flag.AUTHOR, Flag.REVIEWER],
    Verb.COMMENTED: [Flag.AUTHOR, Flag.REVIEWER],
    Verb.RATED_EXTENSION: [Flag.AUTHOR],
    Verb.REPORTED_EXTENSION: [Flag.MODERATOR],
    Verb.REPORTED_RATING: [Flag.MODERATOR],
    Verb.REQUESTED_CHANGES: [Flag.AUTHOR, Flag.REVIEWER],
    Verb.REQUESTED_REVIEW: [Flag.MODERATOR, Flag.REVIEWER],
    # Known verb, but no follower flags are selected for it.
    Verb.UPLOADED_NEW_VERSION: [],
}
@receiver(post_save, sender=Action)
def _create_notifications(
    sender: object,
    instance: Action,
    created: bool,
    raw: bool,
    **kwargs: object,
) -> None:
    """Create Notification rows for the followers of a freshly saved Action.

    Connected to ``post_save`` of ``Action``. Nothing happens for raw saves or
    for updates of an existing Action. Otherwise the Action's verb is looked
    up in ``VERB2FLAGS``, the followers of ``instance.target`` whose follow
    flag is listed for that verb are resolved to users, and one Notification
    per user is bulk-created. The Action's own actor is never notified.

    Args:
        sender: The model class that sent the signal (unused).
        instance: The Action that was saved.
        created: True when the save inserted a new row.
        raw: True for raw saves (Django sets this e.g. while loading fixtures).
        **kwargs: Remaining signal arguments (unused).
    """
    # Only brand-new, non-raw Actions can produce notifications.
    if raw or not created:
        return

    if not instance.target:
        logger.warning(f'ignoring an unexpected Action without a target, verb={instance.verb}')
        return

    # None means the verb is unknown to VERB2FLAGS; an empty list is a known
    # verb that simply selects no followers.
    flags = VERB2FLAGS.get(instance.verb)
    if flags is None:
        logger.warning(f'no follower flags for verb={instance.verb}, nobody will be notified')
        return

    # Collect the ids of users following the target with one of the flags,
    # then load those users through a single id__in lookup.
    follower_ids = (
        Follow.objects.for_object(instance.target)
        .filter(flag__in=flags)
        .values_list('user', flat=True)
    )
    recipients = get_user_model().objects.filter(id__in=follower_ids)

    # Whoever performed the action does not get notified about it.
    pending = [
        Notification(recipient=user, action=instance)
        for user in recipients
        if user != instance.actor
    ]
    if pending:
        Notification.objects.bulk_create(pending)