test: deduplication across memory and parquet chunks

tjh/duplicate-in-mem-and-parquet
Trevor Hilton 2025-05-29 13:52:33 -04:00
parent 756a50daa6
commit e0b1e81fc9
2 changed files with 88 additions and 0 deletions

View File

@ -124,6 +124,76 @@ async fn test_deduplicate_rows_in_write_buffer_parquet() {
insta::assert_snapshot!(plan);
}
// Regression test: duplicate rows (same series + timestamp) must be
// deduplicated even when a query is served by a mix of chunks from the
// in-memory write buffer AND already-persisted parquet files, with the
// most recently written value winning.
#[test_log::test(tokio::test)]
async fn test_deduplicate_rows_in_write_buffer_both_memory_and_parquet() {
    let wal_config = WalConfig {
        gen1_duration: Gen1Duration::new_1m(),
        max_write_buffer_size: 100,
        flush_interval: Duration::from_millis(10),
        // uses a small snapshot size to get parquet persisted early, and therefore have queries
        // serve chunks from both in-memory buffer and persisted parquet
        snapshot_size: 1,
    };
    let service = TestService::setup(wal_config).await;
    // Seed three distinct rows, one second apart, all with value=false:
    service
        .do_writes(
            "foo",
            [
                TestWrite::lp("bar value=false", 1),
                TestWrite::lp("bar value=false", 2),
                TestWrite::lp("bar value=false", 3),
            ],
        )
        .await;
    // Wait for at least one snapshot so the seed rows are persisted to parquet
    // before the follow-up duplicate write lands in the in-memory buffer:
    service.wait_for_snapshot_sequence(1).await;
    let batches = service.query_sql("foo", "select * from bar").await;
    assert_batches_sorted_eq!(
        [
            "+---------------------+-------+",
            "| time                | value |",
            "+---------------------+-------+",
            "| 1970-01-01T00:00:01 | false |", // note that this is `false`
            "| 1970-01-01T00:00:02 | false |",
            "| 1970-01-01T00:00:03 | false |",
            "+---------------------+-------+",
        ],
        &batches
    );
    // write a duplicate row, but don't trigger a snapshot, so subsequent query will be served
    // by chunks in both memory buffer and persisted parquet; this also flips the `value` to `true`
    // to show that the most recent written line appears in the query result:
    service
        .do_writes("foo", [TestWrite::lp("bar value=true", 1)])
        .await;
    let batches = service.query_sql("foo", "select * from bar").await;
    assert_batches_sorted_eq!(
        [
            "+---------------------+-------+",
            "| time                | value |",
            "+---------------------+-------+",
            "| 1970-01-01T00:00:01 | true  |", // note that this is now `true`
            "| 1970-01-01T00:00:02 | false |",
            "| 1970-01-01T00:00:03 | false |",
            "+---------------------+-------+",
        ],
        &batches
    );
    let batches = service.query_sql("foo", "explain select * from bar").await;
    // There should be a union of a ParquetExec and RecordBatchesExec that feeds into a
    // DeduplicateExec in the query plan, i.e., there are both in-memory buffer chunks and
    // persisted parquet chunks:
    let plan = arrow::util::pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string();
    insta::assert_snapshot!(plan);
}
struct TestService {
query_executor: Arc<dyn QueryExecutor>,
write_buffer: Arc<dyn WriteBuffer>,

View File

@ -0,0 +1,18 @@
---
source: influxdb3_server/tests/lib.rs
expression: plan
---
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | TableScan: bar projection=[time, value] |
| physical_plan | ProjectionExec: expr=[time@0 as time, value@1 as value] |
| | DeduplicateExec: [time@0 ASC] |
| | SortPreservingMergeExec: [time@0 ASC, __chunk_order@2 ASC] |
| | UnionExec |
| | SortExec: expr=[time@0 ASC, __chunk_order@2 ASC], preserve_partitioning=[false] |
| | RecordBatchesExec: chunks=1, projection=[time, value, __chunk_order] |
| | SortExec: expr=[time@0 ASC, __chunk_order@2 ASC], preserve_partitioning=[false] |
| | ParquetExec: file_groups={1 group: [[test-node/dbs/foo-1/bar-0/1970-01-01/00-00/0000000003.parquet]]}, projection=[time, value, __chunk_order], output_ordering=[__chunk_order@2 ASC] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+