feat(gossip): add gossip_compaction crate

Adds a crate that layers compaction-specific gossip types and
abstractions over the underlying gossip transport for a nicer (and
decoupled!) internal API.
pull/24376/head
Dom Dwyer 2023-08-24 12:28:35 +02:00
parent 044d5bfdcf
commit c0b4a10874
6 changed files with 495 additions and 0 deletions
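Before the diff, a minimal sketch of how a consumer is expected to wire the new types together, based on the integration test included below. `LoggingHandler` and `wire_up` are illustrative names only; the `gossip::Builder` calls mirror the test code.

```rust
use std::sync::Arc;

use async_trait::async_trait;
use generated_types::influxdata::iox::gossip::v1::CompactionEvent;
use gossip::Builder;
use gossip_compaction::{
    rx::{CompactionEventHandler, CompactionEventRx},
    tx::CompactionEventTx,
};
use observability_deps::tracing::info;

/// A hypothetical handler that only logs received events.
#[derive(Debug)]
struct LoggingHandler;

#[async_trait]
impl CompactionEventHandler for LoggingHandler {
    async fn handle(&self, event: CompactionEvent) {
        // A real application would apply the event to local state here.
        info!(new_files = event.new_files.len(), "received compaction event");
    }
}

async fn wire_up(
    socket: tokio::net::UdpSocket,
    seeds: Vec<String>,
) -> CompactionEventTx<CompactionEvent> {
    let metrics = Arc::new(metric::Registry::default());

    // Incoming: decode gossip payloads and hand them to the handler,
    // buffering up to 100 events so the gossip reactor is never blocked.
    let dispatcher = CompactionEventRx::new(LoggingHandler, 100);

    // The gossip reactor owns the socket; decoded payloads flow to the
    // dispatcher, and outgoing frames are sent via the returned handle.
    let handle = Builder::new(seeds, dispatcher, Arc::clone(&metrics)).build(socket);

    // Outgoing: wrap the handle; callers push CompactionEvent protobufs into
    // broadcast() and serialisation happens in a background task.
    CompactionEventTx::new(handle)
}
```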

Cargo.lock (generated)

@@ -2059,6 +2059,21 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "gossip_compaction"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"generated_types",
"gossip",
"metric",
"observability_deps",
"test_helpers",
"tokio",
"workspace-hack",
]
[[package]]
name = "gossip_parquet_file"
version = "0.1.0"

Cargo.toml

@@ -18,6 +18,7 @@ members = [
"garbage_collector",
"generated_types",
"gossip",
"gossip_compaction",
"gossip_parquet_file",
"gossip_schema",
"grpc-binary-logger-proto",

gossip_compaction/Cargo.toml

@@ -0,0 +1,24 @@
[package]
name = "gossip_compaction"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
bytes = "1.4"
generated_types = { path = "../generated_types" }
gossip = { version = "0.1.0", path = "../gossip" }
observability_deps = { path = "../observability_deps" }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
metric = { path = "../metric" }
test_helpers = { version = "0.1.0", path = "../test_helpers", features = [
"future_timeout",
] }
tokio = { version = "1", features = ["test-util"] }

gossip_compaction/src/lib.rs

@@ -0,0 +1,260 @@
//! Parquet compaction completion notifications over [gossip].
//!
//! This sub-system is composed of the following primary components:
//!
//! * [`gossip`] crate: provides the gossip transport, the [`GossipHandle`], and
//! the [`Dispatcher`]. This crate operates on raw bytes.
//!
//! * The outgoing [`CompactionEventTx`]: a compaction-specific wrapper over the
//! underlying [`GossipHandle`]. This type translates the protobuf
//! [`CompactionEvent`] from the application layer into raw serialised bytes,
//! sending them over the underlying [`gossip`] impl.
//!
//! * The incoming [`CompactionEventRx`]: deserialises the incoming bytes from
//! the gossip [`Dispatcher`] into [`CompactionEvent`] and passes them off to
//! the [`CompactionEventHandler`] implementation for processing.
//!
//! Users of this crate should implement the [`CompactionEventHandler`] trait to
//! receive compaction events, and push events into the [`CompactionEventTx`] to
//! broadcast events to peers.
//!
//! ```text
//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! │
//! │ Application
//! │
//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! │ ▲
//! │ │
//! │ │
//! │ CompactionEvent │
//! │ │
//! ▼ │
//! ┌──────────────────────┐ ┌─────────────────────────┐
//! │ CompactionEventTx │ │ CompactionEventRx │
//! └──────────────────────┘ └─────────────────────────┘
//! │ ▲
//! │ │
//! │ Encoded bytes │
//! │ │
//! │ │
//! ┌ Gossip ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─
//! ▼ │ │
//! │ ┌──────────────┐ ┌──────────────────┐
//! │ GossipHandle │ │ Dispatcher │ │
//! │ └──────────────┘ └──────────────────┘
//! │
//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! ```
//!
//! # Best Effort
//!
//! The underlying gossip subsystem is designed to provide best-effort delivery
//! of messages, and therefore best-effort delivery of compaction completion
//! events, without any ordering or delivery guarantees.
//!
//! This crate does NOT provide any eventual consistency guarantees.
//!
//! # Message Atomicity & Size Restrictions
//!
//! The underlying gossip protocol has an upper bound on message size
//! ([`MAX_USER_PAYLOAD_BYTES`]) that restricts how large a single message may
//! be.
//!
//! This implementation preserves the atomicity of compaction events, and does
//! not attempt to split overly large messages into multiple smaller messages.
//! This ensures each message defines the exact result of a single compaction
//! round in its entirety, at the cost of discarding messages that exceed the
//! maximum message size.
//!
//! Because the underlying gossip protocol does not provide guaranteed delivery,
//! consumers of these messages SHOULD NOT expect to receive notifications for
//! all events, so discarding large messages does not introduce a correctness
//! problem. It does, however, reduce the effectiveness of the optimisations
//! these notifications enable, so the frequency of message discards SHOULD be
//! tracked to understand the scope of the problem.
//!
//! [`CompactionEventTx`]: tx::CompactionEventTx
//! [`CompactionEventRx`]: rx::CompactionEventRx
//! [`CompactionEventHandler`]: rx::CompactionEventHandler
//! [`GossipHandle`]: gossip::GossipHandle
//! [`Dispatcher`]: gossip::Dispatcher
//! [`MAX_USER_PAYLOAD_BYTES`]: gossip::MAX_USER_PAYLOAD_BYTES
//! [`CompactionEvent`]:
//! generated_types::influxdata::iox::gossip::v1::CompactionEvent
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
clippy::clone_on_ref_ptr,
clippy::dbg_macro,
clippy::explicit_iter_loop,
// See https://github.com/influxdata/influxdb_iox/pull/1671
clippy::future_not_send,
clippy::todo,
clippy::use_self,
missing_copy_implementations,
missing_debug_implementations,
unused_crate_dependencies,
missing_docs
)]
#![allow(clippy::default_constructed_unit_structs)]
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
pub mod rx;
pub mod tx;
#[cfg(test)]
mod tests {
use std::{net::SocketAddr, sync::Arc, time::Duration};
use async_trait::async_trait;
use generated_types::influxdata::iox::{
catalog::v1::{partition_identifier, ParquetFile, PartitionIdentifier},
gossip::v1::CompactionEvent,
};
use gossip::Builder;
use test_helpers::{maybe_start_logging, timeout::FutureTimeout};
use tokio::{net::UdpSocket, sync::mpsc};
use crate::{
rx::{CompactionEventHandler, CompactionEventRx},
tx::CompactionEventTx,
};
/// Bind a UDP socket on a random port and return it alongside the socket
/// address.
async fn random_udp() -> (UdpSocket, SocketAddr) {
// Bind a UDP socket to a random port
let socket = UdpSocket::bind("127.0.0.1:0")
.await
.expect("failed to bind UDP socket");
let addr = socket.local_addr().expect("failed to read local addr");
(socket, addr)
}
#[derive(Debug)]
struct Peer {
tx: CompactionEventTx<CompactionEvent>,
rx: mpsc::Receiver<CompactionEvent>,
}
#[derive(Debug)]
struct MockEventHandler(mpsc::Sender<CompactionEvent>);
impl MockEventHandler {
fn new() -> (Self, mpsc::Receiver<CompactionEvent>) {
let (tx, rx) = mpsc::channel(10);
(Self(tx), rx)
}
}
#[async_trait]
impl CompactionEventHandler for Arc<MockEventHandler> {
async fn handle(&self, event: CompactionEvent) {
self.0.send(event).await.unwrap();
}
}
async fn new_node_pair() -> (Peer, Peer) {
let metrics = Arc::new(metric::Registry::default());
let (a_socket, a_addr) = random_udp().await;
let (handler, a_rx) = MockEventHandler::new();
let a_store = Arc::new(handler);
let a_dispatcher = CompactionEventRx::new(Arc::clone(&a_store), 100);
let (b_socket, b_addr) = random_udp().await;
let (handler, b_rx) = MockEventHandler::new();
let b_store = Arc::new(handler);
let b_dispatcher = CompactionEventRx::new(Arc::clone(&b_store), 100);
// Initialise both gossip reactors
let addrs = vec![a_addr.to_string(), b_addr.to_string()];
let a = Builder::new(addrs.clone(), a_dispatcher, Arc::clone(&metrics)).build(a_socket);
let b = Builder::new(addrs, b_dispatcher, Arc::clone(&metrics)).build(b_socket);
// Wait for peer discovery to occur
async {
loop {
if a.get_peers().await.len() == 1 && b.get_peers().await.len() == 1 {
break;
}
}
}
.with_timeout_panic(Duration::from_secs(5))
.await;
let a = Peer {
tx: CompactionEventTx::new(a),
rx: a_rx,
};
let b = Peer {
tx: CompactionEventTx::new(b),
rx: b_rx,
};
(a, b)
}
/// Ensure a CompactionEvent can be round-tripped through the gossip layer.
///
/// This acts as an integration test, covering the serialisation of
/// compaction event messages, their passage through the gossip layer, topic
/// assignment & decoding, deserialisation of the compaction event message,
/// and handling by the event handler abstraction defined by this crate.
#[tokio::test]
async fn test_round_trip() {
maybe_start_logging();
let (node_a, mut node_b) = new_node_pair().await;
let new_file_a = ParquetFile {
id: 42,
namespace_id: 4242,
table_id: 24,
partition_identifier: Some(PartitionIdentifier {
id: Some(partition_identifier::Id::HashId(vec![1, 2, 3, 4])),
}),
object_store_id: "bananas".to_string(),
min_time: 1,
max_time: 100,
to_delete: Some(0),
file_size_bytes: 424242,
row_count: 4242111,
compaction_level: 4200,
created_at: 12344321,
column_set: vec![1, 2, 3, 4, 5],
max_l0_created_at: 123455555,
};
let new_file_b = ParquetFile {
id: 13,
..new_file_a.clone()
};
let want = CompactionEvent {
deleted_file_ids: vec![1, 2, 3, 4],
updated_file_ids: vec![4, 3, 2, 1],
upgraded_target_level: 42,
new_files: vec![new_file_a, new_file_b],
};
// Broadcast the event from A
node_a.tx.broadcast(want.clone());
// Receive it from B
let got = node_b
.rx
.recv()
.with_timeout_panic(Duration::from_secs(5))
.await
.unwrap();
// Ensure the content is identical
assert_eq!(got, want);
}
}
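The "Message Atomicity & Size Restrictions" section above notes that events larger than [`MAX_USER_PAYLOAD_BYTES`] are discarded rather than split. A hedged sketch of how a producer could detect this up front, assuming the constant is a `usize`; `fits_in_one_frame` is a hypothetical helper, not part of this crate:

```rust
use generated_types::{
    influxdata::iox::gossip::v1::CompactionEvent,
    prost::Message,
};

/// Returns true if `event` serialises small enough to fit a single gossip
/// message; callers could skip the broadcast when this returns false.
fn fits_in_one_frame(event: &CompactionEvent) -> bool {
    // encoded_len() computes the protobuf size without allocating a buffer.
    event.encoded_len() <= gossip::MAX_USER_PAYLOAD_BYTES
}
```

Counting how often such a check fails would satisfy the SHOULD in the docs about tracking the frequency of message discards.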

gossip_compaction/src/rx.rs

@@ -0,0 +1,107 @@
//! A deserialiser and dispatcher of [gossip] messages for the
//! [`Topic::CompactionEvents`] topic.
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use bytes::Bytes;
use generated_types::influxdata::iox::gossip::{v1::CompactionEvent, Topic};
use generated_types::prost::Message;
use gossip::Identity;
use observability_deps::tracing::{info, warn};
use tokio::{sync::mpsc, task::JoinHandle};
/// A handler for [`CompactionEvent`] notifications received via gossip.
#[async_trait]
pub trait CompactionEventHandler: Send + Sync + Debug {
/// Process `message`.
async fn handle(&self, event: CompactionEvent);
}
#[async_trait]
impl<T> CompactionEventHandler for Arc<T>
where
T: CompactionEventHandler,
{
async fn handle(&self, event: CompactionEvent) {
T::handle(self, event).await
}
}
/// An async gossip message dispatcher.
///
/// This type is responsible for deserialising incoming gossip
/// [`Topic::CompactionEvents`] payloads and passing them off to the provided
/// [`CompactionEventHandler`] implementation.
///
/// This decoupling allows the handler to deal strictly in terms of messages,
/// abstracting it from the underlying message transport / format.
///
/// This type also provides a buffer between incoming events and their processing,
/// preventing processing time from blocking the gossip reactor. Once the buffer
/// is full, incoming events are dropped until space is made through processing
/// of outstanding events. Dropping the [`CompactionEventRx`] stops the background
/// event loop.
#[derive(Debug)]
pub struct CompactionEventRx {
tx: mpsc::Sender<Bytes>,
task: JoinHandle<()>,
}
impl CompactionEventRx {
/// Initialise a new dispatcher, buffering up to `buffer` number of events.
///
/// The provided `handler` does not block the gossip reactor during
/// execution.
pub fn new<T>(handler: T, buffer: usize) -> Self
where
T: CompactionEventHandler + 'static,
{
// Initialise a buffered channel to decouple the two halves.
let (tx, rx) = mpsc::channel(buffer);
// And run a receiver loop to pull the events from the channel.
let task = tokio::spawn(dispatch_loop(rx, handler));
Self { tx, task }
}
}
#[async_trait]
impl gossip::Dispatcher<Topic> for CompactionEventRx {
async fn dispatch(&self, topic: Topic, payload: Bytes, _identity: Identity) {
if topic != Topic::CompactionEvents {
return;
}
if let Err(e) = self.tx.try_send(payload) {
warn!(error=%e, "failed to buffer gossip event");
}
}
}
impl Drop for CompactionEventRx {
fn drop(&mut self) {
self.task.abort();
}
}
async fn dispatch_loop<T>(mut rx: mpsc::Receiver<Bytes>, handler: T)
where
T: CompactionEventHandler,
{
while let Some(payload) = rx.recv().await {
// Deserialise the payload into the appropriate proto type.
let event = match CompactionEvent::decode(payload) {
Ok(v) => v,
Err(e) => {
warn!(error=%e, "failed to deserialise gossip message");
continue;
}
};
// Pass this message off to the handler to process.
handler.handle(event).await;
}
info!("stopping gossip dispatcher");
}
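Because the dispatcher buffers at most `buffer` events and drops the rest while the handler runs, implementations should keep `handle()` cheap. One possible pattern, forwarding events to application code over a channel (the `ChannelHandler` name is illustrative, mirroring the `MockEventHandler` used in the tests above):

```rust
use async_trait::async_trait;
use generated_types::influxdata::iox::gossip::v1::CompactionEvent;
use gossip_compaction::rx::CompactionEventHandler;
use tokio::sync::mpsc;

/// A handler that hands events to the application over a bounded channel,
/// returning immediately so the dispatch loop never backs up.
#[derive(Debug)]
struct ChannelHandler(mpsc::Sender<CompactionEvent>);

#[async_trait]
impl CompactionEventHandler for ChannelHandler {
    async fn handle(&self, event: CompactionEvent) {
        // Drop the event if the application is behind; acceptable because
        // the gossip layer is best-effort anyway.
        let _ = self.0.try_send(event);
    }
}
```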

gossip_compaction/src/tx.rs

@@ -0,0 +1,88 @@
//! A serialiser and broadcaster of [`gossip`] messages for the
//! [`Topic::CompactionEvents`] topic.
use std::fmt::Debug;
use generated_types::{
influxdata::iox::gossip::{v1::CompactionEvent, Topic},
prost::Message,
};
use observability_deps::tracing::{debug, error, warn};
use tokio::{
sync::mpsc::{self, error::TrySendError},
task::JoinHandle,
};
/// A gossip broadcast primitive specialised for compaction completion
/// notifications.
///
/// This type accepts any type that converts into a [`CompactionEvent`] from the
/// application logic, serialises the message (applying any necessary
/// transformations due to the underlying transport limitations) and broadcasts
/// the result to all listening peers.
///
/// Serialisation and processing of the [`CompactionEvent`] given to the
/// [`CompactionEventTx::broadcast()`] method happens in a background actor task,
/// decoupling the caller from the latency of processing each frame. Dropping
/// the [`CompactionEventTx`] stops this background actor task.
#[derive(Debug)]
pub struct CompactionEventTx<T> {
tx: mpsc::Sender<T>,
task: JoinHandle<()>,
}
impl<T> Drop for CompactionEventTx<T> {
fn drop(&mut self) {
self.task.abort();
}
}
impl<T> CompactionEventTx<T>
where
T: Into<CompactionEvent> + Debug + Send + Sync + 'static,
{
/// Construct a new [`CompactionEventTx`] that publishes gossip messages over
/// `gossip`.
pub fn new(gossip: gossip::GossipHandle<Topic>) -> Self {
let (tx, rx) = mpsc::channel(100);
let task = tokio::spawn(actor_loop(rx, gossip));
Self { tx, task }
}
/// Asynchronously broadcast `file` to all interested peers.
///
/// This method enqueues `file` into the serialisation queue; it is processed
/// & transmitted asynchronously.
pub fn broadcast(&self, file: T) {
debug!(?file, "sending new compaction notification");
match self.tx.try_send(file) {
Ok(_) => {}
Err(TrySendError::Closed(_)) => {
panic!("compaction notification serialisation actor not running")
}
Err(TrySendError::Full(_)) => {
warn!("compaction notification serialisation queue full, dropping message")
}
}
}
}
async fn actor_loop<T>(mut rx: mpsc::Receiver<T>, gossip: gossip::GossipHandle<Topic>)
where
T: Into<CompactionEvent> + Send + Sync,
{
while let Some(file) = rx.recv().await {
let file: CompactionEvent = file.into();
if let Err(e) = gossip
.broadcast(file.encode_to_vec(), Topic::CompactionEvents)
.await
{
error!(error=%e, "failed to broadcast payload");
}
}
debug!("stopping compaction gossip serialisation actor");
}
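Because `CompactionEventTx` is generic over `T: Into<CompactionEvent>`, the compactor can enqueue its own summary type and defer both the conversion and the protobuf encoding to the background actor. A sketch under that assumption; `CompactionRoundSummary` is a hypothetical application type, not something defined in this diff:

```rust
use generated_types::influxdata::iox::{
    catalog::v1::ParquetFile,
    gossip::v1::CompactionEvent,
};

/// A hypothetical application-side record of a completed compaction round.
#[derive(Debug)]
struct CompactionRoundSummary {
    new_files: Vec<ParquetFile>,
}

impl From<CompactionRoundSummary> for CompactionEvent {
    fn from(summary: CompactionRoundSummary) -> Self {
        Self {
            new_files: summary.new_files,
            // Remaining fields (deleted/updated file IDs, target level) are
            // left at their defaults for this sketch.
            ..Default::default()
        }
    }
}

// With the From impl in place, broadcast() accepts the summary directly and
// the conversion plus encoding happen off the caller's hot path:
//
//     let tx: CompactionEventTx<CompactionRoundSummary> = CompactionEventTx::new(handle);
//     tx.broadcast(summary);
```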