enable config policy on replica selection (#25067)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/25122/head
wei liu 2023-06-25 19:46:44 +08:00 committed by GitHub
parent 736916222b
commit 7b999b42bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -62,7 +63,18 @@ type LBPolicyImpl struct {
}
func NewLBPolicyImpl(clientMgr shardClientMgr) *LBPolicyImpl {
balancer := NewLookAsideBalancer(clientMgr)
balancePolicy := params.Params.ProxyCfg.ReplicaSelectionPolicy.GetValue()
var balancer LBBalancer
switch balancePolicy {
case "round_robin":
log.Info("use round_robin policy on replica selection")
balancer = NewRoundRobinBalancer()
default:
log.Info("use look_aside policy on replica selection")
balancer = NewLookAsideBalancer(clientMgr)
}
return &LBPolicyImpl{
balancer: balancer,
clientMgr: clientMgr,

View File

@ -17,6 +17,7 @@ package proxy
import (
"context"
"reflect"
"testing"
"github.com/cockroachdb/errors"
@ -376,6 +377,22 @@ func (s *LBPolicySuite) TestUpdateCostMetrics() {
s.lbPolicy.UpdateCostMetrics(1, &internalpb.CostAggregation{})
}
func (s *LBPolicySuite) TestNewLBPolicy() {
policy := NewLBPolicyImpl(s.mgr)
s.Equal(reflect.TypeOf(policy.balancer).String(), "*proxy.LookAsideBalancer")
policy.Close()
Params.Save(Params.ProxyCfg.ReplicaSelectionPolicy.Key, "round_robin")
policy = NewLBPolicyImpl(s.mgr)
s.Equal(reflect.TypeOf(policy.balancer).String(), "*proxy.RoundRobinBalancer")
policy.Close()
Params.Save(Params.ProxyCfg.ReplicaSelectionPolicy.Key, "look_aside")
policy = NewLBPolicyImpl(s.mgr)
s.Equal(reflect.TypeOf(policy.balancer).String(), "*proxy.LookAsideBalancer")
policy.Close()
}
func TestLBPolicySuite(t *testing.T) {
suite.Run(t, new(LBPolicySuite))
}

View File

@ -894,6 +894,7 @@ type proxyConfig struct {
MaxTaskNum ParamItem `refreshable:"false"`
AccessLog AccessLogConfig
ShardLeaderCacheInterval ParamItem `refreshable:"false"`
ReplicaSelectionPolicy ParamItem `refreshable:"false"`
}
func (p *proxyConfig) init(base *BaseTable) {
@ -1103,6 +1104,14 @@ please adjust in embedded Milvus: false`,
Doc: "time interval to update shard leader cache, in seconds",
}
p.ShardLeaderCacheInterval.Init(base.mgr)
p.ReplicaSelectionPolicy = ParamItem{
Key: "proxy.replicaSelectionPolicy",
Version: "2.3.0",
DefaultValue: "look_aside",
Doc: "replica selection policy in multiple replicas load balancing, support round_robin and look_aside",
}
p.ReplicaSelectionPolicy.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -175,6 +175,12 @@ func TestComponentParam(t *testing.T) {
t.Logf("AccessLog.MaxDays: %d", Params.AccessLog.RotatedTime.GetAsInt64())
t.Logf("ShardLeaderCacheInterval: %d", Params.ShardLeaderCacheInterval.GetAsInt64())
assert.Equal(t, Params.ReplicaSelectionPolicy.GetValue(), "look_aside")
params.Save(Params.ReplicaSelectionPolicy.Key, "round_robin")
assert.Equal(t, Params.ReplicaSelectionPolicy.GetValue(), "round_robin")
params.Save(Params.ReplicaSelectionPolicy.Key, "look_aside")
assert.Equal(t, Params.ReplicaSelectionPolicy.GetValue(), "look_aside")
})
// t.Run("test proxyConfig panic", func(t *testing.T) {