diff --git a/generated_types/src/ingester.rs b/generated_types/src/ingester.rs index 553d8feb73..77709e4d5e 100644 --- a/generated_types/src/ingester.rs +++ b/generated_types/src/ingester.rs @@ -238,9 +238,9 @@ mod tests { #[test] fn query_round_trip() { let rust_predicate = predicate::Predicate::new() - .timestamp_range(1, 100) - .add_expr(col("foo")) - .add_value_expr(col("_value").eq(lit("bar")).try_into().unwrap()); + .with_range(1, 100) + .with_expr(col("foo")) + .with_value_expr(col("_value").eq(lit("bar")).try_into().unwrap()); let rust_query = IngesterQueryRequest::new( "mydb".into(), diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index 6819c5a1b2..1c0ad5af07 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -400,7 +400,7 @@ mod tests { // tag1=VT let expr = col("tag1").eq(lit("VT")); - let pred = Predicate::default().add_expr(expr); + let pred = Predicate::default().with_expr(expr); let exc = Executor::new(1); let stream = query(&exc, batch, pred, selection).await.unwrap(); @@ -438,7 +438,7 @@ mod tests { // tag1=UT let expr = col("tag1").eq(lit("UT")); - let pred = Predicate::default().add_expr(expr); + let pred = Predicate::default().with_expr(expr); let exc = Executor::new(1); let stream = query(&exc, batch, pred, selection).await.unwrap(); @@ -534,7 +534,7 @@ mod tests { // read data from all scenarios, filter out column day, city Medford, time outside range [0, 42) request.columns = ["city", "temp", "time"].map(Into::into).into(); let expr = col("city").not_eq(lit("Medford")); - let pred = Predicate::default().add_expr(expr).timestamp_range(0, 42); + let pred = Predicate::default().with_expr(expr).with_range(0, 42); request.predicate = Some(pred); let expected = vec![ "+------------+------+--------------------------------+", @@ -651,7 +651,7 @@ mod tests { // read data from all scenarios, filter out column day, city Medford, time outside range [0, 42) request.columns = vec!["city".to_string(), "temp".to_string(), "time".to_string()]; let expr = col("city").not_eq(lit("Medford")); - let pred = Predicate::default().add_expr(expr).timestamp_range(0, 42); + let pred = Predicate::default().with_expr(expr).with_range(0, 42); request.predicate = Some(pred); let expected = vec![ "+------------+------+--------------------------------+", diff --git a/ingester/src/query.rs b/ingester/src/query.rs index ae0b7f9ee5..79b6c45f14 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -444,7 +444,7 @@ mod tests { let batch = make_queryable_batch("test_table", 1, batches); // tag1 = VT let expr = col("tag1").eq(lit("VT")); - let pred = Predicate::default().add_expr(expr); + let pred = Predicate::default().with_expr(expr); let stream = batch .read_filter(IOxSessionContext::default(), &pred, Selection::All) @@ -469,7 +469,7 @@ mod tests { let batch = make_queryable_batch("test_table", 1, batches); // foo = VT let expr = col("foo").eq(lit("VT")); // `foo` column not available - let pred = Predicate::default().add_expr(expr); + let pred = Predicate::default().with_expr(expr); let stream = batch .read_filter(IOxSessionContext::default(), &pred, Selection::All) @@ -489,7 +489,7 @@ mod tests { let batch = make_queryable_batch("test_table", 1, batches); // foo is NULL let expr = col("foo").is_null(); - let pred = Predicate::default().add_expr(expr); + let pred = Predicate::default().with_expr(expr); let stream = batch .read_filter(IOxSessionContext::default(), &pred, Selection::All) @@ -579,7 +579,7 @@ mod tests { // foo is NULL AND tag1=CT let expr = col("foo").is_null().and(col("tag1").eq(lit("CT"))); - let pred = Predicate::default().add_expr(expr); + let pred = Predicate::default().with_expr(expr); let stream = batch .read_filter(IOxSessionContext::default(), &pred, selection) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index 6e01aa3d5f..5ec9c39d4f 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -1991,11 +1991,11 @@ mod tests { let expr = when(col("foo").is_null(), lit("")) .otherwise(col("foo")) .unwrap(); - let silly_predicate = Predicate::new().add_expr(expr.eq(lit("bar"))); + let silly_predicate = Predicate::new().with_expr(expr.eq(lit("bar"))); // verify that the predicate was rewritten to `foo = 'bar'` let expr = col("foo").eq(lit("bar")); - let expected_predicate = Predicate::new().add_expr(expr); + let expected_predicate = Predicate::new().with_expr(expr); run_test_with_predicate(&func, silly_predicate, expected_predicate).await; @@ -2004,13 +2004,13 @@ mod tests { // // https://github.com/influxdata/influxdb_iox/issues/3601 // _measurement = 'foo' - let silly_predicate = Predicate::new().add_expr(col("_measurement").eq(lit("foo"))); + let silly_predicate = Predicate::new().with_expr(col("_measurement").eq(lit("foo"))); // verify that the predicate was rewritten to `false` as the // measurement name is `h20` let expr = lit(false); - let expected_predicate = Predicate::new().add_expr(expr); + let expected_predicate = Predicate::new().with_expr(expr); run_test_with_predicate(&func, silly_predicate, expected_predicate).await; // ------------- Test 3 ---------------- @@ -2018,7 +2018,7 @@ mod tests { // // https://github.com/influxdata/influxdb_iox/issues/3601 // (_measurement = 'foo' or measurement = 'h2o') AND time > 5 - let silly_predicate = Predicate::new().add_expr( + let silly_predicate = Predicate::new().with_expr( col("_measurement") .eq(lit("foo")) .or(col("_measurement").eq(lit("h2o"))) @@ -2028,7 +2028,7 @@ mod tests { // verify that the predicate was rewritten to time > 5 let expr = col("time").gt(lit(5)); - let expected_predicate = Predicate::new().add_expr(expr); + let expected_predicate = Predicate::new().with_expr(expr); run_test_with_predicate(&func, silly_predicate, expected_predicate).await; } diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index f00968e423..ca60208084 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -251,7 +251,7 @@ impl TableProvider for ChunkTableProvider { // Note that `filters` don't actually need to be evaluated in // the scan for the plans to be correct, they are an extra // optimization for providers which can offer them - let predicate = Predicate::default().add_pushdown_exprs(filters); + let predicate = Predicate::default().with_pushdown_exprs(filters); // Now we have a second attempt to prune out chunks based on // metadata using the pushed down predicate (e.g. in SQL). diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs index cf133845ff..add166ccea 100644 --- a/iox_query/src/pruning.rs +++ b/iox_query/src/pruning.rs @@ -258,7 +258,7 @@ mod test { Some(10.0), )); - let predicate = Predicate::new().add_expr(col("column1").gt(lit(100.0))); + let predicate = Predicate::new().with_expr(col("column1").gt(lit(100.0))); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); assert_eq!(observer.events(), vec!["chunk1: Pruned"]); @@ -278,7 +278,7 @@ mod test { Some(10), )); - let predicate = Predicate::new().add_expr(col("column1").gt(lit(100))); + let predicate = Predicate::new().with_expr(col("column1").gt(lit(100))); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); @@ -299,7 +299,7 @@ mod test { Some(10), )); - let predicate = Predicate::new().add_expr(col("column1").gt(lit(100))); + let predicate = Predicate::new().with_expr(col("column1").gt(lit(100))); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); @@ -320,7 +320,7 @@ mod test { Some(false), )); - let predicate = Predicate::new().add_expr(col("column1")); + let predicate = Predicate::new().with_expr(col("column1")); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); @@ -343,7 +343,7 @@ mod test { ), ); - let predicate = Predicate::new().add_expr(col("column1").gt(lit("z"))); + let predicate = Predicate::new().with_expr(col("column1").gt(lit("z"))); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); @@ -363,7 +363,7 @@ mod test { Some(10.0), )); - let predicate = Predicate::new().add_expr(col("column1").lt(lit(100.0))); + let predicate = Predicate::new().with_expr(col("column1").lt(lit(100.0))); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); assert!(observer.events().is_empty()); @@ -383,7 +383,7 @@ mod test { Some(10), )); - let predicate = Predicate::new().add_expr(col("column1").lt(lit(100))); + let predicate = Predicate::new().with_expr(col("column1").lt(lit(100))); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); @@ -404,7 +404,7 @@ mod test { Some(10), )); - let predicate = Predicate::new().add_expr(col("column1").lt(lit(100))); + let predicate = Predicate::new().with_expr(col("column1").lt(lit(100))); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); @@ -425,7 +425,7 @@ mod test { Some(true), )); - let predicate = Predicate::new().add_expr(col("column1")); + let predicate = Predicate::new().with_expr(col("column1")); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); @@ -448,7 +448,7 @@ mod test { ), ); - let predicate = Predicate::new().add_expr(col("column1").lt(lit("z"))); + let predicate = Predicate::new().with_expr(col("column1").lt(lit("z"))); let pruned = prune_chunks(&observer, c1.schema(), vec![c1], &predicate); @@ -493,7 +493,7 @@ mod test { let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1")) as Arc; - let predicate = Predicate::new().add_expr(col("column1").gt(lit(100))); + let predicate = Predicate::new().with_expr(col("column1").gt(lit(100))); let chunks = vec![c1, c2, c3, c4]; let schema = merge_schema(&chunks); @@ -550,7 +550,7 @@ mod test { Some(20), )) as Arc; - let predicate = Predicate::new().add_expr(col("column1").gt(lit(100))); + let predicate = Predicate::new().with_expr(col("column1").gt(lit(100))); let chunks = vec![c1, c2, c3, c4, c5, c6]; let schema = merge_schema(&chunks); @@ -590,7 +590,7 @@ mod test { Some(4), )) as Arc; - let predicate = Predicate::new().add_expr(col("column1").gt(lit(100))); + let predicate = Predicate::new().with_expr(col("column1").gt(lit(100))); let chunks = vec![c1, c2, c3]; let schema = merge_schema(&chunks); @@ -644,7 +644,7 @@ mod test { ), ) as Arc; - let predicate = Predicate::new().add_expr( + let predicate = Predicate::new().with_expr( col("column1") .is_null() .not() @@ -709,7 +709,7 @@ mod test { ) as Arc; let predicate = - Predicate::new().add_expr(col("column1").gt(lit(100)).and(col("column2").lt(lit(5)))); + Predicate::new().with_expr(col("column1").gt(lit(100)).and(col("column2").lt(lit(5)))); let chunks = vec![c1, c2, c3, c4, c5, c6]; let schema = merge_schema(&chunks); diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index f329af8274..4d296ca5e9 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -53,8 +53,8 @@ pub const EMPTY_PREDICATE: Predicate = Predicate { /// use datafusion::logical_plan::{col, lit}; /// /// let p = Predicate::new() -/// .timestamp_range(1, 100) -/// .add_expr(col("foo").eq(lit(42))); +/// .with_range(1, 100) +/// .with_expr(col("foo").eq(lit(42))); /// /// assert_eq!( /// p.to_string(), @@ -214,7 +214,7 @@ impl Predicate { /// /// This is used in certain cases to retain compatibility with the /// existing storage engine - pub(crate) fn clear_timestamp_if_max_range(mut self) -> Self { + pub(crate) fn with_clear_timestamp_if_max_range(mut self) -> Self { self.range = self.range.take().and_then(|range| { if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME { None @@ -285,7 +285,7 @@ pub enum PredicateMatch { impl Predicate { /// Sets the timestamp range - pub fn timestamp_range(mut self, start: i64, end: i64) -> Self { + pub fn with_range(mut self, start: i64, end: i64) -> Self { // Without more thought, redefining the timestamp range would // lose the old range. Asser that that cannot happen. assert!( @@ -298,7 +298,7 @@ impl Predicate { } /// sets the optional timestamp range, if any - pub fn timestamp_range_option(mut self, range: Option) -> Self { + pub fn with_maybe_timestamp_range(mut self, range: Option) -> Self { // Without more thought, redefining the timestamp range would // lose the old range. Asser that that cannot happen. assert!( @@ -310,35 +310,36 @@ impl Predicate { } /// Adds an expression to the list of general purpose predicates - pub fn add_expr(mut self, expr: Expr) -> Self { + pub fn with_expr(mut self, expr: Expr) -> Self { self.exprs.push(expr); self } /// Adds a ValueExpr to the list of value expressons - pub fn add_value_expr(mut self, value_expr: ValueExpr) -> Self { + pub fn with_value_expr(mut self, value_expr: ValueExpr) -> Self { self.value_expr.push(value_expr); self } /// Builds a regex matching expression from the provided column name and /// pattern. Values not matching the regex will be filtered out. - pub fn build_regex_match_expr(mut self, column: &str, pattern: impl Into) -> Self { + pub fn with_regex_match_expr(self, column: &str, pattern: impl Into) -> Self { let expr = query_functions::regex_match_expr(col(column), pattern.into()); - self.exprs.push(expr); - self + self.with_expr(expr) } /// Builds a regex "not matching" expression from the provided column name /// and pattern. Values *matching* the regex will be filtered out. - pub fn build_regex_not_match_expr(mut self, column: &str, pattern: impl Into) -> Self { + pub fn with_regex_not_match_expr(self, column: &str, pattern: impl Into) -> Self { let expr = query_functions::regex_not_match_expr(col(column), pattern.into()); - self.exprs.push(expr); - self + self.with_expr(expr) } /// Sets field_column restriction - pub fn field_columns(mut self, columns: impl IntoIterator>) -> Self { + pub fn with_field_columns( + mut self, + columns: impl IntoIterator>, + ) -> Self { // We need to distinguish predicates like `column_name In // (foo, bar)` and `column_name = foo and column_name = bar` in order to handle // this @@ -356,7 +357,7 @@ impl Predicate { } /// Set the partition key restriction - pub fn partition_key(mut self, partition_key: impl Into) -> Self { + pub fn with_partition_key(mut self, partition_key: impl Into) -> Self { assert!( self.partition_key.is_none(), "multiple partition key predicates not suported" @@ -367,7 +368,7 @@ impl Predicate { /// Adds only the expressions from `filters` that can be pushed down to /// execution engines. - pub fn add_pushdown_exprs(mut self, filters: &[Expr]) -> Self { + pub fn with_pushdown_exprs(mut self, filters: &[Expr]) -> Self { // For each expression of the filters, recursively split it, if it is is an AND conjunction // For example, expression (x AND y) will be split into a vector of 2 expressions [x, y] let mut exprs = vec![]; @@ -502,7 +503,7 @@ mod tests { #[test] fn test_non_default_predicate_is_not_empty() { - let p = Predicate::new().timestamp_range(1, 100); + let p = Predicate::new().with_range(1, 100); assert!(!p.is_empty()); } @@ -582,7 +583,7 @@ mod tests { println!(" --------------- Filters: {:#?}", filters); // Expected pushdown predicates: [state = CA, price > 10, a < 10, b >= 50, f <= 60, city = Boston, city != Braintree, 5 = city] - let predicate = Predicate::default().add_pushdown_exprs(&filters); + let predicate = Predicate::default().with_pushdown_exprs(&filters); println!(" ------------- Predicates: {:#?}", predicate); assert_eq!(predicate.exprs.len(), 8); @@ -598,7 +599,7 @@ mod tests { #[test] fn predicate_display_ts() { // TODO make this a doc example? - let p = Predicate::new().timestamp_range(1, 100); + let p = Predicate::new().with_range(1, 100); assert_eq!(p.to_string(), "Predicate range: [1 - 100]"); } @@ -606,8 +607,8 @@ mod tests { #[test] fn predicate_display_ts_and_expr() { let p = Predicate::new() - .timestamp_range(1, 100) - .add_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11)))); + .with_range(1, 100) + .with_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11)))); assert_eq!( p.to_string(), @@ -618,10 +619,10 @@ mod tests { #[test] fn predicate_display_full() { let p = Predicate::new() - .timestamp_range(1, 100) - .add_expr(col("foo").eq(lit(42))) - .field_columns(vec!["f1", "f2"]) - .partition_key("the_key"); + .with_range(1, 100) + .with_expr(col("foo").eq(lit(42))) + .with_field_columns(vec!["f1", "f2"]) + .with_partition_key("the_key"); assert_eq!(p.to_string(), "Predicate field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]"); } @@ -629,47 +630,47 @@ mod tests { #[test] fn test_clear_timestamp_if_max_range_out_of_range() { let p = Predicate::new() - .timestamp_range(1, 100) - .add_expr(col("foo").eq(lit(42))); + .with_range(1, 100) + .with_expr(col("foo").eq(lit(42))); let expected = p.clone(); // no rewrite - assert_eq!(p.clear_timestamp_if_max_range(), expected); + assert_eq!(p.with_clear_timestamp_if_max_range(), expected); } #[test] fn test_clear_timestamp_if_max_range_out_of_range_low() { let p = Predicate::new() - .timestamp_range(MIN_NANO_TIME, 100) - .add_expr(col("foo").eq(lit(42))); + .with_range(MIN_NANO_TIME, 100) + .with_expr(col("foo").eq(lit(42))); let expected = p.clone(); // no rewrite - assert_eq!(p.clear_timestamp_if_max_range(), expected); + assert_eq!(p.with_clear_timestamp_if_max_range(), expected); } #[test] fn test_clear_timestamp_if_max_range_out_of_range_high() { let p = Predicate::new() - .timestamp_range(0, MAX_NANO_TIME) - .add_expr(col("foo").eq(lit(42))); + .with_range(0, MAX_NANO_TIME) + .with_expr(col("foo").eq(lit(42))); let expected = p.clone(); // no rewrite - assert_eq!(p.clear_timestamp_if_max_range(), expected); + assert_eq!(p.with_clear_timestamp_if_max_range(), expected); } #[test] fn test_clear_timestamp_if_max_range_in_range() { let p = Predicate::new() - .timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME) - .add_expr(col("foo").eq(lit(42))); + .with_range(MIN_NANO_TIME, MAX_NANO_TIME) + .with_expr(col("foo").eq(lit(42))); - let expected = Predicate::new().add_expr(col("foo").eq(lit(42))); + let expected = Predicate::new().with_expr(col("foo").eq(lit(42))); // rewrite - assert_eq!(p.clear_timestamp_if_max_range(), expected); + assert_eq!(p.with_clear_timestamp_if_max_range(), expected); } } diff --git a/predicate/src/rpc_predicate.rs b/predicate/src/rpc_predicate.rs index 62bff30d96..968f0a394d 100644 --- a/predicate/src/rpc_predicate.rs +++ b/predicate/src/rpc_predicate.rs @@ -85,7 +85,7 @@ impl InfluxRpcPredicate { /// existing storage engine pub fn clear_timestamp_if_max_range(self) -> Self { Self { - inner: self.inner.clear_timestamp_if_max_range(), + inner: self.inner.with_clear_timestamp_if_max_range(), ..self } } @@ -206,7 +206,7 @@ fn normalize_predicate( let predicate = match field_projections.into_projection() { FieldProjection::None => predicate, FieldProjection::Include(include_field_names) => { - predicate.field_columns(include_field_names) + predicate.with_field_columns(include_field_names) } FieldProjection::Exclude(exclude_field_names) => { // if we don't have the schema, it means the table doesn't exist so we can safely ignore @@ -222,7 +222,7 @@ fn normalize_predicate( } }); - predicate.field_columns(new_fields) + predicate.with_field_columns(new_fields) } else { predicate } @@ -302,13 +302,13 @@ mod tests { let predicate = normalize_predicate( "table", Some(schema()), - &Predicate::new().add_expr(col("_field").eq(lit("f1"))), + &Predicate::new().with_expr(col("_field").eq(lit("f1"))), ) .unwrap(); let expected = Predicate::new() - .field_columns(vec!["f1"]) - .add_expr(lit(true)); + .with_field_columns(vec!["f1"]) + .with_expr(lit(true)); assert_eq!(predicate, expected); } @@ -318,13 +318,14 @@ mod tests { let predicate = normalize_predicate( "table", Some(schema()), - &Predicate::new().add_expr(col("_field").eq(lit("f1")).or(col("_field").eq(lit("f2")))), + &Predicate::new() + .with_expr(col("_field").eq(lit("f1")).or(col("_field").eq(lit("f2")))), ) .unwrap(); let expected = Predicate::new() - .field_columns(vec!["f1", "f2"]) - .add_expr(lit(true)); + .with_field_columns(vec!["f1", "f2"]) + .with_expr(lit(true)); assert_eq!(predicate, expected); } @@ -336,7 +337,7 @@ mod tests { Some(schema()), &Predicate::new() // predicate is connected with `and` which is not supported - .add_expr(col("_field").eq(lit("f1")).and(col("_field").eq(lit("f2")))), + .with_expr(col("_field").eq(lit("f1")).and(col("_field").eq(lit("f2")))), ) .unwrap_err(); @@ -350,13 +351,13 @@ mod tests { let predicate = normalize_predicate( "table", Some(schema()), - &Predicate::new().add_expr(col("_field").not_eq(lit("f1"))), + &Predicate::new().with_expr(col("_field").not_eq(lit("f1"))), ) .unwrap(); let expected = Predicate::new() - .field_columns(vec!["f2"]) - .add_expr(lit(true)); + .with_field_columns(vec!["f2"]) + .with_expr(lit(true)); assert_eq!(predicate, expected); } @@ -368,8 +369,8 @@ mod tests { Some(schema()), &Predicate::new() // put = and != predicates in *different* exprs - .add_expr(col("_field").eq(lit("f1"))) - .add_expr(col("_field").not_eq(lit("f2"))), + .with_expr(col("_field").eq(lit("f1"))) + .with_expr(col("_field").not_eq(lit("f2"))), ) .unwrap_err(); diff --git a/querier/src/table/query_access.rs b/querier/src/table/query_access.rs index 3747ef2aa7..eb6b71b50a 100644 --- a/querier/src/table/query_access.rs +++ b/querier/src/table/query_access.rs @@ -48,7 +48,7 @@ impl TableProvider for QuerierTable { let predicate = filters .iter() - .fold(Predicate::new(), |b, expr| b.add_expr(expr.clone())); + .fold(Predicate::new(), |b, expr| b.with_expr(expr.clone())); let chunks = self .chunks(&predicate) diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs index f3bb89a604..a0341276b8 100644 --- a/query_tests/src/influxrpc/field_columns.rs +++ b/query_tests/src/influxrpc/field_columns.rs @@ -50,7 +50,7 @@ async fn run_field_columns_test_case( #[tokio::test] async fn test_field_columns_no_predicate() { - let predicate = Predicate::default().add_expr(col("state").eq(lit("MA"))); // state=MA + let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA let predicate = InfluxRpcPredicate::new_table("NoSuchTable", predicate); let expected_fields = FieldList::default(); run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await; @@ -61,7 +61,7 @@ async fn test_field_columns_no_predicate() { #[tokio::test] async fn test_field_columns_with_pred() { // get only fields from h20 (but both chunks) - let predicate = Predicate::default().add_expr(col("state").eq(lit("MA"))); // state=MA + let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA let predicate = InfluxRpcPredicate::new_table("h2o", predicate); let expected_fields = FieldList { @@ -90,7 +90,7 @@ async fn test_field_columns_with_pred() { #[tokio::test] async fn test_field_columns_measurement_pred() { // get only fields from h2o using a _measurement predicate - let predicate = Predicate::default().add_expr(col("_measurement").eq(lit("h2o"))); + let predicate = Predicate::default().with_expr(col("_measurement").eq(lit("h2o"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_fields = FieldList { @@ -119,8 +119,8 @@ async fn test_field_columns_measurement_pred() { #[tokio::test] async fn test_field_columns_with_ts_pred() { let predicate = Predicate::default() - .timestamp_range(200, 300) - .add_expr(col("state").eq(lit("MA"))); // state=MA + .with_range(200, 300) + .with_expr(col("state").eq(lit("MA"))); // state=MA let predicate = InfluxRpcPredicate::new_table("h2o", predicate); let expected_fields = FieldList { @@ -138,7 +138,7 @@ async fn test_field_columns_with_ts_pred() { async fn test_field_name_plan() { test_helpers::maybe_start_logging(); - let predicate = Predicate::default().timestamp_range(0, 2000); + let predicate = Predicate::default().with_range(0, 2000); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_fields = FieldList { @@ -173,7 +173,7 @@ async fn test_field_name_plan() { async fn test_field_name_plan_with_delete() { test_helpers::maybe_start_logging(); - let predicate = Predicate::default().timestamp_range(0, 2000); + let predicate = Predicate::default().with_range(0, 2000); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_fields = FieldList { @@ -206,7 +206,7 @@ async fn test_field_name_plan_with_delete() { #[tokio::test] async fn list_field_columns_max_time() { - let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME); + let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_fields = FieldList { @@ -222,7 +222,7 @@ async fn list_field_columns_max_time() { #[tokio::test] async fn list_field_columns_max_i64() { - let predicate = Predicate::default().timestamp_range(i64::MIN, i64::MAX); + let predicate = Predicate::default().with_range(i64::MIN, i64::MAX); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_fields = FieldList { @@ -240,7 +240,7 @@ async fn list_field_columns_max_i64() { async fn list_field_columns_max_time_less_one() { let predicate = Predicate::default() // one less than max timestamp - .timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME - 1); + .with_range(MIN_NANO_TIME, MAX_NANO_TIME - 1); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_fields = FieldList { fields: vec![] }; @@ -250,7 +250,7 @@ async fn list_field_columns_max_time_less_one() { #[tokio::test] async fn list_field_columns_max_time_greater_one() { - let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); + let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_fields = FieldList { fields: vec![] }; diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index 9313e447c7..c44a94f751 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -115,7 +115,7 @@ async fn test_read_filter_data_exclusive_predicate() { let predicate = Predicate::new() // should not return the 350 row as predicate is // range.start <= ts < range.end - .timestamp_range(349, 350); + .with_range(349, 350); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![]; @@ -127,7 +127,7 @@ async fn test_read_filter_data_exclusive_predicate() { async fn test_read_filter_data_inclusive_predicate() { let predicate = Predicate::new() // should return 350 row! - .timestamp_range(350, 351); + .with_range(350, 351); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -141,7 +141,7 @@ async fn test_read_filter_data_inclusive_predicate() { async fn test_read_filter_data_exact_predicate() { let predicate = Predicate::new() // should return 250 rows! - .timestamp_range(250, 251); + .with_range(250, 251); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -157,7 +157,7 @@ async fn test_read_filter_data_exact_predicate() { async fn test_read_filter_data_tag_predicate() { let predicate = Predicate::new() // region = region - .add_expr(col("region").eq(col("region"))); + .with_expr(col("region").eq(col("region"))); let predicate = InfluxRpcPredicate::new(None, predicate); // expect both series to be returned @@ -173,7 +173,7 @@ async fn test_read_filter_data_tag_predicate() { async fn test_read_filter_invalid_predicate() { let predicate = Predicate::new() // region > 5 (region is a tag(string) column, so this predicate is invalid) - .add_expr(col("region").gt(lit(5i32))); + .with_expr(col("region").gt(lit(5i32))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_error = "Error during planning: 'Dictionary(Int32, Utf8) > Int32' can't be evaluated because there isn't a common type to coerce the types to"; @@ -187,7 +187,7 @@ async fn test_read_filter_invalid_predicate_case() { // https://github.com/influxdata/influxdb_iox/issues/3635 // model what happens when a field is treated like a tag // CASE WHEN system" IS NULL THEN '' ELSE system END = 5; - .add_expr(make_empty_tag_ref_expr("system").eq(lit(5i32))); + .with_expr(make_empty_tag_ref_expr("system").eq(lit(5i32))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_error = "gRPC planner got error creating predicates: Error during planning: 'Utf8 = Int32' can't be evaluated because there isn't a common type to coerce the types to"; @@ -200,7 +200,7 @@ async fn test_read_filter_unknown_column_in_predicate() { let predicate = Predicate::new() // mystery_region is not a real column, so this predicate is // invalid but IOx should be able to handle it (and produce no results) - .add_expr( + .with_expr( col("baz") .eq(lit(4i32)) .or(col("bar").and(col("mystery_region").gt(lit(5i32)))), @@ -250,8 +250,8 @@ async fn test_read_filter_data_no_pred_with_delete_all() { async fn test_read_filter_data_filter() { // filter out one row in h20 let predicate = Predicate::default() - .timestamp_range(200, 300) - .add_expr(col("state").eq(lit("CA"))); // state=CA + .with_range(200, 300) + .with_expr(col("state").eq(lit("CA"))); // state=CA let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -267,8 +267,8 @@ async fn test_read_filter_data_filter() { // Same results via a != predicate. let predicate = Predicate::default() - .timestamp_range(200, 300) - .add_expr(col("state").not_eq(lit("MA"))); // state=CA + .with_range(200, 300) + .with_expr(col("state").not_eq(lit("MA"))); // state=CA let predicate = InfluxRpcPredicate::new(None, predicate); run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await; @@ -278,8 +278,8 @@ async fn test_read_filter_data_filter() { async fn test_read_filter_data_filter_with_delete() { // filter out one row in h20 but the leftover row was deleted to nothing will be returned let predicate = Predicate::default() - .timestamp_range(200, 300) - .add_expr(col("state").eq(lit("CA"))); // state=CA + .with_range(200, 300) + .with_expr(col("state").eq(lit("CA"))); // state=CA let predicate = InfluxRpcPredicate::new(None, predicate); @@ -294,8 +294,8 @@ async fn test_read_filter_data_filter_with_delete() { // Same results via a != predicate. let predicate = Predicate::default() - .timestamp_range(200, 300) - .add_expr(col("state").not_eq(lit("MA"))); // state=CA + .with_range(200, 300) + .with_expr(col("state").not_eq(lit("MA"))); // state=CA let predicate = InfluxRpcPredicate::new(None, predicate); @@ -308,9 +308,9 @@ async fn test_read_filter_data_filter_with_delete() { // Use different predicate to have data returned let predicate = Predicate::default() - .timestamp_range(100, 300) - .add_expr(col("state").eq(lit("MA"))) // state=MA - .add_expr(col("_measurement").eq(lit("h2o"))); + .with_range(100, 300) + .with_expr(col("state").eq(lit("MA"))) // state=MA + .with_expr(col("_measurement").eq(lit("h2o"))); let predicate = InfluxRpcPredicate::new(None, predicate); @@ -330,8 +330,8 @@ async fn test_read_filter_data_filter_with_delete() { async fn test_read_filter_data_filter_fields() { // filter out one row in h20 let predicate = Predicate::default() - .field_columns(vec!["other_temp"]) - .add_expr(col("state").eq(lit("CA"))); // state=CA + .with_field_columns(vec!["other_temp"]) + .with_expr(col("state").eq(lit("CA"))); // state=CA let predicate = InfluxRpcPredicate::new(None, predicate); @@ -349,8 +349,8 @@ async fn test_read_filter_data_filter_fields() { async fn test_read_filter_data_filter_measurement_pred() { // use an expr on table name to pick just the last row from o2 let predicate = Predicate::default() - .timestamp_range(200, 400) - .add_expr(col("_measurement").eq(lit("o2"))); + .with_range(200, 400) + .with_expr(col("_measurement").eq(lit("o2"))); let predicate = InfluxRpcPredicate::new(None, predicate); // Only expect other_temp in this location @@ -363,7 +363,7 @@ async fn test_read_filter_data_filter_measurement_pred() { #[tokio::test] async fn test_read_filter_data_pred_refers_to_non_existent_column() { - let predicate = Predicate::default().add_expr(col("tag_not_in_h20").eq(lit("foo"))); + let predicate = Predicate::default().with_expr(col("tag_not_in_h20").eq(lit("foo"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![] as Vec<&str>; @@ -373,7 +373,7 @@ async fn test_read_filter_data_pred_refers_to_non_existent_column() { #[tokio::test] async fn test_read_filter_data_pred_refers_to_non_existent_column_with_delete() { - let predicate = Predicate::default().add_expr(col("tag_not_in_h20").eq(lit("foo"))); + let predicate = Predicate::default().with_expr(col("tag_not_in_h20").eq(lit("foo"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![] as Vec<&str>; @@ -384,7 +384,7 @@ async fn test_read_filter_data_pred_refers_to_non_existent_column_with_delete() #[tokio::test] async fn test_read_filter_data_pred_no_columns() { // predicate with no columns, - let predicate = Predicate::default().add_expr(lit("foo").eq(lit("foo"))); + let predicate = Predicate::default().with_expr(lit("foo").eq(lit("foo"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -398,7 +398,7 @@ async fn test_read_filter_data_pred_no_columns() { #[tokio::test] async fn test_read_filter_data_pred_no_columns_with_delete() { // predicate with no columns, - let predicate = Predicate::default().add_expr(lit("foo").eq(lit("foo"))); + let predicate = Predicate::default().with_expr(lit("foo").eq(lit("foo"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -412,7 +412,7 @@ async fn test_read_filter_data_pred_no_columns_with_delete() { #[tokio::test] async fn test_read_filter_data_pred_no_columns_with_delete_all() { // predicate with no columns, - let predicate = Predicate::default().add_expr(lit("foo").eq(lit("foo"))); + let predicate = Predicate::default().with_expr(lit("foo").eq(lit("foo"))); let predicate = InfluxRpcPredicate::new(None, predicate); // Only table disk has no deleted data @@ -427,8 +427,8 @@ async fn test_read_filter_data_pred_no_columns_with_delete_all() { async fn test_read_filter_data_pred_refers_to_good_and_non_existent_columns() { // predicate with both a column that does and does not appear let predicate = Predicate::default() - .add_expr(col("state").eq(lit("MA"))) - .add_expr(col("tag_not_in_h20").eq(lit("foo"))); + .with_expr(col("state").eq(lit("MA"))) + .with_expr(col("tag_not_in_h20").eq(lit("foo"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![] as Vec<&str>; @@ -451,9 +451,9 @@ async fn test_read_filter_data_pred_refers_to_good_and_non_existent_columns() { #[tokio::test] async fn test_read_filter_data_pred_using_regex_match() { let predicate = Predicate::default() - .timestamp_range(200, 300) + .with_range(200, 300) // will match CA state - .build_regex_match_expr("state", "C.*"); + .with_regex_match_expr("state", "C.*"); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -466,9 +466,9 @@ async fn test_read_filter_data_pred_using_regex_match() { #[tokio::test] async fn test_read_filter_data_pred_using_regex_match_with_delete() { let predicate = Predicate::default() - .timestamp_range(200, 300) + .with_range(200, 300) // will match CA state - .build_regex_match_expr("state", "C.*"); + .with_regex_match_expr("state", "C.*"); let predicate = InfluxRpcPredicate::new(None, predicate); // the selected row was soft deleted @@ -482,9 +482,9 @@ async fn test_read_filter_data_pred_using_regex_match_with_delete() { // Different predicate to have data returned let predicate = Predicate::default() - .timestamp_range(200, 400) + .with_range(200, 400) // will match CA state - .build_regex_match_expr("state", "C.*"); + .with_regex_match_expr("state", "C.*"); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -510,9 +510,9 @@ async fn test_read_filter_data_pred_using_regex_match_with_delete() { #[tokio::test] async fn test_read_filter_data_pred_using_regex_not_match() { let predicate = Predicate::default() - .timestamp_range(200, 300) + .with_range(200, 300) // will filter out any rows with a state that matches "CA" - .build_regex_not_match_expr("state", "C.*"); + .with_regex_not_match_expr("state", "C.*"); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -528,7 +528,7 @@ async fn test_read_filter_data_pred_using_regex_not_match() { async fn test_read_filter_data_pred_regex_escape() { let predicate = Predicate::default() // Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url =~ /https\:\/\/influxdb\.com/`, - .build_regex_match_expr("url", r#"https\://influxdb\.com"#); + .with_regex_match_expr("url", r#"https\://influxdb\.com"#); let predicate = InfluxRpcPredicate::new(None, predicate); // expect one series with influxdb.com @@ -543,7 +543,7 @@ async fn test_read_filter_data_pred_regex_escape() { async fn test_read_filter_data_pred_not_match_regex_escape() { let predicate = Predicate::default() // Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url !~ /https\:\/\/influxdb\.com/`, - .build_regex_not_match_expr("url", r#"https\://influxdb\.com"#); + .with_regex_not_match_expr("url", r#"https\://influxdb\.com"#); let predicate = InfluxRpcPredicate::new(None, predicate); // expect one series with influxdb.com @@ -563,7 +563,7 @@ async fn test_read_filter_data_pred_unsupported_in_scan() { // (STATE = 'CA') OR (READING > 0) let predicate = - Predicate::default().add_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0)))); + Predicate::default().with_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0)))); let predicate = InfluxRpcPredicate::new(None, predicate); // Note these results include data from both o2 and h2o @@ -585,7 +585,7 @@ async fn test_read_filter_data_pred_unsupported_in_scan_with_delete() { // (STATE = 'CA') OR (READING > 0) let predicate = - Predicate::default().add_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0)))); + Predicate::default().with_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0)))); let predicate = InfluxRpcPredicate::new(None, predicate); // Note these results include data from both o2 and h2o @@ -659,8 +659,8 @@ async fn test_read_filter_filter_on_value() { test_helpers::maybe_start_logging(); let predicate = Predicate::default() - .add_expr(col("_value").eq(lit(1.77))) - .add_expr(col("_field").eq(lit("load4"))); + .with_expr(col("_value").eq(lit(1.77))) + .with_expr(col("_field").eq(lit("load4"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -677,7 +677,7 @@ async fn test_read_filter_on_field() { // Predicate should pick 'temp' field from h2o // (_field = 'temp') let p1 = col("_field").eq(lit("temp")); - let predicate = Predicate::default().add_expr(p1); + let predicate = Predicate::default().with_expr(p1); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -696,7 +696,7 @@ async fn test_read_filter_on_not_field() { // Predicate should pick up all fields other than 'temp' from h2o // (_field != 'temp') let p1 = col("_field").not_eq(lit("temp")); - let predicate = Predicate::default().add_expr(p1); + let predicate = Predicate::default().with_expr(p1); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -718,7 +718,7 @@ async fn test_read_filter_unsupported_predicate() { let p1 = col("_field") .not_eq(lit("temp")) .or(col("_field").eq(lit("other_temp"))); - let predicate = Predicate::default().add_expr(p1); + let predicate = Predicate::default().with_expr(p1); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_error = "Unsupported _field predicate"; @@ -735,7 +735,7 @@ async fn test_read_filter_on_field_single_measurement() { let p1 = col("_field") .eq(lit("temp")) .and(col("_measurement").eq(lit("h2o"))); - let predicate = Predicate::default().add_expr(p1); + let predicate = Predicate::default().with_expr(p1); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -752,7 +752,7 @@ async fn test_read_filter_multi_negation() { let host = make_empty_tag_ref_expr("host"); let p1 = host.clone().eq(lit("server01")).or(host.eq(lit(""))); - let predicate = Predicate::default().add_expr(p1); + let predicate = Predicate::default().with_expr(p1); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_results = vec![ @@ -781,7 +781,7 @@ async fn test_read_filter_on_field_multi_measurement() { let p2 = col("_field") .eq(lit("temp")) .and(col("_measurement").eq(lit("o2"))); - let predicate = Predicate::default().add_expr(p1.or(p2)); + let predicate = Predicate::default().with_expr(p1.or(p2)); let predicate = InfluxRpcPredicate::new(None, predicate); // SHOULD NOT contain temp from h2o diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 836eaf7e14..568de3af2f 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -165,8 +165,8 @@ async fn test_read_group_data_no_tag_columns_min_with_delete_all() { #[tokio::test] async fn test_read_group_data_pred() { let predicate = Predicate::default() - .add_expr(col("city").eq(lit("LA"))) - .timestamp_range(190, 210); + .with_expr(col("city").eq(lit("LA"))) + .with_range(190, 210); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Sum; let group_columns = vec!["state"]; @@ -188,7 +188,7 @@ async fn test_read_group_data_pred() { #[tokio::test] async fn test_read_group_data_field_restriction() { // restrict to only the temp column - let predicate = Predicate::default().field_columns(vec!["temp"]); + let predicate = Predicate::default().with_field_columns(vec!["temp"]); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Sum; let group_columns = vec!["state"]; @@ -213,13 +213,13 @@ async fn test_read_group_data_field_restriction() { async fn test_grouped_series_set_plan_sum() { let predicate = Predicate::default() // city=Boston OR city=Cambridge (filters out LA rows) - .add_expr( + .with_expr( col("city") .eq(lit("Boston")) .or(col("city").eq(lit("Cambridge"))), ) // fiter out first Cambridge row - .timestamp_range(100, 1000); + .with_range(100, 1000); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Sum; @@ -247,13 +247,13 @@ async fn test_grouped_series_set_plan_sum() { async fn test_grouped_series_set_plan_count() { let predicate = Predicate::default() // city=Boston OR city=Cambridge (filters out LA rows) - .add_expr( + .with_expr( col("city") .eq(lit("Boston")) .or(col("city").eq(lit("Cambridge"))), ) // fiter out first Cambridge row - .timestamp_range(100, 1000); + .with_range(100, 1000); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Count; @@ -281,13 +281,13 @@ async fn test_grouped_series_set_plan_count() { async fn test_grouped_series_set_plan_mean() { let predicate = Predicate::default() // city=Boston OR city=Cambridge (filters out LA rows) - .add_expr( + .with_expr( col("city") .eq(lit("Boston")) .or(col("city").eq(lit("Cambridge"))), ) // fiter out first Cambridge row - .timestamp_range(100, 1000); + .with_range(100, 1000); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Mean; @@ -313,7 +313,7 @@ async fn test_grouped_series_set_plan_mean() { async fn test_grouped_series_set_plan_count_measurement_pred() { let predicate = Predicate::default() // city = 'Boston' OR (_measurement = o2) - .add_expr( + .with_expr( col("city") .eq(lit("Boston")) .or(col("_measurement").eq(lit("o2"))), @@ -344,7 +344,7 @@ async fn test_grouped_series_set_plan_count_measurement_pred() { async fn test_grouped_series_set_plan_first() { let predicate = Predicate::default() // fiter out first row (ts 1000) - .timestamp_range(1001, 4001); + .with_range(1001, 4001); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::First; @@ -375,8 +375,8 @@ async fn test_grouped_series_set_plan_first_with_nulls() { // "h2o,state=MA,city=Boston temp=70.4 50", // "h2o,state=MA,city=Boston other_temp=70.4 250", // "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000" - .add_expr(col("state").eq(lit("MA"))) - .add_expr(col("city").eq(lit("Boston"))); + .with_expr(col("state").eq(lit("MA"))) + .with_expr(col("city").eq(lit("Boston"))); let predicate = InfluxRpcPredicate::new_table("h2o", predicate); let agg = Aggregate::First; @@ -404,7 +404,7 @@ async fn test_grouped_series_set_plan_first_with_nulls() { async fn test_grouped_series_set_plan_last() { let predicate = Predicate::default() // fiter out last row (ts 4000) - .timestamp_range(100, 3999); + .with_range(100, 3999); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Last; @@ -435,8 +435,8 @@ async fn test_grouped_series_set_plan_last_with_nulls() { // "h2o,state=MA,city=Boston temp=70.4 50", // "h2o,state=MA,city=Boston other_temp=70.4 250", // "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000" - .add_expr(col("state").eq(lit("MA"))) - .add_expr(col("city").eq(lit("Boston"))); + .with_expr(col("state").eq(lit("MA"))) + .with_expr(col("city").eq(lit("Boston"))); let predicate = InfluxRpcPredicate::new_table("h2o", predicate); let agg = Aggregate::Last; @@ -464,7 +464,7 @@ async fn test_grouped_series_set_plan_last_with_nulls() { async fn test_grouped_series_set_plan_min() { let predicate = Predicate::default() // fiter out last row (ts 4000) - .timestamp_range(100, 3999); + .with_range(100, 3999); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Min; @@ -492,7 +492,7 @@ async fn test_grouped_series_set_plan_min() { async fn test_grouped_series_set_plan_max() { let predicate = Predicate::default() // fiter out first row (ts 1000) - .timestamp_range(1001, 4001); + .with_range(1001, 4001); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Max; @@ -798,7 +798,7 @@ async fn test_grouped_series_set_plan_group_field_pred_and_null_fields() { #[tokio::test] async fn test_grouped_series_set_plan_group_field_pred_filter_on_field() { // no predicate - let predicate = Predicate::default().add_expr(col("_field").eq(lit("reading"))); + let predicate = Predicate::default().with_expr(col("_field").eq(lit("reading"))); let predicate = InfluxRpcPredicate::new_table("o2", predicate); let agg = Aggregate::Count; @@ -831,8 +831,8 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_value() { // no predicate let predicate = Predicate::default() // 2018-05-22T19:53:26Z, stop: 2018-05-24T00:00:00Z - .timestamp_range(1527018806000000000, 1527120000000000000) - .add_expr(col("_value").eq(lit(1.77))); + .with_range(1527018806000000000, 1527120000000000000) + .with_expr(col("_value").eq(lit(1.77))); let predicate = InfluxRpcPredicate::new(None, predicate); @@ -860,8 +860,8 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_multiple_value( // no predicate let predicate = Predicate::default() // 2018-05-22T19:53:26Z, stop: 2018-05-24T00:00:00Z - .timestamp_range(1527018806000000000, 1527120000000000000) - .add_expr(col("_value").eq(lit(1.77)).or(col("_value").eq(lit(1.72)))); + .with_range(1527018806000000000, 1527120000000000000) + .with_expr(col("_value").eq(lit(1.77)).or(col("_value").eq(lit(1.72)))); let predicate = InfluxRpcPredicate::new(None, predicate); @@ -891,8 +891,8 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_value_sum() { // no predicate let predicate = Predicate::default() // 2018-05-22T19:53:26Z, stop: 2018-05-24T00:00:00Z - .timestamp_range(1527018806000000000, 1527120000000000000) - .add_expr(col("_value").eq(lit(1.77))); + .with_range(1527018806000000000, 1527120000000000000) + .with_expr(col("_value").eq(lit(1.77))); let predicate = InfluxRpcPredicate::new(None, predicate); diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index fa34d54bef..7b26fe54ee 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -53,8 +53,8 @@ async fn run_read_window_aggregate_test_case( async fn test_read_window_aggregate_nanoseconds() { let predicate = Predicate::default() // city=Boston or city=LA - .add_expr(col("city").eq(lit("Boston")).or(col("city").eq(lit("LA")))) - .timestamp_range(100, 450); + .with_expr(col("city").eq(lit("Boston")).or(col("city").eq(lit("LA")))) + .with_range(100, 450); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Mean; @@ -82,12 +82,12 @@ async fn test_read_window_aggregate_nanoseconds() { async fn test_read_window_aggregate_nanoseconds_measurement_pred() { let predicate = Predicate::default() // city=Cambridge OR (_measurement != 'other' AND city = LA) - .add_expr( + .with_expr( col("city").eq(lit("Boston")).or(col("_measurement") .not_eq(lit("other")) .and(col("city").eq(lit("LA")))), ) - .timestamp_range(100, 450); + .with_range(100, 450); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Mean; @@ -113,7 +113,7 @@ async fn test_read_window_aggregate_nanoseconds_measurement_pred() { #[tokio::test] async fn test_read_window_aggregate_nanoseconds_measurement_count() { // Expect that the type of `Count` is Integer - let predicate = Predicate::default().timestamp_range(100, 450); + let predicate = Predicate::default().with_range(100, 450); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Count; @@ -142,7 +142,7 @@ async fn test_read_window_aggregate_nanoseconds_measurement_count() { async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() { let predicate = Predicate::default() // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z' - .timestamp_range(1609459201000000001, 1609459201000000031); + .with_range(1609459201000000001, 1609459201000000031); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Min; @@ -174,7 +174,7 @@ async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() { async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697_with_delete() { let predicate = Predicate::default() // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z' - .timestamp_range(1609459201000000001, 1609459201000000031); + .with_range(1609459201000000001, 1609459201000000031); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Min; @@ -216,7 +216,7 @@ async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697_with_delet async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() { let predicate = Predicate::default() // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z' - .timestamp_range(1609459201000000001, 1609459201000000031); + .with_range(1609459201000000001, 1609459201000000031); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Sum; @@ -250,8 +250,8 @@ async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() { async fn test_grouped_series_set_plan_group_aggregate_filter_on_field() { let predicate = Predicate::default() // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z' - .timestamp_range(1609459201000000001, 1609459201000000031) - .add_expr(col("_field").eq(lit("foo"))); + .with_range(1609459201000000001, 1609459201000000031) + .with_expr(col("_field").eq(lit("foo"))); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Sum; @@ -280,7 +280,7 @@ async fn test_grouped_series_set_plan_group_aggregate_filter_on_field() { async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697_with_delete() { let predicate = Predicate::default() // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z' - .timestamp_range(1609459201000000001, 1609459201000000031); + .with_range(1609459201000000001, 1609459201000000031); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Sum; @@ -322,7 +322,7 @@ async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697_with_delet #[tokio::test] async fn test_read_window_aggregate_overflow() { - let predicate = Predicate::default().timestamp_range(1609459201000000001, 1609459201000000024); + let predicate = Predicate::default().with_range(1609459201000000001, 1609459201000000024); let predicate = InfluxRpcPredicate::new(None, predicate); let agg = Aggregate::Max; diff --git a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index 3050dd5f80..9495d13bd8 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -75,9 +75,9 @@ async fn list_table_names_no_non_null_data_passes() { // only a single row with a null field passes this predicate (expect no table names) let predicate = Predicate::default() // only get last row of o2 (timestamp = 300) - .timestamp_range(200, 400) + .with_range(200, 400) // model predicate like _field='reading' which last row does not have - .field_columns(vec!["reading"]); + .with_field_columns(vec!["reading"]); let predicate = InfluxRpcPredicate::new_table("o2", predicate); run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await; @@ -90,11 +90,11 @@ async fn list_table_names_no_non_null_general_data_passes() { // force a generic plan let predicate = Predicate::default() // only get last row of o2 (timestamp = 300) - .timestamp_range(200, 400) + .with_range(200, 400) // model predicate like _field='reading' which last row does not have - .field_columns(vec!["reading"]) + .with_field_columns(vec!["reading"]) // (state = CA) OR (temp > 50) - .add_expr(col("state").eq(lit("CA")).or(col("temp").gt(lit(50)))); + .with_expr(col("state").eq(lit("CA")).or(col("temp").gt(lit(50)))); let predicate = InfluxRpcPredicate::new_table("o2", predicate); run_table_names_test_case(TwoMeasurementsManyFields {}, predicate, vec![]).await; @@ -248,7 +248,7 @@ async fn list_table_names_max_time_greater_one() { // make a single timestamp predicate between r1 and r2 fn tsp(r1: i64, r2: i64) -> InfluxRpcPredicate { - InfluxRpcPredicate::new(None, Predicate::default().timestamp_range(r1, r2)) + InfluxRpcPredicate::new(None, Predicate::default().with_range(r1, r2)) } fn to_stringset(v: &[&str]) -> StringSetRef { diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index 75b08d5c7b..c2ef37f801 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -57,7 +57,7 @@ async fn list_tag_columns_with_no_tags() { ) .await; - let predicate = Predicate::default().timestamp_range(0, 1000); + let predicate = Predicate::default().with_range(0, 1000); let predicate = InfluxRpcPredicate::new(None, predicate); run_tag_keys_test_case(OneMeasurementNoTags {}, predicate, vec![]).await; } @@ -77,7 +77,7 @@ async fn list_tag_columns_no_predicate() { #[tokio::test] async fn list_tag_columns_timestamp() { - let predicate = Predicate::default().timestamp_range(150, 201); + let predicate = Predicate::default().with_range(150, 201); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["city", "state"]; run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await; @@ -85,7 +85,7 @@ async fn list_tag_columns_timestamp() { #[tokio::test] async fn list_tag_columns_predicate() { - let predicate = Predicate::default().add_expr(col("state").eq(lit("MA"))); // state=MA + let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["city", "county", "state"]; run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await; @@ -94,8 +94,8 @@ async fn list_tag_columns_predicate() { #[tokio::test] async fn list_tag_columns_predicate_negative_nonexistent_column() { let predicate = Predicate::default() - .add_expr(col("state").eq(lit("MA"))) // state=MA - .add_expr(col("host").not_eq(lit("server01"))); // nonexistent column with !=; always true + .with_expr(col("state").eq(lit("MA"))) // state=MA + .with_expr(col("host").not_eq(lit("server01"))); // nonexistent column with !=; always true let predicate = InfluxRpcPredicate::new(None, predicate); // This currently returns nothing, which is incorrect, it should return "city", "county", // "state" because a nonexistent column is always not equal to anything. @@ -109,8 +109,8 @@ async fn list_tag_columns_measurement_pred() { // // "o2,state=NY,city=NYC temp=61.0 500", let predicate = Predicate::default() - .timestamp_range(450, 550) - .add_expr(col("_measurement").eq(lit("o2"))); // _measurement=o2 + .with_range(450, 550) + .with_expr(col("_measurement").eq(lit("o2"))); // _measurement=o2 let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["city", "state"]; run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await; @@ -119,8 +119,8 @@ async fn list_tag_columns_measurement_pred() { #[tokio::test] async fn list_tag_columns_timestamp_and_predicate() { let predicate = Predicate::default() - .timestamp_range(150, 201) - .add_expr(col("state").eq(lit("MA"))); // state=MA + .with_range(150, 201) + .with_expr(col("state").eq(lit("MA"))); // state=MA let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["city", "state"]; run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await; @@ -135,7 +135,7 @@ async fn list_tag_columns_measurement_name() { #[tokio::test] async fn list_tag_columns_measurement_name_and_timestamp() { - let predicate = Predicate::default().timestamp_range(150, 201); + let predicate = Predicate::default().with_range(150, 201); let predicate = InfluxRpcPredicate::new_table("o2", predicate); let expected_tag_keys = vec!["city", "state"]; run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await; @@ -143,7 +143,7 @@ async fn list_tag_columns_measurement_name_and_timestamp() { #[tokio::test] async fn list_tag_columns_measurement_name_and_predicate() { - let predicate = Predicate::default().add_expr(col("state").eq(lit("NY"))); // state=NY + let predicate = Predicate::default().with_expr(col("state").eq(lit("NY"))); // state=NY let predicate = InfluxRpcPredicate::new_table("o2", predicate); let expected_tag_keys = vec!["borough", "city", "state"]; run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await; @@ -152,8 +152,8 @@ async fn list_tag_columns_measurement_name_and_predicate() { #[tokio::test] async fn list_tag_columns_measurement_name_and_predicate_and_timestamp() { let predicate = Predicate::default() - .timestamp_range(1, 550) - .add_expr(col("state").eq(lit("NY"))); // state=NY + .with_range(1, 550) + .with_expr(col("state").eq(lit("NY"))); // state=NY let predicate = InfluxRpcPredicate::new_table("o2", predicate); let expected_tag_keys = vec!["city", "state"]; run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await; @@ -162,8 +162,8 @@ async fn list_tag_columns_measurement_name_and_predicate_and_timestamp() { #[tokio::test] async fn list_tag_name_end_to_end() { let predicate = Predicate::default() - .timestamp_range(0, 10000) - .add_expr(col("host").eq(lit("server01"))); + .with_range(0, 10000) + .with_expr(col("host").eq(lit("server01"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["host", "name", "region"]; run_tag_keys_test_case(EndToEndTest {}, predicate, expected_tag_keys).await; @@ -172,8 +172,8 @@ async fn list_tag_name_end_to_end() { #[tokio::test] async fn list_tag_name_end_to_end_with_delete() { let predicate = Predicate::default() - .timestamp_range(0, 10000) - .add_expr(col("host").eq(lit("server01"))); + .with_range(0, 10000) + .with_expr(col("host").eq(lit("server01"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["host", "region"]; run_tag_keys_test_case(EndToEndTestWithDelete {}, predicate, expected_tag_keys).await; @@ -182,7 +182,7 @@ async fn list_tag_name_end_to_end_with_delete() { #[tokio::test] async fn list_tag_name_max_time() { test_helpers::maybe_start_logging(); - let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME); + let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["host"]; run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await; @@ -193,7 +193,7 @@ async fn list_tag_name_max_i64() { test_helpers::maybe_start_logging(); let predicate = Predicate::default() // outside valid timestamp range - .timestamp_range(i64::MIN, i64::MAX); + .with_range(i64::MIN, i64::MAX); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["host"]; run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await; @@ -202,7 +202,7 @@ async fn list_tag_name_max_i64() { #[tokio::test] async fn list_tag_name_max_time_less_one() { test_helpers::maybe_start_logging(); - let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME - 1); // one less than max timestamp + let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME - 1); // one less than max timestamp let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec![]; run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await; @@ -211,7 +211,7 @@ async fn list_tag_name_max_time_less_one() { #[tokio::test] async fn list_tag_name_max_time_greater_one() { test_helpers::maybe_start_logging(); - let predicate = Predicate::default().timestamp_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); // one more than min timestamp + let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); // one more than min timestamp let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec![]; run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await; diff --git a/query_tests/src/influxrpc/tag_values.rs b/query_tests/src/influxrpc/tag_values.rs index 022e9c1905..efb5cd34c4 100644 --- a/query_tests/src/influxrpc/tag_values.rs +++ b/query_tests/src/influxrpc/tag_values.rs @@ -122,7 +122,7 @@ async fn list_tag_values_no_predicate_city_col() { #[tokio::test] async fn list_tag_values_timestamp_pred_state_col() { let tag_name = "state"; - let predicate = Predicate::default().timestamp_range(50, 201); + let predicate = Predicate::default().with_range(50, 201); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["CA", "MA"]; run_tag_values_test_case( @@ -137,7 +137,7 @@ async fn list_tag_values_timestamp_pred_state_col() { #[tokio::test] async fn list_tag_values_state_pred_state_col() { let tag_name = "city"; - let predicate = Predicate::default().add_expr(col("state").eq(lit("MA"))); // state=MA + let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["Boston"]; run_tag_values_test_case( @@ -153,8 +153,8 @@ async fn list_tag_values_state_pred_state_col() { async fn list_tag_values_timestamp_and_state_pred_state_col() { let tag_name = "state"; let predicate = Predicate::default() - .timestamp_range(150, 301) - .add_expr(col("state").eq(lit("MA"))); // state=MA + .with_range(150, 301) + .with_expr(col("state").eq(lit("MA"))); // state=MA let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["MA"]; run_tag_values_test_case( @@ -197,7 +197,7 @@ async fn list_tag_values_table_pred_city_col() { #[tokio::test] async fn list_tag_values_table_and_timestamp_and_table_pred_state_col() { let tag_name = "state"; - let predicate = Predicate::default().timestamp_range(50, 201); + let predicate = Predicate::default().with_range(50, 201); let predicate = InfluxRpcPredicate::new_table("o2", predicate); let expected_tag_keys = vec!["MA"]; run_tag_values_test_case( @@ -212,7 +212,7 @@ async fn list_tag_values_table_and_timestamp_and_table_pred_state_col() { #[tokio::test] async fn list_tag_values_table_and_state_pred_state_col() { let tag_name = "state"; - let predicate = Predicate::default().add_expr(col("state").eq(lit("NY"))); // state=NY + let predicate = Predicate::default().with_expr(col("state").eq(lit("NY"))); // state=NY let predicate = InfluxRpcPredicate::new_table("o2", predicate); let expected_tag_keys = vec!["NY"]; run_tag_values_test_case( @@ -228,8 +228,8 @@ async fn list_tag_values_table_and_state_pred_state_col() { async fn list_tag_values_table_and_timestamp_and_state_pred_state_col() { let tag_name = "state"; let predicate = Predicate::default() - .timestamp_range(1, 550) - .add_expr(col("state").eq(lit("NY"))); // state=NY + .with_range(1, 550) + .with_expr(col("state").eq(lit("NY"))); // state=NY let predicate = InfluxRpcPredicate::new_table("o2", predicate); let expected_tag_keys = vec!["NY"]; run_tag_values_test_case( @@ -245,8 +245,8 @@ async fn list_tag_values_table_and_timestamp_and_state_pred_state_col() { async fn list_tag_values_table_and_timestamp_and_state_pred_state_col_no_rows() { let tag_name = "state"; let predicate = Predicate::default() - .timestamp_range(1, 300) // filters out the NY row - .add_expr(col("state").eq(lit("NY"))); // state=NY + .with_range(1, 300) // filters out the NY row + .with_expr(col("state").eq(lit("NY"))); // state=NY let predicate = InfluxRpcPredicate::new_table("o2", predicate); let expected_tag_keys = vec![]; @@ -263,8 +263,8 @@ async fn list_tag_values_table_and_timestamp_and_state_pred_state_col_no_rows() async fn list_tag_values_measurement_pred() { let tag_name = "state"; let predicate = Predicate::default() - .timestamp_range(1, 600) // filters out the NY row - .add_expr(col("_measurement").not_eq(lit("o2"))); + .with_range(1, 600) // filters out the NY row + .with_expr(col("_measurement").not_eq(lit("o2"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["CA", "MA"]; @@ -281,11 +281,11 @@ async fn list_tag_values_measurement_pred() { async fn list_tag_values_measurement_pred_and_or() { let tag_name = "city"; let predicate = Predicate::default() - .timestamp_range(1, 600) // filters out the NY row + .with_range(1, 600) // filters out the NY row // since there is an 'OR' in this predicate, can't answer // with metadata alone // _measurement = 'o2' OR temp > 70.0 - .add_expr( + .with_expr( col("_measurement") .eq(lit("o2")) .or(col("temp").gt(lit(70.0))), @@ -334,9 +334,9 @@ async fn list_tag_values_field_col_on_tag() { async fn list_tag_values_field_col_does_not_exist() { let tag_name = "state"; let predicate = Predicate::default() - .timestamp_range(0, 1000) // get all rows + .with_range(0, 1000) // get all rows // since this field doesn't exist this predicate should match no values - .add_expr(col("_field").eq(lit("not_a_column"))); + .with_expr(col("_field").eq(lit("not_a_column"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec![]; @@ -353,9 +353,9 @@ async fn list_tag_values_field_col_does_not_exist() { async fn list_tag_values_field_col_does_exist() { let tag_name = "state"; let predicate = Predicate::default() - .timestamp_range(0, 1000) // get all rows + .with_range(0, 1000) // get all rows // this field does exist (but only for rows with CA and MA, not NY) - .add_expr(col("_field").eq(lit("county"))); + .with_expr(col("_field").eq(lit("county"))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_tag_keys = vec!["MA", "CA"]; diff --git a/service_grpc_influxrpc/src/expr.rs b/service_grpc_influxrpc/src/expr.rs index bb49157706..0c53682b0a 100644 --- a/service_grpc_influxrpc/src/expr.rs +++ b/service_grpc_influxrpc/src/expr.rs @@ -166,7 +166,7 @@ impl InfluxRpcPredicateBuilder { /// Sets the timestamp range pub fn set_range(mut self, range: Option) -> Self { if let Some(range) = range { - self.inner = self.inner.timestamp_range(range.start, range.end) + self.inner = self.inner.with_range(range.start, range.end) } self } @@ -310,7 +310,7 @@ fn convert_simple_node( // add the table names as a predicate return Ok(builder.tables(value_list)); } else if tag_name.is_field() { - builder.inner = builder.inner.field_columns(value_list); + builder.inner = builder.inner.with_field_columns(value_list); return Ok(builder); } } @@ -318,7 +318,7 @@ fn convert_simple_node( // If no special case applies, fall back to generic conversion let expr = convert_node_to_expr(node)?; - builder.inner = builder.inner.add_expr(expr); + builder.inner = builder.inner.with_expr(expr); Ok(builder) } diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index 71b1e73487..52f13514a9 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -1610,7 +1610,7 @@ mod tests { // also ensure the plumbing is hooked correctly and that the predicate made it // down to the chunk - let expected_predicate = Predicate::default().timestamp_range(150, 200); + let expected_predicate = Predicate::default().with_range(150, 200); fixture .expect_predicates( @@ -1687,8 +1687,8 @@ mod tests { // also ensure the plumbing is hooked correctly and that the predicate made it // down to the chunk let expected_predicate = Predicate::default() - .timestamp_range(150, 200) - .add_expr(make_state_ma_expr()); + .with_range(150, 200) + .with_expr(make_state_ma_expr()); fixture .expect_predicates( @@ -1790,8 +1790,8 @@ mod tests { // also ensure the plumbing is hooked correctly and that the predicate made it // down to the chunk let expected_predicate = Predicate::default() - .timestamp_range(150, 200) - .add_expr(make_state_ma_expr()); + .with_range(150, 200) + .with_expr(make_state_ma_expr()); fixture .expect_predicates( @@ -2466,11 +2466,11 @@ mod tests { // also ensure the plumbing is hooked correctly and that the predicate made it // down to the chunk and it was normalized to namevalue let expected_predicate = Predicate::default() - .timestamp_range(0, 10000) + .with_range(0, 10000) // should NOT have CASE nonsense for handling empty strings as // that should bave been optimized by the time it gets to // the chunk - .add_expr(col("state").eq(lit("MA"))); + .with_expr(col("state").eq(lit("MA"))); fixture .expect_predicates( @@ -2529,11 +2529,11 @@ mod tests { // also ensure the plumbing is hooked correctly and that the predicate made it // down to the chunk and it was normalized to namevalue let expected_predicate = Predicate::default() - .timestamp_range(0, 10000) + .with_range(0, 10000) // comparison to empty string conversion results in a messier translation // to handle backwards compatibility semantics // #state IS NULL OR #state = Utf8("") - .add_expr(col("state").is_null().or(col("state").eq(lit("")))); + .with_expr(col("state").is_null().or(col("state").eq(lit("")))); fixture .expect_predicates(