fix: refactor milvus config and change default txn timeout (#36522)

issue: #36498

Signed-off-by: chyezh <chyezh@outlook.com>
pull/36592/head
Zhen Ye 2024-09-29 11:01:15 +08:00 committed by GitHub
parent c43d94319f
commit a6545b2e29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 87 additions and 53 deletions

View File

@ -333,6 +333,11 @@ func WriteYaml(w io.Writer) {
header: `
# Any configuration related to the streaming node server.`,
},
{
name: "streaming",
header: `
# Any configuration related to the streaming service.`,
},
}
marshller := YamlMarshaller{w, groups, result}
marshller.writeYamlRecursive(lo.Filter(result, func(d DocContent, _ int) bool {

View File

@ -1036,3 +1036,16 @@ streamingNode:
serverMaxRecvSize: 268435456 # The maximum size of each RPC request that the streamingNode can receive, unit: byte
clientMaxSendSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can send, unit: byte
clientMaxRecvSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can receive, unit: byte
# Any configuration related to the streaming service.
streaming:
walBalancer:
# The interval of balance task trigger at background, 1 min by default.
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
triggerInterval: 1m
# The initial interval of balance task trigger backoff, 50 ms by default.
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
backoffInitialInterval: 50ms
backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default
txn:
defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default

View File

@ -44,7 +44,8 @@ type TxnOption struct {
// Keepalive is the time to keepalive of the transaction.
// If the txn don't append message in the keepalive time, the txn will be expired.
// Only make sense when ttl is greater than 1ms.
// Only make sense when keepalive is greater than 1ms.
// The default value is 0, which means the keepalive is setted by the wal at streaming node.
Keepalive time.Duration
}

View File

@ -3,7 +3,6 @@ package streaming
import (
"context"
"sync"
"time"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
@ -97,8 +96,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
// Otherwise, we start a transaction to append the messages.
// The transaction will be committed when all messages are appended.
txn, err := u.Txn(ctx, TxnOption{
VChannel: vchannel,
Keepalive: 5 * time.Second,
VChannel: vchannel,
})
if err != nil {
resp.fillAllError(err)

View File

@ -93,7 +93,7 @@ func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error)
w.lifetime.Done()
return nil, status.NewInvaildArgument("vchannel is required")
}
if opts.Keepalive < 1*time.Millisecond {
if opts.Keepalive != 0 && opts.Keepalive < 1*time.Millisecond {
w.lifetime.Done()
return nil, status.NewInvaildArgument("ttl must be greater than or equal to 1ms")
}

View File

@ -291,12 +291,12 @@ type backoffConfigFetcher struct{}
func (f *backoffConfigFetcher) BackoffConfig() typeutil.BackoffConfig {
return typeutil.BackoffConfig{
InitialInterval: paramtable.Get().StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse(),
Multiplier: paramtable.Get().StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat(),
MaxInterval: paramtable.Get().StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse(),
InitialInterval: paramtable.Get().StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse(),
Multiplier: paramtable.Get().StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat(),
MaxInterval: paramtable.Get().StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse(),
}
}
func (f *backoffConfigFetcher) DefaultInterval() time.Duration {
return paramtable.Get().StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse()
return paramtable.Get().StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse()
}

View File

@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// NewTxnManager creates a new transaction manager.
@ -36,6 +37,13 @@ type TxnManager struct {
// We only support a transaction work on a streaming node, once the wal is transferred to another node,
// the transaction is treated as expired (rollback), and user will got a expired error, then perform a retry.
func (m *TxnManager) BeginNewTxn(ctx context.Context, timetick uint64, keepalive time.Duration) (*TxnSession, error) {
if keepalive == 0 {
// If keepalive is 0, the txn set the keepalive with default keepalive.
keepalive = paramtable.Get().StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse()
}
if keepalive < 1*time.Millisecond {
return nil, status.NewInvaildArgument("keepalive must be greater than 1ms")
}
id, err := resource.Resource().IDAllocator().Allocate(ctx)
if err != nil {
return nil, err

View File

@ -69,18 +69,17 @@ type ComponentParam struct {
GpuConfig gpuConfig
TraceCfg traceConfig
RootCoordCfg rootCoordConfig
ProxyCfg proxyConfig
QueryCoordCfg queryCoordConfig
QueryNodeCfg queryNodeConfig
DataCoordCfg dataCoordConfig
DataNodeCfg dataNodeConfig
IndexNodeCfg indexNodeConfig
HTTPCfg httpConfig
LogCfg logConfig
RoleCfg roleConfig
StreamingCoordCfg streamingCoordConfig
StreamingNodeCfg streamingNodeConfig
RootCoordCfg rootCoordConfig
ProxyCfg proxyConfig
QueryCoordCfg queryCoordConfig
QueryNodeCfg queryNodeConfig
DataCoordCfg dataCoordConfig
DataNodeCfg dataNodeConfig
IndexNodeCfg indexNodeConfig
HTTPCfg httpConfig
LogCfg logConfig
RoleCfg roleConfig
StreamingCfg streamingConfig
RootCoordGrpcServerCfg GrpcServerConfig
ProxyGrpcServerCfg GrpcServerConfig
@ -130,14 +129,11 @@ func (p *ComponentParam) init(bt *BaseTable) {
p.DataCoordCfg.init(bt)
p.DataNodeCfg.init(bt)
p.IndexNodeCfg.init(bt)
p.StreamingCoordCfg.init(bt)
p.StreamingNodeCfg.init(bt)
p.StreamingCfg.init(bt)
p.HTTPCfg.init(bt)
p.LogCfg.init(bt)
p.RoleCfg.init(bt)
p.GpuConfig.init(bt)
p.StreamingCoordCfg.init(bt)
p.StreamingNodeCfg.init(bt)
p.RootCoordGrpcServerCfg.Init("rootCoord", bt)
p.ProxyGrpcServerCfg.Init("proxy", bt)
@ -4635,44 +4631,54 @@ func (p *indexNodeConfig) init(base *BaseTable) {
p.GracefulStopTimeout.Init(base.mgr)
}
type streamingCoordConfig struct {
AutoBalanceTriggerInterval ParamItem `refreshable:"true"`
AutoBalanceBackoffInitialInterval ParamItem `refreshable:"true"`
AutoBalanceBackoffMultiplier ParamItem `refreshable:"true"`
type streamingConfig struct {
// balancer
WALBalancerTriggerInterval ParamItem `refreshable:"true"`
WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"`
WALBalancerBackoffMultiplier ParamItem `refreshable:"true"`
// txn
TxnDefaultKeepaliveTimeout ParamItem `refreshable:"true"`
}
func (p *streamingCoordConfig) init(base *BaseTable) {
p.AutoBalanceTriggerInterval = ParamItem{
Key: "streamingCoord.autoBalanceTriggerInterval",
func (p *streamingConfig) init(base *BaseTable) {
// balancer
p.WALBalancerTriggerInterval = ParamItem{
Key: "streaming.walBalancer.triggerInterval",
Version: "2.5.0",
Doc: `The interval of balance task trigger at background, 1 min by default.
It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`,
DefaultValue: "1m",
Export: true,
}
p.AutoBalanceTriggerInterval.Init(base.mgr)
p.AutoBalanceBackoffInitialInterval = ParamItem{
Key: "streamingCoord.autoBalanceBackoffInitialInterval",
p.WALBalancerTriggerInterval.Init(base.mgr)
p.WALBalancerBackoffInitialInterval = ParamItem{
Key: "streaming.walBalancer.backoffInitialInterval",
Version: "2.5.0",
Doc: `The initial interval of balance task trigger backoff, 50 ms by default.
It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`,
DefaultValue: "50ms",
Export: true,
}
p.AutoBalanceBackoffInitialInterval.Init(base.mgr)
p.AutoBalanceBackoffMultiplier = ParamItem{
Key: "streamingCoord.autoBalanceBackoffMultiplier",
p.WALBalancerBackoffInitialInterval.Init(base.mgr)
p.WALBalancerBackoffMultiplier = ParamItem{
Key: "streaming.walBalancer.backoffMultiplier",
Version: "2.5.0",
Doc: "The multiplier of balance task trigger backoff, 2 by default",
DefaultValue: "2",
Export: true,
}
p.AutoBalanceBackoffMultiplier.Init(base.mgr)
}
p.WALBalancerBackoffMultiplier.Init(base.mgr)
type streamingNodeConfig struct{}
func (p *streamingNodeConfig) init(base *BaseTable) {
// txn
p.TxnDefaultKeepaliveTimeout = ParamItem{
Key: "streaming.txn.defaultKeepaliveTimeout",
Version: "2.5.0",
Doc: "The default keepalive timeout for wal txn, 10s by default",
DefaultValue: "10s",
Export: true,
}
p.TxnDefaultKeepaliveTimeout.Init(base.mgr)
}
type runtimeConfig struct {

View File

@ -583,16 +583,19 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
})
t.Run("test streamingCoordConfig", func(t *testing.T) {
assert.Equal(t, 1*time.Minute, params.StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Millisecond, params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 2.0, params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat())
params.Save(params.StreamingCoordCfg.AutoBalanceTriggerInterval.Key, "50s")
params.Save(params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.Key, "50s")
params.Save(params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.Key, "3.5")
assert.Equal(t, 50*time.Second, params.StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Second, params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 3.5, params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat())
t.Run("test streamingConfig", func(t *testing.T) {
assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 2.0, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffMultiplier.Key, "3.5")
params.Save(params.StreamingCfg.TxnDefaultKeepaliveTimeout.Key, "3500ms")
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 3.5, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, 3500*time.Millisecond, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
})
t.Run("channel config priority", func(t *testing.T) {