feat: Cleanup CLI flags for InfluxDB 3 Core (#25737)

This makes quite a few major changes to our CLI and how users interact
with it:

1. All commands are now in the form <verb> <noun>; this was to make the
   commands consistent. We had last-cache as a noun, but serve as a
   verb in the top level. Given that we could only create or delete,
   all noun-based commands have been moved under a create and delete
   command
2. --host short form is now -H not -h which is reassigned to -h/--help
   for shorter help text and is in line with what users would expect
   for a CLI
3. Only the needed items from clap_blocks have been moved into
   `influxdb3_clap_blocks` and any IOx specific references were changed
   to InfluxDB 3 specific ones
4. References to InfluxDB 3.0 OSS have been changed to InfluxDB 3 Core
   in our CLI tools
5. --dbname has been changed to --database to be consistent with --table
   in many commands. The short -d flag still remains. In the create/
   delete command for the database however the name of the database is
   a positional arg

   e.g. `influxdb3 create database foo` rather than
        `influxdb3 database create --dbname foo`
6. --table has been removed from the delete/create command for tables
   and is now a positional arg much like database
7. clap_blocks was removed as dependency to avoid having IOx specific
   env vars
8. --cache-name is now an optional positional arg for last_cache and meta_cache
9. last-cache/meta-cache commands are now last_cache and meta_cache respectively

Unfortunately we have quite a few options to run the software and I
couldn't cut down on them, but at least with this, commands and options
will be more discoverable and we have full control over our CLI options
now.

Closes #25646
pull/25748/head
Michael Gattozzi 2025-01-06 18:51:55 -05:00 committed by GitHub
parent 1ce6a24c3f
commit f793d31f63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 2793 additions and 1364 deletions

213
Cargo.lock generated
View File

@ -468,7 +468,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -479,7 +479,7 @@ checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -653,7 +653,7 @@ checksum = "a539389a13af092cd345a2b47ae7dec12deb306d660b2223d25cd3419b253ebe"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -910,35 +910,6 @@ dependencies = [
"clap_derive",
]
[[package]]
name = "clap_blocks"
version = "0.1.0"
source = "git+https://github.com/influxdata/influxdb3_core?rev=a5f6076c966f4940a67998e0b85d12c3e8596715#a5f6076c966f4940a67998e0b85d12c3e8596715"
dependencies = [
"async-trait",
"clap",
"ed25519-dalek",
"http 0.2.12",
"http 1.2.0",
"humantime",
"iox_catalog",
"iox_time",
"itertools 0.13.0",
"libc",
"metric",
"non-empty-string",
"object_store",
"observability_deps",
"paste",
"snafu",
"sysinfo 0.33.1",
"tokio",
"trace_exporters",
"trogging",
"url",
"workspace-hack",
]
[[package]]
name = "clap_builder"
version = "4.5.23"
@ -960,7 +931,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -1282,33 +1253,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "curve25519-dalek"
version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
dependencies = [
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"digest",
"fiat-crypto",
"rustc_version",
"subtle",
"zeroize",
]
[[package]]
name = "curve25519-dalek-derive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
]
[[package]]
name = "darling"
version = "0.20.10"
@ -1330,7 +1274,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -1341,7 +1285,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -1855,7 +1799,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -1876,30 +1820,6 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]]
name = "ed25519"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53"
dependencies = [
"pkcs8",
"signature",
]
[[package]]
name = "ed25519-dalek"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871"
dependencies = [
"curve25519-dalek",
"ed25519",
"serde",
"sha2",
"subtle",
"zeroize",
]
[[package]]
name = "either"
version = "1.13.0"
@ -2013,12 +1933,6 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "fiat-crypto"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d"
[[package]]
name = "fixedbitset"
version = "0.4.2"
@ -2171,7 +2085,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -2742,7 +2656,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -2826,7 +2740,6 @@ dependencies = [
"backtrace",
"base64 0.21.7",
"clap",
"clap_blocks",
"console-subscriber",
"datafusion_util",
"dotenvy",
@ -2942,16 +2855,32 @@ dependencies = [
name = "influxdb3_clap_blocks"
version = "0.1.0"
dependencies = [
"async-trait",
"clap",
"datafusion",
"futures",
"http 0.2.12",
"http 1.2.0",
"humantime",
"iox_catalog",
"iox_query",
"iox_time",
"itertools 0.13.0",
"libc",
"metric",
"non-empty-string",
"object_store",
"observability_deps",
"paste",
"snafu",
"sysinfo 0.30.13",
"tempfile",
"test-log",
"test_helpers",
"tokio",
"trace_exporters",
"trogging",
"url",
]
[[package]]
@ -3293,13 +3222,13 @@ dependencies = [
[[package]]
name = "insta"
version = "1.41.1"
version = "1.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e9ffc4d4892617c50a928c52b2961cb5174b6fc6ebf252b2fac9d21955c48b8"
checksum = "6513e4067e16e69ed1db5ab56048ed65db32d10ba5fc1217f5393f8f17d8b5a5"
dependencies = [
"console",
"lazy_static",
"linked-hash-map",
"once_cell",
"pest",
"pest_derive",
"serde",
@ -3873,7 +3802,7 @@ dependencies = [
"cfg-if",
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -4385,7 +4314,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -4449,29 +4378,29 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.1.7"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95"
checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.7"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c"
checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
name = "pin-project-lite"
version = "0.2.15"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
@ -4612,12 +4541,12 @@ dependencies = [
[[package]]
name = "prettyplease"
version = "0.2.25"
version = "0.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033"
checksum = "483f8c21f64f3ea09fe0f30f5d48c3e8eefe5dac9129f0075f76593b4c1da705"
dependencies = [
"proc-macro2",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -4722,7 +4651,7 @@ dependencies = [
"prost 0.12.6",
"prost-types 0.12.6",
"regex",
"syn 2.0.94",
"syn 2.0.95",
"tempfile",
]
@ -4749,7 +4678,7 @@ dependencies = [
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -4762,7 +4691,7 @@ dependencies = [
"itertools 0.13.0",
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -4830,7 +4759,7 @@ dependencies = [
"proc-macro2",
"pyo3-macros-backend",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -4843,7 +4772,7 @@ dependencies = [
"proc-macro2",
"pyo3-build-config",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -5483,7 +5412,7 @@ checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -5537,7 +5466,7 @@ dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -5698,7 +5627,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -5764,7 +5693,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -5844,7 +5773,7 @@ dependencies = [
"quote",
"sqlx-core",
"sqlx-macros-core",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -5867,7 +5796,7 @@ dependencies = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
"syn 2.0.94",
"syn 2.0.95",
"tempfile",
"tokio",
"url",
@ -6027,7 +5956,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -6049,9 +5978,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.94"
version = "2.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "987bc0be1cdea8b10216bd06e2ca407d40b9543468fafd3ddfb02f36e77f71f3"
checksum = "46f71c0377baf4ef1cc3e3402ded576dccc315800fbc62dfc7fe04b009773b4a"
dependencies = [
"proc-macro2",
"quote",
@ -6081,7 +6010,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -6179,7 +6108,7 @@ checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -6228,7 +6157,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -6239,7 +6168,7 @@ checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -6417,7 +6346,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -6579,7 +6508,7 @@ dependencies = [
"proc-macro2",
"prost-build",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -6727,7 +6656,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -7037,7 +6966,7 @@ dependencies = [
"log",
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
"wasm-bindgen-shared",
]
@ -7072,7 +7001,7 @@ checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -7221,7 +7150,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -7232,7 +7161,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -7523,7 +7452,7 @@ dependencies = [
"sqlx-sqlite",
"strum",
"subtle",
"syn 2.0.94",
"syn 2.0.95",
"thrift",
"tokio",
"tokio-metrics",
@ -7596,7 +7525,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
"synstructure",
]
@ -7618,7 +7547,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -7638,7 +7567,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
"synstructure",
]
@ -7659,7 +7588,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]
@ -7681,7 +7610,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.94",
"syn 2.0.95",
]
[[package]]

View File

@ -3,7 +3,8 @@
members = [
"influxdb3",
"influxdb3_cache",
"influxdb3_catalog", "influxdb3_clap_blocks",
"influxdb3_catalog",
"influxdb3_clap_blocks",
"influxdb3_client",
"influxdb3_id",
"influxdb3_load_generator",
@ -77,10 +78,12 @@ humantime = "2.1.0"
hyper = "0.14"
insta = { version = "1.39", features = ["json", "redactions", "yaml"] }
indexmap = { version = "2.2.6" }
itertools = "0.13.0"
libc = { version = "0.2" }
mime = "0.3.17"
mockito = { version = "1.4.0", default-features = false }
mockall = { version = "0.13.0" }
non-empty-string = "0.2.5"
num_cpus = "1.16.0"
object_store = "0.11.1"
parking_lot = "0.12.1"
@ -105,9 +108,11 @@ serde_json = "1.0.127"
serde_urlencoded = "0.7.0"
serde_with = "3.8.1"
sha2 = "0.10.8"
snafu = "0.8"
snap = "1.0.0"
sqlparser = "0.48.0"
sysinfo = "0.30.8"
tempfile = "3.14.0"
test-log = { version = "0.2.16", features = ["trace"] }
thiserror = "1.0"
tokio = { version = "1.42", features = ["full"] }
@ -126,7 +131,6 @@ num = { version = "0.4.3" }
# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
executor = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
@ -154,7 +158,7 @@ trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c9
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715", features = ["clap"] }
[workspace.lints.rust]
missing_copy_implementations = "deny"

View File

@ -8,7 +8,6 @@ license.workspace = true
[dependencies]
# Core Crates
authz.workspace = true
clap_blocks.workspace = true
datafusion_util.workspace = true
iox_query.workspace = true
iox_time.workspace = true
@ -63,9 +62,9 @@ console-subscriber = { version = "0.1.10", optional = true, features = ["parking
[features]
default = ["jemalloc_replacing_malloc", "azure", "gcp", "aws"]
azure = ["clap_blocks/azure"] # Optional Azure Object store support
gcp = ["clap_blocks/gcp"] # Optional GCP object store support
aws = ["clap_blocks/aws"] # Optional AWS / S3 object store support
azure = ["influxdb3_clap_blocks/azure"] # Optional Azure Object store support
gcp = ["influxdb3_clap_blocks/gcp"] # Optional GCP object store support
aws = ["influxdb3_clap_blocks/aws"] # Optional AWS / S3 object store support
# Enable tokio_console support (https://github.com/tokio-rs/console)
#

View File

@ -0,0 +1,63 @@
use crate::commands::common::InfluxDb3Config;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
use std::error::Error;
// CLI arguments for `influxdb3 activate`: a thin wrapper around a single
// subcommand so each activatable resource defines its own flags.
// (Struct-level `///` is avoided on purpose: clap would surface it as help text.)
#[derive(Debug, clap::Parser)]
pub struct Config {
    #[clap(subcommand)]
    cmd: SubCommand,
}
impl Config {
fn get_client(&self) -> Result<Client, Box<dyn Error>> {
let (host_url, auth_token) = match &self.cmd {
SubCommand::Trigger(TriggerConfig {
influxdb3_config:
InfluxDb3Config {
host_url,
auth_token,
..
},
..
}) => (host_url, auth_token),
};
let mut client = Client::new(host_url.clone())?;
if let Some(token) = &auth_token {
client = client.with_auth_token(token.expose_secret());
}
Ok(client)
}
}
// Resources that can be activated. The `///` comment below doubles as the
// clap help text for the subcommand, so it is left untouched.
#[derive(Debug, clap::Subcommand)]
enum SubCommand {
    /// Activate a trigger to enable plugin execution
    Trigger(TriggerConfig),
}
// Arguments for `activate trigger`.
#[derive(Debug, clap::Parser)]
struct TriggerConfig {
    // Pulls in the shared -H/--host, -d/--database and --token flags.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Name of trigger to manage
    // Positional argument (no `long`/`short`), per the PR convention of
    // making the target resource name positional.
    #[clap(required = true)]
    trigger_name: String,
}
/// Run the `activate` command: enable the named trigger so its plugin
/// executes, then report success on stdout.
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let client = config.get_client()?;
    // Single-variant enum: destructure directly instead of matching.
    let SubCommand::Trigger(TriggerConfig {
        influxdb3_config: InfluxDb3Config { database_name, .. },
        trigger_name,
    }) = config.cmd;
    client
        .api_v3_configure_processing_engine_trigger_activate(database_name, &trigger_name)
        .await?;
    println!("Trigger {} activated successfully", trigger_name);
    Ok(())
}

View File

@ -1,25 +1,26 @@
use std::str::FromStr;
use clap::Parser;
use secrecy::Secret;
use std::error::Error;
use std::fmt::Display;
use std::str::FromStr;
use url::Url;
#[derive(Debug, Parser)]
pub struct InfluxDb3Config {
/// The host URL of the running InfluxDB 3.0 server
/// The host URL of the running InfluxDB 3 Core server
#[clap(
short = 'h',
short = 'H',
long = "host",
env = "INFLUXDB3_HOST_URL",
default_value = "http://127.0.0.1:8181"
)]
pub host_url: Url,
/// The database name to run the query against
#[clap(short = 'd', long = "dbname", env = "INFLUXDB3_DATABASE_NAME")]
/// The name of the database to operate on
#[clap(short = 'd', long = "database", env = "INFLUXDB3_DATABASE_NAME")]
pub database_name: String,
/// The token for authentication with the InfluxDB 3.0 server
/// The token for authentication with the InfluxDB 3 Core server
#[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
pub auth_token: Option<Secret<String>>,
}
@ -77,3 +78,61 @@ impl<T, const SEPARATOR: char> IntoIterator for SeparatedList<T, SEPARATOR> {
self.0.into_iter()
}
}
/// Field data types accepted by the CLI when specifying `FIELD:VALUE`
/// pairs for table creation (parsed via [`parse_key_val`]).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DataType {
    Int64,
    Uint64,
    Float64,
    Utf8,
    Bool,
}
/// Error returned when a string does not name one of the supported field
/// data types; carries the offending input so the message can echo it.
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[error("{0} is not a valid data type, values are int64, uint64, float64, utf8, and bool")]
pub struct ParseDataTypeError(String);
impl FromStr for DataType {
    type Err = ParseDataTypeError;

    /// Accepts exactly the lowercase names `int64`, `uint64`, `float64`,
    /// `utf8`, and `bool`; anything else fails with `ParseDataTypeError`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "int64" => Some(Self::Int64),
            "uint64" => Some(Self::Uint64),
            "float64" => Some(Self::Float64),
            "utf8" => Some(Self::Utf8),
            "bool" => Some(Self::Bool),
            _ => None,
        };
        parsed.ok_or_else(|| ParseDataTypeError(s.into()))
    }
}
impl Display for DataType {
    /// Renders the lowercase CLI name; the exact inverse of `FromStr`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Int64 => "int64",
            Self::Uint64 => "uint64",
            Self::Float64 => "float64",
            Self::Utf8 => "utf8",
            Self::Bool => "bool",
        };
        write!(f, "{name}")
    }
}
// Convenience conversion for callers that need an owned `String`
// (e.g. client/builder APIs); delegates to the `Display` impl above.
impl From<DataType> for String {
    fn from(data: DataType) -> Self {
        data.to_string()
    }
}
/// Parse a single key-value pair
///
/// Splits `s` at the *first* `:` and parses each side with its `FromStr`
/// impl; used as a clap `value_parser` for `--fields field:type` args.
pub fn parse_key_val<T, U>(s: &str) -> Result<(T, U), Box<dyn Error + Send + Sync + 'static>>
where
    T: std::str::FromStr,
    T::Err: Error + Send + Sync + 'static,
    U: std::str::FromStr,
    U::Err: Error + Send + Sync + 'static,
{
    // `split_once` is equivalent to find(':') + manual slicing, without
    // the index arithmetic.
    let (key, value) = s
        .split_once(':')
        .ok_or_else(|| format!("invalid FIELD:VALUE. No `:` found in `{s}`"))?;
    Ok((key.parse()?, value.parse()?))
}

View File

@ -0,0 +1,450 @@
use crate::commands::common::{parse_key_val, DataType, InfluxDb3Config, SeparatedList};
use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64;
use base64::Engine as _;
use influxdb3_client::Client;
use influxdb3_wal::TriggerSpecificationDefinition;
use rand::rngs::OsRng;
use rand::RngCore;
use secrecy::ExposeSecret;
use secrecy::Secret;
use sha2::Digest;
use sha2::Sha512;
use std::error::Error;
use std::fs;
use std::num::NonZeroUsize;
use std::str;
use std::time::Duration;
use url::Url;
// CLI arguments for the `influxdb3 create` command family; dispatch
// happens on `cmd`, one subcommand per creatable resource kind.
// (Struct-level `///` avoided: clap would surface it as help text.)
#[derive(Debug, clap::Parser)]
pub struct Config {
    #[clap(subcommand)]
    cmd: SubCommand,
}
impl Config {
    /// Construct an HTTP client for the server targeted by the subcommand.
    ///
    /// Every subcommand except `Token` carries a host URL and an optional
    /// auth token. `Token` is generated entirely locally and needs no
    /// client; requesting one for it is reported as an error rather than
    /// panicking (the previous `unreachable!()` arm aborted the process
    /// on `influxdb3 create token`, since `command` calls this
    /// unconditionally).
    fn get_client(&self) -> Result<Client, Box<dyn Error>> {
        let (host_url, auth_token) = match &self.cmd {
            // `DatabaseConfig` holds the host/token fields inline because
            // its database name is positional, not part of InfluxDb3Config.
            SubCommand::Database(DatabaseConfig {
                host_url,
                auth_token,
                ..
            }) => (host_url, auth_token),
            // All remaining networked subcommands share the flattened
            // InfluxDb3Config, so one or-pattern covers them.
            SubCommand::LastCache(LastCacheConfig {
                influxdb3_config, ..
            })
            | SubCommand::MetaCache(MetaCacheConfig {
                influxdb3_config, ..
            })
            | SubCommand::Plugin(PluginConfig {
                influxdb3_config, ..
            })
            | SubCommand::Table(TableConfig {
                influxdb3_config, ..
            })
            | SubCommand::Trigger(TriggerConfig {
                influxdb3_config, ..
            }) => (
                &influxdb3_config.host_url,
                &influxdb3_config.auth_token,
            ),
            // Token creation never talks to a server.
            SubCommand::Token => {
                return Err("the token subcommand does not connect to a server".into())
            }
        };
        let mut client = Client::new(host_url.clone())?;
        if let Some(token) = auth_token {
            client = client.with_auth_token(token.expose_secret());
        }
        Ok(client)
    }
}
// Resources creatable via `influxdb3 create`. The `///` lines double as
// clap help text and are left untouched. The explicit `name = ...`
// attributes force `last_cache`/`meta_cache` (underscored) instead of
// clap's default kebab-case `last-cache`/`meta-cache`.
#[derive(Debug, clap::Subcommand)]
pub enum SubCommand {
    /// Create a new database
    Database(DatabaseConfig),
    /// Create a new last value cache
    #[clap(name = "last_cache")]
    LastCache(LastCacheConfig),
    /// Create a new metadata cache
    #[clap(name = "meta_cache")]
    MetaCache(MetaCacheConfig),
    /// Create a new processing engine plugin
    Plugin(PluginConfig),
    /// Create a new table in a database
    Table(TableConfig),
    /// Create a new auth token
    Token,
    /// Create a new trigger for the processing engine
    Trigger(TriggerConfig),
}
// Arguments for `create database`. Host/token flags are declared inline
// (not via a flattened InfluxDb3Config) because InfluxDb3Config makes the
// database name a `-d/--database` flag, while here it is positional.
#[derive(Debug, clap::Args)]
pub struct DatabaseConfig {
    /// The host URL of the running InfluxDB 3 Core server
    #[clap(
        short = 'H',
        long = "host",
        env = "INFLUXDB3_HOST_URL",
        default_value = "http://127.0.0.1:8181"
    )]
    pub host_url: Url,
    /// The token for authentication with the InfluxDB 3 Core server
    #[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
    pub auth_token: Option<Secret<String>>,
    /// The name of the database to create. Valid database names are
    /// alphanumeric with - and _ allowed and starts with a letter or number
    // Positional argument: `influxdb3 create database <NAME>`.
    #[clap(env = "INFLUXDB3_DATABASE_NAME", required = true)]
    pub database_name: String,
}
// Arguments for `create last_cache`.
#[derive(Debug, clap::Args)]
pub struct LastCacheConfig {
    // Shared -H/--host, -d/--database and --token flags.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// The table name for which the cache is being created
    #[clap(short = 't', long = "table")]
    table: String,
    /// Which columns in the table to use as keys in the cache. This is a comma separated list.
    ///
    /// Example: --key-columns "foo,bar,baz"
    #[clap(long = "key-columns")]
    key_columns: Option<SeparatedList<String>>,
    /// Which columns in the table to store as values in the cache. This is a comma separated list
    ///
    /// Example: --value-columns "foo,bar,baz"
    #[clap(long = "value-columns")]
    value_columns: Option<SeparatedList<String>>,
    /// The number of entries per unique key column combination the cache will store
    #[clap(long = "count")]
    count: Option<usize>,
    /// The time-to-live (TTL) for entries in a cache. This uses a humantime form for example: --ttl "10s",
    /// --ttl "1min 30sec", --ttl "3 hours"
    ///
    /// See the parse_duration docs for more details about acceptable forms:
    /// <https://docs.rs/humantime/2.1.0/humantime/fn.parse_duration.html>
    #[clap(long = "ttl", value_parser = humantime::parse_duration)]
    ttl: Option<Duration>,
    /// Give a name for the cache.
    // Optional trailing positional argument; the server generates a name
    // when it is omitted (see the `None` handling in `command`).
    #[clap(required = false)]
    cache_name: Option<String>,
}
// Arguments for `create meta_cache`.
// Fix: "hieararchical" -> "hierarchical" in the user-facing help text.
#[derive(Debug, clap::Args)]
pub struct MetaCacheConfig {
    // Shared -H/--host, -d/--database and --token flags.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// The table name for which the cache is being created
    #[clap(short = 't', long = "table")]
    table: String,
    /// Which columns in the table to cache distinct values for, as a comma-separated list of the
    /// column names.
    ///
    /// The cache is a hierarchical structure, with a level for each column specified; the order
    /// specified here will determine the order of the levels from top-to-bottom of the cache
    /// hierarchy.
    #[clap(long = "columns")]
    columns: SeparatedList<String>,
    /// The maximum number of distinct value combinations to hold in the cache
    #[clap(long = "max-cardinality")]
    max_cardinality: Option<NonZeroUsize>,
    /// The maximum age of an entry in the cache entered as a human-readable duration, e.g., "30d", "24h"
    #[clap(long = "max-age")]
    max_age: Option<humantime::Duration>,
    /// Give the name of the cache.
    ///
    /// This will be automatically generated if not provided
    // Optional trailing positional argument.
    #[clap(required = false)]
    cache_name: Option<String>,
}
// Arguments for `create plugin`.
#[derive(Debug, clap::Parser)]
pub struct PluginConfig {
    // Shared -H/--host, -d/--database and --token flags.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Python file containing the plugin code
    // Path is read client-side (`fs::read_to_string` in `command`) and the
    // file contents are sent to the server, not the path itself.
    #[clap(long = "code-filename")]
    code_file: String,
    /// Entry point function for the plugin
    #[clap(long = "entry-point")]
    function_name: String,
    /// Type of trigger the plugin processes
    #[clap(long = "plugin-type", default_value = "wal_rows")]
    plugin_type: String,
    /// Name of the plugin to create
    // Positional argument.
    plugin_name: String,
}
// Arguments for `create table`.
#[derive(Debug, clap::Args)]
pub struct TableConfig {
    // NOTE(review): `required = true` combined with `num_args=0..` means the
    // flag must appear but may carry zero values — confirm an empty tag
    // list is intended to be accepted.
    #[clap(long = "tags", required = true, num_args=0..)]
    /// The list of tag names to be created for the table. Tags are alphanumeric, can contain - and _, and start with a letter or number
    tags: Vec<String>,
    // Each value is parsed into a (name, DataType) pair by `parse_key_val`.
    #[clap(short = 'f', long = "fields", value_parser = parse_key_val::<String, DataType>, num_args=0..)]
    /// The list of field names and their data type to be created for the table. Fields are alphanumeric, can contain - and _, and start with a letter or number
    /// The expected format is a list like so: 'field_name:data_type'. Valid data types are: int64, uint64, float64, utf8, and bool
    fields: Vec<(String, DataType)>,
    // Shared -H/--host, -d/--database and --token flags.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    #[clap(required = true)]
    /// The name of the table to be created
    table_name: String,
}
// Arguments for `create trigger`.
#[derive(Debug, clap::Parser)]
pub struct TriggerConfig {
    // Shared -H/--host, -d/--database and --token flags.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Plugin to execute when trigger fires
    #[clap(long = "plugin")]
    plugin_name: String,
    /// When the trigger should fire
    // Validated at parse time by the WAL type's own parser.
    #[clap(long = "trigger-spec",
        value_parser = TriggerSpecificationDefinition::from_string_rep,
        help = "Trigger specification format: 'table:<TABLE_NAME>' or 'all_tables'")]
    trigger_specification: TriggerSpecificationDefinition,
    /// Create trigger in disabled state
    #[clap(long)]
    disabled: bool,
    /// Name for the new trigger
    // Positional argument.
    trigger_name: String,
}
/// Execute an `influxdb3 create` subcommand.
///
/// The `Token` subcommand is handled *before* building a client because
/// token generation is entirely local: `get_client` has no host URL or
/// auth token to work with for it. Previously the unconditional
/// `get_client` call hit that subcommand's `unreachable!()` arm and
/// `influxdb3 create token` panicked.
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    if matches!(config.cmd, SubCommand::Token) {
        print_new_token();
        return Ok(());
    }
    let client = config.get_client()?;
    match config.cmd {
        SubCommand::Database(DatabaseConfig { database_name, .. }) => {
            client.api_v3_configure_db_create(&database_name).await?;
            println!("Database {:?} created successfully", &database_name);
        }
        SubCommand::LastCache(LastCacheConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            table,
            cache_name,
            key_columns,
            value_columns,
            count,
            ttl,
        }) => {
            let mut b = client.api_v3_configure_last_cache_create(database_name, table);
            // Add optional parameters:
            if let Some(name) = cache_name {
                b = b.name(name);
            }
            if let Some(keys) = key_columns {
                b = b.key_columns(keys);
            }
            if let Some(vals) = value_columns {
                b = b.value_columns(vals);
            }
            if let Some(count) = count {
                b = b.count(count);
            }
            if let Some(ttl) = ttl {
                b = b.ttl(ttl.as_secs());
            }
            // Make the request; `None` means the server already had an
            // identical cache and created nothing.
            match b.send().await? {
                Some(def) => println!(
                    "new cache created: {}",
                    serde_json::to_string_pretty(&def)
                        .expect("serialize last cache definition as JSON")
                ),
                None => println!("a cache already exists for the provided parameters"),
            }
        }
        SubCommand::MetaCache(MetaCacheConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            table,
            cache_name,
            columns,
            max_cardinality,
            max_age,
        }) => {
            let mut b = client.api_v3_configure_meta_cache_create(database_name, table, columns);
            // Add the optional stuff:
            if let Some(name) = cache_name {
                b = b.name(name);
            }
            if let Some(max_cardinality) = max_cardinality {
                b = b.max_cardinality(max_cardinality);
            }
            if let Some(max_age) = max_age {
                b = b.max_age(max_age.into());
            }
            match b.send().await? {
                Some(def) => println!(
                    "new cache created: {}",
                    serde_json::to_string_pretty(&def)
                        .expect("serialize meta cache definition as JSON")
                ),
                None => println!("a cache already exists for the provided parameters"),
            }
        }
        SubCommand::Plugin(PluginConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            plugin_name,
            code_file,
            function_name,
            plugin_type,
        }) => {
            // The plugin source is read locally and shipped to the server.
            let code = fs::read_to_string(&code_file)?;
            client
                .api_v3_configure_processing_engine_plugin_create(
                    database_name,
                    &plugin_name,
                    code,
                    function_name,
                    plugin_type,
                )
                .await?;
            println!("Plugin {} created successfully", plugin_name);
        }
        SubCommand::Table(TableConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            table_name,
            tags,
            fields,
        }) => {
            client
                .api_v3_configure_table_create(&database_name, &table_name, tags, fields)
                .await?;
            println!(
                "Table {:?}.{:?} created successfully",
                &database_name, &table_name
            );
        }
        // Handled by the early return above; kept only for exhaustiveness.
        SubCommand::Token => unreachable!("token creation is handled before client construction"),
        SubCommand::Trigger(TriggerConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            trigger_name,
            plugin_name,
            trigger_specification,
            disabled,
        }) => {
            client
                .api_v3_configure_processing_engine_trigger_create(
                    database_name,
                    &trigger_name,
                    plugin_name,
                    trigger_specification.string_rep(),
                    disabled,
                )
                .await?;
            println!("Trigger {} created successfully", trigger_name);
        }
    }
    Ok(())
}

/// Generate a random `apiv3_`-prefixed bearer token and print it together
/// with its SHA-512 hex digest (the form passed to `serve --bearer-token`).
fn print_new_token() {
    let token = {
        let mut token = String::from("apiv3_");
        // 64 bytes from the OS CSPRNG, base64url-encoded without padding.
        let mut key = [0u8; 64];
        OsRng.fill_bytes(&mut key);
        token.push_str(&B64.encode(key));
        token
    };
    println!(
        "\
        Token: {token}\n\
        Hashed Token: {hashed}\n\n\
        Start the server with `influxdb3 serve --bearer-token {hashed}`\n\n\
        HTTP requests require the following header: \"Authorization: Bearer {token}\"\n\
        This will grant you access to every HTTP endpoint or deny it otherwise
        ",
        hashed = hex::encode(&Sha512::digest(&token)[..])
    );
}
#[cfg(test)]
mod tests {
    use clap::Parser;

    // Exercises the `create last_cache` argument surface end-to-end:
    // flattened --database flag, --table, the comma-separated list flags,
    // humantime --ttl parsing, and the trailing positional cache name.
    #[test]
    fn parse_args() {
        let args = super::Config::parse_from([
            "create",
            "last_cache",
            "--database",
            "bar",
            "--table",
            "foo",
            "--key-columns",
            "tag1,tag2,tag3",
            "--value-columns",
            "field1,field2,field3",
            "--ttl",
            "1 hour",
            "--count",
            "5",
            "bar",
        ]);
        let super::SubCommand::LastCache(super::LastCacheConfig {
            table,
            cache_name,
            key_columns,
            value_columns,
            count,
            ttl,
            influxdb3_config: crate::commands::common::InfluxDb3Config { database_name, .. },
        }) = args.cmd
        else {
            panic!("Did not parse args correctly: {args:#?}")
        };
        assert_eq!("bar", database_name);
        assert_eq!("foo", table);
        assert!(cache_name.is_some_and(|n| n == "bar"));
        assert!(key_columns.is_some_and(|keys| keys.0 == ["tag1", "tag2", "tag3"]));
        assert!(value_columns.is_some_and(|vals| vals.0 == ["field1", "field2", "field3"]));
        assert!(count.is_some_and(|c| c == 5));
        // "1 hour" parsed by humantime == 3600 seconds.
        assert!(ttl.is_some_and(|t| t.as_secs() == 3600));
    }
}

View File

@ -0,0 +1,63 @@
use crate::commands::common::InfluxDb3Config;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
use std::error::Error;
// Argument container for the `influxdb3 deactivate` command tree.
#[derive(Debug, clap::Parser)]
pub struct Config {
    // The selected deactivation target (currently only triggers).
    #[clap(subcommand)]
    cmd: SubCommand,
}
impl Config {
fn get_client(&self) -> Result<Client, Box<dyn Error>> {
let (host_url, auth_token) = match &self.cmd {
SubCommand::Trigger(TriggerConfig {
influxdb3_config:
InfluxDb3Config {
host_url,
auth_token,
..
},
..
}) => (host_url, auth_token),
};
let mut client = Client::new(host_url.clone())?;
if let Some(token) = &auth_token {
client = client.with_auth_token(token.expose_secret());
}
Ok(client)
}
}
// Entities that can be deactivated; new targets become new variants.
#[derive(Debug, clap::Subcommand)]
enum SubCommand {
    /// Deactivate a plugin trigger
    Trigger(TriggerConfig),
}
// Arguments for `deactivate trigger`.
#[derive(Debug, clap::Parser)]
struct TriggerConfig {
    // Shared connection options (host URL, database name, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Name of trigger to deactivate
    #[clap(required = true)]
    trigger_name: String,
}
// Entry point for `influxdb3 deactivate`: build a client, then disable
// the requested trigger on the server.
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let client = config.get_client()?;
    match config.cmd {
        SubCommand::Trigger(trigger_config) => {
            let TriggerConfig {
                influxdb3_config,
                trigger_name,
            } = trigger_config;
            client
                .api_v3_configure_processing_engine_trigger_deactivate(
                    influxdb3_config.database_name,
                    &trigger_name,
                )
                .await?;
            println!("Trigger {} deactivated successfully", trigger_name);
        }
    }
    Ok(())
}

View File

@ -0,0 +1,265 @@
use super::common::InfluxDb3Config;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
use secrecy::Secret;
use std::error::Error;
use std::io;
use url::Url;
// Argument container for the `influxdb3 delete` command tree.
#[derive(Debug, clap::Parser)]
pub struct Config {
    // The kind of object to delete.
    #[clap(subcommand)]
    cmd: SubCommand,
}
impl Config {
    // Build an API client for whichever subcommand was parsed. Every
    // variant exposes the same host URL / auth token pair — either inline
    // (`Database`) or via the flattened `InfluxDb3Config` — so a single
    // or-pattern extracts them uniformly.
    fn get_client(&self) -> Result<Client, Box<dyn Error>> {
        match &self.cmd {
            SubCommand::Database(DatabaseConfig {
                host_url,
                auth_token,
                ..
            })
            | SubCommand::LastCache(LastCacheConfig {
                influxdb3_config:
                    InfluxDb3Config {
                        host_url,
                        auth_token,
                        ..
                    },
                ..
            })
            | SubCommand::MetaCache(MetaCacheConfig {
                influxdb3_config:
                    InfluxDb3Config {
                        host_url,
                        auth_token,
                        ..
                    },
                ..
            })
            | SubCommand::Plugin(PluginConfig {
                influxdb3_config:
                    InfluxDb3Config {
                        host_url,
                        auth_token,
                        ..
                    },
                ..
            })
            | SubCommand::Table(TableConfig {
                influxdb3_config:
                    InfluxDb3Config {
                        host_url,
                        auth_token,
                        ..
                    },
                ..
            })
            | SubCommand::Trigger(TriggerConfig {
                influxdb3_config:
                    InfluxDb3Config {
                        host_url,
                        auth_token,
                        ..
                    },
                ..
            }) => {
                // The token is optional; without it the client sends
                // unauthenticated requests.
                let mut client = Client::new(host_url.clone())?;
                if let Some(token) = &auth_token {
                    client = client.with_auth_token(token.expose_secret());
                }
                Ok(client)
            }
        }
    }
}
// All deletable object kinds. The explicit `name` overrides keep the CLI
// spelling (`last_cache`, `meta_cache`) consistent with the `create` side.
#[derive(Debug, clap::Subcommand)]
pub enum SubCommand {
    /// Delete a database
    Database(DatabaseConfig),
    /// Delete a last value cache
    #[clap(name = "last_cache")]
    LastCache(LastCacheConfig),
    /// Delete a meta value cache
    #[clap(name = "meta_cache")]
    MetaCache(MetaCacheConfig),
    /// Delete an existing processing engine plugin
    Plugin(PluginConfig),
    /// Delete a table in a database
    Table(TableConfig),
    /// Delete a trigger
    Trigger(TriggerConfig),
}
// Unlike the other subcommands, `DatabaseConfig` carries its connection
// options inline: the shared `InfluxDb3Config` includes the `--database`
// flag, which would clash with the positional database name used here.
#[derive(Debug, clap::Args)]
pub struct DatabaseConfig {
    /// The host URL of the running InfluxDB 3 Core server
    #[clap(
        short = 'H',
        long = "host",
        env = "INFLUXDB3_HOST_URL",
        default_value = "http://127.0.0.1:8181"
    )]
    pub host_url: Url,

    /// The token for authentication with the InfluxDB 3 Core server
    #[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
    pub auth_token: Option<Secret<String>>,

    /// The name of the database to be deleted
    #[clap(env = "INFLUXDB3_DATABASE_NAME", required = true)]
    pub database_name: String,
}
// Arguments for `delete last_cache`; the cache name is positional.
#[derive(Debug, clap::Args)]
pub struct LastCacheConfig {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,

    /// The table under which the cache is being deleted
    #[clap(short = 't', long = "table")]
    table: String,

    /// The name of the cache being deleted
    #[clap(required = true)]
    cache_name: String,
}
// Arguments for `delete meta_cache`; mirrors `LastCacheConfig`.
#[derive(Debug, clap::Args)]
pub struct MetaCacheConfig {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,

    /// The table under which the cache is being deleted
    #[clap(short = 't', long = "table")]
    table: String,

    /// The name of the cache being deleted
    #[clap(required = true)]
    cache_name: String,
}
// Arguments for `delete plugin`.
//
// Derives `clap::Args` (not `Parser`) for consistency with the other
// subcommand argument structs in this module; `Subcommand` variants only
// require the `Args` impl.
#[derive(Debug, clap::Args)]
pub struct PluginConfig {
    // Shared connection options plus the database the plugin belongs to.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Name of the plugin to delete
    #[clap(required = true)]
    plugin_name: String,
}
// Arguments for `delete table`; the table name is positional, mirroring
// `create table`.
#[derive(Debug, clap::Args)]
pub struct TableConfig {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,

    #[clap(required = true)]
    /// The name of the table to be deleted
    table_name: String,
}
// Arguments for `delete trigger`.
//
// Derives `clap::Args` (not `Parser`) for consistency with the other
// subcommand argument structs in this module; `Subcommand` variants only
// require the `Args` impl.
#[derive(Debug, clap::Args)]
pub struct TriggerConfig {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Force deletion even if trigger is active
    #[clap(long)]
    force: bool,
    /// Name of trigger to delete
    #[clap(required = true)]
    trigger_name: String,
}
// Entry point for `influxdb3 delete`: build a client from the selected
// subcommand's connection options, then perform the requested deletion.
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let client = config.get_client()?;
    match config.cmd {
        SubCommand::Database(DatabaseConfig { database_name, .. }) => {
            // Destructive: require an interactive "yes" before dropping.
            println!(
                "Are you sure you want to delete {:?}? Enter 'yes' to confirm",
                database_name
            );
            let mut confirmation = String::new();
            // A failed read leaves `confirmation` empty, which fails the
            // check below — so the error can be safely ignored.
            let _ = io::stdin().read_line(&mut confirmation);
            if confirmation.trim() != "yes" {
                println!("Cannot delete database without confirmation");
            } else {
                client.api_v3_configure_db_delete(&database_name).await?;
                println!("Database {:?} deleted successfully", &database_name);
            }
        }
        SubCommand::LastCache(LastCacheConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            table,
            cache_name,
        }) => {
            client
                .api_v3_configure_last_cache_delete(database_name, table, cache_name)
                .await?;
            println!("last cache deleted successfully");
        }
        SubCommand::MetaCache(MetaCacheConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            table,
            cache_name,
        }) => {
            client
                .api_v3_configure_meta_cache_delete(database_name, table, cache_name)
                .await?;
            println!("meta cache deleted successfully");
        }
        SubCommand::Plugin(PluginConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            plugin_name,
        }) => {
            client
                .api_v3_configure_processing_engine_plugin_delete(database_name, &plugin_name)
                .await?;
            println!("Plugin {} deleted successfully", plugin_name);
        }
        SubCommand::Table(TableConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            table_name,
        }) => {
            // Table deletion is also confirmed interactively.
            println!(
                "Are you sure you want to delete {:?}.{:?}? Enter 'yes' to confirm",
                database_name, &table_name,
            );
            let mut confirmation = String::new();
            let _ = io::stdin().read_line(&mut confirmation);
            if confirmation.trim() != "yes" {
                println!("Cannot delete table without confirmation");
            } else {
                client
                    .api_v3_configure_table_delete(&database_name, &table_name)
                    .await?;
                println!(
                    "Table {:?}.{:?} deleted successfully",
                    &database_name, &table_name
                );
            }
        }
        SubCommand::Trigger(TriggerConfig {
            influxdb3_config: InfluxDb3Config { database_name, .. },
            trigger_name,
            force,
        }) => {
            client
                .api_v3_configure_processing_engine_trigger_delete(
                    database_name,
                    &trigger_name,
                    force,
                )
                .await?;
            println!("Trigger {} deleted successfully", trigger_name);
        }
    }
    Ok(())
}

View File

@ -1,127 +0,0 @@
use std::error::Error;
use secrecy::ExposeSecret;
use crate::commands::common::{InfluxDb3Config, SeparatedList};
// Legacy `last-cache create` argument container (pre `<verb> <noun>` CLI).
#[derive(Debug, clap::Parser)]
pub struct Config {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,

    #[clap(flatten)]
    last_cache_config: LastCacheConfig,
}

// Cache-specific options; connection options come from `InfluxDb3Config`.
#[derive(Debug, clap::Parser)]
pub struct LastCacheConfig {
    /// The table name for which the cache is being created
    #[clap(short = 't', long = "table")]
    table: String,

    /// Give a name for the cache.
    #[clap(long = "cache-name")]
    cache_name: Option<String>,

    /// Which columns in the table to use as keys in the cache
    #[clap(long = "key-columns")]
    key_columns: Option<SeparatedList<String>>,

    /// Which columns in the table to store as values in the cache
    #[clap(long = "value-columns")]
    value_columns: Option<SeparatedList<String>>,

    /// The number of entries per unique key column combination the cache will store
    #[clap(long = "count")]
    count: Option<usize>,

    /// The time-to-live (TTL) for entries in a cache in seconds
    #[clap(long = "ttl")]
    ttl: Option<u64>,
}
// Legacy handler for `last-cache create`: builds a client, then a request
// builder, attaching only the options the user supplied.
pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let InfluxDb3Config {
        host_url,
        database_name,
        auth_token,
    } = config.influxdb3_config;
    let LastCacheConfig {
        table,
        cache_name,
        key_columns,
        value_columns,
        count,
        ttl,
        ..
    } = config.last_cache_config;
    let mut client = influxdb3_client::Client::new(host_url)?;
    if let Some(t) = auth_token {
        client = client.with_auth_token(t.expose_secret());
    }
    let mut b = client.api_v3_configure_last_cache_create(database_name, table);

    // Add optional parameters:
    if let Some(name) = cache_name {
        b = b.name(name);
    }
    if let Some(keys) = key_columns {
        b = b.key_columns(keys);
    }
    if let Some(vals) = value_columns {
        b = b.value_columns(vals);
    }
    if let Some(count) = count {
        b = b.count(count);
    }
    if let Some(ttl) = ttl {
        b = b.ttl(ttl);
    }
    // Make the request:
    // `None` from the server means a cache with identical parameters
    // already exists; that is reported, not treated as an error.
    match b.send().await? {
        Some(def) => println!(
            "new cache created: {}",
            serde_json::to_string_pretty(&def).expect("serialize last cache definition as JSON")
        ),
        None => println!("a cache already exists for the provided parameters"),
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use clap::Parser;

    use crate::commands::last_cache::create::LastCacheConfig;

    // Parses the legacy flag-based interface (note: `--ttl` here took raw
    // seconds, not a humantime string).
    #[test]
    fn parse_args() {
        let args = LastCacheConfig::parse_from([
            "last_cache_create",
            "--table",
            "foo",
            "--cache-name",
            "bar",
            "--key-columns",
            "tag1,tag2,tag3",
            "--value-columns",
            "field1,field2,field3",
            "--ttl",
            "3600",
            "--count",
            "5",
        ]);
        assert_eq!("foo", args.table);
        assert!(args.cache_name.is_some_and(|n| n == "bar"));
        assert!(args
            .key_columns
            .is_some_and(|keys| keys.0 == ["tag1", "tag2", "tag3"]));
        assert!(args
            .value_columns
            .is_some_and(|vals| vals.0 == ["field1", "field2", "field3"]));
        assert!(args.count.is_some_and(|c| c == 5));
        assert!(args.ttl.is_some_and(|t| t == 3600));
    }
}

View File

@ -1,38 +0,0 @@
use std::error::Error;
use secrecy::ExposeSecret;
use crate::commands::common::InfluxDb3Config;
// Legacy `last-cache delete` arguments (pre `<verb> <noun>` CLI).
#[derive(Debug, clap::Parser)]
pub struct Config {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,

    /// The table under which the cache is being deleted
    #[clap(short = 't', long = "table")]
    table: String,

    /// The name of the cache being deleted
    #[clap(short = 'n', long = "cache-name")]
    cache_name: String,
}

// Legacy handler: delete the named last cache on the server.
pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let InfluxDb3Config {
        host_url,
        database_name,
        auth_token,
    } = config.influxdb3_config;
    let mut client = influxdb3_client::Client::new(host_url)?;
    if let Some(t) = auth_token {
        client = client.with_auth_token(t.expose_secret());
    }
    client
        .api_v3_configure_last_cache_delete(database_name, config.table, config.cache_name)
        .await?;
    println!("last cache deleted successfully");
    Ok(())
}

View File

@ -1,25 +0,0 @@
use std::error::Error;
pub mod create;
pub mod delete;
// Legacy `last-cache` command group (replaced by `create`/`delete`).
#[derive(Debug, clap::Parser)]
pub(crate) struct Config {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Debug, clap::Parser)]
enum Command {
    /// Create a new last-n-value cache
    Create(create::Config),
    /// Delete an existing last-n-value cache
    Delete(delete::Config),
}

// Dispatch to the chosen subcommand's handler.
pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    match config.command {
        Command::Create(config) => create::command(config).await,
        Command::Delete(config) => delete::command(config).await,
    }
}

View File

@ -1,70 +0,0 @@
use std::{error::Error, io};
use secrecy::ExposeSecret;
use crate::commands::common::InfluxDb3Config;
// Legacy `database` command group (replaced by `create database` /
// `delete database`).
#[derive(Debug, clap::Parser)]
pub(crate) struct Config {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Debug, clap::Parser)]
enum Command {
    Create(DatabaseConfig),
    Delete(DatabaseConfig),
}

#[derive(Debug, clap::Parser)]
pub struct DatabaseConfig {
    // Shared connection options; the database name comes from `--dbname`.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
}

// Legacy handler: create, or delete after interactive confirmation.
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    match config.command {
        Command::Create(config) => {
            let InfluxDb3Config {
                host_url,
                database_name,
                auth_token,
            } = config.influxdb3_config;
            let mut client = influxdb3_client::Client::new(host_url)?;
            if let Some(t) = auth_token {
                client = client.with_auth_token(t.expose_secret());
            }
            client.api_v3_configure_db_create(&database_name).await?;
            println!("Database {:?} created successfully", &database_name);
        }
        Command::Delete(config) => {
            let InfluxDb3Config {
                host_url,
                database_name,
                auth_token,
            } = config.influxdb3_config;
            println!(
                "Are you sure you want to delete {:?}? Enter 'yes' to confirm",
                database_name
            );
            let mut confirmation = String::new();
            // A failed read leaves the buffer empty, failing confirmation.
            let _ = io::stdin().read_line(&mut confirmation);
            if confirmation.trim() != "yes" {
                println!("Cannot delete database without confirmation");
            } else {
                let mut client = influxdb3_client::Client::new(host_url)?;
                if let Some(t) = auth_token {
                    client = client.with_auth_token(t.expose_secret());
                }
                client.api_v3_configure_db_delete(&database_name).await?;
                println!("Database {:?} deleted successfully", &database_name);
            }
        }
    }
    Ok(())
}

View File

@ -1,2 +0,0 @@
pub mod database;
pub mod table;

View File

@ -1,159 +0,0 @@
use std::{error::Error, fmt::Display, io, str::FromStr};
use secrecy::ExposeSecret;
use crate::commands::common::InfluxDb3Config;
// Legacy `table` command group (replaced by `create table` /
// `delete table`).
#[derive(Debug, clap::Parser)]
pub(crate) struct Config {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Debug, clap::Parser)]
enum Command {
    Create(CreateTableConfig),
    Delete(DeleteTableConfig),
}

#[derive(Debug, clap::Parser)]
pub struct DeleteTableConfig {
    #[clap(short = 't', long = "table", required = true)]
    table_name: String,

    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
}

#[derive(Debug, clap::Parser)]
pub struct CreateTableConfig {
    #[clap(short = 't', long = "table", required = true)]
    table_name: String,

    // `num_args=0..` lets the flag appear with zero values.
    #[clap(long = "tags", required = true, num_args=0..)]
    tags: Vec<String>,

    // Each field is a `NAME:TYPE` pair parsed by `parse_key_val`.
    #[clap(short = 'f', long = "fields", value_parser = parse_key_val::<String, DataType>, num_args=0..)]
    fields: Vec<(String, DataType)>,

    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
}

// Legacy handler: create a table, or delete it after confirmation.
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    match config.command {
        Command::Create(CreateTableConfig {
            table_name,
            tags,
            fields,
            influxdb3_config:
                InfluxDb3Config {
                    host_url,
                    database_name,
                    auth_token,
                },
        }) => {
            let mut client = influxdb3_client::Client::new(host_url)?;
            if let Some(t) = auth_token {
                client = client.with_auth_token(t.expose_secret());
            }
            client
                .api_v3_configure_table_create(&database_name, &table_name, tags, fields)
                .await?;
            println!(
                "Table {:?}.{:?} created successfully",
                &database_name, &table_name
            );
        }
        Command::Delete(config) => {
            let InfluxDb3Config {
                host_url,
                database_name,
                auth_token,
            } = config.influxdb3_config;
            println!(
                "Are you sure you want to delete {:?}.{:?}? Enter 'yes' to confirm",
                database_name, &config.table_name,
            );
            let mut confirmation = String::new();
            // A failed read leaves the buffer empty, failing confirmation.
            let _ = io::stdin().read_line(&mut confirmation);
            if confirmation.trim() != "yes" {
                println!("Cannot delete table without confirmation");
            } else {
                let mut client = influxdb3_client::Client::new(host_url)?;
                if let Some(t) = auth_token {
                    client = client.with_auth_token(t.expose_secret());
                }
                client
                    .api_v3_configure_table_delete(&database_name, &config.table_name)
                    .await?;
                println!(
                    "Table {:?}.{:?} deleted successfully",
                    &database_name, &config.table_name
                );
            }
        }
    }
    Ok(())
}
/// Column data types accepted by the legacy `table create --fields` flag.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DataType {
    Int64,
    Uint64,
    Float64,
    Utf8,
    Bool,
}

/// Error produced when a field type string is not one of the supported names.
#[derive(Debug, PartialEq, Eq)]
pub struct ParseDataTypeError(String);

// Hand-rolled Display/Error impls; the rendered message is identical to
// the previous `thiserror`-derived one.
impl Display for ParseDataTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{} is not a valid data type, values are int64, uint64, float64, utf8, and bool",
            self.0
        )
    }
}

impl Error for ParseDataTypeError {}

impl FromStr for DataType {
    type Err = ParseDataTypeError;

    // Accept exactly the lowercase canonical names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "int64" => Ok(Self::Int64),
            "uint64" => Ok(Self::Uint64),
            "float64" => Ok(Self::Float64),
            "utf8" => Ok(Self::Utf8),
            "bool" => Ok(Self::Bool),
            other => Err(ParseDataTypeError(other.into())),
        }
    }
}

impl Display for DataType {
    // Render the canonical lowercase name (inverse of `FromStr`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Int64 => "int64",
            Self::Uint64 => "uint64",
            Self::Float64 => "float64",
            Self::Utf8 => "utf8",
            Self::Bool => "bool",
        })
    }
}

impl From<DataType> for String {
    fn from(data: DataType) -> Self {
        data.to_string()
    }
}
/// Parse a single `KEY:VALUE` pair, splitting at the first `:` and parsing
/// each half into its target type.
fn parse_key_val<T, U>(s: &str) -> Result<(T, U), Box<dyn Error + Send + Sync + 'static>>
where
    T: std::str::FromStr,
    T::Err: Error + Send + Sync + 'static,
    U: std::str::FromStr,
    U::Err: Error + Send + Sync + 'static,
{
    match s.split_once(':') {
        Some((key, value)) => Ok((key.parse()?, value.parse()?)),
        None => Err(format!("invalid FIELD:VALUE. No `:` found in `{s}`").into()),
    }
}

View File

@ -1,86 +0,0 @@
use std::{error::Error, num::NonZeroUsize};
use secrecy::ExposeSecret;
use crate::commands::common::{InfluxDb3Config, SeparatedList};
// Legacy `meta-cache create` argument container.
#[derive(Debug, clap::Parser)]
pub struct Config {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,

    #[clap(flatten)]
    meta_cache_config: MetaCacheConfig,
}

#[derive(Debug, clap::Parser)]
pub struct MetaCacheConfig {
    /// The table name for which the cache is being created
    #[clap(short = 't', long = "table")]
    table: String,

    /// Give the name of the cache.
    ///
    /// This will be automatically generated if not provided
    #[clap(long = "cache-name")]
    cache_name: Option<String>,

    /// Which columns in the table to cache distinct values for, as a comma-separated list of the
    /// column names.
    ///
    /// The cache is a hieararchical structure, with a level for each column specified; the order
    /// specified here will determine the order of the levels from top-to-bottom of the cache
    /// hierarchy.
    #[clap(long = "columns")]
    columns: SeparatedList<String>,

    /// The maximum number of distinct value combinations to hold in the cache
    #[clap(long = "max-cardinality")]
    max_cardinality: Option<NonZeroUsize>,

    /// The maximum age of an entry in the cache entered as a human-readable duration, e.g., "30d", "24h"
    #[clap(long = "max-age")]
    max_age: Option<humantime::Duration>,
}

// Legacy handler: build the create request, attaching optional settings.
pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let InfluxDb3Config {
        host_url,
        database_name,
        auth_token,
    } = config.influxdb3_config;
    let MetaCacheConfig {
        table,
        cache_name,
        columns,
        max_cardinality,
        max_age,
    } = config.meta_cache_config;
    let mut client = influxdb3_client::Client::new(host_url)?;
    if let Some(t) = auth_token {
        client = client.with_auth_token(t.expose_secret());
    }
    let mut b = client.api_v3_configure_meta_cache_create(database_name, table, columns);

    // Add the optional stuff:
    if let Some(name) = cache_name {
        b = b.name(name);
    }
    if let Some(max_cardinality) = max_cardinality {
        b = b.max_cardinality(max_cardinality);
    }
    if let Some(max_age) = max_age {
        b = b.max_age(max_age.into());
    }
    // `None` means a cache with identical parameters already exists.
    match b.send().await? {
        Some(def) => println!(
            "new cache created: {}",
            serde_json::to_string_pretty(&def).expect("serialize meta cache definition as JSON")
        ),
        None => println!("a cache already exists for the provided parameters"),
    }
    Ok(())
}

View File

@ -1,38 +0,0 @@
use std::error::Error;
use secrecy::ExposeSecret;
use crate::commands::common::InfluxDb3Config;
// Legacy `meta-cache delete` arguments.
#[derive(Debug, clap::Parser)]
pub struct Config {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,

    /// The table under which the cache is being deleted
    #[clap(short = 't', long = "table")]
    table: String,

    /// The name of the cache being deleted
    #[clap(short = 'n', long = "cache-name")]
    cache_name: String,
}

// Legacy handler: delete the named meta cache on the server.
pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let InfluxDb3Config {
        host_url,
        database_name,
        auth_token,
    } = config.influxdb3_config;
    let mut client = influxdb3_client::Client::new(host_url)?;
    if let Some(t) = auth_token {
        client = client.with_auth_token(t.expose_secret());
    }
    client
        .api_v3_configure_meta_cache_delete(database_name, config.table, config.cache_name)
        .await?;
    println!("meta cache deleted successfully");
    Ok(())
}

View File

@ -1,26 +0,0 @@
use std::error::Error;
pub mod create;
pub mod delete;
// Legacy `meta-cache` command group.
#[derive(Debug, clap::Parser)]
pub(crate) struct Config {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Debug, clap::Parser)]
enum Command {
    /// Create a new metadata cache
    Create(create::Config),
    /// Delete a metadata cache
    Delete(delete::Config),
}

// Dispatch to the chosen subcommand's handler.
pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    match config.command {
        Command::Create(config) => create::command(config).await,
        Command::Delete(config) => delete::command(config).await,
    }
}

View File

@ -1,30 +0,0 @@
mod plugin;
mod trigger;
use std::error::Error;
// Legacy `processing-engine` command group.
#[derive(Debug, clap::Parser)]
pub(crate) struct Config {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Debug, clap::Parser)]
enum Command {
    /// Manage plugins (create, delete, update, etc.)
    Plugin(plugin::Config),
    /// Manage triggers (create, delete, activate, deactivate, etc.)
    Trigger(trigger::Config),
}

// Dispatch to the plugin or trigger handler.
pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    match config.command {
        Command::Plugin(plugin_config) => {
            plugin::command(plugin_config).await?;
        }
        Command::Trigger(trigger_config) => {
            trigger::command(trigger_config).await?;
        }
    }
    Ok(())
}

View File

@ -1,97 +0,0 @@
use crate::commands::common::InfluxDb3Config;
use secrecy::ExposeSecret;
use std::error::Error;
use std::fs::File;
use std::io::Read;
// Legacy `processing-engine plugin` argument container.
#[derive(Debug, clap::Parser)]
pub struct Config {
    #[clap(subcommand)]
    action: PluginAction,
}

#[derive(Debug, clap::Subcommand)]
enum PluginAction {
    /// Create a new processing engine plugin
    Create(CreateConfig),
    /// Delete an existing processing engine plugin
    Delete(DeleteConfig),
}

impl PluginAction {
    // Both variants carry the shared connection options; expose them
    // uniformly so the handler can build one client up front.
    pub fn get_influxdb3_config(&self) -> &InfluxDb3Config {
        match self {
            Self::Create(create) => &create.influxdb3_config,
            Self::Delete(delete) => &delete.influxdb3_config,
        }
    }
}

#[derive(Debug, clap::Parser)]
struct CreateConfig {
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Name of the plugin to create
    #[clap(long = "plugin-name")]
    plugin_name: String,
    /// Python file containing the plugin code
    #[clap(long = "code-filename")]
    code_file: String,
    /// Entry point function for the plugin
    #[clap(long = "entry-point")]
    function_name: String,
    /// Type of trigger the plugin processes
    #[clap(long = "plugin-type", default_value = "wal_rows")]
    plugin_type: String,
}

#[derive(Debug, clap::Parser)]
struct DeleteConfig {
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Name of the plugin to delete
    #[clap(long = "plugin-name")]
    plugin_name: String,
}
// Legacy handler: create (uploading local Python code) or delete a plugin.
pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let influxdb3_config = config.action.get_influxdb3_config();
    let mut client = influxdb3_client::Client::new(influxdb3_config.host_url.clone())?;
    if let Some(token) = &influxdb3_config.auth_token {
        client = client.with_auth_token(token.expose_secret());
    }
    match config.action {
        PluginAction::Create(create_config) => {
            // The plugin source is read locally and shipped to the server.
            let code = open_file(&create_config.code_file)?;
            client
                .api_v3_configure_processing_engine_plugin_create(
                    create_config.influxdb3_config.database_name,
                    &create_config.plugin_name,
                    code,
                    create_config.function_name,
                    create_config.plugin_type,
                )
                .await?;
            println!("Plugin {} created successfully", create_config.plugin_name);
        }
        PluginAction::Delete(delete_config) => {
            client
                .api_v3_configure_processing_engine_plugin_delete(
                    delete_config.influxdb3_config.database_name,
                    &delete_config.plugin_name,
                )
                .await?;
            println!("Plugin {} deleted successfully", delete_config.plugin_name);
        }
    }
    Ok(())
}
/// Read the entire contents of `file` into a `String`.
///
/// Returns any I/O error (missing file, permissions, non-UTF-8 content)
/// boxed as the module's common error type.
fn open_file(file: &str) -> Result<String, Box<dyn Error>> {
    // `fs::read_to_string` is the idiomatic one-call replacement for
    // `File::open` + `Read::read_to_string` into a fresh buffer.
    Ok(std::fs::read_to_string(file)?)
}

View File

@ -1,146 +0,0 @@
use crate::commands::common::InfluxDb3Config;
use influxdb3_client::Client;
use influxdb3_wal::TriggerSpecificationDefinition;
use secrecy::ExposeSecret;
use std::error::Error;
// Legacy `processing-engine trigger` argument container.
#[derive(Debug, clap::Parser)]
pub struct Config {
    #[clap(subcommand)]
    action: TriggerAction,
}

impl Config {
    // Build a client from whichever action's connection options were
    // parsed; all four actions flatten the same `InfluxDb3Config`.
    fn get_client(&self) -> Result<Client, Box<dyn Error>> {
        let influxdb3_config = match &self.action {
            TriggerAction::Create(create) => &create.influxdb3_config,
            TriggerAction::Activate(manage) | TriggerAction::Deactivate(manage) => {
                &manage.influxdb3_config
            }
            TriggerAction::Delete(delete) => &delete.influxdb3_config,
        };
        let mut client = Client::new(influxdb3_config.host_url.clone())?;
        if let Some(token) = &influxdb3_config.auth_token {
            client = client.with_auth_token(token.expose_secret());
        }
        Ok(client)
    }
}

#[derive(Debug, clap::Subcommand)]
enum TriggerAction {
    /// Create a new trigger using an existing plugin
    Create(CreateConfig),
    /// Activate a trigger to enable plugin execution
    Activate(ManageConfig),
    /// Deactivate a trigger to disable plugin execution
    Deactivate(ManageConfig),
    /// Delete a trigger
    Delete(DeleteConfig),
}

#[derive(Debug, clap::Parser)]
struct CreateConfig {
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Name for the new trigger
    #[clap(long = "trigger-name")]
    trigger_name: String,
    /// Plugin to execute when trigger fires
    #[clap(long = "plugin-name")]
    plugin_name: String,
    /// When the trigger should fire
    #[clap(long = "trigger-spec",
        value_parser = TriggerSpecificationDefinition::from_string_rep,
        help = "Trigger specification format: 'table:<TABLE_NAME>' or 'all_tables'")]
    trigger_specification: TriggerSpecificationDefinition,
    /// Create trigger in disabled state
    #[clap(long)]
    disabled: bool,
}

// Shared arguments for the activate/deactivate actions.
#[derive(Debug, clap::Parser)]
struct ManageConfig {
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Name of trigger to manage
    #[clap(long = "trigger-name")]
    trigger_name: String,
}

#[derive(Debug, clap::Parser)]
struct DeleteConfig {
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// Name of trigger to delete
    #[clap(long = "trigger-name")]
    trigger_name: String,
    /// Force deletion even if trigger is active
    #[clap(long)]
    force: bool,
}
// Legacy handler for trigger create/activate/deactivate/delete: one
// client is built up front, then the selected action is performed.
pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let client = config.get_client()?;
    match config.action {
        TriggerAction::Create(create_config) => {
            client
                .api_v3_configure_processing_engine_trigger_create(
                    create_config.influxdb3_config.database_name,
                    &create_config.trigger_name,
                    &create_config.plugin_name,
                    create_config.trigger_specification.string_rep(),
                    create_config.disabled,
                )
                .await?;
            println!(
                "Trigger {} created successfully",
                create_config.trigger_name
            );
        }
        TriggerAction::Activate(manage_config) => {
            client
                .api_v3_configure_processing_engine_trigger_activate(
                    manage_config.influxdb3_config.database_name,
                    &manage_config.trigger_name,
                )
                .await?;
            println!(
                "Trigger {} activated successfully",
                manage_config.trigger_name
            );
        }
        TriggerAction::Deactivate(manage_config) => {
            client
                .api_v3_configure_processing_engine_trigger_deactivate(
                    manage_config.influxdb3_config.database_name,
                    &manage_config.trigger_name,
                )
                .await?;
            println!(
                "Trigger {} deactivated successfully",
                manage_config.trigger_name
            );
        }
        TriggerAction::Delete(delete_config) => {
            client
                .api_v3_configure_processing_engine_trigger_delete(
                    delete_config.influxdb3_config.database_name,
                    &delete_config.trigger_name,
                    delete_config.force,
                )
                .await?;
            println!(
                "Trigger {} deleted successfully",
                delete_config.trigger_name
            );
        }
    }
    Ok(())
}

View File

@ -35,14 +35,14 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Parser)]
#[clap(visible_alias = "q", trailing_var_arg = true)]
pub struct Config {
/// Common InfluxDB 3.0 config
/// Common InfluxDB 3 Core config
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
/// The query language used to format the provided query string
#[clap(
value_enum,
long = "lang",
long = "language",
short = 'l',
default_value_t = QueryLanguage::Sql,
)]
@ -50,9 +50,9 @@ pub struct Config {
/// The format in which to output the query
///
/// If `--fmt` is set to `parquet`, then you must also specify an output
/// If `--format` is set to `parquet`, then you must also specify an output
/// file path with `--output`.
#[clap(value_enum, long = "fmt", default_value = "pretty")]
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,
/// Put all query output into `output`

View File

@ -1,18 +1,19 @@
//! Entrypoint for InfluxDB 3.0 OSS Server
//! Entrypoint for InfluxDB 3 Core Server
use anyhow::{bail, Context};
use clap_blocks::{
memory_size::MemorySize,
object_store::{ObjectStoreConfig, ObjectStoreType},
socket_addr::SocketAddr,
};
use datafusion_util::config::register_iox_object_store;
use influxdb3_cache::{
last_cache::{self, LastCacheProvider},
meta_cache::MetaCacheProvider,
parquet_cache::create_cached_obj_store_and_oracle,
};
use influxdb3_clap_blocks::{datafusion::IoxQueryDatafusionConfig, tokio::TokioDatafusionConfig};
use influxdb3_clap_blocks::{
datafusion::IoxQueryDatafusionConfig,
memory_size::MemorySize,
object_store::{ObjectStoreConfig, ObjectStoreType},
socket_addr::SocketAddr,
tokio::TokioDatafusionConfig,
};
use influxdb3_process::{
build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID,
};
@ -62,7 +63,7 @@ pub const DEFAULT_TELMETRY_ENDPOINT: &str =
#[derive(Debug, Error)]
pub enum Error {
#[error("Cannot parse object store config: {0}")]
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
ObjectStoreParsing(#[from] influxdb3_clap_blocks::object_store::ParseError),
#[error("Tracing config error: {0}")]
TracingConfig(#[from] trace_exporters::Error),

View File

@ -0,0 +1,96 @@
use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
use influxdb3_client::plugin_development::WalPluginTestRequest;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
use std::collections::HashMap;
use std::error::Error;
// Argument container for the `influxdb3 test` command tree.
#[derive(Debug, clap::Parser)]
pub struct Config {
    // The kind of plugin test to run.
    #[clap(subcommand)]
    cmd: SubCommand,
}
impl Config {
    // Build an API client from the connection options of the parsed
    // subcommand (host URL plus optional auth token).
    fn get_client(&self) -> Result<Client, Box<dyn Error>> {
        match &self.cmd {
            SubCommand::WalPlugin(WalPluginConfig {
                influxdb3_config:
                    InfluxDb3Config {
                        host_url,
                        auth_token,
                        ..
                    },
                ..
            }) => {
                let mut client = Client::new(host_url.clone())?;
                if let Some(token) = &auth_token {
                    client = client.with_auth_token(token.expose_secret());
                }
                Ok(client)
            }
        }
    }
}
// Testable plugin kinds; `name` override keeps the CLI spelling
// `wal_plugin` (snake_case, like `last_cache`/`meta_cache`).
#[derive(Debug, clap::Subcommand)]
pub enum SubCommand {
    /// Test a WAL Plugin
    #[clap(name = "wal_plugin")]
    WalPlugin(WalPluginConfig),
}
// Arguments for `test wal_plugin`; the plugin name is positional.
#[derive(Debug, clap::Parser)]
pub struct WalPluginConfig {
    // Shared connection options (host, database, auth token).
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,
    /// If given, pass this line protocol as input
    #[clap(long = "lp")]
    pub input_lp: Option<String>,
    /// If given, pass this file of LP as input from on the server `<plugin-dir>/<name>_test/<input-file>`
    #[clap(long = "file")]
    pub input_file: Option<String>,
    /// If given pass this map of string key/value pairs as input arguments
    #[clap(long = "input-arguments")]
    pub input_arguments: Option<SeparatedList<SeparatedKeyValue<String, String>>>,
    /// The name of the plugin, which should match its file name on the server `<plugin-dir>/<name>.py`
    #[clap(required = true)]
    pub name: String,
}
impl From<WalPluginConfig> for WalPluginTestRequest {
fn from(val: WalPluginConfig) -> Self {
let input_arguments = val.input_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});
Self {
name: val.name,
input_lp: val.input_lp,
input_file: val.input_file,
input_arguments,
}
}
}
// Entry point for `influxdb3 test`: run the requested plugin test on the
// server and pretty-print the JSON response.
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
    let client = config.get_client()?;
    match config.cmd {
        SubCommand::WalPlugin(plugin_config) => {
            let wal_plugin_test_request: WalPluginTestRequest = plugin_config.into();

            let response = client.wal_plugin_test(wal_plugin_test_request).await?;

            println!(
                "{}",
                serde_json::to_string_pretty(&response)
                    .expect("serialize wal plugin test response as JSON")
            );
        }
    }
    Ok(())
}

View File

@ -1,45 +0,0 @@
use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64;
use base64::Engine as _;
use rand::rngs::OsRng;
use rand::RngCore;
use sha2::Digest;
use sha2::Sha512;
use std::error::Error;
use std::str;
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
cmd: SubCommand,
}
/// Subcommands for managing auth tokens.
#[derive(Debug, clap::Parser)]
pub enum SubCommand {
    /// Create a new auth token
    Create,
}
/// Run the `influxdb3 token` command.
///
/// For `Create`: generates a random bearer token prefixed with `apiv3_`,
/// prints both the raw token and its SHA-512 hash, and explains that the
/// server should be started with the hash (`--bearer-token <hash>`) while
/// clients authenticate with the raw token.
pub fn command(config: Config) -> Result<(), Box<dyn Error>> {
    match config.cmd {
        SubCommand::Create => {
            let token = {
                // 64 bytes of OS-sourced randomness, base64url-encoded without padding.
                let mut token = String::from("apiv3_");
                let mut key = [0u8; 64];
                OsRng.fill_bytes(&mut key);
                token.push_str(&B64.encode(key));
                token
            };
            // The raw token is only shown here; the server side is given the
            // hex-encoded SHA-512 hash of it (see the message below).
            println!(
                "\
                Token: {token}\n\
                Hashed Token: {hashed}\n\n\
                Start the server with `influxdb3 serve --bearer-token {hashed}`\n\n\
                HTTP requests require the following header: \"Authorization: Bearer {token}\"\n\
                This will grant you access to every HTTP endpoint or deny it otherwise
                ",
                hashed = hex::encode(&Sha512::digest(&token)[..])
            );
        }
    }
    Ok(())
}

View File

@ -21,7 +21,7 @@ pub(crate) type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Parser)]
#[clap(visible_alias = "w", trailing_var_arg = true)]
pub struct Config {
/// Common InfluxDB 3.0 config
/// Common InfluxDB 3 Core config
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,

View File

@ -21,15 +21,14 @@ use trogging::{
};
mod commands {
pub mod activate;
pub(crate) mod common;
pub mod last_cache;
pub mod manage;
pub mod meta_cache;
pub mod plugin_test;
pub mod processing_engine;
pub mod create;
pub mod deactivate;
pub mod delete;
pub mod query;
pub mod serve;
pub mod token;
pub mod test;
pub mod write;
}
@ -44,25 +43,29 @@ version = &VERSION_STRING[..],
disable_help_flag = true,
arg(
clap::Arg::new("help")
.short('h')
.long("help")
.help("Print help information")
.action(clap::ArgAction::Help)
.global(true)
),
about = "InfluxDB 3.0 OSS server and command line tools",
long_about = r#"InfluxDB 3.0 OSS server and command line tools
about = "InfluxDB 3 Core server and command line tools",
long_about = r#"InfluxDB 3 Core server and command line tools
Examples:
# Run the InfluxDB 3.0 OSS server
# Run the InfluxDB 3 Core server
influxdb3 serve --object-store file --data-dir ~/.influxdb3 --host_id my_host_name
# Display all commands
# Display all commands short form
influxdb3 -h
# Display all commands long form
influxdb3 --help
# Run the InfluxDB 3.0 OSS server with extra verbose logging
# Run the InfluxDB 3 Core server with extra verbose logging
influxdb3 serve -v --object-store file --data-dir ~/.influxdb3 --host_id my_host_name
# Run InfluxDB 3.0 OSS with full debug logging specified with LOG_FILTER
# Run InfluxDB 3 Core with full debug logging specified with LOG_FILTER
LOG_FILTER=debug influxdb3 serve --object-store file --data-dir ~/.influxdb3 --host_id my_host_name
"#
)]
@ -80,35 +83,29 @@ struct Config {
#[derive(Debug, clap::Parser)]
#[allow(clippy::large_enum_variant)]
enum Command {
/// Run the InfluxDB 3.0 server
Serve(commands::serve::Config),
/// Activate a resource such as a trigger
Activate(commands::activate::Config),
/// Perform a query against a running InfluxDB 3.0 server
/// Create a resource such as a database or auth token
Create(commands::create::Config),
/// Deactivate a resource such as a trigger
Deactivate(commands::deactivate::Config),
/// Delete a resource such as a database or table
Delete(commands::delete::Config),
/// Perform a query against a running InfluxDB 3 Core server
Query(commands::query::Config),
/// Perform a set of writes to a running InfluxDB 3.0 server
/// Run the InfluxDB 3 Core server
Serve(commands::serve::Config),
/// Test things, such as plugins, work the way you expect
Test(commands::test::Config),
/// Perform a set of writes to a running InfluxDB 3 Core server
Write(commands::write::Config),
/// Manage tokens for your InfluxDB 3.0 server
Token(commands::token::Config),
/// Manage last-n-value caches
LastCache(commands::last_cache::Config),
/// Manage metadata caches
MetaCache(commands::meta_cache::Config),
/// Manage processing engine plugins and triggers
ProcessingEngine(commands::processing_engine::Config),
/// Manage database (delete only for the moment)
Database(commands::manage::database::Config),
/// Manage table (delete only for the moment)
Table(commands::manage::table::Config),
/// Test Python plugins for processing WAL writes, persistence Snapshots, requests, or scheduled tasks.
PluginTest(commands::plugin_test::Config),
}
fn main() -> Result<(), std::io::Error> {
@ -134,7 +131,31 @@ fn main() -> Result<(), std::io::Error> {
}
match config.command {
None => println!("command required, --help for help"),
None => println!("command required, -h/--help for help"),
Some(Command::Activate(config)) => {
if let Err(e) = commands::activate::command(config).await {
eprintln!("Activate command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Create(config)) => {
if let Err(e) = commands::create::command(config).await {
eprintln!("Create command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Deactivate(config)) => {
if let Err(e) = commands::deactivate::command(config).await {
eprintln!("Deactivate command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Delete(config)) => {
if let Err(e) = commands::delete::command(config).await {
eprintln!("Delete command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Serve(config)) => {
let _tracing_guard =
handle_init_logs(init_logs_and_tracing(&config.logging_config));
@ -143,6 +164,12 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Test(config)) => {
if let Err(e) = commands::test::command(config).await {
eprintln!("Test command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Query(config)) => {
if let Err(e) = commands::query::command(config).await {
eprintln!("Query command failed: {e}");
@ -155,48 +182,6 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Token(config)) => {
if let Err(e) = commands::token::command(config) {
eprintln!("Token command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::LastCache(config)) => {
if let Err(e) = commands::last_cache::command(config).await {
eprintln!("Last Cache command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::MetaCache(config)) => {
if let Err(e) = commands::meta_cache::command(config).await {
eprintln!("Metadata Cache command faild: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::ProcessingEngine(config)) => {
if let Err(e) = commands::processing_engine::command(config).await {
eprintln!("Processing engine command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Database(config)) => {
if let Err(e) = commands::manage::database::command(config).await {
eprintln!("Database command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Table(config)) => {
if let Err(e) = commands::manage::table::command(config).await {
eprintln!("Table command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::PluginTest(config)) => {
if let Err(e) = commands::plugin_test::command(config).await {
eprintln!("Plugin Test command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
}
});

View File

@ -91,14 +91,7 @@ async fn test_create_database() {
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
let db_name = "foo";
let result = run_with_confirmation(&[
"database",
"create",
"--dbname",
db_name,
"--host",
&server_addr,
]);
let result = run_with_confirmation(&["create", "database", db_name, "--host", &server_addr]);
debug!(result = ?result, "create database");
assert_contains!(&result, "Database \"foo\" created successfully");
}
@ -110,26 +103,13 @@ async fn test_create_database_limit() {
let db_name = "foo";
for i in 0..5 {
let name = format!("{db_name}{i}");
let result = run_with_confirmation(&[
"database",
"create",
"--dbname",
&name,
"--host",
&server_addr,
]);
let result = run_with_confirmation(&["create", "database", &name, "--host", &server_addr]);
debug!(result = ?result, "create database");
assert_contains!(&result, format!("Database \"{name}\" created successfully"));
}
let result = run_with_confirmation_and_err(&[
"database",
"create",
"--dbname",
"foo5",
"--host",
&server_addr,
]);
let result =
run_with_confirmation_and_err(&["create", "database", "foo5", "--host", &server_addr]);
debug!(result = ?result, "create database");
assert_contains!(
&result,
@ -150,14 +130,7 @@ async fn test_delete_database() {
)
.await
.expect("write to db");
let result = run_with_confirmation(&[
"database",
"delete",
"--dbname",
db_name,
"--host",
&server_addr,
]);
let result = run_with_confirmation(&["delete", "database", db_name, "--host", &server_addr]);
debug!(result = ?result, "delete database");
assert_contains!(&result, "Database \"foo\" deleted successfully");
}
@ -167,14 +140,8 @@ async fn test_delete_missing_database() {
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
let db_name = "foo";
let result = run_with_confirmation_and_err(&[
"database",
"delete",
"--dbname",
db_name,
"--host",
&server_addr,
]);
let result =
run_with_confirmation_and_err(&["delete", "database", db_name, "--host", &server_addr]);
debug!(result = ?result, "delete missing database");
assert_contains!(&result, "404");
}
@ -185,23 +152,15 @@ async fn test_create_table() {
let server_addr = server.client_addr();
let db_name = "foo";
let table_name = "bar";
let result = run_with_confirmation(&[
"database",
"create",
"--dbname",
db_name,
"--host",
&server_addr,
]);
let result = run_with_confirmation(&["create", "database", db_name, "--host", &server_addr]);
debug!(result = ?result, "create database");
assert_contains!(&result, "Database \"foo\" created successfully");
let result = run_with_confirmation(&[
"table",
"create",
"--dbname",
db_name,
"--table",
"table",
table_name,
"--database",
db_name,
"--host",
&server_addr,
"--tags",
@ -279,12 +238,11 @@ async fn test_delete_table() {
.await
.expect("write to db");
let result = run_with_confirmation(&[
"table",
"delete",
"--dbname",
db_name,
"--table",
"table",
table_name,
"--database",
db_name,
"--host",
&server_addr,
]);
@ -308,12 +266,11 @@ async fn test_delete_missing_table() {
.expect("write to db");
let result = run_with_confirmation_and_err(&[
"table",
"delete",
"--dbname",
db_name,
"--table",
"table",
"cpu",
"--database",
db_name,
"--host",
&server_addr,
]);
@ -338,34 +295,32 @@ async fn test_create_delete_meta_cache() {
let cache_name = "baz";
// first create the cache:
let result = run(&[
"meta-cache",
"create",
"meta_cache",
"--host",
&server_addr,
"--dbname",
"--database",
db_name,
"--table",
table_name,
"--cache-name",
cache_name,
"--columns",
"t1,t2",
cache_name,
]);
assert_contains!(&result, "new cache created");
// doing the same thing over again will be a no-op
let result = run(&[
"meta-cache",
"create",
"meta_cache",
"--host",
&server_addr,
"--dbname",
"--database",
db_name,
"--table",
table_name,
"--cache-name",
cache_name,
"--columns",
"t1,t2",
cache_name,
]);
assert_contains!(
&result,
@ -373,29 +328,27 @@ async fn test_create_delete_meta_cache() {
);
// now delete it:
let result = run(&[
"meta-cache",
"delete",
"meta_cache",
"--host",
&server_addr,
"--dbname",
"--database",
db_name,
"--table",
table_name,
"--cache-name",
cache_name,
]);
assert_contains!(&result, "meta cache deleted successfully");
// trying to delete again should result in an error as the cache no longer exists:
let result = run_and_err(&[
"meta-cache",
"delete",
"meta_cache",
"--host",
&server_addr,
"--dbname",
"--database",
db_name,
"--table",
table_name,
"--cache-name",
cache_name,
]);
assert_contains!(&result, "[404 Not Found]: cache not found");
@ -408,14 +361,7 @@ async fn test_create_plugin() {
let plugin_name = "test_plugin";
// Create database first
let result = run_with_confirmation(&[
"database",
"create",
"--dbname",
db_name,
"--host",
&server_addr,
]);
let result = run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
assert_contains!(&result, "Database \"foo\" created successfully");
// Create plugin file
@ -428,19 +374,17 @@ def process_rows(iterator, output):
// Create plugin
let result = run_with_confirmation(&[
"processing-engine",
"plugin",
"create",
"--dbname",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-name",
plugin_name,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);
debug!(result = ?result, "create plugin");
assert_contains!(&result, "Plugin test_plugin created successfully");
@ -454,14 +398,7 @@ async fn test_delete_plugin() {
let plugin_name = "test_plugin";
// Setup: create database and plugin
run_with_confirmation(&[
"database",
"create",
"--dbname",
db_name,
"--host",
&server_addr,
]);
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_file = create_plugin_file(
r#"
@ -471,31 +408,27 @@ def process_rows(iterator, output):
);
run_with_confirmation(&[
"processing-engine",
"plugin",
"create",
"--dbname",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-name",
plugin_name,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);
// Delete plugin
let result = run_with_confirmation(&[
"processing-engine",
"plugin",
"delete",
"--dbname",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-name",
plugin_name,
]);
debug!(result = ?result, "delete plugin");
@ -511,14 +444,7 @@ async fn test_create_trigger() {
let trigger_name = "test_trigger";
// Setup: create database and plugin
run_with_confirmation(&[
"database",
"create",
"--dbname",
db_name,
"--host",
&server_addr,
]);
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_file = create_plugin_file(
r#"
@ -528,36 +454,32 @@ def process_rows(iterator, output):
);
run_with_confirmation(&[
"processing-engine",
"plugin",
"create",
"--dbname",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-name",
plugin_name,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);
// Create trigger
let result = run_with_confirmation(&[
"processing-engine",
"trigger",
"create",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
"--plugin-name",
"--plugin",
plugin_name,
"--trigger-spec",
"all_tables",
trigger_name,
]);
debug!(result = ?result, "create trigger");
assert_contains!(&result, "Trigger test_trigger created successfully");
@ -572,14 +494,7 @@ async fn test_trigger_activation() {
let trigger_name = "test_trigger";
// Setup: create database, plugin, and trigger
run_with_confirmation(&[
"database",
"create",
"--dbname",
db_name,
"--host",
&server_addr,
]);
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_file = create_plugin_file(
r#"
@ -589,47 +504,41 @@ def process_rows(iterator, output):
);
run_with_confirmation(&[
"processing-engine",
"plugin",
"create",
"--dbname",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-name",
plugin_name,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);
run_with_confirmation(&[
"processing-engine",
"trigger",
"create",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
"--plugin-name",
"--plugin",
plugin_name,
"--trigger-spec",
"all_tables",
trigger_name,
]);
// Test activation
let result = run_with_confirmation(&[
"processing-engine",
"trigger",
"activate",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
]);
debug!(result = ?result, "activate trigger");
@ -637,14 +546,12 @@ def process_rows(iterator, output):
// Test deactivation
let result = run_with_confirmation(&[
"processing-engine",
"trigger",
"deactivate",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
]);
debug!(result = ?result, "deactivate trigger");
@ -660,14 +567,7 @@ async fn test_delete_active_trigger() {
let trigger_name = "test_trigger";
// Setup: create database, plugin, and active trigger
run_with_confirmation(&[
"database",
"create",
"--dbname",
db_name,
"--host",
&server_addr,
]);
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_file = create_plugin_file(
r#"
@ -677,59 +577,51 @@ def process_rows(iterator, output):
);
run_with_confirmation(&[
"processing-engine",
"plugin",
"create",
"--dbname",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-name",
plugin_name,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);
run_with_confirmation(&[
"processing-engine",
"trigger",
"create",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
"--plugin-name",
"--plugin",
plugin_name,
"--trigger-spec",
"all_tables",
trigger_name,
]);
run_with_confirmation(&[
"processing-engine",
"trigger",
"activate",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
]);
// Try to delete active trigger without force flag
let result = run_with_confirmation_and_err(&[
"processing-engine",
"trigger",
"delete",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
]);
debug!(result = ?result, "delete active trigger without force");
@ -737,14 +629,12 @@ def process_rows(iterator, output):
// Delete active trigger with force flag
let result = run_with_confirmation(&[
"processing-engine",
"trigger",
"delete",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
"--force",
]);
@ -762,28 +652,20 @@ async fn test_table_specific_trigger() {
let trigger_name = "test_trigger";
// Setup: create database, table, and plugin
run_with_confirmation(&[
"database",
"create",
"--dbname",
db_name,
"--host",
&server_addr,
]);
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
run_with_confirmation(&[
"table",
"create",
"--dbname",
"table",
"--database",
db_name,
"--table",
table_name,
"--host",
&server_addr,
"--tags",
"tag1",
"--fields",
"field1:float64",
table_name,
]);
let plugin_file = create_plugin_file(
@ -794,36 +676,32 @@ def process_rows(iterator, output):
);
run_with_confirmation(&[
"processing-engine",
"plugin",
"create",
"--dbname",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-name",
plugin_name,
"--code-filename",
plugin_file.path().to_str().unwrap(),
"--entry-point",
"process_rows",
plugin_name,
]);
// Create table-specific trigger
let result = run_with_confirmation(&[
"processing-engine",
"trigger",
"create",
"--dbname",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--trigger-name",
trigger_name,
"--plugin-name",
"--plugin",
plugin_name,
"--trigger-spec",
&format!("table:{}", table_name),
trigger_name,
]);
debug!(result = ?result, "create table-specific trigger");
assert_contains!(&result, "Trigger test_trigger created successfully");

View File

@ -188,11 +188,11 @@ impl Serialize for Catalog {
}
impl Catalog {
/// Limit for the number of Databases that InfluxDB 3.0 OSS can have
/// Limit for the number of Databases that InfluxDB 3 Core OSS can have
pub(crate) const NUM_DBS_LIMIT: usize = 5;
/// Limit for the number of columns per table that InfluxDB 3.0 OSS can have
/// Limit for the number of columns per table that InfluxDB 3 Core OSS can have
pub(crate) const NUM_COLUMNS_PER_TABLE_LIMIT: usize = 500;
/// Limit for the number of tables across all DBs that InfluxDB 3.0 OSS can have
/// Limit for the number of tables across all DBs that InfluxDB 3 Core OSS can have
pub(crate) const NUM_TABLES_LIMIT: usize = 2000;
pub fn new(host_id: Arc<str>, instance_id: Arc<str>) -> Self {

View File

@ -11,16 +11,38 @@ iox_query.workspace = true
observability_deps.workspace = true
# crates.io dependencies
async-trait.workspace = true
clap.workspace = true
datafusion.workspace = true
http.workspace = true
# object store crate uses the new version of the http crate
http_1 = { version = "1.1", package = "http" }
humantime.workspace = true
iox_catalog.workspace = true
iox_time.workspace = true
itertools.workspace = true
libc.workspace = true
metric.workspace = true
non-empty-string.workspace = true
object_store.workspace = true
paste.workspace = true
snafu.workspace = true
sysinfo.workspace = true
tokio.workspace = true
trace_exporters.workspace = true
trogging.workspace = true
url.workspace = true
[dev-dependencies]
tempfile.workspace = true
test_helpers.workspace = true
futures.workspace = true
test-log.workspace = true
[lints]
workspace = true
[features]
azure = ["object_store/azure"]
gcp = ["object_store/gcp"]
aws = ["object_store/aws"]

View File

@ -1,4 +1,7 @@
//! Configuration options for the `influxdb3` CLI which uses the `clap` crate
pub mod datafusion;
pub mod memory_size;
pub mod object_store;
pub mod socket_addr;
pub mod tokio;

View File

@ -0,0 +1,138 @@
//! Helper types to express memory size.
use std::{str::FromStr, sync::OnceLock};
use observability_deps::tracing::info;
use sysinfo::System;
/// Memory size in bytes.
///
/// # Parsing
/// Two string formats are accepted:
///
/// - **absolute:** a non-negative number of bytes, e.g. `1024`
/// - **relative:** a percentage between 0 and 100 (inclusive) of the total
///   available memory, e.g. `50%`
///
/// # Limits
///
/// Memory limits are read from the following, stopping when a valid value is
/// found:
///
/// - `/sys/fs/cgroup/memory/memory.limit_in_bytes` (cgroup)
/// - `/sys/fs/cgroup/memory.max` (cgroup2)
/// - Platform specific syscall (infallible)
///
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MemorySize(usize);

impl MemorySize {
    /// Number of bytes.
    pub fn bytes(&self) -> usize {
        self.0
    }
}

impl std::fmt::Debug for MemorySize {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Debug and Display render identically: the raw byte count.
        write!(f, "{}", self.0)
    }
}

impl std::fmt::Display for MemorySize {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl FromStr for MemorySize {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if let Some(percent_str) = s.strip_suffix('%') {
            // Relative form: a percentage of the total available memory.
            let percentage = u64::from_str(percent_str).map_err(|e| e.to_string())?;
            if percentage > 100 {
                return Err(format!(
                    "relative memory size must be in [0, 100] but is {percentage}"
                ));
            }
            let total = total_mem_bytes();
            let bytes = (percentage as f64 / 100f64 * total as f64).round() as usize;
            Ok(Self(bytes))
        } else {
            // Absolute form: a plain byte count.
            usize::from_str(s).map(Self).map_err(|e| e.to_string())
        }
    }
}
/// Totally available memory size in bytes.
///
/// The value is detected once on first call and cached for the lifetime of
/// the process.
pub fn total_mem_bytes() -> usize {
    // Keep this in a global state so that we only need to inspect the system once during startup.
    static TOTAL_MEM_BYTES: OnceLock<usize> = OnceLock::new();
    *TOTAL_MEM_BYTES.get_or_init(get_memory_limit)
}
/// Resolve the amount of memory available to this process.
///
/// This attempts to find a cgroup limit first, before falling back to the
/// amount of system RAM available.
fn get_memory_limit() -> usize {
    let mut sys = System::new();
    sys.refresh_memory();
    // Prefer the cgroup limit (e.g. containerized deployments); fall back to
    // total system memory when no cgroup limit is reported.
    let limit = match sys.cgroup_limits() {
        Some(limits) => limits.total_memory,
        None => sys.total_memory(),
    } as usize;
    info!(%limit, "detected process memory available");
    limit
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse() {
        // Absolute byte counts parse verbatim.
        assert_ok("0", 0);
        assert_ok("1", 1);
        assert_ok("1024", 1024);
        // Percentages resolve against the detected total memory, so only
        // `0%` has a statically known value.
        assert_ok("0%", 0);
        assert_gt_zero("50%");
        // Invalid input surfaces either the integer parse error or the
        // out-of-range percentage message.
        assert_err("-1", "invalid digit found in string");
        assert_err("foo", "invalid digit found in string");
        assert_err("-1%", "invalid digit found in string");
        assert_err(
            "101%",
            "relative memory size must be in [0, 100] but is 101",
        );
    }

    /// Assert that `s` parses successfully to exactly `expected` bytes.
    #[track_caller]
    fn assert_ok(s: &'static str, expected: usize) {
        let parsed: MemorySize = s.parse().unwrap();
        assert_eq!(parsed.bytes(), expected);
    }

    /// Assert that `s` parses successfully to a non-zero byte count.
    #[track_caller]
    fn assert_gt_zero(s: &'static str) {
        let parsed: MemorySize = s.parse().unwrap();
        assert!(parsed.bytes() > 0);
    }

    /// Assert that parsing `s` fails with exactly the message `expected`.
    #[track_caller]
    fn assert_err(s: &'static str, expected: &'static str) {
        let err = MemorySize::from_str(s).unwrap_err();
        assert_eq!(err, expected);
    }
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,77 @@
//! Config for socket addresses.
use std::{net::ToSocketAddrs, ops::Deref};
/// A socket address parseable from a string, resolving host names via the
/// system resolver when necessary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SocketAddr(std::net::SocketAddr);

impl Deref for SocketAddr {
    type Target = std::net::SocketAddr;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl std::fmt::Display for SocketAddr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate so formatter flags (width, alignment, ...) pass through.
        self.0.fmt(f)
    }
}

impl std::str::FromStr for SocketAddr {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // `to_socket_addrs` may perform DNS resolution and can yield several
        // addresses; the first one wins.
        let mut addrs = s
            .to_socket_addrs()
            .map_err(|e| format!("Cannot parse socket address '{s}': {e}"))?;
        addrs
            .next()
            .map(Self)
            .ok_or_else(|| format!("Found no addresses for '{s}'"))
    }
}

impl From<SocketAddr> for std::net::SocketAddr {
    fn from(addr: SocketAddr) -> Self {
        addr.0
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
str::FromStr,
};
#[test]
fn test_socketaddr() {
let addr: std::net::SocketAddr = SocketAddr::from_str("127.0.0.1:1234").unwrap().into();
assert_eq!(addr, std::net::SocketAddr::from(([127, 0, 0, 1], 1234)),);
let addr: std::net::SocketAddr = SocketAddr::from_str("localhost:1234").unwrap().into();
// depending on where the test runs, localhost will either resolve to a ipv4 or
// an ipv6 addr.
match addr {
std::net::SocketAddr::V4(so) => {
assert_eq!(so, SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234))
}
std::net::SocketAddr::V6(so) => assert_eq!(
so,
SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 1234, 0, 0)
),
};
assert_eq!(
SocketAddr::from_str("!@INv_a1d(ad0/resp_!").unwrap_err(),
"Cannot parse socket address '!@INv_a1d(ad0/resp_!': invalid socket address",
);
}
}

View File

@ -177,7 +177,7 @@ macro_rules! tokio_rt_config {
let thread_counter = Arc::new(AtomicUsize::new(1));
let name = name.to_owned();
builder.thread_name_fn(move || {
format!("InfluxDB 3.0 Tokio {} {}", name, thread_counter.fetch_add(1, Ordering::SeqCst))
format!("InfluxDB 3 Core Tokio {} {}", name, thread_counter.fetch_add(1, Ordering::SeqCst))
});
// worker thread count
@ -286,7 +286,7 @@ mod tests {
.builder()
.unwrap(),
|| {
assert_thread_name("InfluxDB 3.0 Tokio IO");
assert_thread_name("InfluxDB 3 Core Tokio IO");
},
);
assert_runtime_thread_property(
@ -294,7 +294,7 @@ mod tests {
.builder()
.unwrap(),
|| {
assert_thread_name("InfluxDB 3.0 Tokio Datafusion");
assert_thread_name("InfluxDB 3 Core Tokio Datafusion");
},
);
assert_runtime_thread_property(
@ -302,7 +302,7 @@ mod tests {
.builder_with_name("foo")
.unwrap(),
|| {
assert_thread_name("InfluxDB 3.0 Tokio foo");
assert_thread_name("InfluxDB 3 Core Tokio foo");
},
);
}

View File

@ -67,12 +67,12 @@ impl Error {
pub type Result<T> = std::result::Result<T, Error>;
/// The InfluxDB 3.0 Client
/// The InfluxDB 3 Core Client
///
/// For programmatic access to the HTTP API of InfluxDB 3.0
/// For programmatic access to the HTTP API of InfluxDB 3 Core
#[derive(Debug, Clone)]
pub struct Client {
/// The base URL for making requests to a running InfluxDB 3.0 server
/// The base URL for making requests to a running InfluxDB 3 Core server
base_url: Url,
/// The `Bearer` token to use for authenticating on each request to the server
auth_token: Option<Secret<String>>,

View File

@ -14,7 +14,7 @@ use crate::{
#[derive(Debug, Parser)]
pub(crate) struct InfluxDb3Config {
/// The host URL of the running InfluxDB 3.0 server
/// The host URL of the running InfluxDB 3 Core server
#[clap(
short = 'h',
long = "host",
@ -32,7 +32,7 @@ pub(crate) struct InfluxDb3Config {
)]
pub(crate) database_name: String,
/// The token for authentication with the InfluxDB 3.0 server
/// The token for authentication with the InfluxDB 3 Core server
#[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
pub(crate) auth_token: Option<Secret<String>>,

View File

@ -9,7 +9,7 @@ use super::{common::InfluxDb3Config, query::QueryConfig, write::WriteConfig};
#[derive(Debug, Parser)]
pub(crate) struct Config {
/// Common InfluxDB 3.0 config
/// Common InfluxDB 3 Core config
#[clap(flatten)]
common: InfluxDb3Config,

View File

@ -18,7 +18,7 @@ use super::common::InfluxDb3Config;
#[derive(Debug, Parser)]
#[clap(visible_alias = "q", trailing_var_arg = true)]
pub(crate) struct Config {
/// Common InfluxDB 3.0 config
/// Common InfluxDB 3 Core config
#[clap(flatten)]
common: InfluxDb3Config,

View File

@ -19,7 +19,7 @@ use super::common::InfluxDb3Config;
#[derive(Debug, Parser)]
#[clap(visible_alias = "w", trailing_var_arg = true)]
pub(crate) struct Config {
/// Common InfluxDB 3.0 config
/// Common InfluxDB 3 Core config
#[clap(flatten)]
common: InfluxDb3Config,

View File

@ -45,8 +45,8 @@ clap::Arg::new("help")
.action(clap::ArgAction::Help)
.global(true)
),
about = "InfluxDB 3.0 Load Generator for writes and queries",
long_about = r#"InfluxDB 3.0 Load Generator for writes and queries
about = "InfluxDB 3 Core Load Generator for writes and queries",
long_about = r#"InfluxDB 3 Core Load Generator for writes and queries
Examples:
# Run the write load generator
@ -76,13 +76,13 @@ struct Config {
#[derive(Debug, clap::Parser)]
#[allow(clippy::large_enum_variant)]
enum Command {
/// Perform a query against a running InfluxDB 3.0 server
/// Perform a query against a running InfluxDB 3 Core server
Query(commands::query::Config),
/// Perform a set of writes to a running InfluxDB 3.0 server
/// Perform a set of writes to a running InfluxDB 3 Core server
Write(commands::write::Config),
/// Perform both writes and queries against a running InfluxDB 3.0 server
/// Perform both writes and queries against a running InfluxDB 3 Core server
Full(commands::full::Config),
}

View File

@ -1,4 +1,4 @@
//! InfluxDB 3.0 OSS server implementation
//! InfluxDB 3 Core server implementation
//!
//! The server is responsible for handling the HTTP API
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]

View File

@ -1,4 +1,4 @@
//! This crate provides a Write Ahead Log (WAL) for InfluxDB 3.0. The WAL is used to buffer writes
//! This crate provides a Write Ahead Log (WAL) for InfluxDB 3 Core. The WAL is used to buffer writes
//! in memory and persist them as individual files in an object store. The WAL is used to make
//! writes durable until they can be written in larger batches as Parquet files and other snapshot and
//! index files in object storage.