diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index 4b14c2f95..008f9138c 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -376,28 +376,14 @@ class Policy(ABC): # d = ensureDeferred(self.get_publication_threadpool()) - for node in self.bob.matching_nodes_among(self.alice.known_nodes): - d = deferToThreadPool(reactor, threadpool, - network_middleware.put_treasure_map_on_node, - node=node, - map_id=treasure_map_id, - map_payload=bytes(self.treasure_map) - ) - def llama(result): - return result - d.addCallback(llama) - publication_deferreds.append(d) - # d.callback(47) - self.publishing_mutex = PolicyPayloadMutex(publication_deferreds, percent_to_complete_before_release=10) - def _finish(all_responses): # TODO: This degree of management is too much to expect of the called function. + target_nodes = self.bob.matching_nodes_among(self.alice.known_nodes) + self.publishing_mutex = NodeEngagementMutex(f=self.put_treasure_map_on_node, + nodes=target_nodes, + network_middleware=network_middleware, + percent_to_complete_before_release=10) - self.alice._policy_queue.put(all_responses) # TODO: Do something more interesting with the responses? - threadpool.stop() + self.publishing_mutex.start() - # TODO: errback that stops the threadpool. - self.publishing_mutex.addCallback(_finish) - - # return self.publishing_mutex # I dunno.. return this? Why not just use the composed version? def inject_alice_publication_threadpool_into(self, f, *args, **kwargs): d = ensureDeferred(self.alice.get_publication_threadpool(policy_id=self.id.hex()[0:10]))