feat: sub-traces for `create_chunks` (#7148)
In one prod case the majority of this time was NOT spent on creating the child chunks. I suspect that the summary creation and the string cloning involved there are quite slow. So let's have slightly more detailed tracing and see.
parent e7369449f8
commit 07b7107f9a
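The change wraps each phase of `create_chunks` in a scoped child span, so the trace breaks the total time down per phase. The mechanism is RAII: a child recorder is bound at the top of a block and reports when it is dropped at the block's closing brace. A minimal self-contained sketch of the idea (this `SpanRecorder` is a stand-in for illustration, not the IOx type):

    use std::time::Instant;

    // Stand-in recorder: reports elapsed time when dropped.
    struct SpanRecorder {
        name: String,
        start: Instant,
    }

    impl SpanRecorder {
        fn new(name: &str) -> Self {
            Self { name: name.to_owned(), start: Instant::now() }
        }

        // Open a sub-span; it ends when the returned recorder is dropped.
        fn child(&self, name: &str) -> SpanRecorder {
            SpanRecorder::new(&format!("{}/{}", self.name, name))
        }
    }

    impl Drop for SpanRecorder {
        fn drop(&mut self) {
            println!("span {} took {:?}", self.name, self.start.elapsed());
        }
    }

    fn main() {
        let span_recorder = SpanRecorder::new("create_chunks");

        let _summaries: Vec<u64> = {
            // Lives until the closing brace, so only this phase is measured.
            let _span_recorder = span_recorder.child("create basic summaries");
            (0..1_000).collect()
        };
    }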
@@ -77,29 +77,37 @@ impl ChunkAdapter {
     ) -> Vec<QuerierParquetChunk> {
         let span_recorder = SpanRecorder::new(span);
 
-        let basic_summaries: Vec<_> = files
-            .iter()
-            .map(|p| {
-                Arc::new(create_basic_summary(
-                    p.row_count as u64,
-                    &cached_table.schema,
-                    TimestampMinMax {
-                        min: p.min_time.get(),
-                        max: p.max_time.get(),
-                    },
-                ))
-            })
-            .collect();
+        let basic_summaries: Vec<_> = {
+            let _span_recorder = span_recorder.child("create basic summaries");
+
+            files
+                .iter()
+                .map(|p| {
+                    Arc::new(create_basic_summary(
+                        p.row_count as u64,
+                        &cached_table.schema,
+                        TimestampMinMax {
+                            min: p.min_time.get(),
+                            max: p.max_time.get(),
+                        },
+                    ))
+                })
+                .collect()
+        };
 
         // Prune on the most basic summary data (timestamps and column names) before trying to fully load the chunks
-        let keeps = match prune_summaries(&cached_table.schema, &basic_summaries, predicate) {
-            Ok(keeps) => keeps,
-            Err(reason) => {
-                // Ignore pruning failures here - the chunk pruner should have already logged them.
-                // Just skip pruning and gather all the metadata. We have another chance to prune them
-                // once all the metadata is available
-                debug!(?reason, "Could not prune before metadata fetch");
-                vec![true; basic_summaries.len()]
-            }
-        };
+        let keeps = {
+            let _span_recorder = span_recorder.child("prune summaries");
+
+            match prune_summaries(&cached_table.schema, &basic_summaries, predicate) {
+                Ok(keeps) => keeps,
+                Err(reason) => {
+                    // Ignore pruning failures here - the chunk pruner should have already logged them.
+                    // Just skip pruning and gather all the metadata. We have another chance to prune them
+                    // once all the metadata is available
+                    debug!(?reason, "Could not prune before metadata fetch");
+                    vec![true; basic_summaries.len()]
+                }
+            }
+        };
 
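A detail that is easy to miss in the new blocks: the guard is bound as `_span_recorder`, not discarded with a bare `_`. In Rust, `let _ = expr;` drops the value immediately, which would close the child span before the measured work even runs; an underscore-prefixed named binding lives until the end of the block. A small illustration with a toy guard type (names are made up):

    struct Guard(&'static str);

    impl Drop for Guard {
        fn drop(&mut self) {
            println!("span {} closed", self.0);
        }
    }

    fn main() {
        {
            let _ = Guard("a"); // dropped right here
            println!("work under a"); // span "a" is already closed
        }
        {
            let _guard = Guard("b"); // kept alive to end of block
            println!("work under b"); // span "b" is still open
        } // span "b" closes here
    }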
@@ -121,23 +129,31 @@ impl ChunkAdapter {
 
         // de-correlate parquet files so that subsequent items likely don't block/wait on the same cache lookup
         // (they are likely ordered by partition)
-        let mut rng = StdRng::seed_from_u64(cached_table.id.get() as u64);
-        parquet_files.shuffle(&mut rng);
+        {
+            let _span_recorder = span_recorder.child("shuffle order");
 
-        futures::stream::iter(parquet_files)
-            .map(|cached_parquet_file| {
-                let span_recorder = &span_recorder;
-                let cached_table = Arc::clone(&cached_table);
-                async move {
-                    let span = span_recorder.child_span("new_chunk");
-                    self.new_chunk(cached_table, cached_parquet_file, span)
-                        .await
-                }
-            })
-            .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
-            .filter_map(|x| async { x })
-            .collect()
-            .await
+            let mut rng = StdRng::seed_from_u64(cached_table.id.get() as u64);
+            parquet_files.shuffle(&mut rng);
+        }
+
+        {
+            let _span_recorder = span_recorder.child("create individual chunks");
+
+            futures::stream::iter(parquet_files)
+                .map(|cached_parquet_file| {
+                    let span_recorder = &span_recorder;
+                    let cached_table = Arc::clone(&cached_table);
+                    async move {
+                        let span = span_recorder.child_span("new_chunk");
+                        self.new_chunk(cached_table, cached_parquet_file, span)
+                            .await
+                    }
+                })
+                .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
+                .filter_map(|x| async { x })
+                .collect()
+                .await
+        }
     }
 
     async fn new_chunk(
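The "create individual chunks" block keeps the existing bounded-concurrency pattern from the `futures` crate: map every file to an async job, run at most `CONCURRENT_CHUNK_CREATION_JOBS` of them at once via `buffer_unordered`, and discard jobs that return `None` via `filter_map`. A runnable sketch of the same pattern with toy inputs (the item type, the limit, and the tokio runtime are assumptions for the example):

    use futures::StreamExt;

    const CONCURRENT_JOBS: usize = 4; // stand-in for CONCURRENT_CHUNK_CREATION_JOBS

    #[tokio::main]
    async fn main() {
        let ids = vec![1u64, 2, 3, 4, 5, 6, 7, 8];

        let results: Vec<u64> = futures::stream::iter(ids)
            .map(|id| async move {
                // Stand-in for `new_chunk`: pretend odd ids produce no chunk.
                if id % 2 == 0 { Some(id * 10) } else { None }
            })
            // At most CONCURRENT_JOBS futures in flight; results arrive out of order.
            .buffer_unordered(CONCURRENT_JOBS)
            // Keep only the jobs that produced a value.
            .filter_map(|x| async { x })
            .collect()
            .await;

        println!("{results:?}"); // e.g. [20, 40, 60, 80] in some order
    }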