feat: filter chunks before requesting read buffers (#4996)

Fixes #4976.
pull/24376/head
Marco Neumann 2022-07-01 10:59:07 +02:00 committed by GitHub
parent 09aaf18fb4
commit 016dd93d9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 119 additions and 27 deletions

View File

@ -33,6 +33,11 @@ impl ParquetChunk {
} }
} }
/// Return raw parquet file metadata.
pub fn parquet_file(&self) -> &Arc<ParquetFile> {
&self.parquet_file
}
/// Return object store id /// Return object store id
pub fn object_store_id(&self) -> Uuid { pub fn object_store_id(&self) -> Uuid {
self.parquet_file.object_store_id self.parquet_file.object_store_id

View File

@ -46,15 +46,11 @@ impl CachedParquetFiles {
} }
/// return the underying files as a new Vec /// return the underying files as a new Vec
pub fn vec(&self) -> Vec<Arc<ParquetFile>> { #[cfg(test)]
fn vec(&self) -> Vec<Arc<ParquetFile>> {
self.files.as_ref().clone() self.files.as_ref().clone()
} }
/// return the number of cached files
pub fn len(&self) -> usize {
self.files.len()
}
/// Estimate the memory consumption of this object and its contents /// Estimate the memory consumption of this object and its contents
fn size(&self) -> usize { fn size(&self) -> usize {
// simplify accounting by ensuring len and capacity of vector are the same // simplify accounting by ensuring len and capacity of vector are the same

View File

@ -208,6 +208,11 @@ impl QuerierParquetChunk {
} }
} }
/// Return raw parquet file metadata.
pub fn parquet_file(&self) -> &Arc<ParquetFile> {
self.parquet_chunk.parquet_file()
}
/// Set delete predicates of the given chunk. /// Set delete predicates of the given chunk.
pub fn with_delete_predicates(self, delete_predicates: Vec<Arc<DeletePredicate>>) -> Self { pub fn with_delete_predicates(self, delete_predicates: Vec<Arc<DeletePredicate>>) -> Self {
Self { Self {

View File

@ -6,10 +6,10 @@ use crate::{
IngesterConnection, IngesterConnection,
}; };
use data_types::{KafkaPartition, PartitionId, TableId}; use data_types::{KafkaPartition, PartitionId, TableId};
use futures::join; use futures::{join, StreamExt, TryStreamExt};
use iox_query::{provider::ChunkPruner, QueryChunk}; use iox_query::{provider::ChunkPruner, QueryChunk};
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
use predicate::Predicate; use predicate::{Predicate, PredicateMatch};
use schema::Schema; use schema::Schema;
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use std::{ use std::{
@ -163,6 +163,41 @@ impl QuerierTable {
catalog_cache.tombstone().get(self.id()) catalog_cache.tombstone().get(self.id())
); );
// filter out parquet files early
let n_parquet_files_pre_filter = parquet_files.files.len();
let parquet_files: Vec<_> = futures::stream::iter(parquet_files.files.iter())
.filter_map(|cached_parquet_file| {
let chunk_adapter = Arc::clone(&self.chunk_adapter);
async move {
chunk_adapter
.new_parquet_chunk(
Arc::clone(&self.namespace_name),
Arc::clone(cached_parquet_file),
)
.await
}
})
.filter_map(|chunk| {
let res = chunk
.apply_predicate_to_metadata(predicate)
.map(|pmatch| {
let keep = !matches!(pmatch, PredicateMatch::Zero);
keep.then(|| chunk)
})
.transpose();
async move { res }
})
.try_collect()
.await
.unwrap();
debug!(
namespace=%self.namespace_name,
table_name=%self.table_name(),
n_parquet_files_pre_filter,
n_parquet_files_post_filter=parquet_files.len(),
"Applied predicate-based filter to parquet file"
);
self.reconciler self.reconciler
.reconcile(partitions, tombstones.to_vec(), parquet_files) .reconcile(partitions, tombstones.to_vec(), parquet_files)
.await .await
@ -336,25 +371,28 @@ mod tests {
assert!(querier_table.chunks().await.unwrap().is_empty()); assert!(querier_table.chunks().await.unwrap().is_empty());
let file111 = partition11 let file111 = partition11
.create_parquet_file_with_min_max("table1 foo=1 11", 1, 2, now_nanos(), now_nanos()) .create_parquet_file_with_min_max("table1 foo=1 11", 1, 2, 11, 11)
.await; .await;
let file112 = partition11 let file112 = partition11
.create_parquet_file_with_min_max("table1 foo=2 22", 3, 4, now_nanos(), now_nanos()) .create_parquet_file_with_min_max("table1 foo=2 22", 3, 4, 22, 22)
.await; .await;
let file113 = partition11 let file113 = partition11
.create_parquet_file_with_min_max("table1 foo=3 33", 5, 6, now_nanos(), now_nanos()) .create_parquet_file_with_min_max("table1 foo=3 33", 5, 6, 33, 33)
.await; .await;
let file114 = partition11 let file114 = partition11
.create_parquet_file_with_min_max("table1 foo=4 44", 7, 8, now_nanos(), now_nanos()) .create_parquet_file_with_min_max("table1 foo=4 44", 7, 8, 44, 44)
.await; .await;
let file115 = partition11 let file115 = partition11
.create_parquet_file_with_min_max("table1 foo=5 55", 9, 10, now_nanos(), now_nanos()) .create_parquet_file_with_min_max("table1 foo=5 55", 9, 10, 55, 55)
.await; .await;
let file121 = partition12 let file121 = partition12
.create_parquet_file_with_min_max("table1 foo=5 55", 1, 2, now_nanos(), now_nanos()) .create_parquet_file_with_min_max("table1 foo=5 55", 1, 2, 55, 55)
.await;
let _file122 = partition12
.create_parquet_file_with_min_max("table1 foo=10 100", 1, 2, 100, 100)
.await; .await;
let _file211 = partition21 let _file211 = partition21
.create_parquet_file_with_min_max("table2 foo=6 66", 1, 2, now_nanos(), now_nanos()) .create_parquet_file_with_min_max("table2 foo=6 66", 1, 2, 66, 66)
.await; .await;
file111.flag_for_delete().await; file111.flag_for_delete().await;
@ -377,8 +415,10 @@ mod tests {
// now we have some files // now we have some files
// this contains all files except for: // this contains all files except for:
// - file111: marked for delete // - file111: marked for delete
// - file122: filtered by predicate
// - file221: wrong table // - file221: wrong table
let mut chunks = querier_table.chunks().await.unwrap(); let pred = Predicate::new().with_range(0, 100);
let mut chunks = querier_table.chunks_with_predicate(&pred).await.unwrap();
chunks.sort_by_key(|c| c.id()); chunks.sort_by_key(|c| c.id());
assert_eq!(chunks.len(), 5); assert_eq!(chunks.len(), 5);
@ -821,7 +861,14 @@ mod tests {
/// Invokes querier_table.chunks modeling the ingester sending the partitions in this table /// Invokes querier_table.chunks modeling the ingester sending the partitions in this table
async fn chunks(&self) -> Result<Vec<Arc<dyn QueryChunk>>> { async fn chunks(&self) -> Result<Vec<Arc<dyn QueryChunk>>> {
let pred = Predicate::default(); let pred = Predicate::default();
self.chunks_with_predicate(&pred).await
}
/// Invokes querier_table.chunks modeling the ingester sending the partitions in this table
async fn chunks_with_predicate(
&self,
pred: &Predicate,
) -> Result<Vec<Arc<dyn QueryChunk>>> {
self.querier_table self.querier_table
.ingester_connection .ingester_connection
.as_ref() .as_ref()
@ -831,7 +878,7 @@ mod tests {
.unwrap() .unwrap()
.next_response(Ok(self.ingester_partitions.clone())); .next_response(Ok(self.ingester_partitions.clone()));
self.querier_table.chunks(&pred).await self.querier_table.chunks(pred).await
} }
} }

View File

@ -14,7 +14,6 @@ use std::{
}; };
use crate::{ use crate::{
cache::parquet_file::CachedParquetFiles,
chunk::{ChunkAdapter, QuerierParquetChunk, QuerierRBChunk}, chunk::{ChunkAdapter, QuerierParquetChunk, QuerierRBChunk},
ingester::IngesterChunk, ingester::IngesterChunk,
tombstone::QuerierTombstone, tombstone::QuerierTombstone,
@ -56,7 +55,7 @@ impl Reconciler {
&self, &self,
ingester_partitions: Vec<IngesterPartition>, ingester_partitions: Vec<IngesterPartition>,
tombstones: Vec<Arc<Tombstone>>, tombstones: Vec<Arc<Tombstone>>,
parquet_files: Arc<CachedParquetFiles>, parquet_files: Vec<QuerierParquetChunk>,
) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> { ) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
let mut chunks = self let mut chunks = self
.build_chunks_from_parquet(&ingester_partitions, tombstones, parquet_files) .build_chunks_from_parquet(&ingester_partitions, tombstones, parquet_files)
@ -78,7 +77,7 @@ impl Reconciler {
&self, &self,
ingester_partitions: &[IngesterPartition], ingester_partitions: &[IngesterPartition],
tombstones: Vec<Arc<Tombstone>>, tombstones: Vec<Arc<Tombstone>>,
parquet_files: Arc<CachedParquetFiles>, parquet_files: Vec<QuerierParquetChunk>,
) -> Result<Vec<Box<dyn UpdatableQuerierChunk>>, ReconcileError> { ) -> Result<Vec<Box<dyn UpdatableQuerierChunk>>, ReconcileError> {
debug!( debug!(
namespace=%self.namespace_name(), namespace=%self.namespace_name(),
@ -104,10 +103,10 @@ impl Reconciler {
.push(tombstone); .push(tombstone);
} }
let parquet_files = filter_parquet_files(ingester_partitions, parquet_files.vec())?; let parquet_files = filter_parquet_files(ingester_partitions, parquet_files)?;
debug!( debug!(
parquet_ids=?parquet_files.iter().map(|f| f.id).collect::<Vec<_>>(), parquet_ids=?parquet_files.iter().map(|f| f.parquet_file().id).collect::<Vec<_>>(),
namespace=%self.namespace_name(), namespace=%self.namespace_name(),
table_name=%self.table_name(), table_name=%self.table_name(),
"Parquet files after filtering" "Parquet files after filtering"
@ -117,10 +116,13 @@ impl Reconciler {
let chunks_from_parquet: Vec<_> = futures::stream::iter(parquet_files) let chunks_from_parquet: Vec<_> = futures::stream::iter(parquet_files)
// use `.map` instead of `then` or `filter_map` because the next step is `buffered_unordered` and that // use `.map` instead of `then` or `filter_map` because the next step is `buffered_unordered` and that
// requires a stream of futures // requires a stream of futures
.map(|cached_parquet_file| async { .map(|chunk| {
let parquet_file = Arc::clone(chunk.parquet_file());
async {
self.chunk_adapter self.chunk_adapter
.new_rb_chunk(Arc::clone(&self.namespace_name), cached_parquet_file) .new_rb_chunk(Arc::clone(&self.namespace_name), parquet_file)
.await .await
}
}) })
// fetch multiple RB chunks in parallel to hide latency // fetch multiple RB chunks in parallel to hide latency
// TODO(marco): expose this as a config // TODO(marco): expose this as a config

View File

@ -1,6 +1,6 @@
//! Interface for reconciling Ingester and catalog state //! Interface for reconciling Ingester and catalog state
use crate::ingester::IngesterPartition; use crate::{chunk::QuerierParquetChunk, ingester::IngesterPartition};
use data_types::{ParquetFile, PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId}; use data_types::{ParquetFile, PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId};
use std::{ops::Deref, sync::Arc}; use std::{ops::Deref, sync::Arc};
@ -76,6 +76,20 @@ impl ParquetFileInfo for Arc<ParquetFile> {
} }
} }
impl ParquetFileInfo for QuerierParquetChunk {
fn partition_id(&self) -> PartitionId {
self.parquet_file().partition_id
}
fn min_sequence_number(&self) -> SequenceNumber {
self.parquet_file().min_sequence_number
}
fn max_sequence_number(&self) -> SequenceNumber {
self.parquet_file().max_sequence_number
}
}
/// Information about a tombstone. /// Information about a tombstone.
/// ///
/// This is mostly the same as [`Tombstone`] but allows easier mocking. /// This is mostly the same as [`Tombstone`] but allows easier mocking.

View File

@ -61,3 +61,23 @@
| | IOxReadFilterNode: table_name=h2o, chunks=2 predicate=Predicate | | | IOxReadFilterNode: table_name=h2o, chunks=2 predicate=Predicate |
| | | | | |
+---------------+---------------------------------------------------------------------------------------+ +---------------+---------------------------------------------------------------------------------------+
-- SQL: EXPLAIN SELECT * from h2o where time >= to_timestamp('1970-01-01T00:00:00.000000250+00:00');
+---------------+------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #h2o.city, #h2o.other_temp, #h2o.state, #h2o.temp, #h2o.time |
| | Filter: #h2o.time >= TimestampNanosecond(250, None) |
| | TableScan: h2o projection=[city, other_temp, state, temp, time], partial_filters=[#h2o.time >= TimestampNanosecond(250, None)] |
| physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: time@4 >= 250 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | UnionExec |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | UnionExec |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate exprs: [#time >= TimestampNanosecond(250, None)] |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate exprs: [#time >= TimestampNanosecond(250, None)] |
| | IOxReadFilterNode: table_name=h2o, chunks=2 predicate=Predicate exprs: [#time >= TimestampNanosecond(250, None)] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -13,3 +13,6 @@ EXPLAIN SELECT * from h2o;
-- IOX_COMPARE: sorted -- IOX_COMPARE: sorted
select temp, other_temp, time from h2o; select temp, other_temp, time from h2o;
EXPLAIN select temp, other_temp, time from h2o; EXPLAIN select temp, other_temp, time from h2o;
-- early pruning
EXPLAIN SELECT * from h2o where time >= to_timestamp('1970-01-01T00:00:00.000000250+00:00');