mirror of https://github.com/nucypher/nucypher.git
commit
1f4acace8f
|
@ -102,7 +102,7 @@ policy = ALICE.grant(BOB,
|
|||
expiration=policy_end_datetime)
|
||||
|
||||
assert policy.public_key == policy_pubkey
|
||||
policy.publishing_mutex.block_until_complete()
|
||||
policy.treasure_map_publisher.block_until_complete()
|
||||
|
||||
# Alice puts her public key somewhere for Bob to find later...
|
||||
alices_pubkey_bytes_saved_for_posterity = bytes(ALICE.stamp)
|
||||
|
|
|
@ -141,7 +141,7 @@ policy = alicia.grant(bob=doctor_strange,
|
|||
m=m,
|
||||
n=n,
|
||||
expiration=policy_end_datetime)
|
||||
policy.publishing_mutex.block_until_complete()
|
||||
policy.treasure_map_publisher.block_until_complete()
|
||||
print("Done!")
|
||||
|
||||
# For the demo, we need a way to share with Bob some additional info
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
More logging added for arrangement proposal failures, and more suitable exceptions thrown.
|
|
@ -0,0 +1 @@
|
|||
Arrangement proposals and policy enactment are performed in parallel, with more nodes being considered as some of the requests fail. This improves granting reliability.
|
|
@ -1760,9 +1760,13 @@ class WeightedSampler:
|
|||
"""
|
||||
|
||||
def __init__(self, weighted_elements: Dict[Any, int]):
|
||||
elements, weights = zip(*weighted_elements.items())
|
||||
if weighted_elements:
|
||||
elements, weights = zip(*weighted_elements.items())
|
||||
else:
|
||||
elements, weights = [], []
|
||||
self.totals = list(accumulate(weights))
|
||||
self.elements = elements
|
||||
self.__length = len(self.totals)
|
||||
|
||||
def sample_no_replacement(self, rng, quantity: int) -> list:
|
||||
"""
|
||||
|
@ -1780,25 +1784,26 @@ class WeightedSampler:
|
|||
if quantity > len(self):
|
||||
raise ValueError("Cannot sample more than the total amount of elements without replacement")
|
||||
|
||||
totals = self.totals.copy()
|
||||
samples = []
|
||||
|
||||
for i in range(quantity):
|
||||
position = rng.randint(0, totals[-1] - 1)
|
||||
idx = bisect_right(totals, position)
|
||||
position = rng.randint(0, self.totals[-1] - 1)
|
||||
idx = bisect_right(self.totals, position)
|
||||
samples.append(self.elements[idx])
|
||||
|
||||
# Adjust the totals so that they correspond
|
||||
# to the weight of the element `idx` being set to 0.
|
||||
prev_total = totals[idx - 1] if idx > 0 else 0
|
||||
weight = totals[idx] - prev_total
|
||||
for j in range(idx, len(totals)):
|
||||
totals[j] -= weight
|
||||
prev_total = self.totals[idx - 1] if idx > 0 else 0
|
||||
weight = self.totals[idx] - prev_total
|
||||
for j in range(idx, len(self.totals)):
|
||||
self.totals[j] -= weight
|
||||
|
||||
self.__length -= quantity
|
||||
|
||||
return samples
|
||||
|
||||
def __len__(self):
|
||||
return len(self.totals)
|
||||
return self.__length
|
||||
|
||||
|
||||
class StakersReservoir:
|
||||
|
|
|
@ -120,14 +120,13 @@ class AliceInterface(CharacterPublicInterface):
|
|||
n=n,
|
||||
value=value,
|
||||
rate=rate,
|
||||
expiration=expiration,
|
||||
discover_on_this_thread=True)
|
||||
expiration=expiration)
|
||||
|
||||
new_policy.publishing_mutex.block_until_success_is_reasonably_likely()
|
||||
new_policy.treasure_map_publisher.block_until_success_is_reasonably_likely()
|
||||
|
||||
response_data = {'treasure_map': new_policy.treasure_map,
|
||||
'policy_encrypting_key': new_policy.public_key,
|
||||
'alice_verifying_key': new_policy.alice.stamp}
|
||||
'alice_verifying_key': new_policy.alice_verifying_key}
|
||||
|
||||
return response_data
|
||||
|
||||
|
|
|
@ -297,7 +297,6 @@ class Alice(Character, BlockchainPolicyAuthor):
|
|||
bob: "Bob",
|
||||
label: bytes,
|
||||
handpicked_ursulas: set = None,
|
||||
discover_on_this_thread: bool = True,
|
||||
timeout: int = None,
|
||||
publish_treasure_map: bool = True,
|
||||
block_until_success_is_reasonably_likely: bool = True,
|
||||
|
@ -332,7 +331,7 @@ class Alice(Character, BlockchainPolicyAuthor):
|
|||
# If we're federated only, we need to block to make sure we have enough nodes.
|
||||
if self.federated_only and len(self.known_nodes) < policy.n:
|
||||
good_to_go = self.block_until_number_of_known_nodes_is(number_of_nodes_to_know=policy.n,
|
||||
learn_on_this_thread=discover_on_this_thread,
|
||||
learn_on_this_thread=True,
|
||||
timeout=timeout)
|
||||
if not good_to_go:
|
||||
raise ValueError(
|
||||
|
@ -341,20 +340,17 @@ class Alice(Character, BlockchainPolicyAuthor):
|
|||
"know which nodes to use. Either pass them here or when you make the Policy, "
|
||||
"or run the learning loop on a network with enough Ursulas.".format(policy.n))
|
||||
|
||||
self.log.debug(f"Making arrangements for {policy} ... ")
|
||||
policy.make_arrangements(network_middleware=self.network_middleware,
|
||||
handpicked_ursulas=handpicked_ursulas,
|
||||
discover_on_this_thread=discover_on_this_thread)
|
||||
|
||||
# REST call happens here, as does population of TreasureMap.
|
||||
self.log.debug(f"Enacting {policy} ... ")
|
||||
|
||||
# TODO: Make it optional to publish to blockchain? Or is this presumptive based on the `Policy` type?
|
||||
policy.enact(network_middleware=self.network_middleware, publish_treasure_map=publish_treasure_map)
|
||||
enacted_policy = policy.enact(network_middleware=self.network_middleware,
|
||||
handpicked_ursulas=handpicked_ursulas,
|
||||
publish_treasure_map=publish_treasure_map)
|
||||
|
||||
self.add_active_policy(enacted_policy)
|
||||
|
||||
if publish_treasure_map and block_until_success_is_reasonably_likely:
|
||||
policy.publishing_mutex.block_until_success_is_reasonably_likely()
|
||||
return policy # Now with TreasureMap affixed!
|
||||
enacted_policy.treasure_map_publisher.block_until_success_is_reasonably_likely()
|
||||
return enacted_policy
|
||||
|
||||
def get_policy_encrypting_key_from_label(self, label: bytes) -> UmbralPublicKey:
|
||||
alice_delegating_power = self._crypto_power.power_ups(DelegatingPower)
|
||||
|
|
|
@ -139,13 +139,16 @@ class Amonia(Alice):
|
|||
return alice_clone
|
||||
|
||||
@staticmethod
|
||||
def enact_without_tabulating_responses(policy, network_middleware, *_args, **_kwargs):
|
||||
for arrangement in policy._Policy__assign_kfrags():
|
||||
arrangement_message_kit = arrangement.encrypt_payload_for_ursula()
|
||||
def enact_without_tabulating_responses(policy, network_middleware, arrangements, publication_transaction, **_kwargs):
|
||||
for ursula, kfrag in zip(arrangements, policy.kfrags):
|
||||
arrangement = arrangements[ursula]
|
||||
payload = policy._make_enactment_payload(publication_transaction, kfrag)
|
||||
message_kit, _signature = policy.alice.encrypt_for(ursula, payload)
|
||||
|
||||
try:
|
||||
network_middleware.enact_policy(arrangement.ursula,
|
||||
network_middleware.enact_policy(ursula,
|
||||
arrangement.id,
|
||||
arrangement_message_kit.to_bytes())
|
||||
message_kit.to_bytes())
|
||||
except Exception as e:
|
||||
# I don't care what went wrong - I will keep trying to ram arrangements through.
|
||||
continue
|
||||
|
@ -156,9 +159,9 @@ class Amonia(Alice):
|
|||
"""
|
||||
|
||||
def what_do_you_mean_you_dont_tip(policy, *args, **kwargs):
|
||||
policy.publish_transaction = b"He convinced me, gimme back my $"
|
||||
return b"He convinced me, gimme back my $"
|
||||
|
||||
with patch("nucypher.policy.policies.BlockchainPolicy.publish_to_blockchain", what_do_you_mean_you_dont_tip):
|
||||
with patch("nucypher.policy.policies.BlockchainPolicy._publish_to_blockchain", what_do_you_mean_you_dont_tip):
|
||||
return super().grant(*args, **kwargs)
|
||||
|
||||
def circumvent_safegaurds_and_grant_without_paying(self, *args, **kwargs):
|
||||
|
@ -167,7 +170,7 @@ class Amonia(Alice):
|
|||
|
||||
Can I grant for free if I change the client code to my liking?
|
||||
"""
|
||||
with patch("nucypher.policy.policies.Policy.enact", self.enact_without_tabulating_responses):
|
||||
with patch("nucypher.policy.policies.Policy._enact_arrangements", self.enact_without_tabulating_responses):
|
||||
return self.grant_without_paying(*args, **kwargs)
|
||||
|
||||
def grant_while_paying_the_wrong_nodes(self,
|
||||
|
@ -180,28 +183,23 @@ class Amonia(Alice):
|
|||
an on-chain Policy using PolicyManager, I'm hoping Ursula won't notice.
|
||||
"""
|
||||
|
||||
def publish_wrong_payee_address_to_blockchain(policy, *args, **kwargs):
|
||||
receipt = policy.author.policy_agent.create_policy(
|
||||
policy_id=policy.hrac()[:HRAC_LENGTH], # bytes16 _policyID
|
||||
author_address=policy.author.checksum_address,
|
||||
def publish_wrong_payee_address_to_blockchain(policy, _ursulas):
|
||||
receipt = policy.alice.policy_agent.create_policy(
|
||||
policy_id=policy.hrac, # bytes16 _policyID
|
||||
author_address=policy.alice.checksum_address,
|
||||
value=policy.value,
|
||||
end_timestamp=policy.expiration.epoch, # uint16 _numberOfPeriods
|
||||
node_addresses=[f.checksum_address for f in ursulas_to_pay_instead] # address[] memory _nodes
|
||||
)
|
||||
|
||||
# Capture Response
|
||||
policy.receipt = receipt
|
||||
policy.publish_transaction = receipt['transactionHash']
|
||||
policy.is_published = True
|
||||
return receipt['transactionHash']
|
||||
|
||||
return receipt
|
||||
|
||||
with patch("nucypher.policy.policies.BlockchainPolicy.publish_to_blockchain",
|
||||
with patch("nucypher.policy.policies.BlockchainPolicy._publish_to_blockchain",
|
||||
publish_wrong_payee_address_to_blockchain):
|
||||
with patch("nucypher.policy.policies.Policy.enact", self.enact_without_tabulating_responses):
|
||||
with patch("nucypher.policy.policies.Policy._enact_arrangements", self.enact_without_tabulating_responses):
|
||||
return super().grant(handpicked_ursulas=ursulas_to_trick_into_working_for_free, *args, **kwargs)
|
||||
|
||||
def use_ursula_as_an_involuntary_and_unbeknownst_cdn(self, policy, sucker_ursula):
|
||||
def use_ursula_as_an_involuntary_and_unbeknownst_cdn(self, policy, bob, sucker_ursula):
|
||||
"""
|
||||
Ursula is a sucker.
|
||||
|
||||
|
@ -223,7 +221,7 @@ class Amonia(Alice):
|
|||
# I'll include a small portion of this awful film in a new message kit. We don't care about the signature for bob.
|
||||
not_the_bees = b"Not the bees!" + int(i).to_bytes(length=4, byteorder="big")
|
||||
like_a_map_but_awful.message_kit, _signature_for_bob_which_is_never_Used = encrypt_and_sign(
|
||||
policy.bob.public_keys(DecryptingPower),
|
||||
bob.public_keys(DecryptingPower),
|
||||
plaintext=not_the_bees,
|
||||
signer=self.stamp,
|
||||
)
|
||||
|
|
|
@ -136,10 +136,10 @@ UmbralMessageKit = PolicyMessageKit # Temporarily, until serialization w/ Enric
|
|||
|
||||
class RevocationKit:
|
||||
|
||||
def __init__(self, policy: 'Policy', signer: 'SignatureStamp'):
|
||||
def __init__(self, treasure_map, signer: 'SignatureStamp'):
|
||||
from nucypher.policy.collections import Revocation
|
||||
self.revocations = dict()
|
||||
for node_id, arrangement_id in policy.treasure_map:
|
||||
for node_id, arrangement_id in treasure_map:
|
||||
self.revocations[node_id] = Revocation(arrangement_id, signer=signer)
|
||||
|
||||
def __iter__(self):
|
||||
|
|
|
@ -193,8 +193,7 @@ class RestMiddleware:
|
|||
backend=default_backend())
|
||||
return certificate
|
||||
|
||||
def propose_arrangement(self, arrangement):
|
||||
node = arrangement.ursula
|
||||
def propose_arrangement(self, node, arrangement):
|
||||
response = self.client.post(node_or_sprout=node,
|
||||
path="consider_arrangement",
|
||||
data=bytes(arrangement),
|
||||
|
|
|
@ -207,7 +207,7 @@ def _make_rest_app(datastore: Datastore, this_node, domain: str, log: Logger) ->
|
|||
with datastore.describe(PolicyArrangement, arrangement.id.hex(), writeable=True) as new_policy_arrangement:
|
||||
new_policy_arrangement.arrangement_id = arrangement.id.hex().encode()
|
||||
new_policy_arrangement.expiration = arrangement.expiration
|
||||
new_policy_arrangement.alice_verifying_key = arrangement.alice.stamp.as_umbral_pubkey()
|
||||
new_policy_arrangement.alice_verifying_key = arrangement.alice_verifying_key
|
||||
|
||||
# TODO: Fine, we'll add the arrangement here, but if we never hear from Alice again to enact it,
|
||||
# we need to prune it at some point. #1700
|
||||
|
|
|
@ -176,10 +176,10 @@ class TreasureMap:
|
|||
nodes_as_bytes += to_canonical_address(ursula_id) + arrangement_id
|
||||
return nodes_as_bytes
|
||||
|
||||
def add_arrangement(self, arrangement):
|
||||
def add_arrangement(self, ursula, arrangement):
|
||||
if self.destinations == NO_DECRYPTION_PERFORMED:
|
||||
raise TypeError("This TreasureMap is encrypted. You can't add another node without decrypting it.")
|
||||
self.destinations[arrangement.ursula.checksum_address] = arrangement.id # TODO: 1995
|
||||
self.destinations[ursula.checksum_address] = arrangement.id # TODO: 1995
|
||||
|
||||
def public_id(self) -> str:
|
||||
"""
|
||||
|
|
|
@ -342,59 +342,3 @@ class Card:
|
|||
|
||||
def delete(self) -> None:
|
||||
os.remove(str(self.filepath))
|
||||
|
||||
|
||||
class PolicyCredential: # TODO: Rename this. It is not a credential in any way.
|
||||
"""
|
||||
A portable structure that contains information necessary for Alice or Bob
|
||||
to utilize the policy on the network that the credential describes.
|
||||
"""
|
||||
|
||||
def __init__(self, alice_verifying_key, label, expiration, policy_pubkey,
|
||||
treasure_map=None):
|
||||
self.alice_verifying_key = alice_verifying_key
|
||||
self.label = label
|
||||
self.expiration = expiration
|
||||
self.policy_pubkey = policy_pubkey
|
||||
self.treasure_map = treasure_map
|
||||
|
||||
def to_json(self):
|
||||
"""
|
||||
Serializes the PolicyCredential to JSON.
|
||||
"""
|
||||
cred_dict = {
|
||||
'alice_verifying_key': bytes(self.alice_verifying_key).hex(),
|
||||
'label': self.label.hex(),
|
||||
'expiration': self.expiration.iso8601(),
|
||||
'policy_pubkey': bytes(self.policy_pubkey).hex()
|
||||
}
|
||||
|
||||
if self.treasure_map is not None:
|
||||
cred_dict['treasure_map'] = bytes(self.treasure_map).hex()
|
||||
|
||||
return json.dumps(cred_dict)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data: str):
|
||||
"""Deserializes the PolicyCredential from JSON."""
|
||||
cred_json = json.loads(data)
|
||||
alice_verifying_key = UmbralPublicKey.from_bytes(cred_json['alice_verifying_key'], decoder=bytes.fromhex)
|
||||
label = bytes.fromhex(cred_json['label'])
|
||||
expiration = maya.MayaDT.from_iso8601(cred_json['expiration'])
|
||||
policy_pubkey = UmbralPublicKey.from_bytes(cred_json['policy_pubkey'], decoder=bytes.fromhex)
|
||||
treasure_map = None
|
||||
if 'treasure_map' in cred_json:
|
||||
# TODO: Support unsigned treasuremaps?
|
||||
treasure_map = SignedTreasureMap.from_bytes(bytes.fromhex(cred_json['treasure_map']))
|
||||
|
||||
return cls(alice_verifying_key,
|
||||
label,
|
||||
expiration,
|
||||
policy_pubkey,
|
||||
treasure_map)
|
||||
|
||||
def __eq__(self, other):
|
||||
return ((self.alice_verifying_key == other.alice_verifying_key) and
|
||||
(self.label == other.label) and
|
||||
(self.expiration == other.expiration) and
|
||||
(self.policy_pubkey == other.policy_pubkey))
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,304 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import time
|
||||
from queue import Queue, Empty
|
||||
from threading import Thread, Event, Lock, Timer, get_ident
|
||||
from typing import Callable, List, Any, Optional, Dict
|
||||
|
||||
from constant_sorrow.constants import PRODUCER_STOPPED, TIMEOUT_TRIGGERED
|
||||
from twisted._threads import AlreadyQuit
|
||||
from twisted.python.threadpool import ThreadPool
|
||||
|
||||
|
||||
class Success:
|
||||
def __init__(self, value, result):
|
||||
self.value = value
|
||||
self.result = result
|
||||
|
||||
class Failure:
|
||||
def __init__(self, value, exception):
|
||||
self.value = value
|
||||
self.exception = exception
|
||||
|
||||
|
||||
class Cancelled(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SetOnce:
|
||||
"""
|
||||
A convenience wrapper for a value that can be set once (which can be waited on),
|
||||
and cannot be overwritten (unless cleared).
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._lock = Lock()
|
||||
self._set_event = Event()
|
||||
self._value = None
|
||||
|
||||
def set(self, value):
|
||||
with self._lock:
|
||||
if not self._set_event.is_set():
|
||||
self._value = value
|
||||
self._set_event.set()
|
||||
|
||||
def is_set(self):
|
||||
return self._set_event.is_set()
|
||||
|
||||
def get_and_clear(self):
|
||||
with self._lock:
|
||||
value = self._value
|
||||
self._value = None
|
||||
self._set_event.clear()
|
||||
return value
|
||||
|
||||
def get(self):
|
||||
self._set_event.wait()
|
||||
return self._value
|
||||
|
||||
|
||||
class WorkerPool:
|
||||
"""
|
||||
A generalized class that can start multiple workers in a thread pool with values
|
||||
drawn from the given value factory object,
|
||||
and wait for their completion and a given number of successes
|
||||
(a worker returning something without throwing an exception).
|
||||
"""
|
||||
|
||||
class TimedOut(Exception):
|
||||
"Raised if waiting for the target number of successes timed out."
|
||||
|
||||
class OutOfValues(Exception):
|
||||
"Raised if the value factory is out of values, but the target number was not reached."
|
||||
|
||||
def __init__(self,
|
||||
worker: Callable[[Any], Any],
|
||||
value_factory: Callable[[int], Optional[List[Any]]],
|
||||
target_successes,
|
||||
timeout: float,
|
||||
stagger_timeout: float = 0,
|
||||
threadpool_size: int = None):
|
||||
|
||||
# TODO: make stagger_timeout a part of the value factory?
|
||||
|
||||
self._worker = worker
|
||||
self._value_factory = value_factory
|
||||
self._timeout = timeout
|
||||
self._stagger_timeout = stagger_timeout
|
||||
self._target_successes = target_successes
|
||||
|
||||
thread_pool_kwargs = {}
|
||||
if threadpool_size is not None:
|
||||
thread_pool_kwargs['minthreads'] = threadpool_size
|
||||
thread_pool_kwargs['maxthreads'] = threadpool_size
|
||||
self._threadpool = ThreadPool(**thread_pool_kwargs)
|
||||
|
||||
# These three tasks must be run in separate threads
|
||||
# to avoid being blocked by workers in the thread pool.
|
||||
self._bail_on_timeout_thread = Thread(target=self._bail_on_timeout)
|
||||
self._produce_values_thread = Thread(target=self._produce_values)
|
||||
self._process_results_thread = Thread(target=self._process_results)
|
||||
|
||||
self._successes = {}
|
||||
self._failures = {}
|
||||
self._started_tasks = 0
|
||||
self._finished_tasks = 0
|
||||
|
||||
self._cancel_event = Event()
|
||||
self._result_queue = Queue()
|
||||
self._target_value = SetOnce()
|
||||
self._unexpected_error = SetOnce()
|
||||
self._results_lock = Lock()
|
||||
self._stopped = False
|
||||
|
||||
def start(self):
|
||||
# TODO: check if already started?
|
||||
self._threadpool.start()
|
||||
self._produce_values_thread.start()
|
||||
self._process_results_thread.start()
|
||||
self._bail_on_timeout_thread.start()
|
||||
|
||||
def cancel(self):
|
||||
"""
|
||||
Cancels the tasks enqueued in the thread pool and stops the producer thread.
|
||||
"""
|
||||
self._cancel_event.set()
|
||||
|
||||
def join(self):
|
||||
"""
|
||||
Waits for all the threads to finish.
|
||||
Can be called several times.
|
||||
"""
|
||||
|
||||
if self._stopped:
|
||||
return # or raise AlreadyStopped?
|
||||
|
||||
self._produce_values_thread.join()
|
||||
self._process_results_thread.join()
|
||||
self._bail_on_timeout_thread.join()
|
||||
|
||||
# protect from a possible race
|
||||
try:
|
||||
self._threadpool.stop()
|
||||
except AlreadyQuit:
|
||||
pass
|
||||
self._stopped = True
|
||||
|
||||
if self._unexpected_error.is_set():
|
||||
e = self._unexpected_error.get()
|
||||
raise RuntimeError(f"Unexpected error in the producer thread: {e}")
|
||||
|
||||
def _sleep(self, timeout):
|
||||
"""
|
||||
Sleeps for a given timeout, can be interrupted by a cancellation event.
|
||||
"""
|
||||
if self._cancel_event.wait(timeout):
|
||||
raise Cancelled
|
||||
|
||||
def block_until_target_successes(self) -> Dict:
|
||||
"""
|
||||
Blocks until the target number of successes is reached.
|
||||
Returns a dictionary of values matched to results.
|
||||
Can be called several times.
|
||||
"""
|
||||
if self._unexpected_error.is_set():
|
||||
# So that we don't raise it again when join() is called
|
||||
e = self._unexpected_error.get_and_clear()
|
||||
raise RuntimeError(f"Unexpected error in the producer thread: {e}")
|
||||
|
||||
result = self._target_value.get()
|
||||
if result == TIMEOUT_TRIGGERED:
|
||||
raise self.TimedOut()
|
||||
elif result == PRODUCER_STOPPED:
|
||||
raise self.OutOfValues()
|
||||
return result
|
||||
|
||||
def get_failures(self) -> Dict:
|
||||
"""
|
||||
Get the current failures, as a dictionary of values to thrown exceptions.
|
||||
"""
|
||||
with self._results_lock:
|
||||
return dict(self._failures)
|
||||
|
||||
def get_successes(self) -> Dict:
|
||||
"""
|
||||
Get the current successes, as a dictionary of values to worker return values.
|
||||
"""
|
||||
with self._results_lock:
|
||||
return dict(self._successes)
|
||||
|
||||
def _bail_on_timeout(self):
|
||||
"""
|
||||
A service thread that cancels the pool on timeout.
|
||||
"""
|
||||
if not self._cancel_event.wait(timeout=self._timeout):
|
||||
self._target_value.set(TIMEOUT_TRIGGERED)
|
||||
self._cancel_event.set()
|
||||
|
||||
def _worker_wrapper(self, value):
|
||||
"""
|
||||
A wrapper that catches exceptions thrown by the worker
|
||||
and sends the results to the processing thread.
|
||||
"""
|
||||
try:
|
||||
# If we're in the cancelled state, interrupt early
|
||||
self._sleep(0)
|
||||
|
||||
result = self._worker(value)
|
||||
self._result_queue.put(Success(value, result))
|
||||
except Cancelled as e:
|
||||
self._result_queue.put(e)
|
||||
except BaseException as e:
|
||||
self._result_queue.put(Failure(value, str(e)))
|
||||
|
||||
def _process_results(self):
|
||||
"""
|
||||
A service thread that processes worker results
|
||||
and waits for the target number of successes to be reached.
|
||||
"""
|
||||
producer_stopped = False
|
||||
success_event_reached = False
|
||||
while True:
|
||||
result = self._result_queue.get()
|
||||
|
||||
if result == PRODUCER_STOPPED:
|
||||
producer_stopped = True
|
||||
else:
|
||||
self._finished_tasks += 1
|
||||
if isinstance(result, Success):
|
||||
with self._results_lock:
|
||||
self._successes[result.value] = result.result
|
||||
len_successes = len(self._successes)
|
||||
if not success_event_reached and len_successes == self._target_successes:
|
||||
# A protection for the case of repeating values.
|
||||
# Only trigger the target value once.
|
||||
success_event_reached = True
|
||||
self._target_value.set(self.get_successes())
|
||||
if isinstance(result, Failure):
|
||||
with self._results_lock:
|
||||
self._failures[result.value] = result.exception
|
||||
|
||||
if producer_stopped and self._finished_tasks == self._started_tasks:
|
||||
self.cancel() # to cancel the timeout thread
|
||||
self._target_value.set(PRODUCER_STOPPED)
|
||||
break
|
||||
|
||||
def _produce_values(self):
|
||||
while True:
|
||||
try:
|
||||
with self._results_lock:
|
||||
len_successes = len(self._successes)
|
||||
batch = self._value_factory(len_successes)
|
||||
if not batch:
|
||||
break
|
||||
|
||||
self._started_tasks += len(batch)
|
||||
for value in batch:
|
||||
# There is a possible race between `callInThread()` and `stop()`,
|
||||
# But we never execute them at the same time,
|
||||
# because `join()` checks that the producer thread is stopped.
|
||||
self._threadpool.callInThread(self._worker_wrapper, value)
|
||||
|
||||
self._sleep(self._stagger_timeout)
|
||||
|
||||
except Cancelled:
|
||||
break
|
||||
|
||||
except BaseException as e:
|
||||
self._unexpected_error.set(e)
|
||||
self.cancel()
|
||||
break
|
||||
|
||||
self._result_queue.put(PRODUCER_STOPPED)
|
||||
|
||||
|
||||
class AllAtOnceFactory:
|
||||
"""
|
||||
A simple value factory that returns all its values in a single batch.
|
||||
"""
|
||||
|
||||
def __init__(self, values):
|
||||
self.values = values
|
||||
self._produced = False
|
||||
|
||||
def __call__(self, _successes):
|
||||
if self._produced:
|
||||
return None
|
||||
else:
|
||||
self._produced = True
|
||||
return self.values
|
|
@ -166,8 +166,8 @@ def test_weighted_sampler(sample_size):
|
|||
weighted_elements = {element: weight for element, weight in zip(elements, weights)}
|
||||
|
||||
samples = 100000
|
||||
sampler = WeightedSampler(weighted_elements)
|
||||
for i in range(samples):
|
||||
sampler = WeightedSampler(weighted_elements)
|
||||
sample_set = sampler.sample_no_replacement(rng, sample_size)
|
||||
counter.update({tuple(sample_set): 1})
|
||||
|
||||
|
|
|
@ -118,18 +118,18 @@ def grant_control_request(blockchain_bob):
|
|||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def join_control_request(blockchain_bob, enacted_blockchain_policy):
|
||||
def join_control_request(blockchain_alice, blockchain_bob, enacted_blockchain_policy):
|
||||
method_name = 'join_policy'
|
||||
|
||||
params = {
|
||||
'label': enacted_blockchain_policy.label.decode(),
|
||||
'alice_verifying_key': bytes(enacted_blockchain_policy.alice.stamp).hex(),
|
||||
'alice_verifying_key': bytes(enacted_blockchain_policy.alice_verifying_key).hex(),
|
||||
}
|
||||
return method_name, params
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def retrieve_control_request(blockchain_bob, enacted_blockchain_policy, capsule_side_channel_blockchain):
|
||||
def retrieve_control_request(blockchain_alice, blockchain_bob, enacted_blockchain_policy, capsule_side_channel_blockchain):
|
||||
capsule_side_channel_blockchain.reset()
|
||||
method_name = 'retrieve'
|
||||
message_kit = capsule_side_channel_blockchain()
|
||||
|
@ -137,7 +137,7 @@ def retrieve_control_request(blockchain_bob, enacted_blockchain_policy, capsule_
|
|||
params = {
|
||||
'label': enacted_blockchain_policy.label.decode(),
|
||||
'policy_encrypting_key': bytes(enacted_blockchain_policy.public_key).hex(),
|
||||
'alice_verifying_key': bytes(enacted_blockchain_policy.alice.stamp).hex(),
|
||||
'alice_verifying_key': bytes(enacted_blockchain_policy.alice_verifying_key).hex(),
|
||||
'message_kit': b64encode(message_kit.to_bytes()).decode(),
|
||||
}
|
||||
return method_name, params
|
||||
|
|
|
@ -26,9 +26,15 @@ from nucypher.policy.collections import SignedTreasureMap
|
|||
from tests.utils.controllers import get_fields, validate_json_rpc_response_data
|
||||
|
||||
|
||||
def test_bob_rpc_character_control_join_policy(bob_rpc_controller, join_control_request, enacted_blockchain_policy):
|
||||
# Simulate passing in a teacher-uri
|
||||
enacted_blockchain_policy.bob.remember_node(list(enacted_blockchain_policy.accepted_ursulas)[0])
|
||||
def test_bob_rpc_character_control_join_policy(bob_rpc_controller, join_control_request, enacted_blockchain_policy, blockchain_bob, blockchain_ursulas):
|
||||
for ursula in blockchain_ursulas:
|
||||
if ursula.checksum_address in enacted_blockchain_policy.treasure_map.destinations:
|
||||
# Simulate passing in a teacher-uri
|
||||
blockchain_bob.remember_node(ursula)
|
||||
break
|
||||
else:
|
||||
# Shouldn't happen
|
||||
raise Exception("No known Ursulas present in the treasure map destinations")
|
||||
|
||||
method_name, params = join_control_request
|
||||
request_data = {'method': method_name, 'params': params}
|
||||
|
|
|
@ -175,14 +175,20 @@ def test_alice_character_control_decrypt(alice_web_controller_test_client,
|
|||
assert response.status_code == 405
|
||||
|
||||
|
||||
def test_bob_character_control_join_policy(bob_web_controller_test_client, enacted_blockchain_policy):
|
||||
def test_bob_character_control_join_policy(bob_web_controller_test_client, enacted_blockchain_policy, blockchain_alice, blockchain_bob, blockchain_ursulas):
|
||||
request_data = {
|
||||
'label': enacted_blockchain_policy.label.decode(),
|
||||
'alice_verifying_key': bytes(enacted_blockchain_policy.alice.stamp).hex(),
|
||||
'alice_verifying_key': bytes(enacted_blockchain_policy.alice_verifying_key).hex(),
|
||||
}
|
||||
|
||||
# Simulate passing in a teacher-uri
|
||||
enacted_blockchain_policy.bob.remember_node(list(enacted_blockchain_policy.accepted_ursulas)[0])
|
||||
for ursula in blockchain_ursulas:
|
||||
if ursula.checksum_address in enacted_blockchain_policy.treasure_map.destinations:
|
||||
# Simulate passing in a teacher-uri
|
||||
blockchain_bob.remember_node(ursula)
|
||||
break
|
||||
else:
|
||||
# Shouldn't happen
|
||||
raise Exception("No known Ursulas present in the treasure map destinations")
|
||||
|
||||
response = bob_web_controller_test_client.post('/join_policy', data=json.dumps(request_data))
|
||||
assert b'{"result": {"policy_encrypting_key": "OK"}' in response.data # TODO
|
||||
|
|
|
@ -23,12 +23,10 @@ from nucypher.crypto.api import keccak_digest
|
|||
from nucypher.datastore.models import PolicyArrangement
|
||||
from nucypher.datastore.models import TreasureMap as DatastoreTreasureMap
|
||||
from nucypher.policy.collections import SignedTreasureMap as DecentralizedTreasureMap
|
||||
from nucypher.policy.identity import PolicyCredential
|
||||
from tests.utils.middleware import MockRestMiddleware
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('blockchain_ursulas')
|
||||
def test_decentralized_grant(blockchain_alice, blockchain_bob, agency):
|
||||
def test_decentralized_grant(blockchain_alice, blockchain_bob, blockchain_ursulas, agency):
|
||||
# Setup the policy details
|
||||
n = 3
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
|
@ -43,73 +41,47 @@ def test_decentralized_grant(blockchain_alice, blockchain_bob, agency):
|
|||
expiration=policy_end_datetime)
|
||||
|
||||
# Check the policy ID
|
||||
policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp))
|
||||
policy_id = keccak_digest(label + bytes(blockchain_bob.stamp))
|
||||
assert policy_id == policy.id
|
||||
|
||||
# The number of accepted arrangements at least the number of Ursulas we're using (n)
|
||||
assert len(policy._accepted_arrangements) >= n
|
||||
|
||||
# The number of actually enacted arrangements is exactly equal to n.
|
||||
assert len(policy._enacted_arrangements) == n
|
||||
assert len(policy.treasure_map.destinations) == n
|
||||
|
||||
# Let's look at the enacted arrangements.
|
||||
for kfrag in policy.kfrags:
|
||||
arrangement = policy._enacted_arrangements[kfrag]
|
||||
for ursula in blockchain_ursulas:
|
||||
if ursula.checksum_address in policy.treasure_map.destinations:
|
||||
arrangement_id = policy.treasure_map.destinations[ursula.checksum_address]
|
||||
|
||||
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
|
||||
with arrangement.ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement:
|
||||
assert kfrag == policy_arrangement.kfrag
|
||||
|
||||
# Test PolicyCredential w/o TreasureMap
|
||||
credential = policy.credential(with_treasure_map=False)
|
||||
assert credential.alice_verifying_key == policy.alice.stamp
|
||||
assert credential.label == policy.label
|
||||
assert credential.expiration == policy.expiration
|
||||
assert credential.policy_pubkey == policy.public_key
|
||||
assert credential.treasure_map is None
|
||||
|
||||
cred_json = credential.to_json()
|
||||
deserialized_cred = PolicyCredential.from_json(cred_json)
|
||||
assert credential == deserialized_cred
|
||||
|
||||
# Test PolicyCredential w/ TreasureMap
|
||||
credential = policy.credential()
|
||||
assert credential.alice_verifying_key == policy.alice.stamp
|
||||
assert credential.label == policy.label
|
||||
assert credential.expiration == policy.expiration
|
||||
assert credential.policy_pubkey == policy.public_key
|
||||
assert credential.treasure_map == policy.treasure_map
|
||||
|
||||
cred_json = credential.to_json()
|
||||
deserialized_cred = PolicyCredential.from_json(cred_json)
|
||||
assert credential == deserialized_cred
|
||||
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
|
||||
with ursula.datastore.describe(PolicyArrangement, arrangement_id.hex()) as policy_arrangement:
|
||||
retrieved_kfrag = policy_arrangement.kfrag
|
||||
assert bool(retrieved_kfrag) # TODO: try to assemble them back?
|
||||
|
||||
|
||||
def test_alice_sets_treasure_map_decentralized(enacted_blockchain_policy):
|
||||
def test_alice_sets_treasure_map_decentralized(enacted_blockchain_policy, blockchain_alice, blockchain_bob):
|
||||
"""
|
||||
Same as test_alice_sets_treasure_map except with a blockchain policy.
|
||||
"""
|
||||
enacted_blockchain_policy.publish_treasure_map(network_middleware=MockRestMiddleware())
|
||||
treasure_map_hrac = enacted_blockchain_policy.treasure_map._hrac[:16].hex()
|
||||
found = 0
|
||||
for node in enacted_blockchain_policy.bob.matching_nodes_among(enacted_blockchain_policy.alice.known_nodes):
|
||||
for node in blockchain_bob.matching_nodes_among(blockchain_alice.known_nodes):
|
||||
with node.datastore.describe(DatastoreTreasureMap, treasure_map_hrac) as treasure_map_on_node:
|
||||
assert DecentralizedTreasureMap.from_bytes(treasure_map_on_node.treasure_map) == enacted_blockchain_policy.treasure_map
|
||||
found += 1
|
||||
assert found
|
||||
|
||||
|
||||
def test_bob_retrieves_treasure_map_from_decentralized_node(enacted_blockchain_policy):
|
||||
def test_bob_retrieves_treasure_map_from_decentralized_node(enacted_blockchain_policy, blockchain_alice, blockchain_bob):
|
||||
"""
|
||||
This is the same test as `test_bob_retrieves_the_treasure_map_and_decrypt_it`,
|
||||
except with an `enacted_blockchain_policy`.
|
||||
"""
|
||||
bob = enacted_blockchain_policy.bob
|
||||
bob = blockchain_bob
|
||||
_previous_domain = bob.domain
|
||||
bob.domain = None # Bob has no knowledge of the network.
|
||||
|
||||
with pytest.raises(bob.NotEnoughTeachers):
|
||||
treasure_map_from_wire = bob.get_treasure_map(enacted_blockchain_policy.alice.stamp,
|
||||
treasure_map_from_wire = bob.get_treasure_map(blockchain_alice.stamp,
|
||||
enacted_blockchain_policy.label)
|
||||
|
||||
# Bob finds out about one Ursula (in the real world, a seed node, hardcoded based on his learning domain)
|
||||
|
@ -120,6 +92,6 @@ def test_bob_retrieves_treasure_map_from_decentralized_node(enacted_blockchain_p
|
|||
bob.learn_from_teacher_node(eager=True)
|
||||
|
||||
# Now he'll have better success finding that map.
|
||||
treasure_map_from_wire = bob.get_treasure_map(enacted_blockchain_policy.alice.stamp,
|
||||
treasure_map_from_wire = bob.get_treasure_map(blockchain_alice.stamp,
|
||||
enacted_blockchain_policy.label)
|
||||
assert enacted_blockchain_policy.treasure_map == treasure_map_from_wire
|
||||
|
|
|
@ -156,5 +156,5 @@ def test_put_additional_treasure_map_on_network(blockchain_ursulas, blockchain_a
|
|||
# This should 409 because Ursula won't be able to find an HRAC on-chain
|
||||
# with the modified HRAC.
|
||||
with pytest.raises(RestMiddleware.UnexpectedResponse) as should_409:
|
||||
amonia.use_ursula_as_an_involuntary_and_unbeknownst_cdn(policy, sucker_ursula=blockchain_ursulas[0])
|
||||
amonia.use_ursula_as_an_involuntary_and_unbeknownst_cdn(policy, blockchain_bob, sucker_ursula=blockchain_ursulas[0])
|
||||
assert should_409.value.status == 409
|
||||
|
|
|
@ -23,7 +23,7 @@ from nucypher.characters.lawful import Enrico
|
|||
from nucypher.characters.unlawful import Vladimir
|
||||
from nucypher.crypto.api import verify_eip_191
|
||||
from nucypher.crypto.powers import SigningPower
|
||||
from nucypher.policy.policies import Policy
|
||||
from nucypher.policy.policies import BlockchainPolicy
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
from tests.utils.middleware import NodeIsDownMiddleware
|
||||
from tests.utils.ursula import make_decentralized_ursulas
|
||||
|
@ -166,7 +166,7 @@ def test_blockchain_ursulas_reencrypt(blockchain_ursulas, blockchain_alice, bloc
|
|||
blockchain_alice.network_middleware = NodeIsDownMiddleware()
|
||||
blockchain_alice.network_middleware.node_is_down(blockchain_ursulas[0])
|
||||
|
||||
with pytest.raises(Policy.Rejected):
|
||||
with pytest.raises(BlockchainPolicy.NotEnoughBlockchainUrsulas):
|
||||
_policy = blockchain_alice.grant(bob=blockchain_bob,
|
||||
label=b'another-label',
|
||||
m=m,
|
||||
|
|
|
@ -123,6 +123,7 @@ def test_bob_retrieves_twice_via_cli(click_runner,
|
|||
federated_ursulas,
|
||||
custom_filepath_2,
|
||||
federated_alice,
|
||||
federated_bob,
|
||||
mocker):
|
||||
|
||||
teacher = list(federated_ursulas)[0]
|
||||
|
@ -164,7 +165,7 @@ def test_bob_retrieves_twice_via_cli(click_runner,
|
|||
|
||||
def substitute_bob(*args, **kwargs):
|
||||
log.info("Substituting the Policy's Bob in CLI runtime.")
|
||||
this_fuckin_guy = enacted_federated_policy.bob
|
||||
this_fuckin_guy = federated_bob
|
||||
somebody_else = Ursula.from_teacher_uri(teacher_uri=kwargs['teacher_uri'],
|
||||
min_stake=0,
|
||||
federated_only=True,
|
||||
|
|
|
@ -557,8 +557,7 @@ def test_collect_rewards_integration(click_runner,
|
|||
handpicked_ursulas={ursula})
|
||||
|
||||
# Ensure that the handpicked Ursula was selected for the policy
|
||||
arrangement = list(blockchain_policy._accepted_arrangements)[0]
|
||||
assert arrangement.ursula == ursula
|
||||
assert ursula.checksum_address in blockchain_policy.treasure_map.destinations
|
||||
|
||||
# Bob learns about the new staker and joins the policy
|
||||
blockchain_bob.remember_node(node=ursula)
|
||||
|
|
|
@ -626,8 +626,7 @@ def test_collect_rewards_integration(click_runner,
|
|||
handpicked_ursulas={ursula})
|
||||
|
||||
# Ensure that the handpicked Ursula was selected for the policy
|
||||
arrangement = list(blockchain_policy._accepted_arrangements)[0]
|
||||
assert arrangement.ursula == ursula
|
||||
assert ursula.checksum_address in blockchain_policy.treasure_map.destinations
|
||||
|
||||
# Bob learns about the new staker and joins the policy
|
||||
blockchain_bob.start_learning_loop()
|
||||
|
|
|
@ -128,6 +128,7 @@ def test_vladimir_illegal_interface_key_does_not_propagate(blockchain_ursulas):
|
|||
def test_alice_refuses_to_make_arrangement_unless_ursula_is_valid(blockchain_alice,
|
||||
idle_blockchain_policy,
|
||||
blockchain_ursulas):
|
||||
|
||||
target = list(blockchain_ursulas)[2]
|
||||
# First, let's imagine that Alice has sampled a Vladimir while making this policy.
|
||||
vladimir = Vladimir.from_target_ursula(target)
|
||||
|
@ -137,20 +138,16 @@ def test_alice_refuses_to_make_arrangement_unless_ursula_is_valid(blockchain_ali
|
|||
|
||||
vladimir.substantiate_stamp()
|
||||
vladimir._Teacher__interface_signature = signature
|
||||
|
||||
class FakeArrangement:
|
||||
federated = False
|
||||
ursula = vladimir
|
||||
|
||||
def __bytes__(self):
|
||||
return b""
|
||||
|
||||
vladimir.node_storage.store_node_certificate(certificate=target.certificate)
|
||||
|
||||
# Ideally, a fishy node shouldn't be present in `known_nodes`,
|
||||
# but I guess we're testing the case when it became fishy somewhere between we learned about it
|
||||
# and the proposal arrangement.
|
||||
blockchain_alice.known_nodes[vladimir.checksum_address] = vladimir
|
||||
|
||||
with pytest.raises(vladimir.InvalidNode):
|
||||
idle_blockchain_policy.propose_arrangement(network_middleware=blockchain_alice.network_middleware,
|
||||
arrangement=FakeArrangement()
|
||||
)
|
||||
idle_blockchain_policy._propose_arrangement(address=vladimir.checksum_address,
|
||||
network_middleware=blockchain_alice.network_middleware)
|
||||
|
||||
|
||||
def test_treasure_map_cannot_be_duplicated(blockchain_ursulas,
|
||||
|
@ -171,15 +168,18 @@ def test_treasure_map_cannot_be_duplicated(blockchain_ursulas,
|
|||
expiration=policy_end_datetime)
|
||||
|
||||
matching_ursulas = blockchain_bob.matching_nodes_among(blockchain_ursulas)
|
||||
first_matching_ursula = matching_ursulas[0]
|
||||
completed_ursulas = policy.treasure_map_publisher.block_until_success_is_reasonably_likely()
|
||||
# Ursulas in `treasure_map_publisher` are not real Ursulas, but just some metadata of remote ones.
|
||||
# We need a real one to access its datastore.
|
||||
first_completed_ursula = [ursula for ursula in matching_ursulas if ursula in completed_ursulas][0]
|
||||
|
||||
with first_matching_ursula.datastore.describe(TreasureMap, policy.treasure_map._hrac.hex()) as saved_map_record:
|
||||
with first_completed_ursula.datastore.describe(TreasureMap, policy.treasure_map._hrac.hex()) as saved_map_record:
|
||||
assert saved_map_record.treasure_map == bytes(policy.treasure_map)
|
||||
|
||||
# This Ursula was actually a Vladimir.
|
||||
# Thus, he has access to the (encrypted) TreasureMap and can use its details to
|
||||
# try to store his own fake details.
|
||||
vladimir = Vladimir.from_target_ursula(first_matching_ursula)
|
||||
vladimir = Vladimir.from_target_ursula(first_completed_ursula)
|
||||
|
||||
ursulas_who_probably_do_not_have_the_map = [u for u in blockchain_ursulas if not u in matching_ursulas]
|
||||
node_on_which_to_store_bad_map = ursulas_who_probably_do_not_have_the_map[0]
|
||||
|
|
|
@ -0,0 +1,331 @@
|
|||
import random
|
||||
import time
|
||||
from typing import Iterable, Tuple, List, Callable
|
||||
|
||||
import pytest
|
||||
|
||||
from nucypher.utilities.concurrency import WorkerPool, AllAtOnceFactory
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def join_worker_pool(request):
|
||||
"""
|
||||
Makes sure the pool is properly joined at the end of the test,
|
||||
so that one doesn't have to wrap the whole test in a try-finally block.
|
||||
"""
|
||||
pool_to_join = None
|
||||
def register(pool):
|
||||
nonlocal pool_to_join
|
||||
pool_to_join = pool
|
||||
yield register
|
||||
pool_to_join.join()
|
||||
|
||||
|
||||
class WorkerRule:
|
||||
def __init__(self, fails: bool = False, timeout_min: float = 0, timeout_max: float = 0):
|
||||
self.fails = fails
|
||||
self.timeout_min = timeout_min
|
||||
self.timeout_max = timeout_max
|
||||
|
||||
|
||||
class WorkerOutcome:
|
||||
def __init__(self, fails: bool, timeout: float):
|
||||
self.fails = fails
|
||||
self.timeout = timeout
|
||||
|
||||
def __call__(self, value):
|
||||
time.sleep(self.timeout)
|
||||
if self.fails:
|
||||
raise Exception(f"Worker for {value} failed")
|
||||
else:
|
||||
return value
|
||||
|
||||
|
||||
def generate_workers(rules: Iterable[Tuple[WorkerRule, int]], seed=None):
|
||||
rng = random.Random(seed)
|
||||
outcomes = []
|
||||
for rule, quantity in rules:
|
||||
for _ in range(quantity):
|
||||
timeout = rng.uniform(rule.timeout_min, rule.timeout_max)
|
||||
outcomes.append(WorkerOutcome(rule.fails, timeout))
|
||||
|
||||
rng.shuffle(outcomes)
|
||||
|
||||
values = list(range(len(outcomes)))
|
||||
|
||||
def worker(value):
|
||||
return outcomes[value](value)
|
||||
|
||||
return {value: outcomes[value] for value in values}, worker
|
||||
|
||||
|
||||
def test_wait_for_successes(join_worker_pool):
|
||||
"""
|
||||
Checks that `block_until_target_successes()` returns in time and gives all the successes,
|
||||
if there were enough of them.
|
||||
"""
|
||||
|
||||
outcomes, worker = generate_workers(
|
||||
[
|
||||
(WorkerRule(timeout_min=0.5, timeout_max=1.5), 10),
|
||||
(WorkerRule(fails=True, timeout_min=1, timeout_max=3), 20),
|
||||
],
|
||||
seed=123)
|
||||
|
||||
factory = AllAtOnceFactory(list(outcomes))
|
||||
pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=30)
|
||||
join_worker_pool(pool)
|
||||
|
||||
t_start = time.monotonic()
|
||||
pool.start()
|
||||
successes = pool.block_until_target_successes()
|
||||
t_end = time.monotonic()
|
||||
|
||||
failures = pool.get_failures()
|
||||
assert all(outcomes[value].fails for value in failures)
|
||||
|
||||
assert len(successes) == 10
|
||||
|
||||
# We have more threads in the pool than the workers,
|
||||
# so all the successful ones should be able to finish right away.
|
||||
assert t_end - t_start < 2
|
||||
|
||||
# Should be able to do it several times
|
||||
successes = pool.block_until_target_successes()
|
||||
assert len(successes) == 10
|
||||
|
||||
|
||||
def test_wait_for_successes_out_of_values(join_worker_pool):
|
||||
"""
|
||||
Checks that if there weren't enough successful workers, `block_until_target_successes()`
|
||||
raises an exception when the value factory is exhausted.
|
||||
"""
|
||||
|
||||
outcomes, worker = generate_workers(
|
||||
[
|
||||
(WorkerRule(timeout_min=0.5, timeout_max=1.5), 9),
|
||||
(WorkerRule(fails=True, timeout_min=0.5, timeout_max=1.5), 20),
|
||||
],
|
||||
seed=123)
|
||||
|
||||
factory = AllAtOnceFactory(list(outcomes))
|
||||
pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=15)
|
||||
join_worker_pool(pool)
|
||||
|
||||
t_start = time.monotonic()
|
||||
pool.start()
|
||||
with pytest.raises(WorkerPool.OutOfValues):
|
||||
successes = pool.block_until_target_successes()
|
||||
t_end = time.monotonic()
|
||||
|
||||
# We have roughly 2 workers per thread, so it shouldn't take longer than 1.5s (max timeout) * 2
|
||||
assert t_end - t_start < 4
|
||||
|
||||
|
||||
def test_wait_for_successes_timed_out(join_worker_pool):
|
||||
"""
|
||||
Checks that if enough successful workers can't finish before the timeout, we get an exception.
|
||||
"""
|
||||
|
||||
outcomes, worker = generate_workers(
|
||||
[
|
||||
(WorkerRule(timeout_min=0, timeout_max=0.5), 9),
|
||||
(WorkerRule(timeout_min=1.5, timeout_max=2.5), 1),
|
||||
(WorkerRule(fails=True, timeout_min=1.5, timeout_max=2.5), 20),
|
||||
],
|
||||
seed=123)
|
||||
|
||||
factory = AllAtOnceFactory(list(outcomes))
|
||||
pool = WorkerPool(worker, factory, target_successes=10, timeout=1, threadpool_size=30)
|
||||
join_worker_pool(pool)
|
||||
|
||||
t_start = time.monotonic()
|
||||
pool.start()
|
||||
with pytest.raises(WorkerPool.TimedOut):
|
||||
successes = pool.block_until_target_successes()
|
||||
t_end = time.monotonic()
|
||||
|
||||
# Even though timeout is 1, there are long-running workers which we can't interupt.
|
||||
assert t_end - t_start < 3
|
||||
|
||||
|
||||
def test_join(join_worker_pool):
|
||||
"""
|
||||
Test joining the pool.
|
||||
"""
|
||||
|
||||
outcomes, worker = generate_workers(
|
||||
[
|
||||
(WorkerRule(timeout_min=0.5, timeout_max=1.5), 9),
|
||||
(WorkerRule(fails=True, timeout_min=0.5, timeout_max=1.5), 20),
|
||||
],
|
||||
seed=123)
|
||||
|
||||
factory = AllAtOnceFactory(list(outcomes))
|
||||
pool = WorkerPool(worker, factory, target_successes=10, timeout=1, threadpool_size=30)
|
||||
join_worker_pool(pool)
|
||||
|
||||
t_start = time.monotonic()
|
||||
pool.start()
|
||||
pool.join()
|
||||
t_end = time.monotonic()
|
||||
|
||||
pool.join() # should work the second time too
|
||||
|
||||
# Even though timeout is 1, there are long-running workers which we can't interupt.
|
||||
assert t_end - t_start < 3
|
||||
|
||||
|
||||
class BatchFactory:
|
||||
|
||||
def __init__(self, values):
|
||||
self.values = values
|
||||
self.batch_sizes = []
|
||||
|
||||
def __call__(self, successes):
|
||||
if successes == 10:
|
||||
return None
|
||||
batch_size = 10 - successes
|
||||
if len(self.values) >= batch_size:
|
||||
batch = self.values[:batch_size]
|
||||
self.batch_sizes.append(len(batch))
|
||||
self.values = self.values[batch_size:]
|
||||
return batch
|
||||
elif len(self.values) > 0:
|
||||
self.batch_sizes.append(len(self.values))
|
||||
return self.values
|
||||
self.values = None
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def test_batched_value_generation(join_worker_pool):
|
||||
"""
|
||||
Tests a value factory that gives out value batches in portions.
|
||||
"""
|
||||
|
||||
outcomes, worker = generate_workers(
|
||||
[
|
||||
(WorkerRule(timeout_min=0.5, timeout_max=1.5), 80),
|
||||
(WorkerRule(fails=True, timeout_min=0.5, timeout_max=1.5), 80),
|
||||
],
|
||||
seed=123)
|
||||
|
||||
factory = BatchFactory(list(outcomes))
|
||||
pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=10, stagger_timeout=0.5)
|
||||
join_worker_pool(pool)
|
||||
|
||||
t_start = time.monotonic()
|
||||
pool.start()
|
||||
successes = pool.block_until_target_successes()
|
||||
pool.cancel()
|
||||
pool.join()
|
||||
t_end = time.monotonic()
|
||||
|
||||
assert len(successes) == 10
|
||||
|
||||
# Check that batch sizes in the factory were getting progressively smaller
|
||||
# as the number of successes grew.
|
||||
assert all(factory.batch_sizes[i] >= factory.batch_sizes[i+1]
|
||||
for i in range(len(factory.batch_sizes) - 1))
|
||||
|
||||
# Since we canceled the pool, no more workers will be started and we will finish faster
|
||||
assert t_end - t_start < 4
|
||||
|
||||
successes_copy = pool.get_successes()
|
||||
failures_copy = pool.get_failures()
|
||||
|
||||
assert all(value in successes_copy for value in successes)
|
||||
|
||||
|
||||
def test_cancel_waiting_workers(join_worker_pool):
|
||||
"""
|
||||
If we have a small pool and many workers, it is possible for workers to be enqueued
|
||||
one after another in one thread.
|
||||
We test that if we call `cancel()`, these enqueued workers are cancelled too.
|
||||
"""
|
||||
|
||||
outcomes, worker = generate_workers(
|
||||
[
|
||||
(WorkerRule(timeout_min=1, timeout_max=1), 100),
|
||||
],
|
||||
seed=123)
|
||||
|
||||
factory = AllAtOnceFactory(list(outcomes))
|
||||
pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=10)
|
||||
join_worker_pool(pool)
|
||||
|
||||
t_start = time.monotonic()
|
||||
pool.start()
|
||||
pool.block_until_target_successes()
|
||||
pool.cancel()
|
||||
pool.join()
|
||||
t_end = time.monotonic()
|
||||
|
||||
# We have 10 threads in the pool and 100 workers that are all enqueued at once at the start.
|
||||
# If we didn't check for the cancel condition, we would have to wait for 10 seconds.
|
||||
# We get 10 successes after 1s and cancel the workers,
|
||||
# but the next workers in each thread have already started, so we have to wait for another 1s.
|
||||
assert t_end - t_start < 2.5
|
||||
|
||||
|
||||
class BuggyFactory:
|
||||
|
||||
def __init__(self, values):
|
||||
self.values = values
|
||||
|
||||
def __call__(self, successes):
|
||||
if self.values is not None:
|
||||
values = self.values
|
||||
self.values = None
|
||||
return values
|
||||
else:
|
||||
raise Exception("Buggy factory")
|
||||
|
||||
|
||||
def test_buggy_factory_raises_on_block(join_worker_pool):
|
||||
"""
|
||||
Tests that if there is an exception thrown in the value factory,
|
||||
it is caught in the first call to `block_until_target_successes()`.
|
||||
"""
|
||||
|
||||
outcomes, worker = generate_workers(
|
||||
[(WorkerRule(timeout_min=1, timeout_max=1), 100)],
|
||||
seed=123)
|
||||
|
||||
factory = BuggyFactory(list(outcomes))
|
||||
|
||||
# Non-zero stagger timeout to make BuggyFactory raise its error only in 1.5s,
|
||||
# So that we got enough successes for `block_until_target_successes()`.
|
||||
pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=10, stagger_timeout=1.5)
|
||||
join_worker_pool(pool)
|
||||
|
||||
pool.start()
|
||||
time.sleep(2) # wait for the stagger timeout to finish
|
||||
with pytest.raises(RuntimeError, match="Unexpected error in the producer thread"):
|
||||
pool.block_until_target_successes()
|
||||
# Further calls to `block_until_target_successes()` or `join()` don't throw the error.
|
||||
pool.block_until_target_successes()
|
||||
pool.cancel()
|
||||
pool.join()
|
||||
|
||||
|
||||
def test_buggy_factory_raises_on_join(join_worker_pool):
|
||||
"""
|
||||
Tests that if there is an exception thrown in the value factory,
|
||||
it is caught in the first call to `join()`.
|
||||
"""
|
||||
|
||||
outcomes, worker = generate_workers(
|
||||
[(WorkerRule(timeout_min=1, timeout_max=1), 100)],
|
||||
seed=123)
|
||||
|
||||
factory = BuggyFactory(list(outcomes))
|
||||
pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=10)
|
||||
join_worker_pool(pool)
|
||||
|
||||
pool.start()
|
||||
pool.cancel()
|
||||
with pytest.raises(RuntimeError, match="Unexpected error in the producer thread"):
|
||||
pool.join()
|
||||
pool.join()
|
|
@ -238,13 +238,12 @@ def enacted_federated_policy(idle_federated_policy, federated_ursulas):
|
|||
# Alice has a policy in mind and knows of enough qualifies Ursulas; she crafts an offer for them.
|
||||
network_middleware = MockRestMiddleware()
|
||||
|
||||
idle_federated_policy.make_arrangements(network_middleware, handpicked_ursulas=federated_ursulas)
|
||||
|
||||
# REST call happens here, as does population of TreasureMap.
|
||||
idle_federated_policy.enact(network_middleware)
|
||||
idle_federated_policy.publishing_mutex.block_until_complete()
|
||||
enacted_policy = idle_federated_policy.enact(network_middleware=network_middleware,
|
||||
handpicked_ursulas=federated_ursulas)
|
||||
enacted_policy.treasure_map_publisher.block_until_complete()
|
||||
|
||||
return idle_federated_policy
|
||||
return enacted_policy
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
|
@ -276,12 +275,11 @@ def enacted_blockchain_policy(idle_blockchain_policy, blockchain_ursulas):
|
|||
# contract_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
network_middleware = MockRestMiddleware()
|
||||
|
||||
idle_blockchain_policy.make_arrangements(
|
||||
network_middleware, handpicked_ursulas=list(blockchain_ursulas))
|
||||
|
||||
idle_blockchain_policy.enact(network_middleware) # REST call happens here, as does population of TreasureMap.
|
||||
idle_blockchain_policy.publishing_mutex.block_until_complete()
|
||||
return idle_blockchain_policy
|
||||
# REST call happens here, as does population of TreasureMap.
|
||||
enacted_policy = idle_blockchain_policy.enact(network_middleware=network_middleware,
|
||||
handpicked_ursulas=list(blockchain_ursulas))
|
||||
enacted_policy.treasure_map_publisher.block_until_complete()
|
||||
return enacted_policy
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
|
|
|
@ -121,7 +121,7 @@ def join_control_request(federated_bob, enacted_federated_policy):
|
|||
|
||||
params = {
|
||||
'label': enacted_federated_policy.label.decode(),
|
||||
'alice_verifying_key': bytes(enacted_federated_policy.alice.stamp).hex(),
|
||||
'alice_verifying_key': bytes(enacted_federated_policy.alice_verifying_key).hex(),
|
||||
}
|
||||
return method_name, params
|
||||
|
||||
|
@ -134,7 +134,7 @@ def retrieve_control_request(federated_bob, enacted_federated_policy, capsule_si
|
|||
params = {
|
||||
'label': enacted_federated_policy.label.decode(),
|
||||
'policy_encrypting_key': bytes(enacted_federated_policy.public_key).hex(),
|
||||
'alice_verifying_key': bytes(enacted_federated_policy.alice.stamp).hex(),
|
||||
'alice_verifying_key': bytes(enacted_federated_policy.alice_verifying_key).hex(),
|
||||
'message_kit': b64encode(message_kit.to_bytes()).decode(),
|
||||
}
|
||||
return method_name, params
|
||||
|
|
|
@ -72,10 +72,16 @@ def test_alice_rpc_character_control_grant(alice_rpc_test_client, grant_control_
|
|||
assert 'jsonrpc' in response.data
|
||||
|
||||
|
||||
def test_bob_rpc_character_control_join_policy(bob_rpc_controller, join_control_request, enacted_federated_policy):
|
||||
def test_bob_rpc_character_control_join_policy(bob_rpc_controller, join_control_request, enacted_federated_policy, federated_bob, federated_ursulas):
|
||||
|
||||
# Simulate passing in a teacher-uri
|
||||
enacted_federated_policy.bob.remember_node(list(enacted_federated_policy.accepted_ursulas)[0])
|
||||
for ursula in federated_ursulas:
|
||||
if ursula.checksum_address in enacted_federated_policy.treasure_map.destinations:
|
||||
# Simulate passing in a teacher-uri
|
||||
federated_bob.remember_node(ursula)
|
||||
break
|
||||
else:
|
||||
# Shouldn't happen
|
||||
raise Exception("No known Ursulas present in the treasure map destinations")
|
||||
|
||||
method_name, params = join_control_request
|
||||
request_data = {'method': method_name, 'params': params}
|
||||
|
|
|
@ -168,14 +168,20 @@ def test_alice_character_control_decrypt(alice_web_controller_test_client,
|
|||
assert response.status_code == 405
|
||||
|
||||
|
||||
def test_bob_character_control_join_policy(bob_web_controller_test_client, enacted_federated_policy):
|
||||
def test_bob_character_control_join_policy(bob_web_controller_test_client, federated_bob, federated_ursulas, enacted_federated_policy):
|
||||
request_data = {
|
||||
'label': enacted_federated_policy.label.decode(),
|
||||
'alice_verifying_key': bytes(enacted_federated_policy.alice.stamp).hex(),
|
||||
'alice_verifying_key': bytes(enacted_federated_policy.alice_verifying_key).hex(),
|
||||
}
|
||||
|
||||
# Simulate passing in a teacher-uri
|
||||
enacted_federated_policy.bob.remember_node(list(enacted_federated_policy.accepted_ursulas)[0])
|
||||
for ursula in federated_ursulas:
|
||||
if ursula.checksum_address in enacted_federated_policy.treasure_map.destinations:
|
||||
# Simulate passing in a teacher-uri
|
||||
federated_bob.remember_node(ursula)
|
||||
break
|
||||
else:
|
||||
# Shouldn't happen
|
||||
raise Exception("No known Ursulas present in the treasure map destinations")
|
||||
|
||||
response = bob_web_controller_test_client.post('/join_policy', data=json.dumps(request_data))
|
||||
assert b'{"result": {"policy_encrypting_key": "OK"}' in response.data # TODO
|
||||
|
|
|
@ -31,7 +31,7 @@ from tests.utils.middleware import MockRestMiddleware, NodeIsDownMiddleware
|
|||
|
||||
def test_bob_cannot_follow_the_treasure_map_in_isolation(enacted_federated_policy, federated_bob):
|
||||
# Assume for the moment that Bob has already received a TreasureMap, perhaps via a side channel.
|
||||
hrac, treasure_map = enacted_federated_policy.hrac(), enacted_federated_policy.treasure_map
|
||||
hrac, treasure_map = enacted_federated_policy.hrac, enacted_federated_policy.treasure_map
|
||||
|
||||
# Bob knows of no Ursulas.
|
||||
assert len(federated_bob.known_nodes) == 0
|
||||
|
@ -88,7 +88,7 @@ def test_bob_can_follow_treasure_map_even_if_he_only_knows_of_one_node(enacted_f
|
|||
federated_only=True)
|
||||
|
||||
# Again, let's assume that he received the TreasureMap via a side channel.
|
||||
hrac, treasure_map = enacted_federated_policy.hrac(), enacted_federated_policy.treasure_map
|
||||
hrac, treasure_map = enacted_federated_policy.hrac, enacted_federated_policy.treasure_map
|
||||
|
||||
# Now, let's create a scenario in which Bob knows of only one node.
|
||||
assert len(bob.known_nodes) == 0
|
||||
|
@ -134,7 +134,7 @@ def test_bob_can_issue_a_work_order_to_a_specific_ursula(enacted_federated_polic
|
|||
"""
|
||||
|
||||
# We pick up our story with Bob already having followed the treasure map above, ie:
|
||||
hrac, treasure_map = enacted_federated_policy.hrac(), enacted_federated_policy.treasure_map
|
||||
hrac, treasure_map = enacted_federated_policy.hrac, enacted_federated_policy.treasure_map
|
||||
federated_bob.start_learning_loop()
|
||||
|
||||
federated_bob.follow_treasure_map(treasure_map=treasure_map, block=True, timeout=1)
|
||||
|
|
|
@ -90,7 +90,6 @@ def test_bob_joins_policy_and_retrieves(federated_alice,
|
|||
handpicked_ursulas=set(rest_of_ursulas),
|
||||
)
|
||||
|
||||
assert bob == policy.bob
|
||||
assert label == policy.label
|
||||
|
||||
try:
|
||||
|
@ -174,7 +173,7 @@ def test_bob_joins_policy_and_retrieves(federated_alice,
|
|||
bob.disenchant()
|
||||
|
||||
|
||||
def test_treasure_map_serialization(enacted_federated_policy, federated_bob):
|
||||
def test_treasure_map_serialization(enacted_federated_policy, federated_alice, federated_bob):
|
||||
treasure_map = enacted_federated_policy.treasure_map
|
||||
assert treasure_map.m is not None
|
||||
assert treasure_map.m != NO_DECRYPTION_PERFORMED
|
||||
|
@ -191,8 +190,7 @@ def test_treasure_map_serialization(enacted_federated_policy, federated_bob):
|
|||
with pytest.raises(TypeError):
|
||||
deserialized_map.destinations
|
||||
|
||||
compass = federated_bob.make_compass_for_alice(
|
||||
enacted_federated_policy.alice)
|
||||
compass = federated_bob.make_compass_for_alice(federated_alice)
|
||||
deserialized_map.orient(compass)
|
||||
assert deserialized_map.m == treasure_map.m
|
||||
assert deserialized_map.destinations == treasure_map.destinations
|
||||
|
@ -204,7 +202,7 @@ def test_bob_retrieves_with_treasure_map(
|
|||
enrico = capsule_side_channel.enrico
|
||||
message_kit = capsule_side_channel()
|
||||
treasure_map = enacted_federated_policy.treasure_map
|
||||
alice_verifying_key = enacted_federated_policy.alice.stamp
|
||||
alice_verifying_key = enacted_federated_policy.alice_verifying_key
|
||||
|
||||
# Teach Bob about the network
|
||||
federated_bob.remember_node(list(federated_ursulas)[0])
|
||||
|
@ -246,7 +244,7 @@ def test_bob_retrieves_too_late(federated_bob, federated_ursulas,
|
|||
enrico = capsule_side_channel.enrico
|
||||
message_kit = capsule_side_channel()
|
||||
treasure_map = enacted_federated_policy.treasure_map
|
||||
alice_verifying_key = enacted_federated_policy.alice.stamp
|
||||
alice_verifying_key = enacted_federated_policy.alice_verifying_key
|
||||
|
||||
with pytest.raises(Ursula.NotEnoughUrsulas):
|
||||
federated_bob.retrieve(
|
||||
|
|
|
@ -26,8 +26,7 @@ from nucypher.datastore.models import PolicyArrangement
|
|||
from nucypher.policy.collections import Revocation
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('federated_ursulas')
|
||||
def test_federated_grant(federated_alice, federated_bob):
|
||||
def test_federated_grant(federated_alice, federated_bob, federated_ursulas):
|
||||
# Setup the policy details
|
||||
m, n = 2, 3
|
||||
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
|
||||
|
@ -37,27 +36,25 @@ def test_federated_grant(federated_alice, federated_bob):
|
|||
policy = federated_alice.grant(federated_bob, label, m=m, n=n, expiration=policy_end_datetime)
|
||||
|
||||
# Check the policy ID
|
||||
policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp))
|
||||
policy_id = keccak_digest(policy.label + bytes(federated_bob.stamp))
|
||||
assert policy_id == policy.id
|
||||
|
||||
# Check Alice's active policies
|
||||
assert policy_id in federated_alice.active_policies
|
||||
assert federated_alice.active_policies[policy_id] == policy
|
||||
|
||||
# The number of accepted arrangements at least the number of Ursulas we're using (n)
|
||||
assert len(policy._accepted_arrangements) >= n
|
||||
|
||||
# The number of actually enacted arrangements is exactly equal to n.
|
||||
assert len(policy._enacted_arrangements) == n
|
||||
assert len(policy.treasure_map.destinations) == n
|
||||
|
||||
# Let's look at the enacted arrangements.
|
||||
for kfrag in policy.kfrags:
|
||||
arrangement = policy._enacted_arrangements[kfrag]
|
||||
for ursula in federated_ursulas:
|
||||
if ursula.checksum_address in policy.treasure_map.destinations:
|
||||
arrangement_id = policy.treasure_map.destinations[ursula.checksum_address]
|
||||
|
||||
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
|
||||
with arrangement.ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement:
|
||||
retrieved_kfrag = policy_arrangement.kfrag
|
||||
assert kfrag == retrieved_kfrag
|
||||
# Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID.
|
||||
with ursula.datastore.describe(PolicyArrangement, arrangement_id.hex()) as policy_arrangement:
|
||||
retrieved_kfrag = policy_arrangement.kfrag
|
||||
assert bool(retrieved_kfrag) # TODO: try to assemble them back?
|
||||
|
||||
|
||||
def test_federated_alice_can_decrypt(federated_alice, federated_bob):
|
||||
|
|
|
@ -53,7 +53,7 @@ Node Discovery happens in phases. The first step is for a network actor to lear
|
|||
This is a straightforward step which we currently do with our own logic, but which may someday be replaced by something
|
||||
like libp2p, depending on the course of development of those sorts of tools. The introduction of hamming distance
|
||||
in particular is useful when wanting to learn about a small number (~500) of nodes among a much larger (25,000+) swarm.
|
||||
This toolchain is not built for that scenario at this time, although it is not a stated nongoal.
|
||||
This toolchain is not built for that scenario at this time, although it is not a stated nongoal.
|
||||
|
||||
After this, our "Learning Loop" does four other things in sequence which are not part of the offering of node discovery tooling alone:
|
||||
|
||||
|
@ -134,11 +134,13 @@ def test_alice_verifies_ursula_just_in_time(fleet_of_highperf_mocked_ursulas,
|
|||
publish_treasure_map=False)
|
||||
# TODO: Make some assertions about policy.
|
||||
total_verified = sum(node.verified_node for node in highperf_mocked_alice.known_nodes)
|
||||
assert total_verified == 30
|
||||
# Alice may be able to verify more than `n`, but certainly not less,
|
||||
# otherwise `grant()` would fail.
|
||||
assert total_verified >= 30
|
||||
_POLICY_PRESERVER.append(policy)
|
||||
|
||||
|
||||
# @pytest_twisted.inlineCallbacks # TODO: Why does this, in concert with yield policy.publishing_mutex.when_complete, hang?
|
||||
# @pytest_twisted.inlineCallbacks # TODO: Why does this, in concert with yield policy.treasure_map_publisher.when_complete, hang?
|
||||
def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas,
|
||||
highperf_mocked_alice,
|
||||
highperf_mocked_bob):
|
||||
|
@ -179,39 +181,39 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas,
|
|||
|
||||
# defer.setDebugging(False) # Debugging messes up the timing here; comment this line out if you actually need it.
|
||||
|
||||
policy.publish_treasure_map(network_middleware=highperf_mocked_alice.network_middleware) # returns quickly.
|
||||
policy.publish_treasure_map() # returns quickly.
|
||||
|
||||
# defer.setDebugging(True)
|
||||
|
||||
# PART II: We block for a little while to ensure that the distribution is going well.
|
||||
nodes_that_have_the_map_when_we_unblock = policy.publishing_mutex.block_until_success_is_reasonably_likely()
|
||||
nodes_that_have_the_map_when_we_unblock = policy.treasure_map_publisher.block_until_success_is_reasonably_likely()
|
||||
little_while_ended_at = datetime.now()
|
||||
|
||||
# The number of nodes having the map is at least the minimum to have unblocked.
|
||||
assert len(nodes_that_have_the_map_when_we_unblock) >= policy.publishing_mutex._block_until_this_many_are_complete
|
||||
assert len(nodes_that_have_the_map_when_we_unblock) >= policy.treasure_map_publisher._block_until_this_many_are_complete
|
||||
|
||||
# The number of nodes having the map is approximately the number you'd expect from full utilization of Alice's publication threadpool.
|
||||
# TODO: This line fails sometimes because the loop goes too fast.
|
||||
# assert len(nodes_that_have_the_map_when_we_unblock) == pytest.approx(policy.publishing_mutex._block_until_this_many_are_complete, .2)
|
||||
# assert len(nodes_that_have_the_map_when_we_unblock) == pytest.approx(policy.treasure_map_publisher._block_until_this_many_are_complete, .2)
|
||||
|
||||
# PART III: Having made proper assertions about the publication call and the first block, we allow the rest to
|
||||
# happen in the background and then ensure that each phase was timely.
|
||||
|
||||
# This will block until the distribution is complete.
|
||||
policy.publishing_mutex.block_until_complete()
|
||||
policy.treasure_map_publisher.block_until_complete()
|
||||
complete_distribution_time = datetime.now() - started
|
||||
partial_blocking_duration = little_while_ended_at - started
|
||||
# Before Treasure Island (1741), this process took about 3 minutes.
|
||||
if partial_blocking_duration.total_seconds() > 10:
|
||||
pytest.fail(
|
||||
f"Took too long ({partial_blocking_duration}) to contact {len(policy.publishing_mutex.nodes_contacted_during_partial_block)} nodes ({complete_distribution_time} total.)")
|
||||
f"Took too long ({partial_blocking_duration}) to contact {len(nodes_that_have_the_map_when_we_unblock)} nodes ({complete_distribution_time} total.)")
|
||||
|
||||
# TODO: Assert that no nodes outside those expected received the map.
|
||||
assert complete_distribution_time.total_seconds() < 20
|
||||
# But with debuggers and other processes running on laptops, we give a little leeway.
|
||||
|
||||
# We have the same number of successful responses as nodes we expected to have the map.
|
||||
assert len(policy.publishing_mutex.completed) == len(nodes_we_expect_to_have_the_map)
|
||||
assert len(policy.treasure_map_publisher.completed) == len(nodes_we_expect_to_have_the_map)
|
||||
nodes_that_got_the_map = sum(
|
||||
u._its_down_there_somewhere_let_me_take_another_look is True for u in nodes_we_expect_to_have_the_map)
|
||||
assert nodes_that_got_the_map == len(nodes_we_expect_to_have_the_map)
|
||||
|
|
|
@ -69,7 +69,7 @@ def test_alice_can_grant_even_when_the_first_nodes_she_tries_are_down(federated_
|
|||
|
||||
for node in more_nodes:
|
||||
federated_alice.remember_node(node)
|
||||
with pytest.raises(Policy.Rejected):
|
||||
with pytest.raises(Policy.NotEnoughUrsulas):
|
||||
alice_grant_action()
|
||||
|
||||
# Now let's let a few of them come up.
|
||||
|
@ -87,11 +87,8 @@ def test_alice_can_grant_even_when_the_first_nodes_she_tries_are_down(federated_
|
|||
# are among those still down.
|
||||
# policy = alice_grant_action()
|
||||
|
||||
# The number of accepted arrangements at least the number of Ursulas we're using (n)
|
||||
assert len(policy._accepted_arrangements) >= n
|
||||
|
||||
# The number of actually enacted arrangements is exactly equal to n.
|
||||
assert len(policy._enacted_arrangements) == n
|
||||
assert len(policy.treasure_map.destinations) == n
|
||||
|
||||
|
||||
def test_node_has_changed_cert(federated_alice, federated_ursulas):
|
||||
|
|
|
@ -27,16 +27,16 @@ from nucypher.datastore.models import PolicyArrangement
|
|||
from tests.utils.ursula import make_federated_ursulas
|
||||
|
||||
|
||||
def test_alice_enacts_policies_in_policy_group_via_rest(enacted_federated_policy):
|
||||
def test_alice_enacts_policies_in_policy_group_via_rest(enacted_federated_policy, federated_ursulas):
|
||||
"""
|
||||
Now that Alice has made a PolicyGroup, she can enact its policies, using Ursula's Public Key to encrypt each offer
|
||||
and transmitting them via REST.
|
||||
"""
|
||||
arrangement = list(enacted_federated_policy._accepted_arrangements)[0]
|
||||
ursula = arrangement.ursula
|
||||
with ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement:
|
||||
the_kfrag = policy_arrangement.kfrag
|
||||
assert bool(the_kfrag) # TODO: This can be a more poignant assertion.
|
||||
for ursula in federated_ursulas:
|
||||
arrangement_id = enacted_federated_policy.treasure_map.destinations[ursula.checksum_address]
|
||||
with ursula.datastore.describe(PolicyArrangement, arrangement_id.hex()) as policy_arrangement:
|
||||
the_kfrag = policy_arrangement.kfrag
|
||||
assert bool(the_kfrag) # TODO: This can be a more poignant assertion.
|
||||
|
||||
|
||||
@pytest_twisted.inlineCallbacks
|
||||
|
|
|
@ -24,26 +24,22 @@ from nucypher.policy.collections import TreasureMap as FederatedTreasureMap
|
|||
from tests.utils.middleware import MockRestMiddleware
|
||||
|
||||
|
||||
def test_alice_creates_policy_with_correct_hrac(idle_federated_policy):
|
||||
def test_alice_creates_policy_with_correct_hrac(federated_alice, federated_bob, idle_federated_policy):
|
||||
"""
|
||||
Alice creates a Policy. It has the proper HRAC, unique per her, Bob, and the label
|
||||
"""
|
||||
alice = idle_federated_policy.alice
|
||||
bob = idle_federated_policy.bob
|
||||
|
||||
assert idle_federated_policy.hrac() == keccak_digest(bytes(alice.stamp)
|
||||
+ bytes(bob.stamp)
|
||||
+ idle_federated_policy.label)[:16]
|
||||
assert idle_federated_policy.hrac == keccak_digest(bytes(federated_alice.stamp)
|
||||
+ bytes(federated_bob.stamp)
|
||||
+ idle_federated_policy.label)[:16]
|
||||
|
||||
|
||||
def test_alice_sets_treasure_map(enacted_federated_policy):
|
||||
def test_alice_sets_treasure_map(federated_alice, federated_bob, enacted_federated_policy):
|
||||
"""
|
||||
Having enacted all the policies of a PolicyGroup, Alice creates a TreasureMap and ...... TODO
|
||||
"""
|
||||
enacted_federated_policy.publish_treasure_map(network_middleware=MockRestMiddleware())
|
||||
treasure_map_id = enacted_federated_policy.treasure_map.public_id()
|
||||
found = 0
|
||||
for node in enacted_federated_policy.bob.matching_nodes_among(enacted_federated_policy.alice.known_nodes):
|
||||
for node in federated_bob.matching_nodes_among(federated_alice.known_nodes):
|
||||
with node.datastore.describe(DatastoreTreasureMap, treasure_map_id) as treasure_map_on_node:
|
||||
assert FederatedTreasureMap.from_bytes(treasure_map_on_node.treasure_map) == enacted_federated_policy.treasure_map
|
||||
found += 1
|
||||
|
@ -62,18 +58,18 @@ def test_treasure_map_stored_by_ursula_is_the_correct_one_for_bob(federated_alic
|
|||
treasure_map_on_network = FederatedTreasureMap.from_bytes(treasure_map_record.treasure_map)
|
||||
|
||||
hrac_by_bob = federated_bob.construct_policy_hrac(federated_alice.stamp, enacted_federated_policy.label)
|
||||
assert enacted_federated_policy.hrac() == hrac_by_bob
|
||||
assert enacted_federated_policy.hrac == hrac_by_bob
|
||||
|
||||
map_id_by_bob = federated_bob.construct_map_id(federated_alice.stamp, enacted_federated_policy.label)
|
||||
assert map_id_by_bob == treasure_map_on_network.public_id()
|
||||
|
||||
|
||||
def test_bob_can_retrieve_the_treasure_map_and_decrypt_it(enacted_federated_policy):
|
||||
def test_bob_can_retrieve_the_treasure_map_and_decrypt_it(federated_alice, federated_bob, enacted_federated_policy):
|
||||
"""
|
||||
Above, we showed that the TreasureMap saved on the network is the correct one for Bob. Here, we show
|
||||
that Bob can retrieve it with only the information about which he is privy pursuant to the PolicyGroup.
|
||||
"""
|
||||
bob = enacted_federated_policy.bob
|
||||
bob = federated_bob
|
||||
_previous_domain = bob.domain
|
||||
bob.domain = None # Bob has no knowledge of the network.
|
||||
|
||||
|
@ -82,7 +78,7 @@ def test_bob_can_retrieve_the_treasure_map_and_decrypt_it(enacted_federated_poli
|
|||
|
||||
# If Bob doesn't know about any Ursulas, he can't find the TreasureMap via the REST swarm:
|
||||
with pytest.raises(bob.NotEnoughTeachers):
|
||||
treasure_map_from_wire = bob.get_treasure_map(enacted_federated_policy.alice.stamp,
|
||||
treasure_map_from_wire = bob.get_treasure_map(federated_alice.stamp,
|
||||
enacted_federated_policy.label)
|
||||
|
||||
|
||||
|
@ -94,18 +90,18 @@ def test_bob_can_retrieve_the_treasure_map_and_decrypt_it(enacted_federated_poli
|
|||
bob.learn_from_teacher_node(eager=True)
|
||||
|
||||
# Now he'll have better success finding that map.
|
||||
treasure_map_from_wire = bob.get_treasure_map(enacted_federated_policy.alice.stamp,
|
||||
treasure_map_from_wire = bob.get_treasure_map(federated_alice.stamp,
|
||||
enacted_federated_policy.label)
|
||||
|
||||
assert enacted_federated_policy.treasure_map == treasure_map_from_wire
|
||||
|
||||
|
||||
def test_treasure_map_is_legit(enacted_federated_policy):
|
||||
def test_treasure_map_is_legit(federated_bob, enacted_federated_policy):
|
||||
"""
|
||||
Sure, the TreasureMap can get to Bob, but we also need to know that each Ursula in the TreasureMap is on the network.
|
||||
"""
|
||||
for ursula_address, _node_id in enacted_federated_policy.treasure_map:
|
||||
if ursula_address not in enacted_federated_policy.bob.known_nodes.addresses():
|
||||
if ursula_address not in federated_bob.known_nodes.addresses():
|
||||
pytest.fail(f"Bob didn't know about {ursula_address}")
|
||||
|
||||
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
from pathlib import Path
|
||||
|
||||
"""
|
||||
WARNING: This script makes automatic transactions.
|
||||
Do not use this script unless you know what you
|
||||
WARNING: This script makes automatic transactions.
|
||||
Do not use this script unless you know what you
|
||||
are doing and intend to spend ETH measuring live
|
||||
policy availability.
|
||||
"""
|
||||
|
@ -136,7 +136,7 @@ def collect(alice: Alice,
|
|||
else:
|
||||
success += 1
|
||||
policies[policy.public_key.hex()] = policy # track
|
||||
print(f"PEK:{policy.public_key.hex()} | HRAC {policy.hrac().hex()}")
|
||||
print(f"PEK:{policy.public_key.hex()} | HRAC {policy.hrac.hex()}")
|
||||
|
||||
# timeit
|
||||
end = maya.now()
|
||||
|
|
|
@ -15,71 +15,8 @@ You should have received a copy of the GNU Affero General Public License
|
|||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
|
||||
import random
|
||||
from collections import OrderedDict
|
||||
|
||||
import maya
|
||||
import os
|
||||
from typing import Set
|
||||
|
||||
from nucypher.characters.lawful import Ursula
|
||||
from nucypher.network.middleware import RestMiddleware
|
||||
from nucypher.policy.policies import Arrangement, Policy
|
||||
|
||||
|
||||
class MockArrangement(Arrangement):
|
||||
_arrangements = OrderedDict()
|
||||
|
||||
def publish_treasure_map(self) -> None:
|
||||
self._arrangements[self.id()] = self
|
||||
|
||||
def revoke(self):
|
||||
del self._arrangements[self.id()]
|
||||
|
||||
|
||||
class MockPolicy(Policy):
|
||||
def make_arrangements(self,
|
||||
network_middleware: RestMiddleware,
|
||||
deposit: int,
|
||||
expiration: maya.MayaDT,
|
||||
ursulas: Set[Ursula] = None
|
||||
) -> None:
|
||||
"""
|
||||
Create and consider n Arangement objects from all known nodes.
|
||||
"""
|
||||
|
||||
for ursula in self.alice.known_nodes:
|
||||
arrangement = MockArrangement(alice=self.alice, ursula=ursula,
|
||||
hrac=self.hrac(),
|
||||
expiration=expiration)
|
||||
|
||||
self.propose_arrangement(network_middleware=network_middleware,
|
||||
ursula=ursula,
|
||||
arrangement=arrangement)
|
||||
|
||||
# TODO: Remove. Seems unused
|
||||
class MockPolicyCreation:
|
||||
"""
|
||||
Simple mock logic to avoid repeated hammering of blockchain policies.
|
||||
"""
|
||||
waited_for_receipt = False
|
||||
_ether_address = None
|
||||
tx_hash = "THIS HAS BEEN A TRANSACTION!"
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
# TODO: Test that proper arguments are passed here once 316 is closed.
|
||||
pass
|
||||
|
||||
def transact(self, payload):
|
||||
# TODO: Make a meaningful assertion regarding the value.
|
||||
assert payload['from'] == self._ether_address
|
||||
return self.tx_hash
|
||||
|
||||
@classmethod
|
||||
def wait_for_receipt(cls, tx_hash):
|
||||
assert tx_hash == cls.tx_hash
|
||||
cls.waited_for_receipt = True
|
||||
|
||||
|
||||
def generate_random_label() -> bytes:
|
||||
|
|
Loading…
Reference in New Issue