From 4a8bb871dcd8c4d8c45d11622abb6cbd2578efb9 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 1 Dec 2022 12:51:47 +0100 Subject: [PATCH] refactor: revert stream-based `SeriesSetConvert::convert` interface (#6282) This reverts commit dad6dee924ef71b414e4fc3b79864e454f4f7fea. --- iox_query/src/exec/context.rs | 10 ++- iox_query/src/exec/seriesset/converter.rs | 93 +++++++++++------------ 2 files changed, 55 insertions(+), 48 deletions(-) diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 3dd8925ed1..d37cd724c0 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -451,9 +451,17 @@ impl IOxSessionContext { let it = ctx.execute_stream(physical_plan).await?; - SeriesSetConverter::default() + let series_sets = SeriesSetConverter::default() .convert(table_name, tag_columns, field_columns, it) .await + .map_err(|e| { + Error::Execution(format!( + "Error executing series set conversion: {}", + e + )) + })?; + + Ok(futures::stream::iter(series_sets).map(|x| Ok(x) as Result<_>)) }) }) .try_flatten() diff --git a/iox_query/src/exec/seriesset/converter.rs b/iox_query/src/exec/seriesset/converter.rs index 7ea4a1ca81..4de278b54d 100644 --- a/iox_query/src/exec/seriesset/converter.rs +++ b/iox_query/src/exec/seriesset/converter.rs @@ -15,8 +15,9 @@ use datafusion::{ }; use futures::{Stream, StreamExt, TryStreamExt}; -use snafu::{OptionExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::sync::Arc; +use tokio::sync::mpsc::error::SendError; use crate::exec::{ field::{self, FieldColumns, FieldIndexes}, @@ -30,11 +31,40 @@ use super::{ #[derive(Debug, Snafu)] pub enum Error { + #[snafu(display("Plan Execution Error: {}", source))] + Execution { + source: Box, + }, + + #[snafu(display( + "Error reading record batch while converting from SeriesSet: {:?}", + source + ))] + Reading { + source: datafusion::error::DataFusionError, + }, + + #[snafu(display( + "Error concatenating record batch while converting from SeriesSet: {:?}", + source + ))] + Concatenating { source: arrow::error::ArrowError }, + #[snafu(display("Internal field error while converting series set: {}", source))] InternalField { source: field::Error }, #[snafu(display("Internal error finding grouping colum: {}", column_name))] FindingGroupColumn { column_name: String }, + + #[snafu(display("Sending series set results during conversion: {:?}", source))] + SendingDuringConversion { + source: Box>>, + }, + + #[snafu(display("Sending grouped series set results during conversion: {:?}", source))] + SendingDuringGroupedConversion { + source: Box>>, + }, } pub type Result = std::result::Result; @@ -63,54 +93,26 @@ impl SeriesSetConverter { tag_columns: Arc>>, field_columns: FieldColumns, it: SendableRecordBatchStream, - ) -> Result>, DataFusionError> { - assert_eq!( - tag_columns.as_ref(), - &{ - let mut tmp = tag_columns.as_ref().clone(); - tmp.sort(); - tmp - }, - "Tag column sorted", - ); - - let schema = it.schema(); - + ) -> Result, Error> { // for now, this logic only handles a single `RecordBatch` so // concat data together. // // proper streaming support tracked by: // https://github.com/influxdata/influxdb_iox/issues/4445 - let batches = collect(it).await.map_err(|e| { - DataFusionError::Context( - "Error reading record batch while converting from SeriesSet".to_string(), - Box::new(e), - ) - })?; + let batches = collect(it).await.context(ReadingSnafu)?; - let batch = compute::concat_batches(&schema, &batches).map_err(|e| { - DataFusionError::Context( - "Error concatenating record batch while converting from SeriesSet".to_string(), - Box::new(DataFusionError::ArrowError(e)), - ) - })?; - if batch.num_rows() == 0 { - return Ok(futures::stream::empty().boxed()); - } + let batch = if !batches.is_empty() { + compute::concat_batches(&batches[0].schema(), &batches).context(ConcatenatingSnafu)? + } else { + return Ok(vec![]); + }; - let tag_indexes = FieldIndexes::names_to_indexes(&schema, &tag_columns).map_err(|e| { - DataFusionError::Context( - "Internal field error while converting series set".to_string(), - Box::new(DataFusionError::External(Box::new(e))), - ) - })?; - let field_indexes = - FieldIndexes::from_field_columns(&schema, &field_columns).map_err(|e| { - DataFusionError::Context( - "Internal field error while converting series set".to_string(), - Box::new(DataFusionError::External(Box::new(e))), - ) - })?; + let schema = batch.schema(); + // TODO: check that the tag columns are sorted by tag name... + let tag_indexes = + FieldIndexes::names_to_indexes(&schema, &tag_columns).context(InternalFieldSnafu)?; + let field_indexes = FieldIndexes::from_field_columns(&schema, &field_columns) + .context(InternalFieldSnafu)?; // Algorithm: compute, via bitsets, the rows at which each // tag column changes and thereby where the tagset @@ -146,7 +148,7 @@ impl SeriesSetConverter { // call await during the loop) // emit each series - let series_sets: Vec<_> = intersections + let series_sets = intersections .into_iter() .map(|end_row| { let series_set = SeriesSet { @@ -161,7 +163,7 @@ impl SeriesSetConverter { }) .collect(); - Ok(futures::stream::iter(series_sets).map(Ok).boxed()) + Ok(series_sets) } /// returns a bitset with all row indexes where the value of the @@ -775,9 +777,6 @@ mod tests { .convert(table_name, tag_columns, field_columns, it) .await .expect("Conversion happened without error") - .try_collect() - .await - .expect("Conversion happened without error") } /// Test helper: parses the csv content into a single record batch arrow