from typing import Callable, Optional, Type
import csv
import datetime
import re
from django import forms
from django.contrib import admin, messages
from django.core.exceptions import PermissionDenied
from django.db.models import Count, F
from django.http import HttpResponse
from django.template.loader import render_to_string
from django.urls import path, reverse
from django.utils.html import format_html
from django.utils.safestring import mark_safe
import dateutil.parser
import django.db.models
from conference_main.util import absolutify
from tickets.saleor_client import orders_as_list as saleor_orders_as_list
from import format_saleor_money, format_cents
import tickets.models
import tickets.tasks as tasks
import tickets.stripe_utils
# Type of the callables returned by create_admin_fk_link(): they take a model
# instance and return a string (the HTML of a link, or '-').
LinkFunc = Callable[[django.db.models.Model], str]
def _get_admin_url_name(model: Type['django.db.models.Model'], action: str = 'change') -> str:
    """Return the namespaced admin URL name for ``model`` and ``action``.

    The result has the form ``admin:<app_label>_<model_name>_<action>``, built
    from ``model._meta``.

    :param model: Model class whose ``_meta.app_label`` and ``_meta.model_name`` are used.
    :param action: Last component of the URL name; defaults to ``'change'``.
    """
    opts = model._meta
    return f'admin:{opts.app_label}_{opts.model_name}_{action}'
def create_admin_fk_link(
    field_name: str, short_description: str, view_name: str, title_func=str
) -> 'LinkFunc':
    """Construct a function that constructs a link to the admin:xxxx_change form.

    :param field_name: The object is taken from model_instance.{field_name}
    :param short_description: The label for this link as shown in the admin.
    :param view_name: The admin view to link to. Must take one parameter `object_id`.
    :param title_func: A callable that takes the referenced object and returns
        a string used as the link title.
    :return: A callable taking a model instance and returning the link HTML,
        or '-' when the referenced object is None. It carries the
        `admin_order_field` and `short_description` attributes.

    For example, to create a link to a plan (instead of a drop-down to edit it),
    `plan_link = create_admin_fk_link('plan', 'plan', 'admin:looper_plan_change')`
    """

    def create_link(model_instance: 'django.db.models.Model') -> str:
        referenced: Optional[object] = getattr(model_instance, field_name)
        if referenced is None:
            return '-'
        assert isinstance(
            referenced, django.db.models.Model
        ), f'Expected Model, not {type(referenced)}'

        admin_link = reverse(view_name, kwargs={'object_id': referenced.pk})
        link_title = title_func(referenced)
        # Use primary key as link title if got a blank string:
        if not link_title:
            link_title = str(referenced.pk)
        # format_html() escapes both the URL and the title.
        return format_html('<a href="{}">{}</a>', admin_link, link_title)

    # Callable[[Model], str] doesn't have those Django-specific attributes,
    # and I don't feel like speccing that all out. ~~Sybren
    create_link.admin_order_field = field_name  # type: ignore
    create_link.short_description = short_description  # type: ignore
    return create_link
# Link callable for rows that reference a ticket: reads `<row>.ticket` and
# links to the Ticket admin "change" view.
ticket_link = create_admin_fk_link('ticket', 'ticket', _get_admin_url_name(tickets.models.Ticket))
def _get_order_link(ticket: tickets.models.Ticket):
    """Return an HTML "View" link to the ticket's order, or '' if it has no URL.

    :param ticket: Ticket whose ``get_admin_order_url()`` supplies the link target.
    :return: Safe HTML for an anchor opening in a new tab, or an empty string.
    """
    url = ticket.get_admin_order_url()
    if not url:
        return ''
    # format_html() escapes the URL before marking the result safe, unlike
    # interpolating it into a string passed to mark_safe().
    return format_html(
        '<a href="{}" target="_blank" title="View order in the payment system">View</a>',
        url,
    )
def _get_invoice_link(ticket: tickets.models.Ticket):
    """Return an HTML "View" link to the ticket's invoice, or '' if it has no URL.

    :param ticket: Ticket whose ``invoice_download_url`` supplies the link target.
    :return: Safe HTML for an anchor opening in a new tab, or an empty string.
    """
    invoice_url = ticket.invoice_download_url
    if not invoice_url:
        return ''
    # format_html() escapes the URL before marking the result safe.
    return format_html(
        '<a href="{}" target="_blank" title="View invoice PDF">View</a>', invoice_url
    )
def claim_url(obj):
    """Display ticket's claim URL.

    :param obj: Object with a ``claim_url`` attribute.
    :return: ``'Not applicable'`` when ``claim_url`` is empty, otherwise the
        result of passing it through ``absolutify()``.
    """
    relative_url = obj.claim_url
    if not relative_url:
        return 'Not applicable'
    return absolutify(relative_url)
class AttendeesInline(admin.TabularInline):
    """Tabular inline over the through model of ``Ticket.attendees``."""

    model = tickets.models.Ticket.attendees.through
    readonly_fields = ['full_name', 'email', 'confirmation_sent_at', 'general_info_sent_at']
    # Render the user as a raw ID input instead of a select widget.
    raw_id_fields = ['user']
    # Do not render empty extra rows.
    extra = 0
class NumberOfUnclaimedTicketsFilter(admin.SimpleListFilter):
    """Changelist filter on the number of unclaimed tickets.

    The unclaimed count is ``quantity`` minus the number of distinct ``attendees``.
    """

    title = 'Unclaimed Tickets'
    parameter_name = 'unclaimed_count'

    def lookups(self, request, model_admin):
        """Human-readable labels for filter choices.

        Offers 'some' (anything above zero) plus one choice for every distinct
        unclaimed count present in the admin's queryset.
        """
        queryset = model_admin.get_queryset(request)
        unique_counts = set(
            queryset.annotate(
                unclaimed_count=F('quantity') - Count('attendees', distinct=True),
            ).values_list('unclaimed_count', flat=True)
        )
        return [('some', 'Some unclaimed left')] + [
            (count, f'{count} unclaimed left' if count > 0 else 'No unclaimed left 👍')
            for count in unique_counts
        ]

    def queryset(self, request, queryset):
        """Return the filtered queryset based on the value provided in the query string.

        Filters on ``unclaimed_count``, so the incoming queryset must already
        carry that annotation. Returns None when no value is selected.
        """
        value = self.value()
        if value == 'some':
            return queryset.filter(unclaimed_count__gt=0)
        if value is not None:
            return queryset.filter(unclaimed_count=value)
        return None
class TicketAdminForm(forms.ModelForm):
    """Model form for Ticket with a custom label and help text for ``claim_url``."""

    class Meta:
        model = tickets.models.Ticket
        help_texts = {
            'claim_url': (
                "Link that attendees can use to claim a ticket, if quantity is more than 1."
            ),
        }
        labels = {'claim_url': 'Claim URL'}
        # Empty exclude: no model field is left out of the form.
        exclude = ()
# Mixin adding CSV export views and a report file name generator.
# The export views call self.generate_csv_report(request, changelist, export_all=...),
# which this mixin does not define itself.
class _CSVReportMixin:
def generate_report_filename(self, request, changelist, export_all):
"""Generate a file name using the model and current filters."""
# NOTE(review): this copy of the method has lost its indentation and several
# lines (closing brackets, the expression passed to '_'.join, and possibly an
# `else:` before the page suffix) — restore from version control before editing.
opts = self.model._meta
# Plural verbose name with '.' and ' ' replaced by '_' and 'looper_' removed.
model_name = (
opts.verbose_name_plural.replace('.', '_').replace(' ', '_').replace('looper_', '')
site = 'blender_conference_'
# E.g.: ?is_free__exact=1&is_paid__exact=0&quantity=1
# Turns one query-string (param, value) pair into a file name fragment.
def _format_filter(param, value):
if param.endswith('__quarter'):
return f'Q{value}'
elif param.endswith('__exact'):
param_name = param.replace('__exact', '')
# '1' and '0' are rendered as "<param>_yes" / "<param>_no".
if value == '1':
return f'{param_name}_yes'
elif value == '0':
return f'{param_name}_no'
return f'{param_name}_{value}'
# Strip hours from datetimes
return re.sub(r'\s00:00:00+.*$', '', value)
# Query parameters are visited sorted by name, in descending order.
# NOTE(review): the joined expression is missing; presumably it applies
# _format_filter to each (param, value) pair — confirm.
filters = '_'.join(
for _ in sorted(request.GET.items(), key=lambda x: x[0], reverse=True)
if export_all:
filters = filters + '_all'
# NOTE(review): presumably only applies when not exporting all — confirm.
# The page number is zero-padded to three digits.
filters = filters + f'_page_{(changelist.page_num):03}'
# Prefix filters by another "_"
filters = '_' + filters if filters else filters
# The date_hierarchy field name, if set, is removed from the file name.
filename = f'{site}_{model_name}{filters}.csv'.replace(self.date_hierarchy or '', '')
# Collapse runs of '_' into one, then drop square brackets.
return re.sub(r'[\]\[]+', '', re.sub(r'_+', '_', filename))
def get_urls(self):
"""Return URLs of additional admin views, such as CSV export view."""
urls = super().get_urls()
# "<app_label>_<model_name>" of the admin's model.
model_name = f'{self.model._meta.app_label}_{self.model._meta.model_name}'
# NOTE(review): the entries of this list and its closing bracket are missing
# in this copy; presumably they route to report_as_csv_view and
# report_all_as_csv_view — confirm against version control.
report_urls = [
# The report URLs are placed ahead of the URLs returned by super().
return report_urls + urls
def report_as_csv_view(self, request):
    """Export a changelist (not its original queryset!) as a CSV.

    Delegates to ``self.generate_csv_report`` with ``export_all=False``.
    """
    changelist = self.get_changelist_instance(request)
    return self.generate_csv_report(request, changelist, export_all=False)
def report_all_as_csv_view(self, request):
    """Export the filtered queryset as a CSV.

    Delegates to ``self.generate_csv_report`` with ``export_all=True``.
    """
    changelist = self.get_changelist_instance(request)
    return self.generate_csv_report(request, changelist, export_all=True)
# Admin for tickets: inherits the CSV export views from _CSVReportMixin and
# shows attendees inline.
class TicketAdmin(_CSVReportMixin, admin.ModelAdmin):
show_full_result_count = True
inlines = [AttendeesInline]
form = TicketAdminForm
raw_id_fields = ['user']
# NOTE(review): the contents and closing brackets of the four tuples below are
# missing in this copy of the file — restore from version control.
list_display = (
list_filter = (
readonly_fields = (
search_fields = (
date_hierarchy = 'created_at'
# Names of the bulk-action methods defined on this class.
actions = ['send_mail_unclaimed_reminder', 'send_mail_unpaid_reminder']
def get_fieldsets(self, *args, **kwargs):
    """Hide checkout data into a collapsible section.

    Takes the fields of the first fieldset returned by the parent class, moves
    those named in ``collapsible`` into a trailing collapsed 'More info'
    fieldset, and returns the result as a new list. The structure returned by
    the parent is not modified, so repeated calls do not accumulate extra
    sections if it is a shared class-level ``fieldsets`` declaration.
    """
    fieldsets = super().get_fieldsets(*args, **kwargs)
    collapsible = {'checkout_data'}

    first_name, first_options = fieldsets[0]
    all_fields = first_options['fields']
    visible_fields = [name for name in all_fields if name not in collapsible]
    collapsible_fields = [name for name in all_fields if name in collapsible]

    return [
        (first_name, {**first_options, 'fields': visible_fields}),
        *fieldsets[1:],
        ('More info', {'fields': collapsible_fields, 'classes': ['collapse']}),
    ]
def view_order(self, obj):
    """Link to this ticket's order in Saleor Dashboard.

    Thin wrapper around ``_get_order_link``.
    """
    return _get_order_link(obj)
def view_invoice(self, obj):
    """Link to this ticket's invoice PDF.

    Thin wrapper around ``_get_invoice_link``.
    """
    return _get_invoice_link(obj)
def refund_status(self, obj) -> str:
    """Return refund status, read from ``obj.refund_status``."""
    return obj.refund_status
def generate_csv_report(self, request, changelist, export_all=False):
"""CSV export a report for ticket orders."""
# NOTE(review): this copy of the method has lost its indentation and many
# lines (branch bodies, list contents, closing brackets, writer calls).
# The comments below describe only what is still visible — restore the
# method from version control before changing it.
# Only staff users may export.
if not request.user.is_staff:
raise PermissionDenied
# export_all: the whole filtered queryset; otherwise only the changelist's result list.
results_to_export = changelist.queryset if export_all else changelist.result_list
field_names = self.list_display
# NOTE(review): the body of this check is missing.
if 'action_checkbox' in field_names:
filename = self.generate_report_filename(request, changelist, export_all=export_all)
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename={filename}'
# The CSV is written directly into the HTTP response.
writer = csv.writer(response)
# NOTE(review): some header entries and the closing bracket are missing.
headers = [
'Order #',
'Date created',
'Gross total',
'VAT 21%',
'VAT 9%',
# Tickets without an order_id are left out of the mapping.
order_id_to_ticket = {_.order_id: _ for _ in results_to_export if _.order_id}
saleor_order_ids, stripe_order_ids = set(), set()
# The ones with `order_id` like `pi_*` definitely came from Stripe, other must be Saleor
# NOTE(review): the statements filling the two sets are missing.
for order_id in order_id_to_ticket:
if order_id.startswith('pi_'):
if saleor_order_ids:
saleor_order_data = saleor_orders_as_list(ids=saleor_order_ids)
for row in saleor_order_data:
# First metadata value stored under the 'vatrc.vatin_validated' key.
# NOTE(review): the default argument and closing bracket of next() are missing.
vatin = next(
(_['value'] for _ in row['metadata'] if _['key'] == 'vatrc.vatin_validated'),
# NOTE(review): the row values for Saleor orders are missing.
values = [
if stripe_order_ids:
for order_id in stripe_order_ids:
ticket = order_id_to_ticket[order_id]
# NOTE(review): the body of this check is missing; presumably unpaid tickets are skipped — confirm.
if not ticket.is_paid:
session = ticket.order
currency = session.currency.upper()
latest_charge = session.payment_intent.latest_charge
paid_at_timestamp = latest_charge.created
# fromtimestamp() without a tz argument yields a naive datetime in local time.
paid_at = datetime.datetime.fromtimestamp(int(paid_at_timestamp))
# First tax ID value of the customer, or None when there are none.
vatin = next((_.value for _ in session.customer_details.tax_ids), None)
tax_details = ticket.get_tax_details(session)
# NOTE(review): several row values and closing brackets are missing below.
values = [
format_cents(session.amount_total, currency),
format_cents(latest_charge.amount_refunded, currency),
# FIXME: here we expect that order of the metadata didn't change
# NOTE(review): this passes session.currency while the calls above pass the
# upper-cased `currency` — confirm whether that difference is intended.
format_cents(tax['amount'], session.currency)
for tax in tax_details['tax_totals']
# TODO: refunded status (partial/full)
return response
def get_queryset(self, *args, **kwargs):
    """Count unclaimed tickets for custom filtering.

    Annotates every row of the parent queryset with ``unclaimed_count``:
    ``quantity`` minus the number of distinct ``attendees``.
    """
    queryset = super().get_queryset(*args, **kwargs)
    return queryset.annotate(
        unclaimed_count=F('quantity') - Count('attendees', distinct=True),
    )
def send_mail_unclaimed_reminder(self, request, queryset):
"""Queue tasks for sending a reminder email about unclaimed tickets."""
# Number of tickets a task was queued for, reported back to the admin user.
count = 0
for ticket in queryset:
# NOTE(review): the first argument of this call is missing in this copy;
# presumably it identifies the ticket — restore from version control.
tasks.send_mail_tickets_paid(, email_name='unclaimed_reminder')
count += 1
msg = f'{count} emails queued for sending'
self.message_user(request, msg, messages.SUCCESS)
def send_mail_unpaid_reminder(self, request, queryset):
"""Queue tasks for sending a reminder email about unpaid tickets."""
# Number of tickets a task was queued for, reported back to the admin user.
count = 0
for ticket in queryset:
# NOTE(review): the first argument and the closing parenthesis of this call
# are missing in this copy; presumably the argument identifies the ticket —
# restore from version control.
tasks.send_mail_bank_transfer_required(, email_name='unpaid_reminder'
count += 1
msg = f'{count} emails queued for sending'
self.message_user(request, msg, messages.SUCCESS)
# Admin for ticket claims: inherits the CSV export views from _CSVReportMixin.
class TicketClaimAdmin(_CSVReportMixin, admin.ModelAdmin):
date_hierarchy = 'created_at'
show_full_result_count = True
raw_id_fields = ('user', 'ticket')
readonly_fields = ['confirmation_sent_at', 'general_info_sent_at']
# Relations joined into the changelist query.
list_select_related = ('user', 'user__profile', 'ticket', 'ticket__product')
# Default ordering: by the full name on the user's profile.
ordering = ['user__profile__full_name']
# NOTE(review): the contents and closing brackets of the four collections
# below are missing in this copy of the file — restore from version control.
list_display = [
list_filter = (
search_fields = (
actions = [
def custom_fields(self, obj):
    """Render the custom fields of the claim's ticket order.

    :param obj: Object with a ``ticket`` attribute.
    :return: ``''`` when ``ticket.is_stripe`` is falsy; otherwise the rendered
        template, given the ticket and its ``order`` as context.
    """
    ticket = obj.ticket
    if not ticket.is_stripe:
        return ''
    return render_to_string(
        'tickets/components/custom_fields_admin', {'ticket': ticket, 'order': ticket.order}
    )
def send_mail_general_info(self, request, queryset):
"""Queue tasks for sending a general info email to selected attendees."""
# Number of queued tasks, reported back to the admin user.
count = 0
for claim in queryset:
# NOTE(review): a line appears to be missing after this check; presumably
# claims that already have general_info_sent_at set are skipped — confirm
# against version control.
if claim.general_info_sent_at:
tasks.send_mail_general_info(ticket_id=claim.ticket_id, user_id=claim.user_id)
count += 1
msg = f'{count} emails queued for sending'
self.message_user(request, msg, messages.SUCCESS)
# Label of this action as shown in the admin.
send_mail_general_info.short_description = 'Send general info email to selected attendees'
def _send_mail_to_attendee(self, request, queryset, email_name: str):
# Shared implementation of the per-attendee email actions: walks the selected
# claims and reports how many emails were queued.
# NOTE(review): this copy has lost its indentation and several lines (the
# start of the task query, the body of the skip check, the task call and
# closing brackets) — restore from version control before editing.
from background_task.models import Task, CompletedTask
count = 0
for claim in queryset:
# Skip if already sent this email to this attendee
if any(
# This is based on two fragile assumptions:
# that background_task uses OrderedDict when serialising task
# arguments and that task itself won't change its arguments
# in the future.
# The serialised task parameters are matched as plain substrings for the
# email name, the ticket ID and the user ID.
task_params__contains=f'"email_name": "{email_name}"',
.filter(task_params__contains=f'"ticket_id": {claim.ticket_id},')
.filter(task_params__contains=f'"user_id": {claim.user_id}}}')
# Both completed and not-yet-completed tasks are checked.
for task_class in {CompletedTask, Task}
# NOTE(review): the call these keyword arguments belong to is missing.
ticket_id=claim.ticket_id, user_id=claim.user_id, email_name=email_name
count += 1
msg = f'{count} emails queued for sending'
self.message_user(request, msg, messages.SUCCESS)
def send_mail_badge_reminder(self, request, queryset):
    """Queue tasks for sending a reminder about badge info to selected attendees."""
    self._send_mail_to_attendee(request, queryset, email_name='badge_reminder')


# Label of this action as shown in the admin.
send_mail_badge_reminder.short_description = (
    'Send reminder about badge info to selected attendees'
)
def send_mail_one_day_ticket_confirm(self, request, queryset):
    """Queue tasks for sending emails asking one-day attendees to confirm their day."""
    self._send_mail_to_attendee(request, queryset, email_name='one_day_ticket_confirm')


# Label of this action as shown in the admin.
send_mail_one_day_ticket_confirm.short_description = (
    'Send emails to one-day attendees asking to confirm their day'
)
def send_mail_feedback(self, request, queryset):
    """Queue tasks for sending a request for feedback to selected attendees."""
    self._send_mail_to_attendee(request, queryset, email_name='feedback')


# Label of this action as shown in the admin.
send_mail_feedback.short_description = 'Send a request for feedback to selected attendees'
def generate_csv_report(self, request, changelist, export_all=False):
"""CSV export a list of attendees."""
# NOTE(review): this copy of the method has lost its indentation and many
# lines (branch body, header and row contents, closing brackets, writer
# calls) — restore from version control before editing.
# Only staff users may export.
if not request.user.is_staff:
raise PermissionDenied
# export_all: the whole filtered queryset; otherwise only the changelist's result list.
results_to_export = changelist.queryset if export_all else changelist.result_list
field_names = self.list_display
# NOTE(review): the body of this check is missing.
if 'action_checkbox' in field_names:
filename = self.generate_report_filename(request, changelist, export_all=export_all)
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename={filename}'
# The CSV is written directly into the HTTP response.
writer = csv.writer(response)
# NOTE(review): some header entries and the closing bracket are missing.
headers = [
'Full name',
'Country code',
for row in results_to_export:
# NOTE(review): the row values and the call writing them are missing.
values = [
return response
class ProductAdmin(admin.ModelAdmin):
    """Admin for products, showing the site URL and product URL as links."""

    def site_url(self, obj):
        """Return a link to ``obj.get_absolute_url()``, or '' if it is empty."""
        url = obj.get_absolute_url()
        if not url:
            return ''
        # format_html() escapes the URL in both the href and the link text.
        return format_html('<a href="{0}" target="_blank" title="Site URL">{0}</a>', url)

    def url(obj):
        """Return a link to ``obj.url``, or '' if it is empty.

        Takes no ``self``: the function object itself is placed in
        ``list_display`` below rather than being referenced by name.
        """
        url = obj.url
        if not url:
            return ''
        # format_html() escapes the URL in both the href and the link text.
        return format_html('<a href="{0}" target="_blank" title="URL">{0}</a>', url)

    list_display = ('name', 'price', 'currency', 'site_url', 'is_featured', url)