fix: don't panic for unsupported features (#7472)
This happens rarely but I've spotted this in prod, so let's fix this because panics are bad and increase monitoring noise.pull/24376/head
parent
c03a5c7c14
commit
612f2451ee
|
@ -115,6 +115,12 @@ pub enum Error {
|
|||
FieldColumnsNotSupported {
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Multiple table predicate specification not yet supported: {:?}",
|
||||
tables
|
||||
))]
|
||||
MultipleTablePredicateNotSupported { tables: Vec<String> },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -212,17 +218,17 @@ impl InfluxRpcPredicateBuilder {
|
|||
}
|
||||
|
||||
/// Adds an optional table name restriction to the existing list
|
||||
pub fn table_option(self, table: Option<String>) -> Self {
|
||||
pub fn table_option(self, table: Option<String>) -> Result<Self> {
|
||||
if let Some(table) = table {
|
||||
self.tables(vec![table])
|
||||
} else {
|
||||
self
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets table name restrictions from something that can iterate
|
||||
/// over items that can be converted into `Strings`
|
||||
pub fn tables<I, S>(mut self, tables: I) -> Self
|
||||
pub fn tables<I, S>(mut self, tables: I) -> Result<Self>
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: Into<String>,
|
||||
|
@ -230,15 +236,17 @@ impl InfluxRpcPredicateBuilder {
|
|||
// We need to distinguish predicates like `table_name In
|
||||
// (foo, bar)` and `table_name = foo and table_name = bar` in order to handle
|
||||
// this
|
||||
assert!(
|
||||
self.table_names.is_none(),
|
||||
"Multiple table predicate specification not yet supported"
|
||||
);
|
||||
if let Some(tables) = &self.table_names {
|
||||
return MultipleTablePredicateNotSupportedSnafu {
|
||||
tables: tables.iter().cloned().collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let table_names: BTreeSet<String> = tables.into_iter().map(|s| s.into()).collect();
|
||||
|
||||
self.table_names = Some(table_names);
|
||||
self
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn build(self) -> InfluxRpcPredicate {
|
||||
|
@ -311,7 +319,7 @@ fn convert_simple_node(
|
|||
match DecodedTagKey::try_from(tag_name) {
|
||||
Ok(DecodedTagKey::Measurement) => {
|
||||
// add the table names as a predicate
|
||||
return Ok(builder.tables(value_list));
|
||||
return builder.tables(value_list);
|
||||
}
|
||||
Ok(DecodedTagKey::Field) => {
|
||||
builder.inner = builder
|
||||
|
|
|
@ -118,6 +118,12 @@ pub enum Error {
|
|||
source: DataFusionError,
|
||||
},
|
||||
|
||||
#[snafu(display("Error setting predicate table '{:?}': {}", table, source))]
|
||||
SettingPredicateTable {
|
||||
table: Option<String>,
|
||||
source: super::expr::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Error converting Predicate '{}: {}", rpc_predicate_string, source))]
|
||||
ConvertingPredicate {
|
||||
rpc_predicate_string: String,
|
||||
|
@ -214,7 +220,12 @@ impl Error {
|
|||
| Self::ConvertingReadGroupType { source, .. }
|
||||
| Self::ConvertingReadGroupAggregate { source, .. }
|
||||
| Self::ConvertingWindowAggregate { source, .. }
|
||||
if matches!(source, super::expr::Error::FieldColumnsNotSupported { .. }) =>
|
||||
| Self::SettingPredicateTable { source, .. }
|
||||
if matches!(
|
||||
source,
|
||||
super::expr::Error::FieldColumnsNotSupported { .. }
|
||||
| super::expr::Error::MultipleTablePredicateNotSupported { .. }
|
||||
) =>
|
||||
{
|
||||
tonic::Code::Unimplemented
|
||||
}
|
||||
|
@ -225,6 +236,7 @@ impl Error {
|
|||
| Self::ConvertingTagKeyInTagValues { .. }
|
||||
| Self::ComputingGroupedSeriesSet { .. }
|
||||
| Self::ConvertingFieldList { .. }
|
||||
| Self::SettingPredicateTable { .. }
|
||||
| Self::MeasurementLiteralOrRegex { .. }
|
||||
| Self::MissingTagKeyPredicate {}
|
||||
| Self::InvalidTagKeyRegex { .. } => tonic::Code::InvalidArgument,
|
||||
|
@ -1171,7 +1183,8 @@ where
|
|||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.set_range(range)
|
||||
.table_option(measurement)
|
||||
.table_option(measurement.clone())
|
||||
.context(SettingPredicateTableSnafu { table: measurement })?
|
||||
.rpc_predicate(rpc_predicate)
|
||||
.context(ConvertingPredicateSnafu {
|
||||
rpc_predicate_string,
|
||||
|
@ -1213,7 +1226,8 @@ where
|
|||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.set_range(range)
|
||||
.table_option(measurement)
|
||||
.table_option(measurement.clone())
|
||||
.context(SettingPredicateTableSnafu { table: measurement })?
|
||||
.rpc_predicate(rpc_predicate)
|
||||
.context(ConvertingPredicateSnafu {
|
||||
rpc_predicate_string,
|
||||
|
@ -1460,7 +1474,8 @@ where
|
|||
|
||||
let predicate = InfluxRpcPredicateBuilder::default()
|
||||
.set_range(range)
|
||||
.table_option(measurement)
|
||||
.table_option(measurement.clone())
|
||||
.context(SettingPredicateTableSnafu { table: measurement })?
|
||||
.rpc_predicate(rpc_predicate)
|
||||
.context(ConvertingPredicateSnafu {
|
||||
rpc_predicate_string,
|
||||
|
|
Loading…
Reference in New Issue