fix(tsdb): Fix series file count

Previously the series file did not include tombstones in the total
count. This commit now includes tombstones in the count as well as
fixes an issue where replayed tombstone records could exist but
their underlying ID did not exist. This caused the count to become
negative and with the count being `uint64` it caused the count to
rollover to `math.Uint64Max`.
pull/13770/head
Ben Johnson 2019-05-03 09:55:31 -06:00
parent 2968741fef
commit a5ccf5ce9a
No known key found for this signature in database
GPG Key ID: 81741CD251883081
3 changed files with 60 additions and 8 deletions

View File

@ -131,6 +131,11 @@ func TestSeriesFileCompactor(t *testing.T) {
t.Fatalf("series does not exist: %s,%s", iter.Name(), iter.Tags().String())
}
}
// Verify total number of series is correct.
if got, exp := sfile.SeriesCount(), uint64(len(collection.Names)); got != exp {
t.Fatalf("SeriesCount()=%d, expected %d (after compaction)", got, exp)
}
}
// Ensures that types are tracked and checked by the series file.
@ -192,29 +197,64 @@ func TestSeriesFile_DeleteSeriesID(t *testing.T) {
}
id := sfile.SeriesID([]byte("m1"), nil, nil)
// Verify total number of series is correct.
if got, exp := sfile.SeriesCount(), uint64(2); got != exp {
t.Fatalf("SeriesCount()=%d, expected %d (before deleted)", got, exp)
}
// Delete and ensure deletion.
if err := sfile.DeleteSeriesID(id); err != nil {
t.Fatal(err)
} else if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{
Names: [][]byte{[]byte("m1")},
Tags: []models.Tags{{}},
Types: []models.FieldType{models.String},
}); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(id) {
t.Fatal("expected deletion before compaction")
}
// Verify total number of series is correct.
if got, exp := sfile.SeriesCount(), uint64(1); got != exp {
t.Fatalf("SeriesCount()=%d, expected %d (before compaction)", got, exp)
}
if err := sfile.ForceCompact(); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(id) {
t.Fatal("expected deletion after compaction")
} else if got, exp := sfile.SeriesCount(), uint64(1); got != exp {
t.Fatalf("SeriesCount()=%d, expected %d (after compaction)", got, exp)
}
if err := sfile.Reopen(); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(id) {
t.Fatal("expected deletion after reopen")
} else if got, exp := sfile.SeriesCount(), uint64(1); got != exp {
t.Fatalf("SeriesCount()=%d, expected %d (after reopen)", got, exp)
}
// Recreate series with new ID.
if err := sfile.CreateSeriesListIfNotExists(&tsdb.SeriesCollection{
Names: [][]byte{[]byte("m1")},
Tags: []models.Tags{{}},
Types: []models.FieldType{models.String},
}); err != nil {
t.Fatal(err)
} else if got, exp := sfile.SeriesCount(), uint64(2); got != exp {
t.Fatalf("SeriesCount()=%d, expected %d (after recreate)", got, exp)
}
if err := sfile.ForceCompact(); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(id) {
t.Fatal("expected deletion after compaction")
} else if got, exp := sfile.SeriesCount(), uint64(2); got != exp {
t.Fatalf("SeriesCount()=%d, expected %d (after recreate & compaction)", got, exp)
}
if err := sfile.Reopen(); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(id) {
t.Fatal("expected deletion after reopen")
} else if got, exp := sfile.SeriesCount(), uint64(2); got != exp {
t.Fatalf("SeriesCount()=%d, expected %d (after recreate & compaction)", got, exp)
}
}

View File

@ -165,7 +165,11 @@ func (idx *SeriesIndex) GrowBy(delta int) {
// Count returns the number of series in the index.
func (idx *SeriesIndex) Count() uint64 {
return idx.OnDiskCount() + idx.InMemCount()
n := int64(idx.OnDiskCount()+idx.InMemCount()) - int64(len(idx.tombstones))
if n < 0 {
n = 0
}
return uint64(n)
}
// OnDiskCount returns the number of series in the on-disk index.
@ -217,7 +221,11 @@ func (idx *SeriesIndex) execEntry(flag uint8, id SeriesIDTyped, offset int64, ke
}
case SeriesEntryTombstoneFlag:
// Only add to tombstone if it exists on disk or in-memory.
// This affects counts if a tombstone exists but the ID doesn't exist.
if idx.FindOffsetByID(untypedID) != 0 {
idx.tombstones[untypedID] = struct{}{}
}
default:
panic("unreachable")

View File

@ -55,6 +55,10 @@ func TestSeriesIndex_Delete(t *testing.T) {
} else if idx.IsDeleted(tsdb.NewSeriesID(2)) {
t.Fatal("expected series to exist")
}
if exp, got := idx.Count(), uint64(1); exp != got {
t.Fatalf("Count()=%d, expected %d", exp, got)
}
}
func TestSeriesIndex_FindIDBySeriesKey(t *testing.T) {