Merge pull request #4080 from influxdata/crepererum/issue3934d

refactor: dyn-dispatch chunks in query subsystem
kodiakhq[bot] 2022-03-21 12:47:28 +00:00 committed by GitHub
commit 26a7a61d0a
22 changed files with 459 additions and 523 deletions
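The central change: functions and types that were generic over a chunk type `C: QueryChunk + 'static` now take trait objects, `Arc<dyn QueryChunk>`, so chunks of different concrete types can flow through a single, non-generic code path. A minimal standalone sketch of the pattern (the `Chunk` trait and all names below are illustrative stand-ins, not the IOx API):

use std::sync::Arc;

// Stand-in for the real `QueryChunk` trait (illustrative only).
trait Chunk: std::fmt::Debug + Send + Sync {
    fn table_name(&self) -> &str;
}

#[derive(Debug)]
struct ParquetChunk;

impl Chunk for ParquetChunk {
    fn table_name(&self) -> &str {
        "t"
    }
}

// Before: generic, monomorphized once per concrete chunk type.
fn table_names_generic<C: Chunk + 'static>(chunks: &[Arc<C>]) -> Vec<&str> {
    chunks.iter().map(|c| c.table_name()).collect()
}

// After: one dyn-dispatched version; chunks of different concrete
// types can now live in the same Vec.
fn table_names_dyn(chunks: &[Arc<dyn Chunk>]) -> Vec<&str> {
    chunks.iter().map(|c| c.table_name()).collect()
}

fn main() {
    // The explicit `as Arc<dyn Chunk>` unsized coercion is the same
    // cast that recurs throughout the diff below.
    let chunks: Vec<Arc<dyn Chunk>> = vec![Arc::new(ParquetChunk) as Arc<dyn Chunk>];
    assert_eq!(table_names_dyn(&chunks), vec!["t"]);
    assert_eq!(table_names_generic(&[Arc::new(ParquetChunk)]), vec!["t"]);
}

The trade-off is virtual dispatch per call instead of compile-time specialization, in exchange for fewer generic parameters threaded through the planner and provider types.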

View File

@ -14,11 +14,11 @@ use iox_catalog::interface::Catalog;
use object_store::DynObjectStore;
use observability_deps::tracing::warn;
use parquet_file::metadata::IoxMetadata;
use query::exec::Executor;
use query::{
compute_sort_key_for_chunks, exec::ExecutorType, frontend::reorg::ReorgPlanner,
util::compute_timenanosecond_min_max,
};
use query::{exec::Executor, QueryChunk};
use snafu::{ensure, ResultExt, Snafu};
use std::{
cmp::{max, min},
@ -387,6 +387,10 @@ impl Compactor {
}
// Merge the schemas of the compacting chunks
let query_chunks: Vec<_> = query_chunks
.into_iter()
.map(|c| Arc::new(c) as Arc<dyn QueryChunk>)
.collect();
let merged_schema = QueryableParquetChunk::merge_schemas(&query_chunks);
// Compute the sorted output of the compacting result
@ -394,11 +398,7 @@ impl Compactor {
// Build compact query plan
let plan = ReorgPlanner::new()
.compact_plan(
Arc::clone(&merged_schema),
query_chunks.into_iter().map(Arc::new),
sort_key.clone(),
)
.compact_plan(Arc::clone(&merged_schema), query_chunks, sort_key.clone())
.context(CompactLogicalPlanSnafu)?;
let ctx = self.exec.new_context(ExecutorType::Reorg);
let physical_plan = ctx

View File

@ -62,12 +62,10 @@ impl QueryableParquetChunk {
}
/// Merge the schemas of the given chunks
pub fn merge_schemas(chunks: &[Self]) -> Arc<Schema> {
pub fn merge_schemas(chunks: &[Arc<dyn QueryChunk>]) -> Arc<Schema> {
let mut merger = SchemaMerger::new();
for chunk in chunks {
merger = merger
.merge(&chunk.data.schema())
.expect("schemas compatible");
merger = merger.merge(&chunk.schema()).expect("schemas compatible");
}
Arc::new(merger.build())
}

View File

@ -14,11 +14,11 @@ use metric::{Attributes, DurationCounter, Metric, U64Counter};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use query::exec::IOxExecutionContext;
use query::{exec::IOxExecutionContext, QueryChunk};
use query::{
provider::{ChunkPruner, ProviderBuilder},
pruning::{prune_chunks, PruningObserver},
QueryChunkMeta, QueryCompletedToken, QueryDatabase, QueryText, DEFAULT_SCHEMA,
QueryCompletedToken, QueryDatabase, QueryText, DEFAULT_SCHEMA,
};
use schema::Schema;
use std::time::Instant;
@ -207,11 +207,15 @@ impl ChunkAccess {
/// Returns all chunks from `table_name` that may have data that passes the
/// specified predicates. The chunks are pruned as aggressively as
/// possible based on metadata.
fn candidate_chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<DbChunk>> {
fn candidate_chunks(
&self,
table_name: &str,
predicate: &Predicate,
) -> Vec<Arc<dyn QueryChunk>> {
let start = Instant::now();
// Get chunks and schema as a single transaction
let (mut chunks, schema) = {
let (chunks, schema) = {
let table = match self.catalog.table(table_name).ok() {
Some(table) => table,
None => return vec![],
@ -227,6 +231,11 @@ impl ChunkAccess {
(chunks, schema)
};
let mut chunks: Vec<_> = chunks
.into_iter()
.map(|c| c as Arc<dyn QueryChunk>)
.collect();
self.access_metrics.catalog_snapshot_count.inc(1);
self.access_metrics
.catalog_snapshot_duration
@ -249,14 +258,14 @@ impl ChunkAccess {
}
}
impl ChunkPruner<DbChunk> for ChunkAccess {
impl ChunkPruner for ChunkAccess {
fn prune_chunks(
&self,
table_name: &str,
table_schema: Arc<Schema>,
chunks: Vec<Arc<DbChunk>>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: &Predicate,
) -> Vec<Arc<DbChunk>> {
) -> Vec<Arc<dyn QueryChunk>> {
let start = Instant::now();
debug!(num_chunks=chunks.len(), %predicate, "Attempting to prune chunks");
@ -271,9 +280,7 @@ impl ChunkPruner<DbChunk> for ChunkAccess {
}
impl PruningObserver for TableAccessMetrics {
type Observed = DbChunk;
fn was_pruned(&self, chunk: &Self::Observed) {
fn was_pruned(&self, chunk: &dyn QueryChunk) {
let chunk_summary = chunk.summary().expect("Chunk should have summary");
self.pruned_chunks.inc(1);
self.pruned_rows.inc(chunk_summary.total_count())
@ -282,10 +289,8 @@ impl PruningObserver for TableAccessMetrics {
#[async_trait]
impl QueryDatabase for QueryCatalogAccess {
type Chunk = DbChunk;
/// Return a covering set of chunks for a particular table and predicate
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<dyn QueryChunk>> {
self.chunk_access.candidate_chunks(table_name, predicate)
}
@ -376,8 +381,7 @@ impl SchemaProvider for DbSchemaProvider {
};
let mut builder = ProviderBuilder::new(table_name, schema);
builder =
builder.add_pruner(Arc::clone(&self.chunk_access) as Arc<dyn ChunkPruner<DbChunk>>);
builder = builder.add_pruner(Arc::clone(&self.chunk_access) as Arc<dyn ChunkPruner>);
// TODO: Better chunk pruning (#3570)
for chunk in self

View File

@ -44,6 +44,7 @@ use parquet_catalog::{
};
use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use query::QueryChunk;
use query::{
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
QueryCompletedToken, QueryDatabase, QueryText,
@ -1228,9 +1229,7 @@ impl Db {
/// can just use Db as a `Database` even though the implementation
/// lives in `catalog_access`
impl QueryDatabase for Db {
type Chunk = DbChunk;
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<dyn QueryChunk>> {
self.catalog_access.chunks(table_name, predicate).await
}

View File

@ -1,4 +1,3 @@
use super::DbChunk;
use crate::{
catalog::{chunk::CatalogChunk, partition::Partition},
Db,
@ -25,7 +24,7 @@ use lifecycle::{
use observability_deps::tracing::{info, trace, warn};
use parking_lot::Mutex;
use persistence_windows::persistence_windows::FlushHandle;
use query::QueryChunkMeta;
use query::QueryChunk;
use schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME};
use std::{future::Future, sync::Arc};
use time::{Time, TimeProvider};
@ -395,7 +394,7 @@ fn collect_rub(
/// This is infallible because the schemas of chunks within a
/// partition are assumed to be compatible, since that schema was
/// enforced as part of writing into the partition
fn merge_schemas(chunks: &[Arc<DbChunk>]) -> Arc<Schema> {
fn merge_schemas(chunks: &[Arc<dyn QueryChunk>]) -> Arc<Schema> {
let mut merger = SchemaMerger::new();
for db_chunk in chunks {
merger = merger

View File

@ -10,7 +10,8 @@ use crate::{
use data_types::{chunk_metadata::ChunkOrder, delete_predicate::DeletePredicate, job::Job};
use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::*;
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use query::QueryChunk;
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner};
use std::{collections::HashSet, future::Future, sync::Arc};
use time::Time;
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@ -73,7 +74,7 @@ pub(crate) fn compact_chunks(
max_order = max_order.max(chunk.order());
chunk.set_compacting(&registration)?;
Ok(DbChunk::snapshot(&*chunk))
Ok(DbChunk::snapshot(&*chunk) as Arc<dyn QueryChunk>)
})
.collect::<Result<Vec<_>>>()?;

View File

@ -32,7 +32,7 @@ use parquet_file::{
storage::Storage,
};
use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint};
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunk};
use schema::sort::SortKey;
use schema::Schema;
use snafu::{OptionExt, ResultExt};
@ -281,7 +281,7 @@ fn mark_chunks_to_compact(
// Get the parquet dbchunk snapshot and also keep its file path to remove later
let dbchunk = DbChunk::parquet_file_snapshot(&*chunk);
compacted_parquet_file_paths.push(dbchunk.object_store_path().unwrap().clone());
Ok(dbchunk)
Ok(dbchunk as Arc<dyn QueryChunk>)
})
.collect::<Result<Vec<_>>>()?;
@ -322,7 +322,7 @@ struct CompactingOsChunks {
input_rows: u64,
delete_predicates: HashSet<Arc<DeletePredicate>>,
compacted_parquet_file_paths: Vec<ParquetFilePath>,
os_chunks: Vec<Arc<DbChunk>>,
os_chunks: Vec<Arc<dyn QueryChunk>>,
max_order: ChunkOrder,
database_checkpoint: DatabaseCheckpoint,
partition_checkpoint: PartitionCheckpoint,
@ -335,7 +335,7 @@ struct CompactingOsChunks {
/// Deleted and duplicated data will be eliminated during the scan
/// . Output schema of the compact plan
/// . Sort Key of the output data
async fn compact_chunks(db: &Db, query_chunks: &[Arc<DbChunk>]) -> Result<CompactedStream> {
async fn compact_chunks(db: &Db, query_chunks: &[Arc<dyn QueryChunk>]) -> Result<CompactedStream> {
// Tracking metric
let ctx = db.exec.new_context(ExecutorType::Reorg);

View File

@ -8,7 +8,7 @@ use crate::{catalog::chunk::CatalogChunk, lifecycle::collect_rub, DbChunk};
use data_types::job::Job;
use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info;
use query::{exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use query::{exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunk};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@ -30,7 +30,7 @@ pub fn load_chunk(
chunk.set_loading_to_read_buffer(&registration)?;
// Get queryable chunk
let db_chunk = DbChunk::snapshot(&*chunk);
let db_chunk = DbChunk::snapshot(&*chunk) as Arc<dyn QueryChunk>;
// Drop locks
let chunk = chunk.into_data().chunk;

View File

@ -11,7 +11,8 @@ use data_types::{chunk_metadata::ChunkOrder, delete_predicate::DeletePredicate,
use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition};
use observability_deps::tracing::info;
use persistence_windows::persistence_windows::FlushHandle;
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use query::QueryChunk;
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner};
use std::{collections::HashSet, future::Future, sync::Arc};
use time::Time;
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@ -75,7 +76,7 @@ pub fn persist_chunks(
max_order = max_order.max(chunk.order());
chunk.set_writing_to_object_store(&registration)?;
query_chunks.push(DbChunk::snapshot(&*chunk));
query_chunks.push(DbChunk::snapshot(&*chunk) as Arc<dyn QueryChunk>);
}
// drop partition lock guard

View File

@ -2,11 +2,10 @@ use std::{any::Any, sync::Arc};
use async_trait::async_trait;
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use db::chunk::DbChunk;
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use query::{
exec::{ExecutionContextProvider, ExecutorType, IOxExecutionContext},
QueryCompletedToken, QueryDatabase, QueryText,
QueryChunk, QueryCompletedToken, QueryDatabase, QueryText,
};
use schema::Schema;
use trace::ctx::SpanContext;
@ -25,9 +24,7 @@ impl QueryDatabaseMeta for QuerierNamespace {
#[async_trait]
impl QueryDatabase for QuerierNamespace {
type Chunk = DbChunk;
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<dyn QueryChunk>> {
self.catalog_access.chunks(table_name, predicate).await
}

View File

@ -19,7 +19,7 @@ mod test {
frontend::reorg::ReorgPlanner,
provider::{DeduplicateExec, IOxReadFilterNode},
test::TestChunk,
QueryChunkMeta,
QueryChunk, QueryChunkMeta,
};
/// A macro to assert that the contents of the extracted metrics are reasonable
@ -89,9 +89,7 @@ mod test {
// now validate metrics are good
let extracted = extract_metrics(plan.as_ref(), |plan| {
plan.as_any()
.downcast_ref::<IOxReadFilterNode<TestChunk>>()
.is_some()
plan.as_any().downcast_ref::<IOxReadFilterNode>().is_some()
})
.unwrap();
@ -197,7 +195,7 @@ mod test {
extractor.inner
}
fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<TestChunk>>) {
fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<dyn QueryChunk>>) {
let chunk1 = Arc::new(
TestChunk::new("t")
.with_time_column_with_stats(Some(50), Some(7000))

View File

@ -37,7 +37,7 @@ use crate::{
},
provider::ProviderBuilder,
util::MissingColumnsToNull,
QueryChunk, QueryChunkMeta, QueryDatabase,
QueryChunk, QueryDatabase,
};
#[derive(Debug, Snafu)]
@ -914,17 +914,14 @@ impl InfluxRpcPlanner {
/// Filter(predicate)
/// TableScan (of chunks)
/// ```
fn tag_keys_plan<C>(
fn tag_keys_plan(
&self,
ctx: IOxExecutionContext,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<StringSetPlan>>
where
C: QueryChunk + 'static,
{
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<StringSetPlan>> {
let scan_and_filter = self.scan_and_filter(
ctx.child_ctx("scan_and_filter planning"),
table_name,
@ -989,17 +986,14 @@ impl InfluxRpcPlanner {
/// Filter(predicate) [optional]
/// Scan
/// ```
fn field_columns_plan<C>(
fn field_columns_plan(
&self,
ctx: IOxExecutionContext,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<LogicalPlan>>
where
C: QueryChunk + 'static,
{
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<LogicalPlan>> {
let scan_and_filter = self.scan_and_filter(
ctx.child_ctx("scan_and_filter planning"),
table_name,
@ -1054,16 +1048,13 @@ impl InfluxRpcPlanner {
/// Filter(predicate) [optional]
/// Scan
/// ```
fn table_name_plan<C>(
fn table_name_plan(
&self,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<LogicalPlan>>
where
C: QueryChunk + 'static,
{
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<LogicalPlan>> {
debug!(%table_name, "Creating table_name full plan");
let scan_and_filter = self.scan_and_filter(
self.ctx.child_ctx("scan_and_filter planning"),
@ -1107,17 +1098,14 @@ impl InfluxRpcPlanner {
/// Order by (tag_columns, timestamp_column)
/// Filter(predicate)
/// Scan
fn read_filter_plan<C>(
fn read_filter_plan(
&self,
ctx: IOxExecutionContext,
table_name: impl AsRef<str>,
schema: Arc<Schema>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<SeriesSetPlan>>
where
C: QueryChunk + 'static,
{
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<SeriesSetPlan>> {
let table_name = table_name.as_ref();
let scan_and_filter = self.scan_and_filter(
ctx.child_ctx("scan_and_filter planning"),
@ -1223,18 +1211,15 @@ impl InfluxRpcPlanner {
/// GroupBy(gby cols, aggs, time cols)
/// Filter(predicate)
/// Scan
fn read_group_plan<C>(
fn read_group_plan(
&self,
ctx: IOxExecutionContext,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
agg: Aggregate,
chunks: Vec<Arc<C>>,
) -> Result<Option<SeriesSetPlan>>
where
C: QueryChunk + 'static,
{
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<SeriesSetPlan>> {
let scan_and_filter = self.scan_and_filter(
ctx.child_ctx("scan_and_filter planning"),
table_name,
@ -1341,7 +1326,7 @@ impl InfluxRpcPlanner {
/// Filter(predicate)
/// Scan
#[allow(clippy::too_many_arguments)]
fn read_window_aggregate_plan<C>(
fn read_window_aggregate_plan(
&self,
ctx: IOxExecutionContext,
table_name: impl Into<String>,
@ -1350,11 +1335,8 @@ impl InfluxRpcPlanner {
agg: Aggregate,
every: &WindowDuration,
offset: &WindowDuration,
chunks: Vec<Arc<C>>,
) -> Result<Option<SeriesSetPlan>>
where
C: QueryChunk + 'static,
{
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<SeriesSetPlan>> {
let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(
ctx.child_ctx("scan_and_filter planning"),
@ -1429,17 +1411,14 @@ impl InfluxRpcPlanner {
/// Filter(predicate) [optional]
/// Scan
/// ```
fn scan_and_filter<C>(
fn scan_and_filter(
&self,
ctx: IOxExecutionContext,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<TableScanAndFilter>>
where
C: QueryChunk + 'static,
{
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<TableScanAndFilter>> {
// Scan all columns to begin with (DataFusion projection
// push-down optimization will prune out unneeded columns later)
let projection = None;
@ -1505,10 +1484,10 @@ impl InfluxRpcPlanner {
/// Prunes the provided list of chunks using [`QueryChunk::apply_predicate_to_metadata`]
///
/// TODO: Should this logic live with the rest of the chunk pruning logic?
fn prune_chunks_metadata<C>(chunks: Vec<Arc<C>>, predicate: &Predicate) -> Result<Vec<Arc<C>>>
where
C: QueryChunk + 'static,
{
fn prune_chunks_metadata(
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: &Predicate,
) -> Result<Vec<Arc<dyn QueryChunk>>> {
let mut filtered = Vec::with_capacity(chunks.len());
for chunk in chunks {
// Try and apply the predicate using only metadata

View File

@ -48,28 +48,22 @@ impl ReorgPlanner {
/// Creates an execution plan for a full scan of a single chunk.
/// This plan is primarily used to load chunks from one storage medium to
/// another.
pub fn scan_single_chunk_plan<C>(
pub fn scan_single_chunk_plan(
&self,
schema: Arc<Schema>,
chunk: Arc<C>,
) -> Result<LogicalPlan>
where
C: QueryChunk + 'static,
{
chunk: Arc<dyn QueryChunk>,
) -> Result<LogicalPlan> {
self.scan_single_chunk_plan_with_filter(schema, chunk, None, vec![])
}
/// Creates an execution plan for a scan and filter data of a single chunk
pub fn scan_single_chunk_plan_with_filter<C>(
pub fn scan_single_chunk_plan_with_filter(
&self,
schema: Arc<Schema>,
chunk: Arc<C>,
chunk: Arc<dyn QueryChunk>,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
) -> Result<LogicalPlan>
where
C: QueryChunk + 'static,
{
) -> Result<LogicalPlan> {
let table_name = chunk.table_name();
// Prepare the plan for the table
let mut builder = ProviderBuilder::new(table_name, schema);
@ -108,15 +102,14 @@ impl ReorgPlanner {
///
/// (Sort on output_sort)
/// (Scan chunks) <-- any needed deduplication happens here
pub fn compact_plan<C, I>(
pub fn compact_plan<I>(
&self,
schema: Arc<Schema>,
chunks: I,
sort_key: SortKey,
) -> Result<LogicalPlan>
where
C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>,
I: IntoIterator<Item = Arc<dyn QueryChunk>>,
{
let ScanPlan {
plan_builder,
@ -174,7 +167,7 @@ impl ReorgPlanner {
/// e | 3000
/// c | 4000
/// ```
pub fn split_plan<C, I>(
pub fn split_plan<I>(
&self,
schema: Arc<Schema>,
chunks: I,
@ -182,8 +175,7 @@ impl ReorgPlanner {
split_time: i64,
) -> Result<LogicalPlan>
where
C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>,
I: IntoIterator<Item = Arc<dyn QueryChunk>>,
{
let ScanPlan {
plan_builder,
@ -209,15 +201,14 @@ impl ReorgPlanner {
///
/// Refer to query::provider::build_scan_plan for the details of the plan
///
fn sorted_scan_plan<C, I>(
fn sorted_scan_plan<I>(
&self,
schema: Arc<Schema>,
chunks: I,
sort_key: SortKey,
) -> Result<ScanPlan<C>>
) -> Result<ScanPlan>
where
C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>,
I: IntoIterator<Item = Arc<dyn QueryChunk>>,
{
let mut chunks = chunks.into_iter().peekable();
let table_name = match chunks.peek() {
@ -265,9 +256,9 @@ impl ReorgPlanner {
}
}
struct ScanPlan<C: QueryChunk + 'static> {
struct ScanPlan {
plan_builder: LogicalPlanBuilder,
provider: Arc<ChunkTableProvider<C>>,
provider: Arc<ChunkTableProvider>,
}
#[cfg(test)]
@ -280,12 +271,11 @@ mod test {
use crate::{
exec::{Executor, ExecutorType},
test::{raw_data, TestChunk},
QueryChunkMeta,
};
use super::*;
async fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<TestChunk>>) {
async fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<dyn QueryChunk>>) {
// Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new(
TestChunk::new("t")
@ -293,7 +283,7 @@ mod test {
.with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Chunk 2 has an extra field, and only 4 rows
let chunk2 = Arc::new(
@ -304,7 +294,7 @@ mod test {
.with_i64_field_column("field_int2")
.with_may_contain_pk_duplicates(true)
.with_four_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
let expected = vec![
"+-----------+------+--------------------------------+",

View File

@ -137,12 +137,10 @@ pub type QueryText = Box<dyn std::fmt::Display + Send + Sync>;
/// data in Chunks.
#[async_trait]
pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
type Chunk: QueryChunk;
/// Returns a set of chunks within the partition with data that may match
/// the provided predicate. If possible, chunks which have no rows that can
/// possibly match the predicate may be omitted.
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<Self::Chunk>>;
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<dyn QueryChunk>>;
/// Record that a particular type of query was run / planned
fn record_query(
@ -258,20 +256,14 @@ where
}
/// Return true if all the chunks include statistics
pub fn chunks_have_stats<C>(chunks: &[C]) -> bool
where
C: QueryChunkMeta,
{
pub fn chunks_have_stats(chunks: &[Arc<dyn QueryChunk>]) -> bool {
// If at least one of the provided chunks cannot provide stats,
// we do not need to compute potential duplicates; we will treat
// all of them as having duplicates
chunks.iter().all(|c| c.summary().is_some())
}
pub fn compute_sort_key_for_chunks<C>(schema: &Schema, chunks: &[C]) -> SortKey
where
C: QueryChunkMeta,
{
pub fn compute_sort_key_for_chunks(schema: &Schema, chunks: &[Arc<dyn QueryChunk>]) -> SortKey {
if !chunks_have_stats(chunks) {
// chunks do not have enough stats; return the primary key
// sorted lexicographically, with the time column always last
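With the associated type gone, every `QueryDatabase` implementation in this PR (`Db`, `QueryCatalogAccess`, `QuerierNamespace`) returns the same `Vec<Arc<dyn QueryChunk>>`, so downstream code no longer has to be generic over each database's chunk type. A sketch of the difference, again with illustrative names rather than the IOx traits:

use std::sync::Arc;

trait Chunk {
    fn rows(&self) -> usize;
}

// Before: the associated type ties each database to one concrete
// chunk type, and every caller must be generic over `D::Chunk`.
trait DatabaseGeneric {
    type Chunk: Chunk;
    fn chunks(&self) -> Vec<Arc<Self::Chunk>>;
}

// After: all implementations share one return type, so helpers like
// `total_rows` below work unchanged for any database.
trait DatabaseDyn {
    fn chunks(&self) -> Vec<Arc<dyn Chunk>>;
}

struct MemChunk(usize);

impl Chunk for MemChunk {
    fn rows(&self) -> usize {
        self.0
    }
}

fn total_rows(chunks: &[Arc<dyn Chunk>]) -> usize {
    chunks.iter().map(|c| c.rows()).sum()
}

fn main() {
    let chunks: Vec<Arc<dyn Chunk>> = vec![Arc::new(MemChunk(3)), Arc::new(MemChunk(4))];
    assert_eq!(total_rows(&chunks), 7);
}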

View File

@ -25,7 +25,7 @@ use crate::{
chunks_have_stats, compute_sort_key_for_chunks,
exec::IOxExecutionContext,
util::{arrow_sort_key_exprs, df_physical_expr},
QueryChunk, QueryChunkMeta,
QueryChunk,
};
use snafu::{ResultExt, Snafu};
@ -86,33 +86,33 @@ impl From<Error> for DataFusionError {
}
/// Something that can prune chunks based on their metadata
pub trait ChunkPruner<C: QueryChunk>: Sync + Send + std::fmt::Debug {
pub trait ChunkPruner: Sync + Send + std::fmt::Debug {
/// prune `chunks`, if possible, based on predicate.
fn prune_chunks(
&self,
table_name: &str,
table_schema: Arc<Schema>,
chunks: Vec<Arc<C>>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: &Predicate,
) -> Vec<Arc<C>>;
) -> Vec<Arc<dyn QueryChunk>>;
}
/// Builds a `ChunkTableProvider` from a series of `QueryChunk`s
/// and ensures the schema across the chunks is compatible and
/// consistent.
#[derive(Debug)]
pub struct ProviderBuilder<C: QueryChunk + 'static> {
pub struct ProviderBuilder {
table_name: Arc<str>,
schema: Arc<Schema>,
chunk_pruner: Option<Arc<dyn ChunkPruner<C>>>,
chunks: Vec<Arc<C>>,
chunk_pruner: Option<Arc<dyn ChunkPruner>>,
chunks: Vec<Arc<dyn QueryChunk>>,
sort_key: Option<SortKey>,
// execution context used for tracing
ctx: IOxExecutionContext,
}
impl<C: QueryChunk> ProviderBuilder<C> {
impl ProviderBuilder {
pub fn new(table_name: impl AsRef<str>, schema: Arc<Schema>) -> Self {
Self {
table_name: Arc::from(table_name.as_ref()),
@ -137,14 +137,14 @@ impl<C: QueryChunk> ProviderBuilder<C> {
}
/// Add a new chunk to this provider
pub fn add_chunk(mut self, chunk: Arc<C>) -> Self {
pub fn add_chunk(mut self, chunk: Arc<dyn QueryChunk>) -> Self {
self.chunks.push(chunk);
self
}
/// Specify a `ChunkPruner` for the provider that will apply
/// additional chunk level pruning based on pushed down predicates
pub fn add_pruner(mut self, chunk_pruner: Arc<dyn ChunkPruner<C>>) -> Self {
pub fn add_pruner(mut self, chunk_pruner: Arc<dyn ChunkPruner>) -> Self {
assert!(
self.chunk_pruner.is_none(),
"Chunk pruner already specified"
@ -165,7 +165,7 @@ impl<C: QueryChunk> ProviderBuilder<C> {
}
/// Create the Provider
pub fn build(self) -> Result<ChunkTableProvider<C>> {
pub fn build(self) -> Result<ChunkTableProvider> {
let chunk_pruner = match self.chunk_pruner {
Some(chunk_pruner) => chunk_pruner,
None => {
@ -192,14 +192,14 @@ impl<C: QueryChunk> ProviderBuilder<C> {
/// This allows DataFusion to see data from Chunks as a single table, as well as
/// push predicates and selections down to chunks
#[derive(Debug)]
pub struct ChunkTableProvider<C: QueryChunk + 'static> {
pub struct ChunkTableProvider {
table_name: Arc<str>,
/// The IOx schema (wrapper around Arrow Schemaref) for this table
iox_schema: Arc<Schema>,
/// Something that can prune chunks
chunk_pruner: Arc<dyn ChunkPruner<C>>,
chunk_pruner: Arc<dyn ChunkPruner>,
/// The chunks
chunks: Vec<Arc<C>>,
chunks: Vec<Arc<dyn QueryChunk>>,
/// The sort key if any
sort_key: Option<SortKey>,
@ -207,7 +207,7 @@ pub struct ChunkTableProvider<C: QueryChunk + 'static> {
ctx: IOxExecutionContext,
}
impl<C: QueryChunk + 'static> ChunkTableProvider<C> {
impl ChunkTableProvider {
/// Return the IOx schema view for the data provided by this provider
pub fn iox_schema(&self) -> Arc<Schema> {
Arc::clone(&self.iox_schema)
@ -225,7 +225,7 @@ impl<C: QueryChunk + 'static> ChunkTableProvider<C> {
}
#[async_trait]
impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
impl TableProvider for ChunkTableProvider {
fn as_any(&self) -> &dyn std::any::Any {
self
}
@ -252,7 +252,7 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
// Now we have a second attempt to prune out chunks based on
// metadata using the pushed down predicate (e.g. in SQL).
let chunks: Vec<Arc<C>> = self.chunks.to_vec();
let chunks: Vec<Arc<dyn QueryChunk>> = self.chunks.to_vec();
let num_initial_chunks = chunks.len();
let chunks = self.chunk_pruner.prune_chunks(
self.table_name(),
@ -301,21 +301,21 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
#[derive(Debug)]
/// A deduplicator that removes duplicated data during scan execution
pub(crate) struct Deduplicater<C: QueryChunk + 'static> {
pub(crate) struct Deduplicater {
/// a vector of vectors of overlapped chunks
pub overlapped_chunks_set: Vec<Vec<Arc<C>>>,
pub overlapped_chunks_set: Vec<Vec<Arc<dyn QueryChunk>>>,
/// a vector of non-overlapped chunks, each of which has duplicates within itself
pub in_chunk_duplicates_chunks: Vec<Arc<C>>,
pub in_chunk_duplicates_chunks: Vec<Arc<dyn QueryChunk>>,
/// a vector of non-overlapped chunks without duplicates
pub no_duplicates_chunks: Vec<Arc<C>>,
pub no_duplicates_chunks: Vec<Arc<dyn QueryChunk>>,
// execution context
ctx: IOxExecutionContext,
}
impl<C: QueryChunk + 'static> Deduplicater<C> {
impl Deduplicater {
pub(crate) fn new() -> Self {
Self {
overlapped_chunks_set: vec![],
@ -405,7 +405,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
&mut self,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunks: Vec<Arc<C>>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: Predicate,
output_sort_key: Option<SortKey>,
) -> Result<Arc<dyn ExecutionPlan>> {
@ -518,7 +518,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
/// 1. a vector of vectors of overlapped chunks
/// 2. a vector of non-overlapped chunks, each of which has duplicates within itself
/// 3. a vector of non-overlapped chunks without duplicates
fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) -> Result<()> {
fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<dyn QueryChunk>>) -> Result<()> {
if !chunks_have_stats(&chunks) {
// no statistics, consider all chunks overlap
self.overlapped_chunks_set.push(chunks);
@ -590,7 +590,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
ctx: IOxExecutionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunks: Vec<Arc<C>>, // These chunks are identified overlapped
chunks: Vec<Arc<dyn QueryChunk>>, // These chunks are identified as overlapped
predicate: Predicate,
sort_key: &SortKey,
) -> Result<Arc<dyn ExecutionPlan>> {
@ -679,7 +679,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
ctx: IOxExecutionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunk: Arc<C>, // This chunk is identified having duplicates
chunk: Arc<dyn QueryChunk>, // This chunk is identified as having duplicates
predicate: Predicate,
sort_key: &SortKey,
) -> Result<Arc<dyn ExecutionPlan>> {
@ -807,8 +807,8 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
ctx: IOxExecutionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate, // This is the select predicate of the query
chunk: Arc<dyn QueryChunk>, // This chunk is identified as having duplicates
predicate: Predicate, // This is the select predicate of the query
sort_key: Option<&SortKey>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Add the sort key and delete predicate columns to the schema of the to-be-scanned IOxReadFilterNode
@ -898,7 +898,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
/// Add SortExec operator on top of the input plan of the given chunk
/// The plan will be sorted on the chunk's primary key
fn build_sort_plan(
chunk: Arc<C>,
chunk: Arc<dyn QueryChunk>,
input: Arc<dyn ExecutionPlan>,
output_sort_key: &SortKey,
) -> Result<Arc<dyn ExecutionPlan>> {
@ -952,7 +952,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
ctx: IOxExecutionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunk: Arc<C>, // This chunk is identified having no duplicates
chunk: Arc<dyn QueryChunk>, // This chunk is identified as having no duplicates
predicate: Predicate,
sort_key: Option<&SortKey>,
) -> Result<Arc<dyn ExecutionPlan>> {
@ -1001,7 +1001,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
ctx: IOxExecutionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunks: Vec<Arc<C>>, // These chunks is identified having no duplicates
chunks: Vec<Arc<dyn QueryChunk>>, // These chunks are identified as having no duplicates
predicate: Predicate,
output_sort_key: Option<&SortKey>,
) -> Result<Vec<Arc<dyn ExecutionPlan>>> {
@ -1039,14 +1039,14 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
sorted_chunk_plans
}
fn no_delete_predicates(chunks: &[Arc<C>]) -> bool {
fn no_delete_predicates(chunks: &[Arc<dyn QueryChunk>]) -> bool {
chunks
.iter()
.all(|chunk| chunk.delete_predicates().is_empty())
}
/// Find the columns needed in chunks' primary keys across schemas
fn compute_pk_schema(chunks: &[Arc<C>]) -> Arc<Schema> {
fn compute_pk_schema(chunks: &[Arc<dyn QueryChunk>]) -> Arc<Schema> {
let mut schema_merger = SchemaMerger::new();
for chunk in chunks {
let chunk_schema = chunk.schema();
@ -1081,14 +1081,14 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
#[derive(Debug)]
/// A pruner that does not do pruning (suitable if no additional pruning is possible)
struct NoOpPruner {}
impl<C: QueryChunk> ChunkPruner<C> for NoOpPruner {
impl ChunkPruner for NoOpPruner {
fn prune_chunks(
&self,
_table_name: &str,
_table_schema: Arc<Schema>,
chunks: Vec<Arc<C>>,
chunks: Vec<Arc<dyn QueryChunk>>,
_predicate: &Predicate,
) -> Vec<Arc<C>> {
) -> Vec<Arc<dyn QueryChunk>> {
chunks
}
}
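Because `ChunkPruner` no longer carries a type parameter, a pruner can be stored and swapped behind a plain `Arc<dyn ChunkPruner>`, which is exactly how `ProviderBuilder::add_pruner` above accepts it. A hypothetical filtering pruner in the same shape, as a standalone sketch rather than IOx code:

use std::sync::Arc;

trait Chunk: Send + Sync {
    fn row_count(&self) -> usize;
}

// Mirrors the non-generic `ChunkPruner` shape (illustrative only).
trait Pruner: std::fmt::Debug + Send + Sync {
    fn prune(&self, chunks: Vec<Arc<dyn Chunk>>) -> Vec<Arc<dyn Chunk>>;
}

// A toy pruner that drops chunks with no rows.
#[derive(Debug)]
struct DropEmpty;

impl Pruner for DropEmpty {
    fn prune(&self, chunks: Vec<Arc<dyn Chunk>>) -> Vec<Arc<dyn Chunk>> {
        chunks.into_iter().filter(|c| c.row_count() > 0).collect()
    }
}

struct MemChunk(usize);

impl Chunk for MemChunk {
    fn row_count(&self) -> usize {
        self.0
    }
}

fn main() {
    // Any pruner hides behind the same trait-object type.
    let pruner: Arc<dyn Pruner> = Arc::new(DropEmpty);
    let chunks: Vec<Arc<dyn Chunk>> = vec![Arc::new(MemChunk(0)), Arc::new(MemChunk(5))];
    assert_eq!(pruner.prune(chunks).len(), 1);
}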
@ -1102,10 +1102,7 @@ mod test {
use datafusion_util::test_collect;
use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
use crate::{
test::{raw_data, TestChunk},
QueryChunkMeta,
};
use crate::test::{raw_data, TestChunk};
use super::*;
@ -1174,7 +1171,7 @@ mod test {
.with_tag_column("tag1")
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
let sort_key = SortKey::from_columns(vec!["tag1", TIME_COLUMN_NAME]);
@ -1248,7 +1245,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
let sort_key = SortKey::from_columns(vec!["tag1", "tag2", "tag3", TIME_COLUMN_NAME]);
@ -1322,7 +1319,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
let sort_key = SortKey::from_columns(vec!["tag1", "tag2", TIME_COLUMN_NAME]);
@ -1367,7 +1364,7 @@ mod test {
.with_tag_column("tag2")
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Chunk 2 is exactly the same as Chunk 1
let chunk2 = Arc::new(
@ -1378,7 +1375,7 @@ mod test {
.with_tag_column("tag2")
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
// the same for 2 chunks
let schema = chunk1.schema();
@ -1443,7 +1440,7 @@ mod test {
.with_tag_column("tag2")
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Chunk 2 is exactly the same as Chunk 1
let chunk2 = Arc::new(
@ -1454,7 +1451,7 @@ mod test {
.with_tag_column("tag2")
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
let chunks = vec![chunk1, chunk2];
// data in its original form
@ -1523,7 +1520,7 @@ mod test {
.with_tag_column("tag2")
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Chunk 2 same tags, but different fields
let chunk2 = Arc::new(
@ -1533,7 +1530,7 @@ mod test {
.with_tag_column("tag1")
.with_i64_field_column("other_field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Chunk 3 is exactly the same as Chunk 2
let chunk3 = Arc::new(
@ -1543,7 +1540,7 @@ mod test {
.with_tag_column("tag1")
.with_i64_field_column("other_field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
let chunks = vec![chunk1, chunk2, chunk3];
// data in its original form
@ -1621,7 +1618,7 @@ mod test {
.with_tag_column("tag2")
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Chunk 2 has two different tags
let chunk2 = Arc::new(
@ -1632,7 +1629,7 @@ mod test {
.with_tag_column("tag1")
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Chunk 3 has just tag3
let chunk3 = Arc::new(
@ -1643,7 +1640,7 @@ mod test {
.with_i64_field_column("field_int")
.with_i64_field_column("field_int2")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// With provided stats, the computed key will be (tag2, tag1, tag3, time)
// Requested output schema == the schema for all three
@ -1739,7 +1736,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk.schema();
@ -1791,7 +1788,7 @@ mod test {
.with_i64_field_column("field_int")
.with_may_contain_pk_duplicates(true)
.with_ten_rows_of_data_some_duplicates(),
);
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk.schema();
@ -1861,7 +1858,7 @@ mod test {
.with_i64_field_column("field_int")
.with_may_contain_pk_duplicates(true)
.with_ten_rows_of_data_some_duplicates(),
);
) as Arc<dyn QueryChunk>;
let chunks = vec![chunk];
// data in its original form
@ -1941,7 +1938,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_ten_rows_of_data_some_duplicates(),
);
) as Arc<dyn QueryChunk>;
let chunk2 = Arc::new(
TestChunk::new("t")
@ -1960,7 +1957,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk1.schema();
@ -2036,7 +2033,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_ten_rows_of_data_some_duplicates(),
);
) as Arc<dyn QueryChunk>;
// chunk2 overlaps with chunk 1
let chunk2 = Arc::new(
@ -2057,7 +2054,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// chunk3 no overlap, no duplicates within
let chunk3 = Arc::new(
@ -2078,7 +2075,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_three_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// chunk4 no overlap, duplicates within
let chunk4 = Arc::new(
@ -2100,7 +2097,7 @@ mod test {
.with_i64_field_column("field_int")
.with_may_contain_pk_duplicates(true)
.with_four_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk1.schema();
@ -2195,7 +2192,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_ten_rows_of_data_some_duplicates(),
);
) as Arc<dyn QueryChunk>;
// chunk2 overlaps with chunk 1
let chunk2 = Arc::new(
@ -2216,7 +2213,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_five_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// chunk3 no overlap, no duplicates within
let chunk3 = Arc::new(
@ -2237,7 +2234,7 @@ mod test {
)
.with_i64_field_column("field_int")
.with_three_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// chunk4 no overlap, duplicates within
let chunk4 = Arc::new(
@ -2259,7 +2256,7 @@ mod test {
.with_i64_field_column("field_int")
.with_may_contain_pk_duplicates(true)
.with_four_rows_of_data(),
);
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk1.schema();
@ -2333,7 +2330,7 @@ mod test {
assert_batches_eq!(&expected, &batch);
}
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
fn chunk_ids(group: &[Arc<dyn QueryChunk>]) -> String {
let ids = group
.iter()
.map(|c| c.id().get().to_string())
@ -2341,7 +2338,7 @@ mod test {
ids.join(", ")
}
fn chunk_group_ids(groups: &[Vec<Arc<TestChunk>>]) -> Vec<String> {
fn chunk_group_ids(groups: &[Vec<Arc<dyn QueryChunk>>]) -> Vec<String> {
groups
.iter()
.enumerate()

View File

@ -7,9 +7,9 @@
use data_types::partition_metadata::{ColumnSummary, StatOverlap, Statistics};
use schema::TIME_COLUMN_NAME;
use snafu::Snafu;
use std::cmp::Ordering;
use std::{cmp::Ordering, sync::Arc};
use crate::QueryChunkMeta;
use crate::QueryChunk;
#[derive(Debug, Snafu)]
pub enum Error {
@ -32,7 +32,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Groups [`QueryChunkMeta`] objects into disjoint sets using values of
/// Groups [`QueryChunk`] objects into disjoint sets using values of
/// min/max statistics. The groups are formed such that each chunk in
/// a group *may* contain InfluxDB data model primary key duplicates
/// with other chunks in that group.
@ -48,17 +48,16 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
///
/// Note 2: this algorithm is O(n^2) worst case (when no chunks have
/// any overlap)
pub fn group_potential_duplicates<C>(chunks: Vec<C>) -> Result<Vec<Vec<C>>>
where
C: QueryChunkMeta,
{
let mut groups: Vec<Vec<KeyStats<'_, _>>> = vec![];
pub fn group_potential_duplicates(
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Vec<Vec<Arc<dyn QueryChunk>>>> {
let mut groups: Vec<Vec<KeyStats<'_>>> = vec![];
// Step 1: find the duplicate groups using references to `chunks` stored
// in KeyStats views
for (idx, chunk) in chunks.iter().enumerate() {
// try to find a place to put this chunk
let mut key_stats = Some(KeyStats::new(idx, chunk));
let mut key_stats = Some(KeyStats::new(idx, chunk.as_ref()));
'outer: for group in &mut groups {
// If this chunk overlaps any existing chunk in group add
@ -89,7 +88,7 @@ where
.map(|group| group.into_iter().map(|key_stats| key_stats.index).collect())
.collect();
let mut chunks: Vec<Option<C>> = chunks.into_iter().map(Some).collect();
let mut chunks: Vec<Option<Arc<dyn QueryChunk>>> = chunks.into_iter().map(Some).collect();
let groups = groups
.into_iter()
@ -101,9 +100,9 @@ where
.take()
.expect("Internal mismatch while gathering into groups")
})
.collect::<Vec<C>>()
.collect::<Vec<_>>()
})
.collect::<Vec<Vec<C>>>();
.collect::<Vec<Vec<_>>>();
Ok(groups)
}
@ -111,29 +110,23 @@ where
/// Holds a view to a chunk along with information about its columns
/// in an easy to compare form
#[derive(Debug)]
struct KeyStats<'a, C>
where
C: QueryChunkMeta,
{
struct KeyStats<'a> {
/// The index of the chunk
index: usize,
/// The underlying chunk
#[allow(dead_code)]
chunk: &'a C,
chunk: &'a dyn QueryChunk,
/// the ColumnSummaries for the chunk's 'primary_key' columns, in
/// "lexographical" order (aka sorted by name)
key_summaries: Vec<&'a ColumnSummary>,
}
impl<'a, C> KeyStats<'a, C>
where
C: QueryChunkMeta,
{
impl<'a> KeyStats<'a> {
/// Create a new view for the specified chunk at index `index`,
/// computing the columns to be used in the primary key comparison
pub fn new(index: usize, chunk: &'a C) -> Self {
pub fn new(index: usize, chunk: &'a dyn QueryChunk) -> Self {
// find summaries for each primary key column:
let key_summaries = chunk
.schema()
@ -297,17 +290,17 @@ mod test {
#[test]
fn one_column_no_overlap() {
let c1 = TestChunk::new("chunk1").with_tag_column_with_stats(
let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats(
"tag1",
Some("boston"),
Some("mumbai"),
);
));
let c2 = TestChunk::new("chunk2").with_tag_column_with_stats(
let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats(
"tag1",
Some("new york"),
Some("zoo york"),
);
));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -317,17 +310,17 @@ mod test {
#[test]
fn one_column_overlap() {
let c1 = TestChunk::new("chunk1").with_tag_column_with_stats(
let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats(
"tag1",
Some("boston"),
Some("new york"),
);
));
let c2 = TestChunk::new("chunk2").with_tag_column_with_stats(
let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats(
"tag1",
Some("denver"),
Some("zoo york"),
);
));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -337,9 +330,11 @@ mod test {
#[test]
fn one_time_column_overlap() {
let c1 = TestChunk::new("chunk1").with_time_column_with_stats(Some(100), Some(1000));
let c1 =
Arc::new(TestChunk::new("chunk1").with_time_column_with_stats(Some(100), Some(1000)));
let c2 = TestChunk::new("chunk2").with_time_column_with_stats(Some(200), Some(500));
let c2 =
Arc::new(TestChunk::new("chunk2").with_time_column_with_stats(Some(200), Some(500)));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -349,24 +344,32 @@ mod test {
#[test]
fn multi_columns() {
let c1 = TestChunk::new("chunk1")
.with_time_column_with_stats(Some(0), Some(1000))
.with_tag_column_with_stats("tag1", Some("boston"), Some("new york"));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column_with_stats(Some(0), Some(1000))
.with_tag_column_with_stats("tag1", Some("boston"), Some("new york")),
);
// Overlaps in tag1, but not in time
let c2 = TestChunk::new("chunk2")
.with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york"))
.with_time_column_with_stats(Some(2000), Some(3000));
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york"))
.with_time_column_with_stats(Some(2000), Some(3000)),
);
// Overlaps in time, but not in tag1
let c3 = TestChunk::new("chunk3")
.with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy"))
.with_time_column_with_stats(Some(500), Some(1500));
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy"))
.with_time_column_with_stats(Some(500), Some(1500)),
);
// Overlaps in time, and in tag1
let c4 = TestChunk::new("chunk4")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("zzz"))
.with_time_column_with_stats(Some(500), Some(1500));
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("zzz"))
.with_time_column_with_stats(Some(500), Some(1500)),
);
let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded");
@ -380,37 +383,49 @@ mod test {
#[test]
fn missing_columns() {
let c1 = TestChunk::new("chunk1")
.with_time_column_with_stats(Some(0), Some(1000))
.with_tag_column_with_stats("tag1", Some("boston"), Some("new york"))
.with_tag_column_with_stats("tag2", Some("boston"), Some("new york"))
.with_tag_column_with_stats("z", Some("a"), Some("b"));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column_with_stats(Some(0), Some(1000))
.with_tag_column_with_stats("tag1", Some("boston"), Some("new york"))
.with_tag_column_with_stats("tag2", Some("boston"), Some("new york"))
.with_tag_column_with_stats("z", Some("a"), Some("b")),
);
// Overlaps in tag1, but not in time
let c2 = TestChunk::new("chunk2")
.with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york"))
.with_time_column_with_stats(Some(2000), Some(3000));
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york"))
.with_time_column_with_stats(Some(2000), Some(3000)),
);
// Overlaps in time and z, but not in tag2
let c3 = TestChunk::new("chunk3")
.with_tag_column_with_stats("tag2", Some("zzx"), Some("zzy"))
.with_tag_column_with_stats("z", Some("a"), Some("b"))
.with_time_column_with_stats(Some(0), Some(1000));
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_tag_column_with_stats("tag2", Some("zzx"), Some("zzy"))
.with_tag_column_with_stats("z", Some("a"), Some("b"))
.with_time_column_with_stats(Some(0), Some(1000)),
);
// Overlaps in time, but not in tag1
let c4 = TestChunk::new("chunk4")
.with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy"))
.with_time_column_with_stats(Some(2000), Some(3000));
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy"))
.with_time_column_with_stats(Some(2000), Some(3000)),
);
// Overlaps in time, but not z
let c5 = TestChunk::new("chunk5")
.with_tag_column_with_stats("z", Some("c"), Some("d"))
.with_time_column_with_stats(Some(0), Some(1000));
let c5 = Arc::new(
TestChunk::new("chunk5")
.with_tag_column_with_stats("z", Some("c"), Some("d"))
.with_time_column_with_stats(Some(0), Some(1000)),
);
// Overlaps in z, but not in time
let c6 = TestChunk::new("chunk6")
.with_tag_column_with_stats("z", Some("a"), Some("b"))
.with_time_column_with_stats(Some(4000), Some(5000));
let c6 = Arc::new(
TestChunk::new("chunk6")
.with_tag_column_with_stats("z", Some("a"), Some("b"))
.with_time_column_with_stats(Some(4000), Some(5000)),
);
let groups =
group_potential_duplicates(vec![c1, c2, c3, c4, c5, c6]).expect("grouping succeeded");
@ -436,24 +451,32 @@ mod test {
// Even "time" column is stored in front of "url", the primary_key function
// invoked inside potential_overlap invoked by group_potential_duplicates
// will return "url", "time"
let c1 = TestChunk::new("chunk1")
.with_time_column_with_stats(Some(0), Some(1000))
.with_tag_column_with_stats("url", Some("boston"), Some("new york")); // "url" > "time"
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column_with_stats(Some(0), Some(1000))
.with_tag_column_with_stats("url", Some("boston"), Some("new york")),
); // "url" > "time"
// Overlaps in url, but not in time
let c2 = TestChunk::new("chunk2")
.with_tag_column_with_stats("url", Some("denver"), Some("zoo york"))
.with_time_column_with_stats(Some(2000), Some(3000));
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_tag_column_with_stats("url", Some("denver"), Some("zoo york"))
.with_time_column_with_stats(Some(2000), Some(3000)),
);
// Overlaps in time, but not in url
let c3 = TestChunk::new("chunk3")
.with_tag_column_with_stats("url", Some("zzx"), Some("zzy"))
.with_time_column_with_stats(Some(500), Some(1500));
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_tag_column_with_stats("url", Some("zzx"), Some("zzy"))
.with_time_column_with_stats(Some(500), Some(1500)),
);
// Overlaps in time, and in url
let c4 = TestChunk::new("chunk4")
.with_tag_column_with_stats("url", Some("aaa"), Some("zzz"))
.with_time_column_with_stats(Some(500), Some(1500));
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_tag_column_with_stats("url", Some("aaa"), Some("zzz"))
.with_time_column_with_stats(Some(500), Some(1500)),
);
let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded");
@ -468,10 +491,16 @@ mod test {
#[test]
fn boundary() {
// check that overlap calculations include the bound
let c1 =
TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"));
let c2 =
TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("bbb"), Some("ccc"));
let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats(
"tag1",
Some("aaa"),
Some("bbb"),
));
let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats(
"tag1",
Some("bbb"),
Some("ccc"),
));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -482,10 +511,16 @@ mod test {
#[test]
fn same() {
// check that if chunks overlap exactly on the boundaries they are still grouped
let c1 =
TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"));
let c2 =
TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"));
let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats(
"tag1",
Some("aaa"),
Some("bbb"),
));
let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats(
"tag1",
Some("aaa"),
Some("bbb"),
));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -496,10 +531,16 @@ mod test {
#[test]
fn different_tag_names() {
// check that if chunks overlap but in different tag names
let c1 =
TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"));
let c2 =
TestChunk::new("chunk2").with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb"));
let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats(
"tag1",
Some("aaa"),
Some("bbb"),
));
let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats(
"tag2",
Some("aaa"),
Some("bbb"),
));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -512,13 +553,17 @@ mod test {
#[test]
fn different_tag_names_multi_tags() {
// check that if chunks overlap but in different tag names
let c1 = TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb"));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")),
);
let c2 = TestChunk::new("chunk2")
.with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag3", Some("aaa"), Some("bbb"));
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag3", Some("aaa"), Some("bbb")),
);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -529,22 +574,28 @@ mod test {
#[test]
fn three_column() {
let c1 = TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
.with_time_column_with_stats(Some(0), Some(1000));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
.with_time_column_with_stats(Some(0), Some(1000)),
);
let c2 = TestChunk::new("chunk2")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
// Timestamp doesn't overlap, but the two tags do
.with_time_column_with_stats(Some(2001), Some(3000));
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
// Timestamp doesn't overlap, but the two tags do
.with_time_column_with_stats(Some(2001), Some(3000)),
);
let c3 = TestChunk::new("chunk3")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz"))
// all three overlap
.with_time_column_with_stats(Some(1000), Some(2000));
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz"))
// all three overlap
.with_time_column_with_stats(Some(1000), Some(2000)),
);
let groups = group_potential_duplicates(vec![c1, c2, c3]).expect("grouping succeeded");
@ -554,16 +605,20 @@ mod test {
#[test]
fn tag_order() {
let c1 = TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
.with_time_column_with_stats(Some(0), Some(1000));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
.with_time_column_with_stats(Some(0), Some(1000)),
);
let c2 = TestChunk::new("chunk2")
.with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz"))
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
// all three overlap, but tags in different order
.with_time_column_with_stats(Some(500), Some(1000));
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz"))
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
// all three overlap, but tags in different order
.with_time_column_with_stats(Some(500), Some(1000)),
);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -573,16 +628,20 @@ mod test {
#[test]
fn tag_order_no_tags() {
let c1 = TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
.with_time_column_with_stats(Some(0), Some(1000));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
.with_time_column_with_stats(Some(0), Some(1000)),
);
let c2 = TestChunk::new("chunk2")
// tag1 and timestamp overlap, but no tag2 (aka it is all null)
// so it could overlap if there was a null tag2 value in chunk1
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_time_column_with_stats(Some(500), Some(1000));
let c2 = Arc::new(
TestChunk::new("chunk2")
// tag1 and timestamp overlap, but no tag2 (aka it is all null)
// so it could overlap if there was a null tag2 value in chunk1
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_time_column_with_stats(Some(500), Some(1000)),
);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -592,17 +651,21 @@ mod test {
#[test]
fn tag_order_null_stats() {
let c1 = TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
.with_time_column_with_stats(Some(0), Some(1000));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
.with_time_column_with_stats(Some(0), Some(1000)),
);
let c2 = TestChunk::new("chunk2")
// tag1 and timestamp overlap, tag2 has no stats (is all null)
// so they might overlap if chunk1 had a null in tag 2
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", None, None)
.with_time_column_with_stats(Some(500), Some(1000));
let c2 = Arc::new(
TestChunk::new("chunk2")
// tag1 and timestamp overlap, tag2 has no stats (is all null)
// so they might overlap if chunk1 had a null in tag 2
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_tag_column_with_stats("tag2", None, None)
.with_time_column_with_stats(Some(500), Some(1000)),
);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -612,14 +675,18 @@ mod test {
#[test]
fn tag_order_partial_stats() {
let c1 = TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_time_column_with_stats(Some(0), Some(1000));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_time_column_with_stats(Some(0), Some(1000)),
);
let c2 = TestChunk::new("chunk2")
// tag1 has a min but not a max. Should result in error
.with_tag_column_with_stats("tag1", Some("aaa"), None)
.with_time_column_with_stats(Some(500), Some(1000));
let c2 = Arc::new(
TestChunk::new("chunk2")
// tag1 has a min but not a max. Should result in error
.with_tag_column_with_stats("tag1", Some("aaa"), None)
.with_time_column_with_stats(Some(500), Some(1000)),
);
let result = group_potential_duplicates(vec![c1, c2]).unwrap_err();
@ -636,17 +703,21 @@ mod test {
#[test]
fn tag_fields_not_counted() {
let c1 = TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_i64_field_column_with_stats("field", Some(0), Some(2))
.with_time_column_with_stats(Some(0), Some(1000));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_i64_field_column_with_stats("field", Some(0), Some(2))
.with_time_column_with_stats(Some(0), Some(1000)),
);
let c2 = TestChunk::new("chunk2")
// tag1 and timestamp overlap, but field value does not
// should still overlap
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_i64_field_column_with_stats("field", Some(100), Some(200))
.with_time_column_with_stats(Some(500), Some(1000));
let c2 = Arc::new(
TestChunk::new("chunk2")
// tag1 and timestamp overlap, but field value does not
// should still overlap
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_i64_field_column_with_stats("field", Some(100), Some(200))
.with_time_column_with_stats(Some(500), Some(1000)),
);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -659,16 +730,20 @@ mod test {
// When the same column has different types in different
// chunks, this will likely cause errors elsewhere in practice
// as the schemas are incompatible (and can't be merged)
let c1 = TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_time_column_with_stats(Some(0), Some(1000));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
.with_time_column_with_stats(Some(0), Some(1000)),
);
let c2 = TestChunk::new("chunk2")
// tag1 column is actually a field (a different type) in
// chunk 2, so since the timestamps overlap these chunks
// might also have duplicates (if tag1 was null in c1)
.with_i64_field_column_with_stats("tag1", Some(100), Some(200))
.with_time_column_with_stats(Some(0), Some(1000));
let c2 = Arc::new(
TestChunk::new("chunk2")
// tag1 column is actually a field (a different type) in
// chunk 2, so since the timestamps overlap these chunks
// might also have duplicates (if tag1 was null in c1)
.with_i64_field_column_with_stats("tag1", Some(100), Some(200))
.with_time_column_with_stats(Some(0), Some(1000)),
);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
@ -678,7 +753,7 @@ mod test {
// --- Test infrastructure --
fn to_string(groups: Vec<Vec<TestChunk>>) -> Vec<String> {
fn to_string(groups: Vec<Vec<Arc<dyn QueryChunk>>>) -> Vec<String> {
let mut s = vec![];
for (idx, group) in groups.iter().enumerate() {
let names = group.iter().map(|c| c.table_name()).collect::<Vec<_>>();
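The updated `to_string` helper compiles against trait objects because grouping only ever calls dyn-dispatched methods such as `table_name()`. A minimal self-contained sketch of that shape (the trait is a local stand-in for the real `QueryChunk`, and `group_names` is a hypothetical mirror of `to_string`):

use std::sync::Arc;

// Local stand-in; only the method the helper actually needs.
trait QueryChunk {
    fn table_name(&self) -> &str;
}

// Hypothetical helper mirroring `to_string` above: dyn dispatch is
// enough, since only table_name() is called on each chunk.
fn group_names(groups: &[Vec<Arc<dyn QueryChunk>>]) -> Vec<String> {
    groups
        .iter()
        .enumerate()
        .map(|(idx, group)| {
            let names: Vec<_> = group.iter().map(|c| c.table_name()).collect();
            format!("Group {}: [{}]", idx, names.join(", "))
        })
        .collect()
}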

View File

@ -25,12 +25,12 @@ use super::adapter::SchemaAdapterStream;
/// Implements the DataFusion physical plan interface
#[derive(Debug)]
pub(crate) struct IOxReadFilterNode<C: QueryChunk + 'static> {
pub(crate) struct IOxReadFilterNode {
table_name: Arc<str>,
/// The desired output schema (includes selection)
/// note that the chunk may not have all these columns.
iox_schema: Arc<Schema>,
chunks: Vec<Arc<C>>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: Predicate,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
@ -39,7 +39,7 @@ pub(crate) struct IOxReadFilterNode<C: QueryChunk + 'static> {
ctx: IOxExecutionContext,
}
impl<C: QueryChunk + 'static> IOxReadFilterNode<C> {
impl IOxReadFilterNode {
/// Create an execution plan node that reads data from `chunks`,
/// producing output according to `iox_schema` while applying
/// `predicate`
@ -47,7 +47,7 @@ impl<C: QueryChunk + 'static> IOxReadFilterNode<C> {
ctx: IOxExecutionContext,
table_name: Arc<str>,
iox_schema: Arc<Schema>,
chunks: Vec<Arc<C>>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: Predicate,
) -> Self {
Self {
@ -62,7 +62,7 @@ impl<C: QueryChunk + 'static> IOxReadFilterNode<C> {
}
#[async_trait]
impl<C: QueryChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
impl ExecutionPlan for IOxReadFilterNode {
fn as_any(&self) -> &dyn std::any::Any {
self
}
@ -91,7 +91,7 @@ impl<C: QueryChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
assert!(children.is_empty(), "no children expected in iox plan");
let chunks: Vec<Arc<C>> = self.chunks.to_vec();
let chunks: Vec<Arc<dyn QueryChunk>> = self.chunks.to_vec();
// For some reason when I used an automatically derived `Clone` implementation
// the compiler didn't recognize the trait implementation
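A sketch of why the manual clone above works without extra bounds: `Arc<dyn QueryChunk>` is `Clone` no matter what the underlying chunk type is, whereas a derived `Clone` on the old generic `IOxReadFilterNode<C>` would have demanded `C: Clone`. The trait here is a local stand-in:

use std::sync::Arc;

trait QueryChunk {} // stand-in for the real trait

// to_vec() clones only the Arc pointers, never the chunks themselves,
// so no Clone bound on the chunk type is needed.
fn duplicate(chunks: &[Arc<dyn QueryChunk>]) -> Vec<Arc<dyn QueryChunk>> {
    chunks.to_vec()
}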

View File

@ -16,14 +16,13 @@ use observability_deps::tracing::{debug, trace};
use predicate::Predicate;
use schema::Schema;
use crate::{group_by::Aggregate, QueryChunkMeta};
use crate::group_by::Aggregate;
use crate::QueryChunk;
/// Something that cares to be notified when pruning of chunks occurs
pub trait PruningObserver {
type Observed;
/// Called when the specified chunk was pruned from observation
fn was_pruned(&self, _chunk: &Self::Observed) {}
fn was_pruned(&self, _chunk: &dyn QueryChunk) {}
/// Called when no pruning can happen at all for some reason
fn could_not_prune(&self, _reason: &str) {}
@ -35,15 +34,14 @@ pub trait PruningObserver {
///
/// TODO(raphael): Perhaps this should return `Result<Vec<bool>>` instead of
/// the [`PruningObserver`] plumbing
pub fn prune_chunks<C, O>(
pub fn prune_chunks<O>(
observer: &O,
table_schema: Arc<Schema>,
chunks: Vec<Arc<C>>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: &Predicate,
) -> Vec<Arc<C>>
) -> Vec<Arc<dyn QueryChunk>>
where
C: QueryChunkMeta,
O: PruningObserver<Observed = C>,
O: PruningObserver,
{
let num_chunks = chunks.len();
trace!(num_chunks, %predicate, "Pruning chunks");
@ -104,14 +102,14 @@ where
pruned_chunks
}
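From the caller's side, the new non-generic signature reads as below; a hedged sketch in which `NoopObserver` and `prune_all` are invented for illustration, with `Schema`, `Predicate`, and the traits assumed in scope as in this file:

// Hypothetical observer that relies entirely on the trait's default
// method bodies (was_pruned / could_not_prune are no-ops).
struct NoopObserver;
impl PruningObserver for NoopObserver {}

// Chunks enter and leave as trait objects; no chunk type parameter is
// threaded through the pruning call anymore.
fn prune_all(
    table_schema: Arc<Schema>,
    chunks: Vec<Arc<dyn QueryChunk>>,
    predicate: &Predicate,
) -> Vec<Arc<dyn QueryChunk>> {
    prune_chunks(&NoopObserver, table_schema, chunks, predicate)
}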
/// Wraps a collection of [`QueryChunkMeta`] and implements the [`PruningStatistics`]
/// Wraps a collection of [`QueryChunk`] and implements the [`PruningStatistics`]
/// interface required by [`PruningPredicate`]
struct ChunkPruningStatistics<'a, C> {
struct ChunkPruningStatistics<'a> {
table_schema: &'a Schema,
chunks: &'a [Arc<C>],
chunks: &'a [Arc<dyn QueryChunk>],
}
impl<'a, C: QueryChunkMeta> ChunkPruningStatistics<'a, C> {
impl<'a> ChunkPruningStatistics<'a> {
/// Returns the [`DataType`] for `column`
fn column_type(&self, column: &Column) -> Option<&DataType> {
let index = self.table_schema.find_index_of(&column.name)?;
@ -130,10 +128,7 @@ impl<'a, C: QueryChunkMeta> ChunkPruningStatistics<'a, C> {
}
}
impl<'a, C> PruningStatistics for ChunkPruningStatistics<'a, C>
where
C: QueryChunkMeta,
{
impl<'a> PruningStatistics for ChunkPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let data_type = self.column_type(column)?;
let summaries = self.column_summaries(column);
@ -232,7 +227,7 @@ mod test {
use predicate::PredicateBuilder;
use schema::merge::SchemaMerger;
use crate::{test::TestChunk, QueryChunk};
use crate::{test::TestChunk, QueryChunk, QueryChunkMeta};
use super::*;
@ -478,7 +473,7 @@ mod test {
assert_eq!(names(&pruned), vec!["chunk1"]);
}
fn merge_schema(chunks: &[Arc<TestChunk>]) -> Arc<Schema> {
fn merge_schema(chunks: &[Arc<dyn QueryChunk>]) -> Arc<Schema> {
let mut merger = SchemaMerger::new();
for chunk in chunks {
merger = merger.merge(chunk.schema().as_ref()).unwrap();
@ -500,19 +495,20 @@ mod test {
"column1",
None,
Some(10),
));
)) as Arc<dyn QueryChunk>;
let c2 = Arc::new(TestChunk::new("chunk2").with_i64_field_column_with_stats(
"column1",
Some(0),
None,
));
)) as Arc<dyn QueryChunk>;
let c3 = Arc::new(
TestChunk::new("chunk3").with_i64_field_column_with_stats("column1", None, None),
);
) as Arc<dyn QueryChunk>;
let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1"));
let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1"))
as Arc<dyn QueryChunk>;
let predicate = PredicateBuilder::new()
.add_expr(col("column1").gt(lit(100)))
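The explicit `as Arc<dyn QueryChunk>` casts above are load-bearing, as this sketch shows: without the unsizing cast each binding keeps its concrete `Arc<TestChunk>` type, and a later `vec!` cannot unify the elements into one `Vec<Arc<dyn QueryChunk>>`:

// Sketch: each cast unsizes Arc<TestChunk> into the trait object, so
// the vec! below has a single element type and type-checks.
let c1 = Arc::new(TestChunk::new("chunk1")) as Arc<dyn QueryChunk>;
let c2 = Arc::new(TestChunk::new("chunk2")) as Arc<dyn QueryChunk>;
let chunks: Vec<Arc<dyn QueryChunk>> = vec![c1, c2];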
@ -543,35 +539,35 @@ mod test {
"column1",
Some(0),
Some(10),
));
)) as Arc<dyn QueryChunk>;
let c2 = Arc::new(TestChunk::new("chunk2").with_i64_field_column_with_stats(
"column1",
Some(0),
Some(1000),
));
)) as Arc<dyn QueryChunk>;
let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats(
"column1",
Some(10),
Some(20),
));
)) as Arc<dyn QueryChunk>;
let c4 = Arc::new(
TestChunk::new("chunk4").with_i64_field_column_with_stats("column1", None, None),
);
) as Arc<dyn QueryChunk>;
let c5 = Arc::new(TestChunk::new("chunk5").with_i64_field_column_with_stats(
"column1",
Some(10),
None,
));
)) as Arc<dyn QueryChunk>;
let c6 = Arc::new(TestChunk::new("chunk6").with_i64_field_column_with_stats(
"column1",
None,
Some(20),
));
)) as Arc<dyn QueryChunk>;
let predicate = PredicateBuilder::new()
.add_expr(col("column1").gt(lit(100)))
@ -601,19 +597,19 @@ mod test {
TestChunk::new("chunk1")
.with_i64_field_column_with_stats("column1", Some(0), Some(100))
.with_i64_field_column_with_stats("column2", Some(0), Some(4)),
);
) as Arc<dyn QueryChunk>;
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_i64_field_column_with_stats("column1", Some(0), Some(1000))
.with_i64_field_column_with_stats("column2", Some(0), Some(4)),
);
) as Arc<dyn QueryChunk>;
let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats(
"column2",
Some(0),
Some(4),
));
)) as Arc<dyn QueryChunk>;
let predicate = PredicateBuilder::new()
.add_expr(col("column1").gt(lit(100)))
@ -645,7 +641,7 @@ mod test {
None,
0,
),
);
) as Arc<dyn QueryChunk>;
// Has no nulls, can prune it out based on statistics alone
let c2 = Arc::new(
@ -657,7 +653,7 @@ mod test {
None,
0,
),
);
) as Arc<dyn QueryChunk>;
// Has nulls, can still prune it out based on statistics alone
let c3 = Arc::new(
@ -669,7 +665,7 @@ mod test {
None,
1, // that one pesky null!
),
);
) as Arc<dyn QueryChunk>;
let predicate = PredicateBuilder::new()
.add_expr(
@ -705,37 +701,37 @@ mod test {
TestChunk::new("chunk1")
.with_i64_field_column_with_stats("column1", Some(0), Some(1000))
.with_i64_field_column_with_stats("column2", Some(0), Some(4)),
);
) as Arc<dyn QueryChunk>;
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_i64_field_column_with_stats("column1", Some(0), Some(10))
.with_i64_field_column_with_stats("column2", Some(0), Some(4)),
);
) as Arc<dyn QueryChunk>;
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_i64_field_column_with_stats("column1", Some(0), Some(10))
.with_i64_field_column_with_stats("column2", Some(5), Some(10)),
);
) as Arc<dyn QueryChunk>;
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_i64_field_column_with_stats("column1", Some(1000), Some(2000))
.with_i64_field_column_with_stats("column2", Some(0), Some(4)),
);
) as Arc<dyn QueryChunk>;
let c5 = Arc::new(
TestChunk::new("chunk5")
.with_i64_field_column_with_stats("column1", Some(0), Some(10))
.with_i64_field_column_no_stats("column2"),
);
) as Arc<dyn QueryChunk>;
let c6 = Arc::new(
TestChunk::new("chunk6")
.with_i64_field_column_no_stats("column1")
.with_i64_field_column_with_stats("column2", Some(0), Some(4)),
);
) as Arc<dyn QueryChunk>;
let predicate = PredicateBuilder::new()
.add_expr(col("column1").gt(lit(100)).and(col("column2").lt(lit(5))))
@ -753,7 +749,7 @@ mod test {
assert_eq!(names(&pruned), vec!["chunk1", "chunk4", "chunk6"]);
}
fn names(pruned: &[Arc<TestChunk>]) -> Vec<&str> {
fn names(pruned: &[Arc<dyn QueryChunk>]) -> Vec<&str> {
pruned.iter().map(|p| p.table_name()).collect()
}
@ -773,10 +769,10 @@ mod test {
}
impl PruningObserver for TestObserver {
type Observed = TestChunk;
fn was_pruned(&self, chunk: &Self::Observed) {
self.events.borrow_mut().push(format!("{}: Pruned", chunk))
fn was_pruned(&self, chunk: &dyn QueryChunk) {
self.events
.borrow_mut()
.push(format!("{}: Pruned", chunk.table_name()))
}
fn could_not_prune(&self, reason: &str) {

View File

@ -102,9 +102,7 @@ impl TestDatabase {
#[async_trait]
impl QueryDatabase for TestDatabase {
type Chunk = TestChunk;
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<dyn QueryChunk>> {
// save last predicate
*self.chunks_predicate.lock() = predicate.clone();
@ -113,7 +111,7 @@ impl QueryDatabase for TestDatabase {
.values()
.flat_map(|x| x.values())
.filter(|x| x.table_name == table_name)
.cloned()
.map(|x| Arc::clone(x) as _)
.collect()
}
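The `Arc::clone(x) as _` above is an unsized coercion; the same move spelled out, as a sketch with the types annotated:

// Arc::clone copies the pointer and bumps the refcount; the cast then
// unsizes Arc<TestChunk> to the Arc<dyn QueryChunk> that the new
// QueryDatabase::chunks signature returns. `as _` lets the compiler
// infer the same target type.
let concrete: Arc<TestChunk> = Arc::new(TestChunk::new("t1"));
let erased: Arc<dyn QueryChunk> = Arc::clone(&concrete) as Arc<dyn QueryChunk>;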
@ -975,7 +973,7 @@ impl QueryChunkMeta for TestChunk {
}
/// Return the raw data from the list of chunks
pub async fn raw_data(chunks: &[Arc<TestChunk>]) -> Vec<RecordBatch> {
pub async fn raw_data(chunks: &[Arc<dyn QueryChunk>]) -> Vec<RecordBatch> {
let mut batches = vec![];
for c in chunks {
let pred = Predicate::default();

View File

@ -1,13 +1,12 @@
use std::{any::Any, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder};
use datafusion::catalog::catalog::CatalogProvider;
use db::Db;
use predicate::rpc_predicate::QueryDatabaseMeta;
use query::{
exec::{ExecutionContextProvider, IOxExecutionContext},
QueryChunk, QueryChunkError, QueryChunkMeta, QueryDatabase,
QueryChunk, QueryDatabase,
};
use self::sealed::AbstractDbInterface;
@ -53,13 +52,11 @@ impl CatalogProvider for AbstractDb {
#[async_trait]
impl QueryDatabase for AbstractDb {
type Chunk = AbstractChunk;
async fn chunks(
&self,
table_name: &str,
predicate: &predicate::Predicate,
) -> Vec<Arc<Self::Chunk>> {
) -> Vec<Arc<dyn QueryChunk>> {
self.0.chunks(table_name, predicate).await
}
@ -83,87 +80,6 @@ impl QueryDatabaseMeta for AbstractDb {
}
}
#[derive(Debug)]
pub struct AbstractChunk(Arc<dyn QueryChunk>);
impl QueryChunk for AbstractChunk {
fn id(&self) -> ChunkId {
self.0.id()
}
fn addr(&self) -> ChunkAddr {
self.0.addr()
}
fn table_name(&self) -> &str {
self.0.table_name()
}
fn may_contain_pk_duplicates(&self) -> bool {
self.0.may_contain_pk_duplicates()
}
fn apply_predicate_to_metadata(
&self,
predicate: &predicate::Predicate,
) -> Result<predicate::PredicateMatch, QueryChunkError> {
self.0.apply_predicate_to_metadata(predicate)
}
fn column_names(
&self,
ctx: IOxExecutionContext,
predicate: &predicate::Predicate,
columns: schema::selection::Selection<'_>,
) -> Result<Option<query::exec::stringset::StringSet>, QueryChunkError> {
self.0.column_names(ctx, predicate, columns)
}
fn column_values(
&self,
ctx: IOxExecutionContext,
column_name: &str,
predicate: &predicate::Predicate,
) -> Result<Option<query::exec::stringset::StringSet>, QueryChunkError> {
self.0.column_values(ctx, column_name, predicate)
}
fn read_filter(
&self,
ctx: IOxExecutionContext,
predicate: &predicate::Predicate,
selection: schema::selection::Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, QueryChunkError> {
self.0.read_filter(ctx, predicate, selection)
}
fn chunk_type(&self) -> &str {
self.0.chunk_type()
}
fn order(&self) -> ChunkOrder {
self.0.order()
}
}
impl QueryChunkMeta for AbstractChunk {
fn summary(&self) -> Option<&data_types::partition_metadata::TableSummary> {
self.0.summary()
}
fn schema(&self) -> Arc<schema::Schema> {
self.0.schema()
}
fn sort_key(&self) -> Option<&schema::sort::SortKey> {
self.0.sort_key()
}
fn delete_predicates(&self) -> &[Arc<data_types::delete_predicate::DeletePredicate>] {
self.0.delete_predicates()
}
}
mod sealed {
use super::*;
@ -182,7 +98,7 @@ mod sealed {
&self,
table_name: &str,
predicate: &predicate::Predicate,
) -> Vec<Arc<AbstractChunk>>;
) -> Vec<Arc<dyn QueryChunk>>;
fn record_query(
&self,
@ -218,13 +134,8 @@ impl AbstractDbInterface for OldDb {
&self,
table_name: &str,
predicate: &predicate::Predicate,
) -> Vec<Arc<AbstractChunk>> {
self.0
.chunks(table_name, predicate)
.await
.into_iter()
.map(|c| Arc::new(AbstractChunk(c as _)))
.collect()
) -> Vec<Arc<dyn QueryChunk>> {
self.0.chunks(table_name, predicate).await
}
fn record_query(
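The deleted `AbstractChunk` above was an instance of the forwarding-newtype pattern; a minimal sketch (with a local stand-in trait) of that pattern and why dyn-dispatched APIs make it redundant:

use std::sync::Arc;

trait QueryChunk {
    fn table_name(&self) -> &str;
}

// The removed wrapper followed this shape: hold a trait object and
// forward every method by hand.
struct Wrapper(Arc<dyn QueryChunk>);

impl QueryChunk for Wrapper {
    fn table_name(&self) -> &str {
        self.0.table_name() // pure forwarding, repeated per method
    }
}

// Once callers accept Arc<dyn QueryChunk> directly, the wrapper layer
// and all of its forwarding can simply be deleted.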

View File

@ -1,7 +1,7 @@
//! Tests for the table_names implementation
use arrow::datatypes::DataType;
use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
use query::QueryDatabase;
use schema::selection::Selection;
use schema::{builder::SchemaBuilder, sort::SortKey, Schema, TIME_COLUMN_NAME};

View File

@ -12,7 +12,7 @@ use db::{
Db,
};
use futures::TryStreamExt;
use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
use query::{QueryChunk, QueryDatabase};
use server::{
rules::ProvidedDatabaseRules,
test_utils::{make_application, make_initialized_server},
@ -143,7 +143,8 @@ async fn delete_predicate_preservation() {
async move {
for chunk in db.chunks(table_name, &Default::default()).await {
let partition_key = chunk.addr().partition_key.as_ref();
let addr = chunk.addr();
let partition_key = addr.partition_key.as_ref();
if partition_key == "part_b" {
// Strictly speaking not required because the chunk was persisted AFTER the delete predicate was
// registered, so we can get away with materializing it during persistence.
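The two-step binding above works around a borrow-of-a-temporary error; a sketch of the failure mode, assuming `addr()` returns an owned `ChunkAddr`:

// Would not compile: chunk.addr() creates a temporary that is dropped
// at the end of the statement, leaving partition_key dangling (E0716).
//
//     let partition_key = chunk.addr().partition_key.as_ref();
//
// Binding the owned value first keeps it alive for the borrow:
let addr = chunk.addr();
let partition_key: &str = addr.partition_key.as_ref();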