Merge pull request #4780 from influxdata/cn/chunk-updating

docs: Remove outdated references to the old MUB -> RUB -> OS chunk cycle
pull/24376/head
kodiakhq[bot] 2022-06-06 12:52:34 +00:00 committed by GitHub
commit bc107aff1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 111 additions and 99 deletions

View File

@ -6,7 +6,7 @@ Here are useful metrics
### Requests to IOx Server including Routers and Query Servers
| Metric name | Code Name | Description |
| --- | --- | --- |
| --- | --- | --- |
| http_requests_total | http_requests | Total number of HTTP requests |
| gRPC_requests_total | requests | Total number of gROC requests |
| http_request_duration_seconds| ? | Time to finish a request |
@ -17,7 +17,7 @@ Here are useful metrics
### Line Protocol Data ingested into Routers
| Metric name | Code Name | Description |
| --- | --- | --- |
| --- | --- | --- |
| ingest_points_total | ingest_lines_total | Total number of lines ingested |
| ingest_fields_total | ingest_fields_total | Total number of fields (columns) ingested |
| ingest_points_bytes_total | ingest_points_bytes_total | Total number of bytes ingested |
@ -26,9 +26,9 @@ Here are useful metrics
### Chunks
| Metric name | Code Name | Description |
| --- | --- | --- |
| catalog_chunks_mem_usage_bytes | memory_metrics | Total memory usage by chunks (MUB, RUB, OS statistics) |
| catalog_loaded_chunks | chunk_storage | Total number of chunks (MUB, RUB, RUBandOS) for each table |
| catalog_loaded_rows | row_count | Total number of rows (MUB, RUB, RUBandOS) for each table |
| catalog_chunks_mem_usage_bytes | memory_metrics | Total memory usage by chunks |
| catalog_loaded_chunks | chunk_storage | Total number of chunks for each table |
| catalog_loaded_rows | row_count | Total number of rows for each table |
| catalog_lock_total | ? | ? |
| catalog_lock_wait_seconds_total | ? | ? |
| ? | partition_lock_tracker | ? |
@ -55,7 +55,7 @@ Here are useful metrics
| Metric name | Code Name | Description |
| --- | --- | --- |
| write_buffer_ingest_requests_total | red | Total number of write requests |
| write_buffer_read_bytes_total | bytes_read | Total number of write requested bytes |
| write_buffer_read_bytes_total | bytes_read | Total number of write requested bytes |
| write_buffer_last_sequence_number | last_sequence_number | sequence number of last write request |
| write_buffer_sequence_number_lag | sequence_number_lag | The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed sequence number |
| write_buffer_last_min_ts | last_min_ts | Minimum timestamp of last write as unix timestamp in nanoseconds |

View File

@ -241,7 +241,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError>;
/// Returns chunk type which is either MUB, RUB, OS
/// Returns chunk type. Useful in tests and debug logs.
fn chunk_type(&self) -> &str;
/// Order of this chunk relative to other overlapping chunks.

View File

@ -932,17 +932,19 @@ impl Deduplicater {
/// Return a sort plan for for a given chunk
/// This plan is applied for every chunk to read data from chunk
/// The plan will look like this. Reading bottom up:
/// 1. First we scan the data in IOxReadFilterNode which represents
/// a custom implemented scan of MUB, RUB, OS. Both Select Predicate of
/// the query and Delete Predicates of the chunk is pushed down
/// here to eliminate as much data as early as possible but it is not guaranteed
/// all filters are applied because only certain expressions work
/// at this low chunk scan level.
/// Delete Predicates are tombstone of deleted data that will be eliminated at read time.
/// 2. If the chunk has Delete Predicates, the FilterExec will be added to filter data out
/// We apply delete predicate filter at this low level because the Delete Predicates are chunk specific.
/// 3. Then SortExec is added if there is a request to sort this chunk at this stage
/// See the description of function build_scan_plan to see why the sort may be needed
///
/// 1. First we scan the data in IOxReadFilterNode which represents a custom implemented scan
/// of the chunk. Both Select Predicate of the query and Delete Predicates of the chunk is
/// pushed down here to eliminate as much data as early as possible but it is not
/// guaranteed all filters are applied because only certain expressions work at this low
/// chunk scan level. Delete Predicates are tombstone of deleted data that will be
/// eliminated at read time.
/// 2. If the chunk has Delete Predicates, the FilterExec will be added to filter data out.
/// We apply delete predicate filter at this low level because the Delete Predicates are
/// chunk specific.
/// 3. Then SortExec is added if there is a request to sort this chunk at this stage.
/// See the description of function build_scan_plan to see why the sort may be needed.
///
/// ```text
/// ┌─────────────────┐
/// │ ProjectionExec │

View File

@ -13,8 +13,8 @@ use data_types::{DeleteExpr, DeletePredicate, Op, Scalar, TimestampRange};
// when they happen
#[derive(Debug)]
/// Setup for delete query test with one table and one chunk moved from MUB to RUB to OS
/// All data will be soft deleted in this setup
/// Setup for delete query test with one table and one chunk. All data will be soft deleted in this
/// setup.
pub struct OneDeleteSimpleExprOneChunkDeleteAll {}
#[async_trait]
impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
@ -31,13 +31,12 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
exprs: vec![],
};
// this returns 15 scenarios
all_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key).await
}
}
#[derive(Debug)]
/// Setup for delete query test with one table and one chunk moved from MUB to RUB to OS
/// Setup for delete query test with one table and one chunk
pub struct OneDeleteSimpleExprOneChunk {}
#[async_trait]
impl DbSetup for OneDeleteSimpleExprOneChunk {
@ -58,14 +57,12 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
)],
};
// this returns 15 scenarios
all_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key).await
}
}
#[derive(Debug)]
/// Setup for many scenario move chunk from from MUB to RUB to OS
/// No delete in this case
/// Setup for many scenarios moving the chunk to different stages. No delete in this case.
pub struct NoDeleteOneChunk {}
#[async_trait]
impl DbSetup for NoDeleteOneChunk {
@ -80,13 +77,12 @@ impl DbSetup for NoDeleteOneChunk {
"cpu,foo=me bar=1 40",
];
// this returns 15 scenarios
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, table_name, partition_key).await
}
}
#[derive(Debug)]
/// Setup for multi-expression delete query test with one table and one chunk moved from MUB to RUB to OS
/// Setup for multi-expression delete query test with one table and one chunk
pub struct OneDeleteMultiExprsOneChunk {}
#[async_trait]
impl DbSetup for OneDeleteMultiExprsOneChunk {
@ -109,20 +105,19 @@ impl DbSetup for OneDeleteMultiExprsOneChunk {
],
};
// this returns 15 scenarios
all_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key).await
}
}
#[derive(Debug)]
/// Setup for multi-expression delete query test with one table and one chunk moved from MUB to RUB to OS
/// Two deletes at different chunk stages
/// Setup for multi-expression delete query test with one table and one chunk. Two deletes at
/// different chunk stages.
pub struct TwoDeletesMultiExprsOneChunk {}
#[async_trait]
impl DbSetup for TwoDeletesMultiExprsOneChunk {
async fn make(&self) -> Vec<DbScenario> {
// The main purpose of these scenarios is the multi-expression delete predicate is added in MUB and
// is moved with chunk moving. Then one more delete after moving
// The main purpose of these scenarios is the multi-expression delete predicate is added in
// the ingester and is moved with chunk moving. Then one more delete after moving.
// General setup for all scenarios
let partition_key = "1970-01-01T00";

View File

@ -40,7 +40,6 @@ impl DbSetup for OneMeasurementRealisticTimes {
"cpu,region=west user=21.0 1626809430000000000",
];
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "cpu", partition_key).await
}
}
@ -59,7 +58,6 @@ impl DbSetup for OneMeasurementNoTags {
"h2o level=200.0 300",
];
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "h2o", partition_key).await
}
}
@ -81,7 +79,6 @@ impl DbSetup for OneMeasurementManyNullTags {
"h2o,state=NY,city=NYC,borough=Brooklyn temp=61.0 600",
];
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "cpu", partition_key).await
}
}
@ -176,7 +173,6 @@ impl DbSetup for TwoMeasurements {
"disk,region=east bytes=99i 200",
];
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "cpu", partition_key).await
}
}
@ -208,7 +204,8 @@ impl DbSetup for TwoMeasurementsWithDelete {
)],
};
// return all possible combination scenarios of a chunk stage and when the delete predicates are applied
// return all possible combination scenarios of a chunk stage and when the delete
// predicates are applied
all_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key).await
}
}
@ -246,7 +243,8 @@ impl DbSetup for TwoMeasurementsWithDeleteAll {
exprs: vec![],
};
// return all possible combination scenarios of a chunk stage and when the delete predicates are applied
// return all possible combination scenarios of a chunk stage and when the delete
// predicates are applied
all_scenarios_for_one_chunk(
vec![&pred1],
vec![&pred2],
@ -489,7 +487,8 @@ impl DbSetup for ManyFieldsSeveralChunks {
// c4: parquet stage & overlap with c1
let lp_lines4 = vec![
"h2o,state=MA,city=Boston temp=88.6 230",
"h2o,state=MA,city=Boston other_temp=80 250", // duplicate with a row in c1 but more recent => this row is kept
"h2o,state=MA,city=Boston other_temp=80 250", // duplicate with a row in c1 but more
// recent => this row is kept
];
let c4 = ChunkData {
lp_lines: lp_lines4,
@ -559,8 +558,9 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
// . time range: 150 - 300
// . no duplicates in its own chunk
let lp_lines2 = vec![
"h2o,state=MA,city=Bedford max_temp=78.75,area=742u 150", // new field (area) and update available NULL (max_temp)
"h2o,state=MA,city=Boston min_temp=65.4 250", // update min_temp from NULL
// new field (area) and update available NULL (max_temp)
"h2o,state=MA,city=Bedford max_temp=78.75,area=742u 150",
"h2o,state=MA,city=Boston min_temp=65.4 250", // update min_temp from NULL
"h2o,state=MA,city=Reading min_temp=53.4, 250",
"h2o,state=CA,city=SF min_temp=79.0,max_temp=87.2,area=500u 300",
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
@ -696,7 +696,7 @@ impl DbSetup for EndToEndTest {
];
let partition_key = "1970-01-01T00";
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "cpu_load_short", partition_key).await
}
}
@ -759,7 +759,7 @@ impl DbSetup for TwoMeasurementsMultiSeries {
"o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4
];
// Swap around data is not inserted in series order
// Swap around data is not inserted in series order
lp_lines.swap(0, 2);
lp_lines.swap(4, 5);
@ -783,7 +783,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDelete {
"o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4
];
// Swap around data is not inserted in series order
// Swap around data is not inserted in series order
lp_lines.swap(0, 2);
lp_lines.swap(4, 5);
@ -822,7 +822,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDeleteAll {
"o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4
];
// Swap around data is not inserted in series order
// Swap around data is not inserted in series order
lp_lines.swap(0, 2);
lp_lines.swap(4, 5);
@ -978,9 +978,8 @@ impl DbSetup for OneMeasurementNoTagsWithDelete {
}
}
/// This will create many scenarios (at least 15), some have a chunk with
/// soft deleted data, some have no chunks because there is no point to
/// create a RUB for one or many compacted MUB with all deleted data.
/// This will create many scenarios: some have a chunk with soft deleted data, some have no chunks
/// because there is no point to create compacted chunks with all deleted data.
pub struct OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk {}
#[async_trait]
impl DbSetup for OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk {
@ -995,8 +994,8 @@ impl DbSetup for OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk {
exprs: vec![],
};
// Apply predicate before the chunk is moved if any. There will be
// scenario without chunks as a consequence of not-compacting-deleted-data
// Apply predicate before the chunk is moved if any. There will be scenarios without chunks
// as a consequence of not-compacting-deleted-data
all_scenarios_for_one_chunk(
vec![&pred],
vec![],

View File

@ -44,7 +44,7 @@ use std::{
sync::Mutex,
};
// Structs, enums, and functions used to exhaust all test scenarios of chunk life cycle
// Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle
// & when delete predicates are applied
// STRUCTs & ENUMs
@ -55,9 +55,9 @@ pub struct ChunkData<'a, 'b> {
/// which stage this chunk will be created.
///
/// If not set, this chunk will be created in [all](ChunkStage::all) stages. This can be helpful when the test
/// scenario is not specific to the chunk stage. If this is used for multiple chunks, then all stage permutations
/// will be generated.
/// If not set, this chunk will be created in [all](ChunkStage::all) stages. This can be
/// helpful when the test scenario is not specific to the chunk stage. If this is used for
/// multiple chunks, then all stage permutations will be generated.
pub chunk_stage: Option<ChunkStage>,
/// Delete predicates
@ -80,7 +80,8 @@ impl<'a, 'b> ChunkData<'a, 'b> {
}
}
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self) -> Self {
Self {
preds: self
@ -100,7 +101,7 @@ impl<'a, 'b> ChunkData<'a, 'b> {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChunkStage {
/// In parquet file.
/// In parquet file, persisted by the ingester. Now managed by the querier.
Parquet,
/// In ingester.
@ -119,10 +120,12 @@ impl Display for ChunkStage {
impl PartialOrd for ChunkStage {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match (self, other) {
// allow multiple parquet chunks (for the same partition). sequence numbers will be used for ordering.
// allow multiple parquet chunks (for the same partition). sequence numbers will be
// used for ordering.
(Self::Parquet, Self::Parquet) => Some(Ordering::Equal),
// "parquet" chunks are older (i.e. come earlier) than chunks that still life in the ingester
// "parquet" chunks are older (i.e. come earlier) than chunks that still life in the
// ingester
(Self::Parquet, Self::Ingester) => Some(Ordering::Less),
(Self::Ingester, Self::Parquet) => Some(Ordering::Greater),
@ -149,7 +152,8 @@ pub struct Pred<'a> {
}
impl<'a> Pred<'a> {
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self, stage: ChunkStage) -> Self {
Self {
delete_time: self.delete_time.replace_begin_and_end_delete_times(stage),
@ -168,9 +172,11 @@ impl<'a> Pred<'a> {
/// Describes when a delete predicate was applied.
///
/// # Ordering
/// Compared to [`ChunkStage`], the ordering here may seem a bit confusing. While the latest payload / LP data
/// resists in the ingester and is not yet available as a parquet file, the latest tombstones apply to parquet files and
/// were (past tense!) NOT applied while the LP data was in the ingester.
///
/// Compared to [`ChunkStage`], the ordering here may seem a bit confusing. While the latest
/// payload / LP data resists in the ingester and is not yet available as a parquet file, the
/// latest tombstones apply to parquet files and were (past tense!) NOT applied while the LP data
/// was in the ingester.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTime {
/// Special delete time which marks the first time that could be used from deletion.
@ -182,11 +188,13 @@ pub enum DeleteTime {
Ingester {
/// Flag if the tombstone also exists in the catalog.
///
/// If this is set to `false`, then the tombstone was applied by the ingester but does not exist in the catalog
/// any longer. This can be because:
/// If this is set to `false`, then the tombstone was applied by the ingester but does not
/// exist in the catalog any longer. This can be because:
///
/// - the ingester decided that it doesn't need to be added to the catalog (this is currently/2022-04-21 not implemented!)
/// - the compactor pruned the tombstone from the catalog because there are zero affected parquet files
/// - the ingester decided that it doesn't need to be added to the catalog (this is
/// currently/2022-04-21 not implemented!)
/// - the compactor pruned the tombstone from the catalog because there are zero affected
/// parquet files
also_in_catalog: bool,
},
@ -223,7 +231,8 @@ impl DeleteTime {
}
}
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self, stage: ChunkStage) -> Self {
match self {
Self::Begin => Self::begin_for(stage),
@ -266,14 +275,14 @@ impl Display for DeleteTime {
// --------------------------------------------------------------------------------------------
/// All scenarios chunk stages and their life cycle moves for given set of delete predicates.
/// All scenarios of chunk stages and their lifecycle moves for a given set of delete predicates.
/// If the delete predicates are empty, all scenarios of different chunk stages will be returned.
pub async fn all_scenarios_for_one_chunk(
// These delete predicates are applied at all stages of the chunk life cycle
// These delete predicates are applied at all stages of the chunk lifecycle
chunk_stage_preds: Vec<&DeletePredicate>,
// These delete predicates are applied to all chunks at their final stages
at_end_preds: Vec<&DeletePredicate>,
// Input data, formatted as line protocol. One chunk will be created for each measurement
// Input data, formatted as line protocol. One chunk will be created for each measurement
// (table) that appears in the input
lp_lines: Vec<&str>,
// Table to which the delete predicates will be applied
@ -284,10 +293,9 @@ pub async fn all_scenarios_for_one_chunk(
let mut scenarios = vec![];
// Go over chunk stages
for chunk_stage in ChunkStage::all() {
// Apply delete chunk_stage_preds to this chunk stage at
// all stages at and before that in the life cycle to the chunk
// But only need to get all delete times if chunk_stage_preds is not empty,
// otherwise, produce only one scenario of each chunk stage
// Apply delete chunk_stage_preds to this chunk stage at all stages at and before that in
// the lifecycle of the chunk. But we only need to get all delete times if
// chunk_stage_preds is not empty, otherwise, produce only one scenario of each chunk stage
let mut delete_times = vec![DeleteTime::begin_for(chunk_stage)];
if !chunk_stage_preds.is_empty() {
delete_times = DeleteTime::all_from_and_before(chunk_stage)
@ -325,9 +333,9 @@ pub async fn all_scenarios_for_one_chunk(
scenarios
}
/// Build a chunk that may move with life cycle before/after deletes
/// Note that the only chunk in this function can be moved to different stages and delete predicates
/// can be applied at different stages when the chunk is moved.
/// Build a chunk that may move with lifecycle before/after deletes. Note that the only chunk in
/// this function can be moved to different stages, and delete predicates can be applied at
/// different stages when the chunk is moved.
async fn make_chunk_with_deletes_at_different_stages(
lp_lines: Vec<&str>,
chunk_stage: ChunkStage,
@ -350,12 +358,7 @@ async fn make_chunk_with_deletes_at_different_stages(
DbScenario { scenario_name, db }
}
/// This function loads two chunks of lp data into 4 different scenarios
///
/// Data in single open mutable buffer chunk
/// Data in one open mutable buffer chunk, one closed mutable chunk
/// Data in one open mutable buffer chunk, one read buffer chunk
/// Data in one two read buffer chunks,
/// Load two chunks of lp data into different chunk scenarios.
pub async fn make_two_chunk_scenarios(
partition_key: &str,
data1: &str,
@ -480,7 +483,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
panic!("Cannot have delete time '{other}' for ingester chunk")
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@ -507,7 +513,8 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
.await;
mock_ingester.buffer_operation(op).await;
// tombstones are created immediately, need to remember their ID to handle deletion later
// tombstones are created immediately, need to remember their ID to
// handle deletion later
let mut tombstone_id = None;
for id in mock_ingester.tombstone_ids(delete_table_name).await {
if !ids_pre.contains(&id) {
@ -521,7 +528,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
// will be attached AFTER the chunk was created
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@ -568,7 +578,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
mock_ingester.buffer_operation(op).await;
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@ -600,8 +613,8 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
/// Ingester that can be controlled specifically for query tests.
///
/// This uses as much ingester code as possible but allows more direct control over aspects like lifecycle and
/// partioning.
/// This uses as much ingester code as possible but allows more direct control over aspects like
/// lifecycle and partioning.
#[derive(Debug)]
struct MockIngester {
/// Test catalog state.
@ -618,9 +631,10 @@ struct MockIngester {
/// Memory of partition keys for certain sequence numbers.
///
/// This is currently required because [`DmlWrite`] does not carry partiion information so we need to do that. In
/// production this is not required because the router and the ingester use the same partition logic, but we need
/// direct control over the partion key for the query tests.
/// This is currently required because [`DmlWrite`] does not carry partiion information so we
/// need to do that. In production this is not required because the router and the ingester use
/// the same partition logic, but we need direct control over the partion key for the query
/// tests.
partition_keys: HashMap<SequenceNumber, String>,
/// Ingester state.
@ -671,7 +685,8 @@ impl MockIngester {
///
/// This will never persist.
///
/// Takes `&self mut` because our partioning implementation does not work with concurrent access.
/// Takes `&self mut` because our partioning implementation does not work with concurrent
/// access.
async fn buffer_operation(&mut self, dml_operation: DmlOperation) {
let lifecycle_handle = NoopLifecycleHandle {};
@ -828,7 +843,8 @@ impl MockIngester {
/// Finalizes the ingester and creates a querier namespace that can be used for query tests.
///
/// The querier namespace will hold a simulated connection to the ingester to be able to query unpersisted data.
/// The querier namespace will hold a simulated connection to the ingester to be able to query
/// unpersisted data.
async fn into_query_namespace(self) -> Arc<QuerierNamespace> {
let mut repos = self.catalog.catalog.repositories().await;
let schema = Arc::new(
@ -912,8 +928,8 @@ impl IngesterFlightClient for MockIngester {
_ingester_address: Arc<str>,
request: IngesterQueryRequest,
) -> Result<Box<dyn IngesterFlightClientQueryData>, IngesterFlightClientError> {
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior (e.g. passing predicates
// of wrong types)
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior
// (e.g. passing predicates of wrong types)
let response = prepare_data_to_querier(&self.ingester_data, &request)
.await
.map_err(|e| IngesterFlightClientError::Flight {
@ -943,8 +959,8 @@ impl IngesterFlightClient for MockIngester {
}
}
/// Helper struct to present [`IngesterQueryResponse`] (produces by the ingester) as a [`IngesterFlightClientQueryData`]
/// (used by the querier) without doing any real gRPC IO.
/// Helper struct to present [`IngesterQueryResponse`] (produces by the ingester) as a
/// [`IngesterFlightClientQueryData`] (used by the querier) without doing any real gRPC IO.
#[derive(Debug)]
struct QueryDataAdapter {
response: IngesterQueryResponse,

View File

@ -71,7 +71,7 @@ async fn run_table_schema_test_case<D>(
}
fn is_unsorted_chunk_type(chunk: &dyn QueryChunk) -> bool {
(chunk.chunk_type() == "MUB") || (chunk.chunk_type() == "IngesterPartition")
chunk.chunk_type() == "IngesterPartition"
}
#[tokio::test]