Merge pull request #2017 from KPrasch/truckin

Reorganize tests in accordance with #2005, #1708
pull/2052/head
K Prasch 2020-05-27 14:12:22 -07:00 committed by GitHub
commit 1649da3193
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
166 changed files with 1551 additions and 1255 deletions

View File

@ -9,6 +9,20 @@ workflows:
filters:
tags:
only: /.*/
- integration:
context: "NuCypher Tests"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- unit:
context: "NuCypher Tests"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- contracts:
context: "NuCypher Tests"
filters:
@ -16,63 +30,69 @@ workflows:
only: /.*/
requires:
- pip_install_37
- basics:
- interfaces:
context: "NuCypher Tests"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- character:
- unit
- integration
- characters:
context: "NuCypher Tests"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- unit
- integration
- agents:
context: "NuCypher Tests"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- unit
- integration
- actors:
context: "NuCypher Tests"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- unit
- integration
- deployers:
context: "NuCypher Tests"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- unit
- integration
- cli:
context: "NuCypher Tests"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- unit
- integration
- tests_ok:
filters:
tags:
only: /.*/
requires:
- basics
- character
- actors
- agents
- interfaces
- characters
- cli
- deployers
- build_dev_docker_images:
filters:
tags:
only: /.*/
requires:
- basics
- finnegans_wake_demo:
filters:
tags:
@ -95,8 +115,6 @@ workflows:
filters:
tags:
only: /.*/
requires:
- tests_ok
- test_build:
filters:
tags:
@ -207,14 +225,14 @@ workflows:
only: /.*/
requires:
- pip_install_37
- basics:
- interfaces:
context: "Nightly"
filters:
tags:
only: /.*/
requires:
- pip_install_37
- character:
- characters:
context: "Nightly"
filters:
tags:
@ -254,8 +272,10 @@ workflows:
tags:
only: /.*/
requires:
- basics
- character
- actors
- agents
- interfaces
- characters
- cli
- deployers
- build_dev_docker_images:
@ -263,7 +283,7 @@ workflows:
tags:
only: /.*/
requires:
- basics
- tests_ok
- finnegans_wake_demo:
filters:
tags:
@ -514,6 +534,28 @@ jobs:
- pipenv_install:
python_version: "3.8"
integration:
<<: *python_37_base
parallelism: 2
steps:
- prepare_environment
- run:
name: Integration Test Suite
command: |
pytest $(circleci tests glob "tests/integration/**/test_*.py" | circleci tests split --split-by=timings)
- capture_test_results
unit:
<<: *python_37_base
parallelism: 1
steps:
- prepare_environment
- run:
name: Unit Test Suite
command: |
pytest $(circleci tests glob "tests/unit/**/test_*.py" | circleci tests split --split-by=timings)
- capture_test_results
agents:
<<: *python_37_base
parallelism: 4
@ -522,7 +564,7 @@ jobs:
- run:
name: Blockchain Agent Tests
command: |
pytest $(circleci tests glob "tests/blockchain/eth/entities/agents/**/test_*.py" | circleci tests split --split-by=timings)
pytest $(circleci tests glob "tests/acceptance/blockchain/agents/**/test_*.py" | circleci tests split --split-by=timings)
- capture_test_results
actors:
@ -533,7 +575,7 @@ jobs:
- run:
name: Blockchain Actor Tests
command: |
pytest $(circleci tests glob "tests/blockchain/eth/entities/actors/**/test_*.py" | circleci tests split --split-by=timings)
pytest $(circleci tests glob "tests/acceptance/blockchain/actors/**/test_*.py" | circleci tests split --split-by=timings)
- capture_test_results
deployers:
@ -544,21 +586,21 @@ jobs:
- run:
name: Contract Deployer Tests
command: |
pytest $(circleci tests glob "tests/blockchain/eth/entities/deployers/test_*.py" | circleci tests split --split-by=timings)
pytest $(circleci tests glob "tests/acceptance/blockchain/deployers/test_*.py" | circleci tests split --split-by=timings)
- capture_test_results
contracts:
<<: *python_37_base
parallelism: 5
parallelism: 4
steps:
- prepare_environment
- run:
name: Ethereum Contract Unit Tests
command: |
pytest $(circleci tests glob "tests/blockchain/eth/contracts/**/test_*.py" | circleci tests split --split-by=timings)
pytest $(circleci tests glob "tests/contracts/**/test_*.py" | circleci tests split --split-by=timings)
- capture_test_results
basics:
interfaces:
<<: *python_37_base
parallelism: 1
steps:
@ -566,10 +608,10 @@ jobs:
- run:
name: Tests for Blockhain interfaces, Crypto functions, Node Configuration and Datastore
command: |
pytest $(circleci tests glob "tests/network/**/test_*.py" "tests/config/**/test_*.py" "tests/crypto/**/test_*.py" "tests/datastore/**/test_*.py" "tests/blockchain/eth/interfaces/**/test_*.py" "tests/blockchain/eth/clients/**/test_*.py" | circleci tests split --split-by=timings)
pytest $(circleci tests glob "tests/acceptance/blockchain/interfaces" "tests/acceptance/blockchain/clients")
- capture_test_results
character:
characters:
<<: *python_37_base
parallelism: 4
steps:
@ -577,7 +619,7 @@ jobs:
- run:
name: Character Tests
command: |
pytest $(circleci tests glob "tests/characters/**/test_*.py" "tests/learning/**/test_*.py" | circleci tests split --split-by=timings)
pytest $(circleci tests glob "tests/acceptance/characters/**/test_*.py" "tests/learning/**/test_*.py" | circleci tests split --split-by=timings)
- capture_test_results
cli:
@ -588,7 +630,7 @@ jobs:
- run:
name: Nucypher CLI Tests
command: |
pytest $(circleci tests glob "tests/cli/**/test_*.py" | circleci tests split --split-by=timings)
pytest $(circleci tests glob "tests/acceptance/cli/**/test_*.py" | circleci tests split --split-by=timings)
- capture_test_results
tests_ok:

View File

@ -0,0 +1,74 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from nucypher.blockchain.eth.clients import (EthereumClient, GethClient)
# TODO: Relocate mock objects out of unt tests
from tests.unit.test_web3_clients import (
GethClientTestBlockchain,
SyncedMockWeb3,
SyncingMockWeb3,
SyncingMockWeb3NoPeers
)
def test_synced_geth_client():
class SyncedBlockchainInterface(GethClientTestBlockchain):
Web3 = SyncedMockWeb3
interface = SyncedBlockchainInterface(provider_uri='file:///ipc.geth')
interface.connect()
assert interface.client._has_latest_block()
assert interface.client.sync()
def test_unsynced_geth_client():
GethClient.SYNC_SLEEP_DURATION = .001
class NonSyncedBlockchainInterface(GethClientTestBlockchain):
Web3 = SyncingMockWeb3
interface = NonSyncedBlockchainInterface(provider_uri='file:///ipc.geth')
interface.connect()
assert interface.client._has_latest_block() is False
assert interface.client.syncing
assert len(list(interface.client.sync())) == 8
def test_no_peers_unsynced_geth_client():
GethClient.PEERING_TIMEOUT = .001
class NonSyncedNoPeersBlockchainInterface(GethClientTestBlockchain):
Web3 = SyncingMockWeb3NoPeers
interface = NonSyncedNoPeersBlockchainInterface(provider_uri='file:///ipc.geth')
interface.connect()
assert interface.client._has_latest_block() is False
with pytest.raises(EthereumClient.SyncTimeout):
list(interface.client.sync())

View File

@ -20,71 +20,10 @@ import json
import pytest
from nucypher.blockchain.eth.constants import PREALLOCATION_ESCROW_CONTRACT_NAME
from nucypher.blockchain.eth.interfaces import BaseContractRegistry
from nucypher.blockchain.eth.registry import IndividualAllocationRegistry, LocalContractRegistry
from nucypher.blockchain.eth.registry import IndividualAllocationRegistry
from nucypher.config.constants import TEMPORARY_DOMAIN
def test_contract_registry(tempfile_path):
# ABC
with pytest.raises(TypeError):
BaseContractRegistry(filepath='test')
with pytest.raises(BaseContractRegistry.RegistryError):
bad_registry = LocalContractRegistry(filepath='/fake/file/path/registry.json')
bad_registry.search(contract_address='0xdeadbeef')
# Tests everything is as it should be when initially created
test_registry = LocalContractRegistry(filepath=tempfile_path)
assert test_registry.read() == list()
# Test contract enrollment and dump_chain
test_name = 'TestContract'
test_addr = '0xDEADBEEF'
test_abi = ['fake', 'data']
test_version = "some_version"
test_registry.enroll(contract_name=test_name,
contract_address=test_addr,
contract_abi=test_abi,
contract_version=test_version)
# Search by name...
contract_records = test_registry.search(contract_name=test_name)
assert len(contract_records) == 1, 'More than one record for {}'.format(test_name)
assert len(contract_records[0]) == 4, 'Registry record is the wrong length'
name, version, address, abi = contract_records[0]
assert name == test_name
assert address == test_addr
assert abi == test_abi
assert version == test_version
# ...or by address
contract_record = test_registry.search(contract_address=test_addr)
name, version, address, abi = contract_record
assert name == test_name
assert address == test_addr
assert abi == test_abi
assert version == test_version
# Check that searching for an unknown contract raises
with pytest.raises(BaseContractRegistry.UnknownContract):
test_registry.search(contract_name='this does not exist')
current_dataset = test_registry.read()
# Corrupt the registry with a duplicate address
current_dataset.append([test_name, test_addr, test_abi])
test_registry.write(current_dataset)
# Check that searching for an unknown contract raises
with pytest.raises(BaseContractRegistry.InvalidRegistry):
test_registry.search(contract_address=test_addr)
def test_individual_allocation_registry(get_random_checksum_address,
test_registry,
tempfile_path,

View File

@ -20,6 +20,7 @@ import os
from nucypher.blockchain.eth.deployers import NucypherTokenDeployer
from nucypher.blockchain.eth.sol.compile import SolidityCompiler, SourceDirs
from tests.constants import TEST_CONTRACTS_DIR
from tests.utils.blockchain import TesterBlockchain
@ -37,7 +38,7 @@ def test_nucypher_contract_compiled(testerchain, test_registry):
def test_multi_source_compilation(testerchain):
solidity_compiler = SolidityCompiler(source_dirs=[
(SolidityCompiler.default_contract_dir(), None),
(SolidityCompiler.default_contract_dir(), {TesterBlockchain.TEST_CONTRACTS_DIR})
(SolidityCompiler.default_contract_dir(), {TEST_CONTRACTS_DIR})
])
interfaces = solidity_compiler.compile()

View File

@ -15,110 +15,12 @@
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from decimal import Decimal, InvalidOperation
from web3 import Web3
from nucypher.blockchain.eth.token import NU, Stake
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
def test_NU(token_economics):
# Starting Small
min_allowed_locked = NU(token_economics.minimum_allowed_locked, 'NuNit')
assert token_economics.minimum_allowed_locked == int(min_allowed_locked.to_nunits())
min_NU_locked = int(str(token_economics.minimum_allowed_locked)[0:-18])
expected = NU(min_NU_locked, 'NU')
assert min_allowed_locked == expected
# Starting Big
min_allowed_locked = NU(min_NU_locked, 'NU')
assert token_economics.minimum_allowed_locked == int(min_allowed_locked)
assert token_economics.minimum_allowed_locked == int(min_allowed_locked.to_nunits())
assert str(min_allowed_locked) == '15000 NU'
# Alternate construction
assert NU(1, 'NU') == NU('1.0', 'NU') == NU(1.0, 'NU')
# Arithmetic
# NUs
one_nu = NU(1, 'NU')
zero_nu = NU(0, 'NU')
one_hundred_nu = NU(100, 'NU')
two_hundred_nu = NU(200, 'NU')
three_hundred_nu = NU(300, 'NU')
# Nits
one_nu_wei = NU(1, 'NuNit')
three_nu_wei = NU(3, 'NuNit')
assert three_nu_wei.to_tokens() == Decimal('3E-18')
assert one_nu_wei.to_tokens() == Decimal('1E-18')
# Base Operations
assert one_hundred_nu < two_hundred_nu < three_hundred_nu
assert one_hundred_nu <= two_hundred_nu <= three_hundred_nu
assert three_hundred_nu > two_hundred_nu > one_hundred_nu
assert three_hundred_nu >= two_hundred_nu >= one_hundred_nu
assert (one_hundred_nu + two_hundred_nu) == three_hundred_nu
assert (three_hundred_nu - two_hundred_nu) == one_hundred_nu
difference = one_nu - one_nu_wei
assert not difference == zero_nu
actual = float(difference.to_tokens())
expected = 0.999999999999999999
assert actual == expected
# 3.14 NU is 3_140_000_000_000_000_000 NuNit
pi_nuweis = NU(3.14, 'NU')
assert NU('3.14', 'NU') == pi_nuweis.to_nunits() == NU(3_140_000_000_000_000_000, 'NuNit')
# Mixed type operations
difference = NU('3.14159265', 'NU') - NU(1.1, 'NU')
assert difference == NU('2.04159265', 'NU')
result = difference + one_nu_wei
assert result == NU(2041592650000000001, 'NuNit')
# Similar to stake read + metadata operations in Staker
collection = [one_hundred_nu, two_hundred_nu, three_hundred_nu]
assert sum(collection) == NU('600', 'NU') == NU(600, 'NU') == NU(600.0, 'NU') == NU(600e+18, 'NuNit')
#
# Fractional Inputs
#
# A decimal amount of NuNit (i.e., a fraction of a NuNit)
pi_nuweis = NU('3.14', 'NuNit')
assert pi_nuweis == three_nu_wei # Floor
# A decimal amount of NU, which amounts to NuNit with decimals
pi_nus = NU('3.14159265358979323846', 'NU')
assert pi_nus == NU(3141592653589793238, 'NuNit') # Floor
# Positive Infinity
with pytest.raises(NU.InvalidAmount):
_inf = NU(float('infinity'), 'NU')
# Negative Infinity
with pytest.raises(NU.InvalidAmount):
_neg_inf = NU(float('-infinity'), 'NU')
# Not a Number
with pytest.raises(InvalidOperation):
_nan = NU(float('NaN'), 'NU')
# Rounding NUs
assert round(pi_nus, 2) == NU("3.14", "NU")
assert round(pi_nus, 1) == NU("3.1", "NU")
assert round(pi_nus, 0) == round(pi_nus) == NU("3", "NU")
def test_stake(testerchain, token_economics, agency):
token_agent, staking_agent, _policy_agent = agency

View File

@ -0,0 +1,30 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from nucypher.blockchain.economics import EconomicsFactory
@pytest.mark.usefixtures('agency')
def test_retrieving_from_blockchain(token_economics, test_registry):
economics = EconomicsFactory.get_economics(registry=test_registry)
assert economics.staking_deployment_parameters == token_economics.staking_deployment_parameters
assert economics.slashing_deployment_parameters == token_economics.slashing_deployment_parameters
assert economics.worklock_deployment_parameters == token_economics.worklock_deployment_parameters

View File

@ -0,0 +1,84 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import datetime
import maya
import pytest
from umbral.kfrags import KFrag
from nucypher.crypto.api import keccak_digest
from nucypher.policy.collections import PolicyCredential
@pytest.mark.usefixtures('blockchain_ursulas')
def test_decentralized_grant(blockchain_alice, blockchain_bob, agency):
# Setup the policy details
n = 3
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
label = b"this_is_the_path_to_which_access_is_being_granted"
# Create the Policy, Granting access to Bob
policy = blockchain_alice.grant(bob=blockchain_bob,
label=label,
m=2,
n=n,
rate=int(1e18), # one ether
expiration=policy_end_datetime)
# Check the policy ID
policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp))
assert policy_id == policy.id
# The number of accepted arrangements at least the number of Ursulas we're using (n)
assert len(policy._accepted_arrangements) >= n
# The number of actually enacted arrangements is exactly equal to n.
assert len(policy._enacted_arrangements) == n
# Let's look at the enacted arrangements.
for kfrag in policy.kfrags:
arrangement = policy._enacted_arrangements[kfrag]
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
retrieved_policy = arrangement.ursula.datastore.get_policy_arrangement(arrangement.id.hex().encode())
retrieved_kfrag = KFrag.from_bytes(retrieved_policy.kfrag)
assert kfrag == retrieved_kfrag
# Test PolicyCredential w/o TreasureMap
credential = policy.credential(with_treasure_map=False)
assert credential.alice_verifying_key == policy.alice.stamp
assert credential.label == policy.label
assert credential.expiration == policy.expiration
assert credential.policy_pubkey == policy.public_key
assert credential.treasure_map is None
cred_json = credential.to_json()
deserialized_cred = PolicyCredential.from_json(cred_json)
assert credential == deserialized_cred
# Test PolicyCredential w/ TreasureMap
credential = policy.credential()
assert credential.alice_verifying_key == policy.alice.stamp
assert credential.label == policy.label
assert credential.expiration == policy.expiration
assert credential.policy_pubkey == policy.public_key
assert credential.treasure_map == policy.treasure_map
cred_json = credential.to_json()
deserialized_cred = PolicyCredential.from_json(cred_json)
assert credential == deserialized_cred

View File

@ -0,0 +1,71 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from eth_account._utils.transactions import Transaction
from eth_utils import to_checksum_address
from nucypher.blockchain.eth.signers import Web3Signer
from nucypher.characters.lawful import Character
from nucypher.crypto.api import verify_eip_191
from nucypher.crypto.powers import (TransactingPower)
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
def test_character_transacting_power_signing(testerchain, agency, test_registry):
# Pretend to be a character.
eth_address = testerchain.etherbase_account
signer = Character(is_me=True, registry=test_registry, checksum_address=eth_address)
# Manually consume the power up
transacting_power = TransactingPower(password=INSECURE_DEVELOPMENT_PASSWORD,
signer=Web3Signer(testerchain.client),
account=eth_address)
signer._crypto_power.consume_power_up(transacting_power)
# Retrieve the power up
power = signer._crypto_power.power_ups(TransactingPower)
assert power == transacting_power
assert testerchain.transacting_power == power
assert power.is_active is True
assert power.is_unlocked is True
assert testerchain.transacting_power.is_unlocked is True
# Sign Message
data_to_sign = b'Premium Select Luxury Pencil Holder'
signature = power.sign_message(message=data_to_sign)
is_verified = verify_eip_191(address=eth_address, message=data_to_sign, signature=signature)
assert is_verified is True
# Sign Transaction
transaction_dict = {'nonce': testerchain.client.w3.eth.getTransactionCount(eth_address),
'gasPrice': testerchain.client.w3.eth.gasPrice,
'gas': 100000,
'from': eth_address,
'to': testerchain.unassigned_accounts[1],
'value': 1,
'data': b''}
signed_transaction = power.sign_transaction(transaction_dict=transaction_dict)
# Demonstrate that the transaction is valid RLP encoded.
restored_transaction = Transaction.from_bytes(serialized_bytes=signed_transaction)
restored_dict = restored_transaction.as_dict()
assert to_checksum_address(restored_dict['to']) == transaction_dict['to']

View File

@ -25,30 +25,8 @@ from nucypher.crypto.api import verify_eip_191
from nucypher.crypto.powers import SigningPower
from nucypher.policy.policies import Policy
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
from tests.utils.middleware import MockRestMiddleware, NodeIsDownMiddleware
from tests.utils.ursula import make_decentralized_ursulas, make_federated_ursulas
def test_new_federated_ursula_announces_herself(ursula_federated_test_config):
ursula_in_a_house, ursula_with_a_mouse = make_federated_ursulas(ursula_config=ursula_federated_test_config,
quantity=2,
know_each_other=False,
network_middleware=MockRestMiddleware())
# Neither Ursula knows about the other.
assert ursula_in_a_house.known_nodes == ursula_with_a_mouse.known_nodes
ursula_in_a_house.remember_node(ursula_with_a_mouse)
# OK, now, ursula_in_a_house knows about ursula_with_a_mouse, but not vice-versa.
assert ursula_with_a_mouse in ursula_in_a_house.known_nodes
assert ursula_in_a_house not in ursula_with_a_mouse.known_nodes
# But as ursula_in_a_house learns, she'll announce herself to ursula_with_a_mouse.
ursula_in_a_house.learn_from_teacher_node()
assert ursula_with_a_mouse in ursula_in_a_house.known_nodes
assert ursula_in_a_house in ursula_with_a_mouse.known_nodes
from tests.utils.middleware import NodeIsDownMiddleware
from tests.utils.ursula import make_decentralized_ursulas
def test_stakers_bond_to_ursulas(testerchain, test_registry, stakers, ursula_decentralized_test_config):

View File

@ -15,21 +15,14 @@
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from collections import namedtuple
import os
import pytest
from bytestring_splitter import VariableLengthBytestring
from constant_sorrow.constants import NOT_SIGNED
from eth_utils.address import to_checksum_address
from twisted.logger import LogLevel, globalLogPublisher
from nucypher.characters.base import Character
from nucypher.crypto.powers import TransactingPower
from nucypher.network.nicknames import nickname_from_seed
from nucypher.network.nodes import FleetStateTracker, Learner
from tests.utils.middleware import MockRestMiddleware
from tests.utils.ursula import make_federated_ursulas, make_ursula_for_staker
from tests.utils.ursula import make_ursula_for_staker
def test_blockchain_ursula_stamp_verification_tolerance(blockchain_ursulas, mocker):
@ -176,98 +169,3 @@ def test_invalid_workers_tolerance(testerchain,
assert worker not in lonely_blockchain_learner.known_nodes
# TODO: Write a similar test but for detached worker (#1075)
def test_emit_warning_upon_new_version(ursula_federated_test_config, caplog):
nodes = make_federated_ursulas(ursula_config=ursula_federated_test_config,
quantity=3,
know_each_other=False)
teacher, learner, new_node = nodes
learner.remember_node(teacher)
teacher.remember_node(learner)
teacher.remember_node(new_node)
new_node.TEACHER_VERSION = learner.LEARNER_VERSION + 1
warnings = []
def warning_trapper(event):
if event['log_level'] == LogLevel.warn:
warnings.append(event)
globalLogPublisher.addObserver(warning_trapper)
learner.learn_from_teacher_node()
assert len(warnings) == 1
assert warnings[0]['log_format'] == learner.unknown_version_message.format(new_node,
new_node.TEACHER_VERSION,
learner.LEARNER_VERSION)
# Now let's go a little further: make the version totally unrecognizable.
# First, there's enough garbage to at least scrape a potential checksum address
fleet_snapshot = os.urandom(32 + 4)
random_bytes = os.urandom(50) # lots of garbage in here
future_version = learner.LEARNER_VERSION + 42
version_bytes = future_version.to_bytes(2, byteorder="big")
crazy_bytes = fleet_snapshot + VariableLengthBytestring(version_bytes + random_bytes)
signed_crazy_bytes = bytes(teacher.stamp(crazy_bytes))
Response = namedtuple("MockResponse", ("content", "status_code"))
response = Response(content=signed_crazy_bytes + crazy_bytes, status_code=200)
learner._current_teacher_node = teacher
learner.network_middleware.get_nodes_via_rest = lambda *args, **kwargs: response
learner.learn_from_teacher_node()
# If you really try, you can read a node representation from the garbage
accidental_checksum = to_checksum_address(random_bytes[:20])
accidental_nickname = nickname_from_seed(accidental_checksum)[0]
accidental_node_repr = Character._display_name_template.format("Ursula", accidental_nickname, accidental_checksum)
assert len(warnings) == 2
assert warnings[1]['log_format'] == learner.unknown_version_message.format(accidental_node_repr,
future_version,
learner.LEARNER_VERSION)
# This time, however, there's not enough garbage to assume there's a checksum address...
random_bytes = os.urandom(2)
crazy_bytes = fleet_snapshot + VariableLengthBytestring(version_bytes + random_bytes)
signed_crazy_bytes = bytes(teacher.stamp(crazy_bytes))
response = Response(content=signed_crazy_bytes + crazy_bytes, status_code=200)
learner._current_teacher_node = teacher
learner.learn_from_teacher_node()
assert len(warnings) == 3
# ...so this time we get a "really unknown version message"
assert warnings[2]['log_format'] == learner.really_unknown_version_message.format(future_version,
learner.LEARNER_VERSION)
globalLogPublisher.removeObserver(warning_trapper)
def test_node_posts_future_version(federated_ursulas):
ursula = list(federated_ursulas)[0]
middleware = MockRestMiddleware()
warnings = []
def warning_trapper(event):
if event['log_level'] == LogLevel.warn:
warnings.append(event)
globalLogPublisher.addObserver(warning_trapper)
crazy_node = b"invalid-node"
middleware.get_nodes_via_rest(node=ursula,
announce_nodes=(crazy_node,))
assert len(warnings) == 1
future_node = list(federated_ursulas)[1]
future_node.TEACHER_VERSION = future_node.TEACHER_VERSION + 10
future_node_bytes = bytes(future_node)
middleware.get_nodes_via_rest(node=ursula,
announce_nodes=(future_node_bytes,))
assert len(warnings) == 2

View File

@ -93,6 +93,7 @@ def test_availability_tracker_success(blockchain_ursulas):
ursula._availability_tracker = None
@pytest.mark.skip('See PR #1863')
@pt.inlineCallbacks
def test_availability_tracker_integration(blockchain_ursulas, monkeypatch):

View File

@ -57,80 +57,6 @@ def test_blockchain_alice_finds_ursula_via_rest(blockchain_alice, blockchain_urs
assert ursula in blockchain_alice.known_nodes
def test_alice_creates_policy_with_correct_hrac(idle_federated_policy):
"""
Alice creates a Policy. It has the proper HRAC, unique per her, Bob, and the label
"""
alice = idle_federated_policy.alice
bob = idle_federated_policy.bob
assert idle_federated_policy.hrac() == keccak_digest(bytes(alice.stamp)
+ bytes(bob.stamp)
+ idle_federated_policy.label)
def test_alice_sets_treasure_map(enacted_federated_policy, federated_ursulas):
"""
Having enacted all the policies of a PolicyGroup, Alice creates a TreasureMap and ...... TODO
"""
enacted_federated_policy.publish_treasure_map(network_middleware=MockRestMiddleware())
treasure_map_index = bytes.fromhex(enacted_federated_policy.treasure_map.public_id())
treasure_map_as_set_on_network = list(federated_ursulas)[0].treasure_maps[treasure_map_index]
assert treasure_map_as_set_on_network == enacted_federated_policy.treasure_map
def test_treasure_map_stored_by_ursula_is_the_correct_one_for_bob(federated_alice, federated_bob, federated_ursulas,
enacted_federated_policy):
"""
The TreasureMap given by Alice to Ursula is the correct one for Bob; he can decrypt and read it.
"""
treasure_map_index = bytes.fromhex(enacted_federated_policy.treasure_map.public_id())
treasure_map_as_set_on_network = list(federated_ursulas)[0].treasure_maps[treasure_map_index]
hrac_by_bob = federated_bob.construct_policy_hrac(federated_alice.stamp, enacted_federated_policy.label)
assert enacted_federated_policy.hrac() == hrac_by_bob
hrac, map_id_by_bob = federated_bob.construct_hrac_and_map_id(federated_alice.stamp, enacted_federated_policy.label)
assert map_id_by_bob == treasure_map_as_set_on_network.public_id()
def test_bob_can_retreive_the_treasure_map_and_decrypt_it(enacted_federated_policy, federated_ursulas):
"""
Above, we showed that the TreasureMap saved on the network is the correct one for Bob. Here, we show
that Bob can retrieve it with only the information about which he is privy pursuant to the PolicyGroup.
"""
bob = enacted_federated_policy.bob
# Of course, in the real world, Bob has sufficient information to reconstitute a PolicyGroup, gleaned, we presume,
# through a side-channel with Alice.
# If Bob doesn't know about any Ursulas, he can't find the TreasureMap via the REST swarm:
with pytest.raises(bob.NotEnoughTeachers):
treasure_map_from_wire = bob.get_treasure_map(enacted_federated_policy.alice.stamp,
enacted_federated_policy.label)
# Bob finds out about one Ursula (in the real world, a seed node)
bob.remember_node(list(federated_ursulas)[0])
# ...and then learns about the rest of the network.
bob.learn_from_teacher_node(eager=True)
# Now he'll have better success finding that map.
treasure_map_from_wire = bob.get_treasure_map(enacted_federated_policy.alice.stamp,
enacted_federated_policy.label)
assert enacted_federated_policy.treasure_map == treasure_map_from_wire
def test_treasure_map_is_legit(enacted_federated_policy):
"""
Sure, the TreasureMap can get to Bob, but we also need to know that each Ursula in the TreasureMap is on the network.
"""
for ursula_address, _node_id in enacted_federated_policy.treasure_map:
assert ursula_address in enacted_federated_policy.bob.known_nodes.addresses()
@pytest.mark.skip("See Issue #1075") # TODO: Issue #1075
def test_vladimir_illegal_interface_key_does_not_propagate(blockchain_ursulas):
"""
@ -203,28 +129,3 @@ def test_alice_refuses_to_make_arrangement_unless_ursula_is_valid(blockchain_ali
idle_blockchain_policy.consider_arrangement(network_middleware=blockchain_alice.network_middleware,
arrangement=FakeArrangement(),
ursula=vladimir)
def test_alice_does_not_update_with_old_ursula_info(federated_alice, federated_ursulas):
ursula = list(federated_ursulas)[0]
old_metadata = bytes(ursula)
# Alice has remembered Ursula.
assert federated_alice.known_nodes[ursula.checksum_address] == ursula
# But now, Ursula wants to sign and date her interface info again. This causes a new timestamp.
ursula._sign_and_date_interface_info()
# Indeed, her metadata is not the same now.
assert bytes(ursula) != old_metadata
old_ursula = Ursula.from_bytes(old_metadata)
# Once Alice learns about Ursula's updated info...
federated_alice.remember_node(ursula)
# ...she can't learn about old ursula anymore.
federated_alice.remember_node(old_ursula)
new_metadata = bytes(federated_alice.known_nodes[ursula.checksum_address])
assert new_metadata != old_metadata

View File

@ -1,289 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import datetime
import maya
import os
import pytest
from umbral.kfrags import KFrag
from nucypher.characters.lawful import Bob, Enrico
from nucypher.config.characters import AliceConfiguration
from nucypher.crypto.api import keccak_digest
from nucypher.crypto.powers import DecryptingPower, SigningPower
from nucypher.policy.collections import PolicyCredential, Revocation
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
from tests.utils.middleware import MockRestMiddleware
@pytest.mark.usefixtures('blockchain_ursulas')
def test_decentralized_grant(blockchain_alice, blockchain_bob, agency):
# Setup the policy details
n = 3
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
label = b"this_is_the_path_to_which_access_is_being_granted"
# Create the Policy, Granting access to Bob
policy = blockchain_alice.grant(bob=blockchain_bob,
label=label,
m=2,
n=n,
rate=int(1e18), # one ether
expiration=policy_end_datetime)
# Check the policy ID
policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp))
assert policy_id == policy.id
# The number of accepted arrangements at least the number of Ursulas we're using (n)
assert len(policy._accepted_arrangements) >= n
# The number of actually enacted arrangements is exactly equal to n.
assert len(policy._enacted_arrangements) == n
# Let's look at the enacted arrangements.
for kfrag in policy.kfrags:
arrangement = policy._enacted_arrangements[kfrag]
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
retrieved_policy = arrangement.ursula.datastore.get_policy_arrangement(arrangement.id.hex().encode())
retrieved_kfrag = KFrag.from_bytes(retrieved_policy.kfrag)
assert kfrag == retrieved_kfrag
# Test PolicyCredential w/o TreasureMap
credential = policy.credential(with_treasure_map=False)
assert credential.alice_verifying_key == policy.alice.stamp
assert credential.label == policy.label
assert credential.expiration == policy.expiration
assert credential.policy_pubkey == policy.public_key
assert credential.treasure_map is None
cred_json = credential.to_json()
deserialized_cred = PolicyCredential.from_json(cred_json)
assert credential == deserialized_cred
# Test PolicyCredential w/ TreasureMap
credential = policy.credential()
assert credential.alice_verifying_key == policy.alice.stamp
assert credential.label == policy.label
assert credential.expiration == policy.expiration
assert credential.policy_pubkey == policy.public_key
assert credential.treasure_map == policy.treasure_map
cred_json = credential.to_json()
deserialized_cred = PolicyCredential.from_json(cred_json)
assert credential == deserialized_cred
@pytest.mark.usefixtures('federated_ursulas')
def test_federated_grant(federated_alice, federated_bob):
# Setup the policy details
m, n = 2, 3
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
label = b"this_is_the_path_to_which_access_is_being_granted"
# Create the Policy, granting access to Bob
policy = federated_alice.grant(federated_bob, label, m=m, n=n, expiration=policy_end_datetime)
# Check the policy ID
policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp))
assert policy_id == policy.id
# Check Alice's active policies
assert policy_id in federated_alice.active_policies
assert federated_alice.active_policies[policy_id] == policy
# The number of accepted arrangements at least the number of Ursulas we're using (n)
assert len(policy._accepted_arrangements) >= n
# The number of actually enacted arrangements is exactly equal to n.
assert len(policy._enacted_arrangements) == n
# Let's look at the enacted arrangements.
for kfrag in policy.kfrags:
arrangement = policy._enacted_arrangements[kfrag]
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
retrieved_policy = arrangement.ursula.datastore.get_policy_arrangement(arrangement.id.hex().encode())
retrieved_kfrag = KFrag.from_bytes(retrieved_policy.kfrag)
assert kfrag == retrieved_kfrag
def test_federated_alice_can_decrypt(federated_alice, federated_bob):
"""
Test that alice can decrypt data encrypted by an enrico
for her own derived policy pubkey.
"""
# Setup the policy details
m, n = 2, 3
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
label = b"this_is_the_path_to_which_access_is_being_granted"
policy = federated_alice.create_policy(
bob=federated_bob,
label=label,
m=m,
n=n,
expiration=policy_end_datetime,
)
enrico = Enrico.from_alice(
federated_alice,
policy.label,
)
plaintext = b"this is the first thing i'm encrypting ever."
# use the enrico to encrypt the message
message_kit, signature = enrico.encrypt_message(plaintext)
# decrypt the data
decrypted_data = federated_alice.verify_from(
enrico,
message_kit,
signature=signature,
decrypt=True,
label=policy.label
)
assert plaintext == decrypted_data
@pytest.mark.usefixtures('federated_ursulas')
def test_revocation(federated_alice, federated_bob):
m, n = 2, 3
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
label = b"revocation test"
policy = federated_alice.grant(federated_bob, label, m=m, n=n, expiration=policy_end_datetime)
# Test that all arrangements are included in the RevocationKit
for node_id, arrangement_id in policy.treasure_map:
assert policy.revocation_kit[node_id].arrangement_id == arrangement_id
# Test revocation kit's signatures
for revocation in policy.revocation_kit:
assert revocation.verify_signature(federated_alice.stamp.as_umbral_pubkey())
# Test Revocation deserialization
revocation = policy.revocation_kit[node_id]
revocation_bytes = bytes(revocation)
deserialized_revocation = Revocation.from_bytes(revocation_bytes)
assert deserialized_revocation == revocation
# Attempt to revoke the new policy
failed_revocations = federated_alice.revoke(policy)
assert len(failed_revocations) == 0
# Try to revoke the already revoked policy
already_revoked = federated_alice.revoke(policy)
assert len(already_revoked) == 3
def test_alices_powers_are_persistent(federated_ursulas, tmpdir):
# Create a non-learning AliceConfiguration
alice_config = AliceConfiguration(
config_root=os.path.join(tmpdir, 'nucypher-custom-alice-config'),
network_middleware=MockRestMiddleware(),
known_nodes=federated_ursulas,
start_learning_now=False,
federated_only=True,
save_metadata=False,
reload_metadata=False
)
# Generate keys and write them the disk
alice_config.initialize(password=INSECURE_DEVELOPMENT_PASSWORD)
# Unlock Alice's keyring
alice_config.keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
# Produce an Alice
alice = alice_config() # or alice_config.produce()
# Save Alice's node configuration file to disk for later use
alice_config_file = alice_config.to_configuration_file()
# Let's save Alice's public keys too to check they are correctly restored later
alices_verifying_key = alice.public_keys(SigningPower)
alices_receiving_key = alice.public_keys(DecryptingPower)
# Next, let's fix a label for all the policies we will create later.
label = b"this_is_the_path_to_which_access_is_being_granted"
# Even before creating the policies, we can know what will be its public key.
# This can be used by Enrico (i.e., a Data Source) to encrypt messages
# before Alice grants access to Bobs.
policy_pubkey = alice.get_policy_encrypting_key_from_label(label)
# Now, let's create a policy for some Bob.
m, n = 3, 4
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
bob = Bob(federated_only=True,
start_learning_now=False,
network_middleware=MockRestMiddleware())
bob_policy = alice.grant(bob, label, m=m, n=n, expiration=policy_end_datetime)
assert policy_pubkey == bob_policy.public_key
# ... and Alice and her configuration disappear.
del alice
del alice_config
###################################
# Some time passes. #
# ... #
# (jmyles plays the Song of Time) #
# ... #
# Alice appears again. #
###################################
# A new Alice is restored from the configuration file
new_alice_config = AliceConfiguration.from_configuration_file(
filepath=alice_config_file,
network_middleware=MockRestMiddleware(),
known_nodes=federated_ursulas,
start_learning_now=False,
)
# Alice unlocks her restored keyring from disk
new_alice_config.attach_keyring()
new_alice_config.keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
new_alice = new_alice_config()
# First, we check that her public keys are correctly restored
assert alices_verifying_key == new_alice.public_keys(SigningPower)
assert alices_receiving_key == new_alice.public_keys(DecryptingPower)
# Bob's eldest brother, Roberto, appears too
roberto = Bob(federated_only=True,
start_learning_now=False,
network_middleware=MockRestMiddleware())
# Alice creates a new policy for Roberto. Note how all the parameters
# except for the label (i.e., recipient, m, n, policy_end) are different
# from previous policy
m, n = 2, 5
policy_end_datetime = maya.now() + datetime.timedelta(days=3)
roberto_policy = new_alice.grant(roberto, label, m=m, n=n, expiration=policy_end_datetime)
# Both policies must share the same public key (i.e., the policy public key)
assert policy_pubkey == roberto_policy.public_key

View File

@ -1,284 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from constant_sorrow import constants
from cryptography.exceptions import InvalidSignature
from eth_account._utils.transactions import Transaction
from eth_utils import to_checksum_address
from nucypher.blockchain.eth.signers import Web3Signer
from nucypher.characters.lawful import Alice, Bob, Character, Enrico
from nucypher.crypto import api
from nucypher.crypto.api import verify_eip_191
from nucypher.crypto.powers import (CryptoPower, NoSigningPower, SigningPower, TransactingPower)
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
"""
Chapter 1: SIGNING
"""
def test_actor_without_signing_power_cannot_sign():
"""
We can create a Character with no real CryptoPower to speak of.
This Character can't even sign a message.
"""
cannot_sign = CryptoPower(power_ups=[])
non_signer = Character(crypto_power=cannot_sign,
start_learning_now=False,
federated_only=True)
# The non-signer's stamp doesn't work for signing...
with pytest.raises(NoSigningPower):
non_signer.stamp("something")
# ...or as a way to cast the (non-existent) public key to bytes.
with pytest.raises(NoSigningPower):
bytes(non_signer.stamp)
def test_actor_with_signing_power_can_sign():
"""
However, simply giving that character a PowerUp bestows the power to sign.
Instead of having a Character verify the signature, we'll use the lower level API.
"""
message = b"Llamas."
signer = Character(crypto_power_ups=[SigningPower], is_me=True,
start_learning_now=False, federated_only=True)
stamp_of_the_signer = signer.stamp
# We can use the signer's stamp to sign a message (since the signer is_me)...
signature = stamp_of_the_signer(message)
# ...or to get the signer's public key for verification purposes.
# (note: we use the private _der_encoded_bytes here to test directly against the API, instead of Character)
verification = api.verify_ecdsa(message, signature._der_encoded_bytes(),
stamp_of_the_signer.as_umbral_pubkey())
assert verification is True
def test_anybody_can_verify():
"""
In the last example, we used the lower-level Crypto API to verify the signature.
Here, we show that anybody can do it without needing to directly access Crypto.
"""
# Alice can sign by default, by dint of her _default_crypto_powerups.
alice = Alice(federated_only=True, start_learning_now=False)
# So, our story is fairly simple: an everyman meets Alice.
somebody = Character(start_learning_now=False, federated_only=True)
# Alice signs a message.
message = b"A message for all my friends who can only verify and not sign."
signature = alice.stamp(message)
# Our everyman can verify it.
cleartext = somebody.verify_from(alice, message, signature, decrypt=False)
assert cleartext is constants.NO_DECRYPTION_PERFORMED
# Of course, verification fails with any fake message
with pytest.raises(InvalidSignature):
fake = b"McLovin 892 Momona St. Honolulu, HI 96820"
_ = somebody.verify_from(alice, fake, signature, decrypt=False)
# Signature verification also works when Alice is not living with our
# everyman in the same process, and he only knows her by her public key
alice_pubkey_bytes = bytes(alice.stamp)
hearsay_alice = Character.from_public_keys({SigningPower: alice_pubkey_bytes})
cleartext = somebody.verify_from(hearsay_alice, message, signature, decrypt=False)
assert cleartext is constants.NO_DECRYPTION_PERFORMED
hearsay_alice = Character.from_public_keys(verifying_key=alice_pubkey_bytes)
cleartext = somebody.verify_from(hearsay_alice, message, signature, decrypt=False)
assert cleartext is constants.NO_DECRYPTION_PERFORMED
def test_character_transacting_power_signing(testerchain, agency, test_registry):
# Pretend to be a character.
eth_address = testerchain.etherbase_account
signer = Character(is_me=True, registry=test_registry, checksum_address=eth_address)
# Manually consume the power up
transacting_power = TransactingPower(password=INSECURE_DEVELOPMENT_PASSWORD,
signer=Web3Signer(testerchain.client),
account=eth_address)
signer._crypto_power.consume_power_up(transacting_power)
# Retrieve the power up
power = signer._crypto_power.power_ups(TransactingPower)
assert power == transacting_power
assert testerchain.transacting_power == power
assert power.is_active is True
assert power.is_unlocked is True
assert testerchain.transacting_power.is_unlocked is True
# Sign Message
data_to_sign = b'Premium Select Luxury Pencil Holder'
signature = power.sign_message(message=data_to_sign)
is_verified = verify_eip_191(address=eth_address, message=data_to_sign, signature=signature)
assert is_verified is True
# Sign Transaction
transaction_dict = {'nonce': testerchain.client.w3.eth.getTransactionCount(eth_address),
'gasPrice': testerchain.client.w3.eth.gasPrice,
'gas': 100000,
'from': eth_address,
'to': testerchain.unassigned_accounts[1],
'value': 1,
'data': b''}
signed_transaction = power.sign_transaction(transaction_dict=transaction_dict)
# Demonstrate that the transaction is valid RLP encoded.
restored_transaction = Transaction.from_bytes(serialized_bytes=signed_transaction)
restored_dict = restored_transaction.as_dict()
assert to_checksum_address(restored_dict['to']) == transaction_dict['to']
"""
Chapter 2: ENCRYPTION
"""
def test_anybody_can_encrypt():
"""
Similar to anybody_can_verify() above; we show that anybody can encrypt.
"""
someone = Character(start_learning_now=False, federated_only=True)
bob = Bob(is_me=False, federated_only=True)
cleartext = b"This is Officer Rod Farva. Come in, Ursula! Come in Ursula!"
ciphertext, signature = someone.encrypt_for(bob, cleartext, sign=False)
assert signature == constants.NOT_SIGNED
assert ciphertext is not None
def test_node_deployer(federated_ursulas):
for ursula in federated_ursulas:
deployer = ursula.get_deployer()
assert deployer.options['https_port'] == ursula.rest_information()[0].port
assert deployer.application == ursula.rest_app
"""
What follows are various combinations of signing and encrypting, to match
real-world scenarios.
"""
def test_sign_cleartext_and_encrypt(federated_alice, federated_bob):
"""
Exhibit One: federated_alice signs the cleartext and encrypts her signature inside
the ciphertext.
"""
message = b"Have you accepted my answer on StackOverflow yet?"
message_kit, _signature = federated_alice.encrypt_for(federated_bob, message,
sign_plaintext=True)
# Notice that our function still returns the signature here, in case federated_alice
# wants to do something else with it, such as post it publicly for later
# public verifiability.
# However, we can expressly refrain from passing the Signature, and the
# verification still works:
cleartext = federated_bob.verify_from(federated_alice, message_kit, signature=None,
decrypt=True)
assert cleartext == message
def test_encrypt_and_sign_the_ciphertext(federated_alice, federated_bob):
"""
Now, federated_alice encrypts first and then signs the ciphertext, providing a
Signature that is completely separate from the message.
This is useful in a scenario in which federated_bob needs to prove authenticity
publicly without disclosing contents.
"""
message = b"We have a reaaall problem."
message_kit, signature = federated_alice.encrypt_for(federated_bob, message,
sign_plaintext=False)
cleartext = federated_bob.verify_from(federated_alice, message_kit, signature, decrypt=True)
assert cleartext == message
def test_encrypt_and_sign_including_signature_in_both_places(federated_alice, federated_bob):
"""
Same as above, but showing that we can include the signature in both
the plaintext (to be found upon decryption) and also passed into
verify_from() (eg, gleaned over a side-channel).
"""
message = b"We have a reaaall problem."
message_kit, signature = federated_alice.encrypt_for(federated_bob, message,
sign_plaintext=True)
cleartext = federated_bob.verify_from(federated_alice, message_kit, signature,
decrypt=True)
assert cleartext == message
def test_encrypt_but_do_not_sign(federated_alice, federated_bob):
"""
Finally, federated_alice encrypts but declines to sign.
This is useful in a scenario in which federated_alice wishes to plausibly disavow
having created this content.
"""
# TODO: How do we accurately demonstrate this test safely, if at all?
message = b"If Bonnie comes home and finds an unencrypted private key in her keystore, I'm gonna get divorced."
# Alice might also want to encrypt a message but *not* sign it, in order
# to refrain from creating evidence that can prove she was the
# original sender.
message_kit, not_signature = federated_alice.encrypt_for(federated_bob, message, sign=False)
# The message is not signed...
assert not_signature == constants.NOT_SIGNED
# ...and thus, the message is not verified.
with pytest.raises(InvalidSignature):
federated_bob.verify_from(federated_alice, message_kit, decrypt=True)
def test_alice_can_decrypt(federated_alice):
label = b"boring test label"
policy_pubkey = federated_alice.get_policy_encrypting_key_from_label(label)
enrico = Enrico(policy_encrypting_key=policy_pubkey)
message = b"boring test message"
message_kit, signature = enrico.encrypt_message(message=message)
# Interesting thing: if Alice wants to decrypt, she needs to provide the label directly.
cleartext = federated_alice.verify_from(stranger=enrico,
message_kit=message_kit,
signature=signature,
decrypt=True,
label=label)
assert cleartext == message

View File

@ -1,132 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import contextlib
import json
from io import StringIO
import os
import pytest
import shutil
from click.testing import CliRunner
from datetime import datetime
from nucypher.blockchain.eth.registry import InMemoryContractRegistry, LocalContractRegistry
from nucypher.characters.control.emitters import StdoutEmitter
from nucypher.config.characters import StakeHolderConfiguration, UrsulaConfiguration
from tests.constants import (
BASE_TEMP_DIR,
BASE_TEMP_PREFIX,
DATETIME_FORMAT,
MOCK_ALLOCATION_INFILE,
MOCK_CUSTOM_INSTALLATION_PATH,
MOCK_CUSTOM_INSTALLATION_PATH_2
)
@pytest.fixture(scope='function', autouse=True)
def stdout_trap(mocker):
trap = StringIO()
mocker.patch('sys.stdout', new=trap)
return trap
@pytest.fixture(scope='function')
def test_emitter(mocker, stdout_trap):
mocker.patch('sys.stdout', new=stdout_trap)
return StdoutEmitter()
@pytest.fixture(scope='module')
def click_runner():
runner = CliRunner()
yield runner
@pytest.fixture(scope='session')
def nominal_federated_configuration_fields():
config = UrsulaConfiguration(dev_mode=True, federated_only=True)
config_fields = config.static_payload()
yield tuple(config_fields.keys())
del config
@pytest.fixture(scope='module')
def mock_allocation_infile(testerchain, token_economics, get_random_checksum_address):
accounts = [get_random_checksum_address() for _ in range(10)]
# accounts = testerchain.unassigned_accounts
allocation_data = list()
amount = 2 * token_economics.minimum_allowed_locked
min_periods = token_economics.minimum_locked_periods
for account in accounts:
substake = [{'checksum_address': account, 'amount': amount, 'lock_periods': min_periods + i} for i in range(24)]
allocation_data.extend(substake)
with open(MOCK_ALLOCATION_INFILE, 'w') as file:
file.write(json.dumps(allocation_data))
yield MOCK_ALLOCATION_INFILE
if os.path.isfile(MOCK_ALLOCATION_INFILE):
os.remove(MOCK_ALLOCATION_INFILE)
@pytest.fixture(scope='function')
def new_local_registry():
filename = f'{BASE_TEMP_PREFIX}mock-empty-registry-{datetime.now().strftime(DATETIME_FORMAT)}.json'
registry_filepath = os.path.join(BASE_TEMP_DIR, filename)
registry = LocalContractRegistry(filepath=registry_filepath)
registry.write(InMemoryContractRegistry().read())
yield registry
if os.path.exists(registry_filepath):
os.remove(registry_filepath)
@pytest.fixture(scope='module')
def custom_filepath():
_custom_filepath = MOCK_CUSTOM_INSTALLATION_PATH
with contextlib.suppress(FileNotFoundError):
shutil.rmtree(_custom_filepath, ignore_errors=True)
yield _custom_filepath
with contextlib.suppress(FileNotFoundError):
shutil.rmtree(_custom_filepath, ignore_errors=True)
@pytest.fixture(scope='module')
def custom_filepath_2():
_custom_filepath = MOCK_CUSTOM_INSTALLATION_PATH_2
with contextlib.suppress(FileNotFoundError):
shutil.rmtree(_custom_filepath, ignore_errors=True)
try:
yield _custom_filepath
finally:
with contextlib.suppress(FileNotFoundError):
shutil.rmtree(_custom_filepath, ignore_errors=True)
@pytest.fixture(scope='module')
def worker_configuration_file_location(custom_filepath):
_configuration_file_location = os.path.join(MOCK_CUSTOM_INSTALLATION_PATH,
UrsulaConfiguration.generate_filename())
return _configuration_file_location
@pytest.fixture(scope='module')
def stakeholder_configuration_file_location(custom_filepath):
_configuration_file_location = os.path.join(MOCK_CUSTOM_INSTALLATION_PATH,
StakeHolderConfiguration.generate_filename())
return _configuration_file_location

View File

@ -57,7 +57,7 @@ NUMBER_OF_MOCK_KEYSTORE_ACCOUNTS = NUMBER_OF_ETH_TEST_ACCOUNTS
# Testerchain
#
TEST_CONTRACTS_DIR = Path(BASE_DIR) / 'tests' / 'blockchain' / 'eth' / 'contracts' / 'contracts'
TEST_CONTRACTS_DIR = Path(BASE_DIR) / 'tests' / 'contracts' / 'contracts'
ONE_YEAR_IN_SECONDS = ((60 * 60) * 24) * 365

Some files were not shown because too many files have changed in this diff Show More