Merge branch 'main' into 7899/wal-disk-metrics

pull/24376/head
Dom 2023-07-06 14:44:11 +01:00 committed by GitHub
commit a005f344d8
30 changed files with 1190 additions and 115 deletions

Cargo.lock (generated)

@ -379,7 +379,7 @@ dependencies = [
"arrow-schema",
"arrow-select",
"regex",
"regex-syntax 0.7.2",
"regex-syntax 0.7.3",
]
[[package]]
@ -485,9 +485,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.70"
version = "0.1.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79fa67157abdfd688a259b6648808757db9347af834624f27ec646da976aee5d"
checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf"
dependencies = [
"proc-macro2",
"quote",
@ -686,7 +686,7 @@ checksum = "a246e68bb43f6cd9db24bea052a53e40405417c5fb372e3d1a8a7f770a564ef5"
dependencies = [
"memchr",
"once_cell",
"regex-automata",
"regex-automata 0.1.10",
"serde",
]
@ -841,9 +841,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.3.10"
version = "4.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384e169cc618c613d5e3ca6404dda77a8685a63e08660dcc64abaf7da7cb0c7a"
checksum = "1640e5cc7fb47dbb8338fd471b105e7ed6c3cb2aeb00c2e067127ffd3764a05d"
dependencies = [
"clap_builder",
"clap_derive",
@ -873,9 +873,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.3.10"
version = "4.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef137bbe35aab78bdb468ccfba75a5f4d8321ae011d34063770780545176af2d"
checksum = "98c59138d527eeaf9b53f35a77fcc1fad9d883116070c63d5de1c7dc7b00c72b"
dependencies = [
"anstream",
"anstyle",
@ -1458,7 +1458,7 @@ dependencies = [
"hashbrown 0.14.0",
"itertools 0.10.5",
"log",
"regex-syntax 0.7.2",
"regex-syntax 0.7.3",
]
[[package]]
@ -1723,7 +1723,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5"
dependencies = [
"cfg-if",
"rustix 0.38.2",
"rustix 0.38.3",
"windows-sys 0.48.0",
]
@ -2287,7 +2287,7 @@ checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7"
dependencies = [
"http",
"hyper",
"rustls 0.21.2",
"rustls 0.21.3",
"tokio",
"tokio-rustls 0.24.1",
]
@ -3115,7 +3115,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24fddda5af7e54bf7da53067d6e802dbcc381d0a8eef629df528e3ebf68755cb"
dependencies = [
"hermit-abi",
"rustix 0.38.2",
"rustix 0.38.3",
"windows-sys 0.48.0",
]
@ -3337,7 +3337,7 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
"regex-automata 0.1.10",
]
[[package]]
@ -4410,7 +4410,7 @@ dependencies = [
"itertools 0.11.0",
"once_cell",
"regex",
"regex-syntax 0.7.2",
"regex-syntax 0.7.3",
"schema",
"snafu",
"tokio",
@ -4537,13 +4537,14 @@ dependencies = [
[[package]]
name = "regex"
version = "1.8.4"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
checksum = "89089e897c013b3deb627116ae56a6955a72b8bed395c9526af31c9fe528b484"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.2",
"regex-automata 0.3.0",
"regex-syntax 0.7.3",
]
[[package]]
@ -4555,6 +4556,17 @@ dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
name = "regex-automata"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa250384981ea14565685dea16a9ccc4d1c541a13f82b9c168572264d1df8c56"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.3",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
@ -4563,9 +4575,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.7.2"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
checksum = "2ab07dc67230e4a4718e70fd5c20055a4334b121f1f9db8fe63ef39ce9b8c846"
[[package]]
name = "reqwest"
@ -4590,7 +4602,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.2",
"rustls 0.21.3",
"rustls-pemfile",
"serde",
"serde_json",
@ -4717,9 +4729,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.2"
version = "0.38.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabcb0461ebd01d6b79945797c27f8529082226cb630a9865a71870ff63532a4"
checksum = "ac5ffa1efe7548069688cd7028f32591853cd7b5b756d41bcffd2353e4fc75b4"
dependencies = [
"bitflags 2.3.3",
"errno",
@ -4742,13 +4754,13 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.2"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e32ca28af694bc1bbf399c33a516dbdf1c90090b8ab23c2bc24f834aa2247f5f"
checksum = "b19faa85ecb5197342b54f987b142fb3e30d0c90da40f80ef4fa9a726e6676ed"
dependencies = [
"log",
"ring",
"rustls-webpki",
"rustls-webpki 0.101.1",
"sct",
]
@ -4771,6 +4783,16 @@ dependencies = [
"untrusted",
]
[[package]]
name = "rustls-webpki"
version = "0.101.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.12"
@ -4963,6 +4985,7 @@ dependencies = [
"serde_json",
"service_common",
"snafu",
"test_helpers",
"tokio",
"tonic",
"trace",
@ -5159,9 +5182,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.10.0"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
[[package]]
name = "snafu"
@ -5766,7 +5789,7 @@ version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls 0.21.2",
"rustls 0.21.3",
"tokio",
]
@ -5808,9 +5831,9 @@ dependencies = [
[[package]]
name = "toml"
version = "0.7.5"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ebafdf5ad1220cb59e7d17cf4d2c72015297b75b19a10472f99b89225089240"
checksum = "c17e963a819c331dcacd7ab957d80bc2b9a9c1e71c804826d2f283dd65306542"
dependencies = [
"serde",
"serde_spanned",
@ -5829,9 +5852,9 @@ dependencies = [
[[package]]
name = "toml_edit"
version = "0.19.11"
version = "0.19.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "266f016b7f039eec8a1a80dfe6156b633d208b9fccca5e4db1d6775b0c4e34a7"
checksum = "c500344a19072298cd05a7224b3c0c629348b78692bf48466c5238656e315a78"
dependencies = [
"indexmap 2.0.0",
"serde",
@ -6435,7 +6458,7 @@ version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338"
dependencies = [
"rustls-webpki",
"rustls-webpki 0.100.1",
]
[[package]]
@ -6709,11 +6732,12 @@ dependencies = [
"rand",
"rand_core",
"regex",
"regex-syntax 0.7.2",
"regex-automata 0.3.0",
"regex-syntax 0.7.3",
"reqwest",
"ring",
"rustix 0.38.2",
"rustls 0.21.2",
"rustix 0.38.3",
"rustls 0.21.3",
"scopeguard",
"serde",
"serde_json",


@ -16,7 +16,7 @@ comfy-table = { version = "7.0", default-features = false }
hashbrown = { workspace = true }
num-traits = "0.2"
once_cell = { version = "1.18", features = ["parking_lot"] }
regex = "1.8.4"
regex = "1.9.0"
snafu = "0.7"
uuid = "1"
workspace-hack = { version = "0.1", path = "../workspace-hack" }


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.70"
async-trait = "0.1.71"
backoff = { path = "../backoff" }
futures = "0.3"
iox_time = { path = "../iox_time" }


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.70"
async-trait = "0.1.71"
backoff = { path = "../backoff" }
bytes = "1.4"
compactor_scheduler = { path = "../compactor_scheduler" }


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.70"
async-trait = "0.1.71"
backoff = { path = "../backoff" }
data_types = { path = "../data_types" }
iox_catalog = { path = "../iox_catalog" }


@ -7,7 +7,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.70"
async-trait = "0.1.71"
backoff = { path = "../backoff" }
compactor = { path = "../compactor" }
compactor_scheduler = { path = "../compactor_scheduler" }


@ -426,6 +426,20 @@ mod influxql {
.await;
}
/// Test TOP/BOTTOM functions, which use window functions to project
/// the top or bottom rows in groups.
#[tokio::test]
async fn top_bottom() {
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/top_bottom.influxql",
chunk_stage: ChunkStage::Ingester,
}
.run()
.await;
}
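The rewrite named in the doc comment above can be made concrete: top(field, n) keeps the n largest values per group (bottom the n smallest), the window sort breaks ties by earliest time, and the projected rows are then re-sorted by time for output. A minimal, self-contained Rust sketch of those row-selection semantics (illustrative only, not IOx code):

use std::collections::HashMap;

#[derive(Debug, Clone)]
struct Row {
    group: &'static str, // stands in for the GROUP BY tag value
    time: i64,           // nanosecond timestamp
    value: f64,          // the selector's field argument
}

fn top_n_per_group(rows: &[Row], n: usize, bottom: bool) -> Vec<Row> {
    let mut groups: HashMap<&str, Vec<Row>> = HashMap::new();
    for r in rows {
        groups.entry(r.group).or_default().push(r.clone());
    }
    let mut out = Vec::new();
    for (_, mut members) in groups {
        // Mirrors the ROW_NUMBER() ordering: field DESC (ASC for bottom),
        // time ASC as the tie-breaker.
        members.sort_by(|a, b| {
            let by_value = if bottom {
                a.value.total_cmp(&b.value)
            } else {
                b.value.total_cmp(&a.value)
            };
            by_value.then(a.time.cmp(&b.time))
        });
        // Mirrors the ROW_NUMBER() <= n filter.
        out.extend(members.into_iter().take(n));
    }
    // The outer plan re-sorts the projection by time.
    out.sort_by_key(|r| r.time);
    out
}

fn main() {
    let rows = vec![
        Row { group: "cpu0", time: 1, value: 89.5 },
        Row { group: "cpu1", time: 1, value: 99.8 },
        Row { group: "cpu0", time: 2, value: 90.0 },
        Row { group: "cpu1", time: 2, value: 99.9 },
    ];
    // top(value, 1) per group -> 90.0 (cpu0) and 99.9 (cpu1).
    println!("{:?}", top_n_per_group(&rows, 1, false));
}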
#[tokio::test]
async fn influxql_metadata() {
test_helpers::maybe_start_logging();


@ -0,0 +1,26 @@
-- IOX_SETUP: top_bottom
--
-- top
--
SELECT top(writes, 2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
SELECT top(usage_system,3) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
SELECT top(usage_idle,5), cpu FROM cpu GROUP BY machine;
SELECT top(writes,3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
SELECT top(writes,2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
SELECT top(usage_system,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
SELECT top(usage_system,machine,2),cpu FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
SELECT top(usage_system,machine,2),machine FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
SELECT top(usage_idle,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 GROUP BY TIME(60s);
--
-- bottom
--
SELECT bottom(reads, 3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
SELECT bottom(usage_system,3) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1';
SELECT bottom(usage_idle,5), cpu FROM cpu GROUP BY machine;
SELECT bottom(writes,3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
SELECT bottom(writes,2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
SELECT bottom(usage_system,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
SELECT bottom(usage_system,machine,2),cpu FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
SELECT bottom(usage_system,machine,2),machine FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
SELECT bottom(usage_idle,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 GROUP BY TIME(60s);
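One reading aid for the cases above: the zero-padded time literals are nanosecond Unix timestamps, so the WHERE bounds line up with the 1970-01-01 times in the expected output below. A quick check (illustrative only):

fn main() {
    let ns: i64 = 130_000_000_000; // the literal 0000000130000000000
    let secs = ns / 1_000_000_000;
    // Prints 00:02:10, i.e. 1970-01-01T00:02:10 in the result tables.
    println!("{:02}:{:02}:{:02}", secs / 3600, (secs % 3600) / 60, secs % 60);
}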


@ -0,0 +1,210 @@
-- Test Setup: top_bottom
-- InfluxQL: SELECT top(writes, 2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
name: diskio
+---------------------+---------+
| time | top |
+---------------------+---------+
| 1970-01-01T00:03:20 | 5593589 |
| 1970-01-01T00:03:30 | 5593735 |
+---------------------+---------+
-- InfluxQL: SELECT top(usage_system,3) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
name: cpu
+---------------------+------+
| time | top |
+---------------------+------+
| 1970-01-01T00:02:10 | 89.8 |
| 1970-01-01T00:02:50 | 89.8 |
| 1970-01-01T00:03:00 | 90.0 |
+---------------------+------+
-- InfluxQL: SELECT top(usage_idle,5), cpu FROM cpu GROUP BY machine;
name: cpu
tags: machine=machine1
+---------------------+------+------+
| time | top | cpu |
+---------------------+------+------+
| 1970-01-01T00:01:00 | 99.8 | cpu1 |
| 1970-01-01T00:01:20 | 99.8 | cpu1 |
| 1970-01-01T00:02:00 | 99.9 | cpu1 |
| 1970-01-01T00:02:20 | 99.9 | cpu1 |
| 1970-01-01T00:02:30 | 99.9 | cpu1 |
+---------------------+------+------+
name: cpu
tags: machine=machine2
+---------------------+------+------+
| time | top | cpu |
+---------------------+------+------+
| 1970-01-01T00:01:00 | 89.8 | cpu1 |
| 1970-01-01T00:01:20 | 89.8 | cpu1 |
| 1970-01-01T00:02:00 | 89.9 | cpu1 |
| 1970-01-01T00:02:20 | 89.9 | cpu1 |
| 1970-01-01T00:02:30 | 89.9 | cpu1 |
+---------------------+------+------+
-- InfluxQL: SELECT top(writes,3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
name: diskio
+---------------------+---------+
| time | top |
+---------------------+---------+
| 1970-01-01T00:02:10 | 5592646 |
| 1970-01-01T00:02:20 | 5592810 |
| 1970-01-01T00:02:30 | 5592997 |
| 1970-01-01T00:02:40 | 5593109 |
| 1970-01-01T00:02:50 | 5593219 |
| 1970-01-01T00:03:00 | 5593438 |
| 1970-01-01T00:03:10 | 5593513 |
| 1970-01-01T00:03:20 | 5593589 |
| 1970-01-01T00:03:30 | 5593735 |
+---------------------+---------+
-- InfluxQL: SELECT top(writes,2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
name: diskio
+---------------------+---------+
| time | top |
+---------------------+---------+
| 1970-01-01T00:02:10 | 5592646 |
| 1970-01-01T00:02:20 | 5592810 |
| 1970-01-01T00:02:40 | 5593109 |
| 1970-01-01T00:02:50 | 5593219 |
| 1970-01-01T00:03:10 | 5593513 |
| 1970-01-01T00:03:20 | 5593589 |
| 1970-01-01T00:03:30 | 5593735 |
+---------------------+---------+
-- InfluxQL: SELECT top(usage_system,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
name: cpu
+---------------------+------+----------+------+
| time | top | machine | cpu |
+---------------------+------+----------+------+
| 1970-01-01T00:02:00 | 99.9 | machine1 | cpu1 |
| 1970-01-01T00:03:00 | 90.0 | machine1 | cpu0 |
+---------------------+------+----------+------+
-- InfluxQL: SELECT top(usage_system,machine,2),cpu FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
name: cpu
+---------------------+------+----------+------+
| time | top | machine | cpu |
+---------------------+------+----------+------+
| 1970-01-01T00:02:00 | 99.9 | machine1 | cpu1 |
| 1970-01-01T00:02:00 | 89.9 | machine2 | cpu1 |
+---------------------+------+----------+------+
-- InfluxQL: SELECT top(usage_system,machine,2),machine FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
name: cpu
+---------------------+------+----------+-----------+
| time | top | machine | machine_1 |
+---------------------+------+----------+-----------+
| 1970-01-01T00:02:00 | 99.9 | machine1 | machine1 |
| 1970-01-01T00:02:00 | 89.9 | machine2 | machine2 |
+---------------------+------+----------+-----------+
-- InfluxQL: SELECT top(usage_idle,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 GROUP BY TIME(60s);
name: cpu
+---------------------+------+----------+------+
| time | top | machine | cpu |
+---------------------+------+----------+------+
| 1970-01-01T00:01:00 | 99.8 | machine1 | cpu1 |
| 1970-01-01T00:01:00 | 89.8 | machine2 | cpu1 |
| 1970-01-01T00:02:00 | 99.9 | machine1 | cpu1 |
| 1970-01-01T00:02:30 | 90.4 | machine1 | cpu0 |
| 1970-01-01T00:03:00 | 99.8 | machine1 | cpu1 |
| 1970-01-01T00:03:00 | 90.0 | machine1 | cpu0 |
+---------------------+------+----------+------+
-- InfluxQL: SELECT bottom(reads, 3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
name: diskio
+---------------------+---------+
| time | bottom |
+---------------------+---------+
| 1970-01-01T00:02:10 | 2592646 |
| 1970-01-01T00:02:30 | 2592997 |
| 1970-01-01T00:02:50 | 2593219 |
+---------------------+---------+
-- InfluxQL: SELECT bottom(usage_system,3) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1';
name: cpu
+---------------------+--------+
| time | bottom |
+---------------------+--------+
| 1970-01-01T00:01:00 | 89.8 |
| 1970-01-01T00:01:10 | 89.7 |
| 1970-01-01T00:02:10 | 89.8 |
+---------------------+--------+
-- InfluxQL: SELECT bottom(usage_idle,5), cpu FROM cpu GROUP BY machine;
name: cpu
tags: machine=machine1
+---------------------+--------+------+
| time | bottom | cpu |
+---------------------+--------+------+
| 1970-01-01T00:01:10 | 88.6 | cpu0 |
| 1970-01-01T00:01:20 | 88.6 | cpu0 |
| 1970-01-01T00:01:30 | 83.4 | cpu0 |
| 1970-01-01T00:01:40 | 87.7 | cpu0 |
| 1970-01-01T00:02:00 | 86.9 | cpu0 |
+---------------------+--------+------+
name: cpu
tags: machine=machine2
+---------------------+--------+------+
| time | bottom | cpu |
+---------------------+--------+------+
| 1970-01-01T00:01:10 | 78.6 | cpu0 |
| 1970-01-01T00:01:20 | 78.6 | cpu0 |
| 1970-01-01T00:01:30 | 73.4 | cpu0 |
| 1970-01-01T00:01:40 | 77.7 | cpu0 |
| 1970-01-01T00:02:00 | 76.9 | cpu0 |
+---------------------+--------+------+
-- InfluxQL: SELECT bottom(writes,3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
name: diskio
+---------------------+---------+
| time | bottom |
+---------------------+---------+
| 1970-01-01T00:02:10 | 5592646 |
| 1970-01-01T00:02:20 | 5592810 |
| 1970-01-01T00:02:30 | 5592997 |
| 1970-01-01T00:02:40 | 5593109 |
| 1970-01-01T00:02:50 | 5593219 |
| 1970-01-01T00:03:00 | 5593438 |
| 1970-01-01T00:03:10 | 5593513 |
| 1970-01-01T00:03:20 | 5593589 |
| 1970-01-01T00:03:30 | 5593735 |
+---------------------+---------+
-- InfluxQL: SELECT bottom(writes,2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
name: diskio
+---------------------+---------+
| time | bottom |
+---------------------+---------+
| 1970-01-01T00:02:10 | 5592646 |
| 1970-01-01T00:02:20 | 5592810 |
| 1970-01-01T00:02:30 | 5592997 |
| 1970-01-01T00:02:40 | 5593109 |
| 1970-01-01T00:03:00 | 5593438 |
| 1970-01-01T00:03:10 | 5593513 |
| 1970-01-01T00:03:30 | 5593735 |
+---------------------+---------+
-- InfluxQL: SELECT bottom(usage_system,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
name: cpu
+---------------------+--------+----------+------+
| time | bottom | machine | cpu |
+---------------------+--------+----------+------+
| 1970-01-01T00:01:30 | 73.4 | machine2 | cpu0 |
| 1970-01-01T00:01:30 | 83.4 | machine1 | cpu0 |
+---------------------+--------+----------+------+
-- InfluxQL: SELECT bottom(usage_system,machine,2),cpu FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
name: cpu
+---------------------+--------+----------+------+
| time | bottom | machine | cpu |
+---------------------+--------+----------+------+
| 1970-01-01T00:01:30 | 73.4 | machine2 | cpu0 |
| 1970-01-01T00:01:30 | 83.4 | machine1 | cpu0 |
+---------------------+--------+----------+------+
-- InfluxQL: SELECT bottom(usage_system,machine,2),machine FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001;
name: cpu
+---------------------+--------+----------+-----------+
| time | bottom | machine | machine_1 |
+---------------------+--------+----------+-----------+
| 1970-01-01T00:01:30 | 83.4 | machine1 | machine1 |
| 1970-01-01T00:01:30 | 73.4 | machine2 | machine2 |
+---------------------+--------+----------+-----------+
-- InfluxQL: SELECT bottom(usage_idle,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 GROUP BY TIME(60s);
name: cpu
+---------------------+--------+----------+------+
| time | bottom | machine | cpu |
+---------------------+--------+----------+------+
| 1970-01-01T00:01:30 | 73.4 | machine2 | cpu0 |
| 1970-01-01T00:01:30 | 83.4 | machine1 | cpu0 |
| 1970-01-01T00:02:00 | 76.9 | machine2 | cpu0 |
| 1970-01-01T00:02:00 | 86.9 | machine1 | cpu0 |
| 1970-01-01T00:03:10 | 78.8 | machine2 | cpu0 |
| 1970-01-01T00:03:10 | 88.8 | machine1 | cpu0 |
+---------------------+--------+----------+------+


@ -0,0 +1,80 @@
# Load into influxdb 1.8:
#
# curl localhost:8086/write\?db=top_bottom --data-binary "@influxdb_iox/tests/query_tests/data/top_bottom.lp"
#
# Float data, regular intervals, usage_system has gaps
#
cpu,cpu=cpu0,machine=machine1 usage_idle=89.5,usage_system=89.5 0000000060000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=79.5,usage_system=79.5 0000000060000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.8,usage_system=99.8 0000000060000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.8,usage_system=89.8 0000000060000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=88.6,usage_system=88.6 0000000070000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=78.6,usage_system=78.6 0000000070000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.7,usage_system=99.7 0000000070000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.7,usage_system=89.7 0000000070000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=88.6 0000000080000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=78.6 0000000080000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.8 0000000080000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.8 0000000080000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.7 0000000090000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.7 0000000090000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=83.4,usage_system=83.4 0000000090000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=73.4,usage_system=73.4 0000000090000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.7 0000000100000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.7 0000000100000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=87.7,usage_system=87.7 0000000100000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=77.7,usage_system=77.7 0000000100000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=88.7 0000000110000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=78.7 0000000110000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.3 0000000110000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.3 0000000110000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=86.9 0000000120000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=76.9 0000000120000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.9,usage_system=99.9 0000000120000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.9,usage_system=89.9 0000000120000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=89.8,usage_system=89.8 0000000130000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=79.8,usage_system=79.8 0000000130000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.8,usage_system=99.8 0000000130000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.8,usage_system=89.8 0000000130000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=89.0 0000000140000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=79.0 0000000140000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.9,usage_system=99.9 0000000140000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.9,usage_system=89.9 0000000140000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=90.4 0000000150000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=80.4 0000000150000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.9 0000000150000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.9 0000000150000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=90.2 0000000160000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=80.2 0000000160000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.8 0000000160000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.8 0000000160000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.8 0000000170000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.8 0000000170000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=89.8,usage_system=89.8 0000000170000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=79.8,usage_system=79.8 0000000170000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=90.0,usage_system=90.0 0000000180000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=80.0,usage_system=80.0 0000000180000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.8,usage_system=99.8 0000000180000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.8,usage_system=89.8 0000000180000000000
cpu,cpu=cpu0,machine=machine1 usage_idle=88.8 0000000190000000000
cpu,cpu=cpu0,machine=machine2 usage_idle=78.8 0000000190000000000
cpu,cpu=cpu1,machine=machine1 usage_idle=99.8,usage_system=99.8 0000000190000000000
cpu,cpu=cpu1,machine=machine2 usage_idle=89.8,usage_system=89.8 0000000190000000000
# integers at regular intervals
diskio,name=disk0 reads=2591520i,writes=5591520i 0000000060000000000
diskio,name=disk0 writes=5591620i 0000000070000000000
diskio,name=disk0 writes=5591729i 0000000080000000000
diskio,name=disk0 writes=5592114i 0000000090000000000
diskio,name=disk0 writes=5592210i 0000000100000000000
diskio,name=disk0 reads=2592366i,writes=5592366i 0000000110000000000
diskio,name=disk0 reads=2592576i,writes=5592576i 0000000120000000000
diskio,name=disk0 reads=2592646i,writes=5592646i 0000000130000000000
diskio,name=disk0 writes=5592810i 0000000140000000000
diskio,name=disk0 reads=2592997i,writes=5592997i 0000000150000000000
diskio,name=disk0 writes=5593109i 0000000160000000000
diskio,name=disk0 reads=2593219i,writes=5593219i 0000000170000000000
diskio,name=disk0 reads=2593438i,writes=5593438i 0000000180000000000
diskio,name=disk0 writes=5593513i 0000000190000000000
diskio,name=disk0 reads=2593589i,writes=5593589i 0000000200000000000
diskio,name=disk0 reads=2593735i,writes=5593735i 0000000210000000000


@ -1378,6 +1378,20 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
},
],
),
(
// Used for top/bottom function tests for InfluxQL
"top_bottom",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
include_str!("data/top_bottom.lp").to_string()
),
Step::Persist,
Step::WaitForPersisted {
expected_increase: 1,
},
],
),
(
"DuplicateDifferentDomains",
(0..2)


@ -20,7 +20,7 @@ repository = "https://github.com/influxdata/influxdb_iox/tree/main/influxdb_line
bytes = "1.4"
log = "0.4.19"
nom = { version = "7", default-features = false, features = ["std"] }
smallvec = { version = "1.10.0", features = ["union"] }
smallvec = { version = "1.11.0", features = ["union"] }
snafu = "0.7"
[dev-dependencies] # In alphabetical order


@ -10,7 +10,7 @@ arrow = { workspace = true, features = ["prettyprint"] }
arrow_util = { version = "0.1.0", path = "../arrow_util" }
arrow-flight = { workspace = true }
async-channel = "1.8.0"
async-trait = "0.1.70"
async-trait = "0.1.71"
backoff = { version = "0.1.0", path = "../backoff" }
bytes = "1.4.0"
crossbeam-utils = "0.8.16"


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[dependencies] # In alphabetical order
async-trait = "0.1.70"
async-trait = "0.1.71"
data_types = { path = "../data_types" }
futures = "0.3"
iox_time = { version = "0.1.0", path = "../iox_time" }


@ -20,13 +20,13 @@ mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch = { path = "../mutable_batch" }
parquet_file = { path = "../parquet_file" }
rand = { version = "0.8.3", features = ["small_rng"] }
regex = "1.8"
regex = "1.9"
schema = { path = "../schema" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.100"
snafu = "0.7"
tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
toml = "0.7.5"
toml = "0.7.6"
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1", default_features = false }


@ -82,7 +82,7 @@ use schema::{
InfluxColumnType, InfluxFieldType, Schema, INFLUXQL_MEASUREMENT_COLUMN_NAME,
INFLUXQL_METADATA_KEY,
};
use std::collections::{BTreeSet, HashSet};
use std::collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
use std::fmt::Debug;
use std::iter;
use std::ops::{Bound, ControlFlow, Deref, Not, Range};
@ -421,20 +421,6 @@ impl<'a> Context<'a> {
fn fill(&self) -> FillClause {
self.fill.unwrap_or_default()
}
fn is_aggregate(&self) -> bool {
matches!(
self.projection_type,
ProjectionType::Aggregate
| ProjectionType::WindowAggregate
| ProjectionType::WindowAggregateMixed
| ProjectionType::Selector { .. }
)
}
fn is_raw_distinct(&self) -> bool {
matches!(self.projection_type, ProjectionType::RawDistinct)
}
}
#[allow(missing_debug_implementations)]
@ -775,38 +761,126 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
if ctx.projection_type == ProjectionType::WindowAggregateMixed {
return error::not_implemented("mixed window-aggregate and aggregate columns, such as DIFFERENCE(MEAN(col)), MEAN(col)");
match ctx.projection_type {
ProjectionType::Raw => self.project_select_raw(input, fields),
ProjectionType::RawDistinct => self.project_select_raw_distinct(input, fields),
ProjectionType::Aggregate | ProjectionType::Selector{..} => self.project_select_aggregate(ctx, input, fields, group_by_tag_set),
ProjectionType::Window => self.project_select_window(ctx, input, fields, group_by_tag_set),
ProjectionType::WindowAggregate => self.project_select_window_aggregate(ctx, input, fields, group_by_tag_set),
ProjectionType::WindowAggregateMixed => error::not_implemented("mixed window-aggregate and aggregate columns, such as DIFFERENCE(MEAN(col)), MEAN(col)"),
ProjectionType::TopBottomSelector => self.project_select_top_bottom_selector(ctx, input, fields, group_by_tag_set),
}
}
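For orientation, the dispatch above covers every projection class the planner distinguishes. A paraphrased sketch of the variants, taken from the match arms (not the exact IOx definition; the Selector payload is matched as `{ .. }` in the source, so its fields are omitted here):

// Paraphrased from the match arms above; illustrative only.
#[allow(dead_code)]
enum ProjectionType {
    Raw,                  // plain column projection
    RawDistinct,          // single-column DISTINCT
    Aggregate,            // e.g. MEAN(col)
    Selector {},          // selector functions (payload omitted)
    Window,               // window functions, e.g. DIFFERENCE(col)
    WindowAggregate,      // window over an aggregate, e.g. DIFFERENCE(MEAN(col))
    WindowAggregateMixed, // mixed with bare aggregates: not implemented
    TopBottomSelector,    // TOP/BOTTOM, the new code path in this change
}

fn main() {}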
fn project_select_raw(&self, input: LogicalPlan, fields: &[Field]) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
project(input, select_exprs)
}
fn project_select_raw_distinct(
&self,
input: LogicalPlan,
fields: &[Field],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let mut select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
if ctx.is_raw_distinct() {
// This is a special case, where exactly one column can be projected with a `DISTINCT`
// clause or the `distinct` function.
//
// In addition, the time column is projected as the Unix epoch.
// This is a special case, where exactly one column can be projected with a `DISTINCT`
// clause or the `distinct` function.
//
// In addition, the time column is projected as the Unix epoch.
let Some(time_column_index) = find_time_column_index(fields) else {
let Some(time_column_index) = find_time_column_index(fields) else {
return error::internal("unable to find time column")
};
// Take ownership of the alias, so we don't reallocate, and temporarily place a literal
// `NULL` in its place.
let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else {
// Take ownership of the alias, so we don't reallocate, and temporarily place a literal
// `NULL` in its place.
let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else {
return error::internal("time column is not an alias")
};
select_exprs[time_column_index] = lit_timestamp_nano(0).alias(alias);
select_exprs[time_column_index] = lit_timestamp_nano(0).alias(alias);
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
let plan = project(input, select_exprs)?;
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
let plan = project(input, select_exprs)?;
return LogicalPlanBuilder::from(plan).distinct()?.build();
LogicalPlanBuilder::from(plan).distinct()?.build()
}
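The special case handled above reduces to: project exactly one column, pin its time to the Unix epoch, then apply DISTINCT. A DataFusion-free sketch of that shape (illustrative only):

use std::collections::BTreeSet;

fn raw_distinct(values: &[&str]) -> Vec<(i64, String)> {
    // Unique values of the single projected column...
    let uniq: BTreeSet<&str> = values.iter().copied().collect();
    // ...each paired with time = epoch 0, as in lit_timestamp_nano(0).
    uniq.into_iter().map(|v| (0, v.to_string())).collect()
}

fn main() {
    assert_eq!(
        raw_distinct(&["cpu0", "cpu1", "cpu0"]),
        vec![(0, "cpu0".to_string()), (0, "cpu1".to_string())]
    );
}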
fn project_select_aggregate(
&self,
ctx: &Context<'_>,
input: LogicalPlan,
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
let (plan, select_exprs) =
self.select_aggregate(ctx, input, fields, select_exprs, group_by_tag_set)?;
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
project(plan, select_exprs)
}
fn project_select_window(
&self,
ctx: &Context<'_>,
input: LogicalPlan,
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
let (plan, select_exprs) =
self.select_window(ctx, input, select_exprs, group_by_tag_set)?;
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
let plan = project(plan, select_exprs)?;
// InfluxQL OG physical operators for
// generate a predicate to filter rows where all field values of the row are `NULL`,
// like:
//
// NOT (field1 IS NULL AND field2 IS NULL AND ...)
match conjunction(fields.iter().filter_map(|f| {
if matches!(f.data_type, Some(InfluxColumnType::Field(_))) {
Some(f.name.as_expr().is_null())
} else {
None
}
})) {
Some(expr) => LogicalPlanBuilder::from(plan).filter(expr.not())?.build(),
None => Ok(plan),
}
}
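The predicate built above, NOT (field1 IS NULL AND field2 IS NULL AND ...), drops exactly the rows in which every projected field value is NULL. In plain Rust terms (illustrative only):

// A row survives the filter if at least one field value is non-NULL.
fn keep_row(field_values: &[Option<f64>]) -> bool {
    // NOT (f1 IS NULL AND f2 IS NULL AND ...)
    !field_values.iter().all(|v| v.is_none())
}

fn main() {
    assert!(!keep_row(&[None, None]));
    assert!(keep_row(&[None, Some(1.0)]));
}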
fn project_select_window_aggregate(
&self,
ctx: &Context<'_>,
input: LogicalPlan,
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
let (plan, select_exprs) =
self.select_aggregate(ctx, input, fields, select_exprs, group_by_tag_set)?;
@ -816,29 +890,117 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
let plan = project(plan, select_exprs)?;
if matches!(
ctx.projection_type,
ProjectionType::WindowAggregate | ProjectionType::Window
) {
// InfluxQL OG physical operators for
// InfluxQL OG physical operators for
// generate a predicate to filter rows where all field values of the row are `NULL`,
// like:
//
// NOT (field1 IS NULL AND field2 IS NULL AND ...)
match conjunction(fields.iter().filter_map(|f| {
if matches!(f.data_type, Some(InfluxColumnType::Field(_))) {
Some(f.name.as_expr().is_null())
} else {
None
}
})) {
Some(expr) => LogicalPlanBuilder::from(plan).filter(expr.not())?.build(),
None => Ok(plan),
// generate a predicate to filter rows where all field values of the row are `NULL`,
// like:
//
// NOT (field1 IS NULL AND field2 IS NULL AND ...)
match conjunction(fields.iter().filter_map(|f| {
if matches!(f.data_type, Some(InfluxColumnType::Field(_))) {
Some(f.name.as_expr().is_null())
} else {
None
}
})) {
Some(expr) => LogicalPlanBuilder::from(plan).filter(expr.not())?.build(),
None => Ok(plan),
}
}
fn project_select_top_bottom_selector(
&self,
ctx: &Context<'_>,
input: LogicalPlan,
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
let (selector_index, call) = fields
.iter()
.enumerate()
.find_map(|(idx, f)| match &f.expr {
IQLExpr::Call(c) if c.name == "top" || c.name == "bottom" => Some((idx, c.clone())),
_ => None,
})
.ok_or(error::map::internal(
"ProjectionTopBottomSelector used without top or bottom field",
))?;
// Find the selector parameters.
let is_bottom = call.name == "bottom";
let [field, tag_keys @ .., narg] = call.args.as_slice() else {
return error::internal(format!(
"invalid number of arguments for {}: expected 2 or more, got {}",
call.name,
call.args.len()
));
};
let field = if let IQLExpr::VarRef(v) = field {
Field {
expr: IQLExpr::VarRef(v.clone()),
name: v.name.clone().take(),
data_type: None,
}
} else {
Ok(plan)
return error::internal(format!(
"invalid expression for {} field argument, {field}",
call.name,
));
};
let n = if let IQLExpr::Literal(Literal::Integer(v)) = narg {
*v
} else {
return error::internal(format!(
"invalid expression for {} n argument, {narg}",
call.name
));
};
let mut internal_group_by = group_by_tag_set.to_vec();
let mut fields_vec = fields.to_vec();
for (i, tag_key) in tag_keys.iter().enumerate() {
if let IQLExpr::VarRef(v) = &tag_key {
fields_vec.insert(
selector_index + i + 1,
Field {
expr: IQLExpr::VarRef(v.clone()),
name: v.name.clone().take(),
data_type: None,
},
);
internal_group_by.push(v.name.as_ref());
} else {
return error::internal(format!(
"invalid expression for {} tag_keys argument, {}",
call.name, &tag_key
));
}
}
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields_vec.as_slice(), &schemas)?;
let plan = if !tag_keys.is_empty() {
self.select_first(
ctx,
input,
&schemas,
&field,
is_bottom,
internal_group_by.as_slice(),
1,
)?
} else {
input
};
let plan =
self.select_first(ctx, plan, &schemas, &field, is_bottom, group_by_tag_set, n)?;
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
project(plan, select_exprs)
}
fn select_aggregate(
@ -849,10 +1011,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
mut select_exprs: Vec<Expr>,
group_by_tag_set: &[&str],
) -> Result<(LogicalPlan, Vec<Expr>)> {
if !ctx.is_aggregate() {
return Ok((input, select_exprs));
}
// Find a list of unique aggregate expressions from the projection.
//
// For example, a projection such as:
@ -1123,6 +1281,56 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Ok((plan, select_exprs))
}
/// Generate a plan to select the first n rows from each partition in the input data sorted by the requested field.
#[allow(clippy::too_many_arguments)]
fn select_first(
&self,
ctx: &Context<'_>,
input: LogicalPlan,
schemas: &Schemas,
field: &Field,
asc: bool,
group_by_tags: &[&str],
count: i64,
) -> Result<LogicalPlan> {
let mut group_by =
fields_to_exprs_no_nulls(input.schema(), group_by_tags).collect::<Vec<_>>();
if let Some(i) = ctx.interval {
let stride = lit(ScalarValue::new_interval_mdn(0, 0, i.duration));
let offset = i.offset.unwrap_or_default();
group_by.push(date_bin(
stride,
"time".as_expr(),
lit(ScalarValue::TimestampNanosecond(Some(offset), None)),
));
}
let field_sort_expr = self
.field_to_df_expr(field, &input, schemas)?
.sort(asc, false);
let window_expr = Expr::WindowFunction(WindowFunction::new(
window_function::WindowFunction::BuiltInWindowFunction(
window_function::BuiltInWindowFunction::RowNumber,
),
Vec::<Expr>::new(),
group_by,
vec![field_sort_expr, ctx.time_sort_expr()],
WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
},
));
let column_name = window_expr.display_name()?;
let filter_expr = binary_expr(col(column_name.clone()), Operator::LtEq, lit(count));
LogicalPlanBuilder::from(input)
.window(vec![window_expr.alias(column_name)])?
.filter(filter_expr)?
.build()
}
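When the query has GROUP BY time(interval), select_first above extends the partition key with a date_bin expression, so the ROW_NUMBER ranking restarts in every time bucket. A simplified stand-in for the bucketing (illustrative only; DataFusion's date_bin also handles offsets and timestamps before the origin):

// Truncate a nanosecond timestamp to its interval bucket; valid here
// for ts_ns >= origin_ns.
fn bucket(stride_ns: i64, ts_ns: i64, origin_ns: i64) -> i64 {
    origin_ns + ((ts_ns - origin_ns) / stride_ns) * stride_ns
}

fn main() {
    let stride = 30_000_000_000; // GROUP BY time(30s)
    // 00:02:10 and 00:02:20 share the 00:02:00 bucket, so only the
    // best-ranked rows of that bucket pass the ROW_NUMBER() <= n filter.
    assert_eq!(bucket(stride, 130_000_000_000, 0), 120_000_000_000);
    assert_eq!(bucket(stride, 140_000_000_000, 0), 120_000_000_000);
}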
/// Transform a UDF to a window expression.
fn udf_to_expr(
ctx: &Context<'_>,
@ -1380,9 +1588,25 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fields: &[Field],
schemas: &Schemas,
) -> Result<Vec<Expr>> {
let mut names: HashMap<&str, usize> = HashMap::new();
fields
.iter()
.map(|field| self.field_to_df_expr(field, plan, schemas))
.map(|field| {
let mut new_field = field.clone();
new_field.name = match names.entry(field.name.as_str()) {
Entry::Vacant(v) => {
v.insert(0);
field.name.clone()
}
Entry::Occupied(mut e) => {
let count = e.get_mut();
*count += 1;
format!("{}_{}", field.name, *count)
}
};
new_field
})
.map(|field| self.field_to_df_expr(&field, plan, schemas))
.collect()
}
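The renaming above is what yields the machine/machine_1 column headers in the expected output: the first occurrence of a name is kept, later duplicates get a numeric suffix. A standalone sketch of just that rule (illustrative, not the IOx function):

use std::collections::{hash_map::Entry, HashMap};

fn dedup_names(names: &[&str]) -> Vec<String> {
    let mut seen: HashMap<&str, usize> = HashMap::new();
    names
        .iter()
        .copied()
        .map(|name| match seen.entry(name) {
            Entry::Vacant(v) => {
                v.insert(0);
                name.to_string()
            }
            Entry::Occupied(mut e) => {
                *e.get_mut() += 1;
                format!("{}_{}", name, e.get())
            }
        })
        .collect()
}

fn main() {
    assert_eq!(
        dedup_names(&["time", "machine", "machine"]),
        vec!["time".to_string(), "machine".to_string(), "machine_1".to_string()]
    );
}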
@ -1729,6 +1953,11 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Ok(non_negative_derivative(eargs))
}
// The TOP/BOTTOM function is handled as a `ProjectionType::TopBottomSelector`
// query, so the planner only needs to project the single column
// argument.
"top" | "bottom" => self.expr_to_df_expr(scope, &args[0], schemas),
_ => error::query(format!("Invalid function '{name}'")),
}
}
@ -3799,6 +4028,80 @@ mod test {
}
}
#[test]
fn test_top() {
assert_snapshot!(plan("SELECT top(usage_idle,10) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS top [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N]
Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT top(usage_idle,10),cpu FROM cpu"), @r###"
Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N, cpu:Dictionary(Int32, Utf8);N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS top, cpu.cpu AS cpu [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N, cpu:Dictionary(Int32, Utf8);N]
Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT top(usage_idle,10) FROM cpu GROUP BY cpu"), @r###"
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, top:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS top [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, top:Float64;N]
Filter: ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu.cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT top(usage_idle,cpu,10) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N, cpu:Dictionary(Int32, Utf8);N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS top, cpu.cpu AS cpu [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N, cpu:Dictionary(Int32, Utf8);N]
Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
Filter: ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(1) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu.cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
}
#[test]
fn test_bottom() {
assert_snapshot!(plan("SELECT bottom(usage_idle,10) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS bottom [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N]
Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT bottom(usage_idle,10),cpu FROM cpu"), @r###"
Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N, cpu:Dictionary(Int32, Utf8);N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS bottom, cpu.cpu AS cpu [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N, cpu:Dictionary(Int32, Utf8);N]
Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT bottom(usage_idle,10) FROM cpu GROUP BY cpu"), @r###"
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, bottom:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS bottom [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, bottom:Float64;N]
Filter: ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu.cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT bottom(usage_idle,cpu,10) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N, cpu:Dictionary(Int32, Utf8);N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS bottom, cpu.cpu AS cpu [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N, cpu:Dictionary(Int32, Utf8);N]
Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
Filter: ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(1) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu.cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
}
/// Test InfluxQL-specific behaviour of scalar functions that differ
/// from DataFusion
#[test]


@ -373,7 +373,7 @@ pub(crate) fn non_negative_derivative_udf(unit: i64) -> AggregateUDF {
}
#[derive(Debug)]
pub(super) struct DerivativeAccumulator {
struct DerivativeAccumulator {
unit: i64,
prev: Option<Point>,
curr: Option<Point>,


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[dependencies] # In alphabetical order
async-trait = "0.1.70"
async-trait = "0.1.71"
bytes = "1.4"
futures = "0.3"
iox_time = { version = "0.1.0", path = "../iox_time" }


@ -205,7 +205,12 @@ impl FieldProjectionRewriter {
}
});
Ok(predicate.with_field_columns(new_fields).unwrap())
let predicate = predicate
.with_field_columns(new_fields)
// errors are possible if the field columns are not supported
.map_err(|e| DataFusionError::NotImplemented(e.to_string()))?;
Ok(predicate)
}
}


@ -8,7 +8,7 @@ license.workspace = true
[dependencies]
arrow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = "0.1.70"
async-trait = "0.1.71"
backoff = { path = "../backoff" }
bytes = "1.4"
cache_system = { path = "../cache_system" }


@ -11,7 +11,7 @@ chrono = { version = "0.4", default-features = false }
datafusion = { workspace = true }
once_cell = "1"
regex = "1"
regex-syntax = "0.7.1"
regex-syntax = "0.7.3"
schema = { path = "../schema" }
snafu = "0.7"
workspace-hack = { version = "0.1", path = "../workspace-hack" }


@ -34,7 +34,7 @@ service_grpc_object_store = { path = "../service_grpc_object_store" }
service_grpc_schema = { path = "../service_grpc_schema" }
service_grpc_table = { path = "../service_grpc_table" }
sharder = { path = "../sharder" }
smallvec = "1.10.0"
smallvec = "1.11.0"
thiserror = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
tonic = { workspace = true }


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[dependencies] # In alphabetical order
async-trait = "0.1.70"
async-trait = "0.1.71"
bytes = "1.4"
datafusion = { workspace = true }
iox_query = { path = "../iox_query" }


@ -28,6 +28,7 @@ prost = "0.11"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.100"
snafu = "0.7"
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tonic = { workspace = true }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
@ -35,4 +36,4 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
assert_matches = "1"
async-trait = "0.1"
metric = { path = "../metric" }
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
test_helpers = { path = "../test_helpers" }


@ -0,0 +1,383 @@
//! Keep-alive handling for response streaming.
//!
//! # The Problem
//! Under some deployment scenarios, we receive reports of cryptic error messages for certain long-running queries. For
//! example, the InfluxDB IOx CLI will report:
//!
//! ```text
//! Error querying:
//! Tonic(
//! Status {
//! code: Internal, message: "h2 protocol error: error reading a body from connection: stream error received: unexpected internal error encountered",
//! source: Some(
//! hyper::Error(
//! Body,
//! Error { kind: Reset(StreamId(1), INTERNAL_ERROR, Remote) }
//! )
//! )
//! }
//! )
//! ```
//!
//! And [PyArrow] will report something like:
//!
//! ```text
//! pyarrow._flight.FlightInternalError:
//! Flight returned internal error, with message:
//! Received RST_STREAM with error code 2. gRPC client debug context:
//! UNKNOWN:Error received from peer ipv6:%5B::1%5D:8888 {
//! created_time:"2023-07-03T17:54:56.346363565+02:00",
//! grpc_status:13,
//! grpc_message:"Received RST_STREAM with error code 2"
//! }.
//! Client context: OK
//! ```
//!
//! `Received RST_STREAM with error code 2` is a good hint. According to [RFC 7540] (the HTTP/2 spec), section 7, the
//! error code means:
//!
//! > INTERNAL_ERROR (0x2): The endpoint encountered an unexpected internal error.
//!
//! and `RST_STREAM` is described in section 6.4:
//!
//! > The `RST_STREAM` frame (type=0x3) allows for immediate termination of a stream. `RST_STREAM` is sent to request
//! > cancellation of a stream or to indicate that an error condition has occurred.
//!
//! The `grpc_status:13` confirms this -- according to [gRPC Status Codes] it means:
//!
//! > Internal errors. This means that some invariants expected by the underlying system have been broken. This error
//! > code is reserved for serious errors.
//!
//! The issue was replicated using [NGINX] and a hack in InfluxDB that makes streams really slow.
//!
//! The underlying issue is that some middleware or egress component -- e.g. [NGINX] -- terminates the response stream
//! because it thinks it is dead.
//!
//! # The Official Way
//! The [gRPC Keepalive] docs say:
//!
//! > HTTP/2 PING-based keepalives are a way to keep an HTTP/2 connection alive even when there is no data being
//! > transferred. This is done by periodically sending a PING frame to the other end of the connection.
//!
//! The `PING` mechanism is described by [RFC 7540] in section 6.7:
//!
//! > In addition to the frame header, `PING` frames MUST contain 8 octets of opaque data in the payload. ...
//! >
//! > Receivers of a `PING` frame that does not include an ACK flag MUST send a `PING` frame with the ACK flag set in
//! > response, with an identical payload. ...
//!
//! So every "ping" has a "pong". However, the same section also says:
//!
//! > `PING` frames are not associated with any individual stream. If a `PING` frame is received with a stream
//! > identifier field value other than `0x0`, the recipient MUST respond with a connection error (Section 5.4.1) of
//! > type `PROTOCOL_ERROR`.
//!
//! Now how should an egress proxy deal with this? Because streams may come from multiple upstream servers, they have
//! no way to establish a proper ping-pong end-to-end signaling path per stream. Hence in general it is not feasible to
//! use `PING` as a keep-alive mechanism, contrary to what the [gRPC] spec says. So what DO egress proxies do then?
//! Looking at various egress solutions:
//!
//! - <https://github.com/microsoft/reverse-proxy/issues/118#issuecomment-940191553>
//! - <https://kubernetes.github.io/ingress-nginx/examples/grpc/#notes-on-using-responserequest-streams>
//!
//! They all seem to agree that you either set really long timeouts or use activity-based keep-alive, i.e. they require
//! SOMETHING to be sent on that stream.
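//!
//! For reference, enabling the `PING`-based mechanism on our side would be plain `tonic` configuration. A minimal
//! sketch (hypothetical server setup, not the code in this crate -- and, per the above, of little help once an egress
//! proxy sits in between):
//!
//! ```ignore
//! use std::time::Duration;
//! use tonic::transport::Server;
//!
//! let builder = Server::builder()
//!     // send HTTP/2 PING frames every 5s on otherwise silent connections ...
//!     .http2_keepalive_interval(Some(Duration::from_secs(5)))
//!     // ... and consider the connection dead if no ACK arrives within 1s
//!     .http2_keepalive_timeout(Some(Duration::from_secs(1)));
//! ```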
//!
//! # The Wanted Workaround
//! Since all `PING`-based signalling is broken, we fall back to activity-based keep-alive, i.e. we ensure that we
//! regularly send something in our stream.
//!
//! Our response stream follows the [Apache Flight] definition. This means that we have a [gRPC] stream with
//! [`FlightData`] messages. Each of these messages has a [`MessageHeader`] describing its content. This is a
//! [FlatBuffers] union with the following options:
//!
//! - `None`: This is the implicit default.
//! - `Schema`: Sent before any other data to describe the schema of the stream.
//! - `DictionaryBatch`: Encodes dictionary data. This is not used in practice at the moment because dictionaries are
//! always hydrated.
//! - `RecordBatch`: Content of a `RecordBatch` w/o schema information.
//! - `Tensor`, `SparseTensor`: Irrelevant for us.
//!
//! Ideally we would send a `None` message with some metadata. However, most clients are too broken to accept this and
//! will trip over these messages. E.g. [PyArrow] -- which uses the C++ implementation -- will fail with:
//!
//! ```text
//! OSError: Header-type of flatbuffer-encoded Message is not RecordBatch.
//! ```
//!
//! # The Actual Workaround
//! So we send actual empty `RecordBatch`es instead. These are encoded as `RecordBatch` messages w/o a schema (see
//! section above). The schema is sent separately right at the start of the stream. The arrow-rs implementation does
//! that for us and also ensures that the schema is adjusted for dictionary hydration. So we just inspect the data
//! stream and wait for that schema (the upstream implementation will always send this without any blocking / wait
//! time / actual `RecordBatch` data).
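//!
//! On the wire, the resulting message sequence looks roughly like this (illustration only):
//!
//! ```text
//! Schema               <- sent once, right at the start of the stream
//! RecordBatch (rows)   <- actual payload
//! RecordBatch (0 rows) <- keep-alive, emitted while the stream is idle
//! RecordBatch (rows)   <- actual payload
//! ...
//! ```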
//!
//! [Apache Flight]: https://arrow.apache.org/docs/format/Flight.html
//! [FlatBuffers]: https://flatbuffers.dev/
//! [`FlightData`]: https://github.com/apache/arrow/blob/cd1ed18fd1e08912ea47b64edf55be9c046375c4/format/Flight.proto#L401-L429
//! [gRPC]: https://grpc.io/
//! [gRPC Keepalive]: https://grpc.io/docs/guides/keepalive/
//! [gRPC Status Codes]: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
//! [`MessageHeader`]: https://github.com/apache/arrow/blob/cd1ed18fd1e08912ea47b64edf55be9c046375c4/format/Message.fbs#L124-L132
//! [NGINX]: https://nginx.org/
//! [PyArrow]: https://arrow.apache.org/docs/python/index.html
//! [RFC 7540]: https://httpwg.org/specs/rfc7540.html
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use arrow::{
datatypes::{DataType, Schema, SchemaRef},
ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
record_batch::RecordBatch,
};
use arrow_flight::{error::FlightError, FlightData};
use futures::{stream::BoxStream, Stream, StreamExt};
use observability_deps::tracing::{info, warn};
use tokio::time::{Interval, MissedTickBehavior};
/// Keeps the underlying response stream alive by sending regular empty [`RecordBatch`]es.
pub struct KeepAliveStream {
inner: BoxStream<'static, Result<FlightData, FlightError>>,
}
impl KeepAliveStream {
/// Create a new keep-alive wrapper from the underlying stream and the given interval.
///
/// The interval is measured from the last message -- which can either be a "real" message or a keep-alive.
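///
/// # Example
///
/// A minimal usage sketch -- `flight_data_stream` is a stand-in for any stream of
/// `Result<FlightData, FlightError>` items:
///
/// ```ignore
/// use std::time::Duration;
///
/// // emit an empty `RecordBatch` whenever 5s pass without any "real" message
/// let stream = KeepAliveStream::new(flight_data_stream, Duration::from_secs(5));
/// ```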
pub fn new<S>(s: S, interval: Duration) -> Self
where
S: Stream<Item = Result<FlightData, FlightError>> + Send + 'static,
{
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
let state = State {
inner: s.boxed(),
schema: None,
ticker,
};
let inner = futures::stream::unfold(state, |mut state| async move {
loop {
tokio::select! {
_ = state.ticker.tick() => {
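// the ticker fired before the inner stream yielded anything => the stream is
// idle, so try to emit a keep-alive; `build_empty_batch_msg` returns `None`
// until a schema was seen (or if encoding fails)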
let Some(data) = build_empty_batch_msg(state.schema.as_ref()) else {
continue;
};
info!("stream keep-alive");
return Some((Ok(data), state));
}
res = state.inner.next() => {
// peek at content to detect schema transmission
if let Some(Ok(data)) = &res {
if let Some(schema) = decode_schema(data) {
if check_schema(&schema) {
state.schema = Some(Arc::new(schema));
}
}
}
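// any event on the inner stream -- data, error, or end-of-stream -- counts
// as activity, so restart the keep-alive countdown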
state.ticker.reset();
return res.map(|res| (res, state));
}
}
}
})
.boxed();
Self { inner }
}
}
impl Stream for KeepAliveStream {
type Item = Result<FlightData, FlightError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
/// Inner state of [`KeepAliveStream`]
struct State {
/// The underlying stream that is kept alive.
inner: BoxStream<'static, Result<FlightData, FlightError>>,
/// A [`Schema`] that was already received from the stream.
///
/// We need this to produce sensible empty [`RecordBatch`]es and because [`RecordBatch`] messages can only come
/// AFTER an encoded [`Schema`].
schema: Option<SchemaRef>,
/// Keep-alive ticker.
ticker: Interval,
}
/// Decode [`Schema`] from response data stream.
fn decode_schema(data: &FlightData) -> Option<Schema> {
let message = arrow::ipc::root_as_message(&data.data_header[..]).ok()?;
if arrow::ipc::MessageHeader::Schema != message.header_type() {
return None;
}
Schema::try_from(data).ok()
}
/// Check that the [`Schema`] that we've [decoded](decode_schema) is sensible.
///
/// Returns `true` if the [`Schema`] is OK. Will log a warning and return `false` if there is a problem.
fn check_schema(schema: &Schema) -> bool {
schema.fields().iter().all(|field| match field.data_type() {
DataType::Dictionary(_, _) => {
warn!(
field = field.name(),
"arrow IPC schema still contains dictionary, should have been hydrated by now",
);
false
}
_ => true,
})
}
/// Encode an empty [`RecordBatch`] as a message.
///
/// This must only be sent AFTER a [`Schema`] was transmitted.
fn build_empty_batch_msg(schema: Option<&SchemaRef>) -> Option<FlightData> {
let Some(schema) = schema else {
warn!(
"cannot send keep-alive because no schema was transmitted yet",
);
return None;
};
let batch = RecordBatch::new_empty(Arc::clone(schema));
let data_gen = IpcDataGenerator::default();
let mut dictionary_tracker = DictionaryTracker::new(true);
let write_options = IpcWriteOptions::default();
let batch_data = match data_gen.encoded_batch(&batch, &mut dictionary_tracker, &write_options) {
Ok((dicts_data, batch_data)) => {
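// the batch is empty, so encoding it must not produce any dictionary data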
assert!(dicts_data.is_empty());
batch_data
}
Err(e) => {
warn!(
%e,
"cannot encode empty batch",
);
return None;
}
};
Some(batch_data.into())
}
#[cfg(test)]
pub mod test_util {
use std::time::Duration;
use futures::{stream::BoxStream, Stream, StreamExt};
/// Ensure that there is a delay between stream responses.
pub fn make_stream_slow<S>(s: S, delay: Duration) -> BoxStream<'static, S::Item>
where
S: Send + Stream + Unpin + 'static,
{
futures::stream::unfold(s, move |mut s| async move {
tokio::time::sleep(delay).await;
let res = s.next().await;
res.map(|res| (res, s))
})
.boxed()
}
}
#[cfg(test)]
mod tests {
use arrow::{array::Int64Array, datatypes::Field};
use arrow_flight::{decode::FlightRecordBatchStream, encode::FlightDataEncoderBuilder};
use datafusion::assert_batches_eq;
use futures::TryStreamExt;
use test_helpers::maybe_start_logging;
use super::{test_util::make_stream_slow, *};
type BatchStream = BoxStream<'static, Result<RecordBatch, FlightError>>;
type FlightStream = BoxStream<'static, Result<FlightData, FlightError>>;
#[tokio::test]
#[should_panic(expected = "stream timeout")]
async fn test_timeout() {
let s = make_test_stream(false);
let s = FlightRecordBatchStream::new_from_flight_data(s);
s.collect::<Vec<_>>().await;
}
#[tokio::test]
async fn test_keep_alive() {
maybe_start_logging();
let s = make_test_stream(true);
let s = FlightRecordBatchStream::new_from_flight_data(s);
let batches: Vec<_> = s.try_collect().await.unwrap();
assert_batches_eq!(
vec!["+---+", "| f |", "+---+", "| 1 |", "| 2 |", "| 3 |", "| 4 |", "| 5 |", "+---+",],
&batches
);
}
/// Creates a stream like the one that query processing would produce.
fn make_query_result_stream() -> (BatchStream, SchemaRef) {
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int64, false)]));
let batch_1 = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
)
.unwrap();
let batch_2 = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int64Array::from(vec![4, 5]))],
)
.unwrap();
let s = futures::stream::iter([batch_1, batch_2]).map(Ok).boxed();
(s, schema)
}
/// Convert query result stream (= [`RecordBatch`]es) into a [`FlightData`] stream.
///
/// This stream will -- as in prod -- send the [`Schema`] data even when there are no [`RecordBatch`]es yet.
fn make_flight_data_stream(s: BatchStream, schema: SchemaRef) -> FlightStream {
FlightDataEncoderBuilder::new()
.with_schema(schema)
.build(s)
.boxed()
}
fn panic_on_stream_timeout(s: FlightStream, timeout: Duration) -> FlightStream {
futures::stream::unfold(s, move |mut s| async move {
let res = tokio::time::timeout(timeout, s.next())
.await
.expect("stream timeout");
res.map(|res| (res, s))
})
.boxed()
}
fn make_test_stream(keep_alive: bool) -> FlightStream {
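// The inner stream yields an item only every 500ms while `panic_on_stream_timeout`
// fires after 250ms of silence, so only the 100ms keep-alive can bridge the gap:
// with `keep_alive == false` this stream MUST panic.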
let (s, schema) = make_query_result_stream();
let s = make_stream_slow(s, Duration::from_millis(500));
let s = make_flight_data_stream(s, schema);
let s = if keep_alive {
KeepAliveStream::new(s, Duration::from_millis(100)).boxed()
} else {
s
};
panic_on_stream_timeout(s, Duration::from_millis(250))
}
}


@ -16,14 +16,16 @@
unused_crate_dependencies
)]
use keep_alive::KeepAliveStream;
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
mod keep_alive;
mod request;
use arrow::error::ArrowError;
use arrow_flight::{
encode::{FlightDataEncoder, FlightDataEncoderBuilder},
encode::FlightDataEncoderBuilder,
flight_descriptor::DescriptorType,
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
@ -44,7 +46,13 @@ use prost::Message;
use request::{IoxGetRequest, RunQuery};
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant};
use std::{
fmt::Debug,
pin::Pin,
sync::Arc,
task::Poll,
time::{Duration, Instant},
};
use tonic::{
metadata::{AsciiMetadataValue, MetadataMap},
Request, Response, Streaming,
@ -65,6 +73,9 @@ const IOX_FLIGHT_SQL_DATABASE_HEADERS: [&str; 4] = [
"iox-namespace-name", // deprecated
];
/// At which interval should the `DoGet` response stream send empty messages as keep-alive markers?
const DO_GET_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
@ -883,7 +894,7 @@ fn has_debug_header(metadata: &MetadataMap) -> bool {
/// Wrapper over a FlightDataEncodeStream that adds IOx specific
/// metadata and records completion
struct GetStream {
inner: FlightDataEncoder,
inner: KeepAliveStream,
#[allow(dead_code)]
permit: InstrumentedAsyncOwnedSemaphorePermit,
query_completed_token: QueryCompletedToken,
@ -919,6 +930,9 @@ impl GetStream {
.with_metadata(app_metadata.encode_to_vec().into())
.build(query_results);
// add keep alive
let inner = KeepAliveStream::new(inner, DO_GET_KEEP_ALIVE_INTERVAL);
Ok(Self {
inner,
permit,
@ -958,7 +972,6 @@ impl Stream for GetStream {
}
}
}
#[cfg(test)]
mod tests {
use arrow_flight::sql::ProstMessageExt;


@ -28,7 +28,7 @@ arrow = { workspace = true, features = ["prettyprint"] }
futures = "0.3"
pin-project = "1.1"
prost = "0.11"
regex = "1.8.4"
regex = "1.9.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.100"
snafu = "0.7"


@ -13,7 +13,7 @@ tracing-log = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
observability_deps = { path = "../observability_deps" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
async-trait = { version = "0.1.70", optional = true }
async-trait = { version = "0.1.71", optional = true }
tokio = { version = "1.29.1", optional = true, default_features = false, features = ["time"] }
[features]


@ -28,7 +28,7 @@ once_cell = { version = "1.18", features = ["parking_lot"] }
parking_lot = "0.12"
prost = "0.11"
rand = "0.8.3"
regex = "1.8"
regex = "1.9"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
snafu = "0.7"
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid" ] }


@ -69,6 +69,7 @@ prost-types = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
rand_core = { version = "0.6", default-features = false, features = ["std"] }
regex = { version = "1" }
regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.7" }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] }
ring = { version = "0.16", features = ["std"] }
@ -138,6 +139,7 @@ prost-types = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
rand_core = { version = "0.6", default-features = false, features = ["std"] }
regex = { version = "1" }
regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.7" }
ring = { version = "0.16", features = ["std"] }
serde = { version = "1", features = ["derive", "rc"] }