diff --git a/Cargo.lock b/Cargo.lock
index d3451f7c59..2a660261d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2647,6 +2647,7 @@ dependencies = [
  "arrow",
  "bytes",
  "data_types2",
+ "datafusion 0.1.0",
  "iox_catalog",
  "iox_object_store",
  "iox_time",
@@ -4623,6 +4624,7 @@ dependencies = [
  "arrow_util",
  "async-trait",
  "data_types",
+ "data_types2",
  "datafusion 0.1.0",
  "datafusion_util",
  "db",
diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml
index 6d2775ef0b..9ba97bafa6 100644
--- a/iox_tests/Cargo.toml
+++ b/iox_tests/Cargo.toml
@@ -9,6 +9,7 @@ description = "IOx NG test utils and tests"
 arrow = "12"
 bytes = "1.0"
 data_types2 = { path = "../data_types2" }
+datafusion = { path = "../datafusion" }
 iox_catalog = { path = "../iox_catalog" }
 iox_object_store = { path = "../iox_object_store" }
 metric = { path = "../metric" }
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index 73055de815..840d99bd88 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -10,6 +10,7 @@ use data_types2::{
     ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId, QueryPool, SequenceNumber,
     Sequencer, SequencerId, Table, TableId, Timestamp, Tombstone, TombstoneId,
 };
+use datafusion::physical_plan::metrics::Count;
 use iox_catalog::{
     interface::{Catalog, PartitionRepo, INITIAL_COMPACTION_LEVEL},
     mem::MemCatalog,
@@ -19,7 +20,7 @@ use iox_time::{MockProvider, Time, TimeProvider};
 use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
 use object_store::{DynObjectStore, ObjectStoreImpl};
 use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
-use query::exec::Executor;
+use query::{exec::Executor, provider::RecordBatchDeduplicator, util::arrow_sort_key_exprs};
 use schema::{
     selection::Selection,
     sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder},
@@ -472,6 +473,7 @@ impl TestPartition {
         let row_count = record_batch.num_rows();
         assert!(row_count > 0, "Parquet file must have at least 1 row");
         let (record_batch, sort_key) = sort_batch(record_batch, schema);
+        let record_batch = dedup_batch(record_batch, &sort_key);
 
         let object_store_id = Uuid::new_v4();
         let min_sequence_number = SequenceNumber::new(min_seq);
@@ -755,3 +757,16 @@ fn sort_batch(record_batch: RecordBatch, schema: Schema) -> (RecordBatch, SortKey) {
 
     (record_batch, sort_key)
 }
+
+fn dedup_batch(record_batch: RecordBatch, sort_key: &SortKey) -> RecordBatch {
+    let schema = record_batch.schema();
+    let sort_keys = arrow_sort_key_exprs(sort_key, &schema);
+    let mut deduplicator = RecordBatchDeduplicator::new(sort_keys, Count::default(), None);
+
+    let mut batches = vec![deduplicator.push(record_batch).unwrap()];
+    if let Some(batch) = deduplicator.finish().unwrap() {
+        batches.push(batch);
+    }
+
+    RecordBatch::concat(&schema, &batches).unwrap()
+}
diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs
index d41c0b295f..7400c4a7d5 100644
--- a/querier/src/ingester/test_util.rs
+++ b/querier/src/ingester/test_util.rs
@@ -6,18 +6,18 @@ use parking_lot::Mutex;
 use super::IngesterConnection;
 
 /// IngesterConnection for testing
-#[derive(Debug)]
-pub(crate) struct MockIngesterConnection {
+#[derive(Debug, Default)]
+pub struct MockIngesterConnection {
     next_response: Mutex<Option<super::Result<Vec<Arc<IngesterPartition>>>>>,
 }
 
 impl MockIngesterConnection {
+    /// Create connection w/ an empty response.
     pub fn new() -> Self {
-        Self {
-            next_response: Mutex::new(None),
-        }
+        Self::default()
     }
 
+    /// Set next response for this connection.
     pub fn next_response(&self, response: super::Result<Vec<Arc<IngesterPartition>>>) {
         *self.next_response.lock() = Some(response);
     }
diff --git a/querier/src/lib.rs b/querier/src/lib.rs
index 2f2efc2173..0366a1f347 100644
--- a/querier/src/lib.rs
+++ b/querier/src/lib.rs
@@ -28,7 +28,8 @@ pub use cache::CatalogCache as QuerierCatalogCache;
 pub use database::QuerierDatabase;
 pub use handler::{QuerierHandler, QuerierHandlerImpl};
 pub use ingester::{
-    create_ingester_connection, create_ingester_connection_for_testing, IngesterConnection,
+    create_ingester_connection, create_ingester_connection_for_testing, Error as IngesterError,
+    IngesterConnection, IngesterPartition,
 };
 pub use namespace::QuerierNamespace;
 pub use server::QuerierServer;
diff --git a/query/src/provider.rs b/query/src/provider.rs
index 1623e206d5..9e51c792d8 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -38,6 +38,7 @@ pub mod overlap;
 mod physical;
 use self::overlap::{group_potential_duplicates, group_potential_duplicates_og};
 pub(crate) use deduplicate::DeduplicateExec;
+pub use deduplicate::RecordBatchDeduplicator;
 pub(crate) use physical::IOxReadFilterNode;
 
 #[derive(Debug, Snafu)]
diff --git a/query/src/provider/deduplicate.rs b/query/src/provider/deduplicate.rs
index c2c74c66ce..bcbb821fb5 100644
--- a/query/src/provider/deduplicate.rs
+++ b/query/src/provider/deduplicate.rs
@@ -11,7 +11,7 @@ use arrow::{
 use async_trait::async_trait;
 use datafusion_util::AdapterStream;
 
-use self::algo::RecordBatchDeduplicator;
+pub use self::algo::RecordBatchDeduplicator;
 use datafusion::{
     error::{DataFusionError, Result},
     execution::context::TaskContext,
diff --git a/query/src/provider/deduplicate/algo.rs b/query/src/provider/deduplicate/algo.rs
index 5dd2c69a48..12160921e8 100644
--- a/query/src/provider/deduplicate/algo.rs
+++ b/query/src/provider/deduplicate/algo.rs
@@ -22,7 +22,7 @@ use crate::provider::deduplicate::key_ranges::key_ranges;
 // [`RecordBatch`]es which are already sorted on a primary key,
 // including primary keys which straddle RecordBatch boundaries
 #[derive(Debug)]
-pub(crate) struct RecordBatchDeduplicator {
+pub struct RecordBatchDeduplicator {
     sort_keys: Vec<PhysicalSortExpr>,
     last_batch: Option<RecordBatch>,
     num_dupes: metrics::Count,
diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml
index aef66e378e..dc8e8fb23d 100644
--- a/query_tests/Cargo.toml
+++ b/query_tests/Cargo.toml
@@ -8,8 +8,10 @@ description = "Tests of the query engine against different database configurations"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+arrow = { version = "12", features = ["prettyprint"] }
 async-trait = "0.1"
 data_types = { path = "../data_types" }
+data_types2 = { path = "../data_types2" }
 datafusion = { path = "../datafusion" }
 datafusion_util = { path = "../datafusion_util" }
 db = { path = "../db" }
diff --git a/query_tests/cases/in/delete_three_delete_three_chunks.expected b/query_tests/cases/in/delete_three_delete_three_chunks.expected
index f8a6e7d395..b21247a22b 100644
--- a/query_tests/cases/in/delete_three_delete_three_chunks.expected
+++ b/query_tests/cases/in/delete_three_delete_three_chunks.expected
@@ -108,6 +108,7 @@
 | 1970-01-01T00:00:00.000000080Z |
 +--------------------------------+
 -- SQL: SELECT foo, min(time) from cpu group by foo;
+-- Results After Sorting
 +-----+--------------------------------+
 | foo | MIN(cpu.time)                  |
 +-----+--------------------------------+
diff --git a/query_tests/cases/in/delete_three_delete_three_chunks.sql b/query_tests/cases/in/delete_three_delete_three_chunks.sql
index ff76876780..3423babffa 100644
--- a/query_tests/cases/in/delete_three_delete_three_chunks.sql
+++ b/query_tests/cases/in/delete_three_delete_three_chunks.sql
@@ -28,6 +28,7 @@ SELECT max(foo) from cpu;
 SELECT min(time) from cpu;
 SELECT max(time) from cpu;
 
+-- IOX_COMPARE: sorted
 SELECT foo, min(time) from cpu group by foo;
 SELECT bar, max(time) as max_time from cpu group by bar order by bar, max_time;
 SELECT max(time) as max_time from cpu group by bar order by max_time;
@@ -72,4 +73,4 @@ SELECT time, bar from cpu where bar >= 1.0 order by bar, time;
 
 SELECT * from cpu where foo = 'you' order by bar, foo, time;
 
-SELECT min(bar) as mi, max(time) as ma from cpu where foo = 'you' order by mi, ma;
\ No newline at end of file
+SELECT min(bar) as mi, max(time) as ma from cpu where foo = 'you' order by mi, ma;
diff --git a/query_tests/cases/in/duplicates.expected b/query_tests/cases/in/old_duplicates.expected
similarity index 99%
rename from query_tests/cases/in/duplicates.expected
rename to query_tests/cases/in/old_duplicates.expected
index 6a276e1b4c..4d30b8cc77 100644
--- a/query_tests/cases/in/duplicates.expected
+++ b/query_tests/cases/in/old_duplicates.expected
@@ -1,4 +1,4 @@
--- Test Setup: OneMeasurementFourChunksWithDuplicates
+-- Test Setup: OldOneMeasurementFourChunksWithDuplicates
 -- SQL: explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;
 +---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
 | plan_type     | plan                                                                                                                                        |
diff --git a/query_tests/cases/in/duplicates.sql b/query_tests/cases/in/old_duplicates.sql
similarity index 86%
rename from query_tests/cases/in/duplicates.sql
rename to query_tests/cases/in/old_duplicates.sql
index a7da9a28de..ad7054ce94 100644
--- a/query_tests/cases/in/duplicates.sql
+++ b/query_tests/cases/in/old_duplicates.sql
@@ -1,5 +1,5 @@
 -- Test for predicate push down explains
--- IOX_SETUP: OneMeasurementFourChunksWithDuplicates
+-- IOX_SETUP: OldOneMeasurementFourChunksWithDuplicates
 
 -- Plan with order by
 explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;
diff --git a/query_tests/src/cases.rs b/query_tests/src/cases.rs
index 9e4f24bfd3..e8e6c48506 100644
--- a/query_tests/src/cases.rs
+++ b/query_tests/src/cases.rs
@@ -116,20 +116,6 @@ async fn test_cases_delete_two_del_multi_expr_one_chunk_sql() {
         .expect("flush worked");
 }
 
-#[tokio::test]
-// Tests from "duplicates.sql",
-async fn test_cases_duplicates_sql() {
-    let input_path = Path::new("cases").join("in").join("duplicates.sql");
-    let mut runner = Runner::new();
-    runner
-        .run(input_path)
-        .await
-        .expect("test failed");
-    runner
-        .flush()
-        .expect("flush worked");
-}
-
 #[tokio::test]
 // Tests from "new_sql_system_tables.sql",
 async fn test_cases_new_sql_system_tables_sql() {
@@ -158,6 +144,20 @@ async fn test_cases_no_stats_plans_sql() {
         .expect("flush worked");
 }
 
+#[tokio::test]
+// Tests from "old_duplicates.sql",
+async fn test_cases_old_duplicates_sql() {
+    let input_path = Path::new("cases").join("in").join("old_duplicates.sql");
+    let mut runner = Runner::new();
+    runner
+        .run(input_path)
+        .await
+        .expect("test failed");
+    runner
+        .flush()
+        .expect("flush worked");
+}
+
 #[tokio::test]
 // Tests from "old_sql_system_tables.sql",
 async fn test_cases_old_sql_system_tables_sql() {
diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs
index e834c7da62..c4934b2f5e 100644
--- a/query_tests/src/scenarios.rs
+++ b/query_tests/src/scenarios.rs
@@ -55,6 +55,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
         register_setup!(OldTwoMeasurementsManyFieldsTwoChunks),
         register_setup!(OldTwoMeasurementsManyFieldsOneRubChunk),
         register_setup!(OneMeasurementFourChunksWithDuplicates),
+        register_setup!(OldOneMeasurementFourChunksWithDuplicates),
         register_setup!(OneMeasurementAllChunksDropped),
         register_setup!(ChunkOrder),
         register_setup!(ThreeDeleteThreeChunks),
diff --git a/query_tests/src/scenarios/delete.rs b/query_tests/src/scenarios/delete.rs
index 99eb828285..3347cb6d06 100644
--- a/query_tests/src/scenarios/delete.rs
+++ b/query_tests/src/scenarios/delete.rs
@@ -5,7 +5,7 @@ use data_types::timestamp::TimestampRange;
 
 use async_trait::async_trait;
 
-use super::util::{make_n_chunks_scenario_new, ChunkDataNew, PredNew};
+use super::util::{make_n_chunks_scenario_new, ChunkDataNew, DeleteTimeNew, PredNew};
 use super::{DbScenario, DbSetup};
 use crate::scenarios::util::{
     all_scenarios_for_one_chunk, make_different_stage_chunks_with_deletes_scenario_old,
@@ -427,21 +427,46 @@ impl DbSetup for ThreeDeleteThreeChunks {
             &mut make_n_chunks_scenario_new(&[
                 ChunkDataNew {
                     lp_lines: lp_lines_1,
-                    preds: vec![PredNew::end(&pred1)],
+                    preds: vec![
+                        PredNew {
+                            predicate: &pred1,
+                            delete_time: DeleteTimeNew::End,
+                        },
+                        PredNew {
+                            predicate: &pred2,
+                            delete_time: DeleteTimeNew::End,
+                        },
+                        PredNew {
+                            predicate: &pred3,
+                            delete_time: DeleteTimeNew::End,
+                        },
+                    ],
                     delete_table_name: Some(table_name),
                     partition_key,
                     ..Default::default()
                 },
                 ChunkDataNew {
                     lp_lines: lp_lines_2,
-                    preds: vec![PredNew::end(&pred2)],
+                    preds: vec![
+                        PredNew {
+                            predicate: &pred2,
+                            delete_time: DeleteTimeNew::End,
+                        },
+                        PredNew {
+                            predicate: &pred3,
+                            delete_time: DeleteTimeNew::End,
+                        },
+                    ],
                     delete_table_name: Some(table_name),
                     partition_key,
                     ..Default::default()
                 },
                 ChunkDataNew {
                     lp_lines: lp_lines_3,
-                    preds: vec![PredNew::end(&pred3)],
+                    preds: vec![PredNew {
+                        predicate: &pred3,
+                        delete_time: DeleteTimeNew::End,
+                    }],
                     delete_table_name: Some(table_name),
                     partition_key,
                     ..Default::default()
diff --git a/query_tests/src/scenarios/library.rs b/query_tests/src/scenarios/library.rs
index 6d145c6506..e3cb1327dd 100644
--- a/query_tests/src/scenarios/library.rs
+++ b/query_tests/src/scenarios/library.rs
@@ -850,6 +850,25 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
     }
 }
 
+#[derive(Debug)]
+/// Setup for four chunks with duplicates for deduplicate tests
+///
+/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables.
+pub struct OldOneMeasurementFourChunksWithDuplicates {}
+#[async_trait]
+impl DbSetup for OldOneMeasurementFourChunksWithDuplicates {
+    async fn make(&self) -> Vec<DbScenario> {
+        let scenarios: Vec<_> = OneMeasurementFourChunksWithDuplicates {}
+            .make()
+            .await
+            .into_iter()
+            .filter(|s| s.scenario_name == "Data in four chunks with duplicates")
+            .collect();
+        assert_eq!(scenarios.len(), 1);
+        scenarios
+    }
+}
+
 #[derive(Debug)]
 /// This has a single scenario with all the life cycle operations to
 /// test queries that depend on that
diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs
index 717b3259db..cbdf23df9f 100644
--- a/query_tests/src/scenarios/util.rs
+++ b/query_tests/src/scenarios/util.rs
@@ -1,6 +1,9 @@
 //! This module contains util functions for testing scenarios
 use super::DbScenario;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
 use data_types::{chunk_metadata::ChunkId, delete_predicate::DeletePredicate};
+use data_types2::SequenceNumber;
 use datafusion_util::batch_filter;
 use db::test_helpers::chunk_ids_rub;
 use db::{
@@ -12,11 +15,15 @@ use iox_catalog::interface::get_schema_by_name;
 use iox_tests::util::{TestCatalog, TestNamespace};
 use itertools::Itertools;
 use predicate::PredicateBuilder;
-use querier::{create_ingester_connection_for_testing, QuerierNamespace};
-use query::util::df_physical_expr_from_schema_and_expr;
+use querier::{IngesterConnection, IngesterError, IngesterPartition, QuerierNamespace};
+use query::util::{
+    compute_timenanosecond_min_max_for_one_record_batch, df_physical_expr_from_schema_and_expr,
+};
 use query::QueryChunk;
 use schema::merge::SchemaMerger;
 use schema::selection::Selection;
+use std::any::Any;
+use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::fmt::Write;
 use std::{fmt::Display, sync::Arc};
@@ -64,6 +71,23 @@ impl<'a, 'b> ChunkDataNew<'a, 'b> {
             ..self
         }
     }
+
+    /// Replace [`DeleteTimeNew::Begin`] and [`DeleteTimeNew::End`] with values that correspond to the linked [`ChunkStageNew`].
+    fn replace_begin_and_end_delete_times(self) -> Self {
+        Self {
+            preds: self
+                .preds
+                .into_iter()
+                .map(|pred| {
+                    pred.replace_begin_and_end_delete_times(
+                        self.chunk_stage
+                            .expect("chunk stage must be set at this point"),
+                    )
+                })
+                .collect(),
+            ..self
+        }
+    }
 }
 
 #[derive(Debug, Clone, Copy)]
@@ -99,16 +123,36 @@ impl ChunkStageOld {
     }
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum ChunkStageNew {
     /// In parquet file.
     Parquet,
+
+    /// In ingester.
+    Ingester,
 }
 
 impl Display for ChunkStageNew {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::Parquet => write!(f, "Parquet"),
+            Self::Ingester => write!(f, "Ingester"),
+        }
+    }
+}
+
+impl PartialOrd for ChunkStageNew {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        match (self, other) {
+            // allow multiple parquet chunks (for the same partition). sequence numbers will be used for ordering.
+            (Self::Parquet, Self::Parquet) => Some(Ordering::Equal),
+
+            // "parquet" chunks are older (i.e. come earlier) than chunks that still live in the ingester
+            (Self::Parquet, Self::Ingester) => Some(Ordering::Less),
+            (Self::Ingester, Self::Parquet) => Some(Ordering::Greater),
+
+            // it's impossible for two chunks (for the same partition) to be in the ingester stage
+            (Self::Ingester, Self::Ingester) => None,
         }
     }
 }
@@ -116,7 +160,7 @@ impl Display for ChunkStageNew {
 impl ChunkStageNew {
     /// return the list of all chunk types
     pub fn all() -> Vec<Self> {
-        vec![Self::Parquet]
+        vec![Self::Parquet, Self::Ingester]
     }
 }
 
@@ -157,23 +201,18 @@ pub struct PredOld<'a> {
 #[derive(Debug, Clone)]
 pub struct PredNew<'a> {
     /// Delete predicate
-    predicate: &'a DeletePredicate,
+    pub predicate: &'a DeletePredicate,
+
     /// At which chunk stage this predicate is applied
-    delete_time: DeleteTimeNew,
+    pub delete_time: DeleteTimeNew,
 }
 
 impl<'a> PredNew<'a> {
-    pub fn begin(predicate: &'a DeletePredicate) -> Self {
+    /// Replace [`DeleteTimeNew::Begin`] and [`DeleteTimeNew::End`] with values that correspond to the linked [`ChunkStageNew`].
+    fn replace_begin_and_end_delete_times(self, stage: ChunkStageNew) -> Self {
         Self {
-            predicate,
-            delete_time: DeleteTimeNew::begin(),
-        }
-    }
-
-    pub fn end(predicate: &'a DeletePredicate) -> Self {
-        Self {
-            predicate,
-            delete_time: DeleteTimeNew::end(),
+            delete_time: self.delete_time.replace_begin_and_end_delete_times(stage),
+            ..self
         }
     }
 }
@@ -239,8 +278,19 @@ impl DeleteTimeOld {
     }
 }
 
+/// Describes when a delete predicate was applied.
+///
+/// # Ordering
+/// Compared to [`ChunkStageNew`], the ordering here may seem a bit confusing. While the latest payload / LP data
+/// resides in the ingester and is not yet available as a parquet file, the latest tombstones apply to parquet files and
+/// were (past tense!) NOT applied while the LP data was in the ingester.
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub enum DeleteTimeNew {
+    /// Special delete time which marks the first time that could be used for deletion.
+    ///
+    /// May depend on [`ChunkStageNew`].
+    Begin,
+
     /// Delete predicate is added while chunk is was still in ingester memory.
     Ingester {
         /// Flag if the tombstone also exists in the catalog.
@@ -255,6 +305,11 @@ pub enum DeleteTimeNew {
 
     /// Delete predicate is added to chunks at their parquet stage
     Parquet,
+
+    /// Special delete time which marks the last time that could be used for deletion.
+    ///
+    /// May depend on [`ChunkStageNew`].
+    End,
 }
 
 impl DeleteTimeNew {
@@ -270,23 +325,46 @@ impl DeleteTimeNew {
                 },
                 Self::Parquet,
             ],
+            ChunkStageNew::Ingester => vec![
+                Self::Ingester {
+                    also_in_catalog: true,
+                },
+                Self::Ingester {
+                    also_in_catalog: false,
+                },
+            ],
         }
     }
 
-    pub fn begin() -> Self {
+    /// Replace [`DeleteTimeNew::Begin`] and [`DeleteTimeNew::End`] with values that correspond to the linked [`ChunkStageNew`].
+    fn replace_begin_and_end_delete_times(self, stage: ChunkStageNew) -> Self {
+        match self {
+            Self::Begin => Self::begin_for(stage),
+            Self::End => Self::end_for(stage),
+            other @ (Self::Ingester { .. } | Self::Parquet) => other,
+        }
+    }
+
+    fn begin_for(_stage: ChunkStageNew) -> Self {
         Self::Ingester {
             also_in_catalog: true,
         }
     }
 
-    pub fn end() -> Self {
-        Self::Parquet
+    fn end_for(stage: ChunkStageNew) -> Self {
+        match stage {
+            ChunkStageNew::Ingester => Self::Ingester {
+                also_in_catalog: true,
+            },
+            ChunkStageNew::Parquet => Self::Parquet,
+        }
     }
 }
 
 impl Display for DeleteTimeNew {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
+            Self::Begin => write!(f, "Begin"),
             Self::Ingester {
                 also_in_catalog: false,
             } => write!(f, "Ingester w/o catalog entry"),
@@ -294,6 +372,7 @@
                 also_in_catalog: true,
             } => write!(f, "Ingester w/ catalog entry"),
             Self::Parquet => write!(f, "Parquet"),
+            Self::End => write!(f, "End"),
         }
     }
 }
@@ -306,7 +385,7 @@ pub enum DeleteTime {
 }
 
 impl DeleteTime {
     /// Return all DeleteTime at and after the given chunk stage
-    pub fn all_from_and_before(chunk_stage: ChunkStage) -> Vec<Self> {
+    fn all_from_and_before(chunk_stage: ChunkStage) -> Vec<Self> {
         match chunk_stage {
             ChunkStage::Old(chunk_stage) => DeleteTimeOld::all_from_and_before(chunk_stage)
                 .into_iter()
@@ -319,17 +398,17 @@ impl DeleteTime {
         }
     }
 
-    pub fn begin_for(chunk_stage: ChunkStage) -> Self {
+    fn begin_for(chunk_stage: ChunkStage) -> Self {
         match chunk_stage {
             ChunkStage::Old(_) => Self::Old(DeleteTimeOld::begin()),
-            ChunkStage::New(_) => Self::New(DeleteTimeNew::begin()),
+            ChunkStage::New(chunk_stage) => Self::New(DeleteTimeNew::begin_for(chunk_stage)),
         }
     }
 
-    pub fn end_for(chunk_stage: ChunkStage) -> Self {
+    fn end_for(chunk_stage: ChunkStage) -> Self {
         match chunk_stage {
             ChunkStage::Old(_) => Self::Old(DeleteTimeOld::end()),
-            ChunkStage::New(_) => Self::New(DeleteTimeNew::end()),
+            ChunkStage::New(chunk_stage) => Self::New(DeleteTimeNew::end_for(chunk_stage)),
         }
     }
 }
@@ -698,9 +777,11 @@ async fn make_chunk_with_deletes_at_different_stages_new(
         delete_table_name,
         partition_key,
     };
-    let scenario_name = make_ng_chunk(Arc::clone(&ns), chunk_data).await;
+    let mut ingester_connection = MockIngesterConnection::default();
+    let scenario_name =
+        make_ng_chunk(Arc::clone(&ns), chunk_data, &mut ingester_connection, 0).await;
 
-    let db = make_querier_namespace(ns).await;
+    let db = make_querier_namespace(ns, ingester_connection).await;
 
     DbScenario { scenario_name, db }
 }
@@ -1091,14 +1172,25 @@ pub async fn make_n_chunks_scenario_new(chunks: &[ChunkDataNew<'_, '_>]) -> Vec<DbScenario> {
     for stages in ChunkStageNew::all()
         .into_iter()
-        .permutations(n_stages_unset)
+        .combinations_with_replacement(n_stages_unset)
     {
+        // filter out unordered stages
+        if !stages.windows(2).all(|stages| {
+            stages[0]
+                .partial_cmp(&stages[1])
+                .map(|o| o.is_le())
+                .unwrap_or_default()
+        }) {
+            continue;
+        }
+
         let catalog = TestCatalog::new();
         let ns = catalog.create_namespace("test_db").await;
 
         let mut scenario_name = format!("{} chunks:", chunks.len());
         let mut stages_it = stages.iter();
+        let mut ingester_connection = MockIngesterConnection::default();
 
-        for chunk_data in chunks {
+        for (i, chunk_data) in chunks.iter().enumerate() {
             let mut chunk_data = chunk_data.clone();
 
             if chunk_data.chunk_stage.is_none() {
                 chunk_data = chunk_data.with_chunk_stage(*chunk_stage);
             }
 
-            let name = make_ng_chunk(Arc::clone(&ns), chunk_data).await;
+            let chunk_data = chunk_data.replace_begin_and_end_delete_times();
 
-            write!(&mut scenario_name, "{}", name).unwrap();
+            let sequence_number_offset = (i as i64) * 1000;
+            let name = make_ng_chunk(
+                Arc::clone(&ns),
+                chunk_data,
+                &mut ingester_connection,
+                sequence_number_offset,
+            )
+            .await;
+
+            write!(&mut scenario_name, ", {}", name).unwrap();
         }
 
         assert!(stages_it.next().is_none(), "generated too many stages");
 
-        let db = make_querier_namespace(ns).await;
+        let db = make_querier_namespace(ns, ingester_connection).await;
 
         scenarios.push(DbScenario { scenario_name, db });
     }
@@ -1159,7 +1260,12 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
     vec![scenario1, scenario2]
 }
 
-async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> String {
+async fn make_ng_chunk(
+    ns: Arc<TestNamespace>,
+    chunk: ChunkDataNew<'_, '_>,
+    ingester_connection: &mut MockIngesterConnection,
+    sequence_number_offset: i64,
+) -> String {
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
 
     let chunk_stage = chunk.chunk_stage.expect("chunk stage should be set");
@@ -1245,6 +1351,59 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
     let mut batch = batch.to_arrow(Selection::All).unwrap();
 
     match chunk_stage {
+        ChunkStageNew::Ingester => {
+            // process delete predicates
+            if let Some(delete_table_name) = chunk.delete_table_name {
+                if delete_table_name == table_name {
+                    for pred in &chunk.preds {
+                        match pred.delete_time {
+                            DeleteTimeNew::Ingester { .. } => {
+                                batch = materialize_delete_predicate(batch, pred.predicate);
+                            }
+                            other @ DeleteTimeNew::Parquet => {
+                                panic!("Cannot have delete time '{other}' for ingester chunk")
+                            }
+                            DeleteTimeNew::Begin | DeleteTimeNew::End => {
+                                unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
+                            }
+                        }
+                    }
+                }
+            }
+
+            // create ingester partition
+            if batch.num_rows() > 0 {
+                let partition = partitions.get(&table_name).unwrap();
+                let chunk_id = ChunkId::new();
+                let namespace_name = Arc::from(ns.namespace.name.clone());
+                let table_name = Arc::from(table_name);
+                let partition_id = partition.partition.id;
+                let sequencer_id = partition.sequencer.sequencer.id;
+                let old_gen_partition_key = Arc::from(format!(
+                    "{}-{}",
+                    sequencer_id, partition.partition.partition_key
+                ));
+                let expected_schema = Arc::new(schema);
+                let parquet_max_sequence_number = Some(SequenceNumber::new(i64::MAX)); // do NOT exclude any parquet files in other chunks
+                let tombstone_max_sequence_number = Some(SequenceNumber::new(i64::MAX)); // do NOT exclude any delete predicates in other chunks
+                let batches = vec![batch];
+                ingester_connection.push(Arc::new(
+                    IngesterPartition::try_new(
+                        chunk_id,
+                        namespace_name,
+                        table_name,
+                        partition_id,
+                        sequencer_id,
+                        old_gen_partition_key,
+                        expected_schema,
+                        parquet_max_sequence_number,
+                        tombstone_max_sequence_number,
+                        batches,
+                    )
+                    .unwrap(),
+                ));
+            }
+        }
         ChunkStageNew::Parquet => {
             // model delete predicates that are materialized (applied) by the ingester,
             // during parquet file creation
                     for pred in &chunk.preds {
                         match pred.delete_time {
                             DeleteTimeNew::Ingester { .. } => {
-                                let mut predicate = PredicateBuilder::new().build();
-                                predicate.merge_delete_predicates(&[Arc::new(
-                                    pred.predicate.clone().into(),
-                                )]);
-                                if let Some(expr) = predicate.filter_expr() {
-                                    let df_phy_expr = df_physical_expr_from_schema_and_expr(
-                                        schema.as_arrow(),
-                                        expr,
-                                    )
-                                    .unwrap();
-                                    batch = batch_filter(&batch, &df_phy_expr).unwrap();
-                                }
+                                batch = materialize_delete_predicate(batch, pred.predicate);
                             }
                             DeleteTimeNew::Parquet => {
                                 // will be attached AFTER the chunk was created
                             }
+                            DeleteTimeNew::Begin | DeleteTimeNew::End => {
+                                unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
+                            }
                         }
                     }
                }
            }
 
             // create parquet file
-            let parquet_file_seq_number = 1000;
+            let parquet_file_seq_number = sequence_number_offset + (chunk.preds.len() as i64);
             if batch.num_rows() > 0 {
                 let partition = partitions.get(&table_name).unwrap();
                 let min_seq = parquet_file_seq_number;
                 let max_seq = parquet_file_seq_number;
-                let min_time = 0;
-                let max_time = 0;
+                let (min_time, max_time) =
+                    compute_timenanosecond_min_max_for_one_record_batch(&batch).unwrap();
                 let file_size_bytes = None; // don't mock/override
                 let creation_time = 1;
                 partition
@@ -1315,7 +1466,9 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
                 DeleteTimeNew::Ingester {
                     also_in_catalog: true,
                 } => {
-                    let sequence_number = i as i64;
+                    let sequence_number = parquet_file_seq_number
+                        - (chunk.preds.len() as i64)
+                        + (i as i64);
                     assert!(sequence_number < parquet_file_seq_number);
 
                     let min_time = pred.predicate.range.start();
@@ -1348,6 +1501,9 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
                     )
                     .await;
                 }
+                DeleteTimeNew::Begin | DeleteTimeNew::End => {
+                    unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
+                }
             }
         }
     }
@@ -1359,12 +1515,39 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
     let mut name = format!("NG Chunk {}", chunk_stage);
     let n_preds = chunk.preds.len();
     if n_preds > 0 {
-        write!(name, " with {} deletes", n_preds).unwrap();
+        let delete_names: Vec<_> = chunk
+            .preds
+            .iter()
+            .map(|p| p.delete_time.to_string())
+            .collect();
+        write!(
+            name,
+            " with {} delete(s) ({})",
+            n_preds,
+            delete_names.join(", ")
+        )
+        .unwrap();
     }
     name
 }
 
-async fn make_querier_namespace(ns: Arc<TestNamespace>) -> Arc<QuerierNamespace> {
+fn materialize_delete_predicate(record_batch: RecordBatch, pred: &DeletePredicate) -> RecordBatch {
+    let mut predicate = PredicateBuilder::new().build();
+    predicate.merge_delete_predicates(&[Arc::new(pred.clone().into())]);
+
+    if let Some(expr) = predicate.filter_expr() {
+        let df_phy_expr =
+            df_physical_expr_from_schema_and_expr(record_batch.schema(), expr).unwrap();
+        batch_filter(&record_batch, &df_phy_expr).unwrap()
+    } else {
+        record_batch
+    }
+}
+
+async fn make_querier_namespace(
+    ns: Arc<TestNamespace>,
+    ingester_connection: MockIngesterConnection,
+) -> Arc<QuerierNamespace> {
     let mut repos = ns.catalog.catalog.repositories().await;
     let schema = Arc::new(
         get_schema_by_name(&ns.namespace.name, repos.as_mut())
@@ -1372,6 +1555,8 @@ async fn make_querier_namespace(ns: Arc<TestNamespace>) -> Arc<QuerierNamespace>
             .unwrap(),
     );
 
+    let ingester_connection = Arc::new(ingester_connection);
+
     Arc::new(QuerierNamespace::new_testing(
         ns.catalog.catalog(),
         ns.catalog.object_store(),
@@ -1380,6 +1565,40 @@ async fn make_querier_namespace(ns: Arc<TestNamespace>) -> Arc<QuerierNamespace>
         ns.namespace.name.clone().into(),
         schema,
         ns.catalog.exec(),
-        create_ingester_connection_for_testing(),
+        ingester_connection,
     ))
 }
+
+#[derive(Debug, Default)]
+struct MockIngesterConnection {
+    responses: Vec<Arc<IngesterPartition>>,
+}
+
+impl MockIngesterConnection {
+    fn push(&mut self, p: Arc<IngesterPartition>) {
+        self.responses.push(p);
+    }
+}
+
+#[async_trait]
+impl IngesterConnection for MockIngesterConnection {
+    async fn partitions(
+        &self,
+        _namespace_name: Arc<str>,
+        table_name: Arc<str>,
+        _columns: Vec<String>,
+        _predicate: &predicate::Predicate,
+        _expected_schema: Arc<schema::Schema>,
+    ) -> Result<Vec<Arc<IngesterPartition>>, IngesterError> {
+        Ok(self
+            .responses
+            .iter()
+            .filter(|p| p.table_name() == table_name.as_ref())
+            .cloned()
+            .collect())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self as &dyn Any
+    }
+}
diff --git a/query_tests/src/table_schema.rs b/query_tests/src/table_schema.rs
index 26faedcbc5..fe15ee8ed0 100644
--- a/query_tests/src/table_schema.rs
+++ b/query_tests/src/table_schema.rs
@@ -71,7 +71,7 @@ async fn run_table_schema_test_case(
 }
 
 fn is_unsorted_chunk_type(chunk: &dyn QueryChunk) -> bool {
-    chunk.chunk_type() == "MUB"
+    (chunk.chunk_type() == "MUB") || (chunk.chunk_type() == "IngesterPartition")
 }
 
 #[tokio::test]