diff --git a/gossip/proto/gossip.proto b/gossip/proto/gossip.proto index 5af2c4814e..c9b1260a3c 100644 --- a/gossip/proto/gossip.proto +++ b/gossip/proto/gossip.proto @@ -24,7 +24,27 @@ message FrameMessage { } message Ping {} -message Pong {} + +// A response to a PING, containg the set of peers known to the sender. +// +// A sequence of ping/pong frames acts as a peer-exchange mechanism between +// peers. +message Pong { + // A set of peers known to the sender. + // + // Some of these peers may already be unreachable, and the receiver should not + // add them to their peer list without validating liveness. + repeated Peer peers = 1; +} + +message Peer { + // A unique identifer (UUID) self-assigned by this peer as raw BE bytes. + bytes identity = 1; + + // A socket (IP & port) address in the form "ip:port", as discovered by the + // PONG sender. + string address = 2; +} // An application payload from the caller of the gossip library. message UserPayload { diff --git a/gossip/src/dispatcher.rs b/gossip/src/dispatcher.rs index b6aa951679..cfeb5ff640 100644 --- a/gossip/src/dispatcher.rs +++ b/gossip/src/dispatcher.rs @@ -26,9 +26,11 @@ impl Dispatcher for tokio::sync::mpsc::Sender { } } -// A no-op dispatcher. -#[cfg(test)] +/// A no-op [`Dispatcher`]. +#[derive(Debug, Default, Clone, Copy)] +pub struct NopDispatcher; + #[async_trait::async_trait] -impl Dispatcher for () { +impl Dispatcher for NopDispatcher { async fn dispatch(&self, _payload: crate::Bytes) {} } diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index ab3aaf797a..d7a2299480 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -1,13 +1,22 @@ -//! A work-in-progress, simple gossip primitive for metadata distribution +//! A simple gossip & broadcast primitive for best-effort metadata distribution //! between IOx nodes. //! +//! # Peers +//! +//! Peers are uniquely identified by their self-reported "identity" UUID. A +//! unique UUID is generated for each gossip instance, ensuring the identity +//! changes across restarts of the underlying node. +//! +//! An identity is associated with an immutable socket address used for peer +//! communication. +//! //! # Transport //! //! Prefer small payloads where possible, and expect loss of some messages - //! this primitive provides *best effort* delivery. //! //! This implementation sends unicast UDP frames between peers, with support for -//! both control frames & user payloads. The maximum message size is 65,507 +//! both control frames & user payloads. The maximum UDP message size is 65,507 //! bytes ([`MAX_USER_PAYLOAD_BYTES`] for application-level payloads), but a //! packet this large is fragmented into smaller (at most MTU-sized) packets and //! is at greater risk of being dropped due to a lost fragment. @@ -20,6 +29,80 @@ //! //! The security model of this implementation expects the peers to be running in //! a trusted environment, secure from malicious users. +//! +//! # Peer Exchange +//! +//! When a gossip instance is initialised, it advertises itself to the set of +//! user-provided "seed" peers - other gossip instances with fixed, known +//! addresses. The peer then bootstraps the peer list from these seed peers. +//! +//! Peers are discovered through PONG messages from peers, which contain the +//! list of peers the sender has successfully communicated with. +//! +//! On receipt of a PONG frame, a node will send PING frames to all newly +//! discovered peers without adding the peer to its local peer list. Once the +//! discovered peer responds with a PONG, the peer is added to the peer list. +//! This acts as a liveness check, ensuring a node only adds peers it can +//! communicate with to its peer list. +//! +//! ```text +//! ┌──────────┐ +//! │ Seed │ +//! └──────────┘ +//! ▲ │ +//! │ │ +//! (1) │ │ (2) +//! PING │ │ PONG +//! │ │ (contains Peer A) +//! │ ▼ +//! ┌──────────┐ +//! │ Local │ +//! └──────────┘ +//! ▲ │ +//! │ │ +//! (4) │ │ (3) +//! PONG │ │ PING +//! │ │ +//! │ ▼ +//! ┌──────────┐ +//! │ Peer A │ +//! └──────────┘ +//! ``` +//! +//! The above illustrates this process when the "local" node joins: +//! +//! 1. Send PING messages to all configured seeds +//! 2. Receive a PONG response containing the list of all known peers +//! 3. Send PING frames to all discovered peers - do not add to peer list +//! 4. Receive PONG frames from discovered peers - add to peer list +//! +//! The peer addresses sent during PEX rounds contain the advertised peer +//! identity and the socket address the PONG sender discovered. +//! +//! # Dead Peer Removal +//! +//! All peers are periodically sent a PING frame, and a per-peer counter is +//! incremented. If a message of any sort is received (including the PONG +//! response to the soliciting PING), the peer's counter is reset to 0. +//! +//! Once a peer's counter reaches [`MAX_PING_UNACKED`], indicating a number of +//! PINGs have been sent without receiving any response, the peer is removed +//! from the node's peer list. +//! +//! Dead peers age out of the cluster once all nodes perform the above routine. +//! If a peer dies, it is still sent in PONG messages as part of PEX until it is +//! removed from the sender's peer list, but the receiver of the PONG will not +//! add it to the node's peer list unless it successfully commutates, ensuring +//! dead peers are not propagated. +//! +//! Ageing out dead peers is strictly an optimisation (and not for correctness). +//! A dead peer consumes a tiny amount of RAM, but also will have frames +//! dispatched to it - over time, as the number of dead peers accumulates, this +//! would cause the number of UDP frames sent per broadcast to increase, +//! needlessly increasing gossip traffic. +//! +//! This process is heavily biased towards reliability/deliverability and is too +//! slow for use as a general peer health check. #![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)] #![warn( @@ -34,6 +117,7 @@ unused_crate_dependencies, missing_docs )] +#![allow(clippy::default_constructed_unit_structs)] mod builder; mod dispatcher; @@ -60,7 +144,13 @@ pub use handle::*; const RESOLVE_TIMEOUT: Duration = Duration::from_secs(5); /// Defines the interval between PING frames sent to all configured seed peers. -const SEED_PING_INTERVAL: std::time::Duration = Duration::from_secs(15); +const SEED_PING_INTERVAL: Duration = Duration::from_secs(60); + +/// How often a PING frame should be sent to a known peer. +/// +/// This value and [`MAX_PING_UNACKED`] defines the approximate duration of time +/// until a dead peer is removed from the peer list. +const PEER_PING_INTERVAL: Duration = Duration::from_secs(30); /// The maximum payload size allowed. /// @@ -78,6 +168,18 @@ const USER_PAYLOAD_OVERHEAD: usize = 22; /// dropped. Smaller is always better for UDP transports! pub const MAX_USER_PAYLOAD_BYTES: usize = MAX_FRAME_BYTES - USER_PAYLOAD_OVERHEAD; +/// The number of PING messages sent to a peer without a response (of any kind) +/// before the peer is considered dead and removed from the peer list. +/// +/// Increasing this value does not affect correctness - messages will be sent to +/// this peer for longer before being marked as dead, and the (small amount of) +/// RAM used by this peer will be held for longer. +/// +/// This value should be large so that the occasional dropped frame does not +/// cause a peer to be spuriously marked as "dead" - doing so would cause it to +/// miss broadcast frames until it is re-discovered. +const MAX_PING_UNACKED: usize = 10; + #[cfg(test)] #[allow(clippy::assertions_on_constants)] mod tests { diff --git a/gossip/src/peers.rs b/gossip/src/peers.rs index 5e33f86620..2ece063f0a 100644 --- a/gossip/src/peers.rs +++ b/gossip/src/peers.rs @@ -5,12 +5,12 @@ use hashbrown::{hash_map::RawEntryMut, HashMap}; use metric::U64Counter; use prost::bytes::Bytes; use tokio::net::UdpSocket; -use tracing::{trace, warn}; +use tracing::{info, trace, warn}; use uuid::Uuid; use crate::{ metric::{SentBytes, SentFrames}, - MAX_FRAME_BYTES, + MAX_FRAME_BYTES, MAX_PING_UNACKED, }; /// A unique generated identity containing 128 bits of randomness (V4 UUID). @@ -53,6 +53,15 @@ impl TryFrom for Identity { } } +impl TryFrom> for Identity { + type Error = uuid::Error; + + fn try_from(value: Vec) -> Result { + let uuid = Uuid::from_slice(&value)?; + Ok(Self(Bytes::from(value), uuid)) + } +} + impl Identity { /// Generate a new random identity. pub(crate) fn new() -> Self { @@ -70,6 +79,12 @@ impl Identity { pub(crate) struct Peer { identity: Identity, addr: SocketAddr, + + /// The number of PING messages sent to this peer since the last message + /// observed from it. + /// + /// This value is reset to 0 each time a message from this peer is received. + unacked_ping_count: usize, } impl Peer { @@ -100,7 +115,7 @@ impl Peer { Ok(n_bytes) => { frames_sent.inc(1); bytes_sent.inc(*n_bytes); - trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "send frame") + trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "socket write") } Err(e) => { warn!(error=%e, identity=%self.identity, peer_addr=%self.addr, "frame send error") @@ -108,6 +123,26 @@ impl Peer { } ret } + + /// Record that a PING has been sent to this peer, incrementing the unacked + /// PING count and returning the new value. + pub(crate) fn record_ping(&mut self) -> usize { + self.unacked_ping_count += 1; + self.unacked_ping_count + } + + pub(crate) fn mark_observed(&mut self) { + self.unacked_ping_count = 0; + } +} + +impl From<&Peer> for crate::proto::Peer { + fn from(p: &Peer) -> Self { + Self { + identity: p.identity.as_bytes().clone(), // Ref-count + address: p.addr.to_string(), + } + } } /// The set of currently active/known peers. @@ -115,24 +150,36 @@ impl Peer { pub(crate) struct PeerList { list: HashMap, - /// The number of known, believed-to-be-healthy peers. - metric_peer_count: metric::U64Counter, + /// The number of peers discovered. + metric_peer_discovered_count: metric::U64Counter, + /// The number of peers removed due to health checks. + metric_peer_removed_count: metric::U64Counter, + // The difference between these two metrics will always match the number of + // entries in the peer list (ignoring races & lack of thread + // synchronisation). } impl PeerList { /// Initialise the [`PeerList`] with capacity for `cap` number of [`Peer`] /// instances. pub(crate) fn with_capacity(cap: usize, metrics: &metric::Registry) -> Self { - let metric_peer_count = metrics + let metric_peer_discovered_count = metrics .register_metric::( - "gossip_known_peers", - "number of likely healthy peers known to this node", + "gossip_peers_discovered", + "number of peers discovered by this node", + ) + .recorder(&[]); + let metric_peer_removed_count = metrics + .register_metric::( + "gossip_peers_removed", + "number of peers removed due to health check failures", ) .recorder(&[]); Self { list: HashMap::with_capacity(cap), - metric_peer_count, + metric_peer_discovered_count, + metric_peer_removed_count, } } @@ -141,17 +188,29 @@ impl PeerList { self.list.keys().map(|v| **v).collect() } + /// Returns an iterator of all known peers in the peer list. + pub(crate) fn peers(&self) -> impl Iterator { + self.list.values() + } + + /// Returns true if `identity` is already in the peer list. + pub(crate) fn contains(&self, identity: &Identity) -> bool { + self.list.contains_key(identity) + } + /// Upsert a peer identified by `identity` to the peer list, associating it /// with the provided `peer_addr`. pub(crate) fn upsert(&mut self, identity: &Identity, peer_addr: SocketAddr) -> &mut Peer { let p = match self.list.raw_entry_mut().from_key(identity) { RawEntryMut::Vacant(v) => { - self.metric_peer_count.inc(1); + info!(%identity, %peer_addr, "discovered new peer"); + self.metric_peer_discovered_count.inc(1); v.insert( identity.to_owned(), Peer { addr: peer_addr, identity: identity.to_owned(), + unacked_ping_count: 0, }, ) .1 @@ -184,6 +243,36 @@ impl PeerList { }) .await } + + /// Send PING frames to all known nodes, and remove any that have not + /// responded after [`MAX_PING_UNACKED`] attempts. + pub(crate) async fn ping_gc( + &mut self, + ping_frame: &[u8], + socket: &UdpSocket, + frames_sent: &SentFrames, + bytes_sent: &SentBytes, + ) { + // First broadcast the PING frames. + // + // If a peer has exceeded the allowed maximum, it will be removed next, + // but if it responds to this PING, it will be re-added again. + self.broadcast(ping_frame, socket, frames_sent, bytes_sent) + .await; + + // Increment the PING counts and remove all peers that have not + // responded to more than the allowed number of PINGs. + let dead = self.list.extract_if(|_ident, p| { + // Increment the counter and grab the incremented value. + let missed = p.record_ping(); + missed > MAX_PING_UNACKED + }); + + for (identity, p) in dead { + warn!(%identity, addr=%p.addr, "removed unreachable peer"); + self.metric_peer_removed_count.inc(1); + } + } } #[cfg(test)] diff --git a/gossip/src/reactor.rs b/gossip/src/reactor.rs index 6173863afc..ffeb3d6f0e 100644 --- a/gossip/src/reactor.rs +++ b/gossip/src/reactor.rs @@ -4,6 +4,7 @@ use prost::{bytes::BytesMut, Message}; use tokio::{ net::UdpSocket, sync::mpsc::{self}, + time, }; use tracing::{debug, error, info, trace, warn}; @@ -12,7 +13,7 @@ use crate::{ peers::{Identity, PeerList}, proto::{self, frame_message::Payload, FrameMessage}, seed::{seed_ping_task, Seed}, - Dispatcher, Request, MAX_FRAME_BYTES, + Dispatcher, Request, MAX_FRAME_BYTES, PEER_PING_INTERVAL, }; #[derive(Debug)] @@ -50,7 +51,7 @@ impl Drop for AbortOnDrop { } } -/// An event loop for gossip frames processing. +/// An event loop actor for gossip frame processing. /// /// This actor task is responsible for driving peer discovery, managing the set /// of known peers and exchanging gossip frames between peers. @@ -67,6 +68,8 @@ pub(crate) struct Reactor { /// A cached wire frame, used to generate outgoing messages. cached_frame: proto::Frame, + /// A cached, immutable ping frame. + cached_ping_frame: Arc<[u8]>, /// A re-used buffer for serialising outgoing messages into. serialisation_buf: Vec, @@ -129,7 +132,7 @@ where &mut serialisation_buf, ) .unwrap(); - serialisation_buf.clone() + Arc::from(serialisation_buf.clone()) }; // Initialise the various metrics with wrappers to help distinguish @@ -145,7 +148,7 @@ where let seed_ping_task = AbortOnDrop(tokio::spawn(seed_ping_task( Arc::clone(&seed_list), Arc::clone(&socket), - cached_ping_frame, + Arc::clone(&cached_ping_frame), metric_frames_sent.clone(), metric_bytes_sent.clone(), ))); @@ -154,6 +157,7 @@ where dispatch, identity, cached_frame, + cached_ping_frame, serialisation_buf, peer_list: PeerList::with_capacity(seed_list.len(), metrics), seed_list, @@ -166,6 +170,10 @@ where } } + /// Execute the reactor event loop, handing requests from the + /// [`GossipHandle`] over `rx`. + /// + /// [`GossipHandle`]: crate::GossipHandle pub(crate) async fn run(mut self, mut rx: mpsc::Receiver) { info!( identity = %self.identity, @@ -173,9 +181,15 @@ where "gossip reactor started", ); + // Start a timer to periodically perform health check PINGs and remove + // dead nodes. + let mut gc_interval = time::interval(PEER_PING_INTERVAL); + gc_interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay); + loop { tokio::select! { msg = self.read() => { + // Process a message received from a peer. match msg { Ok(()) => {}, Err(Error::NoPayload { peer, addr }) => { @@ -201,6 +215,7 @@ where } } op = rx.recv() => { + // Process an operation from the handle. match op { None => { info!("stopping gossip reactor"); @@ -228,6 +243,16 @@ where } } } + _ = gc_interval.tick() => { + // Perform a periodic PING & prune dead peers. + debug!("peer ping & gc sweep"); + self.peer_list.ping_gc( + &self.cached_ping_frame, + &self.socket, + &self.metric_frames_sent, + &self.metric_bytes_sent + ).await; + } }; } } @@ -260,10 +285,7 @@ where return Ok(()); } - // Find or create the peer in the peer list. - let peer = self.peer_list.upsert(&identity, peer_addr); - - let mut out_messages = Vec::with_capacity(1); + let mut out_messages = Vec::new(); for msg in frame.messages { // Extract the payload from the frame message let payload = msg.payload.ok_or_else(|| Error::NoPayload { @@ -272,15 +294,28 @@ where })?; // Handle the frame message from the peer, optionally returning a - // response frame. + // response frame to be sent back. let response = match payload { - Payload::Ping(_) => Some(Payload::Pong(proto::Pong {})), - Payload::Pong(_) => { - debug!(%identity, %peer_addr, "pong"); + Payload::Ping(_) => Some(Payload::Pong(proto::Pong { + peers: self.peer_list.peers().map(proto::Peer::from).collect(), + })), + Payload::Pong(pex) => { + debug!( + %identity, + %peer_addr, + pex_nodes=pex.peers.len(), + "pong" + ); + // If handling the PEX frame fails, the sender sent a bad + // frame and is not added to the peer list / marked as + // healthy. + self.handle_pex(pex).await?; None } Payload::UserData(data) => { - self.dispatch.dispatch(data.payload).await; + let data = data.payload; + debug!(%identity, %peer_addr, n_bytes=data.len(), "dispatch payload"); + self.dispatch.dispatch(data).await; None } }; @@ -290,6 +325,12 @@ where } } + // Find or create the peer in the peer list. + let peer = self.peer_list.upsert(&identity, peer_addr); + + // Track that this peer has been observed as healthy. + peer.mark_observed(); + // Sometimes no message will be returned to the peer - there's no need // to send an empty frame. if out_messages.is_empty() { @@ -314,6 +355,60 @@ where Ok(()) } + /// The PONG response to a PING contains the set of peers known to the sender + /// - this is the peer exchange mechanism. + async fn handle_pex(&mut self, pex: proto::Pong) -> Result<(), Error> { + // Process the peers from the remote, ignoring the local node and known + // peers. + for p in pex.peers { + let pex_identity = match Identity::try_from(p.identity) { + // Ignore any references to this node's identity, or known + // peers. + Ok(v) if v == self.identity => continue, + Ok(v) if self.peer_list.contains(&v) => continue, + Ok(v) => v, + Err(e) => { + warn!( + error=%e, + "received invalid identity via PEX", + ); + continue; + } + }; + + let pex_addr = match p.address.parse() { + Ok(v) => v, + Err(e) => { + warn!( + %pex_identity, + error=%e, + "received invalid peer address via PEX", + ); + continue; + } + }; + + // Send a PING frame to this peer without adding it to the local + // peer list. + // + // The peer will be added to the local peer list if it responds to + // this solicitation (or otherwise communicates with the local + // node), ensuring only live and reachable peers are added. + // + // Immediately ping this new peer if new (a fast UDP send). + ping( + &self.cached_ping_frame, + &self.socket, + pex_addr, + &self.metric_frames_sent, + &self.metric_bytes_sent, + ) + .await; + } + + Ok(()) + } + /// Return the randomised identity assigned to this instance. pub(crate) fn identity(&self) -> &Identity { &self.identity @@ -371,11 +466,11 @@ fn populate_frame( // messages must be shorter than this value. if frame.encoded_len() > MAX_FRAME_BYTES { error!( - n_bytes=buf.len(), + n_bytes=frame.encoded_len(), n_max=%MAX_FRAME_BYTES, "attempted to send frame larger than configured maximum" ); - return Err(Error::MaxSize(buf.len())); + return Err(Error::MaxSize(frame.encoded_len())); } buf.clear(); @@ -399,19 +494,21 @@ pub(crate) async fn ping( sent_frames: &SentFrames, sent_bytes: &SentBytes, ) -> usize { + // Check the payload length as a primitive/best-effort way of ensuring only + // ping frames are sent via this mechanism. + // + // Normal messaging should be performed through a Peer's send() method. + debug_assert_eq!(ping_frame.len(), 22); + match socket.send_to(ping_frame, &addr).await { Ok(n_bytes) => { - debug!(addr = %addr, "ping"); + debug!(n_bytes, %addr, "ping"); sent_frames.inc(1); sent_bytes.inc(n_bytes); n_bytes } Err(e) => { - warn!( - error=%e, - addr = %addr, - "ping failed" - ); + warn!(error=%e, %addr, "ping failed"); 0 } } diff --git a/gossip/src/seed.rs b/gossip/src/seed.rs index 97df4f245c..dfe35ed955 100644 --- a/gossip/src/seed.rs +++ b/gossip/src/seed.rs @@ -63,10 +63,15 @@ async fn resolve(addr: &str) -> Option { /// /// This method immediately pings all the seeds, and then pings periodically at /// [`SEED_PING_INTERVAL`]. +/// +/// Seeds must be periodically pinged to ensure they're discovered when they +/// come online - if seeds were not pinged continuously, this node could become +/// isolated from all peers, mark all known peers as dead, and would never +/// rejoin. pub(super) async fn seed_ping_task( seeds: Arc<[Seed]>, socket: Arc, - ping_frame: Vec, + ping_frame: Arc<[u8]>, sent_frames: SentFrames, sent_bytes: SentBytes, ) { diff --git a/gossip/tests/smoke.rs b/gossip/tests/smoke.rs index efe4a83798..73da102d2f 100644 --- a/gossip/tests/smoke.rs +++ b/gossip/tests/smoke.rs @@ -1,10 +1,31 @@ -use std::{sync::Arc, time::Duration}; +#![allow(clippy::default_constructed_unit_structs)] + +use std::{ + net::SocketAddr, + sync::Arc, + time::{Duration, Instant}, +}; use test_helpers::{maybe_start_logging, timeout::FutureTimeout}; use tokio::{net::UdpSocket, sync::mpsc}; use gossip::*; +// How long to wait for various time-limited test loops to complete. +const TIMEOUT: Duration = Duration::from_secs(5); + +/// Bind a UDP socket on a random port and return it alongside the socket +/// address. +async fn random_udp() -> (UdpSocket, SocketAddr) { + // Bind a UDP socket to a random port + let socket = UdpSocket::bind("127.0.0.1:0") + .await + .expect("failed to bind UDP socket"); + let addr = socket.local_addr().expect("failed to read local addr"); + + (socket, addr) +} + /// Assert that starting up a reactor performs the initial peer discovery /// from a set of seeds, resulting in both peers known of one another. #[tokio::test] @@ -13,20 +34,8 @@ async fn test_payload_exchange() { let metrics = Arc::new(metric::Registry::default()); - // How long to wait for peer discovery to complete. - const TIMEOUT: Duration = Duration::from_secs(5); - - // Bind a UDP socket to a random port - let a_socket = UdpSocket::bind("127.0.0.1:0") - .await - .expect("failed to bind UDP socket"); - let a_addr = a_socket.local_addr().expect("failed to read local addr"); - - // And a socket for the second reactor - let b_socket = UdpSocket::bind("127.0.0.1:0") - .await - .expect("failed to bind UDP socket"); - let b_addr = b_socket.local_addr().expect("failed to read local addr"); + let (a_socket, a_addr) = random_udp().await; + let (b_socket, b_addr) = random_udp().await; // Initialise the dispatchers for the reactors let (a_tx, mut a_rx) = mpsc::channel(5); @@ -81,3 +90,358 @@ async fn test_payload_exchange() { .expect("reactor stopped"); assert_eq!(got, a_payload); } + +/// Construct a set of peers such that peer exchange has to occur for them to +/// know the full set of peers. +/// +/// This configures the following topology: +/// +/// ``` +/// A <--> B <--> C +/// ``` +/// +/// Where only node B knows of both A & C - in order for A to discover C, it has +/// to perform PEX with B, and the same for C to discover A. +/// +/// To further drive the discovery mechanism, B is not informed of any peer - it +/// discovers A & B through their announcement PINGs at startup. +#[tokio::test] +async fn test_peer_exchange() { + maybe_start_logging(); + + let metrics = Arc::new(metric::Registry::default()); + + let (a_socket, _a_addr) = random_udp().await; + let (b_socket, b_addr) = random_udp().await; + let (c_socket, _c_addr) = random_udp().await; + + let (a_tx, mut a_rx) = mpsc::channel(5); + + let a = Builder::new(vec![b_addr.to_string()], a_tx, Arc::clone(&metrics)).build(a_socket); + let b = Builder::new(vec![], NopDispatcher::default(), Arc::clone(&metrics)).build(b_socket); + let c = Builder::new( + vec![b_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(c_socket); + + // At this point, A is configured with B as a seed, B knows of no other + // peers, and C knows of B. + + // Wait for peer exchange to occur + async { + loop { + if a.get_peers().await.len() == 2 { + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + let a_peers = a.get_peers().await; + assert!(a_peers.contains(&b.identity())); + assert!(a_peers.contains(&c.identity())); + assert!(!a_peers.contains(&a.identity())); + + let b_peers = b.get_peers().await; + assert!(b_peers.contains(&a.identity())); + assert!(b_peers.contains(&c.identity())); + assert!(!b_peers.contains(&b.identity())); + + let c_peers = c.get_peers().await; + assert!(c_peers.contains(&a.identity())); + assert!(c_peers.contains(&b.identity())); + assert!(!c_peers.contains(&c.identity())); + + // Prove that C has discovered A, which would only be possible through PEX + // with B. + let payload = Bytes::from_static(b"bananas"); + c.broadcast(payload.clone()).await.unwrap(); + + let got = a_rx + .recv() + .with_timeout_panic(TIMEOUT) + .await + .expect("reactor stopped"); + assert_eq!(got, payload); +} + +/// Construct a set of peers such that a delayed peer exchange has to occur for +/// them to know the full set of peers. +/// +/// This configures the following topology: +/// +/// ``` +/// A <--> B +/// ``` +/// +/// And once PEX has completed between them, a new node C is introduced with B +/// as a seed: +/// +/// ``` +/// A <--> B <-- C +/// ``` +/// +/// At which point A will discover C when C sends PING messages to all peers it +/// discovers from B. +#[tokio::test] +async fn test_delayed_peer_exchange() { + maybe_start_logging(); + + let metrics = Arc::new(metric::Registry::default()); + + let (a_socket, a_addr) = random_udp().await; + let (b_socket, b_addr) = random_udp().await; + let (c_socket, _c_addr) = random_udp().await; + + let a = Builder::new( + vec![b_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(a_socket); + let b = Builder::new( + vec![a_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(b_socket); + + // At this point, A is configured with B as a seed, B knows of no other + // peers, and C knows of B. + + // Wait for peer exchange to occur between A and B + async { + loop { + if a.get_peers().await.len() == 1 { + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + let a_peers = a.get_peers().await; + assert!(a_peers.contains(&b.identity())); + + let b_peers = b.get_peers().await; + assert!(b_peers.contains(&a.identity())); + + // Start C now the initial PEX is complete, seeding it with the address of + // B. + let c = Builder::new( + vec![b_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(c_socket); + + // C will perform PEX with B, learning of A. + async { + loop { + if c.get_peers().await.len() == 2 { + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + let c_peers = c.get_peers().await; + assert!(c_peers.contains(&a.identity())); + assert!(c_peers.contains(&b.identity())); + + // And eventually A should discover C through B. + async { + loop { + if a.get_peers().await.len() == 2 { + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + let a_peers = a.get_peers().await; + assert!(a_peers.contains(&b.identity())); + assert!(a_peers.contains(&c.identity())); +} + +/// Initialise a pair of nodes A & B with an unreachable third node C listed as +/// a seed. +/// +/// Ensure this seed C is not added to the local peer list. +#[tokio::test] +async fn test_seed_health_check() { + maybe_start_logging(); + + let metrics = Arc::new(metric::Registry::default()); + + let (a_socket, a_addr) = random_udp().await; + let (b_socket, b_addr) = random_udp().await; + let (_c_socket, c_addr) = random_udp().await; + + let seeds = vec![a_addr.to_string(), b_addr.to_string(), c_addr.to_string()]; + let a = Builder::new( + seeds.to_vec(), + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(a_socket); + let b = Builder::new(seeds, NopDispatcher::default(), Arc::clone(&metrics)).build(b_socket); + + // Wait for peer exchange to occur between A and B + async { + loop { + if !a.get_peers().await.is_empty() { + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + // Assert that only the live peers were added. + let a_peers = a.get_peers().await; + assert_eq!(a_peers.len(), 1); + assert!(a_peers.contains(&b.identity())); + + let b_peers = b.get_peers().await; + assert_eq!(b_peers.len(), 1); + assert!(b_peers.contains(&a.identity())); +} + +/// Initialise a pair of nodes A & B, kill B after PEX, and then introduce a new +/// node C. +/// +/// This test ensures that dead peers are not added to the joiner's local peer +/// list, causing them to age out of the cluster and not be propagated forever. +#[tokio::test] +async fn test_discovery_health_check() { + maybe_start_logging(); + + let metrics = Arc::new(metric::Registry::default()); + + let (a_socket, a_addr) = random_udp().await; + let (b_socket, b_addr) = random_udp().await; + + let a = Builder::new( + vec![b_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(a_socket); + let b = Builder::new( + vec![a_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(b_socket); + + // Wait for peer exchange to occur between A and B + async { + loop { + if a.get_peers().await.len() == 1 { + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + // Stop B. + let b_identity = b.identity(); + drop(b); + + // Introduce C to the cluster, seeding it with A + let (c_socket, _c_addr) = random_udp().await; + + let c = Builder::new( + vec![a_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(c_socket); + + // Wait for peer exchange to occur between A and B + async { + loop { + if !c.get_peers().await.is_empty() { + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + // Assert that only the live peer A was added. + let c_peers = c.get_peers().await; + assert_eq!(c_peers.len(), 1); + assert!(c_peers.contains(&a.identity())); + assert!(!c_peers.contains(&b_identity)); +} + +/// Drive peer removal / age-out. +/// +/// Start two nodes, wait for PEX to complete, kill one node, and wait for it to +/// eventually remove the dead peer. Because this is a periodic/time-based +/// action, this test uses the tokio's "auto-advance time" test functionality. +#[tokio::test] +async fn test_peer_removal() { + maybe_start_logging(); + + let metrics = Arc::new(metric::Registry::default()); + + let (a_socket, a_addr) = random_udp().await; + let (b_socket, b_addr) = random_udp().await; + + let a = Builder::new( + vec![b_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(a_socket); + let b = Builder::new( + vec![a_addr.to_string()], + NopDispatcher::default(), + Arc::clone(&metrics), + ) + .build(b_socket); + + // Wait for peer exchange to occur between A and B + async { + loop { + let peers = a.get_peers().await; + if peers.len() == 1 { + assert!(peers.contains(&b.identity())); + break; + } + } + } + .with_timeout_panic(TIMEOUT) + .await; + + // Stop B. + drop(b); + + // Make time tick as fast as necessary to advance to the next timer event. + tokio::time::pause(); + for _ in 0..50 { + tokio::time::advance(Duration::from_secs(35)).await; + } + + // Use the stdlib time to avoid tokio's paused time and bound the loop time. + let started_at = Instant::now(); + loop { + let peers = a.get_peers().await; + if peers.is_empty() { + break; + } + + if Instant::now().duration_since(started_at) > TIMEOUT { + panic!("timeout waiting for peer removal"); + } + } +}