diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 8fe93aa97c..ed2dc28036 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -15,7 +15,6 @@ backoff = { version = "0.1.0", path = "../backoff" } bytes = "1.4.0" crossbeam-utils = "0.8.15" data_types = { version = "0.1.0", path = "../data_types" } -datafusion_util = { path = "../datafusion_util" } datafusion.workspace = true dml = { version = "0.1.0", path = "../dml" } flatbuffers = "23.1.21" diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index e18a669038..f78cfc4e18 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -232,7 +232,7 @@ mod tests { use assert_matches::assert_matches; use data_types::{PartitionId, PartitionKey}; use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; - use futures::{StreamExt, TryStreamExt}; + use futures::StreamExt; use metric::{Attributes, Metric}; use super::*; @@ -360,10 +360,10 @@ mod tests { .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) .await .expect("query should succeed") - .into_record_batches() - .try_collect::>() - .await - .expect("query failed"); + .into_partition_stream() + .flat_map(|ps| futures::stream::iter(ps.into_record_batches())) + .collect::>() + .await; // Assert the contents of ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID assert_batches_sorted_eq!( @@ -952,11 +952,7 @@ mod tests { let partition = partitions.pop().unwrap(); // Perform the partition read - let batches = datafusion::physical_plan::common::collect( - partition.into_record_batch_stream().unwrap(), - ) - .await - .expect("failed to collate query results"); + let batches = partition.into_record_batches(); // Assert the contents of p1 contains both the initial write, and the // 3rd write in a single RecordBatch. diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 3bf805a8be..be7b586df8 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -6,7 +6,6 @@ use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId}; -use datafusion_util::MemoryStream; use mutable_batch::MutableBatch; use parking_lot::Mutex; use schema::Projection; @@ -238,12 +237,10 @@ where Projection::Some(columns.as_ref()) }; - let data = Box::pin(MemoryStream::new( - data.project_selection(selection).into_iter().collect(), - )); - PartitionResponse::new(Some(data), id, completed_persistence_count) + let data = data.project_selection(selection).into_iter().collect(); + PartitionResponse::new(data, id, completed_persistence_count) } - None => PartitionResponse::new(None, id, completed_persistence_count), + None => PartitionResponse::new(vec![], id, completed_persistence_count), }; span.ok("read partition data"); diff --git a/ingester/src/query/partition_response.rs b/ingester/src/query/partition_response.rs index 43c5802749..9fd6e6c672 100644 --- a/ingester/src/query/partition_response.rs +++ b/ingester/src/query/partition_response.rs @@ -2,13 +2,14 @@ //! //! [`QueryResponse`]: super::response::QueryResponse +use arrow::record_batch::RecordBatch; use data_types::PartitionId; -use datafusion::physical_plan::SendableRecordBatchStream; /// Response data for a single partition. +#[derive(Debug)] pub(crate) struct PartitionResponse { /// Stream of snapshots. - batches: Option, + batches: Vec, /// Partition ID. id: PartitionId, @@ -17,28 +18,9 @@ pub(crate) struct PartitionResponse { completed_persistence_count: u64, } -impl std::fmt::Debug for PartitionResponse { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PartitionResponse") - .field( - "batches", - &match self.batches { - Some(_) => "", - None => ",", - }, - ) - .field("partition_id", &self.id) - .field( - "completed_persistence_count", - &self.completed_persistence_count, - ) - .finish() - } -} - impl PartitionResponse { pub(crate) fn new( - data: Option, + data: Vec, id: PartitionId, completed_persistence_count: u64, ) -> Self { @@ -57,7 +39,7 @@ impl PartitionResponse { self.completed_persistence_count } - pub(crate) fn into_record_batch_stream(self) -> Option { + pub(crate) fn into_record_batches(self) -> Vec { self.batches } } diff --git a/ingester/src/query/response.rs b/ingester/src/query/response.rs index e0b7f107bf..cc89d51f4a 100644 --- a/ingester/src/query/response.rs +++ b/ingester/src/query/response.rs @@ -2,10 +2,8 @@ //! //! [`QueryExec::query_exec()`]: super::QueryExec::query_exec() -use std::{future, pin::Pin}; +use std::pin::Pin; -use arrow::record_batch::RecordBatch; -use datafusion::error::DataFusionError; use futures::{Stream, StreamExt}; use super::partition_response::PartitionResponse; @@ -48,13 +46,4 @@ impl QueryResponse { pub(crate) fn into_partition_stream(self) -> impl Stream { self.partitions.0 } - - /// Reduce the [`QueryResponse`] to a stream of [`RecordBatch`]. - pub(crate) fn into_record_batches( - self, - ) -> impl Stream> { - self.into_partition_stream() - .filter_map(|partition| future::ready(partition.into_record_batch_stream())) - .flatten() - } } diff --git a/ingester/src/query/result_instrumentation.rs b/ingester/src/query/result_instrumentation.rs index 21be8a2bfa..02f3a422a8 100644 --- a/ingester/src/query/result_instrumentation.rs +++ b/ingester/src/query/result_instrumentation.rs @@ -25,41 +25,34 @@ //! ┼ //! │ ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐ //! ╔═══════════════════╗ -//! └ ─ ─ ─ ─ ─ ─ ▶ ║ PartitionResponse ║ │──────────────┐ -//! ╚═══════════════════╝ Injects -//! └ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ┘ │ -//! │ ▼ -//! │ ┌───────────────────┐ -//! ┼ │BatchStreamRecorder│ -//! ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐ └───────────────────┘ -//! ╔═══════════════════╗ │ -//! │ ║ RecordBatchStream ║ ├ ─ ─ ─ ─ ─ ─ ─ +//! └ ─ ─ ─ ─ ─ ─ ▶ ║ PartitionResponse ║ │ +//! ╚═══════════════════╝ +//! └ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ┘ +//! │ +//! │ +//! ┼ +//! ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐ +//! ╔═══════════════════╗ +//! │ ║ RecordBatchStream ║ │ //! ╚═══════════════════╝ //! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ //! ``` //! //! The [`QueryMetricContext`] is injected into the [`QueryResponse`], recording -//! the lifetime of the [`QueryResponse`] itself, and further injecting -//! instances of [`BatchStreamRecorder`] into each [`PartitionResponse`] to -//! observe the per-partition stream of [`RecordBatch`] that are yielded from -//! it. +//! the lifetime of the [`QueryResponse`] itself, and observes the [`RecordBatch`]es +//! produced by each [`PartitionResponse`]. +//! +//! +//! [`RecordBatch`]: arrow::record_batch::RecordBatch use std::{ pin::Pin, - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, - }, + sync::atomic::{AtomicUsize, Ordering}, task::{Context, Poll}, }; -use arrow::record_batch::RecordBatch; use async_trait::async_trait; use data_types::{NamespaceId, TableId}; -use datafusion::{ - error::DataFusionError, - physical_plan::{RecordBatchStream, SendableRecordBatchStream}, -}; use futures::Stream; use iox_time::{SystemProvider, Time, TimeProvider}; use metric::{DurationHistogram, Metric, U64Histogram, U64HistogramOptions}; @@ -82,24 +75,21 @@ use crate::query::{ /// /// Additionally the distribution of row, partition and [`RecordBatch`] counts /// are recorded. +/// +/// +/// [`RecordBatch`]: arrow::record_batch::RecordBatch #[derive(Debug, Clone)] pub(crate) struct QueryResultInstrumentation { inner: T, time_provider: P, /// A histogram to capture the consume time for a stream that was entirely - /// consumed (yielded [`Poll::Ready(None)`]) without ever observing an - /// [`Err`]. - completed_ok: DurationHistogram, + /// consumed (yielded [`Poll::Ready(None)`]). + completed: DurationHistogram, - /// As above but the stream returned at least one [`Err`] item; the stream - /// was still consumed to completion. - completed_err: DurationHistogram, - - /// Like [`Self::completed_ok`], but for a stream that was not consumed to + /// Like [`Self::completed`], but for a stream that was not consumed to /// completion (dropped before returning [`Poll::Ready(None)`])]). - aborted_ok: DurationHistogram, - aborted_err: DurationHistogram, + aborted: DurationHistogram, // Histograms to capture the distribution of row/batch/partition // counts per query at the end of the query. @@ -173,10 +163,8 @@ impl QueryResultInstrumentation { Self { inner, time_provider: Default::default(), - completed_ok: duration.recorder(&[("request", "complete"), ("has_error", "false")]), - completed_err: duration.recorder(&[("request", "complete"), ("has_error", "true")]), - aborted_ok: duration.recorder(&[("request", "incomplete"), ("has_error", "false")]), - aborted_err: duration.recorder(&[("request", "incomplete"), ("has_error", "true")]), + completed: duration.recorder(&[("request", "complete")]), + aborted: duration.recorder(&[("request", "incomplete")]), row_hist, record_batch_hist, partition_hist, @@ -193,10 +181,8 @@ impl QueryResultInstrumentation { QueryResultInstrumentation { inner: self.inner, time_provider, - completed_ok: self.completed_ok, - completed_err: self.completed_err, - aborted_ok: self.aborted_ok, - aborted_err: self.aborted_err, + completed: self.completed, + aborted: self.aborted, row_hist: self.row_hist, record_batch_hist: self.record_batch_hist, partition_hist: self.partition_hist, @@ -230,10 +216,8 @@ where stream.into_partition_stream(), started_at, self.time_provider.clone(), - self.completed_ok.clone(), - self.completed_err.clone(), - self.aborted_ok.clone(), - self.aborted_err.clone(), + self.completed.clone(), + self.aborted.clone(), self.row_hist.clone(), self.record_batch_hist.clone(), self.partition_hist.clone(), @@ -248,10 +232,11 @@ where /// Once the last [`PartitionResponse`] is consumed to completion, this type is /// dropped and the metrics it has gathered are emitted at drop time. /// -/// This type is responsible for decorating all [`PartitionResponse`] yielded -/// from the result stream with [`BatchStreamRecorder`] instances, in turn -/// capturing the statistics of each [`RecordBatch`] in the -/// [`PartitionResponse`]. +/// This type is responsible for capturing the statistics of each [`RecordBatch`] +/// in the [`PartitionResponse`]. +/// +/// +/// [`RecordBatch`]: arrow::record_batch::RecordBatch #[pin_project(PinnedDrop)] #[derive(Debug)] struct QueryMetricContext @@ -264,8 +249,13 @@ where #[pin] inner: S, - /// The metric state shared with child [`BatchStreamRecorder`] instances. - state: Arc, + /// Running counts of row, partition, and [`RecordBatch`] + /// returned for this query so far. + /// + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch + row_count: AtomicUsize, + record_batch_count: AtomicUsize, /// The timestamp at which the read request began, inclusive of the work /// required to acquire the inner stream (which may involve fetching all the @@ -281,10 +271,8 @@ where partition_count: usize, /// The latency histograms faceted by completion/error state. - completed_ok: DurationHistogram, - completed_err: DurationHistogram, - aborted_ok: DurationHistogram, - aborted_err: DurationHistogram, + completed: DurationHistogram, + aborted: DurationHistogram, /// Row/record batch/partition count distribution histograms. row_hist: U64Histogram, @@ -301,10 +289,8 @@ where stream: S, started_at: Time, time_provider: P, - completed_ok: DurationHistogram, - completed_err: DurationHistogram, - aborted_ok: DurationHistogram, - aborted_err: DurationHistogram, + completed: DurationHistogram, + aborted: DurationHistogram, row_hist: U64Histogram, record_batch_hist: U64Histogram, partition_hist: U64Histogram, @@ -314,15 +300,14 @@ where time_provider, started_at, completed_at: None, - completed_ok, - completed_err, - aborted_ok, - aborted_err, + completed, + aborted, row_hist, record_batch_hist, partition_hist, partition_count: 0, - state: Default::default(), + row_count: Default::default(), + record_batch_count: Default::default(), } } } @@ -348,16 +333,15 @@ where // And wrap the underlying stream of RecordBatch for this // partition with a metric observer. - let record_stream = p.into_record_batch_stream().map(|s| { - Box::pin(BatchStreamRecorder::new(s, Arc::clone(this.state))) - as SendableRecordBatchStream - }); + let data = p.into_record_batches(); + this.row_count.fetch_add( + data.iter().map(|batch| batch.num_rows()).sum::(), + Ordering::Relaxed, + ); + this.record_batch_count + .fetch_add(data.len(), Ordering::Relaxed); - Poll::Ready(Some(PartitionResponse::new( - record_stream, - id, - persist_count, - ))) + Poll::Ready(Some(PartitionResponse::new(data, id, persist_count))) } Poll::Ready(None) => { // Record the wall clock timestamp of the stream end. @@ -380,9 +364,8 @@ where { fn drop(self: Pin<&mut Self>) { // Record the captured metrics. - let did_observe_error = self.state.did_observe_error.load(Ordering::Relaxed); - let row_count = self.state.row_count.load(Ordering::Relaxed) as u64; - let record_batch_count = self.state.record_batch_count.load(Ordering::Relaxed) as u64; + let row_count = self.row_count.load(Ordering::Relaxed) as u64; + let record_batch_count = self.record_batch_count.load(Ordering::Relaxed) as u64; let partition_count = self.partition_count; // Record the row/record batch/partition counts for this query. @@ -395,10 +378,8 @@ where // // If completed_at is None, the stream was aborted before completion. let hist = match self.completed_at { - Some(_) if !did_observe_error => &self.completed_ok, - Some(_) => &self.completed_err, - None if !did_observe_error => &self.aborted_ok, - None => &self.aborted_err, + Some(_) => &self.completed, + None => &self.aborted, }; // Record the duration, either up to the time of stream completion, or @@ -415,104 +396,28 @@ where match self.completed_at { Some(_) => debug!( ?duration, - did_observe_error, - row_count, - record_batch_count, - partition_count, - "completed streaming query results", + row_count, record_batch_count, partition_count, "completed streaming query results", ), None => debug!( ?duration, - did_observe_error, - row_count, - record_batch_count, - partition_count, - "aborted streaming query results", + row_count, record_batch_count, partition_count, "aborted streaming query results", ), }; } } -/// State shared between the parent [`QueryMetricContext`] and all of the child -/// [`BatchStreamRecorder`] it has instantiated. +/// State for every call query (used to aggregate data that will later be written into histograms). #[derive(Debug, Default)] struct MetricState { - /// True if at least one [`Result`] yielded by this result stream so far has - /// been an [`Err`]. - // - /// This is used to select the correct success/error histogram which records - /// the operation duration. - did_observe_error: AtomicBool, - /// Running counts of row, partition, and [`RecordBatch`] /// returned for this query so far. + /// + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch row_count: AtomicUsize, record_batch_count: AtomicUsize, } -/// Capture row/[`RecordBatch`]/error statistics. -/// -/// Inspects each [`RecordBatch`] yielded in the result stream, scoped to a -/// single [`PartitionResponse`]. -#[pin_project] -struct BatchStreamRecorder { - #[pin] - inner: SendableRecordBatchStream, - shared_state: Arc, -} - -impl BatchStreamRecorder { - fn new(stream: SendableRecordBatchStream, shared_state: Arc) -> Self { - Self { - inner: stream, - shared_state, - } - } -} - -impl RecordBatchStream for BatchStreamRecorder { - fn schema(&self) -> arrow::datatypes::SchemaRef { - self.inner.schema() - } -} - -impl Stream for BatchStreamRecorder { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - let res = this.inner.poll_next(cx); - match &res { - Poll::Ready(Some(Ok(batch))) => { - // Record the count statistics in this batch. - this.shared_state - .row_count - .fetch_add(batch.num_rows(), Ordering::Relaxed); - this.shared_state - .record_batch_count - .fetch_add(1, Ordering::Relaxed); - } - Poll::Ready(Some(Err(_e))) => { - // Record that at least one poll returned an error. - this.shared_state - .did_observe_error - .store(true, Ordering::Relaxed); - } - Poll::Ready(None) => {} - Poll::Pending => {} - } - - res - } - - fn size_hint(&self) -> (usize, Option) { - // Impl the default size_hint() so this wrapper doesn't mask the size - // hint from the inner stream, if any. - self.inner.size_hint() - } -} - #[cfg(test)] mod tests { use std::{sync::Arc, time::Duration}; @@ -523,7 +428,6 @@ mod tests { use arrow::array::{Float32Array, Int64Array}; use data_types::PartitionId; - use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::{stream, StreamExt}; use iox_time::MockProvider; use metric::{assert_histogram, Attributes}; @@ -540,7 +444,7 @@ mod tests { // Construct a stream with no batches. let stream = PartitionStream::new(stream::iter([PartitionResponse::new( - None, + vec![], PartitionId::new(42), 42, )])); @@ -560,11 +464,7 @@ mod tests { // Drain the query results, moving past any errors, and collecting the // final set of all Ok record batches for comparison. - let _batches = response - .into_record_batches() - .filter_map(|v| async { v.ok() }) - .collect::>() - .await; + let _partitions = response.into_partition_stream().collect::>().await; assert_histogram!( metrics, @@ -591,7 +491,7 @@ mod tests { metrics, DurationHistogram, "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), + labels = Attributes::from(&[("request", "complete")]), samples = 1, sum = TIME_STEP, ); @@ -599,21 +499,7 @@ mod tests { metrics, DurationHistogram, "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]), + labels = Attributes::from(&[("request", "incomplete")]), samples = 0, ); } @@ -659,11 +545,7 @@ mod tests { // Drain the query results, moving past any errors, and collecting the // final set of all Ok record batches for comparison. - let _batches = response - .into_record_batches() - .filter_map(|v| async { v.ok() }) - .collect::>() - .await; + let _partitions = response.into_partition_stream().collect::>().await; assert_histogram!( metrics, @@ -690,7 +572,7 @@ mod tests { metrics, DurationHistogram, "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), + labels = Attributes::from(&[("request", "complete")]), samples = 1, sum = TIME_STEP, ); @@ -698,21 +580,7 @@ mod tests { metrics, DurationHistogram, "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]), + labels = Attributes::from(&[("request", "incomplete")]), samples = 0, ); } @@ -784,31 +652,17 @@ mod tests { metrics, DurationHistogram, "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), + labels = Attributes::from(&[("request", "complete")]), samples = 0, ); assert_histogram!( metrics, DurationHistogram, "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]), + labels = Attributes::from(&[("request", "incomplete")]), samples = 1, sum = TIME_STEP, // It was recorded as an incomplete request ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]), - samples = 0, - ); } /// A query result which is dropped after partially reading the data should @@ -849,27 +703,25 @@ mod tests { // Now the response has been created, advance the clock mock_time.inc(TIME_STEP); - let mut response = response.into_record_batches(); - let got = response - .next() - .await - .expect("should yield first batch") - .expect("mock doesn't return error"); + let mut response = response.into_partition_stream(); + let got = response.next().await.expect("should yield first batch"); drop(response); + let batches = got.into_record_batches(); + assert_histogram!( metrics, U64Histogram, "ingester_query_result_row", samples = 1, - sum = got.num_rows() as u64, + sum = batches.iter().map(|batch| batch.num_rows()).sum::() as u64, ); assert_histogram!( metrics, U64Histogram, "ingester_query_result_record_batch", samples = 1, - sum = 1, + sum = batches.len() as u64, ); assert_histogram!( metrics, @@ -882,237 +734,16 @@ mod tests { metrics, DurationHistogram, "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), + labels = Attributes::from(&[("request", "complete")]), samples = 0, ); assert_histogram!( metrics, DurationHistogram, "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]), + labels = Attributes::from(&[("request", "incomplete")]), samples = 1, sum = TIME_STEP, // It was recorded as an incomplete request ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]), - samples = 0, - ); - } - - /// A query result which is dropped when observing an error should record - /// the various count statistics from any yielded batches and categorise the - /// result as having observed an error. - #[tokio::test] - async fn test_multi_partition_stream_with_error_abort() { - let metrics = metric::Registry::default(); - - // Construct the set of partitions and their record batches - let (ok_batch, schema) = make_batch!( - Int64Array("c" => vec![1, 2, 3, 4, 5]), - ); - - let stream = Box::pin(RecordBatchStreamAdapter::new( - schema, - stream::iter([ - Ok(ok_batch.clone()), - Err(DataFusionError::Internal("bananas".to_string())), - Ok(ok_batch), - ]), - )) as SendableRecordBatchStream; - - let stream = PartitionStream::new(stream::iter([PartitionResponse::new( - Some(stream), - PartitionId::new(1), - 42, - )])); - - let mock_time = Arc::new(MockProvider::new(Time::MIN)); - let mock_inner = MockQueryExec::default().with_result(Ok(QueryResponse::new(stream))); - let layer = QueryResultInstrumentation::new(mock_inner, &metrics) - .with_time_provider(Arc::clone(&mock_time)); - - let response = layer - .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) - .await - .expect("query should succeed"); - - // Now the response has been created, advance the clock - mock_time.inc(TIME_STEP); - - let mut response = response.into_record_batches(); - let got = response - .next() - .await - .expect("should yield first batch") - .expect("mock doesn't return error"); - - response - .next() - .await - .expect("more results should be available") - .expect_err("this batch should be an error"); - - // Drop the rest of the batches after observing an error. - drop(response); - - assert_histogram!( - metrics, - U64Histogram, - "ingester_query_result_row", - samples = 1, - sum = got.num_rows() as u64, - ); - assert_histogram!( - metrics, - U64Histogram, - "ingester_query_result_record_batch", - samples = 1, - sum = 1, - ); - assert_histogram!( - metrics, - U64Histogram, - "ingester_query_result_partition", - samples = 1, - sum = 1, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]), - samples = 1, - sum = TIME_STEP, // Recorded as an incomplete request with error - ); - } - - /// A query result which is consumed to completion even after observing an - /// error should be correctly catagorised. - #[tokio::test] - async fn test_multi_partition_stream_with_error_completion() { - let metrics = metric::Registry::default(); - - // Construct the set of partitions and their record batches - let (ok_batch, schema) = make_batch!( - Int64Array("c" => vec![1, 2, 3, 4, 5]), - ); - - let stream = Box::pin(RecordBatchStreamAdapter::new( - schema, - stream::iter([ - Ok(ok_batch.clone()), - Err(DataFusionError::Internal("bananas".to_string())), - Ok(ok_batch), - ]), - )) as SendableRecordBatchStream; - - let stream = PartitionStream::new(stream::iter([PartitionResponse::new( - Some(stream), - PartitionId::new(1), - 42, - )])); - - let mock_time = Arc::new(MockProvider::new(Time::MIN)); - let mock_inner = MockQueryExec::default().with_result(Ok(QueryResponse::new(stream))); - let layer = QueryResultInstrumentation::new(mock_inner, &metrics) - .with_time_provider(Arc::clone(&mock_time)); - - let response = layer - .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) - .await - .expect("query should succeed"); - - // Now the response has been created, advance the clock - mock_time.inc(TIME_STEP); - - // Drain the query results, moving past any errors, and collecting the - // final set of all Ok record batches for comparison. - let _batches = response - .into_record_batches() - .filter_map(|v| async { v.ok() }) - .collect::>() - .await; - - assert_histogram!( - metrics, - U64Histogram, - "ingester_query_result_row", - samples = 1, - sum = 10, // 5 + 5 - ); - assert_histogram!( - metrics, - U64Histogram, - "ingester_query_result_record_batch", - samples = 1, - sum = 2, - ); - assert_histogram!( - metrics, - U64Histogram, - "ingester_query_result_partition", - samples = 1, - sum = 1, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "false")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "complete"), ("has_error", "true")]), - samples = 1, - sum = TIME_STEP, // Recorded as a complete request with error - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "false")]), - samples = 0, - ); - assert_histogram!( - metrics, - DurationHistogram, - "ingester_query_stream_duration", - labels = Attributes::from(&[("request", "incomplete"), ("has_error", "true")]), - samples = 0, - ); } } diff --git a/ingester/src/server/grpc/query.rs b/ingester/src/server/grpc/query.rs index aba97e47ce..a4b0f66d4b 100644 --- a/ingester/src/server/grpc/query.rs +++ b/ingester/src/server/grpc/query.rs @@ -306,29 +306,43 @@ fn encode_response( response.into_partition_stream().flat_map(move |partition| { let partition_id = partition.id(); let completed_persistence_count = partition.completed_persistence_count(); + + // prefix payload data w/ metadata for that particular partition let head = futures::stream::once(async move { encode_partition(partition_id, completed_persistence_count, ingester_id) }); - match partition.into_record_batch_stream() { - Some(stream) => { - let stream = stream.map_err(|e| FlightError::ExternalError(Box::new(e))); + // An output vector of FlightDataEncoder streams, each entry stream with + // a differing schema. + // + // Optimized for the common case of there being a single consistent + // schema across all batches (1 stream). + let mut output = Vec::with_capacity(1); - let tail = FlightDataEncoderBuilder::new().build(stream); + let mut batch_iter = partition.into_record_batches().into_iter().peekable(); - head.chain(tail).boxed() - } - None => head.boxed(), + // While there are more batches to process. + while let Some(schema) = batch_iter.peek().map(|v| v.schema()) { + output.push( + FlightDataEncoderBuilder::new().build(futures::stream::iter( + // Take all the RecordBatch with a matching schema + std::iter::from_fn(|| batch_iter.next_if(|v| v.schema() == schema)) + .map(Ok) + .collect::>>(), + )), + ) } + + head.chain(futures::stream::iter(output).flatten()) }) } #[cfg(test)] mod tests { use arrow::array::{Float64Array, Int32Array}; - use arrow_flight::decode::FlightRecordBatchStream; + use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream}; + use assert_matches::assert_matches; use bytes::Bytes; - use datafusion_util::MemoryStream; use tonic::Code; use crate::{ @@ -375,30 +389,40 @@ mod tests { /// Regression test for https://github.com/influxdata/idpe/issues/17408 #[tokio::test] - #[should_panic( - expected = "Invalid argument error: number of columns(1) must match number of fields(2) in schema" - )] async fn test_chunks_with_different_schemas() { - let (batch1, _schema1) = make_batch!( + let ingester_id = IngesterId::new(); + let (batch1, schema1) = make_batch!( Float64Array("float" => vec![1.1, 2.2, 3.3]), Int32Array("int" => vec![1, 2, 3]), ); - let (batch2, _schema2) = make_batch!( - Int32Array("int" => vec![3, 4]), + let (batch2, schema2) = make_batch!( + Float64Array("float" => vec![4.4]), + Int32Array("int" => vec![4]), ); + assert_eq!(schema1, schema2); + let (batch3, schema3) = make_batch!( + Int32Array("int" => vec![5, 6]), + ); + let (batch4, schema4) = make_batch!( + Float64Array("float" => vec![7.7]), + Int32Array("int" => vec![8]), + ); + assert_eq!(schema1, schema4); let flight = FlightService::new( MockQueryExec::default().with_result(Ok(QueryResponse::new(PartitionStream::new( futures::stream::iter([PartitionResponse::new( - Some(Box::pin(MemoryStream::new(vec![ + vec![ batch1.clone(), batch2.clone(), - ]))), - PartitionId::new(1), - 1, + batch3.clone(), + batch4.clone(), + ], + PartitionId::new(2), + 42, )]), )))), - IngesterId::new(), + ingester_id, 100, &metric::Registry::default(), ); @@ -412,10 +436,82 @@ mod tests { .unwrap() .into_inner() .map_err(FlightError::Tonic); - let batch_stream = FlightRecordBatchStream::new_from_flight_data(response_stream); - let batches = batch_stream.try_collect::>().await.unwrap(); - assert_eq!(batches.len(), 2); - assert_eq!(batches[0], batch1); - assert_eq!(batches[1], batch2); + let flight_decoder = + FlightRecordBatchStream::new_from_flight_data(response_stream).into_inner(); + let flight_data = flight_decoder.try_collect::>().await.unwrap(); + assert_eq!(flight_data.len(), 8); + + // partition info + assert_matches!(flight_data[0].payload, DecodedPayload::None); + let md_actual = + proto::IngesterQueryResponseMetadata::decode(flight_data[0].app_metadata()).unwrap(); + let md_expected = proto::IngesterQueryResponseMetadata { + partition_id: 2, + ingester_uuid: ingester_id.to_string(), + completed_persistence_count: 42, + }; + assert_eq!(md_actual, md_expected); + + // first & second chunk + match &flight_data[1].payload { + DecodedPayload::Schema(actual) => { + assert_eq!(actual, &schema1); + } + other => { + panic!("Unexpected payload: {other:?}"); + } + } + match &flight_data[2].payload { + DecodedPayload::RecordBatch(actual) => { + assert_eq!(actual, &batch1); + } + other => { + panic!("Unexpected payload: {other:?}"); + } + } + match &flight_data[3].payload { + DecodedPayload::RecordBatch(actual) => { + assert_eq!(actual, &batch2); + } + other => { + panic!("Unexpected payload: {other:?}"); + } + } + + // third chunk + match &flight_data[4].payload { + DecodedPayload::Schema(actual) => { + assert_eq!(actual, &schema3); + } + other => { + panic!("Unexpected payload: {other:?}"); + } + } + match &flight_data[5].payload { + DecodedPayload::RecordBatch(actual) => { + assert_eq!(actual, &batch3); + } + other => { + panic!("Unexpected payload: {other:?}"); + } + } + + // forth chunk + match &flight_data[6].payload { + DecodedPayload::Schema(actual) => { + assert_eq!(actual, &schema4); + } + other => { + panic!("Unexpected payload: {other:?}"); + } + } + match &flight_data[7].payload { + DecodedPayload::RecordBatch(actual) => { + assert_eq!(actual, &batch4); + } + other => { + panic!("Unexpected payload: {other:?}"); + } + } } } diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 2bda8c2721..8332d691fc 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -223,7 +223,6 @@ macro_rules! make_partition_stream { )+ ) => {{ use arrow::datatypes::Schema; - use datafusion::physical_plan::memory::MemoryStream; use $crate::query::{response::PartitionStream, partition_response::PartitionResponse}; use futures::stream; @@ -236,10 +235,10 @@ macro_rules! make_partition_stream { batches.push(batch); schema = Schema::try_merge([schema, (*this_schema).clone()]).expect("incompatible batch schemas"); )+ + drop(schema); - let batch = MemoryStream::try_new(batches, Arc::new(schema), None).unwrap(); PartitionResponse::new( - Some(Box::pin(batch)), + batches, $id, 42, ) diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs index c046fa6ba9..e7f9a20bdb 100644 --- a/ingester/tests/write.rs +++ b/ingester/tests/write.rs @@ -80,7 +80,7 @@ async fn write_query() { let hist = ctx .get_metric::( "ingester_query_stream_duration", - &[("request", "complete"), ("has_error", "false")], + &[("request", "complete")], ) .fetch(); assert_eq!(hist.sample_count(), 1);