Merge pull request #8565 from influxdata/dom/gossip-parquet-crate

feat(gossip): new parquet file gossip crate
pull/24376/head
Dom 2023-08-25 13:19:19 +01:00 committed by GitHub
commit d029265696
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 476 additions and 1 deletions

15
Cargo.lock generated
View File

@ -2060,6 +2060,21 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "gossip_parquet_file"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"generated_types",
"gossip",
"metric",
"observability_deps",
"test_helpers",
"tokio",
"workspace-hack",
]
[[package]]
name = "gossip_schema"
version = "0.1.0"

View File

@ -18,6 +18,7 @@ members = [
"garbage_collector",
"generated_types",
"gossip",
"gossip_parquet_file",
"gossip_schema",
"grpc-binary-logger-proto",
"grpc-binary-logger-test-proto",

View File

@ -106,6 +106,9 @@ pub mod influxdata {
/// New namespace, table, and column additions observed and
/// broadcast by the routers.
SchemaChanges = 1,
/// Parquet file creation notifications.
NewParquetFiles = 2,
}
impl TryFrom<u64> for Topic {
@ -114,6 +117,7 @@ pub mod influxdata {
fn try_from(v: u64) -> Result<Self, Self::Error> {
Ok(match v {
v if v == Self::SchemaChanges as u64 => Self::SchemaChanges,
v if v == Self::NewParquetFiles as u64 => Self::NewParquetFiles,
_ => return Err(format!("unknown topic id {}", v).into()),
})
}
@ -310,7 +314,7 @@ mod tests {
#[test]
fn test_gossip_topics() {
let topics = [Topic::SchemaChanges];
let topics = [Topic::SchemaChanges, Topic::NewParquetFiles];
for topic in topics {
let v = u64::from(topic);
@ -323,6 +327,7 @@ mod tests {
// message).
match topics[0] {
Topic::SchemaChanges => {}
Topic::NewParquetFiles => {}
}
}
}

View File

@ -0,0 +1,24 @@
[package]
name = "gossip_parquet_file"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
bytes = "1.4"
generated_types = { path = "../generated_types" }
gossip = { version = "0.1.0", path = "../gossip" }
observability_deps = { path = "../observability_deps" }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
metric = { path = "../metric" }
test_helpers = { version = "0.1.0", path = "../test_helpers", features = [
"future_timeout",
] }
tokio = { version = "1", features = ["test-util"] }

View File

@ -0,0 +1,225 @@
//! Parquet file creation notifications over [gossip].
//!
//! This sub-system is composed of the following primary components:
//!
//! * [`gossip`] crate: provides the gossip transport, the [`GossipHandle`], and
//! the [`Dispatcher`]. This crate operates on raw bytes.
//!
//! * The outgoing [`ParquetFileTx`]: a parquet file focussed wrapper over the
//! underlying [`GossipHandle`]. This type translates the protobuf
//! [`ParquetFile`] from the application layer into raw serialised bytes,
//! sending them over the underlying [`gossip`] impl.
//!
//! * The incoming [`ParquetFileRx`]: deserialises the incoming bytes from the
//! gossip [`Dispatcher`] into [`ParquetFile`] and passes them off to the
//! [`ParquetFileEventHandler`] implementation for processing.
//!
//! Users of this crate should implement the [`ParquetFileEventHandler`] trait
//! to receive change events, and push events into the [`ParquetFileTx`] to
//! broadcast changes to peers.
//!
//! ```text
//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! │
//! │ Application
//! │
//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! │ ▲
//! │ │
//! │ │
//! │ ParquetFile │
//! │ │
//! ▼ │
//! ┌──────────────────────┐ ┌─────────────────────────┐
//! │ ParquetFileTx │ │ ParquetFileRx │
//! └──────────────────────┘ └─────────────────────────┘
//! │ ▲
//! │ │
//! │ Encoded bytes │
//! │ │
//! │ │
//! ┌ Gossip ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─
//! ▼ │ │
//! │ ┌──────────────┐ ┌──────────────────┐
//! │ GossipHandle │ │ Dispatcher │ │
//! │ └──────────────┘ └──────────────────┘
//! │
//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! ```
//!
//! # Best Effort
//!
//! This underlying gossip subsystem is designed to provide best effort delivery
//! of messages, and therefore best-effort delivery of parquet creation events,
//! without any ordering or delivery guarantees.
//!
//! This crate does NOT provide any eventual consistency guarantees.
//!
//! [`ParquetFileTx`]: tx::ParquetFileTx
//! [`ParquetFileRx`]: rx::ParquetFileRx
//! [`ParquetFileEventHandler`]: rx::ParquetFileEventHandler
//! [`GossipHandle`]: gossip::GossipHandle
//! [`Dispatcher`]: gossip::Dispatcher
//! [`ParquetFile`]: generated_types::influxdata::iox::catalog::v1::ParquetFile
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
clippy::clone_on_ref_ptr,
clippy::dbg_macro,
clippy::explicit_iter_loop,
// See https://github.com/influxdata/influxdb_iox/pull/1671
clippy::future_not_send,
clippy::todo,
clippy::use_self,
missing_copy_implementations,
missing_debug_implementations,
unused_crate_dependencies,
missing_docs
)]
#![allow(clippy::default_constructed_unit_structs)]
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
pub mod rx;
pub mod tx;
#[cfg(test)]
mod tests {
use std::{net::SocketAddr, sync::Arc, time::Duration};
use async_trait::async_trait;
use generated_types::influxdata::iox::catalog::v1::{
partition_identifier, ParquetFile, PartitionIdentifier,
};
use gossip::Builder;
use test_helpers::{maybe_start_logging, timeout::FutureTimeout};
use tokio::{net::UdpSocket, sync::mpsc};
use crate::{
rx::{ParquetFileEventHandler, ParquetFileRx},
tx::ParquetFileTx,
};
/// Bind a UDP socket on a random port and return it alongside the socket
/// address.
async fn random_udp() -> (UdpSocket, SocketAddr) {
// Bind a UDP socket to a random port
let socket = UdpSocket::bind("127.0.0.1:0")
.await
.expect("failed to bind UDP socket");
let addr = socket.local_addr().expect("failed to read local addr");
(socket, addr)
}
#[derive(Debug)]
struct Peer {
tx: ParquetFileTx<ParquetFile>,
rx: mpsc::Receiver<ParquetFile>,
}
#[derive(Debug)]
struct MockParquetEventHandler(mpsc::Sender<ParquetFile>);
impl MockParquetEventHandler {
fn new() -> (Self, mpsc::Receiver<ParquetFile>) {
let (tx, rx) = mpsc::channel(10);
(Self(tx), rx)
}
}
#[async_trait]
impl ParquetFileEventHandler for Arc<MockParquetEventHandler> {
async fn handle(&self, event: ParquetFile) {
self.0.send(event).await.unwrap();
}
}
async fn new_node_pair() -> (Peer, Peer) {
let metrics = Arc::new(metric::Registry::default());
let (a_socket, a_addr) = random_udp().await;
let (handler, a_rx) = MockParquetEventHandler::new();
let a_store = Arc::new(handler);
let a_dispatcher = ParquetFileRx::new(Arc::clone(&a_store), 100);
let (b_socket, b_addr) = random_udp().await;
let (handler, b_rx) = MockParquetEventHandler::new();
let b_store = Arc::new(handler);
let b_dispatcher = ParquetFileRx::new(Arc::clone(&b_store), 100);
// Initialise both gossip reactors
let addrs = vec![a_addr.to_string(), b_addr.to_string()];
let a = Builder::new(addrs.clone(), a_dispatcher, Arc::clone(&metrics)).build(a_socket);
let b = Builder::new(addrs, b_dispatcher, Arc::clone(&metrics)).build(b_socket);
// Wait for peer discovery to occur
async {
loop {
if a.get_peers().await.len() == 1 && b.get_peers().await.len() == 1 {
break;
}
}
}
.with_timeout_panic(Duration::from_secs(5))
.await;
let a = Peer {
tx: ParquetFileTx::new(a),
rx: a_rx,
};
let b = Peer {
tx: ParquetFileTx::new(b),
rx: b_rx,
};
(a, b)
}
/// Ensure a ParquetFile can be round-tripped through the gossip layer.
///
/// This acts as an integration test, covering the serialisation of parquet
/// file messages, passing into the gossip layer, topic assignment &
/// decoding, deserialisation of the parquet file message, and handling by
/// the new file event delegate abstraction defined by this crate.
#[tokio::test]
async fn test_round_trip() {
maybe_start_logging();
let (node_a, mut node_b) = new_node_pair().await;
let want = ParquetFile {
id: 42,
namespace_id: 4242,
table_id: 24,
partition_identifier: Some(PartitionIdentifier {
id: Some(partition_identifier::Id::HashId(vec![1, 2, 3, 4])),
}),
object_store_id: "bananas".to_string(),
min_time: 1,
max_time: 100,
to_delete: 0,
file_size_bytes: 424242,
row_count: 4242111,
compaction_level: 4200,
created_at: 12344321,
column_set: vec![1, 2, 3, 4, 5],
max_l0_created_at: 123455555,
};
// Broadcast the event from A
node_a.tx.broadcast(want.clone());
// Receive it from B
let got = node_b
.rx
.recv()
.with_timeout_panic(Duration::from_secs(5))
.await
.unwrap();
// Ensuring the content is identical
assert_eq!(got, want);
}
}

View File

@ -0,0 +1,113 @@
//! A deserialiser and dispatcher of [gossip] messages for the
//! [`Topic::NewParquetFiles`] topic.
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use bytes::Bytes;
use generated_types::influxdata::iox::{
catalog::v1::ParquetFile,
gossip::{v1::NewParquetFile, Topic},
};
use generated_types::prost::Message;
use observability_deps::tracing::{info, warn};
use tokio::{sync::mpsc, task::JoinHandle};
/// A [`ParquetFile`] notification handler received via gossip.
#[async_trait]
pub trait ParquetFileEventHandler: Send + Sync + Debug {
/// Process `message`.
async fn handle(&self, event: ParquetFile);
}
#[async_trait]
impl<T> ParquetFileEventHandler for Arc<T>
where
T: ParquetFileEventHandler,
{
async fn handle(&self, event: ParquetFile) {
T::handle(self, event).await
}
}
/// An async gossip message dispatcher.
///
/// This type is responsible for deserialising incoming gossip
/// [`Topic::NewParquetFiles`] payloads and passing them off to the provided
/// [`ParquetFileEventHandler`] implementation.
///
/// This decoupling allow the handler to deal strictly in terms of messages,
/// abstracting it from the underlying message transport / format.
///
/// This type also provides a buffer between incoming events, and processing,
/// preventing processing time from blocking the gossip reactor. Once the buffer
/// is full, incoming events are dropped until space is made through processing
/// of outstanding events. Dropping the [`ParquetFileRx`] stops the background
/// event loop.
#[derive(Debug)]
pub struct ParquetFileRx {
tx: mpsc::Sender<Bytes>,
task: JoinHandle<()>,
}
impl ParquetFileRx {
/// Initialise a new dispatcher, buffering up to `buffer` number of events.
///
/// The provided `handler` does not block the gossip reactor during
/// execution.
pub fn new<T>(handler: T, buffer: usize) -> Self
where
T: ParquetFileEventHandler + 'static,
{
// Initialise a buffered channel to decouple the two halves.
let (tx, rx) = mpsc::channel(buffer);
// And run a receiver loop to pull the events from the channel.
let task = tokio::spawn(dispatch_loop(rx, handler));
Self { tx, task }
}
}
#[async_trait]
impl gossip::Dispatcher<Topic> for ParquetFileRx {
async fn dispatch(&self, topic: Topic, payload: Bytes) {
if topic != Topic::NewParquetFiles {
return;
}
if let Err(e) = self.tx.try_send(payload) {
warn!(error=%e, "failed to buffer gossip event");
}
}
}
impl Drop for ParquetFileRx {
fn drop(&mut self) {
self.task.abort();
}
}
async fn dispatch_loop<T>(mut rx: mpsc::Receiver<Bytes>, handler: T)
where
T: ParquetFileEventHandler,
{
while let Some(payload) = rx.recv().await {
// Deserialise the payload into the appropriate proto type.
let event = match NewParquetFile::decode(payload).map(|v| v.file) {
Ok(Some(v)) => v,
Ok(None) => {
warn!("valid frame contains no message");
continue;
}
Err(e) => {
warn!(error=%e, "failed to deserialise gossip message");
continue;
}
};
// Pass this message off to the handler to process.
handler.handle(event).await;
}
info!("stopping gossip dispatcher");
}

View File

@ -0,0 +1,92 @@
//! A serialiser and broadcaster of [`gossip`] messages for the
//! [`Topic::NewParquetFiles`] topic.
use std::fmt::Debug;
use generated_types::{
influxdata::iox::{
catalog::v1::ParquetFile,
gossip::{v1::NewParquetFile, Topic},
},
prost::Message,
};
use observability_deps::tracing::{debug, error, warn};
use tokio::{
sync::mpsc::{self, error::TrySendError},
task::JoinHandle,
};
/// A gossip broadcast primitive specialised for parquet file creation
/// notifications.
///
/// This type accepts any type that converts into a [`ParquetFile`] from the
/// application logic, serialises the message (applying any necessary
/// transformations due to the underlying transport limitations) and broadcasts
/// the result to all listening peers.
///
/// Serialisation and processing of the [`ParquetFile`] given to the
/// [`ParquetFileTx::broadcast()`] method happens in a background actor task,
/// decoupling the caller from the latency of processing each frame. Dropping
/// the [`ParquetFileTx`] stops this background actor task.
#[derive(Debug)]
pub struct ParquetFileTx<T> {
tx: mpsc::Sender<T>,
task: JoinHandle<()>,
}
impl<T> Drop for ParquetFileTx<T> {
fn drop(&mut self) {
self.task.abort();
}
}
impl<T> ParquetFileTx<T>
where
T: Into<ParquetFile> + Debug + Send + Sync + 'static,
{
/// Construct a new [`ParquetFileTx`] that publishes gossip messages over
/// `gossip`.
pub fn new(gossip: gossip::GossipHandle<Topic>) -> Self {
let (tx, rx) = mpsc::channel(100);
let task = tokio::spawn(actor_loop(rx, gossip));
Self { tx, task }
}
/// Asynchronously broadcast `file` to all interested peers.
///
/// This method enqueues `file` into the serialisation queue, and processed
/// & transmitted asynchronously.
pub fn broadcast(&self, file: T) {
debug!(?file, "sending new parquet file notification");
match self.tx.try_send(file) {
Ok(_) => {}
Err(TrySendError::Closed(_)) => {
panic!("parquet notification serialisation actor not running")
}
Err(TrySendError::Full(_)) => {
warn!("parquet notification serialisation queue full, dropping message")
}
}
}
}
async fn actor_loop<T>(mut rx: mpsc::Receiver<T>, gossip: gossip::GossipHandle<Topic>)
where
T: Into<ParquetFile> + Send + Sync,
{
while let Some(file) = rx.recv().await {
let file = NewParquetFile {
file: Some(file.into()),
};
if let Err(e) = gossip
.broadcast(file.encode_to_vec(), Topic::NewParquetFiles)
.await
{
error!(error=%e, "failed to broadcast payload");
}
}
debug!("stopping parquet gossip serialisation actor");
}