fix: Use one (shared) http2 connection per querier and ingester pair (#4583)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
31f3d988ae
commit
ff8241ea57
|
@ -1,11 +1,12 @@
|
|||
use arrow::{datatypes::Schema, record_batch::RecordBatch};
|
||||
use async_trait::async_trait;
|
||||
use client_util::connection;
|
||||
use client_util::connection::{self, Connection};
|
||||
use generated_types::ingester::IngesterQueryRequest;
|
||||
use influxdb_iox_client::flight::{self, generated_types::IngesterQueryResponseMetadata};
|
||||
use observability_deps::tracing::debug;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
ops::{Deref, DerefMut},
|
||||
sync::Arc,
|
||||
|
@ -45,14 +46,23 @@ pub trait FlightClient: Debug + Send + Sync + 'static {
|
|||
/// Send query to given ingester.
|
||||
async fn query(
|
||||
&self,
|
||||
ingester_address: &str,
|
||||
ingester_address: Arc<str>,
|
||||
request: IngesterQueryRequest,
|
||||
) -> Result<Box<dyn QueryData>, Error>;
|
||||
}
|
||||
|
||||
/// Default [`FlightClient`] implemenetation that uses a real client.
|
||||
/// Default [`FlightClient`] implementation that uses a real connection
|
||||
#[derive(Debug, Default)]
|
||||
pub struct FlightClientImpl {}
|
||||
pub struct FlightClientImpl {
|
||||
/// Cached connections
|
||||
/// key: ingester_address (e.g. "http://ingester-1:8082")
|
||||
/// value: CachedConnection
|
||||
///
|
||||
/// Note: Use sync (parking_log) mutex because it is always held
|
||||
/// for a very short period of time, and any actual connection (and
|
||||
/// waiting) is done in CachedConnection
|
||||
connections: parking_lot::Mutex<HashMap<String, CachedConnection>>,
|
||||
}
|
||||
|
||||
impl FlightClientImpl {
|
||||
/// Create new client.
|
||||
|
@ -61,28 +71,19 @@ impl FlightClientImpl {
|
|||
}
|
||||
|
||||
/// Establish connection to given addr and perform handshake.
|
||||
async fn connect(
|
||||
&self,
|
||||
ingester_address: &str,
|
||||
) -> Result<flight::Client<flight::generated_types::IngesterQueryRequest>, Error> {
|
||||
debug!(
|
||||
%ingester_address,
|
||||
"Connecting to ingester",
|
||||
);
|
||||
let connection = connection::Builder::new()
|
||||
.build(ingester_address)
|
||||
.await
|
||||
.context(ConnectingSnafu { ingester_address })?;
|
||||
let mut client =
|
||||
flight::Client::<flight::generated_types::IngesterQueryRequest>::new(connection);
|
||||
|
||||
// make contact with the ingester
|
||||
client
|
||||
.handshake()
|
||||
.await
|
||||
.context(HandshakeSnafu { ingester_address })?;
|
||||
|
||||
Ok(client)
|
||||
async fn connect(&self, ingester_address: Arc<str>) -> Result<Connection, Error> {
|
||||
let cached_connection = {
|
||||
let mut connections = self.connections.lock();
|
||||
if let Some(cached_connection) = connections.get(ingester_address.as_ref()) {
|
||||
cached_connection.clone()
|
||||
} else {
|
||||
// need to make a new one;
|
||||
let cached_connection = CachedConnection::new(&ingester_address);
|
||||
connections.insert(ingester_address.to_string(), cached_connection.clone());
|
||||
cached_connection
|
||||
}
|
||||
};
|
||||
cached_connection.connect().await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,11 +91,13 @@ impl FlightClientImpl {
|
|||
impl FlightClient for FlightClientImpl {
|
||||
async fn query(
|
||||
&self,
|
||||
ingester_addr: &str,
|
||||
ingester_addr: Arc<str>,
|
||||
request: IngesterQueryRequest,
|
||||
) -> Result<Box<dyn QueryData>, Error> {
|
||||
// TODO maybe cache this connection
|
||||
let mut client = self.connect(ingester_addr).await?;
|
||||
let connection = self.connect(ingester_addr).await?;
|
||||
|
||||
let mut client =
|
||||
flight::Client::<flight::generated_types::IngesterQueryRequest>::new(connection);
|
||||
|
||||
debug!(?request, "Sending request to ingester");
|
||||
let request: flight::generated_types::IngesterQueryRequest =
|
||||
|
@ -153,3 +156,53 @@ impl QueryData for PerformQuery<IngesterQueryResponseMetadata> {
|
|||
self.schema()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct CachedConnection {
|
||||
ingester_address: Arc<str>,
|
||||
/// Real async mutex to
|
||||
maybe_connection: Arc<tokio::sync::Mutex<Option<Connection>>>,
|
||||
}
|
||||
|
||||
impl CachedConnection {
|
||||
fn new(ingester_address: &Arc<str>) -> Self {
|
||||
Self {
|
||||
ingester_address: Arc::clone(ingester_address),
|
||||
maybe_connection: Arc::new(tokio::sync::Mutex::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the underlying connection, creating it if needed
|
||||
async fn connect(&self) -> Result<Connection, Error> {
|
||||
let mut maybe_connection = self.maybe_connection.lock().await;
|
||||
|
||||
let ingester_address = self.ingester_address.as_ref();
|
||||
|
||||
if let Some(connection) = maybe_connection.as_ref() {
|
||||
debug!(%ingester_address, "Reusing connection to ingester");
|
||||
|
||||
Ok(connection.clone())
|
||||
} else {
|
||||
debug!(%ingester_address, "Connecting to ingester");
|
||||
|
||||
let connection = connection::Builder::new()
|
||||
.build(ingester_address)
|
||||
.await
|
||||
.context(ConnectingSnafu { ingester_address })?;
|
||||
|
||||
// sanity check w/ a handshake
|
||||
let mut client = flight::Client::<flight::generated_types::IngesterQueryRequest>::new(
|
||||
connection.clone(),
|
||||
);
|
||||
|
||||
// make contact with the ingester
|
||||
client
|
||||
.handshake()
|
||||
.await
|
||||
.context(HandshakeSnafu { ingester_address })?;
|
||||
|
||||
*maybe_connection = Some(connection.clone());
|
||||
Ok(connection)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -222,8 +222,6 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<Ingeste
|
|||
expected_schema,
|
||||
} = request;
|
||||
|
||||
let ingester_address = ingester_address.as_ref();
|
||||
|
||||
let ingester_query_request = IngesterQueryRequest {
|
||||
namespace: namespace_name.to_string(),
|
||||
table: table_name.to_string(),
|
||||
|
@ -232,8 +230,10 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<Ingeste
|
|||
};
|
||||
|
||||
let query_res = flight_client
|
||||
.query(ingester_address, ingester_query_request)
|
||||
.query(Arc::clone(&ingester_address), ingester_query_request)
|
||||
.await;
|
||||
|
||||
let ingester_address = ingester_address.as_ref();
|
||||
if let Err(FlightClientError::Flight {
|
||||
source: FlightError::GrpcError(status),
|
||||
}) = &query_res
|
||||
|
@ -1206,13 +1206,13 @@ mod tests {
|
|||
impl FlightClient for MockFlightClient {
|
||||
async fn query(
|
||||
&self,
|
||||
ingester_address: &str,
|
||||
ingester_address: Arc<str>,
|
||||
_request: IngesterQueryRequest,
|
||||
) -> Result<Box<dyn QueryData>, FlightClientError> {
|
||||
self.responses
|
||||
.lock()
|
||||
.await
|
||||
.remove(ingester_address)
|
||||
.remove(ingester_address.as_ref())
|
||||
.expect("Response not mocked")
|
||||
.map(|query_data| Box::new(query_data) as _)
|
||||
}
|
||||
|
|
|
@ -877,7 +877,7 @@ impl Partitioner for ConstantPartitioner {
|
|||
impl IngesterFlightClient for MockIngester {
|
||||
async fn query(
|
||||
&self,
|
||||
_ingester_address: &str,
|
||||
_ingester_address: Arc<str>,
|
||||
request: IngesterQueryRequest,
|
||||
) -> Result<Box<dyn IngesterFlightClientQueryData>, IngesterFlightClientError> {
|
||||
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior (e.g. passing predicates
|
||||
|
|
Loading…
Reference in New Issue