547 lines
20 KiB
Python
547 lines
20 KiB
Python
from typing import Callable, Optional, Type
|
|
import csv
|
|
import datetime
|
|
import re
|
|
|
|
from django import forms
|
|
from django.contrib import admin, messages
|
|
from django.core.exceptions import PermissionDenied
|
|
from django.db.models import Count, F
|
|
from django.http import HttpResponse
|
|
from django.urls import path
|
|
from django.urls import reverse
|
|
from django.utils.html import format_html
|
|
from django.utils.safestring import mark_safe
|
|
import dateutil.parser
|
|
import django.db.models
|
|
|
|
from conference_main.util import absolutify
|
|
from tickets.saleor_client import orders_as_list as saleor_orders_as_list
|
|
from tickets.templatetags.tickets import format_saleor_money, format_cents
|
|
import tickets.models
|
|
import tickets.tasks as tasks
|
|
import tickets.stripe_utils
|
|
|
|
LinkFunc = Callable[[django.db.models.Model], str]
|
|
|
|
|
|
def _get_admin_url_name(model: Type['django.db.models.Model'], action: str = 'change') -> str:
|
|
return 'admin:{}_{}_{}'.format(model._meta.app_label, model._meta.model_name, action)
|
|
|
|
|
|
def create_admin_fk_link(
|
|
field_name: str, short_description: str, view_name: str, title_func=str
|
|
) -> LinkFunc:
|
|
"""Construct a function that constructs a link to the admin:xxxx_change form.
|
|
|
|
:param field_name: The object is taken from model_instance.{field_name}
|
|
:param short_description: The label for this link as shown in the admin.
|
|
:param view_name: The admin view to link to. Must take one parameter `object_id`.
|
|
:param title_func: A callable that takes model_instance and returns a string used a link title.
|
|
|
|
For example, to create a link to a customer (instead of a drop-down to edit it),
|
|
use:
|
|
|
|
`plan_link = create_admin_fk_link('plan', 'plan', 'admin:looper_plan_change')`
|
|
"""
|
|
|
|
def create_link(model_instance: django.db.models.Model) -> str:
|
|
referenced: Optional[object] = getattr(model_instance, field_name)
|
|
if referenced is None:
|
|
return '-'
|
|
assert isinstance(
|
|
referenced, django.db.models.Model
|
|
), f'Expected Model, not {type(referenced)}'
|
|
admin_link = reverse(view_name, kwargs={'object_id': referenced.pk})
|
|
link_title = title_func(referenced)
|
|
# Use primary key as link title if got a blank string:
|
|
if not link_title:
|
|
link_title = str(referenced.pk)
|
|
return format_html('<a href="{}">{}</a>', admin_link, link_title)
|
|
|
|
# Callable[[Model], str] doesn't have those Django-specific attributes,
|
|
# and I don't feel like speccing that all out. ~~Sybren
|
|
create_link.admin_order_field = field_name # type: ignore
|
|
create_link.short_description = short_description # type: ignore
|
|
return create_link
|
|
|
|
|
|
ticket_link = create_admin_fk_link('ticket', 'ticket', _get_admin_url_name(tickets.models.Ticket))
|
|
|
|
|
|
def _get_order_link(ticket: tickets.models.Ticket):
|
|
url = ticket.get_admin_order_url()
|
|
if not url:
|
|
return ''
|
|
return mark_safe(
|
|
f'<a href="{url}" target="_blank" title="View order in the payment system">View</a>'
|
|
)
|
|
|
|
|
|
def _get_invoice_link(ticket: tickets.models.Ticket):
|
|
invoice_url = ticket.invoice_download_url
|
|
if not invoice_url:
|
|
return ''
|
|
return mark_safe(f'<a href="{invoice_url}" target="_blank" title="View invoice PDF">View</a>')
|
|
|
|
|
|
def claim_url(obj):
|
|
"""Display ticket's claim URL."""
|
|
if not obj.claim_url:
|
|
return 'Not applicable'
|
|
return absolutify(obj.claim_url)
|
|
|
|
|
|
class AttendeesInline(admin.TabularInline):
|
|
model = tickets.models.Ticket.attendees.through
|
|
readonly_fields = ['full_name', 'email', 'confirmation_sent_at', 'general_info_sent_at']
|
|
raw_id_fields = ['user']
|
|
extra = 0
|
|
|
|
|
|
class NumberOfUnclaimedTicketsFilter(admin.SimpleListFilter):
|
|
title = 'Unclaimed Tickets'
|
|
|
|
parameter_name = 'unclaimed_count'
|
|
|
|
def lookups(self, request, model_admin):
|
|
"""Human-readable labels for filter choices."""
|
|
queryset = model_admin.get_queryset(request)
|
|
unique_counts = set(
|
|
queryset.annotate(
|
|
unclaimed_count=F('quantity') - Count('attendees', distinct=True),
|
|
).values_list('unclaimed_count', flat=True)
|
|
)
|
|
return [('some', 'Some unclaimed left')] + [
|
|
(count, f'{count} unclaimed left' if count > 0 else 'No unclaimed left 👍')
|
|
for count in unique_counts
|
|
]
|
|
|
|
def queryset(self, request, queryset):
|
|
"""Return the filtered queryset based on the value provided in the query string."""
|
|
value = self.value()
|
|
if value == 'some':
|
|
return queryset.filter(unclaimed_count__gt=0)
|
|
if value is not None:
|
|
return queryset.filter(unclaimed_count=self.value())
|
|
|
|
|
|
class TicketAdminForm(forms.ModelForm):
|
|
class Meta:
|
|
model = tickets.models.Ticket
|
|
help_texts = {
|
|
'claim_url': (
|
|
"Link that attendees can use to claim a ticket, if quantity is more than 1."
|
|
)
|
|
}
|
|
labels = {'claim_url': 'Claim URL'}
|
|
exclude = ()
|
|
|
|
|
|
class _CSVReportMixin:
|
|
def generate_report_filename(self, request, changelist, export_all):
|
|
"""Generate a file name using the model and current filters."""
|
|
opts = self.model._meta
|
|
model_name = (
|
|
opts.verbose_name_plural.replace('.', '_').replace(' ', '_').replace('looper_', '')
|
|
)
|
|
site = 'blender_conference_'
|
|
|
|
# E.g.: ?is_free__exact=1&is_paid__exact=0&quantity=1
|
|
def _format_filter(param, value):
|
|
if param.endswith('__quarter'):
|
|
return f'Q{value}'
|
|
elif param.endswith('__exact'):
|
|
param_name = param.replace('__exact', '')
|
|
if value == '1':
|
|
return f'{param_name}_yes'
|
|
elif value == '0':
|
|
return f'{param_name}_no'
|
|
return f'{param_name}_{value}'
|
|
# Strip hours from datetimes
|
|
return re.sub(r'\s00:00:00+.*$', '', value)
|
|
|
|
filters = '_'.join(
|
|
[
|
|
_format_filter(*_)
|
|
for _ in sorted(request.GET.items(), key=lambda x: x[0], reverse=True)
|
|
]
|
|
)
|
|
if export_all:
|
|
filters = filters + '_all'
|
|
else:
|
|
filters = filters + f'_page_{(changelist.page_num):03}'
|
|
# Prefix filters by another "_"
|
|
filters = '_' + filters if filters else filters
|
|
filename = f'{site}_{model_name}{filters}.csv'.replace(self.date_hierarchy or '', '')
|
|
return re.sub(r'[\]\[]+', '', re.sub(r'_+', '_', filename))
|
|
|
|
def get_urls(self):
|
|
"""Return URLs of additional admin views, such as CSV export view."""
|
|
urls = super().get_urls()
|
|
model_name = f'{self.model._meta.app_label}_{self.model._meta.model_name}'
|
|
report_urls = [
|
|
path(
|
|
'download-report-csv/',
|
|
self.report_as_csv_view,
|
|
name=f'{model_name}_download_report_as_csv',
|
|
),
|
|
path(
|
|
'download-report-all-csv/',
|
|
self.report_all_as_csv_view,
|
|
name=f'{model_name}_download_report_all_as_csv',
|
|
),
|
|
]
|
|
return report_urls + urls
|
|
|
|
def report_as_csv_view(self, request):
|
|
"""Export a changelist (not its original queryset!) as a CSV."""
|
|
cl = self.get_changelist_instance(request)
|
|
return self.generate_csv_report(request, cl, export_all=False)
|
|
|
|
def report_all_as_csv_view(self, request):
|
|
"""Export the filtered queryset as a CSV."""
|
|
cl = self.get_changelist_instance(request)
|
|
return self.generate_csv_report(request, cl, export_all=True)
|
|
|
|
|
|
@admin.register(tickets.models.Ticket)
|
|
class TicketAdmin(_CSVReportMixin, admin.ModelAdmin):
|
|
show_full_result_count = True
|
|
inlines = [AttendeesInline]
|
|
form = TicketAdminForm
|
|
raw_id_fields = ['user']
|
|
list_display = (
|
|
'id',
|
|
'edition',
|
|
'quantity',
|
|
'unclaimed',
|
|
'user',
|
|
'sku',
|
|
'order_number',
|
|
'is_paid',
|
|
'is_free',
|
|
'view_order',
|
|
'view_invoice',
|
|
'refund_status',
|
|
)
|
|
list_filter = (
|
|
'edition',
|
|
'is_free',
|
|
'is_paid',
|
|
'quantity',
|
|
NumberOfUnclaimedTicketsFilter,
|
|
'sku',
|
|
)
|
|
readonly_fields = (
|
|
claim_url,
|
|
'order_token',
|
|
'order_id',
|
|
'view_order',
|
|
'refund_status',
|
|
'created_at',
|
|
'updated_at',
|
|
'checkout_data',
|
|
)
|
|
search_fields = (
|
|
'token',
|
|
'order_token',
|
|
'order_id',
|
|
'order_number',
|
|
'user__email',
|
|
'user__username',
|
|
'attendees__email',
|
|
'user__profile__company',
|
|
'user__profile__full_name',
|
|
'user__profile__title',
|
|
)
|
|
date_hierarchy = 'created_at'
|
|
actions = ['send_mail_unclaimed_reminder', 'send_mail_unpaid_reminder']
|
|
|
|
def get_fieldsets(self, *args, **kwargs):
|
|
"""Hide checkout data into a collapsible section."""
|
|
fieldsets = super().get_fieldsets(*args, **kwargs)
|
|
collapsible = {'checkout_data'}
|
|
all_fields = fieldsets[0][1]['fields']
|
|
fieldsets[0][1]['fields'] = [_ for _ in all_fields if _ not in collapsible]
|
|
|
|
collapsible_fields = [_ for _ in all_fields if _ in collapsible]
|
|
fieldsets.append(('More info', {'fields': collapsible_fields, 'classes': ['collapse']}))
|
|
return fieldsets
|
|
|
|
def view_order(self, obj):
|
|
"""Link to this ticket's order in Saleor Dashboard."""
|
|
return _get_order_link(obj)
|
|
|
|
def view_invoice(self, obj):
|
|
"""Link to this ticket's invoice PDF."""
|
|
return _get_invoice_link(obj)
|
|
|
|
def refund_status(self, obj) -> str:
|
|
"""Return refund status."""
|
|
return obj.refund_status
|
|
|
|
def generate_csv_report(self, request, changelist, export_all=False):
|
|
"""CSV export a report for ticket orders."""
|
|
if not request.user.is_staff:
|
|
raise PermissionDenied
|
|
|
|
results_to_export = changelist.queryset if export_all else changelist.result_list
|
|
|
|
field_names = self.list_display
|
|
if 'action_checkbox' in field_names:
|
|
field_names.remove('action_checkbox')
|
|
|
|
filename = self.generate_report_filename(request, changelist, export_all=export_all)
|
|
response = HttpResponse(content_type='text/csv')
|
|
response['Content-Disposition'] = f'attachment; filename={filename}'
|
|
|
|
writer = csv.writer(response)
|
|
headers = [
|
|
'Order #',
|
|
'Date created',
|
|
'Name',
|
|
'Quantity',
|
|
'Country',
|
|
'Gross total',
|
|
'Refund',
|
|
'VATIN',
|
|
'VAT 21%',
|
|
'VAT 9%',
|
|
]
|
|
writer.writerow(headers)
|
|
|
|
order_id_to_ticket = {_.order_id: _ for _ in results_to_export if _.order_id}
|
|
saleor_order_ids, stripe_order_ids = set(), set()
|
|
|
|
# The ones with `order_id` like `pi_*` definitely came from Stripe, other must be Saleor
|
|
for order_id in order_id_to_ticket:
|
|
if order_id.startswith('pi_'):
|
|
stripe_order_ids.add(order_id)
|
|
else:
|
|
saleor_order_ids.add(order_id)
|
|
|
|
if saleor_order_ids:
|
|
saleor_order_data = saleor_orders_as_list(ids=saleor_order_ids)
|
|
|
|
for row in saleor_order_data:
|
|
vatin = next(
|
|
(_['value'] for _ in row['metadata'] if _['key'] == 'vatrc.vatin_validated'),
|
|
None,
|
|
)
|
|
values = [
|
|
row['number'],
|
|
dateutil.parser.parse(row['created']).date(),
|
|
row['lines'][0]['variant']['name'],
|
|
row['lines'][0]['quantity'],
|
|
row['billingAddress']['country']['code'],
|
|
format_saleor_money(row['total']['gross']),
|
|
vatin,
|
|
]
|
|
writer.writerow(values)
|
|
|
|
if stripe_order_ids:
|
|
for order_id in stripe_order_ids:
|
|
ticket = order_id_to_ticket[order_id]
|
|
session = ticket.order
|
|
currency = session.currency.upper()
|
|
latest_charge = session.payment_intent.latest_charge
|
|
paid_at_timestamp = latest_charge.created
|
|
paid_at = datetime.datetime.fromtimestamp(int(paid_at_timestamp))
|
|
vatin = next((_.value for _ in session.customer_details.tax_ids), None)
|
|
values = [
|
|
ticket.order_number,
|
|
paid_at.date(),
|
|
session.line_items[0].price.product.name,
|
|
ticket.quantity,
|
|
latest_charge.billing_details.address.country,
|
|
format_cents(session.amount_total, currency),
|
|
format_cents(latest_charge.amount_refunded, currency),
|
|
vatin,
|
|
# FIXME: here we expect that order of the metadata didn't change
|
|
*[format_cents(tax['amount'], session.currency) for tax in session.tax_details],
|
|
]
|
|
writer.writerow(values)
|
|
# TODO: refunded status (partial/full)
|
|
return response
|
|
|
|
def get_queryset(self, *args, **kwargs):
|
|
"""Count unclaimed tickets for custom filtering."""
|
|
queryset = super().get_queryset(*args, **kwargs)
|
|
queryset = queryset.annotate(
|
|
unclaimed_count=F('quantity') - Count('attendees', distinct=True),
|
|
)
|
|
return queryset
|
|
|
|
def send_mail_unclaimed_reminder(self, request, queryset):
|
|
"""Queue tasks for sending a reminder email about unclaimed tickets."""
|
|
count = 0
|
|
for ticket in queryset:
|
|
tasks.send_mail_tickets_paid(ticket_id=ticket.id, email_name='unclaimed_reminder')
|
|
count += 1
|
|
msg = f'{count} emails queued for sending'
|
|
self.message_user(request, msg, messages.SUCCESS)
|
|
|
|
def send_mail_unpaid_reminder(self, request, queryset):
|
|
"""Queue tasks for sending a reminder email about unpaid tickets."""
|
|
count = 0
|
|
for ticket in queryset:
|
|
tasks.send_mail_bank_transfer_required(
|
|
ticket_id=ticket.id, email_name='unpaid_reminder'
|
|
)
|
|
count += 1
|
|
msg = f'{count} emails queued for sending'
|
|
self.message_user(request, msg, messages.SUCCESS)
|
|
|
|
|
|
@admin.register(tickets.models.TicketClaim)
|
|
class TicketClaimAdmin(_CSVReportMixin, admin.ModelAdmin):
|
|
date_hierarchy = 'created_at'
|
|
show_full_result_count = True
|
|
raw_id_fields = ('user', 'ticket')
|
|
readonly_fields = ['confirmation_sent_at', 'general_info_sent_at']
|
|
list_select_related = ('user', 'user__profile', 'ticket')
|
|
ordering = ['user__profile__full_name']
|
|
list_display = ['created_at', 'full_name', 'title', 'company', 'country', 'email', ticket_link]
|
|
list_filter = (
|
|
'ticket__edition',
|
|
'ticket__is_free',
|
|
'ticket__is_paid',
|
|
'ticket__quantity',
|
|
'ticket__sku',
|
|
'general_info_sent_at',
|
|
)
|
|
search_fields = (
|
|
'ticket__order_id',
|
|
'=ticket__order_number',
|
|
'ticket__order_token',
|
|
'ticket__token',
|
|
'user__email',
|
|
'user__profile__company',
|
|
'user__profile__full_name',
|
|
)
|
|
actions = [
|
|
'send_mail_general_info',
|
|
'send_mail_badge_reminder',
|
|
'send_mail_one_day_ticket_confirm',
|
|
'send_mail_feedback',
|
|
]
|
|
|
|
def send_mail_general_info(self, request, queryset):
|
|
"""Queue tasks for sending a general info email to selected attendees."""
|
|
count = 0
|
|
for claim in queryset:
|
|
if claim.general_info_sent_at:
|
|
continue
|
|
tasks.send_mail_general_info(ticket_id=claim.ticket_id, user_id=claim.user_id)
|
|
count += 1
|
|
msg = f'{count} emails queued for sending'
|
|
self.message_user(request, msg, messages.SUCCESS)
|
|
|
|
send_mail_general_info.short_description = 'Send general info email to selected attendees'
|
|
|
|
def _send_mail_to_attendee(self, request, queryset, email_name: str):
|
|
from background_task.models import Task, CompletedTask
|
|
|
|
count = 0
|
|
for claim in queryset:
|
|
# Skip if already sent this email to this attendee
|
|
if any(
|
|
# This is based on two fragile assumptions:
|
|
# that background_task uses OrderedDict when serialising task
|
|
# arguments and that task itself won't change its arguments
|
|
# in the future.
|
|
task_class.objects.filter(
|
|
task_name="tickets.tasks.send_mail_confirm_tickets",
|
|
task_params__contains=f'"email_name": "{email_name}"',
|
|
)
|
|
.filter(task_params__contains=f'"ticket_id": {claim.ticket_id},')
|
|
.filter(task_params__contains=f'"user_id": {claim.user_id}}}')
|
|
.exists()
|
|
for task_class in {CompletedTask, Task}
|
|
):
|
|
continue
|
|
tasks.send_mail_confirm_tickets(
|
|
ticket_id=claim.ticket_id, user_id=claim.user_id, email_name=email_name
|
|
)
|
|
count += 1
|
|
msg = f'{count} emails queued for sending'
|
|
self.message_user(request, msg, messages.SUCCESS)
|
|
|
|
def send_mail_badge_reminder(self, request, queryset):
|
|
"""Queue tasks for sending a reminder about badge info to selected attendees."""
|
|
self._send_mail_to_attendee(request, queryset, email_name='badge_reminder')
|
|
|
|
send_mail_badge_reminder.short_description = (
|
|
'Send reminder about badge info to selected attendees'
|
|
)
|
|
|
|
def send_mail_one_day_ticket_confirm(self, request, queryset):
|
|
"""Queue tasks for sending a reminder about badge info to selected attendees."""
|
|
self._send_mail_to_attendee(request, queryset, email_name='one_day_ticket_confirm')
|
|
|
|
send_mail_one_day_ticket_confirm.short_description = (
|
|
'Send emails to one-day attendees asking to confirm their day'
|
|
)
|
|
|
|
def send_mail_feedback(self, request, queryset):
|
|
"""Queue tasks for sending a request for feedback to selected attendees."""
|
|
self._send_mail_to_attendee(request, queryset, email_name='feedback')
|
|
|
|
send_mail_feedback.short_description = 'Send a request for feedback to selected attendees'
|
|
|
|
def generate_csv_report(self, request, changelist, export_all=False):
|
|
"""CSV export a list of attendees."""
|
|
if not request.user.is_staff:
|
|
raise PermissionDenied
|
|
|
|
results_to_export = changelist.queryset if export_all else changelist.result_list
|
|
|
|
field_names = self.list_display
|
|
if 'action_checkbox' in field_names:
|
|
field_names.remove('action_checkbox')
|
|
|
|
filename = self.generate_report_filename(request, changelist, export_all=export_all)
|
|
response = HttpResponse(content_type='text/csv')
|
|
response['Content-Disposition'] = f'attachment; filename={filename}'
|
|
|
|
writer = csv.writer(response)
|
|
headers = [
|
|
'Full name',
|
|
'Title',
|
|
'Company',
|
|
'Country code',
|
|
'Country',
|
|
'Ticket',
|
|
]
|
|
writer.writerow(headers)
|
|
|
|
for row in results_to_export:
|
|
values = [
|
|
row.full_name,
|
|
row.title,
|
|
row.company,
|
|
row.country,
|
|
row.country.name,
|
|
row.ticket,
|
|
]
|
|
writer.writerow(values)
|
|
return response
|
|
|
|
|
|
@admin.register(tickets.models.Product)
|
|
class ProductAdmin(admin.ModelAdmin):
|
|
def site_url(self, obj):
|
|
url = obj.get_absolute_url()
|
|
if not url:
|
|
return ''
|
|
return mark_safe(f'<a href="{url}" target="_blank" title="Site URL">{url}</a>')
|
|
|
|
def url(obj):
|
|
url = obj.url
|
|
if not url:
|
|
return ''
|
|
return mark_safe(f'<a href="{url}" target="_blank" title="URL">{url}</a>')
|
|
|
|
list_display = ('name', 'price', 'currency', 'site_url', 'is_featured', url)
|