conference-website/tickets/admin.py

547 lines
20 KiB
Python

from typing import Callable, Optional, Type
import csv
import datetime
import re
from django import forms
from django.contrib import admin, messages
from django.core.exceptions import PermissionDenied
from django.db.models import Count, F
from django.http import HttpResponse
from django.urls import path
from django.urls import reverse
from django.utils.html import format_html
from django.utils.safestring import mark_safe
import dateutil.parser
import django.db.models
from conference_main.util import absolutify
from tickets.saleor_client import orders_as_list as saleor_orders_as_list
from tickets.templatetags.tickets import format_saleor_money, format_cents
import tickets.models
import tickets.tasks as tasks
import tickets.stripe_utils
LinkFunc = Callable[[django.db.models.Model], str]
def _get_admin_url_name(model: Type['django.db.models.Model'], action: str = 'change') -> str:
return 'admin:{}_{}_{}'.format(model._meta.app_label, model._meta.model_name, action)
def create_admin_fk_link(
field_name: str, short_description: str, view_name: str, title_func=str
) -> LinkFunc:
"""Construct a function that constructs a link to the admin:xxxx_change form.
:param field_name: The object is taken from model_instance.{field_name}
:param short_description: The label for this link as shown in the admin.
:param view_name: The admin view to link to. Must take one parameter `object_id`.
:param title_func: A callable that takes model_instance and returns a string used a link title.
For example, to create a link to a customer (instead of a drop-down to edit it),
use:
`plan_link = create_admin_fk_link('plan', 'plan', 'admin:looper_plan_change')`
"""
def create_link(model_instance: django.db.models.Model) -> str:
referenced: Optional[object] = getattr(model_instance, field_name)
if referenced is None:
return '-'
assert isinstance(
referenced, django.db.models.Model
), f'Expected Model, not {type(referenced)}'
admin_link = reverse(view_name, kwargs={'object_id': referenced.pk})
link_title = title_func(referenced)
# Use primary key as link title if got a blank string:
if not link_title:
link_title = str(referenced.pk)
return format_html('<a href="{}">{}</a>', admin_link, link_title)
# Callable[[Model], str] doesn't have those Django-specific attributes,
# and I don't feel like speccing that all out. ~~Sybren
create_link.admin_order_field = field_name # type: ignore
create_link.short_description = short_description # type: ignore
return create_link
ticket_link = create_admin_fk_link('ticket', 'ticket', _get_admin_url_name(tickets.models.Ticket))
def _get_order_link(ticket: tickets.models.Ticket):
url = ticket.get_admin_order_url()
if not url:
return ''
return mark_safe(
f'<a href="{url}" target="_blank" title="View order in the payment system">View</a>'
)
def _get_invoice_link(ticket: tickets.models.Ticket):
invoice_url = ticket.invoice_download_url
if not invoice_url:
return ''
return mark_safe(f'<a href="{invoice_url}" target="_blank" title="View invoice PDF">View</a>')
def claim_url(obj):
"""Display ticket's claim URL."""
if not obj.claim_url:
return 'Not applicable'
return absolutify(obj.claim_url)
class AttendeesInline(admin.TabularInline):
model = tickets.models.Ticket.attendees.through
readonly_fields = ['full_name', 'email', 'confirmation_sent_at', 'general_info_sent_at']
raw_id_fields = ['user']
extra = 0
class NumberOfUnclaimedTicketsFilter(admin.SimpleListFilter):
title = 'Unclaimed Tickets'
parameter_name = 'unclaimed_count'
def lookups(self, request, model_admin):
"""Human-readable labels for filter choices."""
queryset = model_admin.get_queryset(request)
unique_counts = set(
queryset.annotate(
unclaimed_count=F('quantity') - Count('attendees', distinct=True),
).values_list('unclaimed_count', flat=True)
)
return [('some', 'Some unclaimed left')] + [
(count, f'{count} unclaimed left' if count > 0 else 'No unclaimed left 👍')
for count in unique_counts
]
def queryset(self, request, queryset):
"""Return the filtered queryset based on the value provided in the query string."""
value = self.value()
if value == 'some':
return queryset.filter(unclaimed_count__gt=0)
if value is not None:
return queryset.filter(unclaimed_count=self.value())
class TicketAdminForm(forms.ModelForm):
class Meta:
model = tickets.models.Ticket
help_texts = {
'claim_url': (
"Link that attendees can use to claim a ticket, if quantity is more than 1."
)
}
labels = {'claim_url': 'Claim URL'}
exclude = ()
class _CSVReportMixin:
def generate_report_filename(self, request, changelist, export_all):
"""Generate a file name using the model and current filters."""
opts = self.model._meta
model_name = (
opts.verbose_name_plural.replace('.', '_').replace(' ', '_').replace('looper_', '')
)
site = 'blender_conference_'
# E.g.: ?is_free__exact=1&is_paid__exact=0&quantity=1
def _format_filter(param, value):
if param.endswith('__quarter'):
return f'Q{value}'
elif param.endswith('__exact'):
param_name = param.replace('__exact', '')
if value == '1':
return f'{param_name}_yes'
elif value == '0':
return f'{param_name}_no'
return f'{param_name}_{value}'
# Strip hours from datetimes
return re.sub(r'\s00:00:00+.*$', '', value)
filters = '_'.join(
[
_format_filter(*_)
for _ in sorted(request.GET.items(), key=lambda x: x[0], reverse=True)
]
)
if export_all:
filters = filters + '_all'
else:
filters = filters + f'_page_{(changelist.page_num):03}'
# Prefix filters by another "_"
filters = '_' + filters if filters else filters
filename = f'{site}_{model_name}{filters}.csv'.replace(self.date_hierarchy or '', '')
return re.sub(r'[\]\[]+', '', re.sub(r'_+', '_', filename))
def get_urls(self):
"""Return URLs of additional admin views, such as CSV export view."""
urls = super().get_urls()
model_name = f'{self.model._meta.app_label}_{self.model._meta.model_name}'
report_urls = [
path(
'download-report-csv/',
self.report_as_csv_view,
name=f'{model_name}_download_report_as_csv',
),
path(
'download-report-all-csv/',
self.report_all_as_csv_view,
name=f'{model_name}_download_report_all_as_csv',
),
]
return report_urls + urls
def report_as_csv_view(self, request):
"""Export a changelist (not its original queryset!) as a CSV."""
cl = self.get_changelist_instance(request)
return self.generate_csv_report(request, cl, export_all=False)
def report_all_as_csv_view(self, request):
"""Export the filtered queryset as a CSV."""
cl = self.get_changelist_instance(request)
return self.generate_csv_report(request, cl, export_all=True)
@admin.register(tickets.models.Ticket)
class TicketAdmin(_CSVReportMixin, admin.ModelAdmin):
show_full_result_count = True
inlines = [AttendeesInline]
form = TicketAdminForm
raw_id_fields = ['user']
list_display = (
'id',
'edition',
'quantity',
'unclaimed',
'user',
'sku',
'order_number',
'is_paid',
'is_free',
'view_order',
'view_invoice',
'refund_status',
)
list_filter = (
'edition',
'is_free',
'is_paid',
'quantity',
NumberOfUnclaimedTicketsFilter,
'sku',
)
readonly_fields = (
claim_url,
'order_token',
'order_id',
'view_order',
'refund_status',
'created_at',
'updated_at',
'checkout_data',
)
search_fields = (
'token',
'order_token',
'order_id',
'order_number',
'user__email',
'user__username',
'attendees__email',
'user__profile__company',
'user__profile__full_name',
'user__profile__title',
)
date_hierarchy = 'created_at'
actions = ['send_mail_unclaimed_reminder', 'send_mail_unpaid_reminder']
def get_fieldsets(self, *args, **kwargs):
"""Hide checkout data into a collapsible section."""
fieldsets = super().get_fieldsets(*args, **kwargs)
collapsible = {'checkout_data'}
all_fields = fieldsets[0][1]['fields']
fieldsets[0][1]['fields'] = [_ for _ in all_fields if _ not in collapsible]
collapsible_fields = [_ for _ in all_fields if _ in collapsible]
fieldsets.append(('More info', {'fields': collapsible_fields, 'classes': ['collapse']}))
return fieldsets
def view_order(self, obj):
"""Link to this ticket's order in Saleor Dashboard."""
return _get_order_link(obj)
def view_invoice(self, obj):
"""Link to this ticket's invoice PDF."""
return _get_invoice_link(obj)
def refund_status(self, obj) -> str:
"""Return refund status."""
return obj.refund_status
def generate_csv_report(self, request, changelist, export_all=False):
"""CSV export a report for ticket orders."""
if not request.user.is_staff:
raise PermissionDenied
results_to_export = changelist.queryset if export_all else changelist.result_list
field_names = self.list_display
if 'action_checkbox' in field_names:
field_names.remove('action_checkbox')
filename = self.generate_report_filename(request, changelist, export_all=export_all)
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename={filename}'
writer = csv.writer(response)
headers = [
'Order #',
'Date created',
'Name',
'Quantity',
'Country',
'Gross total',
'Refund',
'VATIN',
'VAT 21%',
'VAT 9%',
]
writer.writerow(headers)
order_id_to_ticket = {_.order_id: _ for _ in results_to_export if _.order_id}
saleor_order_ids, stripe_order_ids = set(), set()
# The ones with `order_id` like `pi_*` definitely came from Stripe, other must be Saleor
for order_id in order_id_to_ticket:
if order_id.startswith('pi_'):
stripe_order_ids.add(order_id)
else:
saleor_order_ids.add(order_id)
if saleor_order_ids:
saleor_order_data = saleor_orders_as_list(ids=saleor_order_ids)
for row in saleor_order_data:
vatin = next(
(_['value'] for _ in row['metadata'] if _['key'] == 'vatrc.vatin_validated'),
None,
)
values = [
row['number'],
dateutil.parser.parse(row['created']).date(),
row['lines'][0]['variant']['name'],
row['lines'][0]['quantity'],
row['billingAddress']['country']['code'],
format_saleor_money(row['total']['gross']),
vatin,
]
writer.writerow(values)
if stripe_order_ids:
for order_id in stripe_order_ids:
ticket = order_id_to_ticket[order_id]
session = ticket.order
currency = session.currency.upper()
latest_charge = session.payment_intent.latest_charge
paid_at_timestamp = latest_charge.created
paid_at = datetime.datetime.fromtimestamp(int(paid_at_timestamp))
vatin = next((_.value for _ in session.customer_details.tax_ids), None)
values = [
ticket.order_number,
paid_at.date(),
session.line_items[0].price.product.name,
ticket.quantity,
latest_charge.billing_details.address.country,
format_cents(session.amount_total, currency),
format_cents(latest_charge.amount_refunded, currency),
vatin,
# FIXME: here we expect that order of the metadata didn't change
*[format_cents(tax['amount'], session.currency) for tax in session.tax_details],
]
writer.writerow(values)
# TODO: refunded status (partial/full)
return response
def get_queryset(self, *args, **kwargs):
"""Count unclaimed tickets for custom filtering."""
queryset = super().get_queryset(*args, **kwargs)
queryset = queryset.annotate(
unclaimed_count=F('quantity') - Count('attendees', distinct=True),
)
return queryset
def send_mail_unclaimed_reminder(self, request, queryset):
"""Queue tasks for sending a reminder email about unclaimed tickets."""
count = 0
for ticket in queryset:
tasks.send_mail_tickets_paid(ticket_id=ticket.id, email_name='unclaimed_reminder')
count += 1
msg = f'{count} emails queued for sending'
self.message_user(request, msg, messages.SUCCESS)
def send_mail_unpaid_reminder(self, request, queryset):
"""Queue tasks for sending a reminder email about unpaid tickets."""
count = 0
for ticket in queryset:
tasks.send_mail_bank_transfer_required(
ticket_id=ticket.id, email_name='unpaid_reminder'
)
count += 1
msg = f'{count} emails queued for sending'
self.message_user(request, msg, messages.SUCCESS)
@admin.register(tickets.models.TicketClaim)
class TicketClaimAdmin(_CSVReportMixin, admin.ModelAdmin):
date_hierarchy = 'created_at'
show_full_result_count = True
raw_id_fields = ('user', 'ticket')
readonly_fields = ['confirmation_sent_at', 'general_info_sent_at']
list_select_related = ('user', 'user__profile', 'ticket')
ordering = ['user__profile__full_name']
list_display = ['created_at', 'full_name', 'title', 'company', 'country', 'email', ticket_link]
list_filter = (
'ticket__edition',
'ticket__is_free',
'ticket__is_paid',
'ticket__quantity',
'ticket__sku',
'general_info_sent_at',
)
search_fields = (
'ticket__order_id',
'=ticket__order_number',
'ticket__order_token',
'ticket__token',
'user__email',
'user__profile__company',
'user__profile__full_name',
)
actions = [
'send_mail_general_info',
'send_mail_badge_reminder',
'send_mail_one_day_ticket_confirm',
'send_mail_feedback',
]
def send_mail_general_info(self, request, queryset):
"""Queue tasks for sending a general info email to selected attendees."""
count = 0
for claim in queryset:
if claim.general_info_sent_at:
continue
tasks.send_mail_general_info(ticket_id=claim.ticket_id, user_id=claim.user_id)
count += 1
msg = f'{count} emails queued for sending'
self.message_user(request, msg, messages.SUCCESS)
send_mail_general_info.short_description = 'Send general info email to selected attendees'
def _send_mail_to_attendee(self, request, queryset, email_name: str):
from background_task.models import Task, CompletedTask
count = 0
for claim in queryset:
# Skip if already sent this email to this attendee
if any(
# This is based on two fragile assumptions:
# that background_task uses OrderedDict when serialising task
# arguments and that task itself won't change its arguments
# in the future.
task_class.objects.filter(
task_name="tickets.tasks.send_mail_confirm_tickets",
task_params__contains=f'"email_name": "{email_name}"',
)
.filter(task_params__contains=f'"ticket_id": {claim.ticket_id},')
.filter(task_params__contains=f'"user_id": {claim.user_id}}}')
.exists()
for task_class in {CompletedTask, Task}
):
continue
tasks.send_mail_confirm_tickets(
ticket_id=claim.ticket_id, user_id=claim.user_id, email_name=email_name
)
count += 1
msg = f'{count} emails queued for sending'
self.message_user(request, msg, messages.SUCCESS)
def send_mail_badge_reminder(self, request, queryset):
"""Queue tasks for sending a reminder about badge info to selected attendees."""
self._send_mail_to_attendee(request, queryset, email_name='badge_reminder')
send_mail_badge_reminder.short_description = (
'Send reminder about badge info to selected attendees'
)
def send_mail_one_day_ticket_confirm(self, request, queryset):
"""Queue tasks for sending a reminder about badge info to selected attendees."""
self._send_mail_to_attendee(request, queryset, email_name='one_day_ticket_confirm')
send_mail_one_day_ticket_confirm.short_description = (
'Send emails to one-day attendees asking to confirm their day'
)
def send_mail_feedback(self, request, queryset):
"""Queue tasks for sending a request for feedback to selected attendees."""
self._send_mail_to_attendee(request, queryset, email_name='feedback')
send_mail_feedback.short_description = 'Send a request for feedback to selected attendees'
def generate_csv_report(self, request, changelist, export_all=False):
"""CSV export a list of attendees."""
if not request.user.is_staff:
raise PermissionDenied
results_to_export = changelist.queryset if export_all else changelist.result_list
field_names = self.list_display
if 'action_checkbox' in field_names:
field_names.remove('action_checkbox')
filename = self.generate_report_filename(request, changelist, export_all=export_all)
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename={filename}'
writer = csv.writer(response)
headers = [
'Full name',
'Title',
'Company',
'Country code',
'Country',
'Ticket',
]
writer.writerow(headers)
for row in results_to_export:
values = [
row.full_name,
row.title,
row.company,
row.country,
row.country.name,
row.ticket,
]
writer.writerow(values)
return response
@admin.register(tickets.models.Product)
class ProductAdmin(admin.ModelAdmin):
def site_url(self, obj):
url = obj.get_absolute_url()
if not url:
return ''
return mark_safe(f'<a href="{url}" target="_blank" title="Site URL">{url}</a>')
def url(obj):
url = obj.url
if not url:
return ''
return mark_safe(f'<a href="{url}" target="_blank" title="URL">{url}</a>')
list_display = ('name', 'price', 'currency', 'site_url', 'is_featured', url)