refactor: Reduce boilerplate calling `SpanRecorder::child` (#5180)

* refactor: call SpanRecorder::child

* refactor: update more locations

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2022-07-22 07:11:45 -04:00 committed by GitHub
parent 5a0af921c8
commit 495bbe48f2
7 changed files with 33 additions and 77 deletions
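
In short, the change replaces the repeated `span_recorder.span().map(|span| span.child(name))` pattern at each call site with a single `span_recorder.child_span(name)` call. Below is a minimal, self-contained sketch of the idea; the `Span` and `SpanRecorder` types here are simplified stand-ins for the real ones in the repository's trace crate, not its actual definitions:

// Simplified stand-ins for the real tracing types; illustration only.
#[derive(Clone, Debug, PartialEq)]
struct Span {
    name: &'static str,
}

impl Span {
    fn child(&self, name: &'static str) -> Span {
        Span { name }
    }
}

struct SpanRecorder {
    span: Option<Span>,
}

impl SpanRecorder {
    // Mirrors the existing accessor: a reference to the active span, if any.
    fn span(&self) -> Option<&Span> {
        self.span.as_ref()
    }

    // The helper this commit introduces: a child span if one is active,
    // `None` otherwise.
    fn child_span(&self, name: &'static str) -> Option<Span> {
        self.span.as_ref().map(|span| span.child(name))
    }
}

fn main() {
    let recorder = SpanRecorder {
        span: Some(Span { name: "root" }),
    };

    // Before: every call site repeated the Option plumbing by hand.
    let before = recorder.span().map(|span| span.child("cache GET read_buffer"));

    // After: one call does the same thing.
    let after = recorder.child_span("cache GET read_buffer");
    assert_eq!(before, after);

    // With no active span, `child_span` simply propagates `None`.
    let idle = SpanRecorder { span: None };
    assert!(idle.child_span("anything").is_none());
}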

View File

@@ -252,9 +252,7 @@ impl QuerierChunk {
.read_buffer()
.peek(
meta.parquet_file_id,
-span_recorder
-    .span()
-    .map(|span| span.child("cache PEEK read_buffer")),
+span_recorder.child_span("cache PEEK read_buffer"),
)
.await
}
@@ -267,9 +265,7 @@ impl QuerierChunk {
Arc::clone(parquet_chunk.parquet_file()),
Arc::clone(&schema),
store.clone(),
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET read_buffer")),
+span_recorder.child_span("cache GET read_buffer"),
)
.await,
)
@@ -378,7 +374,7 @@ impl ChunkAdapter {
.chunk_parts(
namespace_name,
Arc::clone(&parquet_file),
-span_recorder.span().map(|span| span.child("chunk_parts")),
+span_recorder.child_span("chunk_parts"),
)
.await?;
@@ -401,9 +397,7 @@ impl ChunkAdapter {
Arc::clone(&self.catalog_cache),
self.store.clone(),
load_settings,
-span_recorder
-    .span()
-    .map(|span| span.child("QuerierChunk::new")),
+span_recorder.child_span("QuerierChunk::new"),
)
.await,
)
@@ -424,9 +418,7 @@ impl ChunkAdapter {
.table()
.name(
parquet_file.table_id,
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET table name")),
+span_recorder.child_span("cache GET table name"),
)
.await?;
let namespace_schema = self
@@ -435,9 +427,7 @@ impl ChunkAdapter {
.schema(
namespace_name,
&[(&table_name, &file_column_ids)],
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET namespace schema")),
+span_recorder.child_span("cache GET namespace schema"),
)
.await?;
let table_schema_catalog = namespace_schema.tables.get(table_name.as_ref())?;
@@ -473,9 +463,7 @@ impl ChunkAdapter {
.sort_key(
parquet_file.partition_id,
&relevant_pk_columns,
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET partition sort key")),
+span_recorder.child_span("cache GET partition sort key"),
)
.await;
let partition_sort_key_ref = partition_sort_key

View File

@@ -281,9 +281,7 @@ impl QueryChunk for QuerierChunk {
parquet_file,
schema,
store,
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET read_buffer")),
+span_recorder.child_span("cache GET read_buffer"),
)
.await;
stage.write().load_to_read_buffer(rb_chunk);

View File

@@ -161,9 +161,7 @@ impl QuerierDatabase {
Arc::clone(&name),
// we have no specific need for any tables or columns at this point, so nothing to cover
&[],
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET namespace schema")),
+span_recorder.child_span("cache GET namespace schema"),
)
.await?;
Some(Arc::new(QuerierNamespace::new(

View File

@@ -526,9 +526,7 @@ async fn execute(
table_name,
catalog_cache,
expected_schema,
-span_recorder
-    .span()
-    .map(|span| span.child("IngesterStreamDecoder")),
+span_recorder.child_span("IngesterStreamDecoder"),
);
for (msg, md) in messages {
decoder.register(msg, md).await?;
@@ -611,8 +609,7 @@ impl IngesterStreamDecoder {
current_partition.partition_id(),
&primary_key,
self.span_recorder
-    .span()
-    .map(|span| span.child("cache GET partition sort key")),
+    .child_span("cache GET partition sort key"),
)
.await;
let current_partition = current_partition.with_partition_sort_key(partition_sort_key);
@@ -652,8 +649,7 @@ impl IngesterStreamDecoder {
.sequencer_id(
partition_id,
self.span_recorder
-    .span()
-    .map(|span| span.child("cache GET partition sequencer ID")),
+    .child_span("cache GET partition sequencer ID"),
)
.await;

View File

@@ -168,23 +168,14 @@ impl QuerierTable {
// ask ingesters for data, also optimistically fetching catalog
// contents at the same time to pre-warm cache
let (partitions, _parquet_files, _tombstones) = join!(
-self.ingester_partitions(
-    predicate,
-    span_recorder
-        .span()
-        .map(|span| span.child("ingester partitions"))
-),
+self.ingester_partitions(predicate, span_recorder.child_span("ingester partitions")),
catalog_cache.parquet_file().get(
self.id(),
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET parquet_file (pre-warm"))
+span_recorder.child_span("cache GET parquet_file (pre-warm")
),
catalog_cache.tombstone().get(
self.id(),
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET tombstone (pre-warm)"))
+span_recorder.child_span("cache GET tombstone (pre-warm)")
),
);
@@ -206,16 +197,11 @@ impl QuerierTable {
let (parquet_files, tombstones) = join!(
catalog_cache.parquet_file().get(
self.id(),
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET parquet_file"))
+span_recorder.child_span("cache GET parquet_file")
),
-catalog_cache.tombstone().get(
-    self.id(),
-    span_recorder
-        .span()
-        .map(|span| span.child("cache GET tombstone"))
-)
+catalog_cache
+    .tombstone()
+    .get(self.id(), span_recorder.child_span("cache GET tombstone"))
);
// filter out parquet files early
@@ -223,7 +209,7 @@ impl QuerierTable {
let parquet_files: Vec<_> = futures::stream::iter(parquet_files.files.iter())
.filter_map(|cached_parquet_file| {
let chunk_adapter = Arc::clone(&self.chunk_adapter);
-let span = span_recorder.span().map(|span| span.child("new_chunk"));
+let span = span_recorder.child_span("new_chunk");
async move {
chunk_adapter
.new_chunk(
@@ -260,7 +246,7 @@ impl QuerierTable {
partitions,
tombstones.to_vec(),
parquet_files,
-span_recorder.span().map(|span| span.child("reconcile")),
+span_recorder.child_span("reconcile"),
)
.await
.context(StateFusionSnafu)
@@ -339,9 +325,7 @@ impl QuerierTable {
columns,
predicate,
Arc::clone(&self.schema),
-span_recorder
-    .span()
-    .map(|span| span.child("IngesterConnection partitions")),
+span_recorder.child_span("IngesterConnection partitions"),
)
.await
.context(GettingIngesterPartitionsSnafu);

View File

@@ -64,21 +64,14 @@ impl Reconciler {
&ingester_partitions,
tombstones,
parquet_files,
-span_recorder
-    .span()
-    .map(|span| span.child("build_chunks_from_parquet")),
+span_recorder.child_span("build_chunks_from_parquet"),
)
.await?;
chunks.extend(self.build_ingester_chunks(ingester_partitions));
debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation");
let chunks = self
-.sync_partition_sort_keys(
-    chunks,
-    span_recorder
-        .span()
-        .map(|span| span.child("sync_partition_sort_key")),
-)
+.sync_partition_sort_keys(chunks, span_recorder.child_span("sync_partition_sort_key"))
.await;
let chunks: Vec<Arc<dyn QueryChunk>> = chunks
@@ -185,9 +178,7 @@ impl Reconciler {
.exists(
chunk.meta().parquet_file_id(),
tombstone.tombstone_id(),
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET exists processed_tombstone")),
+span_recorder.child_span("cache GET exists processed_tombstone"),
)
.await
{
@@ -251,9 +242,7 @@ impl Reconciler {
.sort_key(
partition_id,
&columns,
-span_recorder
-    .span()
-    .map(|span| span.child("cache GET partition sort key")),
+span_recorder.child_span("cache GET partition sort key"),
)
.await;
sort_keys.insert(partition_id, sort_key);

View File

@@ -206,10 +206,7 @@ impl SpanRecorder {
/// If this `SpanRecorder` has a `Span`, creates a new child of that `Span` and
/// returns a `SpanRecorder` for it. Otherwise returns an empty `SpanRecorder`
pub fn child(&self, name: &'static str) -> Self {
-match &self.span {
-    Some(span) => Self::new(Some(span.child(name))),
-    None => Self::new(None),
-}
+Self::new(self.child_span(name))
}
/// Return a reference to the span contained in this SpanRecorder,
@@ -218,6 +215,12 @@ impl SpanRecorder {
self.span.as_ref()
}
+/// Return a child span of the specified name, if this SpanRecorder
+/// has an active span, `None` otherwise.
+pub fn child_span(&self, name: &'static str) -> Option<Span> {
+    self.span.as_ref().map(|span| span.child(name))
+}
/// Link this span to another context.
pub fn link(&mut self, other: &SpanContext) {
if let Some(span) = self.span.as_mut() {
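
With `child_span` in place, `child` reduces to `Self::new(self.child_span(name))`, so the `Option` handling lives in exactly one spot: callers that want a full `SpanRecorder` for a sub-operation keep using `child`, while the call sites updated above, which hand an `Option<Span>` to a cache or ingester API, call `child_span` directly.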