Merge branch 'main' into jrb_5_add_shutdown_log
commit
391e64772c
|
@ -991,6 +991,7 @@ dependencies = [
|
||||||
"iox_query",
|
"iox_query",
|
||||||
"iox_tests",
|
"iox_tests",
|
||||||
"iox_time",
|
"iox_time",
|
||||||
|
"itertools",
|
||||||
"metric",
|
"metric",
|
||||||
"object_store",
|
"object_store",
|
||||||
"observability_deps",
|
"observability_deps",
|
||||||
|
@ -3857,9 +3858,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "once_cell"
|
name = "once_cell"
|
||||||
version = "1.17.0"
|
version = "1.17.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
|
checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"parking_lot_core 0.9.7",
|
"parking_lot_core 0.9.7",
|
||||||
]
|
]
|
||||||
|
|
|
@ -15,6 +15,7 @@ futures = "0.3"
|
||||||
iox_catalog = { path = "../iox_catalog" }
|
iox_catalog = { path = "../iox_catalog" }
|
||||||
iox_query = { path = "../iox_query" }
|
iox_query = { path = "../iox_query" }
|
||||||
iox_time = { path = "../iox_time" }
|
iox_time = { path = "../iox_time" }
|
||||||
|
itertools = "0.10.5"
|
||||||
metric = { path = "../metric" }
|
metric = { path = "../metric" }
|
||||||
object_store = "0.5.4"
|
object_store = "0.5.4"
|
||||||
observability_deps = { path = "../observability_deps" }
|
observability_deps = { path = "../observability_deps" }
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
use std::fmt::Display;
|
use std::{collections::HashMap, fmt::Display};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
|
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
|
||||||
|
use itertools::Itertools;
|
||||||
use metric::{Registry, U64Histogram, U64HistogramOptions};
|
use metric::{Registry, U64Histogram, U64HistogramOptions};
|
||||||
|
|
||||||
use super::Commit;
|
use super::Commit;
|
||||||
|
@ -44,9 +45,14 @@ impl From<HistogramType> for U64HistogramOptions {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Histogram {
|
struct Histogram {
|
||||||
create: U64Histogram,
|
/// Files created, by the level they were created at.
|
||||||
delete: U64Histogram,
|
create: HashMap<CompactionLevel, U64Histogram>,
|
||||||
upgrade: U64Histogram,
|
|
||||||
|
/// Files deleted, by the level they had at this point in time.
|
||||||
|
delete: HashMap<CompactionLevel, U64Histogram>,
|
||||||
|
|
||||||
|
/// Files upgraded, by level they had before the upgrade and the target compaction level.
|
||||||
|
upgrade: HashMap<(CompactionLevel, CompactionLevel), U64Histogram>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Histogram {
|
impl Histogram {
|
||||||
|
@ -59,9 +65,34 @@ impl Histogram {
|
||||||
let metric =
|
let metric =
|
||||||
registry
|
registry
|
||||||
.register_metric_with_options::<U64Histogram, _>(name, description, || t.into());
|
.register_metric_with_options::<U64Histogram, _>(name, description, || t.into());
|
||||||
let create = metric.recorder(&[("op", "create")]);
|
let create = CompactionLevel::all()
|
||||||
let delete = metric.recorder(&[("op", "delete")]);
|
.iter()
|
||||||
let upgrade = metric.recorder(&[("op", "upgrade")]);
|
.map(|level| {
|
||||||
|
(
|
||||||
|
*level,
|
||||||
|
metric.recorder(&[("op", "create"), ("level", level.name())]),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let delete = CompactionLevel::all()
|
||||||
|
.iter()
|
||||||
|
.map(|level| {
|
||||||
|
(
|
||||||
|
*level,
|
||||||
|
metric.recorder(&[("op", "delete"), ("level", level.name())]),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let upgrade = CompactionLevel::all()
|
||||||
|
.iter()
|
||||||
|
.cartesian_product(CompactionLevel::all())
|
||||||
|
.map(|(from, to)| {
|
||||||
|
(
|
||||||
|
(*from, *to),
|
||||||
|
metric.recorder(&[("op", "upgrade"), ("from", from.name()), ("to", to.name())]),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
Self {
|
Self {
|
||||||
create,
|
create,
|
||||||
delete,
|
delete,
|
||||||
|
@ -160,43 +191,110 @@ where
|
||||||
|
|
||||||
// per file metrics
|
// per file metrics
|
||||||
for f in create {
|
for f in create {
|
||||||
self.file_bytes.create.record(f.file_size_bytes as u64);
|
self.file_bytes
|
||||||
self.file_rows.create.record(f.row_count as u64);
|
.create
|
||||||
|
.get(&f.compaction_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(f.file_size_bytes as u64);
|
||||||
|
self.file_rows
|
||||||
|
.create
|
||||||
|
.get(&f.compaction_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(f.row_count as u64);
|
||||||
}
|
}
|
||||||
for f in delete {
|
for f in delete {
|
||||||
self.file_bytes.delete.record(f.file_size_bytes as u64);
|
self.file_bytes
|
||||||
self.file_rows.delete.record(f.row_count as u64);
|
.delete
|
||||||
|
.get(&f.compaction_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(f.file_size_bytes as u64);
|
||||||
|
self.file_rows
|
||||||
|
.delete
|
||||||
|
.get(&f.compaction_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(f.row_count as u64);
|
||||||
}
|
}
|
||||||
for f in upgrade {
|
for f in upgrade {
|
||||||
self.file_bytes.upgrade.record(f.file_size_bytes as u64);
|
self.file_bytes
|
||||||
self.file_rows.upgrade.record(f.row_count as u64);
|
.upgrade
|
||||||
|
.get(&(f.compaction_level, target_level))
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(f.file_size_bytes as u64);
|
||||||
|
self.file_rows
|
||||||
|
.upgrade
|
||||||
|
.get(&(f.compaction_level, target_level))
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(f.row_count as u64);
|
||||||
}
|
}
|
||||||
|
|
||||||
// per-partition metrics
|
// per-partition metrics
|
||||||
self.job_files.create.record(create.len() as u64);
|
for file_level in CompactionLevel::all() {
|
||||||
self.job_files.delete.record(delete.len() as u64);
|
let create = create
|
||||||
self.job_files.upgrade.record(upgrade.len() as u64);
|
|
||||||
self.job_bytes
|
|
||||||
.create
|
|
||||||
.record(create.iter().map(|f| f.file_size_bytes as u64).sum::<u64>());
|
|
||||||
self.job_bytes
|
|
||||||
.delete
|
|
||||||
.record(delete.iter().map(|f| f.file_size_bytes as u64).sum::<u64>());
|
|
||||||
self.job_bytes.upgrade.record(
|
|
||||||
upgrade
|
|
||||||
.iter()
|
.iter()
|
||||||
.map(|f| f.file_size_bytes as u64)
|
.filter(|f| f.compaction_level == *file_level)
|
||||||
.sum::<u64>(),
|
.collect::<Vec<_>>();
|
||||||
);
|
let delete = delete
|
||||||
self.job_rows
|
.iter()
|
||||||
.create
|
.filter(|f| f.compaction_level == *file_level)
|
||||||
.record(create.iter().map(|f| f.row_count as u64).sum::<u64>());
|
.collect::<Vec<_>>();
|
||||||
self.job_rows
|
let upgrade = upgrade
|
||||||
.delete
|
.iter()
|
||||||
.record(delete.iter().map(|f| f.row_count as u64).sum::<u64>());
|
.filter(|f| f.compaction_level == *file_level)
|
||||||
self.job_rows
|
.collect::<Vec<_>>();
|
||||||
.upgrade
|
|
||||||
.record(upgrade.iter().map(|f| f.row_count as u64).sum::<u64>());
|
self.job_files
|
||||||
|
.create
|
||||||
|
.get(file_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(create.len() as u64);
|
||||||
|
self.job_bytes
|
||||||
|
.create
|
||||||
|
.get(file_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(create.iter().map(|f| f.file_size_bytes as u64).sum::<u64>());
|
||||||
|
self.job_rows
|
||||||
|
.create
|
||||||
|
.get(file_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(create.iter().map(|f| f.row_count as u64).sum::<u64>());
|
||||||
|
|
||||||
|
self.job_files
|
||||||
|
.delete
|
||||||
|
.get(file_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(delete.len() as u64);
|
||||||
|
self.job_bytes
|
||||||
|
.delete
|
||||||
|
.get(file_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(delete.iter().map(|f| f.file_size_bytes as u64).sum::<u64>());
|
||||||
|
self.job_rows
|
||||||
|
.delete
|
||||||
|
.get(file_level)
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(delete.iter().map(|f| f.row_count as u64).sum::<u64>());
|
||||||
|
|
||||||
|
self.job_files
|
||||||
|
.upgrade
|
||||||
|
.get(&(*file_level, target_level))
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(upgrade.len() as u64);
|
||||||
|
self.job_bytes
|
||||||
|
.upgrade
|
||||||
|
.get(&(*file_level, target_level))
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(
|
||||||
|
upgrade
|
||||||
|
.iter()
|
||||||
|
.map(|f| f.file_size_bytes as u64)
|
||||||
|
.sum::<u64>(),
|
||||||
|
);
|
||||||
|
self.job_rows
|
||||||
|
.upgrade
|
||||||
|
.get(&(*file_level, target_level))
|
||||||
|
.expect("all compaction levels covered")
|
||||||
|
.record(upgrade.iter().map(|f| f.row_count as u64).sum::<u64>());
|
||||||
|
}
|
||||||
|
|
||||||
ids
|
ids
|
||||||
}
|
}
|
||||||
|
@ -230,9 +328,13 @@ mod tests {
|
||||||
.with_file_size_bytes(10_001)
|
.with_file_size_bytes(10_001)
|
||||||
.with_row_count(1_001)
|
.with_row_count(1_001)
|
||||||
.build();
|
.build();
|
||||||
let existing_2 = ParquetFileBuilder::new(2)
|
let existing_2a = ParquetFileBuilder::new(2)
|
||||||
.with_file_size_bytes(10_002)
|
.with_file_size_bytes(10_002)
|
||||||
.with_row_count(1_002)
|
.with_row_count(1_002)
|
||||||
|
.with_compaction_level(CompactionLevel::Initial)
|
||||||
|
.build();
|
||||||
|
let existing_2b = ParquetFileBuilder::from(existing_2a.clone())
|
||||||
|
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||||
.build();
|
.build();
|
||||||
let existing_3 = ParquetFileBuilder::new(3)
|
let existing_3 = ParquetFileBuilder::new(3)
|
||||||
.with_file_size_bytes(10_004)
|
.with_file_size_bytes(10_004)
|
||||||
|
@ -247,6 +349,7 @@ mod tests {
|
||||||
.with_file_size_bytes(10_016)
|
.with_file_size_bytes(10_016)
|
||||||
.with_row_count(1_016)
|
.with_row_count(1_016)
|
||||||
.with_partition(1)
|
.with_partition(1)
|
||||||
|
.with_compaction_level(CompactionLevel::Initial)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
for metric_name in [
|
for metric_name in [
|
||||||
|
@ -256,9 +359,52 @@ mod tests {
|
||||||
METRIC_NAME_JOB_FILES,
|
METRIC_NAME_JOB_FILES,
|
||||||
METRIC_NAME_JOB_ROWS,
|
METRIC_NAME_JOB_ROWS,
|
||||||
] {
|
] {
|
||||||
for op in ["create", "delete", "upgrade"] {
|
for file_level in CompactionLevel::all() {
|
||||||
assert_eq!(hist_count(®istry, metric_name, op), 0);
|
for op in ["create", "delete"] {
|
||||||
assert_eq!(hist_total(®istry, metric_name, op), 0);
|
assert_eq!(
|
||||||
|
hist_count(
|
||||||
|
®istry,
|
||||||
|
metric_name,
|
||||||
|
[("op", op), ("level", file_level.name())]
|
||||||
|
),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
hist_total(
|
||||||
|
®istry,
|
||||||
|
metric_name,
|
||||||
|
[("op", op), ("level", file_level.name())]
|
||||||
|
),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
for target_level in CompactionLevel::all() {
|
||||||
|
assert_eq!(
|
||||||
|
hist_count(
|
||||||
|
®istry,
|
||||||
|
metric_name,
|
||||||
|
[
|
||||||
|
("op", "upgrade"),
|
||||||
|
("from", file_level.name()),
|
||||||
|
("to", target_level.name())
|
||||||
|
]
|
||||||
|
),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
hist_total(
|
||||||
|
®istry,
|
||||||
|
metric_name,
|
||||||
|
[
|
||||||
|
("op", "upgrade"),
|
||||||
|
("from", file_level.name()),
|
||||||
|
("to", target_level.name())
|
||||||
|
]
|
||||||
|
),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +412,7 @@ mod tests {
|
||||||
.commit(
|
.commit(
|
||||||
PartitionId::new(1),
|
PartitionId::new(1),
|
||||||
&[existing_1.clone()],
|
&[existing_1.clone()],
|
||||||
&[existing_2.clone()],
|
&[existing_2a.clone()],
|
||||||
&[created.clone().into()],
|
&[created.clone().into()],
|
||||||
CompactionLevel::FileNonOverlapped,
|
CompactionLevel::FileNonOverlapped,
|
||||||
)
|
)
|
||||||
|
@ -276,7 +422,7 @@ mod tests {
|
||||||
let ids = commit
|
let ids = commit
|
||||||
.commit(
|
.commit(
|
||||||
PartitionId::new(2),
|
PartitionId::new(2),
|
||||||
&[existing_2.clone(), existing_3.clone()],
|
&[existing_2b.clone(), existing_3.clone()],
|
||||||
&[existing_4.clone()],
|
&[existing_4.clone()],
|
||||||
&[],
|
&[],
|
||||||
CompactionLevel::Final,
|
CompactionLevel::Final,
|
||||||
|
@ -284,19 +430,68 @@ mod tests {
|
||||||
.await;
|
.await;
|
||||||
assert_eq!(ids, vec![]);
|
assert_eq!(ids, vec![]);
|
||||||
|
|
||||||
assert_eq!(hist_count(®istry, METRIC_NAME_FILE_BYTES, "create"), 1);
|
|
||||||
assert_eq!(hist_count(®istry, METRIC_NAME_FILE_BYTES, "upgrade"), 2);
|
|
||||||
assert_eq!(hist_count(®istry, METRIC_NAME_FILE_BYTES, "delete"), 3);
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
hist_total(®istry, METRIC_NAME_FILE_BYTES, "create"),
|
hist_count(
|
||||||
|
®istry,
|
||||||
|
METRIC_NAME_FILE_BYTES,
|
||||||
|
[("op", "create"), ("level", "L0")]
|
||||||
|
),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
hist_count(
|
||||||
|
®istry,
|
||||||
|
METRIC_NAME_FILE_BYTES,
|
||||||
|
[("op", "upgrade"), ("from", "L0"), ("to", "L1")]
|
||||||
|
),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
hist_count(
|
||||||
|
®istry,
|
||||||
|
METRIC_NAME_FILE_BYTES,
|
||||||
|
[("op", "upgrade"), ("from", "L1"), ("to", "L2")]
|
||||||
|
),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
hist_count(
|
||||||
|
®istry,
|
||||||
|
METRIC_NAME_FILE_BYTES,
|
||||||
|
[("op", "delete"), ("level", "L1")]
|
||||||
|
),
|
||||||
|
3
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
hist_total(
|
||||||
|
®istry,
|
||||||
|
METRIC_NAME_FILE_BYTES,
|
||||||
|
[("op", "create"), ("level", "L0")]
|
||||||
|
),
|
||||||
10_016
|
10_016
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
hist_total(®istry, METRIC_NAME_FILE_BYTES, "upgrade"),
|
hist_total(
|
||||||
20_010
|
®istry,
|
||||||
|
METRIC_NAME_FILE_BYTES,
|
||||||
|
[("op", "upgrade"), ("from", "L0"), ("to", "L1")]
|
||||||
|
),
|
||||||
|
10_002
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
hist_total(®istry, METRIC_NAME_FILE_BYTES, "delete"),
|
hist_total(
|
||||||
|
®istry,
|
||||||
|
METRIC_NAME_FILE_BYTES,
|
||||||
|
[("op", "upgrade"), ("from", "L1"), ("to", "L2")]
|
||||||
|
),
|
||||||
|
10_008
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
hist_total(
|
||||||
|
®istry,
|
||||||
|
METRIC_NAME_FILE_BYTES,
|
||||||
|
[("op", "delete"), ("level", "L1")]
|
||||||
|
),
|
||||||
30_007
|
30_007
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -306,13 +501,13 @@ mod tests {
|
||||||
CommitHistoryEntry {
|
CommitHistoryEntry {
|
||||||
partition_id: PartitionId::new(1),
|
partition_id: PartitionId::new(1),
|
||||||
delete: vec![existing_1],
|
delete: vec![existing_1],
|
||||||
upgrade: vec![existing_2.clone()],
|
upgrade: vec![existing_2a.clone()],
|
||||||
created: vec![created],
|
created: vec![created],
|
||||||
target_level: CompactionLevel::FileNonOverlapped,
|
target_level: CompactionLevel::FileNonOverlapped,
|
||||||
},
|
},
|
||||||
CommitHistoryEntry {
|
CommitHistoryEntry {
|
||||||
partition_id: PartitionId::new(2),
|
partition_id: PartitionId::new(2),
|
||||||
delete: vec![existing_2, existing_3],
|
delete: vec![existing_2b, existing_3],
|
||||||
upgrade: vec![existing_4],
|
upgrade: vec![existing_4],
|
||||||
created: vec![],
|
created: vec![],
|
||||||
target_level: CompactionLevel::Final,
|
target_level: CompactionLevel::Final,
|
||||||
|
@ -321,24 +516,32 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn hist(
|
fn hist<const N: usize>(
|
||||||
registry: &Registry,
|
registry: &Registry,
|
||||||
metric_name: &'static str,
|
metric_name: &'static str,
|
||||||
op: &'static str,
|
attributes: [(&'static str, &'static str); N],
|
||||||
) -> HistogramObservation<u64> {
|
) -> HistogramObservation<u64> {
|
||||||
registry
|
registry
|
||||||
.get_instrument::<Metric<U64Histogram>>(metric_name)
|
.get_instrument::<Metric<U64Histogram>>(metric_name)
|
||||||
.expect("instrument not found")
|
.expect("instrument not found")
|
||||||
.get_observer(&Attributes::from(&[("op", op)]))
|
.get_observer(&Attributes::from(&attributes))
|
||||||
.expect("observer not found")
|
.expect("observer not found")
|
||||||
.fetch()
|
.fetch()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn hist_count(registry: &Registry, metric_name: &'static str, op: &'static str) -> u64 {
|
fn hist_count<const N: usize>(
|
||||||
hist(registry, metric_name, op).sample_count()
|
registry: &Registry,
|
||||||
|
metric_name: &'static str,
|
||||||
|
attributes: [(&'static str, &'static str); N],
|
||||||
|
) -> u64 {
|
||||||
|
hist(registry, metric_name, attributes).sample_count()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn hist_total(registry: &Registry, metric_name: &'static str, op: &'static str) -> u64 {
|
fn hist_total<const N: usize>(
|
||||||
hist(registry, metric_name, op).total
|
registry: &Registry,
|
||||||
|
metric_name: &'static str,
|
||||||
|
attributes: [(&'static str, &'static str); N],
|
||||||
|
) -> u64 {
|
||||||
|
hist(registry, metric_name, attributes).total
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,7 +87,7 @@ impl CompactionLevel {
|
||||||
match self {
|
match self {
|
||||||
Self::Initial => Self::FileNonOverlapped,
|
Self::Initial => Self::FileNonOverlapped,
|
||||||
Self::FileNonOverlapped => Self::Final,
|
Self::FileNonOverlapped => Self::Final,
|
||||||
_ => Self::Final,
|
Self::Final => Self::Final,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +96,21 @@ impl CompactionLevel {
|
||||||
match self {
|
match self {
|
||||||
Self::Initial => Self::Initial,
|
Self::Initial => Self::Initial,
|
||||||
Self::FileNonOverlapped => Self::Initial,
|
Self::FileNonOverlapped => Self::Initial,
|
||||||
_ => Self::FileNonOverlapped,
|
Self::Final => Self::FileNonOverlapped,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns all levels
|
||||||
|
pub fn all() -> &'static [Self] {
|
||||||
|
&[Self::Initial, Self::FileNonOverlapped, Self::Final]
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Static name
|
||||||
|
pub fn name(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::Initial => "L0",
|
||||||
|
Self::FileNonOverlapped => "L1",
|
||||||
|
Self::Final => "L2",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,12 +31,12 @@ impl SequenceNumberSet {
|
||||||
self.0.andnot_inplace(&other.0)
|
self.0.andnot_inplace(&other.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Serialise `self` into a set of bytes.
|
/// Serialise `self` into an array of bytes.
|
||||||
///
|
///
|
||||||
/// [This document][spec] describes the serialised format.
|
/// [This document][spec] describes the serialised format.
|
||||||
///
|
///
|
||||||
/// [spec]: https://github.com/RoaringBitmap/RoaringFormatSpec/
|
/// [spec]: https://github.com/RoaringBitmap/RoaringFormatSpec/
|
||||||
pub fn as_bytes(&self) -> Vec<u8> {
|
pub fn to_bytes(&self) -> Vec<u8> {
|
||||||
self.0.serialize()
|
self.0.serialize()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ use data_types::{
|
||||||
};
|
};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
/// Build up [`ParquetFile`]s for testing
|
/// Build up [`ParquetFile`]s for testing
|
||||||
pub struct ParquetFileBuilder {
|
pub struct ParquetFileBuilder {
|
||||||
file: ParquetFile,
|
file: ParquetFile,
|
||||||
|
@ -93,6 +93,12 @@ impl ParquetFileBuilder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<ParquetFile> for ParquetFileBuilder {
|
||||||
|
fn from(file: ParquetFile) -> Self {
|
||||||
|
Self { file }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Build [`Table`]s for testing
|
/// Build [`Table`]s for testing
|
||||||
pub struct TableBuilder {
|
pub struct TableBuilder {
|
||||||
|
|
|
@ -134,6 +134,138 @@ async fn test_namespace_create() {
|
||||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ensure invoking the gRPC NamespaceService to delete a namespace propagates
|
||||||
|
/// the catalog and denies writes after the cache has converged / router
|
||||||
|
/// restarted.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_namespace_delete() {
|
||||||
|
// Initialise a TestContext requiring explicit namespace creation.
|
||||||
|
let ctx = TestContext::new(true, None).await;
|
||||||
|
|
||||||
|
const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _;
|
||||||
|
|
||||||
|
// Explicitly create the namespace.
|
||||||
|
let req = CreateNamespaceRequest {
|
||||||
|
name: "bananas_test".to_string(),
|
||||||
|
retention_period_ns: Some(RETENTION),
|
||||||
|
};
|
||||||
|
let got = ctx
|
||||||
|
.grpc_delegate()
|
||||||
|
.namespace_service()
|
||||||
|
.create_namespace(Request::new(req))
|
||||||
|
.await
|
||||||
|
.expect("failed to create namespace")
|
||||||
|
.into_inner()
|
||||||
|
.namespace
|
||||||
|
.expect("no namespace in response");
|
||||||
|
|
||||||
|
assert_eq!(got.name, "bananas_test");
|
||||||
|
assert_eq!(got.id, 1);
|
||||||
|
assert_eq!(got.retention_period_ns, Some(RETENTION));
|
||||||
|
|
||||||
|
// The namespace is usable.
|
||||||
|
let now = SystemProvider::default()
|
||||||
|
.now()
|
||||||
|
.timestamp_nanos()
|
||||||
|
.to_string();
|
||||||
|
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
|
||||||
|
let response = ctx
|
||||||
|
.write_lp("bananas", "test", &lp)
|
||||||
|
.await
|
||||||
|
.expect("write failed");
|
||||||
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||||
|
|
||||||
|
// The RPC endpoint must return a namespace.
|
||||||
|
{
|
||||||
|
let current = ctx
|
||||||
|
.grpc_delegate()
|
||||||
|
.namespace_service()
|
||||||
|
.get_namespaces(Request::new(Default::default()))
|
||||||
|
.await
|
||||||
|
.expect("must return namespaces")
|
||||||
|
.into_inner();
|
||||||
|
assert!(!current.namespaces.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the namespace
|
||||||
|
{
|
||||||
|
let _resp = ctx
|
||||||
|
.grpc_delegate()
|
||||||
|
.namespace_service()
|
||||||
|
.delete_namespace(Request::new(DeleteNamespaceRequest {
|
||||||
|
name: "bananas_test".to_string(),
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect("must delete");
|
||||||
|
}
|
||||||
|
|
||||||
|
// The RPC endpoint must not return the namespace.
|
||||||
|
{
|
||||||
|
let current = ctx
|
||||||
|
.grpc_delegate()
|
||||||
|
.namespace_service()
|
||||||
|
.get_namespaces(Request::new(Default::default()))
|
||||||
|
.await
|
||||||
|
.expect("must return namespaces")
|
||||||
|
.into_inner();
|
||||||
|
assert!(current.namespaces.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
// The catalog should contain the namespace, but "soft-deleted".
|
||||||
|
{
|
||||||
|
let db_list = ctx
|
||||||
|
.catalog()
|
||||||
|
.repositories()
|
||||||
|
.await
|
||||||
|
.namespaces()
|
||||||
|
.list(SoftDeletedRows::ExcludeDeleted)
|
||||||
|
.await
|
||||||
|
.expect("query failure");
|
||||||
|
assert!(db_list.is_empty());
|
||||||
|
|
||||||
|
let db_list = ctx
|
||||||
|
.catalog()
|
||||||
|
.repositories()
|
||||||
|
.await
|
||||||
|
.namespaces()
|
||||||
|
.list(SoftDeletedRows::OnlyDeleted)
|
||||||
|
.await
|
||||||
|
.expect("query failure");
|
||||||
|
assert_matches!(db_list.as_slice(), [ns] => {
|
||||||
|
assert_eq!(ns.id.get(), got.id);
|
||||||
|
assert_eq!(ns.name, got.name);
|
||||||
|
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
|
||||||
|
assert!(ns.deleted_at.is_some());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// The cached entry is not affected, and writes continue to be validated
|
||||||
|
// against cached entry.
|
||||||
|
//
|
||||||
|
// https://github.com/influxdata/influxdb_iox/issues/6175
|
||||||
|
|
||||||
|
let response = ctx
|
||||||
|
.write_lp("bananas", "test", &lp)
|
||||||
|
.await
|
||||||
|
.expect("write failed");
|
||||||
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||||
|
|
||||||
|
// The router restarts, and writes are no longer accepted for the
|
||||||
|
// soft-deleted bucket.
|
||||||
|
let ctx = ctx.restart();
|
||||||
|
|
||||||
|
let err = ctx
|
||||||
|
.write_lp("bananas", "test", lp)
|
||||||
|
.await
|
||||||
|
.expect_err("write should fail");
|
||||||
|
assert_matches!(
|
||||||
|
err,
|
||||||
|
router::server::http::Error::NamespaceResolver(router::namespace_resolver::Error::Lookup(
|
||||||
|
iox_catalog::interface::Error::NamespaceNotFoundByName { .. }
|
||||||
|
))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/// Ensure creating a namespace with a retention period of 0 maps to "infinite"
|
/// Ensure creating a namespace with a retention period of 0 maps to "infinite"
|
||||||
/// and not "none".
|
/// and not "none".
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
@ -97,12 +97,24 @@ impl namespace_service_server::NamespaceService for NamespaceService {
|
||||||
|
|
||||||
async fn delete_namespace(
|
async fn delete_namespace(
|
||||||
&self,
|
&self,
|
||||||
_request: Request<DeleteNamespaceRequest>,
|
request: Request<DeleteNamespaceRequest>,
|
||||||
) -> Result<Response<DeleteNamespaceResponse>, Status> {
|
) -> Result<Response<DeleteNamespaceResponse>, Status> {
|
||||||
warn!("call to namespace delete - unimplemented");
|
let namespace_name = request.into_inner().name;
|
||||||
Err(Status::unimplemented(
|
|
||||||
"namespace delete is not yet supported",
|
self.catalog
|
||||||
))
|
.repositories()
|
||||||
|
.await
|
||||||
|
.namespaces()
|
||||||
|
.soft_delete(&namespace_name)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!(error=%e, %namespace_name, "failed to soft-delete namespace");
|
||||||
|
Status::internal(e.to_string())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
info!(namespace_name, "soft-deleted namespace");
|
||||||
|
|
||||||
|
Ok(Response::new(Default::default()))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_namespace_retention(
|
async fn update_namespace_retention(
|
||||||
|
@ -179,21 +191,138 @@ fn map_retention_period(v: Option<i64>) -> Result<Option<i64>, Status> {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use assert_matches::assert_matches;
|
||||||
|
use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService as _;
|
||||||
|
use iox_catalog::mem::MemCatalog;
|
||||||
use tonic::Code;
|
use tonic::Code;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _;
|
||||||
|
const NS_NAME: &str = "bananas";
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_retention_mapping() {
|
fn test_retention_mapping() {
|
||||||
assert_matches::assert_matches!(map_retention_period(None), Ok(None));
|
assert_matches!(map_retention_period(None), Ok(None));
|
||||||
assert_matches::assert_matches!(map_retention_period(Some(0)), Ok(None));
|
assert_matches!(map_retention_period(Some(0)), Ok(None));
|
||||||
assert_matches::assert_matches!(map_retention_period(Some(1)), Ok(Some(1)));
|
assert_matches!(map_retention_period(Some(1)), Ok(Some(1)));
|
||||||
assert_matches::assert_matches!(map_retention_period(Some(42)), Ok(Some(42)));
|
assert_matches!(map_retention_period(Some(42)), Ok(Some(42)));
|
||||||
assert_matches::assert_matches!(map_retention_period(Some(-1)), Err(e) => {
|
assert_matches!(map_retention_period(Some(-1)), Err(e) => {
|
||||||
assert_eq!(e.code(), Code::InvalidArgument)
|
assert_eq!(e.code(), Code::InvalidArgument)
|
||||||
});
|
});
|
||||||
assert_matches::assert_matches!(map_retention_period(Some(-42)), Err(e) => {
|
assert_matches!(map_retention_period(Some(-42)), Err(e) => {
|
||||||
assert_eq!(e.code(), Code::InvalidArgument)
|
assert_eq!(e.code(), Code::InvalidArgument)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_crud() {
|
||||||
|
let catalog: Arc<dyn Catalog> =
|
||||||
|
Arc::new(MemCatalog::new(Arc::new(metric::Registry::default())));
|
||||||
|
|
||||||
|
let topic = catalog
|
||||||
|
.repositories()
|
||||||
|
.await
|
||||||
|
.topics()
|
||||||
|
.create_or_get("kafka-topic")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let query_pool = catalog
|
||||||
|
.repositories()
|
||||||
|
.await
|
||||||
|
.query_pools()
|
||||||
|
.create_or_get("query-pool")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id));
|
||||||
|
|
||||||
|
// There should be no namespaces to start with.
|
||||||
|
{
|
||||||
|
let current = handler
|
||||||
|
.get_namespaces(Request::new(Default::default()))
|
||||||
|
.await
|
||||||
|
.expect("must return namespaces")
|
||||||
|
.into_inner()
|
||||||
|
.namespaces;
|
||||||
|
assert!(current.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
let req = CreateNamespaceRequest {
|
||||||
|
name: NS_NAME.to_string(),
|
||||||
|
retention_period_ns: Some(RETENTION),
|
||||||
|
};
|
||||||
|
let created_ns = handler
|
||||||
|
.create_namespace(Request::new(req))
|
||||||
|
.await
|
||||||
|
.expect("failed to create namespace")
|
||||||
|
.into_inner()
|
||||||
|
.namespace
|
||||||
|
.expect("no namespace in response");
|
||||||
|
assert_eq!(created_ns.name, NS_NAME);
|
||||||
|
assert_eq!(created_ns.retention_period_ns, Some(RETENTION));
|
||||||
|
|
||||||
|
// There should now be one namespace
|
||||||
|
{
|
||||||
|
let current = handler
|
||||||
|
.get_namespaces(Request::new(Default::default()))
|
||||||
|
.await
|
||||||
|
.expect("must return namespaces")
|
||||||
|
.into_inner()
|
||||||
|
.namespaces;
|
||||||
|
assert_matches!(current.as_slice(), [ns] => {
|
||||||
|
assert_eq!(ns, &created_ns);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the retention period
|
||||||
|
let updated_ns = handler
|
||||||
|
.update_namespace_retention(Request::new(UpdateNamespaceRetentionRequest {
|
||||||
|
name: NS_NAME.to_string(),
|
||||||
|
retention_period_ns: Some(0), // A zero!
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect("failed to update namespace")
|
||||||
|
.into_inner()
|
||||||
|
.namespace
|
||||||
|
.expect("no namespace in response");
|
||||||
|
assert_eq!(updated_ns.name, created_ns.name);
|
||||||
|
assert_eq!(updated_ns.id, created_ns.id);
|
||||||
|
assert_eq!(created_ns.retention_period_ns, Some(RETENTION));
|
||||||
|
assert_eq!(updated_ns.retention_period_ns, None);
|
||||||
|
|
||||||
|
// Listing the namespaces should return the updated namespace
|
||||||
|
{
|
||||||
|
let current = handler
|
||||||
|
.get_namespaces(Request::new(Default::default()))
|
||||||
|
.await
|
||||||
|
.expect("must return namespaces")
|
||||||
|
.into_inner()
|
||||||
|
.namespaces;
|
||||||
|
assert_matches!(current.as_slice(), [ns] => {
|
||||||
|
assert_eq!(ns, &updated_ns);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deleting the namespace should cause it to disappear
|
||||||
|
handler
|
||||||
|
.delete_namespace(Request::new(DeleteNamespaceRequest {
|
||||||
|
name: NS_NAME.to_string(),
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect("must delete");
|
||||||
|
|
||||||
|
// Listing the namespaces should now return nothing.
|
||||||
|
{
|
||||||
|
let current = handler
|
||||||
|
.get_namespaces(Request::new(Default::default()))
|
||||||
|
.await
|
||||||
|
.expect("must return namespaces")
|
||||||
|
.into_inner()
|
||||||
|
.namespaces;
|
||||||
|
assert_matches!(current.as_slice(), []);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue