fix(tsdb): Fix series file tombstoning.
This commit fixes an issue with the series file compaction process where tombstones are lost after compaction and series existence checks are not correct. This commit also fixes some smaller flushing issues within the series file that mainly related to testing.pull/10616/head
parent
cf3c70a1a0
commit
3ce51e7f02
|
@ -169,6 +169,54 @@ func TestSeriesFile_Type(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure series file deletions persist across compactions.
|
||||
func TestSeriesFile_DeleteSeriesID(t *testing.T) {
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
||||
if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{
|
||||
Names: [][]byte{[]byte("m1")},
|
||||
Tags: []models.Tags{{}},
|
||||
Types: []models.FieldType{models.String},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{
|
||||
Names: [][]byte{[]byte("m2")},
|
||||
Tags: []models.Tags{{}},
|
||||
Types: []models.FieldType{models.String},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := sfile.ForceCompact(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
id := sfile.SeriesID([]byte("m1"), nil, nil)
|
||||
|
||||
// Delete and ensure deletion.
|
||||
if err := sfile.DeleteSeriesID(id); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{
|
||||
Names: [][]byte{[]byte("m1")},
|
||||
Tags: []models.Tags{{}},
|
||||
Types: []models.FieldType{models.String},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !sfile.IsDeleted(id) {
|
||||
t.Fatal("expected deletion before compaction")
|
||||
}
|
||||
|
||||
if err := sfile.ForceCompact(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !sfile.IsDeleted(id) {
|
||||
t.Fatal("expected deletion after compaction")
|
||||
}
|
||||
|
||||
if err := sfile.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !sfile.IsDeleted(id) {
|
||||
t.Fatal("expected deletion after reopen")
|
||||
}
|
||||
}
|
||||
|
||||
// Series represents name/tagset pairs that are used in testing.
|
||||
type Series struct {
|
||||
Name []byte
|
||||
|
@ -206,3 +254,22 @@ func (f *SeriesFile) Close() error {
|
|||
defer os.RemoveAll(f.Path())
|
||||
return f.SeriesFile.Close()
|
||||
}
|
||||
|
||||
// Reopen close & reopens the series file.
|
||||
func (f *SeriesFile) Reopen() error {
|
||||
if err := f.SeriesFile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
f.SeriesFile = tsdb.NewSeriesFile(f.SeriesFile.Path())
|
||||
return f.SeriesFile.Open()
|
||||
}
|
||||
|
||||
// ForceCompact executes an immediate compaction across all partitions.
|
||||
func (f *SeriesFile) ForceCompact() error {
|
||||
for _, p := range f.Partitions() {
|
||||
if err := tsdb.NewSeriesPartitionCompactor().Compact(p); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -156,8 +156,10 @@ func (idx *SeriesIndex) Delete(id SeriesID) {
|
|||
|
||||
// IsDeleted returns true if series id has been deleted.
|
||||
func (idx *SeriesIndex) IsDeleted(id SeriesID) bool {
|
||||
_, ok := idx.tombstones[id]
|
||||
return ok
|
||||
if _, ok := idx.tombstones[id]; ok {
|
||||
return true
|
||||
}
|
||||
return idx.FindOffsetByID(id) == 0
|
||||
}
|
||||
|
||||
func (idx *SeriesIndex) execEntry(flag uint8, id SeriesIDTyped, offset int64, key []byte) {
|
||||
|
@ -263,6 +265,11 @@ func (idx *SeriesIndex) Clone() *SeriesIndex {
|
|||
tombstones[id] = struct{}{}
|
||||
}
|
||||
|
||||
idOffsetMap := make(map[SeriesID]int64)
|
||||
for k, v := range idx.idOffsetMap {
|
||||
idOffsetMap[k] = v
|
||||
}
|
||||
|
||||
return &SeriesIndex{
|
||||
path: idx.path,
|
||||
count: idx.count,
|
||||
|
@ -274,6 +281,7 @@ func (idx *SeriesIndex) Clone() *SeriesIndex {
|
|||
keyIDData: idx.keyIDData,
|
||||
idOffsetData: idx.idOffsetData,
|
||||
tombstones: tombstones,
|
||||
idOffsetMap: idOffsetMap,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -339,6 +339,13 @@ func (p *SeriesPartition) DeleteSeriesID(id SeriesID) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Flush active segment write.
|
||||
if segment := p.activeSegment(); segment != nil {
|
||||
if err := segment.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Mark tombstone in memory.
|
||||
p.index.Delete(id)
|
||||
|
||||
|
@ -580,7 +587,7 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui
|
|||
|
||||
if err := segment.ForEachEntry(func(flag uint8, id SeriesIDTyped, offset int64, key []byte) error {
|
||||
// Make sure we don't go past the offset where the compaction began.
|
||||
if offset >= index.maxOffset {
|
||||
if offset > index.maxOffset {
|
||||
return errDone
|
||||
}
|
||||
|
||||
|
@ -604,14 +611,14 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui
|
|||
|
||||
untypedID := id.SeriesID()
|
||||
|
||||
// Save max series identifier processed.
|
||||
hdr.MaxSeriesID, hdr.MaxOffset = untypedID, offset
|
||||
|
||||
// Ignore entry if tombstoned.
|
||||
if index.IsDeleted(untypedID) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Save max series identifier processed.
|
||||
hdr.MaxSeriesID, hdr.MaxOffset = untypedID, offset
|
||||
|
||||
// Insert into maps.
|
||||
c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, untypedID, offset)
|
||||
return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
|
||||
|
|
Loading…
Reference in New Issue