diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs
index 26d657d827..203f9608d1 100644
--- a/query_tests/src/scenarios/util.rs
+++ b/query_tests/src/scenarios/util.rs
@@ -44,7 +44,7 @@ use std::{
     sync::Mutex,
 };

-// Structs, enums, and functions used to exhaust all test scenarios of chunk life cycle
+// Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle
 // & when delete predicates are applied

 // STRUCTs & ENUMs
@@ -55,9 +55,9 @@ pub struct ChunkData<'a, 'b> {
     /// which stage this chunk will be created.
     ///
-    /// If not set, this chunk will be created in [all](ChunkStage::all) stages. This can be helpful when the test
-    /// scenario is not specific to the chunk stage. If this is used for multiple chunks, then all stage permutations
-    /// will be generated.
+    /// If not set, this chunk will be created in [all](ChunkStage::all) stages. This can be
+    /// helpful when the test scenario is not specific to the chunk stage. If this is used for
+    /// multiple chunks, then all stage permutations will be generated.
     pub chunk_stage: Option<ChunkStage>,

     /// Delete predicates
@@ -80,7 +80,8 @@ impl<'a, 'b> ChunkData<'a, 'b> {
         }
     }

-    /// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
+    /// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
+    /// linked [`ChunkStage`].
     fn replace_begin_and_end_delete_times(self) -> Self {
         Self {
             preds: self
@@ -100,7 +101,7 @@ impl<'a, 'b> ChunkData<'a, 'b> {

 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum ChunkStage {
-    /// In parquet file.
+    /// In parquet file, persisted by the ingester. Now managed by the querier.
     Parquet,

     /// In ingester.
@@ -119,10 +120,12 @@ impl Display for ChunkStage {
 impl PartialOrd for ChunkStage {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         match (self, other) {
-            // allow multiple parquet chunks (for the same partition). sequence numbers will be used for ordering.
+            // allow multiple parquet chunks (for the same partition). sequence numbers will be
+            // used for ordering.
             (Self::Parquet, Self::Parquet) => Some(Ordering::Equal),

-            // "parquet" chunks are older (i.e. come earlier) than chunks that still life in the ingester
+            // "parquet" chunks are older (i.e. come earlier) than chunks that still live in the
+            // ingester
             (Self::Parquet, Self::Ingester) => Some(Ordering::Less),
             (Self::Ingester, Self::Parquet) => Some(Ordering::Greater),
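// A minimal, self-contained sketch (not part of the diff) of the ordering
// contract the `PartialOrd` impl above encodes: parquet chunks sort before
// ingester chunks, and parquet chunks tie with each other (sequence numbers
// break the tie elsewhere). `Stage` is an invented stand-in, not the real type.
#[derive(Debug, PartialEq)]
enum Stage {
    Parquet,
    Ingester,
}

impl PartialOrd for Stage {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        use std::cmp::Ordering;
        Some(match (self, other) {
            // parquet chunks are mutually equal at this level
            (Self::Parquet, Self::Parquet) => Ordering::Equal,
            // parquet data is older than data still buffered in the ingester
            (Self::Parquet, Self::Ingester) => Ordering::Less,
            (Self::Ingester, Self::Parquet) => Ordering::Greater,
            (Self::Ingester, Self::Ingester) => Ordering::Equal,
        })
    }
}

#[test]
fn parquet_sorts_before_ingester() {
    assert!(Stage::Parquet < Stage::Ingester);
    assert!(Stage::Parquet <= Stage::Parquet);
}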
@@ -149,7 +152,8 @@ pub struct Pred<'a> {
 }

 impl<'a> Pred<'a> {
-    /// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
+    /// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
+    /// linked [`ChunkStage`].
     fn replace_begin_and_end_delete_times(self, stage: ChunkStage) -> Self {
         Self {
             delete_time: self.delete_time.replace_begin_and_end_delete_times(stage),
@@ -168,9 +172,11 @@ impl<'a> Pred<'a> {
 /// Describes when a delete predicate was applied.
 ///
 /// # Ordering
-/// Compared to [`ChunkStage`], the ordering here may seem a bit confusing. While the latest payload / LP data
-/// resists in the ingester and is not yet available as a parquet file, the latest tombstones apply to parquet files and
-/// were (past tense!) NOT applied while the LP data was in the ingester.
+///
+/// Compared to [`ChunkStage`], the ordering here may seem a bit confusing. While the latest
+/// payload / LP data resides in the ingester and is not yet available as a parquet file, the
+/// latest tombstones apply to parquet files and were (past tense!) NOT applied while the LP data
+/// was in the ingester.
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub enum DeleteTime {
     /// Special delete time which marks the first time that could be used from deletion.
     Begin,
@@ -182,11 +188,13 @@ pub enum DeleteTime {
     Ingester {
         /// Flag if the tombstone also exists in the catalog.
         ///
-        /// If this is set to `false`, then the tombstone was applied by the ingester but does not exist in the catalog
-        /// any longer. This can be because:
+        /// If this is set to `false`, then the tombstone was applied by the ingester but does not
+        /// exist in the catalog any longer. This can be because:
         ///
-        /// - the ingester decided that it doesn't need to be added to the catalog (this is currently/2022-04-21 not implemented!)
-        /// - the compactor pruned the tombstone from the catalog because there are zero affected parquet files
+        /// - the ingester decided that it doesn't need to be added to the catalog (this is
+        ///   currently/2022-04-21 not implemented!)
+        /// - the compactor pruned the tombstone from the catalog because there are zero affected
+        ///   parquet files
         also_in_catalog: bool,
     },
@@ -223,7 +231,8 @@ impl DeleteTime {
         }
     }

-    /// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
+    /// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
+    /// linked [`ChunkStage`].
     fn replace_begin_and_end_delete_times(self, stage: ChunkStage) -> Self {
         match self {
             Self::Begin => Self::begin_for(stage),
@@ -266,14 +275,14 @@ impl Display for DeleteTime {

 // --------------------------------------------------------------------------------------------

-/// All scenarios chunk stages and their life cycle moves for given set of delete predicates.
+/// All scenarios of chunk stages and their lifecycle moves for a given set of delete predicates.
 /// If the delete predicates are empty, all scenarios of different chunk stages will be returned.
 pub async fn all_scenarios_for_one_chunk(
-    // These delete predicates are applied at all stages of the chunk life cycle
+    // These delete predicates are applied at all stages of the chunk lifecycle
     chunk_stage_preds: Vec<&DeletePredicate>,
     // These delete predicates are applied to all chunks at their final stages
     at_end_preds: Vec<&DeletePredicate>,
-    // Input data, formatted as line protocol.  One chunk will be created for each measurement
+    // Input data, formatted as line protocol. One chunk will be created for each measurement
     // (table) that appears in the input
     lp_lines: Vec<&str>,
     // Table to which the delete predicates will be applied
@@ -284,10 +293,9 @@ pub async fn all_scenarios_for_one_chunk(
     let mut scenarios = vec![];
     // Go over chunk stages
     for chunk_stage in ChunkStage::all() {
-        // Apply delete chunk_stage_preds to this chunk stage at
-        // all stages at and before that in the life cycle to the chunk
-        // But only need to get all delete times if chunk_stage_preds is not empty,
-        // otherwise, produce only one scenario of each chunk stage
+        // Apply delete chunk_stage_preds to this chunk stage at all stages at and before that in
+        // the lifecycle of the chunk. We only need all delete times if chunk_stage_preds is not
+        // empty; otherwise, produce just one scenario per chunk stage.
         let mut delete_times = vec![DeleteTime::begin_for(chunk_stage)];
         if !chunk_stage_preds.is_empty() {
             delete_times = DeleteTime::all_from_and_before(chunk_stage)
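// A rough, self-contained sketch (not part of the diff) of the permutation
// scheme described above: one scenario per (chunk stage, delete time) pair,
// where delete times are limited to lifecycle stages at or before the stage
// the chunk ends up in. All names here are illustrative, not the real API.
fn scenario_labels() -> Vec<String> {
    // lifecycle order: data lives in the ingester first, then lands in parquet
    let lifecycle = ["ingester", "parquet"];
    let mut labels = Vec::new();
    for (i, chunk_stage) in lifecycle.iter().enumerate() {
        // a delete may be applied at any stage up to and including this one
        for delete_stage in &lifecycle[..=i] {
            labels.push(format!(
                "chunk in {chunk_stage}, delete applied in {delete_stage}"
            ));
        }
    }
    labels
}

#[test]
fn three_permutations_for_two_stages() {
    // ingester/ingester, parquet/ingester, parquet/parquet
    assert_eq!(scenario_labels().len(), 3);
}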
@@ -325,9 +333,9 @@
     scenarios
 }

-/// Build a chunk that may move with life cycle before/after deletes
-/// Note that the only chunk in this function can be moved to different stages and delete predicates
-/// can be applied at different stages when the chunk is moved.
+/// Build a chunk that may move with lifecycle before/after deletes. Note that the single chunk
+/// built by this function can be moved to different stages, and delete predicates can be applied
+/// at different stages when the chunk is moved.
 async fn make_chunk_with_deletes_at_different_stages(
     lp_lines: Vec<&str>,
     chunk_stage: ChunkStage,
@@ -350,12 +358,7 @@ async fn make_chunk_with_deletes_at_different_stages(
     DbScenario { scenario_name, db }
 }

-/// This function loads two chunks of lp data into 4 different scenarios
-///
-/// Data in single open mutable buffer chunk
-/// Data in one open mutable buffer chunk, one closed mutable chunk
-/// Data in one open mutable buffer chunk, one read buffer chunk
-/// Data in one two read buffer chunks,
+/// Load two chunks of lp data into different chunk scenarios.
 pub async fn make_two_chunk_scenarios(
     partition_key: &str,
     data1: &str,
@@ -480,7 +483,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
                     panic!("Cannot have delete time '{other}' for ingester chunk")
                 }
                 DeleteTime::Begin | DeleteTime::End => {
-                    unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
+                    unreachable!(
+                        "Begin/end cases should have been replaced \
+                        with concrete instances at this point"
+                    )
                 }
             }
         }
@@ -507,7 +513,8 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
                         .await;
                     mock_ingester.buffer_operation(op).await;

-                    // tombstones are created immediately, need to remember their ID to handle deletion later
+                    // tombstones are created immediately, need to remember their ID to
+                    // handle deletion later
                     let mut tombstone_id = None;
                     for id in mock_ingester.tombstone_ids(delete_table_name).await {
                         if !ids_pre.contains(&id) {
@@ -521,7 +528,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
                     // will be attached AFTER the chunk was created
                 }
                 DeleteTime::Begin | DeleteTime::End => {
-                    unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
+                    unreachable!(
+                        "Begin/end cases should have been replaced \
+                        with concrete instances at this point"
+                    )
                 }
             }
         }
@@ -568,7 +578,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
                     mock_ingester.buffer_operation(op).await;
                 }
                 DeleteTime::Begin | DeleteTime::End => {
-                    unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
+                    unreachable!(
+                        "Begin/end cases should have been replaced \
+                        with concrete instances at this point"
+                    )
                 }
             }
         }
@@ -600,8 +613,8 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)

 /// Ingester that can be controlled specifically for query tests.
 ///
-/// This uses as much ingester code as possible but allows more direct control over aspects like lifecycle and
-/// partioning.
+/// This uses as much ingester code as possible but allows more direct control over aspects like
+/// lifecycle and partitioning.
 #[derive(Debug)]
 struct MockIngester {
     /// Test catalog state.
@@ -618,9 +631,10 @@ struct MockIngester {
     /// Memory of partition keys for certain sequence numbers.
     ///
-    /// This is currently required because [`DmlWrite`] does not carry partiion information so we need to do that. In
-    /// production this is not required because the router and the ingester use the same partition logic, but we need
-    /// direct control over the partion key for the query tests.
+    /// This is currently required because [`DmlWrite`] does not carry partition information, so
+    /// we need to track it here. In production this is not required because the router and the
+    /// ingester use the same partition logic, but we need direct control over the partition key
+    /// for the query tests.
     partition_keys: HashMap<SequenceNumber, String>,

     /// Ingester state.
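// A small sketch (not part of the diff) of the bookkeeping idea behind
// `partition_keys`: the test remembers which partition key was chosen for
// each sequence number, because the write payload itself does not carry that
// information. The type and method names below are invented for illustration.
use std::collections::HashMap;

#[derive(Default)]
struct PartitionMemory {
    next_sequence_number: i64,
    // sequence number -> partition key, mirroring the field documented above
    partition_keys: HashMap<i64, String>,
}

impl PartitionMemory {
    // Takes `&mut self`: sequence numbers are handed out serially, so this
    // bookkeeping does not support concurrent writers (the same caveat noted
    // on `buffer_operation` below).
    fn record_write(&mut self, partition_key: &str) -> i64 {
        let sequence_number = self.next_sequence_number;
        self.next_sequence_number += 1;
        self.partition_keys
            .insert(sequence_number, partition_key.to_owned());
        sequence_number
    }
}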
@@ -671,7 +685,8 @@ impl MockIngester {
     ///
     /// This will never persist.
     ///
-    /// Takes `&self mut` because our partioning implementation does not work with concurrent access.
+    /// Takes `&mut self` because our partitioning implementation does not work with concurrent
+    /// access.
     async fn buffer_operation(&mut self, dml_operation: DmlOperation) {
         let lifecycle_handle = NoopLifecycleHandle {};

@@ -828,7 +843,8 @@ impl MockIngester {
     /// Finalizes the ingester and creates a querier namespace that can be used for query tests.
     ///
-    /// The querier namespace will hold a simulated connection to the ingester to be able to query unpersisted data.
+    /// The querier namespace will hold a simulated connection to the ingester to be able to query
+    /// unpersisted data.
     async fn into_query_namespace(self) -> Arc<QuerierNamespace> {
         let mut repos = self.catalog.catalog.repositories().await;
         let schema = Arc::new(
@@ -912,8 +928,8 @@ impl IngesterFlightClient for MockIngester {
         _ingester_address: Arc<str>,
         request: IngesterQueryRequest,
     ) -> Result<Box<dyn IngesterFlightClientQueryData>, IngesterFlightClientError> {
-        // NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior (e.g. passing predicates
-        // of wrong types)
+        // NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior
+        // (e.g. passing predicates of wrong types)
         let response = prepare_data_to_querier(&self.ingester_data, &request)
             .await
             .map_err(|e| IngesterFlightClientError::Flight {
@@ -943,8 +959,8 @@ impl IngesterFlightClient for MockIngester {
     }
 }

-/// Helper struct to present [`IngesterQueryResponse`] (produces by the ingester) as a [`IngesterFlightClientQueryData`]
-/// (used by the querier) without doing any real gRPC IO.
+/// Helper struct to present [`IngesterQueryResponse`] (produced by the ingester) as a
+/// [`IngesterFlightClientQueryData`] (used by the querier) without doing any real gRPC IO.
 #[derive(Debug)]
 struct QueryDataAdapter {
     response: IngesterQueryResponse,
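// A compact sketch (not part of the diff) of the adapter idea behind
// `QueryDataAdapter`: wrap a response produced by one component so that it
// satisfies the interface another component consumes, with no gRPC in
// between. The trait and types are invented stand-ins for illustration.
use std::collections::VecDeque;

trait QueryData {
    fn next_batch(&mut self) -> Option<Vec<u8>>;
}

// owns the already-computed response and replays it batch by batch
struct ResponseAdapter {
    batches: VecDeque<Vec<u8>>,
}

impl QueryData for ResponseAdapter {
    fn next_batch(&mut self) -> Option<Vec<u8>> {
        // hand out buffered batches as if they had just arrived over the wire
        self.batches.pop_front()
    }
}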