feat: concurrent chunk creation (#5646)

Create chunks in the querier concurrently after we've pre-filtered them.
Chunk creation may still require a bit of cached information (e.g. the
partition sort key), and we can easily fetch that concurrently instead
of sequentially.
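
The shape of the change is the usual bounded-concurrency pattern from the
`futures` crate: build the chunk-creation futures lazily from the
pre-filtered list, drive at most N of them at once with `buffer_unordered`,
and drop the ones that produce nothing. Below is a minimal, self-contained
sketch of that shape (assuming the `futures` and `tokio` crates; the
`make_chunk` helper, the fake partition list, and the limit of 10 are
illustrative stand-ins, not the querier's real API):

    use futures::StreamExt;

    /// Illustrative stand-in for chunk creation that still needs some
    /// per-partition cached data (e.g. the partition sort key).
    async fn make_chunk(partition_id: u64) -> Option<String> {
        // Pretend the cache lookup sometimes yields no chunk.
        (partition_id % 2 == 0).then(|| format!("chunk for partition {partition_id}"))
    }

    #[tokio::main]
    async fn main() {
        const CONCURRENT_CHUNK_CREATION_JOBS: usize = 10;

        // Files/partitions that survived the pre-filter step.
        let partitions: Vec<u64> = (0..100).collect();

        let chunks: Vec<String> = futures::stream::iter(partitions)
            // Map each item to a (not yet running) future.
            .map(make_chunk)
            // Run at most N of those futures concurrently,
            // yielding results in completion order.
            .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
            // Keep only the futures that actually produced a chunk.
            .filter_map(|maybe_chunk| async move { maybe_chunk })
            .collect()
            .await;

        println!("created {} chunks", chunks.len());
    }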

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-09-15 12:30:02 +00:00 committed by GitHub
parent 1bac7792db
commit f7b6f81fe1
1 changed file with 17 additions and 6 deletions

@@ -31,6 +31,11 @@ mod state_reconciler;
 #[cfg(test)]
 mod test_util;
 
+/// Number of concurrent chunk creation jobs.
+///
+/// This is mostly to fetch per-partition data concurrently.
+const CONCURRENT_CHUNK_CREATION_JOBS: usize = 10;
+
 #[derive(Debug, Snafu)]
 #[allow(clippy::large_enum_variant)]
 pub enum Error {
@@ -294,26 +299,32 @@ impl QuerierTable {
         let early_pruning_observer =
             &MetricPruningObserver::new(Arc::clone(&self.prune_metrics));
 
-        futures::stream::iter(parquet_files.files.iter().zip(keeps))
-            .filter_map(|(cached_parquet_file, keep)| async move {
+        futures::stream::iter(parquet_files.files.iter().cloned().zip(keeps))
+            .filter(|(cached_parquet_file, keep)| {
                 if !keep {
                     early_pruning_observer.was_pruned_early(
                         cached_parquet_file.row_count as u64,
                         cached_parquet_file.file_size_bytes as u64,
                     );
-                    return None;
                 }
-                let chunk_adapter = Arc::clone(&self.chunk_adapter);
+                let keep = *keep;
+                async move { keep }
+            })
+            .map(|(cached_parquet_file, _keep)| async move {
                 let span = span_recorder.child_span("new_chunk");
-                chunk_adapter
+                self.chunk_adapter
                     .new_chunk(
                         cached_table,
                         Arc::clone(self.table_name()),
-                        Arc::clone(cached_parquet_file),
+                        cached_parquet_file,
                         span,
                     )
                     .await
             })
+            .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
+            .filter_map(|x| async { x })
             .collect()
             .await
     }