Check for duplicate entries between user supplied endpoints and fallback endpoints. All endpoints for a specific chain should be unique.

pull/3496/head
derekpierre 2024-06-04 11:52:14 -04:00 committed by KPrasch
parent 31bf91650c
commit 61544cca3d
No known key found for this signature in database
2 changed files with 19 additions and 9 deletions

View File

@ -275,13 +275,7 @@ class Operator(BaseActor):
def connect_condition_providers(
self, endpoints: Dict[int, List[str]]
) -> DefaultDict[int, List[HTTPProvider]]:
providers = defaultdict(list)
# de-duplicate, preserving order
deduplicated = {}
for chain_id, uris in endpoints.items():
deduplicated[chain_id] = list(dict.fromkeys(uris))
endpoints = deduplicated
providers = defaultdict(list) # use list to maintain order
# check that we have endpoints for all condition chains
if set(self.domain.condition_chain_ids) != set(endpoints):
@ -290,7 +284,10 @@ class Operator(BaseActor):
f"{set(self.domain.condition_chain_ids) - set(endpoints)}"
)
# check that each chain id is supported
# ensure that no endpoint uri for a specific chain is repeated
duplicated_endpoint_check = defaultdict(set)
# User-defined endpoints for chains
for chain_id, endpoints in endpoints.items():
if not self._is_permitted_condition_chain(chain_id):
raise NotImplementedError(
@ -299,6 +296,12 @@ class Operator(BaseActor):
# connect to each endpoint and check that they are on the correct chain
for uri in endpoints:
if uri in duplicated_endpoint_check[chain_id]:
self.log.warn(
f"Duplicated user-supplied blockchain uri, {uri}, for condition evaluation on chain {chain_id}; skipping"
)
continue
provider = self._make_condition_provider(uri)
if int(Web3(provider).eth.chain_id) != int(chain_id):
raise self.ActorError(
@ -310,13 +313,20 @@ class Operator(BaseActor):
f"user-supplied condition RPC endpoint {uri} is unhealthy"
)
providers[int(chain_id)].append(provider)
duplicated_endpoint_check[chain_id].add(uri)
# Ingest default/fallback RPC providers for each chain
for chain_id in self.domain.condition_chain_ids:
default_endpoints = get_healthy_default_rpc_endpoints(chain_id)
for uri in default_endpoints:
if uri in duplicated_endpoint_check[chain_id]:
self.log.warn(
f"Duplicated fallback blockchain uri, {uri}, for condition evaluation on chain {chain_id}; skipping"
)
continue
provider = self._make_condition_provider(uri)
providers[chain_id].append(provider)
duplicated_endpoint_check[chain_id].add(uri)
humanized_chain_ids = ", ".join(
_CONDITION_CHAINS[chain_id] for chain_id in providers

View File

@ -172,7 +172,7 @@ def get_healthy_default_rpc_endpoints(chain_id: int) -> List[str]:
healthy = [
endpoint for endpoint in chain_endpoints if rpc_endpoint_health_check(endpoint)
]
LOGGER.info(f"Healthy RPC endpoints for chain ID {chain_id}: {healthy}")
LOGGER.info(f"Healthy default RPC endpoints for chain ID {chain_id}: {healthy}")
if not healthy:
LOGGER.warn(
f"No healthy default RPC endpoints available for chain ID {chain_id}"