Merge pull request #46 from influxdata/unify-predicate

refactor: unify predicate evaluation between RocksDB and in-memory
pull/24376/head
Carol (Nichols || Goulding) 2020-02-24 08:21:05 -05:00 committed by GitHub
commit 6ec6f282f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 158 deletions

View File

@ -1,12 +1,12 @@
use crate::delorean::node::{Comparison, Logical, Value};
use crate::delorean::{Node, Predicate};
use crate::line_parser::{ParseError, Point, PointType};
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
use crate::storage::predicate::{Evaluate, EvaluateVisitor};
use crate::storage::series_store::{ReadPoint, SeriesStore};
use crate::storage::{Range, SeriesDataType, StorageError};
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use std::sync::{Arc, Mutex, RwLock};
use croaring::Treemap;
/// memdb implements an in memory database for the InvertedIndex and SeriesStore traits. It
@ -491,87 +491,16 @@ impl<T: Clone> Iterator for PointsIterator<T> {
}
}
fn evaluate_node(
series_map: &RwLockReadGuard<'_, SeriesMap>,
n: &Node,
) -> Result<Treemap, StorageError> {
if n.children.len() != 2 {
return Err(StorageError {
description: format!(
"expected only two children of node but found {}",
n.children.len()
),
});
}
fn evaluate_node(series_map: &SeriesMap, n: &Node) -> Result<Treemap, StorageError> {
struct Visitor<'a>(&'a SeriesMap);
match &n.value {
Some(node_value) => match node_value {
Value::Logical(l) => {
let l = Logical::from_i32(*l).unwrap();
evaluate_logical(series_map, &n.children[0], &n.children[1], l)
}
Value::Comparison(c) => {
let c = Comparison::from_i32(*c).unwrap();
evaluate_comparison(series_map, &n.children[0], &n.children[1], c)
}
val => Err(StorageError {
description: format!("evaluate_node called on wrong type {:?}", val),
}),
},
None => Err(StorageError {
description: "emtpy node value".to_string(),
}),
}
}
fn evaluate_logical(
series_map: &RwLockReadGuard<'_, SeriesMap>,
left: &Node,
right: &Node,
op: Logical,
) -> Result<Treemap, StorageError> {
let mut left_result = evaluate_node(series_map, left)?;
let right_result = evaluate_node(series_map, right)?;
match op {
Logical::And => left_result.and_inplace(&right_result),
Logical::Or => left_result.or_inplace(&right_result),
};
Ok(left_result)
}
fn evaluate_comparison(
series_map: &RwLockReadGuard<'_, SeriesMap>,
left: &Node,
right: &Node,
op: Comparison,
) -> Result<Treemap, StorageError> {
let left = match &left.value {
Some(Value::TagRefValue(s)) => s,
_ => {
return Err(StorageError {
description: "expected left operand to be a TagRefValue".to_string(),
})
impl EvaluateVisitor for Visitor<'_> {
fn equal(&mut self, left: &str, right: &str) -> Result<Treemap, StorageError> {
Ok(self.0.posting_list_for_key_value(left, right))
}
};
let right = match &right.value {
Some(Value::StringValue(s)) => s,
_ => {
return Err(StorageError {
description: "unable to run comparison against anything other than a string"
.to_string(),
})
}
};
match op {
Comparison::Equal => Ok(series_map.posting_list_for_key_value(&left, &right)),
comp => Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
}),
}
Evaluate::evaluate(Visitor(series_map), n)
}
impl InvertedIndex for MemDB {

View File

@ -1,7 +1,8 @@
use crate::delorean::node::Logical;
use crate::delorean::node::{Comparison, Value};
use crate::delorean::{node, Node, Predicate};
use crate::storage::StorageError;
use croaring::Treemap;
use std::iter::Peekable;
use std::str::Chars;
@ -217,6 +218,93 @@ fn eat_whitespace(chars: &mut Peekable<Chars<'_>>) {
}
}
pub trait EvaluateVisitor {
fn equal(&mut self, left: &str, right: &str) -> Result<Treemap, StorageError>;
}
pub struct Evaluate<V: EvaluateVisitor>(V);
impl<V: EvaluateVisitor> Evaluate<V> {
pub fn evaluate(visitor: V, node: &Node) -> Result<Treemap, StorageError> {
Self(visitor).node(node)
}
fn node(&mut self, n: &Node) -> Result<Treemap, StorageError> {
if n.children.len() != 2 {
return Err(StorageError {
description: format!(
"expected only two children of node but found {}",
n.children.len()
),
});
}
match &n.value {
Some(node_value) => match node_value {
Value::Logical(l) => {
let l = Logical::from_i32(*l).unwrap();
self.logical(&n.children[0], &n.children[1], l)
}
Value::Comparison(c) => {
let c = Comparison::from_i32(*c).unwrap();
self.comparison(&n.children[0], &n.children[1], c)
}
val => Err(StorageError {
description: format!("Evaluate::node called on wrong type {:?}", val),
}),
},
None => Err(StorageError {
description: "emtpy node value".to_string(),
}),
}
}
fn logical(&mut self, left: &Node, right: &Node, op: Logical) -> Result<Treemap, StorageError> {
let mut left_result = self.node(left)?;
let right_result = self.node(right)?;
match op {
Logical::And => left_result.and_inplace(&right_result),
Logical::Or => left_result.or_inplace(&right_result),
};
Ok(left_result)
}
fn comparison(
&mut self,
left: &Node,
right: &Node,
op: Comparison,
) -> Result<Treemap, StorageError> {
let left = match &left.value {
Some(Value::TagRefValue(s)) => s,
_ => {
return Err(StorageError {
description: "expected left operand to be a TagRefValue".to_string(),
})
}
};
let right = match &right.value {
Some(Value::StringValue(s)) => s,
_ => {
return Err(StorageError {
description: "unable to run comparison against anything other than a string"
.to_string(),
})
}
};
match op {
Comparison::Equal => self.0.equal(left, right),
comp => Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,8 +1,8 @@
use crate::delorean::node::{Comparison, Logical, Value};
use crate::delorean::{Bucket, IndexLevel, Node, Predicate};
use crate::line_parser::PointType;
use crate::storage::config_store::ConfigStore;
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
use crate::storage::predicate::{Evaluate, EvaluateVisitor};
use crate::storage::series_store::{ReadPoint, SeriesStore};
use crate::storage::{Range, SeriesDataType, StorageError};
@ -318,85 +318,25 @@ impl RocksDB {
}
fn evaluate_node(&self, bucket_id: u32, n: &Node) -> Result<Treemap, StorageError> {
if n.children.len() != 2 {
return Err(StorageError {
description: format!(
"expected only two children of node but found {}",
n.children.len()
),
});
struct Visitor<'a> {
db: &'a RocksDB,
bucket_id: u32,
};
impl EvaluateVisitor for Visitor<'_> {
fn equal(&mut self, left: &str, right: &str) -> Result<Treemap, StorageError> {
self.db
.get_posting_list_for_tag_key_value(self.bucket_id, left, right)
}
}
match &n.value {
Some(node_value) => match node_value {
Value::Logical(l) => {
let l = Logical::from_i32(*l).unwrap();
self.evaluate_logical(bucket_id, &n.children[0], &n.children[1], l)
}
Value::Comparison(c) => {
let c = Comparison::from_i32(*c).unwrap();
self.evaluate_comparison(bucket_id, &n.children[0], &n.children[1], c)
}
val => Err(StorageError {
description: format!("evaluate_node called on wrong type {:?}", val),
}),
Evaluate::evaluate(
Visitor {
db: self,
bucket_id,
},
None => Err(StorageError {
description: "emtpy node value".to_string(),
}),
}
}
fn evaluate_logical(
&self,
bucket_id: u32,
left: &Node,
right: &Node,
op: Logical,
) -> Result<Treemap, StorageError> {
let mut left_result = self.evaluate_node(bucket_id, left)?;
let right_result = self.evaluate_node(bucket_id, right)?;
match op {
Logical::And => left_result.and_inplace(&right_result),
Logical::Or => left_result.or_inplace(&right_result),
};
Ok(left_result)
}
fn evaluate_comparison(
&self,
bucket_id: u32,
left: &Node,
right: &Node,
op: Comparison,
) -> Result<Treemap, StorageError> {
let left = match &left.value {
Some(Value::TagRefValue(s)) => s,
_ => {
return Err(StorageError {
description: "expected left operand to be a TagRefValue".to_string(),
})
}
};
let right = match &right.value {
Some(Value::StringValue(s)) => s,
_ => {
return Err(StorageError {
description: "unable to run comparison against anything other than a string"
.to_string(),
})
}
};
match op {
Comparison::Equal => self.get_posting_list_for_tag_key_value(bucket_id, &left, &right),
comp => Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
}),
}
n,
)
}
fn get_posting_list_for_tag_key_value(