Merge pull request #10339 from influxdata/bj-fix-series-index-tombstone

Fix series file tombstoning.

commit 1580f90be4 (pull/10341/head)
cmd/influx_inspect/verify/seriesfile/verify.go

@@ -356,7 +356,6 @@ func (v Verify) VerifyIndex(indexPath string, segments []*tsdb.SeriesSegment,
 		// check both that the offset is right and that we get the right
 		// id for the key
-
 		if gotOffset := index.FindOffsetByID(id); gotOffset != expectedOffset {
 			v.Logger.Error("Index inconsistency",
 				zap.Uint64("id", id),
cmd/influx_inspect/verify/seriesfile/verify_test.go

@@ -12,13 +12,18 @@ import (
 	"github.com/influxdata/influxdb/cmd/influx_inspect/verify/seriesfile"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/tsdb"
+	"go.uber.org/zap"
 )
 
 func TestVerifies_Valid(t *testing.T) {
 	test := NewTest(t)
 	defer test.Close()
 
-	passed, err := seriesfile.NewVerify().VerifySeriesFile(test.Path)
+	verify := seriesfile.NewVerify()
+	if testing.Verbose() {
+		verify.Logger, _ = zap.NewDevelopment()
+	}
+	passed, err := verify.VerifySeriesFile(test.Path)
 	test.AssertNoError(err)
 	test.Assert(passed)
 }
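The test now opts into zap's human-readable development logger when run with `go test -v`. The same pattern can be factored into a helper for any test that takes an optional *zap.Logger; a minimal sketch (the helper name is ours, not the PR's):

package seriesfile_test

import (
	"testing"

	"go.uber.org/zap"
)

// testLogger mirrors the verify_test.go change: silent by default,
// a human-readable development logger under `go test -v`.
func testLogger(t *testing.T) *zap.Logger {
	t.Helper()
	if !testing.Verbose() {
		return zap.NewNop()
	}
	logger, err := zap.NewDevelopment()
	if err != nil {
		t.Fatal(err)
	}
	return logger
}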
tsdb/series_file_test.go

@@ -124,6 +124,42 @@ func TestSeriesFileCompactor(t *testing.T) {
 	}
 }
 
+// Ensure series file deletions persist across compactions.
+func TestSeriesFile_DeleteSeriesID(t *testing.T) {
+	sfile := MustOpenSeriesFile()
+	defer sfile.Close()
+
+	ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil})
+	if err != nil {
+		t.Fatal(err)
+	} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil}); err != nil {
+		t.Fatal(err)
+	} else if err := sfile.ForceCompact(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Delete and ensure deletion.
+	if err := sfile.DeleteSeriesID(ids0[0]); err != nil {
+		t.Fatal(err)
+	} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}); err != nil {
+		t.Fatal(err)
+	} else if !sfile.IsDeleted(ids0[0]) {
+		t.Fatal("expected deletion before compaction")
+	}
+
+	if err := sfile.ForceCompact(); err != nil {
+		t.Fatal(err)
+	} else if !sfile.IsDeleted(ids0[0]) {
+		t.Fatal("expected deletion after compaction")
+	}
+
+	if err := sfile.Reopen(); err != nil {
+		t.Fatal(err)
+	} else if !sfile.IsDeleted(ids0[0]) {
+		t.Fatal("expected deletion after reopen")
+	}
+}
+
 // Series represents name/tagset pairs that are used in testing.
 type Series struct {
 	Name []byte
@@ -160,3 +196,22 @@ func (f *SeriesFile) Close() error {
 	defer os.RemoveAll(f.Path())
 	return f.SeriesFile.Close()
 }
+
+// Reopen closes & reopens the series file.
+func (f *SeriesFile) Reopen() error {
+	if err := f.SeriesFile.Close(); err != nil {
+		return err
+	}
+	f.SeriesFile = tsdb.NewSeriesFile(f.SeriesFile.Path())
+	return f.SeriesFile.Open()
+}
+
+// ForceCompact executes an immediate compaction across all partitions.
+func (f *SeriesFile) ForceCompact() error {
+	for _, p := range f.Partitions() {
+		if err := tsdb.NewSeriesPartitionCompactor().Compact(p); err != nil {
+			return err
+		}
+	}
+	return nil
+}
tsdb/series_index.go

@@ -155,8 +155,10 @@ func (idx *SeriesIndex) Delete(id uint64) {
 
 // IsDeleted returns true if series id has been deleted.
 func (idx *SeriesIndex) IsDeleted(id uint64) bool {
-	_, ok := idx.tombstones[id]
-	return ok
+	if _, ok := idx.tombstones[id]; ok {
+		return true
+	}
+	return idx.FindOffsetByID(id) == 0
 }
 
 func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byte) {
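This is the core of the fix: once a compaction rewrites the index, the in-memory tombstones map starts over empty, so IsDeleted must also treat a missing offset as a deletion. A stripped-down sketch of the two-step check (types and names are illustrative, not the real tsdb ones):

package main

import "fmt"

// index is a stand-in for tsdb.SeriesIndex: tombstones holds deletions not
// yet compacted, offsets plays the role of the on-disk id-to-offset map
// that each compaction rebuilds.
type index struct {
	tombstones map[uint64]struct{}
	offsets    map[uint64]int64
}

// isDeleted mirrors the patched SeriesIndex.IsDeleted: a series is deleted
// if it is tombstoned in memory OR the compacted index no longer has an
// offset for it (FindOffsetByID returning 0 in the real code).
func (idx *index) isDeleted(id uint64) bool {
	if _, ok := idx.tombstones[id]; ok {
		return true
	}
	return idx.offsets[id] == 0
}

func main() {
	idx := &index{
		tombstones: map[uint64]struct{}{1: {}}, // series 1 deleted, pre-compaction
		offsets:    map[uint64]int64{1: 24},
	}
	fmt.Println(idx.isDeleted(1)) // true, via the tombstone map

	// A compaction drops tombstoned IDs from the rewritten index and the
	// in-memory tombstone set starts over empty.
	idx = &index{tombstones: map[uint64]struct{}{}, offsets: map[uint64]int64{}}
	fmt.Println(idx.isDeleted(1)) // still true, now via the missing offset
}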
@@ -261,6 +263,11 @@ func (idx *SeriesIndex) Clone() *SeriesIndex {
 		tombstones[id] = struct{}{}
 	}
 
+	idOffsetMap := make(map[uint64]int64)
+	for k, v := range idx.idOffsetMap {
+		idOffsetMap[k] = v
+	}
+
 	return &SeriesIndex{
 		path:  idx.path,
 		count: idx.count,
@@ -272,6 +279,7 @@ func (idx *SeriesIndex) Clone() *SeriesIndex {
 		keyIDData:    idx.keyIDData,
 		idOffsetData: idx.idOffsetData,
 		tombstones:   tombstones,
+		idOffsetMap:  idOffsetMap,
 	}
 }
 
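Clone previously left idOffsetMap out of the returned struct, so a cloned index lost every not-yet-compacted id-to-offset entry. Go maps are reference types, so the fix copies the pairs explicitly rather than sharing the field; a self-contained sketch of the same copy:

package main

import "fmt"

// cloneOffsets mirrors the Clone() fix: key/value pairs are copied into a
// fresh map so the original and the clone cannot mutate shared state.
func cloneOffsets(src map[uint64]int64) map[uint64]int64 {
	dst := make(map[uint64]int64, len(src))
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

func main() {
	orig := map[uint64]int64{7: 128}
	clone := cloneOffsets(orig)
	clone[7] = 256
	fmt.Println(orig[7], clone[7]) // 128 256 -- independent maps
}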
tsdb/series_partition.go

@@ -309,6 +309,13 @@ func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
 		return err
 	}
 
+	// Flush active segment write.
+	if segment := p.activeSegment(); segment != nil {
+		if err := segment.Flush(); err != nil {
+			return err
+		}
+	}
+
 	// Mark tombstone in memory.
 	p.index.Delete(id)
 
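The new Flush() call makes the tombstone log entry durable before the in-memory index is updated, so a compaction scanning the segments never lags behind what the index claims is deleted. A toy sketch of that write-flush-then-mark ordering, assuming nothing about the real segment format:

package main

import (
	"bufio"
	"fmt"
	"os"
)

// deleteSeriesID sketches the ordering DeleteSeriesID now enforces (types
// simplified; the real code appends a tombstone entry to the active
// SeriesSegment and then calls segment.Flush()).
func deleteSeriesID(w *bufio.Writer, tombstones map[uint64]struct{}, id uint64) error {
	// 1. Append the tombstone entry to the segment log.
	if _, err := fmt.Fprintf(w, "tombstone %d\n", id); err != nil {
		return err
	}
	// 2. Flush the active segment so the entry is durable -- the new
	//    segment.Flush() call in the diff.
	if err := w.Flush(); err != nil {
		return err
	}
	// 3. Only then mark the tombstone in memory (p.index.Delete(id)).
	tombstones[id] = struct{}{}
	return nil
}

func main() {
	w := bufio.NewWriter(os.Stdout)
	tombstones := map[uint64]struct{}{}
	if err := deleteSeriesID(w, tombstones, 42); err != nil {
		panic(err)
	}
}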
@@ -549,8 +556,9 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui
 	errDone := errors.New("done")
 
 	if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
 
-		if offset >= index.maxOffset {
+		// Make sure we don't go past the offset where the compaction began.
+		if offset > index.maxOffset {
 			return errDone
 		}
 
@@ -572,14 +580,14 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui
 			return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
 		}
 
+		// Save max series identifier processed.
+		hdr.MaxSeriesID, hdr.MaxOffset = id, offset
+
 		// Ignore entry if tombstoned.
 		if index.IsDeleted(id) {
 			return nil
 		}
 
-		// Save max series identifier processed.
-		hdr.MaxSeriesID, hdr.MaxOffset = id, offset
-
 		// Insert into maps.
 		c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
 		return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
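Moving the hdr.MaxSeriesID/hdr.MaxOffset bookkeeping above the tombstone check means the compaction header's high-water marks advance for tombstoned entries too; before, a tombstone at the tail of the log never advanced the marks, so the rebuilt header under-reported what had been processed and the tail was revisited on the next open (the reopen assertion in the new test guards exactly this). A toy loop contrasting the two orderings (names are illustrative):

package main

import "fmt"

type entry struct {
	id      uint64
	offset  int64
	deleted bool
}

// maxSeen shows why the bookkeeping was moved: high-water marks must
// advance for every entry scanned, including tombstoned ones.
func maxSeen(entries []entry, skipDeletedFirst bool) (maxID uint64, maxOffset int64) {
	for _, e := range entries {
		if skipDeletedFirst && e.deleted {
			continue // old order: tombstoned entries never update the marks
		}
		maxID, maxOffset = e.id, e.offset
		if !skipDeletedFirst && e.deleted {
			continue // new order: skip the map inserts, keep the marks
		}
		// ... insert into the key/ID and ID/offset maps here ...
	}
	return maxID, maxOffset
}

func main() {
	entries := []entry{{1, 100, false}, {2, 200, true}} // tombstone at the tail
	fmt.Println(maxSeen(entries, true))  // 1 100 -- stale marks (the bug)
	fmt.Println(maxSeen(entries, false)) // 2 200 -- marks track every entry
}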