From 0774e1d328558b99b81471aa60be88e43febca09 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 8 Feb 2022 22:24:07 +0000 Subject: [PATCH 1/2] feat: add read_window_aggregate request builder --- influxdb_iox/src/commands/storage/request.rs | 103 ++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/influxdb_iox/src/commands/storage/request.rs b/influxdb_iox/src/commands/storage/request.rs index 4a994f44bc..cd15867db8 100644 --- a/influxdb_iox/src/commands/storage/request.rs +++ b/influxdb_iox/src/commands/storage/request.rs @@ -2,12 +2,22 @@ pub mod generated_types { pub use generated_types::influxdata::platform::storage::*; } +use snafu::Snafu; + use self::generated_types::*; use super::response::{ tag_key_is_field, tag_key_is_measurement, FIELD_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_BIN, }; use ::generated_types::google::protobuf::*; +#[derive(Debug, Snafu)] +pub enum Request { + #[snafu(display("duration {:?} too large", d))] + Duration { d: std::time::Duration }, +} + +pub type Result = std::result::Result; + pub fn read_filter( org_bucket: Any, start: i64, @@ -23,6 +33,39 @@ pub fn read_filter( } } +pub fn read_window_aggregate( + org_bucket: Any, + start: i64, + stop: i64, + predicate: std::option::Option, + every: std::time::Duration, + offset: std::time::Duration, + aggregates: Vec, + window: std::option::Option, +) -> Result { + let window_every = if every.as_nanos() > i64::MAX as u128 { + return DurationSnafu { d: every }.fail(); + } else { + every.as_nanos() as i64 + }; + + let offset = if offset.as_nanos() > i64::MAX as u128 { + return DurationSnafu { d: offset }.fail(); + } else { + offset.as_nanos() as i64 + }; + + Ok(generated_types::ReadWindowAggregateRequest { + predicate, + read_source: Some(org_bucket), + range: Some(TimestampRange { start, end: stop }), + window_every, + offset, + aggregate: aggregates, + window, + }) +} + pub fn tag_values( org_bucket: Any, start: i64, @@ -46,10 +89,68 @@ pub fn tag_values( } } +#[cfg(test)] +mod test_super { + use std::num::NonZeroU64; + + use influxdb_storage_client::{Client, OrgAndBucket}; + + use super::*; + + #[test] + fn test_read_window_aggregate_durations() { + let org_bucket = Client::read_source( + &OrgAndBucket::new( + NonZeroU64::new(123_u64).unwrap(), + NonZeroU64::new(456_u64).unwrap(), + ), + 0, + ); + + let got = read_window_aggregate( + org_bucket.clone(), + 1, + 10, + None, + std::time::Duration::from_millis(3), + std::time::Duration::from_millis(2), + vec![], + None, + ) + .unwrap(); + + assert_eq!(got.window_every, 3_000_000); + assert_eq!(got.offset, 2_000_000); + + let got = read_window_aggregate( + org_bucket.clone(), + 1, + 10, + None, + std::time::Duration::from_secs(u64::MAX), + std::time::Duration::from_millis(2), + vec![], + None, + ); + assert!(got.is_err()); + + let got = read_window_aggregate( + org_bucket, + 1, + 10, + None, + std::time::Duration::from_secs(3), + std::time::Duration::from_secs(u64::MAX), + vec![], + None, + ); + assert!(got.is_err()); + } +} + // TODO Add the following helpers for building requests: // // * read_group -// * read_window_aggregate // * tag_keys // * tag_values_with_measurement_and_key // * measurement_names From 2334e779ebce3c378f9512af642b8f19eec24472 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 9 Feb 2022 12:32:20 +0000 Subject: [PATCH 2/2] feat: implement read_window_aggregate sub-command --- influxdb_iox/src/commands/storage.rs | 60 +++++++++++++++++++- influxdb_iox/src/commands/storage/request.rs | 19 +++++-- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/influxdb_iox/src/commands/storage.rs b/influxdb_iox/src/commands/storage.rs index 585351c4f7..71ff287dd5 100644 --- a/influxdb_iox/src/commands/storage.rs +++ b/influxdb_iox/src/commands/storage.rs @@ -2,11 +2,12 @@ pub(crate) mod request; pub(crate) mod response; use std::num::NonZeroU64; +use std::time::Duration; use snafu::{ResultExt, Snafu}; use tonic::Status; -use generated_types::Predicate; +use generated_types::{aggregate::AggregateType, Predicate}; use influxdb_storage_client::{connection::Connection, Client, OrgAndBucket}; use influxrpc_parser::predicate; use time; @@ -25,11 +26,17 @@ pub enum ParseError { #[snafu(display("server error: {:?}", source))] ServerError { source: Status }, + #[snafu(display("error building request: {:?}", source))] + Request { source: request::Error }, + #[snafu(display("error building response: {:?}", source))] ResponseError { source: response::Error }, #[snafu(display("value {:?} not supported for flag {:?}", value, flag))] UnsupportedFlagValue { value: String, flag: String }, + + #[snafu(display("unsupported aggregate type: '{:?}'", agg))] + Aggregate { agg: String }, } pub type Result = std::result::Result; @@ -135,9 +142,37 @@ pub enum Format { #[derive(Debug, clap::Parser)] enum Command { ReadFilter, + ReadWindowAggregate(ReadWindowAggregate), TagValues(TagValues), } +#[derive(Debug, clap::Parser)] +struct ReadWindowAggregate { + #[clap(long, default_value = "", parse(try_from_str = humantime::parse_duration))] + window_every: Duration, + + #[clap(long, default_value = "", parse(try_from_str = humantime::parse_duration))] + offset: Duration, + + #[clap(long, default_value = "", parse(try_from_str = parse_aggregate))] + aggregate: Vec, +} + +// Attempts to parse the optional format. +fn parse_aggregate(aggs: &str) -> Result { + match aggs.to_lowercase().as_str() { + "none" => Ok(AggregateType::None), + "count" => Ok(AggregateType::Count), + "sum" => Ok(AggregateType::Sum), + "min" => Ok(AggregateType::Min), + "max" => Ok(AggregateType::Max), + "mean" => Ok(AggregateType::Mean), + "first" => Ok(AggregateType::First), + "last" => Ok(AggregateType::Last), + _ => AggregateSnafu { agg: aggs }.fail(), + } +} + #[derive(Debug, clap::Parser)] struct TagValues { // The tag key value to interrogate for tag values. @@ -169,6 +204,29 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { Format::Quiet => {} } } + Command::ReadWindowAggregate(rwa) => { + let result = client + .read_window_aggregate( + request::read_window_aggregate( + source, + config.start, + config.stop, + predicate, + rwa.window_every, + rwa.offset, + rwa.aggregate, + None, // TODO(edd): determine if window needs to be set + ) + .context(RequestSnafu)?, + ) + .await + .context(ServerSnafu)?; + + match config.format { + Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?, + Format::Quiet => {} + } + } Command::TagValues(tv) => { let result = client .tag_values(request::tag_values( diff --git a/influxdb_iox/src/commands/storage/request.rs b/influxdb_iox/src/commands/storage/request.rs index cd15867db8..648cb67541 100644 --- a/influxdb_iox/src/commands/storage/request.rs +++ b/influxdb_iox/src/commands/storage/request.rs @@ -8,15 +8,15 @@ use self::generated_types::*; use super::response::{ tag_key_is_field, tag_key_is_measurement, FIELD_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_BIN, }; -use ::generated_types::google::protobuf::*; +use ::generated_types::{aggregate::AggregateType, google::protobuf::*}; #[derive(Debug, Snafu)] -pub enum Request { +pub enum Error { #[snafu(display("duration {:?} too large", d))] Duration { d: std::time::Duration }, } -pub type Result = std::result::Result; +pub type Result = std::result::Result; pub fn read_filter( org_bucket: Any, @@ -33,6 +33,7 @@ pub fn read_filter( } } +#[allow(clippy::too_many_arguments)] pub fn read_window_aggregate( org_bucket: Any, start: i64, @@ -40,9 +41,9 @@ pub fn read_window_aggregate( predicate: std::option::Option, every: std::time::Duration, offset: std::time::Duration, - aggregates: Vec, + aggregates: Vec, window: std::option::Option, -) -> Result { +) -> Result { let window_every = if every.as_nanos() > i64::MAX as u128 { return DurationSnafu { d: every }.fail(); } else { @@ -55,13 +56,19 @@ pub fn read_window_aggregate( offset.as_nanos() as i64 }; + // wrap in the PB message type for aggregates. + let aggregate = aggregates + .into_iter() + .map(|a| Aggregate { r#type: a as i32 }) + .collect::>(); + Ok(generated_types::ReadWindowAggregateRequest { predicate, read_source: Some(org_bucket), range: Some(TimestampRange { start, end: stop }), window_every, offset, - aggregate: aggregates, + aggregate, window, }) }