blender-studio/users/blender_id.py

248 lines
10 KiB
Python

"""Implement integration with Blender ID: handling of OAuth2 session, fetching user info etc."""
import io
import logging
import pathlib
from typing import Any, Dict, Optional, Tuple
from urllib.parse import quote, urljoin, urlparse

from django.conf import settings
from requests_oauthlib import OAuth2Session
import botocore
import botocore.exceptions
import requests
import requests.exceptions

from blender_id_oauth_client.models import OAuthUserInfo, OAuthToken
from blender_id_oauth_client.views import blender_id_oauth_settings, ClientSettings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class BIDMissingAccessToken(Exception):
    """Raise when a user's access token is not found in the DB.

    Raised by ``BIDSession.get_user_info`` and ``BIDSession.get_badges`` when no
    ``OAuthToken`` is stored for the requested OAuth user ID.
    """
class BIDSession:
    """Wrap up interactions with Blender ID, such as fetching user info and avatar."""

    # Lazily created sessions, see the `session` and `badger_api_session` properties.
    _anonymous_session = None
    _badger_api_session = None

    # Actions and roles (badges) accepted by `get_badger_api_url`.
    _BADGER_ACTIONS = ('grant', 'revoke')
    _BADGER_ROLES = (
        'cloud_subscriber',
        'cloud_has_subscription',
        'sprite_fright',
        'charge',
    )

    def __init__(self):
        """Initialise Blender ID client settings."""
        self.settings = blender_id_oauth_settings()

    def _make_session(self, access_token: Optional[str] = None) -> OAuth2Session:
        """Return a new OAuth2 session, optionally authenticated with an access token."""
        if access_token:
            return OAuth2Session(self.settings.client, token={'access_token': access_token})
        return OAuth2Session(self.settings.client)

    @property
    def badger_oauth_settings(self) -> ClientSettings:
        """Container for badger API OAuth Client settings.

        Client ID and secret come from ``settings.BLENDER_ID``,
        all URLs are shared with the regular Blender ID client settings.
        """
        return ClientSettings(
            client=settings.BLENDER_ID['BADGER_API_OAUTH_CLIENT'],
            secret=settings.BLENDER_ID['BADGER_API_OAUTH_SECRET'],
            url_base=self.settings.url_base,
            url_authorize=self.settings.url_authorize,
            url_token=self.settings.url_token,
            url_userinfo=self.settings.url_userinfo,
            url_logout=self.settings.url_logout,
        )

    @property
    def session(self):
        """
        Return a reusable "anonymous" OAuth2Session for fetching avatars from Blender ID.

        Create it the first time this property is accessed.
        """
        if not self._anonymous_session:
            self._anonymous_session = self._make_session()
        return self._anonymous_session

    @property
    def badger_api_session(self):
        """
        Return a reusable OAuth2Session for granting/revoking roles with Blender ID API.

        Create it the first time this property is accessed.
        The session is authenticated with ``settings.BLENDER_ID['BADGER_API_ACCESS_TOKEN']``.
        """
        if not self._badger_api_session:
            token = settings.BLENDER_ID['BADGER_API_ACCESS_TOKEN']
            self._badger_api_session = OAuth2Session(
                self.badger_oauth_settings.client, token={'access_token': token}
            )
        return self._badger_api_session

    @classmethod
    def get_oauth_user_info(cls, oauth_user_id: int) -> OAuthUserInfo:
        """Return OAuthUserInfo record for a given Blender ID.

        Used primarily to look up our own user ID associated with an external Blender ID,
        for example in the user modified webhook.

        The related user is selected and its groups are prefetched along with the record.
        The lookup uses ``QuerySet.get()``, so ``OAuthUserInfo.DoesNotExist`` is raised
        when there is no record for ``oauth_user_id``.
        """
        return (
            OAuthUserInfo.objects.select_related('user')
            .prefetch_related('user__groups')
            .get(oauth_user_id=oauth_user_id)
        )

    @classmethod
    def get_oauth_token(cls, oauth_user_id: int) -> Optional[OAuthToken]:
        """Return OAuthToken for a given ID to be used in authenticated requests to Blender ID.

        Return the last matching token, or None if no token is stored for this ID.
        """
        return OAuthToken.objects.filter(oauth_user_id=oauth_user_id).last()

    def get_user_info(self, oauth_user_id: int) -> Dict[str, Any]:
        """Retrieve user info from Blender ID service using a user-specific OAuth2 session.

        User info is returned in the following format:
        {
            "id": 2,
            "full_name": "Jane Doe",
            "email": "jane@example.com",
            "nickname": "janedoe",
            "roles": {"dev_core": True},
        }

        Raise BIDMissingAccessToken if no access token is stored for `oauth_user_id`,
        and requests.exceptions.HTTPError if Blender ID responds with an error status.
        """
        token = self.get_oauth_token(oauth_user_id)
        if not token:
            raise BIDMissingAccessToken(f'No access token found for {oauth_user_id}')

        session = self._make_session(access_token=token.access_token)
        resp = session.get(self.settings.url_userinfo)
        resp.raise_for_status()
        payload = resp.json()
        assert isinstance(payload, dict)
        return payload

    def get_avatar_url(self, oauth_user_id: int) -> str:
        """Return a Blender ID URL to the avatar for a given OAuth ID."""
        return urljoin(self.settings.url_base, f'api/user/{oauth_user_id}/avatar')

    def get_badges_url(self, oauth_user_id: int) -> str:
        """Return a Blender ID URL to the badges for a given OAuth ID."""
        return urljoin(self.settings.url_base, f'api/badges/{oauth_user_id}')

    def get_check_user_by_email_url(self, email: str) -> str:
        """Return a Blender ID URL for checking existence of a record with a given email.

        The email is percent-encoded so that characters such as "/", "?" or "#"
        cannot alter the structure of the URL. "@" and "+" are valid in a path
        segment and are kept as is, so URLs for ordinary addresses are unchanged.
        """
        escaped_email = quote(email, safe='@+')
        return urljoin(self.settings.url_base, f'api/check-user/{escaped_email}')

    def get_badger_api_url(self, action: str, role: str, oauth_user_id: int) -> str:
        """Return a Blender ID API URL for granting/revoking roles.

        Raise AssertionError if `action` or `role` is not one of the known values.
        """
        # Raised explicitly instead of using `assert`, so that the validation
        # is not stripped when Python runs with -O. The exception type is kept.
        if action not in self._BADGER_ACTIONS:
            raise AssertionError(f'{action} is not a known Blender ID API action')
        if role not in self._BADGER_ROLES:
            raise AssertionError(f'{role} is not a known Blender ID badge')
        return urljoin(self.settings.url_base, f'api/badger/{action}/{role}/{oauth_user_id}')

    def get_avatar(self, oauth_user_id: int) -> Tuple[str, io.BytesIO]:
        """Retrieve an avatar from Blender ID service using an OAuth2 session.

        Return file name and content of an avatar for the given 'oauth_user_id'.
        The file name is the last path component of the response URL.

        Raise requests.exceptions.HTTPError if Blender ID responds with an error status.
        """
        resp = self.session.get(self.get_avatar_url(oauth_user_id))
        resp.raise_for_status()
        name = pathlib.Path(urlparse(resp.url).path).name
        return name, io.BytesIO(resp.content)

    def get_badges(self, oauth_user_id: int) -> Dict[str, Any]:
        """Retrieve badges from Blender ID service using a user-specific OAuth2 session.

        Return the value of "badges" in the response, or an empty dict if it is missing.

        Raise BIDMissingAccessToken if no access token is stored for `oauth_user_id`,
        and requests.exceptions.HTTPError if Blender ID responds with an error status.
        """
        token = self.get_oauth_token(oauth_user_id)
        if not token:
            raise BIDMissingAccessToken(f'No access token found for {oauth_user_id}')

        session = self._make_session(access_token=token.access_token)
        resp = session.get(self.get_badges_url(oauth_user_id))
        resp.raise_for_status()
        badges = resp.json().get('badges', {})
        assert isinstance(badges, dict)
        return badges

    def copy_avatar_from_blender_id(self, user):
        """
        Attempt to retrieve an avatar from Blender ID and save it into our storage.

        If either OAuth info or Blender ID service isn't available, log an error and return.
        This is best-effort: all exceptions are logged and none is propagated.
        """
        if not hasattr(user, 'oauth_info'):
            logger.warning('Cannot copy avatar from Blender ID: %s is missing OAuth info', user)
            return

        oauth_info = user.oauth_info
        try:
            name, content = self.get_avatar(oauth_info.oauth_user_id)
            if user.image:
                # Delete the previous file
                user.image.delete(save=False)
            user.image.save(name, content, save=True)
            logger.info('Profile image updated for %s', user)
        except requests.exceptions.HTTPError:
            logger.warning('Failed to retrieve an image for %s from Blender ID', user)
        except botocore.exceptions.BotoCoreError:
            logger.exception('Failed to store an image for %s', user)
        except Exception:
            logger.exception('Failed to copy an image for %s', user)

    def update_username(self, user, oauth_user_id):
        """Update username of a given user, fetching it from Blender ID.

        FIXME(anna): webhook payload doesn't include username, hence this separate method.

        This is best-effort: all exceptions are logged and none is propagated.
        """
        try:
            user_info = self.get_user_info(oauth_user_id)
            if user_info['nickname'] != user.username:
                # TODO(anna) handle duplicate usernames
                user.username = user_info['nickname']
                user.save(update_fields=['username'])
        except BIDMissingAccessToken:
            logger.warning('Unable to retrieve username for %s: no access token', user)
        except requests.exceptions.HTTPError:
            logger.warning('Unable to update username for %s: HTTPError', user)
        except Exception:
            logger.exception('Unable to update username for %s', user)

    def copy_badges_from_blender_id(self, user):
        """
        Attempt to retrieve badges from Blender ID and save them in the user record.

        If either OAuth info or Blender ID service isn't available, log an error and return.
        This is best-effort: all exceptions are logged and none is propagated.
        """
        if not hasattr(user, 'oauth_info'):
            logger.warning('Cannot copy badges from Blender ID: %s is missing OAuth info', user)
            return

        oauth_info = user.oauth_info
        try:
            badges = self.get_badges(oauth_info.oauth_user_id)
            # NOTE(review): an empty result leaves the stored badges untouched,
            # confirm this is intended for users who no longer have any badges.
            if badges:
                user.badges = badges
                user.save(update_fields=['badges'])
                logger.info('Badges updated for %s', user)
        except requests.exceptions.HTTPError:
            logger.warning('Failed to retrieve badges of %s from Blender ID', user)
        except BIDMissingAccessToken:
            logger.warning('Unable to retrieve badges for %s: no access token', user)
        except Exception:
            logger.exception('Failed to copy badges for %s', user)

    def grant_revoke_role(self, user, action: str, role: str) -> None:
        """Grant or revoke a given role (badge) to/from a user with a given OAuth ID.

        Log a warning and return if the user has no OAuth info.

        Raise AssertionError if `action` or `role` is not a known value,
        and requests.exceptions.HTTPError if Blender ID responds with an error status.
        """
        if not hasattr(user, 'oauth_info'):
            logger.warning('Cannot %s Blender ID %s: %s is missing OAuth info', action, role, user)
            return

        oauth_info = user.oauth_info
        oauth_user_id = oauth_info.oauth_user_id
        url = self.get_badger_api_url(action=action, role=role, oauth_user_id=oauth_user_id)
        resp = self.badger_api_session.post(url)
        resp.raise_for_status()

    def check_user_by_email(self, email: str) -> Dict[str, str]:
        """Check if Blender ID with a given email exists.

        Return the decoded JSON body of the response.

        Raise requests.exceptions.HTTPError if Blender ID responds with an error status.
        """
        url = self.get_check_user_by_email_url(email=email)
        resp = self.badger_api_session.get(url)
        resp.raise_for_status()
        return resp.json()