mirror of https://github.com/nucypher/nucypher.git
Introduces InMemorySigner
parent
c948e3bc12
commit
c2baae28d8
|
@ -1,8 +1,7 @@
|
|||
|
||||
|
||||
from nucypher.blockchain.eth.signers.base import Signer
|
||||
from nucypher.blockchain.eth.signers.software import KeystoreSigner
|
||||
from nucypher.blockchain.eth.signers.software import InMemorySigner, KeystoreSigner
|
||||
|
||||
Signer._SIGNERS = {
|
||||
KeystoreSigner.uri_scheme(): KeystoreSigner,
|
||||
InMemorySigner.uri_scheme(): InMemorySigner,
|
||||
}
|
||||
|
|
|
@ -264,3 +264,71 @@ class KeystoreSigner(Signer):
|
|||
signer = self.__get_signer(account=account)
|
||||
signature = signer.sign_message(signable_message=encode_defunct(primitive=message)).signature
|
||||
return HexBytes(signature)
|
||||
|
||||
|
||||
class InMemorySigner(Signer):
|
||||
"""Local signer implementation for in-memory-only keys"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
account = Account.create()
|
||||
self.__keys = {account.address: account.key.hex()}
|
||||
self.__signers = {account.address: account}
|
||||
|
||||
@classmethod
|
||||
def from_signer_uri(cls, uri: str, testnet: bool = False) -> "Signer":
|
||||
"""Return an in-memory signer from URI string i.e. memory://"""
|
||||
decoded_uri = urlparse(uri)
|
||||
if decoded_uri.scheme != cls.uri_scheme() or decoded_uri.netloc:
|
||||
raise cls.InvalidSignerURI(uri)
|
||||
return cls()
|
||||
|
||||
@classmethod
|
||||
def uri_scheme(cls) -> str:
|
||||
return "memory"
|
||||
|
||||
def lock_account(self, account: str) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def accounts(self) -> List[str]:
|
||||
"""Return a list of known in-memory accounts read from"""
|
||||
return list(self.__keys.keys())
|
||||
|
||||
@validate_checksum_address
|
||||
def is_device(self, account: str) -> bool:
|
||||
return False # In-memory accounts are never devices.
|
||||
|
||||
@validate_checksum_address
|
||||
def unlock_account(self, account: str, password: str, duration: int = None) -> bool:
|
||||
return True
|
||||
|
||||
@validate_checksum_address
|
||||
def __get_signer(self, account: str) -> LocalAccount:
|
||||
"""Lookup a known keystore account by its checksum address or raise an error"""
|
||||
try:
|
||||
return self.__signers[account]
|
||||
except KeyError:
|
||||
if account not in self.__keys:
|
||||
raise self.UnknownAccount(account=account)
|
||||
else:
|
||||
raise self.AccountLocked(account=account)
|
||||
|
||||
@validate_checksum_address
|
||||
def sign_transaction(self, transaction_dict: dict) -> HexBytes:
|
||||
sender = transaction_dict["from"]
|
||||
signer = self.__get_signer(account=sender)
|
||||
if not transaction_dict["to"]:
|
||||
transaction_dict = dissoc(transaction_dict, "to")
|
||||
raw_transaction = signer.sign_transaction(
|
||||
transaction_dict=transaction_dict
|
||||
).rawTransaction
|
||||
return raw_transaction
|
||||
|
||||
@validate_checksum_address
|
||||
def sign_message(self, account: str, message: bytes, **kwargs) -> HexBytes:
|
||||
signer = self.__get_signer(account=account)
|
||||
signature = signer.sign_message(
|
||||
signable_message=encode_defunct(primitive=message)
|
||||
).signature
|
||||
return HexBytes(signature)
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
import pytest
|
||||
from cytoolz import assoc
|
||||
from eth_account._utils.legacy_transactions import Transaction
|
||||
from eth_utils import to_checksum_address
|
||||
from hexbytes import HexBytes
|
||||
|
||||
from nucypher.blockchain.eth.constants import LENGTH_ECDSA_SIGNATURE_WITH_RECOVERY
|
||||
from nucypher.blockchain.eth.signers import InMemorySigner, Signer
|
||||
from tests.unit.test_web3_signers import TRANSACTION_DICT
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def signer():
|
||||
_signer = InMemorySigner()
|
||||
return _signer
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def account(signer):
|
||||
_account = signer.accounts[0]
|
||||
return _account
|
||||
|
||||
|
||||
def test_memory_signer_from_signer_uri():
|
||||
signer = Signer.from_signer_uri(uri="memory://")
|
||||
assert isinstance(signer, InMemorySigner)
|
||||
|
||||
|
||||
def test_memory_signer_uri_scheme(signer):
|
||||
assert signer.uri_scheme() == "memory"
|
||||
|
||||
|
||||
def test_memory_signer_accounts(signer):
|
||||
assert len(signer.accounts) == 1
|
||||
assert isinstance(signer.accounts[0], str)
|
||||
assert len(signer.accounts[0]) == 42
|
||||
|
||||
|
||||
def test_memory_signer_lock_account(signer, account):
|
||||
assert signer.is_device(account=account) is False
|
||||
assert signer.lock_account(account=account) is True
|
||||
assert signer.is_device(account=account) is False
|
||||
assert signer.unlock_account(account=account, password="password") is True
|
||||
|
||||
|
||||
def test_memory_signer_message(signer, account):
|
||||
message = b"An in-memory signer - because sometimes, having a short-term memory is actually a superpower!"
|
||||
signature = signer.sign_message(account=account, message=message)
|
||||
assert len(signature) == LENGTH_ECDSA_SIGNATURE_WITH_RECOVERY
|
||||
|
||||
|
||||
def test_memory_signer_transaction(signer, account):
|
||||
transaction_dict = assoc(TRANSACTION_DICT, "from", value=account)
|
||||
signed_transaction = signer.sign_transaction(transaction_dict=transaction_dict)
|
||||
assert isinstance(signed_transaction, HexBytes)
|
||||
transaction = Transaction.from_bytes(signed_transaction)
|
||||
assert to_checksum_address(transaction.to) == transaction_dict["to"]
|
Loading…
Reference in New Issue