From c0a46d2d3d1c31f2873a8f8cc810b2b737c04667 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 5 Jan 2018 14:42:01 -0700 Subject: [PATCH] Fix series file compaction stall. The series file compaction previously did not snapshot the max offset before compacting and would keep compacting until it reached the end of the segment file. This caused more entries than expected to be inserted into the RHH map, and this map gets exponentially slower as it gets close to full. --- pkg/rhh/rhh.go | 12 +++--------- tsdb/series_file.go | 22 ++++++++++++++-------- tsdb/series_index.go | 9 +++++++++ 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pkg/rhh/rhh.go b/pkg/rhh/rhh.go index f4e7f42959..0530715900 100644 --- a/pkg/rhh/rhh.go +++ b/pkg/rhh/rhh.go @@ -244,15 +244,9 @@ func HashKey(key []byte) int64 { // HashUint64 computes a hash of an int64. Hash is always non-zero. func HashUint64(key uint64) int64 { - hash := xxhash.New() - binary.Write(hash, binary.BigEndian, key) - h := int64(hash.Sum64()) - if h == 0 { - h = 1 - } else if h < 0 { - h = 0 - h - } - return h + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, key) + return HashKey(buf) } // Dist returns the probe distance for a hash in a slot index. diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 1178b1476d..b1a8599fdd 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -666,7 +666,14 @@ func (c *SeriesFileCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, // Reindex all partitions. for _, segment := range segments { + errDone := errors.New("done") + if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error { + // Make sure we don't go past the offset where the compaction began. + if offset >= index.maxOffset { + return errDone + } + // Only process insert entries. 
switch flag { case SeriesEntryInsertFlag: // fallthrough @@ -681,13 +688,15 @@ func (c *SeriesFileCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, return nil } - // Save highest id/offset to header. + // Save max series identifier processed. hdr.MaxSeriesID, hdr.MaxOffset = id, offset // Insert into maps. c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset) return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id) - }); err != nil { + }); err == errDone { + break + } else if err != nil { return err } } @@ -730,7 +739,8 @@ func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, capacity int64, segment hash := rhh.HashKey(key) // Continue searching until we find an empty slot or lower probe distance. - for dist, pos := int64(0), hash&mask; ; dist, pos = dist+1, (pos+1)&mask { + for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask { + assert(i <= capacity, "key/id map full") elem := dst[(pos * SeriesIndexElemSize):] // If empty slot found or matching offset, insert and exit. @@ -749,7 +759,6 @@ func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, capacity int64, segment // If the existing elem has probed less than us, then swap places with // existing elem, and keep going to find another slot for that elem. if d := rhh.Dist(elemHash, pos, capacity); d < dist { - // Insert current values. binary.BigEndian.PutUint64(elem[:8], uint64(offset)) binary.BigEndian.PutUint64(elem[8:], id) @@ -769,6 +778,7 @@ func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id u // Continue searching until we find an empty slot or lower probe distance. for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask { + assert(i <= capacity, "id/offset map full") elem := dst[(pos * SeriesIndexElemSize):] // If empty slot found or matching id, insert and exit. 
@@ -796,10 +806,6 @@ func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id u // Update current distance. dist = d } - - if i > capacity { - panic("rhh map full") - } } } diff --git a/tsdb/series_index.go b/tsdb/series_index.go index 078d0f1776..dbcff6ecb4 100644 --- a/tsdb/series_index.go +++ b/tsdb/series_index.go @@ -165,6 +165,13 @@ func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byt idx.keyIDMap.Put(key, id) idx.idOffsetMap[id] = offset + if id > idx.maxSeriesID { + idx.maxSeriesID = id + } + if offset > idx.maxOffset { + idx.maxOffset = offset + } + case SeriesEntryTombstoneFlag: idx.tombstones[id] = struct{}{} @@ -255,6 +262,8 @@ func (idx *SeriesIndex) Clone() *SeriesIndex { count: idx.count, capacity: idx.capacity, mask: idx.mask, + maxSeriesID: idx.maxSeriesID, + maxOffset: idx.maxOffset, data: idx.data, keyIDData: idx.keyIDData, idOffsetData: idx.idOffsetData,