fix: Have QueryChunk return a TransitionPartitionId

Thus the chunk reports the PartitionHashId if one is available. The workspace as a whole does not compile yet because of all the uses of QueryChunk, but iox_query itself compiles and passes its tests.
pull/24376/head
Carol (Nichols || Goulding) 2023-07-10 16:45:20 -04:00
parent 6146c1114c
commit fd147f871b
4 changed files with 37 additions and 69 deletions
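
For context: TransitionPartitionId (from the data_types crate) is the identifier this commit standardizes on. Its two variants are visible throughout the diffs below; the following sketch of its shape is inferred from those uses, not copied from the crate's actual definition:

    // Sketch inferred from the uses in this diff; see data_types for the
    // authoritative definition.
    pub enum TransitionPartitionId {
        /// Old-style catalog row ID, for partitions without a hash ID
        Deprecated(PartitionId),
        /// Deterministic hash ID computed from the table ID and partition key
        Deterministic(PartitionHashId),
    }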

File 1 of 4

@@ -342,7 +342,7 @@ mod test {
         let chunk1 = Arc::new(
             TestChunk::new("t")
                 .with_order(1)
-                .with_partition_id(1)
+                .with_partition(1)
                 .with_time_column_with_stats(Some(50), Some(max_time))
                 .with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
                 .with_i64_field_column("field_int")
@@ -353,7 +353,7 @@ mod test {
         let chunk2 = Arc::new(
             TestChunk::new("t")
                 .with_order(2)
-                .with_partition_id(1)
+                .with_partition(1)
                 .with_time_column_with_stats(Some(28000), Some(220000))
                 .with_tag_column_with_stats("tag1", Some("UT"), Some("WA"))
                 .with_i64_field_column("field_int")

File 2 of 4

@@ -20,7 +20,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use async_trait::async_trait;
-use data_types::{ChunkId, ChunkOrder, PartitionId, TransitionPartitionId};
+use data_types::{ChunkId, ChunkOrder, TransitionPartitionId};
 use datafusion::{error::DataFusionError, physical_plan::Statistics, prelude::SessionContext};
 use exec::IOxSessionContext;
 use hashbrown::HashMap;
@@ -68,11 +68,8 @@ pub trait QueryChunk: Debug + Send + Sync + 'static {
     /// return a reference to the summary of the data held in this chunk
     fn schema(&self) -> &Schema;

-    /// Return partition id for this chunk
-    fn partition_id(&self) -> &PartitionId;
-
     /// Return partition identifier for this chunk
-    fn transition_partition_id(&self) -> &TransitionPartitionId;
+    fn partition_id(&self) -> &TransitionPartitionId;

     /// return a reference to the sort key if any
     fn sort_key(&self) -> Option<&SortKey>;
@@ -247,14 +244,10 @@ where
         self.as_ref().schema()
     }

-    fn partition_id(&self) -> &PartitionId {
+    fn partition_id(&self) -> &TransitionPartitionId {
         self.as_ref().partition_id()
     }

-    fn transition_partition_id(&self) -> &TransitionPartitionId {
-        self.as_ref().transition_partition_id()
-    }
-
     fn sort_key(&self) -> Option<&SortKey> {
         self.as_ref().sort_key()
     }
@@ -294,14 +287,10 @@ impl QueryChunk for Arc<dyn QueryChunk> {
         self.as_ref().schema()
     }

-    fn partition_id(&self) -> &PartitionId {
+    fn partition_id(&self) -> &TransitionPartitionId {
         self.as_ref().partition_id()
     }

-    fn transition_partition_id(&self) -> &TransitionPartitionId {
-        self.as_ref().transition_partition_id()
-    }
-
     fn sort_key(&self) -> Option<&SortKey> {
         self.as_ref().sort_key()
     }
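
With the old pair of accessors collapsed into one, callers that still need to distinguish legacy partitions can match on the returned value. A minimal sketch of such a caller, assuming only the trait surface shown above (describe_partition and the iox_query import path are illustrative, not part of this commit):

    use data_types::TransitionPartitionId;
    use iox_query::QueryChunk;

    /// Illustrative helper: render either kind of partition identifier,
    /// e.g. for logging.
    fn describe_partition(chunk: &dyn QueryChunk) -> String {
        match chunk.partition_id() {
            // New-style deterministic hash ID (PartitionHashId)
            TransitionPartitionId::Deterministic(hash_id) => {
                format!("hash-identified partition {hash_id:?}")
            }
            // Legacy catalog row ID, kept during the transition
            TransitionPartitionId::Deprecated(id) => {
                format!("legacy partition {id:?}")
            }
        }
    }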

File 3 of 4

@@ -1,5 +1,9 @@
-use std::sync::Arc;
-
+use crate::{
+    config::IoxConfigExt,
+    physical_optimizer::chunk_extraction::extract_chunks,
+    provider::{chunks_to_physical_nodes, DeduplicateExec},
+    QueryChunk,
+};
 use datafusion::{
     common::tree_node::{Transformed, TreeNode},
     config::ConfigOptions,
@@ -9,13 +13,7 @@ use datafusion::{
 };
 use hashbrown::HashMap;
 use observability_deps::tracing::warn;
-
-use crate::{
-    config::IoxConfigExt,
-    physical_optimizer::chunk_extraction::extract_chunks,
-    provider::{chunks_to_physical_nodes, DeduplicateExec},
-    QueryChunk,
-};
+use std::sync::Arc;

 /// Split de-duplication operations based on partitions.
 ///
@@ -44,7 +42,7 @@ impl PhysicalOptimizerRule for PartitionSplit {
             Default::default();
         for chunk in chunks {
             chunks_by_partition
-                .entry(chunk.transition_partition_id().clone())
+                .entry(chunk.partition_id().clone())
                 .or_default()
                 .push(chunk);
         }
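
This hunk is the heart of the simplification: the optimizer buckets chunks by the one unified identifier instead of consulting two accessors. Roughly what the loop builds, as a self-contained sketch (group_by_partition is an illustrative helper, not the exact signature in this file):

    use data_types::TransitionPartitionId;
    use hashbrown::HashMap;
    use iox_query::QueryChunk;
    use std::sync::Arc;

    /// Bucket chunks by their (now unified) partition identifier; every
    /// bucket can then be de-duplicated independently.
    fn group_by_partition(
        chunks: Vec<Arc<dyn QueryChunk>>,
    ) -> HashMap<TransitionPartitionId, Vec<Arc<dyn QueryChunk>>> {
        let mut chunks_by_partition: HashMap<_, Vec<_>> = HashMap::new();
        for chunk in chunks {
            chunks_by_partition
                .entry(chunk.partition_id().clone())
                .or_default()
                .push(chunk);
        }
        chunks_by_partition
    }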
@@ -166,13 +164,13 @@ mod tests {
     #[test]
     fn test_different_partitions() {
-        let chunk1 = chunk(1).with_partition_id(1);
-        let chunk2 = chunk(2).with_partition_id(2);
+        let chunk1 = chunk(1).with_partition(1);
+        let chunk2 = chunk(2).with_partition(2);
         // use at least 3 parquet files for one of the two partitions to validate that `target_partitions` is forwarded correctly
-        let chunk3 = chunk(3).with_dummy_parquet_file().with_partition_id(1);
-        let chunk4 = chunk(4).with_dummy_parquet_file().with_partition_id(2);
-        let chunk5 = chunk(5).with_dummy_parquet_file().with_partition_id(1);
-        let chunk6 = chunk(6).with_dummy_parquet_file().with_partition_id(1);
+        let chunk3 = chunk(3).with_dummy_parquet_file().with_partition(1);
+        let chunk4 = chunk(4).with_dummy_parquet_file().with_partition(2);
+        let chunk5 = chunk(5).with_dummy_parquet_file().with_partition(1);
+        let chunk6 = chunk(6).with_dummy_parquet_file().with_partition(1);
         let schema = chunk1.schema().clone();
         let plan = dedup_plan(schema, vec![chunk1, chunk2, chunk3, chunk4, chunk5, chunk6]);
         let opt = PartitionSplit;
@@ -210,33 +208,24 @@ mod tests {
             TransitionPartitionId::Deprecated(PartitionId::new(legacy_partition_id));

         // Partition with hash ID in the catalog
-        let partition_id = 2;
         let transition_partition_id =
             TransitionPartitionId::Deterministic(PartitionHashId::arbitrary_for_testing());

-        let chunk1 = chunk(1)
-            .with_partition_id(legacy_partition_id)
-            .with_transition_partition_id(legacy_transition_partition_id.clone());
-        let chunk2 = chunk(2)
-            .with_partition_id(partition_id)
-            .with_transition_partition_id(transition_partition_id.clone());
+        let chunk1 = chunk(1).with_partition_id(legacy_transition_partition_id.clone());
+        let chunk2 = chunk(2).with_partition_id(transition_partition_id.clone());
         let chunk3 = chunk(3)
             .with_dummy_parquet_file()
-            .with_partition_id(legacy_partition_id)
-            .with_transition_partition_id(legacy_transition_partition_id.clone());
+            .with_partition_id(legacy_transition_partition_id.clone());
         let chunk4 = chunk(4)
             .with_dummy_parquet_file()
-            .with_partition_id(partition_id)
-            .with_transition_partition_id(transition_partition_id);
+            .with_partition_id(transition_partition_id.clone());
         let chunk5 = chunk(5)
             .with_dummy_parquet_file()
-            .with_partition_id(legacy_partition_id)
-            .with_transition_partition_id(legacy_transition_partition_id.clone());
+            .with_partition_id(legacy_transition_partition_id.clone());
         let chunk6 = chunk(6)
             .with_dummy_parquet_file()
-            .with_partition_id(legacy_partition_id)
-            .with_transition_partition_id(legacy_transition_partition_id);
+            .with_partition_id(legacy_transition_partition_id.clone());

         let schema = chunk1.schema().clone();
         let plan = dedup_plan(schema, vec![chunk1, chunk2, chunk3, chunk4, chunk5, chunk6]);
         let opt = PartitionSplit;
@@ -268,9 +257,9 @@ mod tests {
     #[test]
     fn test_max_split() {
-        let chunk1 = chunk(1).with_partition_id(1);
-        let chunk2 = chunk(2).with_partition_id(2);
-        let chunk3 = chunk(3).with_partition_id(3);
+        let chunk1 = chunk(1).with_partition(1);
+        let chunk2 = chunk(2).with_partition(2);
+        let chunk3 = chunk(3).with_partition(3);
         let schema = chunk1.schema().clone();
         let plan = dedup_plan(schema, vec![chunk1, chunk2, chunk3]);
         let opt = PartitionSplit;

File 4 of 4

@@ -21,7 +21,7 @@ use arrow::{
 };
 use async_trait::async_trait;
 use data_types::{
-    ChunkId, ChunkOrder, PartitionHashId, PartitionId, PartitionKey, TableId, TransitionPartitionId,
+    ChunkId, ChunkOrder, PartitionHashId, PartitionKey, TableId, TransitionPartitionId,
 };
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
@@ -339,8 +339,7 @@ pub struct TestChunk {
     id: ChunkId,

-    partition_id: PartitionId,
-    transition_partition_id: TransitionPartitionId,
+    partition_id: TransitionPartitionId,

     /// Set the flag if this chunk might contain duplicates
     may_contain_pk_duplicates: bool,
@@ -420,11 +419,7 @@ impl TestChunk {
             saved_error: Default::default(),
             order: ChunkOrder::MIN,
             sort_key: None,
-            partition_id: PartitionId::new(0),
-            transition_partition_id: TransitionPartitionId::Deterministic(PartitionHashId::new(
-                TableId::new(0),
-                &PartitionKey::from("arbitrary"),
-            )),
+            partition_id: TransitionPartitionId::arbitrary_for_testing(),
             quiet: false,
         }
     }
@@ -491,17 +486,16 @@ impl TestChunk {
         self
     }

-    pub fn with_partition_id(mut self, id: i64) -> Self {
-        self.partition_id = PartitionId::new(id);
-        self.transition_partition_id = TransitionPartitionId::Deterministic(PartitionHashId::new(
+    pub fn with_partition(mut self, id: i64) -> Self {
+        self.partition_id = TransitionPartitionId::Deterministic(PartitionHashId::new(
             TableId::new(id),
             &PartitionKey::from("arbitrary"),
         ));
         self
     }

-    pub fn with_transition_partition_id(mut self, id: TransitionPartitionId) -> Self {
-        self.transition_partition_id = id;
+    pub fn with_partition_id(mut self, id: TransitionPartitionId) -> Self {
+        self.partition_id = id;
         self
     }
@@ -1110,14 +1104,10 @@ impl QueryChunk for TestChunk {
         &self.schema
     }

-    fn partition_id(&self) -> &PartitionId {
+    fn partition_id(&self) -> &TransitionPartitionId {
         &self.partition_id
     }

-    fn transition_partition_id(&self) -> &TransitionPartitionId {
-        &self.transition_partition_id
-    }
-
     fn sort_key(&self) -> Option<&SortKey> {
         self.sort_key.as_ref()
     }
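
After this file's changes, the TestChunk builder mirrors the production split between the two ways of naming a partition: with_partition(i64) derives a deterministic hash ID from a table ID, while with_partition_id takes an explicit TransitionPartitionId. A usage sketch based on the tests in this commit (example_chunks is an illustrative wrapper; the PartitionId value 42 is arbitrary):

    use data_types::{PartitionId, TransitionPartitionId};

    fn example_chunks() -> (TestChunk, TestChunk) {
        // Common case: derive a deterministic hash ID from the table ID.
        let chunk_a = TestChunk::new("t").with_partition(1);

        // Explicit identifier, e.g. a legacy partition that has no hash ID.
        let legacy = TransitionPartitionId::Deprecated(PartitionId::new(42));
        let chunk_b = TestChunk::new("t").with_partition_id(legacy);

        (chunk_a, chunk_b)
    }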