diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 7e94b19a30..9f6c9513b9 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -185,9 +185,19 @@ impl Chunk { select_columns: Selection<'_>, negated_predicates: Vec, ) -> Result { - self.table + debug!(%predicate, ?select_columns, ?negated_predicates, "read_filter called"); + let now = std::time::Instant::now(); + let result = self + .table .read_filter(&select_columns, &predicate, negated_predicates.as_slice()) - .context(TableError) + .context(TableError); + + let row_groups = result + .as_ref() + .map(|result| result.row_groups()) + .unwrap_or(0); + debug!(elapsed=?now.elapsed(), succeeded=result.is_ok(), ?row_groups, "read_filter completed"); + result } /// Returns an iterable collection of data in group columns and aggregate diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 17eba44af2..00caaa0af5 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -1409,6 +1409,7 @@ impl Iterator for RowIDsIterator<'_> { } /// Statistics about the composition of a column +#[derive(Debug)] pub(crate) struct Statistics { pub enc_type: Cow<'static, str>, // The encoding type pub log_data_type: &'static str, // The logical data-type diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 0a81ba4b3e..d8c52ee84e 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -9,6 +9,7 @@ use std::{ use hashbrown::{hash_map, HashMap}; use itertools::Itertools; +use observability_deps::tracing::{debug, trace}; use snafu::{ResultExt, Snafu}; use crate::column::{self, cmp::Operator, Column, RowIDs, RowIDsOption}; @@ -259,6 +260,8 @@ impl RowGroup { predicate: &Predicate, negated_predicates: &[Predicate], ) -> ReadFilterResult<'_> { + trace!(%predicate, ?columns, ?negated_predicates, row_group_total_rows=?self.rows(), "read_filter called"); + let select_columns = self.meta.schema_for_column_names(columns); assert_eq!(select_columns.len(), columns.len()); @@ -268,12 +271,17 @@ impl RowGroup { }; // apply predicate to determine candidate rows. + let now = std::time::Instant::now(); let row_ids = self.row_ids_from_predicate(predicate); + trace!(elapsed=?now.elapsed(), "row_ids_from_predicate completed"); // identify rows that have been marked as deleted. + let now = std::time::Instant::now(); let deleted_row_ids = self.row_ids_from_delete_predicates(negated_predicates); + trace!(elapsed=?now.elapsed(), "row_ids_from_delete_predicates completed"); // determine final candidate rows + let now = std::time::Instant::now(); let final_row_ids = match (row_ids, deleted_row_ids) { // no matching rows (RowIDsOption::None(_), _) => RowIDsOption::new_none(), @@ -309,8 +317,16 @@ impl RowGroup { } } }; + trace!(elapsed=?now.elapsed(), rows=?match &final_row_ids{ + RowIDsOption::None(_) => 0, + RowIDsOption::Some(row_ids) => row_ids.len(), + RowIDsOption::All(_) => self.rows() as usize, + }, "read_filter candidate rows identified"); + let now = std::time::Instant::now(); let col_data = self.materialise_rows(&schema, final_row_ids); + trace!(elapsed=?now.elapsed(), "read_filter materialised rows"); + ReadFilterResult { schema, data: col_data, @@ -352,6 +368,7 @@ impl RowGroup { // Determines the set of row ids that satisfy the provided predicate. fn row_ids_from_predicate(&self, predicate: &Predicate) -> RowIDsOption { + trace!(%predicate, "row_ids_from_predicate called"); // TODO(edd): perf - potentially pool this so we can re-use it once rows // have been materialised and it's no longer needed. Initialise a bitmap // RowIDs because it's like that set operations will be necessary. @@ -400,13 +417,21 @@ impl RowGroup { for expr in predicate.iter() { // N.B column should always exist because validation of predicates // should happen at the `Table` level. - let (_, col) = self.column_name_and_column(expr.column()); + let (col_name, col) = self.column_name_and_column(expr.column()); // Explanation of how this buffer pattern works. The idea is that // the buffer should be returned to the caller so it can be re-used // on other columns. Each call to `row_ids_filter` returns the // buffer back enabling it to be re-used. - match col.row_ids_filter(&expr.op, &expr.literal_as_value(), dst) { + let now = std::time::Instant::now(); + let row_ids = col.row_ids_filter(&expr.op, &expr.literal_as_value(), dst); + trace!(elapsed=?now.elapsed(), rows=?match &row_ids{ + RowIDsOption::None(_) => 0, + RowIDsOption::Some(row_ids) => row_ids.len(), + RowIDsOption::All(_) => self.rows() as usize, + }, column=%col_name, column_stats=?col.storage_stats(), column_range=?col.column_range(), "Row IDs filtered for column"); + + match row_ids { // No rows will be returned for the `RowGroup` because this // column does not match any rows. RowIDsOption::None(_dst) => return RowIDsOption::None(_dst), @@ -414,10 +439,15 @@ impl RowGroup { // Intersect the row ids found at this column with all those // found on other column predicates. RowIDsOption::Some(row_ids) => { + let now = std::time::Instant::now(); if result_row_ids.is_empty() { - result_row_ids.union(&row_ids) + let row_ids = result_row_ids.union(&row_ids); + trace!(elapsed=?now.elapsed(), "unioning row IDs"); + row_ids } else { - result_row_ids.intersect(&row_ids); + let row_ids = result_row_ids.intersect(&row_ids); + trace!(elapsed=?now.elapsed(), "intersecting row IDs"); + row_ids } // before evaluating the next expression check if we have @@ -448,11 +478,18 @@ impl RowGroup { // column at once. fn row_ids_from_time_range(&self, time_range: &[BinaryExpr], dst: RowIDs) -> RowIDsOption { assert_eq!(time_range.len(), 2); - self.time_column().row_ids_filter_range( + let now = std::time::Instant::now(); + let row_ids = self.time_column().row_ids_filter_range( &(time_range[0].op, time_range[0].literal_as_value()), // min time &(time_range[1].op, time_range[1].literal_as_value()), // max time dst, - ) + ); + trace!(elapsed=?now.elapsed(), rows=?match &row_ids{ + RowIDsOption::None(_) => 0, + RowIDsOption::Some(row_ids) => row_ids.len(), + RowIDsOption::All(_) => self.rows() as usize, + }, "row_ids_from_time_range completed"); + row_ids } // Determines the set of row ids that satisfy *any* of the provided @@ -460,6 +497,7 @@ impl RowGroup { // a row must satisfy all expressions within a single predicate, but need // not satisfy more than one of the predicates. fn row_ids_from_delete_predicates(&self, predicates: &[Predicate]) -> RowIDsOption { + trace!(predicates=?predicates, "row_ids_from_delete_predicates called"); if predicates.is_empty() { return RowIDsOption::new_none(); } @@ -1404,6 +1442,18 @@ impl Predicate { } } +impl Display for &Predicate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for (i, expr) in self.0.iter().enumerate() { + expr.fmt(f)?; + if i < self.0.len() - 1 { + write!(f, " AND ")?; + } + } + Ok(()) + } +} + impl From> for Predicate { fn from(arr: Vec) -> Self { Self(arr) @@ -1505,6 +1555,12 @@ impl BinaryExpr { } } +impl Display for BinaryExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {} {:?}", self.column(), self.op, self.value) + } +} + impl From<(&str, &str, &str)> for BinaryExpr { fn from(expr: (&str, &str, &str)) -> Self { Self::new( @@ -1830,12 +1886,26 @@ impl ReadFilterResult<'_> { pub fn schema(&self) -> &ResultSchema { &self.schema } + + // Number of rows in result + pub fn num_rows(&self) -> usize { + match self.is_empty() { + true => 0, + false => self.data[0].len(), + } + } + + // Number of columns in result + pub fn num_columns(&self) -> usize { + self.data.len() + } } impl TryFrom> for RecordBatch { type Error = Error; fn try_from(result: ReadFilterResult<'_>) -> Result { + let now = std::time::Instant::now(); let schema = ::schema::Schema::try_from(result.schema()) .map_err(|source| Error::SchemaConversion { source })?; @@ -1878,6 +1948,7 @@ impl TryFrom> for RecordBatch { .collect::, _>>()?; let arrow_schema: arrow::datatypes::SchemaRef = schema.into(); + debug!(elapsed=?now.elapsed(), "result converted to record batch"); // try_new only returns an error if the schema is invalid or the number // of rows on columns differ. We have full control over both so there diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 86aee478e8..6e0a4e2e3f 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -904,7 +904,7 @@ impl ReadFilterResults { self.row_groups.is_empty() } - pub fn len(&self) -> usize { + pub fn row_groups(&self) -> usize { self.row_groups.len() } diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 6522d8b884..bd0bd7b531 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -307,7 +307,7 @@ impl QueryChunk for DbChunk { let rb_predicate = match to_read_buffer_predicate(predicate) { Ok(rb_predicate) => rb_predicate, Err(e) => { - debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back"); + debug!(?predicate, %e, "Cannot push down predicate to RUB, will fully scan"); return Ok(PredicateMatch::Unknown); } }; @@ -350,7 +350,7 @@ impl QueryChunk for DbChunk { // Predicate is not required to be applied for correctness. We only pushed it down // when possible for performance gain - debug!(?predicate, "Input Predicate to read_filter"); + debug!(table=?self.table_name(), chunk_id=%self.addr().chunk_id, ?predicate, ?selection, "read_filter called"); self.access_recorder.record_access(); let delete_predicates: Vec<_> = self @@ -363,10 +363,7 @@ impl QueryChunk for DbChunk { // merge the negated delete predicates into the select predicate let mut pred_with_deleted_exprs = predicate.clone(); pred_with_deleted_exprs.merge_delete_predicates(&delete_predicates); - debug!( - ?pred_with_deleted_exprs, - "Input Predicate plus deleted ranges and deleted predicates" - ); + debug!(?pred_with_deleted_exprs, "Merged negated predicate"); match &self.state { State::MutableBuffer { chunk, .. } => { @@ -382,7 +379,7 @@ impl QueryChunk for DbChunk { // predicate. .validate_predicate(to_read_buffer_predicate(predicate).unwrap_or_default()) .unwrap_or_default(); - debug!(?rb_predicate, "Predicate pushed down to RUB"); + debug!(?rb_predicate, "RUB predicate"); // combine all delete expressions to RUB's negated ones let negated_delete_exprs = Self::to_rub_negated_predicates(&delete_predicates)?