test: Add more cases for cold compaction
parent
7cd78a3020
commit
723aedfbca
|
@ -577,7 +577,7 @@ mod tests {
|
|||
assert_eq!(candidates.len(), 1);
|
||||
let c = Arc::new(candidates.pop_front().unwrap());
|
||||
|
||||
compact_remaining_level_0_files(&compactor, c, &size_overrides)
|
||||
compact_remaining_level_0_files(&compactor, Arc::clone(&c), &size_overrides)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -631,6 +631,119 @@ mod tests {
|
|||
],
|
||||
&batches
|
||||
);
|
||||
|
||||
// Full compaction will now combine the two level 1 files into one level 2 file
|
||||
full_compaction(&compactor, c, &size_overrides)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
assert_eq!(files.len(), 1);
|
||||
let file = files.pop().unwrap();
|
||||
assert_eq!(file.id.get(), 3);
|
||||
assert_eq!(file.compaction_level, CompactionLevel::Final);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Verify the parquet file content
|
||||
|
||||
let batches = table.read_parquet_file(file).await;
|
||||
assert_batches_sorted_eq!(
|
||||
&[
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
"| field_int | tag1 | tag2 | tag3 | time |",
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| 421 | | OH | 21 | 1970-01-01T00:00:00.000091Z |",
|
||||
"| 81601 | | PA | 15 | 1970-01-01T00:00:00.000090Z |",
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
],
|
||||
&batches
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_full_cold_compaction_upgrades_one_level_0() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let catalog = TestCatalog::new();
|
||||
|
||||
// Create one cold level 0 file that will get upgraded to level 1 then upgraded to level 2
|
||||
let lp1 = vec![
|
||||
"table,tag1=WA field_int=1000i 10",
|
||||
"table,tag1=VT field_int=10i 20",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
let ns = catalog.create_namespace("ns").await;
|
||||
let shard = ns.create_shard(1).await;
|
||||
let table = ns.create_table("table").await;
|
||||
table.create_column("field_int", ColumnType::I64).await;
|
||||
table.create_column("tag1", ColumnType::Tag).await;
|
||||
table.create_column("time", ColumnType::Time).await;
|
||||
let partition = table.with_shard(&shard).create_partition("part").await;
|
||||
let time = Arc::new(SystemProvider::new());
|
||||
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
|
||||
let config = make_compactor_config();
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
let compactor = Compactor::new(
|
||||
vec![shard.shard.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
ParquetStorage::new(Arc::clone(&catalog.object_store)),
|
||||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
config,
|
||||
Arc::clone(&metrics),
|
||||
);
|
||||
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(&lp1)
|
||||
.with_creation_time(time_38_hour_ago);
|
||||
partition.create_parquet_file(builder).await;
|
||||
|
||||
// should have 1 level-0 file before compacting
|
||||
let count = catalog.count_level_0_files(shard.shard.id).await;
|
||||
assert_eq!(count, 1);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Compact
|
||||
let candidates = compactor
|
||||
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
|
||||
|
||||
assert_eq!(candidates.len(), 1);
|
||||
let c = Arc::new(candidates.pop_front().unwrap());
|
||||
|
||||
compact_cold_partition(&compactor, c, &Default::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Should have 1 non-soft-deleted files:
|
||||
//
|
||||
// - the newly created file that was upgraded to level 1 then to level 2
|
||||
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
assert_eq!(files.len(), 1);
|
||||
let file = files.pop().unwrap();
|
||||
assert_eq!(file.id.get(), 1); // ID doesn't change because the file doesn't get rewritten
|
||||
assert_eq!(file.compaction_level, CompactionLevel::Final);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Verify the parquet file content
|
||||
|
||||
let batches = table.read_parquet_file(file).await;
|
||||
assert_batches_sorted_eq!(
|
||||
&[
|
||||
"+-----------+------+--------------------------------+",
|
||||
"| field_int | tag1 | time |",
|
||||
"+-----------+------+--------------------------------+",
|
||||
"| 10 | VT | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| 1000 | WA | 1970-01-01T00:00:00.000000010Z |",
|
||||
"+-----------+------+--------------------------------+",
|
||||
],
|
||||
&batches
|
||||
);
|
||||
}
|
||||
|
||||
fn make_compactor_config() -> CompactorConfig {
|
||||
|
|
Loading…
Reference in New Issue