perf(tsi1): batch write tombstone entries when dropping/deleting

pull/18689/head
dengzhi.ldz 2020-03-13 17:29:33 +08:00 committed by Ben Johnson
parent 57ea78e984
commit 331569bc11
7 changed files with 220 additions and 30 deletions

View File

@ -1673,6 +1673,9 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
ids := tsdb.NewSeriesIDSet() ids := tsdb.NewSeriesIDSet()
measurements := make(map[string]struct{}, 1) measurements := make(map[string]struct{}, 1)
deleteIDList := make([]uint64, 0, 10000)
deleteKeyList := make([][]byte, 0, 10000)
for _, k := range seriesKeys { for _, k := range seriesKeys {
if len(k) == 0 { if len(k) == 0 {
continue // This key was wiped because it shouldn't be removed from index. continue // This key was wiped because it shouldn't be removed from index.
@ -1702,15 +1705,18 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
continue continue
} }
// Insert deleting series info into queue
measurements[string(name)] = struct{}{} measurements[string(name)] = struct{}{}
// Remove the series from the local index. deleteIDList = append(deleteIDList, sid)
if err := e.index.DropSeries(sid, k, false); err != nil { deleteKeyList = append(deleteKeyList, k)
return err
}
// Add the id to the set of delete ids. // Add the id to the set of delete ids.
ids.Add(sid) ids.Add(sid)
} }
// Remove the series from the local index.
if err := e.index.DropSeriesList(deleteIDList, deleteKeyList, false); err != nil {
return err
}
fielsetChanged := false fielsetChanged := false
for k := range measurements { for k := range measurements {

View File

@ -43,6 +43,7 @@ type Index interface {
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(seriesID uint64, key []byte, cascade bool) error DropSeries(seriesID uint64, key []byte, cascade bool) error
DropSeriesList(seriesID []uint64, key [][]byte, cascade bool) error
DropMeasurementIfSeriesNotExist(name []byte) (bool, error) DropMeasurementIfSeriesNotExist(name []byte) (bool, error)
// Used to clean up series in inmem index that were dropped with a shard. // Used to clean up series in inmem index that were dropped with a shard.

View File

@ -1135,6 +1135,30 @@ func (idx *ShardIndex) DropSeries(seriesID uint64, key []byte, _ bool) error {
return nil return nil
} }
// DropSeriesList removes the provided series ids from the local bitset that tracks
// series in this shard only.
func (idx *ShardIndex) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
// All slices must be of equal length.
if len(seriesIDs) != len(keys) {
return errors.New("seriesIDs/keys length mismatch in index")
}
idx.seriesIDSet.Lock()
for i, seriesID := range seriesIDs {
if idx.seriesIDSet.ContainsNoLock(seriesID) {
idx.seriesIDSet.RemoveNoLock(seriesID)
name := models.ParseName(keys[i])
if curr := idx.measurements[string(name)]; curr <= 1 {
delete(idx.measurements, string(name))
} else {
idx.measurements[string(name)] = curr - 1
}
}
}
idx.seriesIDSet.Unlock()
return nil
}
// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more // DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
// series for the measurment. // series for the measurment.
func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) { func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {

View File

@ -824,6 +824,64 @@ func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
return nil return nil
} }
// DropSeries drops the provided series from the index. If cascade is true
// and this is the last series to the measurement, the measurment will also be dropped.
func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
// All slices must be of equal length.
if len(seriesIDs) != len(keys) {
return errors.New("seriesIDs/keys length mismatch in index")
}
// We need to move different series into collections for each partition
// to process.
pSeriesIDs := make([][]uint64, i.PartitionN)
pKeys := make([][][]byte, i.PartitionN)
for idx, key := range keys {
pidx := i.partitionIdx(key)
pSeriesIDs[pidx] = append(pSeriesIDs[pidx], seriesIDs[idx])
pKeys[pidx] = append(pKeys[pidx], key)
}
// Process each subset of series on each partition.
n := i.availableThreads()
// Store errors.
errC := make(chan error, i.PartitionN)
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func() {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= len(i.partitions) {
return // No more work.
}
// Drop from partition.
err := i.partitions[idx].DropSeriesList(pSeriesIDs[idx])
errC <- err
}
}()
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return err
}
}
// Add sketch tombstone.
i.mu.Lock()
for _, key := range keys {
i.sTSketch.Add(key)
}
i.mu.Unlock()
return nil
}
// DropSeriesGlobal is a no-op on the tsi1 index. // DropSeriesGlobal is a no-op on the tsi1 index.
func (i *Index) DropSeriesGlobal(key []byte) error { return nil } func (i *Index) DropSeriesGlobal(key []byte) error { return nil }

View File

@ -466,6 +466,69 @@ func TestIndex_TagValueSeriesIDIterator(t *testing.T) {
}) })
} }
func TestIndex_DropSeriesList(t *testing.T) {
idx := MustOpenDefaultIndex() // Uses the single series creation method CreateSeriesIfNotExists
defer idx.Close()
// Add some series.
data := []struct {
Key string
Name string
Tags map[string]string
}{
{"cpu,region=west,server=a", "cpu", map[string]string{"region": "west", "server": "a"}},
{"cpu,region=west,server=b", "cpu", map[string]string{"region": "west", "server": "b"}},
{"cpu,region=west,server=c", "cpu", map[string]string{"region": "west", "server": "c"}},
{"cpu,region=east,server=a", "cpu", map[string]string{"region": "east", "server": "a"}},
{"cpu,region=east,server=c", "cpu", map[string]string{"region": "east", "server": "c"}},
{"cpu,region=east,server=d", "cpu", map[string]string{"region": "east", "server": "d"}},
{"cpu,region=north,server=b", "cpu", map[string]string{"region": "north", "server": "b"}},
{"cpu,region=north,server=c", "cpu", map[string]string{"region": "north", "server": "c"}},
{"cpu,region=north,server=d", "cpu", map[string]string{"region": "north", "server": "d"}},
{"cpu,region=south,server=a", "cpu", map[string]string{"region": "south", "server": "a"}},
{"cpu,region=south,server=d", "cpu", map[string]string{"region": "south", "server": "d"}},
}
keys := make([][]byte, 0, 15)
seriesIDs := make([]uint64, 0, 15)
for _, pt := range data {
if err := idx.CreateSeriesIfNotExists([]byte(pt.Key), []byte(pt.Name), models.NewTags(pt.Tags)); err != nil {
t.Fatal(err)
}
keys = append(keys, []byte(pt.Key))
seriesIDs = append(seriesIDs, idx.Index.SeriesFile().SeriesID([]byte(pt.Name), models.NewTags(pt.Tags), nil))
}
// Drop series list
if err := idx.DropSeriesList(seriesIDs[0:len(seriesIDs)-2], keys[0:len(keys)-2], false); err != nil {
t.Fatal(err)
}
// Verify series still exists.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementHasSeries([]byte("cpu")); err != nil {
t.Fatal(err)
} else if !v {
t.Fatal("expected series to still exist")
}
})
// Drop series list lefted
if err := idx.DropSeriesList(seriesIDs[len(seriesIDs)-2:], keys[len(keys)-2:], false); err != nil {
t.Fatal(err)
}
// Verify series is now deleted.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementHasSeries([]byte("cpu")); err != nil {
t.Fatal(err)
} else if v {
t.Fatal("expected series to be deleted")
}
})
}
// Index is a test wrapper for tsi1.Index. // Index is a test wrapper for tsi1.Index.
type Index struct { type Index struct {
*tsi1.Index *tsi1.Index

View File

@ -578,6 +578,23 @@ func (f *LogFile) DeleteSeriesID(id uint64) error {
return f.FlushAndSync() return f.FlushAndSync()
} }
// DeleteSeriesIDList adds a tombstone for seriesIDList
func (f *LogFile) DeleteSeriesIDList(ids []uint64) error {
f.mu.Lock()
defer f.mu.Unlock()
for _, id := range ids {
e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
if err := f.appendEntry(&e); err != nil {
return err
}
f.execEntry(&e)
}
// Flush buffer and sync to disk.
return f.FlushAndSync()
}
// SeriesN returns the total number of series in the file. // SeriesN returns the total number of series in the file.
func (f *LogFile) SeriesN() (n uint64) { func (f *LogFile) SeriesN() (n uint64) {
f.mu.RLock() f.mu.RLock()
@ -1056,6 +1073,21 @@ func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error)
return sketch, tSketch, nil return sketch, tSketch, nil
} }
func (f *LogFile) Writes(entries []LogEntry) error {
f.mu.RLock()
defer f.mu.RUnlock()
for i := range entries {
entry := &entries[i]
if err := f.appendEntry(entry); err != nil {
return err
}
f.execEntry(entry)
}
// Flush buffer and sync to disk.
return f.FlushAndSync()
}
// LogEntry represents a single log entry in the write-ahead log. // LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct { type LogEntry struct {
Flag byte // flag Flag byte // flag

View File

@ -585,31 +585,20 @@ func (p *Partition) DropMeasurement(name []byte) error {
} }
defer fs.Release() defer fs.Release()
entries := make([]LogEntry, 0, 100)
// Delete all keys and values. // Delete all keys and values.
if kitr := fs.TagKeyIterator(name); kitr != nil { if kitr := fs.TagKeyIterator(name); kitr != nil {
for k := kitr.Next(); k != nil; k = kitr.Next() { for k := kitr.Next(); k != nil; k = kitr.Next() {
// Delete key if not already deleted. // Delete key if not already deleted.
if !k.Deleted() { if !k.Deleted() {
if err := func() error { entries = append(entries, LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: k.Key()})
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteTagKey(name, k.Key())
}(); err != nil {
return err
}
} }
// Delete each value in key. // Delete each value in key.
if vitr := k.TagValueIterator(); vitr != nil { if vitr := k.TagValueIterator(); vitr != nil {
for v := vitr.Next(); v != nil; v = vitr.Next() { for v := vitr.Next(); v != nil; v = vitr.Next() {
if !v.Deleted() { if !v.Deleted() {
if err := func() error { entries = append(entries, LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: k.Key(), Value: v.Value()})
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
}(); err != nil {
return err
}
} }
} }
} }
@ -626,13 +615,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
} else if elem.SeriesID == 0 { } else if elem.SeriesID == 0 {
break break
} }
if err := func() error { entries = append(entries, LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: elem.SeriesID})
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteSeriesID(elem.SeriesID)
}(); err != nil {
return err
}
} }
if err = itr.Close(); err != nil { if err = itr.Close(); err != nil {
return err return err
@ -640,13 +623,14 @@ func (p *Partition) DropMeasurement(name []byte) error {
} }
// Mark measurement as deleted. // Mark measurement as deleted.
if err := func() error { entries = append(entries, LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name})
p.mu.RLock()
defer p.mu.RUnlock() p.mu.RLock()
return p.activeLogFile.DeleteMeasurement(name) if err := p.activeLogFile.Writes(entries); err != nil {
}(); err != nil { p.mu.RUnlock()
return err return err
} }
p.mu.RUnlock()
// Check if the log file needs to be swapped. // Check if the log file needs to be swapped.
if err := p.CheckLogFile(); err != nil { if err := p.CheckLogFile(); err != nil {
@ -705,6 +689,28 @@ func (p *Partition) DropSeries(seriesID uint64) error {
return p.CheckLogFile() return p.CheckLogFile()
} }
func (p *Partition) DropSeriesList(seriesIDs []uint64) error {
if len(seriesIDs) == 0 {
return nil
}
// Delete series from index.
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteSeriesIDList(seriesIDs)
}(); err != nil {
return err
}
for _, seriesID := range seriesIDs {
p.seriesIDSet.Remove(seriesID)
}
// Swap log file, if necessary.
return p.CheckLogFile()
}
// MeasurementsSketches returns the two sketches for the partition by merging all // MeasurementsSketches returns the two sketches for the partition by merging all
// instances of the type sketch types in all the index files. // instances of the type sketch types in all the index files.
func (p *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { func (p *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {