enhance: Change update channel cp magic number to param item (#30555)

See also #28817

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/30532/head
congqixia 2024-02-06 16:02:00 +08:00 committed by GitHub
parent 98adbb3b6d
commit d4100d5442
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 21 additions and 4 deletions

View File

@ -26,8 +26,6 @@ import (
)
const (
updateChanCPInterval = 1 * time.Minute
updateChanCPTimeout = 10 * time.Second
defaultUpdateChanCPMaxParallel = 1000
)
@ -49,7 +47,7 @@ func newChannelCheckpointUpdater(dn *DataNode) *channelCheckpointUpdater {
func (ccu *channelCheckpointUpdater) updateChannelCP(channelPos *msgpb.MsgPosition, callback func() error) error {
ccu.workerPool.Submit(func() (any, error) {
ctx, cancel := context.WithTimeout(context.Background(), updateChanCPTimeout)
ctx, cancel := context.WithTimeout(context.Background(), paramtable.Get().DataNodeCfg.UpdateChannelCheckpointRPCTimeout.GetAsDuration(time.Second))
defer cancel()
err := ccu.dn.broker.UpdateChannelCheckpoint(ctx, channelPos.GetChannelName(), channelPos)
if err != nil {

View File

@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/writebuffer"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
@ -105,7 +106,7 @@ func (ttn *ttNode) Operate(in []Msg) []Msg {
ttn.updateChannelCP(channelPos, curTs)
}
if needUpdate || curTs.Sub(ttn.lastUpdateTime.Load()) >= updateChanCPInterval {
if needUpdate || curTs.Sub(ttn.lastUpdateTime.Load()) >= paramtable.Get().DataNodeCfg.UpdateChannelCheckpointInterval.GetAsDuration(time.Second) {
nonBlockingNotify()
return []Msg{}
}

View File

@ -2862,6 +2862,8 @@ type dataNodeConfig struct {
ChannelWorkPoolSize ParamItem `refreshable:"true"`
UpdateChannelCheckpointMaxParallel ParamItem `refreshable:"true"`
UpdateChannelCheckpointInterval ParamItem `refreshable:"true"`
UpdateChannelCheckpointRPCTimeout ParamItem `refreshable:"true"`
MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
}
@ -3099,6 +3101,22 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.UpdateChannelCheckpointMaxParallel.Init(base.mgr)
p.UpdateChannelCheckpointInterval = ParamItem{
Key: "datanode.channel.updateChannelCheckpointInterval",
Version: "2.4.0",
Doc: "the interval duration(in seconds) for datanode to update channel checkpoint of each channel",
DefaultValue: "60",
}
p.UpdateChannelCheckpointInterval.Init(base.mgr)
p.UpdateChannelCheckpointRPCTimeout = ParamItem{
Key: "datanode.channel.updateChannelCheckpointInterval",
Version: "2.4.0",
Doc: "timeout in seconds for UpdateChannelCheckpoint RPC call",
DefaultValue: "10",
}
p.UpdateChannelCheckpointRPCTimeout.Init(base.mgr)
p.MaxConcurrentImportTaskNum = ParamItem{
Key: "datanode.import.maxConcurrentTaskNum",
Version: "2.4.0",