225 lines
7.9 KiB
Rust
225 lines
7.9 KiB
Rust
use arrow_flight::sql::SqlInfo;
|
|
use arrow_flight::Ticket;
|
|
use arrow_util::assert_batches_sorted_eq;
|
|
use influxdb3_client::Precision;
|
|
use test_helpers::assert_contains;
|
|
|
|
use crate::collect_stream;
|
|
use crate::TestServer;
|
|
|
|
#[test_log::test(tokio::test)]
|
|
async fn flight() -> Result<(), influxdb3_client::Error> {
|
|
let server = TestServer::spawn().await;
|
|
|
|
server
|
|
.write_lp_to_db(
|
|
"foo",
|
|
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
|
cpu,host=s1,region=us-east usage=0.89 2\n\
|
|
cpu,host=s1,region=us-east usage=0.85 3",
|
|
Precision::Nanosecond,
|
|
)
|
|
.await?;
|
|
|
|
let mut client = server.flight_sql_client("foo").await;
|
|
|
|
// Ad-hoc Query:
|
|
{
|
|
let response = client
|
|
.query("SELECT host, region, time, usage FROM cpu")
|
|
.await
|
|
.unwrap();
|
|
|
|
let batches = collect_stream(response).await;
|
|
assert_batches_sorted_eq!(
|
|
[
|
|
"+------+---------+--------------------------------+-------+",
|
|
"| host | region | time | usage |",
|
|
"+------+---------+--------------------------------+-------+",
|
|
"| s1 | us-east | 1970-01-01T00:00:00.000000001Z | 0.9 |",
|
|
"| s1 | us-east | 1970-01-01T00:00:00.000000002Z | 0.89 |",
|
|
"| s1 | us-east | 1970-01-01T00:00:00.000000003Z | 0.85 |",
|
|
"+------+---------+--------------------------------+-------+",
|
|
],
|
|
&batches
|
|
);
|
|
}
|
|
|
|
// Ad-hoc Query error:
|
|
{
|
|
let error = client
|
|
.query("SELECT * FROM invalid_table")
|
|
.await
|
|
.unwrap_err();
|
|
|
|
assert!(error
|
|
.to_string()
|
|
.contains("table 'public.iox.invalid_table' not found"));
|
|
}
|
|
|
|
// Prepared query:
|
|
{
|
|
let handle = client
|
|
.prepare("SELECT host, region, time, usage FROM cpu".into(), None)
|
|
.await
|
|
.unwrap();
|
|
let stream = client.execute(handle).await.unwrap();
|
|
|
|
let batches = collect_stream(stream).await;
|
|
assert_batches_sorted_eq!(
|
|
[
|
|
"+------+---------+--------------------------------+-------+",
|
|
"| host | region | time | usage |",
|
|
"+------+---------+--------------------------------+-------+",
|
|
"| s1 | us-east | 1970-01-01T00:00:00.000000001Z | 0.9 |",
|
|
"| s1 | us-east | 1970-01-01T00:00:00.000000002Z | 0.89 |",
|
|
"| s1 | us-east | 1970-01-01T00:00:00.000000003Z | 0.85 |",
|
|
"+------+---------+--------------------------------+-------+",
|
|
],
|
|
&batches
|
|
);
|
|
}
|
|
|
|
// Get SQL Infos:
|
|
{
|
|
let infos = vec![SqlInfo::FlightSqlServerName as u32];
|
|
let stream = client.get_sql_info(infos).await.unwrap();
|
|
let batches = collect_stream(stream).await;
|
|
assert_batches_sorted_eq!(
|
|
[
|
|
"+-----------+-----------------------------+",
|
|
"| info_name | value |",
|
|
"+-----------+-----------------------------+",
|
|
"| 0 | {string_value=InfluxDB IOx} |",
|
|
"+-----------+-----------------------------+",
|
|
],
|
|
&batches
|
|
);
|
|
}
|
|
|
|
// Get Tables
|
|
{
|
|
type OptStr = std::option::Option<&'static str>;
|
|
let stream = client
|
|
.get_tables(OptStr::None, OptStr::None, OptStr::None, vec![], false)
|
|
.await
|
|
.unwrap();
|
|
let batches = collect_stream(stream).await;
|
|
|
|
assert_batches_sorted_eq!(
|
|
[
|
|
"+--------------+--------------------+----------------------------+------------+",
|
|
"| catalog_name | db_schema_name | table_name | table_type |",
|
|
"+--------------+--------------------+----------------------------+------------+",
|
|
"| public | information_schema | columns | VIEW |",
|
|
"| public | information_schema | df_settings | VIEW |",
|
|
"| public | information_schema | schemata | VIEW |",
|
|
"| public | information_schema | tables | VIEW |",
|
|
"| public | information_schema | views | VIEW |",
|
|
"| public | iox | cpu | BASE TABLE |",
|
|
"| public | system | distinct_caches | BASE TABLE |",
|
|
"| public | system | last_caches | BASE TABLE |",
|
|
"| public | system | parquet_files | BASE TABLE |",
|
|
"| public | system | processing_engine_plugins | BASE TABLE |",
|
|
"| public | system | processing_engine_triggers | BASE TABLE |",
|
|
"| public | system | queries | BASE TABLE |",
|
|
"+--------------+--------------------+----------------------------+------------+",
|
|
],
|
|
&batches
|
|
);
|
|
}
|
|
|
|
// Get Catalogs
|
|
{
|
|
let stream = client.get_catalogs().await.unwrap();
|
|
let batches = collect_stream(stream).await;
|
|
assert_batches_sorted_eq!(
|
|
[
|
|
"+--------------+",
|
|
"| catalog_name |",
|
|
"+--------------+",
|
|
"| public |",
|
|
"+--------------+",
|
|
],
|
|
&batches
|
|
);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn flight_influxql() {
|
|
let server = TestServer::spawn().await;
|
|
|
|
server
|
|
.write_lp_to_db(
|
|
"foo",
|
|
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
|
cpu,host=s1,region=us-east usage=0.89 2\n\
|
|
cpu,host=s1,region=us-east usage=0.85 3",
|
|
Precision::Nanosecond,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut client = server.flight_client().await;
|
|
|
|
// Ad-hoc query, using qualified measurement name
|
|
// This is no longer supported in 3.0, see
|
|
// https://github.com/influxdata/influxdb_iox/pull/11254
|
|
{
|
|
let ticket = Ticket::new(
|
|
r#"{
|
|
"database": "foo",
|
|
"sql_query": "SELECT time, host, region, usage FROM foo.autogen.cpu",
|
|
"query_type": "influxql"
|
|
}"#,
|
|
);
|
|
let response = client.do_get(ticket).await.unwrap_err().to_string();
|
|
|
|
assert_contains!(response, "database prefix in qualified measurement syntax");
|
|
}
|
|
|
|
// InfluxQL-specific query to show measurements:
|
|
{
|
|
let ticket = Ticket::new(
|
|
r#"{
|
|
"database": "foo",
|
|
"sql_query": "SHOW MEASUREMENTS",
|
|
"query_type": "influxql"
|
|
}"#,
|
|
);
|
|
let response = client.do_get(ticket).await.unwrap();
|
|
|
|
let batches = collect_stream(response).await;
|
|
assert_batches_sorted_eq!(
|
|
[
|
|
"+------------------+------+",
|
|
"| iox::measurement | name |",
|
|
"+------------------+------+",
|
|
"| measurements | cpu |",
|
|
"+------------------+------+",
|
|
],
|
|
&batches
|
|
);
|
|
}
|
|
|
|
// An InfluxQL query that is not supported over Flight:
|
|
{
|
|
let ticket = Ticket::new(
|
|
r#"{
|
|
"database": "foo",
|
|
"sql_query": "SHOW DATABASES",
|
|
"query_type": "influxql"
|
|
}"#,
|
|
);
|
|
let response = client.do_get(ticket).await.unwrap_err();
|
|
|
|
assert_contains!(
|
|
response.to_string(),
|
|
"This feature is not implemented: SHOW DATABASES"
|
|
);
|
|
}
|
|
}
|