Merge pull request #3280 from influxdata/er/refactor/debug_rub_query

refactor: more trace logging in RUB
pull/24376/head
kodiakhq[bot] 2021-12-02 20:20:35 +00:00 committed by GitHub
commit f4cedbd139
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 95 additions and 16 deletions

View File

@ -185,9 +185,19 @@ impl Chunk {
select_columns: Selection<'_>,
negated_predicates: Vec<Predicate>,
) -> Result<table::ReadFilterResults> {
self.table
debug!(%predicate, ?select_columns, ?negated_predicates, "read_filter called");
let now = std::time::Instant::now();
let result = self
.table
.read_filter(&select_columns, &predicate, negated_predicates.as_slice())
.context(TableError)
.context(TableError);
let row_groups = result
.as_ref()
.map(|result| result.row_groups())
.unwrap_or(0);
debug!(elapsed=?now.elapsed(), succeeded=result.is_ok(), ?row_groups, "read_filter completed");
result
}
/// Returns an iterable collection of data in group columns and aggregate

View File

@ -1409,6 +1409,7 @@ impl Iterator for RowIDsIterator<'_> {
}
/// Statistics about the composition of a column
#[derive(Debug)]
pub(crate) struct Statistics {
pub enc_type: Cow<'static, str>, // The encoding type
pub log_data_type: &'static str, // The logical data-type

View File

@ -9,6 +9,7 @@ use std::{
use hashbrown::{hash_map, HashMap};
use itertools::Itertools;
use observability_deps::tracing::{debug, trace};
use snafu::{ResultExt, Snafu};
use crate::column::{self, cmp::Operator, Column, RowIDs, RowIDsOption};
@ -259,6 +260,8 @@ impl RowGroup {
predicate: &Predicate,
negated_predicates: &[Predicate],
) -> ReadFilterResult<'_> {
trace!(%predicate, ?columns, ?negated_predicates, row_group_total_rows=?self.rows(), "read_filter called");
let select_columns = self.meta.schema_for_column_names(columns);
assert_eq!(select_columns.len(), columns.len());
@ -268,12 +271,17 @@ impl RowGroup {
};
// apply predicate to determine candidate rows.
let now = std::time::Instant::now();
let row_ids = self.row_ids_from_predicate(predicate);
trace!(elapsed=?now.elapsed(), "row_ids_from_predicate completed");
// identify rows that have been marked as deleted.
let now = std::time::Instant::now();
let deleted_row_ids = self.row_ids_from_delete_predicates(negated_predicates);
trace!(elapsed=?now.elapsed(), "row_ids_from_delete_predicates completed");
// determine final candidate rows
let now = std::time::Instant::now();
let final_row_ids = match (row_ids, deleted_row_ids) {
// no matching rows
(RowIDsOption::None(_), _) => RowIDsOption::new_none(),
@ -309,8 +317,16 @@ impl RowGroup {
}
}
};
trace!(elapsed=?now.elapsed(), rows=?match &final_row_ids{
RowIDsOption::None(_) => 0,
RowIDsOption::Some(row_ids) => row_ids.len(),
RowIDsOption::All(_) => self.rows() as usize,
}, "read_filter candidate rows identified");
let now = std::time::Instant::now();
let col_data = self.materialise_rows(&schema, final_row_ids);
trace!(elapsed=?now.elapsed(), "read_filter materialised rows");
ReadFilterResult {
schema,
data: col_data,
@ -352,6 +368,7 @@ impl RowGroup {
// Determines the set of row ids that satisfy the provided predicate.
fn row_ids_from_predicate(&self, predicate: &Predicate) -> RowIDsOption {
trace!(%predicate, "row_ids_from_predicate called");
// TODO(edd): perf - potentially pool this so we can re-use it once rows
// have been materialised and it's no longer needed. Initialise a bitmap
// RowIDs because it's like that set operations will be necessary.
@ -400,13 +417,21 @@ impl RowGroup {
for expr in predicate.iter() {
// N.B column should always exist because validation of predicates
// should happen at the `Table` level.
let (_, col) = self.column_name_and_column(expr.column());
let (col_name, col) = self.column_name_and_column(expr.column());
// Explanation of how this buffer pattern works. The idea is that
// the buffer should be returned to the caller so it can be re-used
// on other columns. Each call to `row_ids_filter` returns the
// buffer back enabling it to be re-used.
match col.row_ids_filter(&expr.op, &expr.literal_as_value(), dst) {
let now = std::time::Instant::now();
let row_ids = col.row_ids_filter(&expr.op, &expr.literal_as_value(), dst);
trace!(elapsed=?now.elapsed(), rows=?match &row_ids{
RowIDsOption::None(_) => 0,
RowIDsOption::Some(row_ids) => row_ids.len(),
RowIDsOption::All(_) => self.rows() as usize,
}, column=%col_name, column_stats=?col.storage_stats(), column_range=?col.column_range(), "Row IDs filtered for column");
match row_ids {
// No rows will be returned for the `RowGroup` because this
// column does not match any rows.
RowIDsOption::None(_dst) => return RowIDsOption::None(_dst),
@ -414,10 +439,15 @@ impl RowGroup {
// Intersect the row ids found at this column with all those
// found on other column predicates.
RowIDsOption::Some(row_ids) => {
let now = std::time::Instant::now();
if result_row_ids.is_empty() {
result_row_ids.union(&row_ids)
let row_ids = result_row_ids.union(&row_ids);
trace!(elapsed=?now.elapsed(), "unioning row IDs");
row_ids
} else {
result_row_ids.intersect(&row_ids);
let row_ids = result_row_ids.intersect(&row_ids);
trace!(elapsed=?now.elapsed(), "intersecting row IDs");
row_ids
}
// before evaluating the next expression check if we have
@ -448,11 +478,18 @@ impl RowGroup {
// column at once.
fn row_ids_from_time_range(&self, time_range: &[BinaryExpr], dst: RowIDs) -> RowIDsOption {
assert_eq!(time_range.len(), 2);
self.time_column().row_ids_filter_range(
let now = std::time::Instant::now();
let row_ids = self.time_column().row_ids_filter_range(
&(time_range[0].op, time_range[0].literal_as_value()), // min time
&(time_range[1].op, time_range[1].literal_as_value()), // max time
dst,
)
);
trace!(elapsed=?now.elapsed(), rows=?match &row_ids{
RowIDsOption::None(_) => 0,
RowIDsOption::Some(row_ids) => row_ids.len(),
RowIDsOption::All(_) => self.rows() as usize,
}, "row_ids_from_time_range completed");
row_ids
}
// Determines the set of row ids that satisfy *any* of the provided
@ -460,6 +497,7 @@ impl RowGroup {
// a row must satisfy all expressions within a single predicate, but need
// not satisfy more than one of the predicates.
fn row_ids_from_delete_predicates(&self, predicates: &[Predicate]) -> RowIDsOption {
trace!(predicates=?predicates, "row_ids_from_delete_predicates called");
if predicates.is_empty() {
return RowIDsOption::new_none();
}
@ -1404,6 +1442,18 @@ impl Predicate {
}
}
impl Display for &Predicate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for (i, expr) in self.0.iter().enumerate() {
expr.fmt(f)?;
if i < self.0.len() - 1 {
write!(f, " AND ")?;
}
}
Ok(())
}
}
impl From<Vec<BinaryExpr>> for Predicate {
fn from(arr: Vec<BinaryExpr>) -> Self {
Self(arr)
@ -1505,6 +1555,12 @@ impl BinaryExpr {
}
}
impl Display for BinaryExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} {} {:?}", self.column(), self.op, self.value)
}
}
impl From<(&str, &str, &str)> for BinaryExpr {
fn from(expr: (&str, &str, &str)) -> Self {
Self::new(
@ -1830,12 +1886,26 @@ impl ReadFilterResult<'_> {
pub fn schema(&self) -> &ResultSchema {
&self.schema
}
// Number of rows in result
pub fn num_rows(&self) -> usize {
match self.is_empty() {
true => 0,
false => self.data[0].len(),
}
}
// Number of columns in result
pub fn num_columns(&self) -> usize {
self.data.len()
}
}
impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
type Error = Error;
fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> {
let now = std::time::Instant::now();
let schema = ::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaConversion { source })?;
@ -1878,6 +1948,7 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
.collect::<Result<Vec<_>, _>>()?;
let arrow_schema: arrow::datatypes::SchemaRef = schema.into();
debug!(elapsed=?now.elapsed(), "result converted to record batch");
// try_new only returns an error if the schema is invalid or the number
// of rows on columns differ. We have full control over both so there

View File

@ -904,7 +904,7 @@ impl ReadFilterResults {
self.row_groups.is_empty()
}
pub fn len(&self) -> usize {
pub fn row_groups(&self) -> usize {
self.row_groups.len()
}

View File

@ -307,7 +307,7 @@ impl QueryChunk for DbChunk {
let rb_predicate = match to_read_buffer_predicate(predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back");
debug!(?predicate, %e, "Cannot push down predicate to RUB, will fully scan");
return Ok(PredicateMatch::Unknown);
}
};
@ -350,7 +350,7 @@ impl QueryChunk for DbChunk {
// Predicate is not required to be applied for correctness. We only pushed it down
// when possible for performance gain
debug!(?predicate, "Input Predicate to read_filter");
debug!(table=?self.table_name(), chunk_id=%self.addr().chunk_id, ?predicate, ?selection, "read_filter called");
self.access_recorder.record_access();
let delete_predicates: Vec<_> = self
@ -363,10 +363,7 @@ impl QueryChunk for DbChunk {
// merge the negated delete predicates into the select predicate
let mut pred_with_deleted_exprs = predicate.clone();
pred_with_deleted_exprs.merge_delete_predicates(&delete_predicates);
debug!(
?pred_with_deleted_exprs,
"Input Predicate plus deleted ranges and deleted predicates"
);
debug!(?pred_with_deleted_exprs, "Merged negated predicate");
match &self.state {
State::MutableBuffer { chunk, .. } => {
@ -382,7 +379,7 @@ impl QueryChunk for DbChunk {
// predicate.
.validate_predicate(to_read_buffer_predicate(predicate).unwrap_or_default())
.unwrap_or_default();
debug!(?rb_predicate, "Predicate pushed down to RUB");
debug!(?rb_predicate, "RUB predicate");
// combine all delete expressions to RUB's negated ones
let negated_delete_exprs = Self::to_rub_negated_predicates(&delete_predicates)?