Merge pull request #1000 from influxdata/bj-fix-series-index-tombstone
fix(tsdb): Fix series file tombstoning.pull/10616/head
commit
4d58c7df8c
|
@ -169,6 +169,54 @@ func TestSeriesFile_Type(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure series file deletions persist across compactions.
|
||||
func TestSeriesFile_DeleteSeriesID(t *testing.T) {
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
||||
if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{
|
||||
Names: [][]byte{[]byte("m1")},
|
||||
Tags: []models.Tags{{}},
|
||||
Types: []models.FieldType{models.String},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{
|
||||
Names: [][]byte{[]byte("m2")},
|
||||
Tags: []models.Tags{{}},
|
||||
Types: []models.FieldType{models.String},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := sfile.ForceCompact(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
id := sfile.SeriesID([]byte("m1"), nil, nil)
|
||||
|
||||
// Delete and ensure deletion.
|
||||
if err := sfile.DeleteSeriesID(id); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{
|
||||
Names: [][]byte{[]byte("m1")},
|
||||
Tags: []models.Tags{{}},
|
||||
Types: []models.FieldType{models.String},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !sfile.IsDeleted(id) {
|
||||
t.Fatal("expected deletion before compaction")
|
||||
}
|
||||
|
||||
if err := sfile.ForceCompact(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !sfile.IsDeleted(id) {
|
||||
t.Fatal("expected deletion after compaction")
|
||||
}
|
||||
|
||||
if err := sfile.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !sfile.IsDeleted(id) {
|
||||
t.Fatal("expected deletion after reopen")
|
||||
}
|
||||
}
|
||||
|
||||
// Series represents name/tagset pairs that are used in testing.
|
||||
type Series struct {
|
||||
Name []byte
|
||||
|
@ -206,3 +254,22 @@ func (f *SeriesFile) Close() error {
|
|||
defer os.RemoveAll(f.Path())
|
||||
return f.SeriesFile.Close()
|
||||
}
|
||||
|
||||
// Reopen close & reopens the series file.
|
||||
func (f *SeriesFile) Reopen() error {
|
||||
if err := f.SeriesFile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
f.SeriesFile = tsdb.NewSeriesFile(f.SeriesFile.Path())
|
||||
return f.SeriesFile.Open()
|
||||
}
|
||||
|
||||
// ForceCompact executes an immediate compaction across all partitions.
|
||||
func (f *SeriesFile) ForceCompact() error {
|
||||
for _, p := range f.Partitions() {
|
||||
if err := tsdb.NewSeriesPartitionCompactor().Compact(p); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -156,8 +156,10 @@ func (idx *SeriesIndex) Delete(id SeriesID) {
|
|||
|
||||
// IsDeleted returns true if series id has been deleted.
|
||||
func (idx *SeriesIndex) IsDeleted(id SeriesID) bool {
|
||||
_, ok := idx.tombstones[id]
|
||||
return ok
|
||||
if _, ok := idx.tombstones[id]; ok {
|
||||
return true
|
||||
}
|
||||
return idx.FindOffsetByID(id) == 0
|
||||
}
|
||||
|
||||
func (idx *SeriesIndex) execEntry(flag uint8, id SeriesIDTyped, offset int64, key []byte) {
|
||||
|
@ -263,6 +265,11 @@ func (idx *SeriesIndex) Clone() *SeriesIndex {
|
|||
tombstones[id] = struct{}{}
|
||||
}
|
||||
|
||||
idOffsetMap := make(map[SeriesID]int64)
|
||||
for k, v := range idx.idOffsetMap {
|
||||
idOffsetMap[k] = v
|
||||
}
|
||||
|
||||
return &SeriesIndex{
|
||||
path: idx.path,
|
||||
count: idx.count,
|
||||
|
@ -274,6 +281,7 @@ func (idx *SeriesIndex) Clone() *SeriesIndex {
|
|||
keyIDData: idx.keyIDData,
|
||||
idOffsetData: idx.idOffsetData,
|
||||
tombstones: tombstones,
|
||||
idOffsetMap: idOffsetMap,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -339,6 +339,13 @@ func (p *SeriesPartition) DeleteSeriesID(id SeriesID) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Flush active segment write.
|
||||
if segment := p.activeSegment(); segment != nil {
|
||||
if err := segment.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Mark tombstone in memory.
|
||||
p.index.Delete(id)
|
||||
|
||||
|
@ -580,7 +587,7 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui
|
|||
|
||||
if err := segment.ForEachEntry(func(flag uint8, id SeriesIDTyped, offset int64, key []byte) error {
|
||||
// Make sure we don't go past the offset where the compaction began.
|
||||
if offset >= index.maxOffset {
|
||||
if offset > index.maxOffset {
|
||||
return errDone
|
||||
}
|
||||
|
||||
|
@ -604,14 +611,14 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui
|
|||
|
||||
untypedID := id.SeriesID()
|
||||
|
||||
// Save max series identifier processed.
|
||||
hdr.MaxSeriesID, hdr.MaxOffset = untypedID, offset
|
||||
|
||||
// Ignore entry if tombstoned.
|
||||
if index.IsDeleted(untypedID) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Save max series identifier processed.
|
||||
hdr.MaxSeriesID, hdr.MaxOffset = untypedID, offset
|
||||
|
||||
// Insert into maps.
|
||||
c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, untypedID, offset)
|
||||
return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
|
||||
|
|
Loading…
Reference in New Issue