Simplified enactment using new mutex.

pull/2177/head
jMyles 2020-08-14 08:39:47 -07:00
parent 0484360020
commit 3e80655d0f
1 changed files with 6 additions and 20 deletions

View File

@ -376,28 +376,14 @@ class Policy(ABC):
# d = ensureDeferred(self.get_publication_threadpool())
for node in self.bob.matching_nodes_among(self.alice.known_nodes):
d = deferToThreadPool(reactor, threadpool,
network_middleware.put_treasure_map_on_node,
node=node,
map_id=treasure_map_id,
map_payload=bytes(self.treasure_map)
)
def llama(result):
return result
d.addCallback(llama)
publication_deferreds.append(d)
# d.callback(47)
self.publishing_mutex = PolicyPayloadMutex(publication_deferreds, percent_to_complete_before_release=10)
def _finish(all_responses): # TODO: This degree of management is too much to expect of the called function.
target_nodes = self.bob.matching_nodes_among(self.alice.known_nodes)
self.publishing_mutex = NodeEngagementMutex(f=self.put_treasure_map_on_node,
nodes=target_nodes,
network_middleware=network_middleware,
percent_to_complete_before_release=10)
self.alice._policy_queue.put(all_responses) # TODO: Do something more interesting with the responses?
threadpool.stop()
self.publishing_mutex.start()
# TODO: errback that stops the threadpool.
self.publishing_mutex.addCallback(_finish)
# return self.publishing_mutex # I dunno.. return this? Why not just use the composed version?
def inject_alice_publication_threadpool_into(self, f, *args, **kwargs):
d = ensureDeferred(self.alice.get_publication_threadpool(policy_id=self.id.hex()[0:10]))