SLIP44 support and EIP-155 compliance for trezor testnet derivation paths

pull/2220/head
Kieran Prasch 2020-09-22 21:19:24 -07:00
parent 2854f4ba72
commit f526b72790
13 changed files with 109 additions and 65 deletions

View File

@ -55,7 +55,7 @@ class Signer(ABC):
return NotImplemented
@classmethod
def from_signer_uri(cls, uri: str) -> 'Signer':
def from_signer_uri(cls, uri: str, testnet: bool = False) -> 'Signer':
parsed = urlparse(uri)
scheme = parsed.scheme if parsed.scheme else parsed.path
try:
@ -70,7 +70,7 @@ class Signer(ABC):
message = f'{uri} is not a valid signer URI. Available schemes: {", ".join(cls._SIGNERS)}'
raise cls.InvalidSignerURI(message)
return signer
signer = signer_class.from_signer_uri(uri=uri)
signer = signer_class.from_signer_uri(uri=uri, testnet=testnet)
return signer
@abstractmethod

View File

@ -15,8 +15,8 @@
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import rlp
import trezorlib
from eth_account._utils.transactions import assert_valid_fields, Transaction
from eth_utils.address import to_canonical_address
from eth_utils.applicators import apply_key_map
@ -24,7 +24,7 @@ from eth_utils.conversions import to_int
from functools import wraps
from hexbytes import HexBytes
from trezorlib import ethereum
from trezorlib.client import get_default_client
from trezorlib.client import get_default_client, TrezorClient
from trezorlib.tools import parse_path, Address, H_
from trezorlib.transport import TransportException
from typing import List, Tuple, Union
@ -57,15 +57,21 @@ def handle_trezor_call(device_func):
class TrezorSigner(Signer):
"""A trezor message and transaction signing client."""
# Key Derivation Paths
# BIP44 HD derivation paths
# https://wiki.trezor.io/Cryptocurrency_standards#bip44
# https://wiki.trezor.io/Cryptocurrency_standards#slip44
__BIP_44 = 44
__ETH_COIN_TYPE = 60
__ETH_COIN_TYPE = 60 # mainnet
__TESTNET_COIN_TYPE = 1 # all testnets
CHAIN_ID = 0 # 0 is mainnet
DEFAULT_ACCOUNT = 0
_COIN_TYPE = None # set in __init__
_CHAIN_ID = 0
_DEFAULT_ACCOUNT = 0
_DERIVATION_ROOT = f"{__BIP_44}'/{__ETH_COIN_TYPE}'/{_DEFAULT_ACCOUNT}'/{_CHAIN_ID}"
# Cache
DEFAULT_ACCOUNT_INDEX = 0
DERIVATION_ROOT = f"{__BIP_44}'/{__ETH_COIN_TYPE}'/{DEFAULT_ACCOUNT}'/{CHAIN_ID}"
ADDRESS_CACHE_SIZE = 10 # default number of accounts to derive and internally track
class DeviceError(Exception):
@ -74,14 +80,26 @@ class TrezorSigner(Signer):
class NoDeviceDetected(DeviceError):
"""Raised when an operation requires a device but none are available"""
def __init__(self):
def __init__(self, testnet: bool = False):
self.__client = self._open()
self._device_id = self.__client.get_device_id()
# SLIP44 testnet support for EIP-155 sigatures
# TODO: there is no way to change this back to mainnet
self.testnet = testnet
TrezorSigner._COIN_TYPE = self.__TESTNET_COIN_TYPE if self.testnet else self.__ETH_COIN_TYPE
self.__addresses = dict() # track derived addresses
self.__cache_addresses()
@handle_trezor_call
def _open(self) -> TrezorClient:
try:
self.__client = get_default_client()
client = get_default_client()
except TransportException:
raise self.NoDeviceDetected("Could not find a TREZOR device to connect to. Have you unlocked it?")
self._device_id = self.__client.get_device_id()
self.__addresses = dict() # track derived addresses
self.__load_addresses()
return client
@classmethod
def uri_scheme(cls) -> str:
@ -96,7 +114,7 @@ class TrezorSigner(Signer):
if index is not None and checksum_address:
raise ValueError("Expected index or checksum address; Got both.")
elif index is not None:
hd_path = parse_path(f"{self.DERIVATION_ROOT}/{index}")
hd_path = parse_path(f"{self._DERIVATION_ROOT}/{index}")
else:
try:
hd_path = self.__addresses[checksum_address]
@ -105,23 +123,23 @@ class TrezorSigner(Signer):
return hd_path
@handle_trezor_call
def __get_address(self, index: int = None, hd_path: Address = None, show_display: bool = True) -> str:
def __derive_account(self, index: int = None, hd_path: Address = None) -> str:
"""Resolves a trezorlib HD path into a checksum address and returns it."""
if not hd_path:
if index is None:
raise ValueError("No index or HD path supplied.") # TODO: better error handling here
hd_path = self.__get_address_path(index=index)
address = ethereum.get_address(client=self.__client, n=hd_path, show_display=show_display)
address = ethereum.get_address(client=self.__client, n=hd_path, show_display=False) # TODO: show display?
return address
def __load_addresses(self) -> None:
def __cache_addresses(self) -> None:
"""
Derive trezor addresses up to ADDRESS_CACHE_SIZE relative to
the calculated base path and internally cache them.
Derives trezor ethereum addresses up to ADDRESS_CACHE_SIZE relative to
the calculated base path and internally caches them for later use.
"""
for index in range(self.ADDRESS_CACHE_SIZE):
hd_path = self.__get_address_path(index=index)
address = self.__get_address(hd_path=hd_path, show_display=False)
address = self.__derive_account(hd_path=hd_path)
self.__addresses[address] = hd_path
@staticmethod
@ -138,7 +156,7 @@ class TrezorSigner(Signer):
@handle_trezor_call
def __sign_transaction(self, n: List[int], trezor_transaction: dict) -> Tuple[bytes, bytes, bytes]:
"""Internal wrapper for trezorlib transaction signing calls"""
v, r, s = trezorlib.ethereum.sign_tx(client=self.__client, n=n, **trezor_transaction)
v, r, s = ethereum.sign_tx(client=self.__client, n=n, **trezor_transaction)
return v, r, s
#
@ -146,11 +164,11 @@ class TrezorSigner(Signer):
#
@classmethod
def from_signer_uri(cls, uri: str) -> 'TrezorSigner':
def from_signer_uri(cls, uri: str, testnet: bool = False) -> 'TrezorSigner':
"""Return a trezor signer from URI string i.e. trezor:///my/trezor/path """
if uri != cls.uri_scheme(): # TODO: #2269 Support "rich URIs" for trezors
raise cls.InvalidSignerURI(f'{uri} is not a valid trezor URI scheme')
return cls()
return cls(testnet=testnet)
def is_device(self, account: str) -> bool:
"""Trezor is always a device."""
@ -180,7 +198,7 @@ class TrezorSigner(Signer):
"""
# TODO: #2262 Implement Trezor Message Signing
hd_path = self.__get_address_path(checksum_address=checksum_address)
signed_message = trezorlib.ethereum.sign_message(self.__client, hd_path, message)
signed_message = ethereum.sign_message(self.__client, hd_path, message)
return HexBytes(signed_message.signature)
def sign_transaction(self,
@ -211,27 +229,37 @@ class TrezorSigner(Signer):
if transaction_dict.get('data') is not None: # empty string is valid
transaction_dict['data'] = Web3.toBytes(HexBytes(transaction_dict['data']))
# Format transaction fields for Trezor, Lookup HD path, and Sign Transaction
# Leave the chain ID in tact for the trezor signing request:
# If `chain_id` is included, an EIP-155 transaction signature will be applied
# Eager enforcement of EIP-155
# https://github.com/ethereum/EIPs/blob/master/EIPS/eip-155.md
#
# Leave the chain ID in tact for the trezor signing request so that an EIP-155 transaction signature will be applied
# https://github.com/trezor/trezor-core/pull/311
if 'chainId' not in transaction_dict:
raise self.SignerError('Invalid EIP-155 transaction - "chain_id" field is missing in trezor signing request.')
# Format transaction fields for Trezor, Lookup HD path
trezor_transaction = self._format_transaction(transaction_dict=transaction_dict)
n = self.__get_address_path(checksum_address=checksum_address)
v, r, s = self.__sign_transaction(n=n, trezor_transaction=trezor_transaction)
# Format the transaction for eth_account Transaction consumption
# ChainID is longer needed since it is later derived with v = (v + 2) * (chain_id + 35)
# see https://github.com/ethereum/eips/issues/155
del transaction_dict['chainId']
transaction_dict['to'] = to_canonical_address(checksum_address) # str -> bytes
# Note that the derivation path on the trezor must correlate with the chain id
# in the transaction. Since Trezor firmware version 2.3.1 mismatched chain_id
# and derivation path will fail to sign with 'forbidden key path'.
# https://github.com/trezor/trezor-firmware/issues/1050#issuecomment-640718622
hd_path = self.__get_address_path(checksum_address=checksum_address) # from cache
# Fire Trezor device signing request
_v, _r, _s = self.__sign_transaction(n=hd_path, trezor_transaction=trezor_transaction)
# Post-signing sanity check for replay attack protection.
chain_id = transaction_dict.pop('chainId')
eip155_v = 1 + chain_id * 2 + 35
if _v != eip155_v:
raise self.SignerError(f'Invalid EIP-155 transaction signature - v({_v}) does not match calculation {eip155_v}')
# Create RLP serializable Transaction instance with eth_account
signed_transaction = Transaction(v=to_int(v), # int
r=to_int(r), # bytes -> int
s=to_int(s), # bytes -> int
transaction_dict['to'] = to_canonical_address(checksum_address) # str -> bytes
signed_transaction = Transaction(v=to_int(_v), # type: int
r=to_int(_r), # bytes -> int
s=to_int(_s), # bytes -> int
**transaction_dict)
# Optionally encode as RLP for broadcasting

View File

@ -49,7 +49,7 @@ class Web3Signer(Signer):
return NotImplemented # web3 signer uses a "passthrough" scheme
@classmethod
def from_signer_uri(cls, uri: str) -> 'Web3Signer':
def from_signer_uri(cls, uri: str, testnet: bool = False) -> 'Web3Signer':
from nucypher.blockchain.eth.interfaces import BlockchainInterface, BlockchainInterfaceFactory
try:
blockchain = BlockchainInterfaceFactory.get_or_create_interface(provider_uri=uri)
@ -117,10 +117,14 @@ class ClefSigner(Signer):
TIMEOUT = 60 # Default timeout for Clef of 60 seconds
def __init__(self, ipc_path: str = DEFAULT_IPC_PATH, timeout: int = TIMEOUT):
def __init__(self,
ipc_path: str = DEFAULT_IPC_PATH,
timeout: int = TIMEOUT,
testnet: bool = False):
super().__init__()
self.w3 = Web3(provider=IPCProvider(ipc_path=ipc_path, timeout=timeout)) # TODO: Unify with clients or build error handling
self.ipc_path = ipc_path
self.testnet = testnet
@classmethod
def uri_scheme(cls) -> str:
@ -142,13 +146,13 @@ class ClefSigner(Signer):
return uri_breakdown.scheme == cls.uri_scheme()
@classmethod
def from_signer_uri(cls, uri: str) -> 'ClefSigner':
def from_signer_uri(cls, uri: str, testnet: bool = False) -> 'ClefSigner':
uri_breakdown = urlparse(uri)
if not uri_breakdown.path and not uri_breakdown.netloc:
raise cls.InvalidSignerURI('Blank signer URI - No keystore path provided')
if uri_breakdown.scheme != cls.uri_scheme():
raise cls.InvalidSignerURI(f"{uri} is not a valid clef signer URI.")
signer = cls(ipc_path=uri_breakdown.path)
signer = cls(ipc_path=uri_breakdown.path, testnet=testnet)
return signer
def is_connected(self) -> bool:
@ -238,12 +242,13 @@ class KeystoreSigner(Signer):
Keystore must be in the geth wallet format.
"""
def __init__(self, path: str):
def __init__(self, path: str, testnet: bool = False):
super().__init__()
self.__path = path
self.__keys = dict()
self.__signers = dict()
self.__read_keystore(path=path)
self.testnet = testnet
def __del__(self):
# TODO: Might need a finally block or exception context handling
@ -328,12 +333,12 @@ class KeystoreSigner(Signer):
return self.__path
@classmethod
def from_signer_uri(cls, uri: str) -> 'Signer':
def from_signer_uri(cls, uri: str, testnet: bool = False) -> 'Signer':
"""Return a keystore signer from URI string i.e. keystore:///my/path/keystore """
decoded_uri = urlparse(uri)
if decoded_uri.scheme != cls.uri_scheme() or decoded_uri.netloc:
raise cls.InvalidSignerURI(uri)
return cls(path=decoded_uri.path)
return cls(path=decoded_uri.path, testnet=testnet)
@validate_checksum_address
def is_device(self, account: str) -> bool:

View File

@ -113,7 +113,8 @@ def select_client_account(emitter,
BlockchainInterfaceFactory.initialize_interface(provider_uri=provider_uri, poa=poa, emitter=emitter)
if signer_uri:
signer = Signer.from_signer_uri(signer_uri)
testnet = network != NetworksInventory.MAINNET
signer = Signer.from_signer_uri(signer_uri, testnet=testnet)
wallet = Wallet(provider_uri=provider_uri, signer=signer)

View File

@ -21,6 +21,7 @@ import json
import click
import os
from nucypher.blockchain.eth.networks import NetworksInventory
from nucypher.blockchain.eth.actors import EmergencyResponseManager, BaseActor
from nucypher.blockchain.eth.signers.base import Signer
from nucypher.blockchain.eth.signers.software import ClefSigner
@ -85,7 +86,9 @@ class DaoOptions: # TODO: This class is essentially the same that WorkLock opti
is_clef = ClefSigner.is_valid_clef_uri(self.signer_uri) # TODO: why not allow the clef signer's validator act on this?
if transacting and not is_clef and not hw_wallet:
client_password = get_client_password(checksum_address=self.participant_address)
signer = Signer.from_signer_uri(self.signer_uri) if self.signer_uri else None
testnet = self.domain != NetworksInventory.MAINNET
signer = Signer.from_signer_uri(self.signer_uri, testnet=testnet) if self.signer_uri else None
actor = EmergencyResponseManager(checksum_address=self.participant_address, # bomberos
network=self.network,
registry=registry,

View File

@ -203,8 +203,10 @@ class ActorOptions:
eth_password_is_needed = not self.hw_wallet and not deployer_interface.client.is_local and not is_clef
if eth_password_is_needed:
password = get_client_password(checksum_address=deployer_address)
# Produce Actor
signer = Signer.from_signer_uri(self.signer_uri) if self.signer_uri else None
testnet = deployer_interface.client.chain_name != NetworksInventory.MAINNET
signer = Signer.from_signer_uri(self.signer_uri, testnet=testnet) if self.signer_uri else None
ADMINISTRATOR = ContractAdministrator(registry=local_registry,
client_password=password,
deployer_address=deployer_address,

View File

@ -120,7 +120,8 @@ class WorkLockOptions:
is_clef = ClefSigner.is_valid_clef_uri(self.signer_uri)
if transacting and not is_clef and not hw_wallet:
client_password = get_client_password(checksum_address=self.bidder_address)
signer = Signer.from_signer_uri(self.signer_uri) if self.signer_uri else None
testnet = self.network != NetworksInventory.MAINNET
signer = Signer.from_signer_uri(self.signer_uri, testnet=testnet) if self.signer_uri else None
bidder = Bidder(checksum_address=self.bidder_address,
registry=registry,
client_password=client_password,

View File

@ -25,6 +25,7 @@ from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
from cryptography.x509 import Certificate
from tempfile import TemporaryDirectory
from nucypher.blockchain.eth.networks import NetworksInventory
from nucypher.blockchain.eth.actors import StakeHolder
from nucypher.blockchain.eth.signers import Signer
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
@ -273,7 +274,9 @@ class StakeHolderConfiguration(CharacterConfiguration):
@property
def dynamic_payload(self) -> dict:
payload = dict(registry=self.registry, signer=Signer.from_signer_uri(self.signer_uri))
testnet = NetworksInventory.MAINNET not in self.domains # TODO: use equality instead of membership after blue oysters
signer = Signer.from_signer_uri(self.signer_uri, testnet=testnet)
payload = dict(registry=self.registry, signer=signer)
return payload
def __setup_node_storage(self, node_storage=None) -> None:

View File

@ -436,8 +436,9 @@ class CharacterConfiguration(BaseConfiguration):
"""Exported dynamic configuration values for initializing Ursula"""
payload = dict()
if not self.federated_only:
payload.update(dict(registry=self.registry,
signer=Signer.from_signer_uri(self.signer_uri)))
testnet = NetworksInventory.MAINNET not in self.domains # TODO: use equality, not membership after blue oyster mushrooms
signer = Signer.from_signer_uri(self.signer_uri, testnet=testnet)
payload.update(dict(registry=self.registry, signer=signer))
payload.update(dict(network_middleware=self.network_middleware or self.DEFAULT_NETWORK_MIDDLEWARE(),
known_nodes=self.known_nodes,

View File

@ -104,7 +104,7 @@ def test_ursula_and_local_keystore_signer_integration(click_runner,
# Good signer...
mock_signer_uri = f'keystore:{mock_keystore_path}'
pre_config_signer = KeystoreSigner.from_signer_uri(uri=mock_signer_uri)
pre_config_signer = KeystoreSigner.from_signer_uri(uri=mock_signer_uri, testnet=True)
assert worker_account.address in pre_config_signer.accounts
deploy_port = select_test_port()

View File

@ -88,25 +88,25 @@ def unknown_address():
def test_invalid_keystore(tmp_path):
with pytest.raises(Signer.InvalidSignerURI) as e:
Signer.from_signer_uri(uri=f'keystore:{tmp_path/"nonexistent"}')
Signer.from_signer_uri(uri=f'keystore:{tmp_path/"nonexistent"}', testnet=True)
empty_path = tmp_path / 'empty_file'
open(empty_path, 'x+t').close()
with pytest.raises(KeystoreSigner.InvalidKeyfile, match=
'Invalid JSON in keyfile at') as e:
Signer.from_signer_uri(uri=f'keystore:{empty_path}')
Signer.from_signer_uri(uri=f'keystore:{empty_path}', testnet=True)
empty_json = tmp_path / 'empty_json'
json.dump({}, open(empty_json, 'x+t'))
with pytest.raises(KeystoreSigner.InvalidKeyfile, match=
'Keyfile does not contain address field at') as e:
Signer.from_signer_uri(uri=f'keystore:{empty_json}')
Signer.from_signer_uri(uri=f'keystore:{empty_json}', testnet=True)
bad_address = tmp_path / 'bad_address'
json.dump({'address':''}, open(bad_address, 'x+t'))
with pytest.raises(KeystoreSigner.InvalidKeyfile, match=
'does not contain a valid ethereum address') as e:
Signer.from_signer_uri(uri=f'keystore:{bad_address}')
Signer.from_signer_uri(uri=f'keystore:{bad_address}', testnet=True)
def test_signer_reads_keystore_from_disk(mock_account, mock_key, tmpdir):
@ -127,7 +127,7 @@ def test_signer_reads_keystore_from_disk(mock_account, mock_key, tmpdir):
fake_keyfile.write(json.dumps(MOCK_KEYFILE))
mock_keystore_uri = f'keystore://{tmp_keystore}'
signer = Signer.from_signer_uri(uri=mock_keystore_uri)
signer = Signer.from_signer_uri(uri=mock_keystore_uri, testnet=True)
assert signer.path == str(tmp_keystore)
assert len(signer.accounts) == 1
@ -143,7 +143,7 @@ def test_create_signer_from_keystore_directory(mock_account, mock_keystore):
mock_keystore_uri = f'keystore:{mock_keystore_path}'
# Return a "real" account address from the keyfile
signer = Signer.from_signer_uri(uri=mock_keystore_uri) # type: KeystoreSigner
signer = Signer.from_signer_uri(uri=mock_keystore_uri, testnet=True) # type: KeystoreSigner
assert signer.path == str(mock_keystore_path)
assert len(signer.accounts) == 1
assert mock_account.address in signer.accounts
@ -154,7 +154,7 @@ def test_create_signer_from_keystore_file(mock_account, mock_keystore):
mock_keystore_uri = f'keystore:{mock_keystore_path}'
# Return a "real" account address from the keyfile
signer = Signer.from_signer_uri(uri=mock_keystore_uri) # type: KeystoreSigner
signer = Signer.from_signer_uri(uri=mock_keystore_uri, testnet=True) # type: KeystoreSigner
assert signer.path == str(mock_keystore_path)
assert len(signer.accounts) == 1
assert mock_account.address in signer.accounts

View File

@ -57,7 +57,7 @@ def test_ursula_init_with_local_keystore_signer(click_runner,
mock_signer_uri = f'keystore://{mock_keystore_path}'
# Good signer...
pre_config_signer = KeystoreSigner.from_signer_uri(uri=mock_signer_uri)
pre_config_signer = KeystoreSigner.from_signer_uri(uri=mock_signer_uri, testnet=True)
deploy_port = select_test_port()

View File

@ -48,7 +48,7 @@ def mock_account():
def test_blank_keystore_uri():
with pytest.raises(Signer.InvalidSignerURI, match='Blank signer URI - No keystore path provided') as error:
Signer.from_signer_uri(uri='keystore://') # it's blank!
Signer.from_signer_uri(uri='keystore://', testnet=True) # it's blank!
def test_trezor_transaction_format():
@ -82,14 +82,14 @@ def mock_trezor(mocker, mock_account):
return mock_account.address
mocker.patch.object(signers.hardware, 'get_default_client', return_value=FakeTrezorClient())
mocker.patch.object(TrezorSigner, '_TrezorSigner__get_address', return_value=mock_account.address)
mocker.patch.object(TrezorSigner, '_TrezorSigner__derive_account', return_value=mock_account.address)
mocker.patch.object(TrezorSigner, '_TrezorSigner__sign_transaction', return_value=FakeTrezorClient.faked_vrs)
def test_trezor_signer_uri(mock_trezor):
signer = Signer.from_signer_uri(uri='trezor')
signer = Signer.from_signer_uri(uri='trezor', testnet=False) # TODO: test derivation for testnet SLIP44
assert isinstance(signer, TrezorSigner)
assert signer.DERIVATION_ROOT == "44'/60'/0'/0"
assert signer._DERIVATION_ROOT == "44'/60'/0'/0"
assert len(signer.accounts) == 1
# TODO: #2269 Support "rich URIs" for trezors