Merge pull request #2886 from derekpierre/airship-engine-overload

Fix additional issues from Airship
pull/2889/head
KPrasch 2022-03-25 10:59:02 -07:00 committed by GitHub
commit 50c0b02223
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 295 additions and 19 deletions

View File

@ -0,0 +1 @@
Fix runaway WorkTracker task that ensures operator confirmed transaction occurs but continues running and making web3 requests even after operator already confirmed.

View File

@ -0,0 +1 @@
Proactively shut down Ursula if it is no longer bonded to any staking provider.

View File

@ -363,7 +363,9 @@ class Operator(BaseActor):
if ether_balance:
# funds found
funded, balance = True, Web3.fromWei(ether_balance, 'ether')
emitter.message(f"✓ Operator is funded with {balance} ETH", color='green')
emitter.message(f"✓ Operator {self.operator_address} is funded with {balance} ETH", color='green')
else:
emitter.message(f"! Operator {self.operator_address} is not funded with ETH", color="yellow")
if (not bonded) and (self.get_staking_provider_address() != NULL_ADDRESS):
bonded = True

View File

@ -263,7 +263,7 @@ class WorkTrackerBase:
self._consecutive_fails += 1
self.start(commit_now=commit_now)
def __should_do_work_now(self) -> bool:
def _should_do_work_now(self) -> bool:
# TODO: Check for stake expiration and exit
if self.__requirement is None:
return True
@ -325,12 +325,12 @@ class WorkTrackerBase:
return bool(self.__pending)
def _fire_replacement_commitment(self, current_block_number: int, tx_firing_block_number: int) -> None:
def __fire_replacement_commitment(self, current_block_number: int, tx_firing_block_number: int) -> None:
replacement_txhash = self._fire_commitment() # replace
self.__pending[current_block_number] = replacement_txhash # track this transaction
del self.__pending[tx_firing_block_number] # assume our original TX is stuck
def _handle_replacement_commitment(self, current_block_number: int) -> None:
def __handle_replacement_commitment(self, current_block_number: int) -> None:
tx_firing_block_number, txhash = list(sorted(self.pending.items()))[0]
if txhash is UNTRACKED_PENDING_TRANSACTION:
# TODO: Detect if this untracked pending transaction is a commitment transaction at all.
@ -360,6 +360,10 @@ class WorkTrackerBase:
"""
Async working task for Ursula # TODO: Split into multiple async tasks
"""
if self._all_work_completed():
# nothing left to do
self.stop()
return
self.log.info(f"{self.__class__.__name__} is running. Advancing to next work cycle.") # TODO: What to call the verb the subject performs?
@ -382,9 +386,10 @@ class WorkTrackerBase:
self._tracking_task.interval = self.random_interval(fails=self._consecutive_fails)
# Only perform work this round if the requirements are met
if not self.__should_do_work_now():
if not self._should_do_work_now():
self.log.warn(f'COMMIT PREVENTED (callable: "{self.__requirement.__name__}") - '
f'Situation does not call for doing work now.')
# TODO: Follow-up actions for failed requirements
return
@ -399,11 +404,11 @@ class WorkTrackerBase:
""" post __init__ configuration dealing with contracts or state specific to this PRE flavor"""
raise NotImplementedError
def _prep_work_state(self):
def _prep_work_state(self) -> bool:
""" configuration perfomed before transaction management in task execution """
raise NotImplementedError
def _final_work_prep_before_transaction(self):
def _final_work_prep_before_transaction(self) -> bool:
""" configuration perfomed after transaction management in task execution right before transaction firing"""
raise NotImplementedError()
@ -411,6 +416,10 @@ class WorkTrackerBase:
""" actually fire the tranasction """
raise NotImplementedError
def _all_work_completed(self) -> bool:
""" allows the work tracker to indicate that its work is completed and it can be shut down """
raise NotImplementedError
class WorkTracker(WorkTrackerBase):
@ -438,3 +447,7 @@ class WorkTracker(WorkTrackerBase):
txhash = self.worker.confirm_address(fire_and_forget=True) # < --- blockchain WRITE
self.log.info(f"Confirming operator address {self.worker.operator_address} with staking provider {self.worker.staking_provider_address} - TxHash: {txhash.hex()}")
return txhash
def _all_work_completed(self) -> bool:
# only a one-and-done - work is no longer needed
return not self._should_do_work_now()

View File

@ -89,7 +89,7 @@ from nucypher.network.nodes import NodeSprout, TEACHER_NODES, Teacher
from nucypher.network.protocols import parse_node_uri
from nucypher.network.retrieval import RetrievalClient
from nucypher.network.server import ProxyRESTServer, make_rest_app
from nucypher.network.trackers import AvailabilityTracker
from nucypher.network.trackers import AvailabilityTracker, OperatorBondedTracker
from nucypher.policy.kits import PolicyMessageKit
from nucypher.policy.payment import PaymentMethod, FreeReencryptions
from nucypher.policy.policies import Policy, BlockchainPolicy, FederatedPolicy
@ -744,9 +744,8 @@ class Ursula(Teacher, Character, Operator):
# Health Checks
self._availability_check = availability_check
self._availability_tracker = AvailabilityTracker(ursula=self)
# Datastore Pruning
self.__pruning_task: Union[Deferred, None] = None
if not federated_only:
self._operator_bonded_tracker = OperatorBondedTracker(ursula=self)
# Policy Payment
if federated_only and not payment_method:
@ -944,6 +943,12 @@ class Ursula(Teacher, Character, Operator):
# Non-order dependant services
#
# Continuous bonded check now that Ursula is all ready to run
if not self.federated_only:
self._operator_bonded_tracker.start(now=eager)
if emitter:
emitter.message(f"✓ Start Operator Bonded Tracker", color='green')
if prometheus_config:
# Locally scoped to prevent import without prometheus explicitly installed
from nucypher.utilities.prometheus.metrics import start_prometheus_exporter
@ -991,8 +996,7 @@ class Ursula(Teacher, Character, Operator):
self.stop_learning_loop()
if not self.federated_only:
self.work_tracker.stop()
if self._datastore_pruning_task.running:
self._datastore_pruning_task.stop()
self._operator_bonded_tracker.stop()
if halt_reactor:
reactor.stop()

View File

@ -181,7 +181,7 @@ def perform_startup_ip_check(emitter: StdoutEmitter, ursula: Ursula, force: bool
ip_mismatch = external_ip != rest_host
if ip_mismatch and not force:
error = f'\nX External IP address ({external_ip}) does not match configuration ({ursula.rest_interface.host}).\n'
error = f'\nx External IP address ({external_ip}) does not match configuration ({ursula.rest_interface.host}).\n'
hint = f"Run 'nucypher ursula config ip-address' to reconfigure the IP address then try " \
f"again or use --no-ip-checkup to bypass this check (not recommended).\n"
emitter.message(error, color='red')

View File

@ -16,16 +16,58 @@
"""
import random
from typing import Union
import maya
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from typing import Union
from twisted.python.failure import Failure
from nucypher.blockchain.eth.agents import ContractAgency, PREApplicationAgent
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.control.emitters import StdoutEmitter
from nucypher.network.exceptions import NodeSeemsToBeDown
from nucypher.network.middleware import RestMiddleware
from nucypher.network.nodes import NodeSprout
from nucypher.utilities.logging import Logger
from nucypher.utilities.task import SimpleTask
class OperatorBondedTracker(SimpleTask):
INTERVAL = 60 * 60 # 1 hour
class OperatorNoLongerBonded(RuntimeError):
"""Raised when a running node is no longer associated with a staking provider."""
def __init__(self, ursula):
self._ursula = ursula
super().__init__()
def run(self) -> None:
application_agent = ContractAgency.get_agent(PREApplicationAgent,
registry=self._ursula.registry,
eth_provider_uri=self._ursula.eth_provider_uri)
staking_provider_address = application_agent.get_staking_provider_from_operator(
operator_address=self._ursula.operator_address)
if staking_provider_address == NULL_ADDRESS:
# forcibly shut down ursula
self._shutdown_ursula(halt_reactor=True)
def _shutdown_ursula(self, halt_reactor=False):
emitter = StdoutEmitter()
emitter.message(f'x [Operator {self._ursula.operator_address} is no longer bonded to any '
f'staking provider] - Commencing auto-shutdown sequence...', color="red")
try:
raise self.OperatorNoLongerBonded()
finally:
self._ursula.stop(halt_reactor=halt_reactor)
def handle_errors(self, failure: Failure) -> None:
cleaned_traceback = self.clean_traceback(failure)
self.log.warn(f"Unhandled error during operator bonded check: {cleaned_traceback}")
if failure.check([self.OperatorNoLongerBonded]):
# this type of exception we want to propagate because we will shut down
failure.raiseException()
class AvailabilityTracker:

View File

@ -62,8 +62,9 @@ IP_DETECTION_LOGGER = Logger('external-ip-detection')
def validate_operator_ip(ip: str) -> None:
if ip in RESERVED_IP_ADDRESSES:
raise InvalidOperatorIP(f'{ip} is not a valid or permitted worker IP address. '
f'Verify the rest_host is set to the external IPV4 address')
raise InvalidOperatorIP(f"{ip} is not a valid or permitted operator IP address. "
f"Verify the 'rest_host' configuration value is set to the "
f"external IPV4 address")
def _request(url: str, certificate=None) -> Union[str, None]:

View File

@ -0,0 +1,64 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from abc import ABC, abstractmethod
from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure
from nucypher.utilities.logging import Logger
class SimpleTask(ABC):
"""Simple Twisted Looping Call abstract base class."""
INTERVAL = 60 # 60s default
def __init__(self):
self.log = Logger(self.__class__.__name__)
self.__task = LoopingCall(self.run)
@property
def running(self) -> bool:
"""Determine whether the task is already running."""
return self.__task.running
def start(self, now: bool = False):
"""Start task."""
if not self.running:
d = self.__task.start(interval=self.INTERVAL, now=now)
d.addErrback(self.handle_errors)
def stop(self):
"""Stop task."""
if self.running:
self.__task.stop()
@abstractmethod
def run(self):
"""Task method that should be periodically run."""
raise NotImplementedError
@abstractmethod
def handle_errors(self, *args, **kwargs):
"""Error callback for error handling during execution."""
raise NotImplementedError
@staticmethod
def clean_traceback(failure: Failure) -> str:
# FIXME: Amazing.
cleaned_traceback = failure.getTraceback().replace('{', '').replace('}', '')
return cleaned_traceback

View File

@ -0,0 +1,83 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
import pytest_twisted
from twisted.internet import threads
from nucypher.blockchain.eth.agents import ContractAgency
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.network.trackers import OperatorBondedTracker
@pytest_twisted.inlineCallbacks
def test_operator_never_bonded(mocker, get_random_checksum_address):
ursula = mocker.Mock()
operator_address = get_random_checksum_address()
ursula.operator_address = operator_address
application_agent = mocker.Mock()
application_agent.get_staking_provider_from_operator.return_value = NULL_ADDRESS
mocker.patch.object(ContractAgency, 'get_agent', return_value=application_agent)
tracker = OperatorBondedTracker(ursula=ursula)
try:
d = threads.deferToThread(tracker.start)
yield d
with pytest.raises(OperatorBondedTracker.OperatorNoLongerBonded):
d = threads.deferToThread(tracker.run)
yield d
finally:
application_agent.get_staking_provider_from_operator.assert_called_once()
ursula.stop.assert_called_once_with(halt_reactor=True) # stop entire reactor
tracker.stop()
@pytest_twisted.inlineCallbacks
def test_operator_bonded_but_becomes_unbonded(mocker, get_random_checksum_address):
ursula = mocker.Mock()
operator_address = get_random_checksum_address()
ursula.operator_address = operator_address
application_agent = mocker.Mock()
staking_provider = get_random_checksum_address()
application_agent.get_staking_provider_from_operator.return_value = staking_provider
mocker.patch.object(ContractAgency, 'get_agent', return_value=application_agent)
tracker = OperatorBondedTracker(ursula=ursula)
try:
d = threads.deferToThread(tracker.start)
yield d
# bonded
for i in range(1, 10):
d = threads.deferToThread(tracker.run)
yield d
assert application_agent.get_staking_provider_from_operator.call_count == i, "check for operator bonded called"
ursula.stop.assert_not_called()
# becomes unbonded
application_agent.get_staking_provider_from_operator.return_value = NULL_ADDRESS
with pytest.raises(OperatorBondedTracker.OperatorNoLongerBonded):
d = threads.deferToThread(tracker.run)
yield d
finally:
ursula.stop.assert_called_once_with(halt_reactor=True) # stop entire reactor
tracker.stop()

View File

@ -14,7 +14,7 @@
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from unittest.mock import MagicMock, Mock
import pytest
import pytest_twisted
@ -23,9 +23,10 @@ from twisted.internet import threads
from twisted.internet.task import Clock
from twisted.logger import globalLogPublisher, LogLevel
from nucypher.blockchain.eth.clients import EthereumClient
from nucypher.utilities.gas_strategies import GasStrategyError
from nucypher.blockchain.eth.token import WorkTrackerBase
from nucypher.blockchain.eth.token import WorkTrackerBase, WorkTracker
from nucypher.utilities.logging import Logger, GlobalLoggerSettings
logger = Logger("test-logging")
@ -77,6 +78,43 @@ class WorkTrackerThatFailsHalfTheTime(WorkTrackerArbitraryFailureConditions):
raise BaseException(f"zomg something went wrong: {self.attempts} % 2 = {self.attempts % 2}")
class WorkTrackerTrackExecutions(WorkTracker):
def __init__(self, clock, num_executions_before_stop: int, *args, **kwargs):
super().__init__(*args, **kwargs)
self._tracking_task.clock = clock
self.num_executions_before_stop = num_executions_before_stop
self.executions = 0
self.executions_when_stopped = 0
def _do_work(self) -> None:
super()._do_work()
self.executions += 1
def stop(self):
self.executions_when_stopped = self.executions
super().stop()
def _configure(self, *args):
client = Mock(spec=EthereumClient)
client.get_transaction_count.return_value = 0
self.client = client
def _prep_work_state(self):
return True
def _final_work_prep_before_transaction(self):
return True
def _fire_commitment(self):
return bytes.fromhex("deadbeef")
def _should_do_work_now(self) -> bool:
return self.executions < self.num_executions_before_stop
def _WorkTrackerBase__track_pending_commitments(self):
return False
@pytest_twisted.inlineCallbacks
def test_worker_failure_resilience():
# Control time
@ -208,3 +246,30 @@ def test_worker_rate_limiting():
yield d
assert worktracker.workdone
@pytest_twisted.inlineCallbacks
@pytest.mark.parametrize('num_executions', [0, 1, 5])
def test_worker_stopped_after_required_executions(num_executions):
# Control time
clock = Clock()
worker = MagicMock()
worktracker = WorkTrackerTrackExecutions(clock=clock,
num_executions_before_stop=num_executions,
worker=worker)
def start():
worktracker.start()
d = threads.deferToThread(start)
def advance_one_cycle(_):
clock.advance(WorkTrackerBase.INTERVAL_CEIL)
for i in range(10):
d.addCallback(advance_one_cycle)
yield d
assert worktracker.executions_when_stopped == num_executions
assert worktracker.executions >= num_executions