diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go index 7ace89add1..25e575ce20 100644 --- a/tsdb/series_file_test.go +++ b/tsdb/series_file_test.go @@ -169,6 +169,54 @@ func TestSeriesFile_Type(t *testing.T) { } } +// Ensure series file deletions persist across compactions. +func TestSeriesFile_DeleteSeriesID(t *testing.T) { + sfile := MustOpenSeriesFile() + defer sfile.Close() + + if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{ + Names: [][]byte{[]byte("m1")}, + Tags: []models.Tags{{}}, + Types: []models.FieldType{models.String}, + }); err != nil { + t.Fatal(err) + } else if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{ + Names: [][]byte{[]byte("m2")}, + Tags: []models.Tags{{}}, + Types: []models.FieldType{models.String}, + }); err != nil { + t.Fatal(err) + } else if err := sfile.ForceCompact(); err != nil { + t.Fatal(err) + } + id := sfile.SeriesID([]byte("m1"), nil, nil) + + // Delete and ensure deletion. + if err := sfile.DeleteSeriesID(id); err != nil { + t.Fatal(err) + } else if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{ + Names: [][]byte{[]byte("m1")}, + Tags: []models.Tags{{}}, + Types: []models.FieldType{models.String}, + }); err != nil { + t.Fatal(err) + } else if !sfile.IsDeleted(id) { + t.Fatal("expected deletion before compaction") + } + + if err := sfile.ForceCompact(); err != nil { + t.Fatal(err) + } else if !sfile.IsDeleted(id) { + t.Fatal("expected deletion after compaction") + } + + if err := sfile.Reopen(); err != nil { + t.Fatal(err) + } else if !sfile.IsDeleted(id) { + t.Fatal("expected deletion after reopen") + } +} + // Series represents name/tagset pairs that are used in testing. type Series struct { Name []byte @@ -206,3 +254,22 @@ func (f *SeriesFile) Close() error { defer os.RemoveAll(f.Path()) return f.SeriesFile.Close() } + +// Reopen closes & reopens the series file.
+func (f *SeriesFile) Reopen() error { + if err := f.SeriesFile.Close(); err != nil { + return err + } + f.SeriesFile = tsdb.NewSeriesFile(f.SeriesFile.Path()) + return f.SeriesFile.Open() +} + +// ForceCompact executes an immediate compaction across all partitions. +func (f *SeriesFile) ForceCompact() error { + for _, p := range f.Partitions() { + if err := tsdb.NewSeriesPartitionCompactor().Compact(p); err != nil { + return err + } + } + return nil +} diff --git a/tsdb/series_index.go b/tsdb/series_index.go index 1906c39854..1dab0ac095 100644 --- a/tsdb/series_index.go +++ b/tsdb/series_index.go @@ -156,8 +156,10 @@ func (idx *SeriesIndex) Delete(id SeriesID) { // IsDeleted returns true if series id has been deleted. func (idx *SeriesIndex) IsDeleted(id SeriesID) bool { - _, ok := idx.tombstones[id] - return ok + if _, ok := idx.tombstones[id]; ok { + return true + } + return idx.FindOffsetByID(id) == 0 } func (idx *SeriesIndex) execEntry(flag uint8, id SeriesIDTyped, offset int64, key []byte) { @@ -263,6 +265,11 @@ func (idx *SeriesIndex) Clone() *SeriesIndex { tombstones[id] = struct{}{} } + idOffsetMap := make(map[SeriesID]int64) + for k, v := range idx.idOffsetMap { + idOffsetMap[k] = v + } + return &SeriesIndex{ path: idx.path, count: idx.count, @@ -274,6 +281,7 @@ func (idx *SeriesIndex) Clone() *SeriesIndex { keyIDData: idx.keyIDData, idOffsetData: idx.idOffsetData, tombstones: tombstones, + idOffsetMap: idOffsetMap, } } diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index b6c926b688..aafd2e6242 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -339,6 +339,13 @@ func (p *SeriesPartition) DeleteSeriesID(id SeriesID) error { return err } + // Flush active segment write. + if segment := p.activeSegment(); segment != nil { + if err := segment.Flush(); err != nil { + return err + } + } + // Mark tombstone in memory. 
p.index.Delete(id) @@ -580,7 +587,7 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui if err := segment.ForEachEntry(func(flag uint8, id SeriesIDTyped, offset int64, key []byte) error { // Make sure we don't go past the offset where the compaction began. - if offset >= index.maxOffset { + if offset > index.maxOffset { return errDone } @@ -604,14 +611,14 @@ func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN ui untypedID := id.SeriesID() + // Save max series identifier processed. + hdr.MaxSeriesID, hdr.MaxOffset = untypedID, offset + // Ignore entry if tombstoned. if index.IsDeleted(untypedID) { return nil } - // Save max series identifier processed. - hdr.MaxSeriesID, hdr.MaxOffset = untypedID, offset - // Insert into maps. c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, untypedID, offset) return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)