feat: split "pruned" metric into "early" and "late" (#5645)
* feat: split "pruned" metric into "early" and "late"
* docs: improve
* docs: explain `PruningMetrics`
* test: try to test pruning

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
parent f7b6f81fe1
commit 513fdf1e26
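In short, the single `("result", "pruned")` attribute on the three `query_pruner_*` counters is replaced by `("result", "pruned_early")` and `("result", "pruned_late")`. A minimal sketch of the resulting label scheme, using only the in-repo `metric` crate calls that appear in this diff (the function name is illustrative; `registry` stands in for the process-wide `metric::Registry`):

```rust
use metric::U64Counter;

fn register_pruner_chunk_counters(registry: &metric::Registry) {
    let chunks = registry.register_metric::<U64Counter>(
        "query_pruner_chunks",
        "Number of chunks seen by the statistics-based chunk pruner",
    );

    // Pruned on cheap metadata (currently only "time"), before the chunk is built:
    let pruned_early = chunks.recorder(&[("result", "pruned_early")]);
    // Pruned after chunk creation, when richer statistics are available:
    let pruned_late = chunks.recorder(&[("result", "pruned_late")]);

    pruned_early.inc(1);
    pruned_late.inc(1);
}
```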
@@ -251,6 +251,13 @@ mod tests {
             .flag_for_delete() // will be pruned because of soft delete
             .await;
 
+        let builder = TestParquetFileBuilder::default()
+            .with_line_protocol("cpu,host=z load=0 0")
+            .with_max_seq(2)
+            .with_min_time(22)
+            .with_max_time(22);
+        partition_cpu_a_1.create_parquet_file(builder).await;
+
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol("cpu,host=a load=3 33")
             .with_max_seq(3)
@@ -318,7 +325,7 @@ mod tests {
 
         assert_query_with_span_ctx(
             &querier_namespace,
-            "SELECT * FROM cpu ORDER BY host,time",
+            "SELECT * FROM cpu WHERE host != 'z' ORDER BY host,time",
             &[
                 "+-----+------+------+--------------------------------+",
                 "| foo | host | load | time                           |",
@@ -348,7 +355,7 @@ mod tests {
             reporter
                 .metric("query_pruner_chunks")
                 .unwrap()
-                .observation(&[("result", "pruned")])
+                .observation(&[("result", "pruned_early")])
                 .unwrap(),
             &Observation::U64Counter(0),
         );
@@ -356,7 +363,7 @@ mod tests {
             reporter
                 .metric("query_pruner_rows")
                 .unwrap()
-                .observation(&[("result", "pruned")])
+                .observation(&[("result", "pruned_early")])
                 .unwrap(),
             &Observation::U64Counter(0),
         );
@@ -364,7 +371,31 @@ mod tests {
             reporter
                 .metric("query_pruner_bytes")
                 .unwrap()
-                .observation(&[("result", "pruned")])
+                .observation(&[("result", "pruned_early")])
                 .unwrap(),
             &Observation::U64Counter(0),
         );
+        assert_eq!(
+            reporter
+                .metric("query_pruner_chunks")
+                .unwrap()
+                .observation(&[("result", "pruned_late")])
+                .unwrap(),
+            &Observation::U64Counter(0),
+        );
+        assert_eq!(
+            reporter
+                .metric("query_pruner_rows")
+                .unwrap()
+                .observation(&[("result", "pruned_late")])
+                .unwrap(),
+            &Observation::U64Counter(0),
+        );
+        assert_eq!(
+            reporter
+                .metric("query_pruner_bytes")
+                .unwrap()
+                .observation(&[("result", "pruned_late")])
+                .unwrap(),
+            &Observation::U64Counter(0),
+        );
@@ -374,7 +405,7 @@ mod tests {
                 .unwrap()
                 .observation(&[("result", "not_pruned")])
                 .unwrap(),
-            &Observation::U64Counter(0),
+            &Observation::U64Counter(5),
         );
         assert_eq!(
             reporter
@@ -382,51 +413,51 @@ mod tests {
                 .unwrap()
                 .observation(&[("result", "not_pruned")])
                 .unwrap(),
-            &Observation::U64Counter(0),
-        );
-        assert_eq!(
-            reporter
-                .metric("query_pruner_bytes")
-                .unwrap()
-                .observation(&[("result", "not_pruned")])
-                .unwrap(),
-            &Observation::U64Counter(0),
-        );
-        assert_eq!(
-            reporter
-                .metric("query_pruner_chunks")
-                .unwrap()
-                .observation(&[
-                    ("result", "could_not_prune"),
-                    ("reason", "No expression on predicate")
-                ])
-                .unwrap(),
-            &Observation::U64Counter(4),
-        );
-        assert_eq!(
-            reporter
-                .metric("query_pruner_rows")
-                .unwrap()
-                .observation(&[
-                    ("result", "could_not_prune"),
-                    ("reason", "No expression on predicate")
-                ])
-                .unwrap(),
-            &Observation::U64Counter(4),
+            &Observation::U64Counter(5),
         );
         if let Observation::U64Counter(bytes) = reporter
             .metric("query_pruner_bytes")
             .unwrap()
-            .observation(&[
-                ("result", "could_not_prune"),
-                ("reason", "No expression on predicate"),
-            ])
+            .observation(&[("result", "not_pruned")])
             .unwrap()
         {
             assert!(*bytes > 6000, "bytes ({bytes}) must be > 6000");
         } else {
             panic!("Wrong metrics type");
         }
+        assert_eq!(
+            reporter
+                .metric("query_pruner_chunks")
+                .unwrap()
+                .observation(&[
+                    ("result", "could_not_prune"),
+                    ("reason", "No expression on predicate")
+                ])
+                .unwrap(),
+            &Observation::U64Counter(0),
+        );
+        assert_eq!(
+            reporter
+                .metric("query_pruner_rows")
+                .unwrap()
+                .observation(&[
+                    ("result", "could_not_prune"),
+                    ("reason", "No expression on predicate")
+                ])
+                .unwrap(),
+            &Observation::U64Counter(0),
+        );
+        assert_eq!(
+            reporter
+                .metric("query_pruner_bytes")
+                .unwrap()
+                .observation(&[
+                    ("result", "could_not_prune"),
+                    ("reason", "No expression on predicate")
+                ])
+                .unwrap(),
+            &Observation::U64Counter(0),
+        );
 
         assert_query(
             &querier_namespace,
@@ -458,7 +489,7 @@ mod tests {
                 "| logical_plan | Projection: #cpu.foo, #cpu.host, #cpu.load, #cpu.time |",
                 "| | TableScan: cpu projection=[foo, host, load, time] |",
                 "| physical_plan | ProjectionExec: expr=[foo@0 as foo, host@1 as host, load@2 as load, time@3 as time] |",
-                "| | IOxReadFilterNode: table_name=cpu, chunks=4 predicate=Predicate |",
+                "| | IOxReadFilterNode: table_name=cpu, chunks=5 predicate=Predicate |",
                 "| | |",
                 "+---------------+-------------------------------------------------------------------------------------+",
             ],
@@ -519,6 +550,7 @@ mod tests {
                 "|     | a    | 3    | 1970-01-01T00:00:00.000000033Z |",
                 "|     | a    | 14   | 1970-01-01T00:00:00.000010001Z |", // load has most recent value 14
                 "|     | b    | 5    | 1970-01-01T00:00:00.000000011Z |",
+                "|     | z    | 0    | 1970-01-01T00:00:00Z           |",
                 "+-----+------+------+--------------------------------+",
             ],
         )
@@ -543,7 +575,7 @@ mod tests {
                 "| | UnionExec |",
                 "| | IOxReadFilterNode: table_name=cpu, chunks=1 predicate=Predicate |",
                 "| | IOxReadFilterNode: table_name=cpu, chunks=1 predicate=Predicate |",
-                "| | IOxReadFilterNode: table_name=cpu, chunks=3 predicate=Predicate |",
+                "| | IOxReadFilterNode: table_name=cpu, chunks=4 predicate=Predicate |",
                 "| | |",
                 "+---------------+-------------------------------------------------------------------------------------+",
             ],
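For context, the assertions above read the counters back from the registry through a reporter. A condensed sketch of that pattern; the `RawReporter` snapshot step is assumed from how these tests are set up elsewhere in this codebase and is not itself shown in the diff:

```rust
use metric::{Observation, RawReporter};

fn read_pruner_counter(registry: &metric::Registry) {
    // Snapshot every registered metric (assumed setup; not part of this diff).
    let mut reporter = RawReporter::default();
    registry.report(&mut reporter);

    // One observation exists per (metric name, attribute set) pair.
    let obs = reporter
        .metric("query_pruner_chunks")
        .unwrap()
        .observation(&[("result", "pruned_early")])
        .unwrap();
    assert_eq!(obs, &Observation::U64Counter(0));
}
```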
@@ -23,7 +23,7 @@ use std::{
 };
 use trace::span::{Span, SpanRecorder};
 
-pub use self::query_access::PruneMetrics;
+pub use self::query_access::metrics::PruneMetrics;
 
 mod query_access;
 mod state_reconciler;
@@ -1,338 +0,0 @@
-use std::{any::Any, sync::Arc};
-
-use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
-use datafusion::{
-    datasource::{TableProvider, TableType},
-    error::DataFusionError,
-    execution::context::SessionState,
-    logical_expr::TableProviderFilterPushDown,
-    logical_plan::Expr,
-    physical_plan::ExecutionPlan,
-};
-use iox_query::{
-    exec::{ExecutorType, SessionContextIOxExt},
-    provider::{ChunkPruner, Error as ProviderError, ProviderBuilder},
-    pruning::{prune_chunks, NotPrunedReason, PruningObserver},
-    QueryChunk,
-};
-use metric::U64Counter;
-use predicate::Predicate;
-use schema::Schema;
-
-use crate::{chunk::QuerierChunk, ingester::IngesterChunk};
-
-use super::QuerierTable;
-
-#[async_trait]
-impl TableProvider for QuerierTable {
-    fn as_any(&self) -> &dyn Any {
-        self as &dyn Any
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.schema().as_arrow()
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    async fn scan(
-        &self,
-        ctx: &SessionState,
-        projection: &Option<Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        // build provider out of all chunks
-        // TODO: push down some predicates to catalog
-        let iox_ctx = self.exec.new_context_from_df(ExecutorType::Query, ctx);
-
-        let mut builder =
-            ProviderBuilder::new(self.table_name(), Arc::clone(self.schema()), iox_ctx);
-
-        let pruning_predicate = Predicate::default().with_pushdown_exprs(filters);
-        let chunks = self
-            .chunks(&pruning_predicate, ctx.child_span("querier table chunks"))
-            .await
-            .map_err(|e| DataFusionError::External(Box::new(e)))?;
-
-        for chunk in chunks {
-            builder = builder.add_chunk(chunk);
-        }
-
-        let provider = match builder.build() {
-            Ok(provider) => provider,
-            Err(e) => panic!("unexpected error: {:?}", e),
-        };
-
-        provider.scan(ctx, projection, filters, limit).await
-    }
-
-    fn supports_filter_pushdown(
-        &self,
-        _filter: &Expr,
-    ) -> Result<TableProviderFilterPushDown, DataFusionError> {
-        // we may apply filtering (via pruning) but can not guarantee
-        // that the filter catches all row during scan
-        Ok(TableProviderFilterPushDown::Inexact)
-    }
-}
-
-#[derive(Debug)]
-pub struct QuerierTableChunkPruner {
-    max_bytes: usize,
-    metrics: Arc<PruneMetrics>,
-}
-
-impl QuerierTableChunkPruner {
-    pub fn new(max_bytes: usize, metrics: Arc<PruneMetrics>) -> Self {
-        Self { max_bytes, metrics }
-    }
-}
-
-impl ChunkPruner for QuerierTableChunkPruner {
-    fn prune_chunks(
-        &self,
-        _table_name: &str,
-        table_schema: Arc<Schema>,
-        chunks: Vec<Arc<dyn QueryChunk>>,
-        predicate: &Predicate,
-    ) -> Result<Vec<Arc<dyn QueryChunk>>, ProviderError> {
-        let observer = &MetricPruningObserver::new(Arc::clone(&self.metrics));
-
-        let chunks = match prune_chunks(table_schema, &chunks, predicate) {
-            Ok(keeps) => {
-                assert_eq!(chunks.len(), keeps.len());
-                chunks
-                    .into_iter()
-                    .zip(keeps.iter())
-                    .filter_map(|(chunk, keep)| {
-                        if *keep {
-                            observer.was_not_pruned(chunk.as_ref());
-                            Some(chunk)
-                        } else {
-                            observer.was_pruned(chunk.as_ref());
-                            None
-                        }
-                    })
-                    .collect()
-            }
-            Err(reason) => {
-                for chunk in &chunks {
-                    observer.could_not_prune(reason, chunk.as_ref())
-                }
-                chunks
-            }
-        };
-
-        let estimated_bytes = chunks
-            .iter()
-            .map(|chunk| chunk_estimate_size(chunk.as_ref()))
-            .sum::<usize>();
-        if estimated_bytes > self.max_bytes {
-            return Err(ProviderError::TooMuchData {
-                actual_bytes: estimated_bytes,
-                limit_bytes: self.max_bytes,
-            });
-        }
-
-        Ok(chunks)
-    }
-}
-
-pub(crate) struct MetricPruningObserver {
-    metrics: Arc<PruneMetrics>,
-}
-
-impl MetricPruningObserver {
-    pub(crate) fn new(metrics: Arc<PruneMetrics>) -> Self {
-        Self { metrics }
-    }
-
-    /// Called when pruning a chunk before fully creating the chunk structure
-    pub(crate) fn was_pruned_early(&self, row_count: u64, size_estimate: u64) {
-        self.metrics.chunks_pruned.inc(1);
-        self.metrics.rows_pruned.inc(row_count);
-        self.metrics.bytes_pruned.inc(size_estimate);
-    }
-}
-
-impl PruningObserver for MetricPruningObserver {
-    fn was_pruned(&self, chunk: &dyn QueryChunk) {
-        self.metrics.chunks_pruned.inc(1);
-        self.metrics.rows_pruned.inc(chunk_rows(chunk) as u64);
-        self.metrics
-            .bytes_pruned
-            .inc(chunk_estimate_size(chunk) as u64);
-    }
-
-    fn was_not_pruned(&self, chunk: &dyn QueryChunk) {
-        self.metrics.chunks_not_pruned.inc(1);
-        self.metrics.rows_not_pruned.inc(chunk_rows(chunk) as u64);
-        self.metrics
-            .bytes_not_pruned
-            .inc(chunk_estimate_size(chunk) as u64);
-    }
-
-    fn could_not_prune(&self, reason: NotPrunedReason, chunk: &dyn QueryChunk) {
-        let (chunks, rows, bytes) = match reason {
-            NotPrunedReason::NoExpressionOnPredicate => (
-                &self.metrics.chunks_could_not_prune_no_expression,
-                &self.metrics.rows_could_not_prune_no_expression,
-                &self.metrics.bytes_could_not_prune_no_expression,
-            ),
-            NotPrunedReason::CanNotCreatePruningPredicate => (
-                &self.metrics.chunks_could_not_prune_cannot_create_predicate,
-                &self.metrics.rows_could_not_prune_cannot_create_predicate,
-                &self.metrics.bytes_could_not_prune_cannot_create_predicate,
-            ),
-            NotPrunedReason::DataFusionPruningFailed => (
-                &self.metrics.chunks_could_not_prune_df,
-                &self.metrics.rows_could_not_prune_df,
-                &self.metrics.bytes_could_not_prune_df,
-            ),
-        };
-
-        chunks.inc(1);
-        rows.inc(chunk_rows(chunk) as u64);
-        bytes.inc(chunk_estimate_size(chunk) as u64);
-    }
-}
-
-#[derive(Debug)]
-pub struct PruneMetrics {
-    // number of chunks
-    chunks_pruned: U64Counter,
-    chunks_not_pruned: U64Counter,
-    chunks_could_not_prune_no_expression: U64Counter,
-    chunks_could_not_prune_cannot_create_predicate: U64Counter,
-    chunks_could_not_prune_df: U64Counter,
-
-    // number of rows
-    rows_pruned: U64Counter,
-    rows_not_pruned: U64Counter,
-    rows_could_not_prune_no_expression: U64Counter,
-    rows_could_not_prune_cannot_create_predicate: U64Counter,
-    rows_could_not_prune_df: U64Counter,
-
-    // size in bytes
-    bytes_pruned: U64Counter,
-    bytes_not_pruned: U64Counter,
-    bytes_could_not_prune_no_expression: U64Counter,
-    bytes_could_not_prune_cannot_create_predicate: U64Counter,
-    bytes_could_not_prune_df: U64Counter,
-}
-
-impl PruneMetrics {
-    pub fn new(metric_registry: &metric::Registry) -> Self {
-        let chunks = metric_registry.register_metric::<U64Counter>(
-            "query_pruner_chunks",
-            "Number of chunks seen by the statistics-based chunk pruner",
-        );
-        let chunks_pruned = chunks.recorder(&[("result", "pruned")]);
-        let chunks_not_pruned = chunks.recorder(&[("result", "not_pruned")]);
-        let chunks_could_not_prune_no_expression = chunks.recorder(&[
-            ("result", "could_not_prune"),
-            ("reason", NotPrunedReason::NoExpressionOnPredicate.name()),
-        ]);
-        let chunks_could_not_prune_cannot_create_predicate = chunks.recorder(&[
-            ("result", "could_not_prune"),
-            (
-                "reason",
-                NotPrunedReason::CanNotCreatePruningPredicate.name(),
-            ),
-        ]);
-        let chunks_could_not_prune_df = chunks.recorder(&[
-            ("result", "could_not_prune"),
-            ("reason", NotPrunedReason::DataFusionPruningFailed.name()),
-        ]);
-
-        let rows = metric_registry.register_metric::<U64Counter>(
-            "query_pruner_rows",
-            "Number of rows seen by the statistics-based chunk pruner",
-        );
-        let rows_pruned = rows.recorder(&[("result", "pruned")]);
-        let rows_not_pruned = rows.recorder(&[("result", "not_pruned")]);
-        let rows_could_not_prune_no_expression = rows.recorder(&[
-            ("result", "could_not_prune"),
-            ("reason", NotPrunedReason::NoExpressionOnPredicate.name()),
-        ]);
-        let rows_could_not_prune_cannot_create_predicate = rows.recorder(&[
-            ("result", "could_not_prune"),
-            (
-                "reason",
-                NotPrunedReason::CanNotCreatePruningPredicate.name(),
-            ),
-        ]);
-        let rows_could_not_prune_df = rows.recorder(&[
-            ("result", "could_not_prune"),
-            ("reason", NotPrunedReason::DataFusionPruningFailed.name()),
-        ]);
-
-        let bytes = metric_registry.register_metric::<U64Counter>(
-            "query_pruner_bytes",
-            "Size (in bytes) of chunks seen by the statistics-based chunk pruner",
-        );
-        let bytes_pruned = bytes.recorder(&[("result", "pruned")]);
-        let bytes_not_pruned = bytes.recorder(&[("result", "not_pruned")]);
-        let bytes_could_not_prune_no_expression = bytes.recorder(&[
-            ("result", "could_not_prune"),
-            ("reason", NotPrunedReason::NoExpressionOnPredicate.name()),
-        ]);
-        let bytes_could_not_prune_cannot_create_predicate = bytes.recorder(&[
-            ("result", "could_not_prune"),
-            (
-                "reason",
-                NotPrunedReason::CanNotCreatePruningPredicate.name(),
-            ),
-        ]);
-        let bytes_could_not_prune_df = bytes.recorder(&[
-            ("result", "could_not_prune"),
-            ("reason", NotPrunedReason::DataFusionPruningFailed.name()),
-        ]);
-
-        Self {
-            chunks_pruned,
-            chunks_not_pruned,
-            chunks_could_not_prune_no_expression,
-            chunks_could_not_prune_cannot_create_predicate,
-            chunks_could_not_prune_df,
-            rows_pruned,
-            rows_not_pruned,
-            rows_could_not_prune_no_expression,
-            rows_could_not_prune_cannot_create_predicate,
-            rows_could_not_prune_df,
-            bytes_pruned,
-            bytes_not_pruned,
-            bytes_could_not_prune_no_expression,
-            bytes_could_not_prune_cannot_create_predicate,
-            bytes_could_not_prune_df,
-        }
-    }
-}
-
-fn chunk_estimate_size(chunk: &dyn QueryChunk) -> usize {
-    let chunk = chunk.as_any();
-
-    if let Some(chunk) = chunk.downcast_ref::<IngesterChunk>() {
-        chunk.estimate_size()
-    } else if let Some(chunk) = chunk.downcast_ref::<QuerierChunk>() {
-        chunk.estimate_size()
-    } else {
-        panic!("Unknown chunk type")
-    }
-}
-
-fn chunk_rows(chunk: &dyn QueryChunk) -> usize {
-    let chunk = chunk.as_any();
-
-    if let Some(chunk) = chunk.downcast_ref::<QuerierChunk>() {
-        chunk.rows()
-    } else if let Some(chunk) = chunk.downcast_ref::<IngesterChunk>() {
-        chunk.rows()
-    } else {
-        panic!("Unknown chunk type");
-    }
-}
@@ -0,0 +1,127 @@
+use iox_query::pruning::NotPrunedReason;
+use metric::{Attributes, U64Counter};
+
+#[derive(Debug)]
+pub struct PruneMetricsGroup {
+    /// number of chunks
+    chunks: U64Counter,
+
+    /// number of rows
+    rows: U64Counter,
+
+    /// estimated size in bytes
+    bytes: U64Counter,
+}
+
+impl PruneMetricsGroup {
+    fn new(metric_registry: &metric::Registry, attributes: impl Into<Attributes>) -> Self {
+        let attributes: Attributes = attributes.into();
+
+        let chunks = metric_registry
+            .register_metric::<U64Counter>(
+                "query_pruner_chunks",
+                "Number of chunks seen by the statistics-based chunk pruner",
+            )
+            .recorder(attributes.clone());
+
+        let rows = metric_registry
+            .register_metric::<U64Counter>(
+                "query_pruner_rows",
+                "Number of rows seen by the statistics-based chunk pruner",
+            )
+            .recorder(attributes.clone());
+
+        let bytes = metric_registry
+            .register_metric::<U64Counter>(
+                "query_pruner_bytes",
+                "Size (estimated bytes) of chunks seen by the statistics-based chunk pruner",
+            )
+            .recorder(attributes);
+
+        Self {
+            chunks,
+            rows,
+            bytes,
+        }
+    }
+
+    pub fn inc(&self, chunks: u64, rows: u64, bytes: u64) {
+        self.chunks.inc(chunks);
+        self.rows.inc(rows);
+        self.bytes.inc(bytes);
+    }
+}
+
+#[derive(Debug)]
+pub struct PruneMetrics {
+    /// Chunks that have been pruned based on cheaply-available metadata.
+    ///
+    /// This was done before the actual [`QueryChunk`](iox_query::QueryChunk) was created because the latter needs some
+    /// slightly more expensive data like the partition sort key.
+    ///
+    /// At the moment we can prune chunks early only based on "time".
+    pub pruned_early: PruneMetricsGroup,
+
+    /// Chunks that have been pruned after they have been created. At this stage we likely had better/more statistics
+    /// available.
+    pub pruned_late: PruneMetricsGroup,
+
+    /// The pruning process worked but the chunk was not pruned and needs to be scanned.
+    pub not_pruned: PruneMetricsGroup,
+
+    /// We could not prune these chunks because there was no filter expression available.
+    ///
+    /// This may happen for "scan all" type of queries.
+    pub could_not_prune_no_expression: PruneMetricsGroup,
+
+    /// We could not prune these chunks because we were unable to create the DataFusion pruning predicate. This is most
+    /// likely a missing feature in DataFusion.
+    pub could_not_prune_cannot_create_predicate: PruneMetricsGroup,
+
+    /// We could not prune these chunks because DataFusion failed to apply the pruning predicate to the chunks. This is
+    /// most likely a missing feature in DataFusion.
+    pub could_not_prune_df: PruneMetricsGroup,
+}
+
+impl PruneMetrics {
+    pub fn new(metric_registry: &metric::Registry) -> Self {
+        let pruned_early = PruneMetricsGroup::new(metric_registry, &[("result", "pruned_early")]);
+        let pruned_late = PruneMetricsGroup::new(metric_registry, &[("result", "pruned_late")]);
+        let not_pruned = PruneMetricsGroup::new(metric_registry, &[("result", "not_pruned")]);
+        let could_not_prune_no_expression = PruneMetricsGroup::new(
+            metric_registry,
+            &[
+                ("result", "could_not_prune"),
+                ("reason", NotPrunedReason::NoExpressionOnPredicate.name()),
+            ],
+        );
+        let could_not_prune_cannot_create_predicate = PruneMetricsGroup::new(
+            metric_registry,
+            &[
+                ("result", "could_not_prune"),
+                (
+                    "reason",
+                    NotPrunedReason::CanNotCreatePruningPredicate.name(),
+                ),
+            ],
+        );
+        let could_not_prune_df = PruneMetricsGroup::new(
+            metric_registry,
+            &[
+                ("result", "could_not_prune"),
+                ("reason", NotPrunedReason::DataFusionPruningFailed.name()),
+            ],
+        );
+
+        Self {
+            pruned_early,
+            pruned_late,
+            not_pruned,
+            could_not_prune_no_expression,
+            could_not_prune_cannot_create_predicate,
+            could_not_prune_df,
+        }
+    }
+}
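The new module collapses the previous fifteen hand-wired counters into six `PruneMetricsGroup`s, each spanning chunks, rows, and bytes. A short usage sketch against the API added above (the function name and counts are illustrative):

```rust
fn record_pruning_outcomes(registry: &metric::Registry) {
    // One constructor call fans out into chunk/row/byte counters per outcome.
    let metrics = PruneMetrics::new(registry);

    // A chunk pruned on cheap metadata: 1 chunk, 10 rows, ~1 KiB.
    metrics.pruned_early.inc(1, 10, 1024);

    // A chunk that survived pruning and must be scanned.
    metrics.not_pruned.inc(1, 250, 65_536);
}
```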
@@ -0,0 +1,222 @@
+use std::{any::Any, sync::Arc};
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use datafusion::{
+    datasource::{TableProvider, TableType},
+    error::DataFusionError,
+    execution::context::SessionState,
+    logical_expr::TableProviderFilterPushDown,
+    logical_plan::Expr,
+    physical_plan::ExecutionPlan,
+};
+use iox_query::{
+    exec::{ExecutorType, SessionContextIOxExt},
+    provider::{ChunkPruner, Error as ProviderError, ProviderBuilder},
+    pruning::{prune_chunks, NotPrunedReason, PruningObserver},
+    QueryChunk,
+};
+use predicate::Predicate;
+use schema::Schema;
+
+use crate::{chunk::QuerierChunk, ingester::IngesterChunk};
+
+use self::metrics::PruneMetrics;
+
+use super::QuerierTable;
+
+pub mod metrics;
+
+#[async_trait]
+impl TableProvider for QuerierTable {
+    fn as_any(&self) -> &dyn Any {
+        self as &dyn Any
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema().as_arrow()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        // build provider out of all chunks
+        // TODO: push down some predicates to catalog
+        let iox_ctx = self.exec.new_context_from_df(ExecutorType::Query, ctx);
+
+        let mut builder =
+            ProviderBuilder::new(self.table_name(), Arc::clone(self.schema()), iox_ctx);
+
+        let pruning_predicate = filters
+            .iter()
+            .cloned()
+            .fold(Predicate::default(), Predicate::with_expr);
+        let chunks = self
+            .chunks(&pruning_predicate, ctx.child_span("querier table chunks"))
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        for chunk in chunks {
+            builder = builder.add_chunk(chunk);
+        }
+
+        let provider = match builder.build() {
+            Ok(provider) => provider,
+            Err(e) => panic!("unexpected error: {:?}", e),
+        };
+
+        provider.scan(ctx, projection, filters, limit).await
+    }
+
+    fn supports_filter_pushdown(
+        &self,
+        _filter: &Expr,
+    ) -> Result<TableProviderFilterPushDown, DataFusionError> {
+        // we may apply filtering (via pruning) but can not guarantee
+        // that the filter catches all rows during scan
+        Ok(TableProviderFilterPushDown::Inexact)
+    }
+}
+
+#[derive(Debug)]
+pub struct QuerierTableChunkPruner {
+    max_bytes: usize,
+    metrics: Arc<PruneMetrics>,
+}
+
+impl QuerierTableChunkPruner {
+    pub fn new(max_bytes: usize, metrics: Arc<PruneMetrics>) -> Self {
+        Self { max_bytes, metrics }
+    }
+}
+
+impl ChunkPruner for QuerierTableChunkPruner {
+    fn prune_chunks(
+        &self,
+        _table_name: &str,
+        table_schema: Arc<Schema>,
+        chunks: Vec<Arc<dyn QueryChunk>>,
+        predicate: &Predicate,
+    ) -> Result<Vec<Arc<dyn QueryChunk>>, ProviderError> {
+        let observer = &MetricPruningObserver::new(Arc::clone(&self.metrics));
+
+        let chunks = match prune_chunks(table_schema, &chunks, predicate) {
+            Ok(keeps) => {
+                assert_eq!(chunks.len(), keeps.len());
+                chunks
+                    .into_iter()
+                    .zip(keeps.iter())
+                    .filter_map(|(chunk, keep)| {
+                        if *keep {
+                            observer.was_not_pruned(chunk.as_ref());
+                            Some(chunk)
+                        } else {
+                            observer.was_pruned(chunk.as_ref());
+                            None
+                        }
+                    })
+                    .collect()
+            }
+            Err(reason) => {
+                for chunk in &chunks {
+                    observer.could_not_prune(reason, chunk.as_ref())
+                }
+                chunks
+            }
+        };
+
+        let estimated_bytes = chunks
+            .iter()
+            .map(|chunk| chunk_estimate_size(chunk.as_ref()))
+            .sum::<usize>();
+        if estimated_bytes > self.max_bytes {
+            return Err(ProviderError::TooMuchData {
+                actual_bytes: estimated_bytes,
+                limit_bytes: self.max_bytes,
+            });
+        }
+
+        Ok(chunks)
+    }
+}
+
+pub(crate) struct MetricPruningObserver {
+    metrics: Arc<PruneMetrics>,
+}
+
+impl MetricPruningObserver {
+    pub(crate) fn new(metrics: Arc<PruneMetrics>) -> Self {
+        Self { metrics }
+    }
+
+    /// Called when pruning a chunk before fully creating the chunk structure
+    pub(crate) fn was_pruned_early(&self, row_count: u64, size_estimate: u64) {
+        self.metrics.pruned_early.inc(1, row_count, size_estimate);
+    }
+}
+
+impl PruningObserver for MetricPruningObserver {
+    fn was_pruned(&self, chunk: &dyn QueryChunk) {
+        self.metrics.pruned_late.inc(
+            1,
+            chunk_rows(chunk) as u64,
+            chunk_estimate_size(chunk) as u64,
+        );
+    }
+
+    fn was_not_pruned(&self, chunk: &dyn QueryChunk) {
+        self.metrics.not_pruned.inc(
+            1,
+            chunk_rows(chunk) as u64,
+            chunk_estimate_size(chunk) as u64,
+        );
+    }
+
+    fn could_not_prune(&self, reason: NotPrunedReason, chunk: &dyn QueryChunk) {
+        let group = match reason {
+            NotPrunedReason::NoExpressionOnPredicate => &self.metrics.could_not_prune_no_expression,
+            NotPrunedReason::CanNotCreatePruningPredicate => {
+                &self.metrics.could_not_prune_cannot_create_predicate
+            }
+            NotPrunedReason::DataFusionPruningFailed => &self.metrics.could_not_prune_df,
+        };
+
+        group.inc(
+            1,
+            chunk_rows(chunk) as u64,
+            chunk_estimate_size(chunk) as u64,
+        );
+    }
+}
+
+fn chunk_estimate_size(chunk: &dyn QueryChunk) -> usize {
+    let chunk = chunk.as_any();
+
+    if let Some(chunk) = chunk.downcast_ref::<IngesterChunk>() {
+        chunk.estimate_size()
+    } else if let Some(chunk) = chunk.downcast_ref::<QuerierChunk>() {
+        chunk.estimate_size()
+    } else {
+        panic!("Unknown chunk type")
+    }
+}
+
+fn chunk_rows(chunk: &dyn QueryChunk) -> usize {
+    let chunk = chunk.as_any();
+
+    if let Some(chunk) = chunk.downcast_ref::<QuerierChunk>() {
+        chunk.rows()
+    } else if let Some(chunk) = chunk.downcast_ref::<IngesterChunk>() {
+        chunk.rows()
+    } else {
+        panic!("Unknown chunk type");
+    }
+}
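Note that the pruning predicate is now assembled by folding every pushed-down filter into the predicate, rather than via the old `with_pushdown_exprs` call. The fold pattern in isolation (the `Predicate::with_expr` signature is taken from the diff above; the filter expression and function name are illustrative):

```rust
use datafusion::prelude::{col, lit};
use predicate::Predicate;

fn build_pruning_predicate() -> Predicate {
    // Filters as DataFusion hands them to `scan` (illustrative value).
    let filters = vec![col("host").not_eq(lit("z"))];

    // Each expression is folded into a single pruning predicate, mirroring
    // `filters.iter().cloned().fold(Predicate::default(), Predicate::with_expr)`.
    filters
        .into_iter()
        .fold(Predicate::default(), Predicate::with_expr)
}
```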
@@ -1,4 +1,4 @@
-use super::{query_access::PruneMetrics, QuerierTable, QuerierTableArgs};
+use super::{PruneMetrics, QuerierTable, QuerierTableArgs};
 use crate::{
     cache::CatalogCache, chunk::ChunkAdapter, create_ingester_connection_for_testing,
     IngesterPartition, QuerierChunkLoadSetting,