From 191adc9fc7add478d87db637409c3dd35a6aec68 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Thu, 20 Jan 2022 18:22:41 -0500 Subject: [PATCH 1/8] feat: initial implementation for ingester's compaction --- ingester/Cargo.toml | 5 + ingester/src/compact.rs | 273 ++++++++++++++++++++++++++++++++++++++++ ingester/src/data.rs | 4 +- ingester/src/lib.rs | 2 + 4 files changed, 283 insertions(+), 1 deletion(-) create mode 100644 ingester/src/compact.rs diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 7e1d8f1719..54b2d5bd6e 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -6,9 +6,14 @@ edition = "2021" [dependencies] arrow = { version = "7.0", features = ["prettyprint"] } +datafusion = { path = "../datafusion" } +data_types = { path = "../data_types" } iox_catalog = { path = "../iox_catalog" } mutable_batch = { path = "../mutable_batch"} parking_lot = "0.11.2" +predicate = { path = "../predicate" } +query = { path = "../query" } +schema = { path = "../schema" } snafu = "0.7" uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs new file mode 100644 index 0000000000..4b133e7eef --- /dev/null +++ b/ingester/src/compact.rs @@ -0,0 +1,273 @@ +//! This module is responsible for compacting Ingester's data + +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; +use data_types::{ + chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder}, + delete_predicate::DeletePredicate, + partition_metadata::TableSummary, +}; +use datafusion::{ + error::DataFusionError, + physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}, +}; +use predicate::{ + delete_predicate::parse_delete_predicate, + predicate::{Predicate, PredicateMatch}, +}; +use query::{ + exec::{stringset::StringSet, Executor, ExecutorType}, + frontend::reorg::ReorgPlanner, + QueryChunk, QueryChunkMeta, +}; +use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema}; +use snafu::{ResultExt, Snafu}; + +use crate::data::PersistingBatch; + +#[derive(Debug, Snafu)] +#[allow(missing_copy_implementations, missing_docs)] +pub enum Error { + #[snafu(display("Error while building logical plan for Ingester's compaction"))] + LogicalPlan { + source: query::frontend::reorg::Error, + }, + + #[snafu(display("Error while building physical plan for Ingester's compaction"))] + PhysicalPlan { source: DataFusionError }, + + #[snafu(display("Error while executing Ingester's compaction"))] + ExecutePlan { source: DataFusionError }, + + #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))] + DeletePredicate { + source: predicate::delete_predicate::Error, + min: String, + max: String, + predicate: String, + }, +} + +/// A specialized `Error` for Ingester's Compact errors +pub type Result = std::result::Result; + +/// Compact the given Ingester's data +/// Note: the given `executor` should be created with the IngesterServer +pub async fn compact( + executor: &Executor, + data: &'static PersistingBatch, +) -> Result { + let chunk = CompactingChunk::new(data); + + // Build logical plan for compaction + let ctx = executor.new_context(ExecutorType::Reorg); + let logical_plan = ReorgPlanner::new() + .scan_single_chunk_plan(chunk.schema(), Arc::new(chunk)) + .context(LogicalPlanSnafu {})?; + + // Build physical plan + let physical_plan = ctx + .prepare_plan(&logical_plan) + .await + .context(PhysicalPlanSnafu {})?; + + // Execute 
the plan and return the compacted stream + let output_stream = ctx + .execute_stream(physical_plan) + .await + .context(ExecutePlanSnafu {})?; + Ok(output_stream) +} + +// todo: move this function to a more appropriate crate +/// Return the merged schema for RecordBatches +/// +/// This is infallable because the schemas of chunks within a +/// partition are assumed to be compatible because that schema was +/// enforced as part of writing into the partition +pub fn merge_record_batch_schemas(batches: &[Arc]) -> Arc { + let mut merger = SchemaMerger::new(); + for batch in batches { + let schema = Schema::try_from(batch.schema()).expect("Schema conversion error"); + merger = merger.merge(&schema).expect("schemas compatible"); + } + Arc::new(merger.build()) +} + +// ------------------------------------------------------------ +/// CompactingChunk is a wrapper of a PersistingBatch that implements +/// QueryChunk and QueryChunkMeta needed to build a query plan to compact its data +#[derive(Debug)] +pub struct CompactingChunk<'a> { + /// Provided ingest data + pub data: &'a PersistingBatch, + + /// Statistic of this data which is currently empty as it may not needed + pub summary: TableSummary, + + /// Delete predicates to be applied while copacting this + pub delete_predicates: Vec>, +} + +impl<'a> CompactingChunk<'a> { + /// Create a PersistingChunk for a given PesistingBatch + pub fn new(data: &'a PersistingBatch) -> Self { + // Empty table summary because PersistingBatch does not keep stats + // Todo - note to self: the TableSummary is used to get stastistics + // while planning the scan. This empty column stats may + // cause some issues during planning. Will verify if it is the case. + let summary = TableSummary { + // todo: do not have table name to provide here. 
+ // Either accept this ID as the name (used mostly for debug/log) while + // running compaction or store table name with the PersistingBatch to + // avoid reading the catalog for it + name: data.table_id.get().to_string(), + columns: vec![], + }; + + let mut delete_predicates = vec![]; + for delete in &data.deletes { + let delete_predicate = Arc::new( + parse_delete_predicate( + &delete.min_time.get().to_string(), + &delete.max_time.get().to_string(), + &delete.serialized_predicate, + ) + .expect("Error building delete predicate"), + ); + + delete_predicates.push(delete_predicate); + } + + Self { + data, + summary, + delete_predicates, + } + } +} + +impl QueryChunkMeta for CompactingChunk<'_> { + fn summary(&self) -> &TableSummary { + &self.summary + } + + fn schema(&self) -> Arc { + // Merge schema of all RecordBatches of the PerstingBatch + let batches: Vec> = + self.data.data.iter().map(|s| Arc::clone(&s.data)).collect(); + merge_record_batch_schemas(&batches) + } + + fn delete_predicates(&self) -> &[Arc] { + self.delete_predicates.as_ref() + } +} + +impl QueryChunk for CompactingChunk<'_> { + type Error = Error; + + // This function should not be used in PersistingBatch context + fn id(&self) -> ChunkId { + unimplemented!() + } + + // This function should not be used in PersistingBatch context + fn addr(&self) -> ChunkAddr { + unimplemented!() + } + + /// Returns the name of the table stored in this chunk + fn table_name(&self) -> &str { + &self.summary.name + } + + /// Returns true if the chunk may contain a duplicate "primary + /// key" within itself + fn may_contain_pk_duplicates(&self) -> bool { + // always true because they are not deduplicated yet + true + } + + /// Returns the result of applying the `predicate` to the chunk + /// using an efficient, but inexact method, based on metadata. + /// + /// NOTE: This method is suitable for calling during planning, and + /// may return PredicateMatch::Unknown for certain types of + /// predicates. + fn apply_predicate_to_metadata( + &self, + _predicate: &Predicate, + ) -> Result { + Ok(PredicateMatch::Unknown) + } + + /// Returns a set of Strings with column names from the specified + /// table that have at least one row that matches `predicate`, if + /// the predicate can be evaluated entirely on the metadata of + /// this Chunk. Returns `None` otherwise + fn column_names( + &self, + _predicate: &Predicate, + _columns: Selection<'_>, + ) -> Result, Self::Error> { + Ok(None) + } + + /// Return a set of Strings containing the distinct values in the + /// specified columns. If the predicate can be evaluated entirely + /// on the metadata of this Chunk. Returns `None` otherwise + /// + /// The requested columns must all have String type. + fn column_values( + &self, + _column_name: &str, + _predicate: &Predicate, + ) -> Result, Self::Error> { + Ok(None) + } + + /// Provides access to raw `QueryChunk` data as an + /// asynchronous stream of `RecordBatch`es filtered by a *required* + /// predicate. 
Note that not all chunks can evaluate all types of + /// predicates and this function will return an error + /// if requested to evaluate with a predicate that is not supported + /// + /// This is the analog of the `TableProvider` in DataFusion + /// + /// The reason we can't simply use the `TableProvider` trait + /// directly is that the data for a particular Table lives in + /// several chunks within a partition, so there needs to be an + /// implementation of `TableProvider` that stitches together the + /// streams from several different `QueryChunk`s. + fn read_filter( + &self, + _predicate: &Predicate, + _selection: Selection<'_>, + ) -> Result { + let batches: Vec<_> = self.data.data.iter().map(|s| Arc::clone(&s.data)).collect(); + let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), batches); + Ok(Box::pin(stream)) + } + + /// Returns true if data of this chunk is sorted + fn is_sorted_on_pk(&self) -> bool { + false + } + + /// Returns the sort key of the chunk if any + fn sort_key(&self) -> Option> { + None + } + + /// Returns chunk type + fn chunk_type(&self) -> &str { + "PersistingBatch" + } + + // This function should not be used in PersistingBatch context + fn order(&self) -> ChunkOrder { + unimplemented!() + } +} diff --git a/ingester/src/data.rs b/ingester/src/data.rs index b838faca54..04addbf82e 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -169,17 +169,19 @@ pub struct BufferBatch { } /// SnapshotBatch contains data of many contiguous BufferBatches +#[derive(Debug)] pub struct SnapshotBatch { /// Min sequencer number of its comebined BufferBatches pub min_sequencer_number: SequenceNumber, /// Max sequencer number of its comebined BufferBatches pub max_sequencer_number: SequenceNumber, /// Data of its comebined BufferBatches kept in one RecordBatch - pub data: RecordBatch, + pub data: Arc, } /// PersistingBatch contains all needed info and data for creating /// a parquet file for given set of SnapshotBatches +#[derive(Debug)] pub struct PersistingBatch { /// Sesquencer id of the data pub sequencer_id: SequencerId, diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index 31bc719a49..dd22318dbf 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -13,5 +13,7 @@ )] #[allow(dead_code)] + +pub mod compact; pub mod data; pub mod server; From cd01b141f3be5f0e82e223070a42c16f9ba09a75 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Fri, 21 Jan 2022 16:49:02 -0500 Subject: [PATCH 2/8] refactor: for paul --- ingester/Cargo.toml | 1 + ingester/src/compact.rs | 294 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 289 insertions(+), 6 deletions(-) diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 54b2d5bd6e..e2af2ad35b 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -15,5 +15,6 @@ predicate = { path = "../predicate" } query = { path = "../query" } schema = { path = "../schema" } snafu = "0.7" +tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 4b133e7eef..50bb8d5b3a 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -56,7 +56,7 @@ pub type Result = std::result::Result; /// Note: the given `executor` should be created with the IngesterServer pub async fn compact( executor: &Executor, - data: &'static PersistingBatch, + data: Arc, ) -> Result { let chunk = CompactingChunk::new(data); @@ 
-99,9 +99,9 @@ pub fn merge_record_batch_schemas(batches: &[Arc]) -> Arc { /// CompactingChunk is a wrapper of a PersistingBatch that implements /// QueryChunk and QueryChunkMeta needed to build a query plan to compact its data #[derive(Debug)] -pub struct CompactingChunk<'a> { +pub struct CompactingChunk { /// Provided ingest data - pub data: &'a PersistingBatch, + pub data: Arc, /// Statistic of this data which is currently empty as it may not needed pub summary: TableSummary, @@ -112,7 +112,7 @@ pub struct CompactingChunk<'a> { impl<'a> CompactingChunk<'a> { /// Create a PersistingChunk for a given PesistingBatch - pub fn new(data: &'a PersistingBatch) -> Self { + pub fn new(data: Arc) -> Self { // Empty table summary because PersistingBatch does not keep stats // Todo - note to self: the TableSummary is used to get stastistics // while planning the scan. This empty column stats may @@ -243,8 +243,8 @@ impl QueryChunk for CompactingChunk<'_> { /// streams from several different `QueryChunk`s. fn read_filter( &self, - _predicate: &Predicate, - _selection: Selection<'_>, + _predicate: &Predicate, // no needs because all data will be read for compaction + _selection: Selection<'_>, // no needs because all columns will be read and compact ) -> Result { let batches: Vec<_> = self.data.data.iter().map(|s| Arc::clone(&s.data)).collect(); let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), batches); @@ -271,3 +271,285 @@ impl QueryChunk for CompactingChunk<'_> { unimplemented!() } } + + + +#[cfg(test)] +mod tests { + use std::num::NonZeroU64; + + use crate::data::SnapshotBatch; + + use super::*; + + use arrow::{array::{ArrayRef, DictionaryArray, Int64Array, StringArray, BooleanArray, TimestampNanosecondArray, UInt64Array, Float64Array}, datatypes::{Int32Type, DataType, TimeUnit}}; + use iox_catalog::interface::{SequenceNumber, SequencerId, TableId, PartitionId}; + use query::test::{TestChunk, raw_data}; + use uuid::Uuid; + + #[tokio::test] + async fn test_merge_batch_schema() { + + // Merge schema of the batches + // The fileds in the schema are sorted by column name + let batches = create_batches(); + let merged_schema = (&*merge_record_batch_schemas(&batches)).clone(); + + // Expected Arrow schema + let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("dict", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true), + arrow::datatypes::Field::new("int64", DataType::Int64, true), + arrow::datatypes::Field::new("string", DataType::Utf8, true), + arrow::datatypes::Field::new("bool", DataType::Boolean, true), + arrow::datatypes::Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false), + arrow::datatypes::Field::new("uint64", DataType::UInt64, false), + arrow::datatypes::Field::new("float64", DataType::Float64, true), + ])); + let expected_schema = Schema::try_from(arrow_schema).unwrap().sort_fields_by_name(); + + assert_eq!( + expected_schema, merged_schema, + "\nExpected:\n{:#?}\nActual:\n{:#?}", + expected_schema, merged_schema + ); + } + + #[tokio::test] + async fn test_compact() { + let batches = create_batches_with_influxtype().await; + let persisting_batch = make_persisting_batch(batches); + let exc = Executor::new(1); + + let stream = compact(&exc, persisting_batch).await.unwrap(); + let output_batches = datafusion::physical_plan::common::collect(stream).await.unwrap(); + + println!("output_batches: {:#?}", output_batches); + + + // let table = pretty_format_batches(&[batch]).unwrap(); + + 
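        // Illustrative sketch (assumes `arrow_util`, which a later commit in this
        // series adds as a dependency): once the expected rows are pinned down,
        // the collected output can be checked directly:
        //
        //     use arrow_util::assert_batches_eq;
        //     assert_batches_eq!(&expected, &output_batches);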
// let expected = vec![ + // "+------+-------+--------+---------+-------+--------+--------------------------------+", + // "| dict | int64 | uint64 | float64 | bool | string | time |", + // "+------+-------+--------+---------+-------+--------+--------------------------------+", + // "| a | -1 | 1 | 1 | true | foo | |", + // "| | | | | | | 1970-01-01T00:00:00.000000100Z |", + // "| b | 2 | 2 | 2 | false | bar | 2021-07-20T23:28:50Z |", + // "+------+-------+--------+---------+-------+--------+--------------------------------+", + // ]; + + } + + // ---------------------------------------------------------------------------------------------- + // Data for testing + // Crerate pure RecordBatches without knowledge of Influx datatype + fn create_batches() -> Vec> { + // Batch 1: & 3 rows + let dict_array: ArrayRef = Arc::new( + vec![Some("a"), None, Some("b")] + .into_iter() + .collect::>(), + ); + let int64_array: ArrayRef = + Arc::new([Some(-1), None, Some(2)].iter().collect::()); + let string_array: ArrayRef = Arc::new( + vec![Some("foo"), Some("and"), Some("bar")] + .into_iter() + .collect::(), + ); + let bool_array: ArrayRef = Arc::new( + [Some(true), None, Some(false)] + .iter() + .collect::(), + ); + let ts_array: ArrayRef = Arc::new( + [Some(150), Some(200), Some(1526823730000000000)] + .iter() + .collect::(), + ); + let batch1 = RecordBatch::try_from_iter_with_nullable(vec![ + ("dict", dict_array, true), + ("int64", int64_array, true), + ("string", string_array, true), + ("bool", bool_array, true), + ("time", ts_array, false), // not null + ]) + .unwrap(); + + + // Batch 2: & 2 rows + let dict_array: ArrayRef = Arc::new( + vec![None, Some("d")] + .into_iter() + .collect::>(), + ); + let uint64_array: ArrayRef = + Arc::new([Some(1), Some(2)].iter().collect::()); // not null + let float64_array: ArrayRef = Arc::new( + [Some(1.0), Some(2.0)] + .iter() + .collect::(), + ); + let string_array: ArrayRef = Arc::new( + vec![Some("foo"), Some("bar")] + .into_iter() + .collect::(), + ); + let bool_array: ArrayRef = Arc::new( + [Some(true), None] + .iter() + .collect::(), + ); + let ts_array: ArrayRef = Arc::new( + [Some(100), Some(1626823730000000000)] // not null + .iter() + .collect::(), + ); + let batch2 = RecordBatch::try_from_iter_with_nullable (vec![ + ("dict", dict_array, true), + ("uint64", uint64_array, false), // not null + ("float64", float64_array, true), + ("string", string_array, true), + ("bool", bool_array, true), + ("time", ts_array, false), // not null + ]) + .unwrap(); + + vec![Arc::new(batch1), Arc::new(batch2)] + } + + // RecordBatches with knowledge of influx metadata + pub async fn create_batches_with_influxtype() -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + // This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column_with_full_stats( + Some(5), + Some(7000), + 10, + Some(NonZeroU64::new(7).unwrap()), + ) + .with_tag_column_with_full_stats( + "tag1", + Some("AL"), + Some("MT"), + 10, + Some(NonZeroU64::new(3).unwrap()), + ) + .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), + ); + let batch1 = raw_data(&vec![chunk1]).await[0].clone(); + //println!("BATCH1: {:#?}", batch1); + batches.push(Arc::new(batch1)); + + // chunk2 overlaps with chunk 1 + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + 
.with_time_column_with_full_stats( + Some(5), + Some(7000), + 5, + Some(NonZeroU64::new(5).unwrap()), + ) + .with_tag_column_with_full_stats( + "tag1", + Some("AL"), + Some("MT"), + 5, + Some(NonZeroU64::new(3).unwrap()), + ) + .with_i64_field_column("field_int") + .with_five_rows_of_data(), + ); + let batch2 = raw_data(&vec![chunk2]).await[0].clone(); + //println!("BATCH2: {:#?}", batch2); + batches.push(Arc::new(batch2)); + + // chunk3 no overlap, no duplicates within + let chunk3 = Arc::new( + TestChunk::new("t") + .with_id(3) + .with_time_column_with_full_stats( + Some(8000), + Some(20000), + 3, + Some(NonZeroU64::new(3).unwrap()), + ) + .with_tag_column_with_full_stats( + "tag1", + Some("UT"), + Some("WA"), + 3, + Some(NonZeroU64::new(3).unwrap()), + ) + .with_i64_field_column("field_int") + .with_three_rows_of_data(), + ); + let batch3 = raw_data(&vec![chunk3]).await[0].clone(); + //println!("BATCH3: {:#?}", batch3); + batches.push(Arc::new(batch3)); + + // chunk3 no overlap, duplicates within + let chunk4 = Arc::new( + TestChunk::new("t") + .with_id(4) + .with_time_column_with_full_stats( + Some(28000), + Some(220000), + 4, + Some(NonZeroU64::new(3).unwrap()), + ) + .with_tag_column_with_full_stats( + "tag1", + Some("UT"), + Some("WA"), + 4, + Some(NonZeroU64::new(3).unwrap()), + ) + .with_i64_field_column("field_int") + .with_may_contain_pk_duplicates(true) + .with_four_rows_of_data(), + ); + let batch4 = raw_data(&vec![chunk4]).await[0].clone(); + //println!("BATCH4: {:#?}", batch4); + batches.push(Arc::new(batch4)); + + batches + } + + pub fn make_persisting_batch<'a>(batches: Vec>) -> PersistingBatch { + // make snapshots for the bacthes + let mut snapshots = vec![]; + let mut seq_num = 1; + for batch in batches { + let seq = SequenceNumber::new(seq_num); + snapshots.push(make_snapshot_batch(batch, seq, seq )); + seq_num = seq_num + 1; + } + + PersistingBatch { + sequencer_id: SequencerId::new(1), + table_id: TableId::new(1), + partition_id: PartitionId::new(1), + object_store_id: Uuid::new_v4(), + data: snapshots, + deletes: vec![], + } + } + + pub fn make_snapshot_batch(batch: Arc, min: SequenceNumber, max: SequenceNumber) -> SnapshotBatch { + SnapshotBatch { + min_sequencer_number: min, + max_sequencer_number: max, + data: batch, + } + } + + +} \ No newline at end of file From fa41067e3d7c6a6600a25c2bd73dfdb292a2c92b Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Fri, 21 Jan 2022 16:50:49 -0500 Subject: [PATCH 3/8] refactor: for paul --- ingester/src/compact.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 50bb8d5b3a..c0aad0302f 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -110,7 +110,7 @@ pub struct CompactingChunk { pub delete_predicates: Vec>, } -impl<'a> CompactingChunk<'a> { +impl<'a> CompactingChunk { /// Create a PersistingChunk for a given PesistingBatch pub fn new(data: Arc) -> Self { // Empty table summary because PersistingBatch does not keep stats From ee0a468b4deb705ff0cacd34ecb20c185497daf5 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Fri, 21 Jan 2022 18:15:23 -0500 Subject: [PATCH 4/8] feat: a few tests for compaction --- ingester/src/compact.rs | 348 +++------------------------------------- ingester/src/data.rs | 26 ++- ingester/src/lib.rs | 1 + ingester/src/query.rs | 311 +++++++++++++++++++++++++++++++++++ 4 files changed, 354 insertions(+), 332 deletions(-) create mode 100644 ingester/src/query.rs diff --git a/ingester/src/compact.rs 
b/ingester/src/compact.rs index c0aad0302f..430a9d4a03 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -24,7 +24,7 @@ use query::{ use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema}; use snafu::{ResultExt, Snafu}; -use crate::data::PersistingBatch; +use crate::data::{PersistingBatch, QueryableBatch}; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -56,14 +56,13 @@ pub type Result = std::result::Result; /// Note: the given `executor` should be created with the IngesterServer pub async fn compact( executor: &Executor, - data: Arc, + data: Arc, ) -> Result { - let chunk = CompactingChunk::new(data); // Build logical plan for compaction let ctx = executor.new_context(ExecutorType::Reorg); let logical_plan = ReorgPlanner::new() - .scan_single_chunk_plan(chunk.schema(), Arc::new(chunk)) + .scan_single_chunk_plan(data.schema(), data) .context(LogicalPlanSnafu {})?; // Build physical plan @@ -80,199 +79,6 @@ pub async fn compact( Ok(output_stream) } -// todo: move this function to a more appropriate crate -/// Return the merged schema for RecordBatches -/// -/// This is infallable because the schemas of chunks within a -/// partition are assumed to be compatible because that schema was -/// enforced as part of writing into the partition -pub fn merge_record_batch_schemas(batches: &[Arc]) -> Arc { - let mut merger = SchemaMerger::new(); - for batch in batches { - let schema = Schema::try_from(batch.schema()).expect("Schema conversion error"); - merger = merger.merge(&schema).expect("schemas compatible"); - } - Arc::new(merger.build()) -} - -// ------------------------------------------------------------ -/// CompactingChunk is a wrapper of a PersistingBatch that implements -/// QueryChunk and QueryChunkMeta needed to build a query plan to compact its data -#[derive(Debug)] -pub struct CompactingChunk { - /// Provided ingest data - pub data: Arc, - - /// Statistic of this data which is currently empty as it may not needed - pub summary: TableSummary, - - /// Delete predicates to be applied while copacting this - pub delete_predicates: Vec>, -} - -impl<'a> CompactingChunk { - /// Create a PersistingChunk for a given PesistingBatch - pub fn new(data: Arc) -> Self { - // Empty table summary because PersistingBatch does not keep stats - // Todo - note to self: the TableSummary is used to get stastistics - // while planning the scan. This empty column stats may - // cause some issues during planning. Will verify if it is the case. - let summary = TableSummary { - // todo: do not have table name to provide here. 
- // Either accept this ID as the name (used mostly for debug/log) while - // running compaction or store table name with the PersistingBatch to - // avoid reading the catalog for it - name: data.table_id.get().to_string(), - columns: vec![], - }; - - let mut delete_predicates = vec![]; - for delete in &data.deletes { - let delete_predicate = Arc::new( - parse_delete_predicate( - &delete.min_time.get().to_string(), - &delete.max_time.get().to_string(), - &delete.serialized_predicate, - ) - .expect("Error building delete predicate"), - ); - - delete_predicates.push(delete_predicate); - } - - Self { - data, - summary, - delete_predicates, - } - } -} - -impl QueryChunkMeta for CompactingChunk<'_> { - fn summary(&self) -> &TableSummary { - &self.summary - } - - fn schema(&self) -> Arc { - // Merge schema of all RecordBatches of the PerstingBatch - let batches: Vec> = - self.data.data.iter().map(|s| Arc::clone(&s.data)).collect(); - merge_record_batch_schemas(&batches) - } - - fn delete_predicates(&self) -> &[Arc] { - self.delete_predicates.as_ref() - } -} - -impl QueryChunk for CompactingChunk<'_> { - type Error = Error; - - // This function should not be used in PersistingBatch context - fn id(&self) -> ChunkId { - unimplemented!() - } - - // This function should not be used in PersistingBatch context - fn addr(&self) -> ChunkAddr { - unimplemented!() - } - - /// Returns the name of the table stored in this chunk - fn table_name(&self) -> &str { - &self.summary.name - } - - /// Returns true if the chunk may contain a duplicate "primary - /// key" within itself - fn may_contain_pk_duplicates(&self) -> bool { - // always true because they are not deduplicated yet - true - } - - /// Returns the result of applying the `predicate` to the chunk - /// using an efficient, but inexact method, based on metadata. - /// - /// NOTE: This method is suitable for calling during planning, and - /// may return PredicateMatch::Unknown for certain types of - /// predicates. - fn apply_predicate_to_metadata( - &self, - _predicate: &Predicate, - ) -> Result { - Ok(PredicateMatch::Unknown) - } - - /// Returns a set of Strings with column names from the specified - /// table that have at least one row that matches `predicate`, if - /// the predicate can be evaluated entirely on the metadata of - /// this Chunk. Returns `None` otherwise - fn column_names( - &self, - _predicate: &Predicate, - _columns: Selection<'_>, - ) -> Result, Self::Error> { - Ok(None) - } - - /// Return a set of Strings containing the distinct values in the - /// specified columns. If the predicate can be evaluated entirely - /// on the metadata of this Chunk. Returns `None` otherwise - /// - /// The requested columns must all have String type. - fn column_values( - &self, - _column_name: &str, - _predicate: &Predicate, - ) -> Result, Self::Error> { - Ok(None) - } - - /// Provides access to raw `QueryChunk` data as an - /// asynchronous stream of `RecordBatch`es filtered by a *required* - /// predicate. 
Note that not all chunks can evaluate all types of - /// predicates and this function will return an error - /// if requested to evaluate with a predicate that is not supported - /// - /// This is the analog of the `TableProvider` in DataFusion - /// - /// The reason we can't simply use the `TableProvider` trait - /// directly is that the data for a particular Table lives in - /// several chunks within a partition, so there needs to be an - /// implementation of `TableProvider` that stitches together the - /// streams from several different `QueryChunk`s. - fn read_filter( - &self, - _predicate: &Predicate, // no needs because all data will be read for compaction - _selection: Selection<'_>, // no needs because all columns will be read and compact - ) -> Result { - let batches: Vec<_> = self.data.data.iter().map(|s| Arc::clone(&s.data)).collect(); - let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), batches); - Ok(Box::pin(stream)) - } - - /// Returns true if data of this chunk is sorted - fn is_sorted_on_pk(&self) -> bool { - false - } - - /// Returns the sort key of the chunk if any - fn sort_key(&self) -> Option> { - None - } - - /// Returns chunk type - fn chunk_type(&self) -> &str { - "PersistingBatch" - } - - // This function should not be used in PersistingBatch context - fn order(&self) -> ChunkOrder { - unimplemented!() - } -} - - #[cfg(test)] mod tests { @@ -287,37 +93,12 @@ mod tests { use query::test::{TestChunk, raw_data}; use uuid::Uuid; - #[tokio::test] - async fn test_merge_batch_schema() { - // Merge schema of the batches - // The fileds in the schema are sorted by column name - let batches = create_batches(); - let merged_schema = (&*merge_record_batch_schemas(&batches)).clone(); - - // Expected Arrow schema - let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![ - arrow::datatypes::Field::new("dict", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true), - arrow::datatypes::Field::new("int64", DataType::Int64, true), - arrow::datatypes::Field::new("string", DataType::Utf8, true), - arrow::datatypes::Field::new("bool", DataType::Boolean, true), - arrow::datatypes::Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false), - arrow::datatypes::Field::new("uint64", DataType::UInt64, false), - arrow::datatypes::Field::new("float64", DataType::Float64, true), - ])); - let expected_schema = Schema::try_from(arrow_schema).unwrap().sort_fields_by_name(); - - assert_eq!( - expected_schema, merged_schema, - "\nExpected:\n{:#?}\nActual:\n{:#?}", - expected_schema, merged_schema - ); - } #[tokio::test] async fn test_compact() { let batches = create_batches_with_influxtype().await; - let persisting_batch = make_persisting_batch(batches); + let persisting_batch = make_queryable_batch(batches); let exc = Executor::new(1); let stream = compact(&exc, persisting_batch).await.unwrap(); @@ -342,82 +123,27 @@ mod tests { // ---------------------------------------------------------------------------------------------- // Data for testing - // Crerate pure RecordBatches without knowledge of Influx datatype - fn create_batches() -> Vec> { - // Batch 1: & 3 rows - let dict_array: ArrayRef = Arc::new( - vec![Some("a"), None, Some("b")] - .into_iter() - .collect::>(), - ); - let int64_array: ArrayRef = - Arc::new([Some(-1), None, Some(2)].iter().collect::()); - let string_array: ArrayRef = Arc::new( - vec![Some("foo"), Some("and"), Some("bar")] - .into_iter() - .collect::(), - ); - let bool_array: ArrayRef = Arc::new( - 
[Some(true), None, Some(false)] - .iter() - .collect::(), - ); - let ts_array: ArrayRef = Arc::new( - [Some(150), Some(200), Some(1526823730000000000)] - .iter() - .collect::(), - ); - let batch1 = RecordBatch::try_from_iter_with_nullable(vec![ - ("dict", dict_array, true), - ("int64", int64_array, true), - ("string", string_array, true), - ("bool", bool_array, true), - ("time", ts_array, false), // not null - ]) - .unwrap(); + pub fn make_queryable_batch<'a>(batches: Vec>) -> Arc { + // make snapshots for the bacthes + let mut snapshots = vec![]; + let mut seq_num = 1; + for batch in batches { + let seq = SequenceNumber::new(seq_num); + snapshots.push(make_snapshot_batch(batch, seq, seq )); + seq_num = seq_num + 1; + } - - // Batch 2: & 2 rows - let dict_array: ArrayRef = Arc::new( - vec![None, Some("d")] - .into_iter() - .collect::>(), - ); - let uint64_array: ArrayRef = - Arc::new([Some(1), Some(2)].iter().collect::()); // not null - let float64_array: ArrayRef = Arc::new( - [Some(1.0), Some(2.0)] - .iter() - .collect::(), - ); - let string_array: ArrayRef = Arc::new( - vec![Some("foo"), Some("bar")] - .into_iter() - .collect::(), - ); - let bool_array: ArrayRef = Arc::new( - [Some(true), None] - .iter() - .collect::(), - ); - let ts_array: ArrayRef = Arc::new( - [Some(100), Some(1626823730000000000)] // not null - .iter() - .collect::(), - ); - let batch2 = RecordBatch::try_from_iter_with_nullable (vec![ - ("dict", dict_array, true), - ("uint64", uint64_array, false), // not null - ("float64", float64_array, true), - ("string", string_array, true), - ("bool", bool_array, true), - ("time", ts_array, false), // not null - ]) - .unwrap(); - - vec![Arc::new(batch1), Arc::new(batch2)] + Arc::new(QueryableBatch::new("test_table", snapshots, vec![])) } + pub fn make_snapshot_batch(batch: Arc, min: SequenceNumber, max: SequenceNumber) -> Arc { + Arc::new(SnapshotBatch { + min_sequencer_number: min, + max_sequencer_number: max, + data: batch, + }) + } + // RecordBatches with knowledge of influx metadata pub async fn create_batches_with_influxtype() -> Vec> { // Use the available TestChunk to create chunks and then convert them to raw RecordBatches @@ -522,34 +248,4 @@ mod tests { batches } - - pub fn make_persisting_batch<'a>(batches: Vec>) -> PersistingBatch { - // make snapshots for the bacthes - let mut snapshots = vec![]; - let mut seq_num = 1; - for batch in batches { - let seq = SequenceNumber::new(seq_num); - snapshots.push(make_snapshot_batch(batch, seq, seq )); - seq_num = seq_num + 1; - } - - PersistingBatch { - sequencer_id: SequencerId::new(1), - table_id: TableId::new(1), - partition_id: PartitionId::new(1), - object_store_id: Uuid::new_v4(), - data: snapshots, - deletes: vec![], - } - } - - pub fn make_snapshot_batch(batch: Arc, min: SequenceNumber, max: SequenceNumber) -> SnapshotBatch { - SnapshotBatch { - min_sequencer_number: min, - max_sequencer_number: max, - data: batch, - } - } - - } \ No newline at end of file diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 04addbf82e..d6e444f743 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -2,6 +2,7 @@ //! 
use arrow::record_batch::RecordBatch; +use data_types::delete_predicate::DeletePredicate; use std::{collections::BTreeMap, sync::Arc}; use uuid::Uuid; @@ -195,10 +196,23 @@ pub struct PersistingBatch { /// Id of to-be-created parquet file of this data pub object_store_id: Uuid, - /// data to be persisted - pub data: Vec, - - /// delete predicates to be appied to the data - /// before perssiting - pub deletes: Vec, + /// data + pub data: Arc, +} + +// Queryable data used for both query and persistence +#[derive(Debug)] +pub struct QueryableBatch { + /// data + pub data: Vec>, + + /// Tomstones to be applied on data + pub deletes: Vec, + + /// Delete predicates of the tombstones + /// Note: this is needed here to return its reference for a trait function + pub delete_predicates: Vec>, + + //// This is needed to return a reference for a trait function + pub table_name: String, } diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index dd22318dbf..77c5561e23 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -16,4 +16,5 @@ pub mod compact; pub mod data; +pub mod query; pub mod server; diff --git a/ingester/src/query.rs b/ingester/src/query.rs new file mode 100644 index 0000000000..b97bd2c47a --- /dev/null +++ b/ingester/src/query.rs @@ -0,0 +1,311 @@ +//! Module to handle query on Ingester's data + +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; +use data_types::{partition_metadata::TableSummary, delete_predicate::DeletePredicate, chunk_metadata::{ChunkId, ChunkAddr, ChunkOrder}}; +use datafusion::physical_plan::{SendableRecordBatchStream, common::SizedRecordBatchStream}; +use iox_catalog::interface::Tombstone; +use predicate::{delete_predicate::parse_delete_predicate, predicate::{Predicate, PredicateMatch}}; +use query::{QueryChunkMeta, QueryChunk, exec::stringset::StringSet}; +use schema::{Schema, merge::SchemaMerger, selection::Selection, sort::SortKey}; +use snafu::Snafu; + +use crate::data::{SnapshotBatch, QueryableBatch}; + +#[derive(Debug, Snafu)] +#[allow(missing_copy_implementations, missing_docs)] +pub enum Error { + #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))] + DeletePredicate { + source: predicate::delete_predicate::Error, + min: String, + max: String, + predicate: String, + }, +} + +/// A specialized `Error` for Ingester's Query errors +pub type Result = std::result::Result; + + +// todo: move this function to a more appropriate crate +/// Return the merged schema for RecordBatches +/// +/// This is infallable because the schemas of chunks within a +/// partition are assumed to be compatible because that schema was +/// enforced as part of writing into the partition +pub fn merge_record_batch_schemas(batches: &[Arc]) -> Arc { + let mut merger = SchemaMerger::new(); + for batch in batches { + let schema = Schema::try_from(batch.schema()).expect("Schema conversion error"); + merger = merger.merge(&schema).expect("schemas compatible"); + } + Arc::new(merger.build()) +} + + +impl QueryableBatch { + pub fn new(table_name: &str, data: Vec>, deletes: Vec) -> Self { + + let mut delete_predicates = vec![]; + for delete in &deletes { + let delete_predicate = Arc::new( + parse_delete_predicate( + &delete.min_time.get().to_string(), + &delete.max_time.get().to_string(), + &delete.serialized_predicate, + ) + .expect("Error building delete predicate"), + ); + + delete_predicates.push(delete_predicate); + } + + Self { + data, + deletes, + delete_predicates, + 
table_name: table_name.to_string(), + } + } +} + +impl QueryChunkMeta for QueryableBatch { + fn summary(&self) -> &TableSummary { + unimplemented!() + } + + fn schema(&self) -> Arc { + // todo: may want store this schema as a field of QueryableBatch and + // only do this schema merge the first time it is call + + // Merge schema of all RecordBatches of the PerstingBatch + let batches: Vec> = + self.data.iter().map(|s| Arc::clone(&s.data)).collect(); + merge_record_batch_schemas(&batches) + } + + fn delete_predicates(&self) -> &[Arc] { + self.delete_predicates.as_ref() + } +} + +impl QueryChunk for QueryableBatch { + type Error = Error; + + // This function should not be used in PersistingBatch context + fn id(&self) -> ChunkId { + unimplemented!() + } + + // This function should not be used in PersistingBatch context + fn addr(&self) -> ChunkAddr { + unimplemented!() + } + + /// Returns the name of the table stored in this chunk + fn table_name(&self) -> &str { + &self.table_name + } + + /// Returns true if the chunk may contain a duplicate "primary + /// key" within itself + fn may_contain_pk_duplicates(&self) -> bool { + // always true because they are not deduplicated yet + true + } + + /// Returns the result of applying the `predicate` to the chunk + /// using an efficient, but inexact method, based on metadata. + /// + /// NOTE: This method is suitable for calling during planning, and + /// may return PredicateMatch::Unknown for certain types of + /// predicates. + fn apply_predicate_to_metadata( + &self, + _predicate: &Predicate, + ) -> Result { + Ok(PredicateMatch::Unknown) + } + + /// Returns a set of Strings with column names from the specified + /// table that have at least one row that matches `predicate`, if + /// the predicate can be evaluated entirely on the metadata of + /// this Chunk. Returns `None` otherwise + fn column_names( + &self, + _predicate: &Predicate, + _columns: Selection<'_>, + ) -> Result, Self::Error> { + Ok(None) + } + + /// Return a set of Strings containing the distinct values in the + /// specified columns. If the predicate can be evaluated entirely + /// on the metadata of this Chunk. Returns `None` otherwise + /// + /// The requested columns must all have String type. + fn column_values( + &self, + _column_name: &str, + _predicate: &Predicate, + ) -> Result, Self::Error> { + Ok(None) + } + + /// Provides access to raw `QueryChunk` data as an + /// asynchronous stream of `RecordBatch`es filtered by a *required* + /// predicate. Note that not all chunks can evaluate all types of + /// predicates and this function will return an error + /// if requested to evaluate with a predicate that is not supported + /// + /// This is the analog of the `TableProvider` in DataFusion + /// + /// The reason we can't simply use the `TableProvider` trait + /// directly is that the data for a particular Table lives in + /// several chunks within a partition, so there needs to be an + /// implementation of `TableProvider` that stitches together the + /// streams from several different `QueryChunk`s. 
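    ///
    /// Call-path sketch (an assumption about how the compaction plan built by
    /// `compact()` reaches this method, not an API guarantee):
    ///
    /// ```ignore
    /// let batch: Arc<QueryableBatch> = /* snapshots + tombstones */;
    /// // the reorg scan reads every snapshot unfiltered, so both arguments are ignored
    /// let stream = batch.read_filter(&Predicate::default(), Selection::All)?;
    /// ```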
+ fn read_filter( + &self, + _predicate: &Predicate, // no needs because all data will be read for compaction + _selection: Selection<'_>, // no needs because all columns will be read and compact + ) -> Result { + let batches: Vec<_> = self.data.iter().map(|s| Arc::clone(&s.data)).collect(); + let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), batches); + Ok(Box::pin(stream)) + } + + /// Returns true if data of this chunk is sorted + fn is_sorted_on_pk(&self) -> bool { + false + } + + /// Returns the sort key of the chunk if any + fn sort_key(&self) -> Option> { + None + } + + /// Returns chunk type + fn chunk_type(&self) -> &str { + "PersistingBatch" + } + + // This function should not be used in PersistingBatch context + fn order(&self) -> ChunkOrder { + unimplemented!() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use arrow::{array::{ArrayRef, DictionaryArray, Int64Array, StringArray, BooleanArray, TimestampNanosecondArray, UInt64Array, Float64Array}, datatypes::{Int32Type, DataType, TimeUnit}}; + + #[tokio::test] + async fn test_merge_batch_schema() { + + // Merge schema of the batches + // The fileds in the schema are sorted by column name + let batches = create_batches(); + let merged_schema = (&*merge_record_batch_schemas(&batches)).clone(); + + // Expected Arrow schema + let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("dict", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true), + arrow::datatypes::Field::new("int64", DataType::Int64, true), + arrow::datatypes::Field::new("string", DataType::Utf8, true), + arrow::datatypes::Field::new("bool", DataType::Boolean, true), + arrow::datatypes::Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false), + arrow::datatypes::Field::new("uint64", DataType::UInt64, false), + arrow::datatypes::Field::new("float64", DataType::Float64, true), + ])); + let expected_schema = Schema::try_from(arrow_schema).unwrap().sort_fields_by_name(); + + assert_eq!( + expected_schema, merged_schema, + "\nExpected:\n{:#?}\nActual:\n{:#?}", + expected_schema, merged_schema + ); + } + + // ---------------------------------------------------------------------------------------------- + // Data for testing + // Create pure RecordBatches without knowledge of Influx datatype + fn create_batches() -> Vec> { + // Batch 1: & 3 rows + let dict_array: ArrayRef = Arc::new( + vec![Some("a"), None, Some("b")] + .into_iter() + .collect::>(), + ); + let int64_array: ArrayRef = + Arc::new([Some(-1), None, Some(2)].iter().collect::()); + let string_array: ArrayRef = Arc::new( + vec![Some("foo"), Some("and"), Some("bar")] + .into_iter() + .collect::(), + ); + let bool_array: ArrayRef = Arc::new( + [Some(true), None, Some(false)] + .iter() + .collect::(), + ); + let ts_array: ArrayRef = Arc::new( + [Some(150), Some(200), Some(1526823730000000000)] + .iter() + .collect::(), + ); + let batch1 = RecordBatch::try_from_iter_with_nullable(vec![ + ("dict", dict_array, true), + ("int64", int64_array, true), + ("string", string_array, true), + ("bool", bool_array, true), + ("time", ts_array, false), // not null + ]) + .unwrap(); + + + // Batch 2: & 2 rows + let dict_array: ArrayRef = Arc::new( + vec![None, Some("d")] + .into_iter() + .collect::>(), + ); + let uint64_array: ArrayRef = + Arc::new([Some(1), Some(2)].iter().collect::()); // not null + let float64_array: ArrayRef = Arc::new( + [Some(1.0), Some(2.0)] + .iter() + .collect::(), + ); + let string_array: ArrayRef = 
Arc::new( + vec![Some("foo"), Some("bar")] + .into_iter() + .collect::(), + ); + let bool_array: ArrayRef = Arc::new( + [Some(true), None] + .iter() + .collect::(), + ); + let ts_array: ArrayRef = Arc::new( + [Some(100), Some(1626823730000000000)] // not null + .iter() + .collect::(), + ); + let batch2 = RecordBatch::try_from_iter_with_nullable (vec![ + ("dict", dict_array, true), + ("uint64", uint64_array, false), // not null + ("float64", float64_array, true), + ("string", string_array, true), + ("bool", bool_array, true), + ("time", ts_array, false), // not null + ]) + .unwrap(); + + vec![Arc::new(batch1), Arc::new(batch2)] + } +} \ No newline at end of file From 939ea536d4fec6ef72c4eedf08aea3cf40d3d5f0 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Mon, 24 Jan 2022 12:00:23 -0500 Subject: [PATCH 5/8] feat: add but ignore a few compaction tests --- ingester/Cargo.toml | 1 + ingester/src/compact.rs | 201 +++++++++++++++++++++++++++------------- ingester/src/data.rs | 4 +- ingester/src/lib.rs | 1 - ingester/src/query.rs | 85 +++++++++-------- 5 files changed, 187 insertions(+), 105 deletions(-) diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index e2af2ad35b..901ad78cef 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] arrow = { version = "7.0", features = ["prettyprint"] } +arrow_util = { path = "../arrow_util" } datafusion = { path = "../datafusion" } data_types = { path = "../data_types" } iox_catalog = { path = "../iox_catalog" } diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 430a9d4a03..be93b5055e 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -1,30 +1,15 @@ //! This module is responsible for compacting Ingester's data +use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; +use query::{ + exec::{Executor, ExecutorType}, + frontend::reorg::ReorgPlanner, + QueryChunkMeta, +}; +use snafu::{ResultExt, Snafu}; use std::sync::Arc; -use arrow::record_batch::RecordBatch; -use data_types::{ - chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder}, - delete_predicate::DeletePredicate, - partition_metadata::TableSummary, -}; -use datafusion::{ - error::DataFusionError, - physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}, -}; -use predicate::{ - delete_predicate::parse_delete_predicate, - predicate::{Predicate, PredicateMatch}, -}; -use query::{ - exec::{stringset::StringSet, Executor, ExecutorType}, - frontend::reorg::ReorgPlanner, - QueryChunk, QueryChunkMeta, -}; -use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema}; -use snafu::{ResultExt, Snafu}; - -use crate::data::{PersistingBatch, QueryableBatch}; +use crate::data::QueryableBatch; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -58,7 +43,6 @@ pub async fn compact( executor: &Executor, data: Arc, ) -> Result { - // Build logical plan for compaction let ctx = executor.new_context(ExecutorType::Reorg); let logical_plan = ReorgPlanner::new() @@ -79,7 +63,6 @@ pub async fn compact( Ok(output_stream) } - #[cfg(test)] mod tests { use std::num::NonZeroU64; @@ -88,68 +71,104 @@ mod tests { use super::*; - use arrow::{array::{ArrayRef, DictionaryArray, Int64Array, StringArray, BooleanArray, TimestampNanosecondArray, UInt64Array, Float64Array}, datatypes::{Int32Type, DataType, TimeUnit}}; - use iox_catalog::interface::{SequenceNumber, SequencerId, TableId, PartitionId}; - use query::test::{TestChunk, raw_data}; - use uuid::Uuid; - - + 
use arrow::record_batch::RecordBatch; + use arrow_util::assert_batches_eq; + use iox_catalog::interface::SequenceNumber; + use query::test::{raw_data, TestChunk}; + #[ignore] #[tokio::test] - async fn test_compact() { - let batches = create_batches_with_influxtype().await; - let persisting_batch = make_queryable_batch(batches); + async fn test_compact_no_dedup_no_delete() { + let batches = create_record_batches_with_influxtype_no_duplicates().await; + let compact_batch = make_queryable_batch(batches); let exc = Executor::new(1); - - let stream = compact(&exc, persisting_batch).await.unwrap(); - let output_batches = datafusion::physical_plan::common::collect(stream).await.unwrap(); + let stream = compact(&exc, compact_batch).await.unwrap(); + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); println!("output_batches: {:#?}", output_batches); - - + + let expected = vec![ + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 5 | MT | 1970-01-01 00:00:00.000000005 |", + "| 1000 | MT | 1970-01-01 00:00:00.000002 |", + "| 20 | MT | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | 1970-01-01 00:00:00.000000500 |", + "| 10 | AL | 1970-01-01 00:00:00.000000050 |", + "| 30 | MT | 1970-01-01 00:00:00.000000005 |", + "+-----------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[ignore] + #[tokio::test] + async fn test_compact_with_dedup_no_delete() { + let batches = create_batches_with_influxtype().await; + let compact_batch = make_queryable_batch(batches); + let exc = Executor::new(1); + + let stream = compact(&exc, compact_batch).await.unwrap(); + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); + + // println!("output_batches: {:#?}", output_batches); // let table = pretty_format_batches(&[batch]).unwrap(); - - // let expected = vec![ - // "+------+-------+--------+---------+-------+--------+--------------------------------+", - // "| dict | int64 | uint64 | float64 | bool | string | time |", - // "+------+-------+--------+---------+-------+--------+--------------------------------+", - // "| a | -1 | 1 | 1 | true | foo | |", - // "| | | | | | | 1970-01-01T00:00:00.000000100Z |", - // "| b | 2 | 2 | 2 | false | bar | 2021-07-20T23:28:50Z |", - // "+------+-------+--------+---------+-------+--------+--------------------------------+", - // ]; - + let expected = vec![ + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 5 | MT | 1970-01-01 00:00:00.000000005 |", + "| 1000 | MT | 1970-01-01 00:00:00.000002 |", + "| 20 | MT | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | 1970-01-01 00:00:00.000000500 |", + "| 10 | AL | 1970-01-01 00:00:00.000000050 |", + "| 30 | MT | 1970-01-01 00:00:00.000000005 |", + "+-----------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); } // 
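    // The two tests above are added behind #[ignore] for now; a hypothetical
    // invocation to run them locally is:
    //
    //     cargo test -p ingester compact -- --ignored
    //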
---------------------------------------------------------------------------------------------- // Data for testing - pub fn make_queryable_batch<'a>(batches: Vec>) -> Arc { + pub fn make_queryable_batch(batches: Vec>) -> Arc { // make snapshots for the bacthes let mut snapshots = vec![]; let mut seq_num = 1; for batch in batches { let seq = SequenceNumber::new(seq_num); - snapshots.push(make_snapshot_batch(batch, seq, seq )); - seq_num = seq_num + 1; + snapshots.push(make_snapshot_batch(batch, seq, seq)); + seq_num += 1; } Arc::new(QueryableBatch::new("test_table", snapshots, vec![])) } - pub fn make_snapshot_batch(batch: Arc, min: SequenceNumber, max: SequenceNumber) -> Arc { + pub fn make_snapshot_batch( + batch: Arc, + min: SequenceNumber, + max: SequenceNumber, + ) -> Arc { Arc::new(SnapshotBatch { min_sequencer_number: min, max_sequencer_number: max, data: batch, }) } - - // RecordBatches with knowledge of influx metadata - pub async fn create_batches_with_influxtype() -> Vec> { - // Use the available TestChunk to create chunks and then convert them to raw RecordBatches - let mut batches = vec![]; - // This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within + pub async fn create_record_batches_with_influxtype_no_duplicates() -> Vec> { let chunk1 = Arc::new( TestChunk::new("t") .with_id(1) @@ -169,7 +188,57 @@ mod tests { .with_i64_field_column("field_int") .with_ten_rows_of_data_some_duplicates(), ); - let batch1 = raw_data(&vec![chunk1]).await[0].clone(); + let batches = raw_data(&[chunk1]).await; + let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect(); + + // Output data look like this: + // let expected = vec![ + // "+-----------+------+-------------------------------+", + // "| field_int | tag1 | time |", + // "+-----------+------+-------------------------------+", + // "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + // "| 10 | MT | 1970-01-01 00:00:00.000007 |", + // "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + // "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + // "| 5 | MT | 1970-01-01 00:00:00.000000005 |", + // "| 1000 | MT | 1970-01-01 00:00:00.000002 |", + // "| 20 | MT | 1970-01-01 00:00:00.000007 |", + // "| 70 | CT | 1970-01-01 00:00:00.000000500 |", + // "| 10 | AL | 1970-01-01 00:00:00.000000050 |", + // "| 30 | MT | 1970-01-01 00:00:00.000000005 |", + // "+-----------+------+-------------------------------+", + // ]; + + batches + } + + // RecordBatches with knowledge of influx metadata + pub async fn create_batches_with_influxtype() -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + // This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within + // todo: may want to simplify the below data of test chunks. 
these are reuse the current code that cover many commpaction cases + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column_with_full_stats( + Some(5), + Some(7000), + 10, + Some(NonZeroU64::new(7).unwrap()), + ) + .with_tag_column_with_full_stats( + "tag1", + Some("AL"), + Some("MT"), + 10, + Some(NonZeroU64::new(3).unwrap()), + ) + .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); //println!("BATCH1: {:#?}", batch1); batches.push(Arc::new(batch1)); @@ -193,7 +262,7 @@ mod tests { .with_i64_field_column("field_int") .with_five_rows_of_data(), ); - let batch2 = raw_data(&vec![chunk2]).await[0].clone(); + let batch2 = raw_data(&[chunk2]).await[0].clone(); //println!("BATCH2: {:#?}", batch2); batches.push(Arc::new(batch2)); @@ -217,7 +286,7 @@ mod tests { .with_i64_field_column("field_int") .with_three_rows_of_data(), ); - let batch3 = raw_data(&vec![chunk3]).await[0].clone(); + let batch3 = raw_data(&[chunk3]).await[0].clone(); //println!("BATCH3: {:#?}", batch3); batches.push(Arc::new(batch3)); @@ -242,10 +311,12 @@ mod tests { .with_may_contain_pk_duplicates(true) .with_four_rows_of_data(), ); - let batch4 = raw_data(&vec![chunk4]).await[0].clone(); + let batch4 = raw_data(&[chunk4]).await[0].clone(); //println!("BATCH4: {:#?}", batch4); batches.push(Arc::new(batch4)); + // todo: show what output data look like in comments for easier debug + batches } -} \ No newline at end of file +} diff --git a/ingester/src/data.rs b/ingester/src/data.rs index d6e444f743..c682c23b39 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -200,7 +200,7 @@ pub struct PersistingBatch { pub data: Arc, } -// Queryable data used for both query and persistence +/// Queryable data used for both query and persistence #[derive(Debug)] pub struct QueryableBatch { /// data @@ -213,6 +213,6 @@ pub struct QueryableBatch { /// Note: this is needed here to return its reference for a trait function pub delete_predicates: Vec>, - //// This is needed to return a reference for a trait function + /// This is needed to return a reference for a trait function pub table_name: String, } diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index 77c5561e23..097e3242e0 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -13,7 +13,6 @@ )] #[allow(dead_code)] - pub mod compact; pub mod data; pub mod query; diff --git a/ingester/src/query.rs b/ingester/src/query.rs index b97bd2c47a..aa33d25f6e 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -3,15 +3,22 @@ use std::sync::Arc; use arrow::record_batch::RecordBatch; -use data_types::{partition_metadata::TableSummary, delete_predicate::DeletePredicate, chunk_metadata::{ChunkId, ChunkAddr, ChunkOrder}}; -use datafusion::physical_plan::{SendableRecordBatchStream, common::SizedRecordBatchStream}; +use data_types::{ + chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder}, + delete_predicate::DeletePredicate, + partition_metadata::TableSummary, +}; +use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}; use iox_catalog::interface::Tombstone; -use predicate::{delete_predicate::parse_delete_predicate, predicate::{Predicate, PredicateMatch}}; -use query::{QueryChunkMeta, QueryChunk, exec::stringset::StringSet}; -use schema::{Schema, merge::SchemaMerger, selection::Selection, sort::SortKey}; +use predicate::{ + delete_predicate::parse_delete_predicate, + predicate::{Predicate, PredicateMatch}, +}; +use 
query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta}; +use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema}; use snafu::Snafu; -use crate::data::{SnapshotBatch, QueryableBatch}; +use crate::data::{QueryableBatch, SnapshotBatch}; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -28,7 +35,6 @@ pub enum Error { /// A specialized `Error` for Ingester's Query errors pub type Result = std::result::Result; - // todo: move this function to a more appropriate crate /// Return the merged schema for RecordBatches /// @@ -44,10 +50,9 @@ pub fn merge_record_batch_schemas(batches: &[Arc]) -> Arc { Arc::new(merger.build()) } - impl QueryableBatch { + /// Initilaize a QueryableBatch pub fn new(table_name: &str, data: Vec>, deletes: Vec) -> Self { - let mut delete_predicates = vec![]; for delete in &deletes { let delete_predicate = Arc::new( @@ -170,7 +175,7 @@ impl QueryChunk for QueryableBatch { fn read_filter( &self, _predicate: &Predicate, // no needs because all data will be read for compaction - _selection: Selection<'_>, // no needs because all columns will be read and compact + _selection: Selection<'_>, // no needs because all columns will be read and compact ) -> Result { let batches: Vec<_> = self.data.iter().map(|s| Arc::clone(&s.data)).collect(); let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), batches); @@ -202,11 +207,16 @@ impl QueryChunk for QueryableBatch { mod tests { use super::*; - use arrow::{array::{ArrayRef, DictionaryArray, Int64Array, StringArray, BooleanArray, TimestampNanosecondArray, UInt64Array, Float64Array}, datatypes::{Int32Type, DataType, TimeUnit}}; + use arrow::{ + array::{ + ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray, + TimestampNanosecondArray, UInt64Array, + }, + datatypes::{DataType, Int32Type, TimeUnit}, + }; #[tokio::test] async fn test_merge_batch_schema() { - // Merge schema of the batches // The fileds in the schema are sorted by column name let batches = create_batches(); @@ -214,16 +224,26 @@ mod tests { // Expected Arrow schema let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![ - arrow::datatypes::Field::new("dict", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true), + arrow::datatypes::Field::new( + "dict", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + ), arrow::datatypes::Field::new("int64", DataType::Int64, true), arrow::datatypes::Field::new("string", DataType::Utf8, true), arrow::datatypes::Field::new("bool", DataType::Boolean, true), - arrow::datatypes::Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false), + arrow::datatypes::Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), arrow::datatypes::Field::new("uint64", DataType::UInt64, false), arrow::datatypes::Field::new("float64", DataType::Float64, true), ])); - let expected_schema = Schema::try_from(arrow_schema).unwrap().sort_fields_by_name(); - + let expected_schema = Schema::try_from(arrow_schema) + .unwrap() + .sort_fields_by_name(); + assert_eq!( expected_schema, merged_schema, "\nExpected:\n{:#?}\nActual:\n{:#?}", @@ -237,14 +257,14 @@ mod tests { fn create_batches() -> Vec> { // Batch 1: & 3 rows let dict_array: ArrayRef = Arc::new( - vec![Some("a"), None, Some("b")] + vec![Some("a"), None, Some("b")] .into_iter() .collect::>(), ); let int64_array: ArrayRef = Arc::new([Some(-1), None, Some(2)].iter().collect::()); let string_array: 
ArrayRef = Arc::new( - vec![Some("foo"), Some("and"), Some("bar")] + vec![Some("foo"), Some("and"), Some("bar")] .into_iter() .collect::(), ); @@ -263,42 +283,33 @@ mod tests { ("int64", int64_array, true), ("string", string_array, true), ("bool", bool_array, true), - ("time", ts_array, false), // not null + ("time", ts_array, false), // not null ]) .unwrap(); - // Batch 2: & 2 rows let dict_array: ArrayRef = Arc::new( - vec![None, Some("d")] + vec![None, Some("d")] .into_iter() .collect::>(), - ); - let uint64_array: ArrayRef = - Arc::new([Some(1), Some(2)].iter().collect::()); // not null - let float64_array: ArrayRef = Arc::new( - [Some(1.0), Some(2.0)] - .iter() - .collect::(), ); + let uint64_array: ArrayRef = Arc::new([Some(1), Some(2)].iter().collect::()); // not null + let float64_array: ArrayRef = + Arc::new([Some(1.0), Some(2.0)].iter().collect::()); let string_array: ArrayRef = Arc::new( - vec![Some("foo"), Some("bar")] + vec![Some("foo"), Some("bar")] .into_iter() .collect::(), ); - let bool_array: ArrayRef = Arc::new( - [Some(true), None] - .iter() - .collect::(), - ); + let bool_array: ArrayRef = Arc::new([Some(true), None].iter().collect::()); let ts_array: ArrayRef = Arc::new( [Some(100), Some(1626823730000000000)] // not null .iter() .collect::(), ); - let batch2 = RecordBatch::try_from_iter_with_nullable (vec![ + let batch2 = RecordBatch::try_from_iter_with_nullable(vec![ ("dict", dict_array, true), - ("uint64", uint64_array, false), // not null + ("uint64", uint64_array, false), // not null ("float64", float64_array, true), ("string", string_array, true), ("bool", bool_array, true), @@ -308,4 +319,4 @@ mod tests { vec![Arc::new(batch1), Arc::new(batch2)] } -} \ No newline at end of file +} From 5f98a07b7f0232c4a88bf029dbace82dbde3525e Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Mon, 24 Jan 2022 12:03:02 -0500 Subject: [PATCH 6/8] chore: add Corgo.lock --- Cargo.lock | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index a747b1329a..ebdeda30da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1858,10 +1858,17 @@ name = "ingester" version = "0.1.0" dependencies = [ "arrow", + "arrow_util", + "data_types", + "datafusion 0.1.0", "iox_catalog", "mutable_batch", "parking_lot", + "predicate", + "query", + "schema", "snafu", + "tokio", "uuid", "workspace-hack", ] From c6a195b0e659962e1be4f2e877ede0556b218fb3 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Mon, 24 Jan 2022 13:05:44 -0500 Subject: [PATCH 7/8] refactor: address review comments --- ingester/src/compact.rs | 6 +++--- ingester/src/data.rs | 2 +- ingester/src/lib.rs | 5 ++--- ingester/src/query.rs | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index be93b5055e..4ffc89053d 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -160,12 +160,12 @@ mod tests { batch: Arc, min: SequenceNumber, max: SequenceNumber, - ) -> Arc { - Arc::new(SnapshotBatch { + ) -> SnapshotBatch { + SnapshotBatch { min_sequencer_number: min, max_sequencer_number: max, data: batch, - }) + } } pub async fn create_record_batches_with_influxtype_no_duplicates() -> Vec> { diff --git a/ingester/src/data.rs b/ingester/src/data.rs index e3e30641e2..3f3fdf3003 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -202,7 +202,7 @@ pub struct PersistingBatch { #[derive(Debug)] pub struct QueryableBatch { /// data - pub data: Vec>, + pub data: Vec, /// Tomstones to be applied on data pub deletes: Vec, diff --git a/ingester/src/lib.rs 
b/ingester/src/lib.rs index 68fd09a53d..544aab0579 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -11,10 +11,9 @@ clippy::use_self, clippy::clone_on_ref_ptr )] - -#[allow(dead_code)] +#![allow(dead_code)] pub mod compact; pub mod data; -pub mod query; pub mod handler; +pub mod query; pub mod server; diff --git a/ingester/src/query.rs b/ingester/src/query.rs index aa33d25f6e..fd62d5d5c3 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -52,7 +52,7 @@ pub fn merge_record_batch_schemas(batches: &[Arc]) -> Arc { impl QueryableBatch { /// Initilaize a QueryableBatch - pub fn new(table_name: &str, data: Vec>, deletes: Vec) -> Self { + pub fn new(table_name: &str, data: Vec, deletes: Vec) -> Self { let mut delete_predicates = vec![]; for delete in &deletes { let delete_predicate = Arc::new( From f9c1e80a7f84dc08a6050fdc6cbeb0cb8310ee0b Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Mon, 24 Jan 2022 13:29:01 -0500 Subject: [PATCH 8/8] chore: update thread_local chore: update thread_local --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04d8a545b8..300bf8bbc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4610,9 +4610,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd" +checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" dependencies = [ "once_cell", ]
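
For reference, the flow exercised by the compaction test helpers above (make_queryable_batch / make_snapshot_batch, after the Vec<SnapshotBatch> refactor in PATCH 7/8) can be condensed into one standalone helper. The sketch below is illustrative only: the function name queryable_batch_from_record_batches is made up, the concrete generic arguments (Vec<Arc<RecordBatch>>, Vec<SnapshotBatch>) are inferred because the patch text elides them, and the iox_catalog::interface import path plus the i64 argument to SequenceNumber::new are assumptions rather than something stated in the patches.

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
// Assumed import path for SequenceNumber; the test hunks use the type without showing it.
use iox_catalog::interface::SequenceNumber;

use crate::data::{QueryableBatch, SnapshotBatch};

/// Wrap each RecordBatch in a SnapshotBatch with a monotonically increasing
/// sequence number (min == max here, as in the test helper), then bundle them
/// into a QueryableBatch with no tombstones.
pub fn queryable_batch_from_record_batches(
    table_name: &str,
    batches: Vec<Arc<RecordBatch>>, // generic arguments assumed; elided in the patch text
) -> Arc<QueryableBatch> {
    let snapshots: Vec<SnapshotBatch> = batches
        .into_iter()
        .enumerate()
        .map(|(i, data)| {
            let seq = SequenceNumber::new(i as i64 + 1);
            SnapshotBatch {
                min_sequencer_number: seq,
                max_sequencer_number: seq,
                data,
            }
        })
        .collect();

    // No delete tombstones applied in this sketch.
    Arc::new(QueryableBatch::new(table_name, snapshots, vec![]))
}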
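
Similarly, merge_record_batch_schemas can be exercised on plain arrow batches, much like the test_merge_batch_schema test does. A minimal usage sketch, assuming the elided signature is &[Arc<RecordBatch>] -> Arc<Schema>; the column names and array contents here are made up for the example.

use std::sync::Arc;

use arrow::{
    array::{ArrayRef, Int64Array, StringArray},
    record_batch::RecordBatch,
};
use schema::Schema;

use crate::query::merge_record_batch_schemas;

fn merged_schema_example() -> Arc<Schema> {
    // Two batches that do not share any columns.
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2)]));
    let tags: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b")]));

    let batch1 =
        RecordBatch::try_from_iter_with_nullable(vec![("field_int", ints, true)]).unwrap();
    let batch2 = RecordBatch::try_from_iter_with_nullable(vec![("tag1", tags, true)]).unwrap();

    // The merged schema contains the union of the columns, sorted by column name,
    // as asserted in test_merge_batch_schema.
    merge_record_batch_schemas(&[Arc::new(batch1), Arc::new(batch2)])
}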
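
Finally, the QueryChunk::read_filter implementation above streams every snapshot unchanged, since both the predicate and the selection are ignored for compaction. A rough sketch of draining that stream back into RecordBatches; the read_all name is hypothetical, and Predicate::default() plus datafusion's physical_plan::common::collect helper are assumptions about those crates' APIs rather than something shown in the patches.

use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::common::collect;
use predicate::predicate::Predicate;
use query::QueryChunk;
use schema::selection::Selection;

use crate::data::QueryableBatch;

/// Read every snapshot of a QueryableBatch back out as plain RecordBatches.
/// read_filter ignores both arguments: all rows and columns are read for compaction.
async fn read_all(batch: &QueryableBatch) -> Vec<RecordBatch> {
    let stream = batch
        .read_filter(&Predicate::default(), Selection::All)
        .expect("building the snapshot stream should not fail");
    collect(stream)
        .await
        .expect("reading the snapshot stream should not fail")
}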