feat: add the `api/v3/query_influxql` API (#24696)

feat: add query_influxql api

This PR adds support for the /api/v3/query_influxql API. It re-uses code from the existing query_sql API, with some refactoring to allow code sharing between the two.

The main change to the code carried over from the query_sql API is that the response format is now determined up front, so that if the user provides an invalid Accept header, a 400 BAD REQUEST is returned before the query is performed.
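
For example, a request that names an unsupported MIME type in its Accept header now fails fast. Below is a minimal sketch exercising this with the reqwest crate; the localhost:8181 address and the pre-existing `foo` database are assumptions for illustration:

```rust
// Sketch: format negotiation on /api/v3/query_influxql via the Accept header.
// Assumes a server at http://localhost:8181 with a database named "foo".
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let url = "http://localhost:8181/api/v3/query_influxql";

    // A supported Accept header ("text/csv") yields a 200 with CSV output:
    let ok = client
        .get(url)
        .header("Accept", "text/csv")
        .query(&[("db", "foo"), ("q", "SELECT * FROM cpu")])
        .send()
        .await?;
    assert_eq!(ok.status(), reqwest::StatusCode::OK);

    // An unsupported Accept header is rejected with 400 BAD REQUEST before
    // the query is planned or executed:
    let bad = client
        .get(url)
        .header("Accept", "application/xml")
        .query(&[("db", "foo"), ("q", "SELECT * FROM cpu")])
        .send()
        .await?;
    assert_eq!(bad.status(), reqwest::StatusCode::BAD_REQUEST);
    Ok(())
}
```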

Support was added for several InfluxQL queries that previously required a bridge to be executed in 3.0:

- `SHOW MEASUREMENTS`
- `SHOW TAG KEYS`
- `SHOW TAG VALUES`
- `SHOW FIELD KEYS`
- `SHOW DATABASES`
- Handling of qualified measurement names in `SELECT` queries (see below)

This is accomplished with the newly added iox_query_influxql_rewrite crate, which provides the means to rewrite an InfluxQL statement to strip out the database name and retention policy, if provided. This allows the database parameter of the query_influxql API to be optional, since the database may instead be provided in the query string.
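
As a rough sketch of how this works, using the parse_statements and resolve_dbrp APIs added in this PR (the exact string rendering of the rewritten statement is an assumption):

```rust
use iox_query_influxql_rewrite as rewrite;

fn main() -> Result<(), rewrite::Error> {
    // The rewriter strips the database/retention policy qualifier from the
    // FROM clause...
    let mut statements = rewrite::parse_statements("SELECT * FROM foo.autogen.cpu")?;
    let statement = statements.pop().unwrap();

    // ...and exposes it separately, so the HTTP layer can fall back to it
    // when no 'db' parameter is given in the request. The default retention
    // policy ("autogen") is dropped from the resolved name:
    assert_eq!(statement.resolve_dbrp(), Some("foo".to_string()));

    // The rewritten statement no longer carries the qualifier:
    assert_eq!(statement.to_statement().to_string(), "SELECT * FROM cpu");
    Ok(())
}
```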

Handling qualified measurement names in SELECT

The implementation in this PR inspects all measurements provided in a FROM clause and extracts the database (DB) name and retention policy (RP) name (if not the default). If multiple DB/RP combinations are provided, an error is returned.
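
A short sketch of that rule at the rewrite-crate level (the query strings are illustrative):

```rust
use iox_query_influxql_rewrite as rewrite;

fn main() {
    // Two different databases in one FROM clause are rejected:
    let res = rewrite::parse_statements("SELECT * FROM foo.autogen.cpu, bar.autogen.mem");
    assert!(matches!(res, Err(rewrite::Error::MultiDatabase)));

    // Repeating the same DB/RP for every measurement is accepted:
    let res = rewrite::parse_statements("SELECT * FROM foo.autogen.cpu, foo.autogen.mem");
    assert!(res.is_ok());
}
```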

Testing

E2E tests were added that perform basic queries against a running server via both the query_sql and query_influxql APIs. The query_influxql test also exercises the InfluxQL-specific queries, e.g., SHOW MEASUREMENTS.

Other Changes

The influxdb3_client now has an api_v3_query_influxql method (and a basic test was added for it).
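
A minimal usage sketch, mirroring the doc example added to the client (the server address is illustrative):

```rust
use influxdb3_client::{Client, Format};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = Client::new("http://localhost:8181")?;
    // Runs an InfluxQL query against the "db_name" database and requests
    // JSON-formatted output:
    let response_bytes = client
        .api_v3_query_influxql("db_name", "SELECT * FROM foo")
        .format(Format::Json)
        .send()
        .await?;
    println!("{}", String::from_utf8(response_bytes.to_vec())?);
    Ok(())
}
```
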
Trevor Hilton 2024-03-01 12:27:38 -05:00, committed by GitHub
parent 3c9e6ed836
commit f7892ebee5
18 changed files with 1244 additions and 334 deletions

Cargo.lock (generated)

File diff suppressed because it is too large

View File

@ -5,6 +5,7 @@ members = [
"influxdb3_client",
"influxdb3_server",
"influxdb3_write",
"iox_query_influxql_rewrite",
]
default-members = ["influxdb3"]
@ -28,13 +29,13 @@ edition = "2021"
license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "49.0.0", features = ["prettyprint", "chrono-tz"] }
arrow-array = "49.0.0"
arrow-buffer = "49.0.0"
arrow-csv = "49.0.0"
arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
arrow-json = "49.0.0"
arrow-schema = "49.0.0"
arrow = { version = "50.0.0", features = ["prettyprint", "chrono-tz"] }
arrow-array = "50.0.0"
arrow-buffer = "50.0.0"
arrow-csv = "50.0.0"
arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
arrow-json = "50.0.0"
arrow-schema = "50.0.0"
assert_cmd = "2.0.14"
async-trait = "0.1"
backtrace = "0.3"
@ -44,8 +45,8 @@ chrono = "0.4"
clap = { version = "4", features = ["derive", "env", "string"] }
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "0e53c6d816f3a9d3d27c6ebb6d25b1699e5553e7" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "0e53c6d816f3a9d3d27c6ebb6d25b1699e5553e7" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91f3eb2e5430d23e2b551e66732bec1a3a575971" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91f3eb2e5430d23e2b551e66732bec1a3a575971" }
dotenvy = "0.15.7"
flate2 = "1.0.27"
futures = "0.3.28"
@ -57,10 +58,10 @@ hyper = "0.14"
libc = { version = "0.2" }
mockito = { version = "1.2.0", default-features = false }
num_cpus = "1.16.0"
object_store = "0.8.0"
object_store = "0.9.0"
once_cell = { version = "1.18", features = ["parking_lot"] }
parking_lot = "0.12.1"
parquet = { version = "49.0.0", features = ["object_store"] }
parquet = { version = "50.0.0", features = ["object_store"] }
pbjson = "0.6.0"
pbjson-build = "0.6.2"
pbjson-types = "0.6.0"
@ -91,33 +92,36 @@ urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }
# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev ="675113bd298c777ec7c45bdbbf89bd6413da6feb" }
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev ="675113bd298c777ec7c45bdbbf89bd6413da6feb", features = ["future_timeout"] }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "675113bd298c777ec7c45bdbbf89bd6413da6feb", default-features = true, features = ["clap"] }
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "86d72868fd39f7865e97d0b3a66bac29a5f662b2", default-features = true, features = ["clap"] }
[workspace.lints.rust]
rust_2018_idioms = "deny"

View File

@ -84,6 +84,8 @@ arrow-flight.workspace = true
assert_cmd.workspace = true
futures.workspace = true
hyper.workspace = true
pretty_assertions.workspace = true
reqwest.workspace = true
test_helpers.workspace = true
tonic.workspace = true
tower.workspace = true

View File

@ -92,6 +92,7 @@ impl From<Format> for influxdb3_client::Format {
#[derive(Debug, ValueEnum, Clone)]
enum QueryLanguage {
Sql,
Influxql,
}
pub(crate) async fn command(config: Config) -> Result<()> {
@ -116,6 +117,13 @@ pub(crate) async fn command(config: Config) -> Result<()> {
.send()
.await?
}
QueryLanguage::Influxql => {
client
.api_v3_query_influxql(database_name, query)
.format(config.output_format.clone().into())
.send()
.await?
}
};
// write to file if output path specified

View File

@ -38,6 +38,8 @@ async fn auth() {
Command::new(bin_path)
.args([
"serve",
"--object-store",
"memory",
"--bearer-token",
"2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", // foo as a sha256
])

View File

@ -4,24 +4,21 @@ use arrow_util::assert_batches_sorted_eq;
use futures::TryStreamExt;
use influxdb3_client::Precision;
use crate::common::TestServer;
mod common;
use crate::TestServer;
#[tokio::test]
async fn flight() {
let server = TestServer::spawn().await;
// use the influxdb3_client to write in some data
write_lp_to_db(
&server,
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s1,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3",
Precision::Nanosecond,
)
.await;
Precision::Nanosecond,
)
.await;
let mut client = server.flight_client("foo").await;
@ -109,6 +106,7 @@ async fn flight() {
"+--------------+--------------------+-------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | df_settings | VIEW |",
"| public | information_schema | schemata | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | information_schema | views | VIEW |",
"| public | iox | cpu | BASE TABLE |",
@ -135,22 +133,6 @@ async fn flight() {
}
}
async fn write_lp_to_db(
server: &TestServer,
database: &str,
lp: &'static str,
precision: Precision,
) {
let client = influxdb3_client::Client::new(server.client_addr()).unwrap();
client
.api_v3_write_lp(database)
.body(lp)
.precision(precision)
.send()
.await
.unwrap();
}
async fn collect_stream(stream: FlightRecordBatchStream) -> Vec<RecordBatch> {
stream
.try_collect()

View File

@ -5,7 +5,13 @@ use std::{
};
use assert_cmd::cargo::CommandCargoExt;
use influxdb3_client::Precision;
use influxdb_iox_client::flightsql::FlightSqlClient;
use reqwest::Response;
mod auth;
mod flight;
mod query;
/// A running instance of the `influxdb3 serve` process
pub struct TestServer {
@ -25,6 +31,7 @@ impl TestServer {
let command = command
.arg("serve")
.args(["--http-bind", &bind_addr.to_string()])
.args(["--object-store", "memory"])
// TODO - other configuration can be passed through
.stdout(Stdio::null())
.stderr(Stdio::null());
@ -82,6 +89,32 @@ impl Drop for TestServer {
}
}
impl TestServer {
/// Write some line protocol to the server
pub async fn write_lp_to_db(&self, database: &str, lp: &'static str, precision: Precision) {
let client = influxdb3_client::Client::new(self.client_addr()).unwrap();
client
.api_v3_write_lp(database)
.body(lp)
.precision(precision)
.send()
.await
.unwrap();
}
pub async fn api_v3_query_influxql(&self, params: &[(&str, &str)]) -> Response {
self.http_client
.get(format!(
"{base}/api/v3/query_influxql",
base = self.client_addr()
))
.query(params)
.send()
.await
.expect("send /api/v3/query_influxql request to server")
}
}
/// Get an available bind address on localhost
///
/// This binds a [`TcpListener`] to 127.0.0.1:0, which will randomly

View File

@ -0,0 +1,223 @@
use crate::TestServer;
use influxdb3_client::Precision;
use pretty_assertions::assert_eq;
#[tokio::test]
async fn api_v3_query_sql() {
let server = TestServer::spawn().await;
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s1,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3",
Precision::Nanosecond,
)
.await;
let client = reqwest::Client::new();
let resp = client
.get(format!(
"{base}/api/v3/query_sql",
base = server.client_addr()
))
.query(&[
("db", "foo"),
("q", "SELECT * FROM cpu"),
("format", "pretty"),
])
.send()
.await
.unwrap()
.text()
.await
.unwrap();
assert_eq!(
"+------+---------+-------------------------------+-------+\n\
| host | region | time | usage |\n\
+------+---------+-------------------------------+-------+\n\
| s1 | us-east | 1970-01-01T00:00:00.000000001 | 0.9 |\n\
| s1 | us-east | 1970-01-01T00:00:00.000000002 | 0.89 |\n\
| s1 | us-east | 1970-01-01T00:00:00.000000003 | 0.85 |\n\
+------+---------+-------------------------------+-------+",
resp,
);
}
#[tokio::test]
async fn api_v3_query_influxql() {
let server = TestServer::spawn().await;
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s1,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3\n\
mem,host=s1,region=us-east usage=0.5 4\n\
mem,host=s1,region=us-east usage=0.6 5\n\
mem,host=s1,region=us-east usage=0.7 6",
Precision::Nanosecond,
)
.await;
struct TestCase<'a> {
database: Option<&'a str>,
query: &'a str,
expected: &'a str,
}
let test_cases = [
TestCase {
database: Some("foo"),
query: "SELECT * FROM cpu",
expected:
"+------------------+-------------------------------+------+---------+-------+\n\
| iox::measurement | time | host | region | usage |\n\
+------------------+-------------------------------+------+---------+-------+\n\
| cpu | 1970-01-01T00:00:00.000000001 | s1 | us-east | 0.9 |\n\
| cpu | 1970-01-01T00:00:00.000000002 | s1 | us-east | 0.89 |\n\
| cpu | 1970-01-01T00:00:00.000000003 | s1 | us-east | 0.85 |\n\
+------------------+-------------------------------+------+---------+-------+",
},
TestCase {
database: None,
query: "SELECT * FROM foo.autogen.cpu",
expected:
"+------------------+-------------------------------+------+---------+-------+\n\
| iox::measurement | time | host | region | usage |\n\
+------------------+-------------------------------+------+---------+-------+\n\
| cpu | 1970-01-01T00:00:00.000000001 | s1 | us-east | 0.9 |\n\
| cpu | 1970-01-01T00:00:00.000000002 | s1 | us-east | 0.89 |\n\
| cpu | 1970-01-01T00:00:00.000000003 | s1 | us-east | 0.85 |\n\
+------------------+-------------------------------+------+---------+-------+",
},
TestCase {
database: Some("foo"),
query: "SELECT * FROM cpu, mem",
expected:
"+------------------+-------------------------------+------+---------+-------+\n\
| iox::measurement | time | host | region | usage |\n\
+------------------+-------------------------------+------+---------+-------+\n\
| cpu | 1970-01-01T00:00:00.000000001 | s1 | us-east | 0.9 |\n\
| cpu | 1970-01-01T00:00:00.000000002 | s1 | us-east | 0.89 |\n\
| cpu | 1970-01-01T00:00:00.000000003 | s1 | us-east | 0.85 |\n\
| mem | 1970-01-01T00:00:00.000000004 | s1 | us-east | 0.5 |\n\
| mem | 1970-01-01T00:00:00.000000005 | s1 | us-east | 0.6 |\n\
| mem | 1970-01-01T00:00:00.000000006 | s1 | us-east | 0.7 |\n\
+------------------+-------------------------------+------+---------+-------+",
},
TestCase {
database: Some("foo"),
query: "SHOW MEASUREMENTS",
expected: "+------------------+------+\n\
| iox::measurement | name |\n\
+------------------+------+\n\
| measurements | cpu |\n\
| measurements | mem |\n\
+------------------+------+",
},
TestCase {
database: None,
query: "SHOW MEASUREMENTS ON foo",
expected: "+------------------+------+\n\
| iox::measurement | name |\n\
+------------------+------+\n\
| measurements | cpu |\n\
| measurements | mem |\n\
+------------------+------+",
},
TestCase {
database: Some("foo"),
query: "SHOW FIELD KEYS",
expected: "+------------------+----------+-----------+\n\
| iox::measurement | fieldKey | fieldType |\n\
+------------------+----------+-----------+\n\
| cpu | usage | float |\n\
| mem | usage | float |\n\
+------------------+----------+-----------+",
},
TestCase {
database: None,
query: "SHOW FIELD KEYS ON foo",
expected: "+------------------+----------+-----------+\n\
| iox::measurement | fieldKey | fieldType |\n\
+------------------+----------+-----------+\n\
| cpu | usage | float |\n\
| mem | usage | float |\n\
+------------------+----------+-----------+",
},
TestCase {
database: Some("foo"),
query: "SHOW TAG KEYS",
expected: "+------------------+--------+\n\
| iox::measurement | tagKey |\n\
+------------------+--------+\n\
| cpu | host |\n\
| cpu | region |\n\
| mem | host |\n\
| mem | region |\n\
+------------------+--------+",
},
TestCase {
database: None,
query: "SHOW TAG KEYS ON foo",
expected: "+------------------+--------+\n\
| iox::measurement | tagKey |\n\
+------------------+--------+\n\
| cpu | host |\n\
| cpu | region |\n\
| mem | host |\n\
| mem | region |\n\
+------------------+--------+",
},
TestCase {
database: Some("foo"),
query: "SHOW TAG VALUES WITH KEY = \"host\" WHERE time < 1970-01-02",
expected: "+------------------+------+-------+\n\
| iox::measurement | key | value |\n\
+------------------+------+-------+\n\
| cpu | host | s1 |\n\
| mem | host | s1 |\n\
+------------------+------+-------+",
},
TestCase {
database: None,
query: "SHOW TAG VALUES ON foo WITH KEY = \"host\" WHERE time < 1970-01-02",
expected: "+------------------+------+-------+\n\
| iox::measurement | key | value |\n\
+------------------+------+-------+\n\
| cpu | host | s1 |\n\
| mem | host | s1 |\n\
+------------------+------+-------+",
},
TestCase {
database: None,
query: "SHOW DATABASES",
expected: "+---------------+\n\
| iox::database |\n\
+---------------+\n\
| foo |\n\
+---------------+",
},
];
for t in test_cases {
let mut params = vec![("q", t.query), ("format", "pretty")];
if let Some(db) = t.database {
params.push(("db", db))
}
let resp = server
.api_v3_query_influxql(&params)
.await
.text()
.await
.unwrap();
println!("\n{q}", q = t.query);
println!("{resp}");
assert_eq!(t.expected, resp, "query failed: {q}", q = t.query);
}
}

View File

@ -1,4 +1,4 @@
use std::string::FromUtf8Error;
use std::{fmt::Display, string::FromUtf8Error};
use bytes::Bytes;
use reqwest::{Body, IntoUrl, StatusCode};
@ -21,8 +21,12 @@ pub enum Error {
#[error("failed to read the API response bytes: {0}")]
Bytes(#[source] reqwest::Error),
#[error("failed to send /api/v3/query_sql request: {0}")]
QuerySqlSend(#[source] reqwest::Error),
#[error("failed to send /api/v3/query_{kind} request: {source}")]
QuerySend {
kind: QueryKind,
#[source]
source: reqwest::Error,
},
#[error("invalid UTF8 in response: {0}")]
InvalidUtf8(#[from] FromUtf8Error),
@ -128,6 +132,37 @@ impl Client {
) -> QueryRequestBuilder<'_> {
QueryRequestBuilder {
client: self,
kind: QueryKind::Sql,
db: db.into(),
query: query.into(),
format: None,
}
}
/// Compose a request to the `/api/v3/query_influxql` API
///
/// # Example
/// ```no_run
/// # use influxdb3_client::Client;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = Client::new("http://localhost:8181")?;
/// let response_bytes = client
/// .api_v3_query_influxql("db_name", "SELECT * FROM foo")
/// .send()
/// .await
/// .expect("send query_influxql request");
/// # Ok(())
/// # }
/// ```
pub fn api_v3_query_influxql<D: Into<String>, Q: Into<String>>(
&self,
db: D,
query: Q,
) -> QueryRequestBuilder<'_> {
QueryRequestBuilder {
client: self,
kind: QueryKind::InfluxQl,
db: db.into(),
query: query.into(),
format: None,
@ -247,6 +282,7 @@ pub struct NoBody;
#[derive(Debug)]
pub struct QueryRequestBuilder<'c> {
client: &'c Client,
kind: QueryKind,
db: String,
query: String,
format: Option<Format>,
@ -262,15 +298,21 @@ impl<'c> QueryRequestBuilder<'c> {
self
}
/// Send the request to `/api/v3/query_sql`
/// Send the request to `/api/v3/query_sql` or `/api/v3/query_influxql`
pub async fn send(self) -> Result<Bytes> {
let url = self.client.base_url.join("/api/v3/query_sql")?;
let url = match self.kind {
QueryKind::Sql => self.client.base_url.join("/api/v3/query_sql")?,
QueryKind::InfluxQl => self.client.base_url.join("/api/v3/query_influxql")?,
};
let params = QueryParams::from(&self);
let mut req = self.client.http_client.get(url).query(&params);
if let Some(token) = &self.client.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req.send().await.map_err(Error::QuerySqlSend)?;
let resp = req.send().await.map_err(|source| Error::QuerySend {
kind: self.kind,
source,
})?;
let status = resp.status();
let content = resp.bytes().await.map_err(Error::Bytes)?;
@ -303,6 +345,21 @@ impl<'a> From<&'a QueryRequestBuilder<'a>> for QueryParams<'a> {
}
}
#[derive(Debug, Copy, Clone)]
pub enum QueryKind {
Sql,
InfluxQl,
}
impl Display for QueryKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
QueryKind::Sql => write!(f, "sql"),
QueryKind::InfluxQl => write!(f, "influxql"),
}
}
}
/// Output format to request from the server in the `/api/v3/query_sql` API
#[derive(Debug, Serialize, Copy, Clone)]
#[serde(rename_all = "snake_case")]
@ -395,4 +452,40 @@ mod tests {
mock.assert_async().await;
}
#[tokio::test]
async fn api_v3_query_influxql() {
let db = "stats";
let query = "SELECT * FROM foo";
let body = r#"[{"host": "foo", "time": "1990-07-23T06:00:00:000", "val": 1}]"#;
let mut mock_server = Server::new_async().await;
let mock = mock_server
.mock("GET", "/api/v3/query_influxql")
.match_query(Matcher::AllOf(vec![
Matcher::UrlEncoded("db".into(), db.into()),
Matcher::UrlEncoded("q".into(), query.into()),
Matcher::UrlEncoded("format".into(), "json".into()),
]))
.with_status(200)
// TODO - could add content-type header but that may be too brittle
// at the moment
// - this will be JSON Lines at some point
.with_body(body)
.create_async()
.await;
let client = Client::new(mock_server.url()).expect("create client");
let r = client
.api_v3_query_influxql(db, query)
.format(Format::Json)
.send()
.await
.expect("send request to server");
assert_eq!(&r, body);
mock.assert_async().await;
}
}

View File

@ -13,6 +13,8 @@ datafusion_util.workspace = true
influxdb-line-protocol.workspace = true
iox_catalog.workspace = true
iox_query.workspace = true
iox_query_params.workspace = true
iox_query_influxql.workspace = true
iox_time.workspace = true
metric.workspace = true
metric_exporters.workspace = true
@ -27,6 +29,7 @@ tracker.workspace = true
# Local Deps
influxdb3_write = { path = "../influxdb3_write" }
iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }
# crates.io Dependencies
arrow.workspace = true

View File

@ -1,16 +1,20 @@
//! HTTP API service implementations for `server`
use crate::{query_executor, QueryKind};
use crate::{CommonServerState, QueryExecutor};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty;
use authz::http::AuthorizationHeaderExtension;
use bytes::{Bytes, BytesMut};
use data_types::NamespaceName;
use datafusion::error::DataFusionError;
use datafusion::execution::memory_pool::UnboundedMemoryPool;
use futures::StreamExt;
use datafusion::execution::RecordBatchStream;
use futures::{StreamExt, TryStreamExt};
use hyper::header::ACCEPT;
use hyper::header::AUTHORIZATION;
use hyper::header::CONTENT_ENCODING;
use hyper::header::CONTENT_TYPE;
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_write::persister::TrackedMemoryArrowWriter;
@ -18,8 +22,10 @@ use influxdb3_write::write_buffer::Error as WriteBufferError;
use influxdb3_write::BufferedWriteRequest;
use influxdb3_write::Precision;
use influxdb3_write::WriteBuffer;
use iox_query_influxql_rewrite as rewrite;
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, error, info};
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
use sha2::Digest;
@ -27,7 +33,9 @@ use sha2::Sha256;
use std::convert::Infallible;
use std::fmt::Debug;
use std::num::NonZeroI32;
use std::pin::Pin;
use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::sync::Arc;
use thiserror::Error;
use unicode_segmentation::UnicodeSegmentation;
@ -111,6 +119,9 @@ pub enum Error {
#[error("serde error: {0}")]
Serde(#[from] serde_urlencoded::de::Error),
#[error("error in query parameters: {0}")]
QueryParams(#[from] QueryParamsError),
/// Arrow error
#[error("arrow error: {0}")]
Arrow(#[from] arrow::error::ArrowError),
@ -139,6 +150,15 @@ pub enum Error {
#[error("serde json error: {0}")]
Influxdb3Write(#[from] influxdb3_write::Error),
#[error("datafusion error: {0}")]
Datafusion(#[from] DataFusionError),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("query error: {0}")]
Query(#[from] query_executor::Error),
// Invalid Start Character for a Database Name
#[error("db name did not start with a number or letter")]
DbNameInvalidStartChar,
@ -149,6 +169,26 @@ pub enum Error {
#[error("partial write of line protocol ocurred")]
PartialLpWrite(BufferedWriteRequest),
#[error("error in InfluxQL statement: {0}")]
InfluxqlRewrite(#[from] rewrite::Error),
#[error("must provide only one InfluxQl statement per query")]
InfluxqlSingleStatement,
#[error(
"must specify a 'database' parameter, or provide the database\
in the InfluxQL query, for queries that support ON clauses"
)]
InfluxqlNoDatabase,
#[error(
"provided a database in both the parameters ({param_db}) and\
query string ({query_db}) that do not match, if providing a query\
with an ON clause, you can omit the 'database' parameter from your\
request"
)]
InfluxqlDatabaseMismatch { param_db: String, query_db: String },
}
#[derive(Debug, Error)]
@ -246,6 +286,7 @@ impl<W, Q> HttpApi<W, Q>
where
W: WriteBuffer,
Q: QueryExecutor,
Error: From<<Q as QueryExecutor>::Error>,
{
async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
@ -280,119 +321,79 @@ where
}
async fn query_sql(&self, req: Request<Body>) -> Result<Response<Body>> {
let query = req.uri().query().ok_or(Error::MissingQueryParams)?;
let params: QuerySqlParams = serde_urlencoded::from_str(query)?;
let QueryParams {
database,
query_str,
format,
} = QueryParams::<String, _>::from_request(&req)?;
println!("query_sql {:?}", params);
info!(%database, %query_str, ?format, "handling query_sql");
let result = self
let stream = self
.query_executor
.query(&params.db, &params.q, None, None)
.await
.unwrap();
.query(&database, &query_str, QueryKind::Sql, None, None)
.await?;
let batches: Vec<RecordBatch> = result
.collect::<Vec<datafusion::common::Result<RecordBatch>>>()
.await
.into_iter()
.map(|b| b.unwrap())
.collect();
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, format.as_content_type())
.body(record_batch_stream_to_body(stream, format).await?)
.map_err(Into::into)
}
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
let batches: Vec<&RecordBatch> = batches.iter().collect();
Ok(Bytes::from(serde_json::to_string(
&arrow_json::writer::record_batches_to_json_rows(batches.as_slice())?,
)?))
async fn query_influxql(&self, req: Request<Body>) -> Result<Response<Body>> {
let QueryParams {
database,
query_str,
format,
} = QueryParams::<Option<String>, _>::from_request(&req)?;
info!(?database, %query_str, ?format, "handling query_influxql");
let mut statements = rewrite::parse_statements(&query_str)?;
if statements.len() != 1 {
return Err(Error::InfluxqlSingleStatement);
}
fn to_csv(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut writer = arrow_csv::writer::Writer::new(Vec::new());
for batch in batches {
writer.write(&batch)?;
}
let statement = statements.pop().unwrap();
Ok(Bytes::from(writer.into_inner()))
}
fn to_pretty(batches: Vec<RecordBatch>) -> Result<Bytes> {
Ok(Bytes::from(format!(
"{}",
pretty::pretty_format_batches(&batches)?
)))
}
fn to_parquet(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut bytes = Vec::new();
let mem_pool = Arc::new(UnboundedMemoryPool::default());
let mut writer =
TrackedMemoryArrowWriter::try_new(&mut bytes, batches[0].schema(), mem_pool)?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
Ok(Bytes::from(bytes))
}
enum Format {
Parquet,
Csv,
Pretty,
Json,
Error,
}
let (body, format) = match params.format {
None => match req
.headers()
.get(ACCEPT)
.map(HeaderValue::to_str)
.transpose()?
{
// Accept Headers use the MIME types maintained by IANA here:
// https://www.iana.org/assignments/media-types/media-types.xhtml
// Note parquet hasn't been accepted yet just Arrow, but there
// is the possibility it will be:
// https://issues.apache.org/jira/browse/PARQUET-1889
Some("application/vnd.apache.parquet") => {
(to_parquet(batches)?, Format::Parquet)
let stream = if statement.statement().is_show_databases() {
self.query_executor.show_databases()?
} else {
let database = match (database, statement.resolve_dbrp()) {
(None, None) => return Err(Error::InfluxqlNoDatabase),
(None, Some(s)) | (Some(s), None) => s,
(Some(p), Some(q)) => {
if p == q {
p
} else {
return Err(Error::InfluxqlDatabaseMismatch {
param_db: p,
query_db: q,
});
}
}
Some("text/csv") => (to_csv(batches)?, Format::Csv),
Some("text/plain") => (to_pretty(batches)?, Format::Pretty),
Some("application/json") => (to_json(batches)?, Format::Json),
Some("*/*") | None => (to_json(batches)?, Format::Json),
Some(_) => (Bytes::from("{ \"error\": \"Available mime types are: application/vnd.apache.parquet, text/csv, text/plain, and application/json\" }"), Format::Error),
},
Some(format) => match format.as_str() {
"parquet" => (to_parquet(batches)?, Format::Parquet),
"csv" => (to_csv(batches)?, Format::Csv),
"pretty" => (to_pretty(batches)?, Format::Pretty),
"json" => (to_json(batches)?, Format::Json),
_ => (Bytes::from("{ \"error\": \"Available formats are: parquet, csv, pretty, and json\" }"), Format::Error),
},
};
self.query_executor
.query(
&database,
// TODO - implement an interface that takes the statement directly,
// so we don't need to double down on the parsing
&statement.to_statement().to_string(),
QueryKind::InfluxQl,
None,
None,
)
.await?
};
match format {
Format::Parquet => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/vnd.apache.parquet")
.body(Body::from(body))?),
Format::Csv => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/csv")
.body(Body::from(body))?),
Format::Pretty => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; charset=utf-8")
.body(Body::from(body))?),
Format::Json => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))?),
Format::Error => Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("Content-Type", "application/json")
.body(Body::from(body))?),
}
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, format.as_content_type())
.body(record_batch_stream_to_body(stream, format).await?)
.map_err(Into::into)
}
fn health(&self) -> Result<Response<Body>> {
@ -534,10 +535,126 @@ fn validate_db_name(name: &str) -> Result<()> {
}
#[derive(Debug, Deserialize)]
pub(crate) struct QuerySqlParams {
pub(crate) db: String,
pub(crate) q: String,
pub(crate) format: Option<String>,
pub(crate) struct QueryParams<D, F> {
#[serde(rename = "db")]
pub(crate) database: D,
#[serde(rename = "q")]
pub(crate) query_str: String,
pub(crate) format: F,
}
impl<D> QueryParams<D, QueryFormat>
where
D: DeserializeOwned,
{
fn from_request(req: &Request<Body>) -> Result<Self> {
let query = req.uri().query().ok_or(Error::MissingQueryParams)?;
let params = serde_urlencoded::from_str::<QueryParams<D, Option<QueryFormat>>>(query)?;
let format = match params.format {
None => match req.headers().get(ACCEPT).map(HeaderValue::as_bytes) {
// Accept Headers use the MIME types maintained by IANA here:
// https://www.iana.org/assignments/media-types/media-types.xhtml
// Note parquet hasn't been accepted yet just Arrow, but there
// is the possibility it will be:
// https://issues.apache.org/jira/browse/PARQUET-1889
Some(b"application/vnd.apache.parquet") => QueryFormat::Parquet,
Some(b"text/csv") => QueryFormat::Csv,
Some(b"text/plain") => QueryFormat::Pretty,
Some(b"application/json" | b"*/*") | None => QueryFormat::Json,
Some(mime_type) => match String::from_utf8(mime_type.to_vec()) {
Ok(s) => return Err(QueryParamsError::InvalidMimeType(s).into()),
Err(e) => return Err(QueryParamsError::NonUtf8MimeType(e).into()),
},
},
Some(f) => f,
};
Ok(Self {
database: params.database,
query_str: params.query_str,
format,
})
}
}
#[derive(Debug, thiserror::Error)]
pub enum QueryParamsError {
#[error(
"invalid mime type ({0}), available types are \
application/vnd.apache.parquet, text/csv, text/plain, and application/json"
)]
InvalidMimeType(String),
#[error("the mime type specified was not valid UTF8: {0}")]
NonUtf8MimeType(#[from] FromUtf8Error),
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum QueryFormat {
Parquet,
Csv,
Pretty,
Json,
}
impl QueryFormat {
fn as_content_type(&self) -> &str {
match self {
Self::Parquet => "application/vnd.apache.parquet",
Self::Csv => "text/csv",
Self::Pretty => "text/plain; charset=utf-8",
Self::Json => "application/json",
}
}
}
async fn record_batch_stream_to_body(
stream: Pin<Box<dyn RecordBatchStream + Send>>,
format: QueryFormat,
) -> Result<Body, Error> {
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
let batches: Vec<&RecordBatch> = batches.iter().collect();
Ok(Bytes::from(serde_json::to_string(
&arrow_json::writer::record_batches_to_json_rows(batches.as_slice())?,
)?))
}
fn to_csv(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut writer = arrow_csv::writer::Writer::new(Vec::new());
for batch in batches {
writer.write(&batch)?;
}
Ok(Bytes::from(writer.into_inner()))
}
fn to_pretty(batches: Vec<RecordBatch>) -> Result<Bytes> {
Ok(Bytes::from(format!(
"{}",
pretty::pretty_format_batches(&batches)?
)))
}
fn to_parquet(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut bytes = Vec::new();
let mem_pool = Arc::new(UnboundedMemoryPool::default());
let mut writer =
TrackedMemoryArrowWriter::try_new(&mut bytes, batches[0].schema(), mem_pool)?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
Ok(Bytes::from(bytes))
}
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
match format {
QueryFormat::Pretty => to_pretty(batches),
QueryFormat::Parquet => to_parquet(batches),
QueryFormat::Csv => to_csv(batches),
QueryFormat::Json => to_json(batches),
}
.map(Body::from)
}
// This is a hack around the fact that bool default is false not true
@ -553,10 +670,15 @@ pub(crate) struct WriteParams {
pub(crate) precision: Precision,
}
pub(crate) async fn route_request<W: WriteBuffer, Q: QueryExecutor>(
pub(crate) async fn route_request<W, Q>(
http_server: Arc<HttpApi<W, Q>>,
mut req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
) -> Result<Response<Body>, Infallible>
where
W: WriteBuffer,
Q: QueryExecutor,
Error: From<<Q as QueryExecutor>::Error>,
{
if let Err(e) = http_server.authorize_request(&mut req) {
match e {
AuthorizationError::Unauthorized => {
@ -592,6 +714,9 @@ pub(crate) async fn route_request<W: WriteBuffer, Q: QueryExecutor>(
let response = match (method.clone(), uri.path()) {
(Method::POST, "/api/v3/write_lp") => http_server.write_lp(req).await,
(Method::GET | Method::POST, "/api/v3/query_sql") => http_server.query_sql(req).await,
(Method::GET | Method::POST, "/api/v3/query_influxql") => {
http_server.query_influxql(req).await
}
(Method::GET, "/health") => http_server.health(),
(Method::GET, "/metrics") => http_server.handle_metrics(),
(Method::GET, "/debug/pprof") => pprof_home(req).await,

View File

@ -125,13 +125,24 @@ pub struct Server<W, Q, P> {
#[async_trait]
pub trait QueryExecutor: QueryNamespaceProvider + Debug + Send + Sync + 'static {
type Error;
async fn query(
&self,
database: &str,
q: &str,
kind: QueryKind,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream>;
) -> Result<SendableRecordBatchStream, Self::Error>;
fn show_databases(&self) -> Result<SendableRecordBatchStream, Self::Error>;
}
#[derive(Debug)]
pub enum QueryKind {
Sql,
InfluxQl,
}
impl<W, Q, P> Server<W, Q, P>
@ -165,6 +176,7 @@ pub async fn serve<W, Q, P>(server: Server<W, Q, P>, shutdown: CancellationToken
where
W: WriteBuffer,
Q: QueryExecutor,
http::Error: From<<Q as QueryExecutor>::Error>,
P: Persister,
{
// TODO:

View File

@ -1,12 +1,15 @@
//! module for query executor
use crate::QueryExecutor;
use crate::{QueryExecutor, QueryKind};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_schema::ArrowError;
use async_trait::async_trait;
use data_types::NamespaceId;
use data_types::{ChunkId, ChunkOrder, TransitionPartitionId};
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::CatalogProvider;
use datafusion::common::ParamValues;
use datafusion::common::arrow::array::StringArray;
use datafusion::common::arrow::datatypes::{DataType, Field, Schema as DatafusionSchema};
use datafusion::common::Statistics;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
@ -15,11 +18,12 @@ use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_write::{
catalog::{Catalog, DatabaseSchema},
WriteBuffer,
};
use iox_query::exec::{Executor, ExecutorType, IOxSessionContext};
use iox_query::exec::{Executor, ExecutorType, IOxSessionContext, QueryConfig};
use iox_query::frontend::sql::SqlQueryPlanner;
use iox_query::provider::ProviderBuilder;
use iox_query::query_log::QueryCompletedToken;
@ -28,6 +32,8 @@ use iox_query::query_log::QueryText;
use iox_query::query_log::StateReceived;
use iox_query::QueryNamespaceProvider;
use iox_query::{QueryChunk, QueryChunkData, QueryNamespace};
use iox_query_influxql::frontend::planner::InfluxQLQueryPlanner;
use iox_query_params::StatementParams;
use metric::Registry;
use observability_deps::tracing::{debug, info, trace};
use schema::sort::SortKey;
@ -79,22 +85,29 @@ impl<W: WriteBuffer> QueryExecutorImpl<W> {
#[async_trait]
impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
type Error = Error;
async fn query(
&self,
database: &str,
q: &str,
kind: QueryKind,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> crate::Result<SendableRecordBatchStream> {
) -> Result<SendableRecordBatchStream, Self::Error> {
info!("query in executor {}", database);
let db = self
.db(database, span_ctx.child_span("get database"), false)
.await
.ok_or_else(|| crate::Error::DatabaseNotFound {
.map_err(|_| Error::DatabaseNotFound {
db_name: database.to_string(),
})?
.ok_or_else(|| Error::DatabaseNotFound {
db_name: database.to_string(),
})?;
let ctx = db.new_query_context(span_ctx);
// TODO - configure query here?
let ctx = db.new_query_context(span_ctx, Default::default());
let token = db.record_query(
external_span_ctx.as_ref().map(RequestLogContext::ctx),
@ -103,11 +116,20 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
);
info!("plan");
let planner = SqlQueryPlanner::new();
// TODO: Figure out if we want to support parameter values in SQL
// queries
let params = ParamValues::List(Vec::new());
let plan = planner.query(q, params, &ctx).await?;
let params = StatementParams::default();
let plan = match kind {
QueryKind::Sql => {
let planner = SqlQueryPlanner::new();
planner.query(q, params, &ctx).await
}
QueryKind::InfluxQl => {
let planner = InfluxQLQueryPlanner::new();
planner.query(q, params, &ctx).await
}
}
.map_err(Error::QueryPlanning)?;
let token = token.planned(Arc::clone(&plan));
// TODO: Enforce concurrency limit here
@ -121,10 +143,31 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
}
Err(err) => {
token.fail();
Err(err.into())
Err(Error::ExecuteStream(err))
}
}
}
fn show_databases(&self) -> Result<SendableRecordBatchStream, Self::Error> {
let databases = StringArray::from(self.catalog.list_databases());
let schema =
DatafusionSchema::new(vec![Field::new("iox::database", DataType::Utf8, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(databases)])
.map_err(Error::DatabasesToRecordBatch)?;
Ok(Box::pin(MemoryStream::new(vec![batch])))
}
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("database not found: {db_name}")]
DatabaseNotFound { db_name: String },
#[error("error while planning query: {0}")]
QueryPlanning(#[source] DataFusionError),
#[error("error while executing plan: {0}")]
ExecuteStream(#[source] DataFusionError),
#[error("unable to compose record batches from databases: {0}")]
DatabasesToRecordBatch(#[source] ArrowError),
}
// This implementation is for the Flight service
@ -135,17 +178,21 @@ impl<W: WriteBuffer> QueryNamespaceProvider for QueryExecutorImpl<W> {
name: &str,
span: Option<Span>,
_include_debug_info_tables: bool,
) -> Option<Arc<dyn QueryNamespace>> {
) -> Result<Option<Arc<dyn QueryNamespace>>, DataFusionError> {
let _span_recorder = SpanRecorder::new(span);
let db_schema = self.catalog.db_schema(name)?;
let db_schema = self.catalog.db_schema(name).ok_or_else(|| {
DataFusionError::External(Box::new(Error::DatabaseNotFound {
db_name: name.into(),
}))
})?;
Some(Arc::new(QueryDatabase::new(
Ok(Some(Arc::new(QueryDatabase::new(
db_schema,
Arc::clone(&self.write_buffer) as _,
Arc::clone(&self.exec),
Arc::clone(&self.datafusion_config),
)))
))))
}
async fn acquire_semaphore(&self, span: Option<Span>) -> InstrumentedAsyncOwnedSemaphorePermit {
@ -241,7 +288,11 @@ impl<B: WriteBuffer> QueryNamespace for QueryDatabase<B> {
)
}
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxSessionContext {
fn new_query_context(
&self,
span_ctx: Option<SpanContext>,
_config: Option<&QueryConfig>,
) -> IOxSessionContext {
let qdb = Self::new(
Arc::clone(&self.db_schema),
Arc::clone(&self.write_buffer),

View File

@ -37,4 +37,5 @@ futures-util.workspace = true
[dev-dependencies]
# Core Crates
arrow_util.workspace = true
pretty_assertions.workspace = true
test_helpers.workspace = true

View File

@ -104,6 +104,10 @@ impl Catalog {
pub fn clone_inner(&self) -> InnerCatalog {
self.inner.read().clone()
}
pub fn list_databases(&self) -> Vec<String> {
self.inner.read().databases.keys().cloned().collect()
}
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]

View File

@ -127,6 +127,7 @@ mod tests {
use arrow_util::assert_batches_eq;
use object_store::memory::InMemory;
use object_store::ObjectStore;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
#[tokio::test]
@ -335,7 +336,7 @@ mod tests {
PersistedSegment {
segment_id,
segment_wal_size_bytes: 201,
segment_parquet_size_bytes: 3398,
segment_parquet_size_bytes: 3458,
segment_row_count: 3,
segment_min_time: 10,
segment_max_time: 20,
@ -350,7 +351,7 @@ mod tests {
parquet_files: vec![ParquetFile {
path: "dbs/db1/cpu/1970-01-01/4294967294.parquet"
.to_string(),
size_bytes: 1690,
size_bytes: 1721,
row_count: 1,
min_time: 10,
max_time: 10,
@ -365,7 +366,7 @@ mod tests {
parquet_files: vec![ParquetFile {
path: "dbs/db1/mem/1970-01-01/4294967294.parquet"
.to_string(),
size_bytes: 1708,
size_bytes: 1737,
row_count: 2,
min_time: 15,
max_time: 20,

View File

@ -0,0 +1,18 @@
[package]
name = "iox_query_influxql_rewrite"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# workspace dependencies:
influxdb_influxql_parser.workspace = true
# crates.io dependencies:
thiserror.workspace = true
[lints]
workspace = true

View File

@ -0,0 +1,158 @@
use std::collections::HashSet;
use influxdb_influxql_parser::{
common::ParseError, identifier::Identifier, parse_statements as parse_internal,
select::MeasurementSelection, show_measurements::ExtendedOnClause, statement::Statement,
};
#[derive(Debug)]
pub struct RewrittenStatement {
database: Option<Identifier>,
retention_policy: Option<Identifier>,
statement: Statement,
}
impl RewrittenStatement {
fn new(statement: Statement) -> Self {
Self {
database: None,
retention_policy: None,
statement,
}
}
fn with_database(mut self, db: Option<Identifier>) -> Self {
self.database = db;
self
}
fn with_retention_policy(mut self, rp: Option<Identifier>) -> Self {
self.retention_policy = rp;
self
}
pub fn database(&self) -> Option<&Identifier> {
self.database.as_ref()
}
pub fn retention_policy(&self) -> Option<&Identifier> {
self.retention_policy.as_ref()
}
pub fn statement(&self) -> &Statement {
&self.statement
}
pub fn to_statement(self) -> Statement {
self.statement
}
pub fn resolve_dbrp(&self) -> Option<String> {
match (&self.database, &self.retention_policy) {
(None, None) | (None, Some(_)) => None,
(Some(db), None) => Some(db.to_string()),
(Some(db), Some(rp)) => {
if rp.as_str() != "autogen" && rp.as_str() != "default" {
Some(format!("{db}/{rp}"))
} else {
Some(db.to_string())
}
}
}
}
}
impl From<RewrittenStatement> for Statement {
fn from(r: RewrittenStatement) -> Self {
r.to_statement()
}
}
impl TryFrom<Statement> for RewrittenStatement {
type Error = Error;
fn try_from(statement: Statement) -> Result<Self, Self::Error> {
match statement {
Statement::ShowMeasurements(mut s) => {
if let Some(on) = s.on.take() {
let (db, rp) = match on {
ExtendedOnClause::Database(db) => (Some(db), None),
ExtendedOnClause::DatabaseRetentionPolicy(db, rp) => (Some(db), Some(rp)),
ExtendedOnClause::AllDatabases
| ExtendedOnClause::AllDatabasesAndRetentionPolicies => {
return Err(Error::MultiDatabase)
}
};
Ok(Self::new(Statement::ShowMeasurements(s))
.with_database(db)
.with_retention_policy(rp))
} else {
Ok(Self::new(Statement::ShowMeasurements(s)))
}
}
Statement::ShowRetentionPolicies(mut s) => {
let identifier = s.database.take().map(Into::into);
Ok(Self::new(Statement::ShowRetentionPolicies(s)).with_database(identifier))
}
Statement::ShowTagKeys(mut s) => {
let identifier = s.database.take().map(Into::into);
Ok(Self::new(Statement::ShowTagKeys(s)).with_database(identifier))
}
Statement::ShowTagValues(mut s) => {
let identifier = s.database.take().map(Into::into);
Ok(Self::new(Statement::ShowTagValues(s)).with_database(identifier))
}
Statement::ShowFieldKeys(mut s) => {
let identifier = s.database.take().map(Into::into);
Ok(Self::new(Statement::ShowFieldKeys(s)).with_database(identifier))
}
Statement::Select(mut s) => {
let mut db_rp_set = HashSet::new();
let from_clause = s
.from
.take()
.into_iter()
.map(|ms| match ms {
MeasurementSelection::Name(mut qn) => {
let db = qn.database.take();
let rp = qn.retention_policy.take();
if db_rp_set.insert((db, rp)) && db_rp_set.len() > 1 {
return Err(Error::MultiDatabase);
}
Ok(MeasurementSelection::Name(qn))
}
// TODO - handle sub-queries?
MeasurementSelection::Subquery(_) => Ok(ms),
})
.collect::<Result<Vec<MeasurementSelection>, Error>>()?;
s.from.replace(from_clause);
let mut result = Self::new(Statement::Select(s));
if let Some((db, rp)) = db_rp_set.into_iter().next() {
result = result.with_database(db).with_retention_policy(rp);
}
Ok(result)
}
Statement::CreateDatabase(_)
| Statement::Delete(_)
| Statement::DropMeasurement(_)
| Statement::Explain(_)
| Statement::ShowDatabases(_) => Ok(Self::new(statement)),
}
}
}
#[derive(Debug, thiserror::Error, Clone)]
pub enum Error {
#[error("can only perform queries on a single database")]
MultiDatabase,
#[error("parsing error: {0}")]
Parse(ParseError),
}
pub fn parse_statements(input: &str) -> Result<Vec<RewrittenStatement>, Error> {
parse_internal(input)
.map_err(Error::Parse)?
.into_iter()
.map(RewrittenStatement::try_from)
.collect::<Result<Vec<RewrittenStatement>, Error>>()
}