refactor: DRY up the predicate logic

pull/24376/head
Edd Robinson 2020-12-02 17:52:49 +00:00
parent ab83288067
commit 4dc5cc46a9
2 changed files with 39 additions and 81 deletions

View File

@ -1,5 +1,5 @@
/// Possible comparison operators
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum Operator {
Equal,
NotEqual,

View File

@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::{borrow::Cow, collections::BTreeMap};
use arrow_deps::arrow::datatypes::SchemaRef;
@ -208,13 +208,9 @@ impl Segment {
}
}
// Determines the set of row ids that satisfy the time range and all of the
// optional predicates.
//
// TODO(edd): right now `time_range` is special cased so we can use the
// optimised execution path in the column to filter on a range. However,
// eventually we should be able to express the time range as just another
// one or two predicates.
// Determines the set of row ids that satisfy the provided predicates.
// If `predicates` contains two predicates on the time column they are
// special-cased.
fn row_ids_from_predicates(&self, predicates: &[Predicate<'_>]) -> RowIDsOption {
// TODO(edd): perf - potentially pool this so we can re-use it once
// rows have been materialised and it's no longer needed.
@ -228,6 +224,7 @@ impl Segment {
// segment but not re-used for subsequent calls _to_ the segment.
let mut dst = RowIDs::new_bitmap();
let mut predicates = Cow::Borrowed(predicates);
// If there is a time-range in the predicates (two time predicates), then
// execute an optimised version that will use a range based predicate
// on the time column.
@ -237,10 +234,34 @@ impl Segment {
.count()
== 2
{
return self.row_ids_from_predicates_with_time_range(predicates, dst, result_row_ids);
// Apply optimised filtering to time column
let time_pred_row_ids =
self.row_ids_from_predicates_with_time_range(predicates.as_ref(), dst);
match time_pred_row_ids {
// No matching rows based on time range
RowIDsOption::None(_) => return time_pred_row_ids,
// all rows match - continue to apply other predicates
RowIDsOption::All(_dst) => {
dst = _dst; // hand buffer back
}
for (col_name, (op, value)) in predicates {
// some rows match - continue to apply predicates
RowIDsOption::Some(row_ids) => {
// fill the result row id set with the matching rows from the
// time column.
result_row_ids.union(&row_ids);
dst = row_ids // hand buffer back
}
}
// remove time predicates so they're not processed again
let mut filtered_predicates = predicates.to_vec();
filtered_predicates.retain(|(col, _)| *col != TIME_COLUMN_NAME);
predicates = Cow::Owned(filtered_predicates);
}
for (col_name, (op, value)) in predicates.iter() {
// N.B column should always exist because validation of
// predicates is not the responsibility of the `Segment`.
let col = self.column_by_name(col_name).unwrap();
@ -273,24 +294,19 @@ impl Segment {
}
if result_row_ids.is_empty() {
// All rows matched all predicates - return the empty buffer.
// All rows matched all predicates because any predicates not matching
// any rows would have resulted in an early return.
return RowIDsOption::All(result_row_ids);
}
RowIDsOption::Some(result_row_ids)
}
// Determines the set of row ids that satisfy the time range and all of the
// optional predicates.
//
// TODO(edd): right now `time_range` is special cased so we can use the
// optimised execution path in the column to filter on a range. However,
// eventually we should be able to express the time range as just another
// one or two predicates.
// An optimised function for applying two comparison predicates to a time
// column at once.
fn row_ids_from_predicates_with_time_range(
&self,
predicates: &[Predicate<'_>],
mut dst: RowIDs,
mut result_row_ids: RowIDs,
dst: RowIDs,
) -> RowIDsOption {
// find the time range predicates and execute a specialised range based
// row id lookup.
@ -300,69 +316,11 @@ impl Segment {
.collect::<Vec<_>>();
assert!(time_predicates.len() == 2);
let time_row_ids = self.time_column().row_ids_filter_range(
self.time_column().row_ids_filter_range(
&time_predicates[0].1, // min time
&time_predicates[1].1, // max time
dst,
);
match time_row_ids {
// No matching rows based on time range - return buffer
RowIDsOption::None(_) => return time_row_ids,
// all rows match - continue to apply predicates
RowIDsOption::All(_dst) => {
dst = _dst; // hand buffer back
}
// some rows match - continue to apply predicates
RowIDsOption::Some(row_ids) => {
// union empty result set with matching timestamp rows
result_row_ids.union(&row_ids);
dst = row_ids // hand buffer back
}
}
for (col_name, (op, value)) in predicates {
if col_name == &TIME_COLUMN_NAME {
continue; // we already processed the time column as a special case.
}
// N.B column should always exist because validation of
// predicates should happen at the `Table` level.
let col = self.column_by_name(col_name).unwrap();
// Explanation of how this buffer pattern works here. The idea is
// that the buffer should be returned to the caller so it can be
// re-used on other columns. To do that we need to hand the buffer
// back even if we haven't populated it with any results.
match col.row_ids_filter(op, value, dst) {
// No rows will be returned for the segment because this column
// does not match any rows.
RowIDsOption::None(_dst) => return RowIDsOption::None(_dst),
// Intersect the row ids found at this column with all those
// found on other column predicates.
RowIDsOption::Some(row_ids) => {
if result_row_ids.is_empty() {
result_row_ids.union(&row_ids)
}
result_row_ids.intersect(&row_ids);
dst = row_ids; // hand buffer back
}
// This is basically a no-op because all rows match the
// predicate on this column.
RowIDsOption::All(_dst) => {
dst = _dst; // hand buffer back
}
}
}
if result_row_ids.is_empty() {
// All rows matched all predicates - return the empty buffer.
return RowIDsOption::All(result_row_ids);
}
RowIDsOption::Some(result_row_ids)
)
}
}