refactor(query/functions/outputs): refactor to.go to use arrow data … (#2131)
* refactor(query/functions/outputs): refactor to.go to use arrow data structures * update flux to latest masterpull/10616/head
parent
30b2d9ca00
commit
6777b9676c
2
go.mod
2
go.mod
|
@ -78,7 +78,7 @@ require (
|
|||
github.com/hashicorp/vault-plugin-secrets-kv v0.0.0-20181106190520-2236f141171e // indirect
|
||||
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
|
||||
github.com/imdario/mergo v0.3.6 // indirect
|
||||
github.com/influxdata/flux v0.11.0
|
||||
github.com/influxdata/flux v0.11.1-0.20181220234653-14af560e394d
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
|
||||
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
|
||||
github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -236,8 +236,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
|
|||
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
|
||||
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/influxdata/flux v0.11.0 h1:HbnN00buPrE8xoiUY7zt9E0BCsdXrbp5w8BNOmjODk0=
|
||||
github.com/influxdata/flux v0.11.0/go.mod h1:crguqnTMQHaGEKp93vZH+pIyTVlJYqkv8bNqSMfc22A=
|
||||
github.com/influxdata/flux v0.11.1-0.20181220234653-14af560e394d h1:/Wb7hy8cF4+med09+HNXeGtAUlDPu2gjHhdJKg29CLw=
|
||||
github.com/influxdata/flux v0.11.1-0.20181220234653-14af560e394d/go.mod h1:crguqnTMQHaGEKp93vZH+pIyTVlJYqkv8bNqSMfc22A=
|
||||
github.com/influxdata/goreleaser v0.86.2-0.20181010170531-0fd209ba67f5/go.mod h1:aVuBpDAT5VtjtUxzvBt8HOd0buzvvk7OX3H2iaviixg=
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
|
||||
|
|
|
@ -490,7 +490,7 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
|
|||
|
||||
measurementStats := make(map[string]Stats)
|
||||
measurementName := ""
|
||||
return tbl.Do(func(er flux.ColReader) error {
|
||||
return tbl.DoArrow(func(er flux.ArrowColReader) error {
|
||||
var pointTime time.Time
|
||||
var points models.Points
|
||||
var tags models.Tags
|
||||
|
@ -502,16 +502,16 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
|
|||
for j, col := range er.Cols() {
|
||||
switch {
|
||||
case col.Label == spec.MeasurementColumn:
|
||||
measurementName = er.Strings(j)[i]
|
||||
measurementName = string(er.Strings(j).Value(i))
|
||||
case col.Label == timeColLabel:
|
||||
pointTime = er.Times(j)[i].Time()
|
||||
pointTime = execute.ValueForRowArrow(er, i, j).Time().Time()
|
||||
case isTag[j]:
|
||||
if col.Type != flux.TString {
|
||||
return errors.New("invalid type for tag column")
|
||||
}
|
||||
// TODO(docmerlin): instead of doing this sort of thing, it would be nice if we had a way that allocated a lot less.
|
||||
// Note that tags are 2-tuples of key and then value.
|
||||
tags = append(tags, models.NewTag([]byte(col.Label), []byte(er.Strings(j)[i])))
|
||||
tags = append(tags, models.NewTag([]byte(col.Label), er.Strings(j).Value(i)))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -567,7 +567,7 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
|
|||
return err
|
||||
}
|
||||
points = append(points, pt)
|
||||
if err := execute.AppendRecord(i, er, builder); err != nil {
|
||||
if err := execute.AppendRecordArrow(i, er, builder); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -576,7 +576,7 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
|
|||
})
|
||||
}
|
||||
|
||||
func defaultFieldMapping(er flux.ColReader, row int) (values.Object, error) {
|
||||
func defaultFieldMapping(er flux.ArrowColReader, row int) (values.Object, error) {
|
||||
fieldColumnIdx := execute.ColIdx(defaultFieldColLabel, er.Cols())
|
||||
valueColumnIdx := execute.ColIdx(execute.DefaultValueColLabel, er.Cols())
|
||||
|
||||
|
@ -588,29 +588,11 @@ func defaultFieldMapping(er flux.ColReader, row int) (values.Object, error) {
|
|||
return nil, errors.New("table has no _value column")
|
||||
}
|
||||
|
||||
var value values.Value
|
||||
valueColumnType := er.Cols()[valueColumnIdx].Type
|
||||
|
||||
switch valueColumnType {
|
||||
case flux.TFloat:
|
||||
value = values.NewFloat(er.Floats(valueColumnIdx)[row])
|
||||
case flux.TInt:
|
||||
value = values.NewInt(er.Ints(valueColumnIdx)[row])
|
||||
case flux.TUInt:
|
||||
value = values.NewUInt(er.UInts(valueColumnIdx)[row])
|
||||
case flux.TString:
|
||||
value = values.NewString(er.Strings(valueColumnIdx)[row])
|
||||
case flux.TTime:
|
||||
value = values.NewTime(er.Times(valueColumnIdx)[row])
|
||||
case flux.TBool:
|
||||
value = values.NewBool(er.Bools(valueColumnIdx)[row])
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported type %v for _value column", valueColumnType)
|
||||
}
|
||||
value := execute.ValueForRowArrow(er, row, valueColumnIdx)
|
||||
|
||||
fieldValueMapping := values.NewObject()
|
||||
field := er.Strings(fieldColumnIdx)[row]
|
||||
fieldValueMapping.Set(field, value)
|
||||
field := execute.ValueForRowArrow(er, row, fieldColumnIdx)
|
||||
fieldValueMapping.Set(field.Str(), value)
|
||||
|
||||
return fieldValueMapping, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue