fix: do not loose schema for empty query responses (#7506)

Within our query tests and our CLI, we've used to print out empty
query responses as:

```text
++
++
```

This is pretty misleading. Why are there no columns?! The reason is that
while Flight provides us with schema information, we often have zero
record batches (because why would the querier send an empty batch). Now
lets fix this by creating an empty batch on the client side based on the
schema data we've received. This way, people know that there are columns
but no rows:

```text
+-------+--------+------+------+
| count | system | time | town |
+-------+--------+------+------+
+-------+--------+------+------+
```

An alternative fix would be to pass the schema in addition to
`Vec<RecordBatch>` to the formatting code, but that seemed to be more
effort.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-04-11 16:35:29 +02:00 committed by GitHub
parent b131895fc2
commit f97e1765d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 91 additions and 39 deletions

View File

@ -1,3 +1,4 @@
use arrow::record_batch::RecordBatch;
use clap::ValueEnum; use clap::ValueEnum;
use futures::TryStreamExt; use futures::TryStreamExt;
use influxdb_iox_client::format::influxql::write_columnar; use influxdb_iox_client::format::influxql::write_columnar;
@ -83,14 +84,24 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
query_lang, query_lang,
} = config; } = config;
let query_results = match query_lang { let mut query_results = match query_lang {
QueryLanguage::Sql => client.sql(namespace, query).await, QueryLanguage::Sql => client.sql(namespace, query).await,
QueryLanguage::InfluxQL => client.influxql(namespace, query).await, QueryLanguage::InfluxQL => client.influxql(namespace, query).await,
}?; }?;
// It might be nice to do some sort of streaming write // It might be nice to do some sort of streaming write
// rather than buffering the whole thing. // rather than buffering the whole thing.
let batches: Vec<_> = query_results.try_collect().await?; let mut batches: Vec<_> = (&mut query_results).try_collect().await?;
// read schema AFTER collection, otherwise the stream does not have the schema data yet
let schema = query_results
.inner()
.schema()
.cloned()
.ok_or(influxdb_iox_client::flight::Error::NoSchema)?;
// preserve schema so we print table headers even for empty results
batches.push(RecordBatch::new_empty(schema));
match (query_lang, &format) { match (query_lang, &format) {
(QueryLanguage::InfluxQL, OutputFormat::Pretty) => { (QueryLanguage::InfluxQL, OutputFormat::Pretty) => {

View File

@ -33,7 +33,8 @@ async fn smoke() {
// run query // run query
let sql = format!("select * from {table_name}"); let sql = format!("select * from {table_name}");
let batches = run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await; let (batches, _schema) =
run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await;
let expected = [ let expected = [
"+------+------+--------------------------------+-----+", "+------+------+--------------------------------+-----+",
@ -79,7 +80,8 @@ async fn ephemeral_mode() {
// run query // run query
// do not select time becasue it changes every time // do not select time becasue it changes every time
let sql = format!("select tag1, tag2, val from {table_name}"); let sql = format!("select tag1, tag2, val from {table_name}");
let batches = run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await; let (batches, _schema) =
run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await;
let expected = [ let expected = [
"+------+------+-----+", "+------+------+-----+",

View File

@ -15,6 +15,7 @@
-- InfluxQL: SELECT * FROM non_existent; -- InfluxQL: SELECT * FROM non_existent;
++ ++
++ ++
++
-- InfluxQL: SELECT *::tag, f64 FROM m0; -- InfluxQL: SELECT *::tag, f64 FROM m0;
-- Results After Sorting -- Results After Sorting
+------------------+----------------------+-------+-------+------+ +------------------+----------------------+-------+-------+------+
@ -359,8 +360,10 @@ Error while planning query: Error during planning: invalid number of arguments f
| m0 | 2022-10-31T02:00:10Z | 21.2 | | m0 | 2022-10-31T02:00:10Z | 21.2 |
+------------------+----------------------+------+ +------------------+----------------------+------+
-- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND str = 1; -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND str = 1;
++ +------------------+------+-----+
++ | iox::measurement | time | f64 |
+------------------+------+-----+
+------------------+------+-----+
-- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19 + 0.5 OR non_existent = 1; -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19 + 0.5 OR non_existent = 1;
+------------------+----------------------+------+ +------------------+----------------------+------+
| iox::measurement | time | f64 | | iox::measurement | time | f64 |
@ -368,11 +371,15 @@ Error while planning query: Error during planning: invalid number of arguments f
| m0 | 2022-10-31T02:00:10Z | 21.2 | | m0 | 2022-10-31T02:00:10Z | 21.2 |
+------------------+----------------------+------+ +------------------+----------------------+------+
-- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND non_existent = 1; -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND non_existent = 1;
++ +------------------+------+-----+
++ | iox::measurement | time | f64 |
+------------------+------+-----+
+------------------+------+-----+
-- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND f64 =~ /foo/; -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 AND f64 =~ /foo/;
++ +------------------+------+-----+
++ | iox::measurement | time | f64 |
+------------------+------+-----+
+------------------+------+-----+
-- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 OR f64 =~ /foo/; -- InfluxQL: SELECT f64 FROM m0 WHERE f64 >= 19.5 OR f64 =~ /foo/;
+------------------+----------------------+------+ +------------------+----------------------+------+
| iox::measurement | time | f64 | | iox::measurement | time | f64 |
@ -667,6 +674,7 @@ Error while planning query: Error during planning: invalid number of arguments f
-- InfluxQL: SELECT MEAN(foo) FROM cpu; -- InfluxQL: SELECT MEAN(foo) FROM cpu;
++ ++
++ ++
++
-- InfluxQL: SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu GROUP BY cpu; -- InfluxQL: SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu GROUP BY cpu;
+------------------+----------------------+-----------+--------------------------+ +------------------+----------------------+-----------+--------------------------+
| iox::measurement | time | cpu | mean_usage_idle_mean_foo | | iox::measurement | time | cpu | mean_usage_idle_mean_foo |
@ -686,6 +694,7 @@ Error while planning query: Error during planning: invalid number of arguments f
-- InfluxQL: SELECT MEAN(foo) FROM cpu GROUP BY cpu; -- InfluxQL: SELECT MEAN(foo) FROM cpu GROUP BY cpu;
++ ++
++ ++
++
-- InfluxQL: SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none); -- InfluxQL: SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none);
+------------------+----------------------+-------+------+ +------------------+----------------------+-------+------+
| iox::measurement | time | count | sum | | iox::measurement | time | count | sum |
@ -1079,8 +1088,10 @@ Error while planning query: Error during planning: mixing aggregate and non-aggr
| disk | 2022-10-31T02:01:30Z | | disk1s5 | | 3 | | disk | 2022-10-31T02:01:30Z | | disk1s5 | | 3 |
+------------------+----------------------+-----------+---------+-------+---------+ +------------------+----------------------+-----------+---------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= now() - 2m GROUP BY TIME(30s) FILL(null); -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= now() - 2m GROUP BY TIME(30s) FILL(null);
++ +------------------+------+-------+
++ | iox::measurement | time | count |
+------------------+------+-------+
+------------------+------+-------+
-- InfluxQL: SELECT f64 FROM m0 WHERE tag0 = 'val00' LIMIT 3; -- InfluxQL: SELECT f64 FROM m0 WHERE tag0 = 'val00' LIMIT 3;
+------------------+----------------------+------+ +------------------+----------------------+------+
| iox::measurement | time | f64 | | iox::measurement | time | f64 |
@ -1240,8 +1251,10 @@ Error while planning query: Error during planning: mixing aggregate and non-aggr
| cpu | 1970-01-01T00:00:00Z | cpu1 | 2 | | cpu | 1970-01-01T00:00:00Z | cpu1 | 2 |
+------------------+----------------------+-----------+-------+ +------------------+----------------------+-----------+-------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1; -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1;
++ +------------------+------+-----+-------+
++ | iox::measurement | time | cpu | count |
+------------------+------+-----+-------+
+------------------+------+-----+-------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1; -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1;
+------------------+----------------------+-----------+-------+ +------------------+----------------------+-----------+-------+
| iox::measurement | time | cpu | count | | iox::measurement | time | cpu | count |
@ -1251,8 +1264,10 @@ Error while planning query: Error during planning: mixing aggregate and non-aggr
| cpu | 1970-01-01T00:00:00Z | cpu1 | 2 | | cpu | 1970-01-01T00:00:00Z | cpu1 | 2 |
+------------------+----------------------+-----------+-------+ +------------------+----------------------+-----------+-------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1; -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1;
++ +------------------+------+-----+-------+
++ | iox::measurement | time | cpu | count |
+------------------+------+-----+-------+
+------------------+------+-----+-------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:05:00Z' GROUP BY TIME(30s) LIMIT 2; -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:05:00Z' GROUP BY TIME(30s) LIMIT 2;
+------------------+----------------------+-------+ +------------------+----------------------+-------+
| iox::measurement | time | count | | iox::measurement | time | count |

View File

@ -233,8 +233,10 @@
---------- ----------
-- SQL: SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00'); -- SQL: SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
-- Results After Sorting -- Results After Sorting
++ +-------+--------+------+------+
++ | count | system | time | town |
+-------+--------+------+------+
+-------+--------+------+------+
-- SQL: EXPLAIN SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00'); -- SQL: EXPLAIN SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
-- Results After Normalizing UUIDs -- Results After Normalizing UUIDs
---------- ----------

View File

@ -1,5 +1,5 @@
//! Client helpers for writing end to end ng tests //! Client helpers for writing end to end ng tests
use arrow::record_batch::RecordBatch; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use data_types::{NamespaceId, TableId}; use data_types::{NamespaceId, TableId};
use dml::{DmlMeta, DmlWrite}; use dml::{DmlMeta, DmlWrite};
use futures::TryStreamExt; use futures::TryStreamExt;
@ -84,7 +84,7 @@ pub async fn try_run_sql(
namespace: impl Into<String>, namespace: impl Into<String>,
querier_connection: Connection, querier_connection: Connection,
authorization: Option<&str>, authorization: Option<&str>,
) -> Result<Vec<RecordBatch>, influxdb_iox_client::flight::Error> { ) -> Result<(Vec<RecordBatch>, SchemaRef), influxdb_iox_client::flight::Error> {
let mut client = influxdb_iox_client::flight::Client::new(querier_connection); let mut client = influxdb_iox_client::flight::Client::new(querier_connection);
if let Some(authorization) = authorization { if let Some(authorization) = authorization {
client.add_header("authorization", authorization).unwrap(); client.add_header("authorization", authorization).unwrap();
@ -94,11 +94,18 @@ pub async fn try_run_sql(
// Normally this would be done one per connection, not per query // Normally this would be done one per connection, not per query
client.handshake().await?; client.handshake().await?;
client let mut stream = client.sql(namespace.into(), sql_query.into()).await?;
.sql(namespace.into(), sql_query.into())
.await? let batches = (&mut stream).try_collect().await?;
.try_collect()
.await // read schema AFTER collection, otherwise the stream does not have the schema data yet
let schema = stream
.inner()
.schema()
.cloned()
.ok_or(influxdb_iox_client::flight::Error::NoSchema)?;
Ok((batches, schema))
} }
/// Runs a InfluxQL query using the flight API on the specified connection. /// Runs a InfluxQL query using the flight API on the specified connection.
@ -107,7 +114,7 @@ pub async fn try_run_influxql(
namespace: impl Into<String>, namespace: impl Into<String>,
querier_connection: Connection, querier_connection: Connection,
authorization: Option<&str>, authorization: Option<&str>,
) -> Result<Vec<RecordBatch>, influxdb_iox_client::flight::Error> { ) -> Result<(Vec<RecordBatch>, SchemaRef), influxdb_iox_client::flight::Error> {
let mut client = influxdb_iox_client::flight::Client::new(querier_connection); let mut client = influxdb_iox_client::flight::Client::new(querier_connection);
if let Some(authorization) = authorization { if let Some(authorization) = authorization {
client.add_header("authorization", authorization).unwrap(); client.add_header("authorization", authorization).unwrap();
@ -117,11 +124,20 @@ pub async fn try_run_influxql(
// Normally this would be done one per connection, not per query // Normally this would be done one per connection, not per query
client.handshake().await?; client.handshake().await?;
client let mut stream = client
.influxql(namespace.into(), influxql_query.into()) .influxql(namespace.into(), influxql_query.into())
.await? .await?;
.try_collect()
.await let batches = (&mut stream).try_collect().await?;
// read schema AFTER collection, otherwise the stream does not have the schema data yet
let schema = stream
.inner()
.schema()
.cloned()
.ok_or(influxdb_iox_client::flight::Error::NoSchema)?;
Ok((batches, schema))
} }
/// Runs a SQL query using the flight API on the specified connection. /// Runs a SQL query using the flight API on the specified connection.
@ -132,7 +148,7 @@ pub async fn run_sql(
namespace: impl Into<String>, namespace: impl Into<String>,
querier_connection: Connection, querier_connection: Connection,
authorization: Option<&str>, authorization: Option<&str>,
) -> Vec<RecordBatch> { ) -> (Vec<RecordBatch>, SchemaRef) {
try_run_sql(sql, namespace, querier_connection, authorization) try_run_sql(sql, namespace, querier_connection, authorization)
.await .await
.expect("Error executing sql query") .expect("Error executing sql query")
@ -146,7 +162,7 @@ pub async fn run_influxql(
namespace: impl Into<String>, namespace: impl Into<String>,
querier_connection: Connection, querier_connection: Connection,
authorization: Option<&str>, authorization: Option<&str>,
) -> Vec<RecordBatch> { ) -> (Vec<RecordBatch>, SchemaRef) {
try_run_influxql( try_run_influxql(
influxql.clone(), influxql.clone(),
namespace, namespace,

View File

@ -1,6 +1,7 @@
mod queries; mod queries;
use crate::{run_sql, snapshot_comparison::queries::TestQueries, try_run_influxql, MiniCluster}; use crate::{run_sql, snapshot_comparison::queries::TestQueries, try_run_influxql, MiniCluster};
use arrow::record_batch::RecordBatch;
use arrow_flight::error::FlightError; use arrow_flight::error::FlightError;
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use std::{ use std::{
@ -219,7 +220,7 @@ async fn run_query(
) -> Result<Vec<String>> { ) -> Result<Vec<String>> {
let query_text = query.text(); let query_text = query.text();
let results = match language { let (mut batches, schema) = match language {
Language::Sql => { Language::Sql => {
run_sql( run_sql(
query_text, query_text,
@ -248,6 +249,7 @@ async fn run_query(
} }
} }
}; };
batches.push(RecordBatch::new_empty(schema));
Ok(query.normalize_results(results)) Ok(query.normalize_results(batches))
} }

View File

@ -382,13 +382,14 @@ where
Step::Query { sql, expected } => { Step::Query { sql, expected } => {
info!("====Begin running SQL query: {}", sql); info!("====Begin running SQL query: {}", sql);
// run query // run query
let batches = run_sql( let (mut batches, schema) = run_sql(
sql, sql,
state.cluster.namespace(), state.cluster.namespace(),
state.cluster.querier().querier_grpc_connection(), state.cluster.querier().querier_grpc_connection(),
None, None,
) )
.await; .await;
batches.push(RecordBatch::new_empty(schema));
assert_batches_sorted_eq!(expected, &batches); assert_batches_sorted_eq!(expected, &batches);
info!("====Done running"); info!("====Done running");
} }
@ -439,20 +440,21 @@ where
} => { } => {
info!("====Begin running SQL query (authenticated): {}", sql); info!("====Begin running SQL query (authenticated): {}", sql);
// run query // run query
let batches = run_sql( let (mut batches, schema) = run_sql(
sql, sql,
state.cluster.namespace(), state.cluster.namespace(),
state.cluster().querier().querier_grpc_connection(), state.cluster().querier().querier_grpc_connection(),
Some(authorization.as_str()), Some(authorization.as_str()),
) )
.await; .await;
batches.push(RecordBatch::new_empty(schema));
assert_batches_sorted_eq!(expected, &batches); assert_batches_sorted_eq!(expected, &batches);
info!("====Done running"); info!("====Done running");
} }
Step::VerifiedQuery { sql, verify } => { Step::VerifiedQuery { sql, verify } => {
info!("====Begin running SQL verified query: {}", sql); info!("====Begin running SQL verified query: {}", sql);
// run query // run query
let batches = run_sql( let (batches, _schema) = run_sql(
sql, sql,
state.cluster.namespace(), state.cluster.namespace(),
state.cluster.querier().querier_grpc_connection(), state.cluster.querier().querier_grpc_connection(),
@ -465,13 +467,14 @@ where
Step::InfluxQLQuery { query, expected } => { Step::InfluxQLQuery { query, expected } => {
info!("====Begin running InfluxQL query: {}", query); info!("====Begin running InfluxQL query: {}", query);
// run query // run query
let batches = run_influxql( let (mut batches, schema) = run_influxql(
query, query,
state.cluster.namespace(), state.cluster.namespace(),
state.cluster.querier().querier_grpc_connection(), state.cluster.querier().querier_grpc_connection(),
None, None,
) )
.await; .await;
batches.push(RecordBatch::new_empty(schema));
assert_batches_sorted_eq!(expected, &batches); assert_batches_sorted_eq!(expected, &batches);
info!("====Done running"); info!("====Done running");
} }
@ -525,13 +528,14 @@ where
} => { } => {
info!("====Begin running InfluxQL query: {}", query); info!("====Begin running InfluxQL query: {}", query);
// run query // run query
let batches = run_influxql( let (mut batches, schema) = run_influxql(
query, query,
state.cluster.namespace(), state.cluster.namespace(),
state.cluster.querier().querier_grpc_connection(), state.cluster.querier().querier_grpc_connection(),
Some(authorization), Some(authorization),
) )
.await; .await;
batches.push(RecordBatch::new_empty(schema));
assert_batches_sorted_eq!(expected, &batches); assert_batches_sorted_eq!(expected, &batches);
info!("====Done running"); info!("====Done running");
} }