diff --git a/compactor/src/cold.rs b/compactor/src/cold.rs index 833ff15375..54373b2af2 100644 --- a/compactor/src/cold.rs +++ b/compactor/src/cold.rs @@ -145,6 +145,7 @@ mod tests { const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; + const MINUTE_WITHOUT_NEW_WRITE_TO_BE_COLD: u64 = 10; const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; const DEFAULT_MAX_NUM_PARTITION_CANDIDATES: usize = 10; @@ -225,13 +226,19 @@ mod tests { // parquet files that are all in the same partition let mut size_overrides = HashMap::::default(); + let time_60_minutes_ago = time.minutes_ago(60); + let time_50_minutes_ago = time.minutes_ago(50); + let time_40_minutes_ago = time.minutes_ago(40); + let time_30_minutes_ago = time.minutes_ago(30); + let time_20_minutes_ago = time.minutes_ago(20); + let time_11_minutes_ago = time.minutes_ago(11); + // pf1 does not overlap with any other level 0 let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp1) - .with_max_seq(3) + .with_creation_time(time_50_minutes_ago) .with_min_time(10) - .with_max_time(20) - .with_creation_time(time_five_hour_ago); + .with_max_time(20); let pf1_no_overlap = partition.create_parquet_file(builder).await; size_overrides.insert( pf1_no_overlap.parquet_file.id, @@ -241,27 +248,25 @@ mod tests { // pf2 overlaps with pf3 let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp2) - .with_max_seq(5) + .with_creation_time(time_40_minutes_ago) .with_min_time(8_000) - .with_max_time(20_000) - .with_creation_time(time_five_hour_ago); + .with_max_time(20_000); let pf2 = partition.create_parquet_file(builder).await; size_overrides.insert(pf2.parquet_file.id, 100); // small file // pf3 overlaps with pf2 let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp3) - .with_max_seq(10) + .with_creation_time(time_30_minutes_ago) .with_min_time(6_000) - 
.with_max_time(25_000) - .with_creation_time(time_five_hour_ago); + .with_max_time(25_000); let pf3 = partition.create_parquet_file(builder).await; size_overrides.insert(pf3.parquet_file.id, 100); // small file // pf4 does not overlap with any but is small let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp4) - .with_max_seq(18) + // pf4 created 20 minutes ago (cold threshold: MINUTE_WITHOUT_NEW_WRITE_TO_BE_COLD = 10 minutes) .with_min_time(26_000) .with_max_time(28_000) - .with_creation_time(time_five_hour_ago); + .with_creation_time(time_20_minutes_ago); @@ -271,10 +276,9 @@ mod tests { // pf5 was created in a previous compaction cycle; overlaps with pf1 let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp5) - .with_max_seq(1) + .with_creation_time(time_60_minutes_ago) .with_min_time(9) .with_max_time(25) - .with_creation_time(time_five_hour_ago) .with_compaction_level(CompactionLevel::FileNonOverlapped); let pf5 = partition.create_parquet_file(builder).await; size_overrides.insert(pf5.parquet_file.id, 100); // small file @@ -282,10 +286,9 @@ mod tests { // pf6 was created in a previous compaction cycle; does not overlap with any let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp6) - .with_max_seq(20) + .with_creation_time(time_11_minutes_ago) .with_min_time(90000) .with_max_time(91000) - .with_creation_time(time_five_hour_ago) .with_compaction_level(CompactionLevel::FileNonOverlapped); let pf6 = partition.create_parquet_file(builder).await; size_overrides.insert(pf6.parquet_file.id, 100); // small file @@ -738,7 +741,7 @@ mod tests { min_num_rows_allocated_per_record_batch_to_datafusion_plan: 1, max_num_compacting_files: 20, max_num_compacting_files_first_in_partition: 40, - minutes_without_new_writes_to_be_cold: 10, + minutes_without_new_writes_to_be_cold: MINUTE_WITHOUT_NEW_WRITE_TO_BE_COLD, cold_partition_candidates_hours_threshold: DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD, hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, @@ -1455,7 +1458,7 @@ mod tests { let
partition = table.with_shard(&shard).create_partition("part").await; let time = Arc::new(SystemProvider::new()); - let time_five_hour_ago = time.hours_ago(5); + // let time_five_hour_ago = time.hours_ago(5); let config = make_compactor_config(); let metrics = Arc::new(metric::Registry::new()); let compactor = Arc::new(Compactor::new( @@ -1472,13 +1475,19 @@ mod tests { // parquet files that are all in the same partition let mut size_overrides = HashMap::::default(); + let time_60_minutes_ago = time.minutes_ago(60); + let time_50_minutes_ago = time.minutes_ago(50); + let time_40_minutes_ago = time.minutes_ago(40); + let time_30_minutes_ago = time.minutes_ago(30); + let time_20_minutes_ago = time.minutes_ago(20); + let time_11_minutes_ago = time.minutes_ago(11); + // pf1 does not overlap with any other level 0 let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp1) - .with_max_seq(3) + .with_creation_time(time_50_minutes_ago) .with_min_time(10) - .with_max_time(20) - .with_creation_time(time_five_hour_ago); + .with_max_time(20); let pf1 = partition.create_parquet_file(builder).await; size_overrides.insert( pf1.parquet_file.id, @@ -1488,10 +1497,9 @@ mod tests { // pf2 overlaps with pf3 let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp2) - .with_max_seq(5) + .with_creation_time(time_40_minutes_ago) .with_min_time(8_000) - .with_max_time(20_000) - .with_creation_time(time_five_hour_ago); + .with_max_time(20_000); let pf2 = partition.create_parquet_file(builder).await; size_overrides.insert( pf2.parquet_file.id, @@ -1501,10 +1509,9 @@ mod tests { // pf3 overlaps with pf2 let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp3) - .with_max_seq(10) + .with_creation_time(time_30_minutes_ago) .with_min_time(6_000) - .with_max_time(25_000) - .with_creation_time(time_five_hour_ago); + .with_max_time(25_000); let pf3 = partition.create_parquet_file(builder).await; size_overrides.insert( pf3.parquet_file.id, @@ -1514,10 
+1521,9 @@ mod tests { // pf4 does not overlap with any but is small let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp4) - .with_max_seq(18) + .with_creation_time(time_20_minutes_ago) .with_min_time(26_000) - .with_max_time(28_000) - .with_creation_time(time_five_hour_ago); + .with_max_time(28_000); let pf4 = partition.create_parquet_file(builder).await; size_overrides.insert( pf4.parquet_file.id, @@ -1527,10 +1533,9 @@ mod tests { // pf5 was created in a previous compaction cycle; overlaps with pf1 let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp5) - .with_max_seq(1) + .with_creation_time(time_60_minutes_ago) .with_min_time(9) .with_max_time(25) - .with_creation_time(time_five_hour_ago) .with_compaction_level(CompactionLevel::FileNonOverlapped); let pf5 = partition.create_parquet_file(builder).await; size_overrides.insert( @@ -1541,10 +1546,9 @@ mod tests { // pf6 was created in a previous compaction cycle; does not overlap with any let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp6) - .with_max_seq(20) + .with_creation_time(time_11_minutes_ago) .with_min_time(90000) .with_max_time(91000) - .with_creation_time(time_five_hour_ago) .with_compaction_level(CompactionLevel::FileNonOverlapped); let pf6 = partition.create_parquet_file(builder).await; size_overrides.insert( diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 7e9736aeac..e0574699e5 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -883,6 +883,7 @@ mod tests { use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use data_types::{ColumnType, PartitionParam}; use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable}; + use iox_time::SystemProvider; use itertools::Itertools; use metric::U64HistogramOptions; use parquet_file::storage::StorageId; @@ -972,6 +973,10 @@ mod tests { partition_key: 
partition.partition.partition_key.clone(), }); + let time = SystemProvider::new(); + let time_60_minutes_ago = time.minutes_ago(60); + let time_50_minutes_ago = time.minutes_ago(50); + let lp = vec![ "table,tag2=PA,tag3=15 field_int=1601i 30000", "table,tag2=OH,tag3=21 field_int=21i 36000", @@ -979,7 +984,6 @@ mod tests { .join("\n"); let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) - .with_max_seq(20) // This should be irrelevant because this is a level 1 file .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction let level_1_file = partition.create_parquet_file(builder).await.into(); @@ -991,7 +995,7 @@ mod tests { .join("\n"); let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) - .with_max_seq(1); + .with_creation_time(time_60_minutes_ago); let level_0_max_seq_1 = partition.create_parquet_file(builder).await.into(); let lp = vec![ @@ -1002,7 +1006,7 @@ mod tests { .join("\n"); let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) - .with_max_seq(2); + .with_creation_time(time_50_minutes_ago); let level_0_max_seq_2 = partition.create_parquet_file(builder).await.into(); let lp = vec![ @@ -1012,7 +1016,6 @@ mod tests { .join("\n"); let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) - .with_max_seq(5) // This should be irrelevant because this is a level 1 file .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction let level_1_with_duplicates = partition.create_parquet_file(builder).await.into(); diff --git a/compactor/src/query.rs b/compactor/src/query.rs index c34c4abcab..df4e299663 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -106,6 +106,11 @@ impl QueryableParquetChunk { pub fn object_store_id(&self) -> Uuid { self.data.object_store_id() } + + /// Return the creation time of the file + pub fn created_at(&self) -> Timestamp { + self.data.parquet_file().created_at + } } impl QueryChunkMeta for 
QueryableParquetChunk { @@ -204,8 +209,8 @@ impl QueryChunk for QueryableParquetChunk { // Files that haven't yet been compacted to the target level were created later and - // should be sorted based on their max sequence number. + // should be sorted based on their creation time. - (FileNonOverlapped, Initial) => ChunkOrder::new(self.max_sequence_number.get()), - (Final, FileNonOverlapped) => ChunkOrder::new(self.max_sequence_number.get()), + (FileNonOverlapped, Initial) => ChunkOrder::new(self.created_at().get()), + (Final, FileNonOverlapped) => ChunkOrder::new(self.created_at().get()), // These combinations of target compaction level and file compaction level are // invalid in this context given the current compaction algorithm. @@ -229,12 +234,13 @@ mod tests { use super::*; use data_types::ColumnType; use iox_tests::util::{TestCatalog, TestParquetFileBuilder}; + use iox_time::{SystemProvider, TimeProvider}; use parquet_file::storage::{ParquetStorage, StorageId}; async fn test_setup( compaction_level: CompactionLevel, target_level: CompactionLevel, - max_sequence_number: i64, + created_at: iox_time::Time, ) -> QueryableParquetChunk { let catalog = TestCatalog::new(); let ns = catalog.create_namespace_1hr_retention("ns").await; @@ -253,7 +259,7 @@ mod tests { let builder = TestParquetFileBuilder::default() .with_line_protocol(&lp) .with_compaction_level(compaction_level) - .with_max_seq(max_sequence_number); + .with_creation_time(created_at); let file = partition.create_parquet_file(builder).await; let parquet_file = Arc::new(file.parquet_file); @@ -278,23 +284,27 @@ mod tests { } #[tokio::test] - async fn chunk_order_is_max_seq_when_compaction_level_0_and_target_level_1() { + async fn chunk_order_is_created_at_when_compaction_level_0_and_target_level_1() { + let tp = SystemProvider::new(); + let time = tp.hours_ago(1); let chunk = test_setup( CompactionLevel::Initial, CompactionLevel::FileNonOverlapped, - 2, + time, ) .await; - assert_eq!(chunk.order(), ChunkOrder::new(2)); + assert_eq!(chunk.order(),
ChunkOrder::new(time.timestamp_nanos())); } #[tokio::test] async fn chunk_order_is_0_when_compaction_level_1_and_target_level_1() { + let tp = SystemProvider::new(); + let time = tp.hours_ago(1); let chunk = test_setup( CompactionLevel::FileNonOverlapped, CompactionLevel::FileNonOverlapped, - 2, + time, ) .await; @@ -302,20 +312,24 @@ mod tests { } #[tokio::test] - async fn chunk_order_is_max_seq_when_compaction_level_1_and_target_level_2() { + async fn chunk_order_is_created_at_when_compaction_level_1_and_target_level_2() { + let tp = SystemProvider::new(); + let time = tp.hours_ago(1); let chunk = test_setup( CompactionLevel::FileNonOverlapped, CompactionLevel::Final, - 2, + time, ) .await; - assert_eq!(chunk.order(), ChunkOrder::new(2)); + assert_eq!(chunk.order(), ChunkOrder::new(time.timestamp_nanos())); } #[tokio::test] async fn chunk_order_is_0_when_compaction_level_2_and_target_level_2() { - let chunk = test_setup(CompactionLevel::Final, CompactionLevel::Final, 2).await; + let tp = SystemProvider::new(); + let time = tp.hours_ago(1); + let chunk = test_setup(CompactionLevel::Final, CompactionLevel::Final, time).await; assert_eq!(chunk.order(), ChunkOrder::new(0)); }