feat: Querier can get write status from ingesters

Connects to influxdata/influxdb-iox-client-go#27.
pull/24376/head
Carol (Nichols || Goulding) 2022-05-11 14:00:52 -04:00
parent 77205d9a8e
commit 48b84b3bdf
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
10 changed files with 151 additions and 37 deletions

View File

@ -29,7 +29,7 @@ async fn smoke() {
// wait for data to be persisted to parquet
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let write_token = get_write_token(&response);
wait_for_persisted(write_token, all_in_one.ingester_grpc_connection()).await;
wait_for_persisted(write_token, all_in_one.querier_grpc_connection()).await;
// run query
let sql = format!("select * from {}", table_name);

View File

@ -120,8 +120,8 @@ async fn basic_multi_ingesters() {
StepTest::new(&mut cluster, test_steps).run().await
}
/// Return a combined readable response from the ingesters that is
/// readable for all partitions. Retries forever
/// Use the WriteInfo API on the querier, which combines write info from all the ingesters it
/// knows about, to get the status of the data
async fn get_multi_ingester_readable_combined_response(
state: &mut StepTestState<'_>,
) -> GetWriteInfoResponse {

View File

@ -78,15 +78,19 @@ impl<C: QuerierHandler + std::fmt::Debug + 'static> ServerType for QuerierServer
let builder = setup_builder!(builder_input, self);
add_service!(
builder,
rpc::query::make_flight_server(Arc::clone(&self.database),)
rpc::query::make_flight_server(Arc::clone(&self.database))
);
add_service!(
builder,
rpc::query::make_storage_server(Arc::clone(&self.database),)
rpc::query::make_storage_server(Arc::clone(&self.database))
);
add_service!(
builder,
rpc::namespace::namespace_service(Arc::clone(&self.database),)
rpc::namespace::namespace_service(Arc::clone(&self.database))
);
add_service!(
builder,
rpc::write_info::write_info_service(Arc::clone(&self.database))
);
add_service!(builder, self.server.handler().schema_service());
serve_builder!(builder);

View File

@ -1,2 +1,3 @@
pub(crate) mod namespace;
pub(crate) mod query;
pub(crate) mod write_info;

View File

@ -0,0 +1,44 @@
//! WriteInfoService gRPC implementation
use generated_types::influxdata::iox::ingester::v1::{
self as proto,
write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
};
use querier::QuerierDatabase;
use std::sync::Arc;
/// Acquire a [`WriteInfoService`] gRPC service implementation.
pub fn write_info_service(
    server: Arc<QuerierDatabase>,
) -> WriteInfoServiceServer<impl WriteInfoService> {
    // Wrap the querier database in the service implementation, then in the
    // generated tonic server type.
    let service_impl = QuerierWriteInfoServiceImpl::new(server);
    WriteInfoServiceServer::new(service_impl)
}
/// Implements the ingester `WriteInfoService` on top of the querier, which
/// looks up write status via its ingester connection(s).
#[derive(Debug)]
struct QuerierWriteInfoServiceImpl {
/// Shared querier database used to reach the ingester connection(s).
server: Arc<QuerierDatabase>,
}
impl QuerierWriteInfoServiceImpl {
/// Wrap the given querier database in a new service instance.
pub fn new(server: Arc<QuerierDatabase>) -> Self {
Self { server }
}
}
#[tonic::async_trait]
impl WriteInfoService for QuerierWriteInfoServiceImpl {
    /// Look up write-status information for the write token in the request.
    async fn get_write_info(
        &self,
        request: tonic::Request<proto::GetWriteInfoRequest>,
    ) -> Result<tonic::Response<proto::GetWriteInfoResponse>, tonic::Status> {
        // Pull the token out of the request payload.
        let write_token = request.into_inner().write_token;

        // Delegate to the querier database; any failure is surfaced to the
        // client as an internal gRPC status.
        match self.server.get_write_info(&write_token).await {
            Ok(progresses) => Ok(tonic::Response::new(progresses)),
            Err(e) => Err(tonic::Status::internal(e.to_string())),
        }
    }
}

View File

@ -1,12 +1,16 @@
//! Database for the querier that contains all namespaces.
use crate::{
cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection,
namespace::QuerierNamespace, query_log::QueryLog,
cache::CatalogCache,
chunk::ParquetChunkAdapter,
ingester::{Error, IngesterConnection},
namespace::QuerierNamespace,
query_log::QueryLog,
};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::Namespace;
use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse;
use object_store::DynObjectStore;
use parking_lot::RwLock;
use query::exec::Executor;
@ -47,7 +51,7 @@ pub struct QuerierDatabase {
/// Executor for queries.
exec: Arc<Executor>,
/// Connection to ingester
/// Connection to ingester(s)
ingester_connection: Arc<dyn IngesterConnection>,
/// Query log.
@ -123,6 +127,11 @@ impl QuerierDatabase {
.expect("retry forever")
}
/// Get write info for the given token.
///
/// Delegates to the ingester connection, which gathers the status of the
/// write identified by `write_token` from the ingester(s).
pub async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse, Error> {
self.ingester_connection.get_write_info(write_token).await
}
/// Executor
pub(crate) fn exec(&self) -> &Executor {
&self.exec

View File

@ -5,13 +5,17 @@ use self::{
use crate::cache::CatalogCache;
use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
use async_trait::async_trait;
use client_util::connection;
use data_types::{
ChunkAddr, ChunkId, ChunkOrder, ColumnSummary, InfluxDbType, PartitionId, SequenceNumber,
SequencerId, StatValues, Statistics, TableSummary, TimestampMinMax,
};
use datafusion_util::MemoryStream;
use futures::{stream::FuturesUnordered, TryStreamExt};
use generated_types::ingester::IngesterQueryRequest;
use generated_types::{
influxdata::iox::ingester::v1::GetWriteInfoResponse, ingester::IngesterQueryRequest,
write_info::merge_responses,
};
use observability_deps::tracing::{debug, trace};
use predicate::{Predicate, PredicateMatch};
use query::{
@ -91,6 +95,24 @@ pub enum Error {
partition_id: i64,
ingester_address: String,
},
#[snafu(display("Failed to connect to ingester '{}': {}", ingester_address, source))]
Connecting {
ingester_address: String,
source: connection::Error,
},
#[snafu(display(
"Error retrieving write info from '{}' for write token '{}': {}",
ingester_address,
write_token,
source,
))]
WriteInfo {
ingester_address: String,
write_token: String,
source: influxdb_iox_client::error::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -126,6 +148,10 @@ pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
expected_schema: Arc<Schema>,
) -> Result<Vec<Arc<IngesterPartition>>>;
/// Returns the most recent partition status info across all ingester(s) for the specified
/// write token.
async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse>;
/// Return backend as [`Any`] which can be used to downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
}
@ -337,11 +363,41 @@ impl IngesterConnection for IngesterConnectionImpl {
Ok(ingester_partitions)
}
async fn get_write_info(&self, write_token: &str) -> Result<GetWriteInfoResponse> {
    // Query every known ingester concurrently; the first error aborts the
    // whole lookup.
    let requests: FuturesUnordered<_> = self
        .ingester_addresses
        .iter()
        .map(|addr| execute_get_write_infos(addr, write_token))
        .collect();

    let responses: Vec<_> = requests.try_collect().await?;

    // Combine the per-ingester answers into a single response.
    Ok(merge_responses(responses))
}
// Expose the concrete implementation for downcasting via [`Any`].
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
/// Query a single ingester for the status of the given write token.
///
/// Builds a fresh gRPC connection to `ingester_address` and issues a
/// `GetWriteInfo` request over it; failures in either step are wrapped in
/// the corresponding [`Error`] variant.
async fn execute_get_write_infos(
    ingester_address: &str,
    write_token: &str,
) -> Result<GetWriteInfoResponse, Error> {
    // Establish a connection to this ingester.
    let connection = connection::Builder::new()
        .build(ingester_address)
        .await
        .context(ConnectingSnafu { ingester_address })?;

    // Ask the ingester about the token's write progress.
    let mut client = influxdb_iox_client::write_info::Client::new(connection);
    client
        .get_write_info(write_token)
        .await
        .context(WriteInfoSnafu {
            ingester_address,
            write_token,
        })
}
/// A wrapper around the unpersisted data in a partition returned by
/// the ingester that (will) implement the `QueryChunk` interface
///

View File

@ -1,9 +1,8 @@
use std::{any::Any, sync::Arc};
use async_trait::async_trait;
use parking_lot::Mutex;
use super::IngesterConnection;
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse;
use parking_lot::Mutex;
use std::{any::Any, sync::Arc};
/// IngesterConnection for testing
#[derive(Debug, Default)]
@ -40,6 +39,10 @@ impl IngesterConnection for MockIngesterConnection {
.unwrap_or_else(|| Ok(vec![]))
}
/// Write-info lookups are not supported by the mock; calling this panics.
async fn get_write_info(&self, _write_token: &str) -> super::Result<GetWriteInfoResponse> {
unimplemented!()
}
// Expose the concrete mock type for downcasting via [`Any`].
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}

View File

@ -83,12 +83,12 @@ pub fn get_write_token_from_grpc(response: &tonic::Response<WriteResponse>) -> S
.to_string()
}
/// Returns the write info from the connection (to either an ingester or a
/// querier) for this token.
///
/// # Errors
///
/// Returns any error produced by the underlying `write_info` gRPC client.
pub async fn token_info(
    write_token: impl AsRef<str>,
    connection: Connection,
) -> Result<GetWriteInfoResponse, influxdb_iox_client::error::Error> {
    influxdb_iox_client::write_info::Client::new(connection)
        .get_write_info(write_token.as_ref())
        .await
}
@ -138,11 +138,8 @@ pub async fn token_is_persisted(
const MAX_QUERY_RETRY_TIME_SEC: u64 = 20;
/// Waits for the specified predicate to return true
pub async fn wait_for_token<F>(
write_token: impl Into<String>,
ingester_connection: Connection,
f: F,
) where
pub async fn wait_for_token<F>(write_token: impl Into<String>, connection: Connection, f: F)
where
F: Fn(&GetWriteInfoResponse) -> bool,
{
let write_token = write_token.into();
@ -151,7 +148,7 @@ pub async fn wait_for_token<F>(
info!(" write token: {}", write_token);
let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
let mut write_info_client = influxdb_iox_client::write_info::Client::new(ingester_connection);
let mut write_info_client = influxdb_iox_client::write_info::Client::new(connection);
tokio::time::timeout(retry_duration, async move {
let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
@ -175,10 +172,10 @@ pub async fn wait_for_token<F>(
}
/// Waits for the specified write token to be readable
pub async fn wait_for_readable(write_token: impl Into<String>, ingester_connection: Connection) {
pub async fn wait_for_readable(write_token: impl Into<String>, connection: Connection) {
info!("Waiting for write token to be readable");
wait_for_token(write_token, ingester_connection, |res| {
wait_for_token(write_token, connection, |res| {
if all_readable(res) {
info!("Write is readable: {:?}", res);
true
@ -190,10 +187,10 @@ pub async fn wait_for_readable(write_token: impl Into<String>, ingester_connecti
}
/// Waits for the write token to be persisted
pub async fn wait_for_persisted(write_token: impl Into<String>, ingester_connection: Connection) {
pub async fn wait_for_persisted(write_token: impl Into<String>, connection: Connection) {
info!("Waiting for write token to be persisted");
wait_for_token(write_token, ingester_connection, |res| {
wait_for_token(write_token, connection, |res| {
if all_persisted(res) {
info!("Write is persisted: {:?}", res);
true

View File

@ -154,29 +154,29 @@ impl<'a> StepTest<'a> {
}
Step::WaitForReadable => {
info!("====Begin waiting for all write tokens to be readable");
let ingester_grpc_connection =
state.cluster().ingester().ingester_grpc_connection();
let querier_grpc_connection =
state.cluster().querier().querier_grpc_connection();
for write_token in &state.write_tokens {
wait_for_readable(write_token, ingester_grpc_connection.clone()).await;
wait_for_readable(write_token, querier_grpc_connection.clone()).await;
}
info!("====Done waiting for all write tokens to be readable");
}
Step::WaitForPersisted => {
info!("====Begin waiting for all write tokens to be persisted");
let ingester_grpc_connection =
state.cluster().ingester().ingester_grpc_connection();
let querier_grpc_connection =
state.cluster().querier().querier_grpc_connection();
for write_token in &state.write_tokens {
wait_for_persisted(write_token, ingester_grpc_connection.clone()).await;
wait_for_persisted(write_token, querier_grpc_connection.clone()).await;
}
info!("====Done waiting for all write tokens to be persisted");
}
Step::AssertNotPersisted => {
info!("====Begin checking all tokens not persisted");
let ingester_grpc_connection =
state.cluster().ingester().ingester_grpc_connection();
let querier_grpc_connection =
state.cluster().querier().querier_grpc_connection();
for write_token in &state.write_tokens {
let persisted =
token_is_persisted(write_token, ingester_grpc_connection.clone()).await;
token_is_persisted(write_token, querier_grpc_connection.clone()).await;
assert!(!persisted);
}
info!("====Done checking all tokens not persisted");