Merge branch 'main' into alamb/remove_old_algorithm

pull/24376/head
Joe-Blount 2023-02-21 09:02:35 -06:00 committed by GitHub
commit 88d2882350
19 changed files with 553 additions and 200 deletions

Cargo.lock (generated)

@ -2280,9 +2280,9 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.8"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
dependencies = [
"bytes",
"fnv",
@ -2516,6 +2516,7 @@ dependencies = [
"console-subscriber",
"data_types",
"datafusion",
"dirs",
"dotenvy",
"flate2",
"futures",


@ -151,6 +151,13 @@ To compile for development, run:
cargo build
```
To compile for release and install the `influxdb_iox` binary in your path (so you can run `influxdb_iox` directly) do:
```shell
# from within the main `influxdb_iox` checkout
cargo install --path influxdb_iox
```
This creates a binary at `target/debug/influxdb_iox`.
### Build a Docker image (optional)
@ -171,30 +178,32 @@ DOCKER_BUILDKIT=1 docker build .
[Enable BuildKit]: https://docs.docker.com/develop/develop-images/build_enhancements/#to-enable-buildkit-builds
#### Ephemeral mode
#### Local filesystem testing mode
To start InfluxDB IOx and store data in memory, after you've compiled for development, run:
InfluxDB IOx supports testing entirely backed by the local filesystem.
> **Note**
>
> This mode should NOT be used for production systems: it will have poor performance, and only limited tuning knobs are available.
To run IOx in local testing mode, use:
```shell
./target/debug/influxdb_iox
# shorthand for
./target/debug/influxdb_iox run all-in-one
```
By default this runs an "all-in-one" server with an HTTP server on port `8080`, a router gRPC server on port `8081`, and a querier gRPC server on port `8082`. When the server is stopped, all data is lost.
This will start an "all-in-one" IOx server with the following configuration:
1. File backed catalog (sqlite), object store, and write ahead log (wal) stored under `<HOMEDIR>/.influxdb_iox`
2. HTTP `v2` api server on port `8080`, querier gRPC server on port `8082` and several ports for other internal services.
#### Local persistence mode
To start InfluxDB IOx and store the catalog in Postgres and data in the local filesystem to persist
data across restarts, after you've compiled for development, run:
You can also change the configuration in limited ways, such as choosing a different data directory:
```shell
./target/debug/influxdb_iox run all-in-one --catalog-dsn postgres:///iox_shared --data-dir=~/iox_data
./target/debug/influxdb_iox run all-in-one --data-dir=/tmp/iox_data
```
where `--catalog-dsn` is a connection URL to the Postgres database you wish to use, and
`--data-dir` is the directory you wish to use.
Note that when the server is stopped all data that has not yet been written to parquet files will be lost.
#### Compile and run


@ -9,7 +9,7 @@ license.workspace = true
clap = { version = "4", features = ["derive", "env"] }
data_types = { path = "../data_types" }
futures = "0.3"
http = "0.2.8"
http = "0.2.9"
humantime = "2.1.0"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }


@ -60,7 +60,13 @@ pub struct CatalogDsnConfig {
)]
pub(crate) catalog_type_: CatalogType,
/// Postgres connection string. Required if catalog is set to postgres.
/// Catalog connection string. Required if catalog is set to postgres.
///
/// The dsn is interpreted based on the type of catalog used.
///
/// PostgreSQL: `postgresql://postgres@localhost:5432/postgres`
///
/// Sqlite (a local filename): '/tmp/foo.sqlite'
#[clap(long = "catalog-dsn", env = "INFLUXDB_IOX_CATALOG_DSN", action)]
pub dsn: Option<String>,
@ -117,13 +123,22 @@ pub struct CatalogDsnConfig {
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
pub enum CatalogType {
/// PostgreSQL.
///
/// Example dsn: `"postgresql://postgres@localhost:5432/postgres"`
#[default]
Postgres,
/// In-memory.
///
/// Example dsn: None
Memory,
/// SQLite.
///
/// The dsn is a path to a local file.
///
/// Example dsn: `"/tmp/foo.sqlite"`
///
Sqlite,
}
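The DSN semantics documented above differ per catalog type. As a hedged, standalone sketch (illustrative enum and helper, not the actual `clap_blocks` code), the dispatch amounts to:

```rust
// Illustrative only: mirrors the documented DSN conventions for each catalog type.
#[derive(Debug, Clone, Copy)]
enum CatalogType {
    Postgres, // dsn is a connection URL, e.g. postgresql://postgres@localhost:5432/postgres
    Memory,   // no dsn needed
    Sqlite,   // dsn is a local file path, e.g. /tmp/foo.sqlite
}

fn describe_catalog(catalog_type: CatalogType, dsn: Option<&str>) -> String {
    match (catalog_type, dsn) {
        (CatalogType::Postgres, Some(url)) => format!("Postgres catalog at `{url}`"),
        (CatalogType::Memory, _) => "in-memory catalog (no dsn required)".to_string(),
        (CatalogType::Sqlite, Some(path)) => format!("SQLite catalog file at `{path}`"),
        (_, None) => "error: this catalog type requires --catalog-dsn".to_string(),
    }
}

fn main() {
    for (t, dsn) in [
        (CatalogType::Postgres, Some("postgresql://postgres@localhost:5432/postgres")),
        (CatalogType::Memory, None),
        (CatalogType::Sqlite, Some("/tmp/foo.sqlite")),
    ] {
        println!("{}", describe_catalog(t, dsn));
    }
}
```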
@ -139,7 +154,8 @@ impl CatalogDsnConfig {
}
}
/// Create a new Postgres instance for all-in-one mode if a catalog DSN is specified
/// Create a new Postgres instance for all-in-one mode if a
/// catalog DSN is specified
pub fn new_postgres(dsn: String, postgres_schema_name: String) -> Self {
info!("Catalog: Postgres at `{}`", dsn);
@ -154,8 +170,9 @@ impl CatalogDsnConfig {
}
}
/// Create a new Postgres instance for all-in-one mode if a catalog DSN is specified
pub fn new_sqlite(dsn: String) -> Self {
/// Create a new Sqlite instance for all-in-one mode
pub fn new_sqlite(dsn: impl Into<String>) -> Self {
let dsn = dsn.into();
info!("Catalog: SQLite at `{}`", dsn);
Self {

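One small API note on the hunk above: `new_sqlite` now takes `impl Into<String>`, so callers can pass either a borrowed `&str` or an owned `String`, with the conversion done once inside the function. A self-contained sketch of the pattern (standalone function, not the IOx type):

```rust
// Standalone illustration of the `impl Into<String>` parameter pattern.
fn new_sqlite(dsn: impl Into<String>) -> String {
    let dsn = dsn.into();
    format!("Catalog: SQLite at `{dsn}`")
}

fn main() {
    // Both &str and String call sites compile unchanged.
    assert_eq!(
        new_sqlite("/tmp/foo.sqlite"),
        new_sqlite(String::from("/tmp/foo.sqlite"))
    );
}
```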

@ -7,7 +7,9 @@ use std::num::NonZeroUsize;
pub struct Compactor2Config {
/// Number of partitions that should be compacted in parallel.
///
/// This should usually be larger than the compaction job concurrency since one partition can spawn multiple compaction jobs.
/// This should usually be larger than the compaction job
/// concurrency since one partition can spawn multiple compaction
/// jobs.
#[clap(
long = "compaction-partition-concurrency",
env = "INFLUXDB_IOX_COMPACTION_PARTITION_CONCURRENCY",
@ -18,7 +20,8 @@ pub struct Compactor2Config {
/// Number of concurrent compaction jobs.
///
/// This should usually be smaller than the partition concurrency since one partition can spawn multiple compaction jobs.
/// This should usually be smaller than the partition concurrency
/// since one partition can spawn multiple compaction jobs.
#[clap(
long = "compaction-job-concurrency",
env = "INFLUXDB_IOX_COMPACTION_JOB_CONCURRENCY",
@ -27,7 +30,8 @@ pub struct Compactor2Config {
)]
pub compaction_job_concurrency: NonZeroUsize,
/// Number of jobs PER PARTITION that move files in and out of the scratchpad.
/// Number of jobs PER PARTITION that move files in and out of the
/// scratchpad.
#[clap(
long = "compaction-partition-scratchpad-concurrency",
env = "INFLUXDB_IOX_COMPACTION_PARTITION_SCRATCHPAD_CONCURRENCY",
@ -36,7 +40,8 @@ pub struct Compactor2Config {
)]
pub compaction_partition_scratchpad_concurrency: NonZeroUsize,
/// Partitions with recent created files these last minutes are selected for compaction.
/// The compactor will only consider compacting partitions that
/// have new parquet files created within this many minutes.
#[clap(
long = "compaction_partition_minute_threshold",
env = "INFLUXDB_IOX_COMPACTION_PARTITION_MINUTE_THRESHOLD",
@ -45,19 +50,22 @@ pub struct Compactor2Config {
)]
pub compaction_partition_minute_threshold: u64,
/// Number of threads to use for the compactor query execution, compaction and persistence.
/// Number of threads to use for the compactor query execution,
/// compaction and persistence.
/// If not specified, defaults to one less than the number of cores on the system
#[clap(
long = "query-exec-thread-count",
env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT",
default_value = "4",
action
)]
pub query_exec_thread_count: usize,
pub query_exec_thread_count: Option<usize>,
/// Size of memory pool used during query exec, in bytes.
/// Size of memory pool used during compaction plan execution, in
/// bytes.
///
/// If queries attempt to allocate more than this many bytes
/// during execution, they will error with "ResourcesExhausted".
/// If compaction plans attempt to allocate more than this many
/// bytes during execution, they will error with
/// "ResourcesExhausted".
#[clap(
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
@ -67,7 +75,8 @@ pub struct Compactor2Config {
pub exec_mem_pool_bytes: usize,
/// Desired max size of compacted parquet files.
/// It is a target desired value, rather than a guarantee.
///
/// Note this is a target desired value, rather than a guarantee.
/// 1024 * 1024 * 100 = 104,857,600
#[clap(
long = "compaction-max-desired-size-bytes",
@ -78,6 +87,7 @@ pub struct Compactor2Config {
pub max_desired_file_size_bytes: u64,
/// Percentage of desired max file size.
///
/// If the estimated compacted result is too small, no need to split it.
/// This percentage is to determine how small it is:
/// < percentage_max_file_size * max_desired_file_size_bytes:
@ -149,7 +159,14 @@ pub struct Compactor2Config {
)]
pub ignore_partition_skip_marker: bool,
/// Maximum number of files in a compaction plan
/// Maximum number of files that the compactor will try and
/// compact in a single plan.
///
/// The higher this setting is the fewer compactor plans are run
/// and thus fewer resources over time are consumed by the
/// compactor. Increasing this setting also increases the peak
/// memory used for each compaction plan, and thus if it is set
/// too high, the compactor plans may exceed available memory.
#[clap(
long = "compaction-max-num-files-per-plan",
env = "INFLUXDB_IOX_COMPACTION_MAX_NUM_FILES_PER_PLAN",
@ -158,8 +175,17 @@ pub struct Compactor2Config {
)]
pub max_num_files_per_plan: usize,
/// Maximum input bytes (in parquet) per partition. If there is more data, we ignore the partition (for now) as a
/// self-protection mechanism.
/// Maximum input bytes (in parquet) per partition that the
/// compactor will attempt to compact in any one plan.
///
/// In the worst case, if the sum of the sizes of all parquet
/// files in a partition is greater than this value, the compactor
/// may not try to compact this partition. Under normal operation,
/// the compactor compacts a subset of files in a partition but in
/// some cases it may need to compact them all.
///
/// This setting is a self-protection mechanism, and it is
/// expected to be removed in future versions.
#[clap(
long = "compaction-max-input-parquet-bytes-per-partition",
env = "INFLUXDB_IOX_COMPACTION_MAX_INPUT_PARQUET_BYTES_PER_PARTITION",
@ -190,7 +216,15 @@ pub struct Compactor2Config {
)]
pub shard_id: Option<usize>,
/// Minimum number of L1 files to comapct to L2
/// Minimum number of L1 files to compact to L2.
///
/// If there are more than this many L1 (by definition non
/// overlapping) files in a partition, the compactor will compact
/// them together into one or more larger L2 files.
///
/// Setting this value higher in general results in fewer overall
/// resources spent on compaction but more files per partition (and
/// thus less optimal compression and query performance).
#[clap(
long = "compaction-min-num-l1-files-to-compact",
env = "INFLUXDB_IOX_COMPACTION_MIN_NUM_L1_FILES_TO_COMPACT",
@ -200,6 +234,10 @@ pub struct Compactor2Config {
pub min_num_l1_files_to_compact: usize,
/// Only process all discovered partitions once.
///
/// By default the compactor will continuously loop over all
/// partitions looking for work. Setting this option results in
/// exiting the loop after one iteration.
#[clap(
long = "compaction-process-once",
env = "INFLUXDB_IOX_COMPACTION_PROCESS_ONCE",
@ -207,7 +245,8 @@ pub struct Compactor2Config {
)]
pub process_once: bool,
/// Compact all partitions found in the catalog, no matter if/when the received writes.
/// Compact all partitions found in the catalog, no matter if/when
/// they received writes.
#[clap(
long = "compaction-process-all-partitions",
env = "INFLUXDB_IOX_COMPACTION_PROCESS_ALL_PARTITIONS",
@ -215,8 +254,11 @@ pub struct Compactor2Config {
)]
pub process_all_partitions: bool,
/// Maximum number of columns in the table of a partition that will be able to considered
/// to get compacted
/// Maximum number of columns in a table of a partition that
/// will be considered for compaction
///
/// If a table has more than this many columns, the compactor will
/// not compact it, to avoid large memory use.
#[clap(
long = "compaction-max-num-columns-per-table",
env = "INFLUXDB_IOX_COMPACTION_MAX_NUM_COLUMNS_PER_TABLE",

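Stepping back from the `Compactor2Config` hunk: `query_exec_thread_count` is now an `Option<usize>`, and a later hunk in this commit resolves `None` to one less than the number of cores via `num_cpus::get()`. A hedged sketch of that fallback in isolation (helper name is illustrative; requires the `num_cpus` crate, which the compactor already uses):

```rust
// Illustrative helper, not the IOx code itself: resolve the optional
// thread-count flag, falling back to one less than the number of cores.
// (`saturating_sub` is a defensive touch here; the commit subtracts directly.)
fn resolve_thread_count(configured: Option<usize>) -> usize {
    configured.unwrap_or_else(|| num_cpus::get().saturating_sub(1))
}

fn main() {
    println!("compactor threads: {}", resolve_thread_count(None));
    println!("compactor threads: {}", resolve_thread_count(Some(4)));
}
```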

@ -7,7 +7,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
http = "0.2.8"
http = "0.2.9"
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] }
thiserror = "1.0.38"
tonic = { version = "0.8" }


@ -119,6 +119,10 @@ async fn all_overlapping_l0() {
- "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|"
- "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|"
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 90mb total:"
- "L1 "
- "L1.0[100,180] 72mb |-----------------------------L1.0-----------------------------| "
- "L1.0[180,200] 18mb |-----L1.0-----|"
- "**** Final Output Files "
- "L1 "
- "L1.11[100,180] 72mb |----------------------------L1.11-----------------------------| "
@ -174,6 +178,10 @@ async fn all_non_overlapping_l0() {
- "L0.3[200,201] |L0.3| "
- "L0.2[100,101] |L0.2| "
- "L0.1[0,1] |L0.1| "
- "**** 2 Output Files (parquet_file_id not yet assigned), 100mb total:"
- "L1 "
- "L1.0[0,720] 79.91mb |----------------------------L1.0-----------------------------| "
- "L1.0[720,901] 20.09mb |-----L1.0-----| "
- "**** Final Output Files "
- "L1 "
- "L1.11[0,720] 79.91mb|----------------------------L1.11----------------------------| "
@ -243,6 +251,9 @@ async fn l1_with_overlapping_l0() {
- "L0.3[140,190] 5kb |------L0.3-------| "
- "L1 "
- "L1.2[100,150] 10mb |------L1.2-------| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 10.02mb total:"
- "L1, all files 10.02mb "
- "L1.0[100,310] |-------------------------------------L1.0-------------------------------------|"
- "**** Final Output Files "
- "L1 "
- "L1.1[50,100] 10mb |----L1.1-----| "
@ -308,6 +319,9 @@ async fn l1_with_non_overlapping_l0() {
- "L0.5[400,450] |-----L0.5-----| "
- "L0.4[350,400] |-----L0.4-----| "
- "L0.3[300,350] |-----L0.3-----| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 25kb total:"
- "L1, all files 25kb "
- "L1.0[300,550] |-------------------------------------L1.0-------------------------------------|"
- "**** Final Output Files "
- "L1 "
- "L1.1[50,100] 10mb |-L1.1-| "
@ -372,6 +386,9 @@ async fn l1_with_non_overlapping_l0_larger() {
- "L0.7[400,450] |----------L0.7----------| "
- "L0.6[350,400] |----------L0.6----------| "
- "L0.5[300,350] |----------L0.5----------| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.0[300,450] |-------------------------------------L1.0-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[370]). 5 Input Files, 108mb total:"
- "L1 "
- "L1.4[200,250] 3mb |--L1.4--| "
@ -379,6 +396,10 @@ async fn l1_with_non_overlapping_l0_larger() {
- "L1.2[100,150] 50mb |--L1.2--| "
- "L1.1[50,100] 20mb |--L1.1--| "
- "L1.8[300,450] 15mb |------------L1.8------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 108mb total:"
- "L2 "
- "L2.0[50,370] 86.4mb |-----------------------------L2.0-----------------------------| "
- "L2.0[370,450] 21.6mb |-----L2.0-----|"
- "**** Final Output Files "
- "L2 "
- "L2.9[50,370] 86.4mb |-----------------------------L2.9-----------------------------| "
@ -451,6 +472,9 @@ async fn l1_too_much_with_non_overlapping_l0() {
- "L0.13[600,650] |------------------------------------L0.13-------------------------------------|"
- "L0.12[600,650] |------------------------------------L0.12-------------------------------------|"
- "L0.11[600,650] |------------------------------------L0.11-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
- "SKIPPED COMPACTION for PartitionId(1): partition 1 has 781189120 parquet file bytes, limit is 268435456"
- "**** Final Output Files "
- "L1 "
@ -532,6 +556,9 @@ async fn many_l1_with_non_overlapping_l0() {
- "L0.13[600,650] |------------------------------------L0.13-------------------------------------|"
- "L0.12[600,650] |------------------------------------L0.12-------------------------------------|"
- "L0.11[600,650] |------------------------------------L0.11-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[530]). 11 Input Files, 88mb total:"
- "L1 "
- "L1.10[500,550] 7mb |L1.10| "
@ -545,6 +572,10 @@ async fn many_l1_with_non_overlapping_l0() {
- "L1.2[100,150] 8mb |L1.2| "
- "L1.1[50,100] 9mb |L1.1| "
- "L1.14[600,650] 15mb |L1.14|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 88mb total:"
- "L2 "
- "L2.0[50,530] 70.4mb |-----------------------------L2.0-----------------------------| "
- "L2.0[530,650] 17.6mb |-----L2.0-----|"
- "**** Final Output Files "
- "L2 "
- "L2.15[50,530] 70.4mb|----------------------------L2.15-----------------------------| "
@ -608,11 +639,18 @@ async fn large_l1_with_non_overlapping_l0() {
- "L0.5[600,650] |-------------------------------------L0.5-------------------------------------|"
- "L0.4[600,650] |-------------------------------------L0.4-------------------------------------|"
- "L0.3[600,650] |-------------------------------------L0.3-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[375]). 3 Input Files, 185mb total:"
- "L1 "
- "L1.2[100,150] 80mb |L1.2| "
- "L1.1[50,100] 90mb |L1.1| "
- "L1.6[600,650] 15mb |L1.6| "
- "**** 2 Output Files (parquet_file_id not yet assigned), 185mb total:"
- "L2 "
- "L2.0[50,375] 100.21mb|------------------L2.0-------------------| "
- "L2.0[375,650] 84.79mb |---------------L2.0---------------| "
- "**** Final Output Files "
- "L2 "
- "L2.7[50,375] 100.21mb|------------------L2.7-------------------| "
@ -688,6 +726,9 @@ async fn many_l1_files() {
- "L0.23[24,25] |------------------------------------L0.23-------------------------------------|"
- "L0.22[24,25] |------------------------------------L0.22-------------------------------------|"
- "L0.21[24,25] |------------------------------------L0.21-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 3mb total:"
- "L1, all files 3mb "
- "L1.0[24,25] |-------------------------------------L1.0-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[13]). 21 Input Files, 203mb total:"
- "L1 "
- "L1.20[19,20] 10mb |L1.20| "
@ -711,6 +752,10 @@ async fn many_l1_files() {
- "L1.2[1,2] 10mb |L1.2| "
- "L1.1[0,1] 10mb |L1.1| "
- "L1.24[24,25] 3mb |L1.24|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 203mb total:"
- "L2 "
- "L2.0[0,13] 105.56mb |-----------------L2.0------------------| "
- "L2.0[13,25] 97.44mb |----------------L2.0----------------| "
- "**** Final Output Files "
- "L2 "
- "L2.25[0,13] 105.56mb|-----------------L2.25-----------------| "
@ -1244,6 +1289,9 @@ async fn many_tiny_l0_files() {
- "L0.198[197,198] |L0.198|"
- "L0.199[198,199] |L0.199|"
- "L0.200[199,200] |L0.200|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|"
- "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:"
- "L0, all files 7kb "
- "L0.201[200,201] |L0.201| "
@ -1334,10 +1382,16 @@ async fn many_tiny_l0_files() {
- "L0.286[285,286] |L0.286|"
- "L0.287[286,287] |L0.287|"
- "L0.288[287,288] |L0.288|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:"
- "L0, all files 616kb "
- "L0.0[200,288] |-------------------------------------L0.0-------------------------------------|"
- "**** Simulation run 2, type=compact. 2 Input Files, 1.97mb total:"
- "L0 "
- "L0.290[200,288] 616kb |--------L0.290--------| "
- "L0.289[0,200] 1.37mb|-----------------------L0.289------------------------| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.97mb total:"
- "L1, all files 1.97mb "
- "L1.0[0,288] |-------------------------------------L1.0-------------------------------------|"
- "**** Final Output Files "
- "L1, all files 1.97mb "
- "L1.291[0,288] |------------------------------------L1.291------------------------------------|"
@ -1990,6 +2044,9 @@ async fn over_two_times_max_files_per_plan() {
- "L0.198[197,198] |L0.198|"
- "L0.199[198,199] |L0.199|"
- "L0.200[199,200] |L0.200|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|"
- "**** Simulation run 1, type=compact. 200 Input Files, 1.37mb total:"
- "L0, all files 7kb "
- "L0.201[200,201] |L0.201| "
@ -2192,6 +2249,9 @@ async fn over_two_times_max_files_per_plan() {
- "L0.398[397,398] |L0.398|"
- "L0.399[398,399] |L0.399|"
- "L0.400[399,400] |L0.400|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.0[200,400] |-------------------------------------L0.0-------------------------------------|"
- "**** Simulation run 2, type=compact. 10 Input Files, 70kb total:"
- "L0, all files 7kb "
- "L0.401[400,401] |L0.401| "
@ -2204,11 +2264,17 @@ async fn over_two_times_max_files_per_plan() {
- "L0.408[407,408] |L0.408| "
- "L0.409[408,409] |L0.409| "
- "L0.410[409,410] |L0.410|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 70kb total:"
- "L0, all files 70kb "
- "L0.0[400,410] |-------------------------------------L0.0-------------------------------------|"
- "**** Simulation run 3, type=compact. 3 Input Files, 2.8mb total:"
- "L0 "
- "L0.413[400,410] 70kb |L0.413|"
- "L0.412[200,400] 1.37mb |---------------L0.412----------------| "
- "L0.411[0,200] 1.37mb|---------------L0.411----------------| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 2.8mb total:"
- "L1, all files 2.8mb "
- "L1.0[0,410] |-------------------------------------L1.0-------------------------------------|"
- "**** Final Output Files "
- "L1, all files 2.8mb "
- "L1.414[0,410] |------------------------------------L1.414------------------------------------|"
@ -2741,6 +2807,9 @@ async fn many_tiny_l1_files() {
- "L1.198[197,198] |L1.198|"
- "L1.199[198,199] |L1.199|"
- "L1.200[199,200] |L1.200|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L1, all files 1.37mb "
- "L1.0[0,200] |-------------------------------------L1.0-------------------------------------|"
- "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:"
- "L1, all files 7kb "
- "L1.201[200,201] |L1.201| "
@ -2831,6 +2900,9 @@ async fn many_tiny_l1_files() {
- "L1.286[285,286] |L1.286|"
- "L1.287[286,287] |L1.287|"
- "L1.288[287,288] |L1.288|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:"
- "L1, all files 616kb "
- "L1.0[200,288] |-------------------------------------L1.0-------------------------------------|"
- "**** Final Output Files "
- "L1 "
- "L1.289[0,200] 1.37mb|-----------------------L1.289------------------------| "
@ -3297,6 +3369,9 @@ async fn many_l0_and_overlapped_l1_files() {
- "L0.188[187,188] |L0.188|"
- "L0.189[188,189] |L0.189|"
- "L0.190[189,190] |L0.190|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.3mb total:"
- "L0, all files 1.3mb "
- "L0.0[0,190] |-------------------------------------L0.0-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[159]). 21 Input Files, 21.3mb total:"
- "L0 "
- "L0.211[0,190] 1.3mb |----------------------------------L0.211----------------------------------| "
@ -3321,6 +3396,10 @@ async fn many_l0_and_overlapped_l1_files() {
- "L1.207[160,169] 1mb |L1.207| "
- "L1.208[170,179] 1mb |L1.208| "
- "L1.209[180,189] 1mb |L1.209|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:"
- "L1 "
- "L1.0[0,159] 17.02mb |----------------------------L1.0-----------------------------| "
- "L1.0[159,199] 4.28mb |-----L1.0-----| "
- "**** Final Output Files "
- "L1 "
- "L1.212[0,159] 17.02mb|---------------------------L1.212----------------------------| "
@ -3808,6 +3887,9 @@ async fn not_many_l0_and_overlapped_l1_files() {
- "L1.192[10,19] 1mb |L1.192| "
- "L1.193[20,29] 1mb |L1.193| "
- "L1.194[30,39] 1mb |L1.194| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 6.3mb total:"
- "L1, all files 6.3mb "
- "L1.0[0,190] |-------------------------------------L1.0-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[2407]). 16 Input Files, 21.3mb total:"
- "L1 "
- "L1.196[200,209] 1mb |L1.196| "
@ -3826,6 +3908,10 @@ async fn not_many_l0_and_overlapped_l1_files() {
- "L1.209[2800,2809] 1mb |L1.209|"
- "L1.210[3000,3009] 1mb |L1.210|"
- "L1.211[0,190] 6.3mb |L1.211| "
- "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:"
- "L2 "
- "L2.0[0,2407] 17.04mb|----------------------------L2.0-----------------------------| "
- "L2.0[2407,3009] 4.26mb |-----L2.0-----| "
- "**** Final Output Files "
- "L2 "
- "L2.212[0,2407] 17.04mb|---------------------------L2.212----------------------------| "


@ -37,7 +37,7 @@ const DEFAULT_HEADING_WIDTH: usize = 20;
/// parquet files arranged so they are lined up horizontally based on
/// their relative time range.
///
/// See docs on [`ParquetFileFormatter`]z for examples.
/// See docs on [`ParquetFileFormatter`] for examples.
fn readable_list_of_files<'a>(
title: Option<String>,
files: impl IntoIterator<Item = &'a ParquetFile>,


@ -5,7 +5,8 @@ use std::{
use async_trait::async_trait;
use data_types::{
ColumnSet, CompactionLevel, ParquetFile, ParquetFileParams, SequenceNumber, ShardId, Timestamp,
ColumnSet, CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, SequenceNumber,
ShardId, Timestamp,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_time::Time;
@ -69,24 +70,50 @@ impl ParquetFileSimulator {
runs.into_iter()
.enumerate()
.flat_map(|(i, run)| {
let total_input_size: i64 = run
.input_parquet_files
.iter()
.map(|f| f.file_size_bytes)
.sum();
let title = format!(
let SimulatedRun {
plan_type,
input_parquet_files,
output_params,
} = run;
let input_title = format!(
"**** Simulation run {}, type={}. {} Input Files, {} total:",
i,
run.plan_type,
run.input_parquet_files.len(),
display_size(total_input_size)
plan_type,
input_parquet_files.len(),
display_size(total_size(input_parquet_files.iter()))
);
format_files(title, &run.input_parquet_files)
// display the files created by this run
let output_parquet_files: Vec<_> = output_params
.into_iter()
.map(|params| {
// Use file id 0 as they haven't been
// assigned an id in the catalog yet
ParquetFile::from_params(params, ParquetFileId::new(0))
})
.collect();
let output_title = format!(
"**** {} Output Files (parquet_file_id not yet assigned), {} total:",
output_parquet_files.len(),
display_size(total_size(output_parquet_files.iter()))
);
// hook up inputs and outputs
format_files(input_title, &input_parquet_files)
.into_iter()
.chain(format_files(output_title, &output_parquet_files).into_iter())
})
.collect()
}
}
/// return the total file size of all the parquet files
fn total_size<'a>(parquet_files: impl IntoIterator<Item = &'a ParquetFile>) -> i64 {
parquet_files.into_iter().map(|f| f.file_size_bytes).sum()
}
#[async_trait]
impl ParquetFilesSink for ParquetFileSimulator {
async fn stream_into_file_sink(
@ -134,20 +161,21 @@ impl ParquetFilesSink for ParquetFileSimulator {
let partition_info = partition_info.as_ref();
// Compute final output
let output: Vec<_> = output_files
let output_params: Vec<_> = output_files
.into_iter()
.map(|f| {
f.into_parquet_file_params(max_l0_created_at, column_set.clone(), partition_info)
})
.collect();
// record what we did
// record what the simulator did
self.runs.lock().unwrap().push(SimulatedRun {
plan_type,
input_parquet_files,
output_params: output_params.clone(),
});
Ok(output)
Ok(output_params)
}
fn as_any(&self) -> &dyn std::any::Any {
@ -229,6 +257,7 @@ pub struct SimulatedRun {
// fields are used in testing
plan_type: String,
input_parquet_files: Vec<ParquetFile>,
output_params: Vec<ParquetFileParams>,
}
fn overall_column_set<'a>(files: impl IntoIterator<Item = &'a ParquetFile>) -> ColumnSet {


@ -53,12 +53,13 @@ bytes = "1.4"
clap = { version = "4", features = ["derive", "env"] }
comfy-table = { version = "6.1", default-features = false }
console-subscriber = { version = "0.1.8", optional = true, features = ["parking_lot"] }
dirs = "4.0.0"
dotenvy = "0.15.6"
futures = "0.3"
futures-util = { version = "0.3" }
flate2 = "1.0"
hashbrown = { workspace = true }
http = "0.2.8"
http = "0.2.9"
humantime = "2.1.0"
itertools = "0.10.5"
libc = { version = "0.2" }


@ -27,16 +27,25 @@ use ioxd_querier::{create_querier_server_type, QuerierServerTypeArgs};
use ioxd_router::create_router2_server_type;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use once_cell::sync::Lazy;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{
collections::HashMap, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
collections::HashMap,
num::NonZeroUsize,
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
time::Duration,
};
use tempfile::TempDir;
use thiserror::Error;
use trace_exporters::TracingConfig;
use trogging::cli::LoggingConfig;
/// The default name of the influxdb_iox data directory
pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb_iox";
/// The default name of the catalog file
pub const DEFAULT_CATALOG_FILENAME: &str = "catalog.sqlite";
/// The default bind address for the Router HTTP API.
pub const DEFAULT_ROUTER_HTTP_BIND_ADDR: &str = "127.0.0.1:8080";
@ -87,18 +96,18 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Use a static so the directory lasts the entire process
static DEFAULT_WAL_DIRECTORY: Lazy<TempDir> =
Lazy::new(|| TempDir::new().expect("Could not create a temporary directory for WAL"));
/// The intention is to keep the number of options on this Config
/// object as small as possible. For more complex configurations and
/// deployments the individual services (e.g. Router and Compactor)
/// should be instantiated and configured individually.
/// object as small as possible. All in one mode is designed for ease
/// of use and testing.
///
/// For production deployments, IOx is designed to run as multiple
/// individual services, and thus knobs for tuning performance are
/// found on those individual services.
///
/// This creates the following four services, configured to talk to a
/// common catalog, objectstore and write buffer.
///
/// ```text
/// ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
/// │ Router │ │ Ingester │ │ Querier │ │ Compactor │
/// └─────────────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
@ -116,11 +125,11 @@ static DEFAULT_WAL_DIRECTORY: Lazy<TempDir> =
/// ( ) ( ) ( )
/// `─────' `─────' `─────'
///
/// Existing Object Store kafka / redpanda
/// Postgres (file, mem, or Object Store
/// (started by pre-existing) (file, mem, or
/// user) configured with pre-existing)
/// --object-store configured with
/// Catalog Object Store Write Ahead Log (WAL)
/// (sqllite, mem, (file, mem, or Object Store
/// or pre-existing) pre-existing) (file, mem, or
/// postgres) pre-existing)
/// ```
///
/// Ideally all the gRPC services would listen on the same port, but
/// due to challenges with tonic this effort was postponed. See
@ -128,8 +137,8 @@ static DEFAULT_WAL_DIRECTORY: Lazy<TempDir> =
/// for more details
///
/// Currently, this starts services on 5 ports, designed so that the
/// ports used to interact with the old architecture are the same as
/// the new architecture (8081 write endpoint and query on 8082).
/// ports are the same as the CLI default (8080 http v2 API endpoint and
/// querier on 8082).
///
/// Router;
/// 8080 http (overrides INFLUXDB_IOX_BIND_ADDR)
@ -185,6 +194,12 @@ pub struct Config {
#[clap(flatten)]
catalog_dsn: CatalogDsnConfig,
/// The directory to store the write ahead log
///
/// If not specified, defaults to INFLUXDB_IOX_DB_DIR/wal
#[clap(long = "wal-directory", env = "INFLUXDB_IOX_WAL_DIRECTORY", action)]
pub wal_directory: Option<PathBuf>,
/// The number of seconds between WAL file rotations.
#[clap(
long = "wal-rotation-period-seconds",
@ -213,6 +228,7 @@ pub struct Config {
)]
pub persist_max_parallelism: usize,
// TODO - remove these ingester tuning knobs from all in one mode??
/// The maximum number of persist tasks that can be queued at any one time.
///
/// Once this limit is reached, ingest is blocked until the persist backlog
@ -322,13 +338,15 @@ pub struct Config {
}
impl Config {
/// Get a specialized run config to use for each service
/// Convert the all-in-one mode configuration to a specialized
/// configuration for each individual IOx service
fn specialize(self) -> SpecializedConfig {
let Self {
logging_config,
tracing_config,
max_http_request_size,
object_store_config,
wal_directory,
catalog_dsn,
wal_rotation_period_seconds,
concurrent_query_limit,
@ -346,26 +364,56 @@ impl Config {
exec_mem_pool_bytes,
} = self;
let database_directory = object_store_config.database_directory.clone();
// If dir location is provided and no object store type is provided, use it also as file object store.
let object_store_config = {
if object_store_config.object_store.is_none() && database_directory.is_some() {
ObjectStoreConfig::new(database_directory.clone())
} else {
object_store_config
}
// Determine where to store files (wal and possibly catalog
// and object store)
let database_directory = object_store_config
.database_directory
.clone()
.unwrap_or_else(|| {
dirs::home_dir()
.expect("No data-dir specified but could not find user's home directory")
.join(DEFAULT_DATA_DIRECTORY_NAME)
});
ensure_directory_exists(&database_directory);
// if we have an explicit object store configuration, use
// that, otherwise default to file based object store in database directory/object_store
let object_store_config = if object_store_config.object_store.is_some() {
object_store_config
} else {
let object_store_directory = database_directory.join("object_store");
debug!(
?object_store_directory,
"No database directory, using default location for object store"
);
ensure_directory_exists(&object_store_directory);
ObjectStoreConfig::new(Some(object_store_directory))
};
// if data directory is specified, default to data-dir/wal
// otherwise use a per-process temporary directory
let wal_directory = database_directory
.map(|database_directory| database_directory.join("wal"))
.unwrap_or_else(|| PathBuf::from(DEFAULT_WAL_DIRECTORY.path()));
let wal_directory = wal_directory.clone().unwrap_or_else(|| {
debug!(
?wal_directory,
"No wal_directory specified, using defaul location for wal"
);
database_directory.join("wal")
});
ensure_directory_exists(&wal_directory);
let catalog_dsn = if catalog_dsn.dsn.is_none() {
CatalogDsnConfig::new_memory()
} else {
let catalog_dsn = if catalog_dsn.dsn.is_some() {
catalog_dsn
} else {
let local_catalog_path = database_directory
.join(DEFAULT_CATALOG_FILENAME)
.to_string_lossy()
.to_string();
debug!(
?local_catalog_path,
"No catalog dsn specified, using default sqlite catalog"
);
CatalogDsnConfig::new_sqlite(local_catalog_path)
};
let router_run_config = RunConfig::new(
@ -420,7 +468,7 @@ impl Config {
compaction_job_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_partition_minute_threshold: 10,
query_exec_thread_count: 1,
query_exec_thread_count: Some(1),
exec_mem_pool_bytes,
max_desired_file_size_bytes: 30_000,
percentage_max_file_size: 30,
@ -467,6 +515,16 @@ impl Config {
}
}
/// If `p` does not exist, try to create it as a directory.
///
/// Panics if the directory does not exist and cannot be created.
fn ensure_directory_exists(p: &Path) {
if !p.exists() {
println!("Creating directory {p:?}");
std::fs::create_dir_all(p).expect("Could not create default directory");
}
}
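Pulling the directory-defaulting hunks above together: when nothing is configured, all-in-one mode now roots its state under `~/.influxdb_iox`, with the object store, WAL, and sqlite catalog living beneath it. A consolidated, hedged sketch (illustrative helper, not the production code; uses the `dirs` crate this commit adds):

```rust
use std::path::PathBuf;

// Illustrative consolidation of the defaults above: returns the object store,
// WAL, and catalog locations for a given (optional) data directory.
fn default_locations(data_dir: Option<PathBuf>) -> (PathBuf, PathBuf, PathBuf) {
    let database_directory = data_dir.unwrap_or_else(|| {
        dirs::home_dir()
            .expect("no --data-dir given and no home directory found")
            .join(".influxdb_iox")
    });
    (
        database_directory.join("object_store"),   // file-backed object store
        database_directory.join("wal"),            // write ahead log
        database_directory.join("catalog.sqlite"), // sqlite catalog file
    )
}

fn main() {
    let (object_store, wal, catalog) = default_locations(None);
    println!("{object_store:?}\n{wal:?}\n{catalog:?}");
}
```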
/// Different run configs for the different services (needed as they
/// listen on different ports)
struct SpecializedConfig {


@ -106,9 +106,15 @@ pub async fn command(config: Config) -> Result<(), Error> {
StorageId::from("iox_scratchpad"),
);
let num_threads = config
.compactor_config
.query_exec_thread_count
.unwrap_or_else(|| num_cpus::get() - 1_usize);
info!(%num_threads, "using specified number of threads");
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads: config.compactor_config.query_exec_thread_count,
target_query_partitions: config.compactor_config.query_exec_thread_count,
num_threads,
target_query_partitions: num_threads,
object_stores: [&parquet_store_real, &parquet_store_scratchpad]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))


@ -1,6 +1,7 @@
pub(crate) mod request;
pub(crate) mod response;
use crate::commands::storage::response::{BinaryTagSchema, TextTagSchema};
use generated_types::{
aggregate::AggregateType, influxdata::platform::storage::read_group_request::Group, Predicate,
};
@ -197,7 +198,7 @@ struct ReadGroup {
)]
group: Group,
#[clap(long, action)]
#[clap(long, action, value_delimiter = ',')]
group_keys: Vec<String>,
}
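The `value_delimiter = ','` addition above lets the `group_keys` argument accept a comma-separated list in a single flag instead of requiring the flag to be repeated. A self-contained sketch of that clap behaviour (hypothetical CLI struct, clap 4 with the `derive` feature as used in this workspace):

```rust
use clap::Parser;

/// Hypothetical CLI (not the IOx storage command) illustrating
/// `value_delimiter = ','` on a Vec<String> flag.
#[derive(Parser, Debug)]
struct Args {
    #[clap(long, value_delimiter = ',')]
    group_keys: Vec<String>,
}

fn main() {
    // One flag, comma-separated values, split into the Vec by clap.
    let args = Args::parse_from(["prog", "--group-keys", "region,host"]);
    assert_eq!(args.group_keys, vec!["region", "host"]);
}
```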
@ -303,7 +304,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
info!(?request, "read_filter");
let result = client.read_filter(request).await.context(ServerSnafu)?;
match config.format {
Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?,
Format::Pretty => response::pretty_print_frames::<BinaryTagSchema>(&result)
.context(ResponseSnafu)?,
Format::Quiet => {}
}
}
@ -320,7 +322,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
info!(?request, "read_group");
let result = client.read_group(request).await.context(ServerSnafu)?;
match config.format {
Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?,
Format::Pretty => response::pretty_print_frames::<TextTagSchema>(&result)
.context(ResponseSnafu)?,
Format::Quiet => {}
}
}
@ -343,7 +346,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
.context(ServerSnafu)?;
match config.format {
Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?,
Format::Pretty => response::pretty_print_frames::<TextTagSchema>(&result)
.context(ResponseSnafu)?,
Format::Quiet => {}
}
}


@ -6,7 +6,7 @@ use snafu::Snafu;
use self::generated_types::*;
use super::response::{
tag_key_is_field, tag_key_is_measurement, FIELD_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_BIN,
FIELD_TAG_KEY_BIN, FIELD_TAG_KEY_TEXT, MEASUREMENT_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_TEXT,
};
use ::generated_types::{aggregate::AggregateType, google::protobuf::*};
@ -59,7 +59,7 @@ pub fn read_filter(
read_source: Some(org_bucket),
range: Some(TimestampRange { start, end: stop }),
key_sort: read_filter_request::KeySort::Unspecified as i32, // IOx doesn't support any other sort
tag_key_meta_names: TagKeyMetaNames::Text as i32,
tag_key_meta_names: TagKeyMetaNames::Binary as i32,
}
}
@ -146,6 +146,14 @@ pub fn tag_values(
}
}
pub(crate) fn tag_key_is_measurement(key: &[u8]) -> bool {
(key == MEASUREMENT_TAG_KEY_TEXT) || (key == MEASUREMENT_TAG_KEY_BIN)
}
pub(crate) fn tag_key_is_field(key: &[u8]) -> bool {
(key == FIELD_TAG_KEY_TEXT) || (key == FIELD_TAG_KEY_BIN)
}
#[cfg(test)]
mod test_super {
use std::num::NonZeroU64;


@ -39,8 +39,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
// Prints the provided data frames in a tabular format grouped into tables per
// distinct measurement.
pub fn pretty_print_frames(frames: &[Data]) -> Result<()> {
let rbs = frames_to_record_batches(frames)?;
pub fn pretty_print_frames<T: TagSchema>(frames: &[Data]) -> Result<()> {
let rbs = frames_to_record_batches::<T>(frames)?;
for (k, rb) in rbs {
println!("\n_measurement: {k}");
println!("rows: {:?}\n", &rb.num_rows());
@ -72,18 +72,16 @@ pub fn pretty_print_strings(values: Vec<String>) -> Result<()> {
// This function takes a set of InfluxRPC data frames and converts them into an
// Arrow record batches, which are suitable for pretty printing.
fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBatch>> {
fn frames_to_record_batches<T: TagSchema>(
frames: &[Data],
) -> Result<BTreeMap<String, RecordBatch>> {
// Run through all the frames once to build the schema of each table we need
// to build as a record batch.
let mut table_column_mapping = determine_tag_columns(frames);
let mut table_column_mapping = determine_tag_columns::<T>(frames);
let mut all_tables = BTreeMap::new();
let mut current_table_frame: Option<(IntermediateTable, SeriesFrame)> = None;
if frames.is_empty() {
return Ok(all_tables);
}
for frame in frames {
match frame {
generated_types::read_response::frame::Data::Group(_) => {
@ -93,7 +91,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
.fail();
}
generated_types::read_response::frame::Data::Series(sf) => {
let cur_frame_measurement = &sf.tags[0].value;
let cur_frame_measurement = T::measurement(sf);
// First series frame in result set.
if current_table_frame.is_none() {
@ -113,10 +111,10 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
// Series frame has moved on to a different measurement. Push
// this table into a record batch and onto final results, then
// create a new table.
if measurement(&prev_series_frame) != cur_frame_measurement {
if T::measurement(&prev_series_frame) != cur_frame_measurement {
let rb: RecordBatch = current_table.try_into()?;
all_tables.insert(
String::from_utf8(measurement(&prev_series_frame).to_owned())
String::from_utf8(T::measurement(&prev_series_frame).to_owned())
.context(InvalidMeasurementNameSnafu)?,
rb,
);
@ -142,7 +140,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::FloatPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f.values.iter().copied().map(Some).collect::<Vec<_>>();
column.extend_f64(&values);
@ -153,7 +151,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::IntegerPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f.values.iter().copied().map(Some).collect::<Vec<_>>();
column.extend_i64(&values);
@ -164,7 +162,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::UnsignedPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f.values.iter().copied().map(Some).collect::<Vec<_>>();
column.extend_u64(&values);
@ -175,7 +173,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::BooleanPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f.values.iter().copied().map(Some).collect::<Vec<_>>();
column.extend_bool(&values);
@ -186,7 +184,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::StringPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f
.values
@ -209,11 +207,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
// Pad all tag columns with keys present in the previous series frame
// with identical values.
for Tag { key, value } in &prev_series_frame.tags {
if tag_key_is_measurement(key) || tag_key_is_field(key) {
continue;
}
for Tag { ref key, value } in T::tags(prev_series_frame) {
let idx = current_table
.tag_columns
.get(key)
@ -250,13 +244,14 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
}
// Convert and insert current table
let (current_table, prev_series_frame) = current_table_frame.take().unwrap();
let rb: RecordBatch = current_table.try_into()?;
all_tables.insert(
String::from_utf8(measurement(&prev_series_frame).to_owned())
.context(InvalidMeasurementNameSnafu)?,
rb,
);
if let Some((current_table, prev_series_frame)) = current_table_frame.take() {
let rb: RecordBatch = current_table.try_into()?;
all_tables.insert(
String::from_utf8(T::measurement(&prev_series_frame).to_owned())
.context(InvalidMeasurementNameSnafu)?,
rb,
);
}
Ok(all_tables)
}
@ -479,44 +474,31 @@ impl TryFrom<IntermediateTable> for RecordBatch {
// These constants describe known values for the keys associated with
// measurements and fields.
const MEASUREMENT_TAG_KEY_TEXT: [u8; 12] = [
b'_', b'm', b'e', b'a', b's', b'u', b'r', b'e', b'm', b'e', b'n', b't',
];
pub(crate) const MEASUREMENT_TAG_KEY_TEXT: [u8; 12] = *b"_measurement";
pub(crate) const MEASUREMENT_TAG_KEY_BIN: [u8; 1] = [0_u8];
const FIELD_TAG_KEY_TEXT: [u8; 6] = [b'_', b'f', b'i', b'e', b'l', b'd'];
pub(crate) const FIELD_TAG_KEY_TEXT: [u8; 6] = *b"_field";
pub(crate) const FIELD_TAG_KEY_BIN: [u8; 1] = [255_u8];
// Store a collection of column names and types for a single table (measurement).
#[derive(Debug, Default, PartialEq, Eq)]
struct TableColumns {
pub struct TableColumns {
tag_columns: BTreeSet<Vec<u8>>,
field_columns: BTreeMap<Vec<u8>, DataType>,
}
// Given a set of data frames determine from the series frames within the set
// of tag columns for each distinct table (measurement).
fn determine_tag_columns(frames: &[Data]) -> BTreeMap<Vec<u8>, TableColumns> {
fn determine_tag_columns<T: TagSchema>(frames: &[Data]) -> BTreeMap<Vec<u8>, TableColumns> {
let mut schema: BTreeMap<Vec<u8>, TableColumns> = BTreeMap::new();
for frame in frames {
if let Data::Series(sf) = frame {
if let Data::Series(ref sf) = frame {
assert!(!sf.tags.is_empty(), "expected _measurement and _field tags");
// PERF: avoid clone of value
let measurement_name = sf
.tags
.iter()
.find(|t| tag_key_is_measurement(&t.key))
.expect("measurement name not found")
.value
.clone();
let measurement_name = T::measurement(sf).clone();
let table = schema.entry(measurement_name).or_default();
for Tag { key, value } in sf.tags.iter().skip(1) {
if tag_key_is_field(key) {
table.field_columns.insert(value.clone(), sf.data_type());
continue;
}
let field_name = T::field_name(sf).clone();
table.field_columns.insert(field_name, sf.data_type());
for Tag { key, .. } in T::tags(sf) {
// PERF: avoid clone of key
table.tag_columns.insert(key.clone()); // Add column to table schema
}
@ -525,25 +507,67 @@ fn determine_tag_columns(frames: &[Data]) -> BTreeMap<Vec<u8>, TableColumns> {
schema
}
// Extract a reference to the measurement name from a Series frame.
fn measurement(frame: &SeriesFrame) -> &Vec<u8> {
assert!(tag_key_is_measurement(&frame.tags[0].key));
&frame.tags[0].value
pub trait TagSchema {
type IntoIter<'a>: Iterator<Item = &'a Tag>;
/// Returns the value of the measurement meta tag.
fn measurement(frame: &SeriesFrame) -> &Vec<u8>;
/// Returns the value of the field meta tag.
fn field_name(frame: &SeriesFrame) -> &Vec<u8>;
/// Returns the tags without the measurement or field meta tags.
fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_>;
}
// Extract a reference to the field name from a Series frame.
fn field_name(frame: &SeriesFrame) -> &Vec<u8> {
let idx = frame.tags.len() - 1;
assert!(tag_key_is_field(&frame.tags[idx].key));
&frame.tags[idx].value
pub struct BinaryTagSchema;
impl TagSchema for BinaryTagSchema {
type IntoIter<'a> = std::slice::Iter<'a, Tag>;
fn measurement(frame: &SeriesFrame) -> &Vec<u8> {
assert_eq!(frame.tags[0].key, MEASUREMENT_TAG_KEY_BIN);
&frame.tags[0].value
}
fn field_name(frame: &SeriesFrame) -> &Vec<u8> {
let idx = frame.tags.len() - 1;
assert_eq!(frame.tags[idx].key, FIELD_TAG_KEY_BIN);
&frame.tags[idx].value
}
fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_> {
frame.tags[1..frame.tags.len() - 1].iter()
}
}
pub(crate) fn tag_key_is_measurement(key: &[u8]) -> bool {
(key == MEASUREMENT_TAG_KEY_TEXT) || (key == MEASUREMENT_TAG_KEY_BIN)
}
pub struct TextTagSchema;
pub(crate) fn tag_key_is_field(key: &[u8]) -> bool {
(key == FIELD_TAG_KEY_TEXT) || (key == FIELD_TAG_KEY_BIN)
impl TagSchema for TextTagSchema {
type IntoIter<'a> = iter::Filter<std::slice::Iter<'a, Tag>, fn(&&Tag) -> bool>;
fn measurement(frame: &SeriesFrame) -> &Vec<u8> {
let idx = frame
.tags
.binary_search_by(|t| t.key[..].cmp(&MEASUREMENT_TAG_KEY_TEXT[..]))
.expect("missing measurement");
&frame.tags[idx].value
}
fn field_name(frame: &SeriesFrame) -> &Vec<u8> {
let idx = frame
.tags
.binary_search_by(|t| t.key[..].cmp(&FIELD_TAG_KEY_TEXT[..]))
.expect("missing field");
&frame.tags[idx].value
}
fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_> {
frame
.tags
.iter()
.filter(|t| t.key != MEASUREMENT_TAG_KEY_TEXT && t.key != FIELD_TAG_KEY_TEXT)
}
}
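To see why `BinaryTagSchema` can index positionally while `TextTagSchema` searches by key: the binary meta keys `0x00` (measurement) and `0xff` (field) sort before and after every ordinary tag key, while the text keys `_measurement` and `_field` land wherever lexicographic order puts them. A self-contained sketch with a local `Tag` stand-in (not the generated protobuf type):

```rust
// Local stand-in for the generated Tag type, for illustration only.
struct Tag {
    key: Vec<u8>,
    value: Vec<u8>,
}

fn main() {
    // With binary meta keys, sorting tags by key brackets the ordinary tags:
    // 0x00 sorts first and 0xff sorts last, so positional access is safe.
    let mut tags = vec![
        Tag { key: b"host".to_vec(), value: b"foo".to_vec() },
        Tag { key: vec![0xff], value: b"temp".to_vec() }, // field meta tag
        Tag { key: vec![0x00], value: b"cpu".to_vec() },  // measurement meta tag
    ];
    tags.sort_by(|a, b| a.key.cmp(&b.key));
    assert_eq!(tags.first().unwrap().value, b"cpu"); // measurement is tags[0]
    assert_eq!(tags.last().unwrap().value, b"temp"); // field is the last tag
}
```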
#[cfg(test)]
@ -553,15 +577,17 @@ mod test_super {
BooleanPointsFrame, FloatPointsFrame, IntegerPointsFrame, SeriesFrame, StringPointsFrame,
UnsignedPointsFrame,
};
use itertools::Itertools;
use super::*;
// converts a vector of key/value pairs into a vector of `Tag`.
fn make_tags(pairs: &[(&str, &str)]) -> Vec<Tag> {
/// Converts a vector of `(key, value)` tuples into a vector of `Tag`, sorted by key.
fn make_tags(pairs: &[(&[u8], &str)]) -> Vec<Tag> {
pairs
.iter()
.sorted_by(|(a_key, _), (b_key, _)| Ord::cmp(a_key, b_key))
.map(|(key, value)| Tag {
key: key.as_bytes().to_vec(),
key: key.to_vec(),
value: value.as_bytes().to_vec(),
})
.collect::<Vec<_>>()
@ -618,15 +644,32 @@ mod test_super {
all_table_columns
}
trait KeyNames {
const MEASUREMENT_KEY: &'static [u8];
const FIELD_KEY: &'static [u8];
}
struct BinaryKeyNames;
impl KeyNames for BinaryKeyNames {
const MEASUREMENT_KEY: &'static [u8] = &[0_u8];
const FIELD_KEY: &'static [u8] = &[255_u8];
}
struct TextKeyNames;
impl KeyNames for TextKeyNames {
const MEASUREMENT_KEY: &'static [u8] = b"_measurement";
const FIELD_KEY: &'static [u8] = b"_field";
}
// generate a substantial set of frames across multiple tables.
fn gen_frames() -> Vec<Data> {
fn gen_frames<K: KeyNames>() -> Vec<Data> {
vec![
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "cpu"),
("host", "foo"),
("server", "a"),
("_field", "temp"),
(K::MEASUREMENT_KEY, "cpu"),
(b"host", "foo"),
(b"server", "a"),
(K::FIELD_KEY, "temp"),
]),
data_type: DataType::Float as i32,
}),
@ -640,10 +683,10 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "cpu"),
("host", "foo"),
("server", "a"),
("_field", "voltage"),
(K::MEASUREMENT_KEY, "cpu"),
(b"host", "foo"),
(b"server", "a"),
(K::FIELD_KEY, "voltage"),
]),
data_type: DataType::Integer as i32,
}),
@ -653,10 +696,10 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "cpu"),
("host", "foo"),
("new_column", "a"),
("_field", "voltage"),
(K::MEASUREMENT_KEY, "cpu"),
(b"host", "foo"),
(b"new_column", "a"),
(K::FIELD_KEY, "voltage"),
]),
data_type: DataType::Integer as i32,
}),
@ -665,7 +708,10 @@ mod test_super {
values: vec![1000, 2000],
}),
Data::Series(SeriesFrame {
tags: make_tags(&[("_measurement", "another table"), ("_field", "voltage")]),
tags: make_tags(&[
(K::MEASUREMENT_KEY, "another table"),
(K::FIELD_KEY, "voltage"),
]),
data_type: DataType::String as i32,
}),
Data::StringPoints(StringPointsFrame {
@ -674,9 +720,9 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "another table"),
("region", "west"),
("_field", "voltage"),
(K::MEASUREMENT_KEY, "another table"),
(b"region", "west"),
(K::FIELD_KEY, "voltage"),
]),
data_type: DataType::String as i32,
}),
@ -686,9 +732,9 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "another table"),
("region", "north"),
("_field", "bool_field"),
(K::MEASUREMENT_KEY, "another table"),
(b"region", "north"),
(K::FIELD_KEY, "bool_field"),
]),
data_type: DataType::Boolean as i32,
}),
@ -698,9 +744,9 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "another table"),
("region", "south"),
("_field", "unsigned_field"),
(K::MEASUREMENT_KEY, "another table"),
(b"region", "south"),
(K::FIELD_KEY, "unsigned_field"),
]),
data_type: DataType::Unsigned as i32,
}),
@ -712,11 +758,15 @@ mod test_super {
}
#[test]
fn test_determine_tag_columns() {
assert!(determine_tag_columns(&[]).is_empty());
fn test_binary_determine_tag_columns() {
assert!(determine_tag_columns::<BinaryTagSchema>(&[]).is_empty());
let frame = Data::Series(SeriesFrame {
tags: make_tags(&[("_measurement", "cpu"), ("server", "a"), ("_field", "temp")]),
tags: make_tags(&[
(BinaryKeyNames::MEASUREMENT_KEY, "cpu"),
(b"server", "a"),
(BinaryKeyNames::FIELD_KEY, "temp"),
]),
data_type: DataType::Float as i32,
});
@ -725,10 +775,10 @@ mod test_super {
&["server"],
&[("temp", DataType::Float)],
)]);
assert_eq!(determine_tag_columns(&[frame]), exp);
assert_eq!(determine_tag_columns::<BinaryTagSchema>(&[frame]), exp);
// larger example
let frames = gen_frames();
let frames = gen_frames::<BinaryKeyNames>();
let exp = make_table_columns(&[
TableColumnInput::new(
@ -746,14 +796,56 @@ mod test_super {
],
),
]);
assert_eq!(determine_tag_columns(&frames), exp);
assert_eq!(determine_tag_columns::<BinaryTagSchema>(&frames), exp);
}
#[test]
fn test_text_determine_tag_columns() {
assert!(determine_tag_columns::<TextTagSchema>(&[]).is_empty());
let frame = Data::Series(SeriesFrame {
tags: make_tags(&[
(b"_measurement", "cpu"),
(b"server", "a"),
(b"_field", "temp"),
]),
data_type: DataType::Float as i32,
});
let exp = make_table_columns(&[TableColumnInput::new(
"cpu",
&["server"],
&[("temp", DataType::Float)],
)]);
assert_eq!(determine_tag_columns::<TextTagSchema>(&[frame]), exp);
// larger example
let frames = gen_frames::<TextKeyNames>();
let exp = make_table_columns(&[
TableColumnInput::new(
"cpu",
&["host", "new_column", "server"],
&[("temp", DataType::Float), ("voltage", DataType::Integer)],
),
TableColumnInput::new(
"another table",
&["region"],
&[
("bool_field", DataType::Boolean),
("unsigned_field", DataType::Unsigned),
("voltage", DataType::String),
],
),
]);
assert_eq!(determine_tag_columns::<TextTagSchema>(&frames), exp);
}
#[test]
fn test_frames_to_into_record_batches() {
let frames = gen_frames();
let frames = gen_frames::<TextKeyNames>();
let rbs = frames_to_record_batches(&frames);
let rbs = frames_to_record_batches::<TextTagSchema>(&frames);
let exp = vec![
(
"another table",


@ -33,7 +33,7 @@ chrono = { version = "0.4", default-features = false }
flate2 = "1.0"
futures = "0.3"
hashbrown = { workspace = true }
http = "0.2.8"
http = "0.2.9"
hyper = "0.14"
log = "0.4"
parking_lot = "0.12"


@ -15,7 +15,7 @@ data_types = { path = "../data_types" }
dml = { path = "../dml" }
futures = "0.3"
generated_types = { path = "../generated_types" }
http = "0.2.8"
http = "0.2.9"
hyper = "0.14"
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
mutable_batch_lp = { path = "../mutable_batch_lp" }


@ -31,7 +31,7 @@ pub struct LoggingConfig {
/// Convenient way to set log severity level filter.
/// Overrides `--log-filter`.
///
/// -v 'info'
/// -v 'info,sqlx=warn'
///
/// -vv 'debug,hyper::proto::h1=info,h2=info'
///


@ -118,7 +118,7 @@ where
pub fn with_log_verbose_count(self, log_verbose_count: u8) -> Self {
let log_filter = match log_verbose_count {
0 => self.log_filter,
1 => Some(EnvFilter::try_new("info").unwrap()),
1 => Some(EnvFilter::try_new("info,sqlx=warn").unwrap()),
2 => Some(EnvFilter::try_new("debug,hyper::proto::h1=info,h2=info").unwrap()),
_ => Some(EnvFilter::try_new("trace,hyper::proto::h1=info,h2=info").unwrap()),
};