From 31d1b37d73fdaabf2ea1e057908f40644bf93f68 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 25 May 2022 18:24:28 +0200 Subject: [PATCH] refactor: de-duplicate low-level arrow code (#4697) It seems that during prototyping NG we've copied low level code (w/o tests!) and never cleaned up. Let's not have this functionality twice. --- Cargo.lock | 3 + arrow_util/Cargo.toml | 2 + arrow_util/src/optimize.rs | 204 +++++++++++++++++++++++++++- ingester/src/server/grpc.rs | 122 ++--------------- service_grpc_flight/Cargo.toml | 1 + service_grpc_flight/src/lib.rs | 238 ++------------------------------- 6 files changed, 227 insertions(+), 343 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b616670a9b..33708b49c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,8 +143,10 @@ version = "0.1.0" dependencies = [ "ahash", "arrow", + "arrow-flight", "chrono", "comfy-table", + "datafusion 0.1.0", "hashbrown 0.12.1", "num-traits", "rand", @@ -4766,6 +4768,7 @@ version = "0.1.0" dependencies = [ "arrow", "arrow-flight", + "arrow_util", "bytes", "data_types", "datafusion 0.1.0", diff --git a/arrow_util/Cargo.toml b/arrow_util/Cargo.toml index e143c4beef..d9bff9cdea 100644 --- a/arrow_util/Cargo.toml +++ b/arrow_util/Cargo.toml @@ -11,10 +11,12 @@ arrow = { version = "14.0.0", features = ["prettyprint"] } # used by arrow anyway (needed for printing workaround) chrono = { version = "0.4", default-features = false } comfy-table = { version = "5.0", default-features = false } +datafusion = { path = "../datafusion" } hashbrown = "0.12" num-traits = "0.2" snafu = "0.7" workspace-hack = { path = "../workspace-hack"} [dev-dependencies] +arrow-flight = "14.0.0" rand = "0.8.3" diff --git a/arrow_util/src/optimize.rs b/arrow_util/src/optimize.rs index 827886d8d3..9b4acfa382 100644 --- a/arrow_util/src/optimize.rs +++ b/arrow_util/src/optimize.rs @@ -1,8 +1,8 @@ use std::collections::BTreeSet; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray}; -use arrow::datatypes::{DataType, Int32Type}; +use arrow::array::{make_array, Array, ArrayRef, DictionaryArray, MutableArrayData, StringArray}; +use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef}; use arrow::error::{ArrowError, Result}; use arrow::record_batch::RecordBatch; use hashbrown::HashMap; @@ -94,13 +94,108 @@ fn optimize_dict_col( Ok(Arc::new(new_dictionary.to_arrow(new_keys, nulls))) } +/// Some batches are small slices of the underlying arrays. +/// At this stage we only know the number of rows in the record batch +/// and the sizes in bytes of the backing buffers of the column arrays. +/// There is no straight-forward relationship between these two quantities, +/// since some columns can host variable length data such as strings. +/// +/// However we can apply a quick&dirty heuristic: +/// if the backing buffer is two orders of magnitudes bigger +/// than the number of rows in the result set, we assume +/// that deep-copying the record batch is cheaper than the and transfer costs. +/// +/// Possible improvements: take the type of the columns into consideration +/// and perhaps sample a few element sizes (taking care of not doing more work +/// than to always copying the results in the first place). +/// +/// Or we just fix this upstream in +/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array +/// into a smaller buffer while we have to copy stuff around anyway. 
+/// +/// See rationale and discussions about future improvements on +/// +pub fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result { + let max_buf_len = batch + .columns() + .iter() + .map(|a| a.get_array_memory_size()) + .max() + .unwrap_or_default(); + + let columns: Result> = batch + .columns() + .iter() + .map(|column| { + if matches!(column.data_type(), DataType::Dictionary(_, _)) { + hydrate_dictionary(column) + } else if max_buf_len > batch.num_rows() * 100 { + Ok(deep_clone_array(column)) + } else { + Ok(Arc::clone(column)) + } + }) + .collect(); + + RecordBatch::try_new(schema, columns?) +} + +fn deep_clone_array(array: &ArrayRef) -> ArrayRef { + let mut mutable = MutableArrayData::new(vec![array.data()], false, 0); + mutable.extend(0, 0, array.len()); + + make_array(mutable.freeze()) +} + +/// Hydrates a dictionary to its underlying type +/// +/// An IPC response, streaming or otherwise, defines its schema up front +/// which defines the mapping from dictionary IDs. It then sends these +/// dictionaries over the wire. +/// +/// This requires identifying the different dictionaries in use, assigning +/// them IDs, and sending new dictionaries, delta or otherwise, when needed +/// +/// This is tracked by #1318 +/// +/// For now we just hydrate the dictionaries to their underlying type +fn hydrate_dictionary(array: &ArrayRef) -> Result { + match array.data_type() { + DataType::Dictionary(_, value) => arrow::compute::cast(array, value), + _ => unreachable!("not a dictionary"), + } +} + +/// Convert dictionary types to underlying types +/// See hydrate_dictionary for more information +pub fn optimize_schema(schema: &Schema) -> Schema { + let fields = schema + .fields() + .iter() + .map(|field| match field.data_type() { + DataType::Dictionary(_, value_type) => Field::new( + field.name(), + value_type.as_ref().clone(), + field.is_nullable(), + ), + _ => field.clone(), + }) + .collect(); + + Schema::new(fields) +} + #[cfg(test)] mod tests { use super::*; use crate as arrow_util; use crate::assert_batches_eq; - use arrow::array::{ArrayDataBuilder, DictionaryArray, Float64Array, Int32Array, StringArray}; + use arrow::array::{ + ArrayDataBuilder, DictionaryArray, Float64Array, Int32Array, StringArray, UInt32Array, + }; use arrow::compute::concat; + use arrow_flight::utils::flight_data_to_arrow_batch; + use datafusion::physical_plan::limit::truncate_batch; use std::iter::FromIterator; #[test] @@ -302,4 +397,107 @@ mod tests { DictionaryArray::from(data) } + + #[test] + fn test_deep_clone_array() { + let mut builder = UInt32Array::builder(1000); + builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap(); + let array: ArrayRef = Arc::new(builder.finish()); + assert_eq!(array.len(), 6); + + let sliced = array.slice(0, 2); + assert_eq!(sliced.len(), 2); + + let deep_cloned = deep_clone_array(&sliced); + assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size()); + } + + #[test] + fn test_encode_flight_data() { + let options = arrow::ipc::writer::IpcWriteOptions::default(); + let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); + + let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)]) + .expect("cannot create record batch"); + let schema = batch.schema(); + + let (_, baseline_flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options); + + let big_batch = truncate_batch(&batch, batch.num_rows() - 1); + let optimized_big_batch = + optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed 
to optimize"); + let (_, optimized_big_flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options); + + assert_eq!( + baseline_flight_batch.data_body.len(), + optimized_big_flight_batch.data_body.len() + ); + + let small_batch = truncate_batch(&batch, 1); + let optimized_small_batch = + optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize"); + let (_, optimized_small_flight_batch) = + arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options); + + assert!( + baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len() + ); + } + + #[test] + fn test_encode_flight_data_dictionary() { + let options = arrow::ipc::writer::IpcWriteOptions::default(); + + let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); + let c2: DictionaryArray = vec![ + Some("foo"), + Some("bar"), + None, + Some("fiz"), + None, + Some("foo"), + ] + .into_iter() + .collect(); + + let batch = + RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef), ("b", Arc::new(c2))]) + .expect("cannot create record batch"); + + let original_schema = batch.schema(); + let optimized_schema = Arc::new(optimize_schema(&original_schema)); + + let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap(); + + let (_, flight_data) = + arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options); + + let dictionary_by_id = std::collections::HashMap::new(); + let batch = flight_data_to_arrow_batch( + &flight_data, + Arc::clone(&optimized_schema), + &dictionary_by_id, + ) + .unwrap(); + + // Should hydrate string dictionary for transport + assert_eq!(optimized_schema.field(1).data_type(), &DataType::Utf8); + let array = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + let expected = StringArray::from(vec![ + Some("foo"), + Some("bar"), + None, + Some("fiz"), + None, + Some("foo"), + ]); + assert_eq!(array, &expected) + } } diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs index 290bdb337c..21cefdb12e 100644 --- a/ingester/src/server/grpc.rs +++ b/ingester/src/server/grpc.rs @@ -1,17 +1,13 @@ //! gRPC service implementations for `ingester`. use crate::{data::IngesterQueryResponse, handler::IngestHandler}; -use arrow::{ - array::{make_array, ArrayRef, MutableArrayData}, - datatypes::{DataType, Field, Schema, SchemaRef}, - error::ArrowError, - record_batch::RecordBatch, -}; +use arrow::error::ArrowError; use arrow_flight::{ flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; +use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use futures::{SinkExt, Stream, StreamExt}; use generated_types::influxdata::iox::ingester::v1::{ self as proto, @@ -122,8 +118,8 @@ impl WriteInfoService for WriteInfoServiceImpl { #[derive(Debug, Snafu)] #[allow(missing_docs)] pub enum Error { - #[snafu(display("Failed to hydrate dictionary: {}", source))] - Dictionary { source: ArrowError }, + #[snafu(display("Failed to optimize record batch: {}", source))] + Optimize { source: ArrowError }, #[snafu(display("Invalid ticket. 
Error: {:?} Ticket: {:?}", source, ticket))] InvalidTicket { @@ -136,9 +132,6 @@ pub enum Error { source: generated_types::google::FieldViolation, }, - #[snafu(display("Invalid RecordBatch: {}", source))] - InvalidRecordBatch { source: ArrowError }, - #[snafu(display("Error while performing query: {}", source))] Query { source: Box, @@ -183,10 +176,9 @@ impl From for tonic::Status { // development info!(?err, msg) } - Error::Dictionary { .. } - | Error::InvalidRecordBatch { .. } - | Error::QueryStream { .. } - | Error::Serialization { .. } => warn!(?err, msg), + Error::Optimize { .. } | Error::QueryStream { .. } | Error::Serialization { .. } => { + warn!(?err, msg) + } } err.to_status() } @@ -201,8 +193,7 @@ impl Error { Status::invalid_argument(self.to_string()) } Self::Query { .. } - | Self::InvalidRecordBatch { .. } - | Self::Dictionary { .. } + | Self::Optimize { .. } | Self::QueryStream { .. } | Self::Serialization { .. } => Status::internal(self.to_string()), Self::NamespaceNotFound { .. } | Self::TableNotFound { .. } => { @@ -441,7 +432,9 @@ impl GetStream { } Err(e) => { // failure sending here is OK because we're cutting the stream anyways - tx.send(Err(e.into())).await.ok(); + tx.send(Err(Error::Optimize { source: e }.into())) + .await + .ok(); // end stream return; @@ -501,96 +494,3 @@ impl Stream for GetStream { } } } - -/// Some batches are small slices of the underlying arrays. -/// At this stage we only know the number of rows in the record batch -/// and the sizes in bytes of the backing buffers of the column arrays. -/// There is no straight-forward relationship between these two quantities, -/// since some columns can host variable length data such as strings. -/// -/// However we can apply a quick&dirty heuristic: -/// if the backing buffer is two orders of magnitudes bigger -/// than the number of rows in the result set, we assume -/// that deep-copying the record batch is cheaper than the and transfer costs. -/// -/// Possible improvements: take the type of the columns into consideration -/// and perhaps sample a few element sizes (taking care of not doing more work -/// than to always copying the results in the first place). -/// -/// Or we just fix this upstream in -/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array -/// into a smaller buffer while we have to copy stuff around anyway. 
-/// -/// See rationale and discussions about future improvements on -/// -fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result { - let max_buf_len = batch - .columns() - .iter() - .map(|a| a.get_array_memory_size()) - .max() - .unwrap_or_default(); - - let columns: Result, _> = batch - .columns() - .iter() - .map(|column| { - if matches!(column.data_type(), DataType::Dictionary(_, _)) { - hydrate_dictionary(column) - } else if max_buf_len > batch.num_rows() * 100 { - Ok(deep_clone_array(column)) - } else { - Ok(Arc::clone(column)) - } - }) - .collect(); - - RecordBatch::try_new(schema, columns?).context(InvalidRecordBatchSnafu) -} - -fn deep_clone_array(array: &ArrayRef) -> ArrayRef { - let mut mutable = MutableArrayData::new(vec![array.data()], false, 0); - mutable.extend(0, 0, array.len()); - - make_array(mutable.freeze()) -} - -/// Convert dictionary types to underlying types -/// See hydrate_dictionary for more information -fn optimize_schema(schema: &Schema) -> Schema { - let fields = schema - .fields() - .iter() - .map(|field| match field.data_type() { - DataType::Dictionary(_, value_type) => Field::new( - field.name(), - value_type.as_ref().clone(), - field.is_nullable(), - ), - _ => field.clone(), - }) - .collect(); - - Schema::new(fields) -} - -/// Hydrates a dictionary to its underlying type -/// -/// An IPC response, streaming or otherwise, defines its schema up front -/// which defines the mapping from dictionary IDs. It then sends these -/// dictionaries over the wire. -/// -/// This requires identifying the different dictionaries in use, assigning -/// them IDs, and sending new dictionaries, delta or otherwise, when needed -/// -/// This is tracked by #1318 -/// -/// For now we just hydrate the dictionaries to their underlying type -fn hydrate_dictionary(array: &ArrayRef) -> Result { - match array.data_type() { - DataType::Dictionary(_, value) => { - arrow::compute::cast(array, value).context(DictionarySnafu) - } - _ => unreachable!("not a dictionary"), - } -} diff --git a/service_grpc_flight/Cargo.toml b/service_grpc_flight/Cargo.toml index 9d6bccadab..106c9e491f 100644 --- a/service_grpc_flight/Cargo.toml +++ b/service_grpc_flight/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] # Workspace dependencies, in alphabetical order +arrow_util = { path = "../arrow_util" } data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } generated_types = { path = "../generated_types" } diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index f7fdd047f7..eaa46b97e2 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -1,16 +1,12 @@ //! 
Implements the native gRPC IOx query API using Arrow Flight -use arrow::{ - array::{make_array, ArrayRef, MutableArrayData}, - datatypes::{DataType, Field, Schema, SchemaRef}, - error::ArrowError, - record_batch::RecordBatch, -}; +use arrow::error::ArrowError; use arrow_flight::{ flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; +use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use bytes::{Bytes, BytesMut}; use data_types::{DatabaseName, DatabaseNameError}; use datafusion::physical_plan::ExecutionPlan; @@ -61,11 +57,8 @@ pub enum Error { #[snafu(display("Invalid database name: {}", source))] InvalidDatabaseName { source: DatabaseNameError }, - #[snafu(display("Invalid RecordBatch: {}", source))] - InvalidRecordBatch { source: ArrowError }, - - #[snafu(display("Failed to hydrate dictionary: {}", source))] - DictionaryError { source: ArrowError }, + #[snafu(display("Failed to optimize record batch: {}", source))] + Optimize { source: ArrowError }, #[snafu(display("Error while planning query: {}", source))] Planning { @@ -92,8 +85,7 @@ impl From for tonic::Status { // TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development | Error::InvalidDatabaseName { .. } => info!(?err, msg), Error::Query { .. } => info!(?err, msg), - Error::DictionaryError { .. } - | Error::InvalidRecordBatch { .. } + Error::Optimize { .. } | Error::Planning { .. } | Error::Serialization { .. } => warn!(?err, msg), } err.to_status() @@ -112,12 +104,11 @@ impl Error { Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()), Self::Query { .. } => Status::internal(self.to_string()), Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()), - Self::InvalidRecordBatch { .. } => Status::internal(self.to_string()), Self::Planning { source: service_common::planner::Error::External(_), } => Status::internal(self.to_string()), Self::Planning { .. } => Status::invalid_argument(self.to_string()), - Self::DictionaryError { .. } => Status::internal(self.to_string()), + Self::Optimize { .. } => Status::internal(self.to_string()), Self::Serialization { .. } => Status::internal(self.to_string()), } } @@ -358,7 +349,9 @@ impl GetStream { } Err(e) => { // failure sending here is OK because we're cutting the stream anyways - tx.send(Err(e.into())).await.ok(); + tx.send(Err(Error::Optimize { source: e }.into())) + .await + .ok(); // end stream return; @@ -425,216 +418,3 @@ impl Stream for GetStream { } } } - -/// Some batches are small slices of the underlying arrays. -/// At this stage we only know the number of rows in the record batch -/// and the sizes in bytes of the backing buffers of the column arrays. -/// There is no straight-forward relationship between these two quantities, -/// since some columns can host variable length data such as strings. -/// -/// However we can apply a quick&dirty heuristic: -/// if the backing buffer is two orders of magnitudes bigger -/// than the number of rows in the result set, we assume -/// that deep-copying the record batch is cheaper than the and transfer costs. -/// -/// Possible improvements: take the type of the columns into consideration -/// and perhaps sample a few element sizes (taking care of not doing more work -/// than to always copying the results in the first place). 
-/// -/// Or we just fix this upstream in -/// arrow_flight::utils::flight_data_from_arrow_batch and re-encode the array -/// into a smaller buffer while we have to copy stuff around anyway. -/// -/// See rationale and discussions about future improvements on -/// -fn optimize_record_batch(batch: &RecordBatch, schema: SchemaRef) -> Result { - let max_buf_len = batch - .columns() - .iter() - .map(|a| a.get_array_memory_size()) - .max() - .unwrap_or_default(); - - let columns: Result, _> = batch - .columns() - .iter() - .map(|column| { - if matches!(column.data_type(), DataType::Dictionary(_, _)) { - hydrate_dictionary(column) - } else if max_buf_len > batch.num_rows() * 100 { - Ok(deep_clone_array(column)) - } else { - Ok(Arc::clone(column)) - } - }) - .collect(); - - RecordBatch::try_new(schema, columns?).context(InvalidRecordBatchSnafu) -} - -fn deep_clone_array(array: &ArrayRef) -> ArrayRef { - let mut mutable = MutableArrayData::new(vec![array.data()], false, 0); - mutable.extend(0, 0, array.len()); - - make_array(mutable.freeze()) -} - -/// Convert dictionary types to underlying types -/// See hydrate_dictionary for more information -fn optimize_schema(schema: &Schema) -> Schema { - let fields = schema - .fields() - .iter() - .map(|field| match field.data_type() { - DataType::Dictionary(_, value_type) => Field::new( - field.name(), - value_type.as_ref().clone(), - field.is_nullable(), - ), - _ => field.clone(), - }) - .collect(); - - Schema::new(fields) -} - -/// Hydrates a dictionary to its underlying type -/// -/// An IPC response, streaming or otherwise, defines its schema up front -/// which defines the mapping from dictionary IDs. It then sends these -/// dictionaries over the wire. -/// -/// This requires identifying the different dictionaries in use, assigning -/// them IDs, and sending new dictionaries, delta or otherwise, when needed -/// -/// This is tracked by #1318 -/// -/// For now we just hydrate the dictionaries to their underlying type -fn hydrate_dictionary(array: &ArrayRef) -> Result { - match array.data_type() { - DataType::Dictionary(_, value) => { - arrow::compute::cast(array, value).context(DictionarySnafu) - } - _ => unreachable!("not a dictionary"), - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - - use arrow::array::StringArray; - use arrow::{ - array::{DictionaryArray, UInt32Array}, - datatypes::{DataType, Int32Type}, - }; - use arrow_flight::utils::flight_data_to_arrow_batch; - - use datafusion::physical_plan::limit::truncate_batch; - - use super::*; - - #[test] - fn test_deep_clone_array() { - let mut builder = UInt32Array::builder(1000); - builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap(); - let array: ArrayRef = Arc::new(builder.finish()); - assert_eq!(array.len(), 6); - - let sliced = array.slice(0, 2); - assert_eq!(sliced.len(), 2); - - let deep_cloned = deep_clone_array(&sliced); - assert!(sliced.data().get_array_memory_size() > deep_cloned.data().get_array_memory_size()); - } - - #[test] - fn test_encode_flight_data() { - let options = arrow::ipc::writer::IpcWriteOptions::default(); - let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); - - let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)]) - .expect("cannot create record batch"); - let schema = batch.schema(); - - let (_, baseline_flight_batch) = - arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options); - - let big_batch = truncate_batch(&batch, batch.num_rows() - 1); - let optimized_big_batch = - 
optimize_record_batch(&big_batch, Arc::clone(&schema)).expect("failed to optimize"); - let (_, optimized_big_flight_batch) = - arrow_flight::utils::flight_data_from_arrow_batch(&optimized_big_batch, &options); - - assert_eq!( - baseline_flight_batch.data_body.len(), - optimized_big_flight_batch.data_body.len() - ); - - let small_batch = truncate_batch(&batch, 1); - let optimized_small_batch = - optimize_record_batch(&small_batch, Arc::clone(&schema)).expect("failed to optimize"); - let (_, optimized_small_flight_batch) = - arrow_flight::utils::flight_data_from_arrow_batch(&optimized_small_batch, &options); - - assert!( - baseline_flight_batch.data_body.len() > optimized_small_flight_batch.data_body.len() - ); - } - - #[test] - fn test_encode_flight_data_dictionary() { - let options = arrow::ipc::writer::IpcWriteOptions::default(); - - let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); - let c2: DictionaryArray = vec![ - Some("foo"), - Some("bar"), - None, - Some("fiz"), - None, - Some("foo"), - ] - .into_iter() - .collect(); - - let batch = - RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef), ("b", Arc::new(c2))]) - .expect("cannot create record batch"); - - let original_schema = batch.schema(); - let optimized_schema = Arc::new(optimize_schema(&original_schema)); - - let optimized_batch = optimize_record_batch(&batch, Arc::clone(&optimized_schema)).unwrap(); - - let (_, flight_data) = - arrow_flight::utils::flight_data_from_arrow_batch(&optimized_batch, &options); - - let dictionary_by_id = HashMap::new(); - let batch = flight_data_to_arrow_batch( - &flight_data, - Arc::clone(&optimized_schema), - &dictionary_by_id, - ) - .unwrap(); - - // Should hydrate string dictionary for transport - assert_eq!(optimized_schema.field(1).data_type(), &DataType::Utf8); - let array = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - let expected = StringArray::from(vec![ - Some("foo"), - Some("bar"), - None, - Some("fiz"), - None, - Some("foo"), - ]); - assert_eq!(array, &expected) - } -}
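
Note (illustrative, not part of the patch): a minimal sketch of how a Flight handler is expected to use the now-shared helpers from arrow_util::optimize before encoding a batch for the wire, mirroring the GetStream changes and the tests above. The encode_for_flight wrapper and the example column names are assumptions made for the example only; optimize_schema, optimize_record_batch and flight_data_from_arrow_batch are the real entry points touched by this patch (arrow / arrow-flight 14).

use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray, UInt32Array};
use arrow::datatypes::Int32Type;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_flight::FlightData;
use arrow_util::optimize::{optimize_record_batch, optimize_schema};

// Hypothetical wrapper: prepare a batch for Flight the same way GetStream does.
fn encode_for_flight(batch: &RecordBatch) -> Result<FlightData, ArrowError> {
    let options = arrow::ipc::writer::IpcWriteOptions::default();

    // Rewrite the schema so dictionary columns advertise their value type;
    // the dictionary data itself is hydrated during batch optimization.
    let schema = Arc::new(optimize_schema(&batch.schema()));

    // Hydrate dictionaries and deep-clone small slices of large buffers so the
    // IPC payload is not dominated by unused backing memory.
    let batch = optimize_record_batch(batch, Arc::clone(&schema))?;

    // Encode as before; only the batch preparation moved into arrow_util.
    let (_dictionaries, flight_data) =
        arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
    Ok(flight_data)
}

fn main() -> Result<(), ArrowError> {
    // Illustrative data: one dictionary-encoded tag column, one plain column.
    let tags: DictionaryArray<Int32Type> =
        vec![Some("foo"), Some("bar"), None].into_iter().collect();
    let values = UInt32Array::from(vec![1, 2, 3]);
    let batch = RecordBatch::try_from_iter(vec![
        ("tag", Arc::new(tags) as ArrayRef),
        ("value", Arc::new(values) as ArrayRef),
    ])?;

    let _flight_data = encode_for_flight(&batch)?;
    Ok(())
}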
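
A second illustrative sketch of the heuristic described in the optimize_record_batch doc comment: a small zero-copy slice of an over-allocated array still reports the memory of its full backing buffer, so shipping it as-is would inflate the IPC body; when the largest buffer exceeds 100x the row count, the column is deep-cloned instead of shared. The batch construction here is an assumption for illustration; the size assertion mirrors test_deep_clone_array above.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, UInt32Array};
use arrow::record_batch::RecordBatch;
use arrow_util::optimize::optimize_record_batch;

fn main() {
    // Over-allocate the backing buffer, then keep only a 2-row slice.
    let mut builder = UInt32Array::builder(1000);
    builder.append_slice(&[1, 2, 3, 4, 5, 6]).unwrap();
    let array: ArrayRef = Arc::new(builder.finish());
    let sliced = array.slice(0, 2); // zero-copy: still references the big buffer

    let batch = RecordBatch::try_from_iter(vec![("a", sliced)]).unwrap();

    // The backing allocation is well over 100x the 2 rows, so this column is
    // deep-cloned rather than passed through via Arc::clone.
    let optimized = optimize_record_batch(&batch, batch.schema()).unwrap();

    assert_eq!(optimized.num_rows(), 2);
    // The compact copy drops the unused capacity of the original buffer.
    assert!(
        batch.column(0).get_array_memory_size()
            > optimized.column(0).get_array_memory_size()
    );
}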