fix: let us not compact no-data (#4744)

* fix: let us not compact no-data

* fix: split time must be greater than min_time, too

* fix: resolve merge conflict

Co-authored-by: Dom <dom@itsallbroken.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Nga Tran 2022-05-31 13:02:14 -04:00 committed by GitHub
parent 51fd20c769
commit dfd35c05a1
1 changed file with 102 additions and 9 deletions


@@ -725,15 +725,26 @@ impl Compactor {
        // Identify split time
        let split_time = self.compute_split_time(min_time, max_time);

        // Build compact & split query plan
        let plan = ReorgPlanner::new()
            .split_plan(
                Arc::clone(&merged_schema),
                query_chunks,
                sort_key.clone(),
                split_time,
            )
            .context(CompactLogicalPlanSnafu)?;

        // Build compact logical plan
        let plan = {
            // split the data to compact into 2 files
            if min_time < split_time && split_time < max_time {
                // split compact query plan
                ReorgPlanner::new()
                    .split_plan(
                        Arc::clone(&merged_schema),
                        query_chunks,
                        sort_key.clone(),
                        split_time,
                    )
                    .context(CompactLogicalPlanSnafu)?
            } else {
                // compact everything into one file
                ReorgPlanner::new()
                    .compact_plan(Arc::clone(&merged_schema), query_chunks, sort_key.clone())
                    .context(CompactLogicalPlanSnafu)?
            }
        };

        let ctx = self.exec.new_context(ExecutorType::Reorg);
        let physical_plan = ctx
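
For context, the guard above exists because compute_split_time can return a value equal to min_time or max_time (for example when all rows share a single timestamp, or when the configured split_percentage is 100), in which case splitting would produce an empty, no-data output file. Below is a minimal sketch of that relationship, assuming compute_split_time scales linearly by split_percentage; its actual body is not part of this diff, so the formula and parameter names are assumptions.

    // Hypothetical standalone version of the split-time computation; the
    // linear-scaling formula is an assumption, not taken from this diff.
    fn compute_split_time(min_time: i64, max_time: i64, split_percentage: i64) -> i64 {
        min_time + (max_time - min_time) * split_percentage / 100
    }

    fn main() {
        // split_percentage = 100 => split_time == max_time, so the guard
        // `min_time < split_time && split_time < max_time` fails and the
        // compactor falls back to compact_plan (one output file).
        assert_eq!(compute_split_time(8000, 20000, 100), 20000);

        // A percentage strictly between 0 and 100 yields a split point inside
        // (min_time, max_time), so split_plan produces two output files.
        assert_eq!(compute_split_time(8000, 20000, 50), 14000);

        // Degenerate case: all rows share one timestamp, so no valid split exists.
        assert_eq!(compute_split_time(8000, 8000, 100), 8000);
    }
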
@@ -1598,6 +1609,88 @@ mod tests {
        );
    }

    #[tokio::test]
    async fn test_compact_one_file_no_split() {
        let catalog = TestCatalog::new();

        let lp = vec![
            "table,tag1=WA field_int=1000 8000",
            "table,tag1=VT field_int=10 10000",
            "table,tag1=UT field_int=70 20000",
        ]
        .join("\n");

        let ns = catalog.create_namespace("ns").await;
        let sequencer = ns.create_sequencer(1).await;
        let table = ns.create_table("table").await;
        let partition = table
            .with_sequencer(&sequencer)
            .create_partition("part")
            .await;
        let parquet_file = partition
            .create_parquet_file_with_min_max(&lp, 1, 1, 8000, 20000)
            .await
            .parquet_file;

        let compactor = Compactor::new(
            vec![sequencer.sequencer.id],
            Arc::clone(&catalog.catalog),
            ParquetStorage::new(Arc::clone(&catalog.object_store)),
            Arc::new(Executor::new(1)),
            Arc::new(SystemProvider::new()),
            BackoffConfig::default(),
            // split_percentage = 100 which means no split
            CompactorConfig::new(100, 100000, 100000, 10),
            Arc::new(metric::Registry::new()),
        );

        let sort_key = SortKey::from_columns(["tag1", "time"]);
        let partition = partition.update_sort_key(sort_key).await;

        // ------------------------------------------------
        // Let's add a tombstone
        let tombstone = table
            .with_sequencer(&sequencer)
            .create_tombstone(20, 6000, 12000, "tag1=VT")
            .await;
        let pf = ParquetFileWithTombstone {
            data: Arc::new(parquet_file),
            tombstones: vec![tombstone.tombstone.clone()],
        };

        // should have compacted data
        let batches = compactor
            .compact(vec![pf], &partition.partition)
            .await
            .unwrap();
        // 1 output set because split_percentage = 100%
        assert_eq!(batches.len(), 1);

        // Collect the results for inspection.
        let batches = batches
            .into_iter()
            .map(|v| async {
                datafusion::physical_plan::common::collect(v.data)
                    .await
                    .expect("failed to collect record batches")
            })
            .collect::<FuturesOrdered<_>>()
            .collect::<Vec<_>>()
            .await;

        // Data: row tag1=VT was removed
        assert_batches_sorted_eq!(
            &[
                "+-----------+------+-----------------------------+",
                "| field_int | tag1 | time                        |",
                "+-----------+------+-----------------------------+",
                "| 1000      | WA   | 1970-01-01T00:00:00.000008Z |",
                "| 70        | UT   | 1970-01-01T00:00:00.000020Z |",
                "+-----------+------+-----------------------------+",
            ],
            &batches[0]
        );
    }
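
A complementary case not exercised by the test above: the same guard also prevents splitting when every row shares one timestamp (min_time == max_time), which is the "no-data" situation from the commit title. Here is a minimal sketch of that decision, using a hypothetical would_split helper that mirrors the condition in Compactor::compact; the helper name and test are illustrative, not part of this change.

    // Hypothetical helper mirroring the `min_time < split_time && split_time < max_time`
    // guard in `Compactor::compact`; the name is illustrative, not from the codebase.
    fn would_split(min_time: i64, max_time: i64, split_time: i64) -> bool {
        min_time < split_time && split_time < max_time
    }

    #[test]
    fn would_split_guard_examples() {
        // The scenario above: min=8000, max=20000 with split_percentage = 100
        // puts the split point at max_time, so one output file is written.
        assert!(!would_split(8000, 20000, 20000));

        // All rows at a single timestamp: no split point can fall strictly
        // inside (min_time, max_time), so everything goes into one file.
        assert!(!would_split(8000, 8000, 8000));

        // A split point strictly inside the range produces two output files.
        assert!(would_split(8000, 20000, 14000));
    }
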
    #[tokio::test]
    async fn test_compact_two_files() {
        let catalog = TestCatalog::new();