refactor: prep work for #5032 (#5060)

* refactor: move parquet chunk ID to `ChunkMeta`

* refactor: return `Arc` from `QueryChunk::summary`

This is similar to how we handle other chunk data like schemas. This
allows a chunk to change/refine its "belief" about its own payload while
it is passed around in the query stack.

Helps w/ #5032.
Marco Neumann 2022-07-07 15:21:48 +02:00 committed by GitHub
parent 8f5210ea3e
commit aacdeaca52
11 changed files with 80 additions and 76 deletions
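
To make the signature change concrete, here is a minimal sketch of the `Arc`-returning pattern the diffs below adopt. The types are simplified stand-ins, not the actual IOx definitions:

    use std::sync::Arc;

    // Simplified stand-in for the real IOx type.
    #[derive(Debug, Clone)]
    struct TableSummary {
        row_count: u64,
    }

    trait QueryChunkMeta {
        // Returning `Arc<TableSummary>` instead of `&TableSummary` lets a chunk
        // later swap in a refined summary without invalidating any borrower.
        fn summary(&self) -> Option<Arc<TableSummary>>;
    }

    struct TestChunk {
        table_summary: Arc<TableSummary>,
    }

    impl QueryChunkMeta for TestChunk {
        fn summary(&self) -> Option<Arc<TableSummary>> {
            // Cheap pointer clone; callers get shared ownership of the summary.
            Some(Arc::clone(&self.table_summary))
        }
    }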


@@ -118,7 +118,7 @@ impl QueryableParquetChunk {
 }

 impl QueryChunkMeta for QueryableParquetChunk {
-    fn summary(&self) -> Option<&TableSummary> {
+    fn summary(&self) -> Option<Arc<TableSummary>> {
         None
     }


@@ -94,7 +94,7 @@ impl QueryableBatch {
 }

 impl QueryChunkMeta for QueryableBatch {
-    fn summary(&self) -> Option<&TableSummary> {
+    fn summary(&self) -> Option<Arc<TableSummary>> {
         None
     }


@@ -39,8 +39,8 @@ pub use query_functions::group_by::{Aggregate, WindowDuration};
 /// Trait for an object (designed to be a Chunk) which can provide
 /// metadata
 pub trait QueryChunkMeta {
-    /// Return a reference to the summary of the data
-    fn summary(&self) -> Option<&TableSummary>;
+    /// Return a summary of the data
+    fn summary(&self) -> Option<Arc<TableSummary>>;

     /// return a reference to the summary of the data held in this chunk
     fn schema(&self) -> Arc<Schema>;
@ -200,7 +200,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
) -> Result<PredicateMatch, QueryChunkError> {
Ok(self
.summary()
.map(|summary| predicate.apply_to_table_summary(summary, self.schema().as_arrow()))
.map(|summary| predicate.apply_to_table_summary(&summary, self.schema().as_arrow()))
.unwrap_or(PredicateMatch::Unknown))
}
@@ -259,7 +259,7 @@ impl<P> QueryChunkMeta for Arc<P>
 where
     P: QueryChunkMeta,
 {
-    fn summary(&self) -> Option<&TableSummary> {
+    fn summary(&self) -> Option<Arc<TableSummary>> {
         self.as_ref().summary()
     }
@@ -292,7 +292,7 @@ where
 /// Implement ChunkMeta for Arc<dyn QueryChunk>
 impl QueryChunkMeta for Arc<dyn QueryChunk> {
-    fn summary(&self) -> Option<&TableSummary> {
+    fn summary(&self) -> Option<Arc<TableSummary>> {
         self.as_ref().summary()
     }
@@ -344,12 +344,15 @@ pub fn compute_sort_key_for_chunks(schema: &Schema, chunks: &[Arc<dyn QueryChunk
     }
 }

-/// Compute a sort key that orders lower cardinality columns first
+/// Compute a sort key that orders lower _estimated_ cardinality columns first
 ///
 /// In the absence of more precise information, this should yield a
-/// good ordering for RLE compression
-pub fn compute_sort_key<'a>(summaries: impl Iterator<Item = &'a TableSummary>) -> SortKey {
-    let mut cardinalities: HashMap<&str, u64> = Default::default();
+/// good ordering for RLE compression.
+///
+/// The cardinality is estimated by the sum of unique counts over all summaries. This may overestimate cardinality since
+/// it does not account for shared/repeated values.
+fn compute_sort_key(summaries: impl Iterator<Item = Arc<TableSummary>>) -> SortKey {
+    let mut cardinalities: HashMap<String, u64> = Default::default();
     for summary in summaries {
         for column in &summary.columns {
             if column.influxdb_type != Some(InfluxDbType::Tag) {
@@ -360,7 +363,7 @@ pub fn compute_sort_key<'a>(summaries: impl Iterator<Item = &'a TableSummary>) -> SortKey {
             if let Some(count) = column.stats.distinct_count() {
                 cnt = count.get();
             }
-            *cardinalities.entry(column.name.as_str()).or_default() += cnt;
+            *cardinalities.entry_ref(column.name.as_str()).or_default() += cnt;
         }
     }
@@ -368,7 +371,8 @@ pub fn compute_sort_key<'a>(summaries: impl Iterator<Item = &'a TableSummary>) -> SortKey {
     let mut cardinalities: Vec<_> = cardinalities.into_iter().collect();
     // Sort by (cardinality, column_name) to have deterministic order if same cardinality
-    cardinalities.sort_by_key(|x| (x.1, x.0));
+    cardinalities
+        .sort_by(|(name_1, card_1), (name_2, card_2)| (card_1, name_1).cmp(&(card_2, name_2)));

     let mut builder = SortKeyBuilder::with_capacity(cardinalities.len() + 1);
     for (col, _) in cardinalities {

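To illustrate the estimation described in the doc comment above, here is a rough sketch using a hypothetical, simplified `Summary` stand-in (a map of tag column name to distinct count) rather than the real `TableSummary`. Note how summed distinct counts can overestimate when the same tag values appear in several summaries, and how ties are broken deterministically by name:

    use std::collections::HashMap;

    // Hypothetical stand-in for `TableSummary`: tag column name -> distinct count.
    type Summary = HashMap<String, u64>;

    fn estimated_cardinalities(summaries: &[Summary]) -> Vec<(String, u64)> {
        let mut cardinalities: HashMap<String, u64> = HashMap::new();
        for summary in summaries {
            for (name, distinct) in summary {
                // Summing per-summary distinct counts overestimates the true
                // cardinality when the same values repeat across summaries.
                *cardinalities.entry(name.clone()).or_default() += distinct;
            }
        }
        let mut cardinalities: Vec<_> = cardinalities.into_iter().collect();
        // Sort by (cardinality, name) for a deterministic order on ties.
        cardinalities.sort_by(|(n1, c1), (n2, c2)| (c1, n1).cmp(&(c2, n2)));
        cardinalities
    }

    fn main() {
        let s1 = Summary::from([("region".to_string(), 3), ("host".to_string(), 100)]);
        let s2 = Summary::from([("region".to_string(), 3), ("host".to_string(), 80)]);
        // "region" is estimated at 6 even if both summaries saw the same 3 values,
        // but it still sorts before the higher-cardinality "host".
        assert_eq!(
            estimated_cardinalities(&[s1, s2]),
            vec![("region".to_string(), 6), ("host".to_string(), 180)]
        );
    }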

@@ -37,7 +37,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 // work on ParquetFileWithMetadata. Since group_potential_duplicates only needs 2 functions
 // partition_id and timestamp_min_max, other functions are left `unimplemented` on purpose
 impl QueryChunkMeta for ParquetFile {
-    fn summary(&self) -> Option<&TableSummary> {
+    fn summary(&self) -> Option<Arc<TableSummary>> {
         unimplemented!()
     }


@@ -181,10 +181,16 @@ impl ExecutionPlan for IOxReadFilterNode {
                 }

                 combined_summary_option = match combined_summary_option {
-                    None => Some(chunk.summary().expect("Chunk should have summary").clone()),
+                    None => Some(
+                        chunk
+                            .summary()
+                            .expect("Chunk should have summary")
+                            .as_ref()
+                            .clone(),
+                    ),
                     Some(mut combined_summary) => {
                         combined_summary
-                            .update_from(chunk.summary().expect("Chunk should have summary"));
+                            .update_from(&chunk.summary().expect("Chunk should have summary"));
                         Some(combined_summary)
                     }
                 }


@@ -120,10 +120,10 @@ impl<'a> ChunkPruningStatistics<'a> {
     fn column_summaries<'b: 'a>(
         &self,
         column: &'b Column,
-    ) -> impl Iterator<Item = Option<&Statistics>> + 'a {
+    ) -> impl Iterator<Item = Option<Statistics>> + 'a {
         self.chunks
             .iter()
-            .map(|chunk| Some(&chunk.summary()?.column(&column.name)?.stats))
+            .map(|chunk| Some(chunk.summary()?.column(&column.name)?.stats.clone()))
     }
 }
@@ -155,54 +155,59 @@ impl<'a> PruningStatistics for ChunkPruningStatistics<'a> {

 /// Collects an [`ArrayRef`] containing the aggregate statistic corresponding to
 /// `aggregate` for each of the provided [`Statistics`]
-fn collect_pruning_stats<'a>(
+fn collect_pruning_stats(
     data_type: &DataType,
-    statistics: impl Iterator<Item = Option<&'a Statistics>>,
+    statistics: impl Iterator<Item = Option<Statistics>>,
     aggregate: Aggregate,
 ) -> Option<ArrayRef> {
     match data_type {
         DataType::Int64 | DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-            let values = statistics.map(|s| match &s {
+            let values = statistics.map(|s| match s {
                 Some(Statistics::I64(v)) => get_aggregate(v, aggregate),
-                _ => &None,
+                _ => None,
             });
             Some(Arc::new(Int64Array::from_iter(values)))
         }
         DataType::UInt64 => {
-            let values = statistics.map(|s| match &s {
+            let values = statistics.map(|s| match s {
                 Some(Statistics::U64(v)) => get_aggregate(v, aggregate),
-                _ => &None,
+                _ => None,
             });
             Some(Arc::new(UInt64Array::from_iter(values)))
         }
         DataType::Float64 => {
-            let values = statistics.map(|s| match &s {
+            let values = statistics.map(|s| match s {
                 Some(Statistics::F64(v)) => get_aggregate(v, aggregate),
-                _ => &None,
+                _ => None,
             });
             Some(Arc::new(Float64Array::from_iter(values)))
         }
         DataType::Boolean => {
-            let values = statistics.map(|s| match &s {
+            let values = statistics.map(|s| match s {
                 Some(Statistics::Bool(v)) => get_aggregate(v, aggregate),
-                _ => &None,
+                _ => None,
             });
             Some(Arc::new(BooleanArray::from_iter(values)))
         }
         DataType::Utf8 => {
-            let values = statistics.map(|s| match &s {
+            let values = statistics.map(|s| match s {
                 Some(Statistics::String(v)) => get_aggregate(v, aggregate),
-                _ => &None,
+                _ => None,
             });
             Some(Arc::new(StringArray::from_iter(values)))
         }
         DataType::Dictionary(key, value)
             if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
         {
-            let values = statistics.map(|s| match &s {
-                Some(Statistics::String(v)) => get_aggregate(v, aggregate).as_deref(),
+            let values = statistics.map(|s| match s {
+                Some(Statistics::String(v)) => get_aggregate(v, aggregate),
                 _ => None,
             });
+
+            // DictionaryArray can only be built from string references (`str`), not from owned strings (`String`), so
+            // we need to collect the strings first
+            let values: Vec<_> = values.collect();
+            let values = values.iter().map(|s| s.as_deref());
+
             Some(Arc::new(DictionaryArray::<Int32Type>::from_iter(values)))
         }
         _ => None,
@@ -210,11 +215,11 @@ fn collect_pruning_stats<'a>(
 }

 /// Returns the aggregate statistic corresponding to `aggregate` from `stats`
-fn get_aggregate<T>(stats: &StatValues<T>, aggregate: Aggregate) -> &Option<T> {
+fn get_aggregate<T>(stats: StatValues<T>, aggregate: Aggregate) -> Option<T> {
     match aggregate {
-        Aggregate::Min => &stats.min,
-        Aggregate::Max => &stats.max,
-        _ => &None,
+        Aggregate::Min => stats.min,
+        Aggregate::Max => stats.max,
+        _ => None,
     }
 }
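The collect-then-borrow step added in the dictionary arm above can be illustrated in isolation. This is a sketch assuming arrow's `FromIterator<Option<&str>>` impl for `DictionaryArray` and the `arrow` crate paths; the `owned` values stand in for what the by-value `get_aggregate` now produces:

    use arrow::array::{Array, DictionaryArray};
    use arrow::datatypes::Int32Type;

    fn main() {
        // Owned optional strings, e.g. as produced once statistics are taken by value.
        let owned: Vec<Option<String>> = vec![Some("a".to_string()), None, Some("b".to_string())];

        // `DictionaryArray` is built from `&str`, not `String`, so collect the owned
        // strings first and hand out borrows via `as_deref()`.
        let dict: DictionaryArray<Int32Type> = owned.iter().map(|s| s.as_deref()).collect();
        assert_eq!(dict.len(), 3);
    }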


@@ -986,8 +986,8 @@ impl QueryChunk for TestChunk {
 }

 impl QueryChunkMeta for TestChunk {
-    fn summary(&self) -> Option<&TableSummary> {
-        Some(&self.table_summary)
+    fn summary(&self) -> Option<Arc<TableSummary>> {
+        Some(Arc::new(self.table_summary.clone()))
     }

     fn schema(&self) -> Arc<Schema> {


@@ -21,6 +21,9 @@ pub(crate) mod util;
 /// Immutable metadata attached to a [`QuerierParquetChunk`].
 #[derive(Debug)]
 pub struct ChunkMeta {
+    /// ID of the Parquet file of the chunk
+    parquet_file_id: ParquetFileId,
+
     /// The ID of the chunk
     chunk_id: ChunkId,
@@ -47,6 +50,11 @@ pub struct ChunkMeta {
 }

 impl ChunkMeta {
+    /// ID of the Parquet file of the chunk
+    pub fn parquet_file_id(&self) -> ParquetFileId {
+        self.parquet_file_id
+    }
+
     /// Chunk order.
     pub fn order(&self) -> ChunkOrder {
         self.order
@@ -81,14 +89,11 @@ impl ChunkMeta {
 /// Chunk representation of `read_buffer::RBChunk`s for the querier.
 #[derive(Debug)]
 pub struct QuerierRBChunk {
-    /// ID of the Parquet file of the chunk
-    parquet_file_id: ParquetFileId,
-
     /// Underlying read buffer chunk
     rb_chunk: Arc<RBChunk>,

     /// Table summary
-    table_summary: TableSummary,
+    table_summary: Arc<TableSummary>,

     /// min/max time range of this table (extracted from TableSummary), if known
     timestamp_min_max: Option<TimestampMinMax>,
@@ -109,17 +114,15 @@ pub struct QuerierRBChunk {
 impl QuerierRBChunk {
     /// Create new read-buffer-backed chunk
     pub fn new(
-        parquet_file_id: ParquetFileId,
         rb_chunk: Arc<RBChunk>,
         meta: Arc<ChunkMeta>,
         schema: Arc<Schema>,
         partition_sort_key: Arc<Option<SortKey>>,
     ) -> Self {
-        let table_summary = rb_chunk.table_summary();
+        let table_summary = Arc::new(rb_chunk.table_summary());
         let timestamp_min_max = table_summary.time_range();

         Self {
-            parquet_file_id,
             rb_chunk,
             table_summary,
             timestamp_min_max,
@@ -143,11 +146,6 @@ impl QuerierRBChunk {
         self.meta.as_ref()
     }

-    /// Parquet file ID
-    pub fn parquet_file_id(&self) -> ParquetFileId {
-        self.parquet_file_id
-    }
-
     /// Set partition sort key
     pub fn with_partition_sort_key(self, partition_sort_key: Arc<Option<SortKey>>) -> Self {
         Self {
@@ -165,9 +163,6 @@ impl QuerierRBChunk {
 /// the query engine (DataFusion and InfluxRPC) expect.
 #[derive(Debug)]
 pub struct QuerierParquetChunk {
-    /// ID of the Parquet file of the chunk
-    parquet_file_id: ParquetFileId,
-
     /// Chunk of the Parquet file
     parquet_chunk: Arc<ParquetChunk>,
@@ -181,24 +176,22 @@ pub struct QuerierParquetChunk {
     partition_sort_key: Arc<Option<SortKey>>,

     /// Table summary
-    table_summary: TableSummary,
+    table_summary: Arc<TableSummary>,
 }

 impl QuerierParquetChunk {
     /// Create new parquet-backed chunk (object store data).
     pub fn new(
-        parquet_file_id: ParquetFileId,
         parquet_chunk: Arc<ParquetChunk>,
         meta: Arc<ChunkMeta>,
         partition_sort_key: Arc<Option<SortKey>>,
     ) -> Self {
-        let table_summary = create_basic_summary(
+        let table_summary = Arc::new(create_basic_summary(
             parquet_chunk.rows() as u64,
             &parquet_chunk.schema(),
             parquet_chunk.timestamp_min_max(),
-        );
+        ));

         Self {
-            parquet_file_id,
             parquet_chunk,
             meta,
             delete_predicates: Vec::new(),
@@ -233,11 +226,6 @@ impl QuerierParquetChunk {
         self.meta.as_ref()
     }

-    /// Parquet file ID
-    pub fn parquet_file_id(&self) -> ParquetFileId {
-        self.parquet_file_id
-    }
-
     /// Return time range
     pub fn timestamp_min_max(&self) -> Option<TimestampMinMax> {
         Some(self.parquet_chunk.timestamp_min_max())
@@ -321,7 +309,6 @@ impl ChunkAdapter {
         ));

         Some(QuerierParquetChunk::new(
-            parts.parquet_file_id,
             chunk,
             parts.meta,
             parts.partition_sort_key,
@@ -345,7 +332,6 @@ impl ChunkAdapter {
             .await;

         Some(QuerierRBChunk::new(
-            parts.parquet_file_id,
             rb_chunk,
             parts.meta,
             parts.schema,
@@ -440,6 +426,7 @@ impl ChunkAdapter {
         let order = ChunkOrder::new(parquet_file.min_sequence_number.get());

         let meta = Arc::new(ChunkMeta {
+            parquet_file_id: parquet_file.id,
             chunk_id,
             table_name,
             order,
@@ -451,7 +438,6 @@ impl ChunkAdapter {
         });

         Some(ChunkParts {
-            parquet_file_id: parquet_file.id,
             meta,
             schema,
             partition_sort_key,
@@ -460,7 +446,6 @@ impl ChunkAdapter {
 }

 struct ChunkParts {
-    parquet_file_id: ParquetFileId,
     meta: Arc<ChunkMeta>,
     schema: Arc<Schema>,
     partition_sort_key: Arc<Option<SortKey>>,


@@ -41,8 +41,8 @@ pub enum Error {
 }

 impl QueryChunkMeta for QuerierParquetChunk {
-    fn summary(&self) -> Option<&TableSummary> {
-        Some(&self.table_summary)
+    fn summary(&self) -> Option<Arc<TableSummary>> {
+        Some(Arc::clone(&self.table_summary))
     }

     fn schema(&self) -> Arc<Schema> {
@@ -143,8 +143,8 @@ impl QueryChunk for QuerierParquetChunk {
 }

 impl QueryChunkMeta for QuerierRBChunk {
-    fn summary(&self) -> Option<&TableSummary> {
-        Some(&self.table_summary)
+    fn summary(&self) -> Option<Arc<TableSummary>> {
+        Some(Arc::clone(&self.table_summary))
     }

     fn schema(&self) -> Arc<Schema> {


@@ -920,7 +920,11 @@ impl IngesterPartition {
         let ts_min_max = compute_timenanosecond_min_max(&batches).expect("Should have time range");
         let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>() as u64;

-        let summary = create_basic_summary(row_count, &expected_schema, ts_min_max);
+        let summary = Arc::new(create_basic_summary(
+            row_count,
+            &expected_schema,
+            ts_min_max,
+        ));

         let chunk = IngesterChunk {
             chunk_id,
@@ -997,7 +1001,7 @@ pub struct IngesterChunk {
     ts_min_max: TimestampMinMax,

     /// Summary Statistics
-    summary: TableSummary,
+    summary: Arc<TableSummary>,
 }

 impl IngesterChunk {
@@ -1010,8 +1014,8 @@ impl IngesterChunk {
 }

 impl QueryChunkMeta for IngesterChunk {
-    fn summary(&self) -> Option<&TableSummary> {
-        Some(&self.summary)
+    fn summary(&self) -> Option<Arc<TableSummary>> {
+        Some(Arc::clone(&self.summary))
     }

     fn schema(&self) -> Arc<Schema> {


@@ -183,7 +183,7 @@ impl Reconciler {
                     .chunk_adapter
                     .catalog_cache()
                     .processed_tombstones()
-                    .exists(chunk.parquet_file_id(), tombstone.tombstone_id())
+                    .exists(chunk.meta().parquet_file_id(), tombstone.tombstone_id())
                     .await
                 {
                     continue;