influxdb/influxdb3/tests/server/flight.rs

use arrow_flight::sql::SqlInfo;
use arrow_flight::Ticket;
use arrow_util::assert_batches_sorted_eq;
use influxdb3_client::Precision;
use test_helpers::assert_contains;
use crate::collect_stream;
use crate::TestServer;

#[tokio::test]
async fn flight() -> Result<(), influxdb3_client::Error> {
    let server = TestServer::spawn().await;

    server
        .write_lp_to_db(
            "foo",
            "cpu,host=s1,region=us-east usage=0.9 1\n\
            cpu,host=s1,region=us-east usage=0.89 2\n\
            cpu,host=s1,region=us-east usage=0.85 3",
            Precision::Nanosecond,
        )
        .await?;

    let mut client = server.flight_sql_client("foo").await;

    // Ad-hoc Query:
    {
        let response = client
            .query("SELECT host, region, time, usage FROM cpu")
            .await
            .unwrap();
        let batches = collect_stream(response).await;
        assert_batches_sorted_eq!(
            [
                "+------+---------+--------------------------------+-------+",
                "| host | region  | time                           | usage |",
                "+------+---------+--------------------------------+-------+",
                "| s1   | us-east | 1970-01-01T00:00:00.000000001Z | 0.9   |",
                "| s1   | us-east | 1970-01-01T00:00:00.000000002Z | 0.89  |",
                "| s1   | us-east | 1970-01-01T00:00:00.000000003Z | 0.85  |",
                "+------+---------+--------------------------------+-------+",
            ],
            &batches
        );
    }

    // Ad-hoc Query error:
    {
        let error = client
            .query("SELECT * FROM invalid_table")
            .await
            .unwrap_err();

        assert!(error
            .to_string()
            .contains("table 'public.iox.invalid_table' not found"));
    }

    // Prepared query:
    {
        let handle = client
            .prepare("SELECT host, region, time, usage FROM cpu".into(), None)
            .await
            .unwrap();
        let stream = client.execute(handle).await.unwrap();
        let batches = collect_stream(stream).await;
        assert_batches_sorted_eq!(
            [
                "+------+---------+--------------------------------+-------+",
                "| host | region  | time                           | usage |",
                "+------+---------+--------------------------------+-------+",
                "| s1   | us-east | 1970-01-01T00:00:00.000000001Z | 0.9   |",
                "| s1   | us-east | 1970-01-01T00:00:00.000000002Z | 0.89  |",
                "| s1   | us-east | 1970-01-01T00:00:00.000000003Z | 0.85  |",
                "+------+---------+--------------------------------+-------+",
            ],
            &batches
        );
    }

    // Get SQL Infos:
    {
        let infos = vec![SqlInfo::FlightSqlServerName as u32];
        let stream = client.get_sql_info(infos).await.unwrap();
        let batches = collect_stream(stream).await;
        assert_batches_sorted_eq!(
            [
                "+-----------+-----------------------------+",
                "| info_name | value                       |",
                "+-----------+-----------------------------+",
                "| 0         | {string_value=InfluxDB IOx} |",
                "+-----------+-----------------------------+",
            ],
            &batches
        );
    }

    // Get Tables
    {
        type OptStr = std::option::Option<&'static str>;
        let stream = client
            .get_tables(OptStr::None, OptStr::None, OptStr::None, vec![], false)
            .await
            .unwrap();
        let batches = collect_stream(stream).await;

        assert_batches_sorted_eq!(
            [
                "+--------------+--------------------+-------------+------------+",
                "| catalog_name | db_schema_name     | table_name  | table_type |",
                "+--------------+--------------------+-------------+------------+",
                "| public       | information_schema | columns     | VIEW       |",
                "| public       | information_schema | df_settings | VIEW       |",
                "| public       | information_schema | schemata    | VIEW       |",
                "| public       | information_schema | tables      | VIEW       |",
                "| public       | information_schema | views       | VIEW       |",
                "| public       | iox                | cpu         | BASE TABLE |",
                "| public       | system             | queries     | BASE TABLE |",
                "+--------------+--------------------+-------------+------------+",
            ],
            &batches
        );
    }

    // Get Catalogs
    {
        let stream = client.get_catalogs().await.unwrap();
        let batches = collect_stream(stream).await;
        assert_batches_sorted_eq!(
            [
                "+--------------+",
                "| catalog_name |",
                "+--------------+",
                "| public       |",
                "+--------------+",
            ],
            &batches
        );
    }

    Ok(())
}

#[tokio::test]
async fn flight_influxql() {
    let server = TestServer::spawn().await;

    server
        .write_lp_to_db(
            "foo",
            "cpu,host=s1,region=us-east usage=0.9 1\n\
            cpu,host=s1,region=us-east usage=0.89 2\n\
            cpu,host=s1,region=us-east usage=0.85 3",
            Precision::Nanosecond,
        )
        .await
        .unwrap();

    let mut client = server.flight_client().await;

    // Ad-hoc query, using qualified measurement name:
    {
        let ticket = Ticket::new(
            r#"{
                "database": "foo",
                "sql_query": "SELECT time, host, region, usage FROM foo.autogen.cpu",
                "query_type": "influxql"
            }"#,
        );
        let response = client.do_get(ticket).await.unwrap();

        let batches = collect_stream(response).await;
        assert_batches_sorted_eq!(
            [
                "+------------------+--------------------------------+------+---------+-------+",
                "| iox::measurement | time                           | host | region  | usage |",
                "+------------------+--------------------------------+------+---------+-------+",
                "| cpu              | 1970-01-01T00:00:00.000000001Z | s1   | us-east | 0.9   |",
                "| cpu              | 1970-01-01T00:00:00.000000002Z | s1   | us-east | 0.89  |",
                "| cpu              | 1970-01-01T00:00:00.000000003Z | s1   | us-east | 0.85  |",
                "+------------------+--------------------------------+------+---------+-------+",
            ],
            &batches
        );
    }

    // InfluxQL-specific query to show measurements:
    {
        let ticket = Ticket::new(
            r#"{
                "database": "foo",
                "sql_query": "SHOW MEASUREMENTS",
                "query_type": "influxql"
            }"#,
        );
        let response = client.do_get(ticket).await.unwrap();

        let batches = collect_stream(response).await;
        assert_batches_sorted_eq!(
            [
                "+------------------+------+",
                "| iox::measurement | name |",
                "+------------------+------+",
                "| measurements     | cpu  |",
                "+------------------+------+",
            ],
            &batches
        );
    }

    // An InfluxQL query that is not supported over Flight:
    {
        let ticket = Ticket::new(
            r#"{
                "database": "foo",
                "sql_query": "SHOW DATABASES",
                "query_type": "influxql"
            }"#,
        );
        // `do_get` is expected to fail here, so the unwrapped value is an error.
        let response = client.do_get(ticket).await.unwrap_err();

        assert_contains!(
            response.to_string(),
            "This feature is not implemented: SHOW DATABASES"
        );
    }
}