diff --git a/server/src/db.rs b/server/src/db.rs index d4e4a326d5..b5fa699b15 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -3,7 +3,6 @@ use std::{ collections::BTreeMap, - convert::TryFrom, sync::{ atomic::{AtomicU64, Ordering}, Arc, Mutex, @@ -13,7 +12,7 @@ use std::{ use async_trait::async_trait; use data_types::{data::ReplicatedWrite, database_rules::DatabaseRules}; use mutable_buffer::MutableBufferDb; -use query::{predicate::Predicate, Database, PartitionChunk}; +use query::{Database, PartitionChunk}; use read_buffer::Database as ReadBufferDb; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -22,6 +21,7 @@ use crate::buffer::Buffer; mod chunk; use chunk::DBChunk; +pub mod pred; #[derive(Debug, Snafu)] pub enum Error { @@ -50,9 +50,6 @@ pub enum Error { MutableBufferWrite { source: mutable_buffer::database::Error, }, - - #[snafu(display("Error translating predicate: {}", msg))] - ReadBufferPredicate { msg: String, pred: Predicate }, } pub type Result = std::result::Result; @@ -272,160 +269,3 @@ impl Database for Db { .context(MutableBufferRead) } } - -pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result { - // Try to convert non-time column expressions into binary expressions - // that are compatible with the read buffer. - match predicate - .exprs - .iter() - .map(read_buffer::BinaryExpr::try_from) - .collect::, _>>() - { - Ok(exprs) => { - // Construct a `ReadBuffer` predicate with or without - // InfluxDB-specific expressions on the time column. - Ok(match predicate.range { - Some(range) => { - read_buffer::Predicate::with_time_range(&exprs, range.start, range.end) - } - None => read_buffer::Predicate::new(exprs), - }) - } - Err(e) => Err(Error::ReadBufferPredicate { - msg: e, - pred: predicate.clone(), - }), - } -} - -#[cfg(test)] -pub mod test { - use super::*; - use arrow_deps::datafusion::logical_plan::Expr; - use arrow_deps::datafusion::scalar::ScalarValue; - - use query::predicate::PredicateBuilder; - use read_buffer::BinaryExpr as RBBinaryExpr; - use read_buffer::Predicate as RBPredicate; - - #[test] - fn into_read_buffer_predicate() { - let cases = vec![ - // empty predicate - (PredicateBuilder::default().build(), RBPredicate::default()), - // just a time range - ( - PredicateBuilder::default() - .timestamp_range(100, 2000) - .build(), - RBPredicate::with_time_range(&[], 100, 2000), - ), - // just a single non-time-range expression - ( - PredicateBuilder::default() - .add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal( - ScalarValue::Utf8(Some("Star Roving".to_owned())), - ))) - .build(), - RBPredicate::new(vec![RBBinaryExpr::from(("track", "=", "Star Roving"))]), - ), - // multiple non-time-range expressions - ( - PredicateBuilder::default() - .add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal( - ScalarValue::Utf8(Some("Star Roving".to_owned())), - ))) - .add_expr( - Expr::Column("counter".to_owned()) - .gt(Expr::Literal(ScalarValue::Int64(Some(2992)))), - ) - .build(), - RBPredicate::new(vec![ - RBBinaryExpr::from(("track", "=", "Star Roving")), - RBBinaryExpr::from(("counter", ">", 2992_i64)), - ]), - ), - // a bit of everything - ( - PredicateBuilder::default() - .timestamp_range(100, 2000) - .add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal( - ScalarValue::Utf8(Some("Star Roving".to_owned())), - ))) - .add_expr( - Expr::Column("counter".to_owned()) - .gt(Expr::Literal(ScalarValue::Int64(Some(2992)))), - ) - .build(), - RBPredicate::with_time_range( - &[ - RBBinaryExpr::from(("track", "=", "Star Roving")), - RBBinaryExpr::from(("counter", ">", 2992_i64)), - ], - 100, - 2000, - ), - ), - ]; - - for (predicate, exp) in cases { - assert_eq!(to_read_buffer_predicate(&predicate).unwrap(), exp); - } - - let cases = vec![ - // not a binary expression - ( - PredicateBuilder::default() - .add_expr(Expr::Literal(ScalarValue::Int64(Some(100_i64)))) - .build(), - "unsupported expression type Int64(100)", - ), - // left side must be a column - ( - PredicateBuilder::default() - .add_expr( - Expr::Literal(ScalarValue::Utf8(Some("The Stove &".to_owned()))).eq( - Expr::Literal(ScalarValue::Utf8(Some("The Toaster".to_owned()))), - ), - ) - .build(), - "unsupported left expression Utf8(\"The Stove &\")", - ), - // unsupported operator LIKE - ( - PredicateBuilder::default() - .add_expr(Expr::Column("track".to_owned()).like(Expr::Literal( - ScalarValue::Utf8(Some("Star Roving".to_owned())), - ))) - .build(), - "unsupported operator Like", - ), - // right side must be a literal - ( - PredicateBuilder::default() - .add_expr(Expr::Column("Intermezzo 1".to_owned()).eq(Expr::Wildcard)) - .build(), - "unsupported right expression *", - ), - // binary expression like foo = NULL not supported - ( - PredicateBuilder::default() - .add_expr( - Expr::Column("track".to_owned()).eq(Expr::Literal(ScalarValue::Utf8(None))), - ) - .build(), - "NULL literal not supported", - ), - ]; - - for (predicate, exp) in cases { - match to_read_buffer_predicate(&predicate).unwrap_err() { - Error::ReadBufferPredicate { msg, pred: _ } => { - assert_eq!(msg, exp.to_owned()); - } - _ => panic!("fail"), - } - } - } -} diff --git a/server/src/db/pred.rs b/server/src/db/pred.rs new file mode 100644 index 0000000000..b5f03b36eb --- /dev/null +++ b/server/src/db/pred.rs @@ -0,0 +1,171 @@ +//! This module contains code to convert between query predicates and +//! the predicates required by the various storage formats + +use std::convert::TryFrom; + +use query::predicate::Predicate; +use snafu::Snafu; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error translating predicate: {}", msg))] + ReadBufferPredicate { msg: String, pred: Predicate }, +} + +pub type Result = std::result::Result; + +pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result { + // Try to convert non-time column expressions into binary expressions + // that are compatible with the read buffer. + match predicate + .exprs + .iter() + .map(read_buffer::BinaryExpr::try_from) + .collect::, _>>() + { + Ok(exprs) => { + // Construct a `ReadBuffer` predicate with or without + // InfluxDB-specific expressions on the time column. + Ok(match predicate.range { + Some(range) => { + read_buffer::Predicate::with_time_range(&exprs, range.start, range.end) + } + None => read_buffer::Predicate::new(exprs), + }) + } + Err(e) => Err(Error::ReadBufferPredicate { + msg: e, + pred: predicate.clone(), + }), + } +} + +#[cfg(test)] +pub mod test { + use super::*; + use arrow_deps::datafusion::logical_plan::Expr; + use arrow_deps::datafusion::scalar::ScalarValue; + + use query::predicate::PredicateBuilder; + use read_buffer::BinaryExpr as RBBinaryExpr; + use read_buffer::Predicate as RBPredicate; + + #[test] + fn into_read_buffer_predicate() { + let cases = vec![ + // empty predicate + (PredicateBuilder::default().build(), RBPredicate::default()), + // just a time range + ( + PredicateBuilder::default() + .timestamp_range(100, 2000) + .build(), + RBPredicate::with_time_range(&[], 100, 2000), + ), + // just a single non-time-range expression + ( + PredicateBuilder::default() + .add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal( + ScalarValue::Utf8(Some("Star Roving".to_owned())), + ))) + .build(), + RBPredicate::new(vec![RBBinaryExpr::from(("track", "=", "Star Roving"))]), + ), + // multiple non-time-range expressions + ( + PredicateBuilder::default() + .add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal( + ScalarValue::Utf8(Some("Star Roving".to_owned())), + ))) + .add_expr( + Expr::Column("counter".to_owned()) + .gt(Expr::Literal(ScalarValue::Int64(Some(2992)))), + ) + .build(), + RBPredicate::new(vec![ + RBBinaryExpr::from(("track", "=", "Star Roving")), + RBBinaryExpr::from(("counter", ">", 2992_i64)), + ]), + ), + // a bit of everything + ( + PredicateBuilder::default() + .timestamp_range(100, 2000) + .add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal( + ScalarValue::Utf8(Some("Star Roving".to_owned())), + ))) + .add_expr( + Expr::Column("counter".to_owned()) + .gt(Expr::Literal(ScalarValue::Int64(Some(2992)))), + ) + .build(), + RBPredicate::with_time_range( + &[ + RBBinaryExpr::from(("track", "=", "Star Roving")), + RBBinaryExpr::from(("counter", ">", 2992_i64)), + ], + 100, + 2000, + ), + ), + ]; + + for (predicate, exp) in cases { + assert_eq!(to_read_buffer_predicate(&predicate).unwrap(), exp); + } + + let cases = vec![ + // not a binary expression + ( + PredicateBuilder::default() + .add_expr(Expr::Literal(ScalarValue::Int64(Some(100_i64)))) + .build(), + "unsupported expression type Int64(100)", + ), + // left side must be a column + ( + PredicateBuilder::default() + .add_expr( + Expr::Literal(ScalarValue::Utf8(Some("The Stove &".to_owned()))).eq( + Expr::Literal(ScalarValue::Utf8(Some("The Toaster".to_owned()))), + ), + ) + .build(), + "unsupported left expression Utf8(\"The Stove &\")", + ), + // unsupported operator LIKE + ( + PredicateBuilder::default() + .add_expr(Expr::Column("track".to_owned()).like(Expr::Literal( + ScalarValue::Utf8(Some("Star Roving".to_owned())), + ))) + .build(), + "unsupported operator Like", + ), + // right side must be a literal + ( + PredicateBuilder::default() + .add_expr(Expr::Column("Intermezzo 1".to_owned()).eq(Expr::Wildcard)) + .build(), + "unsupported right expression *", + ), + // binary expression like foo = NULL not supported + ( + PredicateBuilder::default() + .add_expr( + Expr::Column("track".to_owned()).eq(Expr::Literal(ScalarValue::Utf8(None))), + ) + .build(), + "NULL literal not supported", + ), + ]; + + for (predicate, exp) in cases { + match to_read_buffer_predicate(&predicate).unwrap_err() { + Error::ReadBufferPredicate { msg, pred: _ } => { + assert_eq!(msg, exp.to_owned()); + } + } + } + } +}