Merge branch 'main' into ntran/dedup_within_chunk

pull/24376/head
Nga Tran 2021-06-08 13:18:40 -04:00
commit edbf1b7d5e
22 changed files with 1462 additions and 331 deletions

Cargo.lock (generated)

@@ -20,9 +20,9 @@ checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5"
 [[package]]
 name = "addr2line"
-version = "0.15.1"
+version = "0.15.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "03345e98af8f3d786b6d9f656ccfa6ac316d954e92bc4841f0bba20789d5fb5a"
+checksum = "e7a2e47a1fbe209ee101dd6d61285226744c6c8d3c21c8dc878ba6cb9f467f3a"
 dependencies = [
  "gimli",
 ]
@@ -190,9 +190,9 @@ dependencies = [
 [[package]]
 name = "assert_cmd"
-version = "1.0.4"
+version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f57fec1ac7e4de72dcc69811795f1a7172ed06012f80a5d1ee651b62484f588"
+checksum = "a88b6bd5df287567ffdf4ddf4d33060048e1068308e5f62d81c6f9824a045a48"
 dependencies = [
  "bstr",
  "doc-comment",
@@ -323,9 +323,9 @@ dependencies = [
 [[package]]
 name = "backtrace"
-version = "0.3.59"
+version = "0.3.60"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4717cfcbfaa661a0fd48f8453951837ae7e8f81e481fbb136e3202d72805a744"
+checksum = "b7815ea54e4d821e791162e078acbebfd6d8c8939cd559c9335dceb1c8ca7282"
 dependencies = [
  "addr2line",
  "cc",
@@ -904,7 +904,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "4.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=16011120a1b73798049c5be49f9548b00f8a0a00#16011120a1b73798049c5be49f9548b00f8a0a00"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=964f49449ae7b338999fec133ae0174f01a931ae#964f49449ae7b338999fec133ae0174f01a931ae"
 dependencies = [
  "ahash 0.7.4",
  "arrow",
@@ -1449,9 +1449,9 @@ dependencies = [
 [[package]]
 name = "heck"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac"
+checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
 dependencies = [
  "unicode-segmentation",
 ]
@@ -2324,9 +2324,12 @@ dependencies = [
 [[package]]
 name = "object"
-version = "0.24.0"
+version = "0.25.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a5b3dd1c072ee7963717671d1ca129f1048fda25edea6b752bfc71ac8854170"
+checksum = "f8bc1d42047cf336f0f939c99e97183cf31551bf0f2865a2ec9c8d91fd4ffb5e"
+dependencies = [
+ "memchr",
+]

 [[package]]
 name = "object_store"
@@ -4569,9 +4572,9 @@ dependencies = [
 [[package]]
 name = "unicode-normalization"
-version = "0.1.18"
+version = "0.1.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33717dca7ac877f497014e10d73f3acf948c342bee31b5ca7892faf94ccc6b49"
+checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9"
 dependencies = [
  "tinyvec",
 ]
@@ -4859,9 +4862,9 @@ checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd"
 [[package]]
 name = "zstd"
-version = "0.8.1+zstd.1.5.0"
+version = "0.8.3+zstd.1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "357d6bb1bd9c6f6a55a5a15c74d01260b272f724dc60cc829b86ebd2172ac5ef"
+checksum = "5ea7094c7b4a58fbd738eb0d4a2fc7684a0e6949a31597e074ffe20a07cbc2bf"
 dependencies = [
  "zstd-safe",
 ]


@@ -50,6 +50,9 @@ pub struct DatabaseRules {
     /// Duration for which the cleanup loop should sleep on average.
     /// Defaults to 500 seconds.
     pub worker_cleanup_avg_sleep: Duration,
+
+    /// An optional connection string to a write buffer.
+    pub write_buffer_connection_string: Option<String>,
 }

 #[derive(Debug, Eq, PartialEq, Clone)]
@@ -80,6 +83,7 @@ impl DatabaseRules {
             lifecycle_rules: Default::default(),
             routing_rules: None,
             worker_cleanup_avg_sleep: Duration::from_secs(500),
+            write_buffer_connection_string: None,
         }
     }
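The new field is plain data, so pointing a database at a write buffer is just setting the Option. A minimal sketch, assuming the DatabaseRules::new(name) constructor implied by the defaults above (the address value is invented for illustration):

    // Hypothetical usage; only the field and its None default come from this change.
    let name = DatabaseName::new("mydb").expect("valid database name");
    let mut rules = DatabaseRules::new(name);
    assert!(rules.write_buffer_connection_string.is_none()); // defaults to None
    rules.write_buffer_connection_string = Some("write-buffer-0:9092".to_string());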


@@ -1,7 +1,7 @@
 //! This module contains structs that describe the metadata for a partition
 //! including schema, summary statistics, and file locations in storage.

-use std::{borrow::Cow, mem};
+use std::{borrow::Cow, cmp::Ordering, mem};

 use serde::{Deserialize, Serialize};
 use std::borrow::Borrow;
@@ -79,8 +79,9 @@ pub struct UnaggregatedTableSummary {
     pub table: TableSummary,
 }

-/// Metadata and statistics information for a table, aggregated across
-/// chunks.
+/// Metadata and statistics information for a table. This can be
+/// either for the portion of a Table stored within a single chunk or
+/// aggregated across chunks.
 #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
 pub struct TableSummary {
     /// Table name
@@ -141,6 +142,37 @@ impl TableSummary {
     pub fn column(&self, name: &str) -> Option<&ColumnSummary> {
         self.columns.iter().find(|c| c.name == name)
     }
+
+    /// Return the columns used for the "primary key" in this table.
+    ///
+    /// Currently this relies on the InfluxDB data model annotations
+    /// for what columns to include in the key columns
+    pub fn primary_key_columns(&self) -> Vec<&ColumnSummary> {
+        use InfluxDbType::*;
+        let mut key_summaries: Vec<&ColumnSummary> = self
+            .columns
+            .iter()
+            .filter(|s| match s.influxdb_type {
+                Some(Tag) => true,
+                Some(Field) => false,
+                Some(Timestamp) => true,
+                None => false,
+            })
+            .collect();
+
+        // Now, sort lexicographically (but put timestamp last)
+        key_summaries.sort_by(
+            |a, b| match (a.influxdb_type.as_ref(), b.influxdb_type.as_ref()) {
+                (Some(Tag), Some(Tag)) => a.name.cmp(&b.name),
+                (Some(Timestamp), Some(Tag)) => Ordering::Greater,
+                (Some(Tag), Some(Timestamp)) => Ordering::Less,
+                (Some(Timestamp), Some(Timestamp)) => panic!("multiple timestamps in summary"),
+                _ => panic!("Unexpected types in key summary"),
+            },
+        );
+        key_summaries
+    }
 }
 // Replicate this enum here as it can't be derived from the existing statistics

@@ -273,6 +305,17 @@ impl Statistics {
         }
     }

+    /// Returns true if both the min and max values are None (aka not known)
+    pub fn is_none(&self) -> bool {
+        match self {
+            Self::I64(v) => v.is_none(),
+            Self::U64(v) => v.is_none(),
+            Self::F64(v) => v.is_none(),
+            Self::Bool(v) => v.is_none(),
+            Self::String(v) => v.is_none(),
+        }
+    }
+
     /// Return the minimum value, if any, formatted as a string
     pub fn min_as_str(&self) -> Option<Cow<'_, str>> {
         match self {
@@ -397,6 +440,11 @@
             }
         };
     }

+    /// Returns true if both the min and max values are None (aka not known)
+    pub fn is_none(&self) -> bool {
+        self.min.is_none() && self.max.is_none()
+    }
 }

 impl<T> StatValues<T> {
@@ -442,6 +490,40 @@ impl StatValues<String> {
     }
 }

+/// Represents the result of comparing the min/max ranges of two [`StatValues`]
+#[derive(Debug, PartialEq)]
+pub enum StatOverlap {
+    /// There is at least one value that exists in both ranges
+    NonZero,
+
+    /// There are zero values that exist in both ranges
+    Zero,
+
+    /// It is not known if there are any intersections (e.g. because
+    /// one of the bounds is not known / is None)
+    Unknown,
+}
+
+impl<T> StatValues<T>
+where
+    T: PartialOrd,
+{
+    /// Returns information about the overlap between two `StatValues`
+    pub fn overlaps(&self, other: &Self) -> StatOverlap {
+        match (&self.min, &self.max, &other.min, &other.max) {
+            (Some(self_min), Some(self_max), Some(other_min), Some(other_max)) => {
+                if self_min <= other_max && self_max >= other_min {
+                    StatOverlap::NonZero
+                } else {
+                    StatOverlap::Zero
+                }
+            }
+            // At least one of the values was None
+            _ => StatOverlap::Unknown,
+        }
+    }
+}
+
 pub trait IsNan {
     fn is_nan(&self) -> bool;
 }
@@ -535,6 +617,116 @@ mod tests {
         assert_eq!(stat.count, 2);
     }

+    #[test]
+    fn statistics_is_none() {
+        let mut stat = StatValues::default();
+        assert!(stat.is_none());
+
+        stat.min = Some(0);
+        assert!(!stat.is_none());
+
+        stat.max = Some(1);
+        assert!(!stat.is_none());
+    }
+
+    #[test]
+    fn statistics_overlaps() {
+        let stat1 = StatValues {
+            min: Some(10),
+            max: Some(20),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat1), StatOverlap::NonZero);
+
+        //    [--stat1--]
+        // [--stat2--]
+        let stat2 = StatValues {
+            min: Some(5),
+            max: Some(15),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat2), StatOverlap::NonZero);
+        assert_eq!(stat2.overlaps(&stat1), StatOverlap::NonZero);
+
+        // [--stat1--]
+        //    [--stat3--]
+        let stat3 = StatValues {
+            min: Some(15),
+            max: Some(25),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat3), StatOverlap::NonZero);
+        assert_eq!(stat3.overlaps(&stat1), StatOverlap::NonZero);
+
+        // [--stat1--]
+        //              [--stat4--]
+        let stat4 = StatValues {
+            min: Some(25),
+            max: Some(35),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat4), StatOverlap::Zero);
+        assert_eq!(stat4.overlaps(&stat1), StatOverlap::Zero);
+
+        //              [--stat1--]
+        // [--stat5--]
+        let stat5 = StatValues {
+            min: Some(0),
+            max: Some(5),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat5), StatOverlap::Zero);
+        assert_eq!(stat5.overlaps(&stat1), StatOverlap::Zero);
+    }
+
+    #[test]
+    fn statistics_overlaps_none() {
+        let stat1 = StatValues {
+            min: Some(10),
+            max: Some(20),
+            ..Default::default()
+        };
+
+        let stat2 = StatValues {
+            min: None,
+            max: Some(20),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat2), StatOverlap::Unknown);
+        assert_eq!(stat2.overlaps(&stat1), StatOverlap::Unknown);
+
+        let stat3 = StatValues {
+            min: Some(10),
+            max: None,
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat3), StatOverlap::Unknown);
+        assert_eq!(stat3.overlaps(&stat1), StatOverlap::Unknown);
+
+        let stat4 = StatValues {
+            min: None,
+            max: None,
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat4), StatOverlap::Unknown);
+        assert_eq!(stat4.overlaps(&stat1), StatOverlap::Unknown);
+    }
+
+    #[test]
+    fn statistics_overlaps_mixed_none() {
+        let stat1 = StatValues {
+            min: Some(10),
+            max: None,
+            ..Default::default()
+        };
+        let stat2 = StatValues {
+            min: None,
+            max: Some(5),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat2), StatOverlap::Unknown);
+        assert_eq!(stat2.overlaps(&stat1), StatOverlap::Unknown);
+    }
+
     #[test]
     fn update_string() {
         let mut stat = StatValues::new_with_value("bbb".to_string());
@@ -558,6 +750,18 @@ mod tests {
         assert_eq!(stat.count, 4);
     }

+    #[test]
+    fn stats_is_none() {
+        let stat = Statistics::I64(StatValues::new(Some(-1), Some(100), 1));
+        assert!(!stat.is_none());
+
+        let stat = Statistics::I64(StatValues::new(None, Some(100), 1));
+        assert!(!stat.is_none());
+
+        let stat = Statistics::I64(StatValues::new(None, None, 0));
+        assert!(stat.is_none());
+    }
+
     #[test]
     fn stats_as_str_i64() {
         let stat = Statistics::I64(StatValues::new(Some(-1), Some(100), 1));
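The ordering contract of primary_key_columns() is what the duplicate detection in query/src/duplicate.rs below relies on: tags sort lexicographically by name, the timestamp always sorts last, and fields are excluded, so two chunks' key summaries can be compared column by column. A small sketch using the same public constructors the tests exercise (the column names are invented):

    use data_types::partition_metadata::{
        ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary,
    };

    let mut summary = TableSummary::new("weather");
    for (name, influxdb_type) in [
        ("time", InfluxDbType::Timestamp),
        ("host", InfluxDbType::Tag),
        ("az", InfluxDbType::Tag),
        ("temp", InfluxDbType::Field),
    ] {
        summary.columns.push(ColumnSummary {
            name: name.to_string(),
            influxdb_type: Some(influxdb_type),
            stats: Statistics::I64(StatValues::default()),
        });
    }

    // Tags come back sorted by name, the timestamp last, the field dropped.
    let key_names: Vec<_> = summary
        .primary_key_columns()
        .iter()
        .map(|c| c.name.as_str())
        .collect();
    assert_eq!(key_names, vec!["az", "host", "time"]);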


@@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version"
 # Rename to workaround doctest bug
 # Turn off optional datafusion features (function packages)
-upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev = "16011120a1b73798049c5be49f9548b00f8a0a00", default-features = false, package = "datafusion" }
+upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev = "964f49449ae7b338999fec133ae0174f01a931ae", default-features = false, package = "datafusion" }


@@ -1,7 +1,7 @@
 namespace influxdata.iox.write.v1;

 // Every modification to a database is represented as an entry. These can be forwarded
-// on to other IOx servers or to Kafka.
+// on to other IOx servers or to the write buffer.

 // An entry can only be one of these Operation types
 union Operation {


@@ -290,7 +290,7 @@ pub struct ShardedEntry {
 /// Wrapper type for the flatbuffer Entry struct. Has convenience methods for
 /// iterating through the partitioned writes.
 #[self_referencing]
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
 pub struct Entry {
     data: Vec<u8>,
     #[borrows(data)]
@@ -1201,13 +1201,9 @@ pub enum SequencedEntryError {
     },
 }

-#[self_referencing]
 #[derive(Debug)]
 pub struct SequencedEntry {
-    data: Vec<u8>,
-    #[borrows(data)]
-    #[covariant]
-    entry: entry_fb::Entry<'this>,
+    entry: Entry,
     sequence: Sequence,
 }
@@ -1221,33 +1217,30 @@ impl SequencedEntry {
     pub fn new_from_process_clock(
         process_clock: ClockValue,
         server_id: ServerId,
-        data: &[u8],
+        entry: Entry,
     ) -> Result<Self, SequencedEntryError> {
-        SequencedEntryTryBuilder {
-            data: data.to_vec(),
-            entry_builder: |data| {
-                flatbuffers::root::<entry_fb::Entry<'_>>(data).context(InvalidFlatbuffer)
-            },
+        Ok(Self {
+            entry,
             sequence: Sequence {
                 id: server_id.get_u32(),
                 number: process_clock.get_u64(),
             },
-        }
-        .try_build()
+        })
+    }
+
+    pub fn new_from_sequence(
+        sequence: Sequence,
+        entry: Entry,
+    ) -> Result<Self, SequencedEntryError> {
+        Ok(Self { entry, sequence })
     }

     pub fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>> {
-        match self.borrow_entry().operation_as_write().as_ref() {
-            Some(w) => w
-                .partition_writes()
-                .as_ref()
-                .map(|w| w.iter().map(|fb| PartitionWrite { fb }).collect::<Vec<_>>()),
-            None => None,
-        }
+        self.entry.partition_writes()
     }

     pub fn sequence(&self) -> &Sequence {
-        self.borrow_sequence()
+        &self.sequence
     }
 }
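Since SequencedEntry now owns a plain Entry, construction is ordinary struct plumbing with no flatbuffer re-validation. A sketch of the two constructors; entry, server_id, and process_clock are assumed to already exist, and only the signatures come from this change:

    // Assign a sequence from the local process clock...
    let sequenced =
        SequencedEntry::new_from_process_clock(process_clock, server_id, entry.clone())?;

    // ...or carry over a sequence that is already known, e.g. one read back
    // from the write buffer.
    let sequence = Sequence {
        id: server_id.get_u32(),
        number: process_clock.get_u64(),
    };
    let sequenced = SequencedEntry::new_from_sequence(sequence, entry)?;
    assert_eq!(sequenced.sequence().id, server_id.get_u32());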


@@ -133,6 +133,9 @@ message DatabaseRules {
   // Duration for which the cleanup loop should sleep on average.
   // Defaults to 500 seconds.
   google.protobuf.Duration worker_cleanup_avg_sleep = 10;
+
+  // Optionally, the address of the write buffer
+  string write_buffer_connection_string = 11;
 }

 message RoutingConfig {


@@ -8,7 +8,7 @@ use data_types::database_rules::{
 };
 use data_types::DatabaseName;

-use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt};
+use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt, FromFieldString};
 use crate::influxdata::iox::management::v1 as management;

 mod lifecycle;
@@ -23,6 +23,9 @@ impl From<DatabaseRules> for management::DatabaseRules {
             lifecycle_rules: Some(rules.lifecycle_rules.into()),
             routing_rules: rules.routing_rules.map(Into::into),
             worker_cleanup_avg_sleep: Some(rules.worker_cleanup_avg_sleep.into()),
+            write_buffer_connection_string: rules
+                .write_buffer_connection_string
+                .unwrap_or_default(),
         }
     }
 }
@@ -53,12 +56,15 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
             None => Duration::from_secs(500),
         };

+        let write_buffer_connection_string = proto.write_buffer_connection_string.optional();
+
         Ok(Self {
             name,
             partition_template,
             lifecycle_rules,
             routing_rules,
             worker_cleanup_avg_sleep,
+            write_buffer_connection_string,
         })
     }
 }


@@ -23,7 +23,7 @@ pub enum Error {
 pub type Result<T, E = Error> = std::result::Result<T, E>;

 /// Builder for a Schema
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub struct SchemaBuilder {
     /// Optional measurement name
     measurement: Option<String>,
@@ -121,9 +121,10 @@ impl SchemaBuilder {
         self
     }

-    /// Creates an Arrow schema with embedded metadata, consuming self. All
-    /// schema validation happens at this time.
+    /// Creates an Arrow schema with embedded metadata, resetting the
+    /// builder back to `default`. All schema validation happens at
+    /// this time.
+    ///
     /// ```
     /// use internal_types::schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
     ///
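Because build() now resets the builder instead of consuming it, callers that want to reuse a builder clone it first, exactly as the new TestChunk in query/src/duplicate.rs below does. A minimal sketch (the column names are illustrative):

    use internal_types::schema::builder::SchemaBuilder;

    let mut builder = SchemaBuilder::new();
    builder.tag("host");
    builder.timestamp();

    // Building from a clone leaves `builder` intact for adding more columns.
    let schema = builder.clone().build().expect("valid schema");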


@@ -422,29 +422,13 @@ impl Storage {
         let (tx, rx) = tokio::sync::mpsc::channel(2);

-        // Do an an async dance here to make sure any error returned
+        // Run async dance here to make sure any error returned
         // `download_and_scan_parquet` is sent back to the reader and
         // not silently ignored
         tokio::task::spawn(async move {
-            // Channels to/from the parquet reader
-            let (parquet_tx, mut parquet_rx) = tokio::sync::mpsc::channel(2);
-
-            let download_future =
-                Self::download_and_scan_parquet(predicate, projection, path, store, parquet_tx);
-
-            // task whose only job in life is to shuffle messages from
-            // the parquet reader to the final output receiver
-            let captured_tx = tx.clone();
-            tokio::task::spawn(async move {
-                while let Some(msg) = parquet_rx.recv().await {
-                    if let Err(e) = captured_tx.send(msg).await {
-                        debug!(%e, "Receiver hung up on parquet writer");
-                    }
-                }
-            });
-
-            // in this task, wait for the future that is doing the actual work on this task
-            let download_result = download_future.await;
+            let download_result =
+                Self::download_and_scan_parquet(predicate, projection, path, store, tx.clone())
+                    .await;

             // If there was an error returned from download_and_scan_parquet send it back to the receiver.
             if let Err(e) = download_result {
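The simplified version leans on one pattern worth spelling out: the spawned task owns a clone of the sender, so a failure from the download future is reported over the same channel the data travels on rather than being dropped. A self-contained sketch of that pattern with tokio (the names and error type are illustrative, not this crate's API):

    use tokio::sync::mpsc;

    // Stand-in for download_and_scan_parquet: streams results and may fail.
    async fn produce(tx: mpsc::Sender<Result<i32, String>>) -> Result<(), String> {
        tx.send(Ok(1)).await.map_err(|e| e.to_string())?;
        Err("simulated download failure".to_string())
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(2);

        tokio::task::spawn(async move {
            // If the producer fails, forward the error to the receiver
            // instead of silently ignoring it.
            if let Err(e) = produce(tx.clone()).await {
                let _ = tx.send(Err(e)).await;
            }
        });

        while let Some(msg) = rx.recv().await {
            println!("{:?}", msg);
        }
    }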

query/src/duplicate.rs (new file, 583 lines)

@@ -0,0 +1,583 @@
//! Contains the algorithm to determine which chunks may contain
//! "duplicate" primary keys (that is, where data with the same
//! combination of "tag" columns and timestamp in the InfluxDB
//! data model has been written via multiple distinct line protocol
//! writes, and is thus stored in separate rows)
use crate::pruning::Prunable;
use data_types::partition_metadata::{ColumnSummary, StatOverlap, Statistics};
use snafu::Snafu;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Mismatched type when comparing statistics for column '{}'",
column_name
))]
MismatchedStatsTypes { column_name: String },
#[snafu(display(
"Internal error. Partial statistics found for column '{}' looking for duplicates. s1: '{:?}' s2: '{:?}'",
column_name, s1, s2
))]
InternalPartialStatistics {
column_name: String,
s1: Statistics,
s2: Statistics,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Groups [`Prunable`] objects into disjoint sets using values of
/// min/max statistics. The groups are formed such that each group
/// *may* contain InfluxDB data model primary key duplicates with
/// others in that set.
///
/// The *may* overlap calculation is conservative -- that is it may
/// flag two chunks as having overlapping data when in reality they do
/// not. If chunks are split into different groups, then they are
/// guaranteed not to contain any rows with the same primary key.
///
/// Note 1: since this algorithm is based on statistics, it may have
/// false positives (flag that two objects may have overlap when in
/// reality they do not)
///
/// Note 2: this algorithm is O(n^2) worst case (when no chunks have
/// any overlap)
pub fn group_potential_duplicates<C>(chunks: Vec<C>) -> Result<Vec<Vec<C>>>
where
C: Prunable,
{
let mut groups: Vec<Vec<KeyStats<'_, _>>> = vec![];
// Step 1: find the groups using references to `chunks` stored
// in KeyStats views
for (idx, chunk) in chunks.iter().enumerate() {
// try to find a place to put this chunk
let mut key_stats = Some(KeyStats::new(idx, chunk));
'outer: for group in &mut groups {
// If this chunk overlaps any existing chunk in group add
// it to group
for ks in group.iter() {
if ks.potential_overlap(key_stats.as_ref().unwrap())? {
group.push(key_stats.take().unwrap());
break 'outer;
}
}
}
if let Some(key_stats) = key_stats {
// couldn't place key_stats in any existing group, needs a
// new group
groups.push(vec![key_stats])
}
}
// Now some shenanigans to rearrange the actual input chunks into
// the final resulting groups corresponding to the groups of
// KeyStats
// drop all references to chunks, and only keep indices
let groups: Vec<Vec<usize>> = groups
.into_iter()
.map(|group| group.into_iter().map(|key_stats| key_stats.index).collect())
.collect();
let mut chunks: Vec<Option<C>> = chunks.into_iter().map(Some).collect();
let groups = groups
.into_iter()
.map(|group| {
group
.into_iter()
.map(|index| {
chunks[index]
.take()
.expect("Internal mismatch while gathering into groups")
})
.collect::<Vec<C>>()
})
.collect::<Vec<Vec<C>>>();
Ok(groups)
}
/// Holds a view to a chunk along with information about its columns
/// in an easy to compare form
#[derive(Debug)]
struct KeyStats<'a, C>
where
C: Prunable,
{
/// The index of the chunk
index: usize,
/// The underlying chunk
chunk: &'a C,
/// the ColumnSummaries for the chunk's 'primary_key' columns, in
/// "lexicographical" order (aka sorted by name)
key_summaries: Vec<&'a ColumnSummary>,
}
impl<'a, C> KeyStats<'a, C>
where
C: Prunable,
{
/// Create a new view for the specified chunk at index `index`,
/// computing the columns to be used in the primary key comparison
pub fn new(index: usize, chunk: &'a C) -> Self {
let key_summaries = chunk.summary().primary_key_columns();
Self {
index,
chunk,
key_summaries,
}
}
/// Returns true if the chunk has a potential primary key overlap with the other chunk
fn potential_overlap(&self, other: &Self) -> Result<bool> {
// in order to have overlap, *all* the columns in the sort order
// need to be the same. Note gaps in the sort order mean they
// are for different parts of the keyspace
if self.key_summaries.len() != other.key_summaries.len() {
// Short circuit on different lengths
return Ok(false);
}
let iter = self.key_summaries.iter().zip(other.key_summaries.iter());
for (s1, s2) in iter {
if s1.name != s2.name || !Self::columns_might_overlap(s1, s2)? {
return Ok(false);
}
}
Ok(true)
}
/// Returns true if the two columns MAY overlap each other, based on
/// statistics
pub fn columns_might_overlap(s1: &ColumnSummary, s2: &ColumnSummary) -> Result<bool> {
use Statistics::*;
let overlap = match (&s1.stats, &s2.stats) {
(I64(s1), I64(s2)) => s1.overlaps(s2),
(U64(s1), U64(s2)) => s1.overlaps(s2),
(F64(s1), F64(s2)) => s1.overlaps(s2),
(Bool(s1), Bool(s2)) => s1.overlaps(s2),
(String(s1), String(s2)) => s1.overlaps(s2),
_ => {
return MismatchedStatsTypes {
column_name: s1.name.clone(),
}
.fail()
}
};
// If either column has no min/max, treat the column as
// being entirely null
let is_none = s1.stats.is_none() || s2.stats.is_none();
match overlap {
StatOverlap::NonZero => Ok(true),
StatOverlap::Zero => Ok(false),
StatOverlap::Unknown if is_none => Ok(false), // no stats means no values
// This case means there are some stats, but not all.
// Unclear how this could happen, so throw an error for now
StatOverlap::Unknown => InternalPartialStatistics {
column_name: s1.name.clone(),
s1: s1.stats.clone(),
s2: s2.stats.clone(),
}
.fail(),
}
}
}
#[cfg(test)]
mod test {
use arrow::datatypes::SchemaRef;
use data_types::partition_metadata::{
ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary,
};
use internal_types::schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
use super::*;
#[macro_export]
macro_rules! assert_groups_eq {
($EXPECTED_LINES: expr, $GROUPS: expr) => {
let expected_lines: Vec<String> =
$EXPECTED_LINES.into_iter().map(|s| s.to_string()).collect();
let actual_lines = to_string($GROUPS);
assert_eq!(
expected_lines, actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
};
}
// Test cases:
#[test]
fn one_column_no_overlap() {
let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("mumbai"));
let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("new york"), Some("zoo york"));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn one_column_overlap() {
let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("new york"));
let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("denver"), Some("zoo york"));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn multi_columns() {
let c1 = TestChunk::new("chunk1").with_timestamp(0, 1000).with_tag(
"tag1",
Some("boston"),
Some("new york"),
);
// Overlaps in tag1, but not in time
let c2 = TestChunk::new("chunk2")
.with_tag("tag1", Some("denver"), Some("zoo york"))
.with_timestamp(2000, 3000);
// Overlaps in time, but not in tag1
let c3 = TestChunk::new("chunk3")
.with_tag("tag1", Some("zzx"), Some("zzy"))
.with_timestamp(500, 1500);
// Overlaps in time, and in tag1
let c4 = TestChunk::new("chunk4")
.with_tag("tag1", Some("aaa"), Some("zzz"))
.with_timestamp(500, 1500);
let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded");
let expected = vec![
"Group 0: [chunk1, chunk4]",
"Group 1: [chunk2]",
"Group 2: [chunk3]",
];
assert_groups_eq!(expected, groups);
}
#[test]
fn boundary() {
// check that overlap calculations include the bound
let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb"));
let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("bbb"), Some("ccc"));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn same() {
// check that if chunks overlap exactly on the boundaries they are still grouped
let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb"));
let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("aaa"), Some("bbb"));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn different_tag_names() {
// check that chunks that overlap, but in different tag names, are not grouped
let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb"));
let c2 = TestChunk::new("chunk2").with_tag("tag2", Some("aaa"), Some("bbb"));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn three_column() {
let c1 = TestChunk::new("chunk1")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", Some("xxx"), Some("yyy"))
.with_timestamp(0, 1000);
let c2 = TestChunk::new("chunk2")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", Some("xxx"), Some("yyy"))
// Timestamp doesn't overlap, but the two tags do
.with_timestamp(2001, 3000);
let c3 = TestChunk::new("chunk3")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", Some("aaa"), Some("zzz"))
// all three overlap
.with_timestamp(1000, 2000);
let groups = group_potential_duplicates(vec![c1, c2, c3]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1, chunk3]", "Group 1: [chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn tag_order() {
let c1 = TestChunk::new("chunk1")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", Some("xxx"), Some("yyy"))
.with_timestamp(0, 1000);
let c2 = TestChunk::new("chunk2")
.with_tag("tag2", Some("aaa"), Some("zzz"))
.with_tag("tag1", Some("aaa"), Some("bbb"))
// all three overlap, but tags in different order
.with_timestamp(500, 1000);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn tag_order_no_tags() {
let c1 = TestChunk::new("chunk1")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", Some("xxx"), Some("yyy"))
.with_timestamp(0, 1000);
let c2 = TestChunk::new("chunk2")
// tag1 and timestamp overlap, but no tag2 (aka it is all null)
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_timestamp(500, 1000);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn tag_order_null_stats() {
let c1 = TestChunk::new("chunk1")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", Some("xxx"), Some("yyy"))
.with_timestamp(0, 1000);
let c2 = TestChunk::new("chunk2")
// tag1 and timestamp overlap, tag2 has no stats (null)
// so we say they can't overlap
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", None, None)
.with_timestamp(500, 1000);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn tag_order_partial_stats() {
let c1 = TestChunk::new("chunk1")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_timestamp(0, 1000);
let c2 = TestChunk::new("chunk2")
// tag1 has a min but not a max. Should result in error
.with_tag("tag1", Some("aaa"), None)
.with_timestamp(500, 1000);
let result = group_potential_duplicates(vec![c1, c2]).unwrap_err();
let result = result.to_string();
let expected =
"Internal error. Partial statistics found for column 'tag1' looking for duplicates";
assert!(
result.contains(expected),
"can not find {} in {}",
expected,
result
);
}
#[test]
fn tag_fields_not_counted() {
let c1 = TestChunk::new("chunk1")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_int_field("field", Some(0), Some(2))
.with_timestamp(0, 1000);
let c2 = TestChunk::new("chunk2")
// tag1 and timestamp overlap, but field value does not
// should still overlap
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_int_field("field", Some(100), Some(200))
.with_timestamp(500, 1000);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn mismatched_types() {
// Test if same column has different types in different
// chunks; this will likely cause errors elsewhere in practice
// as the schemas are incompatible (and can't be merged)
let c1 = TestChunk::new("chunk1")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_timestamp(0, 1000);
let c2 = TestChunk::new("chunk2")
// tag1 column is actually a field in chunk 2, so even though
// the timestamps overlap these chunks
// don't have duplicates
.with_int_field("tag1", Some(100), Some(200))
.with_timestamp(0, 1000);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
assert_groups_eq!(expected, groups);
}
// --- Test infrastructure --
fn to_string(groups: Vec<Vec<TestChunk>>) -> Vec<String> {
let mut s = vec![];
for (idx, group) in groups.iter().enumerate() {
let names = group.iter().map(|c| c.name.as_str()).collect::<Vec<_>>();
s.push(format!("Group {}: [{}]", idx, names.join(", ")));
}
s
}
/// Mocked out prunable provider to use for testing overlaps
#[derive(Debug)]
struct TestChunk {
// The name of this chunk
name: String,
summary: TableSummary,
builder: SchemaBuilder,
}
/// Implementation of creating a new column with statistics for TestChunk
macro_rules! make_stats {
($MIN:expr, $MAX:expr, $STAT_TYPE:ident) => {{
Statistics::$STAT_TYPE(StatValues {
distinct_count: None,
min: $MIN,
max: $MAX,
count: 42,
})
}};
}
impl TestChunk {
/// Create a new TestChunk with a specified name
fn new(name: impl Into<String>) -> Self {
let name = name.into();
let summary = TableSummary::new(name.clone());
let builder = SchemaBuilder::new();
Self {
name,
summary,
builder,
}
}
/// Adds a tag column with the specified min/max values
fn with_tag(
mut self,
name: impl Into<String>,
min: Option<&str>,
max: Option<&str>,
) -> Self {
let min = min.map(|v| v.to_string());
let max = max.map(|v| v.to_string());
let tag_name = name.into();
self.builder.tag(&tag_name);
self.summary.columns.push(ColumnSummary {
name: tag_name,
influxdb_type: Some(InfluxDbType::Tag),
stats: make_stats!(min, max, String),
});
self
}
/// Adds a timestamp column with the specified min/max values
fn with_timestamp(mut self, min: i64, max: i64) -> Self {
self.builder.timestamp();
let min = Some(min);
let max = Some(max);
self.summary.columns.push(ColumnSummary {
name: TIME_COLUMN_NAME.into(),
influxdb_type: Some(InfluxDbType::Timestamp),
stats: make_stats!(min, max, I64),
});
self
}
/// Adds an I64 field column with the specified min/max values
fn with_int_field(
mut self,
name: impl Into<String>,
min: Option<i64>,
max: Option<i64>,
) -> Self {
let field_name = name.into();
self.builder
.field(&field_name, arrow::datatypes::DataType::Int64);
self.summary.columns.push(ColumnSummary {
name: field_name,
influxdb_type: Some(InfluxDbType::Field),
stats: make_stats!(min, max, I64),
});
self
}
}
impl Prunable for TestChunk {
fn summary(&self) -> &TableSummary {
&self.summary
}
fn schema(&self) -> SchemaRef {
self.builder
// need to clone because `build` resets builder state
.clone()
.build()
.expect("created schema")
.as_arrow()
}
}
}
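Putting the pieces together, the intended call pattern is small: anything implementing Prunable can be handed to group_potential_duplicates, and each returned group is the unit that must be deduplicated together. A sketch reusing the TestChunk fixture above (a real caller would pass its own chunk type):

    let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("new york"));
    let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("denver"), Some("zoo york"));

    // The ranges [boston, new york] and [denver, zoo york] intersect
    // (for example "miami" falls inside both), so both chunks land in
    // one group and must be considered together when deduplicating.
    let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
    assert_eq!(groups.len(), 1);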


@@ -1206,6 +1206,7 @@ impl ExpressionVisitor for SupportVisitor {
             Expr::BinaryExpr { op, .. } => {
                 match op {
                     Operator::Eq
+                    | Operator::NotEq
                     | Operator::Lt
                     | Operator::LtEq
                     | Operator::Gt
@@ -1217,7 +1218,7 @@
                     | Operator::And
                     | Operator::Or => Ok(Recursion::Continue(self)),
                     // Unsupported (need to think about ramifications)
-                    Operator::NotEq | Operator::Modulus | Operator::Like | Operator::NotLike => {
+                    Operator::Modulus | Operator::Like | Operator::NotLike => {
                         Err(DataFusionError::NotImplemented(format!(
                             "Unsupported operator in gRPC: {:?} in expression {:?}",
                             op, expr


@@ -16,6 +16,7 @@ use pruning::Prunable;
 use std::{fmt::Debug, sync::Arc};

+pub mod duplicate;
 pub mod exec;
 pub mod frontend;
 pub mod func;


@@ -249,20 +249,16 @@ mod test {
     }

     #[test]
-    // Ignore tests as the pruning predicate can't be created. DF
-    // doesn't support boolean predicates:
-    // https://github.com/apache/arrow-datafusion/issues/490
-    #[ignore]
     fn test_pruned_bool() {
         test_helpers::maybe_start_logging();
         // column1 where
-        // c1: [false, true] --> pruned
+        // c1: [false, false] --> pruned

         let observer = TestObserver::new();
         let c1 = Arc::new(TestPrunable::new("chunk1").with_bool_column(
             "column1",
             Some(false),
-            Some(true),
+            Some(false),
         ));

         let predicate = PredicateBuilder::new().add_expr(col("column1")).build();
@@ -355,20 +351,16 @@ mod test {
     }

     #[test]
-    // Ignore tests as the pruning predicate can't be created. DF
-    // doesn't support boolean predicates:
-    // https://github.com/apache/arrow-datafusion/issues/490
-    #[ignore]
     fn test_not_pruned_bool() {
         test_helpers::maybe_start_logging();
         // column1
-        // c1: [false, false] --> pruned
+        // c1: [false, true] --> not pruned

         let observer = TestObserver::new();
         let c1 = Arc::new(TestPrunable::new("chunk1").with_bool_column(
             "column1",
             Some(false),
-            Some(false),
+            Some(true),
         ));

         let predicate = PredicateBuilder::new().add_expr(col("column1")).build();


@@ -160,6 +160,18 @@ async fn test_read_filter_data_filter() {
         "+------+-------+------+-------------------------------+",
     ];

+    run_read_filter_test_case!(
+        TwoMeasurementsMultiSeries {},
+        predicate,
+        expected_results.clone()
+    );
+
+    // Same results via a != predicate.
+    let predicate = PredicateBuilder::default()
+        .timestamp_range(200, 300)
+        .add_expr(col("state").not_eq(lit("MA"))) // state=CA
+        .build();
+
     run_read_filter_test_case!(TwoMeasurementsMultiSeries {}, predicate, expected_results);
 }


@@ -435,7 +435,7 @@
 }

 #[tokio::test]
-async fn sql_predicate_pushdown_correctness() {
+async fn sql_predicate_pushdown_correctness_1() {
     // Test 1: Select everything
     let expected = vec![
         "+-------+--------+-------------------------------+-----------+",
@@ -455,7 +455,10 @@
         "SELECT * from restaurant",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_2() {
     // Test 2: One push-down expression: count > 200
     let expected = vec![
         "+-------+--------+-------------------------------+-----------+",
@@ -474,7 +477,10 @@
         "SELECT * from restaurant where count > 200",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_3() {
     // Test 3: Two push-down expression: count > 200 and town != 'tewsbury'
     let expected = vec![
         "+-------+--------+-------------------------------+-----------+",
@@ -492,7 +498,10 @@
         "SELECT * from restaurant where count > 200 and town != 'tewsbury'",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_4() {
     // Test 4: Still two push-down expression: count > 200 and town != 'tewsbury'
     // even though the results are different
     let expected = vec![
@@ -510,7 +519,10 @@
         "SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence')",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_5() {
     // Test 5: three push-down expression: count > 200 and town != 'tewsbury' and count < 40000
     let expected = vec![
         "+-------+--------+-------------------------------+-----------+",
@@ -526,7 +538,10 @@
         "SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_6() {
     // Test 6: two push-down expression: count > 200 and count < 40000
     let expected = vec![
         "+-------+--------+-------------------------------+-----------+",
@@ -544,7 +559,10 @@
         "SELECT * from restaurant where count > 200 and count < 40000",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_7() {
     // Test 7: two push-down expression on float: system > 4.0 and system < 7.0
     let expected = vec![
         "+-------+--------+-------------------------------+-----------+",
@@ -563,7 +581,10 @@
         "SELECT * from restaurant where system > 4.0 and system < 7.0",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_8() {
     // Test 8: two push-down expression on float: system > 5.0 and system < 7.0
     let expected = vec![
         "+-------+--------+-------------------------------+----------+",
@@ -579,7 +600,10 @@
         "SELECT * from restaurant where system > 5.0 and system < 7.0",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_9() {
     // Test 9: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
     let expected = vec![
         "+-------+--------+-------------------------------+----------+",
@@ -594,7 +618,10 @@
         "SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_10() {
     // Test 10: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
     // even though there are more expressions,(count = 632 or town = 'reading'), in the filter
     let expected = vec![
@@ -609,7 +636,10 @@
         "SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading')",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_11() {
     // Test 11: four push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and
     // time > to_timestamp('1970-01-01T00:00:00.000000120+00:00') (rewritten to time GT int(130))
     //
@@ -619,7 +649,10 @@
         "SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00')",
         &expected
     );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_12() {
     // TODO: Hit stackoverflow in DF. Ticket https://github.com/apache/arrow-datafusion/issues/419
     // // Test 12: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and town = 'reading'
     // //
@@ -636,7 +669,10 @@
     //     "SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and town = 'reading'",
     //     &expected
     // );
+}

+#[tokio::test]
+async fn sql_predicate_pushdown_correctness_13() {
     // Test 13: three push-down expression: system > 5.0 and system < 7.0 and town = 'reading'
     //
     // Check correctness
@@ -655,7 +691,7 @@
 }

 #[tokio::test]
-async fn sql_predicate_pushdown_explain() {
+async fn sql_predicate_pushdown_explain_1() {
     // Test 1: Select everything
     let expected = vec![
         "+-----------------------------------------+--------------------------------------------------------------------------+",
@@ -665,8 +701,12 @@
         "| | TableScan: restaurant projection=None |",
         "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
         "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
+        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
+        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
         "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
         "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
+        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
+        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
         "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
         "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
         "+-----------------------------------------+--------------------------------------------------------------------------+",
@ -676,304 +716,412 @@ async fn sql_predicate_pushdown_explain() {
"EXPLAIN VERBOSE SELECT * from restaurant", "EXPLAIN VERBOSE SELECT * from restaurant",
&expected &expected
); );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_2() {
// Test 2: One push-down expression: count > 200 // Test 2: One push-down expression: count > 200
// TODO: Make push-down predicates shown in explain verbose. Ticket #1538 // TODO: Make push-down predicates shown in explain verbose. Ticket #1538
let expected = vec![ let expected = vec![
"+-----------------------------------------+----------------------------------------------------------------------------+", "+-----------------------------------------+----------------------------------------------------------------------------+",
"| plan_type | plan |", "| plan_type | plan |",
"+-----------------------------------------+----------------------------------------------------------------------------+", "+-----------------------------------------+----------------------------------------------------------------------------+",
"| logical_plan | Projection: #count, #system, #time, #town |", "| logical_plan | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Int64(200) |", "| | Filter: #count Gt Int64(200) |",
"| | TableScan: restaurant projection=None |", "| | TableScan: restaurant projection=None |",
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Int64(200) |", "| | Filter: #count Gt Int64(200) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Int64(200) |", "| | Filter: #count Gt Int64(200) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |", "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
"| | FilterExec: CAST(count AS Int64) > 200 |", "| | Filter: #count Gt Int64(200) |",
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"+-----------------------------------------+----------------------------------------------------------------------------+", "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Int64(200) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
"| | FilterExec: CAST(count AS Int64) > 200 |",
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
"+-----------------------------------------+----------------------------------------------------------------------------+",
]; ];
run_sql_explain_test_case!( run_sql_explain_test_case!(
TwoMeasurementsPredicatePushDown {}, TwoMeasurementsPredicatePushDown {},
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200", "EXPLAIN VERBOSE SELECT * from restaurant where count > 200",
&expected &expected
); );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_2_2() {
// Test 2.2: One push-down expression: count > 200.0 // Test 2.2: One push-down expression: count > 200.0
let expected = vec![ let expected = vec![
"+-----------------------------------------+----------------------------------------------------------------------------+", "+-----------------------------------------+----------------------------------------------------------------------------+",
"| plan_type | plan |", "| plan_type | plan |",
"+-----------------------------------------+----------------------------------------------------------------------------+", "+-----------------------------------------+----------------------------------------------------------------------------+",
"| logical_plan | Projection: #count, #system, #time, #town |", "| logical_plan | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Float64(200) |", "| | Filter: #count Gt Float64(200) |",
"| | TableScan: restaurant projection=None |", "| | TableScan: restaurant projection=None |",
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Float64(200) |", "| | Filter: #count Gt Float64(200) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |", "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Float64(200) |", "| | Filter: #count Gt Float64(200) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |", "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |", "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
"| | FilterExec: CAST(count AS Float64) > 200 |", "| | Filter: #count Gt Float64(200) |",
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |", "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"+-----------------------------------------+----------------------------------------------------------------------------+", "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Float64(200) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
"| | FilterExec: CAST(count AS Float64) > 200 |",
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
"+-----------------------------------------+----------------------------------------------------------------------------+",
]; ];
run_sql_explain_test_case!( run_sql_explain_test_case!(
TwoMeasurementsPredicatePushDown {}, TwoMeasurementsPredicatePushDown {},
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200.0", "EXPLAIN VERBOSE SELECT * from restaurant where count > 200.0",
&expected &expected
); );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_2_3() {
    // Test 2.3: One push-down expression: system > 4.0
    let expected = vec![
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: system > 4 |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_3() {
    // Test 3: Two push-down expression: count > 200 and town != 'tewsbury'
    let expected = vec![
        "+-----------------------------------------+-----------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+-----------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+-----------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury'",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_4() {
    // Test 4: Still two push-down expression: count > 200 and town != 'tewsbury'
    // even though the results are different
    let expected = vec![
        "+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury AND system = CAST(5 AS Float64) OR CAST(town AS Utf8) = lawrence |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence')",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_5() {
    // Test 5: three push-down expression: count > 200 and town != 'tewsbury' and count < 40000
    let expected = vec![
        "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury AND system = CAST(5 AS Float64) OR CAST(town AS Utf8) = lawrence AND CAST(count AS Int64) < 40000 |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_6() {
    // Test 6: two push-down expression: count > 200 and count < 40000
    let expected = vec![
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: CAST(count AS Int64) > 200 AND CAST(count AS Int64) < 40000 |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and count < 40000",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_7() {
    // Test 7: two push-down expression on float: system > 4.0 and system < 7.0
    let expected = vec![
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: system > 4 AND system < 7 |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0 and system < 7.0",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_8() {
    // Test 8: two push-down expression on float: system > 5.0 and system < 7.0
    let expected = vec![
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: system > 5 AND system < 7 |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and system < 7.0",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_9() {
    // Test 9: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
    let expected = vec![
        "+-----------------------------------------+--------------------------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+--------------------------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: system > 5 AND CAST(town AS Utf8) != tewsbury AND 7 > system |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+--------------------------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_10() {
    // Test 10: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
    // even though there are more expressions,(count = 632 or town = 'reading'), in the filter
    let expected = vec![
        "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: system > 5 AND tewsbury != CAST(town AS Utf8) AND system < 7 AND CAST(count AS Int64) = 632 OR CAST(town AS Utf8) = reading |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
        "EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading')",
        &expected
    );
}
#[tokio::test]
async fn sql_predicate_pushdown_explain_11() {
    // Test 11: four push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and
    // time > to_timestamp('1970-01-01T00:00:00.000000120+00:00') rewritten to time GT INT(130)
    let expected = vec![
        "+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
        "| logical_plan | Projection: #count, #system, #time, #town |",
        "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |",
        "| | TableScan: restaurant projection=None |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
        "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| logical_plan after simplify_expressions | Projection: #count, #system, #time, #town |",
        "| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |",
        "| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
        "| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
        "| | FilterExec: 5 < system AND CAST(town AS Utf8) != tewsbury AND system < 7 AND CAST(count AS Int64) = 632 OR CAST(town AS Utf8) = reading AND time > totimestamp(1970-01-01T00:00:00.000000130+00:00) |",
        "| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
        "+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        TwoMeasurementsPredicatePushDown {},
@@ -983,7 +1131,7 @@ async fn sql_predicate_pushdown_explain() {
}

#[tokio::test]
async fn sql_deduplicate_1() {
    // This current expected is wrong because deduplicate is not available yet
    let sql =
        "select time, state, city, min_temp, max_temp, area from h2o order by time, state, city";
@@ -1016,7 +1164,10 @@ async fn sql_deduplicate() {
"+-------------------------------+-------+---------+----------+----------+------+", "+-------------------------------+-------+---------+----------+----------+------+",
]; ];
run_sql_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected); run_sql_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected);
}
#[tokio::test]
async fn sql_deduplicate_2() {
    // Plan with order by
    let expected = vec![
        "+-----------------------------------------+----------------------------------------------------------------------------+",
@@ -1028,9 +1179,15 @@ async fn sql_deduplicate() {
        "| logical_plan after projection_push_down | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |",
        "| | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| logical_plan after simplify_expressions | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |",
        "| | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| logical_plan after projection_push_down | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |",
        "| | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| logical_plan after simplify_expressions | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |",
        "| | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| physical_plan | SortExec: [time ASC,state ASC,city ASC] |",
        "| | ProjectionExec: expr=[time, state, city, min_temp, max_temp, area] |",
        "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |",
@@ -1038,53 +1195,73 @@ async fn sql_deduplicate() {
    ];
    let sql = "explain verbose select time, state, city, min_temp, max_temp, area from h2o order by time, state, city";
    run_sql_explain_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected);
}
#[tokio::test]
async fn sql_deduplicate_3() {
    // plan without order by
    let expected = vec![
        "+-----------------------------------------+--------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+--------------------------------------------------------------------+",
        "| logical_plan | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=None |",
        "| logical_plan after projection_push_down | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| logical_plan after simplify_expressions | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| logical_plan after projection_push_down | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| logical_plan after simplify_expressions | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| physical_plan | ProjectionExec: expr=[time, state, city, min_temp, max_temp, area] |",
        "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |",
        "+-----------------------------------------+--------------------------------------------------------------------+",
    ];
    let sql = "explain verbose select time, state, city, min_temp, max_temp, area from h2o";
    run_sql_explain_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected);
}
#[tokio::test]
async fn sql_deduplicate_4() {
    // Union plan
    let sql =
        "EXPLAIN VERBOSE select state as name from h2o UNION ALL select city as name from h2o";
    let expected = vec![
        "+-----------------------------------------+---------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+---------------------------------------------------------------------+",
        "| logical_plan | Union |",
        "| | Projection: #state AS name |",
        "| | TableScan: h2o projection=None |",
        "| | Projection: #city AS name |",
        "| | TableScan: h2o projection=None |",
        "| logical_plan after projection_push_down | Union |",
        "| | Projection: #state AS name |",
        "| | TableScan: h2o projection=Some([4]) |",
        "| | Projection: #city AS name |",
        "| | TableScan: h2o projection=Some([1]) |",
        "| logical_plan after simplify_expressions | Union |",
        "| | Projection: #state AS name |",
        "| | TableScan: h2o projection=Some([4]) |",
        "| | Projection: #city AS name |",
        "| | TableScan: h2o projection=Some([1]) |",
        "| logical_plan after projection_push_down | Union |",
        "| | Projection: #state AS name |",
        "| | TableScan: h2o projection=Some([4]) |",
        "| | Projection: #city AS name |",
        "| | TableScan: h2o projection=Some([1]) |",
        "| logical_plan after simplify_expressions | Union |",
        "| | Projection: #state AS name |",
        "| | TableScan: h2o projection=Some([4]) |",
        "| | Projection: #city AS name |",
        "| | TableScan: h2o projection=Some([1]) |",
        "| physical_plan | ExecutionPlan(PlaceHolder) |",
        "| | ProjectionExec: expr=[state as name] |",
        "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |",
        "| | ProjectionExec: expr=[city as name] |",
        "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |",
        "+-----------------------------------------+---------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(OneMeasurementThreeChunksWithDuplicates {}, sql, &expected);
}
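
All of the EXPLAIN tests above funnel through the repo's `run_sql_explain_test_case!` macro. As a rough sketch (not the macro's actual expansion), the comparison amounts to running the query, rendering the result batches as an ASCII table, and checking the lines against the `expected` vector. `rendered` is an assumed stand-in for however the harness formats batches:

// Hypothetical helper approximating the line-by-line comparison an EXPLAIN
// test macro could perform; names here are illustrative, not IOx's real API.
fn assert_plan_matches(rendered: &str, expected: &[&str]) {
    let actual: Vec<&str> = rendered.trim().lines().collect();
    assert_eq!(
        actual, expected,
        "EXPLAIN output did not match; actual:\n{}",
        rendered
    );
}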


@@ -12,6 +12,7 @@ use query::exec::Executor;
/// This module contains code for managing the configuration of the server.
use crate::{
    db::{catalog::Catalog, Db},
    write_buffer::KafkaBuffer,
    Error, JobRegistry, Result,
};
use observability_deps::tracing::{self, error, info, warn, Instrument};
@@ -150,6 +151,15 @@ impl Config {
            return;
        }

        // Right now, `KafkaBuffer` is the only production implementation of the `WriteBuffer`
        // trait, so always use `KafkaBuffer` when there is a write buffer connection string
        // specified. If/when there are other kinds of write buffers, additional configuration will
        // be needed to determine what kind of write buffer to use here.
        let write_buffer = rules
            .write_buffer_connection_string
            .as_ref()
            .map(|conn| Arc::new(KafkaBuffer::new(conn)) as _);

        let db = Arc::new(Db::new(
            rules,
            server_id,
@@ -157,6 +167,7 @@ impl Config {
            exec,
            Arc::clone(&self.jobs),
            preserved_catalog,
            write_buffer,
        ));

        let shutdown = self.shutdown.child_token();
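
The `as _` inside that `map` leans on Rust's unsized coercion: the closure's return type is pinned to `Arc<dyn WriteBuffer>` by the surrounding context, so `Arc<KafkaBuffer>` coerces to the trait object. A self-contained sketch of the same shape; the one-method trait body and the `store` signature are assumptions for illustration, not the crate's real trait:

use std::sync::Arc;

// Illustrative stand-ins; the real trait and KafkaBuffer live in the new
// write_buffer module and have richer signatures.
trait WriteBuffer {
    fn store(&self, data: &[u8]);
}

struct KafkaBuffer {
    conn: String,
}

impl KafkaBuffer {
    fn new(conn: &str) -> Self {
        Self { conn: conn.to_string() }
    }
}

impl WriteBuffer for KafkaBuffer {
    fn store(&self, data: &[u8]) {
        // A real implementation would produce `data` to Kafka via `self.conn`.
        println!("would send {} bytes to {}", data.len(), self.conn);
    }
}

fn main() {
    let conn: Option<String> = Some("localhost:9092".to_string());
    // Same shape as the diff: map the connection string to a trait object.
    let write_buffer: Option<Arc<dyn WriteBuffer>> =
        conn.as_ref().map(|c| Arc::new(KafkaBuffer::new(c)) as _);
    assert!(write_buffer.is_some());
}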


@@ -3,7 +3,7 @@

use self::access::QueryCatalogAccess;
use self::catalog::TableNameFilter;
use super::{write_buffer::WriteBuffer, JobRegistry};
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use async_trait::async_trait;
use catalog::{chunk::Chunk as CatalogChunk, Catalog};
@@ -166,6 +166,11 @@ pub enum Error {
    #[snafu(display("Unknown Mutable Buffer Chunk {}", chunk_id))]
    UnknownMutableBufferChunk { chunk_id: u32 },

    #[snafu(display("Error sending entry to write buffer"))]
    WriteBufferError {
        source: Box<dyn std::error::Error + Sync + Send>,
    },

    #[snafu(display("Cannot write to this database: no mutable buffer configured"))]
    DatabaseNotWriteable {},
@@ -301,6 +306,9 @@ pub struct Db {
    /// Metric labels
    metric_labels: Vec<KeyValue>,

    /// Optionally buffer writes
    write_buffer: Option<Arc<dyn WriteBuffer>>,
}

/// Load preserved catalog state from store.
@@ -393,6 +401,7 @@ impl Db {
        exec: Arc<Executor>,
        jobs: Arc<JobRegistry>,
        preserved_catalog: PreservedCatalog<Catalog>,
        write_buffer: Option<Arc<dyn WriteBuffer>>,
    ) -> Self {
        let db_name = rules.name.clone();
@@ -426,6 +435,7 @@ impl Db {
            worker_iterations_lifecycle: AtomicUsize::new(0),
            worker_iterations_cleanup: AtomicUsize::new(0),
            metric_labels,
            write_buffer,
        }
    }
@ -978,23 +988,44 @@ impl Db {
            rules.lifecycle_rules.immutable
        };

        match (self.write_buffer.as_ref(), immutable) {
            (Some(write_buffer), true) => {
                // If only the write buffer is configured, this is passing the data through to
                // the write buffer, and it's not an error. We ignore the returned metadata; it
                // will get picked up when data is read from the write buffer.
                let _ = write_buffer.store_entry(&entry).context(WriteBufferError)?;
                Ok(())
            }
            (Some(write_buffer), false) => {
                // If using both write buffer and mutable buffer, we want to wait for the write
                // buffer to return success before adding the entry to the mutable buffer.
                let sequence = write_buffer.store_entry(&entry).context(WriteBufferError)?;
                let sequenced_entry = Arc::new(
                    SequencedEntry::new_from_sequence(sequence, entry)
                        .context(SequencedEntryError)?,
                );
                self.store_sequenced_entry(sequenced_entry)
            }
            (None, true) => {
                // If no write buffer is configured and the database is immutable, trying to
                // store an entry is an error and we don't need to build a `SequencedEntry`.
                DatabaseNotWriteable {}.fail()
            }
            (None, false) => {
                // If no write buffer is configured, use the process clock and send to the
                // mutable buffer.
                let sequenced_entry = Arc::new(
                    SequencedEntry::new_from_process_clock(
                        self.process_clock.next(),
                        self.server_id,
                        entry,
                    )
                    .context(SequencedEntryError)?,
                );
                self.store_sequenced_entry(sequenced_entry)
            }
        }
    }
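For readers skimming the diff: the new write path reduces to a four-way dispatch on (write buffer configured?, immutable?). Below is a minimal, self-contained Rust sketch of that dispatch, not IOx code; `Buffer`, `store`, and the string entries are illustrative stand-ins for `WriteBuffer`, `store_entry`, and `Entry`.

#[derive(Debug)]
struct Buffer;

impl Buffer {
    /// Stand-in for `WriteBuffer::store_entry`; always assigns offset 0.
    fn store(&self, _entry: &str) -> Result<u64, String> {
        Ok(0)
    }
}

fn store_entry(write_buffer: Option<&Buffer>, immutable: bool, entry: &str) -> Result<(), String> {
    match (write_buffer, immutable) {
        // Write buffer only: pass the entry through; ignore the sequence metadata.
        (Some(buffer), true) => buffer.store(entry).map(|_| ()),
        // Both configured: the write buffer must accept the entry before the
        // mutable buffer stores it, so `?` propagates its error first.
        (Some(buffer), false) => {
            let sequence = buffer.store(entry)?;
            println!("mutable buffer stores entry with sequence {}", sequence);
            Ok(())
        }
        // No destination at all: rejecting the write is the correct behavior.
        (None, true) => Err("database not writeable".to_string()),
        // Mutable buffer only: sequence the entry from the process clock.
        (None, false) => {
            println!("mutable buffer stores entry with process-clock sequence");
            Ok(())
        }
    }
}

fn main() {
    assert!(store_entry(Some(&Buffer), true, "cpu bar=1 10").is_ok());
    assert!(store_entry(None, true, "cpu bar=1 10").is_err());
    assert!(store_entry(None, false, "cpu bar=1 10").is_ok());
}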
@ -1010,7 +1041,7 @@ impl Db {
        // We may have gotten here through `store_entry`, in which case this is checking the
        // configuration again unnecessarily, but we may have come here by consuming records from
        // the write buffer, so this check is necessary in that case.
        if immutable {
            return DatabaseNotWriteable {}.fail();
        }
@ -1302,8 +1333,11 @@ mod tests {
        test_helpers::{try_write_lp, write_lp},
        *,
    };
    use crate::{
        db::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr},
        utils::{make_db, TestDb},
        write_buffer::test_helpers::MockBuffer,
    };
    use ::test_helpers::assert_contains;
    use arrow::record_batch::RecordBatch;
    use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
@ -1352,8 +1386,57 @@ mod tests {
        );
    }

    #[tokio::test]
    async fn write_with_write_buffer_no_mutable_buffer() {
        // Writes should be forwarded to the write buffer and *not* rejected if the write
        // buffer is configured and the mutable buffer isn't
        let write_buffer = Arc::new(MockBuffer::default());
        let test_db = TestDb::builder()
            .write_buffer(Arc::clone(&write_buffer) as _)
            .build()
            .await
            .db;
        test_db.rules.write().lifecycle_rules.immutable = true;

        let entry = lp_to_entry("cpu bar=1 10");
        test_db.store_entry(entry).unwrap();

        assert_eq!(write_buffer.entries.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn write_buffer_and_mutable_buffer() {
        // Writes should be forwarded to the write buffer *and* the mutable buffer if both are
        // configured.
        let write_buffer = Arc::new(MockBuffer::default());
        let test_db = TestDb::builder()
            .write_buffer(Arc::clone(&write_buffer) as _)
            .build()
            .await
            .db;

        let entry = lp_to_entry("cpu bar=1 10");
        test_db.store_entry(entry).unwrap();

        assert_eq!(write_buffer.entries.lock().unwrap().len(), 1);

        let db = Arc::new(test_db);
        let batches = run_query(db, "select * from cpu").await;

        let expected = vec![
            "+-----+-------------------------------+",
            "| bar | time                          |",
            "+-----+-------------------------------+",
            "| 1   | 1970-01-01 00:00:00.000000010 |",
            "+-----+-------------------------------+",
        ];
        assert_batches_eq!(expected, &batches);
    }

    #[tokio::test]
    async fn read_write() {
        // This test also exercises the path without a write buffer.
        let db = Arc::new(make_db().await.db);
        write_lp(&db, "cpu bar=1 10");

View File

@ -108,6 +108,7 @@ use std::collections::HashMap;
mod config;
pub mod db;
mod write_buffer;

/// Utility modules used by benchmarks and tests
pub mod utils;
@ -1166,6 +1167,7 @@ mod tests {
            lifecycle_rules: Default::default(),
            routing_rules: None,
            worker_cleanup_avg_sleep: Duration::from_secs(2),
            write_buffer_connection_string: None,
        };

        // Create a database
@ -1262,6 +1264,7 @@ mod tests {
            lifecycle_rules: Default::default(),
            routing_rules: None,
            worker_cleanup_avg_sleep: Duration::from_secs(2),
            write_buffer_connection_string: None,
        };

        // Create a database

View File

@ -9,6 +9,7 @@ use query::{exec::Executor, Database};
use crate::{
    db::{load_or_create_preserved_catalog, Db},
    write_buffer::WriteBuffer,
    JobRegistry,
};
use std::{borrow::Cow, convert::TryFrom, sync::Arc, time::Duration};
@ -33,6 +34,7 @@ pub struct TestDbBuilder {
    object_store: Option<Arc<ObjectStore>>,
    db_name: Option<DatabaseName<'static>>,
    worker_cleanup_avg_sleep: Option<Duration>,
    write_buffer: Option<Arc<dyn WriteBuffer>>,
}

impl TestDbBuilder {
@ -79,6 +81,7 @@ impl TestDbBuilder {
                exec,
                Arc::new(JobRegistry::new()),
                preserved_catalog,
                self.write_buffer,
            ),
        }
    }
@ -102,6 +105,11 @@ impl TestDbBuilder {
        self.worker_cleanup_avg_sleep = Some(d);
        self
    }

    pub fn write_buffer(mut self, write_buffer: Arc<dyn WriteBuffer>) -> Self {
        self.write_buffer = Some(write_buffer);
        self
    }
}

/// Used for testing: create a Database with a local store

View File

@ -0,0 +1,61 @@
use entry::{Entry, Sequence};

/// A Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading entries
/// from the Write Buffer at a later time.
pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {
    /// Send an `Entry` to the write buffer and return information that can be used to restore
    /// entries at a later time.
    fn store_entry(
        &self,
        entry: &Entry,
    ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>>;

    // TODO: interface for restoring, will look something like:
    // fn restore_from(&self, sequence: &Sequence) -> Result<Stream<Entry>, Err>;
}
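The restore side is left as a TODO above. One hedged sketch of the shape that interface might take, using a boxed iterator in place of an async stream so the sketch stays dependency-free; `WriteBufferReading` and `restore_from` are illustrative names, not part of this change.

// Hypothetical sketch only: this PR defines no restore interface yet.
use entry::{Entry, Sequence};

type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;

pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static {
    /// Replay every entry at or after `sequence`, in order.
    fn restore_from(
        &self,
        sequence: &Sequence,
    ) -> Result<Box<dyn Iterator<Item = Result<Entry, WriteBufferError>> + Send>, WriteBufferError>;
}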
#[derive(Debug)]
pub struct KafkaBuffer {
    conn: String,
}

impl WriteBuffer for KafkaBuffer {
    fn store_entry(
        &self,
        _entry: &Entry,
    ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
        unimplemented!()
    }
}

impl KafkaBuffer {
    pub fn new(conn: impl Into<String>) -> Self {
        Self { conn: conn.into() }
    }
}
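`KafkaBuffer::store_entry` is `unimplemented!()` for now. As a hedged sketch of where this is headed, the body might look roughly like the following, assuming the `rdkafka` crate's `FutureProducer` (0.26-era API) and a topic name chosen purely for illustration; Kafka's partition and offset map directly onto `Sequence { id, number }`.

// Sketch only; this PR leaves the body unimplemented. Assumes rdkafka ~0.26.
use rdkafka::{
    config::ClientConfig,
    producer::{FutureProducer, FutureRecord},
    util::Timeout,
};

fn store_entry_sketch(
    conn: &str,
    payload: &[u8],
) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", conn)
        .create()?;

    // "iox_entries" is an illustrative topic name. Blocking on the delivery
    // future keeps the sketch sync; a real implementation would likely be async.
    let record = FutureRecord::to("iox_entries").payload(payload).key("");
    let (partition, offset) = futures::executor::block_on(producer.send(record, Timeout::Never))
        .map_err(|(e, _msg)| Box::new(e) as Box<dyn std::error::Error + Sync + Send>)?;

    Ok(Sequence {
        id: partition as u32,
        number: offset as u64,
    })
}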
pub mod test_helpers {
    use super::*;
    use std::sync::{Arc, Mutex};

    #[derive(Debug, Default)]
    pub struct MockBuffer {
        pub entries: Arc<Mutex<Vec<Entry>>>,
    }

    impl WriteBuffer for MockBuffer {
        fn store_entry(
            &self,
            entry: &Entry,
        ) -> Result<Sequence, Box<dyn std::error::Error + Sync + Send>> {
            let mut entries = self.entries.lock().unwrap();
            let offset = entries.len() as u64;
            entries.push(entry.clone());

            Ok(Sequence {
                id: 0,
                number: offset,
            })
        }
    }
}
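Because `MockBuffer` assigns offsets from the vector length at store time, sequence numbers increase monotonically, which keeps test assertions simple. A small usage sketch (assumes `lp_to_entry` from the `entry` crate's test helpers, as used in the `db` tests above):

let mock = MockBuffer::default();

let first = mock.store_entry(&lp_to_entry("cpu bar=1 10")).unwrap();
let second = mock.store_entry(&lp_to_entry("cpu bar=2 20")).unwrap();

// Offsets are positional: entry 0 gets number 0, entry 1 gets number 1.
assert_eq!(first.number, 0);
assert_eq!(second.number, 1);
assert_eq!(mock.entries.lock().unwrap().len(), 2);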

View File

@ -218,6 +218,7 @@ async fn test_create_get_update_database() {
                seconds: 2,
                nanos: 0,
            }),
            write_buffer_connection_string: "".into(),
        };

        client