diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 270961550d..6f1ea57efb 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -8,7 +8,10 @@
 use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk};
 use read_buffer::Chunk as ReadBufferChunk;
 use snafu::{ResultExt, Snafu};
-use std::{collections::BTreeSet, sync::Arc};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    sync::Arc,
+};
 
 use super::{
     pred::to_read_buffer_predicate,
@@ -28,6 +31,9 @@ pub enum Error {
         chunk_id: u32,
     },
 
+    #[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, msg))]
+    ReadBufferError { chunk_id: u32, msg: String },
+
     #[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))]
     ParquetFileChunkError {
         source: parquet_file::chunk::Error,
@@ -345,9 +351,9 @@ impl PartitionChunk for DBChunk {
 
     fn column_values(
         &self,
-        _table_name: &str,
-        _column_name: &str,
-        _predicate: &Predicate,
+        table_name: &str,
+        column_name: &str,
+        predicate: &Predicate,
     ) -> Result<Option<StringSet>, Self::Error> {
         match self {
             Self::MutableBuffer { .. } => {
@@ -355,11 +361,42 @@
                 // vs just letting DataFusion do its thing
                 Ok(None)
             }
-            Self::ReadBuffer { .. } => {
-                // TODO hook up read buffer API here when ready. Until
-                // now, fallback to using a full plan
-                // https://github.com/influxdata/influxdb_iox/issues/857
-                Ok(None)
-            }
+            Self::ReadBuffer { chunk, .. } => {
+                let rb_predicate = match to_read_buffer_predicate(predicate) {
+                    Ok(rb_predicate) => rb_predicate,
+                    Err(e) => {
+                        debug!(?predicate, %e, "read buffer predicate not supported for column_values, falling back");
+                        return Ok(None);
+                    }
+                };
+
+                let mut values = chunk
+                    .column_values(
+                        table_name,
+                        rb_predicate,
+                        Selection::Some(&[column_name]),
+                        BTreeMap::new(),
+                    )
+                    .context(ReadBufferChunkError {
+                        chunk_id: chunk.id(),
+                    })?;
+
+                // The InfluxRPC frontend only supports getting column values
+                // for one column at a time (this is a restriction on the Influx
+                // Read gRPC API too). However, the Read Buffer supports multiple
+                // columns and will return a map - we just need to pull the
+                // column out to get the set of values.
+                let values = values
+                    .remove(column_name)
+                    .ok_or_else(|| Error::ReadBufferError {
+                        chunk_id: chunk.id(),
+                        msg: format!(
+                            "failed to find column_name {:?} in results of tag_values",
+                            column_name
+                        ),
+                    })?;
+
+                Ok(Some(values))
+            }
             Self::ParquetFile { .. } => {
                 unimplemented!("parquet file not implemented for column_values")
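
Reviewer note: the heart of the new Self::ReadBuffer arm is extracting one column's
value set from the per-column map the Read Buffer returns, turning a missing key into
a chunk-scoped ReadBufferError rather than silently returning nothing. Below is a
minimal, self-contained sketch of that pattern; the ColumnError type and the
extract_column_values helper are hypothetical names used only for illustration and
are not part of the crate.

use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for the `ReadBufferError` variant added in the diff.
#[derive(Debug)]
struct ColumnError {
    chunk_id: u32,
    msg: String,
}

/// Pull the value set for `column_name` out of the per-column map,
/// mirroring the `remove` + `ok_or_else` step in the new match arm.
fn extract_column_values(
    mut values: BTreeMap<String, BTreeSet<String>>,
    column_name: &str,
    chunk_id: u32,
) -> Result<BTreeSet<String>, ColumnError> {
    values.remove(column_name).ok_or_else(|| ColumnError {
        chunk_id,
        msg: format!(
            "failed to find column_name {:?} in results of tag_values",
            column_name
        ),
    })
}

fn main() {
    let mut values = BTreeMap::new();
    values.insert(
        "region".to_string(),
        ["east", "west"].iter().map(|s| s.to_string()).collect::<BTreeSet<_>>(),
    );

    // Present column: its value set is pulled out of the map.
    assert!(extract_column_values(values.clone(), "region", 42).is_ok());

    // Absent column: a chunk-scoped error is surfaced instead of a silent None.
    if let Err(e) = extract_column_values(values, "host", 42) {
        println!("chunk {}: {}", e.chunk_id, e.msg);
    }
}

Note the other design choice visible in the diff: when to_read_buffer_predicate cannot
convert the predicate, the arm returns Ok(None), which (as the removed TODO comment and
the MutableBuffer arm both indicate) tells the query layer to fall back to a full plan
instead of failing the request.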