fix: Fix gRPC panic` when multiple field selections are provided (#523)
* fix: do not assert when multiple fields are selected * fix: clippy * fix: write unit test, fix bug * fix: tweak commentspull/24376/head
parent
8c0e14e039
commit
4ec75a4f22
|
@ -36,20 +36,20 @@ impl TimestampRange {
|
|||
}
|
||||
|
||||
/// Represents a parsed predicate for evaluation by the
|
||||
/// InfluxDB IOx storage system.
|
||||
/// TSDatabase InfluxDB IOx query engine.
|
||||
///
|
||||
/// Note that the input data model (e.g. ParsedLine's) distinguishes
|
||||
/// between some types of columns (tags and fields), and likewise
|
||||
/// this structure has some types of restrictions that only apply to
|
||||
/// certain types of columns.
|
||||
/// Note that the data model of TSDatabase (e.g. ParsedLine's)
|
||||
/// distinguishes between some types of columns (tags and fields), and
|
||||
/// likewise the semantics of this structure has some types of
|
||||
/// restrictions that only apply to certain types of columns.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct Predicate {
|
||||
/// Optional filter. If present, restrict the results to only
|
||||
/// those tables whose names are in `table_names`
|
||||
/// Optional table restriction. If present, restricts the results
|
||||
/// to only tables whose names are in `table_names`
|
||||
pub table_names: Option<BTreeSet<String>>,
|
||||
|
||||
// Optional field column selection. If present, further restrict any
|
||||
// field columns returned to only those named
|
||||
// Optional field restriction. If present, restricts the results to only
|
||||
// tables which have *at least one* of the fields in field_columns.
|
||||
pub field_columns: Option<BTreeSet<String>>,
|
||||
|
||||
/// Optional arbitrary predicates, represented as list of
|
||||
|
@ -86,12 +86,24 @@ impl From<Predicate> for PredicateBuilder {
|
|||
impl PredicateBuilder {
|
||||
/// Sets the timestamp range
|
||||
pub fn timestamp_range(mut self, start: i64, end: i64) -> Self {
|
||||
// Without more thought, redefining the timestamp range would
|
||||
// lose the old range. Asser that that cannot happen.
|
||||
assert!(
|
||||
self.inner.range.is_none(),
|
||||
"Unexpected re-definition of timestamp range"
|
||||
);
|
||||
self.inner.range = Some(TimestampRange { start, end });
|
||||
self
|
||||
}
|
||||
|
||||
/// sets the optional timestamp range, if any
|
||||
pub fn timestamp_range_option(mut self, range: Option<TimestampRange>) -> Self {
|
||||
// Without more thought, redefining the timestamp range would
|
||||
// lose the old range. Asser that that cannot happen.
|
||||
assert!(
|
||||
range.is_none() || self.inner.range.is_none(),
|
||||
"Unexpected re-definition of timestamp range"
|
||||
);
|
||||
self.inner.range = range;
|
||||
self
|
||||
}
|
||||
|
@ -134,10 +146,9 @@ impl PredicateBuilder {
|
|||
pub fn field_columns(mut self, columns: Vec<String>) -> Self {
|
||||
// We need to distinguish predicates like `column_name In
|
||||
// (foo, bar)` and `column_name = foo and column_name = bar` in order to handle this
|
||||
assert!(
|
||||
self.inner.field_columns.is_none(),
|
||||
"Multiple table predicate specification not yet supported"
|
||||
);
|
||||
if self.inner.field_columns.is_some() {
|
||||
unimplemented!("Complex/Multi field predicates are not yet supported");
|
||||
}
|
||||
|
||||
let column_names = columns.into_iter().collect::<BTreeSet<_>>();
|
||||
self.inner.field_columns = Some(column_names);
|
||||
|
|
|
@ -885,7 +885,7 @@ where
|
|||
|
||||
// Map the resulting collection of Strings into a Vec<Vec<u8>>for return
|
||||
let values = tag_keys_to_byte_vecs(tag_keys);
|
||||
// Debugging help: comment this out to see what is coming back
|
||||
// Debugging help: uncomment this out to see what is coming back
|
||||
// info!("Returning tag keys");
|
||||
// values.iter().for_each(|k| info!(" {}", String::from_utf8_lossy(k)));
|
||||
|
||||
|
|
|
@ -232,6 +232,11 @@ impl Column {
|
|||
}
|
||||
}
|
||||
|
||||
/// Return true of this column's type is a Tag
|
||||
pub fn is_tag(&self) -> bool {
|
||||
matches!(self, Self::Tag(..))
|
||||
}
|
||||
|
||||
/// Returns true if there exists at least one row idx where this
|
||||
/// self[i] is within the range [min_value, max_value). Inclusive
|
||||
/// of `start`, exclusive of `end` and where col[i] is non null
|
||||
|
|
|
@ -406,7 +406,7 @@ impl TSDatabase for Db {
|
|||
let partition_predicate = partition.compile_predicate(&predicate)?;
|
||||
// this doesn't seem to make any sense
|
||||
assert!(
|
||||
partition_predicate.field_restriction.is_none(),
|
||||
partition_predicate.field_name_predicate.is_none(),
|
||||
"Column selection for table names not supported"
|
||||
);
|
||||
|
||||
|
|
|
@ -116,9 +116,11 @@ pub struct PartitionPredicate {
|
|||
/// partition (so no table can pass)
|
||||
pub table_name_predicate: Option<BTreeSet<u32>>,
|
||||
|
||||
// Optional field column selection. If present, further restrict
|
||||
// any field columns returnedto only those named
|
||||
pub field_restriction: Option<BTreeSet<u32>>,
|
||||
/// Optional column restriction. If present, further
|
||||
/// restrict any field columns returned to only those named, and
|
||||
/// skip tables entirely when querying metadata that do not have
|
||||
/// *any* of the fields
|
||||
pub field_name_predicate: Option<BTreeSet<u32>>,
|
||||
|
||||
/// General DataFusion expressions (arbitrary predicates) applied
|
||||
/// as a filter using logical conjuction (aka are 'AND'ed
|
||||
|
@ -152,18 +154,10 @@ impl PartitionPredicate {
|
|||
builder.build()
|
||||
}
|
||||
|
||||
/// Return true if there is a non empty field restriction
|
||||
pub fn has_field_restriction(&self) -> bool {
|
||||
match &self.field_restriction {
|
||||
None => false,
|
||||
Some(field_restiction) => !field_restiction.is_empty(),
|
||||
}
|
||||
}
|
||||
|
||||
/// For plans which select a subset of fields, returns true if
|
||||
/// the field should be included in the results
|
||||
pub fn should_include_field(&self, field_id: u32) -> bool {
|
||||
match &self.field_restriction {
|
||||
match &self.field_name_predicate {
|
||||
None => true,
|
||||
Some(field_restriction) => field_restriction.contains(&field_id),
|
||||
}
|
||||
|
@ -265,7 +259,7 @@ impl Partition {
|
|||
|
||||
Ok(PartitionPredicate {
|
||||
table_name_predicate,
|
||||
field_restriction,
|
||||
field_name_predicate: field_restriction,
|
||||
partition_exprs,
|
||||
required_columns,
|
||||
time_column_id,
|
||||
|
|
|
@ -345,9 +345,6 @@ impl Table {
|
|||
projected_schema,
|
||||
});
|
||||
|
||||
// Shouldn't have field selections here (as we are getting the tags...)
|
||||
assert!(!partition_predicate.has_field_restriction());
|
||||
|
||||
let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?;
|
||||
|
||||
// add optional selection to remove time column
|
||||
|
@ -415,9 +412,6 @@ impl Table {
|
|||
projected_schema,
|
||||
});
|
||||
|
||||
// shouldn't have columns selection (as this is getting tag values...)
|
||||
assert!(!partition_predicate.has_field_restriction());
|
||||
|
||||
let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?;
|
||||
|
||||
plan_builder
|
||||
|
@ -948,7 +942,7 @@ impl Table {
|
|||
/// false means that no rows in this table could possibly match
|
||||
pub fn could_match_predicate(&self, partition_predicate: &PartitionPredicate) -> Result<bool> {
|
||||
Ok(
|
||||
self.matches_column_selection(partition_predicate.field_restriction.as_ref())
|
||||
self.matches_column_name_predicate(partition_predicate.field_name_predicate.as_ref())
|
||||
&& self.matches_table_name_predicate(
|
||||
partition_predicate.table_name_predicate.as_ref(),
|
||||
)
|
||||
|
@ -957,15 +951,16 @@ impl Table {
|
|||
)
|
||||
}
|
||||
|
||||
/// Returns true if the table contains at least one of the fields
|
||||
/// Returns true if the table contains any of the field columns
|
||||
/// requested or there are no specific fields requested.
|
||||
fn matches_column_selection(&self, column_selection: Option<&BTreeSet<u32>>) -> bool {
|
||||
fn matches_column_name_predicate(&self, column_selection: Option<&BTreeSet<u32>>) -> bool {
|
||||
match column_selection {
|
||||
Some(column_selection) => {
|
||||
// figure out if any of the columns exists
|
||||
self.column_id_to_index
|
||||
.keys()
|
||||
.any(|column_id| column_selection.contains(column_id))
|
||||
.iter()
|
||||
.any(|(column_id, &column_index)| {
|
||||
column_selection.contains(column_id) && !self.columns[column_index].is_tag()
|
||||
})
|
||||
}
|
||||
None => true, // no specific selection
|
||||
}
|
||||
|
@ -1216,6 +1211,52 @@ mod tests {
|
|||
assert!(!table.matches_table_name_predicate(Some(&set)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_matches_column_name_predicate() {
|
||||
let mut partition = Partition::new("dummy_partition_key");
|
||||
let dictionary = &mut partition.dictionary;
|
||||
let mut table = Table::new(dictionary.lookup_value_or_insert("h2o"));
|
||||
|
||||
let lp_lines = vec![
|
||||
"h2o,state=MA,city=Boston temp=70.4,awesomeness=1000 100",
|
||||
"h2o,state=MA,city=Boston temp=72.4,awesomeness=2000 250",
|
||||
];
|
||||
write_lines_to_table(&mut table, dictionary, lp_lines);
|
||||
|
||||
let state_symbol = dictionary.id("state").unwrap();
|
||||
let temp_symbol = dictionary.id("temp").unwrap();
|
||||
let awesomeness_symbol = dictionary.id("awesomeness").unwrap();
|
||||
|
||||
assert!(table.matches_column_name_predicate(None));
|
||||
|
||||
let set = BTreeSet::new();
|
||||
assert!(!table.matches_column_name_predicate(Some(&set)));
|
||||
|
||||
// tag columns should not count
|
||||
let mut set = BTreeSet::new();
|
||||
set.insert(state_symbol);
|
||||
assert!(!table.matches_column_name_predicate(Some(&set)));
|
||||
|
||||
let mut set = BTreeSet::new();
|
||||
set.insert(temp_symbol);
|
||||
assert!(table.matches_column_name_predicate(Some(&set)));
|
||||
|
||||
let mut set = BTreeSet::new();
|
||||
set.insert(temp_symbol);
|
||||
set.insert(awesomeness_symbol);
|
||||
assert!(table.matches_column_name_predicate(Some(&set)));
|
||||
|
||||
let mut set = BTreeSet::new();
|
||||
set.insert(temp_symbol);
|
||||
set.insert(awesomeness_symbol);
|
||||
set.insert(1337); // some other symbol, but that is ok
|
||||
assert!(table.matches_column_name_predicate(Some(&set)));
|
||||
|
||||
let mut set = BTreeSet::new();
|
||||
set.insert(1337);
|
||||
assert!(!table.matches_column_name_predicate(Some(&set)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_series_set_plan() {
|
||||
let mut partition = Partition::new("dummy_partition_key");
|
||||
|
|
Loading…
Reference in New Issue