Merge branch 'main' into ntran/c2-skip

pull/24376/head
Nga Tran 2023-02-07 09:38:23 -05:00 committed by GitHub
commit 6297ad206f
22 changed files with 885 additions and 96 deletions

@ -58,10 +58,8 @@ impl Compactor2 {
_ = async {
compact(config.partition_concurrency, config.partition_timeout, Arc::clone(&job_semaphore), &components).await;
// the main entry point does not allow servers to shut down themselves, so we just wait forever
info!("comapctor done");
futures::future::pending::<()>().await;
} => unreachable!(),
} => {}
}
});
let worker = shared_handle(worker);
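
For context, the hunk above changes which arm of the compactor's tokio::select! may complete: previously the compaction arm awaited futures::future::pending() and was marked unreachable!(), so only an external shutdown could end the loop; now the arm is allowed to finish once compact() returns. A minimal standalone sketch of that pattern (not the actual Compactor2 code; the shutdown arm and timings are placeholders):

use std::time::Duration;

#[tokio::main]
async fn main() {
    tokio::select! {
        // stand-in for an external shutdown signal
        _ = tokio::time::sleep(Duration::from_secs(60)) => println!("shutdown requested"),
        _ = async {
            // stand-in for `compact(...)`
            tokio::time::sleep(Duration::from_millis(10)).await;
            println!("compactor done");
            // the old code awaited `futures::future::pending::<()>()` here, which kept
            // this arm from ever completing (hence the old `=> unreachable!()`)
        } => {}
    }
}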

@ -13,7 +13,7 @@ mod tests {
},
config::AlgoVersion,
driver::compact,
test_util::{list_object_store, AssertFutureExt, TestSetup},
test_util::{list_object_store, TestSetup},
};
#[tokio::test]
@ -27,10 +27,7 @@ mod tests {
assert!(files.is_empty());
// compact
// This will wait for files forever.
let fut = run_compact(&setup);
tokio::pin!(fut);
fut.assert_pending().await;
run_compact(&setup).await;
// verify catalog is still empty
let files = setup.list_by_table_not_to_delete().await;

@ -111,13 +111,13 @@ where
registry,
METRIC_NAME_JOB_FILES,
"Number of files committed by the compactor, per job",
HistogramType::Bytes,
HistogramType::Files,
),
job_bytes: Histogram::new(
registry,
METRIC_NAME_JOB_BYTES,
"Number of bytes committed by the compactor, per job",
HistogramType::Files,
HistogramType::Bytes,
),
job_rows: Histogram::new(
registry,

@ -36,7 +36,8 @@ impl FilesSplit for TargetLevelTargetLevelSplit {
mod tests {
use crate::test_util::{
create_l0_files, create_l1_files, create_l2_files, create_overlapped_files,
assert_parquet_files, assert_parquet_files_split, create_l0_files, create_l1_files,
create_l2_files, create_overlapped_files,
};
use super::*;
@ -62,7 +63,13 @@ mod tests {
#[test]
fn test_apply_partial_empty_files_l0() {
let files = create_l0_files(1);
assert_eq!(files.len(), 3);
let expected = vec![
"L0 ",
"L0.2[650,750] |-----L0.2------| ",
"L0.1[450,620] |------------L0.1------------| ",
"L0.3[800,900] |-----L0.3------| ",
];
assert_parquet_files(expected, &files);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial);
@ -81,13 +88,19 @@ mod tests {
#[test]
fn test_apply_partial_empty_files_l1() {
let files = create_l1_files(1);
assert_eq!(files.len(), 3);
let expected = vec![
"L1 ",
"L1.13[600,700] |-----L1.13-----| ",
"L1.12[400,500] |-----L1.12-----| ",
"L1.11[250,350] |-----L1.11-----| ",
];
assert_parquet_files(expected, &files);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial);
assert_eq!(lower.len(), 0);
assert_eq!(higher.len(), 3);
//
let (lower, higher) = split.apply(files.clone(), CompactionLevel::FileNonOverlapped);
assert_eq!(lower.len(), 3);
assert_eq!(higher.len(), 0);
@ -100,7 +113,12 @@ mod tests {
#[test]
fn test_apply_partial_empty_files_l2() {
let files = create_l2_files();
assert_eq!(files.len(), 2);
let expected = vec![
"L2 ",
"L2.21[0,100] |---------L2.21----------| ",
"L2.22[200,300] |---------L2.22----------| ",
];
assert_parquet_files(expected, &files);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial);
@ -120,10 +138,41 @@ mod tests {
fn test_apply_target_level_0() {
// Test target level Initial
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
let expected = vec![
"L0 ",
"L0.2[650,750]@1 |-L0.2-| ",
"L0.1[450,620]@1 |----L0.1-----| ",
"L0.3[800,900]@100 |-L0.3-| ",
"L1 ",
"L1.13[600,700]@100 |L1.13-| ",
"L1.12[400,500]@1 |L1.12-| ",
"L1.11[250,350]@1 |L1.11-| ",
"L2 ",
"L2.21[0,100]@1 |L2.21-| ",
"L2.22[200,300]@1 |L2.22-| ",
];
assert_parquet_files(expected, &files);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files, CompactionLevel::Initial);
let expected = vec![
"left",
"L0 ",
"L0.2[650,750]@1 |-----L0.2------| ",
"L0.1[450,620]@1 |------------L0.1------------| ",
"L0.3[800,900]@100 |-----L0.3------| ",
"right",
"L1 ",
"L1.13[600,700]@100 |--L1.13--| ",
"L1.12[400,500]@1 |--L1.12--| ",
"L1.11[250,350]@1 |--L1.11--| ",
"L2 ",
"L2.21[0,100]@1 |--L2.21--| ",
"L2.22[200,300]@1 |--L2.22--| ",
];
assert_parquet_files_split(expected, &lower, &higher);
// verify number of files
assert_eq!(lower.len(), 3);
assert_eq!(higher.len(), 5);
@ -141,10 +190,41 @@ mod tests {
fn test_apply_target_level_l1() {
// Test target level is FileNonOverlapped
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
let expected = vec![
"L0 ",
"L0.2[650,750]@1 |-L0.2-| ",
"L0.1[450,620]@1 |----L0.1-----| ",
"L0.3[800,900]@100 |-L0.3-| ",
"L1 ",
"L1.13[600,700]@100 |L1.13-| ",
"L1.12[400,500]@1 |L1.12-| ",
"L1.11[250,350]@1 |L1.11-| ",
"L2 ",
"L2.21[0,100]@1 |L2.21-| ",
"L2.22[200,300]@1 |L2.22-| ",
];
assert_parquet_files(expected, &files);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files, CompactionLevel::FileNonOverlapped);
let expected = vec![
"left",
"L0 ",
"L0.2[650,750]@1 |---L0.2---| ",
"L0.1[450,620]@1 |-------L0.1-------| ",
"L0.3[800,900]@100 |---L0.3---| ",
"L1 ",
"L1.13[600,700]@100 |--L1.13---| ",
"L1.12[400,500]@1 |--L1.12---| ",
"L1.11[250,350]@1 |--L1.11---| ",
"right",
"L2 ",
"L2.21[0,100] |---------L2.21----------| ",
"L2.22[200,300] |---------L2.22----------| ",
];
assert_parquet_files_split(expected, &lower, &higher);
// verify number of files
assert_eq!(lower.len(), 6);
assert_eq!(higher.len(), 2);
@ -162,11 +242,25 @@ mod tests {
fn test_apply_taget_level_l2() {
// Test target level is Final
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
let expected = vec![
"L0 ",
"L0.2[650,750]@1 |-L0.2-| ",
"L0.1[450,620]@1 |----L0.1-----| ",
"L0.3[800,900]@100 |-L0.3-| ",
"L1 ",
"L1.13[600,700]@100 |L1.13-| ",
"L1.12[400,500]@1 |L1.12-| ",
"L1.11[250,350]@1 |L1.11-| ",
"L2 ",
"L2.21[0,100]@1 |L2.21-| ",
"L2.22[200,300]@1 |L2.22-| ",
];
assert_parquet_files(expected, &files);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files, CompactionLevel::Final);
// verify number of files
// verify number of files (nothing in higher)
assert_eq!(lower.len(), 8);
assert_eq!(higher.len(), 0);
// verify compaction level of files

@ -142,26 +142,6 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::clone(&config.catalog),
))
};
let partition_done_sink =
LoggingPartitionDoneSinkWrapper::new(MetricsPartitionDoneSinkWrapper::new(
ErrorKindPartitionDoneSinkWrapper::new(
partition_done_sink,
ErrorKind::variants()
.iter()
.filter(|kind| {
// use explicit match statement so we never forget to add new variants
match kind {
ErrorKind::OutOfMemory | ErrorKind::Timeout | ErrorKind::Unknown => {
true
}
ErrorKind::ObjectStore => false,
}
})
.copied()
.collect(),
),
&config.metric_registry,
));
let commit: Arc<dyn Commit> = if config.shadow_mode {
Arc::new(MockCommit::new())
@ -191,14 +171,21 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
// Note: Place the "not empty" wrapper at the very end so that the logging and metric wrappers work even when there
// is no data.
let partitions_source = NotEmptyPartitionsSourceWrapper::new(
let partitions_source =
LoggingPartitionsSourceWrapper::new(MetricsPartitionsSourceWrapper::new(
RandomizeOrderPartitionsSourcesWrapper::new(partitions_source, 1234),
&config.metric_registry,
)),
Duration::from_secs(5),
Arc::clone(&config.time_provider),
);
));
let partitions_source: Arc<dyn PartitionsSource> = if config.process_once {
// do not wrap into the "not empty" filter because we do NOT want to throttle in this case but just exit early
Arc::new(partitions_source)
} else {
Arc::new(NotEmptyPartitionsSourceWrapper::new(
partitions_source,
Duration::from_secs(5),
Arc::clone(&config.time_provider),
))
};
let partition_stream: Arc<dyn PartitionStream> = if config.process_once {
Arc::new(OncePartititionStream::new(partitions_source))
@ -228,7 +215,27 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
&config.metric_registry,
),
)),
partition_done_sink: Arc::new(partition_done_sink),
partition_done_sink: Arc::new(LoggingPartitionDoneSinkWrapper::new(
MetricsPartitionDoneSinkWrapper::new(
ErrorKindPartitionDoneSinkWrapper::new(
partition_done_sink,
ErrorKind::variants()
.iter()
.filter(|kind| {
// use explicit match statement so we never forget to add new variants
match kind {
ErrorKind::OutOfMemory
| ErrorKind::Timeout
| ErrorKind::Unknown => true,
ErrorKind::ObjectStore => false,
}
})
.copied()
.collect(),
),
&config.metric_registry,
),
)),
commit: Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new(
commit,
&config.metric_registry,

@ -1,3 +1,6 @@
mod display;
pub(crate) use display::{assert_parquet_files, assert_parquet_files_split};
use std::{
collections::{BTreeMap, HashSet},
future::Future,
@ -220,6 +223,7 @@ impl NamespaceBuilder {
query_pool_id,
tables,
max_columns_per_table: 10,
max_tables: 42,
retention_period_ns: None,
},
},

@ -0,0 +1,387 @@
use std::collections::BTreeMap;
use data_types::{CompactionLevel, ParquetFile};
/// Compares a vec of strs with the output of a set of parquet
/// files. See docs on [`ParquetFileFormatter`] for example
/// expected output.
///
/// Designed so that failure output can be directly copy/pasted
/// into the test code as expected results.
///
/// Expects to be called like this:
/// assert_parquet_files(expected_lines: &[&str], &files)
#[track_caller]
pub fn assert_parquet_files<'a>(
expected_lines: impl IntoIterator<Item = &'a str>,
files: &[ParquetFile],
) {
let expected_lines: Vec<String> = expected_lines.into_iter().map(|s| s.to_string()).collect();
let actual_lines = readable_list_of_files(None, files);
assert_eq!(
expected_lines, actual_lines,
"\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n",
);
}
/// Compares a vec of strs with the output of a set of parquet
/// files. This is used to compare the results of splitting files into
/// two groups. See docs on [`ParquetFileFormatter`] for example
/// expected output.
///
/// Designed so that failure output can be directly copy/pasted
/// into the test code as expected results.
///
/// Expects to be called like this:
/// assert_parquet_files_split(expected_lines: &[&str], &files1, &files2)
#[track_caller]
pub fn assert_parquet_files_split<'a>(
expected_lines: impl IntoIterator<Item = &'a str>,
files1: &[ParquetFile],
files2: &[ParquetFile],
) {
let expected_lines: Vec<String> = expected_lines.into_iter().map(|s| s.to_string()).collect();
let actual_lines_one = readable_list_of_files(Some("left".into()), files1);
let actual_lines_two = readable_list_of_files(Some("right".into()), files2);
let actual_lines: Vec<_> = actual_lines_one
.into_iter()
.chain(actual_lines_two.into_iter())
.collect();
assert_eq!(
expected_lines, actual_lines,
"\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n",
);
}
/// default width for printing
const DEFAULT_WIDTH: usize = 80;
/// default width for header
const DEFAULT_HEADING_WIDTH: usize = 20;
/// This function returns a visual representation of the list of
/// parquet files arranged so they are lined up horizontally based on
/// their relative time range.
///
/// See docs on [`ParquetFileFormatter`]
/// for examples.
pub fn readable_list_of_files<'a>(
title: Option<String>,
files: impl IntoIterator<Item = &'a ParquetFile>,
) -> Vec<String> {
let mut output = vec![];
if let Some(title) = title {
output.push(title);
}
let files: Vec<_> = files.into_iter().collect();
if files.is_empty() {
return output;
}
let formatter = ParquetFileFormatter::new(&files);
// split up the files into groups by levels (compaction levels)
let mut files_by_level = BTreeMap::new();
for file in &files {
let existing_files = files_by_level
.entry(file.compaction_level)
.or_insert_with(Vec::new);
existing_files.push(file);
}
for (level, files) in files_by_level {
output.push(formatter.format_level(&level));
for file in files {
output.push(formatter.format_file(file))
}
}
output
}
/// Formats each parquet file as a single line of text, with widths
/// normalized based on their min/max times and lined up horizontally
/// based on their relative time range.
///
/// Each file has this format:
///
/// ```text
/// L<levelno>.<id>[min_time,max_time]@file_size_bytes
/// ```
///
/// Example
///
/// ```text
/// L0
/// L0.1[100,200]@1 |----------L0.1----------|
/// L0.2[300,400]@1 |----------L0.2----------|
/// L0.11[150,350]@44 |-----------------------L0.11-----------------------|
/// ```
#[derive(Debug, Default)]
struct ParquetFileFormatter {
/// should the size of the files be shown (if they are different)
show_size: bool,
/// width, in characters, of the row heading
row_heading_chars: usize,
/// width, in characters, of the entire min/max time range
width_chars: usize,
/// how many ns are given a single character's width
ns_per_char: f64,
/// what is the lowest time in any file?
min_time: i64,
/// what is the largest time in any file?
max_time: i64,
}
#[derive(Debug)]
/// helper to track if there are multiple file sizes in a set of parquet files
enum FileSizeSeen {
None,
One(i64),
Many,
}
impl FileSizeSeen {
fn observe(self, file_size_bytes: i64) -> Self {
match self {
Self::None => Self::One(file_size_bytes),
// same file size?
Self::One(sz) if sz == file_size_bytes => Self::One(sz),
// different file size or already seen difference
Self::One(_) | Self::Many => Self::Many,
}
}
}
impl ParquetFileFormatter {
/// calculates display parameters for formatting a set of files
fn new(files: &[&ParquetFile]) -> Self {
let row_heading_chars = DEFAULT_HEADING_WIDTH;
let width_chars = DEFAULT_WIDTH;
let min_time = files
.iter()
.map(|f| f.min_time.get())
.min()
.expect("at least one file");
let max_time = files
.iter()
.map(|f| f.max_time.get())
.max()
.expect("at least one file");
let file_size_seen = files
.iter()
.fold(FileSizeSeen::None, |file_size_seen, file| {
file_size_seen.observe(file.file_size_bytes)
});
// show the size if there are multiple sizes
let show_size = matches!(file_size_seen, FileSizeSeen::Many);
let time_range = max_time - min_time;
let ns_per_char = (time_range as f64) / (width_chars as f64);
Self {
show_size,
width_chars,
ns_per_char,
min_time,
max_time,
row_heading_chars,
}
}
/// return how many characters of `self.width_chars` would be consumed by `time_range` ns
fn time_range_to_chars(&self, time_range: i64) -> usize {
// avoid divide by zero
if self.ns_per_char > 0.0 {
(time_range as f64 / self.ns_per_char) as usize
} else if time_range > 0 {
self.width_chars
} else {
0
}
}
fn format_level(&self, level: &CompactionLevel) -> String {
format!(
"{:width$}",
display_level(level),
width = self.width_chars + self.row_heading_chars
)
}
/// Formats a single parquet file into a string of `width_chars`
/// characters, which tries to visually depict the time range of
/// the file using the width. See docs on [`ParquetFileFormatter`]
/// for examples.
fn format_file(&self, file: &ParquetFile) -> String {
// width of the file's time range, in ns
let time_width = (file.max_time - file.min_time).get();
// special case "zero" width times
let field_width = if self.min_time == self.max_time {
self.width_chars
} else {
self.time_range_to_chars(time_width)
}
// account for starting and ending '|'
.saturating_sub(2);
// Get compact display of the file, like 'L0.1'
// add |--- ---| formatting (based on field width)
let file_string = format!("|{:-^width$}|", display_file_id(file), width = field_width);
let row_heading = display_format(file, self.show_size);
// special case "zero" width times
if self.min_time == self.max_time {
return format!(
"{row_heading:width1$}{file_string:^width2$}",
width1 = self.row_heading_chars,
width2 = self.width_chars,
);
}
// otherwise, figure out whitespace padding at start and back
// based on the relative start time of the file
// assume time from 0
let prefix_time_range = file.min_time.get().saturating_sub(self.min_time);
let prefix_padding = " ".repeat(self.time_range_to_chars(prefix_time_range));
// pad the rest with whitespace
let postfix_padding_len = self
.width_chars
.saturating_sub(file_string.len())
.saturating_sub(prefix_padding.len());
let postfix_padding = " ".repeat(postfix_padding_len);
format!(
"{row_heading:width$}{prefix_padding}{file_string}{postfix_padding}",
width = self.row_heading_chars
)
}
}
fn display_level(compaction_level: &CompactionLevel) -> &'static str {
match compaction_level {
CompactionLevel::Initial => "L0",
CompactionLevel::FileNonOverlapped => "L1",
CompactionLevel::Final => "L2",
}
}
/// Display like 'L0.1' with file level and id
fn display_file_id(file: &ParquetFile) -> String {
let level = display_level(&file.compaction_level);
let id = file.id;
format!("{level}.{id}")
}
/// Compact display of level, id, min/max time and optional size.
///
/// Example
///
/// ```text
/// L0.1[100,200]@1
/// ```
fn display_format(file: &ParquetFile, show_size: bool) -> String {
let file_id = display_file_id(file);
let min_time = file.min_time.get(); // display as i64
let max_time = file.max_time.get(); // display as i64
let sz = file.file_size_bytes;
if show_size {
format!("{file_id}[{min_time},{max_time}]@{sz}")
} else {
format!("{file_id}[{min_time},{max_time}]")
}
}
#[cfg(test)]
mod test {
use crate::test_util::ParquetFileBuilder;
use super::*;
#[test]
fn display_builder() {
let files = vec![
ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Initial)
.build(),
ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Initial)
.build(),
];
let expected = vec![
"L0 ",
"L0.1[0,0] |-------------------------------------L0.1-------------------------------------|",
"L0.2[0,0] |-------------------------------------L0.2-------------------------------------|",
];
assert_parquet_files(expected, &files);
}
#[test]
fn display_builder_multi_levels_with_size() {
let files = vec![
ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Initial)
.build(),
ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Initial)
.build(),
ParquetFileBuilder::new(3)
.with_compaction_level(CompactionLevel::Final)
.with_file_size_bytes(42)
.build(),
];
let expected = vec![
"L0 ",
"L0.1[0,0]@1 |-------------------------------------L0.1-------------------------------------|",
"L0.2[0,0]@1 |-------------------------------------L0.2-------------------------------------|",
"L2 ",
"L2.3[0,0]@42 |-------------------------------------L2.3-------------------------------------|",
];
assert_parquet_files(expected, &files);
}
#[test]
fn display_builder_size_time_ranges() {
let files = vec![
ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Initial)
.with_time_range(100, 200)
.build(),
ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Initial)
.with_time_range(300, 400)
.build(),
// overlapping file
ParquetFileBuilder::new(11)
.with_compaction_level(CompactionLevel::Initial)
.with_time_range(150, 350)
.with_file_size_bytes(44)
.build(),
];
let expected = vec![
"L0 ",
"L0.1[100,200]@1 |----------L0.1----------| ",
"L0.2[300,400]@1 |----------L0.2----------| ",
"L0.11[150,350]@44 |-----------------------L0.11-----------------------| ",
];
assert_parquet_files(expected, &files);
}
}
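
As a rough worked example of the layout arithmetic in ParquetFileFormatter (a standalone sketch with assumed numbers, not the crate's types): with the default width of 80 characters and a file set spanning [0, 800] ns, ns_per_char is 10.0, so a file covering [650, 750] is drawn as a 10-character box offset 65 characters from the left edge.

// Standalone re-implementation of `time_range_to_chars`, for illustration only.
fn time_range_to_chars(ns_per_char: f64, width_chars: usize, time_range: i64) -> usize {
    if ns_per_char > 0.0 {
        (time_range as f64 / ns_per_char) as usize
    } else if time_range > 0 {
        width_chars
    } else {
        0
    }
}

fn main() {
    let (min_time, max_time, width_chars) = (0_i64, 800_i64, 80_usize);
    let ns_per_char = (max_time - min_time) as f64 / width_chars as f64; // 10.0
    // width of the box drawn for a file spanning [650, 750]
    assert_eq!(time_range_to_chars(ns_per_char, width_chars, 750 - 650), 10);
    // left padding in front of that file's box
    assert_eq!(time_range_to_chars(ns_per_char, width_chars, 650 - min_time), 65);
}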

@ -499,6 +499,8 @@ pub struct NamespaceSchema {
pub tables: BTreeMap<String, TableSchema>,
/// the number of columns per table this namespace allows
pub max_columns_per_table: usize,
/// The maximum number of tables permitted in this namespace.
pub max_tables: usize,
/// The retention period in ns.
/// None represents infinite duration (i.e. never drop data).
pub retention_period_ns: Option<i64>,
@ -511,6 +513,7 @@ impl NamespaceSchema {
topic_id: TopicId,
query_pool_id: QueryPoolId,
max_columns_per_table: i32,
max_tables: i32,
retention_period_ns: Option<i64>,
) -> Self {
Self {
@ -519,6 +522,7 @@ impl NamespaceSchema {
topic_id,
query_pool_id,
max_columns_per_table: max_columns_per_table as usize,
max_tables: max_tables as usize,
retention_period_ns,
}
}
@ -3501,6 +3505,7 @@ mod tests {
query_pool_id: QueryPoolId::new(3),
tables: BTreeMap::from([]),
max_columns_per_table: 4,
max_tables: 42,
retention_period_ns: None,
};
let schema2 = NamespaceSchema {
@ -3509,6 +3514,7 @@ mod tests {
query_pool_id: QueryPoolId::new(3),
tables: BTreeMap::from([(String::from("foo"), TableSchema::new(TableId::new(1)))]),
max_columns_per_table: 4,
max_tables: 42,
retention_period_ns: None,
};
assert!(schema1.size() < schema2.size());

@ -112,6 +112,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
}));
let time_provider = Arc::new(SystemProvider::new());
let process_once = config.compactor_config.process_once;
let server_type = create_compactor2_server_type(
&common_state,
Arc::clone(&metric_registry),
@ -127,5 +128,14 @@ pub async fn command(config: Config) -> Result<(), Error> {
info!("starting compactor");
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(common_state, services, metric_registry).await?)
let res = main::main(common_state, services, metric_registry).await;
match res {
Ok(()) => Ok(()),
// compactor2 is allowed to shut itself down
Err(main::Error::Wrapper {
source: _source @ ioxd_common::Error::LostServer,
}) if process_once => Ok(()),
Err(e) => Err(e.into()),
}
}

@ -752,7 +752,8 @@ mod tests {
.await
.unwrap();
let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100, None);
let schema =
NamespaceSchema::new(namespace.id, topic.id, query_pool.id, 100, 42, None);
let shard_index = ShardIndex::new(0);
let shard1 = repos

@ -197,6 +197,7 @@ impl TestContext {
self.topic_id,
self.query_id,
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
retention_period_ns,
),
)

@ -792,6 +792,7 @@ where
namespace.topic_id,
namespace.query_pool_id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,
);
@ -929,6 +930,7 @@ pub async fn list_schemas(
v.topic_id,
v.query_pool_id,
v.max_columns_per_table,
v.max_tables,
v.retention_period_ns,
);
ns.tables = joined.remove(&v.id)?;
@ -5970,6 +5972,7 @@ pub(crate) mod test_helpers {
topic.id,
pool.id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,
);

@ -282,6 +282,7 @@ mod tests {
namespace.topic_id,
namespace.query_pool_id,
namespace.max_columns_per_table,
namespace.max_tables,
namespace.retention_period_ns,
);

@ -356,6 +356,16 @@ impl TestNamespace {
.await
.unwrap();
}
/// Set the number of tables allowed in this namespace.
pub async fn update_table_limit(&self, new_max: i32) {
let mut repos = self.catalog.catalog.repositories().await;
repos
.namespaces()
.update_table_limit(&self.namespace.name, new_max)
.await
.unwrap();
}
}
/// A test shard with its namespace in the catalog

@ -131,7 +131,7 @@ impl MutableBatch {
}
/// Returns an iterator over the columns in this batch in no particular order
pub fn columns(&self) -> impl Iterator<Item = (&String, &Column)> + '_ {
pub fn columns(&self) -> impl Iterator<Item = (&String, &Column)> + ExactSizeIterator + '_ {
self.column_names
.iter()
.map(move |(name, idx)| (name, &self.columns[*idx]))
@ -269,6 +269,7 @@ mod tests {
let batch = batches.get("cpu").unwrap();
assert_eq!(batch.size_data(), 128);
assert_eq!(batch.columns().len(), 5);
let batches = lines_to_batches(
"cpu,t1=hellomore,t2=world f1=1.1,f2=1i 1234\ncpu,t1=h,t2=w f1=2.2,f2=2i 1234",
@ -277,6 +278,7 @@ mod tests {
.unwrap();
let batch = batches.get("cpu").unwrap();
assert_eq!(batch.size_data(), 138);
assert_eq!(batch.columns().len(), 5);
}
#[test]
@ -289,5 +291,6 @@ mod tests {
let batch = batches.get("cpu").unwrap();
assert_eq!(batch.size_data(), 124);
assert_eq!(batch.columns().len(), 5);
}
}
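
To illustrate the columns() signature change (a self-contained sketch with stand-in types, not the real MutableBatch): adding the ExactSizeIterator bound is what makes the batch.columns().len() assertions in the tests above possible without collecting into a Vec first.

use std::collections::BTreeMap;

struct Batch {
    column_names: BTreeMap<String, usize>,
    columns: Vec<i64>,
}

impl Batch {
    // Mirrors the shape of the changed signature: `+ ExactSizeIterator` exposes
    // `.len()` directly on the returned iterator.
    fn columns(&self) -> impl Iterator<Item = (&String, &i64)> + ExactSizeIterator + '_ {
        self.column_names
            .iter()
            .map(move |(name, idx)| (name, &self.columns[*idx]))
    }
}

fn main() {
    let batch = Batch {
        column_names: BTreeMap::from([("f1".to_string(), 0), ("t1".to_string(), 1)]),
        columns: vec![11, 22],
    };
    assert_eq!(batch.columns().len(), 2);
}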

@ -207,14 +207,41 @@ where
}
};
validate_column_limits(&batches, &schema).map_err(|e| {
warn!(
%namespace,
%namespace_id,
error=%e,
"service protection limit reached"
);
self.service_limit_hit_columns.inc(1);
validate_schema_limits(&batches, &schema).map_err(|e| {
match &e {
CachedServiceProtectionLimit::Column {
table_name,
existing_column_count,
merged_column_count,
max_columns_per_table,
} => {
warn!(
%table_name,
%existing_column_count,
%merged_column_count,
%max_columns_per_table,
%namespace,
%namespace_id,
"service protection limit reached (columns)"
);
self.service_limit_hit_columns.inc(1);
}
CachedServiceProtectionLimit::Table {
existing_table_count,
merged_table_count,
table_count_limit,
} => {
warn!(
%existing_table_count,
%merged_table_count,
%table_count_limit,
%namespace,
%namespace_id,
"service protection limit reached (tables)"
);
self.service_limit_hit_tables.inc(1);
}
}
SchemaError::ServiceLimit(Box::new(e))
})?;
@ -328,32 +355,106 @@ where
}
}
/// An error returned by schema limit evaluation against a cached
/// [`NamespaceSchema`].
#[derive(Debug, Error)]
#[error(
"couldn't create columns in table `{table_name}`; table contains \
pub enum CachedServiceProtectionLimit {
/// The number of columns would exceed the table column limit cached in the
/// [`NamespaceSchema`].
#[error(
"couldn't create columns in table `{table_name}`; table contains \
{existing_column_count} existing columns, applying this write would result \
in {merged_column_count} columns, limit is {max_columns_per_table}"
)]
struct OverColumnLimit {
table_name: String,
// Number of columns already in the table.
existing_column_count: usize,
// Number of resultant columns after merging the write with existing columns.
merged_column_count: usize,
// The configured limit.
max_columns_per_table: usize,
)]
Column {
/// The table that exceeds the column limit.
table_name: String,
/// Number of columns already in the table.
existing_column_count: usize,
/// Number of resultant columns after merging the write with existing
/// columns.
merged_column_count: usize,
/// The configured limit.
max_columns_per_table: usize,
},
/// The number of tables would exceed the table limit cached in the
/// [`NamespaceSchema`].
#[error(
"couldn't create new table; namespace contains {existing_table_count} \
existing tables, applying this write would result in \
{merged_table_count} tables, limit is {table_count_limit}"
)]
Table {
/// Number of tables already in the namespace.
existing_table_count: usize,
/// Number of resultant tables after merging the write with existing
/// tables.
merged_table_count: usize,
/// The configured limit.
table_count_limit: usize,
},
}
fn validate_column_limits(
/// Evaluate the number of columns/tables that would result if `batches` was
/// applied to `schema`, and ensure the column/table count does not exceed the
/// maximum permitted amount cached in the [`NamespaceSchema`].
fn validate_schema_limits(
batches: &HashMap<String, MutableBatch>,
schema: &NamespaceSchema,
) -> Result<(), OverColumnLimit> {
) -> Result<(), CachedServiceProtectionLimit> {
// Maintain a counter tracking the number of tables in `batches` that do not
// exist in `schema`.
//
// This number of tables would be newly created when accepting the write.
let mut new_tables = 0;
for (table_name, batch) in batches {
let mut existing_columns = schema
.tables
.get(table_name)
.map(|t| t.column_names())
.unwrap_or_default();
// Get the column set for this table from the schema.
let mut existing_columns = match schema.tables.get(table_name) {
Some(v) => v.column_names(),
None if batch.columns().len() > schema.max_columns_per_table => {
// The table does not exist, therefore all the columns in this
// write must be created - there's no need to perform a set
// union to discover the distinct column count.
return Err(CachedServiceProtectionLimit::Column {
table_name: table_name.into(),
merged_column_count: batch.columns().len(),
existing_column_count: 0,
max_columns_per_table: schema.max_columns_per_table,
});
}
None => {
// The table must be created.
new_tables += 1;
// At least one new table will be created, ensure this does not
// exceed the configured maximum.
//
// Enforcing the check here ensures table limits are validated
// only when new tables are being created - this ensures
// existing tables do not become unusable if the limit is
// lowered, or because multiple writes were concurrently
// submitted to multiple router instances, exceeding the schema
// limit by some degree (eventual enforcement).
let merged_table_count = schema.tables.len() + new_tables;
if merged_table_count > schema.max_tables {
return Err(CachedServiceProtectionLimit::Table {
existing_table_count: schema.tables.len(),
merged_table_count,
table_count_limit: schema.max_tables,
});
}
// Therefore all the columns in this write are new, and they are
// less than the maximum permitted number of columns.
continue;
}
};
// The union of existing columns and new columns in this write must be
// calculated to derive the total distinct column count for this table
// after this write is applied.
let existing_column_count = existing_columns.len();
let merged_column_count = {
@ -361,13 +462,14 @@ fn validate_column_limits(
existing_columns.len()
};
// If the table is currently over the column limit but this write only includes existing
// columns and doesn't exceed the limit more, this is allowed.
// If the table is currently over the column limit but this write only
// includes existing columns and doesn't exceed the limit more, this is
// allowed.
let columns_were_added_in_this_batch = merged_column_count > existing_column_count;
let column_limit_exceeded = merged_column_count > schema.max_columns_per_table;
if columns_were_added_in_this_batch && column_limit_exceeded {
return Err(OverColumnLimit {
return Err(CachedServiceProtectionLimit::Column {
table_name: table_name.into(),
merged_column_count,
existing_column_count,
@ -393,7 +495,7 @@ mod tests {
static NAMESPACE: Lazy<NamespaceName<'static>> = Lazy::new(|| "bananas".try_into().unwrap());
#[tokio::test]
async fn validate_limits() {
async fn test_validate_column_limits() {
let (catalog, namespace) = test_setup().await;
namespace.update_column_limit(3).await;
@ -403,12 +505,12 @@ mod tests {
let schema = namespace.schema().await;
// Columns under the limit is ok
let batches = lp_to_writes("nonexistent val=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
assert!(validate_schema_limits(&batches, &schema).is_ok());
// Columns over the limit is an error
let batches = lp_to_writes("nonexistent,tag1=A,tag2=B val=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Column {
table_name: _,
existing_column_count: 0,
merged_column_count: 4,
@ -423,12 +525,12 @@ mod tests {
let schema = namespace.schema().await;
// Columns under the limit is ok
let batches = lp_to_writes("no_columns_in_schema val=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
assert!(validate_schema_limits(&batches, &schema).is_ok());
// Columns over the limit is an error
let batches = lp_to_writes("no_columns_in_schema,tag1=A,tag2=B val=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Column {
table_name: _,
existing_column_count: 0,
merged_column_count: 4,
@ -444,15 +546,15 @@ mod tests {
let schema = namespace.schema().await;
// Columns already existing is ok
let batches = lp_to_writes("i_got_columns i_got_music=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
assert!(validate_schema_limits(&batches, &schema).is_ok());
// Adding columns under the limit is ok
let batches = lp_to_writes("i_got_columns,tag1=A i_got_music=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
assert!(validate_schema_limits(&batches, &schema).is_ok());
// Adding columns over the limit is an error
let batches = lp_to_writes("i_got_columns,tag1=A,tag2=B i_got_music=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Column {
table_name: _,
existing_column_count: 1,
merged_column_count: 4,
@ -472,12 +574,12 @@ mod tests {
let schema = namespace.schema().await;
// Columns already existing is allowed
let batches = lp_to_writes("bananas greatness=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
assert!(validate_schema_limits(&batches, &schema).is_ok());
// Adding columns over the limit is an error
let batches = lp_to_writes("bananas i_got_music=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Column {
table_name: _,
existing_column_count: 3,
merged_column_count: 4,
@ -530,12 +632,12 @@ mod tests {
// Columns already existing is allowed
let batches = lp_to_writes("dragonfruit val=42i 123456");
assert!(validate_column_limits(&batches, &schema).is_ok());
assert!(validate_schema_limits(&batches, &schema).is_ok());
// Adding more columns over the limit is an error
let batches = lp_to_writes("dragonfruit i_got_music=42i 123456");
assert_matches!(
validate_column_limits(&batches, &schema),
Err(OverColumnLimit {
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Column {
table_name: _,
existing_column_count: 4,
merged_column_count: 5,
@ -545,6 +647,124 @@ mod tests {
}
}
#[tokio::test]
async fn test_validate_table_limits() {
let (_catalog, namespace) = test_setup().await;
namespace.update_table_limit(2).await;
// Creating a table in an empty namespace is OK
{
let schema = namespace.schema().await;
let batches = lp_to_writes("nonexistent val=42i 123456");
assert!(validate_schema_limits(&batches, &schema).is_ok());
}
// Creating two tables (the limit) is OK
{
let schema = namespace.schema().await;
let batches = lp_to_writes("nonexistent val=42i 123456\nbananas val=2 42");
assert!(validate_schema_limits(&batches, &schema).is_ok());
}
// Creating three tables (above the limit) fails
{
let schema = namespace.schema().await;
let batches =
lp_to_writes("nonexistent val=42i 123456\nbananas val=2 42\nplatanos val=2 42");
assert_matches!(
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Table {
existing_table_count: 0,
merged_table_count: 3,
table_count_limit: 2
})
);
}
// Create a table to cover non-empty namespaces
namespace.create_table("bananas").await;
// Adding a second table is OK
{
let schema = namespace.schema().await;
let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42");
assert!(validate_schema_limits(&batches, &schema).is_ok());
}
// Adding a third table is rejected
{
let schema = namespace.schema().await;
let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42\nnope v=2 42");
assert_matches!(
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Table {
existing_table_count: 1,
merged_table_count: 3,
table_count_limit: 2
})
);
}
// Create another table and reduce the table limit to be less than the
// current number of tables.
//
// Multiple router instances can race to populate the catalog with new
// tables/columns, therefore all existing tables MUST be accepted to
// ensure deterministic enforcement once all caches have converged.
namespace.create_table("platanos").await;
namespace.update_table_limit(1).await;
// The existing tables are accepted, even though this single write
// exceeds the new table limit.
{
let schema = namespace.schema().await;
assert_eq!(schema.tables.len(), 2);
assert_eq!(schema.max_tables, 1);
let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42");
assert_matches!(validate_schema_limits(&batches, &schema), Ok(()));
}
// A new table is always rejected.
{
let schema = namespace.schema().await;
let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42\nnope v=1 42");
assert_matches!(
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Table {
existing_table_count: 2,
merged_table_count: 3,
table_count_limit: 1,
})
);
}
{
let schema = namespace.schema().await;
let batches = lp_to_writes("bananas val=2 42\nnope v=1 42");
assert_matches!(
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Table {
existing_table_count: 2,
merged_table_count: 3,
table_count_limit: 1,
})
);
}
{
let schema = namespace.schema().await;
let batches = lp_to_writes("nope v=1 42");
assert_matches!(
validate_schema_limits(&batches, &schema),
Err(CachedServiceProtectionLimit::Table {
existing_table_count: 2,
merged_table_count: 3,
table_count_limit: 1,
})
);
}
}
// Parse `lp` into a table-keyed MutableBatch map.
fn lp_to_writes(lp: &str) -> HashMap<String, MutableBatch> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
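
Condensed sketch of the rule validate_schema_limits enforces for tables (a hypothetical helper; the real function also enforces column limits and checks incrementally as each new table is encountered): the table limit is only evaluated when a write would create a new table, so writes to existing tables keep working even after the limit is lowered below the current table count (eventual enforcement).

use std::collections::{BTreeSet, HashSet};

/// Returns Err((existing, merged, limit)) if the write would create tables past the limit.
fn check_table_limit(
    existing: &BTreeSet<String>,
    write_tables: &HashSet<String>,
    max_tables: usize,
) -> Result<(), (usize, usize, usize)> {
    let new_tables = write_tables.iter().filter(|t| !existing.contains(*t)).count();
    let merged = existing.len() + new_tables;
    // Only enforced when at least one table would be created.
    if new_tables > 0 && merged > max_tables {
        return Err((existing.len(), merged, max_tables));
    }
    Ok(())
}

fn main() {
    let existing: BTreeSet<String> = ["bananas", "platanos"].map(String::from).into();
    // Writing to existing tables passes even though the limit (1) is already exceeded.
    let write: HashSet<String> = ["bananas".to_string()].into();
    assert!(check_table_limit(&existing, &write, 1).is_ok());
    // Any write that would create a new table is rejected.
    let write: HashSet<String> = ["nope".to_string()].into();
    assert_eq!(check_table_limit(&existing, &write, 1), Err((2, 3, 1)));
}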

@ -46,6 +46,7 @@ mod tests {
query_pool_id: QueryPoolId::new(1234),
tables: Default::default(),
max_columns_per_table: 50,
max_tables: 24,
retention_period_ns: Some(876),
};
assert!(cache.put_schema(ns.clone(), schema1.clone()).is_none());
@ -57,6 +58,7 @@ mod tests {
query_pool_id: QueryPoolId::new(2),
tables: Default::default(),
max_columns_per_table: 10,
max_tables: 42,
retention_period_ns: Some(876),
};

@ -199,6 +199,7 @@ mod tests {
query_pool_id: QueryPoolId::new(1234),
tables,
max_columns_per_table: 100,
max_tables: 42,
retention_period_ns: None,
}
}

@ -65,6 +65,7 @@ mod tests {
query_pool_id: QueryPoolId::new(1),
tables: Default::default(),
max_columns_per_table: 7,
max_tables: 42,
retention_period_ns: None,
}
}

@ -122,6 +122,7 @@ mod tests {
query_pool_id: QueryPoolId::new(3),
tables: Default::default(),
max_columns_per_table: 4,
max_tables: 42,
retention_period_ns: None,
},
);

@ -176,6 +176,7 @@ mod tests {
query_pool_id: QueryPoolId::new(3),
tables: Default::default(),
max_columns_per_table: 4,
max_tables: 42,
retention_period_ns: None,
},
);

@ -560,11 +560,14 @@ where
mod tests {
use super::*;
use crate::{
dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall},
dml_handlers::{
mock::{MockDmlHandler, MockDmlHandlerCall},
CachedServiceProtectionLimit,
},
namespace_resolver::{mock::MockNamespaceResolver, NamespaceCreationError},
};
use assert_matches::assert_matches;
use data_types::{NamespaceId, NamespaceNameError};
use data_types::{NamespaceId, NamespaceNameError, TableId};
use flate2::{write::GzEncoder, Compression};
use hyper::header::HeaderValue;
use metric::{Attributes, Metric};
@ -1527,5 +1530,43 @@ mod tests {
RequestLimit,
"this service is overloaded, please try again later",
),
(
DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(CachedServiceProtectionLimit::Column {
table_name: "bananas".to_string(),
existing_column_count: 42,
merged_column_count: 4242,
max_columns_per_table: 24,
})))),
"dml handler error: service limit reached: couldn't create columns in table `bananas`; table contains 42 \
existing columns, applying this write would result in 4242 columns, limit is 24",
),
(
DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(CachedServiceProtectionLimit::Table {
existing_table_count: 42,
merged_table_count: 4242,
table_count_limit: 24,
})))),
"dml handler error: service limit reached: couldn't create new table; namespace contains 42 existing \
tables, applying this write would result in 4242 tables, limit is 24",
),
(
DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(iox_catalog::interface::Error::ColumnCreateLimitError {
column_name: "bananas".to_string(),
table_id: TableId::new(42),
})))),
"dml handler error: service limit reached: couldn't create column bananas in table 42; limit reached on \
namespace",
),
(
DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(iox_catalog::interface::Error::TableCreateLimitError {
table_name: "bananas".to_string(),
namespace_id: NamespaceId::new(42),
})))),
"dml handler error: service limit reached: couldn't create table bananas; limit reached on namespace 42",
),
}
}