feat: add support for parsing predicate
parent
08901c13cd
commit
5441682207
|
@ -1781,6 +1781,7 @@ dependencies = [
|
|||
"influxdb_iox_client",
|
||||
"influxdb_line_protocol",
|
||||
"influxdb_storage_client",
|
||||
"influxrpc_parser",
|
||||
"ingester",
|
||||
"internal_types",
|
||||
"iox_catalog",
|
||||
|
|
|
@ -16,6 +16,7 @@ influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "
|
|||
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
||||
ingester = { path = "../ingester" }
|
||||
internal_types = { path = "../internal_types" }
|
||||
influxrpc_parser = { path = "../influxrpc_parser"}
|
||||
iox_catalog = { path = "../iox_catalog" }
|
||||
iox_object_store = { path = "../iox_object_store" }
|
||||
job_registry = { path = "../job_registry" }
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
use generated_types::Predicate;
|
||||
use influxrpc_parser::predicate;
|
||||
use time;
|
||||
|
||||
use snafu::Snafu;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Unable to parse timestamp '{:?}'", t))]
|
||||
TimestampParseError { t: String },
|
||||
|
||||
#[snafu(display("Unable to parse predicate: {:?}", source))]
|
||||
PredicateParseError { source: predicate::Error },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -25,8 +30,8 @@ pub struct Config {
|
|||
stop: i64,
|
||||
|
||||
/// A predicate to filter results by. Effectively InfluxQL predicate format (see examples).
|
||||
#[clap(long, default_value = "")]
|
||||
predicate: String,
|
||||
#[clap(long, default_value = "", parse(try_from_str = parse_predicate))]
|
||||
predicate: Predicate,
|
||||
}
|
||||
|
||||
// Attempts to parse either a stringified `i64` value. or alternatively parse an
|
||||
|
@ -43,6 +48,16 @@ fn parse_range(s: &str) -> Result<i64, Error> {
|
|||
}
|
||||
}
|
||||
|
||||
// Attempts to parse the optional predicate into an `Predicate` RPC node. This
|
||||
// node is then used as part of a read request.
|
||||
fn parse_predicate(expr: &str) -> Result<Predicate, Error> {
|
||||
if expr.is_empty() {
|
||||
return Ok(Predicate::default());
|
||||
}
|
||||
|
||||
predicate::expr_to_rpc_predicate(expr).context(PredicateParseSnafu)
|
||||
}
|
||||
|
||||
/// All possible subcommands for storage
|
||||
#[derive(Debug, clap::Parser)]
|
||||
enum Command {
|
||||
|
|
|
@ -168,7 +168,7 @@ enum Command {
|
|||
Debug(commands::debug::Config),
|
||||
|
||||
/// Initiate a read request to the gRPC storage service.
|
||||
StorageRPC(commands::storage::Config),
|
||||
Storage(commands::storage::Config),
|
||||
}
|
||||
|
||||
fn main() -> Result<(), std::io::Error> {
|
||||
|
@ -258,7 +258,7 @@ fn main() -> Result<(), std::io::Error> {
|
|||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
Command::StorageRPC(config) => {
|
||||
Command::Storage(config) => {
|
||||
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
|
||||
if let Err(e) = commands::storage::command(config).await {
|
||||
eprintln!("{}", e);
|
||||
|
|
Loading…
Reference in New Issue