feat: Support grouping by _field and _measurement (#2874)

* feat: Support grouping by _field and _measurement

* fix: clippy

* fix: doclink

* refactor: rename SeriesOrGroup --> Either

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2021-10-18 11:32:24 -04:00 committed by GitHub
parent 8a4981df04
commit f5a84122e3
13 changed files with 652 additions and 748 deletions
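For orientation, here is a minimal, self-contained sketch of the output shape this commit introduces: `to_series_and_groups` returns a flat `Vec<Either>` in which each `Group` marker precedes the `Series` that belong to it. The structs below are simplified stand-ins, not the real `query::exec::seriesset::series` types; only the enum mirrors the actual `Either` added by this change.

```rust
// Simplified stand-ins for the real Series and Group types
#[derive(Debug)]
struct Series {
    key: String,
}

#[derive(Debug)]
struct Group {
    partition_key_vals: Vec<String>,
}

// Mirrors the `Either` enum introduced by this PR
#[derive(Debug)]
enum Either {
    Series(Series),
    Group(Group),
}

fn main() {
    // to_series_and_groups now returns a flat Vec<Either>: a Group
    // marker precedes the Series that belong to it.
    let results = vec![
        Either::Group(Group { partition_key_vals: vec!["CA".into()] }),
        Either::Series(Series { key: "h2o,state=CA".into() }),
    ];
    for item in &results {
        match item {
            Either::Group(g) => println!("group: {:?}", g.partition_key_vals),
            Either::Series(s) => println!("series: {}", s.key),
        }
    }
}
```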


@ -2,7 +2,7 @@
//! DataFusion
use async_trait::async_trait;
use std::{fmt, sync::Arc};
use std::{convert::TryInto, fmt, sync::Arc};
use arrow::record_batch::RecordBatch;
@ -26,7 +26,10 @@ use crate::exec::{
fieldlist::{FieldList, IntoFieldList},
query_tracing::TracedStream,
schema_pivot::{SchemaPivotExec, SchemaPivotNode},
seriesset::{converter::SeriesSetConverter, SeriesSetItem},
seriesset::{
converter::{GroupGenerator, SeriesSetConverter},
series::Series,
},
split::StreamSplitExec,
stringset::{IntoStringSet, StringSetRef},
};
@ -40,7 +43,7 @@ use crate::plan::{
// Reuse DataFusion error and Result types for this module
pub use datafusion::error::{DataFusionError as Error, Result};
use super::{split::StreamSplitNode, task::DedicatedExecutor};
use super::{seriesset::series::Either, split::StreamSplitNode, task::DedicatedExecutor};
// The default catalog name - this impacts what SQL queries use if not specified
pub const DEFAULT_CATALOG: &str = "public";
@ -338,21 +341,23 @@ impl IOxExecutionContext {
}
/// Executes the SeriesSetPlans on the query executor, in
/// parallel, combining the results into the returned collection
/// of items.
/// parallel, producing series or groups
///
/// The SeriesSets are guaranteed to come back ordered by table_name.
pub async fn to_series_set(
/// TODO make this streaming rather than buffering the results
pub async fn to_series_and_groups(
&self,
series_set_plans: SeriesSetPlans,
) -> Result<Vec<SeriesSetItem>> {
let SeriesSetPlans { mut plans } = series_set_plans;
) -> Result<Vec<Either>> {
let SeriesSetPlans {
mut plans,
group_columns,
} = series_set_plans;
if plans.is_empty() {
return Ok(vec![]);
}
// sort plans by table name
// sort plans by table (measurement) name
plans.sort_by(|a, b| a.table_name.cmp(&b.table_name));
// Run the plans in parallel
@ -366,7 +371,6 @@ impl IOxExecutionContext {
plan,
tag_columns,
field_columns,
num_prefix_tag_group_columns,
} = plan;
let tag_columns = Arc::new(tag_columns);
@ -376,13 +380,7 @@ impl IOxExecutionContext {
let it = ctx.execute_stream(physical_plan).await?;
SeriesSetConverter::default()
.convert(
table_name,
tag_columns,
field_columns,
num_prefix_tag_group_columns,
it,
)
.convert(table_name, tag_columns, field_columns, it)
.await
.map_err(|e| {
Error::Execution(format!(
@ -396,14 +394,31 @@ impl IOxExecutionContext {
// join_all ensures that the results are consumed in the same order they
// were spawned maintaining the guarantee to return results ordered
// by the plan sort order.
let handles = futures::future::try_join_all(handles).await?;
let mut results = vec![];
for handle in handles {
results.extend(handle.into_iter());
// by table name and plan sort order.
let all_series_sets = futures::future::try_join_all(handles).await?;
// convert the underlying SeriesSets into Series
let mut data: Vec<Series> = vec![];
for series_sets in all_series_sets {
for series_set in series_sets {
let series: Vec<Series> = series_set
.try_into()
.map_err(|e| Error::Execution(format!("Error converting to series: {}", e)))?;
data.extend(series);
}
}
Ok(results)
// If we have group columns, sort the results, and create the
// appropriate groups
if let Some(group_columns) = group_columns {
let grouper = GroupGenerator::new(group_columns);
grouper
.group(data)
.map_err(|e| Error::Execution(format!("Error forming groups: {}", e)))
} else {
let data = data.into_iter().map(|series| series.into()).collect();
Ok(data)
}
}
/// Executes `plan` and return the resulting FieldList on the query executor


@ -87,29 +87,3 @@ pub struct SeriesSet {
// The underlying record batch data
pub batch: RecordBatch,
}
/// Describes a group of series. Namely, several logical
/// timeseries that share the same timestamps and name=value tag
/// keys, grouped by some subset of the tag keys
///
/// TODO: this may also support computing an aggregation per group,
/// depending on what is required for the gRPC layer.
#[derive(Debug)]
pub struct GroupDescription {
/// the names of all tags (not just the tags used for grouping)
pub all_tags: Vec<Arc<str>>,
/// the values of the group tags that defined the group.
/// For example,
///
/// If there were tags `t0`, `t1`, and `t2`, and the query had
/// group_keys of `[t1, t2]` then this list would have the values
/// of the t1 and t2 columns
pub gby_vals: Vec<Arc<str>>,
}
#[derive(Debug)]
pub enum SeriesSetItem {
GroupStart(GroupDescription),
Data(SeriesSet),
}


@ -1,3 +1,7 @@
//! This module contains code that "unpivots" annotated
//! [`RecordBatch`]es to [`Series`] and [`Group`]s for output by the
//! storage gRPC interface
use arrow::{
self,
array::{Array, DictionaryArray, StringArray},
@ -7,16 +11,22 @@ use arrow::{
use datafusion::physical_plan::SendableRecordBatchStream;
use observability_deps::tracing::trace;
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;
use tokio::sync::mpsc::error::SendError;
use tokio_stream::StreamExt;
use croaring::bitmap::Bitmap;
use crate::exec::field::{self, FieldColumns, FieldIndexes};
use crate::exec::{
field::{self, FieldColumns, FieldIndexes},
seriesset::series::Group,
};
use super::{GroupDescription, SeriesSet, SeriesSetItem};
use super::{
series::{Either, Series},
SeriesSet,
};
#[derive(Debug, Snafu)]
pub enum Error {
@ -34,14 +44,17 @@ pub enum Error {
#[snafu(display("Internal field error while converting series set: {}", source))]
InternalField { source: field::Error },
#[snafu(display("Internal error finding grouping colum: {}", column_name))]
FindingGroupColumn { column_name: String },
#[snafu(display("Sending series set results during conversion: {:?}", source))]
SendingDuringConversion {
source: Box<SendError<Result<SeriesSetItem>>>,
source: Box<SendError<Result<SeriesSet>>>,
},
#[snafu(display("Sending grouped series set results during conversion: {:?}", source))]
SendingDuringGroupedConversion {
source: Box<SendError<Result<SeriesSetItem>>>,
source: Box<SendError<Result<SeriesSet>>>,
},
}
@ -64,19 +77,14 @@ impl SeriesSetConverter {
///
/// field_columns: The names of the columns which are "fields"
///
/// num_prefix_tag_group_columns: (optional) The size of the prefix
/// of `tag_columns` that defines each group
///
/// it: record batch iterator that produces data in the desired order
pub async fn convert(
&mut self,
table_name: Arc<str>,
tag_columns: Arc<Vec<Arc<str>>>,
field_columns: FieldColumns,
num_prefix_tag_group_columns: Option<usize>,
mut it: SendableRecordBatchStream,
) -> Result<Vec<SeriesSetItem>, Error> {
let mut group_generator = GroupGenerator::new(num_prefix_tag_group_columns);
) -> Result<Vec<SeriesSet>, Error> {
let mut results = vec![];
// for now, only handle a single record batch
@ -127,35 +135,26 @@ impl SeriesSetConverter {
// call await during the loop)
// emit each series
let series_sets = intersections
.iter()
.map(|end_row| {
let series_set = SeriesSet {
table_name: Arc::clone(&table_name),
tags: Self::get_tag_keys(
&batch,
start_row as usize,
&tag_columns,
&tag_indexes,
),
field_indexes: field_indexes.clone(),
start_row: start_row as usize,
num_rows: (end_row - start_row) as usize,
batch: batch.clone(),
};
let series_sets = intersections.iter().map(|end_row| {
let series_set = SeriesSet {
table_name: Arc::clone(&table_name),
tags: Self::get_tag_keys(
&batch,
start_row as usize,
&tag_columns,
&tag_indexes,
),
field_indexes: field_indexes.clone(),
start_row: start_row as usize,
num_rows: (end_row - start_row) as usize,
batch: batch.clone(),
};
start_row = end_row;
series_set
})
.collect::<Vec<_>>();
start_row = end_row;
series_set
});
results.reserve(series_sets.len());
for series_set in series_sets {
if let Some(group_desc) = group_generator.next_series(&series_set) {
results.push(SeriesSetItem::GroupStart(group_desc));
}
results.push(SeriesSetItem::Data(series_set));
}
results.extend(series_sets);
}
Ok(results)
}
@ -237,7 +236,7 @@ impl SeriesSetConverter {
}
}
_ => unimplemented!(
"Series transition calculations not supported for type {:?} in column {:?}",
"Series transition calculations not supported for tag type {:?} in column {:?}",
col.data_type(),
batch.schema().fields()[col_idx]
),
@ -316,56 +315,157 @@ impl SeriesSetConverter {
}
}
/// Encapsulates the logic to generate new GroupFrames
struct GroupGenerator {
num_prefix_tag_group_columns: Option<usize>,
// vec of num_prefix_tag_group_columns, if any
last_group_tags: Option<Vec<(Arc<str>, Arc<str>)>>,
/// Reorders a sequence of Series so that the output is grouped correctly
#[derive(Debug)]
pub struct GroupGenerator {
group_columns: Vec<Arc<str>>,
}
impl GroupGenerator {
fn new(num_prefix_tag_group_columns: Option<usize>) -> Self {
Self {
num_prefix_tag_group_columns,
last_group_tags: None,
}
pub fn new(group_columns: Vec<Arc<str>>) -> Self {
Self { group_columns }
}
/// Process the next SeriesSet in the sequence. Return
/// `Some(GroupDescription{..})` if this marks the start of a new
/// group. Return None otherwise
fn next_series(&mut self, series_set: &SeriesSet) -> Option<GroupDescription> {
if let Some(num_prefix_tag_group_columns) = self.num_prefix_tag_group_columns {
// figure out if we are in a new group
let need_group_start = match &self.last_group_tags {
/// Groups the set of `series` into a sequence of series or groups (`Either`)
pub fn group(&self, series: Vec<Series>) -> Result<Vec<Either>> {
let mut series = series
.into_iter()
.map(|series| SortableSeries::try_new(series, &self.group_columns))
.collect::<Result<Vec<_>>>()?;
// A potential optimization is to skip this sort if we are
// grouping by a prefix of the tags for a single measurement
//
// Another potential optimization, if we are only grouping on
// tag columns, is to change the actual plan output using
// DataFusion to sort the data in the required group order
// (likely only possible with a single table)
// Resort the data according to group key values
series.sort();
// now find the groups boundaries and emit the output
let mut last_partition_key_vals: Option<Vec<Arc<str>>> = None;
// Note that if there are no group columns, we still need to
// sort by the tag keys, so that the output is sorted by tag
// keys, and thus we can't bail out early here
//
// Interestingly, it isn't clear that Flux requires this ordering, but
// it is what TSM does, so we preserve the behavior
let mut output = vec![];
// TODO make this more functional (issue is that sometimes the
// loop inserts one item into `output` and sometimes it inserts 2)
for SortableSeries {
series,
tag_vals,
num_partition_keys,
} in series.into_iter()
{
// keep only the values that form the group
let mut partition_key_vals = tag_vals;
partition_key_vals.truncate(num_partition_keys);
// figure out if we are in a new group (partition key values have changed)
let need_group_start = match &last_partition_key_vals {
None => true,
Some(last_group_tags) => {
last_group_tags.as_slice() != &series_set.tags[0..num_prefix_tag_group_columns]
}
Some(last_partition_key_vals) => &partition_key_vals != last_partition_key_vals,
};
if need_group_start {
let group_tags = series_set.tags[0..num_prefix_tag_group_columns].to_vec();
last_partition_key_vals = Some(partition_key_vals.clone());
let all_tags = series_set
.tags
.iter()
.map(|(tag, _value)| Arc::clone(tag))
.collect::<Vec<_>>();
let tag_keys = series.tags.iter().map(|tag| Arc::clone(&tag.key)).collect();
let gby_vals = group_tags
.iter()
.map(|(_tag, value)| Arc::clone(value))
.collect::<Vec<_>>();
let group = Group {
tag_keys,
partition_key_vals,
};
let group_desc = GroupDescription { all_tags, gby_vals };
self.last_group_tags = Some(group_tags);
return Some(group_desc);
output.push(group.into());
}
output.push(series.into())
}
None
Ok(output)
}
}
#[derive(Debug)]
/// Wrapper around a Series that has the values of the group_by columns extracted
struct SortableSeries {
series: Series,
/// All the tag values, reordered so that the group_columns are first
tag_vals: Vec<Arc<str>>,
/// How many of the first N tag_values are used for the partition key
num_partition_keys: usize,
}
impl PartialEq for SortableSeries {
fn eq(&self, other: &Self) -> bool {
self.tag_vals.eq(&other.tag_vals)
}
}
impl Eq for SortableSeries {}
impl PartialOrd for SortableSeries {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.tag_vals.partial_cmp(&other.tag_vals)
}
}
impl Ord for SortableSeries {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.tag_vals.cmp(&other.tag_vals)
}
}
impl SortableSeries {
fn try_new(series: Series, group_columns: &[Arc<str>]) -> Result<Self> {
// Compute the order of new tag values
let tags = &series.tags;
// tag_used_set[i] is true if we have used the value in tags[i]
let mut tag_used_set = vec![false; tags.len()];
// put the group columns first
//
// Note that this is an O(N^2) algorithm. We are assuming the
// number of tag columns is reasonably small
let mut tag_vals: Vec<_> = group_columns
.iter()
.map(|col| {
tags.iter()
.enumerate()
// Searching for columns linearly is likely to be pretty slow....
.find(|(_i, tag)| tag.key == *col)
.map(|(i, tag)| {
assert!(!tag_used_set[i], "repeated group column");
tag_used_set[i] = true;
Arc::clone(&tag.value)
})
.context(FindingGroupColumn {
column_name: col.as_ref(),
})
})
.collect::<Result<Vec<_>>>()?;
// Fill in all remaining tags
tag_vals.extend(tags.iter().enumerate().filter_map(|(i, tag)| {
let use_tag = !tag_used_set[i];
use_tag.then(|| Arc::clone(&tag.value))
}));
Ok(Self {
series,
tag_vals,
num_partition_keys: group_columns.len(),
})
}
}
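The `group` method above sorts the series by their reordered tag values and then scans once, emitting a `Group` whenever the partition key values change. A self-contained sketch of that sort-then-scan strategy, with plain tuples standing in for `SortableSeries`:

```rust
// Sketch of the sort-then-scan grouping strategy: each row is
// (tag values with group columns already moved first, series name).
fn group_by_prefix(
    mut rows: Vec<(Vec<String>, &'static str)>,
    num_group_cols: usize,
) -> Vec<String> {
    // lexicographic sort puts equal group-key prefixes next to each other
    rows.sort();
    let mut out = Vec::new();
    let mut last_key: Option<Vec<String>> = None;
    for (tag_vals, name) in rows {
        let key = tag_vals[..num_group_cols].to_vec();
        // emit a group marker whenever the partition key values change
        if last_key.as_ref() != Some(&key) {
            out.push(format!("group {:?}", key));
            last_key = Some(key);
        }
        out.push(format!("series {}", name));
    }
    out
}

fn main() {
    let rows = vec![
        (vec!["MA".to_string(), "Boston".to_string()], "temp"),
        (vec!["CA".to_string(), "LA".to_string()], "temp"),
        (vec!["MA".to_string(), "Cambridge".to_string()], "temp"),
    ];
    // grouping on the first column yields the CA group, then the MA group
    for line in group_by_prefix(rows, 1) {
        println!("{}", line);
    }
}
```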
@ -692,147 +792,6 @@ mod tests {
assert_eq!(series_set2.num_rows, 1);
}
#[tokio::test]
async fn test_convert_groups() {
let schema = Arc::new(Schema::new(vec![
Field::new("tag_a", DataType::Utf8, true),
Field::new("tag_b", DataType::Utf8, true),
Field::new("float_field", DataType::Float64, true),
Field::new("time", DataType::Int64, false),
]));
let input = parse_to_iterator(
schema,
"one,ten,10.0,1000\n\
one,eleven,10.1,2000\n\
two,eleven,10.3,3000\n",
);
let table_name = "foo";
let tag_columns = ["tag_a", "tag_b"];
let num_prefix_tag_group_columns = 1;
let field_columns = ["float_field"];
let results = convert_groups(
table_name,
&tag_columns,
num_prefix_tag_group_columns,
&field_columns,
input,
)
.await;
// expect the output to be
// Group1 (tags: tag_a, tag_b, vals = one)
// Series1 (tag_a = one, tag_b = ten)
// Series2 (tag_a = one, tag_b = eleven)
// Group2 (tags: tag_a, tag_b, vals = two)
// Series3 (tag_a = two, tag_b = eleven)
assert_eq!(results.len(), 5, "results were\n{:#?}", results); // 3 series, two groups (one and two)
let group_1 = extract_group(&results[0]);
let series_set1 = extract_series_set(&results[1]);
let series_set2 = extract_series_set(&results[2]);
let group_2 = extract_group(&results[3]);
let series_set3 = extract_series_set(&results[4]);
assert_eq!(group_1.all_tags, str_vec_to_arc_vec(&["tag_a", "tag_b"]));
assert_eq!(group_1.gby_vals, str_vec_to_arc_vec(&["one"]));
assert_eq!(series_set1.table_name.as_ref(), "foo");
assert_eq!(
series_set1.tags,
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "ten")])
);
assert_eq!(series_set1.start_row, 0);
assert_eq!(series_set1.num_rows, 1);
assert_eq!(series_set2.table_name.as_ref(), "foo");
assert_eq!(
series_set2.tags,
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "eleven")])
);
assert_eq!(series_set2.start_row, 1);
assert_eq!(series_set2.num_rows, 1);
assert_eq!(group_2.all_tags, str_vec_to_arc_vec(&["tag_a", "tag_b"]));
assert_eq!(group_2.gby_vals, str_vec_to_arc_vec(&["two"]));
assert_eq!(series_set3.table_name.as_ref(), "foo");
assert_eq!(
series_set3.tags,
str_pair_vec_to_vec(&[("tag_a", "two"), ("tag_b", "eleven")])
);
assert_eq!(series_set3.start_row, 2);
assert_eq!(series_set3.num_rows, 1);
}
// test with no group tags specified
#[tokio::test]
async fn test_convert_groups_no_tags() {
let schema = Arc::new(Schema::new(vec![
Field::new("tag_a", DataType::Utf8, true),
Field::new("tag_b", DataType::Utf8, true),
Field::new("float_field", DataType::Float64, true),
Field::new("time", DataType::Int64, false),
]));
let input = parse_to_iterator(
schema,
"one,ten,10.0,1000\n\
one,eleven,10.1,2000\n",
);
let table_name = "foo";
let tag_columns = ["tag_a", "tag_b"];
let field_columns = ["float_field"];
let results = convert_groups(table_name, &tag_columns, 0, &field_columns, input).await;
// expect the output to be (no vals, because no group columns are specified)
// Group1 (tags: tag_a, tag_b, vals = [])
// Series1 (tag_a = one, tag_b = ten)
// Series2 (tag_a = one, tag_b = eleven)
assert_eq!(results.len(), 3, "results were\n{:#?}", results); // 2 series, one group
let group_1 = extract_group(&results[0]);
let series_set1 = extract_series_set(&results[1]);
let series_set2 = extract_series_set(&results[2]);
assert_eq!(group_1.all_tags, str_vec_to_arc_vec(&["tag_a", "tag_b"]));
assert_eq!(group_1.gby_vals, str_vec_to_arc_vec(&[]));
assert_eq!(series_set1.table_name.as_ref(), "foo");
assert_eq!(
series_set1.tags,
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "ten")])
);
assert_eq!(series_set1.start_row, 0);
assert_eq!(series_set1.num_rows, 1);
assert_eq!(series_set2.table_name.as_ref(), "foo");
assert_eq!(
series_set2.tags,
str_pair_vec_to_vec(&[("tag_a", "one"), ("tag_b", "eleven")])
);
assert_eq!(series_set2.start_row, 1);
assert_eq!(series_set2.num_rows, 1);
}
fn extract_group(item: &SeriesSetItem) -> &GroupDescription {
match item {
SeriesSetItem::GroupStart(group) => group,
_ => panic!("Expected group, but got: {:?}", item),
}
}
fn extract_series_set(item: &SeriesSetItem) -> &SeriesSet {
match item {
SeriesSetItem::Data(series_set) => series_set,
_ => panic!("Expected series set, but got: {:?}", item),
}
}
/// Test helper: run conversion and return a Vec
pub async fn convert<'a>(
table_name: &'a str,
@ -847,40 +806,7 @@ mod tests {
let field_columns = FieldColumns::from(field_columns);
converter
.convert(table_name, tag_columns, field_columns, None, it)
.await
.expect("Conversion happened without error").into_iter().map(|item|{
if let SeriesSetItem::Data(series_set) = item {
series_set
}
else {
panic!("Unexpected result from converting. Expected SeriesSetItem::Data, got: {:?}", item)
}
}).collect::<Vec<_>>()
}
/// Test helper: run conversion to groups and return a Vec
pub async fn convert_groups<'a>(
table_name: &'a str,
tag_columns: &'a [&'a str],
num_prefix_tag_group_columns: usize,
field_columns: &'a [&'a str],
it: SendableRecordBatchStream,
) -> Vec<SeriesSetItem> {
let mut converter = SeriesSetConverter::default();
let table_name = Arc::from(table_name);
let tag_columns = Arc::new(str_vec_to_arc_vec(tag_columns));
let field_columns = FieldColumns::from(field_columns);
converter
.convert(
table_name,
tag_columns,
field_columns,
Some(num_prefix_tag_group_columns),
it,
)
.convert(table_name, tag_columns, field_columns, it)
.await
.expect("Conversion happened without error")
}


@ -12,10 +12,7 @@ use arrow::{
datatypes::DataType as ArrowDataType,
};
use crate::exec::{
field::FieldIndex,
seriesset::{GroupDescription, SeriesSet},
};
use crate::exec::{field::FieldIndex, seriesset::SeriesSet};
use snafu::Snafu;
#[derive(Debug, Snafu)]
@ -242,6 +239,10 @@ impl SeriesSet {
fn create_frame_tags(&self, field_name: &str) -> Vec<Tag> {
// Special case "measurement" name which is modeled as a tag of
// "_measurement" and "field" which is modeled as a tag of "_field"
//
// Note that by placing these tags at the front of the keys, the
// output will be sorted first by _field and then by _measurement,
// even when no grouping is requested
let mut converted_tags = vec![
Tag {
key: "_field".into(),
@ -266,39 +267,19 @@ impl SeriesSet {
/// Represents a group of `Series`
#[derive(Debug, Default)]
pub struct Group {
/// Contains *ALL* tag keys
/// Contains *ALL* tag keys (not just those used for grouping)
pub tag_keys: Vec<Arc<str>>,
/// Contains the values that define the group (may be values from
/// fields other than tags).
///
/// For example, if there were tags `t0`, `t1`, and `t2`, and the
/// query had group_keys of `[t1, t2]`, then this list would have
/// the values of the `t1` and `t2` columns
pub partition_key_vals: Vec<Arc<str>>,
// TODO: add Series that are part of this group, in order by
// tag_keys
//pub series: Vec<Series>
}
impl From<GroupDescription> for Group {
fn from(val: GroupDescription) -> Self {
// split key=value pairs into two separate vectors
let GroupDescription { all_tags, gby_vals } = val;
// Flux expects there to be `_field` and `_measurement` as the
// first two "tags". Note this means the lengths of tag_keys and
// partition_key_values is different.
//
// See https://github.com/influxdata/influxdb_iox/issues/2690 for gory details
let tag_keys = vec!["_field".into(), "_measurement".into()]
.into_iter()
.chain(all_tags)
.collect::<Vec<_>>();
let partition_key_vals = gby_vals.into_iter().collect::<Vec<_>>();
Self {
tag_keys,
partition_key_vals,
}
}
}
impl fmt::Display for Group {
@ -311,6 +292,33 @@ impl fmt::Display for Group {
}
}
#[derive(Debug)]
pub enum Either {
Series(Series),
Group(Group),
}
impl From<Series> for Either {
fn from(value: Series) -> Self {
Self::Series(value)
}
}
impl From<Group> for Either {
fn from(value: Group) -> Self {
Self::Group(value)
}
}
impl fmt::Display for Either {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Series(series) => series.fmt(f),
Self::Group(group) => group.fmt(f),
}
}
}
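These `From` impls and the shared `Display` let callers build a mixed stream of series and groups and render it uniformly, which is how the test helpers later in this diff produce their expected strings. A hedged, standalone sketch (simplified types; the real `Display` output is richer than shown):

```rust
use std::fmt;

// Simplified stand-ins: a series key, and a group's partition key values
struct Series(String);
struct Group(Vec<String>);

enum Either {
    Series(Series),
    Group(Group),
}

impl From<Series> for Either {
    fn from(value: Series) -> Self {
        Self::Series(value)
    }
}

impl From<Group> for Either {
    fn from(value: Group) -> Self {
        Self::Group(value)
    }
}

// One Display dispatches over both shapes, as in the diff above
impl fmt::Display for Either {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Series(s) => write!(f, "Series {}", s.0),
            Self::Group(g) => write!(f, "Group partition_key_vals: {}", g.0.join(", ")),
        }
    }
}

fn main() {
    let items: Vec<Either> = vec![
        Group(vec!["MA".to_string()]).into(),
        Series("h2o,state=MA".to_string()).into(),
    ];
    // mixed output rendered with a single to_string call per item
    let rendered: Vec<String> = items.iter().map(|i| i.to_string()).collect();
    println!("{:?}", rendered);
}
```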
fn fmt_strings(f: &mut fmt::Formatter<'_>, strings: &[Arc<str>]) -> fmt::Result {
let mut first = true;
strings.iter().try_for_each(|item| {
@ -592,19 +600,6 @@ mod tests {
);
}
#[test]
fn test_group_group_conversion() {
let group_description = GroupDescription {
all_tags: vec![Arc::from("tag1"), Arc::from("tag2")],
gby_vals: vec![Arc::from("val1"), Arc::from("val2")],
};
let group: Group = group_description.try_into().unwrap();
let expected =
"Group tag_keys: _field, _measurement, tag1, tag2 partition_key_vals: val1, val2";
assert_eq!(group.to_string(), expected);
}
fn make_record_batch() -> RecordBatch {
let string_array: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz", "foo"]));
let int_array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3, 4]));


@ -603,13 +603,11 @@ impl InfluxRpcPlanner {
// now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks {
let prefix_columns: Option<&[&str]> = None;
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let ss_plan =
self.read_filter_plan(table_name, schema, prefix_columns, &mut normalizer, chunks)?;
let ss_plan = self.read_filter_plan(table_name, schema, &mut normalizer, chunks)?;
// If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan);
@ -617,7 +615,7 @@ impl InfluxRpcPlanner {
}
info!(" = End building plan for read_filter");
Ok(ss_plans.into())
Ok(SeriesSetPlans::new(ss_plans))
}
/// Creates one or more GroupedSeriesSet plans that produces an
@ -658,7 +656,6 @@ impl InfluxRpcPlanner {
// group tables by chunk, pruning if possible
let chunks = database.chunks(normalizer.unnormalized());
let table_chunks = self.group_chunks_by_table(&mut normalizer, chunks)?;
let num_prefix_tag_group_columns = group_columns.len();
// now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len());
@ -667,32 +664,28 @@ impl InfluxRpcPlanner {
table_name: &table_name,
})?;
let ss_plan = match agg {
Aggregate::None => self.read_filter_plan(
table_name,
Arc::clone(&schema),
Some(group_columns),
&mut normalizer,
chunks,
)?,
_ => self.read_group_plan(
table_name,
schema,
&mut normalizer,
agg,
group_columns,
chunks,
)?,
Aggregate::None => {
self.read_filter_plan(table_name, Arc::clone(&schema), &mut normalizer, chunks)?
}
_ => self.read_group_plan(table_name, schema, &mut normalizer, agg, chunks)?,
};
// If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan {
let grouped_plan = ss_plan.grouped(num_prefix_tag_group_columns);
ss_plans.push(grouped_plan);
ss_plans.push(ss_plan);
}
}
info!(" = End building plan for read_group");
Ok(ss_plans.into())
let plan = SeriesSetPlans::new(ss_plans);
// Note always group (which will resort the frames)
// by tag, even if there are 0 columns
let group_columns = group_columns
.iter()
.map(|s| Arc::from(s.as_ref()))
.collect();
Ok(plan.grouped_by(group_columns))
}
/// Creates a GroupedSeriesSet plan that produces an output table with rows
@ -745,7 +738,7 @@ impl InfluxRpcPlanner {
}
info!(" = End building plan for read_window_aggregate");
Ok(ss_plans.into())
Ok(SeriesSetPlans::new(ss_plans))
}
/// Creates a map of table_name --> Chunks that have that table that *may* pass the predicate
@ -901,8 +894,6 @@ impl InfluxRpcPlanner {
/// Creates a plan for computing series sets for a given table,
/// returning None if the predicate rules out matching any rows in
/// the table
///
/// prefix_columns, if any, are the prefix of the ordering.
//
/// The created plan looks like:
///
@ -914,7 +905,6 @@ impl InfluxRpcPlanner {
&self,
table_name: impl AsRef<str>,
schema: Arc<Schema>,
prefix_columns: Option<&[impl AsRef<str>]>,
normalizer: &mut PredicateNormalizer,
chunks: Vec<Arc<C>>,
) -> Result<Option<SeriesSetPlan>>
@ -937,17 +927,7 @@ impl InfluxRpcPlanner {
.tags_iter()
.chain(schema.time_iter())
.map(|f| f.name() as &str)
.collect();
// Reorder, if requested
let tags_and_timestamp: Vec<_> = match prefix_columns {
Some(prefix_columns) => reorder_prefix(prefix_columns, tags_and_timestamp)?,
None => tags_and_timestamp,
};
// Convert to SortExprs to pass to the plan builder
let tags_and_timestamp: Vec<_> = tags_and_timestamp
.into_iter()
// Convert to SortExprs to pass to the plan builder
.map(|n| n.as_sort_expr())
.collect();
@ -992,9 +972,8 @@ impl InfluxRpcPlanner {
}
/// Creates a GroupedSeriesSet plan that produces an output table
/// with rows grouped by an aggregate function. Note that we still
/// group by all tags (so group within series) and the
/// group_columns define the order of the result
/// with one row per tagset and the values aggregated using a
/// specific function.
///
/// Equivalent to this SQL query for 'aggregates': sum, count, mean
/// SELECT
@ -1006,7 +985,7 @@ impl InfluxRpcPlanner {
/// GROUP BY
/// tags,
/// ORDER BY
/// group_key1, group_key2, remaining tags
/// tags
///
/// Note the columns are the same but in a different order
/// for GROUP BY / ORDER BY
@ -1024,7 +1003,7 @@ impl InfluxRpcPlanner {
/// GROUP BY
/// tags
/// ORDER BY
/// group_key1, group_key2, remaining tags
/// tags
///
/// The created plan looks like:
///
@ -1038,7 +1017,6 @@ impl InfluxRpcPlanner {
schema: Arc<Schema>,
normalizer: &mut PredicateNormalizer,
agg: Aggregate,
group_columns: &[impl AsRef<str>],
chunks: Vec<Arc<C>>,
) -> Result<Option<SeriesSetPlan>>
where
@ -1076,21 +1054,12 @@ impl InfluxRpcPlanner {
.aggregate(group_exprs, agg_exprs)
.context(BuildingPlan)?;
// Reorganize the output so the group columns are first and
// the output is sorted first on the group columns and then
// any remaining tags
//
// This ensures that the `group_columns` are next to each
// other in the output in the order expected by flux
let reordered_tag_columns: Vec<Arc<str>> = reorder_prefix(group_columns, tag_columns)?
.into_iter()
.map(Arc::from)
.collect();
// Reorganize the output so it is sorted on the tag columns
// (skipped if there are no tag columns in the input)
let plan_builder = if !reordered_tag_columns.is_empty() {
let plan_builder = if !tag_columns.is_empty() {
// reorder columns
let reorder_exprs = reordered_tag_columns
let reorder_exprs = tag_columns
.iter()
.map(|tag_name| tag_name.as_expr())
.collect::<Vec<_>>();
@ -1100,8 +1069,7 @@ impl InfluxRpcPlanner {
.map(|expr| expr.as_sort_expr())
.collect::<Vec<_>>();
let project_exprs =
project_exprs_in_schema(&reordered_tag_columns, plan_builder.schema());
let project_exprs = project_exprs_in_schema(&tag_columns, plan_builder.schema());
plan_builder
.project(project_exprs)
@ -1116,17 +1084,13 @@ impl InfluxRpcPlanner {
let plan = plan_builder.build().context(BuildingPlan)?;
let ss_plan = SeriesSetPlan::new(
Arc::from(table_name),
plan,
reordered_tag_columns,
field_columns,
);
let tag_columns = tag_columns.iter().map(|s| Arc::from(*s)).collect();
let ss_plan = SeriesSetPlan::new(Arc::from(table_name), plan, tag_columns, field_columns);
Ok(Some(ss_plan))
}
/// Creates a GroupedSeriesSet plan that produces an output table with rows
/// Creates a SeriesSetPlan that produces an output table with rows
/// that are grouped by window definitions
///
/// The order of the tag_columns
@ -1308,8 +1272,8 @@ impl InfluxRpcPlanner {
/// Return a `Vec` of `Exprs` such that it starts with `prefix` cols and
/// then has all columns in `schema` that are not already in the prefix.
fn project_exprs_in_schema(prefix: &[Arc<str>], schema: &DFSchemaRef) -> Vec<Expr> {
let seen: HashSet<_> = prefix.iter().map(|s| s.as_ref()).collect();
fn project_exprs_in_schema(prefix: &[&str], schema: &DFSchemaRef) -> Vec<Expr> {
let seen: HashSet<_> = prefix.iter().cloned().collect();
let prefix_exprs = prefix.iter().map(|name| col(name));
let new_exprs = schema.fields().iter().filter_map(|f| {
@ -1379,59 +1343,6 @@ struct TableScanAndFilter {
schema: Arc<Schema>,
}
/// Reorders tag_columns so that its prefix matches exactly
/// prefix_columns. Returns an error if there are duplicates, or other
/// untoward inputs
fn reorder_prefix<'a>(
prefix_columns: &[impl AsRef<str>],
tag_columns: Vec<&'a str>,
) -> Result<Vec<&'a str>> {
// tag_used_set[i] is true if we have used the value in tag_columns[i]
let mut tag_used_set = vec![false; tag_columns.len()];
// Note that this is an O(N^2) algorithm. We are assuming the
// number of tag columns is reasonably small
let mut new_tag_columns = prefix_columns
.iter()
.map(|pc| {
let found_location = tag_columns
.iter()
.enumerate()
.find(|(_, c)| pc.as_ref() == c as &str);
if let Some((index, &tag_column)) = found_location {
if tag_used_set[index] {
DuplicateGroupColumn {
column_name: pc.as_ref(),
}
.fail()
} else {
tag_used_set[index] = true;
Ok(tag_column)
}
} else {
GroupColumnNotFound {
column_name: pc.as_ref(),
all_tag_column_names: tag_columns.join(", "),
}
.fail()
}
})
.collect::<Result<Vec<_>>>()?;
new_tag_columns.extend(tag_columns.into_iter().enumerate().filter_map(|(i, c)| {
// already used in prefix
if tag_used_set[i] {
None
} else {
Some(c)
}
}));
Ok(new_tag_columns)
}
/// Helper for creating aggregates
pub(crate) struct AggExprs {
agg_exprs: Vec<Expr>,
@ -1753,87 +1664,6 @@ mod tests {
use super::*;
#[test]
fn test_reorder_prefix() {
assert_eq!(reorder_prefix_ok(&[], &[]), &[] as &[&str]);
assert_eq!(reorder_prefix_ok(&[], &["one"]), &["one"]);
assert_eq!(reorder_prefix_ok(&["one"], &["one"]), &["one"]);
assert_eq!(reorder_prefix_ok(&[], &["one", "two"]), &["one", "two"]);
assert_eq!(
reorder_prefix_ok(&["one"], &["one", "two"]),
&["one", "two"]
);
assert_eq!(
reorder_prefix_ok(&["two"], &["one", "two"]),
&["two", "one"]
);
assert_eq!(
reorder_prefix_ok(&["two", "one"], &["one", "two"]),
&["two", "one"]
);
assert_eq!(
reorder_prefix_ok(&[], &["one", "two", "three"]),
&["one", "two", "three"]
);
assert_eq!(
reorder_prefix_ok(&["one"], &["one", "two", "three"]),
&["one", "two", "three"]
);
assert_eq!(
reorder_prefix_ok(&["two"], &["one", "two", "three"]),
&["two", "one", "three"]
);
assert_eq!(
reorder_prefix_ok(&["three", "one"], &["one", "two", "three"]),
&["three", "one", "two"]
);
// errors
assert_eq!(
reorder_prefix_err(&["one"], &[]),
"Group column \'one\' not found in tag columns: "
);
assert_eq!(
reorder_prefix_err(&["one"], &["two", "three"]),
"Group column \'one\' not found in tag columns: two, three"
);
assert_eq!(
reorder_prefix_err(&["two", "one", "two"], &["one", "two"]),
"Duplicate group column \'two\'"
);
}
fn reorder_prefix_ok(prefix: &[&str], table_columns: &[&str]) -> Vec<String> {
let table_columns = table_columns.to_vec();
let res = reorder_prefix(prefix, table_columns);
let message = format!("Expected OK, got {:?}", res);
let res = res.expect(&message);
res.into_iter().map(|s| s.to_string()).collect()
}
// returns the error string or panics if `reorder_prefix` doesn't return an
// error
fn reorder_prefix_err(prefix: &[&str], table_columns: &[&str]) -> String {
let table_columns = table_columns.to_vec();
let res = reorder_prefix(prefix, table_columns);
match res {
Ok(r) => {
panic!(
"Expected error result from reorder_prefix_err, but was OK: '{:?}'",
r
);
}
Err(e) => format!("{}", e),
}
}
#[test]
fn test_schema_has_all_exprs_() {
let schema = SchemaBuilder::new().tag("t1").timestamp().build().unwrap();


@ -33,10 +33,6 @@ pub struct SeriesSetPlan {
/// The names of the columns which are "fields"
pub field_columns: FieldColumns,
/// If present, how many of the series_set_plan::tag_columns
/// should be used to compute the group
pub num_prefix_tag_group_columns: Option<usize>,
}
impl SeriesSetPlan {
@ -57,30 +53,34 @@ impl SeriesSetPlan {
tag_columns: Vec<Arc<str>>,
field_columns: FieldColumns,
) -> Self {
let num_prefix_tag_group_columns = None;
Self {
table_name,
plan,
tag_columns,
field_columns,
num_prefix_tag_group_columns,
}
}
/// Create a SeriesSetPlan that will produce Group items, according to
/// num_prefix_tag_group_columns.
pub fn grouped(mut self, num_prefix_tag_group_columns: usize) -> Self {
self.num_prefix_tag_group_columns = Some(num_prefix_tag_group_columns);
self
}
}
/// A container for plans which each produce a logical stream of
/// timeseries (from across many potential tables).
#[derive(Debug, Default)]
pub struct SeriesSetPlans {
/// Plans that generate Series, ordered by table_name.
///
/// Each plan produces output that is sorted by tag keys (tag
/// column values) and then time.
pub plans: Vec<SeriesSetPlan>,
/// grouping keys, if any, that specify how the output series should be
/// sorted (aka grouped). If empty, means no grouping is needed
///
/// There are several special values that are possible in `group_keys`:
///
/// 1. _field (means group by field column name)
/// 2. _measurement (means group by the table name)
/// 3. _time (means group by the time column)
pub group_columns: Option<Vec<Arc<str>>>,
}
impl SeriesSetPlans {
@ -89,8 +89,20 @@ impl SeriesSetPlans {
}
}
impl From<Vec<SeriesSetPlan>> for SeriesSetPlans {
fn from(plans: Vec<SeriesSetPlan>) -> Self {
Self { plans }
impl SeriesSetPlans {
/// Create a new, ungrouped SeriesSetPlans
pub fn new(plans: Vec<SeriesSetPlan>) -> Self {
Self {
plans,
group_columns: None,
}
}
/// Group the created SeriesSetPlans
pub fn grouped_by(self, group_columns: Vec<Arc<str>>) -> Self {
Self {
group_columns: Some(group_columns),
..self
}
}
}
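Taken together, the new API reads as a small builder: construct the ungrouped plans, then opt into grouping. A sketch under the assumption that only the grouping fields matter (a unit type stands in for the real `SeriesSetPlan`; field names match the diff above):

```rust
use std::sync::Arc;

#[derive(Debug, Default)]
struct SeriesSetPlans {
    plans: Vec<()>, // stand-in for Vec<SeriesSetPlan>
    group_columns: Option<Vec<Arc<str>>>,
}

impl SeriesSetPlans {
    fn new(plans: Vec<()>) -> Self {
        Self { plans, group_columns: None }
    }

    fn grouped_by(self, group_columns: Vec<Arc<str>>) -> Self {
        Self { group_columns: Some(group_columns), ..self }
    }
}

fn main() {
    // `_measurement` and `_field` are the special group keys this PR
    // supports, alongside ordinary tag names.
    let plans = SeriesSetPlans::new(vec![])
        .grouped_by(vec![Arc::from("_measurement"), Arc::from("state")]);
    println!("{:?}", plans.group_columns);
}
```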


@ -267,8 +267,8 @@ async fn test_read_group_data_pred() {
let agg = Aggregate::Sum;
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: CA",
"Series tags={_field=temp, _measurement=h2o, state=CA, city=LA}\n FloatPoints timestamps: [200], values: [90.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: CA",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200], values: [90.0]",
];
run_read_group_test_case(
@ -290,10 +290,10 @@ async fn test_read_group_data_field_restriction() {
let agg = Aggregate::Sum;
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: CA",
"Series tags={_field=temp, _measurement=h2o, state=CA, city=LA}\n FloatPoints timestamps: [350], values: [180.0]",
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Boston}\n FloatPoints timestamps: [250], values: [142.8]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: CA",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [180.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [142.8]",
];
run_read_group_test_case(
@ -349,9 +349,9 @@ async fn test_grouped_series_set_plan_sum() {
// The null field (after predicates) are not sent as series
// Note order of city key (boston --> cambridge)
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Boston}\n FloatPoints timestamps: [400], values: [141.0]",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Cambridge}\n FloatPoints timestamps: [200], values: [163.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [400], values: [141.0]",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [200], values: [163.0]",
];
run_read_group_test_case(
@ -381,11 +381,11 @@ async fn test_grouped_series_set_plan_count() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=humidity, _measurement=h2o, state=MA, city=Boston}\n IntegerPoints timestamps: [400], values: [0]",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Boston}\n IntegerPoints timestamps: [400], values: [2]",
"Series tags={_field=humidity, _measurement=h2o, state=MA, city=Cambridge}\n IntegerPoints timestamps: [200], values: [0]",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Cambridge}\n IntegerPoints timestamps: [200], values: [2]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=humidity, _measurement=h2o, city=Boston, state=MA}\n IntegerPoints timestamps: [400], values: [0]",
"Series tags={_field=humidity, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [200], values: [0]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n IntegerPoints timestamps: [400], values: [2]",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [200], values: [2]",
];
run_read_group_test_case(
@ -415,9 +415,9 @@ async fn test_grouped_series_set_plan_mean() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Boston}\n FloatPoints timestamps: [400], values: [70.5]",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Cambridge}\n FloatPoints timestamps: [200], values: [81.5]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [400], values: [70.5]",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [200], values: [81.5]",
];
run_read_group_test_case(
@ -466,10 +466,10 @@ async fn test_grouped_series_set_plan_count_measurement_pred() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Boston}\n IntegerPoints timestamps: [250], values: [2]",
"Group tag_keys: _field, _measurement, state, city partition_key_vals: CA",
"Series tags={_field=temp, _measurement=o2, state=CA, city=LA}\n IntegerPoints timestamps: [350], values: [2]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: CA",
"Series tags={_field=temp, _measurement=o2, city=LA, state=CA}\n IntegerPoints timestamps: [350], values: [2]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n IntegerPoints timestamps: [250], values: [2]",
];
run_read_group_test_case(
@ -512,11 +512,11 @@ async fn test_grouped_series_set_plan_first() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, state=MA, city=Cambridge}\n BooleanPoints timestamps: [2000], values: [true]",
"Series tags={_field=f, _measurement=h2o, state=MA, city=Cambridge}\n FloatPoints timestamps: [2000], values: [7.0]",
"Series tags={_field=i, _measurement=h2o, state=MA, city=Cambridge}\n IntegerPoints timestamps: [2000], values: [7]",
"Series tags={_field=s, _measurement=h2o, state=MA, city=Cambridge}\n StringPoints timestamps: [2000], values: [\"c\"]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, city=Cambridge, state=MA}\n BooleanPoints timestamps: [2000], values: [true]",
"Series tags={_field=f, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [2000], values: [7.0]",
"Series tags={_field=i, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [2000], values: [7]",
"Series tags={_field=s, _measurement=h2o, city=Cambridge, state=MA}\n StringPoints timestamps: [2000], values: [\"c\"]",
];
run_read_group_test_case(
@ -540,11 +540,11 @@ async fn test_grouped_series_set_plan_last() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, state=MA, city=Cambridge}\n BooleanPoints timestamps: [3000], values: [false]",
"Series tags={_field=f, _measurement=h2o, state=MA, city=Cambridge}\n FloatPoints timestamps: [3000], values: [6.0]",
"Series tags={_field=i, _measurement=h2o, state=MA, city=Cambridge}\n IntegerPoints timestamps: [3000], values: [6]",
"Series tags={_field=s, _measurement=h2o, state=MA, city=Cambridge}\n StringPoints timestamps: [3000], values: [\"b\"]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, city=Cambridge, state=MA}\n BooleanPoints timestamps: [3000], values: [false]",
"Series tags={_field=f, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [3000], values: [6.0]",
"Series tags={_field=i, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [3000], values: [6]",
"Series tags={_field=s, _measurement=h2o, city=Cambridge, state=MA}\n StringPoints timestamps: [3000], values: [\"b\"]",
];
run_read_group_test_case(
@ -589,11 +589,11 @@ async fn test_grouped_series_set_plan_min() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, state=MA, city=Cambridge}\n BooleanPoints timestamps: [1000], values: [false]",
"Series tags={_field=f, _measurement=h2o, state=MA, city=Cambridge}\n FloatPoints timestamps: [3000], values: [6.0]",
"Series tags={_field=i, _measurement=h2o, state=MA, city=Cambridge}\n IntegerPoints timestamps: [3000], values: [6]",
"Series tags={_field=s, _measurement=h2o, state=MA, city=Cambridge}\n StringPoints timestamps: [2000], values: [\"a\"]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, city=Cambridge, state=MA}\n BooleanPoints timestamps: [1000], values: [false]",
"Series tags={_field=f, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [3000], values: [6.0]",
"Series tags={_field=i, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [3000], values: [6]",
"Series tags={_field=s, _measurement=h2o, city=Cambridge, state=MA}\n StringPoints timestamps: [2000], values: [\"a\"]",
];
run_read_group_test_case(
@ -636,11 +636,11 @@ async fn test_grouped_series_set_plan_max() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, state=MA, city=Cambridge}\n BooleanPoints timestamps: [3000], values: [true]",
"Series tags={_field=f, _measurement=h2o, state=MA, city=Cambridge}\n FloatPoints timestamps: [2000], values: [7.0]",
"Series tags={_field=i, _measurement=h2o, state=MA, city=Cambridge}\n IntegerPoints timestamps: [2000], values: [7]",
"Series tags={_field=s, _measurement=h2o, state=MA, city=Cambridge}\n StringPoints timestamps: [4000], values: [\"z\"]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, city=Cambridge, state=MA}\n BooleanPoints timestamps: [3000], values: [true]",
"Series tags={_field=f, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [2000], values: [7.0]",
"Series tags={_field=i, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [2000], values: [7]",
"Series tags={_field=s, _measurement=h2o, city=Cambridge, state=MA}\n StringPoints timestamps: [4000], values: [\"z\"]",
];
run_read_group_test_case(
@ -686,13 +686,13 @@ async fn test_grouped_series_set_plan_group_by_state_city() {
let group_columns = vec!["state", "city"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, state, city partition_key_vals: CA, LA",
"Series tags={_field=humidity, _measurement=h2o, state=CA, city=LA}\n FloatPoints timestamps: [600], values: [21.0]",
"Series tags={_field=temp, _measurement=h2o, state=CA, city=LA}\n FloatPoints timestamps: [600], values: [181.0]",
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA, Boston",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Boston}\n FloatPoints timestamps: [400], values: [141.0]",
"Group tag_keys: _field, _measurement, state, city partition_key_vals: MA, Cambridge",
"Series tags={_field=temp, _measurement=h2o, state=MA, city=Cambridge}\n FloatPoints timestamps: [200], values: [243.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: CA, LA",
"Series tags={_field=humidity, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [600], values: [21.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [600], values: [181.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA, Boston",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [400], values: [141.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA, Cambridge",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [200], values: [243.0]"
];
run_read_group_test_case(
@ -762,3 +762,166 @@ async fn test_grouped_series_set_plan_group_aggregate_none() {
)
.await;
}
struct MeasurementForGroupByField {}
#[async_trait]
impl DbSetup for MeasurementForGroupByField {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let lp_lines1 = vec![
"system,host=local,region=A load1=1.1,load2=2.1 100",
"system,host=local,region=A load1=1.2,load2=2.2 200",
"system,host=remote,region=B load1=10.1,load2=2.1 100",
];
let lp_lines2 = vec![
"system,host=remote,region=B load1=10.2,load2=20.2 200",
"system,host=local,region=C load1=100.1,load2=200.1 100",
"aa_system,host=local,region=C load1=100.1,load2=200.1 100",
];
make_two_chunk_scenarios(partition_key, &lp_lines1.join("\n"), &lp_lines2.join("\n")).await
}
}
#[tokio::test]
async fn test_grouped_series_set_plan_group_by_field_none() {
// no predicate
let predicate = PredicateBuilder::default().build();
let agg = Aggregate::None;
let group_columns = vec!["_field"];
// Expect the data to be grouped so that all the distinct values
// of load1 come before the values for load2
let expected_results = vec![
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load1",
"Series tags={_field=load1, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_field=load1, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Series tags={_field=load1, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_field=load1, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load2",
"Series tags={_field=load2, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_field=load2, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Series tags={_field=load2, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_field=load2, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
];
run_read_group_test_case(
MeasurementForGroupByField {},
predicate,
agg,
group_columns,
expected_results,
)
.await;
}
#[tokio::test]
async fn test_grouped_series_set_plan_group_by_field_and_tag_none() {
// no predicate
let predicate = PredicateBuilder::default().build();
let agg = Aggregate::None;
let group_columns = vec!["_field", "region"];
// Expect the data to be grouped so that all the distinct values
// of load1 come before the values for load2, grouped by region
let expected_results = vec![
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load1, A",
"Series tags={_field=load1, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load1, B",
"Series tags={_field=load1, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load1, C",
"Series tags={_field=load1, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_field=load1, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load2, A",
"Series tags={_field=load2, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load2, B",
"Series tags={_field=load2, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load2, C",
"Series tags={_field=load2, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_field=load2, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
];
run_read_group_test_case(
MeasurementForGroupByField {},
predicate,
agg,
group_columns,
expected_results,
)
.await;
}
#[tokio::test]
async fn test_grouped_series_set_plan_group_by_tag_and_field_none() {
// no predicate
let predicate = PredicateBuilder::default().build();
let agg = Aggregate::None;
// note: group by the tag first, then the field. Output should be
// sorted on region first and then _field
let group_columns = vec!["region", "_field"];
let expected_results = vec![
"Group tag_keys: _field, _measurement, host, region partition_key_vals: A, load1",
"Series tags={_field=load1, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: A, load2",
"Series tags={_field=load2, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: B, load1",
"Series tags={_field=load1, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: B, load2",
"Series tags={_field=load2, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: C, load1",
"Series tags={_field=load1, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_field=load1, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: C, load2",
"Series tags={_field=load2, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_field=load2, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
];
run_read_group_test_case(
MeasurementForGroupByField {},
predicate,
agg,
group_columns,
expected_results,
)
.await;
}
#[tokio::test]
async fn test_grouped_series_set_plan_group_measurement_tag_count() {
// no predicate
let predicate = PredicateBuilder::default().build();
let agg = Aggregate::Count;
let group_columns = vec!["_measurement", "region"];
// Expect the data to be grouped so the output is sorted by measurement and then region
let expected_results = vec![
"Group tag_keys: _field, _measurement, host, region partition_key_vals: aa_system, C",
"Series tags={_field=load1, _measurement=aa_system, host=local, region=C}\n IntegerPoints timestamps: [100], values: [1]",
"Series tags={_field=load2, _measurement=aa_system, host=local, region=C}\n IntegerPoints timestamps: [100], values: [1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: system, A",
"Series tags={_field=load1, _measurement=system, host=local, region=A}\n IntegerPoints timestamps: [200], values: [2]",
"Series tags={_field=load2, _measurement=system, host=local, region=A}\n IntegerPoints timestamps: [200], values: [2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: system, B",
"Series tags={_field=load1, _measurement=system, host=remote, region=B}\n IntegerPoints timestamps: [200], values: [2]",
"Series tags={_field=load2, _measurement=system, host=remote, region=B}\n IntegerPoints timestamps: [200], values: [2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: system, C",
"Series tags={_field=load1, _measurement=system, host=local, region=C}\n IntegerPoints timestamps: [100], values: [1]",
"Series tags={_field=load2, _measurement=system, host=local, region=C}\n IntegerPoints timestamps: [100], values: [1]",
];
run_read_group_test_case(
MeasurementForGroupByField {},
predicate,
agg,
group_columns,
expected_results,
)
.await;
}


@ -1,11 +1,5 @@
use query::exec::IOxExecutionContext;
use query::{
exec::seriesset::{
series::{Group, Series},
SeriesSetItem,
},
plan::seriesset::SeriesSetPlans,
};
use query::plan::seriesset::SeriesSetPlans;
/// Run a series set plan to completion and produce a Vec<String> representation
///
@ -15,22 +9,10 @@ use query::{
/// items are returned.
#[cfg(test)]
pub async fn run_series_set_plan(ctx: &IOxExecutionContext, plans: SeriesSetPlans) -> Vec<String> {
use std::convert::TryInto;
let series_sets = ctx.to_series_set(plans).await.expect("running plans");
series_sets
ctx.to_series_and_groups(plans)
.await
.expect("running plans")
.into_iter()
.map(|item| match item {
SeriesSetItem::GroupStart(group_description) => {
let group: Group = group_description.into();
vec![group.to_string()]
}
SeriesSetItem::Data(series_set) => {
let series: Vec<Series> = series_set.try_into().expect("converting series");
series.into_iter().map(|s| s.to_string()).collect()
}
})
.flatten()
.map(|series_or_group| series_or_group.to_string())
.collect()
}


@ -116,7 +116,7 @@ async fn build_and_execute_plan(
let results = executor
.new_context(ExecutorType::Query)
.to_series_set(plan)
.to_series_and_groups(plan)
.await
.expect("Running series set plan");

View File

@ -125,7 +125,7 @@ async fn build_and_execute_plan(
let results = executor
.new_context(ExecutorType::Query)
.to_series_set(plan)
.to_series_and_groups(plan)
.await
.expect("Running series set plan");

View File

@ -1,14 +1,14 @@
//! This module contains code to translate from InfluxDB IOx data
//! formats into the formats needed by gRPC
use std::{collections::BTreeSet, convert::TryInto, fmt, sync::Arc};
use std::{collections::BTreeSet, fmt, sync::Arc};
use arrow::datatypes::DataType as ArrowDataType;
use observability_deps::tracing::trace;
use query::exec::{
fieldlist::FieldList,
seriesset::{series, GroupDescription, SeriesSet, SeriesSetItem},
seriesset::series::{self, Either},
};
use generated_types::{
@ -21,7 +21,7 @@ use generated_types::{
};
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
use snafu::{ResultExt, Snafu};
use snafu::Snafu;
#[derive(Debug, Snafu)]
pub enum Error {
@ -52,54 +52,7 @@ pub fn tag_keys_to_byte_vecs(tag_keys: Arc<BTreeSet<String>>) -> Vec<Vec<u8>> {
byte_vecs
}
fn series_set_to_frames(series_set: SeriesSet) -> Result<Vec<Frame>> {
let series: Vec<series::Series> = series_set.try_into().context(ConvertingSeries)?;
let frames = series
.into_iter()
.map(|series| {
let series::Series { tags, data } = series;
let (data_type, data_frame) = match data {
series::Data::FloatPoints { timestamps, values } => (
DataType::Float,
Data::FloatPoints(FloatPointsFrame { timestamps, values }),
),
series::Data::IntegerPoints { timestamps, values } => (
DataType::Integer,
Data::IntegerPoints(IntegerPointsFrame { timestamps, values }),
),
series::Data::UnsignedPoints { timestamps, values } => (
DataType::Unsigned,
Data::UnsignedPoints(UnsignedPointsFrame { timestamps, values }),
),
series::Data::BooleanPoints { timestamps, values } => (
DataType::Boolean,
Data::BooleanPoints(BooleanPointsFrame { timestamps, values }),
),
series::Data::StringPoints { timestamps, values } => (
DataType::String,
Data::StringPoints(StringPointsFrame { timestamps, values }),
),
};
let series_frame = Data::Series(SeriesFrame {
tags: convert_tags(tags),
data_type: data_type.into(),
});
vec![series_frame, data_frame]
})
.flatten()
.map(|data| Frame { data: Some(data) })
.collect();
Ok(frames)
}
/// Convert a `SeriesSetItem` into a form suitable for gRPC transport
///
/// Each `SeriesSetItem` gets converted into this pattern:
/// Convert Series and Groups into a form suitable for gRPC transport:
///
/// ```
/// (GroupFrame) potentially
@ -117,24 +70,71 @@ fn series_set_to_frames(series_set: SeriesSet) -> Result<Vec<Frame>> {
/// ```
///
/// The specific type of (*Points) depends on the type of field column.
pub fn series_set_item_to_read_response(series_set_item: SeriesSetItem) -> Result<ReadResponse> {
let frames = match series_set_item {
SeriesSetItem::GroupStart(group_description) => {
vec![group_description_to_frame(group_description)]
pub fn series_or_groups_to_read_response(series_or_groups: Vec<Either>) -> ReadResponse {
let mut frames = vec![];
for series_or_group in series_or_groups {
match series_or_group {
Either::Series(series) => {
series_to_frames(&mut frames, series);
}
Either::Group(group) => {
frames.push(group_to_frame(group));
}
}
SeriesSetItem::Data(series_set) => series_set_to_frames(series_set)?,
};
}
trace!(frames=%DisplayableFrames::new(&frames), "Response gRPC frames");
Ok(ReadResponse { frames })
ReadResponse { frames }
}
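// A minimal sketch of the `Either` type consumed above. The real definition
// lives in query::exec::seriesset::series (see the import at the top of this
// file), so this is illustrative only and derives/visibility may differ; the
// `From` impls are what allow the `.into()` conversions used in the tests below.
pub enum Either {
    Series(series::Series),
    Group(series::Group),
}

impl From<series::Series> for Either {
    fn from(series: series::Series) -> Self {
        Self::Series(series)
    }
}

impl From<series::Group> for Either {
    fn from(group: series::Group) -> Self {
        Self::Group(group)
    }
}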
/// Converts a [`GroupDescription`] into a storage gRPC `GroupFrame`
/// Converts a `Series` into frames for gRPC transport
fn series_to_frames(frames: &mut Vec<Frame>, series: series::Series) {
let series::Series { tags, data } = series;
let (data_type, data_frame) = match data {
series::Data::FloatPoints { timestamps, values } => (
DataType::Float,
Data::FloatPoints(FloatPointsFrame { timestamps, values }),
),
series::Data::IntegerPoints { timestamps, values } => (
DataType::Integer,
Data::IntegerPoints(IntegerPointsFrame { timestamps, values }),
),
series::Data::UnsignedPoints { timestamps, values } => (
DataType::Unsigned,
Data::UnsignedPoints(UnsignedPointsFrame { timestamps, values }),
),
series::Data::BooleanPoints { timestamps, values } => (
DataType::Boolean,
Data::BooleanPoints(BooleanPointsFrame { timestamps, values }),
),
series::Data::StringPoints { timestamps, values } => (
DataType::String,
Data::StringPoints(StringPointsFrame { timestamps, values }),
),
};
let series_frame = Data::Series(SeriesFrame {
tags: convert_tags(tags),
data_type: data_type.into(),
});
frames.push(Frame {
data: Some(series_frame),
});
frames.push(Frame {
data: Some(data_frame),
});
}
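// Each series thus contributes exactly two frames: a SeriesFrame naming the
// tags and data type, immediately followed by the matching points frame.
// A sketch (assuming `series` is a series::Series already built from a
// SeriesSet; the construction is elided):
//
//     let mut frames = Vec::new();
//     series_to_frames(&mut frames, series);
//     assert_eq!(frames.len(), 2); // SeriesFrame + one *PointsFrame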
/// Converts a [`series::Group`] into a storage gRPC `GroupFrame`
/// format that can be returned to the client.
fn group_description_to_frame(group_description: GroupDescription) -> Frame {
fn group_to_frame(group: series::Group) -> Frame {
let series::Group {
tag_keys,
partition_key_vals,
} = group_description.into();
} = group;
let group_frame = GroupFrame {
tag_keys: arcs_to_bytes(tag_keys),
@ -299,6 +299,8 @@ fn dump_tags(tags: &[Tag]) -> String {
#[cfg(test)]
mod tests {
use std::convert::TryInto;
use arrow::{
array::{
ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray,
@ -307,7 +309,14 @@ mod tests {
datatypes::DataType as ArrowDataType,
record_batch::RecordBatch,
};
use query::exec::{field::FieldIndexes, fieldlist::Field};
use query::exec::{
field::FieldIndexes,
fieldlist::Field,
seriesset::{
series::{Group, Series},
SeriesSet,
},
};
use super::*;
@ -338,11 +347,6 @@ mod tests {
);
}
fn series_set_to_read_response(series_set: SeriesSet) -> Result<ReadResponse> {
let frames = series_set_to_frames(series_set)?;
Ok(ReadResponse { frames })
}
#[test]
fn test_series_set_conversion() {
let series_set = SeriesSet {
@ -354,8 +358,12 @@ mod tests {
batch: make_record_batch(),
};
let response =
series_set_to_read_response(series_set).expect("Correctly converted series set");
let series: Vec<Series> = series_set
.try_into()
.expect("Correctly converted series set");
let series: Vec<Either> = series.into_iter().map(|s| s.into()).collect();
let response = series_or_groups_to_read_response(series);
let dumped_frames = dump_frames(&response.frames);
@ -381,15 +389,17 @@ mod tests {
#[test]
fn test_group_group_conversion() {
let group_description = GroupDescription {
all_tags: vec![Arc::from("tag1"), Arc::from("tag2")],
gby_vals: vec![Arc::from("val1"), Arc::from("val2")],
let group = Group {
tag_keys: vec![
Arc::from("_field"),
Arc::from("_measurement"),
Arc::from("tag1"),
Arc::from("tag2"),
],
partition_key_vals: vec![Arc::from("val1"), Arc::from("val2")],
};
let grouped_series_set_item = SeriesSetItem::GroupStart(group_description);
let response = series_set_item_to_read_response(grouped_series_set_item)
.expect("Correctly converted grouped_series_set_item");
let response = series_or_groups_to_read_response(vec![group.into()]);
let dumped_frames = dump_frames(&response.frames);

View File

@ -29,7 +29,7 @@ use crate::influxdb_ioxd::{
planner::Planner,
rpc::storage::{
data::{
fieldlist_to_measurement_fields_response, series_set_item_to_read_response,
fieldlist_to_measurement_fields_response, series_or_groups_to_read_response,
tag_keys_to_byte_vecs,
},
expr::{self, AddRpcNode, GroupByAndAggregate, Loggable, SpecialTagKeys},
@ -136,9 +136,6 @@ pub enum Error {
#[snafu(display("Error computing groups series: {}", source))]
ComputingGroupedSeriesSet { source: SeriesSetError },
#[snafu(display("Error converting time series into gRPC response: {}", source))]
ConvertingSeriesSet { source: super::data::Error },
#[snafu(display("Converting field information series into gRPC response: {}", source))]
ConvertingFieldList { source: super::data::Error },
@ -194,7 +191,6 @@ impl Error {
Self::ConvertingWindowAggregate { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingTagKeyInTagValues { .. } => Status::invalid_argument(self.to_string()),
Self::ComputingGroupedSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingFieldList { .. } => Status::invalid_argument(self.to_string()),
Self::SendingResults { .. } => Status::internal(self.to_string()),
Self::InternalHintsFieldNotSupported { .. } => Status::internal(self.to_string()),
@ -901,8 +897,8 @@ where
.context(PlanningFilteringSeries { db_name })?;
// Execute the plans.
let ss_items = ctx
.to_series_set(series_plan)
let series_or_groups = ctx
.to_series_and_groups(series_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(FilteringSeries {
@ -910,11 +906,9 @@ where
})
.log_if_error("Running series set plan")?;
// Convert results into API responses
ss_items
.into_iter()
.map(|series_set| series_set_item_to_read_response(series_set).context(ConvertingSeriesSet))
.collect::<Result<Vec<ReadResponse>, Error>>()
let response = series_or_groups_to_read_response(series_or_groups);
Ok(vec![response])
}
/// Launch async tasks that send the result of executing read_group to `tx`
@ -967,8 +961,8 @@ where
// if big queries are causing a significant latency in TTFB.
// Execute the plans
let ss_items = ctx
.to_series_set(grouped_series_set_plan)
let series_or_groups = ctx
.to_series_and_groups(grouped_series_set_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(GroupingSeries {
@ -976,11 +970,9 @@ where
})
.log_if_error("Running Grouped SeriesSet Plan")?;
// Convert the results into API responses
ss_items
.into_iter()
.map(|series_set| series_set_item_to_read_response(series_set).context(ConvertingSeriesSet))
.collect::<Result<Vec<ReadResponse>, Error>>()
let response = series_or_groups_to_read_response(series_or_groups);
Ok(vec![response])
}
/// Return field names, restricted via optional measurement, timestamp and
@ -1837,6 +1829,7 @@ mod tests {
let chunk = TestChunk::new("TheMeasurement")
.with_time_column()
.with_i64_field_column("my field")
.with_tag_column("state")
.with_one_row_of_data();
@ -1865,7 +1858,11 @@ mod tests {
let frames = fixture.storage_client.read_group(request).await.unwrap();
assert_eq!(frames.len(), 1);
// three frames:
// GroupFrame
// SeriesFrame (tag=state, field=my field)
// DataFrame
assert_eq!(frames.len(), 3);
grpc_request_metric_has_count(&fixture, "ReadGroup", "ok", 1);
}

View File

@ -398,19 +398,19 @@ async fn test_read_group_none_agg() {
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"20,21\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"81,82\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"10,11\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"81,82\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"71,72\"",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"51,52\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"30,31\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"51,52\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"61,62\"",
];
@ -495,19 +495,19 @@ async fn test_read_group_sum_agg() {
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"41\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"163\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"21\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"163\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"143\"",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"81\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"103\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"61\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"103\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"123\"",
];
@ -548,19 +548,19 @@ async fn test_read_group_count_agg() {
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=foo, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
];
@ -602,19 +602,19 @@ async fn test_read_group_last_agg() {
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"21\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"82\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"11\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"82\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"72\"",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"41\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"52\"",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"31\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"52\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"62\"",
];