mirror of https://github.com/nucypher/nucypher.git
Add unit and acceptance tests for EIP1271 authentication.
parent
278cf9f0d4
commit
ac5da6ce7e
|
@ -57,14 +57,12 @@ def erc20_evm_condition_balanceof(testerchain, test_registry, ritual_token):
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def erc721_contract(accounts, project):
|
||||
account = accounts[0]
|
||||
|
||||
def erc721_contract(project, deployer_account):
|
||||
# deploy contract
|
||||
deployed_contract = project.ConditionNFT.deploy(sender=account)
|
||||
deployed_contract = project.ConditionNFT.deploy(sender=deployer_account)
|
||||
|
||||
# mint nft with token id = 1
|
||||
deployed_contract.mint(account.address, 1, sender=account)
|
||||
deployed_contract.mint(deployer_account.address, 1, sender=deployer_account)
|
||||
return deployed_contract
|
||||
|
||||
|
||||
|
@ -154,3 +152,11 @@ def custom_context_variable_erc20_condition(
|
|||
parameters=[":addressToUse"],
|
||||
)
|
||||
return condition
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def eip1271_contract_wallet(project, deployer_account):
|
||||
_eip1271_contract_wallet = deployer_account.deploy(
|
||||
project.SmartContractWallet, deployer_account.address
|
||||
)
|
||||
return _eip1271_contract_wallet
|
||||
|
|
|
@ -3,6 +3,7 @@ import os
|
|||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from eth_account.messages import defunct_hash_message, encode_defunct
|
||||
from hexbytes import HexBytes
|
||||
from web3 import Web3
|
||||
from web3.providers import BaseProvider
|
||||
|
@ -13,6 +14,7 @@ from nucypher.blockchain.eth.agents import (
|
|||
SubscriptionManagerAgent,
|
||||
)
|
||||
from nucypher.blockchain.eth.constants import NULL_ADDRESS
|
||||
from nucypher.policy.conditions.auth.evm import EvmAuth
|
||||
from nucypher.policy.conditions.context import (
|
||||
USER_ADDRESS_CONTEXT,
|
||||
get_context_value,
|
||||
|
@ -930,3 +932,56 @@ def test_json_rpc_condition_non_evm_prototyping_example():
|
|||
)
|
||||
success, _ = condition.verify()
|
||||
assert success
|
||||
|
||||
|
||||
def test_rpc_condition_using_eip1271(
|
||||
testerchain, deployer_account, eip1271_contract_wallet, condition_providers
|
||||
):
|
||||
# send some ETH to the smart contract wallet
|
||||
eth_amount = Web3.to_wei(2.25, "ether")
|
||||
|
||||
encoded_deposit_function = eip1271_contract_wallet.deposit.encode_input().hex()
|
||||
deployer_account.transfer(
|
||||
account=eip1271_contract_wallet.address,
|
||||
value=eth_amount,
|
||||
data=encoded_deposit_function,
|
||||
)
|
||||
|
||||
rpc_condition = RPCCondition(
|
||||
method="eth_getBalance",
|
||||
chain=TESTERCHAIN_CHAIN_ID,
|
||||
parameters=[USER_ADDRESS_CONTEXT],
|
||||
return_value_test=ReturnValueTest("==", eth_amount),
|
||||
)
|
||||
|
||||
data = f"I'm the owner of the smart contract wallet address {eip1271_contract_wallet.address}"
|
||||
signable_message = encode_defunct(text=data)
|
||||
hash = defunct_hash_message(text=data)
|
||||
message_signature = deployer_account.sign_message(signable_message)
|
||||
hex_signature = HexBytes(message_signature.encode_rsv()).hex()
|
||||
|
||||
typedData = {"chain": TESTERCHAIN_CHAIN_ID, "dataHash": hash.hex()}
|
||||
auth_message = {
|
||||
"signature": f"{hex_signature}",
|
||||
"address": f"{eip1271_contract_wallet.address}",
|
||||
"scheme": EvmAuth.AuthScheme.EIP1271.value,
|
||||
"typedData": typedData,
|
||||
}
|
||||
context = {
|
||||
USER_ADDRESS_CONTEXT: auth_message,
|
||||
}
|
||||
condition_result, call_result = rpc_condition.verify(
|
||||
providers=condition_providers, **context
|
||||
)
|
||||
assert condition_result is True
|
||||
assert call_result == eth_amount
|
||||
|
||||
# withdraw some ETH and check condition again
|
||||
withdraw_amount = Web3.to_wei(1, "ether")
|
||||
eip1271_contract_wallet.withdraw(withdraw_amount, sender=deployer_account)
|
||||
condition_result, call_result = rpc_condition.verify(
|
||||
providers=condition_providers, **context
|
||||
)
|
||||
assert condition_result is False
|
||||
assert call_result != eth_amount
|
||||
assert call_result == (eth_amount - withdraw_amount)
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
import "@openzeppelin/contracts/access/Ownable.sol";
|
||||
import "@openzeppelin/contracts/interfaces/IERC1271.sol";
|
||||
import "@openzeppelin/contracts/utils/cryptography/ECDSA.sol";
|
||||
|
||||
contract SmartContractWallet is IERC1271, Ownable {
|
||||
using ECDSA for bytes32;
|
||||
|
||||
uint public balance;
|
||||
|
||||
bytes4 internal constant MAGICVALUE = 0x1626ba7e;
|
||||
bytes4 constant internal INVALID_SIGNATURE = 0xffffffff;
|
||||
|
||||
constructor(address _owner) Ownable(_owner) public {}
|
||||
|
||||
function deposit() external payable {
|
||||
balance += msg.value;
|
||||
}
|
||||
|
||||
function withdraw(uint amount) external onlyOwner {
|
||||
require(amount <= balance, "Amount exceeds balance");
|
||||
balance -= amount;
|
||||
payable(owner()).transfer(amount);
|
||||
}
|
||||
|
||||
function isValidSignature(bytes32 _hash, bytes memory _signature) public view override returns (bytes4) {
|
||||
address signer = _hash.recover(_signature);
|
||||
if (signer == owner()) {
|
||||
return MAGICVALUE;
|
||||
} else {
|
||||
return INVALID_SIGNATURE;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,10 @@
|
|||
import maya
|
||||
import pytest
|
||||
from eth_account import Account
|
||||
from eth_account.messages import defunct_hash_message
|
||||
from hexbytes import HexBytes
|
||||
from siwe import SiweMessage
|
||||
from web3.contract import Contract
|
||||
|
||||
from nucypher.blockchain.eth.signers import InMemorySigner
|
||||
from nucypher.policy.conditions.auth.evm import (
|
||||
|
@ -9,6 +13,9 @@ from nucypher.policy.conditions.auth.evm import (
|
|||
EIP4361Auth,
|
||||
EvmAuth,
|
||||
)
|
||||
from nucypher.policy.conditions.exceptions import NoConnectionToChain
|
||||
from nucypher.policy.conditions.utils import ConditionProviderManager
|
||||
from tests.constants import TESTERCHAIN_CHAIN_ID
|
||||
|
||||
|
||||
def test_auth_scheme():
|
||||
|
@ -288,3 +295,110 @@ def test_authenticate_eip4361(get_random_checksum_address):
|
|||
not_stale_but_past_expiry_signature.hex(),
|
||||
valid_address_for_signature,
|
||||
)
|
||||
|
||||
|
||||
def test_authenticate_eip1271(mocker, get_random_checksum_address):
|
||||
# smart contract wallet
|
||||
eip1271_mock_contract = mocker.Mock(spec=Contract)
|
||||
contract_address = get_random_checksum_address()
|
||||
eip1271_mock_contract.address = contract_address
|
||||
|
||||
# signer for wallet
|
||||
data = f"I'm the owner of the smart contract wallet address {eip1271_mock_contract.address}"
|
||||
wallet_signer = InMemorySigner()
|
||||
valid_message_signature = wallet_signer.sign_message(
|
||||
account=wallet_signer.accounts[0], message=data.encode()
|
||||
)
|
||||
data_hash = defunct_hash_message(text=data)
|
||||
typedData = {"chain": TESTERCHAIN_CHAIN_ID, "dataHash": data_hash.hex()}
|
||||
|
||||
def _isValidSignature(data_hash, signature_bytes):
|
||||
class ContractCall:
|
||||
def __init__(self, hash, signature):
|
||||
self.hash = hash
|
||||
self.signature = signature
|
||||
|
||||
def call(self):
|
||||
recovered_address = Account._recover_hash(
|
||||
message_hash=self.hash, signature=self.signature
|
||||
)
|
||||
if recovered_address == wallet_signer.accounts[0]:
|
||||
return bytes(HexBytes("0x1626ba7e"))
|
||||
|
||||
return bytes(HexBytes("0xffffffff"))
|
||||
|
||||
return ContractCall(data_hash, signature_bytes)
|
||||
|
||||
eip1271_mock_contract.functions.isValidSignature.side_effect = _isValidSignature
|
||||
|
||||
# condition provider manager
|
||||
providers = mocker.Mock(spec=ConditionProviderManager)
|
||||
w3 = mocker.Mock()
|
||||
w3.eth.contract.return_value = eip1271_mock_contract
|
||||
providers.web3_endpoints.return_value = [w3]
|
||||
|
||||
# valid signature
|
||||
EIP1271Auth.authenticate(
|
||||
typedData, valid_message_signature, eip1271_mock_contract.address, providers
|
||||
)
|
||||
|
||||
# invalid typed data - no chain id
|
||||
with pytest.raises(EvmAuth.InvalidData):
|
||||
EIP1271Auth.authenticate(
|
||||
{
|
||||
"dataHash": data_hash.hex(),
|
||||
},
|
||||
valid_message_signature,
|
||||
eip1271_mock_contract.address,
|
||||
providers,
|
||||
)
|
||||
|
||||
# invalid typed data - no data hash
|
||||
with pytest.raises(EvmAuth.InvalidData):
|
||||
EIP1271Auth.authenticate(
|
||||
{
|
||||
"chainId": TESTERCHAIN_CHAIN_ID,
|
||||
},
|
||||
valid_message_signature,
|
||||
eip1271_mock_contract.address,
|
||||
providers,
|
||||
)
|
||||
|
||||
# use invalid signer
|
||||
invalid_signer = InMemorySigner()
|
||||
invalid_message_signature = invalid_signer.sign_message(
|
||||
account=invalid_signer.accounts[0], message=data.encode()
|
||||
)
|
||||
with pytest.raises(EvmAuth.AuthenticationFailed):
|
||||
EIP1271Auth.authenticate(
|
||||
typedData,
|
||||
invalid_message_signature,
|
||||
eip1271_mock_contract.address,
|
||||
providers,
|
||||
)
|
||||
|
||||
# bad w3 instance failed for some reason
|
||||
w3_bad = mocker.Mock()
|
||||
w3_bad.eth.contract.side_effect = ValueError("something went wrong")
|
||||
providers.web3_endpoints.return_value = [w3_bad]
|
||||
with pytest.raises(EvmAuth.AuthenticationFailed, match="something went wrong"):
|
||||
EIP1271Auth.authenticate(
|
||||
typedData, valid_message_signature, eip1271_mock_contract.address, providers
|
||||
)
|
||||
assert w3_bad.eth.contract.call_count == 1, "one call that failed"
|
||||
|
||||
# fall back to good w3 instances
|
||||
providers.web3_endpoints.return_value = [w3_bad, w3_bad, w3]
|
||||
EIP1271Auth.authenticate(
|
||||
typedData, valid_message_signature, eip1271_mock_contract.address, providers
|
||||
)
|
||||
assert w3_bad.eth.contract.call_count == 3, "two more calls that failed"
|
||||
|
||||
# no connection to chain
|
||||
providers.web3_endpoints.side_effect = NoConnectionToChain(
|
||||
chain=TESTERCHAIN_CHAIN_ID
|
||||
)
|
||||
with pytest.raises(EvmAuth.AuthenticationFailed, match="No connection to chain ID"):
|
||||
EIP1271Auth.authenticate(
|
||||
typedData, valid_message_signature, eip1271_mock_contract.address, providers
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue