feat(flux): enable writing to remote hosts via `to()` and `experimental.to()` (#22634)

pull/22656/head
Daniel Moran 2021-10-12 12:35:44 -04:00 committed by GitHub
parent 0700cc8582
commit 401af4b3ae
11 changed files with 271 additions and 2552 deletions
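
In short: `to()` and `experimental.to()` now accept `host`, `token`, and `org`/`orgID` parameters, so a Flux script running on one InfluxDB instance can write points to another. A minimal sketch of the new usage, modeled on the integration test added below (the bucket, host, token, and org values are placeholders):

array.from(rows: [{_time: now(), _measurement: "m", _field: "f", _value: 1.0}])
|> to(bucket: "remote-bucket", host: "http://remote:8086", token: "remote-token", org: "remote-org")

experimental.to() accepts the same parameters but expects pivoted input (it rejects tables with _field in the group key).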

View File

@@ -0,0 +1,103 @@
package launcher_test
import (
"context"
"fmt"
"testing"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/stretchr/testify/require"
)
func TestRemoteTo(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Boot 2 servers.
l1 := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l1.ShutdownOrFail(t, ctx)
l2 := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l2.ShutdownOrFail(t, ctx)
// Run a flux script in the 1st server, writing data to the 2nd.
q1 := fmt.Sprintf(`import "array"
option now = () => (2030-01-01T00:01:00Z)
rows = [
{_time: now(), _measurement: "test", _field: "f", _value: 1.0},
{_time: now(), _measurement: "test", _field: "v", _value: -123.0},
{_time: now(), _measurement: "test2", _field: "f", _value: 0.03}
]
array.from(rows) |> to(bucket: "%s", host: "%s", token: "%s", org: "%s")
`, l2.Bucket.Name, l2.URL().String(), l2.Auth.Token, l2.Org.Name)
_ = l1.FluxQueryOrFail(t, l1.Org, l1.Auth.Token, q1)
// Query the 2nd server and check that the points landed.
q2 := fmt.Sprintf(`from(bucket:"%s")
|> range(start: 2030-01-01T00:00:00Z, stop: 2030-01-02T00:00:00Z)
|> keep(columns: ["_measurement", "_field", "_value"])
`, l2.Bucket.Name)
exp := `,result,table,_value,_field,_measurement` + "\r\n" +
`,_result,0,1,f,test` + "\r\n" +
`,_result,1,0.03,f,test2` + "\r\n" +
`,_result,2,-123,v,test` + "\r\n\r\n"
res := l2.FluxQueryOrFail(t, l2.Org, l2.Auth.Token, q2)
require.Equal(t, exp, res)
}
func TestRemoteTo_Experimental(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Boot 2 servers.
l1 := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l1.ShutdownOrFail(t, ctx)
l2 := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l2.ShutdownOrFail(t, ctx)
// Run a flux script in the 1st server, writing data to the 2nd.
q1 := fmt.Sprintf(`import "array"
import "experimental"
option now = () => (2030-01-01T00:01:00Z)
testRows = [
{_time: now(), _field: "f", _value: 1.0},
{_time: now(), _field: "v", _value: -123.0},
]
test2Rows = [
{_time: now(), _field: "f", _value: 0.03}
]
testTable = array.from(rows: testRows)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({r with _measurement: "test"}))
test2Table = array.from(rows: test2Rows)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({r with _measurement: "test2"}))
union(tables: [testTable, test2Table]) |> group(columns: ["_measurement"])
|> experimental.to(bucket: "%s", host: "%s", token: "%s", org: "%s")
`, l2.Bucket.Name, l2.URL().String(), l2.Auth.Token, l2.Org.Name)
_ = l1.FluxQueryOrFail(t, l1.Org, l1.Auth.Token, q1)
// Query the 2nd server and check that the points landed.
q2 := fmt.Sprintf(`from(bucket:"%s")
|> range(start: 2030-01-01T00:00:00Z, stop: 2030-01-02T00:00:00Z)
|> keep(columns: ["_measurement", "_field", "_value"])
`, l2.Bucket.Name)
exp := `,result,table,_value,_field,_measurement` + "\r\n" +
`,_result,0,1,f,test` + "\r\n" +
`,_result,1,0.03,f,test2` + "\r\n" +
`,_result,2,-123,v,test` + "\r\n\r\n"
res := l2.FluxQueryOrFail(t, l2.Org, l2.Auth.Token, q2)
require.Equal(t, exp, res)
}

go.mod
View File

@@ -42,6 +42,7 @@ require (
github.com/influxdata/flux v0.133.1-0.20211007185412-3d6c47d9113f
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839
github.com/influxdata/pkg-config v0.2.9-0.20210928145121-f721f9766b86
github.com/jmoiron/sqlx v1.3.4
github.com/jsternberg/zap-logfmt v1.2.0
@@ -166,7 +167,6 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/influx-cli/v2 v2.1.1-0.20211007122339-c4a5a13c8ee3
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 // indirect
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect

View File

@@ -1,439 +0,0 @@
package experimental
import (
"context"
"errors"
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/semantic"
_ "github.com/influxdata/flux/stdlib/experimental"
platform "github.com/influxdata/influxdb/v2"
platform2 "github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/v2/storage"
)
// ExperimentalToKind is the kind for the experimental `to` flux function
const ExperimentalToKind = "influxdb-experimental-to"
// ToOpSpec is the flux.OperationSpec for the `to` flux function.
type ToOpSpec struct {
Bucket string `json:"bucket"`
BucketID string `json:"bucketID"`
Org string `json:"org"`
OrgID string `json:"orgID"`
Host string `json:"host"`
Token string `json:"token"`
}
func init() {
toSignature := runtime.MustLookupBuiltinType("experimental", "to")
runtime.ReplacePackageValue("experimental", "to", flux.MustValue(flux.FunctionValueWithSideEffect("to", createToOpSpec, toSignature)))
plan.RegisterProcedureSpecWithSideEffect(ExperimentalToKind, newToProcedure, ExperimentalToKind)
execute.RegisterTransformation(ExperimentalToKind, createToTransformation)
}
// ReadArgs reads the args from flux.Arguments into the op spec
func (o *ToOpSpec) ReadArgs(args flux.Arguments) error {
var err error
var ok bool
if o.Bucket, ok, _ = args.GetString("bucket"); !ok {
if o.BucketID, err = args.GetRequiredString("bucketID"); err != nil {
return err
}
} else if o.BucketID, ok, _ = args.GetString("bucketID"); ok {
return &flux.Error{
Code: codes.Invalid,
Msg: "cannot provide both `bucket` and `bucketID` parameters to the `to` function",
}
}
if o.Org, ok, _ = args.GetString("org"); !ok {
if o.OrgID, _, err = args.GetString("orgID"); err != nil {
return err
}
} else if o.OrgID, ok, _ = args.GetString("orgID"); ok {
return &flux.Error{
Code: codes.Invalid,
Msg: "cannot provide both `org` and `orgID` parameters to the `to` function",
}
}
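// A host argument routes the write to a remote instance, in which case a token is required.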
if o.Host, ok, _ = args.GetString("host"); ok {
if o.Token, err = args.GetRequiredString("token"); err != nil {
return err
}
}
return err
}
func createToOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
if err := a.AddParentFromArgs(args); err != nil {
return nil, err
}
s := &ToOpSpec{}
if err := s.ReadArgs(args); err != nil {
return nil, err
}
return s, nil
}
// Kind returns the operation kind for the ToOpSpec.
func (ToOpSpec) Kind() flux.OperationKind {
return ExperimentalToKind
}
// BucketsAccessed returns the buckets accessed by the spec.
func (o *ToOpSpec) BucketsAccessed(orgID *platform2.ID) (readBuckets, writeBuckets []platform.BucketFilter) {
bf := platform.BucketFilter{}
if o.Bucket != "" {
bf.Name = &o.Bucket
}
if o.BucketID != "" {
id, err := platform2.IDFromString(o.BucketID)
if err == nil {
bf.ID = id
}
}
if o.Org != "" {
bf.Org = &o.Org
}
if o.OrgID != "" {
id, err := platform2.IDFromString(o.OrgID)
if err == nil {
bf.OrganizationID = id
}
}
writeBuckets = append(writeBuckets, bf)
return readBuckets, writeBuckets
}
// ToProcedureSpec is the procedure spec for the `to` flux function.
type ToProcedureSpec struct {
plan.DefaultCost
Spec *ToOpSpec
}
// Kind returns the kind for the procedure spec for the `to` flux function.
func (o *ToProcedureSpec) Kind() plan.ProcedureKind {
return ExperimentalToKind
}
// Copy clones the procedure spec for `to` flux function.
func (o *ToProcedureSpec) Copy() plan.ProcedureSpec {
s := o.Spec
res := &ToProcedureSpec{
Spec: &ToOpSpec{
Bucket: s.Bucket,
BucketID: s.BucketID,
Org: s.Org,
OrgID: s.OrgID,
Host: s.Host,
Token: s.Token,
},
}
return res
}
func newToProcedure(qs flux.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
spec, ok := qs.(*ToOpSpec)
if !ok {
return nil, &flux.Error{
Code: codes.Internal,
Msg: fmt.Sprintf("invalid spec type %T", qs),
}
}
return &ToProcedureSpec{Spec: spec}, nil
}
func createToTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
s, ok := spec.(*ToProcedureSpec)
if !ok {
return nil, nil, fmt.Errorf("invalid spec type %T", spec)
}
cache := execute.NewTableBuilderCache(a.Allocator())
d := execute.NewDataset(id, mode, cache)
deps := influxdb.GetStorageDependencies(a.Context()).ToDeps
t, err := NewToTransformation(a.Context(), d, cache, s, deps)
if err != nil {
return nil, nil, err
}
return t, d, nil
}
// ToTransformation is the transformation for the `to` flux function.
type ToTransformation struct {
execute.ExecutionNode
ctx context.Context
bucketID platform2.ID
orgID platform2.ID
d execute.Dataset
cache execute.TableBuilderCache
spec *ToOpSpec
deps influxdb.ToDependencies
buf *storage.BufferedPointsWriter
}
// RetractTable retracts the table for the transformation for the `to` flux function.
func (t *ToTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error {
return t.d.RetractTable(key)
}
// NewToTransformation returns a new *ToTransformation with the appropriate fields set.
func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.TableBuilderCache, spec *ToProcedureSpec, deps influxdb.ToDependencies) (*ToTransformation, error) {
var err error
var orgID platform2.ID
// Get organization ID
if spec.Spec.Org != "" {
oID, ok := deps.OrganizationLookup.Lookup(ctx, spec.Spec.Org)
if !ok {
return nil, fmt.Errorf("failed to look up organization %q", spec.Spec.Org)
}
orgID = oID
} else if spec.Spec.OrgID != "" {
if oid, err := platform2.IDFromString(spec.Spec.OrgID); err != nil {
return nil, err
} else {
orgID = *oid
}
} else {
// No org or orgID provided as an arg; use the orgID from the context
req := query.RequestFromContext(ctx)
if req == nil {
return nil, errors.New("missing request on context")
}
orgID = req.OrganizationID
}
var bucketID *platform2.ID
// Get bucket ID
// User will have specified exactly one in the ToOpSpec.
if spec.Spec.Bucket != "" {
bID, ok := deps.BucketLookup.Lookup(ctx, orgID, spec.Spec.Bucket)
if !ok {
return nil, fmt.Errorf("failed to look up bucket %q in org %q", spec.Spec.Bucket, spec.Spec.Org)
}
bucketID = &bID
} else {
if bucketID, err = platform2.IDFromString(spec.Spec.BucketID); err != nil {
return nil, err
}
}
return &ToTransformation{
ctx: ctx,
bucketID: *bucketID,
orgID: orgID,
d: d,
cache: cache,
spec: spec.Spec,
deps: deps,
buf: storage.NewBufferedPointsWriter(orgID, *bucketID, influxdb.DefaultBufferSize, deps.PointsWriter),
}, nil
}
// Process does the actual work for the ToTransformation.
func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
return t.writeTable(t.ctx, tbl)
}
// UpdateWatermark updates the watermark for the transformation for the `to` flux function.
func (t *ToTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error {
return t.d.UpdateWatermark(pt)
}
// UpdateProcessingTime updates the processing time for the transformation for the `to` flux function.
func (t *ToTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
return t.d.UpdateProcessingTime(pt)
}
// Finish is called after the `to` flux function's transformation is done processing.
func (t *ToTransformation) Finish(id execute.DatasetID, err error) {
if err != nil {
t.d.Finish(err)
return
}
err = t.buf.Flush(t.ctx)
t.d.Finish(err)
}
const (
defaultFieldColLabel = "_field"
defaultMeasurementColLabel = influxdb.DefaultMeasurementColLabel
defaultTimeColLabel = execute.DefaultTimeColLabel
defaultStartColLabel = execute.DefaultStartColLabel
defaultStopColLabel = execute.DefaultStopColLabel
)
type LabelAndOffset struct {
Label string
Offset int
}
// TablePointsMetadata stores state needed to write the points from one table.
type TablePointsMetadata struct {
MeasurementName string
// The tags in the table (final element is left as nil, to be replaced by field name)
Tags [][]byte
// The offset in tags where to store the field name
FieldKeyTagValueOffset int
// The column offset in the input table where the _time column is stored
TimestampOffset int
// The labels and offsets of all the fields in the table
Fields []LabelAndOffset
}
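// GetTablePointsMetadata extracts the measurement name, tag key/value pairs, _time column offset, and field columns from a table's group key and column metadata.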
func GetTablePointsMetadata(tbl flux.Table) (*TablePointsMetadata, error) {
// Find measurement, tags
var measurement string
tagmap := make(map[string]string, len(tbl.Key().Cols())+2)
isTag := make(map[string]bool)
for j, col := range tbl.Key().Cols() {
switch col.Label {
case defaultStartColLabel:
continue
case defaultStopColLabel:
continue
case defaultFieldColLabel:
return nil, fmt.Errorf("found column %q in the group key; experimental.to() expects pivoted data", col.Label)
case defaultMeasurementColLabel:
if col.Type != flux.TString {
return nil, fmt.Errorf("group key column %q has type %v; type %v is required", col.Label, col.Type, flux.TString)
}
measurement = tbl.Key().ValueString(j)
default:
if col.Type != flux.TString {
return nil, fmt.Errorf("group key column %q has type %v; type %v is required", col.Label, col.Type, flux.TString)
}
isTag[col.Label] = true
tagmap[col.Label] = tbl.Key().ValueString(j)
}
}
if len(measurement) == 0 {
return nil, fmt.Errorf("required column %q not in group key", defaultMeasurementColLabel)
}
t := models.NewTags(tagmap)
tags := make([][]byte, 0, len(t)*2)
for i := range t {
tags = append(tags, t[i].Key, t[i].Value)
}
// Loop over all columns to find fields and _time
fields := make([]LabelAndOffset, 0, len(tbl.Cols())-len(tbl.Key().Cols()))
timestampOffset := -1
for j, col := range tbl.Cols() {
switch col.Label {
case defaultStartColLabel:
continue
case defaultStopColLabel:
continue
case defaultMeasurementColLabel:
continue
case defaultTimeColLabel:
if col.Type != flux.TTime {
return nil, fmt.Errorf("column %q has type string; type %s is required", defaultTimeColLabel, flux.TTime)
}
timestampOffset = j
continue
default:
if !isTag[col.Label] {
fields = append(fields, LabelAndOffset{Label: col.Label, Offset: j})
}
}
}
if timestampOffset == -1 {
return nil, fmt.Errorf("input table is missing required column %q", defaultTimeColLabel)
}
tmd := &TablePointsMetadata{
MeasurementName: measurement,
Tags: tags,
TimestampOffset: timestampOffset,
Fields: fields,
}
return tmd, nil
}
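// writeTable encodes each row of the table as a point and hands it to the buffered points writer, forwarding the rows downstream unchanged.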
func (t *ToTransformation) writeTable(ctx context.Context, tbl flux.Table) error {
builder, isNew := t.cache.TableBuilder(tbl.Key())
if isNew {
if err := execute.AddTableCols(tbl, builder); err != nil {
return err
}
}
tmd, err := GetTablePointsMetadata(tbl)
if err != nil {
return err
}
pointName := tmd.MeasurementName
return tbl.Do(func(cr flux.ColReader) error {
if cr.Len() == 0 {
// Nothing to do
return nil
}
var (
points models.Points
tags models.Tags
)
for i := 0; i < cr.Len(); i++ {
fields := make(models.Fields, len(tmd.Fields))
for _, lao := range tmd.Fields {
fieldVal := execute.ValueForRow(cr, i, lao.Offset)
// Skip this iteration if field value is null
if fieldVal.IsNull() {
continue
}
switch fieldVal.Type() {
case semantic.BasicFloat:
fields[lao.Label] = fieldVal.Float()
case semantic.BasicInt:
fields[lao.Label] = fieldVal.Int()
case semantic.BasicUint:
fields[lao.Label] = fieldVal.UInt()
case semantic.BasicString:
fields[lao.Label] = fieldVal.Str()
case semantic.BasicBool:
fields[lao.Label] = fieldVal.Bool()
default:
return fmt.Errorf("unsupported field type %v", fieldVal.Type())
}
}
timestamp := execute.ValueForRow(cr, i, tmd.TimestampOffset).Time().Time()
tags, err := models.NewTagsKeyValues(tags, tmd.Tags...)
if err != nil {
return err
}
pt, err := models.NewPoint(pointName, tags, fields, timestamp)
if err != nil {
return err
}
points = append(points, pt)
if err := execute.AppendRecord(i, cr, builder); err != nil {
return err
}
}
return t.buf.WritePoints(ctx, points)
})
}

View File

@@ -1,543 +0,0 @@
package experimental_test
import (
"context"
"errors"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/querytest"
"github.com/influxdata/flux/stdlib/universe"
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
platform2 "github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query/stdlib/experimental"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
)
func TestTo_Query(t *testing.T) {
tests := []querytest.NewQueryTestCase{
{
Name: "from range pivot experimental to",
Raw: `import "experimental"
import "influxdata/influxdb/v1"
from(bucket:"mydb")
|> range(start: -1h)
|> v1.fieldsAsCols()
|> experimental.to(bucket:"series1", org:"fred", host:"localhost", token:"auth-token")`,
Want: &flux.Spec{
Operations: []*flux.Operation{
{
ID: "from0",
Spec: &influxdb.FromOpSpec{
Bucket: influxdb.NameOrID{Name: "mydb"},
},
},
{
ID: "range1",
Spec: &universe.RangeOpSpec{
Start: flux.Time{IsRelative: true, Relative: -time.Hour},
Stop: flux.Time{IsRelative: true},
TimeColumn: "_time",
StartColumn: "_start",
StopColumn: "_stop",
},
},
{
ID: "pivot2",
Spec: &universe.PivotOpSpec{
RowKey: []string{"_time"},
ColumnKey: []string{"_field"},
ValueColumn: "_value"},
},
{
ID: "influxdb-experimental-to3",
Spec: &experimental.ToOpSpec{
Bucket: "series1",
Org: "fred",
Host: "localhost",
Token: "auth-token",
},
},
},
Edges: []flux.Edge{
{Parent: "from0", Child: "range1"},
{Parent: "range1", Child: "pivot2"},
{Parent: "pivot2", Child: "influxdb-experimental-to3"},
},
},
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
querytest.NewQueryTestHelper(t, tc)
})
}
}
func TestTo_Process(t *testing.T) {
oid, _ := mock.OrganizationLookup{}.Lookup(context.Background(), "my-org")
bid, _ := mock.BucketLookup{}.Lookup(context.Background(), oid, "my-bucket")
type wanted struct {
result *mock.PointsWriter
}
testCases := []struct {
name string
spec *experimental.ToProcedureSpec
data []*executetest.Table
want wanted
wantErr error
}{
{
name: "measurement not in group key",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", 4.0},
},
}},
wantErr: errors.New(`required column "_measurement" not in group key`),
},
{
name: "non-string in group key",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop", "gkcol"},
ColMeta: []flux.ColMeta{
{Label: "gkcol", Type: flux.TFloat},
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
},
Data: [][]interface{}{
{100.0, execute.Time(0), execute.Time(100), execute.Time(11), "a", 2.0},
{100.0, execute.Time(0), execute.Time(100), execute.Time(21), "a", 2.0},
{100.0, execute.Time(0), execute.Time(100), execute.Time(21), "a", 1.0},
{100.0, execute.Time(0), execute.Time(100), execute.Time(31), "a", 3.0},
{100.0, execute.Time(0), execute.Time(100), execute.Time(41), "a", 4.0},
},
}},
wantErr: errors.New(`group key column "gkcol" has type float; type string is required`),
},
{
name: "unpivoted data with _field column",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop", "_field"},
ColMeta: []flux.ColMeta{
{Label: "_field", Type: flux.TString},
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{"cpu", execute.Time(0), execute.Time(100), execute.Time(11), "a", 2.0},
{"cpu", execute.Time(0), execute.Time(100), execute.Time(21), "a", 2.0},
{"cpu", execute.Time(0), execute.Time(100), execute.Time(21), "a", 1.0},
{"cpu", execute.Time(0), execute.Time(100), execute.Time(31), "a", 3.0},
{"cpu", execute.Time(0), execute.Time(100), execute.Time(41), "a", 4.0},
},
}},
wantErr: errors.New(`found column "_field" in the group key; experimental.to() expects pivoted data`),
},
{
name: "no time column",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), "a", 2.0},
{execute.Time(0), execute.Time(100), "a", 2.0},
{execute.Time(0), execute.Time(100), "a", 1.0},
{execute.Time(0), execute.Time(100), "a", 1.0},
{execute.Time(0), execute.Time(100), "a", 1.0},
{execute.Time(0), execute.Time(100), "a", 3.0},
{execute.Time(0), execute.Time(100), "a", 4.0},
},
}},
wantErr: errors.New(`input table is missing required column "_time"`),
},
{
name: "time column wrong type",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TString},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), "eleven", "a", 2.0},
{execute.Time(0), execute.Time(100), "twenty-one", "a", 2.0},
{execute.Time(0), execute.Time(100), "twenty-one", "a", 1.0},
{execute.Time(0), execute.Time(100), "thirty-one", "a", 3.0},
{execute.Time(0), execute.Time(100), "forty-one", "a", 4.0},
},
}},
wantErr: errors.New(`column "_time" has type string; type time is required`),
},
{
name: "field invalid type",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TTime},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", execute.Time(11)},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", execute.Time(11)},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", execute.Time(11)},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", execute.Time(11)},
{execute.Time(0), execute.Time(100), execute.Time(41), "a", execute.Time(11)},
},
}},
wantErr: errors.New("unsupported field type time"),
},
{
name: "simple case",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "a", 4.0},
},
}},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a v=2 11
a v=2 21
a v=1 21
a v=3 31
a v=4 41`),
},
},
},
{
name: "two tags",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop", "t"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "t", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "x", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "x", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "x", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "x", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "a", "x", 4.0},
},
}},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a,t=x v=2 11
a,t=x v=2 21
a,t=x v=1 21
a,t=x v=3 31
a,t=x v=4 41`),
},
},
},
{
name: "two tags measurement not first",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_start", "_stop", "t", "_measurement"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "t", Type: flux.TString},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "x", "a", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "x", "a", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "x", "a", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "x", "a", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "x", "a", 4.0},
},
}},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a,t=x v=2 11
a,t=x v=2 21
a,t=x v=1 21
a,t=x v=3 31
a,t=x v=4 41`),
},
},
},
{
name: "two fields",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
{Label: "w", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", 2.0, 3.5},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", 2.0, 3.5},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", 1.0, 2.5},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", 3.0, 4.5},
{execute.Time(0), execute.Time(100), execute.Time(41), "a", 4.0, 5.5},
},
}},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a v=2,w=3.5 11
a v=2,w=3.5 21
a v=1,w=2.5 21
a v=3,w=4.5 31
a v=4,w=5.5 41`),
},
},
},
{
name: "two fields and key column",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "key1"},
ColMeta: []flux.ColMeta{
{Label: "key1", Type: flux.TString},
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
{Label: "w", Type: flux.TFloat},
},
Data: [][]interface{}{
{"v1", execute.Time(0), execute.Time(100), execute.Time(11), "a", 2.0, 3.5},
{"v1", execute.Time(0), execute.Time(100), execute.Time(21), "a", 2.0, 3.5},
{"v1", execute.Time(0), execute.Time(100), execute.Time(21), "a", 1.0, 2.5},
{"v1", execute.Time(0), execute.Time(100), execute.Time(31), "a", 3.0, 4.5},
{"v1", execute.Time(0), execute.Time(100), execute.Time(41), "a", 4.0, 5.5},
},
}},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a,key1=v1 v=2,w=3.5 11
a,key1=v1 v=2,w=3.5 21
a,key1=v1 v=1,w=2.5 21
a,key1=v1 v=3,w=4.5 31
a,key1=v1 v=4,w=5.5 41`),
},
},
},
{
name: "unordered tags",
spec: &experimental.ToProcedureSpec{
Spec: &experimental.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
},
},
data: []*executetest.Table{{
KeyCols: []string{"_measurement", "_start", "_stop", "t1", "t0"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "t1", Type: flux.TString},
{Label: "t0", Type: flux.TString},
{Label: "v", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "val1", "val0", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "val1", "val0", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "val1", "val0", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "val1", "val0", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "a", "val1", "val0", 4.0},
},
}},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(oid, bid, `a,t0=val0,t1=val1 v=2 11
a,t0=val0,t1=val1 v=2 21
a,t0=val0,t1=val1 v=1 21
a,t0=val0,t1=val1 v=3 31
a,t0=val0,t1=val1 v=4 41`),
},
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
deps := mockDependencies()
inTables := make([]flux.Table, 0, len(tc.data))
wantTables := make([]*executetest.Table, 0, len(tc.data))
for _, tbl := range tc.data {
rwTable := &executetest.RowWiseTable{Table: tbl}
inTables = append(inTables, rwTable)
wantTables = append(wantTables, tbl)
}
executetest.ProcessTestHelper(
t,
inTables,
wantTables,
tc.wantErr,
func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
newT, _ := experimental.NewToTransformation(context.TODO(), d, c, tc.spec, deps)
return newT
},
)
if tc.wantErr == nil {
pw := deps.PointsWriter.(*mock.PointsWriter)
if len(pw.Points) != len(tc.want.result.Points) {
t.Errorf("Expected result values to have length of %d but got %d", len(tc.want.result.Points), len(pw.Points))
}
gotStr := pointsToStr(pw.Points)
wantStr := pointsToStr(tc.want.result.Points)
if !cmp.Equal(gotStr, wantStr) {
t.Errorf("got other than expected %s", cmp.Diff(gotStr, wantStr))
}
}
})
}
}
func mockDependencies() influxdb.ToDependencies {
return influxdb.ToDependencies{
BucketLookup: mock.BucketLookup{},
OrganizationLookup: mock.OrganizationLookup{},
PointsWriter: new(mock.PointsWriter),
}
}
func mockPoints(org, bucket platform2.ID, pointdata string) []models.Point {
points, err := models.ParsePoints([]byte(pointdata))
if err != nil {
return nil
}
return points
}
func pointsToStr(points []models.Point) string {
outStr := ""
for _, x := range points {
outStr += x.String() + "\n"
}
return outStr
}

View File

@@ -14,8 +14,11 @@ import (
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/storage"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
protocol "github.com/influxdata/line-protocol"
)
type (
@@ -59,6 +62,70 @@ func (p Provider) SeriesCardinalityReaderFor(ctx context.Context, conf influxdb.
}, nil
}
func (p Provider) WriterFor(ctx context.Context, conf influxdb.Config) (influxdb.Writer, error) {
// If a host is specified, writes must be sent through the http provider.
if conf.Host != "" {
return p.HttpProvider.WriterFor(ctx, conf)
}
deps := GetStorageDependencies(ctx).ToDeps
req := query.RequestFromContext(ctx)
if req == nil {
return nil, &errors.Error{
Code: errors.EInvalid,
Msg: "missing request on context",
}
}
reqOrgID := req.OrganizationID
// Check if the to() spec is pointing to an org. If so, ensure it's the same as the org executing the request.
//
// It's possible for flux to write points into an org other than the one running the query, but only via an HTTP
// request (which requires a `host` to be set). Specifying an `org` equal to the one executing the query is
// redundant, but we allow it in order to support running the e2e tests imported from the flux codebase.
var orgID platform.ID
switch {
case conf.Org.Name != "":
var ok bool
orgID, ok = deps.OrganizationLookup.Lookup(ctx, conf.Org.Name)
if !ok {
return nil, &flux.Error{
Code: codes.NotFound,
Msg: fmt.Sprintf("could not find org %q", conf.Org.Name),
}
}
case conf.Org.ID != "":
if err := orgID.DecodeFromString(conf.Org.ID); err != nil {
return nil, &flux.Error{
Code: codes.Invalid,
Msg: "invalid org id",
Err: err,
}
}
default:
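// Neither org nor orgID was specified; the write falls through to the org executing the request.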
}
if orgID.Valid() && orgID != reqOrgID {
return nil, &flux.Error{
Code: codes.Invalid,
Msg: "host must be specified when writing points to another org",
}
}
bucketID, err := p.lookupBucketID(ctx, reqOrgID, conf.Bucket)
if err != nil {
return nil, err
}
return &localPointsWriter{
ctx: ctx,
buf: make([]models.Point, 1<<14),
orgID: orgID,
bucketID: bucketID,
wr: deps.PointsWriter,
}, nil
}
// readFilterSpec will construct a query.ReadFilterSpec from the context and the
// configuration parameters.
func (p Provider) readFilterSpec(ctx context.Context, conf influxdb.Config, bounds flux.Bounds, predicateSet influxdb.PredicateSet) (query.ReadFilterSpec, error) {
@@ -164,3 +231,80 @@
return reader.Do(f)
}
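// localPointsWriter implements the flux writer interface on top of the local storage engine, converting line-protocol metrics into points and buffering them until the buffer fills or the writer is closed.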
type localPointsWriter struct {
ctx context.Context
buf []models.Point
orgID platform.ID
bucketID platform.ID
n int
wr storage.PointsWriter
err error
}
func (w *localPointsWriter) Write(ms ...protocol.Metric) error {
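// copyPoints converts metrics into models.Points, filling w.buf from offset w.n until either the input or the buffer is exhausted. It returns the number of points copied; a conversion failure is recorded in w.err.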
copyPoints := func() int {
n := 0
for _, m := range ms {
if w.n+n == len(w.buf) {
break
}
mtags := m.TagList()
mfields := m.FieldList()
tags := make(models.Tags, len(mtags))
fields := make(models.Fields, len(mfields))
for ti, t := range mtags {
tags[ti] = models.Tag{Key: []byte(t.Key), Value: []byte(t.Value)}
}
for _, f := range mfields {
fields[f.Key] = f.Value
}
w.buf[w.n+n], w.err = models.NewPoint(m.Name(), tags, fields, m.Time())
if w.err != nil {
return n
}
n++
}
return n
}
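// If the metrics do not all fit in the remaining buffer space, repeatedly fill and flush until the remainder fits.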
for len(ms) > w.available() {
n := copyPoints()
if w.err != nil {
return w.err
}
w.n += n
w.err = w.flush()
if w.err != nil {
return w.err
}
ms = ms[n:]
}
w.n += copyPoints()
return w.err
}
func (w *localPointsWriter) available() int {
return len(w.buf) - w.n
}
func (w *localPointsWriter) flush() error {
if w.err != nil {
return w.err
}
if w.n == 0 {
return nil
}
w.err = w.wr.WritePoints(w.ctx, w.orgID, w.bucketID, w.buf[:w.n])
if w.err != nil {
return w.err
}
w.n = 0
return nil
}
func (w *localPointsWriter) Close() error {
return w.flush()
}

View File

@@ -18,6 +18,7 @@ import (
fluxinfluxdb "github.com/influxdata/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/flux/values"
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"

View File

@@ -6,6 +6,7 @@ import (
platform2 "github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/storage"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
@@ -55,6 +56,27 @@ func (d FromDependencies) PrometheusCollectors() []prometheus.Collector {
return collectors
}
// ToDependencies contains the dependencies for executing the `to` function.
type ToDependencies struct {
BucketLookup BucketLookup
OrganizationLookup OrganizationLookup
PointsWriter storage.PointsWriter
}
// Validate returns an error if any required field is unset.
func (d ToDependencies) Validate() error {
if d.BucketLookup == nil {
return errors.New("missing bucket lookup dependency")
}
if d.OrganizationLookup == nil {
return errors.New("missing organization lookup dependency")
}
if d.PointsWriter == nil {
return errors.New("missing points writer dependency")
}
return nil
}
type StaticLookup struct {
hosts []string
}

View File

@@ -1,708 +0,0 @@
package influxdb
import (
"context"
"fmt"
"sort"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/compiler"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/flux/stdlib/kafka"
"github.com/influxdata/flux/values"
platform "github.com/influxdata/influxdb/v2"
platform2 "github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/storage"
)
const (
// ToKind is the kind for the `to` flux function
ToKind = "influx2x/toKind"
// TODO(jlapacik) remove this once we have execute.DefaultFieldColLabel
defaultFieldColLabel = "_field"
DefaultMeasurementColLabel = "_measurement"
DefaultBufferSize = 1 << 14
toOp = "influxdata/influxdb/to"
)
// ToOpSpec is the flux.OperationSpec for the `to` flux function.
type ToOpSpec struct {
Bucket string `json:"bucket"`
BucketID string `json:"bucketID"`
Org string `json:"org"`
OrgID string `json:"orgID"`
Host string `json:"host"`
Token string `json:"token"`
TimeColumn string `json:"timeColumn"`
MeasurementColumn string `json:"measurementColumn"`
TagColumns []string `json:"tagColumns"`
FieldFn interpreter.ResolvedFunction `json:"fieldFn"`
}
func init() {
toSignature := runtime.MustLookupBuiltinType("influxdata/influxdb", influxdb.ToKind)
runtime.ReplacePackageValue("influxdata/influxdb", "to", flux.MustValue(flux.FunctionValueWithSideEffect(ToKind, createToOpSpec, toSignature)))
flux.RegisterOpSpec(ToKind, func() flux.OperationSpec { return &ToOpSpec{} })
plan.RegisterProcedureSpecWithSideEffect(ToKind, newToProcedure, ToKind)
execute.RegisterTransformation(ToKind, createToTransformation)
}
// argsReader is implemented by OperationSpecs that share the same method for reading args.
type argsReader interface {
flux.OperationSpec
ReadArgs(args flux.Arguments) error
}
// ReadArgs reads the args from flux.Arguments into the op spec
func (o *ToOpSpec) ReadArgs(args flux.Arguments) error {
var err error
var ok bool
if o.Bucket, ok, _ = args.GetString("bucket"); !ok {
if o.BucketID, err = args.GetRequiredString("bucketID"); err != nil {
return err
}
} else if o.BucketID, ok, _ = args.GetString("bucketID"); ok {
return &flux.Error{
Code: codes.Invalid,
Msg: "cannot provide both `bucket` and `bucketID` parameters to the `to` function",
}
}
if o.Org, ok, _ = args.GetString("org"); !ok {
if o.OrgID, _, err = args.GetString("orgID"); err != nil {
return err
}
} else if o.OrgID, ok, _ = args.GetString("orgID"); ok {
return &flux.Error{
Code: codes.Invalid,
Msg: "cannot provide both `org` and `orgID` parameters to the `to` function",
}
}
if o.Host, ok, _ = args.GetString("host"); ok {
if o.Token, err = args.GetRequiredString("token"); err != nil {
return err
}
}
if o.TimeColumn, ok, _ = args.GetString("timeColumn"); !ok {
o.TimeColumn = execute.DefaultTimeColLabel
}
if o.MeasurementColumn, ok, _ = args.GetString("measurementColumn"); !ok {
o.MeasurementColumn = DefaultMeasurementColLabel
}
if tags, ok, _ := args.GetArray("tagColumns", semantic.String); ok {
o.TagColumns = make([]string, tags.Len())
tags.Sort(func(i, j values.Value) bool {
return i.Str() < j.Str()
})
tags.Range(func(i int, v values.Value) {
o.TagColumns[i] = v.Str()
})
}
if fieldFn, ok, _ := args.GetFunction("fieldFn"); ok {
if o.FieldFn, err = interpreter.ResolveFunction(fieldFn); err != nil {
return err
}
}
return err
}
func createToOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
if err := a.AddParentFromArgs(args); err != nil {
return nil, err
}
_, httpOK, err := args.GetString("url")
if err != nil {
return nil, err
}
_, kafkaOK, err := args.GetString("brokers")
if err != nil {
return nil, err
}
var s argsReader
switch {
case httpOK && kafkaOK:
return nil, &flux.Error{
Code: codes.Invalid,
Msg: "specify at most one of url, brokers in the same `to` function",
}
case kafkaOK:
s = &kafka.ToKafkaOpSpec{}
default:
s = &ToOpSpec{}
}
if err := s.ReadArgs(args); err != nil {
return nil, err
}
return s, nil
}
// Kind returns the operation kind for the ToOpSpec.
func (ToOpSpec) Kind() flux.OperationKind {
return ToKind
}
// BucketsAccessed returns the buckets accessed by the spec.
func (o *ToOpSpec) BucketsAccessed(orgID *platform2.ID) (readBuckets, writeBuckets []platform.BucketFilter) {
bf := platform.BucketFilter{}
if o.Bucket != "" {
bf.Name = &o.Bucket
}
if o.BucketID != "" {
id, err := platform2.IDFromString(o.BucketID)
if err == nil {
bf.ID = id
}
}
if o.Org != "" {
bf.Org = &o.Org
}
if o.OrgID != "" {
id, err := platform2.IDFromString(o.OrgID)
if err == nil {
bf.OrganizationID = id
}
}
writeBuckets = append(writeBuckets, bf)
return readBuckets, writeBuckets
}
// ToProcedureSpec is the procedure spec for the `to` flux function.
type ToProcedureSpec struct {
plan.DefaultCost
Spec *ToOpSpec
}
// Kind returns the kind for the procedure spec for the `to` flux function.
func (o *ToProcedureSpec) Kind() plan.ProcedureKind {
return ToKind
}
// Copy clones the procedure spec for `to` flux function.
func (o *ToProcedureSpec) Copy() plan.ProcedureSpec {
s := o.Spec
res := &ToProcedureSpec{
Spec: &ToOpSpec{
Bucket: s.Bucket,
BucketID: s.BucketID,
Org: s.Org,
OrgID: s.OrgID,
Host: s.Host,
Token: s.Token,
TimeColumn: s.TimeColumn,
MeasurementColumn: s.MeasurementColumn,
TagColumns: append([]string(nil), s.TagColumns...),
FieldFn: s.FieldFn.Copy(),
},
}
return res
}
func newToProcedure(qs flux.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
spec, ok := qs.(*ToOpSpec)
if !ok {
return nil, &flux.Error{
Code: codes.Internal,
Msg: fmt.Sprintf("invalid spec type %T", qs),
}
}
return &ToProcedureSpec{Spec: spec}, nil
}
func createToTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
s, ok := spec.(*ToProcedureSpec)
if !ok {
return nil, nil, &flux.Error{
Code: codes.Internal,
Msg: fmt.Sprintf("invalid spec type %T", spec),
}
}
cache := execute.NewTableBuilderCache(a.Allocator())
d := execute.NewDataset(id, mode, cache)
deps := GetStorageDependencies(a.Context())
if deps == (StorageDependencies{}) {
return nil, nil, &flux.Error{
Code: codes.Unimplemented,
Msg: "cannot return storage dependencies; storage dependencies are unimplemented",
}
}
toDeps := deps.ToDeps
t, err := NewToTransformation(a.Context(), d, cache, s, toDeps)
if err != nil {
return nil, nil, err
}
return t, d, nil
}
// ToTransformation is the transformation for the `to` flux function.
type ToTransformation struct {
execute.ExecutionNode
Ctx context.Context
OrgID platform2.ID
BucketID platform2.ID
d execute.Dataset
fn *execute.RowMapFn
cache execute.TableBuilderCache
spec *ToProcedureSpec
implicitTagColumns bool
deps ToDependencies
buf *storage.BufferedPointsWriter
}
// RetractTable retracts the table for the transformation for the `to` flux function.
func (t *ToTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error {
return t.d.RetractTable(key)
}
// NewToTransformation returns a new *ToTransformation with the appropriate fields set.
func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.TableBuilderCache, toSpec *ToProcedureSpec, deps ToDependencies) (x *ToTransformation, err error) {
var fn *execute.RowMapFn
spec := toSpec.Spec
var bucketID, orgID *platform2.ID
if spec.FieldFn.Fn != nil {
fn = execute.NewRowMapFn(spec.FieldFn.Fn, compiler.ToScope(spec.FieldFn.Scope))
}
// Get organization ID
if spec.Org != "" {
oID, ok := deps.OrganizationLookup.Lookup(ctx, spec.Org)
if !ok {
return nil, &flux.Error{
Code: codes.NotFound,
Msg: fmt.Sprintf("failed to look up organization %q", spec.Org),
}
}
orgID = &oID
} else if spec.OrgID != "" {
if orgID, err = platform2.IDFromString(spec.OrgID); err != nil {
return nil, err
}
} else {
// No org or orgID provided as an arg; use the orgID from the context
req := query.RequestFromContext(ctx)
if req == nil {
return nil, &errors.Error{
Code: errors.EInternal,
Msg: "missing request on context",
Op: toOp,
}
}
orgID = &req.OrganizationID
}
// Get bucket ID
if spec.Bucket != "" {
bID, ok := deps.BucketLookup.Lookup(ctx, *orgID, spec.Bucket)
if !ok {
return nil, &flux.Error{
Code: codes.NotFound,
Msg: fmt.Sprintf("failed to look up bucket %q in org %q", spec.Bucket, spec.Org),
}
}
bucketID = &bID
} else if bucketID, err = platform2.IDFromString(spec.BucketID); err != nil {
return nil, &flux.Error{
Code: codes.Invalid,
Msg: "invalid bucket id",
Err: err,
}
}
if orgID == nil || bucketID == nil {
return nil, &flux.Error{
Code: codes.Unknown,
Msg: "You must specify org and bucket",
}
}
return &ToTransformation{
Ctx: ctx,
OrgID: *orgID,
BucketID: *bucketID,
d: d,
fn: fn,
cache: cache,
spec: toSpec,
implicitTagColumns: spec.TagColumns == nil,
deps: deps,
buf: storage.NewBufferedPointsWriter(*orgID, *bucketID, DefaultBufferSize, deps.PointsWriter),
}, nil
}
// Process does the actual work for the ToTransformation.
func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
if t.implicitTagColumns {
// If no tag columns are specified, by default we exclude
// _field, _value and _measurement from being tag columns.
excludeColumns := map[string]bool{
execute.DefaultValueColLabel: true,
defaultFieldColLabel: true,
DefaultMeasurementColLabel: true,
}
// If a field function is specified then we exclude any column that
// is referenced in the function expression from being a tag column.
if t.spec.Spec.FieldFn.Fn != nil {
recordParam := t.spec.Spec.FieldFn.Fn.Parameters.List[0].Key.Name
exprNode := t.spec.Spec.FieldFn.Fn
colVisitor := newFieldFunctionVisitor(recordParam, tbl.Cols())
// Walk the field function expression and record which columns
// are referenced. None of these columns will be used as tag columns.
semantic.Walk(colVisitor, exprNode)
for k, v := range colVisitor.captured {
excludeColumns[k] = v
}
}
addTagsFromTable(t.spec.Spec, tbl, excludeColumns)
}
return writeTable(t.Ctx, t, tbl)
}
// fieldFunctionVisitor implements semantic.Visitor.
// fieldFunctionVisitor is used to walk the field function expression
// of the `to` operation and to record all referenced columns. This visitor
// is only used when no tag columns are provided as input to the `to` func.
type fieldFunctionVisitor struct {
columns map[string]bool
visited map[semantic.Node]bool
captured map[string]bool
rowParam string
}
func newFieldFunctionVisitor(rowParam string, cols []flux.ColMeta) *fieldFunctionVisitor {
columns := make(map[string]bool, len(cols))
for _, col := range cols {
columns[col.Label] = true
}
return &fieldFunctionVisitor{
columns: columns,
visited: make(map[semantic.Node]bool, len(cols)),
captured: make(map[string]bool, len(cols)),
rowParam: rowParam,
}
}
// A field function is of the form `(r) => { Function Body }`, and it returns an object
// mapping field keys to values for each row r of the input. Visit records every column
// that is referenced in `Function Body`. These columns are either directly or indirectly
// used as value columns and as such need to be recorded so as not to be used as tag columns.
func (v *fieldFunctionVisitor) Visit(node semantic.Node) semantic.Visitor {
if v.visited[node] {
return v
}
if member, ok := node.(*semantic.MemberExpression); ok {
if obj, ok := member.Object.(*semantic.IdentifierExpression); ok {
if obj.Name == v.rowParam && v.columns[member.Property] {
v.captured[member.Property] = true
}
}
}
v.visited[node] = true
return v
}
func (v *fieldFunctionVisitor) Done(node semantic.Node) {}
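// addTagsFromTable resets spec.TagColumns to the sorted list of string-typed columns in the table, minus the excluded ones.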
func addTagsFromTable(spec *ToOpSpec, table flux.Table, exclude map[string]bool) {
if cap(spec.TagColumns) < len(table.Cols()) {
spec.TagColumns = make([]string, 0, len(table.Cols()))
} else {
spec.TagColumns = spec.TagColumns[:0]
}
for _, column := range table.Cols() {
if column.Type == flux.TString && !exclude[column.Label] {
spec.TagColumns = append(spec.TagColumns, column.Label)
}
}
sort.Strings(spec.TagColumns)
}
// UpdateWatermark updates the watermark for the transformation for the `to` flux function.
func (t *ToTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error {
return t.d.UpdateWatermark(pt)
}
// UpdateProcessingTime updates the processing time for the transformation for the `to` flux function.
func (t *ToTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
return t.d.UpdateProcessingTime(pt)
}
// Finish is called after the `to` flux function's transformation is done processing.
func (t *ToTransformation) Finish(id execute.DatasetID, err error) {
if err == nil {
err = t.buf.Flush(t.Ctx)
}
t.d.Finish(err)
}
// ToDependencies contains the dependencies for executing the `to` function.
type ToDependencies struct {
BucketLookup BucketLookup
OrganizationLookup OrganizationLookup
PointsWriter storage.PointsWriter
}
// Validate returns an error if any required field is unset.
func (d ToDependencies) Validate() error {
if d.BucketLookup == nil {
return &errors.Error{
Code: errors.EInternal,
Msg: "missing bucket lookup dependency",
Op: toOp,
}
}
if d.OrganizationLookup == nil {
return &errors.Error{
Code: errors.EInternal,
Msg: "missing organization lookup dependency",
Op: toOp,
}
}
if d.PointsWriter == nil {
return &errors.Error{
Code: errors.EInternal,
Msg: "missing points writer dependency",
Op: toOp,
}
}
return nil
}
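// Stats tracks simple per-measurement statistics about the points written.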
type Stats struct {
NRows int
Latest time.Time
Earliest time.Time
NFields int
NTags int
}
func (s Stats) Update(o Stats) Stats {
s.NRows += o.NRows
if s.Latest.IsZero() || o.Latest.Unix() > s.Latest.Unix() {
s.Latest = o.Latest
}
if s.Earliest.IsZero() || o.Earliest.Unix() < s.Earliest.Unix() {
s.Earliest = o.Earliest
}
if o.NFields > s.NFields {
s.NFields = o.NFields
}
if o.NTags > s.NTags {
s.NTags = o.NTags
}
return s
}
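// writeTable resolves the tag columns, timestamp, and field values (via fieldFn when one is set) for each row, writes the resulting points through the buffered writer, and forwards the rows downstream unchanged.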
func writeTable(ctx context.Context, t *ToTransformation, tbl flux.Table) (err error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
spec := t.spec.Spec
// cache tag columns
columns := tbl.Cols()
isTag := make([]bool, len(columns))
for i, col := range columns {
tagIdx := sort.SearchStrings(spec.TagColumns, col.Label)
isTag[i] = tagIdx < len(spec.TagColumns) && spec.TagColumns[tagIdx] == col.Label
}
// do time
timeColLabel := spec.TimeColumn
timeColIdx := execute.ColIdx(timeColLabel, columns)
if timeColIdx < 0 {
return &flux.Error{
Code: codes.Invalid,
Msg: "no time column detected",
}
}
if columns[timeColIdx].Type != flux.TTime {
return &flux.Error{
Code: codes.Invalid,
Msg: fmt.Sprintf("column %s of type %s is not of type %s", timeColLabel, columns[timeColIdx].Type, flux.TTime),
}
}
// prepare field function if applicable and record the number of values to write per row
var fn *execute.RowMapPreparedFn
if spec.FieldFn.Fn != nil {
var err error
if fn, err = t.fn.Prepare(columns); err != nil {
return err
}
}
builder, isNew := t.cache.TableBuilder(tbl.Key())
if isNew {
if err := execute.AddTableCols(tbl, builder); err != nil {
return err
}
}
measurementStats := make(map[string]Stats)
measurementName := ""
return tbl.Do(func(er flux.ColReader) error {
var pointTime time.Time
var points models.Points
var tags models.Tags
kv := make([][]byte, 2, er.Len()*2+2) // +2 for field key, value
outer:
for i := 0; i < er.Len(); i++ {
measurementName = ""
fields := make(models.Fields)
kv = kv[:0]
// Gather the timestamp and the tags.
for j, col := range er.Cols() {
switch {
case col.Label == spec.MeasurementColumn:
measurementName = string(er.Strings(j).Value(i))
case col.Label == timeColLabel:
valueTime := execute.ValueForRow(er, i, j)
if valueTime.IsNull() {
// skip rows with null timestamp
continue outer
}
pointTime = valueTime.Time().Time()
case isTag[j]:
if col.Type != flux.TString {
return &errors.Error{
Code: errors.EInvalid,
Msg: "invalid type for tag column",
Op: toOp,
}
}
// TODO(docmerlin): instead of doing this sort of thing, it would be nice if we had a way that allocated a lot less.
kv = append(kv, []byte(col.Label), []byte(er.Strings(j).Value(i)))
}
}
if pointTime.IsZero() {
return &flux.Error{
Code: codes.Invalid,
Msg: "timestamp missing from block",
}
}
if measurementName == "" {
return &flux.Error{
Code: codes.Invalid,
Msg: fmt.Sprintf("no column with label %s exists", spec.MeasurementColumn),
}
}
var fieldValues values.Object
if fn == nil {
if fieldValues, err = defaultFieldMapping(er, i); err != nil {
return err
}
} else if fieldValues, err = fn.Eval(t.Ctx, i, er); err != nil {
return err
}
fieldValues.Range(func(k string, v values.Value) {
if v.IsNull() {
fields[k] = nil
return
}
switch v.Type().Nature() {
case semantic.Float:
fields[k] = v.Float()
case semantic.Int:
fields[k] = v.Int()
case semantic.UInt:
fields[k] = v.UInt()
case semantic.String:
fields[k] = v.Str()
case semantic.Time:
fields[k] = v.Time()
case semantic.Bool:
fields[k] = v.Bool()
}
})
mstats := Stats{
NRows: 1,
Latest: pointTime,
Earliest: pointTime,
NFields: len(fields),
NTags: len(kv) / 2,
}
_, ok := measurementStats[measurementName]
if ok {
mstats = measurementStats[measurementName].Update(mstats)
}
measurementStats[measurementName] = mstats
tags, _ = models.NewTagsKeyValues(tags, kv...)
pt, err := models.NewPoint(measurementName, tags, fields, pointTime)
if err != nil {
return err
}
points = append(points, pt)
if err := execute.AppendRecord(i, er, builder); err != nil {
return err
}
}
return t.buf.WritePoints(ctx, points)
})
}
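// defaultFieldMapping builds a single-property field object for a row from its _field and _value columns; it is used when no fieldFn is supplied.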
func defaultFieldMapping(er flux.ColReader, row int) (values.Object, error) {
fieldColumnIdx := execute.ColIdx(defaultFieldColLabel, er.Cols())
valueColumnIdx := execute.ColIdx(execute.DefaultValueColLabel, er.Cols())
if fieldColumnIdx < 0 {
return nil, &flux.Error{
Code: codes.Invalid,
Msg: "table has no _field column",
}
}
if valueColumnIdx < 0 {
return nil, &flux.Error{
Code: codes.Invalid,
Msg: "table has no _value column",
}
}
value := execute.ValueForRow(er, row, valueColumnIdx)
field := execute.ValueForRow(er, row, fieldColumnIdx)
props := []semantic.PropertyType{
{
Key: []byte(field.Str()),
Value: value.Type(),
},
}
fieldValueMapping := values.NewObject(semantic.NewObjectType(props))
fieldValueMapping.Set(field.Str(), value)
return fieldValueMapping, nil
}

View File

@@ -1,798 +0,0 @@
package influxdb_test
import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
"github.com/influxdata/flux/dependencies/dependenciestest"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/querytest"
"github.com/influxdata/flux/values/valuestest"
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
)
func TestTo_Query(t *testing.T) {
tests := []querytest.NewQueryTestCase{
{
Name: "from with database with range",
Raw: `from(bucket:"mydb") |> to(bucket:"series1", org:"fred", host:"localhost", token:"auth-token", fieldFn: (r) => ({ col: r.col }) )`,
Want: &flux.Spec{
Operations: []*flux.Operation{
{
ID: "from0",
Spec: &influxdb.FromOpSpec{
Bucket: influxdb.NameOrID{Name: "mydb"},
},
},
{
ID: "influx2x/toKind1",
Spec: &influxdb.ToOpSpec{
Bucket: "series1",
Org: "fred",
Host: "localhost",
Token: "auth-token",
TimeColumn: execute.DefaultTimeColLabel,
MeasurementColumn: influxdb.DefaultMeasurementColLabel,
FieldFn: interpreter.ResolvedFunction{
Scope: valuestest.Scope(),
Fn: executetest.FunctionExpression(t, `(r) => ({col: r.col})`),
},
},
},
},
Edges: []flux.Edge{
{Parent: "from0", Child: "influx2x/toKind1"},
},
},
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
querytest.NewQueryTestHelper(t, tc)
})
}
}
func TestTo_Process(t *testing.T) {
type wanted struct {
result *mock.PointsWriter
tables []*executetest.Table
}
testCases := []struct {
name string
spec *influxdb.ToProcedureSpec
data []flux.Table
want wanted
}{
{
name: "default case",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "_value", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "_value", 4.0},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`a _value=2 11
a _value=2 21
b _value=1 21
a _value=3 31
c _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "_value", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "_value", 4.0},
},
}},
},
},
{
name: "default with heterogeneous tag columns",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
KeyCols: []string{"_measurement", "tag1", "tag2", "_field"},
Data: [][]interface{}{
{execute.Time(11), "a", "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "a", "bb", "_value", 2.0},
{execute.Time(21), "a", "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "a", "dd", "_value", 3.0},
{execute.Time(41), "a", "c", "ee", "_value", 4.0},
},
}),
executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "tagA", Type: flux.TString},
{Label: "tagB", Type: flux.TString},
{Label: "tagC", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
KeyCols: []string{"_measurement", "tagA", "tagB", "tagC", "_field"},
Data: [][]interface{}{
{execute.Time(11), "b", "a", "aa", "ff", "_value", 2.0},
{execute.Time(21), "b", "a", "bb", "gg", "_value", 2.0},
{execute.Time(21), "b", "b", "cc", "hh", "_value", 1.0},
{execute.Time(31), "b", "a", "dd", "ii", "_value", 3.0},
{execute.Time(41), "b", "c", "ee", "jj", "_value", 4.0},
},
}),
},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`a,tag1=a,tag2=aa _value=2 11
a,tag1=a,tag2=bb _value=2 21
a,tag1=b,tag2=cc _value=1 21
a,tag1=a,tag2=dd _value=3 31
a,tag1=c,tag2=ee _value=4 41
b,tagA=a,tagB=aa,tagC=ff _value=2 11
b,tagA=a,tagB=bb,tagC=gg _value=2 21
b,tagA=b,tagB=cc,tagC=hh _value=1 21
b,tagA=a,tagB=dd,tagC=ii _value=3 31
b,tagA=c,tagB=ee,tagC=jj _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
KeyCols: []string{"_measurement", "tag1", "tag2", "_field"},
Data: [][]interface{}{
{execute.Time(11), "a", "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "a", "bb", "_value", 2.0},
{execute.Time(21), "a", "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "a", "dd", "_value", 3.0},
{execute.Time(41), "a", "c", "ee", "_value", 4.0},
},
},
{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "tagA", Type: flux.TString},
{Label: "tagB", Type: flux.TString},
{Label: "tagC", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
KeyCols: []string{"_measurement", "tagA", "tagB", "tagC", "_field"},
Data: [][]interface{}{
{execute.Time(11), "b", "a", "aa", "ff", "_value", 2.0},
{execute.Time(21), "b", "a", "bb", "gg", "_value", 2.0},
{execute.Time(21), "b", "b", "cc", "hh", "_value", 1.0},
{execute.Time(31), "b", "a", "dd", "ii", "_value", 3.0},
{execute.Time(41), "b", "c", "ee", "jj", "_value", 4.0},
},
},
},
},
},
{
name: "no _measurement with multiple tag columns",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "tag1",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "bb", "_value", 2.0},
{execute.Time(21), "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "dd", "_value", 3.0},
{execute.Time(41), "c", "ee", "_value", 4.0},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`a,tag2=aa _value=2 11
a,tag2=bb _value=2 21
b,tag2=cc _value=1 21
a,tag2=dd _value=3 31
c,tag2=ee _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", "aa", "_value", 2.0},
{execute.Time(21), "a", "bb", "_value", 2.0},
{execute.Time(21), "b", "cc", "_value", 1.0},
{execute.Time(31), "a", "dd", "_value", 3.0},
{execute.Time(41), "c", "ee", "_value", 4.0},
},
}},
},
},
{
name: "explicit tags",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
TagColumns: []string{"tag1", "tag2"},
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "a", "aa"},
{execute.Time(21), "m", "_value", 2.0, "a", "bb"},
{execute.Time(21), "m", "_value", 1.0, "b", "cc"},
{execute.Time(31), "m", "_value", 3.0, "a", "dd"},
{execute.Time(41), "m", "_value", 4.0, "c", "ee"},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31
m,tag1=c,tag2=ee _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "a", "aa"},
{execute.Time(21), "m", "_value", 2.0, "a", "bb"},
{execute.Time(21), "m", "_value", 1.0, "b", "cc"},
{execute.Time(31), "m", "_value", 3.0, "a", "dd"},
{execute.Time(41), "m", "_value", 4.0, "c", "ee"},
},
}},
},
},
{
name: "explicit field function",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
FieldFn: interpreter.ResolvedFunction{
Scope: valuestest.Scope(),
Fn: executetest.FunctionExpression(t, `(r) => ({temperature: r.temperature})`),
},
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", 2.0},
{execute.Time(21), "a", 2.0},
{execute.Time(21), "b", 1.0},
{execute.Time(31), "a", 3.0},
{execute.Time(41), "c", 4.0},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`a temperature=2 11
a temperature=2 21
b temperature=1 21
a temperature=3 31
c temperature=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(11), "a", 2.0},
{execute.Time(21), "a", 2.0},
{execute.Time(21), "b", 1.0},
{execute.Time(31), "a", 3.0},
{execute.Time(41), "c", 4.0},
},
}},
},
},
{
name: "infer tags from complex field function",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "tag",
FieldFn: interpreter.ResolvedFunction{
Scope: valuestest.Scope(),
Fn: executetest.FunctionExpression(t, `(r) => ({day: r.day, temperature: r.temperature, humidity: r.humidity, ratio: r.temperature / r.humidity})`),
},
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_time", Type: flux.TTime},
{Label: "day", Type: flux.TString},
{Label: "tag", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TFloat},
{Label: "_value", Type: flux.TString},
},
KeyCols: []string{"_measurement", "_field"},
Data: [][]interface{}{
{"m", "f", execute.Time(11), "Monday", "a", 2.0, 1.0, "bogus"},
{"m", "f", execute.Time(21), "Tuesday", "a", 2.0, 2.0, "bogus"},
{"m", "f", execute.Time(21), "Wednesday", "b", 1.0, 4.0, "bogus"},
{"m", "f", execute.Time(31), "Thursday", "a", 3.0, 3.0, "bogus"},
{"m", "f", execute.Time(41), "Friday", "c", 4.0, 5.0, "bogus"},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`a day="Monday",humidity=1,ratio=2,temperature=2 11
a day="Tuesday",humidity=2,ratio=1,temperature=2 21
b day="Wednesday",humidity=4,ratio=0.25,temperature=1 21
a day="Thursday",humidity=3,ratio=1,temperature=3 31
c day="Friday",humidity=5,ratio=0.8,temperature=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_time", Type: flux.TTime},
{Label: "day", Type: flux.TString},
{Label: "tag", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TFloat},
{Label: "_value", Type: flux.TString},
},
KeyCols: []string{"_measurement", "_field"},
Data: [][]interface{}{
{"m", "f", execute.Time(11), "Monday", "a", 2.0, 1.0, "bogus"},
{"m", "f", execute.Time(21), "Tuesday", "a", 2.0, 2.0, "bogus"},
{"m", "f", execute.Time(21), "Wednesday", "b", 1.0, 4.0, "bogus"},
{"m", "f", execute.Time(31), "Thursday", "a", 3.0, 3.0, "bogus"},
{"m", "f", execute.Time(41), "Friday", "c", 4.0, 5.0, "bogus"},
},
}},
},
},
{
name: "explicit tag columns, multiple values in field function, and extra columns",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "tag1",
TagColumns: []string{"tag2"},
FieldFn: interpreter.ResolvedFunction{
Scope: valuestest.Scope(),
Fn: executetest.FunctionExpression(t, `(r) => ({temperature: r.temperature, humidity: r.humidity})`),
},
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "other-string-column", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TInt},
{Label: "other-value-column", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "d", "misc", 1.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "e", "misc", 3.0, int64(60), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "e", "misc", 4.0, int64(65), 1.0},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`a,tag2=d humidity=50i,temperature=2 11
a,tag2=d humidity=50i,temperature=2 21
b,tag2=d humidity=50i,temperature=1 21
a,tag2=e humidity=60i,temperature=3 31
c,tag2=e humidity=65i,temperature=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "tag1", Type: flux.TString},
{Label: "tag2", Type: flux.TString},
{Label: "other-string-column", Type: flux.TString},
{Label: "temperature", Type: flux.TFloat},
{Label: "humidity", Type: flux.TInt},
{Label: "other-value-column", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "d", "misc", 2.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "d", "misc", 1.0, int64(50), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "e", "misc", 3.0, int64(60), 1.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "e", "misc", 4.0, int64(65), 1.0},
},
}},
},
},
{
name: "multiple _field",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "_hello", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "_hello", 4.0},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`a _value=2 11
a _value=2 21
b _value=1 21
a _hello=3 31
c _hello=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
},
Data: [][]interface{}{
{execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
{execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
{execute.Time(0), execute.Time(100), execute.Time(31), "a", "_hello", 3.0},
{execute.Time(0), execute.Time(100), execute.Time(41), "c", "_hello", 4.0},
},
}},
},
},
{
name: "unordered tags",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
TagColumns: []string{"tag1", "tag2"},
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{execute.Time(41), "m", "_value", 4.0, "ee", "c"},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31
m,tag1=c,tag2=ee _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{execute.Time(41), "m", "_value", 4.0, "ee", "c"},
},
}},
},
},
{
name: "nil timestamp",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
TagColumns: []string{"tag1", "tag2"},
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{nil, "m", "_value", 4.0, "ee", "c"},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
},
}},
},
},
{
name: "nil tag",
spec: &influxdb.ToProcedureSpec{
Spec: &influxdb.ToOpSpec{
Org: "my-org",
Bucket: "my-bucket",
TimeColumn: "_time",
TagColumns: []string{"tag1", "tag2"},
MeasurementColumn: "_measurement",
},
},
data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{execute.Time(41), "m", "_value", 4.0, nil, "c"},
},
})},
want: wanted{
result: &mock.PointsWriter{
Points: mockPoints(`m,tag1=a,tag2=aa _value=2 11
m,tag1=a,tag2=bb _value=2 21
m,tag1=b,tag2=cc _value=1 21
m,tag1=a,tag2=dd _value=3 31
m,tag1=c _value=4 41`),
},
tables: []*executetest.Table{{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_measurement", Type: flux.TString},
{Label: "_field", Type: flux.TString},
{Label: "_value", Type: flux.TFloat},
{Label: "tag2", Type: flux.TString},
{Label: "tag1", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(11), "m", "_value", 2.0, "aa", "a"},
{execute.Time(21), "m", "_value", 2.0, "bb", "a"},
{execute.Time(21), "m", "_value", 1.0, "cc", "b"},
{execute.Time(31), "m", "_value", 3.0, "dd", "a"},
{execute.Time(41), "m", "_value", 4.0, nil, "c"},
},
}},
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
deps := influxdb.Dependencies{
FluxDeps: dependenciestest.Default(),
StorageDeps: influxdb.StorageDependencies{
ToDeps: mockDependencies(),
},
}
executetest.ProcessTestHelper(
t,
tc.data,
tc.want.tables,
nil,
func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
ctx := deps.Inject(context.Background())
newT, err := influxdb.NewToTransformation(ctx, d, c, tc.spec, deps.StorageDeps.ToDeps)
if err != nil {
t.Error(err)
}
return newT
},
)
pw := deps.StorageDeps.ToDeps.PointsWriter.(*mock.PointsWriter)
if len(pw.Points) != len(tc.want.result.Points) {
t.Errorf("Expected result values to have length of %d but got %d", len(tc.want.result.Points), len(pw.Points))
}
gotStr := pointsToStr(pw.Points)
wantStr := pointsToStr(tc.want.result.Points)
if !cmp.Equal(gotStr, wantStr) {
t.Errorf("got other than expected %s", cmp.Diff(gotStr, wantStr))
}
})
}
}
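// mockDependencies returns ToDependencies backed by in-memory mocks, so tests
// can inspect the points that would have been written.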
func mockDependencies() influxdb.ToDependencies {
return influxdb.ToDependencies{
BucketLookup: mock.BucketLookup{},
OrganizationLookup: mock.OrganizationLookup{},
PointsWriter: new(mock.PointsWriter),
}
}
func pointsToStr(points []models.Point) string {
outStr := ""
for _, x := range points {
outStr += x.String() + "\n"
}
return outStr
}
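// mockPoints parses a line-protocol fixture into points, returning nil if the
// fixture fails to parse. For example, mockPoints(`m,tag1=a f=1 11`) yields a
// single point in measurement "m" with tag tag1=a, field f=1, and a timestamp
// of 11ns.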
func mockPoints(pointdata string) []models.Point {
points, err := models.ParsePoints([]byte(pointdata))
if err != nil {
return nil
}
return points
}

View File

@@ -2,7 +2,6 @@ package stdlib
// Import all stdlib packages
import (
_ "github.com/influxdata/influxdb/v2/query/stdlib/experimental"
_ "github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
_ "github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb/v1"
_ "github.com/influxdata/influxdb/v2/query/stdlib/testing"

View File

@@ -67,65 +67,3 @@ func (w *LoggingPointsWriter) WritePoints(ctx context.Context, orgID platform.ID
return err
}
type BufferedPointsWriter struct {
buf []models.Point
orgID platform.ID
bucketID platform.ID
n int
wr PointsWriter
err error
}
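// NewBufferedPointsWriter returns a writer that buffers up to size points for
// the given org and bucket before writing them through to pointswriter.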
func NewBufferedPointsWriter(orgID platform.ID, bucketID platform.ID, size int, pointswriter PointsWriter) *BufferedPointsWriter {
return &BufferedPointsWriter{
buf: make([]models.Point, size),
orgID: orgID,
bucketID: bucketID,
wr: pointswriter,
}
}
// WritePoints buffers the points, writing them through to the underlying PointsWriter as the buffer fills.
func (b *BufferedPointsWriter) WritePoints(ctx context.Context, p []models.Point) error {
for len(p) > b.Available() && b.err == nil {
if b.Buffered() == 0 {
// Large write, empty buffer.
// Write directly from p to avoid copy.
b.err = b.wr.WritePoints(ctx, b.orgID, b.bucketID, p)
return b.err
}
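// Fill the remaining buffer space, flush it, and continue with the rest of p.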
n := copy(b.buf[b.n:], p)
b.n += n
b.err = b.Flush(ctx)
p = p[n:]
}
if b.err != nil {
return b.err
}
b.n += copy(b.buf[b.n:], p)
return nil
}
// Available returns how many models.Points are unused in the buffer.
func (b *BufferedPointsWriter) Available() int { return len(b.buf) - b.n }
// Buffered returns the number of models.Points that have been written into the current buffer.
func (b *BufferedPointsWriter) Buffered() int { return b.n }
// Flush writes any buffered data to the underlying PointsWriter.
func (b *BufferedPointsWriter) Flush(ctx context.Context) error {
if b.err != nil {
return b.err
}
if b.n == 0 {
return nil
}
b.err = b.wr.WritePoints(ctx, b.orgID, b.bucketID, b.buf[:b.n])
if b.err != nil {
return b.err
}
b.n = 0
return nil
}
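// A hypothetical usage sketch, assuming w is an existing PointsWriter and
// points is a parsed []models.Point; callers must Flush to write out any
// buffered remainder:
//
//	bw := NewBufferedPointsWriter(orgID, bucketID, 1000, w)
//	if err := bw.WritePoints(ctx, points); err != nil {
//		return err
//	}
//	return bw.Flush(ctx)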