fix: default the write cli comment to the http default port. (#7394)
* fix: default the write cli command to the http default port. The all-in-one write api is based on influxdb cloud's v2 http api, which uses the 8080 http default port. This changeset will allow 'influxdb_iox write' to work against default influxdb_iox all-in-one without needing to use the --host option to change the port. It should not change behavior for existing users of `--host`. It adds a new configuartion option call `--http-host` to set the http port separately from the gRPC one. * fix: fmtpull/24376/head
parent
275dad704e
commit
6cc2f03ea0
|
@ -50,7 +50,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
/// Write data into the specified namespace
|
/// Write data into the specified namespace
|
||||||
#[derive(Debug, clap::Parser)]
|
#[derive(Debug, clap::Parser)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
/// If specified, restricts the maxium amount of line protocol
|
/// If specified, restricts the maximum amount of line protocol
|
||||||
/// sent per request to this many bytes. Defaults to 1MB
|
/// sent per request to this many bytes. Defaults to 1MB
|
||||||
#[clap(action, long, short = 'b', default_value = "1048576")]
|
#[clap(action, long, short = 'b', default_value = "1048576")]
|
||||||
max_request_payload_size_bytes: usize,
|
max_request_payload_size_bytes: usize,
|
||||||
|
@ -59,7 +59,7 @@ pub struct Config {
|
||||||
#[clap(action, long, short = 'c', default_value = "10")]
|
#[clap(action, long, short = 'c', default_value = "10")]
|
||||||
max_concurrent_uploads: usize,
|
max_concurrent_uploads: usize,
|
||||||
|
|
||||||
/// The namespace into which to write
|
/// The namespace into which to write, in the form <org_id>_<bucket_id>
|
||||||
#[clap(action)]
|
#[clap(action)]
|
||||||
namespace: String,
|
namespace: String,
|
||||||
|
|
||||||
|
|
|
@ -92,10 +92,10 @@ Examples:
|
||||||
# Display all "run" mode settings
|
# Display all "run" mode settings
|
||||||
influxdb_iox run --help
|
influxdb_iox run --help
|
||||||
|
|
||||||
# Run the interactive SQL prompt
|
# Run the interactive SQL prompt against a running server
|
||||||
influxdb_iox sql
|
influxdb_iox sql
|
||||||
|
|
||||||
Command are generally structured in the form:
|
Commands are generally structured in the form:
|
||||||
<type of object> <action> <arguments>
|
<type of object> <action> <arguments>
|
||||||
|
|
||||||
For example, a command such as the following shows all actions
|
For example, a command such as the following shows all actions
|
||||||
|
@ -105,16 +105,26 @@ For example, a command such as the following shows all actions
|
||||||
"#
|
"#
|
||||||
)]
|
)]
|
||||||
struct Config {
|
struct Config {
|
||||||
/// gRPC address of IOx server to connect to
|
/// gRPC or HTTP address of IOx server to connect to
|
||||||
#[clap(
|
#[clap(
|
||||||
short,
|
short,
|
||||||
long,
|
long,
|
||||||
global = true,
|
global = true,
|
||||||
env = "IOX_ADDR",
|
env = "IOX_ADDR",
|
||||||
default_value = "http://127.0.0.1:8082",
|
action,
|
||||||
action
|
help = "gRPC or HTTP address and port of IOx server, takes precedence over --http_host, [default: http://127.0.0.1:8082]"
|
||||||
)]
|
)]
|
||||||
host: String,
|
host: Option<String>,
|
||||||
|
|
||||||
|
/// http address of IOx server to connect to
|
||||||
|
#[clap(
|
||||||
|
long,
|
||||||
|
global = true,
|
||||||
|
env = "IOX_HTTP_ADDR",
|
||||||
|
action,
|
||||||
|
help = "http address and port of IOx server, [default: http://127.0.0.1:8080]"
|
||||||
|
)]
|
||||||
|
http_host: Option<String>,
|
||||||
|
|
||||||
/// Additional headers to add to CLI requests
|
/// Additional headers to add to CLI requests
|
||||||
///
|
///
|
||||||
|
@ -216,16 +226,18 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
// load all environment variables from .env before doing anything
|
// load all environment variables from .env before doing anything
|
||||||
load_dotenv();
|
load_dotenv();
|
||||||
|
|
||||||
let config: Config = clap::Parser::parse();
|
let global_config: Config = clap::Parser::parse();
|
||||||
|
|
||||||
let tokio_runtime = get_runtime(config.num_threads)?;
|
let tokio_runtime = get_runtime(global_config.num_threads)?;
|
||||||
tokio_runtime.block_on(async move {
|
tokio_runtime.block_on(async move {
|
||||||
let host = config.host;
|
let headers = global_config.header;
|
||||||
let headers = config.header;
|
let log_verbose_count = global_config
|
||||||
let log_verbose_count = config.all_in_one_config.logging_config.log_verbose_count;
|
.all_in_one_config
|
||||||
let rpc_timeout = config.rpc_timeout;
|
.logging_config
|
||||||
|
.log_verbose_count;
|
||||||
|
let rpc_timeout = global_config.rpc_timeout;
|
||||||
|
|
||||||
let connection = || async move {
|
let connection = |host| async move {
|
||||||
let mut builder = headers.into_iter().fold(Builder::default(), |builder, kv| {
|
let mut builder = headers.into_iter().fold(Builder::default(), |builder, kv| {
|
||||||
debug!(name=?kv.key, value=?kv.value, "Setting header");
|
debug!(name=?kv.key, value=?kv.value, "Setting header");
|
||||||
builder.header(kv.key, kv.value)
|
builder.header(kv.key, kv.value)
|
||||||
|
@ -233,18 +245,22 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
|
|
||||||
builder = builder.timeout(rpc_timeout);
|
builder = builder.timeout(rpc_timeout);
|
||||||
|
|
||||||
if config.gen_trace_id {
|
if global_config.gen_trace_id {
|
||||||
let key = http::header::HeaderName::from_str(&config.trace_id_header).unwrap();
|
let key =
|
||||||
|
http::header::HeaderName::from_str(&global_config.trace_id_header).unwrap();
|
||||||
let trace_id = gen_trace_id();
|
let trace_id = gen_trace_id();
|
||||||
let value = http::header::HeaderValue::from_str(trace_id.as_str()).unwrap();
|
let value = http::header::HeaderValue::from_str(trace_id.as_str()).unwrap();
|
||||||
debug!(name=?key, value=?value, "Setting trace header");
|
debug!(name=?key, value=?value, "Setting trace header");
|
||||||
builder = builder.header(key, value);
|
builder = builder.header(key, value);
|
||||||
|
|
||||||
// Emit trace id information
|
// Emit trace id information
|
||||||
println!("Trace ID set to {}={}", config.trace_id_header, trace_id);
|
println!(
|
||||||
|
"Trace ID set to {}={}",
|
||||||
|
global_config.trace_id_header, trace_id
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(token) = config.token.as_ref() {
|
if let Some(token) = global_config.token.as_ref() {
|
||||||
let key = http::header::HeaderName::from_str("Authorization").unwrap();
|
let key = http::header::HeaderName::from_str("Authorization").unwrap();
|
||||||
let value = http::header::HeaderValue::from_str(&format!("Token {token}")).unwrap();
|
let value = http::header::HeaderValue::from_str(&format!("Token {token}")).unwrap();
|
||||||
debug!(name=?key, value=?value, "Setting token header");
|
debug!(name=?key, value=?value, "Setting token header");
|
||||||
|
@ -270,17 +286,30 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match config.command {
|
let grpc_host = global_config
|
||||||
|
.host
|
||||||
|
.clone()
|
||||||
|
.unwrap_or("http://127.0.0.1:8082".to_string());
|
||||||
|
// The Write subcommand needs to use the http endpoint:port, unless the grpc_host is
|
||||||
|
// explicitly set, then use the grpc host:port to preserve existing users of --host with
|
||||||
|
// the write command.
|
||||||
|
let http_host = match global_config.host {
|
||||||
|
None => global_config
|
||||||
|
.http_host
|
||||||
|
.unwrap_or("http://127.0.0.1:8080".to_string()),
|
||||||
|
Some(_) => grpc_host.clone(),
|
||||||
|
};
|
||||||
|
match global_config.command {
|
||||||
None => {
|
None => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
if let Err(e) = all_in_one::command(config.all_in_one_config).await {
|
if let Err(e) = all_in_one::command(global_config.all_in_one_config).await {
|
||||||
eprintln!("Server command failed: {e}");
|
eprintln!("Server command failed: {e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(Command::Remote(config)) => {
|
Some(Command::Remote(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
let connection = connection().await;
|
let connection = connection(grpc_host).await;
|
||||||
if let Err(e) = commands::remote::command(connection, config).await {
|
if let Err(e) = commands::remote::command(connection, config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
@ -296,7 +325,7 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
Some(Command::Sql(config)) => {
|
Some(Command::Sql(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
let connection = connection().await;
|
let connection = connection(grpc_host).await;
|
||||||
if let Err(e) = commands::sql::command(connection, config).await {
|
if let Err(e) = commands::sql::command(connection, config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
@ -304,7 +333,7 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
Some(Command::Storage(config)) => {
|
Some(Command::Storage(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
let connection = connection().await;
|
let connection = connection(grpc_host).await;
|
||||||
if let Err(e) = commands::storage::command(connection, config).await {
|
if let Err(e) = commands::storage::command(connection, config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
@ -326,14 +355,14 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
Some(Command::Debug(config)) => {
|
Some(Command::Debug(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
if let Err(e) = commands::debug::command(connection, config).await {
|
if let Err(e) = commands::debug::command(|| connection(grpc_host), config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(Command::Write(config)) => {
|
Some(Command::Write(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
let connection = connection().await;
|
let connection = connection(http_host).await;
|
||||||
if let Err(e) = commands::write::command(connection, config).await {
|
if let Err(e) = commands::write::command(connection, config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
@ -341,7 +370,7 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
Some(Command::Query(config)) => {
|
Some(Command::Query(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
let connection = connection().await;
|
let connection = connection(grpc_host).await;
|
||||||
if let Err(e) = commands::query::command(connection, config).await {
|
if let Err(e) = commands::query::command(connection, config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
@ -349,7 +378,7 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
Some(Command::QueryIngester(config)) => {
|
Some(Command::QueryIngester(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
let connection = connection().await;
|
let connection = connection(grpc_host).await;
|
||||||
if let Err(e) = commands::query_ingester::command(connection, config).await {
|
if let Err(e) = commands::query_ingester::command(connection, config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
@ -357,7 +386,7 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
Some(Command::Import(config)) => {
|
Some(Command::Import(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
let connection = connection().await;
|
let connection = connection(grpc_host).await;
|
||||||
if let Err(e) = commands::import::command(connection, config).await {
|
if let Err(e) = commands::import::command(connection, config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
@ -365,7 +394,7 @@ fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
Some(Command::Namespace(config)) => {
|
Some(Command::Namespace(config)) => {
|
||||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||||
let connection = connection().await;
|
let connection = connection(grpc_host).await;
|
||||||
if let Err(e) = commands::namespace::command(connection, config).await {
|
if let Err(e) = commands::namespace::command(connection, config).await {
|
||||||
eprintln!("{e}");
|
eprintln!("{e}");
|
||||||
std::process::exit(ReturnCode::Failure as _)
|
std::process::exit(ReturnCode::Failure as _)
|
||||||
|
|
|
@ -1,37 +1,15 @@
|
||||||
# InfluxDB IOx Client
|
# InfluxDB IOx Client
|
||||||
|
|
||||||
This is the official Rust client library for connecting to InfluxDB IOx.
|
This is the Rust client library for connecting to InfluxDB IOx.
|
||||||
|
|
||||||
Currently only gRPC is supported.
|
We're attempting to support all apis as they are added and modified but this client
|
||||||
|
is likely not 100% complete at any time.
|
||||||
|
|
||||||
## Using the gRPC Write Client
|
Some apis are http (for instance the `write`) and some are gRPC. See the individual
|
||||||
|
client modules for details.
|
||||||
|
|
||||||
|
## Example: Using the Write Client
|
||||||
|
|
||||||
To write to IOx, create a connection and a write client, and then send line
|
To write to IOx, create a connection and a write client, and then send line
|
||||||
protocol. Here is an example of creating an instance that connects to an IOx
|
protocol. Please see the example on ['Client' struct](./src/client/write.rs) that will work
|
||||||
server running at `http://127.0.0.1:8081` (the default bind address for the
|
when running against all-in-one mode.
|
||||||
gRPC endpoint of IOx when running in all-in-one mode) and sending a line of
|
|
||||||
line protocol:
|
|
||||||
|
|
||||||
```rust
|
|
||||||
#[tokio::main]
|
|
||||||
fn main() {
|
|
||||||
use influxdb_iox_client::{
|
|
||||||
write::Client,
|
|
||||||
connection::Builder,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut connection = Builder::default()
|
|
||||||
.build("http://127.0.0.1:8081")
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut client = Client::new(connection);
|
|
||||||
|
|
||||||
// write a line of line protocol data
|
|
||||||
client
|
|
||||||
.write_lp("bananas", "cpu,region=west user=23.2 100",0)
|
|
||||||
.await
|
|
||||||
.expect("failed to write to IOx");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
|
@ -22,16 +22,16 @@ pub const DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES: Option<usize> = Some(1024 * 10
|
||||||
/// connection::Builder,
|
/// connection::Builder,
|
||||||
/// };
|
/// };
|
||||||
///
|
///
|
||||||
/// let mut connection = Builder::default()
|
/// let connection = Builder::default()
|
||||||
/// .build("http://127.0.0.1:8080")
|
/// .build("http://127.0.0.1:8080")
|
||||||
/// .await
|
/// .await
|
||||||
/// .unwrap();
|
/// .unwrap();
|
||||||
///
|
///
|
||||||
/// let mut client = Client::new(connection);
|
/// let mut client = Client::new(connection);
|
||||||
///
|
///
|
||||||
/// // write a line of line procol data
|
/// // write a line of line protocol data
|
||||||
/// client
|
/// client
|
||||||
/// .write_lp("bananas", "cpu,region=west user=23.2 100")
|
/// .write_lp("fruit_bananas", "cpu,region=west user=23.2 100")
|
||||||
/// .await
|
/// .await
|
||||||
/// .expect("failed to write to IOx");
|
/// .expect("failed to write to IOx");
|
||||||
/// # }
|
/// # }
|
||||||
|
|
Loading…
Reference in New Issue