feat(metrics): bytes/frames sent/received & peers

Emit metrics tracking the number of bytes sent / received, and number of
frames sent / received by the local node.

Track the number of discovered peers to record peer discovery rate and
current number of known peers per node.
pull/24376/head
Dom Dwyer 2023-06-14 15:12:50 +02:00
parent 93789d7abb
commit 48466bfa89
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
9 changed files with 170 additions and 25 deletions

1
Cargo.lock generated
View File

@ -2007,6 +2007,7 @@ dependencies = [
"async-trait",
"futures",
"hashbrown 0.14.0",
"metric",
"prost",
"prost-build",
"test_helpers",

View File

@ -7,6 +7,7 @@ edition = "2021"
async-trait = "0.1.68"
futures = "0.3.28"
hashbrown.workspace = true
metric = { version = "0.1.0", path = "../metric" }
prost = "0.11.9"
thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["net", "io-util", "time", "rt", "sync", "macros"] }

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use tokio::{
net::{ToSocketAddrs, UdpSocket},
sync::mpsc,
@ -10,6 +12,7 @@ use crate::{handle::GossipHandle, reactor::Reactor, Dispatcher};
pub struct Builder<T> {
seed_addrs: Vec<String>,
dispatcher: T,
metric: Arc<metric::Registry>,
}
impl<T> Builder<T> {
@ -18,10 +21,11 @@ impl<T> Builder<T> {
///
/// Each address in `seed_addrs` is re-resolved periodically and the first
/// resolved IP address is used for peer communication.
pub fn new(seed_addrs: Vec<String>, dispatcher: T) -> Self {
pub fn new(seed_addrs: Vec<String>, dispatcher: T, metric: Arc<metric::Registry>) -> Self {
Self {
seed_addrs,
dispatcher,
metric,
}
}
}
@ -41,7 +45,7 @@ where
let (tx, rx) = mpsc::channel(1000);
// Initialise the reactor
let reactor = Reactor::new(self.seed_addrs, socket, self.dispatcher);
let reactor = Reactor::new(self.seed_addrs, socket, self.dispatcher, &self.metric);
let identity = reactor.identity().clone();
// Start the message reactor.

View File

@ -38,6 +38,7 @@
mod builder;
mod dispatcher;
mod handle;
mod metric;
mod peers;
mod proto;
mod reactor;

60
gossip/src/metric.rs Normal file
View File

@ -0,0 +1,60 @@
//! Metric newtype wrappers for type safety.
//!
//! The metrics are easily confused (they're all counters) so have the compiler
//! check the right ones are being used in the right places.
use metric::U64Counter;
#[derive(Debug, Clone)]
pub(crate) struct SentFrames(metric::U64Counter);
impl SentFrames {
pub(crate) fn inc(&self, v: usize) {
self.0.inc(v as u64)
}
}
#[derive(Debug)]
pub(crate) struct ReceivedFrames(metric::U64Counter);
impl ReceivedFrames {
pub(crate) fn inc(&self, v: usize) {
self.0.inc(v as u64)
}
}
#[derive(Debug, Clone)]
pub(crate) struct SentBytes(metric::U64Counter);
impl SentBytes {
pub(crate) fn inc(&self, v: usize) {
self.0.inc(v as u64)
}
}
#[derive(Debug)]
pub(crate) struct ReceivedBytes(metric::U64Counter);
impl ReceivedBytes {
pub(crate) fn inc(&self, v: usize) {
self.0.inc(v as u64)
}
}
pub(crate) fn new_metrics(
metrics: &metric::Registry,
) -> (SentFrames, ReceivedFrames, SentBytes, ReceivedBytes) {
let metric_frames = metrics.register_metric::<U64Counter>(
"gossip_frames",
"number of frames sent/received by this node",
);
let metric_bytes = metrics
.register_metric::<U64Counter>("gossip_bytes", "sum of bytes sent/received by this node");
(
SentFrames(metric_frames.recorder(&[("direction", "sent")])),
ReceivedFrames(metric_frames.recorder(&[("direction", "received")])),
SentBytes(metric_bytes.recorder(&[("direction", "sent")])),
ReceivedBytes(metric_bytes.recorder(&[("direction", "received")])),
)
}

View File

@ -2,12 +2,16 @@ use std::{io, net::SocketAddr};
use futures::{stream::FuturesUnordered, StreamExt};
use hashbrown::{hash_map::RawEntryMut, HashMap};
use metric::U64Counter;
use prost::bytes::Bytes;
use tokio::net::UdpSocket;
use tracing::{trace, warn};
use uuid::Uuid;
use crate::MAX_FRAME_BYTES;
use crate::{
metric::{SentBytes, SentFrames},
MAX_FRAME_BYTES,
};
/// A unique generated identity containing 128 bits of randomness (V4 UUID).
#[derive(Debug, Eq, Clone)]
@ -69,7 +73,13 @@ pub(crate) struct Peer {
}
impl Peer {
pub(crate) async fn send(&self, buf: &[u8], socket: &UdpSocket) -> Result<usize, io::Error> {
pub(crate) async fn send(
&self,
buf: &[u8],
socket: &UdpSocket,
frames_sent: &SentFrames,
bytes_sent: &SentBytes,
) -> Result<usize, io::Error> {
// If the frame is larger than the allowed maximum, then the receiver
// will truncate the frame when reading the socket.
//
@ -88,6 +98,8 @@ impl Peer {
let ret = socket.send_to(buf, self.addr).await;
match &ret {
Ok(n_bytes) => {
frames_sent.inc(1);
bytes_sent.inc(*n_bytes);
trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "send frame")
}
Err(e) => {
@ -102,14 +114,25 @@ impl Peer {
#[derive(Debug, Default)]
pub(crate) struct PeerList {
list: HashMap<Identity, Peer>,
/// The number of known, believed-to-be-healthy peers.
metric_peer_count: metric::U64Counter,
}
impl PeerList {
/// Initialise the [`PeerList`] with capacity for `cap` number of [`Peer`]
/// instances.
pub(crate) fn with_capacity(cap: usize) -> Self {
pub(crate) fn with_capacity(cap: usize, metrics: &metric::Registry) -> Self {
let metric_peer_count = metrics
.register_metric::<U64Counter>(
"gossip_known_peers",
"number of likely healthy peers known to this node",
)
.recorder(&[]);
Self {
list: HashMap::with_capacity(cap),
metric_peer_count,
}
}
@ -123,6 +146,7 @@ impl PeerList {
pub(crate) fn upsert(&mut self, identity: &Identity, peer_addr: SocketAddr) -> &mut Peer {
let p = match self.list.raw_entry_mut().from_key(identity) {
RawEntryMut::Vacant(v) => {
self.metric_peer_count.inc(1);
v.insert(
identity.to_owned(),
Peer {
@ -141,10 +165,16 @@ impl PeerList {
/// Broadcast `buf` to all known peers over `socket`, returning the number
/// of bytes sent in total.
pub(crate) async fn broadcast(&self, buf: &[u8], socket: &UdpSocket) -> usize {
pub(crate) async fn broadcast(
&self,
buf: &[u8],
socket: &UdpSocket,
frames_sent: &SentFrames,
bytes_sent: &SentBytes,
) -> usize {
self.list
.values()
.map(|v| v.send(buf, socket))
.map(|v| v.send(buf, socket, frames_sent, bytes_sent))
.collect::<FuturesUnordered<_>>()
.fold(0, |acc, res| async move {
match res {

View File

@ -8,6 +8,7 @@ use tokio::{
use tracing::{debug, error, info, trace, warn};
use crate::{
metric::*,
peers::{Identity, PeerList},
proto::{self, frame_message::Payload, FrameMessage},
seed::{seed_ping_task, Seed},
@ -83,14 +84,28 @@ pub(crate) struct Reactor<T> {
/// contain less peers than the number of initial seeds.
peer_list: PeerList,
/// The UDP socket used for communication with peers.
socket: Arc<UdpSocket>,
/// The count of frames sent and received.
metric_frames_sent: SentFrames,
metric_frames_received: ReceivedFrames,
/// The sum of bytes sent and received.
metric_bytes_sent: SentBytes,
metric_bytes_received: ReceivedBytes,
}
impl<T> Reactor<T>
where
T: Dispatcher,
{
pub(crate) fn new(seed_list: Vec<String>, socket: UdpSocket, dispatch: T) -> Self {
pub(crate) fn new(
seed_list: Vec<String>,
socket: UdpSocket,
dispatch: T,
metrics: &metric::Registry,
) -> Self {
// Generate a unique UUID for this Reactor instance, and cache the wire
// representation.
let identity = Identity::new();
@ -117,6 +132,11 @@ where
serialisation_buf.clone()
};
// Initialise the various metrics with wrappers to help distinguish
// between the (very similar) counters.
let (metric_frames_sent, metric_frames_received, metric_bytes_sent, metric_bytes_received) =
new_metrics(metrics);
// Spawn a task that periodically pings all known seeds.
//
// Pinging all seeds announces this node as alive, propagating the
@ -126,6 +146,8 @@ where
Arc::clone(&seed_list),
Arc::clone(&socket),
cached_ping_frame,
metric_frames_sent.clone(),
metric_bytes_sent.clone(),
)));
Self {
@ -133,10 +155,14 @@ where
identity,
cached_frame,
serialisation_buf,
peer_list: PeerList::with_capacity(seed_list.len()),
peer_list: PeerList::with_capacity(seed_list.len(), metrics),
seed_list,
_seed_ping_task: seed_ping_task,
socket,
metric_frames_sent,
metric_frames_received,
metric_bytes_sent,
metric_bytes_received,
}
}
@ -148,10 +174,10 @@ where
);
loop {
let (_bytes_read, _bytes_sent) = tokio::select! {
tokio::select! {
msg = self.read() => {
match msg {
Ok((bytes_read, bytes_sent)) => (bytes_read, bytes_sent),
Ok(()) => {},
Err(Error::NoPayload { peer, addr }) => {
warn!(%peer, %addr, "message contains no payload");
continue;
@ -182,7 +208,6 @@ where
}
Some(Request::GetPeers(tx)) => {
let _ = tx.send(self.peer_list.peer_uuids());
(0, 0)
},
Some(Request::Broadcast(payload)) => {
// The user is guaranteed MAX_USER_PAYLOAD_BYTES to
@ -196,8 +221,7 @@ where
{
continue
}
let bytes_sent = self.peer_list.broadcast(&self.serialisation_buf, &self.socket).await;
(0, bytes_sent)
self.peer_list.broadcast(&self.serialisation_buf, &self.socket, &self.metric_frames_sent, &self.metric_bytes_sent).await;
}
}
}
@ -212,9 +236,11 @@ where
/// returns the result to the sender of the original frame.
///
/// Returns the bytes read and bytes sent during execution of this method.
async fn read(&mut self) -> Result<(usize, usize), Error> {
async fn read(&mut self) -> Result<(), Error> {
// Read a frame into buf.
let (bytes_read, frame, peer_addr) = read_frame(&self.socket).await?;
self.metric_frames_received.inc(1);
self.metric_bytes_received.inc(bytes_read as _);
// Read the peer identity from the frame
let identity =
@ -228,7 +254,7 @@ where
// this node will not be added to the active peer list.
if identity == self.identity {
debug!(%identity, %peer_addr, bytes_read, "dropping frame from self");
return Ok((bytes_read, 0));
return Ok(());
}
// Find or create the peer in the peer list.
@ -264,7 +290,7 @@ where
// Sometimes no message will be returned to the peer - there's no need
// to send an empty frame.
if out_messages.is_empty() {
return Ok((bytes_read, 0));
return Ok(());
}
// Serialise the frame into the serialisation buffer.
@ -274,9 +300,15 @@ where
&mut self.serialisation_buf,
)?;
let bytes_sent = peer.send(&self.serialisation_buf, &self.socket).await?;
peer.send(
&self.serialisation_buf,
&self.socket,
&self.metric_frames_sent,
&self.metric_bytes_sent,
)
.await?;
Ok((bytes_read, bytes_sent))
Ok(())
}
/// Return the randomised identity assigned to this instance.
@ -351,10 +383,18 @@ fn new_payload(p: Payload) -> proto::FrameMessage {
}
/// Send a PING message to `socket`, using `peer_name` as logging context.
pub(crate) async fn ping(ping_frame: &[u8], socket: &UdpSocket, addr: SocketAddr) -> usize {
pub(crate) async fn ping(
ping_frame: &[u8],
socket: &UdpSocket,
addr: SocketAddr,
sent_frames: &SentFrames,
sent_bytes: &SentBytes,
) -> usize {
match socket.send_to(ping_frame, &addr).await {
Ok(n_bytes) => {
debug!(addr = %addr, "ping");
sent_frames.inc(1);
sent_bytes.inc(n_bytes);
n_bytes
}
Err(e) => {

View File

@ -7,7 +7,11 @@ use tokio::{
};
use tracing::{debug, warn};
use crate::{reactor::ping, RESOLVE_TIMEOUT, SEED_PING_INTERVAL};
use crate::{
metric::{SentBytes, SentFrames},
reactor::ping,
RESOLVE_TIMEOUT, SEED_PING_INTERVAL,
};
/// The user-provided seed peer address.
///
@ -63,6 +67,8 @@ pub(super) async fn seed_ping_task(
seeds: Arc<[Seed]>,
socket: Arc<UdpSocket>,
ping_frame: Vec<u8>,
sent_frames: SentFrames,
sent_bytes: SentBytes,
) {
let mut interval = tokio::time::interval(SEED_PING_INTERVAL);
@ -77,7 +83,7 @@ pub(super) async fn seed_ping_task(
.iter()
.map(|seed| async {
if let Some(addr) = seed.resolve().await {
ping(&ping_frame, &socket, addr).await
ping(&ping_frame, &socket, addr, &sent_frames, &sent_bytes).await
} else {
0
}

View File

@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use test_helpers::{maybe_start_logging, timeout::FutureTimeout};
use tokio::{net::UdpSocket, sync::mpsc};
@ -11,6 +11,8 @@ use gossip::*;
async fn test_payload_exchange() {
maybe_start_logging();
let metrics = Arc::new(metric::Registry::default());
// How long to wait for peer discovery to complete.
const TIMEOUT: Duration = Duration::from_secs(5);
@ -32,8 +34,8 @@ async fn test_payload_exchange() {
// Initialise both reactors
let addrs = dbg!(vec![a_addr.to_string(), b_addr.to_string()]);
let a = Builder::new(addrs.clone(), a_tx).build(a_socket);
let b = Builder::new(addrs, b_tx).build(b_socket);
let a = Builder::new(addrs.clone(), a_tx, Arc::clone(&metrics)).build(a_socket);
let b = Builder::new(addrs, b_tx, Arc::clone(&metrics)).build(b_socket);
// Wait for peer discovery to occur
async {