fix: add write permissions check for DELETE and DROP MEASUREMENT (#23219)
We previously allowed read tokens access to all of v1 query, including
InfluxQL queries that made state changes to the DB, specifically,
'DELETE' and 'DROP MEASUREMENT'. This allowed tokens with only read
permissions to delete points via the legacy /query endpoint.
/api/v2/query was unaffected.
This adjusts the behavior to verify that the token has write permissions
when specifying 'DELETE' and 'DROP MEASUREMENT' InfluxQL queries. We
follow the same pattern as other existing v1 failure scenarios and
instead of failing hard with 401, we use ectx.Send() to send an error to
the user (with 200 status):
{"results":[{"statement_id":0,"error":"insufficient permissions"}]}
Returning in this manner is consistent with Cloud 2, which also returns
200 with "insufficient permissions" for these two InfluxQL queries.
To facilitate authorization unit tests, we add MustNewPermission() to
testing/util.go.
Closes: #22799
pull/23226/head
parent
5e3ea7b94c
commit
e304ef9764
|
|
@ -120,6 +120,14 @@ func MustCreateUsers(ctx context.Context, svc influxdb.UserService, us ...*influ
|
|||
}
|
||||
}
|
||||
|
||||
func MustNewPermission(a influxdb.Action, rt influxdb.ResourceType, orgID platform.ID) *influxdb.Permission {
|
||||
perm, err := influxdb.NewPermission(a, rt, orgID)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return perm
|
||||
}
|
||||
|
||||
func MustNewPermissionAtID(id platform.ID, a influxdb.Action, rt influxdb.ResourceType, orgID platform.ID) *influxdb.Permission {
|
||||
perm, err := influxdb.NewPermissionAtID(id, a, rt, orgID)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -372,6 +372,14 @@ func (e *StatementExecutor) executeDeleteSeriesStatement(ctx context.Context, q
|
|||
return err
|
||||
}
|
||||
|
||||
// Require write for DELETE queries
|
||||
_, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID)
|
||||
if err != nil {
|
||||
return ectx.Send(ctx, &query.Result{
|
||||
Err: fmt.Errorf("insufficient permissions"),
|
||||
})
|
||||
}
|
||||
|
||||
// Convert "now()" to current time.
|
||||
q.Condition = influxql.Reduce(q.Condition, &influxql.NowValuer{Now: time.Now().UTC()})
|
||||
|
||||
|
|
@ -383,6 +391,15 @@ func (e *StatementExecutor) executeDropMeasurementStatement(ctx context.Context,
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Require write for DROP MEASUREMENT queries
|
||||
_, _, err = authorizer.AuthorizeWrite(ctx, influxdb.BucketsResourceType, mapping.BucketID, ectx.OrgID)
|
||||
if err != nil {
|
||||
return ectx.Send(ctx, &query.Result{
|
||||
Err: fmt.Errorf("insufficient permissions"),
|
||||
})
|
||||
}
|
||||
|
||||
return e.TSDBStore.DeleteMeasurement(ctx, mapping.BucketID.String(), q.Name)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -386,6 +386,164 @@ func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testExecDeleteSeriesOrDropMeasurement(t *testing.T, qType string) {
|
||||
orgID := platform.ID(0xff00)
|
||||
otherOrgID := platform.ID(0xff01)
|
||||
bucketID := platform.ID(0xffee)
|
||||
otherBucketID := platform.ID(0xffef)
|
||||
|
||||
qStr := qType
|
||||
if qStr == "DELETE" {
|
||||
qStr = "DELETE FROM"
|
||||
}
|
||||
qErr := errors.New("insufficient permissions")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
query string
|
||||
permissions []influxdb.Permission
|
||||
expectedErr error
|
||||
}{
|
||||
// expected FAIL
|
||||
{
|
||||
name: fmt.Sprintf("read-only bucket (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermissionAtID(bucketID, influxdb.ReadAction, influxdb.BucketsResourceType, orgID),
|
||||
},
|
||||
expectedErr: qErr,
|
||||
},
|
||||
{
|
||||
name: fmt.Sprintf("read-only all buckets (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, orgID),
|
||||
},
|
||||
expectedErr: qErr,
|
||||
},
|
||||
{
|
||||
name: fmt.Sprintf("write-only other bucket (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermissionAtID(otherBucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
|
||||
},
|
||||
expectedErr: qErr,
|
||||
},
|
||||
{
|
||||
name: fmt.Sprintf("write-only other org (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, otherOrgID),
|
||||
},
|
||||
expectedErr: qErr,
|
||||
},
|
||||
{
|
||||
name: fmt.Sprintf("read-write other org (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, otherOrgID),
|
||||
*itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, otherOrgID),
|
||||
},
|
||||
expectedErr: qErr,
|
||||
},
|
||||
// expected PASS
|
||||
{
|
||||
name: fmt.Sprintf("write-only bucket (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: fmt.Sprintf("write-only all buckets (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: fmt.Sprintf("read-write bucket (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermissionAtID(bucketID, influxdb.ReadAction, influxdb.BucketsResourceType, orgID),
|
||||
*itesting.MustNewPermissionAtID(bucketID, influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: fmt.Sprintf("read-write all buckets (%s)", qType),
|
||||
query: qStr,
|
||||
permissions: []influxdb.Permission{
|
||||
*itesting.MustNewPermission(influxdb.ReadAction, influxdb.BucketsResourceType, orgID),
|
||||
*itesting.MustNewPermission(influxdb.WriteAction, influxdb.BucketsResourceType, orgID),
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
// setup a DBRP that we can use
|
||||
dbrp := mocks.NewMockDBRPMappingService(ctrl)
|
||||
db := "db0"
|
||||
|
||||
empty := ""
|
||||
isDefault := true
|
||||
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: nil, Default: &isDefault}
|
||||
res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: empty, OrganizationID: orgID, BucketID: bucketID, Default: isDefault}}
|
||||
dbrp.EXPECT().
|
||||
FindMany(gomock.Any(), filt).
|
||||
Return(res, 1, nil)
|
||||
|
||||
qe := DefaultQueryExecutor(t, WithDBRP(dbrp))
|
||||
|
||||
// assume storage succeeds if we get that far
|
||||
qe.TSDBStore.DeleteSeriesFn = func(context.Context, string, []influxql.Source, influxql.Expr) error {
|
||||
return nil
|
||||
}
|
||||
qe.TSDBStore.DeleteMeasurementFn = func(context.Context, string, string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = icontext.SetAuthorizer(ctx, &influxdb.Authorization{
|
||||
ID: orgID,
|
||||
OrgID: orgID,
|
||||
Status: influxdb.Active,
|
||||
Permissions: testCase.permissions,
|
||||
})
|
||||
|
||||
results := ReadAllResults(qe.ExecuteQuery(ctx, fmt.Sprintf("%s cpu", testCase.query), "db0", 0, orgID))
|
||||
|
||||
var exp []*query.Result
|
||||
if testCase.expectedErr != nil {
|
||||
exp = []*query.Result{
|
||||
{
|
||||
StatementID: 0,
|
||||
Err: testCase.expectedErr,
|
||||
},
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(results, exp) {
|
||||
t.Fatalf("unexpected results: exp %s, got %s", spew.Sdump(exp), spew.Sdump(results))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryExecutor_ExecuteQuery_DeleteSeries(t *testing.T) {
|
||||
testExecDeleteSeriesOrDropMeasurement(t, "DELETE")
|
||||
}
|
||||
|
||||
func TestQueryExecutor_ExecuteQuery_DropMeasurement(t *testing.T) {
|
||||
testExecDeleteSeriesOrDropMeasurement(t, "DROP MEASUREMENT")
|
||||
}
|
||||
|
||||
// QueryExecutor is a test wrapper for coordinator.QueryExecutor.
|
||||
type QueryExecutor struct {
|
||||
*query.Executor
|
||||
|
|
|
|||
Loading…
Reference in New Issue