influxdb/influxdb3/tests/server/flight.rs

225 lines
7.9 KiB
Rust

use arrow_flight::sql::SqlInfo;
use arrow_flight::Ticket;
use arrow_util::assert_batches_sorted_eq;
use influxdb3_client::Precision;
use test_helpers::assert_contains;
use crate::collect_stream;
use crate::TestServer;
#[test_log::test(tokio::test)]
async fn flight() -> Result<(), influxdb3_client::Error> {
let server = TestServer::spawn().await;
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s1,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3",
Precision::Nanosecond,
)
.await?;
let mut client = server.flight_sql_client("foo").await;
// Ad-hoc Query:
{
let response = client
.query("SELECT host, region, time, usage FROM cpu")
.await
.unwrap();
let batches = collect_stream(response).await;
assert_batches_sorted_eq!(
[
"+------+---------+--------------------------------+-------+",
"| host | region | time | usage |",
"+------+---------+--------------------------------+-------+",
"| s1 | us-east | 1970-01-01T00:00:00.000000001Z | 0.9 |",
"| s1 | us-east | 1970-01-01T00:00:00.000000002Z | 0.89 |",
"| s1 | us-east | 1970-01-01T00:00:00.000000003Z | 0.85 |",
"+------+---------+--------------------------------+-------+",
],
&batches
);
}
// Ad-hoc Query error:
{
let error = client
.query("SELECT * FROM invalid_table")
.await
.unwrap_err();
assert!(error
.to_string()
.contains("table 'public.iox.invalid_table' not found"));
}
// Prepared query:
{
let handle = client
.prepare("SELECT host, region, time, usage FROM cpu".into(), None)
.await
.unwrap();
let stream = client.execute(handle).await.unwrap();
let batches = collect_stream(stream).await;
assert_batches_sorted_eq!(
[
"+------+---------+--------------------------------+-------+",
"| host | region | time | usage |",
"+------+---------+--------------------------------+-------+",
"| s1 | us-east | 1970-01-01T00:00:00.000000001Z | 0.9 |",
"| s1 | us-east | 1970-01-01T00:00:00.000000002Z | 0.89 |",
"| s1 | us-east | 1970-01-01T00:00:00.000000003Z | 0.85 |",
"+------+---------+--------------------------------+-------+",
],
&batches
);
}
// Get SQL Infos:
{
let infos = vec![SqlInfo::FlightSqlServerName as u32];
let stream = client.get_sql_info(infos).await.unwrap();
let batches = collect_stream(stream).await;
assert_batches_sorted_eq!(
[
"+-----------+-----------------------------+",
"| info_name | value |",
"+-----------+-----------------------------+",
"| 0 | {string_value=InfluxDB IOx} |",
"+-----------+-----------------------------+",
],
&batches
);
}
// Get Tables
{
type OptStr = std::option::Option<&'static str>;
let stream = client
.get_tables(OptStr::None, OptStr::None, OptStr::None, vec![], false)
.await
.unwrap();
let batches = collect_stream(stream).await;
assert_batches_sorted_eq!(
[
"+--------------+--------------------+----------------------------+------------+",
"| catalog_name | db_schema_name | table_name | table_type |",
"+--------------+--------------------+----------------------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | df_settings | VIEW |",
"| public | information_schema | schemata | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | information_schema | views | VIEW |",
"| public | iox | cpu | BASE TABLE |",
"| public | system | distinct_caches | BASE TABLE |",
"| public | system | last_caches | BASE TABLE |",
"| public | system | parquet_files | BASE TABLE |",
"| public | system | processing_engine_plugins | BASE TABLE |",
"| public | system | processing_engine_triggers | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+--------------+--------------------+----------------------------+------------+",
],
&batches
);
}
// Get Catalogs
{
let stream = client.get_catalogs().await.unwrap();
let batches = collect_stream(stream).await;
assert_batches_sorted_eq!(
[
"+--------------+",
"| catalog_name |",
"+--------------+",
"| public |",
"+--------------+",
],
&batches
);
}
Ok(())
}
/// End-to-end test of InfluxQL queries sent as raw Flight tickets to a freshly spawned
/// `TestServer`.
///
/// Three `cpu` points are written to database `foo`. Each case then builds a JSON ticket with
/// `"query_type": "influxql"` and checks either the returned batches or the error text:
/// a database-qualified measurement name is rejected, `SHOW MEASUREMENTS` lists `cpu`, and
/// `SHOW DATABASES` is reported as not implemented.
///
/// Any unexpected result panics via `unwrap`/`unwrap_err`, which fails the test at that step.
#[tokio::test]
async fn flight_influxql() {
    let server = TestServer::spawn().await;

    // The trailing `\` continues the literal and skips the next line's leading whitespace,
    // so the payload is exactly three newline-separated line-protocol records.
    server
        .write_lp_to_db(
            "foo",
            "cpu,host=s1,region=us-east usage=0.9 1\n\
             cpu,host=s1,region=us-east usage=0.89 2\n\
             cpu,host=s1,region=us-east usage=0.85 3",
            Precision::Nanosecond,
        )
        .await
        .unwrap();

    let mut client = server.flight_client().await;

    // Ad-hoc query, using qualified measurement name
    // This is no longer supported in 3.0, see
    // https://github.com/influxdata/influxdb_iox/pull/11254
    {
        // Raw-string contents stay flush-left: any indentation inside the literal would become
        // part of the ticket payload.
        let ticket = Ticket::new(
            r#"{
"database": "foo",
"sql_query": "SELECT time, host, region, usage FROM foo.autogen.cpu",
"query_type": "influxql"
}"#,
        );
        let response = client.do_get(ticket).await.unwrap_err().to_string();
        assert_contains!(response, "database prefix in qualified measurement syntax");
    }

    // InfluxQL-specific query to show measurements:
    {
        let ticket = Ticket::new(
            r#"{
"database": "foo",
"sql_query": "SHOW MEASUREMENTS",
"query_type": "influxql"
}"#,
        );
        let response = client.do_get(ticket).await.unwrap();
        let batches = collect_stream(response).await;
        // The data row is padded to the column widths fixed by the border lines.
        assert_batches_sorted_eq!(
            [
                "+------------------+------+",
                "| iox::measurement | name |",
                "+------------------+------+",
                "| measurements     | cpu  |",
                "+------------------+------+",
            ],
            &batches
        );
    }

    // An InfluxQL query that is not supported over Flight:
    {
        let ticket = Ticket::new(
            r#"{
"database": "foo",
"sql_query": "SHOW DATABASES",
"query_type": "influxql"
}"#,
        );
        let response = client.do_get(ticket).await.unwrap_err();
        assert_contains!(
            response.to_string(),
            "This feature is not implemented: SHOW DATABASES"
        );
    }
}