From 5441682207517145dbc49ce4a8c0ad2405727fa7 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 2 Feb 2022 09:51:35 +0000 Subject: [PATCH] feat: add support for parsing predicate --- Cargo.lock | 1 + influxdb_iox/Cargo.toml | 1 + influxdb_iox/src/commands/storage.rs | 21 ++++++++++++++++++--- influxdb_iox/src/main.rs | 4 ++-- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4198b61769..6865f616ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,6 +1781,7 @@ dependencies = [ "influxdb_iox_client", "influxdb_line_protocol", "influxdb_storage_client", + "influxrpc_parser", "ingester", "internal_types", "iox_catalog", diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 7823b4ac51..a9e055f29a 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -16,6 +16,7 @@ influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", " influxdb_line_protocol = { path = "../influxdb_line_protocol" } ingester = { path = "../ingester" } internal_types = { path = "../internal_types" } +influxrpc_parser = { path = "../influxrpc_parser"} iox_catalog = { path = "../iox_catalog" } iox_object_store = { path = "../iox_object_store" } job_registry = { path = "../job_registry" } diff --git a/influxdb_iox/src/commands/storage.rs b/influxdb_iox/src/commands/storage.rs index 3dd7740474..a6186f7f4c 100644 --- a/influxdb_iox/src/commands/storage.rs +++ b/influxdb_iox/src/commands/storage.rs @@ -1,11 +1,16 @@ +use generated_types::Predicate; +use influxrpc_parser::predicate; use time; -use snafu::Snafu; +use snafu::{ResultExt, Snafu}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Unable to parse timestamp '{:?}'", t))] TimestampParseError { t: String }, + + #[snafu(display("Unable to parse predicate: {:?}", source))] + PredicateParseError { source: predicate::Error }, } pub type Result = std::result::Result; @@ -25,8 +30,8 @@ pub struct Config { stop: i64, /// A predicate to filter results by. Effectively InfluxQL predicate format (see examples). - #[clap(long, default_value = "")] - predicate: String, + #[clap(long, default_value = "", parse(try_from_str = parse_predicate))] + predicate: Predicate, } // Attempts to parse either a stringified `i64` value. or alternatively parse an @@ -43,6 +48,16 @@ fn parse_range(s: &str) -> Result { } } +// Attempts to parse the optional predicate into an `Predicate` RPC node. This +// node is then used as part of a read request. +fn parse_predicate(expr: &str) -> Result { + if expr.is_empty() { + return Ok(Predicate::default()); + } + + predicate::expr_to_rpc_predicate(expr).context(PredicateParseSnafu) +} + /// All possible subcommands for storage #[derive(Debug, clap::Parser)] enum Command { diff --git a/influxdb_iox/src/main.rs b/influxdb_iox/src/main.rs index a2c5aee580..74d510db96 100644 --- a/influxdb_iox/src/main.rs +++ b/influxdb_iox/src/main.rs @@ -168,7 +168,7 @@ enum Command { Debug(commands::debug::Config), /// Initiate a read request to the gRPC storage service. - StorageRPC(commands::storage::Config), + Storage(commands::storage::Config), } fn main() -> Result<(), std::io::Error> { @@ -258,7 +258,7 @@ fn main() -> Result<(), std::io::Error> { std::process::exit(ReturnCode::Failure as _) } } - Command::StorageRPC(config) => { + Command::Storage(config) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); if let Err(e) = commands::storage::command(config).await { eprintln!("{}", e);