diff --git a/compactor/src/parquet_file.rs b/compactor/src/parquet_file.rs index c4b56e5596..cf6f1c2897 100644 --- a/compactor/src/parquet_file.rs +++ b/compactor/src/parquet_file.rs @@ -111,6 +111,10 @@ impl CompactorParquetFile { pub fn table_id(&self) -> TableId { self.inner.table_id } + + pub fn created_at(&self) -> Timestamp { + self.inner.created_at + } } impl From for ParquetFile { diff --git a/compactor/src/parquet_file_lookup.rs b/compactor/src/parquet_file_lookup.rs index c1276309aa..3b85f57748 100644 --- a/compactor/src/parquet_file_lookup.rs +++ b/compactor/src/parquet_file_lookup.rs @@ -114,7 +114,7 @@ impl ParquetFilesForCompaction { } } - level_0.sort_by_key(|pf| pf.max_sequence_number()); + level_0.sort_by_key(|pf| pf.created_at()); level_1.sort_by_key(|pf| pf.min_time()); Ok(Self { @@ -130,6 +130,7 @@ mod tests { use super::*; use data_types::ColumnType; use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestPartition}; + use iox_time::{SystemProvider, TimeProvider}; const ARBITRARY_LINE_PROTOCOL: &str = r#" table,tag1=WA field_int=1000i 8000 @@ -448,7 +449,7 @@ mod tests { } #[tokio::test] - async fn level_0_files_are_sorted_on_max_seq_num() { + async fn level_0_files_are_sorted_on_created_at() { test_helpers::maybe_start_logging(); let TestSetup { catalog, @@ -457,19 +458,23 @@ mod tests { .. } = test_setup().await; - // Create a level 0 file, max seq = 100 - let builder = TestParquetFileBuilder::default() - .with_line_protocol(ARBITRARY_LINE_PROTOCOL) - .with_compaction_level(CompactionLevel::Initial) - .with_max_seq(100); - let l0_max_seq_100 = partition.create_parquet_file(builder).await; + let time_provider = SystemProvider::new(); + let one_hour_ago = time_provider.hours_ago(1); + let ten_munites_ago = time_provider.minutes_ago(10); - // Create a level 0 file, max seq = 50 + // Create a level 0 file 10 minutes ago let builder = TestParquetFileBuilder::default() .with_line_protocol(ARBITRARY_LINE_PROTOCOL) .with_compaction_level(CompactionLevel::Initial) - .with_max_seq(50); - let l0_max_seq_50 = partition.create_parquet_file(builder).await; + .with_creation_time(ten_munites_ago); + let l0_ten_minutes_ago = partition.create_parquet_file(builder).await; + + // Create a level 0 file one hour ago + let builder = TestParquetFileBuilder::default() + .with_line_protocol(ARBITRARY_LINE_PROTOCOL) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(one_hour_ago); + let l0_one_hour_ago = partition.create_parquet_file(builder).await; // Create a level 1 file let builder = TestParquetFileBuilder::default() @@ -488,23 +493,24 @@ mod tests { .await .unwrap(); - let l0_max_seq_50_file_size_in_mem = 5 * l0_max_seq_50.parquet_file.file_size_bytes as u64; - let l0_max_seq_100_file_size_in_mem = - 5 * l0_max_seq_100.parquet_file.file_size_bytes as u64; + let l0_ten_minutes_ago_file_size_in_mem = + 5 * l0_ten_minutes_ago.parquet_file.file_size_bytes as u64; + let l0_one_hour_ago_file_size_in_mem = + 5 * l0_one_hour_ago.parquet_file.file_size_bytes as u64; let l1_file_size_in_mem = 5 * l1.parquet_file.file_size_bytes as u64; assert_eq!( parquet_files_for_compaction.level_0, vec![ CompactorParquetFile::new( - l0_max_seq_50.parquet_file, + l0_one_hour_ago.parquet_file, 0, - l0_max_seq_50_file_size_in_mem + l0_one_hour_ago_file_size_in_mem ), CompactorParquetFile::new( - l0_max_seq_100.parquet_file, + l0_ten_minutes_ago.parquet_file, 0, - l0_max_seq_100_file_size_in_mem + l0_ten_minutes_ago_file_size_in_mem ), ] );