refactor: stream-based series-set conversion (#6285)

* refactor: stream-based series-set conversion

Closes #6216.

* docs: improve

Co-authored-by: Andrew Lamb <alamb@influxdata.com>

* refactor: improve algo docs and tests

* test: fix after rebase

* fix: broken `Series` conversion when slices are present

Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-12-01 17:24:36 +00:00 committed by GitHub
parent d0f1f6a4fd
commit e2168ae859
3 changed files with 837 additions and 250 deletions

@@ -413,8 +413,6 @@ impl IOxSessionContext {
     /// Executes the SeriesSetPlans on the query executor, in
     /// parallel, producing series or groups
-    ///
-    /// TODO make this streaming rather than buffering the results
     pub async fn to_series_and_groups(
         &self,
         series_set_plans: SeriesSetPlans,
File diff suppressed because it is too large.

@@ -5,10 +5,10 @@ use std::{convert::TryFrom, fmt, sync::Arc};
 use arrow::{
     array::{
-        ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, TimestampNanosecondArray,
-        UInt64Array,
+        Array, ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray,
+        TimestampNanosecondArray, UInt64Array,
     },
-    bitmap::Bitmap,
+    compute,
     datatypes::DataType as ArrowDataType,
 };
 use predicate::rpc_predicate::{FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME};
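
The import changes preview the new approach: the manual `bitmap::Bitmap` handling is gone, replaced by the `Array` trait (for `null_count`/`len`) and arrow's `compute` kernels. A minimal sketch of the two kernels this commit switches to, with invented data:

use arrow::array::{Array, Int64Array, TimestampNanosecondArray};
use arrow::compute;

fn main() {
    let values = Int64Array::from(vec![Some(1), None, Some(3)]);
    let timestamps = TimestampNanosecondArray::from(vec![10, 20, 30]);

    // Mask timestamps wherever `values` is null, so both columns share
    // the same validity and nulls can be skipped in a single pass.
    let mask = compute::is_null(&values).expect("is_null");
    let masked = compute::nullif(&timestamps, &mask).expect("nullif");
    assert_eq!(masked.null_count(), 1);
}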
@@ -146,51 +146,43 @@ impl TryFrom<SeriesSet> for Vec<Series> {
 impl SeriesSet {
     /// Returns true if the array is entirely null between start_row and
     /// start_row+num_rows
-    fn is_all_null(arr: &ArrayRef, start_row: usize, num_rows: usize) -> bool {
-        let end_row = start_row + num_rows;
-        (start_row..end_row).all(|i| arr.is_null(i))
+    fn is_all_null(arr: &ArrayRef) -> bool {
+        arr.null_count() == arr.len()
     }

     pub fn is_timestamp_all_null(&self) -> bool {
-        let start_row = self.start_row;
-        let num_rows = self.num_rows;
         self.field_indexes.iter().all(|field_index| {
             let array = self.batch.column(field_index.timestamp_index);
-            Self::is_all_null(array, start_row, num_rows)
+            Self::is_all_null(array)
         })
     }

     // Convert and append the values from a single field to a Series
     // appended to `frames`
     fn field_to_series(&self, index: &FieldIndex) -> Result<Option<Series>> {
-        let batch = &self.batch;
+        let batch = self.batch.slice(self.start_row, self.num_rows);
         let schema = batch.schema();
         let field = schema.field(index.value_index);
         let array = batch.column(index.value_index);
-        let start_row = self.start_row;
-        let num_rows = self.num_rows;

         // No values for this field are in the array so it does not
         // contribute to a series.
-        if field.is_nullable() && Self::is_all_null(array, start_row, num_rows) {
+        if field.is_nullable() && Self::is_all_null(array) {
             return Ok(None);
         }

         let tags = self.create_frame_tags(schema.field(index.value_index).name());

         // Only take timestamps (and values) from the rows that have non
         // null values for this field
-        let valid = array.data().null_bitmap();
-        let timestamps = batch
-            .column(index.timestamp_index)
-            .as_any()
-            .downcast_ref::<TimestampNanosecondArray>()
-            .unwrap()
-            .extract_values(start_row, num_rows, valid);
+        let timestamps = compute::nullif(
+            batch.column(index.timestamp_index),
+            &compute::is_null(array).expect("is_null"),
+        )
+        .expect("null handling")
+        .as_any()
+        .downcast_ref::<TimestampNanosecondArray>()
+        .unwrap()
+        .extract_values();

         let data = match array.data_type() {
             ArrowDataType::Utf8 => {
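
Two ideas carry the rewrite of `field_to_series`: the batch is narrowed once with the zero-copy `RecordBatch::slice`, and the old per-row null scan becomes an O(1) read of validity metadata. A small self-contained sketch of both, with an invented one-column batch:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn is_all_null(arr: &ArrayRef) -> bool {
    // O(1): reads the validity metadata instead of scanning rows.
    arr.null_count() == arr.len()
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
    let col: ArrayRef = Arc::new(Int64Array::from(vec![None, None, Some(7)]));
    let batch = RecordBatch::try_new(schema, vec![col]).expect("batch");

    // Slice rows [0, 2): shares the underlying buffers, no copying.
    let window = batch.slice(0, 2);
    assert!(is_all_null(window.column(0)));
    assert!(!is_all_null(batch.column(0)));
}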
@@ -198,7 +190,7 @@ impl SeriesSet {
                     .as_any()
                     .downcast_ref::<StringArray>()
                     .unwrap()
-                    .extract_values(start_row, num_rows, valid);
+                    .extract_values();
                 Data::StringPoints { timestamps, values }
             }
             ArrowDataType::Float64 => {
@@ -206,7 +198,7 @@ impl SeriesSet {
                     .as_any()
                     .downcast_ref::<Float64Array>()
                     .unwrap()
-                    .extract_values(start_row, num_rows, valid);
+                    .extract_values();
                 Data::FloatPoints { timestamps, values }
             }
             ArrowDataType::Int64 => {
@@ -215,7 +207,7 @@ impl SeriesSet {
                     .as_any()
                     .downcast_ref::<Int64Array>()
                     .unwrap()
-                    .extract_values(start_row, num_rows, valid);
+                    .extract_values();
                 Data::IntegerPoints { timestamps, values }
             }
             ArrowDataType::UInt64 => {
@@ -223,7 +215,7 @@ impl SeriesSet {
                     .as_any()
                     .downcast_ref::<UInt64Array>()
                     .unwrap()
-                    .extract_values(start_row, num_rows, valid);
+                    .extract_values();
                 Data::UnsignedPoints { timestamps, values }
             }
             ArrowDataType::Boolean => {
@@ -231,7 +223,7 @@ impl SeriesSet {
                     .as_any()
                     .downcast_ref::<BooleanArray>()
                     .unwrap()
-                    .extract_values(start_row, num_rows, valid);
+                    .extract_values();
                 Data::BooleanPoints { timestamps, values }
             }
             _ => {
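
All five arms above follow the same pattern: match on the runtime `DataType`, downcast to the concrete array type, and collect the non-null values. A condensed sketch of that dispatch (the `Points` enum here is invented for illustration; the real code uses `Data`):

use arrow::array::{Array, ArrayRef, Float64Array, Int64Array};
use arrow::datatypes::DataType;

enum Points {
    Float(Vec<f64>),
    Integer(Vec<i64>),
}

fn extract_points(array: &ArrayRef) -> Option<Points> {
    match array.data_type() {
        DataType::Float64 => {
            let a = array.as_any().downcast_ref::<Float64Array>().unwrap();
            Some(Points::Float(a.iter().flatten().collect()))
        }
        DataType::Int64 => {
            let a = array.as_any().downcast_ref::<Int64Array>().unwrap();
            Some(Points::Integer(a.iter().flatten().collect()))
        }
        _ => None, // type not supported as a series value
    }
}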
@@ -345,47 +337,23 @@ fn fmt_strings(f: &mut fmt::Formatter<'_>, strings: &[Arc<str>]) -> fmt::Result
 }

 trait ExtractValues<T> {
-    /// Extracts num_rows of data starting from start_row as a vector,
+    /// Extracts rows as a vector,
     /// for all rows `i` where `valid[i]` is set
-    fn extract_values(&self, start_row: usize, num_rows: usize, valid: Option<&Bitmap>) -> Vec<T>;
+    fn extract_values(&self) -> Vec<T>;
 }

 /// Implements extract_values for a particular type of array that
 macro_rules! extract_values_impl {
     ($DATA_TYPE:ty) => {
-        fn extract_values(
-            &self,
-            start_row: usize,
-            num_rows: usize,
-            valid: Option<&Bitmap>,
-        ) -> Vec<$DATA_TYPE> {
-            let end_row = start_row + num_rows;
-            match valid {
-                Some(valid) => (start_row..end_row)
-                    .filter_map(|row| valid.is_set(row).then(|| self.value(row)))
-                    .collect(),
-                None => (start_row..end_row).map(|row| self.value(row)).collect(),
-            }
+        fn extract_values(&self) -> Vec<$DATA_TYPE> {
+            self.iter().flatten().collect()
         }
     };
 }

 impl ExtractValues<String> for StringArray {
-    fn extract_values(
-        &self,
-        start_row: usize,
-        num_rows: usize,
-        valid: Option<&Bitmap>,
-    ) -> Vec<String> {
-        let end_row = start_row + num_rows;
-        match valid {
-            Some(valid) => (start_row..end_row)
-                .filter_map(|row| valid.is_set(row).then(|| self.value(row).to_string()))
-                .collect(),
-            None => (start_row..end_row)
-                .map(|row| self.value(row).to_string())
-                .collect(),
-        }
+    fn extract_values(&self) -> Vec<String> {
+        self.iter().flatten().map(str::to_string).collect()
     }
 }
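
The new `extract_values` bodies lean on the fact that a typed arrow array iterates as `Option<T>`, so `iter().flatten()` keeps exactly the values the old bitmap-driven `filter_map` kept, with no manual offset bookkeeping. A quick check of that equivalence with invented data:

use arrow::array::{Int64Array, StringArray};

fn main() {
    let ints = Int64Array::from(vec![Some(1), None, Some(3)]);
    let vals: Vec<i64> = ints.iter().flatten().collect();
    assert_eq!(vals, vec![1, 3]); // nulls skipped, order preserved

    let strs = StringArray::from(vec![Some("a"), None, Some("c")]);
    let owned: Vec<String> = strs.iter().flatten().map(str::to_string).collect();
    assert_eq!(owned, vec!["a".to_string(), "c".to_string()]);
}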