influxdb/patches/flux.patch

3015 lines
93 KiB
Diff

diff --git b/flux/stdlib/influxdata/influxdb/buckets.go a/flux/stdlib/influxdata/influxdb/buckets.go
index 4fd36f948d..9ecbe49234 100644
--- b/flux/stdlib/influxdata/influxdb/buckets.go
+++ a/flux/stdlib/influxdata/influxdb/buckets.go
@@ -2,6 +2,7 @@ package influxdb
import (
"context"
+ "errors"
"fmt"
"github.com/influxdata/flux"
@@ -11,8 +12,8 @@ import (
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/flux/values"
- platform "github.com/influxdata/influxdb"
- "github.com/influxdata/influxdb/query"
+ "github.com/influxdata/influxdb/services/meta"
+ "github.com/influxdata/influxql"
)
func init() {
@@ -20,10 +21,9 @@ func init() {
}
type BucketsDecoder struct {
- orgID platform.ID
- deps BucketDependencies
- buckets []*platform.Bucket
- alloc *memory.Allocator
+ deps StorageDependencies
+ alloc *memory.Allocator
+ user meta.User
}
func (bd *BucketsDecoder) Connect(ctx context.Context) error {
@@ -31,20 +31,12 @@ func (bd *BucketsDecoder) Connect(ctx context.Context) error {
}
func (bd *BucketsDecoder) Fetch(ctx context.Context) (bool, error) {
- b, count := bd.deps.FindAllBuckets(ctx, bd.orgID)
- if count <= 0 {
- return false, &flux.Error{
- Code: codes.NotFound,
- Msg: fmt.Sprintf("no buckets found in organization %v", bd.orgID),
- }
- }
- bd.buckets = b
return false, nil
}
func (bd *BucketsDecoder) Decode(ctx context.Context) (flux.Table, error) {
kb := execute.NewGroupKeyBuilder(nil)
- kb.AddKeyValue("organizationID", values.NewString(bd.buckets[0].OrgID.String()))
+ kb.AddKeyValue("organizationID", values.NewString(""))
gk, err := kb.Build()
if err != nil {
return nil, err
@@ -52,43 +44,55 @@ func (bd *BucketsDecoder) Decode(ctx context.Context) (flux.Table, error) {
b := execute.NewColListTableBuilder(gk, bd.alloc)
- if _, err := b.AddCol(flux.ColMeta{
+ // Column order must match the Append* column indexes used in the loop below.
+ _, _ = b.AddCol(flux.ColMeta{
Label: "name",
Type: flux.TString,
- }); err != nil {
- return nil, err
- }
- if _, err := b.AddCol(flux.ColMeta{
+ })
+ _, _ = b.AddCol(flux.ColMeta{
Label: "id",
Type: flux.TString,
- }); err != nil {
- return nil, err
- }
- if _, err := b.AddCol(flux.ColMeta{
+ })
+ _, _ = b.AddCol(flux.ColMeta{
+ Label: "organization",
+ Type: flux.TString,
+ })
+ _, _ = b.AddCol(flux.ColMeta{
Label: "organizationID",
Type: flux.TString,
- }); err != nil {
- return nil, err
- }
- if _, err := b.AddCol(flux.ColMeta{
+ })
+ _, _ = b.AddCol(flux.ColMeta{
Label: "retentionPolicy",
Type: flux.TString,
- }); err != nil {
- return nil, err
- }
- if _, err := b.AddCol(flux.ColMeta{
+ })
+ _, _ = b.AddCol(flux.ColMeta{
Label: "retentionPeriod",
Type: flux.TInt,
- }); err != nil {
- return nil, err
+ })
+
+ var hasAccess func(db string) bool
+ if bd.user == nil {
+ hasAccess = func(db string) bool {
+ return true
+ }
+ } else {
+ hasAccess = func(db string) bool {
+ return bd.deps.Authorizer.AuthorizeDatabase(bd.user, influxql.ReadPrivilege, db) == nil ||
+ bd.deps.Authorizer.AuthorizeDatabase(bd.user, influxql.WritePrivilege, db) == nil
+ }
}
- for _, bucket := range bd.buckets {
- _ = b.AppendString(0, bucket.Name)
- _ = b.AppendString(1, bucket.ID.String())
- _ = b.AppendString(2, bucket.OrgID.String())
- _ = b.AppendString(3, bucket.RetentionPolicyName)
- _ = b.AppendInt(4, bucket.RetentionPeriod.Nanoseconds())
+ for _, db := range bd.deps.MetaClient.Databases() {
+ if hasAccess(db.Name) {
+ for _, rp := range db.RetentionPolicies {
+ _ = b.AppendString(0, db.Name+"/"+rp.Name)
+ _ = b.AppendString(1, "")
+ _ = b.AppendString(2, "influxdb")
+ _ = b.AppendString(3, "")
+ _ = b.AppendString(4, rp.Name)
+ _ = b.AppendInt(5, rp.Duration.Nanoseconds())
+ }
+ }
}
return b.Table()
@@ -109,22 +113,22 @@ func createBucketsSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
// the dependencies used for FromKind are adequate for what we need here
// so there's no need to inject custom dependencies for buckets()
- deps := GetStorageDependencies(a.Context()).BucketDeps
- req := query.RequestFromContext(a.Context())
- if req == nil {
- return nil, &flux.Error{
- Code: codes.Internal,
- Msg: "missing request on context",
+ deps := GetStorageDependencies(a.Context())
+
+ var user meta.User
+ if deps.AuthEnabled {
+ user = meta.UserFromContext(a.Context())
+ if user == nil {
+ return nil, errors.New("createBucketsSource: no user")
}
}
- orgID := req.OrganizationID
- bd := &BucketsDecoder{orgID: orgID, deps: deps, alloc: a.Allocator()}
+ bd := &BucketsDecoder{deps: deps, alloc: a.Allocator(), user: user}
return execute.CreateSourceFromDecoder(bd, dsid, a)
}
-type AllBucketLookup interface {
- FindAllBuckets(ctx context.Context, orgID platform.ID) ([]*platform.Bucket, int)
+type MetaClient interface {
+ Databases() []meta.DatabaseInfo
+ Database(name string) *meta.DatabaseInfo
}
-type BucketDependencies AllBucketLookup
diff --git b/flux/stdlib/influxdata/influxdb/dependencies.go a/flux/stdlib/influxdata/influxdb/dependencies.go
index 3303c27589..ad9a36ab6d 100644
--- b/flux/stdlib/influxdata/influxdb/dependencies.go
+++ a/flux/stdlib/influxdata/influxdb/dependencies.go
@@ -2,13 +2,9 @@ package influxdb
import (
"context"
+ "errors"
"github.com/influxdata/flux"
- "github.com/influxdata/influxdb"
- "github.com/influxdata/influxdb/kit/prom"
- "github.com/influxdata/influxdb/query"
- "github.com/influxdata/influxdb/storage"
- "github.com/prometheus/client_golang/prometheus"
)
type key int
@@ -16,33 +12,31 @@ type key int
const dependenciesKey key = iota
type StorageDependencies struct {
- FromDeps FromDependencies
- BucketDeps BucketDependencies
- ToDeps ToDependencies
+ Reader Reader
+ MetaClient MetaClient
+ Authorizer Authorizer
+ AuthEnabled bool
}
func (d StorageDependencies) Inject(ctx context.Context) context.Context {
return context.WithValue(ctx, dependenciesKey, d)
}
-func GetStorageDependencies(ctx context.Context) StorageDependencies {
- return ctx.Value(dependenciesKey).(StorageDependencies)
-}
-
-// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
-func (d StorageDependencies) PrometheusCollectors() []prometheus.Collector {
- depS := []interface{}{
- d.FromDeps,
- d.BucketDeps,
- d.ToDeps,
+func (d StorageDependencies) Validate() error {
+ if d.Reader == nil {
+ return errors.New("missing reader dependency")
}
- collectors := make([]prometheus.Collector, 0, len(depS))
- for _, v := range depS {
- if pc, ok := v.(prom.PrometheusCollector); ok {
- collectors = append(collectors, pc.PrometheusCollectors()...)
- }
+ if d.MetaClient == nil {
+ return errors.New("missing meta client dependency")
}
- return collectors
+ if d.AuthEnabled && d.Authorizer == nil {
+ return errors.New("missing authorizer dependency")
+ }
+ return nil
+}
+
+func GetStorageDependencies(ctx context.Context) StorageDependencies {
+ return ctx.Value(dependenciesKey).(StorageDependencies)
}
type Dependencies struct {
@@ -55,45 +49,21 @@ func (d Dependencies) Inject(ctx context.Context) context.Context {
return d.StorageDeps.Inject(ctx)
}
-// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
-func (d Dependencies) PrometheusCollectors() []prometheus.Collector {
- collectors := d.StorageDeps.PrometheusCollectors()
- if pc, ok := d.FluxDeps.(prom.PrometheusCollector); ok {
- collectors = append(collectors, pc.PrometheusCollectors()...)
- }
- return collectors
-}
-
func NewDependencies(
+ mc MetaClient,
reader Reader,
- writer storage.PointsWriter,
- bucketSvc influxdb.BucketService,
- orgSvc influxdb.OrganizationService,
- ss influxdb.SecretService,
- metricLabelKeys []string,
+ auth Authorizer,
+ authEnabled bool,
) (Dependencies, error) {
fdeps := flux.NewDefaultDependencies()
- fdeps.Deps.SecretService = query.FromSecretService(ss)
deps := Dependencies{FluxDeps: fdeps}
- bucketLookupSvc := query.FromBucketService(bucketSvc)
- orgLookupSvc := query.FromOrganizationService(orgSvc)
- metrics := NewMetrics(metricLabelKeys)
- deps.StorageDeps.FromDeps = FromDependencies{
- Reader: reader,
- BucketLookup: bucketLookupSvc,
- OrganizationLookup: orgLookupSvc,
- Metrics: metrics,
- }
- if err := deps.StorageDeps.FromDeps.Validate(); err != nil {
- return Dependencies{}, err
- }
- deps.StorageDeps.BucketDeps = bucketLookupSvc
- deps.StorageDeps.ToDeps = ToDependencies{
- BucketLookup: bucketLookupSvc,
- OrganizationLookup: orgLookupSvc,
- PointsWriter: writer,
+ deps.StorageDeps = StorageDependencies{
+ Reader: reader,
+ MetaClient: mc,
+ Authorizer: auth,
+ AuthEnabled: authEnabled,
}
- if err := deps.StorageDeps.ToDeps.Validate(); err != nil {
+ if err := deps.StorageDeps.Validate(); err != nil {
return Dependencies{}, err
}
return deps, nil
diff --git b/flux/stdlib/influxdata/influxdb/from.go a/flux/stdlib/influxdata/influxdb/from.go
index cdd6789c19..6662f54fd4 100644
--- b/flux/stdlib/influxdata/influxdb/from.go
+++ a/flux/stdlib/influxdata/influxdb/from.go
@@ -8,7 +8,6 @@ import (
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
- platform "github.com/influxdata/influxdb"
)
const FromKind = "influxDBFrom"
@@ -71,31 +70,6 @@ func (s *FromOpSpec) Kind() flux.OperationKind {
return FromKind
}
-// BucketsAccessed makes FromOpSpec a query.BucketAwareOperationSpec
-func (s *FromOpSpec) BucketsAccessed(orgID *platform.ID) (readBuckets, writeBuckets []platform.BucketFilter) {
- bf := platform.BucketFilter{}
- if s.Bucket != "" {
- bf.Name = &s.Bucket
- }
- if orgID != nil {
- bf.OrganizationID = orgID
- }
-
- if len(s.BucketID) > 0 {
- if id, err := platform.IDFromString(s.BucketID); err != nil {
- invalidID := platform.InvalidID()
- bf.ID = &invalidID
- } else {
- bf.ID = id
- }
- }
-
- if bf.ID != nil || bf.Name != nil {
- readBuckets = append(readBuckets, bf)
- }
- return readBuckets, writeBuckets
-}
-
type FromProcedureSpec struct {
Bucket string
BucketID string
diff --git b/flux/stdlib/influxdata/influxdb/from_test.go a/flux/stdlib/influxdata/influxdb/from_test.go
index dac3b13eee..daba8c9362 100644
--- b/flux/stdlib/influxdata/influxdb/from_test.go
+++ a/flux/stdlib/influxdata/influxdb/from_test.go
@@ -1,192 +1 @@
package influxdb_test
-
-import (
- "fmt"
- "testing"
- "time"
-
- "github.com/influxdata/flux"
- "github.com/influxdata/flux/execute"
- "github.com/influxdata/flux/plan"
- "github.com/influxdata/flux/plan/plantest"
- "github.com/influxdata/flux/querytest"
- "github.com/influxdata/flux/stdlib/universe"
- platform "github.com/influxdata/influxdb"
- pquerytest "github.com/influxdata/influxdb/query/querytest"
- "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
-)
-
-func TestFrom_NewQuery(t *testing.T) {
- t.Skip()
- tests := []querytest.NewQueryTestCase{
- {
- Name: "from no args",
- Raw: `from()`,
- WantErr: true,
- },
- {
- Name: "from conflicting args",
- Raw: `from(bucket:"d", bucket:"b")`,
- WantErr: true,
- },
- {
- Name: "from repeat arg",
- Raw: `from(bucket:"telegraf", bucket:"oops")`,
- WantErr: true,
- },
- {
- Name: "from",
- Raw: `from(bucket:"telegraf", chicken:"what is this?")`,
- WantErr: true,
- },
- {
- Name: "from bucket invalid ID",
- Raw: `from(bucketID:"invalid")`,
- WantErr: true,
- },
- {
- Name: "from bucket ID",
- Raw: `from(bucketID:"aaaabbbbccccdddd")`,
- Want: &flux.Spec{
- Operations: []*flux.Operation{
- {
- ID: "from0",
- Spec: &influxdb.FromOpSpec{
- BucketID: "aaaabbbbccccdddd",
- },
- },
- },
- },
- },
- {
- Name: "from with database",
- Raw: `from(bucket:"mybucket") |> range(start:-4h, stop:-2h) |> sum()`,
- Want: &flux.Spec{
- Operations: []*flux.Operation{
- {
- ID: "from0",
- Spec: &influxdb.FromOpSpec{
- Bucket: "mybucket",
- },
- },
- {
- ID: "range1",
- Spec: &universe.RangeOpSpec{
- Start: flux.Time{
- Relative: -4 * time.Hour,
- IsRelative: true,
- },
- Stop: flux.Time{
- Relative: -2 * time.Hour,
- IsRelative: true,
- },
- TimeColumn: "_time",
- StartColumn: "_start",
- StopColumn: "_stop",
- },
- },
- {
- ID: "sum2",
- Spec: &universe.SumOpSpec{
- AggregateConfig: execute.DefaultAggregateConfig,
- },
- },
- },
- Edges: []flux.Edge{
- {Parent: "from0", Child: "range1"},
- {Parent: "range1", Child: "sum2"},
- },
- },
- },
- }
- for _, tc := range tests {
- tc := tc
- t.Run(tc.Name, func(t *testing.T) {
- t.Parallel()
- querytest.NewQueryTestHelper(t, tc)
- })
- }
-}
-
-func TestFromOperation_Marshaling(t *testing.T) {
- t.Skip()
- data := []byte(`{"id":"from","kind":"from","spec":{"bucket":"mybucket"}}`)
- op := &flux.Operation{
- ID: "from",
- Spec: &influxdb.FromOpSpec{
- Bucket: "mybucket",
- },
- }
- querytest.OperationMarshalingTestHelper(t, data, op)
-}
-
-func TestFromOpSpec_BucketsAccessed(t *testing.T) {
- bucketName := "my_bucket"
- bucketIDString := "aaaabbbbccccdddd"
- bucketID, err := platform.IDFromString(bucketIDString)
- if err != nil {
- t.Fatal(err)
- }
- invalidID := platform.InvalidID()
- tests := []pquerytest.BucketsAccessedTestCase{
- {
- Name: "From with bucket",
- Raw: fmt.Sprintf(`from(bucket:"%s")`, bucketName),
- WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
- WantWriteBuckets: &[]platform.BucketFilter{},
- },
- {
- Name: "From with bucketID",
- Raw: fmt.Sprintf(`from(bucketID:"%s")`, bucketID),
- WantReadBuckets: &[]platform.BucketFilter{{ID: bucketID}},
- WantWriteBuckets: &[]platform.BucketFilter{},
- },
- {
- Name: "From invalid bucketID",
- Raw: `from(bucketID:"invalid")`,
- WantReadBuckets: &[]platform.BucketFilter{{ID: &invalidID}},
- WantWriteBuckets: &[]platform.BucketFilter{},
- },
- }
- for _, tc := range tests {
- tc := tc
- t.Run(tc.Name, func(t *testing.T) {
- t.Parallel()
- pquerytest.BucketsAccessedTestHelper(t, tc)
- })
- }
-}
-
-func TestFromValidation(t *testing.T) {
- spec := plantest.PlanSpec{
- // from |> group (cannot query an infinite time range)
- Nodes: []plan.Node{
- plan.CreateLogicalNode("from", &influxdb.FromProcedureSpec{
- Bucket: "my-bucket",
- }),
- plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
- GroupMode: flux.GroupModeBy,
- GroupKeys: []string{"_measurement", "_field"},
- }),
- },
- Edges: [][2]int{
- {0, 1},
- },
- }
-
- ps := plantest.CreatePlanSpec(&spec)
- pp := plan.NewPhysicalPlanner(plan.OnlyPhysicalRules(
- influxdb.PushDownRangeRule{},
- influxdb.PushDownFilterRule{},
- influxdb.PushDownGroupRule{},
- ))
- _, err := pp.Plan(ps)
- if err == nil {
- t.Error("Expected query with no call to range to fail physical planning")
- }
- want := `cannot submit unbounded read to "my-bucket"; try bounding 'from' with a call to 'range'`
- got := err.Error()
- if want != got {
- t.Errorf("unexpected error; -want/+got\n- %s\n+ %s", want, got)
- }
-}
diff --git b/flux/stdlib/influxdata/influxdb/metrics.go a/flux/stdlib/influxdata/influxdb/metrics.go
index 82577e3a57..dd3cee868b 100644
--- b/flux/stdlib/influxdata/influxdb/metrics.go
+++ a/flux/stdlib/influxdata/influxdb/metrics.go
@@ -1,83 +1,3 @@
package influxdb
-import (
- "context"
- "fmt"
- "time"
-
- platform "github.com/influxdata/influxdb"
- "github.com/prometheus/client_golang/prometheus"
-)
-
-const (
- orgLabel = "org"
- opLabel = "op"
-)
-
-type metrics struct {
- ctxLabelKeys []string
- requestDur *prometheus.HistogramVec
-}
-
-// NewMetrics produces a new metrics objects for an influxdb source.
-// Currently it just collects the duration of read requests into a histogram.
-// ctxLabelKeys is a list of labels to add to the produced metrics.
-// The value for a given key will be read off the context.
-// The context value must be a string or an implementation of the Stringer interface.
-// In addition, produced metrics will be labeled with the orgID and type of operation requested.
-func NewMetrics(ctxLabelKeys []string) *metrics {
- labelKeys := make([]string, len(ctxLabelKeys)+2)
- copy(labelKeys, ctxLabelKeys)
- labelKeys[len(labelKeys)-2] = orgLabel
- labelKeys[len(labelKeys)-1] = opLabel
-
- m := new(metrics)
- m.requestDur = prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Namespace: "query",
- Subsystem: "influxdb_source",
- Name: "read_request_duration_seconds",
- Help: "Histogram of times spent in read requests",
- Buckets: prometheus.ExponentialBuckets(1e-3, 5, 7),
- }, labelKeys)
- m.ctxLabelKeys = ctxLabelKeys
- return m
-}
-
-// PrometheusCollectors satisfies the PrometheusCollector interface.
-func (m *metrics) PrometheusCollectors() []prometheus.Collector {
- if m == nil {
- // if metrics happens to be nil here (such as for a test), then let's not panic.
- return nil
- }
- return []prometheus.Collector{
- m.requestDur,
- }
-}
-
-func (m *metrics) getLabelValues(ctx context.Context, orgID platform.ID, op string) []string {
- if m == nil {
- return nil
- }
- labelValues := make([]string, len(m.ctxLabelKeys)+2)
- for i, k := range m.ctxLabelKeys {
- value := ctx.Value(k)
- var str string
- switch v := value.(type) {
- case string:
- str = v
- case fmt.Stringer:
- str = v.String()
- }
- labelValues[i] = str
- }
- labelValues[len(labelValues)-2] = orgID.String()
- labelValues[len(labelValues)-1] = op
- return labelValues
-}
-
-func (m *metrics) recordMetrics(labelValues []string, start time.Time) {
- if m == nil {
- return
- }
- m.requestDur.WithLabelValues(labelValues...).Observe(time.Since(start).Seconds())
-}
+// Storage metrics are not implemented in the 1.x flux engine.
diff --git b/flux/stdlib/influxdata/influxdb/operators.go a/flux/stdlib/influxdata/influxdb/operators.go
index 4203b074be..b0db495917 100644
--- b/flux/stdlib/influxdata/influxdb/operators.go
+++ a/flux/stdlib/influxdata/influxdb/operators.go
@@ -2,14 +2,16 @@ package influxdb
import (
"context"
- "fmt"
+ "errors"
+ "strings"
"github.com/influxdata/flux"
- "github.com/influxdata/flux/codes"
+ "github.com/influxdata/flux/execute"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
- "github.com/influxdata/influxdb"
+ "github.com/influxdata/influxdb/services/meta"
+ "github.com/influxdata/influxql"
)
const (
@@ -79,34 +81,43 @@ func (s *ReadRangePhysSpec) Copy() plan.ProcedureSpec {
return ns
}
-func (s *ReadRangePhysSpec) LookupBucketID(ctx context.Context, orgID influxdb.ID, buckets BucketLookup) (influxdb.ID, error) {
- // Determine bucketID
- switch {
- case s.Bucket != "":
- b, ok := buckets.Lookup(ctx, orgID, s.Bucket)
- if !ok {
- return 0, &flux.Error{
- Code: codes.NotFound,
- Msg: fmt.Sprintf("could not find bucket %q", s.Bucket),
- }
- }
- return b, nil
- case len(s.BucketID) != 0:
- var b influxdb.ID
- if err := b.DecodeFromString(s.BucketID); err != nil {
- return 0, &flux.Error{
- Code: codes.Invalid,
- Msg: "invalid bucket id",
- Err: err,
- }
+func (s *ReadRangePhysSpec) LookupDatabase(ctx context.Context, deps StorageDependencies, a execute.Administration) (string, string, error) {
+ if len(s.BucketID) != 0 {
+ return "", "", errors.New("cannot refer to buckets by their id in 1.x")
+ }
+
+ var db, rp string
+ if i := strings.IndexByte(s.Bucket, '/'); i == -1 {
+ db = s.Bucket
+ } else {
+ rp = s.Bucket[i+1:]
+ db = s.Bucket[:i]
+ }
+
+ // validate and resolve db/rp
+ di := deps.MetaClient.Database(db)
+ if di == nil {
+ return "", "", errors.New("no database")
+ }
+
+ if deps.AuthEnabled {
+ user := meta.UserFromContext(a.Context())
+ if user == nil {
+ return "", "", errors.New("createFromSource: no user")
}
- return b, nil
- default:
- return 0, &flux.Error{
- Code: codes.Invalid,
- Msg: "no bucket name or id have been specified",
+ if err := deps.Authorizer.AuthorizeDatabase(user, influxql.ReadPrivilege, db); err != nil {
+ return "", "", err
}
}
+
+ if rp == "" {
+ rp = di.DefaultRetentionPolicy
+ }
+
+ if rpi := di.RetentionPolicy(rp); rpi == nil {
+ return "", "", errors.New("invalid retention policy")
+ }
+ return db, rp, nil
}
// TimeBounds implements plan.BoundsAwareProcedureSpec.
diff --git b/flux/stdlib/influxdata/influxdb/rules.go a/flux/stdlib/influxdata/influxdb/rules.go
index 4102a2a73e..8ce3a20e04 100644
--- b/flux/stdlib/influxdata/influxdb/rules.go
+++ a/flux/stdlib/influxdata/influxdb/rules.go
@@ -198,6 +198,12 @@ func (rule PushDownReadTagKeysRule) Rewrite(pn plan.Node) (plan.Node, bool, erro
// constructing our own replacement. We do not care about it
// at the moment though which is why it is not in the pattern.
+ // The tag keys mechanism doesn't know about fields so we cannot
+ // push down _field comparisons in 1.x.
+ if hasFieldExpr(fromSpec.Filter) {
+ return pn, false, nil
+ }
+
// The schema mutator needs to correspond to a keep call
// on the column specified by the keys procedure.
if len(keepSpec.Mutations) != 1 {
@@ -227,6 +233,20 @@ func (rule PushDownReadTagKeysRule) Rewrite(pn plan.Node) (plan.Node, bool, erro
}), true, nil
}
+func hasFieldExpr(expr semantic.Expression) bool {
+ hasField := false
+ v := semantic.CreateVisitor(func(node semantic.Node) {
+ switch n := node.(type) {
+ case *semantic.MemberExpression:
+ if n.Property == "_field" {
+ hasField = true
+ }
+ }
+ })
+ semantic.Walk(v, expr)
+ return hasField
+}
+
// PushDownReadTagValuesRule matches 'ReadRange |> keep(columns: [tag]) |> group() |> distinct(column: tag)'.
// The 'from()' must have already been merged with 'range' and, optionally,
// may have been merged with 'filter'.
@@ -304,6 +324,9 @@ var invalidTagKeysForTagValues = []string{
execute.DefaultValueColLabel,
execute.DefaultStartColLabel,
execute.DefaultStopColLabel,
+ // TODO(jsternberg): There just doesn't seem to be a good way to do this
+ // in the 1.x line of the release.
+ "_field",
}
// isValidTagKeyForTagValues returns true if the given key can
diff --git b/flux/stdlib/influxdata/influxdb/rules_test.go a/flux/stdlib/influxdata/influxdb/rules_test.go
index b4f7b79efe..e27ecabf24 100644
--- b/flux/stdlib/influxdata/influxdb/rules_test.go
+++ a/flux/stdlib/influxdata/influxdb/rules_test.go
@@ -12,7 +12,7 @@ import (
"github.com/influxdata/flux/plan/plantest"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/universe"
- "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
+ "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
)
func fluxTime(t int64) flux.Time {
diff --git b/flux/stdlib/influxdata/influxdb/source.go a/flux/stdlib/influxdata/influxdb/source.go
index 0ace5b7dc5..3e0d5b6542 100644
--- b/flux/stdlib/influxdata/influxdb/source.go
+++ a/flux/stdlib/influxdata/influxdb/source.go
@@ -3,7 +3,6 @@ package influxdb
import (
"context"
"errors"
- "time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
@@ -11,9 +10,6 @@ import (
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
- platform "github.com/influxdata/influxdb"
- "github.com/influxdata/influxdb/kit/tracing"
- "github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb/cursors"
)
@@ -36,24 +32,10 @@ type Source struct {
stats cursors.CursorStats
runner runner
-
- m *metrics
- orgID platform.ID
- op string
}
func (s *Source) Run(ctx context.Context) {
- labelValues := s.m.getLabelValues(ctx, s.orgID, s.op)
- start := time.Now()
- var err error
- if flux.IsExperimentalTracingEnabled() {
- span, ctxWithSpan := tracing.StartSpanFromContextWithOperationName(ctx, "source-"+s.op)
- err = s.runner.run(ctxWithSpan)
- span.Finish()
- } else {
- err = s.runner.run(ctx)
- }
- s.m.recordMetrics(labelValues, start)
+ err := s.runner.run(ctx)
for _, t := range s.ts {
t.Finish(s.id, err)
}
@@ -130,10 +112,6 @@ func ReadFilterSource(id execute.DatasetID, r Reader, readSpec ReadFilterSpec, a
src.reader = r
src.readSpec = readSpec
- src.m = GetStorageDependencies(a.Context()).FromDeps.Metrics
- src.orgID = readSpec.OrganizationID
- src.op = "readFilter"
-
src.runner = src
return src
}
@@ -152,8 +130,7 @@ func (s *readFilterSource) run(ctx context.Context) error {
}
func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error) {
- span, ctx := tracing.StartSpanFromContext(a.Context())
- defer span.Finish()
+ ctx := a.Context()
spec := s.(*ReadRangePhysSpec)
@@ -165,18 +142,9 @@ func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execut
}
}
- deps := GetStorageDependencies(a.Context()).FromDeps
-
- req := query.RequestFromContext(a.Context())
- if req == nil {
- return nil, &flux.Error{
- Code: codes.Internal,
- Msg: "missing request on context",
- }
- }
+ deps := GetStorageDependencies(a.Context())
- orgID := req.OrganizationID
- bucketID, err := spec.LookupBucketID(ctx, orgID, deps.BucketLookup)
+ db, rp, err := spec.LookupDatabase(ctx, deps, a)
if err != nil {
return nil, err
}
@@ -189,10 +157,10 @@ func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execut
id,
deps.Reader,
ReadFilterSpec{
- OrganizationID: orgID,
- BucketID: bucketID,
- Bounds: *bounds,
- Predicate: filter,
+ Database: db,
+ RetentionPolicy: rp,
+ Bounds: *bounds,
+ Predicate: filter,
},
a,
), nil
@@ -213,10 +181,6 @@ func ReadGroupSource(id execute.DatasetID, r Reader, readSpec ReadGroupSpec, a e
src.reader = r
src.readSpec = readSpec
- src.m = GetStorageDependencies(a.Context()).FromDeps.Metrics
- src.orgID = readSpec.OrganizationID
- src.op = "readGroup"
-
src.runner = src
return src
}
@@ -235,8 +199,7 @@ func (s *readGroupSource) run(ctx context.Context) error {
}
func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error) {
- span, ctx := tracing.StartSpanFromContext(a.Context())
- defer span.Finish()
+ ctx := a.Context()
spec := s.(*ReadGroupPhysSpec)
@@ -245,15 +208,9 @@ func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute
return nil, errors.New("nil bounds passed to from")
}
- deps := GetStorageDependencies(a.Context()).FromDeps
-
- req := query.RequestFromContext(a.Context())
- if req == nil {
- return nil, errors.New("missing request on context")
- }
+ deps := GetStorageDependencies(a.Context())
- orgID := req.OrganizationID
- bucketID, err := spec.LookupBucketID(ctx, orgID, deps.BucketLookup)
+ db, rp, err := spec.LookupDatabase(ctx, deps, a)
if err != nil {
return nil, err
}
@@ -267,10 +224,10 @@ func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute
deps.Reader,
ReadGroupSpec{
ReadFilterSpec: ReadFilterSpec{
- OrganizationID: orgID,
- BucketID: bucketID,
- Bounds: *bounds,
- Predicate: filter,
+ Database: db,
+ RetentionPolicy: rp,
+ Bounds: *bounds,
+ Predicate: filter,
},
GroupMode: ToGroupMode(spec.GroupMode),
GroupKeys: spec.GroupKeys,
@@ -281,18 +238,12 @@ func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute
}
func createReadTagKeysSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
- span, ctx := tracing.StartSpanFromContext(a.Context())
- defer span.Finish()
+ ctx := a.Context()
spec := prSpec.(*ReadTagKeysPhysSpec)
- deps := GetStorageDependencies(a.Context()).FromDeps
- req := query.RequestFromContext(a.Context())
- if req == nil {
- return nil, errors.New("missing request on context")
- }
- orgID := req.OrganizationID
+ deps := GetStorageDependencies(a.Context())
- bucketID, err := spec.LookupBucketID(ctx, orgID, deps.BucketLookup)
+ db, rp, err := spec.LookupDatabase(ctx, deps, a)
if err != nil {
return nil, err
}
@@ -308,10 +259,10 @@ func createReadTagKeysSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID,
deps.Reader,
ReadTagKeysSpec{
ReadFilterSpec: ReadFilterSpec{
- OrganizationID: orgID,
- BucketID: bucketID,
- Bounds: *bounds,
- Predicate: filter,
+ Database: db,
+ RetentionPolicy: rp,
+ Bounds: *bounds,
+ Predicate: filter,
},
},
a,
@@ -333,10 +284,6 @@ func ReadTagKeysSource(id execute.DatasetID, r Reader, readSpec ReadTagKeysSpec,
src.id = id
src.alloc = a.Allocator()
- src.m = GetStorageDependencies(a.Context()).FromDeps.Metrics
- src.orgID = readSpec.OrganizationID
- src.op = "readTagKeys"
-
src.runner = src
return src
}
@@ -350,18 +297,12 @@ func (s *readTagKeysSource) run(ctx context.Context) error {
}
func createReadTagValuesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
- span, ctx := tracing.StartSpanFromContext(a.Context())
- defer span.Finish()
+ ctx := a.Context()
spec := prSpec.(*ReadTagValuesPhysSpec)
- deps := GetStorageDependencies(a.Context()).FromDeps
- req := query.RequestFromContext(a.Context())
- if req == nil {
- return nil, errors.New("missing request on context")
- }
- orgID := req.OrganizationID
+ deps := GetStorageDependencies(a.Context())
- bucketID, err := spec.LookupBucketID(ctx, orgID, deps.BucketLookup)
+ db, rp, err := spec.LookupDatabase(ctx, deps, a)
if err != nil {
return nil, err
}
@@ -377,10 +318,10 @@ func createReadTagValuesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID
deps.Reader,
ReadTagValuesSpec{
ReadFilterSpec: ReadFilterSpec{
- OrganizationID: orgID,
- BucketID: bucketID,
- Bounds: *bounds,
- Predicate: filter,
+ Database: db,
+ RetentionPolicy: rp,
+ Bounds: *bounds,
+ Predicate: filter,
},
TagKey: spec.TagKey,
},
@@ -403,10 +344,6 @@ func ReadTagValuesSource(id execute.DatasetID, r Reader, readSpec ReadTagValuesS
src.id = id
src.alloc = a.Allocator()
- src.m = GetStorageDependencies(a.Context()).FromDeps.Metrics
- src.orgID = readSpec.OrganizationID
- src.op = "readTagValues"
-
src.runner = src
return src
}
diff --git b/flux/stdlib/influxdata/influxdb/source_test.go a/flux/stdlib/influxdata/influxdb/source_test.go
index 1cdda0f935..daba8c9362 100644
--- b/flux/stdlib/influxdata/influxdb/source_test.go
+++ a/flux/stdlib/influxdata/influxdb/source_test.go
@@ -1,131 +1 @@
package influxdb_test
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/influxdata/flux"
- "github.com/influxdata/flux/dependencies/dependenciestest"
- "github.com/influxdata/flux/execute"
- "github.com/influxdata/flux/memory"
- platform "github.com/influxdata/influxdb"
- "github.com/influxdata/influxdb/kit/prom/promtest"
- "github.com/influxdata/influxdb/mock"
- "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
- "github.com/influxdata/influxdb/tsdb/cursors"
- "github.com/influxdata/influxdb/uuid"
- "github.com/prometheus/client_golang/prometheus"
-)
-
-type mockTableIterator struct {
-}
-
-func (mockTableIterator) Do(f func(flux.Table) error) error {
- return nil
-}
-
-func (mockTableIterator) Statistics() cursors.CursorStats {
- return cursors.CursorStats{}
-}
-
-type mockReader struct {
-}
-
-func (mockReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
- return &mockTableIterator{}, nil
-}
-
-func (mockReader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
- return &mockTableIterator{}, nil
-}
-
-func (mockReader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
- return &mockTableIterator{}, nil
-}
-
-func (mockReader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
- return &mockTableIterator{}, nil
-}
-
-func (mockReader) Close() {
-}
-
-type mockAdministration struct {
- Ctx context.Context
-}
-
-func (a mockAdministration) Context() context.Context {
- return a.Ctx
-}
-
-func (mockAdministration) ResolveTime(qt flux.Time) execute.Time {
- return 0
-}
-
-func (mockAdministration) StreamContext() execute.StreamContext {
- return nil
-}
-
-func (mockAdministration) Allocator() *memory.Allocator {
- return &memory.Allocator{}
-}
-
-func (mockAdministration) Parents() []execute.DatasetID {
- return nil
-}
-
-const (
- labelKey = "key1"
- labelValue = "value1"
-)
-
-// TestMetrics ensures that the metrics collected by an influxdb source are recorded.
-func TestMetrics(t *testing.T) {
- reg := prometheus.NewRegistry()
-
- orgID, err := platform.IDFromString("deadbeefbeefdead")
- if err != nil {
- t.Fatal(err)
- }
-
- deps := influxdb.Dependencies{
- FluxDeps: dependenciestest.Default(),
- StorageDeps: influxdb.StorageDependencies{
- FromDeps: influxdb.FromDependencies{
- Reader: &mockReader{},
- BucketLookup: mock.BucketLookup{},
- OrganizationLookup: mock.OrganizationLookup{},
- Metrics: influxdb.NewMetrics([]string{labelKey}),
- },
- },
- }
- reg.MustRegister(deps.PrometheusCollectors()...)
-
- // This key/value pair added to the context will appear as a label in the prometheus histogram.
- ctx := context.WithValue(context.Background(), labelKey, labelValue) //lint:ignore SA1029 this is a temporary ignore until we have time to create an appropriate type
- // Injecting deps
- ctx = deps.Inject(ctx)
- a := &mockAdministration{Ctx: ctx}
- rfs := influxdb.ReadFilterSource(
- execute.DatasetID(uuid.FromTime(time.Now())),
- &mockReader{},
- influxdb.ReadFilterSpec{
- OrganizationID: *orgID,
- },
- a,
- )
- rfs.Run(ctx)
-
- // Verify that we sampled the execution of the source by checking the prom registry.
- mfs := promtest.MustGather(t, reg)
- expectedLabels := map[string]string{
- "org": "deadbeefbeefdead",
- "key1": "value1",
- "op": "readFilter",
- }
- m := promtest.MustFindMetric(t, mfs, "query_influxdb_source_read_request_duration_seconds", expectedLabels)
- if want, got := uint64(1), *(m.Histogram.SampleCount); want != got {
- t.Fatalf("expected sample count of %v, got %v", want, got)
- }
-}
diff --git b/flux/stdlib/influxdata/influxdb/storage.go a/flux/stdlib/influxdata/influxdb/storage.go
index 4f574fb20e..7278949076 100644
--- b/flux/stdlib/influxdata/influxdb/storage.go
+++ a/flux/stdlib/influxdata/influxdb/storage.go
@@ -8,75 +8,33 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/semantic"
- platform "github.com/influxdata/influxdb"
- "github.com/influxdata/influxdb/kit/prom"
+ "github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb/cursors"
+ "github.com/influxdata/influxql"
"github.com/pkg/errors"
- "github.com/prometheus/client_golang/prometheus"
)
-type HostLookup interface {
- Hosts() []string
- Watch() <-chan struct{}
-}
-
-type BucketLookup interface {
- Lookup(ctx context.Context, orgID platform.ID, name string) (platform.ID, bool)
- LookupName(ctx context.Context, orgID platform.ID, id platform.ID) string
-}
-
-type OrganizationLookup interface {
- Lookup(ctx context.Context, name string) (platform.ID, bool)
- LookupName(ctx context.Context, id platform.ID) string
+type Authorizer interface {
+ AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error
}
type FromDependencies struct {
- Reader Reader
- BucketLookup BucketLookup
- OrganizationLookup OrganizationLookup
- Metrics *metrics
+ Reader Reader
+ MetaClient MetaClient
+ Authorizer Authorizer
+ AuthEnabled bool
}
func (d FromDependencies) Validate() error {
if d.Reader == nil {
return errors.New("missing reader dependency")
}
- if d.BucketLookup == nil {
- return errors.New("missing bucket lookup dependency")
- }
- if d.OrganizationLookup == nil {
- return errors.New("missing organization lookup dependency")
- }
- return nil
-}
-
-// PrometheusCollectors satisfies the PrometheusCollector interface.
-func (d FromDependencies) PrometheusCollectors() []prometheus.Collector {
- collectors := make([]prometheus.Collector, 0)
- if pc, ok := d.Reader.(prom.PrometheusCollector); ok {
- collectors = append(collectors, pc.PrometheusCollectors()...)
+ if d.MetaClient == nil {
+ return errors.New("missing meta client dependency")
}
- if d.Metrics != nil {
- collectors = append(collectors, d.Metrics.PrometheusCollectors()...)
+ if d.AuthEnabled && d.Authorizer == nil {
+ return errors.New("missing authorizer dependency")
}
- return collectors
-}
-
-type StaticLookup struct {
- hosts []string
-}
-
-func NewStaticLookup(hosts []string) StaticLookup {
- return StaticLookup{
- hosts: hosts,
- }
-}
-
-func (l StaticLookup) Hosts() []string {
- return l.hosts
-}
-func (l StaticLookup) Watch() <-chan struct{} {
- // A nil channel always blocks, since hosts never change this is appropriate.
return nil
}
@@ -102,8 +60,8 @@ func ToGroupMode(fluxMode flux.GroupMode) GroupMode {
}
type ReadFilterSpec struct {
- OrganizationID platform.ID
- BucketID platform.ID
+ Database string
+ RetentionPolicy string
Bounds execute.Bounds
diff --git b/flux/stdlib/influxdata/influxdb/to.go a/flux/stdlib/influxdata/influxdb/to.go
index a70161bf32..6fecaa533a 100644
--- b/flux/stdlib/influxdata/influxdb/to.go
+++ a/flux/stdlib/influxdata/influxdb/to.go
@@ -1,702 +1,4 @@
package influxdb
-import (
- "context"
- "errors"
- "fmt"
- "sort"
- "time"
-
- "github.com/influxdata/flux"
- "github.com/influxdata/flux/codes"
- "github.com/influxdata/flux/compiler"
- "github.com/influxdata/flux/execute"
- "github.com/influxdata/flux/interpreter"
- "github.com/influxdata/flux/plan"
- "github.com/influxdata/flux/semantic"
- "github.com/influxdata/flux/stdlib/influxdata/influxdb"
- "github.com/influxdata/flux/stdlib/kafka"
- "github.com/influxdata/flux/values"
- platform "github.com/influxdata/influxdb"
- "github.com/influxdata/influxdb/kit/tracing"
- "github.com/influxdata/influxdb/models"
- "github.com/influxdata/influxdb/query"
- "github.com/influxdata/influxdb/storage"
- "github.com/influxdata/influxdb/tsdb"
-)
-
-// ToKind is the kind for the `to` flux function
-const ToKind = influxdb.ToKind
-
-// TODO(jlapacik) remove this once we have execute.DefaultFieldColLabel
-const defaultFieldColLabel = "_field"
-const DefaultMeasurementColLabel = "_measurement"
-const DefaultBufferSize = 1 << 14
-
-// ToOpSpec is the flux.OperationSpec for the `to` flux function.
-type ToOpSpec struct {
- Bucket string `json:"bucket"`
- BucketID string `json:"bucketID"`
- Org string `json:"org"`
- OrgID string `json:"orgID"`
- Host string `json:"host"`
- Token string `json:"token"`
- TimeColumn string `json:"timeColumn"`
- MeasurementColumn string `json:"measurementColumn"`
- TagColumns []string `json:"tagColumns"`
- FieldFn interpreter.ResolvedFunction `json:"fieldFn"`
-}
-
-func init() {
- toSignature := flux.FunctionSignature(
- map[string]semantic.PolyType{
- "bucket": semantic.String,
- "bucketID": semantic.String,
- "org": semantic.String,
- "orgID": semantic.String,
- "host": semantic.String,
- "token": semantic.String,
- "timeColumn": semantic.String,
- "measurementColumn": semantic.String,
- "tagColumns": semantic.Array,
- "fieldFn": semantic.NewFunctionPolyType(semantic.FunctionPolySignature{
- Parameters: map[string]semantic.PolyType{
- "r": semantic.Tvar(1),
- },
- Required: semantic.LabelSet{"r"},
- Return: semantic.Tvar(2),
- }),
- },
- []string{},
- )
-
- flux.ReplacePackageValue("influxdata/influxdb", "to", flux.FunctionValueWithSideEffect(ToKind, createToOpSpec, toSignature))
- flux.RegisterOpSpec(ToKind, func() flux.OperationSpec { return &ToOpSpec{} })
- plan.RegisterProcedureSpecWithSideEffect(ToKind, newToProcedure, ToKind)
- execute.RegisterTransformation(ToKind, createToTransformation)
-}
-
-// argsReader is an interface for OperationSpec that have the same method to read args.
-type argsReader interface {
- flux.OperationSpec
- ReadArgs(args flux.Arguments) error
-}
-
-// ReadArgs reads the args from flux.Arguments into the op spec
-func (o *ToOpSpec) ReadArgs(args flux.Arguments) error {
- var err error
- var ok bool
-
- if o.Bucket, ok, _ = args.GetString("bucket"); !ok {
- if o.BucketID, err = args.GetRequiredString("bucketID"); err != nil {
- return err
- }
- } else if o.BucketID, ok, _ = args.GetString("bucketID"); ok {
- return &flux.Error{
- Code: codes.Invalid,
- Msg: "cannot provide both `bucket` and `bucketID` parameters to the `to` function",
- }
- }
-
- if o.Org, ok, _ = args.GetString("org"); !ok {
- if o.OrgID, _, err = args.GetString("orgID"); err != nil {
- return err
- }
- } else if o.OrgID, ok, _ = args.GetString("orgID"); ok {
- return &flux.Error{
- Code: codes.Invalid,
- Msg: "cannot provide both `org` and `orgID` parameters to the `to` function",
- }
- }
-
- if o.Host, ok, _ = args.GetString("host"); ok {
- if o.Token, err = args.GetRequiredString("token"); err != nil {
- return err
- }
- }
-
- if o.TimeColumn, ok, _ = args.GetString("timeColumn"); !ok {
- o.TimeColumn = execute.DefaultTimeColLabel
- }
-
- if o.MeasurementColumn, ok, _ = args.GetString("measurementColumn"); !ok {
- o.MeasurementColumn = DefaultMeasurementColLabel
- }
-
- if tags, ok, _ := args.GetArray("tagColumns", semantic.String); ok {
- o.TagColumns = make([]string, tags.Len())
- tags.Sort(func(i, j values.Value) bool {
- return i.Str() < j.Str()
- })
- tags.Range(func(i int, v values.Value) {
- o.TagColumns[i] = v.Str()
- })
- }
-
- if fieldFn, ok, _ := args.GetFunction("fieldFn"); ok {
- if o.FieldFn, err = interpreter.ResolveFunction(fieldFn); err != nil {
- return err
- }
- }
-
- return err
-}
-
-func createToOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
- if err := a.AddParentFromArgs(args); err != nil {
- return nil, err
- }
-
- _, httpOK, err := args.GetString("url")
- if err != nil {
- return nil, err
- }
-
- _, kafkaOK, err := args.GetString("brokers")
- if err != nil {
- return nil, err
- }
-
- var s argsReader
-
- switch {
- case httpOK && kafkaOK:
- return nil, &flux.Error{
- Code: codes.Invalid,
- Msg: "specify at most one of url, brokers in the same `to` function",
- }
- case kafkaOK:
- s = &kafka.ToKafkaOpSpec{}
- default:
- s = &ToOpSpec{}
- }
- if err := s.ReadArgs(args); err != nil {
- return nil, err
- }
- return s, nil
-}
-
-// Kind returns the kind for the ToOpSpec function.
-func (ToOpSpec) Kind() flux.OperationKind {
- return ToKind
-}
-
-// BucketsAccessed returns the buckets accessed by the spec.
-func (o *ToOpSpec) BucketsAccessed(orgID *platform.ID) (readBuckets, writeBuckets []platform.BucketFilter) {
- bf := platform.BucketFilter{}
- if o.Bucket != "" {
- bf.Name = &o.Bucket
- }
- if o.BucketID != "" {
- id, err := platform.IDFromString(o.BucketID)
- if err == nil {
- bf.ID = id
- }
- }
- if o.Org != "" {
- bf.Org = &o.Org
- }
- if o.OrgID != "" {
- id, err := platform.IDFromString(o.OrgID)
- if err == nil {
- bf.OrganizationID = id
- }
- }
- writeBuckets = append(writeBuckets, bf)
- return readBuckets, writeBuckets
-}
-
-// ToProcedureSpec is the procedure spec for the `to` flux function.
-type ToProcedureSpec struct {
- plan.DefaultCost
- Spec *ToOpSpec
-}
-
-// Kind returns the kind for the procedure spec for the `to` flux function.
-func (o *ToProcedureSpec) Kind() plan.ProcedureKind {
- return ToKind
-}
-
-// Copy clones the procedure spec for `to` flux function.
-func (o *ToProcedureSpec) Copy() plan.ProcedureSpec {
- s := o.Spec
- res := &ToProcedureSpec{
- Spec: &ToOpSpec{
- Bucket: s.Bucket,
- BucketID: s.BucketID,
- Org: s.Org,
- OrgID: s.OrgID,
- Host: s.Host,
- Token: s.Token,
- TimeColumn: s.TimeColumn,
- MeasurementColumn: s.MeasurementColumn,
- TagColumns: append([]string(nil), s.TagColumns...),
- FieldFn: s.FieldFn.Copy(),
- },
- }
- return res
-}
-
-func newToProcedure(qs flux.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
- spec, ok := qs.(*ToOpSpec)
- if !ok && spec != nil {
- return nil, &flux.Error{
- Code: codes.Internal,
- Msg: fmt.Sprintf("invalid spec type %T", qs),
- }
- }
- return &ToProcedureSpec{Spec: spec}, nil
-}
-
-func createToTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
- s, ok := spec.(*ToProcedureSpec)
- if !ok {
- return nil, nil, &flux.Error{
- Code: codes.Internal,
- Msg: fmt.Sprintf("invalid spec type %T", spec),
- }
- }
- cache := execute.NewTableBuilderCache(a.Allocator())
- d := execute.NewDataset(id, mode, cache)
- deps := GetStorageDependencies(a.Context()).ToDeps
- t, err := NewToTransformation(a.Context(), d, cache, s, deps)
- if err != nil {
- return nil, nil, err
- }
- return t, d, nil
-}
-
-// ToTransformation is the transformation for the `to` flux function.
-type ToTransformation struct {
- Ctx context.Context
- OrgID platform.ID
- BucketID platform.ID
- d execute.Dataset
- fn *execute.RowMapFn
- cache execute.TableBuilderCache
- spec *ToProcedureSpec
- implicitTagColumns bool
- deps ToDependencies
- buf *storage.BufferedPointsWriter
-}
-
-// RetractTable retracts the table for the transformation for the `to` flux function.
-func (t *ToTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error {
- return t.d.RetractTable(key)
-}
-
-// NewToTransformation returns a new *ToTransformation with the appropriate fields set.
-func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.TableBuilderCache, toSpec *ToProcedureSpec, deps ToDependencies) (x *ToTransformation, err error) {
- var fn *execute.RowMapFn
- //var err error
- spec := toSpec.Spec
- var bucketID, orgID *platform.ID
- if spec.FieldFn.Fn != nil {
- if fn, err = execute.NewRowMapFn(spec.FieldFn.Fn, compiler.ToScope(spec.FieldFn.Scope)); err != nil {
- return nil, err
- }
- }
- // Get organization ID
- if spec.Org != "" {
- oID, ok := deps.OrganizationLookup.Lookup(ctx, spec.Org)
- if !ok {
- return nil, &flux.Error{
- Code: codes.NotFound,
- Msg: fmt.Sprintf("failed to look up organization %q", spec.Org),
- }
- }
- orgID = &oID
- } else if spec.OrgID != "" {
- if orgID, err = platform.IDFromString(spec.OrgID); err != nil {
- return nil, err
- }
- } else {
- // No org or orgID provided as an arg, use the orgID from the context
- req := query.RequestFromContext(ctx)
- if req == nil {
- return nil, errors.New("missing request on context")
- }
- orgID = &req.OrganizationID
- }
-
- // Get bucket ID
- if spec.Bucket != "" {
- bID, ok := deps.BucketLookup.Lookup(ctx, *orgID, spec.Bucket)
- if !ok {
- return nil, &flux.Error{
- Code: codes.NotFound,
- Msg: fmt.Sprintf("failed to look up bucket %q in org %q", spec.Bucket, spec.Org),
- }
- }
- bucketID = &bID
- } else if bucketID, err = platform.IDFromString(spec.BucketID); err != nil {
- return nil, &flux.Error{
- Code: codes.Invalid,
- Msg: "invalid bucket id",
- Err: err,
- }
- }
- if orgID == nil || bucketID == nil {
- return nil, &flux.Error{
- Code: codes.Unknown,
- Msg: "You must specify org and bucket",
- }
- }
- return &ToTransformation{
- Ctx: ctx,
- OrgID: *orgID,
- BucketID: *bucketID,
- d: d,
- fn: fn,
- cache: cache,
- spec: toSpec,
- implicitTagColumns: spec.TagColumns == nil,
- deps: deps,
- buf: storage.NewBufferedPointsWriter(DefaultBufferSize, deps.PointsWriter),
- }, nil
-}
-
-// Process does the actual work for the ToTransformation.
-func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
- if t.implicitTagColumns {
-
- // If no tag columns are specified, by default we exclude
- // _field and _value from being tag columns.
- excludeColumns := map[string]bool{
- execute.DefaultValueColLabel: true,
- defaultFieldColLabel: true,
- }
-
- // If a field function is specified then we exclude any column that
- // is referenced in the function expression from being a tag column.
- if t.spec.Spec.FieldFn.Fn != nil {
- recordParam := t.spec.Spec.FieldFn.Fn.Block.Parameters.List[0].Key.Name
- exprNode := t.spec.Spec.FieldFn.Fn
- colVisitor := newFieldFunctionVisitor(recordParam, tbl.Cols())
-
- // Walk the field function expression and record which columns
- // are referenced. None of these columns will be used as tag columns.
- semantic.Walk(colVisitor, exprNode)
- excludeColumns = colVisitor.captured
- }
-
- addTagsFromTable(t.spec.Spec, tbl, excludeColumns)
- }
- return writeTable(t.Ctx, t, tbl)
-}
-
-// fieldFunctionVisitor implements semantic.Visitor.
-// fieldFunctionVisitor is used to walk the the field function expression
-// of the `to` operation and to record all referenced columns. This visitor
-// is only used when no tag columns are provided as input to the `to` func.
-type fieldFunctionVisitor struct {
- columns map[string]bool
- visited map[semantic.Node]bool
- captured map[string]bool
- rowParam string
-}
-
-func newFieldFunctionVisitor(rowParam string, cols []flux.ColMeta) *fieldFunctionVisitor {
- columns := make(map[string]bool, len(cols))
- for _, col := range cols {
- columns[col.Label] = true
- }
- return &fieldFunctionVisitor{
- columns: columns,
- visited: make(map[semantic.Node]bool, len(cols)),
- captured: make(map[string]bool, len(cols)),
- rowParam: rowParam,
- }
-}
-
-// A field function is of the form `(r) => { Function Body }`, and it returns an object
-// mapping field keys to values for each row r of the input. Visit records every column
-// that is referenced in `Function Body`. These columns are either directly or indirectly
-// used as value columns and as such need to be recorded so as not to be used as tag columns.
-func (v *fieldFunctionVisitor) Visit(node semantic.Node) semantic.Visitor {
- if v.visited[node] {
- return v
- }
- if member, ok := node.(*semantic.MemberExpression); ok {
- if obj, ok := member.Object.(*semantic.IdentifierExpression); ok {
- if obj.Name == v.rowParam && v.columns[member.Property] {
- v.captured[member.Property] = true
- }
- }
- }
- v.visited[node] = true
- return v
-}
-
-func (v *fieldFunctionVisitor) Done(node semantic.Node) {}
-
-func addTagsFromTable(spec *ToOpSpec, table flux.Table, exclude map[string]bool) {
- if cap(spec.TagColumns) < len(table.Cols()) {
- spec.TagColumns = make([]string, 0, len(table.Cols()))
- } else {
- spec.TagColumns = spec.TagColumns[:0]
- }
- for _, column := range table.Cols() {
- if column.Type == flux.TString && !exclude[column.Label] {
- spec.TagColumns = append(spec.TagColumns, column.Label)
- }
- }
- sort.Strings(spec.TagColumns)
-}
-
-// UpdateWatermark updates the watermark for the transformation for the `to` flux function.
-func (t *ToTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error {
- return t.d.UpdateWatermark(pt)
-}
-
-// UpdateProcessingTime updates the processing time for the transformation for the `to` flux function.
-func (t *ToTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
- return t.d.UpdateProcessingTime(pt)
-}
-
-// Finish is called after the `to` flux function's transformation is done processing.
-func (t *ToTransformation) Finish(id execute.DatasetID, err error) {
- if err == nil {
- err = t.buf.Flush(t.Ctx)
- }
- t.d.Finish(err)
-}
-
-// ToDependencies contains the dependencies for executing the `to` function.
-type ToDependencies struct {
- BucketLookup BucketLookup
- OrganizationLookup OrganizationLookup
- PointsWriter storage.PointsWriter
-}
-
-// Validate returns an error if any required field is unset.
-func (d ToDependencies) Validate() error {
- if d.BucketLookup == nil {
- return errors.New("missing bucket lookup dependency")
- }
- if d.OrganizationLookup == nil {
- return errors.New("missing organization lookup dependency")
- }
- if d.PointsWriter == nil {
- return errors.New("missing points writer dependency")
- }
- return nil
-}
-
-type Stats struct {
- NRows int
- Latest time.Time
- Earliest time.Time
- NFields int
- NTags int
-}
-
-func (s Stats) Update(o Stats) {
- s.NRows += o.NRows
- if s.Latest.IsZero() || o.Latest.Unix() > s.Latest.Unix() {
- s.Latest = o.Latest
- }
-
- if s.Earliest.IsZero() || o.Earliest.Unix() < s.Earliest.Unix() {
- s.Earliest = o.Earliest
- }
-
- if o.NFields > s.NFields {
- s.NFields = o.NFields
- }
-
- if o.NTags > s.NTags {
- s.NTags = o.NTags
- }
-}
-
-func writeTable(ctx context.Context, t *ToTransformation, tbl flux.Table) (err error) {
- span, ctx := tracing.StartSpanFromContext(ctx)
- defer span.Finish()
-
- spec := t.spec.Spec
-
- // cache tag columns
- columns := tbl.Cols()
- isTag := make([]bool, len(columns))
- for i, col := range columns {
- tagIdx := sort.SearchStrings(spec.TagColumns, col.Label)
- isTag[i] = tagIdx < len(spec.TagColumns) && spec.TagColumns[tagIdx] == col.Label
- }
- // do time
- timeColLabel := spec.TimeColumn
- timeColIdx := execute.ColIdx(timeColLabel, columns)
-
- if timeColIdx < 0 {
- return &flux.Error{
- Code: codes.Invalid,
- Msg: "no time column detected",
- }
- }
- if columns[timeColIdx].Type != flux.TTime {
- return &flux.Error{
- Code: codes.Invalid,
- Msg: fmt.Sprintf("column %s of type %s is not of type %s", timeColLabel, columns[timeColIdx].Type, flux.TTime),
- }
- }
-
- // prepare field function if applicable and record the number of values to write per row
- if spec.FieldFn.Fn != nil {
- if err = t.fn.Prepare(columns); err != nil {
- return err
- }
-
- }
-
- builder, new := t.cache.TableBuilder(tbl.Key())
- if new {
- if err := execute.AddTableCols(tbl, builder); err != nil {
- return err
- }
- }
-
- measurementStats := make(map[string]Stats)
- measurementName := ""
- return tbl.Do(func(er flux.ColReader) error {
- var pointTime time.Time
- var points models.Points
- var tags models.Tags
- kv := make([][]byte, 2, er.Len()*2+2) // +2 for field key, value
- var fieldValues values.Object
- for i := 0; i < er.Len(); i++ {
- measurementName = ""
- fields := make(models.Fields)
- // leave space for measurement key, value at start, in an effort to
- // keep kv sorted
- kv = kv[:2]
- // Gather the timestamp and the tags.
- for j, col := range er.Cols() {
- switch {
- case col.Label == spec.MeasurementColumn:
- measurementName = string(er.Strings(j).Value(i))
- kv[0] = models.MeasurementTagKeyBytes
- kv[1] = er.Strings(j).Value(i)
- case col.Label == timeColLabel:
- pointTime = execute.ValueForRow(er, i, j).Time().Time()
- case isTag[j]:
- if col.Type != flux.TString {
- return errors.New("invalid type for tag column")
- }
- // TODO(docmerlin): instead of doing this sort of thing, it would be nice if we had a way that allocated a lot less.
- kv = append(kv, []byte(col.Label), er.Strings(j).Value(i))
- }
- }
-
- if pointTime.IsZero() {
- return &flux.Error{
- Code: codes.Invalid,
- Msg: "timestamp missing from block",
- }
- }
-
- if measurementName == "" {
- return &flux.Error{
- Code: codes.Invalid,
- Msg: fmt.Sprintf("no column with label %s exists", spec.MeasurementColumn),
- }
- }
-
- if spec.FieldFn.Fn == nil {
- if fieldValues, err = defaultFieldMapping(er, i); err != nil {
- return err
- }
- } else if fieldValues, err = t.fn.Eval(t.Ctx, i, er); err != nil {
- return err
- }
-
- fieldValues.Range(func(k string, v values.Value) {
- if v.IsNull() {
- fields[k] = nil
- return
- }
- switch v.Type() {
- case semantic.Float:
- fields[k] = v.Float()
- case semantic.Int:
- fields[k] = v.Int()
- case semantic.UInt:
- fields[k] = v.UInt()
- case semantic.String:
- fields[k] = v.Str()
- case semantic.Time:
- fields[k] = v.Time()
- case semantic.Bool:
- fields[k] = v.Bool()
- }
- })
-
- mstats := Stats{
- NRows: 1,
- Latest: pointTime,
- Earliest: pointTime,
- NFields: len(fields),
- NTags: len(kv) / 2,
- }
- _, ok := measurementStats[measurementName]
- if !ok {
- measurementStats[measurementName] = mstats
- } else {
- measurementStats[measurementName].Update(mstats)
- }
-
- name := tsdb.EncodeNameString(t.OrgID, t.BucketID)
-
- fieldNames := make([]string, 0, len(fields))
- for k := range fields {
- fieldNames = append(fieldNames, k)
- }
- sort.Strings(fieldNames)
-
- for _, k := range fieldNames {
- v := fields[k]
- // append field tag key and field key
- kvf := append(kv, models.FieldKeyTagKeyBytes, []byte(k))
- tags, _ = models.NewTagsKeyValues(tags, kvf...)
-
- pt, err := models.NewPoint(name, tags, models.Fields{k: v}, pointTime)
- if err != nil {
- return err
- }
- points = append(points, pt)
- }
-
- if err := execute.AppendRecord(i, er, builder); err != nil {
- return err
- }
- }
-
- return t.buf.WritePoints(ctx, points)
- })
-}
-
-func defaultFieldMapping(er flux.ColReader, row int) (values.Object, error) {
- fieldColumnIdx := execute.ColIdx(defaultFieldColLabel, er.Cols())
- valueColumnIdx := execute.ColIdx(execute.DefaultValueColLabel, er.Cols())
-
- if fieldColumnIdx < 0 {
- return nil, &flux.Error{
- Code: codes.Invalid,
- Msg: "table has no _field column",
- }
- }
-
- if valueColumnIdx < 0 {
- return nil, &flux.Error{
- Code: codes.Invalid,
- Msg: "table has no _value column",
- }
- }
-
- value := execute.ValueForRow(er, row, valueColumnIdx)
-
- fieldValueMapping := values.NewObject()
- field := execute.ValueForRow(er, row, fieldColumnIdx)
- fieldValueMapping.Set(field.Str(), value)
-
- return fieldValueMapping, nil
-}
+// TODO(jsternberg): Implement the to method in influxdb 1.x.
+// This file is kept around so it shows up in the patch.
diff --git b/flux/stdlib/influxdata/influxdb/to_test.go a/flux/stdlib/influxdata/influxdb/to_test.go
index 8afc9128a8..daba8c9362 100644
--- b/flux/stdlib/influxdata/influxdb/to_test.go
+++ a/flux/stdlib/influxdata/influxdb/to_test.go
@@ -1,853 +1 @@
package influxdb_test
-
-import (
- "context"
- "fmt"
- "testing"
-
- "github.com/google/go-cmp/cmp"
- "github.com/influxdata/flux"
- "github.com/influxdata/flux/ast"
- "github.com/influxdata/flux/dependencies/dependenciestest"
- "github.com/influxdata/flux/execute"
- "github.com/influxdata/flux/execute/executetest"
- "github.com/influxdata/flux/interpreter"
- "github.com/influxdata/flux/querytest"
- "github.com/influxdata/flux/semantic"
- "github.com/influxdata/flux/values/valuestest"
- platform "github.com/influxdata/influxdb"
- "github.com/influxdata/influxdb/mock"
- "github.com/influxdata/influxdb/models"
- _ "github.com/influxdata/influxdb/query/builtin"
- pquerytest "github.com/influxdata/influxdb/query/querytest"
- "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
- "github.com/influxdata/influxdb/tsdb"
-)
-
-func TestTo_Query(t *testing.T) {
- tests := []querytest.NewQueryTestCase{
- {
- Name: "from with database with range",
- Raw: `from(bucket:"mydb") |> to(bucket:"series1", org:"fred", host:"localhost", token:"auth-token", fieldFn: (r) => ({ col: r.col }) )`,
- Want: &flux.Spec{
- Operations: []*flux.Operation{
- {
- ID: "influxDBFrom0",
- Spec: &influxdb.FromOpSpec{
- Bucket: "mydb",
- },
- },
- {
- ID: "to1",
- Spec: &influxdb.ToOpSpec{
- Bucket: "series1",
- Org: "fred",
- Host: "localhost",
- Token: "auth-token",
- TimeColumn: execute.DefaultTimeColLabel,
- MeasurementColumn: influxdb.DefaultMeasurementColLabel,
- FieldFn: interpreter.ResolvedFunction{
- Scope: valuestest.NowScope(),
- Fn: &semantic.FunctionExpression{
- Block: &semantic.FunctionBlock{
- Parameters: &semantic.FunctionParameters{
- List: []*semantic.FunctionParameter{
- {
- Key: &semantic.Identifier{Name: "r"},
- },
- },
- },
- Body: &semantic.ObjectExpression{
- Properties: []*semantic.Property{
- {
- Key: &semantic.Identifier{Name: "col"},
- Value: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "col",
- },
- },
- },
- },
- },
- },
- },
- },
- },
- },
- Edges: []flux.Edge{
- {Parent: "influxDBFrom0", Child: "to1"},
- },
- },
- },
- }
- for _, tc := range tests {
- tc := tc
- t.Run(tc.Name, func(t *testing.T) {
- t.Parallel()
- querytest.NewQueryTestHelper(t, tc)
- })
- }
-}
-
-func TestToOpSpec_BucketsAccessed(t *testing.T) {
- bucketName := "my_bucket"
- bucketIDString := "ddddccccbbbbaaaa"
- bucketID, err := platform.IDFromString(bucketIDString)
- if err != nil {
- t.Fatal(err)
- }
- orgName := "my_org"
- orgIDString := "aaaabbbbccccdddd"
- orgID, err := platform.IDFromString(orgIDString)
- if err != nil {
- t.Fatal(err)
- }
- tests := []pquerytest.BucketsAccessedTestCase{
- {
- Name: "from() with bucket and to with org and bucket",
- Raw: fmt.Sprintf(`from(bucket:"%s") |> to(bucket:"%s", org:"%s")`, bucketName, bucketName, orgName),
- WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
- WantWriteBuckets: &[]platform.BucketFilter{{Name: &bucketName, Org: &orgName}},
- },
- {
- Name: "from() with bucket and to with orgID and bucket",
- Raw: fmt.Sprintf(`from(bucket:"%s") |> to(bucket:"%s", orgID:"%s")`, bucketName, bucketName, orgIDString),
- WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
- WantWriteBuckets: &[]platform.BucketFilter{{Name: &bucketName, OrganizationID: orgID}},
- },
- {
- Name: "from() with bucket and to with orgID and bucketID",
- Raw: fmt.Sprintf(`from(bucket:"%s") |> to(bucketID:"%s", orgID:"%s")`, bucketName, bucketIDString, orgIDString),
- WantReadBuckets: &[]platform.BucketFilter{{Name: &bucketName}},
- WantWriteBuckets: &[]platform.BucketFilter{{ID: bucketID, OrganizationID: orgID}},
- },
- }
-
- for _, tc := range tests {
- tc := tc
- t.Run(tc.Name, func(t *testing.T) {
- t.Parallel()
- pquerytest.BucketsAccessedTestHelper(t, tc)
- })
- }
-}
-
-func TestTo_Process(t *testing.T) {
- oid, _ := mock.OrganizationLookup{}.Lookup(context.Background(), "my-org")
- bid, _ := mock.BucketLookup{}.Lookup(context.Background(), oid, "my-bucket")
- type wanted struct {
- result *mock.PointsWriter
- tables []*executetest.Table
- }
- testCases := []struct {
- name string
- spec *influxdb.ToProcedureSpec
- data []flux.Table
- want wanted
- }{
- {
- name: "default case",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- MeasurementColumn: "_measurement",
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_start", Type: flux.TTime},
- {Label: "_stop", Type: flux.TTime},
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(31), "a", "_value", 3.0},
- {execute.Time(0), execute.Time(100), execute.Time(41), "c", "_value", 4.0},
- },
- })},
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `a _value=2 11
-a _value=2 21
-b _value=1 21
-a _value=3 31
-c _value=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_start", Type: flux.TTime},
- {Label: "_stop", Type: flux.TTime},
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(31), "a", "_value", 3.0},
- {execute.Time(0), execute.Time(100), execute.Time(41), "c", "_value", 4.0},
- },
- }},
- },
- },
- {
- name: "default with heterogeneous tag columns",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- MeasurementColumn: "_measurement",
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "tag1", Type: flux.TString},
- {Label: "tag2", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- KeyCols: []string{"_measurement", "tag1", "tag2", "_field"},
- Data: [][]interface{}{
- {execute.Time(11), "a", "a", "aa", "_value", 2.0},
- {execute.Time(21), "a", "a", "bb", "_value", 2.0},
- {execute.Time(21), "a", "b", "cc", "_value", 1.0},
- {execute.Time(31), "a", "a", "dd", "_value", 3.0},
- {execute.Time(41), "a", "c", "ee", "_value", 4.0},
- },
- }),
- executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "tagA", Type: flux.TString},
- {Label: "tagB", Type: flux.TString},
- {Label: "tagC", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- KeyCols: []string{"_measurement", "tagA", "tagB", "tagC", "_field"},
- Data: [][]interface{}{
- {execute.Time(11), "b", "a", "aa", "ff", "_value", 2.0},
- {execute.Time(21), "b", "a", "bb", "gg", "_value", 2.0},
- {execute.Time(21), "b", "b", "cc", "hh", "_value", 1.0},
- {execute.Time(31), "b", "a", "dd", "ii", "_value", 3.0},
- {execute.Time(41), "b", "c", "ee", "jj", "_value", 4.0},
- },
- }),
- },
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `a,tag1=a,tag2=aa _value=2 11
-a,tag1=a,tag2=bb _value=2 21
-a,tag1=b,tag2=cc _value=1 21
-a,tag1=a,tag2=dd _value=3 31
-a,tag1=c,tag2=ee _value=4 41
-b,tagA=a,tagB=aa,tagC=ff _value=2 11
-b,tagA=a,tagB=bb,tagC=gg _value=2 21
-b,tagA=b,tagB=cc,tagC=hh _value=1 21
-b,tagA=a,tagB=dd,tagC=ii _value=3 31
-b,tagA=c,tagB=ee,tagC=jj _value=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "tag1", Type: flux.TString},
- {Label: "tag2", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- KeyCols: []string{"_measurement", "tag1", "tag2", "_field"},
- Data: [][]interface{}{
- {execute.Time(11), "a", "a", "aa", "_value", 2.0},
- {execute.Time(21), "a", "a", "bb", "_value", 2.0},
- {execute.Time(21), "a", "b", "cc", "_value", 1.0},
- {execute.Time(31), "a", "a", "dd", "_value", 3.0},
- {execute.Time(41), "a", "c", "ee", "_value", 4.0},
- },
- },
- {
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "tagA", Type: flux.TString},
- {Label: "tagB", Type: flux.TString},
- {Label: "tagC", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- KeyCols: []string{"_measurement", "tagA", "tagB", "tagC", "_field"},
- Data: [][]interface{}{
- {execute.Time(11), "b", "a", "aa", "ff", "_value", 2.0},
- {execute.Time(21), "b", "a", "bb", "gg", "_value", 2.0},
- {execute.Time(21), "b", "b", "cc", "hh", "_value", 1.0},
- {execute.Time(31), "b", "a", "dd", "ii", "_value", 3.0},
- {execute.Time(41), "b", "c", "ee", "jj", "_value", 4.0},
- },
- },
- },
- },
- },
- {
- name: "no _measurement with multiple tag columns",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- MeasurementColumn: "tag1",
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "tag1", Type: flux.TString},
- {Label: "tag2", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(11), "a", "aa", "_value", 2.0},
- {execute.Time(21), "a", "bb", "_value", 2.0},
- {execute.Time(21), "b", "cc", "_value", 1.0},
- {execute.Time(31), "a", "dd", "_value", 3.0},
- {execute.Time(41), "c", "ee", "_value", 4.0},
- },
- })},
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `a,tag2=aa _value=2 11
-a,tag2=bb _value=2 21
-b,tag2=cc _value=1 21
-a,tag2=dd _value=3 31
-c,tag2=ee _value=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "tag1", Type: flux.TString},
- {Label: "tag2", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(11), "a", "aa", "_value", 2.0},
- {execute.Time(21), "a", "bb", "_value", 2.0},
- {execute.Time(21), "b", "cc", "_value", 1.0},
- {execute.Time(31), "a", "dd", "_value", 3.0},
- {execute.Time(41), "c", "ee", "_value", 4.0},
- },
- }},
- },
- },
- {
- name: "explicit tags",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- TagColumns: []string{"tag1", "tag2"},
- MeasurementColumn: "_measurement",
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- {Label: "tag1", Type: flux.TString},
- {Label: "tag2", Type: flux.TString},
- },
- Data: [][]interface{}{
- {execute.Time(11), "m", "_value", 2.0, "a", "aa"},
- {execute.Time(21), "m", "_value", 2.0, "a", "bb"},
- {execute.Time(21), "m", "_value", 1.0, "b", "cc"},
- {execute.Time(31), "m", "_value", 3.0, "a", "dd"},
- {execute.Time(41), "m", "_value", 4.0, "c", "ee"},
- },
- })},
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `m,tag1=a,tag2=aa _value=2 11
-m,tag1=a,tag2=bb _value=2 21
-m,tag1=b,tag2=cc _value=1 21
-m,tag1=a,tag2=dd _value=3 31
-m,tag1=c,tag2=ee _value=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- {Label: "tag1", Type: flux.TString},
- {Label: "tag2", Type: flux.TString},
- },
- Data: [][]interface{}{
- {execute.Time(11), "m", "_value", 2.0, "a", "aa"},
- {execute.Time(21), "m", "_value", 2.0, "a", "bb"},
- {execute.Time(21), "m", "_value", 1.0, "b", "cc"},
- {execute.Time(31), "m", "_value", 3.0, "a", "dd"},
- {execute.Time(41), "m", "_value", 4.0, "c", "ee"},
- },
- }},
- },
- },
- {
- name: "explicit field function",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- MeasurementColumn: "_measurement",
- FieldFn: interpreter.ResolvedFunction{
- Scope: valuestest.NowScope(),
- Fn: &semantic.FunctionExpression{
- Block: &semantic.FunctionBlock{
- Parameters: &semantic.FunctionParameters{
- List: []*semantic.FunctionParameter{
- {
- Key: &semantic.Identifier{Name: "r"},
- },
- },
- },
- Body: &semantic.ObjectExpression{
- Properties: []*semantic.Property{
- {
- Key: &semantic.Identifier{Name: "temperature"},
- Value: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "temperature",
- },
- },
- },
- },
- },
- },
- },
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "temperature", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(11), "a", 2.0},
- {execute.Time(21), "a", 2.0},
- {execute.Time(21), "b", 1.0},
- {execute.Time(31), "a", 3.0},
- {execute.Time(41), "c", 4.0},
- },
- })},
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `a temperature=2 11
-a temperature=2 21
-b temperature=1 21
-a temperature=3 31
-c temperature=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "temperature", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(11), "a", 2.0},
- {execute.Time(21), "a", 2.0},
- {execute.Time(21), "b", 1.0},
- {execute.Time(31), "a", 3.0},
- {execute.Time(41), "c", 4.0},
- },
- }},
- },
- },
- {
- name: "infer tags from complex field function",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- MeasurementColumn: "tag",
- FieldFn: interpreter.ResolvedFunction{
- Scope: valuestest.NowScope(),
- Fn: &semantic.FunctionExpression{
- Block: &semantic.FunctionBlock{
- Parameters: &semantic.FunctionParameters{
- List: []*semantic.FunctionParameter{
- {
- Key: &semantic.Identifier{Name: "r"},
- },
- },
- },
- Body: &semantic.ObjectExpression{
- Properties: []*semantic.Property{
- {
- Key: &semantic.Identifier{Name: "day"},
- Value: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "day",
- },
- },
- {
- Key: &semantic.Identifier{Name: "temperature"},
- Value: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "temperature",
- },
- },
- {
- Key: &semantic.Identifier{Name: "humidity"},
- Value: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "humidity",
- },
- },
- {
- Key: &semantic.Identifier{Name: "ratio"},
- Value: &semantic.BinaryExpression{
- Operator: ast.DivisionOperator,
- Left: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "temperature",
- },
- Right: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "humidity",
- },
- },
- },
- },
- },
- },
- },
- },
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "day", Type: flux.TString},
- {Label: "tag", Type: flux.TString},
- {Label: "temperature", Type: flux.TFloat},
- {Label: "humidity", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(11), "Monday", "a", 2.0, 1.0},
- {execute.Time(21), "Tuesday", "a", 2.0, 2.0},
- {execute.Time(21), "Wednesday", "b", 1.0, 4.0},
- {execute.Time(31), "Thursday", "a", 3.0, 3.0},
- {execute.Time(41), "Friday", "c", 4.0, 5.0},
- },
- })},
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `a day="Monday",humidity=1,ratio=2,temperature=2 11
-a day="Tuesday",humidity=2,ratio=1,temperature=2 21
-b day="Wednesday",humidity=4,ratio=0.25,temperature=1 21
-a day="Thursday",humidity=3,ratio=1,temperature=3 31
-c day="Friday",humidity=5,ratio=0.8,temperature=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "day", Type: flux.TString},
- {Label: "tag", Type: flux.TString},
- {Label: "temperature", Type: flux.TFloat},
- {Label: "humidity", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(11), "Monday", "a", 2.0, 1.0},
- {execute.Time(21), "Tuesday", "a", 2.0, 2.0},
- {execute.Time(21), "Wednesday", "b", 1.0, 4.0},
- {execute.Time(31), "Thursday", "a", 3.0, 3.0},
- {execute.Time(41), "Friday", "c", 4.0, 5.0},
- },
- }},
- },
- },
- {
- name: "explicit tag columns, multiple values in field function, and extra columns",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- MeasurementColumn: "tag1",
- TagColumns: []string{"tag2"},
- FieldFn: interpreter.ResolvedFunction{
- Scope: valuestest.NowScope(),
- Fn: &semantic.FunctionExpression{
- Block: &semantic.FunctionBlock{
- Parameters: &semantic.FunctionParameters{
- List: []*semantic.FunctionParameter{
- {
- Key: &semantic.Identifier{Name: "r"},
- },
- },
- },
- Body: &semantic.ObjectExpression{
- Properties: []*semantic.Property{
- {
- Key: &semantic.Identifier{Name: "temperature"},
- Value: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "temperature",
- },
- },
- {
- Key: &semantic.Identifier{Name: "humidity"},
- Value: &semantic.MemberExpression{
- Object: &semantic.IdentifierExpression{Name: "r"},
- Property: "humidity",
- },
- },
- },
- },
- },
- },
- },
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_start", Type: flux.TTime},
- {Label: "_stop", Type: flux.TTime},
- {Label: "_time", Type: flux.TTime},
- {Label: "tag1", Type: flux.TString},
- {Label: "tag2", Type: flux.TString},
- {Label: "other-string-column", Type: flux.TString},
- {Label: "temperature", Type: flux.TFloat},
- {Label: "humidity", Type: flux.TInt},
- {Label: "other-value-column", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(0), execute.Time(100), execute.Time(11), "a", "d", "misc", 2.0, int64(50), 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "a", "d", "misc", 2.0, int64(50), 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "b", "d", "misc", 1.0, int64(50), 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(31), "a", "e", "misc", 3.0, int64(60), 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(41), "c", "e", "misc", 4.0, int64(65), 1.0},
- },
- })},
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `a,tag2=d humidity=50i,temperature=2 11
-a,tag2=d humidity=50i,temperature=2 21
-b,tag2=d humidity=50i,temperature=1 21
-a,tag2=e humidity=60i,temperature=3 31
-c,tag2=e humidity=65i,temperature=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_start", Type: flux.TTime},
- {Label: "_stop", Type: flux.TTime},
- {Label: "_time", Type: flux.TTime},
- {Label: "tag1", Type: flux.TString},
- {Label: "tag2", Type: flux.TString},
- {Label: "other-string-column", Type: flux.TString},
- {Label: "temperature", Type: flux.TFloat},
- {Label: "humidity", Type: flux.TInt},
- {Label: "other-value-column", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(0), execute.Time(100), execute.Time(11), "a", "d", "misc", 2.0, int64(50), 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "a", "d", "misc", 2.0, int64(50), 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "b", "d", "misc", 1.0, int64(50), 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(31), "a", "e", "misc", 3.0, int64(60), 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(41), "c", "e", "misc", 4.0, int64(65), 1.0},
- },
- }},
- },
- },
- {
- name: "multiple _field",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- MeasurementColumn: "_measurement",
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_start", Type: flux.TTime},
- {Label: "_stop", Type: flux.TTime},
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(31), "a", "_hello", 3.0},
- {execute.Time(0), execute.Time(100), execute.Time(41), "c", "_hello", 4.0},
- },
- })},
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `a _value=2 11
-a _value=2 21
-b _value=1 21
-a _hello=3 31
-c _hello=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_start", Type: flux.TTime},
- {Label: "_stop", Type: flux.TTime},
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- },
- Data: [][]interface{}{
- {execute.Time(0), execute.Time(100), execute.Time(11), "a", "_value", 2.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "a", "_value", 2.0},
- {execute.Time(0), execute.Time(100), execute.Time(21), "b", "_value", 1.0},
- {execute.Time(0), execute.Time(100), execute.Time(31), "a", "_hello", 3.0},
- {execute.Time(0), execute.Time(100), execute.Time(41), "c", "_hello", 4.0},
- },
- }},
- },
- },
- {
- name: "unordered tags",
- spec: &influxdb.ToProcedureSpec{
- Spec: &influxdb.ToOpSpec{
- Org: "my-org",
- Bucket: "my-bucket",
- TimeColumn: "_time",
- TagColumns: []string{"tag1", "tag2"},
- MeasurementColumn: "_measurement",
- },
- },
- data: []flux.Table{executetest.MustCopyTable(&executetest.Table{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- {Label: "tag2", Type: flux.TString},
- {Label: "tag1", Type: flux.TString},
- },
- Data: [][]interface{}{
- {execute.Time(11), "m", "_value", 2.0, "aa", "a"},
- {execute.Time(21), "m", "_value", 2.0, "bb", "a"},
- {execute.Time(21), "m", "_value", 1.0, "cc", "b"},
- {execute.Time(31), "m", "_value", 3.0, "dd", "a"},
- {execute.Time(41), "m", "_value", 4.0, "ee", "c"},
- },
- })},
- want: wanted{
- result: &mock.PointsWriter{
- Points: mockPoints(oid, bid, `m,tag1=a,tag2=aa _value=2 11
-m,tag1=a,tag2=bb _value=2 21
-m,tag1=b,tag2=cc _value=1 21
-m,tag1=a,tag2=dd _value=3 31
-m,tag1=c,tag2=ee _value=4 41`),
- },
- tables: []*executetest.Table{{
- ColMeta: []flux.ColMeta{
- {Label: "_time", Type: flux.TTime},
- {Label: "_measurement", Type: flux.TString},
- {Label: "_field", Type: flux.TString},
- {Label: "_value", Type: flux.TFloat},
- {Label: "tag2", Type: flux.TString},
- {Label: "tag1", Type: flux.TString},
- },
- Data: [][]interface{}{
- {execute.Time(11), "m", "_value", 2.0, "aa", "a"},
- {execute.Time(21), "m", "_value", 2.0, "bb", "a"},
- {execute.Time(21), "m", "_value", 1.0, "cc", "b"},
- {execute.Time(31), "m", "_value", 3.0, "dd", "a"},
- {execute.Time(41), "m", "_value", 4.0, "ee", "c"},
- },
- }},
- },
- },
- }
-
- for _, tc := range testCases {
- tc := tc
- t.Run(tc.name, func(t *testing.T) {
- deps := influxdb.Dependencies{
- FluxDeps: dependenciestest.Default(),
- StorageDeps: influxdb.StorageDependencies{
- ToDeps: mockDependencies(),
- },
- }
- executetest.ProcessTestHelper(
- t,
- tc.data,
- tc.want.tables,
- nil,
- func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
- ctx := deps.Inject(context.Background())
- newT, err := influxdb.NewToTransformation(ctx, d, c, tc.spec, deps.StorageDeps.ToDeps)
- if err != nil {
- t.Error(err)
- }
- return newT
- },
- )
- pw := deps.StorageDeps.ToDeps.PointsWriter.(*mock.PointsWriter)
- if len(pw.Points) != len(tc.want.result.Points) {
- t.Errorf("Expected result values to have length of %d but got %d", len(tc.want.result.Points), len(pw.Points))
- }
-
- gotStr := pointsToStr(pw.Points)
- wantStr := pointsToStr(tc.want.result.Points)
-
- if !cmp.Equal(gotStr, wantStr) {
- t.Errorf("got other than expected %s", cmp.Diff(gotStr, wantStr))
- }
- })
- }
-}
-
-func mockDependencies() influxdb.ToDependencies {
- return influxdb.ToDependencies{
- BucketLookup: mock.BucketLookup{},
- OrganizationLookup: mock.OrganizationLookup{},
- PointsWriter: new(mock.PointsWriter),
- }
-}
-
-func pointsToStr(points []models.Point) string {
- outStr := ""
- for _, x := range points {
- outStr += x.String() + "\n"
- }
- return outStr
-}
-
-func mockPoints(org, bucket platform.ID, pointdata string) []models.Point {
- name := tsdb.EncodeName(org, bucket)
- points, err := models.ParsePoints([]byte(pointdata), name[:])
- if err != nil {
- return nil
- }
- return points
-}
diff --git b/flux/stdlib/influxdata/influxdb/v1/databases.go a/flux/stdlib/influxdata/influxdb/v1/databases.go
index 6a6c59a76c..1779f411c5 100644
--- b/flux/stdlib/influxdata/influxdb/v1/databases.go
+++ a/flux/stdlib/influxdata/influxdb/v1/databases.go
@@ -2,8 +2,8 @@ package v1
import (
"context"
+ "errors"
"fmt"
- "time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
@@ -11,9 +11,9 @@ import (
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/stdlib/influxdata/influxdb/v1"
"github.com/influxdata/flux/values"
- platform "github.com/influxdata/influxdb"
- "github.com/influxdata/influxdb/query"
- "github.com/pkg/errors"
+ "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
+ "github.com/influxdata/influxdb/services/meta"
+ "github.com/influxdata/influxql"
)
const DatabasesKind = v1.DatabasesKind
@@ -67,9 +67,9 @@ func init() {
}
type DatabasesDecoder struct {
- orgID platform.ID
- deps *DatabasesDependencies
- databases []*platform.DBRPMapping
+ deps *influxdb.StorageDependencies
+ databases []meta.DatabaseInfo
+ user meta.User
alloc *memory.Allocator
}
@@ -78,45 +78,13 @@ func (bd *DatabasesDecoder) Connect(ctx context.Context) error {
}
func (bd *DatabasesDecoder) Fetch(ctx context.Context) (bool, error) {
- b, _, err := bd.deps.DBRP.FindMany(ctx, platform.DBRPMappingFilter{})
- if err != nil {
- return false, err
- }
- bd.databases = b
+ bd.databases = bd.deps.MetaClient.Databases()
return false, nil
}
func (bd *DatabasesDecoder) Decode(ctx context.Context) (flux.Table, error) {
- type databaseInfo struct {
- *platform.DBRPMapping
- RetentionPeriod time.Duration
- }
-
- databases := make([]databaseInfo, 0, len(bd.databases))
- for _, db := range bd.databases {
- bucket, err := bd.deps.BucketLookup.FindBucketByID(ctx, db.BucketID)
- if err != nil {
- code := platform.ErrorCode(err)
- if code == platform.EUnauthorized || code == platform.EForbidden {
- continue
- }
- return nil, err
- }
- databases = append(databases, databaseInfo{
- DBRPMapping: db,
- RetentionPeriod: bucket.RetentionPeriod,
- })
- }
-
- if len(databases) == 0 {
- return nil, &platform.Error{
- Code: platform.ENotFound,
- Msg: "no 1.x databases found",
- }
- }
-
kb := execute.NewGroupKeyBuilder(nil)
- kb.AddKeyValue("organizationID", values.NewString(databases[0].OrganizationID.String()))
+ kb.AddKeyValue("organizationID", values.NewString(""))
gk, err := kb.Build()
if err != nil {
return nil, err
@@ -160,13 +128,29 @@ func (bd *DatabasesDecoder) Decode(ctx context.Context) (flux.Table, error) {
return nil, err
}
- for _, db := range databases {
- _ = b.AppendString(0, db.OrganizationID.String())
- _ = b.AppendString(1, db.Database)
- _ = b.AppendString(2, db.RetentionPolicy)
- _ = b.AppendInt(3, db.RetentionPeriod.Nanoseconds())
- _ = b.AppendBool(4, db.Default)
- _ = b.AppendString(5, db.BucketID.String())
+ var hasAccess func(db string) bool
+ if bd.user == nil {
+ hasAccess = func(db string) bool {
+ return true
+ }
+ } else {
+ hasAccess = func(db string) bool {
+ return bd.deps.Authorizer.AuthorizeDatabase(bd.user, influxql.ReadPrivilege, db) == nil ||
+ bd.deps.Authorizer.AuthorizeDatabase(bd.user, influxql.WritePrivilege, db) == nil
+ }
+ }
+
+ for _, db := range bd.databases {
+ if hasAccess(db.Name) {
+ for _, rp := range db.RetentionPolicies {
+ _ = b.AppendString(0, "")
+ _ = b.AppendString(1, db.Name)
+ _ = b.AppendString(2, rp.Name)
+ _ = b.AppendInt(3, rp.Duration.Nanoseconds())
+ _ = b.AppendBool(4, db.DefaultRetentionPolicy == rp.Name)
+ _ = b.AppendString(5, "")
+ }
+ }
}
return b.Table()
@@ -181,41 +165,14 @@ func createDatabasesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a
if !ok {
return nil, fmt.Errorf("invalid spec type %T", prSpec)
}
- deps := GetDatabasesDependencies(a.Context())
- req := query.RequestFromContext(a.Context())
- if req == nil {
- return nil, errors.New("missing request on context")
+ deps := influxdb.GetStorageDependencies(a.Context())
+ var user meta.User
+ if deps.AuthEnabled {
+ user = meta.UserFromContext(a.Context())
+ if user == nil {
+ return nil, errors.New("createDatabasesSource: no user")
+ }
}
- orgID := req.OrganizationID
-
- bd := &DatabasesDecoder{orgID: orgID, deps: &deps, alloc: a.Allocator()}
-
+ bd := &DatabasesDecoder{deps: &deps, alloc: a.Allocator(), user: user}
return execute.CreateSourceFromDecoder(bd, dsid, a)
}
-
-type key int
-
-const dependenciesKey key = iota
-
-type DatabasesDependencies struct {
- DBRP platform.DBRPMappingService
- BucketLookup platform.BucketService
-}
-
-func (d DatabasesDependencies) Inject(ctx context.Context) context.Context {
- return context.WithValue(ctx, dependenciesKey, d)
-}
-
-func GetDatabasesDependencies(ctx context.Context) DatabasesDependencies {
- return ctx.Value(dependenciesKey).(DatabasesDependencies)
-}
-
-func (d DatabasesDependencies) Validate() error {
- if d.DBRP == nil {
- return errors.New("missing all databases lookup dependency")
- }
- if d.BucketLookup == nil {
- return errors.New("missing buckets lookup dependency")
- }
- return nil
-}