mirror of https://github.com/nucypher/nucypher.git
Add special case for WorkerPoolExceptions in WebController to handle response message - ensure WorkerPoolExceptions provide access to relevant data.
parent
8b748dea52
commit
731013aa92
|
@ -35,6 +35,7 @@ from nucypher.control.interfaces import ControlInterface
|
|||
from nucypher.control.specifications.exceptions import SpecificationError
|
||||
from nucypher.exceptions import DevelopmentInstallationRequired
|
||||
from nucypher.network.resources import get_static_resources
|
||||
from nucypher.utilities.concurrency import WorkerPool
|
||||
from nucypher.utilities.logging import Logger, GlobalLoggerSettings
|
||||
|
||||
|
||||
|
@ -350,6 +351,32 @@ class WebController(InterfaceControlServer):
|
|||
#
|
||||
# Unhandled Server Errors
|
||||
#
|
||||
except WorkerPool.WorkerPoolException as e:
|
||||
# special case since WorkerPoolException contain stack traces - not ideal for returning from REST endpoints
|
||||
__exception_code = 500
|
||||
if self.crash_on_error:
|
||||
raise
|
||||
|
||||
if isinstance(e, WorkerPool.TimedOut):
|
||||
message_prefix = f"Execution timed out after {e.timeout}s"
|
||||
else:
|
||||
message_prefix = f"Execution failed - no more values to try"
|
||||
|
||||
# get random failure for context
|
||||
if e.failures:
|
||||
value = list(e.failures)[0]
|
||||
_, exception, _ = e.failures[value]
|
||||
msg = f"{message_prefix} ({len(e.failures)} concurrent failures recorded); " \
|
||||
f"for example, for {value}: {exception}"
|
||||
else:
|
||||
msg = message_prefix
|
||||
|
||||
return self.emitter.exception(
|
||||
e=RuntimeError(msg),
|
||||
log_level='warn',
|
||||
response_code=__exception_code,
|
||||
error_message=WebController._captured_status_codes[__exception_code])
|
||||
|
||||
except Exception as e:
|
||||
__exception_code = 500
|
||||
if self.crash_on_error:
|
||||
|
|
|
@ -101,23 +101,28 @@ class WorkerPool:
|
|||
class WorkerPoolException(Exception):
|
||||
"""Generalized exception class for WorkerPool failures."""
|
||||
def __init__(self, message_prefix: str, failures: Dict):
|
||||
self._failures = failures
|
||||
self.failures = failures
|
||||
|
||||
# craft message
|
||||
msg = message_prefix
|
||||
if self._failures:
|
||||
if self.failures:
|
||||
# Using one random failure
|
||||
# Most probably they're all the same anyway.
|
||||
value = list(self._failures)[0]
|
||||
_, exception, _ = self._failures[value]
|
||||
msg = f"{message_prefix} ({len(self._failures)} failures recorded); " \
|
||||
f"for example, for {value}: {exception}"
|
||||
value = list(self.failures)[0]
|
||||
_, exception, tb = self.failures[value]
|
||||
f = io.StringIO()
|
||||
traceback.print_tb(tb, file=f)
|
||||
traceback_str = f.getvalue()
|
||||
msg = (f"{message_prefix} ({len(self.failures)} failures recorded); "
|
||||
f"for example, for {value}:\n"
|
||||
f"{traceback_str}\n"
|
||||
f"{exception}")
|
||||
super().__init__(msg)
|
||||
|
||||
def get_tracebacks(self) -> Dict[Any, str]:
|
||||
"""Returns values and associated tracebacks of execution failures."""
|
||||
exc_tracebacks = {}
|
||||
for value, exc_info in self._failures.items():
|
||||
for value, exc_info in self.failures.items():
|
||||
_, exception, tb = exc_info
|
||||
f = io.StringIO()
|
||||
traceback.print_tb(tb, file=f)
|
||||
|
@ -128,6 +133,7 @@ class WorkerPool:
|
|||
class TimedOut(WorkerPoolException):
|
||||
"""Raised if waiting for the target number of successes timed out."""
|
||||
def __init__(self, timeout: float, *args, **kwargs):
|
||||
self.timeout = timeout
|
||||
super().__init__(message_prefix=f"Execution timed out after {timeout}s",
|
||||
*args, **kwargs)
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
"""
|
||||
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
from typing import Iterable, Tuple
|
||||
|
||||
|
@ -173,7 +172,7 @@ def test_wait_for_successes_out_of_values(join_worker_pool):
|
|||
|
||||
# This will be the last line in the displayed traceback;
|
||||
# That's where the worker actually failed. (Worker for {value} failed)
|
||||
assert re.search('for example, for .*: Worker for .* failed', message)
|
||||
assert 'raise Exception(f"Worker for {value} failed")' in message
|
||||
|
||||
|
||||
def test_wait_for_successes_timed_out(join_worker_pool):
|
||||
|
|
Loading…
Reference in New Issue