Merge pull request #8538 from influxdata/dom/gossip-rename

refactor(gossip): rename GossipMessage
pull/24376/head
Dom 2023-08-22 11:41:40 +01:00 committed by GitHub
commit 53a2ece7b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 69 additions and 68 deletions

View File

@ -57,7 +57,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
catalog_path.join("service.proto"),
compactor_path.join("service.proto"),
delete_path.join("service.proto"),
gossip_path.join("message.proto"),
gossip_path.join("schema.proto"),
ingester_path.join("parquet_metadata.proto"),
ingester_path.join("persist.proto"),
ingester_path.join("write.proto"),

View File

@ -4,9 +4,9 @@ option go_package = "github.com/influxdata/iox/gossip/v1";
import "influxdata/iox/partition_template/v1/template.proto";
// A message exchanged via the IOx gossip mechanism.
message GossipMessage {
oneof msg {
// A message exchanged via the IOx gossip mechanism describing schema changes.
message SchemaMessage {
oneof event {
// A new namespace was created.
NamespaceCreated namespace_created = 1;

View File

@ -5,18 +5,18 @@ use std::fmt::Debug;
use async_trait::async_trait;
use bytes::Bytes;
use generated_types::influxdata::iox::gossip::{
v1::{gossip_message::Msg, GossipMessage},
v1::{schema_message::Event, SchemaMessage},
Topic,
};
use generated_types::prost::Message;
use observability_deps::tracing::{info, warn};
use tokio::{sync::mpsc, task::JoinHandle};
/// A handler of [`Msg`] received via gossip.
/// A handler of [`Event`] received via gossip.
#[async_trait]
pub trait GossipMessageHandler: Send + Sync + Debug {
/// Process `message`.
async fn handle(&self, message: Msg);
async fn handle(&self, message: Event);
}
/// An async gossip message dispatcher.
@ -79,7 +79,7 @@ where
{
while let Some(payload) = rx.recv().await {
// Deserialise the payload into the appropriate proto type.
let msg = match GossipMessage::decode(payload).map(|v| v.msg) {
let event = match SchemaMessage::decode(payload).map(|v| v.event) {
Ok(Some(v)) => v,
Ok(None) => {
warn!("valid frame contains no message");
@ -92,7 +92,7 @@ where
};
// Pass this message off to the handler to process.
handler.handle(msg).await;
handler.handle(event).await;
}
info!("stopping gossip dispatcher");

View File

@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use generated_types::{
influxdata::iox::gossip::v1::{gossip_message::Msg, GossipMessage},
influxdata::iox::gossip::v1::{schema_message::Event, SchemaMessage},
prost::Message,
};
use parking_lot::Mutex;
@ -28,14 +28,14 @@ impl MockSchemaBroadcast {
self.payloads.lock().clone()
}
/// Return the deserialised [`Msg`].
pub fn messages(&self) -> Vec<Msg> {
/// Return the deserialised [`Event`].
pub fn messages(&self) -> Vec<Event> {
self.payloads
.lock()
.iter()
.map(|v| {
GossipMessage::decode(v.as_slice())
.map(|v| v.msg.expect("no message in payload"))
SchemaMessage::decode(v.as_slice())
.map(|v| v.event.expect("no message in payload"))
.expect("invalid gossip payload")
})
.collect()

View File

@ -7,14 +7,14 @@
//!
//! * The outgoing [`SchemaChangeObserver`]: a router-specific wrapper over the
//! underlying [`GossipHandle`]. This type translates the application calls
//! into protobuf [`Msg`], and serialises them into bytes, sending them over
//! into protobuf [`Event`], and serialises them into bytes, sending them over
//! the underlying [`gossip`] impl.
//!
//! * The incoming [`GossipMessageDispatcher`]: deserialises the incoming bytes
//! from the gossip [`Dispatcher`] into [`Msg`] and passes them off to the
//! from the gossip [`Dispatcher`] into [`Event`] and passes them off to the
//! [`NamespaceSchemaGossip`] implementation for processing.
//!
//! * The incoming [`NamespaceSchemaGossip`]: processes [`Msg`] received from
//! * The incoming [`NamespaceSchemaGossip`]: processes [`Event`] received from
//! peers, applying them to the local cache state if necessary.
//!
//! ```text
@ -53,7 +53,8 @@
//! [`GossipHandle`]: gossip::GossipHandle
//! [`Dispatcher`]: gossip::Dispatcher
//! [`SchemaChangeObserver`]: schema_change_observer::SchemaChangeObserver
//! [`Msg`]: generated_types::influxdata::iox::gossip::v1::gossip_message::Msg
//! [`Event`]:
//! generated_types::influxdata::iox::gossip::v1::schema_message::Event
//! [`GossipMessageDispatcher`]: dispatcher::GossipMessageDispatcher
//! [`NamespaceSchemaGossip`]: namespace_cache::NamespaceSchemaGossip

View File

@ -9,7 +9,7 @@ use data_types::{
TableId, TableSchema,
};
use generated_types::influxdata::iox::gossip::v1::{
gossip_message::Msg, NamespaceCreated, TableCreated, TableUpdated,
schema_message::Event, NamespaceCreated, TableCreated, TableUpdated,
};
use observability_deps::tracing::{debug, error, trace, warn};
use thiserror::Error;
@ -61,7 +61,7 @@ enum Error {
/// associated latency penalty and catalog load that comes with it.
///
/// This type implements the [`GossipMessageHandler`] which is invoked with the
/// [`Msg`] received from an opaque peer by the [`gossip`] subsystem (off of the
/// [`Event`] received from an opaque peer by the [`gossip`] subsystem (off of the
/// hot path), which when processed causes the cache contents to be updated if
/// appropriate through the usual [`NamespaceCache::get_schema()`] and
/// [`NamespaceCache::put_schema()`] abstraction.
@ -94,13 +94,13 @@ impl<C> GossipMessageHandler for Arc<NamespaceSchemaGossip<C>>
where
C: NamespaceCache<ReadError = CacheMissErr>,
{
async fn handle(&self, message: Msg) {
async fn handle(&self, message: Event) {
trace!(?message, "received schema message");
let res = match message {
Msg::NamespaceCreated(v) => self.handle_namespace_created(v).await,
Msg::TableCreated(v) => self.handle_table_created(v).await,
Msg::TableUpdated(v) => self.handle_updated_table(v).await,
Event::NamespaceCreated(v) => self.handle_namespace_created(v).await,
Event::TableCreated(v) => self.handle_table_created(v).await,
Event::TableUpdated(v) => self.handle_updated_table(v).await,
};
if let Err(error) = res {
@ -456,7 +456,7 @@ mod tests {
test_handle_gossip_message_!(
table_created_namespace_miss,
existing = None,
message = Msg::TableCreated(TableCreated {
message = Event::TableCreated(TableCreated {
table: Some(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: NAMESPACE_NAME.to_string(),
@ -474,7 +474,7 @@ mod tests {
test_handle_gossip_message_!(
table_created_no_columns,
existing = Some(DEFAULT_NAMESPACE.clone()),
message = Msg::TableCreated(TableCreated {
message = Event::TableCreated(TableCreated {
table: Some(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: NAMESPACE_NAME.to_string(),
@ -507,7 +507,7 @@ mod tests {
test_handle_gossip_message_!(
table_created,
existing = Some(DEFAULT_NAMESPACE.clone()),
message = Msg::TableCreated(TableCreated {
message = Event::TableCreated(TableCreated {
table: Some(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: NAMESPACE_NAME.to_string(),
@ -550,7 +550,7 @@ mod tests {
test_handle_gossip_message_!(
table_created_no_partition_template,
existing = Some(DEFAULT_NAMESPACE.clone()),
message = Msg::TableCreated(TableCreated {
message = Event::TableCreated(TableCreated {
table: Some(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: NAMESPACE_NAME.to_string(),
@ -597,7 +597,7 @@ mod tests {
ns
}),
// Send a gossip message adding a new column to the above
message = Msg::TableCreated(TableCreated {
message = Event::TableCreated(TableCreated {
table: Some(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: NAMESPACE_NAME.to_string(),
@ -663,7 +663,7 @@ mod tests {
ns
}),
// Send a gossip message adding a new column to the above
message = Msg::TableCreated(TableCreated {
message = Event::TableCreated(TableCreated {
table: Some(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: NAMESPACE_NAME.to_string(),
@ -711,7 +711,7 @@ mod tests {
test_handle_gossip_message_!(
table_created_missing_update,
existing = Some(DEFAULT_NAMESPACE),
message = Msg::TableCreated(TableCreated {
message = Event::TableCreated(TableCreated {
table: None, // No inner content!
partition_template: None,
}),
@ -724,7 +724,7 @@ mod tests {
test_handle_gossip_message_!(
table_created_missing_namespace_name,
existing = Some(DEFAULT_NAMESPACE),
message = Msg::TableCreated(TableCreated {
message = Event::TableCreated(TableCreated {
table: Some(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: "".to_string(), // empty is missing in proto
@ -742,7 +742,7 @@ mod tests {
test_handle_gossip_message_!(
table_updated_missing_namespace_name,
existing = Some(DEFAULT_NAMESPACE),
message = Msg::TableUpdated(TableUpdated {
message = Event::TableUpdated(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: "".to_string(), // empty is missing in proto
table_id: 42,
@ -757,7 +757,7 @@ mod tests {
test_handle_gossip_message_!(
table_updated_missing_namespace,
existing = Some(DEFAULT_NAMESPACE),
message = Msg::TableUpdated(TableUpdated {
message = Event::TableUpdated(TableUpdated {
table_name: "bananas".to_string(),
namespace_name: "another".to_string(), // not known locally
table_id: 42,
@ -772,7 +772,7 @@ mod tests {
test_handle_gossip_message_!(
table_updated_missing_table,
existing = Some(DEFAULT_NAMESPACE),
message = Msg::TableUpdated(TableUpdated {
message = Event::TableUpdated(TableUpdated {
table_name: "bananas".to_string(), // Table not known locally
namespace_name: NAMESPACE_NAME.to_string(),
table_id: 42,
@ -806,7 +806,7 @@ mod tests {
ns
}),
message = Msg::TableUpdated(TableUpdated {
message = Event::TableUpdated(TableUpdated {
table_name: "bananas".to_string(), // Table not known locally
namespace_name: NAMESPACE_NAME.to_string(),
table_id: 42,
@ -856,7 +856,7 @@ mod tests {
test_handle_gossip_message_!(
namespace_created_missing_name,
existing = None,
message = Msg::NamespaceCreated(NamespaceCreated {
message = Event::NamespaceCreated(NamespaceCreated {
namespace_name: "".to_string(), // missing in proto
namespace_id: DEFAULT_NAMESPACE.id.get(),
partition_template: Some((**PARTITION_BY_DAY_PROTO).clone()),
@ -871,7 +871,7 @@ mod tests {
test_handle_gossip_message_!(
namespace_created,
existing = None,
message = Msg::NamespaceCreated(NamespaceCreated {
message = Event::NamespaceCreated(NamespaceCreated {
namespace_name: NAMESPACE_NAME.to_string(),
namespace_id: DEFAULT_NAMESPACE.id.get(),
partition_template: None,
@ -889,7 +889,7 @@ mod tests {
test_handle_gossip_message_!(
namespace_created_specified_partition_template,
existing = None,
message = Msg::NamespaceCreated(NamespaceCreated {
message = Event::NamespaceCreated(NamespaceCreated {
namespace_name: NAMESPACE_NAME.to_string(), // missing in proto
namespace_id: DEFAULT_NAMESPACE.id.get(),
partition_template: Some((**PARTITION_BY_DAY_PROTO).clone()),
@ -908,7 +908,7 @@ mod tests {
test_handle_gossip_message_!(
namespace_created_existing,
existing = Some(DEFAULT_NAMESPACE),
message = Msg::NamespaceCreated(NamespaceCreated {
message = Event::NamespaceCreated(NamespaceCreated {
namespace_name: NAMESPACE_NAME.to_string(), // missing in proto
namespace_id: DEFAULT_NAMESPACE.id.get(),

View File

@ -6,7 +6,7 @@ use async_trait::async_trait;
use data_types::{ColumnsByName, NamespaceName, NamespaceSchema};
use generated_types::{
influxdata::iox::gossip::v1::{
gossip_message::Msg, Column, GossipMessage, NamespaceCreated, TableCreated, TableUpdated,
schema_message::Event, Column, NamespaceCreated, SchemaMessage, TableCreated, TableUpdated,
},
prost::Message,
};
@ -35,7 +35,7 @@ use super::traits::SchemaBroadcast;
/// Instead of gossiping the entire schema, the new schema elements described by
/// the [`ChangeStats`] are transmitted on a best-effort basis.
///
/// Gossip [`Msg`] are populated within the call to
/// Gossip [`Event`] are populated within the call to
/// [`NamespaceCache::put_schema()`] but packed & serialised into [`gossip`]
/// frames off-path in a background task to minimise the latency overhead.
///
@ -44,7 +44,7 @@ use super::traits::SchemaBroadcast;
/// request path, but framing, serialisation and dispatch happen asynchronously.
#[derive(Debug)]
pub struct SchemaChangeObserver<T> {
tx: mpsc::Sender<Msg>,
tx: mpsc::Sender<Event>,
task: JoinHandle<()>,
inner: T,
}
@ -144,7 +144,7 @@ impl<T> SchemaChangeObserver<T> {
retention_period_ns: schema.retention_period_ns,
};
self.enqueue(Msg::NamespaceCreated(msg));
self.enqueue(Event::NamespaceCreated(msg));
}
/// Gossip that the provided set of `new_tables` have been added to the
@ -173,7 +173,7 @@ impl<T> SchemaChangeObserver<T> {
partition_template: schema.partition_template.as_proto().cloned(),
};
self.enqueue(Msg::TableCreated(msg));
self.enqueue(Event::TableCreated(msg));
}
}
@ -211,12 +211,12 @@ impl<T> SchemaChangeObserver<T> {
.collect(),
};
self.enqueue(Msg::TableUpdated(msg));
self.enqueue(Event::TableUpdated(msg));
}
}
/// Serialise, pack & gossip `msg` asynchronously.
fn enqueue(&self, msg: Msg) {
fn enqueue(&self, msg: Event) {
debug!(?msg, "sending schema message");
match self.tx.try_send(msg) {
Ok(_) => {}
@ -228,17 +228,17 @@ impl<T> SchemaChangeObserver<T> {
}
}
/// A background task loop that pulls [`Msg`] from `rx`, serialises / packs them
/// A background task loop that pulls [`Event`] from `rx`, serialises / packs them
/// into a single gossip frame, and broadcasts the result over `gossip`.
async fn actor_loop<T>(mut rx: mpsc::Receiver<Msg>, gossip: T)
async fn actor_loop<T>(mut rx: mpsc::Receiver<Event>, gossip: T)
where
T: SchemaBroadcast,
{
while let Some(msg) = rx.recv().await {
let frames = match msg {
v @ Msg::NamespaceCreated(_) => vec![v],
Msg::TableCreated(v) => serialise_table_create_frames(v),
Msg::TableUpdated(v) => {
while let Some(event) = rx.recv().await {
let frames = match event {
v @ Event::NamespaceCreated(_) => vec![v],
Event::TableCreated(v) => serialise_table_create_frames(v),
Event::TableUpdated(v) => {
// Split the frame up into N frames, sized as big as the gossip
// transport will allow.
let mut frames = Vec::with_capacity(1);
@ -255,12 +255,12 @@ where
continue;
}
frames.into_iter().map(Msg::TableUpdated).collect()
frames.into_iter().map(Event::TableUpdated).collect()
}
};
for frame in frames {
let msg = GossipMessage { msg: Some(frame) };
let msg = SchemaMessage { event: Some(frame) };
gossip.broadcast(msg.encode_to_vec()).await
}
}
@ -329,10 +329,10 @@ fn serialise_table_update_frames(
/// Split `msg` into a [`TableCreated`] and a series of [`TableUpdated`] frames,
/// each sized less than [`MAX_USER_PAYLOAD_BYTES`].
fn serialise_table_create_frames(mut msg: TableCreated) -> Vec<Msg> {
fn serialise_table_create_frames(mut msg: TableCreated) -> Vec<Event> {
// If it fits, do nothing.
if msg.encoded_len() <= MAX_USER_PAYLOAD_BYTES {
return vec![Msg::TableCreated(msg)];
return vec![Event::TableCreated(msg)];
}
// If not, split the message into a single create, followed by N table
@ -357,8 +357,8 @@ fn serialise_table_create_frames(mut msg: TableCreated) -> Vec<Msg> {
}
// Return the table creation, followed by the updates containing columns.
std::iter::once(Msg::TableCreated(msg))
.chain(updates.into_iter().map(Msg::TableUpdated))
std::iter::once(Event::TableCreated(msg))
.chain(updates.into_iter().map(Event::TableUpdated))
.collect()
}
@ -553,7 +553,7 @@ mod tests {
// well as the create message.
let mut iter = frames.into_iter();
let mut columns = assert_matches!(iter.next(), Some(Msg::TableCreated(v)) => {
let mut columns = assert_matches!(iter.next(), Some(Event::TableCreated(v)) => {
// Invariant: table metadata must be correct
assert_eq!(v.partition_template, msg.partition_template);
@ -569,7 +569,7 @@ mod tests {
// Combine the columns from above, with any subsequent update
// messages
columns.extend(iter.flat_map(|v| {
assert_matches!(v, Msg::TableUpdated(v) => {
assert_matches!(v, Event::TableUpdated(v) => {
// Invariant: metadata in follow-on updates must also match
assert_eq!(v.table_name, TABLE_NAME);
assert_eq!(v.namespace_name, NAMESPACE_NAME);
@ -643,7 +643,7 @@ mod tests {
},
schema = DEFAULT_NAMESPACE,
want_count = 1,
want = [Msg::NamespaceCreated(NamespaceCreated {
want = [Event::NamespaceCreated(NamespaceCreated {
namespace_name,
namespace_id,
partition_template,
@ -687,7 +687,7 @@ mod tests {
},
schema = DEFAULT_NAMESPACE,
want_count = 1,
want = [Msg::NamespaceCreated(NamespaceCreated {
want = [Event::NamespaceCreated(NamespaceCreated {
namespace_name,
namespace_id,
partition_template,
@ -695,7 +695,7 @@ mod tests {
max_tables,
retention_period_ns
}),
Msg::TableCreated(TableCreated { table, partition_template: table_template })] => {
Event::TableCreated(TableCreated { table, partition_template: table_template })] => {
// Validate the namespace create message
assert_eq!(namespace_name, NAMESPACE_NAME);
@ -751,7 +751,7 @@ mod tests {
},
schema = DEFAULT_NAMESPACE,
want_count = 1,
want = [Msg::TableCreated(TableCreated { table, partition_template: table_template })] => {
want = [Event::TableCreated(TableCreated { table, partition_template: table_template })] => {
let meta = table.as_ref().expect("must have metadata");
assert_eq!(meta.table_name, TABLE_NAME);
@ -823,8 +823,8 @@ mod tests {
},
want_count = 1,
want = [
Msg::TableCreated(created),
Msg::TableUpdated(updated),
Event::TableCreated(created),
Event::TableUpdated(updated),
] => {
// Table create validation
let meta = created.table.as_ref().expect("must have metadata");