From 3fc2c9fc04228a1e6c4eb2b1a92d49e5e4527c31 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 6 May 2021 10:13:59 +0100 Subject: [PATCH] feat: add DataFusion regex match operator This commit adds a new custom UDF to IOx that provide a regex operator to Datafusion plans. Effectively it allows predicates to contain regex operators that are applied as filters, only allowing rows that satisfy the regex to be returned. I did not use the Arrow regex kernel for this work because that does not return a boolean array indicating which rows matched a regex, but instead returns a new string array of results. This doesn't work well with DF's approach to filtering. --- Cargo.lock | 248 ++++++++++++++++++++++-------------- query/Cargo.toml | 3 +- query/src/func.rs | 1 + query/src/func/regex.rs | 235 ++++++++++++++++++++++++++++++++++ query/src/func/selectors.rs | 2 +- 5 files changed, 389 insertions(+), 100 deletions(-) create mode 100644 query/src/func/regex.rs diff --git a/Cargo.lock b/Cargo.lock index 286b5742ee..ea95d4fa69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,7 +140,7 @@ dependencies = [ "arrow 4.0.0-SNAPSHOT", "bytes", "futures", - "proc-macro2", + "proc-macro2 1.0.24", "prost", "prost-derive", "tokio", @@ -204,9 +204,9 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db134ba52475c060f3329a8ef0f8786d6b872ed01515d4b79c162e5798da1340" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -215,9 +215,9 @@ version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -346,8 +346,8 @@ dependencies = [ "lazycell", "log", "peeking_take_while", - "proc-macro2", - "quote", + "proc-macro2 1.0.24", + "quote 1.0.9", 
"regex", "rustc-hash", "shlex", @@ -780,7 +780,7 @@ dependencies = [ "regex", "serde", "serde_regex", - "snafu", + "snafu 0.6.10", "test_helpers", ] @@ -974,9 +974,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f47da3a72ec598d9c8937a7ebca8962a5c7a1f28444e38c2b33c771ba3f55f05" dependencies = [ "proc-macro-error", - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -995,9 +995,9 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", "synstructure", ] @@ -1141,9 +1141,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "668c6733a182cd7deb4f1de7ba3bf2120823835b3bcfbeacf7d2c4a773c1bb8b" dependencies = [ "proc-macro-hack", - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -1478,7 +1478,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "snafu", + "snafu 0.6.10", "test_helpers", "tokio", "url", @@ -1542,7 +1542,7 @@ dependencies = [ "serde_json", "serde_urlencoded 0.7.0", "server", - "snafu", + "snafu 0.6.10", "structopt", "tempfile", "test_helpers", @@ -1586,7 +1586,7 @@ dependencies = [ "nom", "observability_deps", "smallvec", - "snafu", + "snafu 0.6.10", "test_helpers", ] @@ -1599,7 +1599,7 @@ dependencies = [ "integer-encoding", "observability_deps", "rand 0.8.3", - "snafu", + "snafu 0.6.10", "snap", "test_helpers", ] @@ -1944,9 +1944,9 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8a3e2bde382ebf960c1f3e79689fa5941625fe9bf694a1cb64af3e85faff3af" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -1967,7 +1967,7 @@ dependencies = [ 
"observability_deps", "parking_lot", "rand 0.8.3", - "snafu", + "snafu 0.6.10", "test_helpers", "tokio", "tracker", @@ -2179,7 +2179,7 @@ dependencies = [ "rusoto_core", "rusoto_credential", "rusoto_s3", - "snafu", + "snafu 0.6.10", "tempfile", "tokio", "tokio-util", @@ -2346,9 +2346,9 @@ checksum = "129943a960e6a08c7e70ca5a09f113c273fe7f10ae8420992c78293e3dffdf65" dependencies = [ "Inflector", "proc-macro-error", - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -2362,7 +2362,7 @@ dependencies = [ "observability_deps", "parquet 0.1.0", "rand 0.8.3", - "snafu", + "snafu 0.6.10", "test_helpers", ] @@ -2454,7 +2454,7 @@ dependencies = [ "prost", "prost-types", "query", - "snafu", + "snafu 0.6.10", "thrift", "tokio", "tokio-stream", @@ -2550,9 +2550,9 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -2561,9 +2561,9 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -2668,9 +2668,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", "version_check", ] @@ -2680,8 +2680,8 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ - "proc-macro2", - "quote", + "proc-macro2 1.0.24", + "quote 
1.0.9", "version_check", ] @@ -2697,13 +2697,22 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" +[[package]] +name = "proc-macro2" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" +dependencies = [ + "unicode-xid 0.1.0", +] + [[package]] name = "proc-macro2" version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" dependencies = [ - "unicode-xid", + "unicode-xid 0.2.1", ] [[package]] @@ -2757,9 +2766,9 @@ checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4" dependencies = [ "anyhow", "itertools 0.9.0", - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -2797,7 +2806,8 @@ dependencies = [ "metrics", "observability_deps", "parking_lot", - "snafu", + "regex", + "snafu 0.4.4", "sqlparser 0.8.0", "test_helpers", "tokio", @@ -2810,13 +2820,22 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quote" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce23b6b870e8f94f81fb0a363d65d86675884b34a09043c81e5562f11c1f8e1" +dependencies = [ + "proc-macro2 0.4.30", +] + [[package]] name = "quote" version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" dependencies = [ - "proc-macro2", + "proc-macro2 1.0.24", ] [[package]] @@ -2964,7 +2983,7 @@ dependencies = [ "permutation", "rand 0.8.3", "rand_distr", - "snafu", + "snafu 0.6.10", "test_helpers", "tracker", ] @@ -3386,9 +3405,9 @@ 
version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -3474,7 +3493,7 @@ dependencies = [ "read_buffer", "serde", "serde_json", - "snafu", + "snafu 0.6.10", "snap", "tempfile", "test_helpers", @@ -3554,6 +3573,17 @@ dependencies = [ "serde", ] +[[package]] +name = "snafu" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b028158eb06caa8345bee10cccfb25fa632beccf0ef5308832b4fd4b78a7db48" +dependencies = [ + "backtrace", + "doc-comment", + "snafu-derive 0.4.4", +] + [[package]] name = "snafu" version = "0.6.10" @@ -3563,7 +3593,18 @@ dependencies = [ "doc-comment", "futures-core", "pin-project 0.4.28", - "snafu-derive", + "snafu-derive 0.6.10", +] + +[[package]] +name = "snafu-derive" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf50aaef500c248a590e2696e8bf8c7620ca2235b9bb90a70363d82dd1abec6a" +dependencies = [ + "proc-macro2 0.4.30", + "quote 0.6.13", + "syn 0.15.44", ] [[package]] @@ -3572,9 +3613,9 @@ version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -3658,11 +3699,11 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef" dependencies = [ - "proc-macro2", - "quote", + "proc-macro2 1.0.24", + "quote 1.0.9", "serde", "serde_derive", - "syn", + "syn 1.0.67", ] [[package]] @@ -3672,13 +3713,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11" dependencies = [ "base-x", - "proc-macro2", - "quote", + "proc-macro2 1.0.24", + "quote 1.0.9", "serde", "serde_derive", "serde_json", "sha1", - "syn", + "syn 1.0.67", ] [[package]] @@ -3712,9 +3753,9 @@ checksum = "5ba9cdfda491b814720b6b06e0cac513d922fc407582032e8706e9f137976f90" dependencies = [ "heck", "proc-macro-error", - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -3723,15 +3764,26 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2" +[[package]] +name = "syn" +version = "0.15.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca4b3b69a77cbe1ffc9e198781b7acb0c7365a883670e8f1c1bc66fba79a5c5" +dependencies = [ + "proc-macro2 0.4.30", + "quote 0.6.13", + "unicode-xid 0.1.0", +] + [[package]] name = "syn" version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6498a9efc342871f91cc2d0d694c674368b4ceb40f62b65a7a08c3792935e702" dependencies = [ - "proc-macro2", - "quote", - "unicode-xid", + "proc-macro2 1.0.24", + "quote 1.0.9", + "unicode-xid 0.2.1", ] [[package]] @@ -3740,10 +3792,10 @@ version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" dependencies = [ - "proc-macro2", - "quote", - "syn", - "unicode-xid", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", + "unicode-xid 0.2.1", ] [[package]] @@ -3814,9 +3866,9 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -3924,10 +3976,10 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "e5c3be1edfad6027c69f5491cf4cb310d1a71ecd6af742788c6ff8bced86b8fa" dependencies = [ "proc-macro-hack", - "proc-macro2", - "quote", + "proc-macro2 1.0.24", + "quote 1.0.9", "standback", - "syn", + "syn 1.0.67", ] [[package]] @@ -3981,9 +4033,9 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -4067,10 +4119,10 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c695de27302f4697191dda1c7178131a8cb805463dda02864acb80fe1322fdcf" dependencies = [ - "proc-macro2", + "proc-macro2 1.0.24", "prost-build", - "quote", - "syn", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -4154,9 +4206,9 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", ] [[package]] @@ -4406,9 +4458,9 @@ dependencies = [ "bumpalo", "lazy_static", "log", - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", "wasm-bindgen-shared", ] @@ -4430,7 +4482,7 @@ version = "0.2.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e734d91443f177bfdb41969de821e15c516931c3c3db3d318fa1b68975d0f6f" dependencies = [ - "quote", + "quote 1.0.9", "wasm-bindgen-macro-support", ] @@ -4440,9 +4492,9 @@ version = "0.2.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53739ff08c8a68b0fdbcd54c372b8ab800b1449ab3c9d706503bc7dd1621b2c" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.67", 
"wasm-bindgen-backend", "wasm-bindgen-shared", ]
@@ -4554,7 +4606,7 @@ dependencies = [
  "regex",
  "serde",
  "serde_json",
- "snafu",
+ "snafu 0.6.10",
  "snap",
  "test_helpers",
  "tokio",
diff --git a/query/Cargo.toml b/query/Cargo.toml
index 34d727022c..2920249d06 100644
--- a/query/Cargo.toml
+++ b/query/Cargo.toml
@@ -27,7 +27,8 @@ influxdb_line_protocol = { path = "../influxdb_line_protocol" }
 internal_types = { path = "../internal_types" }
 metrics = { path = "../metrics" }
 parking_lot = "0.11.1"
-snafu = "0.6.2"
+regex = "1"
+snafu = "0.4.3"
 sqlparser = "0.8.0"
 tokio = { version = "1.0", features = ["macros"] }
 tokio-stream = "0.1.2"
diff --git a/query/src/func.rs b/query/src/func.rs
index 74ee046351..a774a88616 100644
--- a/query/src/func.rs
+++ b/query/src/func.rs
@@ -1,3 +1,4 @@
 //! Special IOx functions used in DataFusion plans
+pub mod regex;
 pub mod selectors;
 pub mod window;
diff --git a/query/src/func/regex.rs b/query/src/func/regex.rs
new file mode 100644
index 0000000000..99f9637bd1
--- /dev/null
+++ b/query/src/func/regex.rs
@@ -0,0 +1,235 @@
+use std::sync::Arc;
+
+use arrow_deps::{
+    arrow::{
+        array::{ArrayRef, BooleanArray, StringArray},
+        datatypes::DataType,
+    },
+    datafusion::{
+        error::DataFusionError,
+        logical_plan::{create_udf, Expr},
+        physical_plan::functions::make_scalar_function,
+    },
+};
+
+/// The name of the regex_match UDF given to DataFusion.
+pub const REGEX_MATCH_UDF_NAME: &str = "RegexMatch";
+/// The name of the negated regex_match UDF given to DataFusion.
+pub const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexNotMatch";
+
+/// Given a column containing string values and a single regex pattern,
+/// `regex_match_expr` determines which values satisfy the pattern and which do
+/// not.
+///
+/// If `matches` is true then this expression will filter values that do not
+/// satisfy the regex (equivalent to `col =~ /pattern/`). If `matches` is `false`
+/// then the expression will filter values that *do* match the regex, which is
+/// equivalent to `col !~ /pattern/`.
+///
+/// NULL inputs produce NULL results. An invalid `pattern` is reported as a
+/// `DataFusionError::Internal` when the expression is evaluated.
+///
+/// This UDF is designed to support the regex operator that can be pushed down
+/// via the InfluxRPC API.
+pub(crate) fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr {
+    // Compile the pattern once here rather than on every invocation of the UDF.
+    // A failure is kept as its message and surfaced when the UDF is evaluated.
+    let compiled = regex::Regex::new(&pattern)
+        .map_err(|e| format!("error compiling regex pattern: {}", e));
+
+    // N.B., this function does not utilise the Arrow regexp compute kernel because
+    // in order to act as a filter it needs to return a boolean array of comparison
+    // results, not an array of strings as the regex compute kernel does.
+    let func = move |args: &[ArrayRef]| {
+        assert_eq!(args.len(), 1); // only works over a single column at a time.
+
+        let pattern = compiled
+            .as_ref()
+            .map_err(|e| DataFusionError::Internal(e.clone()))?;
+        let input_arr = args[0]
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("UDF signature only accepts a Utf8 column");
+
+        // In Arrow any value can be null. A null input yields a null result, so
+        // null rows are dropped by the filter whatever the value of `matches`.
+        let results = input_arr
+            .iter()
+            .map(|row| row.map(|v| pattern.is_match(v) == matches))
+            .collect::<BooleanArray>();
+
+        Ok(Arc::new(results) as ArrayRef)
+    };
+
+    // make_scalar_function is a helper to support accepting scalar values as
+    // well as arrays.
+    let func = make_scalar_function(func);
+
+    let udf_name = if matches {
+        REGEX_MATCH_UDF_NAME
+    } else {
+        REGEX_NOT_MATCH_UDF_NAME
+    };
+
+    let udf = create_udf(
+        udf_name,
+        vec![DataType::Utf8],
+        Arc::new(DataType::Boolean),
+        func,
+    );
+
+    udf.call(vec![input])
+}
+
+#[cfg(test)]
+mod test {
+    use arrow_deps::{
+        arrow::{
+            array::{StringArray, UInt64Array},
+            datatypes::{DataType, Field, Schema},
+            record_batch::RecordBatch,
+            util::pretty::pretty_format_batches,
+        },
+        datafusion::{
+            datasource::MemTable,
+            error::DataFusionError,
+            logical_plan::{col, Expr},
+            prelude::ExecutionContext,
+        },
+    };
+    use std::iter::FromIterator;
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn regex_match_expr() {
+        let cases = vec![
+            (
+                ".*", // match everything except NULL values
+                true, // keep the values matched
+                vec![
+                    "+---------------+--------+",
+                    "| words         | length |",
+                    "+---------------+--------+",
+                    "| air           | 3      |",
+                    "| aphex twin    | 10     |",
+                    "| bruce         | 5      |",
+                    "| Blood Orange  | 12     |",
+                    "| cocteau twins | 13     |",
+                    "+---------------+--------+",
+                    "",
+                ],
+            ),
+            (
+                ".*", // match everything except NULL values
+                false, // filter away all the values matched
+                vec!["++", "++", ""],
+            ),
+            (
+                "", // an empty pattern also matches everything except NULL
+                true,
+                vec![
+                    "+---------------+--------+",
+                    "| words         | length |",
+                    "+---------------+--------+",
+                    "| air           | 3      |",
+                    "| aphex twin    | 10     |",
+                    "| bruce         | 5      |",
+                    "| Blood Orange  | 12     |",
+                    "| cocteau twins | 13     |",
+                    "+---------------+--------+",
+                    "",
+                ],
+            ),
+            (
+                ".+O.*", // match just words containing "O".
+                true,
+                vec![
+                    "+--------------+--------+",
+                    "| words        | length |",
+                    "+--------------+--------+",
+                    "| Blood Orange | 12     |",
+                    "+--------------+--------+",
+                    "",
+                ],
+            ),
+            (
+                "^(a|b).*", // match everything beginning with "a" or "b"
+                false, // negate expression and filter away anything that matches
+                vec![
+                    "+---------------+--------+",
+                    "| words         | length |",
+                    "+---------------+--------+",
+                    "| Blood Orange  | 12     |",
+                    "| cocteau twins | 13     |",
+                    "+---------------+--------+",
+                    "",
+                ],
+            ),
+        ];
+
+        for (pattern, matches, expected) in cases.into_iter() {
+            let regex_expr = super::regex_match_expr(col("words"), pattern.to_string(), matches);
+            let actual = run_plan(regex_expr).await.unwrap();
+
+            assert_eq!(
+                expected, actual,
+                "\n\nEXPECTED:\n{:#?}\nACTUAL:\n{:#?}\n",
+                expected, actual
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn regex_match_expr_invalid_regex() {
+        // an invalid regex pattern
+        let regex_expr = super::regex_match_expr(col("words"), "[".to_string(), true);
+
+        let actual = run_plan(regex_expr).await.expect_err("expected error");
+        assert!(actual.to_string().contains("error compiling regex pattern"))
+    }
+
+    // Run a plan against the following input table as "t"
+    async fn run_plan(op: Expr) -> Result<Vec<String>, DataFusionError> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("words", DataType::Utf8, true),
+            Field::new("length", DataType::UInt64, false),
+        ]));
+
+        // define data for table
+        let words = vec![
+            Some("air"),
+            Some("aphex twin"),
+            Some("bruce"),
+            Some("Blood Orange"),
+            None,
+            None,
+            Some("cocteau twins"),
+        ];
+        let rb = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(StringArray::from(words.clone())),
+                Arc::new(UInt64Array::from_iter(
+                    words.iter().map(|word| word.map(|word| word.len() as u64)),
+                )),
+            ],
+        )
+        .unwrap();
+
+        let provider = MemTable::try_new(Arc::clone(&schema), vec![vec![rb]]).unwrap();
+        let mut ctx = ExecutionContext::new();
+        ctx.register_table("t", Arc::new(provider)).unwrap();
+
+        let df = ctx.table("t").unwrap();
+        let df = df.filter(op).unwrap();
+
+        // execute the query
+        let record_batches = df.collect().await?;
+
+        Ok(pretty_format_batches(&record_batches)
+            .unwrap()
+            .split('\n')
+            .map(|s| s.to_owned())
+            .collect())
+    }
+}
diff --git a/query/src/func/selectors.rs b/query/src/func/selectors.rs
index 42a0bb3ff1..fd8716a4f2 100644
--- a/query/src/func/selectors.rs
+++ b/query/src/func/selectors.rs
@@ -222,7 +222,7 @@ where
     )
 }
 
-/// Structure that implements the Accumultator trait for DataFusion
+/// Structure that implements the Accumulator trait for DataFusion
 /// and processes (value, timestamp) pair and computes values
 #[derive(Debug)]
 struct SelectorAccumulator