From e304ef9764ecb9070f29edd2bfbb22cb0c228137 Mon Sep 17 00:00:00 2001 From: Jamie Strandboge Date: Thu, 24 Mar 2022 07:28:38 -0500 Subject: [PATCH] fix: add write permissions check for DELETE and DROP MEASUREMENT (#23219) We previously allowed read tokens access to all of v1 query, including InfluxQL queries that made state changes to the DB, specifically, 'DELETE' and 'DROP MEASUREMENT'. This allowed tokens with only read permissions to delete points via the legacy /query endpoint. /api/v2/query was unaffected. This adjusts the behavior to verify that the token has write permissions when specifying 'DELETE' and 'DROP MEASUREMENT' InfluxQL queries. We follow the same pattern as other existing v1 failure scenarios and instead of failing hard with 401, we use ectx.Send() to send an error to the user (with 200 status): {"results":[{"statement_id":0,"error":"insufficient permissions"}]} Returning in this manner is consistent with Cloud 2, which also returns 200 with "insufficient permissions" for these two InfluxQL queries. To facilitate authorization unit tests, we add MustNewPermission() to testing/util.go. Closes: #22799 --- testing/util.go | 8 ++ v1/coordinator/statement_executor.go | 17 +++ v1/coordinator/statement_executor_test.go | 158 ++++++++++++++++++++++ 3 files changed, 183 insertions(+) diff --git a/testing/util.go b/testing/util.go index 9534a37920..11ebcd95c7 100644 --- a/testing/util.go +++ b/testing/util.go @@ -120,6 +120,14 @@ func MustCreateUsers(ctx context.Context, svc influxdb.UserService, us ...*influ } } +func MustNewPermission(a influxdb.Action, rt influxdb.ResourceType, orgID platform.ID) *influxdb.Permission { + perm, err := influxdb.NewPermission(a, rt, orgID) + if err != nil { + panic(err) + } + return perm +} + func MustNewPermissionAtID(id platform.ID, a influxdb.Action, rt influxdb.ResourceType, orgID platform.ID) *influxdb.Permission { perm, err := influxdb.NewPermissionAtID(id, a, rt, orgID) if err != nil { diff --git a/v1/coordinator/statement_executor.go b/v1/coordinator/statement_executor.go index 496f016c1b..01f43c5b47 100644 --- a/v1/coordinator/statement_executor.go +++ b/v1/coordinator/statement_executor.go @@ -372,6 +372,14 @@ func (e *StatementExecutor) executeDeleteSeriesStatement(ctx context.Context, q return err } + // Require write for DELETE queries + _, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID) + if err != nil { + return ectx.Send(ctx, &query.Result{ + Err: fmt.Errorf("insufficient permissions"), + }) + } + // Convert "now()" to current time. q.Condition = influxql.Reduce(q.Condition, &influxql.NowValuer{Now: time.Now().UTC()}) @@ -383,6 +391,15 @@ func (e *StatementExecutor) executeDropMeasurementStatement(ctx context.Context, if err != nil { return err } + + // Require write for DROP MEASUREMENT queries + _, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID) + if err != nil { + return ectx.Send(ctx, &query.Result{ + Err: fmt.Errorf("insufficient permissions"), + }) + } + return e.TSDBStore.DeleteMeasurement(ctx, mapping.BucketID.String(), q.Name) } diff --git a/v1/coordinator/statement_executor_test.go b/v1/coordinator/statement_executor_test.go index 1131f3684f..4bb2a9d113 100644 --- a/v1/coordinator/statement_executor_test.go +++ b/v1/coordinator/statement_executor_test.go @@ -386,6 +386,164 @@ func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) { } } +func testExecDeleteSeriesOrDropMeasurement(t *testing.T, qType string) { + orgID := platform.ID(0xff00) + otherOrgID := platform.ID(0xff01) + bucketID := platform.ID(0xffee) + otherBucketID := platform.ID(0xffef) + + qStr := qType + if qStr == "DELETE" { + qStr = "DELETE FROM" + } + qErr := errors.New("insufficient permissions") + + testCases := []struct { + name string + query string + permissions []influxdb.Permission + expectedErr error + }{ + // expected FAIL + { + name: fmt.Sprintf("read-only bucket (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermissionAtID(bucketID, influxdb.ReadAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: qErr, + }, + { + name: fmt.Sprintf("read-only all buckets (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: qErr, + }, + { + name: fmt.Sprintf("write-only other bucket (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermissionAtID(otherBucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: qErr, + }, + { + name: fmt.Sprintf("write-only other org (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, otherOrgID), + }, + expectedErr: qErr, + }, + { + name: fmt.Sprintf("read-write other org (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, otherOrgID), + *itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, otherOrgID), + }, + expectedErr: qErr, + }, + // expected PASS + { + name: fmt.Sprintf("write-only bucket (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: nil, + }, + { + name: fmt.Sprintf("write-only all buckets (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: nil, + }, + { + name: fmt.Sprintf("read-write bucket (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermissionAtID(bucketID, influxdb.ReadAction, influxdb.BucketsResourceType, orgID), + *itesting.MustNewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: nil, + }, + { + name: fmt.Sprintf("read-write all buckets (%s)", qType), + query: qStr, + permissions: []influxdb.Permission{ + *itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, orgID), + *itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, orgID), + }, + expectedErr: nil, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // setup a DBRP that we can use + dbrp := mocks.NewMockDBRPMappingService(ctrl) + db := "db0" + + empty := "" + isDefault := true + filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: nil, Default: &isDefault} + res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: empty, OrganizationID: orgID, BucketID: bucketID, Default: isDefault}} + dbrp.EXPECT(). + FindMany(gomock.Any(), filt). + Return(res, 1, nil) + + qe := DefaultQueryExecutor(t, WithDBRP(dbrp)) + + // assume storage succeeds if we get that far + qe.TSDBStore.DeleteSeriesFn = func(context.Context, string, []influxql.Source, influxql.Expr) error { + return nil + } + qe.TSDBStore.DeleteMeasurementFn = func(context.Context, string, string) error { + return nil + } + + ctx := context.Background() + ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{ + ID: orgID, + OrgID: orgID, + Status: influxdb.Active, + Permissions: testCase.permissions, + }) + + results := ReadAllResults(qe.ExecuteQuery(ctx, fmt.Sprintf("%s cpu", testCase.query), "db0", 0, orgID)) + + var exp []*query.Result + if testCase.expectedErr != nil { + exp = []*query.Result{ + { + StatementID: 0, + Err: testCase.expectedErr, + }, + } + } + if !reflect.DeepEqual(results, exp) { + t.Fatalf("unexpected results: exp %s, got %s", spew.Sdump(exp), spew.Sdump(results)) + } + }) + } +} + +func TestQueryExecutor_ExecuteQuery_DeleteSeries(t *testing.T) { + testExecDeleteSeriesOrDropMeasurement(t, "DELETE") +} + +func TestQueryExecutor_ExecuteQuery_DropMeasurement(t *testing.T) { + testExecDeleteSeriesOrDropMeasurement(t, "DROP MEASUREMENT") +} + // QueryExecutor is a test wrapper for coordinator.QueryExecutor. type QueryExecutor struct { *query.Executor