From 5027c9a88cf8c6249364d9fd3b2323653e217fe2 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 9 Jun 2023 14:09:33 +0200 Subject: [PATCH 01/17] chore: sort workspace members Sort the package names in the workspace member declaration. --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1c6a8e3c82..a92ccd2467 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,9 @@ members = [ "influxdb_tsm", "influxdb2_client", "influxrpc_parser", + "ingester_query_grpc", "ingester_test_ctx", "ingester", - "ingester_query_grpc", "iox_catalog", "iox_data_generator", "iox_query_influxql", @@ -82,8 +82,8 @@ members = [ "trace", "tracker", "trogging", - "wal", "wal_inspect", + "wal", "workspace-hack", ] default-members = ["influxdb_iox"] From 69ab70ce9929e62d0520ef0f63b04bd08e3ec2fd Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 9 Jun 2023 14:10:47 +0200 Subject: [PATCH 02/17] feat: init gossip package Adds a new empty "gossip" package to the workspace. --- Cargo.lock | 4 ++++ Cargo.toml | 1 + gossip/Cargo.toml | 8 ++++++++ gossip/src/lib.rs | 1 + 4 files changed, 14 insertions(+) create mode 100644 gossip/Cargo.toml create mode 100644 gossip/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 8fa9abeece..b25f4c180b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2000,6 +2000,10 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gossip" +version = "0.1.0" + [[package]] name = "grpc-binary-logger" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a92ccd2467..f0788c9590 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "flightsql", "garbage_collector", "generated_types", + "gossip", "grpc-binary-logger-proto", "grpc-binary-logger-test-proto", "grpc-binary-logger", diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml new file mode 100644 index 0000000000..ce8e0c474b --- /dev/null +++ b/gossip/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "gossip" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/gossip/src/lib.rs @@ -0,0 +1 @@ + From 5c191ce6cfb75f500a4f6890901508ab1100119a Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 9 Jun 2023 14:42:46 +0200 Subject: [PATCH 03/17] ci: enable standard lint set Adds the somewhat "standard" lint set we use to the gossip lib. --- gossip/src/lib.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 8b13789179..40b434d835 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -1 +1,16 @@ +//! A work-in-progress, simple gossip primitive for metadata distribution +//! between IOx nodes. +#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)] +#![warn( + clippy::clone_on_ref_ptr, + clippy::dbg_macro, + clippy::explicit_iter_loop, + clippy::future_not_send, + clippy::todo, + clippy::use_self, + missing_copy_implementations, + missing_debug_implementations, + unused_crate_dependencies, + missing_docs +)] From 48aa4a5e33dbb08ac3d8bd33d04707cc1e7e1123 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 9 Jun 2023 14:44:48 +0200 Subject: [PATCH 04/17] feat(gossip): frame proto definitions Adds a proto definition and configures prost to build the rust types from it. The gossip framing is intended to be flexible and decoupled - the gossip library will batch together one or more opaque application messages and/or control frames, and uniquely identify each peer with a per-instance UUID to detect crashes/restarts and track peers. --- Cargo.lock | 4 ++++ gossip/Cargo.toml | 7 +++++-- gossip/build.rs | 13 +++++++++++++ gossip/proto/gossip.proto | 28 ++++++++++++++++++++++++++++ gossip/src/lib.rs | 2 ++ gossip/src/proto.rs | 3 +++ 6 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 gossip/build.rs create mode 100644 gossip/proto/gossip.proto create mode 100644 gossip/src/proto.rs diff --git a/Cargo.lock b/Cargo.lock index b25f4c180b..918218a2f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2003,6 +2003,10 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "gossip" version = "0.1.0" +dependencies = [ + "prost", + "prost-build", +] [[package]] name = "grpc-binary-logger" diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index ce8e0c474b..7bc69a875d 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -3,6 +3,9 @@ name = "gossip" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] +prost = "0.11.9" + +[build-dependencies] +prost-build = "0.11.9" + diff --git a/gossip/build.rs b/gossip/build.rs new file mode 100644 index 0000000000..abbbe87cfb --- /dev/null +++ b/gossip/build.rs @@ -0,0 +1,13 @@ +use std::{error::Error, path::PathBuf}; + +use prost_build::Config; + +fn main() -> Result<(), Box> { + let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("proto"); + + Config::new() + .bytes(["."]) + .compile_protos(&[root.join("gossip.proto")], &[root])?; + + Ok(()) +} diff --git a/gossip/proto/gossip.proto b/gossip/proto/gossip.proto new file mode 100644 index 0000000000..fb259ff41a --- /dev/null +++ b/gossip/proto/gossip.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; +package influxdata.iox.gossip.v1; +option go_package = "github.com/influxdata/iox/gossip/v1"; + +// The payload of a single gossip datagram. +message Frame { + // Per-instance UUID as raw BE bytes. + bytes identity = 1; + + // One or more user/control frames packed into a single message. + repeated FrameMessage messages = 2; +} + +// A single gossip message within a frame. +message FrameMessage { + // Various user/control message types. + oneof msg { + // User-provided data payload. + UserPayload user_data = 1; + } +} + +// An application payload from the caller of the gossip library. +message UserPayload { + // An opaque user payload - this is handed back to the gossip library user + // unmodified. + bytes payload = 1; +} diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 40b434d835..24fe346224 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -14,3 +14,5 @@ unused_crate_dependencies, missing_docs )] + +mod proto; diff --git a/gossip/src/proto.rs b/gossip/src/proto.rs new file mode 100644 index 0000000000..cb888494fc --- /dev/null +++ b/gossip/src/proto.rs @@ -0,0 +1,3 @@ +//! Proto definitions of gossip message wire types. + +include!(concat!(env!("OUT_DIR"), "/influxdata.iox.gossip.v1.rs")); From bc9ebc9c668591e35602e36753cb19224472b9fb Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 9 Jun 2023 20:16:31 +0200 Subject: [PATCH 05/17] feat: gossip primitive Adds a simple "gossip" implementation (more accurately described as a pub/sub primitive currently) that supports broadcasting application-level messages to the set of active peers. This implementation uses UDP as a transport for best-effort delivery, and enables zero-copy use of the payload using the Bytes crate. Only peers explicitly provided as "seeds" when initialising will be known to a gossip node - there's currently no peer exchange mechanism. This implementation tolerates seeds changing their DNS entries when restarting to point at new socket addresses (such as within Kubernetes when pods move around). --- Cargo.lock | 7 + gossip/Cargo.toml | 8 + gossip/build.rs | 5 + gossip/proto/gossip.proto | 10 +- gossip/src/builder.rs | 60 +++++++ gossip/src/dispatcher.rs | 34 ++++ gossip/src/handle.rs | 61 +++++++ gossip/src/lib.rs | 43 +++++ gossip/src/peers.rs | 201 +++++++++++++++++++++++ gossip/src/reactor.rs | 328 ++++++++++++++++++++++++++++++++++++++ gossip/src/seed.rs | 91 +++++++++++ gossip/tests/smoke.rs | 70 ++++++++ 12 files changed, 916 insertions(+), 2 deletions(-) create mode 100644 gossip/src/builder.rs create mode 100644 gossip/src/dispatcher.rs create mode 100644 gossip/src/handle.rs create mode 100644 gossip/src/peers.rs create mode 100644 gossip/src/reactor.rs create mode 100644 gossip/src/seed.rs create mode 100644 gossip/tests/smoke.rs diff --git a/Cargo.lock b/Cargo.lock index 918218a2f9..7197270c6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2004,8 +2004,15 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" name = "gossip" version = "0.1.0" dependencies = [ + "async-trait", + "futures", + "hashbrown 0.14.0", "prost", "prost-build", + "test_helpers", + "tokio", + "tracing", + "uuid", ] [[package]] diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 7bc69a875d..14b70b9f5a 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -4,8 +4,16 @@ version = "0.1.0" edition = "2021" [dependencies] +async-trait = "0.1.68" +futures = "0.3.28" +hashbrown.workspace = true prost = "0.11.9" +tokio = { version = "1.28.2", features = ["net", "io-util", "time", "rt", "sync", "macros"] } +tracing = "0.1.37" +uuid = { version = "1.3.3", features = ["v4"] } [build-dependencies] prost-build = "0.11.9" +[dev-dependencies] +test_helpers = { path = "../test_helpers", features = ["future_timeout"] } diff --git a/gossip/build.rs b/gossip/build.rs index abbbe87cfb..8207b46030 100644 --- a/gossip/build.rs +++ b/gossip/build.rs @@ -5,6 +5,11 @@ use prost_build::Config; fn main() -> Result<(), Box> { let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("proto"); + println!( + "cargo:rerun-if-changed={}", + root.join("gossip.proto").display() + ); + Config::new() .bytes(["."]) .compile_protos(&[root.join("gossip.proto")], &[root])?; diff --git a/gossip/proto/gossip.proto b/gossip/proto/gossip.proto index fb259ff41a..5af2c4814e 100644 --- a/gossip/proto/gossip.proto +++ b/gossip/proto/gossip.proto @@ -14,12 +14,18 @@ message Frame { // A single gossip message within a frame. message FrameMessage { // Various user/control message types. - oneof msg { + oneof payload { + Ping ping = 1; + Pong pong = 2; + // User-provided data payload. - UserPayload user_data = 1; + UserPayload user_data = 3; } } +message Ping {} +message Pong {} + // An application payload from the caller of the gossip library. message UserPayload { // An opaque user payload - this is handed back to the gossip library user diff --git a/gossip/src/builder.rs b/gossip/src/builder.rs new file mode 100644 index 0000000000..503c8e42ad --- /dev/null +++ b/gossip/src/builder.rs @@ -0,0 +1,60 @@ +use tokio::{ + net::{ToSocketAddrs, UdpSocket}, + sync::mpsc, +}; + +use crate::{handle::GossipHandle, reactor::Reactor, Dispatcher}; + +/// Gossip subsystem configuration and initialisation. +#[derive(Debug)] +pub struct Builder { + seed_addrs: Vec, + dispatcher: T, +} + +impl Builder { + /// Use `seed_addrs` as seed peer addresses, and dispatch any application + /// messages to `dispatcher`. + /// + /// Each address in `seed_addrs` is re-resolved periodically and the first + /// resolved IP address is used for peer communication. + pub fn new(seed_addrs: Vec, dispatcher: T) -> Self { + Self { + seed_addrs, + dispatcher, + } + } +} + +impl Builder +where + T: Dispatcher + 'static, +{ + /// Initialise the gossip subsystem using `socket` for communication. + /// + /// # Panics + /// + /// This call spawns a tokio task, and as such must be called from within a + /// tokio runtime. + pub fn build(self, socket: UdpSocket) -> GossipHandle { + // Obtain a channel to communicate between the actor, and all handles + let (tx, rx) = mpsc::channel(1000); + + // Initialise the reactor + let reactor = Reactor::new(self.seed_addrs, socket, self.dispatcher); + let identity = reactor.identity().clone(); + + // Start the message reactor. + tokio::spawn(reactor.run(rx)); + + GossipHandle::new(tx, identity) + } + + /// Bind to the provided socket address and initialise the gossip subsystem. + pub async fn bind(self, bind_addr: A) -> Result + where + A: ToSocketAddrs + Send, + { + Ok(self.build(UdpSocket::bind(bind_addr).await?)) + } +} diff --git a/gossip/src/dispatcher.rs b/gossip/src/dispatcher.rs new file mode 100644 index 0000000000..b6aa951679 --- /dev/null +++ b/gossip/src/dispatcher.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use tracing::warn; + +// Re-export the bytes type to ensure upstream users of this crate are +// interacting with the same type. +pub use prost::bytes::Bytes; + +/// A delegate abstraction through which the gossip subsystem propagates +/// application-level messages received from other peers. +#[async_trait] +pub trait Dispatcher: Send + Sync { + /// Invoked when an application-level payload is received from a peer. + /// + /// This call should not block / should complete quickly to avoid blocking + /// the gossip reactor loop - if a long-running job must be started within + /// this call, consider spawning a separate task. + async fn dispatch(&self, payload: Bytes); +} + +#[async_trait] +impl Dispatcher for tokio::sync::mpsc::Sender { + async fn dispatch(&self, payload: Bytes) { + if let Err(e) = self.send(payload).await { + warn!(error=%e, "error dispatching payload to application handler"); + } + } +} + +// A no-op dispatcher. +#[cfg(test)] +#[async_trait::async_trait] +impl Dispatcher for () { + async fn dispatch(&self, _payload: crate::Bytes) {} +} diff --git a/gossip/src/handle.rs b/gossip/src/handle.rs new file mode 100644 index 0000000000..075966a35c --- /dev/null +++ b/gossip/src/handle.rs @@ -0,0 +1,61 @@ +use crate::Bytes; +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use crate::peers::Identity; + +/// Requests sent to the [`Reactor`] actor task. +/// +/// [`Reactor`]: crate::reactor::Reactor +#[derive(Debug)] +pub(crate) enum Request { + /// Broadcast the given payload to all known peers. + Broadcast(Bytes), + + /// Get a snapshot of the peer identities. + GetPeers(oneshot::Sender>), +} + +/// A handle to the gossip subsystem. +/// +/// All resources used by the gossip system will be released once this +/// [`GossipHandle`] is dropped. To share the handle, wrap it in an [`Arc`]. +/// +/// [`Arc`]: std::sync::Arc +#[derive(Debug)] +pub struct GossipHandle { + tx: mpsc::Sender, + identity: Identity, +} + +impl GossipHandle { + pub(crate) fn new(tx: mpsc::Sender, identity: Identity) -> Self { + Self { tx, identity } + } + + /// Return the randomly generated identity of this gossip instance. + pub fn identity(&self) -> Uuid { + *self.identity + } + + /// Broadcast `payload` to all known peers. + /// + /// This is a best-effort operation - peers are not guaranteed to receive + /// this broadcast. + pub async fn broadcast(&self, payload: T) + where + T: Into + Send, + { + self.tx + .send(Request::Broadcast(payload.into())) + .await + .unwrap() + } + + /// Retrieve a snapshot of the connected peer list. + pub async fn get_peers(&self) -> Vec { + let (tx, rx) = oneshot::channel(); + self.tx.send(Request::GetPeers(tx)).await.unwrap(); + rx.await.unwrap() + } +} diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 24fe346224..30a4994c22 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -1,5 +1,25 @@ //! A work-in-progress, simple gossip primitive for metadata distribution //! between IOx nodes. +//! +//! # Transport +//! +//! Prefer small payloads where possible, and expect loss of some messages - +//! this primitive provides *best effort* delivery. +//! +//! This implementation sends unicast UDP frames between peers, with support for +//! both control frames & user payloads. The maximum message size is 65,507 +//! bytes, but a packet this large is fragmented into smaller (at most +//! MTU-sized) packets and is at greater risk of being dropped due to a lost +//! fragment. +//! +//! # Security +//! +//! Messages exchanged between peers are unauthenticated and connectionless - +//! it's trivial to forge a message appearing to come from a different peer, or +//! include malicious payloads. +//! +//! The security model of this implementation expects the peers to be running in +//! a trusted environment, secure from malicious users. #![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)] #![warn( @@ -15,4 +35,27 @@ missing_docs )] +mod builder; +mod dispatcher; +mod handle; +mod peers; mod proto; +mod reactor; +pub(crate) mod seed; + +use std::time::Duration; + +/// Work around the unused_crate_dependencies false positives for test deps. +#[cfg(test)] +use test_helpers as _; + +pub use builder::*; +pub use dispatcher::*; +pub use handle::*; + +/// The maximum duration of time allotted to performing a DNS resolution against +/// a seed/peer address. +const RESOLVE_TIMEOUT: Duration = Duration::from_secs(5); + +/// Defines the interval between PING frames sent to all configured seed peers. +const SEED_PING_INTERVAL: std::time::Duration = Duration::from_secs(15); diff --git a/gossip/src/peers.rs b/gossip/src/peers.rs new file mode 100644 index 0000000000..a40d3bb3eb --- /dev/null +++ b/gossip/src/peers.rs @@ -0,0 +1,201 @@ +use std::net::SocketAddr; + +use futures::{stream::FuturesUnordered, StreamExt}; +use hashbrown::{hash_map::RawEntryMut, HashMap}; +use prost::bytes::Bytes; +use tokio::net::UdpSocket; +use tracing::{trace, warn}; +use uuid::Uuid; + +/// A unique generated identity containing 128 bits of randomness (V4 UUID). +#[derive(Debug, Eq, Clone)] +pub(crate) struct Identity(Bytes, Uuid); + +impl std::ops::Deref for Identity { + type Target = Uuid; + + fn deref(&self) -> &Self::Target { + &self.1 + } +} + +impl std::hash::Hash for Identity { + fn hash(&self, state: &mut H) { + self.0.hash(state); + } +} + +impl PartialEq for Identity { + fn eq(&self, other: &Self) -> bool { + debug_assert!((self.1 == other.1) == (self.0 == other.0)); + self.0 == other.0 + } +} + +impl std::fmt::Display for Identity { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.1.fmt(f) + } +} + +impl TryFrom for Identity { + type Error = uuid::Error; + + fn try_from(value: Bytes) -> Result { + let uuid = Uuid::from_slice(&value)?; + Ok(Self(value, uuid)) + } +} + +impl Identity { + /// Generate a new random identity. + pub(crate) fn new() -> Self { + let id = Uuid::new_v4(); + Self(Bytes::from(id.as_bytes().to_vec()), id) + } + + pub(crate) fn as_bytes(&self) -> &Bytes { + &self.0 + } +} + +/// A discovered peer within the gossip cluster. +#[derive(Debug, Clone)] +pub(crate) struct Peer { + identity: Identity, + addr: SocketAddr, +} + +impl Peer { + pub(crate) async fn send( + &self, + buf: &[u8], + socket: &UdpSocket, + ) -> Result { + let ret = socket.send_to(buf, self.addr).await; + match &ret { + Ok(n_bytes) => { + trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "send frame") + } + Err(e) => { + warn!(error=%e, identity=%self.identity, peer_addr=%self.addr, "frame send error") + } + } + ret + } +} + +/// The set of currently active/known peers. +#[derive(Debug, Default)] +pub(crate) struct PeerList { + list: HashMap, +} + +impl PeerList { + /// Initialise the [`PeerList`] with capacity for `cap` number of [`Peer`] + /// instances. + pub(crate) fn with_capacity(cap: usize) -> Self { + Self { + list: HashMap::with_capacity(cap), + } + } + + /// Return the UUIDs of all known peers. + pub(crate) fn peer_uuids(&self) -> Vec { + self.list.keys().map(|v| **v).collect() + } + + /// Upsert a peer identified by `identity` to the peer list, associating it + /// with the provided `peer_addr`. + pub(crate) fn upsert(&mut self, identity: &Identity, peer_addr: SocketAddr) -> &mut Peer { + let p = match self.list.raw_entry_mut().from_key(identity) { + RawEntryMut::Vacant(v) => { + v.insert( + identity.to_owned(), + Peer { + addr: peer_addr, + identity: identity.to_owned(), + }, + ) + .1 + } + RawEntryMut::Occupied(v) => v.into_mut(), + }; + + p.addr = peer_addr; + p + } + + /// Broadcast `buf` to all known peers over `socket`, returning the number + /// of bytes sent in total. + pub(crate) async fn broadcast(&self, buf: &[u8], socket: &UdpSocket) -> usize { + self.list + .values() + .map(|v| v.send(buf, socket)) + .collect::>() + .fold(0, |acc, res| async move { + match res { + Ok(n) => acc + n, + Err(_) => acc, + } + }) + .await + } +} + +#[cfg(test)] +mod tests { + use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + }; + + use super::*; + + #[test] + fn test_identity_round_trip() { + let a = Identity::new(); + + let encoded = a.as_bytes().to_owned(); + let decoded = Identity::try_from(encoded).unwrap(); + + assert_eq!(decoded, a); + } + + #[test] + fn test_identity_length_mismatch() { + let v = Bytes::from_static(&[42, 42, 42, 42]); + let _ = Identity::try_from(v).expect_err("short ID should fail"); + + let v = Bytes::from_static(&[ + 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, + ]); + let _ = Identity::try_from(v).expect_err("short ID should fail"); + } + + #[test] + fn test_identity_eq() { + let v = Identity::new(); + assert_eq!(v.clone(), v); + assert_eq!(hash_identity(&v), hash_identity(&v)); + + let other = Identity::new(); + assert_ne!(v, other); + assert_ne!(hash_identity(&other), hash_identity(&v)); + } + + #[test] + fn test_identity_display() { + let v = Identity::new(); + let text = v.to_string(); + + let uuid = Uuid::try_parse(&text).expect("display impl should output valid uuids"); + assert_eq!(*v, uuid); + } + + fn hash_identity(v: &Identity) -> u64 { + let mut h = DefaultHasher::default(); + v.hash(&mut h); + h.finish() + } +} diff --git a/gossip/src/reactor.rs b/gossip/src/reactor.rs new file mode 100644 index 0000000000..f70239745e --- /dev/null +++ b/gossip/src/reactor.rs @@ -0,0 +1,328 @@ +use std::{net::SocketAddr, sync::Arc}; + +use prost::{bytes::BytesMut, Message}; +use tokio::{ + net::UdpSocket, + sync::mpsc::{self}, +}; +use tracing::{debug, error, info, trace, warn}; + +use crate::{ + peers::{Identity, PeerList}, + proto::{self, frame_message::Payload, FrameMessage}, + seed::{seed_ping_task, Seed}, + Dispatcher, Request, +}; + +#[derive(Debug)] +enum Error { + NoPayload { + peer: Identity, + addr: SocketAddr, + }, + + Deserialise { + addr: SocketAddr, + source: prost::DecodeError, + }, + + Identity { + addr: SocketAddr, + }, + + Io(std::io::Error), +} + +impl From for Error { + fn from(value: std::io::Error) -> Self { + Self::Io(value) + } +} + +#[derive(Debug)] +struct AbortOnDrop(tokio::task::JoinHandle<()>); +impl Drop for AbortOnDrop { + fn drop(&mut self) { + self.0.abort() + } +} + +/// An event loop for gossip frames processing. +/// +/// This actor task is responsible for driving peer discovery, managing the set +/// of known peers and exchanging gossip frames between peers. +/// +/// A user interacts with a [`Reactor`] through a [`GossipHandle`]. +/// +/// [`GossipHandle`]: crate::GossipHandle +#[derive(Debug)] +pub(crate) struct Reactor { + dispatch: T, + + /// The random identity of this gossip instance. + identity: Identity, + + /// A cached wire frame, used to generate outgoing messages. + cached_frame: proto::Frame, + /// A re-used buffer for serialising outgoing messages into. + serialisation_buf: Vec, + + /// The immutable list of seed addresses provided by the user, periodically + /// pinged. + seed_list: Arc<[Seed]>, + /// A task that periodically sends PING frames to all seeds, executing in a + /// separate task so that DNS resolution does not block the reactor loop. + _seed_ping_task: AbortOnDrop, + + /// The set of active peers this node has communicated with and believes to + /// be (recently) healthy. + /// + /// Depending on the perceived availability of the seed nodes, this may + /// contain less peers than the number of initial seeds. + peer_list: PeerList, + + socket: Arc, +} + +impl Reactor +where + T: Dispatcher, +{ + pub(crate) fn new(seed_list: Vec, socket: UdpSocket, dispatch: T) -> Self { + // Generate a unique UUID for this Reactor instance, and cache the wire + // representation. + let identity = Identity::new(); + + let seed_list = seed_list.into_iter().map(Seed::new).collect(); + let socket = Arc::new(socket); + + // Generate a pre-populated frame header. + let mut cached_frame = proto::Frame { + identity: identity.as_bytes().clone(), + messages: Vec::with_capacity(1), + }; + + // A ping frame is static over the lifetime of a Reactor instance, so it + // can be pre-serialised, cached, and reused for every ping. + let cached_ping_frame = { + populate_frame( + &mut cached_frame, + vec![new_payload(Payload::Ping(proto::Ping {}))], + ); + cached_frame.encode_to_vec() + }; + + // Spawn a task that periodically pings all known seeds. + // + // Pinging all seeds announces this node as alive, propagating the + // instance UUID, and requesting PONG responses to drive population of + // the active peer list. + let seed_ping_task = AbortOnDrop(tokio::spawn(seed_ping_task( + Arc::clone(&seed_list), + Arc::clone(&socket), + cached_ping_frame, + ))); + + Self { + dispatch, + identity, + cached_frame, + serialisation_buf: Vec::with_capacity(1024), + peer_list: PeerList::with_capacity(seed_list.len()), + seed_list, + _seed_ping_task: seed_ping_task, + socket, + } + } + + pub(crate) async fn run(mut self, mut rx: mpsc::Receiver) { + info!( + identity = %self.identity, + seed_list = ?self.seed_list, + "gossip reactor started", + ); + + loop { + let (_bytes_read, _bytes_sent) = tokio::select! { + msg = self.read() => { + match msg { + Ok((bytes_read, bytes_sent)) => (bytes_read, bytes_sent), + Err(Error::NoPayload { peer, addr }) => { + warn!(%peer, %addr, "message contains no payload"); + continue; + } + Err(Error::Deserialise { addr, source }) => { + warn!(error=%source, %addr, "error deserialising frame"); + continue; + } + Err(Error::Identity { addr }) => { + warn!(%addr, "invalid identity value in frame"); + continue; + } + Err(Error::Io(error)) => { + error!(%error, "i/o error"); + continue; + } + } + } + op = rx.recv() => { + match op { + None => { + info!("stopping gossip reactor"); + return; + } + Some(Request::GetPeers(tx)) => { + let _ = tx.send(self.peer_list.peer_uuids()); + (0, 0) + }, + Some(Request::Broadcast(payload)) => { + populate_frame(&mut self.cached_frame, [ + new_payload(Payload::UserData(proto::UserPayload{payload})), + ]); + self.cached_frame + .encode(&mut self.serialisation_buf) + .expect("buffer should grow"); + let bytes_sent = self.peer_list.broadcast(&self.serialisation_buf, &self.socket).await; + (0, bytes_sent) + } + } + } + }; + } + } + + /// Read a gossip frame from the socket and potentially respond. + /// + /// This method waits for a frame to be made available by the OS, enumerates + /// the contents, batches any responses to those frames and if non-empty, + /// returns the result to the sender of the original frame. + /// + /// Returns the bytes read and bytes sent during execution of this method. + async fn read(&mut self) -> Result<(usize, usize), Error> { + // Read a frame into buf. + let (bytes_read, frame, peer_addr) = read_frame(&self.socket).await?; + + // Read the peer identity from the frame + let identity = + Identity::try_from(frame.identity).map_err(|_| Error::Identity { addr: peer_addr })?; + + // Don't process messages from this node. + // + // It's expected that all N servers will be included in a peer list, + // rather than the N-1 peers to this node. By dropping messages from + // this node, pings sent by this node will go unprocessed and therefore + // this node will not be added to the active peer list. + if identity == self.identity { + debug!(%identity, %peer_addr, bytes_read, "dropping frame from self"); + return Ok((bytes_read, 0)); + } + + // Find or create the peer in the peer list. + let peer = self.peer_list.upsert(&identity, peer_addr); + + let mut out_messages = Vec::with_capacity(1); + for msg in frame.messages { + // Extract the payload from the frame message + let payload = msg.payload.ok_or_else(|| Error::NoPayload { + peer: identity.clone(), + addr: peer_addr, + })?; + + // Handle the frame message from the peer, optionally returning a + // response frame. + let response = match payload { + Payload::Ping(_) => Some(Payload::Pong(proto::Pong {})), + Payload::Pong(_) => { + debug!(%identity, %peer_addr, "pong"); + None + } + Payload::UserData(data) => { + self.dispatch.dispatch(data.payload).await; + None + } + }; + + if let Some(payload) = response { + out_messages.push(new_payload(payload)); + } + } + + // Sometimes no message will be returned to the peer - there's no need + // to send an empty frame. + if out_messages.is_empty() { + return Ok((bytes_read, 0)); + } + + populate_frame(&mut self.cached_frame, out_messages); + + // Serialise the frame into the serialisation buffer. + self.cached_frame + .encode(&mut self.serialisation_buf) + .expect("buffer should grow"); + + let bytes_sent = peer.send(&self.serialisation_buf, &self.socket).await?; + + Ok((bytes_read, bytes_sent)) + } + + /// Return the randomised identity assigned to this instance. + pub(crate) fn identity(&self) -> &Identity { + &self.identity + } +} + +/// Wait for a UDP datagram to become ready, and read it entirely into `buf`. +async fn recv(socket: &UdpSocket, buf: &mut BytesMut) -> (usize, SocketAddr) { + let (n_bytes, addr) = socket.recv_buf_from(buf).await.expect("frame buffer maxed"); + trace!(%addr, n_bytes, "socket read"); + (n_bytes, addr) +} + +/// Wait for a UDP datagram to arrive, and decode it into a gossip Frame. +/// +/// Clears the contents of `buf` before reading the frame. +async fn read_frame(socket: &UdpSocket) -> Result<(usize, proto::Frame, SocketAddr), Error> { + // Allocate a buffer to hold a full-sized packet. + let mut buf = BytesMut::with_capacity(1024); + + let (n_bytes, addr) = recv(socket, &mut buf).await; + + // Decode the frame, re-using byte arrays from the underlying buffer. + match proto::Frame::decode(buf.freeze()) { + Ok(frame) => { + debug!(?frame, %addr, n_bytes, "read frame"); + Ok((n_bytes, frame, addr)) + } + Err(e) => Err(Error::Deserialise { addr, source: e }), + } +} + +/// Given a pre-allocated `frame`, clear and populate it with the provided +/// `payload` containing a set of [`FrameMessage`]. +fn populate_frame(frame: &mut proto::Frame, payload: impl IntoIterator) { + frame.messages.clear(); + frame.messages.extend(payload.into_iter()) +} + +/// Instantiate a new [`FrameMessage`] from the given [`Payload`]. +fn new_payload(p: Payload) -> proto::FrameMessage { + proto::FrameMessage { payload: Some(p) } +} + +/// Send a PING message to `socket`, using `peer_name` as logging context. +pub(crate) async fn ping(ping_frame: &[u8], socket: &UdpSocket, addr: SocketAddr) -> usize { + match socket.send_to(ping_frame, &addr).await { + Ok(n_bytes) => { + debug!(addr = %addr, "ping"); + n_bytes + } + Err(e) => { + warn!( + error=%e, + addr = %addr, + "ping failed" + ); + 0 + } + } +} diff --git a/gossip/src/seed.rs b/gossip/src/seed.rs new file mode 100644 index 0000000000..843a8960ba --- /dev/null +++ b/gossip/src/seed.rs @@ -0,0 +1,91 @@ +use std::{future, net::SocketAddr, sync::Arc}; + +use futures::{stream::FuturesUnordered, StreamExt}; +use tokio::{ + net::{self, UdpSocket}, + time::{timeout, MissedTickBehavior}, +}; +use tracing::{debug, warn}; + +use crate::{reactor::ping, RESOLVE_TIMEOUT, SEED_PING_INTERVAL}; + +/// The user-provided seed peer address. +/// +/// NOTE: the IP/socket address this resolves to may change over the +/// lifetime of the peer, so the raw address is retained instead of +/// the [`SocketAddr`] to ensure it is constantly re-resolved when the peer +/// is unreachable. +#[derive(Debug)] +pub(crate) struct Seed(String); + +impl Seed { + pub(crate) fn new(addr: String) -> Self { + Self(addr) + } + + /// Resolve this peer address, returning an error if resolution is not + /// complete within [`RESOLVE_TIMEOUT`]. + pub(crate) async fn resolve(&self) -> Option { + match timeout(RESOLVE_TIMEOUT, resolve(&self.0)).await { + Ok(v) => v, + Err(_) => { + warn!(addr = %self.0, "timeout resolving seed address"); + None + } + } + } +} + +/// Resolve `addr`, returning the first IP address, if any. +async fn resolve(addr: &str) -> Option { + match net::lookup_host(addr).await.map(|mut v| v.next()) { + Ok(Some(v)) => { + debug!(%addr, peer=%v, "resolved peer address"); + Some(v) + } + Ok(None) => { + warn!(%addr, "resolved peer address contains no IPs"); + None + } + Err(e) => { + warn!(%addr, error=%e, "failed to resolve peer address"); + None + } + } +} + +/// Block forever, sending `ping_frame` over `socket` to all the entries in +/// `seeds`. +/// +/// This method immediately pings all the seeds, and then pings periodically at +/// [`SEED_PING_INTERVAL`]. +pub(super) async fn seed_ping_task( + seeds: Arc<[Seed]>, + socket: Arc, + ping_frame: Vec, +) { + let mut interval = tokio::time::interval(SEED_PING_INTERVAL); + + // Do not burden seeds with faster PING frames to catch up this timer. + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + // Start the ping loop, with the first iteration starting immediately. + loop { + interval.tick().await; + + let bytes_sent = seeds + .iter() + .map(|seed| async { + if let Some(addr) = seed.resolve().await { + ping(&ping_frame, &socket, addr).await + } else { + 0 + } + }) + .collect::>() + .fold(0, |acc, x| future::ready(acc + x)) + .await; + + debug!(bytes_sent, "seed ping sweep complete"); + } +} diff --git a/gossip/tests/smoke.rs b/gossip/tests/smoke.rs new file mode 100644 index 0000000000..554c282898 --- /dev/null +++ b/gossip/tests/smoke.rs @@ -0,0 +1,70 @@ +use std::time::Duration; + +use test_helpers::{maybe_start_logging, timeout::FutureTimeout}; +use tokio::{net::UdpSocket, sync::mpsc}; + +use gossip::*; + +/// Assert that starting up a reactor performs the initial peer discovery +/// from a set of seeds, resulting in both peers known of one another. +#[tokio::test] +async fn test_payload_exchange() { + maybe_start_logging(); + + // How long to wait for peer discovery to complete. + const TIMEOUT: Duration = Duration::from_secs(5); + + // Bind a UDP socket to a random port + let a_socket = UdpSocket::bind("127.0.0.1:0") + .await + .expect("failed to bind UDP socket"); + let a_addr = a_socket.local_addr().expect("failed to read local addr"); + + // And a socket for the second reactor + let b_socket = UdpSocket::bind("127.0.0.1:0") + .await + .expect("failed to bind UDP socket"); + let b_addr = b_socket.local_addr().expect("failed to read local addr"); + + // Initialise the dispatchers for the reactors + let (a_tx, mut a_rx) = mpsc::channel(5); + let (b_tx, mut b_rx) = mpsc::channel(5); + + // Initialise both reactors + let addrs = dbg!(vec![a_addr.to_string(), b_addr.to_string()]); + let a = Builder::new(addrs.clone(), a_tx).build(a_socket); + let b = Builder::new(addrs, b_tx).build(b_socket); + + // Wait for peer discovery to occur + async { + loop { + if a.get_peers().await.len() == 1 { + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + // Send the payload through peer A + let a_payload = Bytes::from_static(b"bananas"); + a.broadcast(a_payload.clone()).await; + + // Assert it was received by peer B + let got = b_rx + .recv() + .with_timeout_panic(TIMEOUT) + .await + .expect("reactor stopped"); + assert_eq!(got, a_payload); + + // Do the reverse - send from B to A + let b_payload = Bytes::from_static(b"platanos"); + b.broadcast(b_payload.clone()).await; + let got = a_rx + .recv() + .with_timeout_panic(TIMEOUT) + .await + .expect("reactor stopped"); + assert_eq!(got, b_payload); +} From 93789d7abb3ac4cf568cd7e662ad7edc82ec21da Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 14 Jun 2023 01:13:31 +0200 Subject: [PATCH 06/17] feat: refuse oversized gossip payloads Calculate the available byte size for a user payload sent via gossip, and pro-actively check this limit earlier, when the caller is attempting to send the frame, rather than later in the reactor where there's no feedback to the caller. DRY frame serialisation to simplify enforcement, and validate/refuse oversized frames in the reactor so that frames are unlikely to be truncated by receivers. --- Cargo.lock | 1 + gossip/Cargo.toml | 1 + gossip/src/handle.rs | 24 ++++++--- gossip/src/lib.rs | 41 +++++++++++++-- gossip/src/peers.rs | 25 ++++++--- gossip/src/reactor.rs | 120 ++++++++++++++++++++++++++++++++++-------- gossip/tests/smoke.rs | 4 +- 7 files changed, 178 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7197270c6d..6cc4a061ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2010,6 +2010,7 @@ dependencies = [ "prost", "prost-build", "test_helpers", + "thiserror", "tokio", "tracing", "uuid", diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 14b70b9f5a..8000dd32f6 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -8,6 +8,7 @@ async-trait = "0.1.68" futures = "0.3.28" hashbrown.workspace = true prost = "0.11.9" +thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["net", "io-util", "time", "rt", "sync", "macros"] } tracing = "0.1.37" uuid = { version = "1.3.3", features = ["v4"] } diff --git a/gossip/src/handle.rs b/gossip/src/handle.rs index 075966a35c..6a81acdea4 100644 --- a/gossip/src/handle.rs +++ b/gossip/src/handle.rs @@ -1,9 +1,17 @@ -use crate::Bytes; +use crate::{Bytes, MAX_USER_PAYLOAD_BYTES}; +use thiserror::Error; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; use crate::peers::Identity; +/// An error indicating a send was attempted with a payload that exceeds +/// [`MAX_USER_PAYLOAD_BYTES`]. +#[derive(Error, Debug)] +#[error("max allowed payload size exceeded")] +#[allow(missing_copy_implementations)] +pub struct PayloadSizeError {} + /// Requests sent to the [`Reactor`] actor task. /// /// [`Reactor`]: crate::reactor::Reactor @@ -42,14 +50,18 @@ impl GossipHandle { /// /// This is a best-effort operation - peers are not guaranteed to receive /// this broadcast. - pub async fn broadcast(&self, payload: T) + pub async fn broadcast(&self, payload: T) -> Result<(), PayloadSizeError> where T: Into + Send, { - self.tx - .send(Request::Broadcast(payload.into())) - .await - .unwrap() + let payload = payload.into(); + if payload.len() > MAX_USER_PAYLOAD_BYTES { + return Err(PayloadSizeError {}); + } + + self.tx.send(Request::Broadcast(payload)).await.unwrap(); + + Ok(()) } /// Retrieve a snapshot of the connected peer list. diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 30a4994c22..c52e50e7e3 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -8,9 +8,9 @@ //! //! This implementation sends unicast UDP frames between peers, with support for //! both control frames & user payloads. The maximum message size is 65,507 -//! bytes, but a packet this large is fragmented into smaller (at most -//! MTU-sized) packets and is at greater risk of being dropped due to a lost -//! fragment. +//! bytes ([`MAX_USER_PAYLOAD_BYTES`] for application-level payloads), but a +//! packet this large is fragmented into smaller (at most MTU-sized) packets and +//! is at greater risk of being dropped due to a lost fragment. //! //! # Security //! @@ -59,3 +59,38 @@ const RESOLVE_TIMEOUT: Duration = Duration::from_secs(5); /// Defines the interval between PING frames sent to all configured seed peers. const SEED_PING_INTERVAL: std::time::Duration = Duration::from_secs(15); + +/// The maximum payload size allowed. +/// +/// Attempting to send a serialised packet (inclusive of control frames/fields) +/// in excess of this amount will result in an error. +const MAX_FRAME_BYTES: usize = 1024 * 10; + +/// The frame header overhead for user payloads. +const USER_PAYLOAD_OVERHEAD: usize = 22; + +/// The maximum allowed byte size of user payloads. +/// +/// Sending payloads of this size is discouraged as it leads to fragmentation of +/// the message and increases the chance of the message being undelivered / +/// dropped. Smaller is always better for UDP transports! +pub const MAX_USER_PAYLOAD_BYTES: usize = MAX_FRAME_BYTES - USER_PAYLOAD_OVERHEAD; + +#[cfg(test)] +#[allow(clippy::assertions_on_constants)] +mod tests { + use super::*; + + #[test] + fn test_max_msg_size() { + assert!(MAX_FRAME_BYTES < 65_536, "cannot exceed UDP maximum"); + } + + #[test] + fn test_max_user_payload_size() { + assert_eq!( + MAX_USER_PAYLOAD_BYTES, 10_218, + "applications may depend on this value not changing" + ); + } +} diff --git a/gossip/src/peers.rs b/gossip/src/peers.rs index a40d3bb3eb..6ab7f61607 100644 --- a/gossip/src/peers.rs +++ b/gossip/src/peers.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{io, net::SocketAddr}; use futures::{stream::FuturesUnordered, StreamExt}; use hashbrown::{hash_map::RawEntryMut, HashMap}; @@ -7,6 +7,8 @@ use tokio::net::UdpSocket; use tracing::{trace, warn}; use uuid::Uuid; +use crate::MAX_FRAME_BYTES; + /// A unique generated identity containing 128 bits of randomness (V4 UUID). #[derive(Debug, Eq, Clone)] pub(crate) struct Identity(Bytes, Uuid); @@ -67,11 +69,22 @@ pub(crate) struct Peer { } impl Peer { - pub(crate) async fn send( - &self, - buf: &[u8], - socket: &UdpSocket, - ) -> Result { + pub(crate) async fn send(&self, buf: &[u8], socket: &UdpSocket) -> Result { + // If the frame is larger than the allowed maximum, then the receiver + // will truncate the frame when reading the socket. + // + // Never send frames that will be unprocessable. + if buf.len() > MAX_FRAME_BYTES { + warn!( + n_bytes = buf.len(), + "not sending oversized packet - receiver would truncate" + ); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "max frame size exceeded", + )); + } + let ret = socket.send_to(buf, self.addr).await; match &ret { Ok(n_bytes) => { diff --git a/gossip/src/reactor.rs b/gossip/src/reactor.rs index f70239745e..9058398cfb 100644 --- a/gossip/src/reactor.rs +++ b/gossip/src/reactor.rs @@ -11,7 +11,7 @@ use crate::{ peers::{Identity, PeerList}, proto::{self, frame_message::Payload, FrameMessage}, seed::{seed_ping_task, Seed}, - Dispatcher, Request, + Dispatcher, Request, MAX_FRAME_BYTES, }; #[derive(Debug)] @@ -31,6 +31,8 @@ enum Error { }, Io(std::io::Error), + + MaxSize(usize), } impl From for Error { @@ -95,6 +97,7 @@ where let seed_list = seed_list.into_iter().map(Seed::new).collect(); let socket = Arc::new(socket); + let mut serialisation_buf = Vec::with_capacity(1024); // Generate a pre-populated frame header. let mut cached_frame = proto::Frame { @@ -108,8 +111,10 @@ where populate_frame( &mut cached_frame, vec![new_payload(Payload::Ping(proto::Ping {}))], - ); - cached_frame.encode_to_vec() + &mut serialisation_buf, + ) + .unwrap(); + serialisation_buf.clone() }; // Spawn a task that periodically pings all known seeds. @@ -127,7 +132,7 @@ where dispatch, identity, cached_frame, - serialisation_buf: Vec::with_capacity(1024), + serialisation_buf, peer_list: PeerList::with_capacity(seed_list.len()), seed_list, _seed_ping_task: seed_ping_task, @@ -163,6 +168,10 @@ where error!(%error, "i/o error"); continue; } + Err(Error::MaxSize(_)) => { + // Logged at source + continue; + } } } op = rx.recv() => { @@ -176,12 +185,17 @@ where (0, 0) }, Some(Request::Broadcast(payload)) => { - populate_frame(&mut self.cached_frame, [ - new_payload(Payload::UserData(proto::UserPayload{payload})), - ]); - self.cached_frame - .encode(&mut self.serialisation_buf) - .expect("buffer should grow"); + // The user is guaranteed MAX_USER_PAYLOAD_BYTES to + // be send-able, so send this frame without packing + // others with it for simplicity. + if populate_frame( + &mut self.cached_frame, + vec![new_payload(Payload::UserData(proto::UserPayload{payload}))], + &mut self.serialisation_buf + ).is_err() + { + continue + } let bytes_sent = self.peer_list.broadcast(&self.serialisation_buf, &self.socket).await; (0, bytes_sent) } @@ -253,12 +267,12 @@ where return Ok((bytes_read, 0)); } - populate_frame(&mut self.cached_frame, out_messages); - // Serialise the frame into the serialisation buffer. - self.cached_frame - .encode(&mut self.serialisation_buf) - .expect("buffer should grow"); + populate_frame( + &mut self.cached_frame, + out_messages, + &mut self.serialisation_buf, + )?; let bytes_sent = peer.send(&self.serialisation_buf, &self.socket).await?; @@ -282,8 +296,12 @@ async fn recv(socket: &UdpSocket, buf: &mut BytesMut) -> (usize, SocketAddr) { /// /// Clears the contents of `buf` before reading the frame. async fn read_frame(socket: &UdpSocket) -> Result<(usize, proto::Frame, SocketAddr), Error> { - // Allocate a buffer to hold a full-sized packet. - let mut buf = BytesMut::with_capacity(1024); + // Pre-allocate a buffer large enough to hold the maximum message size. + // + // Reading data from a UDP socket silently truncates if there's not enough + // buffer space to write the full packet payload (tokio doesn't support + // MSG_TRUNC-like flags on reads). + let mut buf = BytesMut::with_capacity(MAX_FRAME_BYTES); let (n_bytes, addr) = recv(socket, &mut buf).await; @@ -298,10 +316,33 @@ async fn read_frame(socket: &UdpSocket) -> Result<(usize, proto::Frame, SocketAd } /// Given a pre-allocated `frame`, clear and populate it with the provided -/// `payload` containing a set of [`FrameMessage`]. -fn populate_frame(frame: &mut proto::Frame, payload: impl IntoIterator) { - frame.messages.clear(); - frame.messages.extend(payload.into_iter()) +/// `payload` containing a set of [`FrameMessage`], serialising it to `buf`. +fn populate_frame( + frame: &mut proto::Frame, + payload: Vec, + buf: &mut Vec, +) -> Result<(), Error> { + frame.messages = payload; + + // Reading data from a UDP socket silently truncates if there's not enough + // buffer space to write the full packet payload. This library will + // pre-allocate a buffer of this size to read packets into, therefore all + // messages must be shorter than this value. + if frame.encoded_len() > MAX_FRAME_BYTES { + error!( + n_bytes=buf.len(), + n_max=%MAX_FRAME_BYTES, + "attempted to send frame larger than configured maximum" + ); + return Err(Error::MaxSize(buf.len())); + } + + buf.clear(); + frame.encode(buf).expect("buffer should grow"); + + debug_assert!(proto::Frame::decode(crate::Bytes::from(buf.clone())).is_ok()); + + Ok(()) } /// Instantiate a new [`FrameMessage`] from the given [`Payload`]. @@ -326,3 +367,40 @@ pub(crate) async fn ping(ping_frame: &[u8], socket: &UdpSocket, addr: SocketAddr } } } + +#[cfg(test)] +mod tests { + use crate::{MAX_USER_PAYLOAD_BYTES, USER_PAYLOAD_OVERHEAD}; + + use super::*; + + #[test] + fn test_user_frame_overhead() { + let identity = Identity::new(); + + // Generate a pre-populated frame header. + let mut frame = proto::Frame { + identity: identity.as_bytes().clone(), + messages: vec![], + }; + + let mut buf = Vec::new(); + populate_frame( + &mut frame, + vec![new_payload(Payload::UserData(proto::UserPayload { + payload: crate::Bytes::new(), // Empty/0-sized + }))], + &mut buf, + ) + .unwrap(); + + // The proto type should self-report the same size. + assert_eq!(buf.len(), frame.encoded_len()); + + // The overhead const should be accurate + assert_eq!(buf.len(), USER_PAYLOAD_OVERHEAD); + + // The max user payload size should be accurate. + assert_eq!(MAX_FRAME_BYTES - buf.len(), MAX_USER_PAYLOAD_BYTES); + } +} diff --git a/gossip/tests/smoke.rs b/gossip/tests/smoke.rs index 554c282898..c46be5f343 100644 --- a/gossip/tests/smoke.rs +++ b/gossip/tests/smoke.rs @@ -48,7 +48,7 @@ async fn test_payload_exchange() { // Send the payload through peer A let a_payload = Bytes::from_static(b"bananas"); - a.broadcast(a_payload.clone()).await; + a.broadcast(a_payload.clone()).await.unwrap(); // Assert it was received by peer B let got = b_rx @@ -60,7 +60,7 @@ async fn test_payload_exchange() { // Do the reverse - send from B to A let b_payload = Bytes::from_static(b"platanos"); - b.broadcast(b_payload.clone()).await; + b.broadcast(b_payload.clone()).await.unwrap(); let got = a_rx .recv() .with_timeout_panic(TIMEOUT) From 48466bfa89491969f6d0571c834e3317fb6286a1 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 14 Jun 2023 15:12:50 +0200 Subject: [PATCH 07/17] feat(metrics): bytes/frames sent/received & peers Emit metrics tracking the number of bytes sent / received, and number of frames sent / received by the local node. Track the number of discovered peers to record peer discovery rate and current number of known peers per node. --- Cargo.lock | 1 + gossip/Cargo.toml | 1 + gossip/src/builder.rs | 8 ++++-- gossip/src/lib.rs | 1 + gossip/src/metric.rs | 60 +++++++++++++++++++++++++++++++++++++++ gossip/src/peers.rs | 40 ++++++++++++++++++++++---- gossip/src/reactor.rs | 66 ++++++++++++++++++++++++++++++++++--------- gossip/src/seed.rs | 10 +++++-- gossip/tests/smoke.rs | 8 ++++-- 9 files changed, 170 insertions(+), 25 deletions(-) create mode 100644 gossip/src/metric.rs diff --git a/Cargo.lock b/Cargo.lock index 6cc4a061ae..7a2b7b364a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2007,6 +2007,7 @@ dependencies = [ "async-trait", "futures", "hashbrown 0.14.0", + "metric", "prost", "prost-build", "test_helpers", diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 8000dd32f6..36110ca420 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" async-trait = "0.1.68" futures = "0.3.28" hashbrown.workspace = true +metric = { version = "0.1.0", path = "../metric" } prost = "0.11.9" thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["net", "io-util", "time", "rt", "sync", "macros"] } diff --git a/gossip/src/builder.rs b/gossip/src/builder.rs index 503c8e42ad..35d7ebab83 100644 --- a/gossip/src/builder.rs +++ b/gossip/src/builder.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use tokio::{ net::{ToSocketAddrs, UdpSocket}, sync::mpsc, @@ -10,6 +12,7 @@ use crate::{handle::GossipHandle, reactor::Reactor, Dispatcher}; pub struct Builder { seed_addrs: Vec, dispatcher: T, + metric: Arc, } impl Builder { @@ -18,10 +21,11 @@ impl Builder { /// /// Each address in `seed_addrs` is re-resolved periodically and the first /// resolved IP address is used for peer communication. - pub fn new(seed_addrs: Vec, dispatcher: T) -> Self { + pub fn new(seed_addrs: Vec, dispatcher: T, metric: Arc) -> Self { Self { seed_addrs, dispatcher, + metric, } } } @@ -41,7 +45,7 @@ where let (tx, rx) = mpsc::channel(1000); // Initialise the reactor - let reactor = Reactor::new(self.seed_addrs, socket, self.dispatcher); + let reactor = Reactor::new(self.seed_addrs, socket, self.dispatcher, &self.metric); let identity = reactor.identity().clone(); // Start the message reactor. diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index c52e50e7e3..66ecad726f 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -38,6 +38,7 @@ mod builder; mod dispatcher; mod handle; +mod metric; mod peers; mod proto; mod reactor; diff --git a/gossip/src/metric.rs b/gossip/src/metric.rs new file mode 100644 index 0000000000..c7ae249458 --- /dev/null +++ b/gossip/src/metric.rs @@ -0,0 +1,60 @@ +//! Metric newtype wrappers for type safety. +//! +//! The metrics are easily confused (they're all counters) so have the compiler +//! check the right ones are being used in the right places. + +use metric::U64Counter; + +#[derive(Debug, Clone)] +pub(crate) struct SentFrames(metric::U64Counter); + +impl SentFrames { + pub(crate) fn inc(&self, v: usize) { + self.0.inc(v as u64) + } +} + +#[derive(Debug)] +pub(crate) struct ReceivedFrames(metric::U64Counter); + +impl ReceivedFrames { + pub(crate) fn inc(&self, v: usize) { + self.0.inc(v as u64) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct SentBytes(metric::U64Counter); + +impl SentBytes { + pub(crate) fn inc(&self, v: usize) { + self.0.inc(v as u64) + } +} + +#[derive(Debug)] +pub(crate) struct ReceivedBytes(metric::U64Counter); + +impl ReceivedBytes { + pub(crate) fn inc(&self, v: usize) { + self.0.inc(v as u64) + } +} + +pub(crate) fn new_metrics( + metrics: &metric::Registry, +) -> (SentFrames, ReceivedFrames, SentBytes, ReceivedBytes) { + let metric_frames = metrics.register_metric::( + "gossip_frames", + "number of frames sent/received by this node", + ); + let metric_bytes = metrics + .register_metric::("gossip_bytes", "sum of bytes sent/received by this node"); + + ( + SentFrames(metric_frames.recorder(&[("direction", "sent")])), + ReceivedFrames(metric_frames.recorder(&[("direction", "received")])), + SentBytes(metric_bytes.recorder(&[("direction", "sent")])), + ReceivedBytes(metric_bytes.recorder(&[("direction", "received")])), + ) +} diff --git a/gossip/src/peers.rs b/gossip/src/peers.rs index 6ab7f61607..f4531af87b 100644 --- a/gossip/src/peers.rs +++ b/gossip/src/peers.rs @@ -2,12 +2,16 @@ use std::{io, net::SocketAddr}; use futures::{stream::FuturesUnordered, StreamExt}; use hashbrown::{hash_map::RawEntryMut, HashMap}; +use metric::U64Counter; use prost::bytes::Bytes; use tokio::net::UdpSocket; use tracing::{trace, warn}; use uuid::Uuid; -use crate::MAX_FRAME_BYTES; +use crate::{ + metric::{SentBytes, SentFrames}, + MAX_FRAME_BYTES, +}; /// A unique generated identity containing 128 bits of randomness (V4 UUID). #[derive(Debug, Eq, Clone)] @@ -69,7 +73,13 @@ pub(crate) struct Peer { } impl Peer { - pub(crate) async fn send(&self, buf: &[u8], socket: &UdpSocket) -> Result { + pub(crate) async fn send( + &self, + buf: &[u8], + socket: &UdpSocket, + frames_sent: &SentFrames, + bytes_sent: &SentBytes, + ) -> Result { // If the frame is larger than the allowed maximum, then the receiver // will truncate the frame when reading the socket. // @@ -88,6 +98,8 @@ impl Peer { let ret = socket.send_to(buf, self.addr).await; match &ret { Ok(n_bytes) => { + frames_sent.inc(1); + bytes_sent.inc(*n_bytes); trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "send frame") } Err(e) => { @@ -102,14 +114,25 @@ impl Peer { #[derive(Debug, Default)] pub(crate) struct PeerList { list: HashMap, + + /// The number of known, believed-to-be-healthy peers. + metric_peer_count: metric::U64Counter, } impl PeerList { /// Initialise the [`PeerList`] with capacity for `cap` number of [`Peer`] /// instances. - pub(crate) fn with_capacity(cap: usize) -> Self { + pub(crate) fn with_capacity(cap: usize, metrics: &metric::Registry) -> Self { + let metric_peer_count = metrics + .register_metric::( + "gossip_known_peers", + "number of likely healthy peers known to this node", + ) + .recorder(&[]); + Self { list: HashMap::with_capacity(cap), + metric_peer_count, } } @@ -123,6 +146,7 @@ impl PeerList { pub(crate) fn upsert(&mut self, identity: &Identity, peer_addr: SocketAddr) -> &mut Peer { let p = match self.list.raw_entry_mut().from_key(identity) { RawEntryMut::Vacant(v) => { + self.metric_peer_count.inc(1); v.insert( identity.to_owned(), Peer { @@ -141,10 +165,16 @@ impl PeerList { /// Broadcast `buf` to all known peers over `socket`, returning the number /// of bytes sent in total. - pub(crate) async fn broadcast(&self, buf: &[u8], socket: &UdpSocket) -> usize { + pub(crate) async fn broadcast( + &self, + buf: &[u8], + socket: &UdpSocket, + frames_sent: &SentFrames, + bytes_sent: &SentBytes, + ) -> usize { self.list .values() - .map(|v| v.send(buf, socket)) + .map(|v| v.send(buf, socket, frames_sent, bytes_sent)) .collect::>() .fold(0, |acc, res| async move { match res { diff --git a/gossip/src/reactor.rs b/gossip/src/reactor.rs index 9058398cfb..58c771b4ef 100644 --- a/gossip/src/reactor.rs +++ b/gossip/src/reactor.rs @@ -8,6 +8,7 @@ use tokio::{ use tracing::{debug, error, info, trace, warn}; use crate::{ + metric::*, peers::{Identity, PeerList}, proto::{self, frame_message::Payload, FrameMessage}, seed::{seed_ping_task, Seed}, @@ -83,14 +84,28 @@ pub(crate) struct Reactor { /// contain less peers than the number of initial seeds. peer_list: PeerList, + /// The UDP socket used for communication with peers. socket: Arc, + + /// The count of frames sent and received. + metric_frames_sent: SentFrames, + metric_frames_received: ReceivedFrames, + + /// The sum of bytes sent and received. + metric_bytes_sent: SentBytes, + metric_bytes_received: ReceivedBytes, } impl Reactor where T: Dispatcher, { - pub(crate) fn new(seed_list: Vec, socket: UdpSocket, dispatch: T) -> Self { + pub(crate) fn new( + seed_list: Vec, + socket: UdpSocket, + dispatch: T, + metrics: &metric::Registry, + ) -> Self { // Generate a unique UUID for this Reactor instance, and cache the wire // representation. let identity = Identity::new(); @@ -117,6 +132,11 @@ where serialisation_buf.clone() }; + // Initialise the various metrics with wrappers to help distinguish + // between the (very similar) counters. + let (metric_frames_sent, metric_frames_received, metric_bytes_sent, metric_bytes_received) = + new_metrics(metrics); + // Spawn a task that periodically pings all known seeds. // // Pinging all seeds announces this node as alive, propagating the @@ -126,6 +146,8 @@ where Arc::clone(&seed_list), Arc::clone(&socket), cached_ping_frame, + metric_frames_sent.clone(), + metric_bytes_sent.clone(), ))); Self { @@ -133,10 +155,14 @@ where identity, cached_frame, serialisation_buf, - peer_list: PeerList::with_capacity(seed_list.len()), + peer_list: PeerList::with_capacity(seed_list.len(), metrics), seed_list, _seed_ping_task: seed_ping_task, socket, + metric_frames_sent, + metric_frames_received, + metric_bytes_sent, + metric_bytes_received, } } @@ -148,10 +174,10 @@ where ); loop { - let (_bytes_read, _bytes_sent) = tokio::select! { + tokio::select! { msg = self.read() => { match msg { - Ok((bytes_read, bytes_sent)) => (bytes_read, bytes_sent), + Ok(()) => {}, Err(Error::NoPayload { peer, addr }) => { warn!(%peer, %addr, "message contains no payload"); continue; @@ -182,7 +208,6 @@ where } Some(Request::GetPeers(tx)) => { let _ = tx.send(self.peer_list.peer_uuids()); - (0, 0) }, Some(Request::Broadcast(payload)) => { // The user is guaranteed MAX_USER_PAYLOAD_BYTES to @@ -196,8 +221,7 @@ where { continue } - let bytes_sent = self.peer_list.broadcast(&self.serialisation_buf, &self.socket).await; - (0, bytes_sent) + self.peer_list.broadcast(&self.serialisation_buf, &self.socket, &self.metric_frames_sent, &self.metric_bytes_sent).await; } } } @@ -212,9 +236,11 @@ where /// returns the result to the sender of the original frame. /// /// Returns the bytes read and bytes sent during execution of this method. - async fn read(&mut self) -> Result<(usize, usize), Error> { + async fn read(&mut self) -> Result<(), Error> { // Read a frame into buf. let (bytes_read, frame, peer_addr) = read_frame(&self.socket).await?; + self.metric_frames_received.inc(1); + self.metric_bytes_received.inc(bytes_read as _); // Read the peer identity from the frame let identity = @@ -228,7 +254,7 @@ where // this node will not be added to the active peer list. if identity == self.identity { debug!(%identity, %peer_addr, bytes_read, "dropping frame from self"); - return Ok((bytes_read, 0)); + return Ok(()); } // Find or create the peer in the peer list. @@ -264,7 +290,7 @@ where // Sometimes no message will be returned to the peer - there's no need // to send an empty frame. if out_messages.is_empty() { - return Ok((bytes_read, 0)); + return Ok(()); } // Serialise the frame into the serialisation buffer. @@ -274,9 +300,15 @@ where &mut self.serialisation_buf, )?; - let bytes_sent = peer.send(&self.serialisation_buf, &self.socket).await?; + peer.send( + &self.serialisation_buf, + &self.socket, + &self.metric_frames_sent, + &self.metric_bytes_sent, + ) + .await?; - Ok((bytes_read, bytes_sent)) + Ok(()) } /// Return the randomised identity assigned to this instance. @@ -351,10 +383,18 @@ fn new_payload(p: Payload) -> proto::FrameMessage { } /// Send a PING message to `socket`, using `peer_name` as logging context. -pub(crate) async fn ping(ping_frame: &[u8], socket: &UdpSocket, addr: SocketAddr) -> usize { +pub(crate) async fn ping( + ping_frame: &[u8], + socket: &UdpSocket, + addr: SocketAddr, + sent_frames: &SentFrames, + sent_bytes: &SentBytes, +) -> usize { match socket.send_to(ping_frame, &addr).await { Ok(n_bytes) => { debug!(addr = %addr, "ping"); + sent_frames.inc(1); + sent_bytes.inc(n_bytes); n_bytes } Err(e) => { diff --git a/gossip/src/seed.rs b/gossip/src/seed.rs index 843a8960ba..97df4f245c 100644 --- a/gossip/src/seed.rs +++ b/gossip/src/seed.rs @@ -7,7 +7,11 @@ use tokio::{ }; use tracing::{debug, warn}; -use crate::{reactor::ping, RESOLVE_TIMEOUT, SEED_PING_INTERVAL}; +use crate::{ + metric::{SentBytes, SentFrames}, + reactor::ping, + RESOLVE_TIMEOUT, SEED_PING_INTERVAL, +}; /// The user-provided seed peer address. /// @@ -63,6 +67,8 @@ pub(super) async fn seed_ping_task( seeds: Arc<[Seed]>, socket: Arc, ping_frame: Vec, + sent_frames: SentFrames, + sent_bytes: SentBytes, ) { let mut interval = tokio::time::interval(SEED_PING_INTERVAL); @@ -77,7 +83,7 @@ pub(super) async fn seed_ping_task( .iter() .map(|seed| async { if let Some(addr) = seed.resolve().await { - ping(&ping_frame, &socket, addr).await + ping(&ping_frame, &socket, addr, &sent_frames, &sent_bytes).await } else { 0 } diff --git a/gossip/tests/smoke.rs b/gossip/tests/smoke.rs index c46be5f343..16ff06c469 100644 --- a/gossip/tests/smoke.rs +++ b/gossip/tests/smoke.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use test_helpers::{maybe_start_logging, timeout::FutureTimeout}; use tokio::{net::UdpSocket, sync::mpsc}; @@ -11,6 +11,8 @@ use gossip::*; async fn test_payload_exchange() { maybe_start_logging(); + let metrics = Arc::new(metric::Registry::default()); + // How long to wait for peer discovery to complete. const TIMEOUT: Duration = Duration::from_secs(5); @@ -32,8 +34,8 @@ async fn test_payload_exchange() { // Initialise both reactors let addrs = dbg!(vec![a_addr.to_string(), b_addr.to_string()]); - let a = Builder::new(addrs.clone(), a_tx).build(a_socket); - let b = Builder::new(addrs, b_tx).build(b_socket); + let a = Builder::new(addrs.clone(), a_tx, Arc::clone(&metrics)).build(a_socket); + let b = Builder::new(addrs, b_tx, Arc::clone(&metrics)).build(b_socket); // Wait for peer discovery to occur async { From 58c48748800c3bb86473279a41ba5f3447e93f41 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 22 Jun 2023 17:11:26 +0200 Subject: [PATCH 08/17] chore: workspace_hack support Add workspace_hack and whitelist the import. --- Cargo.lock | 1 + gossip/Cargo.toml | 1 + gossip/src/lib.rs | 1 + 3 files changed, 3 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 7a2b7b364a..8483a15a65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2015,6 +2015,7 @@ dependencies = [ "tokio", "tracing", "uuid", + "workspace-hack", ] [[package]] diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 36110ca420..aa19f7a46c 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -13,6 +13,7 @@ thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["net", "io-util", "time", "rt", "sync", "macros"] } tracing = "0.1.37" uuid = { version = "1.3.3", features = ["v4"] } +workspace-hack = { version = "0.1", path = "../workspace-hack" } [build-dependencies] prost-build = "0.11.9" diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 66ecad726f..ab3aaf797a 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -49,6 +49,7 @@ use std::time::Duration; /// Work around the unused_crate_dependencies false positives for test deps. #[cfg(test)] use test_helpers as _; +use workspace_hack as _; pub use builder::*; pub use dispatcher::*; From 7880f9287f21104d454f59d1d247941ef4b786e6 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 22 Jun 2023 17:13:31 +0200 Subject: [PATCH 09/17] chore: add license --- gossip/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index aa19f7a46c..05c97e7d5f 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -2,6 +2,7 @@ name = "gossip" version = "0.1.0" edition = "2021" +license.workspace = true [dependencies] async-trait = "0.1.68" From 118aefe2d20bcd465ed4170bf0af6eec2e3ad0d4 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 10 Jul 2023 13:39:52 +0200 Subject: [PATCH 10/17] chore: use workspace crate config Inherit version/authors/edition from the workspace. --- gossip/Cargo.toml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 05c97e7d5f..ed67bd9a7f 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "gossip" -version = "0.1.0" -edition = "2021" +version.workspace = true +authors.workspace = true +edition.workspace = true license.workspace = true [dependencies] From bee1b45c1391c1f71ffc1063dcff3d5778943ed1 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 10 Jul 2023 13:48:01 +0200 Subject: [PATCH 11/17] build: reuse path var DRY the path var. --- gossip/build.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/gossip/build.rs b/gossip/build.rs index 8207b46030..313904759b 100644 --- a/gossip/build.rs +++ b/gossip/build.rs @@ -4,15 +4,13 @@ use prost_build::Config; fn main() -> Result<(), Box> { let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("proto"); + let proto = root.join("gossip.proto"); - println!( - "cargo:rerun-if-changed={}", - root.join("gossip.proto").display() - ); + println!("cargo:rerun-if-changed={}", proto.display()); Config::new() .bytes(["."]) - .compile_protos(&[root.join("gossip.proto")], &[root])?; + .compile_protos(&[proto], &[root])?; Ok(()) } From 991692d2fb0153c732d79dbdeb076f90571469d4 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 10 Jul 2023 13:51:40 +0200 Subject: [PATCH 12/17] refactor: short/long panic message --- gossip/src/peers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip/src/peers.rs b/gossip/src/peers.rs index f4531af87b..5e33f86620 100644 --- a/gossip/src/peers.rs +++ b/gossip/src/peers.rs @@ -213,7 +213,7 @@ mod tests { let v = Bytes::from_static(&[ 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, ]); - let _ = Identity::try_from(v).expect_err("short ID should fail"); + let _ = Identity::try_from(v).expect_err("long ID should fail"); } #[test] From 060f1b2ed6b83b985ebb423228c7cfe8d757e6a0 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 10 Jul 2023 14:01:11 +0200 Subject: [PATCH 13/17] docs: unwrap correctness docs Describe the possible reasons a socket recvfrom() would cause a panic. --- gossip/src/reactor.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/gossip/src/reactor.rs b/gossip/src/reactor.rs index 58c771b4ef..d547df6d6e 100644 --- a/gossip/src/reactor.rs +++ b/gossip/src/reactor.rs @@ -319,7 +319,13 @@ where /// Wait for a UDP datagram to become ready, and read it entirely into `buf`. async fn recv(socket: &UdpSocket, buf: &mut BytesMut) -> (usize, SocketAddr) { - let (n_bytes, addr) = socket.recv_buf_from(buf).await.expect("frame buffer maxed"); + let (n_bytes, addr) = socket + .recv_buf_from(buf) + .await + // These errors are libc's recvfrom() or converting the kernel-provided + // socket structure to rust's SocketAddr - neither should ever happen. + .expect("invalid recvfrom"); + trace!(%addr, n_bytes, "socket read"); (n_bytes, addr) } From 71625043e2b393eecc803e7c30fb3554c7a7881c Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 10 Jul 2023 14:02:57 +0200 Subject: [PATCH 14/17] test: remove dbg!() --- gossip/tests/smoke.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip/tests/smoke.rs b/gossip/tests/smoke.rs index 16ff06c469..d1a8136455 100644 --- a/gossip/tests/smoke.rs +++ b/gossip/tests/smoke.rs @@ -33,7 +33,7 @@ async fn test_payload_exchange() { let (b_tx, mut b_rx) = mpsc::channel(5); // Initialise both reactors - let addrs = dbg!(vec![a_addr.to_string(), b_addr.to_string()]); + let addrs = vec![a_addr.to_string(), b_addr.to_string()]; let a = Builder::new(addrs.clone(), a_tx, Arc::clone(&metrics)).build(a_socket); let b = Builder::new(addrs, b_tx, Arc::clone(&metrics)).build(b_socket); From a686580ffaa1cd3c861954058d06e14374d26edf Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 10 Jul 2023 14:03:57 +0200 Subject: [PATCH 15/17] test: multiple messages in single test This ensures various reused scratch buffers are wiped between uses. --- gossip/tests/smoke.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/gossip/tests/smoke.rs b/gossip/tests/smoke.rs index d1a8136455..efe4a83798 100644 --- a/gossip/tests/smoke.rs +++ b/gossip/tests/smoke.rs @@ -69,4 +69,15 @@ async fn test_payload_exchange() { .await .expect("reactor stopped"); assert_eq!(got, b_payload); + + // Send another payload through peer A (ensuring scratch buffers are + // correctly wiped, etc) + let a_payload = Bytes::from_static(b"platanos"); + a.broadcast(a_payload.clone()).await.unwrap(); + let got = b_rx + .recv() + .with_timeout_panic(TIMEOUT) + .await + .expect("reactor stopped"); + assert_eq!(got, a_payload); } From 701da1363cb2ba4250a434b49a253fbec38347c8 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 10 Jul 2023 14:10:03 +0200 Subject: [PATCH 16/17] refactor: remove panic on impossible error Remove the logical complexity of error handling for an error that cannot occur. This was an artifact of pre-PR refactoring - the error being returned SHOULD never be reached, as the only error returned is the "your message is too big" error, and that's not possible because the message size is validated in the GossipHandle::broadcast() method before it reaches the reactor. --- gossip/src/reactor.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/gossip/src/reactor.rs b/gossip/src/reactor.rs index d547df6d6e..990b8d3eda 100644 --- a/gossip/src/reactor.rs +++ b/gossip/src/reactor.rs @@ -213,15 +213,18 @@ where // The user is guaranteed MAX_USER_PAYLOAD_BYTES to // be send-able, so send this frame without packing // others with it for simplicity. - if populate_frame( + populate_frame( &mut self.cached_frame, vec![new_payload(Payload::UserData(proto::UserPayload{payload}))], &mut self.serialisation_buf - ).is_err() - { - continue - } - self.peer_list.broadcast(&self.serialisation_buf, &self.socket, &self.metric_frames_sent, &self.metric_bytes_sent).await; + ).expect("size validated in handle at enqueue time"); + + self.peer_list.broadcast( + &self.serialisation_buf, + &self.socket, + &self.metric_frames_sent, + &self.metric_bytes_sent + ).await; } } } From c2273e64880b3e1411c754fc64013bc21e8724b3 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 10 Jul 2023 14:27:08 +0200 Subject: [PATCH 17/17] docs: remove outdated comment --- gossip/src/reactor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip/src/reactor.rs b/gossip/src/reactor.rs index 990b8d3eda..6173863afc 100644 --- a/gossip/src/reactor.rs +++ b/gossip/src/reactor.rs @@ -391,7 +391,7 @@ fn new_payload(p: Payload) -> proto::FrameMessage { proto::FrameMessage { payload: Some(p) } } -/// Send a PING message to `socket`, using `peer_name` as logging context. +/// Send a PING message to `socket`. pub(crate) async fn ping( ping_frame: &[u8], socket: &UdpSocket,