diff --git a/testing/util.go b/testing/util.go index 9534a37920..11ebcd95c7 100644 --- a/testing/util.go +++ b/testing/util.go @@ -120,6 +120,14 @@ func MustCreateUsers(ctx context.Context, svc influxdb.UserService, us ...*influ } } +func MustNewPermission(a influxdb.Action, rt influxdb.ResourceType, orgID platform.ID) *influxdb.Permission { + perm, err := influxdb.NewPermission(a, rt, orgID) + if err != nil { + panic(err) + } + return perm +} + func MustNewPermissionAtID(id platform.ID, a influxdb.Action, rt influxdb.ResourceType, orgID platform.ID) *influxdb.Permission { perm, err := influxdb.NewPermissionAtID(id, a, rt, orgID) if err != nil { diff --git a/v1/coordinator/statement_executor.go b/v1/coordinator/statement_executor.go index 496f016c1b..01f43c5b47 100644 --- a/v1/coordinator/statement_executor.go +++ b/v1/coordinator/statement_executor.go @@ -372,6 +372,14 @@ func (e *StatementExecutor) executeDeleteSeriesStatement(ctx context.Context, q return err } + // Require write for DELETE queries + _, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID) + if err != nil { + return ectx.Send(ctx, &query.Result{ + Err: fmt.Errorf("insufficient permissions"), + }) + } + // Convert "now()" to current time. q.Condition = influxql.Reduce(q.Condition, &influxql.NowValuer{Now: time.Now().UTC()}) @@ -383,6 +391,15 @@ func (e *StatementExecutor) executeDropMeasurementStatement(ctx context.Context, if err != nil { return err } + + // Require write for DROP MEASUREMENT queries + _, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID) + if err != nil { + return ectx.Send(ctx, &query.Result{ + Err: fmt.Errorf("insufficient permissions"), + }) + } + return e.TSDBStore.DeleteMeasurement(ctx, mapping.BucketID.String(), q.Name) } diff --git a/v1/coordinator/statement_executor_test.go b/v1/coordinator/statement_executor_test.go index 1131f3684f..4bb2a9d113 100644 --- a/v1/coordinator/statement_executor_test.go +++ b/v1/coordinator/statement_executor_test.go @@ -386,6 +386,164 @@ func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) { } } +func testExecDeleteSeriesOrDropMeasurement(t *testing.T, qType string) { + orgID := platform.ID(0xff00) + otherOrgID := platform.ID(0xff01) + bucketID := platform.ID(0xffee) + otherBucketID := platform.ID(0xffef) + + qStr := qType + if qStr == "DELETE" { + qStr = "DELETE FROM" + } + qErr := errors.New("insufficient permissions") + + testCases := []struct { + name string + query string + permissions []influxdb.Permission + expectedErr error + }{ + // expected FAIL + { + name: fmt.Sprintf("read-only bucket (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermissionAtID(bucketID, influxdb.ReadAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: qErr, + }, + { + name: fmt.Sprintf("read-only all buckets (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: qErr, + }, + { + name: fmt.Sprintf("write-only other bucket (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermissionAtID(otherBucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: qErr, + }, + { + name: fmt.Sprintf("write-only other org (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, otherOrgID), + }, + expectedErr: qErr, + }, + { + name: fmt.Sprintf("read-write other org (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, otherOrgID), + *itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, otherOrgID), + }, + expectedErr: qErr, + }, + // expected PASS + { + name: fmt.Sprintf("write-only bucket (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: nil, + }, + { + name: fmt.Sprintf("write-only all buckets (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: nil, + }, + { + name: fmt.Sprintf("read-write bucket (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermissionAtID(bucketID, influxdb.ReadAction, influxdb.BucketsResourceType, orgID), + *itesting.MustNewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: nil, + }, + { + name: fmt.Sprintf("read-write all buckets (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, orgID), + *itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: nil, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // setup a DBRP that we can use + dbrp := mocks.NewMockDBRPMappingService(ctrl) + db := "db0" + + empty := "" + isDefault := true + filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: nil, Default: &isDefault} + res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: empty, OrganizationID: orgID, BucketID: bucketID, Default: isDefault}} + dbrp.EXPECT(). + FindMany(gomock.Any(), filt). + Return(res, 1, nil) + + qe := DefaultQueryExecutor(t, WithDBRP(dbrp)) + + // assume storage succeeds if we get that far + qe.TSDBStore.DeleteSeriesFn = func(context.Context, string, []influxql.Source, influxql.Expr) error { + return nil + } + qe.TSDBStore.DeleteMeasurementFn = func(context.Context, string, string) error { + return nil + } + + ctx := context.Background() + ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{ + ID: orgID, + OrgID: orgID, + Status: influxdb.Active, + Permissions: testCase.permissions, + }) + + results := ReadAllResults(qe.ExecuteQuery(ctx, fmt.Sprintf("%s cpu", testCase.query), "db0", 0, orgID)) + + var exp []*query.Result + if testCase.expectedErr != nil { + exp = []*query.Result{ + { + StatementID: 0, + Err: testCase.expectedErr, + }, + } + } + if !reflect.DeepEqual(results, exp) { + t.Fatalf("unexpected results: exp %s, got %s", spew.Sdump(exp), spew.Sdump(results)) + } + }) + } +} + +func TestQueryExecutor_ExecuteQuery_DeleteSeries(t *testing.T) { + testExecDeleteSeriesOrDropMeasurement(t, "DELETE") +} + +func TestQueryExecutor_ExecuteQuery_DropMeasurement(t *testing.T) { + testExecDeleteSeriesOrDropMeasurement(t, "DROP MEASUREMENT") +} + // QueryExecutor is a test wrapper for coordinator.QueryExecutor. type QueryExecutor struct { *query.Executor