278 lines
9.7 KiB
Python
278 lines
9.7 KiB
Python
from decimal import Decimal
|
|
from functools import lru_cache
|
|
import logging
|
|
import time
|
|
|
|
from django.conf import settings
|
|
from django.contrib.sites.models import Site
|
|
from django.db import transaction
|
|
import django.http.request
|
|
import stripe
|
|
|
|
from conference_main.util import absolute_url
|
|
import tickets.models
|
|
|
|
# Configure the Stripe client once, at import time, with the key from Django settings.
stripe.api_key = settings.STRIPE_API_KEY

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
# Fields passed as `expand` when a checkout session is retrieved (see `retrieve_session`).
SESSION_EXPANDS = [
    'customer',
    # Line items, down to the product behind each price.
    'line_items',
    'line_items.data.price',
    'line_items.data.price.product',
    # Payment intent, its customer and the latest charge with its refunds.
    'payment_intent',
    'payment_intent.customer',
    'payment_intent.latest_charge',
    'payment_intent.latest_charge.refunds',
    'payment_link',
]
|
|
|
|
|
|
def _cents_to_decimal(value):
|
|
_cents = int(value)
|
|
whole = _cents // 100
|
|
cents = _cents % 100
|
|
return Decimal(f'{whole}.{cents:02}')
|
|
|
|
|
|
def get_ttl_hash(seconds=60):
    """Return a value that stays the same within a `seconds`-long time period.

    The current Unix time is divided by `seconds` and rounded to the nearest
    integer, so the result only changes once per period.
    Presumably meant to be passed as the `ttl_hash` argument of a cached
    function so that its cache entries expire -- confirm against callers.
    """
    elapsed_periods = time.time() / seconds
    return round(elapsed_periods)
|
|
|
|
|
|
def retrieve_session(session_id: str) -> 'stripe.checkout.Session':
    """Fetch the Stripe checkout session with the given ID.

    The fields listed in `SESSION_EXPANDS` are requested as expanded objects.
    """
    checkout_sessions = stripe.checkout.Session
    return checkout_sessions.retrieve(session_id, expand=SESSION_EXPANDS)
|
|
|
|
|
|
def list_sessions(payment_intent_id: str) -> list:
    """Return the checkout sessions Stripe lists for the given payment intent ID."""
    listing = stripe.checkout.Session.list(payment_intent=payment_intent_id)
    return listing.data
|
|
|
|
|
|
def retrieve_session_for_payment_intent(payment_intent_id: str) -> 'stripe.checkout.Session':
    """Retrieve the Stripe checkout session that belongs to a PaymentIntent ID.

    Two API requests are made: the first lists the checkout sessions of this
    PaymentIntent to learn the CheckoutSession's ID, the second retrieves that
    CheckoutSession itself.
    Doing it in two steps is what allows the line item's product to be expanded;
    a single request would not fit the "no more than 4 levels deep" restriction.

    Raises `IndexError` if no checkout session is listed for the PaymentIntent.
    """
    matching_sessions = list_sessions(payment_intent_id)
    first_session = matching_sessions[0]
    return retrieve_session(first_session['id'])
|
|
|
|
|
|
@lru_cache(maxsize=10)
def retrieve_product(name: str, ttl_hash=None) -> 'stripe.Product':
    """Retrieve the first Stripe product with a matching name.

    :param name: product name to look for (exact match).
    :param ttl_hash: not used by the body; it only takes part in the `lru_cache`
        key, so passing a different value forces a fresh lookup.
    :return: the first matching product, or `None` if there is none.
        Note that a `None` result is cached like any other.
    """
    # A plain `list()` call only returns the first page of results, so products
    # beyond it would never be found: iterate over all pages instead.
    products = stripe.Product.list(limit=100)
    return next(
        (product for product in products.auto_paging_iter() if product.name == name),
        None,
    )
|
|
|
|
|
|
def retrieve_order(payment_intent_id: str, name: str) -> dict:
    """Return the order info used for both the ticket details page and the invoice.

    The order is the checkout session found for `payment_intent_id`.
    `name` is accepted but not used by this function.
    """
    checkout_session = retrieve_session_for_payment_intent(payment_intent_id)
    order = _construct_checkout_session(checkout_session)
    return order
|
|
|
|
|
|
def _construct_checkout_session(session: dict):
    """Return `session` as a `stripe.checkout.Session` object.

    An existing `stripe.checkout.Session` is returned untouched; anything else
    is converted with `construct_from`, using an empty key.
    An empty/falsy `session` trips the assertion.
    """
    assert session
    if isinstance(session, stripe.checkout.Session):
        return session
    return stripe.checkout.Session.construct_from(session, '')
|
|
|
|
|
|
def format_payment_method(payment_method: dict) -> str:
    """Format a payment method from `payment_method_details` received from Stripe API.

    :param payment_method: mapping with a `type` key and, under the key named
        after that type, the type-specific details.
    :return: a human-readable description, e.g. `VISA credit card ending in 4242`
        or `INGBNL2A**********1234 via iDEAL`. Types without dedicated formatting
        are returned as their upper-cased name.
    :raises KeyError: if `type` is missing, or card details lack `brand`/`last4`.
    """
    _type = payment_method['type']
    # Not every branch needs the details, so their absence alone is not an error.
    data = payment_method.get(_type) or {}
    if _type == 'card':
        return f'{data["brand"].upper()} credit card ending in {data["last4"]}'
    if _type == 'paypal':
        # The payer's email may be absent or null: don't fail or print "None".
        payer_email = data.get('payer_email')
        return f'PayPal account {payer_email}' if payer_email else 'PayPal account'
    if _type == 'customer_balance':
        return 'Bank transfer'
    if _type in {'ideal', 'giropay', 'sofort', 'sepa_debit'}:
        bic = data.get('bic') or ''
        # Fall back to `iban_last4` also when `last4` is present but empty.
        last4 = data.get('last4') or data.get('iban_last4') or ''
        account = f'{bic}{"*" * 10}{last4}' if last4 else bic
        labels = {'ideal': 'iDEAL', 'sepa_debit': 'SEPA Direct Debit'}
        via = f'via {labels.get(_type, _type.upper())}'
        # Avoid a leading space when there are no account details to show.
        return f'{account} {via}' if account else via
    return _type.upper()
|
|
|
|
|
|
def construct_event_from_request(
    request: 'django.http.request.HttpRequest',
) -> 'stripe.api_resources.Event':
    """Verify a webhook request and return the Event built from its body.

    The raw request body and the `Stripe-Signature` header are checked against
    `settings.STRIPE_ENDPOINT_SECRET` by `stripe.Webhook.construct_event`.
    Raises `KeyError` if the request carries no `Stripe-Signature` header.
    """
    raw_body = request.body
    signature = request.META['HTTP_STRIPE_SIGNATURE']
    endpoint_secret = settings.STRIPE_ENDPOINT_SECRET
    return stripe.Webhook.construct_event(raw_body, signature, endpoint_secret)
|
|
|
|
|
|
def construct_event(payload) -> 'stripe.api_resources.Event':
    """
    Return Event object constructed from given payload.

    No signature check happens here: when handling webhook events, this must only
    be used after the request signature was verified.
    """
    # The second argument of `construct_from` is the API key attached to the
    # resulting object, not the webhook signing secret: use the configured API key,
    # as `stripe.Webhook.construct_event` does.
    return stripe.Event.construct_from(payload, stripe.api_key)
|
|
|
|
|
|
def _update_checkout_data(ticket, checkout_session):
|
|
ticket.checkout_data = checkout_session
|
|
ticket.save(update_fields={'checkout_data'})
|
|
|
|
|
|
def _upsert_ticket_from_payment_intent(user_id, sku, quantity, payment_intent, product=None):
    """Find the ticket whose `order_id` is the payment intent's ID, or create it.

    Afterwards, missing `product`, `order_number` and `invoice_url` are filled in,
    and the ticket is marked as paid if the payment intent has succeeded.
    A newly created ticket gets `process_new_ticket()` called on it, an already
    existing one gets `claim_if_paid_for_one()`.

    Returns the ticket.
    """
    paid = payment_intent['status'] == 'succeeded'
    ticket_model = tickets.models.Ticket
    created = False
    try:
        ticket = ticket_model.objects.get(order_id=payment_intent['id'])
    except ticket_model.DoesNotExist:
        created = True
        ticket = ticket_model(
            edition=Site.objects.get_current().settings.current_edition,
            # Must be created paid if the payment was immediate,
            # otherwise updates triggered by other webhook events might cause duplicate emails
            is_paid=paid,
            order_id=payment_intent['id'],
            product=product,
            quantity=quantity,
            # Prefer the product's slug, fall back to the given SKU.
            sku=(product.slug or sku) if product else sku,
            user_id=user_id,
        )
        ticket.save()

    # Collect only the fields that actually change, to save just those.
    changed = set()
    if product and not ticket.product:
        ticket.product = product
        changed.add('product')
    if not ticket.order_number:
        ticket.order_number = f'ST-{ticket.id}'
        changed.add('order_number')
    if not ticket.invoice_url:
        ticket.invoice_url = absolute_url(
            'tickets:invoice-pdf', kwargs={'ticket_token': ticket.token}
        )
        changed.add('invoice_url')
    if paid and not ticket.is_paid:
        ticket.is_paid = True
        changed.add('is_paid')
    if changed:
        ticket.save(update_fields=changed)

    if created:
        ticket.process_new_ticket()
    else:
        ticket.claim_if_paid_for_one()
    return ticket
|
|
|
|
|
|
def process_charge_refunded(payload):
    """Remove all attendees from the matching ticket if its charge is fully refunded.

    The stored checkout session data of the ticket is refreshed as well.
    If no ticket matches the charge's payment intent, this is logged and ignored.
    """
    charge = payload.get('data', {}).get('object', {})
    intent_id = charge.payment_intent
    checkout_session = retrieve_session_for_payment_intent(intent_id)

    # Built here, evaluated inside the transaction below, as `select_for_update` requires.
    locked_tickets = tickets.models.Ticket.objects.select_for_update().filter(order_id=intent_id)
    with transaction.atomic():
        ticket = locked_tickets.first()
        if not ticket:
            logger.info('No ticket found for refund of %s, ignoring', intent_id)
            return
        # A refund is full when the refunded amount equals the charged amount.
        if charge.amount == charge.amount_refunded:
            logger.info('Removing all attendees from a fully refunded ticket pk=%s', ticket.pk)
            ticket.attendees.clear()

        _update_checkout_data(ticket, checkout_session)
|
|
|
|
|
|
def _product_by_payment_link(checkout_session):
|
|
if not checkout_session.payment_link:
|
|
return None
|
|
url = checkout_session.payment_link.url
|
|
return tickets.models.Product.objects.filter(url__contains=url).first()
|
|
|
|
|
|
def _product_by_price_id(line_item):
    """Return the first product whose `url` contains the line item's price ID, if any."""
    stripe_price_id = line_item.price['id']
    candidates = tickets.models.Product.objects.filter(url__contains=stripe_price_id)
    return candidates.first()
|
|
|
|
|
|
def process_checkout_session_completed(session_id: str) -> 'tickets.models.Ticket':
    """Create or update the ticket that corresponds to a completed Stripe checkout.

    Only the first line item of the checkout session is considered.
    The retrieved checkout session is also stored on the ticket.
    """
    session = retrieve_session(session_id)
    # Anything missing from the session raises an exception here, on purpose.
    first_item = session['line_items']['data'][0]
    intent = session['payment_intent']
    stripe_product_name = first_item.price.product.name
    count = first_item['quantity']
    buyer_id = int(session['client_reference_id'])
    # TODO: just use metadata['product_id']?
    product = _product_by_price_id(first_item)
    if not product:
        product = _product_by_payment_link(session)

    with transaction.atomic():
        # The Stripe product's name serves as the SKU argument.
        ticket = _upsert_ticket_from_payment_intent(
            buyer_id, stripe_product_name, count, intent, product=product
        )

        _update_checkout_data(ticket, session)
    return ticket
|
|
|
|
|
|
def create_checkout_session(
    product: object,
    client_reference_id: int,
    customer_email: str,
    cancel_url: str,
    success_url: str,
) -> 'stripe.api_resources.checkout.Session':
    """Create a checkout session with given parameters.

    :param product: object providing `price_id` (used as the line item's price)
        and `pk` (stored in the payment intent's metadata as `product_id`).
    :param client_reference_id: stored on the session as `client_reference_id`
        and, for a newly created customer, as `conference_user_id` metadata.
    :param customer_email: an existing Stripe customer with this email is reused,
        otherwise a new one is created.
    :param cancel_url: passed to Stripe as `cancel_url`.
    :param success_url: passed to Stripe as `success_url`, with a
        `checkout_session_id={CHECKOUT_SESSION_ID}` query parameter appended.
    :return: the created checkout session.
    """
    existing_customers = stripe.Customer.list(email=customer_email, limit=1)
    if existing_customers.data:
        customer = existing_customers.data[0]
    else:
        customer = stripe.Customer.create(
            email=customer_email, metadata={'conference_user_id': client_reference_id}
        )
    # The placeholder is appended as a plain string to avoid uri-encoding of curly braces,
    # otherwise stripe doesn't do the template substitution.
    # Use "&" if the URL already carries a query string, so that it stays well-formed.
    separator = '&' if '?' in success_url else '?'
    success_url = success_url + separator + 'checkout_session_id={CHECKOUT_SESSION_ID}'
    line_items = [
        {
            'adjustable_quantity': {'enabled': True},
            'price': product.price_id,
            'quantity': 1,
        },
    ]
    payment_intent_data = {
        'metadata': {'product_id': product.pk},
    }
    return stripe.checkout.Session.create(
        allow_promotion_codes=True,
        automatic_tax={'enabled': False},
        billing_address_collection='required',
        cancel_url=cancel_url,
        client_reference_id=client_reference_id,
        customer=customer.id,
        customer_update={'name': 'auto', 'address': 'auto'},
        line_items=line_items,
        mode='payment',
        payment_intent_data=payment_intent_data,
        submit_type='book',
        success_url=success_url,
        tax_id_collection={'enabled': True},
        ui_mode='hosted',
    )
|