Merge pull request #327 from tuxxy/lindas-in-custody

Create a BlockchainPower with signing and verifying methods
pull/344/head
K Prasch 2018-06-26 01:05:55 -07:00 committed by GitHub
commit 8a3b22e8ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 141 additions and 5 deletions

View File

@ -1,3 +1,4 @@
import binascii
import json
import os
import warnings
@ -5,6 +6,9 @@ from pathlib import Path
from typing import Tuple, List
from constant_sorrow import constants
from eth_utils import to_canonical_address
from eth_keys.datatypes import PublicKey, Signature
from web3.providers.eth_tester.main import EthereumTesterProvider
from web3 import Web3, WebsocketProvider, HTTPProvider, IPCProvider
from web3.contract import Contract
@ -244,13 +248,13 @@ class ControlCircumflex:
#
# If custom __providers are not injected...
self.__providers = list() if providers is None else providers
self._providers = list() if providers is None else providers
if providers is None:
# Mutates self.__providers
# Mutates self._providers
self.add_provider(endpoint_uri=endpoint_uri, websocket=websocket,
ipc_path=ipc_path, timeout=timeout)
web3_instance = Web3(providers=self.__providers) # Instantiate Web3 object with provider
web3_instance = Web3(providers=self._providers) # Instantiate Web3 object with provider
self.w3 = web3_instance # capture web3
# if a SolidityCompiler class instance was passed, compile from solidity source code
@ -308,7 +312,7 @@ class ControlCircumflex:
else:
raise self.InterfaceError("Invalid interface parameters. Pass endpoint_uri or ipc_path")
self.__providers.append(provider)
self._providers.append(provider)
def get_contract_factory(self, contract_name) -> Contract:
"""Retrieve compiled interface data from the cache and return web3 contract"""
@ -390,3 +394,28 @@ class DeployerCircumflex(ControlCircumflex):
contract_abi=contract_factory.abi)
return contract, txhash
def call_backend_sign(self, account: str, message: bytes) -> str:
"""
Calls the appropriate signing function for the specified account on the
backend. If the backend is based on eth-tester, then it uses the
eth-tester signing interface to do so.
"""
provider = self._providers[0]
if isinstance(provider, EthereumTesterProvider):
address = to_canonical_address(account)
sig_key = provider.ethereum_tester.backend._key_lookup[address]
signed_message = sig_key.sign_msg(message)
return signed_message.to_hex()
else:
return self.w3.eth.sign(account, data=message) # Technically deprecated...
def call_backend_verify(self, pubkey: PublicKey, signature: Signature, msg_hash: bytes):
"""
Verifies a hex string signature and message hash are from the provided
public key.
"""
is_valid_sig = signature.verify_msg_hash(msg_hash, pubkey)
sig_pubkey = signature.recover_public_key_from_msg_hash(msg_hash)
return is_valid_sig and (sig_pubkey == pubkey)

View File

@ -464,6 +464,7 @@ class Alice(Character, PolicyAuthor):
_default_crypto_powerups = [SigningPower, EncryptingPower, DelegatingPower]
def __init__(self, is_me=True, federated_only=False, *args, **kwargs):
Character.__init__(self, is_me=is_me, federated_only=federated_only, *args, **kwargs)
if is_me and not federated_only: # TODO: 289
PolicyAuthor.__init__(self, *args, **kwargs)

View File

@ -1,6 +1,10 @@
import inspect
import web3
from binascii import unhexlify
from eth_keys.datatypes import PublicKey, Signature
from typing import List, Union
from eth_utils import keccak
from nucypher.keystore import keypairs
from nucypher.keystore.keypairs import SigningKeypair, EncryptingKeypair
from umbral.keys import UmbralPublicKey, UmbralPrivateKey, UmbralKeyingMaterial
@ -61,6 +65,65 @@ class CryptoPowerUp(object):
confers_public_key = False
class BlockchainPower(CryptoPowerUp):
"""
Allows for transacting on a Blockchain via web3 backend.
"""
def __init__(self, blockchain: 'Blockchain', account: str):
"""
Instantiates a BlockchainPower for the given account id.
"""
self.blockchain = blockchain
self.account = account
self.is_unlocked = False
def unlock_account(self, password: str, duration: int = None):
"""
Unlocks the account for the specified duration. If no duration is
provided, it will remain unlocked indefinitely.
"""
self.is_unlocked = self.blockchain.interface.w3.personal.unlockAccount(
self.account, password, duration=duration)
if not self.is_unlocked:
raise PowerUpError("Account failed to unlock for {}".format(self.account))
def sign_message(self, message: bytes):
"""
Signs the message with the private key of the BlockchainPower.
"""
if not self.is_unlocked:
raise PowerUpError("Account is not unlocked.")
signature = self.blockchain.interface.call_backend_sign(self.account, message)
return signature
def verify_message(self, address: str, pubkey: bytes, message: bytes, signature: str):
"""
Verifies that the message was signed by the keypair.
"""
# Check that address and pubkey match
eth_pubkey = PublicKey(pubkey)
if not eth_pubkey.to_checksum_address() == address:
raise ValueError("Pubkey address ({}) doesn't match the provided address ({})".format(eth_pubkey.to_checksum_address, address))
hashed_message = keccak(message)
eth_signature = Signature(signature_bytes=unhexlify(signature[2:]))
if not self.blockchain.interface.call_backend_verify(
eth_pubkey, eth_signature, hashed_message):
raise PowerUpError("Signature is not valid for this message or pubkey.")
else:
return True
def __del__(self):
"""
Deletes the blockchain power and locks the account.
"""
self.blockchain.interface.w3.personal.lockAccount(self.account)
class KeyPairBasedPower(CryptoPowerUp):
confers_public_key = True
_keypair_class = keypairs.Keypair

View File

@ -1,9 +1,11 @@
import eth_utils
import pytest
from constant_sorrow import constants
from nucypher.characters import Alice, Ursula, Character, Bob
from nucypher.crypto import api
from nucypher.crypto.powers import CryptoPower, SigningPower, NoSigningPower
from nucypher.crypto.powers import CryptoPower, SigningPower, NoSigningPower,\
BlockchainPower, PowerUpError
"""
Chapter 1: SIGNING
@ -72,6 +74,47 @@ def test_anybody_can_verify(mock_policy_agent):
assert cleartext is constants.NO_DECRYPTION_PERFORMED
def test_character_blockchain_power(testerchain):
eth_address = testerchain.interface.w3.eth.accounts[0]
sig_privkey = testerchain.interface._providers[0].ethereum_tester.backend.\
_key_lookup[eth_utils.to_canonical_address(eth_address)]
sig_pubkey = sig_privkey.public_key
signer = Character(is_me=True)
signer._crypto_power.consume_power_up(BlockchainPower(testerchain, eth_address))
# Due to testing backend, the account is already unlocked.
power = signer._crypto_power.power_ups(BlockchainPower)
power.is_unlocked = True
#power.unlock_account('this-is-not-a-secure-password')
data_to_sign = b'What does Ursula look like?!?'
sig = power.sign_message(data_to_sign)
is_verified = power.verify_message(eth_address, sig_pubkey.to_bytes(), data_to_sign, sig)
assert is_verified == True
# Test a bad message:
with pytest.raises(PowerUpError):
power.verify_message( eth_address, sig_pubkey.to_bytes(), data_to_sign + b'bad', sig)
# Test a bad address/pubkey pair
with pytest.raises(ValueError):
power.verify_message(
testerchain.interface.w3.eth.accounts[1],
sig_pubkey.to_bytes(),
data_to_sign,
sig)
# Test a signature without unlocking the account
power.is_unlocked = False
with pytest.raises(PowerUpError):
power.sign_message(b'test')
# Test lockAccount call
del(power)
"""
Chapter 2: ENCRYPTION
"""