From f97e1765d8ef351a35d8359e58583573745a1493 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 11 Apr 2023 16:35:29 +0200 Subject: [PATCH] fix: do not loose schema for empty query responses (#7506) Within our query tests and our CLI, we've used to print out empty query responses as: ```text ++ ++ ``` This is pretty misleading. Why are there no columns?! The reason is that while Flight provides us with schema information, we often have zero record batches (because why would the querier send an empty batch). Now lets fix this by creating an empty batch on the client side based on the schema data we've received. This way, people know that there are columns but no rows: ```text +-------+--------+------+------+ | count | system | time | town | +-------+--------+------+------+ +-------+--------+------+------+ ``` An alternative fix would be to pass the schema in addition to `Vec` to the formatting code, but that seemed to be more effort. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- influxdb_iox/src/commands/query.rs | 15 ++++++- .../tests/end_to_end_cases/all_in_one.rs | 6 ++- .../cases/in/issue_6112.influxql.expected | 39 +++++++++++----- .../cases/in/pushdown.sql.expected | 6 ++- test_helpers_end_to_end/src/client.rs | 44 +++++++++++++------ .../src/snapshot_comparison.rs | 6 ++- test_helpers_end_to_end/src/steps.rs | 14 +++--- 7 files changed, 91 insertions(+), 39 deletions(-) diff --git a/influxdb_iox/src/commands/query.rs b/influxdb_iox/src/commands/query.rs index 48c9e19ba7..3c081cad70 100644 --- a/influxdb_iox/src/commands/query.rs +++ b/influxdb_iox/src/commands/query.rs @@ -1,3 +1,4 @@ +use arrow::record_batch::RecordBatch; use clap::ValueEnum; use futures::TryStreamExt; use influxdb_iox_client::format::influxql::write_columnar; @@ -83,14 +84,24 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { query_lang, } = config; - let query_results = match query_lang { + let mut query_results = match query_lang { QueryLanguage::Sql => client.sql(namespace, query).await, QueryLanguage::InfluxQL => client.influxql(namespace, query).await, }?; // It might be nice to do some sort of streaming write // rather than buffering the whole thing. - let batches: Vec<_> = query_results.try_collect().await?; + let mut batches: Vec<_> = (&mut query_results).try_collect().await?; + + // read schema AFTER collection, otherwise the stream does not have the schema data yet + let schema = query_results + .inner() + .schema() + .cloned() + .ok_or(influxdb_iox_client::flight::Error::NoSchema)?; + + // preserve schema so we print table headers even for empty results + batches.push(RecordBatch::new_empty(schema)); match (query_lang, &format) { (QueryLanguage::InfluxQL, OutputFormat::Pretty) => { diff --git a/influxdb_iox/tests/end_to_end_cases/all_in_one.rs b/influxdb_iox/tests/end_to_end_cases/all_in_one.rs index a5842f4be8..1e53b98bb2 100644 --- a/influxdb_iox/tests/end_to_end_cases/all_in_one.rs +++ b/influxdb_iox/tests/end_to_end_cases/all_in_one.rs @@ -33,7 +33,8 @@ async fn smoke() { // run query let sql = format!("select * from {table_name}"); - let batches = run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await; + let (batches, _schema) = + run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await; let expected = [ "+------+------+--------------------------------+-----+", @@ -79,7 +80,8 @@ async fn ephemeral_mode() { // run query // do not select time becasue it changes every time let sql = format!("select tag1, tag2, val from {table_name}"); - let batches = run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await; + let (batches, _schema) = + run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await; let expected = [ "+------+------+-----+", diff --git a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected index ea5fb5b822..5395a75c53 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected +++ b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected @@ -15,6 +15,7 @@ -- InfluxQL: SELECT * FROM non_existent; ++ ++ +++ -- InfluxQL: SELECT *::tag, f64 FROM m0; -- Results After Sorting +------------------+----------------------+-------+-------+------+ @@ -359,8 +360,10 @@ Error while planning query: Error during planning: invalid number of arguments f | m0 | 2022-10-31T02:00:10Z | 21.2 | +------------------+----------------------+------+ -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND str = 1; -++ -++ ++------------------+------+-----+ +| iox::measurement | time | f64 | ++------------------+------+-----+ ++------------------+------+-----+ -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19 + 0.5 OR non_existent = 1; +------------------+----------------------+------+ | iox::measurement | time | f64 | @@ -368,11 +371,15 @@ Error while planning query: Error during planning: invalid number of arguments f | m0 | 2022-10-31T02:00:10Z | 21.2 | +------------------+----------------------+------+ -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND non_existent = 1; -++ -++ ++------------------+------+-----+ +| iox::measurement | time | f64 | ++------------------+------+-----+ ++------------------+------+-----+ -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND f64 =~ /foo/; -++ -++ ++------------------+------+-----+ +| iox::measurement | time | f64 | ++------------------+------+-----+ ++------------------+------+-----+ -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 OR f64 =~ /foo/; +------------------+----------------------+------+ | iox::measurement | time | f64 | @@ -667,6 +674,7 @@ Error while planning query: Error during planning: invalid number of arguments f -- InfluxQL: SELECT MEAN(foo) FROM cpu; ++ ++ +++ -- InfluxQL: SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu GROUP BY cpu; +------------------+----------------------+-----------+--------------------------+ | iox::measurement | time | cpu | mean_usage_idle_mean_foo | @@ -686,6 +694,7 @@ Error while planning query: Error during planning: invalid number of arguments f -- InfluxQL: SELECT MEAN(foo) FROM cpu GROUP BY cpu; ++ ++ +++ -- InfluxQL: SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none); +------------------+----------------------+-------+------+ | iox::measurement | time | count | sum | @@ -1079,8 +1088,10 @@ Error while planning query: Error during planning: mixing aggregate and non-aggr | disk | 2022-10-31T02:01:30Z | | disk1s5 | | 3 | +------------------+----------------------+-----------+---------+-------+---------+ -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= now() - 2m GROUP BY TIME(30s) FILL(null); -++ -++ ++------------------+------+-------+ +| iox::measurement | time | count | ++------------------+------+-------+ ++------------------+------+-------+ -- InfluxQL: SELECT f64 FROM m0 WHERE tag0 = 'val00' LIMIT 3; +------------------+----------------------+------+ | iox::measurement | time | f64 | @@ -1240,8 +1251,10 @@ Error while planning query: Error during planning: mixing aggregate and non-aggr | cpu | 1970-01-01T00:00:00Z | cpu1 | 2 | +------------------+----------------------+-----------+-------+ -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1; -++ -++ ++------------------+------+-----+-------+ +| iox::measurement | time | cpu | count | ++------------------+------+-----+-------+ ++------------------+------+-----+-------+ -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1; +------------------+----------------------+-----------+-------+ | iox::measurement | time | cpu | count | @@ -1251,8 +1264,10 @@ Error while planning query: Error during planning: mixing aggregate and non-aggr | cpu | 1970-01-01T00:00:00Z | cpu1 | 2 | +------------------+----------------------+-----------+-------+ -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1; -++ -++ ++------------------+------+-----+-------+ +| iox::measurement | time | cpu | count | ++------------------+------+-----+-------+ ++------------------+------+-----+-------+ -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:05:00Z' GROUP BY TIME(30s) LIMIT 2; +------------------+----------------------+-------+ | iox::measurement | time | count | diff --git a/influxdb_iox/tests/query_tests2/cases/in/pushdown.sql.expected b/influxdb_iox/tests/query_tests2/cases/in/pushdown.sql.expected index 0bcc725724..3432920092 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/pushdown.sql.expected +++ b/influxdb_iox/tests/query_tests2/cases/in/pushdown.sql.expected @@ -233,8 +233,10 @@ ---------- -- SQL: SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00'); -- Results After Sorting -++ -++ ++-------+--------+------+------+ +| count | system | time | town | ++-------+--------+------+------+ ++-------+--------+------+------+ -- SQL: EXPLAIN SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00'); -- Results After Normalizing UUIDs ---------- diff --git a/test_helpers_end_to_end/src/client.rs b/test_helpers_end_to_end/src/client.rs index d431b4fdcc..80f404499f 100644 --- a/test_helpers_end_to_end/src/client.rs +++ b/test_helpers_end_to_end/src/client.rs @@ -1,5 +1,5 @@ //! Client helpers for writing end to end ng tests -use arrow::record_batch::RecordBatch; +use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use data_types::{NamespaceId, TableId}; use dml::{DmlMeta, DmlWrite}; use futures::TryStreamExt; @@ -84,7 +84,7 @@ pub async fn try_run_sql( namespace: impl Into, querier_connection: Connection, authorization: Option<&str>, -) -> Result, influxdb_iox_client::flight::Error> { +) -> Result<(Vec, SchemaRef), influxdb_iox_client::flight::Error> { let mut client = influxdb_iox_client::flight::Client::new(querier_connection); if let Some(authorization) = authorization { client.add_header("authorization", authorization).unwrap(); @@ -94,11 +94,18 @@ pub async fn try_run_sql( // Normally this would be done one per connection, not per query client.handshake().await?; - client - .sql(namespace.into(), sql_query.into()) - .await? - .try_collect() - .await + let mut stream = client.sql(namespace.into(), sql_query.into()).await?; + + let batches = (&mut stream).try_collect().await?; + + // read schema AFTER collection, otherwise the stream does not have the schema data yet + let schema = stream + .inner() + .schema() + .cloned() + .ok_or(influxdb_iox_client::flight::Error::NoSchema)?; + + Ok((batches, schema)) } /// Runs a InfluxQL query using the flight API on the specified connection. @@ -107,7 +114,7 @@ pub async fn try_run_influxql( namespace: impl Into, querier_connection: Connection, authorization: Option<&str>, -) -> Result, influxdb_iox_client::flight::Error> { +) -> Result<(Vec, SchemaRef), influxdb_iox_client::flight::Error> { let mut client = influxdb_iox_client::flight::Client::new(querier_connection); if let Some(authorization) = authorization { client.add_header("authorization", authorization).unwrap(); @@ -117,11 +124,20 @@ pub async fn try_run_influxql( // Normally this would be done one per connection, not per query client.handshake().await?; - client + let mut stream = client .influxql(namespace.into(), influxql_query.into()) - .await? - .try_collect() - .await + .await?; + + let batches = (&mut stream).try_collect().await?; + + // read schema AFTER collection, otherwise the stream does not have the schema data yet + let schema = stream + .inner() + .schema() + .cloned() + .ok_or(influxdb_iox_client::flight::Error::NoSchema)?; + + Ok((batches, schema)) } /// Runs a SQL query using the flight API on the specified connection. @@ -132,7 +148,7 @@ pub async fn run_sql( namespace: impl Into, querier_connection: Connection, authorization: Option<&str>, -) -> Vec { +) -> (Vec, SchemaRef) { try_run_sql(sql, namespace, querier_connection, authorization) .await .expect("Error executing sql query") @@ -146,7 +162,7 @@ pub async fn run_influxql( namespace: impl Into, querier_connection: Connection, authorization: Option<&str>, -) -> Vec { +) -> (Vec, SchemaRef) { try_run_influxql( influxql.clone(), namespace, diff --git a/test_helpers_end_to_end/src/snapshot_comparison.rs b/test_helpers_end_to_end/src/snapshot_comparison.rs index e18524bfdd..08fa5191c2 100644 --- a/test_helpers_end_to_end/src/snapshot_comparison.rs +++ b/test_helpers_end_to_end/src/snapshot_comparison.rs @@ -1,6 +1,7 @@ mod queries; use crate::{run_sql, snapshot_comparison::queries::TestQueries, try_run_influxql, MiniCluster}; +use arrow::record_batch::RecordBatch; use arrow_flight::error::FlightError; use snafu::{OptionExt, ResultExt, Snafu}; use std::{ @@ -219,7 +220,7 @@ async fn run_query( ) -> Result> { let query_text = query.text(); - let results = match language { + let (mut batches, schema) = match language { Language::Sql => { run_sql( query_text, @@ -248,6 +249,7 @@ async fn run_query( } } }; + batches.push(RecordBatch::new_empty(schema)); - Ok(query.normalize_results(results)) + Ok(query.normalize_results(batches)) } diff --git a/test_helpers_end_to_end/src/steps.rs b/test_helpers_end_to_end/src/steps.rs index 83fee5ae39..3a1d56952e 100644 --- a/test_helpers_end_to_end/src/steps.rs +++ b/test_helpers_end_to_end/src/steps.rs @@ -382,13 +382,14 @@ where Step::Query { sql, expected } => { info!("====Begin running SQL query: {}", sql); // run query - let batches = run_sql( + let (mut batches, schema) = run_sql( sql, state.cluster.namespace(), state.cluster.querier().querier_grpc_connection(), None, ) .await; + batches.push(RecordBatch::new_empty(schema)); assert_batches_sorted_eq!(expected, &batches); info!("====Done running"); } @@ -439,20 +440,21 @@ where } => { info!("====Begin running SQL query (authenticated): {}", sql); // run query - let batches = run_sql( + let (mut batches, schema) = run_sql( sql, state.cluster.namespace(), state.cluster().querier().querier_grpc_connection(), Some(authorization.as_str()), ) .await; + batches.push(RecordBatch::new_empty(schema)); assert_batches_sorted_eq!(expected, &batches); info!("====Done running"); } Step::VerifiedQuery { sql, verify } => { info!("====Begin running SQL verified query: {}", sql); // run query - let batches = run_sql( + let (batches, _schema) = run_sql( sql, state.cluster.namespace(), state.cluster.querier().querier_grpc_connection(), @@ -465,13 +467,14 @@ where Step::InfluxQLQuery { query, expected } => { info!("====Begin running InfluxQL query: {}", query); // run query - let batches = run_influxql( + let (mut batches, schema) = run_influxql( query, state.cluster.namespace(), state.cluster.querier().querier_grpc_connection(), None, ) .await; + batches.push(RecordBatch::new_empty(schema)); assert_batches_sorted_eq!(expected, &batches); info!("====Done running"); } @@ -525,13 +528,14 @@ where } => { info!("====Begin running InfluxQL query: {}", query); // run query - let batches = run_influxql( + let (mut batches, schema) = run_influxql( query, state.cluster.namespace(), state.cluster.querier().querier_grpc_connection(), Some(authorization), ) .await; + batches.push(RecordBatch::new_empty(schema)); assert_batches_sorted_eq!(expected, &batches); info!("====Done running"); }