diff --git a/influxdb_iox/src/clap_blocks/catalog_dsn.rs b/influxdb_iox/src/clap_blocks/catalog_dsn.rs
index 2bc5bc0404..b239090ac8 100644
--- a/influxdb_iox/src/clap_blocks/catalog_dsn.rs
+++ b/influxdb_iox/src/clap_blocks/catalog_dsn.rs
@@ -50,7 +50,7 @@ impl CatalogDsnConfig {
     pub async fn get_catalog(
         &self,
         app_name: &'static str,
-        metrics: Option<Arc<metric::Registry>>,
+        metrics: Arc<metric::Registry>,
     ) -> Result<Arc<dyn Catalog>, Error> {
         let catalog = match self.catalog_type_ {
             CatalogType::Postgres => Arc::new(
@@ -59,13 +59,13 @@ impl CatalogDsnConfig {
                     iox_catalog::postgres::SCHEMA_NAME,
                     self.dsn.as_ref().context(ConnectionStringRequiredSnafu)?,
                     self.max_catalog_connections,
-                    metrics.unwrap_or_default(),
+                    metrics,
                 )
                 .await
                 .context(CatalogSnafu)?,
             ) as Arc<dyn Catalog>,
             CatalogType::Memory => {
-                let mem = MemCatalog::new();
+                let mem = MemCatalog::new(metrics);
 
                 let mut txn = mem.start_transaction().await.context(CatalogSnafu)?;
                 create_or_get_default_records(2, txn.deref_mut())
diff --git a/influxdb_iox/src/commands/catalog.rs b/influxdb_iox/src/commands/catalog.rs
index 52ed839566..2138a66b8e 100644
--- a/influxdb_iox/src/commands/catalog.rs
+++ b/influxdb_iox/src/commands/catalog.rs
@@ -1,5 +1,7 @@
 //! This module implements the `catalog` CLI command
 
+use std::sync::Arc;
+
 use crate::clap_blocks::catalog_dsn::CatalogDsnConfig;
 use thiserror::Error;
 
@@ -51,7 +53,8 @@ enum Command {
 pub async fn command(config: Config) -> Result<(), Error> {
     match config.command {
         Command::Setup(command) => {
-            let catalog = command.catalog_dsn.get_catalog("cli", None).await?;
+            let metrics = Arc::new(metric::Registry::new());
+            let catalog = command.catalog_dsn.get_catalog("cli", metrics).await?;
             catalog.setup().await?;
             println!("OK");
         }
diff --git a/influxdb_iox/src/commands/catalog/topic.rs b/influxdb_iox/src/commands/catalog/topic.rs
index 0f17561f01..a809300d29 100644
--- a/influxdb_iox/src/commands/catalog/topic.rs
+++ b/influxdb_iox/src/commands/catalog/topic.rs
@@ -1,5 +1,7 @@
 //! This module implements the `catalog topic` CLI subcommand
 
+use std::sync::Arc;
+
 use thiserror::Error;
 
 use crate::clap_blocks::catalog_dsn::CatalogDsnConfig;
@@ -46,7 +48,8 @@ enum Command {
 pub async fn command(config: Config) -> Result<(), Error> {
     match config.command {
         Command::Update(update) => {
-            let catalog = update.catalog_dsn.get_catalog("cli", None).await?;
+            let metrics = Arc::new(metric::Registry::new());
+            let catalog = update.catalog_dsn.get_catalog("cli", metrics).await?;
             let mut repos = catalog.repositories().await;
             let topic = repos.kafka_topics().create_or_get(&update.db_name).await?;
             println!("{}", topic.id);
diff --git a/influxdb_iox/src/commands/run/compactor.rs b/influxdb_iox/src/commands/run/compactor.rs
index 0dcb4d0178..7c0974a798 100644
--- a/influxdb_iox/src/commands/run/compactor.rs
+++ b/influxdb_iox/src/commands/run/compactor.rs
@@ -62,7 +62,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
     let metric_registry: Arc<metric::Registry> = Default::default();
     let catalog = config
         .catalog_dsn
-        .get_catalog("compactor", Some(Arc::clone(&metric_registry)))
+        .get_catalog("compactor", Arc::clone(&metric_registry))
         .await?;
 
     let object_store = Arc::new(
diff --git a/influxdb_iox/src/commands/run/ingester.rs b/influxdb_iox/src/commands/run/ingester.rs
index 153e524c85..d310b3c14e 100644
--- a/influxdb_iox/src/commands/run/ingester.rs
+++ b/influxdb_iox/src/commands/run/ingester.rs
@@ -151,7 +151,7 @@ pub async fn command(config: Config) -> Result<()> {
 
     let catalog = config
         .catalog_dsn
-        .get_catalog("ingester", Some(Arc::clone(&metric_registry)))
+        .get_catalog("ingester", Arc::clone(&metric_registry))
         .await?;
 
     let mut txn = catalog.start_transaction().await?;
diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs
index 940a3b0974..d7729987aa 100644
--- a/influxdb_iox/src/commands/run/querier.rs
+++ b/influxdb_iox/src/commands/run/querier.rs
@@ -63,7 +63,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
     let metric_registry: Arc<metric::Registry> = Default::default();
     let catalog = config
         .catalog_dsn
-        .get_catalog("querier", Some(Arc::clone(&metric_registry)))
+        .get_catalog("querier", Arc::clone(&metric_registry))
         .await?;
 
     let object_store = Arc::new(
diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs
index 1097fa8b1e..3dbc711dbc 100644
--- a/influxdb_iox/src/commands/run/router2.rs
+++ b/influxdb_iox/src/commands/run/router2.rs
@@ -90,7 +90,7 @@ pub async fn command(config: Config) -> Result<()> {
 
     let catalog = config
         .catalog_dsn
-        .get_catalog("router2", Some(Arc::clone(&metrics)))
+        .get_catalog("router2", Arc::clone(&metrics))
         .await?;
 
     // Initialise the sharded write buffer and instrument it with DML handler
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 1f39f16d34..01db3b0c7d 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -1475,7 +1475,8 @@ mod tests {
 
     #[tokio::test]
     async fn buffer_write_updates_lifecycle_manager_indicates_pause() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
         let mut repos = catalog.repositories().await;
         let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap();
         let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
@@ -1523,7 +1524,7 @@ mod tests {
         let pause_size = w1.size() + 1;
         let manager = LifecycleManager::new(
             LifecycleConfig::new(pause_size, 0, 0, Duration::from_secs(1)),
-            Arc::new(metric::Registry::new()),
+            metrics,
             Arc::new(SystemProvider::new()),
         );
         let should_pause = data
@@ -1540,7 +1541,8 @@ mod tests {
 
     #[tokio::test]
     async fn persist() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
         let mut repos = catalog.repositories().await;
         let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap();
         let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
@@ -1608,7 +1610,7 @@ mod tests {
 
         let manager = LifecycleManager::new(
             LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)),
-            Arc::new(metric::Registry::new()),
+            metrics,
             Arc::new(SystemProvider::new()),
         );
 
@@ -1895,7 +1897,8 @@ mod tests {
 
     #[tokio::test]
     async fn buffer_operation_ignores_already_persisted_data() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
         let mut repos = catalog.repositories().await;
         let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap();
         let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
@@ -1965,7 +1968,7 @@ mod tests {
 
         let manager = LifecycleManager::new(
             LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)),
-            Arc::new(metric::Registry::new()),
+            metrics,
             Arc::new(SystemProvider::new()),
         );
         let exec = Executor::new(1);
diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs
index 885998a2b0..96f9ace182 100644
--- a/ingester/src/handler.rs
+++ b/ingester/src/handler.rs
@@ -673,7 +673,8 @@ mod tests {
 
     impl TestIngester {
         async fn new() -> Self {
-            let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+            let metrics: Arc<metric::Registry> = Default::default();
+            let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
 
             let mut txn = catalog.start_transaction().await.unwrap();
             let kafka_topic = txn.kafka_topics().create_or_get("whatevs").await.unwrap();
@@ -699,7 +700,6 @@ mod tests {
             let reading: Arc<dyn WriteBufferReading> =
                 Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap());
             let object_store = Arc::new(ObjectStore::new_in_memory());
-            let metrics: Arc<metric::Registry> = Default::default();
 
             let lifecycle_config =
                 LifecycleConfig::new(1000000, 1000, 1000, Duration::from_secs(10));
diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs
index dde15ddeb7..b0c89a7fa4 100644
--- a/ingester/src/test_util.rs
+++ b/ingester/src/test_util.rs
@@ -607,7 +607,8 @@ bitflags! {
 /// you will be able to produce 14 scenarios by calling it in 2 loops
 pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
     // Whatever data because they won't be used in the tests
-    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+    let metrics: Arc<metric::Registry> = Default::default();
+    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
     let object_store = Arc::new(object_store::ObjectStore::new_in_memory());
     let exec = query::exec::Executor::new(1);
 
@@ -655,7 +656,8 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
 pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
     // Whatever data because they won't be used in the tests
-    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+    let metrics: Arc<metric::Registry> = Default::default();
+    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
     let object_store = Arc::new(object_store::ObjectStore::new_in_memory());
     let exec = query::exec::Executor::new(1);
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 9487dec711..ba9c056bf0 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -6,6 +6,7 @@ use schema::{InfluxColumnType, InfluxFieldType};
 use snafu::{OptionExt, Snafu};
 use std::convert::TryFrom;
 use std::fmt::Formatter;
+use std::sync::Arc;
 use std::{collections::BTreeMap, fmt::Debug};
 use uuid::Uuid;
 
@@ -317,6 +318,9 @@ pub trait Catalog: Send + Sync + Debug {
 
     /// Access the repositories w/o a transaction scope.
     async fn repositories(&self) -> Box<dyn RepoCollection>;
+
+    /// Get metric registry associated w/ this catalog.
+    fn metrics(&self) -> Arc<metric::Registry>;
 }
 
 /// Secret module for [sealed traits].
@@ -1085,6 +1089,7 @@ pub struct ProcessedTombstone {
 #[cfg(test)]
 pub(crate) mod test_helpers {
     use ::test_helpers::{assert_contains, tracing::TracingCapture};
+    use metric::{Attributes, Metric, U64Histogram};
 
     use crate::add_parquet_file_with_tombstones;
@@ -1105,6 +1110,17 @@ pub(crate) mod test_helpers {
         test_add_parquet_file_with_tombstones(Arc::clone(&catalog)).await;
         test_txn_isolation(Arc::clone(&catalog)).await;
         test_txn_drop(Arc::clone(&catalog)).await;
+
+        let metrics = catalog.metrics();
+        assert_metric_hit(&*metrics, "kafka_create_or_get");
+        assert_metric_hit(&*metrics, "query_create_or_get");
+        assert_metric_hit(&*metrics, "namespace_create");
+        assert_metric_hit(&*metrics, "table_create_or_get");
+        assert_metric_hit(&*metrics, "column_create_or_get");
+        assert_metric_hit(&*metrics, "sequencer_create_or_get");
+        assert_metric_hit(&*metrics, "partition_create_or_get");
+        assert_metric_hit(&*metrics, "tombstone_create_or_get");
+        assert_metric_hit(&*metrics, "parquet_create");
     }
 
     async fn test_setup(catalog: Arc<dyn Catalog>) {
@@ -2091,4 +2107,16 @@ pub(crate) mod test_helpers {
         assert!(topic.is_none());
         txn.abort().await.unwrap();
     }
+
+    fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
+        let histogram = metrics
+            .get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
+            .expect("failed to read metric")
+            .get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
+            .expect("failed to get observer")
+            .fetch();
+
+        let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
+        assert!(hit_count > 1, "metric did not record any calls");
+    }
 }
diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs
index 1d3da9e187..4810d92013 100644
--- a/iox_catalog/src/lib.rs
+++ b/iox_catalog/src/lib.rs
@@ -273,11 +273,15 @@ mod tests {
     #[tokio::test]
     async fn test_validate_or_insert_schema() {
         use crate::interface::Catalog;
-        use std::ops::DerefMut;
+        use std::{
+            ops::DerefMut,
+            sync::Arc,
+        };
         use pretty_assertions::assert_eq;
 
         const NAMESPACE_NAME: &str = "bananas";
 
-        let catalog = MemCatalog::new();
+        let metrics = Arc::new(metric::Registry::default());
+        let catalog = MemCatalog::new(metrics);
         let mut txn = catalog.start_transaction().await.unwrap();
         let (kafka_topic, query_pool, _) =
             create_or_get_default_records(2, txn.deref_mut()).await.unwrap();
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index e2637c8f55..000acc1316 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -1,14 +1,17 @@
 //! This module implements an in-memory implementation of the iox_catalog interface. It can be
 //! used for testing or for an IOx designed to run without catalog persistence.
 
-use crate::interface::{
-    sealed::TransactionFinalize, Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error,
-    KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId,
-    NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId,
-    PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool,
-    QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId,
-    SequencerRepo, Table, TableId, TablePersistInfo, TableRepo, Timestamp, Tombstone, TombstoneId,
-    TombstoneRepo, Transaction,
+use crate::{
+    interface::{
+        sealed::TransactionFinalize, Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error,
+        KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId,
+        NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId,
+        PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool,
+        QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId,
+        SequencerRepo, Table, TableId, TablePersistInfo, TableRepo, Timestamp, Tombstone,
+        TombstoneId, TombstoneRepo, Transaction,
+    },
+    metrics::MetricDecorator,
 };
 use async_trait::async_trait;
 use observability_deps::tracing::warn;
@@ -20,15 +23,18 @@ use uuid::Uuid;
 
 /// In-memory catalog that implements the `RepoCollection` and individual repo traits from
 /// the catalog interface.
-#[derive(Default)]
 pub struct MemCatalog {
+    metrics: Arc<metric::Registry>,
     collections: Arc<Mutex<MemCollections>>,
 }
 
 impl MemCatalog {
     /// return new initialized `MemCatalog`
-    pub fn new() -> Self {
-        Self::default()
+    pub fn new(metrics: Arc<metric::Registry>) -> Self {
+        Self {
+            metrics,
+            collections: Default::default(),
+        }
     }
 }
@@ -101,20 +107,30 @@ impl Catalog for MemCatalog {
     async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error> {
         let guard = Arc::clone(&self.collections).lock_owned().await;
         let stage = guard.clone();
-        Ok(Box::new(MemTxn {
-            inner: MemTxnInner::Txn {
-                guard,
-                stage,
-                finalized: false,
+        Ok(Box::new(MetricDecorator::new(
+            MemTxn {
+                inner: MemTxnInner::Txn {
+                    guard,
+                    stage,
+                    finalized: false,
+                },
             },
-        }))
+            Arc::clone(&self.metrics),
+        )))
     }
 
     async fn repositories(&self) -> Box<dyn RepoCollection> {
         let collections = Arc::clone(&self.collections).lock_owned().await;
-        Box::new(MemTxn {
-            inner: MemTxnInner::NoTxn { collections },
-        })
+        Box::new(MetricDecorator::new(
+            MemTxn {
+                inner: MemTxnInner::NoTxn { collections },
+            },
+            Arc::clone(&self.metrics),
+        ))
+    }
+
+    fn metrics(&self) -> Arc<metric::Registry> {
+        Arc::clone(&self.metrics)
     }
 }
@@ -850,6 +866,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_catalog() {
-        crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new())).await;
+        let metrics = Arc::new(metric::Registry::default());
+        crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new(metrics))).await;
     }
 }
diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index b9b7cb95a8..f6cedc3219 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -231,6 +231,10 @@ impl Catalog for PostgresCatalog {
             Arc::clone(&self.metrics),
         ))
     }
+
+    fn metrics(&self) -> Arc<metric::Registry> {
+        Arc::clone(&self.metrics)
+    }
 }
 
 /// Creates a new [`sqlx::Pool`] from a database config and an explicit DSN.
@@ -1231,7 +1235,6 @@ mod tests {
 
     use super::*;
-    use metric::{Attributes, Metric, U64Histogram};
     use rand::Rng;
     use std::{env, ops::DerefMut, sync::Arc};
     use std::{io::Write, time::Instant};
@@ -1284,18 +1287,6 @@ mod tests {
         };
     }
 
-    fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
-        let histogram = metrics
-            .get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
-            .expect("failed to read metric")
-            .get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
-            .expect("failed to get observer")
-            .fetch();
-
-        let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
-        assert!(hit_count > 1, "metric did not record any calls");
-    }
-
     async fn setup_db() -> PostgresCatalog {
         // create a random schema for this particular pool
         let schema_name = {
@@ -1347,20 +1338,9 @@ mod tests {
         maybe_skip_integration!();
 
         let postgres = setup_db().await;
-        let metrics = Arc::clone(&postgres.metrics);
         let postgres: Arc<dyn Catalog> = Arc::new(postgres);
 
         crate::interface::test_helpers::test_catalog(postgres).await;
-
-        assert_metric_hit(&*metrics, "kafka_create_or_get");
-        assert_metric_hit(&*metrics, "query_create_or_get");
-        assert_metric_hit(&*metrics, "namespace_create");
-        assert_metric_hit(&*metrics, "table_create_or_get");
-        assert_metric_hit(&*metrics, "column_create_or_get");
-        assert_metric_hit(&*metrics, "sequencer_create_or_get");
-        assert_metric_hit(&*metrics, "partition_create_or_get");
-        assert_metric_hit(&*metrics, "tombstone_create_or_get");
-        assert_metric_hit(&*metrics, "parquet_create");
     }
 
     #[tokio::test]
diff --git a/querier/src/chunk.rs b/querier/src/chunk.rs
index 37b4250dfa..ca270f7bf2 100644
--- a/querier/src/chunk.rs
+++ b/querier/src/chunk.rs
@@ -351,9 +351,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_create_record() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
-        let object_store = Arc::new(ObjectStore::new_in_memory());
         let metric_registry = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
+        let object_store = Arc::new(ObjectStore::new_in_memory());
         let time_provider = Arc::new(MockProvider::new(now()));
 
         let adapter = ParquetChunkAdapter::new(
diff --git a/querier/src/database.rs b/querier/src/database.rs
index b50904031a..38ac3823f2 100644
--- a/querier/src/database.rs
+++ b/querier/src/database.rs
@@ -159,9 +159,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_sync() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
-        let object_store = Arc::new(ObjectStore::new_in_memory());
         let metric_registry = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
+        let object_store = Arc::new(ObjectStore::new_in_memory());
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp(0, 0)));
 
         let db = QuerierDatabase::new(
diff --git a/router2/benches/e2e.rs b/router2/benches/e2e.rs
index a8d9e23bd2..c04fdcd478 100644
--- a/router2/benches/e2e.rs
+++ b/router2/benches/e2e.rs
@@ -53,7 +53,8 @@ fn e2e_benchmarks(c: &mut Criterion) {
     let mut group = c.benchmark_group("e2e");
 
     let delegate = {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
         let ns_cache = Arc::new(ShardedCache::new(
             iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
         ));
diff --git a/router2/src/dml_handlers/ns_autocreation.rs b/router2/src/dml_handlers/ns_autocreation.rs
index e43a0d8b14..258f920a6e 100644
--- a/router2/src/dml_handlers/ns_autocreation.rs
+++ b/router2/src/dml_handlers/ns_autocreation.rs
@@ -160,7 +160,8 @@ mod tests {
             },
         );
 
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
 
         let creator = NamespaceAutocreation::new(
             Arc::clone(&catalog),
@@ -195,7 +196,8 @@ mod tests {
         let ns = DatabaseName::try_from("bananas").unwrap();
         let cache = Arc::new(MemoryNamespaceCache::default());
 
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
 
         let creator = NamespaceAutocreation::new(
             Arc::clone(&catalog),
diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs
index 8f1b0e7413..b7a4e31db5 100644
--- a/router2/src/dml_handlers/schema_validation.rs
+++ b/router2/src/dml_handlers/schema_validation.rs
@@ -226,7 +226,8 @@ mod tests {
     /// Initialise an in-memory [`MemCatalog`] and create a single namespace
    /// named [`NAMESPACE`].
     async fn create_catalog() -> Arc<dyn Catalog> {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
         let mut repos = catalog.repositories().await;
 
         repos
diff --git a/router2/tests/http.rs b/router2/tests/http.rs
index 0bfd284569..9094f0d985 100644
--- a/router2/tests/http.rs
+++ b/router2/tests/http.rs
@@ -89,7 +89,7 @@ impl TestContext {
                 .collect::<JumpHash<_>>(),
         );
 
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
         let ns_cache = Arc::new(ShardedCache::new(
             iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
         ));
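
Net effect of the patch: a `metric::Registry` is now mandatory for every catalog, both backends expose it through the new `Catalog::metrics()` method, and `MemCatalog` wraps its transactions and repositories in the same `MetricDecorator` that the Postgres catalog evidently already used, which is what lets the shared `test_helpers::test_catalog()` assert on `catalog_op_duration_ms` for both backends. Below is a minimal sketch of the resulting call pattern, pieced together from the APIs visible in this diff; the `demo` function and the "example" topic name are illustrative only, and the snippet is untested:

    use std::sync::Arc;

    use iox_catalog::{interface::Catalog, mem::MemCatalog};
    use metric::{Attributes, Metric, U64Histogram};

    async fn demo() {
        // One registry, shared by the catalog and the rest of the process.
        let metrics = Arc::new(metric::Registry::new());
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));

        // Every repo call now passes through the MetricDecorator and records
        // an "op"/"result" sample in the catalog_op_duration_ms histogram.
        let mut repos = catalog.repositories().await;
        repos.kafka_topics().create_or_get("example").await.unwrap();

        // Read the samples back the same way assert_metric_hit() does.
        let histogram = catalog
            .metrics()
            .get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
            .expect("metric not registered")
            .get_observer(&Attributes::from(&[
                ("op", "kafka_create_or_get"),
                ("result", "success"),
            ]))
            .expect("observer not found")
            .fetch();

        let calls = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
        assert!(calls > 0);
    }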