From 42691509748ebddac57ac7ae5ce16abb1a46e876 Mon Sep 17 00:00:00 2001 From: jMyles Date: Mon, 31 Aug 2020 12:41:44 -0700 Subject: [PATCH] Using signal flag to check queue. Fixes #2190. --- nucypher/policy/policies.py | 19 +++++++++++-------- .../learning/test_discovery_phases.py | 10 +++------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/nucypher/policy/policies.py b/nucypher/policy/policies.py index 34d3e7638..0faf91b97 100644 --- a/nucypher/policy/policies.py +++ b/nucypher/policy/policies.py @@ -207,7 +207,7 @@ class NodeEngagementMutex: self._completion_queue = Queue() self._block_until_this_many_are_complete = math.ceil( len(nodes) * self.percent_to_complete_before_release / 100) - self.released = False + self.partial_blocking_complete = False self.when_complete = Deferred() # TODO: Allow cancelling via KB Interrupt or some other way? if note is None: @@ -226,8 +226,11 @@ class NodeEngagementMutex: https://www.youtube.com/watch?v=OkSLswPSq2o """ if len(self.completed) < self._block_until_this_many_are_complete: - _ = self._partial_queue.get() # Interesting opportuntiy to pass some data, like the list of contacted nodes above. + completed_for_reasonable_likelihood_of_success = self._partial_queue.get() # Interesting opportuntiy to pass some data, like the list of contacted nodes above. self.log.debug(f"{len(self.completed)} nodes were contacted while blocking for a little while.") + return completed_for_reasonable_likelihood_of_success + else: + return self.completed def block_until_complete(self): if self.total_disposed() < len(self.nodes): @@ -240,12 +243,12 @@ class NodeEngagementMutex: if response.status_code == 202: self.completed[node] = response else: - assert False # TODO: What happens if this is a 300 or 400 level response? - - if len(self.completed) == self._block_until_this_many_are_complete: - self.log.debug(f"Blocked for a little while, completed {len(self.completed)} nodes") - self._partial_queue.put(self.completed) - self.released = True + assert False # TODO: What happens if this is a 300 or 400 level response? (A 500 response will propagate as an error and be handled in the errback chain.) + if not self.partial_blocking_complete: + if len(self.completed) >= self._block_until_this_many_are_complete: + self.partial_blocking_complete = True + self.log.debug(f"Blocked for a little while, completed {len(self.completed)} nodes") + self._partial_queue.put(self.completed) self._consider_finalizing() return response diff --git a/tests/integration/learning/test_discovery_phases.py b/tests/integration/learning/test_discovery_phases.py index 429fccdfa..dc28d6d45 100644 --- a/tests/integration/learning/test_discovery_phases.py +++ b/tests/integration/learning/test_discovery_phases.py @@ -181,19 +181,15 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas, # defer.setDebugging(True) # PART II: We block for a little while to ensure that the distribution is going well. - policy.publishing_mutex.block_until_success_is_reasonably_likely() - nodes_that_have_the_map_when_we_unblock = len(policy.publishing_mutex.completed) + nodes_that_have_the_map_when_we_unblock = policy.publishing_mutex.block_until_success_is_reasonably_likely() little_while_ended_at = datetime.now() # The number of nodes having the map is at least the minimum to have unblocked. - assert nodes_that_have_the_map_when_we_unblock >= policy.publishing_mutex._block_until_this_many_are_complete + assert len(nodes_that_have_the_map_when_we_unblock) >= policy.publishing_mutex._block_until_this_many_are_complete # The number of nodes having the map is approximately the number you'd expect from full utilization of Alice's publication threadpool. # TODO: This line fails sometimes because the loop goes too fast. - assert nodes_that_have_the_map_when_we_unblock == pytest.approx(policy.publishing_mutex._threadpool.max, .6) - - # Temporarily: *some* nodes have it. - assert nodes_that_have_the_map_when_we_unblock + assert len(nodes_that_have_the_map_when_we_unblock) == pytest.approx(policy.publishing_mutex._threadpool.max, .6) # PART III: Having made proper assertions about the publication call and the first block, we allow the rest to # happen in the background and then ensure that each phase was timely.