From d2686ce84ed9d7c8aa206d089be079cf8346d70e Mon Sep 17 00:00:00 2001 From: jMyles Date: Fri, 15 May 2020 17:06:03 -0700 Subject: [PATCH] Using the threadpool and the custom mutex - substantially faster test run. --- nucypher/policy/policies.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index 6fe7ad2a4..2d580f2e4 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -17,11 +17,13 @@ along with nucypher. If not, see . import random from collections import OrderedDict, deque +from queue import Queue from typing import Generator, Set, List, Callable import maya +from twisted.internet import reactor from twisted.internet.defer import DeferredList -from twisted.internet.threads import deferToThread +from twisted.internet.threads import deferToThread, deferToThreadPool from abc import ABC, abstractmethod from bytestring_splitter import BytestringSplitter, VariableLengthBytestring @@ -165,18 +167,22 @@ class BlockchainArrangement(Arrangement): class PolicyPayloadMutex(DeferredList): - def __init__(self, deferredList, *args, **kwargs): + def __init__(self, deferredList, percent_to_complete_before_release=10, *args, **kwargs): + self.percent_to_complete_before_release = percent_to_complete_before_release + self.q = Queue() + super().__init__(deferredList, *args, **kwargs) - self.done_when = int(len(deferredList) / 10) + self.done_when = int(len(deferredList) / self.percent_to_complete_before_release) def _cbDeferred(self, *args, **kwargs): - if self.finishedCount == self.done_when: - self.fireOnOneCallback = True - result = super()._cbDeferred(*args, **kwargs) - self.called = False # Keep running. - return result + result = super()._cbDeferred(*args, **kwargs) - return super()._cbDeferred(*args, **kwargs) + if self.finishedCount == self.done_when: + self.q.put(None) + + if self.finishedCount == len(self.resultList): + print("********************** DONE!!! **********************") + return result class Policy(ABC): @@ -291,13 +297,14 @@ class Policy(ABC): for node in self.bob.matching_nodes_among(self.alice.known_nodes): # TODO: Concurrency here. - responses.append(deferToThread(network_middleware.put_treasure_map_on_node, - node=node, - map_id=treasure_map_id, - map_payload=bytes(self.treasure_map) - )) + responses.append(deferToThreadPool(reactor, self.alice.publication_threadpool, + network_middleware.put_treasure_map_on_node, + node=node, + map_id=treasure_map_id, + map_payload=bytes(self.treasure_map) + )) - return PolicyPayloadMutex(responses) + return PolicyPayloadMutex(responses, percent_to_complete_before_release=10) def credential(self, with_treasure_map=True): """