Use buffer size in memory sync policy (#22825)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/22905/head
yihao.dai 2023-03-21 21:37:56 +08:00 committed by GitHub
parent 04da4712f3
commit f2ffb5db26
11 changed files with 254 additions and 74 deletions

View File

@ -314,8 +314,9 @@ dataNode:
clientMaxRecvSize: 268435456
memory:
forceSyncEnable: true # `true` to force sync if memory usage is too high
forceSyncThreshold: 0.6 # forceSync only take effects when memory usage ratio > forceSyncThreshold. note that should set it a small value like 0.2 for standalone deployment
forceSyncSegmentRatio: 0.3 # ratio of segments to sync, top largest forceSyncSegmentRatio segments will be synced
forceSyncSegmentNum: 1 # number of segments to sync; the segments with the largest buffers are synced first.
watermarkStandalone: 0.2 # memory watermark for standalone deployments; once buffered data reaches this fraction of memory, segments will be synced.
watermarkCluster: 0.5 # memory watermark for cluster deployments; once buffered data reaches this fraction of memory, segments will be synced.
# Configures the system log output.
log:
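For context on the watermark values above: the new check compares the DataNode's total buffered data against a fraction of its physical memory, rather than against the instantaneous memory-usage ratio used by the old forceSyncThreshold. A minimal sketch of that comparison, with hypothetical byte counts:

```go
package main

import "fmt"

// overWatermark reports whether buffered data has reached the configured
// fraction of total memory; this mirrors the check the watermark settings
// above feed into. The numbers in main are hypothetical.
func overWatermark(bufferedBytes int64, totalMemoryBytes uint64, watermark float64) bool {
	return float64(bufferedBytes) >= float64(totalMemoryBytes)*watermark
}

func main() {
	// 8 GiB of memory, cluster watermark 0.5, 5 GiB buffered across channels.
	fmt.Println(overWatermark(5<<30, 8<<30, 0.5)) // true: segments should be synced
}
```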

View File

@ -24,6 +24,10 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
@ -34,8 +38,6 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo"
"go.uber.org/zap"
)
type (
@ -89,6 +91,10 @@ type Channel interface {
setCurDeleteBuffer(segmentID UniqueID, buf *DelDataBuf)
rollDeleteBuffer(segmentID UniqueID)
evictHistoryDeleteBuffer(segmentID UniqueID, endPos *msgpb.MsgPosition)
// getTotalMemorySize returns the total buffered memory size of all segments in the channel.
getTotalMemorySize() int64
forceToSync()
}
// ChannelMeta contains channel meta and the latest segment infos of the channel.
@ -101,6 +107,7 @@ type ChannelMeta struct {
segMu sync.RWMutex
segments map[UniqueID]*Segment
needToSync *atomic.Bool
syncPolicies []segmentSyncPolicy
metaService *metaService
@ -129,6 +136,7 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
segments: make(map[UniqueID]*Segment),
needToSync: atomic.NewBool(false),
syncPolicies: []segmentSyncPolicy{
syncPeriodically(),
syncMemoryTooHigh(),
@ -263,20 +271,14 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
validSegs = append(validSegs, seg)
}
segIDsToSync := make([]UniqueID, 0)
toSyncSegIDDict := make(map[UniqueID]bool, 0)
segIDsToSync := typeutil.NewUniqueSet()
for _, policy := range c.syncPolicies {
toSyncSegments := policy(validSegs, ts)
for _, segID := range toSyncSegments {
if _, ok := toSyncSegIDDict[segID]; ok {
continue
} else {
toSyncSegIDDict[segID] = true
segIDsToSync = append(segIDsToSync, segID)
}
segments := policy(validSegs, ts, c.needToSync)
for _, segID := range segments {
segIDsToSync.Insert(segID)
}
}
return segIDsToSync
return segIDsToSync.Collect()
}
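The hand-rolled dedup map is replaced with typeutil.UniqueSet. A minimal usage sketch, assuming the milvus module is available on the import path:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/util/typeutil"
)

func main() {
	// Insert ignores duplicates, so each segment ID is collected at most once.
	ids := typeutil.NewUniqueSet()
	for _, id := range []typeutil.UniqueID{1, 2, 2, 3, 1} {
		ids.Insert(id)
	}
	fmt.Println(len(ids.Collect())) // 3 (collection order is unspecified)
}
```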
func (c *ChannelMeta) setSegmentLastSyncTs(segID UniqueID, ts Timestamp) {
@ -813,3 +815,17 @@ func (c *ChannelMeta) evictHistoryDeleteBuffer(segmentID UniqueID, endPos *msgpb
}
log.Warn("cannot find segment when evictHistoryDeleteBuffer", zap.Int64("segmentID", segmentID))
}
func (c *ChannelMeta) forceToSync() {
c.needToSync.Store(true)
}
func (c *ChannelMeta) getTotalMemorySize() int64 {
c.segMu.RLock()
defer c.segMu.RUnlock()
var res int64
for _, segment := range c.segments {
res += segment.memorySize
}
return res
}
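Both new methods are intended to be called from the flowgraph manager's background goroutine (see the flowgraphManager changes below), so getTotalMemorySize takes the read lock while forceToSync only flips an atomic flag. A trimmed stand-in illustrating that contract; fakeChannel is hypothetical, not the real ChannelMeta:

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// fakeChannel is a hypothetical stand-in for ChannelMeta with just the two new methods.
type fakeChannel struct {
	mu         sync.RWMutex
	sizes      map[int64]int64 // segmentID -> buffered bytes
	needToSync *atomic.Bool
}

// getTotalMemorySize sums buffered bytes under the read lock, so it can run
// concurrently with the flowgraph updating individual segment sizes.
func (c *fakeChannel) getTotalMemorySize() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	var total int64
	for _, s := range c.sizes {
		total += s
	}
	return total
}

// forceToSync only sets an atomic flag; the sync policy picks it up later.
func (c *fakeChannel) forceToSync() { c.needToSync.Store(true) }

func main() {
	c := &fakeChannel{sizes: map[int64]int64{1: 1 << 20, 2: 2 << 20}, needToSync: atomic.NewBool(false)}
	fmt.Println(c.getTotalMemorySize()) // 3145728
	c.forceToSync()
	fmt.Println(c.needToSync.Load()) // true
}
```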

View File

@ -533,6 +533,8 @@ func (node *DataNode) Start() error {
// Start watching channels
go node.StartWatchChannels(node.ctx)
go node.flowgraphManager.start()
node.UpdateStateCode(commonpb.StateCode_Healthy)
return nil
}
@ -566,6 +568,7 @@ func (node *DataNode) Stop() error {
node.cancel()
node.flowgraphManager.dropAll()
node.flowgraphManager.stop()
if node.rowIDAllocator != nil {
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))

View File

@ -62,7 +62,6 @@ type insertBufferNode struct {
ttLogger *timeTickLogger
ttMerger *mergedTimeTickerSender
syncPolicies []segmentSyncPolicy
lastTimestamp Timestamp
}
@ -295,9 +294,14 @@ func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
// updateSegmentsMemorySize updates segments' memory size in channel meta
func (ibNode *insertBufferNode) updateSegmentsMemorySize(seg2Upload []UniqueID) {
for _, segID := range seg2Upload {
if bd, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
ibNode.channel.updateSegmentMemorySize(segID, bd.memorySize())
var memorySize int64
if buffer, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
memorySize += buffer.memorySize()
}
if buffer, ok := ibNode.channel.getCurDeleteBuffer(segID); ok {
memorySize += buffer.GetLogSize()
}
ibNode.channel.updateSegmentMemorySize(segID, memorySize)
}
}
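With this change a segment's tracked size covers both its insert buffer and its delete buffer. A standalone sketch of that bookkeeping, using hypothetical stand-ins for BufferData and DelDataBuf:

```go
package main

import "fmt"

// insertBuf and deleteBuf are hypothetical stand-ins exposing the same size
// accessors the real buffers provide (memorySize and GetLogSize).
type insertBuf struct{ bytes int64 }

func (b *insertBuf) memorySize() int64 { return b.bytes }

type deleteBuf struct{ logSize int64 }

func (b *deleteBuf) GetLogSize() int64 { return b.logSize }

// segmentBufferSize mirrors updateSegmentsMemorySize for one segment: either
// buffer may be absent, so each contributes only when present.
func segmentBufferSize(ib *insertBuf, db *deleteBuf) int64 {
	var size int64
	if ib != nil {
		size += ib.memorySize()
	}
	if db != nil {
		size += db.GetLogSize()
	}
	return size
}

func main() {
	fmt.Println(segmentBufferSize(&insertBuf{bytes: 4 << 20}, &deleteBuf{logSize: 1 << 20})) // 5242880
	fmt.Println(segmentBufferSize(nil, &deleteBuf{logSize: 1 << 20}))                        // 1048576
}
```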
@ -452,6 +456,7 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload
func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, endPosition *msgpb.MsgPosition) []UniqueID {
syncTasks := ibNode.FillInSyncTasks(fgMsg, seg2Upload)
segmentsToSync := make([]UniqueID, 0, len(syncTasks))
ibNode.channel.(*ChannelMeta).needToSync.Store(false)
for _, task := range syncTasks {
log.Info("insertBufferNode syncing BufferData",

View File

@ -45,6 +45,7 @@ import (
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/atomic"
)
var insertNodeTestDir = "/tmp/milvus_test/insert_node"
@ -349,6 +350,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
channel := &ChannelMeta{
collectionID: collMeta.ID,
segments: make(map[UniqueID]*Segment),
needToSync: atomic.NewBool(false),
}
channel.metaService = newMetaService(mockRootCoord, collMeta.ID)
@ -589,6 +591,7 @@ func TestRollBF(t *testing.T) {
channel := &ChannelMeta{
collectionID: collMeta.ID,
segments: make(map[UniqueID]*Segment),
needToSync: atomic.NewBool(false),
}
channel.metaService = newMetaService(mockRootCoord, collMeta.ID)

View File

@ -18,23 +18,87 @@ package datanode
import (
"fmt"
"sort"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
)
type flowgraphManager struct {
flowgraphs sync.Map // vChannelName -> dataSyncService
closeCh chan struct{}
closeOnce sync.Once
}
func newFlowgraphManager() *flowgraphManager {
return &flowgraphManager{}
return &flowgraphManager{
closeCh: make(chan struct{}),
}
}
func (fm *flowgraphManager) start() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-fm.closeCh:
return
case <-ticker.C:
fm.execute(hardware.GetMemoryCount())
}
}
}
func (fm *flowgraphManager) stop() {
fm.closeOnce.Do(func() {
close(fm.closeCh)
})
}
func (fm *flowgraphManager) execute(totalMemory uint64) {
if !Params.DataNodeCfg.MemoryForceSyncEnable.GetAsBool() {
return
}
var total int64
channels := make([]struct {
channel string
bufferSize int64
}, 0)
fm.flowgraphs.Range(func(key, value interface{}) bool {
size := value.(*dataSyncService).channel.getTotalMemorySize()
channels = append(channels, struct {
channel string
bufferSize int64
}{key.(string), size})
total += size
return true
})
if len(channels) == 0 {
return
}
if float64(total) < float64(totalMemory)*Params.DataNodeCfg.MemoryWatermark.GetAsFloat() {
return
}
sort.Slice(channels, func(i, j int) bool {
return channels[i].bufferSize > channels[j].bufferSize
})
if fg, ok := fm.flowgraphs.Load(channels[0].channel); ok { // sync the channel with the largest buffer size
fg.(*dataSyncService).channel.forceToSync()
log.Info("notify flowgraph to sync",
zap.String("channel", channels[0].channel), zap.Int64("bufferSize", channels[0].bufferSize))
}
}
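Tying the pieces together: every 3 seconds the manager sums each channel's buffered bytes, and once the total crosses the watermark it force-syncs the channel buffering the most data; the sync policy then drains that channel's largest segments and the insert buffer node clears the flag. A simplified, self-contained sketch of that decision, where chanState is a hypothetical stand-in for a dataSyncService and its channel:

```go
package main

import (
	"fmt"
	"sort"

	"go.uber.org/atomic"
)

// chanState carries only what the manager's decision needs.
type chanState struct {
	name       string
	bufferSize int64
	needToSync *atomic.Bool
}

// pickChannelToSync mirrors flowgraphManager.execute in simplified form: do
// nothing below the watermark, otherwise force-sync the channel buffering the
// most data. The insert buffer node resets needToSync after scheduling the sync.
func pickChannelToSync(channels []*chanState, totalMemory uint64, watermark float64) *chanState {
	var total int64
	for _, c := range channels {
		total += c.bufferSize
	}
	if len(channels) == 0 || float64(total) < float64(totalMemory)*watermark {
		return nil
	}
	sort.Slice(channels, func(i, j int) bool { return channels[i].bufferSize > channels[j].bufferSize })
	channels[0].needToSync.Store(true)
	return channels[0]
}

func main() {
	chs := []*chanState{
		{name: "dml-0", bufferSize: 10 << 20, needToSync: atomic.NewBool(false)},
		{name: "dml-1", bufferSize: 30 << 20, needToSync: atomic.NewBool(false)},
	}
	if c := pickChannelToSync(chs, 64<<20, 0.5); c != nil {
		fmt.Println("force sync:", c.name) // dml-1: 40 MiB buffered >= 32 MiB watermark
	}
}
```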
func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *tickler) error {

View File

@ -18,6 +18,7 @@ package datanode
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
@ -191,4 +192,49 @@ func TestFlowGraphManager(t *testing.T) {
assert.False(t, ok)
assert.Nil(t, fg)
})
t.Run("test execute", func(t *testing.T) {
tests := []struct {
testName string
totalMemory uint64
watermark float64
memorySizes []int64
expectNeedToSync []bool
}{
{"test over the watermark", 100, 0.5,
[]int64{15, 16, 17, 18}, []bool{false, false, false, true}},
{"test below the watermark", 100, 0.5,
[]int64{1, 2, 3, 4}, []bool{false, false, false, false}},
}
fm.dropAll()
const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-"
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
var baseParams = Params.BaseTable
baseParams.Save(Params.DataNodeCfg.MemoryWatermark.Key, fmt.Sprintf("%f", test.watermark))
for i, memorySize := range test.memorySizes {
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
vchan := &datapb.VchannelInfo{
ChannelName: vchannel,
}
err = fm.addAndStart(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
fg, ok := fm.flowgraphs.Load(vchannel)
assert.True(t, ok)
err = fg.(*dataSyncService).channel.addSegment(addSegmentReq{segID: 0})
assert.NoError(t, err)
fg.(*dataSyncService).channel.updateSegmentMemorySize(0, memorySize)
fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Store(false)
}
fm.execute(test.totalMemory)
for i, needToSync := range test.expectNeedToSync {
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
fg, ok := fm.flowgraphs.Load(vchannel)
assert.True(t, ok)
assert.Equal(t, needToSync, fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Load())
}
})
}
})
}

View File

@ -11,7 +11,6 @@ import (
)
func Test_getOrCreateIOPool(t *testing.T) {
Params.Init()
ioConcurrency := Params.DataNodeCfg.IOConcurrency
paramtable.Get().Save(Params.DataNodeCfg.IOConcurrency.Key, "64")
defer func() { Params.DataNodeCfg.IOConcurrency = ioConcurrency }()

View File

@ -21,18 +21,21 @@ import (
"sort"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
const minSyncSize = 0.5 * 1024 * 1024
// segmentSyncPolicy is a sync policy applied to segments; it returns the IDs of the segments that should be synced.
type segmentSyncPolicy func(segments []*Segment, ts Timestamp) []UniqueID
type segmentSyncPolicy func(segments []*Segment, ts Timestamp, needToSync *atomic.Bool) []UniqueID
// syncPeriodically returns a segmentSyncPolicy that syncs segments which have not been synced for a full sync period.
func syncPeriodically() segmentSyncPolicy {
return func(segments []*Segment, ts Timestamp) []UniqueID {
return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
segsToSync := make([]UniqueID, 0)
for _, seg := range segments {
endTime := tsoutil.PhysicalTime(ts)
@ -52,25 +55,24 @@ func syncPeriodically() segmentSyncPolicy {
// syncMemoryTooHigh returns a segmentSyncPolicy that force-syncs the segments with the largest buffers when a force sync has been requested.
func syncMemoryTooHigh() segmentSyncPolicy {
return func(segments []*Segment, ts Timestamp) []UniqueID {
if Params.DataNodeCfg.MemoryForceSyncEnable.GetAsBool() &&
hardware.GetMemoryUseRatio() >= Params.DataNodeCfg.MemoryForceSyncThreshold.GetAsFloat() &&
len(segments) >= 1 {
toSyncSegmentNum := int(math.Max(float64(len(segments))*Params.DataNodeCfg.MemoryForceSyncSegmentRatio.GetAsFloat(), 1.0))
toSyncSegmentIDs := make([]UniqueID, 0)
sort.Slice(segments, func(i, j int) bool {
return segments[i].memorySize > segments[j].memorySize
})
for i := 0; i < toSyncSegmentNum; i++ {
toSyncSegmentIDs = append(toSyncSegmentIDs, segments[i].segmentID)
}
log.Debug("sync segment due to memory usage is too high",
zap.Int64s("toSyncSegmentIDs", toSyncSegmentIDs),
zap.Int("inputSegmentNum", len(segments)),
zap.Int("toSyncSegmentNum", len(toSyncSegmentIDs)),
zap.Float64("memoryUsageRatio", hardware.GetMemoryUseRatio()))
return toSyncSegmentIDs
return func(segments []*Segment, ts Timestamp, needToSync *atomic.Bool) []UniqueID {
if len(segments) == 0 || !needToSync.Load() {
return nil
}
return []UniqueID{}
sort.Slice(segments, func(i, j int) bool {
return segments[i].memorySize > segments[j].memorySize
})
syncSegments := make([]UniqueID, 0)
syncSegmentsNum := math.Min(float64(Params.DataNodeCfg.MemoryForceSyncSegmentNum.GetAsInt()), float64(len(segments)))
for i := 0; i < int(syncSegmentsNum); i++ {
if segments[i].memorySize < minSyncSize { // prevent generating too many small binlogs
break
}
syncSegments = append(syncSegments, segments[i].segmentID)
log.Info("sync segment due to memory usage is too high",
zap.Int64("segmentID", segments[i].segmentID),
zap.Int64("memorySize", segments[i].memorySize))
}
return syncSegments
}
}
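For reference, the selection rule above reads as: sort by buffer size, take at most forceSyncSegmentNum segments, and stop at anything under the 0.5 MiB floor. A standalone sketch on plain data, detached from the Segment and Params types:

```go
package main

import (
	"fmt"
	"sort"
)

const minSyncSize = 512 * 1024 // 0.5 MiB, matching the policy's floor above

// largestSegments mirrors syncMemoryTooHigh's selection rule: take up to n of
// the largest buffers, but stop at anything under minSyncSize so we don't
// flush lots of tiny binlogs.
func largestSegments(sizes map[int64]int64, n int) []int64 {
	type seg struct{ id, size int64 }
	segs := make([]seg, 0, len(sizes))
	for id, size := range sizes {
		segs = append(segs, seg{id, size})
	}
	sort.Slice(segs, func(i, j int) bool { return segs[i].size > segs[j].size })
	if n > len(segs) {
		n = len(segs)
	}
	picked := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		if segs[i].size < minSyncSize {
			break
		}
		picked = append(picked, segs[i].id)
	}
	return picked
}

func main() {
	sizes := map[int64]int64{1: 100 << 10, 2: 4 << 20, 3: 5 << 20}
	fmt.Println(largestSegments(sizes, 3)) // [3 2]: segment 1 is below the 0.5 MiB floor
}
```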

View File

@ -17,10 +17,12 @@
package datanode
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
@ -49,27 +51,47 @@ func TestSyncPeriodically(t *testing.T) {
if !test.isBufferEmpty {
segment.curInsertBuf = &BufferData{}
}
res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0))
res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0), nil)
assert.Equal(t, test.shouldSyncNum, len(res))
})
}
}
func TestSyncMemoryTooHigh(t *testing.T) {
s1 := &Segment{segmentID: 1, memorySize: 1}
s2 := &Segment{segmentID: 2, memorySize: 2}
s3 := &Segment{segmentID: 3, memorySize: 3}
s4 := &Segment{segmentID: 4, memorySize: 4}
s5 := &Segment{segmentID: 5, memorySize: 5}
tests := []struct {
testName string
syncSegmentNum int
needToSync bool
memorySizesInMB []float64
shouldSyncSegs []UniqueID
}{
{"test normal 1", 3, true,
[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4, 3}},
{"test normal 2", 2, true,
[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4}},
{"test normal 3", 5, true,
[]float64{1, 2, 3, 4, 5}, []UniqueID{5, 4, 3, 2, 1}},
{"test needToSync false", 3, false,
[]float64{1, 2, 3, 4, 5}, []UniqueID{}},
{"test syncSegmentNum 1", 1, true,
[]float64{1, 2, 3, 4, 5}, []UniqueID{5}},
{"test with small segment", 3, true,
[]float64{0.1, 0.1, 0.1, 4, 5}, []UniqueID{5, 4}},
}
var baseParams = Params.BaseTable
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncEnable.Key, "true")
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncThreshold.Key, "0.0")
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncSegmentRatio.Key, "0.6")
policy := syncMemoryTooHigh()
segs := policy([]*Segment{s3, s4, s2, s1, s5}, 0)
assert.Equal(t, 3, len(segs))
assert.Equal(t, int64(5), segs[0])
assert.Equal(t, int64(4), segs[1])
assert.Equal(t, int64(3), segs[2])
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
var baseParams = Params.BaseTable
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncSegmentNum.Key, fmt.Sprintf("%d", test.syncSegmentNum))
policy := syncMemoryTooHigh()
segments := make([]*Segment, len(test.memorySizesInMB))
for i := range segments {
segments[i] = &Segment{
segmentID: UniqueID(i + 1), memorySize: int64(test.memorySizesInMB[i] * 1024 * 1024),
}
}
segs := policy(segments, 0, atomic.NewBool(test.needToSync))
assert.ElementsMatch(t, segs, test.shouldSyncSegs)
})
}
}

View File

@ -14,6 +14,7 @@ package paramtable
import (
"fmt"
"math"
"os"
"runtime"
"strconv"
"strings"
@ -21,8 +22,11 @@ import (
"time"
"github.com/shirou/gopsutil/v3/disk"
"go.uber.org/zap"
config "github.com/milvus-io/milvus/internal/config"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
)
const (
@ -1999,9 +2003,9 @@ type dataNodeConfig struct {
IOConcurrency ParamItem `refreshable:"false"`
// memory management
MemoryForceSyncEnable ParamItem `refreshable:"true"`
MemoryForceSyncThreshold ParamItem `refreshable:"true"`
MemoryForceSyncSegmentRatio ParamItem `refreshable:"true"`
MemoryForceSyncEnable ParamItem `refreshable:"true"`
MemoryForceSyncSegmentNum ParamItem `refreshable:"true"`
MemoryWatermark ParamItem `refreshable:"true"`
}
func (p *dataNodeConfig) init(base *BaseTable) {
@ -2041,19 +2045,34 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.MemoryForceSyncEnable.Init(base.mgr)
p.MemoryForceSyncThreshold = ParamItem{
Key: "datanode.memory.forceSyncThreshold",
p.MemoryForceSyncSegmentNum = ParamItem{
Key: "datanode.memory.forceSyncSegmentNum",
Version: "2.2.4",
DefaultValue: "0.6",
DefaultValue: "1",
}
p.MemoryForceSyncThreshold.Init(base.mgr)
p.MemoryForceSyncSegmentNum.Init(base.mgr)
p.MemoryForceSyncSegmentRatio = ParamItem{
Key: "datanode.memory.forceSyncSegmentRatio",
Version: "2.2.4",
DefaultValue: "0.3",
if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode {
p.MemoryWatermark = ParamItem{
Key: "datanode.memory.watermarkStandalone",
Version: "2.2.4",
DefaultValue: "0.2",
}
} else if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.ClusterDeployMode {
p.MemoryWatermark = ParamItem{
Key: "datanode.memory.watermarkCluster",
Version: "2.2.4",
DefaultValue: "0.5",
}
} else {
log.Warn("DeployModeEnv is not set, use default", zap.Float64("default", 0.5))
p.MemoryWatermark = ParamItem{
Key: "datanode.memory.watermarkCluster",
Version: "2.2.4",
DefaultValue: "0.5",
}
}
p.MemoryForceSyncSegmentRatio.Init(base.mgr)
p.MemoryWatermark.Init(base.mgr)
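The effective watermark therefore depends on the deploy mode environment variable. A rough sketch of that selection using plain os.Getenv; the literal env name and value below are assumptions for illustration, the real ones come from the metricsinfo constants:

```go
package main

import (
	"fmt"
	"os"
)

// chooseWatermark mirrors the selection above in simplified form. "DEPLOY_MODE"
// and "STANDALONE" are assumed placeholders for metricsinfo.DeployModeEnvKey
// and metricsinfo.StandaloneDeployMode.
func chooseWatermark() (key string, defaultValue float64) {
	if os.Getenv("DEPLOY_MODE") == "STANDALONE" {
		return "datanode.memory.watermarkStandalone", 0.2
	}
	// cluster mode, or unset: fall back to the cluster watermark.
	return "datanode.memory.watermarkCluster", 0.5
}

func main() {
	key, def := chooseWatermark()
	fmt.Println(key, def)
}
```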
p.FlushDeleteBufferBytes = ParamItem{
Key: "dataNode.segment.deleteBufBytes",