From ed44817ed15eb0de80ed0c6697bb1e4bf83ef4fd Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 10 Aug 2022 16:06:03 -0400 Subject: [PATCH 1/5] feat: Add a histogram of ingested (new L0) Parquet file sizes Connects to #5348. --- ingester/src/data.rs | 82 +++++++++++++++++++++++++++---- ingester/src/handler.rs | 1 + ingester/src/test_util.rs | 4 +- query_tests/src/scenarios/util.rs | 1 + 4 files changed, 77 insertions(+), 11 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 38f1f834ce..797c48f9d1 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -19,7 +19,7 @@ use futures::{Stream, StreamExt}; use iox_catalog::interface::{get_table_schema_by_id, Catalog}; use iox_query::exec::Executor; use iox_time::SystemProvider; -use metric::U64Counter; +use metric::{Attributes, Metric, U64Counter, U64Histogram, U64HistogramOptions}; use mutable_batch::MutableBatch; use object_store::DynObjectStore; use observability_deps::tracing::{debug, warn}; @@ -94,6 +94,9 @@ pub struct IngesterData { /// Backoff config backoff_config: BackoffConfig, + + /// Metrics for file size of persisted Parquet files + persisted_file_size_bytes: Metric, } impl IngesterData { @@ -104,13 +107,30 @@ impl IngesterData { sequencers: BTreeMap, exec: Arc, backoff_config: BackoffConfig, + metrics: Arc, ) -> Self { + let persisted_file_size_bytes = metrics.register_metric_with_options( + "ingester_persisted_file_size_bytes", + "Size of files persisted by the ingester", + || { + U64HistogramOptions::new([ + 500 * 1024, // 500 KB + 1024 * 1024, // 1 MB + 3 * 1024 * 1024, // 3 MB + 10 * 1024 * 1024, // 10 MB + 30 * 1024 * 1024, // 30 MB + u64::MAX, // Inf + ]) + }, + ); + Self { store: ParquetStorage::new(object_store), catalog, sequencers, exec, backoff_config, + persisted_file_size_bytes, } } @@ -344,6 +364,15 @@ impl Persister for IngesterData { .await .expect("retry forever"); + // Record metrics + let attributes = Attributes::from([( + "sequencer_id", + format!("{}", partition_info.partition.sequencer_id).into(), + )]); + self.persisted_file_size_bytes + .recorder(attributes) + .record(file_size as u64); + // and remove the persisted data from memory debug!( ?partition_id, @@ -1566,8 +1595,8 @@ pub(crate) type IngesterQueryPartitionStream = /// Response streams for querier<>ingester requests. /// -/// The data structure is constructed to allow lazy/streaming data generation. For easier consumption according to the -/// wire protocol, use the [`flatten`](Self::flatten) method. +/// The data structure is constructed to allow lazy/streaming data generation. For easier +/// consumption according to the wire protocol, use the [`flatten`](Self::flatten) method. pub struct IngesterQueryResponse { /// Stream of partitions. partitions: IngesterQueryPartitionStream, @@ -1649,13 +1678,15 @@ pub enum FlatIngesterQueryResponse { /// Start a new snapshot. /// - /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition) message. + /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition) + /// message. StartSnapshot { /// Snapshot schema. schema: Arc, }, - /// Add a record batch to the snapshot that was announced by the last [`StartSnapshot`](Self::StartSnapshot) message. + /// Add a record batch to the snapshot that was announced by the last + /// [`StartSnapshot`](Self::StartSnapshot) message. RecordBatch { /// Record batch. 
batch: RecordBatch, @@ -1796,6 +1827,7 @@ mod tests { sequencers, Arc::new(Executor::new(1)), BackoffConfig::default(), + Arc::clone(&metrics), )); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); @@ -1889,6 +1921,7 @@ mod tests { sequencers, Arc::new(Executor::new(1)), BackoffConfig::default(), + Arc::clone(&metrics), )); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); @@ -1943,7 +1976,7 @@ mod tests { let manager = LifecycleManager::new( LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)), - metrics, + Arc::clone(&metrics), Arc::new(SystemProvider::new()), ); @@ -2016,6 +2049,30 @@ mod tests { assert_eq!(pf.sequencer_id, sequencer1.id); assert!(pf.to_delete.is_none()); + // This value should be recorded in the metrics asserted next; it is less than 500 KB + assert_eq!(pf.file_size_bytes, 1252); + + // verify metrics + let persisted_file_size_bytes: Metric = metrics + .get_instrument("ingester_persisted_file_size_bytes") + .unwrap(); + + let observation = persisted_file_size_bytes + .get_observer(&Attributes::from([( + "sequencer_id", + format!("{}", sequencer1.id).into(), + )])) + .unwrap() + .fetch(); + assert_eq!(observation.sample_count(), 1); + let buckets_with_counts: Vec<_> = observation + .buckets + .iter() + .filter_map(|o| if o.count == 0 { None } else { Some(o.le) }) + .collect(); + // Only the < 500 KB bucket has a count + assert_eq!(buckets_with_counts, &[500 * 1024]); + // verify it set a sort key on the partition in the catalog let partition_info = repos .partitions() @@ -2083,6 +2140,7 @@ mod tests { sequencers, Arc::new(Executor::new(1)), BackoffConfig::default(), + Arc::clone(&metrics), )); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); @@ -2284,7 +2342,8 @@ mod tests { 50, // max time of data to get deleted "city=Boston", // delete predicate ); - // two rows will get deleted, one from existing snapshot, one from the buffer being moved to snpashot + // two rows will get deleted, one from existing snapshot, one from the buffer being moved + // to snpashot p.buffer_tombstone(&exec, "restaurant", ts).await; // verify data @@ -2341,9 +2400,11 @@ mod tests { // verify data assert!(p.data.buffer.is_none()); // always empty after delete - // no snpashots becasue buffer has not data yet and the sanpshot was empty too + // no snpashots becasue buffer has not data yet and the + // snapshot was empty too assert_eq!(p.data.snapshots.len(), 0); - assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is persisting + assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is + // persisting assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); // ------------------------------------------ @@ -2362,7 +2423,7 @@ mod tests { assert_eq!( p.data.buffer.as_ref().unwrap().min_sequence_number, SequenceNumber::new(8) - ); // 1 newlly added mutable batch of 3 rows of data + ); // 1 newly added mutable batch of 3 rows of data assert_eq!(p.data.snapshots.len(), 0); // still empty assert_eq!(p.data.deletes_during_persisting.len(), 1); assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); @@ -2622,6 +2683,7 @@ mod tests { sequencers, Arc::new(Executor::new(1)), BackoffConfig::default(), + Arc::clone(&metrics), )); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 0cbb02e4cc..6aa373e58a 100644 --- 
a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -157,6 +157,7 @@ impl IngestHandlerImpl { sequencers, exec, BackoffConfig::default(), + Arc::clone(&metric_registry), )); let ingester_data = Arc::clone(&data); diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 3adc6b3a7b..e2d0eecd0f 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -701,13 +701,14 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa sequencers, exec, backoff::BackoffConfig::default(), + metrics, ) } pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData { // Whatever data because they won't be used in the tests let metrics: Arc = Default::default(); - let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); let object_store = Arc::new(InMemory::new()); let exec = Arc::new(iox_query::exec::Executor::new(1)); @@ -745,6 +746,7 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa sequencers, exec, backoff::BackoffConfig::default(), + metrics, ) } diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 3f8f63d1c7..409fad54f9 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -704,6 +704,7 @@ impl MockIngester { sequencers, catalog.exec(), BackoffConfig::default(), + catalog.metric_registry(), )); Self { From cd6c809fe073df14448b733d7551ca3d34095d21 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 11 Aug 2022 11:57:22 -0400 Subject: [PATCH 2/5] fix: Change metric tracking sizes of files selected for compaction to a histogram Connects to #5348. --- compactor/src/compact.rs | 32 +++-- compactor/src/lib.rs | 5 +- compactor/src/parquet_file_combining.rs | 6 +- compactor/src/parquet_file_filtering.rs | 178 +++++++++++++++++------- 4 files changed, 152 insertions(+), 69 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 8e929010d1..fa954e7a3c 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -11,7 +11,7 @@ use iox_query::exec::Executor; use iox_time::TimeProvider; use metric::{ Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Counter, U64Gauge, - DURATION_MAX, + U64Histogram, U64HistogramOptions, DURATION_MAX, }; use observability_deps::tracing::debug; use parquet_file::storage::ParquetStorage; @@ -101,17 +101,17 @@ pub struct Compactor { /// Counter for the number of files compacted pub(crate) compaction_counter: Metric, - /// Gauge for the number of compaction partition candidates + /// Gauge for the number of compaction partition candidates before filtering compaction_candidate_gauge: Metric, - /// Gauge for the number of Parquet file candidates. The recorded values have attributes for - /// the compaction level of the file and whether the file was selected for compaction or not. + /// Gauge for the number of Parquet file candidates after filtering. The recorded values have + /// attributes for the compaction level of the file and whether the file was selected for + /// compaction or not. pub(crate) parquet_file_candidate_gauge: Metric, - /// Gauge for the number of bytes of Parquet file candidates. The recorded values have - /// attributes for the compaction level of the file and whether the file was selected for - /// compaction or not. 
- pub(crate) parquet_file_candidate_bytes_gauge: Metric, + /// Histogram for the number of bytes of Parquet files selected for compaction. The recorded + /// values have attributes for the compaction level of the file. + pub(crate) parquet_file_candidate_bytes: Metric, /// Histogram for tracking the time to compact a partition pub(crate) compaction_duration: Metric, @@ -167,9 +167,19 @@ impl Compactor { "Number of Parquet file candidates", ); - let parquet_file_candidate_bytes_gauge = registry.register_metric( + let file_size_buckets = U64HistogramOptions::new([ + 500 * 1024, // 500 KB + 1024 * 1024, // 1 MB + 3 * 1024 * 1024, // 3 MB + 10 * 1024 * 1024, // 10 MB + 30 * 1024 * 1024, // 30 MB + u64::MAX, // Inf + ]); + + let parquet_file_candidate_bytes = registry.register_metric_with_options( "parquet_file_candidate_bytes", - "Number of bytes of Parquet file candidates", + "Number of bytes of Parquet file compaction candidates", + || file_size_buckets.clone(), ); let duration_histogram_options = DurationHistogramOptions::new([ @@ -218,7 +228,7 @@ impl Compactor { compaction_counter, compaction_candidate_gauge, parquet_file_candidate_gauge, - parquet_file_candidate_bytes_gauge, + parquet_file_candidate_bytes, compaction_duration, candidate_selection_duration, partitions_extra_info_reading_duration, diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 8a3205881f..4323cb0777 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -66,7 +66,7 @@ pub(crate) async fn compact_hot_partition( compactor.config.input_size_threshold_bytes(), compactor.config.input_file_count_threshold(), &compactor.parquet_file_candidate_gauge, - &compactor.parquet_file_candidate_bytes_gauge, + &compactor.parquet_file_candidate_bytes, ); let compact_result = parquet_file_combining::compact_parquet_files( @@ -120,7 +120,7 @@ pub(crate) async fn compact_cold_partition( parquet_files_for_compaction, compactor.config.cold_input_size_threshold_bytes(), &compactor.parquet_file_candidate_gauge, - &compactor.parquet_file_candidate_bytes_gauge, + &compactor.parquet_file_candidate_bytes, ); let compact_result = @@ -712,7 +712,6 @@ mod tests { // // - the level 1 file that didn't overlap with anything // - the newly created level 1 file that was only upgraded from level 0 - // - the two newly created after compacting and splitting pf1, pf2, pf3, pf4, pf5 let mut files = catalog.list_by_table_not_to_delete(table.table.id).await; assert_eq!(files.len(), 2); let files_and_levels: Vec<_> = files diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 03f84ff9ba..49c4c89f9a 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -99,7 +99,8 @@ pub(crate) async fn compact_parquet_files( } ); - // Find the total size of all files. + // Find the total size of all files, to be used to determine if the result should be one file + // or if the result should be split into multiple files. 
let total_size: i64 = files.iter().map(|f| f.file_size_bytes).sum(); let total_size = total_size as u64; @@ -317,6 +318,7 @@ pub(crate) async fn compact_parquet_files( "sequencer_id", format!("{}", partition.sequencer_id()).into(), )]); + for (compaction_level, file_count) in files_by_level { let mut attributes = attributes.clone(); attributes.insert("compaction_level", format!("{}", compaction_level as i32)); @@ -908,7 +910,7 @@ mod tests { .collect(); // 1 large file not included in compaction, // 1 newly created CompactionLevel::FileNonOverlapped file as the result of - // compaction and NOT splitting + // compaction and NOT splitting assert_eq!( files_and_levels, vec![ diff --git a/compactor/src/parquet_file_filtering.rs b/compactor/src/parquet_file_filtering.rs index 650c160712..85ff4ece84 100644 --- a/compactor/src/parquet_file_filtering.rs +++ b/compactor/src/parquet_file_filtering.rs @@ -3,7 +3,7 @@ use crate::parquet_file_lookup::ParquetFilesForCompaction; use data_types::ParquetFile; -use metric::{Attributes, Metric, U64Gauge}; +use metric::{Attributes, Metric, U64Gauge, U64Histogram}; use observability_deps::tracing::*; /// Given a list of hot level 0 files sorted by max sequence number and a list of level 1 files for @@ -28,8 +28,8 @@ pub(crate) fn filter_hot_parquet_files( input_file_count_threshold: usize, // Gauge for the number of Parquet file candidates parquet_file_candidate_gauge: &Metric, - // Gauge for the number of bytes of Parquet file candidates - parquet_file_candidate_bytes_gauge: &Metric, + // Histogram for the number of bytes of Parquet file candidates + parquet_file_candidate_bytes: &Metric, ) -> Vec { let ParquetFilesForCompaction { level_0, @@ -118,9 +118,15 @@ pub(crate) fn filter_hot_parquet_files( num_level_1_compacting as u64, ); record_byte_metrics( - parquet_file_candidate_bytes_gauge, - total_level_0_bytes as u64, - total_level_1_bytes as u64, + parquet_file_candidate_bytes, + level_0_to_return + .iter() + .map(|pf| pf.file_size_bytes as u64) + .collect(), + files_to_return + .iter() + .map(|pf| pf.file_size_bytes as u64) + .collect(), ); // Return the level 1 files first, followed by the level 0 files assuming we've maintained @@ -148,8 +154,8 @@ pub(crate) fn filter_cold_parquet_files( max_bytes: u64, // Gauge for the number of Parquet file candidates parquet_file_candidate_gauge: &Metric, - // Gauge for the number of bytes of Parquet file candidates - parquet_file_candidate_bytes_gauge: &Metric, + // Histogram for the number of bytes of Parquet file candidates + parquet_file_candidate_bytes: &Metric, ) -> Vec { let ParquetFilesForCompaction { level_0, @@ -231,9 +237,15 @@ pub(crate) fn filter_cold_parquet_files( num_level_1_compacting as u64, ); record_byte_metrics( - parquet_file_candidate_bytes_gauge, - total_level_0_bytes as u64, - total_level_1_bytes as u64, + parquet_file_candidate_bytes, + level_0_to_return + .iter() + .map(|pf| pf.file_size_bytes as u64) + .collect(), + files_to_return + .iter() + .map(|pf| pf.file_size_bytes as u64) + .collect(), ); // Return the level 1 files first, followed by the level 0 files assuming we've maintained @@ -284,17 +296,21 @@ fn record_file_metrics( } fn record_byte_metrics( - gauge: &Metric, - total_level_0_bytes: u64, - total_level_1_bytes: u64, + histogram: &Metric, + level_0_sizes: Vec, + level_1_sizes: Vec, ) { let attributes = Attributes::from(&[("compaction_level", "0")]); - let recorder = gauge.recorder(attributes); - recorder.set(total_level_0_bytes); + let recorder = 
histogram.recorder(attributes); + for size in level_0_sizes { + recorder.record(size); + } let attributes = Attributes::from(&[("compaction_level", "1")]); - let recorder = gauge.recorder(attributes); - recorder.set(total_level_1_bytes); + let recorder = histogram.recorder(attributes); + for size in level_1_sizes { + recorder.record(size); + } } #[cfg(test)] @@ -304,9 +320,13 @@ mod tests { ColumnSet, CompactionLevel, NamespaceId, ParquetFileId, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp, }; + use metric::{ObservationBucket, U64HistogramOptions}; use std::sync::Arc; use uuid::Uuid; + const BUCKET_500_KB: u64 = 500 * 1024; + const BUCKET_1_MB: u64 = 1024 * 1024; + #[test] fn test_overlaps_in_time() { assert_overlap((1, 3), (2, 4)); @@ -359,7 +379,7 @@ mod tests { ); } - fn metrics() -> (Metric, Metric) { + fn metrics() -> (Metric, Metric) { let registry = Arc::new(metric::Registry::new()); let parquet_file_candidate_gauge = registry.register_metric( @@ -367,15 +387,22 @@ mod tests { "Number of Parquet file candidates", ); - let parquet_file_candidate_bytes_gauge = registry.register_metric( + let parquet_file_candidate_bytes = registry.register_metric_with_options( "parquet_file_candidate_bytes", "Number of bytes of Parquet file candidates", + || { + U64HistogramOptions::new([ + BUCKET_500_KB, // 500 KB + BUCKET_1_MB, // 1 MB + 3 * 1024 * 1024, // 3 MB + 10 * 1024 * 1024, // 10 MB + 30 * 1024 * 1024, // 30 MB + u64::MAX, // Inf + ]) + }, ); - ( - parquet_file_candidate_gauge, - parquet_file_candidate_bytes_gauge, - ) + (parquet_file_candidate_gauge, parquet_file_candidate_bytes) } mod hot { @@ -552,7 +579,7 @@ mod tests { .id(106) .min_time(340) .max_time(360) - .file_size_bytes(10) + .file_size_bytes(BUCKET_500_KB as i64 + 1) // exercise metrics .build(), // Does not overlap any level 0, times are too late ParquetFileBuilder::level_1() @@ -589,8 +616,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 10, - level_1: 20, + level_0_sample_count: 1, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)], + level_1_sample_count: 2, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], } ); @@ -619,8 +648,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 20, - level_1: 40, + level_0_sample_count: 2, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + level_1_sample_count: 4, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)], } ); @@ -649,8 +680,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 20, - level_1: 40, + level_0_sample_count: 2, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + level_1_sample_count: 4, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)], } ); @@ -678,8 +711,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 30, - level_1: 50, + level_0_sample_count: 3, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 3)], + level_1_sample_count: 5, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4), (BUCKET_1_MB, 1)], } ); @@ -708,8 +743,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 10, - level_1: 20, + level_0_sample_count: 1, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)], + level_1_sample_count: 2, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], } ); @@ -738,8 +775,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - 
level_0: 10, - level_1: 20, + level_0_sample_count: 1, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)], + level_1_sample_count: 2, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], } ); @@ -768,8 +807,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 20, - level_1: 40, + level_0_sample_count: 2, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + level_1_sample_count: 4, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)], } ); } @@ -968,8 +1009,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 30, - level_1: 0, + level_0_sample_count: 3, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 3)], + level_1_sample_count: 0, + level_1_buckets_with_counts: vec![], } ); } @@ -1076,8 +1119,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 10, - level_1: 20, + level_0_sample_count: 1, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)], + level_1_sample_count: 2, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], } ); @@ -1105,8 +1150,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 20, - level_1: 40, + level_0_sample_count: 2, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + level_1_sample_count: 4, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)], } ); @@ -1134,8 +1181,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 20, - level_1: 40, + level_0_sample_count: 2, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + level_1_sample_count: 4, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 4)], } ); @@ -1161,8 +1210,10 @@ mod tests { assert_eq!( extract_byte_metrics(&bytes_metric), ExtractedByteMetrics { - level_0: 30, - level_1: 50, + level_0_sample_count: 3, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 3)], + level_1_sample_count: 5, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 5)], } ); } @@ -1304,21 +1355,42 @@ mod tests { #[derive(Debug, PartialEq)] struct ExtractedByteMetrics { - level_0: u64, - level_1: u64, + level_0_sample_count: u64, + level_0_buckets_with_counts: Vec<(u64, u64)>, + level_1_sample_count: u64, + level_1_buckets_with_counts: Vec<(u64, u64)>, } - fn extract_byte_metrics(metric: &Metric) -> ExtractedByteMetrics { + fn extract_byte_metrics(metric: &Metric) -> ExtractedByteMetrics { + let bucket_filter = |o: &ObservationBucket| { + if o.count == 0 { + None + } else { + Some((o.le, o.count)) + } + }; + let level_0 = metric .get_observer(&Attributes::from(&[("compaction_level", "0")])) .unwrap() .fetch(); + let mut level_0_buckets_with_counts: Vec<_> = + level_0.buckets.iter().filter_map(bucket_filter).collect(); + level_0_buckets_with_counts.sort(); let level_1 = metric .get_observer(&Attributes::from(&[("compaction_level", "1")])) .unwrap() .fetch(); + let mut level_1_buckets_with_counts: Vec<_> = + level_1.buckets.iter().filter_map(bucket_filter).collect(); + level_1_buckets_with_counts.sort(); - ExtractedByteMetrics { level_0, level_1 } + ExtractedByteMetrics { + level_0_sample_count: level_0.sample_count(), + level_0_buckets_with_counts, + level_1_sample_count: level_1.sample_count(), + level_1_buckets_with_counts, + } } } From af95ce7ca678f9bf657ecde86112b88f754c6d20 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 11 Aug 2022 13:36:48 -0400 Subject: [PATCH 3/5] feat: Add a histogram tracking sizes of files used as inputs to 
compaction Fixes #5348. --- compactor/src/compact.rs | 11 ++ compactor/src/lib.rs | 2 + compactor/src/parquet_file_combining.rs | 210 +++++++++++++++++++++--- 3 files changed, 204 insertions(+), 19 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index fa954e7a3c..975d385f27 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -113,6 +113,10 @@ pub struct Compactor { /// values have attributes for the compaction level of the file. pub(crate) parquet_file_candidate_bytes: Metric, + /// After a successful compaction operation, track the sizes of the files that were used as the + /// inputs of the compaction operation by compaction level. + pub(crate) compaction_input_file_bytes: Metric, + /// Histogram for tracking the time to compact a partition pub(crate) compaction_duration: Metric, @@ -182,6 +186,12 @@ impl Compactor { || file_size_buckets.clone(), ); + let compaction_input_file_bytes = registry.register_metric_with_options( + "compaction_input_file_bytes", + "Number of bytes of Parquet files used as inputs to a successful compaction operation", + || file_size_buckets.clone(), + ); + let duration_histogram_options = DurationHistogramOptions::new([ Duration::from_millis(100), Duration::from_millis(500), @@ -229,6 +239,7 @@ impl Compactor { compaction_candidate_gauge, parquet_file_candidate_gauge, parquet_file_candidate_bytes, + compaction_input_file_bytes, compaction_duration, candidate_selection_duration, partitions_extra_info_reading_duration, diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 4323cb0777..80d7a9e8f8 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -77,6 +77,7 @@ pub(crate) async fn compact_hot_partition( Arc::clone(&compactor.exec), Arc::clone(&compactor.time_provider), &compactor.compaction_counter, + &compactor.compaction_input_file_bytes, compactor.config.max_desired_file_size_bytes(), compactor.config.percentage_max_file_size(), compactor.config.split_percentage(), @@ -143,6 +144,7 @@ pub(crate) async fn compact_cold_partition( Arc::clone(&compactor.exec), Arc::clone(&compactor.time_provider), &compactor.compaction_counter, + &compactor.compaction_input_file_bytes, compactor.config.max_desired_file_size_bytes(), compactor.config.percentage_max_file_size(), compactor.config.split_percentage(), diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 49c4c89f9a..1380fd77f8 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -11,7 +11,7 @@ use iox_query::{ QueryChunk, }; use iox_time::TimeProvider; -use metric::{Attributes, Metric, U64Counter}; +use metric::{Attributes, Metric, U64Counter, U64Histogram}; use observability_deps::tracing::*; use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata, storage::ParquetStorage}; use schema::{sort::SortKey, Schema}; @@ -74,6 +74,8 @@ pub(crate) async fn compact_parquet_files( time_provider: Arc, // Counter for the number of files compacted compaction_counter: &Metric, + // Histogram for the sizes of the files compacted + compaction_input_file_bytes: &Metric, // Desired max size of compacted parquet files. // It is a target desired value, rather than a guarantee. 
max_desired_file_size_bytes: u64, @@ -104,12 +106,22 @@ pub(crate) async fn compact_parquet_files( let total_size: i64 = files.iter().map(|f| f.file_size_bytes).sum(); let total_size = total_size as u64; - let mut files_by_level = BTreeMap::new(); - for compaction_level in files.iter().map(|f| f.compaction_level) { - *files_by_level.entry(compaction_level).or_default() += 1; + // Compute the number of files per compaction level and collect the file sizes by compaction + // level for recording metrics if this compaction succeeds. + let mut num_files_by_level = BTreeMap::new(); + let mut file_sizes_by_level: BTreeMap> = BTreeMap::new(); + for (compaction_level, file_size_bytes) in files + .iter() + .map(|f| (f.compaction_level, f.file_size_bytes)) + { + *num_files_by_level.entry(compaction_level).or_default() += 1; + let sizes = file_sizes_by_level.entry(compaction_level).or_default(); + sizes.push(file_size_bytes as u64); } - let num_level_0 = files_by_level.get(&CompactionLevel::Initial).unwrap_or(&0); - let num_level_1 = files_by_level + let num_level_0 = num_files_by_level + .get(&CompactionLevel::Initial) + .unwrap_or(&0); + let num_level_1 = num_files_by_level .get(&CompactionLevel::FileNonOverlapped) .unwrap_or(&0); debug!( @@ -319,11 +331,18 @@ pub(crate) async fn compact_parquet_files( format!("{}", partition.sequencer_id()).into(), )]); - for (compaction_level, file_count) in files_by_level { + for (compaction_level, file_count) in num_files_by_level { let mut attributes = attributes.clone(); attributes.insert("compaction_level", format!("{}", compaction_level as i32)); - let compaction_counter = compaction_counter.recorder(attributes); + let compaction_counter = compaction_counter.recorder(attributes.clone()); compaction_counter.inc(file_count); + + let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes); + if let Some(sizes) = file_sizes_by_level.get(&compaction_level) { + for &size in sizes { + compaction_input_file_bytes.record(size); + } + } } Ok(()) @@ -464,8 +483,9 @@ mod tests { use super::*; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_sorted_eq; - use data_types::{ColumnType, PartitionParam}; + use data_types::{ColumnType, PartitionParam, SequencerId}; use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable}; + use metric::{ObservationBucket, U64HistogramOptions}; use parquet_file::ParquetFilePath; use test_helpers::assert_error; @@ -487,6 +507,7 @@ mod tests { const DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES: u64 = 100 * 1024 * 1024; const DEFAULT_PERCENTAGE_MAX_FILE_SIZE: u16 = 30; const DEFAULT_SPLIT_PERCENTAGE: u16 = 80; + const BUCKET_500_KB: u64 = 500 * 1024; struct TestSetup { catalog: Arc, @@ -616,11 +637,28 @@ mod tests { } } - fn metrics() -> Metric { + fn metrics() -> (Metric, Metric) { let registry = Arc::new(metric::Registry::new()); - registry.register_metric( - "compactor_compacted_files_total", - "counter for the number of files compacted", + ( + registry.register_metric( + "compactor_compacted_files_total", + "counter for the number of files compacted", + ), + registry.register_metric_with_options( + "compaction_input_file_bytes", + "Number of bytes of Parquet files used as inputs to a successful compaction \ + operation", + || { + U64HistogramOptions::new([ + BUCKET_500_KB, // 500 KB + 1024 * 1024, // 1 MB + 3 * 1024 * 1024, // 3 MB + 10 * 1024 * 1024, // 10 MB + 30 * 1024 * 1024, // 30 MB + u64::MAX, // Inf + ]) + }, + ), ) } @@ -633,7 +671,8 @@ mod tests { candidate_partition, .. 
} = test_setup().await; - let compaction_counter = metrics(); + let (compaction_counter, compaction_input_file_bytes) = metrics(); + let sequencer_id = candidate_partition.sequencer_id(); let files = vec![]; let result = compact_parquet_files( @@ -644,12 +683,24 @@ mod tests { Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, &compaction_counter, + &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, DEFAULT_SPLIT_PERCENTAGE, ) .await; assert_error!(result, Error::NotEnoughParquetFiles { num_files: 0, .. }); + + // No metrics recorded because the compaction didn't succeed + assert_eq!( + extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), + ExtractedByteMetrics { + level_0_sample_count: 0, + level_0_buckets_with_counts: vec![], + level_1_sample_count: 0, + level_1_buckets_with_counts: vec![], + } + ); } #[tokio::test] @@ -663,7 +714,8 @@ mod tests { .. } = test_setup().await; let table_id = candidate_partition.table_id(); - let compaction_counter = metrics(); + let (compaction_counter, compaction_input_file_bytes) = metrics(); + let sequencer_id = candidate_partition.sequencer_id(); let parquet_file = parquet_files.remove(0); compact_parquet_files( @@ -674,6 +726,7 @@ mod tests { Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, &compaction_counter, + &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, DEFAULT_SPLIT_PERCENTAGE, @@ -703,6 +756,17 @@ mod tests { (7, CompactionLevel::FileNonOverlapped), ] ); + + // Verify the metrics + assert_eq!( + extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), + ExtractedByteMetrics { + level_0_sample_count: 1, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)], + level_1_sample_count: 0, + level_1_buckets_with_counts: vec![], + } + ); } #[tokio::test] @@ -715,7 +779,8 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let compaction_counter = metrics(); + let (compaction_counter, compaction_input_file_bytes) = metrics(); + let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( parquet_files.into_iter().take(4).collect(), @@ -725,6 +790,7 @@ mod tests { Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, &compaction_counter, + &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, DEFAULT_SPLIT_PERCENTAGE, @@ -754,6 +820,17 @@ mod tests { ] ); + // Verify the metrics + assert_eq!( + extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), + ExtractedByteMetrics { + level_0_sample_count: 2, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + level_1_sample_count: 2, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + } + ); + // ------------------------------------------------ // Verify the parquet file content @@ -789,7 +866,8 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let compaction_counter = metrics(); + let (compaction_counter, compaction_input_file_bytes) = metrics(); + let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( parquet_files.into_iter().take(5).collect(), @@ -799,6 +877,7 @@ mod tests { Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, &compaction_counter, + &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, DEFAULT_SPLIT_PERCENTAGE, @@ -827,6 +906,17 @@ mod tests { ] ); + // Verify the metrics + assert_eq!( + 
extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), + ExtractedByteMetrics { + level_0_sample_count: 3, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2), (u64::MAX, 1)], + level_1_sample_count: 2, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + } + ); + // ------------------------------------------------ // Verify the parquet file content @@ -875,7 +965,8 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let compaction_counter = metrics(); + let (compaction_counter, compaction_input_file_bytes) = metrics(); + let sequencer_id = candidate_partition.sequencer_id(); let files_to_compact: Vec<_> = parquet_files.into_iter().take(5).collect(); @@ -891,6 +982,7 @@ mod tests { Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, &compaction_counter, + &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, split_percentage, @@ -919,6 +1011,17 @@ mod tests { ] ); + // Verify the metrics + assert_eq!( + extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), + ExtractedByteMetrics { + level_0_sample_count: 3, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2), (u64::MAX, 1)], + level_1_sample_count: 2, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + } + ); + // ------------------------------------------------ // Verify the parquet file content @@ -954,7 +1057,8 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let compaction_counter = metrics(); + let (compaction_counter, compaction_input_file_bytes) = metrics(); + let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( parquet_files, @@ -964,6 +1068,7 @@ mod tests { Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, &compaction_counter, + &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, DEFAULT_SPLIT_PERCENTAGE, @@ -990,6 +1095,17 @@ mod tests { ] ); + // Verify the metrics + assert_eq!( + extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), + ExtractedByteMetrics { + level_0_sample_count: 4, + level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2), (u64::MAX, 2)], + level_1_sample_count: 2, + level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + } + ); + // ------------------------------------------------ // Verify the parquet file content @@ -1063,4 +1179,60 @@ mod tests { .await .unwrap() } + + #[derive(Debug, PartialEq)] + struct ExtractedByteMetrics { + level_0_sample_count: u64, + level_0_buckets_with_counts: Vec<(u64, u64)>, + level_1_sample_count: u64, + level_1_buckets_with_counts: Vec<(u64, u64)>, + } + + fn extract_byte_metrics( + metric: &Metric, + sequencer_id: SequencerId, + ) -> ExtractedByteMetrics { + let attributes = Attributes::from([("sequencer_id", format!("{}", sequencer_id).into())]); + + let bucket_filter = |o: &ObservationBucket| { + if o.count == 0 { + None + } else { + Some((o.le, o.count)) + } + }; + + let mut level_0_attributes = attributes.clone(); + level_0_attributes.insert("compaction_level", "0"); + let (level_0_sample_count, level_0_buckets_with_counts) = + if let Some(level_0) = metric.get_observer(&level_0_attributes) { + let level_0 = level_0.fetch(); + let mut level_0_buckets_with_counts: Vec<_> = + level_0.buckets.iter().filter_map(bucket_filter).collect(); + level_0_buckets_with_counts.sort(); + (level_0.sample_count(), level_0_buckets_with_counts) + } else { + (0, vec![]) + }; + + let mut level_1_attributes = attributes; + 
level_1_attributes.insert("compaction_level", "1"); + let (level_1_sample_count, level_1_buckets_with_counts) = + if let Some(level_1) = metric.get_observer(&level_1_attributes) { + let level_1 = level_1.fetch(); + let mut level_1_buckets_with_counts: Vec<_> = + level_1.buckets.iter().filter_map(bucket_filter).collect(); + level_1_buckets_with_counts.sort(); + (level_1.sample_count(), level_1_buckets_with_counts) + } else { + (0, vec![]) + }; + + ExtractedByteMetrics { + level_0_sample_count, + level_0_buckets_with_counts, + level_1_sample_count, + level_1_buckets_with_counts, + } + } } From a9ed32df89ebc9982d428ff4d243c5036515bfd1 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 15 Aug 2022 10:23:29 -0400 Subject: [PATCH 4/5] fix: Remove compaction_counter as it's now redundant with the compaction_input_file_bytes histogram --- compactor/src/compact.rs | 13 +---- compactor/src/lib.rs | 2 - compactor/src/parquet_file_combining.rs | 72 ++++++++++--------------- 3 files changed, 29 insertions(+), 58 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 975d385f27..e897360922 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -10,8 +10,8 @@ use iox_catalog::interface::{get_schema_by_id, Catalog}; use iox_query::exec::Executor; use iox_time::TimeProvider; use metric::{ - Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Counter, U64Gauge, - U64Histogram, U64HistogramOptions, DURATION_MAX, + Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Gauge, U64Histogram, + U64HistogramOptions, DURATION_MAX, }; use observability_deps::tracing::debug; use parquet_file::storage::ParquetStorage; @@ -98,9 +98,6 @@ pub struct Compactor { /// Configuration options for the compactor pub(crate) config: CompactorConfig, - /// Counter for the number of files compacted - pub(crate) compaction_counter: Metric, - /// Gauge for the number of compaction partition candidates before filtering compaction_candidate_gauge: Metric, @@ -156,11 +153,6 @@ impl Compactor { config: CompactorConfig, registry: Arc, ) -> Self { - let compaction_counter = registry.register_metric( - "compactor_compacted_files_total", - "counter for the number of files compacted", - ); - let compaction_candidate_gauge = registry.register_metric( "compactor_candidates", "gauge for the number of compaction candidates that are found when checked", @@ -235,7 +227,6 @@ impl Compactor { time_provider, backoff_config, config, - compaction_counter, compaction_candidate_gauge, parquet_file_candidate_gauge, parquet_file_candidate_bytes, diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 80d7a9e8f8..117c7e6d4e 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -76,7 +76,6 @@ pub(crate) async fn compact_hot_partition( compactor.store.clone(), Arc::clone(&compactor.exec), Arc::clone(&compactor.time_provider), - &compactor.compaction_counter, &compactor.compaction_input_file_bytes, compactor.config.max_desired_file_size_bytes(), compactor.config.percentage_max_file_size(), @@ -143,7 +142,6 @@ pub(crate) async fn compact_cold_partition( compactor.store.clone(), Arc::clone(&compactor.exec), Arc::clone(&compactor.time_provider), - &compactor.compaction_counter, &compactor.compaction_input_file_bytes, compactor.config.max_desired_file_size_bytes(), compactor.config.percentage_max_file_size(), diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 1380fd77f8..d1728408cf 100644 --- 
a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -11,7 +11,7 @@ use iox_query::{ QueryChunk, }; use iox_time::TimeProvider; -use metric::{Attributes, Metric, U64Counter, U64Histogram}; +use metric::{Attributes, Metric, U64Histogram}; use observability_deps::tracing::*; use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata, storage::ParquetStorage}; use schema::{sort::SortKey, Schema}; @@ -72,8 +72,6 @@ pub(crate) async fn compact_parquet_files( // Executor for running queries, compacting, and persisting exec: Arc, time_provider: Arc, - // Counter for the number of files compacted - compaction_counter: &Metric, // Histogram for the sizes of the files compacted compaction_input_file_bytes: &Metric, // Desired max size of compacted parquet files. @@ -106,8 +104,8 @@ pub(crate) async fn compact_parquet_files( let total_size: i64 = files.iter().map(|f| f.file_size_bytes).sum(); let total_size = total_size as u64; - // Compute the number of files per compaction level and collect the file sizes by compaction - // level for recording metrics if this compaction succeeds. + // Compute the number of files per compaction level for logging and collect the file sizes by + // compaction level for recording metrics if this compaction succeeds. let mut num_files_by_level = BTreeMap::new(); let mut file_sizes_by_level: BTreeMap> = BTreeMap::new(); for (compaction_level, file_size_bytes) in files @@ -331,17 +329,13 @@ pub(crate) async fn compact_parquet_files( format!("{}", partition.sequencer_id()).into(), )]); - for (compaction_level, file_count) in num_files_by_level { + for (compaction_level, file_sizes) in file_sizes_by_level { let mut attributes = attributes.clone(); attributes.insert("compaction_level", format!("{}", compaction_level as i32)); - let compaction_counter = compaction_counter.recorder(attributes.clone()); - compaction_counter.inc(file_count); let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes); - if let Some(sizes) = file_sizes_by_level.get(&compaction_level) { - for &size in sizes { - compaction_input_file_bytes.record(size); - } + for size in file_sizes { + compaction_input_file_bytes.record(size); } } @@ -637,28 +631,22 @@ mod tests { } } - fn metrics() -> (Metric, Metric) { + fn metrics() -> Metric { let registry = Arc::new(metric::Registry::new()); - ( - registry.register_metric( - "compactor_compacted_files_total", - "counter for the number of files compacted", - ), - registry.register_metric_with_options( - "compaction_input_file_bytes", - "Number of bytes of Parquet files used as inputs to a successful compaction \ - operation", - || { - U64HistogramOptions::new([ - BUCKET_500_KB, // 500 KB - 1024 * 1024, // 1 MB - 3 * 1024 * 1024, // 3 MB - 10 * 1024 * 1024, // 10 MB - 30 * 1024 * 1024, // 30 MB - u64::MAX, // Inf - ]) - }, - ), + registry.register_metric_with_options( + "compaction_input_file_bytes", + "Number of bytes of Parquet files used as inputs to a successful compaction \ + operation", + || { + U64HistogramOptions::new([ + BUCKET_500_KB, // 500 KB + 1024 * 1024, // 1 MB + 3 * 1024 * 1024, // 3 MB + 10 * 1024 * 1024, // 10 MB + 30 * 1024 * 1024, // 30 MB + u64::MAX, // Inf + ]) + }, ) } @@ -671,7 +659,7 @@ mod tests { candidate_partition, .. 
} = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); let files = vec![]; @@ -682,7 +670,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -714,7 +701,7 @@ mod tests { .. } = test_setup().await; let table_id = candidate_partition.table_id(); - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); let parquet_file = parquet_files.remove(0); @@ -725,7 +712,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -779,7 +765,7 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( @@ -789,7 +775,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -866,7 +851,7 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( @@ -876,7 +861,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -965,7 +949,7 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); let files_to_compact: Vec<_> = parquet_files.into_iter().take(5).collect(); @@ -981,7 +965,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -1057,7 +1040,7 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( @@ -1067,7 +1050,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, From ef716a5b9032b673f037490ad537a0bf150667fd Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 15 Aug 
2022 10:50:04 -0400 Subject: [PATCH 5/5] fix: Remove compaction level attribute from the compaction_input_file_bytes metric --- compactor/src/parquet_file_combining.rs | 123 ++++++++---------------- 1 file changed, 41 insertions(+), 82 deletions(-) diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index d1728408cf..97a92b70e2 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -99,22 +99,17 @@ pub(crate) async fn compact_parquet_files( } ); + // Save all file sizes for recording metrics if this compaction succeeds. + let file_sizes: Vec<_> = files.iter().map(|f| f.file_size_bytes).collect(); // Find the total size of all files, to be used to determine if the result should be one file // or if the result should be split into multiple files. - let total_size: i64 = files.iter().map(|f| f.file_size_bytes).sum(); + let total_size: i64 = file_sizes.iter().sum(); let total_size = total_size as u64; - // Compute the number of files per compaction level for logging and collect the file sizes by - // compaction level for recording metrics if this compaction succeeds. + // Compute the number of files per compaction level for logging let mut num_files_by_level = BTreeMap::new(); - let mut file_sizes_by_level: BTreeMap> = BTreeMap::new(); - for (compaction_level, file_size_bytes) in files - .iter() - .map(|f| (f.compaction_level, f.file_size_bytes)) - { + for compaction_level in files.iter().map(|f| f.compaction_level) { *num_files_by_level.entry(compaction_level).or_default() += 1; - let sizes = file_sizes_by_level.entry(compaction_level).or_default(); - sizes.push(file_size_bytes as u64); } let num_level_0 = num_files_by_level .get(&CompactionLevel::Initial) @@ -328,15 +323,9 @@ pub(crate) async fn compact_parquet_files( "sequencer_id", format!("{}", partition.sequencer_id()).into(), )]); - - for (compaction_level, file_sizes) in file_sizes_by_level { - let mut attributes = attributes.clone(); - attributes.insert("compaction_level", format!("{}", compaction_level as i32)); - - let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes); - for size in file_sizes { - compaction_input_file_bytes.record(size); - } + let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes); + for size in file_sizes { + compaction_input_file_bytes.record(size as u64); } Ok(()) @@ -479,7 +468,7 @@ mod tests { use arrow_util::assert_batches_sorted_eq; use data_types::{ColumnType, PartitionParam, SequencerId}; use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable}; - use metric::{ObservationBucket, U64HistogramOptions}; + use metric::U64HistogramOptions; use parquet_file::ParquetFilePath; use test_helpers::assert_error; @@ -682,10 +671,8 @@ mod tests { assert_eq!( extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), ExtractedByteMetrics { - level_0_sample_count: 0, - level_0_buckets_with_counts: vec![], - level_1_sample_count: 0, - level_1_buckets_with_counts: vec![], + sample_count: 0, + buckets_with_counts: vec![], } ); } @@ -747,10 +734,8 @@ mod tests { assert_eq!( extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), ExtractedByteMetrics { - level_0_sample_count: 1, - level_0_buckets_with_counts: vec![(BUCKET_500_KB, 1)], - level_1_sample_count: 0, - level_1_buckets_with_counts: vec![], + sample_count: 1, + buckets_with_counts: vec![(BUCKET_500_KB, 1)], } ); } @@ -809,10 +794,8 @@ mod tests { assert_eq!( 
extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), ExtractedByteMetrics { - level_0_sample_count: 2, - level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2)], - level_1_sample_count: 2, - level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + sample_count: 4, + buckets_with_counts: vec![(BUCKET_500_KB, 4)], } ); @@ -894,10 +877,8 @@ mod tests { assert_eq!( extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), ExtractedByteMetrics { - level_0_sample_count: 3, - level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2), (u64::MAX, 1)], - level_1_sample_count: 2, - level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + sample_count: 5, + buckets_with_counts: vec![(BUCKET_500_KB, 4), (u64::MAX, 1)], } ); @@ -998,10 +979,8 @@ mod tests { assert_eq!( extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), ExtractedByteMetrics { - level_0_sample_count: 3, - level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2), (u64::MAX, 1)], - level_1_sample_count: 2, - level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + sample_count: 5, + buckets_with_counts: vec![(BUCKET_500_KB, 4), (u64::MAX, 1)], } ); @@ -1081,10 +1060,8 @@ mod tests { assert_eq!( extract_byte_metrics(&compaction_input_file_bytes, sequencer_id), ExtractedByteMetrics { - level_0_sample_count: 4, - level_0_buckets_with_counts: vec![(BUCKET_500_KB, 2), (u64::MAX, 2)], - level_1_sample_count: 2, - level_1_buckets_with_counts: vec![(BUCKET_500_KB, 2)], + sample_count: 6, + buckets_with_counts: vec![(BUCKET_500_KB, 4), (u64::MAX, 2)], } ); @@ -1164,10 +1141,8 @@ mod tests { #[derive(Debug, PartialEq)] struct ExtractedByteMetrics { - level_0_sample_count: u64, - level_0_buckets_with_counts: Vec<(u64, u64)>, - level_1_sample_count: u64, - level_1_buckets_with_counts: Vec<(u64, u64)>, + sample_count: u64, + buckets_with_counts: Vec<(u64, u64)>, } fn extract_byte_metrics( @@ -1176,45 +1151,29 @@ mod tests { ) -> ExtractedByteMetrics { let attributes = Attributes::from([("sequencer_id", format!("{}", sequencer_id).into())]); - let bucket_filter = |o: &ObservationBucket| { - if o.count == 0 { - None - } else { - Some((o.le, o.count)) - } - }; - - let mut level_0_attributes = attributes.clone(); - level_0_attributes.insert("compaction_level", "0"); - let (level_0_sample_count, level_0_buckets_with_counts) = - if let Some(level_0) = metric.get_observer(&level_0_attributes) { - let level_0 = level_0.fetch(); - let mut level_0_buckets_with_counts: Vec<_> = - level_0.buckets.iter().filter_map(bucket_filter).collect(); - level_0_buckets_with_counts.sort(); - (level_0.sample_count(), level_0_buckets_with_counts) - } else { - (0, vec![]) - }; - - let mut level_1_attributes = attributes; - level_1_attributes.insert("compaction_level", "1"); - let (level_1_sample_count, level_1_buckets_with_counts) = - if let Some(level_1) = metric.get_observer(&level_1_attributes) { - let level_1 = level_1.fetch(); - let mut level_1_buckets_with_counts: Vec<_> = - level_1.buckets.iter().filter_map(bucket_filter).collect(); - level_1_buckets_with_counts.sort(); - (level_1.sample_count(), level_1_buckets_with_counts) + let (sample_count, buckets_with_counts) = + if let Some(observer) = metric.get_observer(&attributes) { + let observer = observer.fetch(); + let mut buckets_with_counts: Vec<_> = observer + .buckets + .iter() + .filter_map(|o| { + if o.count == 0 { + None + } else { + Some((o.le, o.count)) + } + }) + .collect(); + buckets_with_counts.sort(); + (observer.sample_count(), buckets_with_counts) } else { (0, vec![]) }; 
ExtractedByteMetrics { - level_0_sample_count, - level_0_buckets_with_counts, - level_1_sample_count, - level_1_buckets_with_counts, + sample_count, + buckets_with_counts, } } }
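For reference, here is a minimal standalone sketch of the metric pattern this series converges on: register a U64Histogram with explicit bucket boundaries, record one observation per file size against a sequencer_id attribute, then read the observation back the way the tests above do. It assumes the in-repo `metric` crate API exercised in these patches (`register_metric_with_options`, `recorder`, `get_observer`); the metric name `example_file_size_bytes` and the recorded sizes are illustrative only, not part of the patches.

use std::sync::Arc;

use metric::{Attributes, Metric, Registry, U64Histogram, U64HistogramOptions};

fn main() {
    let registry = Arc::new(Registry::new());

    // Same bucket boundaries the patches use for file sizes.
    let file_size_bytes: Metric<U64Histogram> = registry.register_metric_with_options(
        "example_file_size_bytes",
        "Illustrative histogram of Parquet file sizes",
        || {
            U64HistogramOptions::new([
                500 * 1024,       // 500 KB
                1024 * 1024,      // 1 MB
                3 * 1024 * 1024,  // 3 MB
                10 * 1024 * 1024, // 10 MB
                30 * 1024 * 1024, // 30 MB
                u64::MAX,         // Inf
            ])
        },
    );

    // Record one observation per file, labelled by sequencer.
    let attributes = Attributes::from(&[("sequencer_id", "1")]);
    let recorder = file_size_bytes.recorder(attributes.clone());
    for size in [1252_u64, 600 * 1024, 4 * 1024 * 1024] {
        recorder.record(size);
    }

    // Read the observation back and keep only non-empty buckets,
    // mirroring the assertions in the tests above.
    let observation = file_size_bytes.get_observer(&attributes).unwrap().fetch();
    assert_eq!(observation.sample_count(), 3);
    let buckets_with_counts: Vec<_> = observation
        .buckets
        .iter()
        .filter_map(|o| if o.count == 0 { None } else { Some((o.le, o.count)) })
        .collect();
    println!("{:?}", buckets_with_counts);
}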