refactor: bundle projection schema calculation (#8108)

* refactor: convert projection mask earlier

* refactor: bundle projection schema calculation

Same as #8102 but for the projected schema. This now has a nice side
effect:

1. there is no longer a per chunk cache lookup
2. there is no longer ANY per chunk async computation
3. we no longer need an early pruning stage for the chunks (we've used
   to do that so we can throw away chunks before doing the more
   expensive part of the chunk creation)

This nicely streamlines and simplifies the code.

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-06-30 10:27:30 +02:00 committed by GitHub
parent 2dacd0e7df
commit 1b8b3ae4c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 120 additions and 169 deletions

1
Cargo.lock generated
View File

@ -4395,6 +4395,7 @@ dependencies = [
"datafusion_util",
"futures",
"generated_types",
"hashbrown 0.14.0",
"influxdb_iox_client",
"ingester_query_grpc",
"insta",

View File

@ -17,6 +17,7 @@ data_types = { path = "../data_types" }
datafusion = { workspace = true }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3"
hashbrown = { version = "0.14.0" }
influxdb_iox_client = { path = "../influxdb_iox_client" }
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }

View File

@ -36,13 +36,13 @@ impl CacheKey {
/// Create new key.
///
/// This normalizes `projection`.
fn new(table_id: TableId, mut projection: Vec<ColumnId>) -> Self {
fn new(table_id: TableId, mut projection: Box<[ColumnId]>) -> Self {
// normalize column order
projection.sort();
Self {
table_id,
projection: projection.into(),
projection,
}
}
@ -141,7 +141,7 @@ impl ProjectedSchemaCache {
pub async fn get(
&self,
table: Arc<CachedTable>,
projection: Vec<ColumnId>,
projection: Box<[ColumnId]>,
span: Option<Span>,
) -> Schema {
let key = CacheKey::new(table.id, projection);
@ -249,7 +249,7 @@ mod tests {
let projection_1 = cache
.get(
Arc::clone(&table_1a),
vec![ColumnId::new(1), ColumnId::new(2)],
[ColumnId::new(1), ColumnId::new(2)].into(),
None,
)
.await;
@ -259,7 +259,7 @@ mod tests {
let projection_2 = cache
.get(
Arc::clone(&table_1a),
vec![ColumnId::new(1), ColumnId::new(2)],
[ColumnId::new(1), ColumnId::new(2)].into(),
None,
)
.await;
@ -269,7 +269,7 @@ mod tests {
let projection_3 = cache
.get(
Arc::clone(&table_1b),
vec![ColumnId::new(1), ColumnId::new(2)],
[ColumnId::new(1), ColumnId::new(2)].into(),
None,
)
.await;
@ -279,7 +279,7 @@ mod tests {
let projection_4 = cache
.get(
Arc::clone(&table_1a),
vec![ColumnId::new(2), ColumnId::new(1)],
[ColumnId::new(2), ColumnId::new(1)].into(),
None,
)
.await;
@ -290,7 +290,7 @@ mod tests {
let projection_5 = cache
.get(
Arc::clone(&table_1a),
vec![ColumnId::new(1), ColumnId::new(3)],
[ColumnId::new(1), ColumnId::new(3)].into(),
None,
)
.await;
@ -300,7 +300,7 @@ mod tests {
let projection_6 = cache
.get(
Arc::clone(&table_2a),
vec![ColumnId::new(1), ColumnId::new(2)],
[ColumnId::new(1), ColumnId::new(2)].into(),
None,
)
.await;
@ -311,7 +311,7 @@ mod tests {
let projection_7 = cache
.get(
Arc::clone(&table_1a),
vec![ColumnId::new(1), ColumnId::new(2)],
[ColumnId::new(1), ColumnId::new(2)].into(),
None,
)
.await;

View File

@ -1,25 +1,18 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};
use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, PartitionId, TimestampMinMax};
use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, PartitionId};
use futures::StreamExt;
use hashbrown::HashSet;
use iox_catalog::interface::Catalog;
use iox_query::pruning::prune_summaries;
use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
use predicate::Predicate;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use schema::sort::SortKey;
use schema::{sort::SortKey, Schema};
use trace::span::{Span, SpanRecorder};
use uuid::Uuid;
use crate::{
cache::{namespace::CachedTable, partition::CachedPartition, CatalogCache},
df_stats::create_chunk_statistics,
parquet::QuerierParquetChunkMeta,
table::MetricPruningObserver,
CONCURRENT_CHUNK_CREATION_JOBS,
};
@ -63,160 +56,108 @@ impl ChunkAdapter {
&self,
cached_table: Arc<CachedTable>,
files: Arc<[Arc<ParquetFile>]>,
predicate: &Predicate,
early_pruning_observer: MetricPruningObserver,
cached_partitions: &HashMap<PartitionId, CachedPartition>,
span: Option<Span>,
) -> Vec<QuerierParquetChunk> {
let span_recorder = SpanRecorder::new(span);
// throw out files that belong to removed partitions
let files = files
.iter()
.filter(|f| cached_partitions.contains_key(&f.partition_id))
.cloned()
.collect::<Vec<_>>();
let chunk_stats: Vec<_> = {
let _span_recorder = span_recorder.child("create chunk stats");
// prepare files
let files = {
let _span_recorder = span_recorder.child("prepare files");
files
.iter()
.map(|p| {
let stats = Arc::new(create_chunk_statistics(
p.row_count as u64,
&cached_table.schema,
TimestampMinMax {
min: p.min_time.get(),
max: p.max_time.get(),
},
&cached_partitions
.get(&p.partition_id)
.expect("filter files down to existing partitions")
.column_ranges,
));
let schema = Arc::clone(cached_table.schema.inner());
(stats, schema)
})
.collect()
// throw out files that belong to removed partitions
.filter(|f| cached_partitions.contains_key(&f.partition_id))
.cloned()
.map(|f| PreparedParquetFile::new(f, &cached_table))
.collect::<Vec<_>>()
};
// Prune on the most basic summary data (timestamps and column names) before trying to fully load the chunks
let keeps = {
let _span_recorder = span_recorder.child("prune summaries");
match prune_summaries(&cached_table.schema, &chunk_stats, predicate) {
Ok(keeps) => keeps,
Err(reason) => {
// Ignore pruning failures here - the chunk pruner should have already logged them.
// Just skip pruning and gather all the metadata. We have another chance to prune them
// once all the metadata is available
debug!(?reason, "Could not prune before metadata fetch");
vec![true; chunk_stats.len()]
}
// find all projected schemas
let projections = {
let span_recorder = span_recorder.child("get projected schemas");
let mut projections: HashSet<Box<[ColumnId]>> = HashSet::with_capacity(files.len());
for f in &files {
projections.get_or_insert_owned(&f.col_list);
}
};
// Remove any unused parquet files up front to maximize the
// concurrent catalog requests that could be outstanding
let mut parquet_files = files
.iter()
.zip(keeps)
.filter_map(|(pf, keep)| {
if keep {
Some(Arc::clone(pf))
} else {
early_pruning_observer
.was_pruned_early(pf.row_count as u64, pf.file_size_bytes as u64);
None
}
})
.collect::<Vec<_>>();
// de-correlate parquet files so that subsequent items likely don't block/wait on the same cache lookup
// (they are likely ordered by partition)
//
// Note that we sort before shuffling to achieve a deterministic pseudo-random order
{
let _span_recorder = span_recorder.child("shuffle order");
// de-correlate projections so that subsequent items likely don't block/wait on the same cache lookup
// (they are likely ordered by partition)
//
// Note that we sort before shuffling to achieve a deterministic pseudo-random order
let mut projections = projections.into_iter().collect::<Vec<_>>();
projections.sort();
let mut rng = StdRng::seed_from_u64(cached_table.id.get() as u64);
parquet_files.sort_by_key(|f| f.id);
parquet_files.shuffle(&mut rng);
}
projections.shuffle(&mut rng);
{
let span_recorder = span_recorder.child("create individual chunks");
futures::stream::iter(parquet_files)
.map(|cached_parquet_file| {
futures::stream::iter(projections)
.map(|column_ids| {
let span_recorder = &span_recorder;
let cached_table = Arc::clone(&cached_table);
let cached_partition = cached_partitions
.get(&cached_parquet_file.partition_id)
.expect("filter files down to existing partitions");
async move {
let span = span_recorder.child_span("new_chunk");
self.new_chunk(cached_table, cached_parquet_file, cached_partition, span)
.await
let schema = self
.catalog_cache
.projected_schema()
.get(
cached_table,
column_ids.clone(),
span_recorder.child_span("cache GET projected schema"),
)
.await;
(column_ids, schema)
}
})
.buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
.filter_map(|x| async { x })
.collect()
.collect::<HashMap<_, _>>()
.await
};
{
let _span_recorder = span_recorder.child("finalize chunks");
files
.into_iter()
.map(|file| {
let cached_table = Arc::clone(&cached_table);
let schema = projections
.get(&file.col_list)
.expect("looked up all projections")
.clone();
let cached_partition = cached_partitions
.get(&file.file.partition_id)
.expect("filter files down to existing partitions");
self.new_chunk(cached_table, file, schema, cached_partition)
})
.collect()
}
}
async fn new_chunk(
fn new_chunk(
&self,
cached_table: Arc<CachedTable>,
parquet_file: Arc<ParquetFile>,
parquet_file: PreparedParquetFile,
schema: Schema,
cached_partition: &CachedPartition,
span: Option<Span>,
) -> Option<QuerierParquetChunk> {
let span_recorder = SpanRecorder::new(span);
let parquet_file_cols: HashSet<ColumnId> =
parquet_file.column_set.iter().copied().collect();
let partition_sort_key = cached_partition
.sort_key
.as_ref()
.expect("partition sort key should be set when a parquet file exists");
) -> QuerierParquetChunk {
// NOTE: Because we've looked up the sort key AFTER the namespace schema, it may contain columns for which we
// don't have any schema information yet. This is OK because we've ensured that all file columns are known
// withing the schema and if a column is NOT part of the file, it will also not be part of the chunk sort
// key, so we have consistency here.
// calculate schema
// IMPORTANT: Do NOT use the sort key to list columns because the sort key only contains primary-key columns.
// NOTE: The schema that we calculate here may have a different column order than the actual parquet file. This
// NOTE: The schema that we've projected here may have a different column order than the actual parquet file. This
// is OK because the IOx parquet reader can deal with that (see #4921).
let column_ids: Vec<_> = cached_table
.column_id_map
.keys()
.filter(|id| parquet_file_cols.contains(id))
.copied()
.collect();
let schema = self
.catalog_cache
.projected_schema()
.get(
Arc::clone(&cached_table),
column_ids,
span_recorder.child_span("cache GET projected schema"),
)
.await;
// calculate sort key
let partition_sort_key = cached_partition
.sort_key
.as_ref()
.expect("partition sort key should be set when a parquet file exists");
let sort_key = SortKey::from_columns(
partition_sort_key
.column_order
.iter()
.filter(|c_id| parquet_file_cols.contains(c_id))
.filter(|c_id| parquet_file.col_set.contains(*c_id))
.filter_map(|c_id| cached_table.column_id_map.get(c_id))
.cloned(),
);
@ -225,27 +166,59 @@ impl ChunkAdapter {
"Sort key can never be empty because there should at least be a time column",
);
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.file.id.get() as _));
let order = ChunkOrder::new(parquet_file.max_l0_created_at.get());
let order = ChunkOrder::new(parquet_file.file.max_l0_created_at.get());
let meta = Arc::new(QuerierParquetChunkMeta {
chunk_id,
order,
sort_key: Some(sort_key),
partition_id: parquet_file.partition_id,
partition_id: parquet_file.file.partition_id,
});
let parquet_chunk = Arc::new(ParquetChunk::new(
parquet_file,
parquet_file.file,
schema,
self.catalog_cache.parquet_store(),
));
Some(QuerierParquetChunk::new(
QuerierParquetChunk::new(
parquet_chunk,
meta,
Arc::clone(&cached_partition.column_ranges),
))
)
}
}
/// [`ParquetFile`] with some additional fields.
struct PreparedParquetFile {
/// The parquet file as received from the catalog.
file: Arc<ParquetFile>,
/// The set of columns in this file.
col_set: HashSet<ColumnId>,
/// The columns in this file as ordered in the schema.
col_list: Box<[ColumnId]>,
}
impl PreparedParquetFile {
fn new(file: Arc<ParquetFile>, cached_table: &CachedTable) -> Self {
let col_set: HashSet<ColumnId> = file
.column_set
.iter()
.filter(|id| cached_table.column_id_map.contains_key(*id))
.copied()
.collect();
let mut col_list = col_set.iter().copied().collect::<Box<[ColumnId]>>();
col_list.sort();
Self {
file,
col_set,
col_list,
}
}
}

View File

@ -97,12 +97,9 @@ impl QuerierParquetChunk {
pub mod tests {
use std::collections::HashMap;
use crate::{
cache::{
namespace::{CachedNamespace, CachedTable},
CatalogCache,
},
table::MetricPruningObserver,
use crate::cache::{
namespace::{CachedNamespace, CachedTable},
CatalogCache,
};
use super::*;
@ -116,7 +113,6 @@ pub mod tests {
};
use iox_tests::{TestCatalog, TestParquetFileBuilder};
use metric::{Attributes, Observation, RawReporter};
use predicate::Predicate;
use schema::{builder::SchemaBuilder, sort::SortKeyBuilder};
use test_helpers::maybe_start_logging;
use tokio::runtime::Handle;
@ -258,8 +254,6 @@ pub mod tests {
.new_chunks(
Arc::clone(&self.cached_table),
vec![Arc::clone(&self.parquet_file)].into(),
&Predicate::new(),
MetricPruningObserver::new_unregistered(),
&cached_partitions,
None,
)

View File

@ -24,7 +24,6 @@ use trace::span::{Span, SpanRecorder};
use uuid::Uuid;
pub use self::query_access::metrics::PruneMetrics;
pub(crate) use self::query_access::MetricPruningObserver;
mod query_access;
@ -272,8 +271,6 @@ impl QuerierTable {
.new_chunks(
Arc::clone(cached_table),
Arc::clone(&parquet_files.files),
predicate,
MetricPruningObserver::new(Arc::clone(&self.prune_metrics)),
&cached_partitions,
span_recorder.child_span("new_chunks"),
)

View File

@ -124,9 +124,4 @@ impl PruneMetrics {
could_not_prune_df,
}
}
#[cfg(test)]
pub(crate) fn new_unregistered() -> Self {
Self::new(&metric::Registry::new())
}
}

View File

@ -168,16 +168,6 @@ impl MetricPruningObserver {
pub(crate) fn new(metrics: Arc<PruneMetrics>) -> Self {
Self { metrics }
}
#[cfg(test)]
pub(crate) fn new_unregistered() -> Self {
Self::new(Arc::new(PruneMetrics::new_unregistered()))
}
/// Called when pruning a chunk before fully creating the chunk structure
pub(crate) fn was_pruned_early(&self, row_count: u64, size_estimate: u64) {
self.metrics.pruned_early.inc(1, row_count, size_estimate);
}
}
impl PruningObserver for MetricPruningObserver {