add output to to function (#1282)

* added (unused) stats struct, and piped written data to output table

* updated to tests since to returns data now

* changes based on review
pull/10616/head
Adam 2018-11-02 13:40:25 -04:00 committed by GitHub
parent 3fed14dabc
commit 612005cad1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 155 additions and 2 deletions

View File

@ -66,7 +66,7 @@ func init() {
[]string{},
)
flux.RegisterFunction(ToKind, createToOpSpec, toSignature)
flux.RegisterFunctionWithSideEffect(ToKind, createToOpSpec, toSignature)
flux.RegisterOpSpec(ToKind, func() flux.OperationSpec { return &ToOpSpec{} })
plan.RegisterProcedureSpecWithSideEffect(ToKind, newToProcedure, ToKind)
execute.RegisterTransformation(ToKind, createToTransformation)
@ -398,6 +398,33 @@ func (d ToDependencies) Validate() error {
return nil
}
type Stats struct {
NRows int
Latest time.Time
Earliest time.Time
NFields int
NTags int
}
func (s Stats) Update(o Stats) {
s.NRows += o.NRows
if s.Latest.IsZero() || o.Latest.Unix() > s.Latest.Unix() {
s.Latest = o.Latest
}
if s.Earliest.IsZero() || o.Earliest.Unix() < s.Earliest.Unix() {
s.Earliest = o.Earliest
}
if o.NFields > s.NFields {
s.NFields = o.NFields
}
if o.NTags > s.NTags {
s.NTags = o.NTags
}
}
func writeTable(t *ToTransformation, tbl flux.Table) error {
var bucketID, orgID *platform.ID
var err error
@ -454,6 +481,14 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
}
builder, new := t.cache.TableBuilder(tbl.Key())
if new {
if err := execute.AddTableCols(tbl, builder); err != nil {
return err
}
}
measurementStats := make(map[string]Stats)
measurementName := ""
return tbl.Do(func(er flux.ColReader) error {
var pointTime time.Time
@ -513,11 +548,28 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
}
})
mstats := Stats{
NRows: 1,
Latest: pointTime,
Earliest: pointTime,
NFields: len(fields),
NTags: len(tags),
}
_, ok := measurementStats[measurementName]
if !ok {
measurementStats[measurementName] = mstats
} else {
measurementStats[measurementName].Update(mstats)
}
pt, err := models.NewPoint(measurementName, tags, fields, pointTime)
if err != nil {
return err
}
points = append(points, pt)
if err := execute.AppendRecord(i, er, builder); err != nil {
return err
}
}
points, err = tsdb.ExplodePoints(*orgID, *bucketID, points)
return d.PointsWriter.WritePoints(points)

View File

@ -110,6 +110,7 @@ func TestTo_Process(t *testing.T) {
bid, _ := (mockBucketLookup{}).Lookup(oid, "my-bucket")
type wanted struct {
result *mock.PointsWriter
tables []*executetest.Table
}
testCases := []struct {
name string
@ -152,6 +153,23 @@ b _value=1.0 21
a _value=3.0 31
c _value=4.0 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "_value", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "_value", 4.0},
},
}},
},
},
{
@ -188,6 +206,22 @@ b,tag2=cc _value=1.0 21
a,tag2=dd _value=3.0 31
c,tag2=ee _value=4.0 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "bb", "_value", 2.0},
{execute.Time(21), "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "dd", "_value", 3.0},
{execute.Time(41), "c", "ee", "_value", 4.0},
},
}},
},
},
{
@ -226,6 +260,23 @@ m,tag1=b,tag2=cc _value=1.0 21
m,tag1=a,tag2=dd _value=3.0 31
m,tag1=c,tag2=ee _value=4.0 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "a", "aa"},
{execute.Time(21), "m", "_value", 2.0, "a", "bb"},
{execute.Time(21), "m", "_value", 1.0, "b", "cc"},
{execute.Time(31), "m", "_value", 3.0, "a", "dd"},
{execute.Time(41), "m", "_value", 4.0, "c", "ee"},
},
}},
},
},
{
@ -282,6 +333,20 @@ b temperature=1.0 21
a temperature=3.0 31
c temperature=4.0 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", 2.0},
{execute.Time(21), "a", 2.0},
{execute.Time(21), "b", 1.0},
{execute.Time(31), "a", 3.0},
{execute.Time(41), "c", 4.0},
},
}},
},
},
{
@ -368,6 +433,22 @@ b day="Wednesday",humidity=4.0,ratio=0.25,temperature=1.0 21
a day="Thursday",humidity=3.0,ratio=1.0,temperature=3.0 31
c day="Friday",humidity=5.0,ratio=0.8,temperature=4.0 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "day", Type: flux.TString},
{Label: "tag", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "Monday", "a", 2.0, 1.0},
{execute.Time(21), "Tuesday", "a", 2.0, 2.0},
{execute.Time(21), "Wednesday", "b", 1.0, 4.0},
{execute.Time(31), "Thursday", "a", 3.0, 3.0},
{execute.Time(41), "Friday", "c", 4.0, 5.0},
},
}},
},
},
{
@ -438,6 +519,26 @@ b,tag2=d humidity=50i,temperature=1.0 21
a,tag2=e humidity=60i,temperature=3.0 31
c,tag2=e humidity=65i,temperature=4.0 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "other-string-column", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TInt},
{Label: "other-value-column", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "d", "misc", 1.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "e", "misc", 3.0, int64(60), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "e", "misc", 4.0, int64(65), 1.0},
},
}},
},
},
}
@ -449,7 +550,7 @@ c,tag2=e humidity=65i,temperature=4.0 41`),
executetest.ProcessTestHelper(
t,
tc.data,
nil,
tc.want.tables,
nil,
func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
newT, _ := outputs.NewToTransformation(d, c, tc.spec, deps)