refactor(router): use gossip_schema

Replace the bespoke schema gossip logic in the router with the reusable
gossip_schema crate.
pull/24376/head
Dom Dwyer 2023-08-17 17:37:39 +02:00
parent 36503b04cf
commit ee063057b3
10 changed files with 99 additions and 591 deletions
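
The change is mechanical: both halves of the router's hand-rolled gossip plumbing are swapped for their gossip_schema equivalents. The new wiring reduces to roughly the following sketch (condensed from the ioxd_router lib.rs diff below; `ns_cache` is the namespace cache and `handle` the gossip subsystem handle constructed by the surrounding, elided code):

    // Rx path: apply schema diffs received from peers to the local cache.
    // NamespaceSchemaGossip implements gossip_schema's SchemaEventHandler.
    let gossip_reader = Arc::new(NamespaceSchemaGossip::new(Arc::clone(&ns_cache)));
    let dispatcher = SchemaRx::new(Arc::clone(&gossip_reader), 100);

    // Tx path: observe local cache changes and hand the resulting Events
    // to SchemaTx, which now owns serialisation and frame packing.
    let ns_cache = SchemaChangeObserver::new(ns_cache, SchemaTx::new(handle));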

Cargo.lock (generated)

@@ -3141,6 +3141,7 @@ dependencies = [
"clap_blocks",
"data_types",
"gossip",
"gossip_schema",
"hashbrown 0.14.0",
"hyper",
"iox_catalog",
@@ -4766,7 +4767,7 @@ dependencies = [
"flate2",
"futures",
"generated_types",
"gossip",
"gossip_schema",
"hashbrown 0.14.0",
"hyper",
"influxdb-line-protocol",

ioxd_router/Cargo.toml

@@ -11,6 +11,7 @@ authz = { path = "../authz" }
clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
gossip = { version = "0.1.0", path = "../gossip" }
gossip_schema = { version = "0.1.0", path = "../gossip_schema" }
hashbrown = { workspace = true }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
@@ -25,4 +26,12 @@ trace = { path = "../trace" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
tokio = { version = "1.32", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio = { version = "1.32", features = [
"macros",
"net",
"parking_lot",
"rt-multi-thread",
"signal",
"sync",
"time",
] }

ioxd_router/src/lib.rs

@@ -13,6 +13,7 @@
#![allow(clippy::default_constructed_unit_structs)]
use gossip::TopicInterests;
use gossip_schema::{dispatcher::SchemaRx, handle::SchemaTx};
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
@@ -53,8 +54,7 @@ use router::{
InstrumentationDecorator, Partitioner, RetentionValidator, RpcWrite, SchemaValidator,
},
gossip::{
dispatcher::GossipMessageDispatcher, namespace_cache::NamespaceSchemaGossip,
schema_change_observer::SchemaChangeObserver,
namespace_cache::NamespaceSchemaGossip, schema_change_observer::SchemaChangeObserver,
},
namespace_cache::{
metrics::InstrumentedCache, MaybeLayer, MemoryNamespaceCache, NamespaceCache,
@@ -316,7 +316,7 @@ pub async fn create_router_server_type(
// incoming gossip schema diffs.
let gossip_reader = Arc::new(NamespaceSchemaGossip::new(Arc::clone(&ns_cache)));
// Adapt it to the gossip subsystem via the "Dispatcher" trait
let dispatcher = GossipMessageDispatcher::new(Arc::clone(&gossip_reader), 100);
let dispatcher = SchemaRx::new(Arc::clone(&gossip_reader), 100);
// Initialise the gossip subsystem, delegating message processing to
// the above dispatcher.
@@ -335,7 +335,7 @@
// local changes made to the cache content.
//
// This sits above / wraps the NamespaceSchemaGossip layer.
let ns_cache = SchemaChangeObserver::new(ns_cache, Arc::new(handle));
let ns_cache = SchemaChangeObserver::new(ns_cache, SchemaTx::new(handle));
MaybeLayer::With(ns_cache)
}

router/Cargo.toml

@@ -15,7 +15,7 @@ dml = { path = "../dml" }
flate2 = "1.0"
futures = "0.3.28"
generated_types = { path = "../generated_types" }
gossip = { version = "0.1.0", path = "../gossip" }
gossip_schema = { version = "0.1.0", path = "../gossip_schema" }
hashbrown = { workspace = true }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
@@ -29,8 +29,8 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
serde = "1.0"
serde_urlencoded = "0.7"
service_grpc_catalog = { path = "../service_grpc_catalog"}
service_grpc_namespace = { path = "../service_grpc_namespace"}
service_grpc_catalog = { path = "../service_grpc_catalog" }
service_grpc_namespace = { path = "../service_grpc_namespace" }
service_grpc_object_store = { path = "../service_grpc_object_store" }
service_grpc_schema = { path = "../service_grpc_schema" }
service_grpc_table = { path = "../service_grpc_table" }
@@ -47,8 +47,11 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1.5"
base64 = "0.21.2"
chrono = {version = "0.4.26", default-features = false }
criterion = { version = "0.5", default-features = false, features = ["async_tokio", "rayon"]}
chrono = { version = "0.4.26", default-features = false }
criterion = { version = "0.5", default-features = false, features = [
"async_tokio",
"rayon",
] }
influxdb-line-protocol = { path = "../influxdb_line_protocol" }
iox_tests = { path = "../iox_tests" }
once_cell = "1"
@@ -57,7 +60,9 @@ pretty_assertions = "1.4.0"
proptest = { version = "1.2.0", default-features = false }
rand = "0.8.3"
schema = { version = "0.1.0", path = "../schema" }
test_helpers = { version = "0.1.0", path = "../test_helpers", features = ["future_timeout"] }
test_helpers = { version = "0.1.0", path = "../test_helpers", features = [
"future_timeout",
] }
tokio = { version = "1", features = ["test-util"] }
tokio-stream = { version = "0.1.13", default_features = false, features = [] }

router/src/gossip/dispatcher.rs (deleted)

@@ -1,99 +0,0 @@
//! A deserialiser and dispatcher of [`gossip`] messages.
use std::fmt::Debug;
use async_trait::async_trait;
use bytes::Bytes;
use generated_types::influxdata::iox::gossip::{
v1::{schema_message::Event, SchemaMessage},
Topic,
};
use generated_types::prost::Message;
use observability_deps::tracing::{info, warn};
use tokio::{sync::mpsc, task::JoinHandle};
/// A handler of [`Event`] received via gossip.
#[async_trait]
pub trait GossipMessageHandler: Send + Sync + Debug {
/// Process `message`.
async fn handle(&self, message: Event);
}
/// An async gossip message dispatcher.
///
/// This type is responsible for deserialising incoming gossip payloads and
/// passing them off to the provided [`GossipMessageHandler`] implementation.
This decoupling allows the handler to deal strictly in terms of messages,
/// abstracting it from the underlying message transport / format.
///
/// This type provides a buffer between incoming events, and processing,
/// preventing processing time from blocking the gossip reactor. Once the buffer
/// is full, incoming events are dropped until space is made through processing
/// of outstanding events.
#[derive(Debug)]
pub struct GossipMessageDispatcher {
tx: mpsc::Sender<Bytes>,
task: JoinHandle<()>,
}
impl GossipMessageDispatcher {
/// Initialise a new dispatcher, buffering up to `buffer` number of events.
///
/// The provided `handler` does not block the gossip reactor during
/// execution.
pub fn new<T>(handler: T, buffer: usize) -> Self
where
T: GossipMessageHandler + 'static,
{
// Initialise a buffered channel to decouple the two halves.
let (tx, rx) = mpsc::channel(buffer);
// And run a receiver loop to pull the events from the channel.
let task = tokio::spawn(dispatch_loop(rx, handler));
Self { tx, task }
}
}
#[async_trait]
impl gossip::Dispatcher<Topic> for GossipMessageDispatcher {
async fn dispatch(&self, topic: Topic, payload: Bytes) {
if topic != Topic::SchemaChanges {
return;
}
if let Err(e) = self.tx.try_send(payload) {
warn!(error=%e, "failed to buffer gossip event");
}
}
}
impl Drop for GossipMessageDispatcher {
fn drop(&mut self) {
self.task.abort();
}
}
async fn dispatch_loop<T>(mut rx: mpsc::Receiver<Bytes>, handler: T)
where
T: GossipMessageHandler,
{
while let Some(payload) = rx.recv().await {
// Deserialise the payload into the appropriate proto type.
let event = match SchemaMessage::decode(payload).map(|v| v.event) {
Ok(Some(v)) => v,
Ok(None) => {
warn!("valid frame contains no message");
continue;
}
Err(e) => {
warn!(error=%e, "failed to deserialise gossip message");
continue;
}
};
// Pass this message off to the handler to process.
handler.handle(event).await;
}
info!("stopping gossip dispatcher");
}
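
The file above is deleted wholesale: gossip_schema::dispatcher::SchemaRx now plays the GossipMessageDispatcher role (topic filtering, buffered decoupling from the gossip reactor, SchemaMessage decoding), and GossipMessageHandler is superseded by the crate's SchemaEventHandler trait. A minimal sketch of the replacement receive path, assuming SchemaEventHandler mirrors the removed trait as the call sites in the diffs below suggest; `CacheApplier` is hypothetical:

    use async_trait::async_trait;
    use generated_types::influxdata::iox::gossip::v1::schema_message::Event;
    use gossip_schema::dispatcher::{SchemaEventHandler, SchemaRx};

    #[derive(Debug)]
    struct CacheApplier;

    #[async_trait]
    impl SchemaEventHandler for CacheApplier {
        // Invoked off the gossip reactor with each decoded schema Event.
        async fn handle(&self, _event: Event) {
            // Apply the schema diff to local state here.
        }
    }

    // Buffer up to 100 events between the reactor and the handler, as the
    // removed dispatcher did by hand.
    let rx = SchemaRx::new(CacheApplier, 100);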

router/src/gossip/mock_schema_broadcast.rs

@@ -1,10 +1,6 @@
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use generated_types::{
influxdata::iox::gossip::v1::{schema_message::Event, SchemaMessage},
prost::Message,
};
use generated_types::influxdata::iox::gossip::v1::schema_message::Event;
use parking_lot::Mutex;
use test_helpers::timeout::FutureTimeout;
@@ -12,33 +8,19 @@ use super::traits::SchemaBroadcast;
#[derive(Debug, Default)]
pub struct MockSchemaBroadcast {
payloads: Mutex<Vec<Vec<u8>>>,
payloads: Mutex<Vec<Event>>,
}
#[async_trait]
impl SchemaBroadcast for Arc<MockSchemaBroadcast> {
async fn broadcast(&self, payload: Vec<u8>) {
fn broadcast(&self, payload: Event) {
self.payloads.lock().push(payload);
}
}
impl MockSchemaBroadcast {
/// Return the raw, serialised payloads.
pub fn raw_payloads(&self) -> Vec<Vec<u8>> {
self.payloads.lock().clone()
}
/// Return the deserialised [`Event`].
/// Return the broadcast [`Event`].
pub fn messages(&self) -> Vec<Event> {
self.payloads
.lock()
.iter()
.map(|v| {
SchemaMessage::decode(v.as_slice())
.map(|v| v.event.expect("no message in payload"))
.expect("invalid gossip payload")
})
.collect()
self.payloads.lock().clone()
}
pub async fn wait_for_messages(&self, count: usize) {

router/src/gossip/mod.rs

@@ -1,21 +1,17 @@
//! Gossip event dispatcher & handler implementations for routers.
//! Gossip schema event dispatcher & handler implementations for routers.
//!
//! This sub-system is composed of the following primary components:
//!
//! * [`gossip`] crate: provides the gossip transport, the [`GossipHandle`], and
//! the [`Dispatcher`]. This crate operates on raw bytes.
//! * [`gossip_schema`] crate: provides the schema-specific encoding and
//! transmission over gossip transport.
//!
//! * The outgoing [`SchemaChangeObserver`]: a router-specific wrapper over the
//! underlying [`GossipHandle`]. This type translates the application calls
//! into protobuf [`Event`], and serialises them into bytes, sending them over
//! the underlying [`gossip`] impl.
//! * The outgoing [`SchemaChangeObserver`]: a [`NamespaceCache`] decorator,
//! calculating diff events for cache changes observed and passing them to the
//! [`SchemaTx`] for serialisation and dispatch.
//!
//! * The incoming [`GossipMessageDispatcher`]: deserialises the incoming bytes
//! from the gossip [`Dispatcher`] into [`Event`] and passes them off to the
//! [`NamespaceSchemaGossip`] implementation for processing.
//!
//! * The incoming [`NamespaceSchemaGossip`]: processes [`Event`] received from
//! peers, applying them to the local cache state if necessary.
//! * The incoming [`NamespaceSchemaGossip`]: processes gossip [`Event`]
//! received from peers, idempotently applying them to the local cache state
//! if necessary.
//!
//! ```text
//! ┌────────────────────────────────────────────────────┐
@@ -23,42 +19,34 @@
//! └────────────────────────────────────────────────────┘
//! │ ▲
//! │ │
//! diff diff
//! │ │
//! │ ┌─────────────────────────┐
//! │ │ NamespaceSchemaGossip │
//! │ └─────────────────────────┘
//! │ ▲
//! │ │
//! │ Application types │
//! diff diff
//! │ │
//! ▼ │
//! ┌──────────────────────┐ ┌─────────────────────────┐
//! │ SchemaChangeObserver │ │ GossipMessageDispatcher │
//! │ SchemaChangeObserver │ │ NamespaceSchemaGossip │
//! └──────────────────────┘ └─────────────────────────┘
//! │ ▲
//! │ │
//! │ Encoded Protobuf bytes
//! │ Schema Event
//! │ │
//! │ │
//! ┌ Gossip ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─
//! ┌ gossip_schema ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─
//! ▼ │ │
//! │ ┌──────────────┐ ┌──────────────────┐
//! │ GossipHandle │ │ Dispatcher │
//! │ └──────────────┘ └──────────────────┘
//! │ ┌──────────────┐ ┌──────────────┐
//! │ SchemaTx │ │ SchemaRx │
//! │ └──────────────┘ └──────────────┘
//! │
//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! ```
//!
//! [`GossipHandle`]: gossip::GossipHandle
//! [`Dispatcher`]: gossip::Dispatcher
//! [`SchemaChangeObserver`]: schema_change_observer::SchemaChangeObserver
//! [`Event`]:
//! generated_types::influxdata::iox::gossip::v1::schema_message::Event
//! [`GossipMessageDispatcher`]: dispatcher::GossipMessageDispatcher
//! [`NamespaceSchemaGossip`]: namespace_cache::NamespaceSchemaGossip
//! [`NamespaceCache`]: crate::namespace_cache::NamespaceCache
//! [`SchemaTx`]: gossip_schema::handle::SchemaTx
pub mod dispatcher;
pub mod namespace_cache;
pub mod schema_change_observer;
pub mod traits;
@@ -70,7 +58,6 @@ mod mock_schema_broadcast;
mod tests {
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use data_types::{
partition_template::{
test_table_partition_override, NamespacePartitionTemplateOverride,
@@ -79,35 +66,35 @@
Column, ColumnId, ColumnsByName, NamespaceId, NamespaceName, NamespaceSchema, TableId,
TableSchema,
};
use generated_types::influxdata::iox::gossip::Topic;
use gossip::Dispatcher;
use generated_types::influxdata::iox::gossip::v1::schema_message::Event;
use gossip_schema::dispatcher::SchemaEventHandler;
use test_helpers::timeout::FutureTimeout;
use crate::namespace_cache::{MemoryNamespaceCache, NamespaceCache};
use crate::namespace_cache::{CacheMissErr, MemoryNamespaceCache, NamespaceCache};
use super::{
dispatcher::GossipMessageDispatcher, namespace_cache::NamespaceSchemaGossip,
schema_change_observer::SchemaChangeObserver, traits::SchemaBroadcast,
namespace_cache::NamespaceSchemaGossip, schema_change_observer::SchemaChangeObserver,
traits::SchemaBroadcast,
};
#[derive(Debug)]
struct GossipPipe {
dispatcher: GossipMessageDispatcher,
struct GossipPipe<C> {
rx: Arc<NamespaceSchemaGossip<C>>,
}
impl GossipPipe {
fn new(dispatcher: GossipMessageDispatcher) -> Self {
Self { dispatcher }
impl<C> GossipPipe<C> {
fn new(rx: Arc<NamespaceSchemaGossip<C>>) -> Self {
Self { rx }
}
}
#[async_trait]
impl SchemaBroadcast for Arc<GossipPipe> {
async fn broadcast(&self, payload: Vec<u8>) {
self.dispatcher
.dispatch(Topic::SchemaChanges, payload.into())
.with_timeout_panic(Duration::from_secs(5))
.await;
impl<C> SchemaBroadcast for Arc<GossipPipe<C>>
where
C: NamespaceCache<ReadError = CacheMissErr> + 'static,
{
fn broadcast(&self, payload: Event) {
let this = Arc::clone(self);
tokio::spawn(async move { this.rx.handle(payload).await });
}
}
@@ -117,13 +104,11 @@ mod tests {
// Setup a cache for node A and wrap it in the gossip layer.
let node_a_cache = Arc::new(MemoryNamespaceCache::default());
let dispatcher_a = Arc::new(NamespaceSchemaGossip::new(Arc::clone(&node_a_cache)));
let dispatcher_a = GossipMessageDispatcher::new(dispatcher_a, 100);
let gossip_a = Arc::new(GossipPipe::new(dispatcher_a));
// Setup a cache for node B.
let node_b_cache = Arc::new(MemoryNamespaceCache::default());
let dispatcher_b = Arc::new(NamespaceSchemaGossip::new(Arc::clone(&node_b_cache)));
let dispatcher_b = GossipMessageDispatcher::new(dispatcher_b, 100);
let gossip_b = Arc::new(GossipPipe::new(dispatcher_b));
// Connect the two nodes via adaptors that will plug one "node" into the

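A note on the reworked test plumbing above: with SchemaBroadcast now a synchronous Event sink, the tests bypass the transport entirely and feed one node's outgoing events straight into the other node's NamespaceSchemaGossip. Condensed from the test setup above (the symmetric cross-wiring is elided):

    // Node B's receive side doubles as node A's "transport".
    let rx_b = Arc::new(NamespaceSchemaGossip::new(Arc::clone(&node_b_cache)));
    let gossip_b = Arc::new(GossipPipe::new(rx_b));

    // Schema changes written to node A's decorated cache now surface in
    // node B's cache via the pipe.
    let node_a = SchemaChangeObserver::new(Arc::clone(&node_a_cache), gossip_b);
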
router/src/gossip/namespace_cache.rs

@@ -1,6 +1,6 @@
//! [`NamespaceCache`] decorator to gossip schema changes.
use std::{borrow::Cow, collections::BTreeMap, fmt::Debug, sync::Arc};
use std::{borrow::Cow, collections::BTreeMap, fmt::Debug};
use async_trait::async_trait;
use data_types::{
@@ -11,13 +11,12 @@ use data_types::{
use generated_types::influxdata::iox::gossip::v1::{
schema_message::Event, NamespaceCreated, TableCreated, TableUpdated,
};
use gossip_schema::dispatcher::SchemaEventHandler;
use observability_deps::tracing::{debug, error, trace, warn};
use thiserror::Error;
use crate::namespace_cache::{CacheMissErr, NamespaceCache};
use super::dispatcher::GossipMessageHandler;
/// Errors caused by incoming schema gossip messages from cluster peers.
#[derive(Debug, Error)]
enum Error {
@@ -46,7 +45,7 @@
}
/// A [`NamespaceCache`] decorator applying incoming schema change notifications
/// via the [`gossip`] subsystem.
/// via the gossip subsystem.
///
/// Any schema additions received from peers are applied to the decorated
/// [`NamespaceCache`], helping to keep the peers approximately in-sync on a
@@ -60,8 +59,8 @@ enum Error {
/// that would cause a cache miss resulting in a catalog query, and the
/// associated latency penalty and catalog load that comes with it.
///
/// This type implements the [`GossipMessageHandler`] which is invoked with the
/// [`Event`] received from an opaque peer by the [`gossip`] subsystem (off of the
/// This type implements the [`SchemaEventHandler`] which is invoked with the
/// [`Event`] received from an opaque peer by the gossip subsystem (off of the
/// hot path), which when processed causes the cache contents to be updated if
/// appropriate through the usual [`NamespaceCache::get_schema()`] and
/// [`NamespaceCache::put_schema()`] abstraction.
@@ -90,7 +89,7 @@ pub struct NamespaceSchemaGossip<C> {
/// Merges the content of each event with the existing [`NamespaceCache`]
/// contents, if any.
#[async_trait]
impl<C> GossipMessageHandler for Arc<NamespaceSchemaGossip<C>>
impl<C> SchemaEventHandler for NamespaceSchemaGossip<C>
where
C: NamespaceCache<ReadError = CacheMissErr>,
{

router/src/gossip/schema_change_observer.rs

@@ -4,17 +4,8 @@ use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{ColumnsByName, NamespaceName, NamespaceSchema};
use generated_types::{
influxdata::iox::gossip::v1::{
schema_message::Event, Column, NamespaceCreated, SchemaMessage, TableCreated, TableUpdated,
},
prost::Message,
};
use gossip::MAX_USER_PAYLOAD_BYTES;
use observability_deps::tracing::{debug, error, warn};
use tokio::{
sync::mpsc::{self, error::TrySendError},
task::JoinHandle,
use generated_types::influxdata::iox::gossip::v1::{
schema_message::Event, Column, NamespaceCreated, TableCreated, TableUpdated,
};
use crate::namespace_cache::{ChangeStats, NamespaceCache};
@@ -22,7 +13,7 @@ use crate::namespace_cache::{ChangeStats, NamespaceCache};
use super::traits::SchemaBroadcast;
/// A [`NamespaceCache`] decorator implementing cluster-wide, best-effort
/// propagation of local schema changes via the [`gossip`] subsystem.
/// propagation of local schema changes via the gossip subsystem.
///
/// This type sits in the call chain of schema cache operations and gossips any
/// observed diffs applied to the local cache.
@@ -36,31 +27,25 @@ use super::traits::SchemaBroadcast;
/// the [`ChangeStats`] are transmitted on a best-effort basis.
///
/// Gossip [`Event`] are populated within the call to
/// [`NamespaceCache::put_schema()`] but packed & serialised into [`gossip`]
/// frames off-path in a background task to minimise the latency overhead.
/// [`NamespaceCache::put_schema()`] but packed & serialised into gossip frames
/// off-path in a background task to minimise the latency overhead.
///
/// Processing of the diffs happens partially on the cache request path, and
/// partially off of it - the actual message content is generated on the cache
/// request path, but framing, serialisation and dispatch happen asynchronously.
#[derive(Debug)]
pub struct SchemaChangeObserver<T> {
tx: mpsc::Sender<Event>,
task: JoinHandle<()>,
pub struct SchemaChangeObserver<T, U> {
tx: U,
inner: T,
}
impl<T> Drop for SchemaChangeObserver<T> {
fn drop(&mut self) {
self.task.abort();
}
}
/// A pass-through [`NamespaceCache`] implementation that gossips new schema
/// additions.
#[async_trait]
impl<T> NamespaceCache for SchemaChangeObserver<T>
impl<T, U> NamespaceCache for SchemaChangeObserver<T, U>
where
T: NamespaceCache,
U: SchemaBroadcast,
{
type ReadError = T::ReadError;
@@ -90,18 +75,14 @@ where
}
}
impl<T> SchemaChangeObserver<T> {
impl<T, U> SchemaChangeObserver<T, U>
where
U: SchemaBroadcast,
{
/// Construct a new [`SchemaChangeObserver`] that publishes gossip messages
/// over `gossip`, and delegates cache operations to `inner`.
pub fn new<U>(inner: T, gossip: U) -> Self
where
U: SchemaBroadcast + 'static,
{
let (tx, rx) = mpsc::channel(100);
let task = tokio::spawn(actor_loop(rx, gossip));
Self { tx, task, inner }
pub fn new(inner: T, gossip: U) -> Self {
Self { tx: gossip, inner }
}
/// Derive a set of gossip event messages from `c`, and dispatch them to
@@ -144,7 +125,7 @@ impl<T> SchemaChangeObserver<T> {
retention_period_ns: schema.retention_period_ns,
};
self.enqueue(Event::NamespaceCreated(msg));
self.tx.broadcast(Event::NamespaceCreated(msg));
}
/// Gossip that the provided set of `new_tables` have been added to the
@@ -173,7 +154,7 @@ impl<T> SchemaChangeObserver<T> {
partition_template: schema.partition_template.as_proto().cloned(),
};
self.enqueue(Event::TableCreated(msg));
self.tx.broadcast(Event::TableCreated(msg));
}
}
@@ -211,155 +192,9 @@ impl<T> SchemaChangeObserver<T> {
.collect(),
};
self.enqueue(Event::TableUpdated(msg));
self.tx.broadcast(Event::TableUpdated(msg));
}
}
/// Serialise, pack & gossip `msg` asynchronously.
fn enqueue(&self, msg: Event) {
debug!(?msg, "sending schema message");
match self.tx.try_send(msg) {
Ok(_) => {}
Err(TrySendError::Closed(_)) => panic!("schema serialisation actor not running"),
Err(TrySendError::Full(_)) => {
warn!("schema serialisation queue full, dropping message")
}
}
}
}
/// A background task loop that pulls [`Event`] from `rx`, serialises / packs them
/// into a single gossip frame, and broadcasts the result over `gossip`.
async fn actor_loop<T>(mut rx: mpsc::Receiver<Event>, gossip: T)
where
T: SchemaBroadcast,
{
while let Some(event) = rx.recv().await {
let frames = match event {
v @ Event::NamespaceCreated(_) => vec![v],
Event::TableCreated(v) => serialise_table_create_frames(v),
Event::TableUpdated(v) => {
// Split the frame up into N frames, sized as big as the gossip
// transport will allow.
let mut frames = Vec::with_capacity(1);
if !serialise_table_update_frames(v, MAX_USER_PAYLOAD_BYTES, &mut frames) {
warn!("dropping oversized columns in table update");
// Continue sending the columns that were packed
// successfully
}
// It's possible all columns were oversized and therefore
// dropped during the split.
if frames.is_empty() {
warn!("dropping empty table update message");
continue;
}
frames.into_iter().map(Event::TableUpdated).collect()
}
};
for frame in frames {
let msg = SchemaMessage { event: Some(frame) };
gossip.broadcast(msg.encode_to_vec()).await
}
}
debug!("stopping schema serialisation actor");
}
/// Serialise `msg` into one or more frames and append them to `out`.
///
/// If when `msg` is serialised it is less than or equal to `max_frame_bytes` in
/// length, this method appends a single frame. If `msg` is too large, it is
/// split into `N` multiple smaller frames and each appended individually.
///
/// If any [`Column`] within the message is too large to fit into an update
/// containing only itself, then this method returns `false` indicating
/// oversized columns were dropped from the output.
fn serialise_table_update_frames(
mut msg: TableUpdated,
max_frame_bytes: usize,
out: &mut Vec<TableUpdated>,
) -> bool {
// Does this frame fit within the maximum allowed frame size?
if msg.encoded_len() <= max_frame_bytes {
// Never send empty update messages.
if !msg.columns.is_empty() {
out.push(msg);
}
return true;
}
// This message is too large to be sent as a single message.
debug!(
n_bytes = msg.encoded_len(),
"splitting oversized table update message"
);
// Find the midpoint in the column list.
//
// If the column list is down to one candidate, then the TableUpdate
// containing only this column is too large to be sent, so this column must
// be dropped from the output.
if msg.columns.len() <= 1 {
// Return false to indicate some columns were dropped.
return false;
}
let mid = msg.columns.len() / 2;
// Split up the columns in the message into two.
let right = msg.columns.drain(mid..).collect();
// msg now contains the left-most half of the columns.
//
// Construct the frame for the right-most half of the columns.
let other = TableUpdated {
columns: right,
table_name: msg.table_name.clone(),
namespace_name: msg.namespace_name.clone(),
table_id: msg.table_id,
};
// Recursively split the frames until they fit, at which point they'll
// be sent within this call.
serialise_table_update_frames(msg, max_frame_bytes, out)
&& serialise_table_update_frames(other, max_frame_bytes, out)
}
/// Split `msg` into a [`TableCreated`] and a series of [`TableUpdated`] frames,
/// each sized less than [`MAX_USER_PAYLOAD_BYTES`].
fn serialise_table_create_frames(mut msg: TableCreated) -> Vec<Event> {
// If it fits, do nothing.
if msg.encoded_len() <= MAX_USER_PAYLOAD_BYTES {
return vec![Event::TableCreated(msg)];
}
// If not, split the message into a single create, followed by N table
// updates.
let columns = std::mem::take(&mut msg.table.as_mut().unwrap().columns);
// Check that on its own, without columns, it'll be send-able.
if msg.encoded_len() > MAX_USER_PAYLOAD_BYTES {
error!("dropping oversized table create message");
return vec![];
}
// Recreate the new TableUpdate containing all the columns
let mut update = msg.table.as_ref().unwrap().clone();
update.columns = columns;
let mut updates = Vec::with_capacity(1);
if !serialise_table_update_frames(update, MAX_USER_PAYLOAD_BYTES, &mut updates) {
warn!("dropping oversized columns in table update");
// Continue sending the columns that were packed
// successfully
}
// Return the table creation, followed by the updates containing columns.
std::iter::once(Event::TableCreated(msg))
.chain(updates.into_iter().map(Event::TableUpdated))
.collect()
}
#[cfg(test)]
@@ -377,11 +212,7 @@ mod tests {
partition_template::{test_table_partition_override, NamespacePartitionTemplateOverride},
ColumnId, NamespaceId, TableId, TableSchema,
};
use generated_types::influxdata::iox::{
gossip::v1::column::ColumnType,
partition_template::v1::{template_part::Part, PartitionTemplate, TemplatePart},
};
use proptest::prelude::*;
use generated_types::influxdata::iox::gossip::v1::column::ColumnType;
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
@@ -398,206 +229,6 @@
partition_template: DEFAULT_NAMESPACE_PARTITION_TEMPLATE,
};
proptest! {
/// Assert that overly-large [`TableUpdated`] frames are correctly split
/// into multiple smaller frames that are under the provided maximum
/// size.
#[test]
fn prop_table_update_frame_splitting(
max_frame_bytes in 0_usize..1_000,
n_columns in 0_i64..1_000,
) {
let mut msg = TableUpdated {
table_name: TABLE_NAME.to_string(),
namespace_name: NAMESPACE_NAME.to_string(),
table_id: TABLE_ID,
columns: (0..n_columns).map(|v| {
Column {
name: "ignored".to_string(),
column_id: v,
column_type: 42,
}
}).collect(),
};
let mut out = Vec::new();
let success = serialise_table_update_frames(
msg.clone(),
max_frame_bytes,
&mut out
);
// For all inputs, either:
//
// - The return value indicates a frame could not be split
// entirely, in which case at least one column must exceed
// max_frame_bytes when packed in an update message, preventing
// it from being split down to N=1, and that column is absent
// from the output messages.
//
// or
//
// - The message may have been split into more than one message of
// less-than-or-equal-to max_frame_bytes in size, in which case
// all columns must appear exactly once (validated by checking
// the column ID values reduce to the input set)
//
// All output messages must contain identical table metadata and be
// non-empty.
// Otherwise validate the successful case.
for v in &out {
// Invariant: the split frames must be less than or equal to the
// desired maximum encoded frame size
assert!(v.encoded_len() <= max_frame_bytes);
// Invariant: all messages must contain the same metadata
assert_eq!(v.table_name, msg.table_name);
assert_eq!(v.namespace_name, msg.namespace_name);
assert_eq!(v.table_id, msg.table_id);
// Invariant: there should always be at least one column per
// message (no empty update frames).
assert!(!v.columns.is_empty());
}
let got_ids = into_sorted_vec(out.iter()
.flat_map(|v| v.columns.iter().map(|v| v.column_id)));
// Build the set of IDs that should appear.
let want_ids = if success {
// All column IDs must appear in the output as the splitting was
// successful.
(0..n_columns).collect::<Vec<_>>()
} else {
// Splitting failed.
//
// Build the set of column IDs expected to be in the output
// (those that are under the maximum size on their own).
let cols = std::mem::take(&mut msg.columns);
let want_ids = into_sorted_vec(cols.into_iter().filter_map(|v| {
let column_id = v.column_id;
msg.columns = vec![v];
if msg.encoded_len() > max_frame_bytes {
return None;
}
Some(column_id)
}));
// Assert at least one column must be too large to be packed
// into an update containing only that column if one was
// provided.
if n_columns != 0 {
assert_ne!(want_ids.len(), n_columns as usize);
}
want_ids
};
// The ordered iterator of observed IDs must match the input
// interval of [0, n_columns)
assert!(want_ids.into_iter().eq(got_ids.into_iter()));
}
#[test]
fn prop_table_create_frame_splitting(
n_columns in 0..MAX_USER_PAYLOAD_BYTES as i64,
partition_template_size in 0..MAX_USER_PAYLOAD_BYTES
) {
let mut msg = TableCreated {
table: Some(TableUpdated {
table_name: TABLE_NAME.to_string(),
namespace_name: NAMESPACE_NAME.to_string(),
table_id: TABLE_ID,
columns: (0..n_columns).map(|v| {
Column {
name: "ignored".to_string(),
column_id: v,
column_type: 42,
}
}).collect(),
}),
// Generate a potentially outrageously big partition template.
// It's probably invalid, but that's fine for this test.
partition_template: Some(PartitionTemplate{ parts: vec![
TemplatePart{ part: Some(Part::TagValue(
"b".repeat(partition_template_size).to_string()
))
}]}),
};
let frames = serialise_table_create_frames(msg.clone());
// This TableCreated message is in one of three states:
//
// 1. Small enough to be sent in a one-er
// 2. Big enough to need splitting into a create + updates
// 3. Too big to send the create message at all
//
// For 1 and 2, all columns should be observed, and should follow
// the create message. For 3, nothing should be sent, as those
// updates are likely to go unused by peers who likely don't know
// about this table.
// Validate 3 first.
if frames.is_empty() {
// Then it MUST be too large to send even with no columns.
//
// Remove them and assert the size.
msg.table.as_mut().unwrap().columns = vec![];
assert!(msg.encoded_len() > MAX_USER_PAYLOAD_BYTES);
return Ok(());
}
// Both 1 and 2 require all columns to eventually be observed, as
// well as the create message.
let mut iter = frames.into_iter();
let mut columns = assert_matches!(iter.next(), Some(Event::TableCreated(v)) => {
// Invariant: table metadata must be correct
assert_eq!(v.partition_template, msg.partition_template);
let update = v.table.unwrap();
assert_eq!(update.table_name, TABLE_NAME);
assert_eq!(update.namespace_name, NAMESPACE_NAME);
assert_eq!(update.table_id, TABLE_ID);
// Return the columns in this create message, if any.
update.columns
});
// Combine the columns from above, with any subsequent update
// messages
columns.extend(iter.flat_map(|v| {
assert_matches!(v, Event::TableUpdated(v) => {
// Invariant: metadata in follow-on updates must also match
assert_eq!(v.table_name, TABLE_NAME);
assert_eq!(v.namespace_name, NAMESPACE_NAME);
assert_eq!(v.table_id, TABLE_ID);
v.columns
})
}));
// Columns now contains all the columns, across all the output
// messages.
let got_ids = into_sorted_vec(columns.into_iter().map(|v| v.column_id));
// Which should match the full input set of column IDs
assert!((0..n_columns).eq(got_ids.into_iter()));
}
}
/// Generate a `Vec` of sorted `T`, preserving duplicates, if any.
fn into_sorted_vec<T>(v: impl IntoIterator<Item = T>) -> Vec<T>
where
T: Ord,
{
let mut v = v.into_iter().collect::<Vec<_>>();
v.sort_unstable();
v
}
macro_rules! test_observe {
(
$name:ident,

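The framing logic removed above now lives in gossip_schema, but the strategy is worth spelling out: an oversized TableUpdated is split by recursively halving its column list until every frame fits under the transport payload limit, and a lone column that can never fit is dropped (signalled by the `false` return). A self-contained toy analogue of that recursion, modelling columns as their encoded byte sizes (illustrative only, not the gossip_schema API):

    // Stand-in for prost's Message::encoded_len(): a fixed per-frame
    // header plus the packed size of each column.
    const HEADER: usize = 16;

    fn encoded_len(cols: &[usize]) -> usize {
        HEADER + cols.iter().sum::<usize>()
    }

    /// Returns false if a column was dropped because a frame holding
    /// only that column still exceeds `max`.
    fn split_frames(mut cols: Vec<usize>, max: usize, out: &mut Vec<Vec<usize>>) -> bool {
        if encoded_len(&cols) <= max {
            if !cols.is_empty() {
                out.push(cols); // never emit empty frames
            }
            return true;
        }
        if cols.len() <= 1 {
            return false; // a lone oversized column can never be sent
        }
        // Halve and recurse; `&` (not `&&`) so the right half is still
        // serialised even when the left half drops a column.
        let right = cols.split_off(cols.len() / 2);
        split_frames(cols, max, out) & split_frames(right, max, out)
    }

    fn main() {
        // 8 columns of 10 bytes + 16-byte header = 96 > 60, so the update
        // splits into two frames of 4 columns (56 bytes <= 60) each.
        let mut out = Vec::new();
        assert!(split_frames(vec![10; 8], 60, &mut out));
        assert_eq!(out.len(), 2);
    }
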
router/src/gossip/traits.rs

@@ -1,26 +1,21 @@
//! Abstractions decoupling application schema gossiping from the underlying
//! transport.
use std::{fmt::Debug, sync::Arc};
use std::fmt::Debug;
use async_trait::async_trait;
use generated_types::influxdata::iox::gossip::Topic;
use observability_deps::tracing::error;
use generated_types::influxdata::iox::gossip::v1::schema_message::Event;
use gossip_schema::handle::SchemaTx;
/// An abstract best-effort broadcast primitive, sending an opaque payload to
/// all peers.
#[async_trait]
pub trait SchemaBroadcast: Send + Sync + Debug {
/// Broadcast `payload` to all peers, blocking until the message is enqueued
/// for processing.
async fn broadcast(&self, payload: Vec<u8>);
fn broadcast(&self, payload: Event);
}
#[async_trait]
impl SchemaBroadcast for Arc<gossip::GossipHandle<Topic>> {
async fn broadcast(&self, payload: Vec<u8>) {
if let Err(e) = gossip::GossipHandle::broadcast(self, payload, Topic::SchemaChanges).await {
error!(error=%e, "failed to broadcast payload");
}
impl SchemaBroadcast for SchemaTx {
fn broadcast(&self, payload: Event) {
SchemaTx::broadcast(self, payload)
}
}
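
One consequence of the narrowed trait worth noting: SchemaChangeObserver is now generic over its broadcast side (the `U: SchemaBroadcast` parameter in the diff above), so tests can substitute any synchronous Event sink for the real SchemaTx. A minimal sketch using the MockSchemaBroadcast from the diff above:

    let cache = Arc::new(MemoryNamespaceCache::default());
    let gossip = Arc::new(MockSchemaBroadcast::default());

    // Production code passes SchemaTx::new(handle) here instead.
    let observer = SchemaChangeObserver::new(cache, Arc::clone(&gossip));

    // Any put_schema() call that changes the cache is diffed, and the
    // resulting Events are captured for assertion via gossip.messages().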