feat(query/stdlib): add an experimental "to" function (#14664)

pull/14688/head
Christopher M. Wolff 2019-08-15 12:02:59 -07:00 committed by Nathaniel Cook
parent cd2bfb9b22
commit c61646d0f4
6 changed files with 304 additions and 3 deletions

2
go.mod
View File

@ -38,7 +38,7 @@ require (
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect
github.com/hashicorp/raft v1.0.0 // indirect
github.com/hashicorp/vault/api v1.0.2
github.com/influxdata/flux v0.39.1-0.20190815172552-7cd93c0dbf18
github.com/influxdata/flux v0.39.1-0.20190815180331-3137d2af4892
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jessevdk/go-flags v1.4.0

4
go.sum
View File

@ -205,8 +205,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/changelog v1.0.0 h1:RstJD6H48zLQj0GdE6E6k/6RPwtUjkyzIe/T1E/xuWU=
github.com/influxdata/changelog v1.0.0/go.mod h1:uzpGWE/qehT8L426YuXwpMQub+a63vIINhIeEI9mnSM=
github.com/influxdata/flux v0.39.1-0.20190815172552-7cd93c0dbf18 h1:2y7AEir7q9TfWziF2qq4CiDtWmHdog10X/zKgs+/dV0=
github.com/influxdata/flux v0.39.1-0.20190815172552-7cd93c0dbf18/go.mod h1:pFWDX62wdE2DtMsXtYpYMRrTkZiIn3BYB5mQgCF57Yw=
github.com/influxdata/flux v0.39.1-0.20190815180331-3137d2af4892 h1:kIhmKfxranXtHJ9BhhGXy2ZmGVoKfm6D8OK+Ua01Kko=
github.com/influxdata/flux v0.39.1-0.20190815180331-3137d2af4892/go.mod h1:pFWDX62wdE2DtMsXtYpYMRrTkZiIn3BYB5mQgCF57Yw=
github.com/influxdata/goreleaser v0.97.0-influx h1:jT5OrcW7WfS0e2QxfwmTBjhLvpIC9CDLRhNgZJyhj8s=
github.com/influxdata/goreleaser v0.97.0-influx/go.mod h1:MnjA0e0Uq6ISqjG1WxxMAl+3VS1QYjILSWVnMYDxasE=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=

View File

@ -0,0 +1,159 @@
package experimental
import (
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/experimental"
platform "github.com/influxdata/influxdb"
)
// ToKind is the kind for the `to` flux function
const ExperimentalToKind = experimental.ExperimentalToKind
// ToOpSpec is the flux.OperationSpec for the `to` flux function.
type ToOpSpec struct {
Bucket string `json:"bucket"`
BucketID string `json:"bucketID"`
Org string `json:"org"`
OrgID string `json:"orgID"`
Host string `json:"host"`
Token string `json:"token"`
}
func init() {
toSignature := flux.FunctionSignature(
map[string]semantic.PolyType{
"bucket": semantic.String,
"bucketID": semantic.String,
"org": semantic.String,
"orgID": semantic.String,
"host": semantic.String,
"token": semantic.String,
},
[]string{},
)
flux.ReplacePackageValue("experimental", "to", flux.FunctionValueWithSideEffect("to", createToOpSpec, toSignature))
flux.RegisterOpSpec(ExperimentalToKind, func() flux.OperationSpec { return &ToOpSpec{} })
plan.RegisterProcedureSpecWithSideEffect(ExperimentalToKind, newToProcedure, ExperimentalToKind)
}
// ReadArgs reads the args from flux.Arguments into the op spec
func (o *ToOpSpec) ReadArgs(args flux.Arguments) error {
var err error
var ok bool
if o.Bucket, ok, _ = args.GetString("bucket"); !ok {
if o.BucketID, err = args.GetRequiredString("bucketID"); err != nil {
return err
}
} else if o.BucketID, ok, _ = args.GetString("bucketID"); ok {
return &flux.Error{
Code: codes.Invalid,
Msg: "cannot provide both `bucket` and `bucketID` parameters to the `to` function",
}
}
if o.Org, ok, _ = args.GetString("org"); !ok {
if o.OrgID, err = args.GetRequiredString("orgID"); err != nil {
return err
}
} else if o.OrgID, ok, _ = args.GetString("orgID"); ok {
return &flux.Error{
Code: codes.Invalid,
Msg: "cannot provide both `org` and `orgID` parameters to the `to` function",
}
}
if o.Host, ok, _ = args.GetString("host"); ok {
if o.Token, err = args.GetRequiredString("token"); err != nil {
return err
}
}
return err
}
func createToOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
if err := a.AddParentFromArgs(args); err != nil {
return nil, err
}
s := &ToOpSpec{}
if err := s.ReadArgs(args); err != nil {
return nil, err
}
return s, nil
}
// Kind returns the kind for the ToOpSpec function.
func (ToOpSpec) Kind() flux.OperationKind {
return ExperimentalToKind
}
// BucketsAccessed returns the buckets accessed by the spec.
func (o *ToOpSpec) BucketsAccessed(orgID *platform.ID) (readBuckets, writeBuckets []platform.BucketFilter) {
bf := platform.BucketFilter{}
if o.Bucket != "" {
bf.Name = &o.Bucket
}
if o.BucketID != "" {
id, err := platform.IDFromString(o.BucketID)
if err == nil {
bf.ID = id
}
}
if o.Org != "" {
bf.Org = &o.Org
}
if o.OrgID != "" {
id, err := platform.IDFromString(o.OrgID)
if err == nil {
bf.OrganizationID = id
}
}
writeBuckets = append(writeBuckets, bf)
return readBuckets, writeBuckets
}
// ToProcedureSpec is the procedure spec for the `to` flux function.
type ToProcedureSpec struct {
plan.DefaultCost
Spec *ToOpSpec
}
// Kind returns the kind for the procedure spec for the `to` flux function.
func (o *ToProcedureSpec) Kind() plan.ProcedureKind {
return ExperimentalToKind
}
// Copy clones the procedure spec for `to` flux function.
func (o *ToProcedureSpec) Copy() plan.ProcedureSpec {
s := o.Spec
res := &ToProcedureSpec{
Spec: &ToOpSpec{
Bucket: s.Bucket,
BucketID: s.BucketID,
Org: s.Org,
OrgID: s.OrgID,
Host: s.Host,
Token: s.Token,
},
}
return res
}
func newToProcedure(qs flux.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
spec, ok := qs.(*ToOpSpec)
if !ok && spec != nil {
return nil, &flux.Error{
Code: codes.Internal,
Msg: fmt.Sprintf("invalid spec type %T", qs),
}
}
return &ToProcedureSpec{Spec: spec}, nil
}

View File

@ -0,0 +1,125 @@
package experimental_test
import (
"fmt"
"testing"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/querytest"
"github.com/influxdata/flux/stdlib/universe"
platform "github.com/influxdata/influxdb"
_ "github.com/influxdata/influxdb/query/builtin"
pquerytest "github.com/influxdata/influxdb/query/querytest"
"github.com/influxdata/influxdb/query/stdlib/experimental"
"github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
)
func TestTo_Query(t *testing.T) {
tests := []querytest.NewQueryTestCase{
{
Name: "from range pivot experimental to",
Raw: `import "experimental"
import "influxdata/influxdb/v1"
from(bucket:"mydb")
|> range(start: -1h)
|> v1.fieldsAsCols()
|> experimental.to(bucket:"series1", org:"fred", host:"localhost", token:"auth-token")`,
Want: &flux.Spec{
Operations: []*flux.Operation{
{
ID: "influxDBFrom0",
Spec: &influxdb.FromOpSpec{
Bucket: "mydb",
},
},
{
ID: "range1",
Spec: &universe.RangeOpSpec{
Start: flux.Time{IsRelative: true, Relative: -time.Hour},
Stop: flux.Time{IsRelative: true},
TimeColumn: "_time",
StartColumn: "_start",
StopColumn: "_stop",
},
},
{
ID: "pivot2",
Spec: &universe.PivotOpSpec{
RowKey: []string{"_time"},
ColumnKey: []string{"_field"},
ValueColumn: "_value"},
},
{
ID: "experimental-to3",
Spec: &experimental.ToOpSpec{
Bucket: "series1",
Org: "fred",
Host: "localhost",
Token: "auth-token",
},
},
},
Edges: []flux.Edge{
{Parent: "influxDBFrom0", Child: "range1"},
{Parent: "range1", Child: "pivot2"},
{Parent: "pivot2", Child: "experimental-to3"},
},
},
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
querytest.NewQueryTestHelper(t, tc)
})
}
}
func TestToOpSpec_BucketsAccessed(t *testing.T) {
bucketName := "my_bucket"
bucketIDString := "ddddccccbbbbaaaa"
bucketID, err := platform.IDFromString(bucketIDString)
if err != nil {
t.Fatal(err)
}
orgName := "my_org"
orgIDString := "aaaabbbbccccdddd"
orgID, err := platform.IDFromString(orgIDString)
if err != nil {
t.Fatal(err)
}
tests := []pquerytest.BucketsAccessedTestCase{
{
Name: "from() with bucket and to with org and bucket",
Raw: fmt.Sprintf(`import "experimental"
from(bucket:"%s")
|> experimental.to(bucket:"%s", org:"%s")`, bucketName, bucketName, orgName),
WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
WantWriteBuckets: &[]platform.BucketFilter{{Name: &bucketName, Org: &orgName}},
},
{
Name: "from() with bucket and to with orgID and bucket",
Raw: fmt.Sprintf(`import "experimental"
from(bucket:"%s") |> experimental.to(bucket:"%s", orgID:"%s")`, bucketName, bucketName, orgIDString),
WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
WantWriteBuckets: &[]platform.BucketFilter{{Name: &bucketName, OrganizationID: orgID}},
},
{
Name: "from() with bucket and to with orgID and bucketID",
Raw: fmt.Sprintf(`import "experimental"
from(bucket:"%s") |> experimental.to(bucketID:"%s", orgID:"%s")`, bucketName, bucketIDString, orgIDString),
WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
WantWriteBuckets: &[]platform.BucketFilter{{ID: bucketID, OrganizationID: orgID}},
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
pquerytest.BucketsAccessedTestHelper(t, tc)
})
}
}

View File

@ -188,6 +188,12 @@ func (o *ToOpSpec) BucketsAccessed(orgID *platform.ID) (readBuckets, writeBucket
if o.Bucket != "" {
bf.Name = &o.Bucket
}
if o.BucketID != "" {
id, err := platform.IDFromString(o.BucketID)
if err == nil {
bf.ID = id
}
}
if o.Org != "" {
bf.Org = &o.Org
}

View File

@ -85,6 +85,11 @@ func TestTo_Query(t *testing.T) {
func TestToOpSpec_BucketsAccessed(t *testing.T) {
bucketName := "my_bucket"
bucketIDString := "ddddccccbbbbaaaa"
bucketID, err := platform.IDFromString(bucketIDString)
if err != nil {
t.Fatal(err)
}
orgName := "my_org"
orgIDString := "aaaabbbbccccdddd"
orgID, err := platform.IDFromString(orgIDString)
@ -104,6 +109,12 @@ func TestToOpSpec_BucketsAccessed(t *testing.T) {
WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
WantWriteBuckets: &[]platform.BucketFilter{{Name: &bucketName, OrganizationID: orgID}},
},
{
Name: "from() with bucket and to with orgID and bucketID",
Raw: fmt.Sprintf(`from(bucket:"%s") |> to(bucketID:"%s", orgID:"%s")`, bucketName, bucketIDString, orgIDString),
WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
WantWriteBuckets: &[]platform.BucketFilter{{ID: bucketID, OrganizationID: orgID}},
},
}
for _, tc := range tests {