Merge pull request #3569 from derekpierre/any-rpc

EVM Condition on "Any" (Major) EVM Chain (only `lynx` for now)
pull/3577/head
Derek Pierre 2024-12-19 08:36:17 -05:00 committed by GitHub
commit 254b0c54fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 106 additions and 193 deletions

View File

@ -0,0 +1 @@
Added plumbing to support EVM condition evaluation on "any" (major) EVM chain outside of Ethereum and Polygon - only enabled on ``lynx`` testnet for now.

View File

@ -65,7 +65,6 @@ from nucypher.crypto.powers import (
TransactingPower,
)
from nucypher.datastore.dkg import DKGStorage
from nucypher.policy.conditions.evm import _CONDITION_CHAINS
from nucypher.policy.conditions.utils import evaluate_condition_lingo
from nucypher.policy.payment import ContractPayment
from nucypher.types import PhaseId
@ -265,77 +264,76 @@ class Operator(BaseActor):
)
return receipt
@staticmethod
def _is_permitted_condition_chain(chain_id: int) -> bool:
return int(chain_id) in [int(cid) for cid in _CONDITION_CHAINS.keys()]
@staticmethod
def _make_condition_provider(uri: str) -> HTTPProvider:
provider = HTTPProvider(endpoint_uri=uri)
return provider
def connect_condition_providers(
self, endpoints: Dict[int, List[str]]
self, operator_configured_endpoints: Dict[int, List[str]]
) -> DefaultDict[int, List[HTTPProvider]]:
providers = defaultdict(list) # use list to maintain order
# check that we have endpoints for all condition chains
if set(self.domain.condition_chain_ids) != set(endpoints):
# check that we have mandatory user configured endpoints
mandatory_configured_chains = {
self.domain.eth_chain.id,
self.domain.polygon_chain.id,
}
if mandatory_configured_chains != set(operator_configured_endpoints):
raise self.ActorError(
f"Missing blockchain endpoints for chains: "
f"{set(self.domain.condition_chain_ids) - set(endpoints)}"
f"Operator-configured condition endpoints for chains don't match mandatory chains: "
f"{set(operator_configured_endpoints)} vs expected {mandatory_configured_chains}"
)
providers = defaultdict(list) # use list to maintain order
# ensure that no endpoint uri for a specific chain is repeated
duplicated_endpoint_check = defaultdict(set)
# User-defined endpoints for chains
for chain_id, endpoints in endpoints.items():
if not self._is_permitted_condition_chain(chain_id):
raise NotImplementedError(
f"Chain ID {chain_id} is not supported for condition evaluation by this operator."
# Operator-configured endpoints for chains
for chain_id, chain_rpc_endpoints in operator_configured_endpoints.items():
if not chain_rpc_endpoints:
raise self.ActorError(
f"Operator-configured condition endpoint is missing for the required chain {chain_id}"
)
# connect to each endpoint and check that they are on the correct chain
for uri in endpoints:
for uri in chain_rpc_endpoints:
if uri in duplicated_endpoint_check[chain_id]:
self.log.warn(
f"Duplicated user-supplied blockchain uri, {uri}, for condition evaluation on chain {chain_id}; skipping"
f"Operator-configured condition endpoint {uri} is duplicated for condition evaluation on chain {chain_id}; skipping."
)
continue
provider = self._make_condition_provider(uri)
if int(Web3(provider).eth.chain_id) != int(chain_id):
raise self.ActorError(
f"Condition blockchain endpoint {uri} is not on chain {chain_id}"
f"Operator-configured RPC condition endpoint {uri} does not belong to chain {chain_id}"
)
healthy = rpc_endpoint_health_check(endpoint=uri)
if not healthy:
self.log.warn(
f"user-supplied condition RPC endpoint {uri} is unhealthy"
f"Operator-configured RPC condition endpoint {uri} is unhealthy"
)
providers[int(chain_id)].append(provider)
duplicated_endpoint_check[chain_id].add(uri)
# Ingest default/fallback RPC providers for each chain
for chain_id in self.domain.condition_chain_ids:
default_endpoints = get_healthy_default_rpc_endpoints(chain_id)
for uri in default_endpoints:
default_endpoints = get_healthy_default_rpc_endpoints(self.domain)
for chain_id, chain_rpc_endpoints in default_endpoints.items():
# randomize list so that the same fallback RPC endpoints aren't always used by all nodes
random.shuffle(chain_rpc_endpoints)
for uri in chain_rpc_endpoints:
if uri in duplicated_endpoint_check[chain_id]:
self.log.warn(
f"Duplicated fallback blockchain uri, {uri}, for condition evaluation on chain {chain_id}; skipping"
f"Fallback blockchain endpoint, {uri}, is duplicated for condition evaluation on chain {chain_id}; skipping"
)
continue
provider = self._make_condition_provider(uri)
providers[chain_id].append(provider)
duplicated_endpoint_check[chain_id].add(uri)
humanized_chain_ids = ", ".join(
_CONDITION_CHAINS[chain_id] for chain_id in providers
)
self.log.info(
f"Connected to {sum(len(v) for v in providers.values())} RPC endpoints for condition "
f"checking on chain IDs {humanized_chain_ids}"
f"checking on chain IDs {providers.keys()}"
)
return providers

View File

@ -54,4 +54,6 @@ POA_CHAINS = {
80002, # "Polygon/Amoy"
}
CHAINLIST_URL = "https://raw.githubusercontent.com/nucypher/chainlist/main/rpc.json"
CHAINLIST_URL_TEMPLATE = (
"https://raw.githubusercontent.com/nucypher/chainlist/main/{domain}.json"
)

View File

@ -2,7 +2,7 @@
import functools
import inspect
from datetime import datetime
from datetime import datetime, timezone
from typing import Callable, Optional, TypeVar, Union
import eth_utils
@ -111,7 +111,7 @@ def save_receipt(actor_method) -> Callable: # TODO: rename to "save_result"?
@functools.wraps(actor_method)
def wrapped(self, *args, **kwargs) -> dict:
receipt_or_txhash = actor_method(self, *args, **kwargs)
self._saved_receipts.append((datetime.utcnow(), receipt_or_txhash))
self._saved_receipts.append((datetime.now(timezone.utc), receipt_or_txhash))
return receipt_or_txhash
return wrapped

View File

@ -1,7 +1,6 @@
from enum import Enum
from typing import Any, Dict, NamedTuple, Tuple
from cytoolz.functoolz import memoize
from functools import cache
from typing import Any, Dict, NamedTuple
class UnrecognizedTacoDomain(Exception):
@ -29,12 +28,10 @@ class TACoDomain:
name: str,
eth_chain: EthChain,
polygon_chain: PolygonChain,
condition_chains: Tuple[ChainInfo, ...],
):
self.name = name
self.eth_chain = eth_chain
self.polygon_chain = polygon_chain
self.condition_chains = condition_chains
def __repr__(self) -> str:
return f"<TACoDomain {self.name}>"
@ -43,9 +40,7 @@ class TACoDomain:
return self.name
def __hash__(self) -> int:
return hash(
(self.name, self.eth_chain, self.polygon_chain, self.condition_chains)
)
return hash((self.name, self.eth_chain, self.polygon_chain))
def __bytes__(self) -> bytes:
return self.name.encode()
@ -57,7 +52,6 @@ class TACoDomain:
self.name == other.name
and self.eth_chain == other.eth_chain
and self.polygon_chain == other.polygon_chain
and self.condition_chains == other.condition_chains
)
def __bool__(self) -> bool:
@ -67,36 +61,24 @@ class TACoDomain:
def is_testnet(self) -> bool:
return self.eth_chain != EthChain.MAINNET
@property
def condition_chain_ids(self) -> set:
return set(chain.id for chain in self.condition_chains)
MAINNET = TACoDomain(
name="mainnet",
eth_chain=EthChain.MAINNET,
polygon_chain=PolygonChain.MAINNET,
condition_chains=(EthChain.MAINNET, PolygonChain.MAINNET),
)
LYNX = TACoDomain(
name="lynx",
eth_chain=EthChain.SEPOLIA,
polygon_chain=PolygonChain.AMOY,
condition_chains=(
EthChain.MAINNET,
EthChain.SEPOLIA,
PolygonChain.AMOY,
PolygonChain.MAINNET,
),
)
TAPIR = TACoDomain(
name="tapir",
eth_chain=EthChain.SEPOLIA,
polygon_chain=PolygonChain.AMOY,
condition_chains=(EthChain.SEPOLIA, PolygonChain.AMOY),
)
@ -107,7 +89,7 @@ SUPPORTED_DOMAINS: Dict[str, TACoDomain] = {
}
@memoize
@cache
def get_domain(d: Any) -> TACoDomain:
if not isinstance(d, str):
raise TypeError(f"domain must be a string, not {type(d)}")

View File

@ -1,5 +1,6 @@
import time
from decimal import Decimal
from functools import cache
from typing import Dict, List, Union
import requests
@ -9,7 +10,8 @@ from web3 import Web3
from web3.contract.contract import ContractConstructor, ContractFunction
from web3.types import TxParams
from nucypher.blockchain.eth.constants import CHAINLIST_URL
from nucypher.blockchain.eth.constants import CHAINLIST_URL_TEMPLATE
from nucypher.blockchain.eth.domains import TACoDomain
from nucypher.utilities.logging import Logger
LOGGER = Logger("utility")
@ -133,17 +135,17 @@ def rpc_endpoint_health_check(endpoint: str, max_drift_seconds: int = 60) -> boo
return True # finally!
def get_default_rpc_endpoints() -> Dict[int, List[str]]:
@cache
def get_default_rpc_endpoints(domain: TACoDomain) -> Dict[int, List[str]]:
"""
Fetches the default RPC endpoints for various chains
For a given domain, fetches the default RPC endpoints for various chains
from the nucypher/chainlist repository.
"""
LOGGER.debug(
f"Fetching default RPC endpoints from remote chainlist {CHAINLIST_URL}"
)
url = CHAINLIST_URL_TEMPLATE.format(domain=domain.name)
LOGGER.debug(f"Fetching default RPC endpoints from remote chainlist {url}")
try:
response = requests.get(CHAINLIST_URL)
response = requests.get(url)
except RequestException:
LOGGER.warn("Failed to fetch default RPC endpoints: network error")
return {}
@ -159,23 +161,21 @@ def get_default_rpc_endpoints() -> Dict[int, List[str]]:
return {}
def get_healthy_default_rpc_endpoints(chain_id: int) -> List[str]:
"""Returns a list of healthy RPC endpoints for a given chain ID."""
def get_healthy_default_rpc_endpoints(domain: TACoDomain) -> Dict[int, List[str]]:
"""Returns a mapping of chain id to healthy RPC endpoints for a given domain."""
endpoints = get_default_rpc_endpoints(domain)
endpoints = get_default_rpc_endpoints()
chain_endpoints = endpoints.get(chain_id)
if not chain_endpoints:
LOGGER.error(f"No default RPC endpoints found for chain ID {chain_id}")
return list()
healthy = [
endpoint for endpoint in chain_endpoints if rpc_endpoint_health_check(endpoint)
]
LOGGER.info(f"Healthy default RPC endpoints for chain ID {chain_id}: {healthy}")
if not healthy:
LOGGER.warn(
f"No healthy default RPC endpoints available for chain ID {chain_id}"
)
if not domain.is_testnet:
# iterate over all chains and filter out unhealthy endpoints
healthy = {
chain_id: [
endpoint
for endpoint in endpoints[chain_id]
if rpc_endpoint_health_check(endpoint)
]
for chain_id in endpoints
}
else:
healthy = endpoints
return healthy

View File

@ -42,7 +42,7 @@ def generate_self_signed_certificate(
private_key = ec.generate_private_key(curve(), default_backend())
public_key = private_key.public_key()
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
fields = [x509.NameAttribute(NameOID.COMMON_NAME, host)]
subject = issuer = x509.Name(fields)

View File

@ -56,16 +56,6 @@ from nucypher.policy.conditions.validation import (
# Permitted blockchains for condition evaluation
from nucypher.utilities import logging
_CONDITION_CHAINS = {
1: "ethereum/mainnet",
11155111: "ethereum/sepolia",
137: "polygon/mainnet",
80002: "polygon/amoy",
# TODO: Permit support for these chains
# 100: "gnosis/mainnet",
# 10200: "gnosis/chiado",
}
class RPCCall(ExecutionCall):
LOG = logging.Logger(__name__)
@ -88,13 +78,6 @@ class RPCCall(ExecutionCall):
fields.Field, attribute="parameters", required=False, allow_none=True
)
@validates("chain")
def validate_chain(self, value):
if value not in _CONDITION_CHAINS:
raise ValidationError(
f"chain ID {value} is not a permitted blockchain for condition evaluation"
)
@validates("method")
def validate_method(self, value):
if value not in RPCCall.ALLOWED_METHODS:
@ -154,15 +137,10 @@ class RPCCall(ExecutionCall):
self, providers: Dict[int, Set[HTTPProvider]]
) -> Iterator[HTTPProvider]:
"""Yields the next web3 provider to try for a given chain ID"""
try:
rpc_providers = providers[self.chain]
# if there are no entries for the chain ID, there
# is no connection to that chain available.
except KeyError:
raise NoConnectionToChain(chain=self.chain)
rpc_providers = providers.get(self.chain, None)
if not rpc_providers:
raise NoConnectionToChain(chain=self.chain) # TODO: unreachable?
raise NoConnectionToChain(chain=self.chain)
for provider in rpc_providers:
# Someday, we might make this whole function async, and then we can knock on
# each endpoint here to see if it's alive and only yield it if it is.

View File

@ -188,7 +188,7 @@ class EventScanner:
# minor chain reorganisation?
return None
last_time = block_info["timestamp"]
return datetime.datetime.utcfromtimestamp(last_time)
return datetime.datetime.fromtimestamp(last_time, tz=datetime.timezone.utc)
def get_suggested_scan_start_block(self):
"""Get where we should start to scan for new token events.

View File

@ -21,7 +21,6 @@ from tests.constants import (
MIN_OPERATOR_SECONDS,
TEMPORARY_DOMAIN,
TEST_ETH_PROVIDER_URI,
TESTERCHAIN_CHAIN_ID,
)
from tests.utils.blockchain import ReservedTestAccountManager, TesterBlockchain
from tests.utils.registry import ApeRegistrySource
@ -452,10 +451,6 @@ def multichain_ursulas(ursulas, multichain_ids, mock_rpc_condition):
@pytest.fixture(scope="module", autouse=True)
def mock_condition_blockchains(module_mocker):
"""adds testerchain's chain ID to permitted conditional chains"""
module_mocker.patch.dict(
"nucypher.policy.conditions.evm._CONDITION_CHAINS",
{TESTERCHAIN_CHAIN_ID: "eth-tester/pyevm"},
)
module_mocker.patch(
"nucypher.blockchain.eth.domains.get_domain", return_value=TEMPORARY_DOMAIN

View File

@ -9,7 +9,6 @@ from nucypher.network.nodes import Learner
from nucypher.utilities.logging import GlobalLoggerSettings
from tests.constants import (
MOCK_IP_ADDRESS,
TESTERCHAIN_CHAIN_ID,
)
# Don't re-lock accounts in the background while making commitments
@ -136,14 +135,6 @@ def disable_check_grant_requirements(session_mocker):
session_mocker.patch(target, return_value=MOCK_IP_ADDRESS)
@pytest.fixture(scope="module", autouse=True)
def mock_condition_blockchains(module_mocker):
"""adds testerchain's chain ID to permitted conditional chains"""
module_mocker.patch.dict(
"nucypher.policy.conditions.evm._CONDITION_CHAINS",
{TESTERCHAIN_CHAIN_ID: "eth-tester/pyevm"},
)
@pytest.fixture(scope="module", autouse=True)
def mock_multichain_configuration(module_mocker, testerchain):

View File

@ -73,7 +73,6 @@ TEMPORARY_DOMAIN = TACoDomain(
name=TEMPORARY_DOMAIN_NAME,
eth_chain=TESTERCHAIN_CHAIN_INFO,
polygon_chain=TESTERCHAIN_CHAIN_INFO,
condition_chains=(TESTERCHAIN_CHAIN_INFO,),
)

View File

@ -272,14 +272,6 @@ def monkeypatch_get_staking_provider_from_operator(monkeymodule):
)
@pytest.fixture(scope="module", autouse=True)
def mock_condition_blockchains(module_mocker):
"""adds testerchain's chain ID to permitted conditional chains"""
module_mocker.patch.dict(
"nucypher.policy.conditions.evm._CONDITION_CHAINS",
{TESTERCHAIN_CHAIN_ID: "eth-tester/pyevm"},
)
@pytest.fixture(scope="module")
def multichain_ids(module_mocker):
@ -293,6 +285,20 @@ def multichain_ursulas(ursulas, multichain_ids):
return ursulas
@pytest.fixture(scope="module", autouse=True)
def mock_rpc_endpoints(module_mocker):
"""Mock RPC endpoints for integration tests"""
def mock_get_default_endpoints(domain):
# Return test endpoints for the testerchain
return {TESTERCHAIN_CHAIN_ID: ["http://localhost:8545"]}
module_mocker.patch(
"nucypher.blockchain.eth.utils.get_default_rpc_endpoints",
side_effect=mock_get_default_endpoints,
)
@pytest.fixture(scope="module")
def mock_prometheus(module_mocker):
return module_mocker.patch("nucypher.characters.lawful.start_prometheus_exporter")

View File

@ -13,7 +13,6 @@ def domain_1():
name="domain_uno",
eth_chain=TESTERCHAIN_CHAIN_INFO,
polygon_chain=TESTERCHAIN_CHAIN_INFO,
condition_chains=(TESTERCHAIN_CHAIN_INFO,),
)
@ -23,7 +22,6 @@ def domain_2():
name="domain_dos",
eth_chain=TESTERCHAIN_CHAIN_INFO,
polygon_chain=TESTERCHAIN_CHAIN_INFO,
condition_chains=(TESTERCHAIN_CHAIN_INFO,),
)

View File

@ -47,15 +47,6 @@ def test_invalid_rpc_condition():
parameters=["0xaDD9D957170dF6F33982001E4c22eCCdd5539118"],
)
# unsupported chain id
with pytest.raises(InvalidCondition, match="90210 is not a permitted blockchain"):
_ = RPCCondition(
method="eth_getBalance",
chain=90210, # Beverly Hills Chain :)
return_value_test=ReturnValueTest("==", 0),
parameters=["0xaDD9D957170dF6F33982001E4c22eCCdd5539118"],
)
# invalid chain type provided
with pytest.raises(ValueError, match="invalid literal for int"):
_ = RPCCondition(
@ -101,14 +92,10 @@ def test_rpc_condition_schema_validation(rpc_condition):
with pytest.raises(InvalidConditionLingo):
# chain id not an integer
condition_dict = rpc_condition.to_dict()
condition_dict["chain"] = str(TESTERCHAIN_CHAIN_ID)
RPCCondition.from_dict(condition_dict)
with pytest.raises(InvalidConditionLingo):
# chain id not a permitted chain
condition_dict["chain"] = 90210 # Beverly Hills Chain :)
RPCCondition.from_dict(condition_dict)
def test_rpc_condition_repr(rpc_condition):
rpc_condition_str = f"{rpc_condition}"

View File

@ -27,14 +27,6 @@ def test_invalid_time_condition():
method="time_after_time",
)
# chain id not permitted
with pytest.raises(InvalidCondition):
_ = TimeCondition(
return_value_test=ReturnValueTest(">", 0),
chain=90210, # Beverly Hills Chain :)
method=TimeRPCCall.METHOD,
)
def test_time_condition_schema_validation(time_condition):
condition_dict = time_condition.to_dict()
@ -60,19 +52,16 @@ def test_time_condition_schema_validation(time_condition):
with pytest.raises(InvalidConditionLingo):
# invalid method name
condition_dict = time_condition.to_dict()
condition_dict["method"] = "my_blocktime"
TimeCondition.from_dict(condition_dict)
with pytest.raises(InvalidConditionLingo):
# chain id not an integer
condition_dict = time_condition.to_dict()
condition_dict["chain"] = str(TESTERCHAIN_CHAIN_ID)
TimeCondition.from_dict(condition_dict)
with pytest.raises(InvalidConditionLingo):
# chain id not a permitted chain
condition_dict["chain"] = 90210 # Beverly Hills Chain :)
TimeCondition.from_dict(condition_dict)
@pytest.mark.parametrize(
"invalid_value", ["0x123456", 10.15, [1], [1, 2, 3], [True, [1, 2], "0x0"]]

View File

@ -1,8 +0,0 @@
from nucypher.blockchain.eth.domains import EthChain, PolygonChain
from nucypher.policy.conditions.evm import _CONDITION_CHAINS
def test_default_condition_chains():
all_chains = list(EthChain) + list(PolygonChain)
for chain in all_chains:
assert chain.id in _CONDITION_CHAINS

View File

@ -1,6 +1,6 @@
import math
import time
from datetime import datetime
from datetime import datetime, timezone
from typing import Tuple
from unittest.mock import MagicMock, Mock
@ -112,12 +112,14 @@ def test_get_block_timestamp():
now = time.time()
web3.eth.get_block.return_value = {"timestamp": now}
assert scanner.get_block_timestamp(block_num=0) == datetime.utcfromtimestamp(now)
assert scanner.get_block_timestamp(block_num=0) == datetime.fromtimestamp(
now, tz=timezone.utc
)
other_time = time.time() - 1231231
web3.eth.get_block.return_value = {"timestamp": other_time}
assert scanner.get_block_timestamp(block_num=21) == datetime.utcfromtimestamp(
other_time
assert scanner.get_block_timestamp(block_num=21) == datetime.fromtimestamp(
other_time, tz=timezone.utc
)

View File

@ -1,5 +1,6 @@
import requests
from nucypher.blockchain.eth.domains import EthChain, PolygonChain, TACoDomain
from nucypher.blockchain.eth.utils import (
get_default_rpc_endpoints,
get_healthy_default_rpc_endpoints,
@ -43,15 +44,22 @@ def test_get_default_rpc_endpoints(mocker):
}
mock_get.return_value = mock_response
test_domain = TACoDomain(
name="test",
eth_chain=EthChain.SEPOLIA,
polygon_chain=PolygonChain.AMOY,
)
expected_result = {
1: ["http://endpoint1", "http://endpoint2"],
2: ["http://endpoint3", "http://endpoint4"],
}
assert get_default_rpc_endpoints() == expected_result
assert get_default_rpc_endpoints(test_domain) == expected_result
get_default_rpc_endpoints.cache_clear()
# Mock a failed response
mock_get.return_value.status_code = 500
assert get_default_rpc_endpoints() == {}
assert get_default_rpc_endpoints(test_domain) == {}
def test_get_healthy_default_rpc_endpoints(mocker):
@ -71,14 +79,11 @@ def test_get_healthy_default_rpc_endpoints(mocker):
or endpoint == "http://endpoint3"
)
# Test chain ID 1
healthy_endpoints = get_healthy_default_rpc_endpoints(1)
assert healthy_endpoints == ["http://endpoint1"]
test_domain = TACoDomain(
name="mainnet",
eth_chain=EthChain.MAINNET,
polygon_chain=PolygonChain.MAINNET,
)
# Test chain ID 2
healthy_endpoints = get_healthy_default_rpc_endpoints(2)
assert healthy_endpoints == ["http://endpoint3"]
# Test chain ID with no healthy endpoints
healthy_endpoints = get_healthy_default_rpc_endpoints(3)
assert healthy_endpoints == []
healthy_endpoints = get_healthy_default_rpc_endpoints(test_domain)
assert healthy_endpoints == {1: ["http://endpoint1"], 2: ["http://endpoint3"]}

View File

@ -53,26 +53,18 @@ def test_polygon_chains(poly_chain_test):
"mainnet",
EthChain.MAINNET,
PolygonChain.MAINNET,
(EthChain.MAINNET, PolygonChain.MAINNET),
),
(
domains.LYNX,
"lynx",
EthChain.SEPOLIA,
PolygonChain.AMOY,
(
EthChain.MAINNET,
EthChain.SEPOLIA,
PolygonChain.AMOY,
PolygonChain.MAINNET,
),
),
(
domains.TAPIR,
"tapir",
EthChain.SEPOLIA,
PolygonChain.AMOY,
(EthChain.SEPOLIA, PolygonChain.AMOY),
),
),
)
@ -82,12 +74,10 @@ def test_taco_domain_info(taco_domain_test):
expected_name,
expected_eth_chain,
expected_polygon_chain,
expected_condition_chains,
) = taco_domain_test
assert domain_info.name == expected_name
assert domain_info.eth_chain == expected_eth_chain
assert domain_info.polygon_chain == expected_polygon_chain
assert domain_info.condition_chains == expected_condition_chains
assert domain_info.is_testnet == (expected_name != "mainnet")

View File

@ -11,7 +11,6 @@ from web3 import HTTPProvider
from nucypher.blockchain.eth.signers import InMemorySigner, Signer
from nucypher.characters.lawful import Ursula
from nucypher.config.characters import UrsulaConfiguration
from nucypher.policy.conditions.evm import _CONDITION_CHAINS
from tests.constants import TESTERCHAIN_CHAIN_ID
from tests.utils.blockchain import ReservedTestAccountManager
@ -167,7 +166,6 @@ def mock_permitted_multichain_connections(mocker) -> List[int]:
TESTERCHAIN_CHAIN_ID + 2,
123456789,
]
mocker.patch.dict(_CONDITION_CHAINS, {cid: "fakechain/mainnet" for cid in ids})
return ids