Merge pull request #3688 from influxdata/er/feat/storage_read_aggregate

feat: add read_window_aggregate storage sub-command
pull/24376/head
kodiakhq[bot] 2022-02-09 14:05:22 +00:00 committed by GitHub
commit 92f6413b3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 169 additions and 3 deletions

View File

@ -2,11 +2,12 @@ pub(crate) mod request;
pub(crate) mod response; pub(crate) mod response;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::time::Duration;
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use tonic::Status; use tonic::Status;
use generated_types::Predicate; use generated_types::{aggregate::AggregateType, Predicate};
use influxdb_storage_client::{connection::Connection, Client, OrgAndBucket}; use influxdb_storage_client::{connection::Connection, Client, OrgAndBucket};
use influxrpc_parser::predicate; use influxrpc_parser::predicate;
use time; use time;
@ -25,11 +26,17 @@ pub enum ParseError {
#[snafu(display("server error: {:?}", source))] #[snafu(display("server error: {:?}", source))]
ServerError { source: Status }, ServerError { source: Status },
#[snafu(display("error building request: {:?}", source))]
Request { source: request::Error },
#[snafu(display("error building response: {:?}", source))] #[snafu(display("error building response: {:?}", source))]
ResponseError { source: response::Error }, ResponseError { source: response::Error },
#[snafu(display("value {:?} not supported for flag {:?}", value, flag))] #[snafu(display("value {:?} not supported for flag {:?}", value, flag))]
UnsupportedFlagValue { value: String, flag: String }, UnsupportedFlagValue { value: String, flag: String },
#[snafu(display("unsupported aggregate type: '{:?}'", agg))]
Aggregate { agg: String },
} }
pub type Result<T, E = ParseError> = std::result::Result<T, E>; pub type Result<T, E = ParseError> = std::result::Result<T, E>;
@ -135,9 +142,37 @@ pub enum Format {
#[derive(Debug, clap::Parser)] #[derive(Debug, clap::Parser)]
enum Command { enum Command {
ReadFilter, ReadFilter,
ReadWindowAggregate(ReadWindowAggregate),
TagValues(TagValues), TagValues(TagValues),
} }
/// CLI options for the `read-window-aggregate` storage sub-command.
#[derive(Debug, clap::Parser)]
struct ReadWindowAggregate {
    /// The size of each window, e.g. "1m" or "30s".
    //
    // NOTE(review): the original used `default_value = ""`, which
    // `humantime::parse_duration` cannot parse, so omitting the flag
    // failed at argument-parse time. "0s" is a parseable zero default.
    #[clap(long, default_value = "0s", parse(try_from_str = humantime::parse_duration))]
    window_every: Duration,

    /// The offset of each window from the aligned window boundary.
    #[clap(long, default_value = "0s", parse(try_from_str = humantime::parse_duration))]
    offset: Duration,

    /// One or more aggregate operations to apply, e.g. "count", "sum".
    //
    // A `Vec` flag is optional in clap and yields an empty list when
    // omitted, so no default string is needed (the original
    // `default_value = ""` could never be parsed by `parse_aggregate`).
    #[clap(long, parse(try_from_str = parse_aggregate))]
    aggregate: Vec<AggregateType>,
}
/// Parses a case-insensitive aggregate name (e.g. "count", "sum") into
/// an `AggregateType`, returning `ParseError::Aggregate` for input that
/// does not name a supported aggregate.
//
// NOTE(review): the previous comment said this parsed "the optional
// format" — a copy/paste error; it parses aggregate type names.
fn parse_aggregate(aggs: &str) -> Result<AggregateType, ParseError> {
    match aggs.to_lowercase().as_str() {
        "none" => Ok(AggregateType::None),
        "count" => Ok(AggregateType::Count),
        "sum" => Ok(AggregateType::Sum),
        "min" => Ok(AggregateType::Min),
        "max" => Ok(AggregateType::Max),
        "mean" => Ok(AggregateType::Mean),
        "first" => Ok(AggregateType::First),
        "last" => Ok(AggregateType::Last),
        // Preserve the user's original spelling in the error message.
        _ => AggregateSnafu { agg: aggs }.fail(),
    }
}
#[derive(Debug, clap::Parser)] #[derive(Debug, clap::Parser)]
struct TagValues { struct TagValues {
// The tag key value to interrogate for tag values. // The tag key value to interrogate for tag values.
@ -169,6 +204,29 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
Format::Quiet => {} Format::Quiet => {}
} }
} }
Command::ReadWindowAggregate(rwa) => {
let result = client
.read_window_aggregate(
request::read_window_aggregate(
source,
config.start,
config.stop,
predicate,
rwa.window_every,
rwa.offset,
rwa.aggregate,
None, // TODO(edd): determine if window needs to be set
)
.context(RequestSnafu)?,
)
.await
.context(ServerSnafu)?;
match config.format {
Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?,
Format::Quiet => {}
}
}
Command::TagValues(tv) => { Command::TagValues(tv) => {
let result = client let result = client
.tag_values(request::tag_values( .tag_values(request::tag_values(

View File

@ -2,11 +2,21 @@ pub mod generated_types {
pub use generated_types::influxdata::platform::storage::*; pub use generated_types::influxdata::platform::storage::*;
} }
use snafu::Snafu;
use self::generated_types::*; use self::generated_types::*;
use super::response::{ use super::response::{
tag_key_is_field, tag_key_is_measurement, FIELD_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_BIN, tag_key_is_field, tag_key_is_measurement, FIELD_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_BIN,
}; };
use ::generated_types::google::protobuf::*; use ::generated_types::{aggregate::AggregateType, google::protobuf::*};
/// Errors that can occur while building storage gRPC request messages.
#[derive(Debug, Snafu)]
pub enum Error {
// The request wire format carries durations as i64 nanoseconds, so a
// `std::time::Duration` whose nanosecond count overflows i64 is rejected.
#[snafu(display("duration {:?} too large", d))]
Duration { d: std::time::Duration },
}
// Module-local Result alias defaulting to this module's `Error`.
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub fn read_filter( pub fn read_filter(
org_bucket: Any, org_bucket: Any,
@ -23,6 +33,46 @@ pub fn read_filter(
} }
} }
#[allow(clippy::too_many_arguments)]
pub fn read_window_aggregate(
org_bucket: Any,
start: i64,
stop: i64,
predicate: std::option::Option<Predicate>,
every: std::time::Duration,
offset: std::time::Duration,
aggregates: Vec<AggregateType>,
window: std::option::Option<Window>,
) -> Result<ReadWindowAggregateRequest, Error> {
let window_every = if every.as_nanos() > i64::MAX as u128 {
return DurationSnafu { d: every }.fail();
} else {
every.as_nanos() as i64
};
let offset = if offset.as_nanos() > i64::MAX as u128 {
return DurationSnafu { d: offset }.fail();
} else {
offset.as_nanos() as i64
};
// wrap in the PB message type for aggregates.
let aggregate = aggregates
.into_iter()
.map(|a| Aggregate { r#type: a as i32 })
.collect::<Vec<_>>();
Ok(generated_types::ReadWindowAggregateRequest {
predicate,
read_source: Some(org_bucket),
range: Some(TimestampRange { start, end: stop }),
window_every,
offset,
aggregate,
window,
})
}
pub fn tag_values( pub fn tag_values(
org_bucket: Any, org_bucket: Any,
start: i64, start: i64,
@ -46,10 +96,68 @@ pub fn tag_values(
} }
} }
#[cfg(test)]
mod test_super {
    use std::num::NonZeroU64;
    use influxdb_storage_client::{Client, OrgAndBucket};
    use super::*;

    #[test]
    fn test_read_window_aggregate_durations() {
        let source = Client::read_source(
            &OrgAndBucket::new(
                NonZeroU64::new(123_u64).unwrap(),
                NonZeroU64::new(456_u64).unwrap(),
            ),
            0,
        );

        // In-range durations are converted into i64 nanoseconds.
        let request = read_window_aggregate(
            source.clone(),
            1,
            10,
            None,
            std::time::Duration::from_millis(3),
            std::time::Duration::from_millis(2),
            vec![],
            None,
        )
        .unwrap();
        assert_eq!(request.window_every, 3_000_000);
        assert_eq!(request.offset, 2_000_000);

        // An `every` whose nanosecond count overflows i64 is rejected.
        assert!(read_window_aggregate(
            source.clone(),
            1,
            10,
            None,
            std::time::Duration::from_secs(u64::MAX),
            std::time::Duration::from_millis(2),
            vec![],
            None,
        )
        .is_err());

        // Likewise for an overflowing `offset`.
        assert!(read_window_aggregate(
            source,
            1,
            10,
            None,
            std::time::Duration::from_secs(3),
            std::time::Duration::from_secs(u64::MAX),
            vec![],
            None,
        )
        .is_err());
    }
}
// TODO Add the following helpers for building requests: // TODO Add the following helpers for building requests:
// //
// * read_group // * read_group
// * read_window_aggregate
// * tag_keys // * tag_keys
// * tag_values_with_measurement_and_key // * tag_values_with_measurement_and_key
// * measurement_names // * measurement_names