diff --git a/query/stdlib/influxdata/influxdb/provider.go b/query/stdlib/influxdata/influxdb/provider.go index e91c946319..a3c18f70ed 100644 --- a/query/stdlib/influxdata/influxdb/provider.go +++ b/query/stdlib/influxdata/influxdb/provider.go @@ -12,6 +12,8 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/values" + influxdb2 "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/authorizer" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/influxdb/v2/models" @@ -120,6 +122,15 @@ func (p Provider) WriterFor(ctx context.Context, conf influxdb.Config) (influxdb return nil, err } + // err will be set if we are not authorized, we don't care about the other return values. + _, _, err = authorizer.AuthorizeWrite(ctx, influxdb2.BucketsResourceType, bucketID, reqOrgID) + if err != nil { + return nil, &errors.Error{ + Code: errors.EForbidden, + Msg: "user not authorized to write", + } + } + return &localPointsWriter{ ctx: ctx, buf: make([]models.Point, 1<<14), diff --git a/query/stdlib/influxdata/influxdb/provider_test.go b/query/stdlib/influxdata/influxdb/provider_test.go index a6e8d9b99b..cff4070235 100644 --- a/query/stdlib/influxdata/influxdb/provider_test.go +++ b/query/stdlib/influxdata/influxdb/provider_test.go @@ -10,6 +10,8 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/table" "github.com/influxdata/flux/execute/table/static" + influxdb2 "github.com/influxdata/influxdb/v2" + context2 "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/influxdb/v2/mock" @@ -220,3 +222,84 @@ func TestProvider_SeriesCardinalityReader_MissingRequestContext(t *testing.T) { require.Equal(t, wantErr, gotErr) } + +func TestWriterFor(t *testing.T) { + t.Parallel() + + auth := influxdb2.Authorization{ + Status: influxdb2.Active, + Permissions: []influxdb2.Permission{ + { + Action: influxdb2.WriteAction, + Resource: influxdb2.Resource{ + Type: influxdb2.BucketsResourceType, + }, + }, + }, + } + + provider := influxdb.Provider{ + Reader: storageflux.NewReader(&mock.ReadsStore{}), + BucketLookup: mock.BucketLookup{}, + } + + conf := influxdb.Config{ + Bucket: influxdb.NameOrID{ + Name: "my-bucket", + }, + } + + ctx := context.Background() + req := query.Request{ + OrganizationID: platform.ID(2), + } + ctx = query.ContextWithRequest(ctx, &req) + ctx = context2.SetAuthorizer(ctx, &auth) + + _, gotErr := provider.WriterFor(ctx, conf) + + require.Nil(t, gotErr) +} + +func TestWriterFor_Error(t *testing.T) { + t.Parallel() + + auth := influxdb2.Authorization{ + Status: influxdb2.Active, + Permissions: []influxdb2.Permission{ + { + Action: influxdb2.ReadAction, + Resource: influxdb2.Resource{ + Type: influxdb2.BucketsResourceType, + }, + }, + }, + } + + provider := influxdb.Provider{ + Reader: storageflux.NewReader(&mock.ReadsStore{}), + BucketLookup: mock.BucketLookup{}, + } + + conf := influxdb.Config{ + Bucket: influxdb.NameOrID{ + Name: "my-bucket", + }, + } + + ctx := context.Background() + req := query.Request{ + OrganizationID: platform.ID(2), + } + ctx = query.ContextWithRequest(ctx, &req) + ctx = context2.SetAuthorizer(ctx, &auth) + + _, gotErr := provider.WriterFor(ctx, conf) + + wantErr := &errors.Error{ + Code: errors.EForbidden, + Msg: "user not authorized to write", + } + + require.Equal(t, wantErr, gotErr) +}