diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2bb41e795b..01ddadcded 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@
 
 * The default logging format has been changed. See [#9055](https://github.com/influxdata/influxdb/pull/9055) for details.
 
+### Features
+
+- [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine
+
 ### Bugfixes
 
 - [#8538](https://github.com/influxdata/influxdb/pull/8538): Fix panic: runtime error: slice bounds out of range
diff --git a/cmd/influxd/run/command_test.go b/cmd/influxd/run/command_test.go
index 692928716f..7507f323cc 100644
--- a/cmd/influxd/run/command_test.go
+++ b/cmd/influxd/run/command_test.go
@@ -19,6 +19,11 @@ func TestCommand_PIDFile(t *testing.T) {
 
 	pidFile := filepath.Join(tmpdir, "influxdb.pid")
 
+	// Override the default data/wal dir so it doesn't look in ~/.influxdb, which
+	// might have junk not related to this test.
+	os.Setenv("INFLUXDB_DATA_DIR", tmpdir)
+	os.Setenv("INFLUXDB_DATA_WAL_DIR", tmpdir)
+
 	cmd := run.NewCommand()
 	cmd.Getenv = func(key string) string {
 		switch key {
diff --git a/models/points.go b/models/points.go
index 86594da57c..508d73a6ba 100644
--- a/models/points.go
+++ b/models/points.go
@@ -263,6 +263,11 @@ func ParsePointsString(buf string) ([]Point, error) {
 // NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf.
 // This can have the unintended effect of preventing buf from being garbage collected.
 func ParseKey(buf []byte) (string, Tags) {
+	meas, tags := ParseKeyBytes(buf)
+	return string(meas), tags
+}
+
+// ParseKeyBytes parses buf into a measurement name and tags, returning subslices of buf.
+func ParseKeyBytes(buf []byte) ([]byte, Tags) {
 	// Ignore the error because scanMeasurement returns "missing fields" which we ignore
 	// when just parsing a key
 	state, i, _ := scanMeasurement(buf, 0)
@@ -271,9 +276,9 @@
 	if state == tagKeyState {
 		tags = parseTags(buf)
 		// scanMeasurement returns the location of the comma if there are tags, strip that off
-		return string(buf[:i-1]), tags
+		return buf[:i-1], tags
 	}
-	return string(buf[:i]), tags
+	return buf[:i], tags
 }
 
 func ParseTags(buf []byte) (Tags, error) {
diff --git a/pkg/bytesutil/bytesutil.go b/pkg/bytesutil/bytesutil.go
index f67d1e4d93..3e80c794c1 100644
--- a/pkg/bytesutil/bytesutil.go
+++ b/pkg/bytesutil/bytesutil.go
@@ -107,6 +107,47 @@ func CloneSlice(a [][]byte) [][]byte {
 	return other
 }
 
+// Pack converts a sparse array to a dense one. It removes runs of val from a,
+// scanning in steps of width bytes and shifting the remaining data to the front.
+// The returned value is a subslice of a.
+func Pack(a []byte, width int, val byte) []byte {
+	var i, j, iStart, jStart, end int
+
+	fill := make([]byte, width)
+	for i := 0; i < len(fill); i++ {
+		fill[i] = val
+	}
+
+	// Skip the first run that won't move
+	for ; i < len(a) && a[i] != val; i += width {
+	}
+	end = i
+
+	for i < len(a) {
+		// Find the next gap to remove
+		iStart = i
+		for i < len(a) && a[i] == val {
+			i += width
+		}
+
+		// Find the next non-gap to keep
+		jStart = i
+		for j = i; j < len(a) && a[j] != val; j += width {
+		}
+
+		if jStart == len(a) {
+			break
+		}
+
+		// Move the non-gap over the section to remove.
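+		// (a[:end] is the densely packed prefix built so far and a[jStart:j] is
+		// the next run of bytes to keep; copying it to a[end:] closes the gap,
+		// and the updates below advance end and i past the moved run.)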
+ copy(a[end:], a[jStart:j]) + i = iStart + len(a[jStart:j]) + end += j - jStart + i = j + } + + return a[:end] +} + type byteSlices [][]byte func (a byteSlices) Len() int { return len(a) } diff --git a/pkg/bytesutil/bytesutil_test.go b/pkg/bytesutil/bytesutil_test.go index b59e897b6b..8334a2675e 100644 --- a/pkg/bytesutil/bytesutil_test.go +++ b/pkg/bytesutil/bytesutil_test.go @@ -33,3 +33,100 @@ func TestSearchBytesFixed(t *testing.T) { t.Fatalf("index mismatch: exp %v, got %v", exp, got) } } + +func TestPack_WidthOne_One(t *testing.T) { + a := make([]byte, 8) + + a[4] = 1 + + a = bytesutil.Pack(a, 1, 0) + if got, exp := len(a), 1; got != exp { + t.Fatalf("len mismatch: got %v, exp %v", got, exp) + } + + for i, v := range []byte{1} { + if got, exp := a[i], v; got != exp { + t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp) + } + } +} + +func TestPack_WidthOne_Two(t *testing.T) { + a := make([]byte, 8) + + a[4] = 1 + a[6] = 2 + + a = bytesutil.Pack(a, 1, 0) + if got, exp := len(a), 2; got != exp { + t.Fatalf("len mismatch: got %v, exp %v", got, exp) + } + + for i, v := range []byte{1, 2} { + if got, exp := a[i], v; got != exp { + t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp) + } + } +} + +func TestPack_WidthTwo_Two(t *testing.T) { + a := make([]byte, 8) + + a[2] = 1 + a[3] = 1 + a[6] = 2 + a[7] = 2 + + a = bytesutil.Pack(a, 2, 0) + if got, exp := len(a), 4; got != exp { + t.Fatalf("len mismatch: got %v, exp %v", got, exp) + } + + for i, v := range []byte{1, 1, 2, 2} { + if got, exp := a[i], v; got != exp { + t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp) + } + } +} + +func TestPack_WidthOne_Last(t *testing.T) { + a := make([]byte, 8) + + a[6] = 2 + a[7] = 2 + + a = bytesutil.Pack(a, 2, 255) + if got, exp := len(a), 8; got != exp { + t.Fatalf("len mismatch: got %v, exp %v", got, exp) + } + + for i, v := range []byte{0, 0, 0, 0, 0, 0, 2, 2} { + if got, exp := a[i], v; got != exp { + t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp) + } + } +} + +func TestPack_WidthOne_LastFill(t *testing.T) { + a := make([]byte, 8) + + a[0] = 255 + a[1] = 255 + a[2] = 2 + a[3] = 2 + a[4] = 2 + a[5] = 2 + a[6] = 2 + a[7] = 2 + + a = bytesutil.Pack(a, 2, 255) + if got, exp := len(a), 6; got != exp { + t.Fatalf("len mismatch: got %v, exp %v", got, exp) + } + + for i, v := range []byte{2, 2, 2, 2, 2, 2} { + if got, exp := a[i], v; got != exp { + t.Fatalf("value mismatch: a[%d] = %v, exp %v", i, got, exp) + } + } +} diff --git a/test.sh b/test.sh index c37ca39ee7..2da83eb265 100755 --- a/test.sh +++ b/test.sh @@ -90,7 +90,7 @@ function build_docker_image { local imagename=$2 echo "Building docker image $imagename" - exit_if_fail docker build -f "$dockerfile" -t "$imagename" . + exit_if_fail docker build --rm=$DOCKER_RM -f "$dockerfile" -t "$imagename" . 
} diff --git a/tsdb/engine.go b/tsdb/engine.go index 2e9c5b67f4..386325578f 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -51,7 +51,7 @@ type Engine interface { CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error - DeleteSeriesRange(keys [][]byte, min, max int64) error + DeleteSeriesRange(itr SeriesIterator, min, max int64) error SeriesSketches() (estimator.Sketch, estimator.Sketch, error) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) @@ -72,6 +72,7 @@ type Engine interface { TagKeyCardinality(name, key []byte) int // InfluxQL iterators + MeasurementSeriesKeysByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index eb76d10886..95cacbfb37 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -461,6 +461,10 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { } ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, math.MinInt64, math.MaxInt64) + if err := ts.Flush(); err != nil { + t.Fatalf("unexpected error flushing tombstone: %v", err) + } + a3 := tsm1.NewValue(3, 1.3) writes = map[string][]tsm1.Value{ "cpu,host=A#!~#value": []tsm1.Value{a3}, @@ -563,6 +567,10 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { // a1 should remain after compaction ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 2, math.MaxInt64) + if err := ts.Flush(); err != nil { + t.Fatalf("unexpected error flushing tombstone: %v", err) + } + a3 := tsm1.NewValue(3, 1.3) writes = map[string][]tsm1.Value{ "cpu,host=A#!~#value": []tsm1.Value{a3}, @@ -670,6 +678,10 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 2, 2) ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 4, 4) + if err := ts.Flush(); err != nil { + t.Fatalf("unexpected error flushing tombstone: %v", err) + } + a5 := tsm1.NewValue(5, 1.5) writes = map[string][]tsm1.Value{ "cpu,host=A#!~#value": []tsm1.Value{a5}, diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 10c547bbfa..bcefcd1f29 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -5,6 +5,7 @@ import ( "archive/tar" "bytes" "context" + "errors" "fmt" "io" "io/ioutil" @@ -64,6 +65,9 @@ const ( // keyFieldSeparator separates the series key from the field name in the composite key // that identifies a specific field in series keyFieldSeparator = "#!~#" + + // deleteFlushThreshold is the size in bytes of a batch of series keys to delete. + deleteFlushThreshold = 50 * 1024 * 1024 ) // Statistics gathered by the engine. @@ -559,7 +563,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { // Save reference to index for iterator creation. e.index = index - if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error { + if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error { fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) if err != nil { return err @@ -930,41 +934,69 @@ func (e *Engine) WritePoints(points []models.Point) error { return err } -// containsSeries returns a map of keys indicating whether the key exists and -// has values or not. 
-func (e *Engine) containsSeries(keys [][]byte) (map[string]bool, error) { - // keyMap is used to see if a given key exists. keys - // are the measurement + tagset (minus separate & field) - keyMap := map[string]bool{} - for _, k := range keys { - keyMap[string(k)] = false +// DeleteSeriesRange removes the values between min and max (inclusive) from all series +func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error { + var disableOnce bool + + // Ensure that the index does not compact away the measurement or series we're + // going to delete before we're done with them. + if tsiIndex, ok := e.index.(*tsi1.Index); ok { + fs := tsiIndex.RetainFileSet() + defer fs.Release() } - for _, k := range e.Cache.unsortedKeys() { - seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k)) - keyMap[string(seriesKey)] = true - } - - if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error { - seriesKey, _ := SeriesAndFieldFromCompositeKey(k) - if _, ok := keyMap[string(seriesKey)]; ok { - keyMap[string(seriesKey)] = true + var sz int + batch := make([][]byte, 0, 10000) + for elem := itr.Next(); elem != nil; elem = itr.Next() { + if elem.Expr() != nil { + if v, ok := elem.Expr().(*influxql.BooleanLiteral); !ok || !v.Val { + return errors.New("fields not supported in WHERE clause during deletion") + } + } + + if !disableOnce { + // Disable and abort running compactions so that tombstones added existing tsm + // files don't get removed. This would cause deleted measurements/series to + // re-appear once the compaction completed. We only disable the level compactions + // so that snapshotting does not stop while writing out tombstones. If it is stopped, + // and writing tombstones takes a long time, writes can get rejected due to the cache + // filling up. + e.disableLevelCompactions(true) + defer e.enableLevelCompactions(true) + disableOnce = true + } + + key := models.MakeKey(elem.Name(), elem.Tags()) + sz += len(key) + batch = append(batch, key) + + if sz >= deleteFlushThreshold { + // Delete all matching batch. + if err := e.deleteSeriesRange(batch, min, max); err != nil { + return err + } + batch = batch[:0] + sz = 0 } - return nil - }); err != nil { - return nil, err } - return keyMap, nil + if len(batch) > 0 { + // Delete all matching batch. + if err := e.deleteSeriesRange(batch, min, max); err != nil { + return err + } + batch = batch[:0] + } + + e.index.Rebuild() + return nil } -// deleteSeries removes all series keys from the engine. -func (e *Engine) deleteSeries(seriesKeys [][]byte) error { - return e.DeleteSeriesRange(seriesKeys, math.MinInt64, math.MaxInt64) -} - -// DeleteSeriesRange removes the values between min and max (inclusive) from all series. -func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error { +// deleteSeriesRange removes the values between min and max (inclusive) from all series. This +// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange +// and not directly. +func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { + ts := time.Now().UTC().UnixNano() if len(seriesKeys) == 0 { return nil } @@ -974,43 +1006,63 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error { bytesutil.Sort(seriesKeys) } - // Disable and abort running compactions so that tombstones added existing tsm - // files don't get removed. This would cause deleted measurements/series to - // re-appear once the compaction completed. 
We only disable the level compactions - // so that snapshotting does not stop while writing out tombstones. If it is stopped, - // and writing tombstones takes a long time, writes can get rejected due to the cache - // filling up. - e.disableLevelCompactions(true) - defer e.enableLevelCompactions(true) + // Min and max time in the engine are slightly different from the query language values. + if min == influxql.MinTime { + min = math.MinInt64 + } + if max == influxql.MaxTime { + max = math.MaxInt64 + } - tempKeys := seriesKeys[:] - deleteKeys := make([][]byte, 0, len(seriesKeys)) - // go through the keys in the file store - if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error { - seriesKey, _ := SeriesAndFieldFromCompositeKey(k) + // Run the delete on each TSM file in parallel + if err := e.FileStore.Apply(func(r TSMFile) error { + // See if this TSM file contains the keys and time range + minKey, maxKey := seriesKeys[0], seriesKeys[len(seriesKeys)-1] + tsmMin, tsmMax := r.KeyRange() - // Both tempKeys and keys walked are sorted, skip any passed in keys - // that don't exist in our key set. - for len(tempKeys) > 0 && bytes.Compare(tempKeys[0], seriesKey) < 0 { - tempKeys = tempKeys[1:] + tsmMin, _ = SeriesAndFieldFromCompositeKey(tsmMin) + tsmMax, _ = SeriesAndFieldFromCompositeKey(tsmMax) + + overlaps := bytes.Compare(tsmMin, maxKey) <= 0 && bytes.Compare(tsmMax, minKey) >= 0 + if !overlaps || !r.OverlapsTimeRange(min, max) { + return nil } - // Keys match, add the full series key to delete. - if len(tempKeys) > 0 && bytes.Equal(tempKeys[0], seriesKey) { - deleteKeys = append(deleteKeys, k) + // Delete each key we find in the file. We seek to the min key and walk from there. + batch := r.BatchDelete() + n := r.KeyCount() + var j int + for i := r.Seek(minKey); i < n; i++ { + indexKey, _ := r.KeyAt(i) + seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey) + + for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 { + j++ + } + + if j >= len(seriesKeys) { + break + } + if bytes.Equal(seriesKeys[j], seriesKey) { + if err := batch.DeleteRange([][]byte{indexKey}, min, max); err != nil { + batch.Rollback() + return err + } + } } + if err := batch.Commit(); err != nil { + return err + } return nil }); err != nil { return err } - if err := e.FileStore.DeleteRange(deleteKeys, min, max); err != nil { - return err - } + var abort = errors.New("iteration aborted") // sentinel error value // find the keys in the cache and remove them - walKeys := deleteKeys[:0] + deleteKeys := make([][]byte, 0, len(seriesKeys)) // ApplySerialEntryFn cannot return an error in this invocation. _ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error { @@ -1021,33 +1073,99 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error { i := bytesutil.SearchBytes(seriesKeys, seriesKey) if i < len(seriesKeys) && bytes.Equal(seriesKey, seriesKeys[i]) { // k is the measurement + tags + sep + field - walKeys = append(walKeys, k) + deleteKeys = append(deleteKeys, k) } return nil }) - e.Cache.DeleteRange(walKeys, min, max) + e.Cache.DeleteRange(deleteKeys, min, max) // delete from the WAL - if _, err := e.WAL.DeleteRange(walKeys, min, max); err != nil { + if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil { return err } - // Have we deleted all points for the series? If so, we need to remove + // The series are deleted on disk, but the index may still say they exist. + // Depending on the the min,max time passed in, the series may or not actually + // exists now. 
To reconcile the index, we walk the series keys that still exist
+	// on disk and cross out any keys that match the passed-in series. Any series
+	// left in the slice at the end do not exist and can be deleted from the index.
+	// Note: this is inherently racy if writes are occurring to the same
+	// measurement/series while it is being removed. A write could occur and exist
+	// in the cache at this point, but we would still delete it from the index.
+	minKey, maxKey := seriesKeys[0], seriesKeys[len(seriesKeys)-1]
+	if err := e.FileStore.Apply(func(r TSMFile) error {
+		tsmMin, tsmMax := r.KeyRange()
+
+		tsmMin, _ = SeriesAndFieldFromCompositeKey(tsmMin)
+		tsmMax, _ = SeriesAndFieldFromCompositeKey(tsmMax)
+
+		overlaps := bytes.Compare(tsmMin, maxKey) <= 0 && bytes.Compare(tsmMax, minKey) >= 0
+		if !overlaps || !r.OverlapsTimeRange(min, max) {
+			return nil
+		}
+
+		n := r.KeyCount()
+		var j int
+		for i := r.Seek(minKey); i < n; i++ {
+			if j >= len(seriesKeys) {
+				return abort
+			}
+
+			indexKey, _ := r.KeyAt(i)
+			seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
+
+			// Advance past any series keys that sort before this file key.
+			for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 {
+				j++
+			}
+
+			// The key still exists on disk, so cross it out; it must stay in the index.
+			if j < len(seriesKeys) && bytes.Equal(seriesKeys[j], seriesKey) {
+				seriesKeys[j] = nil
+				j++
+			}
+		}
+
+		return nil
+	}); err != nil && err != abort {
+		return err
+	}
+
+	// Have we deleted all values for the series? If so, we need to remove
 	// the series from the index.
-	existing, err := e.containsSeries(seriesKeys)
-	if err != nil {
-		return err
-	}
+	if len(seriesKeys) > 0 {
+		for _, k := range seriesKeys {
+			// This key was crossed out earlier; skip it.
+			if k == nil {
+				continue
+			}
 
-	for k, exists := range existing {
-		if !exists {
-			if err := e.index.UnassignShard(k, e.id); err != nil {
+			// See if this series was found in the cache earlier
+			i := bytesutil.SearchBytes(deleteKeys, k)
+
+			var hasCacheValues bool
+			// If there are multiple fields, they will have the same prefix. If any field
+			// has values, then we can't delete it from the index.
+			for i < len(deleteKeys) && bytes.HasPrefix(deleteKeys[i], k) {
+				if e.Cache.Values(deleteKeys[i]).Len() > 0 {
+					hasCacheValues = true
+					break
+				}
+				i++
+			}
+
+			// Some cache values still exist, so leave the series in the index.
+			if hasCacheValues {
+				continue
+			}
+
+			if err := e.index.UnassignShard(string(k), e.id, ts); err != nil {
 				return err
 			}
 		}
 	}
 
-	go e.index.Rebuild()
 	return nil
 }
 
@@ -1079,7 +1197,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
 	}
 
 	// Check the filestore.
-	return e.FileStore.WalkKeys(func(k []byte, typ byte) error {
+	return e.FileStore.WalkKeys(name, func(k []byte, typ byte) error {
 		if bytes.HasPrefix(k, encodedName) {
 			return abortErr
 		}
@@ -1097,13 +1215,11 @@
 // deleteMeasurement deletes a measurement and all related series.
 func (e *Engine) deleteMeasurement(name []byte) error {
 	// Attempt to find the series keys.
- keys, err := e.index.MeasurementSeriesKeysByExpr(name, nil) + itr, err := e.index.MeasurementSeriesKeysByExprIterator(name, nil) if err != nil { return err - } else if len(keys) > 0 { - if err := e.deleteSeries(keys); err != nil { - return err - } + } else if itr != nil { + return e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64) } return nil } @@ -1113,6 +1229,10 @@ func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error { return e.index.ForEachMeasurementName(fn) } +func (e *Engine) MeasurementSeriesKeysByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIterator, error) { + return e.index.MeasurementSeriesKeysByExprIterator(name, expr) +} + // MeasurementSeriesKeysByExpr returns a list of series keys matching expr. func (e *Engine) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { return e.index.MeasurementSeriesKeysByExpr(name, expr) @@ -1442,61 +1562,6 @@ func (e *Engine) compactFull(grp CompactionGroup) bool { return false } -// onFileStoreReplace is callback handler invoked when the FileStore -// has replaced one set of TSM files with a new set. -func (e *Engine) onFileStoreReplace(newFiles []TSMFile) { - if e.index.Type() == tsi1.IndexName { - return - } - - // Load any new series keys to the index - readers := make([]chan seriesKey, 0, len(newFiles)) - for _, r := range newFiles { - ch := make(chan seriesKey, 1) - readers = append(readers, ch) - - go func(c chan seriesKey, r TSMFile) { - n := r.KeyCount() - for i := 0; i < n; i++ { - key, typ := r.KeyAt(i) - c <- seriesKey{key, typ} - } - close(c) - }(ch, r) - } - - // Merge and dedup all the series keys across each reader to reduce - // lock contention on the index. - merged := merge(readers...) - for v := range merged { - fieldType, err := tsmFieldTypeToInfluxQLDataType(v.typ) - if err != nil { - e.logger.Error(fmt.Sprintf("refresh index (1): %v", err)) - continue - } - - if err := e.addToIndexFromKey(v.key, fieldType); err != nil { - e.logger.Error(fmt.Sprintf("refresh index (2): %v", err)) - continue - } - } - - // load metadata from the Cache - e.Cache.ApplyEntryFn(func(key []byte, entry *entry) error { - fieldType, err := entry.InfluxQLType() - if err != nil { - e.logger.Error(fmt.Sprintf("refresh index (3): %v", err)) - return nil - } - - if err := e.addToIndexFromKey(key, fieldType); err != nil { - e.logger.Error(fmt.Sprintf("refresh index (4): %v", err)) - return nil - } - return nil - }) -} - // compactionStrategy holds the details of what to do in a compaction. type compactionStrategy struct { group CompactionGroup @@ -1569,7 +1634,7 @@ func (s *compactionStrategy) compactGroup() { return } - if err := s.fileStore.ReplaceWithCallback(group, files, s.engine.onFileStoreReplace); err != nil { + if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil { s.logger.Info(fmt.Sprintf("error replacing new TSM files: %v", err)) atomic.AddInt64(s.errorStat, 1) time.Sleep(time.Second) diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 994db85935..9430d925b1 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -128,7 +128,8 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) { } // Remove series. 
- if err := e.DeleteSeriesRange([][]byte{[]byte("cpu,host=A")}, math.MinInt64, math.MaxInt64); err != nil { + itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} + if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { t.Fatalf("failed to delete series: %s", err.Error()) } @@ -607,7 +608,8 @@ func TestEngine_DeleteSeries(t *testing.T) { t.Fatalf("series count mismatch: exp %v, got %v", exp, got) } - if err := e.DeleteSeriesRange([][]byte{[]byte("cpu,host=A")}, math.MinInt64, math.MaxInt64); err != nil { + itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} + if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -667,7 +669,8 @@ func TestEngine_LastModified(t *testing.T) { t.Fatalf("expected time change, got %v, exp %v", got, exp) } - if err := e.DeleteSeriesRange([][]byte{[]byte("cpu,host=A")}, math.MinInt64, math.MaxInt64); err != nil { + itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} + if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -1081,3 +1084,28 @@ func ParseTags(s string) query.Tags { } return query.NewTags(m) } + +type seriesIterator struct { + keys [][]byte +} + +type series struct { + name []byte + tags models.Tags + deleted bool +} + +func (s series) Name() []byte { return s.name } +func (s series) Tags() models.Tags { return s.tags } +func (s series) Deleted() bool { return s.deleted } +func (s series) Expr() influxql.Expr { return nil } + +func (itr *seriesIterator) Next() tsdb.SeriesElem { + if len(itr.keys) == 0 { + return nil + } + name, tags := models.ParseKeyBytes(itr.keys[0]) + s := series{name: name, tags: tags} + itr.keys = itr.keys[1:] + return s +} diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 2203189494..e873cae6a7 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -8,6 +8,7 @@ import ( "math" "os" "path/filepath" + "runtime" "sort" "strconv" "strings" @@ -16,6 +17,7 @@ import ( "time" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/pkg/metrics" "github.com/influxdata/influxdb/query" "go.uber.org/zap" @@ -50,6 +52,12 @@ type TSMFile interface { // key. Contains(key []byte) bool + // OverlapsTimeRange returns true if the time range of the file intersect min and max. + OverlapsTimeRange(min, max int64) bool + + // OverlapsKeyRange returns true if the key range of the file intersects min and max. + OverlapsKeyRange(min, max []byte) bool + // TimeRange returns the min and max time across all keys in the file. TimeRange() (int64, int64) @@ -62,6 +70,9 @@ type TSMFile interface { // KeyCount returns the number of distinct keys in the file. KeyCount() int + // Seek returns the position in the index with the key <= key. + Seek(key []byte) int + // KeyAt returns the key located at index position idx. KeyAt(idx int) ([]byte, byte) @@ -70,6 +81,10 @@ type TSMFile interface { // an error is returned. Type(key []byte) (byte, error) + // BatchDelete return a BatchDeleter that allows for multiple deletes in batches + // and group commit or rollback. + BatchDelete() BatchDeleter + // Delete removes the keys from the set of keys available in this file. Delete(keys [][]byte) error @@ -281,7 +296,7 @@ func (f *FileStore) NextGeneration() int { // WalkKeys calls fn for every key in every TSM file known to the FileStore. 
If the key
// exists in multiple files, it will be invoked for each file.
-func (f *FileStore) WalkKeys(fn func(key []byte, typ byte) error) error {
+func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) error {
 	f.mu.RLock()
 	if len(f.files) == 0 {
 		f.mu.RUnlock()
@@ -289,15 +304,28 @@
 	}
 
 	readers := make([]chan seriesKey, 0, len(f.files))
+	done := make(chan struct{})
 	for _, f := range f.files {
 		ch := make(chan seriesKey, 1)
 		readers = append(readers, ch)
 
 		go func(c chan seriesKey, r TSMFile) {
+			start := 0
+			if len(seek) > 0 {
+				start = r.Seek(seek)
+			}
 			n := r.KeyCount()
-			for i := 0; i < n; i++ {
+		iter:
+			for i := start; i < n; i++ {
 				key, typ := r.KeyAt(i)
-				c <- seriesKey{key, typ}
+				select {
+				case <-done:
+					// Abort iteration; the consumer stopped reading.
+					break iter
+				case c <- seriesKey{key, typ}:
+				}
 			}
 			close(ch)
 		}(ch, f)
 
 	f.mu.RUnlock()
 
 	merged := merge(readers...)
+	var err error
 	for v := range merged {
-		if err := fn(v.key, v.typ); err != nil {
-			return err
+		// Drain the remaining values so the reader goroutines can exit
+		if err != nil {
+			continue
+		}
+		if err = fn(v.key, v.typ); err != nil {
+			// Signal the readers that we should stop iterating
+			close(done)
 		}
 	}
 
-	return nil
+	return err
 }
 
 // Keys returns all keys and types for all files in the file store.
@@ -320,7 +354,7 @@
 	defer f.mu.RUnlock()
 
 	uniqueKeys := map[string]byte{}
-	if err := f.WalkKeys(func(key []byte, typ byte) error {
+	if err := f.WalkKeys(nil, func(key []byte, typ byte) error {
 		uniqueKeys[string(key)] = typ
 		return nil
 	}); err != nil {
@@ -348,11 +382,65 @@
 	return f.DeleteRange(keys, math.MinInt64, math.MaxInt64)
 }
 
-// DeleteRange removes the values for keys between timestamps min and max.
+// Apply calls fn concurrently for every TSM file in the store, bounded by the
+// number of cores.
+func (f *FileStore) Apply(fn func(r TSMFile) error) error {
+	// Limit apply fn to number of cores
+	limiter := limiter.NewFixed(runtime.GOMAXPROCS(0))
+
+	f.mu.RLock()
+	errC := make(chan error, len(f.files))
+
+	for _, f := range f.files {
+		go func(r TSMFile) {
+			limiter.Take()
+			defer limiter.Release()
+
+			r.Ref()
+			defer r.Unref()
+			errC <- fn(r)
+		}(f)
+	}
+
+	var applyErr error
+	for i := 0; i < cap(errC); i++ {
+		if err := <-errC; err != nil {
+			applyErr = err
+		}
+	}
+	f.mu.RUnlock()
+
+	f.mu.Lock()
+	f.lastModified = time.Now().UTC()
+	f.lastFileStats = nil
+	f.mu.Unlock()
+
+	return applyErr
+}
+
+// DeleteRange removes the values for keys between timestamps min and max. This should only
+// be used with small batches of series keys.
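+//
+// A minimal usage sketch (the series key below is hypothetical):
+//
+//	keys := [][]byte{[]byte("cpu,host=A#!~#value")}
+//	if err := fs.DeleteRange(keys, math.MinInt64, math.MaxInt64); err != nil {
+//		// the underlying batches were rolled back; handle the error
+//	}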
func (f *FileStore) DeleteRange(keys [][]byte, min, max int64) error { - if err := f.walkFiles(func(tsm TSMFile) error { - return tsm.DeleteRange(keys, min, max) - }); err != nil { + var batches BatchDeleters + f.mu.RLock() + for _, f := range f.files { + if f.OverlapsTimeRange(min, max) { + batches = append(batches, f.BatchDelete()) + } + } + f.mu.RUnlock() + + if len(batches) == 0 { + return nil + } + + if err := func() error { + if err := batches.DeleteRange(keys, min, max); err != nil { + return err + } + + return batches.Commit() + }(); err != nil { + // Rollback the deletes + _ = batches.Rollback() return err } diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 5c23193dc8..d0517cf56b 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "reflect" "strings" + "sync/atomic" "testing" "time" @@ -2539,6 +2540,43 @@ func TestFileStore_Delete(t *testing.T) { } } +func TestFileStore_Apply(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := tsm1.NewFileStore(dir) + + // Setup 3 files + data := []keyValues{ + keyValues{"cpu,host=server2#!~#value", []tsm1.Value{tsm1.NewValue(0, 1.0)}}, + keyValues{"cpu,host=server1#!~#value", []tsm1.Value{tsm1.NewValue(1, 2.0)}}, + keyValues{"mem,host=server1#!~#value", []tsm1.Value{tsm1.NewValue(0, 1.0)}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + fs.Replace(nil, files) + + keys := fs.Keys() + if got, exp := len(keys), 3; got != exp { + t.Fatalf("key length mismatch: got %v, exp %v", got, exp) + } + + var n int64 + if err := fs.Apply(func(r tsm1.TSMFile) error { + atomic.AddInt64(&n, 1) + return nil + }); err != nil { + t.Fatalf("unexpected error deleting: %v", err) + } + + if got, exp := n, int64(3); got != exp { + t.Fatalf("apply mismatch: got %v, exp %v", got, exp) + } +} + func TestFileStore_Stats(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 5077d61ed6..df96b30210 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -18,6 +18,10 @@ import ( // ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used. var ErrFileInUse = fmt.Errorf("file still in use") +// nilOffset is the value written to the offsets to indicate that position is deleted. The value is the max +// uint32 which is an invalid position. We don't use 0 as 0 is actually a valid position. +var nilOffset = []byte{255, 255, 255, 255} + // TSMReader is a reader for a TSM file. type TSMReader struct { // refs is the count of active references to this reader. @@ -39,6 +43,9 @@ type TSMReader struct { // lastModified is the last time this file was modified on disk lastModified int64 + + // deleteMu limits concurrent deletes + deleteMu sync.Mutex } // TSMIndex represent the index section of a TSM file. The index records all @@ -50,6 +57,10 @@ type TSMIndex interface { // DeleteRange removes the given keys with data between minTime and maxTime from the index. DeleteRange(keys [][]byte, minTime, maxTime int64) + // ContainsKey returns true if the given key may exist in the index. This func is faster than + // Contains but, may return false positives. + ContainsKey(key []byte) bool + // Contains return true if the given key exists in the index. 
Contains(key []byte) bool @@ -77,6 +88,9 @@ type TSMIndex interface { // KeyCount returns the count of unique keys in the index. KeyCount() int + // Seek returns the position in the index where key <= value in the index. + Seek(key []byte) int + // OverlapsTimeRange returns true if the time range of the file intersect min and max. OverlapsTimeRange(min, max int64) bool @@ -229,7 +243,7 @@ func NewTSMReader(f *os.File) (*TSMReader, error) { } t.index = index - t.tombstoner = &Tombstoner{Path: t.Path()} + t.tombstoner = &Tombstoner{Path: t.Path(), FilterFn: index.ContainsKey} if err := t.applyTombstones(); err != nil { return nil, err @@ -250,12 +264,22 @@ func (t *TSMReader) applyTombstones() error { batch = batch[:0] } } - batch = append(batch, ts.Key) + + // Copy the tombstone key and re-use the buffers to avoid allocations + n := len(batch) + batch = batch[:n+1] + if cap(batch[n]) < len(ts.Key) { + batch[n] = make([]byte, len(ts.Key)) + } else { + batch[n] = batch[n][:len(ts.Key)] + } + copy(batch[n], ts.Key) if len(batch) >= 4096 { t.index.DeleteRange(batch, prev.Min, prev.Max) batch = batch[:0] } + prev = ts return nil }); err != nil { @@ -292,6 +316,10 @@ func (t *TSMReader) KeyAt(idx int) ([]byte, byte) { return t.index.KeyAt(idx) } +func (t *TSMReader) Seek(key []byte) int { + return t.index.Seek(key) +} + // ReadAt returns the values corresponding to the given index entry. func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) { t.mu.RLock() @@ -455,23 +483,12 @@ func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { return nil } - // If the keys can't exist in this TSM file, skip it. - minKey, maxKey := keys[0], keys[len(keys)-1] - if !t.index.OverlapsKeyRange(minKey, maxKey) { - return nil - } - - // If the timerange can't exist in this TSM file, skip it. - if !t.index.OverlapsTimeRange(minTime, maxTime) { - return nil - } - - if err := t.tombstoner.AddRange(keys, minTime, maxTime); err != nil { + batch := t.BatchDelete() + if err := batch.DeleteRange(keys, minTime, maxTime); err != nil { + batch.Rollback() return err } - - t.index.DeleteRange(keys, minTime, maxTime) - return nil + return batch.Commit() } // Delete deletes blocks indicated by keys. @@ -480,10 +497,24 @@ func (t *TSMReader) Delete(keys [][]byte) error { return err } + if err := t.tombstoner.Flush(); err != nil { + return err + } + t.index.Delete(keys) return nil } +// OverlapsTimeRange returns true if the time range of the file intersect min and max. +func (t *TSMReader) OverlapsTimeRange(min, max int64) bool { + return t.index.OverlapsTimeRange(min, max) +} + +// OverlapsKeyRange returns true if the key range of the file intersect min and max. +func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool { + return t.index.OverlapsKeyRange(min, max) +} + // TimeRange returns the min and max time across all keys in the file. func (t *TSMReader) TimeRange() (int64, int64) { return t.index.TimeRange() @@ -583,6 +614,110 @@ func (t *TSMReader) BlockIterator() *BlockIterator { } } +type BatchDeleter interface { + DeleteRange(keys [][]byte, min, max int64) error + Commit() error + Rollback() error +} + +type batchDelete struct { + r *TSMReader +} + +func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error { + if len(keys) == 0 { + return nil + } + + // If the keys can't exist in this TSM file, skip it. 
+ minKey, maxKey := keys[0], keys[len(keys)-1] + if !b.r.index.OverlapsKeyRange(minKey, maxKey) { + return nil + } + + // If the timerange can't exist in this TSM file, skip it. + if !b.r.index.OverlapsTimeRange(minTime, maxTime) { + return nil + } + + if err := b.r.tombstoner.AddRange(keys, minTime, maxTime); err != nil { + return err + } + + return nil +} + +func (b *batchDelete) Commit() error { + defer b.r.deleteMu.Unlock() + if err := b.r.tombstoner.Flush(); err != nil { + return err + } + + return b.r.applyTombstones() +} + +func (b *batchDelete) Rollback() error { + defer b.r.deleteMu.Unlock() + return b.r.tombstoner.Rollback() +} + +// BatchDelete returns a BatchDeleter. Only a single goroutine may run a BatchDelete at a time. +// Callers must either Commit or Rollback the operation. +func (r *TSMReader) BatchDelete() BatchDeleter { + r.deleteMu.Lock() + return &batchDelete{r: r} +} + +type BatchDeleters []BatchDeleter + +func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error { + errC := make(chan error, len(a)) + for _, b := range a { + go func(b BatchDeleter) { errC <- b.DeleteRange(keys, min, max) }(b) + } + + var err error + for i := 0; i < len(a); i++ { + dErr := <-errC + if dErr != nil { + err = dErr + } + } + return err +} + +func (a BatchDeleters) Commit() error { + errC := make(chan error, len(a)) + for _, b := range a { + go func(b BatchDeleter) { errC <- b.Commit() }(b) + } + + var err error + for i := 0; i < len(a); i++ { + dErr := <-errC + if dErr != nil { + err = dErr + } + } + return err +} + +func (a BatchDeleters) Rollback() error { + errC := make(chan error, len(a)) + for _, b := range a { + go func(b BatchDeleter) { errC <- b.Rollback() }(b) + } + + var err error + for i := 0; i < len(a); i++ { + dErr := <-errC + if dErr != nil { + err = dErr + } + } + return err +} + // indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This // implementation can be used for indexes that may be MMAPed into memory. type indirectIndex struct { @@ -658,14 +793,52 @@ func NewIndirectIndex() *indirectIndex { } } -// search returns the index of i in offsets for where key is located. If key is not +func (d *indirectIndex) offset(i int) int { + if i < 0 || i+4 > len(d.offsets) { + return -1 + } + return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4])) +} + +func (d *indirectIndex) Seek(key []byte) int { + d.mu.RLock() + defer d.mu.RUnlock() + return d.searchOffset(key) +} + +// searchOffset searches the offsets slice for key and returns the position in +// offsets where key would exist. +func (d *indirectIndex) searchOffset(key []byte) int { + // We use a binary search across our indirect offsets (pointers to all the keys + // in the index slice). + i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { + // i is the position in offsets we are at so get offset it points to + offset := int32(binary.BigEndian.Uint32(x)) + + // It's pointing to the start of the key which is a 2 byte length + keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) + + // See if it matches + return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 + }) + + // See if we might have found the right index + if i < len(d.offsets) { + return int(i / 4) + } + + // The key is not in the index. i is the index where it would be inserted so return + // a value outside our offset range. + return int(len(d.offsets)) / 4 +} + +// search returns the byte position of key in the index. If key is not // in the index, len(index) is returned. 
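+//
+// Each offsets entry is a 4-byte big-endian position into d.b, where the key is
+// stored as a 2-byte length, the key bytes, and then its index entries:
+//
+//	offsets: [o0][o1][o2]...              (fixed-width uint32 entries)
+//	d.b[oN]: keyLen(2) | key | entries
+//
+// so the binary search compares fixed-width offset entries while the keys
+// themselves stay in place in d.b.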
func (d *indirectIndex) search(key []byte) int {
 	// We use a binary search across our indirect offsets (pointers to all the keys
 	// in the index slice).
 	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
 		// i is the position in offsets we are at so get offset it points to
-		//offset := d.offsets[i]
 		offset := int32(binary.BigEndian.Uint32(x))
 
 		// It's pointing to the start of the key which is a 2 byte length
@@ -695,11 +868,34 @@
 	return len(d.b)
 }
 
+// ContainsKey returns true if key may exist in this index.
+func (d *indirectIndex) ContainsKey(key []byte) bool {
+	return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0
+}
+
 // Entries returns all index entries for a key.
 func (d *indirectIndex) Entries(key []byte) []IndexEntry {
 	return d.ReadEntries(key, nil)
}
 
+// readEntriesAt reads the key and all of its index entries stored at byte offset ofs.
+func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) {
+	n, k := readKey(d.b[ofs:])
+
+	// Read and return all the entries
+	ofs += n
+	var ie indexEntries
+	if entries != nil {
+		ie.entries = *entries
+	}
+	if _, err := readEntries(d.b[ofs:], &ie); err != nil {
+		panic(fmt.Sprintf("error reading entries: %v", err))
+	}
+	if entries != nil {
+		*entries = ie.entries
+	}
+	return k, ie.entries
+}
+
 // ReadEntries returns all index entries for a key.
 func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
 	d.mu.RLock()
@@ -707,8 +903,7 @@
 
 	ofs := d.search(key)
 	if ofs < len(d.b) {
-		n, k := readKey(d.b[ofs:])
-
+		k, entries := d.readEntriesAt(ofs, entries)
 		// The search may have returned an i == 0 which could indicate that the value
 		// searched should be inserted at position 0. Make sure the key in the index
 		// matches the search value.
@@ -716,19 +911,7 @@
 			return nil
 		}
 
-		// Read and return all the entries
-		ofs += n
-		var ie indexEntries
-		if entries != nil {
-			ie.entries = *entries
-		}
-		if _, err := readEntries(d.b[ofs:], &ie); err != nil {
-			panic(fmt.Sprintf("error reading entries: %v", err))
-		}
-		if entries != nil {
-			*entries = ie.entries
-		}
-		return ie.entries
+		return entries
 	}
 
 	// The key is not in the index. i is the index where it would be inserted.
@@ -809,13 +992,11 @@
 		bytesutil.Sort(keys)
 	}
 
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
 	// Both keys and offsets are sorted. Walk both in order and skip
 	// any keys that exist in both.
-	var j int
-	for i := 0; i+4 <= len(d.offsets); i += 4 {
+	d.mu.Lock()
+	start := d.searchOffset(keys[0])
+	for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 {
 		offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
 		_, indexKey := readKey(d.b[offset:])
 
@@ -825,13 +1006,11 @@
 
 		if len(keys) > 0 && bytes.Equal(keys[0], indexKey) {
 			keys = keys[1:]
-			continue
+			copy(d.offsets[i:i+4], nilOffset[:])
 		}
-
-		copy(d.offsets[j:j+4], d.offsets[i:i+4])
-		j += 4
 	}
-	d.offsets = d.offsets[:j]
+	d.offsets = bytesutil.Pack(d.offsets, 4, 255)
+	d.mu.Unlock()
 }
 
 // DeleteRange removes the given keys with data between minTime and maxTime from the index.
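Delete above now marks removed offsets in place with 0xFF bytes and compacts the slice once at the end via bytesutil.Pack, rather than shifting the tail after every removal. A self-contained sketch of that pattern, with illustrative values:

	offsets := []byte{
		0, 0, 0, 1, // key 1
		0, 0, 0, 2, // key 2, to be deleted
		0, 0, 0, 3, // key 3
	}
	copy(offsets[4:8], nilOffset[:]) // tombstone key 2's offset with 255s
	offsets = bytesutil.Pack(offsets, 4, 255)
	// offsets now holds only the entries for key 1 and key 3
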
@@ -841,6 +1020,10 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { return } + if !bytesutil.IsSorted(keys) { + bytesutil.Sort(keys) + } + // If we're deleting the max time range, just use tombstoning to remove the // key from the offsets slice if minTime == math.MinInt64 && maxTime == math.MaxInt64 { @@ -856,23 +1039,42 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { fullKeys := make([][]byte, 0, len(keys)) tombstones := map[string][]TimeRange{} - for i, k := range keys { - // Is the range passed in outside the time range for this key? - entries := d.Entries(k) + var ie []IndexEntry + + for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ { + k, entries := d.readEntriesAt(d.offset(i), &ie) + + // Skip any keys that don't exist. These are less than the current key. + for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 { + keys = keys[1:] + } + + // No more keys to delete, we're done. + if len(keys) == 0 { + break + } + + // If the current key is greater than the index one, continue to the next + // index key. + if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 { + continue + } // If multiple tombstones are saved for the same key if len(entries) == 0 { continue } + // Is the time range passed outside of the time range we've have stored for this key? min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime if minTime > max || maxTime < min { continue } - // Is the range passed in cover every value for the key? + // Does the range passed in cover every value for the key? if minTime <= min && maxTime >= max { - fullKeys = append(fullKeys, keys[i]) + fullKeys = append(fullKeys, keys[0]) + keys = keys[1:] continue } @@ -926,7 +1128,8 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { // If we have a fully deleted series, delete it all of it. if minTs <= min && maxTs >= max { - fullKeys = append(fullKeys, keys[i]) + fullKeys = append(fullKeys, keys[0]) + keys = keys[1:] continue } } diff --git a/tsdb/engine/tsm1/tombstone.go b/tsdb/engine/tsm1/tombstone.go index 2c86a4a58c..3fee1709f4 100644 --- a/tsdb/engine/tsm1/tombstone.go +++ b/tsdb/engine/tsm1/tombstone.go @@ -4,6 +4,8 @@ import ( "bufio" "compress/gzip" "encoding/binary" + "errors" + "fmt" "io" "io/ioutil" "math" @@ -17,8 +19,11 @@ const ( headerSize = 4 v2header = 0x1502 v3header = 0x1503 + v4header = 0x1504 ) +var errIncompatibleVersion = errors.New("incompatible v4 version") + // Tombstoner records tombstones when entries are deleted. type Tombstoner struct { mu sync.RWMutex @@ -27,11 +32,24 @@ type Tombstoner struct { // full path to a TSM file. Path string + FilterFn func(k []byte) bool + // cache of the stats for this tombstone fileStats []FileStat // indicates that the stats may be out of sync with what is on disk and they // should be refreshed. statsLoaded bool + + // Tombstones that have been written but not flushed to disk yet. + tombstones []Tombstone + + // These are references used for pending writes that have not been committed. If + // these are nil, then no pending writes are in progress. + gz *gzip.Writer + bw *bufio.Writer + pendingFile *os.File + tmp [8]byte + lastAppliedOffset int64 } // Tombstone represents an individual deletion. @@ -51,6 +69,10 @@ func (t *Tombstoner) Add(keys [][]byte) error { // AddRange adds all keys to the tombstone specifying only the data between min and max to be removed. 
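+//
+// In the v4 format, entries staged here are not durable until Flush is called;
+// Rollback discards the pending batch. A sketch of the expected call pattern:
+//
+//	if err := ts.AddRange(keys, min, max); err != nil {
+//		ts.Rollback() // discard the pending writes
+//		return err
+//	}
+//	return ts.Flush() // fsync and rename the tombstone into place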
func (t *Tombstoner) AddRange(keys [][]byte, min, max int64) error { + for t.FilterFn != nil && len(keys) > 0 && !t.FilterFn(keys[0]) { + keys = keys[1:] + } + if len(keys) == 0 { return nil } @@ -66,31 +88,63 @@ func (t *Tombstoner) AddRange(keys [][]byte, min, max int64) error { t.statsLoaded = false - tombstones, err := t.readTombstone() - if err != nil { - return nil - } + if err := t.prepareV4(); err == errIncompatibleVersion { + if cap(t.tombstones) < len(t.tombstones)+len(keys) { + ts := make([]Tombstone, len(t.tombstones), len(t.tombstones)+len(keys)) + copy(ts, t.tombstones) + t.tombstones = ts + } - if cap(tombstones) < len(tombstones)+len(keys) { - ts := make([]Tombstone, len(tombstones), len(tombstones)+len(keys)) - copy(ts, tombstones) - tombstones = ts + for _, k := range keys { + if t.FilterFn != nil && !t.FilterFn(k) { + continue + } + + t.tombstones = append(t.tombstones, Tombstone{ + Key: k, + Min: min, + Max: max, + }) + } + return t.writeTombstoneV3(t.tombstones) + + } else if err != nil { + return err } for _, k := range keys { - tombstones = append(tombstones, Tombstone{ + if t.FilterFn != nil && !t.FilterFn(k) { + continue + } + + if err := t.writeTombstone(t.gz, Tombstone{ Key: k, Min: min, Max: max, - }) + }); err != nil { + return err + } } - return t.writeTombstone(tombstones) + return nil } -// ReadAll returns all the tombstones in the Tombstoner's directory. -func (t *Tombstoner) ReadAll() ([]Tombstone, error) { - return t.readTombstone() +func (t *Tombstoner) Flush() error { + t.mu.Lock() + defer t.mu.Unlock() + + if err := t.commit(); err != nil { + // Reset our temp references and clean up. + _ = t.rollback() + return err + } + return nil +} + +func (t *Tombstoner) Rollback() error { + t.mu.Lock() + defer t.mu.Unlock() + return t.rollback() } // Delete removes all the tombstone files from disk. @@ -101,13 +155,19 @@ func (t *Tombstoner) Delete() error { return err } t.statsLoaded = false + t.lastAppliedOffset = 0 + return nil } // HasTombstones return true if there are any tombstone entries recorded. func (t *Tombstoner) HasTombstones() bool { files := t.TombstoneFiles() - return len(files) > 0 && files[0].Size > 0 + t.mu.RLock() + n := len(t.tombstones) + t.mu.RUnlock() + + return len(files) > 0 && files[0].Size > 0 || n > 0 } // TombstoneFiles returns any tombstone files associated with Tombstoner's TSM file. @@ -146,6 +206,9 @@ func (t *Tombstoner) TombstoneFiles() []FileStat { // Walk calls fn for every Tombstone under the Tombstoner. 
func (t *Tombstoner) Walk(fn func(t Tombstone) error) error {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
 	f, err := os.Open(t.tombstonePath())
 	if os.IsNotExist(err) {
 		return nil
@@ -167,7 +230,9 @@
 	}
 
 	header := binary.BigEndian.Uint32(b[:])
-	if header == v3header {
+	if header == v4header {
+		return t.readTombstoneV4(f, fn)
+	} else if header == v3header {
 		return t.readTombstoneV3(f, fn)
 	} else if header == v2header {
 		return t.readTombstoneV2(f, fn)
 	}
 	return t.readTombstoneV1(f, fn)
 }
 
-func (t *Tombstoner) writeTombstone(tombstones []Tombstone) error {
+func (t *Tombstoner) writeTombstoneV3(tombstones []Tombstone) error {
 	tmp, err := ioutil.TempFile(filepath.Dir(t.Path), "tombstone")
 	if err != nil {
 		return err
 	}
@@ -192,59 +257,156 @@
 	}
 
 	gz := gzip.NewWriter(bw)
-
-	for _, t := range tombstones {
-		binary.BigEndian.PutUint32(b[:4], uint32(len(t.Key)))
-		if _, err := gz.Write(b[:4]); err != nil {
-			return err
-		}
-		if _, err := gz.Write([]byte(t.Key)); err != nil {
-			return err
-		}
-		binary.BigEndian.PutUint64(b[:], uint64(t.Min))
-		if _, err := gz.Write(b[:]); err != nil {
-			return err
-		}
-
-		binary.BigEndian.PutUint64(b[:], uint64(t.Max))
-		if _, err := gz.Write(b[:]); err != nil {
+	for _, ts := range tombstones {
+		if err := t.writeTombstone(gz, ts); err != nil {
 			return err
 		}
 	}
 
-	if err := gz.Close(); err != nil {
-		return err
-	}
-
-	if err := bw.Flush(); err != nil {
-		return err
-	}
+	t.gz = gz
+	t.bw = bw
+	t.pendingFile = tmp
+	t.tombstones = t.tombstones[:0]
+
+	return t.commit()
+}
+
+// prepareV4 opens (or creates) the pending v4 tombstone file that subsequent
+// writes are appended to. It returns errIncompatibleVersion if an older
+// tombstone format is already on disk.
+func (t *Tombstoner) prepareV4() error {
+	if t.pendingFile != nil {
+		return nil
+	}
+
+	tmpPath := fmt.Sprintf("%s.%s", t.tombstonePath(), CompactionTempExtension)
+	tmp, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
+	if err != nil {
+		return err
+	}
+
+	removeTmp := func() {
+		tmp.Close()
+		os.Remove(tmp.Name())
+	}
+
+	// Copy the existing v4 file if it exists
+	f, err := os.Open(t.tombstonePath())
+	if err != nil && !os.IsNotExist(err) {
+		removeTmp()
+		return err
+	} else if err == nil {
+		defer f.Close()
+		var b [4]byte
+		if n, err := f.Read(b[:]); n == 4 && err == nil {
+			header := binary.BigEndian.Uint32(b[:])
+			// There is an existing tombstone on disk and it's not a v4. Rewrite it
+			// as a v3 file again.
+			if header != v4header {
+				removeTmp()
+				return errIncompatibleVersion
+			}
+
+			// Seek back to the beginning so the copy includes the header
+			if _, err := f.Seek(0, io.SeekStart); err != nil {
+				removeTmp()
+				return err
+			}
+
+			// Copy the whole file
+			if _, err := io.Copy(tmp, f); err != nil {
+				f.Close()
+				removeTmp()
+				return err
+			}
+		}
+	}
+
+	var b [8]byte
+	bw := bufio.NewWriterSize(tmp, 64*1024)
+
+	// Write the v4 header only if the file is new
+	if os.IsNotExist(err) {
+		binary.BigEndian.PutUint32(b[:4], v4header)
+		if _, err := bw.Write(b[:4]); err != nil {
+			removeTmp()
+			return err
+		}
+	}
+
+	// Set up the gzip stream that new tombstone entries are appended to
+	gz := gzip.NewWriter(bw)
+
+	t.pendingFile = tmp
+	t.gz = gz
+	t.bw = bw
+
+	return nil
+}
+
+// writeTombstoneV4 writes v3-format blocks concatenated together. A v4 header is
+// written to indicate this is a v4 file.
+func (t *Tombstoner) writeTombstoneV4(tombstones []Tombstone) error { + if err := t.prepareV4(); err == errIncompatibleVersion { + return t.writeTombstoneV3(tombstones) + } else if err != nil { + return err + } + + for _, ts := range tombstones { + if err := t.writeTombstone(t.gz, ts); err != nil { + return err + } + } + + return t.commit() +} + +func (t *Tombstoner) commit() error { + // No pending writes + if t.pendingFile == nil { + return nil + } + + if err := t.gz.Close(); err != nil { + return err + } + + if err := t.bw.Flush(); err != nil { return err } // fsync the file to flush the write - if err := tmp.Sync(); err != nil { + if err := t.pendingFile.Sync(); err != nil { return err } - tmpFilename := tmp.Name() - tmp.Close() + tmpFilename := t.pendingFile.Name() + t.pendingFile.Close() if err := renameFile(tmpFilename, t.tombstonePath()); err != nil { return err } - return syncDir(filepath.Dir(t.tombstonePath())) + if err := syncDir(filepath.Dir(t.tombstonePath())); err != nil { + return err + } + + t.pendingFile = nil + t.bw = nil + t.gz = nil + + return nil } -func (t *Tombstoner) readTombstone() ([]Tombstone, error) { - var tombstones []Tombstone - - if err := t.Walk(func(t Tombstone) error { - tombstones = append(tombstones, t) +func (t *Tombstoner) rollback() error { + if t.pendingFile == nil { return nil - }); err != nil { - return nil, err } - return tombstones, nil + + tmpFilename := t.pendingFile.Name() + t.pendingFile.Close() + t.gz = nil + t.bw = nil + t.pendingFile = nil + return os.Remove(tmpFilename) } // readTombstoneV1 reads the first version of tombstone files that were not @@ -266,7 +428,17 @@ func (t *Tombstoner) readTombstoneV1(f *os.File, fn func(t Tombstone) error) err return err } } - return r.Err() + + if err := r.Err(); err != nil { + return err + } + + for _, t := range t.tombstones { + if err := fn(t); err != nil { + return err + } + } + return nil } // readTombstoneV2 reads the second version of tombstone files that are capable @@ -292,7 +464,7 @@ func (t *Tombstoner) readTombstoneV2(f *os.File, fn func(t Tombstone) error) err b := make([]byte, 4096) for { if n >= size { - return nil + break } if _, err = f.Read(b[:4]); err != nil { @@ -332,6 +504,13 @@ func (t *Tombstoner) readTombstoneV2(f *os.File, fn func(t Tombstone) error) err return err } } + + for _, t := range t.tombstones { + if err := fn(t); err != nil { + return err + } + } + return nil } // readTombstoneV3 reads the third version of tombstone files that are capable @@ -357,7 +536,7 @@ func (t *Tombstoner) readTombstoneV3(f *os.File, fn func(t Tombstone) error) err b := make([]byte, 4096) for { if _, err = io.ReadFull(gr, b[:4]); err == io.EOF || err == io.ErrUnexpectedEOF { - return nil + break } else if err != nil { return err } @@ -395,6 +574,109 @@ func (t *Tombstoner) readTombstoneV3(f *os.File, fn func(t Tombstone) error) err return err } } + + for _, t := range t.tombstones { + if err := fn(t); err != nil { + return err + } + } + return nil +} + +// readTombstoneV4 reads the fourth version of tombstone files that are capable +// of storing multiple v3 files appended together. 
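+//
+// Each flushed batch is an independent gzip stream, so the reader disables
+// multistream mode to stop at each stream boundary, then resets the decoder to
+// continue with the next appended stream:
+//
+//	gr.Multistream(false)
+//	// ... decode one stream of tombstone entries ...
+//	if err := gr.Reset(br); err == io.EOF {
+//		// no more appended streams
+//	}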
+func (t *Tombstoner) readTombstoneV4(f *os.File, fn func(t Tombstone) error) error { + // Skip header, already checked earlier + if t.lastAppliedOffset != 0 { + if _, err := f.Seek(t.lastAppliedOffset, io.SeekStart); err != nil { + return err + } + } else { + if _, err := f.Seek(headerSize, io.SeekStart); err != nil { + return err + } + } + var ( + min, max int64 + key []byte + ) + + br := bufio.NewReaderSize(f, 64*1024) + gr, err := gzip.NewReader(br) + if err == io.EOF { + return nil + } else if err != nil { + return err + } + defer gr.Close() + + b := make([]byte, 4096) + for { + gr.Multistream(false) + if err := func() error { + for { + if _, err = io.ReadFull(gr, b[:4]); err == io.EOF || err == io.ErrUnexpectedEOF { + return nil + } else if err != nil { + return err + } + + keyLen := int(binary.BigEndian.Uint32(b[:4])) + if keyLen > len(b)+16 { + b = make([]byte, keyLen+16) + } + + if _, err := io.ReadFull(gr, b[:keyLen]); err != nil { + return err + } + + // Copy the key since b is re-used + key = b[:keyLen] + + minBuf := b[keyLen : keyLen+8] + maxBuf := b[keyLen+8 : keyLen+16] + if _, err := io.ReadFull(gr, minBuf); err != nil { + return err + } + + min = int64(binary.BigEndian.Uint64(minBuf)) + if _, err := io.ReadFull(gr, maxBuf); err != nil { + return err + } + + max = int64(binary.BigEndian.Uint64(maxBuf)) + if err := fn(Tombstone{ + Key: key, + Min: min, + Max: max, + }); err != nil { + return err + } + } + }(); err != nil { + return err + } + + for _, t := range t.tombstones { + if err := fn(t); err != nil { + return err + } + } + + err = gr.Reset(br) + if err == io.EOF { + break + } + } + + // Save the position of tombstone file so we don't re-apply the same set again if there are + // more deletes. + pos, err := f.Seek(0, io.SeekCurrent) + if err != nil { + return err + } + t.lastAppliedOffset = pos + return nil } func (t *Tombstoner) tombstonePath() string { @@ -414,3 +696,23 @@ func (t *Tombstoner) tombstonePath() string { // Append the "tombstone" suffix to create a 0000001.tombstone file return filepath.Join(filepath.Dir(t.Path), filename+".tombstone") } + +func (t *Tombstoner) writeTombstone(dst io.Writer, ts Tombstone) error { + binary.BigEndian.PutUint32(t.tmp[:4], uint32(len(ts.Key))) + if _, err := dst.Write(t.tmp[:4]); err != nil { + return err + } + if _, err := dst.Write([]byte(ts.Key)); err != nil { + return err + } + binary.BigEndian.PutUint64(t.tmp[:], uint64(ts.Min)) + if _, err := dst.Write(t.tmp[:]); err != nil { + return err + } + + binary.BigEndian.PutUint64(t.tmp[:], uint64(ts.Max)) + if _, err := dst.Write(t.tmp[:]); err != nil { + return err + } + return nil +} diff --git a/tsdb/engine/tsm1/tombstone_test.go b/tsdb/engine/tsm1/tombstone_test.go index 93d9b00971..6f4961b661 100644 --- a/tsdb/engine/tsm1/tombstone_test.go +++ b/tsdb/engine/tsm1/tombstone_test.go @@ -15,11 +15,7 @@ func TestTombstoner_Add(t *testing.T) { f := MustTempFile(dir) ts := &tsm1.Tombstoner{Path: f.Name()} - entries, err := ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } - + entries := mustReadAll(ts) if got, exp := len(entries), 0; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } @@ -31,11 +27,11 @@ func TestTombstoner_Add(t *testing.T) { ts.Add([][]byte{[]byte("foo")}) - entries, err = ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) + if err := ts.Flush(); err != nil { + t.Fatalf("unexpected error flushing tombstone: %v", err) } + entries = mustReadAll(ts) stats = ts.TombstoneFiles() if got, exp := len(stats), 1; got != exp { 
t.Fatalf("stat length mismatch: got %v, exp %v", got, exp) @@ -63,11 +59,7 @@ func TestTombstoner_Add(t *testing.T) { // Use a new Tombstoner to verify values are persisted ts = &tsm1.Tombstoner{Path: f.Name()} - entries, err = ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } - + entries = mustReadAll(ts) if got, exp := len(entries), 1; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } @@ -77,6 +69,82 @@ func TestTombstoner_Add(t *testing.T) { } } +func TestTombstoner_Add_Multiple(t *testing.T) { + dir := MustTempDir() + defer func() { os.RemoveAll(dir) }() + + f := MustTempFile(dir) + ts := &tsm1.Tombstoner{Path: f.Name()} + + entries := mustReadAll(ts) + if got, exp := len(entries), 0; got != exp { + t.Fatalf("length mismatch: got %v, exp %v", got, exp) + } + + stats := ts.TombstoneFiles() + if got, exp := len(stats), 0; got != exp { + t.Fatalf("stat length mismatch: got %v, exp %v", got, exp) + } + + ts.Add([][]byte{[]byte("foo")}) + + if err := ts.Flush(); err != nil { + t.Fatalf("unexpected error flushing tombstone: %v", err) + } + + ts.Add([][]byte{[]byte("bar")}) + + if err := ts.Flush(); err != nil { + t.Fatalf("unexpected error flushing tombstone: %v", err) + } + + entries = mustReadAll(ts) + stats = ts.TombstoneFiles() + if got, exp := len(stats), 1; got != exp { + t.Fatalf("stat length mismatch: got %v, exp %v", got, exp) + } + + if stats[0].Size == 0 { + t.Fatalf("got size %v, exp > 0", stats[0].Size) + } + + if stats[0].LastModified == 0 { + t.Fatalf("got lastModified %v, exp > 0", stats[0].LastModified) + } + + if stats[0].Path == "" { + t.Fatalf("got path %v, exp != ''", stats[0].Path) + } + + if got, exp := len(entries), 2; got != exp { + t.Fatalf("length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := string(entries[0].Key), "foo"; got != exp { + t.Fatalf("value mismatch: got %v, exp %v", got, exp) + } + + if got, exp := string(entries[1].Key), "bar"; got != exp { + t.Fatalf("value mismatch: got %v, exp %v", got, exp) + } + + // Use a new Tombstoner to verify values are persisted + ts = &tsm1.Tombstoner{Path: f.Name()} + entries = mustReadAll(ts) + if got, exp := len(entries), 2; got != exp { + t.Fatalf("length mismatch: got %v, exp %v", got, exp) + } + + if got, exp := string(entries[0].Key), "foo"; got != exp { + t.Fatalf("value mismatch: got %v, exp %v", got, exp) + } + + if got, exp := string(entries[1].Key), "bar"; got != exp { + t.Fatalf("value mismatch: got %v, exp %v", got, exp) + } + +} + func TestTombstoner_Add_Empty(t *testing.T) { dir := MustTempDir() defer func() { os.RemoveAll(dir) }() @@ -84,24 +152,20 @@ func TestTombstoner_Add_Empty(t *testing.T) { f := MustTempFile(dir) ts := &tsm1.Tombstoner{Path: f.Name()} - entries, err := ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } - + entries := mustReadAll(ts) if got, exp := len(entries), 0; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } ts.Add([][]byte{}) - // Use a new Tombstoner to verify values are persisted - ts = &tsm1.Tombstoner{Path: f.Name()} - entries, err = ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) + if err := ts.Flush(); err != nil { + t.Fatalf("unexpected error flushing tombstone: %v", err) } + // Use a new Tombstoner to verify values are persisted + ts = &tsm1.Tombstoner{Path: f.Name()} + entries = mustReadAll(ts) if got, exp := len(entries), 0; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } @@ -122,13 +186,13 @@ func TestTombstoner_Delete(t *testing.T) { 
ts.Add([][]byte{[]byte("foo")}) - // Use a new Tombstoner to verify values are persisted - ts = &tsm1.Tombstoner{Path: f.Name()} - entries, err := ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) + if err := ts.Flush(); err != nil { + t.Fatalf("unexpected error flushing: %v", err) } + // Use a new Tombstoner to verify values are persisted + ts = &tsm1.Tombstoner{Path: f.Name()} + entries := mustReadAll(ts) if got, exp := len(entries), 1; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } @@ -147,11 +211,7 @@ func TestTombstoner_Delete(t *testing.T) { } ts = &tsm1.Tombstoner{Path: f.Name()} - entries, err = ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } - + entries = mustReadAll(ts) if got, exp := len(entries), 0; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } @@ -173,15 +233,11 @@ func TestTombstoner_ReadV1(t *testing.T) { ts := &tsm1.Tombstoner{Path: f.Name()} - _, err := ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } + // Read once + _ = mustReadAll(ts) - entries, err := ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } + // Read again + entries := mustReadAll(ts) if got, exp := len(entries), 1; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) @@ -193,11 +249,7 @@ func TestTombstoner_ReadV1(t *testing.T) { // Use a new Tombstoner to verify values are persisted ts = &tsm1.Tombstoner{Path: f.Name()} - entries, err = ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } - + entries = mustReadAll(ts) if got, exp := len(entries), 1; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } @@ -220,17 +272,27 @@ func TestTombstoner_ReadEmptyV1(t *testing.T) { ts := &tsm1.Tombstoner{Path: f.Name()} - _, err := ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } - - entries, err := ts.ReadAll() - if err != nil { - fatal(t, "ReadAll", err) - } + _ = mustReadAll(ts) + entries := mustReadAll(ts) if got, exp := len(entries), 0; got != exp { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } } + +func mustReadAll(t *tsm1.Tombstoner) []tsm1.Tombstone { + var tombstones []tsm1.Tombstone + if err := t.Walk(func(t tsm1.Tombstone) error { + b := make([]byte, len(t.Key)) + copy(b, t.Key) + tombstones = append(tombstones, tsm1.Tombstone{ + Min: t.Min, + Max: t.Max, + Key: b, + }) + return nil + }); err != nil { + panic(err) + } + return tombstones +} diff --git a/tsdb/index.go b/tsdb/index.go index cdd3fc4198..d7615d76d4 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -27,7 +27,7 @@ type Index interface { InitializeSeries(key, name []byte, tags models.Tags) error CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error - DropSeries(key []byte) error + DropSeries(key []byte, ts int64) error SeriesSketches() (estimator.Sketch, estimator.Sketch, error) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) @@ -42,6 +42,7 @@ type Index interface { TagKeyCardinality(name, key []byte) int // InfluxQL system iterators + MeasurementSeriesKeysByExprIterator(name []byte, condition influxql.Expr) (SeriesIterator, error) MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) @@ -54,7 +55,7 @@ type Index interface { // To be removed w/ tsi1. 
 	SetFieldName(measurement []byte, name string)
 	AssignShard(k string, shardID uint64)
-	UnassignShard(k string, shardID uint64) error
+	UnassignShard(k string, shardID uint64, ts int64) error
 	RemoveShard(shardID uint64)
 
 	Type() string
@@ -62,6 +63,21 @@
 	Rebuild()
 }
 
+// SeriesElem represents a generic series element.
+type SeriesElem interface {
+	Name() []byte
+	Tags() models.Tags
+	Deleted() bool
+
+	// InfluxQL expression associated with series during filtering.
+	Expr() influxql.Expr
+}
+
+// SeriesIterator represents an iterator over a list of series.
+type SeriesIterator interface {
+	Next() SeriesElem
+}
+
 // IndexFormat represents the format for an index.
 type IndexFormat int
diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go
index fd877dd5fd..54918ddb4f 100644
--- a/tsdb/index/inmem/inmem.go
+++ b/tsdb/index/inmem/inmem.go
@@ -591,7 +591,7 @@ func (i *Index) dropMeasurement(name string) error {
 }
 
 // DropSeries removes the series key and its tags from the index.
-func (i *Index) DropSeries(key []byte) error {
+func (i *Index) DropSeries(key []byte, ts int64) error {
 	if key == nil {
 		return nil
 	}
@@ -605,6 +605,11 @@ func (i *Index) DropSeries(key []byte) error {
 		return nil
 	}
 
+	// The series was recently created; we can't drop it.
+	if series.LastModified() >= ts {
+		return nil
+	}
+
 	// Update the tombstone sketch.
 	i.seriesTSSketch.Add([]byte(k))
 
@@ -615,7 +620,7 @@ func (i *Index) DropSeries(key []byte) error {
 	series.Measurement().DropSeries(series)
 
 	// Mark the series as deleted.
-	series.Delete()
+	series.Delete(ts)
 
 	// If the measurement no longer has any series, remove it as well.
 	if !series.Measurement().HasSeries() {
@@ -680,6 +685,14 @@ func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
 	return nil
 }
 
+// MeasurementSeriesKeysByExprIterator returns an iterator over the series keys
+// in the measurement that match condition.
+func (i *Index) MeasurementSeriesKeysByExprIterator(name []byte, condition influxql.Expr) (tsdb.SeriesIterator, error) {
+	keys, err := i.MeasurementSeriesKeysByExpr(name, condition)
+	if err != nil {
+		return nil, err
+	}
+	return &seriesIterator{keys: keys}, nil
+}
+
 func (i *Index) MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error) {
 	i.mu.RLock()
 	defer i.mu.RUnlock()
@@ -747,18 +760,19 @@ func (i *Index) AssignShard(k string, shardID uint64) {
 }
 
 // UnassignShard updates the index to indicate that series k does not exist in
-// the given shardID.
-func (i *Index) UnassignShard(k string, shardID uint64) error {
+// the given shardID. The series will be unassigned if ts is greater than the
+// last time the series was modified.
+func (i *Index) UnassignShard(k string, shardID uint64, ts int64) error {
 	ss, _ := i.Series([]byte(k))
 	if ss != nil {
 		if ss.Assigned(shardID) {
 			// Remove the shard from any series
-			ss.UnassignShard(shardID)
+			ss.UnassignShard(shardID, ts)
 
 			// If this series no longer has shards assigned, remove the series
 			if ss.ShardN() == 0 {
 				// Remove the series key from the index.
-				return i.DropSeries([]byte(k))
+				return i.DropSeries([]byte(k), ts)
 			}
 		}
 	}
@@ -781,8 +795,9 @@ func (i *Index) Rebuild() {
 			return nil
 		}
 
-		nm := m.Rebuild()
 		i.mu.Lock()
+		nm := m.Rebuild()
+
 		i.measurements[string(name)] = nm
 		i.mu.Unlock()
 		return nil
@@ -794,7 +809,7 @@
 // is removed from the index.
 func (i *Index) RemoveShard(shardID uint64) {
 	for _, k := range i.SeriesKeys() {
-		i.UnassignShard(k, shardID)
+		i.UnassignShard(k, shardID, 0)
 	}
 }
 
@@ -1016,3 +1031,32 @@ func (itr *seriesPointIterator) nextKeys() error {
 // errMaxSeriesPerDatabaseExceeded is a marker error returned during series creation
 // to indicate that a new series would exceed the limits of the database.
 var errMaxSeriesPerDatabaseExceeded = errors.New("max series per database exceeded")
+
+type seriesIterator struct {
+	keys [][]byte
+	elem series
+}
+
+type series struct {
+	tsdb.SeriesElem
+	name    []byte
+	tags    models.Tags
+	deleted bool
+}
+
+func (s series) Name() []byte        { return s.name }
+func (s series) Tags() models.Tags   { return s.tags }
+func (s series) Deleted() bool       { return s.deleted }
+func (s series) Expr() influxql.Expr { return nil }
+
+func (itr *seriesIterator) Next() tsdb.SeriesElem {
+	if len(itr.keys) == 0 {
+		return nil
+	}
+
+	name, tags := models.ParseKeyBytes(itr.keys[0])
+	itr.elem.name = name
+	itr.elem.tags = tags
+	itr.keys = itr.keys[1:]
+	return &itr.elem
+}
diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go
index 795b71676c..d9a9681e2c 100644
--- a/tsdb/index/inmem/meta.go
+++ b/tsdb/index/inmem/meta.go
@@ -6,6 +6,8 @@ import (
 	"regexp"
 	"sort"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/query"
@@ -280,10 +282,22 @@ func (m *Measurement) Rebuild() *Measurement {
 
 	// Re-add each series to allow the measurement indexes to get re-created. If there were
 	// deletes, the existing measurment may have references to deleted series that need to be
-	// expunged. Note: we're using SeriesIDs which returns the series in sorted order so that
-	// re-adding does not incur a sort for each series added.
-	for _, id := range m.SeriesIDs() {
-		if s := m.SeriesByID(id); s != nil {
+	// expunged. Note: we're NOT using SeriesIDs (which returns the series in sorted order)
+	// because we must do this under the write lock to prevent races. Instead, the IDs are
+	// collected and sorted once up front so that re-adding does not incur a per-series sort.
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for k, v := range m.seriesByID {
+		if v.Deleted() {
+			continue
+		}
+		m.sortedSeriesIDs = append(m.sortedSeriesIDs, k)
+	}
+	sort.Sort(m.sortedSeriesIDs)
+
+	for _, id := range m.sortedSeriesIDs {
+		if s := m.seriesByID[id]; s != nil {
 			nm.AddSeries(s)
 		}
 	}
@@ -1137,18 +1151,25 @@ type Series struct {
 	measurement *Measurement
 	shardIDs    map[uint64]struct{} // shards that have this series defined
 	deleted     bool
+
+	// lastModified tracks the last time the series was created or re-requested.
+	// If the series already exists and a request to create it is received (a
+	// no-op), lastModified is updated to track that it is still in use.
+	lastModified int64
 }
 
 // NewSeries returns an initialized series struct
 func NewSeries(key []byte, tags models.Tags) *Series {
 	return &Series{
-		Key:      string(key),
-		tags:     tags,
-		shardIDs: make(map[uint64]struct{}),
+		Key:          string(key),
+		tags:         tags,
+		shardIDs:     make(map[uint64]struct{}),
+		lastModified: time.Now().UTC().UnixNano(),
 	}
 }
 
 func (s *Series) AssignShard(shardID uint64) {
+	atomic.StoreInt64(&s.lastModified, time.Now().UTC().UnixNano())
 	if s.Assigned(shardID) {
 		return
 	}
@@ -1156,13 +1177,16 @@ func (s *Series) AssignShard(shardID uint64) {
 	s.mu.Lock()
 	// Skip the existence check under the write lock because we're just storing
 	// and empty struct.
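+	// Re-assigning the series revives it if a prior delete marked it deleted.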
+ s.deleted = false s.shardIDs[shardID] = struct{}{} s.mu.Unlock() } -func (s *Series) UnassignShard(shardID uint64) { +func (s *Series) UnassignShard(shardID uint64, ts int64) { s.mu.Lock() - delete(s.shardIDs, shardID) + if s.LastModified() < ts { + delete(s.shardIDs, shardID) + } s.mu.Unlock() } @@ -1173,6 +1197,10 @@ func (s *Series) Assigned(shardID uint64) bool { return ok } +func (s *Series) LastModified() int64 { + return atomic.LoadInt64(&s.lastModified) +} + func (s *Series) ShardN() int { s.mu.RLock() n := len(s.shardIDs) @@ -1221,9 +1249,11 @@ func (s *Series) GetTagString(key string) string { } // Delete marks this series as deleted. A deleted series should not be returned for queries. -func (s *Series) Delete() { +func (s *Series) Delete(ts int64) { s.mu.Lock() - s.deleted = true + if s.LastModified() < ts { + s.deleted = true + } s.mu.Unlock() } diff --git a/tsdb/index/internal/file_set.go b/tsdb/index/internal/file_set.go index 383310e135..6d5e0a6a23 100644 --- a/tsdb/index/internal/file_set.go +++ b/tsdb/index/internal/file_set.go @@ -4,6 +4,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/index/tsi1" ) @@ -16,16 +17,16 @@ type File struct { Measurementf func(name []byte) tsi1.MeasurementElem MeasurementIteratorf func() tsi1.MeasurementIterator HasSeriesf func(name []byte, tags models.Tags, buf []byte) (exists, tombstoned bool) - Seriesf func(name []byte, tags models.Tags) tsi1.SeriesElem + Seriesf func(name []byte, tags models.Tags) tsdb.SeriesElem SeriesNf func() uint64 TagKeyf func(name, key []byte) tsi1.TagKeyElem TagKeyIteratorf func(name []byte) tsi1.TagKeyIterator TagValuef func(name, key, value []byte) tsi1.TagValueElem TagValueIteratorf func(name, key []byte) tsi1.TagValueIterator - SeriesIteratorf func() tsi1.SeriesIterator - MeasurementSeriesIteratorf func(name []byte) tsi1.SeriesIterator - TagKeySeriesIteratorf func(name, key []byte) tsi1.SeriesIterator - TagValueSeriesIteratorf func(name, key, value []byte) tsi1.SeriesIterator + SeriesIteratorf func() tsdb.SeriesIterator + MeasurementSeriesIteratorf func(name []byte) tsdb.SeriesIterator + TagKeySeriesIteratorf func(name, key []byte) tsdb.SeriesIterator + TagValueSeriesIteratorf func(name, key, value []byte) tsdb.SeriesIterator MergeSeriesSketchesf func(s, t estimator.Sketch) error MergeMeasurementsSketchesf func(s, t estimator.Sketch) error Retainf func() @@ -42,7 +43,7 @@ func (f *File) MeasurementIterator() tsi1.MeasurementIterator { return f.Measure func (f *File) HasSeries(name []byte, tags models.Tags, buf []byte) (exists, tombstoned bool) { return f.HasSeriesf(name, tags, buf) } -func (f *File) Series(name []byte, tags models.Tags) tsi1.SeriesElem { return f.Seriesf(name, tags) } +func (f *File) Series(name []byte, tags models.Tags) tsdb.SeriesElem { return f.Seriesf(name, tags) } func (f *File) SeriesN() uint64 { return f.SeriesNf() } func (f *File) TagKey(name, key []byte) tsi1.TagKeyElem { return f.TagKeyf(name, key) } func (f *File) TagKeyIterator(name []byte) tsi1.TagKeyIterator { return f.TagKeyIteratorf(name) } @@ -52,14 +53,14 @@ func (f *File) TagValue(name, key, value []byte) tsi1.TagValueElem { func (f *File) TagValueIterator(name, key []byte) tsi1.TagValueIterator { return f.TagValueIteratorf(name, key) } -func (f *File) SeriesIterator() tsi1.SeriesIterator { return f.SeriesIteratorf() } -func (f *File) 
MeasurementSeriesIterator(name []byte) tsi1.SeriesIterator { +func (f *File) SeriesIterator() tsdb.SeriesIterator { return f.SeriesIteratorf() } +func (f *File) MeasurementSeriesIterator(name []byte) tsdb.SeriesIterator { return f.MeasurementSeriesIteratorf(name) } -func (f *File) TagKeySeriesIterator(name, key []byte) tsi1.SeriesIterator { +func (f *File) TagKeySeriesIterator(name, key []byte) tsdb.SeriesIterator { return f.TagKeySeriesIteratorf(name, key) } -func (f *File) TagValueSeriesIterator(name, key, value []byte) tsi1.SeriesIterator { +func (f *File) TagValueSeriesIterator(name, key, value []byte) tsdb.SeriesIterator { return f.TagValueSeriesIteratorf(name, key, value) } func (f *File) MergeSeriesSketches(s, t estimator.Sketch) error { return f.MergeSeriesSketchesf(s, t) } diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 8a0c3a44c8..1ab8fd9fec 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -185,8 +185,8 @@ func (fs *FileSet) LastContiguousIndexFilesByLevel(level int) []*IndexFile { } // SeriesIterator returns an iterator over all series in the index. -func (fs *FileSet) SeriesIterator() SeriesIterator { - a := make([]SeriesIterator, 0, len(fs.files)) +func (fs *FileSet) SeriesIterator() tsdb.SeriesIterator { + a := make([]tsdb.SeriesIterator, 0, len(fs.files)) for _, f := range fs.files { itr := f.SeriesIterator() if itr == nil { @@ -225,8 +225,8 @@ func (fs *FileSet) MeasurementIterator() MeasurementIterator { // MeasurementSeriesIterator returns an iterator over all non-tombstoned series // in the index for the provided measurement. -func (fs *FileSet) MeasurementSeriesIterator(name []byte) SeriesIterator { - a := make([]SeriesIterator, 0, len(fs.files)) +func (fs *FileSet) MeasurementSeriesIterator(name []byte) tsdb.SeriesIterator { + a := make([]tsdb.SeriesIterator, 0, len(fs.files)) for _, f := range fs.files { itr := f.MeasurementSeriesIterator(name) if itr != nil { @@ -392,8 +392,8 @@ func (fs *FileSet) tagKeysByFilter(name []byte, op influxql.Token, val []byte, r } // TagKeySeriesIterator returns a series iterator for all values across a single key. -func (fs *FileSet) TagKeySeriesIterator(name, key []byte) SeriesIterator { - a := make([]SeriesIterator, 0, len(fs.files)) +func (fs *FileSet) TagKeySeriesIterator(name, key []byte) tsdb.SeriesIterator { + a := make([]tsdb.SeriesIterator, 0, len(fs.files)) for _, f := range fs.files { itr := f.TagKeySeriesIterator(name, key) if itr != nil { @@ -436,8 +436,8 @@ func (fs *FileSet) TagValueIterator(name, key []byte) TagValueIterator { } // TagValueSeriesIterator returns a series iterator for a single tag value. -func (fs *FileSet) TagValueSeriesIterator(name, key, value []byte) SeriesIterator { - a := make([]SeriesIterator, 0, len(fs.files)) +func (fs *FileSet) TagValueSeriesIterator(name, key, value []byte) tsdb.SeriesIterator { + a := make([]tsdb.SeriesIterator, 0, len(fs.files)) for _, f := range fs.files { itr := f.TagValueSeriesIterator(name, key, value) if itr != nil { @@ -449,7 +449,7 @@ func (fs *FileSet) TagValueSeriesIterator(name, key, value []byte) SeriesIterato // MatchTagValueSeriesIterator returns a series iterator for tags which match value. // If matches is false, returns iterators which do not match value. 
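+// A pattern that matches the empty string must also match series that lack the
+// tag key entirely, so those cases are computed by diffing against the full
+// series set of the measurement.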
-func (fs *FileSet) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Regexp, matches bool) SeriesIterator { +func (fs *FileSet) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Regexp, matches bool) tsdb.SeriesIterator { matchEmpty := value.MatchString("") if matches { @@ -465,13 +465,13 @@ func (fs *FileSet) MatchTagValueSeriesIterator(name, key []byte, value *regexp.R return FilterUndeletedSeriesIterator(fs.matchTagValueNotEqualNotEmptySeriesIterator(name, key, value)) } -func (fs *FileSet) matchTagValueEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { +func (fs *FileSet) matchTagValueEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) tsdb.SeriesIterator { vitr := fs.TagValueIterator(name, key) if vitr == nil { return fs.MeasurementSeriesIterator(name) } - var itrs []SeriesIterator + var itrs []tsdb.SeriesIterator for e := vitr.Next(); e != nil; e = vitr.Next() { if !value.Match(e.Value()) { itrs = append(itrs, fs.TagValueSeriesIterator(name, key, e.Value())) @@ -484,13 +484,13 @@ func (fs *FileSet) matchTagValueEqualEmptySeriesIterator(name, key []byte, value ) } -func (fs *FileSet) matchTagValueEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { +func (fs *FileSet) matchTagValueEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) tsdb.SeriesIterator { vitr := fs.TagValueIterator(name, key) if vitr == nil { return nil } - var itrs []SeriesIterator + var itrs []tsdb.SeriesIterator for e := vitr.Next(); e != nil; e = vitr.Next() { if value.Match(e.Value()) { itrs = append(itrs, fs.TagValueSeriesIterator(name, key, e.Value())) @@ -499,13 +499,13 @@ func (fs *FileSet) matchTagValueEqualNotEmptySeriesIterator(name, key []byte, va return MergeSeriesIterators(itrs...) } -func (fs *FileSet) matchTagValueNotEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { +func (fs *FileSet) matchTagValueNotEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) tsdb.SeriesIterator { vitr := fs.TagValueIterator(name, key) if vitr == nil { return nil } - var itrs []SeriesIterator + var itrs []tsdb.SeriesIterator for e := vitr.Next(); e != nil; e = vitr.Next() { if !value.Match(e.Value()) { itrs = append(itrs, fs.TagValueSeriesIterator(name, key, e.Value())) @@ -514,13 +514,13 @@ func (fs *FileSet) matchTagValueNotEqualEmptySeriesIterator(name, key []byte, va return MergeSeriesIterators(itrs...) } -func (fs *FileSet) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { +func (fs *FileSet) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) tsdb.SeriesIterator { vitr := fs.TagValueIterator(name, key) if vitr == nil { return fs.MeasurementSeriesIterator(name) } - var itrs []SeriesIterator + var itrs []tsdb.SeriesIterator for e := vitr.Next(); e != nil; e = vitr.Next() { if value.Match(e.Value()) { itrs = append(itrs, fs.TagValueSeriesIterator(name, key, e.Value())) @@ -788,7 +788,7 @@ func (fs *FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, e // MeasurementSeriesByExprIterator returns a series iterator for a measurement // that is filtered by expr. If expr only contains time expressions then this // call is equivalent to MeasurementSeriesIterator(). 
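+// Tag comparisons are evaluated against the index; any residual filtering
+// expression is attached to each element via Expr().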
-func (fs *FileSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) (SeriesIterator, error) { +func (fs *FileSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) (tsdb.SeriesIterator, error) { // Return all series for the measurement if there are no tag expressions. if expr == nil { return fs.MeasurementSeriesIterator(name), nil @@ -822,7 +822,7 @@ func (fs *FileSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr, return keys, nil } -func (fs *FileSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { +func (fs *FileSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (tsdb.SeriesIterator, error) { switch expr := expr.(type) { case *influxql.BinaryExpr: switch expr.Op { @@ -860,7 +860,7 @@ func (fs *FileSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsd } // seriesByBinaryExprIterator returns a series iterator and a filtering expression. -func (fs *FileSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { +func (fs *FileSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr, mf *tsdb.MeasurementFields) (tsdb.SeriesIterator, error) { // If this binary expression has another binary expression, then this // is some expression math and we should just pass it to the underlying query. if _, ok := n.LHS.(*influxql.BinaryExpr); ok { @@ -906,7 +906,7 @@ func (fs *FileSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExp } } -func (fs *FileSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIterator, error) { +func (fs *FileSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (tsdb.SeriesIterator, error) { // Special handling for "_name" to match measurement name. if bytes.Equal(key, []byte("_name")) { if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) { @@ -940,7 +940,7 @@ func (fs *FileSet) seriesByBinaryExprStringIterator(name, key, value []byte, op return fs.TagKeySeriesIterator(name, key), nil } -func (fs *FileSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIterator, error) { +func (fs *FileSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (tsdb.SeriesIterator, error) { // Special handling for "_name" to match measurement name. 
if bytes.Equal(key, []byte("_name")) { match := value.Match(name) @@ -952,7 +952,7 @@ func (fs *FileSet) seriesByBinaryExprRegexIterator(name, key []byte, value *rege return fs.MatchTagValueSeriesIterator(name, key, value, op == influxql.EQREGEX), nil } -func (fs *FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIterator, error) { +func (fs *FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (tsdb.SeriesIterator, error) { if op == influxql.EQ { return IntersectSeriesIterators( fs.TagKeySeriesIterator(name, key), @@ -1018,7 +1018,7 @@ type File interface { Measurement(name []byte) MeasurementElem MeasurementIterator() MeasurementIterator HasSeries(name []byte, tags models.Tags, buf []byte) (exists, tombstoned bool) - Series(name []byte, tags models.Tags) SeriesElem + Series(name []byte, tags models.Tags) tsdb.SeriesElem SeriesN() uint64 TagKey(name, key []byte) TagKeyElem @@ -1028,10 +1028,10 @@ type File interface { TagValueIterator(name, key []byte) TagValueIterator // Series iteration. - SeriesIterator() SeriesIterator - MeasurementSeriesIterator(name []byte) SeriesIterator - TagKeySeriesIterator(name, key []byte) SeriesIterator - TagValueSeriesIterator(name, key, value []byte) SeriesIterator + SeriesIterator() tsdb.SeriesIterator + MeasurementSeriesIterator(name []byte) tsdb.SeriesIterator + TagKeySeriesIterator(name, key []byte) tsdb.SeriesIterator + TagValueSeriesIterator(name, key, value []byte) tsdb.SeriesIterator // Sketches for cardinality estimation MergeSeriesSketches(s, t estimator.Sketch) error diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 1c58c30210..4dd2a4a5ae 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -559,7 +559,7 @@ func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) erro return nil } -func (i *Index) DropSeries(key []byte) error { +func (i *Index) DropSeries(key []byte, ts int64) error { if err := func() error { i.mu.RLock() defer i.mu.RUnlock() @@ -575,11 +575,8 @@ func (i *Index) DropSeries(key []byte) error { fs := i.retainFileSet() defer fs.Release() - // Check if that was the last series for the measurement in the entire index. - itr := fs.MeasurementSeriesIterator(mname) - if itr == nil { - return nil - } else if e := itr.Next(); e != nil { + mm := fs.Measurement(mname) + if mm == nil || mm.HasSeries() { return nil } @@ -732,6 +729,19 @@ func (i *Index) TagKeyCardinality(name, key []byte) int { return 0 } +func (i *Index) MeasurementSeriesKeysByExprIterator(name []byte, condition influxql.Expr) (tsdb.SeriesIterator, error) { + fs := i.RetainFileSet() + defer fs.Release() + + itr, err := fs.MeasurementSeriesByExprIterator(name, condition, i.fieldset) + if err != nil { + return nil, err + } else if itr == nil { + return nil, nil + } + return itr, err +} + // MeasurementSeriesKeysByExpr returns a list of series keys matching expr. func (i *Index) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { fs := i.RetainFileSet() @@ -866,9 +876,9 @@ func (i *Index) SetFieldName(measurement []byte, name string) {} func (i *Index) RemoveShard(shardID uint64) {} func (i *Index) AssignShard(k string, shardID uint64) {} -func (i *Index) UnassignShard(k string, shardID uint64) error { +func (i *Index) UnassignShard(k string, shardID uint64, ts int64) error { // This can be called directly once inmem is gone. 
- return i.DropSeries([]byte(k)) + return i.DropSeries([]byte(k), ts) } // SeriesPointIterator returns an influxql iterator over all series. @@ -1170,7 +1180,7 @@ type seriesPointIterator struct { fs *FileSet fieldset *tsdb.MeasurementFieldSet mitr MeasurementIterator - sitr SeriesIterator + sitr tsdb.SeriesIterator opt query.IteratorOptions point query.FloatPoint // reusable point diff --git a/tsdb/index/tsi1/index_file.go b/tsdb/index/tsi1/index_file.go index cc1c371825..b43397b4ad 100644 --- a/tsdb/index/tsi1/index_file.go +++ b/tsdb/index/tsi1/index_file.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/mmap" + "github.com/influxdata/influxdb/tsdb" ) // IndexFileVersion is the current TSI1 index file version. @@ -233,7 +234,7 @@ func (f *IndexFile) TagValueIterator(name, key []byte) TagValueIterator { // TagKeySeriesIterator returns a series iterator for a tag key and a flag // indicating if a tombstone exists on the measurement or key. -func (f *IndexFile) TagKeySeriesIterator(name, key []byte) SeriesIterator { +func (f *IndexFile) TagKeySeriesIterator(name, key []byte) tsdb.SeriesIterator { tblk := f.tblks[string(name)] if tblk == nil { return nil @@ -247,7 +248,7 @@ func (f *IndexFile) TagKeySeriesIterator(name, key []byte) SeriesIterator { // Merge all value series iterators together. vitr := ke.TagValueIterator() - var itrs []SeriesIterator + var itrs []tsdb.SeriesIterator for ve := vitr.Next(); ve != nil; ve = vitr.Next() { sitr := &rawSeriesIDIterator{data: ve.(*TagBlockValueElem).series.data} itrs = append(itrs, newSeriesDecodeIterator(&f.sblk, sitr)) @@ -258,7 +259,7 @@ func (f *IndexFile) TagKeySeriesIterator(name, key []byte) SeriesIterator { // TagValueSeriesIterator returns a series iterator for a tag value and a flag // indicating if a tombstone exists on the measurement, key, or value. -func (f *IndexFile) TagValueSeriesIterator(name, key, value []byte) SeriesIterator { +func (f *IndexFile) TagValueSeriesIterator(name, key, value []byte) tsdb.SeriesIterator { tblk := f.tblks[string(name)] if tblk == nil { return nil @@ -305,7 +306,7 @@ func (f *IndexFile) HasSeries(name []byte, tags models.Tags, buf []byte) (exists // Series returns the series and a flag indicating if the series has been // tombstoned by the measurement. -func (f *IndexFile) Series(name []byte, tags models.Tags) SeriesElem { +func (f *IndexFile) Series(name []byte, tags models.Tags) tsdb.SeriesElem { return f.sblk.Series(name, tags) } @@ -333,7 +334,7 @@ func (f *IndexFile) TagKeyIterator(name []byte) TagKeyIterator { } // MeasurementSeriesIterator returns an iterator over a measurement's series. -func (f *IndexFile) MeasurementSeriesIterator(name []byte) SeriesIterator { +func (f *IndexFile) MeasurementSeriesIterator(name []byte) tsdb.SeriesIterator { return &seriesDecodeIterator{ itr: f.mblk.seriesIDIterator(name), sblk: &f.sblk, @@ -355,7 +356,7 @@ func (f *IndexFile) SeriesN() uint64 { } // SeriesIterator returns an iterator over all series. 
-func (f *IndexFile) SeriesIterator() SeriesIterator { +func (f *IndexFile) SeriesIterator() tsdb.SeriesIterator { return f.sblk.SeriesIterator() } diff --git a/tsdb/index/tsi1/index_files.go b/tsdb/index/tsi1/index_files.go index 25b199aa9a..c3b9b618f7 100644 --- a/tsdb/index/tsi1/index_files.go +++ b/tsdb/index/tsi1/index_files.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/pkg/mmap" + "github.com/influxdata/influxdb/tsdb" ) // IndexFiles represents a layered set of index files. @@ -90,8 +91,8 @@ func (p *IndexFiles) TagKeyIterator(name []byte) (TagKeyIterator, error) { } // SeriesIterator returns an iterator that merges series across all files. -func (p IndexFiles) SeriesIterator() SeriesIterator { - a := make([]SeriesIterator, 0, len(p)) +func (p IndexFiles) SeriesIterator() tsdb.SeriesIterator { + a := make([]tsdb.SeriesIterator, 0, len(p)) for _, f := range p { itr := f.SeriesIterator() if itr == nil { @@ -103,8 +104,8 @@ func (p IndexFiles) SeriesIterator() SeriesIterator { } // MeasurementSeriesIterator returns an iterator that merges series across all files. -func (p IndexFiles) MeasurementSeriesIterator(name []byte) SeriesIterator { - a := make([]SeriesIterator, 0, len(p)) +func (p IndexFiles) MeasurementSeriesIterator(name []byte) tsdb.SeriesIterator { + a := make([]tsdb.SeriesIterator, 0, len(p)) for _, f := range p { itr := f.MeasurementSeriesIterator(name) if itr == nil { @@ -116,8 +117,8 @@ func (p IndexFiles) MeasurementSeriesIterator(name []byte) SeriesIterator { } // TagValueSeriesIterator returns an iterator that merges series across all files. -func (p IndexFiles) TagValueSeriesIterator(name, key, value []byte) SeriesIterator { - a := make([]SeriesIterator, 0, len(p)) +func (p IndexFiles) TagValueSeriesIterator(name, key, value []byte) tsdb.SeriesIterator { + a := make([]tsdb.SeriesIterator, 0, len(p)) for i := range p { itr := p[i].TagValueSeriesIterator(name, key, value) if itr != nil { diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index 19029c0f72..707cccb050 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -93,7 +93,7 @@ func TestIndex_MeasurementExists(t *testing.T) { }) // Delete one series. - if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "east"}))); err != nil { + if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "east"})), 0); err != nil { t.Fatal(err) } @@ -107,7 +107,7 @@ func TestIndex_MeasurementExists(t *testing.T) { }) // Delete second series. - if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "west"}))); err != nil { + if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "west"})), 0); err != nil { t.Fatal(err) } diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 63de598ade..5dae24b6c8 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -14,6 +14,7 @@ import ( "time" "github.com/influxdata/influxdb/pkg/estimator/hll" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bloom" @@ -246,7 +247,7 @@ func (f *LogFile) DeleteMeasurement(name []byte) error { } // TagKeySeriesIterator returns a series iterator for a tag key. 
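+// The result merges the in-memory series lists kept for each value of the key.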
-func (f *LogFile) TagKeySeriesIterator(name, key []byte) SeriesIterator { +func (f *LogFile) TagKeySeriesIterator(name, key []byte) tsdb.SeriesIterator { f.mu.RLock() defer f.mu.RUnlock() @@ -261,7 +262,7 @@ func (f *LogFile) TagKeySeriesIterator(name, key []byte) SeriesIterator { } // Combine iterators across all tag keys. - itrs := make([]SeriesIterator, 0, len(tk.tagValues)) + itrs := make([]tsdb.SeriesIterator, 0, len(tk.tagValues)) for _, tv := range tk.tagValues { if len(tv.series) == 0 { continue @@ -361,7 +362,7 @@ func (f *LogFile) DeleteTagKey(name, key []byte) error { } // TagValueSeriesIterator returns a series iterator for a tag value. -func (f *LogFile) TagValueSeriesIterator(name, key, value []byte) SeriesIterator { +func (f *LogFile) TagValueSeriesIterator(name, key, value []byte) tsdb.SeriesIterator { f.mu.RLock() defer f.mu.RUnlock() @@ -562,12 +563,12 @@ func (f *LogFile) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][] } // Series returns a series by name/tags. -func (f *LogFile) Series(name []byte, tags models.Tags) SeriesElem { +func (f *LogFile) Series(name []byte, tags models.Tags) tsdb.SeriesElem { return f.SeriesWithBuffer(name, tags, nil) } // SeriesWithBuffer returns a series by name/tags. -func (f *LogFile) SeriesWithBuffer(name []byte, tags models.Tags, buf []byte) SeriesElem { +func (f *LogFile) SeriesWithBuffer(name []byte, tags models.Tags, buf []byte) tsdb.SeriesElem { key := AppendSeriesKey(buf[:0], name, tags) f.mu.RLock() @@ -704,7 +705,7 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) { } // SeriesIterator returns an iterator over all series in the log file. -func (f *LogFile) SeriesIterator() SeriesIterator { +func (f *LogFile) SeriesIterator() tsdb.SeriesIterator { f.mu.RLock() defer f.mu.RUnlock() @@ -783,7 +784,7 @@ func (f *LogFile) MeasurementIterator() MeasurementIterator { } // MeasurementSeriesIterator returns an iterator over all series for a measurement. -func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator { +func (f *LogFile) MeasurementSeriesIterator(name []byte) tsdb.SeriesIterator { f.mu.RLock() defer f.mu.RUnlock() @@ -1289,6 +1290,17 @@ type logMeasurement struct { func (m *logMeasurement) Name() []byte { return m.name } func (m *logMeasurement) Deleted() bool { return m.deleted } +func (m *logMeasurement) HasSeries() bool { + if m.deleted { + return false + } + for _, v := range m.series { + if !v.deleted { + return true + } + } + return false +} func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey { ts, ok := m.tagSet[string(key)] @@ -1451,7 +1463,7 @@ func newLogSeriesIterator(m map[string]*logSerie) *logSeriesIterator { } // Next returns the next element in the iterator. -func (itr *logSeriesIterator) Next() (e SeriesElem) { +func (itr *logSeriesIterator) Next() (e tsdb.SeriesElem) { if len(itr.series) == 0 { return nil } diff --git a/tsdb/index/tsi1/measurement_block.go b/tsdb/index/tsi1/measurement_block.go index 397adfd31d..b2c3f6025d 100644 --- a/tsdb/index/tsi1/measurement_block.go +++ b/tsdb/index/tsi1/measurement_block.go @@ -337,6 +337,8 @@ func (e *MeasurementBlockElem) SeriesID(i int) uint32 { return binary.BigEndian.Uint32(e.series.data[i*SeriesIDSize:]) } +func (e *MeasurementBlockElem) HasSeries() bool { return e.series.n > 0 } + // SeriesIDs returns a list of decoded series ids. // // NOTE: This should be used for testing and diagnostics purposes only. 
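
With SeriesElem and SeriesIterator now defined in the tsdb package, log files, index files, and series blocks all yield series through the same two-method contract. A minimal sketch of a consumer, assuming only the interfaces introduced in tsdb/index.go above; collectKeys is an illustrative helper, not part of this change:

package sketch

import (
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
)

// collectKeys drains a tsdb.SeriesIterator, skipping tombstoned series and
// rebuilding each series key from its name and tags.
func collectKeys(itr tsdb.SeriesIterator) [][]byte {
	var keys [][]byte
	for e := itr.Next(); e != nil; e = itr.Next() { // a nil element ends iteration
		if e.Deleted() {
			continue
		}
		keys = append(keys, models.MakeKey(e.Name(), e.Tags()))
	}
	return keys
}
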
diff --git a/tsdb/index/tsi1/series_block.go b/tsdb/index/tsi1/series_block.go index 6a8badcde9..0f361cb165 100644 --- a/tsdb/index/tsi1/series_block.go +++ b/tsdb/index/tsi1/series_block.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/pkg/mmap" "github.com/influxdata/influxdb/pkg/rhh" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" ) @@ -80,7 +81,7 @@ func (blk *SeriesBlock) HasSeries(name []byte, tags models.Tags, buf []byte) (ex } // Series returns a series element. -func (blk *SeriesBlock) Series(name []byte, tags models.Tags) SeriesElem { +func (blk *SeriesBlock) Series(name []byte, tags models.Tags) tsdb.SeriesElem { offset, _ := blk.Offset(name, tags, nil) if offset == 0 { return nil @@ -162,7 +163,7 @@ func (blk *SeriesBlock) SeriesCount() uint32 { } // SeriesIterator returns an iterator over all the series. -func (blk *SeriesBlock) SeriesIterator() SeriesIterator { +func (blk *SeriesBlock) SeriesIterator() tsdb.SeriesIterator { return &seriesBlockIterator{ n: blk.SeriesCount(), offset: 1, @@ -250,7 +251,7 @@ type seriesBlockIterator struct { } // Next returns the next series element. -func (itr *seriesBlockIterator) Next() SeriesElem { +func (itr *seriesBlockIterator) Next() tsdb.SeriesElem { for { // Exit if at the end. if itr.i == itr.n { @@ -295,7 +296,7 @@ func newSeriesDecodeIterator(sblk *SeriesBlock, itr seriesIDIterator) *seriesDec } // Next returns the next series element. -func (itr *seriesDecodeIterator) Next() SeriesElem { +func (itr *seriesDecodeIterator) Next() tsdb.SeriesElem { // Read next series id. id := itr.itr.next() if id == 0 { diff --git a/tsdb/index/tsi1/tsi1.go b/tsdb/index/tsi1/tsi1.go index 6c5c287358..20ac48c4a6 100644 --- a/tsdb/index/tsi1/tsi1.go +++ b/tsdb/index/tsi1/tsi1.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" ) @@ -20,6 +21,7 @@ const LoadFactor = 80 type MeasurementElem interface { Name() []byte Deleted() bool + HasSeries() bool } // MeasurementElems represents a list of MeasurementElem. @@ -114,6 +116,15 @@ func (p measurementMergeElem) Deleted() bool { return p[0].Deleted() } +func (p measurementMergeElem) HasSeries() bool { + for _, v := range p { + if v.HasSeries() { + return true + } + } + return false +} + // filterUndeletedMeasurementIterator returns all measurements which are not deleted. type filterUndeletedMeasurementIterator struct { itr MeasurementIterator @@ -341,18 +352,8 @@ func (p tagValueMergeElem) Deleted() bool { return p[0].Deleted() } -// SeriesElem represents a generic series element. -type SeriesElem interface { - Name() []byte - Tags() models.Tags - Deleted() bool - - // InfluxQL expression associated with series during filtering. - Expr() influxql.Expr -} - // SeriesElemKey encodes e as a series key. -func SeriesElemKey(e SeriesElem) []byte { +func SeriesElemKey(e tsdb.SeriesElem) []byte { name, tags := e.Name(), e.Tags() // TODO: Precompute allocation size. @@ -370,7 +371,7 @@ func SeriesElemKey(e SeriesElem) []byte { } // CompareSeriesElem returns -1 if a < b, 1 if a > b, and 0 if equal. 
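+// Series are ordered by name first, then by their tags.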
-func CompareSeriesElem(a, b SeriesElem) int { +func CompareSeriesElem(a, b tsdb.SeriesElem) int { if cmp := bytes.Compare(a.Name(), b.Name()); cmp != 0 { return cmp } @@ -389,15 +390,10 @@ func (e *seriesElem) Tags() models.Tags { return e.tags } func (e *seriesElem) Deleted() bool { return e.deleted } func (e *seriesElem) Expr() influxql.Expr { return nil } -// SeriesIterator represents a iterator over a list of series. -type SeriesIterator interface { - Next() SeriesElem -} - // MergeSeriesIterators returns an iterator that merges a set of iterators. // Iterators that are first in the list take precendence and a deletion by those // early iterators will invalidate elements by later iterators. -func MergeSeriesIterators(itrs ...SeriesIterator) SeriesIterator { +func MergeSeriesIterators(itrs ...tsdb.SeriesIterator) tsdb.SeriesIterator { if n := len(itrs); n == 0 { return nil } else if n == 1 { @@ -405,22 +401,22 @@ func MergeSeriesIterators(itrs ...SeriesIterator) SeriesIterator { } return &seriesMergeIterator{ - buf: make([]SeriesElem, len(itrs)), + buf: make([]tsdb.SeriesElem, len(itrs)), itrs: itrs, } } // seriesMergeIterator is an iterator that merges multiple iterators together. type seriesMergeIterator struct { - buf []SeriesElem - itrs []SeriesIterator + buf []tsdb.SeriesElem + itrs []tsdb.SeriesIterator } // Next returns the element with the next lowest name/tags across the iterators. // // If multiple iterators contain the same name/tags then the first is returned // and the remaining ones are skipped. -func (itr *seriesMergeIterator) Next() SeriesElem { +func (itr *seriesMergeIterator) Next() tsdb.SeriesElem { // Find next lowest name/tags amongst the buffers. var name []byte var tags models.Tags @@ -452,7 +448,7 @@ func (itr *seriesMergeIterator) Next() SeriesElem { } // Refill buffer. - var e SeriesElem + var e tsdb.SeriesElem for i, buf := range itr.buf { if buf == nil || !bytes.Equal(buf.Name(), name) || models.CompareTags(buf.Tags(), tags) != 0 { continue @@ -472,23 +468,23 @@ func (itr *seriesMergeIterator) Next() SeriesElem { // IntersectSeriesIterators returns an iterator that only returns series which // occur in both iterators. If both series have associated expressions then // they are combined together. -func IntersectSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator { +func IntersectSeriesIterators(itr0, itr1 tsdb.SeriesIterator) tsdb.SeriesIterator { if itr0 == nil || itr1 == nil { return nil } - return &seriesIntersectIterator{itrs: [2]SeriesIterator{itr0, itr1}} + return &seriesIntersectIterator{itrs: [2]tsdb.SeriesIterator{itr0, itr1}} } // seriesIntersectIterator is an iterator that merges two iterators together. type seriesIntersectIterator struct { e seriesExprElem - buf [2]SeriesElem - itrs [2]SeriesIterator + buf [2]tsdb.SeriesElem + itrs [2]tsdb.SeriesIterator } // Next returns the next element which occurs in both iterators. -func (itr *seriesIntersectIterator) Next() (e SeriesElem) { +func (itr *seriesIntersectIterator) Next() (e tsdb.SeriesElem) { for { // Fill buffers. if itr.buf[0] == nil { @@ -538,7 +534,7 @@ func (itr *seriesIntersectIterator) Next() (e SeriesElem) { // UnionSeriesIterators returns an iterator that returns series from both // both iterators. If both series have associated expressions then they are // combined together. -func UnionSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator { +func UnionSeriesIterators(itr0, itr1 tsdb.SeriesIterator) tsdb.SeriesIterator { // Return other iterator if either one is nil. 
if itr0 == nil { return itr1 @@ -546,18 +542,18 @@ func UnionSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator { return itr0 } - return &seriesUnionIterator{itrs: [2]SeriesIterator{itr0, itr1}} + return &seriesUnionIterator{itrs: [2]tsdb.SeriesIterator{itr0, itr1}} } // seriesUnionIterator is an iterator that unions two iterators together. type seriesUnionIterator struct { e seriesExprElem - buf [2]SeriesElem - itrs [2]SeriesIterator + buf [2]tsdb.SeriesElem + itrs [2]tsdb.SeriesIterator } // Next returns the next element which occurs in both iterators. -func (itr *seriesUnionIterator) Next() (e SeriesElem) { +func (itr *seriesUnionIterator) Next() (e tsdb.SeriesElem) { // Fill buffers. if itr.buf[0] == nil { itr.buf[0] = itr.itrs[0].Next() @@ -606,23 +602,23 @@ func (itr *seriesUnionIterator) Next() (e SeriesElem) { // DifferenceSeriesIterators returns an iterator that only returns series which // occur the first iterator but not the second iterator. -func DifferenceSeriesIterators(itr0, itr1 SeriesIterator) SeriesIterator { +func DifferenceSeriesIterators(itr0, itr1 tsdb.SeriesIterator) tsdb.SeriesIterator { if itr0 != nil && itr1 == nil { return itr0 } else if itr0 == nil { return nil } - return &seriesDifferenceIterator{itrs: [2]SeriesIterator{itr0, itr1}} + return &seriesDifferenceIterator{itrs: [2]tsdb.SeriesIterator{itr0, itr1}} } // seriesDifferenceIterator is an iterator that merges two iterators together. type seriesDifferenceIterator struct { - buf [2]SeriesElem - itrs [2]SeriesIterator + buf [2]tsdb.SeriesElem + itrs [2]tsdb.SeriesIterator } // Next returns the next element which occurs only in the first iterator. -func (itr *seriesDifferenceIterator) Next() (e SeriesElem) { +func (itr *seriesDifferenceIterator) Next() (e tsdb.SeriesElem) { for { // Fill buffers. if itr.buf[0] == nil { @@ -658,18 +654,18 @@ func (itr *seriesDifferenceIterator) Next() (e SeriesElem) { // filterUndeletedSeriesIterator returns all series which are not deleted. type filterUndeletedSeriesIterator struct { - itr SeriesIterator + itr tsdb.SeriesIterator } // FilterUndeletedSeriesIterator returns an iterator which filters all deleted series. -func FilterUndeletedSeriesIterator(itr SeriesIterator) SeriesIterator { +func FilterUndeletedSeriesIterator(itr tsdb.SeriesIterator) tsdb.SeriesIterator { if itr == nil { return nil } return &filterUndeletedSeriesIterator{itr: itr} } -func (itr *filterUndeletedSeriesIterator) Next() SeriesElem { +func (itr *filterUndeletedSeriesIterator) Next() tsdb.SeriesElem { for { e := itr.itr.Next() if e == nil { @@ -683,7 +679,7 @@ func (itr *filterUndeletedSeriesIterator) Next() SeriesElem { // seriesExprElem holds a series and its associated filter expression. type seriesExprElem struct { - SeriesElem + tsdb.SeriesElem expr influxql.Expr } @@ -692,12 +688,12 @@ func (e *seriesExprElem) Expr() influxql.Expr { return e.expr } // seriesExprIterator is an iterator that attaches an associated expression. type seriesExprIterator struct { - itr SeriesIterator + itr tsdb.SeriesIterator e seriesExprElem } // newSeriesExprIterator returns a new instance of seriesExprIterator. -func newSeriesExprIterator(itr SeriesIterator, expr influxql.Expr) SeriesIterator { +func newSeriesExprIterator(itr tsdb.SeriesIterator, expr influxql.Expr) tsdb.SeriesIterator { if itr == nil { return nil } @@ -711,7 +707,7 @@ func newSeriesExprIterator(itr SeriesIterator, expr influxql.Expr) SeriesIterato } // Next returns the next element in the iterator. 
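+// A nil element from the wrapped iterator ends iteration; otherwise the
+// iterator's expression is attached to the element before it is returned.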
-func (itr *seriesExprIterator) Next() SeriesElem { +func (itr *seriesExprIterator) Next() tsdb.SeriesElem { itr.e.SeriesElem = itr.itr.Next() if itr.e.SeriesElem == nil { return nil diff --git a/tsdb/index/tsi1/tsi1_test.go b/tsdb/index/tsi1/tsi1_test.go index 80ac598779..424f0c6a22 100644 --- a/tsdb/index/tsi1/tsi1_test.go +++ b/tsdb/index/tsi1/tsi1_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/index/tsi1" "github.com/influxdata/influxql" ) @@ -201,12 +202,15 @@ func TestMergeSeriesIterators(t *testing.T) { // MeasurementElem represents a test implementation of tsi1.MeasurementElem. type MeasurementElem struct { - name []byte - deleted bool + name []byte + deleted bool + hasSeries bool } -func (e *MeasurementElem) Name() []byte { return e.name } -func (e *MeasurementElem) Deleted() bool { return e.deleted } +func (e *MeasurementElem) Name() []byte { return e.name } +func (e *MeasurementElem) Deleted() bool { return e.deleted } +func (e *MeasurementElem) HasSeries() bool { return e.hasSeries } + func (e *MeasurementElem) TagKeyIterator() tsi1.TagKeyIterator { return nil } // MeasurementIterator represents an iterator over a slice of measurements. @@ -255,7 +259,7 @@ type TagValueElem struct { func (e *TagValueElem) Value() []byte { return e.value } func (e *TagValueElem) Deleted() bool { return e.deleted } -func (e *TagValueElem) SeriesIterator() tsi1.SeriesIterator { return nil } +func (e *TagValueElem) SeriesIterator() tsdb.SeriesIterator { return nil } // TagValueIterator represents an iterator over a slice of tag values. type TagValueIterator struct { @@ -290,7 +294,7 @@ type SeriesIterator struct { } // Next returns the next element in the iterator. -func (itr *SeriesIterator) Next() (e tsi1.SeriesElem) { +func (itr *SeriesIterator) Next() (e tsdb.SeriesElem) { if len(itr.Elems) == 0 { return nil } diff --git a/tsdb/shard.go b/tsdb/shard.go index 6d5f7d525c..3b3fd45528 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "math" "path/filepath" "regexp" "runtime" @@ -686,18 +685,13 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error return nil } -// DeleteSeries deletes a list of series. -func (s *Shard) DeleteSeries(seriesKeys [][]byte) error { - return s.DeleteSeriesRange(seriesKeys, math.MinInt64, math.MaxInt64) -} - // DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive) -func (s *Shard) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error { +func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error { engine, err := s.engine() if err != nil { return err } - return engine.DeleteSeriesRange(seriesKeys, min, max) + return engine.DeleteSeriesRange(itr, min, max) } // DeleteMeasurement deletes a measurement and all underlying series. @@ -755,6 +749,14 @@ func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { return engine.MeasurementNamesByRegex(re) } +func (s *Shard) MeasurementSeriesKeysByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) { + engine, err := s.engine() + if err != nil { + return nil, err + } + return engine.MeasurementSeriesKeysByExprIterator(name, expr) +} + // MeasurementSeriesKeysByExpr returns a list of series keys from the shard // matching expr. 
 func (s *Shard) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
diff --git a/tsdb/store.go b/tsdb/store.go
index 6f7ccd22cc..a8fd69e1c8 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -897,23 +897,21 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
 		defer limit.Release()
 
 		// Find matching series keys for each measurement.
-		var keys [][]byte
 		for _, name := range names {
-			a, err := sh.MeasurementSeriesKeysByExpr([]byte(name), condition)
+			itr, err := sh.MeasurementSeriesKeysByExprIterator([]byte(name), condition)
 			if err != nil {
 				return err
+			} else if itr == nil {
+				continue
 			}
-			keys = append(keys, a...)
+
+			if err := sh.DeleteSeriesRange(itr, min, max); err != nil {
+				return err
+			}
 		}
 
-		if !bytesutil.IsSorted(keys) {
-			bytesutil.Sort(keys)
-		}
-
-		// Delete all matching keys.
-		if err := sh.DeleteSeriesRange(keys, min, max); err != nil {
-			return err
-		}
 		return nil
 	})
 }
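
Taken together, the store no longer materializes and sorts every matching key before issuing a single bulk delete; each shard streams matching series into DeleteSeriesRange. A rough sketch of the batching pattern this enables, assuming the tsdb interfaces above; deleteBatch and batchSize are illustrative, not the engine's actual API:

package sketch

import (
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
)

// deleteInBatches walks a series iterator and hands keys to deleteBatch in
// fixed-size groups, bounding memory use regardless of series cardinality.
func deleteInBatches(itr tsdb.SeriesIterator, deleteBatch func([][]byte) error) error {
	const batchSize = 10000 // illustrative bound on in-flight keys
	keys := make([][]byte, 0, batchSize)
	for e := itr.Next(); e != nil; e = itr.Next() {
		keys = append(keys, models.MakeKey(e.Name(), e.Tags()))
		if len(keys) == batchSize {
			if err := deleteBatch(keys); err != nil {
				return err
			}
			keys = keys[:0] // reuse the backing array for the next batch
		}
	}
	if len(keys) > 0 {
		return deleteBatch(keys)
	}
	return nil
}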