feat(gossip): peer exchange
This commit implements peer exchange (abbreviated PEX) between peers of the gossip cluster. This allows using a set of fixed seeds and dynamic node membership - nodes can come and go without having to be manually configured across all peers in order to communicate.

"Dead" peers are periodically cleaned from the local list of active peers, ensuring the list of peers doesn't grow forever as node churn occurs. This is a best-effort, conservative process, biasing towards reliability/deliverability rather than accuracy and fast removal - it's not a health check!

pull/24376/head
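The dead-peer clean-up described in the commit message can be sketched in isolation. The following is a minimal model, not the code from this commit: `PeerList` and `Peer` here are simplified, hypothetical stand-ins keyed by a string identity, `observe` is an invented helper name, and no UDP frames are sent. Only the counter logic (increment on each sweep, reset on any received message, remove once the count exceeds `MAX_PING_UNACKED`) mirrors the diff below.

```rust
use std::collections::HashMap;

/// Value taken from the diff in this commit.
const MAX_PING_UNACKED: usize = 10;

/// Hypothetical stand-in for the crate's `Peer`; only the counter is modelled.
#[derive(Debug, Default)]
struct Peer {
    unacked_ping_count: usize,
}

/// Hypothetical, simplified peer list keyed by a string identity (the real
/// code keys by a UUID-based identity and tracks a socket address).
#[derive(Debug, Default)]
struct PeerList {
    list: HashMap<String, Peer>,
}

impl PeerList {
    /// Any message received from `identity` adds the peer if it is unknown
    /// and resets its unacked PING counter to 0.
    fn observe(&mut self, identity: &str) {
        self.list
            .entry(identity.to_string())
            .or_default()
            .unacked_ping_count = 0;
    }

    /// One sweep: count a PING against every peer, then drop the peers whose
    /// counter now exceeds `MAX_PING_UNACKED`. Returns the removed identities.
    fn ping_gc(&mut self) -> Vec<String> {
        let mut dead = Vec::new();
        self.list.retain(|identity, peer| {
            peer.unacked_ping_count += 1;
            let alive = peer.unacked_ping_count <= MAX_PING_UNACKED;
            if !alive {
                dead.push(identity.clone());
            }
            alive
        });
        dead
    }
}
```

In this model a silent peer survives ten sweeps and is dropped on the eleventh. With the 30 second `PEER_PING_INTERVAL` from the diff, that puts removal on the order of five minutes after the last message, which is why the process is too slow to serve as a health check.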
parent 16a3ff8dfe
commit fc866ebe92
|
@ -24,7 +24,27 @@ message FrameMessage {
|
|||
}
|
||||
|
||||
message Ping {}
|
||||
message Pong {}
|
||||
|
||||
// A response to a PING, containing the set of peers known to the sender.
|
||||
//
|
||||
// A sequence of ping/pong frames acts as a peer-exchange mechanism between
|
||||
// peers.
|
||||
message Pong {
|
||||
// A set of peers known to the sender.
|
||||
//
|
||||
// Some of these peers may already be unreachable, and the receiver should not
|
||||
// add them to their peer list without validating liveness.
|
||||
repeated Peer peers = 1;
|
||||
}
|
||||
|
||||
message Peer {
|
||||
// A unique identifier (UUID) self-assigned by this peer as raw BE bytes.
|
||||
bytes identity = 1;
|
||||
|
||||
// A socket (IP & port) address in the form "ip:port", as discovered by the
|
||||
// PONG sender.
|
||||
string address = 2;
|
||||
}
|
||||
|
||||
// An application payload from the caller of the gossip library.
|
||||
message UserPayload {
|
||||
|
|
|
@ -26,9 +26,11 @@ impl Dispatcher for tokio::sync::mpsc::Sender<Bytes> {
|
|||
}
|
||||
}
|
||||
|
||||
// A no-op dispatcher.
|
||||
#[cfg(test)]
|
||||
/// A no-op [`Dispatcher`].
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
pub struct NopDispatcher;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Dispatcher for () {
|
||||
impl Dispatcher for NopDispatcher {
|
||||
async fn dispatch(&self, _payload: crate::Bytes) {}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,22 @@
|
|||
//! A work-in-progress, simple gossip primitive for metadata distribution
|
||||
//! A simple gossip & broadcast primitive for best-effort metadata distribution
|
||||
//! between IOx nodes.
|
||||
//!
|
||||
//! # Peers
|
||||
//!
|
||||
//! Peers are uniquely identified by their self-reported "identity" UUID. A
|
||||
//! unique UUID is generated for each gossip instance, ensuring the identity
|
||||
//! changes across restarts of the underlying node.
|
||||
//!
|
||||
//! An identity is associated with an immutable socket address used for peer
|
||||
//! communication.
|
||||
//!
|
||||
//! # Transport
|
||||
//!
|
||||
//! Prefer small payloads where possible, and expect loss of some messages -
|
||||
//! this primitive provides *best effort* delivery.
|
||||
//!
|
||||
//! This implementation sends unicast UDP frames between peers, with support for
|
||||
//! both control frames & user payloads. The maximum message size is 65,507
|
||||
//! both control frames & user payloads. The maximum UDP message size is 65,507
|
||||
//! bytes ([`MAX_USER_PAYLOAD_BYTES`] for application-level payloads), but a
|
||||
//! packet this large is fragmented into smaller (at most MTU-sized) packets and
|
||||
//! is at greater risk of being dropped due to a lost fragment.
|
||||
|
@ -20,6 +29,80 @@
|
|||
//!
|
||||
//! The security model of this implementation expects the peers to be running in
|
||||
//! a trusted environment, secure from malicious users.
|
||||
//!
|
||||
//! # Peer Exchange
|
||||
//!
|
||||
//! When a gossip instance is initialised, it advertises itself to the set of
|
||||
//! user-provided "seed" peers - other gossip instances with fixed, known
|
||||
//! addresses. The peer then bootstraps the peer list from these seed peers.
|
||||
//!
|
||||
//! Peers are discovered through PONG messages from peers, which contain the
|
||||
//! list of peers the sender has successfully communicated with.
|
||||
//!
|
||||
//! On receipt of a PONG frame, a node will send PING frames to all newly
|
||||
//! discovered peers without adding the peer to its local peer list. Once the
|
||||
//! discovered peer responds with a PONG, the peer is added to the peer list.
|
||||
//! This acts as a liveness check, ensuring a node only adds peers it can
|
||||
//! communicate with to its peer list.
|
||||
//!
|
||||
//! ```text
|
||||
//! ┌──────────┐
|
||||
//! │ Seed │
|
||||
//! └──────────┘
|
||||
//! ▲ │
|
||||
//! │ │
|
||||
//! (1) │ │ (2)
|
||||
//! PING │ │ PONG
|
||||
//! │ │ (contains Peer A)
|
||||
//! │ ▼
|
||||
//! ┌──────────┐
|
||||
//! │ Local │
|
||||
//! └──────────┘
|
||||
//! ▲ │
|
||||
//! │ │
|
||||
//! (4) │ │ (3)
|
||||
//! PONG │ │ PING
|
||||
//! │ │
|
||||
//! │ ▼
|
||||
//! ┌──────────┐
|
||||
//! │ Peer A │
|
||||
//! └──────────┘
|
||||
//! ```
|
||||
//!
|
||||
//! The above illustrates this process when the "local" node joins:
|
||||
//!
|
||||
//! 1. Send PING messages to all configured seeds
|
||||
//! 2. Receive a PONG response containing the list of all known peers
|
||||
//! 3. Send PING frames to all discovered peers - do not add to peer list
|
||||
//! 4. Receive PONG frames from discovered peers - add to peer list
|
||||
//!
|
||||
//! The peer addresses sent during PEX rounds contain the advertised peer
|
||||
//! identity and the socket address the PONG sender discovered.
|
||||
//!
|
||||
//! # Dead Peer Removal
|
||||
//!
|
||||
//! All peers are periodically sent a PING frame, and a per-peer counter is
|
||||
//! incremented. If a message of any sort is received (including the PONG
|
||||
//! response to the soliciting PING), the peer's counter is reset to 0.
|
||||
//!
|
||||
//! Once a peer's counter exceeds [`MAX_PING_UNACKED`], indicating a number of
|
||||
//! PINGs have been sent without receiving any response, the peer is removed
|
||||
//! from the node's peer list.
|
||||
//!
|
||||
//! Dead peers age out of the cluster once all nodes perform the above routine.
|
||||
//! If a peer dies, it is still sent in PONG messages as part of PEX until it is
|
||||
//! removed from the sender's peer list, but the receiver of the PONG will not
|
||||
//! add it to the node's peer list unless it successfully communicates, ensuring
|
||||
//! dead peers are not propagated.
|
||||
//!
|
||||
//! Ageing out dead peers is strictly an optimisation (and not for correctness).
|
||||
//! A dead peer consumes a tiny amount of RAM, but also will have frames
|
||||
//! dispatched to it - over time, as the number of dead peers accumulates, this
|
||||
//! would cause the number of UDP frames sent per broadcast to increase,
|
||||
//! needlessly increasing gossip traffic.
|
||||
//!
|
||||
//! This process is heavily biased towards reliability/deliverability and is too
|
||||
//! slow for use as a general peer health check.
|
||||
|
||||
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
|
||||
#![warn(
|
||||
|
@ -34,6 +117,7 @@
|
|||
unused_crate_dependencies,
|
||||
missing_docs
|
||||
)]
|
||||
#![allow(clippy::default_constructed_unit_structs)]
|
||||
|
||||
mod builder;
|
||||
mod dispatcher;
|
||||
|
@ -60,7 +144,13 @@ pub use handle::*;
|
|||
const RESOLVE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Defines the interval between PING frames sent to all configured seed peers.
|
||||
const SEED_PING_INTERVAL: std::time::Duration = Duration::from_secs(15);
|
||||
const SEED_PING_INTERVAL: Duration = Duration::from_secs(60);
|
||||
|
||||
/// How often a PING frame should be sent to a known peer.
|
||||
///
|
||||
/// This value and [`MAX_PING_UNACKED`] define the approximate duration of time
|
||||
/// until a dead peer is removed from the peer list.
|
||||
const PEER_PING_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// The maximum payload size allowed.
|
||||
///
|
||||
|
@ -78,6 +168,18 @@ const USER_PAYLOAD_OVERHEAD: usize = 22;
|
|||
/// dropped. Smaller is always better for UDP transports!
|
||||
pub const MAX_USER_PAYLOAD_BYTES: usize = MAX_FRAME_BYTES - USER_PAYLOAD_OVERHEAD;
|
||||
|
||||
/// The number of PING messages sent to a peer without a response (of any kind)
|
||||
/// before the peer is considered dead and removed from the peer list.
|
||||
///
|
||||
/// Increasing this value does not affect correctness - messages will be sent to
|
||||
/// this peer for longer before being marked as dead, and the (small amount of)
|
||||
/// RAM used by this peer will be held for longer.
|
||||
///
|
||||
/// This value should be large so that the occasional dropped frame does not
|
||||
/// cause a peer to be spuriously marked as "dead" - doing so would cause it to
|
||||
/// miss broadcast frames until it is re-discovered.
|
||||
const MAX_PING_UNACKED: usize = 10;
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::assertions_on_constants)]
|
||||
mod tests {
|
||||
|
|
|
@ -5,12 +5,12 @@ use hashbrown::{hash_map::RawEntryMut, HashMap};
|
|||
use metric::U64Counter;
|
||||
use prost::bytes::Bytes;
|
||||
use tokio::net::UdpSocket;
|
||||
use tracing::{trace, warn};
|
||||
use tracing::{info, trace, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
metric::{SentBytes, SentFrames},
|
||||
MAX_FRAME_BYTES,
|
||||
MAX_FRAME_BYTES, MAX_PING_UNACKED,
|
||||
};
|
||||
|
||||
/// A unique generated identity containing 128 bits of randomness (V4 UUID).
|
||||
|
@ -53,6 +53,15 @@ impl TryFrom<Bytes> for Identity {
|
|||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Vec<u8>> for Identity {
|
||||
type Error = uuid::Error;
|
||||
|
||||
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
|
||||
let uuid = Uuid::from_slice(&value)?;
|
||||
Ok(Self(Bytes::from(value), uuid))
|
||||
}
|
||||
}
|
||||
|
||||
impl Identity {
|
||||
/// Generate a new random identity.
|
||||
pub(crate) fn new() -> Self {
|
||||
|
@ -70,6 +79,12 @@ impl Identity {
|
|||
pub(crate) struct Peer {
|
||||
identity: Identity,
|
||||
addr: SocketAddr,
|
||||
|
||||
/// The number of PING messages sent to this peer since the last message
|
||||
/// observed from it.
|
||||
///
|
||||
/// This value is reset to 0 each time a message from this peer is received.
|
||||
unacked_ping_count: usize,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
|
@ -100,7 +115,7 @@ impl Peer {
|
|||
Ok(n_bytes) => {
|
||||
frames_sent.inc(1);
|
||||
bytes_sent.inc(*n_bytes);
|
||||
trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "send frame")
|
||||
trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "socket write")
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(error=%e, identity=%self.identity, peer_addr=%self.addr, "frame send error")
|
||||
|
@ -108,6 +123,26 @@ impl Peer {
|
|||
}
|
||||
ret
|
||||
}
|
||||
|
||||
/// Record that a PING has been sent to this peer, incrementing the unacked
|
||||
/// PING count and returning the new value.
|
||||
pub(crate) fn record_ping(&mut self) -> usize {
|
||||
self.unacked_ping_count += 1;
|
||||
self.unacked_ping_count
|
||||
}
|
||||
|
||||
pub(crate) fn mark_observed(&mut self) {
|
||||
self.unacked_ping_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Peer> for crate::proto::Peer {
|
||||
fn from(p: &Peer) -> Self {
|
||||
Self {
|
||||
identity: p.identity.as_bytes().clone(), // Ref-count
|
||||
address: p.addr.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The set of currently active/known peers.
|
||||
|
@ -115,24 +150,36 @@ impl Peer {
|
|||
pub(crate) struct PeerList {
|
||||
list: HashMap<Identity, Peer>,
|
||||
|
||||
/// The number of known, believed-to-be-healthy peers.
|
||||
metric_peer_count: metric::U64Counter,
|
||||
/// The number of peers discovered.
|
||||
metric_peer_discovered_count: metric::U64Counter,
|
||||
/// The number of peers removed due to health checks.
|
||||
metric_peer_removed_count: metric::U64Counter,
|
||||
// The difference between these two metrics will always match the number of
|
||||
// entries in the peer list (ignoring races & lack of thread
|
||||
// synchronisation).
|
||||
}
|
||||
|
||||
impl PeerList {
|
||||
/// Initialise the [`PeerList`] with capacity for `cap` number of [`Peer`]
|
||||
/// instances.
|
||||
pub(crate) fn with_capacity(cap: usize, metrics: &metric::Registry) -> Self {
|
||||
let metric_peer_count = metrics
|
||||
let metric_peer_discovered_count = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"gossip_known_peers",
|
||||
"number of likely healthy peers known to this node",
|
||||
"gossip_peers_discovered",
|
||||
"number of peers discovered by this node",
|
||||
)
|
||||
.recorder(&[]);
|
||||
let metric_peer_removed_count = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"gossip_peers_removed",
|
||||
"number of peers removed due to health check failures",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
Self {
|
||||
list: HashMap::with_capacity(cap),
|
||||
metric_peer_count,
|
||||
metric_peer_discovered_count,
|
||||
metric_peer_removed_count,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,17 +188,29 @@ impl PeerList {
|
|||
self.list.keys().map(|v| **v).collect()
|
||||
}
|
||||
|
||||
/// Returns an iterator of all known peers in the peer list.
|
||||
pub(crate) fn peers(&self) -> impl Iterator<Item = &'_ Peer> {
|
||||
self.list.values()
|
||||
}
|
||||
|
||||
/// Returns true if `identity` is already in the peer list.
|
||||
pub(crate) fn contains(&self, identity: &Identity) -> bool {
|
||||
self.list.contains_key(identity)
|
||||
}
|
||||
|
||||
/// Upsert a peer identified by `identity` to the peer list, associating it
|
||||
/// with the provided `peer_addr`.
|
||||
pub(crate) fn upsert(&mut self, identity: &Identity, peer_addr: SocketAddr) -> &mut Peer {
|
||||
let p = match self.list.raw_entry_mut().from_key(identity) {
|
||||
RawEntryMut::Vacant(v) => {
|
||||
self.metric_peer_count.inc(1);
|
||||
info!(%identity, %peer_addr, "discovered new peer");
|
||||
self.metric_peer_discovered_count.inc(1);
|
||||
v.insert(
|
||||
identity.to_owned(),
|
||||
Peer {
|
||||
addr: peer_addr,
|
||||
identity: identity.to_owned(),
|
||||
unacked_ping_count: 0,
|
||||
},
|
||||
)
|
||||
.1
|
||||
|
@ -184,6 +243,36 @@ impl PeerList {
|
|||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send PING frames to all known nodes, and remove any that have not
|
||||
/// responded after [`MAX_PING_UNACKED`] attempts.
|
||||
pub(crate) async fn ping_gc(
|
||||
&mut self,
|
||||
ping_frame: &[u8],
|
||||
socket: &UdpSocket,
|
||||
frames_sent: &SentFrames,
|
||||
bytes_sent: &SentBytes,
|
||||
) {
|
||||
// First broadcast the PING frames.
|
||||
//
|
||||
// If a peer has exceeded the allowed maximum, it will be removed next,
|
||||
// but if it responds to this PING, it will be re-added again.
|
||||
self.broadcast(ping_frame, socket, frames_sent, bytes_sent)
|
||||
.await;
|
||||
|
||||
// Increment the PING counts and remove all peers that have not
|
||||
// responded to more than the allowed number of PINGs.
|
||||
let dead = self.list.extract_if(|_ident, p| {
|
||||
// Increment the counter and grab the incremented value.
|
||||
let missed = p.record_ping();
|
||||
missed > MAX_PING_UNACKED
|
||||
});
|
||||
|
||||
for (identity, p) in dead {
|
||||
warn!(%identity, addr=%p.addr, "removed unreachable peer");
|
||||
self.metric_peer_removed_count.inc(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -4,6 +4,7 @@ use prost::{bytes::BytesMut, Message};
|
|||
use tokio::{
|
||||
net::UdpSocket,
|
||||
sync::mpsc::{self},
|
||||
time,
|
||||
};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
|
@ -12,7 +13,7 @@ use crate::{
|
|||
peers::{Identity, PeerList},
|
||||
proto::{self, frame_message::Payload, FrameMessage},
|
||||
seed::{seed_ping_task, Seed},
|
||||
Dispatcher, Request, MAX_FRAME_BYTES,
|
||||
Dispatcher, Request, MAX_FRAME_BYTES, PEER_PING_INTERVAL,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -50,7 +51,7 @@ impl Drop for AbortOnDrop {
|
|||
}
|
||||
}
|
||||
|
||||
/// An event loop for gossip frames processing.
|
||||
/// An event loop actor for gossip frame processing.
|
||||
///
|
||||
/// This actor task is responsible for driving peer discovery, managing the set
|
||||
/// of known peers and exchanging gossip frames between peers.
|
||||
|
@ -67,6 +68,8 @@ pub(crate) struct Reactor<T> {
|
|||
|
||||
/// A cached wire frame, used to generate outgoing messages.
|
||||
cached_frame: proto::Frame,
|
||||
/// A cached, immutable ping frame.
|
||||
cached_ping_frame: Arc<[u8]>,
|
||||
/// A re-used buffer for serialising outgoing messages into.
|
||||
serialisation_buf: Vec<u8>,
|
||||
|
||||
|
@ -129,7 +132,7 @@ where
|
|||
&mut serialisation_buf,
|
||||
)
|
||||
.unwrap();
|
||||
serialisation_buf.clone()
|
||||
Arc::from(serialisation_buf.clone())
|
||||
};
|
||||
|
||||
// Initialise the various metrics with wrappers to help distinguish
|
||||
|
@ -145,7 +148,7 @@ where
|
|||
let seed_ping_task = AbortOnDrop(tokio::spawn(seed_ping_task(
|
||||
Arc::clone(&seed_list),
|
||||
Arc::clone(&socket),
|
||||
cached_ping_frame,
|
||||
Arc::clone(&cached_ping_frame),
|
||||
metric_frames_sent.clone(),
|
||||
metric_bytes_sent.clone(),
|
||||
)));
|
||||
|
@ -154,6 +157,7 @@ where
|
|||
dispatch,
|
||||
identity,
|
||||
cached_frame,
|
||||
cached_ping_frame,
|
||||
serialisation_buf,
|
||||
peer_list: PeerList::with_capacity(seed_list.len(), metrics),
|
||||
seed_list,
|
||||
|
@ -166,6 +170,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Execute the reactor event loop, handling requests from the
|
||||
/// [`GossipHandle`] over `rx`.
|
||||
///
|
||||
/// [`GossipHandle`]: crate::GossipHandle
|
||||
pub(crate) async fn run(mut self, mut rx: mpsc::Receiver<Request>) {
|
||||
info!(
|
||||
identity = %self.identity,
|
||||
|
@ -173,9 +181,15 @@ where
|
|||
"gossip reactor started",
|
||||
);
|
||||
|
||||
// Start a timer to periodically perform health check PINGs and remove
|
||||
// dead nodes.
|
||||
let mut gc_interval = time::interval(PEER_PING_INTERVAL);
|
||||
gc_interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = self.read() => {
|
||||
// Process a message received from a peer.
|
||||
match msg {
|
||||
Ok(()) => {},
|
||||
Err(Error::NoPayload { peer, addr }) => {
|
||||
|
@ -201,6 +215,7 @@ where
|
|||
}
|
||||
}
|
||||
op = rx.recv() => {
|
||||
// Process an operation from the handle.
|
||||
match op {
|
||||
None => {
|
||||
info!("stopping gossip reactor");
|
||||
|
@ -228,6 +243,16 @@ where
|
|||
}
|
||||
}
|
||||
}
|
||||
_ = gc_interval.tick() => {
|
||||
// Perform a periodic PING & prune dead peers.
|
||||
debug!("peer ping & gc sweep");
|
||||
self.peer_list.ping_gc(
|
||||
&self.cached_ping_frame,
|
||||
&self.socket,
|
||||
&self.metric_frames_sent,
|
||||
&self.metric_bytes_sent
|
||||
).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -260,10 +285,7 @@ where
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
// Find or create the peer in the peer list.
|
||||
let peer = self.peer_list.upsert(&identity, peer_addr);
|
||||
|
||||
let mut out_messages = Vec::with_capacity(1);
|
||||
let mut out_messages = Vec::new();
|
||||
for msg in frame.messages {
|
||||
// Extract the payload from the frame message
|
||||
let payload = msg.payload.ok_or_else(|| Error::NoPayload {
|
||||
|
@ -272,15 +294,28 @@ where
|
|||
})?;
|
||||
|
||||
// Handle the frame message from the peer, optionally returning a
|
||||
// response frame.
|
||||
// response frame to be sent back.
|
||||
let response = match payload {
|
||||
Payload::Ping(_) => Some(Payload::Pong(proto::Pong {})),
|
||||
Payload::Pong(_) => {
|
||||
debug!(%identity, %peer_addr, "pong");
|
||||
Payload::Ping(_) => Some(Payload::Pong(proto::Pong {
|
||||
peers: self.peer_list.peers().map(proto::Peer::from).collect(),
|
||||
})),
|
||||
Payload::Pong(pex) => {
|
||||
debug!(
|
||||
%identity,
|
||||
%peer_addr,
|
||||
pex_nodes=pex.peers.len(),
|
||||
"pong"
|
||||
);
|
||||
// If handling the PEX frame fails, the sender sent a bad
|
||||
// frame and is not added to the peer list / marked as
|
||||
// healthy.
|
||||
self.handle_pex(pex).await?;
|
||||
None
|
||||
}
|
||||
Payload::UserData(data) => {
|
||||
self.dispatch.dispatch(data.payload).await;
|
||||
let data = data.payload;
|
||||
debug!(%identity, %peer_addr, n_bytes=data.len(), "dispatch payload");
|
||||
self.dispatch.dispatch(data).await;
|
||||
None
|
||||
}
|
||||
};
|
||||
|
@ -290,6 +325,12 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
// Find or create the peer in the peer list.
|
||||
let peer = self.peer_list.upsert(&identity, peer_addr);
|
||||
|
||||
// Track that this peer has been observed as healthy.
|
||||
peer.mark_observed();
|
||||
|
||||
// Sometimes no message will be returned to the peer - there's no need
|
||||
// to send an empty frame.
|
||||
if out_messages.is_empty() {
|
||||
|
@ -314,6 +355,60 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// The PONG response to a PING contains the set of peers known to the sender
|
||||
/// - this is the peer exchange mechanism.
|
||||
async fn handle_pex(&mut self, pex: proto::Pong) -> Result<(), Error> {
|
||||
// Process the peers from the remote, ignoring the local node and known
|
||||
// peers.
|
||||
for p in pex.peers {
|
||||
let pex_identity = match Identity::try_from(p.identity) {
|
||||
// Ignore any references to this node's identity, or known
|
||||
// peers.
|
||||
Ok(v) if v == self.identity => continue,
|
||||
Ok(v) if self.peer_list.contains(&v) => continue,
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
error=%e,
|
||||
"received invalid identity via PEX",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let pex_addr = match p.address.parse() {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
%pex_identity,
|
||||
error=%e,
|
||||
"received invalid peer address via PEX",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Send a PING frame to this peer without adding it to the local
|
||||
// peer list.
|
||||
//
|
||||
// The peer will be added to the local peer list if it responds to
|
||||
// this solicitation (or otherwise communicates with the local
|
||||
// node), ensuring only live and reachable peers are added.
|
||||
//
|
||||
// Immediately ping this newly discovered peer (a fast UDP send).
|
||||
ping(
|
||||
&self.cached_ping_frame,
|
||||
&self.socket,
|
||||
pex_addr,
|
||||
&self.metric_frames_sent,
|
||||
&self.metric_bytes_sent,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return the randomised identity assigned to this instance.
|
||||
pub(crate) fn identity(&self) -> &Identity {
|
||||
&self.identity
|
||||
|
@ -371,11 +466,11 @@ fn populate_frame(
|
|||
// messages must be shorter than this value.
|
||||
if frame.encoded_len() > MAX_FRAME_BYTES {
|
||||
error!(
|
||||
n_bytes=buf.len(),
|
||||
n_bytes=frame.encoded_len(),
|
||||
n_max=%MAX_FRAME_BYTES,
|
||||
"attempted to send frame larger than configured maximum"
|
||||
);
|
||||
return Err(Error::MaxSize(buf.len()));
|
||||
return Err(Error::MaxSize(frame.encoded_len()));
|
||||
}
|
||||
|
||||
buf.clear();
|
||||
|
@ -399,19 +494,21 @@ pub(crate) async fn ping(
|
|||
sent_frames: &SentFrames,
|
||||
sent_bytes: &SentBytes,
|
||||
) -> usize {
|
||||
// Check the payload length as a primitive/best-effort way of ensuring only
|
||||
// ping frames are sent via this mechanism.
|
||||
//
|
||||
// Normal messaging should be performed through a Peer's send() method.
|
||||
debug_assert_eq!(ping_frame.len(), 22);
|
||||
|
||||
match socket.send_to(ping_frame, &addr).await {
|
||||
Ok(n_bytes) => {
|
||||
debug!(addr = %addr, "ping");
|
||||
debug!(n_bytes, %addr, "ping");
|
||||
sent_frames.inc(1);
|
||||
sent_bytes.inc(n_bytes);
|
||||
n_bytes
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
error=%e,
|
||||
addr = %addr,
|
||||
"ping failed"
|
||||
);
|
||||
warn!(error=%e, %addr, "ping failed");
|
||||
0
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,10 +63,15 @@ async fn resolve(addr: &str) -> Option<SocketAddr> {
|
|||
///
|
||||
/// This method immediately pings all the seeds, and then pings periodically at
|
||||
/// [`SEED_PING_INTERVAL`].
|
||||
///
|
||||
/// Seeds must be periodically pinged to ensure they're discovered when they
|
||||
/// come online - if seeds were not pinged continuously, this node could become
|
||||
/// isolated from all peers, mark all known peers as dead, and would never
|
||||
/// rejoin.
|
||||
pub(super) async fn seed_ping_task(
|
||||
seeds: Arc<[Seed]>,
|
||||
socket: Arc<UdpSocket>,
|
||||
ping_frame: Vec<u8>,
|
||||
ping_frame: Arc<[u8]>,
|
||||
sent_frames: SentFrames,
|
||||
sent_bytes: SentBytes,
|
||||
) {
|
||||
|
|
|
@ -1,10 +1,31 @@
|
|||
use std::{sync::Arc, time::Duration};
|
||||
#![allow(clippy::default_constructed_unit_structs)]
|
||||
|
||||
use std::{
|
||||
net::SocketAddr,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use test_helpers::{maybe_start_logging, timeout::FutureTimeout};
|
||||
use tokio::{net::UdpSocket, sync::mpsc};
|
||||
|
||||
use gossip::*;
|
||||
|
||||
// How long to wait for various time-limited test loops to complete.
|
||||
const TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Bind a UDP socket on a random port and return it alongside the socket
|
||||
/// address.
|
||||
async fn random_udp() -> (UdpSocket, SocketAddr) {
|
||||
// Bind a UDP socket to a random port
|
||||
let socket = UdpSocket::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("failed to bind UDP socket");
|
||||
let addr = socket.local_addr().expect("failed to read local addr");
|
||||
|
||||
(socket, addr)
|
||||
}
|
||||
|
||||
/// Assert that starting up a reactor performs the initial peer discovery
|
||||
/// from a set of seeds, resulting in both peers knowing of one another.
|
||||
#[tokio::test]
|
||||
|
@ -13,20 +34,8 @@ async fn test_payload_exchange() {
|
|||
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
|
||||
// How long to wait for peer discovery to complete.
|
||||
const TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
// Bind a UDP socket to a random port
|
||||
let a_socket = UdpSocket::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("failed to bind UDP socket");
|
||||
let a_addr = a_socket.local_addr().expect("failed to read local addr");
|
||||
|
||||
// And a socket for the second reactor
|
||||
let b_socket = UdpSocket::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("failed to bind UDP socket");
|
||||
let b_addr = b_socket.local_addr().expect("failed to read local addr");
|
||||
let (a_socket, a_addr) = random_udp().await;
|
||||
let (b_socket, b_addr) = random_udp().await;
|
||||
|
||||
// Initialise the dispatchers for the reactors
|
||||
let (a_tx, mut a_rx) = mpsc::channel(5);
|
||||
|
@ -81,3 +90,358 @@ async fn test_payload_exchange() {
|
|||
.expect("reactor stopped");
|
||||
assert_eq!(got, a_payload);
|
||||
}
|
||||
|
||||
/// Construct a set of peers such that peer exchange has to occur for them to
|
||||
/// know the full set of peers.
|
||||
///
|
||||
/// This configures the following topology:
|
||||
///
|
||||
/// ```
|
||||
/// A <--> B <--> C
|
||||
/// ```
|
||||
///
|
||||
/// Where only node B knows of both A & C - in order for A to discover C, it has
|
||||
/// to perform PEX with B, and the same for C to discover A.
|
||||
///
|
||||
/// To further drive the discovery mechanism, B is not informed of any peer - it
|
||||
/// discovers A & C through their announcement PINGs at startup.
|
||||
#[tokio::test]
|
||||
async fn test_peer_exchange() {
|
||||
maybe_start_logging();
|
||||
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
|
||||
let (a_socket, _a_addr) = random_udp().await;
|
||||
let (b_socket, b_addr) = random_udp().await;
|
||||
let (c_socket, _c_addr) = random_udp().await;
|
||||
|
||||
let (a_tx, mut a_rx) = mpsc::channel(5);
|
||||
|
||||
let a = Builder::new(vec![b_addr.to_string()], a_tx, Arc::clone(&metrics)).build(a_socket);
|
||||
let b = Builder::new(vec![], NopDispatcher::default(), Arc::clone(&metrics)).build(b_socket);
|
||||
let c = Builder::new(
|
||||
vec![b_addr.to_string()],
|
||||
NopDispatcher::default(),
|
||||
Arc::clone(&metrics),
|
||||
)
|
||||
.build(c_socket);
|
||||
|
||||
// At this point, A is configured with B as a seed, B knows of no other
|
||||
// peers, and C knows of B.
|
||||
|
||||
// Wait for peer exchange to occur
|
||||
async {
|
||||
loop {
|
||||
if a.get_peers().await.len() == 2 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
.with_timeout_panic(TIMEOUT)
|
||||
.await;
|
||||
|
||||
let a_peers = a.get_peers().await;
|
||||
assert!(a_peers.contains(&b.identity()));
|
||||
assert!(a_peers.contains(&c.identity()));
|
||||
assert!(!a_peers.contains(&a.identity()));
|
||||
|
||||
let b_peers = b.get_peers().await;
|
||||
assert!(b_peers.contains(&a.identity()));
|
||||
assert!(b_peers.contains(&c.identity()));
|
||||
assert!(!b_peers.contains(&b.identity()));
|
||||
|
||||
let c_peers = c.get_peers().await;
|
||||
assert!(c_peers.contains(&a.identity()));
|
||||
assert!(c_peers.contains(&b.identity()));
|
||||
assert!(!c_peers.contains(&c.identity()));
|
||||
|
||||
// Prove that C has discovered A, which would only be possible through PEX
|
||||
// with B.
|
||||
let payload = Bytes::from_static(b"bananas");
|
||||
c.broadcast(payload.clone()).await.unwrap();
|
||||
|
||||
let got = a_rx
|
||||
.recv()
|
||||
.with_timeout_panic(TIMEOUT)
|
||||
.await
|
||||
.expect("reactor stopped");
|
||||
assert_eq!(got, payload);
|
||||
}
|
||||
|
||||
/// Construct a set of peers such that a delayed peer exchange has to occur for
|
||||
/// them to know the full set of peers.
|
||||
///
|
||||
/// This configures the following topology:
|
||||
///
|
||||
/// ```
|
||||
/// A <--> B
|
||||
/// ```
|
||||
///
|
||||
/// And once PEX has completed between them, a new node C is introduced with B
|
||||
/// as a seed:
|
||||
///
|
||||
/// ```
|
||||
/// A <--> B <-- C
|
||||
/// ```
|
||||
///
|
||||
/// At which point A will discover C when C sends PING messages to all peers it
|
||||
/// discovers from B.
|
||||
#[tokio::test]
|
||||
async fn test_delayed_peer_exchange() {
|
||||
maybe_start_logging();
|
||||
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
|
||||
let (a_socket, a_addr) = random_udp().await;
|
||||
let (b_socket, b_addr) = random_udp().await;
|
||||
let (c_socket, _c_addr) = random_udp().await;
|
||||
|
||||
let a = Builder::new(
|
||||
vec![b_addr.to_string()],
|
||||
NopDispatcher::default(),
|
||||
Arc::clone(&metrics),
|
||||
)
|
||||
.build(a_socket);
|
||||
let b = Builder::new(
|
||||
vec![a_addr.to_string()],
|
||||
NopDispatcher::default(),
|
||||
Arc::clone(&metrics),
|
||||
)
|
||||
.build(b_socket);
|
||||
|
||||
// At this point, A is configured with B as a seed, B is configured with A
// as a seed, and C has not been started yet.
|
||||
|
||||
// Wait for peer exchange to occur between A and B
|
||||
async {
|
||||
loop {
|
||||
if a.get_peers().await.len() == 1 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
.with_timeout_panic(TIMEOUT)
|
||||
.await;
|
||||
|
||||
let a_peers = a.get_peers().await;
|
||||
assert!(a_peers.contains(&b.identity()));
|
||||
|
||||
let b_peers = b.get_peers().await;
|
||||
assert!(b_peers.contains(&a.identity()));
|
||||
|
||||
// Start C now the initial PEX is complete, seeding it with the address of
|
||||
// B.
|
||||
let c = Builder::new(
|
||||
vec![b_addr.to_string()],
|
||||
NopDispatcher::default(),
|
||||
Arc::clone(&metrics),
|
||||
)
|
||||
.build(c_socket);
|
||||
|
||||
// C will perform PEX with B, learning of A.
|
||||
async {
|
||||
loop {
|
||||
if c.get_peers().await.len() == 2 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
.with_timeout_panic(TIMEOUT)
|
||||
.await;
|
||||
|
||||
let c_peers = c.get_peers().await;
|
||||
assert!(c_peers.contains(&a.identity()));
|
||||
assert!(c_peers.contains(&b.identity()));
|
||||
|
||||
// And eventually A should discover C through B.
|
||||
async {
|
||||
loop {
|
||||
if a.get_peers().await.len() == 2 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
.with_timeout_panic(TIMEOUT)
|
||||
.await;
|
||||
|
||||
let a_peers = a.get_peers().await;
|
||||
assert!(a_peers.contains(&b.identity()));
|
||||
assert!(a_peers.contains(&c.identity()));
|
||||
}
|
||||
|
||||
/// Initialise a pair of nodes A & B with an unreachable third node C listed as
/// a seed.
///
/// Ensure this seed C is not added to the local peer list.
#[tokio::test]
async fn test_seed_health_check() {
    maybe_start_logging();

    let metrics = Arc::new(metric::Registry::default());

    let (a_socket, a_addr) = random_udp().await;
    let (b_socket, b_addr) = random_udp().await;
    let (_c_socket, c_addr) = random_udp().await;

    let seeds = vec![a_addr.to_string(), b_addr.to_string(), c_addr.to_string()];
    let a = Builder::new(
        seeds.to_vec(),
        NopDispatcher::default(),
        Arc::clone(&metrics),
    )
    .build(a_socket);
    let b = Builder::new(seeds, NopDispatcher::default(), Arc::clone(&metrics)).build(b_socket);

    // Wait for peer exchange to occur between A and B
    async {
        loop {
            if !a.get_peers().await.is_empty() {
                break;
            }
        }
    }
    .with_timeout_panic(TIMEOUT)
    .await;

    // Assert that only the live peers were added.
    let a_peers = a.get_peers().await;
    assert_eq!(a_peers.len(), 1);
    assert!(a_peers.contains(&b.identity()));

    let b_peers = b.get_peers().await;
    assert_eq!(b_peers.len(), 1);
    assert!(b_peers.contains(&a.identity()));
}

/// Initialise a pair of nodes A & B, kill B after PEX, and then introduce a new
/// node C.
///
/// This test ensures that dead peers are not added to the joiner's local peer
/// list, so that they age out of the cluster rather than being propagated
/// forever.
#[tokio::test]
async fn test_discovery_health_check() {
    maybe_start_logging();

    let metrics = Arc::new(metric::Registry::default());

    let (a_socket, a_addr) = random_udp().await;
    let (b_socket, b_addr) = random_udp().await;

    let a = Builder::new(
        vec![b_addr.to_string()],
        NopDispatcher::default(),
        Arc::clone(&metrics),
    )
    .build(a_socket);
    let b = Builder::new(
        vec![a_addr.to_string()],
        NopDispatcher::default(),
        Arc::clone(&metrics),
    )
    .build(b_socket);

    // Wait for peer exchange to occur between A and B
    async {
        loop {
            if a.get_peers().await.len() == 1 {
                break;
            }
        }
    }
    .with_timeout_panic(TIMEOUT)
    .await;

    // Stop B.
    let b_identity = b.identity();
    drop(b);

    // Introduce C to the cluster, seeding it with A
    let (c_socket, _c_addr) = random_udp().await;

    let c = Builder::new(
        vec![a_addr.to_string()],
        NopDispatcher::default(),
        Arc::clone(&metrics),
    )
    .build(c_socket);

    // Wait for peer exchange to occur between A and C
    async {
        loop {
            if !c.get_peers().await.is_empty() {
                break;
            }
        }
    }
    .with_timeout_panic(TIMEOUT)
    .await;

    // Assert that only the live peer A was added.
    let c_peers = c.get_peers().await;
    assert_eq!(c_peers.len(), 1);
    assert!(c_peers.contains(&a.identity()));
    assert!(!c_peers.contains(&b_identity));
}

/// Drive peer removal / age-out.
///
/// Start two nodes, wait for PEX to complete, kill one node, and wait for the
/// surviving node to eventually remove the dead peer. Because this is a
/// periodic/time-based action, this test uses tokio's "auto-advance time" test
/// functionality.
#[tokio::test]
async fn test_peer_removal() {
    maybe_start_logging();

    let metrics = Arc::new(metric::Registry::default());

    let (a_socket, a_addr) = random_udp().await;
    let (b_socket, b_addr) = random_udp().await;

    let a = Builder::new(
        vec![b_addr.to_string()],
        NopDispatcher::default(),
        Arc::clone(&metrics),
    )
    .build(a_socket);
    let b = Builder::new(
        vec![a_addr.to_string()],
        NopDispatcher::default(),
        Arc::clone(&metrics),
    )
    .build(b_socket);

    // Wait for peer exchange to occur between A and B
    async {
        loop {
            let peers = a.get_peers().await;
            if peers.len() == 1 {
                assert!(peers.contains(&b.identity()));
                break;
            }
        }
    }
    .with_timeout_panic(TIMEOUT)
    .await;

    // Stop B.
    drop(b);

    // Make time tick as fast as necessary to advance to the next timer event.
    tokio::time::pause();
    for _ in 0..50 {
        tokio::time::advance(Duration::from_secs(35)).await;
    }

    // Use the stdlib time to avoid tokio's paused time and bound the loop time.
    let started_at = Instant::now();
    loop {
        let peers = a.get_peers().await;
        if peers.is_empty() {
            break;
        }

        if Instant::now().duration_since(started_at) > TIMEOUT {
            panic!("timeout waiting for peer removal");
        }
    }
}
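
The removal logic exercised by `test_peer_removal` is not shown in this part of the diff. As a rough illustration of the age-out idea only, the sketch below tracks a last-seen timestamp per peer and drops any peer that has been silent for longer than a threshold. `PeerTable`, `observe`, `remove_dead`, `age_out_example` and the 60 second threshold are all hypothetical names and values chosen for this example; they are not taken from the gossip crate.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical peer table for illustration; not the gossip crate's type.
struct PeerTable {
    /// How long a peer may stay silent before it is considered dead.
    max_silence: Duration,
    /// Peer identity -> the last instant a message was received from it.
    last_seen: HashMap<String, Instant>,
}

impl PeerTable {
    fn new(max_silence: Duration) -> Self {
        Self {
            max_silence,
            last_seen: HashMap::new(),
        }
    }

    /// Record that `identity` was heard from at `now`.
    fn observe(&mut self, identity: &str, now: Instant) {
        self.last_seen.insert(identity.to_string(), now);
    }

    /// Drop every peer silent for longer than `max_silence`, returning the
    /// removed identities in sorted order.
    fn remove_dead(&mut self, now: Instant) -> Vec<String> {
        let max_silence = self.max_silence;
        let mut removed = Vec::new();
        self.last_seen.retain(|identity, seen| {
            let alive = now.saturating_duration_since(*seen) <= max_silence;
            if !alive {
                removed.push(identity.clone());
            }
            alive
        });
        removed.sort();
        removed
    }

    /// The identities currently considered live, in sorted order.
    fn peers(&self) -> Vec<String> {
        let mut peers: Vec<String> = self.last_seen.keys().cloned().collect();
        peers.sort();
        peers
    }
}

/// Two peers are seen at the start; only "a" is heard from again, so "b" is
/// removed once it exceeds the (assumed) 60 second silence threshold.
fn age_out_example() -> (Vec<String>, Vec<String>) {
    let start = Instant::now();
    let mut table = PeerTable::new(Duration::from_secs(60));
    table.observe("a", start);
    table.observe("b", start);
    table.observe("a", start + Duration::from_secs(30));

    let removed = table.remove_dead(start + Duration::from_secs(75));
    (removed, table.peers())
}
```

Passing `now` in explicitly, rather than reading the clock inside the table, keeps the sketch deterministic. The test above gets a similar effect by pausing and advancing tokio's clock.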