diff --git a/ingester/src/init.rs b/ingester/src/init.rs
index 2ce6f7b621..d8a88145c2 100644
--- a/ingester/src/init.rs
+++ b/ingester/src/init.rs
@@ -36,8 +36,8 @@ use crate::{
     ingest_state::IngestState,
     ingester_id::IngesterId,
     persist::{
-        completion_observer::NopObserver, handle::PersistHandle,
-        hot_partitions::HotPartitionPersister,
+        completion_observer::NopObserver, file_metrics::ParquetFileInstrumentation,
+        handle::PersistHandle, hot_partitions::HotPartitionPersister,
     },
     query::{
         exec_instrumentation::QueryExecInstrumentation,
@@ -292,7 +292,9 @@ where
         persist_executor,
         object_store,
         Arc::clone(&catalog),
-        NopObserver::default(),
+        // Register a post-persistence observer that emits Parquet file
+        // attributes as metrics.
+        ParquetFileInstrumentation::new(NopObserver::default(), &metrics),
         &metrics,
     );
     let persist_handle = Arc::new(persist_handle);
diff --git a/ingester/src/persist/completion_observer.rs b/ingester/src/persist/completion_observer.rs
index 46fa9e6a44..fc7fcc74a9 100644
--- a/ingester/src/persist/completion_observer.rs
+++ b/ingester/src/persist/completion_observer.rs
@@ -1,7 +1,9 @@
-use std::{fmt::Debug, sync::Arc};
+use std::{fmt::Debug, sync::Arc, time::Duration};
 
 use async_trait::async_trait;
-use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId, PartitionId, TableId};
+use data_types::{
+    sequence_number_set::SequenceNumberSet, NamespaceId, ParquetFileParams, PartitionId, TableId,
+};
 
 /// An abstract observer of persistence completion events.
 ///
@@ -24,10 +26,8 @@ pub(crate) trait PersistCompletionObserver: Send + Sync + Debug {
 /// A set of details describing the persisted data.
 #[derive(Debug)]
 pub struct CompletedPersist {
-    /// The catalog identifiers for the persisted partition.
-    namespace_id: NamespaceId,
-    table_id: TableId,
-    partition_id: PartitionId,
+    /// The catalog metadata for the persist operation.
+    meta: ParquetFileParams,
 
     /// The [`SequenceNumberSet`] of the persisted data.
     sequence_numbers: SequenceNumberSet,
@@ -35,33 +35,26 @@ pub struct CompletedPersist {
 
 impl CompletedPersist {
     /// Construct a new completion notification.
-    pub(crate) fn new(
-        namespace_id: NamespaceId,
-        table_id: TableId,
-        partition_id: PartitionId,
-        sequence_numbers: SequenceNumberSet,
-    ) -> Self {
+    pub(crate) fn new(meta: ParquetFileParams, sequence_numbers: SequenceNumberSet) -> Self {
         Self {
-            namespace_id,
-            table_id,
-            partition_id,
+            meta,
             sequence_numbers,
         }
     }
 
     /// Returns the [`NamespaceId`] of the persisted data.
     pub(crate) fn namespace_id(&self) -> NamespaceId {
-        self.namespace_id
+        self.meta.namespace_id
     }
 
     /// Returns the [`TableId`] of the persisted data.
     pub(crate) fn table_id(&self) -> TableId {
-        self.table_id
+        self.meta.table_id
     }
 
     /// Returns the [`PartitionId`] of the persisted data.
     pub(crate) fn partition_id(&self) -> PartitionId {
-        self.partition_id
+        self.meta.partition_id
    }
 
     /// Returns the [`SequenceNumberSet`] of the persisted data.
@@ -85,6 +78,31 @@ impl CompletedPersist {
             .map(|v| v.into_sequence_numbers())
             .unwrap_or_else(|v| v.sequence_numbers().clone())
     }
+
+    /// The number of rows persisted.
+    pub fn row_count(&self) -> usize {
+        self.meta.row_count as _
+    }
+
+    /// The number of columns persisted.
+    pub fn column_count(&self) -> usize {
+        self.meta.column_set.len()
+    }
+
+    /// The byte size of the generated Parquet file.
+    pub fn parquet_file_bytes(&self) -> usize {
+        self.meta.file_size_bytes as _
+    }
+
+    /// The duration of time covered by this file (the difference between the
+    /// min and max timestamps).
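+    ///
+    /// # Panics
+    ///
+    /// Panics if the file's max timestamp is earlier than its min timestamp,
+    /// as exercised by `test_timestamp_range_negative` below.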
+    pub fn timestamp_range(&self) -> Duration {
+        let min = iox_time::Time::from(self.meta.min_time);
+        let max = iox_time::Time::from(self.meta.max_time);
+
+        max.checked_duration_since(min)
+            .expect("parquet min/max file timestamp difference is negative")
+    }
 }
 
 /// A no-op implementation of the [`PersistCompletionObserver`] trait.
@@ -138,10 +156,31 @@ pub(crate) mod mock {
 
 #[cfg(test)]
 mod tests {
-    use data_types::SequenceNumber;
+    use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp};
 
     use super::*;
 
+    const NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
+    const TABLE_ID: TableId = TableId::new(1);
+    const PARTITION_ID: PartitionId = PartitionId::new(1);
+
+    fn arbitrary_file_meta() -> ParquetFileParams {
+        ParquetFileParams {
+            namespace_id: NAMESPACE_ID,
+            table_id: TABLE_ID,
+            partition_id: PARTITION_ID,
+            object_store_id: Default::default(),
+            min_time: Timestamp::new(42),
+            max_time: Timestamp::new(42),
+            file_size_bytes: 42424242,
+            row_count: 24,
+            compaction_level: data_types::CompactionLevel::Initial,
+            created_at: Timestamp::new(1234),
+            column_set: ColumnSet::new([1, 2, 3, 4].into_iter().map(ColumnId::new)),
+            max_l0_created_at: Timestamp::new(42),
+        }
+    }
+
     #[test]
     fn test_owned_sequence_numbers_only_ref() {
         let orig_set = [SequenceNumber::new(42)]
@@ -149,9 +188,7 @@ mod tests {
             .collect::<SequenceNumberSet>();
 
         let note = Arc::new(CompletedPersist::new(
-            NamespaceId::new(1),
-            TableId::new(2),
-            PartitionId::new(3),
+            arbitrary_file_meta(),
             orig_set.clone(),
         ));
 
@@ -165,9 +202,7 @@ mod tests {
             .collect::<SequenceNumberSet>();
 
         let note = Arc::new(CompletedPersist::new(
-            NamespaceId::new(1),
-            TableId::new(2),
-            PartitionId::new(3),
+            arbitrary_file_meta(),
             orig_set.clone(),
         ));
 
@@ -176,4 +211,53 @@ mod tests {
         assert_eq!(orig_set, note.owned_sequence_numbers());
         assert_eq!(orig_set, note2.owned_sequence_numbers());
     }
+
+    #[test]
+    fn test_accessors() {
+        let meta = arbitrary_file_meta();
+
+        let note = CompletedPersist::new(meta.clone(), Default::default());
+
+        assert_eq!(note.namespace_id(), meta.namespace_id);
+        assert_eq!(note.table_id(), meta.table_id);
+        assert_eq!(note.partition_id(), meta.partition_id);
+
+        assert_eq!(note.column_count(), meta.column_set.len());
+        assert_eq!(note.row_count(), meta.row_count as usize);
+        assert_eq!(note.parquet_file_bytes(), meta.file_size_bytes as usize);
+    }
+
+    #[test]
+    fn test_timestamp_range() {
+        const RANGE: Duration = Duration::from_secs(42);
+
+        let min = iox_time::Time::from_timestamp_nanos(0);
+        let max = min.checked_add(RANGE).unwrap();
+
+        let mut meta = arbitrary_file_meta();
+        meta.min_time = Timestamp::from(min);
+        meta.max_time = Timestamp::from(max);
+
+        let note = CompletedPersist::new(meta, Default::default());
+
+        assert_eq!(note.timestamp_range(), RANGE);
+    }
+
+    #[test]
+    #[should_panic(expected = "parquet min/max file timestamp difference is negative")]
+    fn test_timestamp_range_negative() {
+        const RANGE: Duration = Duration::from_secs(42);
+
+        let min = iox_time::Time::from_timestamp_nanos(0);
+        let max = min.checked_add(RANGE).unwrap();
+
+        let mut meta = arbitrary_file_meta();
+
+        // Values are the wrong way around!
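+        // Assigning the larger value to min_time (and the smaller to
+        // max_time) yields a negative range, which must trip the expect() in
+        // timestamp_range().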
+        meta.max_time = Timestamp::from(min);
+        meta.min_time = Timestamp::from(max);
+
+        let note = CompletedPersist::new(meta, Default::default());
+        let _ = note.timestamp_range();
+    }
 }
diff --git a/ingester/src/persist/context.rs b/ingester/src/persist/context.rs
index 97bb2781aa..77ba1ac9b9 100644
--- a/ingester/src/persist/context.rs
+++ b/ingester/src/persist/context.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
+use data_types::{NamespaceId, ParquetFileParams, PartitionId, PartitionKey, TableId};
 use observability_deps::tracing::*;
 use parking_lot::Mutex;
 use schema::sort::SortKey;
@@ -217,8 +217,12 @@ impl Context {
     // Call [`PartitionData::mark_complete`] to finalise the persistence job,
     // emit a log for the user, and notify the observer of this persistence
     // task, if any.
-    pub(super) async fn mark_complete<O>(self, object_store_id: Uuid, completion_observer: &O)
-    where
+    pub(super) async fn mark_complete<O>(
+        self,
+        object_store_id: Uuid,
+        metadata: ParquetFileParams,
+        completion_observer: &O,
+    ) where
         O: PersistCompletionObserver,
     {
         // Mark the partition as having completed persistence, causing it to
@@ -234,12 +238,7 @@ impl Context {
         // Dispatch the completion notification into the observer chain before
         // completing the persist operation.
         completion_observer
-            .persist_complete(Arc::new(CompletedPersist::new(
-                self.namespace_id,
-                self.table_id,
-                self.partition_id,
-                sequence_numbers,
-            )))
+            .persist_complete(Arc::new(CompletedPersist::new(metadata, sequence_numbers)))
             .await;
 
         let now = Instant::now();
diff --git a/ingester/src/persist/file_metrics.rs b/ingester/src/persist/file_metrics.rs
new file mode 100644
index 0000000000..d12024b43f
--- /dev/null
+++ b/ingester/src/persist/file_metrics.rs
@@ -0,0 +1,228 @@
+use std::{sync::Arc, time::Duration};
+
+use async_trait::async_trait;
+use metric::{
+    DurationHistogram, DurationHistogramOptions, U64Histogram, U64HistogramOptions, DURATION_MAX,
+};
+
+use super::completion_observer::{CompletedPersist, PersistCompletionObserver};
+
+const MINUTES: Duration = Duration::from_secs(60);
+
+#[derive(Debug)]
+pub(crate) struct ParquetFileInstrumentation<T> {
+    inner: T,
+
+    row_count: U64Histogram,
+    column_count: U64Histogram,
+    file_size_bytes: U64Histogram,
+    file_time_range: DurationHistogram,
+}
+
+impl<T> ParquetFileInstrumentation<T> {
+    pub(crate) fn new(inner: T, metrics: &metric::Registry) -> Self {
+        // A metric capturing the duration difference between min & max
+        // timestamps.
+        let file_time_range: DurationHistogram = metrics
+            .register_metric_with_options::<DurationHistogram, _>(
+                "ingester_persist_parquet_file_time_range",
+                "range from min to max timestamp in output parquet file",
+                || {
+                    DurationHistogramOptions::new([
+                        30 * MINUTES,    // 30m
+                        60 * MINUTES,    // 1h
+                        120 * MINUTES,   // 2h
+                        240 * MINUTES,   // 4h
+                        480 * MINUTES,   // 8h
+                        960 * MINUTES,   // 16h
+                        1_920 * MINUTES, // 32h
+                        DURATION_MAX,
+                    ])
+                },
+            )
+            .recorder(&[]);
+
+        // File size distribution.
+        let file_size_bytes: U64Histogram = metrics
+            .register_metric_with_options::<U64Histogram, _>(
+                "ingester_persist_parquet_file_size_bytes",
+                "distribution of output parquet file size in bytes",
+                || {
+                    U64HistogramOptions::new([
+                        4_u64.pow(5),  // 1 kibibyte
+                        4_u64.pow(6),  // 4 kibibytes
+                        4_u64.pow(7),  // 16 kibibytes
+                        4_u64.pow(8),  // 64 kibibytes
+                        4_u64.pow(9),  // 256 kibibytes
+                        4_u64.pow(10), // 1 mebibyte
+                        4_u64.pow(11), // 4 mebibytes
+                        4_u64.pow(12), // 16 mebibytes
+                        4_u64.pow(13), // 64 mebibytes
+                        4_u64.pow(14), // 256 mebibytes
+                        4_u64.pow(15), // 1 gibibyte
+                        4_u64.pow(16), // 4 gibibytes
+                        u64::MAX,
+                    ])
+                },
+            )
+            .recorder(&[]);
+
+        // Row count distribution.
+        let row_count: U64Histogram = metrics
+            .register_metric_with_options::<U64Histogram, _>(
+                "ingester_persist_parquet_file_row_count",
+                "distribution of row count in output parquet files",
+                || {
+                    U64HistogramOptions::new([
+                        4_u64.pow(3),  // 64
+                        4_u64.pow(4),  // 256
+                        4_u64.pow(5),  // 1,024
+                        4_u64.pow(6),  // 4,096
+                        4_u64.pow(7),  // 16,384
+                        4_u64.pow(8),  // 65,536
+                        4_u64.pow(9),  // 262,144
+                        4_u64.pow(10), // 1,048,576
+                        4_u64.pow(11), // 4,194,304
+                        4_u64.pow(12), // 16,777,216
+                        u64::MAX,
+                    ])
+                },
+            )
+            .recorder(&[]);
+
+        // Column count distribution.
+        //
+        // Because the column count is, by default, limited per table, this
+        // range should exceed that limit by some degree to discover overshoot
+        // (limits are eventually consistent) and correctly measure workloads
+        // that have been configured with a higher limit.
+        let column_count: U64Histogram = metrics
+            .register_metric_with_options::<U64Histogram, _>(
+                "ingester_persist_parquet_file_column_count",
+                "distribution of column count in output parquet files",
+                || {
+                    U64HistogramOptions::new([
+                        2_u64.pow(1),  // 2
+                        2_u64.pow(2),  // 4
+                        2_u64.pow(3),  // 8
+                        2_u64.pow(4),  // 16
+                        2_u64.pow(5),  // 32
+                        2_u64.pow(6),  // 64
+                        2_u64.pow(7),  // 128
+                        2_u64.pow(8),  // 256
+                        2_u64.pow(9),  // 512
+                        2_u64.pow(10), // 1,024
+                        2_u64.pow(11), // 2,048
+                        u64::MAX,
+                    ])
+                },
+            )
+            .recorder(&[]);
+
+        Self {
+            inner,
+            row_count,
+            column_count,
+            file_size_bytes,
+            file_time_range,
+        }
+    }
+}
+
+#[async_trait]
+impl<T> PersistCompletionObserver for ParquetFileInstrumentation<T>
+where
+    T: PersistCompletionObserver,
+{
+    async fn persist_complete(&self, note: Arc<CompletedPersist>) {
+        // Observe the persistence notification values.
+        self.row_count.record(note.row_count() as _);
+        self.column_count.record(note.column_count() as _);
+        self.file_size_bytes.record(note.parquet_file_bytes() as _);
+        self.file_time_range.record(note.timestamp_range());
+
+        // Forward on the notification to the next handler.
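+        // Recording never consumes the notification, so the decorated inner
+        // observer sees exactly what it would have without instrumentation.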
+        self.inner.persist_complete(note).await;
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use data_types::{
+        sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, NamespaceId,
+        ParquetFileParams, PartitionId, TableId, Timestamp,
+    };
+    use metric::assert_histogram;
+
+    use crate::persist::completion_observer::mock::MockCompletionObserver;
+
+    use super::*;
+
+    const NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
+    const TABLE_ID: TableId = TableId::new(1);
+    const PARTITION_ID: PartitionId = PartitionId::new(1);
+
+    #[tokio::test]
+    async fn test_persisted_file_metrics() {
+        let inner = Arc::new(MockCompletionObserver::default());
+
+        let metrics = metric::Registry::default();
+        let decorator = ParquetFileInstrumentation::new(Arc::clone(&inner), &metrics);
+
+        let meta = ParquetFileParams {
+            namespace_id: NAMESPACE_ID,
+            table_id: TABLE_ID,
+            partition_id: PARTITION_ID,
+            object_store_id: Default::default(),
+            min_time: Timestamp::new(Duration::from_secs(1_000).as_nanos() as _),
+            max_time: Timestamp::new(Duration::from_secs(1_042).as_nanos() as _), // 42 seconds later
+            file_size_bytes: 42424242,
+            row_count: 24,
+            compaction_level: data_types::CompactionLevel::Initial,
+            created_at: Timestamp::new(1234),
+            column_set: ColumnSet::new([1, 2, 3, 4].into_iter().map(ColumnId::new)),
+            max_l0_created_at: Timestamp::new(42),
+        };
+
+        decorator
+            .persist_complete(Arc::new(CompletedPersist::new(
+                meta.clone(),
+                SequenceNumberSet::default(),
+            )))
+            .await;
+
+        assert_histogram!(
+            metrics,
+            DurationHistogram,
+            "ingester_persist_parquet_file_time_range",
+            samples = 1,
+            sum = Duration::from_secs(42),
+        );
+
+        assert_histogram!(
+            metrics,
+            U64Histogram,
+            "ingester_persist_parquet_file_size_bytes",
+            samples = 1,
+            sum = meta.file_size_bytes as u64,
+        );
+
+        assert_histogram!(
+            metrics,
+            U64Histogram,
+            "ingester_persist_parquet_file_row_count",
+            samples = 1,
+            sum = meta.row_count as u64,
+        );
+
+        assert_histogram!(
+            metrics,
+            U64Histogram,
+            "ingester_persist_parquet_file_column_count",
+            samples = 1,
+            sum = meta.column_set.len() as u64,
+        );
+    }
+}
diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs
index 835964ea77..f1456ace3e 100644
--- a/ingester/src/persist/mod.rs
+++ b/ingester/src/persist/mod.rs
@@ -5,6 +5,7 @@ pub(super) mod compact;
 pub(crate) mod completion_observer;
 mod context;
 pub(crate) mod drain_buffer;
+pub(crate) mod file_metrics;
 pub(crate) mod handle;
 pub(crate) mod hot_partitions;
 pub mod queue;
diff --git a/ingester/src/persist/worker.rs b/ingester/src/persist/worker.rs
index 9d02eba7fd..eaca3a6981 100644
--- a/ingester/src/persist/worker.rs
+++ b/ingester/src/persist/worker.rs
@@ -135,12 +135,17 @@ pub(super) async fn run_task(
     };
 
     // Make the newly uploaded parquet file visible to other nodes.
-    let object_store_id = update_catalog_parquet(&ctx, &worker_state, parquet_table_data).await;
+    let object_store_id =
+        update_catalog_parquet(&ctx, &worker_state, &parquet_table_data).await;
 
     // And finally mark the persist job as complete and notify any
     // observers.
-    ctx.mark_complete(object_store_id, &worker_state.completion_observer)
-        .await;
+    ctx.mark_complete(
+        object_store_id,
+        parquet_table_data,
+        &worker_state.completion_observer,
+    )
+    .await;
 
     // Capture the time spent actively persisting.
     let now = Instant::now();
@@ -459,7 +464,7 @@ where
 async fn update_catalog_parquet<O>(
     ctx: &Context,
     worker_state: &SharedWorkerState<O>,
-    parquet_table_data: ParquetFileParams,
+    parquet_table_data: &ParquetFileParams,
 ) -> Uuid
 where
     O: Send + Sync,
diff --git a/ingester/src/wal/reference_tracker/handle.rs b/ingester/src/wal/reference_tracker/handle.rs
index 0e9d33badd..16364bc467 100644
--- a/ingester/src/wal/reference_tracker/handle.rs
+++ b/ingester/src/wal/reference_tracker/handle.rs
@@ -209,7 +209,9 @@ mod tests {
     use assert_matches::assert_matches;
     use async_trait::async_trait;
-    use data_types::{NamespaceId, PartitionId, TableId};
+    use data_types::{
+        ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionId, TableId, Timestamp,
+    };
     use futures::{task::Context, Future, FutureExt};
     use metric::{assert_counter, U64Gauge};
     use parking_lot::Mutex;
@@ -262,9 +264,20 @@ mod tests {
         T: IntoIterator,
     {
         Arc::new(CompletedPersist::new(
-            NamespaceId::new(1),
-            TableId::new(2),
-            PartitionId::new(3),
+            ParquetFileParams {
+                namespace_id: NamespaceId::new(1),
+                table_id: TableId::new(2),
+                partition_id: PartitionId::new(3),
+                object_store_id: Default::default(),
+                min_time: Timestamp::new(42),
+                max_time: Timestamp::new(42),
+                file_size_bytes: 42424242,
+                row_count: 24,
+                compaction_level: data_types::CompactionLevel::Initial,
+                created_at: Timestamp::new(1234),
+                column_set: ColumnSet::new([1, 2, 3, 4].into_iter().map(ColumnId::new)),
+                max_l0_created_at: Timestamp::new(42),
+            },
             new_set(vals),
         ))
     }
diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs
index 930e433461..c046fa6ba9 100644
--- a/ingester/tests/write.rs
+++ b/ingester/tests/write.rs
@@ -1,11 +1,14 @@
 use arrow_util::assert_batches_sorted_eq;
 use assert_matches::assert_matches;
-use data_types::PartitionKey;
+use data_types::{PartitionKey, TableId, Timestamp};
 use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
-use ingester_test_ctx::TestContextBuilder;
+use ingester_test_ctx::{TestContextBuilder, DEFAULT_MAX_PERSIST_QUEUE_DEPTH};
 use iox_catalog::interface::Catalog;
-use metric::{DurationHistogram, U64Histogram};
-use std::sync::Arc;
+use metric::{
+    assert_counter, assert_histogram, DurationHistogram, U64Counter, U64Gauge, U64Histogram,
+};
+use parquet_file::ParquetFilePath;
+use std::{sync::Arc, time::Duration};
 
 // Write data to an ingester through the RPC interface and query the data, validating the contents.
 #[tokio::test]
@@ -89,6 +92,161 @@ async fn write_query() {
     assert_eq!(hist.total, 2);
 }
 
+// Write data to an ingester through the RPC interface and persist the data.
+#[tokio::test]
+async fn write_persist() {
+    let namespace_name = "write_query_test_namespace";
+    let mut ctx = TestContextBuilder::default().build().await;
+    let ns = ctx.ensure_namespace(namespace_name, None).await;
+
+    let partition_key = PartitionKey::from("1970-01-01");
+    ctx.write_lp(
+        namespace_name,
+        r#"bananas count=42,greatness="inf" 200"#,
+        partition_key.clone(),
+        42,
+    )
+    .await;
+
+    // Perform a query to validate the actual data buffered.
+    let table_id = ctx.table_id(namespace_name, "bananas").await.get();
+    let data: Vec<_> = ctx
+        .query(IngesterQueryRequest {
+            namespace_id: ns.id.get(),
+            table_id,
+            columns: vec![],
+            predicate: None,
+        })
+        .await
+        .expect("query request failed");
+
+    let expected = vec![
+        "+-------+-----------+--------------------------------+",
+        "| count | greatness | time                           |",
+        "+-------+-----------+--------------------------------+",
+        "| 42.0  | inf       | 1970-01-01T00:00:00.000000200Z |",
+        "+-------+-----------+--------------------------------+",
+    ];
+    assert_batches_sorted_eq!(&expected, &data);
+
+    // Persist the data.
+    ctx.persist(namespace_name).await;
+
+    // Ensure the data is no longer buffered.
+    let data: Vec<_> = ctx
+        .query(IngesterQueryRequest {
+            namespace_id: ns.id.get(),
+            table_id,
+            columns: vec![],
+            predicate: None,
+        })
+        .await
+        .expect("query request failed");
+    assert!(data.is_empty());
+
+    // Validate the parquet file was added to the catalog.
+    let parquet_files = ctx.catalog_parquet_file_records(namespace_name).await;
+    let (path, want_file_size) = assert_matches!(parquet_files.as_slice(), [f] => {
+        assert_eq!(f.namespace_id, ns.id);
+        assert_eq!(f.table_id, TableId::new(table_id));
+        assert_eq!(f.min_time, Timestamp::new(200));
+        assert_eq!(f.max_time, Timestamp::new(200));
+        assert_eq!(f.to_delete, None);
+        assert_eq!(f.row_count, 1);
+        assert_eq!(f.column_set.len(), 3);
+        assert_eq!(f.max_l0_created_at, f.created_at);
+
+        (ParquetFilePath::from(f), f.file_size_bytes)
+    });
+
+    // Validate the file exists at the expected object store path.
+    let file_size = ctx
+        .object_store()
+        .get(&path.object_store_path())
+        .await
+        .expect("parquet file must exist in object store")
+        .bytes()
+        .await
+        .expect("failed to read parquet file bytes")
+        .len();
+    assert_eq!(file_size, want_file_size as usize);
+
+    // And that the persist metrics were recorded.
+    let metrics = ctx.metrics();
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Config reflection metrics
+    assert_counter!(
+        metrics,
+        U64Gauge,
+        "ingester_persist_max_parallelism",
+        value = 5,
+    );
+
+    assert_counter!(
+        metrics,
+        U64Gauge,
+        "ingester_persist_max_queue_depth",
+        value = DEFAULT_MAX_PERSIST_QUEUE_DEPTH as u64,
+    );
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Persist worker metrics
+    assert_histogram!(
+        metrics,
+        DurationHistogram,
+        "ingester_persist_active_duration",
+        samples = 1,
+    );
+
+    assert_histogram!(
+        metrics,
+        DurationHistogram,
+        "ingester_persist_enqueue_duration",
+        samples = 1,
+    );
+
+    assert_counter!(
+        metrics,
+        U64Counter,
+        "ingester_persist_enqueued_jobs",
+        value = 1,
+    );
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Parquet file metrics
+    assert_histogram!(
+        metrics,
+        DurationHistogram,
+        "ingester_persist_parquet_file_time_range",
+        samples = 1,
+        sum = Duration::from_secs(0),
+    );
+
+    assert_histogram!(
+        metrics,
+        U64Histogram,
+        "ingester_persist_parquet_file_size_bytes",
+        samples = 1,
+    );
+
+    assert_histogram!(
+        metrics,
+        U64Histogram,
+        "ingester_persist_parquet_file_row_count",
+        samples = 1,
+        sum = 1,
+    );
+
+    assert_histogram!(
+        metrics,
+        U64Histogram,
+        "ingester_persist_parquet_file_column_count",
+        samples = 1,
+        sum = 3,
+    );
+}
+
 // Write data to the ingester, which writes it to the WAL, then drop and recreate the WAL and
 // validate the data is replayed from the WAL into memory.
 #[tokio::test]
diff --git a/ingester_test_ctx/src/lib.rs b/ingester_test_ctx/src/lib.rs
index d3f7f68f44..5d7d290163 100644
--- a/ingester_test_ctx/src/lib.rs
+++ b/ingester_test_ctx/src/lib.rs
@@ -35,6 +35,7 @@ use iox_time::TimeProvider;
 use metric::{Attributes, Metric, MetricObserver};
 use mutable_batch_lp::lines_to_batches;
 use mutable_batch_pb::encode::encode_write;
+use object_store::ObjectStore;
 use observability_deps::tracing::*;
 use parquet_file::storage::ParquetStorage;
 use tempfile::TempDir;
@@ -158,7 +159,7 @@ impl TestContextBuilder {
             shutdown_tx,
             _dir: dir,
             catalog,
-            _storage: storage,
+            storage,
             metrics,
             namespaces: Default::default(),
         }
@@ -174,7 +175,7 @@ pub struct TestContext {
     ingester: IngesterGuard<T>,
     shutdown_tx: oneshot::Sender<CancellationToken>,
     catalog: Arc<dyn Catalog>,
-    _storage: ParquetStorage,
+    storage: ParquetStorage,
     metrics: Arc<metric::Registry>,
 
     /// Once the last [`TempDir`] reference is dropped, the directory it
@@ -415,6 +416,11 @@ where
             .recorder()
     }
 
+    /// Return the metric registry for the [`TestContext`].
+    pub fn metrics(&self) -> &metric::Registry {
+        &self.metrics
+    }
+
     /// Retrieve the Parquet files in the catalog for the specified namespace.
     pub async fn catalog_parquet_file_records(&self, namespace: &str) -> Vec<ParquetFile> {
         let namespace_id = self.namespace_id(namespace).await;
@@ -432,6 +438,11 @@ where
         Arc::clone(&self.catalog)
     }
 
+    /// Return the [`ObjectStore`] for this [`TestContext`].
+    pub fn object_store(&self) -> Arc<dyn ObjectStore> {
+        Arc::clone(self.storage.object_store())
+    }
+
     /// Return the [`IngesterRpcInterface`] for this [`TestContext`].
     ///
     /// Calls made through this interface measure the cost of the