From 6cc2f03ea00d77b701a769d54fbad3e98d27349c Mon Sep 17 00:00:00 2001 From: Phil Bracikowski <13472206+philjb@users.noreply.github.com> Date: Fri, 31 Mar 2023 07:59:24 -0700 Subject: [PATCH] fix: default the write cli comment to the http default port. (#7394) * fix: default the write cli command to the http default port. The all-in-one write api is based on influxdb cloud's v2 http api, which uses the 8080 http default port. This changeset will allow 'influxdb_iox write' to work against default influxdb_iox all-in-one without needing to use the --host option to change the port. It should not change behavior for existing users of `--host`. It adds a new configuartion option call `--http-host` to set the http port separately from the gRPC one. * fix: fmt --- influxdb_iox/src/commands/write.rs | 4 +- influxdb_iox/src/main.rs | 85 +++++++++++++++++-------- influxdb_iox_client/README.md | 40 +++--------- influxdb_iox_client/src/client/write.rs | 6 +- 4 files changed, 71 insertions(+), 64 deletions(-) diff --git a/influxdb_iox/src/commands/write.rs b/influxdb_iox/src/commands/write.rs index 02fd0cddee..8c3fb352e6 100644 --- a/influxdb_iox/src/commands/write.rs +++ b/influxdb_iox/src/commands/write.rs @@ -50,7 +50,7 @@ pub type Result = std::result::Result; /// Write data into the specified namespace #[derive(Debug, clap::Parser)] pub struct Config { - /// If specified, restricts the maxium amount of line protocol + /// If specified, restricts the maximum amount of line protocol /// sent per request to this many bytes. Defaults to 1MB #[clap(action, long, short = 'b', default_value = "1048576")] max_request_payload_size_bytes: usize, @@ -59,7 +59,7 @@ pub struct Config { #[clap(action, long, short = 'c', default_value = "10")] max_concurrent_uploads: usize, - /// The namespace into which to write + /// The namespace into which to write, in the form _ #[clap(action)] namespace: String, diff --git a/influxdb_iox/src/main.rs b/influxdb_iox/src/main.rs index 86e321861a..c29a862997 100644 --- a/influxdb_iox/src/main.rs +++ b/influxdb_iox/src/main.rs @@ -92,10 +92,10 @@ Examples: # Display all "run" mode settings influxdb_iox run --help - # Run the interactive SQL prompt + # Run the interactive SQL prompt against a running server influxdb_iox sql -Command are generally structured in the form: +Commands are generally structured in the form: For example, a command such as the following shows all actions @@ -105,16 +105,26 @@ For example, a command such as the following shows all actions "# )] struct Config { - /// gRPC address of IOx server to connect to + /// gRPC or HTTP address of IOx server to connect to #[clap( short, long, global = true, env = "IOX_ADDR", - default_value = "http://127.0.0.1:8082", - action + action, + help = "gRPC or HTTP address and port of IOx server, takes precedence over --http_host, [default: http://127.0.0.1:8082]" )] - host: String, + host: Option, + + /// http address of IOx server to connect to + #[clap( + long, + global = true, + env = "IOX_HTTP_ADDR", + action, + help = "http address and port of IOx server, [default: http://127.0.0.1:8080]" + )] + http_host: Option, /// Additional headers to add to CLI requests /// @@ -216,16 +226,18 @@ fn main() -> Result<(), std::io::Error> { // load all environment variables from .env before doing anything load_dotenv(); - let config: Config = clap::Parser::parse(); + let global_config: Config = clap::Parser::parse(); - let tokio_runtime = get_runtime(config.num_threads)?; + let tokio_runtime = get_runtime(global_config.num_threads)?; tokio_runtime.block_on(async move { - let host = config.host; - let headers = config.header; - let log_verbose_count = config.all_in_one_config.logging_config.log_verbose_count; - let rpc_timeout = config.rpc_timeout; + let headers = global_config.header; + let log_verbose_count = global_config + .all_in_one_config + .logging_config + .log_verbose_count; + let rpc_timeout = global_config.rpc_timeout; - let connection = || async move { + let connection = |host| async move { let mut builder = headers.into_iter().fold(Builder::default(), |builder, kv| { debug!(name=?kv.key, value=?kv.value, "Setting header"); builder.header(kv.key, kv.value) @@ -233,18 +245,22 @@ fn main() -> Result<(), std::io::Error> { builder = builder.timeout(rpc_timeout); - if config.gen_trace_id { - let key = http::header::HeaderName::from_str(&config.trace_id_header).unwrap(); + if global_config.gen_trace_id { + let key = + http::header::HeaderName::from_str(&global_config.trace_id_header).unwrap(); let trace_id = gen_trace_id(); let value = http::header::HeaderValue::from_str(trace_id.as_str()).unwrap(); debug!(name=?key, value=?value, "Setting trace header"); builder = builder.header(key, value); // Emit trace id information - println!("Trace ID set to {}={}", config.trace_id_header, trace_id); + println!( + "Trace ID set to {}={}", + global_config.trace_id_header, trace_id + ); } - if let Some(token) = config.token.as_ref() { + if let Some(token) = global_config.token.as_ref() { let key = http::header::HeaderName::from_str("Authorization").unwrap(); let value = http::header::HeaderValue::from_str(&format!("Token {token}")).unwrap(); debug!(name=?key, value=?value, "Setting token header"); @@ -270,17 +286,30 @@ fn main() -> Result<(), std::io::Error> { } } - match config.command { + let grpc_host = global_config + .host + .clone() + .unwrap_or("http://127.0.0.1:8082".to_string()); + // The Write subcommand needs to use the http endpoint:port, unless the grpc_host is + // explicitly set, then use the grpc host:port to preserve existing users of --host with + // the write command. + let http_host = match global_config.host { + None => global_config + .http_host + .unwrap_or("http://127.0.0.1:8080".to_string()), + Some(_) => grpc_host.clone(), + }; + match global_config.command { None => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - if let Err(e) = all_in_one::command(config.all_in_one_config).await { + if let Err(e) = all_in_one::command(global_config.all_in_one_config).await { eprintln!("Server command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } Some(Command::Remote(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let connection = connection().await; + let connection = connection(grpc_host).await; if let Err(e) = commands::remote::command(connection, config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) @@ -296,7 +325,7 @@ fn main() -> Result<(), std::io::Error> { } Some(Command::Sql(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let connection = connection().await; + let connection = connection(grpc_host).await; if let Err(e) = commands::sql::command(connection, config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) @@ -304,7 +333,7 @@ fn main() -> Result<(), std::io::Error> { } Some(Command::Storage(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let connection = connection().await; + let connection = connection(grpc_host).await; if let Err(e) = commands::storage::command(connection, config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) @@ -326,14 +355,14 @@ fn main() -> Result<(), std::io::Error> { } Some(Command::Debug(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - if let Err(e) = commands::debug::command(connection, config).await { + if let Err(e) = commands::debug::command(|| connection(grpc_host), config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) } } Some(Command::Write(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let connection = connection().await; + let connection = connection(http_host).await; if let Err(e) = commands::write::command(connection, config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) @@ -341,7 +370,7 @@ fn main() -> Result<(), std::io::Error> { } Some(Command::Query(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let connection = connection().await; + let connection = connection(grpc_host).await; if let Err(e) = commands::query::command(connection, config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) @@ -349,7 +378,7 @@ fn main() -> Result<(), std::io::Error> { } Some(Command::QueryIngester(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let connection = connection().await; + let connection = connection(grpc_host).await; if let Err(e) = commands::query_ingester::command(connection, config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) @@ -357,7 +386,7 @@ fn main() -> Result<(), std::io::Error> { } Some(Command::Import(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let connection = connection().await; + let connection = connection(grpc_host).await; if let Err(e) = commands::import::command(connection, config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) @@ -365,7 +394,7 @@ fn main() -> Result<(), std::io::Error> { } Some(Command::Namespace(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let connection = connection().await; + let connection = connection(grpc_host).await; if let Err(e) = commands::namespace::command(connection, config).await { eprintln!("{e}"); std::process::exit(ReturnCode::Failure as _) diff --git a/influxdb_iox_client/README.md b/influxdb_iox_client/README.md index 4d5fb6a3de..52b7934614 100644 --- a/influxdb_iox_client/README.md +++ b/influxdb_iox_client/README.md @@ -1,37 +1,15 @@ # InfluxDB IOx Client -This is the official Rust client library for connecting to InfluxDB IOx. +This is the Rust client library for connecting to InfluxDB IOx. -Currently only gRPC is supported. +We're attempting to support all apis as they are added and modified but this client +is likely not 100% complete at any time. -## Using the gRPC Write Client +Some apis are http (for instance the `write`) and some are gRPC. See the individual +client modules for details. + +## Example: Using the Write Client To write to IOx, create a connection and a write client, and then send line -protocol. Here is an example of creating an instance that connects to an IOx -server running at `http://127.0.0.1:8081` (the default bind address for the -gRPC endpoint of IOx when running in all-in-one mode) and sending a line of -line protocol: - -```rust -#[tokio::main] -fn main() { - use influxdb_iox_client::{ - write::Client, - connection::Builder, - }; - - let mut connection = Builder::default() - .build("http://127.0.0.1:8081") - .await - .unwrap(); - - let mut client = Client::new(connection); - - // write a line of line protocol data - client - .write_lp("bananas", "cpu,region=west user=23.2 100",0) - .await - .expect("failed to write to IOx"); - } -} -``` +protocol. Please see the example on ['Client' struct](./src/client/write.rs) that will work +when running against all-in-one mode. diff --git a/influxdb_iox_client/src/client/write.rs b/influxdb_iox_client/src/client/write.rs index 0185f9096d..36a67d9c0c 100644 --- a/influxdb_iox_client/src/client/write.rs +++ b/influxdb_iox_client/src/client/write.rs @@ -22,16 +22,16 @@ pub const DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES: Option = Some(1024 * 10 /// connection::Builder, /// }; /// -/// let mut connection = Builder::default() +/// let connection = Builder::default() /// .build("http://127.0.0.1:8080") /// .await /// .unwrap(); /// /// let mut client = Client::new(connection); /// -/// // write a line of line procol data +/// // write a line of line protocol data /// client -/// .write_lp("bananas", "cpu,region=west user=23.2 100") +/// .write_lp("fruit_bananas", "cpu,region=west user=23.2 100") /// .await /// .expect("failed to write to IOx"); /// # }