Merge branch 'main' into dom/buffer-tree-query

pull/24376/head
Dom 2022-11-29 15:06:41 +00:00 committed by GitHub
commit 0b1449e908
13 changed files with 357 additions and 381 deletions

Cargo.lock (generated)

@ -2630,6 +2630,7 @@ dependencies = [
"itertools",
"object_store",
"observability_deps",
"once_cell",
"parking_lot 0.12.1",
"parquet_file",
"predicate",


@ -1,7 +1,7 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use arrow::array::{make_array, Array, ArrayRef, DictionaryArray, MutableArrayData, StringArray};
use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray};
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::record_batch::RecordBatch;
@ -12,7 +12,8 @@ use crate::dictionary::StringDictionary;
/// Takes a record batch and returns a new record batch with dictionaries
/// optimized to contain no duplicate or unreferenced values
///
/// Where the input dictionaries are sorted, the output dictionaries will also be sorted
/// Where the input dictionaries are sorted, the output dictionaries
/// will also be sorted
pub fn optimize_dictionaries(batch: &RecordBatch) -> Result<RecordBatch> {
let schema = batch.schema();
let new_columns = batch
@ -94,59 +95,6 @@ fn optimize_dict_col(
Ok(Arc::new(new_dictionary.to_arrow(new_keys, nulls)))
}
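To make the "duplicate or unreferenced values" mentioned in the doc comment for optimize_dictionaries concrete, here is a small stand-alone illustration. It is hypothetical example code written against a roughly contemporary arrow-rs API, not code from this PR: slicing a dictionary array keeps the full value set even though only some values remain referenced.

```rust
use arrow::array::{Array, DictionaryArray};
use arrow::datatypes::Int32Type;

fn main() {
    // Three distinct values end up in the dictionary...
    let dict: DictionaryArray<Int32Type> = vec!["a", "b", "c"].into_iter().collect();

    // ...but a slice of the array references only the first one, leaving "b"
    // and "c" as unreferenced dictionary entries -- the kind of redundancy
    // optimize_dictionaries strips out.
    let sliced = dict.slice(0, 1);
    assert_eq!(sliced.len(), 1);
    assert_eq!(dict.values().len(), 3);
}
```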
/// Some batches are small slices of the underlying arrays.
/// At this stage we only know the number of rows in the record batch
/// and the sizes in bytes of the backing buffers of the column arrays.
/// There is no straight-forward relationship between these two quantities,
/// since some columns can host variable length data such as strings.
///
/// However we can apply a quick&dirty heuristic:
/// if the backing buffer is two orders of magnitude bigger
/// than the number of rows in the result set, we assume
/// that deep-copying the record batch is cheaper than the transfer costs.
///
/// Possible improvements: take the types of the columns into consideration
/// and perhaps sample a few element sizes (taking care not to do more work
/// than simply copying the results in the first place).
///
/// Or we just fix this upstream in
/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array
/// into a smaller buffer while we have to copy stuff around anyway.
///
/// See rationale and discussions about future improvements on
/// <https://github.com/influxdata/influxdb_iox/issues/1133>
pub fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result<RecordBatch> {
let max_buf_len = batch
.columns()
.iter()
.map(|a| a.get_array_memory_size())
.max()
.unwrap_or_default();
let columns: Result<Vec<_>> = batch
.columns()
.iter()
.map(|column| {
if matches!(column.data_type(), DataType::Dictionary(_, _)) {
hydrate_dictionary(column)
} else if max_buf_len > batch.num_rows() * 100 {
Ok(deep_clone_array(column))
} else {
Ok(Arc::clone(column))
}
})
.collect();
RecordBatch::try_new(schema, columns?)
}
fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
let mut mutable = MutableArrayData::new(vec![array.data()], false, 0);
mutable.extend(0, 0, array.len());
make_array(mutable.freeze())
}
/// Hydrates a dictionary to its underlying type
///
/// An IPC response, streaming or otherwise, defines its schema up front
@ -156,7 +104,11 @@ fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
/// This requires identifying the different dictionaries in use, assigning
/// them IDs, and sending new dictionaries, delta or otherwise, when needed
///
/// This is tracked by #1318
/// This is tracked by <https://github.com/influxdata/influxdb_iox/issues/1318>
///
/// See also:
/// * <https://github.com/influxdata/influxdb_iox/issues/4275>
/// * <https://github.com/apache/arrow-rs/issues/1206>
///
/// For now we just hydrate the dictionaries to their underlying type
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> {
@ -166,9 +118,35 @@ fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> {
}
}
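To make the hydration step concrete, here is a minimal sketch of how a dictionary column can be hydrated with arrow's cast kernel, assuming a 2022-era arrow-rs API. The helper name and structure are illustrative only, not the exact hydrate_dictionary implementation above:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Int32Type};

// Hypothetical stand-alone hydration helper: dictionary columns are cast to
// their value type, everything else is passed through untouched.
fn hydrate(column: &ArrayRef) -> arrow::error::Result<ArrayRef> {
    match column.data_type() {
        DataType::Dictionary(_, value_type) => cast(column, value_type),
        _ => Ok(Arc::clone(column)),
    }
}

fn main() -> arrow::error::Result<()> {
    let dict: DictionaryArray<Int32Type> = vec!["a", "a", "b"].into_iter().collect();
    let column: ArrayRef = Arc::new(dict);

    let hydrated = hydrate(&column)?;
    assert_eq!(hydrated.data_type(), &DataType::Utf8);
    Ok(())
}
```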
/// Prepares a RecordBatch for transport over the Arrow Flight protocol
///
/// This means:
///
/// 1. Hydrates any dictionaries to their underlying types. See
/// hydrate_dictionary for more information.
///
pub fn prepare_batch_for_flight(batch: &RecordBatch, schema: SchemaRef) -> Result<RecordBatch> {
let columns: Result<Vec<_>> = batch
.columns()
.iter()
.map(|column| {
if matches!(column.data_type(), DataType::Dictionary(_, _)) {
hydrate_dictionary(column)
} else {
Ok(Arc::clone(column))
}
})
.collect();
RecordBatch::try_new(schema, columns?)
}
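The schema-level counterpart documented just below (prepare_schema_for_flight) applies the same idea to the schema itself: dictionary fields become fields of their value type. A hedged, stand-alone sketch of such a conversion, written against a roughly contemporary arrow-rs API rather than copied from this PR (the helper name and field handling are assumptions):

```rust
use arrow::datatypes::{DataType, Field, Schema};

// Hypothetical stand-alone version of the dictionary-to-value-type schema
// conversion; field metadata handling is omitted for brevity.
fn schema_without_dictionaries(schema: &Schema) -> Schema {
    let fields = schema
        .fields()
        .iter()
        .map(|field| match field.data_type() {
            DataType::Dictionary(_, value_type) => Field::new(
                field.name(),
                value_type.as_ref().clone(),
                field.is_nullable(),
            ),
            _ => field.clone(),
        })
        .collect::<Vec<_>>();
    Schema::new(fields)
}

fn main() {
    let schema = Schema::new(vec![Field::new(
        "tag",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
    )]);
    assert_eq!(
        schema_without_dictionaries(&schema).field(0).data_type(),
        &DataType::Utf8
    );
}
```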
/// Prepare an arrow Schema for transport over the Arrow Flight protocol
///
/// Convert dictionary types to underlying types
///
/// See hydrate_dictionary for more information
pub fn optimize_schema(schema: &Schema) -> Schema {
pub fn prepare_schema_for_flight(schema: &Schema) -> Schema {
let fields = schema
.fields()
.iter()
@ -436,20 +414,6 @@ mod tests {
DictionaryArray::from(data)
}
#[test]
fn test_deep_clone_array() {
let mut builder = UInt32Array::builder(1000);
builder.append_slice(&[1, 2, 3, 4, 5, 6]);
let array: ArrayRef = Arc::new(builder.finish());
assert_eq!(array.len(), 6);
let sliced = array.slice(0, 2);
assert_eq!(sliced.len(), 2);
let deep_cloned = deep_clone_array(&sliced);
assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size());
}
#[test]
fn test_encode_flight_data() {
let options = arrow::ipc::writer::IpcWriteOptions::default();
@ -464,7 +428,7 @@ mod tests {
let big_batch = batch.slice(0, batch.num_rows() - 1);
let optimized_big_batch =
optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed to optimize");
prepare_batch_for_flight(&big_batch, Arc::clone(&schema)).expect("failed to optimize");
let (_, optimized_big_flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options);
@ -474,8 +438,8 @@ mod tests {
);
let small_batch = batch.slice(0, 1);
let optimized_small_batch =
optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize");
let optimized_small_batch = prepare_batch_for_flight(&small_batch, Arc::clone(&schema))
.expect("failed to optimize");
let (_, optimized_small_flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options);
@ -505,9 +469,10 @@ mod tests {
.expect("cannot create record batch");
let original_schema = batch.schema();
let optimized_schema = Arc::new(optimize_schema(&original_schema));
let optimized_schema = Arc::new(prepare_schema_for_flight(&original_schema));
let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap();
let optimized_batch =
prepare_batch_for_flight(&batch, Arc::clone(&optimized_schema)).unwrap();
let (_, flight_data) =
arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options);


@ -3,7 +3,9 @@
use std::{pin::Pin, sync::Arc};
use arrow::{array::new_null_array, error::ArrowError, record_batch::RecordBatch};
use arrow_util::optimize::{optimize_record_batch, optimize_schema, split_batch_for_grpc_response};
use arrow_util::optimize::{
prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response,
};
use data_types::{NamespaceId, PartitionId, SequenceNumber, TableId};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
@ -133,7 +135,8 @@ impl IngesterQueryResponse {
.snapshots
.flat_map(|snapshot_res| match snapshot_res {
Ok(snapshot) => {
let schema = Arc::new(optimize_schema(&snapshot.schema()));
let schema =
Arc::new(prepare_schema_for_flight(&snapshot.schema()));
let schema_captured = Arc::clone(&schema);
let head = futures::stream::once(async {
@ -144,7 +147,8 @@ impl IngesterQueryResponse {
let tail = snapshot.flat_map(move |batch_res| match batch_res {
Ok(batch) => {
match optimize_record_batch(&batch, Arc::clone(&schema)) {
match prepare_batch_for_flight(&batch, Arc::clone(&schema))
{
Ok(batch) => futures::stream::iter(
split_batch_for_grpc_response(batch),
)


@ -6,7 +6,9 @@ use arrow_flight::{
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage,
PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use arrow_util::optimize::{optimize_record_batch, optimize_schema, split_batch_for_grpc_response};
use arrow_util::optimize::{
prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response,
};
use data_types::{NamespaceId, PartitionId, TableId};
use flatbuffers::FlatBufferBuilder;
use futures::{Stream, StreamExt};
@ -308,7 +310,7 @@ impl From<QueryResponse> for FlatIngesterQueryResponseStream {
.into_record_batch_stream()
.flat_map(|snapshot_res| match snapshot_res {
Ok(snapshot) => {
let schema = Arc::new(optimize_schema(&snapshot.schema()));
let schema = Arc::new(prepare_schema_for_flight(&snapshot.schema()));
let schema_captured = Arc::clone(&schema);
let head = futures::stream::once(async {
@ -317,19 +319,17 @@ impl From<QueryResponse> for FlatIngesterQueryResponseStream {
})
});
// TODO: these optimize calls may be redundant
//
// See: https://github.com/apache/arrow-rs/issues/208
let tail = match optimize_record_batch(&snapshot, Arc::clone(&schema)) {
Ok(batch) => {
futures::stream::iter(split_batch_for_grpc_response(batch))
.map(|batch| {
Ok(FlatIngesterQueryResponse::RecordBatch { batch })
})
.boxed()
}
Err(e) => futures::stream::once(async { Err(e) }).boxed(),
};
let tail =
match prepare_batch_for_flight(&snapshot, Arc::clone(&schema)) {
Ok(batch) => {
futures::stream::iter(split_batch_for_grpc_response(batch))
.map(|batch| {
Ok(FlatIngesterQueryResponse::RecordBatch { batch })
})
.boxed()
}
Err(e) => futures::stream::once(async { Err(e) }).boxed(),
};
head.chain(tail).boxed()
}


@ -29,6 +29,7 @@ influxdb_influxql_parser = { path = "../influxdb_influxql_parser" }
itertools = "0.10.5"
object_store = "0.5.1"
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.16.0", features = ["parking_lot"] }
parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }
query_functions = { path = "../query_functions"}
@ -42,4 +43,4 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }
assert_matches = "1"
assert_matches = "1"


@ -166,7 +166,7 @@ impl Executor {
let inner = SessionContext::with_state(state.clone());
let exec = self.executor(executor_type).clone();
let recorder = SpanRecorder::new(state.span_ctx().child_span("Query Execution"));
IOxSessionContext::new(inner, Some(exec), recorder)
IOxSessionContext::new(inner, exec, recorder)
}
/// Create a new execution context, suitable for executing a new query or system task


@ -42,8 +42,9 @@ use datafusion::{
};
use datafusion_util::config::{iox_session_config, DEFAULT_CATALOG};
use executor::DedicatedExecutor;
use futures::TryStreamExt;
use futures::{Stream, StreamExt, TryStreamExt};
use observability_deps::tracing::debug;
use once_cell::sync::Lazy;
use query_functions::selectors::register_selector_aggregates;
use std::{convert::TryInto, fmt, sync::Arc};
use trace::{
@ -218,7 +219,7 @@ impl IOxSessionConfig {
let maybe_span = self.span_ctx.child_span("Query Execution");
IOxSessionContext::new(inner, Some(self.exec), SpanRecorder::new(maybe_span))
IOxSessionContext::new(inner, self.exec, SpanRecorder::new(maybe_span))
}
}
@ -237,13 +238,13 @@ impl IOxSessionConfig {
pub struct IOxSessionContext {
inner: SessionContext,
/// Optional dedicated executor for query execution.
/// Dedicated executor for query execution.
///
/// DataFusion plans are "CPU" bound and thus can consume tokio
/// executors threads for extended periods of time. We use a
/// dedicated tokio runtime to run them so that other requests
/// can be handled.
exec: Option<DedicatedExecutor>,
exec: DedicatedExecutor,
/// Span context from which to create spans for this query
recorder: SpanRecorder,
@ -253,10 +254,16 @@ impl fmt::Debug for IOxSessionContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IOxSessionContext")
.field("inner", &"<DataFusion ExecutionContext>")
.field("exec", &self.exec)
.field("recorder", &self.recorder)
.finish()
}
}
/// [`DedicatedExecutor`] for testing purposes.
static TESTING_EXECUTOR: Lazy<DedicatedExecutor> =
Lazy::new(|| DedicatedExecutor::new("testing", 1));
impl IOxSessionContext {
/// Constructor for testing.
///
@ -265,7 +272,7 @@ impl IOxSessionContext {
pub fn with_testing() -> Self {
Self {
inner: SessionContext::default(),
exec: None,
exec: TESTING_EXECUTOR.clone(),
recorder: SpanRecorder::default(),
}
}
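The with_testing constructor above leans on once_cell::sync::Lazy so that every test context shares a single lazily-built executor. A minimal stand-alone sketch of that pattern, with a placeholder Pool type standing in for DedicatedExecutor (requires the once_cell crate):

```rust
use once_cell::sync::Lazy;

// Placeholder resource type; the real DedicatedExecutor lives in the
// `executor` crate and is not reproduced here.
#[derive(Clone, Debug)]
struct Pool {
    name: &'static str,
    threads: usize,
}

// Built once, on first access, then cloned by every caller.
static TESTING_POOL: Lazy<Pool> = Lazy::new(|| Pool {
    name: "testing",
    threads: 1,
});

fn main() {
    let a = TESTING_POOL.clone();
    let b = TESTING_POOL.clone();
    println!("{a:?} / {b:?}");
}
```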
@ -273,7 +280,7 @@ impl IOxSessionContext {
/// Private constructor
pub(crate) fn new(
inner: SessionContext,
exec: Option<DedicatedExecutor>,
exec: DedicatedExecutor,
recorder: SpanRecorder,
) -> Self {
// attach span to DataFusion session
@ -409,14 +416,14 @@ impl IOxSessionContext {
pub async fn to_series_and_groups(
&self,
series_set_plans: SeriesSetPlans,
) -> Result<Vec<Either>> {
) -> Result<impl Stream<Item = Result<Either>>> {
let SeriesSetPlans {
mut plans,
group_columns,
} = series_set_plans;
if plans.is_empty() {
return Ok(vec![]);
return Ok(futures::stream::empty().boxed());
}
// sort plans by table (measurement) name
@ -479,17 +486,15 @@ impl IOxSessionContext {
data.extend(series);
}
}
let data = futures::stream::iter(data).map(Ok);
// If we have group columns, sort the results, and create the
// appropriate groups
if let Some(group_columns) = group_columns {
let grouper = GroupGenerator::new(group_columns);
grouper
.group(data)
.map_err(|e| Error::Execution(format!("Error forming groups: {}", e)))
Ok(grouper.group(data).await?.boxed())
} else {
let data = data.into_iter().map(|series| series.into()).collect();
Ok(data)
Ok(data.map_ok(|series| series.into()).boxed())
}
}
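Since to_series_and_groups now hands back a stream instead of a Vec, callers consume it with the futures combinators. A small hypothetical sketch of that Vec-to-stream shape and of how a caller can still collect everything in memory (assumes the futures and tokio crates; item and error types are placeholders):

```rust
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};

// Both branches return the same boxed stream type, mirroring how
// to_series_and_groups returns a stream for the empty and non-empty cases.
fn items_as_stream(items: Vec<u32>) -> BoxStream<'static, Result<u32, String>> {
    if items.is_empty() {
        return stream::empty().boxed();
    }
    stream::iter(items).map(Ok).boxed()
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Callers that still want everything in memory can simply try_collect.
    let all: Vec<u32> = items_as_stream(vec![1, 2, 3]).try_collect().await?;
    assert_eq!(all, vec![1, 2, 3]);
    Ok(())
}
```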
@ -592,13 +597,10 @@ impl IOxSessionContext {
Fut: std::future::Future<Output = Result<T>> + Send + 'static,
T: Send + 'static,
{
match &self.exec {
Some(exec) => exec
.spawn(fut)
.await
.unwrap_or_else(|e| Err(Error::Execution(format!("Join Error: {}", e)))),
None => unimplemented!("spawn onto current threadpool"),
}
self.exec
.spawn(fut)
.await
.unwrap_or_else(|e| Err(Error::Execution(format!("Join Error: {}", e))))
}
/// Returns a IOxSessionContext with a SpanRecorder that is a child of the current
@ -627,7 +629,7 @@ impl IOxSessionContext {
/// Number of currently active tasks.
pub fn tasks(&self) -> usize {
self.exec.as_ref().map(|e| e.tasks()).unwrap_or_default()
self.exec.tasks()
}
}


@ -9,8 +9,12 @@ use arrow::{
datatypes::{DataType, Int32Type},
record_batch::RecordBatch,
};
use datafusion::physical_plan::{common::collect, SendableRecordBatchStream};
use datafusion::{
error::DataFusionError,
physical_plan::{common::collect, SendableRecordBatchStream},
};
use futures::{Stream, StreamExt, TryStreamExt};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;
use tokio::sync::mpsc::error::SendError;
@ -269,11 +273,25 @@ impl GroupGenerator {
}
/// groups the set of `series` into SeriesOrGroups
pub fn group(&self, series: Vec<Series>) -> Result<Vec<Either>> {
///
/// TODO: make this truly stream-based
pub async fn group<S>(
&self,
series: S,
) -> Result<impl Stream<Item = Result<Either, DataFusionError>>, DataFusionError>
where
S: Stream<Item = Result<Series, DataFusionError>> + Send,
{
let mut series = series
.into_iter()
.map(|series| SortableSeries::try_new(series, &self.group_columns))
.collect::<Result<Vec<_>>>()?;
.map(|res| {
res.and_then(|series| {
SortableSeries::try_new(series, &self.group_columns)
.map_err(|e| DataFusionError::External(Box::new(e)))
})
})
.try_collect::<Vec<_>>()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// Potential optimization is to skip this sort if we are
// grouping by a prefix of the tags for a single measurement
@ -331,7 +349,7 @@ impl GroupGenerator {
output.push(series.into())
}
Ok(output)
Ok(futures::stream::iter(output).map(Ok))
}
}


@ -22,12 +22,13 @@ pub async fn run_series_set_plan_maybe_error(
ctx: &IOxSessionContext,
plans: SeriesSetPlans,
) -> Result<Vec<String>, DataFusionError> {
Ok(ctx
.to_series_and_groups(plans)
use futures::TryStreamExt;
ctx.to_series_and_groups(plans)
.await?
.into_iter()
.map(|series_or_group| series_or_group.to_string())
.collect())
.map_ok(|series_or_group| series_or_group.to_string())
.try_collect()
.await
}
/// https://github.com/influxdata/influxdb_iox/issues/3635


@ -6,7 +6,9 @@ use arrow_flight::{
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use arrow_util::optimize::{optimize_record_batch, optimize_schema, split_batch_for_grpc_response};
use arrow_util::optimize::{
prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response,
};
use bytes::{Bytes, BytesMut};
use data_types::NamespaceNameError;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
@ -386,7 +388,7 @@ impl GetStream {
let (mut tx, rx) = futures::channel::mpsc::channel::<Result<FlightData, tonic::Status>>(1);
// get schema
let schema = Arc::new(optimize_schema(&physical_plan.schema()));
let schema = Arc::new(prepare_schema_for_flight(&physical_plan.schema()));
// setup stream
let options = arrow::ipc::writer::IpcWriteOptions::default();
@ -414,7 +416,7 @@ impl GetStream {
while let Some(batch_or_err) = stream_record_batches.next().await {
match batch_or_err {
Ok(batch) => {
match optimize_record_batch(&batch, Arc::clone(&schema)) {
match prepare_batch_for_flight(&batch, Arc::clone(&schema)) {
Ok(batch) => {
for batch in split_batch_for_grpc_response(batch) {
let (flight_dictionaries, flight_batch) =


@ -1,15 +1,15 @@
//! This module contains code to translate from InfluxDB IOx data
//! formats into the formats needed by gRPC
use std::{collections::BTreeSet, fmt, sync::Arc};
use std::{collections::BTreeSet, sync::Arc};
use arrow::datatypes::DataType as ArrowDataType;
use futures::{stream::BoxStream, Stream, StreamExt};
use iox_query::exec::{
fieldlist::FieldList,
seriesset::series::{self, Either},
};
use observability_deps::tracing::trace;
use predicate::rpc_predicate::{FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME};
use generated_types::{
@ -18,7 +18,7 @@ use generated_types::{
frame::Data, BooleanPointsFrame, DataType, FloatPointsFrame, Frame, GroupFrame,
IntegerPointsFrame, SeriesFrame, StringPointsFrame, UnsignedPointsFrame,
},
MeasurementFieldsResponse, ReadResponse, Tag,
MeasurementFieldsResponse, Tag,
};
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
@ -75,29 +75,30 @@ pub fn tag_keys_to_byte_vecs(tag_keys: Arc<BTreeSet<String>>) -> Vec<Vec<u8>> {
/// If `tag_key_binary_format` is `true` then tag keys for measurements and
/// fields are emitted in the canonical TSM format represented by `\x00` and
/// `\xff` respectively.
pub fn series_or_groups_to_read_response(
series_or_groups: Vec<Either>,
pub fn series_or_groups_to_frames<S, E>(
series_or_groups: S,
tag_key_binary_format: bool,
) -> ReadResponse {
let mut frames = vec![];
for series_or_group in series_or_groups {
match series_or_group {
Either::Series(series) => {
series_to_frames(&mut frames, series, tag_key_binary_format);
}
Either::Group(group) => {
frames.push(group_to_frame(group));
}
}
}
trace!(frames=%DisplayableFrames::new(&frames), "Response gRPC frames");
ReadResponse { frames }
) -> impl Stream<Item = Result<Frame, E>>
where
S: Stream<Item = Result<Either, E>>,
E: Send + 'static,
{
series_or_groups.flat_map(move |res| match res {
Ok(Either::Series(series)) => series_to_frames(series, tag_key_binary_format)
.map(Ok)
.boxed() as BoxStream<'static, Result<Frame, E>>,
Ok(Either::Group(group)) => futures::stream::once(async move { Ok(group_to_frame(group)) })
.boxed() as BoxStream<'static, Result<Frame, E>>,
Err(e) => futures::stream::once(async move { Err(e) }).boxed()
as BoxStream<'static, Result<Frame, E>>,
})
}
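The same fan-out pattern in isolation: flat_map turns each Ok element into a sub-stream of outputs while errors flow through untouched, which is the shape series_or_groups_to_frames takes above. A hedged, self-contained sketch with placeholder types (assumes the futures and tokio crates; not IOx code):

```rust
use futures::stream;
use futures::StreamExt;

// Each Ok input fans out into two output items; each Err passes through
// unchanged as a single item.
fn fan_out<S>(input: S) -> impl futures::Stream<Item = Result<u64, String>>
where
    S: futures::Stream<Item = Result<u32, String>>,
{
    input.flat_map(|res| match res {
        Ok(n) => stream::iter([Ok(u64::from(n)), Ok(u64::from(n) * 10)]).boxed(),
        Err(e) => stream::once(async move { Err(e) }).boxed(),
    })
}

#[tokio::main]
async fn main() {
    let out: Vec<_> = fan_out(stream::iter([Ok(1), Err("boom".to_string()), Ok(2)]))
        .collect()
        .await;
    // Two Ok inputs produce two items each, the error produces one.
    assert_eq!(out.len(), 5);
}
```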
/// Converts a `Series` into frames for GRPC transport
fn series_to_frames(frames: &mut Vec<Frame>, series: series::Series, tag_key_binary_format: bool) {
fn series_to_frames(
series: series::Series,
tag_key_binary_format: bool,
) -> impl Stream<Item = Frame> {
let series::Series { tags, data } = series;
let (data_type, data_frame) = match data {
@ -128,12 +129,14 @@ fn series_to_frames(frames: &mut Vec<Frame>, series: series::Series, tag_key_bin
data_type: data_type.into(),
});
frames.push(Frame {
data: Some(series_frame),
});
frames.push(Frame {
data: Some(data_frame),
});
futures::stream::iter([
Frame {
data: Some(series_frame),
},
Frame {
data: Some(data_frame),
},
])
}
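As context for the tag_key_binary_format flag used above, a tiny illustrative mapping of the canonical TSM tag keys described earlier (\x00 for the measurement key, \xff for the field key). This is a hypothetical helper for illustration only, not the IOx implementation:

```rust
// Illustrative only: replace the measurement and field tag keys with the
// canonical single-byte TSM keys; all other keys pass through as UTF-8 bytes.
fn canonical_tag_key(key: &str) -> Vec<u8> {
    match key {
        "_measurement" => vec![0x00],
        "_field" => vec![0xff],
        other => other.as_bytes().to_vec(),
    }
}

fn main() {
    assert_eq!(canonical_tag_key("_measurement"), vec![0x00_u8]);
    assert_eq!(canonical_tag_key("_field"), vec![0xff_u8]);
    assert_eq!(canonical_tag_key("host"), b"host".to_vec());
}
```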
/// Converts a [`series::Group`] into a storage gRPC `GroupFrame`
@ -215,113 +218,9 @@ fn datatype_to_measurement_field_enum(data_type: &ArrowDataType) -> Result<Field
}
}
/// Wrapper structure that implements [`std::fmt::Display`] for a slice
/// of `Frame`s
struct DisplayableFrames<'a> {
frames: &'a [Frame],
}
impl<'a> DisplayableFrames<'a> {
fn new(frames: &'a [Frame]) -> Self {
Self { frames }
}
}
impl<'a> fmt::Display for DisplayableFrames<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.frames.iter().try_for_each(|frame| {
format_frame(frame, f)?;
writeln!(f)
})
}
}
fn format_frame(frame: &Frame, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let data = &frame.data;
match data {
Some(Data::Series(SeriesFrame { tags, data_type })) => write!(
f,
"SeriesFrame, tags: {}, type: {:?}",
dump_tags(tags),
data_type
),
Some(Data::FloatPoints(FloatPointsFrame { timestamps, values })) => write!(
f,
"FloatPointsFrame, timestamps: {:?}, values: {:?}",
timestamps,
dump_values(values)
),
Some(Data::IntegerPoints(IntegerPointsFrame { timestamps, values })) => write!(
f,
"IntegerPointsFrame, timestamps: {:?}, values: {:?}",
timestamps,
dump_values(values)
),
Some(Data::UnsignedPoints(UnsignedPointsFrame { timestamps, values })) => write!(
f,
"UnsignedPointsFrame, timestamps: {:?}, values: {:?}",
timestamps,
dump_values(values)
),
Some(Data::BooleanPoints(BooleanPointsFrame { timestamps, values })) => write!(
f,
"BooleanPointsFrame, timestamps: {:?}, values: {}",
timestamps,
dump_values(values)
),
Some(Data::StringPoints(StringPointsFrame { timestamps, values })) => write!(
f,
"StringPointsFrame, timestamps: {:?}, values: {}",
timestamps,
dump_values(values)
),
Some(Data::Group(GroupFrame {
tag_keys,
partition_key_vals,
})) => write!(
f,
"GroupFrame, tag_keys: {}, partition_key_vals: {}",
dump_u8_vec(tag_keys),
dump_u8_vec(partition_key_vals)
),
None => write!(f, "<NO data field>"),
}
}
fn dump_values<T>(v: &[T]) -> String
where
T: std::fmt::Display,
{
v.iter()
.map(|item| format!("{}", item))
.collect::<Vec<_>>()
.join(",")
}
fn dump_u8_vec(encoded_strings: &[Vec<u8>]) -> String {
encoded_strings
.iter()
.map(|b| String::from_utf8_lossy(b))
.collect::<Vec<_>>()
.join(",")
}
fn dump_tags(tags: &[Tag]) -> String {
tags.iter()
.map(|tag| {
format!(
"{}={}",
String::from_utf8_lossy(&tag.key),
String::from_utf8_lossy(&tag.value),
)
})
.collect::<Vec<_>>()
.join(",")
}
#[cfg(test)]
mod tests {
use std::convert::TryInto;
use std::{convert::TryInto, fmt};
use arrow::{
array::{
@ -331,6 +230,7 @@ mod tests {
datatypes::DataType as ArrowDataType,
record_batch::RecordBatch,
};
use futures::TryStreamExt;
use iox_query::exec::{
field::FieldIndexes,
fieldlist::Field,
@ -369,8 +269,8 @@ mod tests {
);
}
#[test]
fn test_series_set_conversion() {
#[tokio::test]
async fn test_series_set_conversion() {
let series_set = SeriesSet {
table_name: Arc::from("the_table"),
tags: vec![(Arc::from("tag1"), Arc::from("val1"))],
@ -385,8 +285,14 @@ mod tests {
.expect("Correctly converted series set");
let series: Vec<Either> = series.into_iter().map(|s| s.into()).collect();
let response = series_or_groups_to_read_response(series.clone(), false);
let dumped_frames = dump_frames(&response.frames);
let frames = series_or_groups_to_frames::<_, ()>(
futures::stream::iter(series.clone()).map(Ok),
false,
)
.try_collect::<Vec<_>>()
.await
.unwrap();
let dumped_frames = dump_frames(&frames);
let expected_frames = vec![
"SeriesFrame, tags: _field=string_field,_measurement=the_table,tag1=val1, type: 4",
"StringPointsFrame, timestamps: [2000, 3000], values: bar,baz",
@ -410,8 +316,12 @@ mod tests {
// Convert using binary tag key format.
//
let response = series_or_groups_to_read_response(series, true);
let dumped_frames = dump_frames(&response.frames);
let frames =
series_or_groups_to_frames::<_, ()>(futures::stream::iter(series).map(Ok), true)
.try_collect::<Vec<_>>()
.await
.unwrap();
let dumped_frames = dump_frames(&frames);
let expected_frames = vec![
"SeriesFrame, tags: \x00=the_table,tag1=val1,<2C>=string_field, type: 4",
"StringPointsFrame, timestamps: [2000, 3000], values: bar,baz",
@ -432,8 +342,8 @@ mod tests {
);
}
#[test]
fn test_group_group_conversion() {
#[tokio::test]
async fn test_group_group_conversion() {
let group = Group {
tag_keys: vec![
Arc::from("_field"),
@ -444,9 +354,15 @@ mod tests {
partition_key_vals: vec![Arc::from("val1"), Arc::from("val2")],
};
let response = series_or_groups_to_read_response(vec![group.into()], false);
let frames = series_or_groups_to_frames::<_, ()>(
futures::stream::iter([group.into()]).map(Ok),
false,
)
.try_collect::<Vec<_>>()
.await
.unwrap();
let dumped_frames = dump_frames(&response.frames);
let dumped_frames = dump_frames(&frames);
let expected_frames = vec![
"GroupFrame, tag_keys: _field,_measurement,tag1,tag2, partition_key_vals: val1,val2",
@ -575,6 +491,27 @@ mod tests {
.expect("created new record batch")
}
/// Wrapper structure that implements [`std::fmt::Display`] for a slice
/// of `Frame`s
struct DisplayableFrames<'a> {
frames: &'a [Frame],
}
impl<'a> DisplayableFrames<'a> {
fn new(frames: &'a [Frame]) -> Self {
Self { frames }
}
}
impl<'a> fmt::Display for DisplayableFrames<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.frames.iter().try_for_each(|frame| {
format_frame(frame, f)?;
writeln!(f)
})
}
}
fn dump_frames(frames: &[Frame]) -> Vec<String> {
DisplayableFrames::new(frames)
.to_string()
@ -583,4 +520,87 @@ mod tests {
.map(|s| s.to_string())
.collect()
}
fn format_frame(frame: &Frame, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let data = &frame.data;
match data {
Some(Data::Series(SeriesFrame { tags, data_type })) => write!(
f,
"SeriesFrame, tags: {}, type: {:?}",
dump_tags(tags),
data_type
),
Some(Data::FloatPoints(FloatPointsFrame { timestamps, values })) => write!(
f,
"FloatPointsFrame, timestamps: {:?}, values: {:?}",
timestamps,
dump_values(values)
),
Some(Data::IntegerPoints(IntegerPointsFrame { timestamps, values })) => write!(
f,
"IntegerPointsFrame, timestamps: {:?}, values: {:?}",
timestamps,
dump_values(values)
),
Some(Data::UnsignedPoints(UnsignedPointsFrame { timestamps, values })) => write!(
f,
"UnsignedPointsFrame, timestamps: {:?}, values: {:?}",
timestamps,
dump_values(values)
),
Some(Data::BooleanPoints(BooleanPointsFrame { timestamps, values })) => write!(
f,
"BooleanPointsFrame, timestamps: {:?}, values: {}",
timestamps,
dump_values(values)
),
Some(Data::StringPoints(StringPointsFrame { timestamps, values })) => write!(
f,
"StringPointsFrame, timestamps: {:?}, values: {}",
timestamps,
dump_values(values)
),
Some(Data::Group(GroupFrame {
tag_keys,
partition_key_vals,
})) => write!(
f,
"GroupFrame, tag_keys: {}, partition_key_vals: {}",
dump_u8_vec(tag_keys),
dump_u8_vec(partition_key_vals)
),
None => write!(f, "<NO data field>"),
}
}
fn dump_values<T>(v: &[T]) -> String
where
T: std::fmt::Display,
{
v.iter()
.map(|item| format!("{}", item))
.collect::<Vec<_>>()
.join(",")
}
fn dump_u8_vec(encoded_strings: &[Vec<u8>]) -> String {
encoded_strings
.iter()
.map(|b| String::from_utf8_lossy(b))
.collect::<Vec<_>>()
.join(",")
}
fn dump_tags(tags: &[Tag]) -> String {
tags.iter()
.map(|tag| {
format!(
"{}={}",
String::from_utf8_lossy(&tag.key),
String::from_utf8_lossy(&tag.value),
)
})
.collect::<Vec<_>>()
.join(",")
}
}


@ -6,7 +6,7 @@ use std::{
use futures::{ready, stream::BoxStream, Stream, StreamExt};
use generated_types::{read_response::Frame, ReadResponse};
/// Chunk given [`ReadResponse`]s -- while preserving the [`Frame`] order -- into responses that do not exceed the
/// Chunk given [`Frame`]s -- while preserving their order -- into [`ReadResponse`]s that do not exceed the
/// given `size_limit`, in bytes.
pub struct ChunkReadResponses {
inner: BoxStream<'static, Result<Frame, tonic::Status>>,
@ -24,21 +24,12 @@ impl ChunkReadResponses {
/// Panics if `size_limit` is 0.
pub fn new<S>(inner: S, size_limit: usize) -> Self
where
S: Stream<Item = Result<ReadResponse, tonic::Status>> + Send + 'static,
S: Stream<Item = Result<Frame, tonic::Status>> + Send + 'static,
{
assert!(size_limit > 0, "zero size limit");
Self {
inner: inner
.flat_map(|res| match res {
Ok(read_response) => {
futures::stream::iter(read_response.frames).map(Ok).boxed()
as BoxStream<'static, Result<Frame, tonic::Status>>
}
Err(e) => futures::stream::once(async move { Err(e) }).boxed()
as BoxStream<'static, Result<Frame, tonic::Status>>,
})
.boxed(),
inner: inner.boxed(),
size_limit,
finished: false,
frames: Vec::default(),
@ -150,30 +141,16 @@ mod tests {
vec![],
);
// no frames
assert_eq!(
ChunkReadResponses::new(
futures::stream::iter(vec![Ok(ReadResponse { frames: vec![] })]),
1
)
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![],
);
// split
assert_eq!(
ChunkReadResponses::new(
futures::stream::iter(vec![Ok(ReadResponse {
frames: vec![
frame1.clone(),
frame1.clone(),
frame2.clone(),
frame2.clone(),
frame1.clone(),
],
})]),
futures::stream::iter([
Ok(frame1.clone()),
Ok(frame1.clone()),
Ok(frame2.clone()),
Ok(frame2.clone()),
Ok(frame1.clone()),
]),
fsize1 + fsize1 + fsize2,
)
.try_collect::<Vec<_>>()
@ -189,17 +166,12 @@ mod tests {
],
);
// join
// single response
assert_eq!(
ChunkReadResponses::new(
futures::stream::iter(vec![
Ok(ReadResponse {
frames: vec![frame1.clone(), frame2.clone(),],
}),
Ok(ReadResponse {
frames: vec![frame2.clone(),],
}),
]),
futures::stream::iter(
[Ok(frame1.clone()), Ok(frame2.clone()), Ok(frame2.clone()),]
),
fsize1 + fsize2 + fsize2,
)
.try_collect::<Vec<_>>()
@ -210,22 +182,17 @@ mod tests {
},],
);
// re-arrange
// multiple responses
assert_eq!(
ChunkReadResponses::new(
futures::stream::iter(vec![
Ok(ReadResponse {
frames: vec![
frame1.clone(),
frame1.clone(),
frame2.clone(),
frame2.clone(),
frame1.clone(),
],
}),
Ok(ReadResponse {
frames: vec![frame1.clone(), frame2.clone(),],
}),
futures::stream::iter([
Ok(frame1.clone()),
Ok(frame1.clone()),
Ok(frame2.clone()),
Ok(frame2.clone()),
Ok(frame1.clone()),
Ok(frame1.clone()),
Ok(frame2.clone()),
]),
fsize1 + fsize1 + fsize2,
)
@ -259,22 +226,12 @@ mod tests {
// split
let res = ChunkReadResponses::new(
futures::stream::iter(vec![
Ok(ReadResponse {
frames: vec![frame.clone()],
}),
Ok(ReadResponse {
frames: vec![frame.clone()],
}),
Ok(ReadResponse {
frames: vec![frame.clone()],
}),
Ok(ReadResponse {
frames: vec![frame.clone()],
}),
Ok(frame.clone()),
Ok(frame.clone()),
Ok(frame.clone()),
Ok(frame.clone()),
Err(tonic::Status::internal("foo")),
Ok(ReadResponse {
frames: vec![frame.clone()],
}),
Ok(frame.clone()),
]),
2 * fsize,
)
@ -319,14 +276,7 @@ mod tests {
// split
let res = ChunkReadResponses::new(
futures::stream::iter(vec![
Ok(ReadResponse {
frames: vec![frame.clone()],
}),
Ok(ReadResponse {
frames: vec![frame.clone()],
}),
]),
futures::stream::iter([Ok(frame.clone()), Ok(frame.clone())]),
1,
)
.collect::<Vec<_>>()


@ -4,8 +4,7 @@
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
use crate::{
data::{
fieldlist_to_measurement_fields_response, series_or_groups_to_read_response,
tag_keys_to_byte_vecs,
fieldlist_to_measurement_fields_response, series_or_groups_to_frames, tag_keys_to_byte_vecs,
},
expr::{self, DecodedTagKey, GroupByAndAggregate, InfluxRpcPredicateBuilder, Loggable},
input::GrpcInputs,
@ -16,12 +15,13 @@ use crate::{
};
use data_types::{org_and_bucket_to_namespace, NamespaceName};
use datafusion::error::DataFusionError;
use futures::{stream::BoxStream, Stream, StreamExt};
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use generated_types::{
google::protobuf::{Any as ProtoAny, Empty},
influxdata::platform::errors::InfluxDbError,
literal_or_regex::Value as RegexOrLiteralValue,
offsets_response::PartitionOffsetResponse,
read_response::Frame,
storage_server::Storage,
tag_key_predicate, CapabilitiesResponse, Capability, Int64ValuesResponse, LiteralOrRegex,
MeasurementFieldsRequest, MeasurementFieldsResponse, MeasurementNamesRequest,
@ -373,14 +373,12 @@ where
let ctx = db.new_query_context(span_ctx);
let query_completed_token = db.record_query(&ctx, "read_filter", defer_json(&req));
let results = read_filter_impl(Arc::clone(&db), db_name, req, &ctx)
let frames = read_filter_impl(Arc::clone(&db), db_name, req, &ctx)
.await?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
.map_err(|e| e.into_status());
make_response(
ChunkReadResponses::new(futures::stream::iter(results), MAX_READ_RESPONSE_SIZE),
ChunkReadResponses::new(frames, MAX_READ_RESPONSE_SIZE),
query_completed_token,
permit,
)
@ -444,7 +442,7 @@ where
let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys)
.context(ConvertingReadGroupAggregateSnafu { aggregate_string })?;
let results = query_group_impl(
let frames = query_group_impl(
Arc::clone(&db),
db_name,
range,
@ -455,12 +453,10 @@ where
)
.await
.map_err(|e| e.into_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
.map_err(|e| e.into_status());
make_response(
ChunkReadResponses::new(futures::stream::iter(results), MAX_READ_RESPONSE_SIZE),
ChunkReadResponses::new(frames, MAX_READ_RESPONSE_SIZE),
query_completed_token,
permit,
)
@ -523,7 +519,7 @@ where
let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window)
.context(ConvertingWindowAggregateSnafu { aggregate_string })?;
let results = query_group_impl(
let frames = query_group_impl(
Arc::clone(&db),
db_name,
range,
@ -534,12 +530,10 @@ where
)
.await
.map_err(|e| e.into_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
.map_err(|e| e.into_status());
make_response(
ChunkReadResponses::new(futures::stream::iter(results), MAX_READ_RESPONSE_SIZE),
ChunkReadResponses::new(frames, MAX_READ_RESPONSE_SIZE),
query_completed_token,
permit,
)
@ -1325,7 +1319,7 @@ async fn read_filter_impl<N>(
db_name: NamespaceName<'static>,
req: ReadFilterRequest,
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
) -> Result<impl Stream<Item = Result<Frame, Error>>, Error>
where
N: QueryNamespace + ExecutionContextProvider + 'static,
{
@ -1352,16 +1346,25 @@ where
.context(PlanningFilteringSeriesSnafu { db_name })?;
// Execute the plans.
let db_name = db_name.to_owned();
let series_or_groups = ctx
.to_series_and_groups(series_plan)
.await
.context(FilteringSeriesSnafu { db_name })
.log_if_error("Running series set plan")?;
.context(FilteringSeriesSnafu {
db_name: db_name.clone(),
})
.log_if_error("Running series set plan")?
.map_err(move |e| Error::FilteringSeries {
db_name: db_name.clone(),
source: e,
});
let emit_tag_keys_binary_format = req.tag_key_meta_names == TagKeyMetaNames::Binary as i32;
let response = series_or_groups_to_read_response(series_or_groups, emit_tag_keys_binary_format);
Ok(vec![response])
Ok(series_or_groups_to_frames(
series_or_groups,
emit_tag_keys_binary_format,
))
}
/// Launch async tasks that send the result of executing read_group to `tx`
@ -1373,7 +1376,7 @@ async fn query_group_impl<N>(
gby_agg: GroupByAndAggregate,
tag_key_meta_names: TagKeyMetaNames,
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
) -> Result<impl Stream<Item = Result<Frame, Error>>>
where
N: QueryNamespace + ExecutionContextProvider + 'static,
{
@ -1408,16 +1411,25 @@ where
// if big queries are causing a significant latency in TTFB.
// Execute the plans
let db_name = db_name.to_owned();
let series_or_groups = ctx
.to_series_and_groups(grouped_series_set_plan)
.await
.context(GroupingSeriesSnafu { db_name })
.log_if_error("Running Grouped SeriesSet Plan")?;
.context(GroupingSeriesSnafu {
db_name: db_name.clone(),
})
.log_if_error("Running Grouped SeriesSet Plan")?
.map_err(move |e| Error::FilteringSeries {
db_name: db_name.clone(),
source: e,
});
let tag_key_binary_format = tag_key_meta_names == TagKeyMetaNames::Binary;
let response = series_or_groups_to_read_response(series_or_groups, tag_key_binary_format);
Ok(vec![response])
Ok(series_or_groups_to_frames(
series_or_groups,
tag_key_binary_format,
))
}
/// Return field names, restricted via optional measurement, timestamp and