diff --git a/service_grpc_influxrpc/src/expr.rs b/service_grpc_influxrpc/src/expr.rs index 39faff93d6..915c34310f 100644 --- a/service_grpc_influxrpc/src/expr.rs +++ b/service_grpc_influxrpc/src/expr.rs @@ -115,6 +115,12 @@ pub enum Error { FieldColumnsNotSupported { source: Box, }, + + #[snafu(display( + "Multiple table predicate specification not yet supported: {:?}", + tables + ))] + MultipleTablePredicateNotSupported { tables: Vec }, } pub type Result = std::result::Result; @@ -212,17 +218,17 @@ impl InfluxRpcPredicateBuilder { } /// Adds an optional table name restriction to the existing list - pub fn table_option(self, table: Option) -> Self { + pub fn table_option(self, table: Option) -> Result { if let Some(table) = table { self.tables(vec![table]) } else { - self + Ok(self) } } /// Sets table name restrictions from something that can iterate /// over items that can be converted into `Strings` - pub fn tables(mut self, tables: I) -> Self + pub fn tables(mut self, tables: I) -> Result where I: IntoIterator, S: Into, @@ -230,15 +236,17 @@ impl InfluxRpcPredicateBuilder { // We need to distinguish predicates like `table_name In // (foo, bar)` and `table_name = foo and table_name = bar` in order to handle // this - assert!( - self.table_names.is_none(), - "Multiple table predicate specification not yet supported" - ); + if let Some(tables) = &self.table_names { + return MultipleTablePredicateNotSupportedSnafu { + tables: tables.iter().cloned().collect::>(), + } + .fail(); + } let table_names: BTreeSet = tables.into_iter().map(|s| s.into()).collect(); self.table_names = Some(table_names); - self + Ok(self) } pub fn build(self) -> InfluxRpcPredicate { @@ -311,7 +319,7 @@ fn convert_simple_node( match DecodedTagKey::try_from(tag_name) { Ok(DecodedTagKey::Measurement) => { // add the table names as a predicate - return Ok(builder.tables(value_list)); + return builder.tables(value_list); } Ok(DecodedTagKey::Field) => { builder.inner = builder diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index 240417e2fb..4a490d8f36 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -118,6 +118,12 @@ pub enum Error { source: DataFusionError, }, + #[snafu(display("Error setting predicate table '{:?}': {}", table, source))] + SettingPredicateTable { + table: Option, + source: super::expr::Error, + }, + #[snafu(display("Error converting Predicate '{}: {}", rpc_predicate_string, source))] ConvertingPredicate { rpc_predicate_string: String, @@ -214,7 +220,12 @@ impl Error { | Self::ConvertingReadGroupType { source, .. } | Self::ConvertingReadGroupAggregate { source, .. } | Self::ConvertingWindowAggregate { source, .. } - if matches!(source, super::expr::Error::FieldColumnsNotSupported { .. }) => + | Self::SettingPredicateTable { source, .. } + if matches!( + source, + super::expr::Error::FieldColumnsNotSupported { .. } + | super::expr::Error::MultipleTablePredicateNotSupported { .. } + ) => { tonic::Code::Unimplemented } @@ -225,6 +236,7 @@ impl Error { | Self::ConvertingTagKeyInTagValues { .. } | Self::ComputingGroupedSeriesSet { .. } | Self::ConvertingFieldList { .. } + | Self::SettingPredicateTable { .. } | Self::MeasurementLiteralOrRegex { .. } | Self::MissingTagKeyPredicate {} | Self::InvalidTagKeyRegex { .. } => tonic::Code::InvalidArgument, @@ -1171,7 +1183,8 @@ where let predicate = InfluxRpcPredicateBuilder::default() .set_range(range) - .table_option(measurement) + .table_option(measurement.clone()) + .context(SettingPredicateTableSnafu { table: measurement })? .rpc_predicate(rpc_predicate) .context(ConvertingPredicateSnafu { rpc_predicate_string, @@ -1213,7 +1226,8 @@ where let predicate = InfluxRpcPredicateBuilder::default() .set_range(range) - .table_option(measurement) + .table_option(measurement.clone()) + .context(SettingPredicateTableSnafu { table: measurement })? .rpc_predicate(rpc_predicate) .context(ConvertingPredicateSnafu { rpc_predicate_string, @@ -1460,7 +1474,8 @@ where let predicate = InfluxRpcPredicateBuilder::default() .set_range(range) - .table_option(measurement) + .table_option(measurement.clone()) + .context(SettingPredicateTableSnafu { table: measurement })? .rpc_predicate(rpc_predicate) .context(ConvertingPredicateSnafu { rpc_predicate_string,