refactor: de-duplicate low-level arrow code (#4697)
It seems that while prototyping NG we copied low-level code (without tests!) and never cleaned it up. Let's not have this functionality twice.

parent 9ddb0a816e
commit 31d1b37d73
@@ -143,8 +143,10 @@ version = "0.1.0"
 dependencies = [
  "ahash",
  "arrow",
+ "arrow-flight",
  "chrono",
  "comfy-table",
+ "datafusion 0.1.0",
  "hashbrown 0.12.1",
  "num-traits",
  "rand",
@@ -4766,6 +4768,7 @@ version = "0.1.0"
 dependencies = [
  "arrow",
  "arrow-flight",
+ "arrow_util",
  "bytes",
  "data_types",
  "datafusion 0.1.0",
@@ -11,10 +11,12 @@ arrow = { version = "14.0.0", features = ["prettyprint"] }
 # used by arrow anyway (needed for printing workaround)
 chrono = { version = "0.4", default-features = false }
 comfy-table = { version = "5.0", default-features = false }
+datafusion = { path = "../datafusion" }
 hashbrown = "0.12"
 num-traits = "0.2"
 snafu = "0.7"
 workspace-hack = { path = "../workspace-hack"}
 
 [dev-dependencies]
+arrow-flight = "14.0.0"
 rand = "0.8.3"
@@ -1,8 +1,8 @@
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray};
-use arrow::datatypes::{DataType, Int32Type};
+use arrow::array::{make_array, Array, ArrayRef, DictionaryArray, MutableArrayData, StringArray};
+use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
 use arrow::error::{ArrowError, Result};
 use arrow::record_batch::RecordBatch;
 use hashbrown::HashMap;
@@ -94,13 +94,108 @@ fn optimize_dict_col(
     Ok(Arc::new(new_dictionary.to_arrow(new_keys, nulls)))
 }
 
+/// Some batches are small slices of the underlying arrays.
+/// At this stage we only know the number of rows in the record batch
+/// and the sizes in bytes of the backing buffers of the column arrays.
+/// There is no straight-forward relationship between these two quantities,
+/// since some columns can host variable length data such as strings.
+///
+/// However we can apply a quick&dirty heuristic:
+/// if the backing buffer is two orders of magnitude bigger
+/// than the number of rows in the result set, we assume
+/// that deep-copying the record batch is cheaper than the transfer costs.
+///
+/// Possible improvements: take the type of the columns into consideration
+/// and perhaps sample a few element sizes (taking care not to do more work
+/// than always copying the results in the first place).
+///
+/// Or we just fix this upstream in
+/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array
+/// into a smaller buffer while we have to copy stuff around anyway.
+///
+/// See rationale and discussions about future improvements on
+/// <https://github.com/influxdata/influxdb_iox/issues/1133>
+pub fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result<RecordBatch> {
+    let max_buf_len = batch
+        .columns()
+        .iter()
+        .map(|a| a.get_array_memory_size())
+        .max()
+        .unwrap_or_default();
+
+    let columns: Result<Vec<_>> = batch
+        .columns()
+        .iter()
+        .map(|column| {
+            if matches!(column.data_type(), DataType::Dictionary(_, _)) {
+                hydrate_dictionary(column)
+            } else if max_buf_len > batch.num_rows() * 100 {
+                Ok(deep_clone_array(column))
+            } else {
+                Ok(Arc::clone(column))
+            }
+        })
+        .collect();
+
+    RecordBatch::try_new(schema, columns?)
+}
+
+fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
+    let mut mutable = MutableArrayData::new(vec![array.data()], false, 0);
+    mutable.extend(0, 0, array.len());
+
+    make_array(mutable.freeze())
+}
+
+/// Hydrates a dictionary to its underlying type
+///
+/// An IPC response, streaming or otherwise, defines its schema up front
+/// which defines the mapping from dictionary IDs. It then sends these
+/// dictionaries over the wire.
+///
+/// This requires identifying the different dictionaries in use, assigning
+/// them IDs, and sending new dictionaries, delta or otherwise, when needed
+///
+/// This is tracked by #1318
+///
+/// For now we just hydrate the dictionaries to their underlying type
+fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> {
+    match array.data_type() {
+        DataType::Dictionary(_, value) => arrow::compute::cast(array, value),
+        _ => unreachable!("not a dictionary"),
+    }
+}
+
+/// Convert dictionary types to underlying types
+/// See hydrate_dictionary for more information
+pub fn optimize_schema(schema: &Schema) -> Schema {
+    let fields = schema
+        .fields()
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Dictionary(_, value_type) => Field::new(
+                field.name(),
+                value_type.as_ref().clone(),
+                field.is_nullable(),
+            ),
+            _ => field.clone(),
+        })
+        .collect();
+
+    Schema::new(fields)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate as arrow_util;
     use crate::assert_batches_eq;
-    use arrow::array::{ArrayDataBuilder, DictionaryArray, Float64Array, Int32Array, StringArray};
+    use arrow::array::{
+        ArrayDataBuilder, DictionaryArray, Float64Array, Int32Array, StringArray, UInt32Array,
+    };
     use arrow::compute::concat;
+    use arrow_flight::utils::flight_data_to_arrow_batch;
+    use datafusion::physical_plan::limit::truncate_batch;
     use std::iter::FromIterator;
 
     #[test]
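Note: the "two orders of magnitude" heuristic documented above boils down to comparing the largest column buffer against rows * 100. A standalone sketch of that decision, with illustrative numbers that are not taken from this diff:

// Illustrative sketch of the heuristic used by optimize_record_batch:
// deep-copy a column when its backing buffer is much larger than the
// number of rows it actually carries (e.g. a tiny slice of a big array).
fn should_deep_copy(max_buf_len: usize, num_rows: usize) -> bool {
    max_buf_len > num_rows * 100
}

fn main() {
    // A 2-row slice still referencing a buffer sized for 1000 u32 values
    // (~4000 bytes): 4000 > 2 * 100, so the column gets deep-copied.
    assert!(should_deep_copy(4000, 2));
    // A buffer roughly proportional to its row count is shared as-is.
    assert!(!should_deep_copy(4000, 1000));
}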
@@ -302,4 +397,107 @@ mod tests {
 
         DictionaryArray::from(data)
     }
+
+    #[test]
+    fn test_deep_clone_array() {
+        let mut builder = UInt32Array::builder(1000);
+        builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
+        let array: ArrayRef = Arc::new(builder.finish());
+        assert_eq!(array.len(), 6);
+
+        let sliced = array.slice(0, 2);
+        assert_eq!(sliced.len(), 2);
+
+        let deep_cloned = deep_clone_array(&sliced);
+        assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size());
+    }
+
+    #[test]
+    fn test_encode_flight_data() {
+        let options = arrow::ipc::writer::IpcWriteOptions::default();
+        let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
+
+        let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
+            .expect("cannot create record batch");
+        let schema = batch.schema();
+
+        let (_, baseline_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
+
+        let big_batch = truncate_batch(&batch, batch.num_rows() - 1);
+        let optimized_big_batch =
+            optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed to optimize");
+        let (_, optimized_big_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options);
+
+        assert_eq!(
+            baseline_flight_batch.data_body.len(),
+            optimized_big_flight_batch.data_body.len()
+        );
+
+        let small_batch = truncate_batch(&batch, 1);
+        let optimized_small_batch =
+            optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize");
+        let (_, optimized_small_flight_batch) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options);
+
+        assert!(
+            baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len()
+        );
+    }
+
+    #[test]
+    fn test_encode_flight_data_dictionary() {
+        let options = arrow::ipc::writer::IpcWriteOptions::default();
+
+        let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
+        let c2: DictionaryArray<Int32Type> = vec![
+            Some("foo"),
+            Some("bar"),
+            None,
+            Some("fiz"),
+            None,
+            Some("foo"),
+        ]
+        .into_iter()
+        .collect();
+
+        let batch =
+            RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef), ("b", Arc::new(c2))])
+                .expect("cannot create record batch");
+
+        let original_schema = batch.schema();
+        let optimized_schema = Arc::new(optimize_schema(&original_schema));
+
+        let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap();
+
+        let (_, flight_data) =
+            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options);
+
+        let dictionary_by_id = std::collections::HashMap::new();
+        let batch = flight_data_to_arrow_batch(
+            &flight_data,
+            Arc::clone(&optimized_schema),
+            &dictionary_by_id,
+        )
+        .unwrap();
+
+        // Should hydrate string dictionary for transport
+        assert_eq!(optimized_schema.field(1).data_type(), &DataType::Utf8);
+        let array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+
+        let expected = StringArray::from(vec![
+            Some("foo"),
+            Some("bar"),
+            None,
+            Some("fiz"),
+            None,
+            Some("foo"),
+        ]);
+        assert_eq!(array, &expected)
+    }
 }
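Note: for context, here is a minimal sketch of how a Flight handler can drive the now-shared helpers end to end. It assumes only what this diff shows (arrow_util::optimize::{optimize_record_batch, optimize_schema} and arrow-flight 14's flight_data_from_arrow_batch); the encode_batches helper name is made up for illustration and is not part of the commit.

use std::sync::Arc;

use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use arrow_flight::utils::flight_data_from_arrow_batch;
use arrow_flight::FlightData;
use arrow_util::optimize::{optimize_record_batch, optimize_schema};

// Hypothetical helper: optimize batches the same way the ingester/querier
// Flight services do before putting them on the wire.
fn encode_batches(batches: &[RecordBatch]) -> Result<Vec<FlightData>> {
    let options = arrow::ipc::writer::IpcWriteOptions::default();
    let mut out = Vec::new();
    for batch in batches {
        // Replace dictionary fields with their value types (see optimize_schema).
        let schema = Arc::new(optimize_schema(&batch.schema()));
        // Hydrate dictionaries and deep-copy heavily sliced columns.
        let optimized = optimize_record_batch(batch, Arc::clone(&schema))?;
        let (_dictionaries, data) = flight_data_from_arrow_batch(&optimized, &options);
        out.push(data);
    }
    Ok(out)
}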
@@ -1,17 +1,13 @@
 //! gRPC service implementations for `ingester`.
 
 use crate::{data::IngesterQueryResponse, handler::IngestHandler};
-use arrow::{
-    array::{make_array, ArrayRef, MutableArrayData},
-    datatypes::{DataType, Field, Schema, SchemaRef},
-    error::ArrowError,
-    record_batch::RecordBatch,
-};
+use arrow::error::ArrowError;
 use arrow_flight::{
     flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
     Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
     HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
 };
+use arrow_util::optimize::{optimize_record_batch, optimize_schema};
 use futures::{SinkExt, Stream, StreamExt};
 use generated_types::influxdata::iox::ingester::v1::{
     self as proto,
@@ -122,8 +118,8 @@ impl WriteInfoService for WriteInfoServiceImpl {
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
 pub enum Error {
-    #[snafu(display("Failed to hydrate dictionary: {}", source))]
-    Dictionary { source: ArrowError },
+    #[snafu(display("Failed to optimize record batch: {}", source))]
+    Optimize { source: ArrowError },
 
     #[snafu(display("Invalid ticket. Error: {:?} Ticket: {:?}", source, ticket))]
     InvalidTicket {
@@ -136,9 +132,6 @@ pub enum Error {
         source: generated_types::google::FieldViolation,
     },
 
-    #[snafu(display("Invalid RecordBatch: {}", source))]
-    InvalidRecordBatch { source: ArrowError },
-
     #[snafu(display("Error while performing query: {}", source))]
     Query {
         source: Box<crate::querier_handler::Error>,
@@ -183,10 +176,9 @@ impl From<Error> for tonic::Status {
             // development
             info!(?err, msg)
         }
-        Error::Dictionary { .. }
-        | Error::InvalidRecordBatch { .. }
-        | Error::QueryStream { .. }
-        | Error::Serialization { .. } => warn!(?err, msg),
+        Error::Optimize { .. } | Error::QueryStream { .. } | Error::Serialization { .. } => {
+            warn!(?err, msg)
+        }
     }
     err.to_status()
 }
@@ -201,8 +193,7 @@ impl Error {
                 Status::invalid_argument(self.to_string())
             }
             Self::Query { .. }
-            | Self::InvalidRecordBatch { .. }
-            | Self::Dictionary { .. }
+            | Self::Optimize { .. }
             | Self::QueryStream { .. }
             | Self::Serialization { .. } => Status::internal(self.to_string()),
             Self::NamespaceNotFound { .. } | Self::TableNotFound { .. } => {
@@ -441,7 +432,9 @@ impl GetStream {
                 }
                 Err(e) => {
                     // failure sending here is OK because we're cutting the stream anyways
-                    tx.send(Err(e.into())).await.ok();
+                    tx.send(Err(Error::Optimize { source: e }.into()))
+                        .await
+                        .ok();
 
                     // end stream
                     return;
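Note: a condensed sketch of the error plumbing both Flight services converge on after this change, trimmed to the variant shown above rather than the full service code: the single Optimize variant wraps the ArrowError coming out of the shared helpers and surfaces as an internal gRPC status.

use arrow::error::ArrowError;
use snafu::Snafu;
use tonic::Status;

#[derive(Debug, Snafu)]
pub enum Error {
    // Wraps failures from optimize_record_batch / optimize_schema.
    #[snafu(display("Failed to optimize record batch: {}", source))]
    Optimize { source: ArrowError },
}

impl Error {
    // Mirrors the to_status mapping in the diff: optimization failures are
    // server-side problems, so they are reported as `internal`.
    fn to_status(&self) -> Status {
        match self {
            Self::Optimize { .. } => Status::internal(self.to_string()),
        }
    }
}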
@@ -501,96 +494,3 @@ impl Stream for GetStream {
         }
     }
 }
-
-/// Some batches are small slices of the underlying arrays.
-/// At this stage we only know the number of rows in the record batch
-/// and the sizes in bytes of the backing buffers of the column arrays.
-/// There is no straight-forward relationship between these two quantities,
-/// since some columns can host variable length data such as strings.
-///
-/// However we can apply a quick&dirty heuristic:
-/// if the backing buffer is two orders of magnitudes bigger
-/// than the number of rows in the result set, we assume
-/// that deep-copying the record batch is cheaper than the and transfer costs.
-///
-/// Possible improvements: take the type of the columns into consideration
-/// and perhaps sample a few element sizes (taking care of not doing more work
-/// than to always copying the results in the first place).
-///
-/// Or we just fix this upstream in
-/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array
-/// into a smaller buffer while we have to copy stuff around anyway.
-///
-/// See rationale and discussions about future improvements on
-/// <https://github.com/influxdata/influxdb_iox/issues/1133>
-fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result<RecordBatch, Error> {
-    let max_buf_len = batch
-        .columns()
-        .iter()
-        .map(|a| a.get_array_memory_size())
-        .max()
-        .unwrap_or_default();
-
-    let columns: Result<Vec<_>, _> = batch
-        .columns()
-        .iter()
-        .map(|column| {
-            if matches!(column.data_type(), DataType::Dictionary(_, _)) {
-                hydrate_dictionary(column)
-            } else if max_buf_len > batch.num_rows() * 100 {
-                Ok(deep_clone_array(column))
-            } else {
-                Ok(Arc::clone(column))
-            }
-        })
-        .collect();
-
-    RecordBatch::try_new(schema, columns?).context(InvalidRecordBatchSnafu)
-}
-
-fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
-    let mut mutable = MutableArrayData::new(vec![array.data()], false, 0);
-    mutable.extend(0, 0, array.len());
-
-    make_array(mutable.freeze())
-}
-
-/// Convert dictionary types to underlying types
-/// See hydrate_dictionary for more information
-fn optimize_schema(schema: &Schema) -> Schema {
-    let fields = schema
-        .fields()
-        .iter()
-        .map(|field| match field.data_type() {
-            DataType::Dictionary(_, value_type) => Field::new(
-                field.name(),
-                value_type.as_ref().clone(),
-                field.is_nullable(),
-            ),
-            _ => field.clone(),
-        })
-        .collect();
-
-    Schema::new(fields)
-}
-
-/// Hydrates a dictionary to its underlying type
-///
-/// An IPC response, streaming or otherwise, defines its schema up front
-/// which defines the mapping from dictionary IDs. It then sends these
-/// dictionaries over the wire.
-///
-/// This requires identifying the different dictionaries in use, assigning
-/// them IDs, and sending new dictionaries, delta or otherwise, when needed
-///
-/// This is tracked by #1318
-///
-/// For now we just hydrate the dictionaries to their underlying type
-fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> {
-    match array.data_type() {
-        DataType::Dictionary(_, value) => {
-            arrow::compute::cast(array, value).context(DictionarySnafu)
-        }
-        _ => unreachable!("not a dictionary"),
-    }
-}
@@ -7,6 +7,7 @@ edition = "2021"
 
 [dependencies]
 # Workspace dependencies, in alphabetical order
+arrow_util = { path = "../arrow_util" }
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }
 generated_types = { path = "../generated_types" }
@@ -1,16 +1,12 @@
 //! Implements the native gRPC IOx query API using Arrow Flight
 
-use arrow::{
-    array::{make_array, ArrayRef, MutableArrayData},
-    datatypes::{DataType, Field, Schema, SchemaRef},
-    error::ArrowError,
-    record_batch::RecordBatch,
-};
+use arrow::error::ArrowError;
 use arrow_flight::{
     flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
     Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
     HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
 };
+use arrow_util::optimize::{optimize_record_batch, optimize_schema};
 use bytes::{Bytes, BytesMut};
 use data_types::{DatabaseName, DatabaseNameError};
 use datafusion::physical_plan::ExecutionPlan;
@@ -61,11 +57,8 @@ pub enum Error {
     #[snafu(display("Invalid database name: {}", source))]
     InvalidDatabaseName { source: DatabaseNameError },
 
-    #[snafu(display("Invalid RecordBatch: {}", source))]
-    InvalidRecordBatch { source: ArrowError },
-
-    #[snafu(display("Failed to hydrate dictionary: {}", source))]
-    DictionaryError { source: ArrowError },
+    #[snafu(display("Failed to optimize record batch: {}", source))]
+    Optimize { source: ArrowError },
 
     #[snafu(display("Error while planning query: {}", source))]
     Planning {
@@ -92,8 +85,7 @@ impl From<Error> for tonic::Status {
             // TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development
             | Error::InvalidDatabaseName { .. } => info!(?err, msg),
             Error::Query { .. } => info!(?err, msg),
-            Error::DictionaryError { .. }
-            | Error::InvalidRecordBatch { .. }
-            | Error::Planning { .. } | Error::Serialization { .. } => warn!(?err, msg),
+            Error::Optimize { .. }
+            | Error::Planning { .. } | Error::Serialization { .. } => warn!(?err, msg),
         }
         err.to_status()
@@ -112,12 +104,11 @@ impl Error {
             Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()),
             Self::Query { .. } => Status::internal(self.to_string()),
             Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()),
-            Self::InvalidRecordBatch { .. } => Status::internal(self.to_string()),
             Self::Planning {
                 source: service_common::planner::Error::External(_),
             } => Status::internal(self.to_string()),
             Self::Planning { .. } => Status::invalid_argument(self.to_string()),
-            Self::DictionaryError { .. } => Status::internal(self.to_string()),
+            Self::Optimize { .. } => Status::internal(self.to_string()),
             Self::Serialization { .. } => Status::internal(self.to_string()),
         }
     }
@@ -358,7 +349,9 @@ impl GetStream {
                 }
                 Err(e) => {
                     // failure sending here is OK because we're cutting the stream anyways
-                    tx.send(Err(e.into())).await.ok();
+                    tx.send(Err(Error::Optimize { source: e }.into()))
+                        .await
+                        .ok();
 
                     // end stream
                     return;
@@ -425,216 +418,3 @@ impl Stream for GetStream {
         }
     }
 }
-
-/// Some batches are small slices of the underlying arrays.
-/// At this stage we only know the number of rows in the record batch
-/// and the sizes in bytes of the backing buffers of the column arrays.
-/// There is no straight-forward relationship between these two quantities,
-/// since some columns can host variable length data such as strings.
-///
-/// However we can apply a quick&dirty heuristic:
-/// if the backing buffer is two orders of magnitudes bigger
-/// than the number of rows in the result set, we assume
-/// that deep-copying the record batch is cheaper than the and transfer costs.
-///
-/// Possible improvements: take the type of the columns into consideration
-/// and perhaps sample a few element sizes (taking care of not doing more work
-/// than to always copying the results in the first place).
-///
-/// Or we just fix this upstream in
-/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array
-/// into a smaller buffer while we have to copy stuff around anyway.
-///
-/// See rationale and discussions about future improvements on
-/// <https://github.com/influxdata/influxdb_iox/issues/1133>
-fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result<RecordBatch, Error> {
-    let max_buf_len = batch
-        .columns()
-        .iter()
-        .map(|a| a.get_array_memory_size())
-        .max()
-        .unwrap_or_default();
-
-    let columns: Result<Vec<_>, _> = batch
-        .columns()
-        .iter()
-        .map(|column| {
-            if matches!(column.data_type(), DataType::Dictionary(_, _)) {
-                hydrate_dictionary(column)
-            } else if max_buf_len > batch.num_rows() * 100 {
-                Ok(deep_clone_array(column))
-            } else {
-                Ok(Arc::clone(column))
-            }
-        })
-        .collect();
-
-    RecordBatch::try_new(schema, columns?).context(InvalidRecordBatchSnafu)
-}
-
-fn deep_clone_array(array: &ArrayRef) -> ArrayRef {
-    let mut mutable = MutableArrayData::new(vec![array.data()], false, 0);
-    mutable.extend(0, 0, array.len());
-
-    make_array(mutable.freeze())
-}
-
-/// Convert dictionary types to underlying types
-/// See hydrate_dictionary for more information
-fn optimize_schema(schema: &Schema) -> Schema {
-    let fields = schema
-        .fields()
-        .iter()
-        .map(|field| match field.data_type() {
-            DataType::Dictionary(_, value_type) => Field::new(
-                field.name(),
-                value_type.as_ref().clone(),
-                field.is_nullable(),
-            ),
-            _ => field.clone(),
-        })
-        .collect();
-
-    Schema::new(fields)
-}
-
-/// Hydrates a dictionary to its underlying type
-///
-/// An IPC response, streaming or otherwise, defines its schema up front
-/// which defines the mapping from dictionary IDs. It then sends these
-/// dictionaries over the wire.
-///
-/// This requires identifying the different dictionaries in use, assigning
-/// them IDs, and sending new dictionaries, delta or otherwise, when needed
-///
-/// This is tracked by #1318
-///
-/// For now we just hydrate the dictionaries to their underlying type
-fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> {
-    match array.data_type() {
-        DataType::Dictionary(_, value) => {
-            arrow::compute::cast(array, value).context(DictionarySnafu)
-        }
-        _ => unreachable!("not a dictionary"),
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::collections::HashMap;
-    use std::sync::Arc;
-
-    use arrow::array::StringArray;
-    use arrow::{
-        array::{DictionaryArray, UInt32Array},
-        datatypes::{DataType, Int32Type},
-    };
-    use arrow_flight::utils::flight_data_to_arrow_batch;
-
-    use datafusion::physical_plan::limit::truncate_batch;
-
-    use super::*;
-
-    #[test]
-    fn test_deep_clone_array() {
-        let mut builder = UInt32Array::builder(1000);
-        builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
-        let array: ArrayRef = Arc::new(builder.finish());
-        assert_eq!(array.len(), 6);
-
-        let sliced = array.slice(0, 2);
-        assert_eq!(sliced.len(), 2);
-
-        let deep_cloned = deep_clone_array(&sliced);
-        assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size());
-    }
-
-    #[test]
-    fn test_encode_flight_data() {
-        let options = arrow::ipc::writer::IpcWriteOptions::default();
-        let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
-
-        let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
-            .expect("cannot create record batch");
-        let schema = batch.schema();
-
-        let (_, baseline_flight_batch) =
-            arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
-
-        let big_batch = truncate_batch(&batch, batch.num_rows() - 1);
-        let optimized_big_batch =
-            optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed to optimize");
-        let (_, optimized_big_flight_batch) =
-            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options);
-
-        assert_eq!(
-            baseline_flight_batch.data_body.len(),
-            optimized_big_flight_batch.data_body.len()
-        );
-
-        let small_batch = truncate_batch(&batch, 1);
-        let optimized_small_batch =
-            optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize");
-        let (_, optimized_small_flight_batch) =
-            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options);
-
-        assert!(
-            baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len()
-        );
-    }
-
-    #[test]
-    fn test_encode_flight_data_dictionary() {
-        let options = arrow::ipc::writer::IpcWriteOptions::default();
-
-        let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
-        let c2: DictionaryArray<Int32Type> = vec![
-            Some("foo"),
-            Some("bar"),
-            None,
-            Some("fiz"),
-            None,
-            Some("foo"),
-        ]
-        .into_iter()
-        .collect();
-
-        let batch =
-            RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef), ("b", Arc::new(c2))])
-                .expect("cannot create record batch");
-
-        let original_schema = batch.schema();
-        let optimized_schema = Arc::new(optimize_schema(&original_schema));
-
-        let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap();
-
-        let (_, flight_data) =
-            arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options);
-
-        let dictionary_by_id = HashMap::new();
-        let batch = flight_data_to_arrow_batch(
-            &flight_data,
-            Arc::clone(&optimized_schema),
-            &dictionary_by_id,
-        )
-        .unwrap();
-
-        // Should hydrate string dictionary for transport
-        assert_eq!(optimized_schema.field(1).data_type(), &DataType::Utf8);
-        let array = batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap();
-
-        let expected = StringArray::from(vec![
-            Some("foo"),
-            Some("bar"),
-            None,
-            Some("fiz"),
-            None,
-            Some("foo"),
-        ]);
-        assert_eq!(array, &expected)
-    }
-}