fix(query): use auth-wrapped org and bucket services for query

pull/14400/head
Christopher Wolff 2019-07-19 16:04:50 -07:00 committed by Nathaniel Cook
parent e0325296d3
commit 63c0f40548
11 changed files with 70 additions and 29 deletions

View File

@ -511,8 +511,10 @@ func (m *Launcher) run(ctx context.Context) (err error) {
Logger: m.logger.With(zap.String("service", "storage-reads")),
}
authBucketSvc := authorizer.NewBucketService(bucketSvc)
authOrgSvc := authorizer.NewOrgService(orgSvc)
if err := readservice.AddControllerConfigDependencies(
&cc, m.engine, bucketSvc, orgSvc,
&cc, m.engine, authBucketSvc, authOrgSvc,
); err != nil {
m.logger.Error("Failed to configure query controller dependencies", zap.Error(err))
return err

View File

@ -18,7 +18,9 @@ import (
"github.com/influxdata/flux/lang"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/bolt"
influxdbcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/query"
)
@ -188,7 +190,8 @@ func (tl *TestLauncher) MustExecuteQuery(query string) *QueryResults {
// ExecuteQuery executes the provided query against the ith query node.
// Callers of ExecuteQuery must call Done on the returned QueryResults.
func (tl *TestLauncher) ExecuteQuery(q string) (*QueryResults, error) {
fq, err := tl.QueryController().Query(context.Background(), &query.Request{
ctx := influxdbcontext.SetAuthorizer(context.Background(), &mock.Authorization{})
fq, err := tl.QueryController().Query(ctx, &query.Request{
Authorization: tl.Auth,
OrganizationID: tl.Auth.OrgID,
Compiler: lang.FluxCompiler{

2
go.mod
View File

@ -38,7 +38,7 @@ require (
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect
github.com/hashicorp/raft v1.0.0 // indirect
github.com/hashicorp/vault/api v1.0.2
github.com/influxdata/flux v0.36.2
github.com/influxdata/flux v0.36.3-0.20190719233022-0464a6216e79
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jessevdk/go-flags v1.4.0

4
go.sum
View File

@ -195,8 +195,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.36.2 h1:gCM+EcY7v+tE+HvSt5S7xUtG4tyJIkwrSuGJmaHfLOI=
github.com/influxdata/flux v0.36.2/go.mod h1:Pbi7l/bTezrTajfFwkx/wYp69A2QCS23AetZQU/8aVQ=
github.com/influxdata/flux v0.36.3-0.20190719233022-0464a6216e79 h1:fa3RvsBeVSWxVyJDlHLPP2/lyirqVaRmup1h8L3UAr0=
github.com/influxdata/flux v0.36.3-0.20190719233022-0464a6216e79/go.mod h1:Pbi7l/bTezrTajfFwkx/wYp69A2QCS23AetZQU/8aVQ=
github.com/influxdata/goreleaser v0.97.0-influx h1:jT5OrcW7WfS0e2QxfwmTBjhLvpIC9CDLRhNgZJyhj8s=
github.com/influxdata/goreleaser v0.97.0-influx/go.mod h1:MnjA0e0Uq6ISqjG1WxxMAl+3VS1QYjILSWVnMYDxasE=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=

33
mock/authorization.go Normal file
View File

@ -0,0 +1,33 @@
package mock
import (
platform "github.com/influxdata/influxdb"
)
// Authorization is an Authorizer that always allows everything
type Authorization struct {
}
func (Authorization) Allowed(p platform.Permission) bool {
return true
}
func (Authorization) Identifier() platform.ID {
return mustID("beefdeaddeadbeef")
}
func (Authorization) GetUserID() platform.ID {
return mustID("deadbeefbeefdead")
}
func (Authorization) Kind() string {
return "mock-authorizer"
}
func mustID(str string) platform.ID {
id, err := platform.IDFromString(str)
if err != nil {
panic(err)
}
return *id
}

View File

@ -2,7 +2,6 @@ package query
import (
"context"
platform "github.com/influxdata/influxdb"
)
@ -32,12 +31,12 @@ func (b *BucketLookup) Lookup(ctx context.Context, orgID platform.ID, name strin
return bucket.ID, true
}
func (b *BucketLookup) FindAllBuckets(orgID platform.ID) ([]*platform.Bucket, int) {
func (b *BucketLookup) FindAllBuckets(ctx context.Context, orgID platform.ID) ([]*platform.Bucket, int) {
oid := platform.ID(orgID)
filter := platform.BucketFilter{
OrganizationID: &oid,
}
buckets, count, err := b.BucketService.FindBuckets(context.Background(), filter)
buckets, count, err := b.BucketService.FindBuckets(ctx, filter)
if err != nil {
return nil, count
}

View File

@ -1,6 +1,7 @@
package influxdb
import (
"context"
"errors"
"fmt"
@ -26,12 +27,12 @@ type BucketsDecoder struct {
alloc *memory.Allocator
}
func (bd *BucketsDecoder) Connect() error {
func (bd *BucketsDecoder) Connect(ctx context.Context) error {
return nil
}
func (bd *BucketsDecoder) Fetch() (bool, error) {
b, count := bd.deps.FindAllBuckets(bd.orgID)
func (bd *BucketsDecoder) Fetch(ctx context.Context) (bool, error) {
b, count := bd.deps.FindAllBuckets(ctx, bd.orgID)
if count <= 0 {
return false, &flux.Error{
Code: codes.NotFound,
@ -42,7 +43,7 @@ func (bd *BucketsDecoder) Fetch() (bool, error) {
return false, nil
}
func (bd *BucketsDecoder) Decode() (flux.Table, error) {
func (bd *BucketsDecoder) Decode(ctx context.Context) (flux.Table, error) {
kb := execute.NewGroupKeyBuilder(nil)
kb.AddKeyValue("organizationID", values.NewString(bd.buckets[0].OrgID.String()))
gk, err := kb.Build()
@ -125,7 +126,7 @@ func createBucketsSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
}
type AllBucketLookup interface {
FindAllBuckets(orgID platform.ID) ([]*platform.Bucket, int)
FindAllBuckets(ctx context.Context, orgID platform.ID) ([]*platform.Bucket, int)
}
type BucketDependencies AllBucketLookup

View File

@ -258,7 +258,7 @@ func createToTransformation(id execute.DatasetID, mode execute.AccumulationMode,
d := execute.NewDataset(id, mode, cache)
deps := a.Dependencies()[ToKind].(ToDependencies)
t, err := NewToTransformation(d, cache, s, deps)
t, err := NewToTransformation(a.Context(), d, cache, s, deps)
if err != nil {
return nil, nil, err
}
@ -267,6 +267,7 @@ func createToTransformation(id execute.DatasetID, mode execute.AccumulationMode,
// ToTransformation is the transformation for the `to` flux function.
type ToTransformation struct {
ctx context.Context
d execute.Dataset
fn *execute.RowMapFn
cache execute.TableBuilderCache
@ -281,7 +282,7 @@ func (t *ToTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey)
}
// NewToTransformation returns a new *ToTransformation with the appropriate fields set.
func NewToTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *ToProcedureSpec, deps ToDependencies) (*ToTransformation, error) {
func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.TableBuilderCache, spec *ToProcedureSpec, deps ToDependencies) (*ToTransformation, error) {
var fn *execute.RowMapFn
var err error
@ -292,6 +293,7 @@ func NewToTransformation(d execute.Dataset, cache execute.TableBuilderCache, spe
}
return &ToTransformation{
ctx: ctx,
d: d,
fn: fn,
cache: cache,
@ -327,7 +329,7 @@ func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
addTagsFromTable(t.spec.Spec, tbl, excludeColumns)
}
return writeTable(t, tbl)
return writeTable(t.ctx, t, tbl)
}
// fieldFunctionVisitor implements semantic.Visitor.
@ -461,8 +463,8 @@ func (s Stats) Update(o Stats) {
}
}
func writeTable(t *ToTransformation, tbl flux.Table) error {
span, ctx := tracing.StartSpanFromContext(context.TODO())
func writeTable(ctx context.Context, t *ToTransformation, tbl flux.Table) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var bucketID, orgID *platform.ID
@ -473,7 +475,7 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
// Get organization ID
if spec.Org != "" {
oID, ok := d.OrganizationLookup.Lookup(context.TODO(), spec.Org)
oID, ok := d.OrganizationLookup.Lookup(ctx, spec.Org)
if !ok {
return &flux.Error{
Code: codes.NotFound,
@ -652,7 +654,7 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
return err
}
}
return d.PointsWriter.WritePoints(context.TODO(), points)
return d.PointsWriter.WritePoints(ctx, points)
})
}

View File

@ -718,7 +718,7 @@ c _hello=4 41`),
tc.want.tables,
nil,
func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
newT, _ := influxdb.NewToTransformation(d, c, tc.spec, deps)
newT, _ := influxdb.NewToTransformation(context.Background(), d, c, tc.spec, deps)
return newT
},
)

View File

@ -70,15 +70,14 @@ type DatabasesDecoder struct {
deps *DatabasesDependencies
databases []*platform.DBRPMapping
alloc *memory.Allocator
ctx context.Context
}
func (bd *DatabasesDecoder) Connect() error {
func (bd *DatabasesDecoder) Connect(ctx context.Context) error {
return nil
}
func (bd *DatabasesDecoder) Fetch() (bool, error) {
b, _, err := bd.deps.DBRP.FindMany(bd.ctx, platform.DBRPMappingFilter{})
func (bd *DatabasesDecoder) Fetch(ctx context.Context) (bool, error) {
b, _, err := bd.deps.DBRP.FindMany(ctx, platform.DBRPMappingFilter{})
if err != nil {
return false, err
}
@ -86,7 +85,7 @@ func (bd *DatabasesDecoder) Fetch() (bool, error) {
return false, nil
}
func (bd *DatabasesDecoder) Decode() (flux.Table, error) {
func (bd *DatabasesDecoder) Decode(ctx context.Context) (flux.Table, error) {
kb := execute.NewGroupKeyBuilder(nil)
if len(bd.databases) == 0 {
return nil, errors.New("no 1.x databases found")
@ -137,7 +136,7 @@ func (bd *DatabasesDecoder) Decode() (flux.Table, error) {
}
for _, db := range bd.databases {
if bucket, err := bd.deps.BucketLookup.FindBucketByID(bd.ctx, db.BucketID); err != nil {
if bucket, err := bd.deps.BucketLookup.FindBucketByID(ctx, db.BucketID); err != nil {
return nil, err
} else {
_ = b.AppendString(0, db.OrganizationID.String())
@ -171,7 +170,7 @@ func createDatabasesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a
}
orgID := req.OrganizationID
bd := &DatabasesDecoder{orgID: orgID, deps: &deps, alloc: a.Allocator(), ctx: a.Context()}
bd := &DatabasesDecoder{orgID: orgID, deps: &deps, alloc: a.Allocator()}
return execute.CreateSourceFromDecoder(bd, dsid, a)
}

View File

@ -16,6 +16,8 @@ import (
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
influxdbcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/query"
itesting "github.com/influxdata/influxdb/query/stdlib/testing"
@ -24,7 +26,7 @@ import (
)
// Default context.
var ctx = context.Background()
var ctx = influxdbcontext.SetAuthorizer(context.Background(), &mock.Authorization{})
func init() {
flux.FinalizeBuiltIns()