refactor: chunk InfluxRPC `ReadResponses` (#6094)

Currently we see some prod panics:

```
'assertion failed: len <= std::u32::MAX as usize', tonic/src/codec/encode.rs:127:5
```

This is due to an upstream bug in tonic:
https://github.com/hyperium/tonic/issues/1141

However, the fix will only turn this into an error instead of panicking.
We should instead NOT return such overlarge results, especially because
InfluxRPC supports streaming.

While we currently don't perform streaming conversion (like streaming
the data out of the query stack into the gRPC layer), the 4GB size limit
can easily be triggered (in prod) w/ enough RAM. So let's re-chunk our
in-memory responses so that they stream nicely to the client.

We may later implement proper streaming conversion, see #4445 and #503.
pull/24376/head
Marco Neumann 2022-11-10 08:13:22 +00:00 committed by GitHub
parent 07c25335cf
commit 5f7a6e696f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 176 additions and 4 deletions

View File

@ -16,8 +16,8 @@ use datafusion::error::DataFusionError;
use futures::Stream;
use generated_types::{
google::protobuf::Empty, literal_or_regex::Value as RegexOrLiteralValue,
offsets_response::PartitionOffsetResponse, storage_server::Storage, tag_key_predicate,
CapabilitiesResponse, Capability, Int64ValuesResponse, LiteralOrRegex,
offsets_response::PartitionOffsetResponse, read_response::Frame, storage_server::Storage,
tag_key_predicate, CapabilitiesResponse, Capability, Int64ValuesResponse, LiteralOrRegex,
MeasurementFieldsRequest, MeasurementFieldsResponse, MeasurementNamesRequest,
MeasurementTagKeysRequest, MeasurementTagValuesRequest, OffsetsResponse, Predicate,
ReadFilterRequest, ReadGroupRequest, ReadResponse, ReadSeriesCardinalityRequest,
@ -32,7 +32,7 @@ use iox_query::{
},
QueryDatabase, QueryText,
};
use observability_deps::tracing::{error, info, trace};
use observability_deps::tracing::{error, info, trace, warn};
use pin_project::pin_project;
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryDatabaseProvider};
use snafu::{OptionExt, ResultExt, Snafu};
@ -47,6 +47,12 @@ use trace::{ctx::SpanContext, span::SpanExt};
use trace_http::ctx::{RequestLogContext, RequestLogContextExt};
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
/// Maximum payload size for a single [`ReadResponse`].
///
/// Returned frames are regrouped (order preserved) so that every emitted [`ReadResponse`] stays at
/// roughly this size or below; the subtracted headroom absorbs the small additional encoding
/// overhead on top of the raw frame payloads.
const MAX_READ_RESPONSE_SIZE: usize = 4 * 1024 * 1024 - 100_000; // 4 MiB minus wiggle room
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Database not found: {}", db_name))]
@ -270,7 +276,8 @@ where
let mut query_completed_token = db.record_query(&ctx, "read_filter", defer_json(&req));
let results = read_filter_impl(Arc::clone(&db), db_name, req, &ctx)
.await?
.await
.map(|responses| chunk_read_responses(responses, MAX_READ_RESPONSE_SIZE))?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
@ -350,6 +357,7 @@ where
&ctx,
)
.await
.map(|responses| chunk_read_responses(responses, MAX_READ_RESPONSE_SIZE))
.map_err(|e| e.into_status())?
.into_iter()
.map(Ok)
@ -429,6 +437,7 @@ where
&ctx,
)
.await
.map(|responses| chunk_read_responses(responses, MAX_READ_RESPONSE_SIZE))
.map_err(|e| e.into_status())?
.into_iter()
.map(Ok)
@ -1586,6 +1595,58 @@ where
}
}
/// Chunk given [`ReadResponse`]s -- while preserving the [`Frame`] order -- into responses that shall at max have the
/// given size.
///
/// A single frame larger than `size_limit` cannot be split; it is logged via `warn!` and emitted
/// as its own (oversized) chunk.
///
/// # Panic
/// Panics if `size_limit` is 0.
fn chunk_read_responses(responses: Vec<ReadResponse>, size_limit: usize) -> Vec<ReadResponse> {
    assert!(size_limit > 0, "zero size limit");

    let mut out = Vec::with_capacity(1);
    let it = responses
        .into_iter()
        .flat_map(|response| response.frames.into_iter());
    let mut frames = vec![];
    let mut size = 0;

    for frame in it {
        let fsize = frame_size(&frame);

        // Flush accumulated frames if adding this one would exceed the limit. The emptiness guard
        // prevents emitting an empty `ReadResponse` when an oversized frame arrives while the
        // buffer is empty (e.g. as the very first frame).
        if !frames.is_empty() && size + fsize > size_limit {
            size = 0;
            out.push(ReadResponse {
                frames: std::mem::take(&mut frames),
            });
        }

        if fsize > size_limit {
            // Cannot split a single frame, so pass it through as-is but log the violation.
            warn!(
                frame_size = fsize,
                size_limit, "Oversized frame in read response",
            );
        }

        frames.push(frame);
        size += fsize;
    }

    // final flush of whatever remains in the buffer
    if !frames.is_empty() {
        out.push(ReadResponse { frames });
    }

    out
}
/// Approximate wire size in bytes of a single [`Frame`], measured as the protobuf-encoded length
/// of its payload. A frame without a payload counts as zero bytes.
fn frame_size(frame: &Frame) -> usize {
    match &frame.data {
        Some(data) => data.encoded_len(),
        None => 0,
    }
}
#[cfg(test)]
mod tests {
use super::*;
@ -3474,6 +3535,117 @@ mod tests {
assert_eq!(response.metadata().get("storage-type").unwrap(), "iox");
}
#[test]
#[should_panic(expected = "zero size limit")]
fn test_chunk_read_responses_panics() {
    // A size limit of zero is a programming error and must trip the assertion.
    chunk_read_responses(Vec::new(), 0);
}
#[test]
fn test_chunk_read_responses_ok() {
use generated_types::influxdata::platform::storage::read_response::{
frame::Data, BooleanPointsFrame,
};
// Two frames with different encoded sizes so the size-based chunking is exercised.
let frame1 = Frame {
data: Some(Data::BooleanPoints(BooleanPointsFrame {
timestamps: vec![1, 2, 3],
values: vec![false, true, false],
})),
};
let frame2 = Frame {
data: Some(Data::BooleanPoints(BooleanPointsFrame {
timestamps: vec![4],
values: vec![true],
})),
};
let fsize1 = frame_size(&frame1);
let fsize2 = frame_size(&frame2);
// no responses: empty input produces empty output
assert_eq!(chunk_read_responses(vec![], 1), vec![],);
// no frames: a response with zero frames is dropped entirely
assert_eq!(
chunk_read_responses(vec![ReadResponse { frames: vec![] }], 1),
vec![],
);
// split: one large response is broken into two chunks at the size limit
assert_eq!(
chunk_read_responses(
vec![ReadResponse {
frames: vec![
frame1.clone(),
frame1.clone(),
frame2.clone(),
frame2.clone(),
frame1.clone(),
],
}],
fsize1 + fsize1 + fsize2,
),
vec![
ReadResponse {
frames: vec![frame1.clone(), frame1.clone(), frame2.clone()],
},
ReadResponse {
frames: vec![frame2.clone(), frame1.clone()],
},
],
);
// join: two small responses are merged into one chunk under the limit
assert_eq!(
chunk_read_responses(
vec![
ReadResponse {
frames: vec![frame1.clone(), frame2.clone(),],
},
ReadResponse {
frames: vec![frame2.clone(),],
},
],
fsize1 + fsize2 + fsize2,
),
vec![ReadResponse {
frames: vec![frame1.clone(), frame2.clone(), frame2.clone()],
},],
);
// re-arrange: chunk boundaries ignore the original response boundaries while
// the overall frame order is preserved
assert_eq!(
chunk_read_responses(
vec![
ReadResponse {
frames: vec![
frame1.clone(),
frame1.clone(),
frame2.clone(),
frame2.clone(),
frame1.clone(),
],
},
ReadResponse {
frames: vec![frame1.clone(), frame2.clone(),],
},
],
fsize1 + fsize1 + fsize2,
),
vec![
ReadResponse {
frames: vec![frame1.clone(), frame1.clone(), frame2.clone()],
},
ReadResponse {
frames: vec![frame2.clone(), frame1.clone(), frame1],
},
ReadResponse {
frames: vec![frame2],
},
],
);
}
/// Test helper: build a [`TimestampRange`] from its two bounds.
fn make_timestamp_range(start: i64, end: i64) -> TimestampRange {
    TimestampRange { end, start }
}