use super::{
catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
use data_types::chunk_metadata::ChunkAddr;
use data_types::{
chunk_metadata::{ChunkId, ChunkOrder},
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use internal_types::access::AccessRecorder;
use iox_object_store::ParquetFilePath;
use mutable_buffer::snapshot::ChunkSnapshot;
use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
use partition_metadata::TableSummary;
use predicate::{Predicate, PredicateMatch};
use query::exec::IOxSessionContext;
use query::QueryChunkError;
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
use read_buffer::RBChunk;
use schema::InfluxColumnType;
use schema::{selection::Selection, sort::SortKey, Schema};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
use time::Time;
/// Errors returned while snapshotting or querying a [`DbChunk`].
#[derive(Debug, Snafu)]
pub enum Error {
/// Wraps an error from `mutable_buffer::snapshot`.
#[snafu(display("Mutable Buffer Chunk Error: {}", source))]
MutableBufferChunk {
source: mutable_buffer::snapshot::Error,
/// A Read Buffer operation on chunk `chunk_id` failed.
#[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, source))]
ReadBufferChunkError {
source: read_buffer::Error,
chunk_id: ChunkId,
/// A Read Buffer result for chunk `chunk_id` could not be used; `msg`
/// explains why (e.g. the requested column is absent from the returned
/// values).
#[snafu(display("Read Buffer Error in chunk {}: {}", chunk_id, msg))]
ReadBufferError { chunk_id: ChunkId, msg: String },
/// Reading the Parquet file that backs chunk `chunk_id` failed.
#[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))]
ParquetFileChunkError {
source: parquet_file::chunk::Error,
chunk_id: ChunkId,
/// Wraps a `schema::Error` raised while restricting a schema.
#[snafu(display("Internal error restricting schema: {}", source))]
InternalSelectingSchema { source: schema::Error },
/// A `Predicate` could not be converted into a Read Buffer predicate.
#[snafu(display("Predicate conversion error: {}", source))]
PredicateConversion { source: super::pred::Error },
"Internal error: mutable buffer does not support predicate pushdown, but got: {:?}",
InternalPredicateNotSupported { predicate: Predicate },
/// Wraps a DataFusion error raised while creating a plan.
#[snafu(display("internal error creating plan: {}", source))]
InternalPlanCreation {
source: datafusion::error::DataFusionError,
/// Wraps an Arrow error raised during conversion.
#[snafu(display("arrow conversion error: {}", source))]
ArrowConversion { source: arrow::error::ArrowError },
/// Result alias whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A queryable snapshot of a catalog chunk.
///
/// An IOx `DbChunk` can come from one of three places: the Mutable Buffer,
/// the Read Buffer, or a Parquet file (see `State`).
pub struct DbChunk {
/// Address of the snapshotted chunk in the catalog.
addr: ChunkAddr,
/// Access recorder cloned from the catalog chunk. The tests in this file
/// expect queries against the snapshot, but not snapshotting itself, to
/// record an access.
access_recorder: AccessRecorder,
/// Storage this snapshot reads from.
state: State,
/// Chunk metadata: table summary, schema, delete predicates and sort key.
meta: Arc<ChunkMetadata>,
/// Time of the first write to the chunk, copied when the snapshot is taken.
time_of_first_write: Time,
/// Time of the last write to the chunk, copied when the snapshot is taken.
time_of_last_write: Time,
/// Order of the chunk, copied from the catalog chunk.
order: ChunkOrder,
/// The storage representation a [`DbChunk`] snapshot reads from.
enum State {
/// A snapshot of a Mutable Buffer chunk.
MutableBuffer { chunk: Arc<ChunkSnapshot> },
/// A chunk held in the Read Buffer.
ReadBuffer { chunk: Arc<RBChunk> },
/// A chunk persisted as a Parquet file in object storage.
ParquetFile { chunk: Arc<ParquetChunk> },
impl State {
/// Human-readable name of the backing storage. Query methods record it as
/// the "storage" metadata entry on their `IOxSessionContext`.
fn state_name(&self) -> &'static str {
match self {
State::MutableBuffer { .. } => "Mutable Buffer",
State::ReadBuffer { .. } => "Read Buffer",
State::ParquetFile { .. } => "Parquet",
impl DbChunk {
/// Create a `DbChunk` snapshot of the catalog chunk.
///
/// The backing storage is chosen from the chunk's stage:
/// * `Open`: a snapshot of the open Mutable Buffer chunk, which the Mutable
///   Buffer may cache;
/// * `Frozen`: the frozen Mutable Buffer snapshot or the Read Buffer chunk;
/// * `Persisted`: the Read Buffer chunk if one is still loaded, otherwise the
///   Parquet file (use `parquet_file_snapshot` to force the Parquet file).
///
/// Taking a snapshot does not record an access on the catalog chunk.
pub fn snapshot(chunk: &super::catalog::chunk::CatalogChunk) -> Arc<Self> {
let addr = chunk.addr().clone();
use super::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr};
let (state, meta) = match chunk.stage() {
ChunkStage::Open {
} => {
let (snapshot, just_cached) = mb_chunk.snapshot();
// the snapshot might be cached, so we need to update the chunk metrics
if just_cached {
let state = State::MutableBuffer {
chunk: Arc::clone(&snapshot),
// Metadata for an open chunk is built from the live Mutable Buffer
// chunk; it has no delete predicates and no sort key yet.
let meta = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary()),
schema: snapshot.full_schema(),
delete_predicates: vec![], // open chunk does not have delete predicate
time_of_first_write: *time_of_first_write,
time_of_last_write: *time_of_last_write,
sort_key: None,
(state, Arc::new(meta))
ChunkStage::Frozen {
} => {
let state = match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(snapshot) => State::MutableBuffer {
chunk: Arc::clone(snapshot),
ChunkStageFrozenRepr::ReadBuffer(repr) => State::ReadBuffer {
chunk: Arc::clone(repr),
(state, Arc::clone(meta))
ChunkStage::Persisted {
} => {
// Prefer the in-memory Read Buffer copy when it is still loaded;
// otherwise read from the Parquet file.
let state = if let Some(read_buffer) = &read_buffer {
State::ReadBuffer {
chunk: Arc::clone(read_buffer),
} else {
State::ParquetFile {
chunk: Arc::clone(parquet),
(state, Arc::clone(meta))
Arc::new(Self {
access_recorder: chunk.access_recorder().clone(),
time_of_first_write: chunk.time_of_first_write(),
time_of_last_write: chunk.time_of_last_write(),
order: chunk.order(),
/// Return a snapshot of a Persisted chunk that always reads from its
/// Parquet file.
///
/// `snapshot` prefers the Read Buffer copy of a Persisted chunk whenever one
/// is loaded. Use this function when the Parquet representation is
/// specifically required.
///
/// # Panics
///
/// Panics if the chunk's stage is not `Persisted`.
pub fn parquet_file_snapshot(chunk: &super::catalog::chunk::CatalogChunk) -> Arc<Self> {
use super::catalog::chunk::ChunkStage;
let (state, meta) = match chunk.stage() {
ChunkStage::Persisted { parquet, meta, .. } => {
let chunk = Arc::clone(parquet);
let state = State::ParquetFile { chunk };
(state, Arc::clone(meta))
_ => {
panic!("Internal error: This chunk's stage is not Persisted");
Arc::new(Self {
addr: chunk.addr().clone(),
access_recorder: chunk.access_recorder().clone(),
time_of_first_write: chunk.time_of_first_write(),
time_of_last_write: chunk.time_of_last_write(),
order: chunk.order(),
/// Return the path in object storage of the Parquet file this snapshot
/// reads from, or `None` if it does not read from Parquet.
///
/// A Persisted chunk snapshotted while its Read Buffer copy was loaded
/// reads from the Read Buffer, so this returns `None` for it.
pub fn object_store_path(&self) -> Option<&ParquetFilePath> {
match &self.state {
State::ParquetFile { chunk } => Some(chunk.path()),
_ => None,
/// Returns the contained `ParquetChunk`, if this chunk is stored as parquet
pub fn parquet_chunk(&self) -> Option<&Arc<ParquetChunk>> {
match &self.state {
State::ParquetFile { chunk } => Some(chunk),
_ => None,
/// Return the address of this chunk
pub fn addr(&self) -> &ChunkAddr {
/// Return the name of the table in this chunk
pub fn table_name(&self) -> &Arc<str> {
/// Time of the first write to the chunk, as captured by the snapshot.
pub fn time_of_first_write(&self) -> Time {
/// Time of the last write to the chunk, as captured by the snapshot.
pub fn time_of_last_write(&self) -> Time {
/// Convert `delete_predicates` into Read Buffer predicates. The result is
/// passed as the negated delete predicates to the Read Buffer's
/// `read_filter`.
///
/// # Errors
///
/// Returns `Error::PredicateConversion` if any predicate cannot be
/// converted.
///
/// NOTE: valid Read Buffer predicates are not guaranteed to be applicable
/// to an arbitrary Read Buffer chunk, because the applicability of a
/// predicate depends on the schema of the chunk. Callers should validate
/// predicates against chunks they are to be executed against using
/// `read_buffer::Chunk::validate_predicate`
fn to_rub_negated_predicates(
delete_predicates: &[Arc<Predicate>],
) -> Result<Vec<read_buffer::Predicate>> {
let mut rub_preds: Vec<read_buffer::Predicate> = vec![];
for pred in delete_predicates {
let rub_pred = to_read_buffer_predicate(pred).context(PredicateConversionSnafu)?;
debug!(?rub_preds, "RUB delete predicates");
/// Return true if any field column selected by `predicate` may contain
/// nulls. A field counts as possibly-null when its statistics report at
/// least one null, or when no null-count statistics are available. Returns
/// false only if every such field is known to be entirely non-null.
fn fields_have_nulls(&self, predicate: &Predicate) -> bool {
self.meta.schema.iter().any(|(influx_column_type, field)| {
if matches!(influx_column_type, Some(InfluxColumnType::Field(_)))
&& predicate.should_include_field(field.name())
match self
.and_then(|column_summary| column_summary.null_count())
Some(null_count) => {
// only if this is false can we return false
null_count > 0
None => {
// don't know the stats for this column, so assume there can be nulls
} else {
// not a field column
impl QueryChunk for DbChunk {
fn id(&self) -> ChunkId {
fn addr(&self) -> ChunkAddr {
fn table_name(&self) -> &str {
fn may_contain_pk_duplicates(&self) -> bool {
// Assume that only data in the MUB can contain duplicates
// within itself as it has the raw incoming stream of writes.
// All other types of chunks are deduplicated by the
// reorganization plan that runs as part of their creation
matches!(self.state, State::MutableBuffer { .. })
/// Check `predicate` against this chunk without a full scan.
///
/// For Read Buffer chunks, the predicate is converted and evaluated by the
/// Read Buffer. If it cannot be converted, `PredicateMatch::Unknown` is
/// returned. Mutable Buffer and Parquet chunks only consult the predicate's
/// time range and whether it has expressions.
fn apply_predicate_to_metadata(
predicate: &Predicate,
) -> Result<PredicateMatch, QueryChunkError> {
let pred_result = match &self.state {
State::MutableBuffer { chunk, .. } => {
if predicate.has_exprs() || chunk.has_timerange(&predicate.range) {
// TODO some more work to figure out if we
// definitely have / do not have result
} else {
State::ReadBuffer { chunk, .. } => {
// If the predicate is not supported by the Read Buffer then
// it can't determine if the predicate can be answered by
// meta-data only. A future improvement could be to apply this
// logic to chunk meta-data without involving the backing
// execution engine.
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "Cannot push down predicate to RUB, will fully scan");
return Ok(PredicateMatch::Unknown);
// TODO: currently this will provide an exact answer, which may
// be expensive in pathological cases. It might make more
// sense to implement logic that works to rule out chunks based
// on meta-data only. This should be possible without needing to
// know the execution engine the chunk is held in.
if chunk.satisfies_predicate(&rb_predicate) {
// if any of the fields referred to in the
// predicate has nulls, don't know without more
// work if the rows that matched had non null values
if self.fields_have_nulls(predicate) {
} else {
} else {
State::ParquetFile { chunk, .. } => {
if predicate.has_exprs() || chunk.has_timerange(predicate.range.as_ref()) {
} else {
/// Return a stream of record batches for the `selection` columns of this
/// chunk.
///
/// The chunk's delete predicates are counted on `ctx`. For Read Buffer
/// chunks they are pushed down as negated predicates. For Parquet chunks
/// they are merged into the predicate passed to the reader. `predicate`
/// itself is pushed down only as a performance optimization and is not
/// required for correctness. Storage, projection and predicate metadata
/// are recorded on `ctx`.
fn read_filter(
mut ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
// Predicate is not required to be applied for correctness. We only pushed it down
// when possible for performance gain
debug!(table=?self.table_name(), chunk_id=%self.addr().chunk_id, ?predicate, ?selection, "read_filter called");
let delete_predicates: Vec<_> = self
.map(|pred| Arc::new(pred.as_ref().clone().into()))
ctx.set_metadata("delete_predicates", delete_predicates.len() as i64);
// merge the negated delete predicates into the select predicate
let mut pred_with_deleted_exprs = predicate.clone();
debug!(?pred_with_deleted_exprs, "Merged negated predicate");
ctx.set_metadata("storage", self.state.state_name());
ctx.set_metadata("projection", format!("{}", selection));
match &self.state {
State::MutableBuffer { chunk, .. } => {
let batch = chunk
State::ReadBuffer { chunk, .. } => {
// Only apply pushdownable predicates
let rb_predicate = chunk
// A predicate unsupported by the Read Buffer or against
// this chunk's schema is replaced with a default empty
// predicate.
debug!(?rb_predicate, "RUB predicate");
ctx.set_metadata("predicate", format!("{}", &rb_predicate));
// combine all delete expressions to RUB's negated ones
let negated_delete_exprs = Self::to_rub_negated_predicates(&delete_predicates)?
// Any delete predicates unsupported by the Read Buffer will be elided.
.filter_map(|p| chunk.validate_predicate(p).ok())
"Negated Predicate pushed down to RUB"
let read_results = chunk
.read_filter(rb_predicate, selection, negated_delete_exprs)
.context(ReadBufferChunkSnafu {
chunk_id: self.id(),
let schema =
.context(ReadBufferChunkSnafu {
chunk_id: self.id(),
State::ParquetFile { chunk, .. } => {
ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs));
.read_filter(&pred_with_deleted_exprs, selection)
.context(ParquetFileChunkSnafu {
chunk_id: self.id(),
.map_err(|e| Box::new(e) as _)
/// Return the column names for `columns` that this chunk can report for
/// `predicate`.
///
/// Returns `Ok(None)` when this cannot be answered directly by the storage:
/// a non-empty predicate on a Mutable Buffer or Parquet chunk, or a
/// predicate the Read Buffer cannot handle. Delete predicates are not yet
/// pushed down to the Read Buffer here (see the TODO below).
fn column_names(
mut ctx: IOxSessionContext,
predicate: &Predicate,
columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
ctx.set_metadata("storage", self.state.state_name());
ctx.set_metadata("projection", format!("{}", columns));
ctx.set_metadata("predicate", format!("{}", &predicate));
match &self.state {
State::MutableBuffer { chunk, .. } => {
if !predicate.is_empty() {
// TODO: Support predicates
return Ok(None);
State::ReadBuffer { chunk, .. } => {
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for column_names, falling back");
return Ok(None);
ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate));
// TODO(edd): wire up delete predicates to be pushed down to
// the read buffer.
let names = chunk
.column_names(rb_predicate, vec![], columns, BTreeSet::new())
.context(ReadBufferChunkSnafu {
chunk_id: self.id(),
ctx.set_metadata("output_values", names.len() as i64);
State::ParquetFile { chunk, .. } => {
if !predicate.is_empty() {
// TODO: Support predicates for Parquet chunks
return Ok(None);
/// Return the set of values of `column_name` for `predicate`, as computed
/// by the Read Buffer.
///
/// Returns `Ok(None)` when the predicate cannot be converted for the Read
/// Buffer. Mutable Buffer and Parquet chunks leave this to DataFusion.
fn column_values(
mut ctx: IOxSessionContext,
column_name: &str,
predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
ctx.set_metadata("storage", self.state.state_name());
ctx.set_metadata("column_name", column_name.to_string());
ctx.set_metadata("predicate", format!("{}", &predicate));
match &self.state {
State::MutableBuffer { .. } => {
// There is no advantage to manually implementing this
// vs just letting DataFusion do its thing
State::ReadBuffer { chunk, .. } => {
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for column_values, falling back");
return Ok(None);
ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate));
let mut values = chunk
.context(ReadBufferChunkSnafu {
chunk_id: self.id(),
// The InfluxRPC frontend only supports getting column values
// for one column at a time (this is a restriction on the Influx
// Read gRPC API too). However, the Read Buffer support multiple
// columns and will return a map - we just need to pull the
// column out to get the set of values.
let values = values
.with_context(|| ReadBufferSnafu {
chunk_id: self.id(),
msg: format!(
"failed to find column_name {:?} in results of tag_values",
ctx.set_metadata("output_values", values.len() as i64);
State::ParquetFile { .. } => {
// Since DataFusion can read Parquet, there is no advantage to
// manually implementing this vs just letting DataFusion do its thing
/// Short tag for the backing storage: `MUB` (Mutable Buffer), `RUB`
/// (Read Buffer) or `OS` (Parquet file in object storage).
fn chunk_type(&self) -> &str {
match &self.state {
State::MutableBuffer { .. } => "MUB",
State::ReadBuffer { .. } => "RUB",
State::ParquetFile { .. } => "OS",
fn order(&self) -> ChunkOrder {
impl QueryChunkMeta for DbChunk {
fn summary(&self) -> Option<&TableSummary> {
fn schema(&self) -> Arc<Schema> {
fn sort_key(&self) -> Option<&SortKey> {
/// Return a reference to the delete predicates stored in this chunk's
/// metadata. They are logged at debug level on every call.
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
let pred = &self.meta.delete_predicates;
debug!(?pred, "Delete predicate in DbChunk");
/// Tests for `DbChunk` snapshots: access recording per storage type and the
/// write timestamps carried by snapshots.
mod tests {
use super::*;
use crate::{
catalog::chunk::{CatalogChunk, ChunkStage},
use data_types::chunk_metadata::ChunkStorage;
use std::time::Duration;
/// Snapshot `chunk`, query the snapshot while advancing the mock clock, and
/// check the chunk's access metrics after each step:
/// * taking the snapshot must not record an access;
/// * the first query must record an access at the current mock time;
/// * `column_names` / `column_values` must record an access only when the
///   corresponding flag reports success.
async fn test_chunk_access(chunk: &CatalogChunk, time: Arc<time::MockProvider>) {
let m1 = chunk.access_recorder().get_metrics();
let snapshot = DbChunk::snapshot(chunk);
let m2 = chunk.access_recorder().get_metrics();
let t1 = time.inc(Duration::from_secs(1));
let m3 = chunk.access_recorder().get_metrics();
let t2 = time.inc(Duration::from_secs(1));
let column_names = snapshot
let m4 = chunk.access_recorder().get_metrics();
let t3 = time.inc(Duration::from_secs(1));
let column_values = snapshot
.column_values(IOxSessionContext::default(), "tag", &Default::default())
let m5 = chunk.access_recorder().get_metrics();
// Snapshot shouldn't count as an access
assert_eq!(m1, m2);
// Query should count as an access
assert_eq!(m2.count + 1, m3.count);
assert!(m2.last_access < m3.last_access);
assert_eq!(m3.last_access, t1);
// If column names successful should record access
match column_names {
true => {
assert_eq!(m3.count + 1, m4.count);
assert_eq!(m4.last_access, t2);
false => {
assert_eq!(m3, m4);
// If column values successful should record access
match column_values {
true => {
assert_eq!(m4.count + 1, m5.count);
assert!(m4.last_access < m5.last_access);
assert_eq!(m5.last_access, t3);
false => {
assert_eq!(m4, m5);
/// Access recording for an open Mutable Buffer chunk.
async fn mub_records_access() {
let (db, time) = make_db_time().await;
write_lp(&db, "cpu,tag=1 bar=1 1");
let chunks = db.catalog.chunks();
assert_eq!(chunks.len(), 1);
let chunk = chunks.into_iter().next().unwrap();
let chunk = chunk.read();
assert_eq!(chunk.storage().1, ChunkStorage::OpenMutableBuffer);
test_chunk_access(&chunk, time).await;
/// Access recording for a Read Buffer chunk produced by compaction.
async fn rub_records_access() {
let (db, time) = make_db_time().await;
write_lp(&db, "cpu,tag=1 bar=1 1");
db.compact_partition("cpu", "1970-01-01T00").await.unwrap();
let chunks = db.catalog.chunks();
assert_eq!(chunks.len(), 1);
let chunk = chunks.into_iter().next().unwrap();
let chunk = chunk.read();
assert_eq!(chunk.storage().1, ChunkStorage::ReadBuffer);
test_chunk_access(&chunk, time).await
/// Access recording for an object-store-only chunk (persisted, then Read
/// Buffer unloaded). Also checks that both write timestamps equal the time
/// of the single write.
async fn parquet_records_access() {
let (db, time) = make_db_time().await;
let t0 = time.inc(Duration::from_secs(324));
write_lp(&db, "cpu,tag=1 bar=1 1");
let id = db
.persist_partition("cpu", "1970-01-01T00", true)
db.unload_read_buffer("cpu", "1970-01-01T00", id).unwrap();
let chunks = db.catalog.chunks();
assert_eq!(chunks.len(), 1);
let chunk = chunks.into_iter().next().unwrap();
let chunk = chunk.read();
assert_eq!(chunk.storage().1, ChunkStorage::ObjectStoreOnly);
let first_write = chunk.time_of_first_write();
let last_write = chunk.time_of_last_write();
assert_eq!(first_write, t0);
assert_eq!(last_write, t0);
test_chunk_access(&chunk, time).await
/// `parquet_file_snapshot` of a persisted chunk carries the times of the
/// first and last of two writes made at different mock times.
async fn parquet_snapshot() {
let (db, time) = make_db_time().await;
let w0 = time.inc(Duration::from_secs(10));
write_lp(&db, "cpu,tag=1 bar=1 1");
let w1 = time.inc(Duration::from_secs(10));
write_lp(&db, "cpu,tag=2 bar=2 2");
db.persist_partition("cpu", "1970-01-01T00", true)
let chunks = db.catalog.chunks();
assert_eq!(chunks.len(), 1);
let chunk = chunks.into_iter().next().unwrap();
let chunk = chunk.read();
assert!(matches!(chunk.stage(), ChunkStage::Persisted { .. }));
let snapshot = DbChunk::parquet_file_snapshot(&chunk);
let first_write = snapshot.time_of_first_write();
let last_write = snapshot.time_of_last_write();
assert_eq!(w0, first_write);
assert_eq!(w1, last_write);