Merge pull request #6295 from influxdata/crepererum/revert_dad6dee924ef71b414e4fc3b79864e454f4f7fea

refactor: revert stream-based `SeriesSetConverter::convert` interface (#6282)
Marco Neumann 2022-12-01 12:20:46 +00:00 committed by GitHub
commit 88de327f70
2 changed files with 55 additions and 48 deletions
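At a glance: #6282 had changed `SeriesSetConverter::convert` to lazily yield a stream of results; this revert makes it return an eagerly collected `Vec` again (proper streaming remains tracked by influxdb_iox#4445), and the caller re-wraps that `Vec` into a stream. A minimal sketch of the two shapes, with `SeriesSet` and `Error` as stand-ins for the real iox_query types:

// Sketch only: `SeriesSet` and `Error` stand in for the real iox_query types.
use futures::stream::{self, Stream, StreamExt};

#[derive(Debug, PartialEq)]
pub struct SeriesSet;
pub type Error = String;

// Interface introduced by #6282 (now reverted): a stream of results.
pub fn convert_stream(input: Vec<SeriesSet>) -> impl Stream<Item = Result<SeriesSet, Error>> {
    stream::iter(input).map(Ok)
}

// Interface restored by this revert: an eagerly collected Vec.
pub fn convert_vec(input: Vec<SeriesSet>) -> Result<Vec<SeriesSet>, Error> {
    Ok(input)
}

fn main() {
    let n = convert_vec(vec![SeriesSet, SeriesSet]).map(|v| v.len());
    assert_eq!(n, Ok(2));
}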


@@ -451,9 +451,17 @@ impl IOxSessionContext {
                 let it = ctx.execute_stream(physical_plan).await?;
 
-                SeriesSetConverter::default()
+                let series_sets = SeriesSetConverter::default()
                     .convert(table_name, tag_columns, field_columns, it)
                     .await
+                    .map_err(|e| {
+                        Error::Execution(format!(
+                            "Error executing series set conversion: {}",
+                            e
+                        ))
+                    })?;
+
+                Ok(futures::stream::iter(series_sets).map(|x| Ok(x) as Result<_>))
             })
         })
         .try_flatten()
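The hunk above is the caller-side cost of the revert: the eager `Vec<SeriesSet>` is wrapped back into a stream because the surrounding query path still composes `Stream`s via `try_flatten`. A self-contained sketch of that wrapping, assuming a stand-in `SeriesSet` type and the `futures` and `tokio` crates:

use futures::{stream, StreamExt, TryStreamExt};

#[derive(Debug)]
struct SeriesSet;

type Result<T, E = String> = std::result::Result<T, E>;

#[tokio::main]
async fn main() -> Result<()> {
    let series_sets: Vec<SeriesSet> = vec![SeriesSet, SeriesSet];

    // Same shape as the diff: wrap every element in `Ok` so the Vec
    // becomes a fallible stream again.
    let stream = stream::iter(series_sets).map(|x| Ok(x) as Result<_>);

    // Downstream consumers can keep collecting it as before.
    let collected: Vec<SeriesSet> = stream.try_collect().await?;
    assert_eq!(collected.len(), 2);
    Ok(())
}

The remaining hunks below are in the `SeriesSetConverter` module itself.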


@@ -15,8 +15,9 @@ use datafusion::{
 };
 use futures::{Stream, StreamExt, TryStreamExt};
-use snafu::{OptionExt, Snafu};
+use snafu::{OptionExt, ResultExt, Snafu};
 use std::sync::Arc;
+use tokio::sync::mpsc::error::SendError;
 
 use crate::exec::{
     field::{self, FieldColumns, FieldIndexes},
@@ -30,11 +31,40 @@ use super::{
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Plan Execution Error: {}", source))]
     Execution {
         source: Box<dyn std::error::Error + Send + Sync + 'static>,
     },
+
+    #[snafu(display(
+        "Error reading record batch while converting from SeriesSet: {:?}",
+        source
+    ))]
+    Reading {
+        source: datafusion::error::DataFusionError,
+    },
+
+    #[snafu(display(
+        "Error concatenating record batch while converting from SeriesSet: {:?}",
+        source
+    ))]
+    Concatenating { source: arrow::error::ArrowError },
+
+    #[snafu(display("Internal field error while converting series set: {}", source))]
+    InternalField { source: field::Error },
+
+    #[snafu(display("Internal error finding grouping colum: {}", column_name))]
+    FindingGroupColumn { column_name: String },
+
+    #[snafu(display("Sending series set results during conversion: {:?}", source))]
+    SendingDuringConversion {
+        source: Box<SendError<Result<SeriesSet>>>,
+    },
+
+    #[snafu(display("Sending grouped series set results during conversion: {:?}", source))]
+    SendingDuringGroupedConversion {
+        source: Box<SendError<Result<SeriesSet>>>,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
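The restored enum brings back snafu context selectors, which is why `ResultExt` reappears in the import hunk above. A minimal sketch of the `context(...Snafu)` pattern with snafu 0.7-style selector names, using `std::io::Error` as a stand-in source error:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Error reading record batch while converting from SeriesSet: {:?}", source))]
    Reading { source: std::io::Error },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;

fn read_bytes() -> std::io::Result<Vec<u8>> {
    std::fs::read("batches.bin")
}

fn convert() -> Result<Vec<u8>> {
    // `.context(ReadingSnafu)` wraps the io::Error into `Error::Reading`,
    // mirroring `collect(it).await.context(ReadingSnafu)?` in the diff.
    read_bytes().context(ReadingSnafu)
}

fn main() {
    if let Err(e) = convert() {
        eprintln!("{e}");
    }
}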
@@ -63,54 +93,26 @@ impl SeriesSetConverter {
         tag_columns: Arc<Vec<Arc<str>>>,
         field_columns: FieldColumns,
         it: SendableRecordBatchStream,
-    ) -> Result<impl Stream<Item = Result<SeriesSet, DataFusionError>>, DataFusionError> {
-        assert_eq!(
-            tag_columns.as_ref(),
-            &{
-                let mut tmp = tag_columns.as_ref().clone();
-                tmp.sort();
-                tmp
-            },
-            "Tag column sorted",
-        );
-
-        let schema = it.schema();
-
+    ) -> Result<Vec<SeriesSet>, Error> {
         // for now, this logic only handles a single `RecordBatch` so
         // concat data together.
         //
         // proper streaming support tracked by:
         // https://github.com/influxdata/influxdb_iox/issues/4445
-        let batches = collect(it).await.map_err(|e| {
-            DataFusionError::Context(
-                "Error reading record batch while converting from SeriesSet".to_string(),
-                Box::new(e),
-            )
-        })?;
+        let batches = collect(it).await.context(ReadingSnafu)?;
 
-        let batch = compute::concat_batches(&schema, &batches).map_err(|e| {
-            DataFusionError::Context(
-                "Error concatenating record batch while converting from SeriesSet".to_string(),
-                Box::new(DataFusionError::ArrowError(e)),
-            )
-        })?;
-
-        if batch.num_rows() == 0 {
-            return Ok(futures::stream::empty().boxed());
-        }
+        let batch = if !batches.is_empty() {
+            compute::concat_batches(&batches[0].schema(), &batches).context(ConcatenatingSnafu)?
+        } else {
+            return Ok(vec![]);
+        };
 
-        let tag_indexes = FieldIndexes::names_to_indexes(&schema, &tag_columns).map_err(|e| {
-            DataFusionError::Context(
-                "Internal field error while converting series set".to_string(),
-                Box::new(DataFusionError::External(Box::new(e))),
-            )
-        })?;
-        let field_indexes =
-            FieldIndexes::from_field_columns(&schema, &field_columns).map_err(|e| {
-                DataFusionError::Context(
-                    "Internal field error while converting series set".to_string(),
-                    Box::new(DataFusionError::External(Box::new(e))),
-                )
-            })?;
+        let schema = batch.schema();
+
+        // TODO: check that the tag columns are sorted by tag name...
+        let tag_indexes =
+            FieldIndexes::names_to_indexes(&schema, &tag_columns).context(InternalFieldSnafu)?;
+        let field_indexes = FieldIndexes::from_field_columns(&schema, &field_columns)
+            .context(InternalFieldSnafu)?;
 
         // Algorithm: compute, via bitsets, the rows at which each
         // tag column changes and thereby where the tagset
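The reverted body collects the whole input stream and concatenates the record batches into one before scanning for tag changes. A minimal arrow-only sketch of that concat step (the empty-input check and first-batch schema mirror the diff; no iox_query types involved):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let a = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    )?;
    let b = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![3])) as ArrayRef],
    )?;
    let batches = vec![a, b];

    // Mirrors the reverted logic: bail out early on empty input, otherwise
    // concat everything using the first batch's schema.
    if batches.is_empty() {
        return Ok(());
    }
    let batch = compute::concat_batches(&batches[0].schema(), &batches)?;
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}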
@@ -146,7 +148,7 @@
         // call await during the loop)
 
         // emit each series
-        let series_sets: Vec<_> = intersections
+        let series_sets = intersections
             .into_iter()
             .map(|end_row| {
                 let series_set = SeriesSet {
@@ -161,7 +163,7 @@
             })
             .collect();
 
-        Ok(futures::stream::iter(series_sets).map(Ok).boxed())
+        Ok(series_sets)
     }
 
     /// returns a bitset with all row indexes where the value of the
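As the algorithm comment above describes, the converter finds the rows where any tag value changes and emits one `SeriesSet` per run between those `intersections`. A plain-Rust sketch of that idea, using simple slices in place of the real arrow bitset machinery, with input assumed sorted by tag (the sortedness the converter requires):

// Exclusive end row of each run of identical tag values.
fn intersections(tags: &[&str]) -> Vec<usize> {
    let mut ends: Vec<usize> = (1..tags.len())
        .filter(|&i| tags[i] != tags[i - 1])
        .collect();
    if !tags.is_empty() {
        ends.push(tags.len());
    }
    ends
}

fn main() {
    // Two runs ("a" x2, "b" x3) -> boundaries at rows 2 and 5, i.e. one
    // SeriesSet for rows 0..2 and one for rows 2..5.
    assert_eq!(intersections(&["a", "a", "b", "b", "b"]), vec![2, 5]);
}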
@@ -775,9 +777,6 @@ mod tests {
             .convert(table_name, tag_columns, field_columns, it)
             .await
             .expect("Conversion happened without error")
-            .try_collect()
-            .await
-            .expect("Conversion happened without error")
     }
 
     /// Test helper: parses the csv content into a single record batch arrow