enhance: Add consts of MsgDispatcher to configs (#33679)

See also: #33676

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/33686/head
XuanYang-cn 2024-06-07 14:21:59 +08:00 committed by GitHub
parent 3540eee977
commit 1629833060
5 changed files with 59 additions and 19 deletions


@@ -119,6 +119,10 @@ mq:
pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes
mqBufSize: 16 # MQ client consumer buffer length
dispatcher:
mergeCheckInterval: 1 # the interval (in seconds) at which the dispatcher checks whether to merge
targetBufSize: 16 # the length of the channel buffer for each target
maxTolerantLag: 3 # the timeout (in seconds) for a target to send a msgPack
# Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services.
pulsar:
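These three keys map one-to-one onto the new MQCfg param items added in component_param.go below. A minimal sketch of reading them through paramtable (the main wrapper and prints are only for illustration):

package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init() // initialize the global param table before reading

	cfg := &paramtable.Get().MQCfg
	// The printed values reflect the defaults declared in this PR.
	fmt.Println(cfg.MergeCheckInterval.GetAsDuration(time.Second)) // 1s
	fmt.Println(cfg.TargetBufSize.GetAsInt())                      // 16
	fmt.Println(cfg.MaxTolerantLag.GetAsDuration(time.Second))     // 3s
}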
@@ -315,8 +319,6 @@ queryNode:
publishInterval: 1000 # Interval for querynode to report node information (milliseconds)
segcore:
knowhereThreadPoolNumRatio: 4 # The number of threads in knowhere's thread pool. If disk is enabled, the pool size will be multiplied by knowhereThreadPoolNumRatio ([1, 32]).
# used for zilliz-cloud; please ignore it for open source.
knowhereScoreConsistency: false
chunkRows: 128 # The number of vectors in a chunk.
interimIndex:
enableIndex: true # Enable segment build with index to accelerate vector search when segment is in growing or binlog.
@@ -324,6 +326,7 @@ queryNode:
nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must be smaller than nlist
memExpansionRate: 1.15 # extra memory needed for building the interim index
buildParallelRate: 0.5 # the ratio of parallelism for building the interim index, relative to the number of CPU cores
knowhereScoreConsistency: false # Enable knowhere strong consistency score computation logic
loadMemoryUsageFactor: 1 # The multiplication factor for estimating memory usage while loading segments
enableDisk: false # enable querynode to load disk index and search on disk index
maxDiskUsagePercentage: 95
@@ -331,9 +334,9 @@ queryNode:
enabled: true
memoryLimit: 2147483648 # 2 GB, 2 * 1024 * 1024 * 1024
readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
# options: async, sync, disable.
# Specifies the necessity for warming up the chunk cache.
# 1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the
# chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
# for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
# 2. If set to "disable" original vector data will only be loaded into the chunk cache during search/query.


@@ -29,13 +29,12 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var CheckPeriod = 1 * time.Second // TODO: dyh, move to config
type DispatcherManager interface {
Add(ctx context.Context, vchannel string, pos *Pos, subPos SubPos) (<-chan *MsgPack, error)
Remove(vchannel string)
@@ -154,7 +153,7 @@ func (c *dispatcherManager) Run() {
zap.Int64("nodeID", c.nodeID), zap.String("pchannel", c.pchannel))
log.Info("dispatcherManager is running...")
ticker1 := time.NewTicker(10 * time.Second)
ticker2 := time.NewTicker(CheckPeriod)
ticker2 := time.NewTicker(paramtable.Get().MQCfg.MergeCheckInterval.GetAsDuration(time.Second))
defer ticker1.Stop()
defer ticker2.Stop()
for {
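Note that the interval is read once when Run() starts, which is why MergeCheckInterval is declared refreshable:"false" below: a later Save does not retarget a live ticker. A minimal sketch of the same loop pattern (the function and check callback are illustrative):

func runMergeCheck(stop <-chan struct{}, check func()) {
	// Read the configured interval once; the ticker keeps this value
	// for its whole lifetime, matching refreshable:"false".
	interval := paramtable.Get().MQCfg.MergeCheckInterval.GetAsDuration(time.Second)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			check() // decide whether dispatchers can be merged
		}
	}
}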


@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -97,7 +98,9 @@ func TestManager(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 3, c.Num())
CheckPeriod = 10 * time.Millisecond
checkIntervalK := paramtable.Get().MQCfg.MergeCheckInterval.Key
paramtable.Get().Save(checkIntervalK, "0.01")
defer paramtable.Get().Reset(checkIntervalK)
go c.Run()
assert.Eventually(t, func() bool {
return c.Num() == 1 // expected merged
@@ -342,11 +345,17 @@ func (suite *SimulationSuite) TestSplit() {
splitNum = 3
)
suite.vchannels = make(map[string]*vchannelHelper, vchannelNum)
MaxTolerantLag = 500 * time.Millisecond
DefaultTargetChanSize = 65536
maxTolerantLagK := paramtable.Get().MQCfg.MaxTolerantLag.Key
paramtable.Get().Save(maxTolerantLagK, "0.5")
defer paramtable.Get().Reset(maxTolerantLagK)
targetBufSizeK := paramtable.Get().MQCfg.TargetBufSize.Key
defer paramtable.Get().Reset(targetBufSizeK)
for i := 0; i < vchannelNum; i++ {
paramtable.Get().Save(targetBufSizeK, "65536")
if i >= vchannelNum-splitNum {
DefaultTargetChanSize = 10
paramtable.Get().Save(targetBufSizeK, "10")
}
vchannel := fmt.Sprintf("%s_vchannelv%d", suite.pchannel, i)
_, err := suite.manager.Add(context.Background(), vchannel, nil, mqwrapper.SubscriptionPositionEarliest)
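The tests replace the old package-level variables with temporary paramtable overrides. A small helper capturing that Save/Reset pattern (the helper itself is hypothetical; the calls are the ones used above):

// withParam overrides a paramtable key for the duration of a test.
func withParam(t *testing.T, key, value string) {
	t.Helper()
	paramtable.Get().Save(key, value)
	t.Cleanup(func() { paramtable.Get().Reset(key) })
}

// Usage: withParam(t, paramtable.Get().MQCfg.MergeCheckInterval.Key, "0.01")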


@@ -20,12 +20,8 @@ import (
"fmt"
"sync"
"time"
)
// TODO: dyh, move to config
var (
MaxTolerantLag = 3 * time.Second
DefaultTargetChanSize = 1024
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type target struct {
@@ -41,7 +37,7 @@ type target struct {
func newTarget(vchannel string, pos *Pos) *target {
t := &target{
vchannel: vchannel,
ch: make(chan *MsgPack, DefaultTargetChanSize),
ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()),
pos: pos,
}
t.closed = false
@@ -63,9 +59,10 @@ func (t *target) send(pack *MsgPack) error {
if t.closed {
return nil
}
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
select {
case <-time.After(MaxTolerantLag):
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, MaxTolerantLag)
case <-time.After(maxTolerantLag):
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, maxTolerantLag)
case t.ch <- pack:
return nil
}
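The two knobs interact: TargetBufSize sets the channel capacity, and MaxTolerantLag bounds how long a producer blocks once that buffer is full. A toy, self-contained demonstration of the same select pattern (not Milvus code):

package main

import (
	"fmt"
	"time"
)

// send mimics target.send: give up if the buffered channel stays
// full for longer than the tolerated lag.
func send(ch chan int, v int, lag time.Duration) error {
	select {
	case <-time.After(lag):
		return fmt.Errorf("send timeout, timeout=%s", lag)
	case ch <- v:
		return nil
	}
}

func main() {
	ch := make(chan int, 2) // stands in for TargetBufSize
	for i := 0; i < 3; i++ {
		if err := send(ch, i, 100*time.Millisecond); err != nil {
			fmt.Println(err) // the third send times out: nothing drains ch
		}
	}
}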


@@ -475,6 +475,11 @@ type MQConfig struct {
MQBufSize ParamItem `refreshable:"false"`
ReceiveBufSize ParamItem `refreshable:"false"`
IgnoreBadPosition ParamItem `refreshable:"true"`
// msgdispatcher
MergeCheckInterval ParamItem `refreshable:"false"`
TargetBufSize ParamItem `refreshable:"false"`
MaxTolerantLag ParamItem `refreshable:"true"`
}
// Init initializes the MQConfig object with a BaseTable.
@@ -489,6 +494,33 @@
Valid values: [default, pulsar, kafka, rocksmq, natsmq]`,
}
p.Type.Init(base.mgr)
p.MaxTolerantLag = ParamItem{
Key: "mq.dispatcher.maxTolerantLag",
Version: "2.4.4",
DefaultValue: "3",
Doc: `the timeout (in seconds) for a target to send a msgPack`,
Export: true,
}
p.MaxTolerantLag.Init(base.mgr)
p.TargetBufSize = ParamItem{
Key: "mq.dispatcher.targetBufSize",
Version: "2.4.4",
DefaultValue: "16",
Doc: `the length of the channel buffer for each target`,
Export: true,
}
p.TargetBufSize.Init(base.mgr)
p.MergeCheckInterval = ParamItem{
Key: "mq.dispatcher.mergeCheckInterval",
Version: "2.4.4",
DefaultValue: "1",
Doc: `the interval (in seconds) at which the dispatcher checks whether to merge`,
Export: true,
}
p.MergeCheckInterval.Init(base.mgr)
p.EnablePursuitMode = ParamItem{
Key: "mq.enablePursuitMode",
Version: "2.3.0",