From ed44817ed15eb0de80ed0c6697bb1e4bf83ef4fd Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 10 Aug 2022 16:06:03 -0400 Subject: [PATCH] feat: Add a histogram of ingested (new L0) Parquet file sizes Connects to #5348. --- ingester/src/data.rs | 82 +++++++++++++++++++++++++++---- ingester/src/handler.rs | 1 + ingester/src/test_util.rs | 4 +- query_tests/src/scenarios/util.rs | 1 + 4 files changed, 77 insertions(+), 11 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 38f1f834ce..797c48f9d1 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -19,7 +19,7 @@ use futures::{Stream, StreamExt}; use iox_catalog::interface::{get_table_schema_by_id, Catalog}; use iox_query::exec::Executor; use iox_time::SystemProvider; -use metric::U64Counter; +use metric::{Attributes, Metric, U64Counter, U64Histogram, U64HistogramOptions}; use mutable_batch::MutableBatch; use object_store::DynObjectStore; use observability_deps::tracing::{debug, warn}; @@ -94,6 +94,9 @@ pub struct IngesterData { /// Backoff config backoff_config: BackoffConfig, + + /// Metrics for file size of persisted Parquet files + persisted_file_size_bytes: Metric, } impl IngesterData { @@ -104,13 +107,30 @@ impl IngesterData { sequencers: BTreeMap, exec: Arc, backoff_config: BackoffConfig, + metrics: Arc, ) -> Self { + let persisted_file_size_bytes = metrics.register_metric_with_options( + "ingester_persisted_file_size_bytes", + "Size of files persisted by the ingester", + || { + U64HistogramOptions::new([ + 500 * 1024, // 500 KB + 1024 * 1024, // 1 MB + 3 * 1024 * 1024, // 3 MB + 10 * 1024 * 1024, // 10 MB + 30 * 1024 * 1024, // 30 MB + u64::MAX, // Inf + ]) + }, + ); + Self { store: ParquetStorage::new(object_store), catalog, sequencers, exec, backoff_config, + persisted_file_size_bytes, } } @@ -344,6 +364,15 @@ impl Persister for IngesterData { .await .expect("retry forever"); + // Record metrics + let attributes = Attributes::from([( + "sequencer_id", + format!("{}", partition_info.partition.sequencer_id).into(), + )]); + self.persisted_file_size_bytes + .recorder(attributes) + .record(file_size as u64); + // and remove the persisted data from memory debug!( ?partition_id, @@ -1566,8 +1595,8 @@ pub(crate) type IngesterQueryPartitionStream = /// Response streams for querier<>ingester requests. /// -/// The data structure is constructed to allow lazy/streaming data generation. For easier consumption according to the -/// wire protocol, use the [`flatten`](Self::flatten) method. +/// The data structure is constructed to allow lazy/streaming data generation. For easier +/// consumption according to the wire protocol, use the [`flatten`](Self::flatten) method. pub struct IngesterQueryResponse { /// Stream of partitions. partitions: IngesterQueryPartitionStream, @@ -1649,13 +1678,15 @@ pub enum FlatIngesterQueryResponse { /// Start a new snapshot. /// - /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition) message. + /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition) + /// message. StartSnapshot { /// Snapshot schema. schema: Arc, }, - /// Add a record batch to the snapshot that was announced by the last [`StartSnapshot`](Self::StartSnapshot) message. + /// Add a record batch to the snapshot that was announced by the last + /// [`StartSnapshot`](Self::StartSnapshot) message. RecordBatch { /// Record batch. batch: RecordBatch, @@ -1796,6 +1827,7 @@ mod tests { sequencers, Arc::new(Executor::new(1)), BackoffConfig::default(), + Arc::clone(&metrics), )); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); @@ -1889,6 +1921,7 @@ mod tests { sequencers, Arc::new(Executor::new(1)), BackoffConfig::default(), + Arc::clone(&metrics), )); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); @@ -1943,7 +1976,7 @@ mod tests { let manager = LifecycleManager::new( LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)), - metrics, + Arc::clone(&metrics), Arc::new(SystemProvider::new()), ); @@ -2016,6 +2049,30 @@ mod tests { assert_eq!(pf.sequencer_id, sequencer1.id); assert!(pf.to_delete.is_none()); + // This value should be recorded in the metrics asserted next; it is less than 500 KB + assert_eq!(pf.file_size_bytes, 1252); + + // verify metrics + let persisted_file_size_bytes: Metric = metrics + .get_instrument("ingester_persisted_file_size_bytes") + .unwrap(); + + let observation = persisted_file_size_bytes + .get_observer(&Attributes::from([( + "sequencer_id", + format!("{}", sequencer1.id).into(), + )])) + .unwrap() + .fetch(); + assert_eq!(observation.sample_count(), 1); + let buckets_with_counts: Vec<_> = observation + .buckets + .iter() + .filter_map(|o| if o.count == 0 { None } else { Some(o.le) }) + .collect(); + // Only the < 500 KB bucket has a count + assert_eq!(buckets_with_counts, &[500 * 1024]); + // verify it set a sort key on the partition in the catalog let partition_info = repos .partitions() @@ -2083,6 +2140,7 @@ mod tests { sequencers, Arc::new(Executor::new(1)), BackoffConfig::default(), + Arc::clone(&metrics), )); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); @@ -2284,7 +2342,8 @@ mod tests { 50, // max time of data to get deleted "city=Boston", // delete predicate ); - // two rows will get deleted, one from existing snapshot, one from the buffer being moved to snpashot + // two rows will get deleted, one from existing snapshot, one from the buffer being moved + // to snpashot p.buffer_tombstone(&exec, "restaurant", ts).await; // verify data @@ -2341,9 +2400,11 @@ mod tests { // verify data assert!(p.data.buffer.is_none()); // always empty after delete - // no snpashots becasue buffer has not data yet and the sanpshot was empty too + // no snpashots becasue buffer has not data yet and the + // snapshot was empty too assert_eq!(p.data.snapshots.len(), 0); - assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is persisting + assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is + // persisting assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); // ------------------------------------------ @@ -2362,7 +2423,7 @@ mod tests { assert_eq!( p.data.buffer.as_ref().unwrap().min_sequence_number, SequenceNumber::new(8) - ); // 1 newlly added mutable batch of 3 rows of data + ); // 1 newly added mutable batch of 3 rows of data assert_eq!(p.data.snapshots.len(), 0); // still empty assert_eq!(p.data.deletes_during_persisting.len(), 1); assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); @@ -2622,6 +2683,7 @@ mod tests { sequencers, Arc::new(Executor::new(1)), BackoffConfig::default(), + Arc::clone(&metrics), )); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 0cbb02e4cc..6aa373e58a 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -157,6 +157,7 @@ impl IngestHandlerImpl { sequencers, exec, BackoffConfig::default(), + Arc::clone(&metric_registry), )); let ingester_data = Arc::clone(&data); diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 3adc6b3a7b..e2d0eecd0f 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -701,13 +701,14 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa sequencers, exec, backoff::BackoffConfig::default(), + metrics, ) } pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData { // Whatever data because they won't be used in the tests let metrics: Arc = Default::default(); - let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); let object_store = Arc::new(InMemory::new()); let exec = Arc::new(iox_query::exec::Executor::new(1)); @@ -745,6 +746,7 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa sequencers, exec, backoff::BackoffConfig::default(), + metrics, ) } diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 3f8f63d1c7..409fad54f9 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -704,6 +704,7 @@ impl MockIngester { sequencers, catalog.exec(), BackoffConfig::default(), + catalog.metric_registry(), )); Self {