feat: add DataFusion regex match operator

This commit adds a new custom UDF to IOx that provides a regex operator to DataFusion plans.
Effectively it allows predicates to contain regex operators that are applied as filters, only allowing rows that satisfy the regex to be returned.

I did not use the Arrow regex kernel for this work because that does not return a boolean array indicating which rows matched a regex, but instead returns a new string array of results. This doesn't work well with DF's approach to filtering.
pull/24376/head
Edd Robinson 2021-05-06 10:13:59 +01:00
parent b5ea71f45f
commit 3fc2c9fc04
5 changed files with 389 additions and 100 deletions

248
Cargo.lock generated
View File

@ -140,7 +140,7 @@ dependencies = [
"arrow 4.0.0-SNAPSHOT",
"bytes",
"futures",
"proc-macro2",
"proc-macro2 1.0.24",
"prost",
"prost-derive",
"tokio",
@ -204,9 +204,9 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db134ba52475c060f3329a8ef0f8786d6b872ed01515d4b79c162e5798da1340"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -215,9 +215,9 @@ version = "0.1.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -346,8 +346,8 @@ dependencies = [
"lazycell",
"log",
"peeking_take_while",
"proc-macro2",
"quote",
"proc-macro2 1.0.24",
"quote 1.0.9",
"regex",
"rustc-hash",
"shlex",
@ -780,7 +780,7 @@ dependencies = [
"regex",
"serde",
"serde_regex",
"snafu",
"snafu 0.6.10",
"test_helpers",
]
@ -974,9 +974,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f47da3a72ec598d9c8937a7ebca8962a5c7a1f28444e38c2b33c771ba3f55f05"
dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -995,9 +995,9 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
"synstructure",
]
@ -1141,9 +1141,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "668c6733a182cd7deb4f1de7ba3bf2120823835b3bcfbeacf7d2c4a773c1bb8b"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -1478,7 +1478,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"snafu",
"snafu 0.6.10",
"test_helpers",
"tokio",
"url",
@ -1542,7 +1542,7 @@ dependencies = [
"serde_json",
"serde_urlencoded 0.7.0",
"server",
"snafu",
"snafu 0.6.10",
"structopt",
"tempfile",
"test_helpers",
@ -1586,7 +1586,7 @@ dependencies = [
"nom",
"observability_deps",
"smallvec",
"snafu",
"snafu 0.6.10",
"test_helpers",
]
@ -1599,7 +1599,7 @@ dependencies = [
"integer-encoding",
"observability_deps",
"rand 0.8.3",
"snafu",
"snafu 0.6.10",
"snap",
"test_helpers",
]
@ -1944,9 +1944,9 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8a3e2bde382ebf960c1f3e79689fa5941625fe9bf694a1cb64af3e85faff3af"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -1967,7 +1967,7 @@ dependencies = [
"observability_deps",
"parking_lot",
"rand 0.8.3",
"snafu",
"snafu 0.6.10",
"test_helpers",
"tokio",
"tracker",
@ -2179,7 +2179,7 @@ dependencies = [
"rusoto_core",
"rusoto_credential",
"rusoto_s3",
"snafu",
"snafu 0.6.10",
"tempfile",
"tokio",
"tokio-util",
@ -2346,9 +2346,9 @@ checksum = "129943a960e6a08c7e70ca5a09f113c273fe7f10ae8420992c78293e3dffdf65"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -2362,7 +2362,7 @@ dependencies = [
"observability_deps",
"parquet 0.1.0",
"rand 0.8.3",
"snafu",
"snafu 0.6.10",
"test_helpers",
]
@ -2454,7 +2454,7 @@ dependencies = [
"prost",
"prost-types",
"query",
"snafu",
"snafu 0.6.10",
"thrift",
"tokio",
"tokio-stream",
@ -2550,9 +2550,9 @@ version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -2561,9 +2561,9 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -2668,9 +2668,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
"version_check",
]
@ -2680,8 +2680,8 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"proc-macro2 1.0.24",
"quote 1.0.9",
"version_check",
]
@ -2697,13 +2697,22 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759"
dependencies = [
"unicode-xid 0.1.0",
]
[[package]]
name = "proc-macro2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
dependencies = [
"unicode-xid",
"unicode-xid 0.2.1",
]
[[package]]
@ -2757,9 +2766,9 @@ checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4"
dependencies = [
"anyhow",
"itertools 0.9.0",
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -2797,7 +2806,8 @@ dependencies = [
"metrics",
"observability_deps",
"parking_lot",
"snafu",
"regex",
"snafu 0.4.4",
"sqlparser 0.8.0",
"test_helpers",
"tokio",
@ -2810,13 +2820,22 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce23b6b870e8f94f81fb0a363d65d86675884b34a09043c81e5562f11c1f8e1"
dependencies = [
"proc-macro2 0.4.30",
]
[[package]]
name = "quote"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7"
dependencies = [
"proc-macro2",
"proc-macro2 1.0.24",
]
[[package]]
@ -2964,7 +2983,7 @@ dependencies = [
"permutation",
"rand 0.8.3",
"rand_distr",
"snafu",
"snafu 0.6.10",
"test_helpers",
"tracker",
]
@ -3386,9 +3405,9 @@ version = "1.0.125"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -3474,7 +3493,7 @@ dependencies = [
"read_buffer",
"serde",
"serde_json",
"snafu",
"snafu 0.6.10",
"snap",
"tempfile",
"test_helpers",
@ -3554,6 +3573,17 @@ dependencies = [
"serde",
]
[[package]]
name = "snafu"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b028158eb06caa8345bee10cccfb25fa632beccf0ef5308832b4fd4b78a7db48"
dependencies = [
"backtrace",
"doc-comment",
"snafu-derive 0.4.4",
]
[[package]]
name = "snafu"
version = "0.6.10"
@ -3563,7 +3593,18 @@ dependencies = [
"doc-comment",
"futures-core",
"pin-project 0.4.28",
"snafu-derive",
"snafu-derive 0.6.10",
]
[[package]]
name = "snafu-derive"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf50aaef500c248a590e2696e8bf8c7620ca2235b9bb90a70363d82dd1abec6a"
dependencies = [
"proc-macro2 0.4.30",
"quote 0.6.13",
"syn 0.15.44",
]
[[package]]
@ -3572,9 +3613,9 @@ version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -3658,11 +3699,11 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef"
dependencies = [
"proc-macro2",
"quote",
"proc-macro2 1.0.24",
"quote 1.0.9",
"serde",
"serde_derive",
"syn",
"syn 1.0.67",
]
[[package]]
@ -3672,13 +3713,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11"
dependencies = [
"base-x",
"proc-macro2",
"quote",
"proc-macro2 1.0.24",
"quote 1.0.9",
"serde",
"serde_derive",
"serde_json",
"sha1",
"syn",
"syn 1.0.67",
]
[[package]]
@ -3712,9 +3753,9 @@ checksum = "5ba9cdfda491b814720b6b06e0cac513d922fc407582032e8706e9f137976f90"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -3723,15 +3764,26 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
[[package]]
name = "syn"
version = "0.15.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ca4b3b69a77cbe1ffc9e198781b7acb0c7365a883670e8f1c1bc66fba79a5c5"
dependencies = [
"proc-macro2 0.4.30",
"quote 0.6.13",
"unicode-xid 0.1.0",
]
[[package]]
name = "syn"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6498a9efc342871f91cc2d0d694c674368b4ceb40f62b65a7a08c3792935e702"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
"proc-macro2 1.0.24",
"quote 1.0.9",
"unicode-xid 0.2.1",
]
[[package]]
@ -3740,10 +3792,10 @@ version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unicode-xid",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
"unicode-xid 0.2.1",
]
[[package]]
@ -3814,9 +3866,9 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -3924,10 +3976,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5c3be1edfad6027c69f5491cf4cb310d1a71ecd6af742788c6ff8bced86b8fa"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"proc-macro2 1.0.24",
"quote 1.0.9",
"standback",
"syn",
"syn 1.0.67",
]
[[package]]
@ -3981,9 +4033,9 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -4067,10 +4119,10 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c695de27302f4697191dda1c7178131a8cb805463dda02864acb80fe1322fdcf"
dependencies = [
"proc-macro2",
"proc-macro2 1.0.24",
"prost-build",
"quote",
"syn",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -4154,9 +4206,9 @@ version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
]
[[package]]
@ -4406,9 +4458,9 @@ dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
"wasm-bindgen-shared",
]
@ -4430,7 +4482,7 @@ version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e734d91443f177bfdb41969de821e15c516931c3c3db3d318fa1b68975d0f6f"
dependencies = [
"quote",
"quote 1.0.9",
"wasm-bindgen-macro-support",
]
@ -4440,9 +4492,9 @@ version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53739ff08c8a68b0fdbcd54c372b8ab800b1449ab3c9d706503bc7dd1621b2c"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.67",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -4554,7 +4606,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"snafu",
"snafu 0.6.10",
"snap",
"test_helpers",
"tokio",

View File

@ -27,7 +27,8 @@ influxdb_line_protocol = { path = "../influxdb_line_protocol" }
internal_types = { path = "../internal_types" }
metrics = { path = "../metrics" }
parking_lot = "0.11.1"
snafu = "0.6.2"
regex = "1"
snafu = "0.4.3"
sqlparser = "0.8.0"
tokio = { version = "1.0", features = ["macros"] }
tokio-stream = "0.1.2"

View File

@ -1,3 +1,4 @@
//! Special IOx functions used in DataFusion plans
pub mod regex;
pub mod selectors;
pub mod window;

235
query/src/func/regex.rs Normal file
View File

@ -0,0 +1,235 @@
use std::sync::Arc;
use arrow_deps::{
arrow::{
array::{ArrayRef, BooleanArray, StringArray},
datatypes::DataType,
},
datafusion::{
error::DataFusionError,
logical_plan::{create_udf, Expr},
physical_plan::functions::make_scalar_function,
},
};
/// The name of the regex_match UDF given to DataFusion.
pub const REGEX_MATCH_UDF_NAME: &str = "RegexMatch";

/// The name of the negated regex match UDF given to DataFusion. It must be
/// distinct from [`REGEX_MATCH_UDF_NAME`] so the two expressions can be told
/// apart by name.
pub const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexNotMatch";
/// Given a column containing string values and a single regex pattern,
/// `regex_match_expr` builds an expression that determines which values
/// satisfy the pattern and which do not.
///
/// If `matches` is `true` the expression evaluates to `true` for values that
/// satisfy the regex (equivalent to `col =~ /pattern/`). If `matches` is
/// `false` the comparison is negated, so the expression evaluates to `true`
/// for values that do *not* match the regex (equivalent to
/// `col !~ /pattern/`). A NULL input value always produces a NULL result.
///
/// The pattern is compiled once, when the expression is built. Because this
/// function cannot itself return an error, an invalid pattern is reported as
/// a `DataFusionError::Internal` when the expression is evaluated.
///
/// This UDF is designed to support the regex operator that can be pushed down
/// via the InfluxRPC API.
///
pub(crate) fn regex_match_expr(
    input: Expr,
    pattern: String,
    matches: bool,
) -> arrow_deps::datafusion::logical_plan::Expr {
    // N.B., this function does not utilise the Arrow regexp compute kernel because
    // in order to act as a filter it needs to return a boolean array of comparison
    // results, not an array of strings as the regex compute kernel does.

    // Compile the pattern a single time rather than on every invocation of
    // the UDF. A compilation failure is retained and surfaced at evaluation.
    let compiled = regex::Regex::new(&pattern);

    let func = move |args: &[ArrayRef]| {
        // only works over a single column at a time.
        if args.len() != 1 {
            return Err(DataFusionError::Internal(format!(
                "regex match expects exactly one argument, got {}",
                args.len()
            )));
        }

        let input_arr = args[0]
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| {
                DataFusionError::Internal(
                    "regex match expects a Utf8 (string) input array".to_string(),
                )
            })?;

        let re = compiled.as_ref().map_err(|e| {
            DataFusionError::Internal(format!("error compiling regex pattern: {}", e))
        })?;

        // In arrow any value can be null; a null input yields a null result
        // rather than `true` or `false`.
        let results = input_arr
            .iter()
            .map(|row| row.map(|v| re.is_match(v) == matches))
            .collect::<BooleanArray>();

        Ok(Arc::new(results) as ArrayRef)
    };

    // make_scalar_function is a helper to support accepting scalar values as
    // well as arrays.
    let func = make_scalar_function(func);

    let udf_name = if matches {
        REGEX_MATCH_UDF_NAME
    } else {
        REGEX_NOT_MATCH_UDF_NAME
    };

    let udf = create_udf(
        udf_name,
        vec![DataType::Utf8],
        Arc::new(DataType::Boolean),
        func,
    );

    udf.call(vec![input])
}
#[cfg(test)]
mod test {
use arrow_deps::{
arrow::{
array::{StringArray, UInt64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
},
datafusion::{
datasource::MemTable,
error::DataFusionError,
logical_plan::{col, Expr},
prelude::ExecutionContext,
},
};
use std::iter::FromIterator;
use std::sync::Arc;
#[tokio::test]
async fn regex_match_expr() {
    // Rendered output when every row with a non-NULL word is retained.
    let every_non_null_row = vec![
        "+---------------+--------+",
        "| words | length |",
        "+---------------+--------+",
        "| air | 3 |",
        "| aphex twin | 10 |",
        "| bruce | 5 |",
        "| Blood Orange | 12 |",
        "| cocteau twins | 13 |",
        "+---------------+--------+",
        "",
    ];

    // Each scenario is (regex pattern, `matches` flag, expected rendered rows).
    let scenarios = vec![
        // match everything except NULL values, and keep the values matched
        (".*", true, every_non_null_row.clone()),
        // match everything except NULL values, then filter away all of them
        (".*", false, vec!["++", "++", ""]),
        // an empty pattern also matches everything except NULL
        ("", true, every_non_null_row),
        // match just words containing "O".
        (
            ".+O.*",
            true,
            vec![
                "+--------------+--------+",
                "| words | length |",
                "+--------------+--------+",
                "| Blood Orange | 12 |",
                "+--------------+--------+",
                "",
            ],
        ),
        // match everything beginning with "a" or "b", then negate the
        // expression so anything that matches is filtered away
        (
            "^(a|b).*",
            false,
            vec![
                "+---------------+--------+",
                "| words | length |",
                "+---------------+--------+",
                "| Blood Orange | 12 |",
                "| cocteau twins | 13 |",
                "+---------------+--------+",
                "",
            ],
        ),
    ];

    for (pattern, matches, expected) in scenarios {
        let filter = super::regex_match_expr(col("words"), String::from(pattern), matches);
        let actual = run_plan(filter).await.unwrap();

        assert_eq!(
            expected, actual,
            "\n\nEXPECTED:\n{:#?}\nACTUAL:\n{:#?}\n",
            expected, actual
        );
    }
}
#[tokio::test]
async fn regex_match_expr_invalid_regex() {
// an invalid regex pattern
let regex_expr = super::regex_match_expr(col("words"), "[".to_string(), true);
let actual = run_plan(regex_expr).await.expect_err("expected error");
assert!(actual.to_string().contains("error compiling regex pattern"))
}
// Filters the fixed in-memory table below (registered as "t") with `op` and
// returns the pretty-printed result split into one string per line.
async fn run_plan(op: Expr) -> Result<Vec<String>, DataFusionError> {
    // table data: a nullable "words" column plus the length of each word
    let words = vec![
        Some("air"),
        Some("aphex twin"),
        Some("bruce"),
        Some("Blood Orange"),
        None,
        None,
        Some("cocteau twins"),
    ];
    let lengths = UInt64Array::from_iter(
        words
            .iter()
            .map(|entry| entry.map(|text| text.len() as u64)),
    );

    let schema = Arc::new(Schema::new(vec![
        Field::new("words", DataType::Utf8, true),
        Field::new("length", DataType::UInt64, false),
    ]));

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(StringArray::from(words)), Arc::new(lengths)],
    )
    .unwrap();

    let table = MemTable::try_new(Arc::clone(&schema), vec![vec![batch]]).unwrap();
    let mut ctx = ExecutionContext::new();
    ctx.register_table("t", Arc::new(table)).unwrap();

    let filtered = ctx.table("t").unwrap().filter(op).unwrap();

    // execute the query
    let record_batches = filtered.collect().await?;

    let rendered = pretty_format_batches(&record_batches).unwrap();
    Ok(rendered.split('\n').map(str::to_owned).collect())
}
}

View File

@ -222,7 +222,7 @@ where
)
}
/// Structure that implements the Accumultator trait for DataFusion
/// Structure that implements the Accumulator trait for DataFusion
/// and processes (value, timestamp) pair and computes values
#[derive(Debug)]
struct SelectorAccumulator<SELECTOR>