feat: add arguments to flux to function (#20873)

Also remove some dead code

Closes #20853
pull/20889/head
Sam Arnold 2021-03-05 12:25:52 -04:00 committed by GitHub
parent d6f7716924
commit 7a992dac77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 798 additions and 107 deletions

View File

@ -24,7 +24,7 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
RUN gem install fpm
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain stable -y
# setup environment
ENV GO_VERSION 1.13.8

View File

@ -24,7 +24,7 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
RUN gem install fpm
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain stable -y
# setup environment
ENV GO_VERSION 1.13

View File

@ -1,57 +0,0 @@
package client
import (
"crypto/tls"
"net/http"
"net/url"
)
const (
fluxPath = "/api/v2/query"
)
// Shared transports for all clients to prevent leaking connections
var (
skipVerifyTransport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
defaultTransport = &http.Transport{}
)
// HTTP implements a Flux query client that makes requests to the /api/v2/query
// API endpoint.
type HTTP struct {
Addr string
Username string
Password string
InsecureSkipVerify bool
url *url.URL
}
// NewHTTP creates a HTTP client
func NewHTTP(u url.URL) (*HTTP, error) {
return &HTTP{url: &u}, nil
}
// Query runs a flux query against a influx server and decodes the result
func (s *HTTP) Do(hreq *http.Request) (*http.Response, error) {
if s.Username != "" {
hreq.SetBasicAuth(s.Username, s.Password)
}
hreq.Header.Set("Content-Type", "application/json")
hreq.Header.Set("Accept", "text/csv")
hc := newClient(s.url.Scheme, s.InsecureSkipVerify)
return hc.Do(hreq)
}
func newClient(scheme string, insecure bool) *http.Client {
hc := &http.Client{
Transport: defaultTransport,
}
if scheme == "https" && insecure {
hc.Transport = skipVerifyTransport
}
return hc
}

View File

@ -3,11 +3,14 @@ package influxdb
import (
"context"
"fmt"
"sort"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/compiler"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/semantic"
@ -19,12 +22,19 @@ import (
)
const (
ToKind = "influx1x/toKind"
DefaultBufferSize = 5000
ToKind = "influx1x/toKind"
DefaultBufferSize = 5000
DefaultFieldColLabel = "_field"
DefaultMeasurementColLabel = "_measurement"
)
// ToOpSpec is the flux.OperationSpec for the `to` flux function.
type ToOpSpec struct {
Bucket string `json:"bucket"`
Bucket string `json:"bucket"`
TimeColumn string `json:"timeColumn"`
MeasurementColumn string `json:"measurementColumn"`
TagColumns []string `json:"tagColumns"`
FieldFn interpreter.ResolvedFunction `json:"fieldFn"`
}
func init() {
@ -35,7 +45,7 @@ func init() {
execute.RegisterTransformation(ToKind, createToTransformation)
}
var unsupportedToArgs = []string{"bucketID", "orgID", "host", "timeColumn", "measurementColumn", "tagColumns", "fieldFn"}
var unsupportedToArgs = []string{"bucketID", "orgID", "host", "url", "brokers"}
func (o *ToOpSpec) ReadArgs(args flux.Arguments) error {
var err error
@ -53,7 +63,29 @@ func (o *ToOpSpec) ReadArgs(args flux.Arguments) error {
if o.Bucket, err = args.GetRequiredString("bucket"); err != nil {
return err
}
if o.TimeColumn, ok, _ = args.GetString("timeColumn"); !ok {
o.TimeColumn = execute.DefaultTimeColLabel
}
if o.MeasurementColumn, ok, _ = args.GetString("measurementColumn"); !ok {
o.MeasurementColumn = DefaultMeasurementColLabel
}
if tags, ok, _ := args.GetArray("tagColumns", semantic.String); ok {
o.TagColumns = make([]string, tags.Len())
tags.Sort(func(i, j values.Value) bool {
return i.Str() < j.Str()
})
tags.Range(func(i int, v values.Value) {
o.TagColumns[i] = v.Str()
})
}
if fieldFn, ok, _ := args.GetFunction("fieldFn"); ok {
if o.FieldFn, err = interpreter.ResolveFunction(fieldFn); err != nil {
return err
}
}
return nil
}
@ -88,7 +120,11 @@ func (o *ToProcedureSpec) Copy() plan.ProcedureSpec {
s := o.Spec
res := &ToProcedureSpec{
Spec: &ToOpSpec{
Bucket: s.Bucket,
Bucket: s.Bucket,
TimeColumn: s.TimeColumn,
MeasurementColumn: s.MeasurementColumn,
TagColumns: append([]string(nil), s.TagColumns...),
FieldFn: s.FieldFn.Copy(),
},
}
return res
@ -131,13 +167,16 @@ func createToTransformation(id execute.DatasetID, mode execute.AccumulationMode,
type ToTransformation struct {
execute.ExecutionNode
Ctx context.Context
DB string
RP string
d execute.Dataset
cache execute.TableBuilderCache
deps StorageDependencies
buf *coordinator.BufferedPointsWriter
Ctx context.Context
DB string
RP string
spec *ToProcedureSpec
isTagColumn func(column flux.ColMeta) bool
fn *execute.RowMapFn
d execute.Dataset
cache execute.TableBuilderCache
deps StorageDependencies
buf *coordinator.BufferedPointsWriter
}
func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.TableBuilderCache, toSpec *ToProcedureSpec, deps StorageDependencies) (*ToTransformation, error) {
@ -147,14 +186,61 @@ func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.T
if err != nil {
return nil, err
}
var fn *execute.RowMapFn
if spec.FieldFn.Fn != nil {
fn = execute.NewRowMapFn(spec.FieldFn.Fn, compiler.ToScope(spec.FieldFn.Scope))
}
var isTagColumn func(column flux.ColMeta) bool
if toSpec.Spec.TagColumns == nil {
// If no tag columns are specified, by default we exclude
// _field, _value and _measurement from being tag columns.
excludeColumns := map[string]bool{
execute.DefaultValueColLabel: true,
DefaultFieldColLabel: true,
DefaultMeasurementColLabel: true,
}
// Also exclude the overridden measurement column
excludeColumns[toSpec.Spec.MeasurementColumn] = true
// If a field function is specified then we exclude any column that
// is referenced in the function expression from being a tag column.
if toSpec.Spec.FieldFn.Fn != nil {
recordParam := toSpec.Spec.FieldFn.Fn.Parameters.List[0].Key.Name
exprNode := toSpec.Spec.FieldFn.Fn
colVisitor := newFieldFunctionVisitor(recordParam)
// Walk the field function expression and record which columns
// are referenced. None of these columns will be used as tag columns.
semantic.Walk(colVisitor, exprNode)
for k, v := range colVisitor.captured {
excludeColumns[k] = v
}
}
isTagColumn = func(column flux.ColMeta) bool {
return column.Type == flux.TString && !excludeColumns[column.Label]
}
} else {
// Simply check if column is in the sorted list of tag values
isTagColumn = func(column flux.ColMeta) bool {
i := sort.SearchStrings(toSpec.Spec.TagColumns, column.Label)
if i >= len(toSpec.Spec.TagColumns) {
return false
}
return toSpec.Spec.TagColumns[i] == column.Label
}
}
return &ToTransformation{
Ctx: ctx,
DB: db,
RP: rp,
d: d,
cache: cache,
deps: deps,
buf: coordinator.NewBufferedPointsWriter(deps.PointsWriter, db, rp, DefaultBufferSize),
Ctx: ctx,
DB: db,
RP: rp,
spec: toSpec,
isTagColumn: isTagColumn,
fn: fn,
d: d,
cache: cache,
deps: deps,
buf: coordinator.NewBufferedPointsWriter(deps.PointsWriter, db, rp, DefaultBufferSize),
}, nil
}
@ -164,29 +250,17 @@ func (t *ToTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey)
// Process does the actual work for the ToTransformation.
func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
//TODO(lesam): this is where 2.x overrides with explicit tag columns
measurementColumn := "_measurement"
fieldColumn := "_field"
excludeColumns := map[string]bool{
execute.DefaultValueColLabel: true,
fieldColumn: true,
measurementColumn: true,
}
isTagColumn := func(column flux.ColMeta) bool {
return column.Type == flux.TString && !excludeColumns[column.Label]
}
columns := tbl.Cols()
isTag := make([]bool, len(columns))
numTags := 0
for i, col := range columns {
isTag[i] = isTagColumn(col)
numTags++
isTag[i] = t.isTagColumn(col)
if isTag[i] {
numTags++
}
}
// TODO(lesam): this is where 2.x overrides the default time column label
timeColLabel := execute.DefaultTimeColLabel
timeColLabel := t.spec.Spec.TimeColumn
timeColIdx := execute.ColIdx(timeColLabel, columns)
if timeColIdx < 0 {
return &flux.Error{
@ -201,6 +275,15 @@ func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
}
}
// prepare field function
var fn *execute.RowMapPreparedFn
if t.fn != nil {
var err error
if fn, err = t.fn.Prepare(columns); err != nil {
return err
}
}
builder, new := t.cache.TableBuilder(tbl.Key())
if new {
if err := execute.AddTableCols(tbl, builder); err != nil {
@ -217,12 +300,12 @@ func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
measurementName := ""
fields := make(models.Fields)
var pointTime time.Time
kv = kv[0:]
kv = kv[:0]
// get the non-field values
for j, col := range er.Cols() {
switch {
case col.Label == measurementColumn:
case col.Label == t.spec.Spec.MeasurementColumn:
measurementName = string(er.Strings(j).Value(i))
case col.Label == timeColLabel:
valueTime := execute.ValueForRow(er, i, j)
@ -231,7 +314,6 @@ func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
continue outer
}
pointTime = valueTime.Time().Time()
case isTag[j]:
if col.Type != flux.TString {
return &flux.Error{
@ -253,14 +335,17 @@ func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
if measurementName == "" {
return &flux.Error{
Code: codes.Invalid,
Msg: fmt.Sprintf("no column with label %s exists", measurementColumn),
Msg: fmt.Sprintf("no column with label %s exists", t.spec.Spec.MeasurementColumn),
}
}
var fieldValues values.Object
var err error
// TODO(lesam): this is where we would support the `fn` argument to `to`
if fieldValues, err = defaultFieldMapping(er, i); err != nil {
if fn == nil {
if fieldValues, err = defaultFieldMapping(er, i); err != nil {
return err
}
} else if fieldValues, err = fn.Eval(t.Ctx, i, er); err != nil {
return err
}
@ -303,6 +388,45 @@ func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
})
}
// fieldFunctionVisitor implements semantic.Visitor.
// fieldFunctionVisitor is used to walk the the field function expression
// of the `to` operation and to record all referenced columns. This visitor
// is only used when no tag columns are provided as input to the `to` func.
type fieldFunctionVisitor struct {
visited map[semantic.Node]bool
captured map[string]bool
rowParam string
}
func newFieldFunctionVisitor(rowParam string) *fieldFunctionVisitor {
return &fieldFunctionVisitor{
visited: make(map[semantic.Node]bool),
captured: make(map[string]bool),
rowParam: rowParam,
}
}
// A field function is of the form `(r) => { Function Body }`, and it returns an object
// mapping field keys to values for each row r of the input. Visit records every column
// that is referenced in `Function Body`. These columns are either directly or indirectly
// used as value columns and as such need to be recorded so as not to be used as tag columns.
func (v *fieldFunctionVisitor) Visit(node semantic.Node) semantic.Visitor {
if v.visited[node] {
return v
}
if member, ok := node.(*semantic.MemberExpression); ok {
if obj, ok := member.Object.(*semantic.IdentifierExpression); ok {
if obj.Name == v.rowParam {
v.captured[member.Property] = true
}
}
}
v.visited[node] = true
return v
}
func (v *fieldFunctionVisitor) Done(node semantic.Node) {}
// UpdateWatermark updates the watermark for the transformation for the `to` flux function.
func (t *ToTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error {
return t.d.UpdateWatermark(pt)

View File

@ -9,18 +9,22 @@ import (
"github.com/influxdata/flux/dependencies/dependenciestest"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/querytest"
"github.com/influxdata/flux/runtime"
_ "github.com/influxdata/flux/stdlib"
"github.com/influxdata/flux/values/valuestest"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/flux/builtin"
"github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/services/meta"
"github.com/stretchr/testify/assert"
)
func init() {
builtin.Initialize()
}
func TestTo_Query(t *testing.T) {
runtime.FinalizeBuiltIns()
tests := []querytest.NewQueryTestCase{
{
Name: "from with database with range",
@ -36,7 +40,9 @@ func TestTo_Query(t *testing.T) {
{
ID: "influx1x/toKind1",
Spec: &influxdb.ToOpSpec{
Bucket: "myotherdb/autogen",
Bucket: "myotherdb/autogen",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
},
},
},
@ -70,7 +76,9 @@ func TestTo_Process(t *testing.T) {
name: "default case",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my_db",
Bucket: "my_db",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
@ -119,6 +127,622 @@ c _value=4 41`),
}},
},
},
{
name: "default with heterogeneous tag columns",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
KeyCols: []string{"_measurement", "tag1", "tag2", "_field"},
Data: [][]interface{}{
{execute.Time(11), "a", "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "a", "bb", "_value", 2.0},
{execute.Time(21), "a", "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "a", "dd", "_value", 3.0},
{execute.Time(41), "a", "c", "ee", "_value", 4.0},
},
}),
executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "tagA", Type: flux.TString},
{Label: "tagB", Type: flux.TString},
{Label: "tagC", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
KeyCols: []string{"_measurement", "tagA", "tagB", "tagC", "_field"},
Data: [][]interface{}{
{execute.Time(11), "b", "a", "aa", "ff", "_value", 2.0},
{execute.Time(21), "b", "a", "bb", "gg", "_value", 2.0},
{execute.Time(21), "b", "b", "cc", "hh", "_value", 1.0},
{execute.Time(31), "b", "a", "dd", "ii", "_value", 3.0},
{execute.Time(41), "b", "c", "ee", "jj", "_value", 4.0},
},
}),
},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`a,tag1=a,tag2=aa _value=2 11
a,tag1=a,tag2=bb _value=2 21
a,tag1=b,tag2=cc _value=1 21
a,tag1=a,tag2=dd _value=3 31
a,tag1=c,tag2=ee _value=4 41
b,tagA=a,tagB=aa,tagC=ff _value=2 11
b,tagA=a,tagB=bb,tagC=gg _value=2 21
b,tagA=b,tagB=cc,tagC=hh _value=1 21
b,tagA=a,tagB=dd,tagC=ii _value=3 31
b,tagA=c,tagB=ee,tagC=jj _value=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
KeyCols: []string{"_measurement", "tag1", "tag2", "_field"},
Data: [][]interface{}{
{execute.Time(11), "a", "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "a", "bb", "_value", 2.0},
{execute.Time(21), "a", "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "a", "dd", "_value", 3.0},
{execute.Time(41), "a", "c", "ee", "_value", 4.0},
},
},
{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "tagA", Type: flux.TString},
{Label: "tagB", Type: flux.TString},
{Label: "tagC", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
KeyCols: []string{"_measurement", "tagA", "tagB", "tagC", "_field"},
Data: [][]interface{}{
{execute.Time(11), "b", "a", "aa", "ff", "_value", 2.0},
{execute.Time(21), "b", "a", "bb", "gg", "_value", 2.0},
{execute.Time(21), "b", "b", "cc", "hh", "_value", 1.0},
{execute.Time(31), "b", "a", "dd", "ii", "_value", 3.0},
{execute.Time(41), "b", "c", "ee", "jj", "_value", 4.0},
},
},
},
},
},
{
name: "no _measurement with multiple tag columns",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "tag1",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "bb", "_value", 2.0},
{execute.Time(21), "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "dd", "_value", 3.0},
{execute.Time(41), "c", "ee", "_value", 4.0},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`a,tag2=aa _value=2 11
a,tag2=bb _value=2 21
b,tag2=cc _value=1 21
a,tag2=dd _value=3 31
c,tag2=ee _value=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "bb", "_value", 2.0},
{execute.Time(21), "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "dd", "_value", 3.0},
{execute.Time(41), "c", "ee", "_value", 4.0},
},
}},
},
},
{
name: "explicit tags",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
TagColumns: []string{"tag1", "tag2"},
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "a", "aa"},
{execute.Time(21), "m", "_value", 2.0, "a", "bb"},
{execute.Time(21), "m", "_value", 1.0, "b", "cc"},
{execute.Time(31), "m", "_value", 3.0, "a", "dd"},
{execute.Time(41), "m", "_value", 4.0, "c", "ee"},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31
m,tag1=c,tag2=ee _value=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "a", "aa"},
{execute.Time(21), "m", "_value", 2.0, "a", "bb"},
{execute.Time(21), "m", "_value", 1.0, "b", "cc"},
{execute.Time(31), "m", "_value", 3.0, "a", "dd"},
{execute.Time(41), "m", "_value", 4.0, "c", "ee"},
},
}},
},
},
{
name: "explicit field function",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
FieldFn: interpreter.ResolvedFunction{
Scope: valuestest.Scope(),
Fn: executetest.FunctionExpression(t, `(r) => ({temperature: r.temperature})`),
},
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", 2.0},
{execute.Time(21), "a", 2.0},
{execute.Time(21), "b", 1.0},
{execute.Time(31), "a", 3.0},
{execute.Time(41), "c", 4.0},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`a temperature=2 11
a temperature=2 21
b temperature=1 21
a temperature=3 31
c temperature=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", 2.0},
{execute.Time(21), "a", 2.0},
{execute.Time(21), "b", 1.0},
{execute.Time(31), "a", 3.0},
{execute.Time(41), "c", 4.0},
},
}},
},
},
{
name: "infer tags from complex field function",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "tag",
FieldFn: interpreter.ResolvedFunction{
Scope: valuestest.Scope(),
Fn: executetest.FunctionExpression(t, `(r) => ({day: r.day, temperature: r.temperature, humidity: r.humidity, ratio: r.temperature / r.humidity})`),
},
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_time", Type: flux.TTime},
{Label: "day", Type: flux.TString},
{Label: "tag", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TFloat},
{Label: "_value", Type: flux.TString},
},
KeyCols: []string{"_measurement", "_field"},
Data: [][]interface{}{
{"m", "f", execute.Time(11), "Monday", "a", 2.0, 1.0, "bogus"},
{"m", "f", execute.Time(21), "Tuesday", "a", 2.0, 2.0, "bogus"},
{"m", "f", execute.Time(21), "Wednesday", "b", 1.0, 4.0, "bogus"},
{"m", "f", execute.Time(31), "Thursday", "a", 3.0, 3.0, "bogus"},
{"m", "f", execute.Time(41), "Friday", "c", 4.0, 5.0, "bogus"},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`a day="Monday",humidity=1,ratio=2,temperature=2 11
a day="Tuesday",humidity=2,ratio=1,temperature=2 21
b day="Wednesday",humidity=4,ratio=0.25,temperature=1 21
a day="Thursday",humidity=3,ratio=1,temperature=3 31
c day="Friday",humidity=5,ratio=0.8,temperature=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_time", Type: flux.TTime},
{Label: "day", Type: flux.TString},
{Label: "tag", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TFloat},
{Label: "_value", Type: flux.TString},
},
KeyCols: []string{"_measurement", "_field"},
Data: [][]interface{}{
{"m", "f", execute.Time(11), "Monday", "a", 2.0, 1.0, "bogus"},
{"m", "f", execute.Time(21), "Tuesday", "a", 2.0, 2.0, "bogus"},
{"m", "f", execute.Time(21), "Wednesday", "b", 1.0, 4.0, "bogus"},
{"m", "f", execute.Time(31), "Thursday", "a", 3.0, 3.0, "bogus"},
{"m", "f", execute.Time(41), "Friday", "c", 4.0, 5.0, "bogus"},
},
}},
},
},
{
name: "explicit tag columns, multiple values in field function, and extra columns",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "tag1",
TagColumns: []string{"tag2"},
FieldFn: interpreter.ResolvedFunction{
Scope: valuestest.Scope(),
Fn: executetest.FunctionExpression(t, `(r) => ({temperature: r.temperature, humidity: r.humidity})`),
},
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "other-string-column", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TInt},
{Label: "other-value-column", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "d", "misc", 1.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "e", "misc", 3.0, int64(60), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "e", "misc", 4.0, int64(65), 1.0},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`a,tag2=d humidity=50i,temperature=2 11
a,tag2=d humidity=50i,temperature=2 21
b,tag2=d humidity=50i,temperature=1 21
a,tag2=e humidity=60i,temperature=3 31
c,tag2=e humidity=65i,temperature=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "other-string-column", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TInt},
{Label: "other-value-column", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "d", "misc", 1.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "e", "misc", 3.0, int64(60), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "e", "misc", 4.0, int64(65), 1.0},
},
}},
},
},
{
name: "multiple _field",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "_hello", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "_hello", 4.0},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`a _value=2 11
a _value=2 21
b _value=1 21
a _hello=3 31
c _hello=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "_hello", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "_hello", 4.0},
},
}},
},
},
{
name: "unordered tags",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
TagColumns: []string{"tag1", "tag2"},
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{execute.Time(41), "m", "_value", 4.0, "ee", "c"},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31
m,tag1=c,tag2=ee _value=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{execute.Time(41), "m", "_value", 4.0, "ee", "c"},
},
}},
},
},
{
name: "nil timestamp",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
TagColumns: []string{"tag1", "tag2"},
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{nil, "m", "_value", 4.0, "ee", "c"},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
},
}},
},
},
{
name: "nil tag",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Bucket: "my-bucket",
TimeColumn: "_time",
TagColumns: []string{"tag1", "tag2"},
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{execute.Time(41), "m", "_value", 4.0, nil, "c"},
},
})},
want: wanted{
result: &mockPointsWriter{
points: mockPoints(`m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31
m,tag1=c _value=4 41`),
db: "my-bucket",
rp: "autogen",
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{execute.Time(41), "m", "_value", 4.0, nil, "c"},
},
}},
},
},
}
for _, tc := range testCases {