From e0b1e81fc90db9b22bd8b91c889d37bbcfcf75ba Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Thu, 29 May 2025 13:52:33 -0400 Subject: [PATCH] test: deduplication across memory and parquet chunks --- influxdb3_server/tests/lib.rs | 70 +++++++++++++++++++ ..._write_buffer_both_memory_and_parquet.snap | 18 +++++ 2 files changed, 88 insertions(+) create mode 100644 influxdb3_server/tests/snapshots/lib__deduplicate_rows_in_write_buffer_both_memory_and_parquet.snap diff --git a/influxdb3_server/tests/lib.rs b/influxdb3_server/tests/lib.rs index 6cf0d990bb..6576d5ee34 100644 --- a/influxdb3_server/tests/lib.rs +++ b/influxdb3_server/tests/lib.rs @@ -124,6 +124,76 @@ async fn test_deduplicate_rows_in_write_buffer_parquet() { insta::assert_snapshot!(plan); } +#[test_log::test(tokio::test)] +async fn test_deduplicate_rows_in_write_buffer_both_memory_and_parquet() { + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + // uses a small snapshot size to get parquet persisted early, and therefore have queries + // serve chunks from both in-memory buffer and persisted parquet + snapshot_size: 1, + }; + let service = TestService::setup(wal_config).await; + + service + .do_writes( + "foo", + [ + TestWrite::lp("bar value=false", 1), + TestWrite::lp("bar value=false", 2), + TestWrite::lp("bar value=false", 3), + ], + ) + .await; + + service.wait_for_snapshot_sequence(1).await; + + let batches = service.query_sql("foo", "select * from bar").await; + assert_batches_sorted_eq!( + [ + "+---------------------+-------+", + "| time | value |", + "+---------------------+-------+", + "| 1970-01-01T00:00:01 | false |", // note that this is `false` + "| 1970-01-01T00:00:02 | false |", + "| 1970-01-01T00:00:03 | false |", + "+---------------------+-------+", + ], + &batches + ); + + // write a duplicate row, but don't trigger a snapshot, so subsequent query will be served + // by chunks in both memory buffer and persisted parquet; this also flips the `value` to `true` + // to show that the most recent written line appears in the query result: + service + .do_writes("foo", [TestWrite::lp("bar value=true", 1)]) + .await; + + let batches = service.query_sql("foo", "select * from bar").await; + assert_batches_sorted_eq!( + [ + "+---------------------+-------+", + "| time | value |", + "+---------------------+-------+", + "| 1970-01-01T00:00:01 | true |", // note that this is now `true` + "| 1970-01-01T00:00:02 | false |", + "| 1970-01-01T00:00:03 | false |", + "+---------------------+-------+", + ], + &batches + ); + + let batches = service.query_sql("foo", "explain select * from bar").await; + // There should be a union of a ParquetExec and RecordBatchesExec that feeds into a + // DeduplicateExec in the query plan, i.e., there is both in-memory buffer chunks and persisted + // parquet chunks: + let plan = arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string(); + insta::assert_snapshot!(plan); +} + struct TestService { query_executor: Arc, write_buffer: Arc, diff --git a/influxdb3_server/tests/snapshots/lib__deduplicate_rows_in_write_buffer_both_memory_and_parquet.snap b/influxdb3_server/tests/snapshots/lib__deduplicate_rows_in_write_buffer_both_memory_and_parquet.snap new file mode 100644 index 0000000000..fb52063984 --- /dev/null +++ b/influxdb3_server/tests/snapshots/lib__deduplicate_rows_in_write_buffer_both_memory_and_parquet.snap @@ -0,0 +1,18 @@ +--- +source: influxdb3_server/tests/lib.rs +expression: plan +--- ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | TableScan: bar projection=[time, value] | +| physical_plan | ProjectionExec: expr=[time@0 as time, value@1 as value] | +| | DeduplicateExec: [time@0 ASC] | +| | SortPreservingMergeExec: [time@0 ASC, __chunk_order@2 ASC] | +| | UnionExec | +| | SortExec: expr=[time@0 ASC, __chunk_order@2 ASC], preserve_partitioning=[false] | +| | RecordBatchesExec: chunks=1, projection=[time, value, __chunk_order] | +| | SortExec: expr=[time@0 ASC, __chunk_order@2 ASC], preserve_partitioning=[false] | +| | ParquetExec: file_groups={1 group: [[test-node/dbs/foo-1/bar-0/1970-01-01/00-00/0000000003.parquet]]}, projection=[time, value, __chunk_order], output_ordering=[__chunk_order@2 ASC] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+