From 48466bfa89491969f6d0571c834e3317fb6286a1 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 14 Jun 2023 15:12:50 +0200 Subject: [PATCH] feat(metrics): bytes/frames sent/received & peers Emit metrics tracking the number of bytes sent / received, and number of frames sent / received by the local node. Track the number of discovered peers to record peer discovery rate and current number of known peers per node. --- Cargo.lock | 1 + gossip/Cargo.toml | 1 + gossip/src/builder.rs | 8 ++++-- gossip/src/lib.rs | 1 + gossip/src/metric.rs | 60 +++++++++++++++++++++++++++++++++++++++ gossip/src/peers.rs | 40 ++++++++++++++++++++++---- gossip/src/reactor.rs | 66 ++++++++++++++++++++++++++++++++++--------- gossip/src/seed.rs | 10 +++++-- gossip/tests/smoke.rs | 8 ++++-- 9 files changed, 170 insertions(+), 25 deletions(-) create mode 100644 gossip/src/metric.rs diff --git a/Cargo.lock b/Cargo.lock index 6cc4a061ae..7a2b7b364a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2007,6 +2007,7 @@ dependencies = [ "async-trait", "futures", "hashbrown 0.14.0", + "metric", "prost", "prost-build", "test_helpers", diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 8000dd32f6..36110ca420 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" async-trait = "0.1.68" futures = "0.3.28" hashbrown.workspace = true +metric = { version = "0.1.0", path = "../metric" } prost = "0.11.9" thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["net", "io-util", "time", "rt", "sync", "macros"] } diff --git a/gossip/src/builder.rs b/gossip/src/builder.rs index 503c8e42ad..35d7ebab83 100644 --- a/gossip/src/builder.rs +++ b/gossip/src/builder.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use tokio::{ net::{ToSocketAddrs, UdpSocket}, sync::mpsc, @@ -10,6 +12,7 @@ use crate::{handle::GossipHandle, reactor::Reactor, Dispatcher}; pub struct Builder { seed_addrs: Vec, dispatcher: T, + metric: Arc, } impl Builder { @@ -18,10 +21,11 @@ impl Builder { /// /// Each address in `seed_addrs` is re-resolved periodically and the first /// resolved IP address is used for peer communication. - pub fn new(seed_addrs: Vec, dispatcher: T) -> Self { + pub fn new(seed_addrs: Vec, dispatcher: T, metric: Arc) -> Self { Self { seed_addrs, dispatcher, + metric, } } } @@ -41,7 +45,7 @@ where let (tx, rx) = mpsc::channel(1000); // Initialise the reactor - let reactor = Reactor::new(self.seed_addrs, socket, self.dispatcher); + let reactor = Reactor::new(self.seed_addrs, socket, self.dispatcher, &self.metric); let identity = reactor.identity().clone(); // Start the message reactor. diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index c52e50e7e3..66ecad726f 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -38,6 +38,7 @@ mod builder; mod dispatcher; mod handle; +mod metric; mod peers; mod proto; mod reactor; diff --git a/gossip/src/metric.rs b/gossip/src/metric.rs new file mode 100644 index 0000000000..c7ae249458 --- /dev/null +++ b/gossip/src/metric.rs @@ -0,0 +1,60 @@ +//! Metric newtype wrappers for type safety. +//! +//! The metrics are easily confused (they're all counters) so have the compiler +//! check the right ones are being used in the right places. + +use metric::U64Counter; + +#[derive(Debug, Clone)] +pub(crate) struct SentFrames(metric::U64Counter); + +impl SentFrames { + pub(crate) fn inc(&self, v: usize) { + self.0.inc(v as u64) + } +} + +#[derive(Debug)] +pub(crate) struct ReceivedFrames(metric::U64Counter); + +impl ReceivedFrames { + pub(crate) fn inc(&self, v: usize) { + self.0.inc(v as u64) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct SentBytes(metric::U64Counter); + +impl SentBytes { + pub(crate) fn inc(&self, v: usize) { + self.0.inc(v as u64) + } +} + +#[derive(Debug)] +pub(crate) struct ReceivedBytes(metric::U64Counter); + +impl ReceivedBytes { + pub(crate) fn inc(&self, v: usize) { + self.0.inc(v as u64) + } +} + +pub(crate) fn new_metrics( + metrics: &metric::Registry, +) -> (SentFrames, ReceivedFrames, SentBytes, ReceivedBytes) { + let metric_frames = metrics.register_metric::( + "gossip_frames", + "number of frames sent/received by this node", + ); + let metric_bytes = metrics + .register_metric::("gossip_bytes", "sum of bytes sent/received by this node"); + + ( + SentFrames(metric_frames.recorder(&[("direction", "sent")])), + ReceivedFrames(metric_frames.recorder(&[("direction", "received")])), + SentBytes(metric_bytes.recorder(&[("direction", "sent")])), + ReceivedBytes(metric_bytes.recorder(&[("direction", "received")])), + ) +} diff --git a/gossip/src/peers.rs b/gossip/src/peers.rs index 6ab7f61607..f4531af87b 100644 --- a/gossip/src/peers.rs +++ b/gossip/src/peers.rs @@ -2,12 +2,16 @@ use std::{io, net::SocketAddr}; use futures::{stream::FuturesUnordered, StreamExt}; use hashbrown::{hash_map::RawEntryMut, HashMap}; +use metric::U64Counter; use prost::bytes::Bytes; use tokio::net::UdpSocket; use tracing::{trace, warn}; use uuid::Uuid; -use crate::MAX_FRAME_BYTES; +use crate::{ + metric::{SentBytes, SentFrames}, + MAX_FRAME_BYTES, +}; /// A unique generated identity containing 128 bits of randomness (V4 UUID). #[derive(Debug, Eq, Clone)] @@ -69,7 +73,13 @@ pub(crate) struct Peer { } impl Peer { - pub(crate) async fn send(&self, buf: &[u8], socket: &UdpSocket) -> Result { + pub(crate) async fn send( + &self, + buf: &[u8], + socket: &UdpSocket, + frames_sent: &SentFrames, + bytes_sent: &SentBytes, + ) -> Result { // If the frame is larger than the allowed maximum, then the receiver // will truncate the frame when reading the socket. // @@ -88,6 +98,8 @@ impl Peer { let ret = socket.send_to(buf, self.addr).await; match &ret { Ok(n_bytes) => { + frames_sent.inc(1); + bytes_sent.inc(*n_bytes); trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "send frame") } Err(e) => { @@ -102,14 +114,25 @@ impl Peer { #[derive(Debug, Default)] pub(crate) struct PeerList { list: HashMap, + + /// The number of known, believed-to-be-healthy peers. + metric_peer_count: metric::U64Counter, } impl PeerList { /// Initialise the [`PeerList`] with capacity for `cap` number of [`Peer`] /// instances. - pub(crate) fn with_capacity(cap: usize) -> Self { + pub(crate) fn with_capacity(cap: usize, metrics: &metric::Registry) -> Self { + let metric_peer_count = metrics + .register_metric::( + "gossip_known_peers", + "number of likely healthy peers known to this node", + ) + .recorder(&[]); + Self { list: HashMap::with_capacity(cap), + metric_peer_count, } } @@ -123,6 +146,7 @@ impl PeerList { pub(crate) fn upsert(&mut self, identity: &Identity, peer_addr: SocketAddr) -> &mut Peer { let p = match self.list.raw_entry_mut().from_key(identity) { RawEntryMut::Vacant(v) => { + self.metric_peer_count.inc(1); v.insert( identity.to_owned(), Peer { @@ -141,10 +165,16 @@ impl PeerList { /// Broadcast `buf` to all known peers over `socket`, returning the number /// of bytes sent in total. - pub(crate) async fn broadcast(&self, buf: &[u8], socket: &UdpSocket) -> usize { + pub(crate) async fn broadcast( + &self, + buf: &[u8], + socket: &UdpSocket, + frames_sent: &SentFrames, + bytes_sent: &SentBytes, + ) -> usize { self.list .values() - .map(|v| v.send(buf, socket)) + .map(|v| v.send(buf, socket, frames_sent, bytes_sent)) .collect::>() .fold(0, |acc, res| async move { match res { diff --git a/gossip/src/reactor.rs b/gossip/src/reactor.rs index 9058398cfb..58c771b4ef 100644 --- a/gossip/src/reactor.rs +++ b/gossip/src/reactor.rs @@ -8,6 +8,7 @@ use tokio::{ use tracing::{debug, error, info, trace, warn}; use crate::{ + metric::*, peers::{Identity, PeerList}, proto::{self, frame_message::Payload, FrameMessage}, seed::{seed_ping_task, Seed}, @@ -83,14 +84,28 @@ pub(crate) struct Reactor { /// contain less peers than the number of initial seeds. peer_list: PeerList, + /// The UDP socket used for communication with peers. socket: Arc, + + /// The count of frames sent and received. + metric_frames_sent: SentFrames, + metric_frames_received: ReceivedFrames, + + /// The sum of bytes sent and received. + metric_bytes_sent: SentBytes, + metric_bytes_received: ReceivedBytes, } impl Reactor where T: Dispatcher, { - pub(crate) fn new(seed_list: Vec, socket: UdpSocket, dispatch: T) -> Self { + pub(crate) fn new( + seed_list: Vec, + socket: UdpSocket, + dispatch: T, + metrics: &metric::Registry, + ) -> Self { // Generate a unique UUID for this Reactor instance, and cache the wire // representation. let identity = Identity::new(); @@ -117,6 +132,11 @@ where serialisation_buf.clone() }; + // Initialise the various metrics with wrappers to help distinguish + // between the (very similar) counters. + let (metric_frames_sent, metric_frames_received, metric_bytes_sent, metric_bytes_received) = + new_metrics(metrics); + // Spawn a task that periodically pings all known seeds. // // Pinging all seeds announces this node as alive, propagating the @@ -126,6 +146,8 @@ where Arc::clone(&seed_list), Arc::clone(&socket), cached_ping_frame, + metric_frames_sent.clone(), + metric_bytes_sent.clone(), ))); Self { @@ -133,10 +155,14 @@ where identity, cached_frame, serialisation_buf, - peer_list: PeerList::with_capacity(seed_list.len()), + peer_list: PeerList::with_capacity(seed_list.len(), metrics), seed_list, _seed_ping_task: seed_ping_task, socket, + metric_frames_sent, + metric_frames_received, + metric_bytes_sent, + metric_bytes_received, } } @@ -148,10 +174,10 @@ where ); loop { - let (_bytes_read, _bytes_sent) = tokio::select! { + tokio::select! { msg = self.read() => { match msg { - Ok((bytes_read, bytes_sent)) => (bytes_read, bytes_sent), + Ok(()) => {}, Err(Error::NoPayload { peer, addr }) => { warn!(%peer, %addr, "message contains no payload"); continue; @@ -182,7 +208,6 @@ where } Some(Request::GetPeers(tx)) => { let _ = tx.send(self.peer_list.peer_uuids()); - (0, 0) }, Some(Request::Broadcast(payload)) => { // The user is guaranteed MAX_USER_PAYLOAD_BYTES to @@ -196,8 +221,7 @@ where { continue } - let bytes_sent = self.peer_list.broadcast(&self.serialisation_buf, &self.socket).await; - (0, bytes_sent) + self.peer_list.broadcast(&self.serialisation_buf, &self.socket, &self.metric_frames_sent, &self.metric_bytes_sent).await; } } } @@ -212,9 +236,11 @@ where /// returns the result to the sender of the original frame. /// /// Returns the bytes read and bytes sent during execution of this method. - async fn read(&mut self) -> Result<(usize, usize), Error> { + async fn read(&mut self) -> Result<(), Error> { // Read a frame into buf. let (bytes_read, frame, peer_addr) = read_frame(&self.socket).await?; + self.metric_frames_received.inc(1); + self.metric_bytes_received.inc(bytes_read as _); // Read the peer identity from the frame let identity = @@ -228,7 +254,7 @@ where // this node will not be added to the active peer list. if identity == self.identity { debug!(%identity, %peer_addr, bytes_read, "dropping frame from self"); - return Ok((bytes_read, 0)); + return Ok(()); } // Find or create the peer in the peer list. @@ -264,7 +290,7 @@ where // Sometimes no message will be returned to the peer - there's no need // to send an empty frame. if out_messages.is_empty() { - return Ok((bytes_read, 0)); + return Ok(()); } // Serialise the frame into the serialisation buffer. @@ -274,9 +300,15 @@ where &mut self.serialisation_buf, )?; - let bytes_sent = peer.send(&self.serialisation_buf, &self.socket).await?; + peer.send( + &self.serialisation_buf, + &self.socket, + &self.metric_frames_sent, + &self.metric_bytes_sent, + ) + .await?; - Ok((bytes_read, bytes_sent)) + Ok(()) } /// Return the randomised identity assigned to this instance. @@ -351,10 +383,18 @@ fn new_payload(p: Payload) -> proto::FrameMessage { } /// Send a PING message to `socket`, using `peer_name` as logging context. -pub(crate) async fn ping(ping_frame: &[u8], socket: &UdpSocket, addr: SocketAddr) -> usize { +pub(crate) async fn ping( + ping_frame: &[u8], + socket: &UdpSocket, + addr: SocketAddr, + sent_frames: &SentFrames, + sent_bytes: &SentBytes, +) -> usize { match socket.send_to(ping_frame, &addr).await { Ok(n_bytes) => { debug!(addr = %addr, "ping"); + sent_frames.inc(1); + sent_bytes.inc(n_bytes); n_bytes } Err(e) => { diff --git a/gossip/src/seed.rs b/gossip/src/seed.rs index 843a8960ba..97df4f245c 100644 --- a/gossip/src/seed.rs +++ b/gossip/src/seed.rs @@ -7,7 +7,11 @@ use tokio::{ }; use tracing::{debug, warn}; -use crate::{reactor::ping, RESOLVE_TIMEOUT, SEED_PING_INTERVAL}; +use crate::{ + metric::{SentBytes, SentFrames}, + reactor::ping, + RESOLVE_TIMEOUT, SEED_PING_INTERVAL, +}; /// The user-provided seed peer address. /// @@ -63,6 +67,8 @@ pub(super) async fn seed_ping_task( seeds: Arc<[Seed]>, socket: Arc, ping_frame: Vec, + sent_frames: SentFrames, + sent_bytes: SentBytes, ) { let mut interval = tokio::time::interval(SEED_PING_INTERVAL); @@ -77,7 +83,7 @@ pub(super) async fn seed_ping_task( .iter() .map(|seed| async { if let Some(addr) = seed.resolve().await { - ping(&ping_frame, &socket, addr).await + ping(&ping_frame, &socket, addr, &sent_frames, &sent_bytes).await } else { 0 } diff --git a/gossip/tests/smoke.rs b/gossip/tests/smoke.rs index c46be5f343..16ff06c469 100644 --- a/gossip/tests/smoke.rs +++ b/gossip/tests/smoke.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use test_helpers::{maybe_start_logging, timeout::FutureTimeout}; use tokio::{net::UdpSocket, sync::mpsc}; @@ -11,6 +11,8 @@ use gossip::*; async fn test_payload_exchange() { maybe_start_logging(); + let metrics = Arc::new(metric::Registry::default()); + // How long to wait for peer discovery to complete. const TIMEOUT: Duration = Duration::from_secs(5); @@ -32,8 +34,8 @@ async fn test_payload_exchange() { // Initialise both reactors let addrs = dbg!(vec![a_addr.to_string(), b_addr.to_string()]); - let a = Builder::new(addrs.clone(), a_tx).build(a_socket); - let b = Builder::new(addrs, b_tx).build(b_socket); + let a = Builder::new(addrs.clone(), a_tx, Arc::clone(&metrics)).build(a_socket); + let b = Builder::new(addrs, b_tx, Arc::clone(&metrics)).build(b_socket); // Wait for peer discovery to occur async {