feat: offer metrics for in-mem catalog (#3876)

This can be quite helpful for testing certain caching behavior w/o writing
yet another abstraction layer.
Marco Neumann 2022-03-01 11:33:54 +00:00 committed by GitHub
parent 0e0dc500f6
commit 48722783f9
20 changed files with 119 additions and 75 deletions
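
For illustration (not part of the commit): the in-memory catalog now takes its metric registry up front and exposes it through the `Catalog` trait, so a test can drive the catalog and then assert against the same registry it handed in. A minimal sketch, assuming the `iox_catalog::mem` module path and a made-up topic name:

use std::sync::Arc;
use iox_catalog::{interface::Catalog, mem::MemCatalog};

async fn demo() {
    // Build the catalog around an explicit, shared registry.
    let metrics = Arc::new(metric::Registry::new());
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));

    // Repo calls are now instrumented; durations land in the registry
    // as catalog_op_duration_ms observations.
    let mut repos = catalog.repositories().await;
    repos.kafka_topics().create_or_get("example_topic").await.unwrap();

    // The registry is reachable from the trait object as well, which is
    // what the shared test helper uses for its metric assertions.
    assert!(Arc::ptr_eq(&catalog.metrics(), &metrics));
}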

View File

@@ -50,7 +50,7 @@ impl CatalogDsnConfig {
     pub async fn get_catalog(
         &self,
         app_name: &'static str,
-        metrics: Option<Arc<metric::Registry>>,
+        metrics: Arc<metric::Registry>,
     ) -> Result<Arc<dyn Catalog>, Error> {
         let catalog = match self.catalog_type_ {
             CatalogType::Postgres => Arc::new(
@@ -59,13 +59,13 @@ impl CatalogDsnConfig {
                     iox_catalog::postgres::SCHEMA_NAME,
                     self.dsn.as_ref().context(ConnectionStringRequiredSnafu)?,
                     self.max_catalog_connections,
-                    metrics.unwrap_or_default(),
+                    metrics,
                 )
                 .await
                 .context(CatalogSnafu)?,
             ) as Arc<dyn Catalog>,
             CatalogType::Memory => {
-                let mem = MemCatalog::new();
+                let mem = MemCatalog::new(metrics);
                 let mut txn = mem.start_transaction().await.context(CatalogSnafu)?;
                 create_or_get_default_records(2, txn.deref_mut())

View File

@@ -1,5 +1,7 @@
 //! This module implements the `catalog` CLI command
+use std::sync::Arc;
+
 use crate::clap_blocks::catalog_dsn::CatalogDsnConfig;
 use thiserror::Error;
@@ -51,7 +53,8 @@ enum Command {
 pub async fn command(config: Config) -> Result<(), Error> {
     match config.command {
         Command::Setup(command) => {
-            let catalog = command.catalog_dsn.get_catalog("cli", None).await?;
+            let metrics = Arc::new(metric::Registry::new());
+            let catalog = command.catalog_dsn.get_catalog("cli", metrics).await?;
             catalog.setup().await?;
             println!("OK");
         }

View File

@@ -1,5 +1,7 @@
 //! This module implements the `catalog topic` CLI subcommand
+use std::sync::Arc;
+
 use thiserror::Error;
 use crate::clap_blocks::catalog_dsn::CatalogDsnConfig;
@@ -46,7 +48,8 @@ enum Command {
 pub async fn command(config: Config) -> Result<(), Error> {
     match config.command {
         Command::Update(update) => {
-            let catalog = update.catalog_dsn.get_catalog("cli", None).await?;
+            let metrics = Arc::new(metric::Registry::new());
+            let catalog = update.catalog_dsn.get_catalog("cli", metrics).await?;
             let mut repos = catalog.repositories().await;
             let topic = repos.kafka_topics().create_or_get(&update.db_name).await?;
             println!("{}", topic.id);

View File

@@ -62,7 +62,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
     let metric_registry: Arc<metric::Registry> = Default::default();
     let catalog = config
         .catalog_dsn
-        .get_catalog("compactor", Some(Arc::clone(&metric_registry)))
+        .get_catalog("compactor", Arc::clone(&metric_registry))
         .await?;
     let object_store = Arc::new(

View File

@@ -151,7 +151,7 @@ pub async fn command(config: Config) -> Result<()> {
     let catalog = config
         .catalog_dsn
-        .get_catalog("ingester", Some(Arc::clone(&metric_registry)))
+        .get_catalog("ingester", Arc::clone(&metric_registry))
         .await?;
     let mut txn = catalog.start_transaction().await?;

View File

@@ -63,7 +63,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
     let metric_registry: Arc<metric::Registry> = Default::default();
     let catalog = config
         .catalog_dsn
-        .get_catalog("querier", Some(Arc::clone(&metric_registry)))
+        .get_catalog("querier", Arc::clone(&metric_registry))
         .await?;
     let object_store = Arc::new(

View File

@@ -90,7 +90,7 @@ pub async fn command(config: Config) -> Result<()> {
     let catalog = config
         .catalog_dsn
-        .get_catalog("router2", Some(Arc::clone(&metrics)))
+        .get_catalog("router2", Arc::clone(&metrics))
         .await?;
     // Initialise the sharded write buffer and instrument it with DML handler

View File

@@ -1475,7 +1475,8 @@ mod tests {
     #[tokio::test]
     async fn buffer_write_updates_lifecycle_manager_indicates_pause() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
         let mut repos = catalog.repositories().await;
         let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap();
         let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
@@ -1523,7 +1524,7 @@
         let pause_size = w1.size() + 1;
         let manager = LifecycleManager::new(
             LifecycleConfig::new(pause_size, 0, 0, Duration::from_secs(1)),
-            Arc::new(metric::Registry::new()),
+            metrics,
             Arc::new(SystemProvider::new()),
         );
         let should_pause = data
@@ -1540,7 +1541,8 @@
     #[tokio::test]
     async fn persist() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
         let mut repos = catalog.repositories().await;
         let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap();
         let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
@@ -1608,7 +1610,7 @@
         let manager = LifecycleManager::new(
             LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)),
-            Arc::new(metric::Registry::new()),
+            metrics,
             Arc::new(SystemProvider::new()),
         );
@@ -1895,7 +1897,8 @@
     #[tokio::test]
     async fn buffer_operation_ignores_already_persisted_data() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
         let mut repos = catalog.repositories().await;
         let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap();
         let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
@@ -1965,7 +1968,7 @@
         let manager = LifecycleManager::new(
             LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)),
-            Arc::new(metric::Registry::new()),
+            metrics,
             Arc::new(SystemProvider::new()),
         );
         let exec = Executor::new(1);

View File

@@ -673,7 +673,8 @@ mod tests {
     impl TestIngester {
         async fn new() -> Self {
-            let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+            let metrics: Arc<metric::Registry> = Default::default();
+            let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
             let mut txn = catalog.start_transaction().await.unwrap();
             let kafka_topic = txn.kafka_topics().create_or_get("whatevs").await.unwrap();
@@ -699,7 +700,6 @@
             let reading: Arc<dyn WriteBufferReading> =
                 Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap());
             let object_store = Arc::new(ObjectStore::new_in_memory());
-            let metrics: Arc<metric::Registry> = Default::default();
             let lifecycle_config =
                 LifecycleConfig::new(1000000, 1000, 1000, Duration::from_secs(10));

View File

@@ -607,7 +607,8 @@ bitflags! {
 /// you will be able to produce 14 scenarios by calling it in 2 loops
 pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
     // Whatever data because they won't be used in the tests
-    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+    let metrics: Arc<metric::Registry> = Default::default();
+    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
     let object_store = Arc::new(object_store::ObjectStore::new_in_memory());
    let exec = query::exec::Executor::new(1);
@@ -655,7 +656,8 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
 pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
     // Whatever data because they won't be used in the tests
-    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+    let metrics: Arc<metric::Registry> = Default::default();
+    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
     let object_store = Arc::new(object_store::ObjectStore::new_in_memory());
     let exec = query::exec::Executor::new(1);

View File

@@ -6,6 +6,7 @@ use schema::{InfluxColumnType, InfluxFieldType};
 use snafu::{OptionExt, Snafu};
 use std::convert::TryFrom;
 use std::fmt::Formatter;
+use std::sync::Arc;
 use std::{collections::BTreeMap, fmt::Debug};
 use uuid::Uuid;
@@ -317,6 +318,9 @@ pub trait Catalog: Send + Sync + Debug {
     /// Access the repositories w/o a transaction scope.
     async fn repositories(&self) -> Box<dyn RepoCollection>;
+
+    /// Get metric registry associated w/ this catalog.
+    fn metrics(&self) -> Arc<metric::Registry>;
 }

 /// Secret module for [sealed traits].
@@ -1085,6 +1089,7 @@ pub struct ProcessedTombstone {
 #[cfg(test)]
 pub(crate) mod test_helpers {
     use ::test_helpers::{assert_contains, tracing::TracingCapture};
+    use metric::{Attributes, Metric, U64Histogram};

     use crate::add_parquet_file_with_tombstones;
@@ -1105,6 +1110,17 @@
         test_add_parquet_file_with_tombstones(Arc::clone(&catalog)).await;
         test_txn_isolation(Arc::clone(&catalog)).await;
         test_txn_drop(Arc::clone(&catalog)).await;
+
+        let metrics = catalog.metrics();
+        assert_metric_hit(&*metrics, "kafka_create_or_get");
+        assert_metric_hit(&*metrics, "query_create_or_get");
+        assert_metric_hit(&*metrics, "namespace_create");
+        assert_metric_hit(&*metrics, "table_create_or_get");
+        assert_metric_hit(&*metrics, "column_create_or_get");
+        assert_metric_hit(&*metrics, "sequencer_create_or_get");
+        assert_metric_hit(&*metrics, "partition_create_or_get");
+        assert_metric_hit(&*metrics, "tombstone_create_or_get");
+        assert_metric_hit(&*metrics, "parquet_create");
     }

     async fn test_setup(catalog: Arc<dyn Catalog>) {
@@ -2091,4 +2107,16 @@
         assert!(topic.is_none());
         txn.abort().await.unwrap();
     }
+
+    fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
+        let histogram = metrics
+            .get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
+            .expect("failed to read metric")
+            .get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
+            .expect("failed to get observer")
+            .fetch();
+
+        let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
+        assert!(hit_count > 1, "metric did not record any calls");
+    }
 }

View File

@@ -273,11 +273,15 @@ mod tests {
             #[tokio::test]
             async fn [<test_validate_schema_ $name>]() {
                 use crate::interface::Catalog;
-                use std::ops::DerefMut;
+                use std::{
+                    ops::DerefMut,
+                    sync::Arc,
+                };
                 use pretty_assertions::assert_eq;

                 const NAMESPACE_NAME: &str = "bananas";

-                let catalog = MemCatalog::new();
+                let metrics = Arc::new(metric::Registry::default());
+                let catalog = MemCatalog::new(metrics);
                 let mut txn = catalog.start_transaction().await.unwrap();
                 let (kafka_topic, query_pool, _) = create_or_get_default_records(2, txn.deref_mut()).await.unwrap();

View File

@@ -1,14 +1,17 @@
 //! This module implements an in-memory implementation of the iox_catalog interface. It can be
 //! used for testing or for an IOx designed to run without catalog persistence.

-use crate::interface::{
-    sealed::TransactionFinalize, Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error,
-    KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId,
-    NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId,
-    PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool,
-    QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId,
-    SequencerRepo, Table, TableId, TablePersistInfo, TableRepo, Timestamp, Tombstone, TombstoneId,
-    TombstoneRepo, Transaction,
+use crate::{
+    interface::{
+        sealed::TransactionFinalize, Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error,
+        KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId,
+        NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId,
+        PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool,
+        QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId,
+        SequencerRepo, Table, TableId, TablePersistInfo, TableRepo, Timestamp, Tombstone,
+        TombstoneId, TombstoneRepo, Transaction,
+    },
+    metrics::MetricDecorator,
 };
 use async_trait::async_trait;
 use observability_deps::tracing::warn;
@@ -20,15 +23,18 @@ use uuid::Uuid;
 /// In-memory catalog that implements the `RepoCollection` and individual repo traits from
 /// the catalog interface.
-#[derive(Default)]
 pub struct MemCatalog {
+    metrics: Arc<metric::Registry>,
     collections: Arc<Mutex<MemCollections>>,
 }

 impl MemCatalog {
     /// return new initialized `MemCatalog`
-    pub fn new() -> Self {
-        Self::default()
+    pub fn new(metrics: Arc<metric::Registry>) -> Self {
+        Self {
+            metrics,
+            collections: Default::default(),
+        }
     }
 }
@@ -101,20 +107,30 @@ impl Catalog for MemCatalog {
     async fn start_transaction(&self) -> Result<Box<dyn Transaction>, Error> {
         let guard = Arc::clone(&self.collections).lock_owned().await;
         let stage = guard.clone();
-        Ok(Box::new(MemTxn {
-            inner: MemTxnInner::Txn {
-                guard,
-                stage,
-                finalized: false,
+        Ok(Box::new(MetricDecorator::new(
+            MemTxn {
+                inner: MemTxnInner::Txn {
+                    guard,
+                    stage,
+                    finalized: false,
+                },
             },
-        }))
+            Arc::clone(&self.metrics),
+        )))
     }

     async fn repositories(&self) -> Box<dyn RepoCollection> {
         let collections = Arc::clone(&self.collections).lock_owned().await;
-        Box::new(MemTxn {
-            inner: MemTxnInner::NoTxn { collections },
-        })
+        Box::new(MetricDecorator::new(
+            MemTxn {
+                inner: MemTxnInner::NoTxn { collections },
+            },
+            Arc::clone(&self.metrics),
+        ))
     }
+
+    fn metrics(&self) -> Arc<metric::Registry> {
+        Arc::clone(&self.metrics)
+    }
 }
@@ -850,6 +866,7 @@ mod tests {
     #[tokio::test]
     async fn test_catalog() {
-        crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new())).await;
+        let metrics = Arc::new(metric::Registry::default());
+        crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new(metrics))).await;
     }
 }
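
`MetricDecorator` itself lives in the crate's metrics module (the Postgres catalog already used it) and is not shown in this diff. The pattern it applies: time each repo operation and record the elapsed milliseconds under the `catalog_op_duration_ms` histogram with `op` and `result` attributes, which is what `assert_metric_hit` reads back. A rough, self-contained sketch with hypothetical stand-in types, not the metric crate's real API:

use std::time::Instant;

// Hypothetical stand-in for the real histogram type.
struct OpHistogram;

impl OpHistogram {
    fn observe(&self, op: &'static str, result: &'static str, millis: u64) {
        // A real decorator records into the shared metric::Registry under
        // catalog_op_duration_ms{op=..., result=...}; "success" is the label
        // the tests assert on, while the failure label here is assumed.
        println!("catalog_op_duration_ms{{op={op}, result={result}}}: {millis}ms");
    }
}

// Wrap a fallible async repo call, timing it and tagging the outcome.
async fn timed<T, E, F>(hist: &OpHistogram, op: &'static str, fut: F) -> Result<T, E>
where
    F: std::future::Future<Output = Result<T, E>>,
{
    let start = Instant::now();
    let res = fut.await;
    let result = if res.is_ok() { "success" } else { "error" };
    hist.observe(op, result, start.elapsed().as_millis() as u64);
    res
}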

View File

@@ -231,6 +231,10 @@ impl Catalog for PostgresCatalog {
             Arc::clone(&self.metrics),
         ))
     }
+
+    fn metrics(&self) -> Arc<metric::Registry> {
+        Arc::clone(&self.metrics)
+    }
 }

 /// Creates a new [`sqlx::Pool`] from a database config and an explicit DSN.
@@ -1231,7 +1235,6 @@
     use super::*;
-    use metric::{Attributes, Metric, U64Histogram};
     use rand::Rng;
     use std::{env, ops::DerefMut, sync::Arc};
     use std::{io::Write, time::Instant};
@@ -1284,18 +1287,6 @@
         };
     }

-    fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
-        let histogram = metrics
-            .get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
-            .expect("failed to read metric")
-            .get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
-            .expect("failed to get observer")
-            .fetch();
-
-        let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
-        assert!(hit_count > 1, "metric did not record any calls");
-    }
-
     async fn setup_db() -> PostgresCatalog {
@@ -1347,20 +1338,9 @@
         maybe_skip_integration!();

         let postgres = setup_db().await;
-        let metrics = Arc::clone(&postgres.metrics);
         let postgres: Arc<dyn Catalog> = Arc::new(postgres);

         crate::interface::test_helpers::test_catalog(postgres).await;
-
-        assert_metric_hit(&*metrics, "kafka_create_or_get");
-        assert_metric_hit(&*metrics, "query_create_or_get");
-        assert_metric_hit(&*metrics, "namespace_create");
-        assert_metric_hit(&*metrics, "table_create_or_get");
-        assert_metric_hit(&*metrics, "column_create_or_get");
-        assert_metric_hit(&*metrics, "sequencer_create_or_get");
-        assert_metric_hit(&*metrics, "partition_create_or_get");
-        assert_metric_hit(&*metrics, "tombstone_create_or_get");
-        assert_metric_hit(&*metrics, "parquet_create");
     }

     #[tokio::test]

View File

@@ -351,9 +351,9 @@ mod tests {
     #[tokio::test]
     async fn test_create_record() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
-        let object_store = Arc::new(ObjectStore::new_in_memory());
         let metric_registry = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
+        let object_store = Arc::new(ObjectStore::new_in_memory());
         let time_provider = Arc::new(MockProvider::new(now()));

         let adapter = ParquetChunkAdapter::new(

View File

@@ -159,9 +159,9 @@ mod tests {
     #[tokio::test]
     async fn test_sync() {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
-        let object_store = Arc::new(ObjectStore::new_in_memory());
         let metric_registry = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
+        let object_store = Arc::new(ObjectStore::new_in_memory());
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp(0, 0)));

         let db = QuerierDatabase::new(

View File

@@ -53,7 +53,8 @@ fn e2e_benchmarks(c: &mut Criterion) {
     let mut group = c.benchmark_group("e2e");

     let delegate = {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
         let ns_cache = Arc::new(ShardedCache::new(
             iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
         ));

View File

@@ -160,7 +160,8 @@ mod tests {
             },
         );

-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
         let creator = NamespaceAutocreation::new(
             Arc::clone(&catalog),
@@ -195,7 +196,8 @@
         let ns = DatabaseName::try_from("bananas").unwrap();
         let cache = Arc::new(MemoryNamespaceCache::default());

-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
         let creator = NamespaceAutocreation::new(
             Arc::clone(&catalog),

View File

@@ -226,7 +226,8 @@ mod tests {
     /// Initialise an in-memory [`MemCatalog`] and create a single namespace
     /// named [`NAMESPACE`].
     async fn create_catalog() -> Arc<dyn Catalog> {
-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
         let mut repos = catalog.repositories().await;

         repos

View File

@@ -89,7 +89,7 @@ impl TestContext {
                 .collect::<JumpHash<_>>(),
         );

-        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
         let ns_cache = Arc::new(ShardedCache::new(
             iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
         ));