refactor: have retention use DeleteBucketRange
parent
aa12144fc7
commit
7f54e816e3
|
@ -286,22 +286,11 @@ func (e *Engine) reloadWALFile(file string) error {
|
|||
}
|
||||
|
||||
case *wal.DeleteRangeWALEntry:
|
||||
err := e.engine.DeleteSeriesRangeWithPredicate(newFixedSeriesIterator(t.Keys),
|
||||
func(name []byte, tags models.Tags) (int64, int64, bool) {
|
||||
return t.Min, t.Max, true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO(jeff): implement?
|
||||
|
||||
case *wal.DeleteWALEntry:
|
||||
err := e.engine.DeleteSeriesRangeWithPredicate(newFixedSeriesIterator(t.Keys),
|
||||
func(name []byte, tags models.Tags) (int64, int64, bool) {
|
||||
return math.MinInt64, math.MaxInt64, true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO(jeff): implement?
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -524,6 +513,11 @@ func (e *Engine) CommitSegments(segs []string, fn func() error) error {
|
|||
|
||||
// DeleteBucket deletes an entire bucket from the storage engine.
|
||||
func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
|
||||
return e.DeleteBucketRange(orgID, bucketID, math.MinInt64, math.MaxInt64)
|
||||
}
|
||||
|
||||
// DeleteBucketRange deletes an entire bucket from the storage engine.
|
||||
func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
if e.closing == nil {
|
||||
|
@ -537,23 +531,7 @@ func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
|
|||
encoded := tsdb.EncodeName(orgID, bucketID)
|
||||
name := models.EscapeMeasurement(encoded[:])
|
||||
|
||||
return e.engine.DeleteBucket(name, math.MinInt64, math.MaxInt64)
|
||||
}
|
||||
|
||||
// DeleteSeriesRangeWithPredicate deletes all series data iterated over if fn returns
|
||||
// true for that series.
|
||||
func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
if e.closing == nil {
|
||||
return ErrEngineClosed
|
||||
}
|
||||
|
||||
// TODO(jeff): this can't exist because we can't WAL a predicate. We'd have to run the
|
||||
// iterator and predicate to completion, store the results in the WAL, and then run it
|
||||
// again.
|
||||
|
||||
return e.engine.DeleteSeriesRangeWithPredicate(itr, fn)
|
||||
return e.engine.DeleteBucketRange(name, min, max)
|
||||
}
|
||||
|
||||
// SeriesCardinality returns the number of series in the engine.
|
||||
|
|
|
@ -4,15 +4,10 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxql"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -24,8 +19,7 @@ const (
|
|||
|
||||
// A Deleter implementation is capable of deleting data from a storage engine.
|
||||
type Deleter interface {
|
||||
CreateSeriesCursor(context.Context, SeriesCursorRequest, influxql.Expr) (SeriesCursor, error)
|
||||
DeleteSeriesRangeWithPredicate(tsdb.SeriesIterator, func([]byte, models.Tags) (int64, int64, bool)) error
|
||||
DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error
|
||||
}
|
||||
|
||||
// A BucketFinder is responsible for providing access to buckets via a filter.
|
||||
|
@ -78,9 +72,9 @@ func (s *retentionEnforcer) run() {
|
|||
log, logEnd := logger.NewOperation(s.logger, "Data retention check", "data_retention_check")
|
||||
defer logEnd()
|
||||
|
||||
rpByBucketID, err := s.getRetentionPeriodPerBucket()
|
||||
buckets, err := s.getBucketInformation()
|
||||
if err != nil {
|
||||
log.Error("Unable to determine bucket:RP mapping", zap.Error(err))
|
||||
log.Error("Unable to determine bucket information", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -88,7 +82,7 @@ func (s *retentionEnforcer) run() {
|
|||
labels := s.metrics.Labels()
|
||||
labels["status"] = "ok"
|
||||
|
||||
if err := s.expireData(rpByBucketID, now); err != nil {
|
||||
if err := s.expireData(buckets, now); err != nil {
|
||||
log.Error("Deletion not successful", zap.Error(err))
|
||||
labels["status"] = "error"
|
||||
}
|
||||
|
@ -98,149 +92,37 @@ func (s *retentionEnforcer) run() {
|
|||
|
||||
// expireData runs a delete operation on the storage engine.
|
||||
//
|
||||
// Any series data that (1) belongs to a bucket in the provided map and
|
||||
// Any series data that (1) belongs to a bucket in the provided list and
|
||||
// (2) falls outside the bucket's indicated retention period will be deleted.
|
||||
func (s *retentionEnforcer) expireData(rpByBucketID map[platform.ID]time.Duration, now time.Time) error {
|
||||
func (s *retentionEnforcer) expireData(buckets []*platform.Bucket, now time.Time) error {
|
||||
_, logEnd := logger.NewOperation(s.logger, "Data deletion", "data_deletion")
|
||||
defer logEnd()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), engineAPITimeout)
|
||||
defer cancel()
|
||||
cur, err := s.Engine.CreateSeriesCursor(ctx, SeriesCursorRequest{}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cur.Close()
|
||||
|
||||
var mu sync.Mutex
|
||||
badMSketch := make(map[string]struct{}) // Badly formatted measurements.
|
||||
missingBSketch := make(map[platform.ID]struct{}) // Missing buckets.
|
||||
|
||||
var seriesDeleted uint64 // Number of series where a delete is attempted.
|
||||
var seriesSkipped uint64 // Number of series that were skipped from delete.
|
||||
|
||||
fn := func(name []byte, tags models.Tags) (int64, int64, bool) {
|
||||
if len(name) != platform.IDLength {
|
||||
mu.Lock()
|
||||
badMSketch[string(name)] = struct{}{}
|
||||
mu.Unlock()
|
||||
atomic.AddUint64(&seriesSkipped, 1)
|
||||
return 0, 0, false
|
||||
|
||||
for _, b := range buckets {
|
||||
if b.RetentionPeriod == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var n [16]byte
|
||||
copy(n[:], name)
|
||||
_, bucketID := tsdb.DecodeName(n)
|
||||
|
||||
retentionPeriod, ok := rpByBucketID[bucketID]
|
||||
if !ok {
|
||||
mu.Lock()
|
||||
missingBSketch[bucketID] = struct{}{}
|
||||
mu.Unlock()
|
||||
atomic.AddUint64(&seriesSkipped, 1)
|
||||
return 0, 0, false
|
||||
max := now.Add(-b.RetentionPeriod).UnixNano()
|
||||
err := s.Engine.DeleteBucketRange(b.OrganizationID, b.ID, math.MinInt64, max)
|
||||
if err != nil {
|
||||
// TODO(jeff): metrics?
|
||||
}
|
||||
if retentionPeriod == 0 {
|
||||
return 0, 0, false
|
||||
}
|
||||
|
||||
atomic.AddUint64(&seriesDeleted, 1)
|
||||
to := now.Add(-retentionPeriod).UnixNano()
|
||||
return math.MinInt64, to, true
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if s.metrics == nil {
|
||||
return
|
||||
}
|
||||
labels := s.metrics.Labels()
|
||||
labels["status"] = "bad_measurement"
|
||||
s.metrics.Unprocessable.With(labels).Add(float64(len(badMSketch)))
|
||||
|
||||
labels["status"] = "missing_bucket"
|
||||
s.metrics.Unprocessable.With(labels).Add(float64(len(missingBSketch)))
|
||||
|
||||
labels["status"] = "ok"
|
||||
s.metrics.Series.With(labels).Add(float64(atomic.LoadUint64(&seriesDeleted)))
|
||||
|
||||
labels["status"] = "skipped"
|
||||
s.metrics.Series.With(labels).Add(float64(atomic.LoadUint64(&seriesSkipped)))
|
||||
}()
|
||||
|
||||
return s.Engine.DeleteSeriesRangeWithPredicate(newSeriesIteratorAdapter(cur), fn)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getRetentionPeriodPerBucket returns a map of (bucket ID -> retention period)
|
||||
// for all buckets.
|
||||
func (s *retentionEnforcer) getRetentionPeriodPerBucket() (map[platform.ID]time.Duration, error) {
|
||||
// getBucketInformation returns a slice of buckets to run retention on.
|
||||
func (s *retentionEnforcer) getBucketInformation() ([]*platform.Bucket, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), bucketAPITimeout)
|
||||
defer cancel()
|
||||
|
||||
buckets, _, err := s.BucketService.FindBuckets(ctx, platform.BucketFilter{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rpByBucketID := make(map[platform.ID]time.Duration, len(buckets))
|
||||
for _, bucket := range buckets {
|
||||
rpByBucketID[bucket.ID] = bucket.RetentionPeriod
|
||||
}
|
||||
return rpByBucketID, nil
|
||||
return buckets, err
|
||||
}
|
||||
|
||||
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
|
||||
func (s *retentionEnforcer) PrometheusCollectors() []prometheus.Collector {
|
||||
return s.metrics.PrometheusCollectors()
|
||||
}
|
||||
|
||||
type seriesIteratorAdapter struct {
|
||||
itr SeriesCursor
|
||||
ea seriesElemAdapter
|
||||
elem tsdb.SeriesElem
|
||||
}
|
||||
|
||||
func newSeriesIteratorAdapter(itr SeriesCursor) *seriesIteratorAdapter {
|
||||
si := &seriesIteratorAdapter{itr: itr}
|
||||
si.elem = &si.ea
|
||||
return si
|
||||
}
|
||||
|
||||
// Next returns the next tsdb.SeriesElem.
|
||||
//
|
||||
// The returned tsdb.SeriesElem is valid for use until Next is called again.
|
||||
func (s *seriesIteratorAdapter) Next() (tsdb.SeriesElem, error) {
|
||||
if s.itr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
row, err := s.itr.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if row == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
s.ea.name = row.Name
|
||||
s.ea.tags = row.Tags
|
||||
return s.elem, nil
|
||||
}
|
||||
|
||||
func (s *seriesIteratorAdapter) Close() error {
|
||||
if s.itr != nil {
|
||||
err := s.itr.Close()
|
||||
s.itr = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type seriesElemAdapter struct {
|
||||
name []byte
|
||||
tags models.Tags
|
||||
}
|
||||
|
||||
func (e *seriesElemAdapter) Name() []byte { return e.name }
|
||||
func (e *seriesElemAdapter) Tags() models.Tags { return e.tags }
|
||||
func (e *seriesElemAdapter) Deleted() bool { return false }
|
||||
func (e *seriesElemAdapter) Expr() influxql.Expr { return nil }
|
||||
|
|
|
@ -2,7 +2,6 @@ package storage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
|
@ -10,103 +9,80 @@ import (
|
|||
"time"
|
||||
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
func TestService_expireData(t *testing.T) {
|
||||
func TestRetentionService(t *testing.T) {
|
||||
engine := NewTestEngine()
|
||||
service := newRetentionEnforcer(engine, NewTestBucketFinder())
|
||||
now := time.Date(2018, 4, 10, 23, 12, 33, 0, time.UTC)
|
||||
|
||||
t.Run("no rpByBucketID", func(t *testing.T) {
|
||||
t.Run("no buckets", func(t *testing.T) {
|
||||
if err := service.expireData(nil, now); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := service.expireData(map[platform.ID]time.Duration{}, now); err != nil {
|
||||
if err := service.expireData([]*platform.Bucket{}, now); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Generate some measurement names
|
||||
var names [][]byte
|
||||
rpByBucketID := map[platform.ID]time.Duration{}
|
||||
expMatchedFrequencies := map[string]int{} // To be used for verifying test results.
|
||||
expRejectedFrequencies := map[string]int{} // To be used for verifying test results.
|
||||
// Generate some buckets to expire
|
||||
buckets := []*platform.Bucket{}
|
||||
expMatched := map[string]struct{}{} // To be used for verifying test results.
|
||||
expRejected := map[string]struct{}{} // To be used for verifying test results.
|
||||
for i := 0; i < 15; i++ {
|
||||
repeat := rand.Intn(10) + 1 // [1, 10]
|
||||
name := genMeasurementName()
|
||||
for j := 0; j < repeat; j++ {
|
||||
names = append(names, name)
|
||||
}
|
||||
|
||||
var n [16]byte
|
||||
copy(n[:], name)
|
||||
_, bucketID := tsdb.DecodeName(n)
|
||||
orgID, bucketID := tsdb.DecodeName(n)
|
||||
|
||||
// Put 1/3rd in the rpByBucketID into the set to delete and 1/3rd into the set
|
||||
// to not delete because no rp, and 1/3rd into the set to not delete because 0 rp.
|
||||
if i%3 == 0 {
|
||||
rpByBucketID[bucketID] = 3 * time.Hour
|
||||
expMatchedFrequencies[string(name)] = repeat
|
||||
buckets = append(buckets, &platform.Bucket{
|
||||
OrganizationID: orgID,
|
||||
ID: bucketID,
|
||||
RetentionPeriod: 3 * time.Hour,
|
||||
})
|
||||
expMatched[string(name)] = struct{}{}
|
||||
} else if i%3 == 1 {
|
||||
expRejectedFrequencies[string(name)] = repeat
|
||||
expRejected[string(name)] = struct{}{}
|
||||
} else if i%3 == 2 {
|
||||
rpByBucketID[bucketID] = 0
|
||||
expRejectedFrequencies[string(name)] = repeat
|
||||
buckets = append(buckets, &platform.Bucket{
|
||||
OrganizationID: orgID,
|
||||
ID: bucketID,
|
||||
RetentionPeriod: 0,
|
||||
})
|
||||
expRejected[string(name)] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Add a badly formatted measurement.
|
||||
for i := 0; i < 5; i++ {
|
||||
names = append(names, []byte("zyzwrong"))
|
||||
}
|
||||
expRejectedFrequencies["zyzwrong"] = 5
|
||||
|
||||
gotMatchedFrequencies := map[string]int{}
|
||||
gotRejectedFrequencies := map[string]int{}
|
||||
engine.DeleteSeriesRangeWithPredicateFn = func(_ tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error {
|
||||
|
||||
// Iterate over the generated names updating the frequencies by which
|
||||
// the predicate function in expireData matches or rejects them.
|
||||
for _, name := range names {
|
||||
from, to, shouldDelete := fn(name, nil)
|
||||
if shouldDelete {
|
||||
gotMatchedFrequencies[string(name)]++
|
||||
if from != math.MinInt64 {
|
||||
return fmt.Errorf("got from %d, expected %d", from, math.MinInt64)
|
||||
}
|
||||
wantTo := now.Add(-3 * time.Hour).UnixNano()
|
||||
if to != wantTo {
|
||||
return fmt.Errorf("got to %d, expected %d", to, wantTo)
|
||||
}
|
||||
} else {
|
||||
gotRejectedFrequencies[string(name)]++
|
||||
}
|
||||
gotMatched := map[string]struct{}{}
|
||||
engine.DeleteBucketRangeFn = func(orgID, bucketID platform.ID, from, to int64) error {
|
||||
if from != math.MinInt64 {
|
||||
t.Fatalf("got from %d, expected %d", from, math.MinInt64)
|
||||
}
|
||||
wantTo := now.Add(-3 * time.Hour).UnixNano()
|
||||
if to != wantTo {
|
||||
t.Fatalf("got to %d, expected %d", to, wantTo)
|
||||
}
|
||||
|
||||
name := tsdb.EncodeName(orgID, bucketID)
|
||||
if _, ok := expRejected[string(name[:])]; ok {
|
||||
t.Fatalf("got a delete for %x", name)
|
||||
}
|
||||
gotMatched[string(name[:])] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
t.Run("multiple bucket", func(t *testing.T) {
|
||||
if err := service.expireData(rpByBucketID, now); err != nil {
|
||||
t.Run("multiple buckets", func(t *testing.T) {
|
||||
if err := service.expireData(buckets, now); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Verify that the correct series were marked to be deleted.
|
||||
t.Run("matched", func(t *testing.T) {
|
||||
if !reflect.DeepEqual(gotMatchedFrequencies, expMatchedFrequencies) {
|
||||
t.Fatalf("got\n%#v\nexpected\n%#v", gotMatchedFrequencies, expMatchedFrequencies)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("rejected", func(t *testing.T) {
|
||||
// Verify that badly formatted measurements were rejected.
|
||||
if !reflect.DeepEqual(gotRejectedFrequencies, expRejectedFrequencies) {
|
||||
t.Fatalf("got\n%#v\nexpected\n%#v", gotRejectedFrequencies, expRejectedFrequencies)
|
||||
}
|
||||
})
|
||||
if !reflect.DeepEqual(gotMatched, expMatched) {
|
||||
t.Fatalf("got\n%#v\nexpected\n%#v", gotMatched, expMatched)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -120,40 +96,18 @@ func genMeasurementName() []byte {
|
|||
return b
|
||||
}
|
||||
|
||||
type TestSeriesCursor struct {
|
||||
CloseFn func() error
|
||||
NextFn func() (*SeriesCursorRow, error)
|
||||
}
|
||||
|
||||
func (f *TestSeriesCursor) Close() error { return f.CloseFn() }
|
||||
func (f *TestSeriesCursor) Next() (*SeriesCursorRow, error) { return f.NextFn() }
|
||||
|
||||
type TestEngine struct {
|
||||
CreateSeriesCursorFn func(context.Context, SeriesCursorRequest, influxql.Expr) (SeriesCursor, error)
|
||||
DeleteSeriesRangeWithPredicateFn func(tsdb.SeriesIterator, func([]byte, models.Tags) (int64, int64, bool)) error
|
||||
|
||||
SeriesCursor *TestSeriesCursor
|
||||
DeleteBucketRangeFn func(platform.ID, platform.ID, int64, int64) error
|
||||
}
|
||||
|
||||
func NewTestEngine() *TestEngine {
|
||||
cursor := &TestSeriesCursor{
|
||||
CloseFn: func() error { return nil },
|
||||
NextFn: func() (*SeriesCursorRow, error) { return nil, nil },
|
||||
}
|
||||
|
||||
return &TestEngine{
|
||||
SeriesCursor: cursor,
|
||||
CreateSeriesCursorFn: func(context.Context, SeriesCursorRequest, influxql.Expr) (SeriesCursor, error) { return cursor, nil },
|
||||
DeleteSeriesRangeWithPredicateFn: func(tsdb.SeriesIterator, func([]byte, models.Tags) (int64, int64, bool)) error { return nil },
|
||||
DeleteBucketRangeFn: func(platform.ID, platform.ID, int64, int64) error { return nil },
|
||||
}
|
||||
}
|
||||
|
||||
func (e *TestEngine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
|
||||
return e.CreateSeriesCursorFn(ctx, req, cond)
|
||||
}
|
||||
|
||||
func (e *TestEngine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error {
|
||||
return e.DeleteSeriesRangeWithPredicateFn(itr, fn)
|
||||
func (e *TestEngine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error {
|
||||
return e.DeleteBucketRangeFn(orgID, bucketID, min, max)
|
||||
}
|
||||
|
||||
type TestBucketFinder struct {
|
||||
|
|
|
@ -4,11 +4,8 @@ package tsm1 // import "github.com/influxdata/influxdb/tsdb/tsm1"
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
|
@ -20,7 +17,6 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/bytesutil"
|
||||
"github.com/influxdata/influxdb/pkg/limiter"
|
||||
"github.com/influxdata/influxdb/pkg/metrics"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
|
@ -594,361 +590,11 @@ func (e *Engine) WriteValues(values map[string][]Value) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// DeleteSeriesRange removes the values between min and max (inclusive) from all series
|
||||
func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error {
|
||||
return e.DeleteSeriesRangeWithPredicate(itr, func(name []byte, tags models.Tags) (int64, int64, bool) {
|
||||
return min, max, true
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteSeriesRangeWithPredicate removes the values between min and max (inclusive) from all series
|
||||
// for which predicate() returns true. If predicate() is nil, then all values in range are removed.
|
||||
func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error {
|
||||
var disableOnce bool
|
||||
|
||||
// Ensure that the index does not compact away the measurement or series we're
|
||||
// going to delete before we're done with them.
|
||||
e.index.DisableCompactions()
|
||||
defer e.index.EnableCompactions()
|
||||
e.index.Wait()
|
||||
|
||||
fs, err := e.index.RetainFileSet()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer fs.Release()
|
||||
|
||||
var (
|
||||
sz int
|
||||
min, max int64 = math.MinInt64, math.MaxInt64
|
||||
|
||||
// Indicator that the min/max time for the current batch has changed and
|
||||
// we need to flush the current batch before appending to it.
|
||||
flushBatch bool
|
||||
)
|
||||
|
||||
// These are reversed from min/max to ensure they are different the first time through.
|
||||
newMin, newMax := int64(math.MaxInt64), int64(math.MinInt64)
|
||||
|
||||
// There is no predicate, so setup newMin/newMax to delete the full time range.
|
||||
if predicate == nil {
|
||||
newMin = min
|
||||
newMax = max
|
||||
}
|
||||
|
||||
batch := make([][]byte, 0, 10000)
|
||||
for {
|
||||
elem, err := itr.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if elem == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// See if the series should be deleted and if so, what range of time.
|
||||
if predicate != nil {
|
||||
var shouldDelete bool
|
||||
newMin, newMax, shouldDelete = predicate(elem.Name(), elem.Tags())
|
||||
if !shouldDelete {
|
||||
continue
|
||||
}
|
||||
|
||||
// If the min/max happens to change for the batch, we need to flush
|
||||
// the current batch and start a new one.
|
||||
flushBatch = (min != newMin || max != newMax) && len(batch) > 0
|
||||
}
|
||||
|
||||
if elem.Expr() != nil {
|
||||
if v, ok := elem.Expr().(*influxql.BooleanLiteral); !ok || !v.Val {
|
||||
return errors.New("fields not supported in WHERE clause during deletion")
|
||||
}
|
||||
}
|
||||
|
||||
if !disableOnce {
|
||||
// Disable and abort running compactions so that tombstones added existing tsm
|
||||
// files don't get removed. This would cause deleted measurements/series to
|
||||
// re-appear once the compaction completed. We only disable the level compactions
|
||||
// so that snapshotting does not stop while writing out tombstones. If it is stopped,
|
||||
// and writing tombstones takes a long time, writes can get rejected due to the cache
|
||||
// filling up.
|
||||
e.disableLevelCompactions(true)
|
||||
defer e.enableLevelCompactions(true)
|
||||
|
||||
e.sfile.DisableCompactions()
|
||||
defer e.sfile.EnableCompactions()
|
||||
e.sfile.Wait()
|
||||
|
||||
disableOnce = true
|
||||
}
|
||||
|
||||
if sz >= deleteFlushThreshold || flushBatch {
|
||||
// Delete all matching batch.
|
||||
if err := e.deleteSeriesRange(batch, min, max); err != nil {
|
||||
return err
|
||||
}
|
||||
batch = batch[:0]
|
||||
sz = 0
|
||||
flushBatch = false
|
||||
}
|
||||
|
||||
// Use the new min/max time for the next iteration
|
||||
min = newMin
|
||||
max = newMax
|
||||
|
||||
key := models.MakeKey(elem.Name(), elem.Tags())
|
||||
sz += len(key)
|
||||
batch = append(batch, key)
|
||||
}
|
||||
|
||||
if len(batch) > 0 {
|
||||
// Delete all matching batch.
|
||||
if err := e.deleteSeriesRange(batch, min, max); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
e.index.Rebuild()
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteSeriesRange removes the values between min and max (inclusive) from all series. This
|
||||
// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange
|
||||
// and not directly.
|
||||
func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
||||
if len(seriesKeys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ensure keys are sorted since lower layers require them to be.
|
||||
if !bytesutil.IsSorted(seriesKeys) {
|
||||
bytesutil.Sort(seriesKeys)
|
||||
}
|
||||
|
||||
// Min and max time in the engine are slightly different from the query language values.
|
||||
if min == influxql.MinTime {
|
||||
min = math.MinInt64
|
||||
}
|
||||
if max == influxql.MaxTime {
|
||||
max = math.MaxInt64
|
||||
}
|
||||
|
||||
// Run the delete on each TSM file in parallel
|
||||
if err := e.FileStore.Apply(func(r TSMFile) error {
|
||||
// See if this TSM file contains the keys and time range
|
||||
minKey, maxKey := seriesKeys[0], seriesKeys[len(seriesKeys)-1]
|
||||
tsmMin, tsmMax := r.KeyRange()
|
||||
|
||||
tsmMin, _ = SeriesAndFieldFromCompositeKey(tsmMin)
|
||||
tsmMax, _ = SeriesAndFieldFromCompositeKey(tsmMax)
|
||||
|
||||
overlaps := bytes.Compare(tsmMin, maxKey) <= 0 && bytes.Compare(tsmMax, minKey) >= 0
|
||||
if !overlaps || !r.OverlapsTimeRange(min, max) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete each key we find in the file. We seek to the min key and walk from there.
|
||||
batch := r.BatchDelete()
|
||||
iter := r.Iterator(minKey)
|
||||
var j int
|
||||
for iter.Next() {
|
||||
indexKey := iter.Key()
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
|
||||
|
||||
for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 {
|
||||
j++
|
||||
}
|
||||
|
||||
if j >= len(seriesKeys) {
|
||||
break
|
||||
}
|
||||
if bytes.Equal(seriesKeys[j], seriesKey) {
|
||||
if err := batch.DeleteRange([][]byte{indexKey}, min, max); err != nil {
|
||||
batch.Rollback()
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
batch.Rollback()
|
||||
return err
|
||||
}
|
||||
|
||||
return batch.Commit()
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// find the keys in the cache and remove them
|
||||
deleteKeys := make([][]byte, 0, len(seriesKeys))
|
||||
|
||||
// ApplySerialEntryFn cannot return an error in this invocation.
|
||||
_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
|
||||
|
||||
// Cache does not walk keys in sorted order, so search the sorted
|
||||
// series we need to delete to see if any of the cache keys match.
|
||||
i := bytesutil.SearchBytes(seriesKeys, seriesKey)
|
||||
if i < len(seriesKeys) && bytes.Equal(seriesKey, seriesKeys[i]) {
|
||||
// k is the measurement + tags + sep + field
|
||||
deleteKeys = append(deleteKeys, k)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Sort the series keys because ApplyEntryFn iterates over the keys randomly.
|
||||
bytesutil.Sort(deleteKeys)
|
||||
|
||||
e.Cache.DeleteRange(deleteKeys, min, max)
|
||||
|
||||
// The series are deleted on disk, but the index may still say they exist.
|
||||
// Depending on the the min,max time passed in, the series may or not actually
|
||||
// exists now. To reconcile the index, we walk the series keys that still exists
|
||||
// on disk and cross out any keys that match the passed in series. Any series
|
||||
// left in the slice at the end do not exist and can be deleted from the index.
|
||||
// Note: this is inherently racy if writes are occurring to the same measurement/series are
|
||||
// being removed. A write could occur and exist in the cache at this point, but we
|
||||
// would delete it from the index.
|
||||
minKey := seriesKeys[0]
|
||||
|
||||
// Apply runs this func concurrently. The seriesKeys slice is mutated concurrently
|
||||
// by different goroutines setting positions to nil.
|
||||
if err := e.FileStore.Apply(func(r TSMFile) error {
|
||||
var j int
|
||||
|
||||
// Start from the min deleted key that exists in this file.
|
||||
iter := r.Iterator(minKey)
|
||||
for iter.Next() {
|
||||
if j >= len(seriesKeys) {
|
||||
return nil
|
||||
}
|
||||
|
||||
indexKey := iter.Key()
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
|
||||
|
||||
// Skip over any deleted keys that are less than our tsm key
|
||||
cmp := bytes.Compare(seriesKeys[j], seriesKey)
|
||||
for j < len(seriesKeys) && cmp < 0 {
|
||||
j++
|
||||
if j >= len(seriesKeys) {
|
||||
return nil
|
||||
}
|
||||
cmp = bytes.Compare(seriesKeys[j], seriesKey)
|
||||
}
|
||||
|
||||
// We've found a matching key, cross it out so we do not remove it from the index.
|
||||
if j < len(seriesKeys) && cmp == 0 {
|
||||
seriesKeys[j] = emptyBytes
|
||||
j++
|
||||
}
|
||||
}
|
||||
|
||||
return iter.Err()
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Have we deleted all values for the series? If so, we need to remove
|
||||
// the series from the index.
|
||||
if len(seriesKeys) > 0 {
|
||||
buf := make([]byte, 1024) // For use when accessing series file.
|
||||
ids := tsdb.NewSeriesIDSet()
|
||||
measurements := make(map[string]struct{}, 1)
|
||||
|
||||
for _, k := range seriesKeys {
|
||||
if len(k) == 0 {
|
||||
continue // This key was wiped because it shouldn't be removed from index.
|
||||
}
|
||||
|
||||
name, tags := models.ParseKeyBytes(k)
|
||||
sid := e.sfile.SeriesID(name, tags, buf)
|
||||
if sid.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
// See if this series was found in the cache earlier
|
||||
i := bytesutil.SearchBytes(deleteKeys, k)
|
||||
|
||||
var hasCacheValues bool
|
||||
// If there are multiple fields, they will have the same prefix. If any field
|
||||
// has values, then we can't delete it from the index.
|
||||
for i < len(deleteKeys) && bytes.HasPrefix(deleteKeys[i], k) {
|
||||
if e.Cache.Values(deleteKeys[i]).Len() > 0 {
|
||||
hasCacheValues = true
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if hasCacheValues {
|
||||
continue
|
||||
}
|
||||
|
||||
measurements[string(name)] = struct{}{}
|
||||
// Remove the series from the local index.
|
||||
if err := e.index.DropSeries(sid, k, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add the id to the set of delete ids.
|
||||
ids.Add(sid)
|
||||
}
|
||||
|
||||
for k := range measurements {
|
||||
if err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the remaining ids from the series file as they no longer exist
|
||||
// in any shard.
|
||||
var err error
|
||||
ids.ForEach(func(id tsdb.SeriesID) {
|
||||
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
|
||||
err = err1
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteMeasurement deletes a measurement and all related series.
|
||||
func (e *Engine) DeleteMeasurement(name []byte) error {
|
||||
// Delete the bulk of data outside of the fields lock.
|
||||
if err := e.deleteMeasurement(name); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteMeasurement deletes a measurement and all related series.
|
||||
func (e *Engine) deleteMeasurement(name []byte) error {
|
||||
// Attempt to find the series keys.
|
||||
itr, err := e.index.MeasurementSeriesIDIterator(name)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if itr == nil {
|
||||
return nil
|
||||
}
|
||||
defer itr.Close()
|
||||
return e.DeleteSeriesRange(tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64)
|
||||
}
|
||||
|
||||
// ForEachMeasurementName iterates over each measurement name in the engine.
|
||||
func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error {
|
||||
return e.index.ForEachMeasurementName(fn)
|
||||
}
|
||||
|
||||
func (e *Engine) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection) error {
|
||||
return e.index.CreateSeriesListIfNotExists(collection)
|
||||
}
|
||||
|
||||
// WriteTo is not implemented.
|
||||
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
|
||||
|
||||
// compactionLevel describes a snapshot or levelled compaction.
|
||||
type compactionLevel int
|
||||
|
||||
|
|
|
@ -11,10 +11,10 @@ import (
|
|||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
// DeleteBucket removes all TSM data belonging to a bucket, and removes all index
|
||||
// DeleteBucketRange removes all TSM data belonging to a bucket, and removes all index
|
||||
// and series file data associated with the bucket. The provided time range ensures
|
||||
// that only bucket data for that range is removed.
|
||||
func (e *Engine) DeleteBucket(name []byte, min, max int64) error {
|
||||
func (e *Engine) DeleteBucketRange(name []byte, min, max int64) error {
|
||||
// TODO(jeff): we need to block writes to this prefix while deletes are in progress
|
||||
// otherwise we can end up in a situation where we have staged data in the cache or
|
||||
// WAL that was deleted from the index, or worse. This needs to happen at a higher
|
|
@ -8,7 +8,7 @@ import (
|
|||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
func TestEngine_DeletePrefix(t *testing.T) {
|
||||
func TestEngine_DeleteBucket(t *testing.T) {
|
||||
// Create a few points.
|
||||
p1 := MustParsePointString("cpu,host=0 value=1.1 6")
|
||||
p2 := MustParsePointString("cpu,host=A value=1.2 2")
|
||||
|
@ -44,7 +44,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
|||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
if err := e.DeleteBucket([]byte("cpu"), 0, 3); err != nil {
|
||||
if err := e.DeleteBucketRange([]byte("cpu"), 0, 3); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
|
@ -90,7 +90,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
|||
iter.Close()
|
||||
|
||||
// Deleting remaining series should remove them from the series.
|
||||
if err := e.DeleteBucket([]byte("cpu"), 0, 9); err != nil {
|
||||
if err := e.DeleteBucketRange([]byte("cpu"), 0, 9); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
package tsm1_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
|
@ -59,8 +58,9 @@ func TestIndex_SeriesIDSet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Drop all the series for the gpu measurement and they should no longer
|
||||
// be in the series ID set.
|
||||
if err := engine.DeleteMeasurement([]byte("gpu")); err != nil {
|
||||
// be in the series ID set. This relies on the fact that DeleteBucketRange is really
|
||||
// operating on prefixes.
|
||||
if err := engine.DeleteBucketRange([]byte("gpu"), math.MinInt64, math.MaxInt64); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -72,17 +72,6 @@ func TestIndex_SeriesIDSet(t *testing.T) {
|
|||
delete(seriesIDMap, "gpu")
|
||||
delete(seriesIDMap, "gpu,host=b")
|
||||
|
||||
// Drop the specific mem series
|
||||
ditr := &seriesIterator{keys: [][]byte{[]byte("mem,host=z")}}
|
||||
if err := engine.DeleteSeriesRange(ditr, math.MinInt64, math.MaxInt64); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if engine.SeriesIDSet().Contains(seriesIDMap["mem,host=z"]) {
|
||||
t.Fatalf("bitmap does not contain ID: %d for key %s, but should", seriesIDMap["mem,host=z"], "mem,host=z")
|
||||
}
|
||||
delete(seriesIDMap, "mem,host=z")
|
||||
|
||||
// The rest of the keys should still be in the set.
|
||||
for key, id := range seriesIDMap {
|
||||
if !engine.SeriesIDSet().Contains(id) {
|
||||
|
@ -106,530 +95,6 @@ func TestIndex_SeriesIDSet(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensures that deleting series from TSM files with multiple fields removes all the
|
||||
/// series
|
||||
func TestEngine_DeleteSeries(t *testing.T) {
|
||||
// Create a few points.
|
||||
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
|
||||
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
|
||||
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
|
||||
|
||||
e, err := NewEngine()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
if err := e.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
if err := e.writePoints(p1, p2, p3); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
keys := e.FileStore.Keys()
|
||||
if exp, got := 3, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
|
||||
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
keys = e.FileStore.Keys()
|
||||
if exp, got := 1, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
exp := "cpu,host=B#!~#value"
|
||||
if _, ok := keys[exp]; !ok {
|
||||
t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_DeleteSeriesRange(t *testing.T) {
|
||||
// Create a few points.
|
||||
p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000") // Should not be deleted
|
||||
p2 := MustParsePointString("cpu,host=A value=1.2 2000000000")
|
||||
p3 := MustParsePointString("cpu,host=A value=1.3 3000000000")
|
||||
p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") // Should not be deleted
|
||||
p5 := MustParsePointString("cpu,host=B value=1.3 5000000000") // Should not be deleted
|
||||
p6 := MustParsePointString("cpu,host=C value=1.3 1000000000")
|
||||
p7 := MustParsePointString("mem,host=C value=1.3 1000000000") // Should not be deleted
|
||||
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
|
||||
|
||||
e, err := NewEngine()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
if err := e.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
keys := e.FileStore.Keys()
|
||||
if exp, got := 6, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C")}}
|
||||
if err := e.DeleteSeriesRange(itr, 0, 3000000000); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
keys = e.FileStore.Keys()
|
||||
if exp, got := 4, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
exp := "cpu,host=B#!~#value"
|
||||
if _, ok := keys[exp]; !ok {
|
||||
t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
|
||||
}
|
||||
|
||||
// Check that the series still exists in the index
|
||||
iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
|
||||
if err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
elem, err := iter.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if elem.SeriesID.IsZero() {
|
||||
t.Fatalf("series index mismatch: EOF, exp 2 series")
|
||||
}
|
||||
|
||||
// Lookup series.
|
||||
name, tags := e.sfile.Series(elem.SeriesID)
|
||||
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
|
||||
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
|
||||
}
|
||||
|
||||
if !tags.Equal(models.NewTags(map[string]string{"host": "0"})) && !tags.Equal(models.NewTags(map[string]string{"host": "B"})) {
|
||||
t.Fatalf(`series mismatch: got %s, exp either "host=0" or "host=B"`, tags)
|
||||
}
|
||||
iter.Close()
|
||||
|
||||
// Deleting remaining series should remove them from the series.
|
||||
itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=B")}}
|
||||
if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
}
|
||||
if iter == nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
if elem, err = iter.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !elem.SeriesID.IsZero() {
|
||||
t.Fatalf("got an undeleted series id, but series should be dropped from index")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
|
||||
// Create a few points.
|
||||
p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted
|
||||
p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted
|
||||
p3 := MustParsePointString("cpu,host=B value=1.3 3000000000")
|
||||
p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
|
||||
p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted
|
||||
p6 := MustParsePointString("mem,host=B value=1.3 1000000000")
|
||||
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
|
||||
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
|
||||
|
||||
e, err := NewEngine()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
if err := e.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
keys := e.FileStore.Keys()
|
||||
if exp, got := 6, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}}
|
||||
predicate := func(name []byte, tags models.Tags) (int64, int64, bool) {
|
||||
if bytes.Equal(name, []byte("mem")) {
|
||||
return math.MinInt64, math.MaxInt64, true
|
||||
}
|
||||
if bytes.Equal(name, []byte("cpu")) {
|
||||
for _, tag := range tags {
|
||||
if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) {
|
||||
return math.MinInt64, math.MaxInt64, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return math.MinInt64, math.MaxInt64, false
|
||||
}
|
||||
if err := e.DeleteSeriesRangeWithPredicate(itr, predicate); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
keys = e.FileStore.Keys()
|
||||
if exp, got := 3, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"}
|
||||
for _, exp := range exps {
|
||||
if _, ok := keys[exp]; !ok {
|
||||
t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys)
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the series still exists in the index
|
||||
iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
|
||||
if err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
elem, err := iter.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if elem.SeriesID.IsZero() {
|
||||
t.Fatalf("series index mismatch: EOF, exp 2 series")
|
||||
}
|
||||
|
||||
// Lookup series.
|
||||
name, tags := e.sfile.Series(elem.SeriesID)
|
||||
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
|
||||
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
|
||||
}
|
||||
|
||||
if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) {
|
||||
t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags)
|
||||
}
|
||||
iter.Close()
|
||||
|
||||
// Deleting remaining series should remove them from the series.
|
||||
itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}}
|
||||
if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
}
|
||||
if iter == nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
if elem, err = iter.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !elem.SeriesID.IsZero() {
|
||||
t.Fatalf("got an undeleted series id, but series should be dropped from index")
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that a nil predicate deletes all values returned from the series iterator.
|
||||
func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) {
|
||||
// Create a few points.
|
||||
p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted
|
||||
p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted
|
||||
p3 := MustParsePointString("cpu,host=B value=1.3 3000000000")
|
||||
p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
|
||||
p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted
|
||||
p6 := MustParsePointString("mem,host=B value=1.3 1000000000")
|
||||
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
|
||||
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
|
||||
|
||||
e, err := NewEngine()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
if err := e.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
keys := e.FileStore.Keys()
|
||||
if exp, got := 6, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}}
|
||||
if err := e.DeleteSeriesRangeWithPredicate(itr, nil); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
keys = e.FileStore.Keys()
|
||||
if exp, got := 1, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
// Check that the series still exists in the index
|
||||
iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
|
||||
if err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
} else if iter == nil {
|
||||
return
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
if elem, err := iter.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !elem.SeriesID.IsZero() {
|
||||
t.Fatalf("got an undeleted series id, but series should be dropped from index")
|
||||
}
|
||||
|
||||
// Check that disk series still exists
|
||||
iter, err = e.index.MeasurementSeriesIDIterator([]byte("disk"))
|
||||
if err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
} else if iter == nil {
|
||||
return
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
if elem, err := iter.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if elem.SeriesID.IsZero() {
|
||||
t.Fatalf("got an undeleted series id, but series should be dropped from index")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) {
|
||||
// Create a few points.
|
||||
p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted
|
||||
p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted
|
||||
p3 := MustParsePointString("cpu,host=B value=1.3 3000000000")
|
||||
p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
|
||||
p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted
|
||||
p6 := MustParsePointString("mem,host=B value=1.3 1000000000")
|
||||
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
|
||||
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
|
||||
|
||||
e, err := NewEngine()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
if err := e.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
keys := e.FileStore.Keys()
|
||||
if exp, got := 6, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}}
|
||||
predicate := func(name []byte, tags models.Tags) (int64, int64, bool) {
|
||||
if bytes.Equal(name, []byte("mem")) {
|
||||
return 1000000000, 1000000000, true
|
||||
}
|
||||
|
||||
if bytes.Equal(name, []byte("cpu")) {
|
||||
for _, tag := range tags {
|
||||
if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) {
|
||||
return 3000000000, 4000000000, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return math.MinInt64, math.MaxInt64, false
|
||||
}
|
||||
if err := e.DeleteSeriesRangeWithPredicate(itr, predicate); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
keys = e.FileStore.Keys()
|
||||
if exp, got := 3, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"}
|
||||
for _, exp := range exps {
|
||||
if _, ok := keys[exp]; !ok {
|
||||
t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys)
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the series still exists in the index
|
||||
iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
|
||||
if err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
elem, err := iter.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if elem.SeriesID.IsZero() {
|
||||
t.Fatalf("series index mismatch: EOF, exp 2 series")
|
||||
}
|
||||
|
||||
// Lookup series.
|
||||
name, tags := e.sfile.Series(elem.SeriesID)
|
||||
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
|
||||
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
|
||||
}
|
||||
|
||||
if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) {
|
||||
t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags)
|
||||
}
|
||||
iter.Close()
|
||||
|
||||
// Deleting remaining series should remove them from the series.
|
||||
itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}}
|
||||
if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
}
|
||||
if iter == nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
if elem, err = iter.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !elem.SeriesID.IsZero() {
|
||||
t.Fatalf("got an undeleted series id, but series should be dropped from index")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
|
||||
// Create a few points.
|
||||
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") // Should not be deleted
|
||||
|
||||
e, err := NewEngine()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
if err := e.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
if err := e.writePoints(p1); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
keys := e.FileStore.Keys()
|
||||
if exp, got := 1, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
|
||||
if err := e.DeleteSeriesRange(itr, 0, 0); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
keys = e.FileStore.Keys()
|
||||
if exp, got := 1, len(keys); exp != got {
|
||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
exp := "cpu,host=A#!~#value"
|
||||
if _, ok := keys[exp]; !ok {
|
||||
t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
|
||||
}
|
||||
|
||||
// Check that the series still exists in the index
|
||||
iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
|
||||
if err != nil {
|
||||
t.Fatalf("iterator error: %v", err)
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
elem, err := iter.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if elem.SeriesID.IsZero() {
|
||||
t.Fatalf("series index mismatch: EOF, exp 1 series")
|
||||
}
|
||||
|
||||
// Lookup series.
|
||||
name, tags := e.sfile.Series(elem.SeriesID)
|
||||
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
|
||||
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := tags, models.NewTags(map[string]string{"host": "A"}); !got.Equal(exp) {
|
||||
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_SnapshotsDisabled(t *testing.T) {
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
@ -968,7 +433,7 @@ func (e *Engine) WritePointsString(ptstr ...string) error {
|
|||
func (e *Engine) writePoints(points ...models.Point) error {
|
||||
// Write into the index.
|
||||
collection := tsdb.NewSeriesCollection(points)
|
||||
if err := e.CreateSeriesListIfNotExists(collection); err != nil {
|
||||
if err := e.index.CreateSeriesListIfNotExists(collection); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write the points into the cache/wal.
|
||||
|
|
Loading…
Reference in New Issue