fix: apply selection in `TestChunk::read_filter` (#5613)

* fix: apply selection in `TestChunk::read_filter`

TBH I have no idea how this worked so well before, but the chunks are
expected to apply the given selection. This is because
`IOxReadFilterNode::execute` will wrap the `QueryChunk::read_filter`
output into a `SchemaAdapterStream` and this one expects that there are
no input columns that are absent in the output schema (i.e. it will only
add null columns, it won't remove any). Funnily the `SchemaAdapterStream`
error will blame DataFusion for the mess.

* test: make `test_storage_rpc_tag_values_grouped_by_measurement_and_tag_key` a bit harder

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-09-12 13:10:37 +00:00 committed by GitHub
parent caa0dfd1e0
commit b676049358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 2 deletions

View File

@ -16,6 +16,7 @@ use arrow::{
ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray, UInt64Array,
},
datatypes::{DataType, Int32Type, TimeUnit},
error::ArrowError,
record_batch::RecordBatch,
};
use async_trait::async_trait;
@ -919,14 +920,24 @@ impl QueryChunk for TestChunk {
&self,
_ctx: IOxSessionContext,
predicate: &Predicate,
_selection: Selection<'_>,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
self.check_error()?;
// save the predicate
self.predicates.lock().push(predicate.clone());
let batches = self.table_data.clone();
let batches = match self.schema.df_projection(selection)? {
None => self.table_data.clone(),
Some(projection) => self
.table_data
.iter()
.map(|batch| {
let batch = batch.project(&projection)?;
Ok(Arc::new(batch))
})
.collect::<std::result::Result<Vec<_>, ArrowError>>()?,
};
Ok(stream_from_batches(batches))
}

View File

@ -2182,10 +2182,12 @@ mod tests {
let db_info = org_and_bucket();
let chunk1 = TestChunk::new("table_a")
.with_time_column()
.with_id(0)
.with_tag_column("state")
.with_one_row_of_data();
let chunk2 = TestChunk::new("table_b")
.with_time_column()
.with_id(1)
.with_tag_column("state")
.with_one_row_of_data();