feat: Collate chunks based on their partition hash id if they have it

pull/24376/head
Carol (Nichols || Goulding) 2023-07-13 16:18:28 -04:00
parent c2606ff3ac
commit a9b788b58f
12 changed files with 197 additions and 20 deletions
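For orientation while reading the diffs below: the collation key this commit introduces is TransitionPartitionId from the data_types crate, used with two variants: Deterministic(PartitionHashId) for partitions that have a deterministic hash ID in the catalog, and Deprecated(PartitionId) for older partitions known only by their catalog row ID. The sketch below uses simplified stand-in types (the real definitions live in data_types and are not shown in this diff) to illustrate the fallback pattern the ingester, querier, and compactor changes all apply.

// Simplified stand-ins for the data_types items referenced in this diff;
// the real definitions live in the data_types crate and differ in detail.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionId(i64);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionHashId(u64); // the real type hashes (TableId, PartitionKey)

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TransitionPartitionId {
    // The partition has a deterministic hash ID in the catalog.
    Deterministic(PartitionHashId),
    // Older partition, identified only by its catalog row ID.
    Deprecated(PartitionId),
}

// The fallback this commit applies wherever a chunk is built: prefer the
// hash ID when the catalog has one, otherwise fall back to the row ID.
pub fn to_transition_partition_id(
    partition_id: PartitionId,
    partition_hash_id: Option<PartitionHashId>,
) -> TransitionPartitionId {
    partition_hash_id
        .map(TransitionPartitionId::Deterministic)
        .unwrap_or_else(|| TransitionPartitionId::Deprecated(partition_id))
}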


@ -1,7 +1,7 @@
//! QueryableParquetChunk for building query plan
use std::{any::Any, sync::Arc};
-use data_types::{ChunkId, ChunkOrder, PartitionId};
+use data_types::{ChunkId, ChunkOrder, PartitionId, TransitionPartitionId};
use datafusion::physical_plan::Statistics;
use iox_query::{util::create_basic_summary, QueryChunk, QueryChunkData};
use observability_deps::tracing::debug;
@ -17,6 +17,7 @@ pub struct QueryableParquetChunk {
// Data of the parquet file
data: Arc<ParquetChunk>,
partition_id: PartitionId,
transition_partition_id: TransitionPartitionId,
sort_key: Option<SortKey>,
order: ChunkOrder,
stats: Arc<Statistics>,
@ -26,6 +27,7 @@ impl QueryableParquetChunk {
/// Initialize a QueryableParquetChunk
pub fn new(
partition_id: PartitionId,
transition_partition_id: TransitionPartitionId,
data: Arc<ParquetChunk>,
sort_key: Option<SortKey>,
order: ChunkOrder,
@ -38,6 +40,7 @@ impl QueryableParquetChunk {
Self {
data,
partition_id,
transition_partition_id,
sort_key,
order,
stats,
@ -72,6 +75,10 @@ impl QueryChunk for QueryableParquetChunk {
self.partition_id
}
fn transition_partition_id(&self) -> &TransitionPartitionId {
&self.transition_partition_id
}
fn sort_key(&self) -> Option<&SortKey> {
self.sort_key.as_ref()
}
@ -170,5 +177,11 @@ fn to_queryable_parquet_chunk(
);
let parquet_chunk = ParquetChunk::new(Arc::new(file.file.clone()), schema, store);
-QueryableParquetChunk::new(partition_id, Arc::new(parquet_chunk), sort_key, file.order)
+QueryableParquetChunk::new(
+partition_id,
+partition_info.transition_partition_id(),
+Arc::new(parquet_chunk),
+sort_key,
+file.order,
+)
}


@ -194,7 +194,11 @@ impl PartitionData {
// is upheld by the FSM, which ensures only non-empty snapshots /
// RecordBatch are generated. Because `data` contains at least one
// RecordBatch, this invariant holds.
-Some(QueryAdaptor::new(self.partition_id, data))
+Some(QueryAdaptor::new(
+self.partition_id,
+self.transition_partition_id(),
+data,
+))
}
/// Snapshot and mark all buffered data as persisting.
@ -234,6 +238,7 @@ impl PartitionData {
let data = PersistingData::new(
QueryAdaptor::new(
self.partition_id,
self.transition_partition_id(),
fsm.get_query_data(&OwnedProjection::default()),
),
batch_ident,


@ -109,7 +109,7 @@ mod tests {
use schema::Projection;
use super::*;
-use crate::test_util::ARBITRARY_PARTITION_ID;
+use crate::test_util::{ARBITRARY_PARTITION_ID, ARBITRARY_TRANSITION_PARTITION_ID};
// this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
// where if sending in a single row it would compact into an output of two batches, one of
@ -124,7 +124,11 @@ mod tests {
.to_arrow(Projection::All)
.unwrap();
-let batch = QueryAdaptor::new(ARBITRARY_PARTITION_ID, vec![batch]);
+let batch = QueryAdaptor::new(
+ARBITRARY_PARTITION_ID,
+ARBITRARY_TRANSITION_PARTITION_ID.clone(),
+vec![batch],
+);
// verify PK
let schema = batch.schema();
@ -160,6 +164,7 @@ mod tests {
// create input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_one_record_batch_with_influxtype_no_duplicates().await,
);
@ -209,6 +214,7 @@ mod tests {
// create input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_batches_with_influxtype_different_cardinality().await,
);
@ -263,6 +269,7 @@ mod tests {
// create input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_batches_with_influxtype_different_cardinality().await,
);
@ -322,6 +329,7 @@ mod tests {
// create input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_batches_with_influxtype_different_cardinality().await,
);
@ -385,6 +393,7 @@ mod tests {
// create input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_batches_with_influxtype_different_cardinality().await,
);
@ -447,6 +456,7 @@ mod tests {
// create input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_one_row_record_batch_with_influxtype().await,
);
@ -487,6 +497,7 @@ mod tests {
// create input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_one_record_batch_with_influxtype_duplicates().await,
);
@ -535,6 +546,7 @@ mod tests {
// create many-batches input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_batches_with_influxtype().await,
);
@ -581,6 +593,7 @@ mod tests {
// create many-batches input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_batches_with_influxtype_different_columns().await,
);
@ -631,6 +644,7 @@ mod tests {
// create many-batches input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_batches_with_influxtype_different_columns_different_order().await,
);
@ -684,6 +698,7 @@ mod tests {
// create many-batches input data
let batch = QueryAdaptor::new(
ARBITRARY_PARTITION_ID,
ARBITRARY_TRANSITION_PARTITION_ID.clone(),
create_batches_with_influxtype_same_columns_different_type().await,
);


@ -5,7 +5,7 @@ use std::{any::Any, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::util::ensure_schema;
-use data_types::{ChunkId, ChunkOrder, PartitionId, TimestampMinMax};
+use data_types::{ChunkId, ChunkOrder, PartitionId, TimestampMinMax, TransitionPartitionId};
use datafusion::physical_plan::Statistics;
use iox_query::{
util::{compute_timenanosecond_min_max, create_basic_summary},
@ -33,6 +33,9 @@ pub struct QueryAdaptor {
/// The catalog ID of the partition this data is part of.
partition_id: PartitionId,
/// The identifier of the partition this data is part of.
transition_partition_id: TransitionPartitionId,
/// Chunk ID.
id: ChunkId,
@ -50,7 +53,11 @@ impl QueryAdaptor {
///
/// This constructor panics if `data` contains no [`RecordBatch`], or all
/// [`RecordBatch`] are empty.
-pub(crate) fn new(partition_id: PartitionId, data: Vec<RecordBatch>) -> Self {
+pub(crate) fn new(
+partition_id: PartitionId,
+transition_partition_id: TransitionPartitionId,
+data: Vec<RecordBatch>,
+) -> Self {
// There must always be at least one record batch and one row.
//
// This upholds an invariant that simplifies dealing with empty
@ -61,6 +68,7 @@ impl QueryAdaptor {
Self {
data,
partition_id,
transition_partition_id,
// To return a value for debugging and make it consistent with ChunkId created in Compactor,
// use Uuid for this. Draw this UUID during chunk generation so that it is stable during the whole query process.
id: ChunkId::new(),
@ -145,6 +153,10 @@ impl QueryChunk for QueryAdaptor {
self.partition_id
}
fn transition_partition_id(&self) -> &TransitionPartitionId {
&self.transition_partition_id
}
fn sort_key(&self) -> Option<&SortKey> {
None // Ingester data is not sorted
}


@ -3,7 +3,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
use data_types::{
partition_template::TablePartitionTemplateOverride, ColumnId, ColumnSet, NamespaceId,
ParquetFileParams, PartitionHashId, PartitionId, PartitionKey, SequenceNumber, TableId,
-Timestamp,
+Timestamp, TransitionPartitionId,
};
use hashbrown::HashSet;
use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
@ -78,6 +78,8 @@ lazy_static! {
)));
pub(crate) static ref ARBITRARY_PARTITION_HASH_ID: PartitionHashId =
PartitionHashId::new(ARBITRARY_TABLE_ID, &ARBITRARY_PARTITION_KEY);
pub(crate) static ref ARBITRARY_TRANSITION_PARTITION_ID: TransitionPartitionId =
TransitionPartitionId::Deterministic(ARBITRARY_PARTITION_HASH_ID.clone());
}
/// Build a [`PartitionData`] with mostly arbitrary-yet-valid values for tests.


@ -20,7 +20,7 @@ use arrow::{
record_batch::RecordBatch,
};
use async_trait::async_trait;
-use data_types::{ChunkId, ChunkOrder, PartitionId};
+use data_types::{ChunkId, ChunkOrder, PartitionId, TransitionPartitionId};
use datafusion::{error::DataFusionError, physical_plan::Statistics, prelude::SessionContext};
use exec::IOxSessionContext;
use hashbrown::HashMap;
@ -71,6 +71,9 @@ pub trait QueryChunk: Debug + Send + Sync + 'static {
/// Return partition id for this chunk
fn partition_id(&self) -> PartitionId;
/// Return the transition partition identifier for this chunk
fn transition_partition_id(&self) -> &TransitionPartitionId;
/// return a reference to the sort key if any
fn sort_key(&self) -> Option<&SortKey>;
@ -248,6 +251,10 @@ where
self.as_ref().partition_id()
}
fn transition_partition_id(&self) -> &TransitionPartitionId {
self.as_ref().transition_partition_id()
}
fn sort_key(&self) -> Option<&SortKey> {
self.as_ref().sort_key()
}
@ -291,6 +298,10 @@ impl QueryChunk for Arc<dyn QueryChunk> {
self.as_ref().partition_id()
}
fn transition_partition_id(&self) -> &TransitionPartitionId {
self.as_ref().transition_partition_id()
}
fn sort_key(&self) -> Option<&SortKey> {
self.as_ref().sort_key()
}


@ -1,6 +1,5 @@
use std::sync::Arc;
-use data_types::PartitionId;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
@ -41,11 +40,11 @@ impl PhysicalOptimizerRule for PartitionSplit {
return Ok(Transformed::No(plan));
};
-let mut chunks_by_partition: HashMap<PartitionId, Vec<Arc<dyn QueryChunk>>> =
+let mut chunks_by_partition: HashMap<_, Vec<Arc<dyn QueryChunk>>> =
Default::default();
for chunk in chunks {
chunks_by_partition
-.entry(chunk.partition_id())
+.entry(chunk.transition_partition_id().clone())
.or_default()
.push(chunk);
}
@ -111,12 +110,12 @@ impl PhysicalOptimizerRule for PartitionSplit {
#[cfg(test)]
mod tests {
use super::*;
use crate::physical_optimizer::{
dedup::test_util::{chunk, dedup_plan},
test_util::OptimizationTest,
};
use super::*;
use data_types::{PartitionHashId, PartitionId, TransitionPartitionId};
#[test]
fn test_no_chunks() {
@ -203,6 +202,70 @@ mod tests {
);
}
#[test]
fn test_different_partitions_with_and_without_hash_ids() {
// Partition without hash ID in the catalog
let legacy_partition_id = 1;
let legacy_transition_partition_id =
TransitionPartitionId::Deprecated(PartitionId::new(legacy_partition_id));
// Partition with hash ID in the catalog
let partition_id = 2;
let transition_partition_id =
TransitionPartitionId::Deterministic(PartitionHashId::arbitrary_for_testing());
let chunk1 = chunk(1)
.with_partition_id(legacy_partition_id)
.with_transition_partition_id(legacy_transition_partition_id.clone());
let chunk2 = chunk(2)
.with_partition_id(partition_id)
.with_transition_partition_id(transition_partition_id.clone());
let chunk3 = chunk(3)
.with_dummy_parquet_file()
.with_partition_id(legacy_partition_id)
.with_transition_partition_id(legacy_transition_partition_id.clone());
let chunk4 = chunk(4)
.with_dummy_parquet_file()
.with_partition_id(partition_id)
.with_transition_partition_id(transition_partition_id);
let chunk5 = chunk(5)
.with_dummy_parquet_file()
.with_partition_id(legacy_partition_id)
.with_transition_partition_id(legacy_transition_partition_id.clone());
let chunk6 = chunk(6)
.with_dummy_parquet_file()
.with_partition_id(legacy_partition_id)
.with_transition_partition_id(legacy_transition_partition_id);
let schema = chunk1.schema().clone();
let plan = dedup_plan(schema, vec![chunk1, chunk2, chunk3, chunk4, chunk5, chunk6]);
let opt = PartitionSplit;
let mut config = ConfigOptions::default();
config.execution.target_partitions = 2;
insta::assert_yaml_snapshot!(
OptimizationTest::new_with_config(plan, opt, &config),
@r###"
---
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " ParquetExec: file_groups={2 groups: [[3.parquet, 5.parquet], [4.parquet, 6.parquet]]}, projection=[field, tag1, tag2, time]"
output:
Ok:
- " UnionExec"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: file_groups={2 groups: [[3.parquet, 6.parquet], [5.parquet]]}, projection=[field, tag1, tag2, time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: file_groups={1 group: [[4.parquet]]}, projection=[field, tag1, tag2, time]"
"###
);
}
#[test]
fn test_max_split() {
let chunk1 = chunk(1).with_partition_id(1);
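To summarize the optimizer change above in one self-contained place: PartitionSplit now buckets chunks by their TransitionPartitionId rather than by PartitionId, so a legacy Deprecated partition and a hash-identified Deterministic partition never land in the same deduplication branch, which is what the new snapshot test asserts. A minimal sketch, assuming the QueryChunk trait and data_types API shown in this diff:

use std::{collections::HashMap, sync::Arc};

use data_types::TransitionPartitionId;
use iox_query::QueryChunk;

// Group chunks by their transition partition ID; the key is cloned because
// the trait hands out a reference.
fn group_by_partition(
    chunks: Vec<Arc<dyn QueryChunk>>,
) -> HashMap<TransitionPartitionId, Vec<Arc<dyn QueryChunk>>> {
    let mut groups: HashMap<TransitionPartitionId, Vec<Arc<dyn QueryChunk>>> = HashMap::new();
    for chunk in chunks {
        groups
            .entry(chunk.transition_partition_id().clone())
            .or_default()
            .push(chunk);
    }
    groups
}

Keying on the transition ID keeps partitions created before the hash-ID migration collating exactly as before, while newer partitions collate by their deterministic hash.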


@ -20,7 +20,9 @@ use arrow::{
record_batch::RecordBatch,
};
use async_trait::async_trait;
-use data_types::{ChunkId, ChunkOrder, PartitionId};
+use data_types::{
+ChunkId, ChunkOrder, PartitionHashId, PartitionId, PartitionKey, TableId, TransitionPartitionId,
+};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
@ -338,6 +340,7 @@ pub struct TestChunk {
id: ChunkId,
partition_id: PartitionId,
transition_partition_id: TransitionPartitionId,
/// Set the flag if this chunk might contain duplicates
may_contain_pk_duplicates: bool,
@ -418,6 +421,10 @@ impl TestChunk {
order: ChunkOrder::MIN,
sort_key: None,
partition_id: PartitionId::new(0),
transition_partition_id: TransitionPartitionId::Deterministic(PartitionHashId::new(
TableId::new(0),
&PartitionKey::from("arbitrary"),
)),
quiet: false,
}
}
@ -486,6 +493,15 @@ impl TestChunk {
pub fn with_partition_id(mut self, id: i64) -> Self {
self.partition_id = PartitionId::new(id);
self.transition_partition_id = TransitionPartitionId::Deterministic(PartitionHashId::new(
TableId::new(id),
&PartitionKey::from("arbitrary"),
));
self
}
pub fn with_transition_partition_id(mut self, id: TransitionPartitionId) -> Self {
self.transition_partition_id = id;
self
}
@ -1098,6 +1114,10 @@ impl QueryChunk for TestChunk {
self.partition_id
}
fn transition_partition_id(&self) -> &TransitionPartitionId {
&self.transition_partition_id
}
fn sort_key(&self) -> Option<&SortKey> {
self.sort_key.as_ref()
}


@ -12,7 +12,9 @@ use arrow_flight::decode::DecodedPayload;
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig, BackoffError};
use client_util::connection;
-use data_types::{ChunkId, ChunkOrder, NamespaceId, PartitionHashId, PartitionId};
+use data_types::{
+ChunkId, ChunkOrder, NamespaceId, PartitionHashId, PartitionId, TransitionPartitionId,
+};
use datafusion::physical_plan::Statistics;
use futures::{stream::FuturesUnordered, TryStreamExt};
use ingester_query_grpc::{
@ -825,6 +827,7 @@ impl IngesterPartition {
let chunk = IngesterChunk {
chunk_id,
partition_id: self.partition_id,
transition_partition_id: self.transition_partition_id(),
schema: expected_schema,
batches,
stats: None,
@ -860,6 +863,13 @@ impl IngesterPartition {
self.partition_id
}
pub(crate) fn transition_partition_id(&self) -> TransitionPartitionId {
self.partition_hash_id
.clone()
.map(TransitionPartitionId::Deterministic)
.unwrap_or_else(|| TransitionPartitionId::Deprecated(self.partition_id))
}
pub(crate) fn ingester_uuid(&self) -> Uuid {
self.ingester_uuid
}
@ -881,6 +891,7 @@ impl IngesterPartition {
pub struct IngesterChunk {
chunk_id: ChunkId,
partition_id: PartitionId,
transition_partition_id: TransitionPartitionId,
schema: Schema,
/// The raw table data
@ -928,6 +939,10 @@ impl QueryChunk for IngesterChunk {
self.partition_id
}
fn transition_partition_id(&self) -> &TransitionPartitionId {
&self.transition_partition_id
}
fn sort_key(&self) -> Option<&SortKey> {
// Data is not sorted
None


@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};
-use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, PartitionId};
+use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, PartitionId, TransitionPartitionId};
use futures::StreamExt;
use hashbrown::HashSet;
use iox_catalog::interface::Catalog;
@ -170,11 +170,20 @@ impl ChunkAdapter {
let order = ChunkOrder::new(parquet_file.file.max_l0_created_at.get());
+let partition_id = parquet_file.file.partition_id;
+let transition_partition_id = parquet_file
+.file
+.partition_hash_id
+.clone()
+.map(TransitionPartitionId::Deterministic)
+.unwrap_or_else(|| TransitionPartitionId::Deprecated(partition_id));
let meta = Arc::new(QuerierParquetChunkMeta {
chunk_id,
order,
sort_key: Some(sort_key),
-partition_id: parquet_file.file.partition_id,
+partition_id,
+transition_partition_id,
});
let parquet_chunk = Arc::new(ParquetChunk::new(


@ -1,6 +1,6 @@
//! Querier Chunks
-use data_types::{ChunkId, ChunkOrder, PartitionId};
+use data_types::{ChunkId, ChunkOrder, PartitionId, TransitionPartitionId};
use datafusion::physical_plan::Statistics;
use iox_query::chunk_statistics::{create_chunk_statistics, ColumnRanges};
use parquet_file::chunk::ParquetChunk;
@ -26,6 +26,9 @@ pub struct QuerierParquetChunkMeta {
/// Partition ID.
partition_id: PartitionId,
/// Transition partition ID.
transition_partition_id: TransitionPartitionId,
}
impl QuerierParquetChunkMeta {
@ -43,6 +46,11 @@ impl QuerierParquetChunkMeta {
pub fn partition_id(&self) -> PartitionId {
self.partition_id
}
/// Transition partition ID.
pub fn transition_partition_id(&self) -> &TransitionPartitionId {
&self.transition_partition_id
}
}
#[derive(Debug)]


@ -1,5 +1,5 @@
use crate::parquet::QuerierParquetChunk;
-use data_types::{ChunkId, ChunkOrder, PartitionId};
+use data_types::{ChunkId, ChunkOrder, PartitionId, TransitionPartitionId};
use datafusion::physical_plan::Statistics;
use iox_query::{QueryChunk, QueryChunkData};
use schema::{sort::SortKey, Schema};
@ -18,6 +18,10 @@ impl QueryChunk for QuerierParquetChunk {
self.meta().partition_id()
}
fn transition_partition_id(&self) -> &TransitionPartitionId {
self.meta().transition_partition_id()
}
fn sort_key(&self) -> Option<&SortKey> {
self.meta().sort_key()
}