fix: improve delete speed when a measurement is part of the predicate (#23786)
* fix: improve delete speed when a measurement is part of the predicate * test: add test for deleting measurement by predicate * chore: improve error messaging and capturing * chore: set goland to use the right formatting stylepull/23812/head
parent
4ed184dd82
commit
2ad8995355
|
@ -7,6 +7,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/http"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
|
@ -119,8 +121,8 @@ func (t *TemporaryEngine) SeriesCardinality(ctx context.Context, bucketID platfo
|
|||
}
|
||||
|
||||
// DeleteBucketRangePredicate will delete a bucket from the range and predicate.
|
||||
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error {
|
||||
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred)
|
||||
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
|
||||
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred, measurement)
|
||||
}
|
||||
|
||||
func (t *TemporaryEngine) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||
|
|
|
@ -3,6 +3,8 @@ package influxdb
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
)
|
||||
|
||||
|
@ -15,5 +17,5 @@ type Predicate interface {
|
|||
|
||||
// DeleteService will delete a bucket from the range and predict.
|
||||
type DeleteService interface {
|
||||
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred Predicate) error
|
||||
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred Predicate, measurement influxql.Expr) error
|
||||
}
|
||||
|
|
|
@ -5,9 +5,12 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
http "net/http"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
pcontext "github.com/influxdata/influxdb/v2/context"
|
||||
|
@ -91,7 +94,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
dr, err := decodeDeleteRequest(
|
||||
dr, measurement, err := decodeDeleteRequest(
|
||||
ctx, r,
|
||||
h.OrganizationService,
|
||||
h.BucketService,
|
||||
|
@ -121,7 +124,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate); err != nil {
|
||||
if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate, measurement); err != nil {
|
||||
h.HandleHTTPError(ctx, &errors.Error{
|
||||
Code: errors.EInternal,
|
||||
Op: "http/handleDelete",
|
||||
|
@ -139,24 +142,78 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
|
|||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, error) {
|
||||
func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, influxql.Expr, error) {
|
||||
dr := new(deleteRequest)
|
||||
err := json.NewDecoder(r.Body).Decode(dr)
|
||||
buf, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return nil, &errors.Error{
|
||||
je := &errors.Error{
|
||||
Code: errors.EInvalid,
|
||||
Msg: "invalid request; error parsing request json",
|
||||
Msg: "error reading json body",
|
||||
Err: err,
|
||||
}
|
||||
return nil, nil, je
|
||||
}
|
||||
buffer := bytes.NewBuffer(buf)
|
||||
err = json.NewDecoder(buffer).Decode(dr)
|
||||
if err != nil {
|
||||
je := &errors.Error{
|
||||
Code: errors.EInvalid,
|
||||
Msg: "error decoding json body",
|
||||
Err: err,
|
||||
}
|
||||
return nil, nil, je
|
||||
}
|
||||
|
||||
var drd deleteRequestDecode
|
||||
err = json.Unmarshal(buf, &drd)
|
||||
if err != nil {
|
||||
je := &errors.Error{
|
||||
Code: errors.EInvalid,
|
||||
Msg: "error decoding json body for predicate",
|
||||
Err: err,
|
||||
}
|
||||
return nil, nil, je
|
||||
}
|
||||
var measurementExpr influxql.Expr
|
||||
if drd.Predicate != "" {
|
||||
expr, err := influxql.ParseExpr(drd.Predicate)
|
||||
if err != nil {
|
||||
return nil, nil, &errors.Error{
|
||||
Code: errors.EInvalid,
|
||||
Msg: "invalid request; error parsing predicate",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
measurementExpr, _, err = influxql.PartitionExpr(influxql.CloneExpr(expr), func(e influxql.Expr) (bool, error) {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if ok && tag.Val == "_measurement" {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, &errors.Error{
|
||||
Code: errors.EInvalid,
|
||||
Msg: "invalid request; error partitioning predicate",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if dr.Org, err = queryOrganization(ctx, r, orgSvc); err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if dr.Bucket, err = queryBucket(ctx, dr.Org.ID, r, bucketSvc); err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
return dr, nil
|
||||
return dr, measurementExpr, nil
|
||||
}
|
||||
|
||||
type deleteRequest struct {
|
||||
|
|
|
@ -72,7 +72,7 @@ func TestDelete(t *testing.T) {
|
|||
contentType: "application/json; charset=utf-8",
|
||||
body: `{
|
||||
"code": "invalid",
|
||||
"message": "invalid request; error parsing request json: invalid RFC3339Nano for field start, please format your time with RFC3339Nano format, example: 2009-01-02T23:00:00Z"
|
||||
"message": "error decoding json body: invalid RFC3339Nano for field start, please format your time with RFC3339Nano format, example: 2009-01-02T23:00:00Z"
|
||||
}`,
|
||||
},
|
||||
},
|
||||
|
@ -89,7 +89,7 @@ func TestDelete(t *testing.T) {
|
|||
contentType: "application/json; charset=utf-8",
|
||||
body: `{
|
||||
"code": "invalid",
|
||||
"message": "invalid request; error parsing request json: invalid RFC3339Nano for field stop, please format your time with RFC3339Nano format, example: 2009-01-01T23:00:00Z"
|
||||
"message": "error decoding json body: invalid RFC3339Nano for field stop, please format your time with RFC3339Nano format, example: 2009-01-01T23:00:00Z"
|
||||
}`,
|
||||
},
|
||||
},
|
||||
|
@ -106,7 +106,7 @@ func TestDelete(t *testing.T) {
|
|||
contentType: "application/json; charset=utf-8",
|
||||
body: fmt.Sprintf(`{
|
||||
"code": "invalid",
|
||||
"message": "invalid request; error parsing request json: %s"
|
||||
"message": "error decoding json body: %s"
|
||||
}`, msgStartTooSoon),
|
||||
},
|
||||
},
|
||||
|
@ -123,7 +123,7 @@ func TestDelete(t *testing.T) {
|
|||
contentType: "application/json; charset=utf-8",
|
||||
body: fmt.Sprintf(`{
|
||||
"code": "invalid",
|
||||
"message": "invalid request; error parsing request json: %s"
|
||||
"message": "error decoding json body: %s"
|
||||
}`, msgStopTooLate),
|
||||
},
|
||||
},
|
||||
|
@ -321,7 +321,61 @@ func TestDelete(t *testing.T) {
|
|||
statusCode: http.StatusBadRequest,
|
||||
body: `{
|
||||
"code": "invalid",
|
||||
"message": "invalid request; error parsing request json: the logical operator OR is not supported yet at position 25"
|
||||
"message": "error decoding json body: the logical operator OR is not supported yet at position 25"
|
||||
}`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "unsupported delete measurements",
|
||||
args: args{
|
||||
queryParams: map[string][]string{
|
||||
"org": {"org1"},
|
||||
"bucket": {"buck1"},
|
||||
},
|
||||
body: []byte(`{
|
||||
"start":"2009-01-01T23:00:00Z",
|
||||
"stop":"2019-11-10T01:00:00Z",
|
||||
"predicate": "_measurement=\"cpu\" or _measurement=\"mem\""
|
||||
}`),
|
||||
authorizer: &influxdb.Authorization{
|
||||
UserID: user1ID,
|
||||
Status: influxdb.Active,
|
||||
Permissions: []influxdb.Permission{
|
||||
{
|
||||
Action: influxdb.WriteAction,
|
||||
Resource: influxdb.Resource{
|
||||
Type: influxdb.BucketsResourceType,
|
||||
ID: influxtesting.IDPtr(platform.ID(2)),
|
||||
OrgID: influxtesting.IDPtr(platform.ID(1)),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
fields: fields{
|
||||
DeleteService: mock.NewDeleteService(),
|
||||
BucketService: &mock.BucketService{
|
||||
FindBucketFn: func(ctx context.Context, f influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
||||
return &influxdb.Bucket{
|
||||
ID: platform.ID(2),
|
||||
Name: "bucket1",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
OrganizationService: &mock.OrganizationService{
|
||||
FindOrganizationF: func(ctx context.Context, f influxdb.OrganizationFilter) (*influxdb.Organization, error) {
|
||||
return &influxdb.Organization{
|
||||
ID: platform.ID(1),
|
||||
Name: "org1",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
wants: wants{
|
||||
statusCode: http.StatusBadRequest,
|
||||
body: `{
|
||||
"code": "invalid",
|
||||
"message": "error decoding json body: the logical operator OR is not supported yet at position 19"
|
||||
}`,
|
||||
},
|
||||
},
|
||||
|
@ -335,7 +389,7 @@ func TestDelete(t *testing.T) {
|
|||
body: []byte(`{
|
||||
"start":"2009-01-01T23:00:00Z",
|
||||
"stop":"2019-11-10T01:00:00Z",
|
||||
"predicate": "tag1=\"v1\" and (tag2=\"v2\" and tag3=\"v3\")"
|
||||
"predicate": "_measurement=\"testing\" and tag1=\"v1\" and (tag2=\"v2\" and tag3=\"v3\")"
|
||||
}`),
|
||||
authorizer: &influxdb.Authorization{
|
||||
UserID: user1ID,
|
||||
|
|
|
@ -3,6 +3,8 @@ package mock
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
)
|
||||
|
@ -11,20 +13,20 @@ var _ influxdb.DeleteService = &DeleteService{}
|
|||
|
||||
// DeleteService is a mock delete server.
|
||||
type DeleteService struct {
|
||||
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error
|
||||
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error
|
||||
}
|
||||
|
||||
// NewDeleteService returns a mock DeleteService where its methods will return
|
||||
// zero values.
|
||||
func NewDeleteService() DeleteService {
|
||||
return DeleteService{
|
||||
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error {
|
||||
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
//DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
|
||||
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error {
|
||||
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred)
|
||||
// DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
|
||||
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
|
||||
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred, measurement)
|
||||
}
|
||||
|
|
|
@ -327,7 +327,7 @@ func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID platform.ID)
|
|||
|
||||
// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
|
||||
// deleted must be in [min, max], and the key must match the predicate if provided.
|
||||
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error {
|
||||
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
|
||||
span, _ := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -336,7 +336,7 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
|
|||
if e.closing == nil {
|
||||
return ErrEngineClosed
|
||||
}
|
||||
return e.tsdbStore.DeleteSeriesWithPredicate(ctx, bucketID.String(), min, max, pred)
|
||||
return e.tsdbStore.DeleteSeriesWithPredicate(ctx, bucketID.String(), min, max, pred, measurement)
|
||||
}
|
||||
|
||||
// RLockKVStore locks the KV store as well as the engine in preparation for doing a backup.
|
||||
|
|
|
@ -15,6 +15,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
errors3 "github.com/influxdata/influxdb/v2/pkg/errors"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/influxql/query"
|
||||
errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||
|
@ -1335,7 +1337,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
|
|||
|
||||
// DeleteSeries loops through the local shards and deletes the series data for
|
||||
// the passed in series keys.
|
||||
func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string, min, max int64, pred influxdb.Predicate) error {
|
||||
func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
|
||||
s.mu.RLock()
|
||||
if s.databases[database].hasMultipleIndexTypes() {
|
||||
s.mu.RUnlock()
|
||||
|
@ -1355,7 +1357,7 @@ func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string,
|
|||
// of series keys can be very memory intensive if run concurrently.
|
||||
limit := limiter.NewFixed(1)
|
||||
|
||||
return s.walkShards(shards, func(sh *Shard) error {
|
||||
return s.walkShards(shards, func(sh *Shard) (err error) {
|
||||
if err := limit.Take(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1372,12 +1374,43 @@ func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string,
|
|||
return err
|
||||
}
|
||||
|
||||
measurementName := make([]byte, 0)
|
||||
|
||||
if measurement != nil {
|
||||
if m, ok := measurement.(*influxql.BinaryExpr); ok {
|
||||
rhs, ok := m.RHS.(*influxql.VarRef)
|
||||
if ok {
|
||||
measurementName = []byte(rhs.Val)
|
||||
exists, err := sh.MeasurementExists(measurementName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Find matching series keys for each measurement.
|
||||
mitr, err := index.MeasurementIterator()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer mitr.Close()
|
||||
defer errors3.Capture(&err, mitr.Close)()
|
||||
|
||||
deleteSeries := func(mm []byte) error {
|
||||
sitr, err := index.MeasurementSeriesIDIterator(mm)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if sitr == nil {
|
||||
return nil
|
||||
}
|
||||
defer errors3.Capture(&err, sitr.Close)()
|
||||
|
||||
itr := NewSeriesIteratorAdapter(sfile, NewPredicateSeriesIDIterator(sitr, sfile, pred))
|
||||
return sh.DeleteSeriesRange(ctx, itr, min, max)
|
||||
}
|
||||
|
||||
for {
|
||||
mm, err := mitr.Next()
|
||||
|
@ -1387,19 +1420,14 @@ func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string,
|
|||
break
|
||||
}
|
||||
|
||||
if err := func() error {
|
||||
sitr, err := index.MeasurementSeriesIDIterator(mm)
|
||||
// If we are deleting within a measurement and have found a match, we can return after the delete.
|
||||
if measurementName != nil && bytes.Equal(mm, measurementName) {
|
||||
return deleteSeries(mm)
|
||||
} else {
|
||||
err := deleteSeries(mm)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if sitr == nil {
|
||||
return nil
|
||||
}
|
||||
defer sitr.Close()
|
||||
|
||||
itr := NewSeriesIteratorAdapter(sfile, NewPredicateSeriesIDIterator(sitr, sfile, pred))
|
||||
return sh.DeleteSeriesRange(ctx, itr, min, max)
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/influxdata/influxdb/v2/predicate"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
@ -2125,6 +2126,58 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStore_DeleteByPredicate(t *testing.T) {
|
||||
test := func(t *testing.T, index string) error {
|
||||
s := MustOpenStore(t, index)
|
||||
defer s.Close()
|
||||
|
||||
s.MustCreateShardWithData("db0", "rp0", 0,
|
||||
`cpu,host=serverA value=1 0`,
|
||||
`cpu,region=west value=3 20`,
|
||||
`cpu,secret=foo value=5 30`,
|
||||
`mem,secret=foo value=1 30`,
|
||||
`disk value=4 30`,
|
||||
)
|
||||
|
||||
p, err := predicate.Parse(`_measurement="cpu"`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pred, err := predicate.New(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
expr, err := influxql.ParseExpr(`_measurement="cpu"`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.DeleteSeriesWithPredicate(context.Background(), "db0", math.MinInt, math.MaxInt, pred, expr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
names, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
require.Equal(t, 2, len(names), "expected cpu to be deleted, leaving 2 measurements")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) {
|
||||
if err := test(t, index); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
store := NewStore(b, index)
|
||||
|
|
Loading…
Reference in New Issue