feat: add compaction level to commit metrics (#6985)
* feat: add compaction level to commit metrics
* test: more realism

parent 7fb052208f
commit f499022511
@@ -991,6 +991,7 @@ dependencies = [
 "iox_query",
 "iox_tests",
 "iox_time",
+"itertools",
 "metric",
 "object_store",
 "observability_deps",
@@ -15,6 +15,7 @@ futures = "0.3"
 iox_catalog = { path = "../iox_catalog" }
 iox_query = { path = "../iox_query" }
 iox_time = { path = "../iox_time" }
+itertools = "0.10.5"
 metric = { path = "../metric" }
 object_store = "0.5.4"
 observability_deps = { path = "../observability_deps" }
@@ -1,7 +1,8 @@
-use std::fmt::Display;
+use std::{collections::HashMap, fmt::Display};

 use async_trait::async_trait;
 use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
+use itertools::Itertools;
 use metric::{Registry, U64Histogram, U64HistogramOptions};

 use super::Commit;
@@ -44,9 +45,14 @@ impl From<HistogramType> for U64HistogramOptions {

 #[derive(Debug)]
 struct Histogram {
-    create: U64Histogram,
-    delete: U64Histogram,
-    upgrade: U64Histogram,
+    /// Files created, by the level they were created at.
+    create: HashMap<CompactionLevel, U64Histogram>,
+
+    /// Files deleted, by the level they had at this point in time.
+    delete: HashMap<CompactionLevel, U64Histogram>,
+
+    /// Files upgraded, by level they had before the upgrade and the target compaction level.
+    upgrade: HashMap<(CompactionLevel, CompactionLevel), U64Histogram>,
 }

 impl Histogram {
@@ -59,9 +65,34 @@ impl Histogram {
         let metric =
             registry
                 .register_metric_with_options::<U64Histogram, _>(name, description, || t.into());
-        let create = metric.recorder(&[("op", "create")]);
-        let delete = metric.recorder(&[("op", "delete")]);
-        let upgrade = metric.recorder(&[("op", "upgrade")]);
+        let create = CompactionLevel::all()
+            .iter()
+            .map(|level| {
+                (
+                    *level,
+                    metric.recorder(&[("op", "create"), ("level", level.name())]),
+                )
+            })
+            .collect();
+        let delete = CompactionLevel::all()
+            .iter()
+            .map(|level| {
+                (
+                    *level,
+                    metric.recorder(&[("op", "delete"), ("level", level.name())]),
+                )
+            })
+            .collect();
+        let upgrade = CompactionLevel::all()
+            .iter()
+            .cartesian_product(CompactionLevel::all())
+            .map(|(from, to)| {
+                (
+                    (*from, *to),
+                    metric.recorder(&[("op", "upgrade"), ("from", from.name()), ("to", to.name())]),
+                )
+            })
+            .collect();
         Self {
             create,
             delete,
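Note: the upgrade recorders are pre-registered for every (from, to) pair via Itertools::cartesian_product, so all label combinations exist (at zero) before any commit is recorded. A minimal standalone sketch of that call, using plain string levels instead of the real CompactionLevel enum:

use itertools::Itertools;

fn main() {
    let levels = ["L0", "L1", "L2"];
    // Pairs every "from" level with every "to" level: 9 combinations.
    for (from, to) in levels.iter().cartesian_product(levels.iter()) {
        println!("op=upgrade from={from} to={to}");
    }
}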
@@ -160,43 +191,110 @@ where

         // per file metrics
         for f in create {
-            self.file_bytes.create.record(f.file_size_bytes as u64);
-            self.file_rows.create.record(f.row_count as u64);
+            self.file_bytes
+                .create
+                .get(&f.compaction_level)
+                .expect("all compaction levels covered")
+                .record(f.file_size_bytes as u64);
+            self.file_rows
+                .create
+                .get(&f.compaction_level)
+                .expect("all compaction levels covered")
+                .record(f.row_count as u64);
         }
         for f in delete {
-            self.file_bytes.delete.record(f.file_size_bytes as u64);
-            self.file_rows.delete.record(f.row_count as u64);
+            self.file_bytes
+                .delete
+                .get(&f.compaction_level)
+                .expect("all compaction levels covered")
+                .record(f.file_size_bytes as u64);
+            self.file_rows
+                .delete
+                .get(&f.compaction_level)
+                .expect("all compaction levels covered")
+                .record(f.row_count as u64);
         }
         for f in upgrade {
-            self.file_bytes.upgrade.record(f.file_size_bytes as u64);
-            self.file_rows.upgrade.record(f.row_count as u64);
+            self.file_bytes
+                .upgrade
+                .get(&(f.compaction_level, target_level))
+                .expect("all compaction levels covered")
+                .record(f.file_size_bytes as u64);
+            self.file_rows
+                .upgrade
+                .get(&(f.compaction_level, target_level))
+                .expect("all compaction levels covered")
+                .record(f.row_count as u64);
         }

         // per-partition metrics
-        self.job_files.create.record(create.len() as u64);
-        self.job_files.delete.record(delete.len() as u64);
-        self.job_files.upgrade.record(upgrade.len() as u64);
-        self.job_bytes
-            .create
-            .record(create.iter().map(|f| f.file_size_bytes as u64).sum::<u64>());
-        self.job_bytes
-            .delete
-            .record(delete.iter().map(|f| f.file_size_bytes as u64).sum::<u64>());
-        self.job_bytes.upgrade.record(
-            upgrade
-                .iter()
-                .map(|f| f.file_size_bytes as u64)
-                .sum::<u64>(),
-        );
-        self.job_rows
-            .create
-            .record(create.iter().map(|f| f.row_count as u64).sum::<u64>());
-        self.job_rows
-            .delete
-            .record(delete.iter().map(|f| f.row_count as u64).sum::<u64>());
-        self.job_rows
-            .upgrade
-            .record(upgrade.iter().map(|f| f.row_count as u64).sum::<u64>());
+        for file_level in CompactionLevel::all() {
+            let create = create
+                .iter()
+                .filter(|f| f.compaction_level == *file_level)
+                .collect::<Vec<_>>();
+            let delete = delete
+                .iter()
+                .filter(|f| f.compaction_level == *file_level)
+                .collect::<Vec<_>>();
+            let upgrade = upgrade
+                .iter()
+                .filter(|f| f.compaction_level == *file_level)
+                .collect::<Vec<_>>();
+
+            self.job_files
+                .create
+                .get(file_level)
+                .expect("all compaction levels covered")
+                .record(create.len() as u64);
+            self.job_bytes
+                .create
+                .get(file_level)
+                .expect("all compaction levels covered")
+                .record(create.iter().map(|f| f.file_size_bytes as u64).sum::<u64>());
+            self.job_rows
+                .create
+                .get(file_level)
+                .expect("all compaction levels covered")
+                .record(create.iter().map(|f| f.row_count as u64).sum::<u64>());
+
+            self.job_files
+                .delete
+                .get(file_level)
+                .expect("all compaction levels covered")
+                .record(delete.len() as u64);
+            self.job_bytes
+                .delete
+                .get(file_level)
+                .expect("all compaction levels covered")
+                .record(delete.iter().map(|f| f.file_size_bytes as u64).sum::<u64>());
+            self.job_rows
+                .delete
+                .get(file_level)
+                .expect("all compaction levels covered")
+                .record(delete.iter().map(|f| f.row_count as u64).sum::<u64>());
+
+            self.job_files
+                .upgrade
+                .get(&(*file_level, target_level))
+                .expect("all compaction levels covered")
+                .record(upgrade.len() as u64);
+            self.job_bytes
+                .upgrade
+                .get(&(*file_level, target_level))
+                .expect("all compaction levels covered")
+                .record(
+                    upgrade
+                        .iter()
+                        .map(|f| f.file_size_bytes as u64)
+                        .sum::<u64>(),
+                );
+            self.job_rows
+                .upgrade
+                .get(&(*file_level, target_level))
+                .expect("all compaction levels covered")
+                .record(upgrade.iter().map(|f| f.row_count as u64).sum::<u64>());
+        }

         ids
     }
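Note: the per-partition metrics now aggregate within each level bucket instead of across the whole job. A simplified, hypothetical sketch of that grouping logic (the Level, File, and sum_bytes_per_level names are illustrative, not from the codebase):

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Level {
    L0,
    L1,
    L2,
}

struct File {
    level: Level,
    size_bytes: u64,
}

// Sum file sizes per compaction level, mirroring the per-level
// job_bytes aggregation in the commit observer above.
fn sum_bytes_per_level(files: &[File]) -> HashMap<Level, u64> {
    let mut totals = HashMap::new();
    for f in files {
        *totals.entry(f.level).or_insert(0) += f.size_bytes;
    }
    totals
}

fn main() {
    let files = [
        File { level: Level::L0, size_bytes: 10_016 },
        File { level: Level::L1, size_bytes: 10_002 },
    ];
    println!("{:?}", sum_bytes_per_level(&files));
}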
@@ -230,9 +328,13 @@ mod tests {
             .with_file_size_bytes(10_001)
             .with_row_count(1_001)
             .build();
-        let existing_2 = ParquetFileBuilder::new(2)
+        let existing_2a = ParquetFileBuilder::new(2)
             .with_file_size_bytes(10_002)
             .with_row_count(1_002)
+            .with_compaction_level(CompactionLevel::Initial)
             .build();
+        let existing_2b = ParquetFileBuilder::from(existing_2a.clone())
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
         let existing_3 = ParquetFileBuilder::new(3)
             .with_file_size_bytes(10_004)
@@ -247,6 +349,7 @@ mod tests {
             .with_file_size_bytes(10_016)
             .with_row_count(1_016)
             .with_partition(1)
+            .with_compaction_level(CompactionLevel::Initial)
             .build();

         for metric_name in [
@@ -256,9 +359,52 @@ mod tests {
             METRIC_NAME_JOB_FILES,
             METRIC_NAME_JOB_ROWS,
         ] {
-            for op in ["create", "delete", "upgrade"] {
-                assert_eq!(hist_count(&registry, metric_name, op), 0);
-                assert_eq!(hist_total(&registry, metric_name, op), 0);
-            }
+            for file_level in CompactionLevel::all() {
+                for op in ["create", "delete"] {
+                    assert_eq!(
+                        hist_count(
+                            &registry,
+                            metric_name,
+                            [("op", op), ("level", file_level.name())]
+                        ),
+                        0
+                    );
+                    assert_eq!(
+                        hist_total(
+                            &registry,
+                            metric_name,
+                            [("op", op), ("level", file_level.name())]
+                        ),
+                        0
+                    );
+                }
+
+                for target_level in CompactionLevel::all() {
+                    assert_eq!(
+                        hist_count(
+                            &registry,
+                            metric_name,
+                            [
+                                ("op", "upgrade"),
+                                ("from", file_level.name()),
+                                ("to", target_level.name())
+                            ]
+                        ),
+                        0
+                    );
+                    assert_eq!(
+                        hist_total(
+                            &registry,
+                            metric_name,
+                            [
+                                ("op", "upgrade"),
+                                ("from", file_level.name()),
+                                ("to", target_level.name())
+                            ]
+                        ),
+                        0
+                    );
+                }
+            }
         }

@@ -266,7 +412,7 @@ mod tests {
         .commit(
             PartitionId::new(1),
             &[existing_1.clone()],
-            &[existing_2.clone()],
+            &[existing_2a.clone()],
             &[created.clone().into()],
             CompactionLevel::FileNonOverlapped,
         )
@@ -276,7 +422,7 @@ mod tests {
     let ids = commit
         .commit(
             PartitionId::new(2),
-            &[existing_2.clone(), existing_3.clone()],
+            &[existing_2b.clone(), existing_3.clone()],
             &[existing_4.clone()],
             &[],
             CompactionLevel::Final,
@@ -284,19 +430,68 @@ mod tests {
         .await;
     assert_eq!(ids, vec![]);

-    assert_eq!(hist_count(&registry, METRIC_NAME_FILE_BYTES, "create"), 1);
-    assert_eq!(hist_count(&registry, METRIC_NAME_FILE_BYTES, "upgrade"), 2);
-    assert_eq!(hist_count(&registry, METRIC_NAME_FILE_BYTES, "delete"), 3);
-    assert_eq!(
-        hist_total(&registry, METRIC_NAME_FILE_BYTES, "create"),
-        10_016
-    );
-    assert_eq!(
-        hist_total(&registry, METRIC_NAME_FILE_BYTES, "upgrade"),
-        20_010
-    );
-    assert_eq!(
-        hist_total(&registry, METRIC_NAME_FILE_BYTES, "delete"),
-        30_007
-    );
+    assert_eq!(
+        hist_count(
+            &registry,
+            METRIC_NAME_FILE_BYTES,
+            [("op", "create"), ("level", "L0")]
+        ),
+        1
+    );
+    assert_eq!(
+        hist_count(
+            &registry,
+            METRIC_NAME_FILE_BYTES,
+            [("op", "upgrade"), ("from", "L0"), ("to", "L1")]
+        ),
+        1
+    );
+    assert_eq!(
+        hist_count(
+            &registry,
+            METRIC_NAME_FILE_BYTES,
+            [("op", "upgrade"), ("from", "L1"), ("to", "L2")]
+        ),
+        1
+    );
+    assert_eq!(
+        hist_count(
+            &registry,
+            METRIC_NAME_FILE_BYTES,
+            [("op", "delete"), ("level", "L1")]
+        ),
+        3
+    );
+    assert_eq!(
+        hist_total(
+            &registry,
+            METRIC_NAME_FILE_BYTES,
+            [("op", "create"), ("level", "L0")]
+        ),
+        10_016
+    );
+    assert_eq!(
+        hist_total(
+            &registry,
+            METRIC_NAME_FILE_BYTES,
+            [("op", "upgrade"), ("from", "L0"), ("to", "L1")]
+        ),
+        10_002
+    );
+    assert_eq!(
+        hist_total(
+            &registry,
+            METRIC_NAME_FILE_BYTES,
+            [("op", "upgrade"), ("from", "L1"), ("to", "L2")]
+        ),
+        10_008
+    );
+    assert_eq!(
+        hist_total(
+            &registry,
+            METRIC_NAME_FILE_BYTES,
+            [("op", "delete"), ("level", "L1")]
+        ),
+        30_007
+    );

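Note: the expected totals follow from the fixture sizes set earlier in the test (existing_4's size, 10_008, is inferred from these assertions rather than shown in the hunks above). A quick sanity check under that assumption:

fn main() {
    // delete at L1: existing_1 + existing_2b + existing_3
    assert_eq!(10_001 + 10_002 + 10_004, 30_007);
    // upgrades: existing_2a (L0 -> L1) and existing_4 (L1 -> L2),
    // which the old single "upgrade" bucket summed to 20_010.
    assert_eq!(10_002 + 10_008, 20_010);
}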
@@ -306,13 +501,13 @@ mod tests {
             CommitHistoryEntry {
                 partition_id: PartitionId::new(1),
                 delete: vec![existing_1],
-                upgrade: vec![existing_2.clone()],
+                upgrade: vec![existing_2a.clone()],
                 created: vec![created],
                 target_level: CompactionLevel::FileNonOverlapped,
             },
             CommitHistoryEntry {
                 partition_id: PartitionId::new(2),
-                delete: vec![existing_2, existing_3],
+                delete: vec![existing_2b, existing_3],
                 upgrade: vec![existing_4],
                 created: vec![],
                 target_level: CompactionLevel::Final,
@@ -321,24 +516,32 @@ mod tests {
         );
     }

-    fn hist(
+    fn hist<const N: usize>(
         registry: &Registry,
         metric_name: &'static str,
-        op: &'static str,
+        attributes: [(&'static str, &'static str); N],
     ) -> HistogramObservation<u64> {
         registry
             .get_instrument::<Metric<U64Histogram>>(metric_name)
             .expect("instrument not found")
-            .get_observer(&Attributes::from(&[("op", op)]))
+            .get_observer(&Attributes::from(&attributes))
             .expect("observer not found")
             .fetch()
     }

-    fn hist_count(registry: &Registry, metric_name: &'static str, op: &'static str) -> u64 {
-        hist(registry, metric_name, op).sample_count()
+    fn hist_count<const N: usize>(
+        registry: &Registry,
+        metric_name: &'static str,
+        attributes: [(&'static str, &'static str); N],
+    ) -> u64 {
+        hist(registry, metric_name, attributes).sample_count()
     }

-    fn hist_total(registry: &Registry, metric_name: &'static str, op: &'static str) -> u64 {
-        hist(registry, metric_name, op).total
+    fn hist_total<const N: usize>(
+        registry: &Registry,
+        metric_name: &'static str,
+        attributes: [(&'static str, &'static str); N],
+    ) -> u64 {
+        hist(registry, metric_name, attributes).total
     }
 }
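Note: the test helpers move from a fixed "op" string to an arbitrary attribute array via const generics, so one signature serves both 2-label ("op"/"level") and 3-label ("op"/"from"/"to") lookups. A minimal illustration of the pattern (label_count is a hypothetical name, not from the codebase):

fn label_count<const N: usize>(attributes: [(&'static str, &'static str); N]) -> usize {
    attributes.len()
}

fn main() {
    assert_eq!(label_count([("op", "create"), ("level", "L0")]), 2);
    assert_eq!(label_count([("op", "upgrade"), ("from", "L0"), ("to", "L1")]), 3);
}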
@@ -87,7 +87,7 @@ impl CompactionLevel {
         match self {
             Self::Initial => Self::FileNonOverlapped,
             Self::FileNonOverlapped => Self::Final,
-            _ => Self::Final,
+            Self::Final => Self::Final,
         }
     }

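Note: replacing the `_` arm with an explicit `Self::Final` arm makes the match exhaustive, so adding a fourth level later becomes a compile error instead of silently falling through. A toy example of the difference:

#[derive(Debug)]
enum Level {
    L0,
    L1,
    L2,
}

// Every variant is named: adding a Level::L3 variant would make this
// function fail to compile until a new arm is written.
fn next(level: Level) -> Level {
    match level {
        Level::L0 => Level::L1,
        Level::L1 => Level::L2,
        Level::L2 => Level::L2,
    }
}

fn main() {
    println!("{:?}", next(Level::L0));
}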
@@ -96,7 +96,21 @@ impl CompactionLevel {
         match self {
             Self::Initial => Self::Initial,
             Self::FileNonOverlapped => Self::Initial,
-            _ => Self::FileNonOverlapped,
+            Self::Final => Self::FileNonOverlapped,
         }
     }
+
+    /// Returns all levels
+    pub fn all() -> &'static [Self] {
+        &[Self::Initial, Self::FileNonOverlapped, Self::Final]
+    }
+
+    /// Static name
+    pub fn name(&self) -> &'static str {
+        match self {
+            Self::Initial => "L0",
+            Self::FileNonOverlapped => "L1",
+            Self::Final => "L2",
+        }
+    }
 }
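Note: the new helpers give the metrics code a single source of truth for the level list and its label names. A usage sketch, assuming the data_types crate is available as a dependency:

use data_types::CompactionLevel;

fn main() {
    // Iterate the canonical level list and print each metric label name.
    for level in CompactionLevel::all() {
        println!("{}", level.name()); // "L0", then "L1", then "L2"
    }
}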
@@ -4,7 +4,7 @@ use data_types::{
 };
 use uuid::Uuid;

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 /// Build up [`ParquetFile`]s for testing
 pub struct ParquetFileBuilder {
     file: ParquetFile,
@@ -93,6 +93,12 @@ impl ParquetFileBuilder {
     }
 }

+impl From<ParquetFile> for ParquetFileBuilder {
+    fn from(file: ParquetFile) -> Self {
+        Self { file }
+    }
+}
+
 #[derive(Debug)]
 /// Build [`Table`]s for testing
 pub struct TableBuilder {
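Note: the new From<ParquetFile> impl (together with Clone on the builder) lets a test derive one fixture from another, as existing_2b is derived from existing_2a above. A usage sketch, assuming ParquetFileBuilder is importable from the iox_tests crate root and that ParquetFile exposes an `id` field:

use data_types::CompactionLevel;
use iox_tests::ParquetFileBuilder;

fn main() {
    let existing_2a = ParquetFileBuilder::new(2)
        .with_file_size_bytes(10_002)
        .with_compaction_level(CompactionLevel::Initial)
        .build();

    // Same file, re-entered through the builder to change one field.
    let existing_2b = ParquetFileBuilder::from(existing_2a.clone())
        .with_compaction_level(CompactionLevel::FileNonOverlapped)
        .build();

    assert_eq!(existing_2a.id, existing_2b.id);
}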