diff --git a/Cargo.lock b/Cargo.lock index 49e0877a9f..189dab8f20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2280,9 +2280,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ "bytes", "fnv", @@ -2516,6 +2516,7 @@ dependencies = [ "console-subscriber", "data_types", "datafusion", + "dirs", "dotenvy", "flate2", "futures", diff --git a/README.md b/README.md index 351cc2be04..7e6aaf52d8 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,13 @@ To compile for development, run: cargo build ``` +To compile for release and install the `influxdb_iox` binary in your path (so you can run `influxdb_iox` directly) do: + +```shell +# from within the main `influxdb_iox` checkout +cargo install --path influxdb_iox +``` + This creates a binary at `target/debug/influxdb_iox`. ### Build a Docker image (optional) @@ -171,30 +178,32 @@ DOCKER_BUILDKIT=1 docker build . [Enable BuildKit]: https://docs.docker.com/develop/develop-images/build_enhancements/#to-enable-buildkit-builds -#### Ephemeral mode +#### Local filesystem testing mode -To start InfluxDB IOx and store data in memory, after you've compiled for development, run: +InfluxDB IOx supports testing entirely backed by the local filesystem. + +> **Note** +> +> This mode should NOT be used for production systems: it will have poor performance and limited tuning knobs are available. + +To run IOx in local testing mode, use: ```shell +./target/debug/influxdb_iox +# shorthand for ./target/debug/influxdb_iox run all-in-one ``` -By default this runs an "all-in-one" server with HTTP server on port `8080`, router gRPC server on port `8081` and querier gRPC server on port `8082`. When the server is stopped all data lost. +This will start an "all-in-one" IOx server with the following configuration: +1. File backed catalog (sqlite), object store, and write ahead log (wal) stored under `/.influxdb_iox` +2. HTTP `v2` api server on port `8080`, querier gRPC server on port `8082` and several ports for other internal services. -#### Local persistence mode - -To start InfluxDB IOx and store the catalog in Postgres and data in the local filesystem to persist -data across restarts, after you've compiled for development, run: +You can also change the configuration in limited ways, such as choosing a different data directory: ```shell -./target/debug/influxdb_iox run all-in-one --catalog-dsn postgres:///iox_shared --data-dir=~/iox_data +./target/debug/influxdb_iox run all-in-one --data-dir=/tmp/iox_data ``` -where `--catalog-dsn` is a connection URL to the Postgres database you wish to use, and -`--data-dir` is the directory you wish to use. - -Note that when the server is stopped all data that has not yet been written to parquet files will be lost. 
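
A slightly fuller variant of that override (a sketch only; it combines the `--data-dir` flag shown above with the `--wal-directory` flag added to the all-in-one command later in this diff, and the paths are just examples) would be:

```shell
# example paths only; any writable directories work
./target/debug/influxdb_iox run all-in-one \
  --data-dir=/tmp/iox_data \
  --wal-directory=/tmp/iox_wal
```

If `--wal-directory` is omitted, the WAL defaults to a `wal/` subdirectory of the data directory, as described in the all-in-one changes below.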
- #### Compile and run diff --git a/clap_blocks/Cargo.toml b/clap_blocks/Cargo.toml index 6c46f534b5..fe0bfc3fb1 100644 --- a/clap_blocks/Cargo.toml +++ b/clap_blocks/Cargo.toml @@ -9,7 +9,7 @@ license.workspace = true clap = { version = "4", features = ["derive", "env"] } data_types = { path = "../data_types" } futures = "0.3" -http = "0.2.8" +http = "0.2.9" humantime = "2.1.0" iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } diff --git a/clap_blocks/src/catalog_dsn.rs b/clap_blocks/src/catalog_dsn.rs index 40ef51c23d..3993668f78 100644 --- a/clap_blocks/src/catalog_dsn.rs +++ b/clap_blocks/src/catalog_dsn.rs @@ -60,7 +60,13 @@ pub struct CatalogDsnConfig { )] pub(crate) catalog_type_: CatalogType, - /// Postgres connection string. Required if catalog is set to postgres. + /// Catalog connection string. Required if catalog is set to postgres. + /// + /// The dsn is interpreted based on the type of catalog used. + /// + /// PostgreSQL: `postgresql://postgres@localhost:5432/postgres` + /// + /// Sqlite (a local filename): '/tmp/foo.sqlite' #[clap(long = "catalog-dsn", env = "INFLUXDB_IOX_CATALOG_DSN", action)] pub dsn: Option, @@ -117,13 +123,22 @@ pub struct CatalogDsnConfig { #[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)] pub enum CatalogType { /// PostgreSQL. + /// + /// Example dsn: `"postgresql://postgres@localhost:5432/postgres"` #[default] Postgres, /// In-memory. + /// + /// Example dsn: None Memory, /// SQLite. + /// + /// The dsn is a path to local file. + /// + /// Example dsn: `"/tmp/foo.sqlite"` + /// Sqlite, } @@ -139,7 +154,8 @@ impl CatalogDsnConfig { } } - /// Create a new Postgres instance for all-in-one mode if a catalog DSN is specified + /// Create a new Postgres instance for all-in-one mode if a + /// catalog DSN is specified pub fn new_postgres(dsn: String, postgres_schema_name: String) -> Self { info!("Catalog: Postgres at `{}`", dsn); @@ -154,8 +170,9 @@ impl CatalogDsnConfig { } } - /// Create a new Postgres instance for all-in-one mode if a catalog DSN is specified - pub fn new_sqlite(dsn: String) -> Self { + /// Create a new Sqlite instance for all-in-one mode + pub fn new_sqlite(dsn: impl Into) -> Self { + let dsn = dsn.into(); info!("Catalog: SQLite at `{}`", dsn); Self { diff --git a/clap_blocks/src/compactor2.rs b/clap_blocks/src/compactor2.rs index 753e845641..c5c179acc6 100644 --- a/clap_blocks/src/compactor2.rs +++ b/clap_blocks/src/compactor2.rs @@ -7,7 +7,9 @@ use std::num::NonZeroUsize; pub struct Compactor2Config { /// Number of partitions that should be compacted in parallel. /// - /// This should usually be larger than the compaction job concurrency since one partition can spawn multiple compaction jobs. + /// This should usually be larger than the compaction job + /// concurrency since one partition can spawn multiple compaction + /// jobs. #[clap( long = "compaction-partition-concurrency", env = "INFLUXDB_IOX_COMPACTION_PARTITION_CONCURRENCY", @@ -18,7 +20,8 @@ pub struct Compactor2Config { /// Number of concurrent compaction jobs. /// - /// This should usually be smaller than the partition concurrency since one partition can spawn multiple compaction jobs. + /// This should usually be smaller than the partition concurrency + /// since one partition can spawn multiple compaction jobs. 
#[clap( long = "compaction-job-concurrency", env = "INFLUXDB_IOX_COMPACTION_JOB_CONCURRENCY", @@ -27,7 +30,8 @@ pub struct Compactor2Config { )] pub compaction_job_concurrency: NonZeroUsize, - /// Number of jobs PER PARTITION that move files in and out of the scratchpad. + /// Number of jobs PER PARTITION that move files in and out of the + /// scratchpad. #[clap( long = "compaction-partition-scratchpad-concurrency", env = "INFLUXDB_IOX_COMPACTION_PARTITION_SCRATCHPAD_CONCURRENCY", @@ -36,7 +40,8 @@ pub struct Compactor2Config { )] pub compaction_partition_scratchpad_concurrency: NonZeroUsize, - /// Partitions with recent created files these last minutes are selected for compaction. + /// The compactor will only consider compacting partitions that + /// have new parquet files created within this many minutes. #[clap( long = "compaction_partition_minute_threshold", env = "INFLUXDB_IOX_COMPACTION_PARTITION_MINUTE_THRESHOLD", @@ -45,19 +50,22 @@ pub struct Compactor2Config { )] pub compaction_partition_minute_threshold: u64, - /// Number of threads to use for the compactor query execution, compaction and persistence. + /// Number of threads to use for the compactor query execution, + /// compaction and persistence. + /// If not specified, defaults to one less than the number of cores on the system #[clap( long = "query-exec-thread-count", env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT", - default_value = "4", action )] - pub query_exec_thread_count: usize, + pub query_exec_thread_count: Option, - /// Size of memory pool used during query exec, in bytes. + /// Size of memory pool used during compaction plan execution, in + /// bytes. /// - /// If queries attempt to allocate more than this many bytes - /// during execution, they will error with "ResourcesExhausted". + /// If compaction plans attempt to allocate more than this many + /// bytes during execution, they will error with + /// "ResourcesExhausted". #[clap( long = "exec-mem-pool-bytes", env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES", @@ -67,7 +75,8 @@ pub struct Compactor2Config { pub exec_mem_pool_bytes: usize, /// Desired max size of compacted parquet files. - /// It is a target desired value, rather than a guarantee. + /// + /// Note this is a target desired value, rather than a guarantee. /// 1024 * 1024 * 100 = 104,857,600 #[clap( long = "compaction-max-desired-size-bytes", @@ -78,6 +87,7 @@ pub struct Compactor2Config { pub max_desired_file_size_bytes: u64, /// Percentage of desired max file size. + /// /// If the estimated compacted result is too small, no need to split it. /// This percentage is to determine how small it is: /// < percentage_max_file_size * max_desired_file_size_bytes: @@ -149,7 +159,14 @@ pub struct Compactor2Config { )] pub ignore_partition_skip_marker: bool, - /// Maximum number of files in a compaction plan + /// Maximum number of files that the compactor will try and + /// compact in a single plan. + /// + /// The higher this setting is the fewer compactor plans are run + /// and thus fewer resources over time are consumed by the + /// compactor. Increasing this setting also increases the peak + /// memory used for each compaction plan, and thus if it is set + /// too high, the compactor plans may exceed available memory. #[clap( long = "compaction-max-num-files-per-plan", env = "INFLUXDB_IOX_COMPACTION_MAX_NUM_FILES_PER_PLAN", @@ -158,8 +175,17 @@ pub struct Compactor2Config { )] pub max_num_files_per_plan: usize, - /// Maximum input bytes (in parquet) per partition. 
If there is more data, we ignore the partition (for now) as a - /// self-protection mechanism. + /// Maximum input bytes (in parquet) per partition that the + /// compactor will attempt to compact in any one plan. + /// + /// In the worst case, if the sum of the sizes of all parquet + /// files in a partition is greater than this value, the compactor + /// may not try to compact this partition. Under normal operation, + /// the compactor compacts a subset of files in a partition but in + /// some cases it may need to compact them all. + /// + /// This setting is a self protection mechanism, and it is + /// expected to be removed in future versions #[clap( long = "compaction-max-input-parquet-bytes-per-partition", env = "INFLUXDB_IOX_COMPACTION_MAX_INPUT_PARQUET_BYTES_PER_PARTITION", @@ -190,7 +216,15 @@ pub struct Compactor2Config { )] pub shard_id: Option, - /// Minimum number of L1 files to comapct to L2 + /// Minimum number of L1 files to compact to L2. + /// + /// If there are more than this many L1 (by definition non + /// overlapping) files in a partition, the compactor will compact + /// them together into one or more larger L2 files. + /// + /// Setting this value higher in general results in fewer overall + /// resources spent on compaction but more files per partition (and + /// thus less optimal compression and query performance). #[clap( long = "compaction-min-num-l1-files-to-compact", env = "INFLUXDB_IOX_COMPACTION_MIN_NUM_L1_FILES_TO_COMPACT", @@ -200,6 +234,10 @@ pub struct Compactor2Config { pub min_num_l1_files_to_compact: usize, /// Only process all discovered partitions once. + /// + /// By default the compactor will continuously loop over all + /// partitions looking for work. Setting this option results in + /// exiting the loop after the one iteration. #[clap( long = "compaction-process-once", env = "INFLUXDB_IOX_COMPACTION_PROCESS_ONCE", @@ -207,7 +245,8 @@ pub struct Compactor2Config { )] pub process_once: bool, - /// Compact all partitions found in the catalog, no matter if/when the received writes. + /// Compact all partitions found in the catalog, no matter if/when + /// they received writes. #[clap( long = "compaction-process-all-partitions", env = "INFLUXDB_IOX_COMPACTION_PROCESS_ALL_PARTITIONS", @@ -215,8 +254,11 @@ pub struct Compactor2Config { )] pub process_all_partitions: bool, - /// Maximum number of columns in the table of a partition that will be able to considered - /// to get compacted + /// Maximum number of columns in a table of a partition that + /// will be able to considered to get compacted + /// + /// If a table has more than this many columns, the compactor will + /// not compact it, to avoid large memory use. 
#[clap( long = "compaction-max-num-columns-per-table", env = "INFLUXDB_IOX_COMPACTION_MAX_NUM_COLUMNS_PER_TABLE", diff --git a/client_util/Cargo.toml b/client_util/Cargo.toml index 1cae8a4780..072c58dc46 100644 --- a/client_util/Cargo.toml +++ b/client_util/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true license.workspace = true [dependencies] -http = "0.2.8" +http = "0.2.9" reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } thiserror = "1.0.38" tonic = { version = "0.8" } diff --git a/compactor2/tests/layouts/mod.rs b/compactor2/tests/layouts/mod.rs index 56d9b56b30..9a0d406ba3 100644 --- a/compactor2/tests/layouts/mod.rs +++ b/compactor2/tests/layouts/mod.rs @@ -119,6 +119,10 @@ async fn all_overlapping_l0() { - "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|" - "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|" - "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 90mb total:" + - "L1 " + - "L1.0[100,180] 72mb |-----------------------------L1.0-----------------------------| " + - "L1.0[180,200] 18mb |-----L1.0-----|" - "**** Final Output Files " - "L1 " - "L1.11[100,180] 72mb |----------------------------L1.11-----------------------------| " @@ -174,6 +178,10 @@ async fn all_non_overlapping_l0() { - "L0.3[200,201] |L0.3| " - "L0.2[100,101] |L0.2| " - "L0.1[0,1] |L0.1| " + - "**** 2 Output Files (parquet_file_id not yet assigned), 100mb total:" + - "L1 " + - "L1.0[0,720] 79.91mb |----------------------------L1.0-----------------------------| " + - "L1.0[720,901] 20.09mb |-----L1.0-----| " - "**** Final Output Files " - "L1 " - "L1.11[0,720] 79.91mb|----------------------------L1.11----------------------------| " @@ -243,6 +251,9 @@ async fn l1_with_overlapping_l0() { - "L0.3[140,190] 5kb |------L0.3-------| " - "L1 " - "L1.2[100,150] 10mb |------L1.2-------| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 10.02mb total:" + - "L1, all files 10.02mb " + - "L1.0[100,310] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1 " - "L1.1[50,100] 10mb |----L1.1-----| " @@ -308,6 +319,9 @@ async fn l1_with_non_overlapping_l0() { - "L0.5[400,450] |-----L0.5-----| " - "L0.4[350,400] |-----L0.4-----| " - "L0.3[300,350] |-----L0.3-----| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 25kb total:" + - "L1, all files 25kb " + - "L1.0[300,550] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1 " - "L1.1[50,100] 10mb |-L1.1-| " @@ -372,6 +386,9 @@ async fn l1_with_non_overlapping_l0_larger() { - "L0.7[400,450] |----------L0.7----------| " - "L0.6[350,400] |----------L0.6----------| " - "L0.5[300,350] |----------L0.5----------| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" + - "L1, all files 15mb " + - "L1.0[300,450] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[370]). 
5 Input Files, 108mb total:" - "L1 " - "L1.4[200,250] 3mb |--L1.4--| " @@ -379,6 +396,10 @@ async fn l1_with_non_overlapping_l0_larger() { - "L1.2[100,150] 50mb |--L1.2--| " - "L1.1[50,100] 20mb |--L1.1--| " - "L1.8[300,450] 15mb |------------L1.8------------|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 108mb total:" + - "L2 " + - "L2.0[50,370] 86.4mb |-----------------------------L2.0-----------------------------| " + - "L2.0[370,450] 21.6mb |-----L2.0-----|" - "**** Final Output Files " - "L2 " - "L2.9[50,370] 86.4mb |-----------------------------L2.9-----------------------------| " @@ -451,6 +472,9 @@ async fn l1_too_much_with_non_overlapping_l0() { - "L0.13[600,650] |------------------------------------L0.13-------------------------------------|" - "L0.12[600,650] |------------------------------------L0.12-------------------------------------|" - "L0.11[600,650] |------------------------------------L0.11-------------------------------------|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" + - "L1, all files 15mb " + - "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|" - "SKIPPED COMPACTION for PartitionId(1): partition 1 has 781189120 parquet file bytes, limit is 268435456" - "**** Final Output Files " - "L1 " @@ -532,6 +556,9 @@ async fn many_l1_with_non_overlapping_l0() { - "L0.13[600,650] |------------------------------------L0.13-------------------------------------|" - "L0.12[600,650] |------------------------------------L0.12-------------------------------------|" - "L0.11[600,650] |------------------------------------L0.11-------------------------------------|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" + - "L1, all files 15mb " + - "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[530]). 11 Input Files, 88mb total:" - "L1 " - "L1.10[500,550] 7mb |L1.10| " @@ -545,6 +572,10 @@ async fn many_l1_with_non_overlapping_l0() { - "L1.2[100,150] 8mb |L1.2| " - "L1.1[50,100] 9mb |L1.1| " - "L1.14[600,650] 15mb |L1.14|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 88mb total:" + - "L2 " + - "L2.0[50,530] 70.4mb |-----------------------------L2.0-----------------------------| " + - "L2.0[530,650] 17.6mb |-----L2.0-----|" - "**** Final Output Files " - "L2 " - "L2.15[50,530] 70.4mb|----------------------------L2.15-----------------------------| " @@ -608,11 +639,18 @@ async fn large_l1_with_non_overlapping_l0() { - "L0.5[600,650] |-------------------------------------L0.5-------------------------------------|" - "L0.4[600,650] |-------------------------------------L0.4-------------------------------------|" - "L0.3[600,650] |-------------------------------------L0.3-------------------------------------|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" + - "L1, all files 15mb " + - "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[375]). 
3 Input Files, 185mb total:" - "L1 " - "L1.2[100,150] 80mb |L1.2| " - "L1.1[50,100] 90mb |L1.1| " - "L1.6[600,650] 15mb |L1.6| " + - "**** 2 Output Files (parquet_file_id not yet assigned), 185mb total:" + - "L2 " + - "L2.0[50,375] 100.21mb|------------------L2.0-------------------| " + - "L2.0[375,650] 84.79mb |---------------L2.0---------------| " - "**** Final Output Files " - "L2 " - "L2.7[50,375] 100.21mb|------------------L2.7-------------------| " @@ -688,6 +726,9 @@ async fn many_l1_files() { - "L0.23[24,25] |------------------------------------L0.23-------------------------------------|" - "L0.22[24,25] |------------------------------------L0.22-------------------------------------|" - "L0.21[24,25] |------------------------------------L0.21-------------------------------------|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 3mb total:" + - "L1, all files 3mb " + - "L1.0[24,25] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[13]). 21 Input Files, 203mb total:" - "L1 " - "L1.20[19,20] 10mb |L1.20| " @@ -711,6 +752,10 @@ async fn many_l1_files() { - "L1.2[1,2] 10mb |L1.2| " - "L1.1[0,1] 10mb |L1.1| " - "L1.24[24,25] 3mb |L1.24|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 203mb total:" + - "L2 " + - "L2.0[0,13] 105.56mb |-----------------L2.0------------------| " + - "L2.0[13,25] 97.44mb |----------------L2.0----------------| " - "**** Final Output Files " - "L2 " - "L2.25[0,13] 105.56mb|-----------------L2.25-----------------| " @@ -1244,6 +1289,9 @@ async fn many_tiny_l0_files() { - "L0.198[197,198] |L0.198|" - "L0.199[198,199] |L0.199|" - "L0.200[199,200] |L0.200|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" + - "L0, all files 1.37mb " + - "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:" - "L0, all files 7kb " - "L0.201[200,201] |L0.201| " @@ -1334,10 +1382,16 @@ async fn many_tiny_l0_files() { - "L0.286[285,286] |L0.286|" - "L0.287[286,287] |L0.287|" - "L0.288[287,288] |L0.288|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:" + - "L0, all files 616kb " + - "L0.0[200,288] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 2, type=compact. 2 Input Files, 1.97mb total:" - "L0 " - "L0.290[200,288] 616kb |--------L0.290--------| " - "L0.289[0,200] 1.37mb|-----------------------L0.289------------------------| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.97mb total:" + - "L1, all files 1.97mb " + - "L1.0[0,288] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1, all files 1.97mb " - "L1.291[0,288] |------------------------------------L1.291------------------------------------|" @@ -1990,6 +2044,9 @@ async fn over_two_times_max_files_per_plan() { - "L0.198[197,198] |L0.198|" - "L0.199[198,199] |L0.199|" - "L0.200[199,200] |L0.200|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" + - "L0, all files 1.37mb " + - "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 1, type=compact. 
200 Input Files, 1.37mb total:" - "L0, all files 7kb " - "L0.201[200,201] |L0.201| " @@ -2192,6 +2249,9 @@ async fn over_two_times_max_files_per_plan() { - "L0.398[397,398] |L0.398|" - "L0.399[398,399] |L0.399|" - "L0.400[399,400] |L0.400|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" + - "L0, all files 1.37mb " + - "L0.0[200,400] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 2, type=compact. 10 Input Files, 70kb total:" - "L0, all files 7kb " - "L0.401[400,401] |L0.401| " @@ -2204,11 +2264,17 @@ async fn over_two_times_max_files_per_plan() { - "L0.408[407,408] |L0.408| " - "L0.409[408,409] |L0.409| " - "L0.410[409,410] |L0.410|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 70kb total:" + - "L0, all files 70kb " + - "L0.0[400,410] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 3, type=compact. 3 Input Files, 2.8mb total:" - "L0 " - "L0.413[400,410] 70kb |L0.413|" - "L0.412[200,400] 1.37mb |---------------L0.412----------------| " - "L0.411[0,200] 1.37mb|---------------L0.411----------------| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 2.8mb total:" + - "L1, all files 2.8mb " + - "L1.0[0,410] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1, all files 2.8mb " - "L1.414[0,410] |------------------------------------L1.414------------------------------------|" @@ -2741,6 +2807,9 @@ async fn many_tiny_l1_files() { - "L1.198[197,198] |L1.198|" - "L1.199[198,199] |L1.199|" - "L1.200[199,200] |L1.200|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" + - "L1, all files 1.37mb " + - "L1.0[0,200] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:" - "L1, all files 7kb " - "L1.201[200,201] |L1.201| " @@ -2831,6 +2900,9 @@ async fn many_tiny_l1_files() { - "L1.286[285,286] |L1.286|" - "L1.287[286,287] |L1.287|" - "L1.288[287,288] |L1.288|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:" + - "L1, all files 616kb " + - "L1.0[200,288] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1 " - "L1.289[0,200] 1.37mb|-----------------------L1.289------------------------| " @@ -3297,6 +3369,9 @@ async fn many_l0_and_overlapped_l1_files() { - "L0.188[187,188] |L0.188|" - "L0.189[188,189] |L0.189|" - "L0.190[189,190] |L0.190|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.3mb total:" + - "L0, all files 1.3mb " + - "L0.0[0,190] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[159]). 
21 Input Files, 21.3mb total:" - "L0 " - "L0.211[0,190] 1.3mb |----------------------------------L0.211----------------------------------| " @@ -3321,6 +3396,10 @@ async fn many_l0_and_overlapped_l1_files() { - "L1.207[160,169] 1mb |L1.207| " - "L1.208[170,179] 1mb |L1.208| " - "L1.209[180,189] 1mb |L1.209|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:" + - "L1 " + - "L1.0[0,159] 17.02mb |----------------------------L1.0-----------------------------| " + - "L1.0[159,199] 4.28mb |-----L1.0-----| " - "**** Final Output Files " - "L1 " - "L1.212[0,159] 17.02mb|---------------------------L1.212----------------------------| " @@ -3808,6 +3887,9 @@ async fn not_many_l0_and_overlapped_l1_files() { - "L1.192[10,19] 1mb |L1.192| " - "L1.193[20,29] 1mb |L1.193| " - "L1.194[30,39] 1mb |L1.194| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 6.3mb total:" + - "L1, all files 6.3mb " + - "L1.0[0,190] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[2407]). 16 Input Files, 21.3mb total:" - "L1 " - "L1.196[200,209] 1mb |L1.196| " @@ -3826,6 +3908,10 @@ async fn not_many_l0_and_overlapped_l1_files() { - "L1.209[2800,2809] 1mb |L1.209|" - "L1.210[3000,3009] 1mb |L1.210|" - "L1.211[0,190] 6.3mb |L1.211| " + - "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:" + - "L2 " + - "L2.0[0,2407] 17.04mb|----------------------------L2.0-----------------------------| " + - "L2.0[2407,3009] 4.26mb |-----L2.0-----| " - "**** Final Output Files " - "L2 " - "L2.212[0,2407] 17.04mb|---------------------------L2.212----------------------------| " diff --git a/compactor2_test_utils/src/display.rs b/compactor2_test_utils/src/display.rs index 7ff46eaf9b..0065454ed3 100644 --- a/compactor2_test_utils/src/display.rs +++ b/compactor2_test_utils/src/display.rs @@ -37,7 +37,7 @@ const DEFAULT_HEADING_WIDTH: usize = 20; /// parquet files arranged so they are lined up horizontally based on /// their relative time range. /// -/// See docs on [`ParquetFileFormatter`]z for examples. +/// See docs on [`ParquetFileFormatter`] for examples. fn readable_list_of_files<'a>( title: Option, files: impl IntoIterator, diff --git a/compactor2_test_utils/src/simulator.rs b/compactor2_test_utils/src/simulator.rs index 651e48bd87..1d4a299c9b 100644 --- a/compactor2_test_utils/src/simulator.rs +++ b/compactor2_test_utils/src/simulator.rs @@ -5,7 +5,8 @@ use std::{ use async_trait::async_trait; use data_types::{ - ColumnSet, CompactionLevel, ParquetFile, ParquetFileParams, SequenceNumber, ShardId, Timestamp, + ColumnSet, CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, SequenceNumber, + ShardId, Timestamp, }; use datafusion::physical_plan::SendableRecordBatchStream; use iox_time::Time; @@ -69,24 +70,50 @@ impl ParquetFileSimulator { runs.into_iter() .enumerate() .flat_map(|(i, run)| { - let total_input_size: i64 = run - .input_parquet_files - .iter() - .map(|f| f.file_size_bytes) - .sum(); - let title = format!( + let SimulatedRun { + plan_type, + input_parquet_files, + output_params, + } = run; + + let input_title = format!( "**** Simulation run {}, type={}. 
{} Input Files, {} total:", i, - run.plan_type, - run.input_parquet_files.len(), - display_size(total_input_size) + plan_type, + input_parquet_files.len(), + display_size(total_size(input_parquet_files.iter())) ); - format_files(title, &run.input_parquet_files) + + // display the files created by this run + let output_parquet_files: Vec<_> = output_params + .into_iter() + .map(|params| { + // Use file id 0 as they haven't been + // assigned an id in the catalog yet + ParquetFile::from_params(params, ParquetFileId::new(0)) + }) + .collect(); + + let output_title = format!( + "**** {} Output Files (parquet_file_id not yet assigned), {} total:", + output_parquet_files.len(), + display_size(total_size(output_parquet_files.iter())) + ); + + // hook up inputs and outputs + format_files(input_title, &input_parquet_files) + .into_iter() + .chain(format_files(output_title, &output_parquet_files).into_iter()) }) .collect() } } +/// return the total file size of all the parquet files +fn total_size<'a>(parquet_files: impl IntoIterator) -> i64 { + parquet_files.into_iter().map(|f| f.file_size_bytes).sum() +} + #[async_trait] impl ParquetFilesSink for ParquetFileSimulator { async fn stream_into_file_sink( @@ -134,20 +161,21 @@ impl ParquetFilesSink for ParquetFileSimulator { let partition_info = partition_info.as_ref(); // Compute final output - let output: Vec<_> = output_files + let output_params: Vec<_> = output_files .into_iter() .map(|f| { f.into_parquet_file_params(max_l0_created_at, column_set.clone(), partition_info) }) .collect(); - // record what we did + // record what the simulator did self.runs.lock().unwrap().push(SimulatedRun { plan_type, input_parquet_files, + output_params: output_params.clone(), }); - Ok(output) + Ok(output_params) } fn as_any(&self) -> &dyn std::any::Any { @@ -229,6 +257,7 @@ pub struct SimulatedRun { // fields are used in testing plan_type: String, input_parquet_files: Vec, + output_params: Vec, } fn overall_column_set<'a>(files: impl IntoIterator) -> ColumnSet { diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index f9331cb2fd..c6ca987f64 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -53,12 +53,13 @@ bytes = "1.4" clap = { version = "4", features = ["derive", "env"] } comfy-table = { version = "6.1", default-features = false } console-subscriber = { version = "0.1.8", optional = true, features = ["parking_lot"] } +dirs = "4.0.0" dotenvy = "0.15.6" futures = "0.3" futures-util = { version = "0.3" } flate2 = "1.0" hashbrown = { workspace = true } -http = "0.2.8" +http = "0.2.9" humantime = "2.1.0" itertools = "0.10.5" libc = { version = "0.2" } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index e48957534e..0d8b37d759 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -27,16 +27,25 @@ use ioxd_querier::{create_querier_server_type, QuerierServerTypeArgs}; use ioxd_router::create_router2_server_type; use object_store::DynObjectStore; use observability_deps::tracing::*; -use once_cell::sync::Lazy; use parquet_file::storage::{ParquetStorage, StorageId}; use std::{ - collections::HashMap, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc, time::Duration, + collections::HashMap, + num::NonZeroUsize, + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, + time::Duration, }; -use tempfile::TempDir; use thiserror::Error; use trace_exporters::TracingConfig; use trogging::cli::LoggingConfig; +/// The default name of 
the influxdb_iox data directory +pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb_iox"; + +/// The default name of the catalog file +pub const DEFAULT_CATALOG_FILENAME: &str = "catalog.sqlite"; + /// The default bind address for the Router HTTP API. pub const DEFAULT_ROUTER_HTTP_BIND_ADDR: &str = "127.0.0.1:8080"; @@ -87,18 +96,18 @@ pub enum Error { pub type Result = std::result::Result; -/// Use a static so the directory lasts the entire process -static DEFAULT_WAL_DIRECTORY: Lazy = - Lazy::new(|| TempDir::new().expect("Could not create a temporary directory for WAL")); - /// The intention is to keep the number of options on this Config -/// object as small as possible. For more complex configurations and -/// deployments the individual services (e.g. Router and Compactor) -/// should be instantiated and configured individually. +/// object as small as possible. All in one mode is designed for ease +/// of use and testing. +/// +/// For production deployments, IOx is designed to run as multiple +/// individual services, and thus knobs for tuning performance are +/// found on those individual services. /// /// This creates the following four services, configured to talk to a /// common catalog, objectstore and write buffer. /// +/// ```text /// ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ /// │ Router │ │ Ingester │ │ Querier │ │ Compactor │ /// └─────────────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ @@ -116,11 +125,11 @@ static DEFAULT_WAL_DIRECTORY: Lazy = /// ( ) ( ) ( ) /// `─────' `─────' `─────' /// -/// Existing Object Store kafka / redpanda -/// Postgres (file, mem, or Object Store -/// (started by pre-existing) (file, mem, or -/// user) configured with pre-existing) -/// --object-store configured with +/// Catalog Object Store Write Ahead Log (WAL) +/// (sqllite, mem, (file, mem, or Object Store +/// or pre-existing) pre-existing) (file, mem, or +/// postgres) pre-existing) +/// ``` /// /// Ideally all the gRPC services would listen on the same port, but /// due to challenges with tonic this effort was postponed. See @@ -128,8 +137,8 @@ static DEFAULT_WAL_DIRECTORY: Lazy = /// for more details /// /// Currently, the starts services on 5 ports, designed so that the -/// ports used to interact with the old architecture are the same as -/// the new architecture (8081 write endpoint and query on 8082). +/// ports are the same as the CLI default (8080 http v2 API endpoint and +/// querier on 8082). /// /// Router; /// 8080 http (overrides INFLUXDB_IOX_BIND_ADDR) @@ -185,6 +194,12 @@ pub struct Config { #[clap(flatten)] catalog_dsn: CatalogDsnConfig, + /// The directory to store the write ahead log + /// + /// If not specified, defaults to INFLUXDB_IOX_DB_DIR/wal + #[clap(long = "wal-directory", env = "INFLUXDB_IOX_WAL_DIRECTORY", action)] + pub wal_directory: Option, + /// The number of seconds between WAL file rotations. #[clap( long = "wal-rotation-period-seconds", @@ -213,6 +228,7 @@ pub struct Config { )] pub persist_max_parallelism: usize, + // TODO - remove these ingester tuning knobs from all in one mode?? /// The maximum number of persist tasks that can be queued at any one time. 
/// /// Once this limit is reached, ingest is blocked until the persist backlog @@ -322,13 +338,15 @@ pub struct Config { } impl Config { - /// Get a specialized run config to use for each service + /// Convert the all-in-one mode configuration to a specialized + /// configuration for each individual IOx service fn specialize(self) -> SpecializedConfig { let Self { logging_config, tracing_config, max_http_request_size, object_store_config, + wal_directory, catalog_dsn, wal_rotation_period_seconds, concurrent_query_limit, @@ -346,26 +364,56 @@ impl Config { exec_mem_pool_bytes, } = self; - let database_directory = object_store_config.database_directory.clone(); - // If dir location is provided and no object store type is provided, use it also as file object store. - let object_store_config = { - if object_store_config.object_store.is_none() && database_directory.is_some() { - ObjectStoreConfig::new(database_directory.clone()) - } else { - object_store_config - } + // Determine where to store files (wal and possibly catalog + // and object store) + let database_directory = object_store_config + .database_directory + .clone() + .unwrap_or_else(|| { + dirs::home_dir() + .expect("No data-dir specified but could not find user's home directory") + .join(DEFAULT_DATA_DIRECTORY_NAME) + }); + + ensure_directory_exists(&database_directory); + + // if we have an explicit object store configuration, use + // that, otherwise default to file based object store in database directory/object_store + let object_store_config = if object_store_config.object_store.is_some() { + object_store_config + } else { + let object_store_directory = database_directory.join("object_store"); + debug!( + ?object_store_directory, + "No database directory, using default location for object store" + ); + ensure_directory_exists(&object_store_directory); + ObjectStoreConfig::new(Some(object_store_directory)) }; - // if data directory is specified, default to data-dir/wal - // otherwise use a per-process temporary directory - let wal_directory = database_directory - .map(|database_directory| database_directory.join("wal")) - .unwrap_or_else(|| PathBuf::from(DEFAULT_WAL_DIRECTORY.path())); + let wal_directory = wal_directory.clone().unwrap_or_else(|| { + debug!( + ?wal_directory, + "No wal_directory specified, using defaul location for wal" + ); + database_directory.join("wal") + }); + ensure_directory_exists(&wal_directory); - let catalog_dsn = if catalog_dsn.dsn.is_none() { - CatalogDsnConfig::new_memory() - } else { + let catalog_dsn = if catalog_dsn.dsn.is_some() { catalog_dsn + } else { + let local_catalog_path = database_directory + .join(DEFAULT_CATALOG_FILENAME) + .to_string_lossy() + .to_string(); + + debug!( + ?local_catalog_path, + "No catalog dsn specified, using default sqlite catalog" + ); + + CatalogDsnConfig::new_sqlite(local_catalog_path) }; let router_run_config = RunConfig::new( @@ -420,7 +468,7 @@ impl Config { compaction_job_concurrency: NonZeroUsize::new(1).unwrap(), compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(), compaction_partition_minute_threshold: 10, - query_exec_thread_count: 1, + query_exec_thread_count: Some(1), exec_mem_pool_bytes, max_desired_file_size_bytes: 30_000, percentage_max_file_size: 30, @@ -467,6 +515,16 @@ impl Config { } } +/// If `p` does not exist, try to create it as a directory. 
+/// +/// panic's if the directory does not exist and can not be created +fn ensure_directory_exists(p: &Path) { + if !p.exists() { + println!("Creating directory {p:?}"); + std::fs::create_dir_all(p).expect("Could not create default directory"); + } +} + /// Different run configs for the different services (needed as they /// listen on different ports) struct SpecializedConfig { diff --git a/influxdb_iox/src/commands/run/compactor2.rs b/influxdb_iox/src/commands/run/compactor2.rs index dbea7eade7..26940ee7ed 100644 --- a/influxdb_iox/src/commands/run/compactor2.rs +++ b/influxdb_iox/src/commands/run/compactor2.rs @@ -106,9 +106,15 @@ pub async fn command(config: Config) -> Result<(), Error> { StorageId::from("iox_scratchpad"), ); + let num_threads = config + .compactor_config + .query_exec_thread_count + .unwrap_or_else(|| num_cpus::get() - 1_usize); + info!(%num_threads, "using specified number of threads"); + let exec = Arc::new(Executor::new_with_config(ExecutorConfig { - num_threads: config.compactor_config.query_exec_thread_count, - target_query_partitions: config.compactor_config.query_exec_thread_count, + num_threads, + target_query_partitions: num_threads, object_stores: [&parquet_store_real, &parquet_store_scratchpad] .into_iter() .map(|store| (store.id(), Arc::clone(store.object_store()))) diff --git a/influxdb_iox/src/commands/storage.rs b/influxdb_iox/src/commands/storage.rs index 57d9b49240..414064152e 100644 --- a/influxdb_iox/src/commands/storage.rs +++ b/influxdb_iox/src/commands/storage.rs @@ -1,6 +1,7 @@ pub(crate) mod request; pub(crate) mod response; +use crate::commands::storage::response::{BinaryTagSchema, TextTagSchema}; use generated_types::{ aggregate::AggregateType, influxdata::platform::storage::read_group_request::Group, Predicate, }; @@ -197,7 +198,7 @@ struct ReadGroup { )] group: Group, - #[clap(long, action)] + #[clap(long, action, value_delimiter = ',')] group_keys: Vec, } @@ -303,7 +304,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { info!(?request, "read_filter"); let result = client.read_filter(request).await.context(ServerSnafu)?; match config.format { - Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?, + Format::Pretty => response::pretty_print_frames::(&result) + .context(ResponseSnafu)?, Format::Quiet => {} } } @@ -320,7 +322,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { info!(?request, "read_group"); let result = client.read_group(request).await.context(ServerSnafu)?; match config.format { - Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?, + Format::Pretty => response::pretty_print_frames::(&result) + .context(ResponseSnafu)?, Format::Quiet => {} } } @@ -343,7 +346,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { .context(ServerSnafu)?; match config.format { - Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?, + Format::Pretty => response::pretty_print_frames::(&result) + .context(ResponseSnafu)?, Format::Quiet => {} } } diff --git a/influxdb_iox/src/commands/storage/request.rs b/influxdb_iox/src/commands/storage/request.rs index dfc516f07b..129d0129a5 100644 --- a/influxdb_iox/src/commands/storage/request.rs +++ b/influxdb_iox/src/commands/storage/request.rs @@ -6,7 +6,7 @@ use snafu::Snafu; use self::generated_types::*; use super::response::{ - tag_key_is_field, tag_key_is_measurement, FIELD_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_BIN, + FIELD_TAG_KEY_BIN, 
FIELD_TAG_KEY_TEXT, MEASUREMENT_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_TEXT, }; use ::generated_types::{aggregate::AggregateType, google::protobuf::*}; @@ -59,7 +59,7 @@ pub fn read_filter( read_source: Some(org_bucket), range: Some(TimestampRange { start, end: stop }), key_sort: read_filter_request::KeySort::Unspecified as i32, // IOx doesn't support any other sort - tag_key_meta_names: TagKeyMetaNames::Text as i32, + tag_key_meta_names: TagKeyMetaNames::Binary as i32, } } @@ -146,6 +146,14 @@ pub fn tag_values( } } +pub(crate) fn tag_key_is_measurement(key: &[u8]) -> bool { + (key == MEASUREMENT_TAG_KEY_TEXT) || (key == MEASUREMENT_TAG_KEY_BIN) +} + +pub(crate) fn tag_key_is_field(key: &[u8]) -> bool { + (key == FIELD_TAG_KEY_TEXT) || (key == FIELD_TAG_KEY_BIN) +} + #[cfg(test)] mod test_super { use std::num::NonZeroU64; diff --git a/influxdb_iox/src/commands/storage/response.rs b/influxdb_iox/src/commands/storage/response.rs index 23b1a9896b..853a6b1a8c 100644 --- a/influxdb_iox/src/commands/storage/response.rs +++ b/influxdb_iox/src/commands/storage/response.rs @@ -39,8 +39,8 @@ pub type Result = std::result::Result; // Prints the provided data frames in a tabular format grouped into tables per // distinct measurement. -pub fn pretty_print_frames(frames: &[Data]) -> Result<()> { - let rbs = frames_to_record_batches(frames)?; +pub fn pretty_print_frames(frames: &[Data]) -> Result<()> { + let rbs = frames_to_record_batches::(frames)?; for (k, rb) in rbs { println!("\n_measurement: {k}"); println!("rows: {:?}\n", &rb.num_rows()); @@ -72,18 +72,16 @@ pub fn pretty_print_strings(values: Vec) -> Result<()> { // This function takes a set of InfluxRPC data frames and converts them into an // Arrow record batches, which are suitable for pretty printing. -fn frames_to_record_batches(frames: &[Data]) -> Result> { +fn frames_to_record_batches( + frames: &[Data], +) -> Result> { // Run through all the frames once to build the schema of each table we need // to build as a record batch. - let mut table_column_mapping = determine_tag_columns(frames); + let mut table_column_mapping = determine_tag_columns::(frames); let mut all_tables = BTreeMap::new(); let mut current_table_frame: Option<(IntermediateTable, SeriesFrame)> = None; - if frames.is_empty() { - return Ok(all_tables); - } - for frame in frames { match frame { generated_types::read_response::frame::Data::Group(_) => { @@ -93,7 +91,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result { - let cur_frame_measurement = &sf.tags[0].value; + let cur_frame_measurement = T::measurement(sf); // First series frame in result set. if current_table_frame.is_none() { @@ -113,10 +111,10 @@ fn frames_to_record_batches(frames: &[Data]) -> Result Result { // Get field key associated with previous series frame. let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap(); - let column = current_table.field_column(field_name(prev_series_frame)); + let column = current_table.field_column(T::field_name(prev_series_frame)); let values = f.values.iter().copied().map(Some).collect::>(); column.extend_f64(&values); @@ -153,7 +151,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result { // Get field key associated with previous series frame. 
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap(); - let column = current_table.field_column(field_name(prev_series_frame)); + let column = current_table.field_column(T::field_name(prev_series_frame)); let values = f.values.iter().copied().map(Some).collect::>(); column.extend_i64(&values); @@ -164,7 +162,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result { // Get field key associated with previous series frame. let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap(); - let column = current_table.field_column(field_name(prev_series_frame)); + let column = current_table.field_column(T::field_name(prev_series_frame)); let values = f.values.iter().copied().map(Some).collect::>(); column.extend_u64(&values); @@ -175,7 +173,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result { // Get field key associated with previous series frame. let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap(); - let column = current_table.field_column(field_name(prev_series_frame)); + let column = current_table.field_column(T::field_name(prev_series_frame)); let values = f.values.iter().copied().map(Some).collect::>(); column.extend_bool(&values); @@ -186,7 +184,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result { // Get field key associated with previous series frame. let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap(); - let column = current_table.field_column(field_name(prev_series_frame)); + let column = current_table.field_column(T::field_name(prev_series_frame)); let values = f .values @@ -209,11 +207,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result Result for RecordBatch { // These constants describe known values for the keys associated with // measurements and fields. -const MEASUREMENT_TAG_KEY_TEXT: [u8; 12] = [ - b'_', b'm', b'e', b'a', b's', b'u', b'r', b'e', b'm', b'e', b'n', b't', -]; +pub(crate) const MEASUREMENT_TAG_KEY_TEXT: [u8; 12] = *b"_measurement"; pub(crate) const MEASUREMENT_TAG_KEY_BIN: [u8; 1] = [0_u8]; -const FIELD_TAG_KEY_TEXT: [u8; 6] = [b'_', b'f', b'i', b'e', b'l', b'd']; +pub(crate) const FIELD_TAG_KEY_TEXT: [u8; 6] = *b"_field"; pub(crate) const FIELD_TAG_KEY_BIN: [u8; 1] = [255_u8]; // Store a collection of column names and types for a single table (measurement). #[derive(Debug, Default, PartialEq, Eq)] -struct TableColumns { +pub struct TableColumns { tag_columns: BTreeSet>, field_columns: BTreeMap, DataType>, } -// Given a set of data frames determine from the series frames within the set -// of tag columns for each distinct table (measurement). 
-fn determine_tag_columns(frames: &[Data]) -> BTreeMap, TableColumns> { +fn determine_tag_columns(frames: &[Data]) -> BTreeMap, TableColumns> { let mut schema: BTreeMap, TableColumns> = BTreeMap::new(); for frame in frames { - if let Data::Series(sf) = frame { + if let Data::Series(ref sf) = frame { assert!(!sf.tags.is_empty(), "expected _measurement and _field tags"); - // PERF: avoid clone of value - let measurement_name = sf - .tags - .iter() - .find(|t| tag_key_is_measurement(&t.key)) - .expect("measurement name not found") - .value - .clone(); + let measurement_name = T::measurement(sf).clone(); let table = schema.entry(measurement_name).or_default(); - for Tag { key, value } in sf.tags.iter().skip(1) { - if tag_key_is_field(key) { - table.field_columns.insert(value.clone(), sf.data_type()); - continue; - } + let field_name = T::field_name(sf).clone(); + table.field_columns.insert(field_name, sf.data_type()); + for Tag { key, .. } in T::tags(sf) { // PERF: avoid clone of key table.tag_columns.insert(key.clone()); // Add column to table schema } @@ -525,25 +507,67 @@ fn determine_tag_columns(frames: &[Data]) -> BTreeMap, TableColumns> { schema } -// Extract a reference to the measurement name from a Series frame. -fn measurement(frame: &SeriesFrame) -> &Vec { - assert!(tag_key_is_measurement(&frame.tags[0].key)); - &frame.tags[0].value +pub trait TagSchema { + type IntoIter<'a>: Iterator; + + /// Returns the value of the measurement meta tag. + fn measurement(frame: &SeriesFrame) -> &Vec; + + /// Returns the value of the field meta tag. + fn field_name(frame: &SeriesFrame) -> &Vec; + + /// Returns the tags without the measurement or field meta tags. + fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_>; } -// Extract a reference to the field name from a Series frame. 
-fn field_name(frame: &SeriesFrame) -> &Vec { - let idx = frame.tags.len() - 1; - assert!(tag_key_is_field(&frame.tags[idx].key)); - &frame.tags[idx].value +pub struct BinaryTagSchema; + +impl TagSchema for BinaryTagSchema { + type IntoIter<'a> = std::slice::Iter<'a, Tag>; + + fn measurement(frame: &SeriesFrame) -> &Vec { + assert_eq!(frame.tags[0].key, MEASUREMENT_TAG_KEY_BIN); + &frame.tags[0].value + } + + fn field_name(frame: &SeriesFrame) -> &Vec { + let idx = frame.tags.len() - 1; + assert_eq!(frame.tags[idx].key, FIELD_TAG_KEY_BIN); + &frame.tags[idx].value + } + + fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_> { + frame.tags[1..frame.tags.len() - 1].iter() + } } -pub(crate) fn tag_key_is_measurement(key: &[u8]) -> bool { - (key == MEASUREMENT_TAG_KEY_TEXT) || (key == MEASUREMENT_TAG_KEY_BIN) -} +pub struct TextTagSchema; -pub(crate) fn tag_key_is_field(key: &[u8]) -> bool { - (key == FIELD_TAG_KEY_TEXT) || (key == FIELD_TAG_KEY_BIN) +impl TagSchema for TextTagSchema { + type IntoIter<'a> = iter::Filter, fn(&&Tag) -> bool>; + + fn measurement(frame: &SeriesFrame) -> &Vec { + let idx = frame + .tags + .binary_search_by(|t| t.key[..].cmp(&MEASUREMENT_TAG_KEY_TEXT[..])) + .expect("missing measurement"); + &frame.tags[idx].value + } + + fn field_name(frame: &SeriesFrame) -> &Vec { + let idx = frame + .tags + .binary_search_by(|t| t.key[..].cmp(&FIELD_TAG_KEY_TEXT[..])) + .expect("missing field"); + &frame.tags[idx].value + } + + fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_> { + frame + .tags + .iter() + .filter(|t| t.key != MEASUREMENT_TAG_KEY_TEXT && t.key != FIELD_TAG_KEY_TEXT) + } } #[cfg(test)] @@ -553,15 +577,17 @@ mod test_super { BooleanPointsFrame, FloatPointsFrame, IntegerPointsFrame, SeriesFrame, StringPointsFrame, UnsignedPointsFrame, }; + use itertools::Itertools; use super::*; - // converts a vector of key/value pairs into a vector of `Tag`. - fn make_tags(pairs: &[(&str, &str)]) -> Vec { + /// Converts a vector of `(key, value)` tuples into a vector of `Tag`, sorted by key. + fn make_tags(pairs: &[(&[u8], &str)]) -> Vec { pairs .iter() + .sorted_by(|(a_key, _), (b_key, _)| Ord::cmp(a_key, b_key)) .map(|(key, value)| Tag { - key: key.as_bytes().to_vec(), + key: key.to_vec(), value: value.as_bytes().to_vec(), }) .collect::>() @@ -618,15 +644,32 @@ mod test_super { all_table_columns } + trait KeyNames { + const MEASUREMENT_KEY: &'static [u8]; + const FIELD_KEY: &'static [u8]; + } + + struct BinaryKeyNames; + impl KeyNames for BinaryKeyNames { + const MEASUREMENT_KEY: &'static [u8] = &[0_u8]; + const FIELD_KEY: &'static [u8] = &[255_u8]; + } + + struct TextKeyNames; + impl KeyNames for TextKeyNames { + const MEASUREMENT_KEY: &'static [u8] = b"_measurement"; + const FIELD_KEY: &'static [u8] = b"_field"; + } + // generate a substantial set of frames across multiple tables. 
- fn gen_frames() -> Vec { + fn gen_frames() -> Vec { vec![ Data::Series(SeriesFrame { tags: make_tags(&[ - ("_measurement", "cpu"), - ("host", "foo"), - ("server", "a"), - ("_field", "temp"), + (K::MEASUREMENT_KEY, "cpu"), + (b"host", "foo"), + (b"server", "a"), + (K::FIELD_KEY, "temp"), ]), data_type: DataType::Float as i32, }), @@ -640,10 +683,10 @@ mod test_super { }), Data::Series(SeriesFrame { tags: make_tags(&[ - ("_measurement", "cpu"), - ("host", "foo"), - ("server", "a"), - ("_field", "voltage"), + (K::MEASUREMENT_KEY, "cpu"), + (b"host", "foo"), + (b"server", "a"), + (K::FIELD_KEY, "voltage"), ]), data_type: DataType::Integer as i32, }), @@ -653,10 +696,10 @@ mod test_super { }), Data::Series(SeriesFrame { tags: make_tags(&[ - ("_measurement", "cpu"), - ("host", "foo"), - ("new_column", "a"), - ("_field", "voltage"), + (K::MEASUREMENT_KEY, "cpu"), + (b"host", "foo"), + (b"new_column", "a"), + (K::FIELD_KEY, "voltage"), ]), data_type: DataType::Integer as i32, }), @@ -665,7 +708,10 @@ mod test_super { values: vec![1000, 2000], }), Data::Series(SeriesFrame { - tags: make_tags(&[("_measurement", "another table"), ("_field", "voltage")]), + tags: make_tags(&[ + (K::MEASUREMENT_KEY, "another table"), + (K::FIELD_KEY, "voltage"), + ]), data_type: DataType::String as i32, }), Data::StringPoints(StringPointsFrame { @@ -674,9 +720,9 @@ mod test_super { }), Data::Series(SeriesFrame { tags: make_tags(&[ - ("_measurement", "another table"), - ("region", "west"), - ("_field", "voltage"), + (K::MEASUREMENT_KEY, "another table"), + (b"region", "west"), + (K::FIELD_KEY, "voltage"), ]), data_type: DataType::String as i32, }), @@ -686,9 +732,9 @@ mod test_super { }), Data::Series(SeriesFrame { tags: make_tags(&[ - ("_measurement", "another table"), - ("region", "north"), - ("_field", "bool_field"), + (K::MEASUREMENT_KEY, "another table"), + (b"region", "north"), + (K::FIELD_KEY, "bool_field"), ]), data_type: DataType::Boolean as i32, }), @@ -698,9 +744,9 @@ mod test_super { }), Data::Series(SeriesFrame { tags: make_tags(&[ - ("_measurement", "another table"), - ("region", "south"), - ("_field", "unsigned_field"), + (K::MEASUREMENT_KEY, "another table"), + (b"region", "south"), + (K::FIELD_KEY, "unsigned_field"), ]), data_type: DataType::Unsigned as i32, }), @@ -712,11 +758,15 @@ mod test_super { } #[test] - fn test_determine_tag_columns() { - assert!(determine_tag_columns(&[]).is_empty()); + fn test_binary_determine_tag_columns() { + assert!(determine_tag_columns::(&[]).is_empty()); let frame = Data::Series(SeriesFrame { - tags: make_tags(&[("_measurement", "cpu"), ("server", "a"), ("_field", "temp")]), + tags: make_tags(&[ + (BinaryKeyNames::MEASUREMENT_KEY, "cpu"), + (b"server", "a"), + (BinaryKeyNames::FIELD_KEY, "temp"), + ]), data_type: DataType::Float as i32, }); @@ -725,10 +775,10 @@ mod test_super { &["server"], &[("temp", DataType::Float)], )]); - assert_eq!(determine_tag_columns(&[frame]), exp); + assert_eq!(determine_tag_columns::(&[frame]), exp); // larger example - let frames = gen_frames(); + let frames = gen_frames::(); let exp = make_table_columns(&[ TableColumnInput::new( @@ -746,14 +796,56 @@ mod test_super { ], ), ]); - assert_eq!(determine_tag_columns(&frames), exp); + assert_eq!(determine_tag_columns::(&frames), exp); + } + + #[test] + fn test_text_determine_tag_columns() { + assert!(determine_tag_columns::(&[]).is_empty()); + + let frame = Data::Series(SeriesFrame { + tags: make_tags(&[ + (b"_measurement", "cpu"), + (b"server", "a"), + (b"_field", "temp"), + ]), + data_type: 
DataType::Float as i32, + }); + + let exp = make_table_columns(&[TableColumnInput::new( + "cpu", + &["server"], + &[("temp", DataType::Float)], + )]); + assert_eq!(determine_tag_columns::(&[frame]), exp); + + // larger example + let frames = gen_frames::(); + + let exp = make_table_columns(&[ + TableColumnInput::new( + "cpu", + &["host", "new_column", "server"], + &[("temp", DataType::Float), ("voltage", DataType::Integer)], + ), + TableColumnInput::new( + "another table", + &["region"], + &[ + ("bool_field", DataType::Boolean), + ("unsigned_field", DataType::Unsigned), + ("voltage", DataType::String), + ], + ), + ]); + assert_eq!(determine_tag_columns::(&frames), exp); } #[test] fn test_frames_to_into_record_batches() { - let frames = gen_frames(); + let frames = gen_frames::(); - let rbs = frames_to_record_batches(&frames); + let rbs = frames_to_record_batches::(&frames); let exp = vec![ ( "another table", diff --git a/ioxd_common/Cargo.toml b/ioxd_common/Cargo.toml index 28acfb8e70..df5e3826d6 100644 --- a/ioxd_common/Cargo.toml +++ b/ioxd_common/Cargo.toml @@ -33,7 +33,7 @@ chrono = { version = "0.4", default-features = false } flate2 = "1.0" futures = "0.3" hashbrown = { workspace = true } -http = "0.2.8" +http = "0.2.9" hyper = "0.14" log = "0.4" parking_lot = "0.12" diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index 970d36bdaf..83ee1580ae 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -15,7 +15,7 @@ data_types = { path = "../data_types" } dml = { path = "../dml" } futures = "0.3" generated_types = { path = "../generated_types" } -http = "0.2.8" +http = "0.2.9" hyper = "0.14" influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] } mutable_batch_lp = { path = "../mutable_batch_lp" } diff --git a/trogging/src/cli.rs b/trogging/src/cli.rs index 31f9f7a5e0..9a490a1881 100644 --- a/trogging/src/cli.rs +++ b/trogging/src/cli.rs @@ -31,7 +31,7 @@ pub struct LoggingConfig { /// Convenient way to set log severity level filter. /// Overrides `--log-filter`. /// - /// -v 'info' + /// -v 'info,sqlx=warn' /// /// -vv 'debug,hyper::proto::h1=info,h2=info' /// diff --git a/trogging/src/lib.rs b/trogging/src/lib.rs index c411ad84ac..e6beec1f9d 100644 --- a/trogging/src/lib.rs +++ b/trogging/src/lib.rs @@ -118,7 +118,7 @@ where pub fn with_log_verbose_count(self, log_verbose_count: u8) -> Self { let log_filter = match log_verbose_count { 0 => self.log_filter, - 1 => Some(EnvFilter::try_new("info").unwrap()), + 1 => Some(EnvFilter::try_new("info,sqlx=warn").unwrap()), 2 => Some(EnvFilter::try_new("debug,hyper::proto::h1=info,h2=info").unwrap()), _ => Some(EnvFilter::try_new("trace,hyper::proto::h1=info,h2=info").unwrap()), };
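
As a usage sketch of the logging change above (the filter strings are exactly the ones `with_log_verbose_count` now installs; the command line is just an illustrative IOx invocation):

```shell
# a single -v now maps to the filter "info,sqlx=warn",
# limiting the sqlx crate's output to warnings
./target/debug/influxdb_iox run all-in-one -v

# -vv is unchanged: "debug,hyper::proto::h1=info,h2=info"
./target/debug/influxdb_iox run all-in-one -vv
```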