Refactor RPC endpoint management to use domain-specific endpoints

pull/3569/head
James Campbell 2024-12-10 11:47:36 +01:00
parent c8d5227d28
commit 681ae66bf5
No known key found for this signature in database
3 changed files with 25 additions and 92 deletions

View File

@ -54,4 +54,4 @@ POA_CHAINS = {
80002, # "Polygon/Amoy"
}
CHAINLIST_URL = "https://raw.githubusercontent.com/nucypher/chainlist/main/rpc.json"
CHAINLIST_URL = "https://raw.githubusercontent.com/nucypher/chainlist/main/"

View File

@ -1,14 +1,12 @@
import random
import time
from decimal import Decimal
from typing import Dict, List, Union
from urllib.parse import urlparse
import requests
from cytoolz import memoize
from eth_typing import ChecksumAddress
from requests import RequestException
from web3 import HTTPProvider, Web3
from web3 import Web3
from web3.contract.contract import ContractConstructor, ContractFunction
from web3.types import TxParams
@ -136,17 +134,17 @@ def rpc_endpoint_health_check(endpoint: str, max_drift_seconds: int = 60) -> boo
return True # finally!
def get_default_rpc_endpoints() -> Dict[int, List[str]]:
@memoize
def get_default_rpc_endpoints(domain) -> Dict[int, List[str]]:
"""
Fetches the default RPC endpoints for various chains
from the nucypher/chainlist repository.
"""
LOGGER.debug(
f"Fetching default RPC endpoints from remote chainlist {CHAINLIST_URL}"
)
url = CHAINLIST_URL + domain + ".json"
LOGGER.debug(f"Fetching default RPC endpoints from remote chainlist {url}")
try:
response = requests.get(CHAINLIST_URL)
response = requests.get(url)
except RequestException:
LOGGER.warn("Failed to fetch default RPC endpoints: network error")
return {}
@ -162,78 +160,22 @@ def get_default_rpc_endpoints() -> Dict[int, List[str]]:
return {}
def get_healthy_default_rpc_endpoints(chain_id: int) -> List[str]:
def get_healthy_default_rpc_endpoints(domain: str) -> Dict[int, List[str]]:
"""Returns a list of healthy RPC endpoints for a given chain ID."""
endpoints = get_default_rpc_endpoints()
chain_endpoints = endpoints.get(chain_id)
endpoints = get_default_rpc_endpoints(domain)
if not chain_endpoints:
LOGGER.error(f"No default RPC endpoints found for chain ID {chain_id}")
return list()
healthy = [
endpoint for endpoint in chain_endpoints if rpc_endpoint_health_check(endpoint)
]
LOGGER.info(f"Healthy default RPC endpoints for chain ID {chain_id}: {healthy}")
if not healthy:
LOGGER.warn(
f"No healthy default RPC endpoints available for chain ID {chain_id}"
)
if domain == "mainnet":
# iterate over all chains and filter out unhealthy endpoints
healthy = {
chain_id: [
endpoint
for endpoint in endpoints[chain_id]
if rpc_endpoint_health_check(endpoint)
]
for chain_id in endpoints
}
else:
healthy = endpoints
return healthy
@memoize
def _fetch_public_rpc_endpoints_from_chainlist():
chainid_network = "https://chainid.network/chains.json"
LOGGER.debug(f"Fetching public RPC endpoints from {chainid_network}")
try:
response = requests.get(chainid_network)
except RequestException:
LOGGER.warn("Failed to fetch public RPC endpoints: network error")
return {}
if response.status_code != 200:
LOGGER.error(
f"Failed to fetch default RPC endpoints: {response.status_code} | {response.text}"
)
return {}
result = response.json()
return result
def fetch_public_rpc_endpoints_for_chain(chain_id: int) -> List[HTTPProvider]:
result = _fetch_public_rpc_endpoints_from_chainlist()
if not result:
return result
rpc_endpoints = []
for entry in result:
if entry["chainId"] != chain_id:
continue
if entry.get("status", None) == "deprecated":
break
endpoints = entry.get("rpc", [])
for endpoint in endpoints:
# ensure no infura key
if "${" in endpoint:
# filter out urls like:
# - https://mainnet.infura.io/v3/${INFURA_API_KEY},
# - https://mainnet.infura.io/v3/${ALCHEMY_API_KEY}
continue
# only use https endpoints
url_components = urlparse(endpoint)
if url_components.scheme != "https":
continue
rpc_endpoints.append(HTTPProvider(endpoint))
random.shuffle(rpc_endpoints)
return rpc_endpoints

View File

@ -47,11 +47,11 @@ def test_get_default_rpc_endpoints(mocker):
1: ["http://endpoint1", "http://endpoint2"],
2: ["http://endpoint3", "http://endpoint4"],
}
assert get_default_rpc_endpoints() == expected_result
assert get_default_rpc_endpoints("domain") == expected_result
# Mock a failed response
mock_get.return_value.status_code = 500
assert get_default_rpc_endpoints() == {}
assert get_default_rpc_endpoints("bad_domain") == {}
def test_get_healthy_default_rpc_endpoints(mocker):
@ -71,14 +71,5 @@ def test_get_healthy_default_rpc_endpoints(mocker):
or endpoint == "http://endpoint3"
)
# Test chain ID 1
healthy_endpoints = get_healthy_default_rpc_endpoints(1)
assert healthy_endpoints == ["http://endpoint1"]
# Test chain ID 2
healthy_endpoints = get_healthy_default_rpc_endpoints(2)
assert healthy_endpoints == ["http://endpoint3"]
# Test chain ID with no healthy endpoints
healthy_endpoints = get_healthy_default_rpc_endpoints(3)
assert healthy_endpoints == []
healthy_endpoints = get_healthy_default_rpc_endpoints("mainnet")
assert healthy_endpoints == {1: ["http://endpoint1"], 2: ["http://endpoint3"]}