diff --git a/Cargo.lock b/Cargo.lock index 1987b9fc0b..9a816d9059 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2060,6 +2060,21 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "gossip_parquet_file" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "generated_types", + "gossip", + "metric", + "observability_deps", + "test_helpers", + "tokio", + "workspace-hack", +] + [[package]] name = "gossip_schema" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index aa91492ffe..9716568443 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "garbage_collector", "generated_types", "gossip", + "gossip_parquet_file", "gossip_schema", "grpc-binary-logger-proto", "grpc-binary-logger-test-proto", diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs index 9ecf355bb2..c6d5357d1f 100644 --- a/generated_types/src/lib.rs +++ b/generated_types/src/lib.rs @@ -106,6 +106,9 @@ pub mod influxdata { /// New namespace, table, and column additions observed and /// broadcast by the routers. SchemaChanges = 1, + + /// Parquet file creation notifications. + NewParquetFiles = 2, } impl TryFrom for Topic { @@ -114,6 +117,7 @@ pub mod influxdata { fn try_from(v: u64) -> Result { Ok(match v { v if v == Self::SchemaChanges as u64 => Self::SchemaChanges, + v if v == Self::NewParquetFiles as u64 => Self::NewParquetFiles, _ => return Err(format!("unknown topic id {}", v).into()), }) } @@ -310,7 +314,7 @@ mod tests { #[test] fn test_gossip_topics() { - let topics = [Topic::SchemaChanges]; + let topics = [Topic::SchemaChanges, Topic::NewParquetFiles]; for topic in topics { let v = u64::from(topic); @@ -323,6 +327,7 @@ mod tests { // message). match topics[0] { Topic::SchemaChanges => {} + Topic::NewParquetFiles => {} } } } diff --git a/gossip_parquet_file/Cargo.toml b/gossip_parquet_file/Cargo.toml new file mode 100644 index 0000000000..c6a0447632 --- /dev/null +++ b/gossip_parquet_file/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "gossip_parquet_file" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +bytes = "1.4" +generated_types = { path = "../generated_types" } +gossip = { version = "0.1.0", path = "../gossip" } +observability_deps = { path = "../observability_deps" } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] } +workspace-hack = { version = "0.1", path = "../workspace-hack" } + +[dev-dependencies] +metric = { path = "../metric" } +test_helpers = { version = "0.1.0", path = "../test_helpers", features = [ + "future_timeout", +] } +tokio = { version = "1", features = ["test-util"] } diff --git a/gossip_parquet_file/src/lib.rs b/gossip_parquet_file/src/lib.rs new file mode 100644 index 0000000000..05557cc238 --- /dev/null +++ b/gossip_parquet_file/src/lib.rs @@ -0,0 +1,225 @@ +//! Parquet file creation notifications over [gossip]. +//! +//! This sub-system is composed of the following primary components: +//! +//! * [`gossip`] crate: provides the gossip transport, the [`GossipHandle`], and +//! the [`Dispatcher`]. This crate operates on raw bytes. +//! +//! * The outgoing [`ParquetFileTx`]: a parquet file focussed wrapper over the +//! underlying [`GossipHandle`]. This type translates the protobuf +//! [`ParquetFile`] from the application layer into raw serialised bytes, +//! sending them over the underlying [`gossip`] impl. +//! +//! * The incoming [`ParquetFileRx`]: deserialises the incoming bytes from the +//! gossip [`Dispatcher`] into [`ParquetFile`] and passes them off to the +//! [`ParquetFileEventHandler`] implementation for processing. +//! +//! Users of this crate should implement the [`ParquetFileEventHandler`] trait +//! to receive change events, and push events into the [`ParquetFileTx`] to +//! broadcast changes to peers. +//! +//! ```text +//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! │ +//! │ Application +//! │ +//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! │ ▲ +//! │ │ +//! │ │ +//! │ ParquetFile │ +//! │ │ +//! ▼ │ +//! ┌──────────────────────┐ ┌─────────────────────────┐ +//! │ ParquetFileTx │ │ ParquetFileRx │ +//! └──────────────────────┘ └─────────────────────────┘ +//! │ ▲ +//! │ │ +//! │ Encoded bytes │ +//! │ │ +//! │ │ +//! ┌ Gossip ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ +//! ▼ │ │ +//! │ ┌──────────────┐ ┌──────────────────┐ +//! │ GossipHandle │ │ Dispatcher │ │ +//! │ └──────────────┘ └──────────────────┘ +//! │ +//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! ``` +//! +//! # Best Effort +//! +//! This underlying gossip subsystem is designed to provide best effort delivery +//! of messages, and therefore best-effort delivery of parquet creation events, +//! without any ordering or delivery guarantees. +//! +//! This crate does NOT provide any eventual consistency guarantees. +//! +//! [`ParquetFileTx`]: tx::ParquetFileTx +//! [`ParquetFileRx`]: rx::ParquetFileRx +//! [`ParquetFileEventHandler`]: rx::ParquetFileEventHandler +//! [`GossipHandle`]: gossip::GossipHandle +//! [`Dispatcher`]: gossip::Dispatcher +//! [`ParquetFile`]: generated_types::influxdata::iox::catalog::v1::ParquetFile + +#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)] +#![warn( + clippy::clone_on_ref_ptr, + clippy::dbg_macro, + clippy::explicit_iter_loop, + // See https://github.com/influxdata/influxdb_iox/pull/1671 + clippy::future_not_send, + clippy::todo, + clippy::use_self, + missing_copy_implementations, + missing_debug_implementations, + unused_crate_dependencies, + missing_docs +)] +#![allow(clippy::default_constructed_unit_structs)] + +// Workaround for "unused crate" lint false positives. +use workspace_hack as _; + +pub mod rx; +pub mod tx; + +#[cfg(test)] +mod tests { + use std::{net::SocketAddr, sync::Arc, time::Duration}; + + use async_trait::async_trait; + use generated_types::influxdata::iox::catalog::v1::{ + partition_identifier, ParquetFile, PartitionIdentifier, + }; + use gossip::Builder; + use test_helpers::{maybe_start_logging, timeout::FutureTimeout}; + use tokio::{net::UdpSocket, sync::mpsc}; + + use crate::{ + rx::{ParquetFileEventHandler, ParquetFileRx}, + tx::ParquetFileTx, + }; + + /// Bind a UDP socket on a random port and return it alongside the socket + /// address. + async fn random_udp() -> (UdpSocket, SocketAddr) { + // Bind a UDP socket to a random port + let socket = UdpSocket::bind("127.0.0.1:0") + .await + .expect("failed to bind UDP socket"); + let addr = socket.local_addr().expect("failed to read local addr"); + + (socket, addr) + } + + #[derive(Debug)] + struct Peer { + tx: ParquetFileTx, + rx: mpsc::Receiver, + } + + #[derive(Debug)] + struct MockParquetEventHandler(mpsc::Sender); + + impl MockParquetEventHandler { + fn new() -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(10); + (Self(tx), rx) + } + } + + #[async_trait] + impl ParquetFileEventHandler for Arc { + async fn handle(&self, event: ParquetFile) { + self.0.send(event).await.unwrap(); + } + } + + async fn new_node_pair() -> (Peer, Peer) { + let metrics = Arc::new(metric::Registry::default()); + + let (a_socket, a_addr) = random_udp().await; + let (handler, a_rx) = MockParquetEventHandler::new(); + let a_store = Arc::new(handler); + let a_dispatcher = ParquetFileRx::new(Arc::clone(&a_store), 100); + + let (b_socket, b_addr) = random_udp().await; + let (handler, b_rx) = MockParquetEventHandler::new(); + let b_store = Arc::new(handler); + let b_dispatcher = ParquetFileRx::new(Arc::clone(&b_store), 100); + + // Initialise both gossip reactors + let addrs = vec![a_addr.to_string(), b_addr.to_string()]; + let a = Builder::new(addrs.clone(), a_dispatcher, Arc::clone(&metrics)).build(a_socket); + let b = Builder::new(addrs, b_dispatcher, Arc::clone(&metrics)).build(b_socket); + + // Wait for peer discovery to occur + async { + loop { + if a.get_peers().await.len() == 1 && b.get_peers().await.len() == 1 { + break; + } + } + } + .with_timeout_panic(Duration::from_secs(5)) + .await; + + let a = Peer { + tx: ParquetFileTx::new(a), + rx: a_rx, + }; + + let b = Peer { + tx: ParquetFileTx::new(b), + rx: b_rx, + }; + + (a, b) + } + + /// Ensure a ParquetFile can be round-tripped through the gossip layer. + /// + /// This acts as an integration test, covering the serialisation of parquet + /// file messages, passing into the gossip layer, topic assignment & + /// decoding, deserialisation of the parquet file message, and handling by + /// the new file event delegate abstraction defined by this crate. + #[tokio::test] + async fn test_round_trip() { + maybe_start_logging(); + + let (node_a, mut node_b) = new_node_pair().await; + let want = ParquetFile { + id: 42, + namespace_id: 4242, + table_id: 24, + partition_identifier: Some(PartitionIdentifier { + id: Some(partition_identifier::Id::HashId(vec![1, 2, 3, 4])), + }), + object_store_id: "bananas".to_string(), + min_time: 1, + max_time: 100, + to_delete: 0, + file_size_bytes: 424242, + row_count: 4242111, + compaction_level: 4200, + created_at: 12344321, + column_set: vec![1, 2, 3, 4, 5], + max_l0_created_at: 123455555, + }; + + // Broadcast the event from A + node_a.tx.broadcast(want.clone()); + + // Receive it from B + let got = node_b + .rx + .recv() + .with_timeout_panic(Duration::from_secs(5)) + .await + .unwrap(); + + // Ensuring the content is identical + assert_eq!(got, want); + } +} diff --git a/gossip_parquet_file/src/rx.rs b/gossip_parquet_file/src/rx.rs new file mode 100644 index 0000000000..30f3846acd --- /dev/null +++ b/gossip_parquet_file/src/rx.rs @@ -0,0 +1,113 @@ +//! A deserialiser and dispatcher of [gossip] messages for the +//! [`Topic::NewParquetFiles`] topic. + +use std::{fmt::Debug, sync::Arc}; + +use async_trait::async_trait; +use bytes::Bytes; +use generated_types::influxdata::iox::{ + catalog::v1::ParquetFile, + gossip::{v1::NewParquetFile, Topic}, +}; +use generated_types::prost::Message; +use observability_deps::tracing::{info, warn}; +use tokio::{sync::mpsc, task::JoinHandle}; + +/// A [`ParquetFile`] notification handler received via gossip. +#[async_trait] +pub trait ParquetFileEventHandler: Send + Sync + Debug { + /// Process `message`. + async fn handle(&self, event: ParquetFile); +} + +#[async_trait] +impl ParquetFileEventHandler for Arc +where + T: ParquetFileEventHandler, +{ + async fn handle(&self, event: ParquetFile) { + T::handle(self, event).await + } +} + +/// An async gossip message dispatcher. +/// +/// This type is responsible for deserialising incoming gossip +/// [`Topic::NewParquetFiles`] payloads and passing them off to the provided +/// [`ParquetFileEventHandler`] implementation. +/// +/// This decoupling allow the handler to deal strictly in terms of messages, +/// abstracting it from the underlying message transport / format. +/// +/// This type also provides a buffer between incoming events, and processing, +/// preventing processing time from blocking the gossip reactor. Once the buffer +/// is full, incoming events are dropped until space is made through processing +/// of outstanding events. Dropping the [`ParquetFileRx`] stops the background +/// event loop. +#[derive(Debug)] +pub struct ParquetFileRx { + tx: mpsc::Sender, + task: JoinHandle<()>, +} + +impl ParquetFileRx { + /// Initialise a new dispatcher, buffering up to `buffer` number of events. + /// + /// The provided `handler` does not block the gossip reactor during + /// execution. + pub fn new(handler: T, buffer: usize) -> Self + where + T: ParquetFileEventHandler + 'static, + { + // Initialise a buffered channel to decouple the two halves. + let (tx, rx) = mpsc::channel(buffer); + + // And run a receiver loop to pull the events from the channel. + let task = tokio::spawn(dispatch_loop(rx, handler)); + + Self { tx, task } + } +} + +#[async_trait] +impl gossip::Dispatcher for ParquetFileRx { + async fn dispatch(&self, topic: Topic, payload: Bytes) { + if topic != Topic::NewParquetFiles { + return; + } + if let Err(e) = self.tx.try_send(payload) { + warn!(error=%e, "failed to buffer gossip event"); + } + } +} + +impl Drop for ParquetFileRx { + fn drop(&mut self) { + self.task.abort(); + } +} + +async fn dispatch_loop(mut rx: mpsc::Receiver, handler: T) +where + T: ParquetFileEventHandler, +{ + while let Some(payload) = rx.recv().await { + // Deserialise the payload into the appropriate proto type. + let event = match NewParquetFile::decode(payload).map(|v| v.file) { + Ok(Some(v)) => v, + Ok(None) => { + warn!("valid frame contains no message"); + continue; + } + Err(e) => { + warn!(error=%e, "failed to deserialise gossip message"); + continue; + } + }; + + // Pass this message off to the handler to process. + handler.handle(event).await; + } + + info!("stopping gossip dispatcher"); +} diff --git a/gossip_parquet_file/src/tx.rs b/gossip_parquet_file/src/tx.rs new file mode 100644 index 0000000000..ea9e9b310f --- /dev/null +++ b/gossip_parquet_file/src/tx.rs @@ -0,0 +1,92 @@ +//! A serialiser and broadcaster of [`gossip`] messages for the +//! [`Topic::NewParquetFiles`] topic. + +use std::fmt::Debug; + +use generated_types::{ + influxdata::iox::{ + catalog::v1::ParquetFile, + gossip::{v1::NewParquetFile, Topic}, + }, + prost::Message, +}; +use observability_deps::tracing::{debug, error, warn}; +use tokio::{ + sync::mpsc::{self, error::TrySendError}, + task::JoinHandle, +}; + +/// A gossip broadcast primitive specialised for parquet file creation +/// notifications. +/// +/// This type accepts any type that converts into a [`ParquetFile`] from the +/// application logic, serialises the message (applying any necessary +/// transformations due to the underlying transport limitations) and broadcasts +/// the result to all listening peers. +/// +/// Serialisation and processing of the [`ParquetFile`] given to the +/// [`ParquetFileTx::broadcast()`] method happens in a background actor task, +/// decoupling the caller from the latency of processing each frame. Dropping +/// the [`ParquetFileTx`] stops this background actor task. +#[derive(Debug)] +pub struct ParquetFileTx { + tx: mpsc::Sender, + task: JoinHandle<()>, +} + +impl Drop for ParquetFileTx { + fn drop(&mut self) { + self.task.abort(); + } +} + +impl ParquetFileTx +where + T: Into + Debug + Send + Sync + 'static, +{ + /// Construct a new [`ParquetFileTx`] that publishes gossip messages over + /// `gossip`. + pub fn new(gossip: gossip::GossipHandle) -> Self { + let (tx, rx) = mpsc::channel(100); + + let task = tokio::spawn(actor_loop(rx, gossip)); + + Self { tx, task } + } + + /// Asynchronously broadcast `file` to all interested peers. + /// + /// This method enqueues `file` into the serialisation queue, and processed + /// & transmitted asynchronously. + pub fn broadcast(&self, file: T) { + debug!(?file, "sending new parquet file notification"); + match self.tx.try_send(file) { + Ok(_) => {} + Err(TrySendError::Closed(_)) => { + panic!("parquet notification serialisation actor not running") + } + Err(TrySendError::Full(_)) => { + warn!("parquet notification serialisation queue full, dropping message") + } + } + } +} + +async fn actor_loop(mut rx: mpsc::Receiver, gossip: gossip::GossipHandle) +where + T: Into + Send + Sync, +{ + while let Some(file) = rx.recv().await { + let file = NewParquetFile { + file: Some(file.into()), + }; + if let Err(e) = gossip + .broadcast(file.encode_to_vec(), Topic::NewParquetFiles) + .await + { + error!(error=%e, "failed to broadcast payload"); + } + } + + debug!("stopping parquet gossip serialisation actor"); +}