Improves fallback RPC endpoint logging; perform startup health check for operator-supplied RPC endpoints.

pull/3496/head
KPrasch 2024-05-30 10:45:47 +02:00
parent fb4ed7c008
commit bdd66f2289
No known key found for this signature in database
2 changed files with 25 additions and 12 deletions

View File

@ -55,6 +55,7 @@ from nucypher.blockchain.eth.trackers.bonding import OperatorBondedTracker
from nucypher.blockchain.eth.utils import ( from nucypher.blockchain.eth.utils import (
get_healthy_default_rpc_endpoints, get_healthy_default_rpc_endpoints,
truncate_checksum_address, truncate_checksum_address,
rpc_endpoint_health_check,
) )
from nucypher.crypto.powers import ( from nucypher.crypto.powers import (
CryptoPower, CryptoPower,
@ -280,14 +281,14 @@ class Operator(BaseActor):
if set(self.domain.condition_chain_ids) != set(endpoints): if set(self.domain.condition_chain_ids) != set(endpoints):
raise self.ActorError( raise self.ActorError(
f"Missing blockchain endpoints for chains: " f"Missing blockchain endpoints for chains: "
f"{self.domain.condition_chain_ids - set(endpoints)}" f"{set(self.domain.condition_chain_ids) - set(endpoints)}"
) )
# check that each chain id is supported # check that each chain id is supported
for chain_id, endpoints in endpoints.items(): for chain_id, endpoints in endpoints.items():
if not self._is_permitted_condition_chain(chain_id): if not self._is_permitted_condition_chain(chain_id):
raise NotImplementedError( raise NotImplementedError(
f"Chain ID {chain_id} is not supported for condition evaluation by this Operator." f"Chain ID {chain_id} is not supported for condition evaluation by this operator."
) )
# connect to each endpoint and check that they are on the correct chain # connect to each endpoint and check that they are on the correct chain
@ -297,22 +298,26 @@ class Operator(BaseActor):
raise self.ActorError( raise self.ActorError(
f"Condition blockchain endpoint {uri} is not on chain {chain_id}" f"Condition blockchain endpoint {uri} is not on chain {chain_id}"
) )
healthy = rpc_endpoint_health_check(endpoint=uri)
if not healthy:
self.log.warn(
f"user-supplied condition RPC endpoint {uri} is unhealthy"
)
providers[int(chain_id)].append(provider) providers[int(chain_id)].append(provider)
# Ingest default RPC providers for each chain # Ingest default/fallback RPC providers for each chain
for chain_id in self.domain.condition_chain_ids: for chain_id in self.domain.condition_chain_ids:
default_endpoints = get_healthy_default_rpc_endpoints(chain_id) default_endpoints = get_healthy_default_rpc_endpoints(chain_id)
for uri in default_endpoints: for uri in default_endpoints:
provider = self._make_condition_provider(uri) provider = self._make_condition_provider(uri)
providers[chain_id].append(provider) providers[chain_id].append(provider)
self.log.info(f"Connected to {len(providers)} default RPC endpoints")
humanized_chain_ids = ", ".join( humanized_chain_ids = ", ".join(
_CONDITION_CHAINS[chain_id] for chain_id in providers _CONDITION_CHAINS[chain_id] for chain_id in providers
) )
self.log.info( self.log.info(
f"Connected to {len(providers)} total endpoints for condition checking: {humanized_chain_ids}" f"Connected to {len(providers.values())} RPC endpoints for condition "
f"checking on chain IDs {humanized_chain_ids}"
) )
return providers return providers

View File

@ -112,8 +112,8 @@ def rpc_endpoint_health_check(endpoint: str, max_drift_seconds: int = 60) -> boo
if data["result"] is None: if data["result"] is None:
LOGGER.debug(f"RPC endpoint {endpoint} is unhealthy: no block data") LOGGER.debug(f"RPC endpoint {endpoint} is unhealthy: no block data")
return False return False
block_data = data["result"]
block_data = data["result"]
try: try:
timestamp = int(block_data.get("timestamp"), 16) timestamp = int(block_data.get("timestamp"), 16)
except TypeError: except TypeError:
@ -134,7 +134,8 @@ def rpc_endpoint_health_check(endpoint: str, max_drift_seconds: int = 60) -> boo
def get_default_rpc_endpoints() -> Dict[int, List[str]]: def get_default_rpc_endpoints() -> Dict[int, List[str]]:
""" """
Fetches the default RPC endpoints for various chains from the nucypher/chainlist repository. Fetches the default RPC endpoints for various chains
from the nucypher/chainlist repository.
""" """
LOGGER.debug("Fetching default RPC endpoints from remote chainlist") LOGGER.debug("Fetching default RPC endpoints from remote chainlist")
response = requests.get(CHAINLIST_URL) response = requests.get(CHAINLIST_URL)
@ -148,18 +149,25 @@ def get_default_rpc_endpoints() -> Dict[int, List[str]]:
def get_healthy_default_rpc_endpoints(chain_id: int) -> List[str]: def get_healthy_default_rpc_endpoints(chain_id: int) -> List[str]:
""" """Returns a list of healthy RPC endpoints for a given chain ID."""
Returns a list of healthy RPC endpoints for a given chain ID.
""" healthy = list()
healthy = []
endpoints = get_default_rpc_endpoints() endpoints = get_default_rpc_endpoints()
chain_endpoints = endpoints.get(chain_id) chain_endpoints = endpoints.get(chain_id)
if not chain_endpoints: if not chain_endpoints:
LOGGER.error(f"No default RPC endpoints found for chain ID {chain_id}") LOGGER.error(f"No default RPC endpoints found for chain ID {chain_id}")
return healthy return healthy
for endpoint in chain_endpoints: for endpoint in chain_endpoints:
if rpc_endpoint_health_check(endpoint=endpoint): if rpc_endpoint_health_check(endpoint=endpoint):
healthy.append(endpoint) healthy.append(endpoint)
LOGGER.info(f"Healthy RPC endpoints for chain ID {chain_id}: {healthy}") LOGGER.info(f"Healthy RPC endpoints for chain ID {chain_id}: {healthy}")
if len(healthy) == 0:
LOGGER.warn(
f"No healthy default RPC endpoints available for chain ID {chain_id}"
)
return healthy return healthy