Merge branch 'main' into cn/read-buffer-cache

pull/24376/head
kodiakhq[bot] 2022-06-06 12:52:48 +00:00 committed by GitHub
commit 412309e7b1
17 changed files with 132 additions and 187 deletions


@ -8,7 +8,7 @@ description = "Re-exports datafusion at a specific version"
[dependencies]
# Rename to workaround doctest bug
# Turn off optional datafusion features (e.g. don't get support for crypo functions or avro)
# Turn off optional datafusion features (e.g. don't get support for crypto functions or avro)
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="8ddd99c8432fdac2c236040973f984a4146f18b7", default-features = false, package = "datafusion" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="8ddd99c8432fdac2c236040973f984a4146f18b7" }
workspace-hack = { path = "../workspace-hack"}


@ -1,8 +1,9 @@
# InfluxDB IOx Documentation
Please see the main [README](../README.md) for user facing documentation.
This directory contains internal design documentation of potential
interest for those who wish to understand how the code works. It is
not intended to be general user facing documentation
interest for those who wish to understand how the code works.
## IOx Tech Talks


@ -14,21 +14,21 @@ Some examples
```bash
# Default verbosity
$ ./influxdb_iox run database
$ ./influxdb_iox run all-in-one
# More verbose
$ ./influxdb_iox run database -v
$ ./influxdb_iox run all-in-one -v
# Even more verbose
$ ./influxdb_iox run database -vv
$ ./influxdb_iox run all-in-one -vv
# Everything!!
$ ./influxdb_iox run database --log-filter trace
$ ./influxdb_iox run all-in-one --log-filter trace
# Default info, but debug within http module
$ ./influxdb_iox run database --log-filter info,influxdb_iox::influxdb_ioxd::http=debug
$ ./influxdb_iox run all-in-one --log-filter info,influxdb_iox::influxdb_ioxd::http=debug
```
Additionally, the output format can be controlled with `--log-format`
```bash
$ ./influxdb_iox run database --log-filter debug --log-format logfmt
$ ./influxdb_iox run all-in-one --log-filter debug --log-format logfmt
```
## Developer Guide
@ -69,7 +69,7 @@ will strip out all trace level callsites from the release binary.
### Format
IOx supports logging in many formats. For a list run `influxdb_iox run database --help` and view the help output
IOx supports logging in many formats. For a list run `influxdb_iox run --help` and view the help output
for `--log-format`.
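For example, a hedged way to check which formats a given build accepts (the exact help text may differ between versions):
```bash
# Print the help for the run command and show only the lines describing --log-format
$ ./influxdb_iox run --help | grep -A 4 -- '--log-format'
```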
<sup>1.</sup> This span propagation uses thread-local storage and therefore does not automatically carry across


@ -6,7 +6,7 @@ Here are useful metrics
### Requests to IOx Server including Routers and Query Servers
| Metric name | Code Name | Description |
| --- | --- | --- |
| http_requests_total | http_requests | Total number of HTTP requests |
| gRPC_requests_total | requests | Total number of gRPC requests |
| http_request_duration_seconds | ? | Time to finish a request |
@ -17,7 +17,7 @@ Here are useful metrics
### Line Protocol Data ingested into Routers
| Metric name | Code Name | Description |
| --- | --- | --- |
| ingest_points_total | ingest_lines_total | Total number of lines ingested |
| ingest_fields_total | ingest_fields_total | Total number of fields (columns) ingested |
| ingest_points_bytes_total | ingest_points_bytes_total | Total number of bytes ingested |
@ -26,9 +26,9 @@ Here are useful metrics
### Chunks
| Metric name | Code Name | Description |
| --- | --- | --- |
| catalog_chunks_mem_usage_bytes | memory_metrics | Total memory usage by chunks (MUB, RUB, OS statistics) |
| catalog_loaded_chunks | chunk_storage | Total number of chunks (MUB, RUB, RUBandOS) for each table |
| catalog_loaded_rows | row_count | Total number of rows (MUB, RUB, RUBandOS) for each table |
| catalog_chunks_mem_usage_bytes | memory_metrics | Total memory usage by chunks |
| catalog_loaded_chunks | chunk_storage | Total number of chunks for each table |
| catalog_loaded_rows | row_count | Total number of rows for each table |
| catalog_lock_total | ? | ? |
| catalog_lock_wait_seconds_total | ? | ? |
| ? | partition_lock_tracker | ? |
@ -55,7 +55,7 @@ Here are useful metrics
| Metric name | Code Name | Description |
| --- | --- | --- |
| write_buffer_ingest_requests_total | red | Total number of write requests |
| write_buffer_read_bytes_total | bytes_read | Total number of write requested bytes |
| write_buffer_last_sequence_number | last_sequence_number | sequence number of last write request |
| write_buffer_sequence_number_lag | sequence_number_lag | The difference between the last sequence number available (e.g. Kafka offset) and the last consumed sequence number (available minus consumed) |
| write_buffer_last_min_ts | last_min_ts | Minimum timestamp of last write as unix timestamp in nanoseconds |
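As a hedged example of how these write buffer counters can be inspected, assuming the server exposes its Prometheus metrics under `/metrics` on the HTTP endpoint (a hypothetical local address is used here; adjust host and port for your deployment):
```bash
# Scrape the metrics endpoint and keep only the write buffer series listed above
$ curl -s http://localhost:8080/metrics | grep '^write_buffer_'
```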


@ -42,74 +42,6 @@ You can also see more logging using the `LOG_FILTER` variable. For example:
LOG_FILTER=debug,sqlx=warn,h2=warn TEST_INTEGRATION=1 TEST_INFLUXDB_IOX_CATALOG_DSN=postgresql://localhost:5432/alamb cargo test --test end_to_end
```
## Running the IOx server from source
### Starting the server
You can run IOx locally with a command like this (replacing `--data-dir` with your preferred location)
```shell
cargo run -- run -v --object-store=file --data-dir=$HOME/.influxdb_iox --server-id=42
```
### Loading data
In another terminal window, try loading some data. These commands will create a database called `parquet_db` and load the contents of `tests/fixtures/lineproto/metrics.lp` into it
```shell
cd influxdb_iox
./target/debug/influxdb_iox database create parquet_db
./target/debug/influxdb_iox database write parquet_db tests/fixtures/lineproto/metrics.lp
```
### Editing configuration
You can interactively edit the configuration of the IOx instance with a command like this:
```shell
./scripts/edit_db_rules localhost:8082 parquet_db
```
Which will bring up your editor with a file that looks like this. Any changes you make to the file will be sent to IOx as its new config.
In this case, these settings will cause data to be persisted to parquet almost immediately
```json
{
"rules": {
"name": "parquet_db",
"partitionTemplate": {
"parts": [
{
"time": "%Y-%m-%d %H:00:00"
}
]
},
"lifecycleRules": {
"bufferSizeSoft": "52428800",
"bufferSizeHard": "104857600",
"dropNonPersisted": true,
"immutable": false,
"persist": true,
"workerBackoffMillis": "1000",
"catalogTransactionsUntilCheckpoint": "100",
"lateArriveWindowSeconds": 1,
"persistRowThreshold": "1",
"persistAgeThresholdSeconds": 1,
"mubRowThreshold": "1",
"parquetCacheLimit": "0",
"maxActiveCompactionsCpuFraction": 1
},
"workerCleanupAvgSleep": "500s"
}
}
```
### Examining Parquet Files
You can use tools such as `parquet-tools` to examine the parquet files created by IOx. For example, the following command will show the contents of the `disk` table when persisted as parquet (note the actual filename will be different):
```shell
parquet-tools meta /Users/alamb/.influxdb_iox/42/parquet_db/data/disk/2020-06-11\ 16\:00\:00/1.4b1a7805-d6de-495e-844b-32fa452147c7.parquet
```
## Object storage
### To run the tests or not run the tests


@ -68,7 +68,7 @@ The simplest way to use the massif output is to use [massif-visualizer]:
![massif-visualizer screenshot](./images/screenshot_massif_visualizer.jpeg)
[heappy]: https://github.com/mkmik/heappy
[jemalloc]: ttps://github.com/jemalloc/jemalloc
[jemalloc]: https://github.com/jemalloc/jemalloc
[lazycell]: https://crates.io/crates/lazycell
[Massif]: https://valgrind.org/docs/manual/ms-manual.html
[massif-visualizer]: https://github.com/KDE/massif-visualizer


@ -77,7 +77,7 @@ pub fn agent_pre_generated(c: &mut Criterion) {
let spec: DataSpec = toml::from_str(r#"
name = "storage_cardinality_example"
# Values are automatically generated before the agents are intialized. They generate tag key/value pairs
# Values are automatically generated before the agents are initialized. They generate tag key/value pairs
# with the name of the value as the tag key and the evaluated template as the value. These pairs
# are Arc wrapped so they can be shared across tagsets and used in the agents as pre-generated data.
[[values]]
@ -122,7 +122,7 @@ template = "{{id}}"
cardinality = 10
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and don't
# increase the cardiality beyond count(bucket) * count(partition). Later this example will use the
# increase the cardinality beyond count(bucket) * count(partition). Later this example will use the
# agent and measurement generation to take this base tagset and increase cardinality on a per-agent basis.
[[tag_sets]]
name = "bucket_set"


@ -137,7 +137,7 @@ f64_range = [0.0, 1.0]
[[database_writers]]
agents = [
{name = "high", sampling_interval = "10s", count = 10}, # 5,000 meassurmeents
{name = "high", sampling_interval = "10s", count = 10}, # 5,000 measurements
{name = "medium", sampling_interval = "10s", count = 20}, # 20,000 measurements
{name = "low", sampling_interval = "10s", count = 20} # 200,000 measurements
]


@ -84,7 +84,7 @@ name = "mem"
i64_range = [0, 10000000]
[[agents.measurements.fields]]
name = "avaiable_percent"
name = "available_percent"
f64_range = [0.0, 100.0]
[[agents.measurements.fields]]


@ -33,7 +33,7 @@
name = "full_example"
# Values are automatically generated before the agents are intialized. They generate tag key/value pairs
# Values are automatically generated before the agents are initialized. They generate tag key/value pairs
# with the name of the value as the tag key and the evaluated template as the value. These pairs
# can be shared across tagsets and used in the agents as pre-generated data.
[[values]]
@ -55,7 +55,7 @@ template = "foo_{{guid}}_{{id}}_{{random 5}}_{{format-time \"%Y-%m-%d\"}}"
name = "t1"
template = "t1_{{id}}"
cardinality = 3
# each t1 genereated will reference one of t3 and one of foo_bar. As each t1 is generated
# each t1 generated will reference one of t3 and one of foo_bar. As each t1 is generated
# it will loop through the t3 and foo_bar collections. So the 3rd t1 that is generated will
# reference the first t3 and foo_bar
has_one = ["t3", "foo_bar"]
@ -85,7 +85,7 @@ name = "example"
# for_each specifies how to iterate through the values to generate tagsets. If you want to
# use values that belong_to others or are a has_one, specify their parent first. For values
# without relationships, you'll get a combined cardinality of each multiplied by the other.
# In this eaxmple we get cardinality of card(t1) * card(foo_bar) * card(other). The has_one
# In this example we get cardinality of card(t1) * card(foo_bar) * card(other). The has_one
# members of t1 don't increase cardinality.
for_each = [
"t1",


@ -1,6 +1,6 @@
name = "storage_cardinality_example"
# Values are automatically generated before the agents are intialized. They generate tag key/value pairs
# Values are automatically generated before the agents are initialized. They generate tag key/value pairs
# with the name of the value as the tag key and the evaluated template as the value. These pairs
# are Arc wrapped so they can be shared across tagsets and used in the agents as pre-generated data.
[[values]]
@ -45,7 +45,7 @@ template = "{{id}}"
cardinality = 10
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and don't
# increase the cardiality beyond count(bucket) * count(partition). Later this example will use the
# increase the cardinality beyond count(bucket) * count(partition). Later this example will use the
# agent and measurement generation to take this base tagset and increase cardinality on a per-agent basis.
[[tag_sets]]
name = "bucket_set"


@ -241,7 +241,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError>;
/// Returns chunk type which is either MUB, RUB, OS
/// Returns chunk type. Useful in tests and debug logs.
fn chunk_type(&self) -> &str;
/// Order of this chunk relative to other overlapping chunks.


@ -932,17 +932,19 @@ impl Deduplicater {
/// Return a sort plan for a given chunk.
/// This plan is applied to every chunk to read data from the chunk.
/// The plan will look like this. Reading bottom up:
/// 1. First we scan the data in IOxReadFilterNode which represents
/// a custom implemented scan of MUB, RUB, OS. Both Select Predicate of
/// the query and Delete Predicates of the chunk is pushed down
/// here to eliminate as much data as early as possible but it is not guaranteed
/// all filters are applied because only certain expressions work
/// at this low chunk scan level.
/// Delete Predicates are tombstone of deleted data that will be eliminated at read time.
/// 2. If the chunk has Delete Predicates, the FilterExec will be added to filter data out
/// We apply delete predicate filter at this low level because the Delete Predicates are chunk specific.
/// 3. Then SortExec is added if there is a request to sort this chunk at this stage
/// See the description of function build_scan_plan to see why the sort may be needed
///
/// 1. First we scan the data in IOxReadFilterNode which represents a custom implemented scan
/// of the chunk. Both the Select Predicate of the query and the Delete Predicates of the
/// chunk are pushed down here to eliminate as much data as early as possible, but it is
/// not guaranteed that all filters are applied because only certain expressions work at
/// this low chunk scan level. Delete Predicates are tombstones of deleted data that will
/// be eliminated at read time.
/// 2. If the chunk has Delete Predicates, the FilterExec will be added to filter data out.
/// We apply delete predicate filter at this low level because the Delete Predicates are
/// chunk specific.
/// 3. Then SortExec is added if there is a request to sort this chunk at this stage.
/// See the description of function build_scan_plan to see why the sort may be needed.
///
/// ```text
/// ┌─────────────────┐
/// │ ProjectionExec │


@ -13,8 +13,8 @@ use data_types::{DeleteExpr, DeletePredicate, Op, Scalar, TimestampRange};
// when they happen
#[derive(Debug)]
/// Setup for delete query test with one table and one chunk moved from MUB to RUB to OS
/// All data will be soft deleted in this setup
/// Setup for delete query test with one table and one chunk. All data will be soft deleted in this
/// setup.
pub struct OneDeleteSimpleExprOneChunkDeleteAll {}
#[async_trait]
impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
@ -31,13 +31,12 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
exprs: vec![],
};
// this returns 15 scenarios
all_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key).await
}
}
#[derive(Debug)]
/// Setup for delete query test with one table and one chunk moved from MUB to RUB to OS
/// Setup for delete query test with one table and one chunk
pub struct OneDeleteSimpleExprOneChunk {}
#[async_trait]
impl DbSetup for OneDeleteSimpleExprOneChunk {
@ -58,14 +57,12 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
)],
};
// this returns 15 scenarios
all_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key).await
}
}
#[derive(Debug)]
/// Setup for many scenario move chunk from from MUB to RUB to OS
/// No delete in this case
/// Setup for many scenarios moving the chunk to different stages. No delete in this case.
pub struct NoDeleteOneChunk {}
#[async_trait]
impl DbSetup for NoDeleteOneChunk {
@ -80,13 +77,12 @@ impl DbSetup for NoDeleteOneChunk {
"cpu,foo=me bar=1 40",
];
// this returns 15 scenarios
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, table_name, partition_key).await
}
}
#[derive(Debug)]
/// Setup for multi-expression delete query test with one table and one chunk moved from MUB to RUB to OS
/// Setup for multi-expression delete query test with one table and one chunk
pub struct OneDeleteMultiExprsOneChunk {}
#[async_trait]
impl DbSetup for OneDeleteMultiExprsOneChunk {
@ -109,20 +105,19 @@ impl DbSetup for OneDeleteMultiExprsOneChunk {
],
};
// this returns 15 scenarios
all_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key).await
}
}
#[derive(Debug)]
/// Setup for multi-expression delete query test with one table and one chunk moved from MUB to RUB to OS
/// Two deletes at different chunk stages
/// Setup for multi-expression delete query test with one table and one chunk. Two deletes at
/// different chunk stages.
pub struct TwoDeletesMultiExprsOneChunk {}
#[async_trait]
impl DbSetup for TwoDeletesMultiExprsOneChunk {
async fn make(&self) -> Vec<DbScenario> {
// The main purpose of these scenarios is the multi-expression delete predicate is added in MUB and
// is moved with chunk moving. Then one more delete after moving
// The main purpose of these scenarios is the multi-expression delete predicate is added in
// the ingester and is moved with chunk moving. Then one more delete after moving.
// General setup for all scenarios
let partition_key = "1970-01-01T00";


@ -40,7 +40,6 @@ impl DbSetup for OneMeasurementRealisticTimes {
"cpu,region=west user=21.0 1626809430000000000",
];
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "cpu", partition_key).await
}
}
@ -59,7 +58,6 @@ impl DbSetup for OneMeasurementNoTags {
"h2o level=200.0 300",
];
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "h2o", partition_key).await
}
}
@ -81,7 +79,6 @@ impl DbSetup for OneMeasurementManyNullTags {
"h2o,state=NY,city=NYC,borough=Brooklyn temp=61.0 600",
];
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "cpu", partition_key).await
}
}
@ -176,7 +173,6 @@ impl DbSetup for TwoMeasurements {
"disk,region=east bytes=99i 200",
];
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "cpu", partition_key).await
}
}
@ -208,7 +204,8 @@ impl DbSetup for TwoMeasurementsWithDelete {
)],
};
// return all possible combination scenarios of a chunk stage and when the delete predicates are applied
// return all possible combination scenarios of a chunk stage and when the delete
// predicates are applied
all_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key).await
}
}
@ -246,7 +243,8 @@ impl DbSetup for TwoMeasurementsWithDeleteAll {
exprs: vec![],
};
// return all possible combination scenarios of a chunk stage and when the delete predicates are applied
// return all possible combination scenarios of a chunk stage and when the delete
// predicates are applied
all_scenarios_for_one_chunk(
vec![&pred1],
vec![&pred2],
@ -489,7 +487,8 @@ impl DbSetup for ManyFieldsSeveralChunks {
// c4: parquet stage & overlap with c1
let lp_lines4 = vec![
"h2o,state=MA,city=Boston temp=88.6 230",
"h2o,state=MA,city=Boston other_temp=80 250", // duplicate with a row in c1 but more recent => this row is kept
"h2o,state=MA,city=Boston other_temp=80 250", // duplicate with a row in c1 but more
// recent => this row is kept
];
let c4 = ChunkData {
lp_lines: lp_lines4,
@ -559,8 +558,9 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
// . time range: 150 - 300
// . no duplicates in its own chunk
let lp_lines2 = vec![
"h2o,state=MA,city=Bedford max_temp=78.75,area=742u 150", // new field (area) and update available NULL (max_temp)
"h2o,state=MA,city=Boston min_temp=65.4 250", // update min_temp from NULL
// new field (area) and update available NULL (max_temp)
"h2o,state=MA,city=Bedford max_temp=78.75,area=742u 150",
"h2o,state=MA,city=Boston min_temp=65.4 250", // update min_temp from NULL
"h2o,state=MA,city=Reading min_temp=53.4, 250",
"h2o,state=CA,city=SF min_temp=79.0,max_temp=87.2,area=500u 300",
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
@ -696,7 +696,7 @@ impl DbSetup for EndToEndTest {
];
let partition_key = "1970-01-01T00";
// return all possible scenarios a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "cpu_load_short", partition_key).await
}
}
@ -759,7 +759,7 @@ impl DbSetup for TwoMeasurementsMultiSeries {
"o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4
];
// Swap around so data is not inserted in series order
lp_lines.swap(0, 2);
lp_lines.swap(4, 5);
@ -783,7 +783,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDelete {
"o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4
];
// Swap around so data is not inserted in series order
lp_lines.swap(0, 2);
lp_lines.swap(4, 5);
@ -822,7 +822,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDeleteAll {
"o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4
];
// Swap around so data is not inserted in series order
lp_lines.swap(0, 2);
lp_lines.swap(4, 5);
@ -978,9 +978,8 @@ impl DbSetup for OneMeasurementNoTagsWithDelete {
}
}
/// This will create many scenarios (at least 15), some have a chunk with
/// soft deleted data, some have no chunks because there is no point to
/// create a RUB for one or many compacted MUB with all deleted data.
/// This will create many scenarios: some have a chunk with soft deleted data, some have no chunks
/// because there is no point in creating compacted chunks with all deleted data.
pub struct OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk {}
#[async_trait]
impl DbSetup for OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk {
@ -995,8 +994,8 @@ impl DbSetup for OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk {
exprs: vec![],
};
// Apply predicate before the chunk is moved if any. There will be
// scenario without chunks as a consequence of not-compacting-deleted-data
// Apply predicate before the chunk is moved if any. There will be scenarios without chunks
// as a consequence of not-compacting-deleted-data
all_scenarios_for_one_chunk(
vec![&pred],
vec![],


@ -44,7 +44,7 @@ use std::{
sync::Mutex,
};
// Structs, enums, and functions used to exhaust all test scenarios of chunk life cycle
// Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle
// & when delete predicates are applied
// STRUCTs & ENUMs
@ -55,9 +55,9 @@ pub struct ChunkData<'a, 'b> {
/// which stage this chunk will be created.
///
/// If not set, this chunk will be created in [all](ChunkStage::all) stages. This can be helpful when the test
/// scenario is not specific to the chunk stage. If this is used for multiple chunks, then all stage permutations
/// will be generated.
/// If not set, this chunk will be created in [all](ChunkStage::all) stages. This can be
/// helpful when the test scenario is not specific to the chunk stage. If this is used for
/// multiple chunks, then all stage permutations will be generated.
pub chunk_stage: Option<ChunkStage>,
/// Delete predicates
@ -80,7 +80,8 @@ impl<'a, 'b> ChunkData<'a, 'b> {
}
}
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self) -> Self {
Self {
preds: self
@ -100,7 +101,7 @@ impl<'a, 'b> ChunkData<'a, 'b> {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChunkStage {
/// In parquet file.
/// In parquet file, persisted by the ingester. Now managed by the querier.
Parquet,
/// In ingester.
@ -119,10 +120,12 @@ impl Display for ChunkStage {
impl PartialOrd for ChunkStage {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match (self, other) {
// allow multiple parquet chunks (for the same partition). sequence numbers will be used for ordering.
// allow multiple parquet chunks (for the same partition). sequence numbers will be
// used for ordering.
(Self::Parquet, Self::Parquet) => Some(Ordering::Equal),
// "parquet" chunks are older (i.e. come earlier) than chunks that still life in the ingester
// "parquet" chunks are older (i.e. come earlier) than chunks that still life in the
// ingester
(Self::Parquet, Self::Ingester) => Some(Ordering::Less),
(Self::Ingester, Self::Parquet) => Some(Ordering::Greater),
@ -149,7 +152,8 @@ pub struct Pred<'a> {
}
impl<'a> Pred<'a> {
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self, stage: ChunkStage) -> Self {
Self {
delete_time: self.delete_time.replace_begin_and_end_delete_times(stage),
@ -168,9 +172,11 @@ impl<'a> Pred<'a> {
/// Describes when a delete predicate was applied.
///
/// # Ordering
/// Compared to [`ChunkStage`], the ordering here may seem a bit confusing. While the latest payload / LP data
/// resists in the ingester and is not yet available as a parquet file, the latest tombstones apply to parquet files and
/// were (past tense!) NOT applied while the LP data was in the ingester.
///
/// Compared to [`ChunkStage`], the ordering here may seem a bit confusing. While the latest
/// payload / LP data resides in the ingester and is not yet available as a parquet file, the
/// latest tombstones apply to parquet files and were (past tense!) NOT applied while the LP data
/// was in the ingester.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTime {
/// Special delete time which marks the first time that could be used for deletion.
@ -182,11 +188,13 @@ pub enum DeleteTime {
Ingester {
/// Flag if the tombstone also exists in the catalog.
///
/// If this is set to `false`, then the tombstone was applied by the ingester but does not exist in the catalog
/// any longer. This can be because:
/// If this is set to `false`, then the tombstone was applied by the ingester but does not
/// exist in the catalog any longer. This can be because:
///
/// - the ingester decided that it doesn't need to be added to the catalog (this is currently/2022-04-21 not implemented!)
/// - the compactor pruned the tombstone from the catalog because there are zero affected parquet files
/// - the ingester decided that it doesn't need to be added to the catalog (this is
/// currently/2022-04-21 not implemented!)
/// - the compactor pruned the tombstone from the catalog because there are zero affected
/// parquet files
also_in_catalog: bool,
},
@ -223,7 +231,8 @@ impl DeleteTime {
}
}
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the linked [`ChunkStage`].
/// Replace [`DeleteTime::Begin`] and [`DeleteTime::End`] with values that correspond to the
/// linked [`ChunkStage`].
fn replace_begin_and_end_delete_times(self, stage: ChunkStage) -> Self {
match self {
Self::Begin => Self::begin_for(stage),
@ -266,14 +275,14 @@ impl Display for DeleteTime {
// --------------------------------------------------------------------------------------------
/// All scenarios chunk stages and their life cycle moves for given set of delete predicates.
/// All scenarios of chunk stages and their lifecycle moves for a given set of delete predicates.
/// If the delete predicates are empty, all scenarios of different chunk stages will be returned.
pub async fn all_scenarios_for_one_chunk(
// These delete predicates are applied at all stages of the chunk life cycle
// These delete predicates are applied at all stages of the chunk lifecycle
chunk_stage_preds: Vec<&DeletePredicate>,
// These delete predicates are applied to all chunks at their final stages
at_end_preds: Vec<&DeletePredicate>,
// Input data, formatted as line protocol. One chunk will be created for each measurement
// (table) that appears in the input
lp_lines: Vec<&str>,
// Table to which the delete predicates will be applied
@ -284,10 +293,9 @@ pub async fn all_scenarios_for_one_chunk(
let mut scenarios = vec![];
// Go over chunk stages
for chunk_stage in ChunkStage::all() {
// Apply delete chunk_stage_preds to this chunk stage at
// all stages at and before that in the life cycle to the chunk
// But only need to get all delete times if chunk_stage_preds is not empty,
// otherwise, produce only one scenario of each chunk stage
// Apply delete chunk_stage_preds to this chunk stage at all stages at and before that in
// the lifecycle of the chunk. But we only need to get all delete times if
// chunk_stage_preds is not empty, otherwise, produce only one scenario of each chunk stage
let mut delete_times = vec![DeleteTime::begin_for(chunk_stage)];
if !chunk_stage_preds.is_empty() {
delete_times = DeleteTime::all_from_and_before(chunk_stage)
@ -325,9 +333,9 @@ pub async fn all_scenarios_for_one_chunk(
scenarios
}
/// Build a chunk that may move with life cycle before/after deletes
/// Note that the only chunk in this function can be moved to different stages and delete predicates
/// can be applied at different stages when the chunk is moved.
/// Build a chunk that may move with lifecycle before/after deletes. Note that the only chunk in
/// this function can be moved to different stages, and delete predicates can be applied at
/// different stages when the chunk is moved.
async fn make_chunk_with_deletes_at_different_stages(
lp_lines: Vec<&str>,
chunk_stage: ChunkStage,
@ -350,12 +358,7 @@ async fn make_chunk_with_deletes_at_different_stages(
DbScenario { scenario_name, db }
}
/// This function loads two chunks of lp data into 4 different scenarios
///
/// Data in single open mutable buffer chunk
/// Data in one open mutable buffer chunk, one closed mutable chunk
/// Data in one open mutable buffer chunk, one read buffer chunk
/// Data in one two read buffer chunks,
/// Load two chunks of lp data into different chunk scenarios.
pub async fn make_two_chunk_scenarios(
partition_key: &str,
data1: &str,
@ -480,7 +483,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
panic!("Cannot have delete time '{other}' for ingester chunk")
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@ -507,7 +513,8 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
.await;
mock_ingester.buffer_operation(op).await;
// tombstones are created immediately, need to remember their ID to handle deletion later
// tombstones are created immediately, need to remember their ID to
// handle deletion later
let mut tombstone_id = None;
for id in mock_ingester.tombstone_ids(delete_table_name).await {
if !ids_pre.contains(&id) {
@ -521,7 +528,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
// will be attached AFTER the chunk was created
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@ -568,7 +578,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
mock_ingester.buffer_operation(op).await;
}
DeleteTime::Begin | DeleteTime::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
unreachable!(
"Begin/end cases should have been replaced \
with concrete instances at this point"
)
}
}
}
@ -600,8 +613,8 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
/// Ingester that can be controlled specifically for query tests.
///
/// This uses as much ingester code as possible but allows more direct control over aspects like lifecycle and
/// partioning.
/// This uses as much ingester code as possible but allows more direct control over aspects like
/// lifecycle and partitioning.
#[derive(Debug)]
struct MockIngester {
/// Test catalog state.
@ -618,9 +631,10 @@ struct MockIngester {
/// Memory of partition keys for certain sequence numbers.
///
/// This is currently required because [`DmlWrite`] does not carry partiion information so we need to do that. In
/// production this is not required because the router and the ingester use the same partition logic, but we need
/// direct control over the partion key for the query tests.
/// This is currently required because [`DmlWrite`] does not carry partition information so we
/// need to track it here. In production this is not required because the router and the
/// ingester use the same partition logic, but we need direct control over the partition key
/// for the query tests.
partition_keys: HashMap<SequenceNumber, String>,
/// Ingester state.
@ -671,7 +685,8 @@ impl MockIngester {
///
/// This will never persist.
///
/// Takes `&self mut` because our partioning implementation does not work with concurrent access.
/// Takes `&mut self` because our partitioning implementation does not work with concurrent
/// access.
async fn buffer_operation(&mut self, dml_operation: DmlOperation) {
let lifecycle_handle = NoopLifecycleHandle {};
@ -828,7 +843,8 @@ impl MockIngester {
/// Finalizes the ingester and creates a querier namespace that can be used for query tests.
///
/// The querier namespace will hold a simulated connection to the ingester to be able to query unpersisted data.
/// The querier namespace will hold a simulated connection to the ingester to be able to query
/// unpersisted data.
async fn into_query_namespace(self) -> Arc<QuerierNamespace> {
let mut repos = self.catalog.catalog.repositories().await;
let schema = Arc::new(
@ -912,8 +928,8 @@ impl IngesterFlightClient for MockIngester {
_ingester_address: Arc<str>,
request: IngesterQueryRequest,
) -> Result<Box<dyn IngesterFlightClientQueryData>, IngesterFlightClientError> {
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior (e.g. passing predicates
// of wrong types)
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior
// (e.g. passing predicates of wrong types)
let response = prepare_data_to_querier(&self.ingester_data, &request)
.await
.map_err(|e| IngesterFlightClientError::Flight {
@ -943,8 +959,8 @@ impl IngesterFlightClient for MockIngester {
}
}
/// Helper struct to present [`IngesterQueryResponse`] (produces by the ingester) as a [`IngesterFlightClientQueryData`]
/// (used by the querier) without doing any real gRPC IO.
/// Helper struct to present [`IngesterQueryResponse`] (produced by the ingester) as a
/// [`IngesterFlightClientQueryData`] (used by the querier) without doing any real gRPC IO.
#[derive(Debug)]
struct QueryDataAdapter {
response: IngesterQueryResponse,


@ -71,7 +71,7 @@ async fn run_table_schema_test_case<D>(
}
fn is_unsorted_chunk_type(chunk: &dyn QueryChunk) -> bool {
(chunk.chunk_type() == "MUB") || (chunk.chunk_type() == "IngesterPartition")
chunk.chunk_type() == "IngesterPartition"
}
#[tokio::test]