From 1670966119463418167a63a3b035de604f2096a7 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Thu, 6 Jul 2023 11:53:31 +0200
Subject: [PATCH] fix: send flight keep-alives (#8144)

* fix: send flight keep-alives

This was tested using a local nginx proxy and a small modification to the IOx
source code: insert `make_stream_slow` into `GetStream::new` just between
`let query_results = ...` and `let inner = FlightDataEncoderBuilder::new()...`.

Fixes https://github.com/influxdata/idpe/issues/17824 .

* fix: typo

* refactor: reduce keep-alive interval to 5s
---
 Cargo.lock                            |   1 +
 service_grpc_flight/Cargo.toml        |   3 +-
 service_grpc_flight/src/keep_alive.rs | 383 ++++++++++++++++++++++++++
 service_grpc_flight/src/lib.rs        |  21 +-
 4 files changed, 403 insertions(+), 5 deletions(-)
 create mode 100644 service_grpc_flight/src/keep_alive.rs

diff --git a/Cargo.lock b/Cargo.lock
index f371d9e516..a6aa14a354 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4963,6 +4963,7 @@ dependencies = [
  "serde_json",
  "service_common",
  "snafu",
+ "test_helpers",
  "tokio",
  "tonic",
  "trace",
diff --git a/service_grpc_flight/Cargo.toml b/service_grpc_flight/Cargo.toml
index c16e6219da..ef0d1c3a45 100644
--- a/service_grpc_flight/Cargo.toml
+++ b/service_grpc_flight/Cargo.toml
@@ -28,6 +28,7 @@ prost = "0.11"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0.100"
 snafu = "0.7"
+tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
 tonic = { workspace = true }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
 
@@ -35,4 +36,4 @@
 assert_matches = "1"
 async-trait = "0.1"
 metric = { path = "../metric" }
-tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
+test_helpers = { path = "../test_helpers" }
diff --git a/service_grpc_flight/src/keep_alive.rs b/service_grpc_flight/src/keep_alive.rs
new file mode 100644
index 0000000000..e8d1ff0159
--- /dev/null
+++ b/service_grpc_flight/src/keep_alive.rs
@@ -0,0 +1,383 @@
+//! Keep alive handling for response streaming.
+//!
+//! # The Problem
+//! Under some deployment scenarios, we receive reports of cryptic error messages for certain long-running queries. For
+//! example, the InfluxDB IOx CLI will report:
+//!
+//! ```text
+//! Error querying:
+//!   Tonic(
+//!     Status {
+//!         code: Internal, message: "h2 protocol error: error reading a body from connection: stream error received: unexpected internal error encountered",
+//!         source: Some(
+//!             hyper::Error(
+//!                 Body,
+//!                 Error { kind: Reset(StreamId(1), INTERNAL_ERROR, Remote) }
+//!             )
+//!         )
+//!     }
+//!   )
+//! ```
+//!
+//! And [PyArrow] will report something like:
+//!
+//! ```text
+//! pyarrow._flight.FlightInternalError:
+//!   Flight returned internal error, with message:
+//!   Received RST_STREAM with error code 2. gRPC client debug context:
+//!   UNKNOWN:Error received from peer ipv6:%5B::1%5D:8888 {
+//!     created_time:"2023-07-03T17:54:56.346363565+02:00",
+//!     grpc_status:13,
+//!     grpc_message:"Received RST_STREAM with error code 2"
+//!   }.
+//!   Client context: OK
+//! ```
+//!
+//! `Received RST_STREAM with error code 2` is a good hint. According to [RFC 7540] (the HTTP/2 spec), the error code
+//! is (see section 7):
+//!
+//! > INTERNAL_ERROR (0x2): The endpoint encountered an unexpected internal error.
+//!
+//! and `RST_STREAM` is (see section 6.4):
+//!
+//! > The `RST_STREAM` frame (type=0x3) allows for immediate termination of a stream. `RST_STREAM` is sent to request
+//! > cancellation of a stream or to indicate that an error condition has occurred.
+//!
+//! The `grpc_status:13` confirms this -- according to [gRPC Status Codes] it means:
+//!
+//! > Internal errors. This means that some invariants expected by the underlying system have been broken. This error
+//! > code is reserved for serious errors.
+//!
+//! The issue was replicated using [NGINX] and a hack in InfluxDB that makes streams really slow.
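+//!
+//! A sketch of that hack (illustrative only, not part of this patch; it combines the commit message with the
+//! `make_stream_slow` helper from this module's `test_util`, and the delay value is arbitrary):
+//!
+//! ```ignore
+//! // inside GetStream::new (lib.rs), between `let query_results = ...` and
+//! // `let inner = FlightDataEncoderBuilder::new()...`; anything longer than
+//! // the proxy timeout works:
+//! let query_results = make_stream_slow(query_results, Duration::from_secs(10));
+//! ```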
+//!
+//! The underlying issue is that some middleware or egress component -- e.g. [NGINX] -- terminates the response stream
+//! because it thinks it is dead.
+//!
+//! # The Official Way
+//! The [gRPC Keepalive] docs say:
+//!
+//! > HTTP/2 PING-based keepalives are a way to keep an HTTP/2 connection alive even when there is no data being
+//! > transferred. This is done by periodically sending a PING frame to the other end of the connection.
+//!
+//! The `PING` mechanism is described by [RFC 7540] in section 6.7:
+//!
+//! > In addition to the frame header, `PING` frames MUST contain 8 octets of opaque data in the payload. ...
+//! >
+//! > Receivers of a `PING` frame that does not include an ACK flag MUST send a `PING` frame with the ACK flag set in
+//! > response, with an identical payload. ...
+//!
+//! So every "ping" has a "pong". However, the same section also says:
+//!
+//! > `PING` frames are not associated with any individual stream. If a `PING` frame is received with a stream
+//! > identifier field value other than `0x0`, the recipient MUST respond with a connection error (Section 5.4.1) of
+//! > type `PROTOCOL_ERROR`.
+//!
+//! Now how should an egress proxy deal with this? Because streams may come from multiple upstream servers, the proxy
+//! has no way to establish a proper end-to-end ping-pong signaling path per stream. Hence in general it is not
+//! feasible to use `PING` as a keep-alive mechanism, contrary to what the [gRPC] spec says. So what DO egress proxies
+//! do then? Looking at various egress solutions:
+//!
+//! -
+//! -
+//!
+//! They all seem to agree that you either set really long timeouts or rely on activity-based keep-alive, i.e. they
+//! require SOMETHING to be sent on that stream.
+//!
+//! # The Wanted Workaround
+//! Since all `PING`-based signalling is broken, we fall back to activity-based keep-alive, i.e. we ensure that we
+//! regularly send something in our stream.
+//!
+//! Our response stream follows the [Apache Flight] definition. This means that we have a [gRPC] stream with
+//! [`FlightData`] messages. Each of these messages has a [`MessageHeader`] describing its content. This is a
+//! [FlatBuffers] union with the following options:
+//!
+//! - `None`: This is the implicit default.
+//! - `Schema`: Sent before any other data to describe the schema of the stream.
+//! - `DictionaryBatch`: Encodes dictionary data. This is not used in practice at the moment because dictionaries are
+//!   always hydrated.
+//! - `RecordBatch`: Content of a `RecordBatch` w/o schema information.
+//! - `Tensor`, `SparseTensor`: Irrelevant for us.
+//!
+//! Ideally we would send a `None` message with some metadata. However, most clients are too broken to accept this and
+//! will trip over these messages. E.g. [PyArrow] -- which uses the C++ implementation -- will fail with:
+//!
+//! ```text
+//! OSError: Header-type of flatbuffer-encoded Message is not RecordBatch.
+//! ```
+//!
+//! # The Actual Workaround
+//! So we send actual empty `RecordBatch`es instead. These are encoded as `RecordBatch` messages w/o a schema (see
+//! section above). The schema is sent separately right at the start of the stream. The arrow-rs implementation does
+//! that for us and also ensures that the schema is adjusted for dictionary hydration. So we just inspect the data
+//! stream and wait for that schema (the upstream implementation will always send this without any blocking / wait
+//! time / actual `RecordBatch` data).
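+//!
+//! # Usage Sketch
+//! A minimal sketch (added for illustration, not part of the original patch) of how the wrapper below is meant to be
+//! applied; the real wiring lives in `GetStream::new` in `lib.rs`, and `inner` stands for any stream of
+//! `Result<FlightData, FlightError>`, e.g. the output of `FlightDataEncoderBuilder::build`:
+//!
+//! ```ignore
+//! use std::time::Duration;
+//!
+//! // If no message was sent for the given interval, the wrapper injects an
+//! // empty `RecordBatch` message to keep middleware from killing the stream.
+//! let stream = KeepAliveStream::new(inner, Duration::from_secs(5));
+//! ```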
+//!
+//!
+//! [Apache Flight]: https://arrow.apache.org/docs/format/Flight.html
+//! [FlatBuffers]: https://flatbuffers.dev/
+//! [`FlightData`]: https://github.com/apache/arrow/blob/cd1ed18fd1e08912ea47b64edf55be9c046375c4/format/Flight.proto#L401-L429
+//! [gRPC]: https://grpc.io/
+//! [gRPC Keepalive]: https://grpc.io/docs/guides/keepalive/
+//! [gRPC Status Codes]: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
+//! [`MessageHeader`]: https://github.com/apache/arrow/blob/cd1ed18fd1e08912ea47b64edf55be9c046375c4/format/Message.fbs#L124-L132
+//! [NGINX]: https://nginx.org/
+//! [PyArrow]: https://arrow.apache.org/docs/python/index.html
+//! [RFC 7540]: https://httpwg.org/specs/rfc7540.html
+
+use std::{
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+    time::Duration,
+};
+
+use arrow::{
+    datatypes::{DataType, Schema, SchemaRef},
+    ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
+    record_batch::RecordBatch,
+};
+use arrow_flight::{error::FlightError, FlightData};
+use futures::{stream::BoxStream, Stream, StreamExt};
+use observability_deps::tracing::{info, warn};
+use tokio::time::{Interval, MissedTickBehavior};
+
+/// Keep the underlying response stream alive by sending regular empty [`RecordBatch`]es.
+pub struct KeepAliveStream {
+    inner: BoxStream<'static, Result<FlightData, FlightError>>,
+}
+
+impl KeepAliveStream {
+    /// Create a new keep-alive wrapper from the underlying stream and the given interval.
+    ///
+    /// The interval is measured from the last message -- which can either be a "real" message or a keep-alive.
+    pub fn new<S>(s: S, interval: Duration) -> Self
+    where
+        S: Stream<Item = Result<FlightData, FlightError>> + Send + 'static,
+    {
+        let mut ticker = tokio::time::interval(interval);
+        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
+        let state = State {
+            inner: s.boxed(),
+            schema: None,
+            ticker,
+        };
+
+        let inner = futures::stream::unfold(state, |mut state| async move {
+            loop {
+                tokio::select! {
+                    _ = state.ticker.tick() => {
+                        let Some(data) = build_empty_batch_msg(state.schema.as_ref()) else {
+                            continue;
+                        };
+                        info!("stream keep-alive");
+                        return Some((Ok(data), state));
+                    }
+                    res = state.inner.next() => {
+                        // peek at content to detect schema transmission
+                        if let Some(Ok(data)) = &res {
+                            if let Some(schema) = decode_schema(data) {
+                                if check_schema(&schema) {
+                                    state.schema = Some(Arc::new(schema));
+                                }
+                            }
+                        }
+
+                        state.ticker.reset();
+                        return res.map(|res| (res, state));
+                    }
+                }
+            }
+        })
+        .boxed();
+
+        Self { inner }
+    }
+}
+
+impl Stream for KeepAliveStream {
+    type Item = Result<FlightData, FlightError>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+}
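+
+// A standalone sketch (added for illustration; not part of the original patch) of the timer
+// pattern used above: with `MissedTickBehavior::Delay`, an `Interval` that is `reset()`
+// whenever real data flows only fires after a full quiet interval, so keep-alives only fill
+// genuine gaps in the stream.
+#[cfg(test)]
+mod ticker_sketch {
+    use std::time::Duration;
+
+    use tokio::time::{interval, Instant, MissedTickBehavior};
+
+    #[tokio::test]
+    async fn ticks_are_measured_from_the_last_reset() {
+        let mut ticker = interval(Duration::from_millis(50));
+        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
+
+        // The first tick completes immediately; `KeepAliveStream` copes with this because
+        // `build_empty_batch_msg` refuses to build a message before a schema was seen.
+        ticker.tick().await;
+
+        let start = Instant::now();
+        ticker.reset(); // pretend a real message just flowed
+        ticker.tick().await; // the next tick only fires after a full interval
+        assert!(start.elapsed() >= Duration::from_millis(50));
+    }
+}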
+
+/// Inner state of [`KeepAliveStream`]
+struct State {
+    /// The underlying stream that is kept alive.
+    inner: BoxStream<'static, Result<FlightData, FlightError>>,
+
+    /// A [`Schema`] that was already received from the stream.
+    ///
+    /// We need this to produce sensible empty [`RecordBatch`]es and because [`RecordBatch`] messages can only come
+    /// AFTER an encoded [`Schema`].
+    schema: Option<SchemaRef>,
+
+    /// Keep-alive ticker.
+    ticker: Interval,
+}
+
+/// Decode [`Schema`] from response data stream.
+fn decode_schema(data: &FlightData) -> Option<Schema> {
+    let message = arrow::ipc::root_as_message(&data.data_header[..]).ok()?;
+
+    if arrow::ipc::MessageHeader::Schema != message.header_type() {
+        return None;
+    }
+    Schema::try_from(data).ok()
+}
+
+/// Check that the [`Schema`] that we've [decoded](decode_schema) is sensible.
+///
+/// Returns `true` if the [`Schema`] is OK. Will log a warning and return `false` if there is a problem.
+fn check_schema(schema: &Schema) -> bool {
+    schema.fields().iter().all(|field| match field.data_type() {
+        DataType::Dictionary(_, _) => {
+            warn!(
+                field = field.name(),
+                "arrow IPC schema still contains dictionary, should have been hydrated by now",
+            );
+            false
+        }
+        _ => true,
+    })
+}
+
+/// Encode an empty [`RecordBatch`] as a message.
+///
+/// This must only be sent AFTER a [`Schema`] was transmitted.
+fn build_empty_batch_msg(schema: Option<&SchemaRef>) -> Option<FlightData> {
+    let Some(schema) = schema else {
+        warn!(
+            "cannot send keep-alive because no schema was transmitted yet",
+        );
+        return None;
+    };
+
+    let batch = RecordBatch::new_empty(Arc::clone(schema));
+    let data_gen = IpcDataGenerator::default();
+    let mut dictionary_tracker = DictionaryTracker::new(true);
+    let write_options = IpcWriteOptions::default();
+    let batch_data = match data_gen.encoded_batch(&batch, &mut dictionary_tracker, &write_options) {
+        Ok((dicts_data, batch_data)) => {
+            // an empty batch must not produce separate dictionary data
+            assert!(dicts_data.is_empty());
+            batch_data
+        }
+        Err(e) => {
+            warn!(
+                %e,
+                "cannot encode empty batch",
+            );
+            return None;
+        }
+    };
+
+    Some(batch_data.into())
+}
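+
+// Added illustration (not part of the original patch): the keep-alive message built above is
+// header-typed as `RecordBatch` -- the one message type that picky clients such as PyArrow
+// accept mid-stream (see the module docs).
+#[cfg(test)]
+mod encoding_sketch {
+    use arrow::datatypes::Field;
+
+    use super::*;
+
+    #[test]
+    fn keep_alive_msg_is_a_record_batch() {
+        let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int64, false)]));
+        let data = build_empty_batch_msg(Some(&schema)).expect("empty batch is encodable");
+
+        let message = arrow::ipc::root_as_message(&data.data_header[..]).unwrap();
+        assert_eq!(message.header_type(), arrow::ipc::MessageHeader::RecordBatch);
+    }
+}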
+
+#[cfg(test)]
+pub mod test_util {
+    use std::time::Duration;
+
+    use futures::{stream::BoxStream, Stream, StreamExt};
+
+    /// Ensure that there is a delay between stream responses.
+    pub fn make_stream_slow<S>(s: S, delay: Duration) -> BoxStream<'static, S::Item>
+    where
+        S: Send + Stream + Unpin + 'static,
+    {
+        futures::stream::unfold(s, move |mut s| async move {
+            tokio::time::sleep(delay).await;
+            let res = s.next().await;
+            res.map(|res| (res, s))
+        })
+        .boxed()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::{array::Int64Array, datatypes::Field};
+    use arrow_flight::{decode::FlightRecordBatchStream, encode::FlightDataEncoderBuilder};
+    use datafusion::assert_batches_eq;
+    use futures::TryStreamExt;
+    use test_helpers::maybe_start_logging;
+
+    use super::{test_util::make_stream_slow, *};
+
+    type BatchStream = BoxStream<'static, Result<RecordBatch, FlightError>>;
+    type FlightStream = BoxStream<'static, Result<FlightData, FlightError>>;
+
+    #[tokio::test]
+    #[should_panic(expected = "stream timeout")]
+    async fn test_timeout() {
+        let s = make_test_stream(false);
+        let s = FlightRecordBatchStream::new_from_flight_data(s);
+        s.collect::<Vec<_>>().await;
+    }
+
+    #[tokio::test]
+    async fn test_keep_alive() {
+        maybe_start_logging();
+
+        let s = make_test_stream(true);
+        let s = FlightRecordBatchStream::new_from_flight_data(s);
+        let batches: Vec<_> = s.try_collect().await.unwrap();
+        assert_batches_eq!(
+            vec!["+---+", "| f |", "+---+", "| 1 |", "| 2 |", "| 3 |", "| 4 |", "| 5 |", "+---+",],
+            &batches
+        );
+    }
+
+    /// Creates a stream like the query processing would do.
+    fn make_query_result_stream() -> (BatchStream, SchemaRef) {
+        let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int64, false)]));
+
+        let batch_1 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
+        )
+        .unwrap();
+        let batch_2 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int64Array::from(vec![4, 5]))],
+        )
+        .unwrap();
+
+        let s = futures::stream::iter([batch_1, batch_2]).map(Ok).boxed();
+        (s, schema)
+    }
+
+    /// Convert a query result stream (= [`RecordBatch`]es) into a [`FlightData`] stream.
+    ///
+    /// This stream will -- as in prod -- send the [`Schema`] data even when there are no [`RecordBatch`]es yet.
+    fn make_flight_data_stream(s: BatchStream, schema: SchemaRef) -> FlightStream {
+        FlightDataEncoderBuilder::new()
+            .with_schema(schema)
+            .build(s)
+            .boxed()
+    }
+
+    fn panic_on_stream_timeout(s: FlightStream, timeout: Duration) -> FlightStream {
+        futures::stream::unfold(s, move |mut s| async move {
+            let res = tokio::time::timeout(timeout, s.next())
+                .await
+                .expect("stream timeout");
+            res.map(|res| (res, s))
+        })
+        .boxed()
+    }
+
+    fn make_test_stream(keep_alive: bool) -> FlightStream {
+        let (s, schema) = make_query_result_stream();
+        let s = make_stream_slow(s, Duration::from_millis(500));
+        let s = make_flight_data_stream(s, schema);
+        let s = if keep_alive {
+            KeepAliveStream::new(s, Duration::from_millis(100)).boxed()
+        } else {
+            s
+        };
+        let s = panic_on_stream_timeout(s, Duration::from_millis(250));
+        s
+    }
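+
+    /// Added sketch (not part of the original patch): without a schema there is nothing sensible
+    /// to send, so the wrapper stays quiet instead of emitting a message that clients would trip
+    /// over.
+    #[tokio::test]
+    async fn test_no_keep_alive_before_schema() {
+        let s: FlightStream = futures::stream::pending().boxed();
+        let mut s = KeepAliveStream::new(s, Duration::from_millis(10));
+
+        // even after several keep-alive intervals, nothing must be emitted
+        let res = tokio::time::timeout(Duration::from_millis(100), s.next()).await;
+        assert!(res.is_err(), "expected no message before the schema was sent");
+    }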
+}
diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs
index e1fcf5a836..7813f14ff9 100644
--- a/service_grpc_flight/src/lib.rs
+++ b/service_grpc_flight/src/lib.rs
@@ -16,14 +16,16 @@
     unused_crate_dependencies
 )]
 
+use keep_alive::KeepAliveStream;
 // Workaround for "unused crate" lint false positives.
 use workspace_hack as _;
 
+mod keep_alive;
 mod request;
 
 use arrow::error::ArrowError;
 use arrow_flight::{
-    encode::{FlightDataEncoder, FlightDataEncoderBuilder},
+    encode::FlightDataEncoderBuilder,
     flight_descriptor::DescriptorType,
     flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
     Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
@@ -44,7 +46,13 @@ use prost::Message;
 use request::{IoxGetRequest, RunQuery};
 use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider};
 use snafu::{OptionExt, ResultExt, Snafu};
-use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant};
+use std::{
+    fmt::Debug,
+    pin::Pin,
+    sync::Arc,
+    task::Poll,
+    time::{Duration, Instant},
+};
 use tonic::{
     metadata::{AsciiMetadataValue, MetadataMap},
     Request, Response, Streaming,
@@ -65,6 +73,9 @@ const IOX_FLIGHT_SQL_DATABASE_HEADERS: [&str; 4] = [
     "iox-namespace-name", // deprecated
 ];
 
+/// At which interval should the `DoGet` stream send empty messages as keep-alive markers?
+const DO_GET_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
+
 #[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -883,7 +894,7 @@ fn has_debug_header(metadata: &MetadataMap) -> bool {
 /// Wrapper over a FlightDataEncodeStream that adds IOx specific
 /// metadata and records completion
 struct GetStream {
-    inner: FlightDataEncoder,
+    inner: KeepAliveStream,
     #[allow(dead_code)]
     permit: InstrumentedAsyncOwnedSemaphorePermit,
     query_completed_token: QueryCompletedToken,
@@ -919,6 +930,9 @@ impl GetStream {
             .with_metadata(app_metadata.encode_to_vec().into())
             .build(query_results);
 
+        // add keep alive
+        let inner = KeepAliveStream::new(inner, DO_GET_KEEP_ALIVE_INTERVAL);
+
         Ok(Self {
             inner,
             permit,
@@ -958,7 +972,6 @@ impl Stream for GetStream {
         }
     }
 }
-
 #[cfg(test)]
 mod tests {
     use arrow_flight::sql::ProstMessageExt;