Using the threadpool and the custom mutex - substantially faster test run.

pull/2140/head
jMyles 2020-05-15 17:06:03 -07:00
parent b22e6cc7b6
commit d2686ce84e
1 changed files with 22 additions and 15 deletions

View File

@ -17,11 +17,13 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import random
from collections import OrderedDict, deque
from queue import Queue
from typing import Generator, Set, List, Callable
import maya
from twisted.internet import reactor
from twisted.internet.defer import DeferredList
from twisted.internet.threads import deferToThread
from twisted.internet.threads import deferToThread, deferToThreadPool
from abc import ABC, abstractmethod
from bytestring_splitter import BytestringSplitter, VariableLengthBytestring
@ -165,18 +167,22 @@ class BlockchainArrangement(Arrangement):
class PolicyPayloadMutex(DeferredList):
def __init__(self, deferredList, *args, **kwargs):
def __init__(self, deferredList, percent_to_complete_before_release=10, *args, **kwargs):
self.percent_to_complete_before_release = percent_to_complete_before_release
self.q = Queue()
super().__init__(deferredList, *args, **kwargs)
self.done_when = int(len(deferredList) / 10)
self.done_when = int(len(deferredList) / self.percent_to_complete_before_release)
def _cbDeferred(self, *args, **kwargs):
if self.finishedCount == self.done_when:
self.fireOnOneCallback = True
result = super()._cbDeferred(*args, **kwargs)
self.called = False # Keep running.
return result
result = super()._cbDeferred(*args, **kwargs)
return super()._cbDeferred(*args, **kwargs)
if self.finishedCount == self.done_when:
self.q.put(None)
if self.finishedCount == len(self.resultList):
print("********************** DONE!!! **********************")
return result
class Policy(ABC):
@ -291,13 +297,14 @@ class Policy(ABC):
for node in self.bob.matching_nodes_among(self.alice.known_nodes):
# TODO: Concurrency here.
responses.append(deferToThread(network_middleware.put_treasure_map_on_node,
node=node,
map_id=treasure_map_id,
map_payload=bytes(self.treasure_map)
))
responses.append(deferToThreadPool(reactor, self.alice.publication_threadpool,
network_middleware.put_treasure_map_on_node,
node=node,
map_id=treasure_map_id,
map_payload=bytes(self.treasure_map)
))
return PolicyPayloadMutex(responses)
return PolicyPayloadMutex(responses, percent_to_complete_before_release=10)
def credential(self, with_treasure_map=True):
"""