diff --git a/Cargo.lock b/Cargo.lock index 0bb208a651..73ad910c7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2600,6 +2600,7 @@ dependencies = [ "console-subscriber", "dotenvy", "hex", + "influxdb3_client", "influxdb3_server", "influxdb3_write", "iox_query", @@ -2615,6 +2616,7 @@ dependencies = [ "parking_lot 0.12.1", "parquet_file", "reqwest", + "secrecy", "sha2", "thiserror", "tikv-jemalloc-ctl", @@ -2625,6 +2627,7 @@ dependencies = [ "trace", "trace_exporters", "trogging", + "url", "uuid", "workspace-hack", ] diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 704e0efce2..ff952ab4b2 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -13,6 +13,7 @@ influxdb3_server = { path = "../influxdb3_server" } iox_time = { path = "../iox_time" } iox_query = { path = "../iox_query" } ioxd_common = { path = "../ioxd_common"} +influxdb3_client = { path = "../influxdb3_client" } influxdb3_write = { path = "../influxdb3_write" } metric = { path = "../metric" } object_store = { workspace = true } @@ -33,11 +34,13 @@ libc = { version = "0.2" } num_cpus = "1.16.0" once_cell = { version = "1.18", features = ["parking_lot"] } parking_lot = "0.12.1" +secrecy = "0.8.0" thiserror = "1.0.48" tikv-jemalloc-ctl = { version = "0.5.4", optional = true } tikv-jemalloc-sys = { version = "0.5.4", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] } tokio = { version = "1.32", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time", "io-std"] } tokio-util = { version = "0.7.9" } +url = "2.5.0" uuid = { version = "1", features = ["v4"] } workspace-hack = { version = "0.1", path = "../workspace-hack" } sha2 = "0.10.8" diff --git a/influxdb3/src/commands/common.rs b/influxdb3/src/commands/common.rs new file mode 100644 index 0000000000..7a443fd9e2 --- /dev/null +++ b/influxdb3/src/commands/common.rs @@ -0,0 +1,23 @@ +use clap::Parser; +use secrecy::Secret; +use url::Url; + +#[derive(Debug, Parser)] +pub struct InfluxDb3Config { + /// The host URL of the running InfluxDB 3.0 server + #[clap( + short = 'h', + long = "host", + env = "INFLUXDB3_HOST_URL", + default_value = "http://127.0.0.1:8181" + )] + pub host_url: Url, + + /// The database name to run the query against + #[clap(short = 'd', long = "dbname", env = "INFLUXDB3_DATABASE_NAME")] + pub database_name: String, + + /// The token for authentication with the InfluxDB 3.0 server + #[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")] + pub auth_token: Option>, +} diff --git a/influxdb3/src/commands/query.rs b/influxdb3/src/commands/query.rs new file mode 100644 index 0000000000..cfdb8c7495 --- /dev/null +++ b/influxdb3/src/commands/query.rs @@ -0,0 +1,161 @@ +use std::str::Utf8Error; + +use clap::{Parser, ValueEnum}; +use secrecy::ExposeSecret; +use tokio::{ + fs::OpenOptions, + io::{self, AsyncWriteExt}, +}; + +use super::common::InfluxDb3Config; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + #[error(transparent)] + Client(#[from] influxdb3_client::Error), + + #[error(transparent)] + Query(#[from] QueryError), + + #[error("invlid UTF8 received from server: {0}")] + Utf8(#[from] Utf8Error), + + #[error("io error: {0}")] + Io(#[from] io::Error), + + #[error( + "must specify an output file path with `--output` parameter when formatting\ + the output as `parquet`" + )] + NoOutputFileForParquet, +} + +pub type Result = std::result::Result; + +#[derive(Debug, Parser)] +#[clap(visible_alias = "q", trailing_var_arg = true)] +pub struct Config { + /// Common InfluxDB 3.0 config + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, + + /// The query language used to format the provided query string + #[clap( + value_enum, + long = "lang", + short = 'l', + default_value_t = QueryLanguage::Sql, + )] + language: QueryLanguage, + + /// The format in which to output the query + /// + /// If `--fmt` is set to `parquet`, then you must also specify an output + /// file path with `--output`. + #[clap(value_enum, long = "fmt", default_value = "pretty")] + output_format: Format, + + /// Put all query output into `output` + #[clap(short = 'o', long = "output")] + output_file_path: Option, + + /// The query string to execute + query: Vec, +} + +#[derive(Debug, ValueEnum, Clone)] +#[clap(rename_all = "snake_case")] +enum Format { + Pretty, + Json, + Csv, + Parquet, +} + +impl Format { + fn is_parquet(&self) -> bool { + matches!(self, Self::Parquet) + } +} + +impl From for influxdb3_client::Format { + fn from(this: Format) -> Self { + match this { + Format::Pretty => Self::Pretty, + Format::Json => Self::Json, + Format::Csv => Self::Csv, + Format::Parquet => Self::Parquet, + } + } +} + +#[derive(Debug, ValueEnum, Clone)] +enum QueryLanguage { + Sql, +} + +pub(crate) async fn command(config: Config) -> Result<()> { + let InfluxDb3Config { + host_url, + database_name, + auth_token, + } = config.influxdb3_config; + let mut client = influxdb3_client::Client::new(host_url)?; + if let Some(t) = auth_token { + client = client.with_auth_token(t.expose_secret()); + } + + let query = parse_query(config.query)?; + + // make the query using the client + let mut resp_bytes = match config.language { + QueryLanguage::Sql => { + client + .api_v3_query_sql(database_name, query) + .format(config.output_format.clone().into()) + .send() + .await? + } + }; + + // write to file if output path specified + if let Some(path) = &config.output_file_path { + let mut f = OpenOptions::new() + .write(true) + .create(true) + .open(path) + .await?; + f.write_all_buf(&mut resp_bytes).await?; + } else { + if config.output_format.is_parquet() { + Err(Error::NoOutputFileForParquet)? + } + println!("{}", std::str::from_utf8(&resp_bytes)?); + } + + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum QueryError { + #[error("no query provided")] + NoQuery, + + #[error( + "ensure that a single query string is provided as the final \ + argument, enclosed in quotes" + )] + MoreThanOne, +} + +/// Parse the user-inputted query string +fn parse_query(mut input: Vec) -> Result { + if input.is_empty() { + Err(QueryError::NoQuery)? + } + if input.len() > 1 { + Err(QueryError::MoreThanOne)? + } else { + Ok(input.remove(0)) + } +} diff --git a/influxdb3/src/commands/write.rs b/influxdb3/src/commands/write.rs new file mode 100644 index 0000000000..ad7556e98f --- /dev/null +++ b/influxdb3/src/commands/write.rs @@ -0,0 +1,65 @@ +use clap::Parser; +use secrecy::ExposeSecret; +use tokio::{ + fs::File, + io::{self, AsyncReadExt}, +}; + +use super::common::InfluxDb3Config; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + #[error(transparent)] + Client(#[from] influxdb3_client::Error), + + #[error("error reading file: {0}")] + Io(#[from] io::Error), +} + +pub(crate) type Result = std::result::Result; + +#[derive(Debug, Parser)] +#[clap(visible_alias = "w", trailing_var_arg = true)] +pub struct Config { + /// Common InfluxDB 3.0 config + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, + + /// File path to load the write data from + /// + /// Currently, only files containing line protocol are supported. + #[clap(short = 'f', long = "file")] + file_path: String, + + /// Flag to request the server accept partial writes + /// + /// Invalid lines in the input data will be ignored by the server. + #[clap(long = "accept-partial")] + accept_partial_writes: bool, +} + +pub(crate) async fn command(config: Config) -> Result<()> { + let InfluxDb3Config { + host_url, + database_name, + auth_token, + } = config.influxdb3_config; + let mut client = influxdb3_client::Client::new(host_url)?; + if let Some(t) = auth_token { + client = client.with_auth_token(t.expose_secret()); + } + + let mut f = File::open(config.file_path).await?; + let mut writes = Vec::new(); + f.read_to_end(&mut writes).await?; + + let mut req = client.api_v3_write_lp(database_name); + if config.accept_partial_writes { + req = req.accept_partial(true); + } + req.body(writes).send().await?; + + println!("success"); + + Ok(()) +} diff --git a/influxdb3/src/main.rs b/influxdb3/src/main.rs index 683fc45c42..6b9058b0a5 100644 --- a/influxdb3/src/main.rs +++ b/influxdb3/src/main.rs @@ -25,8 +25,11 @@ use trogging::{ }; mod commands { + pub(crate) mod common; pub mod create; + pub mod query; pub mod serve; + pub mod write; } #[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))] @@ -79,11 +82,21 @@ struct Config { command: Option, } +// Ignoring clippy here since this enum is just used for running +// the CLI command +#[allow(clippy::large_enum_variant)] #[derive(Debug, clap::Parser)] #[allow(clippy::large_enum_variant)] enum Command { /// Run the InfluxDB 3.0 server Serve(commands::serve::Config), + + /// Perform a query against a running InfluxDB 3.0 server + Query(commands::query::Config), + + /// Perform a set of writes to a running InfluxDB 3.0 server + Write(commands::write::Config), + /// Create new resources Create(commands::create::Config), } @@ -118,6 +131,18 @@ fn main() -> Result<(), std::io::Error> { std::process::exit(ReturnCode::Failure as _) } } + Some(Command::Query(config)) => { + if let Err(e) = commands::query::command(config).await { + eprintln!("Query command failed: {e}"); + std::process::exit(ReturnCode::Failure as _) + } + } + Some(Command::Write(config)) => { + if let Err(e) = commands::write::command(config).await { + eprintln!("Write command failed: {e}"); + std::process::exit(ReturnCode::Failure as _) + } + } Some(Command::Create(config)) => { if let Err(e) = commands::create::command(config) { eprintln!("Create command failed: {e}");