mirror of https://github.com/nucypher/nucypher.git
worktracker: enforce random_interval after one failure
parent 997e915d92
commit 5ed448058c
@@ -563,6 +563,8 @@ class WorkTracker:
         self.__uptime_period = NOT_STAKING
         self._abort_on_error = False
+
+        self._consecutive_fails = 0
 
     @classmethod
     def random_interval(cls) -> int:
         return random.randint(cls.INTERVAL_FLOOR, cls.INTERVAL_CEIL)
 
@@ -598,7 +600,7 @@ class WorkTracker:
         self.__uptime_period = self.staking_agent.get_current_period()
         self.__current_period = self.__uptime_period
 
-        self.log.info(f"START WORK TRACKING")
+        self.log.info(f"START WORK TRACKING (immediate action: {act_now})")
         d = self._tracking_task.start(interval=self.random_interval(), now=act_now)
         d.addErrback(self.handle_working_errors)
 
@@ -618,9 +620,16 @@ class WorkTracker:
             self.stop()
             reactor.callFromThread(self._crash_gracefully, failure=failure)
         else:
-            self.log.warn(f'Unhandled error during work tracking: {failure.getTraceback()!r}',
+            self.log.warn(f'Unhandled error during work tracking (#{self._consecutive_fails}): {failure.getTraceback()!r}',
                           failure=failure)
-            self.start()
+
+            # the effect of this is that we get one immediate retry.
+            # After that, the random_interval will be honored until
+            # success is achieved
+            act_now = self._consecutive_fails < 1
+            self._consecutive_fails += 1
+            self.start(act_now=act_now)
+
 
     def __work_requirement_is_satisfied(self) -> bool:
         # TODO: Check for stake expiration and exit
@@ -750,6 +759,7 @@ class WorkTracker:
             return  # This cycle is finished.
         else:
             # Randomize the next task interval over time, within bounds.
+            self._consecutive_fails = 0
             self._tracking_task.interval = self.random_interval()
 
         # Only perform work this round if the requirements are met
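Taken together, the WorkTracker hunks above track consecutive failures, allow exactly one immediate retry after the first failure, honor the randomized interval for every later failure, and reset the counter once a work cycle completes. The remaining hunks update the WorkTracker unit tests to match. As a rough illustration only, the retry policy can be modeled in isolation; the sketch below uses hypothetical interval bounds and helper names rather than the real WorkTracker API:

import random

# Hypothetical bounds for this sketch only; the real values are WorkTracker class attributes.
INTERVAL_FLOOR = 60 * 15
INTERVAL_CEIL = 60 * 180


def random_interval() -> int:
    return random.randint(INTERVAL_FLOOR, INTERVAL_CEIL)


class RetryPolicySketch:
    """Standalone model of the retry behaviour added to handle_working_errors."""

    def __init__(self):
        self._consecutive_fails = 0

    def delay_after_failure(self) -> int:
        # First failure after a success: retry immediately (act_now=True in the diff above).
        act_now = self._consecutive_fails < 1
        self._consecutive_fails += 1
        # Later failures: wait out a fresh randomized interval.
        return 0 if act_now else random_interval()

    def record_success(self) -> None:
        # Mirrors the reset added in the -750,6 +759,7 hunk above.
        self._consecutive_fails = 0


policy = RetryPolicySketch()
print([policy.delay_after_failure() for _ in range(3)])  # e.g. [0, 4807, 9311] -- only the first retry is immediate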
@@ -21,28 +21,19 @@ from twisted.internet import threads
 from twisted.internet.task import Clock
 from twisted.logger import globalLogPublisher, LogLevel
 
+from nucypher.utilities.gas_strategies import GasStrategyError
+
 from nucypher.blockchain.eth.token import WorkTracker
 from nucypher.utilities.logging import Logger, GlobalLoggerSettings
 
 logger = Logger("test-logging")
 
-class WorkTrackerThatFailsHalfTheTime(WorkTracker):
-
-    @property
-    def staking_agent(self):
-        class MockStakingAgent:
-            def get_current_period(self):
-                return 1
-            def log(message):
-                logger.debug(message)
-
-        return MockStakingAgent()
-
-    def _do_work(self) -> None:
-        self.attempts += 1
-        if self.attempts % 2:
-            raise BaseException("zomg something went wrong")
-        self.workdone += 1
-
-    def _crash_gracefully(self, failure=None) -> None:
-        assert failure.getErrorMessage() == 'zomg something went wrong'
+class WorkTrackerArbitraryFailureConditions(WorkTracker):
 
     def __init__(self, clock, abort_on_error, *args, **kwargs):
         self.workdone = 0
@@ -52,6 +43,36 @@ class WorkTrackerThatFailsHalfTheTime(WorkTracker):
         self._tracking_task = task.LoopingCall(self._do_work)
         self._tracking_task.clock = self.CLOCK
         self._abort_on_error = abort_on_error
+        self._consecutive_fails = 0
 
+    def _do_work(self) -> None:
+        self.attempts += 1
+
+        self.check_success_conditions()
+
+        self.workdone += 1
+        self._consecutive_fails = 0
+
+    @property
+    def staking_agent(self):
+        class MockStakingAgent:
+            def get_current_period(self):
+                return 1
+
+        return MockStakingAgent()
+
+    def _crash_gracefully(self, failure=None) -> None:
+        assert 'zomg something went wrong' in failure.getErrorMessage()
+
+    def check_success_conditions(self):
+        pass
+
+
+class WorkTrackerThatFailsHalfTheTime(WorkTrackerArbitraryFailureConditions):
+
+    def check_success_conditions(self):
+        if self.attempts % 2:
+            raise BaseException(f"zomg something went wrong: {self.attempts} % 2 = {self.attempts % 2}")
+
+
 @pytest_twisted.inlineCallbacks
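The two hunks above refactor the test helpers: WorkTrackerArbitraryFailureConditions now owns the mock staking agent and the failure bookkeeping, and each failure scenario is expressed by overriding check_success_conditions. For illustration only (not part of this commit), another failure mode could be added the same way, assuming the base class defined above:

class WorkTrackerThatFailsEveryThirdTime(WorkTrackerArbitraryFailureConditions):
    """Illustrative subclass only; fails on every third attempt."""

    def check_success_conditions(self):
        if self.attempts % 3 == 0:
            raise BaseException(f"zomg something went wrong: attempt {self.attempts}")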
@@ -61,10 +82,13 @@ def test_worker_failure_resilience():
     worktracker = WorkTrackerThatFailsHalfTheTime(clock, False)
 
     def advance_one_cycle(_):
-        clock.advance(WorkTrackerThatFailsHalfTheTime.INTERVAL_CEIL)
+        clock.advance(worktracker.INTERVAL_CEIL)
 
     def checkworkstate(_):
-        assert worktracker.attempts / 2 == worktracker.workdone
+        if worktracker.attempts % 2:
+            assert worktracker._consecutive_fails > 0
+        else:
+            assert worktracker.attempts / 2 == worktracker.workdone
 
     def start():
         worktracker.start()
@@ -87,7 +111,7 @@ def test_worker_failure_resilience():
 
     assert warnings
     for warning in warnings:
-        assert warning['failure'].getErrorMessage() == "zomg something went wrong"
+        assert "zomg something went wrong" in warning['failure'].getErrorMessage()
 
 
 @pytest_twisted.inlineCallbacks
@@ -129,4 +153,56 @@ def test_worker_failure_non_resilience():
     globalLogPublisher.removeObserver(critical_trapper)
 
     assert len(critical) == 1
-    assert critical[0]['failure'].getErrorMessage() == "zomg something went wrong"
+    assert "zomg something went wrong" in critical[0]['failure'].getErrorMessage()
+
+
+class WorkTrackerThatFailsFor12HoursThenSucceeds(WorkTrackerArbitraryFailureConditions):
+
+    def check_success_conditions(self):
+        if self.CLOCK.seconds() < 60*60*12:
+            raise GasStrategyError("Gas is too expensive in the morning.")
+
+    @classmethod
+    def random_interval(cls, fails=None):
+        return cls.INTERVAL_FLOOR
+
+
+@pytest_twisted.inlineCallbacks
+def test_worker_rate_limiting():
+    """
+    abort on error is True for this one
+    """
+
+    # Control time
+    clock = Clock()
+    worktracker = WorkTrackerThatFailsFor12HoursThenSucceeds(clock, False)
+
+    seconds_per_step = 1200  # this can be anything.
+    # The behavior we want to fix in production is equivalent to seconds_per_step = 1
+    # This test does pass with that value but it takes awhile and makes a lot of log file
+    # so lets go with 20 minute intervals
+
+    # with a value of 1, we get this log output after 43201 cycles (12 hours and 1 second)
+    # [test-logging#debug] 12 hour fail worktracker: attempts: 50, clock: 43201.0, work: 1
+
+    def advance_one_cycle(_):
+        clock.advance(seconds_per_step)
+
+    def checkfailures(_):
+        log(f"12 hour fail worktracker: attempts: {worktracker.attempts}, "
+            f"clock: {worktracker.CLOCK.seconds()}, work: {worktracker.workdone}")
+        assert worktracker.attempts <= (worktracker.CLOCK.seconds() / worktracker.INTERVAL_FLOOR) + 2  # account for the single instant retry
+
+    def start():
+        worktracker.start()
+
+    d = threads.deferToThread(start)
+
+    iterations = (60*60*12)+1  # 12 hours plus one second
+    for i in range(0, iterations, seconds_per_step):
+        d.addCallback(advance_one_cycle)
+        d.addCallback(checkfailures)
+
+    yield d
+
+    assert worktracker.workdone
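The bound asserted in checkfailures follows from the enforced interval: after the single immediate retry, attempts can only accumulate at a rate of one per INTERVAL_FLOOR seconds, plus a small constant. Assuming INTERVAL_FLOOR is 900 seconds (15 minutes; inferred from the quoted log line rather than stated in this diff), the arithmetic is consistent with the "attempts: 50" figure in the test's comment:

INTERVAL_FLOOR = 900        # assumed: 15 minutes (inferred, not stated in this diff)
elapsed = 60 * 60 * 12 + 1  # 43201 seconds, i.e. 12 hours plus one second

bound = elapsed / INTERVAL_FLOOR + 2  # +2 covers the initial attempt and the single immediate retry
print(bound)                # ~50.0, so 50 observed attempts stay within the asserted limit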