feat: support v1 query API GROUP BY semantics (#25845)

This updates the v1 /query API handler to emit InfluxDB v1's unique query
response structure when a GROUP BY clause is provided.

The difference is the addition of a "tags" field to each emitted series: a map
of the GROUP BY tag keys to the distinct tag values associated with the rows in
that series' "values" field.
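
For illustration, a single series emitted for `select * from bar group by t1`
has the following shape (rows copied from the snapshot tests added below;
expressed here as a serde_json sketch rather than exact wire output):

    // Illustrative only: one series from `select * from bar group by t1`; the
    // grouped tag moves out of the row data and into the "tags" map.
    let example_series = serde_json::json!({
        "name": "bar",
        "tags": { "t1": "a" },
        "columns": ["time", "t2", "val"],
        "values": [
            ["2065-01-07T17:28:51Z", "aa", 1.0],
            ["2065-01-07T17:28:53Z", "bb", 3.0]
        ]
    });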

This required splitting the QueryExecutor into separate SQL and InfluxQL query
paths, which allows InfluxQL queries to be parsed ahead of query planning so
that the GROUP BY clause is available when serializing the response.
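
A rough sketch of the new InfluxQL path in the HTTP layer (simplified; the
helper name below is made up for illustration, and the parser crate's public
`parse_statements` stands in for the internal rewrite module the handler uses):

    use influxdb_influxql_parser::{parse_statements, select::GroupByClause, statement::Statement};

    // Sketch only: parse the InfluxQL text once, capture the GROUP BY clause for
    // the v1 response serializer, and hand the already-parsed statement to the
    // dedicated InfluxQL executor path so planning does not re-parse the query.
    fn parse_for_v1_query(query_str: &str) -> Result<(Statement, Option<GroupByClause>), anyhow::Error> {
        let mut statements = parse_statements(query_str)
            .map_err(|e| anyhow::anyhow!("parse error: {e}"))?;
        anyhow::ensure!(statements.len() == 1, "expected a single InfluxQL statement");
        let statement = statements.pop().expect("length checked above");
        // Only SELECT statements carry a GROUP BY clause.
        let group_by = match &statement {
            Statement::Select(select) => select.group_by.clone(),
            _ => None,
        };
        Ok((statement, group_by))
    }

    // The statement and clause are then used roughly as in the handler diff below:
    //   let stream = query_executor
    //       .query_influxql(&database, query_str, statement, params, None, None)
    //       .await?;
    //   QueryResponseStream::new(0, stream, chunk_size, format, epoch, group_by)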

A set of snapshot tests was added to verify the new behavior.
Trevor Hilton 2025-01-16 11:59:01 -05:00 committed by GitHub
parent 4eccc38129
commit b8a94488b5
29 changed files with 2156 additions and 149 deletions

Cargo.lock generated

@ -2918,6 +2918,7 @@ version = "0.1.0"
dependencies = [
"async-trait",
"datafusion",
"influxdb_influxql_parser",
"iox_query",
"iox_query_params",
"thiserror 1.0.69",
@ -3052,6 +3053,7 @@ dependencies = [
"influxdb3_telemetry",
"influxdb3_wal",
"influxdb3_write",
"influxdb_influxql_parser",
"iox_catalog",
"iox_http",
"iox_query",
@ -3071,6 +3073,7 @@ dependencies = [
"pin-project-lite",
"pretty_assertions",
"pyo3",
"regex",
"schema",
"secrecy",
"serde",
@ -6370,9 +6373,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.42.0"
version = "1.43.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551"
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
dependencies = [
"backtrace",
"bytes",
@ -6399,9 +6402,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.4.0"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
dependencies = [
"proc-macro2",
"quote",


@ -71,8 +71,8 @@ datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git",
dashmap = "6.1.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
futures = "0.3.28"
futures-util = "0.3.30"
futures = "0.3.31"
futures-util = "0.3.31"
hashbrown = { version = "0.15.1", features = ["serde"] }
hex = "0.4.3"
http = "0.2.9"
@ -101,6 +101,7 @@ prost-build = "0.12.6"
prost-types = "0.12.6"
proptest = { version = "1", default-features = false, features = ["std"] }
rand = "0.8.5"
regex = "1.11.1"
reqwest = { version = "0.11.27", default-features = false, features = ["rustls-tls", "stream", "json"] }
secrecy = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
@ -117,8 +118,8 @@ sysinfo = "0.30.8"
tempfile = "3.14.0"
test-log = { version = "0.2.16", features = ["trace"] }
thiserror = "1.0"
tokio = { version = "1.42", features = ["full"] }
tokio-util = "0.7.9"
tokio = { version = "1.43", features = ["full"] }
tokio-util = "0.7.13"
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tonic-build = "0.11.0"
tonic-health = "0.11.0"


@ -1,3 +1,5 @@
use core::str;
use crate::TestServer;
use futures::StreamExt;
use hyper::StatusCode;
@ -1582,6 +1584,104 @@ async fn api_v1_query_uri_and_body() {
}
}
#[tokio::test]
async fn api_v1_query_group_by() {
let server = TestServer::spawn().await;
server
.write_lp_to_db(
"foo",
"\
bar,t1=a,t2=aa val=1 2998574931\n\
bar,t1=b,t2=aa val=2 2998574932\n\
bar,t1=a,t2=bb val=3 2998574933\n\
bar,t1=b,t2=bb val=4 2998574934",
Precision::Second,
)
.await
.unwrap();
for (chunked, query) in [
(false, "select * from bar group by t1"),
(true, "select * from bar group by t1"),
(false, "select * from bar group by t1, t2"),
(true, "select * from bar group by t1, t2"),
(false, "select * from bar group by /t/"),
(true, "select * from bar group by /t/"),
(false, "select * from bar group by /t[1]/"),
(true, "select * from bar group by /t[1]/"),
(false, "select * from bar group by /t[1,2]/"),
(true, "select * from bar group by /t[1,2]/"),
(false, "select * from bar group by t1, t2, t3"),
(true, "select * from bar group by t1, t2, t3"),
(false, "select * from bar group by *"),
(true, "select * from bar group by *"),
(false, "select * from bar group by /not_a_match/"),
(true, "select * from bar group by /not_a_match/"),
] {
let params = vec![
("db", "foo"),
("q", query),
("chunked", if chunked { "true" } else { "false" }),
];
let stream = server.api_v1_query(&params, None).await.bytes_stream();
let values = stream
.map(|chunk| serde_json::from_slice(&chunk.unwrap()).unwrap())
.collect::<Vec<Value>>()
.await;
// Use a snapshot to assert on the output structure. This deserializes each emitted line
// as JSON first, then collects them into a Vec<Value> to serialize into a JSON array
// for the snapshot.
insta::with_settings!({
description => format!("query: {query}, chunked: {chunked}"),
}, {
insta::assert_json_snapshot!(values);
});
}
}
#[tokio::test]
async fn api_v1_query_group_by_with_nulls() {
let server = TestServer::spawn().await;
server
.write_lp_to_db(
"foo",
"\
bar,t1=a val=1 2998574931\n\
bar val=2 2998574932\n\
bar,t1=a val=3 2998574933\n\
",
Precision::Second,
)
.await
.unwrap();
for (chunked, query) in [
(false, "select * from bar group by t1"),
(true, "select * from bar group by t1"),
] {
let params = vec![
("db", "foo"),
("q", query),
("chunked", if chunked { "true" } else { "false" }),
];
let stream = server.api_v1_query(&params, None).await.bytes_stream();
let values = stream
.map(|chunk| serde_json::from_slice(&chunk.unwrap()).unwrap())
.collect::<Vec<Value>>()
.await;
// Use a snapshot to assert on the output structure. This deserializes each emitted line
// as JSON first, then collects them into a Vec<Value> to serialize into a JSON array
// for the snapshot.
insta::with_settings!({
description => format!("query: {query}, chunked: {chunked}"),
}, {
insta::assert_json_snapshot!(values);
});
}
}
#[tokio::test]
async fn api_v3_query_sql_distinct_cache() {
let server = TestServer::spawn().await;


@ -0,0 +1,111 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by /t[1,2]/, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,88 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, t2, t3, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,115 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, t2, t3, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb",
"t3": ""
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,84 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by *, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,111 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by *, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,41 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by /not_a_match/, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"values": [
[
"2065-01-07T17:28:51Z",
1.0
],
[
"2065-01-07T17:28:52Z",
2.0
],
[
"2065-01-07T17:28:53Z",
3.0
],
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,41 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by /not_a_match/, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"values": [
[
"2065-01-07T17:28:51Z",
1.0
],
[
"2065-01-07T17:28:52Z",
2.0
],
[
"2065-01-07T17:28:53Z",
3.0
],
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,71 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"t2",
"val"
],
"name": "bar",
"tags": {
"t1": "a"
},
"values": [
[
"2065-01-07T17:28:51Z",
"aa",
1.0
],
[
"2065-01-07T17:28:53Z",
"bb",
3.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"t2",
"val"
],
"name": "bar",
"tags": {
"t1": "b"
},
"values": [
[
"2065-01-07T17:28:52Z",
"aa",
2.0
],
[
"2065-01-07T17:28:54Z",
"bb",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,84 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, t2, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,111 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, t2, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,84 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by /t/, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,111 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by /t/, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,56 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by /t[1]/, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
],
[
"2065-01-07T17:28:54Z",
4.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
],
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,65 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by /t[1]/, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
],
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
],
[
"2065-01-07T17:28:54Z",
4.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,84 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by *, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:54Z",
4.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "b",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "bb"
},
"values": [
[
"2065-01-07T17:28:53Z",
3.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a",
"t2": "aa"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,62 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"t2",
"val"
],
"name": "bar",
"tags": {
"t1": "b"
},
"values": [
[
"2065-01-07T17:28:52Z",
"aa",
2.0
],
[
"2065-01-07T17:28:54Z",
"bb",
4.0
]
]
},
{
"columns": [
"time",
"t2",
"val"
],
"name": "bar",
"tags": {
"t1": "a"
},
"values": [
[
"2065-01-07T17:28:51Z",
"aa",
1.0
],
[
"2065-01-07T17:28:53Z",
"bb",
3.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,61 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, chunked: true"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": ""
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
}
],
"statement_id": 0
}
]
},
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
],
[
"2065-01-07T17:28:53Z",
3.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -0,0 +1,52 @@
---
source: influxdb3/tests/server/query.rs
description: "query: select * from bar group by t1, chunked: false"
expression: values
---
[
{
"results": [
{
"series": [
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": "a"
},
"values": [
[
"2065-01-07T17:28:51Z",
1.0
],
[
"2065-01-07T17:28:53Z",
3.0
]
]
},
{
"columns": [
"time",
"val"
],
"name": "bar",
"tags": {
"t1": ""
},
"values": [
[
"2065-01-07T17:28:52Z",
2.0
]
]
}
],
"statement_id": 0
}
]
}
]


@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
# Core Crates
influxdb_influxql_parser.workspace = true
iox_query.workspace = true
iox_query_params.workspace = true
trace.workspace = true


@ -2,6 +2,7 @@ use async_trait::async_trait;
use datafusion::arrow::error::ArrowError;
use datafusion::common::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use influxdb_influxql_parser::statement::Statement;
use iox_query::query_log::QueryLogEntries;
use iox_query::{QueryDatabase, QueryNamespace};
use iox_query_params::StatementParams;
@ -30,12 +31,21 @@ pub enum QueryExecutorError {
#[async_trait]
pub trait QueryExecutor: QueryDatabase + Debug + Send + Sync + 'static {
async fn query(
async fn query_sql(
&self,
database: &str,
q: &str,
params: Option<StatementParams>,
kind: QueryKind,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError>;
async fn query_influxql(
&self,
database_name: &str,
query_str: &str,
influxql_statement: Statement,
params: Option<StatementParams>,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError>;
@ -54,21 +64,6 @@ pub trait QueryExecutor: QueryDatabase + Debug + Send + Sync + 'static {
fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)>;
}
#[derive(Debug, Clone, Copy)]
pub enum QueryKind {
Sql,
InfluxQl,
}
impl QueryKind {
pub fn query_type(&self) -> &'static str {
match self {
Self::Sql => "sql",
Self::InfluxQl => "influxql",
}
}
}
#[derive(Debug, Copy, Clone)]
pub struct UnimplementedQueryExecutor;
@ -97,27 +92,34 @@ impl QueryDatabase for UnimplementedQueryExecutor {
#[async_trait]
impl QueryExecutor for UnimplementedQueryExecutor {
async fn query(
async fn query_sql(
&self,
_database: &str,
_q: &str,
_params: Option<StatementParams>,
_kind: QueryKind,
_span_ctx: Option<SpanContext>,
_external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
Err(QueryExecutorError::DatabaseNotFound {
db_name: "unimplemented".to_string(),
})
Err(QueryExecutorError::MethodNotImplemented("query_sql"))
}
async fn query_influxql(
&self,
_database_name: &str,
_query_str: &str,
_influxql_statement: Statement,
_params: Option<StatementParams>,
_span_ctx: Option<SpanContext>,
_external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
Err(QueryExecutorError::MethodNotImplemented("query_influxql"))
}
fn show_databases(
&self,
_include_deleted: bool,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
Err(QueryExecutorError::DatabaseNotFound {
db_name: "unimplemented".to_string(),
})
Err(QueryExecutorError::MethodNotImplemented("show_databases"))
}
async fn show_retention_policies(
@ -125,9 +127,9 @@ impl QueryExecutor for UnimplementedQueryExecutor {
_database: Option<&str>,
_span_ctx: Option<SpanContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
Err(QueryExecutorError::DatabaseNotFound {
db_name: "unimplemented".to_string(),
})
Err(QueryExecutorError::MethodNotImplemented(
"show_retention_policies",
))
}
fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)> {


@ -8,7 +8,7 @@ use futures::TryStreamExt;
use hashbrown::HashMap;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_id::TableId;
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind};
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_wal::{FieldData, WriteBatch};
use iox_query_params::StatementParams;
use observability_deps::tracing::{error, info, warn};
@ -139,14 +139,7 @@ impl PyPluginCallApi {
// Spawn the async task
let handle = tokio::spawn(async move {
let res = query_executor
.query(
db_schema_name.as_ref(),
&query,
params,
QueryKind::Sql,
None,
None,
)
.query_sql(db_schema_name.as_ref(), &query, params, None, None)
.await
.map_err(|e| PyValueError::new_err(format!("Error executing query: {}", e)))?;


@ -11,6 +11,7 @@ authz.workspace = true
data_types.workspace = true
datafusion_util.workspace = true
influxdb-line-protocol.workspace = true
influxdb_influxql_parser.workspace = true
iox_catalog.workspace = true
iox_http.workspace = true
iox_query.workspace = true
@ -67,6 +68,7 @@ mime.workspace = true
object_store.workspace = true
parking_lot.workspace = true
pin-project-lite.workspace = true
regex.workspace = true
secrecy.workspace = true
serde.workspace = true
serde_json.workspace = true


@ -23,7 +23,7 @@ use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, MaxAge, MaxCardinality};
use influxdb3_cache::last_cache;
use influxdb3_catalog::catalog::Error as CatalogError;
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind};
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError};
use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION};
use influxdb3_processing_engine::manager::ProcessingEngineManager;
use influxdb3_wal::{PluginType, TriggerSpecificationDefinition};
@ -32,6 +32,8 @@ use influxdb3_write::write_buffer::Error as WriteBufferError;
use influxdb3_write::BufferedWriteRequest;
use influxdb3_write::Precision;
use influxdb3_write::WriteBuffer;
use influxdb_influxql_parser::select::GroupByClause;
use influxdb_influxql_parser::statement::Statement;
use iox_http::write::single_tenant::SingleTenantRequestUnifier;
use iox_http::write::v1::V1_NAMESPACE_RP_SEPARATOR;
use iox_http::write::{WriteParseError, WriteRequestUnifier};
@ -547,7 +549,7 @@ where
let stream = self
.query_executor
.query(&database, &query_str, params, QueryKind::Sql, None, None)
.query_sql(&database, &query_str, params, None, None)
.await?;
Response::builder()
@ -567,7 +569,7 @@ where
info!(?database, %query_str, ?format, "handling query_influxql");
let stream = self
let (stream, _) = self
.query_influxql_inner(database, &query_str, params)
.await?;
@ -733,7 +735,7 @@ where
database: Option<String>,
query_str: &str,
params: Option<StatementParams>,
) -> Result<SendableRecordBatchStream> {
) -> Result<(SendableRecordBatchStream, Option<GroupByClause>)> {
let mut statements = rewrite::parse_statements(query_str)?;
if statements.len() != 1 {
@ -756,31 +758,29 @@ where
}
};
if statement.statement().is_show_databases() {
self.query_executor.show_databases(true)
} else if statement.statement().is_show_retention_policies() {
let statement = statement.to_statement();
let group_by = match &statement {
Statement::Select(select_statement) => select_statement.group_by.clone(),
_ => None,
};
let stream = if statement.is_show_databases() {
self.query_executor.show_databases(true)?
} else if statement.is_show_retention_policies() {
self.query_executor
.show_retention_policies(database.as_deref(), None)
.await
.await?
} else {
let Some(database) = database else {
return Err(Error::InfluxqlNoDatabase);
};
self.query_executor
.query(
&database,
// TODO - implement an interface that takes the statement directly,
// so we don't need to double down on the parsing
&statement.to_statement().to_string(),
params,
QueryKind::InfluxQl,
None,
None,
)
.await
}
.map_err(Into::into)
.query_influxql(&database, query_str, statement, params, None, None)
.await?
};
Ok((stream, group_by))
}
/// Create a new distinct value cache given the [`DistinctCacheCreateRequest`] arguments in the request


@ -1,5 +1,6 @@
use std::{
collections::{HashMap, VecDeque},
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
task::{Context, Poll},
@ -17,15 +18,18 @@ use arrow::{
record_batch::RecordBatch,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
use chrono::{format::SecondsFormat, DateTime};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{ready, stream::Fuse, Stream, StreamExt};
use hyper::http::HeaderValue;
use hyper::{header::ACCEPT, header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use influxdb_influxql_parser::select::{Dimension, GroupByClause};
use iox_time::TimeProvider;
use observability_deps::tracing::info;
use schema::{INFLUXQL_MEASUREMENT_COLUMN_NAME, TIME_COLUMN_NAME};
use regex::Regex;
use schema::{InfluxColumnType, INFLUXQL_MEASUREMENT_COLUMN_NAME, TIME_COLUMN_NAME};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -73,9 +77,9 @@ where
// TODO - Currently not supporting parameterized queries, see
// https://github.com/influxdata/influxdb/issues/24805
let stream = self.query_influxql_inner(database, &query, None).await?;
let stream =
QueryResponseStream::new(0, stream, chunk_size, format, epoch).map_err(QueryError)?;
let (stream, group_by) = self.query_influxql_inner(database, &query, None).await?;
let stream = QueryResponseStream::new(0, stream, chunk_size, format, epoch, group_by)
.map_err(QueryError)?;
let body = Body::wrap_stream(stream);
Ok(Response::builder()
@ -249,7 +253,7 @@ enum Precision {
/// [`anyhow::Error`] is used as a catch-all because if anything fails during
/// that process it will result in a 500 INTERNAL ERROR.
#[derive(Debug, thiserror::Error)]
#[error("unexpected query error: {0}")]
#[error("unexpected query error: {0:#}")]
pub struct QueryError(#[from] anyhow::Error);
/// The response structure returned by the v1 query API
@ -354,6 +358,8 @@ struct StatementResponse {
#[derive(Debug, Serialize)]
struct Series {
name: String,
#[serde(skip_serializing_if = "Option::is_none")]
tags: Option<HashMap<String, Option<String>>>,
columns: Vec<String>,
values: Vec<Row>,
}
@ -362,14 +368,29 @@ struct Series {
#[derive(Debug, Serialize)]
struct Row(Vec<Value>);
impl Deref for Row {
type Target = Vec<Value>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for Row {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// A buffer for storing records from a stream of [`RecordBatch`]es
///
/// The optional `size` indicates whether this is operating in `chunked` mode (see
/// [`QueryResponseStream`]), and when specified, gives the size of chunks that will
/// be emitted.
#[derive(Debug)]
struct ChunkBuffer {
size: Option<usize>,
series: VecDeque<(String, Vec<Row>)>,
series: VecDeque<(String, BufferGroupByTagSet, Vec<Row>)>,
}
impl ChunkBuffer {
@ -382,27 +403,80 @@ impl ChunkBuffer {
/// Get the name of the current measurement [`Series`] being streamed
fn current_measurement_name(&self) -> Option<&str> {
self.series.front().map(|(n, _)| n.as_str())
self.series.front().map(|(n, _, _)| n.as_str())
}
/// For queries that produce multiple [`Series`], this will be called when
/// the current series is completed streaming
fn push_next_measurement<S: Into<String>>(&mut self, name: S) {
self.series.push_front((name.into(), vec![]));
self.series.push_front((name.into(), None, vec![]));
}
/// Push a new [`Row`] into the current measurement [`Series`]
fn push_row(&mut self, row: Row) -> Result<(), anyhow::Error> {
self.series
.front_mut()
.context("tried to push row with no measurements buffered")?
.1
.push(row);
///
/// If the stream is producing tags that are part of a `GROUP BY` clause, then `group_by` should
/// hold a map of those tag keys to tag values for the given row.
fn push_row(
&mut self,
group_by: Option<HashMap<String, Option<String>>>,
row: Row,
) -> Result<(), anyhow::Error> {
let (_, tags, rows) = self
.series
.front()
.context("tried to push row with no measurements buffered")?;
// Usually series are split on the measurement name. This function is not concerned with
// that split, as the caller does that. However, if we are processing a query with a `GROUP BY`
// clause, then we make the decision here. If the incoming `group_by` tag key/value pairs do
// not match those for the current row set, then we need to start a new entry in the
// `series` on the chunk buffer.
use BufferGroupByDecision::*;
let group_by_decision = match (tags, &group_by) {
(None, None) => NotAGroupBy,
(Some(tags), Some(group_by)) => {
if group_by.len() == tags.len() {
if group_by == tags {
NewRowInExistingSet
} else {
NewSet
}
} else {
bail!(
"group by columns in query result and chunk buffer are not the same size"
);
}
}
(None, Some(_)) => {
if rows.is_empty() {
FirstRowInSeries
} else {
bail!("received inconsistent group by tags in query result");
}
}
(Some(_), None) => bail!(
"chunk buffer expects group by tags but none were present in the query result"
),
};
match group_by_decision {
NotAGroupBy | NewRowInExistingSet => self.series.front_mut().unwrap().2.push(row),
FirstRowInSeries => {
let (_, tags, rows) = self.series.front_mut().unwrap();
*tags = group_by;
rows.push(row);
}
NewSet => {
let name = self.series.front().unwrap().0.clone();
self.series.push_front((name, group_by, vec![row]));
}
}
Ok(())
}
/// Flush a single chunk from the [`ChunkBuffer`], if possible
fn flush_one(&mut self) -> Option<(String, Vec<Row>)> {
fn flush_one(&mut self) -> Option<(String, BufferGroupByTagSet, Vec<Row>)> {
if !self.can_flush() {
return None;
}
@ -411,23 +485,23 @@ impl ChunkBuffer {
if self
.series
.back()
.is_some_and(|(_, rows)| rows.len() <= size)
.is_some_and(|(_, _, rows)| rows.len() <= size)
{
// the back series is smaller than the chunk size, so we just
// pop and take the whole thing:
self.series.pop_back()
} else {
// only drain a chunk's worth from the back series:
self.series
.back_mut()
.map(|(name, rows)| (name.to_owned(), rows.drain(..size).collect()))
self.series.back_mut().map(|(name, tags, rows)| {
(name.to_owned(), tags.clone(), rows.drain(..size).collect())
})
}
}
/// The [`ChunkBuffer`] is operating in chunked mode, and can flush a chunk
fn can_flush(&self) -> bool {
if let (Some(size), Some(m)) = (self.size, self.series.back()) {
m.1.len() >= size || self.series.len() > 1
m.2.len() >= size || self.series.len() > 1
} else {
false
}
@ -439,6 +513,24 @@ impl ChunkBuffer {
}
}
/// Convenience type for representing an optional map of tag name to optional tag values
type BufferGroupByTagSet = Option<HashMap<String, Option<String>>>;
/// Decide how to handle an incoming set of `GROUP BY` tag key value pairs when pushing a row into
/// the `ChunkBuffer`
enum BufferGroupByDecision {
/// The query is not using a `GROUP BY` with tags
NotAGroupBy,
/// This is the first time a row has been pushed to the series with this `GROUP BY` tag
/// key/value combination
FirstRowInSeries,
/// Still adding rows to the current set of `GROUP BY` tag key/value pairs
NewRowInExistingSet,
/// The incoming set of `GROUP BY` tag key/value pairs does not match, so we need to start a
/// new row set in the series.
NewSet,
}
/// The state of the [`QueryResponseStream`]
enum State {
/// The initial state of the stream; no query results have been streamed
@ -473,7 +565,7 @@ impl State {
struct QueryResponseStream {
buffer: ChunkBuffer,
input: Fuse<SendableRecordBatchStream>,
column_map: HashMap<String, usize>,
column_map: ColumnMap,
statement_id: usize,
format: QueryFormat,
epoch: Option<Precision>,
@ -490,22 +582,11 @@ impl QueryResponseStream {
chunk_size: Option<usize>,
format: QueryFormat,
epoch: Option<Precision>,
group_by_clause: Option<GroupByClause>,
) -> Result<Self, anyhow::Error> {
let buffer = ChunkBuffer::new(chunk_size);
let schema = input.schema();
let column_map = schema
.fields
.iter()
.map(|f| f.name().to_owned())
.enumerate()
.flat_map(|(i, n)| {
if n != INFLUXQL_MEASUREMENT_COLUMN_NAME && i > 0 {
Some((n, i - 1))
} else {
None
}
})
.collect();
let buffer = ChunkBuffer::new(chunk_size);
let column_map = ColumnMap::new(schema, group_by_clause)?;
Ok(Self {
buffer,
column_map,
@ -543,7 +624,8 @@ impl QueryResponseStream {
let schema = batch.schema();
for row_index in 0..batch.num_rows() {
let mut row = vec![Value::Null; column_map.len()];
let mut row = vec![Value::Null; column_map.row_size()];
let mut tags = None;
for (col_index, column) in columns.iter().enumerate() {
let field = schema.field(col_index);
@ -577,32 +659,43 @@ impl QueryResponseStream {
cell_value = convert_ns_epoch(cell_value, precision)?
}
}
let col_position = column_map
.get(column_name)
.context("failed to retrieve column position")?;
row[*col_position] = cell_value;
if let Some(index) = column_map.as_row_index(column_name) {
row[index] = cell_value;
} else if column_map.is_group_by_tag(column_name) {
let tag_val = match cell_value {
Value::Null => None,
Value::String(s) => Some(s),
other => bail!(
"tag column {column_name} expected as a string or null, got {other:?}"
),
};
tags.get_or_insert_with(HashMap::new)
.insert(column_name.to_string(), tag_val);
} else if column_map.is_orphan_group_by_tag(column_name) {
tags.get_or_insert_with(HashMap::new)
.insert(column_name.to_string(), Some(String::default()));
} else {
bail!("failed to retrieve column position for column with name {column_name}");
}
}
self.buffer.push_row(Row(row))?;
self.buffer.push_row(tags.take(), Row(row))?;
}
Ok(())
}
fn columns(&self) -> Vec<String> {
let mut columns = vec!["".to_string(); self.column_map.len()];
self.column_map
.iter()
.for_each(|(k, i)| k.clone_into(&mut columns[*i]));
columns
fn column_names(&self) -> Vec<String> {
self.column_map.row_column_names()
}
/// Flush a single chunk, or time series, when operating in chunked mode
fn flush_one(&mut self) -> QueryResponse {
let columns = self.columns();
let columns = self.column_names();
// this unwrap is okay because we only ever call flush_one
// after calling can_flush on the buffer:
let (name, values) = self.buffer.flush_one().unwrap();
let (name, tags, values) = self.buffer.flush_one().unwrap();
let series = vec![Series {
name,
tags,
columns,
values,
}];
@ -618,13 +711,14 @@ impl QueryResponseStream {
/// Flush the entire buffer
fn flush_all(&mut self) -> QueryResponse {
let columns = self.columns();
let columns = self.column_names();
let series = self
.buffer
.series
.drain(..)
.map(|(name, values)| Series {
.map(|(name, tags, values)| Series {
name,
tags,
columns: columns.clone(),
values,
})
@ -771,6 +865,191 @@ fn cast_column_value(column: &ArrayRef, row_index: usize) -> Result<Value, anyho
Ok(value)
}
/// Map column names to their respective [`ColumnType`]
struct ColumnMap {
/// The map of column names to column types
map: HashMap<String, ColumnType>,
/// How many columns are in the `values` set, i.e., that are not `GROUP BY` tags
row_size: usize,
}
/// A column's type in the context of a v1 /query API response
enum ColumnType {
/// A value to be included in the `series.[].values` array, at the given `index`
Value { index: usize },
/// A tag that is part of the `GROUP BY` clause, either explicitly, or by a regex/wildcard match
/// and is included in the `series.[].tags` map
GroupByTag,
/// This is a case where a GROUP BY clause contains a field which doesn't exist in the table
///
/// For example,
/// ```text
/// select * from foo group by t1, t2
/// ```
/// If `t1` is a tag in the table, but `t2` is not, nor is a field in the table, then the v1
/// /query API response will include `t2` in the `series.[].tags` property in the results with
/// an empty string for a value (`""`).
OrphanGroupByTag,
}
impl ColumnMap {
/// Create a new `ColumnMap`
fn new(
schema: SchemaRef,
group_by_clause: Option<GroupByClause>,
) -> Result<Self, anyhow::Error> {
let mut map = HashMap::new();
let group_by = if let Some(clause) = group_by_clause {
GroupByEval::from_clause(clause)?
} else {
None
};
let mut index = 0;
for field in schema
.fields()
.into_iter()
.filter(|f| f.name() != INFLUXQL_MEASUREMENT_COLUMN_NAME)
{
if group_by
.as_ref()
.is_some_and(|gb| is_tag(field) && gb.evaluate_tag(field.name()))
{
map.insert(field.name().to_string(), ColumnType::GroupByTag);
} else if group_by.as_ref().is_some_and(|gb| {
field.metadata().is_empty() && gb.contains_explicit_col_name(field.name())
}) {
map.insert(field.name().to_string(), ColumnType::OrphanGroupByTag);
} else {
map.insert(field.name().to_string(), ColumnType::Value { index });
index += 1;
}
}
Ok(Self {
map,
row_size: index,
})
}
fn row_size(&self) -> usize {
self.row_size
}
fn row_column_names(&self) -> Vec<String> {
let mut result = vec![None; self.row_size];
self.map.iter().for_each(|(name, c)| {
if let ColumnType::Value { index } = c {
result[*index].replace(name.to_owned());
}
});
result.into_iter().flatten().collect()
}
/// If this column is part of the `values` row data, get its index, or `None` otherwise
fn as_row_index(&self, column_name: &str) -> Option<usize> {
self.map.get(column_name).and_then(|col| match col {
ColumnType::Value { index } => Some(*index),
ColumnType::GroupByTag | ColumnType::OrphanGroupByTag => None,
})
}
/// This column is a `GROUP BY` tag
fn is_group_by_tag(&self, column_name: &str) -> bool {
self.map
.get(column_name)
.is_some_and(|col| matches!(col, ColumnType::GroupByTag))
}
/// This column is an orphan `GROUP BY` tag
fn is_orphan_group_by_tag(&self, column_name: &str) -> bool {
self.map
.get(column_name)
.is_some_and(|col| matches!(col, ColumnType::OrphanGroupByTag))
}
}
// TODO: this is defined in schema crate, so needs to be made pub there:
const COLUMN_METADATA_KEY: &str = "iox::column::type";
/// Decide based on metadata if this [`Field`] is a tag column
fn is_tag(field: &Arc<Field>) -> bool {
field
.metadata()
.get(COLUMN_METADATA_KEY)
.map(|s| InfluxColumnType::try_from(s.as_str()))
.transpose()
.ok()
.flatten()
.is_some_and(|t| matches!(t, InfluxColumnType::Tag))
}
/// Derived from a [`GroupByClause`] and used to evaluate whether a given tag column is part of the
/// `GROUP BY` clause in an InfluxQL query
struct GroupByEval(Vec<GroupByEvalType>);
/// The kind of `GROUP BY` evaluator
enum GroupByEvalType {
/// An explicit tag name in a `GROUP BY` clause, e.g., `GROUP BY t1, t2`
Tag(String),
/// A regex in a `GROUP BY` that could match 0-or-more tags, e.g., `GROUP BY /t[1,2]/`
Regex(Regex),
/// A wildcard that matches all tags, e.g., `GROUP BY *`
Wildcard,
}
impl GroupByEval {
/// Convert a [`GroupByClause`] to a [`GroupByEval`] if any of its members match on tag columns
///
/// This will produce an error if an invalid regex is provided as one of the `GROUP BY` clauses.
/// That will likely be caught upstream during query parsing, but handle it here anyway.
fn from_clause(clause: GroupByClause) -> Result<Option<Self>, anyhow::Error> {
let v = clause
.iter()
.filter_map(|dim| match dim {
Dimension::Time(_) => None,
Dimension::VarRef(tag) => Some(Ok(GroupByEvalType::Tag(tag.to_string()))),
Dimension::Regex(regex) => Some(
Regex::new(regex.as_str())
.map(GroupByEvalType::Regex)
.context("invalid regex in group by clause"),
),
Dimension::Wildcard => Some(Ok(GroupByEvalType::Wildcard)),
})
.collect::<Result<Vec<GroupByEvalType>, anyhow::Error>>()?;
if v.is_empty() {
Ok(None)
} else {
Ok(Some(Self(v)))
}
}
/// Check if a tag is matched by this set of `GROUP BY` clauses
fn evaluate_tag(&self, tag_name: &str) -> bool {
self.0.iter().any(|eval| eval.test(tag_name))
}
/// Check if the tag name is included explicitly in the `GROUP BY` clause.
///
/// This is for determining orphan `GROUP BY` tag columns.
fn contains_explicit_col_name(&self, col_name: &str) -> bool {
self.0.iter().any(|eval| match eval {
GroupByEvalType::Tag(t) => t == col_name,
_ => false,
})
}
}
impl GroupByEvalType {
/// Test the given `tag_name` against this evaluator
fn test(&self, tag_name: &str) -> bool {
match self {
Self::Tag(t) => t == tag_name,
Self::Regex(regex) => regex.is_match(tag_name),
Self::Wildcard => true,
}
}
}
impl Stream for QueryResponseStream {
type Item = Result<QueryResponse, anyhow::Error>;
@ -808,6 +1087,7 @@ impl Stream for QueryResponseStream {
// this is why the input stream is fused, because we will end up
// polling the input stream again if we end up here.
Poll::Ready(Some(Ok(self.flush_all())))
} else if self.state.is_initialized() {
// we are still in an initialized state, which means no records were buffered
// and therefore we need to emit an empty result set before ending the stream:


@ -21,10 +21,11 @@ use datafusion_util::MemoryStream;
use influxdb3_cache::distinct_cache::{DistinctCacheFunction, DISTINCT_CACHE_UDTF_NAME};
use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind};
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::WriteBuffer;
use influxdb_influxql_parser::statement::Statement;
use iox_query::exec::{Executor, IOxSessionContext, QueryConfig};
use iox_query::provider::ProviderBuilder;
use iox_query::query_log::QueryLog;
@ -109,35 +110,66 @@ impl QueryExecutorImpl {
sys_events_store,
}
}
async fn get_db_namespace(
&self,
database_name: &str,
span_ctx: &Option<SpanContext>,
) -> Result<Arc<dyn QueryNamespace>, QueryExecutorError> {
self.namespace(
database_name,
span_ctx.child_span("get_db_namespace"),
false,
)
.await
.map_err(|_| QueryExecutorError::DatabaseNotFound {
db_name: database_name.to_string(),
})?
.ok_or_else(|| QueryExecutorError::DatabaseNotFound {
db_name: database_name.to_string(),
})
}
}
#[async_trait]
impl QueryExecutor for QueryExecutorImpl {
async fn query(
async fn query_sql(
&self,
database: &str,
query: &str,
params: Option<StatementParams>,
kind: QueryKind,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
info!(%database, %query, ?params, ?kind, "QueryExecutorImpl as QueryExecutor::query");
let db = self
.namespace(database, span_ctx.child_span("get database"), false)
.await
.map_err(|_| QueryExecutorError::DatabaseNotFound {
db_name: database.to_string(),
})?
.ok_or_else(|| QueryExecutorError::DatabaseNotFound {
db_name: database.to_string(),
})?;
query_database(
info!(%database, %query, ?params, "executing sql query");
let db = self.get_db_namespace(database, &span_ctx).await?;
query_database_sql(
db,
query,
params,
kind,
span_ctx,
external_span_ctx,
Arc::clone(&self.telemetry_store),
)
.await
}
async fn query_influxql(
&self,
database: &str,
query: &str,
influxql_statement: Statement,
params: Option<StatementParams>,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
info!(database, query, ?params, "executing influxql query");
let db = self.get_db_namespace(database, &span_ctx).await?;
query_database_influxql(
db,
query,
influxql_statement,
params,
span_ctx,
external_span_ctx,
Arc::clone(&self.telemetry_store),
@ -232,11 +264,12 @@ impl QueryExecutor for QueryExecutorImpl {
}
}
async fn query_database(
// NOTE: this method is separated out as it is called from a separate query executor
// implementation in Enterprise
async fn query_database_sql(
db: Arc<dyn QueryNamespace>,
query: &str,
params: Option<StatementParams>,
kind: QueryKind,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
telemetry_store: Arc<TelemetryStore>,
@ -245,7 +278,7 @@ async fn query_database(
let token = db.record_query(
external_span_ctx.as_ref().map(RequestLogContext::ctx),
kind.query_type(),
"sql",
Box::new(query.to_string()),
params.clone(),
);
@ -258,12 +291,7 @@ async fn query_database(
// Perform query planning on a separate threadpool than the IO runtime that is servicing
// this request by using `IOxSessionContext::run`.
let plan = ctx
.run(async move {
match kind {
QueryKind::Sql => planner.sql(query, params).await,
QueryKind::InfluxQl => planner.influxql(query, params).await,
}
})
.run(async move { planner.sql(query, params).await })
.await;
let plan = match plan.map_err(QueryExecutorError::QueryPlanning) {
@ -292,6 +320,55 @@ async fn query_database(
}
}
async fn query_database_influxql(
db: Arc<dyn QueryNamespace>,
query_str: &str,
statement: Statement,
params: Option<StatementParams>,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
telemetry_store: Arc<TelemetryStore>,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
let params = params.unwrap_or_default();
let token = db.record_query(
external_span_ctx.as_ref().map(RequestLogContext::ctx),
"influxql",
Box::new(query_str.to_string()),
params.clone(),
);
let ctx = db.new_query_context(span_ctx, Default::default());
let planner = Planner::new(&ctx);
let plan = ctx
.run(async move { planner.influxql(statement, params).await })
.await;
let plan = match plan.map_err(QueryExecutorError::QueryPlanning) {
Ok(plan) => plan,
Err(e) => {
token.fail();
return Err(e);
}
};
let token = token.planned(&ctx, Arc::clone(&plan));
let token = token.permit();
telemetry_store.update_num_queries();
match ctx.execute_stream(Arc::clone(&plan)).await {
Ok(query_results) => {
token.success();
Ok(query_results)
}
Err(err) => {
token.fail();
Err(QueryExecutorError::ExecuteStream(err))
}
}
}
#[derive(Debug)]
struct RetentionPolicyRow {
database: String,
@ -682,7 +759,7 @@ mod tests {
parquet_cache::test_cached_obj_store_and_oracle,
};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind};
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
@ -895,7 +972,7 @@ mod tests {
for t in test_cases {
let batch_stream = query_executor
.query(db_name, t.query, None, QueryKind::Sql, None, None)
.query_sql(db_name, t.query, None, None, None)
.await
.unwrap();
let batches: Vec<RecordBatch> = batch_stream.try_collect().await.unwrap();


@ -1,6 +1,15 @@
use std::sync::Arc;
use std::{any::Any, sync::Arc};
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use arrow_schema::SchemaRef;
use datafusion::{
error::DataFusionError,
execution::{SendableRecordBatchStream, TaskContext},
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
},
};
use influxdb_influxql_parser::statement::Statement;
use iox_query::{exec::IOxSessionContext, frontend::sql::SqlQueryPlanner};
use iox_query_influxql::frontend::planner::InfluxQLQueryPlanner;
use iox_query_params::StatementParams;
@ -41,12 +50,119 @@ impl Planner {
/// Plan an InfluxQL query and return a DataFusion physical plan
pub(crate) async fn influxql(
&self,
query: impl AsRef<str> + Send,
statement: Statement,
params: impl Into<StatementParams> + Send,
) -> Result<Arc<dyn ExecutionPlan>> {
let query = query.as_ref();
let ctx = self.ctx.child_ctx("rest_api_query_planner_influxql");
InfluxQLQueryPlanner::query(query, params, &ctx).await
let logical_plan = InfluxQLQueryPlanner::statement_to_plan(statement, params, &ctx).await?;
let input = ctx.create_physical_plan(&logical_plan).await?;
let input_schema = input.schema();
let mut md = input_schema.metadata().clone();
md.extend(logical_plan.schema().metadata().clone());
let schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
input_schema.fields().clone(),
md,
));
Ok(Arc::new(SchemaExec::new(input, schema)))
}
}
// NOTE: the below code is currently copied from IOx and needs to be made pub so we can
// re-use it.
/// A physical operator that overrides the `schema` API,
/// to return an amended version owned by `SchemaExec`. The
/// principal use case is to add additional metadata to the schema.
struct SchemaExec {
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
/// Cache holding plan properties like equivalences, output partitioning, output ordering etc.
cache: PlanProperties,
}
impl SchemaExec {
fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
let cache = Self::compute_properties(&input, Arc::clone(&schema));
Self {
input,
schema,
cache,
}
}
/// This function creates the cache object that stores the plan properties such as equivalence properties, partitioning, ordering, etc.
fn compute_properties(input: &Arc<dyn ExecutionPlan>, schema: SchemaRef) -> PlanProperties {
let eq_properties = match input.properties().output_ordering() {
None => EquivalenceProperties::new(schema),
Some(output_ordering) => {
EquivalenceProperties::new_with_orderings(schema, &[output_ordering.to_vec()])
}
};
let output_partitioning = input.output_partitioning().clone();
PlanProperties::new(eq_properties, output_partitioning, input.execution_mode())
}
}
impl std::fmt::Debug for SchemaExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.fmt_as(DisplayFormatType::Default, f)
}
}
impl ExecutionPlan for SchemaExec {
fn name(&self) -> &str {
Self::static_name()
}
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn properties(&self) -> &PlanProperties {
&self.cache
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.input.execute(partition, context)
}
fn statistics(&self) -> Result<datafusion::physical_plan::Statistics, DataFusionError> {
Ok(datafusion::physical_plan::Statistics::new_unknown(
&self.schema(),
))
}
}
impl DisplayAs for SchemaExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "SchemaExec")
}
}
}
}