Fix negative memorySize calculation in the delete buffer (#23152)

Along with the following small patches:
- Fix logging of duplicate segmentIDs in the delete buffer status
- Add a String method to PriorityQueue for debugging

See also: #23105
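
A rough sketch of the accounting problem behind the main fix (simplified stand-in code, not the Milvus implementation; all names below are hypothetical): Delete subtracts each buffer's memorySize from the running total, so when the compacted-from buffers are merged into a compacted-to buffer, the merged size has to be added back. Without that, the total under-counts and drifts negative once the merged segment is deleted later.

package main

import "fmt"

// buf stands in for a per-segment delete buffer.
type buf struct{ memorySize int64 }

// manager stands in for DelBufferManager's bookkeeping: a running total plus
// per-segment buffers (keyed by a hypothetical segment ID).
type manager struct {
	total   int64
	buffers map[int64]*buf
}

// deleteBuf mirrors the "Delete" step: it always subtracts the buffer's size.
func (m *manager) deleteBuf(segID int64) {
	if b, ok := m.buffers[segID]; ok {
		m.total -= b.memorySize
		delete(m.buffers, segID)
	}
}

// compactTo merges the compacted-from buffers into one compacted-to buffer.
func (m *manager) compactTo(to int64, from []int64) {
	merged := &buf{}
	for _, segID := range from {
		if b, ok := m.buffers[segID]; ok {
			merged.memorySize += b.memorySize
			m.deleteBuf(segID) // subtracts each size from the total
		}
	}
	m.buffers[to] = merged
	m.total += merged.memorySize // the fix: re-add what deleteBuf subtracted
}

func main() {
	m := &manager{total: 15, buffers: map[int64]*buf{1: {memorySize: 10}, 2: {memorySize: 5}}}
	m.compactTo(3, []int64{1, 2})
	fmt.Println(m.total) // 15; without the re-add it would print 0 and drop to -15 after deleting segment 3
}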

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/23162/head
XuanYang-cn 2023-03-31 18:30:23 +08:00 committed by GitHub
parent 127867b873
commit be0827417a
3 changed files with 36 additions and 11 deletions


@@ -20,6 +20,7 @@ import (
"container/heap"
"fmt"
"math"
"strings"
"sync"
"github.com/cockroachdb/errors"
@@ -120,11 +121,7 @@ func (bm *DelBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
}
func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
buffer, ok := bm.channel.getCurDeleteBuffer(segID)
if ok {
return buffer, ok
}
return nil, false
return bm.channel.getCurDeleteBuffer(segID)
}
func (bm *DelBufferManager) Delete(segID UniqueID) {
@@ -161,9 +158,9 @@ func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFr
} else {
heap.Push(bm.delBufHeap, compactToDelBuff.item)
}
//note that when compacting segment in del buffer manager
//there is no need to modify the general memory size as there is no new
//added del into the memory
// We need to re-add the memorySize because bm.Delete(segID) subtracts it for each compacted-from segment.
bm.delMemorySize += compactToDelBuff.item.memorySize
bm.channel.setCurDeleteBuffer(compactedToSegID, compactToDelBuff)
}
}
@@ -203,12 +200,26 @@ type Item struct {
// The index is needed by update and is maintained by the heap.Interface methods.
}
// String formats an Item as <segmentID=0, memorySize=1>
func (i *Item) String() string {
return fmt.Sprintf("<segmentID=%d, memorySize=%d>", i.segmentID, i.memorySize)
}
// A PriorityQueue implements heap.Interface and holds Items.
// We use PriorityQueue to manage memory consumed by del buf
type PriorityQueue struct {
items []*Item
}
// String formats a PriorityQueue as [item, item]
func (pq *PriorityQueue) String() string {
var items []string
for _, item := range pq.items {
items = append(items, item.String())
}
return fmt.Sprintf("[%s]", strings.Join(items, ","))
}
func (pq *PriorityQueue) Len() int { return len(pq.items) }
func (pq *PriorityQueue) Less(i, j int) bool {
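
Since both Item and PriorityQueue now implement fmt.Stringer, the %v and %s verbs (and zap's Stringer-aware fields) can render them directly in debug logs. A minimal, self-contained illustration of that pattern, using hypothetical stand-in types rather than the ones above:

package main

import (
	"fmt"
	"strings"
)

// item and queue are stand-ins for the Item and PriorityQueue types above.
type item struct {
	segmentID  int64
	memorySize int64
}

func (i item) String() string {
	return fmt.Sprintf("<segmentID=%d, memorySize=%d>", i.segmentID, i.memorySize)
}

type queue []item

func (q queue) String() string {
	parts := make([]string, 0, len(q))
	for _, it := range q {
		parts = append(parts, it.String())
	}
	return "[" + strings.Join(parts, ",") + "]"
}

func main() {
	q := queue{{segmentID: 0, memorySize: 1}, {segmentID: 2, memorySize: 8}}
	// %v falls back to the String method, so the debug line stays a one-liner.
	fmt.Printf("delete buffer queue: %v\n", q)
	// Output: delete buffer queue: [<segmentID=0, memorySize=1>,<segmentID=2, memorySize=8>]
}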


@@ -143,6 +143,19 @@ func TestBufferData_updateTimeRange(t *testing.T) {
}
}
func TestPriorityQueueString(t *testing.T) {
item := &Item{
segmentID: 0,
memorySize: 1,
}
assert.Equal(t, "<segmentID=0, memorySize=1>", item.String())
pq := &PriorityQueue{}
heap.Push(pq, item)
assert.Equal(t, "[<segmentID=0, memorySize=1>]", pq.String())
}
func Test_CompactSegBuff(t *testing.T) {
channelSegments := make(map[UniqueID]*Segment)
delBufferManager := &DelBufferManager{


@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// DeleteNode is to process delete msg, flush delete info into storage.
@@ -94,7 +95,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
dn.updateCompactedSegments()
// process delete messages
var segIDs []UniqueID
segIDs := typeutil.NewUniqueSet()
for i, msg := range fgMsg.deleteMessages {
traceID := spans[i].SpanContext().TraceID().String()
log.Debug("Buffer delete request in DataNode", zap.String("traceID", traceID))
@@ -105,12 +106,12 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
log.Error(err.Error())
panic(err)
}
segIDs = append(segIDs, tmpSegIDs...)
segIDs.Insert(tmpSegIDs...)
}
// display changed segment's status in dn.delBuf of a certain ts
if len(fgMsg.deleteMessages) != 0 {
dn.showDelBuf(segIDs, fgMsg.timeRange.timestampMax)
dn.showDelBuf(segIDs.Collect(), fgMsg.timeRange.timestampMax)
}
// process flush messages
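
The slice-to-set change above is what removes the duplicate segmentIDs from the status log: delete messages may touch overlapping segments, and typeutil.NewUniqueSet keeps each ID once before Collect hands the list to showDelBuf. A simplified, self-contained stand-in (not the actual typeutil implementation) showing the idea:

package main

import (
	"fmt"
	"sort"
)

// uniqueSet is a map-backed stand-in for typeutil.UniqueSet.
type uniqueSet map[int64]struct{}

// insert adds IDs, silently dropping duplicates.
func (s uniqueSet) insert(ids ...int64) {
	for _, id := range ids {
		s[id] = struct{}{}
	}
}

// collect returns the deduplicated IDs (sorted here only for stable output).
func (s uniqueSet) collect() []int64 {
	out := make([]int64, 0, len(s))
	for id := range s {
		out = append(out, id)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	segIDs := uniqueSet{}
	// Two delete messages touching overlapping segments.
	segIDs.insert(100, 101)
	segIDs.insert(101, 102)
	fmt.Println(segIDs.collect()) // [100 101 102]: each segment is logged once
}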