mirror of https://github.com/nucypher/nucypher.git
Reorganize tests into multiple suites.
parent
a69a6de4c2
commit
28cce2fccc
|
@ -0,0 +1,68 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from nucypher.blockchain.eth.clients import (EthereumClient, GethClient)
|
||||
from tests.unit.test_mocked_clients import GethClientTestBlockchain, SyncedMockWeb3, SyncingMockWeb3, \
|
||||
SyncingMockWeb3NoPeers
|
||||
|
||||
|
||||
def test_synced_geth_client():
|
||||
|
||||
class SyncedBlockchainInterface(GethClientTestBlockchain):
|
||||
|
||||
Web3 = SyncedMockWeb3
|
||||
|
||||
interface = SyncedBlockchainInterface(provider_uri='file:///ipc.geth')
|
||||
interface.connect()
|
||||
|
||||
assert interface.client._has_latest_block()
|
||||
assert interface.client.sync()
|
||||
|
||||
|
||||
def test_unsynced_geth_client():
|
||||
|
||||
GethClient.SYNC_SLEEP_DURATION = .1
|
||||
|
||||
class NonSyncedBlockchainInterface(GethClientTestBlockchain):
|
||||
|
||||
Web3 = SyncingMockWeb3
|
||||
|
||||
interface = NonSyncedBlockchainInterface(provider_uri='file:///ipc.geth')
|
||||
interface.connect()
|
||||
|
||||
assert interface.client._has_latest_block() is False
|
||||
assert interface.client.syncing
|
||||
|
||||
assert len(list(interface.client.sync())) == 8
|
||||
|
||||
|
||||
def test_no_peers_unsynced_geth_client():
|
||||
|
||||
GethClient.PEERING_TIMEOUT = .1
|
||||
|
||||
class NonSyncedNoPeersBlockchainInterface(GethClientTestBlockchain):
|
||||
|
||||
Web3 = SyncingMockWeb3NoPeers
|
||||
|
||||
interface = NonSyncedNoPeersBlockchainInterface(provider_uri='file:///ipc.geth')
|
||||
interface.connect()
|
||||
|
||||
assert interface.client._has_latest_block() is False
|
||||
with pytest.raises(EthereumClient.SyncTimeout):
|
||||
list(interface.client.sync())
|
|
@ -20,71 +20,10 @@ import json
|
|||
import pytest
|
||||
|
||||
from nucypher.blockchain.eth.constants import PREALLOCATION_ESCROW_CONTRACT_NAME
|
||||
from nucypher.blockchain.eth.interfaces import BaseContractRegistry
|
||||
from nucypher.blockchain.eth.registry import IndividualAllocationRegistry, LocalContractRegistry
|
||||
from nucypher.blockchain.eth.registry import IndividualAllocationRegistry
|
||||
from nucypher.config.constants import TEMPORARY_DOMAIN
|
||||
|
||||
|
||||
def test_contract_registry(tempfile_path):
|
||||
|
||||
# ABC
|
||||
with pytest.raises(TypeError):
|
||||
BaseContractRegistry(filepath='test')
|
||||
|
||||
with pytest.raises(BaseContractRegistry.RegistryError):
|
||||
bad_registry = LocalContractRegistry(filepath='/fake/file/path/registry.json')
|
||||
bad_registry.search(contract_address='0xdeadbeef')
|
||||
|
||||
# Tests everything is as it should be when initially created
|
||||
test_registry = LocalContractRegistry(filepath=tempfile_path)
|
||||
|
||||
assert test_registry.read() == list()
|
||||
|
||||
# Test contract enrollment and dump_chain
|
||||
test_name = 'TestContract'
|
||||
test_addr = '0xDEADBEEF'
|
||||
test_abi = ['fake', 'data']
|
||||
test_version = "some_version"
|
||||
|
||||
test_registry.enroll(contract_name=test_name,
|
||||
contract_address=test_addr,
|
||||
contract_abi=test_abi,
|
||||
contract_version=test_version)
|
||||
|
||||
# Search by name...
|
||||
contract_records = test_registry.search(contract_name=test_name)
|
||||
assert len(contract_records) == 1, 'More than one record for {}'.format(test_name)
|
||||
assert len(contract_records[0]) == 4, 'Registry record is the wrong length'
|
||||
name, version, address, abi = contract_records[0]
|
||||
|
||||
assert name == test_name
|
||||
assert address == test_addr
|
||||
assert abi == test_abi
|
||||
assert version == test_version
|
||||
|
||||
# ...or by address
|
||||
contract_record = test_registry.search(contract_address=test_addr)
|
||||
name, version, address, abi = contract_record
|
||||
|
||||
assert name == test_name
|
||||
assert address == test_addr
|
||||
assert abi == test_abi
|
||||
assert version == test_version
|
||||
|
||||
# Check that searching for an unknown contract raises
|
||||
with pytest.raises(BaseContractRegistry.UnknownContract):
|
||||
test_registry.search(contract_name='this does not exist')
|
||||
|
||||
current_dataset = test_registry.read()
|
||||
# Corrupt the registry with a duplicate address
|
||||
current_dataset.append([test_name, test_addr, test_abi])
|
||||
test_registry.write(current_dataset)
|
||||
|
||||
# Check that searching for an unknown contract raises
|
||||
with pytest.raises(BaseContractRegistry.InvalidRegistry):
|
||||
test_registry.search(contract_address=test_addr)
|
||||
|
||||
|
||||
def test_individual_allocation_registry(get_random_checksum_address,
|
||||
test_registry,
|
||||
tempfile_path,
|
|
@ -15,110 +15,12 @@
|
|||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from decimal import Decimal, InvalidOperation
|
||||
from web3 import Web3
|
||||
|
||||
from nucypher.blockchain.eth.token import NU, Stake
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
|
||||
|
||||
def test_NU(token_economics):
|
||||
|
||||
# Starting Small
|
||||
min_allowed_locked = NU(token_economics.minimum_allowed_locked, 'NuNit')
|
||||
assert token_economics.minimum_allowed_locked == int(min_allowed_locked.to_nunits())
|
||||
|
||||
min_NU_locked = int(str(token_economics.minimum_allowed_locked)[0:-18])
|
||||
expected = NU(min_NU_locked, 'NU')
|
||||
assert min_allowed_locked == expected
|
||||
|
||||
# Starting Big
|
||||
min_allowed_locked = NU(min_NU_locked, 'NU')
|
||||
assert token_economics.minimum_allowed_locked == int(min_allowed_locked)
|
||||
assert token_economics.minimum_allowed_locked == int(min_allowed_locked.to_nunits())
|
||||
assert str(min_allowed_locked) == '15000 NU'
|
||||
|
||||
# Alternate construction
|
||||
assert NU(1, 'NU') == NU('1.0', 'NU') == NU(1.0, 'NU')
|
||||
|
||||
# Arithmetic
|
||||
|
||||
# NUs
|
||||
one_nu = NU(1, 'NU')
|
||||
zero_nu = NU(0, 'NU')
|
||||
one_hundred_nu = NU(100, 'NU')
|
||||
two_hundred_nu = NU(200, 'NU')
|
||||
three_hundred_nu = NU(300, 'NU')
|
||||
|
||||
# Nits
|
||||
one_nu_wei = NU(1, 'NuNit')
|
||||
three_nu_wei = NU(3, 'NuNit')
|
||||
assert three_nu_wei.to_tokens() == Decimal('3E-18')
|
||||
assert one_nu_wei.to_tokens() == Decimal('1E-18')
|
||||
|
||||
# Base Operations
|
||||
assert one_hundred_nu < two_hundred_nu < three_hundred_nu
|
||||
assert one_hundred_nu <= two_hundred_nu <= three_hundred_nu
|
||||
|
||||
assert three_hundred_nu > two_hundred_nu > one_hundred_nu
|
||||
assert three_hundred_nu >= two_hundred_nu >= one_hundred_nu
|
||||
|
||||
assert (one_hundred_nu + two_hundred_nu) == three_hundred_nu
|
||||
assert (three_hundred_nu - two_hundred_nu) == one_hundred_nu
|
||||
|
||||
difference = one_nu - one_nu_wei
|
||||
assert not difference == zero_nu
|
||||
|
||||
actual = float(difference.to_tokens())
|
||||
expected = 0.999999999999999999
|
||||
assert actual == expected
|
||||
|
||||
# 3.14 NU is 3_140_000_000_000_000_000 NuNit
|
||||
pi_nuweis = NU(3.14, 'NU')
|
||||
assert NU('3.14', 'NU') == pi_nuweis.to_nunits() == NU(3_140_000_000_000_000_000, 'NuNit')
|
||||
|
||||
# Mixed type operations
|
||||
difference = NU('3.14159265', 'NU') - NU(1.1, 'NU')
|
||||
assert difference == NU('2.04159265', 'NU')
|
||||
|
||||
result = difference + one_nu_wei
|
||||
assert result == NU(2041592650000000001, 'NuNit')
|
||||
|
||||
# Similar to stake read + metadata operations in Staker
|
||||
collection = [one_hundred_nu, two_hundred_nu, three_hundred_nu]
|
||||
assert sum(collection) == NU('600', 'NU') == NU(600, 'NU') == NU(600.0, 'NU') == NU(600e+18, 'NuNit')
|
||||
|
||||
#
|
||||
# Fractional Inputs
|
||||
#
|
||||
|
||||
# A decimal amount of NuNit (i.e., a fraction of a NuNit)
|
||||
pi_nuweis = NU('3.14', 'NuNit')
|
||||
assert pi_nuweis == three_nu_wei # Floor
|
||||
|
||||
# A decimal amount of NU, which amounts to NuNit with decimals
|
||||
pi_nus = NU('3.14159265358979323846', 'NU')
|
||||
assert pi_nus == NU(3141592653589793238, 'NuNit') # Floor
|
||||
|
||||
# Positive Infinity
|
||||
with pytest.raises(NU.InvalidAmount):
|
||||
_inf = NU(float('infinity'), 'NU')
|
||||
|
||||
# Negative Infinity
|
||||
with pytest.raises(NU.InvalidAmount):
|
||||
_neg_inf = NU(float('-infinity'), 'NU')
|
||||
|
||||
# Not a Number
|
||||
with pytest.raises(InvalidOperation):
|
||||
_nan = NU(float('NaN'), 'NU')
|
||||
|
||||
# Rounding NUs
|
||||
assert round(pi_nus, 2) == NU("3.14", "NU")
|
||||
assert round(pi_nus, 1) == NU("3.1", "NU")
|
||||
assert round(pi_nus, 0) == round(pi_nus) == NU("3", "NU")
|
||||
|
||||
|
||||
def test_stake(testerchain, token_economics, agency):
|
||||
token_agent, staking_agent, _policy_agent = agency
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from nucypher.blockchain.economics import EconomicsFactory
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('agency')
|
||||
def test_retrieving_from_blockchain(token_economics, test_registry):
|
||||
|
||||
economics = EconomicsFactory.get_economics(registry=test_registry)
|
||||
|
||||
assert economics.staking_deployment_parameters == token_economics.staking_deployment_parameters
|
||||
assert economics.slashing_deployment_parameters == token_economics.slashing_deployment_parameters
|
||||
assert economics.worklock_deployment_parameters == token_economics.worklock_deployment_parameters
|
|
@ -0,0 +1,84 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import maya
|
||||
import pytest
|
||||
from umbral.kfrags import KFrag
|
||||
|
||||
from nucypher.crypto.api import keccak_digest
|
||||
from nucypher.policy.collections import PolicyCredential
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('blockchain_ursulas')
|
||||
def test_decentralized_grant(blockchain_alice, blockchain_bob, agency):
|
||||
# Setup the policy details
|
||||
n = 3
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
label = b"this_is_the_path_to_which_access_is_being_granted"
|
||||
|
||||
# Create the Policy, Granting access to Bob
|
||||
policy = blockchain_alice.grant(bob=blockchain_bob,
|
||||
label=label,
|
||||
m=2,
|
||||
n=n,
|
||||
rate=int(1e18), # one ether
|
||||
expiration=policy_end_datetime)
|
||||
|
||||
# Check the policy ID
|
||||
policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp))
|
||||
assert policy_id == policy.id
|
||||
|
||||
# The number of accepted arrangements at least the number of Ursulas we're using (n)
|
||||
assert len(policy._accepted_arrangements) >= n
|
||||
|
||||
# The number of actually enacted arrangements is exactly equal to n.
|
||||
assert len(policy._enacted_arrangements) == n
|
||||
|
||||
# Let's look at the enacted arrangements.
|
||||
for kfrag in policy.kfrags:
|
||||
arrangement = policy._enacted_arrangements[kfrag]
|
||||
|
||||
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
|
||||
retrieved_policy = arrangement.ursula.datastore.get_policy_arrangement(arrangement.id.hex().encode())
|
||||
retrieved_kfrag = KFrag.from_bytes(retrieved_policy.kfrag)
|
||||
|
||||
assert kfrag == retrieved_kfrag
|
||||
|
||||
# Test PolicyCredential w/o TreasureMap
|
||||
credential = policy.credential(with_treasure_map=False)
|
||||
assert credential.alice_verifying_key == policy.alice.stamp
|
||||
assert credential.label == policy.label
|
||||
assert credential.expiration == policy.expiration
|
||||
assert credential.policy_pubkey == policy.public_key
|
||||
assert credential.treasure_map is None
|
||||
|
||||
cred_json = credential.to_json()
|
||||
deserialized_cred = PolicyCredential.from_json(cred_json)
|
||||
assert credential == deserialized_cred
|
||||
|
||||
# Test PolicyCredential w/ TreasureMap
|
||||
credential = policy.credential()
|
||||
assert credential.alice_verifying_key == policy.alice.stamp
|
||||
assert credential.label == policy.label
|
||||
assert credential.expiration == policy.expiration
|
||||
assert credential.policy_pubkey == policy.public_key
|
||||
assert credential.treasure_map == policy.treasure_map
|
||||
|
||||
cred_json = credential.to_json()
|
||||
deserialized_cred = PolicyCredential.from_json(cred_json)
|
||||
assert credential == deserialized_cred
|
|
@ -0,0 +1,71 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
from eth_account._utils.transactions import Transaction
|
||||
from eth_utils import to_checksum_address
|
||||
|
||||
from nucypher.blockchain.eth.signers import Web3Signer
|
||||
from nucypher.characters.lawful import Character
|
||||
from nucypher.crypto.api import verify_eip_191
|
||||
from nucypher.crypto.powers import (TransactingPower)
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
|
||||
|
||||
def test_character_transacting_power_signing(testerchain, agency, test_registry):
|
||||
|
||||
# Pretend to be a character.
|
||||
eth_address = testerchain.etherbase_account
|
||||
signer = Character(is_me=True, registry=test_registry, checksum_address=eth_address)
|
||||
|
||||
# Manually consume the power up
|
||||
transacting_power = TransactingPower(password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
signer=Web3Signer(testerchain.client),
|
||||
account=eth_address)
|
||||
|
||||
signer._crypto_power.consume_power_up(transacting_power)
|
||||
|
||||
# Retrieve the power up
|
||||
power = signer._crypto_power.power_ups(TransactingPower)
|
||||
|
||||
assert power == transacting_power
|
||||
assert testerchain.transacting_power == power
|
||||
|
||||
assert power.is_active is True
|
||||
assert power.is_unlocked is True
|
||||
assert testerchain.transacting_power.is_unlocked is True
|
||||
|
||||
# Sign Message
|
||||
data_to_sign = b'Premium Select Luxury Pencil Holder'
|
||||
signature = power.sign_message(message=data_to_sign)
|
||||
is_verified = verify_eip_191(address=eth_address, message=data_to_sign, signature=signature)
|
||||
assert is_verified is True
|
||||
|
||||
# Sign Transaction
|
||||
transaction_dict = {'nonce': testerchain.client.w3.eth.getTransactionCount(eth_address),
|
||||
'gasPrice': testerchain.client.w3.eth.gasPrice,
|
||||
'gas': 100000,
|
||||
'from': eth_address,
|
||||
'to': testerchain.unassigned_accounts[1],
|
||||
'value': 1,
|
||||
'data': b''}
|
||||
|
||||
signed_transaction = power.sign_transaction(transaction_dict=transaction_dict)
|
||||
|
||||
# Demonstrate that the transaction is valid RLP encoded.
|
||||
restored_transaction = Transaction.from_bytes(serialized_bytes=signed_transaction)
|
||||
restored_dict = restored_transaction.as_dict()
|
||||
assert to_checksum_address(restored_dict['to']) == transaction_dict['to']
|
|
@ -25,30 +25,8 @@ from nucypher.crypto.api import verify_eip_191
|
|||
from nucypher.crypto.powers import SigningPower
|
||||
from nucypher.policy.policies import Policy
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
from tests.utils.middleware import MockRestMiddleware, NodeIsDownMiddleware
|
||||
from tests.utils.ursula import make_decentralized_ursulas, make_federated_ursulas
|
||||
|
||||
|
||||
def test_new_federated_ursula_announces_herself(ursula_federated_test_config):
|
||||
ursula_in_a_house, ursula_with_a_mouse = make_federated_ursulas(ursula_config=ursula_federated_test_config,
|
||||
quantity=2,
|
||||
know_each_other=False,
|
||||
network_middleware=MockRestMiddleware())
|
||||
|
||||
# Neither Ursula knows about the other.
|
||||
assert ursula_in_a_house.known_nodes == ursula_with_a_mouse.known_nodes
|
||||
|
||||
ursula_in_a_house.remember_node(ursula_with_a_mouse)
|
||||
|
||||
# OK, now, ursula_in_a_house knows about ursula_with_a_mouse, but not vice-versa.
|
||||
assert ursula_with_a_mouse in ursula_in_a_house.known_nodes
|
||||
assert ursula_in_a_house not in ursula_with_a_mouse.known_nodes
|
||||
|
||||
# But as ursula_in_a_house learns, she'll announce herself to ursula_with_a_mouse.
|
||||
ursula_in_a_house.learn_from_teacher_node()
|
||||
|
||||
assert ursula_with_a_mouse in ursula_in_a_house.known_nodes
|
||||
assert ursula_in_a_house in ursula_with_a_mouse.known_nodes
|
||||
from tests.utils.middleware import NodeIsDownMiddleware
|
||||
from tests.utils.ursula import make_decentralized_ursulas
|
||||
|
||||
|
||||
def test_stakers_bond_to_ursulas(testerchain, test_registry, stakers, ursula_decentralized_test_config):
|
|
@ -15,21 +15,14 @@
|
|||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
import os
|
||||
import pytest
|
||||
from bytestring_splitter import VariableLengthBytestring
|
||||
from constant_sorrow.constants import NOT_SIGNED
|
||||
from eth_utils.address import to_checksum_address
|
||||
from twisted.logger import LogLevel, globalLogPublisher
|
||||
|
||||
from nucypher.characters.base import Character
|
||||
from nucypher.crypto.powers import TransactingPower
|
||||
from nucypher.network.nicknames import nickname_from_seed
|
||||
from nucypher.network.nodes import FleetStateTracker, Learner
|
||||
from tests.utils.middleware import MockRestMiddleware
|
||||
from tests.utils.ursula import make_federated_ursulas, make_ursula_for_staker
|
||||
from tests.utils.ursula import make_ursula_for_staker
|
||||
|
||||
|
||||
def test_blockchain_ursula_stamp_verification_tolerance(blockchain_ursulas, mocker):
|
||||
|
@ -176,98 +169,3 @@ def test_invalid_workers_tolerance(testerchain,
|
|||
assert worker not in lonely_blockchain_learner.known_nodes
|
||||
|
||||
# TODO: Write a similar test but for detached worker (#1075)
|
||||
|
||||
|
||||
def test_emit_warning_upon_new_version(ursula_federated_test_config, caplog):
|
||||
nodes = make_federated_ursulas(ursula_config=ursula_federated_test_config,
|
||||
quantity=3,
|
||||
know_each_other=False)
|
||||
teacher, learner, new_node = nodes
|
||||
|
||||
learner.remember_node(teacher)
|
||||
teacher.remember_node(learner)
|
||||
teacher.remember_node(new_node)
|
||||
|
||||
new_node.TEACHER_VERSION = learner.LEARNER_VERSION + 1
|
||||
|
||||
warnings = []
|
||||
|
||||
def warning_trapper(event):
|
||||
if event['log_level'] == LogLevel.warn:
|
||||
warnings.append(event)
|
||||
|
||||
globalLogPublisher.addObserver(warning_trapper)
|
||||
learner.learn_from_teacher_node()
|
||||
|
||||
assert len(warnings) == 1
|
||||
assert warnings[0]['log_format'] == learner.unknown_version_message.format(new_node,
|
||||
new_node.TEACHER_VERSION,
|
||||
learner.LEARNER_VERSION)
|
||||
|
||||
# Now let's go a little further: make the version totally unrecognizable.
|
||||
|
||||
# First, there's enough garbage to at least scrape a potential checksum address
|
||||
fleet_snapshot = os.urandom(32 + 4)
|
||||
random_bytes = os.urandom(50) # lots of garbage in here
|
||||
future_version = learner.LEARNER_VERSION + 42
|
||||
version_bytes = future_version.to_bytes(2, byteorder="big")
|
||||
crazy_bytes = fleet_snapshot + VariableLengthBytestring(version_bytes + random_bytes)
|
||||
signed_crazy_bytes = bytes(teacher.stamp(crazy_bytes))
|
||||
|
||||
Response = namedtuple("MockResponse", ("content", "status_code"))
|
||||
response = Response(content=signed_crazy_bytes + crazy_bytes, status_code=200)
|
||||
|
||||
learner._current_teacher_node = teacher
|
||||
learner.network_middleware.get_nodes_via_rest = lambda *args, **kwargs: response
|
||||
learner.learn_from_teacher_node()
|
||||
|
||||
# If you really try, you can read a node representation from the garbage
|
||||
accidental_checksum = to_checksum_address(random_bytes[:20])
|
||||
accidental_nickname = nickname_from_seed(accidental_checksum)[0]
|
||||
accidental_node_repr = Character._display_name_template.format("Ursula", accidental_nickname, accidental_checksum)
|
||||
|
||||
assert len(warnings) == 2
|
||||
assert warnings[1]['log_format'] == learner.unknown_version_message.format(accidental_node_repr,
|
||||
future_version,
|
||||
learner.LEARNER_VERSION)
|
||||
|
||||
# This time, however, there's not enough garbage to assume there's a checksum address...
|
||||
random_bytes = os.urandom(2)
|
||||
crazy_bytes = fleet_snapshot + VariableLengthBytestring(version_bytes + random_bytes)
|
||||
signed_crazy_bytes = bytes(teacher.stamp(crazy_bytes))
|
||||
|
||||
response = Response(content=signed_crazy_bytes + crazy_bytes, status_code=200)
|
||||
|
||||
learner._current_teacher_node = teacher
|
||||
learner.learn_from_teacher_node()
|
||||
|
||||
assert len(warnings) == 3
|
||||
# ...so this time we get a "really unknown version message"
|
||||
assert warnings[2]['log_format'] == learner.really_unknown_version_message.format(future_version,
|
||||
learner.LEARNER_VERSION)
|
||||
|
||||
globalLogPublisher.removeObserver(warning_trapper)
|
||||
|
||||
|
||||
def test_node_posts_future_version(federated_ursulas):
|
||||
ursula = list(federated_ursulas)[0]
|
||||
middleware = MockRestMiddleware()
|
||||
|
||||
warnings = []
|
||||
|
||||
def warning_trapper(event):
|
||||
if event['log_level'] == LogLevel.warn:
|
||||
warnings.append(event)
|
||||
|
||||
globalLogPublisher.addObserver(warning_trapper)
|
||||
|
||||
crazy_node = b"invalid-node"
|
||||
middleware.get_nodes_via_rest(node=ursula,
|
||||
announce_nodes=(crazy_node,))
|
||||
assert len(warnings) == 1
|
||||
future_node = list(federated_ursulas)[1]
|
||||
future_node.TEACHER_VERSION = future_node.TEACHER_VERSION + 10
|
||||
future_node_bytes = bytes(future_node)
|
||||
middleware.get_nodes_via_rest(node=ursula,
|
||||
announce_nodes=(future_node_bytes,))
|
||||
assert len(warnings) == 2
|
|
@ -57,80 +57,6 @@ def test_blockchain_alice_finds_ursula_via_rest(blockchain_alice, blockchain_urs
|
|||
assert ursula in blockchain_alice.known_nodes
|
||||
|
||||
|
||||
def test_alice_creates_policy_with_correct_hrac(idle_federated_policy):
|
||||
"""
|
||||
Alice creates a Policy. It has the proper HRAC, unique per her, Bob, and the label
|
||||
"""
|
||||
alice = idle_federated_policy.alice
|
||||
bob = idle_federated_policy.bob
|
||||
|
||||
assert idle_federated_policy.hrac() == keccak_digest(bytes(alice.stamp)
|
||||
+ bytes(bob.stamp)
|
||||
+ idle_federated_policy.label)
|
||||
|
||||
|
||||
def test_alice_sets_treasure_map(enacted_federated_policy, federated_ursulas):
|
||||
"""
|
||||
Having enacted all the policies of a PolicyGroup, Alice creates a TreasureMap and ...... TODO
|
||||
"""
|
||||
enacted_federated_policy.publish_treasure_map(network_middleware=MockRestMiddleware())
|
||||
treasure_map_index = bytes.fromhex(enacted_federated_policy.treasure_map.public_id())
|
||||
treasure_map_as_set_on_network = list(federated_ursulas)[0].treasure_maps[treasure_map_index]
|
||||
assert treasure_map_as_set_on_network == enacted_federated_policy.treasure_map
|
||||
|
||||
|
||||
def test_treasure_map_stored_by_ursula_is_the_correct_one_for_bob(federated_alice, federated_bob, federated_ursulas,
|
||||
enacted_federated_policy):
|
||||
"""
|
||||
The TreasureMap given by Alice to Ursula is the correct one for Bob; he can decrypt and read it.
|
||||
"""
|
||||
|
||||
treasure_map_index = bytes.fromhex(enacted_federated_policy.treasure_map.public_id())
|
||||
treasure_map_as_set_on_network = list(federated_ursulas)[0].treasure_maps[treasure_map_index]
|
||||
|
||||
hrac_by_bob = federated_bob.construct_policy_hrac(federated_alice.stamp, enacted_federated_policy.label)
|
||||
assert enacted_federated_policy.hrac() == hrac_by_bob
|
||||
|
||||
hrac, map_id_by_bob = federated_bob.construct_hrac_and_map_id(federated_alice.stamp, enacted_federated_policy.label)
|
||||
assert map_id_by_bob == treasure_map_as_set_on_network.public_id()
|
||||
|
||||
|
||||
def test_bob_can_retreive_the_treasure_map_and_decrypt_it(enacted_federated_policy, federated_ursulas):
|
||||
"""
|
||||
Above, we showed that the TreasureMap saved on the network is the correct one for Bob. Here, we show
|
||||
that Bob can retrieve it with only the information about which he is privy pursuant to the PolicyGroup.
|
||||
"""
|
||||
bob = enacted_federated_policy.bob
|
||||
|
||||
# Of course, in the real world, Bob has sufficient information to reconstitute a PolicyGroup, gleaned, we presume,
|
||||
# through a side-channel with Alice.
|
||||
|
||||
# If Bob doesn't know about any Ursulas, he can't find the TreasureMap via the REST swarm:
|
||||
with pytest.raises(bob.NotEnoughTeachers):
|
||||
treasure_map_from_wire = bob.get_treasure_map(enacted_federated_policy.alice.stamp,
|
||||
enacted_federated_policy.label)
|
||||
|
||||
# Bob finds out about one Ursula (in the real world, a seed node)
|
||||
bob.remember_node(list(federated_ursulas)[0])
|
||||
|
||||
# ...and then learns about the rest of the network.
|
||||
bob.learn_from_teacher_node(eager=True)
|
||||
|
||||
# Now he'll have better success finding that map.
|
||||
treasure_map_from_wire = bob.get_treasure_map(enacted_federated_policy.alice.stamp,
|
||||
enacted_federated_policy.label)
|
||||
|
||||
assert enacted_federated_policy.treasure_map == treasure_map_from_wire
|
||||
|
||||
|
||||
def test_treasure_map_is_legit(enacted_federated_policy):
|
||||
"""
|
||||
Sure, the TreasureMap can get to Bob, but we also need to know that each Ursula in the TreasureMap is on the network.
|
||||
"""
|
||||
for ursula_address, _node_id in enacted_federated_policy.treasure_map:
|
||||
assert ursula_address in enacted_federated_policy.bob.known_nodes.addresses()
|
||||
|
||||
|
||||
@pytest.mark.skip("See Issue #1075") # TODO: Issue #1075
|
||||
def test_vladimir_illegal_interface_key_does_not_propagate(blockchain_ursulas):
|
||||
"""
|
||||
|
@ -203,28 +129,3 @@ def test_alice_refuses_to_make_arrangement_unless_ursula_is_valid(blockchain_ali
|
|||
idle_blockchain_policy.consider_arrangement(network_middleware=blockchain_alice.network_middleware,
|
||||
arrangement=FakeArrangement(),
|
||||
ursula=vladimir)
|
||||
|
||||
|
||||
def test_alice_does_not_update_with_old_ursula_info(federated_alice, federated_ursulas):
|
||||
ursula = list(federated_ursulas)[0]
|
||||
old_metadata = bytes(ursula)
|
||||
|
||||
# Alice has remembered Ursula.
|
||||
assert federated_alice.known_nodes[ursula.checksum_address] == ursula
|
||||
|
||||
# But now, Ursula wants to sign and date her interface info again. This causes a new timestamp.
|
||||
ursula._sign_and_date_interface_info()
|
||||
|
||||
# Indeed, her metadata is not the same now.
|
||||
assert bytes(ursula) != old_metadata
|
||||
|
||||
old_ursula = Ursula.from_bytes(old_metadata)
|
||||
|
||||
# Once Alice learns about Ursula's updated info...
|
||||
federated_alice.remember_node(ursula)
|
||||
|
||||
# ...she can't learn about old ursula anymore.
|
||||
federated_alice.remember_node(old_ursula)
|
||||
|
||||
new_metadata = bytes(federated_alice.known_nodes[ursula.checksum_address])
|
||||
assert new_metadata != old_metadata
|
|
@ -1,289 +0,0 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import maya
|
||||
import os
|
||||
import pytest
|
||||
from umbral.kfrags import KFrag
|
||||
|
||||
from nucypher.characters.lawful import Bob, Enrico
|
||||
from nucypher.config.characters import AliceConfiguration
|
||||
from nucypher.crypto.api import keccak_digest
|
||||
from nucypher.crypto.powers import DecryptingPower, SigningPower
|
||||
from nucypher.policy.collections import PolicyCredential, Revocation
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
from tests.utils.middleware import MockRestMiddleware
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('blockchain_ursulas')
|
||||
def test_decentralized_grant(blockchain_alice, blockchain_bob, agency):
|
||||
# Setup the policy details
|
||||
n = 3
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
label = b"this_is_the_path_to_which_access_is_being_granted"
|
||||
|
||||
# Create the Policy, Granting access to Bob
|
||||
policy = blockchain_alice.grant(bob=blockchain_bob,
|
||||
label=label,
|
||||
m=2,
|
||||
n=n,
|
||||
rate=int(1e18), # one ether
|
||||
expiration=policy_end_datetime)
|
||||
|
||||
# Check the policy ID
|
||||
policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp))
|
||||
assert policy_id == policy.id
|
||||
|
||||
# The number of accepted arrangements at least the number of Ursulas we're using (n)
|
||||
assert len(policy._accepted_arrangements) >= n
|
||||
|
||||
# The number of actually enacted arrangements is exactly equal to n.
|
||||
assert len(policy._enacted_arrangements) == n
|
||||
|
||||
# Let's look at the enacted arrangements.
|
||||
for kfrag in policy.kfrags:
|
||||
arrangement = policy._enacted_arrangements[kfrag]
|
||||
|
||||
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
|
||||
retrieved_policy = arrangement.ursula.datastore.get_policy_arrangement(arrangement.id.hex().encode())
|
||||
retrieved_kfrag = KFrag.from_bytes(retrieved_policy.kfrag)
|
||||
|
||||
assert kfrag == retrieved_kfrag
|
||||
|
||||
# Test PolicyCredential w/o TreasureMap
|
||||
credential = policy.credential(with_treasure_map=False)
|
||||
assert credential.alice_verifying_key == policy.alice.stamp
|
||||
assert credential.label == policy.label
|
||||
assert credential.expiration == policy.expiration
|
||||
assert credential.policy_pubkey == policy.public_key
|
||||
assert credential.treasure_map is None
|
||||
|
||||
cred_json = credential.to_json()
|
||||
deserialized_cred = PolicyCredential.from_json(cred_json)
|
||||
assert credential == deserialized_cred
|
||||
|
||||
# Test PolicyCredential w/ TreasureMap
|
||||
credential = policy.credential()
|
||||
assert credential.alice_verifying_key == policy.alice.stamp
|
||||
assert credential.label == policy.label
|
||||
assert credential.expiration == policy.expiration
|
||||
assert credential.policy_pubkey == policy.public_key
|
||||
assert credential.treasure_map == policy.treasure_map
|
||||
|
||||
cred_json = credential.to_json()
|
||||
deserialized_cred = PolicyCredential.from_json(cred_json)
|
||||
assert credential == deserialized_cred
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('federated_ursulas')
|
||||
def test_federated_grant(federated_alice, federated_bob):
|
||||
# Setup the policy details
|
||||
m, n = 2, 3
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
label = b"this_is_the_path_to_which_access_is_being_granted"
|
||||
|
||||
# Create the Policy, granting access to Bob
|
||||
policy = federated_alice.grant(federated_bob, label, m=m, n=n, expiration=policy_end_datetime)
|
||||
|
||||
# Check the policy ID
|
||||
policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp))
|
||||
assert policy_id == policy.id
|
||||
|
||||
# Check Alice's active policies
|
||||
assert policy_id in federated_alice.active_policies
|
||||
assert federated_alice.active_policies[policy_id] == policy
|
||||
|
||||
# The number of accepted arrangements at least the number of Ursulas we're using (n)
|
||||
assert len(policy._accepted_arrangements) >= n
|
||||
|
||||
# The number of actually enacted arrangements is exactly equal to n.
|
||||
assert len(policy._enacted_arrangements) == n
|
||||
|
||||
# Let's look at the enacted arrangements.
|
||||
for kfrag in policy.kfrags:
|
||||
arrangement = policy._enacted_arrangements[kfrag]
|
||||
|
||||
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
|
||||
retrieved_policy = arrangement.ursula.datastore.get_policy_arrangement(arrangement.id.hex().encode())
|
||||
retrieved_kfrag = KFrag.from_bytes(retrieved_policy.kfrag)
|
||||
|
||||
assert kfrag == retrieved_kfrag
|
||||
|
||||
|
||||
def test_federated_alice_can_decrypt(federated_alice, federated_bob):
|
||||
"""
|
||||
Test that alice can decrypt data encrypted by an enrico
|
||||
for her own derived policy pubkey.
|
||||
"""
|
||||
|
||||
# Setup the policy details
|
||||
m, n = 2, 3
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
label = b"this_is_the_path_to_which_access_is_being_granted"
|
||||
|
||||
policy = federated_alice.create_policy(
|
||||
bob=federated_bob,
|
||||
label=label,
|
||||
m=m,
|
||||
n=n,
|
||||
expiration=policy_end_datetime,
|
||||
)
|
||||
|
||||
enrico = Enrico.from_alice(
|
||||
federated_alice,
|
||||
policy.label,
|
||||
)
|
||||
plaintext = b"this is the first thing i'm encrypting ever."
|
||||
|
||||
# use the enrico to encrypt the message
|
||||
message_kit, signature = enrico.encrypt_message(plaintext)
|
||||
|
||||
# decrypt the data
|
||||
decrypted_data = federated_alice.verify_from(
|
||||
enrico,
|
||||
message_kit,
|
||||
signature=signature,
|
||||
decrypt=True,
|
||||
label=policy.label
|
||||
)
|
||||
|
||||
assert plaintext == decrypted_data
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('federated_ursulas')
|
||||
def test_revocation(federated_alice, federated_bob):
|
||||
m, n = 2, 3
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
label = b"revocation test"
|
||||
|
||||
policy = federated_alice.grant(federated_bob, label, m=m, n=n, expiration=policy_end_datetime)
|
||||
|
||||
# Test that all arrangements are included in the RevocationKit
|
||||
for node_id, arrangement_id in policy.treasure_map:
|
||||
assert policy.revocation_kit[node_id].arrangement_id == arrangement_id
|
||||
|
||||
# Test revocation kit's signatures
|
||||
for revocation in policy.revocation_kit:
|
||||
assert revocation.verify_signature(federated_alice.stamp.as_umbral_pubkey())
|
||||
|
||||
# Test Revocation deserialization
|
||||
revocation = policy.revocation_kit[node_id]
|
||||
revocation_bytes = bytes(revocation)
|
||||
deserialized_revocation = Revocation.from_bytes(revocation_bytes)
|
||||
assert deserialized_revocation == revocation
|
||||
|
||||
# Attempt to revoke the new policy
|
||||
failed_revocations = federated_alice.revoke(policy)
|
||||
assert len(failed_revocations) == 0
|
||||
|
||||
# Try to revoke the already revoked policy
|
||||
already_revoked = federated_alice.revoke(policy)
|
||||
assert len(already_revoked) == 3
|
||||
|
||||
|
||||
def test_alices_powers_are_persistent(federated_ursulas, tmpdir):
|
||||
# Create a non-learning AliceConfiguration
|
||||
alice_config = AliceConfiguration(
|
||||
config_root=os.path.join(tmpdir, 'nucypher-custom-alice-config'),
|
||||
network_middleware=MockRestMiddleware(),
|
||||
known_nodes=federated_ursulas,
|
||||
start_learning_now=False,
|
||||
federated_only=True,
|
||||
save_metadata=False,
|
||||
reload_metadata=False
|
||||
)
|
||||
|
||||
# Generate keys and write them the disk
|
||||
alice_config.initialize(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
|
||||
# Unlock Alice's keyring
|
||||
alice_config.keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
|
||||
# Produce an Alice
|
||||
alice = alice_config() # or alice_config.produce()
|
||||
|
||||
# Save Alice's node configuration file to disk for later use
|
||||
alice_config_file = alice_config.to_configuration_file()
|
||||
|
||||
# Let's save Alice's public keys too to check they are correctly restored later
|
||||
alices_verifying_key = alice.public_keys(SigningPower)
|
||||
alices_receiving_key = alice.public_keys(DecryptingPower)
|
||||
|
||||
# Next, let's fix a label for all the policies we will create later.
|
||||
label = b"this_is_the_path_to_which_access_is_being_granted"
|
||||
|
||||
# Even before creating the policies, we can know what will be its public key.
|
||||
# This can be used by Enrico (i.e., a Data Source) to encrypt messages
|
||||
# before Alice grants access to Bobs.
|
||||
policy_pubkey = alice.get_policy_encrypting_key_from_label(label)
|
||||
|
||||
# Now, let's create a policy for some Bob.
|
||||
m, n = 3, 4
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
|
||||
bob = Bob(federated_only=True,
|
||||
start_learning_now=False,
|
||||
network_middleware=MockRestMiddleware())
|
||||
|
||||
bob_policy = alice.grant(bob, label, m=m, n=n, expiration=policy_end_datetime)
|
||||
|
||||
assert policy_pubkey == bob_policy.public_key
|
||||
|
||||
# ... and Alice and her configuration disappear.
|
||||
del alice
|
||||
del alice_config
|
||||
|
||||
###################################
|
||||
# Some time passes. #
|
||||
# ... #
|
||||
# (jmyles plays the Song of Time) #
|
||||
# ... #
|
||||
# Alice appears again. #
|
||||
###################################
|
||||
|
||||
# A new Alice is restored from the configuration file
|
||||
new_alice_config = AliceConfiguration.from_configuration_file(
|
||||
filepath=alice_config_file,
|
||||
network_middleware=MockRestMiddleware(),
|
||||
known_nodes=federated_ursulas,
|
||||
start_learning_now=False,
|
||||
)
|
||||
|
||||
# Alice unlocks her restored keyring from disk
|
||||
new_alice_config.attach_keyring()
|
||||
new_alice_config.keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
new_alice = new_alice_config()
|
||||
|
||||
# First, we check that her public keys are correctly restored
|
||||
assert alices_verifying_key == new_alice.public_keys(SigningPower)
|
||||
assert alices_receiving_key == new_alice.public_keys(DecryptingPower)
|
||||
|
||||
# Bob's eldest brother, Roberto, appears too
|
||||
roberto = Bob(federated_only=True,
|
||||
start_learning_now=False,
|
||||
network_middleware=MockRestMiddleware())
|
||||
|
||||
# Alice creates a new policy for Roberto. Note how all the parameters
|
||||
# except for the label (i.e., recipient, m, n, policy_end) are different
|
||||
# from previous policy
|
||||
m, n = 2, 5
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=3)
|
||||
roberto_policy = new_alice.grant(roberto, label, m=m, n=n, expiration=policy_end_datetime)
|
||||
|
||||
# Both policies must share the same public key (i.e., the policy public key)
|
||||
assert policy_pubkey == roberto_policy.public_key
|
|
@ -1,284 +0,0 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
|
||||
import pytest
|
||||
from constant_sorrow import constants
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from eth_account._utils.transactions import Transaction
|
||||
from eth_utils import to_checksum_address
|
||||
|
||||
from nucypher.blockchain.eth.signers import Web3Signer
|
||||
from nucypher.characters.lawful import Alice, Bob, Character, Enrico
|
||||
from nucypher.crypto import api
|
||||
from nucypher.crypto.api import verify_eip_191
|
||||
from nucypher.crypto.powers import (CryptoPower, NoSigningPower, SigningPower, TransactingPower)
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
|
||||
"""
|
||||
Chapter 1: SIGNING
|
||||
"""
|
||||
|
||||
|
||||
def test_actor_without_signing_power_cannot_sign():
|
||||
"""
|
||||
We can create a Character with no real CryptoPower to speak of.
|
||||
This Character can't even sign a message.
|
||||
"""
|
||||
cannot_sign = CryptoPower(power_ups=[])
|
||||
non_signer = Character(crypto_power=cannot_sign,
|
||||
start_learning_now=False,
|
||||
federated_only=True)
|
||||
|
||||
# The non-signer's stamp doesn't work for signing...
|
||||
with pytest.raises(NoSigningPower):
|
||||
non_signer.stamp("something")
|
||||
|
||||
# ...or as a way to cast the (non-existent) public key to bytes.
|
||||
with pytest.raises(NoSigningPower):
|
||||
bytes(non_signer.stamp)
|
||||
|
||||
|
||||
def test_actor_with_signing_power_can_sign():
|
||||
"""
|
||||
However, simply giving that character a PowerUp bestows the power to sign.
|
||||
|
||||
Instead of having a Character verify the signature, we'll use the lower level API.
|
||||
"""
|
||||
message = b"Llamas."
|
||||
|
||||
signer = Character(crypto_power_ups=[SigningPower], is_me=True,
|
||||
start_learning_now=False, federated_only=True)
|
||||
stamp_of_the_signer = signer.stamp
|
||||
|
||||
# We can use the signer's stamp to sign a message (since the signer is_me)...
|
||||
signature = stamp_of_the_signer(message)
|
||||
|
||||
# ...or to get the signer's public key for verification purposes.
|
||||
# (note: we use the private _der_encoded_bytes here to test directly against the API, instead of Character)
|
||||
verification = api.verify_ecdsa(message, signature._der_encoded_bytes(),
|
||||
stamp_of_the_signer.as_umbral_pubkey())
|
||||
|
||||
assert verification is True
|
||||
|
||||
|
||||
def test_anybody_can_verify():
|
||||
"""
|
||||
In the last example, we used the lower-level Crypto API to verify the signature.
|
||||
|
||||
Here, we show that anybody can do it without needing to directly access Crypto.
|
||||
"""
|
||||
# Alice can sign by default, by dint of her _default_crypto_powerups.
|
||||
alice = Alice(federated_only=True, start_learning_now=False)
|
||||
|
||||
# So, our story is fairly simple: an everyman meets Alice.
|
||||
somebody = Character(start_learning_now=False, federated_only=True)
|
||||
|
||||
# Alice signs a message.
|
||||
message = b"A message for all my friends who can only verify and not sign."
|
||||
signature = alice.stamp(message)
|
||||
|
||||
# Our everyman can verify it.
|
||||
cleartext = somebody.verify_from(alice, message, signature, decrypt=False)
|
||||
assert cleartext is constants.NO_DECRYPTION_PERFORMED
|
||||
|
||||
# Of course, verification fails with any fake message
|
||||
with pytest.raises(InvalidSignature):
|
||||
fake = b"McLovin 892 Momona St. Honolulu, HI 96820"
|
||||
_ = somebody.verify_from(alice, fake, signature, decrypt=False)
|
||||
|
||||
# Signature verification also works when Alice is not living with our
|
||||
# everyman in the same process, and he only knows her by her public key
|
||||
alice_pubkey_bytes = bytes(alice.stamp)
|
||||
hearsay_alice = Character.from_public_keys({SigningPower: alice_pubkey_bytes})
|
||||
|
||||
cleartext = somebody.verify_from(hearsay_alice, message, signature, decrypt=False)
|
||||
assert cleartext is constants.NO_DECRYPTION_PERFORMED
|
||||
|
||||
hearsay_alice = Character.from_public_keys(verifying_key=alice_pubkey_bytes)
|
||||
|
||||
cleartext = somebody.verify_from(hearsay_alice, message, signature, decrypt=False)
|
||||
assert cleartext is constants.NO_DECRYPTION_PERFORMED
|
||||
|
||||
|
||||
def test_character_transacting_power_signing(testerchain, agency, test_registry):
|
||||
|
||||
# Pretend to be a character.
|
||||
eth_address = testerchain.etherbase_account
|
||||
signer = Character(is_me=True, registry=test_registry, checksum_address=eth_address)
|
||||
|
||||
# Manually consume the power up
|
||||
transacting_power = TransactingPower(password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
signer=Web3Signer(testerchain.client),
|
||||
account=eth_address)
|
||||
|
||||
signer._crypto_power.consume_power_up(transacting_power)
|
||||
|
||||
# Retrieve the power up
|
||||
power = signer._crypto_power.power_ups(TransactingPower)
|
||||
|
||||
assert power == transacting_power
|
||||
assert testerchain.transacting_power == power
|
||||
|
||||
assert power.is_active is True
|
||||
assert power.is_unlocked is True
|
||||
assert testerchain.transacting_power.is_unlocked is True
|
||||
|
||||
# Sign Message
|
||||
data_to_sign = b'Premium Select Luxury Pencil Holder'
|
||||
signature = power.sign_message(message=data_to_sign)
|
||||
is_verified = verify_eip_191(address=eth_address, message=data_to_sign, signature=signature)
|
||||
assert is_verified is True
|
||||
|
||||
# Sign Transaction
|
||||
transaction_dict = {'nonce': testerchain.client.w3.eth.getTransactionCount(eth_address),
|
||||
'gasPrice': testerchain.client.w3.eth.gasPrice,
|
||||
'gas': 100000,
|
||||
'from': eth_address,
|
||||
'to': testerchain.unassigned_accounts[1],
|
||||
'value': 1,
|
||||
'data': b''}
|
||||
|
||||
signed_transaction = power.sign_transaction(transaction_dict=transaction_dict)
|
||||
|
||||
# Demonstrate that the transaction is valid RLP encoded.
|
||||
restored_transaction = Transaction.from_bytes(serialized_bytes=signed_transaction)
|
||||
restored_dict = restored_transaction.as_dict()
|
||||
assert to_checksum_address(restored_dict['to']) == transaction_dict['to']
|
||||
|
||||
|
||||
"""
|
||||
Chapter 2: ENCRYPTION
|
||||
"""
|
||||
|
||||
|
||||
def test_anybody_can_encrypt():
|
||||
"""
|
||||
Similar to anybody_can_verify() above; we show that anybody can encrypt.
|
||||
"""
|
||||
someone = Character(start_learning_now=False, federated_only=True)
|
||||
bob = Bob(is_me=False, federated_only=True)
|
||||
|
||||
cleartext = b"This is Officer Rod Farva. Come in, Ursula! Come in Ursula!"
|
||||
|
||||
ciphertext, signature = someone.encrypt_for(bob, cleartext, sign=False)
|
||||
|
||||
assert signature == constants.NOT_SIGNED
|
||||
assert ciphertext is not None
|
||||
|
||||
|
||||
def test_node_deployer(federated_ursulas):
|
||||
for ursula in federated_ursulas:
|
||||
deployer = ursula.get_deployer()
|
||||
assert deployer.options['https_port'] == ursula.rest_information()[0].port
|
||||
assert deployer.application == ursula.rest_app
|
||||
|
||||
|
||||
"""
|
||||
What follows are various combinations of signing and encrypting, to match
|
||||
real-world scenarios.
|
||||
"""
|
||||
|
||||
|
||||
def test_sign_cleartext_and_encrypt(federated_alice, federated_bob):
|
||||
"""
|
||||
Exhibit One: federated_alice signs the cleartext and encrypts her signature inside
|
||||
the ciphertext.
|
||||
"""
|
||||
message = b"Have you accepted my answer on StackOverflow yet?"
|
||||
|
||||
message_kit, _signature = federated_alice.encrypt_for(federated_bob, message,
|
||||
sign_plaintext=True)
|
||||
|
||||
# Notice that our function still returns the signature here, in case federated_alice
|
||||
# wants to do something else with it, such as post it publicly for later
|
||||
# public verifiability.
|
||||
|
||||
# However, we can expressly refrain from passing the Signature, and the
|
||||
# verification still works:
|
||||
cleartext = federated_bob.verify_from(federated_alice, message_kit, signature=None,
|
||||
decrypt=True)
|
||||
assert cleartext == message
|
||||
|
||||
|
||||
def test_encrypt_and_sign_the_ciphertext(federated_alice, federated_bob):
|
||||
"""
|
||||
Now, federated_alice encrypts first and then signs the ciphertext, providing a
|
||||
Signature that is completely separate from the message.
|
||||
This is useful in a scenario in which federated_bob needs to prove authenticity
|
||||
publicly without disclosing contents.
|
||||
"""
|
||||
message = b"We have a reaaall problem."
|
||||
message_kit, signature = federated_alice.encrypt_for(federated_bob, message,
|
||||
sign_plaintext=False)
|
||||
cleartext = federated_bob.verify_from(federated_alice, message_kit, signature, decrypt=True)
|
||||
assert cleartext == message
|
||||
|
||||
|
||||
def test_encrypt_and_sign_including_signature_in_both_places(federated_alice, federated_bob):
|
||||
"""
|
||||
Same as above, but showing that we can include the signature in both
|
||||
the plaintext (to be found upon decryption) and also passed into
|
||||
verify_from() (eg, gleaned over a side-channel).
|
||||
"""
|
||||
message = b"We have a reaaall problem."
|
||||
message_kit, signature = federated_alice.encrypt_for(federated_bob, message,
|
||||
sign_plaintext=True)
|
||||
cleartext = federated_bob.verify_from(federated_alice, message_kit, signature,
|
||||
decrypt=True)
|
||||
assert cleartext == message
|
||||
|
||||
|
||||
def test_encrypt_but_do_not_sign(federated_alice, federated_bob):
|
||||
"""
|
||||
Finally, federated_alice encrypts but declines to sign.
|
||||
This is useful in a scenario in which federated_alice wishes to plausibly disavow
|
||||
having created this content.
|
||||
"""
|
||||
# TODO: How do we accurately demonstrate this test safely, if at all?
|
||||
message = b"If Bonnie comes home and finds an unencrypted private key in her keystore, I'm gonna get divorced."
|
||||
|
||||
# Alice might also want to encrypt a message but *not* sign it, in order
|
||||
# to refrain from creating evidence that can prove she was the
|
||||
# original sender.
|
||||
message_kit, not_signature = federated_alice.encrypt_for(federated_bob, message, sign=False)
|
||||
|
||||
# The message is not signed...
|
||||
assert not_signature == constants.NOT_SIGNED
|
||||
|
||||
# ...and thus, the message is not verified.
|
||||
with pytest.raises(InvalidSignature):
|
||||
federated_bob.verify_from(federated_alice, message_kit, decrypt=True)
|
||||
|
||||
|
||||
def test_alice_can_decrypt(federated_alice):
|
||||
label = b"boring test label"
|
||||
|
||||
policy_pubkey = federated_alice.get_policy_encrypting_key_from_label(label)
|
||||
|
||||
enrico = Enrico(policy_encrypting_key=policy_pubkey)
|
||||
|
||||
message = b"boring test message"
|
||||
message_kit, signature = enrico.encrypt_message(message=message)
|
||||
|
||||
# Interesting thing: if Alice wants to decrypt, she needs to provide the label directly.
|
||||
cleartext = federated_alice.verify_from(stranger=enrico,
|
||||
message_kit=message_kit,
|
||||
signature=signature,
|
||||
decrypt=True,
|
||||
label=label)
|
||||
assert cleartext == message
|
|
@ -1,132 +0,0 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
|
||||
import contextlib
|
||||
import json
|
||||
from io import StringIO
|
||||
|
||||
import os
|
||||
import pytest
|
||||
import shutil
|
||||
from click.testing import CliRunner
|
||||
from datetime import datetime
|
||||
|
||||
from nucypher.blockchain.eth.registry import InMemoryContractRegistry, LocalContractRegistry
|
||||
from nucypher.characters.control.emitters import StdoutEmitter
|
||||
from nucypher.config.characters import StakeHolderConfiguration, UrsulaConfiguration
|
||||
from tests.constants import (
|
||||
BASE_TEMP_DIR,
|
||||
BASE_TEMP_PREFIX,
|
||||
DATETIME_FORMAT,
|
||||
MOCK_ALLOCATION_INFILE,
|
||||
MOCK_CUSTOM_INSTALLATION_PATH,
|
||||
MOCK_CUSTOM_INSTALLATION_PATH_2
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope='function', autouse=True)
|
||||
def stdout_trap(mocker):
|
||||
trap = StringIO()
|
||||
mocker.patch('sys.stdout', new=trap)
|
||||
return trap
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def test_emitter(mocker, stdout_trap):
|
||||
mocker.patch('sys.stdout', new=stdout_trap)
|
||||
return StdoutEmitter()
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def click_runner():
|
||||
runner = CliRunner()
|
||||
yield runner
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def nominal_federated_configuration_fields():
|
||||
config = UrsulaConfiguration(dev_mode=True, federated_only=True)
|
||||
config_fields = config.static_payload()
|
||||
yield tuple(config_fields.keys())
|
||||
del config
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def mock_allocation_infile(testerchain, token_economics, get_random_checksum_address):
|
||||
accounts = [get_random_checksum_address() for _ in range(10)]
|
||||
# accounts = testerchain.unassigned_accounts
|
||||
allocation_data = list()
|
||||
amount = 2 * token_economics.minimum_allowed_locked
|
||||
min_periods = token_economics.minimum_locked_periods
|
||||
for account in accounts:
|
||||
substake = [{'checksum_address': account, 'amount': amount, 'lock_periods': min_periods + i} for i in range(24)]
|
||||
allocation_data.extend(substake)
|
||||
|
||||
with open(MOCK_ALLOCATION_INFILE, 'w') as file:
|
||||
file.write(json.dumps(allocation_data))
|
||||
|
||||
yield MOCK_ALLOCATION_INFILE
|
||||
if os.path.isfile(MOCK_ALLOCATION_INFILE):
|
||||
os.remove(MOCK_ALLOCATION_INFILE)
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def new_local_registry():
|
||||
filename = f'{BASE_TEMP_PREFIX}mock-empty-registry-{datetime.now().strftime(DATETIME_FORMAT)}.json'
|
||||
registry_filepath = os.path.join(BASE_TEMP_DIR, filename)
|
||||
registry = LocalContractRegistry(filepath=registry_filepath)
|
||||
registry.write(InMemoryContractRegistry().read())
|
||||
yield registry
|
||||
if os.path.exists(registry_filepath):
|
||||
os.remove(registry_filepath)
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def custom_filepath():
|
||||
_custom_filepath = MOCK_CUSTOM_INSTALLATION_PATH
|
||||
with contextlib.suppress(FileNotFoundError):
|
||||
shutil.rmtree(_custom_filepath, ignore_errors=True)
|
||||
yield _custom_filepath
|
||||
with contextlib.suppress(FileNotFoundError):
|
||||
shutil.rmtree(_custom_filepath, ignore_errors=True)
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def custom_filepath_2():
|
||||
_custom_filepath = MOCK_CUSTOM_INSTALLATION_PATH_2
|
||||
with contextlib.suppress(FileNotFoundError):
|
||||
shutil.rmtree(_custom_filepath, ignore_errors=True)
|
||||
try:
|
||||
yield _custom_filepath
|
||||
finally:
|
||||
with contextlib.suppress(FileNotFoundError):
|
||||
shutil.rmtree(_custom_filepath, ignore_errors=True)
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def worker_configuration_file_location(custom_filepath):
|
||||
_configuration_file_location = os.path.join(MOCK_CUSTOM_INSTALLATION_PATH,
|
||||
UrsulaConfiguration.generate_filename())
|
||||
return _configuration_file_location
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def stakeholder_configuration_file_location(custom_filepath):
|
||||
_configuration_file_location = os.path.join(MOCK_CUSTOM_INSTALLATION_PATH,
|
||||
StakeHolderConfiguration.generate_filename())
|
||||
return _configuration_file_location
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue