refactor: move db predicate conversion to its own module (#664)
parent
601cff9d53
commit
628405bd53
164
server/src/db.rs
164
server/src/db.rs
|
@ -3,7 +3,6 @@
|
|||
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
convert::TryFrom,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc, Mutex,
|
||||
|
@ -13,7 +12,7 @@ use std::{
|
|||
use async_trait::async_trait;
|
||||
use data_types::{data::ReplicatedWrite, database_rules::DatabaseRules};
|
||||
use mutable_buffer::MutableBufferDb;
|
||||
use query::{predicate::Predicate, Database, PartitionChunk};
|
||||
use query::{Database, PartitionChunk};
|
||||
use read_buffer::Database as ReadBufferDb;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
|
@ -22,6 +21,7 @@ use crate::buffer::Buffer;
|
|||
|
||||
mod chunk;
|
||||
use chunk::DBChunk;
|
||||
pub mod pred;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
|
@ -50,9 +50,6 @@ pub enum Error {
|
|||
MutableBufferWrite {
|
||||
source: mutable_buffer::database::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Error translating predicate: {}", msg))]
|
||||
ReadBufferPredicate { msg: String, pred: Predicate },
|
||||
}
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
|
@ -272,160 +269,3 @@ impl Database for Db {
|
|||
.context(MutableBufferRead)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result<read_buffer::Predicate, Error> {
|
||||
// Try to convert non-time column expressions into binary expressions
|
||||
// that are compatible with the read buffer.
|
||||
match predicate
|
||||
.exprs
|
||||
.iter()
|
||||
.map(read_buffer::BinaryExpr::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
{
|
||||
Ok(exprs) => {
|
||||
// Construct a `ReadBuffer` predicate with or without
|
||||
// InfluxDB-specific expressions on the time column.
|
||||
Ok(match predicate.range {
|
||||
Some(range) => {
|
||||
read_buffer::Predicate::with_time_range(&exprs, range.start, range.end)
|
||||
}
|
||||
None => read_buffer::Predicate::new(exprs),
|
||||
})
|
||||
}
|
||||
Err(e) => Err(Error::ReadBufferPredicate {
|
||||
msg: e,
|
||||
pred: predicate.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use super::*;
|
||||
use arrow_deps::datafusion::logical_plan::Expr;
|
||||
use arrow_deps::datafusion::scalar::ScalarValue;
|
||||
|
||||
use query::predicate::PredicateBuilder;
|
||||
use read_buffer::BinaryExpr as RBBinaryExpr;
|
||||
use read_buffer::Predicate as RBPredicate;
|
||||
|
||||
#[test]
|
||||
fn into_read_buffer_predicate() {
|
||||
let cases = vec![
|
||||
// empty predicate
|
||||
(PredicateBuilder::default().build(), RBPredicate::default()),
|
||||
// just a time range
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.timestamp_range(100, 2000)
|
||||
.build(),
|
||||
RBPredicate::with_time_range(&[], 100, 2000),
|
||||
),
|
||||
// just a single non-time-range expression
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("Star Roving".to_owned())),
|
||||
)))
|
||||
.build(),
|
||||
RBPredicate::new(vec![RBBinaryExpr::from(("track", "=", "Star Roving"))]),
|
||||
),
|
||||
// multiple non-time-range expressions
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("Star Roving".to_owned())),
|
||||
)))
|
||||
.add_expr(
|
||||
Expr::Column("counter".to_owned())
|
||||
.gt(Expr::Literal(ScalarValue::Int64(Some(2992)))),
|
||||
)
|
||||
.build(),
|
||||
RBPredicate::new(vec![
|
||||
RBBinaryExpr::from(("track", "=", "Star Roving")),
|
||||
RBBinaryExpr::from(("counter", ">", 2992_i64)),
|
||||
]),
|
||||
),
|
||||
// a bit of everything
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.timestamp_range(100, 2000)
|
||||
.add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("Star Roving".to_owned())),
|
||||
)))
|
||||
.add_expr(
|
||||
Expr::Column("counter".to_owned())
|
||||
.gt(Expr::Literal(ScalarValue::Int64(Some(2992)))),
|
||||
)
|
||||
.build(),
|
||||
RBPredicate::with_time_range(
|
||||
&[
|
||||
RBBinaryExpr::from(("track", "=", "Star Roving")),
|
||||
RBBinaryExpr::from(("counter", ">", 2992_i64)),
|
||||
],
|
||||
100,
|
||||
2000,
|
||||
),
|
||||
),
|
||||
];
|
||||
|
||||
for (predicate, exp) in cases {
|
||||
assert_eq!(to_read_buffer_predicate(&predicate).unwrap(), exp);
|
||||
}
|
||||
|
||||
let cases = vec![
|
||||
// not a binary expression
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Literal(ScalarValue::Int64(Some(100_i64))))
|
||||
.build(),
|
||||
"unsupported expression type Int64(100)",
|
||||
),
|
||||
// left side must be a column
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(
|
||||
Expr::Literal(ScalarValue::Utf8(Some("The Stove &".to_owned()))).eq(
|
||||
Expr::Literal(ScalarValue::Utf8(Some("The Toaster".to_owned()))),
|
||||
),
|
||||
)
|
||||
.build(),
|
||||
"unsupported left expression Utf8(\"The Stove &\")",
|
||||
),
|
||||
// unsupported operator LIKE
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Column("track".to_owned()).like(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("Star Roving".to_owned())),
|
||||
)))
|
||||
.build(),
|
||||
"unsupported operator Like",
|
||||
),
|
||||
// right side must be a literal
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Column("Intermezzo 1".to_owned()).eq(Expr::Wildcard))
|
||||
.build(),
|
||||
"unsupported right expression *",
|
||||
),
|
||||
// binary expression like foo = NULL not supported
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(
|
||||
Expr::Column("track".to_owned()).eq(Expr::Literal(ScalarValue::Utf8(None))),
|
||||
)
|
||||
.build(),
|
||||
"NULL literal not supported",
|
||||
),
|
||||
];
|
||||
|
||||
for (predicate, exp) in cases {
|
||||
match to_read_buffer_predicate(&predicate).unwrap_err() {
|
||||
Error::ReadBufferPredicate { msg, pred: _ } => {
|
||||
assert_eq!(msg, exp.to_owned());
|
||||
}
|
||||
_ => panic!("fail"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
//! This module contains code to convert between query predicates and
|
||||
//! the predicates required by the various storage formats
|
||||
|
||||
use std::convert::TryFrom;
|
||||
|
||||
use query::predicate::Predicate;
|
||||
use snafu::Snafu;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Error translating predicate: {}", msg))]
|
||||
ReadBufferPredicate { msg: String, pred: Predicate },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result<read_buffer::Predicate> {
|
||||
// Try to convert non-time column expressions into binary expressions
|
||||
// that are compatible with the read buffer.
|
||||
match predicate
|
||||
.exprs
|
||||
.iter()
|
||||
.map(read_buffer::BinaryExpr::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
{
|
||||
Ok(exprs) => {
|
||||
// Construct a `ReadBuffer` predicate with or without
|
||||
// InfluxDB-specific expressions on the time column.
|
||||
Ok(match predicate.range {
|
||||
Some(range) => {
|
||||
read_buffer::Predicate::with_time_range(&exprs, range.start, range.end)
|
||||
}
|
||||
None => read_buffer::Predicate::new(exprs),
|
||||
})
|
||||
}
|
||||
Err(e) => Err(Error::ReadBufferPredicate {
|
||||
msg: e,
|
||||
pred: predicate.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use super::*;
|
||||
use arrow_deps::datafusion::logical_plan::Expr;
|
||||
use arrow_deps::datafusion::scalar::ScalarValue;
|
||||
|
||||
use query::predicate::PredicateBuilder;
|
||||
use read_buffer::BinaryExpr as RBBinaryExpr;
|
||||
use read_buffer::Predicate as RBPredicate;
|
||||
|
||||
#[test]
|
||||
fn into_read_buffer_predicate() {
|
||||
let cases = vec![
|
||||
// empty predicate
|
||||
(PredicateBuilder::default().build(), RBPredicate::default()),
|
||||
// just a time range
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.timestamp_range(100, 2000)
|
||||
.build(),
|
||||
RBPredicate::with_time_range(&[], 100, 2000),
|
||||
),
|
||||
// just a single non-time-range expression
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("Star Roving".to_owned())),
|
||||
)))
|
||||
.build(),
|
||||
RBPredicate::new(vec![RBBinaryExpr::from(("track", "=", "Star Roving"))]),
|
||||
),
|
||||
// multiple non-time-range expressions
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("Star Roving".to_owned())),
|
||||
)))
|
||||
.add_expr(
|
||||
Expr::Column("counter".to_owned())
|
||||
.gt(Expr::Literal(ScalarValue::Int64(Some(2992)))),
|
||||
)
|
||||
.build(),
|
||||
RBPredicate::new(vec![
|
||||
RBBinaryExpr::from(("track", "=", "Star Roving")),
|
||||
RBBinaryExpr::from(("counter", ">", 2992_i64)),
|
||||
]),
|
||||
),
|
||||
// a bit of everything
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.timestamp_range(100, 2000)
|
||||
.add_expr(Expr::Column("track".to_owned()).eq(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("Star Roving".to_owned())),
|
||||
)))
|
||||
.add_expr(
|
||||
Expr::Column("counter".to_owned())
|
||||
.gt(Expr::Literal(ScalarValue::Int64(Some(2992)))),
|
||||
)
|
||||
.build(),
|
||||
RBPredicate::with_time_range(
|
||||
&[
|
||||
RBBinaryExpr::from(("track", "=", "Star Roving")),
|
||||
RBBinaryExpr::from(("counter", ">", 2992_i64)),
|
||||
],
|
||||
100,
|
||||
2000,
|
||||
),
|
||||
),
|
||||
];
|
||||
|
||||
for (predicate, exp) in cases {
|
||||
assert_eq!(to_read_buffer_predicate(&predicate).unwrap(), exp);
|
||||
}
|
||||
|
||||
let cases = vec![
|
||||
// not a binary expression
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Literal(ScalarValue::Int64(Some(100_i64))))
|
||||
.build(),
|
||||
"unsupported expression type Int64(100)",
|
||||
),
|
||||
// left side must be a column
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(
|
||||
Expr::Literal(ScalarValue::Utf8(Some("The Stove &".to_owned()))).eq(
|
||||
Expr::Literal(ScalarValue::Utf8(Some("The Toaster".to_owned()))),
|
||||
),
|
||||
)
|
||||
.build(),
|
||||
"unsupported left expression Utf8(\"The Stove &\")",
|
||||
),
|
||||
// unsupported operator LIKE
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Column("track".to_owned()).like(Expr::Literal(
|
||||
ScalarValue::Utf8(Some("Star Roving".to_owned())),
|
||||
)))
|
||||
.build(),
|
||||
"unsupported operator Like",
|
||||
),
|
||||
// right side must be a literal
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(Expr::Column("Intermezzo 1".to_owned()).eq(Expr::Wildcard))
|
||||
.build(),
|
||||
"unsupported right expression *",
|
||||
),
|
||||
// binary expression like foo = NULL not supported
|
||||
(
|
||||
PredicateBuilder::default()
|
||||
.add_expr(
|
||||
Expr::Column("track".to_owned()).eq(Expr::Literal(ScalarValue::Utf8(None))),
|
||||
)
|
||||
.build(),
|
||||
"NULL literal not supported",
|
||||
),
|
||||
];
|
||||
|
||||
for (predicate, exp) in cases {
|
||||
match to_read_buffer_predicate(&predicate).unwrap_err() {
|
||||
Error::ReadBufferPredicate { msg, pred: _ } => {
|
||||
assert_eq!(msg, exp.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue