feat: add "read group" support to storage CLI (#5601)

* fix: do not panic if measurement name is not the first tag

* feat: add "read group" support to storage CLI

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-09-12 08:04:09 +00:00 committed by GitHub
parent 39a7b661ec
commit 15b3705f9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 4 deletions

View File

@ -7,7 +7,9 @@ use std::time::Duration;
use snafu::{ResultExt, Snafu};
use tonic::Status;
use generated_types::{aggregate::AggregateType, Predicate};
use generated_types::{
aggregate::AggregateType, influxdata::platform::storage::read_group_request::Group, Predicate,
};
use influxdb_storage_client::{connection::Connection, Client, OrgAndBucket};
use influxrpc_parser::predicate;
use iox_time;
@ -37,6 +39,9 @@ pub enum ParseError {
#[snafu(display("unsupported aggregate type: '{:?}'", agg))]
Aggregate { agg: String },
#[snafu(display("unsupported group: '{:?}'", group))]
Group { group: String },
}
pub type Result<T, E = ParseError> = std::result::Result<T, E>;
@ -165,6 +170,7 @@ pub enum Format {
enum Command {
MeasurementFields(MeasurementFields),
ReadFilter,
ReadGroup(ReadGroup),
ReadWindowAggregate(ReadWindowAggregate),
TagValues(TagValues),
}
@ -175,6 +181,24 @@ struct MeasurementFields {
measurement: String,
}
#[derive(Debug, clap::Parser)]
struct ReadGroup {
#[clap(
long,
value_parser = parse_aggregate,
)]
aggregate: Option<AggregateType>,
#[clap(
long,
value_parser = parse_group,
)]
group: Group,
#[clap(long, action)]
group_keys: Vec<String>,
}
#[derive(Debug, clap::Parser)]
struct ReadWindowAggregate {
#[clap(
@ -211,6 +235,14 @@ fn parse_aggregate(aggs: &str) -> Result<AggregateType, ParseError> {
}
}
fn parse_group(g: &str) -> Result<Group, ParseError> {
match g.to_lowercase().as_str() {
"0" | "none" => Ok(Group::None),
"2" | "by" => Ok(Group::By),
_ => GroupSnafu { group: g }.fail(),
}
}
#[derive(Debug, clap::Parser)]
struct TagValues {
/// The tag key value to interrogate for tag values.
@ -259,6 +291,24 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
Format::Quiet => {}
}
}
Command::ReadGroup(rg) => {
let result = client
.read_group(request::read_group(
source,
config.start,
config.stop,
predicate,
rg.aggregate,
rg.group,
rg.group_keys,
))
.await
.context(ServerSnafu)?;
match config.format {
Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?,
Format::Quiet => {}
}
}
Command::ReadWindowAggregate(rwa) => {
let result = client
.read_window_aggregate(

View File

@ -1,5 +1,5 @@
pub mod generated_types {
pub use generated_types::influxdata::platform::storage::*;
pub use generated_types::influxdata::platform::storage::{read_group_request::Group, *};
}
use snafu::Snafu;
@ -48,6 +48,25 @@ pub fn read_filter(
}
}
pub fn read_group(
org_bucket: Any,
start: i64,
stop: i64,
predicate: std::option::Option<Predicate>,
aggregate: std::option::Option<AggregateType>,
group: Group,
group_keys: Vec<String>,
) -> ReadGroupRequest {
generated_types::ReadGroupRequest {
predicate,
read_source: Some(org_bucket),
range: Some(TimestampRange { start, end: stop }),
aggregate: aggregate.map(|a| Aggregate { r#type: a as i32 }),
group: group as i32,
group_keys,
}
}
#[allow(clippy::too_many_arguments)]
pub fn read_window_aggregate(
org_bucket: Any,

View File

@ -501,9 +501,14 @@ fn determine_tag_columns(frames: &[Data]) -> BTreeMap<Vec<u8>, TableColumns> {
if let Data::Series(sf) = frame {
assert!(!sf.tags.is_empty(), "expected _measurement and _field tags");
assert!(tag_key_is_measurement(&sf.tags[0].key));
// PERF: avoid clone of value
let measurement_name = sf.tags[0].value.clone();
let measurement_name = sf
.tags
.iter()
.find(|t| tag_key_is_measurement(&t.key))
.expect("measurement name not found")
.value
.clone();
let table = schema.entry(measurement_name).or_default();
for Tag { key, value } in sf.tags.iter().skip(1) {