Refactoring delete tests

pull/9315/head
Edd Robinson 2018-01-11 21:14:11 +00:00
parent 74481b9415
commit a4bef3a4bc
5 changed files with 65 additions and 44 deletions

View File

@@ -1368,7 +1368,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
 			name, tags := models.ParseKey(k)
 			sid := e.sfile.SeriesID([]byte(name), tags, buf)
 			if sid == 0 {
-				return fmt.Errorf("unable to find id for series key %s during deletion", k)
+				return fmt.Errorf("unable to find id for series key %q during deletion", k)
 			}
 			id := (sid << 32) | e.id

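For context (not part of this change): %q quotes and escapes the key in the error message, whereas %s prints the raw bytes and can hide control characters embedded in a series key. A minimal standalone sketch:

package main

import "fmt"

func main() {
	// Hypothetical series key containing a NUL byte.
	key := []byte("cpu,host=a\x00b")
	fmt.Printf("with %%s: unable to find id for series key %s during deletion\n", key)
	fmt.Printf("with %%q: unable to find id for series key %q during deletion\n", key)
}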
View File

@@ -14,7 +14,6 @@ import (
 	"path/filepath"
 	"reflect"
 	"runtime"
-	"sort"
 	"strings"
 	"sync"
 	"testing"
@@ -947,7 +946,7 @@ func TestIndex_SeriesIDSet(t *testing.T) {
 		engine.MustAddSeries("mem", map[string]string{"host": "z"})

 		// Collect series IDs.
-		var ids []uint64
+		seriesIDMap := map[string]uint64{}
 		var e tsdb.SeriesIDElem
 		var err error
@@ -958,10 +957,13 @@ func TestIndex_SeriesIDSet(t *testing.T) {
 			} else if e.SeriesID == 0 {
 				break
 			}
-			ids = append(ids, e.SeriesID)
+
+			name, tags := tsdb.ParseSeriesKey(engine.sfile.SeriesKey(e.SeriesID))
+			key := fmt.Sprintf("%s%s", name, tags.HashKey())
+			seriesIDMap[key] = e.SeriesID
 		}

-		for _, id := range ids {
+		for _, id := range seriesIDMap {
 			if !engine.SeriesIDSet().Contains(id) {
 				return fmt.Errorf("bitmap does not contain ID: %d", id)
 			}
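For context (not part of this change): the map keys are the measurement name followed by the tag set's hash key, so entries look like "mem,host=z" and "gpu,host=b", matching the lookups later in the test. A simplified, self-contained sketch of that key construction, where hashKey is a hypothetical stand-in for models.Tags.HashKey:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// hashKey renders tags as ",k=v" pairs in sorted key order, approximating
// what models.Tags.HashKey produces (escaping omitted).
func hashKey(tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var sb strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&sb, ",%s=%s", k, tags[k])
	}
	return sb.String()
}

func main() {
	seriesIDMap := map[string]uint64{}
	key := fmt.Sprintf("%s%s", "mem", hashKey(map[string]string{"host": "z"}))
	seriesIDMap[key] = 3
	fmt.Println(seriesIDMap) // map[mem,host=z:3]
}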
@@ -973,24 +975,29 @@ func TestIndex_SeriesIDSet(t *testing.T) {
 			return err
 		}

+		if engine.SeriesIDSet().Contains(seriesIDMap["gpu"]) {
+			return fmt.Errorf("bitmap still contains ID: %d for key %s, but should not", seriesIDMap["gpu"], "gpu")
+		} else if engine.SeriesIDSet().Contains(seriesIDMap["gpu,host=b"]) {
+			return fmt.Errorf("bitmap still contains ID: %d for key %s, but should not", seriesIDMap["gpu,host=b"], "gpu,host=b")
+		}
+		delete(seriesIDMap, "gpu")
+		delete(seriesIDMap, "gpu,host=b")
+
 		// Drop the specific mem series
 		ditr := &seriesIterator{keys: [][]byte{[]byte("mem,host=z")}}
-		if err := engine.DeleteSeriesRange(ditr, math.MinInt64, math.MaxInt64, true); err != nil {
+		if err := engine.DeleteSeriesRange(ditr, math.MinInt64, math.MaxInt64); err != nil {
 			return err
 		}

-		// Since series IDs are added sequentially, the last two would be the
-		// series for the gpu measurement...
-		for _, id := range ids {
-			contains := engine.SeriesIDSet().Contains(id)
-			key, _ := engine.sfile.Series(id)
-			isGpu := bytes.Equal(key, []byte("gpu"))
-			isMem := bytes.Equal(key, []byte("mem"))
-			if (isGpu || isMem) && contains {
-				return fmt.Errorf("bitmap still contains ID: %d after delete: %s", id, string(key))
-			} else if !(isGpu || isMem) && !contains {
-				return fmt.Errorf("bitmap does not contain ID: %d, but should: %s", id, string(key))
+		if engine.SeriesIDSet().Contains(seriesIDMap["mem,host=z"]) {
+			return fmt.Errorf("bitmap still contains ID: %d for key %s, but should not", seriesIDMap["mem,host=z"], "mem,host=z")
+		}
+		delete(seriesIDMap, "mem,host=z")
+
+		// The rest of the keys should still be in the set.
+		for key, id := range seriesIDMap {
+			if !engine.SeriesIDSet().Contains(id) {
+				return fmt.Errorf("bitmap does not contain ID: %d for key %s, but should", id, key)
+			}
 		}
 	}
@@ -999,17 +1006,14 @@ func TestIndex_SeriesIDSet(t *testing.T) {
 			panic(err)
 		}

-		for _, id := range ids {
-			contains := engine.SeriesIDSet().Contains(id)
-			key, _ := engine.sfile.Series(id)
-			isGpu := bytes.Equal(key, []byte("gpu"))
-			isMem := bytes.Equal(key, []byte("mem"))
-			if (isGpu || isMem) && contains {
-				return fmt.Errorf("[after re-open] bitmap still contains ID: %d after delete: %s", id, string(key))
-			} else if !(isGpu || isMem) && !contains {
-				return fmt.Errorf("[after re-open] bitmap does not contain ID: %d, but should: %s", id, string(key))
-			}
+		// Check bitset is expected.
+		expected := tsdb.NewSeriesIDSet()
+		for _, id := range seriesIDMap {
+			expected.Add(id)
+		}
+
+		if !engine.SeriesIDSet().Equals(expected) {
+			return fmt.Errorf("got bitset %s, expected %s", engine.SeriesIDSet().String(), expected.String())
 		}
 		return nil
 	}
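For context (not part of this change): after the reopen, the assertion switches from per-ID membership checks to building the full expected set and comparing it in one step. A simplified stand-in sketch using plain maps instead of tsdb.SeriesIDSet:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Hypothetical remaining series after the deletes above.
	seriesIDMap := map[string]uint64{"cpu,host=a": 1, "cpu,host=b": 2, "mem,host=y": 5}
	actual := map[uint64]struct{}{1: {}, 2: {}, 5: {}}

	// Build the expected set from the surviving IDs and compare once.
	expected := map[uint64]struct{}{}
	for _, id := range seriesIDMap {
		expected[id] = struct{}{}
	}
	if !reflect.DeepEqual(actual, expected) {
		fmt.Printf("got set %v, expected %v\n", actual, expected)
		return
	}
	fmt.Println("sets match")
}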
@@ -1175,7 +1179,6 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
 	if got, exp := tags, models.NewTags(map[string]string{"host": "B"}); !got.Equal(exp) {
 		t.Fatalf("series mismatch: got %s, exp %s", got, exp)
 	}
-	sort.Strings(gotKeys)

 	iter.Close()
@@ -1789,12 +1792,11 @@ func MustInitDefaultBenchmarkEngine(pointN int) *Engine {
 // Engine is a test wrapper for tsm1.Engine.
 type Engine struct {
 	*tsm1.Engine
-	root        string
-	indexPath   string
-	indexType   string
-	index       tsdb.Index
-	seriesIDSet *tsdb.SeriesIDSet
-	sfile       *tsdb.SeriesFile
+	root      string
+	indexPath string
+	indexType string
+	index     tsdb.Index
+	sfile     *tsdb.SeriesFile
 }

 // NewEngine returns a new instance of Engine at a temporary location.
@@ -1834,13 +1836,12 @@ func NewEngine(index string) (*Engine, error) {
 	tsm1Engine := tsm1.NewEngine(1, idx, db, filepath.Join(root, "data"), filepath.Join(root, "wal"), sfile, opt).(*tsm1.Engine)

 	return &Engine{
-		Engine:      tsm1Engine,
-		root:        root,
-		indexPath:   idxPath,
-		indexType:   index,
-		index:       idx,
-		seriesIDSet: seriesIDs,
-		sfile:       sfile,
+		Engine:    tsm1Engine,
+		root:      root,
+		indexPath: idxPath,
+		indexType: index,
+		index:     idx,
+		sfile:     sfile,
 	}, nil
 }
@@ -1896,9 +1897,12 @@ func (e *Engine) Reopen() error {
 	opt := tsdb.NewEngineOptions()
 	opt.InmemIndex = inmem.NewIndex(db, e.sfile)

+	// Re-initialise the series id set
+	seriesIDSet := tsdb.NewSeriesIDSet()
+	opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDSet})
+
 	// Re-open index.
-	e.seriesIDSet = tsdb.NewSeriesIDSet()
-	e.index = tsdb.MustOpenIndex(1, db, e.indexPath, e.seriesIDSet, e.sfile, opt)
+	e.index = tsdb.MustOpenIndex(1, db, e.indexPath, seriesIDSet, e.sfile, opt)

 	// Re-initialize engine.
 	e.Engine = tsm1.NewEngine(1, e.index, db, filepath.Join(e.root, "data"), filepath.Join(e.root, "wal"), e.sfile, opt).(*tsm1.Engine)

View File

@@ -143,6 +143,7 @@ func (i *Index) SeriesIDSet() *tsdb.SeriesIDSet {
 	seriesIDSet := tsdb.NewSeriesIDSet()
 	others := make([]*tsdb.SeriesIDSet, 0, i.PartitionN)
 	for _, p := range i.partitions {
+		fmt.Printf("Partition %s: bitset to merge is %v\n", p.id, p.seriesSet)
 		others = append(others, p.seriesSet)
 	}
 	seriesIDSet.Merge(others...)
@@ -480,7 +481,7 @@ func (i *Index) CreateSeriesListIfNotExists(_ [][]byte, names [][]byte, tagsSlic
 	// Determine partition for series using each series key.
 	buf := make([]byte, 2048)
-	for k, _ := range names {
+	for k := range names {
 		buf = tsdb.AppendSeriesKey(buf[:0], names[k], tagsSlice[k])
 		pidx := i.partitionIdx(buf)

View File

@@ -611,6 +611,9 @@ func (i *Partition) DropSeries(key []byte, ts int64) error {
 	name, tags := models.ParseKeyBytes(key)
 	seriesID := i.sfile.SeriesID(name, tags, nil)
+	if seriesID == 0 {
+		return fmt.Errorf("[partition %s] no series id for key %q when attempting index drop", i.id, string(key))
+	}

 	// Remove from series id set.
 	i.seriesSet.Remove(seriesID)

View File

@@ -72,6 +72,19 @@ func (s *SeriesIDSet) Merge(others ...*SeriesIDSet) {
 	s.bitmap = roaring.FastOr(bms...)
 }

+// Equals returns true if other and s are the same set of ids.
+func (s *SeriesIDSet) Equals(other *SeriesIDSet) bool {
+	if s == other {
+		return true
+	}
+
+	s.RLock()
+	defer s.RUnlock()
+	other.RLock()
+	defer other.RUnlock()
+	return s.bitmap.Equals(other.bitmap)
+}
+
 // AndNot returns the set of elements that only exist in s.
 func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet {
 	s.RLock()
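For context (not part of this change): the Equals method above takes read locks on both sets and short-circuits on pointer equality, which also avoids locking the same mutex twice when a set is compared with itself. A standalone sketch of the same pattern, using a hypothetical map-backed idSet in place of the roaring bitmap:

package main

import (
	"fmt"
	"reflect"
	"sync"
)

type idSet struct {
	sync.RWMutex
	ids map[uint64]struct{}
}

// Equals reports whether both sets hold the same IDs, locking both for reading.
func (s *idSet) Equals(other *idSet) bool {
	if s == other {
		return true // same set; also avoids double-locking the same mutex
	}
	s.RLock()
	defer s.RUnlock()
	other.RLock()
	defer other.RUnlock()
	return reflect.DeepEqual(s.ids, other.ids)
}

func main() {
	a := &idSet{ids: map[uint64]struct{}{1: {}, 2: {}}}
	b := &idSet{ids: map[uint64]struct{}{1: {}, 2: {}}}
	fmt.Println(a.Equals(b)) // true
	fmt.Println(a.Equals(a)) // true, via the pointer-equality shortcut
}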