refactor: Make compute_sort_key more general than the ingester

Enable computing sort keys for a schema and an iterator of record
batches.
Author: Carol (Nichols || Goulding)
Date:   2022-04-07 14:05:28 -04:00
Commit: a053077a05 (pull/24376/head)
Parent: 941dcc8e80
2 changed files with 57 additions and 42 deletions
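At a glance, the new API shape (a sketch assembled from the hunks below, assuming `compute_sort_key` is in scope; the helper name `sort_key_for` is hypothetical, and `merge_record_batch_schemas` comes from the updated test at the end of this commit):

use arrow::record_batch::RecordBatch;
use schema::{merge::merge_record_batch_schemas, sort::SortKey};
use std::sync::Arc;

// Before this commit, compute_sort_key(&queryable_batch) required the
// ingester's QueryableBatch. Now any caller with a Schema and an iterator
// of &RecordBatch can use it.
fn sort_key_for(batches: &[Arc<RecordBatch>]) -> SortKey {
    // Merging the batch schemas is one way to obtain a Schema, mirroring
    // the updated test in this commit.
    let schema = merge_record_batch_schemas(batches);
    compute_sort_key(schema.as_ref(), batches.iter().map(|rb| rb.as_ref()))
}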


@@ -85,7 +85,10 @@ pub async fn compact_persisting_batch(
             adjust_sort_key_columns(&sk, &batch.data.schema().primary_key())
         }
         None => {
-            let sort_key = compute_sort_key(&batch.data);
+            let sort_key = compute_sort_key(
+                batch.data.schema().as_ref(),
+                batch.data.data.iter().map(|sb| sb.data.as_ref()),
+            );
             // Use the sort key computed from the cardinality as the sort key for this parquet
             // file's metadata, also return the sort key to be stored in the catalog
             (sort_key.clone(), Some(sort_key))
@@ -756,7 +759,10 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);

-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));

         // compact
@@ -797,7 +803,10 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);

-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));

         // compact
@@ -838,7 +847,10 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);

-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));

         // compact
@@ -884,7 +896,10 @@ mod tests {
         let expected_pk = vec!["tag1", "time"];
         assert_eq!(expected_pk, pk);

-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));

         // compact
@@ -927,7 +942,10 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);

-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));

         // compact
@@ -984,7 +1002,10 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);

-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));

         // compact
@@ -1049,7 +1070,10 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);

-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));

         // compact
@@ -1107,7 +1131,10 @@ mod tests {
         let expected_pk = vec!["tag1", "tag2", "time"];
         assert_eq!(expected_pk, pk);

-        let sort_key = compute_sort_key(&compact_batch);
+        let sort_key = compute_sort_key(
+            &schema,
+            compact_batch.data.iter().map(|sb| sb.data.as_ref()),
+        );
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));

         // compact


@@ -1,16 +1,14 @@
 //! Functions for computing a sort key based on cardinality of primary key columns.

-use crate::data::{QueryableBatch, SnapshotBatch};
 use arrow::{
     array::{Array, DictionaryArray, StringArray},
     datatypes::{DataType, Int32Type},
     record_batch::RecordBatch,
 };
 use observability_deps::tracing::trace;
-use query::QueryChunkMeta;
 use schema::{
     sort::{SortKey, SortKeyBuilder},
-    TIME_COLUMN_NAME,
+    Schema, TIME_COLUMN_NAME,
 };
 use std::{
     collections::{HashMap, HashSet},
@@ -18,16 +16,18 @@ use std::{
     sync::Arc,
 };

-/// Given a `QueryableBatch`, compute a sort key based on:
+/// Given a `Schema` and an iterator of `RecordBatch`es, compute a sort key based on:
 ///
-/// - The columns that make up the primary key of the schema of this batch
+/// - The columns that make up the primary key of the schema
 /// - Order those columns from low cardinality to high cardinality based on the data
 /// - Always have the time column last
-pub fn compute_sort_key(queryable_batch: &QueryableBatch) -> SortKey {
-    let schema = queryable_batch.schema();
+pub fn compute_sort_key<'a>(
+    schema: &Schema,
+    batches: impl Iterator<Item = &'a RecordBatch>,
+) -> SortKey {
     let primary_key = schema.primary_key();

-    let cardinalities = distinct_counts(&queryable_batch.data, &primary_key);
+    let cardinalities = distinct_counts(batches, &primary_key);

     trace!(cardinalities=?cardinalities, "cardinalities of columns to compute sort key");
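To make the ordering rule concrete, a hypothetical assertion (column names and cardinalities invented for illustration): with two distinct `env` values and three distinct `host` values across the batches, the lower-cardinality `env` sorts first and `time` is always pinned last:

// Hypothetical data: 2 distinct `env` values, 3 distinct `host` values.
let sort_key = compute_sort_key(schema.as_ref(), batches.iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["env", "host", "time"]));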
@@ -51,14 +51,14 @@ pub fn compute_sort_key(queryable_batch: &QueryableBatch) -> SortKey {
 /// Takes batches of data and the columns that make up the primary key. Computes the number of
 /// distinct values for each primary key column across all batches, also known as "cardinality".
 /// Used to determine sort order.
-fn distinct_counts(
-    batches: &[Arc<SnapshotBatch>],
+fn distinct_counts<'a>(
+    batches: impl Iterator<Item = &'a RecordBatch>,
     primary_key: &[&str],
 ) -> HashMap<String, NonZeroU64> {
     let mut distinct_values_across_batches = HashMap::with_capacity(primary_key.len());

     for batch in batches {
-        for (column, distinct_values) in distinct_values(&batch.data, primary_key) {
+        for (column, distinct_values) in distinct_values(batch, primary_key) {
             let set = distinct_values_across_batches
                 .entry(column)
                 .or_insert_with(HashSet::new);
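The counting strategy in miniature: a standalone sketch over plain string columns rather than Arrow arrays (the function and its shape are illustrative, not part of this commit):

use std::collections::{HashMap, HashSet};

// Union each column's values across all batches; the cardinality is the
// size of the union, so a value repeated in several batches counts once.
fn cardinalities(batches: &[HashMap<String, Vec<String>>]) -> HashMap<String, usize> {
    let mut sets: HashMap<String, HashSet<&str>> = HashMap::new();
    for batch in batches {
        for (column, values) in batch {
            sets.entry(column.clone())
                .or_default()
                .extend(values.iter().map(String::as_str));
        }
    }
    sets.into_iter()
        .map(|(column, set)| (column, set.len()))
        .collect()
}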
@@ -179,29 +179,16 @@ pub fn adjust_sort_key_columns(

 #[cfg(test)]
 mod tests {
     use super::*;
-    use data_types2::SequenceNumber;
-    use schema::selection::Selection;
+    use schema::{merge::merge_record_batch_schemas, selection::Selection};

-    fn lp_to_queryable_batch(line_protocol_batches: &[&str]) -> QueryableBatch {
-        let data = line_protocol_batches
+    fn lp_to_record_batches(line_protocol_batches: &[&str]) -> Vec<Arc<RecordBatch>> {
+        line_protocol_batches
             .iter()
             .map(|line_protocol| {
                 let (_, mb) = mutable_batch_lp::test_helpers::lp_to_mutable_batch(line_protocol);
-                let rb = mb.to_arrow(Selection::All).unwrap();
-                Arc::new(SnapshotBatch {
-                    min_sequencer_number: SequenceNumber::new(0),
-                    max_sequencer_number: SequenceNumber::new(1),
-                    data: Arc::new(rb),
-                })
+                Arc::new(mb.to_arrow(Selection::All).unwrap())
             })
-            .collect();
-
-        QueryableBatch {
-            data,
-            delete_predicates: Default::default(),
-            table_name: Default::default(),
-        }
+            .collect()
     }

     #[test]
@@ -212,8 +199,8 @@ mod tests {
         cpu,host=c,env=stage val=11 1
         cpu,host=a,env=prod val=14 2
     "#;
-        let qb = lp_to_queryable_batch(&[lp]);
-        let rb = &qb.data[0].data;
+        let rbs = lp_to_record_batches(&[lp]);
+        let rb = &rbs[0];

         // Pass the tag field names plus time as the primary key, this is what should happen
         let distinct = distinct_values(rb, &["host", "env", "time"]);
@@ -271,9 +258,10 @@ mod tests {
         cpu,host=a,env=dev val=23 5
         cpu,host=b,env=dev val=2 6
     "#;
-        let qb = lp_to_queryable_batch(&[lp1, lp2, lp3]);
+        let rbs = lp_to_record_batches(&[lp1, lp2, lp3]);
+        let schema = merge_record_batch_schemas(&rbs);

-        let sort_key = compute_sort_key(&qb);
+        let sort_key = compute_sort_key(schema.as_ref(), rbs.iter().map(|rb| rb.as_ref()));
         assert_eq!(sort_key, SortKey::from_columns(["host", "env", "time"]));
     }