mirror of https://github.com/nucypher/nucypher.git
Ensuring that threadpool is stopped after blocking is complete, even if reactor isn't running.
parent
afb6bb0b94
commit
69c655a61e
|
@ -232,6 +232,9 @@ class NodeEngagementMutex:
|
|||
def block_until_complete(self):
|
||||
if self.total_disposed() < len(self.nodes):
|
||||
_ = self._completion_queue.get() # Interesting opportuntiy to pass some data, like the list of contacted nodes above.
|
||||
if not reactor.running and not self._threadpool.joined:
|
||||
# If the reactor isn't running, the user *must* call this, because this is where we stop.
|
||||
self._threadpool.stop()
|
||||
|
||||
def _handle_success(self, response, node):
|
||||
if response.status_code == 202:
|
||||
|
@ -254,18 +257,16 @@ class NodeEngagementMutex:
|
|||
def total_disposed(self):
|
||||
return len(self.completed) + len(self.failed)
|
||||
|
||||
def _finalize(self):
|
||||
self._threadpool.stop()
|
||||
self._finished = True
|
||||
self._completion_queue.put(self.completed)
|
||||
self.when_complete.callback(self.completed)
|
||||
self.log.info(f"{self} finished.")
|
||||
|
||||
def _consider_finalizing(self):
|
||||
if not self._finished:
|
||||
if self.total_disposed() == len(self.nodes):
|
||||
# TODO: Consider whether this can possibly hang.
|
||||
reactor.callInThread(self._finalize)
|
||||
self._finished = True
|
||||
if reactor.running:
|
||||
reactor.callInThread(self._threadpool.stop)
|
||||
self._completion_queue.put(self.completed)
|
||||
self.when_complete.callback(self.completed)
|
||||
self.log.info(f"{self} finished.")
|
||||
else:
|
||||
raise RuntimeError("Already finished.")
|
||||
|
||||
|
|
Loading…
Reference in New Issue