fix: use created_at to order L0 during comapction
parent
042b7c4521
commit
1a93f70a8b
|
@ -111,6 +111,10 @@ impl CompactorParquetFile {
|
|||
pub fn table_id(&self) -> TableId {
|
||||
self.inner.table_id
|
||||
}
|
||||
|
||||
pub fn created_at(&self) -> Timestamp {
|
||||
self.inner.created_at
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CompactorParquetFile> for ParquetFile {
|
||||
|
|
|
@ -114,7 +114,7 @@ impl ParquetFilesForCompaction {
|
|||
}
|
||||
}
|
||||
|
||||
level_0.sort_by_key(|pf| pf.max_sequence_number());
|
||||
level_0.sort_by_key(|pf| pf.created_at());
|
||||
level_1.sort_by_key(|pf| pf.min_time());
|
||||
|
||||
Ok(Self {
|
||||
|
@ -130,6 +130,7 @@ mod tests {
|
|||
use super::*;
|
||||
use data_types::ColumnType;
|
||||
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestPartition};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
|
||||
const ARBITRARY_LINE_PROTOCOL: &str = r#"
|
||||
table,tag1=WA field_int=1000i 8000
|
||||
|
@ -448,7 +449,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn level_0_files_are_sorted_on_max_seq_num() {
|
||||
async fn level_0_files_are_sorted_on_created_at() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let TestSetup {
|
||||
catalog,
|
||||
|
@ -457,19 +458,23 @@ mod tests {
|
|||
..
|
||||
} = test_setup().await;
|
||||
|
||||
// Create a level 0 file, max seq = 100
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
|
||||
.with_compaction_level(CompactionLevel::Initial)
|
||||
.with_max_seq(100);
|
||||
let l0_max_seq_100 = partition.create_parquet_file(builder).await;
|
||||
let time_provider = SystemProvider::new();
|
||||
let one_hour_ago = time_provider.hours_ago(1);
|
||||
let ten_munites_ago = time_provider.minutes_ago(10);
|
||||
|
||||
// Create a level 0 file, max seq = 50
|
||||
// Create a level 0 file 10 minutes ago
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
|
||||
.with_compaction_level(CompactionLevel::Initial)
|
||||
.with_max_seq(50);
|
||||
let l0_max_seq_50 = partition.create_parquet_file(builder).await;
|
||||
.with_creation_time(ten_munites_ago);
|
||||
let l0_ten_minutes_ago = partition.create_parquet_file(builder).await;
|
||||
|
||||
// Create a level 0 file one hour ago
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
|
||||
.with_compaction_level(CompactionLevel::Initial)
|
||||
.with_creation_time(one_hour_ago);
|
||||
let l0_one_hour_ago = partition.create_parquet_file(builder).await;
|
||||
|
||||
// Create a level 1 file
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
|
@ -488,23 +493,24 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
let l0_max_seq_50_file_size_in_mem = 5 * l0_max_seq_50.parquet_file.file_size_bytes as u64;
|
||||
let l0_max_seq_100_file_size_in_mem =
|
||||
5 * l0_max_seq_100.parquet_file.file_size_bytes as u64;
|
||||
let l0_ten_minutes_ago_file_size_in_mem =
|
||||
5 * l0_ten_minutes_ago.parquet_file.file_size_bytes as u64;
|
||||
let l0_one_hour_ago_file_size_in_mem =
|
||||
5 * l0_one_hour_ago.parquet_file.file_size_bytes as u64;
|
||||
let l1_file_size_in_mem = 5 * l1.parquet_file.file_size_bytes as u64;
|
||||
|
||||
assert_eq!(
|
||||
parquet_files_for_compaction.level_0,
|
||||
vec![
|
||||
CompactorParquetFile::new(
|
||||
l0_max_seq_50.parquet_file,
|
||||
l0_one_hour_ago.parquet_file,
|
||||
0,
|
||||
l0_max_seq_50_file_size_in_mem
|
||||
l0_one_hour_ago_file_size_in_mem
|
||||
),
|
||||
CompactorParquetFile::new(
|
||||
l0_max_seq_100.parquet_file,
|
||||
l0_ten_minutes_ago.parquet_file,
|
||||
0,
|
||||
l0_max_seq_100_file_size_in_mem
|
||||
l0_ten_minutes_ago_file_size_in_mem
|
||||
),
|
||||
]
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue