(feat/query): implement a buckets() source function that retrieves all buckets and retention policy information for the active organization. (#1045)
parent
1f98dc2d77
commit
ea4c08b327
8
go.mod
8
go.mod
|
@ -39,7 +39,7 @@ require (
|
|||
github.com/hashicorp/raft v1.0.0 // indirect
|
||||
github.com/imdario/mergo v0.3.6 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/influxdata/flux v0.0.0-20181009183631-f95c57ab4e42
|
||||
github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f
|
||||
github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
|
||||
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e
|
||||
|
@ -83,13 +83,13 @@ require (
|
|||
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 // indirect
|
||||
go.uber.org/zap v1.9.1
|
||||
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4
|
||||
golang.org/x/net v0.0.0-20181005035420-146acd28ed58
|
||||
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1
|
||||
golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
|
||||
golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e
|
||||
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba
|
||||
golang.org/x/text v0.3.0
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
|
||||
golang.org/x/tools v0.0.0-20181009172131-6d96510a3a1c // indirect
|
||||
golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9 // indirect
|
||||
google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c
|
||||
google.golang.org/grpc v1.15.0
|
||||
gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5
|
||||
|
|
12
go.sum
12
go.sum
|
@ -117,8 +117,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
|
|||
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
|
||||
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/influxdata/flux v0.0.0-20181009183631-f95c57ab4e42 h1:nMueWs0Rwz6G38UBuDq3tbarsyaR3M2znXLSSOltsOY=
|
||||
github.com/influxdata/flux v0.0.0-20181009183631-f95c57ab4e42/go.mod h1:BHspYxVDHrBApUfImcwa0ZOpRanbmO/ACD2iXbBD3Ic=
|
||||
github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f h1:vvHzSLXoF9n5Zq28QZzHUOhv5viN8JodORYq+Xk4TBg=
|
||||
github.com/influxdata/flux v0.0.0-20181012184356-59f53657bd7f/go.mod h1:BHspYxVDHrBApUfImcwa0ZOpRanbmO/ACD2iXbBD3Ic=
|
||||
github.com/influxdata/goreleaser v0.86.2-0.20181010170531-0fd209ba67f5/go.mod h1:aVuBpDAT5VtjtUxzvBt8HOd0buzvvk7OX3H2iaviixg=
|
||||
github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec h1:TCUzgPkjJ/gcXXjxjssQoAYHeUxPHEdTz4XgKrNKw+I=
|
||||
github.com/influxdata/influxdb v0.0.0-20181009160823-86ac358448ec/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
|
||||
|
@ -262,6 +262,8 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r
|
|||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE=
|
||||
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 h1:Y/KGZSOdz/2r0WJ9Mkmz6NJBusp0kiNx1Cn82lzJQ6w=
|
||||
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced h1:4oqSq7eft7MdPKBGQK11X9WYUxmj6ZLgGTqYIbY1kyw=
|
||||
golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
|
@ -272,6 +274,8 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
|
|||
golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e h1:EfdBzeKbFSvOjoIqSZcfS8wp0FBLokGBEs9lz1OtSg0=
|
||||
golang.org/x/sys v0.0.0-20181005133103-4497e2df6f9e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba h1:nZJIJPGow0Kf9bU9QTc1U6OXbs/7Hu4e+cNv+hxH+Zc=
|
||||
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
|
||||
|
@ -279,8 +283,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
|
|||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20181004163742-59602fdee893 h1:rsE8bdRd+SQZ1eQSuWpO3bw7AmfVa+vsnxkZ9tcPzAA=
|
||||
golang.org/x/tools v0.0.0-20181004163742-59602fdee893/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20181009172131-6d96510a3a1c h1:mivpEQ5q6dnP/41ZBOAA5SKkZ+4qrLiHkhKKCcXQrYg=
|
||||
golang.org/x/tools v0.0.0-20181009172131-6d96510a3a1c/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9 h1:3sQpWsmaX/260ENaYpHTEljOMDVUlW9WHBGg9wGAXJk=
|
||||
golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c h1:qSBE8MLMBtzNDa9QWZiS0qSIAYpU4BbVXbM70aNG55g=
|
||||
google.golang.org/api v0.0.0-20181003000758-f5c49d98d21c/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
|
|
|
@ -32,6 +32,19 @@ func (b *BucketLookup) Lookup(orgID platform.ID, name string) (platform.ID, bool
|
|||
return bucket.ID, true
|
||||
}
|
||||
|
||||
func (b *BucketLookup) FindAllBuckets(orgID platform.ID) ([]*platform.Bucket, int) {
|
||||
oid := platform.ID(orgID)
|
||||
filter := platform.BucketFilter{
|
||||
OrganizationID: &oid,
|
||||
}
|
||||
buckets, count, err := b.BucketService.FindBuckets(context.Background(), filter)
|
||||
if err != nil {
|
||||
return nil, count
|
||||
}
|
||||
return buckets, count
|
||||
|
||||
}
|
||||
|
||||
// FromOrganizationService wraps a platform.OrganizationService in the OrganizationLookup interface.
|
||||
func FromOrganizationService(srv platform.OrganizationService) *OrganizationLookup {
|
||||
return &OrganizationLookup{OrganizationService: srv}
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
package inputs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/influxdata/flux/values"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/functions/inputs"
|
||||
"github.com/influxdata/flux/plan"
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
execute.RegisterSource(inputs.BucketsKind, createBucketsSource)
|
||||
}
|
||||
|
||||
type BucketsDecoder struct {
|
||||
orgID platform.ID
|
||||
deps BucketDependencies
|
||||
buckets []*platform.Bucket
|
||||
alloc *execute.Allocator
|
||||
}
|
||||
|
||||
func (bd *BucketsDecoder) Connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bd *BucketsDecoder) Fetch() (bool, error) {
|
||||
b, count := bd.deps.FindAllBuckets(bd.orgID)
|
||||
if count <= 0 {
|
||||
return false, fmt.Errorf("no buckets found in organization %v", bd.orgID)
|
||||
}
|
||||
bd.buckets = b
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (bd *BucketsDecoder) Decode() (flux.Table, error) {
|
||||
kb := execute.NewGroupKeyBuilder(nil)
|
||||
kb.AddKeyValue("organizationID", values.NewStringValue(bd.buckets[0].OrganizationID.String()))
|
||||
gk, err := kb.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b := execute.NewColListTableBuilder(gk, bd.alloc)
|
||||
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "name",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "id",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "organization",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "organizationID",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "retentionPolicy",
|
||||
Type: flux.TString,
|
||||
})
|
||||
b.AddCol(flux.ColMeta{
|
||||
Label: "retentionPeriod",
|
||||
Type: flux.TInt,
|
||||
})
|
||||
|
||||
for _, bucket := range bd.buckets {
|
||||
b.AppendString(0, bucket.Name)
|
||||
b.AppendString(1, bucket.ID.String())
|
||||
b.AppendString(2, bucket.Organization)
|
||||
b.AppendString(3, bucket.OrganizationID.String())
|
||||
b.AppendString(4, bucket.RetentionPolicyName)
|
||||
b.AppendInt(5, bucket.RetentionPeriod.Nanoseconds())
|
||||
}
|
||||
|
||||
return b.Table()
|
||||
}
|
||||
|
||||
func createBucketsSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
|
||||
_, ok := prSpec.(*inputs.BucketsProcedureSpec)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid spec type %T", prSpec)
|
||||
}
|
||||
|
||||
// the dependencies used for FromKind are adequate for what we need here
|
||||
// so there's no need to inject custom dependencies for buckets()
|
||||
deps := a.Dependencies()[inputs.BucketsKind].(BucketDependencies)
|
||||
req := query.RequestFromContext(a.Context())
|
||||
if req == nil {
|
||||
return nil, errors.New("missing request on context")
|
||||
}
|
||||
orgID := req.OrganizationID
|
||||
|
||||
bd := &BucketsDecoder{orgID: orgID, deps: deps, alloc: a.Allocator()}
|
||||
|
||||
return inputs.CreateSourceFromDecoder(bd, dsid, a)
|
||||
|
||||
}
|
||||
|
||||
type AllBucketLookup interface {
|
||||
FindAllBuckets(orgID platform.ID) ([]*platform.Bucket, int)
|
||||
}
|
||||
type BucketDependencies AllBucketLookup
|
||||
|
||||
func InjectBucketDependencies(depsMap execute.Dependencies, deps BucketDependencies) error {
|
||||
if deps == nil {
|
||||
return errors.New("missing all bucket lookup dependency")
|
||||
}
|
||||
depsMap[inputs.BucketsKind] = deps
|
||||
return nil
|
||||
}
|
|
@ -29,15 +29,18 @@ func NewProxyQueryService(engine *storage.Engine, bucketSvc platform.BucketServi
|
|||
Verbose: false,
|
||||
}
|
||||
|
||||
lookupSvc := query.FromBucketService(bucketSvc)
|
||||
err := inputs.InjectFromDependencies(cc.ExecutorDependencies, fstorage.Dependencies{
|
||||
Reader: reads.NewReader(newStore(engine)),
|
||||
BucketLookup: query.FromBucketService(bucketSvc),
|
||||
BucketLookup: lookupSvc,
|
||||
OrganizationLookup: query.FromOrganizationService(orgSvc),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = inputs.InjectBucketDependencies(cc.ExecutorDependencies, lookupSvc)
|
||||
|
||||
return query.ProxyQueryServiceBridge{
|
||||
QueryService: query.QueryServiceBridge{
|
||||
AsyncQueryService: &queryAdapter{
|
||||
|
|
Loading…
Reference in New Issue