From a30a85e62cc80b8e912d8fe463f7a8446933a6d6 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 7 Apr 2022 15:24:58 -0400
Subject: [PATCH] feat: Add get_write_info service (#4227)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 generated_types/build.rs                      |   1 +
 .../iox/ingester/v1/write_info.proto          |  21 +++
 .../tests/end_to_end_ng_cases/all_in_one.rs   |   1 +
 .../tests/end_to_end_ng_cases/ingester.rs     |  17 ++-
 .../tests/end_to_end_ng_cases/querier.rs      |   1 +
 influxdb_iox_client/src/client.rs             |   3 +
 influxdb_iox_client/src/client/write_info.rs  |  42 ++++++
 ingester/src/server/grpc.rs                   |  70 +++++++++-
 ioxd_ingester/src/lib.rs                      |   1 +
 test_helpers_end_to_end_ng/src/client.rs      | 121 ++++++++++++------
 10 files changed, 231 insertions(+), 47 deletions(-)
 create mode 100644 generated_types/protos/influxdata/iox/ingester/v1/write_info.proto
 create mode 100644 influxdb_iox_client/src/client/write_info.rs

diff --git a/generated_types/build.rs b/generated_types/build.rs
index 6baa0e0af1..ba3c27447d 100644
--- a/generated_types/build.rs
+++ b/generated_types/build.rs
@@ -50,6 +50,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
         deployment_path.join("service.proto"),
         ingester_path.join("parquet_metadata.proto"),
         ingester_path.join("query.proto"),
+        ingester_path.join("write_info.proto"),
         management_path.join("chunk.proto"),
         management_path.join("database_rules.proto"),
         management_path.join("jobs.proto"),
diff --git a/generated_types/protos/influxdata/iox/ingester/v1/write_info.proto b/generated_types/protos/influxdata/iox/ingester/v1/write_info.proto
new file mode 100644
index 0000000000..85d763a43c
--- /dev/null
+++ b/generated_types/protos/influxdata/iox/ingester/v1/write_info.proto
@@ -0,0 +1,21 @@
+syntax = "proto3";
+package influxdata.iox.ingester.v1;
+
+service WriteInfoService {
+  // Get information about a particular write
+  rpc GetWriteInfo(GetWriteInfoRequest) returns (GetWriteInfoResponse);
+}
+
+message GetWriteInfoRequest {
+  // The write token returned from a write
+  string write_token = 1;
+}
+
+message GetWriteInfoResponse {
+  // Is the data in this write entirely readable (will be included in
+  // a query response)?
+  bool readable = 1;
+
+  // Is the data in this write completely persisted to parquet files?
+  bool persisted = 2;
+}
diff --git a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs
index dfef9382f6..b10e9ba032 100644
--- a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs
+++ b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs
@@ -34,6 +34,7 @@ async fn smoke() {
         sql,
         namespace,
         write_token,
+        all_in_one.server().ingester_grpc_connection(),
         all_in_one.server().querier_grpc_connection(),
     )
     .await;
diff --git a/influxdb_iox/tests/end_to_end_ng_cases/ingester.rs b/influxdb_iox/tests/end_to_end_ng_cases/ingester.rs
index 5aa099d957..8d72cc83e4 100644
--- a/influxdb_iox/tests/end_to_end_ng_cases/ingester.rs
+++ b/influxdb_iox/tests/end_to_end_ng_cases/ingester.rs
@@ -1,5 +1,7 @@
 use http::StatusCode;
-use test_helpers_end_to_end_ng::{maybe_skip_integration, MiniCluster, TestConfig};
+use test_helpers_end_to_end_ng::{
+    get_write_token, maybe_skip_integration, wait_for_readable, MiniCluster, TestConfig,
+};
 
 use arrow_util::assert_batches_sorted_eq;
 use data_types2::{IngesterQueryRequest, SequencerId};
@@ -14,16 +16,21 @@ async fn ingester_flight_api() {
     let router2_config = TestConfig::new_router2(&database_url);
     let ingester_config = TestConfig::new_ingester(&router2_config);
 
-    // Set up router2 ====================================
-    let cluster = MiniCluster::new().with_router2(router2_config).await;
+    // Set up cluster
+    let cluster = MiniCluster::new()
+        .with_router2(router2_config)
+        .await
+        .with_ingester(ingester_config)
+        .await;
 
     // Write some data into the v2 HTTP API ==============
     let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
     let response = cluster.write_to_router(lp).await;
     assert_eq!(response.status(), StatusCode::NO_CONTENT);
 
-    // Set up ingester ===================================
-    let cluster = cluster.with_ingester(ingester_config).await;
+    // wait for the write to become visible
+    let write_token = get_write_token(&response);
+    wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
 
     let mut querier_flight =
         querier::QuerierFlightClient::new(cluster.ingester().ingester_grpc_connection());
diff --git a/influxdb_iox/tests/end_to_end_ng_cases/querier.rs b/influxdb_iox/tests/end_to_end_ng_cases/querier.rs
index 779268ece2..d74b081862 100644
--- a/influxdb_iox/tests/end_to_end_ng_cases/querier.rs
+++ b/influxdb_iox/tests/end_to_end_ng_cases/querier.rs
@@ -38,6 +38,7 @@ async fn basic_on_parquet() {
         sql,
         cluster.namespace(),
         write_token,
+        cluster.ingester().ingester_grpc_connection(),
         cluster.querier().querier_grpc_connection(),
     )
     .await;
diff --git a/influxdb_iox_client/src/client.rs b/influxdb_iox_client/src/client.rs
index b51eab171d..f69fb2e10c 100644
--- a/influxdb_iox_client/src/client.rs
+++ b/influxdb_iox_client/src/client.rs
@@ -34,3 +34,6 @@ pub mod operations;
 #[cfg(feature = "flight")]
 /// Client for query API (based on Arrow flight)
 pub mod flight;
+
+/// Client for fetching write info
+pub mod write_info;
diff --git a/influxdb_iox_client/src/client/write_info.rs b/influxdb_iox_client/src/client/write_info.rs
new file mode 100644
index 0000000000..bd530a0795
--- /dev/null
+++ b/influxdb_iox_client/src/client/write_info.rs
@@ -0,0 +1,42 @@
+use self::generated_types::{write_info_service_client::WriteInfoServiceClient, *};
+
+use crate::connection::Connection;
+use crate::error::Error;
+
+/// Re-export generated_types
+pub mod generated_types {
+    pub use generated_types::influxdata::iox::ingester::v1::{
+        write_info_service_client, write_info_service_server, GetWriteInfoRequest,
+        GetWriteInfoResponse,
+    };
+}
+
+/// A basic client for fetching information about write tokens
+#[derive(Debug, Clone)]
+pub struct Client {
+    inner: WriteInfoServiceClient<Connection>,
+}
+
+impl Client {
+    /// Creates a new client with the provided connection
+    pub fn new(channel: Connection) -> Self {
+        Self {
+            inner: WriteInfoServiceClient::new(channel),
+        }
+    }
+
+    /// Get the write information for a write token
+    pub async fn get_write_info(
+        &mut self,
+        write_token: &str,
+    ) -> Result<GetWriteInfoResponse, Error> {
+        let response = self
+            .inner
+            .get_write_info(GetWriteInfoRequest {
+                write_token: write_token.to_string(),
+            })
+            .await?;
+
+        Ok(response.into_inner())
+    }
+}
diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs
index 57dc7971f1..c6fc3ff42a 100644
--- a/ingester/src/server/grpc.rs
+++ b/ingester/src/server/grpc.rs
@@ -13,7 +13,11 @@ use arrow_flight::{
     HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
 };
 use futures::{SinkExt, Stream, StreamExt};
-use generated_types::influxdata::iox::ingester::v1 as proto;
+use generated_types::influxdata::iox::ingester::v1::{
+    self as proto,
+    write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
+    GetWriteInfoRequest, GetWriteInfoResponse,
+};
 use observability_deps::tracing::{info, warn};
 use pin_project::{pin_project, pinned_drop};
 use prost::Message;
@@ -22,6 +26,7 @@ use std::{pin::Pin, sync::Arc, task::Poll};
 use tokio::task::JoinHandle;
 use tonic::{Request, Response, Streaming};
 use trace::ctx::SpanContext;
+use write_summary::WriteSummary;
 
 /// This type is responsible for managing all gRPC services exposed by
 /// `ingester`.
@@ -43,6 +48,69 @@ impl GrpcDelegate {
             ingest_handler: Arc::clone(&self.ingest_handler),
         })
     }
+
+    /// Acquire a WriteInfo gRPC service implementation.
+    pub fn write_info_service(&self) -> WriteInfoServiceServer<impl WriteInfoService> {
+        WriteInfoServiceServer::new(WriteInfoServiceImpl::new(
+            Arc::clone(&self.ingest_handler) as _
+        ))
+    }
 }
+
+/// Implementation of write info
+struct WriteInfoServiceImpl {
+    handler: Arc<dyn IngestHandler + Send + Sync>,
+}
+
+impl WriteInfoServiceImpl {
+    pub fn new(handler: Arc<dyn IngestHandler + Send + Sync>) -> Self {
+        Self { handler }
+    }
+}
+
+fn write_summary_error_to_status(e: write_summary::Error) -> tonic::Status {
+    use write_summary::Error;
+
+    match e {
+        // treat "unknown partition error" as a failed precondition
+        // (so the client can distinguish "write isn't readable"
+        // from "we can't tell if write is readable")
+        e @ Error::UnknownKafkaPartition { .. } => {
+            tonic::Status::failed_precondition(format!("Can not determine status of write: {}", e))
+        }
+    }
+}
+
+#[tonic::async_trait]
+impl WriteInfoService for WriteInfoServiceImpl {
+    async fn get_write_info(
+        &self,
+        request: Request<GetWriteInfoRequest>,
+    ) -> Result<Response<GetWriteInfoResponse>, tonic::Status> {
+        let GetWriteInfoRequest { write_token } = request.into_inner();
+
+        let write_summary =
+            WriteSummary::try_from_token(&write_token).map_err(tonic::Status::invalid_argument)?;
+
+        let progress = self
+            .handler
+            .progresses(write_summary.kafka_partitions())
+            .await
+            .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
+
+        let readable = write_summary
+            .readable(&progress)
+            .map_err(write_summary_error_to_status)?;
+
+        let persisted = write_summary
+            .persisted(&progress)
+            .map_err(write_summary_error_to_status)?;
+
+        Ok(tonic::Response::new(GetWriteInfoResponse {
+            readable,
+            persisted,
+        }))
+    }
+}
 
 #[derive(Debug, Snafu)]
diff --git a/ioxd_ingester/src/lib.rs b/ioxd_ingester/src/lib.rs
index 63274ebff3..74b0c4e415 100644
--- a/ioxd_ingester/src/lib.rs
+++ b/ioxd_ingester/src/lib.rs
@@ -91,6 +91,7 @@ impl ServerType for IngesterServerType
     async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
         let builder = setup_builder!(builder_input, self);
         add_service!(builder, self.server.grpc().flight_service());
+        add_service!(builder, self.server.grpc().write_info_service());
         serve_builder!(builder);
 
         Ok(())
diff --git a/test_helpers_end_to_end_ng/src/client.rs b/test_helpers_end_to_end_ng/src/client.rs
index d595138cca..6721575d4f 100644
--- a/test_helpers_end_to_end_ng/src/client.rs
+++ b/test_helpers_end_to_end_ng/src/client.rs
@@ -6,6 +6,7 @@ use http::Response;
 use hyper::{Body, Client, Request};
 
 use influxdb_iox_client::connection::Connection;
+use influxdb_iox_client::write_info::generated_types::GetWriteInfoResponse;
 
 /// Writes the line protocol to the write_base/api/v2/write endpoint (typically on the router)
 pub async fn write_to_router(
@@ -48,64 +49,102 @@ pub fn get_write_token(response: &Response<Body>) -> String {
 
 const MAX_QUERY_RETRY_TIME_SEC: u64 = 10;
 
+/// Waits for the specified predicate to return true
+pub async fn wait_for_token<F>(
+    write_token: impl Into<String>,
+    ingester_connection: Connection,
+    f: F,
+) where
+    F: Fn(&GetWriteInfoResponse) -> bool,
+{
+    let write_token = write_token.into();
+
+    println!("Waiting for Write Token {}", write_token);
+
+    let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
+    let mut write_info_client = influxdb_iox_client::write_info::Client::new(ingester_connection);
+    tokio::time::timeout(retry_duration, async move {
+        let mut interval = tokio::time::interval(Duration::from_millis(500));
+        loop {
+            match write_info_client.get_write_info(&write_token).await {
+                Ok(res) => {
+                    if f(&res) {
+                        return;
+                    }
+                    println!("Retrying; predicate not satisfied: {:?}", res);
+                }
+
+                Err(e) => {
+                    println!("Retrying; Got error getting write_info: {}", e);
+                }
+            };
+            interval.tick().await;
+        }
+    })
+    .await
+    .expect("did not get passing predicate on token");
+}
+
+/// Waits for the specified write token to be readable
+pub async fn wait_for_readable(write_token: impl Into<String>, ingester_connection: Connection) {
+    println!("Waiting for Write Token to be readable");
+
+    wait_for_token(write_token, ingester_connection, |res| {
+        if res.readable {
+            println!("Write is readable: {:?}", res);
+            true
+        } else {
+            false
+        }
+    })
+    .await
+}
+
+/// Waits for the write token to be persisted
+pub async fn wait_for_persisted(write_token: impl Into<String>, ingester_connection: Connection) {
+    println!("Waiting for Write Token to be persisted");
+
+    wait_for_token(write_token, ingester_connection, |res| {
+        if res.persisted {
+            println!("Write is persisted: {:?}", res);
+            true
+        } else {
+            false
+        }
+    })
+    .await
+}
+
 /// Runs a query using the flight API on the specified connection
 /// until responses are produced.
 ///
-/// (Will) eventually Wait until data from the specified write token
-/// is readable, but currently waits for
+/// (Will) eventually wait until data from the specified write token
+/// is readable, but currently waits for the data to be persisted (as
+/// the querier doesn't know how to ask the ingester yet)
 ///
 /// The retry loop is used to wait for writes to become visible
 pub async fn query_when_readable(
     sql: impl Into<String>,
     namespace: impl Into<String>,
     write_token: impl Into<String>,
-    connection: Connection,
+    ingester_connection: Connection,
+    querier_connection: Connection,
 ) -> Vec<RecordBatch> {
     let namespace = namespace.into();
     let sql = sql.into();
 
-    println!(
-        "(TODO) Waiting for Write Token to be visible {}",
-        write_token.into()
-    );
+    // TODO: this should be "wait_for_readable" once the querier can talk to ingester
+    wait_for_persisted(write_token, ingester_connection).await;
 
-    let mut client = influxdb_iox_client::flight::Client::new(connection);
+    let mut client = influxdb_iox_client::flight::Client::new(querier_connection);
 
     // This does nothing except test the client handshake implementation.
     client.handshake().await.unwrap();
 
-    let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
-    tokio::time::timeout(retry_duration, async move {
-        let mut interval = tokio::time::interval(Duration::from_millis(100));
-        loop {
-            let mut response = match client.perform_query(&namespace, &sql).await {
-                Ok(res) => res,
-                Err(e) => {
-                    println!("Retrying; Got error performing query: {}", e);
-                    interval.tick().await;
-                    continue;
-                }
-            };
+    let mut response = client
+        .perform_query(&namespace, &sql)
+        .await
+        .expect("Error performing query");
 
-            let batches = match response.collect().await {
-                Ok(batches) => batches,
-                Err(e) => {
-                    println!("Retrying; Got error running query: {}", e);
-                    interval.tick().await;
-                    continue;
-                }
-            };
-
-            // wait for some data to actually arrive
-            if batches.is_empty() {
-                println!("Retrying: No record results yet");
-                interval.tick().await;
-                continue;
-            }
-
-            return batches;
-        }
-    })
-    .await
-    .expect("successfully ran the query in the allotted time")
+    response.collect().await.expect("Error executing query")
 }
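
For reference, a minimal sketch (not part of the patch) of how a downstream consumer might poll the new WriteInfoService through the write_info client added above, mirroring the wait_for_readable test helper. It assumes an already-established ingester Connection; the helper name block_until_readable and the 10 second / 500 millisecond timings are illustrative assumptions, not APIs introduced by this change.

// Illustrative sketch, assuming only the APIs added in this patch plus an
// existing ingester `Connection`; the helper name and timings are made up.
use std::time::Duration;

use influxdb_iox_client::connection::Connection;
use influxdb_iox_client::write_info::Client;

async fn block_until_readable(ingester_connection: Connection, write_token: &str) {
    let mut client = Client::new(ingester_connection);

    tokio::time::timeout(Duration::from_secs(10), async move {
        let mut interval = tokio::time::interval(Duration::from_millis(500));
        loop {
            // `readable` means the write will show up in query results;
            // `persisted` additionally means it has reached parquet files.
            match client.get_write_info(write_token).await {
                Ok(info) if info.readable => return,
                Ok(info) => println!("write not yet readable: {:?}", info),
                Err(e) => println!("error fetching write info: {}", e),
            }
            interval.tick().await;
        }
    })
    .await
    .expect("write did not become readable within 10 seconds");
}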