mirror of https://github.com/nucypher/nucypher.git
State information is now obtained by crawler and displayed in dashboard
parent
da9852d5e2
commit
0d8c41b2f0
|
@ -35,8 +35,6 @@ from nucypher.blockchain.eth.registry import BaseContractRegistry
|
|||
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
|
||||
from nucypher.blockchain.eth.decorators import validate_checksum_address
|
||||
|
||||
from constant_sorrow.constants import UNKNOWN_FLEET_STATE
|
||||
|
||||
|
||||
class NodeStorage(ABC):
|
||||
_name = NotImplemented
|
||||
|
@ -285,7 +283,8 @@ class SQLiteForgetfulNodeStorage(ForgetfulNodeStorage):
|
|||
SQLite forgetful storage of node metadata
|
||||
"""
|
||||
_name = 'sqlite-memory'
|
||||
DB_NAME = 'node_info'
|
||||
NODE_DB_NAME = 'node_info'
|
||||
STATE_DB_NAME = 'fleet_state'
|
||||
PARENT_DIR_PREFIX = 'nucypher-sql-storage-tmp'
|
||||
METADATA_CACHE_PREFIX = 'sql-cache'
|
||||
|
||||
|
@ -307,7 +306,7 @@ class SQLiteForgetfulNodeStorage(ForgetfulNodeStorage):
|
|||
_, self.__metadata_db = tempfile.mkstemp(prefix=self.METADATA_CACHE_PREFIX, dir=self.__parent_dir)
|
||||
|
||||
self.__db_conn = sqlite3.connect(self.__metadata_db)
|
||||
self.__create_db_table()
|
||||
self.__create_db_tables()
|
||||
|
||||
super().__init__(parent_dir=self.__parent_dir, *args, **kwargs)
|
||||
|
||||
|
@ -320,9 +319,12 @@ class SQLiteForgetfulNodeStorage(ForgetfulNodeStorage):
|
|||
os.remove(self.__metadata_db)
|
||||
|
||||
def store_node_metadata(self, node, filepath: str = None):
|
||||
self.__write_metadata(node)
|
||||
self.__write_node_metadata(node)
|
||||
return super().store_node_metadata(node=node, filepath=filepath)
|
||||
|
||||
def store_state_metadata(self, state):
|
||||
self.__write_state_metadata(state)
|
||||
|
||||
@validate_checksum_address
|
||||
def remove(self,
|
||||
checksum_address: str,
|
||||
|
@ -332,14 +334,16 @@ class SQLiteForgetfulNodeStorage(ForgetfulNodeStorage):
|
|||
|
||||
if metadata is True:
|
||||
with self.__db_conn:
|
||||
self.__db_conn.execute(f"DELETE FROM {self.DB_NAME} WHERE staker_address='{checksum_address}'")
|
||||
self.__db_conn.execute(f"DELETE FROM {self.NODE_DB_NAME} WHERE staker_address='{checksum_address}'")
|
||||
|
||||
return super().remove(checksum_address=checksum_address, metadata=metadata, certificate=certificate)
|
||||
|
||||
def clear(self, metadata: bool = True, certificates: bool = True) -> None:
|
||||
if metadata is True:
|
||||
with self.__db_conn:
|
||||
self.__db_conn.execute(f"DELETE FROM {self.DB_NAME}")
|
||||
self.__db_conn.execute(f"DELETE FROM {self.NODE_DB_NAME}")
|
||||
# TODO: do we need to clear the states table here?
|
||||
self.__db_conn.execute(f"DELETE FORM {self.STATE_DB_NAME}")
|
||||
|
||||
super().clear(metadata=metadata, certificates=certificates)
|
||||
|
||||
|
@ -348,48 +352,42 @@ class SQLiteForgetfulNodeStorage(ForgetfulNodeStorage):
|
|||
self.__parent_dir = tempfile.mkdtemp(prefix=self.PARENT_DIR_PREFIX)
|
||||
_, metadata_cache = tempfile.mkstemp(prefix=self.METADATA_CACHE_PREFIX, dir=self.__parent_dir)
|
||||
self.__db_conn = sqlite3.connect(metadata_cache)
|
||||
self.__create_db_table()
|
||||
self.__create_db_tables()
|
||||
|
||||
return super().initialize()
|
||||
|
||||
def __create_db_table(self):
|
||||
def __create_db_tables(self):
|
||||
with self.__db_conn:
|
||||
# ensure table is empty
|
||||
self.__db_conn.execute(f"DROP TABLE IF EXISTS {self.DB_NAME}")
|
||||
# ensure tables are empty
|
||||
for table in [self.NODE_DB_NAME, self.STATE_DB_NAME]:
|
||||
self.__db_conn.execute(f"DROP TABLE IF EXISTS {table}")
|
||||
|
||||
# create fresh new table (same columns names as FleetStateTracker.abridged_nodes_details)
|
||||
self.__db_conn.execute(f"CREATE TABLE {self.DB_NAME} (staker_address text primary key, rest_url text, "
|
||||
# create fresh new node table (same column names as FleetStateTracker.abridged_nodes_details)
|
||||
self.__db_conn.execute(f"CREATE TABLE {self.NODE_DB_NAME} (staker_address text primary key, rest_url text, "
|
||||
f"nickname text, timestamp text, last_seen text, fleet_state_icon text)")
|
||||
|
||||
def __write_metadata(self, node):
|
||||
# Staker address
|
||||
staker_address = node.checksum_address
|
||||
# create fresh new state table (same column names as FleetStateTracker.abridged_state_details)
|
||||
self.__db_conn.execute(f"CREATE TABLE {self.STATE_DB_NAME} (nickname text primary key, symbol text, "
|
||||
f"color_hex text, color_name text, updated text)")
|
||||
|
||||
# REST URL
|
||||
rest_url = node.rest_url()
|
||||
|
||||
# Nickname
|
||||
nickname = node.nickname
|
||||
|
||||
# Timestamp
|
||||
timestamp = node.timestamp.iso8601()
|
||||
|
||||
# Last Seen
|
||||
try:
|
||||
last_seen = node.last_seen.iso8601()
|
||||
except AttributeError: # TODO: This logic belongs somewhere - anywhere - else.
|
||||
last_seen = str(node.last_seen) # In case it's the constant NEVER_SEEN
|
||||
|
||||
# Fleet state icon
|
||||
fleet_icon = node.fleet_state_nickname_metadata
|
||||
if fleet_icon is UNKNOWN_FLEET_STATE:
|
||||
fleet_icon = "?" # TODO
|
||||
else:
|
||||
fleet_icon = fleet_icon[0][1]
|
||||
|
||||
db_row = (staker_address, rest_url, nickname, timestamp, last_seen, fleet_icon)
|
||||
def __write_node_metadata(self, node):
|
||||
from nucypher.network.nodes import FleetStateTracker
|
||||
node_dict = FleetStateTracker.abridged_node_details(node)
|
||||
db_row = (node_dict['staker_address'], node_dict['rest_url'], node_dict['nickname'],
|
||||
node_dict['timestamp'], node_dict['last_seen'], node_dict['fleet_state_icon'])
|
||||
with self.__db_conn:
|
||||
self.__db_conn.execute(f'REPLACE INTO {self.DB_NAME} VALUES(?,?,?,?,?,?)', db_row)
|
||||
self.__db_conn.execute(f'REPLACE INTO {self.NODE_DB_NAME} VALUES(?,?,?,?,?,?)', db_row)
|
||||
|
||||
def __write_state_metadata(self, state):
|
||||
from nucypher.network.nodes import FleetStateTracker
|
||||
state_dict = FleetStateTracker.abridged_state_details(state)
|
||||
# convert updated timestamp format for supported sqlite3 sorting
|
||||
state_dict['updated'] = state.updated.rfc3339()
|
||||
db_row = (state_dict['nickname'], state_dict['symbol'], state_dict['color_hex'],
|
||||
state_dict['color_name'], state_dict['updated'])
|
||||
with self.__db_conn:
|
||||
self.__db_conn.execute(f'REPLACE INTO {self.STATE_DB_NAME} VALUES(?,?,?,?,?)', db_row)
|
||||
# TODO we should limit the size of this table - no reason to store really old state values
|
||||
|
||||
|
||||
class LocalFileBasedNodeStorage(NodeStorage):
|
||||
|
|
|
@ -3,7 +3,6 @@ import os
|
|||
import dash_daq as daq
|
||||
import dash_html_components as html
|
||||
from constant_sorrow.constants import UNKNOWN_FLEET_STATE
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from dash import Dash
|
||||
from flask import Flask
|
||||
from maya import MayaDT
|
||||
|
@ -13,10 +12,6 @@ from twisted.logger import Logger
|
|||
import nucypher
|
||||
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterface
|
||||
from nucypher.characters.base import Character
|
||||
from nucypher.keystore.keypairs import HostingKeypair
|
||||
from nucypher.network.nodes import Learner, FleetStateTracker
|
||||
from nucypher.network.server import TLSHostingPower
|
||||
|
||||
|
||||
class NetworkStatusPage:
|
||||
|
@ -40,31 +35,30 @@ class NetworkStatusPage:
|
|||
def header() -> html.Div:
|
||||
return html.Div([html.Div(f'v{nucypher.__version__}', id='version')], className="logo-widget")
|
||||
|
||||
def previous_states(self, learner: Learner) -> html.Div:
|
||||
previous_states = list(reversed(learner.known_nodes.states.values()))[:5] # only latest 5
|
||||
def previous_states(self, states_dict_list) -> html.Div:
|
||||
return html.Div([
|
||||
html.H4('Previous States'),
|
||||
html.Div([
|
||||
self._states_table(previous_states)
|
||||
self._states_table(states_dict_list)
|
||||
]),
|
||||
], className='row')
|
||||
|
||||
def _states_table(self, states) -> html.Table:
|
||||
def _states_table(self, states_dict_list) -> html.Table:
|
||||
row = []
|
||||
for state in states:
|
||||
for state_dict in states_dict_list:
|
||||
# add previous states in order (already reversed)
|
||||
row.append(html.Td(self.state_detail(FleetStateTracker.abridged_state_details(state))))
|
||||
row.append(html.Td(self.state_detail(state_dict)))
|
||||
return html.Table([html.Tr(row, id='state-table')])
|
||||
|
||||
@staticmethod
|
||||
def state_detail(state_detail_dict) -> html.Div:
|
||||
def state_detail(state_dict) -> html.Div:
|
||||
return html.Div([
|
||||
html.Div([
|
||||
html.Div(state_detail_dict['symbol'], className='single-symbol'),
|
||||
], className='nucypher-nickname-icon', style={'border-color': state_detail_dict['color_hex']}),
|
||||
html.Span(state_detail_dict['nickname']),
|
||||
html.Span(state_detail_dict['updated'], className='small'),
|
||||
], className='state', style={'background-color': state_detail_dict['color_hex']})
|
||||
html.Div(state_dict['symbol'], className='single-symbol'),
|
||||
], className='nucypher-nickname-icon', style={'border-color': state_dict['color_hex']}),
|
||||
html.Span(state_dict['nickname']),
|
||||
html.Span(state_dict['updated'], className='small'),
|
||||
], className='state', style={'background-color': state_dict['color_hex']})
|
||||
|
||||
def known_nodes(self, nodes_dict: dict, registry, teacher_checksum: str = None) -> html.Div:
|
||||
nodes = list()
|
||||
|
|
|
@ -9,7 +9,7 @@ from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent, N
|
|||
from nucypher.blockchain.eth.token import NU, StakeList
|
||||
from nucypher.blockchain.eth.utils import datetime_at_period
|
||||
from nucypher.config.storages import SQLiteForgetfulNodeStorage
|
||||
from nucypher.network.nodes import Learner
|
||||
from nucypher.network.nodes import Learner, FleetStateTracker
|
||||
from nucypher.network.status_app.db import BlockchainCrawlerClient
|
||||
|
||||
|
||||
|
@ -59,6 +59,16 @@ class NetworkCrawler(Learner):
|
|||
node_storage = SQLiteForgetfulNodeStorage(federated_only=False,
|
||||
parent_dir=storage_dir,
|
||||
db_filename=db_filename)
|
||||
|
||||
class MonitoringTracker(FleetStateTracker):
|
||||
def record_fleet_state(self, *args, **kwargs):
|
||||
new_state_or_none = super().record_fleet_state(*args, **kwargs)
|
||||
if new_state_or_none:
|
||||
_, new_state = new_state_or_none
|
||||
node_storage.store_state_metadata(new_state)
|
||||
|
||||
self.tracker_class = MonitoringTracker
|
||||
|
||||
super().__init__(save_metadata=True, node_storage=node_storage, *args, **kwargs)
|
||||
self.log = Logger('network-crawler')
|
||||
|
||||
|
|
|
@ -6,6 +6,8 @@ from datetime import datetime, timedelta
|
|||
from influxdb import InfluxDBClient
|
||||
from maya import MayaDT
|
||||
|
||||
from nucypher.config.storages import SQLiteForgetfulNodeStorage
|
||||
|
||||
|
||||
class BlockchainCrawlerClient:
|
||||
"""
|
||||
|
@ -83,7 +85,7 @@ class NodeMetadataClient:
|
|||
# dash threading means that connection needs to be established in same thread as use
|
||||
db_conn = sqlite3.connect(self._metadata_filepath)
|
||||
try:
|
||||
result = db_conn.execute(f"SELECT * FROM node_info")
|
||||
result = db_conn.execute(f"SELECT * FROM {SQLiteForgetfulNodeStorage.NODE_DB_NAME}")
|
||||
|
||||
# TODO use `pandas` package instead to automatically get dict?
|
||||
known_nodes = dict()
|
||||
|
@ -98,3 +100,29 @@ class NodeMetadataClient:
|
|||
return known_nodes
|
||||
finally:
|
||||
db_conn.close()
|
||||
|
||||
def get_previous_states_metadata(self, limit: int = 5) -> dict:
|
||||
# dash threading means that connection needs to be established in same thread as use
|
||||
db_conn = sqlite3.connect(self._metadata_filepath)
|
||||
states_dict_list = []
|
||||
try:
|
||||
result = db_conn.execute(f"SELECT * FROM {SQLiteForgetfulNodeStorage.STATE_DB_NAME} "
|
||||
f"ORDER BY datetime(updated) DESC LIMIT {limit}")
|
||||
|
||||
# TODO use `pandas` package instead to automatically get dict?
|
||||
column_names = [description[0] for description in result.description]
|
||||
for row in result:
|
||||
state_info = dict()
|
||||
for idx, value in enumerate(row):
|
||||
column_name = column_names[idx]
|
||||
if column_name == 'updated':
|
||||
# convert column from rfc3339 (for sorting) back to rfc2822
|
||||
# TODO does this matter for displaying?
|
||||
state_info[column_name] = MayaDT.from_rfc3339(row[idx]).rfc2822()
|
||||
else:
|
||||
state_info[column_name] = row[idx]
|
||||
states_dict_list.append(state_info)
|
||||
|
||||
return states_dict_list
|
||||
finally:
|
||||
db_conn.close()
|
||||
|
|
|
@ -71,8 +71,8 @@ class MonitorDashboardApp(NetworkStatusPage):
|
|||
|
||||
# States and Known Nodes Table
|
||||
html.Div([
|
||||
# html.Div(id='prev-states'),
|
||||
# html.Br(),
|
||||
html.Div(id='prev-states'),
|
||||
html.Br(),
|
||||
html.Div(id='known-nodes'),
|
||||
])
|
||||
]),
|
||||
|
@ -103,11 +103,12 @@ class MonitorDashboardApp(NetworkStatusPage):
|
|||
def header(pathname):
|
||||
return self.header()
|
||||
|
||||
# @self.dash_app.callback(Output('prev-states', 'children'),
|
||||
# [Input('state-update-button', 'n_clicks'),
|
||||
# Input('minute-interval', 'n_intervals')])
|
||||
# def state(n_clicks, n_intervals):
|
||||
# return self.previous_states(moe)
|
||||
@self.dash_app.callback(Output('prev-states', 'children'),
|
||||
[Input('state-update-button', 'n_clicks'),
|
||||
Input('minute-interval', 'n_intervals')])
|
||||
def state(n_clicks, n_intervals):
|
||||
states_dict_list = self.node_metadata_db_client.get_previous_states_metadata()
|
||||
return self.previous_states(states_dict_list=states_dict_list)
|
||||
|
||||
@self.dash_app.callback(Output('known-nodes', 'children'),
|
||||
[Input('node-update-button', 'n_clicks'),
|
||||
|
|
|
@ -55,7 +55,11 @@ class UrsulaStatusApp(NetworkStatusPage):
|
|||
@self.dash_app.callback(Output('prev-states', 'children'), [Input('status-update', 'n_intervals')])
|
||||
def state(n):
|
||||
"""Simply update periodically"""
|
||||
return self.previous_states(ursula)
|
||||
previous_states = list(reversed(ursula.known_nodes.states.values()))[:5] # only latest 5
|
||||
states_dict_list = []
|
||||
for previous_state in previous_states:
|
||||
states_dict_list.append(ursula.known_nodes.abridged_state_details(previous_state))
|
||||
return self.previous_states(states_dict_list)
|
||||
|
||||
@self.dash_app.callback(Output('known-nodes', 'children'), [Input('status-update', 'n_intervals')])
|
||||
def known_nodes(n):
|
||||
|
|
Loading…
Reference in New Issue