feat: add more metrics for compactor (#4575)
* feat: add more metrics for compactor * chore: clearer commentpull/24376/head
parent
0d5e3c97f0
commit
f9e3495e47
|
@ -16,7 +16,7 @@ use data_types::{
|
||||||
Timestamp, Tombstone, TombstoneId,
|
Timestamp, Tombstone, TombstoneId,
|
||||||
};
|
};
|
||||||
use datafusion::error::DataFusionError;
|
use datafusion::error::DataFusionError;
|
||||||
use iox_catalog::interface::{Catalog, Transaction};
|
use iox_catalog::interface::{Catalog, Transaction, INITIAL_COMPACTION_LEVEL};
|
||||||
use iox_time::{Time, TimeProvider};
|
use iox_time::{Time, TimeProvider};
|
||||||
use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
|
use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
|
||||||
use object_store::DynObjectStore;
|
use object_store::DynObjectStore;
|
||||||
|
@ -207,13 +207,21 @@ pub struct Compactor {
|
||||||
/// Counter for the number of files compacted
|
/// Counter for the number of files compacted
|
||||||
compaction_counter: Metric<U64Counter>,
|
compaction_counter: Metric<U64Counter>,
|
||||||
|
|
||||||
|
/// Counter for the actual number of files compacted
|
||||||
|
/// compaction_actual_counter = compaction_counter - some large-enough non-overlapped files
|
||||||
|
compaction_actual_counter: Metric<U64Counter>,
|
||||||
|
|
||||||
/// Counter for level promotion from level 0 to 1
|
/// Counter for level promotion from level 0 to 1
|
||||||
|
/// These are large enough and non-overlapped file
|
||||||
level_promotion_counter: Metric<U64Counter>,
|
level_promotion_counter: Metric<U64Counter>,
|
||||||
|
|
||||||
/// Gauge for the number of compaction candidates
|
/// Counter for the number of output compacted files
|
||||||
|
compaction_output_counter: Metric<U64Counter>,
|
||||||
|
|
||||||
|
/// Gauge for the number of compaction partition candidates
|
||||||
compaction_candidate_gauge: Metric<U64Gauge>,
|
compaction_candidate_gauge: Metric<U64Gauge>,
|
||||||
|
|
||||||
/// Gauge for the number of bytes across compaction candidates
|
/// Gauge for the number of bytes of level-0 files across compaction partition candidates
|
||||||
compaction_candidate_bytes_gauge: Metric<U64Gauge>,
|
compaction_candidate_bytes_gauge: Metric<U64Gauge>,
|
||||||
|
|
||||||
/// Histogram for tracking the time to compact a partition
|
/// Histogram for tracking the time to compact a partition
|
||||||
|
@ -237,6 +245,17 @@ impl Compactor {
|
||||||
"compactor_compacted_files_total",
|
"compactor_compacted_files_total",
|
||||||
"counter for the number of files compacted",
|
"counter for the number of files compacted",
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let compaction_actual_counter = registry.register_metric(
|
||||||
|
"compactor_actual_compacted_files_total",
|
||||||
|
"counter for the number of actual files compacted",
|
||||||
|
);
|
||||||
|
|
||||||
|
let compaction_output_counter = registry.register_metric(
|
||||||
|
"compactor_output_compacted_files_total",
|
||||||
|
"counter for the number of output compacted files",
|
||||||
|
);
|
||||||
|
|
||||||
let compaction_candidate_gauge = registry.register_metric(
|
let compaction_candidate_gauge = registry.register_metric(
|
||||||
"compactor_candidates",
|
"compactor_candidates",
|
||||||
"gauge for the number of compaction candidates that are found when checked",
|
"gauge for the number of compaction candidates that are found when checked",
|
||||||
|
@ -269,6 +288,8 @@ impl Compactor {
|
||||||
backoff_config,
|
backoff_config,
|
||||||
config,
|
config,
|
||||||
compaction_counter,
|
compaction_counter,
|
||||||
|
compaction_actual_counter,
|
||||||
|
compaction_output_counter,
|
||||||
level_promotion_counter,
|
level_promotion_counter,
|
||||||
compaction_candidate_gauge,
|
compaction_candidate_gauge,
|
||||||
compaction_candidate_bytes_gauge,
|
compaction_candidate_bytes_gauge,
|
||||||
|
@ -409,6 +430,7 @@ impl Compactor {
|
||||||
|
|
||||||
let sequencer_id = parquet_files[0].sequencer_id;
|
let sequencer_id = parquet_files[0].sequencer_id;
|
||||||
let file_count = parquet_files.len();
|
let file_count = parquet_files.len();
|
||||||
|
let mut actual_compacted_file_count = file_count;
|
||||||
|
|
||||||
// Group overlapped files
|
// Group overlapped files
|
||||||
let overlapped_file_groups = Self::overlapped_groups(parquet_files);
|
let overlapped_file_groups = Self::overlapped_groups(parquet_files);
|
||||||
|
@ -424,14 +446,18 @@ impl Compactor {
|
||||||
// Compact, persist,and update catalog accordingly for each overlaped file
|
// Compact, persist,and update catalog accordingly for each overlaped file
|
||||||
let mut tombstones = BTreeMap::new();
|
let mut tombstones = BTreeMap::new();
|
||||||
let mut upgrade_level_list: Vec<ParquetFileId> = vec![];
|
let mut upgrade_level_list: Vec<ParquetFileId> = vec![];
|
||||||
|
let mut output_file_count = 0;
|
||||||
for group in groups_with_tombstones {
|
for group in groups_with_tombstones {
|
||||||
// keep tombstone ids
|
// keep tombstone ids
|
||||||
tombstones = Self::union_tombstones(tombstones, &group);
|
tombstones = Self::union_tombstones(tombstones, &group);
|
||||||
|
|
||||||
// Only one file without tombstones, no need to compact, upgrade it since it is
|
// Only one file without tombstones, no need to compact.
|
||||||
// non-overlapping
|
|
||||||
if group.parquet_files.len() == 1 && group.tombstones.is_empty() {
|
if group.parquet_files.len() == 1 && group.tombstones.is_empty() {
|
||||||
upgrade_level_list.push(group.parquet_files[0].parquet_file_id());
|
actual_compacted_file_count -= 1;
|
||||||
|
// If it is level 0, upgrade it since it is non-overlapping
|
||||||
|
if group.parquet_files[0].data.compaction_level == INITIAL_COMPACTION_LEVEL {
|
||||||
|
upgrade_level_list.push(group.parquet_files[0].parquet_file_id());
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -468,6 +494,9 @@ impl Compactor {
|
||||||
catalog_update_info.push(CatalogUpdate::new(meta, file_size, md, tombstones));
|
catalog_update_info.push(CatalogUpdate::new(meta, file_size, md, tombstones));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
output_file_count += catalog_update_info.len();
|
||||||
|
|
||||||
let mut txn = self
|
let mut txn = self
|
||||||
.catalog
|
.catalog
|
||||||
.start_transaction()
|
.start_transaction()
|
||||||
|
@ -505,9 +534,15 @@ impl Compactor {
|
||||||
duration_ms.record(delta.as_millis() as _);
|
duration_ms.record(delta.as_millis() as _);
|
||||||
}
|
}
|
||||||
|
|
||||||
let compaction_counter = self.compaction_counter.recorder(attributes);
|
let compaction_counter = self.compaction_counter.recorder(attributes.clone());
|
||||||
compaction_counter.inc(file_count as u64);
|
compaction_counter.inc(file_count as u64);
|
||||||
|
|
||||||
|
let compaction_actual_counter = self.compaction_actual_counter.recorder(attributes.clone());
|
||||||
|
compaction_actual_counter.inc(actual_compacted_file_count as u64);
|
||||||
|
|
||||||
|
let compaction_output_counter = self.compaction_output_counter.recorder(attributes);
|
||||||
|
compaction_output_counter.inc(output_file_count as u64);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue