diff --git a/internal/datanode/buffer.go b/internal/datanode/buffer.go index 50d6765e7e..f1b0a4e160 100644 --- a/internal/datanode/buffer.go +++ b/internal/datanode/buffer.go @@ -115,7 +115,7 @@ func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok boo if ok { return buffer, ok } - return nil, ok + return nil, false } func (bm *DelBufferManager) Delete(segID UniqueID) { @@ -127,14 +127,6 @@ func (bm *DelBufferManager) Delete(segID UniqueID) { } } -func (bm *DelBufferManager) LoadAndDelete(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) { - if buf, ok := bm.Load(segID); ok { - bm.Delete(segID) - return buf, ok - } - return nil, ok -} - func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFromSegIDs []UniqueID) { var compactToDelBuff *DelDataBuf compactToDelBuff, loaded := bm.Load(compactedToSegID) @@ -143,8 +135,9 @@ func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFr } for _, segID := range compactedFromSegIDs { - if delDataBuf, loaded := bm.LoadAndDelete(segID); loaded { + if delDataBuf, loaded := bm.Load(segID); loaded { compactToDelBuff.mergeDelDataBuf(delDataBuf) + bm.Delete(segID) } } // only store delBuf if EntriesNum > 0 diff --git a/internal/datanode/buffer_test.go b/internal/datanode/buffer_test.go index 4034809cef..7971e30470 100644 --- a/internal/datanode/buffer_test.go +++ b/internal/datanode/buffer_test.go @@ -17,6 +17,7 @@ package datanode import ( + "container/heap" "fmt" "math" "testing" @@ -137,3 +138,46 @@ func TestBufferData_updateTimeRange(t *testing.T) { }) } } + +func Test_CompactSegBuff(t *testing.T) { + channelSegments := make(map[UniqueID]*Segment) + delBufferManager := &DelBufferManager{ + channel: &ChannelMeta{ + segments: channelSegments, + }, + delMemorySize: 0, + delBufHeap: &PriorityQueue{}, + } + //1. set compactTo and compactFrom + compactedFromSegIDs := make([]UniqueID, 2) + var segID1 UniqueID = 1111 + var segID2 UniqueID = 2222 + compactedFromSegIDs[0] = segID1 + compactedFromSegIDs[1] = segID2 + channelSegments[segID1] = &Segment{} + channelSegments[segID2] = &Segment{} + var compactedToSegID UniqueID = 3333 + channelSegments[compactedToSegID] = &Segment{} + + //2. set up deleteDataBuf for seg1 and seg2 + delDataBuf1 := newDelDataBuf() + delDataBuf1.EntriesNum++ + delBufferManager.Store(segID1, delDataBuf1) + heap.Push(delBufferManager.delBufHeap, delDataBuf1.item) + delDataBuf2 := newDelDataBuf() + delDataBuf2.EntriesNum++ + delBufferManager.Store(segID2, delDataBuf2) + heap.Push(delBufferManager.delBufHeap, delDataBuf2.item) + + //3. test compact + delBufferManager.CompactSegBuf(compactedToSegID, compactedFromSegIDs) + + //4. expect results in two aspects: + //4.1 compactedFrom segments are removed from delBufferManager + //4.2 compactedTo seg is set properly with correct entriesNum + _, seg1Exist := delBufferManager.Load(segID1) + _, seg2Exist := delBufferManager.Load(segID2) + assert.False(t, seg1Exist) + assert.False(t, seg2Exist) + assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID)) +} diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 8b327b4859..89451b7092 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -123,7 +123,6 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { //then we will add all segments in the fgMsg.segmentsToFlush into the toFlushSeg and remove duplicate segments //the aim for taking all these actions is to guarantee that the memory consumed by delBuf will not exceed a limit segmentsToFlush := dn.delBufferManager.ShouldFlushSegments() - log.Info("should flush segments, ", zap.Int("seg_count", len(segmentsToFlush))) for _, msgSegmentID := range fgMsg.segmentsToSync { existed := false for _, autoFlushSegment := range segmentsToFlush {