diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
index e369896f1c..c84b65727d 100644
--- a/ingester/src/compact.rs
+++ b/ingester/src/compact.rs
@@ -85,7 +85,10 @@ pub async fn compact_persisting_batch(
             adjust_sort_key_columns(&sk, &batch.data.schema().primary_key())
         }
         None => {
-            let sort_key = compute_sort_key(&batch.data);
+            let sort_key = compute_sort_key(
+                batch.data.schema().as_ref(),
+                batch.data.data.iter().map(|sb| sb.data.as_ref()),
+            );
             // Use the sort key computed from the cardinality as the sort key for this parquet
             // file's metadata, also return the sort key to be stored in the catalog
             (sort_key.clone(), Some(sort_key))
@@ -756,7 +759,10 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);
 
-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
 
         // compact
@@ -797,7 +803,10 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);
 
-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
 
         // compact
@@ -838,7 +847,10 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);
 
-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
 
         // compact
@@ -884,7 +896,10 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);
 
-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
 
         // compact
@@ -927,7 +942,10 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);
 
-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
 
         // compact
@@ -984,7 +1002,10 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);
 
-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
 
         // compact
@@ -1049,7 +1070,10 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);
 
-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
 
         // compact
@@ -1107,7 +1131,10 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);
 
-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
 
         // compact
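The call-site pattern above is the shape every caller now follows: borrow the schema, then hand `compute_sort_key` an iterator of `&RecordBatch` borrowed out of the owned snapshot batches. A minimal sketch of that borrowing adapter, assuming the `arrow` crate; the `SnapshotBatch` struct here is a pared-down stand-in for the ingester's type, keeping only the field the call site touches:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Pared-down stand-in for the ingester's SnapshotBatch (the real one also
// carries sequence numbers).
struct SnapshotBatch {
    data: Arc<RecordBatch>,
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("val", DataType::Int64, false)]));
    let rb = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();

    let snapshots = vec![Arc::new(SnapshotBatch { data: Arc::new(rb) })];

    // Same projection as the diff: `.iter().map(|sb| sb.data.as_ref())` yields
    // `&RecordBatch` items borrowed from the snapshots, with no clones.
    let mut batch_refs = snapshots.iter().map(|sb| sb.data.as_ref());
    assert_eq!(batch_refs.next().unwrap().num_rows(), 3);
    assert!(batch_refs.next().is_none());
}
```

Passing borrows rather than the whole `QueryableBatch` is what lets the function drop its dependency on the ingester's data types, as the second file in this diff shows.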
diff --git a/ingester/src/sort_key.rs b/ingester/src/sort_key.rs
index 07e46a1204..db3b694e8a 100644
--- a/ingester/src/sort_key.rs
+++ b/ingester/src/sort_key.rs
@@ -1,16 +1,14 @@
 //! Functions for computing a sort key based on cardinality of primary key columns.
 
-use crate::data::{QueryableBatch, SnapshotBatch};
 use arrow::{
     array::{Array, DictionaryArray, StringArray},
     datatypes::{DataType, Int32Type},
     record_batch::RecordBatch,
 };
 use observability_deps::tracing::trace;
-use query::QueryChunkMeta;
 use schema::{
     sort::{SortKey, SortKeyBuilder},
-    TIME_COLUMN_NAME,
+    Schema, TIME_COLUMN_NAME,
 };
 use std::{
     collections::{HashMap, HashSet},
@@ -18,16 +16,18 @@ use std::{
     sync::Arc,
 };
 
-/// Given a `QueryableBatch`, compute a sort key based on:
+/// Given a `Schema` and an iterator of `RecordBatch`es, compute a sort key based on:
 ///
-/// - The columns that make up the primary key of the schema of this batch
+/// - The columns that make up the primary key of the schema
 /// - Order those columns from low cardinality to high cardinality based on the data
 /// - Always have the time column last
-pub fn compute_sort_key(queryable_batch: &QueryableBatch) -> SortKey {
-    let schema = queryable_batch.schema();
+pub fn compute_sort_key<'a>(
+    schema: &Schema,
+    batches: impl Iterator<Item = &'a RecordBatch>,
+) -> SortKey {
     let primary_key = schema.primary_key();
 
-    let cardinalities = distinct_counts(&queryable_batch.data, &primary_key);
+    let cardinalities = distinct_counts(batches, &primary_key);
 
     trace!(cardinalities=?cardinalities, "cardinalities of columns to compute sort key");
 
@@ -51,14 +51,14 @@ pub fn compute_sort_key(queryable_batch: &QueryableBatch) -> SortKey {
 /// Takes batches of data and the columns that make up the primary key. Computes the number of
 /// distinct values for each primary key column across all batches, also known as "cardinality".
 /// Used to determine sort order.
-fn distinct_counts(
-    batches: &[Arc<SnapshotBatch>],
+fn distinct_counts<'a>(
+    batches: impl Iterator<Item = &'a RecordBatch>,
     primary_key: &[&str],
 ) -> HashMap<String, usize> {
     let mut distinct_values_across_batches = HashMap::with_capacity(primary_key.len());
 
     for batch in batches {
-        for (column, distinct_values) in distinct_values(&batch.data, primary_key) {
+        for (column, distinct_values) in distinct_values(batch, primary_key) {
             let set = distinct_values_across_batches
                 .entry(column)
                 .or_insert_with(HashSet::new);
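The doc comment on `distinct_counts` is the heart of the computation: cardinality is the size of the union of each key column's distinct values across *all* batches, not a per-batch count. A toy model of that accumulation, with columns as plain string vectors instead of arrow arrays (purely illustrative, not the ingester's code):

```rust
use std::collections::{HashMap, HashSet};

// Toy analogue of distinct_counts: union each primary key column's distinct
// values across all batches, then report the size of each union.
fn distinct_counts<'a>(
    batches: impl Iterator<Item = &'a HashMap<&'a str, Vec<&'a str>>>,
    primary_key: &[&str],
) -> HashMap<String, usize> {
    let mut sets: HashMap<String, HashSet<&str>> = HashMap::new();
    for batch in batches {
        for &col in primary_key {
            if let Some(values) = batch.get(col) {
                // Union, not sum: a value seen in two batches counts once.
                sets.entry(col.to_string()).or_default().extend(values);
            }
        }
    }
    sets.into_iter()
        .map(|(col, vals)| (col, vals.len()))
        .collect()
}

fn main() {
    let batch1 = HashMap::from([("host", vec!["a", "b"]), ("env", vec!["prod", "prod"])]);
    let batch2 = HashMap::from([("host", vec!["c", "a"]), ("env", vec!["prod", "stage"])]);

    let counts = distinct_counts([&batch1, &batch2].into_iter(), &["host", "env"]);
    assert_eq!(counts["host"], 3); // {a, b, c}
    assert_eq!(counts["env"], 2); // {prod, stage}
}
```

The remaining hunks migrate the test helpers and tests to the new signature: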
"time"]); @@ -271,9 +258,10 @@ mod tests { cpu,host=a,env=dev val=23 5 cpu,host=b,env=dev val=2 6 "#; - let qb = lp_to_queryable_batch(&[lp1, lp2, lp3]); + let rbs = lp_to_record_batches(&[lp1, lp2, lp3]); + let schema = merge_record_batch_schemas(&rbs); - let sort_key = compute_sort_key(&qb); + let sort_key = compute_sort_key(schema.as_ref(), rbs.iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["host", "env", "time"])); }