From 13a6ab83753c20839b26cb6008735f1039fb9de3 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Thu, 17 Dec 2020 21:58:20 -0800 Subject: [PATCH 1/9] Add a newsfragment for #2482 --- newsfragments/2479.bugfix.rst | 1 + newsfragments/2482.feature.rst | 1 + 2 files changed, 2 insertions(+) create mode 100644 newsfragments/2479.bugfix.rst create mode 100644 newsfragments/2482.feature.rst diff --git a/newsfragments/2479.bugfix.rst b/newsfragments/2479.bugfix.rst new file mode 100644 index 000000000..8fb916bd2 --- /dev/null +++ b/newsfragments/2479.bugfix.rst @@ -0,0 +1 @@ +More logging added for arrangement proposal failures, and more suitable exceptions thrown. diff --git a/newsfragments/2482.feature.rst b/newsfragments/2482.feature.rst new file mode 100644 index 000000000..2417e26d3 --- /dev/null +++ b/newsfragments/2482.feature.rst @@ -0,0 +1 @@ +Arrangement proposals and policy enactment are performed in parallel, with more nodes being considered as some of the requests fail. This improves granting reliability. From e81f2855178ac33215ae127bfd074b3723dda73a Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Thu, 17 Dec 2020 21:16:03 -0800 Subject: [PATCH 2/9] Make WeightedSampler behave correctly in case of several consecutive draws --- nucypher/blockchain/eth/agents.py | 18 ++++++++++-------- .../agents/test_sampling_distribution.py | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/nucypher/blockchain/eth/agents.py b/nucypher/blockchain/eth/agents.py index e1ebddbae..89faa371b 100644 --- a/nucypher/blockchain/eth/agents.py +++ b/nucypher/blockchain/eth/agents.py @@ -1763,6 +1763,7 @@ class WeightedSampler: elements, weights = zip(*weighted_elements.items()) self.totals = list(accumulate(weights)) self.elements = elements + self.__length = len(self.totals) def sample_no_replacement(self, rng, quantity: int) -> list: """ @@ -1780,25 +1781,26 @@ class WeightedSampler: if quantity > len(self): raise ValueError("Cannot sample more than the total amount of elements without replacement") - totals = self.totals.copy() samples = [] for i in range(quantity): - position = rng.randint(0, totals[-1] - 1) - idx = bisect_right(totals, position) + position = rng.randint(0, self.totals[-1] - 1) + idx = bisect_right(self.totals, position) samples.append(self.elements[idx]) # Adjust the totals so that they correspond # to the weight of the element `idx` being set to 0. - prev_total = totals[idx - 1] if idx > 0 else 0 - weight = totals[idx] - prev_total - for j in range(idx, len(totals)): - totals[j] -= weight + prev_total = self.totals[idx - 1] if idx > 0 else 0 + weight = self.totals[idx] - prev_total + for j in range(idx, len(self.totals)): + self.totals[j] -= weight + + self.__length -= quantity return samples def __len__(self): - return len(self.totals) + return self.__length class StakersReservoir: diff --git a/tests/acceptance/blockchain/agents/test_sampling_distribution.py b/tests/acceptance/blockchain/agents/test_sampling_distribution.py index 7bfac2f43..64d73b2e2 100644 --- a/tests/acceptance/blockchain/agents/test_sampling_distribution.py +++ b/tests/acceptance/blockchain/agents/test_sampling_distribution.py @@ -166,8 +166,8 @@ def test_weighted_sampler(sample_size): weighted_elements = {element: weight for element, weight in zip(elements, weights)} samples = 100000 - sampler = WeightedSampler(weighted_elements) for i in range(samples): + sampler = WeightedSampler(weighted_elements) sample_set = sampler.sample_no_replacement(rng, sample_size) counter.update({tuple(sample_set): 1}) From e635825c9ec5b48a9171da773d6e9a56b1011c75 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Wed, 23 Dec 2020 22:47:29 -0800 Subject: [PATCH 3/9] Remove unused mock classes --- tests/utils/policy.py | 63 ------------------------------------------- 1 file changed, 63 deletions(-) diff --git a/tests/utils/policy.py b/tests/utils/policy.py index 445fdbb4f..bdedff09b 100644 --- a/tests/utils/policy.py +++ b/tests/utils/policy.py @@ -15,71 +15,8 @@ You should have received a copy of the GNU Affero General Public License along with nucypher. If not, see . """ - import random -from collections import OrderedDict - -import maya import os -from typing import Set - -from nucypher.characters.lawful import Ursula -from nucypher.network.middleware import RestMiddleware -from nucypher.policy.policies import Arrangement, Policy - - -class MockArrangement(Arrangement): - _arrangements = OrderedDict() - - def publish_treasure_map(self) -> None: - self._arrangements[self.id()] = self - - def revoke(self): - del self._arrangements[self.id()] - - -class MockPolicy(Policy): - def make_arrangements(self, - network_middleware: RestMiddleware, - deposit: int, - expiration: maya.MayaDT, - ursulas: Set[Ursula] = None - ) -> None: - """ - Create and consider n Arangement objects from all known nodes. - """ - - for ursula in self.alice.known_nodes: - arrangement = MockArrangement(alice=self.alice, ursula=ursula, - hrac=self.hrac(), - expiration=expiration) - - self.propose_arrangement(network_middleware=network_middleware, - ursula=ursula, - arrangement=arrangement) - -# TODO: Remove. Seems unused -class MockPolicyCreation: - """ - Simple mock logic to avoid repeated hammering of blockchain policies. - """ - waited_for_receipt = False - _ether_address = None - tx_hash = "THIS HAS BEEN A TRANSACTION!" - - def __init__(self, *args, **kwargs): - # TODO: Test that proper arguments are passed here once 316 is closed. - pass - - def transact(self, payload): - # TODO: Make a meaningful assertion regarding the value. - assert payload['from'] == self._ether_address - return self.tx_hash - - @classmethod - def wait_for_receipt(cls, tx_hash): - assert tx_hash == cls.tx_hash - cls.waited_for_receipt = True def generate_random_label() -> bytes: From 24d5e3a5ec5963e5ed5f916eaa5bcefa87f4c7ca Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Thu, 24 Dec 2020 22:07:24 -0800 Subject: [PATCH 4/9] Remove PolicyCredential It isn't used anywhere --- nucypher/policy/identity.py | 56 ------------------- nucypher/policy/policies.py | 18 ------ .../characters/test_decentralized_grant.py | 25 --------- 3 files changed, 99 deletions(-) diff --git a/nucypher/policy/identity.py b/nucypher/policy/identity.py index fd850ddb2..5ba80e4f4 100644 --- a/nucypher/policy/identity.py +++ b/nucypher/policy/identity.py @@ -342,59 +342,3 @@ class Card: def delete(self) -> None: os.remove(str(self.filepath)) - - -class PolicyCredential: # TODO: Rename this. It is not a credential in any way. - """ - A portable structure that contains information necessary for Alice or Bob - to utilize the policy on the network that the credential describes. - """ - - def __init__(self, alice_verifying_key, label, expiration, policy_pubkey, - treasure_map=None): - self.alice_verifying_key = alice_verifying_key - self.label = label - self.expiration = expiration - self.policy_pubkey = policy_pubkey - self.treasure_map = treasure_map - - def to_json(self): - """ - Serializes the PolicyCredential to JSON. - """ - cred_dict = { - 'alice_verifying_key': bytes(self.alice_verifying_key).hex(), - 'label': self.label.hex(), - 'expiration': self.expiration.iso8601(), - 'policy_pubkey': bytes(self.policy_pubkey).hex() - } - - if self.treasure_map is not None: - cred_dict['treasure_map'] = bytes(self.treasure_map).hex() - - return json.dumps(cred_dict) - - @classmethod - def from_json(cls, data: str): - """Deserializes the PolicyCredential from JSON.""" - cred_json = json.loads(data) - alice_verifying_key = UmbralPublicKey.from_bytes(cred_json['alice_verifying_key'], decoder=bytes.fromhex) - label = bytes.fromhex(cred_json['label']) - expiration = maya.MayaDT.from_iso8601(cred_json['expiration']) - policy_pubkey = UmbralPublicKey.from_bytes(cred_json['policy_pubkey'], decoder=bytes.fromhex) - treasure_map = None - if 'treasure_map' in cred_json: - # TODO: Support unsigned treasuremaps? - treasure_map = SignedTreasureMap.from_bytes(bytes.fromhex(cred_json['treasure_map'])) - - return cls(alice_verifying_key, - label, - expiration, - policy_pubkey, - treasure_map) - - def __eq__(self, other): - return ((self.alice_verifying_key == other.alice_verifying_key) and - (self.label == other.label) and - (self.expiration == other.expiration) and - (self.policy_pubkey == other.policy_pubkey)) diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index 290c7d51b..24ada17e1 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -46,7 +46,6 @@ from nucypher.crypto.powers import DecryptingPower, SigningPower, TransactingPow from nucypher.crypto.utils import construct_policy_id from nucypher.network.exceptions import NodeSeemsToBeDown from nucypher.network.middleware import RestMiddleware -from nucypher.policy.identity import PolicyCredential from nucypher.utilities.logging import Logger @@ -433,23 +432,6 @@ class Policy(ABC): self.publishing_mutex.start() - def credential(self, with_treasure_map=True): - """ - Creates a PolicyCredential for portable access to the policy via - Alice or Bob. By default, it will include the treasure_map for the - policy unless `with_treasure_map` is False. - """ - - treasure_map = self.treasure_map - if not with_treasure_map: - treasure_map = None - credential = PolicyCredential(alice_verifying_key=self.alice.stamp, - label=self.label, - expiration=self.expiration, - policy_pubkey=self.public_key, - treasure_map=treasure_map) - return credential - def __assign_kfrags(self) -> Generator[Arrangement, None, None]: if len(self._accepted_arrangements) < self.n: diff --git a/tests/acceptance/characters/test_decentralized_grant.py b/tests/acceptance/characters/test_decentralized_grant.py index d491757fc..9d5e48f31 100644 --- a/tests/acceptance/characters/test_decentralized_grant.py +++ b/tests/acceptance/characters/test_decentralized_grant.py @@ -23,7 +23,6 @@ from nucypher.crypto.api import keccak_digest from nucypher.datastore.models import PolicyArrangement from nucypher.datastore.models import TreasureMap as DatastoreTreasureMap from nucypher.policy.collections import SignedTreasureMap as DecentralizedTreasureMap -from nucypher.policy.identity import PolicyCredential from tests.utils.middleware import MockRestMiddleware @@ -60,30 +59,6 @@ def test_decentralized_grant(blockchain_alice, blockchain_bob, agency): with arrangement.ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: assert kfrag == policy_arrangement.kfrag - # Test PolicyCredential w/o TreasureMap - credential = policy.credential(with_treasure_map=False) - assert credential.alice_verifying_key == policy.alice.stamp - assert credential.label == policy.label - assert credential.expiration == policy.expiration - assert credential.policy_pubkey == policy.public_key - assert credential.treasure_map is None - - cred_json = credential.to_json() - deserialized_cred = PolicyCredential.from_json(cred_json) - assert credential == deserialized_cred - - # Test PolicyCredential w/ TreasureMap - credential = policy.credential() - assert credential.alice_verifying_key == policy.alice.stamp - assert credential.label == policy.label - assert credential.expiration == policy.expiration - assert credential.policy_pubkey == policy.public_key - assert credential.treasure_map == policy.treasure_map - - cred_json = credential.to_json() - deserialized_cred = PolicyCredential.from_json(cred_json) - assert credential == deserialized_cred - def test_alice_sets_treasure_map_decentralized(enacted_blockchain_policy): """ From de0f93311841681cc0598b177d1aeecf543d783d Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Fri, 1 Jan 2021 16:21:55 -0800 Subject: [PATCH 5/9] Make Arrangement stateless and get rid of BlockchainArrangement --- nucypher/characters/unlawful.py | 13 +- nucypher/network/middleware.py | 3 +- nucypher/network/server.py | 2 +- nucypher/policy/collections.py | 4 +- nucypher/policy/policies.py | 211 +++++------------- .../characters/test_decentralized_grant.py | 6 +- .../test_stake_via_allocation_contract.py | 3 +- .../cli/ursula/test_stakeholder_and_ursula.py | 3 +- .../acceptance/network/test_network_actors.py | 12 +- .../test_federated_grant_and_revoke.py | 6 +- .../network/test_network_upgrade.py | 10 +- 11 files changed, 87 insertions(+), 186 deletions(-) diff --git a/nucypher/characters/unlawful.py b/nucypher/characters/unlawful.py index e25fc52f1..e021b8391 100644 --- a/nucypher/characters/unlawful.py +++ b/nucypher/characters/unlawful.py @@ -140,12 +140,17 @@ class Amonia(Alice): @staticmethod def enact_without_tabulating_responses(policy, network_middleware, *_args, **_kwargs): - for arrangement in policy._Policy__assign_kfrags(): - arrangement_message_kit = arrangement.encrypt_payload_for_ursula() + for ursula, kfrag in zip(policy._accepted_arrangements, policy.kfrags): + arrangement = policy._accepted_arrangements[ursula] + # TODO: seems like it would be enough to just encrypt this with Ursula's public key, + # and not create a whole capsule. + # Can't change for now since it's node protocol. + message_kit, _signature = policy.alice.encrypt_for(ursula, policy.make_enactment_payload(kfrag)) + try: - network_middleware.enact_policy(arrangement.ursula, + network_middleware.enact_policy(ursula, arrangement.id, - arrangement_message_kit.to_bytes()) + message_kit.to_bytes()) except Exception as e: # I don't care what went wrong - I will keep trying to ram arrangements through. continue diff --git a/nucypher/network/middleware.py b/nucypher/network/middleware.py index a1f378cf2..0fa3830b8 100644 --- a/nucypher/network/middleware.py +++ b/nucypher/network/middleware.py @@ -193,8 +193,7 @@ class RestMiddleware: backend=default_backend()) return certificate - def propose_arrangement(self, arrangement): - node = arrangement.ursula + def propose_arrangement(self, node, arrangement): response = self.client.post(node_or_sprout=node, path="consider_arrangement", data=bytes(arrangement), diff --git a/nucypher/network/server.py b/nucypher/network/server.py index 18508483f..bdfc34bb0 100644 --- a/nucypher/network/server.py +++ b/nucypher/network/server.py @@ -207,7 +207,7 @@ def _make_rest_app(datastore: Datastore, this_node, domain: str, log: Logger) -> with datastore.describe(PolicyArrangement, arrangement.id.hex(), writeable=True) as new_policy_arrangement: new_policy_arrangement.arrangement_id = arrangement.id.hex().encode() new_policy_arrangement.expiration = arrangement.expiration - new_policy_arrangement.alice_verifying_key = arrangement.alice.stamp.as_umbral_pubkey() + new_policy_arrangement.alice_verifying_key = arrangement.alice_verifying_key # TODO: Fine, we'll add the arrangement here, but if we never hear from Alice again to enact it, # we need to prune it at some point. #1700 diff --git a/nucypher/policy/collections.py b/nucypher/policy/collections.py index 77004ae29..10a03704f 100644 --- a/nucypher/policy/collections.py +++ b/nucypher/policy/collections.py @@ -176,10 +176,10 @@ class TreasureMap: nodes_as_bytes += to_canonical_address(ursula_id) + arrangement_id return nodes_as_bytes - def add_arrangement(self, arrangement): + def add_arrangement(self, ursula, arrangement): if self.destinations == NO_DECRYPTION_PERFORMED: raise TypeError("This TreasureMap is encrypted. You can't add another node without decrypting it.") - self.destinations[arrangement.ursula.checksum_address] = arrangement.id # TODO: 1995 + self.destinations[ursula.checksum_address] = arrangement.id # TODO: 1995 def public_id(self) -> str: """ diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index 24ada17e1..c8f275cb0 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -51,122 +51,42 @@ from nucypher.utilities.logging import Logger class Arrangement: """ - A Policy must be implemented by arrangements with n Ursulas. This class tracks the status of that implementation. + A contract between Alice and a single Ursula. """ - federated = True ID_LENGTH = 32 - splitter = BytestringSplitter((UmbralPublicKey, PUBLIC_KEY_LENGTH), # alice.stamp - (bytes, ID_LENGTH), # arrangement_ID + splitter = BytestringSplitter((UmbralPublicKey, PUBLIC_KEY_LENGTH), # alive_verifying_key + (bytes, ID_LENGTH), # arrangement_id (bytes, VariableLengthBytestring)) # expiration + @classmethod + def from_alice(cls, alice: Alice, expiration: maya.MayaDT) -> 'Arrangement': + arrangement_id = secure_random(cls.ID_LENGTH) + alice_verifying_key = alice.stamp.as_umbral_pubkey() + return cls(alice_verifying_key, expiration, arrangement_id) + def __init__(self, - alice: Alice, + alice_verifying_key: UmbralPublicKey, expiration: maya.MayaDT, - ursula: Ursula = None, - arrangement_id: bytes = None, - kfrag: KFrag = UNKNOWN_KFRAG + arrangement_id: bytes, ) -> None: - """ - :param value: Funds which will pay for the timeframe of this Arrangement (not the actual re-encryptions); - a portion will be locked for each Ursula that accepts. - :param expiration: The moment which Alice wants the Arrangement to end. - - Other params are hopefully self-evident. - """ - if arrangement_id: - if len(arrangement_id) != self.ID_LENGTH: - raise ValueError(f"Arrangement ID must be of length {self.ID_LENGTH}.") - self.id = arrangement_id - else: - self.id = secure_random(self.ID_LENGTH) + if len(arrangement_id) != self.ID_LENGTH: + raise ValueError(f"Arrangement ID must be of length {self.ID_LENGTH}.") + self.id = arrangement_id self.expiration = expiration - self.alice = alice - self.status = None - - """ - These will normally not be set if Alice is drawing up this arrangement - she hasn't assigned a kfrag yet - (because she doesn't know if this Arrangement will be accepted). She doesn't have an Ursula, for the same reason. - """ - self.kfrag = kfrag - self.ursula = ursula + self.alice_verifying_key = alice_verifying_key def __bytes__(self): - return bytes(self.alice.stamp) + self.id + bytes(VariableLengthBytestring(self.expiration.iso8601().encode())) + return bytes(self.alice_verifying_key) + self.id + bytes(VariableLengthBytestring(self.expiration.iso8601().encode())) @classmethod - def from_bytes(cls, arrangement_as_bytes): + def from_bytes(cls, arrangement_as_bytes: bytes) -> 'Arrangement': alice_verifying_key, arrangement_id, expiration_bytes = cls.splitter(arrangement_as_bytes) expiration = maya.MayaDT.from_iso8601(iso8601_string=expiration_bytes.decode()) - alice = Alice.from_public_keys(verifying_key=alice_verifying_key) - return cls(alice=alice, arrangement_id=arrangement_id, expiration=expiration) - - def encrypt_payload_for_ursula(self): - """Craft an offer to send to Ursula.""" - # We don't need the signature separately. - return self.alice.encrypt_for(self.ursula, self.payload())[0] - - def payload(self): - return bytes(self.kfrag) - - @abstractmethod - def revoke(self): - """ - Revoke arrangement. - """ - raise NotImplementedError - - -class BlockchainArrangement(Arrangement): - """ - A relationship between Alice and a single Ursula as part of Blockchain Policy - """ - federated = False - - class InvalidArrangement(Exception): - pass - - def __init__(self, - alice: Alice, - ursula: Ursula, - rate: int, - expiration: maya.MayaDT, - duration_periods: int, - *args, **kwargs): - super().__init__(alice=alice, ursula=ursula, expiration=expiration, *args, **kwargs) - - # The relationship exists between two addresses - self.author = alice # type: BlockchainPolicyAuthor - self.policy_agent = alice.policy_agent # type: PolicyManagerAgent - self.staker = ursula # type: Ursula - - # Arrangement rate and duration in periods - self.rate = rate - self.duration_periods = duration_periods - - # Status - self.is_published = False - self.publish_transaction = None - self.is_revoked = False - self.revoke_transaction = None + return cls(alice_verifying_key=alice_verifying_key, arrangement_id=arrangement_id, expiration=expiration) def __repr__(self): - class_name = self.__class__.__name__ - r = "{}(client={}, node={})" - r = r.format(class_name, self.author, self.staker) - return r - - def revoke(self) -> str: - """Revoke this arrangement and return the transaction hash as hex.""" - # TODO: #1355 - Revoke arrangements only - txhash = self.policy_agent.revoke_policy(self.id, author_address=self.author.checksum_address) - self.revoke_transaction = txhash - self.is_revoked = True - return txhash - - def payload(self): - partial_payload = super().payload() - return bytes(self.publish_transaction) + partial_payload + return f"Arrangement(client_key={self.alice_verifying_key})" class NodeEngagementMutex: @@ -360,7 +280,7 @@ class Policy(ABC): self.treasure_map = self._treasure_map_class(m=m) self.expiration = expiration - self._accepted_arrangements = set() # type: Set[Arrangement] + self._accepted_arrangements = {} # type: Dict[Ursula, Arrangement] self._rejected_arrangements = set() # type: Set[Arrangement] self._spare_candidates = set() # type: Set[Ursula] @@ -390,7 +310,7 @@ class Policy(ABC): @property def accepted_ursulas(self) -> Set[Ursula]: - return {arrangement.ursula for arrangement in self._accepted_arrangements} + return set(self._accepted_arrangements) def hrac(self) -> bytes: """ @@ -432,51 +352,45 @@ class Policy(ABC): self.publishing_mutex.start() - def __assign_kfrags(self) -> Generator[Arrangement, None, None]: - - if len(self._accepted_arrangements) < self.n: - raise self.MoreKFragsThanArrangements("Not enough candidate arrangements. " - "Call make_arrangements to make more.") - - for kfrag in self.kfrags: - for arrangement in self._accepted_arrangements: - if not arrangement in self._enacted_arrangements.values(): - arrangement.kfrag = kfrag - self._enacted_arrangements[kfrag] = arrangement - yield arrangement - break # This KFrag is now assigned; break the inner loop and go back to assign other kfrags. - else: - # We didn't assign that KFrag. Trouble. - # This is ideally an impossible situation, because we don't typically - # enter this method unless we've already had n or more Arrangements accepted. - raise self.MoreKFragsThanArrangements("Not enough accepted arrangements to assign all KFrags.") - return + def make_enactment_payload(self, kfrag): + return bytes(kfrag) def enact(self, network_middleware, publish_treasure_map=True) -> dict: """ Assign kfrags to ursulas_on_network, and distribute them via REST, populating enacted_arrangements """ - for arrangement in self.__assign_kfrags(): - arrangement_message_kit = arrangement.encrypt_payload_for_ursula() + if len(self._accepted_arrangements) < self.n: + raise self.MoreKFragsThanArrangements("Not enough candidate arrangements. " + "Call make_arrangements to make more.") + + arrangement_statuses = [] + for ursula, kfrag in zip(self._accepted_arrangements, self.kfrags): + arrangement = self._accepted_arrangements[ursula] + # TODO: seems like it would be enough to just encrypt this with Ursula's public key, + # and not create a whole capsule. + # Can't change for now since it's node protocol. + message_kit, _signature = self.alice.encrypt_for(ursula, self.make_enactment_payload(kfrag)) try: # TODO: Concurrency - response = network_middleware.enact_policy(arrangement.ursula, + response = network_middleware.enact_policy(ursula, arrangement.id, - arrangement_message_kit.to_bytes()) + message_kit.to_bytes()) except network_middleware.UnexpectedResponse as e: - arrangement.status = e.status + arrangement_status = e.status else: - arrangement.status = response.status_code + arrangement_status = response.status_code + + arrangement_statuses.append(arrangement_status) # TODO: Handle problem here - if the arrangement is bad, deal with it. - self.treasure_map.add_arrangement(arrangement) + self.treasure_map.add_arrangement(ursula, arrangement) + self._enacted_arrangements[ursula] = kfrag else: # OK, let's check: if two or more Ursulas claimed we didn't pay, # we need to re-evaulate our situation here. - arrangement_statuses = [a.status for a in self._accepted_arrangements] number_of_claims_of_freeloading = sum(status == 402 for status in arrangement_statuses) if number_of_claims_of_freeloading > 2: @@ -495,15 +409,12 @@ class Policy(ABC): if publish_treasure_map is True: return self.publish_treasure_map(network_middleware=network_middleware) # TODO: blockchain_signer? - def propose_arrangement(self, network_middleware, arrangement) -> bool: - negotiation_response = network_middleware.propose_arrangement(arrangement=arrangement) # Wow, we aren't even passing node here. + def propose_arrangement(self, ursula, network_middleware, arrangement) -> bool: + negotiation_response = network_middleware.propose_arrangement(node=ursula, arrangement=arrangement) # TODO: check out the response: need to assess the result and see if we're actually good to go. arrangement_is_accepted = negotiation_response.status_code == 200 - bucket = self._accepted_arrangements if arrangement_is_accepted else self._rejected_arrangements - bucket.add(arrangement) - return arrangement_is_accepted def make_arrangements(self, @@ -534,9 +445,8 @@ class Policy(ABC): f'- only {len(self._accepted_arrangements)} of {self.n} accepted.\n' f'Offending nodes: \n{formatted_offenders}\n') - @abstractmethod - def make_arrangement(self, ursula: Ursula, *args, **kwargs): - raise NotImplementedError + def make_arrangement(self): + return Arrangement.from_alice(alice=self.alice, expiration=self.expiration) @abstractmethod def sample_essential(self, *args, **kwargs) -> Set[Ursula]: @@ -564,9 +474,10 @@ class Policy(ABC): **kwargs) -> None: for index, selected_ursula in enumerate(candidate_ursulas): - arrangement = self.make_arrangement(ursula=selected_ursula, *args, **kwargs) + arrangement = self.make_arrangement(*args, **kwargs) try: - is_accepted = self.propose_arrangement(arrangement=arrangement, + is_accepted = self.propose_arrangement(ursula=selected_ursula, + arrangement=arrangement, network_middleware=network_middleware) except NodeSeemsToBeDown as e: # TODO: #355 Also catch InvalidNode here? @@ -579,7 +490,7 @@ class Policy(ABC): # Bucket the arrangements if is_accepted: self.log.debug(f"Arrangement accepted by {selected_ursula}") - self._accepted_arrangements.add(arrangement) + self._accepted_arrangements[selected_ursula] = arrangement accepted = len(self._accepted_arrangements) if accepted == self.n and not consider_everyone: try: @@ -621,18 +532,11 @@ class FederatedPolicy(Policy): population=list(known_nodes))) return sampled_ursulas - def make_arrangement(self, ursula: Ursula, *args, **kwargs): - return self._arrangement_class(alice=self.alice, - expiration=self.expiration, - ursula=ursula, - *args, **kwargs) - class BlockchainPolicy(Policy): """ A collection of n BlockchainArrangements representing a single Policy """ - _arrangement_class = BlockchainArrangement from nucypher.policy.collections import SignedTreasureMap as _treasure_map_class # TODO: Circular Import class NoSuchPolicy(Exception): @@ -778,7 +682,7 @@ class BlockchainPolicy(Policy): def publish_to_blockchain(self) -> dict: - prearranged_ursulas = list(a.ursula.checksum_address for a in self._accepted_arrangements) + prearranged_ursulas = list(ursula.checksum_address for ursula in self._accepted_arrangements) # Transact # TODO: Move this logic to BlockchainPolicyActor receipt = self.author.policy_agent.create_policy( @@ -796,13 +700,8 @@ class BlockchainPolicy(Policy): return receipt - def make_arrangement(self, ursula: Ursula, *args, **kwargs): - return self._arrangement_class(alice=self.alice, - expiration=self.expiration, - ursula=ursula, - rate=self.rate, - duration_periods=self.duration_periods, - *args, **kwargs) + def make_enactment_payload(self, kfrag): + return bytes(self.publish_transaction) + super().make_enactment_payload(kfrag) def enact(self, network_middleware, publish_to_blockchain=True, publish_treasure_map=True) -> NodeEngagementMutex: """ @@ -812,10 +711,6 @@ class BlockchainPolicy(Policy): if publish_to_blockchain is True: self.publish_to_blockchain() - # Not in love with this block here, but I want 121 closed. - for arrangement in self._accepted_arrangements: - arrangement.publish_transaction = self.publish_transaction - publisher = super().enact(network_middleware, publish_treasure_map=False) if publish_treasure_map is True: diff --git a/tests/acceptance/characters/test_decentralized_grant.py b/tests/acceptance/characters/test_decentralized_grant.py index 9d5e48f31..c46737fc3 100644 --- a/tests/acceptance/characters/test_decentralized_grant.py +++ b/tests/acceptance/characters/test_decentralized_grant.py @@ -52,11 +52,11 @@ def test_decentralized_grant(blockchain_alice, blockchain_bob, agency): assert len(policy._enacted_arrangements) == n # Let's look at the enacted arrangements. - for kfrag in policy.kfrags: - arrangement = policy._enacted_arrangements[kfrag] + for ursula, kfrag in policy._enacted_arrangements.items(): + arrangement = policy._accepted_arrangements[ursula] # Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID. - with arrangement.ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: + with ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: assert kfrag == policy_arrangement.kfrag diff --git a/tests/acceptance/cli/ursula/test_stake_via_allocation_contract.py b/tests/acceptance/cli/ursula/test_stake_via_allocation_contract.py index c59e33479..4c7fadf64 100644 --- a/tests/acceptance/cli/ursula/test_stake_via_allocation_contract.py +++ b/tests/acceptance/cli/ursula/test_stake_via_allocation_contract.py @@ -557,8 +557,7 @@ def test_collect_rewards_integration(click_runner, handpicked_ursulas={ursula}) # Ensure that the handpicked Ursula was selected for the policy - arrangement = list(blockchain_policy._accepted_arrangements)[0] - assert arrangement.ursula == ursula + assert ursula in blockchain_policy._enacted_arrangements # Bob learns about the new staker and joins the policy blockchain_bob.remember_node(node=ursula) diff --git a/tests/acceptance/cli/ursula/test_stakeholder_and_ursula.py b/tests/acceptance/cli/ursula/test_stakeholder_and_ursula.py index 500ab31b2..d92bc20aa 100644 --- a/tests/acceptance/cli/ursula/test_stakeholder_and_ursula.py +++ b/tests/acceptance/cli/ursula/test_stakeholder_and_ursula.py @@ -626,8 +626,7 @@ def test_collect_rewards_integration(click_runner, handpicked_ursulas={ursula}) # Ensure that the handpicked Ursula was selected for the policy - arrangement = list(blockchain_policy._accepted_arrangements)[0] - assert arrangement.ursula == ursula + assert ursula in blockchain_policy._enacted_arrangements # Bob learns about the new staker and joins the policy blockchain_bob.start_learning_loop() diff --git a/tests/acceptance/network/test_network_actors.py b/tests/acceptance/network/test_network_actors.py index 44a33499b..3c9f6e35e 100644 --- a/tests/acceptance/network/test_network_actors.py +++ b/tests/acceptance/network/test_network_actors.py @@ -148,7 +148,8 @@ def test_alice_refuses_to_make_arrangement_unless_ursula_is_valid(blockchain_ali vladimir.node_storage.store_node_certificate(certificate=target.certificate) with pytest.raises(vladimir.InvalidNode): - idle_blockchain_policy.propose_arrangement(network_middleware=blockchain_alice.network_middleware, + idle_blockchain_policy.propose_arrangement(ursula=vladimir, + network_middleware=blockchain_alice.network_middleware, arrangement=FakeArrangement() ) @@ -171,15 +172,18 @@ def test_treasure_map_cannot_be_duplicated(blockchain_ursulas, expiration=policy_end_datetime) matching_ursulas = blockchain_bob.matching_nodes_among(blockchain_ursulas) - first_matching_ursula = matching_ursulas[0] + completed_ursulas = policy.publishing_mutex.block_until_success_is_reasonably_likely() + # Ursulas in publishing_mutex are not real Ursulas, but just some metadata of remote ones. + # We need a real one to access its datastore. + first_completed_ursula = [ursula for ursula in matching_ursulas if ursula in completed_ursulas][0] - with first_matching_ursula.datastore.describe(TreasureMap, policy.treasure_map._hrac.hex()) as saved_map_record: + with first_completed_ursula.datastore.describe(TreasureMap, policy.treasure_map._hrac.hex()) as saved_map_record: assert saved_map_record.treasure_map == bytes(policy.treasure_map) # This Ursula was actually a Vladimir. # Thus, he has access to the (encrypted) TreasureMap and can use its details to # try to store his own fake details. - vladimir = Vladimir.from_target_ursula(first_matching_ursula) + vladimir = Vladimir.from_target_ursula(first_completed_ursula) ursulas_who_probably_do_not_have_the_map = [u for u in blockchain_ursulas if not u in matching_ursulas] node_on_which_to_store_bad_map = ursulas_who_probably_do_not_have_the_map[0] diff --git a/tests/integration/characters/test_federated_grant_and_revoke.py b/tests/integration/characters/test_federated_grant_and_revoke.py index b3027b602..00a207143 100644 --- a/tests/integration/characters/test_federated_grant_and_revoke.py +++ b/tests/integration/characters/test_federated_grant_and_revoke.py @@ -51,11 +51,11 @@ def test_federated_grant(federated_alice, federated_bob): assert len(policy._enacted_arrangements) == n # Let's look at the enacted arrangements. - for kfrag in policy.kfrags: - arrangement = policy._enacted_arrangements[kfrag] + for ursula, kfrag in policy._enacted_arrangements.items(): + arrangement = policy._accepted_arrangements[ursula] # Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID. - with arrangement.ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: + with ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: retrieved_kfrag = policy_arrangement.kfrag assert kfrag == retrieved_kfrag diff --git a/tests/integration/network/test_network_upgrade.py b/tests/integration/network/test_network_upgrade.py index a8c3fe2d2..0a4f7e7b7 100644 --- a/tests/integration/network/test_network_upgrade.py +++ b/tests/integration/network/test_network_upgrade.py @@ -32,11 +32,11 @@ def test_alice_enacts_policies_in_policy_group_via_rest(enacted_federated_policy Now that Alice has made a PolicyGroup, she can enact its policies, using Ursula's Public Key to encrypt each offer and transmitting them via REST. """ - arrangement = list(enacted_federated_policy._accepted_arrangements)[0] - ursula = arrangement.ursula - with ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: - the_kfrag = policy_arrangement.kfrag - assert bool(the_kfrag) # TODO: This can be a more poignant assertion. + for ursula, kfrag in enacted_federated_policy._enacted_arrangements.items(): + arrangement = enacted_federated_policy._accepted_arrangements[ursula] + with ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: + the_kfrag = policy_arrangement.kfrag + assert kfrag == the_kfrag @pytest_twisted.inlineCallbacks From a2b99daa1ddf6b9ad7739c2006ebe5e03050c391 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Fri, 1 Jan 2021 21:20:52 -0800 Subject: [PATCH 6/9] Merge proposals and enactment into a single method, return EnactedPolicy --- nucypher/blockchain/eth/agents.py | 5 +- nucypher/characters/control/interfaces.py | 2 +- nucypher/characters/lawful.py | 18 +- nucypher/characters/unlawful.py | 41 +- nucypher/crypto/kits.py | 4 +- nucypher/policy/policies.py | 732 ++++++++++-------- .../acceptance/characters/control/conftest.py | 8 +- .../control/test_rpc_control_blockchain.py | 12 +- .../control/test_web_control_blockchain.py | 14 +- .../characters/test_decentralized_grant.py | 35 +- .../characters/test_freerider_attacks.py | 2 +- .../test_ursula_prepares_to_act_as_worker.py | 4 +- tests/acceptance/cli/test_bob.py | 3 +- .../test_stake_via_allocation_contract.py | 2 +- .../cli/ursula/test_stakeholder_and_ursula.py | 2 +- .../acceptance/network/test_network_actors.py | 20 +- tests/fixtures.py | 20 +- .../characters/control/conftest.py | 4 +- .../control/test_rpc_control_federated.py | 12 +- .../control/test_web_control_federated.py | 14 +- .../characters/test_bob_handles_frags.py | 6 +- .../test_bob_joins_policy_and_retrieves.py | 10 +- .../test_federated_grant_and_revoke.py | 23 +- .../learning/test_discovery_phases.py | 10 +- .../integration/network/test_failure_modes.py | 7 +- .../network/test_network_upgrade.py | 10 +- .../network/test_treasure_map_integration.py | 30 +- tests/metrics/grant_availability.py | 6 +- 28 files changed, 553 insertions(+), 503 deletions(-) diff --git a/nucypher/blockchain/eth/agents.py b/nucypher/blockchain/eth/agents.py index 89faa371b..fbdab6b9c 100644 --- a/nucypher/blockchain/eth/agents.py +++ b/nucypher/blockchain/eth/agents.py @@ -1760,7 +1760,10 @@ class WeightedSampler: """ def __init__(self, weighted_elements: Dict[Any, int]): - elements, weights = zip(*weighted_elements.items()) + if weighted_elements: + elements, weights = zip(*weighted_elements.items()) + else: + elements, weights = [], [] self.totals = list(accumulate(weights)) self.elements = elements self.__length = len(self.totals) diff --git a/nucypher/characters/control/interfaces.py b/nucypher/characters/control/interfaces.py index 759b80b8b..e11abea7c 100644 --- a/nucypher/characters/control/interfaces.py +++ b/nucypher/characters/control/interfaces.py @@ -127,7 +127,7 @@ class AliceInterface(CharacterPublicInterface): response_data = {'treasure_map': new_policy.treasure_map, 'policy_encrypting_key': new_policy.public_key, - 'alice_verifying_key': new_policy.alice.stamp} + 'alice_verifying_key': new_policy.alice_verifying_key} return response_data diff --git a/nucypher/characters/lawful.py b/nucypher/characters/lawful.py index 1fa687d7f..7bc321fdc 100644 --- a/nucypher/characters/lawful.py +++ b/nucypher/characters/lawful.py @@ -341,20 +341,18 @@ class Alice(Character, BlockchainPolicyAuthor): "know which nodes to use. Either pass them here or when you make the Policy, " "or run the learning loop on a network with enough Ursulas.".format(policy.n)) - self.log.debug(f"Making arrangements for {policy} ... ") - policy.make_arrangements(network_middleware=self.network_middleware, - handpicked_ursulas=handpicked_ursulas, - discover_on_this_thread=discover_on_this_thread) - - # REST call happens here, as does population of TreasureMap. self.log.debug(f"Enacting {policy} ... ") - # TODO: Make it optional to publish to blockchain? Or is this presumptive based on the `Policy` type? - policy.enact(network_middleware=self.network_middleware, publish_treasure_map=publish_treasure_map) + enacted_policy = policy.enact(network_middleware=self.network_middleware, + handpicked_ursulas=handpicked_ursulas, + publish_treasure_map=publish_treasure_map, + discover_on_this_thread=discover_on_this_thread) + + self.add_active_policy(enacted_policy) if publish_treasure_map and block_until_success_is_reasonably_likely: - policy.publishing_mutex.block_until_success_is_reasonably_likely() - return policy # Now with TreasureMap affixed! + enacted_policy.publishing_mutex.block_until_success_is_reasonably_likely() + return enacted_policy def get_policy_encrypting_key_from_label(self, label: bytes) -> UmbralPublicKey: alice_delegating_power = self._crypto_power.power_ups(DelegatingPower) diff --git a/nucypher/characters/unlawful.py b/nucypher/characters/unlawful.py index e021b8391..de920dc56 100644 --- a/nucypher/characters/unlawful.py +++ b/nucypher/characters/unlawful.py @@ -139,13 +139,11 @@ class Amonia(Alice): return alice_clone @staticmethod - def enact_without_tabulating_responses(policy, network_middleware, *_args, **_kwargs): - for ursula, kfrag in zip(policy._accepted_arrangements, policy.kfrags): - arrangement = policy._accepted_arrangements[ursula] - # TODO: seems like it would be enough to just encrypt this with Ursula's public key, - # and not create a whole capsule. - # Can't change for now since it's node protocol. - message_kit, _signature = policy.alice.encrypt_for(ursula, policy.make_enactment_payload(kfrag)) + def enact_without_tabulating_responses(policy, network_middleware, arrangements, publication_transaction, **_kwargs): + for ursula, kfrag in zip(arrangements, policy.kfrags): + arrangement = arrangements[ursula] + payload = policy._make_enactment_payload(publication_transaction, kfrag) + message_kit, _signature = policy.alice.encrypt_for(ursula, payload) try: network_middleware.enact_policy(ursula, @@ -161,9 +159,9 @@ class Amonia(Alice): """ def what_do_you_mean_you_dont_tip(policy, *args, **kwargs): - policy.publish_transaction = b"He convinced me, gimme back my $" + return b"He convinced me, gimme back my $" - with patch("nucypher.policy.policies.BlockchainPolicy.publish_to_blockchain", what_do_you_mean_you_dont_tip): + with patch("nucypher.policy.policies.BlockchainPolicy._publish_to_blockchain", what_do_you_mean_you_dont_tip): return super().grant(*args, **kwargs) def circumvent_safegaurds_and_grant_without_paying(self, *args, **kwargs): @@ -172,7 +170,7 @@ class Amonia(Alice): Can I grant for free if I change the client code to my liking? """ - with patch("nucypher.policy.policies.Policy.enact", self.enact_without_tabulating_responses): + with patch("nucypher.policy.policies.Policy._enact_arrangements", self.enact_without_tabulating_responses): return self.grant_without_paying(*args, **kwargs) def grant_while_paying_the_wrong_nodes(self, @@ -185,28 +183,23 @@ class Amonia(Alice): an on-chain Policy using PolicyManager, I'm hoping Ursula won't notice. """ - def publish_wrong_payee_address_to_blockchain(policy, *args, **kwargs): - receipt = policy.author.policy_agent.create_policy( - policy_id=policy.hrac()[:HRAC_LENGTH], # bytes16 _policyID - author_address=policy.author.checksum_address, + def publish_wrong_payee_address_to_blockchain(policy, _ursulas): + receipt = policy.alice.policy_agent.create_policy( + policy_id=policy.hrac, # bytes16 _policyID + author_address=policy.alice.checksum_address, value=policy.value, end_timestamp=policy.expiration.epoch, # uint16 _numberOfPeriods node_addresses=[f.checksum_address for f in ursulas_to_pay_instead] # address[] memory _nodes ) - # Capture Response - policy.receipt = receipt - policy.publish_transaction = receipt['transactionHash'] - policy.is_published = True + return receipt['transactionHash'] - return receipt - - with patch("nucypher.policy.policies.BlockchainPolicy.publish_to_blockchain", + with patch("nucypher.policy.policies.BlockchainPolicy._publish_to_blockchain", publish_wrong_payee_address_to_blockchain): - with patch("nucypher.policy.policies.Policy.enact", self.enact_without_tabulating_responses): + with patch("nucypher.policy.policies.Policy._enact_arrangements", self.enact_without_tabulating_responses): return super().grant(handpicked_ursulas=ursulas_to_trick_into_working_for_free, *args, **kwargs) - def use_ursula_as_an_involuntary_and_unbeknownst_cdn(self, policy, sucker_ursula): + def use_ursula_as_an_involuntary_and_unbeknownst_cdn(self, policy, bob, sucker_ursula): """ Ursula is a sucker. @@ -228,7 +221,7 @@ class Amonia(Alice): # I'll include a small portion of this awful film in a new message kit. We don't care about the signature for bob. not_the_bees = b"Not the bees!" + int(i).to_bytes(length=4, byteorder="big") like_a_map_but_awful.message_kit, _signature_for_bob_which_is_never_Used = encrypt_and_sign( - policy.bob.public_keys(DecryptingPower), + bob.public_keys(DecryptingPower), plaintext=not_the_bees, signer=self.stamp, ) diff --git a/nucypher/crypto/kits.py b/nucypher/crypto/kits.py index 47fde839a..2b2c4056f 100644 --- a/nucypher/crypto/kits.py +++ b/nucypher/crypto/kits.py @@ -136,10 +136,10 @@ UmbralMessageKit = PolicyMessageKit # Temporarily, until serialization w/ Enric class RevocationKit: - def __init__(self, policy: 'Policy', signer: 'SignatureStamp'): + def __init__(self, treasure_map, signer: 'SignatureStamp'): from nucypher.policy.collections import Revocation self.revocations = dict() - for node_id, arrangement_id in policy.treasure_map: + for node_id, arrangement_id in treasure_map: self.revocations[node_id] = Revocation(arrangement_id, signer=signer) def __iter__(self): diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index c8f275cb0..34c0410ef 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -19,8 +19,7 @@ along with nucypher. If not, see . import datetime from collections import OrderedDict from queue import Queue, Empty -from typing import Callable, Tuple -from typing import Generator, Set, Optional +from typing import Callable, Tuple, Sequence, Set, Optional, Iterable, List, Dict, Type import math import maya @@ -28,7 +27,9 @@ import random import time from abc import ABC, abstractmethod from bytestring_splitter import BytestringSplitter, VariableLengthBytestring -from constant_sorrow.constants import NOT_SIGNED, UNKNOWN_KFRAG +from constant_sorrow.constants import NOT_SIGNED +from eth_typing.evm import ChecksumAddress +from hexbytes import HexBytes from twisted._threads import AlreadyQuit from twisted.internet import reactor from twisted.internet.defer import ensureDeferred, Deferred @@ -37,7 +38,7 @@ from umbral.keys import UmbralPublicKey from umbral.kfrags import KFrag from nucypher.blockchain.eth.actors import BlockchainPolicyAuthor -from nucypher.blockchain.eth.agents import PolicyManagerAgent +from nucypher.blockchain.eth.agents import PolicyManagerAgent, StakersReservoir, StakingEscrowAgent from nucypher.characters.lawful import Alice, Ursula from nucypher.crypto.api import keccak_digest, secure_random from nucypher.crypto.constants import HRAC_LENGTH, PUBLIC_KEY_LENGTH @@ -236,85 +237,135 @@ class NodeEngagementMutex: self._threadpool.start() +class MergedReservoir: + """ + A reservoir made of a list of addresses and a StakersReservoir. + Draws the values from the list first, then from StakersReservoir, + then returns None on subsequent calls. + """ + + def __init__(self, values: Iterable, reservoir: StakersReservoir): + self.values = list(values) + self.reservoir = reservoir + + def __call__(self) -> Optional[ChecksumAddress]: + if self.values: + return self.values.pop(0) + elif len(self.reservoir) > 0: + return self.reservoir.draw(1)[0] + else: + return None + + +class PrefetchStrategy: + """ + Encapsulates the batch draw strategy from a reservoir. + Determines how many values to draw based on the number of values + that have already led to successes. + """ + + def __init__(self, reservoir: MergedReservoir, need_successes: int): + self.reservoir = reservoir + self.need_successes = need_successes + + def __call__(self, successes: int) -> Optional[List[ChecksumAddress]]: + batch = [] + for i in range(self.need_successes - successes): + value = self.reservoir() + if value is None: + break + batch.append(value) + if not batch: + return None + return batch + + +def propose_arrangements(worker, value_factory, target_successes, timeout): + """ + A temporary function that calls workers sequentially. + To be replaced with a parallel solution. + """ + + successes = {} + failures = {} + start_time = maya.now() + + while True: + + value_batch = value_factory(len(successes)) + if value_batch is None: + break + + for value in value_batch: + try: + result = worker(value) + successes[value] = result + except Exception as e: + failures[value] = e + + if len(successes) == target_successes: + break + + delta = maya.now() - start_time + if delta.total_seconds() >= timeout: + raise RuntimeError(f"Proposal stage timed out after {timeout} seconds; " + f"need {target_successes - len(successes)} more.") + + if len(successes) == target_successes: + break + + return successes, failures + + class Policy(ABC): """ - An edict by Alice, arranged with n Ursulas, to perform re-encryption for a specific Bob - for a specific path. - - Once Alice is ready to enact a Policy, she generates KFrags, which become part of the Policy. - - Each Ursula is offered a Arrangement (see above) for a given Policy by Alice. - - Once Alice has secured agreement with n Ursulas to enact a Policy, she sends each a KFrag, - and generates a TreasureMap for the Policy, recording which Ursulas got a KFrag. + An edict by Alice, arranged with n Ursulas, to perform re-encryption for a specific Bob. """ POLICY_ID_LENGTH = 16 - _arrangement_class = NotImplemented log = Logger("Policy") - class Rejected(RuntimeError): - """Too many Ursulas rejected""" - - def __init__(self, - alice: Alice, - label: bytes, - expiration: maya.MayaDT, - bob: 'Bob' = None, - kfrags: Tuple[KFrag, ...] = (UNKNOWN_KFRAG,), - public_key=None, - m: int = None, - alice_signature=NOT_SIGNED) -> None: - - """ - :param kfrags: A list of KFrags to distribute per this Policy. - :param label: The identity of the resource to which Bob is granted access. - """ - self.alice = alice - self.label = label - self.bob = bob - self.kfrags = kfrags - self.public_key = public_key - self._id = construct_policy_id(self.label, bytes(self.bob.stamp)) - self.treasure_map = self._treasure_map_class(m=m) - self.expiration = expiration - - self._accepted_arrangements = {} # type: Dict[Ursula, Arrangement] - self._rejected_arrangements = set() # type: Set[Arrangement] - self._spare_candidates = set() # type: Set[Ursula] - - self._enacted_arrangements = OrderedDict() - self._published_arrangements = OrderedDict() - - self.alice_signature = alice_signature # TODO: This is unused / To Be Implemented? - - self.publishing_mutex = None - - class MoreKFragsThanArrangements(TypeError): + class NotEnoughUrsulas(Exception): """ Raised when a Policy has been used to generate Arrangements with Ursulas insufficient number such that we don't have enough KFrags to give to each Ursula. """ - @property - def n(self) -> int: - return len(self.kfrags) - - @property - def id(self) -> bytes: - return self._id - - def __repr__(self): - return f"{self.__class__.__name__}:{self.id.hex()[:6]}" - - @property - def accepted_ursulas(self) -> Set[Ursula]: - return set(self._accepted_arrangements) - - def hrac(self) -> bytes: + class EnactmentError(Exception): """ - # TODO: #180 - This function is hanging on for dear life. After 180 is closed, it can be completely deprecated. + Raised if one or more Ursulas failed to enact the policy. + """ + + def __init__(self, + alice: Alice, + label: bytes, + expiration: maya.MayaDT, + bob: 'Bob', + kfrags: Sequence[KFrag], + public_key: UmbralPublicKey, + m: int, + ): + + """ + :param kfrags: A list of KFrags to distribute per this Policy. + :param label: The identity of the resource to which Bob is granted access. + """ + + self.m = m + self.n = len(kfrags) + self.alice = alice + self.label = label + self.bob = bob + self.kfrags = kfrags + self.public_key = public_key + self.expiration = expiration + + self._id = construct_policy_id(self.label, bytes(self.bob.stamp)) + + """ + # TODO: #180 - This attribute is hanging on for dear life. + After 180 is closed, it can be completely deprecated. The "hashed resource authentication code". @@ -326,51 +377,113 @@ class Policy(ABC): Alice and Bob have all the information they need to construct this. Ursula does not, so we share it with her. """ - return keccak_digest(bytes(self.alice.stamp) + bytes(self.bob.stamp) + self.label)[:HRAC_LENGTH] + self.hrac = keccak_digest(bytes(self.alice.stamp) + bytes(self.bob.stamp) + self.label)[:HRAC_LENGTH] - async def put_treasure_map_on_node(self, node, network_middleware): - response = network_middleware.put_treasure_map_on_node( - node=node, - map_payload=bytes(self.treasure_map)) - return response + def __repr__(self): + return f"{self.__class__.__name__}:{self._id.hex()[:6]}" - def publish_treasure_map(self, network_middleware: RestMiddleware, - blockchain_signer: Callable = None) -> NodeEngagementMutex: - self.treasure_map.prepare_for_publication(self.bob.public_keys(DecryptingPower), - self.bob.public_keys(SigningPower), - self.alice.stamp, - self.label) - if blockchain_signer is not None: - self.treasure_map.include_blockchain_signature(blockchain_signer) - - self.alice.block_until_number_of_known_nodes_is(8, timeout=2, learn_on_this_thread=True) - - target_nodes = self.bob.matching_nodes_among(self.alice.known_nodes) - self.publishing_mutex = NodeEngagementMutex(callable_to_engage=self.put_treasure_map_on_node, - nodes=target_nodes, - network_middleware=network_middleware) - - self.publishing_mutex.start() - - def make_enactment_payload(self, kfrag): - return bytes(kfrag) - - def enact(self, network_middleware, publish_treasure_map=True) -> dict: + def _propose_arrangement(self, + address: ChecksumAddress, + network_middleware: RestMiddleware, + ) -> Tuple[Ursula, Arrangement]: """ - Assign kfrags to ursulas_on_network, and distribute them via REST, - populating enacted_arrangements + Attempt to propose an arrangement to the node with the given address. """ - if len(self._accepted_arrangements) < self.n: - raise self.MoreKFragsThanArrangements("Not enough candidate arrangements. " - "Call make_arrangements to make more.") - arrangement_statuses = [] - for ursula, kfrag in zip(self._accepted_arrangements, self.kfrags): - arrangement = self._accepted_arrangements[ursula] + if address not in self.alice.known_nodes: + raise RuntimeError(f"{address} is not known") + + ursula = self.alice.known_nodes[address] + arrangement = Arrangement.from_alice(alice=self.alice, expiration=self.expiration) + + self.log.debug(f"Proposing arrangement {arrangement} to {ursula}") + negotiation_response = network_middleware.propose_arrangement(ursula, arrangement) + status = negotiation_response.status_code + + if status == 200: + self.log.debug(f"Arrangement accepted by {ursula}") + else: + message = f"Proposing arrangement to {ursula} failed with {status}" + self.log.debug(message) + raise RuntimeError(message) + + # We could just return the arrangement and get the Ursula object + # from `known_nodes` later, but when we introduce slashing in FleetSensor, + # the address can already disappear from `known_nodes` by that time. + return (ursula, arrangement) + + @abstractmethod + def _make_reservoir(self, handpicked_addresses: Sequence[ChecksumAddress]) -> MergedReservoir: + """ + Builds a `MergedReservoir` to use for drawing addresses to send proposals to. + """ + raise NotImplementedError + + def _make_arrangements(self, + network_middleware: RestMiddleware, + handpicked_ursulas: Optional[Iterable[Ursula]] = None, + discover_on_this_thread: bool = True, + timeout: int = 10, + ) -> Dict[Ursula, Arrangement]: + """ + Pick some Ursula addresses and send them arrangement proposals. + Returns a dictionary of Ursulas to Arrangements if it managed to get `n` responses. + """ + + if handpicked_ursulas is None: + handpicked_ursulas = [] + handpicked_addresses = [ursula.checksum_address for ursula in handpicked_ursulas] + + reservoir = self._make_reservoir(handpicked_addresses) + value_factory = PrefetchStrategy(reservoir, self.n) + + def worker(address): + return self._propose_arrangement(address, network_middleware) + + self.alice.block_until_number_of_known_nodes_is(self.n, learn_on_this_thread=discover_on_this_thread, eager=True) + + arrangements, failures = propose_arrangements(worker=worker, + value_factory=value_factory, + target_successes=self.n, + timeout=timeout) + + accepted_arrangements = {ursula: arrangement for ursula, arrangement in arrangements.values()} + + accepted_addresses = ", ".join(ursula.checksum_address for ursula in accepted_arrangements) + + if len(arrangements) < self.n: + + rejected_proposals = "\n".join(f"{address}: {exception}" for address, exception in failures.items()) + + self.log.debug( + "Could not find enough Ursulas to accept proposals.\n" + f"Accepted: {accepted_addresses}\n" + f"Rejected:\n{rejected_proposals}") + raise self._not_enough_ursulas_exception() + else: + self.log.debug(f"Finished proposing arrangements; accepted: {accepted_addresses}") + + return accepted_arrangements + + def _enact_arrangements(self, + network_middleware: RestMiddleware, + arrangements: Dict[Ursula, Arrangement], + publication_transaction: Optional[HexBytes] = None, + publish_treasure_map: bool = True, + ): + """ + Attempts to distribute kfrags to Ursulas that accepted arrangements earlier. + """ + + statuses = {} + for ursula, kfrag in zip(arrangements, self.kfrags): + arrangement = arrangements[ursula] + # TODO: seems like it would be enough to just encrypt this with Ursula's public key, # and not create a whole capsule. # Can't change for now since it's node protocol. - message_kit, _signature = self.alice.encrypt_for(ursula, self.make_enactment_payload(kfrag)) + payload = self._make_enactment_payload(publication_transaction, kfrag) + message_kit, _signature = self.alice.encrypt_for(ursula, payload) try: # TODO: Concurrency @@ -378,203 +491,173 @@ class Policy(ABC): arrangement.id, message_kit.to_bytes()) except network_middleware.UnexpectedResponse as e: - arrangement_status = e.status + status = e.status else: - arrangement_status = response.status_code + status = response.status_code - arrangement_statuses.append(arrangement_status) + statuses[ursula.checksum_address] = status - # TODO: Handle problem here - if the arrangement is bad, deal with it. - self.treasure_map.add_arrangement(ursula, arrangement) - self._enacted_arrangements[ursula] = kfrag + # TODO: Enable re-tries? + + if not all(status == 200 for status in statuses.values()): + report = "\n".join(f"{address}: {status}" for address, status in statuses.items()) + self.log.debug(f"Policy enactment failed. Request statuses:\n{report}") - else: # OK, let's check: if two or more Ursulas claimed we didn't pay, # we need to re-evaulate our situation here. - number_of_claims_of_freeloading = sum(status == 402 for status in arrangement_statuses) + number_of_claims_of_freeloading = sum(status == 402 for status in statuses.values()) + # TODO: a better exception here? if number_of_claims_of_freeloading > 2: - raise self.alice.NotEnoughNodes # TODO: Clean this up and enable re-tries. + raise self.alice.NotEnoughNodes - self.treasure_map.check_for_sufficient_destinations() + # otherwise just raise a more generic error + raise Policy.EnactmentError() - # TODO: Leave a note to try any failures later. - pass + def _make_treasure_map(self, + network_middleware: RestMiddleware, + arrangements: Dict[Ursula, Arrangement], + ) -> 'TreasureMap': + """ + Creates a treasure map for given arrangements. + """ - # ...After *all* the arrangements are enacted - # Create Alice's revocation kit - self.revocation_kit = RevocationKit(self, self.alice.stamp) - self.alice.add_active_policy(self) + treasure_map = self._treasure_map_class(m=self.m) - if publish_treasure_map is True: - return self.publish_treasure_map(network_middleware=network_middleware) # TODO: blockchain_signer? + for ursula, arrangement in arrangements.items(): + treasure_map.add_arrangement(ursula, arrangement) - def propose_arrangement(self, ursula, network_middleware, arrangement) -> bool: - negotiation_response = network_middleware.propose_arrangement(node=ursula, arrangement=arrangement) + treasure_map.prepare_for_publication(bob_encrypting_key=self.bob.public_keys(DecryptingPower), + bob_verifying_key=self.bob.public_keys(SigningPower), + alice_stamp=self.alice.stamp, + label=self.label) - # TODO: check out the response: need to assess the result and see if we're actually good to go. - arrangement_is_accepted = negotiation_response.status_code == 200 + return treasure_map - return arrangement_is_accepted + def _make_publishing_mutex(self, + treasure_map: 'TreasureMap', + network_middleware: RestMiddleware, + ) -> NodeEngagementMutex: - def make_arrangements(self, - network_middleware: RestMiddleware, - handpicked_ursulas: Optional[Set[Ursula]] = None, - discover_on_this_thread: bool = True, - *args, **kwargs, - ) -> None: + async def put_treasure_map_on_node(node, network_middleware): + response = network_middleware.put_treasure_map_on_node(node=node, + map_payload=bytes(treasure_map)) + return response - sampled_ursulas = self.sample(handpicked_ursulas=handpicked_ursulas, - discover_on_this_thread=discover_on_this_thread) + # TODO (#2516): remove hardcoding of 8 nodes + self.alice.block_until_number_of_known_nodes_is(8, timeout=2, learn_on_this_thread=True) + target_nodes = self.bob.matching_nodes_among(self.alice.known_nodes) - if len(sampled_ursulas) < self.n: - raise self.MoreKFragsThanArrangements( - "To make a Policy in federated mode, you need to designate *all* ' \ - the Ursulas you need (in this case, {}); there's no other way to ' \ - know which nodes to use. Either pass them here or when you make ' \ - the Policy.".format(self.n)) + return NodeEngagementMutex(callable_to_engage=put_treasure_map_on_node, + nodes=target_nodes, + network_middleware=network_middleware) - # TODO: One of these layers needs to add concurrency. - self._propose_arrangements(network_middleware=network_middleware, - candidate_ursulas=sampled_ursulas, - *args, **kwargs) + def enact(self, + network_middleware: RestMiddleware, + handpicked_ursulas: Optional[Iterable[Ursula]] = None, + discover_on_this_thread: bool = True, + publish_treasure_map: bool = True, + ) -> 'EnactedPolicy': + """ + Attempts to enact the policy, returns an `EnactedPolicy` object on success. + """ - if len(self._accepted_arrangements) < self.n: - formatted_offenders = '\n'.join(f'{u.checksum_address}@{u.rest_url()}' for u in sampled_ursulas) - raise self.Rejected(f'Selected Ursulas rejected too many arrangements' - f'- only {len(self._accepted_arrangements)} of {self.n} accepted.\n' - f'Offending nodes: \n{formatted_offenders}\n') + arrangements = self._make_arrangements(network_middleware=network_middleware, + handpicked_ursulas=handpicked_ursulas, + discover_on_this_thread=discover_on_this_thread) - def make_arrangement(self): - return Arrangement.from_alice(alice=self.alice, expiration=self.expiration) + self._enact_arrangements(network_middleware=network_middleware, + arrangements=arrangements, + publish_treasure_map=publish_treasure_map) + + treasure_map = self._make_treasure_map(network_middleware=network_middleware, + arrangements=arrangements) + publishing_mutex = self._make_publishing_mutex(treasure_map=treasure_map, + network_middleware=network_middleware) + revocation_kit = RevocationKit(treasure_map, self.alice.stamp) + + enacted_policy = EnactedPolicy(self._id, + self.hrac, + self.label, + self.public_key, + treasure_map, + publishing_mutex, + revocation_kit, + self.alice.stamp) + + if publish_treasure_map is True: + enacted_policy.publish_treasure_map() + + return enacted_policy @abstractmethod - def sample_essential(self, *args, **kwargs) -> Set[Ursula]: + def _not_enough_ursulas_exception(self) -> Type[Exception]: + """ + Returns an exception to raise when there were not enough Ursulas + to distribute arrangements to. + """ raise NotImplementedError - def sample(self, - handpicked_ursulas: Optional[Set[Ursula]] = None, - discover_on_this_thread: bool = False, - ) -> Set[Ursula]: - selected_ursulas = set(handpicked_ursulas) if handpicked_ursulas else set() - - # Calculate the target sample quantity - if self.n - len(selected_ursulas) > 0: - sampled_ursulas = self.sample_essential(handpicked_ursulas=selected_ursulas, - discover_on_this_thread=discover_on_this_thread) - selected_ursulas.update(sampled_ursulas) - - return selected_ursulas - - def _propose_arrangements(self, - network_middleware: RestMiddleware, - candidate_ursulas: Set[Ursula], - consider_everyone: bool = False, - *args, - **kwargs) -> None: - - for index, selected_ursula in enumerate(candidate_ursulas): - arrangement = self.make_arrangement(*args, **kwargs) - try: - is_accepted = self.propose_arrangement(ursula=selected_ursula, - arrangement=arrangement, - network_middleware=network_middleware) - - except NodeSeemsToBeDown as e: # TODO: #355 Also catch InvalidNode here? - # This arrangement won't be added to the accepted bucket. - # If too many nodes are down, it will fail in make_arrangements. - # Also TODO: Prolly log this or something at this stage. - continue - - else: - # Bucket the arrangements - if is_accepted: - self.log.debug(f"Arrangement accepted by {selected_ursula}") - self._accepted_arrangements[selected_ursula] = arrangement - accepted = len(self._accepted_arrangements) - if accepted == self.n and not consider_everyone: - try: - spares = set(list(candidate_ursulas)[index + 1::]) - self._spare_candidates.update(spares) - except IndexError: - self._spare_candidates = set() - break - else: - self.log.debug(f"Arrangement failed with {selected_ursula}") - self._rejected_arrangements.add(arrangement) + @abstractmethod + def _make_enactment_payload(self, publication_transaction: Optional[HexBytes], kfrag: KFrag) -> bytes: + """ + Serializes a given kfrag and policy publication transaction to send to Ursula. + """ + raise NotImplementedError class FederatedPolicy(Policy): - _arrangement_class = Arrangement + from nucypher.policy.collections import TreasureMap as _treasure_map_class # TODO: Circular Import - def make_arrangements(self, *args, **kwargs) -> None: - try: - return super().make_arrangements(*args, **kwargs) - except self.MoreKFragsThanArrangements: - error = "To make a Policy in federated mode, you need to designate *all* ' \ - the Ursulas you need (in this case, {}); there's no other way to ' \ - know which nodes to use. " \ - "Pass them here as handpicked_ursulas.".format(self.n) - raise self.MoreKFragsThanArrangements(error) # TODO: NotEnoughUrsulas where in the exception tree is this? + def _not_enough_ursulas_exception(self): + return Policy.NotEnoughUrsulas - def sample_essential(self, - handpicked_ursulas: Set[Ursula], - discover_on_this_thread: bool = True) -> Set[Ursula]: + def _make_reservoir(self, handpicked_addresses): + addresses = { + ursula.checksum_address: 1 for ursula in self.alice.known_nodes + if ursula.checksum_address not in handpicked_addresses} - self.alice.block_until_specific_nodes_are_known(set(ursula.checksum_address for ursula in handpicked_ursulas)) - self.alice.block_until_number_of_known_nodes_is(self.n, learn_on_this_thread=discover_on_this_thread) - known_nodes = self.alice.known_nodes - if handpicked_ursulas: - # Prevent re-sampling of handpicked ursulas. - known_nodes = set(known_nodes) - handpicked_ursulas - sampled_ursulas = set(random.sample(k=self.n - len(handpicked_ursulas), - population=list(known_nodes))) - return sampled_ursulas + return MergedReservoir(handpicked_addresses, StakersReservoir(addresses)) + + def _make_enactment_payload(self, publication_transaction, kfrag): + assert publication_transaction is None # sanity check; should not ever be hit + return bytes(kfrag) class BlockchainPolicy(Policy): """ - A collection of n BlockchainArrangements representing a single Policy + A collection of n Arrangements representing a single Policy """ + from nucypher.policy.collections import SignedTreasureMap as _treasure_map_class # TODO: Circular Import - class NoSuchPolicy(Exception): - pass - - class InvalidPolicy(Exception): - pass - class InvalidPolicyValue(ValueError): pass - class NotEnoughBlockchainUrsulas(Policy.MoreKFragsThanArrangements): + class NotEnoughBlockchainUrsulas(Policy.NotEnoughUrsulas): pass def __init__(self, - alice: Alice, value: int, rate: int, duration_periods: int, - expiration: maya.MayaDT, - *args, **kwargs): + *args, + **kwargs, + ): + + super().__init__(*args, **kwargs) self.duration_periods = duration_periods - self.expiration = expiration self.value = value self.rate = rate - self.author = alice - # Initial State - self.publish_transaction = None - self.is_published = False - self.receipt = None + self._validate_fee_value() - super().__init__(alice=alice, expiration=expiration, *args, **kwargs) + def _not_enough_ursulas_exception(self): + return BlockchainPolicy.NotEnoughBlockchainUrsulas - self.validate_fee_value() - - def validate_fee_value(self) -> None: + def _validate_fee_value(self) -> None: rate_per_period = self.value // self.n // self.duration_periods # wei recalculated_value = self.duration_periods * rate_per_period * self.n if recalculated_value != self.value: @@ -615,111 +698,78 @@ class BlockchainPolicy(Policy): params = dict(rate=rate, value=value) return params - def sample_essential(self, - handpicked_ursulas: Set[Ursula], - learner_timeout: int = 1, - timeout: int = 10, - discover_on_this_thread: bool = False) -> Set[Ursula]: # TODO #843: Make timeout configurable + def _make_reservoir(self, handpicked_addresses): + try: + reservoir = self.alice.get_stakers_reservoir(duration=self.duration_periods, + without=handpicked_addresses) + except StakingEscrowAgent.NotEnoughStakers: + # TODO: do that in `get_stakers_reservoir()`? + reservoir = StakersReservoir({}) - handpicked_addresses = [ursula.checksum_address for ursula in handpicked_ursulas] - reservoir = self.alice.get_stakers_reservoir(duration=self.duration_periods, - without=handpicked_addresses) + return MergedReservoir(handpicked_addresses, reservoir) - quantity_remaining = self.n - len(handpicked_ursulas) - if len(reservoir) < quantity_remaining: - error = f"Cannot create policy with {self.n} arrangements" - raise self.NotEnoughBlockchainUrsulas(error) + def _publish_to_blockchain(self, ursulas) -> dict: - # Handpicked Ursulas are not necessarily known - to_check = list(handpicked_ursulas) + reservoir.draw(quantity_remaining) - checked = [] - - # Sample stakers in a loop and feed them to the learner to check - # until we have enough in `selected_ursulas`. - - start_time = maya.now() - - while True: - - # Check if the sampled addresses are already known. - # If we're lucky, we won't have to wait for the learner iteration to finish. - checked += [x for x in to_check if x in self.alice.known_nodes] - to_check = [x for x in to_check if x not in self.alice.known_nodes] - - if len(checked) >= self.n: - break - - # The number of new nodes to draw on each iteration. - # The choice of this depends on how expensive it is to check a node for validity, - # and how likely is it for a picked node to be offline. - # We assume here that it is unlikely, and be conservative. - drawing_step = self.n - len(checked) - - # Draw a little bit more nodes, if there are any - to_check += reservoir.draw_at_most(drawing_step) - - delta = maya.now() - start_time - if delta.total_seconds() >= timeout: - still_checking = ', '.join(to_check) - quantity_remaining = self.n - len(checked) - raise RuntimeError(f"Timed out after {timeout} seconds; " - f"need {quantity_remaining} more, still checking {still_checking}.") - - self.alice.block_until_specific_nodes_are_known(to_check, - learn_on_this_thread=discover_on_this_thread, - allow_missing=len(to_check), - timeout=learner_timeout) - - # We only need `n` nodes. Pick the first `n` ones, - # since they were the first drawn, and hence have the priority. - found_ursulas = [self.alice.known_nodes[address] for address in checked[:self.n]] - - # Randomize the output to avoid the largest stakers always being the first in the list - system_random = random.SystemRandom() - system_random.shuffle(found_ursulas) # inplace - - return set(found_ursulas) - - def publish_to_blockchain(self) -> dict: - - prearranged_ursulas = list(ursula.checksum_address for ursula in self._accepted_arrangements) + addresses = [ursula.checksum_address for ursula in ursulas] # Transact # TODO: Move this logic to BlockchainPolicyActor - receipt = self.author.policy_agent.create_policy( - policy_id=self.hrac(), # bytes16 _policyID - author_address=self.author.checksum_address, + receipt = self.alice.policy_agent.create_policy( + policy_id=self.hrac, # bytes16 _policyID + author_address=self.alice.checksum_address, value=self.value, end_timestamp=self.expiration.epoch, # uint16 _numberOfPeriods - node_addresses=prearranged_ursulas # address[] memory _nodes + node_addresses=addresses # address[] memory _nodes ) # Capture Response - self.receipt = receipt - self.publish_transaction = receipt['transactionHash'] - self.is_published = True # TODO: For real: TX / Swarm confirmations needed? + return receipt['transactionHash'] - return receipt + def _make_enactment_payload(self, publication_transaction, kfrag): + return bytes(publication_transaction) + bytes(kfrag) - def make_enactment_payload(self, kfrag): - return bytes(self.publish_transaction) + super().make_enactment_payload(kfrag) + def _enact_arrangements(self, + network_middleware, + arrangements, + publish_treasure_map=True) -> NodeEngagementMutex: + transaction = self._publish_to_blockchain(list(arrangements)) + return super()._enact_arrangements(network_middleware=network_middleware, + arrangements=arrangements, + publish_treasure_map=publish_treasure_map, + publication_transaction=transaction) - def enact(self, network_middleware, publish_to_blockchain=True, publish_treasure_map=True) -> NodeEngagementMutex: - """ - Assign kfrags to ursulas_on_network, and distribute them via REST, - populating enacted_arrangements - """ - if publish_to_blockchain is True: - self.publish_to_blockchain() + def _make_treasure_map(self, + network_middleware: RestMiddleware, + arrangements: Dict[Ursula, Arrangement], + ) -> 'TreasureMap': - publisher = super().enact(network_middleware, publish_treasure_map=False) + treasure_map = super()._make_treasure_map(network_middleware, arrangements) + transacting_power = self.alice._crypto_power.power_ups(TransactingPower) + treasure_map.include_blockchain_signature(transacting_power.sign_message) + return treasure_map - if publish_treasure_map is True: - self.treasure_map.prepare_for_publication(bob_encrypting_key=self.bob.public_keys(DecryptingPower), - bob_verifying_key=self.bob.public_keys(SigningPower), - alice_stamp=self.alice.stamp, - label=self.label) - # Sign the map. - transacting_power = self.alice._crypto_power.power_ups(TransactingPower) - publisher = self.publish_treasure_map(network_middleware=network_middleware, - blockchain_signer=transacting_power.sign_message) - return publisher + +class EnactedPolicy: + + def __init__(self, + id: bytes, + hrac: bytes, + label: bytes, + public_key: UmbralPublicKey, + treasure_map: 'TreasureMap', + publishing_mutex: NodeEngagementMutex, + revocation_kit: RevocationKit, + alice_verifying_key: UmbralPublicKey, + ): + + self.id = id # TODO: is it even used anywhere? + self.hrac = hrac + self.label = label + self.public_key = public_key + self.treasure_map = treasure_map + self.publishing_mutex = publishing_mutex + self.revocation_kit = revocation_kit + self.n = len(self.treasure_map.destinations) + self.alice_verifying_key = alice_verifying_key + + def publish_treasure_map(self): + self.publishing_mutex.start() diff --git a/tests/acceptance/characters/control/conftest.py b/tests/acceptance/characters/control/conftest.py index 650b874c3..96b340f34 100644 --- a/tests/acceptance/characters/control/conftest.py +++ b/tests/acceptance/characters/control/conftest.py @@ -118,18 +118,18 @@ def grant_control_request(blockchain_bob): @pytest.fixture(scope='module') -def join_control_request(blockchain_bob, enacted_blockchain_policy): +def join_control_request(blockchain_alice, blockchain_bob, enacted_blockchain_policy): method_name = 'join_policy' params = { 'label': enacted_blockchain_policy.label.decode(), - 'alice_verifying_key': bytes(enacted_blockchain_policy.alice.stamp).hex(), + 'alice_verifying_key': bytes(enacted_blockchain_policy.alice_verifying_key).hex(), } return method_name, params @pytest.fixture(scope='function') -def retrieve_control_request(blockchain_bob, enacted_blockchain_policy, capsule_side_channel_blockchain): +def retrieve_control_request(blockchain_alice, blockchain_bob, enacted_blockchain_policy, capsule_side_channel_blockchain): capsule_side_channel_blockchain.reset() method_name = 'retrieve' message_kit = capsule_side_channel_blockchain() @@ -137,7 +137,7 @@ def retrieve_control_request(blockchain_bob, enacted_blockchain_policy, capsule_ params = { 'label': enacted_blockchain_policy.label.decode(), 'policy_encrypting_key': bytes(enacted_blockchain_policy.public_key).hex(), - 'alice_verifying_key': bytes(enacted_blockchain_policy.alice.stamp).hex(), + 'alice_verifying_key': bytes(enacted_blockchain_policy.alice_verifying_key).hex(), 'message_kit': b64encode(message_kit.to_bytes()).decode(), } return method_name, params diff --git a/tests/acceptance/characters/control/test_rpc_control_blockchain.py b/tests/acceptance/characters/control/test_rpc_control_blockchain.py index f0687e35f..0214eea74 100644 --- a/tests/acceptance/characters/control/test_rpc_control_blockchain.py +++ b/tests/acceptance/characters/control/test_rpc_control_blockchain.py @@ -26,9 +26,15 @@ from nucypher.policy.collections import SignedTreasureMap from tests.utils.controllers import get_fields, validate_json_rpc_response_data -def test_bob_rpc_character_control_join_policy(bob_rpc_controller, join_control_request, enacted_blockchain_policy): - # Simulate passing in a teacher-uri - enacted_blockchain_policy.bob.remember_node(list(enacted_blockchain_policy.accepted_ursulas)[0]) +def test_bob_rpc_character_control_join_policy(bob_rpc_controller, join_control_request, enacted_blockchain_policy, blockchain_bob, blockchain_ursulas): + for ursula in blockchain_ursulas: + if ursula.checksum_address in enacted_blockchain_policy.treasure_map.destinations: + # Simulate passing in a teacher-uri + blockchain_bob.remember_node(ursula) + break + else: + # Shouldn't happen + raise Exception("No known Ursulas present in the treasure map destinations") method_name, params = join_control_request request_data = {'method': method_name, 'params': params} diff --git a/tests/acceptance/characters/control/test_web_control_blockchain.py b/tests/acceptance/characters/control/test_web_control_blockchain.py index 136598eea..ec234dcd4 100644 --- a/tests/acceptance/characters/control/test_web_control_blockchain.py +++ b/tests/acceptance/characters/control/test_web_control_blockchain.py @@ -175,14 +175,20 @@ def test_alice_character_control_decrypt(alice_web_controller_test_client, assert response.status_code == 405 -def test_bob_character_control_join_policy(bob_web_controller_test_client, enacted_blockchain_policy): +def test_bob_character_control_join_policy(bob_web_controller_test_client, enacted_blockchain_policy, blockchain_alice, blockchain_bob, blockchain_ursulas): request_data = { 'label': enacted_blockchain_policy.label.decode(), - 'alice_verifying_key': bytes(enacted_blockchain_policy.alice.stamp).hex(), + 'alice_verifying_key': bytes(enacted_blockchain_policy.alice_verifying_key).hex(), } - # Simulate passing in a teacher-uri - enacted_blockchain_policy.bob.remember_node(list(enacted_blockchain_policy.accepted_ursulas)[0]) + for ursula in blockchain_ursulas: + if ursula.checksum_address in enacted_blockchain_policy.treasure_map.destinations: + # Simulate passing in a teacher-uri + blockchain_bob.remember_node(ursula) + break + else: + # Shouldn't happen + raise Exception("No known Ursulas present in the treasure map destinations") response = bob_web_controller_test_client.post('/join_policy', data=json.dumps(request_data)) assert b'{"result": {"policy_encrypting_key": "OK"}' in response.data # TODO diff --git a/tests/acceptance/characters/test_decentralized_grant.py b/tests/acceptance/characters/test_decentralized_grant.py index c46737fc3..f9c0063a7 100644 --- a/tests/acceptance/characters/test_decentralized_grant.py +++ b/tests/acceptance/characters/test_decentralized_grant.py @@ -26,8 +26,7 @@ from nucypher.policy.collections import SignedTreasureMap as DecentralizedTreasu from tests.utils.middleware import MockRestMiddleware -@pytest.mark.usefixtures('blockchain_ursulas') -def test_decentralized_grant(blockchain_alice, blockchain_bob, agency): +def test_decentralized_grant(blockchain_alice, blockchain_bob, blockchain_ursulas, agency): # Setup the policy details n = 3 policy_end_datetime = maya.now() + datetime.timedelta(days=5) @@ -42,49 +41,47 @@ def test_decentralized_grant(blockchain_alice, blockchain_bob, agency): expiration=policy_end_datetime) # Check the policy ID - policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp)) + policy_id = keccak_digest(label + bytes(blockchain_bob.stamp)) assert policy_id == policy.id - # The number of accepted arrangements at least the number of Ursulas we're using (n) - assert len(policy._accepted_arrangements) >= n - # The number of actually enacted arrangements is exactly equal to n. - assert len(policy._enacted_arrangements) == n + assert len(policy.treasure_map.destinations) == n # Let's look at the enacted arrangements. - for ursula, kfrag in policy._enacted_arrangements.items(): - arrangement = policy._accepted_arrangements[ursula] + for ursula in blockchain_ursulas: + if ursula.checksum_address in policy.treasure_map.destinations: + arrangement_id = policy.treasure_map.destinations[ursula.checksum_address] - # Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID. - with ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: - assert kfrag == policy_arrangement.kfrag + # Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID. + with ursula.datastore.describe(PolicyArrangement, arrangement_id.hex()) as policy_arrangement: + retrieved_kfrag = policy_arrangement.kfrag + assert bool(retrieved_kfrag) # TODO: try to assemble them back? -def test_alice_sets_treasure_map_decentralized(enacted_blockchain_policy): +def test_alice_sets_treasure_map_decentralized(enacted_blockchain_policy, blockchain_alice, blockchain_bob): """ Same as test_alice_sets_treasure_map except with a blockchain policy. """ - enacted_blockchain_policy.publish_treasure_map(network_middleware=MockRestMiddleware()) treasure_map_hrac = enacted_blockchain_policy.treasure_map._hrac[:16].hex() found = 0 - for node in enacted_blockchain_policy.bob.matching_nodes_among(enacted_blockchain_policy.alice.known_nodes): + for node in blockchain_bob.matching_nodes_among(blockchain_alice.known_nodes): with node.datastore.describe(DatastoreTreasureMap, treasure_map_hrac) as treasure_map_on_node: assert DecentralizedTreasureMap.from_bytes(treasure_map_on_node.treasure_map) == enacted_blockchain_policy.treasure_map found += 1 assert found -def test_bob_retrieves_treasure_map_from_decentralized_node(enacted_blockchain_policy): +def test_bob_retrieves_treasure_map_from_decentralized_node(enacted_blockchain_policy, blockchain_alice, blockchain_bob): """ This is the same test as `test_bob_retrieves_the_treasure_map_and_decrypt_it`, except with an `enacted_blockchain_policy`. """ - bob = enacted_blockchain_policy.bob + bob = blockchain_bob _previous_domain = bob.domain bob.domain = None # Bob has no knowledge of the network. with pytest.raises(bob.NotEnoughTeachers): - treasure_map_from_wire = bob.get_treasure_map(enacted_blockchain_policy.alice.stamp, + treasure_map_from_wire = bob.get_treasure_map(blockchain_alice.stamp, enacted_blockchain_policy.label) # Bob finds out about one Ursula (in the real world, a seed node, hardcoded based on his learning domain) @@ -95,6 +92,6 @@ def test_bob_retrieves_treasure_map_from_decentralized_node(enacted_blockchain_p bob.learn_from_teacher_node(eager=True) # Now he'll have better success finding that map. - treasure_map_from_wire = bob.get_treasure_map(enacted_blockchain_policy.alice.stamp, + treasure_map_from_wire = bob.get_treasure_map(blockchain_alice.stamp, enacted_blockchain_policy.label) assert enacted_blockchain_policy.treasure_map == treasure_map_from_wire diff --git a/tests/acceptance/characters/test_freerider_attacks.py b/tests/acceptance/characters/test_freerider_attacks.py index 4a348ecfc..3912dbaa6 100644 --- a/tests/acceptance/characters/test_freerider_attacks.py +++ b/tests/acceptance/characters/test_freerider_attacks.py @@ -156,5 +156,5 @@ def test_put_additional_treasure_map_on_network(blockchain_ursulas, blockchain_a # This should 409 because Ursula won't be able to find an HRAC on-chain # with the modified HRAC. with pytest.raises(RestMiddleware.UnexpectedResponse) as should_409: - amonia.use_ursula_as_an_involuntary_and_unbeknownst_cdn(policy, sucker_ursula=blockchain_ursulas[0]) + amonia.use_ursula_as_an_involuntary_and_unbeknownst_cdn(policy, blockchain_bob, sucker_ursula=blockchain_ursulas[0]) assert should_409.value.status == 409 diff --git a/tests/acceptance/characters/test_ursula_prepares_to_act_as_worker.py b/tests/acceptance/characters/test_ursula_prepares_to_act_as_worker.py index b88fb5253..b17e0a4ec 100644 --- a/tests/acceptance/characters/test_ursula_prepares_to_act_as_worker.py +++ b/tests/acceptance/characters/test_ursula_prepares_to_act_as_worker.py @@ -23,7 +23,7 @@ from nucypher.characters.lawful import Enrico from nucypher.characters.unlawful import Vladimir from nucypher.crypto.api import verify_eip_191 from nucypher.crypto.powers import SigningPower -from nucypher.policy.policies import Policy +from nucypher.policy.policies import BlockchainPolicy from tests.constants import INSECURE_DEVELOPMENT_PASSWORD from tests.utils.middleware import NodeIsDownMiddleware from tests.utils.ursula import make_decentralized_ursulas @@ -166,7 +166,7 @@ def test_blockchain_ursulas_reencrypt(blockchain_ursulas, blockchain_alice, bloc blockchain_alice.network_middleware = NodeIsDownMiddleware() blockchain_alice.network_middleware.node_is_down(blockchain_ursulas[0]) - with pytest.raises(Policy.Rejected): + with pytest.raises(BlockchainPolicy.NotEnoughBlockchainUrsulas): _policy = blockchain_alice.grant(bob=blockchain_bob, label=b'another-label', m=m, diff --git a/tests/acceptance/cli/test_bob.py b/tests/acceptance/cli/test_bob.py index 4c74dcb6a..028c98d66 100644 --- a/tests/acceptance/cli/test_bob.py +++ b/tests/acceptance/cli/test_bob.py @@ -123,6 +123,7 @@ def test_bob_retrieves_twice_via_cli(click_runner, federated_ursulas, custom_filepath_2, federated_alice, + federated_bob, mocker): teacher = list(federated_ursulas)[0] @@ -164,7 +165,7 @@ def test_bob_retrieves_twice_via_cli(click_runner, def substitute_bob(*args, **kwargs): log.info("Substituting the Policy's Bob in CLI runtime.") - this_fuckin_guy = enacted_federated_policy.bob + this_fuckin_guy = federated_bob somebody_else = Ursula.from_teacher_uri(teacher_uri=kwargs['teacher_uri'], min_stake=0, federated_only=True, diff --git a/tests/acceptance/cli/ursula/test_stake_via_allocation_contract.py b/tests/acceptance/cli/ursula/test_stake_via_allocation_contract.py index 4c7fadf64..1c9a39e49 100644 --- a/tests/acceptance/cli/ursula/test_stake_via_allocation_contract.py +++ b/tests/acceptance/cli/ursula/test_stake_via_allocation_contract.py @@ -557,7 +557,7 @@ def test_collect_rewards_integration(click_runner, handpicked_ursulas={ursula}) # Ensure that the handpicked Ursula was selected for the policy - assert ursula in blockchain_policy._enacted_arrangements + assert ursula.checksum_address in blockchain_policy.treasure_map.destinations # Bob learns about the new staker and joins the policy blockchain_bob.remember_node(node=ursula) diff --git a/tests/acceptance/cli/ursula/test_stakeholder_and_ursula.py b/tests/acceptance/cli/ursula/test_stakeholder_and_ursula.py index d92bc20aa..ede0d98ef 100644 --- a/tests/acceptance/cli/ursula/test_stakeholder_and_ursula.py +++ b/tests/acceptance/cli/ursula/test_stakeholder_and_ursula.py @@ -626,7 +626,7 @@ def test_collect_rewards_integration(click_runner, handpicked_ursulas={ursula}) # Ensure that the handpicked Ursula was selected for the policy - assert ursula in blockchain_policy._enacted_arrangements + assert ursula.checksum_address in blockchain_policy.treasure_map.destinations # Bob learns about the new staker and joins the policy blockchain_bob.start_learning_loop() diff --git a/tests/acceptance/network/test_network_actors.py b/tests/acceptance/network/test_network_actors.py index 3c9f6e35e..b609989a4 100644 --- a/tests/acceptance/network/test_network_actors.py +++ b/tests/acceptance/network/test_network_actors.py @@ -128,6 +128,7 @@ def test_vladimir_illegal_interface_key_does_not_propagate(blockchain_ursulas): def test_alice_refuses_to_make_arrangement_unless_ursula_is_valid(blockchain_alice, idle_blockchain_policy, blockchain_ursulas): + target = list(blockchain_ursulas)[2] # First, let's imagine that Alice has sampled a Vladimir while making this policy. vladimir = Vladimir.from_target_ursula(target) @@ -137,21 +138,16 @@ def test_alice_refuses_to_make_arrangement_unless_ursula_is_valid(blockchain_ali vladimir.substantiate_stamp() vladimir._Teacher__interface_signature = signature - - class FakeArrangement: - federated = False - ursula = vladimir - - def __bytes__(self): - return b"" - vladimir.node_storage.store_node_certificate(certificate=target.certificate) + # Ideally, a fishy node shouldn't be present in `known_nodes`, + # but I guess we're testing the case when it became fishy somewhere between we learned about it + # and the proposal arrangement. + blockchain_alice.known_nodes[vladimir.checksum_address] = vladimir + with pytest.raises(vladimir.InvalidNode): - idle_blockchain_policy.propose_arrangement(ursula=vladimir, - network_middleware=blockchain_alice.network_middleware, - arrangement=FakeArrangement() - ) + idle_blockchain_policy._propose_arrangement(address=vladimir.checksum_address, + network_middleware=blockchain_alice.network_middleware) def test_treasure_map_cannot_be_duplicated(blockchain_ursulas, diff --git a/tests/fixtures.py b/tests/fixtures.py index 34c23be98..95003c649 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -238,13 +238,12 @@ def enacted_federated_policy(idle_federated_policy, federated_ursulas): # Alice has a policy in mind and knows of enough qualifies Ursulas; she crafts an offer for them. network_middleware = MockRestMiddleware() - idle_federated_policy.make_arrangements(network_middleware, handpicked_ursulas=federated_ursulas) - # REST call happens here, as does population of TreasureMap. - idle_federated_policy.enact(network_middleware) - idle_federated_policy.publishing_mutex.block_until_complete() + enacted_policy = idle_federated_policy.enact(network_middleware=network_middleware, + handpicked_ursulas=federated_ursulas) + enacted_policy.publishing_mutex.block_until_complete() - return idle_federated_policy + return enacted_policy @pytest.fixture(scope="module") @@ -276,12 +275,11 @@ def enacted_blockchain_policy(idle_blockchain_policy, blockchain_ursulas): # contract_end_datetime = maya.now() + datetime.timedelta(days=5) network_middleware = MockRestMiddleware() - idle_blockchain_policy.make_arrangements( - network_middleware, handpicked_ursulas=list(blockchain_ursulas)) - - idle_blockchain_policy.enact(network_middleware) # REST call happens here, as does population of TreasureMap. - idle_blockchain_policy.publishing_mutex.block_until_complete() - return idle_blockchain_policy + # REST call happens here, as does population of TreasureMap. + enacted_policy = idle_blockchain_policy.enact(network_middleware=network_middleware, + handpicked_ursulas=list(blockchain_ursulas)) + enacted_policy.publishing_mutex.block_until_complete() + return enacted_policy @pytest.fixture(scope="module") diff --git a/tests/integration/characters/control/conftest.py b/tests/integration/characters/control/conftest.py index 88ae72588..bf3c1bf56 100644 --- a/tests/integration/characters/control/conftest.py +++ b/tests/integration/characters/control/conftest.py @@ -121,7 +121,7 @@ def join_control_request(federated_bob, enacted_federated_policy): params = { 'label': enacted_federated_policy.label.decode(), - 'alice_verifying_key': bytes(enacted_federated_policy.alice.stamp).hex(), + 'alice_verifying_key': bytes(enacted_federated_policy.alice_verifying_key).hex(), } return method_name, params @@ -134,7 +134,7 @@ def retrieve_control_request(federated_bob, enacted_federated_policy, capsule_si params = { 'label': enacted_federated_policy.label.decode(), 'policy_encrypting_key': bytes(enacted_federated_policy.public_key).hex(), - 'alice_verifying_key': bytes(enacted_federated_policy.alice.stamp).hex(), + 'alice_verifying_key': bytes(enacted_federated_policy.alice_verifying_key).hex(), 'message_kit': b64encode(message_kit.to_bytes()).decode(), } return method_name, params diff --git a/tests/integration/characters/control/test_rpc_control_federated.py b/tests/integration/characters/control/test_rpc_control_federated.py index 710d0e374..fae6f3e52 100644 --- a/tests/integration/characters/control/test_rpc_control_federated.py +++ b/tests/integration/characters/control/test_rpc_control_federated.py @@ -72,10 +72,16 @@ def test_alice_rpc_character_control_grant(alice_rpc_test_client, grant_control_ assert 'jsonrpc' in response.data -def test_bob_rpc_character_control_join_policy(bob_rpc_controller, join_control_request, enacted_federated_policy): +def test_bob_rpc_character_control_join_policy(bob_rpc_controller, join_control_request, enacted_federated_policy, federated_bob, federated_ursulas): - # Simulate passing in a teacher-uri - enacted_federated_policy.bob.remember_node(list(enacted_federated_policy.accepted_ursulas)[0]) + for ursula in federated_ursulas: + if ursula.checksum_address in enacted_federated_policy.treasure_map.destinations: + # Simulate passing in a teacher-uri + federated_bob.remember_node(ursula) + break + else: + # Shouldn't happen + raise Exception("No known Ursulas present in the treasure map destinations") method_name, params = join_control_request request_data = {'method': method_name, 'params': params} diff --git a/tests/integration/characters/control/test_web_control_federated.py b/tests/integration/characters/control/test_web_control_federated.py index d79053d09..f0a32ce48 100644 --- a/tests/integration/characters/control/test_web_control_federated.py +++ b/tests/integration/characters/control/test_web_control_federated.py @@ -168,14 +168,20 @@ def test_alice_character_control_decrypt(alice_web_controller_test_client, assert response.status_code == 405 -def test_bob_character_control_join_policy(bob_web_controller_test_client, enacted_federated_policy): +def test_bob_character_control_join_policy(bob_web_controller_test_client, federated_bob, federated_ursulas, enacted_federated_policy): request_data = { 'label': enacted_federated_policy.label.decode(), - 'alice_verifying_key': bytes(enacted_federated_policy.alice.stamp).hex(), + 'alice_verifying_key': bytes(enacted_federated_policy.alice_verifying_key).hex(), } - # Simulate passing in a teacher-uri - enacted_federated_policy.bob.remember_node(list(enacted_federated_policy.accepted_ursulas)[0]) + for ursula in federated_ursulas: + if ursula.checksum_address in enacted_federated_policy.treasure_map.destinations: + # Simulate passing in a teacher-uri + federated_bob.remember_node(ursula) + break + else: + # Shouldn't happen + raise Exception("No known Ursulas present in the treasure map destinations") response = bob_web_controller_test_client.post('/join_policy', data=json.dumps(request_data)) assert b'{"result": {"policy_encrypting_key": "OK"}' in response.data # TODO diff --git a/tests/integration/characters/test_bob_handles_frags.py b/tests/integration/characters/test_bob_handles_frags.py index 2ba41896e..35a845648 100644 --- a/tests/integration/characters/test_bob_handles_frags.py +++ b/tests/integration/characters/test_bob_handles_frags.py @@ -31,7 +31,7 @@ from tests.utils.middleware import MockRestMiddleware, NodeIsDownMiddleware def test_bob_cannot_follow_the_treasure_map_in_isolation(enacted_federated_policy, federated_bob): # Assume for the moment that Bob has already received a TreasureMap, perhaps via a side channel. - hrac, treasure_map = enacted_federated_policy.hrac(), enacted_federated_policy.treasure_map + hrac, treasure_map = enacted_federated_policy.hrac, enacted_federated_policy.treasure_map # Bob knows of no Ursulas. assert len(federated_bob.known_nodes) == 0 @@ -88,7 +88,7 @@ def test_bob_can_follow_treasure_map_even_if_he_only_knows_of_one_node(enacted_f federated_only=True) # Again, let's assume that he received the TreasureMap via a side channel. - hrac, treasure_map = enacted_federated_policy.hrac(), enacted_federated_policy.treasure_map + hrac, treasure_map = enacted_federated_policy.hrac, enacted_federated_policy.treasure_map # Now, let's create a scenario in which Bob knows of only one node. assert len(bob.known_nodes) == 0 @@ -134,7 +134,7 @@ def test_bob_can_issue_a_work_order_to_a_specific_ursula(enacted_federated_polic """ # We pick up our story with Bob already having followed the treasure map above, ie: - hrac, treasure_map = enacted_federated_policy.hrac(), enacted_federated_policy.treasure_map + hrac, treasure_map = enacted_federated_policy.hrac, enacted_federated_policy.treasure_map federated_bob.start_learning_loop() federated_bob.follow_treasure_map(treasure_map=treasure_map, block=True, timeout=1) diff --git a/tests/integration/characters/test_bob_joins_policy_and_retrieves.py b/tests/integration/characters/test_bob_joins_policy_and_retrieves.py index 0b6e0ab40..98144bc4a 100644 --- a/tests/integration/characters/test_bob_joins_policy_and_retrieves.py +++ b/tests/integration/characters/test_bob_joins_policy_and_retrieves.py @@ -90,7 +90,6 @@ def test_bob_joins_policy_and_retrieves(federated_alice, handpicked_ursulas=set(rest_of_ursulas), ) - assert bob == policy.bob assert label == policy.label try: @@ -174,7 +173,7 @@ def test_bob_joins_policy_and_retrieves(federated_alice, bob.disenchant() -def test_treasure_map_serialization(enacted_federated_policy, federated_bob): +def test_treasure_map_serialization(enacted_federated_policy, federated_alice, federated_bob): treasure_map = enacted_federated_policy.treasure_map assert treasure_map.m is not None assert treasure_map.m != NO_DECRYPTION_PERFORMED @@ -191,8 +190,7 @@ def test_treasure_map_serialization(enacted_federated_policy, federated_bob): with pytest.raises(TypeError): deserialized_map.destinations - compass = federated_bob.make_compass_for_alice( - enacted_federated_policy.alice) + compass = federated_bob.make_compass_for_alice(federated_alice) deserialized_map.orient(compass) assert deserialized_map.m == treasure_map.m assert deserialized_map.destinations == treasure_map.destinations @@ -204,7 +202,7 @@ def test_bob_retrieves_with_treasure_map( enrico = capsule_side_channel.enrico message_kit = capsule_side_channel() treasure_map = enacted_federated_policy.treasure_map - alice_verifying_key = enacted_federated_policy.alice.stamp + alice_verifying_key = enacted_federated_policy.alice_verifying_key # Teach Bob about the network federated_bob.remember_node(list(federated_ursulas)[0]) @@ -246,7 +244,7 @@ def test_bob_retrieves_too_late(federated_bob, federated_ursulas, enrico = capsule_side_channel.enrico message_kit = capsule_side_channel() treasure_map = enacted_federated_policy.treasure_map - alice_verifying_key = enacted_federated_policy.alice.stamp + alice_verifying_key = enacted_federated_policy.alice_verifying_key with pytest.raises(Ursula.NotEnoughUrsulas): federated_bob.retrieve( diff --git a/tests/integration/characters/test_federated_grant_and_revoke.py b/tests/integration/characters/test_federated_grant_and_revoke.py index 00a207143..ef357d0fa 100644 --- a/tests/integration/characters/test_federated_grant_and_revoke.py +++ b/tests/integration/characters/test_federated_grant_and_revoke.py @@ -26,8 +26,7 @@ from nucypher.datastore.models import PolicyArrangement from nucypher.policy.collections import Revocation -@pytest.mark.usefixtures('federated_ursulas') -def test_federated_grant(federated_alice, federated_bob): +def test_federated_grant(federated_alice, federated_bob, federated_ursulas): # Setup the policy details m, n = 2, 3 policy_end_datetime = maya.now() + datetime.timedelta(days=5) @@ -37,27 +36,25 @@ def test_federated_grant(federated_alice, federated_bob): policy = federated_alice.grant(federated_bob, label, m=m, n=n, expiration=policy_end_datetime) # Check the policy ID - policy_id = keccak_digest(policy.label + bytes(policy.bob.stamp)) + policy_id = keccak_digest(policy.label + bytes(federated_bob.stamp)) assert policy_id == policy.id # Check Alice's active policies assert policy_id in federated_alice.active_policies assert federated_alice.active_policies[policy_id] == policy - # The number of accepted arrangements at least the number of Ursulas we're using (n) - assert len(policy._accepted_arrangements) >= n - # The number of actually enacted arrangements is exactly equal to n. - assert len(policy._enacted_arrangements) == n + assert len(policy.treasure_map.destinations) == n # Let's look at the enacted arrangements. - for ursula, kfrag in policy._enacted_arrangements.items(): - arrangement = policy._accepted_arrangements[ursula] + for ursula in federated_ursulas: + if ursula.checksum_address in policy.treasure_map.destinations: + arrangement_id = policy.treasure_map.destinations[ursula.checksum_address] - # Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID. - with ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: - retrieved_kfrag = policy_arrangement.kfrag - assert kfrag == retrieved_kfrag + # Get the Arrangement from Ursula's datastore, looking up by the Arrangement ID. + with ursula.datastore.describe(PolicyArrangement, arrangement_id.hex()) as policy_arrangement: + retrieved_kfrag = policy_arrangement.kfrag + assert bool(retrieved_kfrag) # TODO: try to assemble them back? def test_federated_alice_can_decrypt(federated_alice, federated_bob): diff --git a/tests/integration/learning/test_discovery_phases.py b/tests/integration/learning/test_discovery_phases.py index d972d53ee..c571711a1 100644 --- a/tests/integration/learning/test_discovery_phases.py +++ b/tests/integration/learning/test_discovery_phases.py @@ -53,7 +53,7 @@ Node Discovery happens in phases. The first step is for a network actor to lear This is a straightforward step which we currently do with our own logic, but which may someday be replaced by something like libp2p, depending on the course of development of those sorts of tools. The introduction of hamming distance in particular is useful when wanting to learn about a small number (~500) of nodes among a much larger (25,000+) swarm. -This toolchain is not built for that scenario at this time, although it is not a stated nongoal. +This toolchain is not built for that scenario at this time, although it is not a stated nongoal. After this, our "Learning Loop" does four other things in sequence which are not part of the offering of node discovery tooling alone: @@ -134,7 +134,9 @@ def test_alice_verifies_ursula_just_in_time(fleet_of_highperf_mocked_ursulas, publish_treasure_map=False) # TODO: Make some assertions about policy. total_verified = sum(node.verified_node for node in highperf_mocked_alice.known_nodes) - assert total_verified == 30 + # Alice may be able to verify more than `n`, but certainly not less, + # otherwise `grant()` would fail. + assert total_verified >= 30 _POLICY_PRESERVER.append(policy) @@ -179,7 +181,7 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas, # defer.setDebugging(False) # Debugging messes up the timing here; comment this line out if you actually need it. - policy.publish_treasure_map(network_middleware=highperf_mocked_alice.network_middleware) # returns quickly. + policy.publish_treasure_map() # returns quickly. # defer.setDebugging(True) @@ -204,7 +206,7 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas, # Before Treasure Island (1741), this process took about 3 minutes. if partial_blocking_duration.total_seconds() > 10: pytest.fail( - f"Took too long ({partial_blocking_duration}) to contact {len(policy.publishing_mutex.nodes_contacted_during_partial_block)} nodes ({complete_distribution_time} total.)") + f"Took too long ({partial_blocking_duration}) to contact {len(nodes_that_have_the_map_when_we_unblock)} nodes ({complete_distribution_time} total.)") # TODO: Assert that no nodes outside those expected received the map. assert complete_distribution_time.total_seconds() < 20 diff --git a/tests/integration/network/test_failure_modes.py b/tests/integration/network/test_failure_modes.py index 4aca50191..fb6a870ab 100644 --- a/tests/integration/network/test_failure_modes.py +++ b/tests/integration/network/test_failure_modes.py @@ -69,7 +69,7 @@ def test_alice_can_grant_even_when_the_first_nodes_she_tries_are_down(federated_ for node in more_nodes: federated_alice.remember_node(node) - with pytest.raises(Policy.Rejected): + with pytest.raises(Policy.NotEnoughUrsulas): alice_grant_action() # Now let's let a few of them come up. @@ -87,11 +87,8 @@ def test_alice_can_grant_even_when_the_first_nodes_she_tries_are_down(federated_ # are among those still down. # policy = alice_grant_action() - # The number of accepted arrangements at least the number of Ursulas we're using (n) - assert len(policy._accepted_arrangements) >= n - # The number of actually enacted arrangements is exactly equal to n. - assert len(policy._enacted_arrangements) == n + assert len(policy.treasure_map.destinations) == n def test_node_has_changed_cert(federated_alice, federated_ursulas): diff --git a/tests/integration/network/test_network_upgrade.py b/tests/integration/network/test_network_upgrade.py index 0a4f7e7b7..f34f73cd9 100644 --- a/tests/integration/network/test_network_upgrade.py +++ b/tests/integration/network/test_network_upgrade.py @@ -27,16 +27,16 @@ from nucypher.datastore.models import PolicyArrangement from tests.utils.ursula import make_federated_ursulas -def test_alice_enacts_policies_in_policy_group_via_rest(enacted_federated_policy): +def test_alice_enacts_policies_in_policy_group_via_rest(enacted_federated_policy, federated_ursulas): """ Now that Alice has made a PolicyGroup, she can enact its policies, using Ursula's Public Key to encrypt each offer and transmitting them via REST. """ - for ursula, kfrag in enacted_federated_policy._enacted_arrangements.items(): - arrangement = enacted_federated_policy._accepted_arrangements[ursula] - with ursula.datastore.describe(PolicyArrangement, arrangement.id.hex()) as policy_arrangement: + for ursula in federated_ursulas: + arrangement_id = enacted_federated_policy.treasure_map.destinations[ursula.checksum_address] + with ursula.datastore.describe(PolicyArrangement, arrangement_id.hex()) as policy_arrangement: the_kfrag = policy_arrangement.kfrag - assert kfrag == the_kfrag + assert bool(the_kfrag) # TODO: This can be a more poignant assertion. @pytest_twisted.inlineCallbacks diff --git a/tests/integration/network/test_treasure_map_integration.py b/tests/integration/network/test_treasure_map_integration.py index b8e9f3925..3b5996d16 100644 --- a/tests/integration/network/test_treasure_map_integration.py +++ b/tests/integration/network/test_treasure_map_integration.py @@ -24,26 +24,22 @@ from nucypher.policy.collections import TreasureMap as FederatedTreasureMap from tests.utils.middleware import MockRestMiddleware -def test_alice_creates_policy_with_correct_hrac(idle_federated_policy): +def test_alice_creates_policy_with_correct_hrac(federated_alice, federated_bob, idle_federated_policy): """ Alice creates a Policy. It has the proper HRAC, unique per her, Bob, and the label """ - alice = idle_federated_policy.alice - bob = idle_federated_policy.bob - - assert idle_federated_policy.hrac() == keccak_digest(bytes(alice.stamp) - + bytes(bob.stamp) - + idle_federated_policy.label)[:16] + assert idle_federated_policy.hrac == keccak_digest(bytes(federated_alice.stamp) + + bytes(federated_bob.stamp) + + idle_federated_policy.label)[:16] -def test_alice_sets_treasure_map(enacted_federated_policy): +def test_alice_sets_treasure_map(federated_alice, federated_bob, enacted_federated_policy): """ Having enacted all the policies of a PolicyGroup, Alice creates a TreasureMap and ...... TODO """ - enacted_federated_policy.publish_treasure_map(network_middleware=MockRestMiddleware()) treasure_map_id = enacted_federated_policy.treasure_map.public_id() found = 0 - for node in enacted_federated_policy.bob.matching_nodes_among(enacted_federated_policy.alice.known_nodes): + for node in federated_bob.matching_nodes_among(federated_alice.known_nodes): with node.datastore.describe(DatastoreTreasureMap, treasure_map_id) as treasure_map_on_node: assert FederatedTreasureMap.from_bytes(treasure_map_on_node.treasure_map) == enacted_federated_policy.treasure_map found += 1 @@ -62,18 +58,18 @@ def test_treasure_map_stored_by_ursula_is_the_correct_one_for_bob(federated_alic treasure_map_on_network = FederatedTreasureMap.from_bytes(treasure_map_record.treasure_map) hrac_by_bob = federated_bob.construct_policy_hrac(federated_alice.stamp, enacted_federated_policy.label) - assert enacted_federated_policy.hrac() == hrac_by_bob + assert enacted_federated_policy.hrac == hrac_by_bob map_id_by_bob = federated_bob.construct_map_id(federated_alice.stamp, enacted_federated_policy.label) assert map_id_by_bob == treasure_map_on_network.public_id() -def test_bob_can_retrieve_the_treasure_map_and_decrypt_it(enacted_federated_policy): +def test_bob_can_retrieve_the_treasure_map_and_decrypt_it(federated_alice, federated_bob, enacted_federated_policy): """ Above, we showed that the TreasureMap saved on the network is the correct one for Bob. Here, we show that Bob can retrieve it with only the information about which he is privy pursuant to the PolicyGroup. """ - bob = enacted_federated_policy.bob + bob = federated_bob _previous_domain = bob.domain bob.domain = None # Bob has no knowledge of the network. @@ -82,7 +78,7 @@ def test_bob_can_retrieve_the_treasure_map_and_decrypt_it(enacted_federated_poli # If Bob doesn't know about any Ursulas, he can't find the TreasureMap via the REST swarm: with pytest.raises(bob.NotEnoughTeachers): - treasure_map_from_wire = bob.get_treasure_map(enacted_federated_policy.alice.stamp, + treasure_map_from_wire = bob.get_treasure_map(federated_alice.stamp, enacted_federated_policy.label) @@ -94,18 +90,18 @@ def test_bob_can_retrieve_the_treasure_map_and_decrypt_it(enacted_federated_poli bob.learn_from_teacher_node(eager=True) # Now he'll have better success finding that map. - treasure_map_from_wire = bob.get_treasure_map(enacted_federated_policy.alice.stamp, + treasure_map_from_wire = bob.get_treasure_map(federated_alice.stamp, enacted_federated_policy.label) assert enacted_federated_policy.treasure_map == treasure_map_from_wire -def test_treasure_map_is_legit(enacted_federated_policy): +def test_treasure_map_is_legit(federated_bob, enacted_federated_policy): """ Sure, the TreasureMap can get to Bob, but we also need to know that each Ursula in the TreasureMap is on the network. """ for ursula_address, _node_id in enacted_federated_policy.treasure_map: - if ursula_address not in enacted_federated_policy.bob.known_nodes.addresses(): + if ursula_address not in federated_bob.known_nodes.addresses(): pytest.fail(f"Bob didn't know about {ursula_address}") diff --git a/tests/metrics/grant_availability.py b/tests/metrics/grant_availability.py index 43c320d1c..2b07494d8 100755 --- a/tests/metrics/grant_availability.py +++ b/tests/metrics/grant_availability.py @@ -19,8 +19,8 @@ from pathlib import Path """ -WARNING: This script makes automatic transactions. -Do not use this script unless you know what you +WARNING: This script makes automatic transactions. +Do not use this script unless you know what you are doing and intend to spend ETH measuring live policy availability. """ @@ -136,7 +136,7 @@ def collect(alice: Alice, else: success += 1 policies[policy.public_key.hex()] = policy # track - print(f"PEK:{policy.public_key.hex()} | HRAC {policy.hrac().hex()}") + print(f"PEK:{policy.public_key.hex()} | HRAC {policy.hrac.hex()}") # timeit end = maya.now() From bbc4390f68fd38da6e689a88faf4ad46df4c992d Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Sat, 2 Jan 2021 23:37:17 -0800 Subject: [PATCH 7/9] Generalize NodeEngagementMutex --- .../finnegans-wake-demo.py | 2 +- examples/heartbeat_demo/alicia.py | 2 +- nucypher/characters/control/interfaces.py | 2 +- nucypher/characters/lawful.py | 2 +- nucypher/policy/policies.py | 217 ++++-------- nucypher/utilities/concurrency.py | 304 ++++++++++++++++ .../acceptance/network/test_network_actors.py | 4 +- .../acceptance/utilities/test_concurrency.py | 331 ++++++++++++++++++ tests/fixtures.py | 4 +- .../learning/test_discovery_phases.py | 12 +- 10 files changed, 712 insertions(+), 168 deletions(-) create mode 100644 nucypher/utilities/concurrency.py create mode 100644 tests/acceptance/utilities/test_concurrency.py diff --git a/examples/finnegans_wake_demo/finnegans-wake-demo.py b/examples/finnegans_wake_demo/finnegans-wake-demo.py index 30f04fcf7..a86cbd7c0 100644 --- a/examples/finnegans_wake_demo/finnegans-wake-demo.py +++ b/examples/finnegans_wake_demo/finnegans-wake-demo.py @@ -102,7 +102,7 @@ policy = ALICE.grant(BOB, expiration=policy_end_datetime) assert policy.public_key == policy_pubkey -policy.publishing_mutex.block_until_complete() +policy.treasure_map_publisher.block_until_complete() # Alice puts her public key somewhere for Bob to find later... alices_pubkey_bytes_saved_for_posterity = bytes(ALICE.stamp) diff --git a/examples/heartbeat_demo/alicia.py b/examples/heartbeat_demo/alicia.py index 8d325d5bf..768e59b91 100644 --- a/examples/heartbeat_demo/alicia.py +++ b/examples/heartbeat_demo/alicia.py @@ -141,7 +141,7 @@ policy = alicia.grant(bob=doctor_strange, m=m, n=n, expiration=policy_end_datetime) -policy.publishing_mutex.block_until_complete() +policy.treasure_map_publisher.block_until_complete() print("Done!") # For the demo, we need a way to share with Bob some additional info diff --git a/nucypher/characters/control/interfaces.py b/nucypher/characters/control/interfaces.py index e11abea7c..27c358d80 100644 --- a/nucypher/characters/control/interfaces.py +++ b/nucypher/characters/control/interfaces.py @@ -123,7 +123,7 @@ class AliceInterface(CharacterPublicInterface): expiration=expiration, discover_on_this_thread=True) - new_policy.publishing_mutex.block_until_success_is_reasonably_likely() + new_policy.treasure_map_publisher.block_until_success_is_reasonably_likely() response_data = {'treasure_map': new_policy.treasure_map, 'policy_encrypting_key': new_policy.public_key, diff --git a/nucypher/characters/lawful.py b/nucypher/characters/lawful.py index 7bc321fdc..841fd2a25 100644 --- a/nucypher/characters/lawful.py +++ b/nucypher/characters/lawful.py @@ -351,7 +351,7 @@ class Alice(Character, BlockchainPolicyAuthor): self.add_active_policy(enacted_policy) if publish_treasure_map and block_until_success_is_reasonably_likely: - enacted_policy.publishing_mutex.block_until_success_is_reasonably_likely() + enacted_policy.treasure_map_publisher.block_until_success_is_reasonably_likely() return enacted_policy def get_policy_encrypting_key_from_label(self, label: bytes) -> UmbralPublicKey: diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index 34c0410ef..abac5456a 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -47,6 +47,7 @@ from nucypher.crypto.powers import DecryptingPower, SigningPower, TransactingPow from nucypher.crypto.utils import construct_policy_id from nucypher.network.exceptions import NodeSeemsToBeDown from nucypher.network.middleware import RestMiddleware +from nucypher.utilities.concurrency import WorkerPool, AllAtOnceFactory from nucypher.utilities.logging import Logger @@ -90,151 +91,48 @@ class Arrangement: return f"Arrangement(client_key={self.alice_verifying_key})" -class NodeEngagementMutex: - """ - TODO: Does this belong on middleware? +class TreasureMapPublisher: - TODO: There are a couple of ways this can break. If one fo the jobs hangs, the whole thing will hang. Also, - if there are fewer successfully completed than percent_to_complete_before_release, the partial queue will never - release. - - TODO: Make registry per... I guess Policy? It's weird to be able to accidentally enact again. - """ - log = Logger("Policy") + log = Logger('TreasureMapPublisher') def __init__(self, - callable_to_engage, # TODO: typing.Protocol + worker, nodes, - network_middleware, percent_to_complete_before_release=5, - note=None, threadpool_size=120, - timeout=20, - *args, - **kwargs): - self.f = callable_to_engage - self.nodes = nodes - self.network_middleware = network_middleware - self.args = args - self.kwargs = kwargs + timeout=20): - self.completed = {} - self.failed = {} + self._total = len(nodes) + self._block_until_this_many_are_complete = math.ceil(len(nodes) * percent_to_complete_before_release / 100) + self._worker_pool = WorkerPool(worker=worker, + value_factory=AllAtOnceFactory(nodes), + target_successes=self._block_until_this_many_are_complete, + timeout=timeout, + stagger_timeout=0, + threadpool_size=threadpool_size) - self._started = False - self._finished = False - self.timeout = timeout - - self.percent_to_complete_before_release = percent_to_complete_before_release - self._partial_queue = Queue() - self._completion_queue = Queue() - self._block_until_this_many_are_complete = math.ceil( - len(nodes) * self.percent_to_complete_before_release / 100) - self.nodes_contacted_during_partial_block = False - self.when_complete = Deferred() # TODO: Allow cancelling via KB Interrupt or some other way? - - if note is None: - self._repr = f"{callable_to_engage} to {len(nodes)} nodes" - else: - self._repr = f"{note}: {callable_to_engage} to {len(nodes)} nodes" - - self._threadpool = ThreadPool(minthreads=threadpool_size, maxthreads=threadpool_size, name=self._repr) - self.log.info(f"NEM spinning up {self._threadpool}") - self._threadpool.callInThread(self._bail_on_timeout) - - def __repr__(self): - return self._repr - - def _bail_on_timeout(self): - while True: - if self.when_complete.called: - return - duration = datetime.datetime.now() - self._started - if duration.seconds >= self.timeout: - try: - self._threadpool.stop() - except AlreadyQuit: - raise RuntimeError("Is there a race condition here? If this line is being hit, it's a bug.") - raise RuntimeError(f"Timed out. Nodes completed: {self.completed}") - time.sleep(.5) - - def block_until_success_is_reasonably_likely(self): - """ - https://www.youtube.com/watch?v=OkSLswPSq2o - """ - if len(self.completed) < self._block_until_this_many_are_complete: - try: - completed_for_reasonable_likelihood_of_success = self._partial_queue.get(timeout=self.timeout) # TODO: Shorter timeout here? - except Empty: - raise RuntimeError(f"Timed out. Nodes completed: {self.completed}") - self.log.debug(f"{len(self.completed)} nodes were contacted while blocking for a little while.") - return completed_for_reasonable_likelihood_of_success - else: - return self.completed - - - def block_until_complete(self): - if self.total_disposed() < len(self.nodes): - try: - _ = self._completion_queue.get(timeout=self.timeout) # Interesting opportuntiy to pass some data, like the list of contacted nodes above. - except Empty: - raise RuntimeError(f"Timed out. Nodes completed: {self.completed}") - if not reactor.running and not self._threadpool.joined: - # If the reactor isn't running, the user *must* call this, because this is where we stop. - self._threadpool.stop() - - def _handle_success(self, response, node): - if response.status_code == 201: - self.completed[node] = response - else: - assert False # TODO: What happens if this is a 300 or 400 level response? (A 500 response will propagate as an error and be handled in the errback chain.) - if self.nodes_contacted_during_partial_block: - self._consider_finalizing() - else: - if len(self.completed) >= self._block_until_this_many_are_complete: - contacted = tuple(self.completed.keys()) - self.nodes_contacted_during_partial_block = contacted - self.log.debug(f"Blocked for a little while, completed {contacted} nodes") - self._partial_queue.put(contacted) - return response - - def _handle_error(self, failure, node): - self.failed[node] = failure # TODO: Add a failfast mode? - self._consider_finalizing() - self.log.warn(f"{node} failed: {failure}") - - def total_disposed(self): - return len(self.completed) + len(self.failed) - - def _consider_finalizing(self): - if not self._finished: - if self.total_disposed() == len(self.nodes): - # TODO: Consider whether this can possibly hang. - self._finished = True - if reactor.running: - reactor.callInThread(self._threadpool.stop) - self._completion_queue.put(self.completed) - self.when_complete.callback(self.completed) - self.log.info(f"{self} finished.") - else: - raise RuntimeError("Already finished.") - - def _engage_node(self, node): - maybe_coro = self.f(node, network_middleware=self.network_middleware, *self.args, **self.kwargs) - - d = ensureDeferred(maybe_coro) - d.addCallback(self._handle_success, node) - d.addErrback(self._handle_error, node) - return d + @property + def completed(self): + # TODO: lock dict before copying? + return self._worker_pool.get_successes() def start(self): - if self._started: - raise RuntimeError("Already started.") - self._started = datetime.datetime.now() - self.log.info(f"NEM Starting {self._threadpool}") - for node in self.nodes: - self._threadpool.callInThread(self._engage_node, node) - self._threadpool.start() + self.log.info(f"TreasureMapPublisher starting") + self._worker_pool.start() + if reactor.running: + reactor.callInThread(self.block_until_complete) + + def block_until_success_is_reasonably_likely(self): + # Note: `OutOfValues`/`TimedOut` may be raised here, which means we didn't even get to + # `percent_to_complete_before_release` successes. For now just letting it fire. + self._worker_pool.block_until_target_successes() + completed = self.completed + self.log.debug(f"The minimal amount of nodes ({len(completed)}) was contacted " + "while blocking for treasure map publication.") + return completed + + def block_until_complete(self): + self._worker_pool.join() class MergedReservoir: @@ -534,23 +432,34 @@ class Policy(ABC): return treasure_map - def _make_publishing_mutex(self, - treasure_map: 'TreasureMap', - network_middleware: RestMiddleware, - ) -> NodeEngagementMutex: - - async def put_treasure_map_on_node(node, network_middleware): - response = network_middleware.put_treasure_map_on_node(node=node, - map_payload=bytes(treasure_map)) - return response + def _make_publisher(self, + treasure_map: 'TreasureMap', + network_middleware: RestMiddleware, + ) -> TreasureMapPublisher: # TODO (#2516): remove hardcoding of 8 nodes self.alice.block_until_number_of_known_nodes_is(8, timeout=2, learn_on_this_thread=True) target_nodes = self.bob.matching_nodes_among(self.alice.known_nodes) + treasure_map_bytes = bytes(treasure_map) # prevent the closure from holding the reference - return NodeEngagementMutex(callable_to_engage=put_treasure_map_on_node, - nodes=target_nodes, - network_middleware=network_middleware) + def put_treasure_map_on_node(node): + try: + response = network_middleware.put_treasure_map_on_node(node=node, + map_payload=treasure_map_bytes) + except Exception as e: + self.log.warn(f"Putting treasure map on {node} failed: {e}") + raise + + if response.status_code == 201: + return response + else: + message = f"Putting treasure map on {node} failed with response status: {response.status}" + self.log.warn(message) + # TODO: What happens if this is a 300 or 400 level response? + raise Exception(message) + + return TreasureMapPublisher(worker=put_treasure_map_on_node, + nodes=target_nodes) def enact(self, network_middleware: RestMiddleware, @@ -572,8 +481,8 @@ class Policy(ABC): treasure_map = self._make_treasure_map(network_middleware=network_middleware, arrangements=arrangements) - publishing_mutex = self._make_publishing_mutex(treasure_map=treasure_map, - network_middleware=network_middleware) + treasure_map_publisher = self._make_publisher(treasure_map=treasure_map, + network_middleware=network_middleware) revocation_kit = RevocationKit(treasure_map, self.alice.stamp) enacted_policy = EnactedPolicy(self._id, @@ -581,7 +490,7 @@ class Policy(ABC): self.label, self.public_key, treasure_map, - publishing_mutex, + treasure_map_publisher, revocation_kit, self.alice.stamp) @@ -730,7 +639,7 @@ class BlockchainPolicy(Policy): def _enact_arrangements(self, network_middleware, arrangements, - publish_treasure_map=True) -> NodeEngagementMutex: + publish_treasure_map=True) -> TreasureMapPublisher: transaction = self._publish_to_blockchain(list(arrangements)) return super()._enact_arrangements(network_middleware=network_middleware, arrangements=arrangements, @@ -756,7 +665,7 @@ class EnactedPolicy: label: bytes, public_key: UmbralPublicKey, treasure_map: 'TreasureMap', - publishing_mutex: NodeEngagementMutex, + treasure_map_publisher: TreasureMapPublisher, revocation_kit: RevocationKit, alice_verifying_key: UmbralPublicKey, ): @@ -766,10 +675,10 @@ class EnactedPolicy: self.label = label self.public_key = public_key self.treasure_map = treasure_map - self.publishing_mutex = publishing_mutex + self.treasure_map_publisher = treasure_map_publisher self.revocation_kit = revocation_kit self.n = len(self.treasure_map.destinations) self.alice_verifying_key = alice_verifying_key def publish_treasure_map(self): - self.publishing_mutex.start() + self.treasure_map_publisher.start() diff --git a/nucypher/utilities/concurrency.py b/nucypher/utilities/concurrency.py new file mode 100644 index 000000000..779ffb32a --- /dev/null +++ b/nucypher/utilities/concurrency.py @@ -0,0 +1,304 @@ +""" +This file is part of nucypher. + +nucypher is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +nucypher is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with nucypher. If not, see . +""" + +import time +from queue import Queue, Empty +from threading import Thread, Event, Lock, Timer, get_ident +from typing import Callable, List, Any, Optional, Dict + +from constant_sorrow.constants import PRODUCER_STOPPED, TIMEOUT_TRIGGERED +from twisted._threads import AlreadyQuit +from twisted.python.threadpool import ThreadPool + + +class Success: + def __init__(self, value, result): + self.value = value + self.result = result + +class Failure: + def __init__(self, value, exception): + self.value = value + self.exception = exception + + +class Cancelled(Exception): + pass + + +class SetOnce: + """ + A convenience wrapper for a value that can be set once (which can be waited on), + and cannot be overwritten (unless cleared). + """ + + def __init__(self): + self._lock = Lock() + self._set_event = Event() + self._value = None + + def set(self, value): + with self._lock: + if not self._set_event.is_set(): + self._value = value + self._set_event.set() + + def is_set(self): + return self._set_event.is_set() + + def get_and_clear(self): + with self._lock: + value = self._value + self._value = None + self._set_event.clear() + return value + + def get(self): + self._set_event.wait() + return self._value + + +class WorkerPool: + """ + A generalized class that can start multiple workers in a thread pool with values + drawn from the given value factory object, + and wait for their completion and a given number of successes + (a worker returning something without throwing an exception). + """ + + class TimedOut(Exception): + "Raised if waiting for the target number of successes timed out." + + class OutOfValues(Exception): + "Raised if the value factory is out of values, but the target number was not reached." + + def __init__(self, + worker: Callable[[Any], Any], + value_factory: Callable[[int], Optional[List[Any]]], + target_successes, + timeout: float, + stagger_timeout: float = 0, + threadpool_size: int = None): + + # TODO: make stagger_timeout a part of the value factory? + + self._worker = worker + self._value_factory = value_factory + self._timeout = timeout + self._stagger_timeout = stagger_timeout + self._target_successes = target_successes + + thread_pool_kwargs = {} + if threadpool_size is not None: + thread_pool_kwargs['minthreads'] = threadpool_size + thread_pool_kwargs['maxthreads'] = threadpool_size + self._threadpool = ThreadPool(**thread_pool_kwargs) + + # These three tasks must be run in separate threads + # to avoid being blocked by workers in the thread pool. + self._bail_on_timeout_thread = Thread(target=self._bail_on_timeout) + self._produce_values_thread = Thread(target=self._produce_values) + self._process_results_thread = Thread(target=self._process_results) + + self._successes = {} + self._failures = {} + self._started_tasks = 0 + self._finished_tasks = 0 + + self._cancel_event = Event() + self._result_queue = Queue() + self._target_value = SetOnce() + self._unexpected_error = SetOnce() + self._results_lock = Lock() + self._stopped = False + + def start(self): + # TODO: check if already started? + self._threadpool.start() + self._produce_values_thread.start() + self._process_results_thread.start() + self._bail_on_timeout_thread.start() + + def cancel(self): + """ + Cancels the tasks enqueued in the thread pool and stops the producer thread. + """ + self._cancel_event.set() + + def join(self): + """ + Waits for all the threads to finish. + Can be called several times. + """ + + if self._stopped: + return # or raise AlreadyStopped? + + self._produce_values_thread.join() + self._process_results_thread.join() + self._bail_on_timeout_thread.join() + + # protect from a possible race + try: + self._threadpool.stop() + except AlreadyQuit: + pass + self._stopped = True + + if self._unexpected_error.is_set(): + e = self._unexpected_error.get() + raise RuntimeError(f"Unexpected error in the producer thread: {e}") + + def _sleep(self, timeout): + """ + Sleeps for a given timeout, can be interrupted by a cancellation event. + """ + if self._cancel_event.wait(timeout): + raise Cancelled + + def block_until_target_successes(self) -> Dict: + """ + Blocks until the target number of successes is reached. + Returns a dictionary of values matched to results. + Can be called several times. + """ + if self._unexpected_error.is_set(): + # So that we don't raise it again when join() is called + e = self._unexpected_error.get_and_clear() + raise RuntimeError(f"Unexpected error in the producer thread: {e}") + + result = self._target_value.get() + if result == TIMEOUT_TRIGGERED: + raise self.TimedOut() + elif result == PRODUCER_STOPPED: + raise self.OutOfValues() + return result + + def get_failures(self) -> Dict: + """ + Get the current failures, as a dictionary of values to thrown exceptions. + """ + with self._results_lock: + return dict(self._failures) + + def get_successes(self) -> Dict: + """ + Get the current successes, as a dictionary of values to worker return values. + """ + with self._results_lock: + return dict(self._successes) + + def _bail_on_timeout(self): + """ + A service thread that cancels the pool on timeout. + """ + if not self._cancel_event.wait(timeout=self._timeout): + self._target_value.set(TIMEOUT_TRIGGERED) + self._cancel_event.set() + + def _worker_wrapper(self, value): + """ + A wrapper that catches exceptions thrown by the worker + and sends the results to the processing thread. + """ + try: + # If we're in the cancelled state, interrupt early + self._sleep(0) + + result = self._worker(value) + self._result_queue.put(Success(value, result)) + except Cancelled as e: + self._result_queue.put(e) + except BaseException as e: + self._result_queue.put(Failure(value, str(e))) + + def _process_results(self): + """ + A service thread that processes worker results + and waits for the target number of successes to be reached. + """ + producer_stopped = False + success_event_reached = False + while True: + result = self._result_queue.get() + + if result == PRODUCER_STOPPED: + producer_stopped = True + else: + self._finished_tasks += 1 + if isinstance(result, Success): + with self._results_lock: + self._successes[result.value] = result.result + len_successes = len(self._successes) + if not success_event_reached and len_successes == self._target_successes: + # A protection for the case of repeating values. + # Only trigger the target value once. + success_event_reached = True + self._target_value.set(self.get_successes()) + if isinstance(result, Failure): + with self._results_lock: + self._failures[result.value] = result.exception + + if producer_stopped and self._finished_tasks == self._started_tasks: + self.cancel() # to cancel the timeout thread + self._target_value.set(PRODUCER_STOPPED) + break + + def _produce_values(self): + while True: + try: + with self._results_lock: + len_successes = len(self._successes) + batch = self._value_factory(len_successes) + if not batch: + break + + self._started_tasks += len(batch) + for value in batch: + # There is a possible race between `callInThread()` and `stop()`, + # But we never execute them at the same time, + # because `join()` checks that the producer thread is stopped. + self._threadpool.callInThread(self._worker_wrapper, value) + + self._sleep(self._stagger_timeout) + + except Cancelled: + break + + except BaseException as e: + self._unexpected_error.set(e) + self.cancel() + break + + self._result_queue.put(PRODUCER_STOPPED) + + +class AllAtOnceFactory: + """ + A simple value factory that returns all its values in a single batch. + """ + + def __init__(self, values): + self.values = values + self._produced = False + + def __call__(self, _successes): + if self._produced: + return None + else: + self._produced = True + return self.values diff --git a/tests/acceptance/network/test_network_actors.py b/tests/acceptance/network/test_network_actors.py index b609989a4..a72384cb5 100644 --- a/tests/acceptance/network/test_network_actors.py +++ b/tests/acceptance/network/test_network_actors.py @@ -168,8 +168,8 @@ def test_treasure_map_cannot_be_duplicated(blockchain_ursulas, expiration=policy_end_datetime) matching_ursulas = blockchain_bob.matching_nodes_among(blockchain_ursulas) - completed_ursulas = policy.publishing_mutex.block_until_success_is_reasonably_likely() - # Ursulas in publishing_mutex are not real Ursulas, but just some metadata of remote ones. + completed_ursulas = policy.treasure_map_publisher.block_until_success_is_reasonably_likely() + # Ursulas in `treasure_map_publisher` are not real Ursulas, but just some metadata of remote ones. # We need a real one to access its datastore. first_completed_ursula = [ursula for ursula in matching_ursulas if ursula in completed_ursulas][0] diff --git a/tests/acceptance/utilities/test_concurrency.py b/tests/acceptance/utilities/test_concurrency.py new file mode 100644 index 000000000..63be02ddc --- /dev/null +++ b/tests/acceptance/utilities/test_concurrency.py @@ -0,0 +1,331 @@ +import random +import time +from typing import Iterable, Tuple, List, Callable + +import pytest + +from nucypher.utilities.concurrency import WorkerPool, AllAtOnceFactory + + +@pytest.fixture(scope='function') +def join_worker_pool(request): + """ + Makes sure the pool is properly joined at the end of the test, + so that one doesn't have to wrap the whole test in a try-finally block. + """ + pool_to_join = None + def register(pool): + nonlocal pool_to_join + pool_to_join = pool + yield register + pool_to_join.join() + + +class WorkerRule: + def __init__(self, fails: bool = False, timeout_min: float = 0, timeout_max: float = 0): + self.fails = fails + self.timeout_min = timeout_min + self.timeout_max = timeout_max + + +class WorkerOutcome: + def __init__(self, fails: bool, timeout: float): + self.fails = fails + self.timeout = timeout + + def __call__(self, value): + time.sleep(self.timeout) + if self.fails: + raise Exception(f"Worker for {value} failed") + else: + return value + + +def generate_workers(rules: Iterable[Tuple[WorkerRule, int]], seed=None): + rng = random.Random(seed) + outcomes = [] + for rule, quantity in rules: + for _ in range(quantity): + timeout = rng.uniform(rule.timeout_min, rule.timeout_max) + outcomes.append(WorkerOutcome(rule.fails, timeout)) + + rng.shuffle(outcomes) + + values = list(range(len(outcomes))) + + def worker(value): + return outcomes[value](value) + + return {value: outcomes[value] for value in values}, worker + + +def test_wait_for_successes(join_worker_pool): + """ + Checks that `block_until_target_successes()` returns in time and gives all the successes, + if there were enough of them. + """ + + outcomes, worker = generate_workers( + [ + (WorkerRule(timeout_min=0.5, timeout_max=1.5), 10), + (WorkerRule(fails=True, timeout_min=1, timeout_max=3), 20), + ], + seed=123) + + factory = AllAtOnceFactory(list(outcomes)) + pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=30) + join_worker_pool(pool) + + t_start = time.monotonic() + pool.start() + successes = pool.block_until_target_successes() + t_end = time.monotonic() + + failures = pool.get_failures() + assert all(outcomes[value].fails for value in failures) + + assert len(successes) == 10 + + # We have more threads in the pool than the workers, + # so all the successful ones should be able to finish right away. + assert t_end - t_start < 2 + + # Should be able to do it several times + successes = pool.block_until_target_successes() + assert len(successes) == 10 + + +def test_wait_for_successes_out_of_values(join_worker_pool): + """ + Checks that if there weren't enough successful workers, `block_until_target_successes()` + raises an exception when the value factory is exhausted. + """ + + outcomes, worker = generate_workers( + [ + (WorkerRule(timeout_min=0.5, timeout_max=1.5), 9), + (WorkerRule(fails=True, timeout_min=0.5, timeout_max=1.5), 20), + ], + seed=123) + + factory = AllAtOnceFactory(list(outcomes)) + pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=15) + join_worker_pool(pool) + + t_start = time.monotonic() + pool.start() + with pytest.raises(WorkerPool.OutOfValues): + successes = pool.block_until_target_successes() + t_end = time.monotonic() + + # We have roughly 2 workers per thread, so it shouldn't take longer than 1.5s (max timeout) * 2 + assert t_end - t_start < 4 + + +def test_wait_for_successes_timed_out(join_worker_pool): + """ + Checks that if enough successful workers can't finish before the timeout, we get an exception. + """ + + outcomes, worker = generate_workers( + [ + (WorkerRule(timeout_min=0, timeout_max=0.5), 9), + (WorkerRule(timeout_min=1.5, timeout_max=2.5), 1), + (WorkerRule(fails=True, timeout_min=1.5, timeout_max=2.5), 20), + ], + seed=123) + + factory = AllAtOnceFactory(list(outcomes)) + pool = WorkerPool(worker, factory, target_successes=10, timeout=1, threadpool_size=30) + join_worker_pool(pool) + + t_start = time.monotonic() + pool.start() + with pytest.raises(WorkerPool.TimedOut): + successes = pool.block_until_target_successes() + t_end = time.monotonic() + + # Even though timeout is 1, there are long-running workers which we can't interupt. + assert t_end - t_start < 3 + + +def test_join(join_worker_pool): + """ + Test joining the pool. + """ + + outcomes, worker = generate_workers( + [ + (WorkerRule(timeout_min=0.5, timeout_max=1.5), 9), + (WorkerRule(fails=True, timeout_min=0.5, timeout_max=1.5), 20), + ], + seed=123) + + factory = AllAtOnceFactory(list(outcomes)) + pool = WorkerPool(worker, factory, target_successes=10, timeout=1, threadpool_size=30) + join_worker_pool(pool) + + t_start = time.monotonic() + pool.start() + pool.join() + t_end = time.monotonic() + + pool.join() # should work the second time too + + # Even though timeout is 1, there are long-running workers which we can't interupt. + assert t_end - t_start < 3 + + +class BatchFactory: + + def __init__(self, values): + self.values = values + self.batch_sizes = [] + + def __call__(self, successes): + if successes == 10: + return None + batch_size = 10 - successes + if len(self.values) >= batch_size: + batch = self.values[:batch_size] + self.batch_sizes.append(len(batch)) + self.values = self.values[batch_size:] + return batch + elif len(self.values) > 0: + self.batch_sizes.append(len(self.values)) + return self.values + self.values = None + else: + return None + + +def test_batched_value_generation(join_worker_pool): + """ + Tests a value factory that gives out value batches in portions. + """ + + outcomes, worker = generate_workers( + [ + (WorkerRule(timeout_min=0.5, timeout_max=1.5), 80), + (WorkerRule(fails=True, timeout_min=0.5, timeout_max=1.5), 80), + ], + seed=123) + + factory = BatchFactory(list(outcomes)) + pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=10, stagger_timeout=0.5) + join_worker_pool(pool) + + t_start = time.monotonic() + pool.start() + successes = pool.block_until_target_successes() + pool.cancel() + pool.join() + t_end = time.monotonic() + + assert len(successes) == 10 + + # Check that batch sizes in the factory were getting progressively smaller + # as the number of successes grew. + assert all(factory.batch_sizes[i] >= factory.batch_sizes[i+1] + for i in range(len(factory.batch_sizes) - 1)) + + # Since we canceled the pool, no more workers will be started and we will finish faster + assert t_end - t_start < 4 + + successes_copy = pool.get_successes() + failures_copy = pool.get_failures() + + assert all(value in successes_copy for value in successes) + + +def test_cancel_waiting_workers(join_worker_pool): + """ + If we have a small pool and many workers, it is possible for workers to be enqueued + one after another in one thread. + We test that if we call `cancel()`, these enqueued workers are cancelled too. + """ + + outcomes, worker = generate_workers( + [ + (WorkerRule(timeout_min=1, timeout_max=1), 100), + ], + seed=123) + + factory = AllAtOnceFactory(list(outcomes)) + pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=10) + join_worker_pool(pool) + + t_start = time.monotonic() + pool.start() + pool.block_until_target_successes() + pool.cancel() + pool.join() + t_end = time.monotonic() + + # We have 10 threads in the pool and 100 workers that are all enqueued at once at the start. + # If we didn't check for the cancel condition, we would have to wait for 10 seconds. + # We get 10 successes after 1s and cancel the workers, + # but the next workers in each thread have already started, so we have to wait for another 1s. + assert t_end - t_start < 2.5 + + +class BuggyFactory: + + def __init__(self, values): + self.values = values + + def __call__(self, successes): + if self.values is not None: + values = self.values + self.values = None + return values + else: + raise Exception("Buggy factory") + + +def test_buggy_factory_raises_on_block(join_worker_pool): + """ + Tests that if there is an exception thrown in the value factory, + it is caught in the first call to `block_until_target_successes()`. + """ + + outcomes, worker = generate_workers( + [(WorkerRule(timeout_min=1, timeout_max=1), 100)], + seed=123) + + factory = BuggyFactory(list(outcomes)) + + # Non-zero stagger timeout to make BuggyFactory raise its error only in 1.5s, + # So that we got enough successes for `block_until_target_successes()`. + pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=10, stagger_timeout=1.5) + join_worker_pool(pool) + + pool.start() + time.sleep(2) # wait for the stagger timeout to finish + with pytest.raises(RuntimeError, match="Unexpected error in the producer thread"): + pool.block_until_target_successes() + # Further calls to `block_until_target_successes()` or `join()` don't throw the error. + pool.block_until_target_successes() + pool.cancel() + pool.join() + + +def test_buggy_factory_raises_on_join(join_worker_pool): + """ + Tests that if there is an exception thrown in the value factory, + it is caught in the first call to `join()`. + """ + + outcomes, worker = generate_workers( + [(WorkerRule(timeout_min=1, timeout_max=1), 100)], + seed=123) + + factory = BuggyFactory(list(outcomes)) + pool = WorkerPool(worker, factory, target_successes=10, timeout=10, threadpool_size=10) + join_worker_pool(pool) + + pool.start() + pool.cancel() + with pytest.raises(RuntimeError, match="Unexpected error in the producer thread"): + pool.join() + pool.join() diff --git a/tests/fixtures.py b/tests/fixtures.py index 95003c649..52f315865 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -241,7 +241,7 @@ def enacted_federated_policy(idle_federated_policy, federated_ursulas): # REST call happens here, as does population of TreasureMap. enacted_policy = idle_federated_policy.enact(network_middleware=network_middleware, handpicked_ursulas=federated_ursulas) - enacted_policy.publishing_mutex.block_until_complete() + enacted_policy.treasure_map_publisher.block_until_complete() return enacted_policy @@ -278,7 +278,7 @@ def enacted_blockchain_policy(idle_blockchain_policy, blockchain_ursulas): # REST call happens here, as does population of TreasureMap. enacted_policy = idle_blockchain_policy.enact(network_middleware=network_middleware, handpicked_ursulas=list(blockchain_ursulas)) - enacted_policy.publishing_mutex.block_until_complete() + enacted_policy.treasure_map_publisher.block_until_complete() return enacted_policy diff --git a/tests/integration/learning/test_discovery_phases.py b/tests/integration/learning/test_discovery_phases.py index c571711a1..ede3634c0 100644 --- a/tests/integration/learning/test_discovery_phases.py +++ b/tests/integration/learning/test_discovery_phases.py @@ -140,7 +140,7 @@ def test_alice_verifies_ursula_just_in_time(fleet_of_highperf_mocked_ursulas, _POLICY_PRESERVER.append(policy) -# @pytest_twisted.inlineCallbacks # TODO: Why does this, in concert with yield policy.publishing_mutex.when_complete, hang? +# @pytest_twisted.inlineCallbacks # TODO: Why does this, in concert with yield policy.treasure_map_publisher.when_complete, hang? def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas, highperf_mocked_alice, highperf_mocked_bob): @@ -186,21 +186,21 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas, # defer.setDebugging(True) # PART II: We block for a little while to ensure that the distribution is going well. - nodes_that_have_the_map_when_we_unblock = policy.publishing_mutex.block_until_success_is_reasonably_likely() + nodes_that_have_the_map_when_we_unblock = policy.treasure_map_publisher.block_until_success_is_reasonably_likely() little_while_ended_at = datetime.now() # The number of nodes having the map is at least the minimum to have unblocked. - assert len(nodes_that_have_the_map_when_we_unblock) >= policy.publishing_mutex._block_until_this_many_are_complete + assert len(nodes_that_have_the_map_when_we_unblock) >= policy.treasure_map_publisher._block_until_this_many_are_complete # The number of nodes having the map is approximately the number you'd expect from full utilization of Alice's publication threadpool. # TODO: This line fails sometimes because the loop goes too fast. - # assert len(nodes_that_have_the_map_when_we_unblock) == pytest.approx(policy.publishing_mutex._block_until_this_many_are_complete, .2) + # assert len(nodes_that_have_the_map_when_we_unblock) == pytest.approx(policy.treasure_map_publisher._block_until_this_many_are_complete, .2) # PART III: Having made proper assertions about the publication call and the first block, we allow the rest to # happen in the background and then ensure that each phase was timely. # This will block until the distribution is complete. - policy.publishing_mutex.block_until_complete() + policy.treasure_map_publisher.block_until_complete() complete_distribution_time = datetime.now() - started partial_blocking_duration = little_while_ended_at - started # Before Treasure Island (1741), this process took about 3 minutes. @@ -213,7 +213,7 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas, # But with debuggers and other processes running on laptops, we give a little leeway. # We have the same number of successful responses as nodes we expected to have the map. - assert len(policy.publishing_mutex.completed) == len(nodes_we_expect_to_have_the_map) + assert len(policy.treasure_map_publisher.completed) == len(nodes_we_expect_to_have_the_map) nodes_that_got_the_map = sum( u._its_down_there_somewhere_let_me_take_another_look is True for u in nodes_we_expect_to_have_the_map) assert nodes_that_got_the_map == len(nodes_we_expect_to_have_the_map) From 332e6ad38687f9dca8657d997f5b3899a1792d78 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Sun, 3 Jan 2021 11:30:57 -0800 Subject: [PATCH 8/9] Parallel proposals --- nucypher/characters/control/interfaces.py | 3 +- nucypher/characters/lawful.py | 6 +- nucypher/policy/policies.py | 69 +++++++---------------- 3 files changed, 24 insertions(+), 54 deletions(-) diff --git a/nucypher/characters/control/interfaces.py b/nucypher/characters/control/interfaces.py index 27c358d80..b3f94e94f 100644 --- a/nucypher/characters/control/interfaces.py +++ b/nucypher/characters/control/interfaces.py @@ -120,8 +120,7 @@ class AliceInterface(CharacterPublicInterface): n=n, value=value, rate=rate, - expiration=expiration, - discover_on_this_thread=True) + expiration=expiration) new_policy.treasure_map_publisher.block_until_success_is_reasonably_likely() diff --git a/nucypher/characters/lawful.py b/nucypher/characters/lawful.py index 841fd2a25..7bd428b61 100644 --- a/nucypher/characters/lawful.py +++ b/nucypher/characters/lawful.py @@ -297,7 +297,6 @@ class Alice(Character, BlockchainPolicyAuthor): bob: "Bob", label: bytes, handpicked_ursulas: set = None, - discover_on_this_thread: bool = True, timeout: int = None, publish_treasure_map: bool = True, block_until_success_is_reasonably_likely: bool = True, @@ -332,7 +331,7 @@ class Alice(Character, BlockchainPolicyAuthor): # If we're federated only, we need to block to make sure we have enough nodes. if self.federated_only and len(self.known_nodes) < policy.n: good_to_go = self.block_until_number_of_known_nodes_is(number_of_nodes_to_know=policy.n, - learn_on_this_thread=discover_on_this_thread, + learn_on_this_thread=True, timeout=timeout) if not good_to_go: raise ValueError( @@ -345,8 +344,7 @@ class Alice(Character, BlockchainPolicyAuthor): # TODO: Make it optional to publish to blockchain? Or is this presumptive based on the `Policy` type? enacted_policy = policy.enact(network_middleware=self.network_middleware, handpicked_ursulas=handpicked_ursulas, - publish_treasure_map=publish_treasure_map, - discover_on_this_thread=discover_on_this_thread) + publish_treasure_map=publish_treasure_map) self.add_active_policy(enacted_policy) diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index abac5456a..9df4dcfad 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -178,43 +178,6 @@ class PrefetchStrategy: return batch -def propose_arrangements(worker, value_factory, target_successes, timeout): - """ - A temporary function that calls workers sequentially. - To be replaced with a parallel solution. - """ - - successes = {} - failures = {} - start_time = maya.now() - - while True: - - value_batch = value_factory(len(successes)) - if value_batch is None: - break - - for value in value_batch: - try: - result = worker(value) - successes[value] = result - except Exception as e: - failures[value] = e - - if len(successes) == target_successes: - break - - delta = maya.now() - start_time - if delta.total_seconds() >= timeout: - raise RuntimeError(f"Proposal stage timed out after {timeout} seconds; " - f"need {target_successes - len(successes)} more.") - - if len(successes) == target_successes: - break - - return successes, failures - - class Policy(ABC): """ An edict by Alice, arranged with n Ursulas, to perform re-encryption for a specific Bob. @@ -320,7 +283,6 @@ class Policy(ABC): def _make_arrangements(self, network_middleware: RestMiddleware, handpicked_ursulas: Optional[Iterable[Ursula]] = None, - discover_on_this_thread: bool = True, timeout: int = 10, ) -> Dict[Ursula, Arrangement]: """ @@ -338,18 +300,31 @@ class Policy(ABC): def worker(address): return self._propose_arrangement(address, network_middleware) - self.alice.block_until_number_of_known_nodes_is(self.n, learn_on_this_thread=discover_on_this_thread, eager=True) + self.alice.block_until_number_of_known_nodes_is(self.n, learn_on_this_thread=True, eager=True) - arrangements, failures = propose_arrangements(worker=worker, - value_factory=value_factory, - target_successes=self.n, - timeout=timeout) + worker_pool = WorkerPool(worker=worker, + value_factory=value_factory, + target_successes=self.n, + timeout=timeout, + stagger_timeout=1, + threadpool_size=self.n) + worker_pool.start() + try: + successes = worker_pool.block_until_target_successes() + except (WorkerPool.OutOfValues, WorkerPool.TimedOut): + # It's possible to raise some other exceptions here, + # but we will use the logic below. + successes = worker_pool.get_successes() + finally: + worker_pool.cancel() + worker_pool.join() - accepted_arrangements = {ursula: arrangement for ursula, arrangement in arrangements.values()} + accepted_arrangements = {ursula: arrangement for ursula, arrangement in successes.values()} + failures = worker_pool.get_failures() accepted_addresses = ", ".join(ursula.checksum_address for ursula in accepted_arrangements) - if len(arrangements) < self.n: + if len(accepted_arrangements) < self.n: rejected_proposals = "\n".join(f"{address}: {exception}" for address, exception in failures.items()) @@ -464,7 +439,6 @@ class Policy(ABC): def enact(self, network_middleware: RestMiddleware, handpicked_ursulas: Optional[Iterable[Ursula]] = None, - discover_on_this_thread: bool = True, publish_treasure_map: bool = True, ) -> 'EnactedPolicy': """ @@ -472,8 +446,7 @@ class Policy(ABC): """ arrangements = self._make_arrangements(network_middleware=network_middleware, - handpicked_ursulas=handpicked_ursulas, - discover_on_this_thread=discover_on_this_thread) + handpicked_ursulas=handpicked_ursulas) self._enact_arrangements(network_middleware=network_middleware, arrangements=arrangements, From 92bf2a981daaf8ba686715a5be79a4aa163ba4f6 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Sun, 3 Jan 2021 12:18:18 -0800 Subject: [PATCH 9/9] Parallel enactment --- nucypher/policy/policies.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index 9df4dcfad..8279fd4e5 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -343,13 +343,14 @@ class Policy(ABC): arrangements: Dict[Ursula, Arrangement], publication_transaction: Optional[HexBytes] = None, publish_treasure_map: bool = True, + timeout: int = 10, ): """ Attempts to distribute kfrags to Ursulas that accepted arrangements earlier. """ - statuses = {} - for ursula, kfrag in zip(arrangements, self.kfrags): + def worker(ursula_and_kfrag): + ursula, kfrag = ursula_and_kfrag arrangement = arrangements[ursula] # TODO: seems like it would be enough to just encrypt this with Ursula's public key, @@ -368,10 +369,27 @@ class Policy(ABC): else: status = response.status_code - statuses[ursula.checksum_address] = status + return status + + value_factory = AllAtOnceFactory(list(zip(arrangements, self.kfrags))) + worker_pool = WorkerPool(worker=worker, + value_factory=value_factory, + target_successes=self.n, + timeout=timeout, + threadpool_size=self.n) + + worker_pool.start() + + # Block until everything is complete. We need all the workers to finish. + worker_pool.join() + + successes = worker_pool.get_successes() + + if len(successes) != self.n: + raise Policy.EnactmentError() # TODO: Enable re-tries? - + statuses = {ursula_and_kfrag[0].checksum_address: status for ursula_and_kfrag, status in successes.items()} if not all(status == 200 for status in statuses.values()): report = "\n".join(f"{address}: {status}" for address, status in statuses.items()) self.log.debug(f"Policy enactment failed. Request statuses:\n{report}")