feat: use /api/v2 upload for write command rather than grpc `write_service` (#5749)
* feat: use /api/v2 upload for write command rather than grpc service * fix: Update influxdb_iox/src/commands/write.rspull/24376/head
parent
48c6ff0e09
commit
13ed1c089a
|
@ -2045,6 +2045,7 @@ dependencies = [
|
|||
"http",
|
||||
"humantime",
|
||||
"import",
|
||||
"influxdb2_client",
|
||||
"influxdb_iox_client",
|
||||
"influxdb_storage_client",
|
||||
"influxrpc_parser",
|
||||
|
|
|
@ -251,10 +251,10 @@ Data can be written to InfluxDB IOx by sending [line protocol] format to the `/a
|
|||
For example, assuming you are running in local mode, this command will send data in the `test_fixtures/lineproto/metrics.lp` file to the `company_sensors` database.
|
||||
|
||||
```shell
./target/debug/influxdb_iox -vv write company_sensors test_fixtures/lineproto/metrics.lp --host http://localhost:8080
```

Note that `--host http://localhost:8080` is required as the `/v2/api` endpoint is hosted on port `8080` while the default is the querier gRPC port `8082`.

To query the data stored in the `company_sensors` database:
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ datafusion = { path = "../datafusion" }
|
|||
generated_types = { path = "../generated_types" }
|
||||
import = { path = "../import" }
|
||||
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
|
||||
influxdb2_client = { path = "../influxdb2_client" }
|
||||
influxdb_storage_client = { path = "../influxdb_storage_client" }
|
||||
influxrpc_parser = { path = "../influxrpc_parser"}
|
||||
iox_catalog = { path = "../iox_catalog" }
|
||||
|
|
|
@ -1,19 +1,22 @@
|
|||
use influxdb_iox_client::{connection::Connection, write};
|
||||
use iox_time::TimeProvider;
|
||||
use influxdb2_client::RequestError;
|
||||
use observability_deps::tracing::debug;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::{fs::File, io::Read, path::PathBuf};
|
||||
use thiserror::Error;
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
#[derive(Debug, Error)]
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[error("Error reading file {:?}: {}", file_name, source)]
|
||||
#[snafu(display("Error reading file {:?}: {}", file_name, source))]
|
||||
ReadingFile {
|
||||
file_name: PathBuf,
|
||||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[error("Client error: {0}")]
|
||||
ClientError(#[from] influxdb_iox_client::error::Error),
|
||||
#[snafu(display("Client error: {source}"))]
|
||||
ClientError { source: RequestError },
|
||||
|
||||
#[snafu(display("Invalid namespace '{namespace}': {reason}"))]
|
||||
InvalidNamespace { namespace: String, reason: String },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -21,34 +24,131 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// Write data into the specified database
|
||||
#[derive(Debug, clap::Parser)]
|
||||
pub struct Config {
|
||||
/// The name of the database
|
||||
/// The namespace into which to write
|
||||
#[clap(action)]
|
||||
name: String,
|
||||
namespace: String,
|
||||
|
||||
/// File with data to load. Currently supported formats are .lp
|
||||
#[clap(action)]
|
||||
file_name: PathBuf,
|
||||
}
|
||||
|
||||
pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||
let mut client = write::Client::new(connection);
|
||||
pub async fn command(url: String, config: Config) -> Result<()> {
|
||||
let Config {
|
||||
namespace,
|
||||
file_name,
|
||||
} = config;
|
||||
let file_name = &file_name;
|
||||
|
||||
let mut file = File::open(&config.file_name).map_err(|e| Error::ReadingFile {
|
||||
file_name: config.file_name.clone(),
|
||||
source: e,
|
||||
})?;
|
||||
let mut file = File::open(file_name).context(ReadingFileSnafu { file_name })?;
|
||||
|
||||
let mut lp_data = String::new();
|
||||
file.read_to_string(&mut lp_data)
|
||||
.map_err(|e| Error::ReadingFile {
|
||||
file_name: config.file_name.clone(),
|
||||
source: e,
|
||||
})?;
|
||||
.context(ReadingFileSnafu { file_name })?;
|
||||
|
||||
let default_time = iox_time::SystemProvider::new().now().timestamp_nanos();
|
||||
let lines_written = client.write_lp(config.name, lp_data, default_time).await?;
|
||||
let total_bytes = lp_data.len();
|
||||
|
||||
println!("{} Lines OK", lines_written);
|
||||
// split a namespace name ("foo_bar") into org_bucket
|
||||
let (org_id, bucket_id) = split_namespace(&namespace)?;
|
||||
|
||||
debug!(url, total_bytes, org_id, bucket_id, "Writing data");
|
||||
|
||||
// IOx's v2 api doesn't validate auth tokens so pass an empty one
|
||||
let auth_token = "";
|
||||
let client = influxdb2_client::Client::new(url, auth_token);
|
||||
|
||||
client
|
||||
.write_line_protocol(org_id, bucket_id, lp_data)
|
||||
.await
|
||||
.context(ClientSnafu)?;
|
||||
|
||||
println!("{} Bytes OK", total_bytes);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Splits up the strings into org_id and bucket_id
|
||||
fn split_namespace(namespace: &str) -> Result<(&str, &str)> {
|
||||
let mut iter = namespace.split('_');
|
||||
let org_id = iter.next().context(InvalidNamespaceSnafu {
|
||||
namespace,
|
||||
reason: "empty",
|
||||
})?;
|
||||
|
||||
if org_id.is_empty() {
|
||||
return InvalidNamespaceSnafu {
|
||||
namespace,
|
||||
reason: "No org_id found",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let bucket_id = iter.next().context(InvalidNamespaceSnafu {
|
||||
namespace,
|
||||
reason: "Could not find '_'",
|
||||
})?;
|
||||
|
||||
if bucket_id.is_empty() {
|
||||
return InvalidNamespaceSnafu {
|
||||
namespace,
|
||||
reason: "No bucket_id found",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
if iter.next().is_some() {
|
||||
return InvalidNamespaceSnafu {
|
||||
namespace,
|
||||
reason: "More than one '_'",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
Ok((org_id, bucket_id))
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod test {
    use super::*;

    /// Happy path: one underscore, non-empty parts.
    #[test]
    fn split_good() {
        assert_eq!(split_namespace("foo_bar").unwrap(), ("foo", "bar"));
    }

    #[test]
    #[should_panic(expected = "No org_id found")]
    fn split_bad_empty() {
        split_namespace("").unwrap();
    }

    #[test]
    #[should_panic(expected = "No org_id found")]
    fn split_bad_only_underscore() {
        split_namespace("_").unwrap();
    }

    #[test]
    #[should_panic(expected = "No org_id found")]
    fn split_bad_empty_org_id() {
        split_namespace("_ff").unwrap();
    }

    #[test]
    #[should_panic(expected = "No bucket_id found")]
    fn split_bad_empty_bucket_id() {
        split_namespace("ff_").unwrap();
    }

    #[test]
    #[should_panic(expected = "More than one '_'")]
    fn split_too_many() {
        split_namespace("ff_bf_").unwrap();
    }

    #[test]
    #[should_panic(expected = "More than one '_'")]
    fn split_way_too_many() {
        split_namespace("ff_bf_dfd_3_f").unwrap();
    }
}
|
||||
|
|
|
@ -209,6 +209,7 @@ fn main() -> Result<(), std::io::Error> {
|
|||
let log_verbose_count = config.all_in_one_config.logging_config.log_verbose_count;
|
||||
let rpc_timeout = config.rpc_timeout;
|
||||
|
||||
let host_captured = host.clone();
|
||||
let connection = || async move {
|
||||
let mut builder = headers.into_iter().fold(Builder::default(), |builder, kv| {
|
||||
builder.header(kv.key, kv.value)
|
||||
|
@ -229,10 +230,10 @@ fn main() -> Result<(), std::io::Error> {
|
|||
println!("Trace ID set to {}", trace_id);
|
||||
}
|
||||
|
||||
match builder.build(&host).await {
|
||||
match builder.build(&host_captured).await {
|
||||
Ok(connection) => connection,
|
||||
Err(e) => {
|
||||
eprintln!("Error connecting to {}: {}", host, e);
|
||||
eprintln!("Error connecting to {}: {}", host_captured, e);
|
||||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
|
@ -311,8 +312,7 @@ fn main() -> Result<(), std::io::Error> {
|
|||
}
|
||||
Some(Command::Write(config)) => {
|
||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||
let connection = connection().await;
|
||||
if let Err(e) = commands::write::command(connection, config).await {
|
||||
if let Err(e) = commands::write::command(host, config).await {
|
||||
eprintln!("{}", e);
|
||||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
|
|
|
@ -4,8 +4,9 @@ use assert_cmd::Command;
|
|||
use futures::FutureExt;
|
||||
use predicates::prelude::*;
|
||||
use serde_json::Value;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use tempfile::tempdir;
|
||||
use test_helpers::make_temp_file;
|
||||
use test_helpers_end_to_end::{
|
||||
maybe_skip_integration, AddAddrEnv, BindAddresses, MiniCluster, ServerType, Step, StepTest,
|
||||
StepTestState,
|
||||
|
@ -512,6 +513,86 @@ async fn schema_cli() {
|
|||
.await
|
||||
}
|
||||
|
||||
/// Test write CLI command and query CLI command
|
||||
#[tokio::test]
|
||||
async fn write_and_query() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let mut cluster = MiniCluster::create_shared(database_url).await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::Custom(Box::new(|state: &mut StepTestState| {
|
||||
async {
|
||||
// write line protocol to a temp file
|
||||
let lp_file = make_temp_file("m,tag=1 v=2 12345");
|
||||
let lp_file_path = lp_file.path().to_string_lossy().to_string();
|
||||
let router_addr = state.cluster().router().router_http_base().to_string();
|
||||
|
||||
let namespace = state.cluster().namespace();
|
||||
println!("Writing into {namespace}");
|
||||
|
||||
// Validate the output of the schema CLI command
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(&router_addr)
|
||||
.arg("write")
|
||||
.arg(&namespace)
|
||||
.arg(&lp_file_path)
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains("17 Bytes OK"));
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
Step::Custom(Box::new(|state: &mut StepTestState| {
|
||||
async {
|
||||
let querier_addr = state.cluster().querier().querier_grpc_base().to_string();
|
||||
let namespace = state.cluster().namespace();
|
||||
|
||||
let max_wait_time = Duration::from_secs(10);
|
||||
let expected = "| 1 | 1970-01-01T00:00:00.000012345Z | 2 |";
|
||||
println!("Waiting for {expected}");
|
||||
|
||||
// Validate the output of running the query CLI command appears after at most max_wait_time
|
||||
let end = Instant::now() + max_wait_time;
|
||||
while Instant::now() < end {
|
||||
let maybe_result = Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(&querier_addr)
|
||||
.arg("query")
|
||||
.arg(&namespace)
|
||||
.arg("SELECT * from m")
|
||||
.assert()
|
||||
.success()
|
||||
.try_stdout(predicate::str::contains(expected));
|
||||
|
||||
match maybe_result {
|
||||
Err(e) => {
|
||||
println!("Got err: {}, retrying", e);
|
||||
}
|
||||
Ok(r) => {
|
||||
println!("Success: {:?}", r);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// sleep and try again
|
||||
tokio::time::sleep(Duration::from_millis(500)).await
|
||||
}
|
||||
panic!("Did not find expected output in allotted time");
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
/// Test the schema cli command
|
||||
#[tokio::test]
|
||||
async fn namespaces_cli() {
|
||||
|
|
Loading…
Reference in New Issue