Merge pull request #8051 from influxdata/dom/gossip-basic

feat: gossip primitive
pull/24376/head
Dom 2023-07-12 17:21:33 +01:00 committed by GitHub
commit 5232cfea1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1306 additions and 2 deletions

18
Cargo.lock generated
View File

@ -2018,6 +2018,24 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gossip"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"hashbrown 0.14.0",
"metric",
"prost",
"prost-build",
"test_helpers",
"thiserror",
"tokio",
"tracing",
"uuid",
"workspace-hack",
]
[[package]]
name = "grpc-binary-logger"
version = "0.1.0"

View File

@ -17,6 +17,7 @@ members = [
"flightsql",
"garbage_collector",
"generated_types",
"gossip",
"grpc-binary-logger-proto",
"grpc-binary-logger-test-proto",
"grpc-binary-logger",
@ -29,9 +30,9 @@ members = [
"influxdb_tsm",
"influxdb2_client",
"influxrpc_parser",
"ingester_query_grpc",
"ingester_test_ctx",
"ingester",
"ingester_query_grpc",
"iox_catalog",
"iox_data_generator",
"iox_query_influxql",
@ -82,8 +83,8 @@ members = [
"trace",
"tracker",
"trogging",
"wal",
"wal_inspect",
"wal",
"workspace-hack",
]
default-members = ["influxdb_iox"]

24
gossip/Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "gossip"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.68"
futures = "0.3.28"
hashbrown.workspace = true
metric = { version = "0.1.0", path = "../metric" }
prost = "0.11.9"
thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["net", "io-util", "time", "rt", "sync", "macros"] }
tracing = "0.1.37"
uuid = { version = "1.3.3", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[build-dependencies]
prost-build = "0.11.9"
[dev-dependencies]
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }

16
gossip/build.rs Normal file
View File

@ -0,0 +1,16 @@
use std::{error::Error, path::PathBuf};
use prost_build::Config;
fn main() -> Result<(), Box<dyn Error>> {
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("proto");
let proto = root.join("gossip.proto");
println!("cargo:rerun-if-changed={}", proto.display());
Config::new()
.bytes(["."])
.compile_protos(&[proto], &[root])?;
Ok(())
}

34
gossip/proto/gossip.proto Normal file
View File

@ -0,0 +1,34 @@
syntax = "proto3";
package influxdata.iox.gossip.v1;
option go_package = "github.com/influxdata/iox/gossip/v1";
// The payload of a single gossip datagram.
message Frame {
// Per-instance UUID as raw BE bytes.
bytes identity = 1;
// One or more user/control frames packed into a single message.
repeated FrameMessage messages = 2;
}
// A single gossip message within a frame.
message FrameMessage {
// Various user/control message types.
oneof payload {
Ping ping = 1;
Pong pong = 2;
// User-provided data payload.
UserPayload user_data = 3;
}
}
message Ping {}
message Pong {}
// An application payload from the caller of the gossip library.
message UserPayload {
// An opaque user payload - this is handed back to the gossip library user
// unmodified.
bytes payload = 1;
}

64
gossip/src/builder.rs Normal file
View File

@ -0,0 +1,64 @@
use std::sync::Arc;
use tokio::{
net::{ToSocketAddrs, UdpSocket},
sync::mpsc,
};
use crate::{handle::GossipHandle, reactor::Reactor, Dispatcher};
/// Gossip subsystem configuration and initialisation.
#[derive(Debug)]
pub struct Builder<T> {
seed_addrs: Vec<String>,
dispatcher: T,
metric: Arc<metric::Registry>,
}
impl<T> Builder<T> {
/// Use `seed_addrs` as seed peer addresses, and dispatch any application
/// messages to `dispatcher`.
///
/// Each address in `seed_addrs` is re-resolved periodically and the first
/// resolved IP address is used for peer communication.
pub fn new(seed_addrs: Vec<String>, dispatcher: T, metric: Arc<metric::Registry>) -> Self {
Self {
seed_addrs,
dispatcher,
metric,
}
}
}
impl<T> Builder<T>
where
T: Dispatcher + 'static,
{
/// Initialise the gossip subsystem using `socket` for communication.
///
/// # Panics
///
/// This call spawns a tokio task, and as such must be called from within a
/// tokio runtime.
pub fn build(self, socket: UdpSocket) -> GossipHandle {
// Obtain a channel to communicate between the actor, and all handles
let (tx, rx) = mpsc::channel(1000);
// Initialise the reactor
let reactor = Reactor::new(self.seed_addrs, socket, self.dispatcher, &self.metric);
let identity = reactor.identity().clone();
// Start the message reactor.
tokio::spawn(reactor.run(rx));
GossipHandle::new(tx, identity)
}
/// Bind to the provided socket address and initialise the gossip subsystem.
pub async fn bind<A>(self, bind_addr: A) -> Result<GossipHandle, std::io::Error>
where
A: ToSocketAddrs + Send,
{
Ok(self.build(UdpSocket::bind(bind_addr).await?))
}
}

34
gossip/src/dispatcher.rs Normal file
View File

@ -0,0 +1,34 @@
use async_trait::async_trait;
use tracing::warn;
// Re-export the bytes type to ensure upstream users of this crate are
// interacting with the same type.
pub use prost::bytes::Bytes;
/// A delegate abstraction through which the gossip subsystem propagates
/// application-level messages received from other peers.
#[async_trait]
pub trait Dispatcher: Send + Sync {
/// Invoked when an application-level payload is received from a peer.
///
/// This call should not block / should complete quickly to avoid blocking
/// the gossip reactor loop - if a long-running job must be started within
/// this call, consider spawning a separate task.
async fn dispatch(&self, payload: Bytes);
}
#[async_trait]
impl Dispatcher for tokio::sync::mpsc::Sender<Bytes> {
async fn dispatch(&self, payload: Bytes) {
if let Err(e) = self.send(payload).await {
warn!(error=%e, "error dispatching payload to application handler");
}
}
}
// A no-op dispatcher.
#[cfg(test)]
#[async_trait::async_trait]
impl Dispatcher for () {
async fn dispatch(&self, _payload: crate::Bytes) {}
}

73
gossip/src/handle.rs Normal file
View File

@ -0,0 +1,73 @@
use crate::{Bytes, MAX_USER_PAYLOAD_BYTES};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use crate::peers::Identity;
/// An error indicating a send was attempted with a payload that exceeds
/// [`MAX_USER_PAYLOAD_BYTES`].
#[derive(Error, Debug)]
#[error("max allowed payload size exceeded")]
#[allow(missing_copy_implementations)]
pub struct PayloadSizeError {}
/// Requests sent to the [`Reactor`] actor task.
///
/// [`Reactor`]: crate::reactor::Reactor
#[derive(Debug)]
pub(crate) enum Request {
/// Broadcast the given payload to all known peers.
Broadcast(Bytes),
/// Get a snapshot of the peer identities.
GetPeers(oneshot::Sender<Vec<Uuid>>),
}
/// A handle to the gossip subsystem.
///
/// All resources used by the gossip system will be released once this
/// [`GossipHandle`] is dropped. To share the handle, wrap it in an [`Arc`].
///
/// [`Arc`]: std::sync::Arc
#[derive(Debug)]
pub struct GossipHandle {
tx: mpsc::Sender<Request>,
identity: Identity,
}
impl GossipHandle {
pub(crate) fn new(tx: mpsc::Sender<Request>, identity: Identity) -> Self {
Self { tx, identity }
}
/// Return the randomly generated identity of this gossip instance.
pub fn identity(&self) -> Uuid {
*self.identity
}
/// Broadcast `payload` to all known peers.
///
/// This is a best-effort operation - peers are not guaranteed to receive
/// this broadcast.
pub async fn broadcast<T>(&self, payload: T) -> Result<(), PayloadSizeError>
where
T: Into<Bytes> + Send,
{
let payload = payload.into();
if payload.len() > MAX_USER_PAYLOAD_BYTES {
return Err(PayloadSizeError {});
}
self.tx.send(Request::Broadcast(payload)).await.unwrap();
Ok(())
}
/// Retrieve a snapshot of the connected peer list.
pub async fn get_peers(&self) -> Vec<Uuid> {
let (tx, rx) = oneshot::channel();
self.tx.send(Request::GetPeers(tx)).await.unwrap();
rx.await.unwrap()
}
}

98
gossip/src/lib.rs Normal file
View File

@ -0,0 +1,98 @@
//! A work-in-progress, simple gossip primitive for metadata distribution
//! between IOx nodes.
//!
//! # Transport
//!
//! Prefer small payloads where possible, and expect loss of some messages -
//! this primitive provides *best effort* delivery.
//!
//! This implementation sends unicast UDP frames between peers, with support for
//! both control frames & user payloads. The maximum message size is 65,507
//! bytes ([`MAX_USER_PAYLOAD_BYTES`] for application-level payloads), but a
//! packet this large is fragmented into smaller (at most MTU-sized) packets and
//! is at greater risk of being dropped due to a lost fragment.
//!
//! # Security
//!
//! Messages exchanged between peers are unauthenticated and connectionless -
//! it's trivial to forge a message appearing to come from a different peer, or
//! include malicious payloads.
//!
//! The security model of this implementation expects the peers to be running in
//! a trusted environment, secure from malicious users.
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
clippy::clone_on_ref_ptr,
clippy::dbg_macro,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::todo,
clippy::use_self,
missing_copy_implementations,
missing_debug_implementations,
unused_crate_dependencies,
missing_docs
)]
mod builder;
mod dispatcher;
mod handle;
mod metric;
mod peers;
mod proto;
mod reactor;
pub(crate) mod seed;
use std::time::Duration;
/// Work around the unused_crate_dependencies false positives for test deps.
#[cfg(test)]
use test_helpers as _;
use workspace_hack as _;
pub use builder::*;
pub use dispatcher::*;
pub use handle::*;
/// The maximum duration of time allotted to performing a DNS resolution against
/// a seed/peer address.
const RESOLVE_TIMEOUT: Duration = Duration::from_secs(5);
/// Defines the interval between PING frames sent to all configured seed peers.
const SEED_PING_INTERVAL: std::time::Duration = Duration::from_secs(15);
/// The maximum payload size allowed.
///
/// Attempting to send a serialised packet (inclusive of control frames/fields)
/// in excess of this amount will result in an error.
const MAX_FRAME_BYTES: usize = 1024 * 10;
/// The frame header overhead for user payloads.
const USER_PAYLOAD_OVERHEAD: usize = 22;
/// The maximum allowed byte size of user payloads.
///
/// Sending payloads of this size is discouraged as it leads to fragmentation of
/// the message and increases the chance of the message being undelivered /
/// dropped. Smaller is always better for UDP transports!
pub const MAX_USER_PAYLOAD_BYTES: usize = MAX_FRAME_BYTES - USER_PAYLOAD_OVERHEAD;
#[cfg(test)]
#[allow(clippy::assertions_on_constants)]
mod tests {
use super::*;
#[test]
fn test_max_msg_size() {
assert!(MAX_FRAME_BYTES < 65_536, "cannot exceed UDP maximum");
}
#[test]
fn test_max_user_payload_size() {
assert_eq!(
MAX_USER_PAYLOAD_BYTES, 10_218,
"applications may depend on this value not changing"
);
}
}

60
gossip/src/metric.rs Normal file
View File

@ -0,0 +1,60 @@
//! Metric newtype wrappers for type safety.
//!
//! The metrics are easily confused (they're all counters) so have the compiler
//! check the right ones are being used in the right places.
use metric::U64Counter;
#[derive(Debug, Clone)]
pub(crate) struct SentFrames(metric::U64Counter);
impl SentFrames {
pub(crate) fn inc(&self, v: usize) {
self.0.inc(v as u64)
}
}
#[derive(Debug)]
pub(crate) struct ReceivedFrames(metric::U64Counter);
impl ReceivedFrames {
pub(crate) fn inc(&self, v: usize) {
self.0.inc(v as u64)
}
}
#[derive(Debug, Clone)]
pub(crate) struct SentBytes(metric::U64Counter);
impl SentBytes {
pub(crate) fn inc(&self, v: usize) {
self.0.inc(v as u64)
}
}
#[derive(Debug)]
pub(crate) struct ReceivedBytes(metric::U64Counter);
impl ReceivedBytes {
pub(crate) fn inc(&self, v: usize) {
self.0.inc(v as u64)
}
}
pub(crate) fn new_metrics(
metrics: &metric::Registry,
) -> (SentFrames, ReceivedFrames, SentBytes, ReceivedBytes) {
let metric_frames = metrics.register_metric::<U64Counter>(
"gossip_frames",
"number of frames sent/received by this node",
);
let metric_bytes = metrics
.register_metric::<U64Counter>("gossip_bytes", "sum of bytes sent/received by this node");
(
SentFrames(metric_frames.recorder(&[("direction", "sent")])),
ReceivedFrames(metric_frames.recorder(&[("direction", "received")])),
SentBytes(metric_bytes.recorder(&[("direction", "sent")])),
ReceivedBytes(metric_bytes.recorder(&[("direction", "received")])),
)
}

244
gossip/src/peers.rs Normal file
View File

@ -0,0 +1,244 @@
use std::{io, net::SocketAddr};
use futures::{stream::FuturesUnordered, StreamExt};
use hashbrown::{hash_map::RawEntryMut, HashMap};
use metric::U64Counter;
use prost::bytes::Bytes;
use tokio::net::UdpSocket;
use tracing::{trace, warn};
use uuid::Uuid;
use crate::{
metric::{SentBytes, SentFrames},
MAX_FRAME_BYTES,
};
/// A unique generated identity containing 128 bits of randomness (V4 UUID).
#[derive(Debug, Eq, Clone)]
pub(crate) struct Identity(Bytes, Uuid);
impl std::ops::Deref for Identity {
type Target = Uuid;
fn deref(&self) -> &Self::Target {
&self.1
}
}
impl std::hash::Hash for Identity {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}
impl PartialEq for Identity {
fn eq(&self, other: &Self) -> bool {
debug_assert!((self.1 == other.1) == (self.0 == other.0));
self.0 == other.0
}
}
impl std::fmt::Display for Identity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.1.fmt(f)
}
}
impl TryFrom<Bytes> for Identity {
type Error = uuid::Error;
fn try_from(value: Bytes) -> Result<Self, Self::Error> {
let uuid = Uuid::from_slice(&value)?;
Ok(Self(value, uuid))
}
}
impl Identity {
/// Generate a new random identity.
pub(crate) fn new() -> Self {
let id = Uuid::new_v4();
Self(Bytes::from(id.as_bytes().to_vec()), id)
}
pub(crate) fn as_bytes(&self) -> &Bytes {
&self.0
}
}
/// A discovered peer within the gossip cluster.
#[derive(Debug, Clone)]
pub(crate) struct Peer {
identity: Identity,
addr: SocketAddr,
}
impl Peer {
pub(crate) async fn send(
&self,
buf: &[u8],
socket: &UdpSocket,
frames_sent: &SentFrames,
bytes_sent: &SentBytes,
) -> Result<usize, io::Error> {
// If the frame is larger than the allowed maximum, then the receiver
// will truncate the frame when reading the socket.
//
// Never send frames that will be unprocessable.
if buf.len() > MAX_FRAME_BYTES {
warn!(
n_bytes = buf.len(),
"not sending oversized packet - receiver would truncate"
);
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"max frame size exceeded",
));
}
let ret = socket.send_to(buf, self.addr).await;
match &ret {
Ok(n_bytes) => {
frames_sent.inc(1);
bytes_sent.inc(*n_bytes);
trace!(identity=%self.identity, n_bytes, peer_addr=%self.addr, "send frame")
}
Err(e) => {
warn!(error=%e, identity=%self.identity, peer_addr=%self.addr, "frame send error")
}
}
ret
}
}
/// The set of currently active/known peers.
#[derive(Debug, Default)]
pub(crate) struct PeerList {
list: HashMap<Identity, Peer>,
/// The number of known, believed-to-be-healthy peers.
metric_peer_count: metric::U64Counter,
}
impl PeerList {
/// Initialise the [`PeerList`] with capacity for `cap` number of [`Peer`]
/// instances.
pub(crate) fn with_capacity(cap: usize, metrics: &metric::Registry) -> Self {
let metric_peer_count = metrics
.register_metric::<U64Counter>(
"gossip_known_peers",
"number of likely healthy peers known to this node",
)
.recorder(&[]);
Self {
list: HashMap::with_capacity(cap),
metric_peer_count,
}
}
/// Return the UUIDs of all known peers.
pub(crate) fn peer_uuids(&self) -> Vec<Uuid> {
self.list.keys().map(|v| **v).collect()
}
/// Upsert a peer identified by `identity` to the peer list, associating it
/// with the provided `peer_addr`.
pub(crate) fn upsert(&mut self, identity: &Identity, peer_addr: SocketAddr) -> &mut Peer {
let p = match self.list.raw_entry_mut().from_key(identity) {
RawEntryMut::Vacant(v) => {
self.metric_peer_count.inc(1);
v.insert(
identity.to_owned(),
Peer {
addr: peer_addr,
identity: identity.to_owned(),
},
)
.1
}
RawEntryMut::Occupied(v) => v.into_mut(),
};
p.addr = peer_addr;
p
}
/// Broadcast `buf` to all known peers over `socket`, returning the number
/// of bytes sent in total.
pub(crate) async fn broadcast(
&self,
buf: &[u8],
socket: &UdpSocket,
frames_sent: &SentFrames,
bytes_sent: &SentBytes,
) -> usize {
self.list
.values()
.map(|v| v.send(buf, socket, frames_sent, bytes_sent))
.collect::<FuturesUnordered<_>>()
.fold(0, |acc, res| async move {
match res {
Ok(n) => acc + n,
Err(_) => acc,
}
})
.await
}
}
#[cfg(test)]
mod tests {
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};
use super::*;
#[test]
fn test_identity_round_trip() {
let a = Identity::new();
let encoded = a.as_bytes().to_owned();
let decoded = Identity::try_from(encoded).unwrap();
assert_eq!(decoded, a);
}
#[test]
fn test_identity_length_mismatch() {
let v = Bytes::from_static(&[42, 42, 42, 42]);
let _ = Identity::try_from(v).expect_err("short ID should fail");
let v = Bytes::from_static(&[
42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
]);
let _ = Identity::try_from(v).expect_err("long ID should fail");
}
#[test]
fn test_identity_eq() {
let v = Identity::new();
assert_eq!(v.clone(), v);
assert_eq!(hash_identity(&v), hash_identity(&v));
let other = Identity::new();
assert_ne!(v, other);
assert_ne!(hash_identity(&other), hash_identity(&v));
}
#[test]
fn test_identity_display() {
let v = Identity::new();
let text = v.to_string();
let uuid = Uuid::try_parse(&text).expect("display impl should output valid uuids");
assert_eq!(*v, uuid);
}
fn hash_identity(v: &Identity) -> u64 {
let mut h = DefaultHasher::default();
v.hash(&mut h);
h.finish()
}
}

3
gossip/src/proto.rs Normal file
View File

@ -0,0 +1,3 @@
//! Proto definitions of gossip message wire types.
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.gossip.v1.rs"));

455
gossip/src/reactor.rs Normal file
View File

@ -0,0 +1,455 @@
use std::{net::SocketAddr, sync::Arc};
use prost::{bytes::BytesMut, Message};
use tokio::{
net::UdpSocket,
sync::mpsc::{self},
};
use tracing::{debug, error, info, trace, warn};
use crate::{
metric::*,
peers::{Identity, PeerList},
proto::{self, frame_message::Payload, FrameMessage},
seed::{seed_ping_task, Seed},
Dispatcher, Request, MAX_FRAME_BYTES,
};
#[derive(Debug)]
enum Error {
NoPayload {
peer: Identity,
addr: SocketAddr,
},
Deserialise {
addr: SocketAddr,
source: prost::DecodeError,
},
Identity {
addr: SocketAddr,
},
Io(std::io::Error),
MaxSize(usize),
}
impl From<std::io::Error> for Error {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
#[derive(Debug)]
struct AbortOnDrop(tokio::task::JoinHandle<()>);
impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort()
}
}
/// An event loop for gossip frames processing.
///
/// This actor task is responsible for driving peer discovery, managing the set
/// of known peers and exchanging gossip frames between peers.
///
/// A user interacts with a [`Reactor`] through a [`GossipHandle`].
///
/// [`GossipHandle`]: crate::GossipHandle
#[derive(Debug)]
pub(crate) struct Reactor<T> {
dispatch: T,
/// The random identity of this gossip instance.
identity: Identity,
/// A cached wire frame, used to generate outgoing messages.
cached_frame: proto::Frame,
/// A re-used buffer for serialising outgoing messages into.
serialisation_buf: Vec<u8>,
/// The immutable list of seed addresses provided by the user, periodically
/// pinged.
seed_list: Arc<[Seed]>,
/// A task that periodically sends PING frames to all seeds, executing in a
/// separate task so that DNS resolution does not block the reactor loop.
_seed_ping_task: AbortOnDrop,
/// The set of active peers this node has communicated with and believes to
/// be (recently) healthy.
///
/// Depending on the perceived availability of the seed nodes, this may
/// contain less peers than the number of initial seeds.
peer_list: PeerList,
/// The UDP socket used for communication with peers.
socket: Arc<UdpSocket>,
/// The count of frames sent and received.
metric_frames_sent: SentFrames,
metric_frames_received: ReceivedFrames,
/// The sum of bytes sent and received.
metric_bytes_sent: SentBytes,
metric_bytes_received: ReceivedBytes,
}
impl<T> Reactor<T>
where
T: Dispatcher,
{
pub(crate) fn new(
seed_list: Vec<String>,
socket: UdpSocket,
dispatch: T,
metrics: &metric::Registry,
) -> Self {
// Generate a unique UUID for this Reactor instance, and cache the wire
// representation.
let identity = Identity::new();
let seed_list = seed_list.into_iter().map(Seed::new).collect();
let socket = Arc::new(socket);
let mut serialisation_buf = Vec::with_capacity(1024);
// Generate a pre-populated frame header.
let mut cached_frame = proto::Frame {
identity: identity.as_bytes().clone(),
messages: Vec::with_capacity(1),
};
// A ping frame is static over the lifetime of a Reactor instance, so it
// can be pre-serialised, cached, and reused for every ping.
let cached_ping_frame = {
populate_frame(
&mut cached_frame,
vec![new_payload(Payload::Ping(proto::Ping {}))],
&mut serialisation_buf,
)
.unwrap();
serialisation_buf.clone()
};
// Initialise the various metrics with wrappers to help distinguish
// between the (very similar) counters.
let (metric_frames_sent, metric_frames_received, metric_bytes_sent, metric_bytes_received) =
new_metrics(metrics);
// Spawn a task that periodically pings all known seeds.
//
// Pinging all seeds announces this node as alive, propagating the
// instance UUID, and requesting PONG responses to drive population of
// the active peer list.
let seed_ping_task = AbortOnDrop(tokio::spawn(seed_ping_task(
Arc::clone(&seed_list),
Arc::clone(&socket),
cached_ping_frame,
metric_frames_sent.clone(),
metric_bytes_sent.clone(),
)));
Self {
dispatch,
identity,
cached_frame,
serialisation_buf,
peer_list: PeerList::with_capacity(seed_list.len(), metrics),
seed_list,
_seed_ping_task: seed_ping_task,
socket,
metric_frames_sent,
metric_frames_received,
metric_bytes_sent,
metric_bytes_received,
}
}
pub(crate) async fn run(mut self, mut rx: mpsc::Receiver<Request>) {
info!(
identity = %self.identity,
seed_list = ?self.seed_list,
"gossip reactor started",
);
loop {
tokio::select! {
msg = self.read() => {
match msg {
Ok(()) => {},
Err(Error::NoPayload { peer, addr }) => {
warn!(%peer, %addr, "message contains no payload");
continue;
}
Err(Error::Deserialise { addr, source }) => {
warn!(error=%source, %addr, "error deserialising frame");
continue;
}
Err(Error::Identity { addr }) => {
warn!(%addr, "invalid identity value in frame");
continue;
}
Err(Error::Io(error)) => {
error!(%error, "i/o error");
continue;
}
Err(Error::MaxSize(_)) => {
// Logged at source
continue;
}
}
}
op = rx.recv() => {
match op {
None => {
info!("stopping gossip reactor");
return;
}
Some(Request::GetPeers(tx)) => {
let _ = tx.send(self.peer_list.peer_uuids());
},
Some(Request::Broadcast(payload)) => {
// The user is guaranteed MAX_USER_PAYLOAD_BYTES to
// be send-able, so send this frame without packing
// others with it for simplicity.
populate_frame(
&mut self.cached_frame,
vec![new_payload(Payload::UserData(proto::UserPayload{payload}))],
&mut self.serialisation_buf
).expect("size validated in handle at enqueue time");
self.peer_list.broadcast(
&self.serialisation_buf,
&self.socket,
&self.metric_frames_sent,
&self.metric_bytes_sent
).await;
}
}
}
};
}
}
/// Read a gossip frame from the socket and potentially respond.
///
/// This method waits for a frame to be made available by the OS, enumerates
/// the contents, batches any responses to those frames and if non-empty,
/// returns the result to the sender of the original frame.
///
/// Returns the bytes read and bytes sent during execution of this method.
async fn read(&mut self) -> Result<(), Error> {
// Read a frame into buf.
let (bytes_read, frame, peer_addr) = read_frame(&self.socket).await?;
self.metric_frames_received.inc(1);
self.metric_bytes_received.inc(bytes_read as _);
// Read the peer identity from the frame
let identity =
Identity::try_from(frame.identity).map_err(|_| Error::Identity { addr: peer_addr })?;
// Don't process messages from this node.
//
// It's expected that all N servers will be included in a peer list,
// rather than the N-1 peers to this node. By dropping messages from
// this node, pings sent by this node will go unprocessed and therefore
// this node will not be added to the active peer list.
if identity == self.identity {
debug!(%identity, %peer_addr, bytes_read, "dropping frame from self");
return Ok(());
}
// Find or create the peer in the peer list.
let peer = self.peer_list.upsert(&identity, peer_addr);
let mut out_messages = Vec::with_capacity(1);
for msg in frame.messages {
// Extract the payload from the frame message
let payload = msg.payload.ok_or_else(|| Error::NoPayload {
peer: identity.clone(),
addr: peer_addr,
})?;
// Handle the frame message from the peer, optionally returning a
// response frame.
let response = match payload {
Payload::Ping(_) => Some(Payload::Pong(proto::Pong {})),
Payload::Pong(_) => {
debug!(%identity, %peer_addr, "pong");
None
}
Payload::UserData(data) => {
self.dispatch.dispatch(data.payload).await;
None
}
};
if let Some(payload) = response {
out_messages.push(new_payload(payload));
}
}
// Sometimes no message will be returned to the peer - there's no need
// to send an empty frame.
if out_messages.is_empty() {
return Ok(());
}
// Serialise the frame into the serialisation buffer.
populate_frame(
&mut self.cached_frame,
out_messages,
&mut self.serialisation_buf,
)?;
peer.send(
&self.serialisation_buf,
&self.socket,
&self.metric_frames_sent,
&self.metric_bytes_sent,
)
.await?;
Ok(())
}
/// Return the randomised identity assigned to this instance.
pub(crate) fn identity(&self) -> &Identity {
&self.identity
}
}
/// Wait for a UDP datagram to become ready, and read it entirely into `buf`.
async fn recv(socket: &UdpSocket, buf: &mut BytesMut) -> (usize, SocketAddr) {
let (n_bytes, addr) = socket
.recv_buf_from(buf)
.await
// These errors are libc's recvfrom() or converting the kernel-provided
// socket structure to rust's SocketAddr - neither should ever happen.
.expect("invalid recvfrom");
trace!(%addr, n_bytes, "socket read");
(n_bytes, addr)
}
/// Wait for a UDP datagram to arrive, and decode it into a gossip Frame.
///
/// Clears the contents of `buf` before reading the frame.
async fn read_frame(socket: &UdpSocket) -> Result<(usize, proto::Frame, SocketAddr), Error> {
// Pre-allocate a buffer large enough to hold the maximum message size.
//
// Reading data from a UDP socket silently truncates if there's not enough
// buffer space to write the full packet payload (tokio doesn't support
// MSG_TRUNC-like flags on reads).
let mut buf = BytesMut::with_capacity(MAX_FRAME_BYTES);
let (n_bytes, addr) = recv(socket, &mut buf).await;
// Decode the frame, re-using byte arrays from the underlying buffer.
match proto::Frame::decode(buf.freeze()) {
Ok(frame) => {
debug!(?frame, %addr, n_bytes, "read frame");
Ok((n_bytes, frame, addr))
}
Err(e) => Err(Error::Deserialise { addr, source: e }),
}
}
/// Given a pre-allocated `frame`, clear and populate it with the provided
/// `payload` containing a set of [`FrameMessage`], serialising it to `buf`.
fn populate_frame(
frame: &mut proto::Frame,
payload: Vec<FrameMessage>,
buf: &mut Vec<u8>,
) -> Result<(), Error> {
frame.messages = payload;
// Reading data from a UDP socket silently truncates if there's not enough
// buffer space to write the full packet payload. This library will
// pre-allocate a buffer of this size to read packets into, therefore all
// messages must be shorter than this value.
if frame.encoded_len() > MAX_FRAME_BYTES {
error!(
n_bytes=buf.len(),
n_max=%MAX_FRAME_BYTES,
"attempted to send frame larger than configured maximum"
);
return Err(Error::MaxSize(buf.len()));
}
buf.clear();
frame.encode(buf).expect("buffer should grow");
debug_assert!(proto::Frame::decode(crate::Bytes::from(buf.clone())).is_ok());
Ok(())
}
/// Instantiate a new [`FrameMessage`] from the given [`Payload`].
fn new_payload(p: Payload) -> proto::FrameMessage {
proto::FrameMessage { payload: Some(p) }
}
/// Send a PING message to `socket`.
pub(crate) async fn ping(
ping_frame: &[u8],
socket: &UdpSocket,
addr: SocketAddr,
sent_frames: &SentFrames,
sent_bytes: &SentBytes,
) -> usize {
match socket.send_to(ping_frame, &addr).await {
Ok(n_bytes) => {
debug!(addr = %addr, "ping");
sent_frames.inc(1);
sent_bytes.inc(n_bytes);
n_bytes
}
Err(e) => {
warn!(
error=%e,
addr = %addr,
"ping failed"
);
0
}
}
}
#[cfg(test)]
mod tests {
use crate::{MAX_USER_PAYLOAD_BYTES, USER_PAYLOAD_OVERHEAD};
use super::*;
#[test]
fn test_user_frame_overhead() {
let identity = Identity::new();
// Generate a pre-populated frame header.
let mut frame = proto::Frame {
identity: identity.as_bytes().clone(),
messages: vec![],
};
let mut buf = Vec::new();
populate_frame(
&mut frame,
vec![new_payload(Payload::UserData(proto::UserPayload {
payload: crate::Bytes::new(), // Empty/0-sized
}))],
&mut buf,
)
.unwrap();
// The proto type should self-report the same size.
assert_eq!(buf.len(), frame.encoded_len());
// The overhead const should be accurate
assert_eq!(buf.len(), USER_PAYLOAD_OVERHEAD);
// The max user payload size should be accurate.
assert_eq!(MAX_FRAME_BYTES - buf.len(), MAX_USER_PAYLOAD_BYTES);
}
}

97
gossip/src/seed.rs Normal file
View File

@ -0,0 +1,97 @@
use std::{future, net::SocketAddr, sync::Arc};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{
net::{self, UdpSocket},
time::{timeout, MissedTickBehavior},
};
use tracing::{debug, warn};
use crate::{
metric::{SentBytes, SentFrames},
reactor::ping,
RESOLVE_TIMEOUT, SEED_PING_INTERVAL,
};
/// The user-provided seed peer address.
///
/// NOTE: the IP/socket address this resolves to may change over the
/// lifetime of the peer, so the raw address is retained instead of
/// the [`SocketAddr`] to ensure it is constantly re-resolved when the peer
/// is unreachable.
#[derive(Debug)]
pub(crate) struct Seed(String);
impl Seed {
pub(crate) fn new(addr: String) -> Self {
Self(addr)
}
/// Resolve this peer address, returning an error if resolution is not
/// complete within [`RESOLVE_TIMEOUT`].
pub(crate) async fn resolve(&self) -> Option<SocketAddr> {
match timeout(RESOLVE_TIMEOUT, resolve(&self.0)).await {
Ok(v) => v,
Err(_) => {
warn!(addr = %self.0, "timeout resolving seed address");
None
}
}
}
}
/// Resolve `addr`, returning the first IP address, if any.
async fn resolve(addr: &str) -> Option<SocketAddr> {
match net::lookup_host(addr).await.map(|mut v| v.next()) {
Ok(Some(v)) => {
debug!(%addr, peer=%v, "resolved peer address");
Some(v)
}
Ok(None) => {
warn!(%addr, "resolved peer address contains no IPs");
None
}
Err(e) => {
warn!(%addr, error=%e, "failed to resolve peer address");
None
}
}
}
/// Block forever, sending `ping_frame` over `socket` to all the entries in
/// `seeds`.
///
/// This method immediately pings all the seeds, and then pings periodically at
/// [`SEED_PING_INTERVAL`].
pub(super) async fn seed_ping_task(
seeds: Arc<[Seed]>,
socket: Arc<UdpSocket>,
ping_frame: Vec<u8>,
sent_frames: SentFrames,
sent_bytes: SentBytes,
) {
let mut interval = tokio::time::interval(SEED_PING_INTERVAL);
// Do not burden seeds with faster PING frames to catch up this timer.
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// Start the ping loop, with the first iteration starting immediately.
loop {
interval.tick().await;
let bytes_sent = seeds
.iter()
.map(|seed| async {
if let Some(addr) = seed.resolve().await {
ping(&ping_frame, &socket, addr, &sent_frames, &sent_bytes).await
} else {
0
}
})
.collect::<FuturesUnordered<_>>()
.fold(0, |acc, x| future::ready(acc + x))
.await;
debug!(bytes_sent, "seed ping sweep complete");
}
}

83
gossip/tests/smoke.rs Normal file
View File

@ -0,0 +1,83 @@
use std::{sync::Arc, time::Duration};
use test_helpers::{maybe_start_logging, timeout::FutureTimeout};
use tokio::{net::UdpSocket, sync::mpsc};
use gossip::*;
/// Assert that starting up a reactor performs the initial peer discovery
/// from a set of seeds, resulting in both peers known of one another.
#[tokio::test]
async fn test_payload_exchange() {
maybe_start_logging();
let metrics = Arc::new(metric::Registry::default());
// How long to wait for peer discovery to complete.
const TIMEOUT: Duration = Duration::from_secs(5);
// Bind a UDP socket to a random port
let a_socket = UdpSocket::bind("127.0.0.1:0")
.await
.expect("failed to bind UDP socket");
let a_addr = a_socket.local_addr().expect("failed to read local addr");
// And a socket for the second reactor
let b_socket = UdpSocket::bind("127.0.0.1:0")
.await
.expect("failed to bind UDP socket");
let b_addr = b_socket.local_addr().expect("failed to read local addr");
// Initialise the dispatchers for the reactors
let (a_tx, mut a_rx) = mpsc::channel(5);
let (b_tx, mut b_rx) = mpsc::channel(5);
// Initialise both reactors
let addrs = vec![a_addr.to_string(), b_addr.to_string()];
let a = Builder::new(addrs.clone(), a_tx, Arc::clone(&metrics)).build(a_socket);
let b = Builder::new(addrs, b_tx, Arc::clone(&metrics)).build(b_socket);
// Wait for peer discovery to occur
async {
loop {
if a.get_peers().await.len() == 1 {
break;
}
}
}
.with_timeout_panic(TIMEOUT)
.await;
// Send the payload through peer A
let a_payload = Bytes::from_static(b"bananas");
a.broadcast(a_payload.clone()).await.unwrap();
// Assert it was received by peer B
let got = b_rx
.recv()
.with_timeout_panic(TIMEOUT)
.await
.expect("reactor stopped");
assert_eq!(got, a_payload);
// Do the reverse - send from B to A
let b_payload = Bytes::from_static(b"platanos");
b.broadcast(b_payload.clone()).await.unwrap();
let got = a_rx
.recv()
.with_timeout_panic(TIMEOUT)
.await
.expect("reactor stopped");
assert_eq!(got, b_payload);
// Send another payload through peer A (ensuring scratch buffers are
// correctly wiped, etc)
let a_payload = Bytes::from_static(b"platanos");
a.broadcast(a_payload.clone()).await.unwrap();
let got = b_rx
.recv()
.with_timeout_panic(TIMEOUT)
.await
.expect("reactor stopped");
assert_eq!(got, a_payload);
}