From 863a6d0b4a86c78395bb39a7b64cf5579f6f2b7e Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Mon, 17 Mar 2025 20:20:07 -0400 Subject: [PATCH] feat: ack catalog update broadcast (#26118) This creates a CatalogUpdateMessage type that is used to send CatalogUpdates; this type performs the send on the oneshot Sender so that the consumer of the message does not need to do so. Subscribers to the catalog get a CatalogSubscription, which uses the CatalogUpdateMessage type to ACK the message broadcast from the catalog. This means that catalog message broadcast can fail, but this commit does not provide any means of rolling back a catalog update. A test was added to check that it works. --- influxdb3/src/commands/serve.rs | 5 +- influxdb3_cache/src/distinct_cache/mod.rs | 8 +- .../src/distinct_cache/provider.rs | 93 +++++----- influxdb3_cache/src/last_cache/mod.rs | 9 +- influxdb3_cache/src/last_cache/provider.rs | 87 +++++----- influxdb3_catalog/src/catalog.rs | 32 ++-- influxdb3_catalog/src/catalog/update.rs | 21 ++- influxdb3_catalog/src/channel.rs | 161 ++++++++++++++++++ influxdb3_catalog/src/error.rs | 5 +- influxdb3_catalog/src/lib.rs | 1 + influxdb3_processing_engine/src/lib.rs | 153 ++++++++--------- influxdb3_server/src/lib.rs | 8 +- influxdb3_server/src/query_executor/mod.rs | 5 +- influxdb3_write/src/write_buffer/mod.rs | 26 ++- .../src/write_buffer/queryable_buffer.rs | 5 +- 15 files changed, 387 insertions(+), 232 deletions(-) create mode 100644 influxdb3_catalog/src/channel.rs diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 3ca8138221..d39f4fc23a 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -562,6 +562,7 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&catalog) as _, config.last_cache_eviction_interval.into(), ) + .await .map_err(Error::InitializeLastCache)?; let distinct_cache = DistinctCacheProvider::new_from_catalog_with_background_eviction( @@ -569,6 +570,7 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&catalog), config.distinct_cache_eviction_interval.into(), ) + .await .map_err(Error::InitializeDistinctCache)?; let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs { @@ -637,7 +639,8 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&query_executor) as _, Arc::clone(&time_provider) as _, sys_events_store, - ); + ) + .await; let builder = ServerBuilder::new(common_state) .max_request_size(config.max_http_request_size) diff --git a/influxdb3_cache/src/distinct_cache/mod.rs b/influxdb3_cache/src/distinct_cache/mod.rs index 5ceaeb9dd3..a0ffa21ba7 100644 --- a/influxdb3_cache/src/distinct_cache/mod.rs +++ b/influxdb3_cache/src/distinct_cache/mod.rs @@ -419,7 +419,9 @@ mod tests { debug!(catlog = ?writer.catalog(), "writer catalog"); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let distinct_provider = - DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); + DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()) + .await + .unwrap(); writer .catalog() .create_distinct_cache( @@ -865,7 +867,9 @@ mod tests { ).await; let distinct_provider = - DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); + DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()) + .await + .unwrap(); writer .catalog() .create_distinct_cache( diff --git a/influxdb3_cache/src/distinct_cache/provider.rs 
b/influxdb3_cache/src/distinct_cache/provider.rs index 1d59564797..884481f364 100644 --- a/influxdb3_cache/src/distinct_cache/provider.rs +++ b/influxdb3_cache/src/distinct_cache/provider.rs @@ -2,7 +2,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use arrow::datatypes::SchemaRef; use influxdb3_catalog::{ - catalog::{Catalog, CatalogBroadcastReceiver}, + catalog::Catalog, + channel::CatalogUpdateReceiver, log::{ CatalogBatch, DatabaseCatalogOp, DeleteDistinctCacheLog, DistinctCacheDefinition, SoftDeleteTableLog, @@ -11,9 +12,7 @@ use influxdb3_catalog::{ use influxdb3_id::{DbId, DistinctCacheId, TableId}; use influxdb3_wal::{WalContents, WalOp}; use iox_time::TimeProvider; -use observability_deps::tracing::warn; use parking_lot::RwLock; -use tokio::sync::broadcast::error::RecvError; use super::{ CacheError, @@ -46,7 +45,7 @@ pub struct DistinctCacheProvider { impl DistinctCacheProvider { /// Initialize a [`DistinctCacheProvider`] from a [`Catalog`], populating the provider's /// `cache_map` from the definitions in the catalog. - pub fn new_from_catalog( + pub async fn new_from_catalog( time_provider: Arc, catalog: Arc, ) -> Result, ProviderError> { @@ -72,7 +71,10 @@ impl DistinctCacheProvider { } } - background_catalog_update(Arc::clone(&provider), catalog.subscribe_to_updates()); + background_catalog_update( + Arc::clone(&provider), + catalog.subscribe_to_updates("distinct_cache").await, + ); Ok(provider) } @@ -81,12 +83,12 @@ impl DistinctCacheProvider { /// `cache_map` from the definitions in the catalog. This starts a background process that /// runs on the provided `eviction_interval` to perform eviction on all of the caches /// in the created [`DistinctCacheProvider`]'s `cache_map`. - pub fn new_from_catalog_with_background_eviction( + pub async fn new_from_catalog_with_background_eviction( time_provider: Arc, catalog: Arc, eviction_interval: Duration, ) -> Result, ProviderError> { - let provider = Self::new_from_catalog(time_provider, catalog)?; + let provider = Self::new_from_catalog(time_provider, catalog).await?; background_eviction_process(Arc::clone(&provider), eviction_interval); @@ -267,58 +269,41 @@ impl DistinctCacheProvider { fn background_catalog_update( provider: Arc, - mut subscription: CatalogBroadcastReceiver, + mut subscription: CatalogUpdateReceiver, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - loop { - match subscription.recv().await { - Ok(catalog_update) => { - for batch in catalog_update - .batches() - .filter_map(CatalogBatch::as_database) - { - for op in batch.ops.iter() { - match op { - DatabaseCatalogOp::SoftDeleteDatabase(_) => { - provider.delete_caches_for_db(&batch.database_id); - } - DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog { - database_id, - table_id, - .. - }) => { - provider.delete_caches_for_db_and_table(database_id, table_id); - } - DatabaseCatalogOp::CreateDistinctCache(log) => { - provider.create_from_catalog(batch.database_id, log); - } - DatabaseCatalogOp::DeleteDistinctCache( - DeleteDistinctCacheLog { - table_id, cache_id, .. - }, - ) => { - // This only errors when the cache isn't there, so we ignore the - // error... 
- let _ = provider.delete_cache( - &batch.database_id, - table_id, - cache_id, - ); - } - _ => (), - } + while let Some(catalog_update) = subscription.recv().await { + for batch in catalog_update + .batches() + .filter_map(CatalogBatch::as_database) + { + for op in batch.ops.iter() { + match op { + DatabaseCatalogOp::SoftDeleteDatabase(_) => { + provider.delete_caches_for_db(&batch.database_id); } + DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog { + database_id, + table_id, + .. + }) => { + provider.delete_caches_for_db_and_table(database_id, table_id); + } + DatabaseCatalogOp::CreateDistinctCache(log) => { + provider.create_from_catalog(batch.database_id, log); + } + DatabaseCatalogOp::DeleteDistinctCache(DeleteDistinctCacheLog { + table_id, + cache_id, + .. + }) => { + // This only errors when the cache isn't there, so we ignore the + // error... + let _ = provider.delete_cache(&batch.database_id, table_id, cache_id); + } + _ => (), } } - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(num_messages_skipped)) => { - // TODO: in this case, we would need to re-initialize the distinct cache provider - // from the catalog, if possible. - warn!( - num_messages_skipped, - "distinct cache provider catalog subscription is lagging" - ); - } } } }) diff --git a/influxdb3_cache/src/last_cache/mod.rs b/influxdb3_cache/src/last_cache/mod.rs index 3f17d7c36f..2b01f09d7b 100644 --- a/influxdb3_cache/src/last_cache/mod.rs +++ b/influxdb3_cache/src/last_cache/mod.rs @@ -1243,6 +1243,7 @@ mod tests { // This is the function we are testing, which initializes the LastCacheProvider from the catalog: let provider = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await .expect("create last cache provider from catalog"); // There should be a total of 3 caches: assert_eq!(3, provider.size()); @@ -1271,7 +1272,9 @@ mod tests { .await; // create a last cache provider so we can use it to create our UDTF provider: - let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap(); + let provider = LastCacheProvider::new_from_catalog(writer.catalog()) + .await + .unwrap(); writer .catalog() .create_last_cache( @@ -1526,7 +1529,9 @@ mod tests { let _ = writer .write_lp_to_write_batch("cpu,region=us-east,host=a usage=99,temp=88", 0) .await; - let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap(); + let provider = LastCacheProvider::new_from_catalog(writer.catalog()) + .await + .unwrap(); writer .catalog() .create_last_cache( diff --git a/influxdb3_cache/src/last_cache/provider.rs b/influxdb3_cache/src/last_cache/provider.rs index 32a3c3929e..b1fe0eba37 100644 --- a/influxdb3_cache/src/last_cache/provider.rs +++ b/influxdb3_cache/src/last_cache/provider.rs @@ -3,7 +3,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError}; use influxdb3_catalog::{ - catalog::{Catalog, CatalogBroadcastReceiver}, + catalog::Catalog, + channel::CatalogUpdateReceiver, log::{ CatalogBatch, DatabaseCatalogOp, DeleteLastCacheLog, LastCacheDefinition, LastCacheValueColumnsDef, SoftDeleteTableLog, @@ -11,9 +12,8 @@ use influxdb3_catalog::{ }; use influxdb3_id::{DbId, LastCacheId, TableId}; use influxdb3_wal::{WalContents, WalOp}; -use observability_deps::tracing::{debug, warn}; +use observability_deps::tracing::debug; use parking_lot::RwLock; -use tokio::sync::broadcast::error::RecvError; use super::{ CreateLastCacheArgs, Error, @@ -37,7 +37,7 @@ impl std::fmt::Debug for 
LastCacheProvider { impl LastCacheProvider { /// Initialize a [`LastCacheProvider`] from a [`Catalog`] - pub fn new_from_catalog(catalog: Arc) -> Result, Error> { + pub async fn new_from_catalog(catalog: Arc) -> Result, Error> { let provider = Arc::new(LastCacheProvider { catalog: Arc::clone(&catalog), cache_map: Default::default(), @@ -73,18 +73,21 @@ impl LastCacheProvider { } } - background_catalog_update(Arc::clone(&provider), catalog.subscribe_to_updates()); + background_catalog_update( + Arc::clone(&provider), + catalog.subscribe_to_updates("last_cache").await, + ); Ok(provider) } /// Initialize a [`LastCacheProvider`] from a [`Catalog`] and run a background process to /// evict expired entries from the cache - pub fn new_from_catalog_with_background_eviction( + pub async fn new_from_catalog_with_background_eviction( catalog: Arc, eviction_interval: Duration, ) -> Result, Error> { - let provider = Self::new_from_catalog(catalog)?; + let provider = Self::new_from_catalog(catalog).await?; background_eviction_process(Arc::clone(&provider), eviction_interval); @@ -320,53 +323,39 @@ impl LastCacheProvider { fn background_catalog_update( provider: Arc, - mut subscription: CatalogBroadcastReceiver, + mut subscription: CatalogUpdateReceiver, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - loop { - match subscription.recv().await { - Ok(catalog_update) => { - for batch in catalog_update - .batches() - .filter_map(CatalogBatch::as_database) - { - for op in batch.ops.iter() { - match op { - DatabaseCatalogOp::SoftDeleteDatabase(_) => { - provider.delete_caches_for_db(&batch.database_id); - } - DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog { - table_id, - .. - }) => { - provider.delete_caches_for_table(&batch.database_id, table_id); - } - DatabaseCatalogOp::CreateLastCache(log) => { - provider.create_cache_from_definition(batch.database_id, log); - } - DatabaseCatalogOp::DeleteLastCache(DeleteLastCacheLog { - table_id, - id, - .. - }) => { - // This only errors when the cache isn't there, so we ignore the - // error... - let _ = provider.delete_cache(&batch.database_id, table_id, id); - } - _ => (), - } + while let Some(catalog_update) = subscription.recv().await { + for batch in catalog_update + .batches() + .filter_map(CatalogBatch::as_database) + { + for op in batch.ops.iter() { + match op { + DatabaseCatalogOp::SoftDeleteDatabase(_) => { + provider.delete_caches_for_db(&batch.database_id); } + DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog { + table_id, .. + }) => { + provider.delete_caches_for_table(&batch.database_id, table_id); + } + DatabaseCatalogOp::CreateLastCache(log) => { + provider.create_cache_from_definition(batch.database_id, log); + } + DatabaseCatalogOp::DeleteLastCache(DeleteLastCacheLog { + table_id, + id, + .. + }) => { + // This only errors when the cache isn't there, so we ignore the + // error... + let _ = provider.delete_cache(&batch.database_id, table_id, id); + } + _ => (), } } - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(num_messages_skipped)) => { - // TODO: in this case, we would need to re-initialize the last cache provider - // from the catalog, if possible. 
- warn!( - num_messages_skipped, - "last cache provider catalog subscription is lagging" - ); - } } } }) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 2a7ce9e0d4..fa0567a01b 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -16,14 +16,14 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::hash::Hash; use std::sync::Arc; -use tokio::sync::{Mutex, MutexGuard, broadcast}; -use update::CatalogUpdate; +use tokio::sync::{Mutex, MutexGuard}; use uuid::Uuid; mod update; pub use schema::{InfluxColumnType, InfluxFieldType}; -pub use update::{DatabaseCatalogTransaction, Prompt}; +pub use update::{CatalogUpdate, DatabaseCatalogTransaction, Prompt}; +use crate::channel::{CatalogSubscriptions, CatalogUpdateReceiver}; use crate::log::{ CreateDatabaseLog, DatabaseBatch, DatabaseCatalogOp, NodeBatch, NodeCatalogOp, NodeMode, RegisterNodeLog, @@ -80,20 +80,8 @@ static CATALOG_WRITE_PERMIT: Mutex = /// time that the permit was acquired. pub type CatalogWritePermit = MutexGuard<'static, CatalogSequenceNumber>; -const CATALOG_BROADCAST_CHANNEL_CAPACITY: usize = 10_000; - -pub type CatalogBroadcastSender = broadcast::Sender>; -pub type CatalogBroadcastReceiver = broadcast::Receiver>; - pub struct Catalog { - /// Channel for broadcasting updates to other components that must handle `CatalogOp`s - /// - /// # Implementation Note - /// - /// This currently uses a `tokio::broadcast` channel, which can lead to dropped messages if - /// the channel fills up. If that is a concern a more durable form of broadcasting single - /// producer to multiple consumer messages would need to be implemented. - channel: CatalogBroadcastSender, + subscriptions: Arc>, time_provider: Arc, /// Connection to the object store for managing persistence and updates to the catalog store: ObjectStoreCatalog, @@ -128,14 +116,14 @@ impl Catalog { let node_id = catalog_id.into(); let store = ObjectStoreCatalog::new(Arc::clone(&node_id), CATALOG_CHECKPOINT_INTERVAL, store); - let (channel, _) = broadcast::channel(CATALOG_BROADCAST_CHANNEL_CAPACITY); + let subscriptions = Default::default(); store .load_or_create_catalog() .await .map_err(Into::into) .map(RwLock::new) .map(|inner| Self { - channel, + subscriptions, time_provider, store, inner, @@ -150,8 +138,8 @@ impl Catalog { self.inner.read().catalog_uuid } - pub fn subscribe_to_updates(&self) -> broadcast::Receiver> { - self.channel.subscribe() + pub async fn subscribe_to_updates(&self, name: &'static str) -> CatalogUpdateReceiver { + self.subscriptions.write().await.subscribe(name) } pub fn object_store(&self) -> Arc { @@ -363,10 +351,10 @@ impl Catalog { ) -> Result { let store = ObjectStoreCatalog::new(catalog_id, checkpoint_interval, store); let inner = store.load_or_create_catalog().await?; - let (channel, _) = broadcast::channel(CATALOG_BROADCAST_CHANNEL_CAPACITY); + let subscriptions = Default::default(); Ok(Self { - channel, + subscriptions, time_provider, store, inner: RwLock::new(inner), diff --git a/influxdb3_catalog/src/catalog/update.rs b/influxdb3_catalog/src/catalog/update.rs index a41ddedd7e..e95e8bd4dd 100644 --- a/influxdb3_catalog/src/catalog/update.rs +++ b/influxdb3_catalog/src/catalog/update.rs @@ -83,7 +83,7 @@ impl Catalog { UpdatePrompt::Applied => { self.apply_ordered_catalog_batch(&ordered_batch, &permit); self.background_checkpoint(&ordered_batch); - self.broadcast_update(ordered_batch.into_batch()); + self.broadcast_update(ordered_batch.into_batch()).await?; 
Ok(Prompt::Success(self.sequence_number())) } } @@ -676,7 +676,8 @@ impl Catalog { UpdatePrompt::Applied => { self.apply_ordered_catalog_batch(&ordered_batch, &permit); self.background_checkpoint(&ordered_batch); - self.broadcast_update(ordered_batch.clone().into_batch()); + self.broadcast_update(ordered_batch.clone().into_batch()) + .await?; return Ok(Some(ordered_batch)); } } @@ -747,7 +748,7 @@ impl Catalog { .inspect_err(|error| debug!(?error, "failed to fetch next catalog sequence"))? { let batch = self.apply_ordered_catalog_batch(&ordered_catalog_batch, permit); - self.broadcast_update(batch); + self.broadcast_update(batch).await?; sequence_number = sequence_number.next(); if update_until.is_some_and(|max_sequence| sequence_number > max_sequence) { break; @@ -757,11 +758,13 @@ impl Catalog { } /// Broadcast a `CatalogUpdate` to all subscribed components in the system. - fn broadcast_update(&self, update: impl Into) { - if let Err(send_error) = self.channel.send(Arc::new(update.into())) { - info!("nothing listening for catalog updates"); - trace!(?send_error, "nothing listening for catalog updates"); - } + async fn broadcast_update(&self, update: impl Into) -> Result<()> { + self.subscriptions + .write() + .await + .send_update(Arc::new(update.into())) + .await?; + Ok(()) } /// Persist the catalog as a checkpoint in the background if we are at the _n_th sequence @@ -807,7 +810,7 @@ pub struct CatalogUpdate { } impl CatalogUpdate { - pub fn batches(&self) -> impl Iterator { + pub(crate) fn batches(&self) -> impl Iterator { self.batches.iter() } } diff --git a/influxdb3_catalog/src/channel.rs b/influxdb3_catalog/src/channel.rs new file mode 100644 index 0000000000..2287cf0c47 --- /dev/null +++ b/influxdb3_catalog/src/channel.rs @@ -0,0 +1,161 @@ +use std::sync::Arc; + +use anyhow::Context; +use futures::future::try_join_all; +use observability_deps::tracing::warn; +use tokio::sync::{mpsc, oneshot}; + +use crate::{catalog::CatalogUpdate, log::CatalogBatch}; + +#[derive(Debug, thiserror::Error)] +#[error("error in catalog update subscribers: {0:?}")] +pub struct SubscriptionError(#[from] anyhow::Error); + +const CATALOG_SUBSCRIPTION_BUFFER_SIZE: usize = 10_000; + +type CatalogUpdateSender = mpsc::Sender; +pub type CatalogUpdateReceiver = mpsc::Receiver; + +/// A message containing a set of `CatalogUpdate`s that can be handled by subscribers to the +/// `Catalog`. +/// +/// The response is sent in the `Drop` implementation of this type, so that the consumer of these +/// messages does not need to worry about sending the response back to the catalog on broadcast. 
+pub struct CatalogUpdateMessage { + update: Arc, + tx: Option>, +} + +impl CatalogUpdateMessage { + /// Create a new `CatalogUpdateMessage` + fn new(update: Arc, tx: oneshot::Sender<()>) -> Self { + Self { + update, + tx: Some(tx), + } + } + + /// Iterate over the `CatalogBatch`s in the update + pub fn batches(&self) -> impl Iterator { + self.update.batches() + } +} + +impl Drop for CatalogUpdateMessage { + /// Send the response to the catalog via the oneshot sender + fn drop(&mut self) { + if let Some(tx) = self.tx.take() { + let _ = tx + .send(()) + .inspect_err(|error| warn!(?error, "unable to send ACK for catalog update")); + } + } +} + +impl std::fmt::Debug for CatalogUpdateMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CatalogUpdateMessage") + .field("update", &self.update) + .finish() + } +} + +#[derive(Debug, Default)] +pub(crate) struct CatalogSubscriptions { + subscriptions: hashbrown::HashMap, CatalogUpdateSender>, +} + +impl CatalogSubscriptions { + /// Subscribe to the catalog for updates. + /// + /// This allows components in the system to listen for updates made to the catalog + /// and handle/apply them as needed. A `subscription_name` is provided that identifies the + /// component subscribing. + /// + /// # Panics + /// + /// If the provided `subscription_name` has already been used, this will panic. + pub(crate) fn subscribe(&mut self, subscription_name: &'static str) -> CatalogUpdateReceiver { + let (tx, rx) = mpsc::channel(CATALOG_SUBSCRIPTION_BUFFER_SIZE); + assert!( + self.subscriptions + .insert(Arc::from(subscription_name), tx) + .is_none(), + "attempted to subscribe to catalog with same component name more than once, \ + name: {subscription_name}" + ); + rx + } + + pub(crate) async fn send_update( + &self, + update: Arc, + ) -> Result<(), SubscriptionError> { + let mut responses = vec![]; + for (name, sub) in self + .subscriptions + .iter() + .map(|(n, s)| (Arc::clone(n), s.clone())) + { + let update_cloned = Arc::clone(&update); + responses.push(tokio::spawn(async move { + let (tx, rx) = oneshot::channel(); + sub.send(CatalogUpdateMessage::new(update_cloned, tx)) + .await + .with_context(|| format!("failed to send update to {name}"))?; + rx.await + .with_context(|| format!("failed to receive response from {name}"))?; + Ok(()) + })); + } + + try_join_all(responses) + .await + .context("failed to collect responses from catalog subscribers")? + .into_iter() + .collect::, anyhow::Error>>()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use observability_deps::tracing::debug; + + use crate::{ + catalog::Catalog, + log::{CatalogBatch, FieldDataType}, + }; + + #[test_log::test(tokio::test)] + async fn test_catalog_update_sub() { + let catalog = Catalog::new_in_memory("cats").await.unwrap(); + let mut sub = catalog.subscribe_to_updates("test_sub").await; + let handle = tokio::spawn(async move { + let mut n_updates = 0; + while let Some(update) = sub.recv().await { + debug!(?update, "got an update"); + for b in update.batches() { + match b { + CatalogBatch::Node(_) => (), + CatalogBatch::Database(_) => (), + } + } + n_updates += 1; + } + n_updates + }); + + catalog.create_database("foo").await.unwrap(); + catalog + .create_table("foo", "bar", &["tag"], &[("field", FieldDataType::String)]) + .await + .unwrap(); + + // drop the catalog so the channel closes and the handle above doesn't hang... 
+ drop(catalog); + + let n_updates = handle.await.unwrap(); + assert_eq!(2, n_updates); + } +} diff --git a/influxdb3_catalog/src/error.rs b/influxdb3_catalog/src/error.rs index 81a036e039..9ee075770c 100644 --- a/influxdb3_catalog/src/error.rs +++ b/influxdb3_catalog/src/error.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::anyhow; use schema::InfluxColumnType; -use crate::{catalog::Catalog, object_store::ObjectStoreCatalogError}; +use crate::{catalog::Catalog, channel::SubscriptionError, object_store::ObjectStoreCatalogError}; #[derive(Debug, thiserror::Error)] pub enum CatalogError { @@ -92,6 +92,9 @@ pub enum CatalogError { existing: String, }, + #[error("catalog subscription error: {0}")] + Subscription(#[from] SubscriptionError), + #[error(transparent)] Other(#[from] anyhow::Error), diff --git a/influxdb3_catalog/src/lib.rs b/influxdb3_catalog/src/lib.rs index c0e1875737..fd914e82c1 100644 --- a/influxdb3_catalog/src/lib.rs +++ b/influxdb3_catalog/src/lib.rs @@ -1,4 +1,5 @@ pub mod catalog; +pub mod channel; pub mod error; pub mod id; pub mod log; diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 9f1e32baa6..81d36ea884 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -8,7 +8,8 @@ use bytes::Bytes; use hashbrown::HashMap; use hyper::{Body, Response}; use influxdb3_catalog::CatalogError; -use influxdb3_catalog::catalog::{Catalog, CatalogBroadcastReceiver}; +use influxdb3_catalog::catalog::Catalog; +use influxdb3_catalog::channel::CatalogUpdateReceiver; use influxdb3_catalog::log::{ CatalogBatch, DatabaseCatalogOp, DeleteTriggerLog, PluginType, TriggerDefinition, TriggerIdentifier, TriggerSpecificationDefinition, ValidPluginFilename, @@ -29,7 +30,6 @@ use std::any::Any; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, SystemTime}; -use tokio::sync::broadcast::error::RecvError; use tokio::sync::oneshot::Receiver; use tokio::sync::{RwLock, mpsc, oneshot}; @@ -205,7 +205,7 @@ impl PluginChannels { } impl ProcessingEngineManagerImpl { - pub fn new( + pub async fn new( environment: ProcessingEngineEnvironmentManager, catalog: Arc, node_id: impl Into>, @@ -225,7 +225,7 @@ impl ProcessingEngineManagerImpl { } } - let catalog_sub = catalog.subscribe_to_updates(); + let catalog_sub = catalog.subscribe_to_updates("processing_engine").await; let cache = Arc::new(Mutex::new(CacheStore::new( Arc::clone(&time_provider), @@ -668,89 +668,74 @@ pub(crate) struct Request { fn background_catalog_update( processing_engine_manager: Arc, - mut subscription: CatalogBroadcastReceiver, + mut subscription: CatalogUpdateReceiver, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - loop { - match subscription.recv().await { - Ok(catalog_update) => { - for batch in catalog_update - .batches() - .filter_map(CatalogBatch::as_database) - { - for op in batch.ops.iter() { - let processing_engine_manager = Arc::clone(&processing_engine_manager); - match op { - DatabaseCatalogOp::CreateTrigger(TriggerDefinition { - trigger_name, - database_name, - disabled, - .. 
- }) => { - if !disabled { - if let Err(error) = processing_engine_manager - .run_trigger(database_name, trigger_name) - .await - { - error!(?error, "failed to run the created trigger"); - } - } + while let Some(catalog_update) = subscription.recv().await { + for batch in catalog_update + .batches() + .filter_map(CatalogBatch::as_database) + { + for op in batch.ops.iter() { + let processing_engine_manager = Arc::clone(&processing_engine_manager); + match op { + DatabaseCatalogOp::CreateTrigger(TriggerDefinition { + trigger_name, + database_name, + disabled, + .. + }) => { + if !disabled { + if let Err(error) = processing_engine_manager + .run_trigger(database_name, trigger_name) + .await + { + error!(?error, "failed to run the created trigger"); } - DatabaseCatalogOp::EnableTrigger(TriggerIdentifier { - db_name, - trigger_name, - .. - }) => { - if let Err(error) = processing_engine_manager - .run_trigger(db_name, trigger_name) - .await - { - error!(?error, "failed to run the trigger"); - } - } - DatabaseCatalogOp::DeleteTrigger(DeleteTriggerLog { - trigger_name, - force: true, - .. - }) => { - if let Err(error) = processing_engine_manager - .stop_trigger(&batch.database_name, trigger_name) - .await - { - error!(?error, "failed to disable the trigger"); - } - } - DatabaseCatalogOp::DisableTrigger(TriggerIdentifier { - db_name, - trigger_name, - .. - }) => { - if let Err(error) = processing_engine_manager - .stop_trigger(db_name, trigger_name) - .await - { - error!(?error, "failed to disable the trigger"); - } - } - // NOTE(trevor/catalog-refactor): it is not clear that any other operation needs to be - // handled, based on the existing code, but we could potentially - // handle database deletion, trigger creation/deletion/enable here - _ => (), } } + DatabaseCatalogOp::EnableTrigger(TriggerIdentifier { + db_name, + trigger_name, + .. + }) => { + if let Err(error) = processing_engine_manager + .run_trigger(db_name, trigger_name) + .await + { + error!(?error, "failed to run the trigger"); + } + } + DatabaseCatalogOp::DeleteTrigger(DeleteTriggerLog { + trigger_name, + force: true, + .. + }) => { + if let Err(error) = processing_engine_manager + .stop_trigger(&batch.database_name, trigger_name) + .await + { + error!(?error, "failed to disable the trigger"); + } + } + DatabaseCatalogOp::DisableTrigger(TriggerIdentifier { + db_name, + trigger_name, + .. + }) => { + if let Err(error) = processing_engine_manager + .stop_trigger(db_name, trigger_name) + .await + { + error!(?error, "failed to disable the trigger"); + } + } + // NOTE(trevor/catalog-refactor): it is not clear that any other operation needs to be + // handled, based on the existing code, but we could potentially + // handle database deletion, trigger creation/deletion/enable here + _ => (), } } - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(num_messages_skipped)) => { - // NOTE(trevor/catalog-refactor): in this case, we would need to re-initialize the proc eng manager - // from the catalog, if possible; but, it may be more desireable to not have this - // situation be possible at all.. 
The use of a broadcast channel should only - // be temporary, so this particular error variant should go away in future - warn!( - num_messages_skipped, - "processing engine manager catalog subscription is lagging" - ); - } } } }) @@ -999,11 +984,14 @@ mod tests { .await .unwrap(), ); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let wbuf = WriteBufferImpl::new(WriteBufferImplArgs { persister, @@ -1049,7 +1037,8 @@ def process_writes(influxdb3_local, table_batches, args=None): qe, time_provider, sys_event_store, - ), + ) + .await, file, ) } diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index a51c7202ef..1cc477fc3c 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -786,11 +786,14 @@ mod tests { influxdb3_write::write_buffer::WriteBufferImplArgs { persister: Arc::clone(&persister), catalog: Arc::clone(&catalog), - last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)) + .await + .unwrap(), distinct_cache: DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider) as _, Arc::clone(&catalog), ) + .await .unwrap(), time_provider: Arc::clone(&time_provider) as _, executor: Arc::clone(&exec), @@ -847,7 +850,8 @@ mod tests { Arc::clone(&query_executor) as _, Arc::clone(&time_provider) as _, sys_events_store, - ); + ) + .await; let server = ServerBuilder::new(common_state) .write_buffer(Arc::clone(&write_buffer)) diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index cef3738109..89420a8bb7 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -825,11 +825,14 @@ mod tests { let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs { persister, catalog: Arc::clone(&catalog), - last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)) + .await + .unwrap(), distinct_cache: DistinctCacheProvider::new_from_catalog( Arc::::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(), time_provider: Arc::::clone(&time_provider), executor: Arc::clone(&exec), diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 92a190ef27..8e6093f874 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -740,11 +740,14 @@ mod tests { "test_host", Arc::clone(&time_provider), )); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&persister), @@ -838,11 +841,14 @@ mod tests { .await .unwrap(), ); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = 
DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister, @@ -925,12 +931,14 @@ mod tests { .await .unwrap(), ); - let last_cache = - LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&wbuf.persister), @@ -1180,11 +1188,14 @@ mod tests { .await .unwrap(), ); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&write_buffer.persister), @@ -3055,11 +3066,14 @@ mod tests { .await .unwrap(), ); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let wbuf = WriteBufferImpl::new(WriteBufferImplArgs { persister, diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index e0d7916c8e..18c6b2f2f8 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -667,11 +667,14 @@ mod tests { executor: Arc::clone(&exec), catalog: Arc::clone(&catalog), persister: Arc::clone(&persister), - last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)) + .await + .unwrap(), distinct_cache_provider: DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(), persisted_files: Arc::new(PersistedFiles::new()), parquet_cache: None,
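
As an addendum for readers of this patch: the sketch below shows, end to end, how a component consumes the ACK'd broadcast described in the commit message. It is a minimal illustration modeled on the test added in influxdb3_catalog/src/channel.rs and is not part of the patch itself; the subscriber name "example_component", the database name, and the surrounding setup are assumptions for illustration only.

use influxdb3_catalog::{catalog::Catalog, log::CatalogBatch};

async fn example() {
    // In-memory catalog, as in the test in channel.rs; a real component
    // would be handed an Arc<Catalog> by the server instead.
    let catalog = Catalog::new_in_memory("example").await.unwrap();

    // Subscribe with a unique component name; reusing a name panics
    // (see CatalogSubscriptions::subscribe).
    let mut updates = catalog.subscribe_to_updates("example_component").await;

    let handle = tokio::spawn(async move {
        // recv() yields None once the catalog (and its senders) is dropped.
        while let Some(update) = updates.recv().await {
            for batch in update.batches() {
                match batch {
                    CatalogBatch::Node(_) => { /* handle node ops */ }
                    CatalogBatch::Database(_) => { /* handle database ops */ }
                }
            }
            // The ACK back to the catalog is sent here, when the
            // CatalogUpdateMessage goes out of scope and its Drop impl
            // fires -- the consumer never sends it explicitly.
        }
    });

    // Catalog updates now wait until every subscriber has ACKed the
    // broadcast before returning, and can therefore fail, per the
    // commit message.
    catalog.create_database("example_db").await.unwrap();

    drop(catalog);
    handle.await.unwrap();
}

The key design point is that the ACK is tied to Drop on CatalogUpdateMessage, so a subscriber that simply lets the message go out of scope has acknowledged it, while a subscriber that holds the message (or whose task is stalled) blocks the catalog update until it does.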