feat: Get the sort key from the schema and data in the QueryableBatch

Connects to #4194.
pull/24376/head
Carol (Nichols || Goulding) 2022-03-31 15:06:00 -04:00
parent 9043966443
commit 9d83554f20
3 changed files with 51 additions and 13 deletions

ingester/src/compact.rs

@@ -1,6 +1,9 @@
 //! This module is responsible for compacting Ingester's data

-use crate::data::{PersistingBatch, QueryableBatch};
+use crate::{
+    data::{PersistingBatch, QueryableBatch},
+    sort_key::compute_sort_key,
+};
 use arrow::record_batch::RecordBatch;
 use data_types2::NamespaceId;
 use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
@@ -10,8 +13,9 @@ use query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
     util::compute_timenanosecond_min_max,
-    QueryChunkMeta,
+    QueryChunk, QueryChunkMeta,
 };
+use schema::sort::SortKey;
 use snafu::{ResultExt, Snafu};
 use std::sync::Arc;
 use time::{Time, TimeProvider};
@@ -67,8 +71,11 @@ pub async fn compact_persisting_batch(
         return Ok(None);
     }

+    // Get sort key based on cardinality
+    let sort_key = compute_sort_key(&batch.data);
+
     // Compact
-    let stream = compact(executor, Arc::clone(&batch.data)).await?;
+    let stream = compact(executor, Arc::clone(&batch.data), sort_key.clone()).await?;
     // Collect compacted data into record batches for computing statistics
     let output_batches = datafusion::physical_plan::common::collect(stream)
         .await
@@ -106,7 +113,7 @@ pub async fn compact_persisting_batch(
         max_sequence_number: max_seq,
         row_count,
         compaction_level: INITIAL_COMPACTION_LEVEL,
-        sort_key: None,
+        sort_key: Some(sort_key),
     };

     Ok(Some((output_batches, meta)))
@@ -116,11 +123,12 @@ pub async fn compact_persisting_batch(
 pub async fn compact(
     executor: &Executor,
     data: Arc<QueryableBatch>,
+    sort_key: SortKey,
 ) -> Result<SendableRecordBatchStream> {
     // Build logical plan for compaction
     let ctx = executor.new_context(ExecutorType::Reorg);
     let logical_plan = ReorgPlanner::new()
-        .scan_single_chunk_plan(data.schema(), data)
+        .compact_plan(data.schema(), [data as Arc<dyn QueryChunk>], sort_key)
         .context(LogicalPlanSnafu {})?;

     // Build physical plan
@@ -322,9 +330,11 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);

+        let sort_key = compute_sort_key(&compact_batch);
+
         // compact
         let exc = Executor::new(1);
-        let stream = compact(&exc, compact_batch).await.unwrap();
+        let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
             .unwrap();
@@ -360,9 +370,11 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);

+        let sort_key = compute_sort_key(&compact_batch);
+
         // compact
         let exc = Executor::new(1);
-        let stream = compact(&exc, compact_batch).await.unwrap();
+        let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
             .unwrap();
@@ -398,9 +410,11 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);

+        let sort_key = compute_sort_key(&compact_batch);
+
         // compact
         let exc = Executor::new(1);
-        let stream = compact(&exc, compact_batch).await.unwrap();
+        let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
        let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
             .unwrap();
@@ -441,9 +455,11 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);

+        let sort_key = compute_sort_key(&compact_batch);
+
         // compact
         let exc = Executor::new(1);
-        let stream = compact(&exc, compact_batch).await.unwrap();
+        let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
             .unwrap();
@@ -481,9 +497,11 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);

+        let sort_key = compute_sort_key(&compact_batch);
+
         // compact
         let exc = Executor::new(1);
-        let stream = compact(&exc, compact_batch).await.unwrap();
+        let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
             .unwrap();
@@ -535,9 +553,11 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);

+        let sort_key = compute_sort_key(&compact_batch);
+
         // compact
         let exc = Executor::new(1);
-        let stream = compact(&exc, compact_batch).await.unwrap();
+        let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
             .unwrap();
@@ -597,9 +617,11 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);

+        let sort_key = compute_sort_key(&compact_batch);
+
         // compact
         let exc = Executor::new(1);
-        let stream = compact(&exc, compact_batch).await.unwrap();
+        let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
             .unwrap();
@@ -652,9 +674,11 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);

+        let sort_key = compute_sort_key(&compact_batch);
+
         // compact
         let exc = Executor::new(1);
-        let stream = compact(&exc, compact_batch).await.unwrap();
+        let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
             .unwrap();

ingester/src/lib.rs

@@ -24,6 +24,7 @@ mod poison;
 pub mod querier_handler;
 pub mod query;
 pub mod server;
+pub mod sort_key;

 #[cfg(test)]
 pub mod test_util;

ingester/src/sort_key.rs (new file, 13 lines)

@@ -0,0 +1,13 @@
+//! Functions for computing a sort key based on cardinality of primary key columns.
+
+use crate::data::QueryableBatch;
+use schema::sort::SortKey;
+
+/// Given a `QueryableBatch`, compute a sort key based on:
+///
+/// - The columns that make up the primary key of the schema of this batch
+/// - Order those columns from low cardinality to high cardinality based on the data
+/// - Always have the time column last
+pub fn compute_sort_key(_queryable_batch: &QueryableBatch) -> SortKey {
+    unimplemented!()
+}