diff --git a/nucypher/blockchain/eth/token.py b/nucypher/blockchain/eth/token.py
index 8dc1d66a1..bf98bdbdb 100644
--- a/nucypher/blockchain/eth/token.py
+++ b/nucypher/blockchain/eth/token.py
@@ -563,6 +563,8 @@ class WorkTracker:
         self.__uptime_period = NOT_STAKING
         self._abort_on_error = False
 
+        self._consecutive_fails = 0
+
     @classmethod
     def random_interval(cls) -> int:
         return random.randint(cls.INTERVAL_FLOOR, cls.INTERVAL_CEIL)
@@ -598,7 +600,7 @@
         self.__uptime_period = self.staking_agent.get_current_period()
         self.__current_period = self.__uptime_period
 
-        self.log.info(f"START WORK TRACKING")
+        self.log.info(f"START WORK TRACKING (immediate action: {act_now})")
         d = self._tracking_task.start(interval=self.random_interval(), now=act_now)
         d.addErrback(self.handle_working_errors)
 
@@ -618,9 +620,16 @@
             self.stop()
             reactor.callFromThread(self._crash_gracefully, failure=failure)
         else:
-            self.log.warn(f'Unhandled error during work tracking: {failure.getTraceback()!r}',
+            self.log.warn(f'Unhandled error during work tracking (#{self._consecutive_fails}): {failure.getTraceback()!r}',
                           failure=failure)
-            self.start()
+
+            # the effect of this is that we get one immediate retry.
+            # After that, the random_interval will be honored until
+            # success is achieved
+            act_now = self._consecutive_fails < 1
+            self._consecutive_fails += 1
+            self.start(act_now=act_now)
+
 
     def __work_requirement_is_satisfied(self) -> bool:
         # TODO: Check for stake expiration and exit
@@ -750,6 +759,7 @@ class WorkTracker:
             return  # This cycle is finished.
         else:
             # Randomize the next task interval over time, within bounds.
+            self._consecutive_fails = 0
             self._tracking_task.interval = self.random_interval()
 
         # Only perform work this round if the requirements are met
diff --git a/tests/unit/test_work_tracker_error_handling.py b/tests/unit/test_work_tracker_error_handling.py
index 97242d514..e4d8f5e33 100644
--- a/tests/unit/test_work_tracker_error_handling.py
+++ b/tests/unit/test_work_tracker_error_handling.py
@@ -21,28 +21,19 @@ from twisted.internet import threads
 from twisted.internet.task import Clock
 from twisted.logger import globalLogPublisher, LogLevel
 
+from nucypher.utilities.gas_strategies import GasStrategyError
+
 from nucypher.blockchain.eth.token import WorkTracker
 from nucypher.utilities.logging import Logger, GlobalLoggerSettings
 
+logger = Logger("test-logging")
 
-class WorkTrackerThatFailsHalfTheTime(WorkTracker):
 
-    @property
-    def staking_agent(self):
-        class MockStakingAgent:
-            def get_current_period(self):
-                return 1
+def log(message):
+    logger.debug(message)
 
-        return MockStakingAgent()
 
-    def _do_work(self) -> None:
-        self.attempts += 1
-        if self.attempts % 2:
-            raise BaseException("zomg something went wrong")
-        self.workdone += 1
-
-    def _crash_gracefully(self, failure=None) -> None:
-        assert failure.getErrorMessage() == 'zomg something went wrong'
+class WorkTrackerArbitraryFailureConditions(WorkTracker):
 
     def __init__(self, clock, abort_on_error, *args, **kwargs):
         self.workdone = 0
@@ -52,6 +43,36 @@ class WorkTrackerThatFailsHalfTheTime(WorkTracker):
         self._tracking_task = task.LoopingCall(self._do_work)
         self._tracking_task.clock = self.CLOCK
         self._abort_on_error = abort_on_error
+        self._consecutive_fails = 0
+
+    def _do_work(self) -> None:
+        self.attempts += 1
+
+        self.check_success_conditions()
+
+        self.workdone += 1
+        self._consecutive_fails = 0
+
+    @property
+    def staking_agent(self):
+        class MockStakingAgent:
+            def get_current_period(self):
+                return 1
+
+        return MockStakingAgent()
+
+    def _crash_gracefully(self, failure=None) -> None:
+        assert 'zomg something went wrong' in failure.getErrorMessage()
+
+    def check_success_conditions(self):
+        pass
+
+
+class WorkTrackerThatFailsHalfTheTime(WorkTrackerArbitraryFailureConditions):
+
+    def check_success_conditions(self):
+        if self.attempts % 2:
+            raise BaseException(f"zomg something went wrong: {self.attempts} % 2 = {self.attempts % 2}")
 
 
 @pytest_twisted.inlineCallbacks
@@ -61,10 +82,13 @@ def test_worker_failure_resilience():
     worktracker = WorkTrackerThatFailsHalfTheTime(clock, False)
 
     def advance_one_cycle(_):
-        clock.advance(WorkTrackerThatFailsHalfTheTime.INTERVAL_CEIL)
+        clock.advance(worktracker.INTERVAL_CEIL)
 
     def checkworkstate(_):
-        assert worktracker.attempts / 2 == worktracker.workdone
+        if worktracker.attempts % 2:
+            assert worktracker._consecutive_fails > 0
+        else:
+            assert worktracker.attempts / 2 == worktracker.workdone
 
     def start():
         worktracker.start()
@@ -87,7 +111,7 @@ def test_worker_failure_resilience():
 
     assert warnings
     for warning in warnings:
-        assert warning['failure'].getErrorMessage() == "zomg something went wrong"
+        assert "zomg something went wrong" in warning['failure'].getErrorMessage()
 
 
 @pytest_twisted.inlineCallbacks
@@ -129,4 +153,56 @@ def test_worker_failure_non_resilience():
     globalLogPublisher.removeObserver(critical_trapper)
 
     assert len(critical) == 1
-    assert critical[0]['failure'].getErrorMessage() == "zomg something went wrong"
+    assert "zomg something went wrong" in critical[0]['failure'].getErrorMessage()
+
+
+class WorkTrackerThatFailsFor12HoursThenSucceeds(WorkTrackerArbitraryFailureConditions):
+
+    def check_success_conditions(self):
+        if self.CLOCK.seconds() < 60*60*12:
+            raise GasStrategyError("Gas is too expensive in the morning.")
+
+    @classmethod
+    def random_interval(cls, fails=None):
+        return cls.INTERVAL_FLOOR
+
+
+@pytest_twisted.inlineCallbacks
+def test_worker_rate_limiting():
+    """
+    abort_on_error is False for this one: failures should be retried, not crash the tracker
+    """
+
+    # Control time
+    clock = Clock()
+    worktracker = WorkTrackerThatFailsFor12HoursThenSucceeds(clock, False)
+
+    seconds_per_step = 1200  # this can be anything.
+    # The behavior we want to fix in production is equivalent to seconds_per_step = 1.
+    # This test does pass with that value, but it takes a while and produces a lot of log output,
+    # so let's go with 20-minute intervals.
+
+    # With a value of 1, we get this log output after 43201 cycles (12 hours and 1 second):
+    # [test-logging#debug] 12 hour fail worktracker: attempts: 50, clock: 43201.0, work: 1
+
+    def advance_one_cycle(_):
+        clock.advance(seconds_per_step)
+
+    def checkfailures(_):
+        log(f"12 hour fail worktracker: attempts: {worktracker.attempts}, "
+            f"clock: {worktracker.CLOCK.seconds()}, work: {worktracker.workdone}")
+        assert worktracker.attempts <= (worktracker.CLOCK.seconds() / worktracker.INTERVAL_FLOOR) + 2  # account for the single instant retry
+
+    def start():
+        worktracker.start()
+
+    d = threads.deferToThread(start)
+
+    iterations = (60*60*12)+1  # 12 hours plus one second
+    for i in range(0, iterations, seconds_per_step):
+        d.addCallback(advance_one_cycle)
+        d.addCallback(checkfailures)
+
+    yield d
+
+    assert worktracker.workdone
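
Note for review: below is a minimal standalone sketch of the retry policy the token.py hunks above implement: the first failure of a streak earns one immediate retry; after that, the randomized interval is honored until a cycle succeeds and resets the streak. The RetryPolicy name and the interval bounds are hypothetical, chosen for illustration only; they are not part of the patch or the WorkTracker API.

import random


class RetryPolicy:
    """Hypothetical sketch of the patch's retry behavior, not the real WorkTracker API."""

    INTERVAL_FLOOR = 60 * 15   # assumed bounds, for illustration only
    INTERVAL_CEIL = 60 * 180

    def __init__(self) -> None:
        self._consecutive_fails = 0

    def random_interval(self) -> int:
        # Same idea as WorkTracker.random_interval: a random delay within bounds.
        return random.randint(self.INTERVAL_FLOOR, self.INTERVAL_CEIL)

    def on_failure(self) -> bool:
        # Only the first failure of a streak earns an instant retry (act_now=True);
        # every later failure should wait random_interval() seconds before retrying.
        act_now = self._consecutive_fails < 1
        self._consecutive_fails += 1
        return act_now

    def on_success(self) -> None:
        # A successful work cycle resets the streak, re-arming the immediate retry.
        self._consecutive_fails = 0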