From d35bd48f654d2a9423fb04d566395fa6efec4d92 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 17 Aug 2023 11:38:24 +0200 Subject: [PATCH] refactor(gossip): rename GossipMessage Now there's a Topic, there's no need for a giant "all message types" enum. As part of this shift, the gossip_message::GossipMessage used for schema gossiping is sounding overly generic. This commit changes the name to schema_message::SchemaMessage and updates the code. This is a backwards-compatible change (and if anything goes wrong, the "old" routers simply log a warning if a message is unreadable). --- generated_types/build.rs | 2 +- .../gossip/v1/{message.proto => schema.proto} | 6 +- router/src/gossip/dispatcher.rs | 10 ++-- router/src/gossip/mock_schema_broadcast.rs | 10 ++-- router/src/gossip/mod.rs | 9 +-- router/src/gossip/namespace_cache.rs | 44 +++++++-------- router/src/gossip/schema_change_observer.rs | 56 +++++++++---------- 7 files changed, 69 insertions(+), 68 deletions(-) rename generated_types/protos/influxdata/iox/gossip/v1/{message.proto => schema.proto} (96%) diff --git a/generated_types/build.rs b/generated_types/build.rs index d5e42e3416..6465a8dcdf 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -57,7 +57,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { catalog_path.join("service.proto"), compactor_path.join("service.proto"), delete_path.join("service.proto"), - gossip_path.join("message.proto"), + gossip_path.join("schema.proto"), ingester_path.join("parquet_metadata.proto"), ingester_path.join("persist.proto"), ingester_path.join("write.proto"), diff --git a/generated_types/protos/influxdata/iox/gossip/v1/message.proto b/generated_types/protos/influxdata/iox/gossip/v1/schema.proto similarity index 96% rename from generated_types/protos/influxdata/iox/gossip/v1/message.proto rename to generated_types/protos/influxdata/iox/gossip/v1/schema.proto index 2125456d86..7ecf8e3411 100644 --- a/generated_types/protos/influxdata/iox/gossip/v1/message.proto +++ b/generated_types/protos/influxdata/iox/gossip/v1/schema.proto @@ -4,9 +4,9 @@ option go_package = "github.com/influxdata/iox/gossip/v1"; import "influxdata/iox/partition_template/v1/template.proto"; -// A message exchanged via the IOx gossip mechanism. -message GossipMessage { - oneof msg { +// A message exchanged via the IOx gossip mechanism describing schema changes. +message SchemaMessage { + oneof event { // A new namespace was created. NamespaceCreated namespace_created = 1; diff --git a/router/src/gossip/dispatcher.rs b/router/src/gossip/dispatcher.rs index 3f578e5279..292bfb9413 100644 --- a/router/src/gossip/dispatcher.rs +++ b/router/src/gossip/dispatcher.rs @@ -5,18 +5,18 @@ use std::fmt::Debug; use async_trait::async_trait; use bytes::Bytes; use generated_types::influxdata::iox::gossip::{ - v1::{gossip_message::Msg, GossipMessage}, + v1::{schema_message::Event, SchemaMessage}, Topic, }; use generated_types::prost::Message; use observability_deps::tracing::{info, warn}; use tokio::{sync::mpsc, task::JoinHandle}; -/// A handler of [`Msg`] received via gossip. +/// A handler of [`Event`] received via gossip. #[async_trait] pub trait GossipMessageHandler: Send + Sync + Debug { /// Process `message`. - async fn handle(&self, message: Msg); + async fn handle(&self, message: Event); } /// An async gossip message dispatcher. @@ -79,7 +79,7 @@ where { while let Some(payload) = rx.recv().await { // Deserialise the payload into the appropriate proto type. - let msg = match GossipMessage::decode(payload).map(|v| v.msg) { + let event = match SchemaMessage::decode(payload).map(|v| v.event) { Ok(Some(v)) => v, Ok(None) => { warn!("valid frame contains no message"); @@ -92,7 +92,7 @@ where }; // Pass this message off to the handler to process. - handler.handle(msg).await; + handler.handle(event).await; } info!("stopping gossip dispatcher"); diff --git a/router/src/gossip/mock_schema_broadcast.rs b/router/src/gossip/mock_schema_broadcast.rs index ba41f0d05e..7d6e3e8d4e 100644 --- a/router/src/gossip/mock_schema_broadcast.rs +++ b/router/src/gossip/mock_schema_broadcast.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use generated_types::{ - influxdata::iox::gossip::v1::{gossip_message::Msg, GossipMessage}, + influxdata::iox::gossip::v1::{schema_message::Event, SchemaMessage}, prost::Message, }; use parking_lot::Mutex; @@ -28,14 +28,14 @@ impl MockSchemaBroadcast { self.payloads.lock().clone() } - /// Return the deserialised [`Msg`]. - pub fn messages(&self) -> Vec { + /// Return the deserialised [`Event`]. + pub fn messages(&self) -> Vec { self.payloads .lock() .iter() .map(|v| { - GossipMessage::decode(v.as_slice()) - .map(|v| v.msg.expect("no message in payload")) + SchemaMessage::decode(v.as_slice()) + .map(|v| v.event.expect("no message in payload")) .expect("invalid gossip payload") }) .collect() diff --git a/router/src/gossip/mod.rs b/router/src/gossip/mod.rs index c6a5b97094..bd2b783c29 100644 --- a/router/src/gossip/mod.rs +++ b/router/src/gossip/mod.rs @@ -7,14 +7,14 @@ //! //! * The outgoing [`SchemaChangeObserver`]: a router-specific wrapper over the //! underlying [`GossipHandle`]. This type translates the application calls -//! into protobuf [`Msg`], and serialises them into bytes, sending them over +//! into protobuf [`Event`], and serialises them into bytes, sending them over //! the underlying [`gossip`] impl. //! //! * The incoming [`GossipMessageDispatcher`]: deserialises the incoming bytes -//! from the gossip [`Dispatcher`] into [`Msg`] and passes them off to the +//! from the gossip [`Dispatcher`] into [`Event`] and passes them off to the //! [`NamespaceSchemaGossip`] implementation for processing. //! -//! * The incoming [`NamespaceSchemaGossip`]: processes [`Msg`] received from +//! * The incoming [`NamespaceSchemaGossip`]: processes [`Event`] received from //! peers, applying them to the local cache state if necessary. //! //! ```text @@ -53,7 +53,8 @@ //! [`GossipHandle`]: gossip::GossipHandle //! [`Dispatcher`]: gossip::Dispatcher //! [`SchemaChangeObserver`]: schema_change_observer::SchemaChangeObserver -//! [`Msg`]: generated_types::influxdata::iox::gossip::v1::gossip_message::Msg +//! [`Event`]: +//! generated_types::influxdata::iox::gossip::v1::schema_message::Event //! [`GossipMessageDispatcher`]: dispatcher::GossipMessageDispatcher //! [`NamespaceSchemaGossip`]: namespace_cache::NamespaceSchemaGossip diff --git a/router/src/gossip/namespace_cache.rs b/router/src/gossip/namespace_cache.rs index 11e7b7170c..90b876a975 100644 --- a/router/src/gossip/namespace_cache.rs +++ b/router/src/gossip/namespace_cache.rs @@ -9,7 +9,7 @@ use data_types::{ TableId, TableSchema, }; use generated_types::influxdata::iox::gossip::v1::{ - gossip_message::Msg, NamespaceCreated, TableCreated, TableUpdated, + schema_message::Event, NamespaceCreated, TableCreated, TableUpdated, }; use observability_deps::tracing::{debug, error, trace, warn}; use thiserror::Error; @@ -61,7 +61,7 @@ enum Error { /// associated latency penalty and catalog load that comes with it. /// /// This type implements the [`GossipMessageHandler`] which is invoked with the -/// [`Msg`] received from an opaque peer by the [`gossip`] subsystem (off of the +/// [`Event`] received from an opaque peer by the [`gossip`] subsystem (off of the /// hot path), which when processed causes the cache contents to be updated if /// appropriate through the usual [`NamespaceCache::get_schema()`] and /// [`NamespaceCache::put_schema()`] abstraction. @@ -94,13 +94,13 @@ impl GossipMessageHandler for Arc> where C: NamespaceCache, { - async fn handle(&self, message: Msg) { + async fn handle(&self, message: Event) { trace!(?message, "received schema message"); let res = match message { - Msg::NamespaceCreated(v) => self.handle_namespace_created(v).await, - Msg::TableCreated(v) => self.handle_table_created(v).await, - Msg::TableUpdated(v) => self.handle_updated_table(v).await, + Event::NamespaceCreated(v) => self.handle_namespace_created(v).await, + Event::TableCreated(v) => self.handle_table_created(v).await, + Event::TableUpdated(v) => self.handle_updated_table(v).await, }; if let Err(error) = res { @@ -456,7 +456,7 @@ mod tests { test_handle_gossip_message_!( table_created_namespace_miss, existing = None, - message = Msg::TableCreated(TableCreated { + message = Event::TableCreated(TableCreated { table: Some(TableUpdated { table_name: "bananas".to_string(), namespace_name: NAMESPACE_NAME.to_string(), @@ -474,7 +474,7 @@ mod tests { test_handle_gossip_message_!( table_created_no_columns, existing = Some(DEFAULT_NAMESPACE.clone()), - message = Msg::TableCreated(TableCreated { + message = Event::TableCreated(TableCreated { table: Some(TableUpdated { table_name: "bananas".to_string(), namespace_name: NAMESPACE_NAME.to_string(), @@ -507,7 +507,7 @@ mod tests { test_handle_gossip_message_!( table_created, existing = Some(DEFAULT_NAMESPACE.clone()), - message = Msg::TableCreated(TableCreated { + message = Event::TableCreated(TableCreated { table: Some(TableUpdated { table_name: "bananas".to_string(), namespace_name: NAMESPACE_NAME.to_string(), @@ -550,7 +550,7 @@ mod tests { test_handle_gossip_message_!( table_created_no_partition_template, existing = Some(DEFAULT_NAMESPACE.clone()), - message = Msg::TableCreated(TableCreated { + message = Event::TableCreated(TableCreated { table: Some(TableUpdated { table_name: "bananas".to_string(), namespace_name: NAMESPACE_NAME.to_string(), @@ -597,7 +597,7 @@ mod tests { ns }), // Send a gossip message adding a new column to the above - message = Msg::TableCreated(TableCreated { + message = Event::TableCreated(TableCreated { table: Some(TableUpdated { table_name: "bananas".to_string(), namespace_name: NAMESPACE_NAME.to_string(), @@ -663,7 +663,7 @@ mod tests { ns }), // Send a gossip message adding a new column to the above - message = Msg::TableCreated(TableCreated { + message = Event::TableCreated(TableCreated { table: Some(TableUpdated { table_name: "bananas".to_string(), namespace_name: NAMESPACE_NAME.to_string(), @@ -711,7 +711,7 @@ mod tests { test_handle_gossip_message_!( table_created_missing_update, existing = Some(DEFAULT_NAMESPACE), - message = Msg::TableCreated(TableCreated { + message = Event::TableCreated(TableCreated { table: None, // No inner content! partition_template: None, }), @@ -724,7 +724,7 @@ mod tests { test_handle_gossip_message_!( table_created_missing_namespace_name, existing = Some(DEFAULT_NAMESPACE), - message = Msg::TableCreated(TableCreated { + message = Event::TableCreated(TableCreated { table: Some(TableUpdated { table_name: "bananas".to_string(), namespace_name: "".to_string(), // empty is missing in proto @@ -742,7 +742,7 @@ mod tests { test_handle_gossip_message_!( table_updated_missing_namespace_name, existing = Some(DEFAULT_NAMESPACE), - message = Msg::TableUpdated(TableUpdated { + message = Event::TableUpdated(TableUpdated { table_name: "bananas".to_string(), namespace_name: "".to_string(), // empty is missing in proto table_id: 42, @@ -757,7 +757,7 @@ mod tests { test_handle_gossip_message_!( table_updated_missing_namespace, existing = Some(DEFAULT_NAMESPACE), - message = Msg::TableUpdated(TableUpdated { + message = Event::TableUpdated(TableUpdated { table_name: "bananas".to_string(), namespace_name: "another".to_string(), // not known locally table_id: 42, @@ -772,7 +772,7 @@ mod tests { test_handle_gossip_message_!( table_updated_missing_table, existing = Some(DEFAULT_NAMESPACE), - message = Msg::TableUpdated(TableUpdated { + message = Event::TableUpdated(TableUpdated { table_name: "bananas".to_string(), // Table not known locally namespace_name: NAMESPACE_NAME.to_string(), table_id: 42, @@ -806,7 +806,7 @@ mod tests { ns }), - message = Msg::TableUpdated(TableUpdated { + message = Event::TableUpdated(TableUpdated { table_name: "bananas".to_string(), // Table not known locally namespace_name: NAMESPACE_NAME.to_string(), table_id: 42, @@ -856,7 +856,7 @@ mod tests { test_handle_gossip_message_!( namespace_created_missing_name, existing = None, - message = Msg::NamespaceCreated(NamespaceCreated { + message = Event::NamespaceCreated(NamespaceCreated { namespace_name: "".to_string(), // missing in proto namespace_id: DEFAULT_NAMESPACE.id.get(), partition_template: Some((**PARTITION_BY_DAY_PROTO).clone()), @@ -871,7 +871,7 @@ mod tests { test_handle_gossip_message_!( namespace_created, existing = None, - message = Msg::NamespaceCreated(NamespaceCreated { + message = Event::NamespaceCreated(NamespaceCreated { namespace_name: NAMESPACE_NAME.to_string(), namespace_id: DEFAULT_NAMESPACE.id.get(), partition_template: None, @@ -889,7 +889,7 @@ mod tests { test_handle_gossip_message_!( namespace_created_specified_partition_template, existing = None, - message = Msg::NamespaceCreated(NamespaceCreated { + message = Event::NamespaceCreated(NamespaceCreated { namespace_name: NAMESPACE_NAME.to_string(), // missing in proto namespace_id: DEFAULT_NAMESPACE.id.get(), partition_template: Some((**PARTITION_BY_DAY_PROTO).clone()), @@ -908,7 +908,7 @@ mod tests { test_handle_gossip_message_!( namespace_created_existing, existing = Some(DEFAULT_NAMESPACE), - message = Msg::NamespaceCreated(NamespaceCreated { + message = Event::NamespaceCreated(NamespaceCreated { namespace_name: NAMESPACE_NAME.to_string(), // missing in proto namespace_id: DEFAULT_NAMESPACE.id.get(), diff --git a/router/src/gossip/schema_change_observer.rs b/router/src/gossip/schema_change_observer.rs index e4372d0ee4..2bb3500e1c 100644 --- a/router/src/gossip/schema_change_observer.rs +++ b/router/src/gossip/schema_change_observer.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use data_types::{ColumnsByName, NamespaceName, NamespaceSchema}; use generated_types::{ influxdata::iox::gossip::v1::{ - gossip_message::Msg, Column, GossipMessage, NamespaceCreated, TableCreated, TableUpdated, + schema_message::Event, Column, NamespaceCreated, SchemaMessage, TableCreated, TableUpdated, }, prost::Message, }; @@ -35,7 +35,7 @@ use super::traits::SchemaBroadcast; /// Instead of gossiping the entire schema, the new schema elements described by /// the [`ChangeStats`] are transmitted on a best-effort basis. /// -/// Gossip [`Msg`] are populated within the call to +/// Gossip [`Event`] are populated within the call to /// [`NamespaceCache::put_schema()`] but packed & serialised into [`gossip`] /// frames off-path in a background task to minimise the latency overhead. /// @@ -44,7 +44,7 @@ use super::traits::SchemaBroadcast; /// request path, but framing, serialisation and dispatch happen asynchronously. #[derive(Debug)] pub struct SchemaChangeObserver { - tx: mpsc::Sender, + tx: mpsc::Sender, task: JoinHandle<()>, inner: T, } @@ -144,7 +144,7 @@ impl SchemaChangeObserver { retention_period_ns: schema.retention_period_ns, }; - self.enqueue(Msg::NamespaceCreated(msg)); + self.enqueue(Event::NamespaceCreated(msg)); } /// Gossip that the provided set of `new_tables` have been added to the @@ -173,7 +173,7 @@ impl SchemaChangeObserver { partition_template: schema.partition_template.as_proto().cloned(), }; - self.enqueue(Msg::TableCreated(msg)); + self.enqueue(Event::TableCreated(msg)); } } @@ -211,12 +211,12 @@ impl SchemaChangeObserver { .collect(), }; - self.enqueue(Msg::TableUpdated(msg)); + self.enqueue(Event::TableUpdated(msg)); } } /// Serialise, pack & gossip `msg` asynchronously. - fn enqueue(&self, msg: Msg) { + fn enqueue(&self, msg: Event) { debug!(?msg, "sending schema message"); match self.tx.try_send(msg) { Ok(_) => {} @@ -228,17 +228,17 @@ impl SchemaChangeObserver { } } -/// A background task loop that pulls [`Msg`] from `rx`, serialises / packs them +/// A background task loop that pulls [`Event`] from `rx`, serialises / packs them /// into a single gossip frame, and broadcasts the result over `gossip`. -async fn actor_loop(mut rx: mpsc::Receiver, gossip: T) +async fn actor_loop(mut rx: mpsc::Receiver, gossip: T) where T: SchemaBroadcast, { - while let Some(msg) = rx.recv().await { - let frames = match msg { - v @ Msg::NamespaceCreated(_) => vec![v], - Msg::TableCreated(v) => serialise_table_create_frames(v), - Msg::TableUpdated(v) => { + while let Some(event) = rx.recv().await { + let frames = match event { + v @ Event::NamespaceCreated(_) => vec![v], + Event::TableCreated(v) => serialise_table_create_frames(v), + Event::TableUpdated(v) => { // Split the frame up into N frames, sized as big as the gossip // transport will allow. let mut frames = Vec::with_capacity(1); @@ -255,12 +255,12 @@ where continue; } - frames.into_iter().map(Msg::TableUpdated).collect() + frames.into_iter().map(Event::TableUpdated).collect() } }; for frame in frames { - let msg = GossipMessage { msg: Some(frame) }; + let msg = SchemaMessage { event: Some(frame) }; gossip.broadcast(msg.encode_to_vec()).await } } @@ -329,10 +329,10 @@ fn serialise_table_update_frames( /// Split `msg` into a [`TableCreated`] and a series of [`TableUpdated`] frames, /// each sized less than [`MAX_USER_PAYLOAD_BYTES`]. -fn serialise_table_create_frames(mut msg: TableCreated) -> Vec { +fn serialise_table_create_frames(mut msg: TableCreated) -> Vec { // If it fits, do nothing. if msg.encoded_len() <= MAX_USER_PAYLOAD_BYTES { - return vec![Msg::TableCreated(msg)]; + return vec![Event::TableCreated(msg)]; } // If not, split the message into a single create, followed by N table @@ -357,8 +357,8 @@ fn serialise_table_create_frames(mut msg: TableCreated) -> Vec { } // Return the table creation, followed by the updates containing columns. - std::iter::once(Msg::TableCreated(msg)) - .chain(updates.into_iter().map(Msg::TableUpdated)) + std::iter::once(Event::TableCreated(msg)) + .chain(updates.into_iter().map(Event::TableUpdated)) .collect() } @@ -553,7 +553,7 @@ mod tests { // well as the create message. let mut iter = frames.into_iter(); - let mut columns = assert_matches!(iter.next(), Some(Msg::TableCreated(v)) => { + let mut columns = assert_matches!(iter.next(), Some(Event::TableCreated(v)) => { // Invariant: table metadata must be correct assert_eq!(v.partition_template, msg.partition_template); @@ -569,7 +569,7 @@ mod tests { // Combine the columns from above, with any subsequent update // messages columns.extend(iter.flat_map(|v| { - assert_matches!(v, Msg::TableUpdated(v) => { + assert_matches!(v, Event::TableUpdated(v) => { // Invariant: metadata in follow-on updates must also match assert_eq!(v.table_name, TABLE_NAME); assert_eq!(v.namespace_name, NAMESPACE_NAME); @@ -643,7 +643,7 @@ mod tests { }, schema = DEFAULT_NAMESPACE, want_count = 1, - want = [Msg::NamespaceCreated(NamespaceCreated { + want = [Event::NamespaceCreated(NamespaceCreated { namespace_name, namespace_id, partition_template, @@ -687,7 +687,7 @@ mod tests { }, schema = DEFAULT_NAMESPACE, want_count = 1, - want = [Msg::NamespaceCreated(NamespaceCreated { + want = [Event::NamespaceCreated(NamespaceCreated { namespace_name, namespace_id, partition_template, @@ -695,7 +695,7 @@ mod tests { max_tables, retention_period_ns }), - Msg::TableCreated(TableCreated { table, partition_template: table_template })] => { + Event::TableCreated(TableCreated { table, partition_template: table_template })] => { // Validate the namespace create message assert_eq!(namespace_name, NAMESPACE_NAME); @@ -751,7 +751,7 @@ mod tests { }, schema = DEFAULT_NAMESPACE, want_count = 1, - want = [Msg::TableCreated(TableCreated { table, partition_template: table_template })] => { + want = [Event::TableCreated(TableCreated { table, partition_template: table_template })] => { let meta = table.as_ref().expect("must have metadata"); assert_eq!(meta.table_name, TABLE_NAME); @@ -823,8 +823,8 @@ mod tests { }, want_count = 1, want = [ - Msg::TableCreated(created), - Msg::TableUpdated(updated), + Event::TableCreated(created), + Event::TableUpdated(updated), ] => { // Table create validation let meta = created.table.as_ref().expect("must have metadata");