From ea4c08b327efa62f870eb95ee217361e232e56dc Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 17 Oct 2018 10:21:39 -0400 Subject: [PATCH] (feat/query): implement a buckets() source function that retrieves all buckets and retention policy information for the active organization. (#1045) --- go.mod | 8 +- go.sum | 12 ++- query/dependency.go | 13 ++++ query/functions/inputs/buckets.go | 119 ++++++++++++++++++++++++++++++ storage/readservice/service.go | 5 +- 5 files changed, 148 insertions(+), 9 deletions(-) create mode 100644 query/functions/inputs/buckets.go diff --git a/go.mod b/go.mod index 2a5e1e9c66..bb59ce881b 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/hashicorp/raft v1.0.0 // indirect github.com/imdario/mergo v0.3.6 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect - github.com/influxdata/flux v0.0.0-20181009183631-f95c57ab4e42 + github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e @@ -83,13 +83,13 @@ require ( github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 // indirect go.uber.org/zap v1.9.1 golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4 - golang.org/x/net v0.0.0-20181005035420-146acd28ed58 + golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f - golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e + golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba golang.org/x/text v0.3.0 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 - golang.org/x/tools v0.0.0-20181009172131-6d96510a3a1c // indirect + golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9 // indirect google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c google.golang.org/grpc v1.15.0 gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5 diff --git a/go.sum b/go.sum index 8970990362..9786ade7a5 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/influxdata/flux v0.0.0-20181009183631-f95c57ab4e42 h1:nMueWs0Rwz6G38UBuDq3tbarsyaR3M2znXLSSOltsOY= -github.com/influxdata/flux v0.0.0-20181009183631-f95c57ab4e42/go.mod h1:BHspYxVDHrBApUfImcwa0ZOpRanbmO/ACD2iXbBD3Ic= +github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f h1:vvHzSLXoF9n5Zq28QZzHUOhv5viN8JodORYq+Xk4TBg= +github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f/go.mod h1:BHspYxVDHrBApUfImcwa0ZOpRanbmO/ACD2iXbBD3Ic= github.com/influxdata/goreleaser v0.86.2-0.20181010170531-0fd209ba67f5/go.mod h1:aVuBpDAT5VtjtUxzvBt8HOd0buzvvk7OX3H2iaviixg= github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec h1:TCUzgPkjJ/gcXXjxjssQoAYHeUxPHEdTz4XgKrNKw+I= github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= @@ -262,6 +262,8 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE= golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 h1:Y/KGZSOdz/2r0WJ9Mkmz6NJBusp0kiNx1Cn82lzJQ6w= +golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced h1:4oqSq7eft7MdPKBGQK11X9WYUxmj6ZLgGTqYIbY1kyw= golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -272,6 +274,8 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e h1:EfdBzeKbFSvOjoIqSZcfS8wp0FBLokGBEs9lz1OtSg0= golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba h1:nZJIJPGow0Kf9bU9QTc1U6OXbs/7Hu4e+cNv+hxH+Zc= +golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= @@ -279,8 +283,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181004163742-59602fdee893 h1:rsE8bdRd+SQZ1eQSuWpO3bw7AmfVa+vsnxkZ9tcPzAA= golang.org/x/tools v0.0.0-20181004163742-59602fdee893/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20181009172131-6d96510a3a1c h1:mivpEQ5q6dnP/41ZBOAA5SKkZ+4qrLiHkhKKCcXQrYg= -golang.org/x/tools v0.0.0-20181009172131-6d96510a3a1c/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9 h1:3sQpWsmaX/260ENaYpHTEljOMDVUlW9WHBGg9wGAXJk= +golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c h1:qSBE8MLMBtzNDa9QWZiS0qSIAYpU4BbVXbM70aNG55g= google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= diff --git a/query/dependency.go b/query/dependency.go index a7fd634a26..29e01e8953 100644 --- a/query/dependency.go +++ b/query/dependency.go @@ -32,6 +32,19 @@ func (b *BucketLookup) Lookup(orgID platform.ID, name string) (platform.ID, bool return bucket.ID, true } +func (b *BucketLookup) FindAllBuckets(orgID platform.ID) ([]*platform.Bucket, int) { + oid := platform.ID(orgID) + filter := platform.BucketFilter{ + OrganizationID: &oid, + } + buckets, count, err := b.BucketService.FindBuckets(context.Background(), filter) + if err != nil { + return nil, count + } + return buckets, count + +} + // FromOrganizationService wraps a platform.OrganizationService in the OrganizationLookup interface. func FromOrganizationService(srv platform.OrganizationService) *OrganizationLookup { return &OrganizationLookup{OrganizationService: srv} diff --git a/query/functions/inputs/buckets.go b/query/functions/inputs/buckets.go new file mode 100644 index 0000000000..cfe49f6807 --- /dev/null +++ b/query/functions/inputs/buckets.go @@ -0,0 +1,119 @@ +package inputs + +import ( + "fmt" + "github.com/influxdata/flux/values" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/functions/inputs" + "github.com/influxdata/flux/plan" + "github.com/influxdata/platform" + "github.com/influxdata/platform/query" + "github.com/pkg/errors" +) + +func init() { + execute.RegisterSource(inputs.BucketsKind, createBucketsSource) +} + +type BucketsDecoder struct { + orgID platform.ID + deps BucketDependencies + buckets []*platform.Bucket + alloc *execute.Allocator +} + +func (bd *BucketsDecoder) Connect() error { + return nil +} + +func (bd *BucketsDecoder) Fetch() (bool, error) { + b, count := bd.deps.FindAllBuckets(bd.orgID) + if count <= 0 { + return false, fmt.Errorf("no buckets found in organization %v", bd.orgID) + } + bd.buckets = b + return false, nil +} + +func (bd *BucketsDecoder) Decode() (flux.Table, error) { + kb := execute.NewGroupKeyBuilder(nil) + kb.AddKeyValue("organizationID", values.NewStringValue(bd.buckets[0].OrganizationID.String())) + gk, err := kb.Build() + if err != nil { + return nil, err + } + + b := execute.NewColListTableBuilder(gk, bd.alloc) + + b.AddCol(flux.ColMeta{ + Label: "name", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "id", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "organization", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "organizationID", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "retentionPolicy", + Type: flux.TString, + }) + b.AddCol(flux.ColMeta{ + Label: "retentionPeriod", + Type: flux.TInt, + }) + + for _, bucket := range bd.buckets { + b.AppendString(0, bucket.Name) + b.AppendString(1, bucket.ID.String()) + b.AppendString(2, bucket.Organization) + b.AppendString(3, bucket.OrganizationID.String()) + b.AppendString(4, bucket.RetentionPolicyName) + b.AppendInt(5, bucket.RetentionPeriod.Nanoseconds()) + } + + return b.Table() +} + +func createBucketsSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) { + _, ok := prSpec.(*inputs.BucketsProcedureSpec) + if !ok { + return nil, fmt.Errorf("invalid spec type %T", prSpec) + } + + // the dependencies used for FromKind are adequate for what we need here + // so there's no need to inject custom dependencies for buckets() + deps := a.Dependencies()[inputs.BucketsKind].(BucketDependencies) + req := query.RequestFromContext(a.Context()) + if req == nil { + return nil, errors.New("missing request on context") + } + orgID := req.OrganizationID + + bd := &BucketsDecoder{orgID: orgID, deps: deps, alloc: a.Allocator()} + + return inputs.CreateSourceFromDecoder(bd, dsid, a) + +} + +type AllBucketLookup interface { + FindAllBuckets(orgID platform.ID) ([]*platform.Bucket, int) +} +type BucketDependencies AllBucketLookup + +func InjectBucketDependencies(depsMap execute.Dependencies, deps BucketDependencies) error { + if deps == nil { + return errors.New("missing all bucket lookup dependency") + } + depsMap[inputs.BucketsKind] = deps + return nil +} diff --git a/storage/readservice/service.go b/storage/readservice/service.go index abfa904905..d3ebca8c9c 100644 --- a/storage/readservice/service.go +++ b/storage/readservice/service.go @@ -29,15 +29,18 @@ func NewProxyQueryService(engine *storage.Engine, bucketSvc platform.BucketServi Verbose: false, } + lookupSvc := query.FromBucketService(bucketSvc) err := inputs.InjectFromDependencies(cc.ExecutorDependencies, fstorage.Dependencies{ Reader: reads.NewReader(newStore(engine)), - BucketLookup: query.FromBucketService(bucketSvc), + BucketLookup: lookupSvc, OrganizationLookup: query.FromOrganizationService(orgSvc), }) if err != nil { return nil, err } + err = inputs.InjectBucketDependencies(cc.ExecutorDependencies, lookupSvc) + return query.ProxyQueryServiceBridge{ QueryService: query.QueryServiceBridge{ AsyncQueryService: &queryAdapter{