mirror of https://github.com/nucypher/nucypher.git
Learner is a concrete class; Handle unresponsive teacher nodes at startup.
parent
41d0ed5b0b
commit
bf549db7f3
|
@ -30,7 +30,7 @@ from nucypher.network.nodes import VerifiableNode
|
|||
from nucypher.network.server import TLSHostingPower
|
||||
|
||||
|
||||
class Learner(ABC):
|
||||
class Learner:
|
||||
"""
|
||||
Any participant in the "learning loop" - a class inheriting from
|
||||
this one has the ability, synchronously or asynchronously,
|
||||
|
@ -45,16 +45,24 @@ class Learner(ABC):
|
|||
class NotEnoughTeachers(RuntimeError):
|
||||
pass
|
||||
|
||||
class UnresponsiveTeacher(ConnectionError):
|
||||
pass
|
||||
|
||||
def __init__(self,
|
||||
common_name: str,
|
||||
network_middleware: RestMiddleware = RestMiddleware(),
|
||||
start_learning_now: bool = False,
|
||||
learn_on_same_thread: bool = False,
|
||||
known_nodes: tuple = None,
|
||||
known_certificates_dir: str = None,
|
||||
known_metadata_dir: str = None,
|
||||
save_metadata: bool = False,
|
||||
abort_on_learning_error: bool = False) -> None:
|
||||
|
||||
self.log = getLogger("characters") # type: Logger
|
||||
|
||||
self.__common_name = common_name
|
||||
self.network_middleware = network_middleware
|
||||
self.save_metadata = save_metadata
|
||||
self.start_learning_now = start_learning_now
|
||||
self.learn_on_same_thread = learn_on_same_thread
|
||||
|
@ -63,6 +71,7 @@ class Learner(ABC):
|
|||
self._learning_listeners = defaultdict(list)
|
||||
self._node_ids_to_learn_about_immediately = set()
|
||||
|
||||
self.known_certificates_dir = known_certificates_dir
|
||||
self.__known_nodes = dict()
|
||||
|
||||
# Read
|
||||
|
@ -71,8 +80,12 @@ class Learner(ABC):
|
|||
raise ValueError("Cannot save nodes without a known_metadata_dir")
|
||||
|
||||
known_nodes = known_nodes or tuple()
|
||||
self.unresponsive_nodes = list() # TODO: Attempt to use these again later
|
||||
for node in known_nodes:
|
||||
try:
|
||||
self.remember_node(node)
|
||||
except self.UnresponsiveTeacher:
|
||||
self.unresponsive_nodes.append(node)
|
||||
|
||||
self.teacher_nodes = deque()
|
||||
self._current_teacher_node = None # type: Teacher
|
||||
|
@ -93,14 +106,19 @@ class Learner(ABC):
|
|||
with suppress(KeyError):
|
||||
already_known_node = self.known_nodes[node.checksum_public_address]
|
||||
if not node.timestamp > already_known_node.timestamp:
|
||||
self.log.debug("Skipping already known node {}".format(already_known_node))
|
||||
# This node is already known. We can safely return.
|
||||
return
|
||||
|
||||
certificate_filepath = node.save_certificate_to_disk(directory=self.known_certificates_dir)
|
||||
node.verify_node(self.network_middleware, # TODO: Take middleware directly in this class?
|
||||
try:
|
||||
node.verify_node(self.network_middleware,
|
||||
force=force_verification_check,
|
||||
accept_federated_only=self.federated_only,
|
||||
certificate_filepath=certificate_filepath) # TODO: 466
|
||||
accept_federated_only=self.federated_only, # TODO: 466
|
||||
certificate_filepath=certificate_filepath)
|
||||
except requests.exceptions.ConnectionError:
|
||||
self.log.info("No Response from known node {}|{}".format(node.rest_interface, node.checksum_public_address))
|
||||
raise self.UnresponsiveTeacher
|
||||
|
||||
listeners = self._learning_listeners.pop(node.checksum_public_address, tuple())
|
||||
address = node.checksum_public_address
|
||||
|
@ -138,10 +156,12 @@ class Learner(ABC):
|
|||
"""
|
||||
self._crashed = failure
|
||||
failure.raiseException()
|
||||
self.log.critical("{} crashed with {}".format(self.__common_name, failure))
|
||||
|
||||
def shuffled_known_nodes(self):
|
||||
nodes_we_know_about = list(self.__known_nodes.values())
|
||||
random.shuffle(nodes_we_know_about)
|
||||
self.log.info("Shuffled {} known nodes".format(len(nodes_we_know_about)))
|
||||
return nodes_we_know_about
|
||||
|
||||
def select_teacher_nodes(self):
|
||||
|
@ -160,6 +180,7 @@ class Learner(ABC):
|
|||
except IndexError:
|
||||
error = "Not enough nodes to select a good teacher, Check your network connection then node configuration"
|
||||
raise self.NotEnoughTeachers(error)
|
||||
self.log.info("Cycled teachers; New teacher is {}".format(self._current_teacher_node.checksum_public_address))
|
||||
|
||||
def current_teacher_node(self, cycle=False):
|
||||
if not self._current_teacher_node:
|
||||
|
@ -178,7 +199,7 @@ class Learner(ABC):
|
|||
self._learning_task()
|
||||
elif not force:
|
||||
self.log.warning(
|
||||
"Learning loop isn't started; can't learn about nodes now. You can ovverride this with force=True.")
|
||||
"Learning loop isn't started; can't learn about nodes now. You can override this with force=True.")
|
||||
elif force:
|
||||
self.log.info("Learning loop wasn't started; forcing start now.")
|
||||
self._learning_task.start(self._SHORT_LEARNING_DELAY, now=True)
|
||||
|
@ -216,8 +237,7 @@ class Learner(ABC):
|
|||
|
||||
if (maya.now() - start).seconds > timeout:
|
||||
if not self._learning_task.running:
|
||||
raise self.NotEnoughTeachers(
|
||||
"We didn't discover any nodes because the learning loop isn't running. Start it with start_learning().")
|
||||
raise self.NotEnoughTeachers("Learning loop is not running. Start it with start_learning().")
|
||||
else:
|
||||
raise self.NotEnoughTeachers("After {} seconds and {} rounds, didn't find {} nodes".format(
|
||||
timeout, rounds_undertaken, number_of_nodes_to_know))
|
||||
|
@ -314,11 +334,97 @@ class Learner(ABC):
|
|||
metadata_filepath = os.path.join(self.known_metadata_dir, filename)
|
||||
with open(metadata_filepath, "w") as f:
|
||||
f.write(serializer(node).hex())
|
||||
self.log.info("Wrote new node metadata {}".format(metadata_filepath))
|
||||
return metadata_filepath
|
||||
|
||||
@abstractmethod
|
||||
def learn_from_teacher_node(self, eager: bool = True):
|
||||
raise NotImplementedError
|
||||
def learn_from_teacher_node(self, eager=True):
|
||||
"""
|
||||
Sends a request to node_url to find out about known nodes.
|
||||
"""
|
||||
self._learning_round += 1
|
||||
|
||||
try:
|
||||
current_teacher = self.current_teacher_node()
|
||||
except self.NotEnoughTeachers as e:
|
||||
self.log.warning("Can't learn right now: {}".format(e.args[0]))
|
||||
return
|
||||
|
||||
rest_url = current_teacher.rest_interface # TODO: Name this..?
|
||||
|
||||
# TODO: Do we really want to try to learn about all these nodes instantly?
|
||||
# Hearing this traffic might give insight to an attacker.
|
||||
if VerifiableNode in self.__class__.__bases__:
|
||||
announce_nodes = [self]
|
||||
else:
|
||||
announce_nodes = None
|
||||
|
||||
unresponsive_nodes = set()
|
||||
try:
|
||||
|
||||
# TODO: Streamline path generation
|
||||
certificate_filepath = current_teacher.get_certificate_filepath(certificates_dir=self.known_certificates_dir)
|
||||
response = self.network_middleware.get_nodes_via_rest(url=rest_url,
|
||||
nodes_i_need=self._node_ids_to_learn_about_immediately,
|
||||
announce_nodes=announce_nodes,
|
||||
certificate_filepath=certificate_filepath)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
unresponsive_nodes.add(current_teacher)
|
||||
teacher_rest_info = current_teacher.rest_information()[0]
|
||||
|
||||
# TODO: This error isn't necessarily "no repsonse" - let's maybe pass on the text of the exception here.
|
||||
self.log.info("No Response from teacher: {}:{}.".format(teacher_rest_info.host, teacher_rest_info.port))
|
||||
self.cycle_teacher_node()
|
||||
return
|
||||
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError("Bad response from teacher: {} - {}".format(response, response.content))
|
||||
|
||||
signature, nodes = signature_splitter(response.content, return_remainder=True)
|
||||
|
||||
# TODO: This doesn't make sense - a decentralized node can still learn about a federated-only node.
|
||||
from nucypher.characters.lawful import Ursula
|
||||
node_list = Ursula.batch_from_bytes(nodes, federated_only=self.federated_only) # TODO: 466
|
||||
|
||||
new_nodes = []
|
||||
for node in node_list:
|
||||
|
||||
if node.checksum_public_address in self.known_nodes or node.checksum_public_address == self.__common_name:
|
||||
continue # TODO: 168 Check version and update if required.
|
||||
|
||||
try:
|
||||
if eager:
|
||||
certificate_filepath = current_teacher.get_certificate_filepath(certificates_dir=certificate_filepath)
|
||||
node.verify_node(self.network_middleware,
|
||||
accept_federated_only=self.federated_only, # TODO: 466
|
||||
certificate_filepath=certificate_filepath)
|
||||
self.log.debug("Verified node: {}".format(node.checksum_public_address))
|
||||
|
||||
else:
|
||||
node.validate_metadata(accept_federated_only=self.federated_only) # TODO: 466
|
||||
|
||||
except node.SuspiciousActivity:
|
||||
# TODO: Account for possibility that stamp, rather than interface, was bad.
|
||||
message = "Suspicious Activity: Discovered node with bad signature: {}. " \
|
||||
"Propagated by: {}".format(current_teacher.checksum_public_address, rest_url)
|
||||
self.log.warning(message)
|
||||
self.log.info("Previously unknown node: {}".format(node.checksum_public_address))
|
||||
|
||||
self.log.info("Previously unknown node: {}".format(node.checksum_public_address))
|
||||
self.remember_node(node)
|
||||
new_nodes.append(node)
|
||||
|
||||
self._adjust_learning(new_nodes)
|
||||
|
||||
learning_round_log_message = "Learning round {}. Teacher: {} knew about {} nodes, {} were new."
|
||||
self.log.info(learning_round_log_message.format(self._learning_round,
|
||||
current_teacher.checksum_public_address,
|
||||
len(node_list),
|
||||
len(new_nodes)), )
|
||||
if new_nodes and self.known_certificates_dir:
|
||||
for node in new_nodes:
|
||||
node.save_certificate_to_disk(self.known_certificates_dir)
|
||||
|
||||
return new_nodes
|
||||
|
||||
|
||||
class Character(Learner):
|
||||
|
@ -340,7 +446,6 @@ class Character(Learner):
|
|||
def __init__(self,
|
||||
is_me: bool = True,
|
||||
network_middleware: RestMiddleware = None,
|
||||
known_certificates_dir: str = None,
|
||||
crypto_power: CryptoPower = None,
|
||||
crypto_power_ups: List[CryptoPowerUp] = None,
|
||||
federated_only: bool = False,
|
||||
|
@ -372,7 +477,6 @@ class Character(Learner):
|
|||
|
||||
"""
|
||||
self.federated_only = federated_only # type: bool
|
||||
self.known_certificates_dir = known_certificates_dir
|
||||
|
||||
#
|
||||
# Power-ups and Powers
|
||||
|
@ -404,15 +508,17 @@ class Character(Learner):
|
|||
except NoSigningPower:
|
||||
self._stamp = constants.NO_SIGNING_POWER
|
||||
|
||||
Learner.__init__(self,
|
||||
common_name=checksum_address,
|
||||
network_middleware=network_middleware,
|
||||
*args, **kwargs)
|
||||
|
||||
else: # Feel like a stranger
|
||||
if network_middleware is not None:
|
||||
raise TypeError(
|
||||
"Can't attach network middleware to a Character who isn't me. What are you even trying to do?")
|
||||
self._stamp = StrangerStamp(self.public_keys(SigningPower))
|
||||
|
||||
# Init the Learner superclass.
|
||||
Learner.__init__(self, *args, **kwargs)
|
||||
|
||||
# Decentralized
|
||||
if not federated_only:
|
||||
if not checksum_address:
|
||||
|
@ -468,10 +574,6 @@ class Character(Learner):
|
|||
def canonical_public_address(self, address_bytes):
|
||||
self._checksum_address = to_checksum_address(address_bytes)
|
||||
|
||||
@property
|
||||
def ether_address(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def checksum_public_address(self):
|
||||
if self._checksum_address is constants.NO_BLOCKCHAIN_CONNECTION:
|
||||
|
@ -484,7 +586,6 @@ class Character(Learner):
|
|||
|
||||
@classmethod
|
||||
def from_public_keys(cls, powers_and_material: Dict, federated_only=True, *args, **kwargs) -> 'Character':
|
||||
# TODO: Need to be federated only until we figure out the best way to get the checksum_address in here.
|
||||
"""
|
||||
Sometimes we discover a Character and, at the same moment,
|
||||
learn the public parts of more of their powers. Here, we take a Dict
|
||||
|
@ -494,7 +595,11 @@ class Character(Learner):
|
|||
Each item in the collection will have the CryptoPowerUp instantiated
|
||||
with the public_material_bytes, and the resulting CryptoPowerUp instance
|
||||
consumed by the Character.
|
||||
|
||||
# TODO: Need to be federated only until we figure out the best way to get the checksum_address in here.
|
||||
|
||||
"""
|
||||
|
||||
crypto_power = CryptoPower()
|
||||
|
||||
for power_up, public_key in powers_and_material.items():
|
||||
|
@ -507,93 +612,6 @@ class Character(Learner):
|
|||
|
||||
return cls(is_me=False, federated_only=federated_only, crypto_power=crypto_power, *args, **kwargs)
|
||||
|
||||
def learn_from_teacher_node(self, eager=True):
|
||||
"""
|
||||
Sends a request to node_url to find out about known nodes.
|
||||
"""
|
||||
self._learning_round += 1
|
||||
|
||||
try:
|
||||
current_teacher = self.current_teacher_node()
|
||||
except self.NotEnoughTeachers as e:
|
||||
self.log.warning("Can't learn right now: {}".format(e.args[0]))
|
||||
return
|
||||
|
||||
rest_url = current_teacher.rest_interface # TODO: Name this..?
|
||||
|
||||
# TODO: Do we really want to try to learn about all these nodes instantly?
|
||||
# Hearing this traffic might give insight to an attacker.
|
||||
if VerifiableNode in self.__class__.__bases__:
|
||||
announce_nodes = [self]
|
||||
else:
|
||||
announce_nodes = None
|
||||
|
||||
unresponsive_nodes = set()
|
||||
try:
|
||||
|
||||
# TODO: Streamline path generation
|
||||
certificate_filepath = current_teacher.get_certificate_filepath(certificates_dir=self.known_certificates_dir)
|
||||
response = self.network_middleware.get_nodes_via_rest(url=rest_url,
|
||||
nodes_i_need=self._node_ids_to_learn_about_immediately,
|
||||
announce_nodes=announce_nodes,
|
||||
certificate_filepath=certificate_filepath)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
unresponsive_nodes.add(current_teacher)
|
||||
teacher_rest_info = current_teacher.rest_information()[0]
|
||||
|
||||
# TODO: This error isn't necessarily "no repsonse" - let's maybe pass on the text of the exception here.
|
||||
self.log.info("No Response from teacher: {}:{}.".format(teacher_rest_info.host, teacher_rest_info.port))
|
||||
self.cycle_teacher_node()
|
||||
return
|
||||
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError("Bad response from teacher: {} - {}".format(response, response.content))
|
||||
|
||||
signature, nodes = signature_splitter(response.content, return_remainder=True)
|
||||
|
||||
# TODO: This doesn't make sense - a decentralized node can still learn about a federated-only node.
|
||||
from nucypher.characters.lawful import Ursula
|
||||
node_list = Ursula.batch_from_bytes(nodes, federated_only=self.federated_only) # TODO: 466
|
||||
|
||||
new_nodes = []
|
||||
for node in node_list:
|
||||
|
||||
if node.checksum_public_address in self.known_nodes or node.checksum_public_address == self.checksum_public_address:
|
||||
continue # TODO: 168 Check version and update if required.
|
||||
|
||||
try:
|
||||
if eager:
|
||||
certificate_filepath = current_teacher.get_certificate_filepath(certificates_dir=certificate_filepath)
|
||||
node.verify_node(self.network_middleware,
|
||||
accept_federated_only=self.federated_only,
|
||||
certificate_filepath=certificate_filepath)
|
||||
else:
|
||||
node.validate_metadata(accept_federated_only=self.federated_only) # TODO: 466
|
||||
|
||||
except node.SuspiciousActivity:
|
||||
# TODO: Account for possibility that stamp, rather than interface, was bad.
|
||||
message = "Suspicious Activity: Discovered node with bad signature: {}. " \
|
||||
"Propagated by: {}".format(current_teacher.checksum_public_address, rest_url)
|
||||
self.log.warning(message)
|
||||
self.log.info("Previously unknown node: {}".format(node.checksum_public_address))
|
||||
|
||||
self.log.info("Previously unknown node: {}".format(node.checksum_public_address))
|
||||
self.remember_node(node)
|
||||
new_nodes.append(node)
|
||||
|
||||
self._adjust_learning(new_nodes)
|
||||
|
||||
learning_round_log_message = "Learning round {}. Teacher: {} knew about {} nodes, {} were new."
|
||||
self.log.info(learning_round_log_message.format(self._learning_round,
|
||||
current_teacher.checksum_public_address,
|
||||
len(node_list),
|
||||
len(new_nodes)), )
|
||||
if new_nodes and self.known_certificates_dir:
|
||||
for node in new_nodes:
|
||||
node.save_certificate_to_disk(self.known_certificates_dir)
|
||||
|
||||
return new_nodes
|
||||
|
||||
def encrypt_for(self,
|
||||
recipient: 'Character',
|
||||
plaintext: bytes,
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
import binascii
|
||||
import random
|
||||
from collections import OrderedDict
|
||||
from functools import partial
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
|
||||
import maya
|
||||
import time
|
||||
from bytestring_splitter import BytestringSplitter, VariableLengthBytestring
|
||||
from constant_sorrow import constants
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
@ -11,16 +13,12 @@ from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
|
|||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from cryptography.x509 import load_pem_x509_certificate, Certificate
|
||||
from eth_utils import to_checksum_address
|
||||
from functools import partial
|
||||
from twisted.internet import threads
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from umbral.keys import UmbralPublicKey
|
||||
from umbral.signing import Signature
|
||||
|
||||
from nucypher.blockchain.eth.actors import PolicyAuthor, Miner, only_me
|
||||
from nucypher.blockchain.eth.actors import PolicyAuthor, Miner
|
||||
from nucypher.blockchain.eth.agents import MinerAgent
|
||||
from nucypher.blockchain.eth.utils import datetime_to_period
|
||||
from nucypher.characters.base import Character, Learner
|
||||
from nucypher.crypto.api import keccak_digest
|
||||
from nucypher.crypto.constants import PUBLIC_ADDRESS_LENGTH, PUBLIC_KEY_LENGTH
|
||||
|
@ -30,7 +28,6 @@ from nucypher.network.middleware import RestMiddleware
|
|||
from nucypher.network.nodes import VerifiableNode
|
||||
from nucypher.network.protocols import InterfaceInfo
|
||||
from nucypher.network.server import ProxyRESTServer, TLSHostingPower, ProxyRESTRoutes
|
||||
from nucypher.utilities.sandbox.constants import TEST_URSULA_INSECURE_DEVELOPMENT_PASSWORD
|
||||
|
||||
|
||||
class Alice(Character, PolicyAuthor):
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import maya
|
||||
from eth_tester.exceptions import ValidationError
|
||||
|
||||
from nucypher.characters.lawful import Ursula
|
||||
|
|
|
@ -241,6 +241,7 @@ class NodeConfiguration:
|
|||
return self.config_root
|
||||
|
||||
def load_known_nodes(self, known_metadata_dir=None) -> None:
|
||||
from nucypher.characters.lawful import Ursula
|
||||
|
||||
if known_metadata_dir is None:
|
||||
known_metadata_dir = self.known_metadata_dir
|
||||
|
@ -250,7 +251,6 @@ class NodeConfiguration:
|
|||
|
||||
self.log.info("Found {} known node metadata files at {}".format(len(metadata_paths), known_metadata_dir))
|
||||
for metadata_path in metadata_paths:
|
||||
from nucypher.characters.lawful import Ursula
|
||||
node = Ursula.from_metadata_file(filepath=abspath(metadata_path), federated_only=self.federated_only) # TODO: 466
|
||||
self.known_nodes.add(node)
|
||||
|
||||
|
|
|
@ -132,7 +132,9 @@ class VerifiableNode:
|
|||
certificate_filepath=certificate_filepath)
|
||||
if not response.status_code == 200:
|
||||
raise RuntimeError("Or something.") # TODO: Raise an error here? Or return False? Or something?
|
||||
timestamp, signature, identity_evidence, verifying_key, encrypting_key, public_address, certificate_vbytes, rest_info = self._internal_splitter(response.content)
|
||||
timestamp, signature, identity_evidence, \
|
||||
verifying_key, encrypting_key, \
|
||||
public_address, certificate_vbytes, rest_info = self._internal_splitter(response.content)
|
||||
|
||||
verifying_keys_match = verifying_key == self.public_keys(SigningPower)
|
||||
encrypting_keys_match = encrypting_key == self.public_keys(EncryptingPower)
|
||||
|
|
Loading…
Reference in New Issue