diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 975d385f27..e897360922 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -10,8 +10,8 @@ use iox_catalog::interface::{get_schema_by_id, Catalog}; use iox_query::exec::Executor; use iox_time::TimeProvider; use metric::{ - Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Counter, U64Gauge, - U64Histogram, U64HistogramOptions, DURATION_MAX, + Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Gauge, U64Histogram, + U64HistogramOptions, DURATION_MAX, }; use observability_deps::tracing::debug; use parquet_file::storage::ParquetStorage; @@ -98,9 +98,6 @@ pub struct Compactor { /// Configuration options for the compactor pub(crate) config: CompactorConfig, - /// Counter for the number of files compacted - pub(crate) compaction_counter: Metric, - /// Gauge for the number of compaction partition candidates before filtering compaction_candidate_gauge: Metric, @@ -156,11 +153,6 @@ impl Compactor { config: CompactorConfig, registry: Arc, ) -> Self { - let compaction_counter = registry.register_metric( - "compactor_compacted_files_total", - "counter for the number of files compacted", - ); - let compaction_candidate_gauge = registry.register_metric( "compactor_candidates", "gauge for the number of compaction candidates that are found when checked", @@ -235,7 +227,6 @@ impl Compactor { time_provider, backoff_config, config, - compaction_counter, compaction_candidate_gauge, parquet_file_candidate_gauge, parquet_file_candidate_bytes, diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 80d7a9e8f8..117c7e6d4e 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -76,7 +76,6 @@ pub(crate) async fn compact_hot_partition( compactor.store.clone(), Arc::clone(&compactor.exec), Arc::clone(&compactor.time_provider), - &compactor.compaction_counter, &compactor.compaction_input_file_bytes, compactor.config.max_desired_file_size_bytes(), compactor.config.percentage_max_file_size(), @@ -143,7 +142,6 @@ pub(crate) async fn compact_cold_partition( compactor.store.clone(), Arc::clone(&compactor.exec), Arc::clone(&compactor.time_provider), - &compactor.compaction_counter, &compactor.compaction_input_file_bytes, compactor.config.max_desired_file_size_bytes(), compactor.config.percentage_max_file_size(), diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 1380fd77f8..d1728408cf 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -11,7 +11,7 @@ use iox_query::{ QueryChunk, }; use iox_time::TimeProvider; -use metric::{Attributes, Metric, U64Counter, U64Histogram}; +use metric::{Attributes, Metric, U64Histogram}; use observability_deps::tracing::*; use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata, storage::ParquetStorage}; use schema::{sort::SortKey, Schema}; @@ -72,8 +72,6 @@ pub(crate) async fn compact_parquet_files( // Executor for running queries, compacting, and persisting exec: Arc, time_provider: Arc, - // Counter for the number of files compacted - compaction_counter: &Metric, // Histogram for the sizes of the files compacted compaction_input_file_bytes: &Metric, // Desired max size of compacted parquet files. @@ -106,8 +104,8 @@ pub(crate) async fn compact_parquet_files( let total_size: i64 = files.iter().map(|f| f.file_size_bytes).sum(); let total_size = total_size as u64; - // Compute the number of files per compaction level and collect the file sizes by compaction - // level for recording metrics if this compaction succeeds. + // Compute the number of files per compaction level for logging and collect the file sizes by + // compaction level for recording metrics if this compaction succeeds. let mut num_files_by_level = BTreeMap::new(); let mut file_sizes_by_level: BTreeMap> = BTreeMap::new(); for (compaction_level, file_size_bytes) in files @@ -331,17 +329,13 @@ pub(crate) async fn compact_parquet_files( format!("{}", partition.sequencer_id()).into(), )]); - for (compaction_level, file_count) in num_files_by_level { + for (compaction_level, file_sizes) in file_sizes_by_level { let mut attributes = attributes.clone(); attributes.insert("compaction_level", format!("{}", compaction_level as i32)); - let compaction_counter = compaction_counter.recorder(attributes.clone()); - compaction_counter.inc(file_count); let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes); - if let Some(sizes) = file_sizes_by_level.get(&compaction_level) { - for &size in sizes { - compaction_input_file_bytes.record(size); - } + for size in file_sizes { + compaction_input_file_bytes.record(size); } } @@ -637,28 +631,22 @@ mod tests { } } - fn metrics() -> (Metric, Metric) { + fn metrics() -> Metric { let registry = Arc::new(metric::Registry::new()); - ( - registry.register_metric( - "compactor_compacted_files_total", - "counter for the number of files compacted", - ), - registry.register_metric_with_options( - "compaction_input_file_bytes", - "Number of bytes of Parquet files used as inputs to a successful compaction \ - operation", - || { - U64HistogramOptions::new([ - BUCKET_500_KB, // 500 KB - 1024 * 1024, // 1 MB - 3 * 1024 * 1024, // 3 MB - 10 * 1024 * 1024, // 10 MB - 30 * 1024 * 1024, // 30 MB - u64::MAX, // Inf - ]) - }, - ), + registry.register_metric_with_options( + "compaction_input_file_bytes", + "Number of bytes of Parquet files used as inputs to a successful compaction \ + operation", + || { + U64HistogramOptions::new([ + BUCKET_500_KB, // 500 KB + 1024 * 1024, // 1 MB + 3 * 1024 * 1024, // 3 MB + 10 * 1024 * 1024, // 10 MB + 30 * 1024 * 1024, // 30 MB + u64::MAX, // Inf + ]) + }, ) } @@ -671,7 +659,7 @@ mod tests { candidate_partition, .. } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); let files = vec![]; @@ -682,7 +670,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -714,7 +701,7 @@ mod tests { .. } = test_setup().await; let table_id = candidate_partition.table_id(); - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); let parquet_file = parquet_files.remove(0); @@ -725,7 +712,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -779,7 +765,7 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( @@ -789,7 +775,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -866,7 +851,7 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( @@ -876,7 +861,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -965,7 +949,7 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); let files_to_compact: Vec<_> = parquet_files.into_iter().take(5).collect(); @@ -981,7 +965,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE, @@ -1057,7 +1040,7 @@ mod tests { candidate_partition, parquet_files, } = test_setup().await; - let (compaction_counter, compaction_input_file_bytes) = metrics(); + let compaction_input_file_bytes = metrics(); let sequencer_id = candidate_partition.sequencer_id(); compact_parquet_files( @@ -1067,7 +1050,6 @@ mod tests { ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, - &compaction_counter, &compaction_input_file_bytes, DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES, DEFAULT_PERCENTAGE_MAX_FILE_SIZE,