fix: using created_at to order chunks for deduplication (#6556)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Nga Tran 2023-01-10 13:18:33 -05:00 committed by GitHub
parent 4662a7ed9e
commit 2de0e45b0a
3 changed files with 69 additions and 48 deletions
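In short: when the compactor deduplicates rows that appear in several parquet chunks, it now derives the chunk order from the file's created_at timestamp instead of from the max sequence number, so precedence follows file creation time. Below is a minimal, self-contained sketch of that idea; the Chunk struct, the deduplicate helper, and the assumption that the latest-created chunk wins are illustrative stand-ins, not the actual IOx types or API.

// Sketch only: illustrates ordering duplicate rows by file creation time.
// `Chunk`, `created_at_ns`, and `deduplicate` are assumed names, not IOx code.
#[derive(Debug, Clone)]
struct Chunk {
    created_at_ns: i64, // catalog created_at, nanoseconds since epoch (assumed representation)
    value: i64,         // stand-in for the row payload carried by this chunk
}

/// Pick the winning value among duplicate rows: the one from the chunk with
/// the latest creation time (assumption: later-created data should win).
fn deduplicate(chunks: &[Chunk]) -> Option<i64> {
    chunks
        .iter()
        .max_by_key(|c| c.created_at_ns)
        .map(|c| c.value)
}

fn main() {
    let chunks = vec![
        Chunk { created_at_ns: 100, value: 1 }, // older file
        Chunk { created_at_ns: 300, value: 3 }, // newest file: its row should win
        Chunk { created_at_ns: 200, value: 2 },
    ];
    assert_eq!(deduplicate(&chunks), Some(3));
    println!("winner: {:?}", deduplicate(&chunks));
}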

Changed file 1 of 3

@@ -145,6 +145,7 @@ mod tests {
     const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4;
     const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24;
     const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
+    const MINUTE_WITHOUT_NEW_WRITE_TO_BE_COLD: u64 = 10;
     const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20;
     const DEFAULT_MAX_NUM_PARTITION_CANDIDATES: usize = 10;
@@ -225,13 +226,19 @@ mod tests {
         // parquet files that are all in the same partition
         let mut size_overrides = HashMap::<ParquetFileId, i64>::default();

+        let time_60_minutes_ago = time.minutes_ago(60);
+        let time_50_minutes_ago = time.minutes_ago(50);
+        let time_40_minutes_ago = time.minutes_ago(40);
+        let time_30_minutes_ago = time.minutes_ago(30);
+        let time_20_minutes_ago = time.minutes_ago(20);
+        let time_11_minutes_ago = time.minutes_ago(11);
+
         // pf1 does not overlap with any other level 0
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp1)
-            .with_max_seq(3)
+            .with_creation_time(time_50_minutes_ago)
             .with_min_time(10)
-            .with_max_time(20)
-            .with_creation_time(time_five_hour_ago);
+            .with_max_time(20);
         let pf1_no_overlap = partition.create_parquet_file(builder).await;
         size_overrides.insert(
             pf1_no_overlap.parquet_file.id,
@@ -241,27 +248,25 @@ mod tests {
         // pf2 overlaps with pf3
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp2)
-            .with_max_seq(5)
+            .with_creation_time(time_40_minutes_ago)
             .with_min_time(8_000)
-            .with_max_time(20_000)
-            .with_creation_time(time_five_hour_ago);
+            .with_max_time(20_000);
         let pf2 = partition.create_parquet_file(builder).await;
         size_overrides.insert(pf2.parquet_file.id, 100); // small file

         // pf3 overlaps with pf2
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp3)
-            .with_max_seq(10)
+            .with_creation_time(time_30_minutes_ago)
             .with_min_time(6_000)
-            .with_max_time(25_000)
-            .with_creation_time(time_five_hour_ago);
+            .with_max_time(25_000);
         let pf3 = partition.create_parquet_file(builder).await;
         size_overrides.insert(pf3.parquet_file.id, 100); // small file

         // pf4 does not overlap with any but is small
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp4)
-            .with_max_seq(18)
+            .with_creation_time(time_20_minutes_ago)
             .with_min_time(26_000)
             .with_max_time(28_000)
             .with_creation_time(time_five_hour_ago);
@@ -271,10 +276,9 @@ mod tests {
         // pf5 was created in a previous compaction cycle; overlaps with pf1
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp5)
-            .with_max_seq(1)
+            .with_creation_time(time_60_minutes_ago)
             .with_min_time(9)
             .with_max_time(25)
-            .with_creation_time(time_five_hour_ago)
             .with_compaction_level(CompactionLevel::FileNonOverlapped);
         let pf5 = partition.create_parquet_file(builder).await;
         size_overrides.insert(pf5.parquet_file.id, 100); // small file
@@ -282,10 +286,9 @@ mod tests {
         // pf6 was created in a previous compaction cycle; does not overlap with any
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp6)
-            .with_max_seq(20)
+            .with_creation_time(time_11_minutes_ago)
             .with_min_time(90000)
             .with_max_time(91000)
-            .with_creation_time(time_five_hour_ago)
             .with_compaction_level(CompactionLevel::FileNonOverlapped);
         let pf6 = partition.create_parquet_file(builder).await;
         size_overrides.insert(pf6.parquet_file.id, 100); // small file
@@ -738,7 +741,7 @@ mod tests {
            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 1,
            max_num_compacting_files: 20,
            max_num_compacting_files_first_in_partition: 40,
-            minutes_without_new_writes_to_be_cold: 10,
+            minutes_without_new_writes_to_be_cold: MINUTE_WITHOUT_NEW_WRITE_TO_BE_COLD,
            cold_partition_candidates_hours_threshold:
                DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD,
            hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
@@ -1455,7 +1458,7 @@ mod tests {
         let partition = table.with_shard(&shard).create_partition("part").await;
         let time = Arc::new(SystemProvider::new());
-        let time_five_hour_ago = time.hours_ago(5);
+        // let time_five_hour_ago = time.hours_ago(5);
         let config = make_compactor_config();
         let metrics = Arc::new(metric::Registry::new());
         let compactor = Arc::new(Compactor::new(
@@ -1472,13 +1475,19 @@ mod tests {
         // parquet files that are all in the same partition
         let mut size_overrides = HashMap::<ParquetFileId, i64>::default();

+        let time_60_minutes_ago = time.minutes_ago(60);
+        let time_50_minutes_ago = time.minutes_ago(50);
+        let time_40_minutes_ago = time.minutes_ago(40);
+        let time_30_minutes_ago = time.minutes_ago(30);
+        let time_20_minutes_ago = time.minutes_ago(20);
+        let time_11_minutes_ago = time.minutes_ago(11);
+
         // pf1 does not overlap with any other level 0
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp1)
-            .with_max_seq(3)
+            .with_creation_time(time_50_minutes_ago)
             .with_min_time(10)
-            .with_max_time(20)
-            .with_creation_time(time_five_hour_ago);
+            .with_max_time(20);
         let pf1 = partition.create_parquet_file(builder).await;
         size_overrides.insert(
             pf1.parquet_file.id,
@@ -1488,10 +1497,9 @@ mod tests {
         // pf2 overlaps with pf3
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp2)
-            .with_max_seq(5)
+            .with_creation_time(time_40_minutes_ago)
             .with_min_time(8_000)
-            .with_max_time(20_000)
-            .with_creation_time(time_five_hour_ago);
+            .with_max_time(20_000);
         let pf2 = partition.create_parquet_file(builder).await;
         size_overrides.insert(
             pf2.parquet_file.id,
@@ -1501,10 +1509,9 @@ mod tests {
         // pf3 overlaps with pf2
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp3)
-            .with_max_seq(10)
+            .with_creation_time(time_30_minutes_ago)
             .with_min_time(6_000)
-            .with_max_time(25_000)
-            .with_creation_time(time_five_hour_ago);
+            .with_max_time(25_000);
         let pf3 = partition.create_parquet_file(builder).await;
         size_overrides.insert(
             pf3.parquet_file.id,
@@ -1514,10 +1521,9 @@ mod tests {
         // pf4 does not overlap with any but is small
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp4)
-            .with_max_seq(18)
+            .with_creation_time(time_20_minutes_ago)
             .with_min_time(26_000)
-            .with_max_time(28_000)
-            .with_creation_time(time_five_hour_ago);
+            .with_max_time(28_000);
         let pf4 = partition.create_parquet_file(builder).await;
         size_overrides.insert(
             pf4.parquet_file.id,
@@ -1527,10 +1533,9 @@ mod tests {
         // pf5 was created in a previous compaction cycle; overlaps with pf1
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp5)
-            .with_max_seq(1)
+            .with_creation_time(time_60_minutes_ago)
             .with_min_time(9)
             .with_max_time(25)
-            .with_creation_time(time_five_hour_ago)
             .with_compaction_level(CompactionLevel::FileNonOverlapped);
         let pf5 = partition.create_parquet_file(builder).await;
         size_overrides.insert(
@@ -1541,10 +1546,9 @@ mod tests {
         // pf6 was created in a previous compaction cycle; does not overlap with any
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp6)
-            .with_max_seq(20)
+            .with_creation_time(time_11_minutes_ago)
             .with_min_time(90000)
             .with_max_time(91000)
-            .with_creation_time(time_five_hour_ago)
             .with_compaction_level(CompactionLevel::FileNonOverlapped);
         let pf6 = partition.create_parquet_file(builder).await;
         size_overrides.insert(

Changed file 2 of 3

@@ -883,6 +883,7 @@ mod tests {
     use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
     use data_types::{ColumnType, PartitionParam};
     use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
+    use iox_time::SystemProvider;
     use itertools::Itertools;
     use metric::U64HistogramOptions;
     use parquet_file::storage::StorageId;
@@ -972,6 +973,10 @@ mod tests {
             partition_key: partition.partition.partition_key.clone(),
         });

+        let time = SystemProvider::new();
+        let time_60_minutes_ago = time.minutes_ago(60);
+        let time_50_minutes_ago = time.minutes_ago(50);
+
         let lp = vec![
             "table,tag2=PA,tag3=15 field_int=1601i 30000",
             "table,tag2=OH,tag3=21 field_int=21i 36000",
@@ -979,7 +984,6 @@ mod tests {
         .join("\n");
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
-            .with_max_seq(20) // This should be irrelevant because this is a level 1 file
             .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
         let level_1_file = partition.create_parquet_file(builder).await.into();
@@ -991,7 +995,7 @@ mod tests {
         .join("\n");
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
-            .with_max_seq(1);
+            .with_creation_time(time_60_minutes_ago);
         let level_0_max_seq_1 = partition.create_parquet_file(builder).await.into();

         let lp = vec![
@@ -1002,7 +1006,7 @@ mod tests {
         .join("\n");
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
-            .with_max_seq(2);
+            .with_creation_time(time_50_minutes_ago);
         let level_0_max_seq_2 = partition.create_parquet_file(builder).await.into();

         let lp = vec![
@@ -1012,7 +1016,6 @@ mod tests {
         .join("\n");
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
-            .with_max_seq(5) // This should be irrelevant because this is a level 1 file
            .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
         let level_1_with_duplicates = partition.create_parquet_file(builder).await.into();

Changed file 3 of 3

@@ -106,6 +106,11 @@ impl QueryableParquetChunk {
     pub fn object_store_id(&self) -> Uuid {
         self.data.object_store_id()
     }
+
+    /// Return the creation time of the file
+    pub fn created_at(&self) -> Timestamp {
+        self.data.parquet_file().created_at
+    }
 }

 impl QueryChunkMeta for QueryableParquetChunk {
@@ -204,8 +209,8 @@ impl QueryChunk for QueryableParquetChunk {
             // Files that haven't yet been compacted to the target level were created later and
             // should be sorted based on their max sequence number.
-            (FileNonOverlapped, Initial) => ChunkOrder::new(self.max_sequence_number.get()),
-            (Final, FileNonOverlapped) => ChunkOrder::new(self.max_sequence_number.get()),
+            (FileNonOverlapped, Initial) => ChunkOrder::new(self.created_at().get()),
+            (Final, FileNonOverlapped) => ChunkOrder::new(self.created_at().get()),

             // These combinations of target compaction level and file compaction level are
             // invalid in this context given the current compaction algorithm.
@@ -229,12 +234,13 @@ mod tests {
     use super::*;
     use data_types::ColumnType;
     use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
+    use iox_time::{SystemProvider, TimeProvider};
     use parquet_file::storage::{ParquetStorage, StorageId};

     async fn test_setup(
         compaction_level: CompactionLevel,
         target_level: CompactionLevel,
-        max_sequence_number: i64,
+        created_at: iox_time::Time,
     ) -> QueryableParquetChunk {
         let catalog = TestCatalog::new();
         let ns = catalog.create_namespace_1hr_retention("ns").await;
@@ -253,7 +259,7 @@ mod tests {
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
             .with_compaction_level(compaction_level)
-            .with_max_seq(max_sequence_number);
+            .with_creation_time(created_at);
         let file = partition.create_parquet_file(builder).await;

         let parquet_file = Arc::new(file.parquet_file);
@ -278,23 +284,27 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn chunk_order_is_max_seq_when_compaction_level_0_and_target_level_1() { async fn chunk_order_is_created_at_when_compaction_level_0_and_target_level_1() {
let tp = SystemProvider::new();
let time = tp.hours_ago(1);
let chunk = test_setup( let chunk = test_setup(
CompactionLevel::Initial, CompactionLevel::Initial,
CompactionLevel::FileNonOverlapped, CompactionLevel::FileNonOverlapped,
2, time,
) )
.await; .await;
assert_eq!(chunk.order(), ChunkOrder::new(2)); assert_eq!(chunk.order(), ChunkOrder::new(time.timestamp_nanos()));
} }
#[tokio::test] #[tokio::test]
async fn chunk_order_is_0_when_compaction_level_1_and_target_level_1() { async fn chunk_order_is_0_when_compaction_level_1_and_target_level_1() {
let tp = SystemProvider::new();
let time = tp.hours_ago(1);
let chunk = test_setup( let chunk = test_setup(
CompactionLevel::FileNonOverlapped, CompactionLevel::FileNonOverlapped,
CompactionLevel::FileNonOverlapped, CompactionLevel::FileNonOverlapped,
2, time,
) )
.await; .await;
@ -302,20 +312,24 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn chunk_order_is_max_seq_when_compaction_level_1_and_target_level_2() { async fn chunk_order_is_created_at_when_compaction_level_1_and_target_level_2() {
let tp = SystemProvider::new();
let time = tp.hours_ago(1);
let chunk = test_setup( let chunk = test_setup(
CompactionLevel::FileNonOverlapped, CompactionLevel::FileNonOverlapped,
CompactionLevel::Final, CompactionLevel::Final,
2, time,
) )
.await; .await;
assert_eq!(chunk.order(), ChunkOrder::new(2)); assert_eq!(chunk.order(), ChunkOrder::new(time.timestamp_nanos()));
} }
#[tokio::test] #[tokio::test]
async fn chunk_order_is_0_when_compaction_level_2_and_target_level_2() { async fn chunk_order_is_0_when_compaction_level_2_and_target_level_2() {
let chunk = test_setup(CompactionLevel::Final, CompactionLevel::Final, 2).await; let tp = SystemProvider::new();
let time = tp.hours_ago(1);
let chunk = test_setup(CompactionLevel::Final, CompactionLevel::Final, time).await;
assert_eq!(chunk.order(), ChunkOrder::new(0)); assert_eq!(chunk.order(), ChunkOrder::new(0));
} }
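For reference, the ordering rule that the updated tests above exercise can be summarized in a small stand-alone sketch. The enum, function name, and match shape below are simplified stand-ins, not the real QueryChunk::order implementation: a file already at the target compaction level gets order 0, while a file still below the target level is ordered by its created_at nanoseconds.

// Sketch only: mirrors the behavior asserted by the tests above under assumed
// names; `chunk_order` and its parameters are illustrative, not IOx API.
#[derive(Clone, Copy, PartialEq, Eq)]
enum CompactionLevel {
    Initial,
    FileNonOverlapped,
    Final,
}

/// Simplified chunk-order rule; `None` models a combination the compactor
/// treats as invalid.
fn chunk_order(
    file_level: CompactionLevel,
    target_level: CompactionLevel,
    created_at_ns: i64,
) -> Option<i64> {
    use CompactionLevel::*;
    match (file_level, target_level) {
        // Already at the target level: sorts before everything else.
        (FileNonOverlapped, FileNonOverlapped) | (Final, Final) => Some(0),
        // Not yet at the target level: order by creation time.
        (Initial, FileNonOverlapped) | (FileNonOverlapped, Final) => Some(created_at_ns),
        _ => None,
    }
}

fn main() {
    // Level-0 file being compacted to level 1: ordered by created_at.
    assert_eq!(
        chunk_order(CompactionLevel::Initial, CompactionLevel::FileNonOverlapped, 1_000),
        Some(1_000)
    );
    // File already at the target level: order 0.
    assert_eq!(
        chunk_order(CompactionLevel::Final, CompactionLevel::Final, 1_000),
        Some(0)
    );
}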