feat: Implement distinct counts in terms of distinct values

For one record batch. Connects to #4194.

branch pull/24376/head
parent 832495a7c9
commit f4b5fa1b5e

@@ -309,7 +309,7 @@ mod tests {
             seq_num_end,
             3,
             INITIAL_COMPACTION_LEVEL,
-            None, // todo: this will have value when #3968 is done
+            Some(SortKey::from_columns(["tag1", "time"])),
         );
         assert_eq!(expected_meta, meta);
     }

@@ -1,13 +1,22 @@
 //! Functions for computing a sort key based on cardinality of primary key columns.
 
 use crate::data::{QueryableBatch, SnapshotBatch};
+use arrow::{
+    array::{Array, DictionaryArray, StringArray},
+    datatypes::{DataType, Int32Type},
+    record_batch::RecordBatch,
+};
 use observability_deps::tracing::trace;
 use query::QueryChunkMeta;
 use schema::{
     sort::{SortKey, SortKeyBuilder},
     TIME_COLUMN_NAME,
 };
-use std::{collections::HashMap, num::NonZeroU64, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    num::NonZeroU64,
+    sync::Arc,
+};
 
 /// Given a `QueryableBatch`, compute a sort key based on:
 ///

@@ -43,10 +52,81 @@ pub fn compute_sort_key(queryable_batch: &QueryableBatch) -> SortKey {
 /// distinct values for each primary key column across all batches, also known as "cardinality".
 /// Used to determine sort order.
 fn distinct_counts(
-    _batches: &[Arc<SnapshotBatch>],
-    _primary_key: &[&str],
+    batches: &[Arc<SnapshotBatch>],
+    primary_key: &[&str],
 ) -> HashMap<String, NonZeroU64> {
-    HashMap::new()
+    let mut distinct_values_across_batches = HashMap::with_capacity(primary_key.len());
+
+    for batch in batches {
+        for (column, distinct_values) in distinct_values(&batch.data, primary_key) {
+            let set = distinct_values_across_batches
+                .entry(column)
+                .or_insert_with(HashSet::new);
+            set.extend(distinct_values.into_iter());
+        }
+    }
+
+    distinct_values_across_batches
+        .into_iter()
+        .filter_map(|(column, distinct_values)| {
+            distinct_values
+                .len()
+                .try_into()
+                .ok()
+                .and_then(NonZeroU64::new)
+                .map(|count| (column, count))
+        })
+        .collect()
 }
+
+/// Takes a `RecordBatch` and the column names that make up the primary key of the schema. Returns
+/// a map of column names to the set of the distinct string values, for the specified columns. Used
+/// to compute cardinality across multiple `RecordBatch`es.
+fn distinct_values(batch: &RecordBatch, primary_key: &[&str]) -> HashMap<String, HashSet<String>> {
+    let schema = batch.schema();
+    batch
+        .columns()
+        .iter()
+        .zip(schema.fields())
+        .filter(|(_col, field)| primary_key.contains(&field.name().as_str()))
+        .flat_map(|(col, field)| match field.data_type() {
+            // Dictionaries of I32 => Utf8 are supported as tags in
+            // `schema::InfluxColumnType::valid_arrow_type`
+            DataType::Dictionary(key, value)
+                if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
+            {
+                let col = col
+                    .as_any()
+                    .downcast_ref::<DictionaryArray<Int32Type>>()
+                    .expect("unexpected datatype");
+
+                let values = col.values();
+                let values = values
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("unexpected datatype");
+
+                Some((
+                    field.name().into(),
+                    values.iter().flatten().map(ToString::to_string).collect(),
+                ))
+            }
+            // Utf8 types are supported as tags
+            DataType::Utf8 => {
+                let values = col
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("unexpected datatype");
+
+                Some((
+                    field.name().into(),
+                    values.iter().flatten().map(ToString::to_string).collect(),
+                ))
+            }
+            // No other data types are supported as tags; don't compute distinct values for them
+            _ => None,
+        })
+        .collect()
+}
 
 #[cfg(test)]
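
The hunk above splits the work in two stages: `distinct_values` extracts the distinct strings from a single `RecordBatch` (handling plain `Utf8` columns and `Int32 => Utf8` dictionaries, the two arrow representations tags can take), and `distinct_counts` unions those per-batch sets before collapsing each set's size into a `NonZeroU64`. A minimal self-contained sketch of the same flow, using only the `arrow` crate; the `"host"` column name and the two hand-built "batches" are illustrative, not IOx APIs:

```rust
use arrow::array::{Array, DictionaryArray, StringArray};
use arrow::datatypes::Int32Type;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroU64;

fn main() {
    // "Batch 1": a plain Utf8 tag column. `flatten()` drops the nulls, so a
    // null never counts as a distinct value (matching the patch above).
    let host_utf8 = StringArray::from(vec![Some("a"), Some("b"), None, Some("a")]);
    let set1: HashSet<String> = host_utf8.iter().flatten().map(ToString::to_string).collect();

    // "Batch 2": the same tag, dictionary-encoded. The distinct values live in
    // the dictionary's `values()` array, which is why the patch downcasts it
    // to a StringArray instead of walking the keys.
    let host_dict: DictionaryArray<Int32Type> = vec!["b", "c", "b"].into_iter().collect();
    let values = host_dict.values();
    let values = values
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("tag dictionaries hold Utf8 values");
    let set2: HashSet<String> = values.iter().flatten().map(ToString::to_string).collect();

    // Union the per-batch sets, then turn each set's size into a cardinality,
    // mirroring the `filter_map` + `NonZeroU64::new` step in `distinct_counts`.
    let mut across: HashMap<String, HashSet<String>> = HashMap::new();
    across.entry("host".to_string()).or_default().extend(set1);
    across.entry("host".to_string()).or_default().extend(set2);

    let counts: HashMap<String, NonZeroU64> = across
        .into_iter()
        .filter_map(|(col, set)| {
            u64::try_from(set.len())
                .ok()
                .and_then(NonZeroU64::new)
                .map(|n| (col, n))
        })
        .collect();

    assert_eq!(counts["host"].get(), 3); // {"a", "b", "c"}
}
```

One subtlety the dictionary path inherits: reading `values()` counts every entry in the dictionary, whether or not any key still references it, so a dictionary carrying stale entries would over-count; for freshly built ingester batches the two sets normally coincide.
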
@@ -77,6 +157,52 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_distinct_values() {
+        let lp = r#"
+            cpu,host=a val=23 1
+            cpu,host=b,env=prod val=2 1
+            cpu,host=c,env=stage val=11 1
+            cpu,host=a,env=prod val=14 2
+        "#;
+        let qb = lp_to_queryable_batch(&[lp]);
+        let rb = &qb.data[0].data;
+
+        // Pass the tag column names plus time as the primary key; this is what should happen
+        let distinct = distinct_values(rb, &["host", "env", "time"]);
+
+        // The hashmap should contain the distinct values for "host" and "env" only
+        assert_eq!(distinct.len(), 2);
+
+        // Returns unique values
+        assert_eq!(
+            *distinct.get("host").unwrap(),
+            HashSet::from(["a".into(), "b".into(), "c".into()]),
+        );
+        // TODO: do nulls count as a value?
+        assert_eq!(
+            *distinct.get("env").unwrap(),
+            HashSet::from(["prod".into(), "stage".into()]),
+        );
+
+        // Requesting a column not present returns None
+        assert_eq!(distinct.get("foo"), None);
+
+        // Distinct values aren't computed for the time column or fields
+        assert_eq!(distinct.get("time"), None);
+        assert_eq!(distinct.get("val"), None);
+
+        // Specify a column in the primary key that doesn't appear in the data
+        let distinct = distinct_values(rb, &["host", "env", "foo", "time"]);
+        // The hashmap should contain the distinct values for "host" and "env" only
+        assert_eq!(distinct.len(), 2);
+
+        // Don't specify one of the tag columns in the primary key
+        let distinct = distinct_values(rb, &["host", "foo", "time"]);
+        // The hashmap should contain the distinct values for the specified columns only
+        assert_eq!(distinct.len(), 1);
+    }
+
     #[test]
     fn test_sort_key() {
         // Across these three record batches:
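
Downstream, `compute_sort_key` (whose body is outside these hunks) turns the cardinality map into a concrete column order via `SortKeyBuilder`. The usual convention for this kind of sort key is lower-cardinality columns first with the time column last; that ordering is an assumption here, since the diff doesn't show it. A hypothetical std-only sketch, with the tie-break by name being a further assumption for determinism:

```rust
use std::collections::HashMap;
use std::num::NonZeroU64;

/// Hypothetical helper: order columns by ascending cardinality, `time` last.
fn column_order(counts: &HashMap<String, NonZeroU64>) -> Vec<String> {
    let mut tags: Vec<_> = counts.iter().collect();
    // Tie-break equal cardinalities by name so the order is deterministic
    // (an assumption, not necessarily what `compute_sort_key` does).
    tags.sort_by_key(|(name, count)| (count.get(), name.to_string()));

    let mut order: Vec<String> = tags.into_iter().map(|(name, _)| name.clone()).collect();
    order.push("time".to_string()); // stand-in for TIME_COLUMN_NAME
    order
}

fn main() {
    let counts: HashMap<String, NonZeroU64> = [
        ("host".to_string(), NonZeroU64::new(3).unwrap()),
        ("env".to_string(), NonZeroU64::new(2).unwrap()),
    ]
    .into_iter()
    .collect();

    // "env" (2 distinct values) sorts before "host" (3), and time closes the key.
    assert_eq!(column_order(&counts), ["env", "host", "time"]);
}
```

Putting the lowest-cardinality tags first yields long runs of repeated values in the leading sort columns, which compress well, while the high-cardinality columns toward the end provide the fine-grained ordering.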