From ce412dbce2d75f4930031b49641fb2aa9d5ec54f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 2 Jun 2021 11:34:12 +0200 Subject: [PATCH 1/8] fix: use structured error for background cleanup task reporting --- server/src/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/db.rs b/server/src/db.rs index 21b7c725a9..052df000de 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -954,7 +954,7 @@ impl Db { tokio::select! { _ = async { if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await { - error!("error in background cleanup task: {:?}", e); + error!(%e, "error in background cleanup task"); } tokio::time::sleep(Duration::from_secs(500)).await; } => {}, From bbd73e59be85b0b5200383f9dd0d239121089a31 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 2 Jun 2021 11:34:19 +0200 Subject: [PATCH 2/8] feat: jitter background clean-up job + wait on first job --- Cargo.lock | 1 + data_types/src/database_rules.rs | 5 +++++ .../influxdata/iox/management/v1/database_rules.proto | 3 +++ generated_types/src/database_rules.rs | 8 ++++++++ server/Cargo.toml | 1 + server/src/db.rs | 10 +++++++++- server/src/lib.rs | 4 +++- server/src/query_tests/utils.rs | 9 +++++++-- tests/end_to_end_cases/management_api.rs | 4 ++++ 9 files changed, 41 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94a8241b72..939095cab9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3722,6 +3722,7 @@ dependencies = [ "parquet_file", "query", "rand 0.8.3", + "rand_distr", "read_buffer", "serde", "serde_json", diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 32a0611618..36424ce48d 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -4,6 +4,7 @@ use influxdb_line_protocol::ParsedLine; use regex::Regex; use snafu::{OptionExt, Snafu}; use std::num::NonZeroU64; +use std::time::Duration; use std::{ collections::HashMap, hash::{Hash, Hasher}, @@ -49,6 +50,9 @@ pub struct DatabaseRules { /// An optional config to delegate data plane operations to one or more /// remote servers. pub routing_rules: Option, + + /// Duration for which the cleanup loop should sleep on avarage. + pub worker_cleanup_avg_sleep: Duration, } #[derive(Debug, Eq, PartialEq, Clone)] @@ -79,6 +83,7 @@ impl DatabaseRules { write_buffer_config: None, lifecycle_rules: Default::default(), routing_rules: None, + worker_cleanup_avg_sleep: Duration::from_secs(500), } } diff --git a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto index 61795efc54..d76a735643 100644 --- a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto +++ b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto @@ -184,6 +184,9 @@ message DatabaseRules { // Routing config RoutingConfig routing_config = 9; } + + // Duration for which the cleanup loop should sleep on avarage. 
+ google.protobuf.Duration worker_cleanup_avg_sleep = 10; } message RoutingConfig { diff --git a/generated_types/src/database_rules.rs b/generated_types/src/database_rules.rs index 9425ee79ee..c749d05509 100644 --- a/generated_types/src/database_rules.rs +++ b/generated_types/src/database_rules.rs @@ -1,4 +1,5 @@ use std::convert::{TryFrom, TryInto}; +use std::time::Duration; use thiserror::Error; @@ -23,6 +24,7 @@ impl From for management::DatabaseRules { write_buffer_config: rules.write_buffer_config.map(Into::into), lifecycle_rules: Some(rules.lifecycle_rules.into()), routing_rules: rules.routing_rules.map(Into::into), + worker_cleanup_avg_sleep: Some(rules.worker_cleanup_avg_sleep.into()), } } } @@ -50,12 +52,18 @@ impl TryFrom for DatabaseRules { .optional("routing_rules") .unwrap_or_default(); + let worker_cleanup_avg_sleep = match proto.worker_cleanup_avg_sleep { + Some(d) => d.try_into().field("worker_cleanup_avg_sleep")?, + None => Duration::from_secs(500), + }; + Ok(Self { name, partition_template, write_buffer_config, lifecycle_rules, routing_rules, + worker_cleanup_avg_sleep, }) } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 89d3e93306..9bca8c0045 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -35,6 +35,7 @@ parking_lot = "0.11.1" parquet_file = { path = "../parquet_file" } query = { path = "../query" } rand = "0.8.3" +rand_distr = "0.4.0" read_buffer = { path = "../read_buffer" } serde = "1.0" serde_json = "1.0" diff --git a/server/src/db.rs b/server/src/db.rs index 052df000de..a395d865a4 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -41,6 +41,7 @@ use parquet_file::{ }; use query::predicate::{Predicate, PredicateBuilder}; use query::{exec::Executor, Database, DEFAULT_SCHEMA}; +use rand_distr::{Distribution, Poisson}; use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetrics}; use snafu::{ResultExt, Snafu}; use std::{ @@ -953,10 +954,17 @@ impl Db { .fetch_add(1, Ordering::Relaxed); tokio::select! { _ = async { + // Sleep for a duration drawn from a poisson distribution to de-correlate workers. + // Perform this sleep BEFORE the actual clean-up so that we don't immediately run a clean-up + // on startup. 
+ let dist = Poisson::new(self.rules.read().worker_cleanup_avg_sleep.as_secs_f32().max(1.0)).expect("parameter should be positive and finite"); + let duration = Duration::from_secs_f32(dist.sample(&mut rand::thread_rng())); + debug!(?duration, "cleanup worker sleeps"); + tokio::time::sleep(duration).await; + if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await { error!(%e, "error in background cleanup task"); } - tokio::time::sleep(Duration::from_secs(500)).await; } => {}, _ = shutdown.cancelled() => break, } diff --git a/server/src/lib.rs b/server/src/lib.rs index 4e7d8c0ce5..8ac80850d5 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1112,7 +1112,7 @@ async fn get_database_config_bytes( #[cfg(test)] mod tests { - use std::{collections::BTreeMap, convert::TryFrom}; + use std::{collections::BTreeMap, convert::TryFrom, time::Duration}; use async_trait::async_trait; use futures::TryStreamExt; @@ -1206,6 +1206,7 @@ mod tests { write_buffer_config: None, lifecycle_rules: Default::default(), routing_rules: None, + worker_cleanup_avg_sleep: Duration::from_secs(2), }; // Create a database @@ -1302,6 +1303,7 @@ mod tests { write_buffer_config: None, lifecycle_rules: Default::default(), routing_rules: None, + worker_cleanup_avg_sleep: Duration::from_secs(2), }; // Create a database diff --git a/server/src/query_tests/utils.rs b/server/src/query_tests/utils.rs index a322495f45..a53d113ca5 100644 --- a/server/src/query_tests/utils.rs +++ b/server/src/query_tests/utils.rs @@ -12,7 +12,7 @@ use crate::{ db::{load_or_create_preserved_catalog, Db}, JobRegistry, }; -use std::{borrow::Cow, convert::TryFrom, sync::Arc}; +use std::{borrow::Cow, convert::TryFrom, sync::Arc, time::Duration}; // A wrapper around a Db and a metrics registry allowing for isolated testing // of a Db and its metrics. 
@@ -77,10 +77,15 @@ impl TestDbBuilder { .await .unwrap(); + let mut rules = DatabaseRules::new(db_name); + + // make background loop spin a bit faster for tests + rules.worker_cleanup_avg_sleep = Duration::from_secs(2); + TestDb { metric_registry: metrics::TestMetricRegistry::new(metrics_registry), db: Db::new( - DatabaseRules::new(db_name), + rules, server_id, object_store, exec, diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 41097ed124..1ada273b7a 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -223,6 +223,10 @@ async fn test_create_get_update_database() { ..Default::default() }), routing_rules: None, + worker_cleanup_avg_sleep: Some(Duration { + seconds: 2, + nanos: 0, + }), }; client From 85139abbbb519302d344716d0324a6e82125982d Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 2 Jun 2021 11:39:07 +0200 Subject: [PATCH 3/8] fix: use structured logging for cleanup logs --- parquet_file/src/cleanup.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index 3655189148..41e78c242a 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -103,14 +103,14 @@ where // now that the transaction lock is dropped, perform the actual (and potentially slow) delete operation let n_files = to_remove.len(); - info!("Found {} files to delete, start deletion.", n_files); + info!(%n_files, "Found files to delete, start deletion."); for path in to_remove { - info!("Delete file: {}", path.display()); + info!(path = %path.display(), "Delete file"); store.delete(&path).await.context(WriteError)?; } - info!("Finished deletion, removed {} files.", n_files); + info!(%n_files, "Finished deletion, removed files."); Ok(()) } From 2afa8fa89aa36f923e7a70f21f4f61a914e66f3c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 2 Jun 2021 18:26:54 +0200 Subject: [PATCH 4/8] docs: fix typo and mention default --- data_types/src/database_rules.rs | 3 ++- .../protos/influxdata/iox/management/v1/database_rules.proto | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 36424ce48d..bde36b624d 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -51,7 +51,8 @@ pub struct DatabaseRules { /// remote servers. pub routing_rules: Option, - /// Duration for which the cleanup loop should sleep on avarage. + /// Duration for which the cleanup loop should sleep on average. + /// Defaults to 500 seconds. pub worker_cleanup_avg_sleep: Duration, } diff --git a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto index d76a735643..409ae9e8db 100644 --- a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto +++ b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto @@ -185,7 +185,8 @@ message DatabaseRules { RoutingConfig routing_config = 9; } - // Duration for which the cleanup loop should sleep on avarage. + // Duration for which the cleanup loop should sleep on average. + // Defaults to 500 seconds. 
google.protobuf.Duration worker_cleanup_avg_sleep = 10; } From 3c9fd81697eb676e3a6799110f5faaa0658ecfe9 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 2 Jun 2021 18:29:01 +0200 Subject: [PATCH 5/8] refactor: split overlong line --- server/src/db.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/db.rs b/server/src/db.rs index a395d865a4..ca98d17598 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -957,7 +957,8 @@ impl Db { // Sleep for a duration drawn from a poisson distribution to de-correlate workers. // Perform this sleep BEFORE the actual clean-up so that we don't immediately run a clean-up // on startup. - let dist = Poisson::new(self.rules.read().worker_cleanup_avg_sleep.as_secs_f32().max(1.0)).expect("parameter should be positive and finite"); + let avg_sleep_secs = self.rules.read().worker_cleanup_avg_sleep.as_secs_f32().max(1.0); + let dist = Poisson::new(avg_sleep_secs).expect("parameter should be positive and finite"); let duration = Duration::from_secs_f32(dist.sample(&mut rand::thread_rng())); debug!(?duration, "cleanup worker sleeps"); tokio::time::sleep(duration).await; From 7b2663a38a97530cff5460a2cf7c4b79627bc12b Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 2 Jun 2021 18:30:30 +0200 Subject: [PATCH 6/8] test: make tests faster --- server/src/query_tests/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/query_tests/utils.rs b/server/src/query_tests/utils.rs index a53d113ca5..1431664fe4 100644 --- a/server/src/query_tests/utils.rs +++ b/server/src/query_tests/utils.rs @@ -80,7 +80,7 @@ impl TestDbBuilder { let mut rules = DatabaseRules::new(db_name); // make background loop spin a bit faster for tests - rules.worker_cleanup_avg_sleep = Duration::from_secs(2); + rules.worker_cleanup_avg_sleep = Duration::from_secs(1); TestDb { metric_registry: metrics::TestMetricRegistry::new(metrics_registry), From 27b9477aa494a28abd6fc84f5103ff9022a87cab Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 3 Jun 2021 11:23:20 +0200 Subject: [PATCH 7/8] test: fix flaky test --- server/src/db.rs | 4 +++- server/src/query_tests/utils.rs | 10 +++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/server/src/db.rs b/server/src/db.rs index ca98d17598..eee13b5d6e 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -2677,6 +2677,8 @@ mod tests { .server_id(server_id) .object_store(Arc::clone(&object_store)) .db_name(db_name) + // "dispable" clean-up by setting it to a very long time to avoid interference with this test + .worker_cleanup_avg_sleep(Duration::from_secs(1_000)) .build() .await; @@ -2759,7 +2761,7 @@ mod tests { let _ = chunk_a.read(); }); - // Hold lock for 100 seconds blocking background task + // Hold lock for 100 milliseconds blocking background task std::thread::sleep(std::time::Duration::from_millis(100)); std::mem::drop(chunk_b); diff --git a/server/src/query_tests/utils.rs b/server/src/query_tests/utils.rs index 1431664fe4..3dcb65b15d 100644 --- a/server/src/query_tests/utils.rs +++ b/server/src/query_tests/utils.rs @@ -34,6 +34,7 @@ pub struct TestDbBuilder { object_store: Option>, db_name: Option>, write_buffer: bool, + worker_cleanup_avg_sleep: Option, } impl TestDbBuilder { @@ -80,7 +81,9 @@ impl TestDbBuilder { let mut rules = DatabaseRules::new(db_name); // make background loop spin a bit faster for tests - rules.worker_cleanup_avg_sleep = Duration::from_secs(1); + rules.worker_cleanup_avg_sleep = self + .worker_cleanup_avg_sleep + .unwrap_or_else(|| 
Duration::from_secs(1)); TestDb { metric_registry: metrics::TestMetricRegistry::new(metrics_registry), @@ -115,6 +118,11 @@ impl TestDbBuilder { self.write_buffer = enabled; self } + + pub fn worker_cleanup_avg_sleep(mut self, d: Duration) -> Self { + self.worker_cleanup_avg_sleep = Some(d); + self + } } /// Used for testing: create a Database with a local store From c986ce2c19f5c13516c6ba18af0b8bb293862b4f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 3 Jun 2021 07:07:26 -0400 Subject: [PATCH 8/8] feat: Add pruning module to query crate (#1611) * feat: Add pruning module * fix: clippy * fix: Apply suggestions from code review * fix: remove erronious claims of DF bugs * fix: update comments with DF bug reference --- query/src/lib.rs | 1 + query/src/pruning.rs | 858 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 859 insertions(+) create mode 100644 query/src/pruning.rs diff --git a/query/src/lib.rs b/query/src/lib.rs index 0d1289cdaf..d59d44bc5b 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -22,6 +22,7 @@ pub mod group_by; pub mod plan; pub mod predicate; pub mod provider; +pub mod pruning; pub mod util; pub use exec::context::{DEFAULT_CATALOG, DEFAULT_SCHEMA}; diff --git a/query/src/pruning.rs b/query/src/pruning.rs new file mode 100644 index 0000000000..8df8726558 --- /dev/null +++ b/query/src/pruning.rs @@ -0,0 +1,858 @@ +//! Implementation of statistics based pruning +use arrow::{array::ArrayRef, datatypes::SchemaRef}; +use data_types::partition_metadata::{ColumnSummary, Statistics, TableSummary}; +use datafusion::{ + logical_plan::Expr, + physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, + scalar::ScalarValue, +}; +use observability_deps::tracing::{debug, trace}; + +use crate::predicate::Predicate; + +/// Trait for an object (designed to be a Chunk) which can provide +/// sufficient information to prune +pub trait Prunable: Sized { + /// Return a summary of the data in this [`Prunable`] + fn summary(&self) -> &TableSummary; + + /// return the schema of the data in this [`Prunable`] + fn schema(&self) -> SchemaRef; +} + +/// Something that cares to be notified when pruning of chunks occurs +pub trait PruningObserver { + type Observed; + + /// Called when the specified chunk was pruned from observation + fn was_pruned(&self, _chunk: &Self::Observed) {} + + /// Called when no pruning can happen at all for some reason + fn could_not_prune(&self, _reason: &str) {} + + /// Called when the specified chunk could not be pruned, for some reason + fn could_not_prune_chunk(&self, _chunk: &Self::Observed, _reason: &str) {} +} + +/// Given a Vec of prunable items, returns a possibly smaller set +/// filtering those that can not pass the predicate. +pub fn prune_chunks(observer: &O, summaries: Vec, predicate: &Predicate) -> Vec +where + C: AsRef
<P>
, + P: Prunable, + O: PruningObserver, +{ + let num_chunks = summaries.len(); + debug!(num_chunks, %predicate, "Pruning chunks"); + + let filter_expr = match predicate.filter_expr() { + Some(expr) => expr, + None => { + observer.could_not_prune("No expression on predicate"); + return summaries; + } + }; + + // TODO: performance optimization: batch the chunk pruning by + // grouping the chunks with the same types for all columns + // together and then creating a single PruningPredicate for each + // group. + let pruned_summaries: Vec<_> = summaries + .into_iter() + .filter(|c| must_keep(observer, c.as_ref(), &filter_expr)) + .collect(); + + debug!( + num_chunks, + num_pruned_chunks = pruned_summaries.len(), + "Pruned chunks" + ); + pruned_summaries +} + +/// returns true if rows in chunk may pass the predicate +fn must_keep(observer: &O, chunk: &P, filter_expr: &Expr) -> bool +where + P: Prunable, + O: PruningObserver, +{ + trace!(?filter_expr, schema=?chunk.schema(), "creating pruning predicate"); + + let pruning_predicate = match PruningPredicate::try_new(filter_expr, chunk.schema()) { + Ok(p) => p, + Err(e) => { + observer.could_not_prune_chunk(chunk, "Can not create pruning predicate"); + trace!(%e, ?filter_expr, "Can not create pruning predicate"); + return true; + } + }; + + let statistics = PrunableStats { + summary: chunk.summary(), + }; + + match pruning_predicate.prune(&statistics) { + Ok(results) => { + // Boolean array for each row in stats, false if the + // stats could not pass the predicate + let must_keep = results[0]; // 0 as PrunableStats returns a single row + if !must_keep { + observer.was_pruned(chunk) + } + must_keep + } + Err(e) => { + observer.could_not_prune_chunk(chunk, "Can not evaluate pruning predicate"); + trace!(%e, ?filter_expr, "Can not evauate pruning predicate"); + true + } + } +} + +// struct to implement pruning +struct PrunableStats<'a> { + summary: &'a TableSummary, +} +impl<'a> PrunableStats<'a> { + fn column_summary(&self, column: &str) -> Option<&ColumnSummary> { + self.summary.columns.iter().find(|c| c.name == column) + } +} + +/// Converts stats.min and an appropriate `ScalarValue` +fn min_to_scalar(stats: &Statistics) -> Option { + match stats { + Statistics::I64(v) => v.min.map(ScalarValue::from), + Statistics::U64(v) => v.min.map(ScalarValue::from), + Statistics::F64(v) => v.min.map(ScalarValue::from), + Statistics::Bool(v) => v.min.map(ScalarValue::from), + Statistics::String(v) => v.min.as_deref().map(ScalarValue::from), + } +} + +/// Converts stats.max to an appropriate `ScalarValue` +fn max_to_scalar(stats: &Statistics) -> Option { + match stats { + Statistics::I64(v) => v.max.map(ScalarValue::from), + Statistics::U64(v) => v.max.map(ScalarValue::from), + Statistics::F64(v) => v.max.map(ScalarValue::from), + Statistics::Bool(v) => v.max.map(ScalarValue::from), + Statistics::String(v) => v.max.as_deref().map(ScalarValue::from), + } +} + +impl<'a> PruningStatistics for PrunableStats<'a> { + fn min_values(&self, column: &str) -> Option { + self.column_summary(column) + .and_then(|c| min_to_scalar(&c.stats)) + .map(|s| s.to_array_of_size(1)) + } + + fn max_values(&self, column: &str) -> Option { + self.column_summary(column) + .and_then(|c| max_to_scalar(&c.stats)) + .map(|s| s.to_array_of_size(1)) + } + + fn num_containers(&self) -> usize { + // We don't (yet) group multiple table summaries into a single + // object, so we are always evaluating the pruning predicate + // on a single chunk at a time + 1 + } +} + +#[cfg(test)] +mod test { + use 
std::{cell::RefCell, fmt, sync::Arc}; + + use arrow::datatypes::{DataType, Field, Schema}; + use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics}; + use datafusion::logical_plan::{col, lit}; + + use crate::predicate::PredicateBuilder; + + use super::*; + + #[test] + fn test_empty() { + test_helpers::maybe_start_logging(); + let observer = TestObserver::new(); + let c1 = Arc::new(TestPrunable::new("chunk1")); + + let predicate = PredicateBuilder::new().build(); + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert_eq!( + observer.events(), + vec!["Could not prune: No expression on predicate"] + ); + assert_eq!(names(&pruned), vec!["chunk1"]); + } + + #[test] + fn test_pruned_f64() { + test_helpers::maybe_start_logging(); + // column1 > 100.0 where + // c1: [0.0, 10.0] --> pruned + let observer = TestObserver::new(); + let c1 = + Arc::new(TestPrunable::new("chunk1").with_f64_column("column1", Some(0.0), Some(10.0))); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").gt(lit(100.0))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + assert_eq!(observer.events(), vec!["chunk1: Pruned"]); + assert!(pruned.is_empty()) + } + + #[test] + fn test_pruned_i64() { + test_helpers::maybe_start_logging(); + // column1 > 100 where + // c1: [0, 10] --> pruned + + let observer = TestObserver::new(); + let c1 = + Arc::new(TestPrunable::new("chunk1").with_i64_column("column1", Some(0), Some(10))); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").gt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert_eq!(observer.events(), vec!["chunk1: Pruned"]); + assert!(pruned.is_empty()) + } + + #[test] + fn test_pruned_u64() { + test_helpers::maybe_start_logging(); + // column1 > 100 where + // c1: [0, 10] --> pruned + + let observer = TestObserver::new(); + let c1 = + Arc::new(TestPrunable::new("chunk1").with_u64_column("column1", Some(0), Some(10))); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").gt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert_eq!(observer.events(), vec!["chunk1: Pruned"]); + assert!(pruned.is_empty()) + } + + #[test] + // Ignore tests as the pruning predicate can't be created. 
DF + // doesn't support boolean predicates: + // https://github.com/apache/arrow-datafusion/issues/490 + #[ignore] + fn test_pruned_bool() { + test_helpers::maybe_start_logging(); + // column1 where + // c1: [false, true] --> pruned + + let observer = TestObserver::new(); + let c1 = Arc::new(TestPrunable::new("chunk1").with_bool_column( + "column1", + Some(false), + Some(true), + )); + + let predicate = PredicateBuilder::new().add_expr(col("column1")).build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert_eq!(observer.events(), vec!["chunk1: Pruned"]); + assert!(pruned.is_empty()) + } + + #[test] + fn test_pruned_string() { + test_helpers::maybe_start_logging(); + // column1 > "z" where + // c1: ["a", "q"] --> pruned + + let observer = TestObserver::new(); + let c1 = Arc::new(TestPrunable::new("chunk1").with_string_column( + "column1", + Some("a"), + Some("q"), + )); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").gt(lit("z"))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert_eq!(observer.events(), vec!["chunk1: Pruned"]); + assert!(pruned.is_empty()) + } + + #[test] + fn test_not_pruned_f64() { + test_helpers::maybe_start_logging(); + // column1 < 100.0 where + // c1: [0.0, 10.0] --> not pruned + let observer = TestObserver::new(); + let c1 = + Arc::new(TestPrunable::new("chunk1").with_f64_column("column1", Some(0.0), Some(10.0))); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").lt(lit(100.0))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + assert!(observer.events().is_empty()); + assert_eq!(names(&pruned), vec!["chunk1"]); + } + + #[test] + fn test_not_pruned_i64() { + test_helpers::maybe_start_logging(); + // column1 < 100 where + // c1: [0, 10] --> not pruned + + let observer = TestObserver::new(); + let c1 = + Arc::new(TestPrunable::new("chunk1").with_i64_column("column1", Some(0), Some(10))); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").lt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert!(observer.events().is_empty()); + assert_eq!(names(&pruned), vec!["chunk1"]); + } + + #[test] + fn test_not_pruned_u64() { + test_helpers::maybe_start_logging(); + // column1 < 100 where + // c1: [0, 10] --> not pruned + + let observer = TestObserver::new(); + let c1 = + Arc::new(TestPrunable::new("chunk1").with_u64_column("column1", Some(0), Some(10))); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").lt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert!(observer.events().is_empty()); + assert_eq!(names(&pruned), vec!["chunk1"]); + } + + #[test] + // Ignore tests as the pruning predicate can't be created. 
DF + // doesn't support boolean predicates: + // https://github.com/apache/arrow-datafusion/issues/490 + #[ignore] + fn test_not_pruned_bool() { + test_helpers::maybe_start_logging(); + // column1 + // c1: [false, false] --> pruned + + let observer = TestObserver::new(); + let c1 = Arc::new(TestPrunable::new("chunk1").with_bool_column( + "column1", + Some(false), + Some(false), + )); + + let predicate = PredicateBuilder::new().add_expr(col("column1")).build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert!(observer.events().is_empty()); + assert_eq!(names(&pruned), vec!["chunk1"]); + } + + #[test] + fn test_not_pruned_string() { + test_helpers::maybe_start_logging(); + // column1 < "z" where + // c1: ["a", "q"] --> not pruned + + let observer = TestObserver::new(); + let c1 = Arc::new(TestPrunable::new("chunk1").with_string_column( + "column1", + Some("a"), + Some("q"), + )); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").lt(lit("z"))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1], &predicate); + + assert!(observer.events().is_empty()); + assert_eq!(names(&pruned), vec!["chunk1"]); + } + + #[test] + fn test_pruned_null() { + test_helpers::maybe_start_logging(); + // column1 > 100 where + // c1: [Null, 10] --> pruned + // c2: [0, Null] --> not pruned + // c3: [Null, Null] --> not pruned (min/max are not known in chunk 3) + // c4: Null --> not pruned (no statistics at all) + + let observer = TestObserver::new(); + let c1 = Arc::new(TestPrunable::new("chunk1").with_i64_column("column1", None, Some(10))); + + let c2 = Arc::new(TestPrunable::new("chunk2").with_i64_column("column1", Some(0), None)); + + let c3 = Arc::new(TestPrunable::new("chunk3").with_i64_column("column1", None, None)); + + let c4 = Arc::new(TestPrunable::new("chunk4").with_i64_column_no_stats("column1")); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").gt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1, c2, c3, c4], &predicate); + + assert_eq!(observer.events(), vec!["chunk1: Pruned"]); + assert_eq!(names(&pruned), vec!["chunk2", "chunk3", "chunk4"]); + } + + #[test] + fn test_pruned_multi_chunk() { + test_helpers::maybe_start_logging(); + // column1 > 100 where + // c1: [0, 10] --> pruned + // c2: [0, 1000] --> not pruned + // c3: [10, 20] --> pruned + // c4: [None, None] --> not pruned + // c5: [10, None] --> not pruned + // c6: [None, 10] --> pruned + + let observer = TestObserver::new(); + let c1 = + Arc::new(TestPrunable::new("chunk1").with_i64_column("column1", Some(0), Some(10))); + + let c2 = + Arc::new(TestPrunable::new("chunk2").with_i64_column("column1", Some(0), Some(1000))); + + let c3 = + Arc::new(TestPrunable::new("chunk3").with_i64_column("column1", Some(10), Some(20))); + + let c4 = Arc::new(TestPrunable::new("chunk4").with_i64_column("column1", None, None)); + + let c5 = Arc::new(TestPrunable::new("chunk5").with_i64_column("column1", Some(10), None)); + + let c6 = Arc::new(TestPrunable::new("chunk6").with_i64_column("column1", None, Some(20))); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").gt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1, c2, c3, c4, c5, c6], &predicate); + + assert_eq!( + observer.events(), + vec!["chunk1: Pruned", "chunk3: Pruned", "chunk6: Pruned"] + ); + assert_eq!(names(&pruned), vec!["chunk2", "chunk4", "chunk5"]); + } + + #[test] + fn test_pruned_different_schema() { + test_helpers::maybe_start_logging(); + // column1 > 
100 where + // c1: column1 [0, 100], column2 [0, 4] --> pruned (in range, column2 ignored) + // c2: column1 [0, 1000], column2 [0, 4] --> not pruned (in range, column2 ignored) + // c3: None, column2 [0, 4] --> not pruned (no stats for column1) + let observer = TestObserver::new(); + let c1 = Arc::new( + TestPrunable::new("chunk1") + .with_i64_column("column1", Some(0), Some(100)) + .with_i64_column("column2", Some(0), Some(4)), + ); + + let c2 = Arc::new( + TestPrunable::new("chunk2") + .with_i64_column("column1", Some(0), Some(1000)) + .with_i64_column("column2", Some(0), Some(4)), + ); + + let c3 = Arc::new(TestPrunable::new("chunk3").with_i64_column("column2", Some(0), Some(4))); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").gt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1, c2, c3], &predicate); + + assert_eq!( + observer.events(), + vec![ + "chunk1: Pruned", + "chunk3: Could not prune chunk: Can not evaluate pruning predicate" + ] + ); + assert_eq!(names(&pruned), vec!["chunk2", "chunk3"]); + } + + #[test] + fn test_pruned_multi_column() { + test_helpers::maybe_start_logging(); + // column1 > 100 AND column2 < 5 where + // c1: column1 [0, 1000], column2 [0, 4] --> not pruned (both in range) + // c2: column1 [0, 10], column2 [0, 4] --> pruned (column1 and column2 out of range) + // c3: column1 [0, 10], column2 [5, 10] --> pruned (column1 out of range, column2 in of range) + // c4: column1 [1000, 2000], column2 [0, 4] --> not pruned (column1 in range, column2 in range) + // c5: column1 [0, 10], column2 Null --> pruned (column1 out of range, but column2 has no stats) + // c6: column1 Null, column2 [0, 4] --> not pruned (column1 has no stats, column2 out of range) + + let observer = TestObserver::new(); + let c1 = Arc::new( + TestPrunable::new("chunk1") + .with_i64_column("column1", Some(0), Some(1000)) + .with_i64_column("column2", Some(0), Some(4)), + ); + + let c2 = Arc::new( + TestPrunable::new("chunk2") + .with_i64_column("column1", Some(0), Some(10)) + .with_i64_column("column2", Some(0), Some(4)), + ); + + let c3 = Arc::new( + TestPrunable::new("chunk3") + .with_i64_column("column1", Some(0), Some(10)) + .with_i64_column("column2", Some(5), Some(10)), + ); + + let c4 = Arc::new( + TestPrunable::new("chunk4") + .with_i64_column("column1", Some(1000), Some(2000)) + .with_i64_column("column2", Some(0), Some(4)), + ); + + let c5 = Arc::new( + TestPrunable::new("chunk5") + .with_i64_column("column1", Some(0), Some(10)) + .with_i64_column_no_stats("column2"), + ); + + let c6 = Arc::new( + TestPrunable::new("chunk6") + .with_i64_column_no_stats("column1") + .with_i64_column("column2", Some(0), Some(4)), + ); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").gt(lit(100)).and(col("column2").lt(lit(5)))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1, c2, c3, c4, c5, c6], &predicate); + + assert_eq!( + observer.events(), + vec!["chunk2: Pruned", "chunk3: Pruned", "chunk5: Pruned"] + ); + assert_eq!(names(&pruned), vec!["chunk1", "chunk4", "chunk6"]); + } + + #[test] + fn test_pruned_incompatible_types() { + test_helpers::maybe_start_logging(); + // Ensure pruning doesn't error / works when some chunks + // return stats of incompatible types + + // column1 < 100 + // c1: column1 ["0", "9"] --> not pruned (types are different) + // c2: column1 ["1000", "2000"] --> not pruned (types are still different) + // c3: column1 [1000, 2000] --> pruned (types are correct) + + let observer = TestObserver::new(); + let 
c1 = Arc::new(TestPrunable::new("chunk1").with_string_column( + "column1", + Some("0"), + Some("9"), + )); + + let c2 = Arc::new(TestPrunable::new("chunk2").with_string_column( + "column1", + Some("1000"), + Some("2000"), + )); + + let c3 = Arc::new(TestPrunable::new("chunk3").with_i64_column( + "column1", + Some(1000), + Some(2000), + )); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").lt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1, c2, c3], &predicate); + + assert_eq!( + observer.events(), + vec![ + "chunk1: Could not prune chunk: Can not create pruning predicate", + "chunk2: Could not prune chunk: Can not create pruning predicate", + "chunk3: Pruned", + ] + ); + assert_eq!(names(&pruned), vec!["chunk1", "chunk2"]); + } + + #[test] + fn test_pruned_different_types() { + test_helpers::maybe_start_logging(); + // Ensure pruning works even when different chunks have + // different types for the columns + + // column1 < 100 + // c1: column1 [0i64, 1000i64] --> not pruned (in range) + // c2: column1 [0u64, 1000u64] --> not pruned (note types are different) + // c3: column1 [1000i64, 2000i64] --> pruned (out of range) + // c4: column1 [1000u64, 2000u64] --> pruned (types are different) + + let observer = TestObserver::new(); + let c1 = + Arc::new(TestPrunable::new("chunk1").with_i64_column("column1", Some(0), Some(1000))); + + let c2 = + Arc::new(TestPrunable::new("chunk2").with_u64_column("column1", Some(0), Some(1000))); + + let c3 = Arc::new(TestPrunable::new("chunk3").with_i64_column( + "column1", + Some(1000), + Some(2000), + )); + + let c4 = Arc::new(TestPrunable::new("chunk4").with_u64_column( + "column1", + Some(1000), + Some(2000), + )); + + let predicate = PredicateBuilder::new() + .add_expr(col("column1").lt(lit(100))) + .build(); + + let pruned = prune_chunks(&observer, vec![c1, c2, c3, c4], &predicate); + + assert_eq!(observer.events(), vec!["chunk3: Pruned", "chunk4: Pruned"]); + assert_eq!(names(&pruned), vec!["chunk1", "chunk2"]); + } + + fn names(pruned: &[Arc]) -> Vec<&str> { + pruned.iter().map(|p| p.name.as_str()).collect() + } + + #[derive(Debug, Default)] + struct TestObserver { + events: RefCell>, + } + + impl TestObserver { + fn new() -> Self { + Self::default() + } + + fn events(&self) -> Vec { + self.events.borrow().iter().cloned().collect() + } + } + + impl PruningObserver for TestObserver { + type Observed = TestPrunable; + + fn was_pruned(&self, chunk: &Self::Observed) { + self.events.borrow_mut().push(format!("{}: Pruned", chunk)) + } + + fn could_not_prune(&self, reason: &str) { + self.events + .borrow_mut() + .push(format!("Could not prune: {}", reason)) + } + + fn could_not_prune_chunk(&self, chunk: &Self::Observed, reason: &str) { + self.events + .borrow_mut() + .push(format!("{}: Could not prune chunk: {}", chunk, reason)) + } + } + + #[derive(Debug, Clone)] + struct TestPrunable { + name: String, + summary: TableSummary, + schema: SchemaRef, + } + + /// Implementation of creating a new column with statitics for TestPrunable + macro_rules! 
impl_with_column { + ($SELF:expr, $COLUMN_NAME:expr, $MIN:expr, $MAX:expr, $DATA_TYPE:ident, $STAT_TYPE:ident) => {{ + let Self { + name, + summary, + schema, + } = $SELF; + let column_name = $COLUMN_NAME.into(); + let new_self = Self { + name, + schema: Self::add_field_to_schema(&column_name, schema, DataType::$DATA_TYPE), + summary: Self::add_column_to_summary( + summary, + column_name, + Statistics::$STAT_TYPE(StatValues { + distinct_count: None, + min: $MIN, + max: $MAX, + count: 42, + }), + ), + }; + new_self + }}; + } + + impl TestPrunable { + fn new(name: impl Into) -> Self { + let name = name.into(); + let summary = TableSummary::new(&name); + let schema = Arc::new(Schema::new(vec![])); + Self { + name, + summary, + schema, + } + } + + /// Adds an f64 column named into the schema + fn with_f64_column( + self, + column_name: impl Into, + min: Option, + max: Option, + ) -> Self { + impl_with_column!(self, column_name, min, max, Float64, F64) + } + + /// Adds an i64 column named into the schema + fn with_i64_column( + self, + column_name: impl Into, + min: Option, + max: Option, + ) -> Self { + impl_with_column!(self, column_name, min, max, Int64, I64) + } + + /// Adds an i64 column named into the schema, but with no stats + fn with_i64_column_no_stats(self, column_name: impl AsRef) -> Self { + let Self { + name, + summary, + schema, + } = self; + Self { + name, + schema: Self::add_field_to_schema(column_name.as_ref(), schema, DataType::Int64), + // Note we don't add any stats + summary, + } + } + + /// Adds an u64 column named into the schema + fn with_u64_column( + self, + column_name: impl Into, + min: Option, + max: Option, + ) -> Self { + impl_with_column!(self, column_name, min, max, UInt64, U64) + } + + /// Adds bool column named into the schema + fn with_bool_column( + self, + column_name: impl Into, + min: Option, + max: Option, + ) -> Self { + impl_with_column!(self, column_name, min, max, Boolean, Bool) + } + + /// Adds a string column named into the schema + fn with_string_column( + self, + column_name: impl Into, + min: Option<&str>, + max: Option<&str>, + ) -> Self { + let min = min.map(|v| v.to_string()); + let max = max.map(|v| v.to_string()); + impl_with_column!(self, column_name, min, max, Utf8, String) + } + + fn add_field_to_schema( + column_name: &str, + schema: SchemaRef, + data_type: DataType, + ) -> SchemaRef { + let new_field = Field::new(column_name, data_type, true); + let fields: Vec<_> = schema + .fields() + .iter() + .cloned() + .chain(std::iter::once(new_field)) + .collect(); + + Arc::new(Schema::new(fields)) + } + + fn add_column_to_summary( + mut summary: TableSummary, + column_name: impl Into, + stats: Statistics, + ) -> TableSummary { + summary.columns.push(ColumnSummary { + name: column_name.into(), + influxdb_type: None, + stats, + }); + + summary + } + } + + impl fmt::Display for TestPrunable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name) + } + } + + impl Prunable for TestPrunable { + fn summary(&self) -> &TableSummary { + &self.summary + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + } +}
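
For reference, a minimal sketch of how the pruning API introduced in PATCH 8/8 is driven, modeled directly on the test_pruned_i64 case above. The TestObserver and TestPrunable helpers come from the test module; in production code the observer and chunks would be the caller's own implementations of the PruningObserver and Prunable traits (the setup below is illustrative, not part of the patch itself):

    use std::sync::Arc;
    use datafusion::logical_plan::{col, lit};

    // Build the predicate `column1 > 100`.
    let predicate = PredicateBuilder::new()
        .add_expr(col("column1").gt(lit(100)))
        .build();

    // A chunk whose column1 statistics span [0, 10] can never satisfy
    // `column1 > 100`, so prune_chunks drops it and notifies the observer.
    let observer = TestObserver::new();
    let c1 = Arc::new(TestPrunable::new("chunk1").with_i64_column("column1", Some(0), Some(10)));
    let kept = prune_chunks(&observer, vec![c1], &predicate);
    assert!(kept.is_empty());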