Optimize DeleteSeriesRange

This removes more allocations and speeds up some critical sections.
pull/9084/head
Jason Wilder 2017-11-01 15:24:08 -06:00
parent aee395d3bd
commit 80cd5e63af
6 changed files with 201 additions and 158 deletions

@@ -65,6 +65,9 @@ const (
// keyFieldSeparator separates the series key from the field name in the composite key
// that identifies a specific field in a series
keyFieldSeparator = "#!~#"
// deleteFlushThreshold is the size in bytes of a batch of series keys to delete.
deleteFlushThreshold = 50 * 1024 * 1024
)
// Statistics gathered by the engine.
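
A minimal sketch of the flush-by-size pattern this constant enables in DeleteSeriesRange below; the flush callback is a hypothetical stand-in for the engine's deleteSeriesRange:

const flushThreshold = 50 * 1024 * 1024 // mirrors deleteFlushThreshold above

// deleteInBatches accumulates keys until their combined byte size crosses the
// threshold, flushes the partial batch, then reuses the backing array.
func deleteInBatches(keys [][]byte, flush func([][]byte) error) error {
    var sz int
    batch := make([][]byte, 0, 10000)
    for _, k := range keys {
        sz += len(k)
        batch = append(batch, k)
        if sz >= flushThreshold {
            if err := flush(batch); err != nil {
                return err
            }
            batch, sz = batch[:0], 0 // keep the allocation for the next batch
        }
    }
    if len(batch) > 0 {
        return flush(batch) // final partial batch
    }
    return nil
}

Sizing batches by bytes rather than by a fixed count of 10,000 keys keeps memory bounded even when individual series keys are very long.
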
@@ -560,7 +563,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
// Save reference to index for iterator creation.
e.index = index
if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error {
if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {
return err
@@ -931,16 +934,11 @@ func (e *Engine) WritePoints(points []models.Point) error {
return err
}
// DeleteSeriesRange removes the values between min and max (inclusive) from all series.
func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error {
// Disable and abort running compactions so that tombstones added to existing tsm
// files don't get removed. This would cause deleted measurements/series to
// re-appear once the compaction completes. We only disable the level compactions
// so that snapshotting does not stop while writing out tombstones. If it is stopped,
// and writing tombstones takes a long time, writes can get rejected due to the cache
// filling up.
e.disableLevelCompactions(true)
defer e.enableLevelCompactions(true)
var disableOnce bool
var sz int
batch := make([][]byte, 0, 10000)
for elem := itr.Next(); elem != nil; elem = itr.Next() {
if elem.Expr() != nil {
@@ -949,14 +947,29 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
}
}
batch = append(batch, models.MakeKey(elem.Name(), elem.Tags()))
if !disableOnce {
// Disable and abort running compactions so that tombstones added to existing tsm
// files don't get removed. This would cause deleted measurements/series to
// re-appear once the compaction completes. We only disable the level compactions
// so that snapshotting does not stop while writing out tombstones. If it is stopped,
// and writing tombstones takes a long time, writes can get rejected due to the cache
// filling up.
e.disableLevelCompactions(true)
defer e.enableLevelCompactions(true)
disableOnce = true
}
if len(batch) == 10000 {
key := models.MakeKey(elem.Name(), elem.Tags())
sz += len(key)
batch = append(batch, key)
if sz >= deleteFlushThreshold {
// Delete all matching batch.
if err := e.deleteSeriesRange(batch, min, max); err != nil {
return err
}
batch = batch[:0]
sz = 0
}
}
@@ -967,6 +980,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
}
batch = batch[:0]
}
go e.index.Rebuild()
return nil
}
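
The hunk above also makes the compaction disable lazy: it only fires once the iterator actually yields a series, so empty deletes no longer pause level compactions at all. A self-contained sketch of that enter-once/exit-once shape, with hypothetical hooks in place of the compaction toggles:

// processAll defers the expensive enter hook until the first item arrives and
// guarantees exit runs exactly once afterwards, mirroring disableOnce above.
func processAll(items [][]byte, enter, exit func(), handle func([]byte) error) error {
    var entered bool
    for _, it := range items {
        if !entered {
            enter()      // pay the cost only when there is real work
            defer exit() // single deferred re-enable
            entered = true
        }
        if err := handle(it); err != nil {
            return err
        }
    }
    return nil
}
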
@@ -992,46 +1006,55 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
max = math.MaxInt64
}
var i int
var abort = errors.New("iteration aborted") // sentinel error value
deleteKeys := make([][]byte, 0, len(seriesKeys))
// Run the delete on each TSM file in parallel
if err := e.FileStore.Apply(func(r TSMFile) error {
// See if this TSM file contains the keys and time range
minKey, maxKey := seriesKeys[0], seriesKeys[len(seriesKeys)-1]
tsmMin, tsmMax := r.KeyRange()
// Walk through the keys in the file store in sorted order and track any series keys
// we find that match the set passed in. The file store keys contain the field as a
// suffix, but the passed in set is only the measurement and tag sets.
if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error {
if i >= len(seriesKeys) {
return abort
}
tsmMin, _ = SeriesAndFieldFromCompositeKey(tsmMin)
tsmMax, _ = SeriesAndFieldFromCompositeKey(tsmMax)
// Strip off the field portion of the key
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
// Skip any series keys passed in that are less than the current key. These
// don't exist on disk.
cmp := bytes.Compare(seriesKeys[i], seriesKey)
for i < len(seriesKeys) && cmp < 0 {
cmp = bytes.Compare(seriesKeys[i], seriesKey)
i++
}
// We've found a matching key, add the series key (w/ field) to the set we need
// to delete.
if cmp == 0 {
deleteKeys = append(deleteKeys, k)
overlaps := bytes.Compare(tsmMin, maxKey) <= 0 && bytes.Compare(tsmMax, minKey) >= 0
if !overlaps || !r.OverlapsTimeRange(min, max) {
return nil
}
// Delete each key we find in the file. We seek to the min key and walk from there.
batch := r.BatchDelete()
n := r.KeyCount()
var j int
for i := r.Seek(minKey); i < n; i++ {
indexKey, _ := r.KeyAt(i)
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 {
j++
}
if j >= len(seriesKeys) {
break
}
if bytes.Equal(seriesKeys[j], seriesKey) {
if err := batch.DeleteRange([][]byte{indexKey}, min, max); err != nil {
batch.Rollback()
return err
}
}
}
if err := batch.Commit(); err != nil {
return err
}
return nil
}); err != nil && err != abort {
}); err != nil {
return err
}
if err := e.FileStore.DeleteRange(deleteKeys, min, max); err != nil {
return err
}
var abort = errors.New("iteration aborted") // sentinel error value
// find the keys in the cache and remove them
deleteKeys = deleteKeys[:0]
deleteKeys := make([][]byte, 0, len(seriesKeys))
// ApplyEntryFn cannot return an error in this invocation.
_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
@@ -1062,27 +1085,43 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// Note: this is inherently racy if writes are occurring to the same measurement/series
// being removed. A write could occur and exist in the cache at this point, but we
// would delete it from the index.
i = 0
if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error {
if i >= len(seriesKeys) {
return abort
}
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
minKey, maxKey := seriesKeys[0], seriesKeys[len(seriesKeys)-1]
if err := e.FileStore.Apply(func(r TSMFile) error {
tsmMin, tsmMax := r.KeyRange()
cmp := bytes.Compare(seriesKeys[i], seriesKey)
for i < len(seriesKeys) && cmp < 0 {
cmp = bytes.Compare(seriesKeys[i], seriesKey)
i++
}
tsmMin, _ = SeriesAndFieldFromCompositeKey(tsmMin)
tsmMax, _ = SeriesAndFieldFromCompositeKey(tsmMax)
for i < len(seriesKeys) && cmp == 0 {
seriesKeys[i] = nil
cmp = bytes.Compare(seriesKeys[i], seriesKey)
i++
overlaps := bytes.Compare(tsmMin, maxKey) <= 0 && bytes.Compare(tsmMax, minKey) >= 0
if !overlaps || !r.OverlapsTimeRange(min, max) {
return nil
}
n := r.KeyCount()
var j int
for i := r.Seek(minKey); i < n; i++ {
if j >= len(seriesKeys) {
return abort
}
indexKey, _ := r.KeyAt(i)
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
cmp := bytes.Compare(seriesKeys[j], seriesKey)
for j < len(seriesKeys) && cmp < 0 {
cmp = bytes.Compare(seriesKeys[j], seriesKey)
j++
}
if j < len(seriesKeys) && cmp == 0 {
seriesKeys[j] = nil
j++
}
return nil
}
return nil
}); err != nil && err != abort {
return err
}
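
Both Apply passes above lean on the same invariant: seriesKeys and each TSM index are sorted, so one forward scan with two cursors matches the sets in linear time instead of searching per key. A standalone sketch of that merge, assuming plain byte-slice keys:

package sketch

import "bytes"

// markMatches nils out every entry of wanted that also appears in index.
// Both slices must be sorted; the scan is O(len(index) + len(wanted)).
func markMatches(wanted, index [][]byte) {
    j := 0
    for _, k := range index {
        for j < len(wanted) && bytes.Compare(wanted[j], k) < 0 {
            j++ // wanted[j] precedes everything left in index: no match
        }
        if j == len(wanted) {
            return // nothing left to look for
        }
        if bytes.Equal(wanted[j], k) {
            wanted[j] = nil // present on disk; mark it
            j++
        }
    }
}
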
@@ -1150,7 +1189,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
}
// Check the filestore.
return e.FileStore.WalkKeys(func(k []byte, typ byte) error {
return e.FileStore.WalkKeys(name, func(k []byte, typ byte) error {
if bytes.HasPrefix(k, encodedName) {
return abortErr
}
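
Passing the measurement name as the seek key means the walk starts at the first key that could match instead of at the beginning of every file. A hypothetical caller-side sketch of the same seek-then-break idiom (errDone is illustrative, and real callers must escape the measurement name as DeleteMeasurement does with encodedName):

package sketch

import (
    "bytes"
    "errors"

    "github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

var errDone = errors.New("done") // sentinel used to stop the walk early

// countMeasurementKeys counts keys under one measurement by seeking straight
// to its prefix and bailing out once the sorted walk passes it.
func countMeasurementKeys(fs *tsm1.FileStore, name []byte) (int, error) {
    var n int
    err := fs.WalkKeys(name, func(k []byte, _ byte) error {
        if !bytes.HasPrefix(k, name) {
            return errDone // past the measurement's key range
        }
        n++
        return nil
    })
    if err != nil && err != errDone {
        return 0, err
    }
    return n, nil
}
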

@@ -8,6 +8,7 @@ import (
"math"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
@@ -16,6 +17,7 @@ import (
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/pkg/metrics"
"github.com/influxdata/influxdb/query"
"go.uber.org/zap"
@@ -53,6 +55,9 @@ type TSMFile interface {
// OverlapsTimeRange returns true if the time range of the file intersects min and max.
OverlapsTimeRange(min, max int64) bool
// OverlapsKeyRange returns true if the key range of the file intersects min and max.
OverlapsKeyRange(min, max []byte) bool
// TimeRange returns the min and max time across all keys in the file.
TimeRange() (int64, int64)
@@ -65,6 +70,9 @@ type TSMFile interface {
// KeyCount returns the number of distinct keys in the file.
KeyCount() int
// Seek returns the position in the index of the first key that is >= key.
Seek(key []byte) int
// KeyAt returns the key located at index position idx.
KeyAt(idx int) ([]byte, byte)
@@ -288,7 +296,7 @@ func (f *FileStore) NextGeneration() int {
// WalkKeys calls fn for every key in every TSM file known to the FileStore, beginning
// at the first key >= seek when seek is non-empty. If a key exists in multiple files,
// fn is invoked once for each file.
func (f *FileStore) WalkKeys(fn func(key []byte, typ byte) error) error {
func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) error {
f.mu.RLock()
if len(f.files) == 0 {
f.mu.RUnlock()
@@ -302,8 +310,13 @@ func (f *FileStore) WalkKeys(fn func(key []byte, typ byte) error) error {
readers = append(readers, ch)
go func(c chan seriesKey, r TSMFile) {
start := 0
if len(seek) > 0 {
start = r.Seek(seek)
}
n := r.KeyCount()
for i := 0; i < n; i++ {
for i := start; i < n; i++ {
key, typ := r.KeyAt(i)
select {
@@ -341,7 +354,7 @@ func (f *FileStore) Keys() map[string]byte {
defer f.mu.RUnlock()
uniqueKeys := map[string]byte{}
if err := f.WalkKeys(func(key []byte, typ byte) error {
if err := f.WalkKeys(nil, func(key []byte, typ byte) error {
uniqueKeys[string(key)] = typ
return nil
}); err != nil {
@@ -369,73 +382,38 @@ func (f *FileStore) Delete(keys [][]byte) error {
return f.DeleteRange(keys, math.MinInt64, math.MaxInt64)
}
// DeleteRangeWith removes the values between timestamps min and max for keys where
// fn returns true.
func (f *FileStore) DeleteRangeWith(fn func(name []byte, tags models.Tags) bool, min, max int64) error {
var batches BatchDeleters
// Apply concurrently calls fn once for each TSM file in the FileStore, bounded
// to one invocation per core, and returns the last error fn produced, if any.
func (f *FileStore) Apply(fn func(r TSMFile) error) error {
// Limit concurrent invocations of fn to the number of cores
limiter := limiter.NewFixed(runtime.GOMAXPROCS(0))
f.mu.RLock()
errC := make(chan error, len(f.files))
for _, f := range f.files {
if f.OverlapsTimeRange(min, max) {
batches = append(batches, f.BatchDelete())
go func(r TSMFile) {
limiter.Take()
defer limiter.Release()
r.Ref()
defer r.Unref()
errC <- fn(r)
}(f)
}
var applyErr error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
applyErr = err
}
}
f.mu.RUnlock()
// No TSM files contain this time range, nothing to delete.
if len(batches) == 0 {
return nil
}
// Bound the deletes in batches to avoid large allocations
var batchSize = 10000
// Delete groups of series in batches
if err := func() error {
deleteKeys := make([][]byte, 0, batchSize)
if err := f.WalkKeys(func(k []byte, _ byte) error {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
meas, tags := models.ParseKeyBytes(seriesKey)
if !fn(meas, tags) {
return nil
}
deleteKeys = append(deleteKeys, k)
// Delete this batch and reset it
if len(deleteKeys) == batchSize {
if err := batches.DeleteRange(deleteKeys, min, max); err != nil {
return err
}
deleteKeys = deleteKeys[:0]
}
return nil
}); err != nil {
return err
}
// Delete the last batch if there is one
if len(deleteKeys) > 0 {
if err := batches.DeleteRange(deleteKeys, min, max); err != nil {
return err
}
}
// Try to commit these deletes
return batches.Commit()
}(); err != nil {
// Rollback the deletes
_ = batches.Rollback()
return err
}
f.mu.Lock()
f.lastModified = time.Now().UTC()
f.lastFileStats = nil
f.mu.Unlock()
return nil
return applyErr
}
// DeleteRange removes the values for keys between timestamps min and max. This should only
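
Apply bounds its fan-out with pkg/limiter; the identical shape can be written with a plain buffered-channel semaphore. A sketch under that substitution, with a hypothetical string work item in place of TSMFile:

package sketch

import "runtime"

// applyAll runs fn on every item with at most GOMAXPROCS goroutines active at
// once, draining results through a buffered error channel as Apply does.
// Apply above keeps the last error it sees; this sketch keeps the first.
func applyAll(items []string, fn func(string) error) error {
    sem := make(chan struct{}, runtime.GOMAXPROCS(0))
    errC := make(chan error, len(items))
    for _, it := range items {
        go func(it string) {
            sem <- struct{}{}        // acquire a slot
            defer func() { <-sem }() // release it
            errC <- fn(it)
        }(it)
    }
    var firstErr error
    for range items {
        if err := <-errC; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}
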

@@ -1,7 +1,6 @@
package tsm1_test
import (
"bytes"
"context"
"fmt"
"io/ioutil"
@@ -9,11 +8,11 @@ import (
"path/filepath"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
@@ -2541,7 +2540,7 @@ func TestFileStore_Delete(t *testing.T) {
}
}
func TestFileStore_DeleteRangeWith(t *testing.T) {
func TestFileStore_Apply(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := tsm1.NewFileStore(dir)
@@ -2565,25 +2564,16 @@ func TestFileStore_DeleteRangeWith(t *testing.T) {
t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
}
if err := fs.DeleteRangeWith(func(name []byte, tags models.Tags) bool {
if !bytes.Equal([]byte("cpu"), name) {
return false
}
hostTag := tags.Get([]byte("host"))
return bytes.Equal(hostTag, []byte("server1")) || bytes.Equal(hostTag, []byte("server2"))
}, 0, 1); err != nil {
fatal(t, "deleting", err)
var n int64
if err := fs.Apply(func(r tsm1.TSMFile) error {
atomic.AddInt64(&n, 1)
return nil
}); err != nil {
t.Fatalf("unexpected error deleting: %v", err)
}
keys = fs.Keys()
if got, exp := len(keys), 1; got != exp {
t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
}
if _, ok := keys["mem,host=server1#!~#value"]; !ok {
t.Fatalf("key missing: %v", "mem,host=server1#!~#value")
if got, exp := n, int64(3); got != exp {
t.Fatalf("apply mismatch: got %v, exp %v", got, exp)
}
}

@@ -88,6 +88,9 @@ type TSMIndex interface {
// KeyCount returns the count of unique keys in the index.
KeyCount() int
// Seek returns the position in the index of the first key that is >= key.
Seek(key []byte) int
// OverlapsTimeRange returns true if the time range of the file intersects min and max.
OverlapsTimeRange(min, max int64) bool
@@ -313,6 +316,10 @@ func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
return t.index.KeyAt(idx)
}
func (t *TSMReader) Seek(key []byte) int {
return t.index.Seek(key)
}
// ReadAt returns the values corresponding to the given index entry.
func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
t.mu.RLock()
@@ -503,6 +510,11 @@ func (t *TSMReader) OverlapsTimeRange(min, max int64) bool {
return t.index.OverlapsTimeRange(min, max)
}
// OverlapsKeyRange returns true if the key range of the file intersects min and max.
func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool {
return t.index.OverlapsKeyRange(min, max)
}
// TimeRange returns the min and max time across all keys in the file.
func (t *TSMReader) TimeRange() (int64, int64) {
return t.index.TimeRange()
@@ -788,6 +800,12 @@ func (d *indirectIndex) offset(i int) int {
return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4]))
}
func (d *indirectIndex) Seek(key []byte) int {
d.mu.RLock()
defer d.mu.RUnlock()
return d.searchOffset(key)
}
// searchOffset searches the offsets slice for key and returns the position in
// offsets where key would exist.
func (d *indirectIndex) searchOffset(key []byte) int {
@@ -966,10 +984,6 @@ func (d *indirectIndex) KeyCount() int {
// Delete removes the given keys from the index.
func (d *indirectIndex) Delete(keys [][]byte) {
for len(keys) > 0 && !d.ContainsKey(keys[0]) {
keys = keys[1:]
}
if len(keys) == 0 {
return
}
@@ -981,11 +995,8 @@ func (d *indirectIndex) Delete(keys [][]byte) {
// Both keys and offsets are sorted. Walk both in order and skip
// any keys that exist in both.
d.mu.Lock()
for i := 0; i+4 <= len(d.offsets); i += 4 {
for len(keys) > 0 && !d.ContainsKey(keys[0]) {
keys = keys[1:]
}
start := d.searchOffset(keys[0])
for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 {
offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
_, indexKey := readKey(d.b[offset:])
@@ -996,7 +1007,6 @@ func (d *indirectIndex) Delete(keys [][]byte) {
if len(keys) > 0 && bytes.Equal(keys[0], indexKey) {
keys = keys[1:]
copy(d.offsets[i:i+4], nilOffset[:])
continue
}
}
d.offsets = bytesutil.Pack(d.offsets, 4, 255)
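
The rewritten Delete no longer restarts its scan per key: it binary-searches to the first offset that could match keys[0] and makes a single merge pass, stopping as soon as keys is exhausted. The same idea on sorted string slices, as a sketch:

package sketch

import "sort"

// deleteSorted removes every member of keys from entries in one pass. Both
// slices must be sorted; the result compacts entries' backing array in place,
// much as bytesutil.Pack compacts the offsets slice above.
func deleteSorted(entries, keys []string) []string {
    if len(keys) == 0 {
        return entries
    }
    // Seek to the first entry that could match keys[0].
    start := sort.Search(len(entries), func(i int) bool { return entries[i] >= keys[0] })
    out := entries[:start]
    for i := start; i < len(entries); i++ {
        for len(keys) > 0 && keys[0] < entries[i] {
            keys = keys[1:] // key absent from entries; skip it
        }
        if len(keys) > 0 && keys[0] == entries[i] {
            keys = keys[1:]
            continue // drop the matched entry
        }
        out = append(out, entries[i])
    }
    return out
}
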

@@ -44,10 +44,11 @@ type Tombstoner struct {
// These are references used for pending writes that have not been committed. If
// these are nil, then no pending writes are in progress.
gz *gzip.Writer
bw *bufio.Writer
pendingFile *os.File
tmp [8]byte
gz *gzip.Writer
bw *bufio.Writer
pendingFile *os.File
tmp [8]byte
lastAppliedOffset int64
}
// Tombstone represents an individual deletion.
@@ -153,6 +154,8 @@ func (t *Tombstoner) Delete() error {
return err
}
t.statsLoaded = false
t.lastAppliedOffset = 0
return nil
}
@@ -202,6 +205,9 @@ func (t *Tombstoner) TombstoneFiles() []FileStat {
// Walk calls fn for every Tombstone under the Tombstoner.
func (t *Tombstoner) Walk(fn func(t Tombstone) error) error {
t.mu.Lock()
defer t.mu.Unlock()
f, err := os.Open(t.tombstonePath())
if os.IsNotExist(err) {
return nil
@@ -569,10 +575,15 @@ func (t *Tombstoner) readTombstoneV3(f *os.File, fn func(t Tombstone) error) err
// of storing multiple v3 files appended together.
func (t *Tombstoner) readTombstoneV4(f *os.File, fn func(t Tombstone) error) error {
// Skip header, already checked earlier
if _, err := f.Seek(headerSize, io.SeekStart); err != nil {
return err
if t.lastAppliedOffset != 0 {
if _, err := f.Seek(t.lastAppliedOffset, io.SeekStart); err != nil {
return err
}
} else {
if _, err := f.Seek(headerSize, io.SeekStart); err != nil {
return err
}
}
var (
min, max int64
key []byte
@@ -580,7 +591,9 @@ func (t *Tombstoner) readTombstoneV4(f *os.File, fn func(t Tombstone) error) err
br := bufio.NewReaderSize(f, 64*1024)
gr, err := gzip.NewReader(br)
if err != nil {
if err == io.EOF {
return nil
} else if err != nil {
return err
}
defer gr.Close()
@@ -640,9 +653,18 @@ func (t *Tombstoner) readTombstoneV4(f *os.File, fn func(t Tombstone) error) err
err = gr.Reset(br)
if err == io.EOF {
return nil
break
}
}
// Save the position of the tombstone file so we don't re-apply the same set again if there are
// more deletes.
pos, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
t.lastAppliedOffset = pos
return nil
}
func (t *Tombstoner) tombstonePath() string {
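
Tracking lastAppliedOffset turns repeated Walk calls over a growing tombstone file into incremental reads instead of full re-reads. A simplified sketch of the resume logic, assuming an uncompressed line-oriented file (the real reader must also cope with the gzip framing and the v4 header):

package sketch

import (
    "bufio"
    "io"
    "os"
)

const hdrSize = 4 // hypothetical header length

type walker struct {
    lastApplied int64 // raw file offset already handed to callers
}

// walk applies fn to each record written since the previous call, then saves
// how far it read so the next call seeks past already-applied records.
func (w *walker) walk(f *os.File, fn func([]byte) error) error {
    off := w.lastApplied
    if off == 0 {
        off = hdrSize // first call: skip only the header
    }
    if _, err := f.Seek(off, io.SeekStart); err != nil {
        return err
    }
    sc := bufio.NewScanner(f)
    for sc.Scan() {
        if err := fn(sc.Bytes()); err != nil {
            return err
        }
    }
    if err := sc.Err(); err != nil {
        return err
    }
    // We scanned to EOF, so the file's raw offset is the end of the data.
    pos, err := f.Seek(0, io.SeekCurrent)
    if err != nil {
        return err
    }
    w.lastApplied = pos
    return nil
}
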

@@ -1027,9 +1027,11 @@ var errMaxSeriesPerDatabaseExceeded = errors.New("max series per database exceed
type seriesIterator struct {
keys [][]byte
elem series
}
type series struct {
tsdb.SeriesElem
name []byte
tags models.Tags
deleted bool
@@ -1044,8 +1046,10 @@ func (itr *seriesIterator) Next() tsdb.SeriesElem {
if len(itr.keys) == 0 {
return nil
}
name, tags := models.ParseKeyBytes(itr.keys[0])
s := series{name: name, tags: tags}
itr.elem.name = name
itr.elem.tags = tags
itr.keys = itr.keys[1:]
return s
return &itr.elem
}
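
The iterator now overwrites one reused element per Next call instead of allocating a fresh series value each time, which is where part of the commit's allocation savings comes from. A minimal sketch of the trade-off; the usual caveat applies that callers must copy anything they keep past the next call:

type elem struct {
    name []byte
}

type iter struct {
    keys [][]byte
    e    elem // reused across Next calls
}

// Next overwrites the shared element in place and returns its address, so no
// allocation happens per call. The pointer is only valid until the next call.
func (it *iter) Next() *elem {
    if len(it.keys) == 0 {
        return nil
    }
    it.e.name = it.keys[0]
    it.keys = it.keys[1:]
    return &it.e
}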