diff --git a/querier/src/ingester/flight_client.rs b/querier/src/ingester/flight_client.rs index 41fc3dddc7..350e3b79f2 100644 --- a/querier/src/ingester/flight_client.rs +++ b/querier/src/ingester/flight_client.rs @@ -1,11 +1,12 @@ use arrow::{datatypes::Schema, record_batch::RecordBatch}; use async_trait::async_trait; -use client_util::connection; +use client_util::connection::{self, Connection}; use generated_types::ingester::IngesterQueryRequest; use influxdb_iox_client::flight::{self, generated_types::IngesterQueryResponseMetadata}; use observability_deps::tracing::debug; use snafu::{ResultExt, Snafu}; use std::{ + collections::HashMap, fmt::Debug, ops::{Deref, DerefMut}, sync::Arc, @@ -45,14 +46,23 @@ pub trait FlightClient: Debug + Send + Sync + 'static { /// Send query to given ingester. async fn query( &self, - ingester_address: &str, + ingester_address: Arc, request: IngesterQueryRequest, ) -> Result, Error>; } -/// Default [`FlightClient`] implemenetation that uses a real client. +/// Default [`FlightClient`] implementation that uses a real connection #[derive(Debug, Default)] -pub struct FlightClientImpl {} +pub struct FlightClientImpl { + /// Cached connections + /// key: ingester_address (e.g. "http://ingester-1:8082") + /// value: CachedConnection + /// + /// Note: Use sync (parking_lot) mutex because it is always held + /// for a very short period of time, and any actual connection (and + /// waiting) is done in CachedConnection + connections: parking_lot::Mutex>, +} impl FlightClientImpl { /// Create new client. @@ -61,28 +71,19 @@ impl FlightClientImpl { } /// Establish connection to given addr and perform handshake. 
- async fn connect( - &self, - ingester_address: &str, - ) -> Result, Error> { - debug!( - %ingester_address, - "Connecting to ingester", - ); - let connection = connection::Builder::new() - .build(ingester_address) - .await - .context(ConnectingSnafu { ingester_address })?; - let mut client = - flight::Client::::new(connection); - - // make contact with the ingester - client - .handshake() - .await - .context(HandshakeSnafu { ingester_address })?; - - Ok(client) + async fn connect(&self, ingester_address: Arc) -> Result { + let cached_connection = { + let mut connections = self.connections.lock(); + if let Some(cached_connection) = connections.get(ingester_address.as_ref()) { + cached_connection.clone() + } else { + // no cached entry for this address yet: create one and store it in the map + let cached_connection = CachedConnection::new(&ingester_address); + connections.insert(ingester_address.to_string(), cached_connection.clone()); + cached_connection + } + }; + cached_connection.connect().await } } @@ -90,11 +91,13 @@ impl FlightClientImpl { impl FlightClient for FlightClientImpl { async fn query( &self, - ingester_addr: &str, + ingester_addr: Arc, request: IngesterQueryRequest, ) -> Result, Error> { - // TODO maybe cache this connection - let mut client = self.connect(ingester_addr).await?; + let connection = self.connect(ingester_addr).await?; + + let mut client = + flight::Client::::new(connection); debug!(?request, "Sending request to ingester"); let request: flight::generated_types::IngesterQueryRequest = @@ -153,3 +156,53 @@ impl QueryData for PerformQuery { self.schema() } } + +#[derive(Debug, Clone)] +struct CachedConnection { + ingester_address: Arc, + /// Real async (tokio) mutex because the guard is held across `.await` while the connection is established + maybe_connection: Arc>>, +} + +impl CachedConnection { + fn new(ingester_address: &Arc) -> Self { + Self { + ingester_address: Arc::clone(ingester_address), + maybe_connection: Arc::new(tokio::sync::Mutex::new(None)), + } + } + + /// Return the underlying connection, creating it if needed + async fn connect(&self) -> Result { 
+ let mut maybe_connection = self.maybe_connection.lock().await; + + let ingester_address = self.ingester_address.as_ref(); + + if let Some(connection) = maybe_connection.as_ref() { + debug!(%ingester_address, "Reusing connection to ingester"); + + Ok(connection.clone()) + } else { + debug!(%ingester_address, "Connecting to ingester"); + + let connection = connection::Builder::new() + .build(ingester_address) + .await + .context(ConnectingSnafu { ingester_address })?; + + // sanity check with a handshake; on failure `?` returns before the connection is cached + let mut client = flight::Client::::new( + connection.clone(), + ); + + // make contact with the ingester + client + .handshake() + .await + .context(HandshakeSnafu { ingester_address })?; + + *maybe_connection = Some(connection.clone()); + Ok(connection) + } + } +} diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 972d4c3022..06b6481e8e 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -222,8 +222,6 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result) -> Result, _request: IngesterQueryRequest, ) -> Result, FlightClientError> { self.responses .lock() .await - .remove(ingester_address) + .remove(ingester_address.as_ref()) .expect("Response not mocked") .map(|query_data| Box::new(query_data) as _) } diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index b6d08a4033..da080ac6ed 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -877,7 +877,7 @@ impl Partitioner for ConstantPartitioner { impl IngesterFlightClient for MockIngester { async fn query( &self, - _ingester_address: &str, + _ingester_address: Arc, request: IngesterQueryRequest, ) -> Result, IngesterFlightClientError> { // NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior (e.g. passing predicates