diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 3ca8138221..d39f4fc23a 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -562,6 +562,7 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&catalog) as _, config.last_cache_eviction_interval.into(), ) + .await .map_err(Error::InitializeLastCache)?; let distinct_cache = DistinctCacheProvider::new_from_catalog_with_background_eviction( @@ -569,6 +570,7 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&catalog), config.distinct_cache_eviction_interval.into(), ) + .await .map_err(Error::InitializeDistinctCache)?; let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs { @@ -637,7 +639,8 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&query_executor) as _, Arc::clone(&time_provider) as _, sys_events_store, - ); + ) + .await; let builder = ServerBuilder::new(common_state) .max_request_size(config.max_http_request_size) diff --git a/influxdb3_cache/src/distinct_cache/mod.rs b/influxdb3_cache/src/distinct_cache/mod.rs index 5ceaeb9dd3..a0ffa21ba7 100644 --- a/influxdb3_cache/src/distinct_cache/mod.rs +++ b/influxdb3_cache/src/distinct_cache/mod.rs @@ -419,7 +419,9 @@ mod tests { debug!(catlog = ?writer.catalog(), "writer catalog"); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let distinct_provider = - DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); + DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()) + .await + .unwrap(); writer .catalog() .create_distinct_cache( @@ -865,7 +867,9 @@ mod tests { ).await; let distinct_provider = - DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap(); + DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()) + .await + .unwrap(); writer .catalog() .create_distinct_cache( diff --git a/influxdb3_cache/src/distinct_cache/provider.rs b/influxdb3_cache/src/distinct_cache/provider.rs index 1d59564797..884481f364 100644 --- a/influxdb3_cache/src/distinct_cache/provider.rs +++ b/influxdb3_cache/src/distinct_cache/provider.rs @@ -2,7 +2,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use arrow::datatypes::SchemaRef; use influxdb3_catalog::{ - catalog::{Catalog, CatalogBroadcastReceiver}, + catalog::Catalog, + channel::CatalogUpdateReceiver, log::{ CatalogBatch, DatabaseCatalogOp, DeleteDistinctCacheLog, DistinctCacheDefinition, SoftDeleteTableLog, @@ -11,9 +12,7 @@ use influxdb3_catalog::{ use influxdb3_id::{DbId, DistinctCacheId, TableId}; use influxdb3_wal::{WalContents, WalOp}; use iox_time::TimeProvider; -use observability_deps::tracing::warn; use parking_lot::RwLock; -use tokio::sync::broadcast::error::RecvError; use super::{ CacheError, @@ -46,7 +45,7 @@ pub struct DistinctCacheProvider { impl DistinctCacheProvider { /// Initialize a [`DistinctCacheProvider`] from a [`Catalog`], populating the provider's /// `cache_map` from the definitions in the catalog. - pub fn new_from_catalog( + pub async fn new_from_catalog( time_provider: Arc, catalog: Arc, ) -> Result, ProviderError> { @@ -72,7 +71,10 @@ impl DistinctCacheProvider { } } - background_catalog_update(Arc::clone(&provider), catalog.subscribe_to_updates()); + background_catalog_update( + Arc::clone(&provider), + catalog.subscribe_to_updates("distinct_cache").await, + ); Ok(provider) } @@ -81,12 +83,12 @@ impl DistinctCacheProvider { /// `cache_map` from the definitions in the catalog. 
This starts a background process that /// runs on the provided `eviction_interval` to perform eviction on all of the caches /// in the created [`DistinctCacheProvider`]'s `cache_map`. - pub fn new_from_catalog_with_background_eviction( + pub async fn new_from_catalog_with_background_eviction( time_provider: Arc, catalog: Arc, eviction_interval: Duration, ) -> Result, ProviderError> { - let provider = Self::new_from_catalog(time_provider, catalog)?; + let provider = Self::new_from_catalog(time_provider, catalog).await?; background_eviction_process(Arc::clone(&provider), eviction_interval); @@ -267,58 +269,41 @@ impl DistinctCacheProvider { fn background_catalog_update( provider: Arc, - mut subscription: CatalogBroadcastReceiver, + mut subscription: CatalogUpdateReceiver, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - loop { - match subscription.recv().await { - Ok(catalog_update) => { - for batch in catalog_update - .batches() - .filter_map(CatalogBatch::as_database) - { - for op in batch.ops.iter() { - match op { - DatabaseCatalogOp::SoftDeleteDatabase(_) => { - provider.delete_caches_for_db(&batch.database_id); - } - DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog { - database_id, - table_id, - .. - }) => { - provider.delete_caches_for_db_and_table(database_id, table_id); - } - DatabaseCatalogOp::CreateDistinctCache(log) => { - provider.create_from_catalog(batch.database_id, log); - } - DatabaseCatalogOp::DeleteDistinctCache( - DeleteDistinctCacheLog { - table_id, cache_id, .. - }, - ) => { - // This only errors when the cache isn't there, so we ignore the - // error... - let _ = provider.delete_cache( - &batch.database_id, - table_id, - cache_id, - ); - } - _ => (), - } + while let Some(catalog_update) = subscription.recv().await { + for batch in catalog_update + .batches() + .filter_map(CatalogBatch::as_database) + { + for op in batch.ops.iter() { + match op { + DatabaseCatalogOp::SoftDeleteDatabase(_) => { + provider.delete_caches_for_db(&batch.database_id); } + DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog { + database_id, + table_id, + .. + }) => { + provider.delete_caches_for_db_and_table(database_id, table_id); + } + DatabaseCatalogOp::CreateDistinctCache(log) => { + provider.create_from_catalog(batch.database_id, log); + } + DatabaseCatalogOp::DeleteDistinctCache(DeleteDistinctCacheLog { + table_id, + cache_id, + .. + }) => { + // This only errors when the cache isn't there, so we ignore the + // error... + let _ = provider.delete_cache(&batch.database_id, table_id, cache_id); + } + _ => (), } } - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(num_messages_skipped)) => { - // TODO: in this case, we would need to re-initialize the distinct cache provider - // from the catalog, if possible. 
- warn!( - num_messages_skipped, - "distinct cache provider catalog subscription is lagging" - ); - } } } }) diff --git a/influxdb3_cache/src/last_cache/mod.rs b/influxdb3_cache/src/last_cache/mod.rs index 3f17d7c36f..2b01f09d7b 100644 --- a/influxdb3_cache/src/last_cache/mod.rs +++ b/influxdb3_cache/src/last_cache/mod.rs @@ -1243,6 +1243,7 @@ mod tests { // This is the function we are testing, which initializes the LastCacheProvider from the catalog: let provider = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await .expect("create last cache provider from catalog"); // There should be a total of 3 caches: assert_eq!(3, provider.size()); @@ -1271,7 +1272,9 @@ mod tests { .await; // create a last cache provider so we can use it to create our UDTF provider: - let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap(); + let provider = LastCacheProvider::new_from_catalog(writer.catalog()) + .await + .unwrap(); writer .catalog() .create_last_cache( @@ -1526,7 +1529,9 @@ mod tests { let _ = writer .write_lp_to_write_batch("cpu,region=us-east,host=a usage=99,temp=88", 0) .await; - let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap(); + let provider = LastCacheProvider::new_from_catalog(writer.catalog()) + .await + .unwrap(); writer .catalog() .create_last_cache( diff --git a/influxdb3_cache/src/last_cache/provider.rs b/influxdb3_cache/src/last_cache/provider.rs index 32a3c3929e..b1fe0eba37 100644 --- a/influxdb3_cache/src/last_cache/provider.rs +++ b/influxdb3_cache/src/last_cache/provider.rs @@ -3,7 +3,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError}; use influxdb3_catalog::{ - catalog::{Catalog, CatalogBroadcastReceiver}, + catalog::Catalog, + channel::CatalogUpdateReceiver, log::{ CatalogBatch, DatabaseCatalogOp, DeleteLastCacheLog, LastCacheDefinition, LastCacheValueColumnsDef, SoftDeleteTableLog, @@ -11,9 +12,8 @@ use influxdb3_catalog::{ }; use influxdb3_id::{DbId, LastCacheId, TableId}; use influxdb3_wal::{WalContents, WalOp}; -use observability_deps::tracing::{debug, warn}; +use observability_deps::tracing::debug; use parking_lot::RwLock; -use tokio::sync::broadcast::error::RecvError; use super::{ CreateLastCacheArgs, Error, @@ -37,7 +37,7 @@ impl std::fmt::Debug for LastCacheProvider { impl LastCacheProvider { /// Initialize a [`LastCacheProvider`] from a [`Catalog`] - pub fn new_from_catalog(catalog: Arc) -> Result, Error> { + pub async fn new_from_catalog(catalog: Arc) -> Result, Error> { let provider = Arc::new(LastCacheProvider { catalog: Arc::clone(&catalog), cache_map: Default::default(), @@ -73,18 +73,21 @@ impl LastCacheProvider { } } - background_catalog_update(Arc::clone(&provider), catalog.subscribe_to_updates()); + background_catalog_update( + Arc::clone(&provider), + catalog.subscribe_to_updates("last_cache").await, + ); Ok(provider) } /// Initialize a [`LastCacheProvider`] from a [`Catalog`] and run a background process to /// evict expired entries from the cache - pub fn new_from_catalog_with_background_eviction( + pub async fn new_from_catalog_with_background_eviction( catalog: Arc, eviction_interval: Duration, ) -> Result, Error> { - let provider = Self::new_from_catalog(catalog)?; + let provider = Self::new_from_catalog(catalog).await?; background_eviction_process(Arc::clone(&provider), eviction_interval); @@ -320,53 +323,39 @@ impl LastCacheProvider { fn background_catalog_update( provider: Arc, - 
mut subscription: CatalogBroadcastReceiver, + mut subscription: CatalogUpdateReceiver, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - loop { - match subscription.recv().await { - Ok(catalog_update) => { - for batch in catalog_update - .batches() - .filter_map(CatalogBatch::as_database) - { - for op in batch.ops.iter() { - match op { - DatabaseCatalogOp::SoftDeleteDatabase(_) => { - provider.delete_caches_for_db(&batch.database_id); - } - DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog { - table_id, - .. - }) => { - provider.delete_caches_for_table(&batch.database_id, table_id); - } - DatabaseCatalogOp::CreateLastCache(log) => { - provider.create_cache_from_definition(batch.database_id, log); - } - DatabaseCatalogOp::DeleteLastCache(DeleteLastCacheLog { - table_id, - id, - .. - }) => { - // This only errors when the cache isn't there, so we ignore the - // error... - let _ = provider.delete_cache(&batch.database_id, table_id, id); - } - _ => (), - } + while let Some(catalog_update) = subscription.recv().await { + for batch in catalog_update + .batches() + .filter_map(CatalogBatch::as_database) + { + for op in batch.ops.iter() { + match op { + DatabaseCatalogOp::SoftDeleteDatabase(_) => { + provider.delete_caches_for_db(&batch.database_id); } + DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog { + table_id, .. + }) => { + provider.delete_caches_for_table(&batch.database_id, table_id); + } + DatabaseCatalogOp::CreateLastCache(log) => { + provider.create_cache_from_definition(batch.database_id, log); + } + DatabaseCatalogOp::DeleteLastCache(DeleteLastCacheLog { + table_id, + id, + .. + }) => { + // This only errors when the cache isn't there, so we ignore the + // error... + let _ = provider.delete_cache(&batch.database_id, table_id, id); + } + _ => (), } } - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(num_messages_skipped)) => { - // TODO: in this case, we would need to re-initialize the last cache provider - // from the catalog, if possible. - warn!( - num_messages_skipped, - "last cache provider catalog subscription is lagging" - ); - } } } }) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 2a7ce9e0d4..fa0567a01b 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -16,14 +16,14 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::hash::Hash; use std::sync::Arc; -use tokio::sync::{Mutex, MutexGuard, broadcast}; -use update::CatalogUpdate; +use tokio::sync::{Mutex, MutexGuard}; use uuid::Uuid; mod update; pub use schema::{InfluxColumnType, InfluxFieldType}; -pub use update::{DatabaseCatalogTransaction, Prompt}; +pub use update::{CatalogUpdate, DatabaseCatalogTransaction, Prompt}; +use crate::channel::{CatalogSubscriptions, CatalogUpdateReceiver}; use crate::log::{ CreateDatabaseLog, DatabaseBatch, DatabaseCatalogOp, NodeBatch, NodeCatalogOp, NodeMode, RegisterNodeLog, @@ -80,20 +80,8 @@ static CATALOG_WRITE_PERMIT: Mutex = /// time that the permit was acquired. 
pub type CatalogWritePermit = MutexGuard<'static, CatalogSequenceNumber>; -const CATALOG_BROADCAST_CHANNEL_CAPACITY: usize = 10_000; - -pub type CatalogBroadcastSender = broadcast::Sender>; -pub type CatalogBroadcastReceiver = broadcast::Receiver>; - pub struct Catalog { - /// Channel for broadcasting updates to other components that must handle `CatalogOp`s - /// - /// # Implementation Note - /// - /// This currently uses a `tokio::broadcast` channel, which can lead to dropped messages if - /// the channel fills up. If that is a concern a more durable form of broadcasting single - /// producer to multiple consumer messages would need to be implemented. - channel: CatalogBroadcastSender, + subscriptions: Arc>, time_provider: Arc, /// Connection to the object store for managing persistence and updates to the catalog store: ObjectStoreCatalog, @@ -128,14 +116,14 @@ impl Catalog { let node_id = catalog_id.into(); let store = ObjectStoreCatalog::new(Arc::clone(&node_id), CATALOG_CHECKPOINT_INTERVAL, store); - let (channel, _) = broadcast::channel(CATALOG_BROADCAST_CHANNEL_CAPACITY); + let subscriptions = Default::default(); store .load_or_create_catalog() .await .map_err(Into::into) .map(RwLock::new) .map(|inner| Self { - channel, + subscriptions, time_provider, store, inner, @@ -150,8 +138,8 @@ impl Catalog { self.inner.read().catalog_uuid } - pub fn subscribe_to_updates(&self) -> broadcast::Receiver> { - self.channel.subscribe() + pub async fn subscribe_to_updates(&self, name: &'static str) -> CatalogUpdateReceiver { + self.subscriptions.write().await.subscribe(name) } pub fn object_store(&self) -> Arc { @@ -363,10 +351,10 @@ impl Catalog { ) -> Result { let store = ObjectStoreCatalog::new(catalog_id, checkpoint_interval, store); let inner = store.load_or_create_catalog().await?; - let (channel, _) = broadcast::channel(CATALOG_BROADCAST_CHANNEL_CAPACITY); + let subscriptions = Default::default(); Ok(Self { - channel, + subscriptions, time_provider, store, inner: RwLock::new(inner), diff --git a/influxdb3_catalog/src/catalog/update.rs b/influxdb3_catalog/src/catalog/update.rs index a41ddedd7e..e95e8bd4dd 100644 --- a/influxdb3_catalog/src/catalog/update.rs +++ b/influxdb3_catalog/src/catalog/update.rs @@ -83,7 +83,7 @@ impl Catalog { UpdatePrompt::Applied => { self.apply_ordered_catalog_batch(&ordered_batch, &permit); self.background_checkpoint(&ordered_batch); - self.broadcast_update(ordered_batch.into_batch()); + self.broadcast_update(ordered_batch.into_batch()).await?; Ok(Prompt::Success(self.sequence_number())) } } @@ -676,7 +676,8 @@ impl Catalog { UpdatePrompt::Applied => { self.apply_ordered_catalog_batch(&ordered_batch, &permit); self.background_checkpoint(&ordered_batch); - self.broadcast_update(ordered_batch.clone().into_batch()); + self.broadcast_update(ordered_batch.clone().into_batch()) + .await?; return Ok(Some(ordered_batch)); } } @@ -747,7 +748,7 @@ impl Catalog { .inspect_err(|error| debug!(?error, "failed to fetch next catalog sequence"))? { let batch = self.apply_ordered_catalog_batch(&ordered_catalog_batch, permit); - self.broadcast_update(batch); + self.broadcast_update(batch).await?; sequence_number = sequence_number.next(); if update_until.is_some_and(|max_sequence| sequence_number > max_sequence) { break; @@ -757,11 +758,13 @@ impl Catalog { } /// Broadcast a `CatalogUpdate` to all subscribed components in the system. 
- fn broadcast_update(&self, update: impl Into) { - if let Err(send_error) = self.channel.send(Arc::new(update.into())) { - info!("nothing listening for catalog updates"); - trace!(?send_error, "nothing listening for catalog updates"); - } + async fn broadcast_update(&self, update: impl Into) -> Result<()> { + self.subscriptions + .write() + .await + .send_update(Arc::new(update.into())) + .await?; + Ok(()) } /// Persist the catalog as a checkpoint in the background if we are at the _n_th sequence @@ -807,7 +810,7 @@ pub struct CatalogUpdate { } impl CatalogUpdate { - pub fn batches(&self) -> impl Iterator { + pub(crate) fn batches(&self) -> impl Iterator { self.batches.iter() } } diff --git a/influxdb3_catalog/src/channel.rs b/influxdb3_catalog/src/channel.rs new file mode 100644 index 0000000000..2287cf0c47 --- /dev/null +++ b/influxdb3_catalog/src/channel.rs @@ -0,0 +1,161 @@ +use std::sync::Arc; + +use anyhow::Context; +use futures::future::try_join_all; +use observability_deps::tracing::warn; +use tokio::sync::{mpsc, oneshot}; + +use crate::{catalog::CatalogUpdate, log::CatalogBatch}; + +#[derive(Debug, thiserror::Error)] +#[error("error in catalog update subscribers: {0:?}")] +pub struct SubscriptionError(#[from] anyhow::Error); + +const CATALOG_SUBSCRIPTION_BUFFER_SIZE: usize = 10_000; + +type CatalogUpdateSender = mpsc::Sender; +pub type CatalogUpdateReceiver = mpsc::Receiver; + +/// A message containing a set of `CatalogUpdate`s that can be handled by subscribers to the +/// `Catalog`. +/// +/// The response is sent in the `Drop` implementation of this type, so that the consumer of these +/// messages does not need to worry about sending the response back to the catalog on broadcast. +pub struct CatalogUpdateMessage { + update: Arc, + tx: Option>, +} + +impl CatalogUpdateMessage { + /// Create a new `CatalogUpdateMessage` + fn new(update: Arc, tx: oneshot::Sender<()>) -> Self { + Self { + update, + tx: Some(tx), + } + } + + /// Iterate over the `CatalogBatch`s in the update + pub fn batches(&self) -> impl Iterator { + self.update.batches() + } +} + +impl Drop for CatalogUpdateMessage { + /// Send the response to the catalog via the oneshot sender + fn drop(&mut self) { + if let Some(tx) = self.tx.take() { + let _ = tx + .send(()) + .inspect_err(|error| warn!(?error, "unable to send ACK for catalog update")); + } + } +} + +impl std::fmt::Debug for CatalogUpdateMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CatalogUpdateMessage") + .field("update", &self.update) + .finish() + } +} + +#[derive(Debug, Default)] +pub(crate) struct CatalogSubscriptions { + subscriptions: hashbrown::HashMap, CatalogUpdateSender>, +} + +impl CatalogSubscriptions { + /// Subscribe to the catalog for updates. + /// + /// This allows components in the system to listen for updates made to the catalog + /// and handle/apply them as needed. A `subscription_name` is provided that identifies the + /// component subscribing. + /// + /// # Panics + /// + /// If the provided `subscription_name` has already been used, this will panic. 
+ pub(crate) fn subscribe(&mut self, subscription_name: &'static str) -> CatalogUpdateReceiver { + let (tx, rx) = mpsc::channel(CATALOG_SUBSCRIPTION_BUFFER_SIZE); + assert!( + self.subscriptions + .insert(Arc::from(subscription_name), tx) + .is_none(), + "attempted to subscribe to catalog with same component name more than once, \ + name: {subscription_name}" + ); + rx + } + + pub(crate) async fn send_update( + &self, + update: Arc, + ) -> Result<(), SubscriptionError> { + let mut responses = vec![]; + for (name, sub) in self + .subscriptions + .iter() + .map(|(n, s)| (Arc::clone(n), s.clone())) + { + let update_cloned = Arc::clone(&update); + responses.push(tokio::spawn(async move { + let (tx, rx) = oneshot::channel(); + sub.send(CatalogUpdateMessage::new(update_cloned, tx)) + .await + .with_context(|| format!("failed to send update to {name}"))?; + rx.await + .with_context(|| format!("failed to receive response from {name}"))?; + Ok(()) + })); + } + + try_join_all(responses) + .await + .context("failed to collect responses from catalog subscribers")? + .into_iter() + .collect::, anyhow::Error>>()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use observability_deps::tracing::debug; + + use crate::{ + catalog::Catalog, + log::{CatalogBatch, FieldDataType}, + }; + + #[test_log::test(tokio::test)] + async fn test_catalog_update_sub() { + let catalog = Catalog::new_in_memory("cats").await.unwrap(); + let mut sub = catalog.subscribe_to_updates("test_sub").await; + let handle = tokio::spawn(async move { + let mut n_updates = 0; + while let Some(update) = sub.recv().await { + debug!(?update, "got an update"); + for b in update.batches() { + match b { + CatalogBatch::Node(_) => (), + CatalogBatch::Database(_) => (), + } + } + n_updates += 1; + } + n_updates + }); + + catalog.create_database("foo").await.unwrap(); + catalog + .create_table("foo", "bar", &["tag"], &[("field", FieldDataType::String)]) + .await + .unwrap(); + + // drop the catalog so the channel closes and the handle above doesn't hang... 
+ drop(catalog); + + let n_updates = handle.await.unwrap(); + assert_eq!(2, n_updates); + } +} diff --git a/influxdb3_catalog/src/error.rs b/influxdb3_catalog/src/error.rs index 81a036e039..9ee075770c 100644 --- a/influxdb3_catalog/src/error.rs +++ b/influxdb3_catalog/src/error.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::anyhow; use schema::InfluxColumnType; -use crate::{catalog::Catalog, object_store::ObjectStoreCatalogError}; +use crate::{catalog::Catalog, channel::SubscriptionError, object_store::ObjectStoreCatalogError}; #[derive(Debug, thiserror::Error)] pub enum CatalogError { @@ -92,6 +92,9 @@ pub enum CatalogError { existing: String, }, + #[error("catalog subscription error: {0}")] + Subscription(#[from] SubscriptionError), + #[error(transparent)] Other(#[from] anyhow::Error), diff --git a/influxdb3_catalog/src/lib.rs b/influxdb3_catalog/src/lib.rs index c0e1875737..fd914e82c1 100644 --- a/influxdb3_catalog/src/lib.rs +++ b/influxdb3_catalog/src/lib.rs @@ -1,4 +1,5 @@ pub mod catalog; +pub mod channel; pub mod error; pub mod id; pub mod log; diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 9f1e32baa6..81d36ea884 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -8,7 +8,8 @@ use bytes::Bytes; use hashbrown::HashMap; use hyper::{Body, Response}; use influxdb3_catalog::CatalogError; -use influxdb3_catalog::catalog::{Catalog, CatalogBroadcastReceiver}; +use influxdb3_catalog::catalog::Catalog; +use influxdb3_catalog::channel::CatalogUpdateReceiver; use influxdb3_catalog::log::{ CatalogBatch, DatabaseCatalogOp, DeleteTriggerLog, PluginType, TriggerDefinition, TriggerIdentifier, TriggerSpecificationDefinition, ValidPluginFilename, @@ -29,7 +30,6 @@ use std::any::Any; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, SystemTime}; -use tokio::sync::broadcast::error::RecvError; use tokio::sync::oneshot::Receiver; use tokio::sync::{RwLock, mpsc, oneshot}; @@ -205,7 +205,7 @@ impl PluginChannels { } impl ProcessingEngineManagerImpl { - pub fn new( + pub async fn new( environment: ProcessingEngineEnvironmentManager, catalog: Arc, node_id: impl Into>, @@ -225,7 +225,7 @@ impl ProcessingEngineManagerImpl { } } - let catalog_sub = catalog.subscribe_to_updates(); + let catalog_sub = catalog.subscribe_to_updates("processing_engine").await; let cache = Arc::new(Mutex::new(CacheStore::new( Arc::clone(&time_provider), @@ -668,89 +668,74 @@ pub(crate) struct Request { fn background_catalog_update( processing_engine_manager: Arc, - mut subscription: CatalogBroadcastReceiver, + mut subscription: CatalogUpdateReceiver, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - loop { - match subscription.recv().await { - Ok(catalog_update) => { - for batch in catalog_update - .batches() - .filter_map(CatalogBatch::as_database) - { - for op in batch.ops.iter() { - let processing_engine_manager = Arc::clone(&processing_engine_manager); - match op { - DatabaseCatalogOp::CreateTrigger(TriggerDefinition { - trigger_name, - database_name, - disabled, - .. 
- }) => { - if !disabled { - if let Err(error) = processing_engine_manager - .run_trigger(database_name, trigger_name) - .await - { - error!(?error, "failed to run the created trigger"); - } - } + while let Some(catalog_update) = subscription.recv().await { + for batch in catalog_update + .batches() + .filter_map(CatalogBatch::as_database) + { + for op in batch.ops.iter() { + let processing_engine_manager = Arc::clone(&processing_engine_manager); + match op { + DatabaseCatalogOp::CreateTrigger(TriggerDefinition { + trigger_name, + database_name, + disabled, + .. + }) => { + if !disabled { + if let Err(error) = processing_engine_manager + .run_trigger(database_name, trigger_name) + .await + { + error!(?error, "failed to run the created trigger"); } - DatabaseCatalogOp::EnableTrigger(TriggerIdentifier { - db_name, - trigger_name, - .. - }) => { - if let Err(error) = processing_engine_manager - .run_trigger(db_name, trigger_name) - .await - { - error!(?error, "failed to run the trigger"); - } - } - DatabaseCatalogOp::DeleteTrigger(DeleteTriggerLog { - trigger_name, - force: true, - .. - }) => { - if let Err(error) = processing_engine_manager - .stop_trigger(&batch.database_name, trigger_name) - .await - { - error!(?error, "failed to disable the trigger"); - } - } - DatabaseCatalogOp::DisableTrigger(TriggerIdentifier { - db_name, - trigger_name, - .. - }) => { - if let Err(error) = processing_engine_manager - .stop_trigger(db_name, trigger_name) - .await - { - error!(?error, "failed to disable the trigger"); - } - } - // NOTE(trevor/catalog-refactor): it is not clear that any other operation needs to be - // handled, based on the existing code, but we could potentially - // handle database deletion, trigger creation/deletion/enable here - _ => (), } } + DatabaseCatalogOp::EnableTrigger(TriggerIdentifier { + db_name, + trigger_name, + .. + }) => { + if let Err(error) = processing_engine_manager + .run_trigger(db_name, trigger_name) + .await + { + error!(?error, "failed to run the trigger"); + } + } + DatabaseCatalogOp::DeleteTrigger(DeleteTriggerLog { + trigger_name, + force: true, + .. + }) => { + if let Err(error) = processing_engine_manager + .stop_trigger(&batch.database_name, trigger_name) + .await + { + error!(?error, "failed to disable the trigger"); + } + } + DatabaseCatalogOp::DisableTrigger(TriggerIdentifier { + db_name, + trigger_name, + .. + }) => { + if let Err(error) = processing_engine_manager + .stop_trigger(db_name, trigger_name) + .await + { + error!(?error, "failed to disable the trigger"); + } + } + // NOTE(trevor/catalog-refactor): it is not clear that any other operation needs to be + // handled, based on the existing code, but we could potentially + // handle database deletion, trigger creation/deletion/enable here + _ => (), } } - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(num_messages_skipped)) => { - // NOTE(trevor/catalog-refactor): in this case, we would need to re-initialize the proc eng manager - // from the catalog, if possible; but, it may be more desireable to not have this - // situation be possible at all.. 
The use of a broadcast channel should only - // be temporary, so this particular error variant should go away in future - warn!( - num_messages_skipped, - "processing engine manager catalog subscription is lagging" - ); - } } } }) @@ -999,11 +984,14 @@ mod tests { .await .unwrap(), ); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let wbuf = WriteBufferImpl::new(WriteBufferImplArgs { persister, @@ -1049,7 +1037,8 @@ def process_writes(influxdb3_local, table_batches, args=None): qe, time_provider, sys_event_store, - ), + ) + .await, file, ) } diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index a51c7202ef..1cc477fc3c 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -786,11 +786,14 @@ mod tests { influxdb3_write::write_buffer::WriteBufferImplArgs { persister: Arc::clone(&persister), catalog: Arc::clone(&catalog), - last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)) + .await + .unwrap(), distinct_cache: DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider) as _, Arc::clone(&catalog), ) + .await .unwrap(), time_provider: Arc::clone(&time_provider) as _, executor: Arc::clone(&exec), @@ -847,7 +850,8 @@ mod tests { Arc::clone(&query_executor) as _, Arc::clone(&time_provider) as _, sys_events_store, - ); + ) + .await; let server = ServerBuilder::new(common_state) .write_buffer(Arc::clone(&write_buffer)) diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index cef3738109..89420a8bb7 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -825,11 +825,14 @@ mod tests { let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs { persister, catalog: Arc::clone(&catalog), - last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)) + .await + .unwrap(), distinct_cache: DistinctCacheProvider::new_from_catalog( Arc::::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(), time_provider: Arc::::clone(&time_provider), executor: Arc::clone(&exec), diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 92a190ef27..8e6093f874 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -740,11 +740,14 @@ mod tests { "test_host", Arc::clone(&time_provider), )); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&persister), @@ -838,11 +841,14 @@ mod tests { .await .unwrap(), ); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = 
DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister, @@ -925,12 +931,14 @@ mod tests { .await .unwrap(), ); - let last_cache = - LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&wbuf.persister), @@ -1180,11 +1188,14 @@ mod tests { .await .unwrap(), ); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs { persister: Arc::clone(&write_buffer.persister), @@ -3055,11 +3066,14 @@ mod tests { .await .unwrap(), ); - let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) + .await + .unwrap(); let distinct_cache = DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(); let wbuf = WriteBufferImpl::new(WriteBufferImplArgs { persister, diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index e0d7916c8e..18c6b2f2f8 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -667,11 +667,14 @@ mod tests { executor: Arc::clone(&exec), catalog: Arc::clone(&catalog), persister: Arc::clone(&persister), - last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)) + .await + .unwrap(), distinct_cache_provider: DistinctCacheProvider::new_from_catalog( Arc::clone(&time_provider), Arc::clone(&catalog), ) + .await .unwrap(), persisted_files: Arc::new(PersistedFiles::new()), parquet_cache: None,
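
For reference, a minimal sketch of how a component consumes the subscription API introduced in this patch, mirroring the provider code above; the subscriber name "example_component", the function names, and the handler body are illustrative assumptions only, not part of the diff:

use std::sync::Arc;

use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::channel::CatalogUpdateReceiver;
use influxdb3_catalog::log::{CatalogBatch, DatabaseCatalogOp};

/// Spawn a task that applies catalog updates for a hypothetical component.
fn background_catalog_update(
    mut subscription: CatalogUpdateReceiver,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // recv() yields None once the catalog's senders are dropped, which ends the loop.
        // The ACK back to the catalog is sent automatically when each
        // CatalogUpdateMessage is dropped at the end of an iteration.
        while let Some(catalog_update) = subscription.recv().await {
            for batch in catalog_update
                .batches()
                .filter_map(CatalogBatch::as_database)
            {
                for op in batch.ops.iter() {
                    match op {
                        DatabaseCatalogOp::SoftDeleteDatabase(_) => {
                            // react to the deleted database here...
                        }
                        _ => (),
                    }
                }
            }
        }
    })
}

async fn init_component(catalog: Arc<Catalog>) {
    // Subscription names must be unique per component; re-using a name panics.
    let subscription = catalog.subscribe_to_updates("example_component").await;
    background_catalog_update(subscription);
}

Because each CatalogUpdateMessage carries a oneshot sender that is fired on Drop, broadcast_update can await acknowledgement from every subscriber rather than silently dropping or lagging messages as the previous broadcast channel could.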