diff --git a/pkg/rhh/rhh.go b/pkg/rhh/rhh.go
index f4e7f42959..0530715900 100644
--- a/pkg/rhh/rhh.go
+++ b/pkg/rhh/rhh.go
@@ -244,15 +244,9 @@ func HashKey(key []byte) int64 {
 
 // HashUint64 computes a hash of an int64. Hash is always non-zero.
 func HashUint64(key uint64) int64 {
-	hash := xxhash.New()
-	binary.Write(hash, binary.BigEndian, key)
-	h := int64(hash.Sum64())
-	if h == 0 {
-		h = 1
-	} else if h < 0 {
-		h = 0 - h
-	}
-	return h
+	buf := make([]byte, 8)
+	binary.BigEndian.PutUint64(buf, key)
+	return HashKey(buf)
 }
 
 // Dist returns the probe distance for a hash in a slot index.
diff --git a/tsdb/series_file.go b/tsdb/series_file.go
index 1178b1476d..b1a8599fdd 100644
--- a/tsdb/series_file.go
+++ b/tsdb/series_file.go
@@ -666,7 +666,14 @@ func (c *SeriesFileCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64,
 
 	// Reindex all partitions.
 	for _, segment := range segments {
+		errDone := errors.New("done")
+
 		if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
+			// Make sure we don't go past the offset where the compaction began.
+			if offset >= index.maxOffset {
+				return errDone
+			}
+
 			// Only process insert entries.
 			switch flag {
 			case SeriesEntryInsertFlag: // fallthrough
@@ -681,13 +688,15 @@
 				return nil
 			}
 
-			// Save highest id/offset to header.
+			// Save max series identifier processed.
 			hdr.MaxSeriesID, hdr.MaxOffset = id, offset
 
 			// Insert into maps.
 			c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
 			return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
-		}); err != nil {
+		}); err == errDone {
+			break
+		} else if err != nil {
 			return err
 		}
 	}
@@ -730,7 +739,8 @@ func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, capacity int64, segment
 	hash := rhh.HashKey(key)
 
 	// Continue searching until we find an empty slot or lower probe distance.
-	for dist, pos := int64(0), hash&mask; ; dist, pos = dist+1, (pos+1)&mask {
+	for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
+		assert(i <= capacity, "key/id map full")
 		elem := dst[(pos * SeriesIndexElemSize):]
 
 		// If empty slot found or matching offset, insert and exit.
@@ -749,7 +759,6 @@
 		// If the existing elem has probed less than us, then swap places with
 		// existing elem, and keep going to find another slot for that elem.
 		if d := rhh.Dist(elemHash, pos, capacity); d < dist {
-			// Insert current values.
 			binary.BigEndian.PutUint64(elem[:8], uint64(offset))
 			binary.BigEndian.PutUint64(elem[8:], id)
 
@@ -769,6 +778,7 @@ func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id u
 
 	// Continue searching until we find an empty slot or lower probe distance.
 	for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
+		assert(i <= capacity, "id/offset map full")
 		elem := dst[(pos * SeriesIndexElemSize):]
 
 		// If empty slot found or matching id, insert and exit.
@@ -796,10 +806,6 @@ func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id u
 			// Update current distance.
 			dist = d
 		}
-
-		if i > capacity {
-			panic("rhh map full")
-		}
 	}
 }
 
diff --git a/tsdb/series_index.go b/tsdb/series_index.go
index 078d0f1776..dbcff6ecb4 100644
--- a/tsdb/series_index.go
+++ b/tsdb/series_index.go
@@ -165,6 +165,13 @@ func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byt
 		idx.keyIDMap.Put(key, id)
 		idx.idOffsetMap[id] = offset
 
+		if id > idx.maxSeriesID {
+			idx.maxSeriesID = id
+		}
+		if offset > idx.maxOffset {
+			idx.maxOffset = offset
+		}
+
 	case SeriesEntryTombstoneFlag:
 		idx.tombstones[id] = struct{}{}
 
@@ -255,6 +262,8 @@ func (idx *SeriesIndex) Clone() *SeriesIndex {
 		count:        idx.count,
 		capacity:     idx.capacity,
 		mask:         idx.mask,
+		maxSeriesID:  idx.maxSeriesID,
+		maxOffset:    idx.maxOffset,
 		data:         idx.data,
 		keyIDData:    idx.keyIDData,
 		idOffsetData: idx.idOffsetData,
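The probe loops that the new assert lines guard are standard Robin Hood hashing. As a reference for what insertKeyIDMap and insertIDOffsetMap are doing, here is a minimal, self-contained sketch of the same probe-and-swap loop, assuming a simplified in-memory table of uint64 keys and values rather than the on-disk byte layout; hashKey is a toy stand-in for rhh.HashKey (the real code hashes with xxhash), and elem/insert are hypothetical names, not the compactor's API.

package main

import "fmt"

type elem struct {
	key, val uint64
	used     bool
}

// hashKey is a toy hash; like the original it never returns zero or a
// negative value. The real implementation delegates to xxhash.
func hashKey(key uint64) int64 {
	h := int64(key*0x9E3779B97F4A7C15) & 0x7FFFFFFFFFFFFFFF
	if h == 0 {
		h = 1
	}
	return h
}

// dist mirrors rhh.Dist: how many slots past its ideal position an entry
// with this hash sits when stored at pos.
func dist(hash, pos, capacity int64) int64 {
	return (pos + capacity - (hash & (capacity - 1))) & (capacity - 1)
}

func insert(table []elem, capacity int64, key, val uint64) {
	mask := capacity - 1
	hash := hashKey(key)
	for i, d, pos := int64(0), int64(0), hash&mask; ; i, d, pos = i+1, d+1, (pos+1)&mask {
		if i > capacity {
			panic("map full") // the condition the diff's assert guards against
		}
		e := &table[pos]

		// Empty slot or matching key: write and exit.
		if !e.used || e.key == key {
			*e = elem{key: key, val: val, used: true}
			return
		}

		// Robin Hood step: if the resident entry has probed less than us,
		// it yields its slot and we carry it forward to find a new one.
		if rd := dist(hashKey(e.key), pos, capacity); rd < d {
			key, val, e.key, e.val = e.key, e.val, key, val
			d = rd
		}
	}
}

func main() {
	const capacity = 8 // a power of two, so mask = capacity-1 works
	table := make([]elem, capacity)
	for k := uint64(1); k <= 5; k++ {
		insert(table, capacity, k, k*100)
	}
	fmt.Printf("%+v\n", table)
}

The swap is what bounds probe distances: no entry ends up much farther from its ideal slot than the entries it passed, so a probe count exceeding capacity can only mean the table is full or corrupt, which is why the check moved to an assert at the top of the loop instead of the old end-of-iteration panic.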
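The errDone sentinel in compactIndexTo is also worth a note: returning a non-nil error is the only way to stop a callback iterator like ForEachEntry early, so a private sentinel error distinguishes "stop, all is well" from a real failure. A minimal sketch of the pattern, with a hypothetical walk helper standing in for segment.ForEachEntry:

package main

import (
	"errors"
	"fmt"
)

// errDone signals a deliberate early stop, not a failure.
var errDone = errors.New("done")

// walk is a hypothetical stand-in for an iterator like segment.ForEachEntry:
// it calls fn for each offset and propagates the first non-nil error.
func walk(offsets []int64, fn func(offset int64) error) error {
	for _, off := range offsets {
		if err := fn(off); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	maxOffset := int64(30)
	err := walk([]int64{10, 20, 30, 40}, func(offset int64) error {
		if offset >= maxOffset {
			return errDone // stop without reporting a failure
		}
		fmt.Println("processing offset", offset)
		return nil
	})
	if err == errDone {
		err = nil // the sentinel means success
	}
	if err != nil {
		panic(err)
	}
}

Comparing with == works because the sentinel is never wrapped between where it is returned and where it is checked; code that might see wrapped errors would use errors.Is instead.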