perf: use column_values in read buffer

pull/24376/head
Edd Robinson 2021-04-14 12:06:35 +01:00 committed by kodiakhq[bot]
parent 8f1bf8a960
commit 04f594bf94
1 changed files with 46 additions and 9 deletions

View File

@ -8,7 +8,10 @@ use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk};
use read_buffer::Chunk as ReadBufferChunk;
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use super::{
pred::to_read_buffer_predicate,
@ -28,6 +31,9 @@ pub enum Error {
chunk_id: u32,
},
#[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, msg))]
ReadBufferError { chunk_id: u32, msg: String },
#[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))]
ParquetFileChunkError {
source: parquet_file::chunk::Error,
@ -345,9 +351,9 @@ impl PartitionChunk for DBChunk {
fn column_values(
&self,
_table_name: &str,
_column_name: &str,
_predicate: &Predicate,
table_name: &str,
column_name: &str,
predicate: &Predicate,
) -> Result<Option<StringSet>, Self::Error> {
match self {
Self::MutableBuffer { .. } => {
@ -355,11 +361,42 @@ impl PartitionChunk for DBChunk {
// vs just letting DataFusion do its thing
Ok(None)
}
Self::ReadBuffer { .. } => {
// TODO hook up read buffer API here when ready. Until
// now, fallback to using a full plan
// https://github.com/influxdata/influxdb_iox/issues/857
Ok(None)
Self::ReadBuffer { chunk, .. } => {
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for column_names, falling back");
return Ok(None);
}
};
let mut values = chunk
.column_values(
table_name,
rb_predicate,
Selection::Some(&[column_name]),
BTreeMap::new(),
)
.context(ReadBufferChunkError {
chunk_id: chunk.id(),
})?;
// The InfluxRPC frontend only supports getting column values
// for one column at a time (this is a restriction on the Influx
// Read gRPC API too). However, the Read Buffer support multiple
// columns and will return a map - we just need to pull the
// column out to get the set of values.
let values = values
.remove(column_name)
.ok_or_else(|| Error::ReadBufferError {
chunk_id: chunk.id(),
msg: format!(
"failed to find column_name {:?} in results of tag_values",
column_name
),
})?;
Ok(Some(values))
}
Self::ParquetFile { .. } => {
unimplemented!("parquet file not implemented for column_values")