From 7f54e816e3c64e7051600f8328409dca227884e2 Mon Sep 17 00:00:00 2001
From: Jeff Wendling
Date: Mon, 21 Jan 2019 13:09:15 -0700
Subject: [PATCH] refactor: have retention use DeleteBucketRange

---
 storage/engine.go                             |  40 +-
 storage/retention.go                          | 154 +----
 storage/retention_test.go                     | 134 ++---
 tsdb/tsm1/engine.go                           | 354 ------------
 ...measurement.go => engine_delete_bucket.go} |   4 +-
 ...t_test.go => engine_delete_bucket_test.go} |   6 +-
 tsdb/tsm1/engine_test.go                      | 543 +-----------------
 7 files changed, 80 insertions(+), 1155 deletions(-)
 rename tsdb/tsm1/{engine_delete_measurement.go => engine_delete_bucket.go} (97%)
 rename tsdb/tsm1/{engine_delete_measurement_test.go => engine_delete_bucket_test.go} (94%)

diff --git a/storage/engine.go b/storage/engine.go
index 06e774ec54..5b8b4954c0 100644
--- a/storage/engine.go
+++ b/storage/engine.go
@@ -286,22 +286,11 @@ func (e *Engine) reloadWALFile(file string) error {
 		}
 
 	case *wal.DeleteRangeWALEntry:
-		err := e.engine.DeleteSeriesRangeWithPredicate(newFixedSeriesIterator(t.Keys),
-			func(name []byte, tags models.Tags) (int64, int64, bool) {
-				return t.Min, t.Max, true
-			})
-		if err != nil {
-			return err
-		}
+		// TODO(jeff): implement?
 
 	case *wal.DeleteWALEntry:
-		err := e.engine.DeleteSeriesRangeWithPredicate(newFixedSeriesIterator(t.Keys),
-			func(name []byte, tags models.Tags) (int64, int64, bool) {
-				return math.MinInt64, math.MaxInt64, true
-			})
-		if err != nil {
-			return err
-		}
+		// TODO(jeff): implement?
+
 	}
 
@@ -524,6 +513,11 @@ func (e *Engine) CommitSegments(segs []string, fn func() error) error {
 
 // DeleteBucket deletes an entire bucket from the storage engine.
 func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
+	return e.DeleteBucketRange(orgID, bucketID, math.MinInt64, math.MaxInt64)
+}
+
+// DeleteBucketRange deletes data for a bucket within the time range [min, max] from the storage engine.
+func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error {
 	e.mu.RLock()
 	defer e.mu.RUnlock()
 	if e.closing == nil {
@@ -537,23 +531,7 @@ func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
 	encoded := tsdb.EncodeName(orgID, bucketID)
 	name := models.EscapeMeasurement(encoded[:])
 
-	return e.engine.DeleteBucket(name, math.MinInt64, math.MaxInt64)
-}
-
-// DeleteSeriesRangeWithPredicate deletes all series data iterated over if fn returns
-// true for that series.
-func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error {
-	e.mu.RLock()
-	defer e.mu.RUnlock()
-	if e.closing == nil {
-		return ErrEngineClosed
-	}
-
-	// TODO(jeff): this can't exist because we can't WAL a predicate. We'd have to run the
-	// iterator and predicate to completion, store the results in the WAL, and then run it
-	// again.
-
-	return e.engine.DeleteSeriesRangeWithPredicate(itr, fn)
+	return e.engine.DeleteBucketRange(name, min, max)
 }
 
 // SeriesCardinality returns the number of series in the engine.
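Note for reviewers: the retention enforcer (next file) becomes the only caller of this path.
Below is a minimal, self-contained sketch of how the new entry point is driven. It is not
part of the patch: ID stands in for platform.ID, and expireBucket/nopDeleter are hypothetical
names. Only the DeleteBucketRange(orgID, bucketID, min, max) shape comes from the code above.

package main

import (
	"fmt"
	"math"
	"time"
)

// ID stands in for platform.ID so the sketch compiles on its own.
type ID uint64

// Deleter mirrors the narrowed interface the retention enforcer now needs.
type Deleter interface {
	DeleteBucketRange(orgID, bucketID ID, min, max int64) error
}

// expireBucket deletes every point in the bucket older than the retention
// period: the minimum possible timestamp for min, and now minus the
// retention period for max, matching expireData in retention.go below.
func expireBucket(eng Deleter, orgID, bucketID ID, rp time.Duration, now time.Time) error {
	if rp == 0 {
		// A zero retention period means the bucket keeps data forever.
		return nil
	}
	max := now.Add(-rp).UnixNano()
	return eng.DeleteBucketRange(orgID, bucketID, math.MinInt64, max)
}

// nopDeleter just logs the call so the sketch can be run.
type nopDeleter struct{}

func (nopDeleter) DeleteBucketRange(orgID, bucketID ID, min, max int64) error {
	fmt.Printf("delete org=%d bucket=%d range=[%d, %d]\n", orgID, bucketID, min, max)
	return nil
}

func main() {
	_ = expireBucket(nopDeleter{}, 1, 2, 3*time.Hour, time.Now())
}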
diff --git a/storage/retention.go b/storage/retention.go index ed3a08282a..3951b4d7e3 100644 --- a/storage/retention.go +++ b/storage/retention.go @@ -4,15 +4,10 @@ import ( "context" "errors" "math" - "sync" - "sync/atomic" "time" platform "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/logger" - "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/tsdb" - "github.com/influxdata/influxql" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -24,8 +19,7 @@ const ( // A Deleter implementation is capable of deleting data from a storage engine. type Deleter interface { - CreateSeriesCursor(context.Context, SeriesCursorRequest, influxql.Expr) (SeriesCursor, error) - DeleteSeriesRangeWithPredicate(tsdb.SeriesIterator, func([]byte, models.Tags) (int64, int64, bool)) error + DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error } // A BucketFinder is responsible for providing access to buckets via a filter. @@ -78,9 +72,9 @@ func (s *retentionEnforcer) run() { log, logEnd := logger.NewOperation(s.logger, "Data retention check", "data_retention_check") defer logEnd() - rpByBucketID, err := s.getRetentionPeriodPerBucket() + buckets, err := s.getBucketInformation() if err != nil { - log.Error("Unable to determine bucket:RP mapping", zap.Error(err)) + log.Error("Unable to determine bucket information", zap.Error(err)) return } @@ -88,7 +82,7 @@ func (s *retentionEnforcer) run() { labels := s.metrics.Labels() labels["status"] = "ok" - if err := s.expireData(rpByBucketID, now); err != nil { + if err := s.expireData(buckets, now); err != nil { log.Error("Deletion not successful", zap.Error(err)) labels["status"] = "error" } @@ -98,149 +92,37 @@ func (s *retentionEnforcer) run() { // expireData runs a delete operation on the storage engine. // -// Any series data that (1) belongs to a bucket in the provided map and +// Any series data that (1) belongs to a bucket in the provided list and // (2) falls outside the bucket's indicated retention period will be deleted. -func (s *retentionEnforcer) expireData(rpByBucketID map[platform.ID]time.Duration, now time.Time) error { +func (s *retentionEnforcer) expireData(buckets []*platform.Bucket, now time.Time) error { _, logEnd := logger.NewOperation(s.logger, "Data deletion", "data_deletion") defer logEnd() - ctx, cancel := context.WithTimeout(context.Background(), engineAPITimeout) - defer cancel() - cur, err := s.Engine.CreateSeriesCursor(ctx, SeriesCursorRequest{}, nil) - if err != nil { - return err - } - defer cur.Close() - - var mu sync.Mutex - badMSketch := make(map[string]struct{}) // Badly formatted measurements. - missingBSketch := make(map[platform.ID]struct{}) // Missing buckets. - - var seriesDeleted uint64 // Number of series where a delete is attempted. - var seriesSkipped uint64 // Number of series that were skipped from delete. 
- - fn := func(name []byte, tags models.Tags) (int64, int64, bool) { - if len(name) != platform.IDLength { - mu.Lock() - badMSketch[string(name)] = struct{}{} - mu.Unlock() - atomic.AddUint64(&seriesSkipped, 1) - return 0, 0, false - + for _, b := range buckets { + if b.RetentionPeriod == 0 { + continue } - var n [16]byte - copy(n[:], name) - _, bucketID := tsdb.DecodeName(n) - - retentionPeriod, ok := rpByBucketID[bucketID] - if !ok { - mu.Lock() - missingBSketch[bucketID] = struct{}{} - mu.Unlock() - atomic.AddUint64(&seriesSkipped, 1) - return 0, 0, false + max := now.Add(-b.RetentionPeriod).UnixNano() + err := s.Engine.DeleteBucketRange(b.OrganizationID, b.ID, math.MinInt64, max) + if err != nil { + // TODO(jeff): metrics? } - if retentionPeriod == 0 { - return 0, 0, false - } - - atomic.AddUint64(&seriesDeleted, 1) - to := now.Add(-retentionPeriod).UnixNano() - return math.MinInt64, to, true } - defer func() { - if s.metrics == nil { - return - } - labels := s.metrics.Labels() - labels["status"] = "bad_measurement" - s.metrics.Unprocessable.With(labels).Add(float64(len(badMSketch))) - - labels["status"] = "missing_bucket" - s.metrics.Unprocessable.With(labels).Add(float64(len(missingBSketch))) - - labels["status"] = "ok" - s.metrics.Series.With(labels).Add(float64(atomic.LoadUint64(&seriesDeleted))) - - labels["status"] = "skipped" - s.metrics.Series.With(labels).Add(float64(atomic.LoadUint64(&seriesSkipped))) - }() - - return s.Engine.DeleteSeriesRangeWithPredicate(newSeriesIteratorAdapter(cur), fn) + return nil } -// getRetentionPeriodPerBucket returns a map of (bucket ID -> retention period) -// for all buckets. -func (s *retentionEnforcer) getRetentionPeriodPerBucket() (map[platform.ID]time.Duration, error) { +// getBucketInformation returns a slice of buckets to run retention on. +func (s *retentionEnforcer) getBucketInformation() ([]*platform.Bucket, error) { ctx, cancel := context.WithTimeout(context.Background(), bucketAPITimeout) defer cancel() + buckets, _, err := s.BucketService.FindBuckets(ctx, platform.BucketFilter{}) - if err != nil { - return nil, err - } - rpByBucketID := make(map[platform.ID]time.Duration, len(buckets)) - for _, bucket := range buckets { - rpByBucketID[bucket.ID] = bucket.RetentionPeriod - } - return rpByBucketID, nil + return buckets, err } // PrometheusCollectors satisfies the prom.PrometheusCollector interface. func (s *retentionEnforcer) PrometheusCollectors() []prometheus.Collector { return s.metrics.PrometheusCollectors() } - -type seriesIteratorAdapter struct { - itr SeriesCursor - ea seriesElemAdapter - elem tsdb.SeriesElem -} - -func newSeriesIteratorAdapter(itr SeriesCursor) *seriesIteratorAdapter { - si := &seriesIteratorAdapter{itr: itr} - si.elem = &si.ea - return si -} - -// Next returns the next tsdb.SeriesElem. -// -// The returned tsdb.SeriesElem is valid for use until Next is called again. 
-func (s *seriesIteratorAdapter) Next() (tsdb.SeriesElem, error) { - if s.itr == nil { - return nil, nil - } - - row, err := s.itr.Next() - if err != nil { - return nil, err - } - - if row == nil { - return nil, nil - } - - s.ea.name = row.Name - s.ea.tags = row.Tags - return s.elem, nil -} - -func (s *seriesIteratorAdapter) Close() error { - if s.itr != nil { - err := s.itr.Close() - s.itr = nil - return err - } - return nil -} - -type seriesElemAdapter struct { - name []byte - tags models.Tags -} - -func (e *seriesElemAdapter) Name() []byte { return e.name } -func (e *seriesElemAdapter) Tags() models.Tags { return e.tags } -func (e *seriesElemAdapter) Deleted() bool { return false } -func (e *seriesElemAdapter) Expr() influxql.Expr { return nil } diff --git a/storage/retention_test.go b/storage/retention_test.go index c4d2a8e9b2..9b4db3cf41 100644 --- a/storage/retention_test.go +++ b/storage/retention_test.go @@ -2,7 +2,6 @@ package storage import ( "context" - "fmt" "math" "math/rand" "reflect" @@ -10,103 +9,80 @@ import ( "time" platform "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" - "github.com/influxdata/influxql" ) -func TestService_expireData(t *testing.T) { +func TestRetentionService(t *testing.T) { engine := NewTestEngine() service := newRetentionEnforcer(engine, NewTestBucketFinder()) now := time.Date(2018, 4, 10, 23, 12, 33, 0, time.UTC) - t.Run("no rpByBucketID", func(t *testing.T) { + t.Run("no buckets", func(t *testing.T) { if err := service.expireData(nil, now); err != nil { t.Error(err) } - - if err := service.expireData(map[platform.ID]time.Duration{}, now); err != nil { + if err := service.expireData([]*platform.Bucket{}, now); err != nil { t.Error(err) } }) - // Generate some measurement names - var names [][]byte - rpByBucketID := map[platform.ID]time.Duration{} - expMatchedFrequencies := map[string]int{} // To be used for verifying test results. - expRejectedFrequencies := map[string]int{} // To be used for verifying test results. + // Generate some buckets to expire + buckets := []*platform.Bucket{} + expMatched := map[string]struct{}{} // To be used for verifying test results. + expRejected := map[string]struct{}{} // To be used for verifying test results. for i := 0; i < 15; i++ { - repeat := rand.Intn(10) + 1 // [1, 10] name := genMeasurementName() - for j := 0; j < repeat; j++ { - names = append(names, name) - } var n [16]byte copy(n[:], name) - _, bucketID := tsdb.DecodeName(n) + orgID, bucketID := tsdb.DecodeName(n) // Put 1/3rd in the rpByBucketID into the set to delete and 1/3rd into the set // to not delete because no rp, and 1/3rd into the set to not delete because 0 rp. if i%3 == 0 { - rpByBucketID[bucketID] = 3 * time.Hour - expMatchedFrequencies[string(name)] = repeat + buckets = append(buckets, &platform.Bucket{ + OrganizationID: orgID, + ID: bucketID, + RetentionPeriod: 3 * time.Hour, + }) + expMatched[string(name)] = struct{}{} } else if i%3 == 1 { - expRejectedFrequencies[string(name)] = repeat + expRejected[string(name)] = struct{}{} } else if i%3 == 2 { - rpByBucketID[bucketID] = 0 - expRejectedFrequencies[string(name)] = repeat + buckets = append(buckets, &platform.Bucket{ + OrganizationID: orgID, + ID: bucketID, + RetentionPeriod: 0, + }) + expRejected[string(name)] = struct{}{} } } - // Add a badly formatted measurement. 
- for i := 0; i < 5; i++ { - names = append(names, []byte("zyzwrong")) - } - expRejectedFrequencies["zyzwrong"] = 5 - - gotMatchedFrequencies := map[string]int{} - gotRejectedFrequencies := map[string]int{} - engine.DeleteSeriesRangeWithPredicateFn = func(_ tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error { - - // Iterate over the generated names updating the frequencies by which - // the predicate function in expireData matches or rejects them. - for _, name := range names { - from, to, shouldDelete := fn(name, nil) - if shouldDelete { - gotMatchedFrequencies[string(name)]++ - if from != math.MinInt64 { - return fmt.Errorf("got from %d, expected %d", from, math.MinInt64) - } - wantTo := now.Add(-3 * time.Hour).UnixNano() - if to != wantTo { - return fmt.Errorf("got to %d, expected %d", to, wantTo) - } - } else { - gotRejectedFrequencies[string(name)]++ - } + gotMatched := map[string]struct{}{} + engine.DeleteBucketRangeFn = func(orgID, bucketID platform.ID, from, to int64) error { + if from != math.MinInt64 { + t.Fatalf("got from %d, expected %d", from, math.MinInt64) } + wantTo := now.Add(-3 * time.Hour).UnixNano() + if to != wantTo { + t.Fatalf("got to %d, expected %d", to, wantTo) + } + + name := tsdb.EncodeName(orgID, bucketID) + if _, ok := expRejected[string(name[:])]; ok { + t.Fatalf("got a delete for %x", name) + } + gotMatched[string(name[:])] = struct{}{} return nil } - t.Run("multiple bucket", func(t *testing.T) { - if err := service.expireData(rpByBucketID, now); err != nil { + t.Run("multiple buckets", func(t *testing.T) { + if err := service.expireData(buckets, now); err != nil { t.Error(err) } - - // Verify that the correct series were marked to be deleted. - t.Run("matched", func(t *testing.T) { - if !reflect.DeepEqual(gotMatchedFrequencies, expMatchedFrequencies) { - t.Fatalf("got\n%#v\nexpected\n%#v", gotMatchedFrequencies, expMatchedFrequencies) - } - }) - - t.Run("rejected", func(t *testing.T) { - // Verify that badly formatted measurements were rejected. 
- if !reflect.DeepEqual(gotRejectedFrequencies, expRejectedFrequencies) { - t.Fatalf("got\n%#v\nexpected\n%#v", gotRejectedFrequencies, expRejectedFrequencies) - } - }) + if !reflect.DeepEqual(gotMatched, expMatched) { + t.Fatalf("got\n%#v\nexpected\n%#v", gotMatched, expMatched) + } }) } @@ -120,40 +96,18 @@ func genMeasurementName() []byte { return b } -type TestSeriesCursor struct { - CloseFn func() error - NextFn func() (*SeriesCursorRow, error) -} - -func (f *TestSeriesCursor) Close() error { return f.CloseFn() } -func (f *TestSeriesCursor) Next() (*SeriesCursorRow, error) { return f.NextFn() } - type TestEngine struct { - CreateSeriesCursorFn func(context.Context, SeriesCursorRequest, influxql.Expr) (SeriesCursor, error) - DeleteSeriesRangeWithPredicateFn func(tsdb.SeriesIterator, func([]byte, models.Tags) (int64, int64, bool)) error - - SeriesCursor *TestSeriesCursor + DeleteBucketRangeFn func(platform.ID, platform.ID, int64, int64) error } func NewTestEngine() *TestEngine { - cursor := &TestSeriesCursor{ - CloseFn: func() error { return nil }, - NextFn: func() (*SeriesCursorRow, error) { return nil, nil }, - } - return &TestEngine{ - SeriesCursor: cursor, - CreateSeriesCursorFn: func(context.Context, SeriesCursorRequest, influxql.Expr) (SeriesCursor, error) { return cursor, nil }, - DeleteSeriesRangeWithPredicateFn: func(tsdb.SeriesIterator, func([]byte, models.Tags) (int64, int64, bool)) error { return nil }, + DeleteBucketRangeFn: func(platform.ID, platform.ID, int64, int64) error { return nil }, } } -func (e *TestEngine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) { - return e.CreateSeriesCursorFn(ctx, req, cond) -} - -func (e *TestEngine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error { - return e.DeleteSeriesRangeWithPredicateFn(itr, fn) +func (e *TestEngine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error { + return e.DeleteBucketRangeFn(orgID, bucketID, min, max) } type TestBucketFinder struct { diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 34ea98b3c3..de37de6b55 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -4,11 +4,8 @@ package tsm1 // import "github.com/influxdata/influxdb/tsdb/tsm1" import ( "bytes" "context" - "errors" "fmt" - "io" "io/ioutil" - "math" "os" "path/filepath" "regexp" @@ -20,7 +17,6 @@ import ( "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/pkg/metrics" "github.com/influxdata/influxdb/query" @@ -594,361 +590,11 @@ func (e *Engine) WriteValues(values map[string][]Value) error { return nil } -// DeleteSeriesRange removes the values between min and max (inclusive) from all series -func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error { - return e.DeleteSeriesRangeWithPredicate(itr, func(name []byte, tags models.Tags) (int64, int64, bool) { - return min, max, true - }) -} - -// DeleteSeriesRangeWithPredicate removes the values between min and max (inclusive) from all series -// for which predicate() returns true. If predicate() is nil, then all values in range are removed. 
-func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error { - var disableOnce bool - - // Ensure that the index does not compact away the measurement or series we're - // going to delete before we're done with them. - e.index.DisableCompactions() - defer e.index.EnableCompactions() - e.index.Wait() - - fs, err := e.index.RetainFileSet() - if err != nil { - return err - } - defer fs.Release() - - var ( - sz int - min, max int64 = math.MinInt64, math.MaxInt64 - - // Indicator that the min/max time for the current batch has changed and - // we need to flush the current batch before appending to it. - flushBatch bool - ) - - // These are reversed from min/max to ensure they are different the first time through. - newMin, newMax := int64(math.MaxInt64), int64(math.MinInt64) - - // There is no predicate, so setup newMin/newMax to delete the full time range. - if predicate == nil { - newMin = min - newMax = max - } - - batch := make([][]byte, 0, 10000) - for { - elem, err := itr.Next() - if err != nil { - return err - } else if elem == nil { - break - } - - // See if the series should be deleted and if so, what range of time. - if predicate != nil { - var shouldDelete bool - newMin, newMax, shouldDelete = predicate(elem.Name(), elem.Tags()) - if !shouldDelete { - continue - } - - // If the min/max happens to change for the batch, we need to flush - // the current batch and start a new one. - flushBatch = (min != newMin || max != newMax) && len(batch) > 0 - } - - if elem.Expr() != nil { - if v, ok := elem.Expr().(*influxql.BooleanLiteral); !ok || !v.Val { - return errors.New("fields not supported in WHERE clause during deletion") - } - } - - if !disableOnce { - // Disable and abort running compactions so that tombstones added existing tsm - // files don't get removed. This would cause deleted measurements/series to - // re-appear once the compaction completed. We only disable the level compactions - // so that snapshotting does not stop while writing out tombstones. If it is stopped, - // and writing tombstones takes a long time, writes can get rejected due to the cache - // filling up. - e.disableLevelCompactions(true) - defer e.enableLevelCompactions(true) - - e.sfile.DisableCompactions() - defer e.sfile.EnableCompactions() - e.sfile.Wait() - - disableOnce = true - } - - if sz >= deleteFlushThreshold || flushBatch { - // Delete all matching batch. - if err := e.deleteSeriesRange(batch, min, max); err != nil { - return err - } - batch = batch[:0] - sz = 0 - flushBatch = false - } - - // Use the new min/max time for the next iteration - min = newMin - max = newMax - - key := models.MakeKey(elem.Name(), elem.Tags()) - sz += len(key) - batch = append(batch, key) - } - - if len(batch) > 0 { - // Delete all matching batch. - if err := e.deleteSeriesRange(batch, min, max); err != nil { - return err - } - } - - e.index.Rebuild() - return nil -} - -// deleteSeriesRange removes the values between min and max (inclusive) from all series. This -// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange -// and not directly. -func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { - if len(seriesKeys) == 0 { - return nil - } - - // Ensure keys are sorted since lower layers require them to be. 
- if !bytesutil.IsSorted(seriesKeys) { - bytesutil.Sort(seriesKeys) - } - - // Min and max time in the engine are slightly different from the query language values. - if min == influxql.MinTime { - min = math.MinInt64 - } - if max == influxql.MaxTime { - max = math.MaxInt64 - } - - // Run the delete on each TSM file in parallel - if err := e.FileStore.Apply(func(r TSMFile) error { - // See if this TSM file contains the keys and time range - minKey, maxKey := seriesKeys[0], seriesKeys[len(seriesKeys)-1] - tsmMin, tsmMax := r.KeyRange() - - tsmMin, _ = SeriesAndFieldFromCompositeKey(tsmMin) - tsmMax, _ = SeriesAndFieldFromCompositeKey(tsmMax) - - overlaps := bytes.Compare(tsmMin, maxKey) <= 0 && bytes.Compare(tsmMax, minKey) >= 0 - if !overlaps || !r.OverlapsTimeRange(min, max) { - return nil - } - - // Delete each key we find in the file. We seek to the min key and walk from there. - batch := r.BatchDelete() - iter := r.Iterator(minKey) - var j int - for iter.Next() { - indexKey := iter.Key() - seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey) - - for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 { - j++ - } - - if j >= len(seriesKeys) { - break - } - if bytes.Equal(seriesKeys[j], seriesKey) { - if err := batch.DeleteRange([][]byte{indexKey}, min, max); err != nil { - batch.Rollback() - return err - } - } - } - if err := iter.Err(); err != nil { - batch.Rollback() - return err - } - - return batch.Commit() - }); err != nil { - return err - } - - // find the keys in the cache and remove them - deleteKeys := make([][]byte, 0, len(seriesKeys)) - - // ApplySerialEntryFn cannot return an error in this invocation. - _ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error { - seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k)) - - // Cache does not walk keys in sorted order, so search the sorted - // series we need to delete to see if any of the cache keys match. - i := bytesutil.SearchBytes(seriesKeys, seriesKey) - if i < len(seriesKeys) && bytes.Equal(seriesKey, seriesKeys[i]) { - // k is the measurement + tags + sep + field - deleteKeys = append(deleteKeys, k) - } - return nil - }) - - // Sort the series keys because ApplyEntryFn iterates over the keys randomly. - bytesutil.Sort(deleteKeys) - - e.Cache.DeleteRange(deleteKeys, min, max) - - // The series are deleted on disk, but the index may still say they exist. - // Depending on the the min,max time passed in, the series may or not actually - // exists now. To reconcile the index, we walk the series keys that still exists - // on disk and cross out any keys that match the passed in series. Any series - // left in the slice at the end do not exist and can be deleted from the index. - // Note: this is inherently racy if writes are occurring to the same measurement/series are - // being removed. A write could occur and exist in the cache at this point, but we - // would delete it from the index. - minKey := seriesKeys[0] - - // Apply runs this func concurrently. The seriesKeys slice is mutated concurrently - // by different goroutines setting positions to nil. - if err := e.FileStore.Apply(func(r TSMFile) error { - var j int - - // Start from the min deleted key that exists in this file. 
- iter := r.Iterator(minKey) - for iter.Next() { - if j >= len(seriesKeys) { - return nil - } - - indexKey := iter.Key() - seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey) - - // Skip over any deleted keys that are less than our tsm key - cmp := bytes.Compare(seriesKeys[j], seriesKey) - for j < len(seriesKeys) && cmp < 0 { - j++ - if j >= len(seriesKeys) { - return nil - } - cmp = bytes.Compare(seriesKeys[j], seriesKey) - } - - // We've found a matching key, cross it out so we do not remove it from the index. - if j < len(seriesKeys) && cmp == 0 { - seriesKeys[j] = emptyBytes - j++ - } - } - - return iter.Err() - }); err != nil { - return err - } - - // Have we deleted all values for the series? If so, we need to remove - // the series from the index. - if len(seriesKeys) > 0 { - buf := make([]byte, 1024) // For use when accessing series file. - ids := tsdb.NewSeriesIDSet() - measurements := make(map[string]struct{}, 1) - - for _, k := range seriesKeys { - if len(k) == 0 { - continue // This key was wiped because it shouldn't be removed from index. - } - - name, tags := models.ParseKeyBytes(k) - sid := e.sfile.SeriesID(name, tags, buf) - if sid.IsZero() { - continue - } - - // See if this series was found in the cache earlier - i := bytesutil.SearchBytes(deleteKeys, k) - - var hasCacheValues bool - // If there are multiple fields, they will have the same prefix. If any field - // has values, then we can't delete it from the index. - for i < len(deleteKeys) && bytes.HasPrefix(deleteKeys[i], k) { - if e.Cache.Values(deleteKeys[i]).Len() > 0 { - hasCacheValues = true - break - } - i++ - } - - if hasCacheValues { - continue - } - - measurements[string(name)] = struct{}{} - // Remove the series from the local index. - if err := e.index.DropSeries(sid, k, false); err != nil { - return err - } - - // Add the id to the set of delete ids. - ids.Add(sid) - } - - for k := range measurements { - if err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil { - return err - } - } - - // Remove the remaining ids from the series file as they no longer exist - // in any shard. - var err error - ids.ForEach(func(id tsdb.SeriesID) { - if err1 := e.sfile.DeleteSeriesID(id); err1 != nil { - err = err1 - } - }) - if err != nil { - return err - } - } - - return nil -} - -// DeleteMeasurement deletes a measurement and all related series. -func (e *Engine) DeleteMeasurement(name []byte) error { - // Delete the bulk of data outside of the fields lock. - if err := e.deleteMeasurement(name); err != nil { - return err - } - return nil -} - -// DeleteMeasurement deletes a measurement and all related series. -func (e *Engine) deleteMeasurement(name []byte) error { - // Attempt to find the series keys. - itr, err := e.index.MeasurementSeriesIDIterator(name) - if err != nil { - return err - } else if itr == nil { - return nil - } - defer itr.Close() - return e.DeleteSeriesRange(tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64) -} - // ForEachMeasurementName iterates over each measurement name in the engine. func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error { return e.index.ForEachMeasurementName(fn) } -func (e *Engine) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection) error { - return e.index.CreateSeriesListIfNotExists(collection) -} - -// WriteTo is not implemented. -func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") } - // compactionLevel describes a snapshot or levelled compaction. 
type compactionLevel int diff --git a/tsdb/tsm1/engine_delete_measurement.go b/tsdb/tsm1/engine_delete_bucket.go similarity index 97% rename from tsdb/tsm1/engine_delete_measurement.go rename to tsdb/tsm1/engine_delete_bucket.go index 466716fc46..57957942ae 100644 --- a/tsdb/tsm1/engine_delete_measurement.go +++ b/tsdb/tsm1/engine_delete_bucket.go @@ -11,10 +11,10 @@ import ( "github.com/influxdata/influxql" ) -// DeleteBucket removes all TSM data belonging to a bucket, and removes all index +// DeleteBucketRange removes all TSM data belonging to a bucket, and removes all index // and series file data associated with the bucket. The provided time range ensures // that only bucket data for that range is removed. -func (e *Engine) DeleteBucket(name []byte, min, max int64) error { +func (e *Engine) DeleteBucketRange(name []byte, min, max int64) error { // TODO(jeff): we need to block writes to this prefix while deletes are in progress // otherwise we can end up in a situation where we have staged data in the cache or // WAL that was deleted from the index, or worse. This needs to happen at a higher diff --git a/tsdb/tsm1/engine_delete_measurement_test.go b/tsdb/tsm1/engine_delete_bucket_test.go similarity index 94% rename from tsdb/tsm1/engine_delete_measurement_test.go rename to tsdb/tsm1/engine_delete_bucket_test.go index c0b21bb378..755d116fd9 100644 --- a/tsdb/tsm1/engine_delete_measurement_test.go +++ b/tsdb/tsm1/engine_delete_bucket_test.go @@ -8,7 +8,7 @@ import ( "github.com/influxdata/influxdb/models" ) -func TestEngine_DeletePrefix(t *testing.T) { +func TestEngine_DeleteBucket(t *testing.T) { // Create a few points. p1 := MustParsePointString("cpu,host=0 value=1.1 6") p2 := MustParsePointString("cpu,host=A value=1.2 2") @@ -44,7 +44,7 @@ func TestEngine_DeletePrefix(t *testing.T) { t.Fatalf("series count mismatch: exp %v, got %v", exp, got) } - if err := e.DeleteBucket([]byte("cpu"), 0, 3); err != nil { + if err := e.DeleteBucketRange([]byte("cpu"), 0, 3); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -90,7 +90,7 @@ func TestEngine_DeletePrefix(t *testing.T) { iter.Close() // Deleting remaining series should remove them from the series. - if err := e.DeleteBucket([]byte("cpu"), 0, 9); err != nil { + if err := e.DeleteBucketRange([]byte("cpu"), 0, 9); err != nil { t.Fatalf("failed to delete series: %v", err) } diff --git a/tsdb/tsm1/engine_test.go b/tsdb/tsm1/engine_test.go index a1ab2674bb..b8f86d6d0d 100644 --- a/tsdb/tsm1/engine_test.go +++ b/tsdb/tsm1/engine_test.go @@ -1,7 +1,6 @@ package tsm1_test import ( - "bytes" "fmt" "io/ioutil" "math" @@ -59,8 +58,9 @@ func TestIndex_SeriesIDSet(t *testing.T) { } // Drop all the series for the gpu measurement and they should no longer - // be in the series ID set. - if err := engine.DeleteMeasurement([]byte("gpu")); err != nil { + // be in the series ID set. This relies on the fact that DeleteBucketRange is really + // operating on prefixes. 
+ if err := engine.DeleteBucketRange([]byte("gpu"), math.MinInt64, math.MaxInt64); err != nil { t.Fatal(err) } @@ -72,17 +72,6 @@ func TestIndex_SeriesIDSet(t *testing.T) { delete(seriesIDMap, "gpu") delete(seriesIDMap, "gpu,host=b") - // Drop the specific mem series - ditr := &seriesIterator{keys: [][]byte{[]byte("mem,host=z")}} - if err := engine.DeleteSeriesRange(ditr, math.MinInt64, math.MaxInt64); err != nil { - t.Fatal(err) - } - - if engine.SeriesIDSet().Contains(seriesIDMap["mem,host=z"]) { - t.Fatalf("bitmap does not contain ID: %d for key %s, but should", seriesIDMap["mem,host=z"], "mem,host=z") - } - delete(seriesIDMap, "mem,host=z") - // The rest of the keys should still be in the set. for key, id := range seriesIDMap { if !engine.SeriesIDSet().Contains(id) { @@ -106,530 +95,6 @@ func TestIndex_SeriesIDSet(t *testing.T) { } } -// Ensures that deleting series from TSM files with multiple fields removes all the -/// series -func TestEngine_DeleteSeries(t *testing.T) { - // Create a few points. - p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") - p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") - p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") - - e, err := NewEngine() - if err != nil { - t.Fatal(err) - } - - // mock the planner so compactions don't run during the test - e.CompactionPlan = &mockPlanner{} - if err := e.Open(); err != nil { - t.Fatal(err) - } - defer e.Close() - - if err := e.writePoints(p1, p2, p3); err != nil { - t.Fatalf("failed to write points: %s", err.Error()) - } - if err := e.WriteSnapshot(); err != nil { - t.Fatalf("failed to snapshot: %s", err.Error()) - } - - keys := e.FileStore.Keys() - if exp, got := 3, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} - if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - keys = e.FileStore.Keys() - if exp, got := 1, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - exp := "cpu,host=B#!~#value" - if _, ok := keys[exp]; !ok { - t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys) - } -} - -func TestEngine_DeleteSeriesRange(t *testing.T) { - // Create a few points. 
- p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000") // Should not be deleted - p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") - p3 := MustParsePointString("cpu,host=A value=1.3 3000000000") - p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") // Should not be deleted - p5 := MustParsePointString("cpu,host=B value=1.3 5000000000") // Should not be deleted - p6 := MustParsePointString("cpu,host=C value=1.3 1000000000") - p7 := MustParsePointString("mem,host=C value=1.3 1000000000") // Should not be deleted - p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted - - e, err := NewEngine() - if err != nil { - t.Fatal(err) - } - - // mock the planner so compactions don't run during the test - e.CompactionPlan = &mockPlanner{} - if err := e.Open(); err != nil { - t.Fatal(err) - } - defer e.Close() - - if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil { - t.Fatalf("failed to write points: %s", err.Error()) - } - - if err := e.WriteSnapshot(); err != nil { - t.Fatalf("failed to snapshot: %s", err.Error()) - } - - keys := e.FileStore.Keys() - if exp, got := 6, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C")}} - if err := e.DeleteSeriesRange(itr, 0, 3000000000); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - keys = e.FileStore.Keys() - if exp, got := 4, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - exp := "cpu,host=B#!~#value" - if _, ok := keys[exp]; !ok { - t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys) - } - - // Check that the series still exists in the index - iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) - if err != nil { - t.Fatalf("iterator error: %v", err) - } - defer iter.Close() - - elem, err := iter.Next() - if err != nil { - t.Fatal(err) - } - if elem.SeriesID.IsZero() { - t.Fatalf("series index mismatch: EOF, exp 2 series") - } - - // Lookup series. - name, tags := e.sfile.Series(elem.SeriesID) - if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) - } - - if !tags.Equal(models.NewTags(map[string]string{"host": "0"})) && !tags.Equal(models.NewTags(map[string]string{"host": "B"})) { - t.Fatalf(`series mismatch: got %s, exp either "host=0" or "host=B"`, tags) - } - iter.Close() - - // Deleting remaining series should remove them from the series. - itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=B")}} - if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { - t.Fatalf("iterator error: %v", err) - } - if iter == nil { - return - } - - defer iter.Close() - if elem, err = iter.Next(); err != nil { - t.Fatal(err) - } - if !elem.SeriesID.IsZero() { - t.Fatalf("got an undeleted series id, but series should be dropped from index") - } -} - -func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) { - // Create a few points. 
- p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted - p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted - p3 := MustParsePointString("cpu,host=B value=1.3 3000000000") - p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") - p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted - p6 := MustParsePointString("mem,host=B value=1.3 1000000000") - p7 := MustParsePointString("mem,host=C value=1.3 1000000000") - p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted - - e, err := NewEngine() - if err != nil { - t.Fatal(err) - } - - // mock the planner so compactions don't run during the test - e.CompactionPlan = &mockPlanner{} - if err := e.Open(); err != nil { - t.Fatal(err) - } - defer e.Close() - - if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil { - t.Fatalf("failed to write points: %s", err.Error()) - } - - if err := e.WriteSnapshot(); err != nil { - t.Fatalf("failed to snapshot: %s", err.Error()) - } - - keys := e.FileStore.Keys() - if exp, got := 6, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}} - predicate := func(name []byte, tags models.Tags) (int64, int64, bool) { - if bytes.Equal(name, []byte("mem")) { - return math.MinInt64, math.MaxInt64, true - } - if bytes.Equal(name, []byte("cpu")) { - for _, tag := range tags { - if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) { - return math.MinInt64, math.MaxInt64, true - } - } - } - return math.MinInt64, math.MaxInt64, false - } - if err := e.DeleteSeriesRangeWithPredicate(itr, predicate); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - keys = e.FileStore.Keys() - if exp, got := 3, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"} - for _, exp := range exps { - if _, ok := keys[exp]; !ok { - t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys) - } - } - - // Check that the series still exists in the index - iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) - if err != nil { - t.Fatalf("iterator error: %v", err) - } - defer iter.Close() - - elem, err := iter.Next() - if err != nil { - t.Fatal(err) - } - if elem.SeriesID.IsZero() { - t.Fatalf("series index mismatch: EOF, exp 2 series") - } - - // Lookup series. - name, tags := e.sfile.Series(elem.SeriesID) - if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) - } - - if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) { - t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags) - } - iter.Close() - - // Deleting remaining series should remove them from the series. 
- itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}} - if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { - t.Fatalf("iterator error: %v", err) - } - if iter == nil { - return - } - - defer iter.Close() - if elem, err = iter.Next(); err != nil { - t.Fatal(err) - } - if !elem.SeriesID.IsZero() { - t.Fatalf("got an undeleted series id, but series should be dropped from index") - } -} - -// Tests that a nil predicate deletes all values returned from the series iterator. -func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) { - // Create a few points. - p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted - p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted - p3 := MustParsePointString("cpu,host=B value=1.3 3000000000") - p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") - p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted - p6 := MustParsePointString("mem,host=B value=1.3 1000000000") - p7 := MustParsePointString("mem,host=C value=1.3 1000000000") - p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted - - e, err := NewEngine() - if err != nil { - t.Fatal(err) - } - - // mock the planner so compactions don't run during the test - e.CompactionPlan = &mockPlanner{} - if err := e.Open(); err != nil { - t.Fatal(err) - } - defer e.Close() - - if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil { - t.Fatalf("failed to write points: %s", err.Error()) - } - - if err := e.WriteSnapshot(); err != nil { - t.Fatalf("failed to snapshot: %s", err.Error()) - } - - keys := e.FileStore.Keys() - if exp, got := 6, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}} - if err := e.DeleteSeriesRangeWithPredicate(itr, nil); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - keys = e.FileStore.Keys() - if exp, got := 1, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - // Check that the series still exists in the index - iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) - if err != nil { - t.Fatalf("iterator error: %v", err) - } else if iter == nil { - return - } - defer iter.Close() - - if elem, err := iter.Next(); err != nil { - t.Fatal(err) - } else if !elem.SeriesID.IsZero() { - t.Fatalf("got an undeleted series id, but series should be dropped from index") - } - - // Check that disk series still exists - iter, err = e.index.MeasurementSeriesIDIterator([]byte("disk")) - if err != nil { - t.Fatalf("iterator error: %v", err) - } else if iter == nil { - return - } - defer iter.Close() - - if elem, err := iter.Next(); err != nil { - t.Fatal(err) - } else if elem.SeriesID.IsZero() { - t.Fatalf("got an undeleted series id, but series should be dropped from index") - } -} - -func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) { - // Create a few points. 
- p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted - p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted - p3 := MustParsePointString("cpu,host=B value=1.3 3000000000") - p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") - p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted - p6 := MustParsePointString("mem,host=B value=1.3 1000000000") - p7 := MustParsePointString("mem,host=C value=1.3 1000000000") - p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted - - e, err := NewEngine() - if err != nil { - t.Fatal(err) - } - - // mock the planner so compactions don't run during the test - e.CompactionPlan = &mockPlanner{} - if err := e.Open(); err != nil { - t.Fatal(err) - } - defer e.Close() - - if err := e.writePoints(p1, p2, p3, p4, p5, p6, p7, p8); err != nil { - t.Fatalf("failed to write points: %s", err.Error()) - } - - if err := e.WriteSnapshot(); err != nil { - t.Fatalf("failed to snapshot: %s", err.Error()) - } - - keys := e.FileStore.Keys() - if exp, got := 6, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}} - predicate := func(name []byte, tags models.Tags) (int64, int64, bool) { - if bytes.Equal(name, []byte("mem")) { - return 1000000000, 1000000000, true - } - - if bytes.Equal(name, []byte("cpu")) { - for _, tag := range tags { - if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) { - return 3000000000, 4000000000, true - } - } - } - return math.MinInt64, math.MaxInt64, false - } - if err := e.DeleteSeriesRangeWithPredicate(itr, predicate); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - keys = e.FileStore.Keys() - if exp, got := 3, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"} - for _, exp := range exps { - if _, ok := keys[exp]; !ok { - t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys) - } - } - - // Check that the series still exists in the index - iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) - if err != nil { - t.Fatalf("iterator error: %v", err) - } - defer iter.Close() - - elem, err := iter.Next() - if err != nil { - t.Fatal(err) - } - if elem.SeriesID.IsZero() { - t.Fatalf("series index mismatch: EOF, exp 2 series") - } - - // Lookup series. - name, tags := e.sfile.Series(elem.SeriesID) - if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) - } - - if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) { - t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags) - } - iter.Close() - - // Deleting remaining series should remove them from the series. 
- itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}} - if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { - t.Fatalf("iterator error: %v", err) - } - if iter == nil { - return - } - - defer iter.Close() - if elem, err = iter.Next(); err != nil { - t.Fatal(err) - } - if !elem.SeriesID.IsZero() { - t.Fatalf("got an undeleted series id, but series should be dropped from index") - } -} - -func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) { - // Create a few points. - p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") // Should not be deleted - - e, err := NewEngine() - if err != nil { - t.Fatal(err) - } - - // mock the planner so compactions don't run during the test - e.CompactionPlan = &mockPlanner{} - if err := e.Open(); err != nil { - t.Fatal(err) - } - defer e.Close() - - if err := e.writePoints(p1); err != nil { - t.Fatalf("failed to write points: %s", err.Error()) - } - - if err := e.WriteSnapshot(); err != nil { - t.Fatalf("failed to snapshot: %s", err.Error()) - } - - keys := e.FileStore.Keys() - if exp, got := 1, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} - if err := e.DeleteSeriesRange(itr, 0, 0); err != nil { - t.Fatalf("failed to delete series: %v", err) - } - - keys = e.FileStore.Keys() - if exp, got := 1, len(keys); exp != got { - t.Fatalf("series count mismatch: exp %v, got %v", exp, got) - } - - exp := "cpu,host=A#!~#value" - if _, ok := keys[exp]; !ok { - t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys) - } - - // Check that the series still exists in the index - iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) - if err != nil { - t.Fatalf("iterator error: %v", err) - } - defer iter.Close() - - elem, err := iter.Next() - if err != nil { - t.Fatal(err) - } - if elem.SeriesID.IsZero() { - t.Fatalf("series index mismatch: EOF, exp 1 series") - } - - // Lookup series. - name, tags := e.sfile.Series(elem.SeriesID) - if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) - } - - if got, exp := tags, models.NewTags(map[string]string{"host": "A"}); !got.Equal(exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) - } -} - func TestEngine_SnapshotsDisabled(t *testing.T) { sfile := MustOpenSeriesFile() defer sfile.Close() @@ -968,7 +433,7 @@ func (e *Engine) WritePointsString(ptstr ...string) error { func (e *Engine) writePoints(points ...models.Point) error { // Write into the index. collection := tsdb.NewSeriesCollection(points) - if err := e.CreateSeriesListIfNotExists(collection); err != nil { + if err := e.index.CreateSeriesListIfNotExists(collection); err != nil { return err } // Write the points into the cache/wal.
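A closing note on the 16-byte bucket names these changes key off of: storage/engine.go passes
tsdb.EncodeName(orgID, bucketID) to DeleteBucketRange as the measurement name, and the retention
tests recover the IDs with tsdb.DecodeName. The sketch below shows that round trip under the
assumption, implied by the [16]byte arrays above, that the name is simply the big-endian org ID
followed by the big-endian bucket ID; encodeName/decodeName are local stand-ins, not the real
tsdb functions.

package main

import (
	"encoding/binary"
	"fmt"
)

// ID stands in for platform.ID so the sketch is self-contained.
type ID uint64

// encodeName packs an org ID and bucket ID into a 16-byte measurement name
// (assumed layout: big-endian org, then big-endian bucket).
func encodeName(org, bucket ID) [16]byte {
	var n [16]byte
	binary.BigEndian.PutUint64(n[0:8], uint64(org))
	binary.BigEndian.PutUint64(n[8:16], uint64(bucket))
	return n
}

// decodeName recovers the two IDs from a name produced by encodeName.
func decodeName(n [16]byte) (org, bucket ID) {
	org = ID(binary.BigEndian.Uint64(n[0:8]))
	bucket = ID(binary.BigEndian.Uint64(n[8:16]))
	return org, bucket
}

func main() {
	n := encodeName(0xdead, 0xbeef)
	org, bucket := decodeName(n)
	fmt.Printf("org=%x bucket=%x\n", org, bucket) // org=dead bucket=beef
}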