This repository has been archived on 2023-02-09. You can view files and clone it, but cannot push or open issues or pull requests.
Files
blender-benchmark-bundle/benchmark/submission/client.py
Sybren A. Stüvel 0eea078802 Include version in HTTP User-Agent header
This allows us to check on the server side whether the user is still using
an up-to-date benchmark client or not.
2018-08-14 17:20:13 +02:00

177 lines
6.0 KiB
Python

import functools
import json
import logging
import typing
import pathlib
import urllib.parse
import requests
from . import timeouts, exceptions
log = logging.getLogger(__name__)
class SubmissionResult:
    """Metadata of the submitted benchmark.

    :ivar benchmark_id: the ID string of the benchmark; always set.
    :ivar location: an URL where the benchmark can be viewed & managed; may be
        the empty string if not known.
    """

    def __init__(self, benchmark_id: str, location: str) -> None:
        """Store the benchmark metadata.

        :param benchmark_id: non-empty ID string of the benchmark.
        :param location: URL of the benchmark, or the empty string if unknown.
        :raises ValueError: when ``benchmark_id`` is empty.
        """
        # Explicit check rather than `assert`: assertions are stripped when
        # Python runs with -O, which would let an empty ID slip through.
        if not benchmark_id:
            raise ValueError('benchmark_id must be a non-empty string')

        self.benchmark_id = benchmark_id
        self.location = location

    def __repr__(self) -> str:
        return f'<SubmissionResult(benchmark_id={self.benchmark_id!r} location={self.location!r})>'
class BenchmarkClient:
    """Client that authenticates with MyData and submits benchmark results.

    The auth token is requested via the web browser, stored on disk as a small
    JSON document, and sent as ``Authorization: Bearer ...`` header.
    """

    def __init__(self, mydata_server: str) -> None:
        """Set up the HTTP session and the MyData endpoint URLs.

        :param mydata_server: base URL of the MyData server; the endpoint
            paths are joined onto it with :func:`urllib.parse.urljoin`.
        """
        from requests.adapters import HTTPAdapter
        from ..version import version

        self.auth_token = None
        self.auth_http_server = None
        # Per-instance cache for _token_storage_path().
        self._token_path: typing.Optional[pathlib.Path] = None

        self.session = requests.Session()
        self.session.mount('https://', HTTPAdapter(max_retries=5))
        # The version in the User-Agent lets the server see which client is used.
        self.session.headers['User-Agent'] = f'blender-benchmark-client/{version}'

        self.url_generate_token = urllib.parse.urljoin(mydata_server, 'token/generate')
        self.url_verify_token = urllib.parse.urljoin(mydata_server, 'token/verify')
        self.url_submit = urllib.parse.urljoin(mydata_server, 'benchmarks/submit')
        self.log = log.getChild('BenchmarkClient')

    def _token_storage_path(self) -> pathlib.Path:
        """Determine storage location for the auth token.

        The result is cached on the instance. A module-level ``lru_cache`` on a
        method would key on ``self`` and keep the instance alive.
        """
        if self._token_path is not None:
            return self._token_path

        from . import appdirs

        data_dir = appdirs.user_config_dir('blender-benchmark-client', 'Blender Foundation')
        token_path = pathlib.Path(data_dir) / 'token.json'
        self.log.debug('Tokens are stored in %s', token_path)

        self._token_path = token_path
        return token_path

    def _load_token(self) -> typing.Optional[str]:
        """Load the token, return None when non-existent or unreadable.

        :return: the value stored under the ``'token'`` key, or None when the
            file is missing, cannot be read, is not valid JSON, or does not
            have a ``'token'`` key.
        """
        tpath = self._token_storage_path()
        self.log.info('Loading token from %s', tpath)

        if not tpath.exists():
            self.log.info('Token file does not exist')
            return None

        try:
            with tpath.open('rb') as infile:
                tokeninfo = json.load(infile)
        except (IOError, OSError):
            self.log.exception('Error reading token file %s', tpath)
            return None
        except (json.JSONDecodeError, UnicodeDecodeError):
            # json.load() on a binary file decodes the bytes itself, so broken
            # encodings surface as UnicodeDecodeError, not JSONDecodeError.
            self.log.exception('Malformed token file %s', tpath)
            return None

        try:
            return tokeninfo['token']
        except (KeyError, TypeError):
            # Valid JSON, but not an object with a 'token' key.
            self.log.error('Malformed token file %s', tpath)
            return None

    def _save_token(self) -> None:
        """Save the token to disk.

        The token is stored as very simple JSON document, so that it's easy to
        extend later (for example with expiry information).
        """
        tpath = self._token_storage_path()
        self.log.info('Saving token to %s', tpath)

        tpath.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
        # NOTE(review): the file itself gets the default (umask) permissions;
        # only a newly created directory is restricted to the owner.
        with tpath.open('w') as outfile:
            json.dump({'token': self.auth_token}, outfile)

    def _start_http_server(self) -> None:
        """Starts the HTTP server, if it wasn't started already."""
        from . import auth

        if self.auth_http_server is not None:
            return
        self.auth_http_server = auth.TokenHTTPServer()

    def _stop_http_server(self) -> None:
        """Drops the reference to the HTTP server, if one was started."""
        if self.auth_http_server is None:
            return
        self.auth_http_server = None

    def _verify_token(self) -> bool:
        """Verify the token with MyData, return True iff still valid.

        A 200 or 204 response counts as valid; any other status as invalid.
        """
        self.log.debug('validating token at %s', self.url_verify_token)
        resp = self.session.get(self.url_verify_token,
                                headers={'Authorization': f'Bearer {self.auth_token}'},
                                timeout=timeouts.verify)
        token_ok = resp.status_code in {200, 204}
        if not token_ok:
            self.log.info('Client token is no longer valid, will obtain another one.')
        return token_ok

    def load_auth_token(self) -> typing.Optional[str]:
        """Load & verify the token, start browser to get new one if needed.

        :return: the auth token, or None when no token could be obtained.
        """
        self.auth_token = self._load_token()
        if self.auth_token and self._verify_token():
            return self.auth_token

        self.auth_via_browser()
        if not self.auth_token:
            self.log.error('Unable to get token')
            return None
        return self.auth_token

    def auth_via_browser(self) -> None:
        """Open the webbrowser to request an auth token.

        The received token is stored in ``self.auth_token`` and, when
        non-empty, saved to disk.

        :raises SystemError: when no browser could be opened.
        """
        import webbrowser

        self._start_http_server()
        try:
            params = urllib.parse.urlencode(
                {'auth_callback': self.auth_http_server.auth_callback_url})
            url = f"{self.url_generate_token}?{params}"

            if not webbrowser.open_new_tab(url):
                raise SystemError(f'Unable to open a browser to visit {url}')

            self.auth_token = self.auth_http_server.wait_for_token(
                timeout=timeouts.wait_for_token)
        finally:
            # Release the server even when the browser could not be opened or
            # waiting for the token failed.
            self._stop_http_server()

        if self.auth_token:
            self._save_token()

    def submit_benchmark(self, benchmark_data: dict) -> 'SubmissionResult':
        """Submit the benchmark to MyData.

        :param benchmark_data: the benchmark data; sent as the ``'data'`` field
            of the JSON payload.
        :return: Metadata of the benchmark.
        :raises exceptions.CommunicationError: when the server does not respond
            with status 201, or when the response body is not a JSON object
            containing ``'benchmark_id'``.
        """
        payload = {
            'data': benchmark_data,
            'schema_version': 0,
        }
        self.log.info('Submitting benchmark to %s:\n%s', self.url_submit,
                      json.dumps(payload, sort_keys=True, indent=4))

        resp = self.session.post(self.url_submit,
                                 json=payload,
                                 headers={'Authorization': f'Bearer {self.auth_token}'},
                                 timeout=timeouts.submit)
        if resp.status_code != 201:
            self.log.debug('Bad status code %d received: %s', resp.status_code, resp.text)
            raise exceptions.CommunicationError('Bad status code received', resp)

        try:
            benchmark_id = resp.json()['benchmark_id']
        except (ValueError, KeyError, TypeError) as ex:
            # Report an unparseable or incomplete body the same way as a bad
            # status code, instead of leaking a low-level exception.
            self.log.debug('Malformed response received: %s', resp.text)
            raise exceptions.CommunicationError('Malformed response received', resp) from ex

        return SubmissionResult(
            benchmark_id,
            resp.headers.get('Location') or ''
        )