diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 36f82de817..37775973f8 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -119,6 +119,10 @@ mq: pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes mqBufSize: 16 # MQ client consumer buffer length + dispatcher: + mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge + targetBufSize: 16 # the lenth of channel buffer for targe + maxTolerantLag: 3 # Default value: "3", the timeout(in seconds) that target sends msgPack # Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services. pulsar: @@ -315,8 +319,6 @@ queryNode: publishInterval: 1000 # Interval for querynode to report node information (milliseconds) segcore: knowhereThreadPoolNumRatio: 4 # The number of threads in knowhere's thread pool. If disk is enabled, the pool size will multiply with knowhereThreadPoolNumRatio([1, 32]). - # used for zilliz-cloud ; please ignore it for open source. - knowhereScoreConsistency: false chunkRows: 128 # The number of vectors in a chunk. interimIndex: enableIndex: true # Enable segment build with index to accelerate vector search when segment is in growing or binlog. @@ -324,6 +326,7 @@ queryNode: nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must smaller than nlist memExpansionRate: 1.15 # extra memory needed by building interim index buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num + knowhereScoreConsistency: false # Enable knowhere strong consistency score computation logic loadMemoryUsageFactor: 1 # The multiply factor of calculating the memory usage while loading segments enableDisk: false # enable querynode load disk index, and search on disk index maxDiskUsagePercentage: 95 @@ -331,9 +334,9 @@ queryNode: enabled: true memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024 readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed` - # options: async, sync, disable. + # options: async, sync, disable. # Specifies the necessity for warming up the chunk cache. - # 1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the + # 1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the # chunk cache during the load process. This approach has the potential to substantially reduce query/search latency # for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage; # 2. If set to "disable" original vector data will only be loaded into the chunk cache during search/query. diff --git a/pkg/mq/msgdispatcher/manager.go b/pkg/mq/msgdispatcher/manager.go index ecd3d079ae..4f88fd5521 100644 --- a/pkg/mq/msgdispatcher/manager.go +++ b/pkg/mq/msgdispatcher/manager.go @@ -29,13 +29,12 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var CheckPeriod = 1 * time.Second // TODO: dyh, move to config - type DispatcherManager interface { Add(ctx context.Context, vchannel string, pos *Pos, subPos SubPos) (<-chan *MsgPack, error) Remove(vchannel string) @@ -154,7 +153,7 @@ func (c *dispatcherManager) Run() { zap.Int64("nodeID", c.nodeID), zap.String("pchannel", c.pchannel)) log.Info("dispatcherManager is running...") ticker1 := time.NewTicker(10 * time.Second) - ticker2 := time.NewTicker(CheckPeriod) + ticker2 := time.NewTicker(paramtable.Get().MQCfg.MergeCheckInterval.GetAsDuration(time.Second)) defer ticker1.Stop() defer ticker2.Stop() for { diff --git a/pkg/mq/msgdispatcher/manager_test.go b/pkg/mq/msgdispatcher/manager_test.go index 89b15bb2f3..51c7790b40 100644 --- a/pkg/mq/msgdispatcher/manager_test.go +++ b/pkg/mq/msgdispatcher/manager_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -97,7 +98,9 @@ func TestManager(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 3, c.Num()) - CheckPeriod = 10 * time.Millisecond + checkIntervalK := paramtable.Get().MQCfg.MergeCheckInterval.Key + paramtable.Get().Save(checkIntervalK, "0.01") + defer paramtable.Get().Reset(checkIntervalK) go c.Run() assert.Eventually(t, func() bool { return c.Num() == 1 // expected merged @@ -342,11 +345,17 @@ func (suite *SimulationSuite) TestSplit() { splitNum = 3 ) suite.vchannels = make(map[string]*vchannelHelper, vchannelNum) - MaxTolerantLag = 500 * time.Millisecond - DefaultTargetChanSize = 65536 + maxTolerantLagK := paramtable.Get().MQCfg.MaxTolerantLag.Key + paramtable.Get().Save(maxTolerantLagK, "0.5") + defer paramtable.Get().Reset(maxTolerantLagK) + + targetBufSizeK := paramtable.Get().MQCfg.TargetBufSize.Key + defer paramtable.Get().Reset(targetBufSizeK) + for i := 0; i < vchannelNum; i++ { + paramtable.Get().Save(targetBufSizeK, "65536") if i >= vchannelNum-splitNum { - DefaultTargetChanSize = 10 + paramtable.Get().Save(targetBufSizeK, "10") } vchannel := fmt.Sprintf("%s_vchannelv%d", suite.pchannel, i) _, err := suite.manager.Add(context.Background(), vchannel, nil, mqwrapper.SubscriptionPositionEarliest) diff --git a/pkg/mq/msgdispatcher/target.go b/pkg/mq/msgdispatcher/target.go index fd4a18b81b..8fd231e296 100644 --- a/pkg/mq/msgdispatcher/target.go +++ b/pkg/mq/msgdispatcher/target.go @@ -20,12 +20,8 @@ import ( "fmt" "sync" "time" -) -// TODO: dyh, move to config -var ( - MaxTolerantLag = 3 * time.Second - DefaultTargetChanSize = 1024 + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type target struct { @@ -41,7 +37,7 @@ type target struct { func newTarget(vchannel string, pos *Pos) *target { t := &target{ vchannel: vchannel, - ch: make(chan *MsgPack, DefaultTargetChanSize), + ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()), pos: pos, } t.closed = false @@ -63,9 +59,10 @@ func (t *target) send(pack *MsgPack) error { if t.closed { return nil } + maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second) select { - case <-time.After(MaxTolerantLag): - return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, MaxTolerantLag) + case <-time.After(maxTolerantLag): + return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, maxTolerantLag) case t.ch <- pack: return nil } diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 67c2bbc826..2a41d30b69 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -475,6 +475,11 @@ type MQConfig struct { MQBufSize ParamItem `refreshable:"false"` ReceiveBufSize ParamItem `refreshable:"false"` IgnoreBadPosition ParamItem `refreshable:"true"` + + // msgdispatcher + MergeCheckInterval ParamItem `refreshable:"false"` + TargetBufSize ParamItem `refreshable:"false"` + MaxTolerantLag ParamItem `refreshable:"true"` } // Init initializes the MQConfig object with a BaseTable. @@ -489,6 +494,33 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`, } p.Type.Init(base.mgr) + p.MaxTolerantLag = ParamItem{ + Key: "mq.dispatcher.maxTolerantLag", + Version: "2.4.4", + DefaultValue: "3", + Doc: `Default value: "3", the timeout(in seconds) that target sends msgPack`, + Export: true, + } + p.MaxTolerantLag.Init(base.mgr) + + p.TargetBufSize = ParamItem{ + Key: "mq.dispatcher.targetBufSize", + Version: "2.4.4", + DefaultValue: "16", + Doc: `the lenth of channel buffer for targe`, + Export: true, + } + p.TargetBufSize.Init(base.mgr) + + p.MergeCheckInterval = ParamItem{ + Key: "mq.dispatcher.mergeCheckInterval", + Version: "2.4.4", + DefaultValue: "1", + Doc: `the interval time(in seconds) for dispatcher to check whether to merge`, + Export: true, + } + p.MergeCheckInterval.Init(base.mgr) + p.EnablePursuitMode = ParamItem{ Key: "mq.enablePursuitMode", Version: "2.3.0",