extensions-website/users/blender_id.py

191 lines
7.8 KiB
Python

"""Implement integration with Blender ID: handling of OAuth2 session, fetching user info etc."""
from typing import Dict, Any, Tuple
from requests_oauthlib import OAuth2Session
from urllib.parse import urljoin, urlparse
import io
import logging
import pathlib
from django.conf import settings
import requests
import requests.exceptions
from blender_id_oauth_client.models import OAuthUserInfo, OAuthToken
from blender_id_oauth_client.views import blender_id_oauth_settings, ClientSettings
logger = logging.getLogger(__name__)
class BIDMissingAccessToken(Exception):
"""Raise when user accesss token is not found in DB."""
class BIDSession:
"""Wrap up interactions with Blender ID, such as fetching user info and avatar."""
_anonymous_session = None
_badger_api_session = None
def __init__(self):
"""Initialise Blender ID client settings."""
self.settings = blender_id_oauth_settings()
def _make_session(self, access_token: str = None) -> OAuth2Session:
"""Return a new OAuth2 session, optionally authenticated with an access token."""
if access_token:
return OAuth2Session(self.settings.client, token={'access_token': access_token})
return OAuth2Session(self.settings.client)
@property
def badger_oauth_settings(self) -> ClientSettings:
"""Container for badger API OAuth Client settings."""
return ClientSettings(
client=settings.BLENDER_ID['BADGER_API_OAUTH_CLIENT'],
secret=settings.BLENDER_ID['BADGER_API_OAUTH_SECRET'],
url_base=self.settings.url_base,
url_authorize=self.settings.url_authorize,
url_token=self.settings.url_token,
url_userinfo=self.settings.url_userinfo,
url_logout=self.settings.url_logout,
)
@property
def session(self):
"""
Return a reusable "anonymous" OAuth2Session for fetching avatars from Blender ID.
Create it the first time this property is accessed.
"""
if not self._anonymous_session:
self._anonymous_session = self._make_session()
return self._anonymous_session
@property
def badger_api_session(self):
"""
Return a reusable OAuth2Session for granting/revoking roles with Blender ID API.
Create it the first time this property is accessed.
"""
if not self._badger_api_session:
token = settings.BLENDER_ID['BADGER_API_ACCESS_TOKEN']
self._badger_api_session = OAuth2Session(
self.badger_oauth_settings.client, token={'access_token': token}
)
return self._badger_api_session
@classmethod
def get_oauth_user_info(cls, oauth_user_id: int) -> OAuthUserInfo:
"""Return OAuthUserInfo record for a given Blender ID.
Used primarily to look up our own user ID associated with an external Blender ID,
for example in the user modified webhook.
"""
return (
OAuthUserInfo.objects.select_related('user')
.prefetch_related('user__groups')
.get(oauth_user_id=oauth_user_id)
)
@classmethod
def get_oauth_token(cls, oauth_user_id: int) -> OAuthToken:
"""Return OAuthToken for a given ID to be used in authenticated requests to Blender ID."""
return OAuthToken.objects.filter(oauth_user_id=oauth_user_id).last()
def get_badges_url(self, oauth_user_id: int) -> str:
"""Return a Blender ID URL to the avatar for a given OAuth ID."""
return urljoin(self.settings.url_base, f'api/badges/{oauth_user_id}')
def get_check_user_by_email_url(self, email: str) -> str:
"""Return a Blender ID URL for checking existence of a record with a given email."""
return urljoin(self.settings.url_base, f'api/check-user/{email}')
def get_badger_api_url(self, action: str, role: str, oauth_user_id: int) -> str:
"""Return a Blender ID API URL for granting/revoking roles."""
assert action in ('grant', 'revoke'), f'{action} is not a known Blender ID API action'
assert role in (
'cloud_subscriber',
'cloud_has_subscription',
'sprite_fright',
'charge',
), f'{role} is not a known Blender ID badge'
return urljoin(self.settings.url_base, f'api/badger/{action}/{role}/{oauth_user_id}')
def download_avatar_url(self, avatar_url: str) -> Tuple[str, io.BytesIO]:
"""Download an avatar from a given URL."""
resp = self.session.get(avatar_url)
resp.raise_for_status()
name = pathlib.Path(urlparse(resp.url).path).name
return name, io.BytesIO(resp.content)
def get_badges(self, oauth_user_id: int) -> Dict[str, Any]:
"""Retrieve badges from Blender ID service using a user-specific OAuth2 session."""
token = self.get_oauth_token(oauth_user_id)
if not token:
raise BIDMissingAccessToken(f'No access token found for {oauth_user_id}')
session = self._make_session(access_token=token.access_token)
resp = session.get(self.get_badges_url(oauth_user_id))
resp.raise_for_status()
badges = resp.json().get('badges', {})
assert isinstance(badges, dict)
return badges
def copy_image_from_avatar_url(self, user, avatar_url):
"""Fetch Blender ID avatar and save it to our storage."""
try:
clear_existing_image = avatar_url is None
if user.image:
# Delete the previous file
user.image.delete(save=clear_existing_image)
if avatar_url:
name, content = self.download_avatar_url(avatar_url)
user.image.save(name, content, save=True)
logger.info('Profile image updated for pk=%s', user.pk)
except requests.HTTPError:
logger.warning('Failed to retrieve an image for pk=%s', user.pk)
except Exception:
logger.exception(f'Failed to copy an image for pk={user.pk}')
def copy_badges_from_blender_id(self, user):
"""
Attempt to retrieve badges from Blender ID and save them in the user record.
If either OAuth info or Blender ID service isn't available, log an error and return.
"""
if not hasattr(user, 'oauth_info'):
logger.warning(f'Cannot copy badges from Blender ID: {user} is missing OAuth info')
return
oauth_info = user.oauth_info
try:
badges = self.get_badges(oauth_info.oauth_user_id)
if badges:
user.badges = badges
user.save(update_fields=['badges'])
logger.info(f'Badges updated for {user}')
except requests.HTTPError:
logger.warning(f'Failed to retrieve badges of {user} from Blender ID')
except BIDMissingAccessToken:
logger.warning(f'Unable to retrieve badges for {user}: no access token')
except Exception:
logger.exception(f'Failed to copy badges for {user}')
def grant_revoke_role(self, user, action: str, role: str) -> None:
"""Grant or revoke a given role (badge) to/from a user with a given OAuth ID."""
if not hasattr(user, 'oauth_info'):
logger.warning('Cannot %s Blender ID %s: %s is missing OAuth info', action, role, user)
return
oauth_info = user.oauth_info
oauth_user_id = oauth_info.oauth_user_id
url = self.get_badger_api_url(action=action, role=role, oauth_user_id=oauth_user_id)
resp = self.badger_api_session.post(url)
resp.raise_for_status()
def check_user_by_email(self, email: str) -> Dict[str, str]:
"""Check if Blender ID with a given email exists."""
url = self.get_check_user_by_email_url(email=email)
resp = self.badger_api_session.get(url)
resp.raise_for_status()
return resp.json()