diff --git a/query/functions/outputs/to.go b/query/functions/outputs/to.go index d7c0e1a747..c87ce3b364 100644 --- a/query/functions/outputs/to.go +++ b/query/functions/outputs/to.go @@ -66,7 +66,7 @@ func init() { []string{}, ) - flux.RegisterFunction(ToKind, createToOpSpec, toSignature) + flux.RegisterFunctionWithSideEffect(ToKind, createToOpSpec, toSignature) flux.RegisterOpSpec(ToKind, func() flux.OperationSpec { return &ToOpSpec{} }) plan.RegisterProcedureSpecWithSideEffect(ToKind, newToProcedure, ToKind) execute.RegisterTransformation(ToKind, createToTransformation) @@ -398,6 +398,33 @@ func (d ToDependencies) Validate() error { return nil } +type Stats struct { + NRows int + Latest time.Time + Earliest time.Time + NFields int + NTags int +} + +func (s Stats) Update(o Stats) { + s.NRows += o.NRows + if s.Latest.IsZero() || o.Latest.Unix() > s.Latest.Unix() { + s.Latest = o.Latest + } + + if s.Earliest.IsZero() || o.Earliest.Unix() < s.Earliest.Unix() { + s.Earliest = o.Earliest + } + + if o.NFields > s.NFields { + s.NFields = o.NFields + } + + if o.NTags > s.NTags { + s.NTags = o.NTags + } +} + func writeTable(t *ToTransformation, tbl flux.Table) error { var bucketID, orgID *platform.ID var err error @@ -454,6 +481,14 @@ func writeTable(t *ToTransformation, tbl flux.Table) error { } + builder, new := t.cache.TableBuilder(tbl.Key()) + if new { + if err := execute.AddTableCols(tbl, builder); err != nil { + return err + } + } + + measurementStats := make(map[string]Stats) measurementName := "" return tbl.Do(func(er flux.ColReader) error { var pointTime time.Time @@ -513,11 +548,28 @@ func writeTable(t *ToTransformation, tbl flux.Table) error { } }) + mstats := Stats{ + NRows: 1, + Latest: pointTime, + Earliest: pointTime, + NFields: len(fields), + NTags: len(tags), + } + _, ok := measurementStats[measurementName] + if !ok { + measurementStats[measurementName] = mstats + } else { + measurementStats[measurementName].Update(mstats) + } + pt, err := models.NewPoint(measurementName, tags, fields, pointTime) if err != nil { return err } points = append(points, pt) + if err := execute.AppendRecord(i, er, builder); err != nil { + return err + } } points, err = tsdb.ExplodePoints(*orgID, *bucketID, points) return d.PointsWriter.WritePoints(points) diff --git a/query/functions/outputs/to_test.go b/query/functions/outputs/to_test.go index 4c98b4c5bf..031550a27a 100644 --- a/query/functions/outputs/to_test.go +++ b/query/functions/outputs/to_test.go @@ -110,6 +110,7 @@ func TestTo_Process(t *testing.T) { bid, _ := (mockBucketLookup{}).Lookup(oid, "my-bucket") type wanted struct { result *mock.PointsWriter + tables []*executetest.Table } testCases := []struct { name string @@ -152,6 +153,23 @@ b _value=1.0 21 a _value=3.0 31 c _value=4.0 41`), }, + tables: []*executetest.Table{{ + ColMeta: []flux.ColMeta{ + {Label: "_start", Type: flux.TTime}, + {Label: "_stop", Type: flux.TTime}, + {Label: "_time", Type: flux.TTime}, + {Label: "_measurement", Type: flux.TString}, + {Label: "_field", Type: flux.TString}, + {Label: "_value", Type: flux.TFloat}, + }, + Data: [][]interface{}{ + {execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0}, + {execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0}, + {execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0}, + {execute.Time(0), execute.Time(100), execute.Time(31), "a", "_value", 3.0}, + {execute.Time(0), execute.Time(100), execute.Time(41), "c", "_value", 4.0}, + }, + }}, }, }, { @@ -188,6 +206,22 @@ b,tag2=cc _value=1.0 21 a,tag2=dd _value=3.0 31 c,tag2=ee _value=4.0 41`), }, + tables: []*executetest.Table{{ + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "tag1", Type: flux.TString}, + {Label: "tag2", Type: flux.TString}, + {Label: "_field", Type: flux.TString}, + {Label: "_value", Type: flux.TFloat}, + }, + Data: [][]interface{}{ + {execute.Time(11), "a", "aa", "_value", 2.0}, + {execute.Time(21), "a", "bb", "_value", 2.0}, + {execute.Time(21), "b", "cc", "_value", 1.0}, + {execute.Time(31), "a", "dd", "_value", 3.0}, + {execute.Time(41), "c", "ee", "_value", 4.0}, + }, + }}, }, }, { @@ -226,6 +260,23 @@ m,tag1=b,tag2=cc _value=1.0 21 m,tag1=a,tag2=dd _value=3.0 31 m,tag1=c,tag2=ee _value=4.0 41`), }, + tables: []*executetest.Table{{ + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_measurement", Type: flux.TString}, + {Label: "_field", Type: flux.TString}, + {Label: "_value", Type: flux.TFloat}, + {Label: "tag1", Type: flux.TString}, + {Label: "tag2", Type: flux.TString}, + }, + Data: [][]interface{}{ + {execute.Time(11), "m", "_value", 2.0, "a", "aa"}, + {execute.Time(21), "m", "_value", 2.0, "a", "bb"}, + {execute.Time(21), "m", "_value", 1.0, "b", "cc"}, + {execute.Time(31), "m", "_value", 3.0, "a", "dd"}, + {execute.Time(41), "m", "_value", 4.0, "c", "ee"}, + }, + }}, }, }, { @@ -282,6 +333,20 @@ b temperature=1.0 21 a temperature=3.0 31 c temperature=4.0 41`), }, + tables: []*executetest.Table{{ + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_measurement", Type: flux.TString}, + {Label: "temperature", Type: flux.TFloat}, + }, + Data: [][]interface{}{ + {execute.Time(11), "a", 2.0}, + {execute.Time(21), "a", 2.0}, + {execute.Time(21), "b", 1.0}, + {execute.Time(31), "a", 3.0}, + {execute.Time(41), "c", 4.0}, + }, + }}, }, }, { @@ -368,6 +433,22 @@ b day="Wednesday",humidity=4.0,ratio=0.25,temperature=1.0 21 a day="Thursday",humidity=3.0,ratio=1.0,temperature=3.0 31 c day="Friday",humidity=5.0,ratio=0.8,temperature=4.0 41`), }, + tables: []*executetest.Table{{ + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "day", Type: flux.TString}, + {Label: "tag", Type: flux.TString}, + {Label: "temperature", Type: flux.TFloat}, + {Label: "humidity", Type: flux.TFloat}, + }, + Data: [][]interface{}{ + {execute.Time(11), "Monday", "a", 2.0, 1.0}, + {execute.Time(21), "Tuesday", "a", 2.0, 2.0}, + {execute.Time(21), "Wednesday", "b", 1.0, 4.0}, + {execute.Time(31), "Thursday", "a", 3.0, 3.0}, + {execute.Time(41), "Friday", "c", 4.0, 5.0}, + }, + }}, }, }, { @@ -438,6 +519,26 @@ b,tag2=d humidity=50i,temperature=1.0 21 a,tag2=e humidity=60i,temperature=3.0 31 c,tag2=e humidity=65i,temperature=4.0 41`), }, + tables: []*executetest.Table{{ + ColMeta: []flux.ColMeta{ + {Label: "_start", Type: flux.TTime}, + {Label: "_stop", Type: flux.TTime}, + {Label: "_time", Type: flux.TTime}, + {Label: "tag1", Type: flux.TString}, + {Label: "tag2", Type: flux.TString}, + {Label: "other-string-column", Type: flux.TString}, + {Label: "temperature", Type: flux.TFloat}, + {Label: "humidity", Type: flux.TInt}, + {Label: "other-value-column", Type: flux.TFloat}, + }, + Data: [][]interface{}{ + {execute.Time(0), execute.Time(100), execute.Time(11), "a", "d", "misc", 2.0, int64(50), 1.0}, + {execute.Time(0), execute.Time(100), execute.Time(21), "a", "d", "misc", 2.0, int64(50), 1.0}, + {execute.Time(0), execute.Time(100), execute.Time(21), "b", "d", "misc", 1.0, int64(50), 1.0}, + {execute.Time(0), execute.Time(100), execute.Time(31), "a", "e", "misc", 3.0, int64(60), 1.0}, + {execute.Time(0), execute.Time(100), execute.Time(41), "c", "e", "misc", 4.0, int64(65), 1.0}, + }, + }}, }, }, } @@ -449,7 +550,7 @@ c,tag2=e humidity=65i,temperature=4.0 41`), executetest.ProcessTestHelper( t, tc.data, - nil, + tc.want.tables, nil, func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation { newT, _ := outputs.NewToTransformation(d, c, tc.spec, deps)