Merge pull request #691 from jMyles/learning-loop

Real-time learning monitor
pull/712/head
K Prasch 2019-01-30 10:07:32 -08:00 committed by GitHub
commit 0e2f9ec746
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 410 additions and 29 deletions

View File

@ -0,0 +1,94 @@
import sys
from flask import Flask, render_template
from twisted.logger import globalLogPublisher
from hendrix.deploy.base import HendrixDeploy
from hendrix.experience import crosstown_traffic, hey_joe
from nucypher.characters.base import Character
from nucypher.characters.lawful import Ursula
from nucypher.config.constants import GLOBAL_DOMAIN
from nucypher.network.middleware import RestMiddleware
from nucypher.network.nodes import FleetStateTracker
from nucypher.utilities.logging import SimpleObserver
websocket_service = hey_joe.WebSocketService("127.0.0.1", 9000)
globalLogPublisher.addObserver(SimpleObserver())
known_node = Ursula.from_seed_and_stake_info(seed_uri=sys.argv[1],
federated_only=True,
minimum_stake=0)
rest_app = Flask("fleet-monitor")
class MonitoringTracker(FleetStateTracker):
def record_fleet_state(self, *args, **kwargs):
new_state_or_none = super(MonitoringTracker, self).record_fleet_state(*args, **kwargs)
if new_state_or_none:
checksum, new_state = new_state_or_none
hey_joe.send({checksum: self.abridged_state_details(new_state)}, "states")
return new_state_or_none
class Moe(Character):
"""
A monitor (lizard?)
"""
tracker_class = MonitoringTracker
_SHORT_LEARNING_DELAY = .5
def remember_node(self, *args, **kwargs):
new_node_or_none = super().remember_node(*args, **kwargs)
if new_node_or_none:
hey_joe.send({new_node_or_none.checksum_public_address: MonitoringTracker.abridged_node_details(new_node_or_none)}, "nodes")
def learn_from_teacher_node(self, *args, **kwargs):
teacher = self.current_teacher_node(cycle=False)
new_nodes = super().learn_from_teacher_node(*args, **kwargs)
hey_joe.send({teacher.checksum_public_address: MonitoringTracker.abridged_node_details(teacher)}, "nodes")
new_teacher = self.current_teacher_node(cycle=False)
hey_joe.send({"current_teacher": new_teacher.checksum_public_address}, "teachers")
return new_nodes
monitor = Moe(
domains=GLOBAL_DOMAIN,
network_middleware=RestMiddleware(),
known_nodes=[known_node],
federated_only=True,
)
monitor.start_learning_loop()
import time
import json
def send_states(subscriber):
message = ["states", monitor.known_nodes.abridged_states_dict()]
subscriber.sendMessage(json.dumps(message).encode())
def send_nodes(subscriber):
message = ["nodes", monitor.known_nodes.abridged_nodes_dict()]
subscriber.sendMessage(json.dumps(message).encode())
websocket_service.register_followup("states", send_states)
websocket_service.register_followup("nodes", send_nodes)
@rest_app.route("/")
def status():
# for node in monitor.known_nodes:
# hey_joe.send(node.status_json(), topic="nodes")
return render_template('monitor.html')
deployer = HendrixDeploy(action="start", options={"wsgi": rest_app, "http_port": 9750})
deployer.add_non_tls_websocket_service(websocket_service)
deployer.run()

View File

@ -0,0 +1,230 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<link rel="icon" type="image/x-icon" href="https://www.nucypher.com/favicon-32x32.png"/>
<script src="https://cdnjs.cloudflare.com/ajax/libs/handlebars.js/4.0.12/handlebars.js"></script>
<style type="text/css">
html {
font-family: sans-serif;
}
table, th, td {
border: 1px solid black;
}
.currentTeacher {
background-color: red;
}
.previousTeacher {
background-color: orange;
}
.nucypher-nickname-icon {
border-width: 10px;
border-style: solid;
margin: 3px;
padding: 3px;
text-align: center;
box-shadow: 1px 1px black, -1px -1px black;
width: 100px;
}
.small {
float: left;
width: 100%;
text-shadow: none;
font-family: sans;
font-size: 10px;
}
.symbols {
float: left;
width: 100%;
}
.single-symbol {
font-size: 3em;
color: black;
text-shadow: 1px 1px black, -1px -1px black;
}
.address, .small-address {
font-family: monospace;
}
.small-address {
text-shadow: none;
}
.state {
float: left;
}
#previous-states {
float: left;
clear: left;
}
#states .state {
margin: left: 10px;
border-right: 3px solid black;
}
#states .nucypher-nickname-icon {
height: 75px;
width: 75px;
}
#states .single-symbol {
font-size: 2em;
}
#known-nodes {
float: left;
clear: left;
}
.small-address {
text-shadow: none;
}
.state {
float: left;
}
#states {
float: left;
clear: left;
}
#states .state {
margin: left: 10px;
border-right: 3px solid black;
}
#states .nucypher-nickname-icon {
height: 75px;
width: 75px;
}
#states .single-symbol {
font-size: 2em;
}
#nodes {
float: left;
clear: left;
}
</style>
{% raw %}
<script id="state-template" type="text/x-handlebars-template">
<h5>{{nickname}}</h5>
<div class="body">
<div class="single-symbol">{{metadata.[0].[1]}}</div>
<span>{{checksum}}</span>
{{updated}}
</div>
</script>
<script id="node-template" type="text/x-handlebars-template">
<td>{{ nickname_icon }}</td>
<td>
<a href="https://{{rest_url}}/status">{{ nickname }}</a>
<br/><span class="small">{{ checksum_address }}</span>
</td>
<td>{{ timestamp }}</td>
<td>{{ last_seen }}</td>
<td>{{{ fleet_state_icon }}}</td>
</script>
{% endraw %}
<script type="text/javascript">
window.onload = function () {
var stateTemplate = Handlebars.compile(document.getElementById("state-template").innerHTML);
var nodeTemplate = Handlebars.compile(document.getElementById("node-template").innerHTML);
const socket = new WebSocket("ws://localhost:9000");
socket.binaryType = "arraybuffer";
socket.onopen = function () {
socket.send(JSON.stringify({'hx_subscribe': 'states'}));
socket.send(JSON.stringify({'hx_subscribe': 'nodes'}));
socket.send(JSON.stringify({'hx_subscribe': 'teachers'}));
isopen = true;
}
socket.addEventListener('message', function (event) {
console.log("Message from server ", event.data);
if (event.data.startsWith("[\"states\"")) {
message = JSON.parse(event.data);
Object.entries(message[1]).forEach((state) => {
var stateDiv = document.getElementById(state[0]);
if (stateDiv == null) {
var stateDiv = document.createElement("div");
stateDiv.setAttribute("id", state[0]);
stateDiv.className = "state";
statesDiv = document.getElementById("states");
statesDiv.appendChild(stateDiv)
}
var stateHtml = stateTemplate(state[1]);
stateDiv.innerHTML = stateHtml;
console.log(state[1]);
})
};
if (event.data.startsWith("[\"nodes\"")) {
message = JSON.parse(event.data);
Object.entries(message[1]).forEach((node) => {
var nodeRow = document.getElementById(node[0]);
if (nodeRow == null) {
var nodeRow = document.createElement("tr");
nodeRow.setAttribute("id", node[0]);
nodeRow.classname = "node";
nodesTable = document.getElementById("nodes");
nodesTable.prepend(nodeRow);
}
var nodeHtml = nodeTemplate(node[1]);
nodeRow.innerHTML = nodeHtml;
console.log(node[1]);
})
};
if (event.data.startsWith("[\"teachers\"")) {
previousTeacher = document.getElementsByClassName("previousTeacher")[0];
if (previousTeacher) {
previousTeacher.classList.remove("previousTeacher");
}
currentTeacher = document.getElementsByClassName("currentTeacher")[0];
if (currentTeacher) {
currentTeacher.classList.remove("currentTeacher");
currentTeacher.classList.add("previousTeacher");
}
message = JSON.parse(event.data);
newTeacher = document.getElementById(message[1].current_teacher);
newTeacher.classList.add("currentTeacher");
}
});
socket.onerror = function (error) {
console.log(error.data);
}
}
</script>
</head>
<div id="states"></div>
<hr/>
<table id="nodes"></table>
</html>

View File

@ -200,14 +200,21 @@ class Character(Learner):
self.known_nodes.record_fleet_state()
def __eq__(self, other) -> bool:
return bytes(self.stamp) == bytes(other.stamp)
try:
other_stamp = other.stamp
except (AttributeError, NoSigningPower):
return False
return bytes(self.stamp) == bytes(other_stamp)
def __hash__(self):
return int.from_bytes(bytes(self.stamp), byteorder="big")
def __repr__(self):
r = "{}↽ ({})"
r = r.format(self.nickname, self.checksum_public_address)
try:
r = r.format(self.nickname, self.checksum_public_address)
except NoSigningPower:
r = r.format(self.__class__.__name__, self.nickname)
return r
@property

View File

@ -15,21 +15,17 @@ You should have received a copy of the GNU General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import binascii
import random
from collections import defaultdict, OrderedDict
from collections import deque
from collections import namedtuple
from contextlib import suppress
from typing import Set, Tuple
import maya
import requests
import time
from bytestring_splitter import BytestringSplitter
from bytestring_splitter import VariableLengthBytestring, BytestringSplittingError
from constant_sorrow import constant_or_bytes
from constant_sorrow.constants import NO_KNOWN_NODES, NOT_SIGNED, NEVER_SEEN, NO_STORAGE_AVAILIBLE, FLEET_STATES_MATCH
from cryptography.x509 import Certificate
from eth_keys.datatypes import Signature as EthSignature
from requests.exceptions import SSLError
@ -37,8 +33,11 @@ from twisted.internet import reactor, defer
from twisted.internet import task
from twisted.internet.threads import deferToThread
from twisted.logger import Logger
from typing import Set, Tuple
from bytestring_splitter import BytestringSplitter
from bytestring_splitter import VariableLengthBytestring, BytestringSplittingError
from constant_sorrow import constant_or_bytes
from constant_sorrow.constants import NO_KNOWN_NODES, NOT_SIGNED, NEVER_SEEN, NO_STORAGE_AVAILIBLE, FLEET_STATES_MATCH
from nucypher.config.constants import SeednodeMetadata, GLOBAL_DOMAIN
from nucypher.config.storages import ForgetfulNodeStorage
from nucypher.crypto.api import keccak_digest
@ -85,7 +84,7 @@ class FleetStateTracker:
most_recent_node_change = NO_KNOWN_NODES
snapshot_splitter = BytestringSplitter(32, 4)
log = Logger("Learning")
state_template = namedtuple("FleetState", ("nickname", "icon", "nodes", "updated"))
state_template = namedtuple("FleetState", ("nickname", "metadata", "icon", "nodes", "updated"))
def __init__(self):
self.additional_nodes_to_track = []
@ -174,11 +173,14 @@ class FleetStateTracker:
self.updated = maya.now()
# For now we store the sorted node list. Someday we probably spin this out into
# its own class, FleetState, and use it as the basis for partial updates.
self.states[checksum] = self.state_template(nickname=self.nickname,
nodes=sorted_nodes,
icon=self.icon,
updated=self.updated,
)
new_state = self.state_template(nickname=self.nickname,
metadata=self.nickname_metadata,
nodes=sorted_nodes,
icon=self.icon,
updated=self.updated,
)
self.states[checksum] = new_state
return checksum, new_state
def start_tracking_state(self, additional_nodes_to_track=None):
if additional_nodes_to_track is None:
@ -196,6 +198,43 @@ class FleetStateTracker:
random.shuffle(nodes_we_know_about)
return nodes_we_know_about
def abridged_states_dict(self):
abridged_states = {}
for k, v in self.states.items():
abridged_states[k] = self.abridged_state_details(v)
return abridged_states
def abridged_nodes_dict(self):
abridged_nodes = {}
for checksum_address, node in self._nodes.items():
abridged_nodes[checksum_address] = self.abridged_node_details(node)
return abridged_nodes
@staticmethod
def abridged_state_details(state):
return {"nickname": state.nickname,
"metadata": state.metadata,
"updated": state.updated.iso8601()
}
@staticmethod
def abridged_node_details(node):
try:
last_seen = node.last_seen.iso8601()
except AttributeError: # TODO: This logic belongs somewhere - anywhere - else.
last_seen = str(node.last_seen)
return {"nickname_metadata": node.nickname_metadata,
"rest_url": node.rest_url(),
"nickname": node.nickname,
"checksum_address": node.checksum_public_address,
"timestamp": node.timestamp.iso8601(),
"last_seen": last_seen,
"fleet_state_icon": node.fleet_state_icon,
}
class Learner:
"""
@ -217,10 +256,12 @@ class Learner:
LEARNER_VERSION = LEARNING_LOOP_VERSION
node_splitter = BytestringSplitter(VariableLengthBytestring)
version_splitter = BytestringSplitter((int, 2, {"byteorder": "big"}))
tracker_class = FleetStateTracker
invalid_metadata_message = "{} has invalid metadata. Maybe its stake is over? Or maybe it is transitioning to a new interface. Ignoring."
unknown_version_message = "{} purported to be of version {}, but we're only version {}. Is there a new version of NuCypher?"
really_unknown_version_message = "Unable to glean address from node that perhaps purported to be version {}. We're only version {}."
fleet_state_icon = ""
class NotEnoughTeachers(RuntimeError):
pass
@ -253,7 +294,7 @@ class Learner:
self._learning_listeners = defaultdict(list)
self._node_ids_to_learn_about_immediately = set()
self.__known_nodes = FleetStateTracker()
self.__known_nodes = self.tracker_class()
self.lonely = lonely
self.done_seeding = False
@ -272,14 +313,15 @@ class Learner:
self.unresponsive_startup_nodes = list() # TODO: Attempt to use these again later
for node in known_nodes:
try:
self.remember_node(node) # TODO: Need to test this better - do we ever init an Ursula-Learner with Node Storage?
self.remember_node(
node) # TODO: Need to test this better - do we ever init an Ursula-Learner with Node Storage?
except self.UnresponsiveTeacher:
self.unresponsive_startup_nodes.append(node)
self.teacher_nodes = deque()
self._current_teacher_node = None # type: Teacher
self._current_teacher_node = None # type: Teacher
self._learning_task = task.LoopingCall(self.keep_learning_about_nodes)
self._learning_round = 0 # type: int
self._learning_round = 0 # type: int
self._rounds_without_new_nodes = 0 # type: int
self._seed_nodes = seed_nodes or []
self.unresponsive_seed_nodes = set()
@ -354,7 +396,8 @@ class Learner:
try:
node.verify_node(force=force_verification_check,
network_middleware=self.network_middleware,
accept_federated_only=self.federated_only, # TODO: 466 - move federated-only up to Learner?
accept_federated_only=self.federated_only,
# TODO: 466 - move federated-only up to Learner?
certificate_filepath=certificate_filepath)
except SSLError:
return False # TODO: Bucket this node as having bad TLS info - maybe it's an update that hasn't fully propagated?
@ -367,8 +410,6 @@ class Learner:
address = node.checksum_public_address
self.known_nodes[address] = node
if self in self.known_nodes:
raise RuntimeError
if self.save_metadata:
node.certificate_filepath = certificate_filepath
@ -382,7 +423,7 @@ class Learner:
if record_fleet_state:
self.known_nodes.record_fleet_state()
return True
return node
def start_learning_loop(self, now=False):
if self._learning_task.running:
@ -686,16 +727,25 @@ class Learner:
current_teacher.last_seen = maya.now()
# TODO: This is weird - let's get a stranger FleetState going.
checksum = fleet_state_checksum_bytes.hex()
current_teacher.update_snapshot(checksum=checksum,
updated=maya.MayaDT(int.from_bytes(fleet_state_updated_bytes, byteorder="big")))
# TODO: This doesn't make sense - a decentralized node can still learn about a federated-only node.
from nucypher.characters.lawful import Ursula
if constant_or_bytes(node_payload) is FLEET_STATES_MATCH:
current_teacher.update_snapshot(checksum=checksum,
updated=maya.MayaDT(
int.from_bytes(fleet_state_updated_bytes, byteorder="big")),
number_of_known_nodes=len(self.known_nodes)
)
return FLEET_STATES_MATCH
node_list = Ursula.batch_from_bytes(node_payload, federated_only=self.federated_only) # TODO: 466
current_teacher.update_snapshot(checksum=checksum,
updated=maya.MayaDT(
int.from_bytes(fleet_state_updated_bytes, byteorder="big")),
number_of_known_nodes=len(node_list)
)
new_nodes = []
for node in node_list:
if GLOBAL_DOMAIN not in self.learning_domains:
@ -824,15 +874,15 @@ class Teacher:
proper_address = proper_pubkey.to_checksum_address()
return proper_address == self.checksum_public_address
def update_snapshot(self, checksum, updated):
# TODO: Kind of an interesting pattern here - with VerifiableNode increasingly looking like it will be Teacher.
def update_snapshot(self, checksum, updated, number_of_known_nodes):
# We update the simple snapshot here, but of course if we're dealing with an instance that is also a Learner, it has
# its own notion of its FleetState, so we probably need a reckoning of sorts here to manage that. In time.
self.fleet_state_nickname, self.fleet_state_nickname_metadata = nickname_from_seed(checksum, number_of_pairs=1)
self.fleet_state_checksum = checksum
self.fleet_state_updated = updated
self.fleet_state_icon = icon_from_checksum(self.fleet_state_checksum,
nickname_metadata=self.fleet_state_nickname_metadata)
nickname_metadata=self.fleet_state_nickname_metadata,
number_of_nodes=number_of_known_nodes)
#
# Stamp
@ -907,8 +957,8 @@ class Teacher:
# The node's metadata is valid; let's be sure the interface is in order.
response_data = network_middleware.node_information(host=self.rest_information()[0].host,
port=self.rest_information()[0].port,
certificate_filepath=certificate_filepath)
port=self.rest_information()[0].port,
certificate_filepath=certificate_filepath)
version, node_bytes = self.version_splitter(response_data, return_remainder=True)