feat: Disable using statistics to query data if there are soft deleted rows

pull/24376/head
Nga Tran 2021-10-05 17:52:32 -04:00
parent 5177e69514
commit aa64daca86
10 changed files with 135 additions and 27 deletions

View File

@ -48,8 +48,16 @@ pub trait QueryChunkMeta: Sized {
    /// return a reference to the schema of the data held in this chunk
    fn schema(&self) -> Arc<Schema>;

    /// return a reference to delete predicates of the chunk
    fn delete_predicates(&self) -> &[Arc<DeletePredicate>];

    /// return true if the chunk has delete predicates
    fn has_delete_predicates(&self) -> bool {
        !self.delete_predicates().is_empty()
    }
}
/// A `Database` is the main trait implemented by the IOx subsystems

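Because has_delete_predicates is a provided method, every implementor of QueryChunkMeta picks it up without code changes. A minimal standalone sketch of the pattern (simplified stand-in types, not the real IOx trait):

// Sketch only: `DeletePredicate` here is a stand-in for the IOx type.
struct DeletePredicate;

trait ChunkMeta {
    // required: implementors expose their delete predicates
    fn delete_predicates(&self) -> &[DeletePredicate];

    // provided: derived from the required method, so every existing
    // implementor gains it for free
    fn has_delete_predicates(&self) -> bool {
        !self.delete_predicates().is_empty()
    }
}

struct Chunk {
    predicates: Vec<DeletePredicate>,
}

impl ChunkMeta for Chunk {
    fn delete_predicates(&self) -> &[DeletePredicate] {
        &self.predicates
    }
}

fn main() {
    let chunk = Chunk { predicates: vec![DeletePredicate] };
    assert!(chunk.has_delete_predicates());
}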
View File

@ -3,6 +3,7 @@
use std::{fmt, sync::Arc};
use arrow::datatypes::SchemaRef;
use data_types::partition_metadata::TableSummary;
use datafusion::{
error::DataFusionError,
physical_plan::{
@ -161,15 +162,24 @@ impl<C: QueryChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
}
    fn statistics(&self) -> Statistics {
        let mut combined_summary_option: Option<TableSummary> = None;
        for chunk in &self.chunks {
            if chunk.has_delete_predicates() || chunk.may_contain_pk_duplicates() {
                // Do not use statistics if there is at least one delete predicate or
                // if the chunk may contain duplicates
                return Statistics::default();
            }
            combined_summary_option = match combined_summary_option {
                None => Some(chunk.summary().clone()),
                Some(mut combined_summary) => {
                    combined_summary.update_from(chunk.summary());
                    Some(combined_summary)
                }
            };
        }

        combined_summary_option
            .map(|combined_summary| {
                crate::statistics::df_from_iox(self.iox_schema.as_ref(), &combined_summary)
            })

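The shape of the rewrite above: fold per-chunk summaries into one combined summary, but bail out to unknown statistics as soon as any chunk has delete predicates or possible duplicates, since its summary would then over-count rows. A standalone sketch of that guard-and-fold pattern (toy Summary/Chunk types, not the real TableSummary or DataFusion Statistics):

// Toy stand-ins for the real TableSummary / Statistics types.
#[derive(Clone)]
struct Summary {
    row_count: usize,
}

impl Summary {
    fn update_from(&mut self, other: &Summary) {
        self.row_count += other.row_count;
    }
}

struct Chunk {
    summary: Summary,
    has_deletes: bool,
    may_have_duplicates: bool,
}

// Returns None (statistics unknown) if any chunk could make the combined
// summary over-count rows; otherwise folds all chunk summaries together.
fn combined_summary(chunks: &[Chunk]) -> Option<Summary> {
    let mut combined: Option<Summary> = None;
    for chunk in chunks {
        if chunk.has_deletes || chunk.may_have_duplicates {
            return None;
        }
        combined = match combined {
            None => Some(chunk.summary.clone()),
            Some(mut c) => {
                c.update_from(&chunk.summary);
                Some(c)
            }
        };
    }
    combined
}

fn main() {
    let chunks = vec![
        Chunk {
            summary: Summary { row_count: 2 },
            has_deletes: false,
            may_have_duplicates: false,
        },
        Chunk {
            summary: Summary { row_count: 3 },
            has_deletes: true,
            may_have_duplicates: false,
        },
    ];
    // the second chunk has deletes, so no statistics are reported
    assert!(combined_summary(&chunks).is_none());
}

Reporting unknown statistics is the safe fallback: the planner then scans the data instead of folding count(*) into a constant that ignores the soft deleted rows.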
View File

@ -0,0 +1,13 @@
-- Test Setup: ThreeDeleteThreeChunks
-- SQL: EXPLAIN SELECT count(*) from cpu;
+---------------+-------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------+
| logical_plan | Projection: #COUNT(UInt8(1)) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
| | TableScan: cpu projection=Some([0]) |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ProjectionExec: expr=[3 as COUNT(UInt8(1))] |
| | EmptyExec: produce_one_row=true |
+---------------+-------------------------------------------------------------+

View File

@ -0,0 +1,5 @@
-- Demonstrate plans that are not optimized using statistics
-- IOX_SETUP: ThreeDeleteThreeChunks
-- This plan should scan data as it reads from a chunk with delete predicates
EXPLAIN SELECT count(*) from cpu;

View File

@ -1,4 +1,4 @@
-- Test Setup: TwoMeasurementsManyFieldsOneRubChunk
-- SQL: EXPLAIN SELECT count(*) from h2o;
+---------------+-------------------------------------------------------------+
| plan_type | plan |
@ -12,22 +12,19 @@
| | EmptyExec: produce_one_row=true |
+---------------+-------------------------------------------------------------+
-- SQL: EXPLAIN SELECT count(*) from h2o where temp > 70.0 and temp < 72.0;
+---------------+---------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #COUNT(UInt8(1)) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
| | Filter: #h2o.temp > Float64(70) AND #h2o.temp < Float64(72) |
| | TableScan: h2o projection=Some([3]), filters=[#h2o.temp > Float64(70), #h2o.temp < Float64(72)] |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
| | HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))] |
| | CoalescePartitionsExec |
| | HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: temp@0 > 70 AND temp@0 < 72 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate exprs: [#temp > Float64(70), #temp < Float64(72)] |
+---------------+---------------------------------------------------------------------------------------------------------------------------------+

View File

@ -1,8 +1,9 @@
-- Demonstrate plans that are optimized using statistics
-- IOX_SETUP: TwoMeasurementsManyFieldsOneRubChunk
-- This plan should not scan data: it reads from a RUB chunk (no duplicates) containing no soft deleted data
EXPLAIN SELECT count(*) from h2o;
-- However, this plan will still need to scan data given the predicate
EXPLAIN SELECT count(*) from h2o where temp > 70.0 and temp < 72.0;

View File

@ -46,6 +46,20 @@ async fn test_cases_duplicates_sql() {
.expect("flush worked");
}
#[tokio::test]
// Tests from "no_stats_plans.sql",
async fn test_cases_no_stats_plans_sql() {
let input_path = Path::new("cases").join("in").join("no_stats_plans.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "pushdown.sql",
async fn test_cases_pushdown_sql() {

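Each new case file gets its own small test wrapper; adding another would follow the same boilerplate. A hypothetical example (my_new_case.sql is a made-up name, not a file in this commit):

#[tokio::test]
// Hypothetical: tests from "my_new_case.sql" (not part of this commit)
async fn test_cases_my_new_case_sql() {
    let input_path = Path::new("cases").join("in").join("my_new_case.sql");
    let mut runner = Runner::new();
    runner.run(input_path).await.expect("test failed");
    runner.flush().expect("flush worked");
}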
View File

@ -277,6 +277,7 @@ impl<W: Write> Runner<W> {
.with_default_catalog(db)
.build();
println!("---- {}", sql);
let physical_plan = planner
.query(sql, &ctx)
.await

View File

@ -12,6 +12,7 @@ use query::QueryChunk;
use async_trait::async_trait;
use delete::ThreeDeleteThreeChunks;
use server::db::{LockableChunk, LockablePartition};
use server::utils::{
count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db,
@ -52,9 +53,11 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(TwoMeasurements),
register_setup!(TwoMeasurementsPredicatePushDown),
register_setup!(TwoMeasurementsManyFieldsOneChunk),
register_setup!(TwoMeasurementsManyFieldsOneRubChunk),
register_setup!(OneMeasurementFourChunksWithDuplicates),
register_setup!(OneMeasurementAllChunksDropped),
register_setup!(ChunkOrder),
register_setup!(ThreeDeleteThreeChunks),
]
.into_iter()
.map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))
@ -408,6 +411,35 @@ impl DbSetup for TwoMeasurementsManyFieldsOneChunk {
}
}
#[derive(Debug)]
/// This has a single RUB chunk for queries that check the state of the system
pub struct TwoMeasurementsManyFieldsOneRubChunk {}
#[async_trait]
impl DbSetup for TwoMeasurementsManyFieldsOneRubChunk {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
"h2o,state=CA,city=Boston other_temp=72.4 350",
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
"o2,state=CA temp=79.0 300",
];
write_lp(&db, &lp_lines.join("\n")).await;
// compact the open MUB chunk of table h2o into a read buffer (RUB) chunk
db.compact_open_chunk("h2o", partition_key).await.unwrap();
vec![DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
}]
}
}
#[derive(Debug)]
/// This has two chunks for queries that check the state of the system
pub struct TwoMeasurementsManyFieldsTwoChunks {}

View File

@ -1060,6 +1060,33 @@ async fn sql_select_with_three_deletes_from_three_chunks() {
&expected,
)
.await;
run_sql_test_case(
scenarios::delete::ThreeDeleteThreeChunks {},
"SELECT count(*) from cpu",
&expected,
)
.await;
}
// Bug: https://github.com/influxdata/influxdb_iox/issues/2745
#[ignore]
#[tokio::test]
async fn sql_select_count_with_three_deletes_from_three_chunks() {
let expected = vec![
"+-----+-----+--------------------------------+",
"| bar | foo | time |",
"+-----+-----+--------------------------------+",
"| 7 | me | 1970-01-01T00:00:00.000000080Z |",
"+-----+-----+--------------------------------+",
];
run_sql_test_case(
scenarios::delete::ThreeDeleteThreeChunks {},
"SELECT count(*) from cpu",
&expected,
)
.await;
}
#[tokio::test]