docs: Improve description of ChunkStage in query test scenarios

Namely, that ChunkStage::Parquet probably doesn't correspond to
ParquetChunk; it means the data has been persisted to parquet and the
chunks are now managed by the querier.
pull/24376/head
Carol (Nichols || Goulding) 2022-06-02 16:33:29 -04:00
parent c6cb594a6d
commit 5c6c086d26
1 changed file with 64 additions and 48 deletions
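Before the diff itself, here is a minimal, self-contained Rust sketch of the stage ordering that the reworded comments describe. It mirrors the `ChunkStage` variants and the `PartialOrd` arms visible in the hunks below, but it is only an illustration, not the crate's actual code; in particular the `Ingester`/`Ingester` arm is an assumption, since that arm is not shown in this diff.

```rust
use std::cmp::Ordering;

// Simplified stand-in for the `ChunkStage` enum in this diff: `Parquet` means the data
// was persisted to parquet by the ingester and the chunk is now managed by the querier;
// `Ingester` means the data still lives in the ingester.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkStage {
    Parquet,
    Ingester,
}

impl PartialOrd for ChunkStage {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match (self, other) {
            // Multiple parquet chunks may exist for the same partition; sequence numbers
            // are used for ordering, so the stages alone compare as equal.
            (Self::Parquet, Self::Parquet) => Some(Ordering::Equal),
            // Parquet chunks hold older data than chunks that still live in the ingester.
            (Self::Parquet, Self::Ingester) => Some(Ordering::Less),
            (Self::Ingester, Self::Parquet) => Some(Ordering::Greater),
            // Assumed arm: not shown in the hunk below.
            (Self::Ingester, Self::Ingester) => Some(Ordering::Equal),
        }
    }
}

fn main() {
    // Persisted (querier-managed) data sorts as older than data still in the ingester.
    assert!(ChunkStage::Parquet < ChunkStage::Ingester);
}
```

The final assertion captures the point of the commit message: a `Parquet`-stage chunk represents data that has already been persisted and handed off to the querier, so it is treated as older than data still buffered in the ingester.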


@@ -44,7 +44,7 @@ use std::{
sync::Mutex,
};
// Structs, enums, and functions used to exhaust all test scenarios of chunk life cycle
// Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle
// & when delete predicates are applied
// STRUCTs & ENUMs
@@ -55,9 +55,9 @@ pub struct ChunkData<'a, 'b> {
/// which stage this chunk will be created.
///
/// If not set, this chunk will be created in [all](ChunkStage::all) stages. This can be helpful when the test
/// scenario is not specific to the chunk stage. If this is used for multiple chunks, then all stage permutations
/// will be generated.
/// If not set, this chunk will be created in [all](ChunkStage::all) stages. This can be
/// helpful when the test scenario is not specific to the chunk stage. If this is used for
/// multiple chunks, then all stage permutations will be generated.
pub chunk_stage: Option<ChunkStage>,
/// Delete predicates
@@ -80,7 +80,8 @@ impl<'a, 'b> ChunkData<'a, 'b> {
}
}
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self) -> Self {
Self {
preds: self
@@ -100,7 +101,7 @@ impl<'a, 'b> ChunkData<'a, 'b> {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChunkStage {
/// In parquet file.
/// In parquet file, persisted by the ingester. Now managed by the querier.
Parquet,
/// In ingester.
@@ -119,10 +120,12 @@ impl Display for ChunkStage {
impl PartialOrd for ChunkStage {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match (self, other) {
// allow multiple parquet chunks (for the same partition). sequence numbers will be used for ordering.
// allow multiple parquet chunks (for the same partition). sequence numbers will be
// used for ordering.
(Self::Parquet, Self::Parquet) => Some(Ordering::Equal),
// "parquet" chunks are older (i.e. come earlier) than chunks that still life in the ingester
// "parquet" chunks are older (i.e. come earlier) than chunks that still life in the
// ingester
(Self::Parquet, Self::Ingester) => Some(Ordering::Less),
(Self::Ingester, Self::Parquet) => Some(Ordering::Greater),
@@ -149,7 +152,8 @@ pub struct Pred<'a> {
}
impl<'a> Pred<'a> {
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self, stage: ChunkStage) -> Self {
Self {
delete_time: self.delete_time.replace_begin_and_end_delete_times(stage),
@@ -168,9 +172,11 @@ impl<'a> Pred<'a> {
/// Describes when a delete predicate was applied.
///
/// # Ordering
/// Compared to [`ChunkStage`], the ordering here may seem a bit confusing. While the latest payload / LP data
/// resides in the ingester and is not yet available as a parquet file, the latest tombstones apply to parquet files and
/// were (past tense!) NOT applied while the LP data was in the ingester.
///
/// Compared to [`ChunkStage`], the ordering here may seem a bit confusing. While the latest
/// payload / LP data resides in the ingester and is not yet available as a parquet file, the
/// latest tombstones apply to parquet files and were (past tense!) NOT applied while the LP data
/// was in the ingester.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTime {
/// Special delete time which marks the first time that could be used for deletion.
@@ -182,11 +188,13 @@ pub enum DeleteTime {
Ingester {
/// Flag if the tombstone also exists in the catalog.
///
/// If this is set to `false`, then the tombstone was applied by the ingester but does not exist in the catalog
/// any longer. This can be because:
/// If this is set to `false`, then the tombstone was applied by the ingester but does not
/// exist in the catalog any longer. This can be because:
///
/// - the ingester decided that it doesn't need to be added to the catalog (this is currently/2022-04-21 not implemented!)
/// - the compactor pruned the tombstone from the catalog because there are zero affected parquet files
/// - the ingester decided that it doesn't need to be added to the catalog (this is
/// currently/2022-04-21 not implemented!)
/// - the compactor pruned the tombstone from the catalog because there are zero affected
/// parquet files
also_in_catalog: bool,
},
@@ -223,7 +231,8 @@ impl DeleteTime {
}
}
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self, stage: ChunkStage) -> Self {
match self {
Self::Begin => Self::begin_for(stage),
@@ -266,14 +275,14 @@ impl Display for DeleteTime {
// --------------------------------------------------------------------------------------------
/// All scenarios chunk stages and their life cycle moves for given set of delete predicates.
/// All scenarios of chunk stages and their lifecycle moves for a given set of delete predicates.
/// If the delete predicates are empty, all scenarios of different chunk stages will be returned.
pub async fn all_scenarios_for_one_chunk(
// These delete predicates are applied at all stages of the chunk life cycle
// These delete predicates are applied at all stages of the chunk lifecycle
chunk_stage_preds: Vec<&DeletePredicate>,
// These delete predicates are applied to all chunks at their final stages
at_end_preds: Vec<&DeletePredicate>,
// Input data, formatted as line protocol. One chunk will be created for each measurement
// Input data, formatted as line protocol. One chunk will be created for each measurement
// (table) that appears in the input
lp_lines: Vec<&str>,
// Table to which the delete predicates will be applied
@@ -284,10 +293,9 @@ pub async fn all_scenarios_for_one_chunk(
let mut scenarios = vec![];
// Go over chunk stages
for chunk_stage in ChunkStage::all() {
// Apply delete chunk_stage_preds to this chunk stage at
// all stages at and before that in the life cycle to the chunk
// But only need to get all delete times if chunk_stage_preds is not empty,
// otherwise, produce only one scenario of each chunk stage
// Apply delete chunk_stage_preds to this chunk stage at all stages at and before that in
// the lifecycle of the chunk. But we only need to get all delete times if
// chunk_stage_preds is not empty, otherwise, produce only one scenario of each chunk stage
let mut delete_times = vec![DeleteTime::begin_for(chunk_stage)];
if !chunk_stage_preds.is_empty() {
delete_times = DeleteTime::all_from_and_before(chunk_stage)
@@ -325,9 +333,9 @@ pub async fn all_scenarios_for_one_chunk(
scenarios
}
/// Build a chunk that may move with life cycle before/after deletes
/// Note that the only chunk in this function can be moved to different stages and delete predicates
/// can be applied at different stages when the chunk is moved.
/// Build a chunk that may move with lifecycle before/after deletes. Note that the only chunk in
/// this function can be moved to different stages, and delete predicates can be applied at
/// different stages when the chunk is moved.
async fn make_chunk_with_deletes_at_different_stages(
lp_lines: Vec<&str>,
chunk_stage: ChunkStage,
@@ -350,12 +358,7 @@ async fn make_chunk_with_deletes_at_different_stages(
DbScenario { scenario_name, db }
}
/// This function loads two chunks of lp data into 4 different scenarios
///
/// Data in single open mutable buffer chunk
/// Data in one open mutable buffer chunk, one closed mutable chunk
/// Data in one open mutable buffer chunk, one read buffer chunk
/// Data in one two read buffer chunks,
/// Load two chunks of lp data into different chunk scenarios.
pub async fn make_two_chunk_scenarios(
partition_key: &str,
data1: &str,
@@ -480,7 +483,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
panic!("Cannot have delete time '{other}' for ingester chunk")
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@@ -507,7 +513,8 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
.await;
mock_ingester.buffer_operation(op).await;
// tombstones are created immediately, need to remember their ID to handle deletion later
// tombstones are created immediately, need to remember their ID to
// handle deletion later
let mut tombstone_id = None;
for id in mock_ingester.tombstone_ids(delete_table_name).await {
if !ids_pre.contains(&id) {
@@ -521,7 +528,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
// will be attached AFTER the chunk was created
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@@ -568,7 +578,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
mock_ingester.buffer_operation(op).await;
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@@ -600,8 +613,8 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
/// Ingester that can be controlled specifically for query tests.
///
/// This uses as much ingester code as possible but allows more direct control over aspects like lifecycle and
/// partitioning.
/// This uses as much ingester code as possible but allows more direct control over aspects like
/// lifecycle and partitioning.
#[derive(Debug)]
struct MockIngester {
/// Test catalog state.
@@ -618,9 +631,10 @@ struct MockIngester {
/// Memory of partition keys for certain sequence numbers.
///
/// This is currently required because [`DmlWrite`] does not carry partition information so we need to do that. In
/// production this is not required because the router and the ingester use the same partition logic, but we need
/// direct control over the partition key for the query tests.
/// This is currently required because [`DmlWrite`] does not carry partition information so we
/// need to do that. In production this is not required because the router and the ingester use
/// the same partition logic, but we need direct control over the partition key for the query
/// tests.
partition_keys: HashMap<SequenceNumber, String>,
/// Ingester state.
@@ -671,7 +685,8 @@ impl MockIngester {
///
/// This will never persist.
///
/// Takes `&mut self` because our partitioning implementation does not work with concurrent access.
/// Takes `&mut self` because our partitioning implementation does not work with concurrent
/// access.
async fn buffer_operation(&mut self, dml_operation: DmlOperation) {
let lifecycle_handle = NoopLifecycleHandle {};
@@ -828,7 +843,8 @@ impl MockIngester {
/// Finalizes the ingester and creates a querier namespace that can be used for query tests.
///
/// The querier namespace will hold a simulated connection to the ingester to be able to query unpersisted data.
/// The querier namespace will hold a simulated connection to the ingester to be able to query
/// unpersisted data.
async fn into_query_namespace(self) -> Arc<QuerierNamespace> {
let mut repos = self.catalog.catalog.repositories().await;
let schema = Arc::new(
@@ -912,8 +928,8 @@ impl IngesterFlightClient for MockIngester {
_ingester_address: Arc<str>,
request: IngesterQueryRequest,
) -> Result<Box<dyn IngesterFlightClientQueryData>, IngesterFlightClientError> {
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior (e.g. passing predicates
// of wrong types)
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior
// (e.g. passing predicates of wrong types)
let response = prepare_data_to_querier(&self.ingester_data, &request)
.await
.map_err(|e| IngesterFlightClientError::Flight {
@@ -943,8 +959,8 @@ impl IngesterFlightClient for MockIngester {
}
}
/// Helper struct to present [`IngesterQueryResponse`] (produced by the ingester) as an [`IngesterFlightClientQueryData`]
/// (used by the querier) without doing any real gRPC IO.
/// Helper struct to present [`IngesterQueryResponse`] (produced by the ingester) as an
/// [`IngesterFlightClientQueryData`] (used by the querier) without doing any real gRPC IO.
#[derive(Debug)]
struct QueryDataAdapter {
response: IngesterQueryResponse,