refactor: rename builder like predicate methods to be `with_` (#4808)

* refactor: rename builder like predicate methods to be `with_`

* fix: merge conflict

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-06-09 07:26:03 -04:00 committed by GitHub
parent d8331e8679
commit 2ec7764fdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 243 additions and 241 deletions

View File

@ -238,9 +238,9 @@ mod tests {
#[test]
fn query_round_trip() {
let rust_predicate = predicate::Predicate::new()
.timestamp_range(1, 100)
.add_expr(col("foo"))
.add_value_expr(col("_value").eq(lit("bar")).try_into().unwrap());
.with_range(1, 100)
.with_expr(col("foo"))
.with_value_expr(col("_value").eq(lit("bar")).try_into().unwrap());
let rust_query = IngesterQueryRequest::new(
"mydb".into(),

View File

@ -400,7 +400,7 @@ mod tests {
// tag1=VT
let expr = col("tag1").eq(lit("VT"));
let pred = Predicate::default().add_expr(expr);
let pred = Predicate::default().with_expr(expr);
let exc = Executor::new(1);
let stream = query(&exc, batch, pred, selection).await.unwrap();
@ -438,7 +438,7 @@ mod tests {
// tag1=UT
let expr = col("tag1").eq(lit("UT"));
let pred = Predicate::default().add_expr(expr);
let pred = Predicate::default().with_expr(expr);
let exc = Executor::new(1);
let stream = query(&exc, batch, pred, selection).await.unwrap();
@ -534,7 +534,7 @@ mod tests {
// read data from all scenarios, filter out column day, city Medford, time outside range [0, 42)
request.columns = ["city", "temp", "time"].map(Into::into).into();
let expr = col("city").not_eq(lit("Medford"));
let pred = Predicate::default().add_expr(expr).timestamp_range(0, 42);
let pred = Predicate::default().with_expr(expr).with_range(0, 42);
request.predicate = Some(pred);
let expected = vec![
"+------------+------+--------------------------------+",
@ -651,7 +651,7 @@ mod tests {
// read data from all scenarios, filter out column day, city Medford, time outside range [0, 42)
request.columns = vec!["city".to_string(), "temp".to_string(), "time".to_string()];
let expr = col("city").not_eq(lit("Medford"));
let pred = Predicate::default().add_expr(expr).timestamp_range(0, 42);
let pred = Predicate::default().with_expr(expr).with_range(0, 42);
request.predicate = Some(pred);
let expected = vec![
"+------------+------+--------------------------------+",

View File

@ -444,7 +444,7 @@ mod tests {
let batch = make_queryable_batch("test_table", 1, batches);
// tag1 = VT
let expr = col("tag1").eq(lit("VT"));
let pred = Predicate::default().add_expr(expr);
let pred = Predicate::default().with_expr(expr);
let stream = batch
.read_filter(IOxSessionContext::default(), &pred, Selection::All)
@ -469,7 +469,7 @@ mod tests {
let batch = make_queryable_batch("test_table", 1, batches);
// foo = VT
let expr = col("foo").eq(lit("VT")); // `foo` column not available
let pred = Predicate::default().add_expr(expr);
let pred = Predicate::default().with_expr(expr);
let stream = batch
.read_filter(IOxSessionContext::default(), &pred, Selection::All)
@ -489,7 +489,7 @@ mod tests {
let batch = make_queryable_batch("test_table", 1, batches);
// foo is NULL
let expr = col("foo").is_null();
let pred = Predicate::default().add_expr(expr);
let pred = Predicate::default().with_expr(expr);
let stream = batch
.read_filter(IOxSessionContext::default(), &pred, Selection::All)
@ -579,7 +579,7 @@ mod tests {
// foo is NULL AND tag1=CT
let expr = col("foo").is_null().and(col("tag1").eq(lit("CT")));
let pred = Predicate::default().add_expr(expr);
let pred = Predicate::default().with_expr(expr);
let stream = batch
.read_filter(IOxSessionContext::default(), &pred, selection)

View File

@ -1991,11 +1991,11 @@ mod tests {
let expr = when(col("foo").is_null(), lit(""))
.otherwise(col("foo"))
.unwrap();
let silly_predicate = Predicate::new().add_expr(expr.eq(lit("bar")));
let silly_predicate = Predicate::new().with_expr(expr.eq(lit("bar")));
// verify that the predicate was rewritten to `foo = 'bar'`
let expr = col("foo").eq(lit("bar"));
let expected_predicate = Predicate::new().add_expr(expr);
let expected_predicate = Predicate::new().with_expr(expr);
run_test_with_predicate(&func, silly_predicate, expected_predicate).await;
@ -2004,13 +2004,13 @@ mod tests {
//
// https://github.com/influxdata/influxdb_iox/issues/3601
// _measurement = 'foo'
let silly_predicate = Predicate::new().add_expr(col("_measurement").eq(lit("foo")));
let silly_predicate = Predicate::new().with_expr(col("_measurement").eq(lit("foo")));
// verify that the predicate was rewritten to `false` as the
// measurement name is `h20`
let expr = lit(false);
let expected_predicate = Predicate::new().add_expr(expr);
let expected_predicate = Predicate::new().with_expr(expr);
run_test_with_predicate(&func, silly_predicate, expected_predicate).await;
// ------------- Test 3 ----------------
@ -2018,7 +2018,7 @@ mod tests {
//
// https://github.com/influxdata/influxdb_iox/issues/3601
// (_measurement = 'foo' or measurement = 'h2o') AND time > 5
let silly_predicate = Predicate::new().add_expr(
let silly_predicate = Predicate::new().with_expr(
col("_measurement")
.eq(lit("foo"))
.or(col("_measurement").eq(lit("h2o")))
@ -2028,7 +2028,7 @@ mod tests {
// verify that the predicate was rewritten to time > 5
let expr = col("time").gt(lit(5));
let expected_predicate = Predicate::new().add_expr(expr);
let expected_predicate = Predicate::new().with_expr(expr);
run_test_with_predicate(&func, silly_predicate, expected_predicate).await;
}

View File

@ -251,7 +251,7 @@ impl TableProvider for ChunkTableProvider {
// Note that `filters` don't actually need to be evaluated in
// the scan for the plans to be correct, they are an extra
// optimization for providers which can offer them
let predicate = Predicate::default().add_pushdown_exprs(filters);
let predicate = Predicate::default().with_pushdown_exprs(filters);
// Now we have a second attempt to prune out chunks based on
// metadata using the pushed down predicate (e.g. in SQL).

View File

@ -258,7 +258,7 @@ mod test {
Some(10.0),
));
let predicate = Predicate::new().add_expr(col("column1").gt(lit(100.0)));
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100.0)));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
assert_eq!(observer.events(), vec!["chunk1: Pruned"]);
@ -278,7 +278,7 @@ mod test {
Some(10),
));
let predicate = Predicate::new().add_expr(col("column1").gt(lit(100)));
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100)));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
@ -299,7 +299,7 @@ mod test {
Some(10),
));
let predicate = Predicate::new().add_expr(col("column1").gt(lit(100)));
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100)));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
@ -320,7 +320,7 @@ mod test {
Some(false),
));
let predicate = Predicate::new().add_expr(col("column1"));
let predicate = Predicate::new().with_expr(col("column1"));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
@ -343,7 +343,7 @@ mod test {
),
);
let predicate = Predicate::new().add_expr(col("column1").gt(lit("z")));
let predicate = Predicate::new().with_expr(col("column1").gt(lit("z")));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
@ -363,7 +363,7 @@ mod test {
Some(10.0),
));
let predicate = Predicate::new().add_expr(col("column1").lt(lit(100.0)));
let predicate = Predicate::new().with_expr(col("column1").lt(lit(100.0)));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
assert!(observer.events().is_empty());
@ -383,7 +383,7 @@ mod test {
Some(10),
));
let predicate = Predicate::new().add_expr(col("column1").lt(lit(100)));
let predicate = Predicate::new().with_expr(col("column1").lt(lit(100)));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
@ -404,7 +404,7 @@ mod test {
Some(10),
));
let predicate = Predicate::new().add_expr(col("column1").lt(lit(100)));
let predicate = Predicate::new().with_expr(col("column1").lt(lit(100)));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
@ -425,7 +425,7 @@ mod test {
Some(true),
));
let predicate = Predicate::new().add_expr(col("column1"));
let predicate = Predicate::new().with_expr(col("column1"));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
@ -448,7 +448,7 @@ mod test {
),
);
let predicate = Predicate::new().add_expr(col("column1").lt(lit("z")));
let predicate = Predicate::new().with_expr(col("column1").lt(lit("z")));
let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate);
@ -493,7 +493,7 @@ mod test {
let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1"))
as Arc<dyn QueryChunk>;
let predicate = Predicate::new().add_expr(col("column1").gt(lit(100)));
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100)));
let chunks = vec![c1, c2, c3, c4];
let schema = merge_schema(&chunks);
@ -550,7 +550,7 @@ mod test {
Some(20),
)) as Arc<dyn QueryChunk>;
let predicate = Predicate::new().add_expr(col("column1").gt(lit(100)));
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100)));
let chunks = vec![c1, c2, c3, c4, c5, c6];
let schema = merge_schema(&chunks);
@ -590,7 +590,7 @@ mod test {
Some(4),
)) as Arc<dyn QueryChunk>;
let predicate = Predicate::new().add_expr(col("column1").gt(lit(100)));
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100)));
let chunks = vec![c1, c2, c3];
let schema = merge_schema(&chunks);
@ -644,7 +644,7 @@ mod test {
),
) as Arc<dyn QueryChunk>;
let predicate = Predicate::new().add_expr(
let predicate = Predicate::new().with_expr(
col("column1")
.is_null()
.not()
@ -709,7 +709,7 @@ mod test {
) as Arc<dyn QueryChunk>;
let predicate =
Predicate::new().add_expr(col("column1").gt(lit(100)).and(col("column2").lt(lit(5))));
Predicate::new().with_expr(col("column1").gt(lit(100)).and(col("column2").lt(lit(5))));
let chunks = vec![c1, c2, c3, c4, c5, c6];
let schema = merge_schema(&chunks);

View File

@ -53,8 +53,8 @@ pub const EMPTY_PREDICATE: Predicate = Predicate {
/// use datafusion::logical_plan::{col, lit};
///
/// let p = Predicate::new()
/// .timestamp_range(1, 100)
/// .add_expr(col("foo").eq(lit(42)));
/// .with_range(1, 100)
/// .with_expr(col("foo").eq(lit(42)));
///
/// assert_eq!(
/// p.to_string(),
@ -214,7 +214,7 @@ impl Predicate {
///
/// This is used in certain cases to retain compatibility with the
/// existing storage engine
pub(crate) fn clear_timestamp_if_max_range(mut self) -> Self {
pub(crate) fn with_clear_timestamp_if_max_range(mut self) -> Self {
self.range = self.range.take().and_then(|range| {
if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME {
None
@ -285,7 +285,7 @@ pub enum PredicateMatch {
impl Predicate {
/// Sets the timestamp range
pub fn timestamp_range(mut self, start: i64, end: i64) -> Self {
pub fn with_range(mut self, start: i64, end: i64) -> Self {
// Without more thought, redefining the timestamp range would
// lose the old range. Asser that that cannot happen.
assert!(
@ -298,7 +298,7 @@ impl Predicate {
}
/// sets the optional timestamp range, if any
pub fn timestamp_range_option(mut self, range: Option<TimestampRange>) -> Self {
pub fn with_maybe_timestamp_range(mut self, range: Option<TimestampRange>) -> Self {
// Without more thought, redefining the timestamp range would
// lose the old range. Asser that that cannot happen.
assert!(
@ -310,35 +310,36 @@ impl Predicate {
}
/// Adds an expression to the list of general purpose predicates
pub fn add_expr(mut self, expr: Expr) -> Self {
pub fn with_expr(mut self, expr: Expr) -> Self {
self.exprs.push(expr);
self
}
/// Adds a ValueExpr to the list of value expressons
pub fn add_value_expr(mut self, value_expr: ValueExpr) -> Self {
pub fn with_value_expr(mut self, value_expr: ValueExpr) -> Self {
self.value_expr.push(value_expr);
self
}
/// Builds a regex matching expression from the provided column name and
/// pattern. Values not matching the regex will be filtered out.
pub fn build_regex_match_expr(mut self, column: &str, pattern: impl Into<String>) -> Self {
pub fn with_regex_match_expr(self, column: &str, pattern: impl Into<String>) -> Self {
let expr = query_functions::regex_match_expr(col(column), pattern.into());
self.exprs.push(expr);
self
self.with_expr(expr)
}
/// Builds a regex "not matching" expression from the provided column name
/// and pattern. Values *matching* the regex will be filtered out.
pub fn build_regex_not_match_expr(mut self, column: &str, pattern: impl Into<String>) -> Self {
pub fn with_regex_not_match_expr(self, column: &str, pattern: impl Into<String>) -> Self {
let expr = query_functions::regex_not_match_expr(col(column), pattern.into());
self.exprs.push(expr);
self
self.with_expr(expr)
}
/// Sets field_column restriction
pub fn field_columns(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
pub fn with_field_columns(
mut self,
columns: impl IntoIterator<Item = impl Into<String>>,
) -> Self {
// We need to distinguish predicates like `column_name In
// (foo, bar)` and `column_name = foo and column_name = bar` in order to handle
// this
@ -356,7 +357,7 @@ impl Predicate {
}
/// Set the partition key restriction
pub fn partition_key(mut self, partition_key: impl Into<String>) -> Self {
pub fn with_partition_key(mut self, partition_key: impl Into<String>) -> Self {
assert!(
self.partition_key.is_none(),
"multiple partition key predicates not suported"
@ -367,7 +368,7 @@ impl Predicate {
/// Adds only the expressions from `filters` that can be pushed down to
/// execution engines.
pub fn add_pushdown_exprs(mut self, filters: &[Expr]) -> Self {
pub fn with_pushdown_exprs(mut self, filters: &[Expr]) -> Self {
// For each expression of the filters, recursively split it, if it is is an AND conjunction
// For example, expression (x AND y) will be split into a vector of 2 expressions [x, y]
let mut exprs = vec![];
@ -502,7 +503,7 @@ mod tests {
#[test]
fn test_non_default_predicate_is_not_empty() {
let p = Predicate::new().timestamp_range(1, 100);
let p = Predicate::new().with_range(1, 100);
assert!(!p.is_empty());
}
@ -582,7 +583,7 @@ mod tests {
println!(" --------------- Filters: {:#?}", filters);
// Expected pushdown predicates: [state = CA, price > 10, a < 10, b >= 50, f <= 60, city = Boston, city != Braintree, 5 = city]
let predicate = Predicate::default().add_pushdown_exprs(&filters);
let predicate = Predicate::default().with_pushdown_exprs(&filters);
println!(" ------------- Predicates: {:#?}", predicate);
assert_eq!(predicate.exprs.len(), 8);
@ -598,7 +599,7 @@ mod tests {
#[test]
fn predicate_display_ts() {
// TODO make this a doc example?
let p = Predicate::new().timestamp_range(1, 100);
let p = Predicate::new().with_range(1, 100);
assert_eq!(p.to_string(), "Predicate range: [1 - 100]");
}
@ -606,8 +607,8 @@ mod tests {
#[test]
fn predicate_display_ts_and_expr() {
let p = Predicate::new()
.timestamp_range(1, 100)
.add_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11))));
.with_range(1, 100)
.with_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11))));
assert_eq!(
p.to_string(),
@ -618,10 +619,10 @@ mod tests {
#[test]
fn predicate_display_full() {
let p = Predicate::new()
.timestamp_range(1, 100)
.add_expr(col("foo").eq(lit(42)))
.field_columns(vec!["f1", "f2"])
.partition_key("the_key");
.with_range(1, 100)
.with_expr(col("foo").eq(lit(42)))
.with_field_columns(vec!["f1", "f2"])
.with_partition_key("the_key");
assert_eq!(p.to_string(), "Predicate field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]");
}
@ -629,47 +630,47 @@ mod tests {
#[test]
fn test_clear_timestamp_if_max_range_out_of_range() {
let p = Predicate::new()
.timestamp_range(1, 100)
.add_expr(col("foo").eq(lit(42)));
.with_range(1, 100)
.with_expr(col("foo").eq(lit(42)));
let expected = p.clone();
// no rewrite
assert_eq!(p.clear_timestamp_if_max_range(), expected);
assert_eq!(p.with_clear_timestamp_if_max_range(), expected);
}
#[test]
fn test_clear_timestamp_if_max_range_out_of_range_low() {
let p = Predicate::new()
.timestamp_range(MIN_NANO_TIME, 100)
.add_expr(col("foo").eq(lit(42)));
.with_range(MIN_NANO_TIME, 100)
.with_expr(col("foo").eq(lit(42)));
let expected = p.clone();
// no rewrite
assert_eq!(p.clear_timestamp_if_max_range(), expected);
assert_eq!(p.with_clear_timestamp_if_max_range(), expected);
}
#[test]
fn test_clear_timestamp_if_max_range_out_of_range_high() {
let p = Predicate::new()
.timestamp_range(0, MAX_NANO_TIME)
.add_expr(col("foo").eq(lit(42)));
.with_range(0, MAX_NANO_TIME)
.with_expr(col("foo").eq(lit(42)));
let expected = p.clone();
// no rewrite
assert_eq!(p.clear_timestamp_if_max_range(), expected);
assert_eq!(p.with_clear_timestamp_if_max_range(), expected);
}
#[test]
fn test_clear_timestamp_if_max_range_in_range() {
let p = Predicate::new()
.timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME)
.add_expr(col("foo").eq(lit(42)));
.with_range(MIN_NANO_TIME, MAX_NANO_TIME)
.with_expr(col("foo").eq(lit(42)));
let expected = Predicate::new().add_expr(col("foo").eq(lit(42)));
let expected = Predicate::new().with_expr(col("foo").eq(lit(42)));
// rewrite
assert_eq!(p.clear_timestamp_if_max_range(), expected);
assert_eq!(p.with_clear_timestamp_if_max_range(), expected);
}
}

View File

@ -85,7 +85,7 @@ impl InfluxRpcPredicate {
/// existing storage engine
pub fn clear_timestamp_if_max_range(self) -> Self {
Self {
inner: self.inner.clear_timestamp_if_max_range(),
inner: self.inner.with_clear_timestamp_if_max_range(),
..self
}
}
@ -206,7 +206,7 @@ fn normalize_predicate(
let predicate = match field_projections.into_projection() {
FieldProjection::None => predicate,
FieldProjection::Include(include_field_names) => {
predicate.field_columns(include_field_names)
predicate.with_field_columns(include_field_names)
}
FieldProjection::Exclude(exclude_field_names) => {
// if we don't have the schema, it means the table doesn't exist so we can safely ignore
@ -222,7 +222,7 @@ fn normalize_predicate(
}
});
predicate.field_columns(new_fields)
predicate.with_field_columns(new_fields)
} else {
predicate
}
@ -302,13 +302,13 @@ mod tests {
let predicate = normalize_predicate(
"table",
Some(schema()),
&Predicate::new().add_expr(col("_field").eq(lit("f1"))),
&Predicate::new().with_expr(col("_field").eq(lit("f1"))),
)
.unwrap();
let expected = Predicate::new()
.field_columns(vec!["f1"])
.add_expr(lit(true));
.with_field_columns(vec!["f1"])
.with_expr(lit(true));
assert_eq!(predicate, expected);
}
@ -318,13 +318,14 @@ mod tests {
let predicate = normalize_predicate(
"table",
Some(schema()),
&Predicate::new().add_expr(col("_field").eq(lit("f1")).or(col("_field").eq(lit("f2")))),
&Predicate::new()
.with_expr(col("_field").eq(lit("f1")).or(col("_field").eq(lit("f2")))),
)
.unwrap();
let expected = Predicate::new()
.field_columns(vec!["f1", "f2"])
.add_expr(lit(true));
.with_field_columns(vec!["f1", "f2"])
.with_expr(lit(true));
assert_eq!(predicate, expected);
}
@ -336,7 +337,7 @@ mod tests {
Some(schema()),
&Predicate::new()
// predicate is connected with `and` which is not supported
.add_expr(col("_field").eq(lit("f1")).and(col("_field").eq(lit("f2")))),
.with_expr(col("_field").eq(lit("f1")).and(col("_field").eq(lit("f2")))),
)
.unwrap_err();
@ -350,13 +351,13 @@ mod tests {
let predicate = normalize_predicate(
"table",
Some(schema()),
&Predicate::new().add_expr(col("_field").not_eq(lit("f1"))),
&Predicate::new().with_expr(col("_field").not_eq(lit("f1"))),
)
.unwrap();
let expected = Predicate::new()
.field_columns(vec!["f2"])
.add_expr(lit(true));
.with_field_columns(vec!["f2"])
.with_expr(lit(true));
assert_eq!(predicate, expected);
}
@ -368,8 +369,8 @@ mod tests {
Some(schema()),
&Predicate::new()
// put = and != predicates in *different* exprs
.add_expr(col("_field").eq(lit("f1")))
.add_expr(col("_field").not_eq(lit("f2"))),
.with_expr(col("_field").eq(lit("f1")))
.with_expr(col("_field").not_eq(lit("f2"))),
)
.unwrap_err();

View File

@ -48,7 +48,7 @@ impl TableProvider for QuerierTable {
let predicate = filters
.iter()
.fold(Predicate::new(), |b, expr| b.add_expr(expr.clone()));
.fold(Predicate::new(), |b, expr| b.with_expr(expr.clone()));
let chunks = self
.chunks(&predicate)

View File

@ -50,7 +50,7 @@ async fn run_field_columns_test_case<D>(
#[tokio::test]
async fn test_field_columns_no_predicate() {
let predicate = Predicate::default().add_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = InfluxRpcPredicate::new_table("NoSuchTable", predicate);
let expected_fields = FieldList::default();
run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
@ -61,7 +61,7 @@ async fn test_field_columns_no_predicate() {
#[tokio::test]
async fn test_field_columns_with_pred() {
// get only fields from h20 (but both chunks)
let predicate = Predicate::default().add_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = InfluxRpcPredicate::new_table("h2o", predicate);
let expected_fields = FieldList {
@ -90,7 +90,7 @@ async fn test_field_columns_with_pred() {
#[tokio::test]
async fn test_field_columns_measurement_pred() {
// get only fields from h2o using a _measurement predicate
let predicate = Predicate::default().add_expr(col("_measurement").eq(lit("h2o")));
let predicate = Predicate::default().with_expr(col("_measurement").eq(lit("h2o")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
@ -119,8 +119,8 @@ async fn test_field_columns_measurement_pred() {
#[tokio::test]
async fn test_field_columns_with_ts_pred() {
let predicate = Predicate::default()
.timestamp_range(200, 300)
.add_expr(col("state").eq(lit("MA"))); // state=MA
.with_range(200, 300)
.with_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = InfluxRpcPredicate::new_table("h2o", predicate);
let expected_fields = FieldList {
@ -138,7 +138,7 @@ async fn test_field_columns_with_ts_pred() {
async fn test_field_name_plan() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().timestamp_range(0, 2000);
let predicate = Predicate::default().with_range(0, 2000);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
@ -173,7 +173,7 @@ async fn test_field_name_plan() {
async fn test_field_name_plan_with_delete() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().timestamp_range(0, 2000);
let predicate = Predicate::default().with_range(0, 2000);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
@ -206,7 +206,7 @@ async fn test_field_name_plan_with_delete() {
#[tokio::test]
async fn list_field_columns_max_time() {
let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME);
let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
@ -222,7 +222,7 @@ async fn list_field_columns_max_time() {
#[tokio::test]
async fn list_field_columns_max_i64() {
let predicate = Predicate::default().timestamp_range(i64::MIN, i64::MAX);
let predicate = Predicate::default().with_range(i64::MIN, i64::MAX);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
@ -240,7 +240,7 @@ async fn list_field_columns_max_i64() {
async fn list_field_columns_max_time_less_one() {
let predicate = Predicate::default()
// one less than max timestamp
.timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME - 1);
.with_range(MIN_NANO_TIME, MAX_NANO_TIME - 1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList { fields: vec![] };
@ -250,7 +250,7 @@ async fn list_field_columns_max_time_less_one() {
#[tokio::test]
async fn list_field_columns_max_time_greater_one() {
let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME + 1, MAX_NANO_TIME);
let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList { fields: vec![] };

View File

@ -115,7 +115,7 @@ async fn test_read_filter_data_exclusive_predicate() {
let predicate = Predicate::new()
// should not return the 350 row as predicate is
// range.start <= ts < range.end
.timestamp_range(349, 350);
.with_range(349, 350);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![];
@ -127,7 +127,7 @@ async fn test_read_filter_data_exclusive_predicate() {
async fn test_read_filter_data_inclusive_predicate() {
let predicate = Predicate::new()
// should return 350 row!
.timestamp_range(350, 351);
.with_range(350, 351);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -141,7 +141,7 @@ async fn test_read_filter_data_inclusive_predicate() {
async fn test_read_filter_data_exact_predicate() {
let predicate = Predicate::new()
// should return 250 rows!
.timestamp_range(250, 251);
.with_range(250, 251);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -157,7 +157,7 @@ async fn test_read_filter_data_exact_predicate() {
async fn test_read_filter_data_tag_predicate() {
let predicate = Predicate::new()
// region = region
.add_expr(col("region").eq(col("region")));
.with_expr(col("region").eq(col("region")));
let predicate = InfluxRpcPredicate::new(None, predicate);
// expect both series to be returned
@ -173,7 +173,7 @@ async fn test_read_filter_data_tag_predicate() {
async fn test_read_filter_invalid_predicate() {
let predicate = Predicate::new()
// region > 5 (region is a tag(string) column, so this predicate is invalid)
.add_expr(col("region").gt(lit(5i32)));
.with_expr(col("region").gt(lit(5i32)));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "Error during planning: 'Dictionary(Int32, Utf8) > Int32' can't be evaluated because there isn't a common type to coerce the types to";
@ -187,7 +187,7 @@ async fn test_read_filter_invalid_predicate_case() {
// https://github.com/influxdata/influxdb_iox/issues/3635
// model what happens when a field is treated like a tag
// CASE WHEN system" IS NULL THEN '' ELSE system END = 5;
.add_expr(make_empty_tag_ref_expr("system").eq(lit(5i32)));
.with_expr(make_empty_tag_ref_expr("system").eq(lit(5i32)));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "gRPC planner got error creating predicates: Error during planning: 'Utf8 = Int32' can't be evaluated because there isn't a common type to coerce the types to";
@ -200,7 +200,7 @@ async fn test_read_filter_unknown_column_in_predicate() {
let predicate = Predicate::new()
// mystery_region is not a real column, so this predicate is
// invalid but IOx should be able to handle it (and produce no results)
.add_expr(
.with_expr(
col("baz")
.eq(lit(4i32))
.or(col("bar").and(col("mystery_region").gt(lit(5i32)))),
@ -250,8 +250,8 @@ async fn test_read_filter_data_no_pred_with_delete_all() {
async fn test_read_filter_data_filter() {
// filter out one row in h20
let predicate = Predicate::default()
.timestamp_range(200, 300)
.add_expr(col("state").eq(lit("CA"))); // state=CA
.with_range(200, 300)
.with_expr(col("state").eq(lit("CA"))); // state=CA
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -267,8 +267,8 @@ async fn test_read_filter_data_filter() {
// Same results via a != predicate.
let predicate = Predicate::default()
.timestamp_range(200, 300)
.add_expr(col("state").not_eq(lit("MA"))); // state=CA
.with_range(200, 300)
.with_expr(col("state").not_eq(lit("MA"))); // state=CA
let predicate = InfluxRpcPredicate::new(None, predicate);
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
@ -278,8 +278,8 @@ async fn test_read_filter_data_filter() {
async fn test_read_filter_data_filter_with_delete() {
// filter out one row in h20 but the leftover row was deleted to nothing will be returned
let predicate = Predicate::default()
.timestamp_range(200, 300)
.add_expr(col("state").eq(lit("CA"))); // state=CA
.with_range(200, 300)
.with_expr(col("state").eq(lit("CA"))); // state=CA
let predicate = InfluxRpcPredicate::new(None, predicate);
@ -294,8 +294,8 @@ async fn test_read_filter_data_filter_with_delete() {
// Same results via a != predicate.
let predicate = Predicate::default()
.timestamp_range(200, 300)
.add_expr(col("state").not_eq(lit("MA"))); // state=CA
.with_range(200, 300)
.with_expr(col("state").not_eq(lit("MA"))); // state=CA
let predicate = InfluxRpcPredicate::new(None, predicate);
@ -308,9 +308,9 @@ async fn test_read_filter_data_filter_with_delete() {
// Use different predicate to have data returned
let predicate = Predicate::default()
.timestamp_range(100, 300)
.add_expr(col("state").eq(lit("MA"))) // state=MA
.add_expr(col("_measurement").eq(lit("h2o")));
.with_range(100, 300)
.with_expr(col("state").eq(lit("MA"))) // state=MA
.with_expr(col("_measurement").eq(lit("h2o")));
let predicate = InfluxRpcPredicate::new(None, predicate);
@ -330,8 +330,8 @@ async fn test_read_filter_data_filter_with_delete() {
async fn test_read_filter_data_filter_fields() {
// filter out one row in h20
let predicate = Predicate::default()
.field_columns(vec!["other_temp"])
.add_expr(col("state").eq(lit("CA"))); // state=CA
.with_field_columns(vec!["other_temp"])
.with_expr(col("state").eq(lit("CA"))); // state=CA
let predicate = InfluxRpcPredicate::new(None, predicate);
@ -349,8 +349,8 @@ async fn test_read_filter_data_filter_fields() {
async fn test_read_filter_data_filter_measurement_pred() {
// use an expr on table name to pick just the last row from o2
let predicate = Predicate::default()
.timestamp_range(200, 400)
.add_expr(col("_measurement").eq(lit("o2")));
.with_range(200, 400)
.with_expr(col("_measurement").eq(lit("o2")));
let predicate = InfluxRpcPredicate::new(None, predicate);
// Only expect other_temp in this location
@ -363,7 +363,7 @@ async fn test_read_filter_data_filter_measurement_pred() {
#[tokio::test]
async fn test_read_filter_data_pred_refers_to_non_existent_column() {
let predicate = Predicate::default().add_expr(col("tag_not_in_h20").eq(lit("foo")));
let predicate = Predicate::default().with_expr(col("tag_not_in_h20").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![] as Vec<&str>;
@ -373,7 +373,7 @@ async fn test_read_filter_data_pred_refers_to_non_existent_column() {
#[tokio::test]
async fn test_read_filter_data_pred_refers_to_non_existent_column_with_delete() {
let predicate = Predicate::default().add_expr(col("tag_not_in_h20").eq(lit("foo")));
let predicate = Predicate::default().with_expr(col("tag_not_in_h20").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![] as Vec<&str>;
@ -384,7 +384,7 @@ async fn test_read_filter_data_pred_refers_to_non_existent_column_with_delete()
#[tokio::test]
async fn test_read_filter_data_pred_no_columns() {
// predicate with no columns,
let predicate = Predicate::default().add_expr(lit("foo").eq(lit("foo")));
let predicate = Predicate::default().with_expr(lit("foo").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -398,7 +398,7 @@ async fn test_read_filter_data_pred_no_columns() {
#[tokio::test]
async fn test_read_filter_data_pred_no_columns_with_delete() {
// predicate with no columns,
let predicate = Predicate::default().add_expr(lit("foo").eq(lit("foo")));
let predicate = Predicate::default().with_expr(lit("foo").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -412,7 +412,7 @@ async fn test_read_filter_data_pred_no_columns_with_delete() {
#[tokio::test]
async fn test_read_filter_data_pred_no_columns_with_delete_all() {
// predicate with no columns,
let predicate = Predicate::default().add_expr(lit("foo").eq(lit("foo")));
let predicate = Predicate::default().with_expr(lit("foo").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
// Only table disk has no deleted data
@ -427,8 +427,8 @@ async fn test_read_filter_data_pred_no_columns_with_delete_all() {
async fn test_read_filter_data_pred_refers_to_good_and_non_existent_columns() {
// predicate with both a column that does and does not appear
let predicate = Predicate::default()
.add_expr(col("state").eq(lit("MA")))
.add_expr(col("tag_not_in_h20").eq(lit("foo")));
.with_expr(col("state").eq(lit("MA")))
.with_expr(col("tag_not_in_h20").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![] as Vec<&str>;
@ -451,9 +451,9 @@ async fn test_read_filter_data_pred_refers_to_good_and_non_existent_columns() {
#[tokio::test]
async fn test_read_filter_data_pred_using_regex_match() {
let predicate = Predicate::default()
.timestamp_range(200, 300)
.with_range(200, 300)
// will match CA state
.build_regex_match_expr("state", "C.*");
.with_regex_match_expr("state", "C.*");
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -466,9 +466,9 @@ async fn test_read_filter_data_pred_using_regex_match() {
#[tokio::test]
async fn test_read_filter_data_pred_using_regex_match_with_delete() {
let predicate = Predicate::default()
.timestamp_range(200, 300)
.with_range(200, 300)
// will match CA state
.build_regex_match_expr("state", "C.*");
.with_regex_match_expr("state", "C.*");
let predicate = InfluxRpcPredicate::new(None, predicate);
// the selected row was soft deleted
@ -482,9 +482,9 @@ async fn test_read_filter_data_pred_using_regex_match_with_delete() {
// Different predicate to have data returned
let predicate = Predicate::default()
.timestamp_range(200, 400)
.with_range(200, 400)
// will match CA state
.build_regex_match_expr("state", "C.*");
.with_regex_match_expr("state", "C.*");
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -510,9 +510,9 @@ async fn test_read_filter_data_pred_using_regex_match_with_delete() {
#[tokio::test]
async fn test_read_filter_data_pred_using_regex_not_match() {
let predicate = Predicate::default()
.timestamp_range(200, 300)
.with_range(200, 300)
// will filter out any rows with a state that matches "CA"
.build_regex_not_match_expr("state", "C.*");
.with_regex_not_match_expr("state", "C.*");
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -528,7 +528,7 @@ async fn test_read_filter_data_pred_using_regex_not_match() {
async fn test_read_filter_data_pred_regex_escape() {
let predicate = Predicate::default()
// Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url =~ /https\:\/\/influxdb\.com/`,
.build_regex_match_expr("url", r#"https\://influxdb\.com"#);
.with_regex_match_expr("url", r#"https\://influxdb\.com"#);
let predicate = InfluxRpcPredicate::new(None, predicate);
// expect one series with influxdb.com
@ -543,7 +543,7 @@ async fn test_read_filter_data_pred_regex_escape() {
async fn test_read_filter_data_pred_not_match_regex_escape() {
let predicate = Predicate::default()
// Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url !~ /https\:\/\/influxdb\.com/`,
.build_regex_not_match_expr("url", r#"https\://influxdb\.com"#);
.with_regex_not_match_expr("url", r#"https\://influxdb\.com"#);
let predicate = InfluxRpcPredicate::new(None, predicate);
// expect one series with influxdb.com
@ -563,7 +563,7 @@ async fn test_read_filter_data_pred_unsupported_in_scan() {
// (STATE = 'CA') OR (READING > 0)
let predicate =
Predicate::default().add_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0))));
Predicate::default().with_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0))));
let predicate = InfluxRpcPredicate::new(None, predicate);
// Note these results include data from both o2 and h2o
@ -585,7 +585,7 @@ async fn test_read_filter_data_pred_unsupported_in_scan_with_delete() {
// (STATE = 'CA') OR (READING > 0)
let predicate =
Predicate::default().add_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0))));
Predicate::default().with_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0))));
let predicate = InfluxRpcPredicate::new(None, predicate);
// Note these results include data from both o2 and h2o
@ -659,8 +659,8 @@ async fn test_read_filter_filter_on_value() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default()
.add_expr(col("_value").eq(lit(1.77)))
.add_expr(col("_field").eq(lit("load4")));
.with_expr(col("_value").eq(lit(1.77)))
.with_expr(col("_field").eq(lit("load4")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -677,7 +677,7 @@ async fn test_read_filter_on_field() {
// Predicate should pick 'temp' field from h2o
// (_field = 'temp')
let p1 = col("_field").eq(lit("temp"));
let predicate = Predicate::default().add_expr(p1);
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -696,7 +696,7 @@ async fn test_read_filter_on_not_field() {
// Predicate should pick up all fields other than 'temp' from h2o
// (_field != 'temp')
let p1 = col("_field").not_eq(lit("temp"));
let predicate = Predicate::default().add_expr(p1);
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -718,7 +718,7 @@ async fn test_read_filter_unsupported_predicate() {
let p1 = col("_field")
.not_eq(lit("temp"))
.or(col("_field").eq(lit("other_temp")));
let predicate = Predicate::default().add_expr(p1);
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "Unsupported _field predicate";
@ -735,7 +735,7 @@ async fn test_read_filter_on_field_single_measurement() {
let p1 = col("_field")
.eq(lit("temp"))
.and(col("_measurement").eq(lit("h2o")));
let predicate = Predicate::default().add_expr(p1);
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -752,7 +752,7 @@ async fn test_read_filter_multi_negation() {
let host = make_empty_tag_ref_expr("host");
let p1 = host.clone().eq(lit("server01")).or(host.eq(lit("")));
let predicate = Predicate::default().add_expr(p1);
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
@ -781,7 +781,7 @@ async fn test_read_filter_on_field_multi_measurement() {
let p2 = col("_field")
.eq(lit("temp"))
.and(col("_measurement").eq(lit("o2")));
let predicate = Predicate::default().add_expr(p1.or(p2));
let predicate = Predicate::default().with_expr(p1.or(p2));
let predicate = InfluxRpcPredicate::new(None, predicate);
// SHOULD NOT contain temp from h2o

View File

@ -165,8 +165,8 @@ async fn test_read_group_data_no_tag_columns_min_with_delete_all() {
#[tokio::test]
async fn test_read_group_data_pred() {
let predicate = Predicate::default()
.add_expr(col("city").eq(lit("LA")))
.timestamp_range(190, 210);
.with_expr(col("city").eq(lit("LA")))
.with_range(190, 210);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Sum;
let group_columns = vec!["state"];
@ -188,7 +188,7 @@ async fn test_read_group_data_pred() {
#[tokio::test]
async fn test_read_group_data_field_restriction() {
// restrict to only the temp column
let predicate = Predicate::default().field_columns(vec!["temp"]);
let predicate = Predicate::default().with_field_columns(vec!["temp"]);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Sum;
let group_columns = vec!["state"];
@ -213,13 +213,13 @@ async fn test_read_group_data_field_restriction() {
async fn test_grouped_series_set_plan_sum() {
let predicate = Predicate::default()
// city=Boston OR city=Cambridge (filters out LA rows)
.add_expr(
.with_expr(
col("city")
.eq(lit("Boston"))
.or(col("city").eq(lit("Cambridge"))),
)
// fiter out first Cambridge row
.timestamp_range(100, 1000);
.with_range(100, 1000);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Sum;
@ -247,13 +247,13 @@ async fn test_grouped_series_set_plan_sum() {
async fn test_grouped_series_set_plan_count() {
let predicate = Predicate::default()
// city=Boston OR city=Cambridge (filters out LA rows)
.add_expr(
.with_expr(
col("city")
.eq(lit("Boston"))
.or(col("city").eq(lit("Cambridge"))),
)
// fiter out first Cambridge row
.timestamp_range(100, 1000);
.with_range(100, 1000);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Count;
@ -281,13 +281,13 @@ async fn test_grouped_series_set_plan_count() {
async fn test_grouped_series_set_plan_mean() {
let predicate = Predicate::default()
// city=Boston OR city=Cambridge (filters out LA rows)
.add_expr(
.with_expr(
col("city")
.eq(lit("Boston"))
.or(col("city").eq(lit("Cambridge"))),
)
// fiter out first Cambridge row
.timestamp_range(100, 1000);
.with_range(100, 1000);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Mean;
@ -313,7 +313,7 @@ async fn test_grouped_series_set_plan_mean() {
async fn test_grouped_series_set_plan_count_measurement_pred() {
let predicate = Predicate::default()
// city = 'Boston' OR (_measurement = o2)
.add_expr(
.with_expr(
col("city")
.eq(lit("Boston"))
.or(col("_measurement").eq(lit("o2"))),
@ -344,7 +344,7 @@ async fn test_grouped_series_set_plan_count_measurement_pred() {
async fn test_grouped_series_set_plan_first() {
let predicate = Predicate::default()
// fiter out first row (ts 1000)
.timestamp_range(1001, 4001);
.with_range(1001, 4001);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::First;
@ -375,8 +375,8 @@ async fn test_grouped_series_set_plan_first_with_nulls() {
// "h2o,state=MA,city=Boston temp=70.4 50",
// "h2o,state=MA,city=Boston other_temp=70.4 250",
// "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000"
.add_expr(col("state").eq(lit("MA")))
.add_expr(col("city").eq(lit("Boston")));
.with_expr(col("state").eq(lit("MA")))
.with_expr(col("city").eq(lit("Boston")));
let predicate = InfluxRpcPredicate::new_table("h2o", predicate);
let agg = Aggregate::First;
@ -404,7 +404,7 @@ async fn test_grouped_series_set_plan_first_with_nulls() {
async fn test_grouped_series_set_plan_last() {
let predicate = Predicate::default()
// fiter out last row (ts 4000)
.timestamp_range(100, 3999);
.with_range(100, 3999);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Last;
@ -435,8 +435,8 @@ async fn test_grouped_series_set_plan_last_with_nulls() {
// "h2o,state=MA,city=Boston temp=70.4 50",
// "h2o,state=MA,city=Boston other_temp=70.4 250",
// "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000"
.add_expr(col("state").eq(lit("MA")))
.add_expr(col("city").eq(lit("Boston")));
.with_expr(col("state").eq(lit("MA")))
.with_expr(col("city").eq(lit("Boston")));
let predicate = InfluxRpcPredicate::new_table("h2o", predicate);
let agg = Aggregate::Last;
@ -464,7 +464,7 @@ async fn test_grouped_series_set_plan_last_with_nulls() {
async fn test_grouped_series_set_plan_min() {
let predicate = Predicate::default()
// fiter out last row (ts 4000)
.timestamp_range(100, 3999);
.with_range(100, 3999);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Min;
@ -492,7 +492,7 @@ async fn test_grouped_series_set_plan_min() {
async fn test_grouped_series_set_plan_max() {
let predicate = Predicate::default()
// fiter out first row (ts 1000)
.timestamp_range(1001, 4001);
.with_range(1001, 4001);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Max;
@ -798,7 +798,7 @@ async fn test_grouped_series_set_plan_group_field_pred_and_null_fields() {
#[tokio::test]
async fn test_grouped_series_set_plan_group_field_pred_filter_on_field() {
// no predicate
let predicate = Predicate::default().add_expr(col("_field").eq(lit("reading")));
let predicate = Predicate::default().with_expr(col("_field").eq(lit("reading")));
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
let agg = Aggregate::Count;
@ -831,8 +831,8 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_value() {
// no predicate
let predicate = Predicate::default()
// 2018-05-22T19:53:26Z, stop: 2018-05-24T00:00:00Z
.timestamp_range(1527018806000000000, 1527120000000000000)
.add_expr(col("_value").eq(lit(1.77)));
.with_range(1527018806000000000, 1527120000000000000)
.with_expr(col("_value").eq(lit(1.77)));
let predicate = InfluxRpcPredicate::new(None, predicate);
@ -860,8 +860,8 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_multiple_value(
// no predicate
let predicate = Predicate::default()
// 2018-05-22T19:53:26Z, stop: 2018-05-24T00:00:00Z
.timestamp_range(1527018806000000000, 1527120000000000000)
.add_expr(col("_value").eq(lit(1.77)).or(col("_value").eq(lit(1.72))));
.with_range(1527018806000000000, 1527120000000000000)
.with_expr(col("_value").eq(lit(1.77)).or(col("_value").eq(lit(1.72))));
let predicate = InfluxRpcPredicate::new(None, predicate);
@ -891,8 +891,8 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_value_sum() {
// no predicate
let predicate = Predicate::default()
// 2018-05-22T19:53:26Z, stop: 2018-05-24T00:00:00Z
.timestamp_range(1527018806000000000, 1527120000000000000)
.add_expr(col("_value").eq(lit(1.77)));
.with_range(1527018806000000000, 1527120000000000000)
.with_expr(col("_value").eq(lit(1.77)));
let predicate = InfluxRpcPredicate::new(None, predicate);

View File

@ -53,8 +53,8 @@ async fn run_read_window_aggregate_test_case<D>(
async fn test_read_window_aggregate_nanoseconds() {
let predicate = Predicate::default()
// city=Boston or city=LA
.add_expr(col("city").eq(lit("Boston")).or(col("city").eq(lit("LA"))))
.timestamp_range(100, 450);
.with_expr(col("city").eq(lit("Boston")).or(col("city").eq(lit("LA"))))
.with_range(100, 450);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Mean;
@ -82,12 +82,12 @@ async fn test_read_window_aggregate_nanoseconds() {
async fn test_read_window_aggregate_nanoseconds_measurement_pred() {
let predicate = Predicate::default()
// city=Cambridge OR (_measurement != 'other' AND city = LA)
.add_expr(
.with_expr(
col("city").eq(lit("Boston")).or(col("_measurement")
.not_eq(lit("other"))
.and(col("city").eq(lit("LA")))),
)
.timestamp_range(100, 450);
.with_range(100, 450);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Mean;
@ -113,7 +113,7 @@ async fn test_read_window_aggregate_nanoseconds_measurement_pred() {
#[tokio::test]
async fn test_read_window_aggregate_nanoseconds_measurement_count() {
// Expect that the type of `Count` is Integer
let predicate = Predicate::default().timestamp_range(100, 450);
let predicate = Predicate::default().with_range(100, 450);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Count;
@ -142,7 +142,7 @@ async fn test_read_window_aggregate_nanoseconds_measurement_count() {
async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
let predicate = Predicate::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1609459201000000001, 1609459201000000031);
.with_range(1609459201000000001, 1609459201000000031);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Min;
@ -174,7 +174,7 @@ async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697_with_delete() {
let predicate = Predicate::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1609459201000000001, 1609459201000000031);
.with_range(1609459201000000001, 1609459201000000031);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Min;
@ -216,7 +216,7 @@ async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697_with_delet
async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() {
let predicate = Predicate::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1609459201000000001, 1609459201000000031);
.with_range(1609459201000000001, 1609459201000000031);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Sum;
@ -250,8 +250,8 @@ async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() {
async fn test_grouped_series_set_plan_group_aggregate_filter_on_field() {
let predicate = Predicate::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1609459201000000001, 1609459201000000031)
.add_expr(col("_field").eq(lit("foo")));
.with_range(1609459201000000001, 1609459201000000031)
.with_expr(col("_field").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Sum;
@ -280,7 +280,7 @@ async fn test_grouped_series_set_plan_group_aggregate_filter_on_field() {
async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697_with_delete() {
let predicate = Predicate::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1609459201000000001, 1609459201000000031);
.with_range(1609459201000000001, 1609459201000000031);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Sum;
@ -322,7 +322,7 @@ async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697_with_delet
#[tokio::test]
async fn test_read_window_aggregate_overflow() {
let predicate = Predicate::default().timestamp_range(1609459201000000001, 1609459201000000024);
let predicate = Predicate::default().with_range(1609459201000000001, 1609459201000000024);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Max;

View File

@ -75,9 +75,9 @@ async fn list_table_names_no_non_null_data_passes() {
// only a single row with a null field passes this predicate (expect no table names)
let predicate = Predicate::default()
// only get last row of o2 (timestamp = 300)
.timestamp_range(200, 400)
.with_range(200, 400)
// model predicate like _field='reading' which last row does not have
.field_columns(vec!["reading"]);
.with_field_columns(vec!["reading"]);
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await;
@ -90,11 +90,11 @@ async fn list_table_names_no_non_null_general_data_passes() {
// force a generic plan
let predicate = Predicate::default()
// only get last row of o2 (timestamp = 300)
.timestamp_range(200, 400)
.with_range(200, 400)
// model predicate like _field='reading' which last row does not have
.field_columns(vec!["reading"])
.with_field_columns(vec!["reading"])
// (state = CA) OR (temp > 50)
.add_expr(col("state").eq(lit("CA")).or(col("temp").gt(lit(50))));
.with_expr(col("state").eq(lit("CA")).or(col("temp").gt(lit(50))));
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await;
@ -248,7 +248,7 @@ async fn list_table_names_max_time_greater_one() {
// make a single timestamp predicate between r1 and r2
fn tsp(r1: i64, r2: i64) -> InfluxRpcPredicate {
InfluxRpcPredicate::new(None, Predicate::default().timestamp_range(r1, r2))
InfluxRpcPredicate::new(None, Predicate::default().with_range(r1, r2))
}
fn to_stringset(v: &[&str]) -> StringSetRef {

View File

@ -57,7 +57,7 @@ async fn list_tag_columns_with_no_tags() {
)
.await;
let predicate = Predicate::default().timestamp_range(0, 1000);
let predicate = Predicate::default().with_range(0, 1000);
let predicate = InfluxRpcPredicate::new(None, predicate);
run_tag_keys_test_case(OneMeasurementNoTags {}, predicate, vec![]).await;
}
@ -77,7 +77,7 @@ async fn list_tag_columns_no_predicate() {
#[tokio::test]
async fn list_tag_columns_timestamp() {
let predicate = Predicate::default().timestamp_range(150, 201);
let predicate = Predicate::default().with_range(150, 201);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["city", "state"];
run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
@ -85,7 +85,7 @@ async fn list_tag_columns_timestamp() {
#[tokio::test]
async fn list_tag_columns_predicate() {
let predicate = Predicate::default().add_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["city", "county", "state"];
run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
@ -94,8 +94,8 @@ async fn list_tag_columns_predicate() {
#[tokio::test]
async fn list_tag_columns_predicate_negative_nonexistent_column() {
let predicate = Predicate::default()
.add_expr(col("state").eq(lit("MA"))) // state=MA
.add_expr(col("host").not_eq(lit("server01"))); // nonexistent column with !=; always true
.with_expr(col("state").eq(lit("MA"))) // state=MA
.with_expr(col("host").not_eq(lit("server01"))); // nonexistent column with !=; always true
let predicate = InfluxRpcPredicate::new(None, predicate);
// This currently returns nothing, which is incorrect, it should return "city", "county",
// "state" because a nonexistent column is always not equal to anything.
@ -109,8 +109,8 @@ async fn list_tag_columns_measurement_pred() {
//
// "o2,state=NY,city=NYC temp=61.0 500",
let predicate = Predicate::default()
.timestamp_range(450, 550)
.add_expr(col("_measurement").eq(lit("o2"))); // _measurement=o2
.with_range(450, 550)
.with_expr(col("_measurement").eq(lit("o2"))); // _measurement=o2
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["city", "state"];
run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
@ -119,8 +119,8 @@ async fn list_tag_columns_measurement_pred() {
#[tokio::test]
async fn list_tag_columns_timestamp_and_predicate() {
let predicate = Predicate::default()
.timestamp_range(150, 201)
.add_expr(col("state").eq(lit("MA"))); // state=MA
.with_range(150, 201)
.with_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["city", "state"];
run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
@ -135,7 +135,7 @@ async fn list_tag_columns_measurement_name() {
#[tokio::test]
async fn list_tag_columns_measurement_name_and_timestamp() {
let predicate = Predicate::default().timestamp_range(150, 201);
let predicate = Predicate::default().with_range(150, 201);
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
let expected_tag_keys = vec!["city", "state"];
run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
@ -143,7 +143,7 @@ async fn list_tag_columns_measurement_name_and_timestamp() {
#[tokio::test]
async fn list_tag_columns_measurement_name_and_predicate() {
let predicate = Predicate::default().add_expr(col("state").eq(lit("NY"))); // state=NY
let predicate = Predicate::default().with_expr(col("state").eq(lit("NY"))); // state=NY
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
let expected_tag_keys = vec!["borough", "city", "state"];
run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
@ -152,8 +152,8 @@ async fn list_tag_columns_measurement_name_and_predicate() {
#[tokio::test]
async fn list_tag_columns_measurement_name_and_predicate_and_timestamp() {
let predicate = Predicate::default()
.timestamp_range(1, 550)
.add_expr(col("state").eq(lit("NY"))); // state=NY
.with_range(1, 550)
.with_expr(col("state").eq(lit("NY"))); // state=NY
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
let expected_tag_keys = vec!["city", "state"];
run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
@ -162,8 +162,8 @@ async fn list_tag_columns_measurement_name_and_predicate_and_timestamp() {
#[tokio::test]
async fn list_tag_name_end_to_end() {
let predicate = Predicate::default()
.timestamp_range(0, 10000)
.add_expr(col("host").eq(lit("server01")));
.with_range(0, 10000)
.with_expr(col("host").eq(lit("server01")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["host", "name", "region"];
run_tag_keys_test_case(EndToEndTest {}, predicate, expected_tag_keys).await;
@ -172,8 +172,8 @@ async fn list_tag_name_end_to_end() {
#[tokio::test]
async fn list_tag_name_end_to_end_with_delete() {
let predicate = Predicate::default()
.timestamp_range(0, 10000)
.add_expr(col("host").eq(lit("server01")));
.with_range(0, 10000)
.with_expr(col("host").eq(lit("server01")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["host", "region"];
run_tag_keys_test_case(EndToEndTestWithDelete {}, predicate, expected_tag_keys).await;
@ -182,7 +182,7 @@ async fn list_tag_name_end_to_end_with_delete() {
#[tokio::test]
async fn list_tag_name_max_time() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME);
let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["host"];
run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await;
@ -193,7 +193,7 @@ async fn list_tag_name_max_i64() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default()
// outside valid timestamp range
.timestamp_range(i64::MIN, i64::MAX);
.with_range(i64::MIN, i64::MAX);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["host"];
run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await;
@ -202,7 +202,7 @@ async fn list_tag_name_max_i64() {
#[tokio::test]
async fn list_tag_name_max_time_less_one() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME - 1); // one less than max timestamp
let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME - 1); // one less than max timestamp
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec![];
run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await;
@ -211,7 +211,7 @@ async fn list_tag_name_max_time_less_one() {
#[tokio::test]
async fn list_tag_name_max_time_greater_one() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); // one more than min timestamp
let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); // one more than min timestamp
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec![];
run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await;

View File

@ -122,7 +122,7 @@ async fn list_tag_values_no_predicate_city_col() {
#[tokio::test]
async fn list_tag_values_timestamp_pred_state_col() {
let tag_name = "state";
let predicate = Predicate::default().timestamp_range(50, 201);
let predicate = Predicate::default().with_range(50, 201);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["CA", "MA"];
run_tag_values_test_case(
@ -137,7 +137,7 @@ async fn list_tag_values_timestamp_pred_state_col() {
#[tokio::test]
async fn list_tag_values_state_pred_state_col() {
let tag_name = "city";
let predicate = Predicate::default().add_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["Boston"];
run_tag_values_test_case(
@ -153,8 +153,8 @@ async fn list_tag_values_state_pred_state_col() {
async fn list_tag_values_timestamp_and_state_pred_state_col() {
let tag_name = "state";
let predicate = Predicate::default()
.timestamp_range(150, 301)
.add_expr(col("state").eq(lit("MA"))); // state=MA
.with_range(150, 301)
.with_expr(col("state").eq(lit("MA"))); // state=MA
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["MA"];
run_tag_values_test_case(
@ -197,7 +197,7 @@ async fn list_tag_values_table_pred_city_col() {
#[tokio::test]
async fn list_tag_values_table_and_timestamp_and_table_pred_state_col() {
let tag_name = "state";
let predicate = Predicate::default().timestamp_range(50, 201);
let predicate = Predicate::default().with_range(50, 201);
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
let expected_tag_keys = vec!["MA"];
run_tag_values_test_case(
@ -212,7 +212,7 @@ async fn list_tag_values_table_and_timestamp_and_table_pred_state_col() {
#[tokio::test]
async fn list_tag_values_table_and_state_pred_state_col() {
let tag_name = "state";
let predicate = Predicate::default().add_expr(col("state").eq(lit("NY"))); // state=NY
let predicate = Predicate::default().with_expr(col("state").eq(lit("NY"))); // state=NY
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
let expected_tag_keys = vec!["NY"];
run_tag_values_test_case(
@ -228,8 +228,8 @@ async fn list_tag_values_table_and_state_pred_state_col() {
async fn list_tag_values_table_and_timestamp_and_state_pred_state_col() {
let tag_name = "state";
let predicate = Predicate::default()
.timestamp_range(1, 550)
.add_expr(col("state").eq(lit("NY"))); // state=NY
.with_range(1, 550)
.with_expr(col("state").eq(lit("NY"))); // state=NY
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
let expected_tag_keys = vec!["NY"];
run_tag_values_test_case(
@ -245,8 +245,8 @@ async fn list_tag_values_table_and_timestamp_and_state_pred_state_col() {
async fn list_tag_values_table_and_timestamp_and_state_pred_state_col_no_rows() {
let tag_name = "state";
let predicate = Predicate::default()
.timestamp_range(1, 300) // filters out the NY row
.add_expr(col("state").eq(lit("NY"))); // state=NY
.with_range(1, 300) // filters out the NY row
.with_expr(col("state").eq(lit("NY"))); // state=NY
let predicate = InfluxRpcPredicate::new_table("o2", predicate);
let expected_tag_keys = vec![];
@ -263,8 +263,8 @@ async fn list_tag_values_table_and_timestamp_and_state_pred_state_col_no_rows()
async fn list_tag_values_measurement_pred() {
let tag_name = "state";
let predicate = Predicate::default()
.timestamp_range(1, 600) // filters out the NY row
.add_expr(col("_measurement").not_eq(lit("o2")));
.with_range(1, 600) // filters out the NY row
.with_expr(col("_measurement").not_eq(lit("o2")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["CA", "MA"];
@ -281,11 +281,11 @@ async fn list_tag_values_measurement_pred() {
async fn list_tag_values_measurement_pred_and_or() {
let tag_name = "city";
let predicate = Predicate::default()
.timestamp_range(1, 600) // filters out the NY row
.with_range(1, 600) // filters out the NY row
// since there is an 'OR' in this predicate, can't answer
// with metadata alone
// _measurement = 'o2' OR temp > 70.0
.add_expr(
.with_expr(
col("_measurement")
.eq(lit("o2"))
.or(col("temp").gt(lit(70.0))),
@ -334,9 +334,9 @@ async fn list_tag_values_field_col_on_tag() {
async fn list_tag_values_field_col_does_not_exist() {
let tag_name = "state";
let predicate = Predicate::default()
.timestamp_range(0, 1000) // get all rows
.with_range(0, 1000) // get all rows
// since this field doesn't exist this predicate should match no values
.add_expr(col("_field").eq(lit("not_a_column")));
.with_expr(col("_field").eq(lit("not_a_column")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec![];
@ -353,9 +353,9 @@ async fn list_tag_values_field_col_does_not_exist() {
async fn list_tag_values_field_col_does_exist() {
let tag_name = "state";
let predicate = Predicate::default()
.timestamp_range(0, 1000) // get all rows
.with_range(0, 1000) // get all rows
// this field does exist (but only for rows with CA and MA, not NY)
.add_expr(col("_field").eq(lit("county")));
.with_expr(col("_field").eq(lit("county")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["MA", "CA"];

View File

@ -166,7 +166,7 @@ impl InfluxRpcPredicateBuilder {
/// Sets the timestamp range
pub fn set_range(mut self, range: Option<RPCTimestampRange>) -> Self {
if let Some(range) = range {
self.inner = self.inner.timestamp_range(range.start, range.end)
self.inner = self.inner.with_range(range.start, range.end)
}
self
}
@ -310,7 +310,7 @@ fn convert_simple_node(
// add the table names as a predicate
return Ok(builder.tables(value_list));
} else if tag_name.is_field() {
builder.inner = builder.inner.field_columns(value_list);
builder.inner = builder.inner.with_field_columns(value_list);
return Ok(builder);
}
}
@ -318,7 +318,7 @@ fn convert_simple_node(
// If no special case applies, fall back to generic conversion
let expr = convert_node_to_expr(node)?;
builder.inner = builder.inner.add_expr(expr);
builder.inner = builder.inner.with_expr(expr);
Ok(builder)
}

View File

@ -1610,7 +1610,7 @@ mod tests {
// also ensure the plumbing is hooked correctly and that the predicate made it
// down to the chunk
let expected_predicate = Predicate::default().timestamp_range(150, 200);
let expected_predicate = Predicate::default().with_range(150, 200);
fixture
.expect_predicates(
@ -1687,8 +1687,8 @@ mod tests {
// also ensure the plumbing is hooked correctly and that the predicate made it
// down to the chunk
let expected_predicate = Predicate::default()
.timestamp_range(150, 200)
.add_expr(make_state_ma_expr());
.with_range(150, 200)
.with_expr(make_state_ma_expr());
fixture
.expect_predicates(
@ -1790,8 +1790,8 @@ mod tests {
// also ensure the plumbing is hooked correctly and that the predicate made it
// down to the chunk
let expected_predicate = Predicate::default()
.timestamp_range(150, 200)
.add_expr(make_state_ma_expr());
.with_range(150, 200)
.with_expr(make_state_ma_expr());
fixture
.expect_predicates(
@ -2466,11 +2466,11 @@ mod tests {
// also ensure the plumbing is hooked correctly and that the predicate made it
// down to the chunk and it was normalized to namevalue
let expected_predicate = Predicate::default()
.timestamp_range(0, 10000)
.with_range(0, 10000)
// should NOT have CASE nonsense for handling empty strings as
// that should bave been optimized by the time it gets to
// the chunk
.add_expr(col("state").eq(lit("MA")));
.with_expr(col("state").eq(lit("MA")));
fixture
.expect_predicates(
@ -2529,11 +2529,11 @@ mod tests {
// also ensure the plumbing is hooked correctly and that the predicate made it
// down to the chunk and it was normalized to namevalue
let expected_predicate = Predicate::default()
.timestamp_range(0, 10000)
.with_range(0, 10000)
// comparison to empty string conversion results in a messier translation
// to handle backwards compatibility semantics
// #state IS NULL OR #state = Utf8("")
.add_expr(col("state").is_null().or(col("state").eq(lit(""))));
.with_expr(col("state").is_null().or(col("state").eq(lit(""))));
fixture
.expect_predicates(