fix: apply `_field` restrictions that do not match any fields (#3903)
* fix: apply field restriction correctly * refactor: Use Vec::retainpull/24376/head
parent
b9ffc5bcbe
commit
07fdbe7c6b
|
@ -211,7 +211,7 @@ impl ChunkAccess {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
// Get chunks and schema as a single transaction
|
// Get chunks and schema as a single transaction
|
||||||
let (chunks, schema) = {
|
let (mut chunks, schema) = {
|
||||||
let table = match self.catalog.table(table_name).ok() {
|
let table = match self.catalog.table(table_name).ok() {
|
||||||
Some(table) => table,
|
Some(table) => table,
|
||||||
None => return vec![],
|
None => return vec![],
|
||||||
|
@ -232,6 +232,19 @@ impl ChunkAccess {
|
||||||
.catalog_snapshot_duration
|
.catalog_snapshot_duration
|
||||||
.inc(start.elapsed());
|
.inc(start.elapsed());
|
||||||
|
|
||||||
|
// if there is a field restriction on the predicate, only
|
||||||
|
// chunks with that field should be returned. If the chunk has
|
||||||
|
// none of the fields specified, then it doesn't match
|
||||||
|
if let Some(field_columns) = &predicate.field_columns {
|
||||||
|
chunks.retain(|chunk| {
|
||||||
|
let schema = chunk.schema();
|
||||||
|
// keep chunk if it has any of the columns requested
|
||||||
|
field_columns
|
||||||
|
.iter()
|
||||||
|
.any(|col| schema.find_index_of(col).is_some())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
self.prune_chunks(table_name, schema, chunks, predicate)
|
self.prune_chunks(table_name, schema, chunks, predicate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -311,7 +311,7 @@ async fn list_tag_values_measurement_pred_and_or() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn list_tag_values_field_col() {
|
async fn list_tag_values_field_col_on_tag() {
|
||||||
let db_setup = TwoMeasurementsManyNulls {};
|
let db_setup = TwoMeasurementsManyNulls {};
|
||||||
|
|
||||||
for scenario in db_setup.make().await {
|
for scenario in db_setup.make().await {
|
||||||
|
@ -332,6 +332,46 @@ async fn list_tag_values_field_col() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn list_tag_values_field_col_does_not_exist() {
|
||||||
|
let tag_name = "state";
|
||||||
|
let predicate = PredicateBuilder::default()
|
||||||
|
.timestamp_range(0, 1000) // get all rows
|
||||||
|
// since this field doesn't exist this predicate should match no values
|
||||||
|
.add_expr(col("_field").eq(lit("not_a_column")))
|
||||||
|
.build();
|
||||||
|
let predicate = InfluxRpcPredicate::new(None, predicate);
|
||||||
|
let expected_tag_keys = vec![];
|
||||||
|
|
||||||
|
run_tag_values_test_case(
|
||||||
|
TwoMeasurementsManyNulls {},
|
||||||
|
tag_name,
|
||||||
|
predicate,
|
||||||
|
expected_tag_keys,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn list_tag_values_field_col_does_exist() {
|
||||||
|
let tag_name = "state";
|
||||||
|
let predicate = PredicateBuilder::default()
|
||||||
|
.timestamp_range(0, 1000) // get all rows
|
||||||
|
// this field does exist (but only for rows with CA and MA, not NY)
|
||||||
|
.add_expr(col("_field").eq(lit("county")))
|
||||||
|
.build();
|
||||||
|
let predicate = InfluxRpcPredicate::new(None, predicate);
|
||||||
|
let expected_tag_keys = vec!["MA", "CA"];
|
||||||
|
|
||||||
|
run_tag_values_test_case(
|
||||||
|
TwoMeasurementsManyNulls {},
|
||||||
|
tag_name,
|
||||||
|
predicate,
|
||||||
|
expected_tag_keys,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
fn to_stringset(v: &[&str]) -> StringSetRef {
|
fn to_stringset(v: &[&str]) -> StringSetRef {
|
||||||
v.into_stringset().unwrap()
|
v.into_stringset().unwrap()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue