140 lines
4.7 KiB
Python
140 lines
4.7 KiB
Python
"""Implement webhook handlers."""
|
|
from typing import Any, Dict
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
|
|
from background_task import background
|
|
from django.conf import settings
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.db.utils import IntegrityError
|
|
from django.http import HttpResponse, HttpResponseBadRequest
|
|
from django.http.request import HttpRequest
|
|
from django.utils.dateparse import parse_datetime
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.http import require_POST
|
|
|
|
from users.queries import set_groups_from_roles
|
|
from users.blender_id import BIDSession
|
|
|
|
# Module-level Blender ID session object; the webhook view and the background
# handler in this module use it to look up OAuth user info and sync user data.
bid = BIDSession()
|
|
|
|
|
|
# Logger named after this module.
logger = logging.getLogger(__name__)
|
|
# Largest request body, in bytes, that the webhook view accepts; requests that
# declare a bigger Content-Length are answered with HTTP 413.
WEBHOOK_MAX_BODY_SIZE = 1024 * 10 # 10 kB is large enough
|
|
|
|
|
|
@csrf_exempt
@require_POST
def user_modified_webhook(request: HttpRequest) -> HttpResponse:
    """Handle user modified request sent by Blender ID.

    The request is checked in this order, each failure producing an early response:

    1. Content type must be ``application/json`` (400 otherwise).
    2. ``Content-Length`` must be present and numeric (400), must not exceed
       ``WEBHOOK_MAX_BODY_SIZE`` (413) and must match the actual body size (400).
    3. The ``X-Webhook-HMAC`` header must equal the hex SHA-256 HMAC of the body,
       keyed with ``settings.BLENDER_ID['WEBHOOK_USER_MODIFIED_SECRET']`` (400).
    4. The body must be a JSON object containing an ``id`` key (400).

    If no OAuth info exists for the given ``id`` the request is acknowledged
    with 204 and skipped. Otherwise the payload is passed to
    ``handle_user_modified`` and 204 is returned.

    Payload is expected to have the following format:
    {
        "avatar_changed": false,
        "avatar_url": null,
        "confirmed_email_at": "2022-09-01T13:47:00+00:00",
        "date_deletion_requested": "2020-01-25T09:51:00+00:00",
        "email": "newmail@example.com",
        "full_name": "John Doe",
        "id": 2,
        "nickname": "some-nickname",
        "old_email": "mail@example.com",
        "old_nickname": "some-old-nickname",
        "roles": ["role1", "role2"]
    }
    """
    hmac_secret = settings.BLENDER_ID['WEBHOOK_USER_MODIFIED_SECRET']
    if isinstance(hmac_secret, str):
        hmac_secret = hmac_secret.encode()

    # Check the content type
    if request.content_type != 'application/json':
        logger.info('unexpected content type %s', request.content_type)
        return HttpResponseBadRequest('Unsupported Content-Type')

    # Check the length of the body. A missing or non-numeric header is a client
    # error and must not surface as an unhandled KeyError/ValueError (HTTP 500).
    try:
        content_length = int(request.headers['Content-Length'])
    except (KeyError, ValueError):
        return HttpResponseBadRequest('Missing or invalid Content-Length header')
    if content_length > WEBHOOK_MAX_BODY_SIZE:
        return HttpResponse('Payload Too Large', status=413)
    body = request.body
    if len(body) != content_length:
        return HttpResponseBadRequest("Content-Length header doesn't match content")

    # Validate the request
    expected_hmac = hmac.new(hmac_secret, body, hashlib.sha256).hexdigest()
    received_hmac = request.headers.get('X-Webhook-HMAC', '')
    # Compare as bytes: compare_digest() raises TypeError for str arguments that
    # contain non-ASCII characters, which a malformed header could trigger.
    if not hmac.compare_digest(received_hmac.encode(), expected_hmac.encode()):
        # Never log the expected digest: it is a valid signature for this body.
        logger.info('Invalid HMAC %s', received_hmac)
        return HttpResponseBadRequest('Invalid HMAC')

    try:
        payload = json.loads(body)
    except ValueError:
        # Covers json.JSONDecodeError as well as UnicodeDecodeError for bodies
        # that are not valid UTF-8/16/32.
        logger.exception('Malformed JSON received')
        return HttpResponseBadRequest('Malformed JSON')
    logger.info('payload: %s', payload)

    # TODO(anna) validate the rest of the payload; only its overall shape and
    # the presence of "id" are checked here.
    if not isinstance(payload, dict) or 'id' not in payload:
        logger.warning('Malformed payload received: expected an object with an "id"')
        return HttpResponseBadRequest('Malformed payload')

    oauth_user_id = payload['id']
    try:
        bid.get_oauth_user_info(oauth_user_id)
    except ObjectDoesNotExist:
        logger.warning('Skipping user-modified: no OAuth info found for ID %s', oauth_user_id)
        return HttpResponse(status=204)

    handle_user_modified(payload)

    return HttpResponse(status=204)
|
|
|
|
|
|
@background()
def handle_user_modified(payload: Dict[Any, Any]) -> None:
    """Handle payload of a user modified webhook, updating User when necessary.

    Processing order:

    1. Look up the OAuth user info for ``payload['id']``; log a warning and
       stop if there is none.
    2. If ``date_deletion_requested`` is set, pass it to
       ``user.request_deletion()`` and stop.
    3. Copy ``email`` and ``nickname`` (stored as ``username``) to the user,
       saving each changed field separately so that a failure on one does not
       prevent the other from being saved.
    4. Update ``full_name`` and ``confirmed_email_at`` in a single save.
    5. Refresh the avatar when it changed or the user has no image yet.
    6. Sync roles to groups and refresh badges.

    :param payload: decoded JSON body of the webhook request; must contain
        ``id``, ``email``, ``nickname`` and ``full_name`` (``avatar_url`` too
        when the avatar has to be refreshed).
    """
    oauth_user_id = payload['id']
    try:
        oauth_user_info = bid.get_oauth_user_info(oauth_user_id)
    except ObjectDoesNotExist:
        logger.warning('Cannot update user: no OAuth info found for ID %s', oauth_user_id)
        return

    user = oauth_user_info.user

    if payload.get('date_deletion_requested'):
        user.request_deletion(payload['date_deletion_requested'])
        return

    for payload_key, user_attr in (('email', 'email'), ('nickname', 'username')):
        new_value = payload[payload_key]
        old_value = getattr(user, user_attr)
        if new_value == old_value:
            continue
        setattr(user, user_attr, new_value)
        try:
            user.save(update_fields=[user_attr])
        except IntegrityError:
            logger.exception(
                'Unable to update %s for OAuth ID=%s pk=%s', user_attr, oauth_user_id, user.pk
            )
            # Put the previous value back: the instance is handed to other
            # helpers below, and any full save() on it would otherwise retry
            # the rejected value and fail again.
            setattr(user, user_attr, old_value)

    update_fields = set()
    if payload['full_name'] != user.full_name:
        user.full_name = payload['full_name']
        update_fields.add('full_name')

    if 'confirmed_email_at' in payload:
        # A null/empty value becomes '' so parse_datetime() gets a string.
        user.confirmed_email_at = parse_datetime(payload.get('confirmed_email_at') or '')
        update_fields.add('confirmed_email_at')

    if update_fields:
        user.save(update_fields=update_fields)

    if payload.get('avatar_changed') or not user.image:
        bid.copy_image_from_avatar_url(user=user, avatar_url=payload['avatar_url'])

    # Sync roles to groups
    group_names = payload.get('roles') or []
    set_groups_from_roles(user, group_names=group_names)

    # Attempt to update the badges
    bid.copy_badges_from_blender_id(user=user)
|