Merge pull request #3496 from KPrasch/rpc

Default RPC endpoints for Condition Evaluation
pull/3510/head
KPrasch 2024-06-05 15:52:19 +02:00 committed by GitHub
commit 3b3263a57d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 280 additions and 44 deletions

View File

@ -0,0 +1 @@
Support for default/fallback RPC endpoints from remote sources as a backup for operator-supplied RPC endpoints for condition evaluation.

View File

@ -4,7 +4,7 @@ import time
import traceback
from collections import defaultdict
from decimal import Decimal
from typing import DefaultDict, Dict, List, Optional, Set, Union
from typing import DefaultDict, Dict, List, Optional, Union
import maya
from atxm.exceptions import InsufficientFunds
@ -37,8 +37,10 @@ from nucypher.blockchain.eth.agents import (
TACoApplicationAgent,
TACoChildApplicationAgent,
)
from nucypher.blockchain.eth.clients import PUBLIC_CHAINS
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.blockchain.eth.constants import (
NULL_ADDRESS,
PUBLIC_CHAINS,
)
from nucypher.blockchain.eth.decorators import validate_checksum_address
from nucypher.blockchain.eth.domains import TACoDomain
from nucypher.blockchain.eth.interfaces import (
@ -50,7 +52,11 @@ from nucypher.blockchain.eth.registry import ContractRegistry
from nucypher.blockchain.eth.signers import Signer
from nucypher.blockchain.eth.trackers import dkg
from nucypher.blockchain.eth.trackers.bonding import OperatorBondedTracker
from nucypher.blockchain.eth.utils import truncate_checksum_address
from nucypher.blockchain.eth.utils import (
get_healthy_default_rpc_endpoints,
rpc_endpoint_health_check,
truncate_checksum_address,
)
from nucypher.crypto.powers import (
CryptoPower,
RitualisticPower,
@ -268,37 +274,66 @@ class Operator(BaseActor):
def connect_condition_providers(
self, endpoints: Dict[int, List[str]]
) -> DefaultDict[int, Set[HTTPProvider]]:
providers = defaultdict(set)
) -> DefaultDict[int, List[HTTPProvider]]:
providers = defaultdict(list) # use list to maintain order
# check that we have endpoints for all condition chains
if self.domain.condition_chain_ids != set(endpoints):
if set(self.domain.condition_chain_ids) != set(endpoints):
raise self.ActorError(
f"Missing blockchain endpoints for chains: "
f"{self.domain.condition_chain_ids - set(endpoints)}"
f"{set(self.domain.condition_chain_ids) - set(endpoints)}"
)
# check that each chain id is supported
# ensure that no endpoint uri for a specific chain is repeated
duplicated_endpoint_check = defaultdict(set)
# User-defined endpoints for chains
for chain_id, endpoints in endpoints.items():
if not self._is_permitted_condition_chain(chain_id):
raise NotImplementedError(
f"Chain ID {chain_id} is not supported for condition evaluation by this Operator."
f"Chain ID {chain_id} is not supported for condition evaluation by this operator."
)
# connect to each endpoint and check that they are on the correct chain
for uri in endpoints:
if uri in duplicated_endpoint_check[chain_id]:
self.log.warn(
f"Duplicated user-supplied blockchain uri, {uri}, for condition evaluation on chain {chain_id}; skipping"
)
continue
provider = self._make_condition_provider(uri)
if int(Web3(provider).eth.chain_id) != int(chain_id):
raise self.ActorError(
f"Condition blockchain endpoint {uri} is not on chain {chain_id}"
)
providers[int(chain_id)].add(provider)
healthy = rpc_endpoint_health_check(endpoint=uri)
if not healthy:
self.log.warn(
f"user-supplied condition RPC endpoint {uri} is unhealthy"
)
providers[int(chain_id)].append(provider)
duplicated_endpoint_check[chain_id].add(uri)
# Ingest default/fallback RPC providers for each chain
for chain_id in self.domain.condition_chain_ids:
default_endpoints = get_healthy_default_rpc_endpoints(chain_id)
for uri in default_endpoints:
if uri in duplicated_endpoint_check[chain_id]:
self.log.warn(
f"Duplicated fallback blockchain uri, {uri}, for condition evaluation on chain {chain_id}; skipping"
)
continue
provider = self._make_condition_provider(uri)
providers[chain_id].append(provider)
duplicated_endpoint_check[chain_id].add(uri)
humanized_chain_ids = ", ".join(
_CONDITION_CHAINS[chain_id] for chain_id in providers
)
self.log.info(
f"Connected to {len(providers)} blockchains for condition checking: {humanized_chain_ids}"
f"Connected to {sum(len(v) for v in providers.values())} RPC endpoints for condition "
f"checking on chain IDs {humanized_chain_ids}"
)
return providers

View File

@ -10,7 +10,10 @@ from web3.contract.contract import Contract
from web3.exceptions import TimeExhausted, TransactionNotFound
from web3.types import TxReceipt, Wei
from nucypher.blockchain.eth.constants import AVERAGE_BLOCK_TIME_IN_SECONDS
from nucypher.blockchain.eth.constants import (
AVERAGE_BLOCK_TIME_IN_SECONDS,
PUBLIC_CHAINS,
)
from nucypher.blockchain.middleware.retry import (
AlchemyRetryRequestMiddleware,
InfuraRetryRequestMiddleware,
@ -33,28 +36,6 @@ class Web3ClientUnexpectedVersionString(Web3ClientError):
pass
PUBLIC_CHAINS = {
1: "Mainnet",
137: "Polygon/Mainnet",
11155111: "Sepolia",
80002: "Polygon/Amoy",
}
# This list is not exhaustive,
# but is sufficient for the current needs of the project.
POA_CHAINS = {
4, # Rinkeby
5, # Goerli
42, # Kovan
77, # Sokol
100, # xDAI
10200, # gnosis/chiado,
137, # Polygon/Mainnet
80001, # "Polygon/Mumbai"
80002, # "Polygon/Amoy"
}
class EthereumClient:
BLOCK_CONFIRMATIONS_POLLING_TIME = 3 # seconds
TRANSACTION_POLLING_TIME = 0.5 # seconds

View File

@ -1,5 +1,3 @@
#
# Contract Names
#
@ -15,7 +13,6 @@ TACO_CHILD_APPLICATION_CONTRACT_NAME = "TACoChildApplication"
COORDINATOR_CONTRACT_NAME = "Coordinator"
SUBSCRIPTION_MANAGER_CONTRACT_NAME = "SubscriptionManager"
TACO_CONTRACT_NAMES = (
TACO_APPLICATION_CONTRACT_NAME,
TACO_CHILD_APPLICATION_CONTRACT_NAME,
@ -23,7 +20,6 @@ TACO_CONTRACT_NAMES = (
SUBSCRIPTION_MANAGER_CONTRACT_NAME
)
# Ethereum
AVERAGE_BLOCK_TIME_IN_SECONDS = 14
@ -37,3 +33,25 @@ NULL_ADDRESS = '0x' + '0' * 40
# NuCypher
# TODO: this is equal to HRAC.SIZE.
POLICY_ID_LENGTH = 16
PUBLIC_CHAINS = {
1: "Mainnet",
137: "Polygon/Mainnet",
11155111: "Sepolia",
80002: "Polygon/Amoy",
}
POA_CHAINS = {
4, # Rinkeby
5, # Goerli
42, # Kovan
77, # Sokol
100, # xDAI
10200, # gnosis/chiado,
137, # Polygon/Mainnet
80001, # "Polygon/Mumbai"
80002, # "Polygon/Amoy"
}
CHAINLIST_URL = "https://raw.githubusercontent.com/nucypher/chainlist/main/rpc.json"

View File

@ -21,7 +21,8 @@ from web3.middleware import geth_poa_middleware, simple_cache_middleware
from web3.providers import BaseProvider
from web3.types import TxParams, TxReceipt
from nucypher.blockchain.eth.clients import POA_CHAINS, EthereumClient
from nucypher.blockchain.eth.clients import EthereumClient
from nucypher.blockchain.eth.constants import POA_CHAINS
from nucypher.blockchain.eth.decorators import validate_checksum_address
from nucypher.blockchain.eth.providers import (
_get_http_provider,

View File

@ -1,11 +1,19 @@
import time
from decimal import Decimal
from typing import Union
from typing import Dict, List, Union
import requests
from eth_typing import ChecksumAddress
from requests import RequestException
from web3 import Web3
from web3.contract.contract import ContractConstructor, ContractFunction
from web3.types import TxParams
from nucypher.blockchain.eth.constants import CHAINLIST_URL
from nucypher.utilities.logging import Logger
LOGGER = Logger("utility")
def prettify_eth_amount(amount, original_denomination: str = 'wei') -> str:
"""
@ -62,3 +70,112 @@ def get_tx_cost_data(transaction_dict: TxParams):
max_cost_wei = max_unit_price * transaction_dict["gas"]
max_cost = Web3.from_wei(max_cost_wei, "ether")
return max_cost, max_price_gwei, tx_type
def rpc_endpoint_health_check(endpoint: str, max_drift_seconds: int = 60) -> bool:
"""
Checks the health of an Ethereum RPC endpoint by comparing the timestamp of the latest block
with the system time. The maximum drift allowed is `max_drift_seconds`.
"""
query = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": ["latest", False],
"id": 1,
}
LOGGER.debug(f"Checking health of RPC endpoint {endpoint}")
try:
response = requests.post(
endpoint,
json=query,
headers={"Content-Type": "application/json"},
timeout=5,
)
except requests.exceptions.RequestException:
LOGGER.debug(f"RPC endpoint {endpoint} is unhealthy: network error")
return False
if response.status_code != 200:
LOGGER.debug(
f"RPC endpoint {endpoint} is unhealthy: {response.status_code} | {response.text}"
)
return False
try:
data = response.json()
if "result" not in data:
LOGGER.debug(f"RPC endpoint {endpoint} is unhealthy: no response data")
return False
except requests.exceptions.RequestException:
LOGGER.debug(f"RPC endpoint {endpoint} is unhealthy: {response.text}")
return False
if data["result"] is None:
LOGGER.debug(f"RPC endpoint {endpoint} is unhealthy: no block data")
return False
block_data = data["result"]
try:
timestamp = int(block_data.get("timestamp"), 16)
except TypeError:
LOGGER.debug(f"RPC endpoint {endpoint} is unhealthy: invalid block data")
return False
system_time = time.time()
drift = abs(system_time - timestamp)
if drift > max_drift_seconds:
LOGGER.debug(
f"RPC endpoint {endpoint} is unhealthy: drift too large ({drift} seconds)"
)
return False
LOGGER.debug(f"RPC endpoint {endpoint} is healthy")
return True # finally!
def get_default_rpc_endpoints() -> Dict[int, List[str]]:
"""
Fetches the default RPC endpoints for various chains
from the nucypher/chainlist repository.
"""
LOGGER.debug(
f"Fetching default RPC endpoints from remote chainlist {CHAINLIST_URL}"
)
try:
response = requests.get(CHAINLIST_URL)
except RequestException:
LOGGER.warn("Failed to fetch default RPC endpoints: network error")
return {}
if response.status_code == 200:
return {
int(chain_id): endpoints for chain_id, endpoints in response.json().items()
}
else:
LOGGER.error(
f"Failed to fetch default RPC endpoints: {response.status_code} | {response.text}"
)
return {}
def get_healthy_default_rpc_endpoints(chain_id: int) -> List[str]:
"""Returns a list of healthy RPC endpoints for a given chain ID."""
endpoints = get_default_rpc_endpoints()
chain_endpoints = endpoints.get(chain_id)
if not chain_endpoints:
LOGGER.error(f"No default RPC endpoints found for chain ID {chain_id}")
return list()
healthy = [
endpoint for endpoint in chain_endpoints if rpc_endpoint_health_check(endpoint)
]
LOGGER.info(f"Healthy default RPC endpoints for chain ID {chain_id}: {healthy}")
if not healthy:
LOGGER.warn(
f"No healthy default RPC endpoints available for chain ID {chain_id}"
)
return healthy

View File

@ -18,7 +18,7 @@ from web3.middleware import geth_poa_middleware
from web3.providers import BaseProvider
from web3.types import ABIFunction
from nucypher.blockchain.eth.clients import POA_CHAINS
from nucypher.blockchain.eth.constants import POA_CHAINS
from nucypher.policy.conditions import STANDARD_ABI_CONTRACT_TYPES, STANDARD_ABIS
from nucypher.policy.conditions.base import AccessControlCondition
from nucypher.policy.conditions.context import (

View File

@ -1,8 +1,7 @@
import datetime
import maya
import pytest
from nucypher_core import EncryptedKeyFrag, RevocationOrder
from nucypher_core import EncryptedKeyFrag
from nucypher.characters.lawful import Enrico

View File

@ -0,0 +1,84 @@
import requests
from nucypher.blockchain.eth.utils import (
get_default_rpc_endpoints,
get_healthy_default_rpc_endpoints,
rpc_endpoint_health_check,
)
def test_rpc_endpoint_health_check(mocker):
mock_time = mocker.patch("time.time", return_value=1625247600)
mock_post = mocker.patch("requests.post")
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"jsonrpc": "2.0",
"id": 1,
"result": {"timestamp": hex(1625247600)},
}
mock_post.return_value = mock_response
# Test a healthy endpoint
assert rpc_endpoint_health_check("http://mockendpoint") is True
# Test an unhealthy endpoint (drift too large)
mock_time.return_value = 1625247600 + 100 # System time far ahead
assert rpc_endpoint_health_check("http://mockendpoint") is False
# Test request exception
mock_post.side_effect = requests.exceptions.RequestException
assert rpc_endpoint_health_check("http://mockendpoint") is False
def test_get_default_rpc_endpoints(mocker):
mock_get = mocker.patch("requests.get")
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"1": ["http://endpoint1", "http://endpoint2"],
"2": ["http://endpoint3", "http://endpoint4"],
}
mock_get.return_value = mock_response
expected_result = {
1: ["http://endpoint1", "http://endpoint2"],
2: ["http://endpoint3", "http://endpoint4"],
}
assert get_default_rpc_endpoints() == expected_result
# Mock a failed response
mock_get.return_value.status_code = 500
assert get_default_rpc_endpoints() == {}
def test_get_healthy_default_rpc_endpoints(mocker):
mock_get_endpoints = mocker.patch(
"nucypher.blockchain.eth.utils.get_default_rpc_endpoints"
)
mock_get_endpoints.return_value = {
1: ["http://endpoint1", "http://endpoint2"],
2: ["http://endpoint3", "http://endpoint4"],
}
mock_health_check = mocker.patch(
"nucypher.blockchain.eth.utils.rpc_endpoint_health_check"
)
mock_health_check.side_effect = (
lambda endpoint: endpoint == "http://endpoint1"
or endpoint == "http://endpoint3"
)
# Test chain ID 1
healthy_endpoints = get_healthy_default_rpc_endpoints(1)
assert healthy_endpoints == ["http://endpoint1"]
# Test chain ID 2
healthy_endpoints = get_healthy_default_rpc_endpoints(2)
assert healthy_endpoints == ["http://endpoint3"]
# Test chain ID with no healthy endpoints
healthy_endpoints = get_healthy_default_rpc_endpoints(3)
assert healthy_endpoints == []