feat: continue building and using sort_key if available
parent 5418a1fe6b
commit 8fd0df04f2

@@ -147,13 +147,14 @@ impl<'a> SortKey<'a> {
self.columns.is_empty()
}
/// Returns sort keys of the given columns
pub fn sort_columns(&self, select_keys: Vec<&str>) -> Vec<(&str, SortOptions)> {
let keys: Vec<(&str, SortOptions)> = self
/// Returns sort keys of the given columns
pub fn selected_sort_key(&self, select_keys: Vec<&str>) -> SortKey<'a> {
//Vec<(&str, SortOptions)> {
let keys: IndexMap<&'a str, SortOptions> = self
.columns
.iter()
.filter_map(|(col, options)| {
if select_keys.iter().any(|key | key == col) {
if select_keys.iter().any(|key| key == col) {
Some((*col, *options))
} else {
None
@@ -161,7 +162,7 @@ impl<'a> SortKey<'a> {
})
.collect();
keys
SortKey { columns: keys }
}
/// Returns merge key of the 2 given keys if one covers the other. Returns None otherwise.
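As a reading aid (not part of the commit): a minimal sketch of how the renamed `selected_sort_key` might be used, assuming the `SortKey` API visible in this diff (`with_capacity`, `with_col`, `len`). The example itself is hypothetical.

```rust
use internal_types::schema::{sort::SortKey, TIME_COLUMN_NAME};

// Hypothetical illustration: build a "super" sort key, then select the subset
// of it that a particular chunk's primary-key columns actually cover.
fn selected_sort_key_example() {
    let mut super_key = SortKey::with_capacity(3);
    super_key.with_col("tag1");
    super_key.with_col("tag2");
    super_key.with_col(TIME_COLUMN_NAME);

    // A chunk with only tag1 and time gets those two columns, in the same
    // relative order as the super key (the IndexMap keeps insertion order).
    let chunk_key = super_key.selected_sort_key(vec!["tag1", TIME_COLUMN_NAME]);
    assert_eq!(chunk_key.len(), 2);
}
```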
@@ -80,7 +80,7 @@ impl ReorgPlanner {
// Set the sort_key of the schema to the compacted chunk's sort key
// Try to do this only if the sort key changes so we avoid unnecessary schema copies.
trace!(input_schema=?schema, "Setting sort key on schema");
trace!(input_schema=?schema, "Setting sort key on schema for compact plan");
if schema
.sort_key()
.map_or(true, |existing_key| existing_key != output_sort)
@@ -89,7 +89,7 @@ impl ReorgPlanner {
schema_cloned.set_sort_key(&output_sort);
schema = Arc::new(schema_cloned);
}
trace!(output_schema=?schema, "Setting sort key on schema");
trace!(output_schema=?schema, "Setting sort key on schema for compact plan");
let plan = plan_builder.build().context(BuildingPlan)?;
@@ -161,9 +161,9 @@ impl ReorgPlanner {
let mut schema = provider.iox_schema();
// Set output_sort as the sort_key of the schema
// Set output_sort as the sort_key of the schema
// Try to do this only if the sort key changes so we avoid unnecessary schema copies.
trace!(input_schema=?schema, "Setting sort key on schema");
trace!(input_schema=?schema, "Setting sort key on schema for split plan");
if schema
.sort_key()
.map_or(true, |existing_key| existing_key != output_sort)
@@ -172,8 +172,7 @@ impl ReorgPlanner {
schema_cloned.set_sort_key(&output_sort);
schema = Arc::new(schema_cloned);
}
trace!(output_schema=?schema, "Setting sort key on schema");
trace!(output_schema=?schema, "Setting sort key on schema for split plan");
// time <= split_time
let ts_literal = Expr::Literal(ScalarValue::TimestampNanosecond(Some(split_time)));
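The compact and split hunks above share one idiom: only clone the `Arc`'d schema when the desired sort key is missing or different. A standalone sketch of that pattern, with names taken from the diff but the helper function itself hypothetical:

```rust
use std::sync::Arc;

use internal_types::schema::{sort::SortKey, Schema};

// Hypothetical helper: return a schema that carries `output_sort`, copying the
// underlying Schema only when its current sort key is absent or different.
fn with_sort_key(mut schema: Arc<Schema>, output_sort: &SortKey<'_>) -> Arc<Schema> {
    let needs_update = schema
        .sort_key()
        .map_or(true, |existing_key| existing_key != *output_sort);

    if needs_update {
        let mut schema_cloned = schema.as_ref().clone();
        schema_cloned.set_sort_key(output_sort);
        schema = Arc::new(schema_cloned);
    }
    schema
}
```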
@@ -8,14 +8,21 @@
)]
use async_trait::async_trait;
use data_types::{chunk_metadata::ChunkSummary, partition_metadata::{InfluxDbType, TableSummary}};
use data_types::{
chunk_metadata::ChunkSummary,
partition_metadata::{InfluxDbType, TableSummary},
};
use datafusion::physical_plan::SendableRecordBatchStream;
use exec::{stringset::StringSet, Executor};
use internal_types::{schema::{Schema, TIME_COLUMN_NAME, sort::SortKey}, selection::Selection};
use internal_types::{
schema::{sort::SortKey, Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use observability_deps::tracing::trace;
use predicate::PredicateMatch;
use std::{fmt::Debug, sync::Arc};
use hashbrown::HashMap;
use std::{fmt::Debug, sync::Arc};
pub mod exec;
pub mod frontend;
@@ -139,6 +146,10 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
/// Returns the sort key of the chunk if any
fn sort_key(&self) -> Option<SortKey<'_>>;
//fn sort_key(&self) -> Option<SortKey>;
/// Sets sort key for the schema of this chunk
fn set_sort_key(&mut self, sort_key: &SortKey<'_>);
}
#[async_trait]
@@ -192,12 +203,20 @@ pub fn compute_sort_key<'a>(summaries: impl Iterator<Item = &'a TableSummary>) -
continue;
}
let mut cnt = 0;
if let Some(count) = column.stats.distinct_count() {
*cardinalities.entry(column.name.as_str()).or_default() += count.get()
cnt = count.get();
}
*cardinalities.entry(column.name.as_str()).or_default() += cnt;
}
}
trace!(cardinalities=?cardinalities, "cardinalities of of columns to compute sort key");
print!(
"cardinalities of of columns to compute sort key: {:#?}",
cardinalities
);
let mut cardinalities: Vec<_> = cardinalities.into_iter().collect();
cardinalities.sort_by_key(|x| x.1);
@@ -206,10 +225,13 @@ pub fn compute_sort_key<'a>(summaries: impl Iterator<Item = &'a TableSummary>) -
key.push(col, Default::default())
}
key.push(TIME_COLUMN_NAME, Default::default());
trace!(computed_sort_key=?key, "Value of sort key from compute_sort_key");
println!("Value of sort key from compute_sort_key: {:#?}", key);
key
}
// Note: I would like to compile this module only in the 'test' cfg,
// but when I do so then other modules can not find them. For example:
//
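For orientation, the cardinality logic in `compute_sort_key` can be summarized in a few lines. This is a simplified, hypothetical restatement (the real function walks `TableSummary` column stats), assuming the `SortKey::with_capacity` / `push` API shown above:

```rust
use internal_types::schema::{sort::SortKey, TIME_COLUMN_NAME};

// Simplified sketch: given per-column distinct counts already summed across
// chunks, order columns from lowest to highest cardinality and put the time
// column last, so low-cardinality tags lead the sort key.
fn sort_key_from_cardinalities<'a>(mut cardinalities: Vec<(&'a str, u64)>) -> SortKey<'a> {
    cardinalities.sort_by_key(|x| x.1);

    let mut key = SortKey::with_capacity(cardinalities.len() + 1);
    for (col, _distinct_count) in cardinalities {
        key.push(col, Default::default());
    }
    key.push(TIME_COLUMN_NAME, Default::default());
    key
}
```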
@@ -2,7 +2,7 @@
use std::sync::Arc;
use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
use datafusion::{
datasource::{
datasource::{Statistics, TableProviderFilterPushDown},
@@ -19,10 +19,15 @@ use datafusion::{
ExecutionPlan,
},
};
use internal_types::schema::{Schema, merge::SchemaMerger, sort::SortKey};
use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema};
use observability_deps::tracing::{debug, trace};
use crate::{QueryChunk, compute_sort_key, predicate::{Predicate, PredicateBuilder}, util::arrow_pk_sort_exprs};
use crate::{
compute_sort_key,
predicate::{Predicate, PredicateBuilder},
util::arrow_sort_key_exprs,
QueryChunk,
};
use snafu::{ResultExt, Snafu};
@@ -475,6 +480,8 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
// Compute the output sort key which is the super key of all the key of the chunk base on their data cardinality
let output_sort_key = compute_sort_key(chunks.iter().map(|x| x.summary()));
trace!(output_sort_key=?output_sort_key, "Computed the sort key for many chunks in build_deduplicate_plan_for_overlapped_chunks");
println!("Computed the sort key for many chunks in build_deduplicate_plan_for_overlapped_chunks: {:#?}", output_sort_key);
trace!(
?output_schema,
@@ -495,7 +502,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
Arc::clone(&input_schema),
Arc::clone(&chunk),
predicate.clone(),
&output_sort_key
&output_sort_key,
)
})
.collect();
@@ -506,7 +513,8 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
let plan = UnionExec::new(sorted_chunk_plans?);
// Now (sort) merge the already sorted chunks
let sort_exprs = arrow_pk_sort_exprs(pk_schema.primary_key(), &plan.schema());
let sort_exprs = arrow_sort_key_exprs(output_sort_key, &plan.schema());
let plan = Arc::new(SortPreservingMergeExec::new(
sort_exprs.clone(),
Arc::new(plan),
@@ -563,13 +571,20 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
Arc::clone(&input_schema),
Arc::clone(&chunk),
predicate,
&output_sort_key
&output_sort_key,
)?;
// Add DeduplicateExc
// Sort exprs for the deduplication
// todo (see todo below)
let sort_exprs = arrow_pk_sort_exprs(pk_schema.primary_key(), &plan.schema());
let sort_exprs = arrow_sort_key_exprs(output_sort_key, &plan.schema());
trace!(Sort_Exprs=?sort_exprs, "Sort Expression for the sort operator of chunk {}", chunk.id());
println!(
"Sort Expression for the sort operator of chunk {}, Sort Exprs: {:#?}",
chunk.id(),
sort_exprs
);
let plan = Self::add_deduplicate_node(sort_exprs, plan);
// select back to the requested output schema
@@ -642,7 +657,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
output_schema: Arc<Schema>,
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate,
super_sort_key: &SortKey<'_>
super_sort_key: &SortKey<'_>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Create the bottom node IOxReadFilterNode for this chunk
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
@@ -661,54 +676,63 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
fn build_sort_plan(
chunk: Arc<C>,
input: Arc<dyn ExecutionPlan>,
super_sort_key: &SortKey<'_>
super_sort_key: &SortKey<'_>,
) -> Result<Arc<dyn ExecutionPlan>> {
// super_sort_key cannot be empty
if super_sort_key.is_empty() {
panic!("Super sort key is empty");
}
trace!(super_sort_key=?super_sort_key, "Super sort key input to build_sort_plan");
println!(
"Super sort key input to build_sort_plan: {:#?}",
super_sort_key
);
// Check to see if the plan is sorted on the subset of the super_sort_key
let sort_key = chunk.sort_key();
match sort_key {
Some(chunk_sort_key) => {
match SortKey::try_merge_key(super_sort_key, &chunk_sort_key) {
Some(merge_key) => {
if merge_key == *super_sort_key {
// the chunk is already sorted on the subset of the super_sort_key,
// no need to resort it
return Ok(input);
}
},
_ => {}
let sort_key = chunk.sort_key();
if let Some(chunk_sort_key) = sort_key {
if let Some(merge_key) = SortKey::try_merge_key(super_sort_key, &chunk_sort_key) {
if merge_key == *super_sort_key {
// the chunk is already sorted on the subset of the super_sort_key,
// no need to resort it
trace!(ChunkID=?chunk.id(), "Chunk is sorted and no need the sort operator");
println!(
"Chunk is sorted and no need the sort operator, ChunkID={:#?}",
chunk.id()
);
return Ok(input);
}
},
_ => {}
}
}
// Build the chunk's sort key that is a subset of the super_sort_key
//
//
// First get the chunk pk columns
let schema = chunk.schema();
let key_columns = schema.primary_key();
trace!(pk_columns=?key_columns, "PK columns of the chunk that have not been sorted yet");
println!(
"PK columns of the chunk that have not been sorted yet, pk_columns={:#?}",
key_columns
);
// Now get the key subset of the super key that includes the chunk pk columns
let chunk_sort_keys = super_sort_key.sort_columns(key_columns);
let chunk_sort_key = super_sort_key.selected_sort_key(key_columns);
// Build arrow sort expression for the chunk sort key
let input_schema = input.schema();
let mut sort_exprs = vec![];
for (key, options) in chunk_sort_keys {
let expr = physical_col(key, &input_schema).expect("pk in schema");
sort_exprs.push(PhysicalSortExpr {
expr,
options: SortOptions {
descending: options.descending,
nulls_first: options.nulls_first
}
});
}
//let expr = physical_col(key.as_str(), &input_schema).expect("pk in schema");
let sort_exprs = arrow_sort_key_exprs(chunk_sort_key, &input_schema);
trace!(Sort_Exprs=?sort_exprs, "Sort Expression for the sort operator of chunk {}", chunk.id());
println!(
"Sort Expression for the sort operator of chunk {}, Sort Exprs: {:#?}",
chunk.id(),
sort_exprs
);
// The chunk must be sorted after this, set sort key for it
//chunk.set_sort_key(&chunk_sort_key);
// Create SortExec operator
Ok(Arc::new(
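The refactored `build_sort_plan` above boils down to one decision: skip the extra sort node when the chunk's own sort key already covers the super sort key. A hypothetical, self-contained restatement of just that predicate (assuming `try_merge_key` returns the covering key when one key covers the other, as the doc comment in the first hunk describes):

```rust
use internal_types::schema::sort::SortKey;

// Hypothetical helper mirroring the check in build_sort_plan: a chunk only
// needs an explicit sort node if it has no recorded sort key, or its key does
// not merge back into the super sort key.
fn chunk_needs_resort<'a>(
    super_sort_key: &SortKey<'a>,
    chunk_sort_key: Option<&SortKey<'a>>,
) -> bool {
    match chunk_sort_key {
        Some(chunk_key) => match SortKey::try_merge_key(super_sort_key, chunk_key) {
            // Already sorted on a compatible subset of the super key: no resort.
            Some(merge_key) => merge_key != *super_sort_key,
            None => true,
        },
        None => true,
    }
}
```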
@@ -802,7 +826,7 @@ mod test {
use arrow::datatypes::DataType;
use arrow_util::assert_batches_eq;
use datafusion::physical_plan::collect;
use internal_types::schema::{TIME_COLUMN_NAME, builder::SchemaBuilder};
use internal_types::schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
use crate::{
test::{raw_data, TestChunk},
@@ -861,6 +885,8 @@ mod test {
#[tokio::test]
async fn sort_planning_one_tag_with_time() {
test_helpers::maybe_start_logging();
// Chunk 1 with 5 rows of data
let chunk = Arc::new(
TestChunk::new("t")
@@ -870,7 +896,7 @@ mod test {
.with_five_rows_of_data(),
);
let sort_key = SortKey::with_capacity(2);
let mut sort_key = SortKey::with_capacity(2);
sort_key.with_col("tag1");
sort_key.with_col(TIME_COLUMN_NAME);
@@ -919,6 +945,8 @@ mod test {
#[tokio::test]
async fn sort_planning_two_tags_with_time() {
test_helpers::maybe_start_logging();
// Chunk 1 with 5 rows of data
let chunk = Arc::new(
TestChunk::new("t")
@@ -929,9 +957,10 @@ mod test {
.with_five_rows_of_data(),
);
let sort_key = SortKey::with_capacity(3);
let mut sort_key = SortKey::with_capacity(3);
sort_key.with_col("tag1");
sort_key.with_col("tag2");
sort_key.with_col("tag3");
sort_key.with_col(TIME_COLUMN_NAME);
// Datafusion schema of the chunk
@@ -979,6 +1008,8 @@ mod test {
#[tokio::test]
async fn sort_read_filter_plan_for_two_tags_with_time() {
test_helpers::maybe_start_logging();
// Chunk 1 with 5 rows of data
let chunk = Arc::new(
TestChunk::new("t")
@@ -989,7 +1020,7 @@ mod test {
.with_five_rows_of_data(),
);
let sort_key = SortKey::with_capacity(3);
let mut sort_key = SortKey::with_capacity(3);
sort_key.with_col("tag1");
sort_key.with_col("tag2");
sort_key.with_col(TIME_COLUMN_NAME);
@@ -1002,7 +1033,7 @@ mod test {
schema,
Arc::clone(&chunk),
Predicate::default(),
&sort_key
&sort_key,
);
let batch = collect(sort_plan.unwrap()).await.unwrap();
// data is not sorted on primary key(tag1, tag2, time)
@@ -1022,6 +1053,8 @@ mod test {
#[tokio::test]
async fn deduplicate_plan_for_overlapped_chunks() {
test_helpers::maybe_start_logging();
// Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new(
TestChunk::new("t")
@@ -1091,6 +1124,8 @@ mod test {
#[tokio::test]
async fn deduplicate_plan_for_overlapped_chunks_subset() {
test_helpers::maybe_start_logging();
// Same two chunks but only select the field and timestamp, not the tag values
// Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new(
@@ -1141,6 +1176,8 @@ mod test {
.build()
.unwrap();
// Since no stats provided, the sort key will be computed as if all column has the same cardinality
// which is tag2, tag1, time
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
Arc::from("t"),
Arc::new(schema),
@@ -1148,16 +1185,16 @@ mod test {
Predicate::default(),
);
let batch = collect(sort_plan.unwrap()).await.unwrap();
// expect onlt only 5 values, with "f1" and "timestamp" (even though input has 10)
// expect only 5 values, with "f1" and "timestamp" (even though input has 10)
let expected = vec![
"+-----------+-------------------------------+",
"| field_int | time |",
"+-----------+-------------------------------+",
"| 100 | 1970-01-01 00:00:00.000000050 |",
"| 70 | 1970-01-01 00:00:00.000000100 |",
"| 5 | 1970-01-01 00:00:00.000005 |",
"| 10 | 1970-01-01 00:00:00.000007 |",
"| 70 | 1970-01-01 00:00:00.000000100 |",
"| 1000 | 1970-01-01 00:00:00.000001 |",
"| 100 | 1970-01-01 00:00:00.000000050 |",
"+-----------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
@@ -1165,6 +1202,8 @@ mod test {
#[tokio::test]
async fn deduplicate_plan_for_overlapped_chunks_subset_different_fields() {
test_helpers::maybe_start_logging();
// Chunks with different fields / tags, and select a subset
// Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new(
@@ -1229,6 +1268,8 @@ mod test {
.build()
.unwrap();
// Since no stats provided, the sort key will be computed as all pk columns has the same cardinality
// which is tag2, tag1, time
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
Arc::from("t"),
Arc::new(schema),
@@ -1241,16 +1282,16 @@ mod test {
"+-----------+-----------------+",
"| field_int | other_field_int |",
"+-----------+-----------------+",
"| 100 | |",
"| | 100 |",
"| 70 | |",
"| | 70 |",
"| 5 | |",
"| 10 | |",
"| 1000 | |",
"| | 1000 |",
"| | 5 |",
"| | 10 |",
"| 5 | |",
"| 10 | |",
"| 70 | |",
"| 1000 | |",
"| 100 | |",
"+-----------+-----------------+",
];
assert_batches_eq!(&expected, &batch);
@@ -1258,6 +1299,8 @@ mod test {
#[tokio::test]
async fn deduplicate_plan_for_overlapped_chunks_with_different_schemas() {
test_helpers::maybe_start_logging();
// Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new(
TestChunk::new("t")
@@ -1333,26 +1376,27 @@ mod test {
Predicate::default(),
);
let batch = collect(sort_plan.unwrap()).await.unwrap();
// data is sorted on primary key(tag1, tag2, tag3, time)
// todo: set stats to have deterministic sort key
// since no stat, data is sorted on any of combination of this pk cols(tag1, tag2, tag3, time)
let expected = vec![
"+-----------+------------+------+------+------+-------------------------------+",
"| field_int | field_int2 | tag1 | tag2 | tag3 | time |",
"+-----------+------------+------+------+------+-------------------------------+",
"| 100 | | AL | MA | | 1970-01-01 00:00:00.000000050 |",
"| 100 | | AL | | AL | 1970-01-01 00:00:00.000000050 |",
"| 70 | | CT | CT | | 1970-01-01 00:00:00.000000100 |",
"| 70 | | CT | | AL | 1970-01-01 00:00:00.000000100 |",
"| 5 | | MT | AL | | 1970-01-01 00:00:00.000005 |",
"| 10 | | MT | AL | | 1970-01-01 00:00:00.000007 |",
"| 1000 | | MT | CT | | 1970-01-01 00:00:00.000001 |",
"| 1000 | | MT | | CT | 1970-01-01 00:00:00.000001 |",
"| 5 | | MT | | MT | 1970-01-01 00:00:00.000005 |",
"| 10 | | MT | | MT | 1970-01-01 00:00:00.000007 |",
"| 100 | 100 | | | AL | 1970-01-01 00:00:00.000000050 |",
"| 70 | 70 | | | AL | 1970-01-01 00:00:00.000000100 |",
"| 1000 | 1000 | | | CT | 1970-01-01 00:00:00.000001 |",
"| 5 | 5 | | | MT | 1970-01-01 00:00:00.000005 |",
"| 10 | 10 | | | MT | 1970-01-01 00:00:00.000007 |",
"| 100 | | AL | | AL | 1970-01-01 00:00:00.000000050 |",
"| 100 | | AL | MA | | 1970-01-01 00:00:00.000000050 |",
"| 70 | | CT | | AL | 1970-01-01 00:00:00.000000100 |",
"| 70 | | CT | CT | | 1970-01-01 00:00:00.000000100 |",
"| 1000 | | MT | | CT | 1970-01-01 00:00:00.000001 |",
"| 5 | | MT | | MT | 1970-01-01 00:00:00.000005 |",
"| 10 | | MT | | MT | 1970-01-01 00:00:00.000007 |",
"| 5 | | MT | AL | | 1970-01-01 00:00:00.000005 |",
"| 10 | | MT | AL | | 1970-01-01 00:00:00.000007 |",
"| 1000 | | MT | CT | | 1970-01-01 00:00:00.000001 |",
"+-----------+------------+------+------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
@@ -1360,6 +1404,8 @@ mod test {
#[tokio::test]
async fn scan_plan_with_one_chunk_no_duplicates() {
test_helpers::maybe_start_logging();
// Test no duplicate at all
let chunk = Arc::new(
TestChunk::new("t")
@@ -1408,6 +1454,8 @@ mod test {
#[tokio::test]
async fn scan_plan_with_one_chunk_with_duplicates() {
test_helpers::maybe_start_logging();
// Test one chunk with duplicate within
let chunk = Arc::new(
TestChunk::new("t")
@@ -1464,6 +1512,8 @@ mod test {
#[tokio::test]
async fn scan_plan_with_one_chunk_with_duplicates_subset() {
test_helpers::maybe_start_logging();
// Test one chunk with duplicate within
let chunk = Arc::new(
TestChunk::new("t")
@@ -1529,6 +1579,8 @@ mod test {
#[tokio::test]
async fn scan_plan_with_two_overlapped_chunks_with_duplicates() {
test_helpers::maybe_start_logging();
// test overlapped chunks
let chunk1 = Arc::new(
TestChunk::new("t")
@@ -1598,6 +1650,8 @@ mod test {
#[tokio::test]
async fn scan_plan_with_four_chunks() {
test_helpers::maybe_start_logging();
// This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within
let chunk1 = Arc::new(
TestChunk::new("t")
@@ -793,6 +793,13 @@ impl QueryChunk for TestChunk {
None
}
/// Sets sort key for the schema of this chunk
fn set_sort_key(&mut self, sort_key: &SortKey<'_>) {
let mut schema_cloned = self.schema.as_ref().clone();
schema_cloned.set_sort_key(sort_key);
self.schema = Arc::new(schema_cloned);
}
fn apply_predicate_to_metadata(&self, predicate: &Predicate) -> Result<PredicateMatch> {
self.check_error()?;
@@ -10,7 +10,7 @@ use datafusion::{
optimizer::utils::expr_to_columns,
physical_plan::expressions::{col as physical_col, PhysicalSortExpr},
};
use internal_types::schema::Schema;
use internal_types::schema::{sort::SortKey, Schema};
/// Create a logical plan that produces the record batch
pub fn make_scan_plan(batch: RecordBatch) -> std::result::Result<LogicalPlan, DataFusionError> {
@@ -51,6 +51,25 @@ pub fn arrow_pk_sort_exprs(
sort_exprs
}
pub fn arrow_sort_key_exprs(
sort_key: SortKey<'_>,
input_schema: &ArrowSchema,
) -> Vec<PhysicalSortExpr> {
let mut sort_exprs = vec![];
for (key, options) in sort_key.iter() {
let expr = physical_col(key, &input_schema).expect("sort key column in schema");
sort_exprs.push(PhysicalSortExpr {
expr,
options: SortOptions {
descending: options.descending,
nulls_first: options.nulls_first,
},
});
}
sort_exprs
}
#[cfg(test)]
mod tests {
use datafusion::prelude::*;
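A hypothetical usage example for the new `arrow_sort_key_exprs` helper added above (the column names and Arrow field types here are invented for illustration):

```rust
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use internal_types::schema::sort::SortKey;

// Hypothetical: turn a sort key into the physical sort expressions that feed
// a SortExec / SortPreservingMergeExec over batches with this schema.
fn sort_exprs_example() {
    let arrow_schema = ArrowSchema::new(vec![
        Field::new("tag1", DataType::Utf8, true),
        Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
    ]);

    let mut sort_key = SortKey::with_capacity(2);
    sort_key.with_col("tag1");
    sort_key.with_col("time");

    let sort_exprs = arrow_sort_key_exprs(sort_key, &arrow_schema);
    assert_eq!(sort_exprs.len(), 2);
}
```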
@@ -9,7 +9,10 @@ use snafu::{ResultExt, Snafu};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use internal_types::{schema::{Schema, sort::SortKey}, selection::Selection};
use internal_types::{
schema::{sort::SortKey, Schema},
selection::Selection,
};
use mutable_buffer::chunk::snapshot::ChunkSnapshot;
use object_store::path::Path;
use observability_deps::tracing::debug;
@@ -438,36 +441,35 @@ impl QueryChunk for DbChunk {
/// However, since we current sorted data based on their cardinality (see compute_sort_key),
/// 2 different chunks may be sorted on different order of key columns.
fn is_sorted_on_pk(&self) -> bool {
match self.schema().sort_key() {
Some(_sort_key) => return true,
_ => return false,
}
self.schema().sort_key().is_some()
}
/// Returns the sort key of the chunk if any
fn sort_key(&self) -> Option<SortKey<'_>> {
// Try1
// let schema = self.schema();
// schema.sort_key()
self.meta.schema.sort_key()
}
// try 2
// let schema_cloned = self.schema().as_ref().clone();
// let schema = Arc::new(schema_cloned);
// let sort_key = schema.sort_key();
// sort_key
/// Sets sort key for the schema of this chunk
fn set_sort_key(&mut self, _sort_key: &SortKey<'_>) {
// try3 : still compile error
let sort_key = self.schema().sort_key();
match sort_key {
Some(keys) => {
let mut sk = SortKey::with_capacity(keys.len());
for (key, options) in keys.iter() {
sk.with_col_opts(key.clone(), options.descending, options.nulls_first);
}
return Some(sk);
}
None => None
}
// todo
// trace!(sort_key=?sort_key, "Input sort key to set_sort_key");
// // Update schema of the DBChunk
// let mut schema_cloned = self.meta.schema.as_ref().clone();
// schema_cloned.set_sort_key(sort_key);
// self.meta = Arc::new(ChunkMetadata {
// table_summary: Arc::new(self.meta.table_summary.as_ref()),
// schema: Arc::new(schema_cloned)
// });
// Update schema of the chunk itself
// match &self.state {
// State::MutableBuffer { chunk, .. } => {}
// State::ReadBuffer { chunk, .. } => {}
// State::ParquetFile { chunk, .. } => {}
// }
}
}
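The `set_sort_key` body above is still a work in progress; the commented-out block sketches the intent. A hedged completion of that sketch, using the field names from the comment (`table_summary` and `schema` on `ChunkMetadata`) and not verified against the real struct definition:

```rust
// Hypothetical completion of DbChunk::set_sort_key, following the
// commented-out code: clone the schema, attach the sort key, and rebuild the
// chunk metadata around the updated schema.
fn set_sort_key(&mut self, sort_key: &SortKey<'_>) {
    let mut schema_cloned = self.meta.schema.as_ref().clone();
    schema_cloned.set_sort_key(sort_key);

    self.meta = Arc::new(ChunkMetadata {
        table_summary: Arc::clone(&self.meta.table_summary),
        schema: Arc::new(schema_cloned),
    });
}
```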
@@ -8,7 +8,7 @@ use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info;
use query::exec::ExecutorType;
use query::frontend::reorg::ReorgPlanner;
use query::{QueryChunkMeta, compute_sort_key};
use query::{compute_sort_key, QueryChunkMeta};
use read_buffer::{ChunkMetrics, RBChunk};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@@ -1,10 +1,10 @@
pub(crate) use crate::db::chunk::DbChunk;
use crate::db::catalog::chunk::CatalogChunk;
pub(crate) use crate::db::chunk::DbChunk;
use ::lifecycle::LifecycleWriteGuard;
use data_types::job::Job;
use observability_deps::tracing::{debug, info};
use query::{QueryChunkMeta, compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner};
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@@ -9,7 +9,7 @@ use lifecycle::{LifecycleWriteGuard, LockableChunk};
use observability_deps::tracing::info;
use query::exec::ExecutorType;
use query::frontend::reorg::ReorgPlanner;
use query::{QueryChunkMeta, compute_sort_key};
use query::{compute_sort_key, QueryChunkMeta};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use crate::db::catalog::chunk::CatalogChunk;