added physical implementation for the buckets() function (#10373)
parent
0734f6fe21
commit
74f2cbe500
|
@ -388,7 +388,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:ee19d49e5e08eed14432f6b082fae33db9e3da7f2ea049254f8276036e9c7cc9"
|
||||
digest = "1:cc977ce9615b1e9e23fe61d1fb36845a971199193c174968fdb017ec58913b79"
|
||||
name = "github.com/influxdata/flux"
|
||||
packages = [
|
||||
".",
|
||||
|
@ -409,7 +409,7 @@
|
|||
"values",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "331c3678304cd48fac3e47f4f80cdfe27a8e880d"
|
||||
revision = "59f53657bd7fb74bcfa5af586be28214239d7556"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
|
@ -972,6 +972,7 @@
|
|||
"github.com/influxdata/flux/lang",
|
||||
"github.com/influxdata/flux/options",
|
||||
"github.com/influxdata/flux/plan",
|
||||
"github.com/influxdata/flux/values",
|
||||
"github.com/influxdata/influxql",
|
||||
"github.com/influxdata/platform/models",
|
||||
"github.com/influxdata/platform/query/functions/inputs/storage",
|
||||
|
|
|
@ -5,11 +5,12 @@ import (
|
|||
"github.com/influxdata/flux/execute"
|
||||
_ "github.com/influxdata/influxdb/flux/builtin"
|
||||
"github.com/influxdata/influxdb/flux/functions/inputs"
|
||||
"github.com/influxdata/influxdb/services/storage"
|
||||
"github.com/influxdata/platform/storage/reads"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func NewController(s reads.Store, logger *zap.Logger) *control.Controller {
|
||||
func NewController(s *storage.Store, logger *zap.Logger) *control.Controller {
|
||||
// flux
|
||||
var (
|
||||
concurrencyQuota = 10
|
||||
|
@ -28,5 +29,11 @@ func NewController(s reads.Store, logger *zap.Logger) *control.Controller {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = inputs.InjectBucketDependencies(cc.ExecutorDependencies, s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return control.New(cc)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
package inputs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/functions/inputs"
|
||||
"github.com/influxdata/flux/plan"
|
||||
"github.com/influxdata/flux/values"
|
||||
"github.com/influxdata/influxdb/services/storage"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
execute.RegisterSource(inputs.BucketsKind, createBucketsSource)
|
||||
}
|
||||
|
||||
type BucketsDecoder struct {
|
||||
deps BucketDependencies
|
||||
alloc *execute.Allocator
|
||||
}
|
||||
|
||||
func (bd *BucketsDecoder) Connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bd *BucketsDecoder) Fetch() (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (bd *BucketsDecoder) Decode() (flux.Table, error) {
|
||||
kb := execute.NewGroupKeyBuilder(nil)
|
||||
kb.AddKeyValue("organizationID", values.NewStringValue("influxdb"))
|
||||
gk, err := kb.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b := execute.NewColListTableBuilder(gk, bd.alloc)
|
||||
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "name",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "id",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "organization",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "organizationID",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "retentionPolicy",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "retentionPeriod",
|
||||
Type: flux.TInt,
|
||||
})
|
||||
|
||||
for _, database := range bd.deps.TSDBStore.Databases() {
|
||||
bucket := bd.deps.MetaClient.Database(database)
|
||||
rp := bucket.RetentionPolicy(bucket.DefaultRetentionPolicy)
|
||||
b.AppendString(0, bucket.Name)
|
||||
b.AppendString(1, "")
|
||||
b.AppendString(2, "influxdb")
|
||||
b.AppendString(3, "")
|
||||
b.AppendString(4, rp.Name)
|
||||
b.AppendInt(5, rp.Duration.Nanoseconds())
|
||||
}
|
||||
|
||||
return b.Table()
|
||||
}
|
||||
|
||||
func createBucketsSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
|
||||
_, ok := prSpec.(*inputs.BucketsProcedureSpec)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid spec type %T", prSpec)
|
||||
}
|
||||
|
||||
// the dependencies used for FromKind are adequate for what we need here
|
||||
// so there's no need to inject custom dependencies for buckets()
|
||||
deps := a.Dependencies()[inputs.BucketsKind].(BucketDependencies)
|
||||
|
||||
bd := &BucketsDecoder{deps: deps, alloc: a.Allocator()}
|
||||
|
||||
return inputs.CreateSourceFromDecoder(bd, dsid, a)
|
||||
|
||||
}
|
||||
|
||||
type BucketDependencies *storage.Store
|
||||
|
||||
func InjectBucketDependencies(depsMap execute.Dependencies, deps BucketDependencies) error {
|
||||
if deps == nil {
|
||||
return errors.New("bucket store dependency")
|
||||
}
|
||||
depsMap[inputs.BucketsKind] = deps
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue