Fix panic error due to rollDeleteOp ahead by load-and-delete (#20563)

issue:#20501
Signed-off-by: MrPresent-Han <jamesharden11122@gmail.com>
pull/20611/head
MrPresent-Han 2022-11-15 14:45:07 +08:00 committed by GitHub
parent db33ffa518
commit c848896bc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 11 deletions

View File

@ -115,7 +115,7 @@ func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok boo
if ok {
return buffer, ok
}
return nil, ok
return nil, false
}
func (bm *DelBufferManager) Delete(segID UniqueID) {
@ -127,14 +127,6 @@ func (bm *DelBufferManager) Delete(segID UniqueID) {
}
}
func (bm *DelBufferManager) LoadAndDelete(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
if buf, ok := bm.Load(segID); ok {
bm.Delete(segID)
return buf, ok
}
return nil, ok
}
func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFromSegIDs []UniqueID) {
var compactToDelBuff *DelDataBuf
compactToDelBuff, loaded := bm.Load(compactedToSegID)
@ -143,8 +135,9 @@ func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFr
}
for _, segID := range compactedFromSegIDs {
if delDataBuf, loaded := bm.LoadAndDelete(segID); loaded {
if delDataBuf, loaded := bm.Load(segID); loaded {
compactToDelBuff.mergeDelDataBuf(delDataBuf)
bm.Delete(segID)
}
}
// only store delBuf if EntriesNum > 0

View File

@ -17,6 +17,7 @@
package datanode
import (
"container/heap"
"fmt"
"math"
"testing"
@ -137,3 +138,46 @@ func TestBufferData_updateTimeRange(t *testing.T) {
})
}
}
func Test_CompactSegBuff(t *testing.T) {
channelSegments := make(map[UniqueID]*Segment)
delBufferManager := &DelBufferManager{
channel: &ChannelMeta{
segments: channelSegments,
},
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
//1. set compactTo and compactFrom
compactedFromSegIDs := make([]UniqueID, 2)
var segID1 UniqueID = 1111
var segID2 UniqueID = 2222
compactedFromSegIDs[0] = segID1
compactedFromSegIDs[1] = segID2
channelSegments[segID1] = &Segment{}
channelSegments[segID2] = &Segment{}
var compactedToSegID UniqueID = 3333
channelSegments[compactedToSegID] = &Segment{}
//2. set up deleteDataBuf for seg1 and seg2
delDataBuf1 := newDelDataBuf()
delDataBuf1.EntriesNum++
delBufferManager.Store(segID1, delDataBuf1)
heap.Push(delBufferManager.delBufHeap, delDataBuf1.item)
delDataBuf2 := newDelDataBuf()
delDataBuf2.EntriesNum++
delBufferManager.Store(segID2, delDataBuf2)
heap.Push(delBufferManager.delBufHeap, delDataBuf2.item)
//3. test compact
delBufferManager.CompactSegBuf(compactedToSegID, compactedFromSegIDs)
//4. expect results in two aspects:
//4.1 compactedFrom segments are removed from delBufferManager
//4.2 compactedTo seg is set properly with correct entriesNum
_, seg1Exist := delBufferManager.Load(segID1)
_, seg2Exist := delBufferManager.Load(segID2)
assert.False(t, seg1Exist)
assert.False(t, seg2Exist)
assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID))
}

View File

@ -123,7 +123,6 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
//then we will add all segments in the fgMsg.segmentsToFlush into the toFlushSeg and remove duplicate segments
//the aim for taking all these actions is to guarantee that the memory consumed by delBuf will not exceed a limit
segmentsToFlush := dn.delBufferManager.ShouldFlushSegments()
log.Info("should flush segments, ", zap.Int("seg_count", len(segmentsToFlush)))
for _, msgSegmentID := range fgMsg.segmentsToSync {
existed := false
for _, autoFlushSegment := range segmentsToFlush {