feat: Add get_write_info service (#4227)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>

parent a3ee11a8e8
commit a30a85e62c
@@ -50,6 +50,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
         deployment_path.join("service.proto"),
         ingester_path.join("parquet_metadata.proto"),
         ingester_path.join("query.proto"),
+        ingester_path.join("write_info.proto"),
         management_path.join("chunk.proto"),
         management_path.join("database_rules.proto"),
         management_path.join("jobs.proto"),
@@ -0,0 +1,21 @@
+syntax = "proto3";
+package influxdata.iox.ingester.v1;
+
+service WriteInfoService {
+  // Get information about a particular write
+  rpc GetWriteInfo(GetWriteInfoRequest) returns (GetWriteInfoResponse);
+}
+
+message GetWriteInfoRequest {
+  // The write token returned from a write
+  string write_token = 1;
+}
+
+message GetWriteInfoResponse {
+  // Is the data in this write entirely readable (will be included in
+  // a query response)?
+  bool readable = 1;
+
+  // Is the data in this write completely persisted to parquet files?
+  bool persisted = 2;
+}
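For reference, a minimal sketch of calling this new service through the tonic-generated client. This is not part of the commit: the address is hypothetical, the placeholder token stands in for one returned by a real write, and it assumes the generated client is built with tonic's `transport` feature so `connect` is available.

```rust
use generated_types::influxdata::iox::ingester::v1::{
    write_info_service_client::WriteInfoServiceClient, GetWriteInfoRequest,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical ingester gRPC address
    let mut client = WriteInfoServiceClient::connect("http://127.0.0.1:8082").await?;

    // The token would normally come from a previous write
    // (see `get_write_token` in the test helpers further down)
    let response = client
        .get_write_info(GetWriteInfoRequest {
            write_token: "<write token from a previous write>".to_string(),
        })
        .await?
        .into_inner();

    println!(
        "readable={} persisted={}",
        response.readable, response.persisted
    );
    Ok(())
}
```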
@@ -34,6 +34,7 @@ async fn smoke() {
         sql,
         namespace,
         write_token,
+        all_in_one.server().ingester_grpc_connection(),
         all_in_one.server().querier_grpc_connection(),
     )
     .await;
@@ -1,5 +1,7 @@
 use http::StatusCode;
-use test_helpers_end_to_end_ng::{maybe_skip_integration, MiniCluster, TestConfig};
+use test_helpers_end_to_end_ng::{
+    get_write_token, maybe_skip_integration, wait_for_readable, MiniCluster, TestConfig,
+};

 use arrow_util::assert_batches_sorted_eq;
 use data_types2::{IngesterQueryRequest, SequencerId};
@@ -14,16 +16,21 @@ async fn ingester_flight_api() {
     let router2_config = TestConfig::new_router2(&database_url);
     let ingester_config = TestConfig::new_ingester(&router2_config);

-    // Set up router2 ====================================
-    let cluster = MiniCluster::new().with_router2(router2_config).await;
+    // Set up cluster
+    let cluster = MiniCluster::new()
+        .with_router2(router2_config)
+        .await
+        .with_ingester(ingester_config)
+        .await;

     // Write some data into the v2 HTTP API ==============
     let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
     let response = cluster.write_to_router(lp).await;
     assert_eq!(response.status(), StatusCode::NO_CONTENT);

-    // Set up ingester ===================================
-    let cluster = cluster.with_ingester(ingester_config).await;
+    // wait for the write to become visible
+    let write_token = get_write_token(&response);
+    wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;

     let mut querier_flight =
         querier::QuerierFlightClient::new(cluster.ingester().ingester_grpc_connection());
@@ -38,6 +38,7 @@ async fn basic_on_parquet() {
         sql,
         cluster.namespace(),
         write_token,
+        cluster.ingester().ingester_grpc_connection(),
         cluster.querier().querier_grpc_connection(),
     )
     .await;
@@ -34,3 +34,6 @@ pub mod operations;
 #[cfg(feature = "flight")]
 /// Client for query API (based on Arrow flight)
 pub mod flight;
+
+/// Client for fetching write info
+pub mod write_info;
@@ -0,0 +1,42 @@
+use self::generated_types::{write_info_service_client::WriteInfoServiceClient, *};
+
+use crate::connection::Connection;
+use crate::error::Error;
+
+/// Re-export generated_types
+pub mod generated_types {
+    pub use generated_types::influxdata::iox::ingester::v1::{
+        write_info_service_client, write_info_service_server, GetWriteInfoRequest,
+        GetWriteInfoResponse,
+    };
+}
+
+/// A basic client for fetching information about write tokens
+#[derive(Debug, Clone)]
+pub struct Client {
+    inner: WriteInfoServiceClient<Connection>,
+}
+
+impl Client {
+    /// Creates a new client with the provided connection
+    pub fn new(channel: Connection) -> Self {
+        Self {
+            inner: WriteInfoServiceClient::new(channel),
+        }
+    }
+
+    /// Get the write information for a write token
+    pub async fn get_write_info(
+        &mut self,
+        write_token: &str,
+    ) -> Result<GetWriteInfoResponse, Error> {
+        let response = self
+            .inner
+            .get_write_info(GetWriteInfoRequest {
+                write_token: write_token.to_string(),
+            })
+            .await?;
+
+        Ok(response.into_inner())
+    }
+}
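To show how this wrapper is meant to be used end to end, here is a sketch of a test in the style of the end-to-end tests touched by this commit. The line protocol, test name, and the environment variable lookup are illustrative; the real tests obtain the catalog DSN via `maybe_skip_integration!`.

```rust
use http::StatusCode;
use test_helpers_end_to_end_ng::{get_write_token, MiniCluster, TestConfig};

#[tokio::test]
async fn write_info_round_trip() {
    // Illustrative only; the real tests use `maybe_skip_integration!` for this
    let database_url = std::env::var("TEST_INFLUXDB_IOX_CATALOG_DSN").unwrap();

    // Set up a router2 + ingester cluster, as in `ingester_flight_api` above
    let router2_config = TestConfig::new_router2(&database_url);
    let ingester_config = TestConfig::new_ingester(&router2_config);
    let cluster = MiniCluster::new()
        .with_router2(router2_config)
        .await
        .with_ingester(ingester_config)
        .await;

    // Write some line protocol and capture the write token from the response
    let lp = String::from("cpu,host=a usage=1i 123456");
    let response = cluster.write_to_router(lp).await;
    assert_eq!(response.status(), StatusCode::NO_CONTENT);
    let write_token = get_write_token(&response);

    // Ask the ingester about that token via the new client
    let mut client = influxdb_iox_client::write_info::Client::new(
        cluster.ingester().ingester_grpc_connection(),
    );
    let info = client
        .get_write_info(&write_token)
        .await
        .expect("get_write_info failed");
    println!("readable={} persisted={}", info.readable, info.persisted);
}
```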
@@ -13,7 +13,11 @@ use arrow_flight::{
     HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
 };
 use futures::{SinkExt, Stream, StreamExt};
-use generated_types::influxdata::iox::ingester::v1 as proto;
+use generated_types::influxdata::iox::ingester::v1::{
+    self as proto,
+    write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
+    GetWriteInfoRequest, GetWriteInfoResponse,
+};
 use observability_deps::tracing::{info, warn};
 use pin_project::{pin_project, pinned_drop};
 use prost::Message;
@@ -22,6 +26,7 @@ use std::{pin::Pin, sync::Arc, task::Poll};
 use tokio::task::JoinHandle;
 use tonic::{Request, Response, Streaming};
 use trace::ctx::SpanContext;
+use write_summary::WriteSummary;

 /// This type is responsible for managing all gRPC services exposed by
 /// `ingester`.
@@ -43,6 +48,69 @@ impl<I: IngestHandler + Send + Sync + 'static> GrpcDelegate<I> {
             ingest_handler: Arc::clone(&self.ingest_handler),
         })
     }
+
+    /// Acquire a WriteInfo gRPC service implementation.
+    pub fn write_info_service(&self) -> WriteInfoServiceServer<impl WriteInfoService> {
+        WriteInfoServiceServer::new(WriteInfoServiceImpl::new(
+            Arc::clone(&self.ingest_handler) as _
+        ))
+    }
+}
+
+/// Implementation of write info
+struct WriteInfoServiceImpl {
+    handler: Arc<dyn IngestHandler + Send + Sync + 'static>,
+}
+
+impl WriteInfoServiceImpl {
+    pub fn new(handler: Arc<dyn IngestHandler + Send + Sync + 'static>) -> Self {
+        Self { handler }
+    }
+}
+
+fn write_summary_error_to_status(e: write_summary::Error) -> tonic::Status {
+    use write_summary::Error;
+
+    match e {
+        // treat an "unknown partition" error as a failed precondition
+        // (so the client can distinguish "write isn't readable"
+        // from "we can't tell if the write is readable")
+        e @ Error::UnknownKafkaPartition { .. } => {
+            tonic::Status::failed_precondition(format!("Can not determine status of write: {}", e))
+        }
+    }
+}
+
+#[tonic::async_trait]
+impl WriteInfoService for WriteInfoServiceImpl {
+    async fn get_write_info(
+        &self,
+        request: Request<GetWriteInfoRequest>,
+    ) -> Result<Response<GetWriteInfoResponse>, tonic::Status> {
+        let GetWriteInfoRequest { write_token } = request.into_inner();
+
+        let write_summary =
+            WriteSummary::try_from_token(&write_token).map_err(tonic::Status::invalid_argument)?;
+
+        let progress = self
+            .handler
+            .progresses(write_summary.kafka_partitions())
+            .await
+            .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
+
+        let readable = write_summary
+            .readable(&progress)
+            .map_err(write_summary_error_to_status)?;
+
+        let persisted = write_summary
+            .persisted(&progress)
+            .map_err(write_summary_error_to_status)?;
+
+        Ok(tonic::Response::new(GetWriteInfoResponse {
+            readable,
+            persisted,
+        }))
+    }
 }

 #[derive(Debug, Snafu)]
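The mapping above gives callers two distinct failure modes: a token that cannot be parsed (or a failed progress lookup) surfaces as `InvalidArgument`, while a token referring to a Kafka partition this ingester does not know about surfaces as `FailedPrecondition`. A sketch of how a caller might branch on that; the helper below is made up for illustration and is not part of this commit.

```rust
use tonic::{Code, Status};

/// Illustrative helper: classify the statuses the server-side
/// `get_write_info` can return.
fn classify_write_info_error(status: &Status) -> &'static str {
    match status.code() {
        // the write token could not be parsed, or the progress lookup failed
        Code::InvalidArgument => "malformed write token",
        // the token parsed, but the ingester cannot determine the write's status
        Code::FailedPrecondition => "write status indeterminate, try again",
        _ => "unexpected error",
    }
}

fn main() {
    let status = Status::failed_precondition("Can not determine status of write");
    assert_eq!(
        classify_write_info_error(&status),
        "write status indeterminate, try again"
    );
}
```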
@@ -91,6 +91,7 @@ impl<I: IngestHandler + Sync + Send + Debug + 'static> ServerType for IngesterSe
     async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
         let builder = setup_builder!(builder_input, self);
         add_service!(builder, self.server.grpc().flight_service());
+        add_service!(builder, self.server.grpc().write_info_service());
         serve_builder!(builder);

         Ok(())
@@ -6,6 +6,7 @@ use http::Response;
 use hyper::{Body, Client, Request};

 use influxdb_iox_client::connection::Connection;
+use influxdb_iox_client::write_info::generated_types::GetWriteInfoResponse;

 /// Writes the line protocol to the write_base/api/v2/write endpoint (typically on the router)
 pub async fn write_to_router(
@@ -48,64 +49,102 @@ pub fn get_write_token(response: &Response<Body>) -> String {

 const MAX_QUERY_RETRY_TIME_SEC: u64 = 10;

+/// Waits for the specified predicate to return true
+pub async fn wait_for_token<F>(
+    write_token: impl Into<String>,
+    ingester_connection: Connection,
+    f: F,
+) where
+    F: Fn(&GetWriteInfoResponse) -> bool,
+{
+    let write_token = write_token.into();
+
+    println!("Waiting for Write Token {}", write_token);
+
+    let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
+    let mut write_info_client = influxdb_iox_client::write_info::Client::new(ingester_connection);
+    tokio::time::timeout(retry_duration, async move {
+        let mut interval = tokio::time::interval(Duration::from_millis(500));
+        loop {
+            match write_info_client.get_write_info(&write_token).await {
+                Ok(res) => {
+                    if f(&res) {
+                        return;
+                    }
+                    println!("Retrying; predicate not satisfied: {:?}", res);
+                }
+
+                Err(e) => {
+                    println!("Retrying; Got error getting write_info: {}", e);
+                }
+            };
+            interval.tick().await;
+        }
+    })
+    .await
+    .expect("did not get passing predicate on token");
+}
+
+/// Waits for the specified write token to be readable
+pub async fn wait_for_readable(write_token: impl Into<String>, ingester_connection: Connection) {
+    println!("Waiting for Write Token to be readable");
+
+    wait_for_token(write_token, ingester_connection, |res| {
+        if res.readable {
+            println!("Write is readable: {:?}", res);
+            true
+        } else {
+            false
+        }
+    })
+    .await
+}
+
+/// Waits for the write token to be persisted
+pub async fn wait_for_persisted(write_token: impl Into<String>, ingester_connection: Connection) {
+    println!("Waiting for Write Token to be persisted");

+    wait_for_token(write_token, ingester_connection, |res| {
+        if res.persisted {
+            println!("Write is persisted: {:?}", res);
+            true
+        } else {
+            false
+        }
+    })
+    .await
+}
+
 /// Runs a query using the flight API on the specified connection
 /// until responses are produced.
 ///
-/// (Will) eventually Wait until data from the specified write token
-/// is readable, but currently waits for
+/// (Will) eventually wait until data from the specified write token
+/// is readable, but currently waits for the data to be persisted (as
+/// the querier doesn't know how to ask the ingester yet)
 ///
 /// The retry loop is used to wait for writes to become visible
 pub async fn query_when_readable(
     sql: impl Into<String>,
     namespace: impl Into<String>,
     write_token: impl Into<String>,
-    connection: Connection,
+    ingester_connection: Connection,
+    querier_connection: Connection,
 ) -> Vec<RecordBatch> {
     let namespace = namespace.into();
     let sql = sql.into();

-    println!(
-        "(TODO) Waiting for Write Token to be visible {}",
-        write_token.into()
-    );
+    // TODO: this should be "wait_for_readable" once the querier can talk to ingester
+    wait_for_persisted(write_token, ingester_connection).await;

-    let mut client = influxdb_iox_client::flight::Client::new(connection);
+    let mut client = influxdb_iox_client::flight::Client::new(querier_connection);

     // This does nothing except test the client handshake implementation.
     client.handshake().await.unwrap();

-    let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
-    tokio::time::timeout(retry_duration, async move {
-        let mut interval = tokio::time::interval(Duration::from_millis(100));
-        loop {
-            let mut response = match client.perform_query(&namespace, &sql).await {
-                Ok(res) => res,
-                Err(e) => {
-                    println!("Retrying; Got error performing query: {}", e);
-                    interval.tick().await;
-                    continue;
-                }
-            };
-
-            let batches = match response.collect().await {
-                Ok(batches) => batches,
-                Err(e) => {
-                    println!("Retrying; Got error running query: {}", e);
-                    interval.tick().await;
-                    continue;
-                }
-            };
-
-            // wait for some data to actually arrive
-            if batches.is_empty() {
-                println!("Retrying: No record results yet");
-                interval.tick().await;
-                continue;
-            }
-
-            return batches;
-        }
-    })
-    .await
-    .expect("successfully ran the query in the allotted time")
+    let mut response = client
+        .perform_query(&namespace, &sql)
+        .await
+        .expect("Error performing query");
+
+    response.collect().await.expect("Error executing query")
 }
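Since `wait_for_readable` and `wait_for_persisted` are thin wrappers over `wait_for_token`, test authors can wait on custom conditions too. A small sketch, assuming `wait_for_token` is re-exported from the crate root the same way `wait_for_readable` is; the helper name below is made up for illustration.

```rust
use influxdb_iox_client::connection::Connection;
use test_helpers_end_to_end_ng::wait_for_token;

/// Sketch: wait until a write is both readable and persisted.
async fn wait_for_readable_and_persisted(
    write_token: impl Into<String>,
    ingester_connection: Connection,
) {
    wait_for_token(write_token, ingester_connection, |res| {
        res.readable && res.persisted
    })
    .await
}
```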