Add memory usage too large sync policy (#22241)

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/22433/head
wayblink 2023-02-27 17:47:51 +08:00 committed by GitHub
parent 7aab677023
commit 4cf8f2ec81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 175 additions and 35 deletions

View File

@ -324,6 +324,10 @@ dataNode:
# The period to sync segments if buffer is not empty. # The period to sync segments if buffer is not empty.
syncPeriod: 600 # Seconds, 10min syncPeriod: 600 # Seconds, 10min
memory:
forceSyncEnable: true # `true` to force sync if memory usage is too high
forceSyncThreshold: 0.6 # forceSync only take effects when memory usage ratio > forceSyncThreshold
forceSyncSegmentRatio: 0.3 # ratio of segments to sync, top largest forceSyncSegmentRatio segments will be synced
# Configures the system log output. # Configures the system log output.
log: log:

View File

@ -282,6 +282,14 @@ func (bd *BufferData) updateStartAndEndPosition(startPos *internalpb.MsgPosition
} }
} }
func (bd *BufferData) memorySize() int64 {
var size int64
for _, field := range bd.buffer.Data {
size += int64(field.GetMemorySize())
}
return size
}
// DelDataBuf buffers delete data, monitoring buffer size and limit // DelDataBuf buffers delete data, monitoring buffer size and limit
// size and limit both indicate numOfRows // size and limit both indicate numOfRows
type DelDataBuf struct { type DelDataBuf struct {

View File

@ -68,7 +68,8 @@ type Channel interface {
listSegmentIDsToSync(ts Timestamp) []UniqueID listSegmentIDsToSync(ts Timestamp) []UniqueID
setSegmentLastSyncTs(segID UniqueID, ts Timestamp) setSegmentLastSyncTs(segID UniqueID, ts Timestamp)
updateStatistics(segID UniqueID, numRows int64) updateSegmentRowNumber(segID UniqueID, numRows int64)
updateSegmentMemorySize(segID UniqueID, memorySize int64)
InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error
RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats) RollPKstats(segID UniqueID, stats []*storage.PrimaryKeyStats)
getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error) getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error)
@ -117,6 +118,7 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
syncPolicies: []segmentSyncPolicy{ syncPolicies: []segmentSyncPolicy{
syncPeriodically(), syncPeriodically(),
syncMemoryTooHigh(),
}, },
metaService: metaService, metaService: metaService,
@ -240,15 +242,24 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
c.segMu.RLock() c.segMu.RLock()
defer c.segMu.RUnlock() defer c.segMu.RUnlock()
segIDsToSync := make([]UniqueID, 0) validSegs := make([]*Segment, 0)
for segID, seg := range c.segments { for _, seg := range c.segments {
if !seg.isValid() { if !seg.isValid() {
continue continue
} }
for _, policy := range c.syncPolicies { validSegs = append(validSegs, seg)
if policy(seg, ts) { }
segIDsToSync := make([]UniqueID, 0)
toSyncSegIDDict := make(map[UniqueID]bool, 0)
for _, policy := range c.syncPolicies {
toSyncSegments := policy(validSegs, ts)
for _, segID := range toSyncSegments {
if _, ok := toSyncSegIDDict[segID]; ok {
continue
} else {
toSyncSegIDDict[segID] = true
segIDsToSync = append(segIDsToSync, segID) segIDsToSync = append(segIDsToSync, segID)
break
} }
} }
} }
@ -454,11 +465,11 @@ func (c *ChannelMeta) hasSegment(segID UniqueID, countFlushed bool) bool {
} }
// updateStatistics updates the number of rows of a segment in channel. // updateStatistics updates the number of rows of a segment in channel.
func (c *ChannelMeta) updateStatistics(segID UniqueID, numRows int64) { func (c *ChannelMeta) updateSegmentRowNumber(segID UniqueID, numRows int64) {
c.segMu.Lock() c.segMu.Lock()
defer c.segMu.Unlock() defer c.segMu.Unlock()
log.Info("updating segment", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows)) log.Info("updating segment num row", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows))
seg, ok := c.segments[segID] seg, ok := c.segments[segID]
if ok && seg.notFlushed() { if ok && seg.notFlushed() {
seg.numRows += numRows seg.numRows += numRows
@ -468,6 +479,21 @@ func (c *ChannelMeta) updateStatistics(segID UniqueID, numRows int64) {
log.Warn("update segment num row not exist", zap.Int64("segID", segID)) log.Warn("update segment num row not exist", zap.Int64("segID", segID))
} }
// updateStatistics updates the number of rows of a segment in channel.
func (c *ChannelMeta) updateSegmentMemorySize(segID UniqueID, memorySize int64) {
c.segMu.Lock()
defer c.segMu.Unlock()
log.Info("updating segment memorySize", zap.Int64("Segment ID", segID), zap.Int64("memorySize", memorySize))
seg, ok := c.segments[segID]
if ok && seg.notFlushed() {
seg.memorySize = memorySize
return
}
log.Warn("update segment memorySize not exist", zap.Int64("segID", segID))
}
// getSegmentStatisticsUpdates gives current segment's statistics updates. // getSegmentStatisticsUpdates gives current segment's statistics updates.
func (c *ChannelMeta) getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error) { func (c *ChannelMeta) getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error) {
c.segMu.RLock() c.segMu.RLock()

View File

@ -145,8 +145,10 @@ func TestChannelMeta_InnerFunction(t *testing.T) {
assert.Equal(t, int64(0), seg.numRows) assert.Equal(t, int64(0), seg.numRows)
assert.Equal(t, datapb.SegmentType_New, seg.getType()) assert.Equal(t, datapb.SegmentType_New, seg.getType())
channel.updateStatistics(0, 10) channel.updateSegmentRowNumber(0, 10)
assert.Equal(t, int64(10), seg.numRows) assert.Equal(t, int64(10), seg.numRows)
channel.updateSegmentMemorySize(0, 10)
assert.Equal(t, int64(10), seg.memorySize)
segPos := channel.listNewSegmentsStartPositions() segPos := channel.listNewSegmentsStartPositions()
assert.Equal(t, 1, len(segPos)) assert.Equal(t, 1, len(segPos))

View File

@ -179,8 +179,8 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
ibNode.lastTimestamp = endPositions[0].Timestamp ibNode.lastTimestamp = endPositions[0].Timestamp
// Updating segment statistics in channel // Add segment in channel if need and updating segment row number
seg2Upload, err := ibNode.updateSegmentStates(fgMsg.insertMessages, startPositions[0], endPositions[0]) seg2Upload, err := ibNode.addSegmentAndUpdateRowNum(fgMsg.insertMessages, startPositions[0], endPositions[0])
if err != nil { if err != nil {
// Occurs only if the collectionID is mismatch, should not happen // Occurs only if the collectionID is mismatch, should not happen
err = fmt.Errorf("update segment states in channel meta wrong, err = %s", err) err = fmt.Errorf("update segment states in channel meta wrong, err = %s", err)
@ -199,6 +199,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
} }
} }
ibNode.updateSegmentsMemorySize(seg2Upload)
ibNode.DisplayStatistics(seg2Upload) ibNode.DisplayStatistics(seg2Upload)
segmentsToSync := ibNode.Sync(fgMsg, seg2Upload, endPositions[0]) segmentsToSync := ibNode.Sync(fgMsg, seg2Upload, endPositions[0])
@ -290,7 +291,17 @@ func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
zap.Int64("segmentID", segID), zap.Int64("segmentID", segID),
zap.String("channel", ibNode.channelName), zap.String("channel", ibNode.channelName),
zap.Int64("size", bd.size), zap.Int64("size", bd.size),
zap.Int64("limit", bd.limit)) zap.Int64("limit", bd.limit),
zap.Int64("memorySize", bd.memorySize()))
}
}
}
// updateSegmentsMemorySize updates segments' memory size in channel meta
func (ibNode *insertBufferNode) updateSegmentsMemorySize(seg2Upload []UniqueID) {
for _, segID := range seg2Upload {
if bd, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
ibNode.channel.updateSegmentMemorySize(segID, bd.memorySize())
} }
} }
} }
@ -501,11 +512,11 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
return segmentsToSync return segmentsToSync
} }
// updateSegmentStates updates statistics in channel meta for the segments in insertMsgs. // addSegmentAndUpdateRowNum updates row number in channel meta for the segments in insertMsgs.
// //
// If the segment doesn't exist, a new segment will be created. // If the segment doesn't exist, a new segment will be created.
// The segment number of rows will be updated in mem, waiting to be uploaded to DataCoord. // The segment number of rows will be updated in mem, waiting to be uploaded to DataCoord.
func (ibNode *insertBufferNode) updateSegmentStates(insertMsgs []*msgstream.InsertMsg, startPos, endPos *internalpb.MsgPosition) (seg2Upload []UniqueID, err error) { func (ibNode *insertBufferNode) addSegmentAndUpdateRowNum(insertMsgs []*msgstream.InsertMsg, startPos, endPos *internalpb.MsgPosition) (seg2Upload []UniqueID, err error) {
uniqueSeg := make(map[UniqueID]int64) uniqueSeg := make(map[UniqueID]int64)
for _, msg := range insertMsgs { for _, msg := range insertMsgs {
@ -541,7 +552,7 @@ func (ibNode *insertBufferNode) updateSegmentStates(insertMsgs []*msgstream.Inse
seg2Upload = make([]UniqueID, 0, len(uniqueSeg)) seg2Upload = make([]UniqueID, 0, len(uniqueSeg))
for id, num := range uniqueSeg { for id, num := range uniqueSeg {
seg2Upload = append(seg2Upload, id) seg2Upload = append(seg2Upload, id)
ibNode.channel.updateStatistics(id, num) ibNode.channel.updateSegmentRowNumber(id, num)
} }
return return

View File

@ -1070,7 +1070,7 @@ func TestInsertBufferNode_updateSegmentStates(te *testing.T) {
}, },
} }
seg, err := ibNode.updateSegmentStates(im, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) seg, err := ibNode.addSegmentAndUpdateRowNum(im, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
assert.Error(te, err) assert.Error(te, err)
assert.Empty(te, seg) assert.Empty(te, seg)

View File

@ -17,23 +17,59 @@
package datanode package datanode
import ( import (
"math"
"sort"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap" "go.uber.org/zap"
) )
// segmentSyncPolicy sync policy applies to segment // segmentsSyncPolicy sync policy applies to segments
type segmentSyncPolicy func(segment *Segment, ts Timestamp) bool type segmentSyncPolicy func(segments []*Segment, ts Timestamp) []UniqueID
// syncPeriodically get segmentSyncPolicy with segment sync periodically. // syncPeriodically get segmentSyncPolicy with segments sync periodically.
func syncPeriodically() segmentSyncPolicy { func syncPeriodically() segmentSyncPolicy {
return func(segment *Segment, ts Timestamp) bool { return func(segments []*Segment, ts Timestamp) []UniqueID {
endTime := tsoutil.PhysicalTime(ts) segsToSync := make([]UniqueID, 0)
lastSyncTime := tsoutil.PhysicalTime(segment.lastSyncTs) for _, seg := range segments {
shouldSync := endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod && !segment.isBufferEmpty() endTime := tsoutil.PhysicalTime(ts)
if shouldSync { lastSyncTime := tsoutil.PhysicalTime(seg.lastSyncTs)
log.Info("sync segment periodically ", zap.Time("now", endTime), zap.Time("last sync", lastSyncTime)) shouldSync := endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod && !seg.isBufferEmpty()
if shouldSync {
segsToSync = append(segsToSync, seg.segmentID)
}
} }
return shouldSync if len(segsToSync) > 0 {
log.Debug("sync segment periodically",
zap.Int64s("segmentID", segsToSync))
}
return segsToSync
}
}
// syncMemoryTooHigh force sync the largest segment.
func syncMemoryTooHigh() segmentSyncPolicy {
return func(segments []*Segment, ts Timestamp) []UniqueID {
if Params.DataNodeCfg.MemoryForceSyncEnable &&
hardware.GetMemoryUseRatio() >= Params.DataNodeCfg.MemoryForceSyncThreshold &&
len(segments) >= 1 {
toSyncSegmentNum := int(math.Max(float64(len(segments))*Params.DataNodeCfg.MemoryForceSyncSegmentRatio, 1.0))
toSyncSegmentIDs := make([]UniqueID, 0)
sort.Slice(segments, func(i, j int) bool {
return segments[i].memorySize > segments[j].memorySize
})
for i := 0; i < toSyncSegmentNum; i++ {
toSyncSegmentIDs = append(toSyncSegmentIDs, segments[i].segmentID)
}
log.Debug("sync segment due to memory usage is too high",
zap.Int64s("toSyncSegmentIDs", toSyncSegmentIDs),
zap.Int("inputSegmentNum", len(segments)),
zap.Int("toSyncSegmentNum", len(toSyncSegmentIDs)),
zap.Float64("memoryUsageRatio", hardware.GetMemoryUseRatio()))
return toSyncSegmentIDs
}
return []UniqueID{}
} }
} }

View File

@ -33,12 +33,12 @@ func TestSyncPeriodically(t *testing.T) {
lastTs time.Time lastTs time.Time
ts time.Time ts time.Time
isBufferEmpty bool isBufferEmpty bool
shouldSync bool shouldSyncNum int
}{ }{
{"test buffer empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod), true, false}, {"test buffer empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod), true, 0},
{"test buffer empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod / 2), true, false}, {"test buffer empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod / 2), true, 0},
{"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod), false, true}, {"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod), false, 1},
{"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod / 2), false, false}, {"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod / 2), false, 0},
} }
for _, test := range tests { for _, test := range tests {
@ -49,8 +49,26 @@ func TestSyncPeriodically(t *testing.T) {
if !test.isBufferEmpty { if !test.isBufferEmpty {
segment.curInsertBuf = &BufferData{} segment.curInsertBuf = &BufferData{}
} }
res := policy(segment, tsoutil.ComposeTSByTime(test.ts, 0)) res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0))
assert.Equal(t, test.shouldSync, res) assert.Equal(t, test.shouldSyncNum, len(res))
}) })
} }
} }
func TestSyncMemoryTooHigh(t *testing.T) {
s1 := &Segment{segmentID: 1, memorySize: 1}
s2 := &Segment{segmentID: 2, memorySize: 2}
s3 := &Segment{segmentID: 3, memorySize: 3}
s4 := &Segment{segmentID: 4, memorySize: 4}
s5 := &Segment{segmentID: 5, memorySize: 5}
Params.DataNodeCfg.MemoryForceSyncEnable = true
Params.DataNodeCfg.MemoryForceSyncThreshold = 0.0
Params.DataNodeCfg.MemoryForceSyncSegmentRatio = 0.6
policy := syncMemoryTooHigh()
segs := policy([]*Segment{s3, s4, s2, s1, s5}, 0)
assert.Equal(t, 3, len(segs))
assert.Equal(t, int64(5), segs[0])
assert.Equal(t, int64(4), segs[1])
assert.Equal(t, int64(3), segs[2])
}

View File

@ -149,3 +149,12 @@ func GetDiskCount() uint64 {
func GetDiskUsage() uint64 { func GetDiskUsage() uint64 {
return 2 * 1024 * 1024 return 2 * 1024 * 1024
} }
func GetMemoryUseRatio() float64 {
usedMemory := GetUsedMemoryCount()
totalMemory := GetMemoryCount()
if usedMemory > 0 && totalMemory > 0 {
return float64(usedMemory) / float64(totalMemory)
}
return 0
}

View File

@ -51,3 +51,9 @@ func Test_GetDiskUsage(t *testing.T) {
log.Info("TestGetDiskUsage", log.Info("TestGetDiskUsage",
zap.Uint64("DiskUsage", GetDiskUsage())) zap.Uint64("DiskUsage", GetDiskUsage()))
} }
func Test_GetMemoryUsageRatio(t *testing.T) {
log.Info("TestGetMemoryUsageRatio",
zap.Float64("Memory usage ratio", GetMemoryUseRatio()))
assert.True(t, GetMemoryUseRatio() > 0)
}

View File

@ -1545,6 +1545,11 @@ type dataNodeConfig struct {
CreatedTime time.Time CreatedTime time.Time
UpdatedTime time.Time UpdatedTime time.Time
// memory management
MemoryForceSyncEnable bool
MemoryForceSyncThreshold float64
MemoryForceSyncSegmentRatio float64
} }
func (p *dataNodeConfig) init(base *BaseTable) { func (p *dataNodeConfig) init(base *BaseTable) {
@ -1559,6 +1564,9 @@ func (p *dataNodeConfig) init(base *BaseTable) {
p.initIOConcurrency() p.initIOConcurrency()
p.initChannelWatchPath() p.initChannelWatchPath()
p.initMemoryForceSyncEnable()
p.initMemoryForceSyncRatio()
p.initMemoryForceSyncSegmentRatio()
} }
// InitAlias init this DataNode alias // InitAlias init this DataNode alias
@ -1627,6 +1635,18 @@ func (p *dataNodeConfig) GetNodeID() UniqueID {
return 0 return 0
} }
func (p *dataNodeConfig) initMemoryForceSyncEnable() {
p.MemoryForceSyncEnable = p.Base.ParseBool("datanode.memory.forceSyncEnable", true)
}
func (p *dataNodeConfig) initMemoryForceSyncRatio() {
p.MemoryForceSyncThreshold = p.Base.ParseFloatWithDefault("datanode.memory.forceSyncThreshold", 0.7)
}
func (p *dataNodeConfig) initMemoryForceSyncSegmentRatio() {
p.MemoryForceSyncSegmentRatio = p.Base.ParseFloatWithDefault("datanode.memory.forceSyncSegmentRatio", 0.3)
}
// ///////////////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////////////
// --- indexcoord --- // --- indexcoord ---
type indexCoordConfig struct { type indexCoordConfig struct {