refactor: bring back "stream-based `SeriesSetConverter::convert` interface (#6282)" (#6301)

This reverts commit 4a8bb871dc.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-12-01 14:27:43 +00:00 committed by GitHub
parent 14a9bc92e9
commit 01315bc063
2 changed files with 48 additions and 55 deletions
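
Before the diff, for orientation: the change restores `convert` returning a stream instead of a fully materialized `Vec`. A minimal sketch of that difference, with stand-in types (`SeriesSet` as a unit struct, `DfResult` in place of DataFusion's error type); only the shape of the signatures mirrors the real code:

```rust
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};

// Stand-ins: the real code uses iox_query's SeriesSet and DataFusionError.
#[derive(Debug)]
struct SeriesSet;
type DfResult<T> = Result<T, String>;

// Old shape: every series set is materialized before returning.
async fn convert_vec() -> DfResult<Vec<SeriesSet>> {
    Ok(vec![SeriesSet, SeriesSet])
}

// Restored shape: hand back a stream so consumers can process
// series sets one at a time (or still collect them, as below).
async fn convert_stream() -> DfResult<BoxStream<'static, DfResult<SeriesSet>>> {
    Ok(stream::iter(vec![SeriesSet, SeriesSet]).map(Ok).boxed())
}

#[tokio::main]
async fn main() -> DfResult<()> {
    let eager = convert_vec().await?;
    // Callers that still want a Vec drain the stream with try_collect,
    // which is what the updated test helper at the end of the diff does.
    let lazy: Vec<SeriesSet> = convert_stream().await?.try_collect().await?;
    assert_eq!(eager.len(), lazy.len());
    Ok(())
}
```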


@@ -451,17 +451,9 @@ impl IOxSessionContext {
                 let it = ctx.execute_stream(physical_plan).await?;
 
-                let series_sets = SeriesSetConverter::default()
+                SeriesSetConverter::default()
                     .convert(table_name, tag_columns, field_columns, it)
                     .await
-                    .map_err(|e| {
-                        Error::Execution(format!(
-                            "Error executing series set conversion: {}",
-                            e
-                        ))
-                    })?;
-
-                Ok(futures::stream::iter(series_sets).map(|x| Ok(x) as Result<_>))
             })
         })
+        .try_flatten()
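
The trailing `.try_flatten()` replaces the deleted `futures::stream::iter(series_sets)` shim: the inner closure now resolves to a `Result` carrying a stream, so the outer value is a stream of fallible streams, and `try_flatten` splices them into one flat stream of series sets. A toy illustration of that combinator (toy `i32`/`String` item types, not the IOx ones):

```rust
use futures::{stream, TryStreamExt};

#[tokio::main]
async fn main() {
    // Inner streams of Result<i32, String>...
    let inner_a = stream::iter(vec![Ok::<_, String>(1), Ok(2)]);
    let inner_b = stream::iter(vec![Ok(3)]);
    // ...wrapped in an outer stream of Result<inner stream, String>.
    let outer = stream::iter(vec![Ok::<_, String>(inner_a), Ok(inner_b)]);

    // try_flatten yields 1, 2, 3; an Err at either level surfaces
    // as an Err item in the flattened stream.
    let flat: Vec<i32> = outer.try_flatten().try_collect().await.unwrap();
    assert_eq!(flat, vec![1, 2, 3]);
}
```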


@@ -15,9 +15,8 @@ use datafusion::{
 };
-use snafu::{OptionExt, ResultExt, Snafu};
+use futures::{Stream, StreamExt, TryStreamExt};
+use snafu::{OptionExt, Snafu};
 use std::sync::Arc;
-use tokio::sync::mpsc::error::SendError;
 
 use crate::exec::{
     field::{self, FieldColumns, FieldIndexes},
@@ -31,40 +30,11 @@ use super::{
 #[derive(Debug, Snafu)]
 pub enum Error {
-    #[snafu(display("Plan Execution Error: {}", source))]
-    Execution {
-        source: Box<dyn std::error::Error + Send + Sync + 'static>,
-    },
-
-    #[snafu(display(
-        "Error reading record batch while converting from SeriesSet: {:?}",
-        source
-    ))]
-    Reading {
-        source: datafusion::error::DataFusionError,
-    },
-
-    #[snafu(display(
-        "Error concatenating record batch while converting from SeriesSet: {:?}",
-        source
-    ))]
-    Concatenating { source: arrow::error::ArrowError },
-
-    #[snafu(display("Internal field error while converting series set: {}", source))]
-    InternalField { source: field::Error },
-
     #[snafu(display("Internal error finding grouping colum: {}", column_name))]
     FindingGroupColumn { column_name: String },
-
-    #[snafu(display("Sending series set results during conversion: {:?}", source))]
-    SendingDuringConversion {
-        source: Box<SendError<Result<SeriesSet>>>,
-    },
-
-    #[snafu(display("Sending grouped series set results during conversion: {:?}", source))]
-    SendingDuringGroupedConversion {
-        source: Box<SendError<Result<SeriesSet>>>,
-    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
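
Most of the deleted variants do not vanish; their display strings reappear in the next hunk as `DataFusionError::Context` wrappers applied via `map_err` at each failure site. A self-contained sketch of that wrapping pattern, with a home-made two-variant enum standing in for DataFusion's error type:

```rust
use std::fmt;

// Stand-in for DataFusionError: External boxes a foreign error,
// Context pairs an error with a human-readable message.
#[derive(Debug)]
enum DfError {
    External(Box<dyn std::error::Error + Send + Sync>),
    Context(String, Box<DfError>),
}

impl fmt::Display for DfError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DfError::External(e) => write!(f, "External error: {e}"),
            DfError::Context(msg, inner) => write!(f, "{msg}: {inner}"),
        }
    }
}

fn parse_row_count(input: &str) -> Result<i64, DfError> {
    // Same shape as the map_err closures below: wrap the source error
    // and attach a context string at the point of failure.
    input.trim().parse::<i64>().map_err(|e| {
        DfError::Context(
            "Error reading record batch while converting from SeriesSet".to_string(),
            Box::new(DfError::External(Box::new(e))),
        )
    })
}

fn main() {
    println!("{}", parse_row_count("not-a-number").unwrap_err());
}
```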
@@ -93,26 +63,54 @@ impl SeriesSetConverter {
         tag_columns: Arc<Vec<Arc<str>>>,
         field_columns: FieldColumns,
         it: SendableRecordBatchStream,
-    ) -> Result<Vec<SeriesSet>, Error> {
+    ) -> Result<impl Stream<Item = Result<SeriesSet, DataFusionError>>, DataFusionError> {
+        assert_eq!(
+            tag_columns.as_ref(),
+            &{
+                let mut tmp = tag_columns.as_ref().clone();
+                tmp.sort();
+                tmp
+            },
+            "Tag column sorted",
+        );
+
+        let schema = it.schema();
+
         // for now, this logic only handles a single `RecordBatch` so
         // concat data together.
         //
         // proper streaming support tracked by:
         // https://github.com/influxdata/influxdb_iox/issues/4445
-        let batches = collect(it).await.context(ReadingSnafu)?;
+        let batches = collect(it).await.map_err(|e| {
+            DataFusionError::Context(
+                "Error reading record batch while converting from SeriesSet".to_string(),
+                Box::new(e),
+            )
+        })?;
 
-        let batch = if !batches.is_empty() {
-            compute::concat_batches(&batches[0].schema(), &batches).context(ConcatenatingSnafu)?
-        } else {
-            return Ok(vec![]);
-        };
+        let batch = compute::concat_batches(&schema, &batches).map_err(|e| {
+            DataFusionError::Context(
+                "Error concatenating record batch while converting from SeriesSet".to_string(),
+                Box::new(DataFusionError::ArrowError(e)),
+            )
+        })?;
+        if batch.num_rows() == 0 {
+            return Ok(futures::stream::empty().boxed());
+        }
 
-        let schema = batch.schema();
         // TODO: check that the tag columns are sorted by tag name...
-        let tag_indexes =
-            FieldIndexes::names_to_indexes(&schema, &tag_columns).context(InternalFieldSnafu)?;
-        let field_indexes = FieldIndexes::from_field_columns(&schema, &field_columns)
-            .context(InternalFieldSnafu)?;
+        let tag_indexes = FieldIndexes::names_to_indexes(&schema, &tag_columns).map_err(|e| {
+            DataFusionError::Context(
+                "Internal field error while converting series set".to_string(),
+                Box::new(DataFusionError::External(Box::new(e))),
+            )
+        })?;
+        let field_indexes =
+            FieldIndexes::from_field_columns(&schema, &field_columns).map_err(|e| {
+                DataFusionError::Context(
+                    "Internal field error while converting series set".to_string(),
+                    Box::new(DataFusionError::External(Box::new(e))),
+                )
+            })?;
 
         // Algorithm: compute, via bitsets, the rows at which each
         // tag column changes and thereby where the tagset
@@ -148,7 +146,7 @@ impl SeriesSetConverter {
         // call await during the loop)
 
         // emit each series
-        let series_sets = intersections
+        let series_sets: Vec<_> = intersections
             .into_iter()
             .map(|end_row| {
                 let series_set = SeriesSet {
@@ -165,7 +163,7 @@
             })
             .collect();
 
-        Ok(series_sets)
+        Ok(futures::stream::iter(series_sets).map(Ok).boxed())
     }
 
     /// returns a bitset with all row indexes where the value of the
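
Note that both return paths, `stream::empty().boxed()` in the empty case above and `stream::iter(series_sets).map(Ok).boxed()` here, box the stream: the `impl Stream` in the signature must name a single concrete type, while the two combinator chains would otherwise produce two different ones. A minimal illustration of that constraint (toy element types):

```rust
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};

// The two arms build different concrete stream types; .boxed()
// erases both to BoxStream so one return type covers them.
fn numbers(empty: bool) -> BoxStream<'static, Result<u32, String>> {
    if empty {
        stream::empty().boxed()
    } else {
        stream::iter(vec![Ok(1), Ok(2)]).boxed()
    }
}

#[tokio::main]
async fn main() {
    let collected: Vec<u32> = numbers(false).try_collect().await.unwrap();
    assert_eq!(collected, vec![1, 2]);
    assert!(numbers(true).try_collect::<Vec<u32>>().await.unwrap().is_empty());
}
```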
@@ -757,6 +755,9 @@ mod tests {
             .convert(table_name, tag_columns, field_columns, it)
             .await
             .expect("Conversion happened without error")
+            .try_collect()
+            .await
+            .expect("Conversion happened without error")
     }
 
     /// Test helper: parses the csv content into a single record batch arrow