mirror of https://github.com/nucypher/nucypher.git
Merge pull request #3549 from derekpierre/poa-redundancy
POA Error: reconfiguration of `BlockchainInterface` during connectivity problems results in incomplete middleware setupv7.4.1-hotfix
commit
d22fea13fd
|
@ -0,0 +1,2 @@
|
|||
Prevent connectivity issues from improperly re-initializing underlying instances of ``EthereumClient``
|
||||
and ``Web3`` within a ``BlockchainInterface`` instance.
|
|
@ -13,7 +13,6 @@ from web3.types import TxReceipt, Wei
|
|||
|
||||
from nucypher.blockchain.eth.constants import (
|
||||
AVERAGE_BLOCK_TIME_IN_SECONDS,
|
||||
POA_CHAINS,
|
||||
PUBLIC_CHAINS,
|
||||
)
|
||||
from nucypher.blockchain.middleware.retry import (
|
||||
|
@ -81,43 +80,36 @@ class EthereumClient:
|
|||
self._add_default_middleware()
|
||||
|
||||
def _add_default_middleware(self):
|
||||
# add POA middleware irrespective of chain
|
||||
poa_middleware_name = "poa"
|
||||
self.log.info("Injecting POA middleware at layer 0")
|
||||
self.inject_middleware(
|
||||
geth_poa_middleware,
|
||||
layer=0,
|
||||
name=poa_middleware_name,
|
||||
)
|
||||
|
||||
# retry request middleware
|
||||
endpoint_uri = getattr(self.w3.provider, "endpoint_uri", "")
|
||||
if "infura" in endpoint_uri:
|
||||
self.log.debug("Adding Infura RPC retry middleware to client")
|
||||
self.add_middleware(InfuraRetryRequestMiddleware)
|
||||
self.log.info("Adding Infura RPC retry middleware to client")
|
||||
self.add_middleware(InfuraRetryRequestMiddleware, name="infura_retry")
|
||||
elif "alchemyapi.io" in endpoint_uri:
|
||||
self.log.debug("Adding Alchemy RPC retry middleware to client")
|
||||
self.add_middleware(AlchemyRetryRequestMiddleware)
|
||||
self.log.info("Adding Alchemy RPC retry middleware to client")
|
||||
self.add_middleware(AlchemyRetryRequestMiddleware, name="alchemy_retry")
|
||||
else:
|
||||
self.log.debug("Adding RPC retry middleware to client")
|
||||
self.add_middleware(RetryRequestMiddleware)
|
||||
|
||||
# poa middleware
|
||||
chain_id = self.chain_id
|
||||
is_poa = chain_id in POA_CHAINS
|
||||
|
||||
self.log.debug(
|
||||
f"Blockchain: {self.chain_name} (chain_id={chain_id}, poa={is_poa})"
|
||||
)
|
||||
if is_poa:
|
||||
# proof-of-authority blockchain
|
||||
self.log.info("Injecting POA middleware at layer 0")
|
||||
self.inject_middleware(geth_poa_middleware, layer=0, name="poa")
|
||||
self.log.info("Adding RPC retry middleware to client")
|
||||
self.add_middleware(RetryRequestMiddleware, name="retry")
|
||||
|
||||
# simple cache middleware
|
||||
self.log.debug("Adding simple_cache_middleware")
|
||||
self.add_middleware(simple_cache_middleware)
|
||||
self.log.info("Adding simple_cache_middleware")
|
||||
self.add_middleware(simple_cache_middleware, name="simple_cache")
|
||||
|
||||
@property
|
||||
def chain_name(self) -> str:
|
||||
name = PUBLIC_CHAINS.get(self.chain_id, UNKNOWN_DEVELOPMENT_CHAIN_ID)
|
||||
return name
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
return self.w3.is_connected()
|
||||
|
||||
@property
|
||||
def accounts(self):
|
||||
return self.w3.eth.accounts
|
||||
|
@ -128,8 +120,8 @@ class EthereumClient:
|
|||
def inject_middleware(self, middleware, **kwargs):
|
||||
self.w3.middleware_onion.inject(middleware, **kwargs)
|
||||
|
||||
def add_middleware(self, middleware):
|
||||
self.w3.middleware_onion.add(middleware)
|
||||
def add_middleware(self, middleware, **kwargs):
|
||||
self.w3.middleware_onion.add(middleware, **kwargs)
|
||||
|
||||
def set_gas_strategy(self, gas_strategy):
|
||||
self.w3.eth.set_gas_price_strategy(gas_strategy)
|
||||
|
|
|
@ -253,6 +253,8 @@ class BlockchainInterface:
|
|||
self.gas_strategy = gas_strategy or self.DEFAULT_GAS_STRATEGY
|
||||
self.max_gas_price = max_gas_price
|
||||
|
||||
self.__is_initialized = False
|
||||
|
||||
def __repr__(self):
|
||||
r = "{name}({uri})".format(name=self.__class__.__name__, uri=self.endpoint)
|
||||
return r
|
||||
|
@ -261,13 +263,8 @@ class BlockchainInterface:
|
|||
return self.client.get_blocktime()
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""
|
||||
https://web3py.readthedocs.io/en/stable/__provider.html#examples-using-automated-detection
|
||||
"""
|
||||
if self.client is NO_BLOCKCHAIN_CONNECTION:
|
||||
return False
|
||||
return self.client.is_connected
|
||||
def is_initialized(self) -> bool:
|
||||
return self.__is_initialized
|
||||
|
||||
@classmethod
|
||||
def get_gas_strategy(cls, gas_strategy: Union[str, Callable] = None) -> Callable:
|
||||
|
@ -310,7 +307,7 @@ class BlockchainInterface:
|
|||
# self.log.debug(f"Gas strategy currently reports a gas price of {gwei_gas_price} gwei.")
|
||||
|
||||
def connect(self):
|
||||
if self.is_connected:
|
||||
if self.__is_initialized:
|
||||
# safety check - connect was already previously called
|
||||
return
|
||||
|
||||
|
@ -327,17 +324,21 @@ class BlockchainInterface:
|
|||
raise self.NoProvider("There are no configured blockchain providers")
|
||||
|
||||
try:
|
||||
self.w3 = self.Web3(provider=self._provider)
|
||||
w3 = self.Web3(provider=self._provider)
|
||||
# client mutates w3 instance (configures middleware etc.)
|
||||
self.client = EthereumClient(w3=self.w3)
|
||||
client = EthereumClient(w3=w3)
|
||||
|
||||
# log info
|
||||
latest_block_number = client.get_block("latest")["number"]
|
||||
chain_id = client.chain_id
|
||||
|
||||
# web3 instance fully configured; share instance with ATxM and respective strategies
|
||||
speedup_strategy = ExponentialSpeedupStrategy(
|
||||
w3=self.w3,
|
||||
w3=w3,
|
||||
min_time_between_speedups=120,
|
||||
) # speedup txs if not mined after 2 mins.
|
||||
self.tx_machine = AutomaticTxMachine(
|
||||
w3=self.w3, tx_exec_timeout=self.TIMEOUT, strategies=[speedup_strategy]
|
||||
tx_machine = AutomaticTxMachine(
|
||||
w3=w3, tx_exec_timeout=self.TIMEOUT, strategies=[speedup_strategy]
|
||||
)
|
||||
except requests.ConnectionError: # RPC
|
||||
raise self.ConnectionFailed(
|
||||
|
@ -348,7 +349,17 @@ class BlockchainInterface:
|
|||
f"Connection Failed - {str(self.endpoint)} - is IPC enabled?"
|
||||
)
|
||||
|
||||
return self.is_connected
|
||||
# Only set member variables once early set up is successful
|
||||
# - prevents incomplete instantiations
|
||||
self.w3 = w3
|
||||
self.client = client
|
||||
self.tx_machine = tx_machine
|
||||
self.log.info(
|
||||
f"Blockchain: {client.chain_name} (chain_id={chain_id}, block_num={latest_block_number})"
|
||||
)
|
||||
|
||||
self.__is_initialized = True
|
||||
return self.__is_initialized
|
||||
|
||||
@property
|
||||
def provider(self) -> BaseProvider:
|
||||
|
@ -833,7 +844,7 @@ class BlockchainInterfaceFactory:
|
|||
|
||||
# Connect and Sync
|
||||
interface, emitter = cached_interface
|
||||
if not interface.is_connected:
|
||||
if not interface.is_initialized:
|
||||
interface.connect()
|
||||
return interface
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@ from web3.middleware import geth_poa_middleware
|
|||
from web3.providers import BaseProvider
|
||||
from web3.types import ABIFunction
|
||||
|
||||
from nucypher.blockchain.eth.constants import POA_CHAINS
|
||||
from nucypher.policy.conditions import STANDARD_ABI_CONTRACT_TYPES, STANDARD_ABIS
|
||||
from nucypher.policy.conditions.base import AccessControlCondition
|
||||
from nucypher.policy.conditions.context import (
|
||||
|
@ -211,9 +210,9 @@ class RPCCondition(AccessControlCondition):
|
|||
# Instantiate a local web3 instance
|
||||
self.provider = provider
|
||||
w3 = Web3(provider)
|
||||
if self.chain in POA_CHAINS:
|
||||
# inject web3 middleware to handle POA chain extra_data field.
|
||||
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
|
||||
# inject web3 middleware to handle POA chain extra_data field.
|
||||
w3.middleware_onion.inject(geth_poa_middleware, layer=0, name="poa")
|
||||
|
||||
return w3
|
||||
|
||||
def _check_chain_id(self) -> None:
|
||||
|
|
|
@ -68,7 +68,10 @@ class MockEthereumClient(EthereumClient):
|
|||
def __init__(self, w3):
|
||||
super().__init__(w3=w3)
|
||||
|
||||
def add_middleware(self, middleware):
|
||||
def add_middleware(self, middleware, **kwargs):
|
||||
pass
|
||||
|
||||
def inject_middleware(self, middleware, **kwargs):
|
||||
pass
|
||||
|
||||
@property
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
|
||||
from typing import Optional
|
||||
from unittest.mock import PropertyMock
|
||||
|
||||
from constant_sorrow.constants import ALL_OF_THEM
|
||||
from requests import HTTPError
|
||||
from web3 import BaseProvider
|
||||
from web3.gas_strategies import time_based
|
||||
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterface
|
||||
|
@ -99,3 +102,69 @@ def test_use_pending_nonce_when_building_payload(mock_testerchain, mocker, rando
|
|||
assert payload['nonce'] == 6
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=False)
|
||||
assert payload['nonce'] == 6
|
||||
|
||||
|
||||
def test_connect_handle_connectivity_issues(mocker):
|
||||
|
||||
mock_eth = mocker.MagicMock()
|
||||
type(mock_eth).chain_id = PropertyMock(return_value=137)
|
||||
|
||||
mock_middleware_onion = mocker.Mock()
|
||||
|
||||
class MockWeb3:
|
||||
def __init__(self, provider: Optional[BaseProvider] = None, *args, **kwargs):
|
||||
self.provider = provider
|
||||
self.eth = mock_eth
|
||||
self.middleware_onion = mock_middleware_onion
|
||||
|
||||
middlewares = []
|
||||
self.middleware_onion.middlewares = middlewares
|
||||
|
||||
def add_middleware(middleware, name=None):
|
||||
middlewares.append(middleware)
|
||||
|
||||
def inject_middleware(middleware, layer=0, name=None):
|
||||
middlewares.insert(layer, middleware)
|
||||
|
||||
mock_middleware_onion.add.side_effect = add_middleware
|
||||
mock_middleware_onion.inject.side_effect = inject_middleware
|
||||
|
||||
class TestBlockchainInterface(BlockchainInterface):
|
||||
Web3 = MockWeb3
|
||||
|
||||
blockchain_interface = TestBlockchainInterface(
|
||||
endpoint="https://public-node.io:8445"
|
||||
)
|
||||
|
||||
assert not blockchain_interface.is_initialized
|
||||
|
||||
# connect() is called with no connectivity issues and executes successfully
|
||||
blockchain_interface.connect()
|
||||
assert blockchain_interface.is_initialized
|
||||
|
||||
# poa, retry, simplecache
|
||||
current_middlewares = blockchain_interface.w3.middleware_onion.middlewares
|
||||
assert len(current_middlewares) == 3
|
||||
|
||||
w3 = blockchain_interface.w3
|
||||
client = blockchain_interface.client
|
||||
tx_machine = blockchain_interface.tx_machine
|
||||
|
||||
# mimic connectivity issues
|
||||
type(mock_eth).chain_id = PropertyMock(side_effect=HTTPError("connectivity issue"))
|
||||
|
||||
# Mimic scanner task that connectivity experienced exception and ran connect()
|
||||
# again on blockchain interface.
|
||||
# However, connect() does nothing the 2nd time around because it already completed
|
||||
# successfully the first time
|
||||
blockchain_interface.connect()
|
||||
|
||||
# no change;
|
||||
# same underlying instances
|
||||
assert w3 == blockchain_interface.w3
|
||||
assert client == blockchain_interface.client
|
||||
assert tx_machine == blockchain_interface.tx_machine
|
||||
|
||||
# same middlewares remain - poa, retry, simplecache
|
||||
assert len(blockchain_interface.w3.middleware_onion.middlewares) == 3
|
||||
assert blockchain_interface.w3.middleware_onion.middlewares == current_middlewares
|
||||
|
|
|
@ -51,9 +51,12 @@ class SyncedMockW3Eth:
|
|||
chain_id = hex(CHAIN_ID)
|
||||
block_number = 5
|
||||
|
||||
def getBlock(self, blockNumber):
|
||||
def get_block(self, blockNumber):
|
||||
return {
|
||||
'timestamp': datetime.datetime.timestamp(datetime.datetime.now() - datetime.timedelta(seconds=25))
|
||||
"timestamp": datetime.datetime.timestamp(
|
||||
datetime.datetime.now() - datetime.timedelta(seconds=25)
|
||||
),
|
||||
"number": 123456789,
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue