refactor: address review comments

pull/24376/head
Nga Tran 2021-06-09 14:17:47 -04:00
parent 3d50ff7a60
commit c1c58018fc
4 changed files with 118 additions and 61 deletions


@@ -124,7 +124,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
) -> Result<SendableRecordBatchStream, Self::Error>;
/// Returns true if data of this chunk is sorted
- fn is_sorted(&self) -> bool;
+ fn is_sorted_on_pk(&self) -> bool;
}
#[async_trait]
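
The rename narrows the contract from "is sorted" to "is sorted on the primary key". A minimal sketch of how a planner might branch on the renamed method (the helper below is hypothetical, not part of this commit):

// Hypothetical helper; assumes only the PartitionChunk method shown above.
fn needs_pk_sort<C: PartitionChunk>(chunk: &C) -> bool {
    // A chunk already sorted on its primary key can skip the SortExec step.
    !chunk.is_sorted_on_pk()
}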


@@ -534,7 +534,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate,
) -> Result<Arc<dyn ExecutionPlan>> {
- // Create the bottom node IOxRedFilterNode for this chunk
+ // Create the bottom node IOxReadFilterNode for this chunk
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
@@ -559,28 +559,29 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
chunk: Arc<C>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if !chunk.is_sorted() {
- let key_summaries = chunk.summary().primary_key_columns();
- // build sort expression
- let mut sort_exprs = vec![];
- for key in key_summaries {
- sort_exprs.push(PhysicalSortExpr {
- expr: col(key.name.as_str()),
- options: SortOptions {
- descending: false,
- nulls_first: false,
- },
- });
- }
- // Create SortExec operator
- Ok(Arc::new(
- SortExec::try_new(sort_exprs, input).context(InternalSort)?,
- ))
- } else {
- Ok(input)
+ if chunk.is_sorted_on_pk() {
+ return Ok(input);
+ }
+ // Sort the chunk on pk
+ let key_summaries = chunk.summary().primary_key_columns();
+ // build sort expression
+ let mut sort_exprs = vec![];
+ for key in key_summaries {
+ sort_exprs.push(PhysicalSortExpr {
+ expr: col(key.name.as_str()),
+ options: SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ });
+ }
+ // Create SortExec operator
+ Ok(Arc::new(
+ SortExec::try_new(sort_exprs, input).context(InternalSort)?,
+ ))
}
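
The loop above emits one ascending, nulls-last sort expression per primary-key column. A standalone sketch of the same pattern, assuming DataFusion's expressions::col helper and Arrow's SortOptions (the imports and function name here are assumptions, not taken from this diff):

use arrow::compute::SortOptions;
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};

// For a primary key of (tag1, tag2, time) this returns three ascending,
// nulls-last sort expressions, mirroring build_sort_plan above.
fn pk_sort_exprs(pk_columns: &[&str]) -> Vec<PhysicalSortExpr> {
    pk_columns
        .iter()
        .map(|name| PhysicalSortExpr {
            expr: col(name),
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        })
        .collect()
}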
/// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
@@ -637,10 +638,9 @@ impl<C: PartitionChunk> ChunkPruner<C> for NoOpPruner {
#[cfg(test)]
mod test {
- use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_util::assert_batches_eq;
use datafusion::physical_plan::collect;
- use internal_types::schema::TIME_COLUMN_NAME;
+ use internal_types::selection::Selection;
use crate::test::TestChunk;
@@ -682,7 +682,61 @@ mod test {
}
#[tokio::test]
- async fn sort_planning() {
+ async fn sort_planning_one_tag_with_time() {
+ // Chunk 1 with 5 rows of data
+ let chunk = Arc::new(
+ TestChunk::new(1)
+ .with_time_column("t")
+ .with_int_field_column("t", "field_int")
+ .with_tag_column("t", "tag1")
+ .with_five_rows_of_data("t"),
+ );
+ // Datafusion schema of the chunk
+ let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
+ // IOx scan operator
+ let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
+ Arc::from("t"),
+ schema,
+ vec![Arc::clone(&chunk)],
+ Predicate::default(),
+ ));
+ let batch = collect(Arc::clone(&input)).await.unwrap();
+ // data in its original non-sorted form
+ let expected = vec![
+ "+------+-----------+-------------------------------+",
+ "| tag1 | field_int | time                          |",
+ "+------+-----------+-------------------------------+",
+ "| MT   | 1000      | 1970-01-01 00:00:00.000001    |",
+ "| MT   | 10        | 1970-01-01 00:00:00.000007    |",
+ "| CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+ "| AL   | 100       | 1970-01-01 00:00:00.000000050 |",
+ "| MT   | 5         | 1970-01-01 00:00:00.000005    |",
+ "+------+-----------+-------------------------------+",
+ ];
+ assert_batches_eq!(&expected, &batch);
+ // Add Sort operator on top of IOx scan
+ let sort_plan = Deduplicater::build_sort_plan(chunk, input);
+ let batch = collect(sort_plan.unwrap()).await.unwrap();
+ // data is now sorted on primary key (tag1, time)
+ let expected = vec![
+ "+------+-----------+-------------------------------+",
+ "| tag1 | field_int | time                          |",
+ "+------+-----------+-------------------------------+",
+ "| AL   | 100       | 1970-01-01 00:00:00.000000050 |",
+ "| CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+ "| MT   | 1000      | 1970-01-01 00:00:00.000001    |",
+ "| MT   | 5         | 1970-01-01 00:00:00.000005    |",
+ "| MT   | 10        | 1970-01-01 00:00:00.000007    |",
+ "+------+-----------+-------------------------------+",
+ ];
+ assert_batches_eq!(&expected, &batch);
+ }
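
As a sanity check on the expected output above: the plan simply sorts rows lexicographically on the primary key (tag1, time). The same ordering on bare tuples, with times transcribed to nanoseconds from the tables (illustrative, not part of the commit):

// Rows as (tag1, time_ns, field_int), copied from the unsorted table.
let mut rows = vec![
    ("MT", 1_000, 1000),
    ("MT", 7_000, 10),
    ("CT", 100, 70),
    ("AL", 50, 100),
    ("MT", 5_000, 5),
];
// Sorting on (tag1, time) reproduces the sorted table row for row.
rows.sort_by_key(|&(tag, time, _)| (tag, time));
assert_eq!(rows.first(), Some(&("AL", 50, 100)));
assert_eq!(rows.last(), Some(&("MT", 7_000, 10)));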
+ #[tokio::test]
+ async fn sort_planning_two_tags_with_time() {
// Chunk 1 with 5 rows of data
let chunk = Arc::new(
TestChunk::new(1)
@@ -690,28 +744,11 @@ mod test {
.with_int_field_column("t", "field_int")
.with_tag_column("t", "tag2")
.with_tag_column("t", "tag1")
- .with_five_row_of_null_data("t"),
+ .with_five_rows_of_data("t"),
);
// Datafusion schema of the chunk
- let schema = Arc::new(Schema::new(vec![
- Field::new(
- "tag1",
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
- true,
- ),
- Field::new(
- "tag2",
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
- true,
- ),
- Field::new("field_int", DataType::Int64, true),
- Field::new(
- TIME_COLUMN_NAME,
- DataType::Timestamp(TimeUnit::Nanosecond, None),
- true,
- ),
- ]));
+ let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
// IOx scan operator
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
@@ -726,11 +763,11 @@ mod test {
"+------+------+-----------+-------------------------------+",
"| tag1 | tag2 | field_int | time |",
"+------+------+-----------+-------------------------------+",
"| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |",
"| MT | MT | 10 | 1970-01-01 00:00:00.000007 |",
"| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |",
"| MT | AL | 10 | 1970-01-01 00:00:00.000007 |",
"| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
"| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |",
"| MT | MT | 5 | 1970-01-01 00:00:00.000005 |",
"| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |",
"| MT | AL | 5 | 1970-01-01 00:00:00.000005 |",
"+------+------+-----------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
@@ -743,11 +780,11 @@ mod test {
"+------+------+-----------+-------------------------------+",
"| tag1 | tag2 | field_int | time |",
"+------+------+-----------+-------------------------------+",
"| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |",
"| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |",
"| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
"| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |",
"| MT | MT | 5 | 1970-01-01 00:00:00.000005 |",
"| MT | MT | 10 | 1970-01-01 00:00:00.000007 |",
"| MT | AL | 5 | 1970-01-01 00:00:00.000005 |",
"| MT | AL | 10 | 1970-01-01 00:00:00.000007 |",
"| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |",
"+------+------+-----------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
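
The two-tag case additionally exercises tie-breaking: the three MT rows are ordered by tag2 first, then time, which is why MT/CT lands last despite its mid-range timestamp. Reproduced on bare tuples (illustrative only, values transcribed from the tables above):

// Rows as (tag1, tag2, time_ns), copied from the unsorted table.
let mut rows = vec![
    ("MT", "CT", 1_000),
    ("MT", "AL", 7_000),
    ("CT", "CT", 100),
    ("AL", "MA", 50),
    ("MT", "AL", 5_000),
];
// Tuple order is exactly primary-key order (tag1, tag2, time).
rows.sort();
assert_eq!(rows.last(), Some(&("MT", "CT", 1_000)));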


@@ -380,7 +380,7 @@ impl TestChunk {
/// "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |",
/// "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |",
/// "+------+------+-----------+-------------------------------+",
- pub fn with_five_row_of_null_data(mut self, _table_name: impl Into<String>) -> Self {
+ pub fn with_five_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
//let table_name = table_name.into();
let schema = self
.table_schema
@@ -395,7 +395,14 @@ impl TestChunk {
Arc::new(Int64Array::from(vec![1000, 10, 70, 100, 5])) as ArrayRef
}
DataType::Utf8 => {
Arc::new(StringArray::from(vec!["MA", "MT", "CT", "AL", "MT"])) as ArrayRef
match field.name().as_str() {
"tag1" => Arc::new(StringArray::from(vec!["MT", "MT", "CT", "AL", "MT"]))
as ArrayRef,
"tag2" => Arc::new(StringArray::from(vec!["CT", "AL", "CT", "MA", "AL"]))
as ArrayRef,
_ => Arc::new(StringArray::from(vec!["CT", "MT", "AL", "AL", "MT"]))
as ArrayRef,
}
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
TimestampNanosecondArray::from_vec(vec![1000, 7000, 100, 50, 5000], None),
@@ -403,9 +410,23 @@
DataType::Dictionary(key, value)
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
{
- let dict: DictionaryArray<Int32Type> =
- vec!["MA", "MT", "CT", "AL", "MT"].into_iter().collect();
- Arc::new(dict) as ArrayRef
+ match field.name().as_str() {
+ "tag1" => Arc::new(
+ vec!["MT", "MT", "CT", "AL", "MT"]
+ .into_iter()
+ .collect::<DictionaryArray<Int32Type>>(),
+ ) as ArrayRef,
+ "tag2" => Arc::new(
+ vec!["CT", "AL", "CT", "MA", "AL"]
+ .into_iter()
+ .collect::<DictionaryArray<Int32Type>>(),
+ ) as ArrayRef,
+ _ => Arc::new(
+ vec!["CT", "MT", "AL", "AL", "MT"]
+ .into_iter()
+ .collect::<DictionaryArray<Int32Type>>(),
+ ) as ArrayRef,
+ }
}
_ => unimplemented!(
"Unimplemented data type for test database: {:?}",
@@ -415,7 +436,6 @@ impl TestChunk {
.collect::<Vec<_>>();
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
- println!("TestChunk batch data: {:#?}", batch);
self.table_data.push(Arc::new(batch));
self
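
The dictionary arms above lean on Arrow's FromIterator support for dictionary arrays; a minimal standalone sketch of that collect pattern (imports assumed from the arrow crate used elsewhere in this diff):

use std::sync::Arc;
use arrow::array::{ArrayRef, DictionaryArray};
use arrow::datatypes::Int32Type;

// Collecting &str values dictionary-encodes the column: repeated tags
// ("MT") share one dictionary entry while row order is preserved.
let tag1: DictionaryArray<Int32Type> =
    vec!["MT", "MT", "CT", "AL", "MT"].into_iter().collect();
let column: ArrayRef = Arc::new(tag1);
assert_eq!(column.len(), 5);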
@@ -478,7 +498,7 @@ impl PartitionChunk for TestChunk {
}
/// Returns true if data of this chunk is sorted
- fn is_sorted(&self) -> bool {
+ fn is_sorted_on_pk(&self) -> bool {
false
}


@@ -451,7 +451,7 @@ impl PartitionChunk for DbChunk {
}
// TODO: return the right value. For now the chunk is assumed to be unsorted.
- fn is_sorted(&self) -> bool {
+ fn is_sorted_on_pk(&self) -> bool {
match &self.state {
State::MutableBuffer { .. } => false,
State::ReadBuffer { .. } => false,