Using signal flag to check queue. Fixes #2190.

pull/1741/head
jMyles 2020-08-31 12:41:44 -07:00
parent d1b2257797
commit 4269150974
2 changed files with 14 additions and 15 deletions

View File

@ -207,7 +207,7 @@ class NodeEngagementMutex:
self._completion_queue = Queue() self._completion_queue = Queue()
self._block_until_this_many_are_complete = math.ceil( self._block_until_this_many_are_complete = math.ceil(
len(nodes) * self.percent_to_complete_before_release / 100) len(nodes) * self.percent_to_complete_before_release / 100)
self.released = False self.partial_blocking_complete = False
self.when_complete = Deferred() # TODO: Allow cancelling via KB Interrupt or some other way? self.when_complete = Deferred() # TODO: Allow cancelling via KB Interrupt or some other way?
if note is None: if note is None:
@ -226,8 +226,11 @@ class NodeEngagementMutex:
https://www.youtube.com/watch?v=OkSLswPSq2o https://www.youtube.com/watch?v=OkSLswPSq2o
""" """
if len(self.completed) < self._block_until_this_many_are_complete: if len(self.completed) < self._block_until_this_many_are_complete:
_ = self._partial_queue.get() # Interesting opportuntiy to pass some data, like the list of contacted nodes above. completed_for_reasonable_likelihood_of_success = self._partial_queue.get() # Interesting opportuntiy to pass some data, like the list of contacted nodes above.
self.log.debug(f"{len(self.completed)} nodes were contacted while blocking for a little while.") self.log.debug(f"{len(self.completed)} nodes were contacted while blocking for a little while.")
return completed_for_reasonable_likelihood_of_success
else:
return self.completed
def block_until_complete(self): def block_until_complete(self):
if self.total_disposed() < len(self.nodes): if self.total_disposed() < len(self.nodes):
@ -240,12 +243,12 @@ class NodeEngagementMutex:
if response.status_code == 202: if response.status_code == 202:
self.completed[node] = response self.completed[node] = response
else: else:
assert False # TODO: What happens if this is a 300 or 400 level response? assert False # TODO: What happens if this is a 300 or 400 level response? (A 500 response will propagate as an error and be handled in the errback chain.)
if not self.partial_blocking_complete:
if len(self.completed) == self._block_until_this_many_are_complete: if len(self.completed) >= self._block_until_this_many_are_complete:
self.log.debug(f"Blocked for a little while, completed {len(self.completed)} nodes") self.partial_blocking_complete = True
self._partial_queue.put(self.completed) self.log.debug(f"Blocked for a little while, completed {len(self.completed)} nodes")
self.released = True self._partial_queue.put(self.completed)
self._consider_finalizing() self._consider_finalizing()
return response return response

View File

@ -181,19 +181,15 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas,
# defer.setDebugging(True) # defer.setDebugging(True)
# PART II: We block for a little while to ensure that the distribution is going well. # PART II: We block for a little while to ensure that the distribution is going well.
policy.publishing_mutex.block_until_success_is_reasonably_likely() nodes_that_have_the_map_when_we_unblock = policy.publishing_mutex.block_until_success_is_reasonably_likely()
nodes_that_have_the_map_when_we_unblock = len(policy.publishing_mutex.completed)
little_while_ended_at = datetime.now() little_while_ended_at = datetime.now()
# The number of nodes having the map is at least the minimum to have unblocked. # The number of nodes having the map is at least the minimum to have unblocked.
assert nodes_that_have_the_map_when_we_unblock >= policy.publishing_mutex._block_until_this_many_are_complete assert len(nodes_that_have_the_map_when_we_unblock) >= policy.publishing_mutex._block_until_this_many_are_complete
# The number of nodes having the map is approximately the number you'd expect from full utilization of Alice's publication threadpool. # The number of nodes having the map is approximately the number you'd expect from full utilization of Alice's publication threadpool.
# TODO: This line fails sometimes because the loop goes too fast. # TODO: This line fails sometimes because the loop goes too fast.
assert nodes_that_have_the_map_when_we_unblock == pytest.approx(policy.publishing_mutex._threadpool.max, .6) assert len(nodes_that_have_the_map_when_we_unblock) == pytest.approx(policy.publishing_mutex._threadpool.max, .6)
# Temporarily: *some* nodes have it.
assert nodes_that_have_the_map_when_we_unblock
# PART III: Having made proper assertions about the publication call and the first block, we allow the rest to # PART III: Having made proper assertions about the publication call and the first block, we allow the rest to
# happen in the background and then ensure that each phase was timely. # happen in the background and then ensure that each phase was timely.