diff --git a/src/influxdb_ioxd/rpc/flight.rs b/src/influxdb_ioxd/rpc/flight.rs
index 47eacff0aa..86f1056fab 100644
--- a/src/influxdb_ioxd/rpc/flight.rs
+++ b/src/influxdb_ioxd/rpc/flight.rs
@@ -7,7 +7,12 @@ use snafu::{OptionExt, ResultExt, Snafu};
 use tonic::{Request, Response, Streaming};
 
 use arrow_deps::{
-    arrow,
+    arrow::{
+        self,
+        array::{make_array, ArrayRef, MutableArrayData},
+        error::ArrowError,
+        record_batch::RecordBatch,
+    },
     arrow_flight::{
         self,
         flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
@@ -55,6 +60,9 @@ pub enum Error {
 
     #[snafu(display("Invalid database name: {}", source))]
     InvalidDatabaseName { source: DatabaseNameError },
+
+    #[snafu(display("Invalid RecordBatch: {}", source))]
+    InvalidRecordBatch { source: ArrowError },
 }
 
 impl From<Error> for tonic::Status {
@@ -78,6 +86,7 @@ impl Error {
             Self::Query { .. } => Status::internal(self.to_string()),
             Self::PlanningSQLQuery { .. } => Status::invalid_argument(self.to_string()),
             Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()),
+            Self::InvalidRecordBatch { .. } => Status::internal(self.to_string()),
         }
     }
 }
@@ -174,10 +183,14 @@ where
         let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
 
         let mut batches: Vec<Result<FlightData, tonic::Status>> = results
+            .iter()
+            .map(optimize_record_batch)
+            .collect::<Result<Vec<RecordBatch>, Error>>()?
             .iter()
             .flat_map(|batch| {
                 let (flight_dictionaries, flight_batch) =
-                    arrow_flight::utils::flight_data_from_arrow_batch(batch, &options);
+                    arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
+
                 flight_dictionaries
                     .into_iter()
                     .chain(std::iter::once(flight_batch))
@@ -243,3 +256,110 @@ where
         Err(tonic::Status::unimplemented("Not yet implemented"))
     }
 }
+
+// Some batches are small slices of the underlying arrays.
+// At this stage we only know the number of rows in the record batch
+// and the sizes in bytes of the backing buffers of the column arrays.
+// There is no straightforward relationship between these two quantities,
+// since some columns can host variable-length data such as strings.
+//
+// However, we can apply a quick-and-dirty heuristic:
+// if the backing buffer is two orders of magnitude bigger
+// than the number of rows in the result set, we assume that
+// deep-copying the record batch is cheaper than the encoding and transfer costs.
+//
+// Possible improvements: take the type of the columns into consideration
+// and perhaps sample a few element sizes (taking care not to do more work
+// than simply copying the results would in the first place).
+//
+// Or we could just fix this upstream in
+// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array
+// into a smaller buffer while we have to copy stuff around anyway.
+//
+// See rationale and discussion about future improvements at
+// https://github.com/influxdata/influxdb_iox/issues/1133
+fn optimize_record_batch(batch: &RecordBatch) -> Result<RecordBatch, Error> {
+    let max_buf_len = batch
+        .columns()
+        .iter()
+        .map(|a| a.get_array_memory_size())
+        .max()
+        .unwrap_or_default();
+
+    if max_buf_len > batch.num_rows() * 100 {
+        let limited_columns: Vec<ArrayRef> = (0..batch.num_columns())
+            .map(|i| deep_clone_array(batch.column(i)))
+            .collect();
+
+        return RecordBatch::try_new(batch.schema(), limited_columns).context(InvalidRecordBatch);
+    }
+    Ok(batch.clone())
+}
+
+fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
+    let mut mutable = MutableArrayData::new(vec![array.data()], false, 0);
+    mutable.extend(0, 0, array.len());
+
+    make_array(mutable.freeze())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_deps::arrow::{
+        array::UInt32Array,
+        datatypes::{DataType, Field, Schema},
+    };
+    use arrow_deps::datafusion::physical_plan::limit::truncate_batch;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_deep_clone_array() {
+        let mut builder = UInt32Array::builder(1000);
+        builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
+        let array: ArrayRef = Arc::new(builder.finish());
+        assert_eq!(array.len(), 6);
+
+        let sliced = array.slice(0, 2);
+        assert_eq!(sliced.len(), 2);
+
+        let deep_cloned = deep_clone_array(&sliced);
+        assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size());
+    }
+
+    #[test]
+    fn test_encode_flight_data() {
+        let options = arrow::ipc::writer::IpcWriteOptions::default();
+
+        let mut builder = UInt32Array::builder(1000);
+        builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
+        let column: ArrayRef = Arc::new(builder.finish());
+
+        let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![column])
+            .expect("cannot create record batch");
+
+        let (_, baseline_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
+
+        let big_batch = truncate_batch(&batch, batch.num_rows() - 1);
+        let optimized_big_batch = optimize_record_batch(&big_batch).expect("failed to optimize");
+        let (_, optimized_big_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options);
+
+        assert_eq!(
+            baseline_flight_batch.data_body.len(),
+            optimized_big_flight_batch.data_body.len()
+        );
+
+        let small_batch = truncate_batch(&batch, 1);
+        let optimized_small_batch =
+            optimize_record_batch(&small_batch).expect("failed to optimize");
+        let (_, optimized_small_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options);
+
+        assert!(
+            baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len()
+        );
+    }
+}
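
For context on why deep-copying helps at all: arrow's `slice` is zero-copy, so a small sliced batch still references the full backing buffers, and `arrow_flight::utils::flight_data_from_arrow_batch` encodes those buffers as they are. The sketch below shows the compaction step from `deep_clone_array` in isolation. It is illustrative only, not part of the patch: it assumes the same arrow version this crate pins (plain `arrow` standing in for `arrow_deps::arrow`), and the `main` wrapper and the `compact` name are invented for the example.

    use std::sync::Arc;

    use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array};

    fn main() {
        // Build an array whose backing buffer was allocated for 10_000 values
        // but holds only six.
        let mut builder = UInt32Array::builder(10_000);
        builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
        let array: ArrayRef = Arc::new(builder.finish());

        // Slicing is zero-copy: the two-row slice still points at the whole
        // oversized buffer, so encoding it for flight would ship far more
        // bytes than two rows need.
        let sliced = array.slice(0, 2);

        // Deep-copy through MutableArrayData, as deep_clone_array does above,
        // re-materializing only the rows the slice actually covers.
        let mut mutable = MutableArrayData::new(vec![sliced.data()], false, 0);
        mutable.extend(0, 0, sliced.len());
        let compact = make_array(mutable.freeze());

        assert_eq!(compact.len(), sliced.len());
        assert!(compact.data().get_array_memory_size() < sliced.data().get_array_memory_size());
    }

The patch applies this copy only when the largest column buffer exceeds roughly 100 bytes per row, trading one extra in-memory copy for a much smaller IPC payload on heavily sliced batches.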