Run submission in a separate thread, and more explicit state

Also more explicit timeouts and an overall better handling of errors.
This commit is contained in:
2018-08-14 16:36:43 +02:00
parent 5d86b87f40
commit b64577df2e
8 changed files with 191 additions and 66 deletions

View File

@@ -346,13 +346,22 @@ class BENCHMARK_PT_main(Panel):
sub.separator()
sub = col.row()
sub.enabled = not G.results_submitted
sub.enabled = G.state != G.State.submitting
sub.scale_y = 2.25
if G.submission_exception:
text = "Retry Submission"
text = "SHARE ONLINE"
if G.results_submitted and G.state != G.State.submitting:
if G.results_url:
# If we have a results URL, open it upon clicking the button
sub.operator("wm.url_open", text="Shared!").url = G.results_url
else:
sub.enabled = False
sub.operator("benchmark.share", text=text)
else:
text = "SHARE ONLINE"
sub.operator("benchmark.share", text=text)
if G.state == G.State.submitting:
text = "Submitting..."
elif G.submission_exception:
text = "Retry Submission"
sub.operator("benchmark.share", text=text)
sub = col.row()
subsub = sub.split()
@@ -368,18 +377,17 @@ class BENCHMARK_PT_main(Panel):
split.label()
def draw(self, context):
screen_index = 0
with G.progress_lock:
if G.result_dict:
screen_index = 2
elif G.result_stats or G.progress_status:
screen_index = 1
state = G.state
if screen_index == 0:
self.draw_welcome(context)
elif screen_index == 2:
self.draw_submit(context)
draw_funcs = {
G.State.welcome: self.draw_welcome,
G.State.complete: self.draw_submit,
G.State.submitting: self.draw_submit,
}
func = draw_funcs.get(state, None)
if func:
func(context)
################################################################################
@@ -488,8 +496,10 @@ class BENCHMARK_OT_run_base(bpy.types.Operator):
else:
G.result_stats += "{}: {}".format(name_stat['name'],
stat["result"])
G.state = G.State.complete
else:
G.result_stats = ""
G.state = G.State.welcome
# TOGO(sergey): Use some more nice picture for the final slide.
G.background_image_path = ""
# Tag for nice redraw
@@ -513,9 +523,11 @@ class BENCHMARK_OT_run_base(bpy.types.Operator):
return {'PASS_THROUGH'}
def invoke(self, context, event):
G.cancel = False
G.result_platform = ""
G.progress_status = "Initializing..."
with G.progress_lock:
G.cancel = False
G.result_platform = ""
G.progress_status = "Initializing..."
G.state = G.State.running
context.area.tag_redraw()
compute_device = context.scene.compute_device
@@ -630,30 +642,44 @@ class BENCHMARK_OT_share(bpy.types.Operator):
bl_idname = "benchmark.share"
bl_label = "Share Benchmark Result"
def execute(self, context):
timer = None
thread = None
def modal(self, context, event):
if event.type == 'TIMER':
if self.thread.is_alive():
context.area.tag_redraw()
return {'PASS_THROUGH'}
else:
self.done(context)
return {'FINISHED'}
return {'PASS_THROUGH'}
def invoke(self, context, event):
from benchmark import submission
make_buttons_default()
print('Submitting benchmark')
G.submission_exception = None
try:
submission.submit_benchmark(G.result_dict)
except submission.CommunicationError as cex:
logger.ERROR(f'Error {cex.status_code} submitting benchmark: {cex.message}')
if cex.json:
logger.ERROR(f'Response JSON: {cex.json}')
else:
logger.ERROR(f'Response body: {cex.body}')
G.submission_exception = cex
return {'CANCELLED'}
except Exception as ex:
logger.ERROR(f'error submitting benchmark: {ex}')
G.submission_exception = ex
return {'CANCELLED'}
print('Submission done')
self.thread = submission.submit_benchmark_bgthread(G.result_dict)
# Create timer to query thread status
wm = context.window_manager
self.timer = wm.event_timer_add(0.1, context.window)
# Register self as modal.
context.window_manager.modal_handler_add(self)
return {'RUNNING_MODAL'}
def done(self, context):
make_buttons_green()
G.results_submitted = True
return {'FINISHED'}
if self.timer:
wm = context.window_manager
wm.event_timer_remove(self.timer)
if self.thread:
self.thread.join()
context.area.tag_redraw()
class BENCHMARK_OT_opendata_link(bpy.types.Operator):

View File

@@ -5,7 +5,7 @@ import blf
import bpy
from ..foundation import util
from ..submission.client import CommunicationError
from ..submission import exceptions
from . import G
@@ -231,8 +231,16 @@ def _after_submission_text() -> str:
return f'Unable to connect to the Open Data platform. ' \
f'Please check your internet connection and try again.'
if isinstance(ex, requests.exceptions.Timeout):
return f'There was a timeout communicating with the Open Data platform. ' \
f'Please check your internet connection and try again.'
if isinstance(ex, exceptions.TokenTimeoutError):
return f'There was a timeout waiting for a Client Authentication token. ' \
f'This is fine, just try again.'
# If not our own exception class, show generic message.
if not isinstance(ex, CommunicationError):
if not isinstance(ex, exceptions.CommunicationError):
return f'Error submitting your results: {ex}'
# Return proper message based on the HTTP status code of the response.

View File

@@ -1,9 +1,18 @@
import enum
import threading
import typing
class G:
"""Global state of the Benchmark Client."""
class State(enum.Enum):
welcome = 1
running = 2
complete = 3
submitting = 4
state = State.welcome
result_platform = ''
progress_status = ''
result_stats = ''
@@ -15,6 +24,7 @@ class G:
cached_system_info = {}
cached_compute_devices = []
results_submitted = False
results_url = ''
images = {}
current_progress = 0.0
@@ -25,6 +35,7 @@ class G:
@classmethod
def reset(cls):
"""Reset the global state."""
cls.state = G.State.welcome
cls.result_platform = ''
cls.progress_status = ''
cls.result_stats = ''
@@ -32,4 +43,5 @@ class G:
cls.background_image_path = ""
cls.scene_status = {}
cls.results_submitted = False
cls.results_url = ''
cls.submission_exception = None

View File

@@ -1,4 +1,58 @@
from .client import CommunicationError
import logging
import threading
from ..space import G
from . import exceptions
log = logging.getLogger(__name__)
def submit_benchmark_bgthread(benchmark_data: dict) -> threading.Thread:
"""Submit benchmark data in a background thread.
This will update G.xxx to reflect the state of the submission.
"""
thread = threading.Thread(target=_submit_and_update_g, args=(benchmark_data,))
thread.start()
return thread
def _submit_and_update_g(benchmark_data: dict) -> None:
with G.progress_lock:
G.submission_exception = None
G.state = G.State.submitting
def set_exception(exception):
with G.progress_lock:
G.state = G.State.complete
G.submission_exception = exception
try:
submit_benchmark(benchmark_data)
except exceptions.CommunicationError as ex:
log.error('Error %d submitting benchmark: %s', ex.status_code, ex.message)
if ex.json:
log.error('Response JSON:', ex.json)
else:
log.error('Response body: %s', ex.body)
set_exception(ex)
return
except exceptions.TokenTimeoutError as ex:
log.warning('Timeout waiting for a client token. Just try submitting again.')
set_exception(ex)
return
except Exception as ex:
log.error('error submitting benchmark: %s', ex)
set_exception(ex)
return
with G.progress_lock:
G.state = G.State.complete
G.results_submitted = True
def submit_benchmark(benchmark_data: dict):
@@ -22,11 +76,16 @@ def submit_benchmark(benchmark_data: dict):
bc = BenchmarkClient(mydata_url)
# Make sure we have a token; can start the browser to get one.
bc.load_auth_token()
token = bc.load_auth_token()
if not token:
raise exceptions.TokenTimeoutError()
result = bc.submit_benchmark(benchmark_data)
print(result)
# If we get a location from the MyData server, show it in a browser.
if result.location:
with G.progress_lock:
G.results_url = result.location
import webbrowser
webbrowser.open_new_tab(result.location)

View File

@@ -50,7 +50,7 @@ class TokenHTTPServer(http.server.HTTPServer):
self.log.debug('Finding free port starting at %s', local_addr)
return sockutil.find_free_port(local_addr)
def wait_for_token(self, timeout=None):
def wait_for_token(self, timeout: float):
"""Starts the HTTP server, waits for the Token."""
if self.auth_token is None:

View File

@@ -7,27 +7,11 @@ import urllib.parse
import requests
from . import timeouts, exceptions
log = logging.getLogger(__name__)
class CommunicationError(requests.exceptions.BaseHTTPError):
"""Raised when we get an invalid status code form the MyData server."""
def __init__(self, message: str, response: requests.Response):
self.message = message
self.status_code = response.status_code
self.body = response.text
if response.headers.get('Content-Type', '') == 'application/json':
self.json = response.json()
else:
self.json = None
def __str__(self):
return f'{self.message}; ' \
f'status_code={self.status_code}; json={self.json}; body={self.body}'
class SubmissionResult:
"""Metadata of the submitted benchmark.
@@ -46,7 +30,6 @@ class SubmissionResult:
class BenchmarkClient:
default_timeout = 30 # seconds
def __init__(self, mydata_server: str) -> None:
from requests.adapters import HTTPAdapter
@@ -126,7 +109,7 @@ class BenchmarkClient:
log.debug('validating token at %s', self.url_verify_token)
resp = self.session.get(self.url_verify_token,
headers={'Authorization': f'Bearer {self.auth_token}'},
timeout=self.default_timeout)
timeout=timeouts.verify)
token_ok = resp.status_code in {200, 204}
if not token_ok:
log.info('Client token is no longer valid, will obtain another one.')
@@ -158,7 +141,7 @@ class BenchmarkClient:
if not webbrowser.open_new_tab(url):
raise SystemError(f'Unable to open a browser to visit {url}')
self.auth_token = self.auth_http_server.wait_for_token()
self.auth_token = self.auth_http_server.wait_for_token(timeout=timeouts.wait_for_token)
self._stop_http_server()
if self.auth_token:
@@ -179,10 +162,10 @@ class BenchmarkClient:
resp = self.session.post(self.url_submit,
json=payload,
headers={'Authorization': f'Bearer {self.auth_token}'},
timeout=self.default_timeout)
timeout=timeouts.submit)
if resp.status_code != 201:
log.error('Bad status code %d received: %s', resp.status_code, resp.text)
raise CommunicationError(f'Bad status code received', resp)
raise exceptions.CommunicationError(f'Bad status code received', resp)
result = resp.json()
return SubmissionResult(

View File

@@ -0,0 +1,23 @@
import requests
class CommunicationError(requests.exceptions.BaseHTTPError):
"""Raised when we get an invalid status code form the MyData server."""
def __init__(self, message: str, response: requests.Response):
self.message = message
self.status_code = response.status_code
self.body = response.text
if response.headers.get('Content-Type', '') == 'application/json':
self.json = response.json()
else:
self.json = None
def __str__(self):
return f'{self.message}; ' \
f'status_code={self.status_code}; json={self.json}; body={self.body}'
class TokenTimeoutError(Exception):
"""Raised when there was a timeout waiting for a client token."""

View File

@@ -0,0 +1,14 @@
"""Timeouts for HTTP traffic, all in seconds."""
submit = 30
verify = 10
# This one is tricky, as the user may need to take the time to register a new
# Blender ID (which includes confirmation of their email address). We should
# not show too scary messages when it comes to timeout errors when waiting for
# a token, and just expect it to time out when a new Blender ID account is
# registered.
#
# It should be long enough for a normal flow, though, as after this timeout
# the temp HTTP server on localhost:$RANDOM is down again.
wait_for_token = 15