diff --git a/Gopkg.lock b/Gopkg.lock index 41c0976134..00e5a4713b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -388,7 +388,7 @@ [[projects]] branch = "master" - digest = "1:ee19d49e5e08eed14432f6b082fae33db9e3da7f2ea049254f8276036e9c7cc9" + digest = "1:cc977ce9615b1e9e23fe61d1fb36845a971199193c174968fdb017ec58913b79" name = "github.com/influxdata/flux" packages = [ ".", @@ -409,7 +409,7 @@ "values", ] pruneopts = "UT" - revision = "331c3678304cd48fac3e47f4f80cdfe27a8e880d" + revision = "59f53657bd7fb74bcfa5af586be28214239d7556" [[projects]] branch = "master" @@ -972,6 +972,7 @@ "github.com/influxdata/flux/lang", "github.com/influxdata/flux/options", "github.com/influxdata/flux/plan", + "github.com/influxdata/flux/values", "github.com/influxdata/influxql", "github.com/influxdata/platform/models", "github.com/influxdata/platform/query/functions/inputs/storage", diff --git a/flux/control/controller.go b/flux/control/controller.go index a2cc5fe479..086d0582cb 100644 --- a/flux/control/controller.go +++ b/flux/control/controller.go @@ -5,11 +5,12 @@ import ( "github.com/influxdata/flux/execute" _ "github.com/influxdata/influxdb/flux/builtin" "github.com/influxdata/influxdb/flux/functions/inputs" + "github.com/influxdata/influxdb/services/storage" "github.com/influxdata/platform/storage/reads" "go.uber.org/zap" ) -func NewController(s reads.Store, logger *zap.Logger) *control.Controller { +func NewController(s *storage.Store, logger *zap.Logger) *control.Controller { // flux var ( concurrencyQuota = 10 @@ -28,5 +29,11 @@ func NewController(s reads.Store, logger *zap.Logger) *control.Controller { if err != nil { panic(err) } + + err = inputs.InjectBucketDependencies(cc.ExecutorDependencies, s) + if err != nil { + panic(err) + } + return control.New(cc) } diff --git a/flux/functions/inputs/buckets.go b/flux/functions/inputs/buckets.go new file mode 100644 index 0000000000..8838003902 --- /dev/null +++ b/flux/functions/inputs/buckets.go @@ -0,0 +1,104 @@ +package inputs + +import ( + "fmt" + "github.com/influxdata/flux" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/functions/inputs" + "github.com/influxdata/flux/plan" + "github.com/influxdata/flux/values" + "github.com/influxdata/influxdb/services/storage" + "github.com/pkg/errors" +) + +func init() { + execute.RegisterSource(inputs.BucketsKind, createBucketsSource) +} + +type BucketsDecoder struct { + deps BucketDependencies + alloc *execute.Allocator +} + +func (bd *BucketsDecoder) Connect() error { + return nil +} + +func (bd *BucketsDecoder) Fetch() (bool, error) { + return false, nil +} + +func (bd *BucketsDecoder) Decode() (flux.Table, error) { + kb := execute.NewGroupKeyBuilder(nil) + kb.AddKeyValue("organizationID", values.NewStringValue("influxdb")) + gk, err := kb.Build() + if err != nil { + return nil, err + } + + b := execute.NewColListTableBuilder(gk, bd.alloc) + + b.AddCol(flux.ColMeta{ + Label: "name", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "id", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "organization", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "organizationID", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "retentionPolicy", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "retentionPeriod", + Type: flux.TInt, + }) + + for _, database := range bd.deps.TSDBStore.Databases() { + bucket := bd.deps.MetaClient.Database(database) + rp := bucket.RetentionPolicy(bucket.DefaultRetentionPolicy) + b.AppendString(0, bucket.Name) + b.AppendString(1, "") + b.AppendString(2, "influxdb") + b.AppendString(3, "") + b.AppendString(4, rp.Name) + b.AppendInt(5, rp.Duration.Nanoseconds()) + } + + return b.Table() +} + +func createBucketsSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) { + _, ok := prSpec.(*inputs.BucketsProcedureSpec) + if !ok { + return nil, fmt.Errorf("invalid spec type %T", prSpec) + } + + // the dependencies used for FromKind are adequate for what we need here + // so there's no need to inject custom dependencies for buckets() + deps := a.Dependencies()[inputs.BucketsKind].(BucketDependencies) + + bd := &BucketsDecoder{deps: deps, alloc: a.Allocator()} + + return inputs.CreateSourceFromDecoder(bd, dsid, a) + +} + +type BucketDependencies *storage.Store + +func InjectBucketDependencies(depsMap execute.Dependencies, deps BucketDependencies) error { + if deps == nil { + return errors.New("bucket store dependency") + } + depsMap[inputs.BucketsKind] = deps + return nil +}