From 495bbe48f27bfec13aa62bdd6489a8a008c2d3f0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 22 Jul 2022 07:11:45 -0400 Subject: [PATCH] refactor: Reduce boiler plate calling `SpanRecorder::child` (#5180) * refactor: call SpanRecorder::child * refactor: update more locations Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- querier/src/chunk/mod.rs | 26 ++++++------------- querier/src/chunk/query_access.rs | 4 +-- querier/src/database.rs | 4 +-- querier/src/ingester/mod.rs | 10 +++----- querier/src/table/mod.rs | 36 ++++++++------------------- querier/src/table/state_reconciler.rs | 19 +++----------- trace/src/span.rs | 11 +++++--- 7 files changed, 33 insertions(+), 77 deletions(-) diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 3cc1c54fb2..9b2144f7a5 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -252,9 +252,7 @@ impl QuerierChunk { .read_buffer() .peek( meta.parquet_file_id, - span_recorder - .span() - .map(|span| span.child("cache PEEK read_buffer")), + span_recorder.child_span("cache PEEK read_buffer"), ) .await } @@ -267,9 +265,7 @@ impl QuerierChunk { Arc::clone(parquet_chunk.parquet_file()), Arc::clone(&schema), store.clone(), - span_recorder - .span() - .map(|span| span.child("cache GET read_buffer")), + span_recorder.child_span("cache GET read_buffer"), ) .await, ) @@ -378,7 +374,7 @@ impl ChunkAdapter { .chunk_parts( namespace_name, Arc::clone(&parquet_file), - span_recorder.span().map(|span| span.child("chunk_parts")), + span_recorder.child_span("chunk_parts"), ) .await?; @@ -401,9 +397,7 @@ impl ChunkAdapter { Arc::clone(&self.catalog_cache), self.store.clone(), load_settings, - span_recorder - .span() - .map(|span| span.child("QuerierChunk::new")), + span_recorder.child_span("QuerierChunk::new"), ) .await, ) @@ -424,9 +418,7 @@ impl ChunkAdapter { .table() .name( parquet_file.table_id, - span_recorder - .span() - .map(|span| span.child("cache GET table name")), + span_recorder.child_span("cache GET table name"), ) .await?; let namespace_schema = self @@ -435,9 +427,7 @@ impl ChunkAdapter { .schema( namespace_name, &[(&table_name, &file_column_ids)], - span_recorder - .span() - .map(|span| span.child("cache GET namespace schema")), + span_recorder.child_span("cache GET namespace schema"), ) .await?; let table_schema_catalog = namespace_schema.tables.get(table_name.as_ref())?; @@ -473,9 +463,7 @@ impl ChunkAdapter { .sort_key( parquet_file.partition_id, &relevant_pk_columns, - span_recorder - .span() - .map(|span| span.child("cache GET partition sort key")), + span_recorder.child_span("cache GET partition sort key"), ) .await; let partition_sort_key_ref = partition_sort_key diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs index 9392163718..f28da26c08 100644 --- a/querier/src/chunk/query_access.rs +++ b/querier/src/chunk/query_access.rs @@ -281,9 +281,7 @@ impl QueryChunk for QuerierChunk { parquet_file, schema, store, - span_recorder - .span() - .map(|span| span.child("cache GET read_buffer")), + span_recorder.child_span("cache GET read_buffer"), ) .await; stage.write().load_to_read_buffer(rb_chunk); diff --git a/querier/src/database.rs b/querier/src/database.rs index 62121bd210..636b8217ab 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -161,9 +161,7 @@ impl QuerierDatabase { Arc::clone(&name), // we have no specific need for any tables or columns at this point, so nothing to cover &[], - span_recorder - .span() - .map(|span| span.child("cache GET namespace schema")), + span_recorder.child_span("cache GET namespace schema"), ) .await?; Some(Arc::new(QuerierNamespace::new( diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 5edb2d7e6e..0f845624d8 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -526,9 +526,7 @@ async fn execute( table_name, catalog_cache, expected_schema, - span_recorder - .span() - .map(|span| span.child("IngesterStreamDecoder")), + span_recorder.child_span("IngesterStreamDecoder"), ); for (msg, md) in messages { decoder.register(msg, md).await?; @@ -611,8 +609,7 @@ impl IngesterStreamDecoder { current_partition.partition_id(), &primary_key, self.span_recorder - .span() - .map(|span| span.child("cache GET partition sort key")), + .child_span("cache GET partition sort key"), ) .await; let current_partition = current_partition.with_partition_sort_key(partition_sort_key); @@ -652,8 +649,7 @@ impl IngesterStreamDecoder { .sequencer_id( partition_id, self.span_recorder - .span() - .map(|span| span.child("cache GET partition sequencer ID")), + .child_span("cache GET partition sequencer ID"), ) .await; diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index de13ecd29f..9a9d64772b 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -168,23 +168,14 @@ impl QuerierTable { // ask ingesters for data, also optimistically fetching catalog // contents at the same time to pre-warm cache let (partitions, _parquet_files, _tombstones) = join!( - self.ingester_partitions( - predicate, - span_recorder - .span() - .map(|span| span.child("ingester partitions")) - ), + self.ingester_partitions(predicate, span_recorder.child_span("ingester partitions")), catalog_cache.parquet_file().get( self.id(), - span_recorder - .span() - .map(|span| span.child("cache GET parquet_file (pre-warm")) + span_recorder.child_span("cache GET parquet_file (pre-warm") ), catalog_cache.tombstone().get( self.id(), - span_recorder - .span() - .map(|span| span.child("cache GET tombstone (pre-warm)")) + span_recorder.child_span("cache GET tombstone (pre-warm)") ), ); @@ -206,16 +197,11 @@ impl QuerierTable { let (parquet_files, tombstones) = join!( catalog_cache.parquet_file().get( self.id(), - span_recorder - .span() - .map(|span| span.child("cache GET parquet_file")) + span_recorder.child_span("cache GET parquet_file") ), - catalog_cache.tombstone().get( - self.id(), - span_recorder - .span() - .map(|span| span.child("cache GET tombstone")) - ) + catalog_cache + .tombstone() + .get(self.id(), span_recorder.child_span("cache GET tombstone")) ); // filter out parquet files early @@ -223,7 +209,7 @@ impl QuerierTable { let parquet_files: Vec<_> = futures::stream::iter(parquet_files.files.iter()) .filter_map(|cached_parquet_file| { let chunk_adapter = Arc::clone(&self.chunk_adapter); - let span = span_recorder.span().map(|span| span.child("new_chunk")); + let span = span_recorder.child_span("new_chunk"); async move { chunk_adapter .new_chunk( @@ -260,7 +246,7 @@ impl QuerierTable { partitions, tombstones.to_vec(), parquet_files, - span_recorder.span().map(|span| span.child("reconcile")), + span_recorder.child_span("reconcile"), ) .await .context(StateFusionSnafu) @@ -339,9 +325,7 @@ impl QuerierTable { columns, predicate, Arc::clone(&self.schema), - span_recorder - .span() - .map(|span| span.child("IngesterConnection partitions")), + span_recorder.child_span("IngesterConnection partitions"), ) .await .context(GettingIngesterPartitionsSnafu); diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs index a797154422..a62e82c681 100644 --- a/querier/src/table/state_reconciler.rs +++ b/querier/src/table/state_reconciler.rs @@ -64,21 +64,14 @@ impl Reconciler { &ingester_partitions, tombstones, parquet_files, - span_recorder - .span() - .map(|span| span.child("build_chunks_from_parquet")), + span_recorder.child_span("build_chunks_from_parquet"), ) .await?; chunks.extend(self.build_ingester_chunks(ingester_partitions)); debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation"); let chunks = self - .sync_partition_sort_keys( - chunks, - span_recorder - .span() - .map(|span| span.child("sync_partition_sort_key")), - ) + .sync_partition_sort_keys(chunks, span_recorder.child_span("sync_partition_sort_key")) .await; let chunks: Vec> = chunks @@ -185,9 +178,7 @@ impl Reconciler { .exists( chunk.meta().parquet_file_id(), tombstone.tombstone_id(), - span_recorder - .span() - .map(|span| span.child("cache GET exists processed_tombstone")), + span_recorder.child_span("cache GET exists processed_tombstone"), ) .await { @@ -251,9 +242,7 @@ impl Reconciler { .sort_key( partition_id, &columns, - span_recorder - .span() - .map(|span| span.child("cache GET partition sort key")), + span_recorder.child_span("cache GET partition sort key"), ) .await; sort_keys.insert(partition_id, sort_key); diff --git a/trace/src/span.rs b/trace/src/span.rs index 802122606a..82bf95debb 100644 --- a/trace/src/span.rs +++ b/trace/src/span.rs @@ -206,10 +206,7 @@ impl SpanRecorder { /// If this `SpanRecorder` has a `Span`, creates a new child of that `Span` and /// returns a `SpanRecorder` for it. Otherwise returns an empty `SpanRecorder` pub fn child(&self, name: &'static str) -> Self { - match &self.span { - Some(span) => Self::new(Some(span.child(name))), - None => Self::new(None), - } + Self::new(self.child_span(name)) } /// Return a reference to the span contained in this SpanRecorder, @@ -218,6 +215,12 @@ impl SpanRecorder { self.span.as_ref() } + /// Return a child span of the specified name, if this SpanRecorder + /// has an active span, `None` otherwise. + pub fn child_span(&self, name: &'static str) -> Option { + self.span.as_ref().map(|span| span.child(name)) + } + /// Link this span to another context. pub fn link(&mut self, other: &SpanContext) { if let Some(span) = self.span.as_mut() {