fix: Deep clone small results over flight

Workaround for #1133
pull/24376/head
Marko Mikulicic 2021-04-07 04:03:14 +02:00 committed by kodiakhq[bot]
parent ba2ac64f80
commit 82ed5d1708
1 changed file with 122 additions and 2 deletions
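
The problem being worked around: slicing an Arrow array is zero-copy, so a small
RecordBatch sliced out of a large one still references the full backing buffers, and
arrow_flight::utils::flight_data_from_arrow_batch serializes those whole buffers.
A minimal standalone sketch of the effect (using the arrow crate directly rather than
this repo's arrow_deps re-export; names and sizes are illustrative):

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, UInt32Array};

fn main() {
    // Build a large array, then take a tiny zero-copy slice of it.
    let mut builder = UInt32Array::builder(100_000);
    builder.append_slice(&(0..100_000u32).collect::<Vec<_>>()).unwrap();
    let big: ArrayRef = Arc::new(builder.finish());
    let small = big.slice(0, 2);

    // The 2-row slice still reports roughly the parent's memory footprint;
    // encoding it for Flight would ship the whole buffer over the wire.
    println!("parent: {} bytes", big.get_array_memory_size());
    println!("slice:  {} bytes", small.get_array_memory_size());
}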

@@ -7,7 +7,12 @@ use snafu::{OptionExt, ResultExt, Snafu};
 use tonic::{Request, Response, Streaming};
 
 use arrow_deps::{
-    arrow,
+    arrow::{
+        self,
+        array::{make_array, ArrayRef, MutableArrayData},
+        error::ArrowError,
+        record_batch::RecordBatch,
+    },
     arrow_flight::{
         self,
         flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
@@ -55,6 +60,9 @@ pub enum Error {
     #[snafu(display("Invalid database name: {}", source))]
     InvalidDatabaseName { source: DatabaseNameError },
+
+    #[snafu(display("Invalid RecordBatch: {}", source))]
+    InvalidRecordBatch { source: ArrowError },
 }
 
 impl From<Error> for tonic::Status {
@@ -78,6 +86,7 @@ impl Error {
             Self::Query { .. } => Status::internal(self.to_string()),
             Self::PlanningSQLQuery { .. } => Status::invalid_argument(self.to_string()),
             Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()),
+            Self::InvalidRecordBatch { .. } => Status::internal(self.to_string()),
         }
     }
 }
@@ -174,10 +183,14 @@ where
         let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
+
         let mut batches: Vec<Result<FlightData, tonic::Status>> = results
             .iter()
+            .map(optimize_record_batch)
+            .collect::<Result<Vec<_>, Error>>()?
+            .iter()
             .flat_map(|batch| {
                 let (flight_dictionaries, flight_batch) =
-                    arrow_flight::utils::flight_data_from_arrow_batch(batch, &options);
+                    arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
                 flight_dictionaries
                     .into_iter()
                     .chain(std::iter::once(flight_batch))
@@ -243,3 +256,110 @@ where
         Err(tonic::Status::unimplemented("Not yet implemented"))
     }
 }
+
+// Some batches are small slices of the underlying arrays.
+// At this stage we only know the number of rows in the record batch
+// and the sizes in bytes of the backing buffers of the column arrays.
+// There is no straightforward relationship between these two quantities,
+// since some columns can host variable-length data such as strings.
+//
+// However, we can apply a quick&dirty heuristic:
+// if the backing buffer is two orders of magnitude bigger
+// than the number of rows in the result set, we assume
+// that deep-copying the record batch is cheaper than the encoding and transfer costs.
+//
+// Possible improvements: take the types of the columns into consideration
+// and perhaps sample a few element sizes (taking care not to do more work
+// than simply always copying the results in the first place).
+//
+// Or we could just fix this upstream in
+// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array
+// into a smaller buffer while we have to copy stuff around anyway.
+//
+// See the rationale and discussion about future improvements at
+// https://github.com/influxdata/influxdb_iox/issues/1133
+fn optimize_record_batch(batch: &RecordBatch) -> Result<RecordBatch, Error> {
+    let max_buf_len = batch
+        .columns()
+        .iter()
+        .map(|a| a.get_array_memory_size())
+        .max()
+        .unwrap_or_default();
+
+    if max_buf_len > batch.num_rows() * 100 {
+        let limited_columns: Vec<ArrayRef> = (0..batch.num_columns())
+            .map(|i| deep_clone_array(batch.column(i)))
+            .collect();
+
+        return RecordBatch::try_new(batch.schema(), limited_columns).context(InvalidRecordBatch);
+    }
+    Ok(batch.clone())
+}
+
+fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
+    let mut mutable = MutableArrayData::new(vec![array.data()], false, 0);
+    mutable.extend(0, 0, array.len());
+
+    make_array(mutable.freeze())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_deps::arrow::{
+        array::UInt32Array,
+        datatypes::{DataType, Field, Schema},
+    };
+    use arrow_deps::datafusion::physical_plan::limit::truncate_batch;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_deep_clone_array() {
+        let mut builder = UInt32Array::builder(1000);
+        builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
+        let array: ArrayRef = Arc::new(builder.finish());
+        assert_eq!(array.len(), 6);
+
+        let sliced = array.slice(0, 2);
+        assert_eq!(sliced.len(), 2);
+
+        let deep_cloned = deep_clone_array(&sliced);
+        assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size());
+    }
+
+    #[test]
+    fn test_encode_flight_data() {
+        let options = arrow::ipc::writer::IpcWriteOptions::default();
+
+        let mut builder = UInt32Array::builder(1000);
+        builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
+        let column: ArrayRef = Arc::new(builder.finish());
+
+        let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![column])
+            .expect("cannot create record batch");
+
+        let (_, baseline_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
+
+        let big_batch = truncate_batch(&batch, batch.num_rows() - 1);
+        let optimized_big_batch = optimize_record_batch(&big_batch).expect("failed to optimize");
+        let (_, optimized_big_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options);
+
+        assert_eq!(
+            baseline_flight_batch.data_body.len(),
+            optimized_big_flight_batch.data_body.len()
+        );
+
+        let small_batch = truncate_batch(&batch, 1);
+        let optimized_small_batch =
+            optimize_record_batch(&small_batch).expect("failed to optimize");
+        let (_, optimized_small_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options);
+
+        assert!(
+            baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len()
+        );
+    }
+}
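
For reference, here is how the heuristic plays out on such a slice, as a standalone
sketch: deep_clone_array is re-implemented so the snippet compiles on its own, the
10_000-element size is illustrative, and it assumes the arrow version this commit
builds against (where append_slice returns a Result and Array::data() borrows the
underlying ArrayData, as in the diff above).

use std::sync::Arc;
use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Same technique as the deep_clone_array in the diff: copy the occupied
// range into a fresh, tightly sized buffer.
fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
    let mut mutable = MutableArrayData::new(vec![array.data()], false, 0);
    mutable.extend(0, 0, array.len());
    make_array(mutable.freeze())
}

fn main() {
    let mut builder = UInt32Array::builder(10_000);
    builder.append_slice(&(0..10_000u32).collect::<Vec<_>>()).unwrap();
    let big: ArrayRef = Arc::new(builder.finish());

    // A single-row slice keeps ~40 KiB of backing buffer alive for 1 row.
    let one_row = big.slice(0, 1);
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)]));
    let batch = RecordBatch::try_new(schema, vec![one_row]).unwrap();

    // The heuristic fires: the largest column buffer is far bigger than
    // 100 bytes per row, so the batch would be deep-cloned before encoding.
    let max_buf_len = batch
        .columns()
        .iter()
        .map(|a| a.get_array_memory_size())
        .max()
        .unwrap_or_default();
    assert!(max_buf_len > batch.num_rows() * 100);

    // After the deep clone, the column shrinks to one element's worth of buffer.
    let cloned = deep_clone_array(batch.column(0));
    assert!(cloned.get_array_memory_size() < batch.column(0).get_array_memory_size());
}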