Merge pull request #9288 from influxdata/bj-fix-stalled-series-file-compaction
Fix series file compaction stall.
commit f2adaf68a7

@@ -244,15 +244,9 @@ func HashKey(key []byte) int64 {
 // HashUint64 computes a hash of an int64. Hash is always non-zero.
 func HashUint64(key uint64) int64 {
-	hash := xxhash.New()
-	binary.Write(hash, binary.BigEndian, key)
-	h := int64(hash.Sum64())
-	if h == 0 {
-		h = 1
-	} else if h < 0 {
-		h = 0 - h
-	}
-	return h
+	buf := make([]byte, 8)
+	binary.BigEndian.PutUint64(buf, key)
+	return HashKey(buf)
 }
 
 // Dist returns the probe distance for a hash in a slot index.
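Routing HashUint64 through HashKey drops a duplicated copy of the hashing and the sign/zero adjustment, so the integer and byte-slice entry points of the rhh package stay consistent by construction. A minimal, self-contained sketch of that shape (the lowercase hashKey/hashUint64 names, the import path, and the xxhash.Sum64 call are illustrative assumptions, not the actual rhh code):

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/cespare/xxhash"
)

// hashKey mirrors the shape of rhh.HashKey: an xxhash of the key, forced to
// be positive and non-zero so that zero can mark an empty slot.
func hashKey(key []byte) int64 {
	h := int64(xxhash.Sum64(key))
	if h == 0 {
		h = 1
	} else if h < 0 {
		h = 0 - h
	}
	return h
}

// hashUint64 re-encodes the id as big-endian bytes and defers to hashKey, so
// hashing an id and hashing its byte encoding can never disagree.
func hashUint64(key uint64) int64 {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, key)
	return hashKey(buf)
}

func main() {
	id := uint64(42)
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, id)
	fmt.Println(hashUint64(id) == hashKey(buf)) // true: both paths agree
}
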
@@ -666,7 +666,14 @@ func (c *SeriesFileCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64,
 	// Reindex all partitions.
 	for _, segment := range segments {
+		errDone := errors.New("done")
+
 		if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
+			// Make sure we don't go past the offset where the compaction began.
+			if offset >= index.maxOffset {
+				return errDone
+			}
+
 			// Only process insert entries.
 			switch flag {
 			case SeriesEntryInsertFlag: // fallthrough
@@ -681,13 +688,15 @@ func (c *SeriesFileCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64,
 				return nil
 			}
 
-			// Save highest id/offset to header.
+			// Save max series identifier processed.
 			hdr.MaxSeriesID, hdr.MaxOffset = id, offset
 
 			// Insert into maps.
 			c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
 			return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
-		}); err != nil {
+		}); err == errDone {
+			break
+		} else if err != nil {
 			return err
 		}
 	}
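Together these two hunks pin the compaction's end point: the callback refuses to read past the offset at which the compaction began, and the errDone sentinel stops segment.ForEachEntry early without reporting that stop as a failure. A small, self-contained sketch of the sentinel-error pattern (forEach, errDone and limit are illustrative stand-ins, not the tsdb API):

package main

import (
	"errors"
	"fmt"
)

// forEach is an illustrative stand-in for segment.ForEachEntry: it invokes fn
// for each value and stops at the first non-nil error.
func forEach(vals []int, fn func(v int) error) error {
	for _, v := range vals {
		if err := fn(v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Sentinel used purely for control flow, like errDone in the hunk above.
	errDone := errors.New("done")
	const limit = 3 // stand-in for the offset recorded before the walk

	var seen []int
	err := forEach([]int{1, 2, 3, 4, 5}, func(v int) error {
		if v > limit {
			return errDone // stop walking, but this is not a failure
		}
		seen = append(seen, v)
		return nil
	})
	if err == errDone {
		err = nil // translate early termination back into success
	}
	fmt.Println(seen, err) // [1 2 3] <nil>
}
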
@@ -730,7 +739,8 @@ func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, capacity int64, segment
 	hash := rhh.HashKey(key)
 
 	// Continue searching until we find an empty slot or lower probe distance.
-	for dist, pos := int64(0), hash&mask; ; dist, pos = dist+1, (pos+1)&mask {
+	for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
+		assert(i <= capacity, "key/id map full")
 		elem := dst[(pos * SeriesIndexElemSize):]
 
 		// If empty slot found or matching offset, insert and exit.
@@ -749,7 +759,6 @@ func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, capacity int64, segment
 		// If the existing elem has probed less than us, then swap places with
 		// existing elem, and keep going to find another slot for that elem.
 		if d := rhh.Dist(elemHash, pos, capacity); d < dist {
-
 			// Insert current values.
 			binary.BigEndian.PutUint64(elem[:8], uint64(offset))
 			binary.BigEndian.PutUint64(elem[8:], id)
@@ -769,6 +778,7 @@ func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id u
 
 	// Continue searching until we find an empty slot or lower probe distance.
 	for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
+		assert(i <= capacity, "id/offset map full")
 		elem := dst[(pos * SeriesIndexElemSize):]
 
 		// If empty slot found or matching id, insert and exit.
@@ -796,10 +806,6 @@ func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id u
 			// Update current distance.
 			dist = d
 		}
-
-		if i > capacity {
-			panic("rhh map full")
-		}
 	}
 }
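The probe loops in both insert helpers are now bounded up front: insertIDOffsetMap's bottom-of-loop panic moves to an assert at the top of each iteration, and insertKeyIDMap gains the guard it previously lacked, so a full map fails fast instead of probing forever. A small, self-contained sketch of a Robin Hood insert with such a probe bound (the entry type, slot layout and helper names are illustrative, not the tsdb on-disk format):

package main

import "fmt"

// assert panics with msg when the condition does not hold, mirroring the
// small helper the probe loops rely on.
func assert(condition bool, msg string) {
	if !condition {
		panic(msg)
	}
}

// dist is the probe distance of a hash stored at slot pos: how far it sits
// from its preferred slot in a table of the given capacity.
func dist(hash, pos, capacity int64) int64 {
	mask := capacity - 1
	return (pos + capacity - (hash & mask)) & mask
}

// entry is an illustrative slot layout; hash == 0 marks an empty slot, which
// is why the real hash helpers never return zero.
type entry struct {
	hash  int64
	value int64
}

// insert walks the table from the hash's home slot, swapping with any
// resident entry that has probed a shorter distance (Robin Hood hashing).
// The assert bounds the walk so a full table fails fast instead of spinning.
func insert(table []entry, hash, value int64) {
	capacity := int64(len(table)) // must be a power of two for &mask
	mask := capacity - 1
	cur := entry{hash: hash, value: value}

	for i, d, pos := int64(0), int64(0), hash&mask; ; i, d, pos = i+1, d+1, (pos+1)&mask {
		assert(i <= capacity, "map full")

		if table[pos].hash == 0 {
			table[pos] = cur
			return
		}
		if rd := dist(table[pos].hash, pos, capacity); rd < d {
			table[pos], cur = cur, table[pos] // take the slot, keep probing
			d = rd
		}
	}
}

func main() {
	table := make([]entry, 8)
	for _, h := range []int64{3, 11, 19, 5} {
		insert(table, h, h*10)
	}
	fmt.Println(table)
}
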
@@ -165,6 +165,13 @@ func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byt
 		idx.keyIDMap.Put(key, id)
 		idx.idOffsetMap[id] = offset
+
+		if id > idx.maxSeriesID {
+			idx.maxSeriesID = id
+		}
+		if offset > idx.maxOffset {
+			idx.maxOffset = offset
+		}
 
 	case SeriesEntryTombstoneFlag:
 		idx.tombstones[id] = struct{}{}
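execEntry now maintains two high-water marks while replaying the log: the largest series id and the largest offset seen so far. The compactor's stop-at-maxOffset check above depends on these marks being current. A tiny, self-contained sketch of that bookkeeping (entry and replayState are illustrative stand-ins for the tsdb types):

package main

import "fmt"

// entry is a simplified insert record: a series id plus the offset it was
// written at in the segment.
type entry struct {
	id     uint64
	offset int64
}

// replayState carries the two high-water marks the index maintains while
// replaying entries.
type replayState struct {
	maxSeriesID uint64
	maxOffset   int64
}

// exec mirrors the shape of the execEntry change: bump each mark only when
// the new entry exceeds it.
func (s *replayState) exec(e entry) {
	if e.id > s.maxSeriesID {
		s.maxSeriesID = e.id
	}
	if e.offset > s.maxOffset {
		s.maxOffset = e.offset
	}
}

func main() {
	var s replayState
	for _, e := range []entry{{id: 1, offset: 10}, {id: 3, offset: 40}, {id: 2, offset: 70}} {
		s.exec(e)
	}
	// A compactor can stop at s.maxOffset rather than chasing entries
	// appended after it started.
	fmt.Println(s.maxSeriesID, s.maxOffset) // 3 70
}
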
@@ -255,6 +262,8 @@ func (idx *SeriesIndex) Clone() *SeriesIndex {
 		count:        idx.count,
 		capacity:     idx.capacity,
 		mask:         idx.mask,
+		maxSeriesID:  idx.maxSeriesID,
+		maxOffset:    idx.maxOffset,
 		data:         idx.data,
 		keyIDData:    idx.keyIDData,
 		idOffsetData: idx.idOffsetData,
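Clone has to carry the two new fields along with everything else; a copy that silently zeroed maxOffset would present an empty high-water mark to whatever works from the clone. A minimal sketch of the idea (the index type is an illustrative subset, not SeriesIndex itself):

package main

import "fmt"

// index is an illustrative subset of SeriesIndex with only the fields that
// matter for this hunk.
type index struct {
	count       uint64
	maxSeriesID uint64
	maxOffset   int64
}

// clone copies every field explicitly; omitting maxOffset here would reset
// the high-water mark to zero in the copy.
func (idx *index) clone() *index {
	return &index{
		count:       idx.count,
		maxSeriesID: idx.maxSeriesID,
		maxOffset:   idx.maxOffset,
	}
}

func main() {
	idx := &index{count: 12, maxSeriesID: 9, maxOffset: 4096}
	fmt.Println(*idx.clone()) // {12 9 4096}
}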