From 276b3349413fc05ff143df498ac4a539f4dc02f6 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 24 Mar 2022 14:57:17 -0400 Subject: [PATCH 1/4] Ensure that worktracker stops on first startup once operator is confirmed. --- nucypher/blockchain/eth/actors.py | 4 +- nucypher/blockchain/eth/token.py | 25 +++++-- nucypher/cli/actions/configure.py | 2 +- nucypher/utilities/networking.py | 5 +- .../unit/test_work_tracker_error_handling.py | 69 ++++++++++++++++++- 5 files changed, 93 insertions(+), 12 deletions(-) diff --git a/nucypher/blockchain/eth/actors.py b/nucypher/blockchain/eth/actors.py index 4a22f6356..34343c757 100644 --- a/nucypher/blockchain/eth/actors.py +++ b/nucypher/blockchain/eth/actors.py @@ -363,7 +363,9 @@ class Operator(BaseActor): if ether_balance: # funds found funded, balance = True, Web3.fromWei(ether_balance, 'ether') - emitter.message(f"✓ Operator is funded with {balance} ETH", color='green') + emitter.message(f"✓ Operator {self.operator_address} is funded with {balance} ETH", color='green') + else: + emitter.message(f"! Operator {self.operator_address} is not funded with ETH", color="yellow") if (not bonded) and (self.get_staking_provider_address() != NULL_ADDRESS): bonded = True diff --git a/nucypher/blockchain/eth/token.py b/nucypher/blockchain/eth/token.py index 54255e9c7..7e269ddab 100644 --- a/nucypher/blockchain/eth/token.py +++ b/nucypher/blockchain/eth/token.py @@ -263,7 +263,7 @@ class WorkTrackerBase: self._consecutive_fails += 1 self.start(commit_now=commit_now) - def __should_do_work_now(self) -> bool: + def _should_do_work_now(self) -> bool: # TODO: Check for stake expiration and exit if self.__requirement is None: return True @@ -325,12 +325,12 @@ class WorkTrackerBase: return bool(self.__pending) - def _fire_replacement_commitment(self, current_block_number: int, tx_firing_block_number: int) -> None: + def __fire_replacement_commitment(self, current_block_number: int, tx_firing_block_number: int) -> None: replacement_txhash = self._fire_commitment() # replace self.__pending[current_block_number] = replacement_txhash # track this transaction del self.__pending[tx_firing_block_number] # assume our original TX is stuck - def _handle_replacement_commitment(self, current_block_number: int) -> None: + def __handle_replacement_commitment(self, current_block_number: int) -> None: tx_firing_block_number, txhash = list(sorted(self.pending.items()))[0] if txhash is UNTRACKED_PENDING_TRANSACTION: # TODO: Detect if this untracked pending transaction is a commitment transaction at all. @@ -360,6 +360,10 @@ class WorkTrackerBase: """ Async working task for Ursula # TODO: Split into multiple async tasks """ + if self._all_work_completed(): + # nothing left to do + self.stop() + return self.log.info(f"{self.__class__.__name__} is running. Advancing to next work cycle.") # TODO: What to call the verb the subject performs? @@ -382,9 +386,10 @@ class WorkTrackerBase: self._tracking_task.interval = self.random_interval(fails=self._consecutive_fails) # Only perform work this round if the requirements are met - if not self.__should_do_work_now(): + if not self._should_do_work_now(): self.log.warn(f'COMMIT PREVENTED (callable: "{self.__requirement.__name__}") - ' f'Situation does not call for doing work now.') + # TODO: Follow-up actions for failed requirements return @@ -399,11 +404,11 @@ class WorkTrackerBase: """ post __init__ configuration dealing with contracts or state specific to this PRE flavor""" raise NotImplementedError - def _prep_work_state(self): + def _prep_work_state(self) -> bool: """ configuration perfomed before transaction management in task execution """ raise NotImplementedError - def _final_work_prep_before_transaction(self): + def _final_work_prep_before_transaction(self) -> bool: """ configuration perfomed after transaction management in task execution right before transaction firing""" raise NotImplementedError() @@ -411,6 +416,10 @@ class WorkTrackerBase: """ actually fire the tranasction """ raise NotImplementedError + def _all_work_completed(self) -> bool: + """ allows the work tracker to indicate that its work is completed it can be shut down """ + raise NotImplementedError + class WorkTracker(WorkTrackerBase): @@ -438,3 +447,7 @@ class WorkTracker(WorkTrackerBase): txhash = self.worker.confirm_address(fire_and_forget=True) # < --- blockchain WRITE self.log.info(f"Confirming operator address {self.worker.operator_address} with staking provider {self.worker.staking_provider_address} - TxHash: {txhash.hex()}") return txhash + + def _all_work_completed(self) -> bool: + # only a one-and-done - work is no longer needed + return not self._should_do_work_now() diff --git a/nucypher/cli/actions/configure.py b/nucypher/cli/actions/configure.py index 3cdb5ff86..d5731236c 100644 --- a/nucypher/cli/actions/configure.py +++ b/nucypher/cli/actions/configure.py @@ -181,7 +181,7 @@ def perform_startup_ip_check(emitter: StdoutEmitter, ursula: Ursula, force: bool ip_mismatch = external_ip != rest_host if ip_mismatch and not force: - error = f'\nX External IP address ({external_ip}) does not match configuration ({ursula.rest_interface.host}).\n' + error = f'\nx External IP address ({external_ip}) does not match configuration ({ursula.rest_interface.host}).\n' hint = f"Run 'nucypher ursula config ip-address' to reconfigure the IP address then try " \ f"again or use --no-ip-checkup to bypass this check (not recommended).\n" emitter.message(error, color='red') diff --git a/nucypher/utilities/networking.py b/nucypher/utilities/networking.py index a6ec3385a..72a759dc2 100644 --- a/nucypher/utilities/networking.py +++ b/nucypher/utilities/networking.py @@ -62,8 +62,9 @@ IP_DETECTION_LOGGER = Logger('external-ip-detection') def validate_operator_ip(ip: str) -> None: if ip in RESERVED_IP_ADDRESSES: - raise InvalidOperatorIP(f'{ip} is not a valid or permitted worker IP address. ' - f'Verify the rest_host is set to the external IPV4 address') + raise InvalidOperatorIP(f"{ip} is not a valid or permitted operator IP address. " + f"Verify the 'rest_host' configuration value is set to the " + f"external IPV4 address") def _request(url: str, certificate=None) -> Union[str, None]: diff --git a/tests/unit/test_work_tracker_error_handling.py b/tests/unit/test_work_tracker_error_handling.py index 9a8f29e95..b18679330 100644 --- a/tests/unit/test_work_tracker_error_handling.py +++ b/tests/unit/test_work_tracker_error_handling.py @@ -14,7 +14,7 @@ You should have received a copy of the GNU Affero General Public License along with nucypher. If not, see . """ - +from unittest.mock import MagicMock, Mock import pytest import pytest_twisted @@ -23,9 +23,10 @@ from twisted.internet import threads from twisted.internet.task import Clock from twisted.logger import globalLogPublisher, LogLevel +from nucypher.blockchain.eth.clients import EthereumClient from nucypher.utilities.gas_strategies import GasStrategyError -from nucypher.blockchain.eth.token import WorkTrackerBase +from nucypher.blockchain.eth.token import WorkTrackerBase, WorkTracker from nucypher.utilities.logging import Logger, GlobalLoggerSettings logger = Logger("test-logging") @@ -77,6 +78,43 @@ class WorkTrackerThatFailsHalfTheTime(WorkTrackerArbitraryFailureConditions): raise BaseException(f"zomg something went wrong: {self.attempts} % 2 = {self.attempts % 2}") +class WorkTrackerTrackExecutions(WorkTracker): + def __init__(self, clock, num_executions_before_stop: int, *args, **kwargs): + super().__init__(*args, **kwargs) + self._tracking_task.clock = clock + self.num_executions_before_stop = num_executions_before_stop + self.executions = 0 + self.executions_when_stopped = 0 + + def _do_work(self) -> None: + super()._do_work() + self.executions += 1 + + def stop(self): + self.executions_when_stopped = self.executions + super().stop() + + def _configure(self, *args): + client = Mock(spec=EthereumClient) + client.get_transaction_count.return_value = 0 + self.client = client + + def _prep_work_state(self): + return True + + def _final_work_prep_before_transaction(self): + return True + + def _fire_commitment(self): + return bytes.fromhex("deadbeef") + + def _should_do_work_now(self) -> bool: + return self.executions < self.num_executions_before_stop + + def _WorkTrackerBase__track_pending_commitments(self): + return False + + @pytest_twisted.inlineCallbacks def test_worker_failure_resilience(): # Control time @@ -208,3 +246,30 @@ def test_worker_rate_limiting(): yield d assert worktracker.workdone + + +@pytest_twisted.inlineCallbacks +@pytest.mark.parametrize('num_executions', [0, 1, 5]) +def test_worker_stopped_after_required_executions(num_executions): + # Control time + clock = Clock() + worker = MagicMock() + worktracker = WorkTrackerTrackExecutions(clock=clock, + num_executions_before_stop=num_executions, + worker=worker) + + def start(): + worktracker.start() + + d = threads.deferToThread(start) + + def advance_one_cycle(_): + clock.advance(WorkTrackerBase.INTERVAL_CEIL) + + for i in range(10): + d.addCallback(advance_one_cycle) + + yield d + + assert worktracker.executions_when_stopped == num_executions + assert worktracker.executions >= num_executions From a8aa869c2d41a73f2fc11b8020e5655f0c1ad6d3 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 24 Mar 2022 16:28:23 -0400 Subject: [PATCH 2/4] Ursula runs a background task, OperatorBondedTracker, which ensures that it shuts down if no the operator address is no longer bonded to a staking provider. --- nucypher/characters/lawful.py | 16 +++-- nucypher/network/trackers.py | 44 +++++++++++- nucypher/utilities/task.py | 64 +++++++++++++++++ tests/unit/test_operator_bonded_tracker.py | 83 ++++++++++++++++++++++ 4 files changed, 200 insertions(+), 7 deletions(-) create mode 100644 nucypher/utilities/task.py create mode 100644 tests/unit/test_operator_bonded_tracker.py diff --git a/nucypher/characters/lawful.py b/nucypher/characters/lawful.py index 5dbc76037..6871782fa 100644 --- a/nucypher/characters/lawful.py +++ b/nucypher/characters/lawful.py @@ -89,7 +89,7 @@ from nucypher.network.nodes import NodeSprout, TEACHER_NODES, Teacher from nucypher.network.protocols import parse_node_uri from nucypher.network.retrieval import RetrievalClient from nucypher.network.server import ProxyRESTServer, make_rest_app -from nucypher.network.trackers import AvailabilityTracker +from nucypher.network.trackers import AvailabilityTracker, OperatorBondedTracker from nucypher.policy.kits import PolicyMessageKit from nucypher.policy.payment import PaymentMethod, FreeReencryptions from nucypher.policy.policies import Policy, BlockchainPolicy, FederatedPolicy @@ -744,9 +744,8 @@ class Ursula(Teacher, Character, Operator): # Health Checks self._availability_check = availability_check self._availability_tracker = AvailabilityTracker(ursula=self) - - # Datastore Pruning - self.__pruning_task: Union[Deferred, None] = None + if not federated_only: + self._operator_bonded_tracker = OperatorBondedTracker(ursula=self) # Policy Payment if federated_only and not payment_method: @@ -944,6 +943,12 @@ class Ursula(Teacher, Character, Operator): # Non-order dependant services # + # Continuous bonded check now that Ursula is all ready to run + if not self.federated_only: + self._operator_bonded_tracker.start(now=eager) + if emitter: + emitter.message(f"✓ Start Operator Bonded Tracker", color='green') + if prometheus_config: # Locally scoped to prevent import without prometheus explicitly installed from nucypher.utilities.prometheus.metrics import start_prometheus_exporter @@ -991,8 +996,7 @@ class Ursula(Teacher, Character, Operator): self.stop_learning_loop() if not self.federated_only: self.work_tracker.stop() - if self._datastore_pruning_task.running: - self._datastore_pruning_task.stop() + self._operator_bonded_tracker.stop() if halt_reactor: reactor.stop() diff --git a/nucypher/network/trackers.py b/nucypher/network/trackers.py index 5c39d3dd0..a156f4eae 100644 --- a/nucypher/network/trackers.py +++ b/nucypher/network/trackers.py @@ -16,16 +16,58 @@ """ import random +from typing import Union import maya from twisted.internet import reactor from twisted.internet.task import LoopingCall -from typing import Union +from twisted.python.failure import Failure +from nucypher.blockchain.eth.agents import ContractAgency, PREApplicationAgent +from nucypher.blockchain.eth.constants import NULL_ADDRESS +from nucypher.control.emitters import StdoutEmitter from nucypher.network.exceptions import NodeSeemsToBeDown from nucypher.network.middleware import RestMiddleware from nucypher.network.nodes import NodeSprout from nucypher.utilities.logging import Logger +from nucypher.utilities.task import SimpleTask + + +class OperatorBondedTracker(SimpleTask): + INTERVAL = 60 * 60 # 1 hour + + class OperatorNoLongerBonded(RuntimeError): + """Raised when a running node is no longer associated with a staking provider.""" + + def __init__(self, ursula): + self._ursula = ursula + super().__init__() + + def run(self) -> None: + application_agent = ContractAgency.get_agent(PREApplicationAgent, + registry=self._ursula.registry, + eth_provider_uri=self._ursula.eth_provider_uri) + staking_provider_address = application_agent.get_staking_provider_from_operator( + operator_address=self._ursula.operator_address) + if staking_provider_address == NULL_ADDRESS: + # forcibly shut down ursula + self._shutdown_ursula(halt_reactor=True) + + def _shutdown_ursula(self, halt_reactor=False): + emitter = StdoutEmitter() + emitter.message(f'x [Operator {self._ursula.operator_address} is no longer bonded to any ' + f'staking provider] - Commencing auto-shutdown sequence...', color="red") + try: + raise self.OperatorNoLongerBonded() + finally: + self._ursula.stop(halt_reactor=halt_reactor) + + def handle_errors(self, failure: Failure) -> None: + cleaned_traceback = self.clean_traceback(failure) + self.log.warn(f"Unhandled error during operator bonded check: {cleaned_traceback}") + if failure.check([self.OperatorNoLongerBonded]): + # this type of exception we want to propagate because we will shut down + failure.raiseException() class AvailabilityTracker: diff --git a/nucypher/utilities/task.py b/nucypher/utilities/task.py new file mode 100644 index 000000000..48427c25f --- /dev/null +++ b/nucypher/utilities/task.py @@ -0,0 +1,64 @@ +""" + This file is part of nucypher. + + nucypher is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + nucypher is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with nucypher. If not, see . +""" + +from abc import ABC, abstractmethod + +from twisted.internet.task import LoopingCall +from twisted.python.failure import Failure + +from nucypher.utilities.logging import Logger + + +class SimpleTask(ABC): + """Simple Twisted Looping Call abstract base class.""" + INTERVAL = 60 # 60s default + + def __init__(self): + self.log = Logger(self.__class__.__name__) + self.__task = LoopingCall(self.run) + + @property + def running(self) -> bool: + """Determine whether the task is already running.""" + return self.__task.running + + def start(self, now: bool = False): + """Start task.""" + if not self.running: + d = self.__task.start(interval=self.INTERVAL, now=now) + d.addErrback(self.handle_errors) + + def stop(self): + """Stop task.""" + if self.running: + self.__task.stop() + + @abstractmethod + def run(self): + """Task method that should be periodically run.""" + raise NotImplementedError + + @abstractmethod + def handle_errors(self, *args, **kwargs): + """Error callback for error handling during execution.""" + raise NotImplementedError + + @staticmethod + def clean_traceback(failure: Failure) -> str: + # FIXME: Amazing. + cleaned_traceback = failure.getTraceback().replace('{', '').replace('}', '') + return cleaned_traceback diff --git a/tests/unit/test_operator_bonded_tracker.py b/tests/unit/test_operator_bonded_tracker.py new file mode 100644 index 000000000..8d14b9368 --- /dev/null +++ b/tests/unit/test_operator_bonded_tracker.py @@ -0,0 +1,83 @@ +""" +This file is part of nucypher. + +nucypher is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +nucypher is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with nucypher. If not, see . +""" + +import pytest +import pytest_twisted +from twisted.internet import threads + +from nucypher.blockchain.eth.agents import ContractAgency +from nucypher.blockchain.eth.constants import NULL_ADDRESS +from nucypher.network.trackers import OperatorBondedTracker + + +@pytest_twisted.inlineCallbacks +def test_operator_never_bonded(mocker, get_random_checksum_address): + ursula = mocker.Mock() + operator_address = get_random_checksum_address() + ursula.operator_address = operator_address + + application_agent = mocker.Mock() + application_agent.get_staking_provider_from_operator.return_value = NULL_ADDRESS + + mocker.patch.object(ContractAgency, 'get_agent', return_value=application_agent) + + tracker = OperatorBondedTracker(ursula=ursula) + try: + d = threads.deferToThread(tracker.start) + yield d + + with pytest.raises(OperatorBondedTracker.OperatorNoLongerBonded): + d = threads.deferToThread(tracker.run) + yield d + finally: + application_agent.get_staking_provider_from_operator.assert_called_once() + ursula.stop.assert_called_once_with(halt_reactor=True) # stop entire reactor + tracker.stop() + + +@pytest_twisted.inlineCallbacks +def test_operator_bonded_but_becomes_unbonded(mocker, get_random_checksum_address): + ursula = mocker.Mock() + operator_address = get_random_checksum_address() + ursula.operator_address = operator_address + + application_agent = mocker.Mock() + staking_provider = get_random_checksum_address() + application_agent.get_staking_provider_from_operator.return_value = staking_provider + + mocker.patch.object(ContractAgency, 'get_agent', return_value=application_agent) + + tracker = OperatorBondedTracker(ursula=ursula) + try: + d = threads.deferToThread(tracker.start) + yield d + + # bonded + for i in range(1, 10): + d = threads.deferToThread(tracker.run) + yield d + assert application_agent.get_staking_provider_from_operator.call_count == i, "check for operator bonded called" + ursula.stop.assert_not_called() + + # becomes unbonded + application_agent.get_staking_provider_from_operator.return_value = NULL_ADDRESS + with pytest.raises(OperatorBondedTracker.OperatorNoLongerBonded): + d = threads.deferToThread(tracker.run) + yield d + finally: + ursula.stop.assert_called_once_with(halt_reactor=True) # stop entire reactor + tracker.stop() From ac7b421b7ef0947bc697ca0ead82f10b30b3ff4d Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 24 Mar 2022 20:57:24 -0400 Subject: [PATCH 3/4] Add newsfragments for #2886. --- newsfragments/2886.bugfix.rst | 1 + newsfragments/2886.feature.rst | 1 + 2 files changed, 2 insertions(+) create mode 100644 newsfragments/2886.bugfix.rst create mode 100644 newsfragments/2886.feature.rst diff --git a/newsfragments/2886.bugfix.rst b/newsfragments/2886.bugfix.rst new file mode 100644 index 000000000..85240dc8c --- /dev/null +++ b/newsfragments/2886.bugfix.rst @@ -0,0 +1 @@ +Fix runaway WorkTracker task that ensures operator confirmed transaction occurs but continues running and making web3 requests even after operator already confirmed. diff --git a/newsfragments/2886.feature.rst b/newsfragments/2886.feature.rst new file mode 100644 index 000000000..b27e0ba40 --- /dev/null +++ b/newsfragments/2886.feature.rst @@ -0,0 +1 @@ +Proactively shut down Ursula if it is no longer bonded to any staking provider. From 9b1dec2eb1cfe3e3e4000a25cbc82472ba2b2b9a Mon Sep 17 00:00:00 2001 From: Derek Pierre Date: Fri, 25 Mar 2022 12:20:35 -0400 Subject: [PATCH 4/4] Apply RFCs from #2886. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: David Núñez --- nucypher/blockchain/eth/token.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nucypher/blockchain/eth/token.py b/nucypher/blockchain/eth/token.py index 7e269ddab..6239ffd97 100644 --- a/nucypher/blockchain/eth/token.py +++ b/nucypher/blockchain/eth/token.py @@ -417,7 +417,7 @@ class WorkTrackerBase: raise NotImplementedError def _all_work_completed(self) -> bool: - """ allows the work tracker to indicate that its work is completed it can be shut down """ + """ allows the work tracker to indicate that its work is completed and it can be shut down """ raise NotImplementedError