fix: Remove compaction_counter as it's now redundant with the compaction_input_file_bytes histogram

parent af95ce7ca6
commit a9ed32df89
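Why the counter is redundant: compact_parquet_files records one histogram sample per input file, under the same compaction_level attributes the counter used, so the histogram's total sample count is exactly the value the counter held. A minimal sketch of that equivalence; MiniHistogram is a hypothetical stand-in, not the real `metric` crate API:

```rust
/// Hypothetical fixed-bucket histogram; illustration only.
struct MiniHistogram {
    /// Inclusive upper bounds, ascending; the last is u64::MAX so every
    /// sample lands somewhere.
    bounds: Vec<u64>,
    /// Samples observed per bucket.
    counts: Vec<u64>,
}

impl MiniHistogram {
    fn new(bounds: &[u64]) -> Self {
        Self {
            bounds: bounds.to_vec(),
            counts: vec![0; bounds.len()],
        }
    }

    /// Record one sample into the first bucket whose bound holds it.
    fn record(&mut self, value: u64) {
        if let Some(i) = self.bounds.iter().position(|&b| value <= b) {
            self.counts[i] += 1;
        }
    }

    /// Total number of samples: the quantity the removed counter tracked.
    fn total_samples(&self) -> u64 {
        self.counts.iter().sum()
    }
}

fn main() {
    let mut hist = MiniHistogram::new(&[1024 * 1024, 10 * 1024 * 1024, u64::MAX]);
    // One record() call per compacted input file, as in compact_parquet_files.
    for file_size_bytes in [500_000_u64, 2_000_000, 42_000_000] {
        hist.record(file_size_bytes);
    }
    // The sample count doubles as "number of files compacted".
    assert_eq!(hist.total_samples(), 3);
}
```

Any consumer of the dropped compactor_compacted_files_total series can presumably read the same number off the sample count of compaction_input_file_bytes.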
@@ -10,8 +10,8 @@ use iox_catalog::interface::{get_schema_by_id, Catalog};
 use iox_query::exec::Executor;
 use iox_time::TimeProvider;
 use metric::{
-    Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Counter, U64Gauge,
-    U64Histogram, U64HistogramOptions, DURATION_MAX,
+    Attributes, DurationHistogram, DurationHistogramOptions, Metric, U64Gauge, U64Histogram,
+    U64HistogramOptions, DURATION_MAX,
 };
 use observability_deps::tracing::debug;
 use parquet_file::storage::ParquetStorage;
@@ -98,9 +98,6 @@ pub struct Compactor {
     /// Configuration options for the compactor
     pub(crate) config: CompactorConfig,

-    /// Counter for the number of files compacted
-    pub(crate) compaction_counter: Metric<U64Counter>,
-
     /// Gauge for the number of compaction partition candidates before filtering
     compaction_candidate_gauge: Metric<U64Gauge>,

@@ -156,11 +153,6 @@ impl Compactor {
         config: CompactorConfig,
         registry: Arc<metric::Registry>,
     ) -> Self {
-        let compaction_counter = registry.register_metric(
-            "compactor_compacted_files_total",
-            "counter for the number of files compacted",
-        );
-
         let compaction_candidate_gauge = registry.register_metric(
             "compactor_candidates",
             "gauge for the number of compaction candidates that are found when checked",
@@ -235,7 +227,6 @@ impl Compactor {
             time_provider,
             backoff_config,
             config,
-            compaction_counter,
             compaction_candidate_gauge,
             parquet_file_candidate_gauge,
             parquet_file_candidate_bytes,

@@ -76,7 +76,6 @@ pub(crate) async fn compact_hot_partition(
         compactor.store.clone(),
         Arc::clone(&compactor.exec),
         Arc::clone(&compactor.time_provider),
-        &compactor.compaction_counter,
         &compactor.compaction_input_file_bytes,
         compactor.config.max_desired_file_size_bytes(),
         compactor.config.percentage_max_file_size(),
@@ -143,7 +142,6 @@ pub(crate) async fn compact_cold_partition(
         compactor.store.clone(),
         Arc::clone(&compactor.exec),
         Arc::clone(&compactor.time_provider),
-        &compactor.compaction_counter,
         &compactor.compaction_input_file_bytes,
         compactor.config.max_desired_file_size_bytes(),
         compactor.config.percentage_max_file_size(),

@@ -11,7 +11,7 @@ use iox_query::{
     QueryChunk,
 };
 use iox_time::TimeProvider;
-use metric::{Attributes, Metric, U64Counter, U64Histogram};
+use metric::{Attributes, Metric, U64Histogram};
 use observability_deps::tracing::*;
 use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata, storage::ParquetStorage};
 use schema::{sort::SortKey, Schema};
@@ -72,8 +72,6 @@ pub(crate) async fn compact_parquet_files(
     // Executor for running queries, compacting, and persisting
     exec: Arc<Executor>,
     time_provider: Arc<dyn TimeProvider>,
-    // Counter for the number of files compacted
-    compaction_counter: &Metric<U64Counter>,
     // Histogram for the sizes of the files compacted
     compaction_input_file_bytes: &Metric<U64Histogram>,
     // Desired max size of compacted parquet files.
@@ -106,8 +104,8 @@ pub(crate) async fn compact_parquet_files(
     let total_size: i64 = files.iter().map(|f| f.file_size_bytes).sum();
     let total_size = total_size as u64;

-    // Compute the number of files per compaction level and collect the file sizes by compaction
-    // level for recording metrics if this compaction succeeds.
+    // Compute the number of files per compaction level for logging and collect the file sizes by
+    // compaction level for recording metrics if this compaction succeeds.
     let mut num_files_by_level = BTreeMap::new();
     let mut file_sizes_by_level: BTreeMap<CompactionLevel, Vec<u64>> = BTreeMap::new();
     for (compaction_level, file_size_bytes) in files
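The body of that loop falls outside the hunk. A plausible sketch of the grouping it performs, using a stand-in CompactionLevel enum (the real enum lives in IOx's shared data types; the variant names here are illustrative):

```rust
use std::collections::BTreeMap;

/// Stand-in for the catalog's CompactionLevel; only Ord matters here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum CompactionLevel {
    Initial,
    FileNonOverlapped,
}

fn main() {
    // (compaction_level, file_size_bytes) pairs, as the real loop iterates.
    let files = [
        (CompactionLevel::Initial, 600_000_u64),
        (CompactionLevel::Initial, 2_000_000),
        (CompactionLevel::FileNonOverlapped, 9_000_000),
    ];

    let mut num_files_by_level: BTreeMap<CompactionLevel, u64> = BTreeMap::new();
    let mut file_sizes_by_level: BTreeMap<CompactionLevel, Vec<u64>> = BTreeMap::new();

    for (compaction_level, file_size_bytes) in files {
        // File counts per level are now used only for logging...
        *num_files_by_level.entry(compaction_level).or_default() += 1;
        // ...while the raw sizes feed the histogram if compaction succeeds.
        file_sizes_by_level
            .entry(compaction_level)
            .or_default()
            .push(file_size_bytes);
    }

    assert_eq!(num_files_by_level[&CompactionLevel::Initial], 2);
    assert_eq!(
        file_sizes_by_level[&CompactionLevel::Initial],
        vec![600_000, 2_000_000]
    );
}
```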
@@ -331,17 +329,13 @@ pub(crate) async fn compact_parquet_files(
         format!("{}", partition.sequencer_id()).into(),
     )]);

-    for (compaction_level, file_count) in num_files_by_level {
+    for (compaction_level, file_sizes) in file_sizes_by_level {
         let mut attributes = attributes.clone();
         attributes.insert("compaction_level", format!("{}", compaction_level as i32));
-        let compaction_counter = compaction_counter.recorder(attributes.clone());
-        compaction_counter.inc(file_count);

         let compaction_input_file_bytes = compaction_input_file_bytes.recorder(attributes);
-        if let Some(sizes) = file_sizes_by_level.get(&compaction_level) {
-            for &size in sizes {
-                compaction_input_file_bytes.record(size);
-            }
+        for size in file_sizes {
+            compaction_input_file_bytes.record(size);
         }
     }

@@ -637,28 +631,22 @@ mod tests {
         }
     }

-    fn metrics() -> (Metric<U64Counter>, Metric<U64Histogram>) {
+    fn metrics() -> Metric<U64Histogram> {
         let registry = Arc::new(metric::Registry::new());
-        (
-            registry.register_metric(
-                "compactor_compacted_files_total",
-                "counter for the number of files compacted",
-            ),
-            registry.register_metric_with_options(
-                "compaction_input_file_bytes",
-                "Number of bytes of Parquet files used as inputs to a successful compaction \
-                 operation",
-                || {
-                    U64HistogramOptions::new([
-                        BUCKET_500_KB,    // 500 KB
-                        1024 * 1024,      // 1 MB
-                        3 * 1024 * 1024,  // 3 MB
-                        10 * 1024 * 1024, // 10 MB
-                        30 * 1024 * 1024, // 30 MB
-                        u64::MAX,         // Inf
-                    ])
-                },
-            ),
-        )
+        registry.register_metric_with_options(
+            "compaction_input_file_bytes",
+            "Number of bytes of Parquet files used as inputs to a successful compaction \
+             operation",
+            || {
+                U64HistogramOptions::new([
+                    BUCKET_500_KB,    // 500 KB
+                    1024 * 1024,      // 1 MB
+                    3 * 1024 * 1024,  // 3 MB
+                    10 * 1024 * 1024, // 10 MB
+                    30 * 1024 * 1024, // 30 MB
+                    u64::MAX,         // Inf
+                ])
+            },
+        )
     }

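The bucket list above duplicates the one registered in the Compactor constructor. Assuming U64Histogram buckets act as inclusive upper bounds with u64::MAX as the catch-all, and that BUCKET_500_KB is 500 * 1024 (both assumptions about code outside this diff), a recorded size lands in the first bucket whose bound is at least the value:

```rust
/// Hypothetical classifier mirroring the bucket layout in metrics();
/// inclusive-upper-bound semantics are an assumption, not confirmed here.
fn bucket_for(size: u64) -> &'static str {
    const BUCKET_500_KB: u64 = 500 * 1024; // assumed definition
    match size {
        s if s <= BUCKET_500_KB => "<= 500 KB",
        s if s <= 1024 * 1024 => "<= 1 MB",
        s if s <= 3 * 1024 * 1024 => "<= 3 MB",
        s if s <= 10 * 1024 * 1024 => "<= 10 MB",
        s if s <= 30 * 1024 * 1024 => "<= 30 MB",
        _ => "inf",
    }
}

fn main() {
    // A 600 KB file overflows the 500 KB bucket into the 1 MB one.
    assert_eq!(bucket_for(600 * 1024), "<= 1 MB");
    assert_eq!(bucket_for(50 * 1024 * 1024), "inf");
}
```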
@@ -671,7 +659,7 @@ mod tests {
             candidate_partition,
             ..
         } = test_setup().await;
-        let (compaction_counter, compaction_input_file_bytes) = metrics();
+        let compaction_input_file_bytes = metrics();
         let sequencer_id = candidate_partition.sequencer_id();

         let files = vec![];
@@ -682,7 +670,6 @@ mod tests {
             ParquetStorage::new(Arc::clone(&catalog.object_store)),
             Arc::clone(&catalog.exec),
             Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
-            &compaction_counter,
             &compaction_input_file_bytes,
             DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
             DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
@@ -714,7 +701,7 @@ mod tests {
             ..
         } = test_setup().await;
         let table_id = candidate_partition.table_id();
-        let (compaction_counter, compaction_input_file_bytes) = metrics();
+        let compaction_input_file_bytes = metrics();
         let sequencer_id = candidate_partition.sequencer_id();

         let parquet_file = parquet_files.remove(0);
@@ -725,7 +712,6 @@ mod tests {
             ParquetStorage::new(Arc::clone(&catalog.object_store)),
             Arc::clone(&catalog.exec),
             Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
-            &compaction_counter,
             &compaction_input_file_bytes,
             DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
             DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
@@ -779,7 +765,7 @@ mod tests {
             candidate_partition,
             parquet_files,
         } = test_setup().await;
-        let (compaction_counter, compaction_input_file_bytes) = metrics();
+        let compaction_input_file_bytes = metrics();
         let sequencer_id = candidate_partition.sequencer_id();

         compact_parquet_files(
@@ -789,7 +775,6 @@ mod tests {
             ParquetStorage::new(Arc::clone(&catalog.object_store)),
             Arc::clone(&catalog.exec),
             Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
-            &compaction_counter,
             &compaction_input_file_bytes,
             DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
             DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
@@ -866,7 +851,7 @@ mod tests {
             candidate_partition,
             parquet_files,
         } = test_setup().await;
-        let (compaction_counter, compaction_input_file_bytes) = metrics();
+        let compaction_input_file_bytes = metrics();
         let sequencer_id = candidate_partition.sequencer_id();

         compact_parquet_files(
@@ -876,7 +861,6 @@ mod tests {
             ParquetStorage::new(Arc::clone(&catalog.object_store)),
             Arc::clone(&catalog.exec),
             Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
-            &compaction_counter,
             &compaction_input_file_bytes,
             DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
             DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
@@ -965,7 +949,7 @@ mod tests {
             candidate_partition,
             parquet_files,
         } = test_setup().await;
-        let (compaction_counter, compaction_input_file_bytes) = metrics();
+        let compaction_input_file_bytes = metrics();
         let sequencer_id = candidate_partition.sequencer_id();

         let files_to_compact: Vec<_> = parquet_files.into_iter().take(5).collect();
@@ -981,7 +965,6 @@ mod tests {
             ParquetStorage::new(Arc::clone(&catalog.object_store)),
             Arc::clone(&catalog.exec),
             Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
-            &compaction_counter,
             &compaction_input_file_bytes,
             DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
             DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
@@ -1057,7 +1040,7 @@ mod tests {
             candidate_partition,
             parquet_files,
         } = test_setup().await;
-        let (compaction_counter, compaction_input_file_bytes) = metrics();
+        let compaction_input_file_bytes = metrics();
         let sequencer_id = candidate_partition.sequencer_id();

         compact_parquet_files(
@@ -1067,7 +1050,6 @@ mod tests {
             ParquetStorage::new(Arc::clone(&catalog.object_store)),
             Arc::clone(&catalog.exec),
             Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
-            &compaction_counter,
             &compaction_input_file_bytes,
             DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
             DEFAULT_PERCENTAGE_MAX_FILE_SIZE,