From e7038b892b800bfbb3887d7983faf8454e49c787 Mon Sep 17 00:00:00 2001
From: Trevor Hilton
Date: Mon, 13 Jan 2025 14:26:49 -0500
Subject: [PATCH] test: add reproducer that shows group by behaviour

---
 influxdb3_server/src/query_executor/mod.rs | 116 ++++++++++++++++++++-
 1 file changed, 115 insertions(+), 1 deletion(-)

diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs
index 493350431e..9aabd20a05 100644
--- a/influxdb3_server/src/query_executor/mod.rs
+++ b/influxdb3_server/src/query_executor/mod.rs
@@ -675,7 +675,7 @@ mod tests {
     use crate::query_executor::QueryExecutorImpl;
     use arrow::array::RecordBatch;
     use data_types::NamespaceName;
-    use datafusion::assert_batches_sorted_eq;
+    use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
     use futures::TryStreamExt;
     use influxdb3_cache::{
         distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider,
@@ -902,4 +902,118 @@ mod tests {
             assert_batches_sorted_eq!(t.expected, &batches);
         }
     }
+
+    #[test_log::test(tokio::test)]
+    async fn influxql_group_by_stream() {
+        let (write_buffer, query_executor, _, _) = setup().await;
+
+        // do some writes that have three tag columns, and various combinations thereof, so we can
+        // test the group by behaviour:
+        write_buffer
+            .write_lp(
+                NamespaceName::new("foo").unwrap(),
+                "\
+                bar,t1=a,t2=aa,t3=aaa combo=1 1\n\
+                bar,t1=b,t2=aa,t3=aaa combo=2 1\n\
+                bar,t1=a,t2=bb,t3=aaa combo=3 1\n\
+                bar,t1=b,t2=bb,t3=aaa combo=4 1\n\
+                bar,t1=a,t2=aa,t3=bbb combo=5 1\n\
+                bar,t1=b,t2=aa,t3=bbb combo=6 1\n\
+                bar,t1=a,t2=bb,t3=bbb combo=7 1\n\
+                bar,t1=b,t2=bb,t3=bbb combo=8 1\n\
+                bar,t1=a,t2=aa,t3=aaa combo=1 2\n\
+                bar,t1=b,t2=aa,t3=aaa combo=2 2\n\
+                bar,t1=a,t2=bb,t3=aaa combo=3 2\n\
+                bar,t1=b,t2=bb,t3=aaa combo=4 2\n\
+                bar,t1=a,t2=aa,t3=bbb combo=5 2\n\
+                bar,t1=b,t2=aa,t3=bbb combo=6 2\n\
+                bar,t1=a,t2=bb,t3=bbb combo=7 2\n\
+                bar,t1=b,t2=bb,t3=bbb combo=8 2\n\
+                ",
+                Time::from_timestamp_nanos(0),
+                false,
+                influxdb3_write::Precision::Second,
+            )
+            .await
+            .unwrap();
+
+        /// A test case that asserts that 1 or more queries produce the same output
+        struct TestCase<'a> {
+            /// Queries whose output should be the same
+            queries: &'a [&'a str],
+            /// The expected number of RecordBatches that will be produced in the stream
+            expected_num_batches_in_stream: usize,
+            /// The expected query result set
+            expected_result: &'a [&'a str],
+        }
+
+        let test_cases = [
+            TestCase {
+                queries: &[
+                    // this one produces the expected output, including all tag columns in the
+                    // query result:
+                    "select * from bar group by t1",
+                    // this one does not, because the t2 and t3 columns are projected out of the
+                    // result due to using the regex:
+                    "select * from bar group by /t[1]/",
+                ],
+                expected_num_batches_in_stream: 1,
+                expected_result: &[
+                    "+------------------+---------------------+----+-------+----+-----+",
+                    "| iox::measurement | time                | t1 | combo | t2 | t3  |",
+                    "+------------------+---------------------+----+-------+----+-----+",
+                    "| bar              | 1970-01-01T00:00:01 | a  | 1.0   | aa | aaa |",
+                    "| bar              | 1970-01-01T00:00:01 | a  | 5.0   | aa | bbb |",
+                    "| bar              | 1970-01-01T00:00:01 | a  | 3.0   | bb | aaa |",
+                    "| bar              | 1970-01-01T00:00:01 | a  | 7.0   | bb | bbb |",
+                    "| bar              | 1970-01-01T00:00:02 | a  | 1.0   | aa | aaa |",
+                    "| bar              | 1970-01-01T00:00:02 | a  | 5.0   | aa | bbb |",
+                    "| bar              | 1970-01-01T00:00:02 | a  | 3.0   | bb | aaa |",
+                    "| bar              | 1970-01-01T00:00:02 | a  | 7.0   | bb | bbb |",
+                    "| bar              | 1970-01-01T00:00:01 | b  | 2.0   | aa | aaa |",
+                    "| bar              | 1970-01-01T00:00:01 | b  | 6.0   | aa | bbb |",
+                    "| bar              | 1970-01-01T00:00:01 | b  | 4.0   | bb | aaa |",
+                    "| bar              | 1970-01-01T00:00:01 | b  | 8.0   | bb | bbb |",
+                    "| bar              | 1970-01-01T00:00:02 | b  | 2.0   | aa | aaa |",
+                    "| bar              | 1970-01-01T00:00:02 | b  | 6.0   | aa | bbb |",
+                    "| bar              | 1970-01-01T00:00:02 | b  | 4.0   | bb | aaa |",
+                    "| bar              | 1970-01-01T00:00:02 | b  | 8.0   | bb | bbb |",
+                    "+------------------+---------------------+----+-------+----+-----+",
+                ],
+            },
+            // TODO: add expectations to these; for now they don't run because the above fails
+            TestCase {
+                queries: &[
+                    "select * from bar group by t1, t2",
+                    "select * from bar group by /t[1,2]/",
+                ],
+                expected_num_batches_in_stream: 1,
+                expected_result: &[],
+            },
+            TestCase {
+                queries: &[
+                    "select * from bar group by t1, t2, t3",
+                    "select * from bar group by /t/",
+                    "select * from bar group by /t[1, 2, 3]/",
+                ],
+                expected_num_batches_in_stream: 1,
+                expected_result: &[],
+            },
+        ];
+
+        for t in test_cases {
+            for q in t.queries {
+                // run the query and collect every record batch emitted by the result stream
+                let stream = query_executor
+                    .query("foo", q, None, QueryKind::InfluxQl, None, None)
+                    .await
+                    .unwrap();
+
+                let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+                assert_eq!(t.expected_num_batches_in_stream, batches.len());
+                println!("asserting result for query string: {q}");
+                assert_batches_eq!(t.expected_result, &batches);
+            }
+        }
+    }
 }