diff --git a/Cargo.lock b/Cargo.lock index a0529dc94e..0c91048612 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -379,7 +379,7 @@ dependencies = [ "arrow-schema", "arrow-select", "regex", - "regex-syntax 0.7.2", + "regex-syntax 0.7.3", ] [[package]] @@ -485,9 +485,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.70" +version = "0.1.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79fa67157abdfd688a259b6648808757db9347af834624f27ec646da976aee5d" +checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" dependencies = [ "proc-macro2", "quote", @@ -686,7 +686,7 @@ checksum = "a246e68bb43f6cd9db24bea052a53e40405417c5fb372e3d1a8a7f770a564ef5" dependencies = [ "memchr", "once_cell", - "regex-automata", + "regex-automata 0.1.10", "serde", ] @@ -841,9 +841,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.10" +version = "4.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "384e169cc618c613d5e3ca6404dda77a8685a63e08660dcc64abaf7da7cb0c7a" +checksum = "1640e5cc7fb47dbb8338fd471b105e7ed6c3cb2aeb00c2e067127ffd3764a05d" dependencies = [ "clap_builder", "clap_derive", @@ -873,9 +873,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.10" +version = "4.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef137bbe35aab78bdb468ccfba75a5f4d8321ae011d34063770780545176af2d" +checksum = "98c59138d527eeaf9b53f35a77fcc1fad9d883116070c63d5de1c7dc7b00c72b" dependencies = [ "anstream", "anstyle", @@ -1458,7 +1458,7 @@ dependencies = [ "hashbrown 0.14.0", "itertools 0.10.5", "log", - "regex-syntax 0.7.2", + "regex-syntax 0.7.3", ] [[package]] @@ -1723,7 +1723,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5" dependencies = [ "cfg-if", - "rustix 0.38.2", + "rustix 0.38.3", "windows-sys 0.48.0", ] @@ -2287,7 +2287,7 @@ checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7" dependencies = [ "http", "hyper", - "rustls 0.21.2", + "rustls 0.21.3", "tokio", "tokio-rustls 0.24.1", ] @@ -3115,7 +3115,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24fddda5af7e54bf7da53067d6e802dbcc381d0a8eef629df528e3ebf68755cb" dependencies = [ "hermit-abi", - "rustix 0.38.2", + "rustix 0.38.3", "windows-sys 0.48.0", ] @@ -3337,7 +3337,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" dependencies = [ - "regex-automata", + "regex-automata 0.1.10", ] [[package]] @@ -4410,7 +4410,7 @@ dependencies = [ "itertools 0.11.0", "once_cell", "regex", - "regex-syntax 0.7.2", + "regex-syntax 0.7.3", "schema", "snafu", "tokio", @@ -4537,13 +4537,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.8.4" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" +checksum = "89089e897c013b3deb627116ae56a6955a72b8bed395c9526af31c9fe528b484" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.2", + "regex-automata 0.3.0", + "regex-syntax 0.7.3", ] [[package]] @@ -4555,6 +4556,17 @@ dependencies = [ "regex-syntax 0.6.29", ] +[[package]] +name = "regex-automata" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fa250384981ea14565685dea16a9ccc4d1c541a13f82b9c168572264d1df8c56" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.7.3", +] + [[package]] name = "regex-syntax" version = "0.6.29" @@ -4563,9 +4575,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +checksum = "2ab07dc67230e4a4718e70fd5c20055a4334b121f1f9db8fe63ef39ce9b8c846" [[package]] name = "reqwest" @@ -4590,7 +4602,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.2", + "rustls 0.21.3", "rustls-pemfile", "serde", "serde_json", @@ -4717,9 +4729,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.2" +version = "0.38.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aabcb0461ebd01d6b79945797c27f8529082226cb630a9865a71870ff63532a4" +checksum = "ac5ffa1efe7548069688cd7028f32591853cd7b5b756d41bcffd2353e4fc75b4" dependencies = [ "bitflags 2.3.3", "errno", @@ -4742,13 +4754,13 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.2" +version = "0.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e32ca28af694bc1bbf399c33a516dbdf1c90090b8ab23c2bc24f834aa2247f5f" +checksum = "b19faa85ecb5197342b54f987b142fb3e30d0c90da40f80ef4fa9a726e6676ed" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.1", "sct", ] @@ -4771,6 +4783,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.101.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.12" @@ -4963,6 +4985,7 @@ dependencies = [ "serde_json", "service_common", "snafu", + "test_helpers", "tokio", "tonic", "trace", @@ -5159,9 +5182,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" [[package]] name = "snafu" @@ -5766,7 +5789,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.2", + "rustls 0.21.3", "tokio", ] @@ -5808,9 +5831,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.7.5" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ebafdf5ad1220cb59e7d17cf4d2c72015297b75b19a10472f99b89225089240" +checksum = "c17e963a819c331dcacd7ab957d80bc2b9a9c1e71c804826d2f283dd65306542" dependencies = [ "serde", "serde_spanned", @@ -5829,9 +5852,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.19.11" +version = "0.19.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266f016b7f039eec8a1a80dfe6156b633d208b9fccca5e4db1d6775b0c4e34a7" +checksum = "c500344a19072298cd05a7224b3c0c629348b78692bf48466c5238656e315a78" dependencies = [ "indexmap 2.0.0", "serde", @@ -6435,7 +6458,7 @@ version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" dependencies = [ - "rustls-webpki", + "rustls-webpki 0.100.1", ] [[package]] @@ -6709,11 +6732,12 @@ dependencies = [ "rand", "rand_core", "regex", - "regex-syntax 0.7.2", + "regex-automata 0.3.0", + "regex-syntax 0.7.3", "reqwest", "ring", - "rustix 0.38.2", - "rustls 0.21.2", + "rustix 0.38.3", + "rustls 0.21.3", "scopeguard", "serde", "serde_json", diff --git a/arrow_util/Cargo.toml b/arrow_util/Cargo.toml index 35d2910657..adf2eef49c 100644 --- a/arrow_util/Cargo.toml +++ b/arrow_util/Cargo.toml @@ -16,7 +16,7 @@ comfy-table = { version = "7.0", default-features = false } hashbrown = { workspace = true } num-traits = "0.2" once_cell = { version = "1.18", features = ["parking_lot"] } -regex = "1.8.4" +regex = "1.9.0" snafu = "0.7" uuid = "1" workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/cache_system/Cargo.toml b/cache_system/Cargo.toml index de55ede341..539d70e5b7 100644 --- a/cache_system/Cargo.toml +++ b/cache_system/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true license.workspace = true [dependencies] -async-trait = "0.1.70" +async-trait = "0.1.71" backoff = { path = "../backoff" } futures = "0.3" iox_time = { path = "../iox_time" } diff --git a/compactor/Cargo.toml b/compactor/Cargo.toml index bd0652ee68..8d37f5955c 100644 --- a/compactor/Cargo.toml +++ b/compactor/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true license.workspace = true [dependencies] -async-trait = "0.1.70" +async-trait = "0.1.71" backoff = { path = "../backoff" } bytes = "1.4" compactor_scheduler = { path = "../compactor_scheduler" } diff --git a/compactor_scheduler/Cargo.toml b/compactor_scheduler/Cargo.toml index 8c7891b7bc..426b5ab1ba 100644 --- a/compactor_scheduler/Cargo.toml +++ b/compactor_scheduler/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true license.workspace = true [dependencies] -async-trait = "0.1.70" +async-trait = "0.1.71" backoff = { path = "../backoff" } data_types = { path = "../data_types" } iox_catalog = { path = "../iox_catalog" } diff --git a/compactor_test_utils/Cargo.toml b/compactor_test_utils/Cargo.toml index 8fefc05087..25281b20c5 100644 --- a/compactor_test_utils/Cargo.toml +++ b/compactor_test_utils/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true license.workspace = true [dependencies] -async-trait = "0.1.70" +async-trait = "0.1.71" backoff = { path = "../backoff" } compactor = { path = "../compactor" } compactor_scheduler = { path = "../compactor_scheduler" } diff --git a/influxdb_iox/tests/query_tests/cases.rs b/influxdb_iox/tests/query_tests/cases.rs index 221869b5b3..b46b480f78 100644 --- a/influxdb_iox/tests/query_tests/cases.rs +++ b/influxdb_iox/tests/query_tests/cases.rs @@ -426,6 +426,20 @@ mod influxql { .await; } + /// Test TOP/BOTTOM functions, which use window functions to project + /// the top or bottom rows in groups. 
+ #[tokio::test] + async fn top_bottom() { + test_helpers::maybe_start_logging(); + + TestCase { + input: "cases/in/top_bottom.influxql", + chunk_stage: ChunkStage::Ingester, + } + .run() + .await; + } + #[tokio::test] async fn influxql_metadata() { test_helpers::maybe_start_logging(); diff --git a/influxdb_iox/tests/query_tests/cases/in/top_bottom.influxql b/influxdb_iox/tests/query_tests/cases/in/top_bottom.influxql new file mode 100644 index 0000000000..f4c14f2854 --- /dev/null +++ b/influxdb_iox/tests/query_tests/cases/in/top_bottom.influxql @@ -0,0 +1,26 @@ +-- IOX_SETUP: top_bottom + +-- +-- top +-- +SELECT top(writes, 2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +SELECT top(usage_system,3) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0'; +SELECT top(usage_idle,5), cpu FROM cpu GROUP BY machine; +SELECT top(writes,3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +SELECT top(writes,2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +SELECT top(usage_system,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +SELECT top(usage_system,machine,2),cpu FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +SELECT top(usage_system,machine,2),machine FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +SELECT top(usage_idle,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 GROUP BY TIME(60s); +-- +-- bottom +-- +SELECT bottom(reads, 3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +SELECT bottom(usage_system,3) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1'; +SELECT bottom(usage_idle,5), cpu FROM cpu GROUP BY machine; +SELECT bottom(writes,3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +SELECT bottom(writes,2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +SELECT bottom(usage_system,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +SELECT bottom(usage_system,machine,2),cpu FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +SELECT bottom(usage_system,machine,2),machine FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +SELECT bottom(usage_idle,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 GROUP BY TIME(60s); \ No newline at end of file diff --git a/influxdb_iox/tests/query_tests/cases/in/top_bottom.influxql.expected b/influxdb_iox/tests/query_tests/cases/in/top_bottom.influxql.expected new file mode 100644 index 0000000000..2e1b7c8932 --- /dev/null +++ b/influxdb_iox/tests/query_tests/cases/in/top_bottom.influxql.expected @@ -0,0 +1,210 @@ +-- Test Setup: top_bottom +-- InfluxQL: SELECT top(writes, 2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +name: diskio ++---------------------+---------+ +| time | top | ++---------------------+---------+ +| 1970-01-01T00:03:20 | 5593589 | +| 1970-01-01T00:03:30 | 5593735 | ++---------------------+---------+ +-- InfluxQL: SELECT top(usage_system,3) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0'; +name: cpu ++---------------------+------+ +| time | top | 
++---------------------+------+ +| 1970-01-01T00:02:10 | 89.8 | +| 1970-01-01T00:02:50 | 89.8 | +| 1970-01-01T00:03:00 | 90.0 | ++---------------------+------+ +-- InfluxQL: SELECT top(usage_idle,5), cpu FROM cpu GROUP BY machine; +name: cpu +tags: machine=machine1 ++---------------------+------+------+ +| time | top | cpu | ++---------------------+------+------+ +| 1970-01-01T00:01:00 | 99.8 | cpu1 | +| 1970-01-01T00:01:20 | 99.8 | cpu1 | +| 1970-01-01T00:02:00 | 99.9 | cpu1 | +| 1970-01-01T00:02:20 | 99.9 | cpu1 | +| 1970-01-01T00:02:30 | 99.9 | cpu1 | ++---------------------+------+------+ +name: cpu +tags: machine=machine2 ++---------------------+------+------+ +| time | top | cpu | ++---------------------+------+------+ +| 1970-01-01T00:01:00 | 89.8 | cpu1 | +| 1970-01-01T00:01:20 | 89.8 | cpu1 | +| 1970-01-01T00:02:00 | 89.9 | cpu1 | +| 1970-01-01T00:02:20 | 89.9 | cpu1 | +| 1970-01-01T00:02:30 | 89.9 | cpu1 | ++---------------------+------+------+ +-- InfluxQL: SELECT top(writes,3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +name: diskio ++---------------------+---------+ +| time | top | ++---------------------+---------+ +| 1970-01-01T00:02:10 | 5592646 | +| 1970-01-01T00:02:20 | 5592810 | +| 1970-01-01T00:02:30 | 5592997 | +| 1970-01-01T00:02:40 | 5593109 | +| 1970-01-01T00:02:50 | 5593219 | +| 1970-01-01T00:03:00 | 5593438 | +| 1970-01-01T00:03:10 | 5593513 | +| 1970-01-01T00:03:20 | 5593589 | +| 1970-01-01T00:03:30 | 5593735 | ++---------------------+---------+ +-- InfluxQL: SELECT top(writes,2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +name: diskio ++---------------------+---------+ +| time | top | ++---------------------+---------+ +| 1970-01-01T00:02:10 | 5592646 | +| 1970-01-01T00:02:20 | 5592810 | +| 1970-01-01T00:02:40 | 5593109 | +| 1970-01-01T00:02:50 | 5593219 | +| 1970-01-01T00:03:10 | 5593513 | +| 1970-01-01T00:03:20 | 5593589 | +| 1970-01-01T00:03:30 | 5593735 | ++---------------------+---------+ +-- InfluxQL: SELECT top(usage_system,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +name: cpu ++---------------------+------+----------+------+ +| time | top | machine | cpu | ++---------------------+------+----------+------+ +| 1970-01-01T00:02:00 | 99.9 | machine1 | cpu1 | +| 1970-01-01T00:03:00 | 90.0 | machine1 | cpu0 | ++---------------------+------+----------+------+ +-- InfluxQL: SELECT top(usage_system,machine,2),cpu FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +name: cpu ++---------------------+------+----------+------+ +| time | top | machine | cpu | ++---------------------+------+----------+------+ +| 1970-01-01T00:02:00 | 99.9 | machine1 | cpu1 | +| 1970-01-01T00:02:00 | 89.9 | machine2 | cpu1 | ++---------------------+------+----------+------+ +-- InfluxQL: SELECT top(usage_system,machine,2),machine FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +name: cpu ++---------------------+------+----------+-----------+ +| time | top | machine | machine_1 | ++---------------------+------+----------+-----------+ +| 1970-01-01T00:02:00 | 99.9 | machine1 | machine1 | +| 1970-01-01T00:02:00 | 89.9 | machine2 | machine2 | ++---------------------+------+----------+-----------+ +-- InfluxQL: SELECT top(usage_idle,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 GROUP BY TIME(60s); +name: cpu 
++---------------------+------+----------+------+ +| time | top | machine | cpu | ++---------------------+------+----------+------+ +| 1970-01-01T00:01:00 | 99.8 | machine1 | cpu1 | +| 1970-01-01T00:01:00 | 89.8 | machine2 | cpu1 | +| 1970-01-01T00:02:00 | 99.9 | machine1 | cpu1 | +| 1970-01-01T00:02:30 | 90.4 | machine1 | cpu0 | +| 1970-01-01T00:03:00 | 99.8 | machine1 | cpu1 | +| 1970-01-01T00:03:00 | 90.0 | machine1 | cpu0 | ++---------------------+------+----------+------+ +-- InfluxQL: SELECT bottom(reads, 3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +name: diskio ++---------------------+---------+ +| time | bottom | ++---------------------+---------+ +| 1970-01-01T00:02:10 | 2592646 | +| 1970-01-01T00:02:30 | 2592997 | +| 1970-01-01T00:02:50 | 2593219 | ++---------------------+---------+ +-- InfluxQL: SELECT bottom(usage_system,3) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1'; +name: cpu ++---------------------+--------+ +| time | bottom | ++---------------------+--------+ +| 1970-01-01T00:01:00 | 89.8 | +| 1970-01-01T00:01:10 | 89.7 | +| 1970-01-01T00:02:10 | 89.8 | ++---------------------+--------+ +-- InfluxQL: SELECT bottom(usage_idle,5), cpu FROM cpu GROUP BY machine; +name: cpu +tags: machine=machine1 ++---------------------+--------+------+ +| time | bottom | cpu | ++---------------------+--------+------+ +| 1970-01-01T00:01:10 | 88.6 | cpu0 | +| 1970-01-01T00:01:20 | 88.6 | cpu0 | +| 1970-01-01T00:01:30 | 83.4 | cpu0 | +| 1970-01-01T00:01:40 | 87.7 | cpu0 | +| 1970-01-01T00:02:00 | 86.9 | cpu0 | ++---------------------+--------+------+ +name: cpu +tags: machine=machine2 ++---------------------+--------+------+ +| time | bottom | cpu | ++---------------------+--------+------+ +| 1970-01-01T00:01:10 | 78.6 | cpu0 | +| 1970-01-01T00:01:20 | 78.6 | cpu0 | +| 1970-01-01T00:01:30 | 73.4 | cpu0 | +| 1970-01-01T00:01:40 | 77.7 | cpu0 | +| 1970-01-01T00:02:00 | 76.9 | cpu0 | ++---------------------+--------+------+ +-- InfluxQL: SELECT bottom(writes,3) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +name: diskio ++---------------------+---------+ +| time | bottom | ++---------------------+---------+ +| 1970-01-01T00:02:10 | 5592646 | +| 1970-01-01T00:02:20 | 5592810 | +| 1970-01-01T00:02:30 | 5592997 | +| 1970-01-01T00:02:40 | 5593109 | +| 1970-01-01T00:02:50 | 5593219 | +| 1970-01-01T00:03:00 | 5593438 | +| 1970-01-01T00:03:10 | 5593513 | +| 1970-01-01T00:03:20 | 5593589 | +| 1970-01-01T00:03:30 | 5593735 | ++---------------------+---------+ +-- InfluxQL: SELECT bottom(writes,2) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +name: diskio ++---------------------+---------+ +| time | bottom | ++---------------------+---------+ +| 1970-01-01T00:02:10 | 5592646 | +| 1970-01-01T00:02:20 | 5592810 | +| 1970-01-01T00:02:30 | 5592997 | +| 1970-01-01T00:02:40 | 5593109 | +| 1970-01-01T00:03:00 | 5593438 | +| 1970-01-01T00:03:10 | 5593513 | +| 1970-01-01T00:03:30 | 5593735 | ++---------------------+---------+ +-- InfluxQL: SELECT bottom(usage_system,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +name: cpu ++---------------------+--------+----------+------+ +| time | bottom | machine | cpu | ++---------------------+--------+----------+------+ +| 1970-01-01T00:01:30 | 73.4 | machine2 | cpu0 | +| 1970-01-01T00:01:30 | 83.4 | machine1 | cpu0 | 
++---------------------+--------+----------+------+ +-- InfluxQL: SELECT bottom(usage_system,machine,2),cpu FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +name: cpu ++---------------------+--------+----------+------+ +| time | bottom | machine | cpu | ++---------------------+--------+----------+------+ +| 1970-01-01T00:01:30 | 73.4 | machine2 | cpu0 | +| 1970-01-01T00:01:30 | 83.4 | machine1 | cpu0 | ++---------------------+--------+----------+------+ +-- InfluxQL: SELECT bottom(usage_system,machine,2),machine FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001; +name: cpu ++---------------------+--------+----------+-----------+ +| time | bottom | machine | machine_1 | ++---------------------+--------+----------+-----------+ +| 1970-01-01T00:01:30 | 83.4 | machine1 | machine1 | +| 1970-01-01T00:01:30 | 73.4 | machine2 | machine2 | ++---------------------+--------+----------+-----------+ +-- InfluxQL: SELECT bottom(usage_idle,machine,cpu,2) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 GROUP BY TIME(60s); +name: cpu ++---------------------+--------+----------+------+ +| time | bottom | machine | cpu | ++---------------------+--------+----------+------+ +| 1970-01-01T00:01:30 | 73.4 | machine2 | cpu0 | +| 1970-01-01T00:01:30 | 83.4 | machine1 | cpu0 | +| 1970-01-01T00:02:00 | 76.9 | machine2 | cpu0 | +| 1970-01-01T00:02:00 | 86.9 | machine1 | cpu0 | +| 1970-01-01T00:03:10 | 78.8 | machine2 | cpu0 | +| 1970-01-01T00:03:10 | 88.8 | machine1 | cpu0 | ++---------------------+--------+----------+------+ \ No newline at end of file diff --git a/influxdb_iox/tests/query_tests/data/top_bottom.lp b/influxdb_iox/tests/query_tests/data/top_bottom.lp new file mode 100644 index 0000000000..dfc4cff5e9 --- /dev/null +++ b/influxdb_iox/tests/query_tests/data/top_bottom.lp @@ -0,0 +1,80 @@ +# Load into influxdb 1.8: +# +# curl localhost:8086/write\?db=top_bottom --data-binary "@influxdb_iox/tests/query_tests/data/top_bottom.lp" +# +# Float data, regular intervals, usage_system has gaps +# +cpu,cpu=cpu0,machine=machine1 usage_idle=89.5,usage_system=89.5 0000000060000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=79.5,usage_system=79.5 0000000060000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.8,usage_system=99.8 0000000060000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.8,usage_system=89.8 0000000060000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=88.6,usage_system=88.6 0000000070000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=78.6,usage_system=78.6 0000000070000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.7,usage_system=99.7 0000000070000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.7,usage_system=89.7 0000000070000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=88.6 0000000080000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=78.6 0000000080000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.8 0000000080000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.8 0000000080000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.7 0000000090000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.7 0000000090000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=83.4,usage_system=83.4 0000000090000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=73.4,usage_system=73.4 0000000090000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.7 0000000100000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.7 0000000100000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=87.7,usage_system=87.7 0000000100000000000 
+cpu,cpu=cpu0,machine=machine2 usage_idle=77.7,usage_system=77.7 0000000100000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=88.7 0000000110000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=78.7 0000000110000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.3 0000000110000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.3 0000000110000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=86.9 0000000120000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=76.9 0000000120000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.9,usage_system=99.9 0000000120000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.9,usage_system=89.9 0000000120000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=89.8,usage_system=89.8 0000000130000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=79.8,usage_system=79.8 0000000130000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.8,usage_system=99.8 0000000130000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.8,usage_system=89.8 0000000130000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=89.0 0000000140000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=79.0 0000000140000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.9,usage_system=99.9 0000000140000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.9,usage_system=89.9 0000000140000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=90.4 0000000150000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=80.4 0000000150000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.9 0000000150000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.9 0000000150000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=90.2 0000000160000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=80.2 0000000160000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.8 0000000160000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.8 0000000160000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.8 0000000170000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.8 0000000170000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=89.8,usage_system=89.8 0000000170000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=79.8,usage_system=79.8 0000000170000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=90.0,usage_system=90.0 0000000180000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=80.0,usage_system=80.0 0000000180000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.8,usage_system=99.8 0000000180000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.8,usage_system=89.8 0000000180000000000 +cpu,cpu=cpu0,machine=machine1 usage_idle=88.8 0000000190000000000 +cpu,cpu=cpu0,machine=machine2 usage_idle=78.8 0000000190000000000 +cpu,cpu=cpu1,machine=machine1 usage_idle=99.8,usage_system=99.8 0000000190000000000 +cpu,cpu=cpu1,machine=machine2 usage_idle=89.8,usage_system=89.8 0000000190000000000 + +# integers at regular intervals +diskio,name=disk0 reads=2591520i,writes=5591520i 0000000060000000000 +diskio,name=disk0 writes=5591620i 0000000070000000000 +diskio,name=disk0 writes=5591729i 0000000080000000000 +diskio,name=disk0 writes=5592114i 0000000090000000000 +diskio,name=disk0 writes=5592210i 0000000100000000000 +diskio,name=disk0 reads=2592366i,writes=5592366i 0000000110000000000 +diskio,name=disk0 reads=2592576i,writes=5592576i 0000000120000000000 +diskio,name=disk0 reads=2592646i,writes=5592646i 0000000130000000000 +diskio,name=disk0 writes=5592810i 0000000140000000000 +diskio,name=disk0 reads=2592997i,writes=5592997i 0000000150000000000 +diskio,name=disk0 writes=5593109i 0000000160000000000 +diskio,name=disk0 reads=2593219i,writes=5593219i 
0000000170000000000 +diskio,name=disk0 reads=2593438i,writes=5593438i 0000000180000000000 +diskio,name=disk0 writes=5593513i 0000000190000000000 +diskio,name=disk0 reads=2593589i,writes=5593589i 0000000200000000000 +diskio,name=disk0 reads=2593735i,writes=5593735i 0000000210000000000 diff --git a/influxdb_iox/tests/query_tests/setups.rs b/influxdb_iox/tests/query_tests/setups.rs index dc865f0306..e9fdd79340 100644 --- a/influxdb_iox/tests/query_tests/setups.rs +++ b/influxdb_iox/tests/query_tests/setups.rs @@ -1378,6 +1378,20 @@ pub static SETUPS: Lazy> = Lazy::new(|| { }, ], ), + ( + // Used for top/bottom function tests for InfluxQL + "top_bottom", + vec![ + Step::RecordNumParquetFiles, + Step::WriteLineProtocol( + include_str!("data/top_bottom.lp").to_string() + ), + Step::Persist, + Step::WaitForPersisted { + expected_increase: 1, + }, + ], + ), ( "DuplicateDifferentDomains", (0..2) diff --git a/influxdb_line_protocol/Cargo.toml b/influxdb_line_protocol/Cargo.toml index 73c7c176d7..8ab9822686 100644 --- a/influxdb_line_protocol/Cargo.toml +++ b/influxdb_line_protocol/Cargo.toml @@ -20,7 +20,7 @@ repository = "https://github.com/influxdata/influxdb_iox/tree/main/influxdb_line bytes = "1.4" log = "0.4.19" nom = { version = "7", default-features = false, features = ["std"] } -smallvec = { version = "1.10.0", features = ["union"] } +smallvec = { version = "1.11.0", features = ["union"] } snafu = "0.7" [dev-dependencies] # In alphabetical order diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index ee54732e12..32d778d8fb 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -10,7 +10,7 @@ arrow = { workspace = true, features = ["prettyprint"] } arrow_util = { version = "0.1.0", path = "../arrow_util" } arrow-flight = { workspace = true } async-channel = "1.8.0" -async-trait = "0.1.70" +async-trait = "0.1.71" backoff = { version = "0.1.0", path = "../backoff" } bytes = "1.4.0" crossbeam-utils = "0.8.16" diff --git a/iox_catalog/Cargo.toml b/iox_catalog/Cargo.toml index 0bdb5e3fac..b6255c8ed1 100644 --- a/iox_catalog/Cargo.toml +++ b/iox_catalog/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true license.workspace = true [dependencies] # In alphabetical order -async-trait = "0.1.70" +async-trait = "0.1.71" data_types = { path = "../data_types" } futures = "0.3" iox_time = { version = "0.1.0", path = "../iox_time" } diff --git a/iox_data_generator/Cargo.toml b/iox_data_generator/Cargo.toml index 3a90e111d0..6d858a555e 100644 --- a/iox_data_generator/Cargo.toml +++ b/iox_data_generator/Cargo.toml @@ -20,13 +20,13 @@ mutable_batch_lp = { path = "../mutable_batch_lp" } mutable_batch = { path = "../mutable_batch" } parquet_file = { path = "../parquet_file" } rand = { version = "0.8.3", features = ["small_rng"] } -regex = "1.8" +regex = "1.9" schema = { path = "../schema" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.100" snafu = "0.7" tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } -toml = "0.7.5" +toml = "0.7.6" tracing = "0.1" tracing-subscriber = "0.3" uuid = { version = "1", default_features = false } diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index f7cd18fe4a..a095f03db1 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -82,7 +82,7 @@ use schema::{ InfluxColumnType, InfluxFieldType, Schema, INFLUXQL_MEASUREMENT_COLUMN_NAME, INFLUXQL_METADATA_KEY, }; -use std::collections::{BTreeSet, HashSet}; +use 
std::collections::{hash_map::Entry, BTreeSet, HashMap, HashSet}; use std::fmt::Debug; use std::iter; use std::ops::{Bound, ControlFlow, Deref, Not, Range}; @@ -421,20 +421,6 @@ impl<'a> Context<'a> { fn fill(&self) -> FillClause { self.fill.unwrap_or_default() } - - fn is_aggregate(&self) -> bool { - matches!( - self.projection_type, - ProjectionType::Aggregate - | ProjectionType::WindowAggregate - | ProjectionType::WindowAggregateMixed - | ProjectionType::Selector { .. } - ) - } - - fn is_raw_distinct(&self) -> bool { - matches!(self.projection_type, ProjectionType::RawDistinct) - } } #[allow(missing_debug_implementations)] @@ -775,38 +761,126 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fields: &[Field], group_by_tag_set: &[&str], ) -> Result<LogicalPlan> { - if ctx.projection_type == ProjectionType::WindowAggregateMixed { - return error::not_implemented("mixed window-aggregate and aggregate columns, such as DIFFERENCE(MEAN(col)), MEAN(col)"); + match ctx.projection_type { + ProjectionType::Raw => self.project_select_raw(input, fields), + ProjectionType::RawDistinct => self.project_select_raw_distinct(input, fields), + ProjectionType::Aggregate | ProjectionType::Selector{..} => self.project_select_aggregate(ctx, input, fields, group_by_tag_set), + ProjectionType::Window => self.project_select_window(ctx, input, fields, group_by_tag_set), + ProjectionType::WindowAggregate => self.project_select_window_aggregate(ctx, input, fields, group_by_tag_set), + ProjectionType::WindowAggregateMixed => error::not_implemented("mixed window-aggregate and aggregate columns, such as DIFFERENCE(MEAN(col)), MEAN(col)"), + ProjectionType::TopBottomSelector => self.project_select_top_bottom_selector(ctx, input, fields, group_by_tag_set), } + } + fn project_select_raw(&self, input: LogicalPlan, fields: &[Field]) -> Result<LogicalPlan> { + let schemas = Schemas::new(input.schema())?; + + // Transform InfluxQL AST field expressions to a list of DataFusion expressions. + let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; + + // Wrap the plan in a `LogicalPlan::Projection` from the select expressions + project(input, select_exprs) + } + + fn project_select_raw_distinct( + &self, + input: LogicalPlan, + fields: &[Field], + ) -> Result<LogicalPlan> { + let schemas = Schemas::new(input.schema())?; + + // Transform InfluxQL AST field expressions to a list of DataFusion expressions. + let mut select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; - if ctx.is_raw_distinct() { - // This is a special case, where exactly one column can be projected with a `DISTINCT` - // clause or the `distinct` function. - // - // In addition, the time column is projected as the Unix epoch. + // This is a special case, where exactly one column can be projected with a `DISTINCT` + // clause or the `distinct` function. + // + // In addition, the time column is projected as the Unix epoch. - let Some(time_column_index) = find_time_column_index(fields) else { + let Some(time_column_index) = find_time_column_index(fields) else { return error::internal("unable to find time column") }; - // Take ownership of the alias, so we don't reallocate, and temporarily place a literal - // `NULL` in its place. - let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else { + // Take ownership of the alias, so we don't reallocate, and temporarily place a literal + // NULL in its place.
+ let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else { return error::internal("time column is not an alias") }; - select_exprs[time_column_index] = lit_timestamp_nano(0).alias(alias); + select_exprs[time_column_index] = lit_timestamp_nano(0).alias(alias); - // Wrap the plan in a `LogicalPlan::Projection` from the select expressions - let plan = project(input, select_exprs)?; + // Wrap the plan in a `LogicalPlan::Projection` from the select expressions + let plan = project(input, select_exprs)?; - return LogicalPlanBuilder::from(plan).distinct()?.build(); + LogicalPlanBuilder::from(plan).distinct()?.build() + } + + fn project_select_aggregate( + &self, + ctx: &Context<'_>, + input: LogicalPlan, + fields: &[Field], + group_by_tag_set: &[&str], + ) -> Result<LogicalPlan> { + let schemas = Schemas::new(input.schema())?; + + // Transform InfluxQL AST field expressions to a list of DataFusion expressions. + let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; + + let (plan, select_exprs) = + self.select_aggregate(ctx, input, fields, select_exprs, group_by_tag_set)?; + + // Wrap the plan in a `LogicalPlan::Projection` from the select expressions + project(plan, select_exprs) + } + + fn project_select_window( + &self, + ctx: &Context<'_>, + input: LogicalPlan, + fields: &[Field], + group_by_tag_set: &[&str], + ) -> Result<LogicalPlan> { + let schemas = Schemas::new(input.schema())?; + + // Transform InfluxQL AST field expressions to a list of DataFusion expressions. + let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; + + let (plan, select_exprs) = + self.select_window(ctx, input, select_exprs, group_by_tag_set)?; + + // Wrap the plan in a `LogicalPlan::Projection` from the select expressions + let plan = project(plan, select_exprs)?; + + // InfluxQL OG physical operators for window functions drop rows where every field value is NULL; replicate that behaviour here: + + // generate a predicate to filter rows where all field values of the row are `NULL`, + // like: + // + // NOT (field1 IS NULL AND field2 IS NULL AND ...) + match conjunction(fields.iter().filter_map(|f| { + if matches!(f.data_type, Some(InfluxColumnType::Field(_))) { + Some(f.name.as_expr().is_null()) + } else { + None + } + })) { + Some(expr) => LogicalPlanBuilder::from(plan).filter(expr.not())?.build(), + None => Ok(plan), } + } + + fn project_select_window_aggregate( + &self, + ctx: &Context<'_>, + input: LogicalPlan, + fields: &[Field], + group_by_tag_set: &[&str], + ) -> Result<LogicalPlan> { + let schemas = Schemas::new(input.schema())?; + + // Transform InfluxQL AST field expressions to a list of DataFusion expressions. + let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; let (plan, select_exprs) = self.select_aggregate(ctx, input, fields, select_exprs, group_by_tag_set)?; @@ -816,29 +890,117 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // Wrap the plan in a `LogicalPlan::Projection` from the select expressions let plan = project(plan, select_exprs)?; - if matches!( - ctx.projection_type, - ProjectionType::WindowAggregate | ProjectionType::Window - ) { - // InfluxQL OG physical operators for + // InfluxQL OG physical operators for window-aggregate functions drop rows where every field value is NULL; replicate that behaviour here: - // generate a predicate to filter rows where all field values of the row are `NULL`, - // like: - // - // NOT (field1 IS NULL AND field2 IS NULL AND ...)
- match conjunction(fields.iter().filter_map(|f| { - if matches!(f.data_type, Some(InfluxColumnType::Field(_))) { - Some(f.name.as_expr().is_null()) - } else { - None - } - })) { - Some(expr) => LogicalPlanBuilder::from(plan).filter(expr.not())?.build(), - None => Ok(plan), + // generate a predicate to filter rows where all field values of the row are `NULL`, + // like: + // + // NOT (field1 IS NULL AND field2 IS NULL AND ...) + match conjunction(fields.iter().filter_map(|f| { + if matches!(f.data_type, Some(InfluxColumnType::Field(_))) { + Some(f.name.as_expr().is_null()) + } else { + None + } + })) { + Some(expr) => LogicalPlanBuilder::from(plan).filter(expr.not())?.build(), + None => Ok(plan), + } + } + + fn project_select_top_bottom_selector( + &self, + ctx: &Context<'_>, + input: LogicalPlan, + fields: &[Field], + group_by_tag_set: &[&str], + ) -> Result<LogicalPlan> { + let schemas = Schemas::new(input.schema())?; + + let (selector_index, call) = fields + .iter() + .enumerate() + .find_map(|(idx, f)| match &f.expr { + IQLExpr::Call(c) if c.name == "top" || c.name == "bottom" => Some((idx, c.clone())), + _ => None, + }) + .ok_or(error::map::internal( + "ProjectionTopBottomSelector used without top or bottom field", + ))?; + + // Find the selector parameters. + let is_bottom = call.name == "bottom"; + let [field, tag_keys @ .., narg] = call.args.as_slice() else { + return error::internal(format!( + "invalid number of arguments for {}: expected 2 or more, got {}", + call.name, + call.args.len() + )); + }; + let field = if let IQLExpr::VarRef(v) = field { + Field { + expr: IQLExpr::VarRef(v.clone()), + name: v.name.clone().take(), + data_type: None, } } else { - Ok(plan) + return error::internal(format!( + "invalid expression for {} field argument, {field}", + call.name, + )); + }; + let n = if let IQLExpr::Literal(Literal::Integer(v)) = narg { + *v + } else { + return error::internal(format!( + "invalid expression for {} n argument, {narg}", + call.name + )); + }; + + let mut internal_group_by = group_by_tag_set.to_vec(); + let mut fields_vec = fields.to_vec(); + for (i, tag_key) in tag_keys.iter().enumerate() { + if let IQLExpr::VarRef(v) = &tag_key { + fields_vec.insert( + selector_index + i + 1, + Field { + expr: IQLExpr::VarRef(v.clone()), + name: v.name.clone().take(), + data_type: None, + }, + ); + internal_group_by.push(v.name.as_ref()); + } else { + return error::internal(format!( + "invalid expression for {} tag_keys argument, {}", + call.name, &tag_key + )); + } } + + // Transform InfluxQL AST field expressions to a list of DataFusion expressions. + let select_exprs = self.field_list_to_exprs(&input, fields_vec.as_slice(), &schemas)?; + + let plan = if !tag_keys.is_empty() { + self.select_first( + ctx, + input, + &schemas, + &field, + is_bottom, + internal_group_by.as_slice(), + 1, + )? + } else { + input + }; + + let plan = + self.select_first(ctx, plan, &schemas, &field, is_bottom, group_by_tag_set, n)?; + + // Wrap the plan in a `LogicalPlan::Projection` from the select expressions + project(plan, select_exprs) } fn select_aggregate( @@ -849,10 +1011,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> { mut select_exprs: Vec<Expr>, group_by_tag_set: &[&str], ) -> Result<(LogicalPlan, Vec<Expr>)> { - if !ctx.is_aggregate() { - return Ok((input, select_exprs)); - } - // Find a list of unique aggregate expressions from the projection.
// // For example, a projection such as: @@ -1123,6 +1281,56 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Ok((plan, select_exprs)) } + /// Generate a plan to select the first n rows from each partition in the input data sorted by the requested field. + #[allow(clippy::too_many_arguments)] + fn select_first( + &self, + ctx: &Context<'_>, + input: LogicalPlan, + schemas: &Schemas, + field: &Field, + asc: bool, + group_by_tags: &[&str], + count: i64, + ) -> Result<LogicalPlan> { + let mut group_by = + fields_to_exprs_no_nulls(input.schema(), group_by_tags).collect::<Vec<_>>(); + if let Some(i) = ctx.interval { + let stride = lit(ScalarValue::new_interval_mdn(0, 0, i.duration)); + let offset = i.offset.unwrap_or_default(); + + group_by.push(date_bin( + stride, + "time".as_expr(), + lit(ScalarValue::TimestampNanosecond(Some(offset), None)), + )); + } + + let field_sort_expr = self + .field_to_df_expr(field, &input, schemas)? + .sort(asc, false); + + let window_expr = Expr::WindowFunction(WindowFunction::new( + window_function::WindowFunction::BuiltInWindowFunction( + window_function::BuiltInWindowFunction::RowNumber, + ), + Vec::<Expr>::new(), + group_by, + vec![field_sort_expr, ctx.time_sort_expr()], + WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::Null), + end_bound: WindowFrameBound::CurrentRow, + }, + )); + let column_name = window_expr.display_name()?; + let filter_expr = binary_expr(col(column_name.clone()), Operator::LtEq, lit(count)); + LogicalPlanBuilder::from(input) + .window(vec![window_expr.alias(column_name)])? + .filter(filter_expr)? + .build() + } + /// Transform a UDF to a window expression. fn udf_to_expr( ctx: &Context<'_>, @@ -1380,9 +1588,25 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fields: &[Field], schemas: &Schemas, ) -> Result<Vec<Expr>> { + let mut names: HashMap<&str, usize> = HashMap::new(); fields .iter() - .map(|field| self.field_to_df_expr(field, plan, schemas)) + .map(|field| { + let mut new_field = field.clone(); + new_field.name = match names.entry(field.name.as_str()) { + Entry::Vacant(v) => { + v.insert(0); + field.name.clone() + } + Entry::Occupied(mut e) => { + let count = e.get_mut(); + *count += 1; + format!("{}_{}", field.name, *count) + } + }; + new_field + }) + .map(|field| self.field_to_df_expr(&field, plan, schemas)) + .collect() } @@ -1729,6 +1953,11 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Ok(non_negative_derivative(eargs)) } + // The TOP/BOTTOM function is handled as a `ProjectionType::TopBottomSelector` + // query, so the planner only needs to project the single column + // argument.
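+ // As a sketch of the resulting plan shape (see `select_first` above and the `test_top`/`test_bottom` snapshots below), a query such as `SELECT top(usage_idle, 3) FROM cpu` is planned roughly as the window expression `ROW_NUMBER() OVER (ORDER BY usage_idle DESC, time ASC)` followed by a filter keeping row numbers <= 3; BOTTOM sorts the field ascending instead, and any GROUP BY tags become the window's PARTITION BY.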
+ "top" | "bottom" => self.expr_to_df_expr(scope, &args[0], schemas), + _ => error::query(format!("Invalid function '{name}'")), } } @@ -3799,6 +4028,80 @@ mod test { } } + #[test] + fn test_top() { + assert_snapshot!(plan("SELECT top(usage_idle,10) FROM cpu"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS top [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N] + Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + assert_snapshot!(plan("SELECT top(usage_idle,10),cpu FROM cpu"), @r###" + Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N, cpu:Dictionary(Int32, Utf8);N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS top, cpu.cpu AS cpu [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N, cpu:Dictionary(Int32, Utf8);N] + Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS 
usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + assert_snapshot!(plan("SELECT top(usage_idle,10) FROM cpu GROUP BY cpu"), @r###" + Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, top:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS top [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, top:Float64;N] + Filter: ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu.cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + assert_snapshot!(plan("SELECT top(usage_idle,cpu,10) FROM cpu"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N, cpu:Dictionary(Int32, Utf8);N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS top, cpu.cpu AS cpu [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), top:Float64;N, cpu:Dictionary(Int32, Utf8);N] + Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS 
usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + Filter: ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(1) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu.cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle DESC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + } + + #[test] + fn test_bottom() { + assert_snapshot!(plan("SELECT bottom(usage_idle,10) FROM cpu"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS bottom [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N] + Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, 
host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + assert_snapshot!(plan("SELECT bottom(usage_idle,10),cpu FROM cpu"), @r###" + Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N, cpu:Dictionary(Int32, Utf8);N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS bottom, cpu.cpu AS cpu [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N, cpu:Dictionary(Int32, Utf8);N] + Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + assert_snapshot!(plan("SELECT bottom(usage_idle,10) FROM cpu GROUP BY cpu"), @r###" + Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, bottom:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS bottom [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, bottom:Float64;N] + Filter: ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY 
[cpu.cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + assert_snapshot!(plan("SELECT bottom(usage_idle,cpu,10) FROM cpu"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N, cpu:Dictionary(Int32, Utf8);N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS bottom, cpu.cpu AS cpu [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), bottom:Float64;N, cpu:Dictionary(Int32, Utf8);N] + Filter: ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(10) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, ROW_NUMBER() ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + Filter: ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(1) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu.cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, cpu.time 
ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, ROW_NUMBER() PARTITION BY [cpu] ORDER BY [cpu.usage_idle AS usage_idle ASC NULLS LAST, time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + } + /// Test InfluxQL-specific behaviour of scalar functions that differ /// from DataFusion #[test] diff --git a/iox_query_influxql/src/plan/udaf.rs b/iox_query_influxql/src/plan/udaf.rs index a1b6eac250..3b96f34a95 100644 --- a/iox_query_influxql/src/plan/udaf.rs +++ b/iox_query_influxql/src/plan/udaf.rs @@ -373,7 +373,7 @@ pub(crate) fn non_negative_derivative_udf(unit: i64) -> AggregateUDF { } #[derive(Debug)] -pub(super) struct DerivativeAccumulator { +struct DerivativeAccumulator { unit: i64, prev: Option, curr: Option, diff --git a/object_store_metrics/Cargo.toml b/object_store_metrics/Cargo.toml index 9596346f58..b3f2e77c40 100644 --- a/object_store_metrics/Cargo.toml +++ b/object_store_metrics/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true license.workspace = true [dependencies] # In alphabetical order -async-trait = "0.1.70" +async-trait = "0.1.71" bytes = "1.4" futures = "0.3" iox_time = { version = "0.1.0", path = "../iox_time" } diff --git a/predicate/src/rpc_predicate/field_rewrite.rs b/predicate/src/rpc_predicate/field_rewrite.rs index eb8de42e98..bcf0299196 100644 --- a/predicate/src/rpc_predicate/field_rewrite.rs +++ b/predicate/src/rpc_predicate/field_rewrite.rs @@ -205,7 +205,12 @@ impl FieldProjectionRewriter { } }); - Ok(predicate.with_field_columns(new_fields).unwrap()) + let predicate = predicate + .with_field_columns(new_fields) + // errors are possible if the field columns are not supported + .map_err(|e| DataFusionError::NotImplemented(e.to_string()))?; + + Ok(predicate) } } diff --git a/querier/Cargo.toml b/querier/Cargo.toml index 433585cf10..c3ee4a5317 100644 --- a/querier/Cargo.toml +++ b/querier/Cargo.toml @@ -8,7 +8,7 @@ license.workspace = true [dependencies] arrow = { workspace = true } arrow-flight = { workspace = true } -async-trait = "0.1.70" +async-trait = "0.1.71" backoff = { path = "../backoff" } bytes = "1.4" cache_system = { path = "../cache_system" } diff --git a/query_functions/Cargo.toml b/query_functions/Cargo.toml index d663bff4e8..52581c6748 100644 --- a/query_functions/Cargo.toml +++ b/query_functions/Cargo.toml @@ -11,7 +11,7 @@ chrono = { version = "0.4", default-features = false } datafusion = { workspace = true } once_cell = "1" regex = "1" -regex-syntax = "0.7.1" +regex-syntax = "0.7.3" schema = { path = "../schema" } snafu = "0.7" workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/router/Cargo.toml b/router/Cargo.toml index a84c43d764..8327e0628e 100644 --- a/router/Cargo.toml +++ b/router/Cargo.toml @@ -34,7 +34,7 @@ service_grpc_object_store = { path = "../service_grpc_object_store" } service_grpc_schema = { path = "../service_grpc_schema" } service_grpc_table = { path = "../service_grpc_table" }
sharder = { path = "../sharder" } -smallvec = "1.10.0" +smallvec = "1.11.0" thiserror = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] } tonic = { workspace = true } diff --git a/service_common/Cargo.toml b/service_common/Cargo.toml index 1c073be488..98be4db0ef 100644 --- a/service_common/Cargo.toml +++ b/service_common/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true license.workspace = true [dependencies] # In alphabetical order -async-trait = "0.1.70" +async-trait = "0.1.71" bytes = "1.4" datafusion = { workspace = true } iox_query = { path = "../iox_query" } diff --git a/service_grpc_flight/Cargo.toml b/service_grpc_flight/Cargo.toml index c16e6219da..ef0d1c3a45 100644 --- a/service_grpc_flight/Cargo.toml +++ b/service_grpc_flight/Cargo.toml @@ -28,6 +28,7 @@ prost = "0.11" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.100" snafu = "0.7" +tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } tonic = { workspace = true } workspace-hack = { version = "0.1", path = "../workspace-hack" } @@ -35,4 +36,4 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" } assert_matches = "1" async-trait = "0.1" metric = { path = "../metric" } -tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } +test_helpers = { path = "../test_helpers" } diff --git a/service_grpc_flight/src/keep_alive.rs b/service_grpc_flight/src/keep_alive.rs new file mode 100644 index 0000000000..e8d1ff0159 --- /dev/null +++ b/service_grpc_flight/src/keep_alive.rs @@ -0,0 +1,383 @@ +//! Keep alive handling for response streaming. +//! +//! # The Problem +//! Under some deployment scenarios, we receive reports of cryptic error messages for certain long-running queries. For +//! example, the InfluxDB IOx CLI will report: +//! +//! ```text +//! Error querying: +//! Tonic( +//! Status { +//! code: Internal, message: "h2 protocol error: error reading a body from connection: stream error received: unexpected internal error encountered", +//! source: Some( +//! hyper::Error( +//! Body, +//! Error { kind: Reset(StreamId(1), INTERNAL_ERROR, Remote) } +//! ) +//! ) +//! } +//! ) +//! ``` +//! +//! And [PyArrow] will report something like: +//! +//! ```text +//! pyarrow._flight.FlightInternalError: +//! Flight returned internal error, with message: +//! Received RST_STREAM with error code 2. gRPC client debug context: +//! UNKNOWN:Error received from peer ipv6:%5B::1%5D:8888 { +//! created_time:"2023-07-03T17:54:56.346363565+02:00", +//! grpc_status:13, +//! grpc_message:"Received RST_STREAM with error code 2" +//! }. +//! Client context: OK +//! ``` +//! +//! `Received RST_STREAM with error code 2` is a good hint. According to [RFC 7540] (the HTTP/2 spec) the error code is +//! (see section 7): +//! +//! > INTERNAL_ERROR (0x2): The endpoint encountered an unexpected internal error. +//! +//! and `RST_STREAM` is (see section 6.4): +//! +//! > The `RST_STREAM` frame (type=0x3) allows for immediate termination of a stream. `RST_STREAM` is sent to request +//! > cancellation of a stream or to indicate that an error condition has occurred. +//! +//! The `grpc_status:13` confirms that -- according to [gRPC Status Codes] this means: +//! +//! > Internal errors. This means that some invariants expected by the underlying system have been broken. This error +//! > code is reserved for serious errors. +//! +//! 
The issue was replicated using [NGINX] and a hack in InfluxDB that makes streams really slow. +//! +//! The underlying issue is that some middleware or egress component -- e.g. [NGINX] -- terminates the response stream +//! because it thinks it is dead. +//! +//! # The Official Way +//! The [gRPC Keepalive] docs say: +//! +//! > HTTP/2 PING-based keepalives are a way to keep an HTTP/2 connection alive even when there is no data being +//! > transferred. This is done by periodically sending a PING frame to the other end of the connection. +//! +//! The `PING` mechanism is described by [RFC 7540] in section 6.7: +//! +//! > In addition to the frame header, `PING` frames MUST contain 8 octets of opaque data in the payload. ... +//! > +//! > Receivers of a `PING` frame that does not include an ACK flag MUST send a `PING` frame with the ACK flag set in +//! > response, with an identical payload. ... +//! +//! So every "ping" has a "pong". However, the same section also says: +//! +//! > `PING` frames are not associated with any individual stream. If a `PING` frame is received with a stream +//! > identifier field value other than `0x0`, the recipient MUST respond with a connection error (Section 5.4.1) of +//! > type `PROTOCOL_ERROR`. +//! +//! Now how should an egress proxy deal with this? Because streams may come from multiple upstream servers, they have +//! no way to establish a proper ping-pong end-to-end signaling path per stream. Hence in general it is not feasible to +//! use `PING` as a keep-alive mechanism, contrary to what the [gRPC] spec says. So what DO egress proxies do then? +//! Looking at various egress solutions: +//! +//! - +//! - +//! +//! They all seem to agree that you either set really long timeouts or rely on activity-based keep-alive, i.e. they require +//! SOMETHING to be sent on that stream. +//! +//! # The Wanted Workaround +//! Since all `PING`-based signalling is broken, we fall back to activity-based keep-alive, i.e. we ensure that we +//! regularly send something in our stream. +//! +//! Our response stream follows the [Apache Flight] definition. This means that we have a [gRPC] stream with +//! [`FlightData`] messages. Each of these messages has a [`MessageHeader`] describing its content. This is a +//! [FlatBuffers] union with the following options: +//! +//! - `None`: This is the implicit default. +//! - `Schema`: Sent before any other data to describe the schema of the stream. +//! - `DictionaryBatch`: Encodes dictionary data. This is not used in practice at the moment because dictionaries are +//! always hydrated. +//! - `RecordBatch`: Content of a `RecordBatch` w/o schema information. +//! - `Tensor`, `SparseTensor`: Irrelevant for us. +//! +//! Ideally we would send a `None` message with some metadata. However, most clients are too broken to accept this and +//! will trip over these messages. E.g. [PyArrow] -- which uses the C++ implementation -- will fail with: +//! +//! ```text +//! OSError: Header-type of flatbuffer-encoded Message is not RecordBatch. +//! ``` +//! +//! # The Actual Workaround +//! So we send actual empty `RecordBatch`es instead. These are encoded as `RecordBatch` messages w/o a schema (see +//! section above). The schema is sent separately right at the start of the stream. The arrow-rs implementation does +//! that for us and also ensures that the schema is adjusted for dictionary hydration. So we just inspect the data +//! stream and wait for that schema (the upstream implementation will always send this without any blocking / wait
time / actual `RecordBatch` data). +//! +//! +//! [Apache Flight]: https://arrow.apache.org/docs/format/Flight.html +//! [FlatBuffers]: https://flatbuffers.dev/ +//! [`FlightData`]: https://github.com/apache/arrow/blob/cd1ed18fd1e08912ea47b64edf55be9c046375c4/format/Flight.proto#L401-L429 +//! [gRPC]: https://grpc.io/ +//! [gRPC Keepalive]: https://grpc.io/docs/guides/keepalive/ +//! [gRPC Status Codes]: https://grpc.github.io/grpc/core/md_doc_statuscodes.html +//! [`MessageHeader`]: https://github.com/apache/arrow/blob/cd1ed18fd1e08912ea47b64edf55be9c046375c4/format/Message.fbs#L124-L132 +//! [NGINX]: https://nginx.org/ +//! [PyArrow]: https://arrow.apache.org/docs/python/index.html +//! [RFC 7540]: https://httpwg.org/specs/rfc7540.html + +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; + +use arrow::{ + datatypes::{DataType, Schema, SchemaRef}, + ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions}, + record_batch::RecordBatch, +}; +use arrow_flight::{error::FlightError, FlightData}; +use futures::{stream::BoxStream, Stream, StreamExt}; +use observability_deps::tracing::{info, warn}; +use tokio::time::{Interval, MissedTickBehavior}; + +/// Keep the underlying response stream alive by sending regular empty [`RecordBatch`]es. +pub struct KeepAliveStream { + inner: BoxStream<'static, Result<FlightData, FlightError>>, +} + +impl KeepAliveStream { + /// Create a new keep-alive wrapper from the underlying stream and the given interval. + /// + /// The interval is measured from the last message -- which can either be a "real" message or a keep-alive. + pub fn new<S>(s: S, interval: Duration) -> Self + where + S: Stream<Item = Result<FlightData, FlightError>> + Send + 'static, + { + let mut ticker = tokio::time::interval(interval); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + let state = State { + inner: s.boxed(), + schema: None, + ticker, + }; + + let inner = futures::stream::unfold(state, |mut state| async move { + loop { + tokio::select! { + _ = state.ticker.tick() => { + let Some(data) = build_empty_batch_msg(state.schema.as_ref()) else { + continue; + }; + info!("stream keep-alive"); + return Some((Ok(data), state)); + } + res = state.inner.next() => { + // peek at content to detect schema transmission + if let Some(Ok(data)) = &res { + if let Some(schema) = decode_schema(data) { + if check_schema(&schema) { + state.schema = Some(Arc::new(schema)); + } + } + } + + state.ticker.reset(); + return res.map(|res| (res, state)); + } + } + } + }) + .boxed(); + + Self { inner } + } +} + +impl Stream for KeepAliveStream { + type Item = Result<FlightData, FlightError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.inner.poll_next_unpin(cx) + } +} + +/// Inner state of [`KeepAliveStream`] +struct State { + /// The underlying stream that is kept alive. + inner: BoxStream<'static, Result<FlightData, FlightError>>, + + /// A [`Schema`] that was already received from the stream. + /// + /// We need this to produce sensible empty [`RecordBatch`]es and because [`RecordBatch`] messages can only come + /// AFTER an encoded [`Schema`]. + schema: Option<SchemaRef>, + + /// Keep-alive ticker. + ticker: Interval, +} + +/// Decode [`Schema`] from the response data stream. +fn decode_schema(data: &FlightData) -> Option<Schema> { + let message = arrow::ipc::root_as_message(&data.data_header[..]).ok()?; + + if arrow::ipc::MessageHeader::Schema != message.header_type() { + return None; + } + Schema::try_from(data).ok() +} + +/// Check that the [`Schema`] that we've [decoded](decode_schema) is sensible. +/// +/// Returns `true` if the [`Schema`] is OK.
Will log a warning and return `false` if there is a problem. +fn check_schema(schema: &Schema) -> bool { + schema.fields().iter().all(|field| match field.data_type() { + DataType::Dictionary(_, _) => { + warn!( + field = field.name(), + "arrow IPC schema still contains dictionary, should have been hydrated by now", + ); + false + } + _ => true, + }) +} + +/// Encode an empty [`RecordBatch`] as a message. +/// +/// This must only be sent AFTER a [`Schema`] was transmitted. +fn build_empty_batch_msg(schema: Option<&SchemaRef>) -> Option<FlightData> { + let Some(schema) = schema else { + warn!( + "cannot send keep-alive because no schema was transmitted yet", + ); + return None; + }; + + let batch = RecordBatch::new_empty(Arc::clone(schema)); + let data_gen = IpcDataGenerator::default(); + let mut dictionary_tracker = DictionaryTracker::new(true); + let write_options = IpcWriteOptions::default(); + let batch_data = match data_gen.encoded_batch(&batch, &mut dictionary_tracker, &write_options) { + Ok((dicts_data, batch_data)) => { + assert!(dicts_data.is_empty()); + batch_data + } + Err(e) => { + warn!( + %e, + "cannot encode empty batch", + ); + return None; + } + }; + + Some(batch_data.into()) +} + +#[cfg(test)] +pub mod test_util { + use std::time::Duration; + + use futures::{stream::BoxStream, Stream, StreamExt}; + + /// Ensure that there is a delay between stream responses. + pub fn make_stream_slow<S>(s: S, delay: Duration) -> BoxStream<'static, S::Item> + where + S: Send + Stream + Unpin + 'static, + { + futures::stream::unfold(s, move |mut s| async move { + tokio::time::sleep(delay).await; + let res = s.next().await; + res.map(|res| (res, s)) + }) + .boxed() + } +} + +#[cfg(test)] +mod tests { + use arrow::{array::Int64Array, datatypes::Field}; + use arrow_flight::{decode::FlightRecordBatchStream, encode::FlightDataEncoderBuilder}; + use datafusion::assert_batches_eq; + use futures::TryStreamExt; + use test_helpers::maybe_start_logging; + + use super::{test_util::make_stream_slow, *}; + + type BatchStream = BoxStream<'static, Result<RecordBatch, FlightError>>; + type FlightStream = BoxStream<'static, Result<FlightData, FlightError>>; + + #[tokio::test] + #[should_panic(expected = "stream timeout")] + async fn test_timeout() { + let s = make_test_stream(false); + let s = FlightRecordBatchStream::new_from_flight_data(s); + s.collect::<Vec<_>>().await; + } + + #[tokio::test] + async fn test_keep_alive() { + maybe_start_logging(); + + let s = make_test_stream(true); + let s = FlightRecordBatchStream::new_from_flight_data(s); + let batches: Vec<_> = s.try_collect().await.unwrap(); + assert_batches_eq!( + vec!["+---+", "| f |", "+---+", "| 1 |", "| 2 |", "| 3 |", "| 4 |", "| 5 |", "+---+",], + &batches + ); + } + + /// Creates a stream like the one that query processing would produce. + fn make_query_result_stream() -> (BatchStream, SchemaRef) { + let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int64, false)])); + + let batch_1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let batch_2 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![4, 5]))], + ) + .unwrap(); + + let s = futures::stream::iter([batch_1, batch_2]).map(Ok).boxed(); + (s, schema) + } + + /// Convert query result stream (= [`RecordBatch`]es) into a [`FlightData`] stream. + /// + /// This stream will -- as in prod -- send the [`Schema`] data even when there are no [`RecordBatch`]es yet.
+ fn make_flight_data_stream(s: BatchStream, schema: SchemaRef) -> FlightStream { + FlightDataEncoderBuilder::new() + .with_schema(schema) + .build(s) + .boxed() + } + + fn panic_on_stream_timeout(s: FlightStream, timeout: Duration) -> FlightStream { + futures::stream::unfold(s, move |mut s| async move { + let res = tokio::time::timeout(timeout, s.next()) + .await + .expect("stream timeout"); + res.map(|res| (res, s)) + }) + .boxed() + } + + fn make_test_stream(keep_alive: bool) -> FlightStream { + let (s, schema) = make_query_result_stream(); + let s = make_stream_slow(s, Duration::from_millis(500)); + let s = make_flight_data_stream(s, schema); + let s = if keep_alive { + KeepAliveStream::new(s, Duration::from_millis(100)).boxed() + } else { + s + }; + let s = panic_on_stream_timeout(s, Duration::from_millis(250)); + s + } +} diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index e1fcf5a836..7813f14ff9 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -16,14 +16,16 @@ unused_crate_dependencies )] +use keep_alive::KeepAliveStream; // Workaround for "unused crate" lint false positives. use workspace_hack as _; +mod keep_alive; mod request; use arrow::error::ArrowError; use arrow_flight::{ - encode::{FlightDataEncoder, FlightDataEncoderBuilder}, + encode::FlightDataEncoderBuilder, flight_descriptor::DescriptorType, flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, @@ -44,7 +46,13 @@ use prost::Message; use request::{IoxGetRequest, RunQuery}; use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider}; use snafu::{OptionExt, ResultExt, Snafu}; -use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant}; +use std::{ + fmt::Debug, + pin::Pin, + sync::Arc, + task::Poll, + time::{Duration, Instant}, +}; use tonic::{ metadata::{AsciiMetadataValue, MetadataMap}, Request, Response, Streaming, @@ -65,6 +73,9 @@ const IOX_FLIGHT_SQL_DATABASE_HEADERS: [&str; 4] = [ "iox-namespace-name", // deprecated ]; +/// At what interval should the `DoGet` stream send empty messages as keep-alive markers?
+const DO_GET_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5); + #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] pub enum Error { @@ -883,7 +894,7 @@ fn has_debug_header(metadata: &MetadataMap) -> bool { /// Wrapper over a FlightDataEncodeStream that adds IOx specific /// metadata and records completion struct GetStream { - inner: FlightDataEncoder, + inner: KeepAliveStream, #[allow(dead_code)] permit: InstrumentedAsyncOwnedSemaphorePermit, query_completed_token: QueryCompletedToken, @@ -919,6 +930,9 @@ impl GetStream { .with_metadata(app_metadata.encode_to_vec().into()) .build(query_results); + // add keep alive + let inner = KeepAliveStream::new(inner, DO_GET_KEEP_ALIVE_INTERVAL); + Ok(Self { inner, permit, @@ -958,7 +972,6 @@ impl Stream for GetStream { } } } - #[cfg(test)] mod tests { use arrow_flight::sql::ProstMessageExt; diff --git a/service_grpc_influxrpc/Cargo.toml b/service_grpc_influxrpc/Cargo.toml index 2ddb6674d0..16af170a1a 100644 --- a/service_grpc_influxrpc/Cargo.toml +++ b/service_grpc_influxrpc/Cargo.toml @@ -28,7 +28,7 @@ arrow = { workspace = true, features = ["prettyprint"] } futures = "0.3" pin-project = "1.1" prost = "0.11" -regex = "1.8.4" +regex = "1.9.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.100" snafu = "0.7" diff --git a/test_helpers/Cargo.toml b/test_helpers/Cargo.toml index fba587062c..00beda833f 100644 --- a/test_helpers/Cargo.toml +++ b/test_helpers/Cargo.toml @@ -13,7 +13,7 @@ tracing-log = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } observability_deps = { path = "../observability_deps" } workspace-hack = { version = "0.1", path = "../workspace-hack" } -async-trait = { version = "0.1.70", optional = true } +async-trait = { version = "0.1.71", optional = true } tokio = { version = "1.29.1", optional = true, default_features = false, features = ["time"] } [features] diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index 8e8b40d794..77aeebb193 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -28,7 +28,7 @@ once_cell = { version = "1.18", features = ["parking_lot"] } parking_lot = "0.12" prost = "0.11" rand = "0.8.3" -regex = "1.8" +regex = "1.9" reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } snafu = "0.7" sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid" ] } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index c08e3be728..018a55068e 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -69,6 +69,7 @@ prost-types = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } +regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.7" } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] } ring = { version = "0.16", features = ["std"] } @@ -138,6 +139,7 @@ prost-types = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } +regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline",
"perf-literal", "unicode"] } regex-syntax = { version = "0.7" } ring = { version = "0.16", features = ["std"] } serde = { version = "1", features = ["derive", "rc"] }