Merge branch 'main' into ach-parse-tsm-key

pull/24376/head
Andrew Charlton 2022-07-20 09:50:45 +01:00 committed by GitHub
commit 4750b54ec6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 495 additions and 408 deletions

16
Cargo.lock generated
View File

@ -1117,7 +1117,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "10.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6cb695fc62140bf93977f2ef274a5bd83f88672f#6cb695fc62140bf93977f2ef274a5bd83f88672f"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8#9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8"
dependencies = [
"ahash",
"arrow",
@ -1156,7 +1156,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "10.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6cb695fc62140bf93977f2ef274a5bd83f88672f#6cb695fc62140bf93977f2ef274a5bd83f88672f"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8#9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8"
dependencies = [
"arrow",
"object_store",
@ -1168,7 +1168,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "10.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6cb695fc62140bf93977f2ef274a5bd83f88672f#6cb695fc62140bf93977f2ef274a5bd83f88672f"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8#9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8"
dependencies = [
"ahash",
"arrow",
@ -1179,7 +1179,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "10.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6cb695fc62140bf93977f2ef274a5bd83f88672f#6cb695fc62140bf93977f2ef274a5bd83f88672f"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8#9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8"
dependencies = [
"arrow",
"async-trait",
@ -1194,7 +1194,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "10.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6cb695fc62140bf93977f2ef274a5bd83f88672f#6cb695fc62140bf93977f2ef274a5bd83f88672f"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8#9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8"
dependencies = [
"ahash",
"arrow",
@ -1218,7 +1218,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "10.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6cb695fc62140bf93977f2ef274a5bd83f88672f#6cb695fc62140bf93977f2ef274a5bd83f88672f"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8#9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8"
dependencies = [
"arrow",
"datafusion 10.0.0",
@ -1231,7 +1231,7 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "10.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6cb695fc62140bf93977f2ef274a5bd83f88672f#6cb695fc62140bf93977f2ef274a5bd83f88672f"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8#9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8"
dependencies = [
"arrow",
"datafusion-common",
@ -1242,7 +1242,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "10.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=6cb695fc62140bf93977f2ef274a5bd83f88672f#6cb695fc62140bf93977f2ef274a5bd83f88672f"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8#9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8"
dependencies = [
"ahash",
"arrow",

View File

@ -9,6 +9,6 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug
# Turn off optional datafusion features (e.g. don't get support for crypto functions or avro)
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="6cb695fc62140bf93977f2ef274a5bd83f88672f", default-features = false, package = "datafusion" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="6cb695fc62140bf93977f2ef274a5bd83f88672f" }
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8", default-features = false, package = "datafusion" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="9aa38c3c73856fe7e93d67b29cc2c9ce892db3f8" }
workspace-hack = { path = "../workspace-hack"}

View File

@ -39,15 +39,15 @@ async fn read_filter() {
let actual_frames = dump_data_frames(&frames);
let expected_frames = generator.substitute_nanos(&[
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,_field=value, type: 0",
"SeriesFrame, tags: _field=value,_measurement=cpu_load_short,host=server01, type: 0",
"FloatPointsFrame, timestamps: [ns1], values: \"27.99\"",
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,region=us-east,_field=value, type: 0",
"SeriesFrame, tags: _field=value,_measurement=cpu_load_short,host=server01,region=us-east, type: 0",
"FloatPointsFrame, timestamps: [ns3], values: \"1234567.891011\"",
"SeriesFrame, tags: _measurement=cpu_load_short,host=server01,region=us-west,_field=value, type: 0",
"SeriesFrame, tags: _field=value,_measurement=cpu_load_short,host=server01,region=us-west, type: 0",
"FloatPointsFrame, timestamps: [ns0, ns4], values: \"0.64,0.000003\"",
"SeriesFrame, tags: _measurement=swap,host=server01,name=disk0,_field=in, type: 1",
"SeriesFrame, tags: _field=in,_measurement=swap,host=server01,name=disk0, type: 1",
"IntegerPointsFrame, timestamps: [ns6], values: \"3\"",
"SeriesFrame, tags: _measurement=swap,host=server01,name=disk0,_field=out, type: 1",
"SeriesFrame, tags: _field=out,_measurement=swap,host=server01,name=disk0, type: 1",
"IntegerPointsFrame, timestamps: [ns6], values: \"4\""
]);
@ -70,13 +70,13 @@ pub async fn read_filter_regex_operator() {
.timestamp_range(0, 2001) // include all data
.regex_match_predicate("host", "^b.+"),
vec![
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"20,21\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_user, type: 0",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"81,82\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_user, type: 0",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"51,52\"",
],
)
@ -93,7 +93,7 @@ pub async fn read_filter_empty_tag_eq() {
// host = '' means where host is not present
.tag_predicate("host", ""),
vec![
"SeriesFrame, tags: _measurement=cpu,_field=value, type: 0",
"SeriesFrame, tags: _field=value,_measurement=cpu, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"1\"",
],
)
@ -110,7 +110,7 @@ pub async fn read_filter_empty_tag_not_regex() {
// host !~ /^server01$/ means where host doesn't start with `server01`
.not_regex_match_predicate("host", "^server01"),
vec![
"SeriesFrame, tags: _measurement=cpu,_field=value, type: 0",
"SeriesFrame, tags: _field=value,_measurement=cpu, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"1\"",
],
)
@ -126,7 +126,7 @@ pub async fn read_filter_empty_tag_regex() {
// host =~ /.+/ means where host is at least one character
.regex_match_predicate("host", ".+"),
vec![
"SeriesFrame, tags: _measurement=cpu,host=server01,_field=value, type: 0",
"SeriesFrame, tags: _field=value,_measurement=cpu,host=server01, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"2\"",
],
)

View File

@ -19,23 +19,23 @@ async fn test_read_group_none_agg() {
.group(Group::By)
.aggregate_type(AggregateType::None),
vec![
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu1",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_system, type: 0",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"20,21\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_user, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"81,82\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"10,11\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_user, type: 0",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"81,82\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"71,72\"",
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu2",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_system, type: 0",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_user, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"51,52\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"30,31\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_user, type: 0",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"51,52\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000, 2000], values: \"61,62\"",
],
)
@ -55,15 +55,15 @@ async fn test_read_group_none_agg_with_predicate() {
.group(Group::By)
.aggregate_type(AggregateType::None),
vec![
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu1",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_system, type: 0",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"20\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"10\"",
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu2",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_system, type: 0",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"40\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"30\"",
],
)
@ -84,23 +84,23 @@ async fn test_read_group_sum_agg() {
.group(Group::By)
.aggregate_type(AggregateType::Sum),
vec![
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu1",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_system, type: 0",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"41\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_user, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"163\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"21\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_user, type: 0",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"163\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"143\"",
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu2",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_system, type: 0",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"81\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_user, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"103\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"61\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_user, type: 0",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"103\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"123\"",
],
)
@ -120,23 +120,23 @@ async fn test_read_group_count_agg() {
.group(Group::By)
.aggregate_type(AggregateType::Count),
vec![
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu1",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_system, type: 1",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_user, type: 1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_system, type: 1",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_user, type: 1",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu2",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_system, type: 1",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_user, type: 1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_system, type: 1",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_user, type: 1",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=foo, type: 1",
"IntegerPointsFrame, timestamps: [2000], values: \"2\"",
],
)
@ -157,23 +157,23 @@ async fn test_read_group_last_agg() {
.group(Group::By)
.aggregate_type(AggregateType::Last),
vec![
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu1",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_system, type: 0",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"21\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=bar,_field=usage_user, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"82\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"11\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu1,host=foo,_field=usage_user, type: 0",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"82\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"72\"",
"GroupFrame, tag_keys: _measurement,cpu,host,_field, partition_key_vals: cpu2",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_system, type: 0",
"GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"41\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=bar,_field=usage_user, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"52\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_system, type: 0",
"SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"31\"",
"SeriesFrame, tags: _measurement=cpu,cpu=cpu2,host=foo,_field=usage_user, type: 0",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"52\"",
"SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
"FloatPointsFrame, timestamps: [2000], values: \"62\"",
],
)

View File

@ -67,10 +67,10 @@ pub async fn read_window_aggregate_test() {
let actual_frames = dump_data_frames(&frames);
let expected_frames = vec![
"SeriesFrame, tags: _measurement=h2o,city=Boston,state=MA,_field=temp, type: 0",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [400, 600], values: \"143,147\"",
"SeriesFrame, tags: _measurement=h2o,city=Cambridge,state=MA,_field=temp, type: 0",
"FloatPointsFrame, timestamps: [400, 600], values: \"163,167\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Cambridge,state=MA, type: 0",
"FloatPointsFrame, timestamps: [400, 600], values: \"163,167\""
];
assert_eq!(

View File

@ -21,7 +21,7 @@ use datafusion::{
logical_plan::{normalize_col, plan::Extension, Expr, LogicalPlan},
};
pub use context::{IOxSessionConfig, IOxSessionContext};
pub use context::{IOxSessionConfig, IOxSessionContext, SessionContextIOxExt};
use schema_pivot::SchemaPivotNode;
use self::{non_null_checker::NonNullCheckerNode, split::StreamSplitNode};

View File

@ -27,7 +27,7 @@ use futures::TryStreamExt;
use observability_deps::tracing::debug;
use trace::{
ctx::SpanContext,
span::{MetaValue, SpanRecorder},
span::{MetaValue, Span, SpanRecorder},
};
use crate::exec::{
@ -51,7 +51,6 @@ use crate::plan::{
// Reuse DataFusion error and Result types for this module
pub use datafusion::error::{DataFusionError as Error, Result};
use trace::span::Span;
use super::{
non_null_checker::NonNullCheckerNode, seriesset::series::Either, split::StreamSplitNode,
@ -236,11 +235,7 @@ impl IOxSessionConfig {
let maybe_span = self.span_ctx.map(|ctx| ctx.child("Query Execution"));
IOxSessionContext {
inner,
exec: Some(self.exec),
recorder: SpanRecorder::new(maybe_span),
}
IOxSessionContext::new(inner, Some(self.exec), SpanRecorder::new(maybe_span))
}
}
@ -281,6 +276,24 @@ impl fmt::Debug for IOxSessionContext {
}
impl IOxSessionContext {
/// Private constructor
fn new(inner: SessionContext, exec: Option<DedicatedExecutor>, recorder: SpanRecorder) -> Self {
// attach span to DataFusion session
{
let mut state = inner.state.write();
state.config = state
.config
.clone()
.with_extension(Arc::new(recorder.span().cloned()));
}
Self {
inner,
exec,
recorder,
}
}
/// returns a reference to the inner datafusion execution context
pub fn inner(&self) -> &SessionContext {
&self.inner
@ -574,11 +587,11 @@ impl IOxSessionContext {
/// Returns a IOxSessionContext with a SpanRecorder that is a child of the current
pub fn child_ctx(&self, name: &'static str) -> Self {
Self {
inner: self.inner.clone(),
exec: self.exec.clone(),
recorder: self.recorder.child(name),
}
Self::new(
self.inner.clone(),
self.exec.clone(),
self.recorder.child(name),
)
}
/// Record an event on the span recorder
@ -601,3 +614,17 @@ impl IOxSessionContext {
self.exec.as_ref().map(|e| e.tasks()).unwrap_or_default()
}
}
/// Extension trait to pull IOx spans out of DataFusion contexts.
pub trait SessionContextIOxExt {
/// Get child span of the current context.
fn child_span(&self, name: &'static str) -> Option<Span>;
}
impl SessionContextIOxExt for SessionState {
fn child_span(&self, name: &'static str) -> Option<Span> {
self.config
.get_extension::<Option<Span>>()
.and_then(|span| span.as_ref().as_ref().map(|span| span.child(name)))
}
}

View File

@ -248,26 +248,19 @@ impl SeriesSet {
/// Create the tag=value pairs for this series set, adding
/// adding the _f and _m tags for the field name and measurement
fn create_frame_tags(&self, field_name: &str) -> Vec<Tag> {
// Special case "measurement" name which is modeled as a tag of
// "_measurement" and "field" which is modeled as a tag of "_field"
//
// N.B., in order to emit series sets in the same "tag order" as they
// would be in a TSM model we need to emit "_measurement" at the front
// and "_field" at the end. Whilst this does not appear to be the
// correct lexicographical order, in a TSM data-model these tags are
// actually stored as `\x00` and `\xff` respectively. Therefore the
// expectation is that "_measurement" is emitted first and "_field"
// last.
//
// This also ensures that the output will be sorted first by
// "_measurement" and then "_field" even when there are no groups
// requested.
// Prepend key with "_measurement"
let mut converted_tags = vec![Tag {
key: MEASUREMENT_COLUMN_NAME.into(),
value: Arc::clone(&self.table_name),
}];
// Tags are returned in lexicographical order.
// The special tags "_field" and "_measurement"
// come in front of everything because of the "_".
let mut converted_tags = vec![
Tag {
key: FIELD_COLUMN_NAME.into(),
value: field_name.into(),
},
Tag {
key: MEASUREMENT_COLUMN_NAME.into(),
value: Arc::clone(&self.table_name),
},
];
// convert the rest of the tags
converted_tags.extend(self.tags.iter().map(|(k, v)| Tag {
@ -275,11 +268,6 @@ impl SeriesSet {
value: Arc::clone(v),
}));
// Add "_field" to end of key.
converted_tags.push(Tag {
key: FIELD_COLUMN_NAME.into(),
value: field_name.into(),
});
converted_tags
}
}
@ -451,15 +439,15 @@ mod tests {
let series_strings = series_set_to_series_strings(series_set);
let expected = vec![
"Series tags={_measurement=the_table, tag1=val1, _field=string_field}",
"Series tags={_field=string_field, _measurement=the_table, tag1=val1}",
" StringPoints timestamps: [2000, 3000], values: [\"bar\", \"baz\"]",
"Series tags={_measurement=the_table, tag1=val1, _field=int_field}",
"Series tags={_field=int_field, _measurement=the_table, tag1=val1}",
" IntegerPoints timestamps: [2000, 3000], values: [2, 3]",
"Series tags={_measurement=the_table, tag1=val1, _field=uint_field}",
"Series tags={_field=uint_field, _measurement=the_table, tag1=val1}",
" UnsignedPoints timestamps: [2000, 3000], values: [22, 33]",
"Series tags={_measurement=the_table, tag1=val1, _field=float_field}",
"Series tags={_field=float_field, _measurement=the_table, tag1=val1}",
" FloatPoints timestamps: [2000, 3000], values: [20.1, 30.1]",
"Series tags={_measurement=the_table, tag1=val1, _field=boolean_field}",
"Series tags={_field=boolean_field, _measurement=the_table, tag1=val1}",
" BooleanPoints timestamps: [2000, 3000], values: [false, true]",
];
@ -500,9 +488,9 @@ mod tests {
let series_strings = series_set_to_series_strings(series_set);
let expected = vec![
"Series tags={_measurement=the_table, tag1=val1, _field=string_field2}",
"Series tags={_field=string_field2, _measurement=the_table, tag1=val1}",
" StringPoints timestamps: [4, 5], values: [\"far\", \"faz\"]",
"Series tags={_measurement=the_table, tag1=val1, _field=string_field1}",
"Series tags={_field=string_field1, _measurement=the_table, tag1=val1}",
" StringPoints timestamps: [2, 3], values: [\"bar\", \"baz\"]",
];
@ -552,7 +540,7 @@ mod tests {
let series_strings = series_set_to_series_strings(series_set);
let expected = vec![
"Series tags={_measurement=the_table, state=MA, _field=float_field}",
"Series tags={_field=float_field, _measurement=the_table, state=MA}",
" FloatPoints timestamps: [1000, 2000, 4000], values: [10.1, 20.1, 40.1]",
];
@ -600,15 +588,15 @@ mod tests {
let series_strings = series_set_to_series_strings(series_set);
let expected = vec![
"Series tags={_measurement=the_table, state=MA, _field=string_field}",
"Series tags={_field=string_field, _measurement=the_table, state=MA}",
" StringPoints timestamps: [2000], values: [\"foo\"]",
"Series tags={_measurement=the_table, state=MA, _field=float_field}",
"Series tags={_field=float_field, _measurement=the_table, state=MA}",
" FloatPoints timestamps: [2000], values: [1.0]",
"Series tags={_measurement=the_table, state=MA, _field=int_field}",
"Series tags={_field=int_field, _measurement=the_table, state=MA}",
" IntegerPoints timestamps: [2000], values: [-10]",
"Series tags={_measurement=the_table, state=MA, _field=uint_field}",
"Series tags={_field=uint_field, _measurement=the_table, state=MA}",
" UnsignedPoints timestamps: [2000], values: [100]",
"Series tags={_measurement=the_table, state=MA, _field=bool_field}",
"Series tags={_field=bool_field, _measurement=the_table, state=MA}",
" BooleanPoints timestamps: [2000], values: [true]",
];

View File

@ -219,7 +219,7 @@ impl InfluxRpcPlanner {
database: &dyn QueryDatabase,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
let _ctx = self.ctx.child_ctx("table_names planning");
let ctx = self.ctx.child_ctx("table_names planning");
debug!(?rpc_predicate, "planning table_names");
// Special case predicates that span the entire valid timestamp range
@ -237,7 +237,7 @@ impl InfluxRpcPlanner {
// Identify which chunks can answer from its metadata and then record its table,
// and which chunks needs full plan and group them into their table
let chunks = database
.chunks(table_name, predicate)
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
for chunk in cheap_chunk_first(chunks) {
@ -357,7 +357,7 @@ impl InfluxRpcPlanner {
}
let chunks = database
.chunks(table_name, predicate)
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
for chunk in cheap_chunk_first(chunks) {
@ -499,7 +499,7 @@ impl InfluxRpcPlanner {
.context(CreatingPredicatesSnafu)?;
for (table_name, predicate) in &table_predicates {
let chunks = database
.chunks(table_name, predicate)
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
for chunk in cheap_chunk_first(chunks) {
@ -685,7 +685,7 @@ impl InfluxRpcPlanner {
continue;
}
let chunks = database
.chunks(table_name, predicate)
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
let chunks = prune_chunks_metadata(chunks, predicate)?;
@ -743,7 +743,7 @@ impl InfluxRpcPlanner {
let mut ss_plans = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
let chunks = database
.chunks(table_name, predicate)
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
let chunks = prune_chunks_metadata(chunks, predicate)?;
@ -806,7 +806,7 @@ impl InfluxRpcPlanner {
for (table_name, predicate) in &table_predicates {
let chunks = database
.chunks(table_name, predicate)
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
let chunks = prune_chunks_metadata(chunks, predicate)?;
@ -877,7 +877,7 @@ impl InfluxRpcPlanner {
let mut ss_plans = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
let chunks = database
.chunks(table_name, predicate)
.chunks(table_name, predicate, ctx.child_ctx("table chunks"))
.await
.context(GettingChunksSnafu { table_name })?;
let chunks = prune_chunks_metadata(chunks, predicate)?;

View File

@ -156,6 +156,7 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
&self,
table_name: &str,
predicate: &Predicate,
ctx: IOxSessionContext,
) -> Result<Vec<Arc<dyn QueryChunk>>, QueryDatabaseError>;
/// Record that particular type of query was run / planned

View File

@ -107,6 +107,7 @@ impl QueryDatabase for TestDatabase {
&self,
table_name: &str,
predicate: &Predicate,
_ctx: IOxSessionContext,
) -> Result<Vec<Arc<dyn QueryChunk>>, QueryDatabaseError> {
// save last predicate
*self.chunks_predicate.lock() = predicate.clone();

View File

@ -133,6 +133,7 @@ pub(crate) fn df_to_scalar(
#[cfg(test)]
mod tests {
use arrow::datatypes::Field;
use test_helpers::assert_contains;
use super::*;
@ -226,7 +227,11 @@ mod tests {
fn test_unsupported_scalar_value() {
let scalar = datafusion::scalar::ScalarValue::List(
Some(vec![]),
Box::new(arrow::datatypes::DataType::Float64),
Box::new(Field::new(
"field",
arrow::datatypes::DataType::Float64,
true,
)),
);
let res = df_to_scalar(scalar);
assert_contains!(res.unwrap_err().to_string(), "unsupported scalar value:");
@ -245,7 +250,11 @@ mod tests {
right: Box::new(datafusion::logical_plan::Expr::Literal(
datafusion::scalar::ScalarValue::List(
Some(vec![]),
Box::new(arrow::datatypes::DataType::Float64),
Box::new(Field::new(
"field",
arrow::datatypes::DataType::Float64,
true,
)),
),
)),
};

View File

@ -40,6 +40,7 @@ impl QueryDatabase for QuerierNamespace {
&self,
table_name: &str,
predicate: &Predicate,
ctx: IOxSessionContext,
) -> Result<Vec<Arc<dyn QueryChunk>>, QueryDatabaseError> {
debug!(%table_name, %predicate, "Finding chunks for table");
// get table metadata
@ -52,7 +53,12 @@ impl QueryDatabase for QuerierNamespace {
}
};
let mut chunks = table.chunks(predicate).await?;
let mut chunks = table
.chunks(
predicate,
ctx.span().map(|span| span.child("querier table chunks")),
)
.await?;
// if there is a field restriction on the predicate, only
// chunks with that field should be returned. If the chunk has
@ -194,6 +200,7 @@ mod tests {
use data_types::ColumnType;
use iox_query::frontend::sql::SqlQueryPlanner;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
use trace::{span::SpanStatus, RingBufferTraceCollector};
#[tokio::test]
async fn test_query() {
@ -326,7 +333,10 @@ mod tests {
let querier_namespace = Arc::new(querier_namespace(&ns).await);
assert_query(
let traces = Arc::new(RingBufferTraceCollector::new(100));
let span_ctx = SpanContext::new(Arc::clone(&traces) as _);
assert_query_with_span_ctx(
&querier_namespace,
"SELECT * FROM cpu ORDER BY host,time",
&[
@ -339,8 +349,18 @@ mod tests {
"| | b | 5 | 1970-01-01T00:00:00.000000011Z |",
"+-----+------+------+--------------------------------+",
],
Some(span_ctx),
)
.await;
let span = traces
.spans()
.into_iter()
.find(|s| s.name == "querier table chunks")
.expect("tracing span not found");
assert_eq!(span.status, SpanStatus::Ok);
assert_query(
&querier_namespace,
"SELECT * FROM mem ORDER BY host,time",
@ -470,7 +490,16 @@ mod tests {
sql: &str,
expected_lines: &[&str],
) {
let results = run(querier_namespace, sql).await;
assert_query_with_span_ctx(querier_namespace, sql, expected_lines, None).await
}
async fn assert_query_with_span_ctx(
querier_namespace: &Arc<QuerierNamespace>,
sql: &str,
expected_lines: &[&str],
span_ctx: Option<SpanContext>,
) {
let results = run(querier_namespace, sql, span_ctx).await;
assert_batches_sorted_eq!(expected_lines, &results);
}
@ -479,13 +508,17 @@ mod tests {
sql: &str,
expected_lines: &[&str],
) {
let results = run(querier_namespace, sql).await;
let results = run(querier_namespace, sql, None).await;
assert_batches_eq!(expected_lines, &results);
}
async fn run(querier_namespace: &Arc<QuerierNamespace>, sql: &str) -> Vec<RecordBatch> {
async fn run(
querier_namespace: &Arc<QuerierNamespace>,
sql: &str,
span_ctx: Option<SpanContext>,
) -> Vec<RecordBatch> {
let planner = SqlQueryPlanner::default();
let ctx = querier_namespace.new_query_context(None);
let ctx = querier_namespace.new_query_context(span_ctx);
let physical_plan = planner
.query(sql, &ctx)

View File

@ -17,6 +17,7 @@ use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use trace::span::{Span, SpanRecorder};
mod query_access;
mod state_reconciler;
@ -126,7 +127,25 @@ impl QuerierTable {
/// Query all chunks within this table.
///
/// This currently contains all parquet files linked to their unprocessed tombstones.
pub async fn chunks(&self, predicate: &Predicate) -> Result<Vec<Arc<dyn QueryChunk>>> {
pub async fn chunks(
&self,
predicate: &Predicate,
span: Option<Span>,
) -> Result<Vec<Arc<dyn QueryChunk>>> {
let mut span_recorder = SpanRecorder::new(span);
match self.chunks_inner(predicate).await {
Ok(chunks) => {
span_recorder.ok("got chunks");
Ok(chunks)
}
Err(e) => {
span_recorder.error("failed to get chunks");
Err(e)
}
}
}
async fn chunks_inner(&self, predicate: &Predicate) -> Result<Vec<Arc<dyn QueryChunk>>> {
debug!(
?predicate,
namespace=%self.namespace_name,
@ -964,7 +983,7 @@ mod tests {
.unwrap()
.next_response(Ok(self.ingester_partitions.clone()));
self.querier_table.chunks(pred).await
self.querier_table.chunks(pred, None).await
}
}

View File

@ -11,6 +11,7 @@ use datafusion::{
physical_plan::ExecutionPlan,
};
use iox_query::{
exec::SessionContextIOxExt,
provider::{ChunkPruner, ProviderBuilder},
pruning::{prune_chunks, PruningObserver},
QueryChunk,
@ -51,7 +52,7 @@ impl TableProvider for QuerierTable {
.fold(Predicate::new(), |b, expr| b.with_expr(expr.clone()));
let chunks = self
.chunks(&predicate)
.chunks(&predicate, ctx.child_span("querier table chunks"))
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

View File

@ -101,10 +101,10 @@ async fn run_read_filter_error_case<D>(
#[tokio::test]
async fn test_read_filter_data_no_pred() {
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [70.4, 72.4]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200, 350], values: [90.0, 90.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [70.4, 72.4]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200, 350], values: [90.0, 90.0]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
run_read_filter_test_case(
@ -136,7 +136,7 @@ async fn test_read_filter_data_inclusive_predicate() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [90.0]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
@ -150,9 +150,9 @@ async fn test_read_filter_data_exact_predicate() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [72.4]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [250], values: [51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [53.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [72.4]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
@ -167,8 +167,8 @@ async fn test_read_filter_data_tag_predicate() {
// expect both series to be returned
let expected_results = vec![
"Series tags={_measurement=cpu, region=west, _field=user}\n FloatPoints timestamps: [100, 150], values: [23.2, 21.0]",
"Series tags={_measurement=disk, region=east, _field=bytes}\n IntegerPoints timestamps: [200], values: [99]",
"Series tags={_field=user, _measurement=cpu, region=west}\n FloatPoints timestamps: [100, 150], values: [23.2, 21.0]",
"Series tags={_field=bytes, _measurement=disk, region=east}\n IntegerPoints timestamps: [200], values: [99]",
];
run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
@ -223,10 +223,10 @@ async fn test_read_filter_unknown_column_in_predicate() {
#[tokio::test]
async fn test_read_filter_data_no_pred_with_delete() {
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100], values: [70.4]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
run_read_filter_test_case(
@ -241,8 +241,8 @@ async fn test_read_filter_data_no_pred_with_delete() {
async fn test_read_filter_data_no_pred_with_delete_all() {
// nothing from h2o table because all rows were deleted
let expected_results = vec![
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
run_read_filter_test_case(
@ -262,7 +262,7 @@ async fn test_read_filter_data_filter() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200], values: [90.0]",
];
run_read_filter_test_case(
@ -322,7 +322,7 @@ async fn test_read_filter_data_filter_with_delete() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100], values: [70.4]",
];
run_read_filter_test_case(
@ -344,7 +344,7 @@ async fn test_read_filter_data_filter_fields() {
// Only expect other_temp in this location
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=CA, _field=other_temp}\n FloatPoints timestamps: [350], values: [72.4]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [350], values: [72.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
@ -362,7 +362,7 @@ async fn test_read_filter_data_filter_measurement_pred() {
// Only expect other_temp in this location
let expected_results = vec![
"Series tags={_measurement=o2, state=CA, _field=temp}\n FloatPoints timestamps: [300], values: [79.0]",
"Series tags={_field=temp, _measurement=o2, state=CA}\n FloatPoints timestamps: [300], values: [79.0]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
@ -395,8 +395,8 @@ async fn test_read_filter_data_pred_no_columns() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=cpu, region=west, _field=user}\n FloatPoints timestamps: [100, 150], values: [23.2, 21.0]",
"Series tags={_measurement=disk, region=east, _field=bytes}\n IntegerPoints timestamps: [200], values: [99]",
"Series tags={_field=user, _measurement=cpu, region=west}\n FloatPoints timestamps: [100, 150], values: [23.2, 21.0]",
"Series tags={_field=bytes, _measurement=disk, region=east}\n IntegerPoints timestamps: [200], values: [99]",
];
run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
@ -409,8 +409,8 @@ async fn test_read_filter_data_pred_no_columns_with_delete() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=cpu, region=west, _field=user}\n FloatPoints timestamps: [100], values: [23.2]",
"Series tags={_measurement=disk, region=east, _field=bytes}\n IntegerPoints timestamps: [200], values: [99]",
"Series tags={_field=user, _measurement=cpu, region=west}\n FloatPoints timestamps: [100], values: [23.2]",
"Series tags={_field=bytes, _measurement=disk, region=east}\n IntegerPoints timestamps: [200], values: [99]",
];
run_read_filter_test_case(TwoMeasurementsWithDelete {}, predicate, expected_results).await;
@ -424,7 +424,7 @@ async fn test_read_filter_data_pred_no_columns_with_delete_all() {
// Only table disk has no deleted data
let expected_results = vec![
"Series tags={_measurement=disk, region=east, _field=bytes}\n IntegerPoints timestamps: [200], values: [99]",
"Series tags={_field=bytes, _measurement=disk, region=east}\n IntegerPoints timestamps: [200], values: [99]",
];
run_read_filter_test_case(TwoMeasurementsWithDeleteAll {}, predicate, expected_results).await;
@ -464,7 +464,7 @@ async fn test_read_filter_data_pred_using_regex_match() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200], values: [90.0]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
@ -495,7 +495,7 @@ async fn test_read_filter_data_pred_using_regex_match_with_delete() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [90.0]",
];
run_read_filter_test_case(
TwoMeasurementsMultiSeriesWithDelete {},
@ -523,9 +523,9 @@ async fn test_read_filter_data_pred_using_regex_not_match() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [72.4]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [250], values: [51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [53.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [72.4]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
@ -540,7 +540,7 @@ async fn test_read_filter_data_pred_regex_escape() {
// expect one series with influxdb.com
let expected_results = vec![
"Series tags={_measurement=status_code, url=https://influxdb.com, _field=value}\n FloatPoints timestamps: [1527018816000000000], values: [418.0]",
"Series tags={_field=value, _measurement=status_code, url=https://influxdb.com}\n FloatPoints timestamps: [1527018816000000000], values: [418.0]",
];
run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await;
@ -555,7 +555,7 @@ async fn test_read_filter_data_pred_not_match_regex_escape() {
// expect one series with influxdb.com
let expected_results = vec![
"Series tags={_measurement=status_code, url=http://www.example.com, _field=value}\n FloatPoints timestamps: [1527018806000000000], values: [404.0]",
"Series tags={_field=value, _measurement=status_code, url=http://www.example.com}\n FloatPoints timestamps: [1527018806000000000], values: [404.0]",
];
run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await;
@ -575,9 +575,9 @@ async fn test_read_filter_data_pred_unsupported_in_scan() {
// Note these results include data from both o2 and h2o
let expected_results = vec![
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200, 350], values: [90.0, 90.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200, 350], values: [90.0, 90.0]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
@ -597,9 +597,9 @@ async fn test_read_filter_data_pred_unsupported_in_scan_with_delete() {
// Note these results include data from both o2 and h2o
let expected_results = vec![
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
run_read_filter_test_case(
@ -611,8 +611,8 @@ async fn test_read_filter_data_pred_unsupported_in_scan_with_delete() {
// With delete all from h2o, no rows from h2p should be returned
let expected_results = vec![
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
run_read_filter_test_case(
TwoMeasurementsMultiSeriesWithDeleteAll {},
@ -626,12 +626,12 @@ async fn test_read_filter_data_pred_unsupported_in_scan_with_delete() {
async fn test_read_filter_data_plan_order() {
test_helpers::maybe_start_logging();
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=CA, _field=temp}\n FloatPoints timestamps: [250], values: [70.3]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=other}\n FloatPoints timestamps: [250], values: [5.0]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [70.5]",
"Series tags={_measurement=h2o, city=Boston, state=MA, zz_tag=A, _field=temp}\n FloatPoints timestamps: [1000], values: [70.4]",
"Series tags={_measurement=h2o, city=Kingston, state=MA, zz_tag=A, _field=temp}\n FloatPoints timestamps: [800], values: [70.1]",
"Series tags={_measurement=h2o, city=Kingston, state=MA, zz_tag=B, _field=temp}\n FloatPoints timestamps: [100], values: [70.2]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [250], values: [70.3]",
"Series tags={_field=other, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [5.0]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.5]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA, zz_tag=A}\n FloatPoints timestamps: [1000], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=Kingston, state=MA, zz_tag=A}\n FloatPoints timestamps: [800], values: [70.1]",
"Series tags={_field=temp, _measurement=h2o, city=Kingston, state=MA, zz_tag=B}\n FloatPoints timestamps: [100], values: [70.2]",
];
run_read_filter_test_case(
@ -646,11 +646,11 @@ async fn test_read_filter_data_plan_order() {
async fn test_read_filter_data_plan_order_with_delete() {
test_helpers::maybe_start_logging();
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=other}\n FloatPoints timestamps: [250], values: [5.0]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [70.5]",
"Series tags={_measurement=h2o, city=Boston, state=MA, zz_tag=A, _field=temp}\n FloatPoints timestamps: [1000], values: [70.4]",
"Series tags={_measurement=h2o, city=Kingston, state=MA, zz_tag=A, _field=temp}\n FloatPoints timestamps: [800], values: [70.1]",
"Series tags={_measurement=h2o, city=Kingston, state=MA, zz_tag=B, _field=temp}\n FloatPoints timestamps: [100], values: [70.2]",
"Series tags={_field=other, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [5.0]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.5]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA, zz_tag=A}\n FloatPoints timestamps: [1000], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=Kingston, state=MA, zz_tag=A}\n FloatPoints timestamps: [800], values: [70.1]",
"Series tags={_field=temp, _measurement=h2o, city=Kingston, state=MA, zz_tag=B}\n FloatPoints timestamps: [100], values: [70.2]",
];
run_read_filter_test_case(
@ -671,7 +671,7 @@ async fn test_read_filter_filter_on_value() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=system, host=host.local, _field=load4}\n FloatPoints timestamps: [1527018806000000000, 1527018826000000000], values: [1.77, 1.77]",
"Series tags={_field=load4, _measurement=system, host=host.local}\n FloatPoints timestamps: [1527018806000000000, 1527018826000000000], values: [1.77, 1.77]",
];
run_read_filter_test_case(MeasurementsForDefect2845 {}, predicate, expected_results).await;
@ -688,9 +688,9 @@ async fn test_read_filter_on_field() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
"Series tags={_measurement=o2, state=CA, _field=temp}\n FloatPoints timestamps: [300], values: [79.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [50], values: [53.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
"Series tags={_field=temp, _measurement=o2, state=CA}\n FloatPoints timestamps: [300], values: [79.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
@ -707,10 +707,10 @@ async fn test_read_filter_on_not_field() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=CA, _field=other_temp}\n FloatPoints timestamps: [350], values: [72.4]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=moisture}\n FloatPoints timestamps: [100000], values: [43.0]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=other_temp}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [50], values: [51.0]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [350], values: [72.4]",
"Series tags={_field=moisture, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [43.0]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [51.0]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
@ -746,7 +746,7 @@ async fn test_read_filter_on_field_single_measurement() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
@ -763,13 +763,13 @@ async fn test_read_filter_multi_negation() {
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=attributes, _field=color}\n StringPoints timestamps: [8000], values: [\"blue\"]",
"Series tags={_measurement=cpu_load_short, host=server01, _field=value}\n FloatPoints timestamps: [1000], values: [27.99]",
"Series tags={_measurement=cpu_load_short, host=server01, region=us-east, _field=value}\n FloatPoints timestamps: [3000], values: [1234567.891011]",
"Series tags={_measurement=cpu_load_short, host=server01, region=us-west, _field=value}\n FloatPoints timestamps: [0, 4000], values: [0.64, 3e-6]",
"Series tags={_measurement=status, _field=active}\n BooleanPoints timestamps: [7000], values: [true]",
"Series tags={_measurement=swap, host=server01, name=disk0, _field=in}\n FloatPoints timestamps: [6000], values: [3.0]",
"Series tags={_measurement=swap, host=server01, name=disk0, _field=out}\n FloatPoints timestamps: [6000], values: [4.0]",
"Series tags={_field=color, _measurement=attributes}\n StringPoints timestamps: [8000], values: [\"blue\"]",
"Series tags={_field=value, _measurement=cpu_load_short, host=server01}\n FloatPoints timestamps: [1000], values: [27.99]",
"Series tags={_field=value, _measurement=cpu_load_short, host=server01, region=us-east}\n FloatPoints timestamps: [3000], values: [1234567.891011]",
"Series tags={_field=value, _measurement=cpu_load_short, host=server01, region=us-west}\n FloatPoints timestamps: [0, 4000], values: [0.64, 3e-6]",
"Series tags={_field=active, _measurement=status}\n BooleanPoints timestamps: [7000], values: [true]",
"Series tags={_field=in, _measurement=swap, host=server01, name=disk0}\n FloatPoints timestamps: [6000], values: [3.0]",
"Series tags={_field=out, _measurement=swap, host=server01, name=disk0}\n FloatPoints timestamps: [6000], values: [4.0]",
];
run_read_filter_test_case(EndToEndTest {}, predicate, expected_results).await;
@ -793,13 +793,13 @@ async fn test_read_filter_on_field_multi_measurement() {
// SHOULD NOT contain temp from h2o
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=CA, _field=other_temp}\n FloatPoints timestamps: [350], values: [72.4]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=other_temp}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [350], values: [72.4]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
// This series should not be here (_field = temp)
// WRONG ANSWER, tracked by: https://github.com/influxdata/influxdb_iox/issues/3428
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
"Series tags={_measurement=o2, state=CA, _field=temp}\n FloatPoints timestamps: [300], values: [79.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [50], values: [53.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
"Series tags={_field=temp, _measurement=o2, state=CA}\n FloatPoints timestamps: [300], values: [79.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;

View File

@ -63,8 +63,8 @@ async fn test_read_group_data_no_tag_columns() {
let agg = Aggregate::Count;
let group_columns = vec![];
let expected_results = vec![
"Group tag_keys: _measurement, _field partition_key_vals: ",
"Series tags={_measurement=m0, _field=foo}\n IntegerPoints timestamps: [2], values: [2]",
"Group tag_keys: _field, _measurement partition_key_vals: ",
"Series tags={_field=foo, _measurement=m0}\n IntegerPoints timestamps: [2], values: [2]",
];
run_read_group_test_case(
@ -79,8 +79,8 @@ async fn test_read_group_data_no_tag_columns() {
// min
let agg = Aggregate::Min;
let expected_results = vec![
"Group tag_keys: _measurement, _field partition_key_vals: ",
"Series tags={_measurement=m0, _field=foo}\n FloatPoints timestamps: [1], values: [1.0]",
"Group tag_keys: _field, _measurement partition_key_vals: ",
"Series tags={_field=foo, _measurement=m0}\n FloatPoints timestamps: [1], values: [1.0]",
];
run_read_group_test_case(
@ -98,8 +98,8 @@ async fn test_read_group_data_no_tag_columns_count_with_delete() {
let agg = Aggregate::Count;
let group_columns = vec![];
let expected_results = vec![
"Group tag_keys: _measurement, _field partition_key_vals: ",
"Series tags={_measurement=m0, _field=foo}\n IntegerPoints timestamps: [2], values: [1]",
"Group tag_keys: _field, _measurement partition_key_vals: ",
"Series tags={_field=foo, _measurement=m0}\n IntegerPoints timestamps: [2], values: [1]",
];
run_read_group_test_case(
OneMeasurementNoTagsWithDelete {},
@ -116,8 +116,8 @@ async fn test_read_group_data_no_tag_columns_min_with_delete() {
let agg = Aggregate::Min;
let group_columns = vec![];
let expected_results = vec![
"Group tag_keys: _measurement, _field partition_key_vals: ",
"Series tags={_measurement=m0, _field=foo}\n FloatPoints timestamps: [2], values: [2.0]",
"Group tag_keys: _field, _measurement partition_key_vals: ",
"Series tags={_field=foo, _measurement=m0}\n FloatPoints timestamps: [2], values: [2.0]",
];
run_read_group_test_case(
@ -171,8 +171,8 @@ async fn test_read_group_data_pred() {
let agg = Aggregate::Sum;
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: CA",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: CA",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200], values: [90.0]",
];
run_read_group_test_case(
@ -193,10 +193,10 @@ async fn test_read_group_data_field_restriction() {
let agg = Aggregate::Sum;
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: CA",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [180.0]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [142.8]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: CA",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [180.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [142.8]",
];
run_read_group_test_case(
@ -228,9 +228,9 @@ async fn test_grouped_series_set_plan_sum() {
// The null field (after predicates) are not sent as series
// Note order of city key (boston --> cambridge)
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [400], values: [141.0]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=temp}\n FloatPoints timestamps: [200], values: [163.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [400], values: [141.0]",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [200], values: [163.0]",
];
run_read_group_test_case(
@ -260,11 +260,11 @@ async fn test_grouped_series_set_plan_count() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=humidity}\n IntegerPoints timestamps: [400], values: [0]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n IntegerPoints timestamps: [400], values: [2]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=humidity}\n IntegerPoints timestamps: [200], values: [0]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=temp}\n IntegerPoints timestamps: [200], values: [2]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=humidity, _measurement=h2o, city=Boston, state=MA}\n IntegerPoints timestamps: [400], values: [0]",
"Series tags={_field=humidity, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [200], values: [0]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n IntegerPoints timestamps: [400], values: [2]",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [200], values: [2]",
];
run_read_group_test_case(
@ -294,9 +294,9 @@ async fn test_grouped_series_set_plan_mean() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [400], values: [70.5]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=temp}\n FloatPoints timestamps: [200], values: [81.5]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [400], values: [70.5]",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [200], values: [81.5]",
];
run_read_group_test_case(
@ -324,10 +324,10 @@ async fn test_grouped_series_set_plan_count_measurement_pred() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: CA",
"Series tags={_measurement=o2, city=LA, state=CA, _field=temp}\n IntegerPoints timestamps: [350], values: [2]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n IntegerPoints timestamps: [250], values: [2]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: CA",
"Series tags={_field=temp, _measurement=o2, city=LA, state=CA}\n IntegerPoints timestamps: [350], values: [2]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n IntegerPoints timestamps: [250], values: [2]",
];
run_read_group_test_case(
@ -351,11 +351,11 @@ async fn test_grouped_series_set_plan_first() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=b}\n BooleanPoints timestamps: [2000], values: [true]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=f}\n FloatPoints timestamps: [2000], values: [7.0]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=i}\n IntegerPoints timestamps: [2000], values: [7]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=s}\n StringPoints timestamps: [2000], values: [\"c\"]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, city=Cambridge, state=MA}\n BooleanPoints timestamps: [2000], values: [true]",
"Series tags={_field=f, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [2000], values: [7.0]",
"Series tags={_field=i, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [2000], values: [7]",
"Series tags={_field=s, _measurement=h2o, city=Cambridge, state=MA}\n StringPoints timestamps: [2000], values: [\"c\"]",
];
run_read_group_test_case(
@ -384,10 +384,10 @@ async fn test_grouped_series_set_plan_first_with_nulls() {
// expect timestamps to be present for all three series
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=moisture}\n FloatPoints timestamps: [100000], values: [43.0]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=other_temp}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [50], values: [70.4]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=moisture, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [43.0]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [70.4]",
];
run_read_group_test_case(
@ -411,11 +411,11 @@ async fn test_grouped_series_set_plan_last() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=b}\n BooleanPoints timestamps: [3000], values: [false]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=f}\n FloatPoints timestamps: [3000], values: [6.0]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=i}\n IntegerPoints timestamps: [3000], values: [6]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=s}\n StringPoints timestamps: [3000], values: [\"b\"]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, city=Cambridge, state=MA}\n BooleanPoints timestamps: [3000], values: [false]",
"Series tags={_field=f, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [3000], values: [6.0]",
"Series tags={_field=i, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [3000], values: [6]",
"Series tags={_field=s, _measurement=h2o, city=Cambridge, state=MA}\n StringPoints timestamps: [3000], values: [\"b\"]",
];
run_read_group_test_case(
@ -444,10 +444,10 @@ async fn test_grouped_series_set_plan_last_with_nulls() {
// expect timestamps to be present for all three series
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=moisture}\n FloatPoints timestamps: [100000], values: [43.0]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=other_temp}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100000], values: [70.4]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=moisture, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [43.0]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [70.4]",
];
run_read_group_test_case(
@ -471,11 +471,11 @@ async fn test_grouped_series_set_plan_min() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=b}\n BooleanPoints timestamps: [1000], values: [false]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=f}\n FloatPoints timestamps: [3000], values: [6.0]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=i}\n IntegerPoints timestamps: [3000], values: [6]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=s}\n StringPoints timestamps: [2000], values: [\"a\"]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, city=Cambridge, state=MA}\n BooleanPoints timestamps: [1000], values: [false]",
"Series tags={_field=f, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [3000], values: [6.0]",
"Series tags={_field=i, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [3000], values: [6]",
"Series tags={_field=s, _measurement=h2o, city=Cambridge, state=MA}\n StringPoints timestamps: [2000], values: [\"a\"]",
];
run_read_group_test_case(
@ -499,11 +499,11 @@ async fn test_grouped_series_set_plan_max() {
let group_columns = vec!["state"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=b}\n BooleanPoints timestamps: [3000], values: [true]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=f}\n FloatPoints timestamps: [2000], values: [7.0]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=i}\n IntegerPoints timestamps: [2000], values: [7]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=s}\n StringPoints timestamps: [4000], values: [\"z\"]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
"Series tags={_field=b, _measurement=h2o, city=Cambridge, state=MA}\n BooleanPoints timestamps: [3000], values: [true]",
"Series tags={_field=f, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [2000], values: [7.0]",
"Series tags={_field=i, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [2000], values: [7]",
"Series tags={_field=s, _measurement=h2o, city=Cambridge, state=MA}\n StringPoints timestamps: [4000], values: [\"z\"]",
];
run_read_group_test_case(
@ -522,13 +522,13 @@ async fn test_grouped_series_set_plan_group_by_state_city() {
let group_columns = vec!["state", "city"];
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: CA, LA",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=humidity}\n FloatPoints timestamps: [600], values: [21.0]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [600], values: [181.0]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA, Boston",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [400], values: [141.0]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA, Cambridge",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=temp}\n FloatPoints timestamps: [200], values: [243.0]"
"Group tag_keys: _field, _measurement, city, state partition_key_vals: CA, LA",
"Series tags={_field=humidity, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [600], values: [21.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [600], values: [181.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA, Boston",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [400], values: [141.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA, Cambridge",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [200], values: [243.0]"
];
run_read_group_test_case(
@ -548,13 +548,13 @@ async fn test_grouped_series_set_plan_group_by_city_state() {
// Test with alternate group key order (note the order of columns is different)
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: Boston, MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [400], values: [141.0]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: Cambridge, MA",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=temp}\n FloatPoints timestamps: [200], values: [243.0]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: LA, CA",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=humidity}\n FloatPoints timestamps: [600], values: [21.0]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [600], values: [181.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: Boston, MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [400], values: [141.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: Cambridge, MA",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [200], values: [243.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: LA, CA",
"Series tags={_field=humidity, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [600], values: [21.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [600], values: [181.0]",
];
run_read_group_test_case(
@ -574,13 +574,13 @@ async fn test_grouped_series_set_plan_group_aggregate_none() {
// Expect order of the columns to begin with city/state
let expected_results = vec![
"Group tag_keys: _measurement, city, state, _field partition_key_vals: Boston, MA",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [300, 400], values: [70.0, 71.0]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: Cambridge, MA",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=temp}\n FloatPoints timestamps: [50, 100, 200], values: [80.0, 81.0, 82.0]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: LA, CA",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=humidity}\n FloatPoints timestamps: [500, 600], values: [10.0, 11.0]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [500, 600], values: [90.0, 91.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: Boston, MA",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [300, 400], values: [70.0, 71.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: Cambridge, MA",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n FloatPoints timestamps: [50, 100, 200], values: [80.0, 81.0, 82.0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: LA, CA",
"Series tags={_field=humidity, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [500, 600], values: [10.0, 11.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [500, 600], values: [90.0, 91.0]",
];
run_read_group_test_case(
@ -601,16 +601,16 @@ async fn test_grouped_series_set_plan_group_by_field_none() {
// Expect the data is grouped so all the distinct values of load1
// are before the values for load2
let expected_results = vec![
"Group tag_keys: _measurement, host, region, _field partition_key_vals: load1",
"Series tags={_measurement=aa_system, host=local, region=C, _field=load1}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_measurement=system, host=local, region=A, _field=load1}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Series tags={_measurement=system, host=local, region=C, _field=load1}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_measurement=system, host=remote, region=B, _field=load1}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: load2",
"Series tags={_measurement=aa_system, host=local, region=C, _field=load2}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_measurement=system, host=local, region=A, _field=load2}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Series tags={_measurement=system, host=local, region=C, _field=load2}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_measurement=system, host=remote, region=B, _field=load2}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load1",
"Series tags={_field=load1, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_field=load1, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Series tags={_field=load1, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_field=load1, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load2",
"Series tags={_field=load2, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_field=load2, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Series tags={_field=load2, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_field=load2, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
];
run_read_group_test_case(
@ -631,20 +631,20 @@ async fn test_grouped_series_set_plan_group_by_field_and_tag_none() {
// Expect the data is grouped so all the distinct values of load1
// are before the values for load2, grouped by region
let expected_results = vec![
"Group tag_keys: _measurement, host, region, _field partition_key_vals: load1, A",
"Series tags={_measurement=system, host=local, region=A, _field=load1}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: load1, B",
"Series tags={_measurement=system, host=remote, region=B, _field=load1}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: load1, C",
"Series tags={_measurement=aa_system, host=local, region=C, _field=load1}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_measurement=system, host=local, region=C, _field=load1}\n FloatPoints timestamps: [100], values: [100.1]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: load2, A",
"Series tags={_measurement=system, host=local, region=A, _field=load2}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: load2, B",
"Series tags={_measurement=system, host=remote, region=B, _field=load2}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: load2, C",
"Series tags={_measurement=aa_system, host=local, region=C, _field=load2}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_measurement=system, host=local, region=C, _field=load2}\n FloatPoints timestamps: [100], values: [200.1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load1, A",
"Series tags={_field=load1, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load1, B",
"Series tags={_field=load1, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load1, C",
"Series tags={_field=load1, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_field=load1, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load2, A",
"Series tags={_field=load2, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load2, B",
"Series tags={_field=load2, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: load2, C",
"Series tags={_field=load2, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_field=load2, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
];
run_read_group_test_case(
@ -665,20 +665,20 @@ async fn test_grouped_series_set_plan_group_by_tag_and_field_none() {
let group_columns = vec!["region", "_field"];
let expected_results = vec![
"Group tag_keys: _measurement, host, region, _field partition_key_vals: A, load1",
"Series tags={_measurement=system, host=local, region=A, _field=load1}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: A, load2",
"Series tags={_measurement=system, host=local, region=A, _field=load2}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: B, load1",
"Series tags={_measurement=system, host=remote, region=B, _field=load1}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: B, load2",
"Series tags={_measurement=system, host=remote, region=B, _field=load2}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: C, load1",
"Series tags={_measurement=aa_system, host=local, region=C, _field=load1}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_measurement=system, host=local, region=C, _field=load1}\n FloatPoints timestamps: [100], values: [100.1]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: C, load2",
"Series tags={_measurement=aa_system, host=local, region=C, _field=load2}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_measurement=system, host=local, region=C, _field=load2}\n FloatPoints timestamps: [100], values: [200.1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: A, load1",
"Series tags={_field=load1, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [1.1, 1.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: A, load2",
"Series tags={_field=load2, _measurement=system, host=local, region=A}\n FloatPoints timestamps: [100, 200], values: [2.1, 2.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: B, load1",
"Series tags={_field=load1, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [10.1, 10.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: B, load2",
"Series tags={_field=load2, _measurement=system, host=remote, region=B}\n FloatPoints timestamps: [100, 200], values: [2.1, 20.2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: C, load1",
"Series tags={_field=load1, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Series tags={_field=load1, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [100.1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: C, load2",
"Series tags={_field=load2, _measurement=aa_system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
"Series tags={_field=load2, _measurement=system, host=local, region=C}\n FloatPoints timestamps: [100], values: [200.1]",
];
run_read_group_test_case(
@ -698,18 +698,18 @@ async fn test_grouped_series_set_plan_group_measurement_tag_count() {
// Expect the data is grouped so output is sorted by measurement and then region
let expected_results = vec![
"Group tag_keys: _measurement, host, region, _field partition_key_vals: aa_system, C",
"Series tags={_measurement=aa_system, host=local, region=C, _field=load1}\n IntegerPoints timestamps: [100], values: [1]",
"Series tags={_measurement=aa_system, host=local, region=C, _field=load2}\n IntegerPoints timestamps: [100], values: [1]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: system, A",
"Series tags={_measurement=system, host=local, region=A, _field=load1}\n IntegerPoints timestamps: [200], values: [2]",
"Series tags={_measurement=system, host=local, region=A, _field=load2}\n IntegerPoints timestamps: [200], values: [2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: system, B",
"Series tags={_measurement=system, host=remote, region=B, _field=load1}\n IntegerPoints timestamps: [200], values: [2]",
"Series tags={_measurement=system, host=remote, region=B, _field=load2}\n IntegerPoints timestamps: [200], values: [2]",
"Group tag_keys: _measurement, host, region, _field partition_key_vals: system, C",
"Series tags={_measurement=system, host=local, region=C, _field=load1}\n IntegerPoints timestamps: [100], values: [1]",
"Series tags={_measurement=system, host=local, region=C, _field=load2}\n IntegerPoints timestamps: [100], values: [1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: aa_system, C",
"Series tags={_field=load1, _measurement=aa_system, host=local, region=C}\n IntegerPoints timestamps: [100], values: [1]",
"Series tags={_field=load2, _measurement=aa_system, host=local, region=C}\n IntegerPoints timestamps: [100], values: [1]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: system, A",
"Series tags={_field=load1, _measurement=system, host=local, region=A}\n IntegerPoints timestamps: [200], values: [2]",
"Series tags={_field=load2, _measurement=system, host=local, region=A}\n IntegerPoints timestamps: [200], values: [2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: system, B",
"Series tags={_field=load1, _measurement=system, host=remote, region=B}\n IntegerPoints timestamps: [200], values: [2]",
"Series tags={_field=load2, _measurement=system, host=remote, region=B}\n IntegerPoints timestamps: [200], values: [2]",
"Group tag_keys: _field, _measurement, host, region partition_key_vals: system, C",
"Series tags={_field=load1, _measurement=system, host=local, region=C}\n IntegerPoints timestamps: [100], values: [1]",
"Series tags={_field=load2, _measurement=system, host=local, region=C}\n IntegerPoints timestamps: [100], values: [1]",
];
run_read_group_test_case(
@ -731,12 +731,12 @@ async fn test_grouped_series_set_plan_group_field_start_stop() {
// Expect the data is grouped so output is sorted by state, with
// blank partition values for _start and _stop (mirroring TSM)
let expected_results = vec![
"Group tag_keys: _measurement, state, _field partition_key_vals: , , CA",
"Series tags={_measurement=o2, state=CA, _field=reading}\n IntegerPoints timestamps: [300], values: [0]",
"Series tags={_measurement=o2, state=CA, _field=temp}\n IntegerPoints timestamps: [300], values: [1]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: , , MA",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n IntegerPoints timestamps: [50], values: [1]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n IntegerPoints timestamps: [50], values: [1]",
"Group tag_keys: _field, _measurement, state partition_key_vals: , , CA",
"Series tags={_field=reading, _measurement=o2, state=CA}\n IntegerPoints timestamps: [300], values: [0]",
"Series tags={_field=temp, _measurement=o2, state=CA}\n IntegerPoints timestamps: [300], values: [1]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: , , MA",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n IntegerPoints timestamps: [50], values: [1]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n IntegerPoints timestamps: [50], values: [1]",
];
let group_columns = vec!["_start", "_stop", "state"];
@ -771,14 +771,14 @@ async fn test_grouped_series_set_plan_group_field_pred_and_null_fields() {
// Expect the data is grouped so output is sorted by measurement state
let expected_results = vec![
"Group tag_keys: _measurement, state, _field partition_key_vals: CA, reading",
"Series tags={_measurement=o2, state=CA, _field=reading}\n IntegerPoints timestamps: [300], values: [0]",
"Group tag_keys: _measurement, state, _field partition_key_vals: CA, temp",
"Series tags={_measurement=o2, state=CA, _field=temp}\n IntegerPoints timestamps: [300], values: [1]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA, reading",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n IntegerPoints timestamps: [50], values: [1]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA, temp",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n IntegerPoints timestamps: [50], values: [1]",
"Group tag_keys: _field, _measurement, state partition_key_vals: CA, reading",
"Series tags={_field=reading, _measurement=o2, state=CA}\n IntegerPoints timestamps: [300], values: [0]",
"Group tag_keys: _field, _measurement, state partition_key_vals: CA, temp",
"Series tags={_field=temp, _measurement=o2, state=CA}\n IntegerPoints timestamps: [300], values: [1]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA, reading",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n IntegerPoints timestamps: [50], values: [1]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA, temp",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n IntegerPoints timestamps: [50], values: [1]",
];
run_read_group_test_case(
@ -806,10 +806,10 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_field() {
// Expect the data is grouped so output is sorted by measurement and then region
let expected_results = vec![
"Group tag_keys: _measurement, state, _field partition_key_vals: CA, reading",
"Series tags={_measurement=o2, state=CA, _field=reading}\n IntegerPoints timestamps: [300], values: [0]",
"Group tag_keys: _measurement, city, state, _field partition_key_vals: MA, reading",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n IntegerPoints timestamps: [50], values: [1]",
"Group tag_keys: _field, _measurement, state partition_key_vals: CA, reading",
"Series tags={_field=reading, _measurement=o2, state=CA}\n IntegerPoints timestamps: [300], values: [0]",
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA, reading",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n IntegerPoints timestamps: [50], values: [1]",
];
run_read_group_test_case(
@ -841,8 +841,8 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_value() {
// Expect the data is grouped so output is sorted by measurement and then region
let expected_results = vec![
"Group tag_keys: _measurement, host, _field partition_key_vals: load4",
"Series tags={_measurement=system, host=host.local, _field=load4}\n FloatPoints timestamps: [1527018806000000000], values: [1.77]",
"Group tag_keys: _field, _measurement, host partition_key_vals: load4",
"Series tags={_field=load4, _measurement=system, host=host.local}\n FloatPoints timestamps: [1527018806000000000], values: [1.77]",
];
run_read_group_test_case(
@ -870,10 +870,10 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_multiple_value(
// Expect the data is grouped so output is sorted by measurement and then region
let expected_results = vec![
"Group tag_keys: _measurement, host, _field partition_key_vals: load3",
"Series tags={_measurement=system, host=host.local, _field=load3}\n FloatPoints timestamps: [1527018806000000000], values: [1.72]",
"Group tag_keys: _measurement, host, _field partition_key_vals: load4",
"Series tags={_measurement=system, host=host.local, _field=load4}\n FloatPoints timestamps: [1527018806000000000], values: [1.77]",
"Group tag_keys: _field, _measurement, host partition_key_vals: load3",
"Series tags={_field=load3, _measurement=system, host=host.local}\n FloatPoints timestamps: [1527018806000000000], values: [1.72]",
"Group tag_keys: _field, _measurement, host partition_key_vals: load4",
"Series tags={_field=load4, _measurement=system, host=host.local}\n FloatPoints timestamps: [1527018806000000000], values: [1.77]",
];
run_read_group_test_case(
@ -901,8 +901,8 @@ async fn test_grouped_series_set_plan_group_field_pred_filter_on_value_sum() {
// Expect the data is grouped so output is sorted by measurement and then region
let expected_results = vec![
"Group tag_keys: _measurement, host, _field partition_key_vals: load4",
"Series tags={_measurement=system, host=host.local, _field=load4}\n FloatPoints timestamps: [1527018826000000000], values: [3.54]",
"Group tag_keys: _field, _measurement, host partition_key_vals: load4",
"Series tags={_field=load4, _measurement=system, host=host.local}\n FloatPoints timestamps: [1527018826000000000], values: [3.54]",
];
run_read_group_test_case(

View File

@ -63,8 +63,8 @@ async fn test_read_window_aggregate_nanoseconds() {
// note the name of the field is "temp" even though it is the average
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [200, 400, 600], values: [70.0, 71.5, 73.0]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200, 400, 600], values: [90.0, 91.5, 93.0]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [200, 400, 600], values: [70.0, 71.5, 73.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200, 400, 600], values: [90.0, 91.5, 93.0]",
];
run_read_window_aggregate_test_case(
@ -95,8 +95,8 @@ async fn test_read_window_aggregate_nanoseconds_measurement_pred() {
let offset = WindowDuration::from_nanoseconds(0);
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [200, 400, 600], values: [70.0, 71.5, 73.0]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200, 400, 600], values: [90.0, 91.5, 93.0]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [200, 400, 600], values: [70.0, 71.5, 73.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200, 400, 600], values: [90.0, 91.5, 93.0]",
];
run_read_window_aggregate_test_case(
@ -121,9 +121,9 @@ async fn test_read_window_aggregate_nanoseconds_measurement_count() {
let offset = WindowDuration::from_nanoseconds(0);
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
"Series tags={_measurement=h2o, city=Cambridge, state=MA, _field=temp}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
];
run_read_window_aggregate_test_case(
@ -153,10 +153,10 @@ async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
// MAX, FIRST, LAST) we need to run a plan that brings along the timestamps
// for the chosen aggregate in the window.
let expected_results = vec![
"Series tags={_measurement=mm, section=1a, _field=bar}\n FloatPoints timestamps: [1609459201000000011], values: [5.0]",
"Series tags={_measurement=mm, section=1a, _field=foo}\n FloatPoints timestamps: [1609459201000000001, 1609459201000000024], values: [1.0, 11.24]",
"Series tags={_measurement=mm, section=2b, _field=bar}\n FloatPoints timestamps: [1609459201000000009, 1609459201000000015, 1609459201000000022], values: [4.0, 6.0, 1.2]",
"Series tags={_measurement=mm, section=2b, _field=foo}\n FloatPoints timestamps: [1609459201000000002], values: [2.0]",
"Series tags={_field=bar, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000011], values: [5.0]",
"Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000001, 1609459201000000024], values: [1.0, 11.24]",
"Series tags={_field=bar, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000009, 1609459201000000015, 1609459201000000022], values: [4.0, 6.0, 1.2]",
"Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000002], values: [2.0]",
];
run_read_window_aggregate_test_case(
@ -183,10 +183,10 @@ async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697_with_delet
// one row deleted
let expected_results = vec![
"Series tags={_measurement=mm, section=1a, _field=bar}\n FloatPoints timestamps: [1609459201000000011], values: [5.0]",
"Series tags={_measurement=mm, section=1a, _field=foo}\n FloatPoints timestamps: [1609459201000000001, 1609459201000000024], values: [1.0, 11.24]",
"Series tags={_measurement=mm, section=2b, _field=bar}\n FloatPoints timestamps: [1609459201000000009, 1609459201000000015], values: [4.0, 6.0]",
"Series tags={_measurement=mm, section=2b, _field=foo}\n FloatPoints timestamps: [1609459201000000002], values: [2.0]",
"Series tags={_field=bar, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000011], values: [5.0]",
"Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000001, 1609459201000000024], values: [1.0, 11.24]",
"Series tags={_field=bar, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000009, 1609459201000000015], values: [4.0, 6.0]",
"Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000002], values: [2.0]",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2697WithDelete {},
@ -226,10 +226,10 @@ async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() {
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAD).
// For each distinct series the window defines the `time` column
let expected_results = vec![
"Series tags={_measurement=mm, section=1a, _field=bar}\n FloatPoints timestamps: [1609459201000000020], values: [5.0]",
"Series tags={_measurement=mm, section=1a, _field=foo}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
"Series tags={_measurement=mm, section=2b, _field=bar}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000020, 1609459201000000030], values: [4.0, 6.0, 1.2]",
"Series tags={_measurement=mm, section=2b, _field=foo}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
"Series tags={_field=bar, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000020], values: [5.0]",
"Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
"Series tags={_field=bar, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000020, 1609459201000000030], values: [4.0, 6.0, 1.2]",
"Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
];
run_read_window_aggregate_test_case(
@ -261,8 +261,8 @@ async fn test_grouped_series_set_plan_group_aggregate_filter_on_field() {
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAD).
// For each distinct series the window defines the `time` column
let expected_results = vec![
"Series tags={_measurement=mm, section=1a, _field=foo}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
"Series tags={_measurement=mm, section=2b, _field=foo}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
"Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
"Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
];
run_read_window_aggregate_test_case(
@ -292,10 +292,10 @@ async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697_with_delet
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAD).
// For each distinct series the window defines the `time` column
let expected_results = vec![
"Series tags={_measurement=mm, section=1a, _field=bar}\n FloatPoints timestamps: [1609459201000000020], values: [5.0]",
"Series tags={_measurement=mm, section=1a, _field=foo}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
"Series tags={_measurement=mm, section=2b, _field=bar}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000020], values: [4.0, 6.0]",
"Series tags={_measurement=mm, section=2b, _field=foo}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
"Series tags={_field=bar, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000020], values: [5.0]",
"Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
"Series tags={_field=bar, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000020], values: [4.0, 6.0]",
"Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2697WithDelete {},
@ -331,8 +331,8 @@ async fn test_read_window_aggregate_overflow() {
let offset = WindowDuration::from_nanoseconds(0);
let expected_results = vec![
"Series tags={_measurement=mm, _field=bar}\n FloatPoints timestamps: [1609459201000000015], values: [6.0]",
"Series tags={_measurement=mm, _field=foo}\n FloatPoints timestamps: [1609459201000000005], values: [3.0]",
"Series tags={_field=bar, _measurement=mm}\n FloatPoints timestamps: [1609459201000000015], values: [6.0]",
"Series tags={_field=foo, _measurement=mm}\n FloatPoints timestamps: [1609459201000000005], values: [3.0]",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2890 {},

View File

@ -36,8 +36,9 @@ async fn run_table_schema_test_case<D>(
// Make sure at least one table has data
let mut chunks_with_table = 0;
let ctx = db.new_query_context(None);
let chunks = db
.chunks(table_name, &Default::default())
.chunks(table_name, &Default::default(), ctx)
.await
.expect("error getting chunks");
for chunk in chunks {

View File

@ -156,7 +156,8 @@ fn group_to_frame(group: series::Group) -> Frame {
/// Convert the tag=value pairs from Arc<str> to Vec<u8> for gRPC transport
fn convert_tags(tags: Vec<series::Tag>, tag_key_binary_format: bool) -> Vec<Tag> {
tags.into_iter()
let mut res: Vec<Tag> = tags
.into_iter()
.map(|series::Tag { key, value }| Tag {
key: match tag_key_binary_format {
true => match key.as_ref() {
@ -168,7 +169,13 @@ fn convert_tags(tags: Vec<series::Tag>, tag_key_binary_format: bool) -> Vec<Tag>
},
value: value.bytes().collect(),
})
.collect()
.collect();
// tags must be returned in lexicographical order; when we rename the tags
// to use the binary encoding, we fiddle with the existing ordering and need to re-sort.
if tag_key_binary_format {
res.sort_unstable_by(|a, b| a.key.cmp(&b.key));
}
res
}
fn arcs_to_bytes(s: Vec<Arc<str>>) -> Vec<Vec<u8>> {
@ -381,15 +388,15 @@ mod tests {
let response = series_or_groups_to_read_response(series.clone(), false);
let dumped_frames = dump_frames(&response.frames);
let expected_frames = vec![
"SeriesFrame, tags: _measurement=the_table,tag1=val1,_field=string_field, type: 4",
"SeriesFrame, tags: _field=string_field,_measurement=the_table,tag1=val1, type: 4",
"StringPointsFrame, timestamps: [2000, 3000], values: bar,baz",
"SeriesFrame, tags: _measurement=the_table,tag1=val1,_field=int_field, type: 1",
"SeriesFrame, tags: _field=int_field,_measurement=the_table,tag1=val1, type: 1",
"IntegerPointsFrame, timestamps: [2000, 3000], values: \"2,3\"",
"SeriesFrame, tags: _measurement=the_table,tag1=val1,_field=uint_field, type: 2",
"SeriesFrame, tags: _field=uint_field,_measurement=the_table,tag1=val1, type: 2",
"UnsignedPointsFrame, timestamps: [2000, 3000], values: \"22,33\"",
"SeriesFrame, tags: _measurement=the_table,tag1=val1,_field=float_field, type: 0",
"SeriesFrame, tags: _field=float_field,_measurement=the_table,tag1=val1, type: 0",
"FloatPointsFrame, timestamps: [2000, 3000], values: \"20.1,30.1\"",
"SeriesFrame, tags: _measurement=the_table,tag1=val1,_field=boolean_field, type: 3",
"SeriesFrame, tags: _field=boolean_field,_measurement=the_table,tag1=val1, type: 3",
"BooleanPointsFrame, timestamps: [2000, 3000], values: false,true",
];