fix(influx_inspect): verify-seriesfile with deleted index entries

The series index looks at a set of tombstones when querying the id for
a given key, but it does not look when asking for the offset for some
id, even if that id is deleted.

Update the verify tooling to check that the index agrees with the
deleted status of the id, but skip doing the extra checks if the
id is deleted.
pull/13458/head
Jeff Wendling 2019-04-17 12:24:19 -06:00
parent 1d23b6ba6f
commit 6833c4d082
2 changed files with 25 additions and 13 deletions

View File

@ -353,26 +353,33 @@ func (v Verify) VerifyIndex(indexPath string, segments []*tsdb.SeriesSegment,
IDData := ids[id]
expectedOffset, expectedID := IDData.Offset, id
if IDData.Deleted {
expectedOffset, expectedID = 0, 0
}
// check both that the offset is right and that we get the right
// id for the key
if gotOffset := index.FindOffsetByID(id); gotOffset != expectedOffset {
if gotDeleted := index.IsDeleted(id); gotDeleted != IDData.Deleted {
v.Logger.Error("Index inconsistency",
zap.Uint64("id", id),
zap.Int64("got_offset", gotOffset),
zap.Int64("expected_offset", expectedOffset))
zap.Bool("got_deleted", gotDeleted),
zap.Bool("expected_deleted", IDData.Deleted))
return false, nil
}
if gotID := index.FindIDBySeriesKey(segments, IDData.Key); gotID != expectedID {
// do not perform any other checks if the id is deleted.
if IDData.Deleted {
continue
}
// otherwise, check both that the offset is right and that we get the right id for the key
if gotOffset := index.FindOffsetByID(id); gotOffset != IDData.Offset {
v.Logger.Error("Index inconsistency",
zap.Uint64("id", id),
zap.Int64("got_offset", gotOffset),
zap.Int64("expected_offset", IDData.Offset))
return false, nil
}
if gotID := index.FindIDBySeriesKey(segments, IDData.Key); gotID != id {
v.Logger.Error("Index inconsistency",
zap.Uint64("id", id),
zap.Uint64("got_id", gotID),
zap.Uint64("expected_id", expectedID))
zap.Uint64("expected_id", id))
return false, nil
}
}

View File

@ -102,11 +102,16 @@ func NewTest(t *testing.T) *Test {
tagsSlice = append(tagsSlice, nil)
}
_, err := seriesFile.CreateSeriesListIfNotExists(names, tagsSlice)
ids, err := seriesFile.CreateSeriesListIfNotExists(names, tagsSlice)
if err != nil {
return err
}
// delete one series
if err := seriesFile.DeleteSeriesID(ids[0]); err != nil {
return err
}
// wait for compaction to make sure we detect issues with the index
partitions := seriesFile.Partitions()
wait: