From 2334e779ebce3c378f9512af642b8f19eec24472 Mon Sep 17 00:00:00 2001 From: Edd Robinson <me@edd.io> Date: Wed, 9 Feb 2022 12:32:20 +0000 Subject: [PATCH] feat: implement read_window_aggregate sub-command --- influxdb_iox/src/commands/storage.rs | 60 +++++++++++++++++++- influxdb_iox/src/commands/storage/request.rs | 19 +++++-- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/influxdb_iox/src/commands/storage.rs b/influxdb_iox/src/commands/storage.rs index 585351c4f7..71ff287dd5 100644 --- a/influxdb_iox/src/commands/storage.rs +++ b/influxdb_iox/src/commands/storage.rs @@ -2,11 +2,12 @@ pub(crate) mod request; pub(crate) mod response; use std::num::NonZeroU64; +use std::time::Duration; use snafu::{ResultExt, Snafu}; use tonic::Status; -use generated_types::Predicate; +use generated_types::{aggregate::AggregateType, Predicate}; use influxdb_storage_client::{connection::Connection, Client, OrgAndBucket}; use influxrpc_parser::predicate; use time; @@ -25,11 +26,17 @@ pub enum ParseError { #[snafu(display("server error: {:?}", source))] ServerError { source: Status }, + #[snafu(display("error building request: {:?}", source))] + Request { source: request::Error }, + #[snafu(display("error building response: {:?}", source))] ResponseError { source: response::Error }, #[snafu(display("value {:?} not supported for flag {:?}", value, flag))] UnsupportedFlagValue { value: String, flag: String }, + + #[snafu(display("unsupported aggregate type: '{:?}'", agg))] + Aggregate { agg: String }, } pub type Result<T, E = ParseError> = std::result::Result<T, E>; @@ -135,9 +142,37 @@ pub enum Format { #[derive(Debug, clap::Parser)] enum Command { ReadFilter, + ReadWindowAggregate(ReadWindowAggregate), TagValues(TagValues), } +#[derive(Debug, clap::Parser)] +struct ReadWindowAggregate { + #[clap(long, default_value = "", parse(try_from_str = humantime::parse_duration))] + window_every: Duration, + + #[clap(long, default_value = "", parse(try_from_str = humantime::parse_duration))] + offset: Duration, + + #[clap(long, default_value = "", parse(try_from_str = parse_aggregate))] + aggregate: Vec<AggregateType>, +} + +// Attempts to parse the optional format. +fn parse_aggregate(aggs: &str) -> Result<AggregateType, ParseError> { + match aggs.to_lowercase().as_str() { + "none" => Ok(AggregateType::None), + "count" => Ok(AggregateType::Count), + "sum" => Ok(AggregateType::Sum), + "min" => Ok(AggregateType::Min), + "max" => Ok(AggregateType::Max), + "mean" => Ok(AggregateType::Mean), + "first" => Ok(AggregateType::First), + "last" => Ok(AggregateType::Last), + _ => AggregateSnafu { agg: aggs }.fail(), + } +} + #[derive(Debug, clap::Parser)] struct TagValues { // The tag key value to interrogate for tag values. @@ -169,6 +204,29 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { Format::Quiet => {} } } + Command::ReadWindowAggregate(rwa) => { + let result = client + .read_window_aggregate( + request::read_window_aggregate( + source, + config.start, + config.stop, + predicate, + rwa.window_every, + rwa.offset, + rwa.aggregate, + None, // TODO(edd): determine if window needs to be set + ) + .context(RequestSnafu)?, + ) + .await + .context(ServerSnafu)?; + + match config.format { + Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?, + Format::Quiet => {} + } + } Command::TagValues(tv) => { let result = client .tag_values(request::tag_values( diff --git a/influxdb_iox/src/commands/storage/request.rs b/influxdb_iox/src/commands/storage/request.rs index cd15867db8..648cb67541 100644 --- a/influxdb_iox/src/commands/storage/request.rs +++ b/influxdb_iox/src/commands/storage/request.rs @@ -8,15 +8,15 @@ use self::generated_types::*; use super::response::{ tag_key_is_field, tag_key_is_measurement, FIELD_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_BIN, }; -use ::generated_types::google::protobuf::*; +use ::generated_types::{aggregate::AggregateType, google::protobuf::*}; #[derive(Debug, Snafu)] -pub enum Request { +pub enum Error { #[snafu(display("duration {:?} too large", d))] Duration { d: std::time::Duration }, } -pub type Result<T, E = Request> = std::result::Result<T, E>; +pub type Result<T, E = Error> = std::result::Result<T, E>; pub fn read_filter( org_bucket: Any, @@ -33,6 +33,7 @@ pub fn read_filter( } } +#[allow(clippy::too_many_arguments)] pub fn read_window_aggregate( org_bucket: Any, start: i64, @@ -40,9 +41,9 @@ pub fn read_window_aggregate( predicate: std::option::Option<Predicate>, every: std::time::Duration, offset: std::time::Duration, - aggregates: Vec<Aggregate>, + aggregates: Vec<AggregateType>, window: std::option::Option<Window>, -) -> Result<ReadWindowAggregateRequest, Request> { +) -> Result<ReadWindowAggregateRequest, Error> { let window_every = if every.as_nanos() > i64::MAX as u128 { return DurationSnafu { d: every }.fail(); } else { @@ -55,13 +56,19 @@ pub fn read_window_aggregate( offset.as_nanos() as i64 }; + // wrap in the PB message type for aggregates. + let aggregate = aggregates + .into_iter() + .map(|a| Aggregate { r#type: a as i32 }) + .collect::<Vec<_>>(); + Ok(generated_types::ReadWindowAggregateRequest { predicate, read_source: Some(org_bucket), range: Some(TimestampRange { start, end: stop }), window_every, offset, - aggregate: aggregates, + aggregate, window, }) }