blender-addons/blender_id/communication.py
2023-07-05 09:41:03 +02:00

294 lines
9.8 KiB
Python

# SPDX-License-Identifier: GPL-2.0-or-later
import functools
import logging
import typing
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Default base URL of the Blender ID service. Can be overridden by setting
# the environment variable BLENDER_ID_ENDPOINT.
BLENDER_ID_ENDPOINT = 'https://id.blender.org/'
# Shared session used for all requests; stays None until the first request
# to Blender ID, at which point it becomes a requests.Session.
requests_session = None
# Timeout passed to every request made by this module, in seconds.
REQUESTS_TIMEOUT = 5.0
class BlenderIdCommError(RuntimeError):
    """Signals that communicating with the Blender ID service failed."""
class AuthResult:
    """Outcome of an authentication attempt against Blender ID.

    Attributes:
        success: True when the user was authenticated.
        user_id: the user's ID as a string, or None when not given.
        token: the access token, or None when not given.
        expires: the expiry of the token, or None when not given.
        error_message: textual description of the failure; an empty
            string when no error was passed.
    """

    def __init__(self, *, success: bool,
                 user_id: typing.Optional[str] = None,
                 token: typing.Optional[str] = None,
                 expires: typing.Optional[str] = None,
                 error_message: typing.Any = None):  # when success=False
        self.success = success
        self.user_id = user_id
        self.token = token
        # str(None) would yield the literal text 'None', which is truthy and
        # reads like a real message; use an empty string when there is no
        # error so the attribute stays a str but tests as "no error".
        self.error_message = '' if error_message is None else str(error_message)
        self.expires = expires
@functools.lru_cache(maxsize=None)
def host_label():
    """Return a label describing this Blender instance by its host name.

    The result is cached, so the host name is only looked up on the
    first call.
    """
    from socket import gethostname

    hostname = gethostname()
    return 'Blender running on %r' % hostname
def blender_id_session():
    """Return the shared Requests session, creating it on first use.

    A newly created session retries failed requests with a small backoff
    and sends a User-Agent header that mentions the Blender version and
    the version of this add-on.
    """
    global requests_session

    import requests.adapters

    # Fast path: the session has already been set up by an earlier call.
    if requests_session is not None:
        return requests_session

    # Store the session in the module global right away, then configure it.
    requests_session = session = requests.session()

    # Retry with backoff factor, so that a restart of Blender ID or a hiccup
    # in the connection doesn't immediately fail the request.
    retry_policy = requests.packages.urllib3.util.retry.Retry(
        total=5,
        backoff_factor=0.05,
    )
    adapter = requests.adapters.HTTPAdapter(max_retries=retry_policy)
    for scheme in ('https://', 'http://'):
        session.mount(scheme, adapter)

    # Construct the User-Agent header with Blender and add-on versions.
    # Blender's version is reported as 'unknown' when bpy is not importable.
    try:
        import bpy
    except ImportError:
        blender_version = 'unknown'
    else:
        blender_version = '.'.join(map(str, bpy.app.version))

    from blender_id import bl_info

    addon_version = '.'.join(map(str, bl_info['version']))
    session.headers['User-Agent'] = f'Blender/{blender_version} Blender-ID-Addon/{addon_version}'

    return session
@functools.lru_cache(maxsize=None)
def blender_id_endpoint(endpoint_path=None):
    """Return the URL of the Blender ID API, optionally joined with a path.

    The production address in BLENDER_ID_ENDPOINT is used, unless the
    environment variable of the same name is set to a non-empty value, in
    which case that value is used as base URL instead. Results are cached
    per endpoint_path.
    """
    import os
    from urllib.parse import urljoin

    override = os.environ.get('BLENDER_ID_ENDPOINT')
    if not override:
        log.info('Using standard Blender ID url %s', BLENDER_ID_ENDPOINT)
        # urljoin() is None-safe for the 2nd parameter.
        return urljoin(BLENDER_ID_ENDPOINT, endpoint_path)

    log.warning('Using overridden Blender ID url %s', override)
    # urljoin() is None-safe for the 2nd parameter.
    return urljoin(override, endpoint_path)
def blender_id_server_authenticate(username, password) -> AuthResult:
    """Authenticate the user with the server in a single transaction.

    The username and password are POSTed to the 'u/identify' endpoint,
    together with a label identifying this host.

    @param username: the user name to authenticate with.
    @param password: the password belonging to that user name.
    @returns: an AuthResult. On success it has success=True and carries the
        user's ID, an access token and the token's expiry. On failure it
        has success=False and an error message; problems may be with the
        connection, the server's response, or a wrong user/password.
    """
    import requests.exceptions

    payload = dict(
        username=username,
        password=password,
        host_label=host_label(),
    )

    url = blender_id_endpoint('u/identify')
    session = blender_id_session()
    try:
        r = session.post(url, data=payload, verify=True, timeout=REQUESTS_TIMEOUT)
    except requests.exceptions.RequestException as e:
        # RequestException is the base class of all Requests errors. Catching
        # it also covers timeouts, which would otherwise escape this function
        # as an uncaught exception instead of a failed AuthResult.
        msg = 'Exception POSTing to {}: {}'.format(url, e)
        log.warning(msg)
        return AuthResult(success=False, error_message=msg)

    if r.status_code == 200:
        try:
            resp = r.json()
        except ValueError:
            # A 200 response that is not JSON (for example an HTML page
            # served by an intermediate proxy) cannot be interpreted.
            log.warning('Response from %s is not valid JSON', url)
            return AuthResult(success=False,
                              error_message='Received an invalid response from the server.')

        status = resp['status']
        if status == 'success':
            data = resp['data']
            oauth_token = data['oauth_token']
            return AuthResult(success=True,
                              user_id=str(data['user_id']),
                              token=oauth_token['access_token'],
                              expires=oauth_token['expires'],
                              )
        if status == 'fail':
            return AuthResult(success=False, error_message='Username and/or password is incorrect')

    # Either a non-200 status code, or a 200 with an unrecognised status.
    return AuthResult(success=False,
                      error_message='There was a problem communicating with'
                                    ' the server. Error code is: %s' % r.status_code)
def blender_id_server_validate(token) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Ask the server whether the given authentication token is valid.

    @param token: the authentication token
    @type token: str
    @returns: tuple (expiry, error).
        The expiry is the 'token_expires' value of the server's response
        when the token is valid, else None.
        The error is None if the token is valid, or an error message when
        it is invalid or the server could not be reached.
    """
    import requests.exceptions

    validate_url = blender_id_endpoint('u/validate_token')
    http = blender_id_session()
    form = {'token': token}

    try:
        reply = http.post(validate_url, data=form, verify=True, timeout=REQUESTS_TIMEOUT)
    except requests.exceptions.ConnectionError:
        log.exception('error connecting to Blender ID at %s', validate_url)
        return None, 'Unable to connect to Blender ID'
    except requests.exceptions.RequestException as ex:
        log.exception('error validating token at %s', validate_url)
        return None, str(ex)

    # Only a 200 response means the token was accepted.
    if reply.status_code == 200:
        return reply.json()['token_expires'], None
    return None, 'Authentication token invalid'
def blender_id_server_logout(user_id, token):
    """Logs out of the Blender ID service by removing the token server-side.

    @param user_id: identifies the user; sent as 'user_id' to the server.
        NOTE(review): earlier documentation called this the user's email
        address -- confirm against the callers.
    @type user_id: str
    @param token: the token to remove
    @type token: str
    @return: {'status': 'fail' or 'success', 'error_message': str}; the
        error message is None when the server answered with HTTP 200, in
        which case the status is taken from the server's response.
    @rtype: dict
    """
    import requests.exceptions

    payload = dict(
        user_id=user_id,
        token=token,
    )
    session = blender_id_session()
    try:
        r = session.post(blender_id_endpoint('u/delete_token'),
                         data=payload, verify=True, timeout=REQUESTS_TIMEOUT)
    except requests.exceptions.RequestException as e:
        # RequestException is the base class of all Requests errors, so
        # timeouts are reported as a failed logout too, rather than being
        # raised to the caller.
        return dict(
            status='fail',
            error_message='There was a problem setting up a connection to '
                          'the server. Error type is: %s' % type(e).__name__
        )

    if r.status_code != 200:
        return dict(
            status='fail',
            error_message='There was a problem communicating with'
                          ' the server. Error code is: %s' % r.status_code
        )

    resp = r.json()
    return dict(
        status=resp['status'],
        error_message=None
    )
def subclient_create_token(auth_token: str, subclient_id: str) -> dict:
    """Request a subclient-specific authentication token from Blender ID.

    :returns: the 'data' member of the server's response. It is documented
        as a {'scst': 'token', 'expiry': ...} dict holding the token along
        with its expiry timestamp.
        NOTE(review): the value is parsed from JSON, so the exact type of
        'expiry' should be confirmed.
    :raises BlenderIdCommError: when the login is not valid (HTTP 401), when
        any status code other than 201 is received, or when the response's
        status is not 'success'.
    """
    request_data = dict(subclient_id=subclient_id, host_label=host_label())

    response = make_authenticated_call('POST', 'subclients/create_token',
                                       auth_token, request_data)
    http_status = response.status_code

    if http_status == 401:
        raise BlenderIdCommError('Your Blender ID login is not valid, try logging in again.')
    if http_status != 201:
        raise BlenderIdCommError('Invalid response, HTTP code %i received' % http_status)

    body = response.json()
    if body['status'] == 'success':
        return body['data']
    raise BlenderIdCommError(body['message'])
def make_authenticated_call(method, url, auth_token, data):
    """Makes a HTTP call authenticated with the OAuth token.

    :param method: the HTTP method, for example 'POST'.
    :param url: path of the endpoint, resolved against the Blender ID base
        URL via blender_id_endpoint().
    :param auth_token: the OAuth token, sent as 'Bearer' token in the
        Authorization header.
    :param data: the payload to send in the body of the request.
    :returns: the response as returned by the Requests session; the status
        code is not checked here.
    :raises BlenderIdCommError: when the request could not be completed.
    """
    import requests.exceptions

    session = blender_id_session()
    try:
        r = session.request(method,
                            blender_id_endpoint(url),
                            data=data,
                            headers={'Authorization': 'Bearer %s' % auth_token},
                            verify=True,
                            timeout=REQUESTS_TIMEOUT)
    except requests.exceptions.RequestException as e:
        # RequestException is the base class of all Requests errors, so
        # timeouts are also reported as BlenderIdCommError instead of
        # leaking a Requests-specific exception to the caller.
        raise BlenderIdCommError(str(e)) from e

    return r
def send_token_to_subclient(webservice_endpoint: str, user_id: str,
                            subclient_token: str, subclient_id: str) -> str:
    """Sends the subclient-specific token to the subclient.

    The subclient verifies this token with BlenderID. If it's accepted, the
    subclient ensures there is a valid user created server-side. The ID of
    that user is returned.

    :param webservice_endpoint: base URL of the subclient's web service;
        the token is POSTed to 'blender_id/store_scst' relative to it.
    :param user_id: sent as 'user_id' to the subclient.
    :param subclient_token: the subclient-specific token to send.
    :param subclient_id: sent as 'subclient_id' to the subclient.
    :returns: the user ID at the subclient.
    :raises BlenderIdCommError: when the request could not be completed,
        the subclient answered with an HTTP error status, or the status in
        the response is not 'success'.
    """
    import requests.exceptions
    import urllib.parse

    url = urllib.parse.urljoin(webservice_endpoint, 'blender_id/store_scst')
    session = blender_id_session()
    try:
        r = session.post(url,
                         data={'user_id': user_id,
                               'subclient_id': subclient_id,
                               'token': subclient_token},
                         verify=True,
                         timeout=REQUESTS_TIMEOUT)
        # Turns 4xx/5xx responses into an HTTPError, handled below.
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        # RequestException is the base class of all Requests errors (HTTPError
        # included), so timeouts are reported as BlenderIdCommError as well.
        raise BlenderIdCommError(str(e)) from e

    resp = r.json()
    if resp['status'] != 'success':
        raise BlenderIdCommError('Error sending subclient-specific token to %s, error is: %s'
                                 % (webservice_endpoint, resp))

    return resp['subclient_user_id']