diff --git a/Cargo.lock b/Cargo.lock index 7e4dd59df4..9a719e9366 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,9 +20,9 @@ checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" [[package]] name = "addr2line" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03345e98af8f3d786b6d9f656ccfa6ac316d954e92bc4841f0bba20789d5fb5a" +checksum = "e7a2e47a1fbe209ee101dd6d61285226744c6c8d3c21c8dc878ba6cb9f467f3a" dependencies = [ "gimli", ] @@ -190,9 +190,9 @@ dependencies = [ [[package]] name = "assert_cmd" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f57fec1ac7e4de72dcc69811795f1a7172ed06012f80a5d1ee651b62484f588" +checksum = "a88b6bd5df287567ffdf4ddf4d33060048e1068308e5f62d81c6f9824a045a48" dependencies = [ "bstr", "doc-comment", @@ -323,9 +323,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.59" +version = "0.3.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4717cfcbfaa661a0fd48f8453951837ae7e8f81e481fbb136e3202d72805a744" +checksum = "b7815ea54e4d821e791162e078acbebfd6d8c8939cd559c9335dceb1c8ca7282" dependencies = [ "addr2line", "cc", @@ -904,7 +904,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=16011120a1b73798049c5be49f9548b00f8a0a00#16011120a1b73798049c5be49f9548b00f8a0a00" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=964f49449ae7b338999fec133ae0174f01a931ae#964f49449ae7b338999fec133ae0174f01a931ae" dependencies = [ "ahash 0.7.4", "arrow", @@ -1449,9 +1449,9 @@ dependencies = [ [[package]] name = "heck" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" dependencies = [ "unicode-segmentation", ] @@ -2324,9 +2324,12 @@ dependencies = [ [[package]] name = "object" -version = "0.24.0" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a5b3dd1c072ee7963717671d1ca129f1048fda25edea6b752bfc71ac8854170" +checksum = "f8bc1d42047cf336f0f939c99e97183cf31551bf0f2865a2ec9c8d91fd4ffb5e" +dependencies = [ + "memchr", +] [[package]] name = "object_store" @@ -4569,9 +4572,9 @@ dependencies = [ [[package]] name = "unicode-normalization" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33717dca7ac877f497014e10d73f3acf948c342bee31b5ca7892faf94ccc6b49" +checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" dependencies = [ "tinyvec", ] @@ -4859,9 +4862,9 @@ checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd" [[package]] name = "zstd" -version = "0.8.1+zstd.1.5.0" +version = "0.8.3+zstd.1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357d6bb1bd9c6f6a55a5a15c74d01260b272f724dc60cc829b86ebd2172ac5ef" +checksum = "5ea7094c7b4a58fbd738eb0d4a2fc7684a0e6949a31597e074ffe20a07cbc2bf" dependencies = [ "zstd-safe", ] diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 2561661afc..f2d364b559 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -50,6 +50,9 @@ pub struct DatabaseRules { /// Duration for which the cleanup 
loop should sleep on average. /// Defaults to 500 seconds. pub worker_cleanup_avg_sleep: Duration, + + /// An optional connection string to a write buffer. + pub write_buffer_connection_string: Option<String>, } #[derive(Debug, Eq, PartialEq, Clone)] @@ -80,6 +83,7 @@ impl DatabaseRules { lifecycle_rules: Default::default(), routing_rules: None, worker_cleanup_avg_sleep: Duration::from_secs(500), + write_buffer_connection_string: None, } } diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs index 1de80d4a4f..63e197e2fe 100644 --- a/data_types/src/partition_metadata.rs +++ b/data_types/src/partition_metadata.rs @@ -1,7 +1,7 @@ //! This module contains structs that describe the metadata for a partition //! including schema, summary statistics, and file locations in storage. -use std::{borrow::Cow, mem}; +use std::{borrow::Cow, cmp::Ordering, mem}; use serde::{Deserialize, Serialize}; use std::borrow::Borrow; @@ -79,8 +79,9 @@ pub struct UnaggregatedTableSummary { pub table: TableSummary, } -/// Metadata and statistics information for a table, aggregated across -/// chunks. +/// Metadata and statistics information for a table. This can be +/// either for the portion of a Table stored within a single chunk or +/// aggregated across chunks. #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] pub struct TableSummary { /// Table name @@ -141,6 +142,37 @@ impl TableSummary { pub fn column(&self, name: &str) -> Option<&ColumnSummary> { self.columns.iter().find(|c| c.name == name) } + + /// Return the columns used for the "primary key" in this table. + /// + /// Currently this relies on the InfluxDB data model annotations + /// to determine which columns to include in the key + pub fn primary_key_columns(&self) -> Vec<&ColumnSummary> { + use InfluxDbType::*; + let mut key_summaries: Vec<&ColumnSummary> = self + .columns + .iter() + .filter(|s| match s.influxdb_type { + Some(Tag) => true, + Some(Field) => false, + Some(Timestamp) => true, + None => false, + }) + .collect(); + + // Now, sort lexicographically (but put timestamp last) + key_summaries.sort_by( + |a, b| match (a.influxdb_type.as_ref(), b.influxdb_type.as_ref()) { + (Some(Tag), Some(Tag)) => a.name.cmp(&b.name), + (Some(Timestamp), Some(Tag)) => Ordering::Greater, + (Some(Tag), Some(Timestamp)) => Ordering::Less, + (Some(Timestamp), Some(Timestamp)) => panic!("multiple timestamps in summary"), + _ => panic!("Unexpected types in key summary"), + }, + ); + + key_summaries + } }
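To illustrate the ordering contract of `primary_key_columns` (tags sorted by name, then the timestamp, with fields excluded), here is a minimal standalone sketch, not part of the diff itself: the column names are hypothetical, and the placeholder statistics are arbitrary since only `influxdb_type` drives the result.

    use data_types::partition_metadata::{
        ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary,
    };

    fn main() {
        let mut summary = TableSummary::new("weather");
        // deliberately pushed out of order: timestamp, tags, then a field
        for (name, influxdb_type) in [
            ("time", Some(InfluxDbType::Timestamp)),
            ("region", Some(InfluxDbType::Tag)),
            ("host", Some(InfluxDbType::Tag)),
            ("temp", Some(InfluxDbType::Field)),
        ] {
            summary.columns.push(ColumnSummary {
                name: name.to_string(),
                influxdb_type,
                // placeholder stats; primary_key_columns never inspects them
                stats: Statistics::I64(StatValues::default()),
            });
        }

        // tags come back sorted by name, the timestamp lands last, the field is dropped
        let key: Vec<_> = summary
            .primary_key_columns()
            .into_iter()
            .map(|c| c.name.as_str())
            .collect();
        assert_eq!(key, ["host", "region", "time"]);
    }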
// Replicate this enum here as it can't be derived from the existing statistics @@ -273,6 +305,17 @@ impl Statistics { } } + /// Returns true if both the min and max values are None (aka not known) + pub fn is_none(&self) -> bool { + match self { + Self::I64(v) => v.is_none(), + Self::U64(v) => v.is_none(), + Self::F64(v) => v.is_none(), + Self::Bool(v) => v.is_none(), + Self::String(v) => v.is_none(), + } + } + /// Return the minimum value, if any, formatted as a string pub fn min_as_str(&self) -> Option<Cow<'_, str>> { match self { @@ -397,6 +440,11 @@ where } }; } + + /// Returns true if both the min and max values are None (aka not known) + pub fn is_none(&self) -> bool { + self.min.is_none() && self.max.is_none() + } } impl StatValues<String> { @@ -442,6 +490,40 @@ impl StatValues<String> { } } +/// Represents the result of comparing the min/max ranges of two [`StatValues`] +#[derive(Debug, PartialEq)] +pub enum StatOverlap { + /// There is at least one value that exists in both ranges + NonZero, + + /// There are zero values that exist in both ranges + Zero, + + /// It is not known if there are any intersections (e.g. because + /// one of the bounds is not known / is None) + Unknown, +} + +impl<T> StatValues<T> +where + T: PartialOrd, +{ + /// Returns information about the overlap between two `StatValues` + pub fn overlaps(&self, other: &Self) -> StatOverlap { + match (&self.min, &self.max, &other.min, &other.max) { + (Some(self_min), Some(self_max), Some(other_min), Some(other_max)) => { + if self_min <= other_max && self_max >= other_min { + StatOverlap::NonZero + } else { + StatOverlap::Zero + } + } + // At least one of the values was None + _ => StatOverlap::Unknown, + } + } +} + pub trait IsNan { fn is_nan(&self) -> bool; } @@ -535,6 +617,116 @@ mod tests { assert_eq!(stat.count, 2); } + #[test] + fn statistics_is_none() { + let mut stat = StatValues::default(); + assert!(stat.is_none()); + stat.min = Some(0); + assert!(!stat.is_none()); + stat.max = Some(1); + assert!(!stat.is_none()); + } + + #[test] + fn statistics_overlaps() { + let stat1 = StatValues { + min: Some(10), + max: Some(20), + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat1), StatOverlap::NonZero); + + // [--stat1--] + // [--stat2--] + let stat2 = StatValues { + min: Some(5), + max: Some(15), + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat2), StatOverlap::NonZero); + assert_eq!(stat2.overlaps(&stat1), StatOverlap::NonZero); + + // [--stat1--] + // [--stat3--] + let stat3 = StatValues { + min: Some(15), + max: Some(25), + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat3), StatOverlap::NonZero); + assert_eq!(stat3.overlaps(&stat1), StatOverlap::NonZero); + + // [--stat1--] + // [--stat4--] + let stat4 = StatValues { + min: Some(25), + max: Some(35), + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat4), StatOverlap::Zero); + assert_eq!(stat4.overlaps(&stat1), StatOverlap::Zero); + + // [--stat1--] + // [--stat5--] + let stat5 = StatValues { + min: Some(0), + max: Some(5), + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat5), StatOverlap::Zero); + assert_eq!(stat5.overlaps(&stat1), StatOverlap::Zero); + } + + #[test] + fn statistics_overlaps_none() { + let stat1 = StatValues { + min: Some(10), + max: Some(20), + ..Default::default() + }; + + let stat2 = StatValues { + min: None, + max: Some(20), + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat2), StatOverlap::Unknown); + assert_eq!(stat2.overlaps(&stat1), StatOverlap::Unknown); + + let stat3 = StatValues { + min: Some(10), + max: None, + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat3), StatOverlap::Unknown); + assert_eq!(stat3.overlaps(&stat1), StatOverlap::Unknown); + + let stat4 = StatValues { + min: None, + max: None, + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat4), StatOverlap::Unknown); + assert_eq!(stat4.overlaps(&stat1), StatOverlap::Unknown); + } + + #[test] + fn statistics_overlaps_mixed_none() { + let stat1 = StatValues { + min: Some(10), + max: None, + ..Default::default() + }; + + let stat2 = StatValues { + min: None, + max: Some(5), + ..Default::default() + }; + assert_eq!(stat1.overlaps(&stat2), StatOverlap::Unknown); + assert_eq!(stat2.overlaps(&stat1), StatOverlap::Unknown); + } + #[test] fn update_string() { let mut stat = StatValues::new_with_value("bbb".to_string()); @@ -558,6 +750,18 @@ mod tests { assert_eq!(stat.count, 4); } + #[test] + fn stats_is_none() { + let stat = Statistics::I64(StatValues::new(Some(-1), Some(100), 1)); + assert!(!stat.is_none()); + + let stat = 
Statistics::I64(StatValues::new(None, Some(100), 1)); + assert!(!stat.is_none()); + + let stat = Statistics::I64(StatValues::new(None, None, 0)); + assert!(stat.is_none()); + } + #[test] fn stats_as_str_i64() { let stat = Statistics::I64(StatValues::new(Some(-1), Some(100), 1)); diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 3a67f548a0..8e52248a40 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version" # Rename to work around doctest bug # Turn off optional datafusion features (function packages) -upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev = "16011120a1b73798049c5be49f9548b00f8a0a00", default-features = false, package = "datafusion" } +upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev = "964f49449ae7b338999fec133ae0174f01a931ae", default-features = false, package = "datafusion" } diff --git a/entry/src/entry.fbs b/entry/src/entry.fbs index b05c708895..e0323f93d7 100644 --- a/entry/src/entry.fbs +++ b/entry/src/entry.fbs @@ -1,7 +1,7 @@ namespace influxdata.iox.write.v1; // Every modification to a database is represented as an entry. These can be forwarded -// on to other IOx servers or to Kafka. +// on to other IOx servers or to the write buffer. // An entry can only be one of these Operation types union Operation { diff --git a/entry/src/entry.rs b/entry/src/entry.rs index e385dbf7fc..f836e89d19 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -290,7 +290,7 @@ pub struct ShardedEntry { /// Wrapper type for the flatbuffer Entry struct. Has convenience methods for /// iterating through the partitioned writes. #[self_referencing] -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct Entry { data: Vec<u8>, #[borrows(data)] @@ -1201,13 +1201,9 @@ pub enum SequencedEntryError { }, } -#[self_referencing] #[derive(Debug)] pub struct SequencedEntry { - data: Vec<u8>, - #[borrows(data)] - #[covariant] - entry: entry_fb::Entry<'this>, + entry: Entry, sequence: Sequence, } @@ -1221,33 +1217,30 @@ impl SequencedEntry { pub fn new_from_process_clock( process_clock: ClockValue, server_id: ServerId, - data: &[u8], + entry: Entry, ) -> Result<Self, SequencedEntryError> { - SequencedEntryTryBuilder { - data: data.to_vec(), - entry_builder: |data| { - flatbuffers::root::<entry_fb::Entry<'_>>(data).context(InvalidFlatbuffer) - }, + Ok(Self { + entry, sequence: Sequence { id: server_id.get_u32(), number: process_clock.get_u64(), }, - } - .try_build() + }) + } + + pub fn new_from_sequence( + sequence: Sequence, + entry: Entry, + ) -> Result<Self, SequencedEntryError> { + Ok(Self { entry, sequence }) } pub fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>> { - match self.borrow_entry().operation_as_write().as_ref() { - Some(w) => w - .partition_writes() - .as_ref() - .map(|w| w.iter().map(|fb| PartitionWrite { fb }).collect::<Vec<_>>()), - None => None, - } + self.entry.partition_writes() } pub fn sequence(&self) -> &Sequence { - self.borrow_sequence() + &self.sequence } }
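Since `SequencedEntry` now wraps an owned, already-validated `Entry` instead of re-parsing raw flatbuffer bytes, attaching sequence metadata becomes a plain wrapping step. A minimal sketch of the intended call shape, assuming the `entry` crate exposes these types, that `Sequence`'s fields are constructible at the call site, and with hypothetical id/number values:

    use entry::{Entry, Sequence, SequencedEntry};

    // Sketch only: wrap a decoded Entry with its sequence metadata
    fn sequence_entry(entry: Entry, sequencer_id: u32, sequence_number: u64) -> SequencedEntry {
        let sequence = Sequence {
            id: sequencer_id,
            number: sequence_number,
        };
        // new_from_sequence keeps a Result return for API symmetry, but it no
        // longer parses any bytes, so there is nothing left to fail here
        SequencedEntry::new_from_sequence(sequence, entry).expect("infallible by construction")
    }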
diff --git a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto index d2dc37dc79..9dfb405d71 100644 --- a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto +++ b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto @@ -133,6 +133,9 @@ message DatabaseRules { // Duration for which the cleanup loop should sleep on average. // Defaults to 500 seconds. google.protobuf.Duration worker_cleanup_avg_sleep = 10; + + // Optionally, the address of the write buffer + string write_buffer_connection_string = 11; } message RoutingConfig { diff --git a/generated_types/src/database_rules.rs b/generated_types/src/database_rules.rs index 053551fe43..374078ecd1 100644 --- a/generated_types/src/database_rules.rs +++ b/generated_types/src/database_rules.rs @@ -8,7 +8,7 @@ use data_types::database_rules::{ }; use data_types::DatabaseName; -use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt}; +use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt, FromFieldString}; use crate::influxdata::iox::management::v1 as management; mod lifecycle; @@ -23,6 +23,9 @@ impl From<DatabaseRules> for management::DatabaseRules { lifecycle_rules: Some(rules.lifecycle_rules.into()), routing_rules: rules.routing_rules.map(Into::into), worker_cleanup_avg_sleep: Some(rules.worker_cleanup_avg_sleep.into()), + write_buffer_connection_string: rules + .write_buffer_connection_string + .unwrap_or_default(), } } } @@ -53,12 +56,15 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules { None => Duration::from_secs(500), }; + let write_buffer_connection_string = proto.write_buffer_connection_string.optional(); + Ok(Self { name, partition_template, lifecycle_rules, routing_rules, worker_cleanup_avg_sleep, + write_buffer_connection_string, }) } }
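Because the proto field is a plain proto3 `string`, `None` is encoded as the empty string by `unwrap_or_default`, and `.optional()` is assumed to map the empty string back to `None` when decoding. A self-contained sketch of that convention, with `encode`/`decode` standing in for the `From`/`TryFrom` impls above and a hypothetical connection string value:

    // None crosses the wire as proto3's default value, the empty string
    fn encode(connection_string: Option<String>) -> String {
        connection_string.unwrap_or_default()
    }

    // what `.optional()` is assumed to do: treat "" as absent
    fn decode(wire_value: String) -> Option<String> {
        if wire_value.is_empty() {
            None
        } else {
            Some(wire_value)
        }
    }

    fn main() {
        assert_eq!(decode(encode(None)), None);
        let url = "kafka://broker:9092".to_string();
        assert_eq!(decode(encode(Some(url.clone()))), Some(url));
    }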
diff --git a/internal_types/src/schema/builder.rs b/internal_types/src/schema/builder.rs index 15aa31949b..b533ebfde2 100644 --- a/internal_types/src/schema/builder.rs +++ b/internal_types/src/schema/builder.rs @@ -23,7 +23,7 @@ pub enum Error { pub type Result<T, E = Error> = std::result::Result<T, E>; /// Builder for a Schema -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct SchemaBuilder { /// Optional measurement name measurement: Option<String>, @@ -121,9 +121,10 @@ impl SchemaBuilder { self } - /// Creates an Arrow schema with embedded metadata, consuming self. All - /// schema validation happens at this time. - + /// Creates an Arrow schema with embedded metadata, resetting the + /// builder back to `default`. All schema validation happens at + /// this time. + /// /// ``` /// use internal_types::schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType}; /// diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 15e29811b5..44ac4ef63a 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -422,29 +422,13 @@ impl Storage { let (tx, rx) = tokio::sync::mpsc::channel(2); - // Do an an async dance here to make sure any error returned + // Run the async dance here to make sure any error returned from // `download_and_scan_parquet` is sent back to the reader and // not silently ignored tokio::task::spawn(async move { - // Channels to/from the parquet reader - let (parquet_tx, mut parquet_rx) = tokio::sync::mpsc::channel(2); - - let download_future = - Self::download_and_scan_parquet(predicate, projection, path, store, parquet_tx); - - // task whose only job in life is to shuffle messages from - // the parquet reader to the final output receiver - let captured_tx = tx.clone(); - tokio::task::spawn(async move { - while let Some(msg) = parquet_rx.recv().await { - if let Err(e) = captured_tx.send(msg).await { - debug!(%e, "Receiver hung up on parquet writer"); - } - } - }); - - // in this task, wait for the future that is doing the actual work on this task - let download_result = download_future.await; + let download_result = + Self::download_and_scan_parquet(predicate, projection, path, store, tx.clone()) + .await; // If there was an error returned from download_and_scan_parquet send it back to the receiver. if let Err(e) = download_result {
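The simplification above leans on a common channel pattern: the spawned worker sends its error into the same channel that carries its results, so the consumer observes the failure rather than the stream silently ending. A generic, self-contained sketch of that pattern (illustrative types, not the `parquet_file` ones; assumes tokio with the `rt`, `macros`, and `sync` features):

    use tokio::sync::mpsc;

    // A worker that produces results and may fail partway through
    async fn produce(tx: mpsc::Sender<Result<u32, String>>) -> Result<(), String> {
        tx.send(Ok(1)).await.map_err(|e| e.to_string())?;
        Err("boom".to_string()) // simulated mid-stream failure
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(2);
        tokio::task::spawn(async move {
            // forward any worker error into the results channel
            if let Err(e) = produce(tx.clone()).await {
                let _ = tx.send(Err(e)).await;
            }
        });
        // the receiver sees Ok(1), then Err("boom"), then the channel closes
        while let Some(msg) = rx.recv().await {
            println!("{:?}", msg);
        }
    }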
diff --git a/query/src/duplicate.rs b/query/src/duplicate.rs new file mode 100644 index 0000000000..39687d55c3 --- /dev/null +++ b/query/src/duplicate.rs @@ -0,0 +1,583 @@ +//! Contains the algorithm to determine which chunks may contain +//! "duplicate" primary keys (that is, where data with the same +//! combination of "tag" columns and timestamp in the InfluxDB +//! data model has been written via multiple distinct line protocol +//! writes, and thus is stored in separate rows) + +use crate::pruning::Prunable; +use data_types::partition_metadata::{ColumnSummary, StatOverlap, Statistics}; +use snafu::Snafu; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display( + "Mismatched type when comparing statistics for column '{}'", + column_name + ))] + MismatchedStatsTypes { column_name: String }, + + #[snafu(display( + "Internal error. Partial statistics found for column '{}' looking for duplicates. s1: '{:?}' s2: '{:?}'", + column_name, s1, s2 + ))] + InternalPartialStatistics { + column_name: String, + s1: Statistics, + s2: Statistics, + }, +} + +pub type Result<T, E = Error> = std::result::Result<T, E>; + +/// Groups [`Prunable`] objects into disjoint sets using values of +/// min/max statistics. The groups are formed such that each group +/// *may* contain InfluxDB data model primary key duplicates with +/// others in that set. +/// +/// The *may* overlap calculation is conservative -- that is it may +/// flag two chunks as having overlapping data when in reality they do +/// not. If chunks are split into different groups, then they are +/// guaranteed not to contain any rows with the same primary key. +/// +/// Note 1: since this algorithm is based on statistics, it may have +/// false positives (flag that two objects may have overlap when in +/// reality they do not) +/// +/// Note 2: this algorithm is O(n^2) worst case (when no chunks have +/// any overlap) +pub fn group_potential_duplicates<C>(chunks: Vec<C>) -> Result<Vec<Vec<C>>> +where + C: Prunable, +{ + let mut groups: Vec<Vec<KeyStats<'_, C>>> = vec![]; + + // Step 1: find the groups using references to `chunks` stored + // in KeyStats views + for (idx, chunk) in chunks.iter().enumerate() { + // try to find a place to put this chunk + let mut key_stats = Some(KeyStats::new(idx, chunk)); + + 'outer: for group in &mut groups { + // If this chunk overlaps any existing chunk in group add + // it to group + for ks in group.iter() { + if ks.potential_overlap(key_stats.as_ref().unwrap())? { + group.push(key_stats.take().unwrap()); + break 'outer; + } + } + } + + if let Some(key_stats) = key_stats { + // couldn't place key_stats in any existing group, needs a + // new group + groups.push(vec![key_stats]) + } + } + + // Now some shenanigans to rearrange the actual input chunks into + // the final resulting groups corresponding to the groups of + // KeyStats + + // drop all references to chunks, and only keep indices + let groups: Vec<Vec<usize>> = groups + .into_iter() + .map(|group| group.into_iter().map(|key_stats| key_stats.index).collect()) + .collect(); + + let mut chunks: Vec<Option<C>> = chunks.into_iter().map(Some).collect(); + + let groups = groups + .into_iter() + .map(|group| { + group + .into_iter() + .map(|index| { + chunks[index] + .take() + .expect("Internal mismatch while gathering into groups") + }) + .collect::<Vec<_>>() + }) + .collect::<Vec<Vec<_>>>(); + + Ok(groups) +} + +/// Holds a view to a chunk along with information about its columns +/// in an easy-to-compare form +#[derive(Debug)] +struct KeyStats<'a, C> +where + C: Prunable, +{ + /// The index of the chunk + index: usize, + + /// The underlying chunk + chunk: &'a C, + + /// the ColumnSummaries for the chunk's 'primary_key' columns, in + /// "lexicographical" order (aka sorted by name) + key_summaries: Vec<&'a ColumnSummary>, +} + +impl<'a, C> KeyStats<'a, C> +where + C: Prunable, +{ + /// Create a new view for the specified chunk at index `index`, + /// computing the columns to be used in the primary key comparison + pub fn new(index: usize, chunk: &'a C) -> Self { + let key_summaries = chunk.summary().primary_key_columns(); + + Self { + index, + chunk, + key_summaries, + } + } + + /// Returns true if the chunk has a potential primary key overlap with the other chunk + fn potential_overlap(&self, other: &Self) -> Result<bool> { + // in order to have overlap, *all* the columns in the sort order + // need to be the same. Note that gaps in the sort order mean they + // are for different parts of the keyspace + if self.key_summaries.len() != other.key_summaries.len() { + // Short circuit on different lengths + return Ok(false); + } + + let iter = self.key_summaries.iter().zip(other.key_summaries.iter()); + for (s1, s2) in iter { + if s1.name != s2.name || !Self::columns_might_overlap(s1, s2)? 
{ return Ok(false); } } Ok(true) } + + /// Returns true if the two columns MAY overlap each other, based + /// on statistics + pub fn columns_might_overlap(s1: &ColumnSummary, s2: &ColumnSummary) -> Result<bool> { + use Statistics::*; + let overlap = match (&s1.stats, &s2.stats) { + (I64(s1), I64(s2)) => s1.overlaps(s2), + (U64(s1), U64(s2)) => s1.overlaps(s2), + (F64(s1), F64(s2)) => s1.overlaps(s2), + (Bool(s1), Bool(s2)) => s1.overlaps(s2), + (String(s1), String(s2)) => s1.overlaps(s2), + _ => { + return MismatchedStatsTypes { + column_name: s1.name.clone(), + } + .fail() + } + }; + + // If either column has no min/max, treat the column as + // being entirely null + let is_none = s1.stats.is_none() || s2.stats.is_none(); + + match overlap { + StatOverlap::NonZero => Ok(true), + StatOverlap::Zero => Ok(false), + StatOverlap::Unknown if is_none => Ok(false), // no stats means no values + // This case means there are some stats, but not all. + // Unclear how this could happen, so throw an error for now + StatOverlap::Unknown => InternalPartialStatistics { + column_name: s1.name.clone(), + s1: s1.stats.clone(), + s2: s2.stats.clone(), + } + .fail(), + } + } +} + +#[cfg(test)] +mod test { + use arrow::datatypes::SchemaRef; + use data_types::partition_metadata::{ + ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary, + }; + use internal_types::schema::{builder::SchemaBuilder, TIME_COLUMN_NAME}; + + use super::*; + + #[macro_export] + macro_rules! assert_groups_eq { + ($EXPECTED_LINES: expr, $GROUPS: expr) => { + let expected_lines: Vec<String> = + $EXPECTED_LINES.into_iter().map(|s| s.to_string()).collect(); + + let actual_lines = to_string($GROUPS); + + assert_eq!( + expected_lines, actual_lines, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected_lines, actual_lines + ); + }; + } + + // Test cases: + + #[test] + fn one_column_no_overlap() { + let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("mumbai")); + + let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("new york"), Some("zoo york")); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn one_column_overlap() { + let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("new york")); + + let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("denver"), Some("zoo york")); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1, chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn multi_columns() { + let c1 = TestChunk::new("chunk1").with_timestamp(0, 1000).with_tag( + "tag1", + Some("boston"), + Some("new york"), + ); + + // Overlaps in tag1, but not in time + let c2 = TestChunk::new("chunk2") + .with_tag("tag1", Some("denver"), Some("zoo york")) + .with_timestamp(2000, 3000); + + // Overlaps in time, but not in tag1 + let c3 = TestChunk::new("chunk3") + .with_tag("tag1", Some("zzx"), Some("zzy")) + .with_timestamp(500, 1500); + + // Overlaps in time, and in tag1 + let c4 = TestChunk::new("chunk4") + .with_tag("tag1", Some("aaa"), Some("zzz")) + .with_timestamp(500, 1500); + + let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded"); + + let expected = vec![ + "Group 0: [chunk1, chunk4]", + "Group 1: [chunk2]", + "Group 2: [chunk3]", + ]; + assert_groups_eq!(expected, groups); + }
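As an orienting sketch before the edge cases below, here is the same grouping applied to three chunks at once, built with the `TestChunk` helper defined at the bottom of this module (the tag ranges are arbitrary):

    #[test]
    fn sketch_three_chunks() {
        // chunk1 and chunk2 share part of the tag1 range, so they may hold
        // duplicates; chunk3 starts past chunk2's max and stands alone
        let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("mmm"));
        let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("ggg"), Some("sss"));
        let c3 = TestChunk::new("chunk3").with_tag("tag1", Some("ttt"), Some("zzz"));

        let groups = group_potential_duplicates(vec![c1, c2, c3]).expect("grouping succeeded");

        let expected = vec!["Group 0: [chunk1, chunk2]", "Group 1: [chunk3]"];
        assert_groups_eq!(expected, groups);
    }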
#[test] + fn boundary() { + // check that overlap calculations include the bound + let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb")); + let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("bbb"), Some("ccc")); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1, chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn same() { + // check that if chunks overlap exactly on the boundaries they are still grouped + let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb")); + let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("aaa"), Some("bbb")); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1, chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn different_tag_names() { + // check that chunks whose ranges overlap, but under different tag names, are not grouped + let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb")); + let c2 = TestChunk::new("chunk2").with_tag("tag2", Some("aaa"), Some("bbb")); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn three_column() { + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", Some("xxx"), Some("yyy")) + .with_timestamp(0, 1000); + + let c2 = TestChunk::new("chunk2") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", Some("xxx"), Some("yyy")) + // Timestamp doesn't overlap, but the two tags do + .with_timestamp(2001, 3000); + + let c3 = TestChunk::new("chunk3") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", Some("aaa"), Some("zzz")) + // all three overlap + .with_timestamp(1000, 2000); + + let groups = group_potential_duplicates(vec![c1, c2, c3]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1, chunk3]", "Group 1: [chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn tag_order() { + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", Some("xxx"), Some("yyy")) + .with_timestamp(0, 1000); + + let c2 = TestChunk::new("chunk2") + .with_tag("tag2", Some("aaa"), Some("zzz")) + .with_tag("tag1", Some("aaa"), Some("bbb")) + // all three overlap, but tags in different order + .with_timestamp(500, 1000); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1, chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn tag_order_no_tags() { + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", Some("xxx"), Some("yyy")) + .with_timestamp(0, 1000); + + let c2 = TestChunk::new("chunk2") + // tag1 and timestamp overlap, but no tag2 (aka it is all null) + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_timestamp(500, 1000); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn tag_order_null_stats() { + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", Some("xxx"), Some("yyy")) + .with_timestamp(0, 1000); + + let c2 = TestChunk::new("chunk2") + // tag1 and timestamp overlap, tag2 
has no stats (null) + // so we say they can't overlap + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", None, None) + .with_timestamp(500, 1000); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn tag_order_partial_stats() { + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_timestamp(0, 1000); + + let c2 = TestChunk::new("chunk2") + // tag1 has a min but not a max. Should result in error + .with_tag("tag1", Some("aaa"), None) + .with_timestamp(500, 1000); + + let result = group_potential_duplicates(vec![c1, c2]).unwrap_err(); + + let result = result.to_string(); + let expected = + "Internal error. Partial statistics found for column 'tag1' looking for duplicates"; + assert!( + result.contains(expected), + "can not find {} in {}", + expected, + result + ); + } + + #[test] + fn tag_fields_not_counted() { + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_int_field("field", Some(0), Some(2)) + .with_timestamp(0, 1000); + + let c2 = TestChunk::new("chunk2") + // tag1 and timestamp overlap, but field value does not + // should still overlap + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_int_field("field", Some(100), Some(200)) + .with_timestamp(500, 1000); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1, chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn mismatched_types() { + // Test if same column has different types in different + // chunks; this will likely cause errors elsewhere in practice + // as the schemas are incompatible (and can't be merged) + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_timestamp(0, 1000); + + let c2 = TestChunk::new("chunk2") + // tag1 is a tag in chunk 1 but an integer field in chunk + // 2, so even though the timestamps overlap these chunks + // don't have duplicates + .with_int_field("tag1", Some(100), Some(200)) + .with_timestamp(0, 1000); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + assert_groups_eq!(expected, groups); + } + + // --- Test infrastructure -- + + fn to_string(groups: Vec<Vec<TestChunk>>) -> Vec<String> { + let mut s = vec![]; + for (idx, group) in groups.iter().enumerate() { + let names = group.iter().map(|c| c.name.as_str()).collect::<Vec<_>>(); + s.push(format!("Group {}: [{}]", idx, names.join(", "))); + } + s + } + + /// Mocked out prunable provider to use for testing overlaps + #[derive(Debug)] + struct TestChunk { + // The name of this chunk + name: String, + summary: TableSummary, + builder: SchemaBuilder, + } + + /// Implementation of creating a new column with statistics for TestChunk + macro_rules! 
make_stats { + ($MIN:expr, $MAX:expr, $STAT_TYPE:ident) => {{ + Statistics::$STAT_TYPE(StatValues { + distinct_count: None, + min: $MIN, + max: $MAX, + count: 42, + }) + }}; + } + + impl TestChunk { + /// Create a new TestChunk with a specified name + fn new(name: impl Into) -> Self { + let name = name.into(); + let summary = TableSummary::new(name.clone()); + let builder = SchemaBuilder::new(); + Self { + name, + summary, + builder, + } + } + + /// Adds a tag column with the specified min/max values + fn with_tag( + mut self, + name: impl Into, + min: Option<&str>, + max: Option<&str>, + ) -> Self { + let min = min.map(|v| v.to_string()); + let max = max.map(|v| v.to_string()); + + let tag_name = name.into(); + self.builder.tag(&tag_name); + + self.summary.columns.push(ColumnSummary { + name: tag_name, + influxdb_type: Some(InfluxDbType::Tag), + stats: make_stats!(min, max, String), + }); + self + } + + /// Adds a timestamp column with the specified min/max values + fn with_timestamp(mut self, min: i64, max: i64) -> Self { + self.builder.timestamp(); + + let min = Some(min); + let max = Some(max); + + self.summary.columns.push(ColumnSummary { + name: TIME_COLUMN_NAME.into(), + influxdb_type: Some(InfluxDbType::Timestamp), + stats: make_stats!(min, max, I64), + }); + self + } + + /// Adds an I64 field column with the specified min/max values + fn with_int_field( + mut self, + name: impl Into, + min: Option, + max: Option, + ) -> Self { + let field_name = name.into(); + self.builder + .field(&field_name, arrow::datatypes::DataType::Int64); + + self.summary.columns.push(ColumnSummary { + name: field_name, + influxdb_type: Some(InfluxDbType::Field), + stats: make_stats!(min, max, I64), + }); + self + } + } + + impl Prunable for TestChunk { + fn summary(&self) -> &TableSummary { + &self.summary + } + + fn schema(&self) -> SchemaRef { + self.builder + // need to clone because `build` resets builder state + .clone() + .build() + .expect("created schema") + .as_arrow() + } + } +} diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 30e979b642..6a318a1669 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -1206,6 +1206,7 @@ impl ExpressionVisitor for SupportVisitor { Expr::BinaryExpr { op, .. } => { match op { Operator::Eq + | Operator::NotEq | Operator::Lt | Operator::LtEq | Operator::Gt @@ -1217,7 +1218,7 @@ impl ExpressionVisitor for SupportVisitor { | Operator::And | Operator::Or => Ok(Recursion::Continue(self)), // Unsupported (need to think about ramifications) - Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => { + Operator::Modulus | Operator::Like | Operator::NotLike => { Err(DataFusionError::NotImplemented(format!( "Unsupported operator in gRPC: {:?} in expression {:?}", op, expr diff --git a/query/src/lib.rs b/query/src/lib.rs index 2d119f1a6b..8b82b1b945 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -16,6 +16,7 @@ use pruning::Prunable; use std::{fmt::Debug, sync::Arc}; +pub mod duplicate; pub mod exec; pub mod frontend; pub mod func; diff --git a/query/src/pruning.rs b/query/src/pruning.rs index 8df8726558..850ae65700 100644 --- a/query/src/pruning.rs +++ b/query/src/pruning.rs @@ -249,20 +249,16 @@ mod test { } #[test] - // Ignore tests as the pruning predicate can't be created. 
DF - // doesn't support boolean predicates: - // https://github.com/apache/arrow-datafusion/issues/490 - #[ignore] fn test_pruned_bool() { test_helpers::maybe_start_logging(); // column1 where - // c1: [false, true] --> pruned + // c1: [false, false] --> pruned let observer = TestObserver::new(); let c1 = Arc::new(TestPrunable::new("chunk1").with_bool_column( "column1", Some(false), - Some(true), + Some(false), )); let predicate = PredicateBuilder::new().add_expr(col("column1")).build(); @@ -355,20 +351,16 @@ mod test { } #[test] - // Ignore tests as the pruning predicate can't be created. DF - // doesn't support boolean predicates: - // https://github.com/apache/arrow-datafusion/issues/490 - #[ignore] fn test_not_pruned_bool() { test_helpers::maybe_start_logging(); // column1 - // c1: [false, false] --> pruned + // c1: [false, true] --> not pruned let observer = TestObserver::new(); let c1 = Arc::new(TestPrunable::new("chunk1").with_bool_column( "column1", Some(false), - Some(false), + Some(true), )); let predicate = PredicateBuilder::new().add_expr(col("column1")).build(); diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index 845ce5d9e8..7c368e170e 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -160,6 +160,18 @@ async fn test_read_filter_data_filter() { "+------+-------+------+-------------------------------+", ]; + run_read_filter_test_case!( + TwoMeasurementsMultiSeries {}, + predicate, + expected_results.clone() + ); + + // Same results via a != predicate. + let predicate = PredicateBuilder::default() + .timestamp_range(200, 300) + .add_expr(col("state").not_eq(lit("MA"))) // state=CA + .build(); + run_read_filter_test_case!(TwoMeasurementsMultiSeries {}, predicate, expected_results); } diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 0f1c7b1842..99b1d252d1 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -435,7 +435,7 @@ async fn sql_select_with_schema_merge_subset() { } #[tokio::test] -async fn sql_predicate_pushdown_correctness() { +async fn sql_predicate_pushdown_correctness_1() { // Test 1: Select everything let expected = vec![ "+-------+--------+-------------------------------+-----------+", @@ -455,7 +455,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_2() { // Test 2: One push-down expression: count > 200 let expected = vec![ "+-------+--------+-------------------------------+-----------+", @@ -474,7 +477,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where count > 200", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_3() { // Test 3: Two push-down expression: count > 200 and town != 'tewsbury' let expected = vec![ "+-------+--------+-------------------------------+-----------+", @@ -492,7 +498,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where count > 200 and town != 'tewsbury'", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_4() { // Test 4: Still two push-down expression: count > 200 and town != 'tewsbury' // even though the results are different let expected = vec![ @@ -510,7 +519,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence')", &expected ); +} +#[tokio::test] +async fn 
sql_predicate_pushdown_correctness_5() { // Test 5: three push-down expression: count > 200 and town != 'tewsbury' and count < 40000 let expected = vec![ "+-------+--------+-------------------------------+-----------+", @@ -526,7 +538,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_6() { // Test 6: two push-down expression: count > 200 and count < 40000 let expected = vec![ "+-------+--------+-------------------------------+-----------+", @@ -544,7 +559,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where count > 200 and count < 40000", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_7() { // Test 7: two push-down expression on float: system > 4.0 and system < 7.0 let expected = vec![ "+-------+--------+-------------------------------+-----------+", @@ -563,7 +581,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where system > 4.0 and system < 7.0", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_8() { // Test 8: two push-down expression on float: system > 5.0 and system < 7.0 let expected = vec![ "+-------+--------+-------------------------------+----------+", @@ -579,7 +600,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where system > 5.0 and system < 7.0", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_9() { // Test 9: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 let expected = vec![ "+-------+--------+-------------------------------+----------+", @@ -594,7 +618,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_10() { // Test 10: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 // even though there are more expressions,(count = 632 or town = 'reading'), in the filter let expected = vec![ @@ -609,7 +636,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading')", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_11() { // Test 11: four push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and // time > to_timestamp('1970-01-01T00:00:00.000000120+00:00') (rewritten to time GT int(130)) // @@ -619,7 +649,10 @@ async fn sql_predicate_pushdown_correctness() { "SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00')", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_12() { // TODO: Hit stackoverflow in DF. 
Ticket https://github.com/apache/arrow-datafusion/issues/419 // // Test 12: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and town = 'reading' // // @@ -636,7 +669,10 @@ async fn sql_predicate_pushdown_correctness() { // "SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and town = 'reading'", // &expected // ); +} +#[tokio::test] +async fn sql_predicate_pushdown_correctness_13() { // Test 13: three push-down expression: system > 5.0 and system < 7.0 and town = 'reading' // // Check correctness @@ -655,7 +691,7 @@ async fn sql_predicate_pushdown_correctness() { } #[tokio::test] -async fn sql_predicate_pushdown_explain() { +async fn sql_predicate_pushdown_explain_1() { // Test 1: Select everything let expected = vec![ "+-----------------------------------------+--------------------------------------------------------------------------+", @@ -665,8 +701,12 @@ async fn sql_predicate_pushdown_explain() { "| | TableScan: restaurant projection=None |", "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", "+-----------------------------------------+--------------------------------------------------------------------------+", @@ -676,304 +716,412 @@ async fn sql_predicate_pushdown_explain() { "EXPLAIN VERBOSE SELECT * from restaurant", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_2() { // Test 2: One push-down expression: count > 200 // TODO: Make push-down predicates shown in explain verbose. 
Ticket #1538 let expected = vec![ - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: CAST(count AS Int64) > 200 |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+----------------------------------------------------------------------------+", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: CAST(count AS Int64) > 200 |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+----------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where count > 200", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_2_2() { // Test 2.2: One push-down expression: count > 200.0 let expected = vec![ - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Float64(200) |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - 
"| | Filter: #count Gt Float64(200) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Float64(200) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: CAST(count AS Float64) > 200 |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+----------------------------------------------------------------------------+", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Float64(200) |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Float64(200) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Float64(200) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Float64(200) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Float64(200) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: CAST(count AS Float64) > 200 |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+----------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where count > 200.0", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_2_3() { // Test 2.3: One push-down expression: system > 4.0 let expected = vec![ - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(4) |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(4) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(4) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: system > 4 |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - 
"+-----------------------------------------+----------------------------------------------------------------------------+", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: system > 4 |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+----------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_3() { // Test 3: Two push-down expression: count > 200 and town != 'tewsbury' let expected = vec![ - "+-----------------------------------------+-----------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+-----------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+-----------------------------------------------------------------------------+", + "+-----------------------------------------+-----------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+-----------------------------------------------------------------------------+", + "| 
logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+-----------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury'", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_4() { // Test 4: Still two push-down expression: count > 200 and town != 'tewsbury' // even though the results are different let expected = vec![ - "+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury AND system = CAST(5 AS Float64) OR CAST(town AS Utf8) = lawrence |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - 
"+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+", + "+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury AND system = CAST(5 AS Float64) OR CAST(town AS Utf8) = lawrence |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence')", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_5() { // Test 5: three push-down expression: count > 200 and town != 'tewsbury' and count < 40000 let expected = vec![ - "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt 
Int64(40000) |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury AND system = CAST(5 AS Float64) OR CAST(town AS Utf8) = lawrence AND CAST(count AS Int64) < 40000 |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury AND system = CAST(5 AS Float64) OR CAST(town AS Utf8) = lawrence AND CAST(count AS Int64) < 40000 |", + "| | IOxReadFilterNode: 
table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_6() { // Test 6: two push-down expression: count > 200 and count < 40000 let expected = vec![ - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(count AS Int64) < 40000 |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+----------------------------------------------------------------------------+", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(count AS Int64) < 40000 |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + 
"+-----------------------------------------+----------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and count < 40000", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_7() { // Test 7: two push-down expression on float: system > 4.0 and system < 7.0 let expected = vec![ - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: system > 4 AND system < 7 |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+----------------------------------------------------------------------------+", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: system > 4 AND system < 7 |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+----------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0 and system < 7.0", &expected ); +} +#[tokio::test] +async 
fn sql_predicate_pushdown_explain_8() { // Test 8: two push-down expression on float: system > 5.0 and system < 7.0 let expected = vec![ - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+----------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: system > 5 AND system < 7 |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+----------------------------------------------------------------------------+", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+----------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: system > 5 AND system < 7 |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+----------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and system < 7.0", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_9() { // Test 9: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 let expected = vec![ - "+-----------------------------------------+--------------------------------------------------------------------------------------------+", - "| plan_type 
| plan |", - "+-----------------------------------------+--------------------------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: system > 5 AND CAST(town AS Utf8) != tewsbury AND 7 > system |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+--------------------------------------------------------------------------------------------+", + "+-----------------------------------------+--------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+--------------------------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: system > 5 AND CAST(town AS Utf8) != tewsbury AND 7 > system |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+--------------------------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_10() { // Test 10: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 
// even though there are more expressions,(count = 632 or town = 'reading'), in the filter let expected = vec![ - "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: system > 5 AND tewsbury != CAST(town AS Utf8) AND system < 7 AND CAST(count AS Int64) = 632 OR CAST(town AS Utf8) = reading |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+", + "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + 
"| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: system > 5 AND tewsbury != CAST(town AS Utf8) AND system < 7 AND CAST(count AS Int64) = 632 OR CAST(town AS Utf8) = reading |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, "EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading')", &expected ); +} +#[tokio::test] +async fn sql_predicate_pushdown_explain_11() { // Test 11: four push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and // time > to_timestamp('1970-01-01T00:00:00.000000120+00:00') rewritten to time GT INT(130) let expected = vec![ - "+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", - "| logical_plan | Projection: #count, #system, #time, #town |", - "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |", - "| | TableScan: restaurant projection=None |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", - "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |", - "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", - "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", - "| | FilterExec: 5 < system AND CAST(town AS Utf8) != tewsbury AND system < 7 AND CAST(count AS Int64) = 632 OR CAST(town AS Utf8) = reading AND time > totimestamp(1970-01-01T00:00:00.000000130+00:00) |", - "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", - 
"+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | Projection: #count, #system, #time, #town |", + "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |", + "| | TableScan: restaurant projection=None |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", + "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |", + "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |", + "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", + "| physical_plan | ProjectionExec: expr=[count, system, time, town] |", + "| | FilterExec: 5 < system AND CAST(town AS Utf8) != tewsbury AND system < 7 AND CAST(count AS Int64) = 632 OR CAST(town AS Utf8) = reading AND time > totimestamp(1970-01-01T00:00:00.000000130+00:00) |", + "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", + "+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", ]; run_sql_explain_test_case!( TwoMeasurementsPredicatePushDown {}, @@ -983,7 +1131,7 @@ async fn sql_predicate_pushdown_explain() { } #[tokio::test] -async fn sql_deduplicate() { +async fn sql_deduplicate_1() { // This 
expected output is currently wrong because deduplication is not yet available let sql = "select time, state, city, min_temp, max_temp, area from h2o order by time, state, city"; @@ -1016,7 +1164,10 @@ async fn sql_deduplicate() { "+-------------------------------+-------+---------+----------+----------+------+", ]; run_sql_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected); +} +#[tokio::test] +async fn sql_deduplicate_2() { // Plan with order by let expected = vec![ "+-----------------------------------------+----------------------------------------------------------------------------+", @@ -1028,9 +1179,15 @@ async fn sql_deduplicate() { "| logical_plan after projection_push_down | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |", "|                                          |   Projection: #time, #state, #city, #min_temp, #max_temp, #area            |", "|                                          |     TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5])                     |", + "| logical_plan after simplify_expressions | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |", + "|                                          |   Projection: #time, #state, #city, #min_temp, #max_temp, #area            |", + "|                                          |     TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5])                     |", "| logical_plan after projection_push_down | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |", "|                                          |   Projection: #time, #state, #city, #min_temp, #max_temp, #area            |", "|                                          |     TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5])                     |", + "| logical_plan after simplify_expressions | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |", + "|                                          |   Projection: #time, #state, #city, #min_temp, #max_temp, #area            |", + "|                                          |     TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5])                     |", "| physical_plan                            | SortExec: [time ASC,state ASC,city ASC]                                    |", "|                                          |   ProjectionExec: expr=[time, state, city, min_temp, max_temp, area]       |", "|                                          |     IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate        |", @@ -1038,53 +1195,73 @@ async fn sql_deduplicate() { ]; let sql = "explain verbose select time, state, city, min_temp, max_temp, area from h2o order by time, state, city"; run_sql_explain_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected); +} +#[tokio::test] +async fn sql_deduplicate_3() { // plan without order by let expected = vec![ - "+-----------------------------------------+--------------------------------------------------------------------+", - "| plan_type                               | plan                                                               |", - "+-----------------------------------------+--------------------------------------------------------------------+", - "| logical_plan                             | Projection: #time, #state, #city, #min_temp, #max_temp, #area      |", - "|                                          |   TableScan: h2o projection=None                                   |", - "| logical_plan after projection_push_down | Projection: #time, #state, #city, #min_temp, #max_temp, #area      |", - "|                                          |   TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5])               |", - "| logical_plan after projection_push_down | Projection: #time, #state, #city, #min_temp, #max_temp, #area      |", - "|                                          |   TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5])               |", - "| physical_plan                            | ProjectionExec: expr=[time, state, city, min_temp, max_temp, area] |", - "|                                          |   IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate  |", - "+-----------------------------------------+--------------------------------------------------------------------+", + "+-----------------------------------------+--------------------------------------------------------------------+", + "| plan_type                               | plan                                                               |", + "+-----------------------------------------+--------------------------------------------------------------------+", + "| 
logical_plan | Projection: #time, #state, #city, #min_temp, #max_temp, #area |", + "| | TableScan: h2o projection=None |", + "| logical_plan after projection_push_down | Projection: #time, #state, #city, #min_temp, #max_temp, #area |", + "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |", + "| logical_plan after simplify_expressions | Projection: #time, #state, #city, #min_temp, #max_temp, #area |", + "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |", + "| logical_plan after projection_push_down | Projection: #time, #state, #city, #min_temp, #max_temp, #area |", + "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |", + "| logical_plan after simplify_expressions | Projection: #time, #state, #city, #min_temp, #max_temp, #area |", + "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |", + "| physical_plan | ProjectionExec: expr=[time, state, city, min_temp, max_temp, area] |", + "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |", + "+-----------------------------------------+--------------------------------------------------------------------+", ]; let sql = "explain verbose select time, state, city, min_temp, max_temp, area from h2o"; run_sql_explain_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected); +} +#[tokio::test] +async fn sql_deduplicate_4() { // Union plan let sql = "EXPLAIN VERBOSE select state as name from h2o UNION ALL select city as name from h2o"; let expected = vec![ - "+-----------------------------------------+---------------------------------------------------------------------+", - "| plan_type | plan |", - "+-----------------------------------------+---------------------------------------------------------------------+", - "| logical_plan | Union |", - "| | Projection: #state AS name |", - "| | TableScan: h2o projection=None |", - "| | Projection: #city AS name |", - "| | TableScan: h2o projection=None |", - "| logical_plan after projection_push_down | Union |", - "| | Projection: #state AS name |", - "| | TableScan: h2o projection=Some([4]) |", - "| | Projection: #city AS name |", - "| | TableScan: h2o projection=Some([1]) |", - "| logical_plan after projection_push_down | Union |", - "| | Projection: #state AS name |", - "| | TableScan: h2o projection=Some([4]) |", - "| | Projection: #city AS name |", - "| | TableScan: h2o projection=Some([1]) |", - "| physical_plan | ExecutionPlan(PlaceHolder) |", - "| | ProjectionExec: expr=[state as name] |", - "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |", - "| | ProjectionExec: expr=[city as name] |", - "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |", - "+-----------------------------------------+---------------------------------------------------------------------+", + "+-----------------------------------------+---------------------------------------------------------------------+", + "| plan_type | plan |", + "+-----------------------------------------+---------------------------------------------------------------------+", + "| logical_plan | Union |", + "| | Projection: #state AS name |", + "| | TableScan: h2o projection=None |", + "| | Projection: #city AS name |", + "| | TableScan: h2o projection=None |", + "| logical_plan after projection_push_down | Union |", + "| | Projection: #state AS name |", + "| | TableScan: h2o projection=Some([4]) |", + "| | Projection: #city AS name |", + "| | TableScan: h2o projection=Some([1]) |", + "| logical_plan after simplify_expressions | Union |", + "| | Projection: #state 
AS name                                        |", + "|                                          |     TableScan: h2o projection=Some([4])                             |", + "|                                          |   Projection: #city AS name                                         |", + "|                                          |     TableScan: h2o projection=Some([1])                             |", + "| logical_plan after projection_push_down | Union                                                               |", + "|                                          |   Projection: #state AS name                                        |", + "|                                          |     TableScan: h2o projection=Some([4])                             |", + "|                                          |   Projection: #city AS name                                         |", + "|                                          |     TableScan: h2o projection=Some([1])                             |", + "| logical_plan after simplify_expressions | Union                                                               |", + "|                                          |   Projection: #state AS name                                        |", + "|                                          |     TableScan: h2o projection=Some([4])                             |", + "|                                          |   Projection: #city AS name                                         |", + "|                                          |     TableScan: h2o projection=Some([1])                             |", + "| physical_plan                            | ExecutionPlan(PlaceHolder)                                          |", + "|                                          |   ProjectionExec: expr=[state as name]                              |", + "|                                          |     IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |", + "|                                          |   ProjectionExec: expr=[city as name]                               |", + "|                                          |     IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |", + "+-----------------------------------------+---------------------------------------------------------------------+", ]; run_sql_explain_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected); } diff --git a/server/src/config.rs b/server/src/config.rs index b4dd63e91a..82d857223c 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -12,6 +12,7 @@ use query::exec::Executor; /// This module contains code for managing the configuration of the server. use crate::{ db::{catalog::Catalog, Db}, + write_buffer::KafkaBuffer, Error, JobRegistry, Result, }; use observability_deps::tracing::{self, error, info, warn, Instrument}; @@ -150,6 +151,15 @@ impl Config { return; } + // Right now, `KafkaBuffer` is the only production implementation of the `WriteBuffer` + // trait, so always use `KafkaBuffer` when there is a write buffer connection string + // specified. If/when there are other kinds of write buffers, additional configuration will + // be needed to determine what kind of write buffer to use here. + let write_buffer = rules + .write_buffer_connection_string + .as_ref() + .map(|conn| Arc::new(KafkaBuffer::new(conn)) as _); + let db = Arc::new(Db::new( rules, server_id, @@ -157,6 +167,7 @@ impl Config { exec, Arc::clone(&self.jobs), preserved_catalog, + write_buffer, )); let shutdown = self.shutdown.child_token(); diff --git a/server/src/db.rs b/server/src/db.rs index 9aeac4128e..068296bf67 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -3,7 +3,7 @@ use self::access::QueryCatalogAccess; use self::catalog::TableNameFilter; -use super::JobRegistry; +use super::{write_buffer::WriteBuffer, JobRegistry}; use arrow::datatypes::SchemaRef as ArrowSchemaRef; use async_trait::async_trait; use catalog::{chunk::Chunk as CatalogChunk, Catalog}; @@ -166,6 +166,11 @@ pub enum Error { #[snafu(display("Unknown Mutable Buffer Chunk {}", chunk_id))] UnknownMutableBufferChunk { chunk_id: u32 }, + #[snafu(display("Error sending entry to write buffer"))] + WriteBufferError { + source: Box<dyn std::error::Error + Sync + Send>, + }, + #[snafu(display("Cannot write to this database: no mutable buffer configured"))] DatabaseNotWriteable {}, @@ -301,6 +306,9 @@ pub struct Db { /// Metric labels metric_labels: Vec<KeyValue>, + + /// Optionally buffer writes + write_buffer: Option<Arc<dyn WriteBuffer>>, } /// Load preserved catalog state from store.
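The `as _` in the config.rs hunk above leans on Rust's unsized coercion: `Arc::new(KafkaBuffer::new(conn))` is an `Arc<KafkaBuffer>`, and the compiler coerces it to `Arc<dyn WriteBuffer>` because that is the element type `Db::new` expects. A minimal, self-contained sketch of the same pattern (the types and the connection string below are illustrative stand-ins, not the IOx definitions):

use std::sync::Arc;

// Stand-ins for the IOx trait and struct, kept just large enough to compile.
trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {}

#[derive(Debug)]
struct KafkaBuffer {
    conn: String,
}

impl WriteBuffer for KafkaBuffer {}

fn main() {
    // Hypothetical connection string; IOx reads this from `DatabaseRules`.
    let connection_string: Option<String> = Some("kafka-host:9092".to_string());

    // `as _` asks the compiler to infer the coercion target. The annotation on
    // `write_buffer` fixes it to `Arc<dyn WriteBuffer>`, so the `Arc<KafkaBuffer>`
    // built inside the closure is unsize-coerced to a trait object.
    let write_buffer: Option<Arc<dyn WriteBuffer>> = connection_string
        .as_ref()
        .map(|conn| Arc::new(KafkaBuffer { conn: conn.clone() }) as _);

    println!("{:?}", write_buffer);
}

In the diff itself no annotation appears on the binding; the target type is inferred from the `write_buffer` parameter of `Db::new` a few lines later, which is equivalent.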
@@ -393,6 +401,7 @@ impl Db { exec: Arc<Executor>, jobs: Arc<JobRegistry>, preserved_catalog: PreservedCatalog, + write_buffer: Option<Arc<dyn WriteBuffer>>, ) -> Self { let db_name = rules.name.clone(); @@ -426,6 +435,7 @@ impl Db { worker_iterations_lifecycle: AtomicUsize::new(0), worker_iterations_cleanup: AtomicUsize::new(0), metric_labels, + write_buffer, } } @@ -978,23 +988,44 @@ impl Db { rules.lifecycle_rules.immutable }; - // If the database is immutable, we don't even need to build a `SequencedEntry`. - // There will be additional cases when we add the write buffer as the `SequencedEntry` - // will potentially need to be constructed from other values, like the Kafka partition - // and offset, instead of the process clock. - if immutable { - DatabaseNotWriteable {}.fail() - } else { - let sequenced_entry = Arc::new( - SequencedEntry::new_from_process_clock( - self.process_clock.next(), - self.server_id, - entry.data(), - ) - .context(SequencedEntryError)?, - ); + match (self.write_buffer.as_ref(), immutable) { + (Some(write_buffer), true) => { + // If only the write buffer is configured, this is passing the data through to + // the write buffer, and it's not an error. We ignore the returned metadata; it + // will get picked up when data is read from the write buffer. + let _ = write_buffer.store_entry(&entry).context(WriteBufferError)?; + Ok(()) + } + (Some(write_buffer), false) => { + // If using both write buffer and mutable buffer, we want to wait for the write + // buffer to return success before adding the entry to the mutable buffer. + let sequence = write_buffer.store_entry(&entry).context(WriteBufferError)?; + let sequenced_entry = Arc::new( + SequencedEntry::new_from_sequence(sequence, entry) + .context(SequencedEntryError)?, + ); - self.store_sequenced_entry(sequenced_entry) + self.store_sequenced_entry(sequenced_entry) + } + (None, true) => { + // If no write buffer is configured and the database is immutable, trying to + // store an entry is an error and we don't need to build a `SequencedEntry`. + DatabaseNotWriteable {}.fail() + } + (None, false) => { + // If no write buffer is configured, use the process clock and send to the mutable + // buffer. + let sequenced_entry = Arc::new( + SequencedEntry::new_from_process_clock( + self.process_clock.next(), + self.server_id, + entry, + ) + .context(SequencedEntryError)?, + ); + + self.store_sequenced_entry(sequenced_entry) + } } } @@ -1010,7 +1041,7 @@ impl Db { // We may have gotten here through `store_entry`, in which case this is checking the // configuration again unnecessarily, but we may have come here by consuming records from - // Kafka, so this check is necessary in that case. + // the write buffer, so this check is necessary in that case.
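The rewritten `store_entry` above dispatches on two independent facts: whether a write buffer is configured and whether the database is immutable. As a compact reference, here is a hypothetical restatement of the four outcomes; `Route` and these names are illustrative only, not IOx API:

// Sketch of the four `store_entry` paths introduced in the match above.
#[derive(Debug, PartialEq)]
enum Route {
    /// (Some, immutable): forward the entry, ignore the returned sequence.
    WriteBufferOnly,
    /// (Some, mutable): the sequence comes back from the write buffer.
    WriteBufferThenMutableBuffer,
    /// (None, immutable): `DatabaseNotWriteable` error.
    Reject,
    /// (None, mutable): the sequence comes from the local process clock.
    ProcessClockThenMutableBuffer,
}

fn route(write_buffer_configured: bool, immutable: bool) -> Route {
    match (write_buffer_configured, immutable) {
        (true, true) => Route::WriteBufferOnly,
        (true, false) => Route::WriteBufferThenMutableBuffer,
        (false, true) => Route::Reject,
        (false, false) => Route::ProcessClockThenMutableBuffer,
    }
}

fn main() {
    assert_eq!(route(true, true), Route::WriteBufferOnly);
    assert_eq!(route(false, true), Route::Reject);
}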
if immutable { return DatabaseNotWriteable {}.fail(); } @@ -1302,8 +1333,11 @@ mod tests { test_helpers::{try_write_lp, write_lp}, *, }; - use crate::db::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr}; - use crate::utils::{make_db, TestDb}; + use crate::{ + db::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr}, + utils::{make_db, TestDb}, + write_buffer::test_helpers::MockBuffer, + }; use ::test_helpers::assert_contains; use arrow::record_batch::RecordBatch; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; @@ -1352,8 +1386,57 @@ mod tests { ); } + #[tokio::test] + async fn write_with_write_buffer_no_mutable_buffer() { + // Writes should be forwarded to the write buffer and *not* rejected if the write buffer is + // configured and the mutable buffer isn't + let write_buffer = Arc::new(MockBuffer::default()); + let test_db = TestDb::builder() + .write_buffer(Arc::clone(&write_buffer) as _) + .build() + .await + .db; + + test_db.rules.write().lifecycle_rules.immutable = true; + + let entry = lp_to_entry("cpu bar=1 10"); + test_db.store_entry(entry).unwrap(); + + assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); + } + + #[tokio::test] + async fn write_buffer_and_mutable_buffer() { + // Writes should be forwarded to the write buffer *and* the mutable buffer if both are + // configured. + let write_buffer = Arc::new(MockBuffer::default()); + let test_db = TestDb::builder() + .write_buffer(Arc::clone(&write_buffer) as _) + .build() + .await + .db; + + let entry = lp_to_entry("cpu bar=1 10"); + test_db.store_entry(entry).unwrap(); + + assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); + + let db = Arc::new(test_db); + let batches = run_query(db, "select * from cpu").await; + + let expected = vec![ + "+-----+-------------------------------+", + "| bar | time |", + "+-----+-------------------------------+", + "| 1 | 1970-01-01 00:00:00.000000010 |", + "+-----+-------------------------------+", + ]; + assert_batches_eq!(expected, &batches); + } + #[tokio::test] async fn read_write() { + // This test also exercises the path without a write buffer. 
let db = Arc::new(make_db().await.db); write_lp(&db, "cpu bar=1 10"); diff --git a/server/src/lib.rs b/server/src/lib.rs index 16e9b0d06b..7ff1ad6c9d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -108,6 +108,7 @@ use std::collections::HashMap; mod config; pub mod db; +mod write_buffer; /// Utility modules used by benchmarks and tests pub mod utils; @@ -1166,6 +1167,7 @@ mod tests { lifecycle_rules: Default::default(), routing_rules: None, worker_cleanup_avg_sleep: Duration::from_secs(2), + write_buffer_connection_string: None, }; // Create a database @@ -1262,6 +1264,7 @@ mod tests { lifecycle_rules: Default::default(), routing_rules: None, worker_cleanup_avg_sleep: Duration::from_secs(2), + write_buffer_connection_string: None, }; // Create a database diff --git a/server/src/utils.rs b/server/src/utils.rs index cc9b31ef2e..0107c8bd8d 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -9,6 +9,7 @@ use query::{exec::Executor, Database}; use crate::{ db::{load_or_create_preserved_catalog, Db}, + write_buffer::WriteBuffer, JobRegistry, }; use std::{borrow::Cow, convert::TryFrom, sync::Arc, time::Duration}; @@ -33,6 +34,7 @@ pub struct TestDbBuilder { object_store: Option<Arc<ObjectStore>>, db_name: Option<DatabaseName<'static>>, worker_cleanup_avg_sleep: Option<Duration>, + write_buffer: Option<Arc<dyn WriteBuffer>>, } impl TestDbBuilder { @@ -79,6 +81,7 @@ impl TestDbBuilder { exec, Arc::new(JobRegistry::new()), preserved_catalog, + self.write_buffer, ), } } @@ -102,6 +105,11 @@ impl TestDbBuilder { self.worker_cleanup_avg_sleep = Some(d); self } + + pub fn write_buffer(mut self, write_buffer: Arc<dyn WriteBuffer>) -> Self { + self.write_buffer = Some(write_buffer); + self + } } /// Used for testing: create a Database with a local store diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs new file mode 100644 index 0000000000..960d27acbe --- /dev/null +++ b/server/src/write_buffer.rs @@ -0,0 +1,61 @@ +use entry::{Entry, Sequence}; + +/// A Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading entries +/// from the Write Buffer at a later time. +pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static { + /// Send an `Entry` to the write buffer and return information that can be used to restore + /// entries at a later time. + fn store_entry( + &self, + entry: &Entry, + ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>>; + + // TODO: interface for restoring, will look something like: + // fn restore_from(&self, sequence: &Sequence) -> Result<Stream<Entry>, Err>; +} + +#[derive(Debug)] +pub struct KafkaBuffer { + conn: String, +} + +impl WriteBuffer for KafkaBuffer { + fn store_entry( + &self, + _entry: &Entry, + ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> { + unimplemented!() + } +} + +impl KafkaBuffer { + pub fn new(conn: impl Into<String>) -> Self { + Self { conn: conn.into() } + } +} + +pub mod test_helpers { + use super::*; + use std::sync::{Arc, Mutex}; + + #[derive(Debug, Default)] + pub struct MockBuffer { + pub entries: Arc<Mutex<Vec<Entry>>>, + } + + impl WriteBuffer for MockBuffer { + fn store_entry( + &self, + entry: &Entry, + ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> { + let mut entries = self.entries.lock().unwrap(); + let offset = entries.len() as u64; + entries.push(entry.clone()); + + Ok(Sequence { + id: 0, + number: offset, + }) + } + } +} diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 065ed94a28..f0b8ab31e2 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -218,6 +218,7 @@ async fn test_create_get_update_database() { seconds: 2, nanos: 0, }), + write_buffer_connection_string: "".into(), }; client
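For readers skimming the new server/src/write_buffer.rs above: the trait boils down to "hand me an entry, give me back coordinates I can replay it from", and the mock simply treats its Vec index as the offset. A self-contained sketch of that contract, with `Vec<u8>` standing in for the IOx `Entry` type and a simplified `Sequence` (both stand-ins are assumptions for illustration):

use std::sync::{Arc, Mutex};

// Mirrors the shape of the IOx `Sequence`: a writer/partition id plus a
// monotonically increasing offset within it.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Sequence {
    id: u32,
    number: u64,
}

// Simplified trait; the real one takes `&Entry` rather than `&[u8]`.
trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {
    fn store_entry(
        &self,
        entry: &[u8],
    ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>>;
}

#[derive(Debug, Default)]
struct MockBuffer {
    entries: Arc<Mutex<Vec<Vec<u8>>>>,
}

impl WriteBuffer for MockBuffer {
    fn store_entry(
        &self,
        entry: &[u8],
    ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
        let mut entries = self.entries.lock().unwrap();
        // The next offset is the current length, so stored entries can later
        // be replayed in order starting from any returned `Sequence`.
        let number = entries.len() as u64;
        entries.push(entry.to_vec());
        Ok(Sequence { id: 0, number })
    }
}

fn main() {
    let buffer = MockBuffer::default();
    let seq = buffer.store_entry(b"cpu bar=1 10").unwrap();
    assert_eq!(seq, Sequence { id: 0, number: 0 });
    assert_eq!(buffer.entries.lock().unwrap().len(), 1);
}

This is the same observable behavior the new db.rs tests rely on: after one `store_entry`, the mock holds exactly one entry, which is what `write_with_write_buffer_no_mutable_buffer` asserts.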