feat: ack catalog update broadcast (#26118)
This adds a CatalogUpdateMessage type that is used to send CatalogUpdates; the type performs the send on its oneshot Sender when it is dropped, so the consumer of the message does not need to do so explicitly. Subscribers to the catalog get a CatalogUpdateReceiver, which uses the CatalogUpdateMessage type to ACK each message broadcast from the catalog. This means that a catalog update broadcast can fail, but this commit does not provide any means of rolling back a catalog update. A test was added to check that the subscription and ACK flow works.
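The ACK flow can be illustrated with a minimal, self-contained sketch of the same pattern (this is not code from the commit; the AckOnDrop and Update names are invented for illustration, and it assumes a tokio runtime with the sync, rt, and macros features enabled): a message wrapper sends on a oneshot channel in its Drop impl, so the broadcaster can await acknowledgement while the subscriber only has to receive and drop the message, which mirrors what CatalogUpdateMessage does for catalog updates.

use tokio::sync::{mpsc, oneshot};

// Simplified stand-in for the real CatalogUpdate payload (hypothetical name).
#[derive(Debug)]
struct Update(String);

// Message wrapper that ACKs on drop: the subscriber only needs to receive and
// drop the message, and the broadcaster gets its acknowledgement automatically.
struct AckOnDrop {
    update: Update,
    ack: Option<oneshot::Sender<()>>,
}

impl Drop for AckOnDrop {
    fn drop(&mut self) {
        if let Some(ack) = self.ack.take() {
            // Ignore the error: the broadcaster may have stopped waiting.
            let _ = ack.send(());
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<AckOnDrop>(8);

    // Subscriber task: just reads updates; the ACK happens when each message drops.
    let subscriber = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("subscriber applied update: {:?}", msg.update);
        }
    });

    // Broadcaster: send an update and wait for the ACK before moving on.
    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(AckOnDrop {
        update: Update("create database foo".into()),
        ack: Some(ack_tx),
    })
    .await
    .expect("subscriber is listening");
    ack_rx.await.expect("subscriber acknowledged the update");

    // Close the channel so the subscriber loop ends, then wait for it.
    drop(tx);
    subscriber.await.unwrap();
}

As in the commit, a failed ACK surfaces as an error on the broadcasting side; it does not undo the update that was already applied.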
parent 98ca8dcb50
commit 863a6d0b4a
@@ -562,6 +562,7 @@ pub async fn command(config: Config) -> Result<()> {
         Arc::clone(&catalog) as _,
         config.last_cache_eviction_interval.into(),
     )
+    .await
     .map_err(Error::InitializeLastCache)?;
 
     let distinct_cache = DistinctCacheProvider::new_from_catalog_with_background_eviction(
@@ -569,6 +570,7 @@ pub async fn command(config: Config) -> Result<()> {
         Arc::clone(&catalog),
         config.distinct_cache_eviction_interval.into(),
     )
+    .await
     .map_err(Error::InitializeDistinctCache)?;
 
     let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs {
@@ -637,7 +639,8 @@ pub async fn command(config: Config) -> Result<()> {
         Arc::clone(&query_executor) as _,
         Arc::clone(&time_provider) as _,
         sys_events_store,
-    );
+    )
+    .await;
 
     let builder = ServerBuilder::new(common_state)
         .max_request_size(config.max_http_request_size)
@@ -419,7 +419,9 @@ mod tests {
         debug!(catlog = ?writer.catalog(), "writer catalog");
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let distinct_provider =
-            DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap();
+            DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog())
+                .await
+                .unwrap();
         writer
             .catalog()
             .create_distinct_cache(
@@ -865,7 +867,9 @@ mod tests {
         ).await;
 
         let distinct_provider =
-            DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap();
+            DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog())
+                .await
+                .unwrap();
         writer
             .catalog()
             .create_distinct_cache(
@@ -2,7 +2,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
 
 use arrow::datatypes::SchemaRef;
 use influxdb3_catalog::{
-    catalog::{Catalog, CatalogBroadcastReceiver},
+    catalog::Catalog,
+    channel::CatalogUpdateReceiver,
     log::{
         CatalogBatch, DatabaseCatalogOp, DeleteDistinctCacheLog, DistinctCacheDefinition,
         SoftDeleteTableLog,
@@ -11,9 +12,7 @@ use influxdb3_catalog::{
 use influxdb3_id::{DbId, DistinctCacheId, TableId};
 use influxdb3_wal::{WalContents, WalOp};
 use iox_time::TimeProvider;
-use observability_deps::tracing::warn;
 use parking_lot::RwLock;
-use tokio::sync::broadcast::error::RecvError;
 
 use super::{
     CacheError,
@@ -46,7 +45,7 @@ pub struct DistinctCacheProvider {
 impl DistinctCacheProvider {
     /// Initialize a [`DistinctCacheProvider`] from a [`Catalog`], populating the provider's
     /// `cache_map` from the definitions in the catalog.
-    pub fn new_from_catalog(
+    pub async fn new_from_catalog(
         time_provider: Arc<dyn TimeProvider>,
         catalog: Arc<Catalog>,
     ) -> Result<Arc<Self>, ProviderError> {
@@ -72,7 +71,10 @@ impl DistinctCacheProvider {
             }
         }
 
-        background_catalog_update(Arc::clone(&provider), catalog.subscribe_to_updates());
+        background_catalog_update(
+            Arc::clone(&provider),
+            catalog.subscribe_to_updates("distinct_cache").await,
+        );
 
         Ok(provider)
     }
@@ -81,12 +83,12 @@ impl DistinctCacheProvider {
     /// `cache_map` from the definitions in the catalog. This starts a background process that
     /// runs on the provided `eviction_interval` to perform eviction on all of the caches
     /// in the created [`DistinctCacheProvider`]'s `cache_map`.
-    pub fn new_from_catalog_with_background_eviction(
+    pub async fn new_from_catalog_with_background_eviction(
         time_provider: Arc<dyn TimeProvider>,
         catalog: Arc<Catalog>,
         eviction_interval: Duration,
     ) -> Result<Arc<Self>, ProviderError> {
-        let provider = Self::new_from_catalog(time_provider, catalog)?;
+        let provider = Self::new_from_catalog(time_provider, catalog).await?;
 
         background_eviction_process(Arc::clone(&provider), eviction_interval);
 
@@ -267,58 +269,41 @@ impl DistinctCacheProvider {
 
 fn background_catalog_update(
     provider: Arc<DistinctCacheProvider>,
-    mut subscription: CatalogBroadcastReceiver,
+    mut subscription: CatalogUpdateReceiver,
 ) -> tokio::task::JoinHandle<()> {
     tokio::spawn(async move {
-        loop {
-            match subscription.recv().await {
-                Ok(catalog_update) => {
-                    for batch in catalog_update
-                        .batches()
-                        .filter_map(CatalogBatch::as_database)
-                    {
-                        for op in batch.ops.iter() {
-                            match op {
-                                DatabaseCatalogOp::SoftDeleteDatabase(_) => {
-                                    provider.delete_caches_for_db(&batch.database_id);
-                                }
-                                DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog {
-                                    database_id,
-                                    table_id,
-                                    ..
-                                }) => {
-                                    provider.delete_caches_for_db_and_table(database_id, table_id);
-                                }
-                                DatabaseCatalogOp::CreateDistinctCache(log) => {
-                                    provider.create_from_catalog(batch.database_id, log);
-                                }
-                                DatabaseCatalogOp::DeleteDistinctCache(
-                                    DeleteDistinctCacheLog {
-                                        table_id, cache_id, ..
-                                    },
-                                ) => {
-                                    // This only errors when the cache isn't there, so we ignore the
-                                    // error...
-                                    let _ = provider.delete_cache(
-                                        &batch.database_id,
-                                        table_id,
-                                        cache_id,
-                                    );
-                                }
-                                _ => (),
-                            }
+        while let Some(catalog_update) = subscription.recv().await {
+            for batch in catalog_update
+                .batches()
+                .filter_map(CatalogBatch::as_database)
+            {
+                for op in batch.ops.iter() {
+                    match op {
+                        DatabaseCatalogOp::SoftDeleteDatabase(_) => {
+                            provider.delete_caches_for_db(&batch.database_id);
+                        }
+                        DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog {
+                            database_id,
+                            table_id,
+                            ..
+                        }) => {
+                            provider.delete_caches_for_db_and_table(database_id, table_id);
+                        }
+                        DatabaseCatalogOp::CreateDistinctCache(log) => {
+                            provider.create_from_catalog(batch.database_id, log);
+                        }
+                        DatabaseCatalogOp::DeleteDistinctCache(DeleteDistinctCacheLog {
+                            table_id,
+                            cache_id,
+                            ..
+                        }) => {
+                            // This only errors when the cache isn't there, so we ignore the
+                            // error...
+                            let _ = provider.delete_cache(&batch.database_id, table_id, cache_id);
                         }
+                        _ => (),
                     }
                 }
-                Err(RecvError::Closed) => break,
-                Err(RecvError::Lagged(num_messages_skipped)) => {
-                    // TODO: in this case, we would need to re-initialize the distinct cache provider
-                    // from the catalog, if possible.
-                    warn!(
-                        num_messages_skipped,
-                        "distinct cache provider catalog subscription is lagging"
-                    );
-                }
-            }
+            }
         }
     })
@@ -1243,6 +1243,7 @@ mod tests {
 
         // This is the function we are testing, which initializes the LastCacheProvider from the catalog:
         let provider = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
+            .await
             .expect("create last cache provider from catalog");
         // There should be a total of 3 caches:
         assert_eq!(3, provider.size());
@@ -1271,7 +1272,9 @@ mod tests {
         .await;
 
         // create a last cache provider so we can use it to create our UDTF provider:
-        let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap();
+        let provider = LastCacheProvider::new_from_catalog(writer.catalog())
+            .await
+            .unwrap();
         writer
             .catalog()
             .create_last_cache(
@@ -1526,7 +1529,9 @@ mod tests {
         let _ = writer
             .write_lp_to_write_batch("cpu,region=us-east,host=a usage=99,temp=88", 0)
             .await;
-        let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap();
+        let provider = LastCacheProvider::new_from_catalog(writer.catalog())
+            .await
+            .unwrap();
         writer
             .catalog()
             .create_last_cache(
@@ -3,7 +3,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
 use arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
 
 use influxdb3_catalog::{
-    catalog::{Catalog, CatalogBroadcastReceiver},
+    catalog::Catalog,
+    channel::CatalogUpdateReceiver,
     log::{
         CatalogBatch, DatabaseCatalogOp, DeleteLastCacheLog, LastCacheDefinition,
         LastCacheValueColumnsDef, SoftDeleteTableLog,
@@ -11,9 +12,8 @@ use influxdb3_catalog::{
 };
 use influxdb3_id::{DbId, LastCacheId, TableId};
 use influxdb3_wal::{WalContents, WalOp};
-use observability_deps::tracing::{debug, warn};
+use observability_deps::tracing::debug;
 use parking_lot::RwLock;
-use tokio::sync::broadcast::error::RecvError;
 
 use super::{
     CreateLastCacheArgs, Error,
@@ -37,7 +37,7 @@ impl std::fmt::Debug for LastCacheProvider {
 
 impl LastCacheProvider {
     /// Initialize a [`LastCacheProvider`] from a [`Catalog`]
-    pub fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Arc<Self>, Error> {
+    pub async fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Arc<Self>, Error> {
         let provider = Arc::new(LastCacheProvider {
             catalog: Arc::clone(&catalog),
             cache_map: Default::default(),
@@ -73,18 +73,21 @@ impl LastCacheProvider {
             }
         }
 
-        background_catalog_update(Arc::clone(&provider), catalog.subscribe_to_updates());
+        background_catalog_update(
+            Arc::clone(&provider),
+            catalog.subscribe_to_updates("last_cache").await,
+        );
 
         Ok(provider)
     }
 
     /// Initialize a [`LastCacheProvider`] from a [`Catalog`] and run a background process to
     /// evict expired entries from the cache
-    pub fn new_from_catalog_with_background_eviction(
+    pub async fn new_from_catalog_with_background_eviction(
         catalog: Arc<Catalog>,
         eviction_interval: Duration,
     ) -> Result<Arc<Self>, Error> {
-        let provider = Self::new_from_catalog(catalog)?;
+        let provider = Self::new_from_catalog(catalog).await?;
 
         background_eviction_process(Arc::clone(&provider), eviction_interval);
 
@@ -320,53 +323,39 @@ impl LastCacheProvider {
 
 fn background_catalog_update(
     provider: Arc<LastCacheProvider>,
-    mut subscription: CatalogBroadcastReceiver,
+    mut subscription: CatalogUpdateReceiver,
 ) -> tokio::task::JoinHandle<()> {
     tokio::spawn(async move {
-        loop {
-            match subscription.recv().await {
-                Ok(catalog_update) => {
-                    for batch in catalog_update
-                        .batches()
-                        .filter_map(CatalogBatch::as_database)
-                    {
-                        for op in batch.ops.iter() {
-                            match op {
-                                DatabaseCatalogOp::SoftDeleteDatabase(_) => {
-                                    provider.delete_caches_for_db(&batch.database_id);
-                                }
-                                DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog {
-                                    table_id,
-                                    ..
-                                }) => {
-                                    provider.delete_caches_for_table(&batch.database_id, table_id);
-                                }
-                                DatabaseCatalogOp::CreateLastCache(log) => {
-                                    provider.create_cache_from_definition(batch.database_id, log);
-                                }
-                                DatabaseCatalogOp::DeleteLastCache(DeleteLastCacheLog {
-                                    table_id,
-                                    id,
-                                    ..
-                                }) => {
-                                    // This only errors when the cache isn't there, so we ignore the
-                                    // error...
-                                    let _ = provider.delete_cache(&batch.database_id, table_id, id);
-                                }
-                                _ => (),
-                            }
+        while let Some(catalog_update) = subscription.recv().await {
+            for batch in catalog_update
+                .batches()
+                .filter_map(CatalogBatch::as_database)
+            {
+                for op in batch.ops.iter() {
+                    match op {
+                        DatabaseCatalogOp::SoftDeleteDatabase(_) => {
+                            provider.delete_caches_for_db(&batch.database_id);
+                        }
+                        DatabaseCatalogOp::SoftDeleteTable(SoftDeleteTableLog {
+                            table_id, ..
+                        }) => {
+                            provider.delete_caches_for_table(&batch.database_id, table_id);
+                        }
+                        DatabaseCatalogOp::CreateLastCache(log) => {
+                            provider.create_cache_from_definition(batch.database_id, log);
+                        }
+                        DatabaseCatalogOp::DeleteLastCache(DeleteLastCacheLog {
+                            table_id,
+                            id,
+                            ..
+                        }) => {
+                            // This only errors when the cache isn't there, so we ignore the
+                            // error...
+                            let _ = provider.delete_cache(&batch.database_id, table_id, id);
                         }
+                        _ => (),
                     }
                 }
-                Err(RecvError::Closed) => break,
-                Err(RecvError::Lagged(num_messages_skipped)) => {
-                    // TODO: in this case, we would need to re-initialize the last cache provider
-                    // from the catalog, if possible.
-                    warn!(
-                        num_messages_skipped,
-                        "last cache provider catalog subscription is lagging"
-                    );
-                }
-            }
+            }
         }
     })
@@ -16,14 +16,14 @@ use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::hash::Hash;
 use std::sync::Arc;
-use tokio::sync::{Mutex, MutexGuard, broadcast};
-use update::CatalogUpdate;
+use tokio::sync::{Mutex, MutexGuard};
 use uuid::Uuid;
 
 mod update;
 pub use schema::{InfluxColumnType, InfluxFieldType};
-pub use update::{DatabaseCatalogTransaction, Prompt};
+pub use update::{CatalogUpdate, DatabaseCatalogTransaction, Prompt};
 
+use crate::channel::{CatalogSubscriptions, CatalogUpdateReceiver};
 use crate::log::{
     CreateDatabaseLog, DatabaseBatch, DatabaseCatalogOp, NodeBatch, NodeCatalogOp, NodeMode,
     RegisterNodeLog,
@@ -80,20 +80,8 @@ static CATALOG_WRITE_PERMIT: Mutex<CatalogSequenceNumber> =
 /// time that the permit was acquired.
 pub type CatalogWritePermit = MutexGuard<'static, CatalogSequenceNumber>;
 
-const CATALOG_BROADCAST_CHANNEL_CAPACITY: usize = 10_000;
-
-pub type CatalogBroadcastSender = broadcast::Sender<Arc<CatalogUpdate>>;
-pub type CatalogBroadcastReceiver = broadcast::Receiver<Arc<CatalogUpdate>>;
-
 pub struct Catalog {
-    /// Channel for broadcasting updates to other components that must handle `CatalogOp`s
-    ///
-    /// # Implementation Note
-    ///
-    /// This currently uses a `tokio::broadcast` channel, which can lead to dropped messages if
-    /// the channel fills up. If that is a concern a more durable form of broadcasting single
-    /// producer to multiple consumer messages would need to be implemented.
-    channel: CatalogBroadcastSender,
+    subscriptions: Arc<tokio::sync::RwLock<CatalogSubscriptions>>,
     time_provider: Arc<dyn TimeProvider>,
     /// Connection to the object store for managing persistence and updates to the catalog
     store: ObjectStoreCatalog,
@@ -128,14 +116,14 @@ impl Catalog {
         let node_id = catalog_id.into();
         let store =
             ObjectStoreCatalog::new(Arc::clone(&node_id), CATALOG_CHECKPOINT_INTERVAL, store);
-        let (channel, _) = broadcast::channel(CATALOG_BROADCAST_CHANNEL_CAPACITY);
+        let subscriptions = Default::default();
         store
             .load_or_create_catalog()
             .await
             .map_err(Into::into)
             .map(RwLock::new)
             .map(|inner| Self {
-                channel,
+                subscriptions,
                 time_provider,
                 store,
                 inner,
@@ -150,8 +138,8 @@ impl Catalog {
         self.inner.read().catalog_uuid
     }
 
-    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<Arc<CatalogUpdate>> {
-        self.channel.subscribe()
+    pub async fn subscribe_to_updates(&self, name: &'static str) -> CatalogUpdateReceiver {
+        self.subscriptions.write().await.subscribe(name)
     }
 
     pub fn object_store(&self) -> Arc<dyn ObjectStore> {
@@ -363,10 +351,10 @@ impl Catalog {
     ) -> Result<Self> {
         let store = ObjectStoreCatalog::new(catalog_id, checkpoint_interval, store);
         let inner = store.load_or_create_catalog().await?;
-        let (channel, _) = broadcast::channel(CATALOG_BROADCAST_CHANNEL_CAPACITY);
+        let subscriptions = Default::default();
 
         Ok(Self {
-            channel,
+            subscriptions,
            time_provider,
            store,
            inner: RwLock::new(inner),
@@ -83,7 +83,7 @@ impl Catalog {
             UpdatePrompt::Applied => {
                 self.apply_ordered_catalog_batch(&ordered_batch, &permit);
                 self.background_checkpoint(&ordered_batch);
-                self.broadcast_update(ordered_batch.into_batch());
+                self.broadcast_update(ordered_batch.into_batch()).await?;
                 Ok(Prompt::Success(self.sequence_number()))
             }
         }
@@ -676,7 +676,8 @@ impl Catalog {
             UpdatePrompt::Applied => {
                 self.apply_ordered_catalog_batch(&ordered_batch, &permit);
                 self.background_checkpoint(&ordered_batch);
-                self.broadcast_update(ordered_batch.clone().into_batch());
+                self.broadcast_update(ordered_batch.clone().into_batch())
+                    .await?;
                 return Ok(Some(ordered_batch));
             }
         }
@@ -747,7 +748,7 @@ impl Catalog {
             .inspect_err(|error| debug!(?error, "failed to fetch next catalog sequence"))?
         {
             let batch = self.apply_ordered_catalog_batch(&ordered_catalog_batch, permit);
-            self.broadcast_update(batch);
+            self.broadcast_update(batch).await?;
             sequence_number = sequence_number.next();
             if update_until.is_some_and(|max_sequence| sequence_number > max_sequence) {
                 break;
@@ -757,11 +758,13 @@ impl Catalog {
     }
 
     /// Broadcast a `CatalogUpdate` to all subscribed components in the system.
-    fn broadcast_update(&self, update: impl Into<CatalogUpdate>) {
-        if let Err(send_error) = self.channel.send(Arc::new(update.into())) {
-            info!("nothing listening for catalog updates");
-            trace!(?send_error, "nothing listening for catalog updates");
-        }
+    async fn broadcast_update(&self, update: impl Into<CatalogUpdate>) -> Result<()> {
+        self.subscriptions
+            .write()
+            .await
+            .send_update(Arc::new(update.into()))
+            .await?;
+        Ok(())
     }
 
     /// Persist the catalog as a checkpoint in the background if we are at the _n_th sequence
@@ -807,7 +810,7 @@ pub struct CatalogUpdate {
 }
 
 impl CatalogUpdate {
-    pub fn batches(&self) -> impl Iterator<Item = &CatalogBatch> {
+    pub(crate) fn batches(&self) -> impl Iterator<Item = &CatalogBatch> {
         self.batches.iter()
     }
 }
@@ -0,0 +1,161 @@
+use std::sync::Arc;
+
+use anyhow::Context;
+use futures::future::try_join_all;
+use observability_deps::tracing::warn;
+use tokio::sync::{mpsc, oneshot};
+
+use crate::{catalog::CatalogUpdate, log::CatalogBatch};
+
+#[derive(Debug, thiserror::Error)]
+#[error("error in catalog update subscribers: {0:?}")]
+pub struct SubscriptionError(#[from] anyhow::Error);
+
+const CATALOG_SUBSCRIPTION_BUFFER_SIZE: usize = 10_000;
+
+type CatalogUpdateSender = mpsc::Sender<CatalogUpdateMessage>;
+pub type CatalogUpdateReceiver = mpsc::Receiver<CatalogUpdateMessage>;
+
+/// A message containing a set of `CatalogUpdate`s that can be handled by subscribers to the
+/// `Catalog`.
+///
+/// The response is sent in the `Drop` implementation of this type, so that the consumer of these
+/// messages does not need to worry about sending the response back to the catalog on broadcast.
+pub struct CatalogUpdateMessage {
+    update: Arc<CatalogUpdate>,
+    tx: Option<oneshot::Sender<()>>,
+}
+
+impl CatalogUpdateMessage {
+    /// Create a new `CatalogUpdateMessage`
+    fn new(update: Arc<CatalogUpdate>, tx: oneshot::Sender<()>) -> Self {
+        Self {
+            update,
+            tx: Some(tx),
+        }
+    }
+
+    /// Iterate over the `CatalogBatch`s in the update
+    pub fn batches(&self) -> impl Iterator<Item = &CatalogBatch> {
+        self.update.batches()
+    }
+}
+
+impl Drop for CatalogUpdateMessage {
+    /// Send the response to the catalog via the oneshot sender
+    fn drop(&mut self) {
+        if let Some(tx) = self.tx.take() {
+            let _ = tx
+                .send(())
+                .inspect_err(|error| warn!(?error, "unable to send ACK for catalog update"));
+        }
+    }
+}
+
+impl std::fmt::Debug for CatalogUpdateMessage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CatalogUpdateMessage")
+            .field("update", &self.update)
+            .finish()
+    }
+}
+
+#[derive(Debug, Default)]
+pub(crate) struct CatalogSubscriptions {
+    subscriptions: hashbrown::HashMap<Arc<str>, CatalogUpdateSender>,
+}
+
+impl CatalogSubscriptions {
+    /// Subscribe to the catalog for updates.
+    ///
+    /// This allows components in the system to listen for updates made to the catalog
+    /// and handle/apply them as needed. A `subscription_name` is provided that identifies the
+    /// component subscribing.
+    ///
+    /// # Panics
+    ///
+    /// If the provided `subscription_name` has already been used, this will panic.
+    pub(crate) fn subscribe(&mut self, subscription_name: &'static str) -> CatalogUpdateReceiver {
+        let (tx, rx) = mpsc::channel(CATALOG_SUBSCRIPTION_BUFFER_SIZE);
+        assert!(
+            self.subscriptions
+                .insert(Arc::from(subscription_name), tx)
+                .is_none(),
+            "attempted to subscribe to catalog with same component name more than once, \
+            name: {subscription_name}"
+        );
+        rx
+    }
+
+    pub(crate) async fn send_update(
+        &self,
+        update: Arc<CatalogUpdate>,
+    ) -> Result<(), SubscriptionError> {
+        let mut responses = vec![];
+        for (name, sub) in self
+            .subscriptions
+            .iter()
+            .map(|(n, s)| (Arc::clone(n), s.clone()))
+        {
+            let update_cloned = Arc::clone(&update);
+            responses.push(tokio::spawn(async move {
+                let (tx, rx) = oneshot::channel();
+                sub.send(CatalogUpdateMessage::new(update_cloned, tx))
+                    .await
+                    .with_context(|| format!("failed to send update to {name}"))?;
+                rx.await
+                    .with_context(|| format!("failed to receive response from {name}"))?;
+                Ok(())
+            }));
+        }
+
+        try_join_all(responses)
+            .await
+            .context("failed to collect responses from catalog subscribers")?
+            .into_iter()
+            .collect::<Result<Vec<()>, anyhow::Error>>()?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use observability_deps::tracing::debug;
+
+    use crate::{
+        catalog::Catalog,
+        log::{CatalogBatch, FieldDataType},
+    };
+
+    #[test_log::test(tokio::test)]
+    async fn test_catalog_update_sub() {
+        let catalog = Catalog::new_in_memory("cats").await.unwrap();
+        let mut sub = catalog.subscribe_to_updates("test_sub").await;
+        let handle = tokio::spawn(async move {
+            let mut n_updates = 0;
+            while let Some(update) = sub.recv().await {
+                debug!(?update, "got an update");
+                for b in update.batches() {
+                    match b {
+                        CatalogBatch::Node(_) => (),
+                        CatalogBatch::Database(_) => (),
+                    }
+                }
+                n_updates += 1;
+            }
+            n_updates
+        });
+
+        catalog.create_database("foo").await.unwrap();
+        catalog
+            .create_table("foo", "bar", &["tag"], &[("field", FieldDataType::String)])
+            .await
+            .unwrap();
+
+        // drop the catalog so the channel closes and the handle above doesn't hang...
+        drop(catalog);
+
+        let n_updates = handle.await.unwrap();
+        assert_eq!(2, n_updates);
+    }
+}
@@ -3,7 +3,7 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use schema::InfluxColumnType;
 
-use crate::{catalog::Catalog, object_store::ObjectStoreCatalogError};
+use crate::{catalog::Catalog, channel::SubscriptionError, object_store::ObjectStoreCatalogError};
 
 #[derive(Debug, thiserror::Error)]
 pub enum CatalogError {
@@ -92,6 +92,9 @@ pub enum CatalogError {
         existing: String,
     },
 
+    #[error("catalog subscription error: {0}")]
+    Subscription(#[from] SubscriptionError),
+
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 
@@ -1,4 +1,5 @@
 pub mod catalog;
+pub mod channel;
 pub mod error;
 pub mod id;
 pub mod log;
@@ -8,7 +8,8 @@ use bytes::Bytes;
 use hashbrown::HashMap;
 use hyper::{Body, Response};
 use influxdb3_catalog::CatalogError;
-use influxdb3_catalog::catalog::{Catalog, CatalogBroadcastReceiver};
+use influxdb3_catalog::catalog::Catalog;
+use influxdb3_catalog::channel::CatalogUpdateReceiver;
 use influxdb3_catalog::log::{
     CatalogBatch, DatabaseCatalogOp, DeleteTriggerLog, PluginType, TriggerDefinition,
     TriggerIdentifier, TriggerSpecificationDefinition, ValidPluginFilename,
@@ -29,7 +30,6 @@ use std::any::Any;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
-use tokio::sync::broadcast::error::RecvError;
 use tokio::sync::oneshot::Receiver;
 use tokio::sync::{RwLock, mpsc, oneshot};
 
@@ -205,7 +205,7 @@ impl PluginChannels {
 }
 
 impl ProcessingEngineManagerImpl {
-    pub fn new(
+    pub async fn new(
         environment: ProcessingEngineEnvironmentManager,
         catalog: Arc<Catalog>,
         node_id: impl Into<Arc<str>>,
@@ -225,7 +225,7 @@ impl ProcessingEngineManagerImpl {
         }
     }
 
-    let catalog_sub = catalog.subscribe_to_updates();
+    let catalog_sub = catalog.subscribe_to_updates("processing_engine").await;
 
     let cache = Arc::new(Mutex::new(CacheStore::new(
         Arc::clone(&time_provider),
@@ -668,89 +668,74 @@ pub(crate) struct Request {
 
 fn background_catalog_update(
     processing_engine_manager: Arc<ProcessingEngineManagerImpl>,
-    mut subscription: CatalogBroadcastReceiver,
+    mut subscription: CatalogUpdateReceiver,
 ) -> tokio::task::JoinHandle<()> {
     tokio::spawn(async move {
-        loop {
-            match subscription.recv().await {
-                Ok(catalog_update) => {
-                    for batch in catalog_update
-                        .batches()
-                        .filter_map(CatalogBatch::as_database)
-                    {
-                        for op in batch.ops.iter() {
-                            let processing_engine_manager = Arc::clone(&processing_engine_manager);
-                            match op {
-                                DatabaseCatalogOp::CreateTrigger(TriggerDefinition {
-                                    trigger_name,
-                                    database_name,
-                                    disabled,
-                                    ..
-                                }) => {
-                                    if !disabled {
-                                        if let Err(error) = processing_engine_manager
-                                            .run_trigger(database_name, trigger_name)
-                                            .await
-                                        {
-                                            error!(?error, "failed to run the created trigger");
-                                        }
-                                    }
+        while let Some(catalog_update) = subscription.recv().await {
+            for batch in catalog_update
+                .batches()
+                .filter_map(CatalogBatch::as_database)
+            {
+                for op in batch.ops.iter() {
+                    let processing_engine_manager = Arc::clone(&processing_engine_manager);
+                    match op {
+                        DatabaseCatalogOp::CreateTrigger(TriggerDefinition {
+                            trigger_name,
+                            database_name,
+                            disabled,
+                            ..
+                        }) => {
+                            if !disabled {
+                                if let Err(error) = processing_engine_manager
+                                    .run_trigger(database_name, trigger_name)
+                                    .await
+                                {
+                                    error!(?error, "failed to run the created trigger");
                                 }
-                                DatabaseCatalogOp::EnableTrigger(TriggerIdentifier {
-                                    db_name,
-                                    trigger_name,
-                                    ..
-                                }) => {
-                                    if let Err(error) = processing_engine_manager
-                                        .run_trigger(db_name, trigger_name)
-                                        .await
-                                    {
-                                        error!(?error, "failed to run the trigger");
-                                    }
-                                }
-                                DatabaseCatalogOp::DeleteTrigger(DeleteTriggerLog {
-                                    trigger_name,
-                                    force: true,
-                                    ..
-                                }) => {
-                                    if let Err(error) = processing_engine_manager
-                                        .stop_trigger(&batch.database_name, trigger_name)
-                                        .await
-                                    {
-                                        error!(?error, "failed to disable the trigger");
-                                    }
-                                }
-                                DatabaseCatalogOp::DisableTrigger(TriggerIdentifier {
-                                    db_name,
-                                    trigger_name,
-                                    ..
-                                }) => {
-                                    if let Err(error) = processing_engine_manager
-                                        .stop_trigger(db_name, trigger_name)
-                                        .await
-                                    {
-                                        error!(?error, "failed to disable the trigger");
-                                    }
-                                }
-                                // NOTE(trevor/catalog-refactor): it is not clear that any other operation needs to be
-                                // handled, based on the existing code, but we could potentially
-                                // handle database deletion, trigger creation/deletion/enable here
-                                _ => (),
-                            }
+                            }
+                        }
+                        DatabaseCatalogOp::EnableTrigger(TriggerIdentifier {
+                            db_name,
+                            trigger_name,
+                            ..
+                        }) => {
+                            if let Err(error) = processing_engine_manager
+                                .run_trigger(db_name, trigger_name)
+                                .await
+                            {
+                                error!(?error, "failed to run the trigger");
+                            }
+                        }
+                        DatabaseCatalogOp::DeleteTrigger(DeleteTriggerLog {
+                            trigger_name,
+                            force: true,
+                            ..
+                        }) => {
+                            if let Err(error) = processing_engine_manager
+                                .stop_trigger(&batch.database_name, trigger_name)
+                                .await
+                            {
+                                error!(?error, "failed to disable the trigger");
+                            }
+                        }
+                        DatabaseCatalogOp::DisableTrigger(TriggerIdentifier {
+                            db_name,
+                            trigger_name,
+                            ..
+                        }) => {
+                            if let Err(error) = processing_engine_manager
+                                .stop_trigger(db_name, trigger_name)
+                                .await
+                            {
+                                error!(?error, "failed to disable the trigger");
+                            }
                         }
+                        // NOTE(trevor/catalog-refactor): it is not clear that any other operation needs to be
+                        // handled, based on the existing code, but we could potentially
+                        // handle database deletion, trigger creation/deletion/enable here
+                        _ => (),
                     }
                 }
-                Err(RecvError::Closed) => break,
-                Err(RecvError::Lagged(num_messages_skipped)) => {
-                    // NOTE(trevor/catalog-refactor): in this case, we would need to re-initialize the proc eng manager
-                    // from the catalog, if possible; but, it may be more desireable to not have this
-                    // situation be possible at all.. The use of a broadcast channel should only
-                    // be temporary, so this particular error variant should go away in future
-                    warn!(
-                        num_messages_skipped,
-                        "processing engine manager catalog subscription is lagging"
-                    );
-                }
-            }
+            }
         }
     })
@@ -999,11 +984,14 @@ mod tests {
             .await
             .unwrap(),
         );
-        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
+        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
+            .await
+            .unwrap();
         let distinct_cache = DistinctCacheProvider::new_from_catalog(
             Arc::clone(&time_provider),
            Arc::clone(&catalog),
        )
+        .await
        .unwrap();
        let wbuf = WriteBufferImpl::new(WriteBufferImplArgs {
            persister,
@@ -1049,7 +1037,8 @@ def process_writes(influxdb3_local, table_batches, args=None):
                qe,
                time_provider,
                sys_event_store,
-            ),
+            )
+            .await,
            file,
        )
    }
@@ -786,11 +786,14 @@ mod tests {
            influxdb3_write::write_buffer::WriteBufferImplArgs {
                persister: Arc::clone(&persister),
                catalog: Arc::clone(&catalog),
-                last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
+                last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog))
+                    .await
+                    .unwrap(),
                distinct_cache: DistinctCacheProvider::new_from_catalog(
                    Arc::clone(&time_provider) as _,
                    Arc::clone(&catalog),
                )
+                .await
                .unwrap(),
                time_provider: Arc::clone(&time_provider) as _,
                executor: Arc::clone(&exec),
@@ -847,7 +850,8 @@ mod tests {
            Arc::clone(&query_executor) as _,
            Arc::clone(&time_provider) as _,
            sys_events_store,
-        );
+        )
+        .await;
 
        let server = ServerBuilder::new(common_state)
            .write_buffer(Arc::clone(&write_buffer))
@@ -825,11 +825,14 @@ mod tests {
        let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs {
            persister,
            catalog: Arc::clone(&catalog),
-            last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
+            last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog))
+                .await
+                .unwrap(),
            distinct_cache: DistinctCacheProvider::new_from_catalog(
                Arc::<MockProvider>::clone(&time_provider),
                Arc::clone(&catalog),
            )
+            .await
            .unwrap(),
            time_provider: Arc::<MockProvider>::clone(&time_provider),
            executor: Arc::clone(&exec),
@@ -740,11 +740,14 @@ mod tests {
            "test_host",
            Arc::clone(&time_provider),
        ));
-        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
+        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
+            .await
+            .unwrap();
        let distinct_cache = DistinctCacheProvider::new_from_catalog(
            Arc::clone(&time_provider),
            Arc::clone(&catalog),
        )
+        .await
        .unwrap();
        let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs {
            persister: Arc::clone(&persister),
@@ -838,11 +841,14 @@ mod tests {
            .await
            .unwrap(),
        );
-        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
+        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
+            .await
+            .unwrap();
        let distinct_cache = DistinctCacheProvider::new_from_catalog(
            Arc::clone(&time_provider),
            Arc::clone(&catalog),
        )
+        .await
        .unwrap();
        let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs {
            persister,
@@ -925,12 +931,14 @@ mod tests {
            .await
            .unwrap(),
        );
-        let last_cache =
-            LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
+        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
+            .await
+            .unwrap();
        let distinct_cache = DistinctCacheProvider::new_from_catalog(
            Arc::clone(&time_provider),
            Arc::clone(&catalog),
        )
+        .await
        .unwrap();
        WriteBufferImpl::new(WriteBufferImplArgs {
            persister: Arc::clone(&wbuf.persister),
@@ -1180,11 +1188,14 @@ mod tests {
            .await
            .unwrap(),
        );
-        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
+        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
+            .await
+            .unwrap();
        let distinct_cache = DistinctCacheProvider::new_from_catalog(
            Arc::clone(&time_provider),
            Arc::clone(&catalog),
        )
+        .await
        .unwrap();
        let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs {
            persister: Arc::clone(&write_buffer.persister),
@@ -3055,11 +3066,14 @@ mod tests {
            .await
            .unwrap(),
        );
-        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
+        let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
+            .await
+            .unwrap();
        let distinct_cache = DistinctCacheProvider::new_from_catalog(
            Arc::clone(&time_provider),
            Arc::clone(&catalog),
        )
+        .await
        .unwrap();
        let wbuf = WriteBufferImpl::new(WriteBufferImplArgs {
            persister,
@@ -667,11 +667,14 @@ mod tests {
        executor: Arc::clone(&exec),
        catalog: Arc::clone(&catalog),
        persister: Arc::clone(&persister),
-        last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
+        last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog))
+            .await
+            .unwrap(),
        distinct_cache_provider: DistinctCacheProvider::new_from_catalog(
            Arc::clone(&time_provider),
            Arc::clone(&catalog),
        )
+        .await
        .unwrap(),
        persisted_files: Arc::new(PersistedFiles::new()),
        parquet_cache: None,