diff --git a/Cargo.lock b/Cargo.lock
index fe5cf88981..300bf8bbc8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1860,13 +1860,20 @@ name = "ingester"
 version = "0.1.0"
 dependencies = [
  "arrow",
+ "arrow_util",
+ "data_types",
+ "datafusion 0.1.0",
  "hyper",
  "iox_catalog",
  "metric",
  "mutable_batch",
  "parking_lot",
+ "predicate",
+ "query",
+ "schema",
  "snafu",
  "thiserror",
+ "tokio",
  "uuid",
  "workspace-hack",
 ]
@@ -4603,9 +4610,9 @@ dependencies = [
 
 [[package]]
 name = "thread_local"
-version = "1.1.3"
+version = "1.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd"
+checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
 dependencies = [
  "once_cell",
 ]
diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml
index 8df4f7bf46..a988285b1f 100644
--- a/ingester/Cargo.toml
+++ b/ingester/Cargo.toml
@@ -6,12 +6,19 @@ edition = "2021"
 
 [dependencies]
 arrow = { version = "7.0", features = ["prettyprint"] }
+arrow_util = { path = "../arrow_util" }
+datafusion = { path = "../datafusion" }
+data_types = { path = "../data_types" }
 hyper = "0.14"
 iox_catalog = { path = "../iox_catalog" }
 metric = { path = "../metric" }
 mutable_batch = { path = "../mutable_batch"}
 parking_lot = "0.11.2"
+predicate = { path = "../predicate" }
+query = { path = "../query" }
+schema = { path = "../schema" }
 snafu = "0.7"
+tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
 thiserror = "1.0"
 uuid = { version = "0.8", features = ["v4"] }
 workspace-hack = { path = "../workspace-hack"}
diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
new file mode 100644
index 0000000000..4ffc89053d
--- /dev/null
+++ b/ingester/src/compact.rs
@@ -0,0 +1,322 @@
+//! This module is responsible for compacting the Ingester's data
+
+use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
+use query::{
+    exec::{Executor, ExecutorType},
+    frontend::reorg::ReorgPlanner,
+    QueryChunkMeta,
+};
+use snafu::{ResultExt, Snafu};
+use std::sync::Arc;
+
+use crate::data::QueryableBatch;
+
+#[derive(Debug, Snafu)]
+#[allow(missing_copy_implementations, missing_docs)]
+pub enum Error {
+    #[snafu(display("Error while building logical plan for Ingester's compaction"))]
+    LogicalPlan {
+        source: query::frontend::reorg::Error,
+    },
+
+    #[snafu(display("Error while building physical plan for Ingester's compaction"))]
+    PhysicalPlan { source: DataFusionError },
+
+    #[snafu(display("Error while executing Ingester's compaction"))]
+    ExecutePlan { source: DataFusionError },
+
+    #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
+    DeletePredicate {
+        source: predicate::delete_predicate::Error,
+        min: String,
+        max: String,
+        predicate: String,
+    },
+}
+
+/// A specialized `Error` for Ingester compaction errors
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// Compact the given Ingester's data.
+/// Note: the given `executor` should be created by the `IngesterServer`.
+pub async fn compact(
+    executor: &Executor,
+    data: Arc<QueryableBatch>,
+) -> Result<SendableRecordBatchStream> {
+    // Build the logical plan for the compaction
+    let ctx = executor.new_context(ExecutorType::Reorg);
+    let logical_plan = ReorgPlanner::new()
+        .scan_single_chunk_plan(data.schema(), data)
+        .context(LogicalPlanSnafu {})?;
+
+    // Build the physical plan
+    let physical_plan = ctx
+        .prepare_plan(&logical_plan)
+        .await
+        .context(PhysicalPlanSnafu {})?;
+
+    // Execute the plan and return the compacted stream
+    let output_stream = ctx
+        .execute_stream(physical_plan)
+        .await
+        .context(ExecutePlanSnafu {})?;
+    Ok(output_stream)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::num::NonZeroU64;
+
+    use crate::data::SnapshotBatch;
+
+    use super::*;
+
+    use arrow::record_batch::RecordBatch;
+    use arrow_util::assert_batches_eq;
+    use iox_catalog::interface::SequenceNumber;
+    use query::test::{raw_data, TestChunk};
+
+    #[ignore]
+    #[tokio::test]
+    async fn test_compact_no_dedup_no_delete() {
+        let batches = create_record_batches_with_influxtype_no_duplicates().await;
+        let compact_batch = make_queryable_batch(batches);
+        let exc = Executor::new(1);
+        let stream = compact(&exc, compact_batch).await.unwrap();
+        let output_batches = datafusion::physical_plan::common::collect(stream)
+            .await
+            .unwrap();
+
+        println!("output_batches: {:#?}", output_batches);
+
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &output_batches);
+    }
+
+    #[ignore]
+    #[tokio::test]
+    async fn test_compact_with_dedup_no_delete() {
+        let batches = create_batches_with_influxtype().await;
+        let compact_batch = make_queryable_batch(batches);
+        let exc = Executor::new(1);
+
+        let stream = compact(&exc, compact_batch).await.unwrap();
+        let output_batches = datafusion::physical_plan::common::collect(stream)
+            .await
+            .unwrap();
+
+        // println!("output_batches: {:#?}", output_batches);
+        // let table = pretty_format_batches(&[batch]).unwrap();
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &output_batches);
+    }
+
+    // ----------------------------------------------------------------------------------------------
+    // Data for testing
+    pub fn make_queryable_batch(batches: Vec<Arc<RecordBatch>>) -> Arc<QueryableBatch> {
+        // make snapshots for the batches
+        let mut snapshots = vec![];
+        let mut seq_num = 1;
+        for batch in batches {
+            let seq = SequenceNumber::new(seq_num);
+            snapshots.push(make_snapshot_batch(batch, seq, seq));
+            seq_num += 1;
+        }
+
+        Arc::new(QueryableBatch::new("test_table", snapshots, vec![]))
+    }
+
+    pub fn make_snapshot_batch(
+        batch: Arc<RecordBatch>,
+        min: SequenceNumber,
+        max: SequenceNumber,
+    ) -> SnapshotBatch {
+        SnapshotBatch {
+            min_sequencer_number: min,
+            max_sequencer_number: max,
+            data: batch,
+        }
+    }
+
+    pub async fn create_record_batches_with_influxtype_no_duplicates() -> Vec<Arc<RecordBatch>> {
+        let chunk1 = Arc::new(
+            TestChunk::new("t")
+                .with_id(1)
+                .with_time_column_with_full_stats(
+                    Some(5),
+                    Some(7000),
+                    10,
+                    Some(NonZeroU64::new(7).unwrap()),
+                )
+                .with_tag_column_with_full_stats(
+                    "tag1",
+                    Some("AL"),
+                    Some("MT"),
+                    10,
+                    Some(NonZeroU64::new(3).unwrap()),
+                )
+                .with_i64_field_column("field_int")
+                .with_ten_rows_of_data_some_duplicates(),
+        );
+        let batches = raw_data(&[chunk1]).await;
+        let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect();
+
+        // The output data looks like this:
+        // let expected = vec![
+        //     "+-----------+------+-------------------------------+",
+        //     "| field_int | tag1 | time                          |",
+        //     "+-----------+------+-------------------------------+",
+        //     "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+        //     "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+        //     "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+        //     "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+        //     "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+        //     "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+        //     "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+        //     "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+        //     "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+        //     "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+        //     "+-----------+------+-------------------------------+",
+        // ];
+
+        batches
+    }
+
+    // RecordBatches with knowledge of influx metadata
+    pub async fn create_batches_with_influxtype() -> Vec<Arc<RecordBatch>> {
+        // Use the available TestChunk to create chunks and then convert them to raw RecordBatches
+        let mut batches = vec![];
+
+        // This test covers all kinds of chunks: overlapping, non-overlapping without duplicates
+        // within, and non-overlapping with duplicates within.
+        // todo: may want to simplify the test chunk data below; it reuses the existing code that
+        // covers many compaction cases
+        let chunk1 = Arc::new(
+            TestChunk::new("t")
+                .with_id(1)
+                .with_time_column_with_full_stats(
+                    Some(5),
+                    Some(7000),
+                    10,
+                    Some(NonZeroU64::new(7).unwrap()),
+                )
+                .with_tag_column_with_full_stats(
+                    "tag1",
+                    Some("AL"),
+                    Some("MT"),
+                    10,
+                    Some(NonZeroU64::new(3).unwrap()),
+                )
+                .with_i64_field_column("field_int")
+                .with_ten_rows_of_data_some_duplicates(),
+        );
+        let batch1 = raw_data(&[chunk1]).await[0].clone();
+        //println!("BATCH1: {:#?}", batch1);
+        batches.push(Arc::new(batch1));
+
+        // chunk2 overlaps with chunk 1
+        let chunk2 = Arc::new(
+            TestChunk::new("t")
+                .with_id(2)
+                .with_time_column_with_full_stats(
+                    Some(5),
+                    Some(7000),
+                    5,
+                    Some(NonZeroU64::new(5).unwrap()),
+                )
+                .with_tag_column_with_full_stats(
+                    "tag1",
+                    Some("AL"),
+                    Some("MT"),
+                    5,
+                    Some(NonZeroU64::new(3).unwrap()),
+                )
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
+        );
+        let batch2 = raw_data(&[chunk2]).await[0].clone();
+        //println!("BATCH2: {:#?}", batch2);
+        batches.push(Arc::new(batch2));
+
+        // chunk3: no overlap, no duplicates within
+        let chunk3 = Arc::new(
+            TestChunk::new("t")
+                .with_id(3)
+                .with_time_column_with_full_stats(
+                    Some(8000),
+                    Some(20000),
+                    3,
+                    Some(NonZeroU64::new(3).unwrap()),
+                )
+                .with_tag_column_with_full_stats(
+                    "tag1",
+                    Some("UT"),
+                    Some("WA"),
+                    3,
+                    Some(NonZeroU64::new(3).unwrap()),
+                )
+                .with_i64_field_column("field_int")
+                .with_three_rows_of_data(),
+        );
+        let batch3 = raw_data(&[chunk3]).await[0].clone();
+        //println!("BATCH3: {:#?}", batch3);
+        batches.push(Arc::new(batch3));
+
+        // chunk4: no overlap, duplicates within
+        let chunk4 = Arc::new(
+            TestChunk::new("t")
+                .with_id(4)
+                .with_time_column_with_full_stats(
+                    Some(28000),
+                    Some(220000),
+                    4,
+                    Some(NonZeroU64::new(3).unwrap()),
+                )
+                .with_tag_column_with_full_stats(
+                    "tag1",
+                    Some("UT"),
+                    Some("WA"),
+                    4,
+                    Some(NonZeroU64::new(3).unwrap()),
+                )
+                .with_i64_field_column("field_int")
+                .with_may_contain_pk_duplicates(true)
+                .with_four_rows_of_data(),
+        );
+        let batch4 = raw_data(&[chunk4]).await[0].clone();
+        //println!("BATCH4: {:#?}", batch4);
+        batches.push(Arc::new(batch4));
+
+        // todo: show what the output data looks like in comments for easier debugging
+
+        batches
+    }
+}
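For reviewers, here is a minimal end-to-end sketch of how the new compaction path is meant to be driven, mirroring the (currently `#[ignore]`d) tests above. The helper name `compact_snapshots` is illustrative only and not part of this change; the paths assume the `ingester` crate layout introduced in this diff.

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use ingester::compact::compact;
use ingester::data::{QueryableBatch, SnapshotBatch};
use iox_catalog::interface::SequenceNumber;
use query::exec::Executor;

/// Illustrative helper (not part of this PR): wrap buffered RecordBatches into
/// snapshots, compact them, and collect the resulting stream into batches.
async fn compact_snapshots(batches: Vec<Arc<RecordBatch>>) -> Vec<RecordBatch> {
    // One snapshot per batch, tagged with a dummy sequencer-number range,
    // exactly like make_queryable_batch in the tests above
    let mut snapshots = vec![];
    let mut seq_num = 1;
    for data in batches {
        let seq = SequenceNumber::new(seq_num);
        snapshots.push(SnapshotBatch {
            min_sequencer_number: seq,
            max_sequencer_number: seq,
            data,
        });
        seq_num += 1;
    }

    // No tombstones yet, so no delete predicates are attached
    let queryable = Arc::new(QueryableBatch::new("test_table", snapshots, vec![]));

    // In the real server the Executor is owned by the IngesterServer
    let executor = Executor::new(1);

    // Run the reorg plan and collect the compacted output
    let stream = compact(&executor, queryable).await.expect("compaction failed");
    datafusion::physical_plan::common::collect(stream)
        .await
        .expect("collecting compacted stream failed")
}
```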
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 2e1767eb21..3f3fdf3003 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -2,6 +2,7 @@ //!
 
 use arrow::record_batch::RecordBatch;
+use data_types::delete_predicate::DeletePredicate;
 use std::{collections::BTreeMap, sync::Arc};
 use uuid::Uuid;
 
@@ -167,17 +168,19 @@ pub struct BufferBatch {
 }
 
 /// SnapshotBatch contains data of many contiguous BufferBatches
+#[derive(Debug)]
 pub struct SnapshotBatch {
     /// Min sequencer number of its comebined BufferBatches
     pub min_sequencer_number: SequenceNumber,
     /// Max sequencer number of its comebined BufferBatches
     pub max_sequencer_number: SequenceNumber,
     /// Data of its comebined BufferBatches kept in one RecordBatch
-    pub data: RecordBatch,
+    pub data: Arc<RecordBatch>,
 }
 
 /// PersistingBatch contains all needed info and data for creating
 /// a parquet file for given set of SnapshotBatches
+#[derive(Debug)]
 pub struct PersistingBatch {
     /// Sesquencer id of the data
     pub sequencer_id: SequencerId,
@@ -191,10 +194,23 @@ pub struct PersistingBatch {
     /// Id of to-be-created parquet file of this data
     pub object_store_id: Uuid,
 
-    /// data to be persisted
+    /// data
+    pub data: Arc<QueryableBatch>,
+}
+
+/// Queryable data used for both query and persistence
+#[derive(Debug)]
+pub struct QueryableBatch {
+    /// data
     pub data: Vec<SnapshotBatch>,
 
-    /// delete predicates to be appied to the data
-    /// before perssiting
+    /// Tombstones to be applied on the data
     pub deletes: Vec<Tombstone>,
+
+    /// Delete predicates of the tombstones
+    /// Note: this is needed here to return its reference for a trait function
+    pub delete_predicates: Vec<Arc<DeletePredicate>>,
+
+    /// This is needed to return a reference for a trait function
+    pub table_name: String,
 }
diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs
index 57b5a1a450..544aab0579 100644
--- a/ingester/src/lib.rs
+++ b/ingester/src/lib.rs
@@ -11,8 +11,9 @@
     clippy::use_self,
     clippy::clone_on_ref_ptr
 )]
-
-#[allow(dead_code)]
+#![allow(dead_code)]
+pub mod compact;
 pub mod data;
 pub mod handler;
+pub mod query;
 pub mod server;
diff --git a/ingester/src/query.rs b/ingester/src/query.rs
new file mode 100644
index 0000000000..fd62d5d5c3
--- /dev/null
+++ b/ingester/src/query.rs
@@ -0,0 +1,322 @@
+//! Module to handle queries on the Ingester's data
+
+use std::sync::Arc;
+
+use arrow::record_batch::RecordBatch;
+use data_types::{
+    chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder},
+    delete_predicate::DeletePredicate,
+    partition_metadata::TableSummary,
+};
+use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
+use iox_catalog::interface::Tombstone;
+use predicate::{
+    delete_predicate::parse_delete_predicate,
+    predicate::{Predicate, PredicateMatch},
+};
+use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
+use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema};
+use snafu::Snafu;
+
+use crate::data::{QueryableBatch, SnapshotBatch};
+
+#[derive(Debug, Snafu)]
+#[allow(missing_copy_implementations, missing_docs)]
+pub enum Error {
+    #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
+    DeletePredicate {
+        source: predicate::delete_predicate::Error,
+        min: String,
+        max: String,
+        predicate: String,
+    },
+}
+
+/// A specialized `Error` for Ingester query errors
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+// todo: move this function to a more appropriate crate
+/// Return the merged schema for RecordBatches
+///
+/// This is infallible because the schemas of chunks within a
+/// partition are assumed to be compatible because that schema was
+/// enforced as part of writing into the partition
+pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Arc<Schema> {
+    let mut merger = SchemaMerger::new();
+    for batch in batches {
+        let schema = Schema::try_from(batch.schema()).expect("Schema conversion error");
+        merger = merger.merge(&schema).expect("schemas compatible");
+    }
+    Arc::new(merger.build())
+}
+
+impl QueryableBatch {
+    /// Initialize a QueryableBatch
+    pub fn new(table_name: &str, data: Vec<SnapshotBatch>, deletes: Vec<Tombstone>) -> Self {
+        let mut delete_predicates = vec![];
+        for delete in &deletes {
+            let delete_predicate = Arc::new(
+                parse_delete_predicate(
+                    &delete.min_time.get().to_string(),
+                    &delete.max_time.get().to_string(),
+                    &delete.serialized_predicate,
+                )
+                .expect("Error building delete predicate"),
+            );
+
+            delete_predicates.push(delete_predicate);
+        }
+
+        Self {
+            data,
+            deletes,
+            delete_predicates,
+            table_name: table_name.to_string(),
+        }
+    }
+}
+
+impl QueryChunkMeta for QueryableBatch {
+    fn summary(&self) -> &TableSummary {
+        unimplemented!()
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        // todo: may want to store this schema as a field of QueryableBatch and
+        // only do this schema merge the first time it is called
+
+        // Merge the schemas of all RecordBatches of the PersistingBatch
+        let batches: Vec<Arc<RecordBatch>> =
+            self.data.iter().map(|s| Arc::clone(&s.data)).collect();
+        merge_record_batch_schemas(&batches)
+    }
+
+    fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
+        self.delete_predicates.as_ref()
+    }
+}
+
+impl QueryChunk for QueryableBatch {
+    type Error = Error;
+
+    // This function should not be used in PersistingBatch context
+    fn id(&self) -> ChunkId {
+        unimplemented!()
+    }
+
+    // This function should not be used in PersistingBatch context
+    fn addr(&self) -> ChunkAddr {
+        unimplemented!()
+    }
+
+    /// Returns the name of the table stored in this chunk
+    fn table_name(&self) -> &str {
+        &self.table_name
+    }
+
+    /// Returns true if the chunk may contain a duplicate "primary
+    /// key" within itself
+    fn may_contain_pk_duplicates(&self) -> bool {
+        // always true because they are not deduplicated yet
+        true
+    }
+
+    /// Returns the result of applying the `predicate` to the chunk
+    /// using an efficient, but inexact method, based on metadata.
+    ///
+    /// NOTE: This method is suitable for calling during planning, and
+    /// may return PredicateMatch::Unknown for certain types of
+    /// predicates.
+    fn apply_predicate_to_metadata(
+        &self,
+        _predicate: &Predicate,
+    ) -> Result<PredicateMatch, Self::Error> {
+        Ok(PredicateMatch::Unknown)
+    }
+
+    /// Returns a set of Strings with column names from the specified
+    /// table that have at least one row that matches `predicate`, if
+    /// the predicate can be evaluated entirely on the metadata of
+    /// this Chunk. Returns `None` otherwise
+    fn column_names(
+        &self,
+        _predicate: &Predicate,
+        _columns: Selection<'_>,
+    ) -> Result<Option<StringSet>, Self::Error> {
+        Ok(None)
+    }
+
+    /// Return a set of Strings containing the distinct values in the
+    /// specified columns if the predicate can be evaluated entirely
+    /// on the metadata of this Chunk. Returns `None` otherwise.
+    ///
+    /// The requested columns must all have String type.
+    fn column_values(
+        &self,
+        _column_name: &str,
+        _predicate: &Predicate,
+    ) -> Result<Option<StringSet>, Self::Error> {
+        Ok(None)
+    }
+
+    /// Provides access to raw `QueryChunk` data as an
+    /// asynchronous stream of `RecordBatch`es filtered by a *required*
+    /// predicate. Note that not all chunks can evaluate all types of
+    /// predicates and this function will return an error
+    /// if requested to evaluate with a predicate that is not supported
+    ///
+    /// This is the analog of the `TableProvider` in DataFusion
+    ///
+    /// The reason we can't simply use the `TableProvider` trait
+    /// directly is that the data for a particular Table lives in
+    /// several chunks within a partition, so there needs to be an
+    /// implementation of `TableProvider` that stitches together the
+    /// streams from several different `QueryChunk`s.
+    fn read_filter(
+        &self,
+        _predicate: &Predicate,    // not needed because all data will be read for compaction
+        _selection: Selection<'_>, // not needed because all columns will be read and compacted
+    ) -> Result<SendableRecordBatchStream, Self::Error> {
+        let batches: Vec<_> = self.data.iter().map(|s| Arc::clone(&s.data)).collect();
+        let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), batches);
+        Ok(Box::pin(stream))
+    }
+
+    /// Returns true if data of this chunk is sorted
+    fn is_sorted_on_pk(&self) -> bool {
+        false
+    }
+
+    /// Returns the sort key of the chunk if any
+    fn sort_key(&self) -> Option<SortKey<'_>> {
+        None
+    }
+
+    /// Returns chunk type
+    fn chunk_type(&self) -> &str {
+        "PersistingBatch"
+    }
+
+    // This function should not be used in PersistingBatch context
+    fn order(&self) -> ChunkOrder {
+        unimplemented!()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use arrow::{
+        array::{
+            ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray,
+            TimestampNanosecondArray, UInt64Array,
+        },
+        datatypes::{DataType, Int32Type, TimeUnit},
+    };
+
+    #[tokio::test]
+    async fn test_merge_batch_schema() {
+        // Merge the schemas of the batches
+        // The fields in the merged schema are sorted by column name
+        let batches = create_batches();
+        let merged_schema = (&*merge_record_batch_schemas(&batches)).clone();
+
+        // Expected Arrow schema
+        let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![
+            arrow::datatypes::Field::new(
+                "dict",
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+                true,
+            ),
+            arrow::datatypes::Field::new("int64", DataType::Int64, true),
+            arrow::datatypes::Field::new("string", DataType::Utf8, true),
+            arrow::datatypes::Field::new("bool", DataType::Boolean, true),
+            arrow::datatypes::Field::new(
+                "time",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+            arrow::datatypes::Field::new("uint64", DataType::UInt64, false),
+            arrow::datatypes::Field::new("float64", DataType::Float64, true),
+        ]));
+        let expected_schema = Schema::try_from(arrow_schema)
+            .unwrap()
+            .sort_fields_by_name();
+
+        assert_eq!(
+            expected_schema, merged_schema,
+            "\nExpected:\n{:#?}\nActual:\n{:#?}",
+            expected_schema, merged_schema
+        );
+    }
+
+    // ----------------------------------------------------------------------------------------------
+    // Data for testing
+    // Create pure RecordBatches without knowledge of Influx datatypes
+    fn create_batches() -> Vec<Arc<RecordBatch>> {
+        // Batch 1: 3 rows
+        let dict_array: ArrayRef = Arc::new(
+            vec![Some("a"), None, Some("b")]
+                .into_iter()
+                .collect::<DictionaryArray<Int32Type>>(),
+        );
+        let int64_array: ArrayRef =
+            Arc::new([Some(-1), None, Some(2)].iter().collect::<Int64Array>());
+        let string_array: ArrayRef = Arc::new(
+            vec![Some("foo"), Some("and"), Some("bar")]
+                .into_iter()
+                .collect::<StringArray>(),
+        );
+        let bool_array: ArrayRef = Arc::new(
+            [Some(true), None, Some(false)]
+                .iter()
+                .collect::<BooleanArray>(),
+        );
+        let ts_array: ArrayRef = Arc::new(
+            [Some(150), Some(200), Some(1526823730000000000)]
+                .iter()
+                .collect::<TimestampNanosecondArray>(),
+        );
+        let batch1 = RecordBatch::try_from_iter_with_nullable(vec![
+            ("dict", dict_array, true),
+            ("int64", int64_array, true),
+            ("string", string_array, true),
+            ("bool", bool_array, true),
+            ("time", ts_array, false), // not null
+        ])
+        .unwrap();
+
+        // Batch 2: 2 rows
+        let dict_array: ArrayRef = Arc::new(
+            vec![None, Some("d")]
+                .into_iter()
+                .collect::<DictionaryArray<Int32Type>>(),
+        );
+        let uint64_array: ArrayRef = Arc::new([Some(1), Some(2)].iter().collect::<UInt64Array>()); // not null
+        let float64_array: ArrayRef =
+            Arc::new([Some(1.0), Some(2.0)].iter().collect::<Float64Array>());
+        let string_array: ArrayRef = Arc::new(
+            vec![Some("foo"), Some("bar")]
+                .into_iter()
+                .collect::<StringArray>(),
+        );
+        let bool_array: ArrayRef = Arc::new([Some(true), None].iter().collect::<BooleanArray>());
+        let ts_array: ArrayRef = Arc::new(
+            [Some(100), Some(1626823730000000000)] // not null
+                .iter()
+                .collect::<TimestampNanosecondArray>(),
+        );
+        let batch2 = RecordBatch::try_from_iter_with_nullable(vec![
+            ("dict", dict_array, true),
+            ("uint64", uint64_array, false), // not null
+            ("float64", float64_array, true),
+            ("string", string_array, true),
+            ("bool", bool_array, true),
+            ("time", ts_array, false), // not null
+        ])
+        .unwrap();
+
+        vec![Arc::new(batch1), Arc::new(batch2)]
+    }
+}
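A smaller sketch of the schema-merge helper on its own, along the lines of `test_merge_batch_schema` above; the column names and batch contents here are placeholders rather than data from this PR.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::record_batch::RecordBatch;
use ingester::query::merge_record_batch_schemas;
use schema::Schema;

/// Illustrative only: merge the schemas of two batches with different columns.
fn merged_schema_example() -> Arc<Schema> {
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let tags: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));

    let batch1 = RecordBatch::try_from_iter(vec![("field_int", ints)]).unwrap();
    let batch2 = RecordBatch::try_from_iter(vec![("tag1", tags)]).unwrap();

    // The result contains the union of the columns, sorted by column name
    merge_record_batch_schemas(&[Arc::new(batch1), Arc::new(batch2)])
}
```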