test: table iterator tests for window first

pull/18745/head
jlapacik 2020-06-25 13:41:57 -07:00
parent 7039d87a4f
commit 0b52a48f83
1 changed files with 355 additions and 0 deletions

View File

@ -1193,6 +1193,361 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing
}
}
func TestStorageReader_ReadWindowFirst(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{
Tags: []*gen.TagValuesSpec{
{
TagKey: "t0",
Values: func() gen.CountableSequence {
return gen.NewCounterByteSequence("a%s", 0, 1)
},
},
},
}
spec := gen.Spec{
OrgID: org,
BucketID: bucket,
Measurements: []gen.MeasurementSpec{
{
Name: "m0",
TagsSpec: tagsSpec,
FieldValuesSpec: &gen.FieldValuesSpec{
Name: "f0",
TimeSequenceSpec: gen.TimeSequenceSpec{
Count: math.MaxInt32,
Delta: 5 * time.Second,
},
DataType: models.Integer,
Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
return gen.NewTimeIntegerValuesSequence(
spec.Count,
gen.NewTimestampSequenceFromSpec(spec),
gen.NewIntegerArrayValuesSequence([]int64{1, 2, 3, 4}),
)
},
},
},
},
}
tr := gen.TimeRange{
Start: mustParseTime("2019-11-25T00:00:00Z"),
End: mustParseTime("2019-11-25T00:01:00Z"),
}
return gen.NewSeriesGeneratorFromSpec(&spec, tr), tr
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
WindowEvery: int64(10 * time.Second),
Aggregates: []plan.ProcedureKind{
universe.FirstKind,
},
}, mem)
if err != nil {
t.Fatal(err)
}
makeWindowTable := func(start, stop, time values.Time, v int64) *executetest.Table {
return &executetest.Table{
KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TInt},
{Label: "_field", Type: flux.TString},
{Label: "_measurement", Type: flux.TString},
{Label: "t0", Type: flux.TString},
},
Data: [][]interface{}{
{start, stop, time, v, "f0", "m0", "a0"},
},
}
}
want := []*executetest.Table{
makeWindowTable(Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:00Z"), 1),
makeWindowTable(Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:20Z"), Time("2019-11-25T00:00:10Z"), 3),
makeWindowTable(Time("2019-11-25T00:00:20Z"), Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:20Z"), 1),
makeWindowTable(Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:40Z"), Time("2019-11-25T00:00:30Z"), 3),
makeWindowTable(Time("2019-11-25T00:00:40Z"), Time("2019-11-25T00:00:50Z"), Time("2019-11-25T00:00:40Z"), 1),
makeWindowTable(Time("2019-11-25T00:00:50Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:50Z"), 3),
}
executetest.NormalizeTables(want)
sort.Sort(executetest.SortedTables(want))
var got []*executetest.Table
if err := ti.Do(func(table flux.Table) error {
t, err := executetest.ConvertTable(table)
if err != nil {
return err
}
got = append(got, t)
return nil
}); err != nil {
t.Fatal(err)
}
executetest.NormalizeTables(got)
sort.Sort(executetest.SortedTables(got))
// compare these two
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("unexpected results -want/+got:\n%s", diff)
}
}
func TestStorageReader_ReadWindowFirstCreateEmpty(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{
Tags: []*gen.TagValuesSpec{
{
TagKey: "t0",
Values: func() gen.CountableSequence {
return gen.NewCounterByteSequence("a%s", 0, 1)
},
},
},
}
spec := gen.Spec{
OrgID: org,
BucketID: bucket,
Measurements: []gen.MeasurementSpec{
{
Name: "m0",
TagsSpec: tagsSpec,
FieldValuesSpec: &gen.FieldValuesSpec{
Name: "f0",
TimeSequenceSpec: gen.TimeSequenceSpec{
Count: math.MaxInt32,
Delta: 20 * time.Second,
},
DataType: models.Integer,
Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
return gen.NewTimeIntegerValuesSequence(
spec.Count,
gen.NewTimestampSequenceFromSpec(spec),
gen.NewIntegerArrayValuesSequence([]int64{1, 2}),
)
},
},
},
},
}
tr := gen.TimeRange{
Start: mustParseTime("2019-11-25T00:00:00Z"),
End: mustParseTime("2019-11-25T00:01:00Z"),
}
return gen.NewSeriesGeneratorFromSpec(&spec, tr), tr
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
WindowEvery: int64(10 * time.Second),
Aggregates: []plan.ProcedureKind{
universe.FirstKind,
},
CreateEmpty: true,
}, mem)
if err != nil {
t.Fatal(err)
}
makeEmptyTable := func(start, stop values.Time) *executetest.Table {
return &executetest.Table{
KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},
KeyValues: []interface{}{start, stop, "f0", "m0", "a0"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TInt},
{Label: "_field", Type: flux.TString},
{Label: "_measurement", Type: flux.TString},
{Label: "t0", Type: flux.TString},
},
Data: nil,
}
}
makeWindowTable := func(start, stop, time values.Time, v int64) *executetest.Table {
return &executetest.Table{
KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TInt},
{Label: "_field", Type: flux.TString},
{Label: "_measurement", Type: flux.TString},
{Label: "t0", Type: flux.TString},
},
Data: [][]interface{}{
{start, stop, time, v, "f0", "m0", "a0"},
},
}
}
want := []*executetest.Table{
makeWindowTable(
Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:00Z"), 1,
),
makeEmptyTable(
Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:20Z"),
),
makeWindowTable(
Time("2019-11-25T00:00:20Z"), Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:20Z"), 2,
),
makeEmptyTable(
Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:40Z"),
),
makeWindowTable(
Time("2019-11-25T00:00:40Z"), Time("2019-11-25T00:00:50Z"), Time("2019-11-25T00:00:40Z"), 1,
),
makeEmptyTable(
Time("2019-11-25T00:00:50Z"), Time("2019-11-25T00:01:00Z"),
),
}
executetest.NormalizeTables(want)
sort.Sort(executetest.SortedTables(want))
var got []*executetest.Table
if err := ti.Do(func(table flux.Table) error {
t, err := executetest.ConvertTable(table)
if err != nil {
return err
}
got = append(got, t)
return nil
}); err != nil {
t.Fatal(err)
}
executetest.NormalizeTables(got)
sort.Sort(executetest.SortedTables(got))
// compare these two
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("unexpected results -want/+got:\n%s", diff)
}
}
func TestStorageReader_ReadWindowFirstTimeColumn(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{
Tags: []*gen.TagValuesSpec{
{
TagKey: "t0",
Values: func() gen.CountableSequence {
return gen.NewCounterByteSequence("a%s", 0, 1)
},
},
},
}
spec := gen.Spec{
OrgID: org,
BucketID: bucket,
Measurements: []gen.MeasurementSpec{
{
Name: "m0",
TagsSpec: tagsSpec,
FieldValuesSpec: &gen.FieldValuesSpec{
Name: "f0",
TimeSequenceSpec: gen.TimeSequenceSpec{
Count: math.MaxInt32,
Delta: 20 * time.Second,
},
DataType: models.Integer,
Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
return gen.NewTimeIntegerValuesSequence(
spec.Count,
gen.NewTimestampSequenceFromSpec(spec),
gen.NewIntegerArrayValuesSequence([]int64{1, 2}),
)
},
},
},
},
}
tr := gen.TimeRange{
Start: mustParseTime("2019-11-25T00:00:00Z"),
End: mustParseTime("2019-11-25T00:01:00Z"),
}
return gen.NewSeriesGeneratorFromSpec(&spec, tr), tr
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
WindowEvery: int64(10 * time.Second),
Aggregates: []plan.ProcedureKind{
universe.FirstKind,
},
CreateEmpty: true,
TimeColumn: execute.DefaultStopColLabel,
}, mem)
if err != nil {
t.Fatal(err)
}
want := []*executetest.Table{
&executetest.Table{
KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TInt},
{Label: "_field", Type: flux.TString},
{Label: "_measurement", Type: flux.TString},
{Label: "t0", Type: flux.TString},
},
Data: [][]interface{}{
{Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:10Z"), int64(1), "f0", "m0", "a0"},
{Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:30Z"), int64(2), "f0", "m0", "a0"},
{Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:50Z"), int64(1), "f0", "m0", "a0"},
},
},
}
executetest.NormalizeTables(want)
sort.Sort(executetest.SortedTables(want))
var got []*executetest.Table
if err := ti.Do(func(table flux.Table) error {
t, err := executetest.ConvertTable(table)
if err != nil {
return err
}
got = append(got, t)
return nil
}); err != nil {
t.Fatal(err)
}
executetest.NormalizeTables(got)
sort.Sort(executetest.SortedTables(got))
// compare these two
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("unexpected results -want/+got:\n%s", diff)
}
}
func BenchmarkReadFilter(b *testing.B) {
setupFn := func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{