fix(ingester): re-transmit schema over flight if it changes (#7812)

* fix(ingester): re-transmit schema over flight if it changes

Fixes https://github.com/influxdata/idpe/issues/17408 .

So a `[Sendable]RecordBatchStream` contains `RecordBatch`es of the SAME
schema. When the ingester crafts a response for a specific partition,
this is also almost always the case however when there's a persist job
running (I think) it may have multiple snapshots for a partition. These
snapshots may have different schemas (since the ingester only creates
columns if the contain any data). Now the current implementation munches
all these snapshots into a single stream, and hands them over to arrow
flight which has a high-perf encode routine (i.e. it does not re-check
every single schema) so it sends the schema once and then sends the data
for every batch (the data only, schema data is NOT repeated). On the
receiver side (= querier) we decode that data and get confused why on
earth some batches have a different column count compared to the schema.

For the OG ingester I carefully crafted the response to ensure that we
do not run into this problem, but apparently a number of rewrites and
refactors broke that. So here is the fix:

- remove the stream that isn't really as stream (and cannot error)
- for each partition go over the `RecordBatch`es and chunk them
  according to the schema (because this check is likely cheaper than
  re-transmitting the schema for every `RecordBatch`)
- adjust a bunch of testing code to cope with this

* refactor: nicify code

* test: adjust test
pull/24376/head
Marco Neumann 2023-05-23 16:27:11 +02:00 committed by GitHub
parent 43078576b8
commit 6729b5681a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 222 additions and 533 deletions

View File

@ -15,7 +15,6 @@ backoff = { version = "0.1.0", path = "../backoff" }
bytes = "1.4.0" bytes = "1.4.0"
crossbeam-utils = "0.8.15" crossbeam-utils = "0.8.15"
data_types = { version = "0.1.0", path = "../data_types" } data_types = { version = "0.1.0", path = "../data_types" }
datafusion_util = { path = "../datafusion_util" }
datafusion.workspace = true datafusion.workspace = true
dml = { version = "0.1.0", path = "../dml" } dml = { version = "0.1.0", path = "../dml" }
flatbuffers = "23.1.21" flatbuffers = "23.1.21"

View File

@ -232,7 +232,7 @@ mod tests {
use assert_matches::assert_matches; use assert_matches::assert_matches;
use data_types::{PartitionId, PartitionKey}; use data_types::{PartitionId, PartitionKey};
use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use futures::{StreamExt, TryStreamExt}; use futures::StreamExt;
use metric::{Attributes, Metric}; use metric::{Attributes, Metric};
use super::*; use super::*;
@ -360,10 +360,10 @@ mod tests {
.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.await .await
.expect("query should succeed") .expect("query should succeed")
.into_record_batches() .into_partition_stream()
.try_collect::<Vec<_>>() .flat_map(|ps| futures::stream::iter(ps.into_record_batches()))
.await .collect::<Vec<_>>()
.expect("query failed"); .await;
// Assert the contents of ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID // Assert the contents of ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID
assert_batches_sorted_eq!( assert_batches_sorted_eq!(
@ -952,11 +952,7 @@ mod tests {
let partition = partitions.pop().unwrap(); let partition = partitions.pop().unwrap();
// Perform the partition read // Perform the partition read
let batches = datafusion::physical_plan::common::collect( let batches = partition.into_record_batches();
partition.into_record_batch_stream().unwrap(),
)
.await
.expect("failed to collate query results");
// Assert the contents of p1 contains both the initial write, and the // Assert the contents of p1 contains both the initial write, and the
// 3rd write in a single RecordBatch. // 3rd write in a single RecordBatch.

View File

@ -6,7 +6,6 @@ use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId}; use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
use datafusion_util::MemoryStream;
use mutable_batch::MutableBatch; use mutable_batch::MutableBatch;
use parking_lot::Mutex; use parking_lot::Mutex;
use schema::Projection; use schema::Projection;
@ -238,12 +237,10 @@ where
Projection::Some(columns.as_ref()) Projection::Some(columns.as_ref())
}; };
let data = Box::pin(MemoryStream::new( let data = data.project_selection(selection).into_iter().collect();
data.project_selection(selection).into_iter().collect(), PartitionResponse::new(data, id, completed_persistence_count)
));
PartitionResponse::new(Some(data), id, completed_persistence_count)
} }
None => PartitionResponse::new(None, id, completed_persistence_count), None => PartitionResponse::new(vec![], id, completed_persistence_count),
}; };
span.ok("read partition data"); span.ok("read partition data");

View File

@ -2,13 +2,14 @@
//! //!
//! [`QueryResponse`]: super::response::QueryResponse //! [`QueryResponse`]: super::response::QueryResponse
use arrow::record_batch::RecordBatch;
use data_types::PartitionId; use data_types::PartitionId;
use datafusion::physical_plan::SendableRecordBatchStream;
/// Response data for a single partition. /// Response data for a single partition.
#[derive(Debug)]
pub(crate) struct PartitionResponse { pub(crate) struct PartitionResponse {
/// Stream of snapshots. /// Stream of snapshots.
batches: Option<SendableRecordBatchStream>, batches: Vec<RecordBatch>,
/// Partition ID. /// Partition ID.
id: PartitionId, id: PartitionId,
@ -17,28 +18,9 @@ pub(crate) struct PartitionResponse {
completed_persistence_count: u64, completed_persistence_count: u64,
} }
impl std::fmt::Debug for PartitionResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PartitionResponse")
.field(
"batches",
&match self.batches {
Some(_) => "<SNAPSHOT STREAM>",
None => "<NO DATA>,",
},
)
.field("partition_id", &self.id)
.field(
"completed_persistence_count",
&self.completed_persistence_count,
)
.finish()
}
}
impl PartitionResponse { impl PartitionResponse {
pub(crate) fn new( pub(crate) fn new(
data: Option<SendableRecordBatchStream>, data: Vec<RecordBatch>,
id: PartitionId, id: PartitionId,
completed_persistence_count: u64, completed_persistence_count: u64,
) -> Self { ) -> Self {
@ -57,7 +39,7 @@ impl PartitionResponse {
self.completed_persistence_count self.completed_persistence_count
} }
pub(crate) fn into_record_batch_stream(self) -> Option<SendableRecordBatchStream> { pub(crate) fn into_record_batches(self) -> Vec<RecordBatch> {
self.batches self.batches
} }
} }

View File

@ -2,10 +2,8 @@
//! //!
//! [`QueryExec::query_exec()`]: super::QueryExec::query_exec() //! [`QueryExec::query_exec()`]: super::QueryExec::query_exec()
use std::{future, pin::Pin}; use std::pin::Pin;
use arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use super::partition_response::PartitionResponse; use super::partition_response::PartitionResponse;
@ -48,13 +46,4 @@ impl QueryResponse {
pub(crate) fn into_partition_stream(self) -> impl Stream<Item = PartitionResponse> { pub(crate) fn into_partition_stream(self) -> impl Stream<Item = PartitionResponse> {
self.partitions.0 self.partitions.0
} }
/// Reduce the [`QueryResponse`] to a stream of [`RecordBatch`].
pub(crate) fn into_record_batches(
self,
) -> impl Stream<Item = Result<RecordBatch, DataFusionError>> {
self.into_partition_stream()
.filter_map(|partition| future::ready(partition.into_record_batch_stream()))
.flatten()
}
} }

View File

@ -25,41 +25,34 @@
//! ┼ //! ┼
//! │ ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐ //! │ ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐
//! ╔═══════════════════╗ //! ╔═══════════════════╗
//! └ ─ ─ ─ ─ ─ ─ ▶ ║ PartitionResponse ║ │──────────────┐ //! └ ─ ─ ─ ─ ─ ─ ▶ ║ PartitionResponse ║ │
//! ╚═══════════════════╝ Injects //! ╚═══════════════════╝
//! └ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ┘ //! └ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ┘
//! │ //! │
//! │ ┌───────────────────┐ //! │
//! ┼ │BatchStreamRecorder│ //! ┼
//! ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐ └───────────────────┘ //! ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐
//! ╔═══════════════════╗ //! ╔═══════════════════╗
//! │ ║ RecordBatchStream ║ ├ ─ ─ ─ ─ ─ ─ ─ //! │ ║ RecordBatchStream ║
//! ╚═══════════════════╝ //! ╚═══════════════════╝
//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ //! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
//! ``` //! ```
//! //!
//! The [`QueryMetricContext`] is injected into the [`QueryResponse`], recording //! The [`QueryMetricContext`] is injected into the [`QueryResponse`], recording
//! the lifetime of the [`QueryResponse`] itself, and further injecting //! the lifetime of the [`QueryResponse`] itself, and observes the [`RecordBatch`]es
//! instances of [`BatchStreamRecorder`] into each [`PartitionResponse`] to //! produced by each [`PartitionResponse`].
//! observe the per-partition stream of [`RecordBatch`] that are yielded from //!
//! it. //!
//! [`RecordBatch`]: arrow::record_batch::RecordBatch
use std::{ use std::{
pin::Pin, pin::Pin,
sync::{ sync::atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll}, task::{Context, Poll},
}; };
use arrow::record_batch::RecordBatch;
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{NamespaceId, TableId}; use data_types::{NamespaceId, TableId};
use datafusion::{
error::DataFusionError,
physical_plan::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::Stream; use futures::Stream;
use iox_time::{SystemProvider, Time, TimeProvider}; use iox_time::{SystemProvider, Time, TimeProvider};
use metric::{DurationHistogram, Metric, U64Histogram, U64HistogramOptions}; use metric::{DurationHistogram, Metric, U64Histogram, U64HistogramOptions};
@ -82,24 +75,21 @@ use crate::query::{
/// ///
/// Additionally the distribution of row, partition and [`RecordBatch`] counts /// Additionally the distribution of row, partition and [`RecordBatch`] counts
/// are recorded. /// are recorded.
///
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct QueryResultInstrumentation<T, P = SystemProvider> { pub(crate) struct QueryResultInstrumentation<T, P = SystemProvider> {
inner: T, inner: T,
time_provider: P, time_provider: P,
/// A histogram to capture the consume time for a stream that was entirely /// A histogram to capture the consume time for a stream that was entirely
/// consumed (yielded [`Poll::Ready(None)`]) without ever observing an /// consumed (yielded [`Poll::Ready(None)`]).
/// [`Err`]. completed: DurationHistogram,
completed_ok: DurationHistogram,
/// As above but the stream returned at least one [`Err`] item; the stream /// Like [`Self::completed`], but for a stream that was not consumed to
/// was still consumed to completion.
completed_err: DurationHistogram,
/// Like [`Self::completed_ok`], but for a stream that was not consumed to
/// completion (dropped before returning [`Poll::Ready(None)`])]). /// completion (dropped before returning [`Poll::Ready(None)`])]).
aborted_ok: DurationHistogram, aborted: DurationHistogram,
aborted_err: DurationHistogram,
// Histograms to capture the distribution of row/batch/partition // Histograms to capture the distribution of row/batch/partition
// counts per query at the end of the query. // counts per query at the end of the query.
@ -173,10 +163,8 @@ impl<T> QueryResultInstrumentation<T> {
Self { Self {
inner, inner,
time_provider: Default::default(), time_provider: Default::default(),
completed_ok: duration.recorder(&[("request", "complete"), ("has_error", "false")]), completed: duration.recorder(&[("request", "complete")]),
completed_err: duration.recorder(&[("request", "complete"), ("has_error", "true")]), aborted: duration.recorder(&[("request", "incomplete")]),
aborted_ok: duration.recorder(&[("request", "incomplete"), ("has_error", "false")]),
aborted_err: duration.recorder(&[("request", "incomplete"), ("has_error", "true")]),
row_hist, row_hist,
record_batch_hist, record_batch_hist,
partition_hist, partition_hist,
@ -193,10 +181,8 @@ impl<T, P> QueryResultInstrumentation<T, P> {
QueryResultInstrumentation { QueryResultInstrumentation {
inner: self.inner, inner: self.inner,
time_provider, time_provider,
completed_ok: self.completed_ok, completed: self.completed,
completed_err: self.completed_err, aborted: self.aborted,
aborted_ok: self.aborted_ok,
aborted_err: self.aborted_err,
row_hist: self.row_hist, row_hist: self.row_hist,
record_batch_hist: self.record_batch_hist, record_batch_hist: self.record_batch_hist,
partition_hist: self.partition_hist, partition_hist: self.partition_hist,
@ -230,10 +216,8 @@ where
stream.into_partition_stream(), stream.into_partition_stream(),
started_at, started_at,
self.time_provider.clone(), self.time_provider.clone(),
self.completed_ok.clone(), self.completed.clone(),
self.completed_err.clone(), self.aborted.clone(),
self.aborted_ok.clone(),
self.aborted_err.clone(),
self.row_hist.clone(), self.row_hist.clone(),
self.record_batch_hist.clone(), self.record_batch_hist.clone(),
self.partition_hist.clone(), self.partition_hist.clone(),
@ -248,10 +232,11 @@ where
/// Once the last [`PartitionResponse`] is consumed to completion, this type is /// Once the last [`PartitionResponse`] is consumed to completion, this type is
/// dropped and the metrics it has gathered are emitted at drop time. /// dropped and the metrics it has gathered are emitted at drop time.
/// ///
/// This type is responsible for decorating all [`PartitionResponse`] yielded /// This type is responsible for capturing the statistics of each [`RecordBatch`]
/// from the result stream with [`BatchStreamRecorder`] instances, in turn /// in the [`PartitionResponse`].
/// capturing the statistics of each [`RecordBatch`] in the ///
/// [`PartitionResponse`]. ///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
#[pin_project(PinnedDrop)] #[pin_project(PinnedDrop)]
#[derive(Debug)] #[derive(Debug)]
struct QueryMetricContext<S, P = SystemProvider> struct QueryMetricContext<S, P = SystemProvider>
@ -264,8 +249,13 @@ where
#[pin] #[pin]
inner: S, inner: S,
/// The metric state shared with child [`BatchStreamRecorder`] instances. /// Running counts of row, partition, and [`RecordBatch`]
state: Arc<MetricState>, /// returned for this query so far.
///
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
row_count: AtomicUsize,
record_batch_count: AtomicUsize,
/// The timestamp at which the read request began, inclusive of the work /// The timestamp at which the read request began, inclusive of the work
/// required to acquire the inner stream (which may involve fetching all the /// required to acquire the inner stream (which may involve fetching all the
@ -281,10 +271,8 @@ where
partition_count: usize, partition_count: usize,
/// The latency histograms faceted by completion/error state. /// The latency histograms faceted by completion/error state.
completed_ok: DurationHistogram, completed: DurationHistogram,
completed_err: DurationHistogram, aborted: DurationHistogram,
aborted_ok: DurationHistogram,
aborted_err: DurationHistogram,
/// Row/record batch/partition count distribution histograms. /// Row/record batch/partition count distribution histograms.
row_hist: U64Histogram, row_hist: U64Histogram,
@ -301,10 +289,8 @@ where
stream: S, stream: S,
started_at: Time, started_at: Time,
time_provider: P, time_provider: P,
completed_ok: DurationHistogram, completed: DurationHistogram,
completed_err: DurationHistogram, aborted: DurationHistogram,
aborted_ok: DurationHistogram,
aborted_err: DurationHistogram,
row_hist: U64Histogram, row_hist: U64Histogram,
record_batch_hist: U64Histogram, record_batch_hist: U64Histogram,
partition_hist: U64Histogram, partition_hist: U64Histogram,
@ -314,15 +300,14 @@ where
time_provider, time_provider,
started_at, started_at,
completed_at: None, completed_at: None,
completed_ok, completed,
completed_err, aborted,
aborted_ok,
aborted_err,
row_hist, row_hist,
record_batch_hist, record_batch_hist,
partition_hist, partition_hist,
partition_count: 0, partition_count: 0,
state: Default::default(), row_count: Default::default(),
record_batch_count: Default::default(),
} }
} }
} }
@ -348,16 +333,15 @@ where
// And wrap the underlying stream of RecordBatch for this // And wrap the underlying stream of RecordBatch for this
// partition with a metric observer. // partition with a metric observer.
let record_stream = p.into_record_batch_stream().map(|s| { let data = p.into_record_batches();
Box::pin(BatchStreamRecorder::new(s, Arc::clone(this.state))) this.row_count.fetch_add(
as SendableRecordBatchStream data.iter().map(|batch| batch.num_rows()).sum::<usize>(),
}); Ordering::Relaxed,
);
this.record_batch_count
.fetch_add(data.len(), Ordering::Relaxed);
Poll::Ready(Some(PartitionResponse::new( Poll::Ready(Some(PartitionResponse::new(data, id, persist_count)))
record_stream,
id,
persist_count,
)))
} }
Poll::Ready(None) => { Poll::Ready(None) => {
// Record the wall clock timestamp of the stream end. // Record the wall clock timestamp of the stream end.
@ -380,9 +364,8 @@ where
{ {
fn drop(self: Pin<&mut Self>) { fn drop(self: Pin<&mut Self>) {
// Record the captured metrics. // Record the captured metrics.
let did_observe_error = self.state.did_observe_error.load(Ordering::Relaxed); let row_count = self.row_count.load(Ordering::Relaxed) as u64;
let row_count = self.state.row_count.load(Ordering::Relaxed) as u64; let record_batch_count = self.record_batch_count.load(Ordering::Relaxed) as u64;
let record_batch_count = self.state.record_batch_count.load(Ordering::Relaxed) as u64;
let partition_count = self.partition_count; let partition_count = self.partition_count;
// Record the row/record batch/partition counts for this query. // Record the row/record batch/partition counts for this query.
@ -395,10 +378,8 @@ where
// //
// If completed_at is None, the stream was aborted before completion. // If completed_at is None, the stream was aborted before completion.
let hist = match self.completed_at { let hist = match self.completed_at {
Some(_) if !did_observe_error => &self.completed_ok, Some(_) => &self.completed,
Some(_) => &self.completed_err, None => &self.aborted,
None if !did_observe_error => &self.aborted_ok,
None => &self.aborted_err,
}; };
// Record the duration, either up to the time of stream completion, or // Record the duration, either up to the time of stream completion, or
@ -415,104 +396,28 @@ where
match self.completed_at { match self.completed_at {
Some(_) => debug!( Some(_) => debug!(
?duration, ?duration,
did_observe_error, row_count, record_batch_count, partition_count, "completed streaming query results",
row_count,
record_batch_count,
partition_count,
"completed streaming query results",
), ),
None => debug!( None => debug!(
?duration, ?duration,
did_observe_error, row_count, record_batch_count, partition_count, "aborted streaming query results",
row_count,
record_batch_count,
partition_count,
"aborted streaming query results",
), ),
}; };
} }
} }
/// State shared between the parent [`QueryMetricContext`] and all of the child /// State for every call query (used to aggregate data that will later be written into histograms).
/// [`BatchStreamRecorder`] it has instantiated.
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct MetricState { struct MetricState {
/// True if at least one [`Result`] yielded by this result stream so far has
/// been an [`Err`].
//
/// This is used to select the correct success/error histogram which records
/// the operation duration.
did_observe_error: AtomicBool,
/// Running counts of row, partition, and [`RecordBatch`] /// Running counts of row, partition, and [`RecordBatch`]
/// returned for this query so far. /// returned for this query so far.
///
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
row_count: AtomicUsize, row_count: AtomicUsize,
record_batch_count: AtomicUsize, record_batch_count: AtomicUsize,
} }
/// Capture row/[`RecordBatch`]/error statistics.
///
/// Inspects each [`RecordBatch`] yielded in the result stream, scoped to a
/// single [`PartitionResponse`].
#[pin_project]
struct BatchStreamRecorder {
#[pin]
inner: SendableRecordBatchStream,
shared_state: Arc<MetricState>,
}
impl BatchStreamRecorder {
fn new(stream: SendableRecordBatchStream, shared_state: Arc<MetricState>) -> Self {
Self {
inner: stream,
shared_state,
}
}
}
impl RecordBatchStream for BatchStreamRecorder {
fn schema(&self) -> arrow::datatypes::SchemaRef {
self.inner.schema()
}
}
impl Stream for BatchStreamRecorder {
type Item = Result<RecordBatch, DataFusionError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let res = this.inner.poll_next(cx);
match &res {
Poll::Ready(Some(Ok(batch))) => {
// Record the count statistics in this batch.
this.shared_state
.row_count
.fetch_add(batch.num_rows(), Ordering::Relaxed);
this.shared_state
.record_batch_count
.fetch_add(1, Ordering::Relaxed);
}
Poll::Ready(Some(Err(_e))) => {
// Record that at least one poll returned an error.
this.shared_state
.did_observe_error
.store(true, Ordering::Relaxed);
}
Poll::Ready(None) => {}
Poll::Pending => {}
}
res
}
fn size_hint(&self) -> (usize, Option<usize>) {
// Impl the default size_hint() so this wrapper doesn't mask the size
// hint from the inner stream, if any.
self.inner.size_hint()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
@ -523,7 +428,6 @@ mod tests {
use arrow::array::{Float32Array, Int64Array}; use arrow::array::{Float32Array, Int64Array};
use data_types::PartitionId; use data_types::PartitionId;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::{stream, StreamExt}; use futures::{stream, StreamExt};
use iox_time::MockProvider; use iox_time::MockProvider;
use metric::{assert_histogram, Attributes}; use metric::{assert_histogram, Attributes};
@ -540,7 +444,7 @@ mod tests {
// Construct a stream with no batches. // Construct a stream with no batches.
let stream = PartitionStream::new(stream::iter([PartitionResponse::new( let stream = PartitionStream::new(stream::iter([PartitionResponse::new(
None, vec![],
PartitionId::new(42), PartitionId::new(42),
42, 42,
)])); )]));
@ -560,11 +464,7 @@ mod tests {
// Drain the query results, moving past any errors, and collecting the // Drain the query results, moving past any errors, and collecting the
// final set of all Ok record batches for comparison. // final set of all Ok record batches for comparison.
let _batches = response let _partitions = response.into_partition_stream().collect::<Vec<_>>().await;
.into_record_batches()
.filter_map(|v| async { v.ok() })
.collect::<Vec<_>>()
.await;
assert_histogram!( assert_histogram!(
metrics, metrics,
@ -591,7 +491,7 @@ mod tests {
metrics, metrics,
DurationHistogram, DurationHistogram,
"ingester_query_stream_duration", "ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), labels = Attributes::from(&[("request", "complete")]),
samples = 1, samples = 1,
sum = TIME_STEP, sum = TIME_STEP,
); );
@ -599,21 +499,7 @@ mod tests {
metrics, metrics,
DurationHistogram, DurationHistogram,
"ingester_query_stream_duration", "ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), labels = Attributes::from(&[("request", "incomplete")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]),
samples = 0, samples = 0,
); );
} }
@ -659,11 +545,7 @@ mod tests {
// Drain the query results, moving past any errors, and collecting the // Drain the query results, moving past any errors, and collecting the
// final set of all Ok record batches for comparison. // final set of all Ok record batches for comparison.
let _batches = response let _partitions = response.into_partition_stream().collect::<Vec<_>>().await;
.into_record_batches()
.filter_map(|v| async { v.ok() })
.collect::<Vec<_>>()
.await;
assert_histogram!( assert_histogram!(
metrics, metrics,
@ -690,7 +572,7 @@ mod tests {
metrics, metrics,
DurationHistogram, DurationHistogram,
"ingester_query_stream_duration", "ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), labels = Attributes::from(&[("request", "complete")]),
samples = 1, samples = 1,
sum = TIME_STEP, sum = TIME_STEP,
); );
@ -698,21 +580,7 @@ mod tests {
metrics, metrics,
DurationHistogram, DurationHistogram,
"ingester_query_stream_duration", "ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), labels = Attributes::from(&[("request", "incomplete")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]),
samples = 0, samples = 0,
); );
} }
@ -784,31 +652,17 @@ mod tests {
metrics, metrics,
DurationHistogram, DurationHistogram,
"ingester_query_stream_duration", "ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), labels = Attributes::from(&[("request", "complete")]),
samples = 0, samples = 0,
); );
assert_histogram!( assert_histogram!(
metrics, metrics,
DurationHistogram, DurationHistogram,
"ingester_query_stream_duration", "ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), labels = Attributes::from(&[("request", "incomplete")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]),
samples = 1, samples = 1,
sum = TIME_STEP, // It was recorded as an incomplete request sum = TIME_STEP, // It was recorded as an incomplete request
); );
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]),
samples = 0,
);
} }
/// A query result which is dropped after partially reading the data should /// A query result which is dropped after partially reading the data should
@ -849,27 +703,25 @@ mod tests {
// Now the response has been created, advance the clock // Now the response has been created, advance the clock
mock_time.inc(TIME_STEP); mock_time.inc(TIME_STEP);
let mut response = response.into_record_batches(); let mut response = response.into_partition_stream();
let got = response let got = response.next().await.expect("should yield first batch");
.next()
.await
.expect("should yield first batch")
.expect("mock doesn't return error");
drop(response); drop(response);
let batches = got.into_record_batches();
assert_histogram!( assert_histogram!(
metrics, metrics,
U64Histogram, U64Histogram,
"ingester_query_result_row", "ingester_query_result_row",
samples = 1, samples = 1,
sum = got.num_rows() as u64, sum = batches.iter().map(|batch| batch.num_rows()).sum::<usize>() as u64,
); );
assert_histogram!( assert_histogram!(
metrics, metrics,
U64Histogram, U64Histogram,
"ingester_query_result_record_batch", "ingester_query_result_record_batch",
samples = 1, samples = 1,
sum = 1, sum = batches.len() as u64,
); );
assert_histogram!( assert_histogram!(
metrics, metrics,
@ -882,237 +734,16 @@ mod tests {
metrics, metrics,
DurationHistogram, DurationHistogram,
"ingester_query_stream_duration", "ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), labels = Attributes::from(&[("request", "complete")]),
samples = 0, samples = 0,
); );
assert_histogram!( assert_histogram!(
metrics, metrics,
DurationHistogram, DurationHistogram,
"ingester_query_stream_duration", "ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), labels = Attributes::from(&[("request", "incomplete")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]),
samples = 1, samples = 1,
sum = TIME_STEP, // It was recorded as an incomplete request sum = TIME_STEP, // It was recorded as an incomplete request
); );
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]),
samples = 0,
);
}
/// A query result which is dropped when observing an error should record
/// the various count statistics from any yielded batches and categorise the
/// result as having observed an error.
#[tokio::test]
async fn test_multi_partition_stream_with_error_abort() {
let metrics = metric::Registry::default();
// Construct the set of partitions and their record batches
let (ok_batch, schema) = make_batch!(
Int64Array("c" => vec![1, 2, 3, 4, 5]),
);
let stream = Box::pin(RecordBatchStreamAdapter::new(
schema,
stream::iter([
Ok(ok_batch.clone()),
Err(DataFusionError::Internal("bananas".to_string())),
Ok(ok_batch),
]),
)) as SendableRecordBatchStream;
let stream = PartitionStream::new(stream::iter([PartitionResponse::new(
Some(stream),
PartitionId::new(1),
42,
)]));
let mock_time = Arc::new(MockProvider::new(Time::MIN));
let mock_inner = MockQueryExec::default().with_result(Ok(QueryResponse::new(stream)));
let layer = QueryResultInstrumentation::new(mock_inner, &metrics)
.with_time_provider(Arc::clone(&mock_time));
let response = layer
.query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
.await
.expect("query should succeed");
// Now the response has been created, advance the clock
mock_time.inc(TIME_STEP);
let mut response = response.into_record_batches();
let got = response
.next()
.await
.expect("should yield first batch")
.expect("mock doesn't return error");
response
.next()
.await
.expect("more results should be available")
.expect_err("this batch should be an error");
// Drop the rest of the batches after observing an error.
drop(response);
assert_histogram!(
metrics,
U64Histogram,
"ingester_query_result_row",
samples = 1,
sum = got.num_rows() as u64,
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_query_result_record_batch",
samples = 1,
sum = 1,
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_query_result_partition",
samples = 1,
sum = 1,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]),
samples = 1,
sum = TIME_STEP, // Recorded as an incomplete request with error
);
}
/// A query result which is consumed to completion even after observing an
/// error should be correctly catagorised.
#[tokio::test]
async fn test_multi_partition_stream_with_error_completion() {
let metrics = metric::Registry::default();
// Construct the set of partitions and their record batches
let (ok_batch, schema) = make_batch!(
Int64Array("c" => vec![1, 2, 3, 4, 5]),
);
let stream = Box::pin(RecordBatchStreamAdapter::new(
schema,
stream::iter([
Ok(ok_batch.clone()),
Err(DataFusionError::Internal("bananas".to_string())),
Ok(ok_batch),
]),
)) as SendableRecordBatchStream;
let stream = PartitionStream::new(stream::iter([PartitionResponse::new(
Some(stream),
PartitionId::new(1),
42,
)]));
let mock_time = Arc::new(MockProvider::new(Time::MIN));
let mock_inner = MockQueryExec::default().with_result(Ok(QueryResponse::new(stream)));
let layer = QueryResultInstrumentation::new(mock_inner, &metrics)
.with_time_provider(Arc::clone(&mock_time));
let response = layer
.query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
.await
.expect("query should succeed");
// Now the response has been created, advance the clock
mock_time.inc(TIME_STEP);
// Drain the query results, moving past any errors, and collecting the
// final set of all Ok record batches for comparison.
let _batches = response
.into_record_batches()
.filter_map(|v| async { v.ok() })
.collect::<Vec<_>>()
.await;
assert_histogram!(
metrics,
U64Histogram,
"ingester_query_result_row",
samples = 1,
sum = 10, // 5 + 5
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_query_result_record_batch",
samples = 1,
sum = 2,
);
assert_histogram!(
metrics,
U64Histogram,
"ingester_query_result_partition",
samples = 1,
sum = 1,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]),
samples = 1,
sum = TIME_STEP, // Recorded as a complete request with error
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]),
samples = 0,
);
assert_histogram!(
metrics,
DurationHistogram,
"ingester_query_stream_duration",
labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]),
samples = 0,
);
} }
} }

View File

@ -306,29 +306,43 @@ fn encode_response(
response.into_partition_stream().flat_map(move |partition| { response.into_partition_stream().flat_map(move |partition| {
let partition_id = partition.id(); let partition_id = partition.id();
let completed_persistence_count = partition.completed_persistence_count(); let completed_persistence_count = partition.completed_persistence_count();
// prefix payload data w/ metadata for that particular partition
let head = futures::stream::once(async move { let head = futures::stream::once(async move {
encode_partition(partition_id, completed_persistence_count, ingester_id) encode_partition(partition_id, completed_persistence_count, ingester_id)
}); });
match partition.into_record_batch_stream() { // An output vector of FlightDataEncoder streams, each entry stream with
Some(stream) => { // a differing schema.
let stream = stream.map_err(|e| FlightError::ExternalError(Box::new(e))); //
// Optimized for the common case of there being a single consistent
// schema across all batches (1 stream).
let mut output = Vec::with_capacity(1);
let tail = FlightDataEncoderBuilder::new().build(stream); let mut batch_iter = partition.into_record_batches().into_iter().peekable();
head.chain(tail).boxed() // While there are more batches to process.
} while let Some(schema) = batch_iter.peek().map(|v| v.schema()) {
None => head.boxed(), output.push(
FlightDataEncoderBuilder::new().build(futures::stream::iter(
// Take all the RecordBatch with a matching schema
std::iter::from_fn(|| batch_iter.next_if(|v| v.schema() == schema))
.map(Ok)
.collect::<Vec<Result<_, FlightError>>>(),
)),
)
} }
head.chain(futures::stream::iter(output).flatten())
}) })
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use arrow::array::{Float64Array, Int32Array}; use arrow::array::{Float64Array, Int32Array};
use arrow_flight::decode::FlightRecordBatchStream; use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream};
use assert_matches::assert_matches;
use bytes::Bytes; use bytes::Bytes;
use datafusion_util::MemoryStream;
use tonic::Code; use tonic::Code;
use crate::{ use crate::{
@ -375,30 +389,40 @@ mod tests {
/// Regression test for https://github.com/influxdata/idpe/issues/17408 /// Regression test for https://github.com/influxdata/idpe/issues/17408
#[tokio::test] #[tokio::test]
#[should_panic(
expected = "Invalid argument error: number of columns(1) must match number of fields(2) in schema"
)]
async fn test_chunks_with_different_schemas() { async fn test_chunks_with_different_schemas() {
let (batch1, _schema1) = make_batch!( let ingester_id = IngesterId::new();
let (batch1, schema1) = make_batch!(
Float64Array("float" => vec![1.1, 2.2, 3.3]), Float64Array("float" => vec![1.1, 2.2, 3.3]),
Int32Array("int" => vec![1, 2, 3]), Int32Array("int" => vec![1, 2, 3]),
); );
let (batch2, _schema2) = make_batch!( let (batch2, schema2) = make_batch!(
Int32Array("int" => vec![3, 4]), Float64Array("float" => vec![4.4]),
Int32Array("int" => vec![4]),
); );
assert_eq!(schema1, schema2);
let (batch3, schema3) = make_batch!(
Int32Array("int" => vec![5, 6]),
);
let (batch4, schema4) = make_batch!(
Float64Array("float" => vec![7.7]),
Int32Array("int" => vec![8]),
);
assert_eq!(schema1, schema4);
let flight = FlightService::new( let flight = FlightService::new(
MockQueryExec::default().with_result(Ok(QueryResponse::new(PartitionStream::new( MockQueryExec::default().with_result(Ok(QueryResponse::new(PartitionStream::new(
futures::stream::iter([PartitionResponse::new( futures::stream::iter([PartitionResponse::new(
Some(Box::pin(MemoryStream::new(vec![ vec![
batch1.clone(), batch1.clone(),
batch2.clone(), batch2.clone(),
]))), batch3.clone(),
PartitionId::new(1), batch4.clone(),
1, ],
PartitionId::new(2),
42,
)]), )]),
)))), )))),
IngesterId::new(), ingester_id,
100, 100,
&metric::Registry::default(), &metric::Registry::default(),
); );
@ -412,10 +436,82 @@ mod tests {
.unwrap() .unwrap()
.into_inner() .into_inner()
.map_err(FlightError::Tonic); .map_err(FlightError::Tonic);
let batch_stream = FlightRecordBatchStream::new_from_flight_data(response_stream); let flight_decoder =
let batches = batch_stream.try_collect::<Vec<_>>().await.unwrap(); FlightRecordBatchStream::new_from_flight_data(response_stream).into_inner();
assert_eq!(batches.len(), 2); let flight_data = flight_decoder.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(batches[0], batch1); assert_eq!(flight_data.len(), 8);
assert_eq!(batches[1], batch2);
// partition info
assert_matches!(flight_data[0].payload, DecodedPayload::None);
let md_actual =
proto::IngesterQueryResponseMetadata::decode(flight_data[0].app_metadata()).unwrap();
let md_expected = proto::IngesterQueryResponseMetadata {
partition_id: 2,
ingester_uuid: ingester_id.to_string(),
completed_persistence_count: 42,
};
assert_eq!(md_actual, md_expected);
// first & second chunk
match &flight_data[1].payload {
DecodedPayload::Schema(actual) => {
assert_eq!(actual, &schema1);
}
other => {
panic!("Unexpected payload: {other:?}");
}
}
match &flight_data[2].payload {
DecodedPayload::RecordBatch(actual) => {
assert_eq!(actual, &batch1);
}
other => {
panic!("Unexpected payload: {other:?}");
}
}
match &flight_data[3].payload {
DecodedPayload::RecordBatch(actual) => {
assert_eq!(actual, &batch2);
}
other => {
panic!("Unexpected payload: {other:?}");
}
}
// third chunk
match &flight_data[4].payload {
DecodedPayload::Schema(actual) => {
assert_eq!(actual, &schema3);
}
other => {
panic!("Unexpected payload: {other:?}");
}
}
match &flight_data[5].payload {
DecodedPayload::RecordBatch(actual) => {
assert_eq!(actual, &batch3);
}
other => {
panic!("Unexpected payload: {other:?}");
}
}
// forth chunk
match &flight_data[6].payload {
DecodedPayload::Schema(actual) => {
assert_eq!(actual, &schema4);
}
other => {
panic!("Unexpected payload: {other:?}");
}
}
match &flight_data[7].payload {
DecodedPayload::RecordBatch(actual) => {
assert_eq!(actual, &batch4);
}
other => {
panic!("Unexpected payload: {other:?}");
}
}
} }
} }

View File

@ -223,7 +223,6 @@ macro_rules! make_partition_stream {
)+ )+
) => {{ ) => {{
use arrow::datatypes::Schema; use arrow::datatypes::Schema;
use datafusion::physical_plan::memory::MemoryStream;
use $crate::query::{response::PartitionStream, partition_response::PartitionResponse}; use $crate::query::{response::PartitionStream, partition_response::PartitionResponse};
use futures::stream; use futures::stream;
@ -236,10 +235,10 @@ macro_rules! make_partition_stream {
batches.push(batch); batches.push(batch);
schema = Schema::try_merge([schema, (*this_schema).clone()]).expect("incompatible batch schemas"); schema = Schema::try_merge([schema, (*this_schema).clone()]).expect("incompatible batch schemas");
)+ )+
drop(schema);
let batch = MemoryStream::try_new(batches, Arc::new(schema), None).unwrap();
PartitionResponse::new( PartitionResponse::new(
Some(Box::pin(batch)), batches,
$id, $id,
42, 42,
) )

View File

@ -80,7 +80,7 @@ async fn write_query() {
let hist = ctx let hist = ctx
.get_metric::<DurationHistogram, _>( .get_metric::<DurationHistogram, _>(
"ingester_query_stream_duration", "ingester_query_stream_duration",
&[("request", "complete"), ("has_error", "false")], &[("request", "complete")],
) )
.fetch(); .fetch();
assert_eq!(hist.sample_count(), 1); assert_eq!(hist.sample_count(), 1);