test: launcher tests for window first
parent
3958c76604
commit
7039d87a4f
|
@ -447,6 +447,22 @@ func (tl *TestLauncher) Metrics(tb testing.TB) (metrics map[string]*dto.MetricFa
|
|||
return metrics
|
||||
}
|
||||
|
||||
func (tl *TestLauncher) NumReads(tb testing.TB, op string) uint64 {
|
||||
const metricName = "query_influxdb_source_read_request_duration_seconds"
|
||||
mf := tl.Metrics(tb)[metricName]
|
||||
if mf != nil {
|
||||
fmt.Printf("%v\n", mf)
|
||||
for _, m := range mf.Metric {
|
||||
for _, label := range m.Label {
|
||||
if label.GetName() == "op" && label.GetValue() == op {
|
||||
return m.Histogram.GetSampleCount()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// QueryResult wraps a single flux.Result with some helper methods.
|
||||
type QueryResult struct {
|
||||
t *testing.T
|
||||
|
|
|
@ -748,63 +748,226 @@ from(bucket: "%s")
|
|||
}
|
||||
}
|
||||
|
||||
func TestLauncher_Query_PushDownWindowAggregateAndBareAggregate(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
|
||||
"--feature-flags", "pushDownWindowAggregateCount=true,pushDownWindowAggregateSum=true")
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
l.WritePointsOrFail(t, `
|
||||
m0,k=k0 f=0i 0
|
||||
m0,k=k0 f=1i 1000000000
|
||||
m0,k=k0 f=2i 2000000000
|
||||
m0,k=k0 f=3i 3000000000
|
||||
m0,k=k0 f=4i 4000000000
|
||||
m0,k=k0 f=5i 5000000000
|
||||
m0,k=k0 f=6i 6000000000
|
||||
m0,k=k0 f=5i 7000000000
|
||||
m0,k=k0 f=0i 8000000000
|
||||
m0,k=k0 f=6i 9000000000
|
||||
m0,k=k0 f=6i 10000000000
|
||||
m0,k=k0 f=7i 11000000000
|
||||
m0,k=k0 f=5i 12000000000
|
||||
m0,k=k0 f=8i 13000000000
|
||||
m0,k=k0 f=9i 14000000000
|
||||
m0,k=k0 f=5i 15000000000
|
||||
`)
|
||||
|
||||
getReadRequestCount := func(op string) uint64 {
|
||||
const metricName = "query_influxdb_source_read_request_duration_seconds"
|
||||
mf := l.Metrics(t)[metricName]
|
||||
if mf != nil {
|
||||
fmt.Printf("%v\n", mf)
|
||||
for _, m := range mf.Metric {
|
||||
for _, label := range m.Label {
|
||||
if label.GetName() == "op" && label.GetValue() == op {
|
||||
return m.Histogram.GetSampleCount()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
q string
|
||||
op string
|
||||
res string
|
||||
func TestQueryPushDowns(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
data []string
|
||||
query string
|
||||
op string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "window first",
|
||||
data: []string{
|
||||
"m0,k=k0 f=0i 0",
|
||||
"m0,k=k0 f=1i 1000000000",
|
||||
"m0,k=k0 f=2i 2000000000",
|
||||
"m0,k=k0 f=3i 3000000000",
|
||||
"m0,k=k0 f=4i 4000000000",
|
||||
"m0,k=k0 f=5i 5000000000",
|
||||
"m0,k=k0 f=6i 6000000000",
|
||||
"m0,k=k0 f=5i 7000000000",
|
||||
"m0,k=k0 f=0i 8000000000",
|
||||
"m0,k=k0 f=6i 9000000000",
|
||||
"m0,k=k0 f=6i 10000000000",
|
||||
"m0,k=k0 f=7i 11000000000",
|
||||
"m0,k=k0 f=5i 12000000000",
|
||||
"m0,k=k0 f=8i 13000000000",
|
||||
"m0,k=k0 f=9i 14000000000",
|
||||
"m0,k=k0 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:05Z, stop: 1970-01-01T00:00:20Z)
|
||||
|> window(every: 3s)
|
||||
|> first()
|
||||
`,
|
||||
op: "readWindow(first)",
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,k
|
||||
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:06Z,1970-01-01T00:00:05Z,5,f,m0,k0
|
||||
,,1,1970-01-01T00:00:06Z,1970-01-01T00:00:09Z,1970-01-01T00:00:06Z,6,f,m0,k0
|
||||
,,2,1970-01-01T00:00:09Z,1970-01-01T00:00:12Z,1970-01-01T00:00:09Z,6,f,m0,k0
|
||||
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:12Z,5,f,m0,k0
|
||||
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "window first string",
|
||||
data: []string{
|
||||
"m,tag=a f=\"c\" 2000000000",
|
||||
"m,tag=a f=\"d\" 3000000000",
|
||||
"m,tag=a f=\"h\" 7000000000",
|
||||
"m,tag=a f=\"i\" 8000000000",
|
||||
"m,tag=a f=\"j\" 9000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:10Z)
|
||||
|> window(every: 5s)
|
||||
|> first()
|
||||
`,
|
||||
op: "readWindow(first)",
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,tag
|
||||
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:05Z,1970-01-01T00:00:02Z,c,f,m,a
|
||||
,,1,1970-01-01T00:00:05Z,1970-01-01T00:00:10Z,1970-01-01T00:00:07Z,h,f,m,a
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "bare first",
|
||||
data: []string{
|
||||
"m0,k=k0 f=0i 0",
|
||||
"m0,k=k0 f=1i 1000000000",
|
||||
"m0,k=k0 f=2i 2000000000",
|
||||
"m0,k=k0 f=3i 3000000000",
|
||||
"m0,k=k0 f=4i 4000000000",
|
||||
"m0,k=k0 f=5i 5000000000",
|
||||
"m0,k=k0 f=6i 6000000000",
|
||||
"m0,k=k0 f=5i 7000000000",
|
||||
"m0,k=k0 f=0i 8000000000",
|
||||
"m0,k=k0 f=6i 9000000000",
|
||||
"m0,k=k0 f=6i 10000000000",
|
||||
"m0,k=k0 f=7i 11000000000",
|
||||
"m0,k=k0 f=5i 12000000000",
|
||||
"m0,k=k0 f=8i 13000000000",
|
||||
"m0,k=k0 f=9i 14000000000",
|
||||
"m0,k=k0 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:05Z, stop: 1970-01-01T00:00:20Z)
|
||||
|> first()
|
||||
`,
|
||||
op: "readWindow(first)",
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,k
|
||||
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:05Z,5,f,m0,k0
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "window empty first",
|
||||
data: []string{
|
||||
"m0,k=k0 f=0i 0",
|
||||
"m0,k=k0 f=1i 1000000000",
|
||||
"m0,k=k0 f=2i 2000000000",
|
||||
"m0,k=k0 f=3i 3000000000",
|
||||
"m0,k=k0 f=4i 4000000000",
|
||||
"m0,k=k0 f=5i 5000000000",
|
||||
"m0,k=k0 f=6i 6000000000",
|
||||
"m0,k=k0 f=5i 7000000000",
|
||||
"m0,k=k0 f=0i 8000000000",
|
||||
"m0,k=k0 f=6i 9000000000",
|
||||
"m0,k=k0 f=6i 10000000000",
|
||||
"m0,k=k0 f=7i 11000000000",
|
||||
"m0,k=k0 f=5i 12000000000",
|
||||
"m0,k=k0 f=8i 13000000000",
|
||||
"m0,k=k0 f=9i 14000000000",
|
||||
"m0,k=k0 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:02Z)
|
||||
|> window(every: 500ms, createEmpty: true)
|
||||
|> first()
|
||||
`,
|
||||
op: "readWindow(first)",
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,,,,,,,,
|
||||
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
|
||||
,_result,0,1970-01-01T00:00:00Z,1970-01-01T00:00:00.5Z,1970-01-01T00:00:00Z,0,f,m0,k0
|
||||
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,1,1970-01-01T00:00:00.5Z,1970-01-01T00:00:01Z,,,f,m0,k0
|
||||
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
|
||||
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,,,,,,,,
|
||||
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
|
||||
,_result,2,1970-01-01T00:00:01Z,1970-01-01T00:00:01.5Z,1970-01-01T00:00:01Z,1,f,m0,k0
|
||||
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,3,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,,,f,m0,k0
|
||||
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "window aggregate first",
|
||||
data: []string{
|
||||
"m0,k=k0 f=0i 0",
|
||||
"m0,k=k0 f=1i 1000000000",
|
||||
"m0,k=k0 f=2i 2000000000",
|
||||
"m0,k=k0 f=3i 3000000000",
|
||||
"m0,k=k0 f=4i 4000000000",
|
||||
"m0,k=k0 f=5i 5000000000",
|
||||
"m0,k=k0 f=6i 6000000000",
|
||||
"m0,k=k0 f=5i 7000000000",
|
||||
"m0,k=k0 f=0i 8000000000",
|
||||
"m0,k=k0 f=6i 9000000000",
|
||||
"m0,k=k0 f=6i 10000000000",
|
||||
"m0,k=k0 f=7i 11000000000",
|
||||
"m0,k=k0 f=5i 12000000000",
|
||||
"m0,k=k0 f=8i 13000000000",
|
||||
"m0,k=k0 f=9i 14000000000",
|
||||
"m0,k=k0 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:02Z)
|
||||
|> aggregateWindow(every: 500ms, fn: first)
|
||||
`,
|
||||
op: "readWindow(first)",
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,true,true,false,false,true,true,true
|
||||
#default,_result,,,,,,,,
|
||||
,result,table,_start,_stop,_time,_value,_field,_measurement,k
|
||||
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:02Z,1970-01-01T00:00:00.5Z,0,f,m0,k0
|
||||
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:02Z,1970-01-01T00:00:01.5Z,1,f,m0,k0
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "count",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0 f=0i 0",
|
||||
"m0,k=k0 f=1i 1000000000",
|
||||
"m0,k=k0 f=2i 2000000000",
|
||||
"m0,k=k0 f=3i 3000000000",
|
||||
"m0,k=k0 f=4i 4000000000",
|
||||
"m0,k=k0 f=5i 5000000000",
|
||||
"m0,k=k0 f=6i 6000000000",
|
||||
"m0,k=k0 f=5i 7000000000",
|
||||
"m0,k=k0 f=0i 8000000000",
|
||||
"m0,k=k0 f=6i 9000000000",
|
||||
"m0,k=k0 f=6i 10000000000",
|
||||
"m0,k=k0 f=7i 11000000000",
|
||||
"m0,k=k0 f=5i 12000000000",
|
||||
"m0,k=k0 f=8i 13000000000",
|
||||
"m0,k=k0 f=9i 14000000000",
|
||||
"m0,k=k0 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|
||||
|> aggregateWindow(every: 5s, fn: count)
|
||||
|> drop(columns: ["_start", "_stop"])
|
||||
`,
|
||||
op: "readWindow(count)",
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,false,false,true,true,true
|
||||
#default,_result,,,,,,
|
||||
|
@ -816,14 +979,32 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "bare count",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0 f=0i 0",
|
||||
"m0,k=k0 f=1i 1000000000",
|
||||
"m0,k=k0 f=2i 2000000000",
|
||||
"m0,k=k0 f=3i 3000000000",
|
||||
"m0,k=k0 f=4i 4000000000",
|
||||
"m0,k=k0 f=5i 5000000000",
|
||||
"m0,k=k0 f=6i 6000000000",
|
||||
"m0,k=k0 f=5i 7000000000",
|
||||
"m0,k=k0 f=0i 8000000000",
|
||||
"m0,k=k0 f=6i 9000000000",
|
||||
"m0,k=k0 f=6i 10000000000",
|
||||
"m0,k=k0 f=7i 11000000000",
|
||||
"m0,k=k0 f=5i 12000000000",
|
||||
"m0,k=k0 f=8i 13000000000",
|
||||
"m0,k=k0 f=9i 14000000000",
|
||||
"m0,k=k0 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|
||||
|> count()
|
||||
|> drop(columns: ["_start", "_stop"])
|
||||
`,
|
||||
op: "readWindow(count)",
|
||||
res: `
|
||||
want: `
|
||||
#group,false,false,false,true,true,true
|
||||
#datatype,string,long,long,string,string,string
|
||||
#default,_result,,,,,
|
||||
|
@ -833,14 +1014,32 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "sum",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0 f=0i 0",
|
||||
"m0,k=k0 f=1i 1000000000",
|
||||
"m0,k=k0 f=2i 2000000000",
|
||||
"m0,k=k0 f=3i 3000000000",
|
||||
"m0,k=k0 f=4i 4000000000",
|
||||
"m0,k=k0 f=5i 5000000000",
|
||||
"m0,k=k0 f=6i 6000000000",
|
||||
"m0,k=k0 f=5i 7000000000",
|
||||
"m0,k=k0 f=0i 8000000000",
|
||||
"m0,k=k0 f=6i 9000000000",
|
||||
"m0,k=k0 f=6i 10000000000",
|
||||
"m0,k=k0 f=7i 11000000000",
|
||||
"m0,k=k0 f=5i 12000000000",
|
||||
"m0,k=k0 f=8i 13000000000",
|
||||
"m0,k=k0 f=9i 14000000000",
|
||||
"m0,k=k0 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|
||||
|> aggregateWindow(every: 5s, fn: sum)
|
||||
|> drop(columns: ["_start", "_stop"])
|
||||
`,
|
||||
op: "readWindow(sum)",
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,long,string,string,string
|
||||
#group,false,false,false,false,true,true,true
|
||||
#default,_result,,,,,,
|
||||
|
@ -852,14 +1051,32 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "bare sum",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0 f=0i 0",
|
||||
"m0,k=k0 f=1i 1000000000",
|
||||
"m0,k=k0 f=2i 2000000000",
|
||||
"m0,k=k0 f=3i 3000000000",
|
||||
"m0,k=k0 f=4i 4000000000",
|
||||
"m0,k=k0 f=5i 5000000000",
|
||||
"m0,k=k0 f=6i 6000000000",
|
||||
"m0,k=k0 f=5i 7000000000",
|
||||
"m0,k=k0 f=0i 8000000000",
|
||||
"m0,k=k0 f=6i 9000000000",
|
||||
"m0,k=k0 f=6i 10000000000",
|
||||
"m0,k=k0 f=7i 11000000000",
|
||||
"m0,k=k0 f=5i 12000000000",
|
||||
"m0,k=k0 f=8i 13000000000",
|
||||
"m0,k=k0 f=9i 14000000000",
|
||||
"m0,k=k0 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|
||||
|> sum()
|
||||
|> drop(columns: ["_start", "_stop"])
|
||||
`,
|
||||
op: "readWindow(sum)",
|
||||
res: `
|
||||
want: `
|
||||
#group,false,false,false,true,true,true
|
||||
#datatype,string,long,long,string,string,string
|
||||
#default,_result,,,,,
|
||||
|
@ -867,93 +1084,27 @@ from(bucket: v.bucket)
|
|||
,,0,67,f,m0,k0
|
||||
`,
|
||||
},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
wantCount := getReadRequestCount(tt.op) + 1
|
||||
|
||||
prelude := fmt.Sprintf("v = {bucket: \"%s\", timeRangeStart: 1970-01-01T00:00:00Z, timeRangeStop: 1970-01-01T00:00:15Z}", l.Bucket.Name)
|
||||
queryStr := prelude + "\n" + tt.q
|
||||
res := l.MustExecuteQuery(queryStr)
|
||||
defer res.Done()
|
||||
got := flux.NewSliceResultIterator(res.Results)
|
||||
defer got.Release()
|
||||
|
||||
dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
|
||||
want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tt.res)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer want.Release()
|
||||
|
||||
if err := executetest.EqualResultIterators(want, got); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if want, got := wantCount, getReadRequestCount(tt.op); want != got {
|
||||
t.Fatalf("unexpected sample count -want/+got:\n\t- %d\n\t+ %d", want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLauncher_Query_PushDownGroupAggregate(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateCount=true",
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateSum=true",
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateFirst=true",
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateLast=true",
|
||||
)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
l.WritePointsOrFail(t, `
|
||||
m0,k=k0,kk=kk0 f=0i 0
|
||||
m0,k=k0,kk=kk1 f=1i 1000000000
|
||||
m0,k=k0,kk=kk0 f=2i 2000000000
|
||||
m0,k=k0,kk=kk1 f=3i 3000000000
|
||||
m0,k=k0,kk=kk0 f=4i 4000000000
|
||||
m0,k=k0,kk=kk1 f=5i 5000000000
|
||||
m0,k=k0,kk=kk0 f=6i 6000000000
|
||||
m0,k=k0,kk=kk1 f=5i 7000000000
|
||||
m0,k=k0,kk=kk0 f=0i 8000000000
|
||||
m0,k=k0,kk=kk1 f=6i 9000000000
|
||||
m0,k=k0,kk=kk0 f=6i 10000000000
|
||||
m0,k=k0,kk=kk1 f=7i 11000000000
|
||||
m0,k=k0,kk=kk0 f=5i 12000000000
|
||||
m0,k=k0,kk=kk1 f=8i 13000000000
|
||||
m0,k=k0,kk=kk0 f=9i 14000000000
|
||||
m0,k=k0,kk=kk1 f=5i 15000000000
|
||||
`)
|
||||
|
||||
getReadRequestCount := func(op string) uint64 {
|
||||
const metricName = "query_influxdb_source_read_request_duration_seconds"
|
||||
mf := l.Metrics(t)[metricName]
|
||||
if mf != nil {
|
||||
fmt.Printf("%v\n", mf)
|
||||
for _, m := range mf.Metric {
|
||||
for _, label := range m.Label {
|
||||
if label.GetName() == "op" && label.GetValue() == op {
|
||||
return m.Histogram.GetSampleCount()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
q string
|
||||
op string
|
||||
res string
|
||||
}{
|
||||
{
|
||||
name: "group first",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0,kk=kk0 f=0i 0",
|
||||
"m0,k=k0,kk=kk1 f=1i 1000000000",
|
||||
"m0,k=k0,kk=kk0 f=2i 2000000000",
|
||||
"m0,k=k0,kk=kk1 f=3i 3000000000",
|
||||
"m0,k=k0,kk=kk0 f=4i 4000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 5000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 6000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 7000000000",
|
||||
"m0,k=k0,kk=kk0 f=0i 8000000000",
|
||||
"m0,k=k0,kk=kk1 f=6i 9000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 10000000000",
|
||||
"m0,k=k0,kk=kk1 f=7i 11000000000",
|
||||
"m0,k=k0,kk=kk0 f=5i 12000000000",
|
||||
"m0,k=k0,kk=kk1 f=8i 13000000000",
|
||||
"m0,k=k0,kk=kk0 f=9i 14000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 0)
|
||||
|> group(columns: ["k"])
|
||||
|
@ -961,7 +1112,7 @@ from(bucket: v.bucket)
|
|||
|> keep(columns: ["_time", "_value"])
|
||||
`,
|
||||
op: "readGroup(first)",
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,long
|
||||
#group,false,false,false,false
|
||||
#default,_result,,,
|
||||
|
@ -971,7 +1122,25 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "group none first",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0,kk=kk0 f=0i 0",
|
||||
"m0,k=k0,kk=kk1 f=1i 1000000000",
|
||||
"m0,k=k0,kk=kk0 f=2i 2000000000",
|
||||
"m0,k=k0,kk=kk1 f=3i 3000000000",
|
||||
"m0,k=k0,kk=kk0 f=4i 4000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 5000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 6000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 7000000000",
|
||||
"m0,k=k0,kk=kk0 f=0i 8000000000",
|
||||
"m0,k=k0,kk=kk1 f=6i 9000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 10000000000",
|
||||
"m0,k=k0,kk=kk1 f=7i 11000000000",
|
||||
"m0,k=k0,kk=kk0 f=5i 12000000000",
|
||||
"m0,k=k0,kk=kk1 f=8i 13000000000",
|
||||
"m0,k=k0,kk=kk0 f=9i 14000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 0)
|
||||
|> group()
|
||||
|
@ -979,7 +1148,7 @@ from(bucket: v.bucket)
|
|||
|> keep(columns: ["_time", "_value"])
|
||||
`,
|
||||
op: "readGroup(first)",
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,long
|
||||
#group,false,false,false,false
|
||||
#default,_result,,,
|
||||
|
@ -989,7 +1158,25 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "group last",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0,kk=kk0 f=0i 0",
|
||||
"m0,k=k0,kk=kk1 f=1i 1000000000",
|
||||
"m0,k=k0,kk=kk0 f=2i 2000000000",
|
||||
"m0,k=k0,kk=kk1 f=3i 3000000000",
|
||||
"m0,k=k0,kk=kk0 f=4i 4000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 5000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 6000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 7000000000",
|
||||
"m0,k=k0,kk=kk0 f=0i 8000000000",
|
||||
"m0,k=k0,kk=kk1 f=6i 9000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 10000000000",
|
||||
"m0,k=k0,kk=kk1 f=7i 11000000000",
|
||||
"m0,k=k0,kk=kk0 f=5i 12000000000",
|
||||
"m0,k=k0,kk=kk1 f=8i 13000000000",
|
||||
"m0,k=k0,kk=kk0 f=9i 14000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 0)
|
||||
|> group(columns: ["k"])
|
||||
|
@ -997,7 +1184,7 @@ from(bucket: v.bucket)
|
|||
|> keep(columns: ["_time", "_value"])
|
||||
`,
|
||||
op: "readGroup(last)",
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,long
|
||||
#group,false,false,false,false
|
||||
#default,_result,,,
|
||||
|
@ -1007,7 +1194,25 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "group none last",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0,kk=kk0 f=0i 0",
|
||||
"m0,k=k0,kk=kk1 f=1i 1000000000",
|
||||
"m0,k=k0,kk=kk0 f=2i 2000000000",
|
||||
"m0,k=k0,kk=kk1 f=3i 3000000000",
|
||||
"m0,k=k0,kk=kk0 f=4i 4000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 5000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 6000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 7000000000",
|
||||
"m0,k=k0,kk=kk0 f=0i 8000000000",
|
||||
"m0,k=k0,kk=kk1 f=6i 9000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 10000000000",
|
||||
"m0,k=k0,kk=kk1 f=7i 11000000000",
|
||||
"m0,k=k0,kk=kk0 f=5i 12000000000",
|
||||
"m0,k=k0,kk=kk1 f=8i 13000000000",
|
||||
"m0,k=k0,kk=kk0 f=9i 14000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 0)
|
||||
|> group()
|
||||
|
@ -1015,7 +1220,7 @@ from(bucket: v.bucket)
|
|||
|> keep(columns: ["_time", "_value"])
|
||||
`,
|
||||
op: "readGroup(last)",
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,dateTime:RFC3339,long
|
||||
#group,false,false,false,false
|
||||
#default,_result,,,
|
||||
|
@ -1025,7 +1230,25 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "count group none",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0,kk=kk0 f=0i 0",
|
||||
"m0,k=k0,kk=kk1 f=1i 1000000000",
|
||||
"m0,k=k0,kk=kk0 f=2i 2000000000",
|
||||
"m0,k=k0,kk=kk1 f=3i 3000000000",
|
||||
"m0,k=k0,kk=kk0 f=4i 4000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 5000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 6000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 7000000000",
|
||||
"m0,k=k0,kk=kk0 f=0i 8000000000",
|
||||
"m0,k=k0,kk=kk1 f=6i 9000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 10000000000",
|
||||
"m0,k=k0,kk=kk1 f=7i 11000000000",
|
||||
"m0,k=k0,kk=kk0 f=5i 12000000000",
|
||||
"m0,k=k0,kk=kk1 f=8i 13000000000",
|
||||
"m0,k=k0,kk=kk0 f=9i 14000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|
||||
|> group()
|
||||
|
@ -1033,7 +1256,7 @@ from(bucket: v.bucket)
|
|||
|> drop(columns: ["_start", "_stop"])
|
||||
`,
|
||||
op: "readGroup(count)",
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,long
|
||||
#group,false,false,false
|
||||
#default,_result,,
|
||||
|
@ -1043,15 +1266,33 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "count group",
|
||||
op: "readGroup(count)",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0,kk=kk0 f=0i 0",
|
||||
"m0,k=k0,kk=kk1 f=1i 1000000000",
|
||||
"m0,k=k0,kk=kk0 f=2i 2000000000",
|
||||
"m0,k=k0,kk=kk1 f=3i 3000000000",
|
||||
"m0,k=k0,kk=kk0 f=4i 4000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 5000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 6000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 7000000000",
|
||||
"m0,k=k0,kk=kk0 f=0i 8000000000",
|
||||
"m0,k=k0,kk=kk1 f=6i 9000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 10000000000",
|
||||
"m0,k=k0,kk=kk1 f=7i 11000000000",
|
||||
"m0,k=k0,kk=kk0 f=5i 12000000000",
|
||||
"m0,k=k0,kk=kk1 f=8i 13000000000",
|
||||
"m0,k=k0,kk=kk0 f=9i 14000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 15000000000",
|
||||
},
|
||||
op: "readGroup(count)",
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|
||||
|> group(columns: ["kk"])
|
||||
|> count()
|
||||
|> drop(columns: ["_start", "_stop"])
|
||||
`,
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,string,long
|
||||
#group,false,false,true,false
|
||||
#default,_result,,,
|
||||
|
@ -1062,7 +1303,25 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "sum group none",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0,kk=kk0 f=0i 0",
|
||||
"m0,k=k0,kk=kk1 f=1i 1000000000",
|
||||
"m0,k=k0,kk=kk0 f=2i 2000000000",
|
||||
"m0,k=k0,kk=kk1 f=3i 3000000000",
|
||||
"m0,k=k0,kk=kk0 f=4i 4000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 5000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 6000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 7000000000",
|
||||
"m0,k=k0,kk=kk0 f=0i 8000000000",
|
||||
"m0,k=k0,kk=kk1 f=6i 9000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 10000000000",
|
||||
"m0,k=k0,kk=kk1 f=7i 11000000000",
|
||||
"m0,k=k0,kk=kk0 f=5i 12000000000",
|
||||
"m0,k=k0,kk=kk1 f=8i 13000000000",
|
||||
"m0,k=k0,kk=kk0 f=9i 14000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 15000000000",
|
||||
},
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|
||||
|> group()
|
||||
|
@ -1070,7 +1329,7 @@ from(bucket: v.bucket)
|
|||
|> drop(columns: ["_start", "_stop"])
|
||||
`,
|
||||
op: "readGroup(sum)",
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,long
|
||||
#group,false,false,false
|
||||
#default,_result,,
|
||||
|
@ -1080,15 +1339,33 @@ from(bucket: v.bucket)
|
|||
},
|
||||
{
|
||||
name: "sum group",
|
||||
op: "readGroup(sum)",
|
||||
q: `
|
||||
data: []string{
|
||||
"m0,k=k0,kk=kk0 f=0i 0",
|
||||
"m0,k=k0,kk=kk1 f=1i 1000000000",
|
||||
"m0,k=k0,kk=kk0 f=2i 2000000000",
|
||||
"m0,k=k0,kk=kk1 f=3i 3000000000",
|
||||
"m0,k=k0,kk=kk0 f=4i 4000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 5000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 6000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 7000000000",
|
||||
"m0,k=k0,kk=kk0 f=0i 8000000000",
|
||||
"m0,k=k0,kk=kk1 f=6i 9000000000",
|
||||
"m0,k=k0,kk=kk0 f=6i 10000000000",
|
||||
"m0,k=k0,kk=kk1 f=7i 11000000000",
|
||||
"m0,k=k0,kk=kk0 f=5i 12000000000",
|
||||
"m0,k=k0,kk=kk1 f=8i 13000000000",
|
||||
"m0,k=k0,kk=kk0 f=9i 14000000000",
|
||||
"m0,k=k0,kk=kk1 f=5i 15000000000",
|
||||
},
|
||||
op: "readGroup(sum)",
|
||||
query: `
|
||||
from(bucket: v.bucket)
|
||||
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|
||||
|> group(columns: ["kk"])
|
||||
|> sum()
|
||||
|> drop(columns: ["_start", "_stop"])
|
||||
`,
|
||||
res: `
|
||||
want: `
|
||||
#datatype,string,long,string,long
|
||||
#group,false,false,true,false
|
||||
#default,_result,,,
|
||||
|
@ -1097,29 +1374,50 @@ from(bucket: v.bucket)
|
|||
,,1,kk1,35
|
||||
`,
|
||||
},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
wantCount := getReadRequestCount(tt.op) + 1
|
||||
}
|
||||
for _, tc := range testcases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, nil,
|
||||
"--feature-flags",
|
||||
"pushDownWindowAggregateCount=true",
|
||||
"--feature-flags",
|
||||
"pushDownWindowAggregateSum=true",
|
||||
"--feature-flags",
|
||||
"pushDownWindowAggregateFirst=true",
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateCount=true",
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateSum=true",
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateFirst=true",
|
||||
"--feature-flags",
|
||||
"pushDownGroupAggregateLast=true",
|
||||
)
|
||||
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
l.WritePointsOrFail(t, strings.Join(tc.data, "\n"))
|
||||
|
||||
queryStr := "v = {bucket: " + "\"" + l.Bucket.Name + "\"" + "}\n" + tc.query
|
||||
|
||||
prelude := fmt.Sprintf("v = {bucket: \"%s\", timeRangeStart: 1970-01-01T00:00:00Z, timeRangeStop: 1970-01-01T00:00:15Z}", l.Bucket.Name)
|
||||
queryStr := prelude + "\n" + tt.q
|
||||
res := l.MustExecuteQuery(queryStr)
|
||||
defer res.Done()
|
||||
got := flux.NewSliceResultIterator(res.Results)
|
||||
defer got.Release()
|
||||
|
||||
dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
|
||||
want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tt.res)))
|
||||
want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tc.want)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer want.Release()
|
||||
|
||||
if err := executetest.EqualResultIterators(want, got); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if want, got := wantCount, getReadRequestCount(tt.op); want != got {
|
||||
if want, got := uint64(1), l.NumReads(t, tc.op); want != got {
|
||||
t.Fatalf("unexpected sample count -want/+got:\n\t- %d\n\t+ %d", want, got)
|
||||
}
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue