From 59e1c1d5b99dc24bcc98ec8ed846d34229408c68 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 19 Oct 2022 12:10:17 -0400 Subject: [PATCH] feat: Pass trace id through Flight requests from querier to ingester Fixes #5723. --- Cargo.lock | 4 + client_util/src/tower.rs | 15 ++++ influxdb_iox/src/commands/query_ingester.rs | 2 +- .../tests/end_to_end_cases/ingester.rs | 6 +- influxdb_iox_client/Cargo.toml | 3 + .../src/client/flight/low_level.rs | 82 ++++++++++++------- influxdb_iox_client/src/client/flight/mod.rs | 2 +- ingester/src/handler.rs | 16 +++- ingester/src/querier_handler.rs | 72 ++++++++++------ ingester/src/server/grpc.rs | 72 ++++++++-------- ingester/tests/common/mod.rs | 2 +- querier/src/ingester/flight_client.rs | 8 +- querier/src/ingester/mod.rs | 11 ++- query_tests/Cargo.toml | 1 + query_tests/src/scenarios/util.rs | 5 +- 15 files changed, 194 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 199fcdf609..b44c9949c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2216,6 +2216,9 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "trace", + "trace_exporters", + "trace_http", ] [[package]] @@ -3992,6 +3995,7 @@ dependencies = [ "tempfile", "test_helpers", "tokio", + "trace", "workspace-hack", ] diff --git a/client_util/src/tower.rs b/client_util/src/tower.rs index 3f5225b7a2..21fa4a8a42 100644 --- a/client_util/src/tower.rs +++ b/client_util/src/tower.rs @@ -38,6 +38,21 @@ pub struct SetRequestHeadersService { headers: Arc>, } +impl SetRequestHeadersService { + pub fn new(service: S, headers: Vec<(HeaderName, HeaderValue)>) -> Self { + Self { + service, + headers: Arc::new(headers), + } + } + + pub fn into_parts(self) -> (S, Arc>) { + let SetRequestHeadersService { service, headers } = self; + + (service, headers) + } +} + impl Service> for SetRequestHeadersService where S: Service, Response = Response>, diff --git a/influxdb_iox/src/commands/query_ingester.rs b/influxdb_iox/src/commands/query_ingester.rs 
index 987e4c5d18..5b79b8d6d4 100644 --- a/influxdb_iox/src/commands/query_ingester.rs +++ b/influxdb_iox/src/commands/query_ingester.rs @@ -53,7 +53,7 @@ pub struct Config { } pub async fn command(connection: Connection, config: Config) -> Result<()> { - let mut client = flight::low_level::Client::new(connection); + let mut client = flight::low_level::Client::new(connection, None); let Config { namespace, format, diff --git a/influxdb_iox/tests/end_to_end_cases/ingester.rs b/influxdb_iox/tests/end_to_end_cases/ingester.rs index edf93bb305..519697fe6d 100644 --- a/influxdb_iox/tests/end_to_end_cases/ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/ingester.rs @@ -29,7 +29,7 @@ async fn ingester_flight_api() { let mut querier_flight = influxdb_iox_client::flight::low_level::Client::< influxdb_iox_client::flight::generated_types::IngesterQueryRequest, - >::new(cluster.ingester().ingester_grpc_connection()); + >::new(cluster.ingester().ingester_grpc_connection(), None); let query = IngesterQueryRequest::new( cluster.namespace().to_string(), @@ -98,7 +98,7 @@ async fn ingester_flight_api_namespace_not_found() { let mut querier_flight = influxdb_iox_client::flight::low_level::Client::< influxdb_iox_client::flight::generated_types::IngesterQueryRequest, - >::new(cluster.ingester().ingester_grpc_connection()); + >::new(cluster.ingester().ingester_grpc_connection(), None); let query = IngesterQueryRequest::new( String::from("does_not_exist"), @@ -137,7 +137,7 @@ async fn ingester_flight_api_table_not_found() { let mut querier_flight = influxdb_iox_client::flight::low_level::Client::< influxdb_iox_client::flight::generated_types::IngesterQueryRequest, - >::new(cluster.ingester().ingester_grpc_connection()); + >::new(cluster.ingester().ingester_grpc_connection(), None); let query = IngesterQueryRequest::new( cluster.namespace().to_string(), diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 7adaecd4ec..efa64806f0 100644 --- 
a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -25,3 +25,6 @@ tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-threa tokio-stream = "0.1.11" thiserror = "1.0.37" tonic = { version = "0.8" } +trace = { path = "../trace" } +trace_exporters = { path = "../trace_exporters" } +trace_http = { path = "../trace_http" } diff --git a/influxdb_iox_client/src/client/flight/low_level.rs b/influxdb_iox_client/src/client/flight/low_level.rs index 6e0860aa50..80567b17c7 100644 --- a/influxdb_iox_client/src/client/flight/low_level.rs +++ b/influxdb_iox_client/src/client/flight/low_level.rs @@ -1,32 +1,30 @@ //! Low-level flight client. //! -//! This client allows more inspection of the flight messages which can be helpful to implement more advanced protocols. +//! This client allows more inspection of the flight messages which can be helpful to implement +//! more advanced protocols. //! //! # Protocol Usage +//! //! The client handles flight messages as followes: //! -//! - **None:** App metadata is extracted. Otherwise this message has no effect. This is useful to transmit metadata -//! without any actual payload. -//! - **Schema:** The schema is (re-)set. Dictionaries are cleared. App metadata is extraced and both the schema and the -//! metadata are presented to the user. -//! - **Dictionary Batch:** A new dictionary for a given column is registered. An existing dictionary for the same -//! column will be overwritten. No app metadata is extracted. This message is NOT visible to the user. -//! - **Record Batch:** Record batch is created based on the current schema and dictionaries. This fails if no schema -//! was transmitted yet. App metadata is extracted is is presented -- together with the record batch -- to the user. +//! - **None:** App metadata is extracted. Otherwise this message has no effect. This is useful to +//! transmit metadata without any actual payload. +//! - **Schema:** The schema is (re-)set. 
Dictionaries are cleared. App metadata is extracted and +//! both the schema and the metadata are presented to the user. +//! - **Dictionary Batch:** A new dictionary for a given column is registered. An existing +//! dictionary for the same column will be overwritten. No app metadata is extracted. This +//! message is NOT visible to the user. +//! - **Record Batch:** Record batch is created based on the current schema and dictionaries. This +//! fails if no schema was transmitted yet. App metadata is extracted and is presented -- together +//! with the record batch -- to the user. //! //! All other message types (at the time of writing: tensor and sparse tensor) lead to an error. -use std::{collections::HashMap, convert::TryFrom, marker::PhantomData, sync::Arc}; +use super::Error; use ::generated_types::influxdata::iox::{ ingester::v1::{IngesterQueryRequest, IngesterQueryResponseMetadata}, querier::v1::{AppMetadata, ReadInfo}, }; -use client_util::connection::{Connection, GrpcConnection}; -use futures_util::stream; -use futures_util::stream::StreamExt; -use prost::Message; -use tonic::Streaming; - use arrow::{ array::ArrayRef, buffer::Buffer, @@ -38,9 +36,18 @@ use arrow_flight::{ flight_service_client::FlightServiceClient, utils::flight_data_to_arrow_batch, FlightData, HandshakeRequest, Ticket, }; - -use super::Error; +use client_util::connection::{Connection, GrpcConnection}; +use futures_util::stream; +use futures_util::stream::StreamExt; +use prost::Message; use rand::Rng; +use std::{collections::HashMap, convert::TryFrom, marker::PhantomData, str::FromStr, sync::Arc}; +use tonic::{ + codegen::http::header::{HeaderName, HeaderValue}, + Streaming, +}; +use trace::ctx::SpanContext; +use trace_http::ctx::format_jaeger_trace_context; /// Metadata that can be send during flight requests. pub trait ClientMetadata: Message { @@ -59,9 +66,11 @@ impl ClientMetadata for IngesterQueryRequest { /// Low-level flight client. 
/// /// # Request and Response Metadata -/// The type parameter `T` -- which must implement [`ClientMetadata`] describes the request and response metadata that -/// is send and received during the flight request. The request is encoded as protobuf and send as the Flight "ticket", -/// the response is received via the so called "app metadata". +/// +/// The type parameter `T` -- which must implement [`ClientMetadata`] -- describes the request and +/// response metadata that is sent and received during the flight request. The request is encoded +/// as protobuf and sent as the Flight "ticket". The response is received via the so-called "app +/// metadata". #[derive(Debug)] pub struct Client where @@ -76,15 +85,32 @@ where T: ClientMetadata, { /// Creates a new client with the provided connection - pub fn new(connection: Connection) -> Self { + pub fn new(connection: Connection, span_context: Option) -> Self { + let grpc_conn = connection.into_grpc_connection(); + + let grpc_conn = if let Some(ctx) = span_context { + let (service, headers) = grpc_conn.into_parts(); + + let mut headers: HashMap<_, _> = headers.iter().cloned().collect(); + let key = + HeaderName::from_str(trace_exporters::DEFAULT_JAEGER_TRACE_CONTEXT_HEADER_NAME) + .unwrap(); + let value = HeaderValue::from_str(&format_jaeger_trace_context(&ctx)).unwrap(); + headers.insert(key, value); + + GrpcConnection::new(service, headers.into_iter().collect()) + } else { + grpc_conn + }; + Self { - inner: FlightServiceClient::new(connection.into_grpc_connection()), + inner: FlightServiceClient::new(grpc_conn), _phantom: PhantomData::default(), } } - /// Query the given database with the given SQL query, and return a - /// [`PerformQuery`] instance that streams low-level message results. + /// Query the given database with the given SQL query, and return a [`PerformQuery`] instance + /// that streams low-level message results. 
pub async fn perform_query(&mut self, request: T) -> Result, Error> { PerformQuery::::new(self, request).await } @@ -141,6 +167,7 @@ impl LowLevelMessage { LowLevelMessage::RecordBatch(_) => panic!("Contains record batch"), } } + /// Unwrap schema. pub fn unwrap_schema(self) -> Arc { match self { @@ -160,9 +187,8 @@ impl LowLevelMessage { } } -/// A struct that manages the stream of Arrow `RecordBatch` results from an -/// Arrow Flight query. Created by calling the `perform_query` method on a -/// Flight [`Client`]. +/// A struct that manages the stream of Arrow `RecordBatch` results from an Arrow Flight query. +/// Created by calling the `perform_query` method on a Flight [`Client`]. #[derive(Debug)] pub struct PerformQuery where diff --git a/influxdb_iox_client/src/client/flight/mod.rs b/influxdb_iox_client/src/client/flight/mod.rs index 2a98b83f4c..3b3e3c6044 100644 --- a/influxdb_iox_client/src/client/flight/mod.rs +++ b/influxdb_iox_client/src/client/flight/mod.rs @@ -120,7 +120,7 @@ impl Client { /// Creates a new client with the provided connection pub fn new(connection: Connection) -> Self { Self { - inner: LowLevelClient::new(connection), + inner: LowLevelClient::new(connection, None), } } diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index d5c2def7e9..7ecf700e69 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -23,6 +23,7 @@ use tokio::{ task::{JoinError, JoinHandle}, }; use tokio_util::sync::CancellationToken; +use trace::span::{Span, SpanRecorder}; use write_buffer::core::WriteBufferReading; use write_summary::ShardProgress; @@ -71,6 +72,7 @@ pub trait IngestHandler: Send + Sync { async fn query( &self, request: IngesterQueryRequest, + span: Option, ) -> Result; /// Return shard progress for the requested shard indexes @@ -331,7 +333,10 @@ impl IngestHandler for IngestHandlerImpl { async fn query( &self, request: IngesterQueryRequest, + span: Option, ) -> Result { + let span_recorder = SpanRecorder::new(span); + // 
TODO(4567): move this into a instrumented query delegate // Acquire and hold a permit for the duration of this request, or return @@ -361,7 +366,12 @@ impl IngestHandler for IngestHandlerImpl { let t = self.time_provider.now(); let request = Arc::new(request); - let res = prepare_data_to_querier(&self.data, &request).await; + let res = prepare_data_to_querier( + &self.data, + &request, + span_recorder.child_span("ingester prepare data to querier"), + ) + .await; if let Some(delta) = self.time_provider.now().checked_duration_since(t) { match &res { @@ -668,14 +678,14 @@ mod tests { predicate: None, }; - let res = ingester.query(request.clone()).await.unwrap_err(); + let res = ingester.query(request.clone(), None).await.unwrap_err(); assert!(matches!( res, crate::querier_handler::Error::NamespaceNotFound { .. } )); ingester.request_sem = Semaphore::new(0); - let res = ingester.query(request).await.unwrap_err(); + let res = ingester.query(request, None).await.unwrap_err(); assert!(matches!(res, crate::querier_handler::Error::RequestLimit)); } } diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index e9e13e72f5..7eaa269289 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -1,7 +1,12 @@ //! 
Handle all requests from Querier -use std::{pin::Pin, sync::Arc}; - +use crate::{ + data::{ + namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName, + IngesterData, + }, + query::QueryableBatch, +}; use arrow::{array::new_null_array, error::ArrowError, record_batch::RecordBatch}; use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use data_types::{PartitionId, SequenceNumber}; @@ -12,14 +17,8 @@ use generated_types::ingester::IngesterQueryRequest; use observability_deps::tracing::debug; use schema::{merge::SchemaMerger, selection::Selection}; use snafu::{ensure, Snafu}; - -use crate::{ - data::{ - namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName, - IngesterData, - }, - query::QueryableBatch, -}; +use std::{pin::Pin, sync::Arc}; +use trace::span::{Span, SpanRecorder}; /// Number of table data read locks that shall be acquired in parallel const CONCURRENT_TABLE_DATA_LOCKS: usize = 10; @@ -171,12 +170,13 @@ impl IngesterQueryResponse { /// Convert [`IngesterQueryResponse`] to a set of [`RecordBatch`]es. /// - /// If the response contains multiple snapshots, this will merge the schemas into a single one and create - /// NULL-columns for snapshots that miss columns. + /// If the response contains multiple snapshots, this will merge the schemas into a single one + /// and create NULL-columns for snapshots that miss columns. /// /// # Panic - /// Panics if there are no batches returned at all. Also panics if the snapshot-scoped schemas do not line up with - /// the snapshot-scoped record batches. + /// + /// Panics if there are no batches returned at all. Also panics if the snapshot-scoped schemas + /// do not line up with the snapshot-scoped record batches. 
pub async fn into_record_batches(self) -> Vec { let mut snapshot_schema = None; let mut schema_merger = SchemaMerger::new(); @@ -260,8 +260,12 @@ pub enum FlatIngesterQueryResponse { pub async fn prepare_data_to_querier( ingest_data: &Arc, request: &Arc, + span: Option, ) -> Result { debug!(?request, "prepare_data_to_querier"); + + let span_recorder = SpanRecorder::new(span); + let mut tables_data = vec![]; let mut found_namespace = false; for (shard_id, shard_data) in ingest_data.shards() { @@ -324,13 +328,18 @@ pub async fn prepare_data_to_querier( // extract payload let partition_id = partition.partition_id; let status = partition.partition_status.clone(); - let snapshots: Vec<_> = prepare_data_to_querier_for_partition(partition, &request) - .into_iter() - .map(Ok) - .collect(); + let snapshots: Vec<_> = prepare_data_to_querier_for_partition( + partition, + &request, + span_recorder.child_span("ingester prepare data to querier for partition"), + ) + .into_iter() + .map(Ok) + .collect(); - // Note: include partition in `unpersisted_partitions` even when there we might filter out all the data, because - // the metadata (e.g. max persisted parquet file) is important for the querier. + // Note: include partition in `unpersisted_partitions` even when we might filter + // out all the data, because the metadata (e.g. max persisted parquet file) is + // important for the querier. 
Ok(IngesterQueryPartition::new( Box::pin(futures::stream::iter(snapshots)), partition_id, @@ -344,7 +353,10 @@ pub async fn prepare_data_to_querier( fn prepare_data_to_querier_for_partition( unpersisted_partition_data: UnpersistedPartitionData, request: &IngesterQueryRequest, + span: Option, ) -> Vec { + let mut span_recorder = SpanRecorder::new(span); + // ------------------------------------------------ // Accumulate data @@ -368,7 +380,7 @@ fn prepare_data_to_querier_for_partition( }) .with_data(unpersisted_partition_data.non_persisted); - queryable_batch + let streams = queryable_batch .data .iter() .map(|snapshot_batch| { @@ -393,7 +405,11 @@ fn prepare_data_to_querier_for_partition( // create stream Box::pin(MemoryStream::new(vec![batch])) as SendableRecordBatchStream }) - .collect() + .collect(); + + span_recorder.ok("done"); + + streams } #[cfg(test)] @@ -501,6 +517,8 @@ mod tests { async fn test_prepare_data_to_querier() { test_helpers::maybe_start_logging(); + let span = None; + // make 14 scenarios for ingester data let mut scenarios = vec![]; for two_partitions in [false, true] { @@ -541,7 +559,7 @@ mod tests { ]; for (loc, scenario) in &scenarios { println!("Location: {loc:?}"); - let result = prepare_data_to_querier(scenario, &request) + let result = prepare_data_to_querier(scenario, &request, span.clone()) .await .unwrap() .into_record_batches() @@ -577,7 +595,7 @@ mod tests { ]; for (loc, scenario) in &scenarios { println!("Location: {loc:?}"); - let result = prepare_data_to_querier(scenario, &request) + let result = prepare_data_to_querier(scenario, &request, span.clone()) .await .unwrap() .into_record_batches() @@ -622,7 +640,7 @@ mod tests { ]; for (loc, scenario) in &scenarios { println!("Location: {loc:?}"); - let result = prepare_data_to_querier(scenario, &request) + let result = prepare_data_to_querier(scenario, &request, span.clone()) .await .unwrap() .into_record_batches() @@ -639,7 +657,7 @@ mod tests { )); for (loc, scenario) in 
&scenarios { println!("Location: {loc:?}"); - let err = prepare_data_to_querier(scenario, &request) + let err = prepare_data_to_querier(scenario, &request, span.clone()) .await .unwrap_err(); assert_matches!(err, Error::TableNotFound { .. }); @@ -654,7 +672,7 @@ mod tests { )); for (loc, scenario) in &scenarios { println!("Location: {loc:?}"); - let err = prepare_data_to_querier(scenario, &request) + let err = prepare_data_to_querier(scenario, &request, span.clone()) .await .unwrap_err(); assert_matches!(err, Error::NamespaceNotFound { .. }); diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs index 0512d53239..1155536e70 100644 --- a/ingester/src/server/grpc.rs +++ b/ingester/src/server/grpc.rs @@ -1,14 +1,9 @@ //! gRPC service implementations for `ingester`. -use std::{ - pin::Pin, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - task::Poll, +use crate::{ + handler::IngestHandler, + querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream}, }; - use arrow::error::ArrowError; use arrow_flight::{ flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, @@ -25,17 +20,19 @@ use observability_deps::tracing::{debug, info, warn}; use pin_project::pin_project; use prost::Message; use snafu::{ResultExt, Snafu}; +use std::{ + pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + task::Poll, +}; use tonic::{Request, Response, Streaming}; -use trace::ctx::SpanContext; +use trace::{ctx::SpanContext, span::SpanExt}; use write_summary::WriteSummary; -use crate::{ - handler::IngestHandler, - querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream}, -}; - -/// This type is responsible for managing all gRPC services exposed by -/// `ingester`. +/// This type is responsible for managing all gRPC services exposed by `ingester`. 
#[derive(Debug, Default)] pub struct GrpcDelegate { ingest_handler: Arc, @@ -47,8 +44,7 @@ pub struct GrpcDelegate { } impl GrpcDelegate { - /// Initialise a new [`GrpcDelegate`] passing valid requests to the - /// specified `ingest_handler`. + /// Initialise a new [`GrpcDelegate`] passing valid requests to the specified `ingest_handler`. pub fn new(ingest_handler: Arc, test_flight_do_get_panic: Arc) -> Self { Self { ingest_handler, @@ -260,7 +256,7 @@ impl Flight for FlightService { &self, request: Request, ) -> Result, tonic::Status> { - let _span_ctx: Option = request.extensions().get().cloned(); + let span_ctx: Option = request.extensions().get().cloned(); let ticket = request.into_inner(); let proto_query_request = @@ -272,25 +268,25 @@ impl Flight for FlightService { self.maybe_panic_in_flight_do_get(); - let query_response = - self.ingest_handler - .query(query_request) - .await - .map_err(|e| match e { - crate::querier_handler::Error::NamespaceNotFound { namespace_name } => { - Error::NamespaceNotFound { namespace_name } - } - crate::querier_handler::Error::TableNotFound { - namespace_name, - table_name, - } => Error::TableNotFound { - namespace_name, - table_name, - }, - _ => Error::Query { - source: Box::new(e), - }, - })?; + let query_response = self + .ingest_handler + .query(query_request, span_ctx.child_span("ingest handler query")) + .await + .map_err(|e| match e { + crate::querier_handler::Error::NamespaceNotFound { namespace_name } => { + Error::NamespaceNotFound { namespace_name } + } + crate::querier_handler::Error::TableNotFound { + namespace_name, + table_name, + } => Error::TableNotFound { + namespace_name, + table_name, + }, + _ => Error::Query { + source: Box::new(e), + }, + })?; let output = GetStream::new(query_response.flatten()); diff --git a/ingester/tests/common/mod.rs b/ingester/tests/common/mod.rs index 026a40e132..97d798701a 100644 --- a/ingester/tests/common/mod.rs +++ b/ingester/tests/common/mod.rs @@ -314,7 +314,7 @@ impl 
TestContext { &self, req: IngesterQueryRequest, ) -> Result { - self.ingester.query(req).await + self.ingester.query(req, None).await } /// Retrieve the specified metric value. diff --git a/querier/src/ingester/flight_client.rs b/querier/src/ingester/flight_client.rs index 19a9865a6c..adb366f8c9 100644 --- a/querier/src/ingester/flight_client.rs +++ b/querier/src/ingester/flight_client.rs @@ -8,6 +8,7 @@ use influxdb_iox_client::flight::{ use observability_deps::tracing::debug; use snafu::{ResultExt, Snafu}; use std::{collections::HashMap, fmt::Debug, ops::DerefMut, sync::Arc}; +use trace::ctx::SpanContext; pub use influxdb_iox_client::flight::Error as FlightError; @@ -45,6 +46,7 @@ pub trait FlightClient: Debug + Send + Sync + 'static { &self, ingester_address: Arc, request: IngesterQueryRequest, + span_context: Option, ) -> Result, Error>; } @@ -90,10 +92,12 @@ impl FlightClient for FlightClientImpl { &self, ingester_addr: Arc, request: IngesterQueryRequest, + span_context: Option, ) -> Result, Error> { let connection = self.connect(Arc::clone(&ingester_addr)).await?; - let mut client = LowLevelFlightClient::::new(connection); + let mut client = + LowLevelFlightClient::::new(connection, span_context); debug!(%ingester_addr, ?request, "Sending request to ingester"); let request: proto::IngesterQueryRequest = @@ -172,7 +176,7 @@ impl CachedConnection { // sanity check w/ a handshake let mut client = - LowLevelFlightClient::::new(connection.clone()); + LowLevelFlightClient::::new(connection.clone(), None); // make contact with the ingester client diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 927b975a54..4dbea0d06d 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -418,7 +418,13 @@ async fn execute( }; let query_res = flight_client - .query(Arc::clone(&ingester_address), ingester_query_request) + .query( + Arc::clone(&ingester_address), + ingester_query_request, + span_recorder + .child_span("IngesterQuery") 
+ .map(|span| span.ctx), + ) .await; if let Err(FlightClientError::Flight { @@ -1230,7 +1236,7 @@ mod tests { }; use test_helpers::assert_error; use tokio::{runtime::Handle, sync::Mutex}; - use trace::{span::SpanStatus, RingBufferTraceCollector}; + use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector}; #[tokio::test] async fn test_flight_handshake_error() { @@ -1902,6 +1908,7 @@ mod tests { &self, ingester_address: Arc, _request: IngesterQueryRequest, + _span_context: Option, ) -> Result, FlightClientError> { self.responses .lock() diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml index e879fcb81d..a2f757ff28 100644 --- a/query_tests/Cargo.toml +++ b/query_tests/Cargo.toml @@ -31,6 +31,7 @@ querier = { path = "../querier" } schema = { path = "../schema" } sharder = { path = "../sharder" } tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread", "time"] } +trace = { path = "../trace" } workspace-hack = { path = "../workspace-hack"} [dev-dependencies] diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index c23bd3f2fd..6966e6ee1b 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -41,6 +41,7 @@ use std::{ time::Duration, }; use tokio::runtime::Handle; +use trace::ctx::SpanContext; // Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle // & when delete predicates are applied @@ -945,11 +946,13 @@ impl IngesterFlightClient for MockIngester { &self, _ingester_address: Arc, request: IngesterQueryRequest, + _span_context: Option, ) -> Result, IngesterFlightClientError> { + let span = None; // NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior // (e.g. 
passing predicates of wrong types) let request = Arc::new(request); - let response = prepare_data_to_querier(&self.ingester_data, &request) + let response = prepare_data_to_querier(&self.ingester_data, &request, span) .await .map_err(|e| IngesterFlightClientError::Flight { source: FlightError::ArrowError(arrow::error::ArrowError::ExternalError(Box::new(