diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index ff180c85d0..6fb9e6c898 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -31,6 +31,11 @@ mod state_reconciler; #[cfg(test)] mod test_util; +/// Number of concurrent chunk creation jobs. +/// +/// This is mostly to fetch per-partition data concurrently. +const CONCURRENT_CHUNK_CREATION_JOBS: usize = 10; + #[derive(Debug, Snafu)] #[allow(clippy::large_enum_variant)] pub enum Error { @@ -294,26 +299,32 @@ impl QuerierTable { let early_pruning_observer = &MetricPruningObserver::new(Arc::clone(&self.prune_metrics)); - futures::stream::iter(parquet_files.files.iter().zip(keeps)) - .filter_map(|(cached_parquet_file, keep)| async move { + + futures::stream::iter(parquet_files.files.iter().cloned().zip(keeps)) + .filter(|(cached_parquet_file, keep)| { if !keep { early_pruning_observer.was_pruned_early( cached_parquet_file.row_count as u64, cached_parquet_file.file_size_bytes as u64, ); - return None; } - let chunk_adapter = Arc::clone(&self.chunk_adapter); + + let keep = *keep; + async move { keep } + }) + .map(|(cached_parquet_file, _keep)| async move { let span = span_recorder.child_span("new_chunk"); - chunk_adapter + self.chunk_adapter .new_chunk( cached_table, Arc::clone(self.table_name()), - Arc::clone(cached_parquet_file), + cached_parquet_file, span, ) .await }) + .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS) + .filter_map(|x| async { x }) .collect() .await }