perf: avoid using channels for query execution

Pre-sized channels get full when the results to send over them are larger than the capacities. This causes significant runtime overhead and slows down query performance.

This commit removes the intermediate channels. The potential downside to this approach is there may be more buffering which could increase memory usage during query and also block a thread for longer periods of time.
pull/24376/head
Edd Robinson 2021-04-28 11:53:42 +01:00 committed by kodiakhq[bot]
parent d6c0f40630
commit a9ef604ef6
4 changed files with 119 additions and 170 deletions

View File

@ -10,7 +10,7 @@ pub mod seriesset;
pub mod stringset;
mod task;
pub use context::{DEFAULT_CATALOG, DEFAULT_SCHEMA};
use futures::Future;
use futures::{future, Future};
use std::sync::Arc;
@ -26,7 +26,7 @@ use schema_pivot::SchemaPivotNode;
use fieldlist::{FieldList, IntoFieldList};
use seriesset::{Error as SeriesSetError, SeriesSetConverter, SeriesSetItem};
use stringset::{IntoStringSet, StringSetRef};
use tokio::sync::mpsc::{self, error::SendError};
use tokio::sync::mpsc::error::SendError;
use snafu::{ResultExt, Snafu};
@ -125,31 +125,22 @@ impl Executor {
}
}
/// Executes the embedded plans, each as separate tasks, sending
/// the resulting `SeriesSet`s one by one to the `tx` channel.
/// Executes the embedded plans, each as separate tasks combining the results
/// into the returned collection of items.
///
/// The SeriesSets are guaranteed to come back ordered by table_name
///
/// Note that the returned future resolves (e.g. "returns") once
/// all plans have been sent to `tx`. This means that the future
/// will not resolve if there is nothing hooked up receiving
/// results from the other end of the channel and the channel
/// can't hold all the resulting series.
/// The SeriesSets are guaranteed to come back ordered by table_name.
pub async fn to_series_set(
&self,
series_set_plans: SeriesSetPlans,
tx: mpsc::Sender<Result<SeriesSetItem, SeriesSetError>>,
) -> Result<()> {
) -> Result<Vec<SeriesSetItem>, Error> {
let SeriesSetPlans { mut plans } = series_set_plans;
if plans.is_empty() {
return Ok(());
return Ok(vec![]);
}
// sort by table name and send the results to separate
// channels
// sort plans by table name
plans.sort_by(|a, b| a.table_name.cmp(&b.table_name));
let mut rx_channels = Vec::new(); // sorted by table names
// Run the plans in parallel
let handles = plans
@ -157,8 +148,6 @@ impl Executor {
.map(|plan| {
// TODO run these on some executor other than the main tokio pool (maybe?)
let ctx = self.new_context();
let (plan_tx, plan_rx) = mpsc::channel(1);
rx_channels.push(plan_rx);
self.exec.spawn(async move {
let SeriesSetPlan {
@ -180,7 +169,7 @@ impl Executor {
.await
.context(SeriesSetExecution)?;
SeriesSetConverter::new(plan_tx)
SeriesSetConverter::default()
.convert(
table_name,
tag_columns,
@ -189,30 +178,22 @@ impl Executor {
it,
)
.await
.context(SeriesSetConversion)?;
Ok(())
.context(SeriesSetConversion)
})
})
.collect::<Vec<_>>();
// transfer data from the rx streams in order
for mut rx in rx_channels {
while let Some(r) = rx.recv().await {
tx.send(r)
.await
.map_err(|e| Error::SendingDuringConversion {
source: Box::new(e),
})?
}
// join_all ensures that the results are consumed in the same order they
// were spawned maintaining the guarantee to return results ordered
// by the plan sort order.
// let handles = future::join_all(handles).await;
let handles = future::try_join_all(handles).await.context(TaskJoinError)?;
let mut results = vec![];
for handle in handles {
results.extend(handle?.into_iter());
}
// now, wait for all the values to resolve so we can report
// any errors
for join_handle in handles {
join_handle.await.context(TaskJoinError)??;
}
Ok(())
Ok(results)
}
/// Executes `plan` and return the resulting FieldList

View File

@ -31,7 +31,7 @@ use arrow_deps::{
datafusion::physical_plan::SendableRecordBatchStream,
};
use snafu::{ResultExt, Snafu};
use tokio::sync::mpsc::{self, error::SendError};
use tokio::sync::mpsc::error::SendError;
use tokio_stream::StreamExt;
use croaring::bitmap::Bitmap;
@ -117,20 +117,13 @@ pub enum SeriesSetItem {
Data(SeriesSet),
}
// Handles converting record batches into SeriesSets, and sending them
// to tx
#[derive(Debug)]
pub struct SeriesSetConverter {
tx: mpsc::Sender<Result<SeriesSetItem>>,
}
// Handles converting record batches into SeriesSets
#[derive(Debug, Default)]
pub struct SeriesSetConverter {}
impl SeriesSetConverter {
pub fn new(tx: mpsc::Sender<Result<SeriesSetItem>>) -> Self {
Self { tx }
}
/// Convert the results from running a DataFusion plan into the
/// appropriate SeriesSetItems, sending them self.tx
/// appropriate SeriesSetItems.
///
/// The results must be in the logical format described in this
/// module's documentation (i.e. ordered by tag keys)
@ -146,44 +139,15 @@ impl SeriesSetConverter {
///
/// it: record batch iterator that produces data in the desired order
pub async fn convert(
&mut self,
table_name: Arc<String>,
tag_columns: Arc<Vec<Arc<String>>>,
field_columns: FieldColumns,
num_prefix_tag_group_columns: Option<usize>,
it: SendableRecordBatchStream,
) -> Result<()> {
// Make sure that any error that results from processing is sent along
if let Err(e) = self
.convert_impl(
table_name,
tag_columns,
field_columns,
num_prefix_tag_group_columns,
it,
)
.await
{
self.tx
.send(Err(e))
.await
.map_err(|e| Error::SendingDuringConversion {
source: Box::new(e),
})?
}
Ok(())
}
/// Does the actual conversion, returning any error in processing
pub async fn convert_impl(
&mut self,
table_name: Arc<String>,
tag_columns: Arc<Vec<Arc<String>>>,
field_columns: FieldColumns,
num_prefix_tag_group_columns: Option<usize>,
mut it: SendableRecordBatchStream,
) -> Result<()> {
) -> Result<Vec<SeriesSetItem>, Error> {
let mut group_generator = GroupGenerator::new(num_prefix_tag_group_columns);
let mut results = vec![];
// for now, only handle a single record batch
if let Some(batch) = it.next().await {
@ -255,25 +219,15 @@ impl SeriesSetConverter {
})
.collect::<Vec<_>>();
results.reserve(series_sets.len());
for series_set in series_sets {
if let Some(group_desc) = group_generator.next_series(&series_set) {
self.tx
.send(Ok(SeriesSetItem::GroupStart(group_desc)))
.await
.map_err(|e| Error::SendingDuringGroupedConversion {
source: Box::new(e),
})?;
results.push(SeriesSetItem::GroupStart(group_desc));
}
self.tx
.send(Ok(SeriesSetItem::Data(series_set)))
.await
.map_err(|e| Error::SendingDuringConversion {
source: Box::new(e),
})?;
results.push(SeriesSetItem::Data(series_set));
}
}
Ok(())
Ok(results)
}
/// returns a bitset with all row indexes where the value of the
@ -494,7 +448,7 @@ mod tests {
let results = convert(table_name, &tag_columns, &field_columns, input).await;
assert_eq!(results.len(), 1);
let series_set = results[0].as_ref().expect("Correctly converted");
let series_set = &results[0];
assert_eq!(*series_set.table_name, "foo");
assert!(series_set.tags.is_empty());
@ -548,7 +502,7 @@ mod tests {
let results = convert(table_name, &tag_columns, &field_columns, input).await;
assert_eq!(results.len(), 1);
let series_set = results[0].as_ref().expect("Correctly converted");
let series_set = &results[0];
assert_eq!(*series_set.table_name, "foo");
assert!(series_set.tags.is_empty());
@ -602,7 +556,7 @@ mod tests {
let results = convert(table_name, &tag_columns, &field_columns, input).await;
assert_eq!(results.len(), 1);
let series_set = results[0].as_ref().expect("Correctly converted");
let series_set = &results[0];
assert_eq!(*series_set.table_name, "bar");
assert_eq!(series_set.tags, str_pair_vec_to_vec(&[("tag_a", "one")]));
@ -639,7 +593,7 @@ mod tests {
let results = convert(table_name, &tag_columns, &field_columns, input).await;
assert_eq!(results.len(), 2);
let series_set1 = results[0].as_ref().expect("Correctly converted");
let series_set1 = &results[0];
assert_eq!(*series_set1.table_name, "foo");
assert_eq!(series_set1.tags, str_pair_vec_to_vec(&[("tag_a", "one")]));
@ -650,7 +604,7 @@ mod tests {
assert_eq!(series_set1.start_row, 0);
assert_eq!(series_set1.num_rows, 3);
let series_set2 = results[1].as_ref().expect("Correctly converted");
let series_set2 = &results[1];
assert_eq!(*series_set2.table_name, "foo");
assert_eq!(series_set2.tags, str_pair_vec_to_vec(&[("tag_a", "two")]));
@ -688,7 +642,7 @@ mod tests {
let results = convert(table_name, &tag_columns, &field_columns, input).await;
assert_eq!(results.len(), 3);
let series_set1 = results[0].as_ref().expect("Correctly converted");
let series_set1 = &results[0];
assert_eq!(*series_set1.table_name, "foo");
assert_eq!(
@ -698,7 +652,7 @@ mod tests {
assert_eq!(series_set1.start_row, 0);
assert_eq!(series_set1.num_rows, 2);
let series_set2 = results[1].as_ref().expect("Correctly converted");
let series_set2 = &results[1];
assert_eq!(*series_set2.table_name, "foo");
assert_eq!(
@ -708,7 +662,7 @@ mod tests {
assert_eq!(series_set2.start_row, 2);
assert_eq!(series_set2.num_rows, 1);
let series_set3 = results[2].as_ref().expect("Correctly converted");
let series_set3 = &results[2];
assert_eq!(*series_set3.table_name, "foo");
assert_eq!(
@ -757,11 +711,11 @@ mod tests {
assert_eq!(results.len(), 5, "results were\n{:#?}", results); // 3 series, two groups (one and two)
let group_1 = extract_group(results[0].as_ref().expect("correctly made group"));
let series_set1 = extract_series_set(results[1].as_ref().expect("Correctly converted"));
let series_set2 = extract_series_set(results[2].as_ref().expect("Correctly converted"));
let group_2 = extract_group(results[3].as_ref().expect("correctly made group"));
let series_set3 = extract_series_set(results[4].as_ref().expect("Correctly converted"));
let group_1 = extract_group(&results[0]);
let series_set1 = extract_series_set(&results[1]);
let series_set2 = extract_series_set(&results[2]);
let group_2 = extract_group(&results[3]);
let series_set3 = extract_series_set(&results[4]);
assert_eq!(group_1.tags, str_pair_vec_to_vec(&[("tag_a", "one")]));
@ -820,9 +774,9 @@ mod tests {
assert_eq!(results.len(), 3, "results were\n{:#?}", results); // 3 series, two groups (one and two)
let group_1 = extract_group(results[0].as_ref().expect("correctly made group"));
let series_set1 = extract_series_set(results[1].as_ref().expect("Correctly converted"));
let series_set2 = extract_series_set(results[2].as_ref().expect("Correctly converted"));
let group_1 = extract_group(&results[0]);
let series_set1 = extract_series_set(&results[1]);
let series_set2 = extract_series_set(&results[2]);
assert_eq!(group_1.tags, &[]);
@ -863,34 +817,38 @@ mod tests {
tag_columns: &'a [&'a str],
field_columns: &'a [&'a str],
it: SendableRecordBatchStream,
) -> Vec<Result<SeriesSet>> {
let (tx, mut rx) = mpsc::channel(1);
let mut converter = SeriesSetConverter::new(tx);
) -> Vec<SeriesSet> {
let mut converter = SeriesSetConverter::default();
let table_name = Arc::new(table_name.into());
let tag_columns = str_vec_to_arc_vec(tag_columns);
let field_columns = FieldColumns::from(field_columns);
tokio::task::spawn(async move {
converter
.convert(table_name, tag_columns, field_columns, None, it)
.await
.expect("Conversion happened without error")
});
let mut results = Vec::new();
while let Some(r) = rx.recv().await {
results.push(r.map(|item| {
.expect("Conversion happened without error").into_iter().map(|item|{
if let SeriesSetItem::Data(series_set) = item {
series_set
}
else {
panic!("Unexpected result from converting. Expected SeriesSetItem::Data, got: {:?}", item)
}
})
);
}
results
}).collect::<Vec<_>>()
// let mut results = Vec::new();
// while let Some(r) = rx.recv().await {
// results.push(r.map(|item| {
// if let SeriesSetItem::Data(series_set) = item {
// series_set
// }
// else {
// panic!("Unexpected result from converting. Expected SeriesSetItem::Data, got: {:?}", item)
// }
// })
// );
// }
// results
}
/// Test helper: run conversion to groups and return a Vec
@ -900,15 +858,13 @@ mod tests {
num_prefix_tag_group_columns: usize,
field_columns: &'a [&'a str],
it: SendableRecordBatchStream,
) -> Vec<Result<SeriesSetItem>> {
let (tx, mut rx) = mpsc::channel(1);
let mut converter = SeriesSetConverter::new(tx);
) -> Vec<SeriesSetItem> {
let mut converter = SeriesSetConverter::default();
let table_name = Arc::new(table_name.into());
let tag_columns = str_vec_to_arc_vec(tag_columns);
let field_columns = FieldColumns::from(field_columns);
tokio::task::spawn(async move {
converter
.convert(
table_name,
@ -919,13 +875,23 @@ mod tests {
)
.await
.expect("Conversion happened without error")
});
let mut results = Vec::new();
while let Some(r) = rx.recv().await {
results.push(r)
}
results
// converter
// .convert(
// table_name,
// tag_columns,
// field_columns,
// Some(num_prefix_tag_group_columns),
// it,
// )
// .await
// .expect("Conversion happened without error")
// let mut results = Vec::new();
// while let Some(r) = rx.recv().await {
// results.push(r)
// }
// results
}
/// Test helper: parses the csv content into a single record batch arrow

View File

@ -1,5 +1,7 @@
//! Tests for the Influx gRPC queries
#[cfg(test)]
use super::util::run_series_set_plan;
use crate::query_tests::scenarios::*;
use arrow_deps::datafusion::logical_plan::{col, lit};
use async_trait::async_trait;

View File

@ -50,29 +50,29 @@ pub fn dump_series_set(s: SeriesSet) -> Vec<String> {
}
/// Run a series set plan to completion and produce a Vec<String> representation
///
/// # Panics
///
/// Panics if there is an error executing a plan, or if unexpected series set
/// items are returned.
#[cfg(test)]
pub async fn run_series_set_plan(executor: Arc<Executor>, plans: SeriesSetPlans) -> Vec<String> {
// Use a channel sufficiently large to buffer the series
let (tx, mut rx) = mpsc::channel(100);
executor
.to_series_set(plans, tx)
.await
.expect("Running series set plan");
let results = executor.to_series_set(plans).await;
// gather up the sets and compare them
let mut results = vec![];
while let Some(r) = rx.recv().await {
let item = r.expect("unexpected error in execution");
let item = if let SeriesSetItem::Data(series_set) = item {
let mut results = results
.unwrap()
.into_iter()
.map(|item| {
if let SeriesSetItem::Data(series_set) = item {
series_set
} else {
panic!(
"Unexpected result from converting. Expected SeriesSetItem::Data, got: {:?}",
item
)
};
results.push(item);
}
})
.collect::<Vec<_>>();
// sort the results so that we can reliably compare
results.sort_by(|r1, r2| {