feat: calculate summaries for `IngesterPartition`

Branch: pull/24376/head
Author: Marco Neumann
Date: 2022-04-22 10:21:14 +02:00
Parent: c0ed688043
Commit: 99f6fb5f59
2 changed files with 289 additions and 10 deletions
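
This commit computes a `TableSummary` for each `IngesterPartition`, so the querier can report
basic per-column metadata for data that is still buffered in the ingester. Only total row
counts are populated; min/max values, null counts, and distinct counts are left unset. As part
of the change, several `IngesterPartition` accessors are narrowed from `pub` to `pub(crate)`.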

[File 1 of 2]

@@ -28,7 +28,9 @@ pub use data_types::{
    delete_predicate::{DeleteExpr, DeletePredicate, Op, Scalar},
    names::{org_and_bucket_to_database, OrgBucketMappingError},
    non_empty::NonEmptyString,
-   partition_metadata::{InfluxDbType, PartitionAddr, TableSummary},
+   partition_metadata::{
+       ColumnSummary, InfluxDbType, PartitionAddr, StatValues, Statistics, TableSummary,
+   },
    sequence::Sequence,
    timestamp::TimestampRange,
    DatabaseName,

[File 2 of 2]

@@ -3,8 +3,8 @@ use std::{any::Any, collections::HashMap, sync::Arc};
use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
use async_trait::async_trait;
use data_types2::{
-   ChunkAddr, ChunkId, ChunkOrder, IngesterQueryRequest, PartitionId, SequenceNumber, SequencerId,
-   TableSummary,
+   ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, IngesterQueryRequest, PartitionId,
+   SequenceNumber, SequencerId, StatValues, Statistics, TableSummary,
};
use datafusion_util::MemoryStream;
use futures::{stream::FuturesUnordered, TryStreamExt};
@@ -14,7 +14,7 @@ use query::{
    exec::{stringset::StringSet, IOxSessionContext},
    QueryChunk, QueryChunkError, QueryChunkMeta,
};
-use schema::{selection::Selection, sort::SortKey, Schema};
+use schema::{selection::Selection, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
use snafu::{ensure, OptionExt, ResultExt, Snafu};

use crate::cache::CatalogCache;
@@ -356,6 +356,7 @@ pub struct IngesterPartition {
    tombstone_max_sequence_number: Option<SequenceNumber>,
    batches: Vec<RecordBatch>,
+   summary: TableSummary,
}
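
The new `summary` field is computed once in the constructor (next hunk) via `calculate_summary`,
rather than being recomputed on every `QueryChunkMeta::summary` call.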
impl IngesterPartition {
@@ -385,6 +386,8 @@ impl IngesterPartition {
            .map(|batch| ensure_schema(batch, expected_schema.as_ref()))
            .collect::<Result<Vec<RecordBatch>>>()?;

+       let summary = calculate_summary(&batches, &expected_schema);
+
        Ok(Self {
            chunk_id,
            namespace_name,
@@ -396,33 +399,34 @@
            parquet_max_sequence_number,
            tombstone_max_sequence_number,
            batches,
+           summary,
        })
    }

-   pub fn has_batches(&self) -> bool {
+   pub(crate) fn has_batches(&self) -> bool {
        !self.batches.is_empty()
    }

-   pub fn partition_id(&self) -> PartitionId {
+   pub(crate) fn partition_id(&self) -> PartitionId {
        self.partition_id
    }

-   pub fn sequencer_id(&self) -> SequencerId {
+   pub(crate) fn sequencer_id(&self) -> SequencerId {
        self.sequencer_id
    }

-   pub fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
+   pub(crate) fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
        self.parquet_max_sequence_number
    }

-   pub fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
+   pub(crate) fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
        self.tombstone_max_sequence_number
    }
}

impl QueryChunkMeta for IngesterPartition {
    fn summary(&self) -> Option<&TableSummary> {
-       None
+       Some(&self.summary)
    }

    fn schema(&self) -> Arc<Schema> {
@@ -561,6 +565,72 @@ fn ensure_schema(batch: RecordBatch, expected_schema: &Schema) -> Result<RecordBatch> {
    RecordBatch::try_new(expected_schema.as_arrow(), new_columns).context(CreatingRecordBatchSnafu)
}
fn calculate_summary(batches: &[RecordBatch], schema: &Schema) -> TableSummary {
    let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>() as u64;

    let mut columns = Vec::with_capacity(schema.len());
    for i in 0..schema.len() {
        let (t, field) = schema.field(i);
        let t = t.expect("influx column type must be known");

        let influxdb_type = match t {
            InfluxColumnType::Tag => InfluxDbType::Tag,
            InfluxColumnType::Field(_) => InfluxDbType::Field,
            InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
        };

        let stats = match t {
            InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => {
                Statistics::String(StatValues {
                    min: None,
                    max: None,
                    total_count: row_count,
                    null_count: None,
                    distinct_count: None,
                })
            }
            InfluxColumnType::Timestamp | InfluxColumnType::Field(InfluxFieldType::Integer) => {
                Statistics::I64(StatValues {
                    min: None,
                    max: None,
                    total_count: row_count,
                    null_count: None,
                    distinct_count: None,
                })
            }
            InfluxColumnType::Field(InfluxFieldType::UInteger) => Statistics::U64(StatValues {
                min: None,
                max: None,
                total_count: row_count,
                null_count: None,
                distinct_count: None,
            }),
            InfluxColumnType::Field(InfluxFieldType::Float) => Statistics::F64(StatValues {
                min: None,
                max: None,
                total_count: row_count,
                null_count: None,
                distinct_count: None,
            }),
            InfluxColumnType::Field(InfluxFieldType::Boolean) => Statistics::Bool(StatValues {
                min: None,
                max: None,
                total_count: row_count,
                null_count: None,
                distinct_count: None,
            }),
        };

        columns.push(ColumnSummary {
            name: field.name().clone(),
            influxdb_type: Some(influxdb_type),
            stats,
        })
    }

    TableSummary { columns }
}
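
With the summary stored on the partition, `QueryChunkMeta::summary` now returns `Some` instead
of `None`. A minimal sketch of how a caller can read it, assuming `partition` is an
already-constructed `IngesterPartition` (the variable is illustrative, not part of this diff):

// Sketch, not part of this commit: `partition` is an assumed `IngesterPartition`.
let summary = partition.summary().expect("summary is now always computed");
for col in &summary.columns {
    // Only `total_count` is populated; all other statistics stay unknown.
    if let Statistics::I64(stat_values) = &col.stats {
        println!("{}: {} rows", col.name, stat_values.total_count);
    }
}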

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};

@@ -1070,4 +1140,211 @@ mod tests {
    fn str_vec() -> &'static [Option<&'static str>] {
        &[Some("foo"), Some("bar"), Some("baz")]
    }

    #[test]
    fn test_calculate_summary_no_columns_no_rows() {
        let schema = SchemaBuilder::new().build().unwrap();

        let actual = calculate_summary(&[], &schema);
        let expected = TableSummary { columns: vec![] };
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_calculate_summary_no_rows() {
        let schema = full_schema();

        let actual = calculate_summary(&[], &schema);
        let expected = TableSummary {
            columns: vec![
                ColumnSummary {
                    name: String::from("tag"),
                    influxdb_type: Some(InfluxDbType::Tag),
                    stats: Statistics::String(StatValues {
                        min: None,
                        max: None,
                        total_count: 0,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_bool"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::Bool(StatValues {
                        min: None,
                        max: None,
                        total_count: 0,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_float"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::F64(StatValues {
                        min: None,
                        max: None,
                        total_count: 0,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_integer"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::I64(StatValues {
                        min: None,
                        max: None,
                        total_count: 0,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_string"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::String(StatValues {
                        min: None,
                        max: None,
                        total_count: 0,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_uinteger"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::U64(StatValues {
                        min: None,
                        max: None,
                        total_count: 0,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("time"),
                    influxdb_type: Some(InfluxDbType::Timestamp),
                    stats: Statistics::I64(StatValues {
                        min: None,
                        max: None,
                        total_count: 0,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
            ],
        };
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_calculate_summary() {
        let schema = full_schema();
        let batches = &[
            lp_to_record_batch("table,tag=foo field_bool=true,field_float=1.1,field_integer=1,field_string=\"bar\",field_uinteger=2u 42"),
            lp_to_record_batch(
                &[
                    "table,tag=foo field_bool=true,field_float=1.1,field_integer=1,field_string=\"bar\",field_uinteger=2u 42",
                    "table,tag=foo field_bool=true,field_float=1.1,field_integer=1,field_string=\"bar\",field_uinteger=2u 42",
                ]
                .join("\n"),
            ),
        ];

        let actual = calculate_summary(batches, &schema);
        let expected = TableSummary {
            columns: vec![
                ColumnSummary {
                    name: String::from("tag"),
                    influxdb_type: Some(InfluxDbType::Tag),
                    stats: Statistics::String(StatValues {
                        min: None,
                        max: None,
                        total_count: 3,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_bool"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::Bool(StatValues {
                        min: None,
                        max: None,
                        total_count: 3,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_float"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::F64(StatValues {
                        min: None,
                        max: None,
                        total_count: 3,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_integer"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::I64(StatValues {
                        min: None,
                        max: None,
                        total_count: 3,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_string"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::String(StatValues {
                        min: None,
                        max: None,
                        total_count: 3,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("field_uinteger"),
                    influxdb_type: Some(InfluxDbType::Field),
                    stats: Statistics::U64(StatValues {
                        min: None,
                        max: None,
                        total_count: 3,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
                ColumnSummary {
                    name: String::from("time"),
                    influxdb_type: Some(InfluxDbType::Timestamp),
                    stats: Statistics::I64(StatValues {
                        min: None,
                        max: None,
                        total_count: 3,
                        null_count: None,
                        distinct_count: None,
                    }),
                },
            ],
        };
        assert_eq!(actual, expected);
    }

    fn full_schema() -> Schema {
        SchemaBuilder::new()
            .tag("tag")
            .influx_field("field_bool", InfluxFieldType::Boolean)
            .influx_field("field_float", InfluxFieldType::Float)
            .influx_field("field_integer", InfluxFieldType::Integer)
            .influx_field("field_string", InfluxFieldType::String)
            .influx_field("field_uinteger", InfluxFieldType::UInteger)
            .timestamp()
            .build()
            .unwrap()
    }
}