diff --git a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs index 13271c95bf..852f3a48ef 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs @@ -29,7 +29,7 @@ async fn smoke() { // wait for data to be persisted to parquet assert_eq!(response.status(), StatusCode::NO_CONTENT); let write_token = get_write_token(&response); - wait_for_persisted(write_token, all_in_one.ingester_grpc_connection()).await; + wait_for_persisted(write_token, all_in_one.querier_grpc_connection()).await; // run query let sql = format!("select * from {}", table_name); diff --git a/influxdb_iox/tests/end_to_end_ng_cases/querier/multi_ingester.rs b/influxdb_iox/tests/end_to_end_ng_cases/querier/multi_ingester.rs index 09e95fe821..a0ad3d04af 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/querier/multi_ingester.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/querier/multi_ingester.rs @@ -120,8 +120,8 @@ async fn basic_multi_ingesters() { StepTest::new(&mut cluster, test_steps).run().await } -/// Return a combined readable response from the ingesters that is -/// readable for all partitions. 
Retries forever +/// Use the WriteInfo API on the querier that will combine write info from all the ingesters it +/// knows about to get the status of data async fn get_multi_ingester_readable_combined_response( state: &mut StepTestState<'_>, ) -> GetWriteInfoResponse { diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index b37f3155de..7f24d8aaa7 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -78,15 +78,19 @@ impl ServerType for QuerierServer let builder = setup_builder!(builder_input, self); add_service!( builder, - rpc::query::make_flight_server(Arc::clone(&self.database),) + rpc::query::make_flight_server(Arc::clone(&self.database)) ); add_service!( builder, - rpc::query::make_storage_server(Arc::clone(&self.database),) + rpc::query::make_storage_server(Arc::clone(&self.database)) ); add_service!( builder, - rpc::namespace::namespace_service(Arc::clone(&self.database),) + rpc::namespace::namespace_service(Arc::clone(&self.database)) + ); + add_service!( + builder, + rpc::write_info::write_info_service(Arc::clone(&self.database)) ); add_service!(builder, self.server.handler().schema_service()); serve_builder!(builder); diff --git a/ioxd_querier/src/rpc/mod.rs b/ioxd_querier/src/rpc/mod.rs index 1bb1c407f6..f3e28368dc 100644 --- a/ioxd_querier/src/rpc/mod.rs +++ b/ioxd_querier/src/rpc/mod.rs @@ -1,2 +1,3 @@ pub(crate) mod namespace; pub(crate) mod query; +pub(crate) mod write_info; diff --git a/ioxd_querier/src/rpc/write_info.rs b/ioxd_querier/src/rpc/write_info.rs new file mode 100644 index 0000000000..324fc61b3d --- /dev/null +++ b/ioxd_querier/src/rpc/write_info.rs @@ -0,0 +1,44 @@ +//! WriteInfoService gRPC implementation + +use generated_types::influxdata::iox::ingester::v1::{ + self as proto, + write_info_service_server::{WriteInfoService, WriteInfoServiceServer}, +}; +use querier::QuerierDatabase; +use std::sync::Arc; + +/// Acquire a [`WriteInfoService`] gRPC service implementation. 
+pub fn write_info_service( + server: Arc, +) -> WriteInfoServiceServer { + WriteInfoServiceServer::new(QuerierWriteInfoServiceImpl::new(server)) +} + +#[derive(Debug)] +struct QuerierWriteInfoServiceImpl { + server: Arc, +} + +impl QuerierWriteInfoServiceImpl { + pub fn new(server: Arc) -> Self { + Self { server } + } +} + +#[tonic::async_trait] +impl WriteInfoService for QuerierWriteInfoServiceImpl { + async fn get_write_info( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let proto::GetWriteInfoRequest { write_token } = request.into_inner(); + + let progresses = self + .server + .get_write_info(&write_token) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + Ok(tonic::Response::new(progresses)) + } +} diff --git a/querier/src/database.rs b/querier/src/database.rs index 8b41bf9e86..95dfd670d8 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -1,12 +1,16 @@ //! Database for the querier that contains all namespaces. use crate::{ - cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection, - namespace::QuerierNamespace, query_log::QueryLog, + cache::CatalogCache, + chunk::ParquetChunkAdapter, + ingester::{Error, IngesterConnection}, + namespace::QuerierNamespace, + query_log::QueryLog, }; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; use data_types::Namespace; +use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse; use object_store::DynObjectStore; use parking_lot::RwLock; use query::exec::Executor; @@ -47,7 +51,7 @@ pub struct QuerierDatabase { /// Executor for queries. exec: Arc, - /// Connection to ingester + /// Connection to ingester(s) ingester_connection: Arc, /// Query log. 
@@ -123,6 +127,11 @@ impl QuerierDatabase { .expect("retry forever") } + /// Get write info for the given token + pub async fn get_write_info(&self, write_token: &str) -> Result { + self.ingester_connection.get_write_info(write_token).await + } + /// Executor pub(crate) fn exec(&self) -> &Executor { &self.exec diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 70bb71b4ae..972d4c3022 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -5,13 +5,17 @@ use self::{ use crate::cache::CatalogCache; use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch}; use async_trait::async_trait; +use client_util::connection; use data_types::{ ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber, SequencerId, StatValues, Statistics, TableSummary, TimestampMinMax, }; use datafusion_util::MemoryStream; use futures::{stream::FuturesUnordered, TryStreamExt}; -use generated_types::ingester::IngesterQueryRequest; +use generated_types::{ + influxdata::iox::ingester::v1::GetWriteInfoResponse, ingester::IngesterQueryRequest, + write_info::merge_responses, +}; use observability_deps::tracing::{debug, trace}; use predicate::{Predicate, PredicateMatch}; use query::{ @@ -91,6 +95,24 @@ pub enum Error { partition_id: i64, ingester_address: String, }, + + #[snafu(display("Failed to connect to ingester '{}': {}", ingester_address, source))] + Connecting { + ingester_address: String, + source: connection::Error, + }, + + #[snafu(display( + "Error retrieving write info from '{}' for write token '{}': {}", + ingester_address, + write_token, + source, + ))] + WriteInfo { + ingester_address: String, + write_token: String, + source: influxdb_iox_client::error::Error, + }, } pub type Result = std::result::Result; @@ -126,6 +148,10 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static { expected_schema: Arc, ) -> Result>>; + /// Returns the most recent partition status info 
across all ingester(s) for the specified + /// write token. + async fn get_write_info(&self, write_token: &str) -> Result; + /// Return backend as [`Any`] which can be used to downcast to a specifc implementation. fn as_any(&self) -> &dyn Any; } @@ -337,11 +363,41 @@ impl IngesterConnection for IngesterConnectionImpl { Ok(ingester_partitions) } + async fn get_write_info(&self, write_token: &str) -> Result { + let responses = self + .ingester_addresses + .iter() + .map(|ingester_address| execute_get_write_infos(ingester_address, write_token)) + .collect::>() + .try_collect::>() + .await?; + + Ok(merge_responses(responses)) + } + fn as_any(&self) -> &dyn Any { self as &dyn Any } } +async fn execute_get_write_infos( + ingester_address: &str, + write_token: &str, +) -> Result { + let connection = connection::Builder::new() + .build(ingester_address) + .await + .context(ConnectingSnafu { ingester_address })?; + + influxdb_iox_client::write_info::Client::new(connection) + .get_write_info(write_token) + .await + .context(WriteInfoSnafu { + ingester_address, + write_token, + }) +} + /// A wrapper around the unpersisted data in a partition returned by /// the ingester that (will) implement the `QueryChunk` interface /// diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs index e09f890505..5bbb76013f 100644 --- a/querier/src/ingester/test_util.rs +++ b/querier/src/ingester/test_util.rs @@ -1,9 +1,8 @@ -use std::{any::Any, sync::Arc}; - -use async_trait::async_trait; -use parking_lot::Mutex; - use super::IngesterConnection; +use async_trait::async_trait; +use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse; +use parking_lot::Mutex; +use std::{any::Any, sync::Arc}; /// IngesterConnection for testing #[derive(Debug, Default)] @@ -40,6 +39,10 @@ impl IngesterConnection for MockIngesterConnection { .unwrap_or_else(|| Ok(vec![])) } + async fn get_write_info(&self, _write_token: &str) -> super::Result { + unimplemented!() + } + fn 
as_any(&self) -> &dyn Any { self as &dyn Any } diff --git a/test_helpers_end_to_end_ng/src/client.rs b/test_helpers_end_to_end_ng/src/client.rs index 8a23f3ad8a..eae0c875fb 100644 --- a/test_helpers_end_to_end_ng/src/client.rs +++ b/test_helpers_end_to_end_ng/src/client.rs @@ -83,12 +83,12 @@ pub fn get_write_token_from_grpc(response: &tonic::Response) -> S .to_string() } -/// returns the write info from ingester_connection for this token +/// returns the write info from the connection (to either an ingester or a querier) for this token pub async fn token_info( write_token: impl AsRef, - ingester_connection: Connection, + connection: Connection, ) -> Result { - influxdb_iox_client::write_info::Client::new(ingester_connection) + influxdb_iox_client::write_info::Client::new(connection) .get_write_info(write_token.as_ref()) .await } @@ -138,11 +138,8 @@ pub async fn token_is_persisted( const MAX_QUERY_RETRY_TIME_SEC: u64 = 20; /// Waits for the specified predicate to return true -pub async fn wait_for_token( - write_token: impl Into, - ingester_connection: Connection, - f: F, -) where +pub async fn wait_for_token(write_token: impl Into, connection: Connection, f: F) +where F: Fn(&GetWriteInfoResponse) -> bool, { let write_token = write_token.into(); @@ -151,7 +148,7 @@ pub async fn wait_for_token( info!(" write token: {}", write_token); let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC); - let mut write_info_client = influxdb_iox_client::write_info::Client::new(ingester_connection); + let mut write_info_client = influxdb_iox_client::write_info::Client::new(connection); tokio::time::timeout(retry_duration, async move { let mut interval = tokio::time::interval(Duration::from_millis(500)); loop { @@ -175,10 +172,10 @@ pub async fn wait_for_token( } /// Waits for the specified write token to be readable -pub async fn wait_for_readable(write_token: impl Into, ingester_connection: Connection) { +pub async fn wait_for_readable(write_token: impl Into, 
connection: Connection) { info!("Waiting for write token to be readable"); - wait_for_token(write_token, ingester_connection, |res| { + wait_for_token(write_token, connection, |res| { if all_readable(res) { info!("Write is readable: {:?}", res); true @@ -190,10 +187,10 @@ pub async fn wait_for_readable(write_token: impl Into, ingester_connecti } /// Waits for the write token to be persisted -pub async fn wait_for_persisted(write_token: impl Into, ingester_connection: Connection) { +pub async fn wait_for_persisted(write_token: impl Into, connection: Connection) { info!("Waiting for write token to be persisted"); - wait_for_token(write_token, ingester_connection, |res| { + wait_for_token(write_token, connection, |res| { if all_persisted(res) { info!("Write is persisted: {:?}", res); true diff --git a/test_helpers_end_to_end_ng/src/steps.rs b/test_helpers_end_to_end_ng/src/steps.rs index 08fffbbfe9..2d71a87801 100644 --- a/test_helpers_end_to_end_ng/src/steps.rs +++ b/test_helpers_end_to_end_ng/src/steps.rs @@ -154,29 +154,29 @@ impl<'a> StepTest<'a> { } Step::WaitForReadable => { info!("====Begin waiting for all write tokens to be readable"); - let ingester_grpc_connection = - state.cluster().ingester().ingester_grpc_connection(); + let querier_grpc_connection = + state.cluster().querier().querier_grpc_connection(); for write_token in &state.write_tokens { - wait_for_readable(write_token, ingester_grpc_connection.clone()).await; + wait_for_readable(write_token, querier_grpc_connection.clone()).await; } info!("====Done waiting for all write tokens to be readable"); } Step::WaitForPersisted => { info!("====Begin waiting for all write tokens to be persisted"); - let ingester_grpc_connection = - state.cluster().ingester().ingester_grpc_connection(); + let querier_grpc_connection = + state.cluster().querier().querier_grpc_connection(); for write_token in &state.write_tokens { - wait_for_persisted(write_token, ingester_grpc_connection.clone()).await; + 
wait_for_persisted(write_token, querier_grpc_connection.clone()).await; } info!("====Done waiting for all write tokens to be persisted"); } Step::AssertNotPersisted => { info!("====Begin checking all tokens not persisted"); - let ingester_grpc_connection = - state.cluster().ingester().ingester_grpc_connection(); + let querier_grpc_connection = + state.cluster().querier().querier_grpc_connection(); for write_token in &state.write_tokens { let persisted = - token_is_persisted(write_token, ingester_grpc_connection.clone()).await; + token_is_persisted(write_token, querier_grpc_connection.clone()).await; assert!(!persisted); } info!("====Done checking all tokens not persisted");