test: add tests to check InfluxQL over Flight (#24732)

test: add tests to check InfluxQL over Flight
pull/24737/head
Trevor Hilton 2024-03-05 15:41:30 -05:00 committed by GitHub
parent fb4f09d675
commit 971676b498
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 104 additions and 8 deletions

8
Cargo.lock generated
View File

@ -4119,18 +4119,18 @@ dependencies = [
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.1.4" version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
dependencies = [ dependencies = [
"pin-project-internal", "pin-project-internal",
] ]
[[package]] [[package]]
name = "pin-project-internal" name = "pin-project-internal"
version = "1.1.4" version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -1,6 +1,8 @@
use arrow_flight::sql::SqlInfo; use arrow_flight::sql::SqlInfo;
use arrow_flight::Ticket;
use arrow_util::assert_batches_sorted_eq; use arrow_util::assert_batches_sorted_eq;
use influxdb3_client::Precision; use influxdb3_client::Precision;
use test_helpers::assert_contains;
use crate::collect_stream; use crate::collect_stream;
use crate::TestServer; use crate::TestServer;
@ -19,7 +21,7 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
) )
.await?; .await?;
let mut client = server.flight_client("foo").await; let mut client = server.flight_sql_client("foo").await;
// Ad-hoc Query: // Ad-hoc Query:
{ {
@ -133,3 +135,88 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
Ok(()) Ok(())
} }
#[tokio::test]
async fn flight_influxql() {
let server = TestServer::spawn().await;
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s1,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3",
Precision::Nanosecond,
)
.await
.unwrap();
let mut client = server.flight_client().await;
// Ad-hoc query, using qualified measurement name:
{
let ticket = Ticket::new(
r#"{
"database": "foo",
"sql_query": "SELECT * FROM foo.autogen.cpu",
"query_type": "influxql"
}"#,
);
let response = client.do_get(ticket).await.unwrap();
let batches = collect_stream(response).await;
assert_batches_sorted_eq!(
[
"+------------------+--------------------------------+------+---------+-------+",
"| iox::measurement | time | host | region | usage |",
"+------------------+--------------------------------+------+---------+-------+",
"| cpu | 1970-01-01T00:00:00.000000001Z | s1 | us-east | 0.9 |",
"| cpu | 1970-01-01T00:00:00.000000002Z | s1 | us-east | 0.89 |",
"| cpu | 1970-01-01T00:00:00.000000003Z | s1 | us-east | 0.85 |",
"+------------------+--------------------------------+------+---------+-------+",
],
&batches
);
}
// InfluxQL-specific query to show measurements:
{
let ticket = Ticket::new(
r#"{
"database": "foo",
"sql_query": "SHOW MEASUREMENTS",
"query_type": "influxql"
}"#,
);
let response = client.do_get(ticket).await.unwrap();
let batches = collect_stream(response).await;
assert_batches_sorted_eq!(
[
"+------------------+------+",
"| iox::measurement | name |",
"+------------------+------+",
"| measurements | cpu |",
"+------------------+------+",
],
&batches
);
}
// An InfluxQL query that is not supported over Flight:
{
let ticket = Ticket::new(
r#"{
"database": "foo",
"sql_query": "SHOW DATABASES",
"query_type": "influxql"
}"#,
);
let response = client.do_get(ticket).await.unwrap_err();
assert_contains!(
response.to_string(),
"This feature is not implemented: SHOW DATABASES"
);
}
}

View File

@ -5,7 +5,7 @@ use std::{
}; };
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use arrow_flight::decode::FlightRecordBatchStream; use arrow_flight::{decode::FlightRecordBatchStream, FlightClient};
use assert_cmd::cargo::CommandCargoExt; use assert_cmd::cargo::CommandCargoExt;
use futures::TryStreamExt; use futures::TryStreamExt;
use influxdb3_client::Precision; use influxdb3_client::Precision;
@ -58,8 +58,7 @@ impl TestServer {
} }
/// Get a [`FlightSqlClient`] for making requests to the running service over gRPC /// Get a [`FlightSqlClient`] for making requests to the running service over gRPC
#[allow(dead_code)] pub async fn flight_sql_client(&self, database: &str) -> FlightSqlClient {
pub async fn flight_client(&self, database: &str) -> FlightSqlClient {
let channel = tonic::transport::Channel::from_shared(self.client_addr()) let channel = tonic::transport::Channel::from_shared(self.client_addr())
.expect("create tonic channel") .expect("create tonic channel")
.connect() .connect()
@ -71,6 +70,16 @@ impl TestServer {
client client
} }
/// Get a raw [`FlightClient`] for performing Flight actions directly
pub async fn flight_client(&self) -> FlightClient {
let channel = tonic::transport::Channel::from_shared(self.client_addr())
.expect("create tonic channel")
.connect()
.await
.expect("connect to gRPC client");
FlightClient::new(channel)
}
fn kill(&mut self) { fn kill(&mut self) {
self.server_process.kill().expect("kill the server process"); self.server_process.kill().expect("kill the server process");
} }