From 01315bc06346eab9117745ded3452fe6f03efc7a Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 1 Dec 2022 14:27:43 +0000 Subject: [PATCH] refactor: bring back "stream-based `SeriesSetConvert::convert` interface (#6282)" (#6301) This reverts commit 4a8bb871dcd8c4d8c45d11622abb6cbd2578efb9. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- iox_query/src/exec/context.rs | 10 +-- iox_query/src/exec/seriesset/converter.rs | 93 ++++++++++++----------- 2 files changed, 48 insertions(+), 55 deletions(-) diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index d37cd724c0..3dd8925ed1 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -451,17 +451,9 @@ impl IOxSessionContext { let it = ctx.execute_stream(physical_plan).await?; - let series_sets = SeriesSetConverter::default() + SeriesSetConverter::default() .convert(table_name, tag_columns, field_columns, it) .await - .map_err(|e| { - Error::Execution(format!( - "Error executing series set conversion: {}", - e - )) - })?; - - Ok(futures::stream::iter(series_sets).map(|x| Ok(x) as Result<_>)) }) }) .try_flatten() diff --git a/iox_query/src/exec/seriesset/converter.rs b/iox_query/src/exec/seriesset/converter.rs index ff7bac7286..d3013c9e38 100644 --- a/iox_query/src/exec/seriesset/converter.rs +++ b/iox_query/src/exec/seriesset/converter.rs @@ -15,9 +15,8 @@ use datafusion::{ }; use futures::{Stream, StreamExt, TryStreamExt}; -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::{OptionExt, Snafu}; use std::sync::Arc; -use tokio::sync::mpsc::error::SendError; use crate::exec::{ field::{self, FieldColumns, FieldIndexes}, @@ -31,40 +30,11 @@ use super::{ #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Plan Execution Error: {}", source))] - Execution { - source: Box, - }, - - #[snafu(display( - "Error reading record batch while converting from SeriesSet: {:?}", - source - ))] - Reading { - source: datafusion::error::DataFusionError, - }, - - #[snafu(display( - "Error concatenating record batch while converting from SeriesSet: {:?}", - source - ))] - Concatenating { source: arrow::error::ArrowError }, - #[snafu(display("Internal field error while converting series set: {}", source))] InternalField { source: field::Error }, #[snafu(display("Internal error finding grouping colum: {}", column_name))] FindingGroupColumn { column_name: String }, - - #[snafu(display("Sending series set results during conversion: {:?}", source))] - SendingDuringConversion { - source: Box>>, - }, - - #[snafu(display("Sending grouped series set results during conversion: {:?}", source))] - SendingDuringGroupedConversion { - source: Box>>, - }, } pub type Result = std::result::Result; @@ -93,26 +63,54 @@ impl SeriesSetConverter { tag_columns: Arc>>, field_columns: FieldColumns, it: SendableRecordBatchStream, - ) -> Result, Error> { + ) -> Result>, DataFusionError> { + assert_eq!( + tag_columns.as_ref(), + &{ + let mut tmp = tag_columns.as_ref().clone(); + tmp.sort(); + tmp + }, + "Tag column sorted", + ); + + let schema = it.schema(); + // for now, this logic only handles a single `RecordBatch` so // concat data together. // // proper streaming support tracked by: // https://github.com/influxdata/influxdb_iox/issues/4445 - let batches = collect(it).await.context(ReadingSnafu)?; + let batches = collect(it).await.map_err(|e| { + DataFusionError::Context( + "Error reading record batch while converting from SeriesSet".to_string(), + Box::new(e), + ) + })?; - let batch = if !batches.is_empty() { - compute::concat_batches(&batches[0].schema(), &batches).context(ConcatenatingSnafu)? - } else { - return Ok(vec![]); - }; + let batch = compute::concat_batches(&schema, &batches).map_err(|e| { + DataFusionError::Context( + "Error concatenating record batch while converting from SeriesSet".to_string(), + Box::new(DataFusionError::ArrowError(e)), + ) + })?; + if batch.num_rows() == 0 { + return Ok(futures::stream::empty().boxed()); + } - let schema = batch.schema(); - // TODO: check that the tag columns are sorted by tag name... - let tag_indexes = - FieldIndexes::names_to_indexes(&schema, &tag_columns).context(InternalFieldSnafu)?; - let field_indexes = FieldIndexes::from_field_columns(&schema, &field_columns) - .context(InternalFieldSnafu)?; + let tag_indexes = FieldIndexes::names_to_indexes(&schema, &tag_columns).map_err(|e| { + DataFusionError::Context( + "Internal field error while converting series set".to_string(), + Box::new(DataFusionError::External(Box::new(e))), + ) + })?; + let field_indexes = + FieldIndexes::from_field_columns(&schema, &field_columns).map_err(|e| { + DataFusionError::Context( + "Internal field error while converting series set".to_string(), + Box::new(DataFusionError::External(Box::new(e))), + ) + })?; // Algorithm: compute, via bitsets, the rows at which each // tag column changes and thereby where the tagset @@ -148,7 +146,7 @@ impl SeriesSetConverter { // call await during the loop) // emit each series - let series_sets = intersections + let series_sets: Vec<_> = intersections .into_iter() .map(|end_row| { let series_set = SeriesSet { @@ -165,7 +163,7 @@ impl SeriesSetConverter { }) .collect(); - Ok(series_sets) + Ok(futures::stream::iter(series_sets).map(Ok).boxed()) } /// returns a bitset with all row indexes where the value of the @@ -757,6 +755,9 @@ mod tests { .convert(table_name, tag_columns, field_columns, it) .await .expect("Conversion happened without error") + .try_collect() + .await + .expect("Conversion happened without error") } /// Test helper: parses the csv content into a single record batch arrow