mirror of https://github.com/nucypher/nucypher.git
Evolving test.
parent
37ebca4afa
commit
b18dfd9a69
|
@ -469,8 +469,7 @@ class Policy(ABC):
|
|||
self.alice.add_active_policy(self)
|
||||
|
||||
if publish_treasure_map is True:
|
||||
publication_result = self.inject_alice_publication_threadpool_into(self.publish_treasure_map, network_middleware=network_middleware)
|
||||
# return self.publish_treasure_map(network_middleware=network_middleware)
|
||||
return self.publish_treasure_map(network_middleware=network_middleware) # TODO: blockchain_signer?
|
||||
|
||||
def propose_arrangement(self, network_middleware, ursula, arrangement) -> bool:
|
||||
negotiation_response = network_middleware.propose_arrangement(arrangement=arrangement)
|
||||
|
|
|
@ -1012,7 +1012,6 @@ def highperf_mocked_alice(fleet_of_highperf_mocked_ursulas):
|
|||
yield alice
|
||||
# TODO: Where does this really, truly belong?
|
||||
alice._learning_task.stop()
|
||||
alice.publication_threadpool.stop()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
|
|
|
@ -22,10 +22,13 @@ import maya
|
|||
import pytest
|
||||
import pytest_twisted
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import ensureDeferred
|
||||
from twisted.internet.threads import deferToThread
|
||||
|
||||
from nucypher.characters.lawful import Ursula
|
||||
from nucypher.datastore.base import RecordField
|
||||
from nucypher.network.nodes import Teacher
|
||||
from nucypher.policy.collections import TreasureMap
|
||||
from tests.mock.performance_mocks import (
|
||||
NotAPublicKey,
|
||||
NotARestApp,
|
||||
|
@ -43,6 +46,7 @@ from tests.mock.performance_mocks import (
|
|||
from tests.utils.middleware import SluggishLargeFleetMiddleware
|
||||
from tests.utils.ursula import MOCK_KNOWN_URSULAS_CACHE
|
||||
from umbral.keys import UmbralPublicKey
|
||||
from flask import Response
|
||||
|
||||
"""
|
||||
Node Discovery happens in phases. The first step is for a network actor to learn about the mere existence of a Node.
|
||||
|
@ -146,90 +150,64 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas,
|
|||
# The nodes who match the map distribution criteria.
|
||||
nodes_we_expect_to_have_the_map = highperf_mocked_bob.matching_nodes_among(fleet_of_highperf_mocked_ursulas)
|
||||
|
||||
preparation_started = datetime.now()
|
||||
Teacher.verify_node = lambda *args, **kwargs: None
|
||||
|
||||
# # # Loop through and instantiate actual rest apps so as not to pollute the time measurement (doesn't happen in real world).
|
||||
for node in nodes_we_expect_to_have_the_map:
|
||||
highperf_mocked_alice.network_middleware.client.parse_node_or_host_and_port(
|
||||
node) # Causes rest app to be made (happens JIT in other testS)
|
||||
# Causes rest app to be made (happens JIT in other testS)
|
||||
highperf_mocked_alice.network_middleware.client.parse_node_or_host_and_port(node)
|
||||
|
||||
def _partial_rest_app(node):
|
||||
def faster_receive_map(treasure_map_id, *args, **kwargs):
|
||||
node.treasure_maps[treasure_map_id] = True
|
||||
return Response(bytes(b"Sure, we stored it."), status=202)
|
||||
return faster_receive_map
|
||||
node.rest_app._actual_rest_app.view_functions._view_functions_registry['receive_treasure_map'] = _partial_rest_app(node)
|
||||
|
||||
highperf_mocked_alice.network_middleware = SluggishLargeFleetMiddleware()
|
||||
|
||||
policy = _POLICY_PRESERVER.pop()
|
||||
|
||||
started = datetime.now()
|
||||
|
||||
with patch('umbral.keys.UmbralPublicKey.__eq__', lambda *args, **kwargs: True), mock_metadata_validation:
|
||||
|
||||
started = datetime.now()
|
||||
|
||||
# PART I: The function returns sychronously and quickly.
|
||||
|
||||
defer.setDebugging(False) # Debugging messes up the timing here; comment this line out if you actually need it.
|
||||
# returns instantly.
|
||||
policy.publish_treasure_map(network_middleware=highperf_mocked_alice.network_middleware)
|
||||
# defer.setDebugging(False) # Debugging messes up the timing here; comment this line out if you actually need it.
|
||||
|
||||
nodes_that_have_the_map_when_we_return = []
|
||||
policy.publish_treasure_map(network_middleware=highperf_mocked_alice.network_middleware) # returns quickly.
|
||||
|
||||
for ursula in nodes_we_expect_to_have_the_map:
|
||||
if policy.treasure_map in list(ursula.treasure_maps.values()):
|
||||
nodes_that_have_the_map_when_we_return.append(ursula)
|
||||
|
||||
# Very few have gotten the map yet; it's happening in the background.
|
||||
# Note: if you put a breakpoint above this line, you will likely need to comment this assertion out.
|
||||
assert len(
|
||||
nodes_that_have_the_map_when_we_return) <= 5 # Maybe a couple finished already, especially if this is a lightning fast computer. But more than five is weird.
|
||||
|
||||
defer.setDebugging(True)
|
||||
# defer.setDebugging(True)
|
||||
|
||||
# PART II: We block for a little while to ensure that the distribution is going well.
|
||||
|
||||
# Wait until about ten percent of the distribution has occurred.
|
||||
# We do it in a deferred here in the test because it will block the entire process, but in the real-world, we can do this on the granting thread.
|
||||
|
||||
def count_recipients_after_block():
|
||||
policy.publishing_mutex.block_for_a_little_while()
|
||||
little_while_ended_at = datetime.now()
|
||||
|
||||
# Here we'll just count the nodes that have the map. In the real world, we can do a sanity check
|
||||
# to make sure things haven't gone sideways.
|
||||
|
||||
nodes_that_have_the_map_when_we_unblock = sum(
|
||||
policy.treasure_map in list(u.treasure_maps.values()) for u in nodes_we_expect_to_have_the_map)
|
||||
|
||||
return nodes_that_have_the_map_when_we_unblock, little_while_ended_at
|
||||
|
||||
d = deferToThread(count_recipients_after_block)
|
||||
yield d
|
||||
nodes_that_have_the_map_when_we_unblock, little_while_ended_at = d.result
|
||||
policy.publishing_mutex.block_for_a_little_while()
|
||||
nodes_that_have_the_map_when_we_unblock = len(policy.publishing_mutex.completed)
|
||||
little_while_ended_at = datetime.now()
|
||||
|
||||
# The number of nodes having the map is at least the minimum to have unblocked.
|
||||
assert nodes_that_have_the_map_when_we_unblock >= policy.publishing_mutex._block_until_this_many_are_complete
|
||||
|
||||
# The number of nodes having the map is approximately the number you'd expect from full utilization of Alice's publication threadpool.
|
||||
assert nodes_that_have_the_map_when_we_unblock == pytest.approx(
|
||||
highperf_mocked_alice.publication_threadpool.max, .6)
|
||||
assert nodes_that_have_the_map_when_we_unblock == pytest.approx(policy.publishing_mutex._threadpool.max, .6)
|
||||
|
||||
# PART III: Having made proper assertions about the publication call and the first block, we allow the rest to
|
||||
# happen in the background and then ensure that each phase was timely.
|
||||
successful_responses = []
|
||||
|
||||
def find_successful_responses(map_publication_responses):
|
||||
for was_succssful, http_response in map_publication_responses:
|
||||
assert was_succssful
|
||||
assert http_response.status_code == 202
|
||||
successful_responses.append(http_response)
|
||||
|
||||
policy.publishing_mutex.addCallback(find_successful_responses)
|
||||
yield policy.publishing_mutex # This will block until the distribution is complete.
|
||||
yield policy.publishing_mutex.when_complete # This will block until the distribution is complete.
|
||||
complete_distribution_time = datetime.now() - started
|
||||
|
||||
# We have the same number of successful responses as nodes we expected to have the map.
|
||||
assert len(successful_responses) == len(nodes_we_expect_to_have_the_map)
|
||||
assert len(policy.publishing_mutex.completed) == len(nodes_we_expect_to_have_the_map)
|
||||
nodes_that_got_the_map = sum(
|
||||
policy.treasure_map.public_id() in u.treasure_maps for u in nodes_we_expect_to_have_the_map)
|
||||
assert nodes_that_got_the_map == len(nodes_we_expect_to_have_the_map)
|
||||
|
||||
# TODO: Assert that no nodes outside those expected received the map.
|
||||
|
||||
partial_blocking_duration = little_while_ended_at - started
|
||||
# Before Treasure Island (1741), this process took about 3 minutes.
|
||||
print(f"Partial distribution time: {partial_blocking_duration}, complete in {complete_distribution_time}")
|
||||
assert partial_blocking_duration.total_seconds() < 3
|
||||
assert complete_distribution_time.total_seconds() < 10
|
||||
assert complete_distribution_time.total_seconds() < 8
|
||||
# On CI, we expect these times to be even less. (Around 1 and 3.5 seconds, respectively)
|
||||
# But with debuggers and other processes running on laptops, we give a little leeway.
|
||||
|
|
|
@ -67,36 +67,16 @@ def make_federated_ursulas(ursula_config: UrsulaConfiguration,
|
|||
starting_port = max(MOCK_KNOWN_URSULAS_CACHE.keys()) + 1
|
||||
|
||||
federated_ursulas = set()
|
||||
frames = inspect.stack(3)
|
||||
# This gets called from various places.
|
||||
for frame in range(1, 10):
|
||||
try:
|
||||
test_name = frames[frame].frame.f_locals['request'].module
|
||||
break
|
||||
except KeyError:
|
||||
try:
|
||||
if frames[frame].function.startswith("test"):
|
||||
test_name = frames[frame].function
|
||||
break
|
||||
else:
|
||||
continue
|
||||
except AttributeError:
|
||||
continue
|
||||
|
||||
for port in range(starting_port, starting_port+quantity):
|
||||
|
||||
ursula = ursula_config.produce(rest_port=port + 100,
|
||||
db_filepath=MOCK_URSULA_DB_FILEPATH,
|
||||
**ursula_overrides)
|
||||
try:
|
||||
ursula._FOR_TEST = test_name
|
||||
except UnboundLocalError:
|
||||
raise RuntimeError("Unable to find a test name to assign to this ursula in the first 10 frames.")
|
||||
|
||||
federated_ursulas.add(ursula)
|
||||
|
||||
# Store this Ursula in our global testing cache.
|
||||
|
||||
port = ursula.rest_interface.port
|
||||
MOCK_KNOWN_URSULAS_CACHE[port] = ursula
|
||||
|
||||
|
|
Loading…
Reference in New Issue