Disable auto balance when old node exists (#28191) (#28224)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/28267/head
wei liu 2023-11-08 07:10:17 +08:00 committed by GitHub
parent d76c744646
commit 918333817e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 888 additions and 48 deletions

View File

@ -425,6 +425,7 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=compactionPlanContext --dir=internal/datacoord --filename=mock_compaction_plan_context.go --output=internal/datacoord --structname=MockCompactionPlanContext --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Handler --dir=internal/datacoord --filename=mock_handler.go --output=internal/datacoord --structname=NMockHandler --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=allocator --dir=internal/datacoord --filename=mock_allocator_test.go --output=internal/datacoord --structname=NMockAllocator --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=RWChannelStore --dir=internal/datacoord --filename=mock_channel_store.go --output=internal/datacoord --structname=MockRWChannelStore --with-expecter --inpackage
generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage

View File

@ -221,7 +221,7 @@ proxy:
# Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
queryCoord:
autoHandoff: true # Enable auto handoff
autoBalance: true # Enable auto balance
autoBalance: false # Enable auto balance
balancer: ScoreBasedBalancer # Balancer to use
globalRowCountFactor: 0.1 # expert parameters, only used by scoreBasedBalancer
scoreUnbalanceTolerationFactor: 0.05 # expert parameters, only used by scoreBasedBalancer

View File

@ -276,6 +276,10 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
log.Info("background checking channels loop quit")
return
case <-ticker.C:
if !Params.DataCoordCfg.AutoBalance.GetAsBool() {
return
}
c.mu.Lock()
if !c.isSilent() {
log.Info("ChannelManager is not silent, skip channel balance this round")
@ -343,6 +347,10 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
c.store.Add(nodeID)
if !Params.DataCoordCfg.AutoBalance.GetAsBool() {
return nil
}
updates := c.registerPolicy(c.store, nodeID)
if len(updates) <= 0 {
log.Info("register node with no reassignment", zap.Int64("registered node", nodeID))

View File

@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -388,6 +389,8 @@ func TestChannelManager(t *testing.T) {
watchkv.Close()
}()
Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true")
prefix := Params.CommonCfg.DataCoordWatchSubPath.GetValue()
t.Run("test AddNode with avalible node", func(t *testing.T) {
// Note: this test is based on the default registerPolicy
@ -1063,6 +1066,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
prefix := Params.CommonCfg.DataCoordWatchSubPath.GetValue()
Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true")
t.Run("one node with three channels add a new node", func(t *testing.T) {
defer watchkv.RemoveWithPrefix("")
@ -1092,26 +1096,6 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
chManager.AddNode(2)
channelBalanced = "channel-1"
// waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) {
// for {
// key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)
// v, err := watchkv.Load(key)
// if err == nil && len(v) > 0 {
// watchInfo, err := parseWatchInfo(key, []byte(v))
// require.NoError(t, err)
// require.Equal(t, waitState, watchInfo.GetState())
//
// watchInfo.State = storeState
// data, err := proto.Marshal(watchInfo)
// require.NoError(t, err)
//
// watchkv.Save(key, string(data))
// break
// }
// time.Sleep(100 * time.Millisecond)
// }
// }
key := path.Join(prefix, "1", channelBalanced)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
@ -1120,7 +1104,6 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
assert.True(t, chManager.Match(2, "channel-1"))
chManager.AddNode(3)
@ -1260,3 +1243,63 @@ func TestChannelManager_HelperFunc(t *testing.T) {
}
})
}
func TestChannelManager_BackgroundChannelChecker(t *testing.T) {
Params.Save(Params.DataCoordCfg.ChannelBalanceInterval.Key, "1")
Params.Save(Params.DataCoordCfg.ChannelBalanceSilentDuration.Key, "1")
watchkv := getWatchKV(t)
defer func() {
watchkv.RemoveWithPrefix("")
watchkv.Close()
}()
defer watchkv.RemoveWithPrefix("")
c, err := NewChannelManager(watchkv, newMockHandler(), withStateChecker())
require.NoError(t, err)
mockStore := NewMockRWChannelStore(t)
mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{
NodeID: 1,
Channels: []*channel{
{
Name: "channel-1",
},
{
Name: "channel-2",
},
{
Name: "channel-3",
},
},
},
{
NodeID: 2,
},
}).Maybe()
c.store = mockStore
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.bgCheckChannelsWork(ctx)
updateCounter := atomic.NewInt64(0)
mockStore.EXPECT().Update(mock.Anything).Run(func(op ChannelOpSet) {
updateCounter.Inc()
}).Return(nil).Maybe()
t.Run("test disable auto balance", func(t *testing.T) {
assert.Eventually(t, func() bool {
return updateCounter.Load() == 0
}, 5*time.Second, 1*time.Second)
})
t.Run("test enable auto balance", func(t *testing.T) {
Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true")
assert.Eventually(t, func() bool {
return updateCounter.Load() > 0
}, 5*time.Second, 1*time.Second)
})
}

View File

@ -37,7 +37,7 @@ import (
// serverID return the session serverID
func (s *Server) serverID() int64 {
if s.session != nil {
return s.session.ServerID
return s.session.GetServerID()
}
// return 0 if no session exist, only for UT
return 0

View File

@ -108,7 +108,7 @@ func (s *Server) getDataCoordMetrics() metricsinfo.DataCoordInfos {
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
HardwareInfos: metricsinfo.HardwareMetrics{
IP: s.session.Address,
IP: s.session.GetAddress(),
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),

View File

@ -0,0 +1,460 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datacoord
import mock "github.com/stretchr/testify/mock"
// MockRWChannelStore is an autogenerated mock type for the RWChannelStore type
type MockRWChannelStore struct {
mock.Mock
}
type MockRWChannelStore_Expecter struct {
mock *mock.Mock
}
func (_m *MockRWChannelStore) EXPECT() *MockRWChannelStore_Expecter {
return &MockRWChannelStore_Expecter{mock: &_m.Mock}
}
// Add provides a mock function with given fields: nodeID
func (_m *MockRWChannelStore) Add(nodeID int64) {
_m.Called(nodeID)
}
// MockRWChannelStore_Add_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Add'
type MockRWChannelStore_Add_Call struct {
*mock.Call
}
// Add is a helper method to define mock.On call
// - nodeID int64
func (_e *MockRWChannelStore_Expecter) Add(nodeID interface{}) *MockRWChannelStore_Add_Call {
return &MockRWChannelStore_Add_Call{Call: _e.mock.On("Add", nodeID)}
}
func (_c *MockRWChannelStore_Add_Call) Run(run func(nodeID int64)) *MockRWChannelStore_Add_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockRWChannelStore_Add_Call) Return() *MockRWChannelStore_Add_Call {
_c.Call.Return()
return _c
}
func (_c *MockRWChannelStore_Add_Call) RunAndReturn(run func(int64)) *MockRWChannelStore_Add_Call {
_c.Call.Return(run)
return _c
}
// Delete provides a mock function with given fields: nodeID
func (_m *MockRWChannelStore) Delete(nodeID int64) ([]*channel, error) {
ret := _m.Called(nodeID)
var r0 []*channel
var r1 error
if rf, ok := ret.Get(0).(func(int64) ([]*channel, error)); ok {
return rf(nodeID)
}
if rf, ok := ret.Get(0).(func(int64) []*channel); ok {
r0 = rf(nodeID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*channel)
}
}
if rf, ok := ret.Get(1).(func(int64) error); ok {
r1 = rf(nodeID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockRWChannelStore_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete'
type MockRWChannelStore_Delete_Call struct {
*mock.Call
}
// Delete is a helper method to define mock.On call
// - nodeID int64
func (_e *MockRWChannelStore_Expecter) Delete(nodeID interface{}) *MockRWChannelStore_Delete_Call {
return &MockRWChannelStore_Delete_Call{Call: _e.mock.On("Delete", nodeID)}
}
func (_c *MockRWChannelStore_Delete_Call) Run(run func(nodeID int64)) *MockRWChannelStore_Delete_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockRWChannelStore_Delete_Call) Return(_a0 []*channel, _a1 error) *MockRWChannelStore_Delete_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockRWChannelStore_Delete_Call) RunAndReturn(run func(int64) ([]*channel, error)) *MockRWChannelStore_Delete_Call {
_c.Call.Return(run)
return _c
}
// GetBufferChannelInfo provides a mock function with given fields:
func (_m *MockRWChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
ret := _m.Called()
var r0 *NodeChannelInfo
if rf, ok := ret.Get(0).(func() *NodeChannelInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*NodeChannelInfo)
}
}
return r0
}
// MockRWChannelStore_GetBufferChannelInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBufferChannelInfo'
type MockRWChannelStore_GetBufferChannelInfo_Call struct {
*mock.Call
}
// GetBufferChannelInfo is a helper method to define mock.On call
func (_e *MockRWChannelStore_Expecter) GetBufferChannelInfo() *MockRWChannelStore_GetBufferChannelInfo_Call {
return &MockRWChannelStore_GetBufferChannelInfo_Call{Call: _e.mock.On("GetBufferChannelInfo")}
}
func (_c *MockRWChannelStore_GetBufferChannelInfo_Call) Run(run func()) *MockRWChannelStore_GetBufferChannelInfo_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockRWChannelStore_GetBufferChannelInfo_Call) Return(_a0 *NodeChannelInfo) *MockRWChannelStore_GetBufferChannelInfo_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetBufferChannelInfo_Call) RunAndReturn(run func() *NodeChannelInfo) *MockRWChannelStore_GetBufferChannelInfo_Call {
_c.Call.Return(run)
return _c
}
// GetChannels provides a mock function with given fields:
func (_m *MockRWChannelStore) GetChannels() []*NodeChannelInfo {
ret := _m.Called()
var r0 []*NodeChannelInfo
if rf, ok := ret.Get(0).(func() []*NodeChannelInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*NodeChannelInfo)
}
}
return r0
}
// MockRWChannelStore_GetChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannels'
type MockRWChannelStore_GetChannels_Call struct {
*mock.Call
}
// GetChannels is a helper method to define mock.On call
func (_e *MockRWChannelStore_Expecter) GetChannels() *MockRWChannelStore_GetChannels_Call {
return &MockRWChannelStore_GetChannels_Call{Call: _e.mock.On("GetChannels")}
}
func (_c *MockRWChannelStore_GetChannels_Call) Run(run func()) *MockRWChannelStore_GetChannels_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockRWChannelStore_GetChannels_Call) Return(_a0 []*NodeChannelInfo) *MockRWChannelStore_GetChannels_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetChannels_Call) RunAndReturn(run func() []*NodeChannelInfo) *MockRWChannelStore_GetChannels_Call {
_c.Call.Return(run)
return _c
}
// GetNode provides a mock function with given fields: nodeID
func (_m *MockRWChannelStore) GetNode(nodeID int64) *NodeChannelInfo {
ret := _m.Called(nodeID)
var r0 *NodeChannelInfo
if rf, ok := ret.Get(0).(func(int64) *NodeChannelInfo); ok {
r0 = rf(nodeID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*NodeChannelInfo)
}
}
return r0
}
// MockRWChannelStore_GetNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNode'
type MockRWChannelStore_GetNode_Call struct {
*mock.Call
}
// GetNode is a helper method to define mock.On call
// - nodeID int64
func (_e *MockRWChannelStore_Expecter) GetNode(nodeID interface{}) *MockRWChannelStore_GetNode_Call {
return &MockRWChannelStore_GetNode_Call{Call: _e.mock.On("GetNode", nodeID)}
}
func (_c *MockRWChannelStore_GetNode_Call) Run(run func(nodeID int64)) *MockRWChannelStore_GetNode_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockRWChannelStore_GetNode_Call) Return(_a0 *NodeChannelInfo) *MockRWChannelStore_GetNode_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetNode_Call) RunAndReturn(run func(int64) *NodeChannelInfo) *MockRWChannelStore_GetNode_Call {
_c.Call.Return(run)
return _c
}
// GetNodeChannelCount provides a mock function with given fields: nodeID
func (_m *MockRWChannelStore) GetNodeChannelCount(nodeID int64) int {
ret := _m.Called(nodeID)
var r0 int
if rf, ok := ret.Get(0).(func(int64) int); ok {
r0 = rf(nodeID)
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// MockRWChannelStore_GetNodeChannelCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelCount'
type MockRWChannelStore_GetNodeChannelCount_Call struct {
*mock.Call
}
// GetNodeChannelCount is a helper method to define mock.On call
// - nodeID int64
func (_e *MockRWChannelStore_Expecter) GetNodeChannelCount(nodeID interface{}) *MockRWChannelStore_GetNodeChannelCount_Call {
return &MockRWChannelStore_GetNodeChannelCount_Call{Call: _e.mock.On("GetNodeChannelCount", nodeID)}
}
func (_c *MockRWChannelStore_GetNodeChannelCount_Call) Run(run func(nodeID int64)) *MockRWChannelStore_GetNodeChannelCount_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelCount_Call) Return(_a0 int) *MockRWChannelStore_GetNodeChannelCount_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int64) int) *MockRWChannelStore_GetNodeChannelCount_Call {
_c.Call.Return(run)
return _c
}
// GetNodes provides a mock function with given fields:
func (_m *MockRWChannelStore) GetNodes() []int64 {
ret := _m.Called()
var r0 []int64
if rf, ok := ret.Get(0).(func() []int64); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
return r0
}
// MockRWChannelStore_GetNodes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodes'
type MockRWChannelStore_GetNodes_Call struct {
*mock.Call
}
// GetNodes is a helper method to define mock.On call
func (_e *MockRWChannelStore_Expecter) GetNodes() *MockRWChannelStore_GetNodes_Call {
return &MockRWChannelStore_GetNodes_Call{Call: _e.mock.On("GetNodes")}
}
func (_c *MockRWChannelStore_GetNodes_Call) Run(run func()) *MockRWChannelStore_GetNodes_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockRWChannelStore_GetNodes_Call) Return(_a0 []int64) *MockRWChannelStore_GetNodes_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetNodes_Call) RunAndReturn(run func() []int64) *MockRWChannelStore_GetNodes_Call {
_c.Call.Return(run)
return _c
}
// GetNodesChannels provides a mock function with given fields:
func (_m *MockRWChannelStore) GetNodesChannels() []*NodeChannelInfo {
ret := _m.Called()
var r0 []*NodeChannelInfo
if rf, ok := ret.Get(0).(func() []*NodeChannelInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*NodeChannelInfo)
}
}
return r0
}
// MockRWChannelStore_GetNodesChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodesChannels'
type MockRWChannelStore_GetNodesChannels_Call struct {
*mock.Call
}
// GetNodesChannels is a helper method to define mock.On call
func (_e *MockRWChannelStore_Expecter) GetNodesChannels() *MockRWChannelStore_GetNodesChannels_Call {
return &MockRWChannelStore_GetNodesChannels_Call{Call: _e.mock.On("GetNodesChannels")}
}
func (_c *MockRWChannelStore_GetNodesChannels_Call) Run(run func()) *MockRWChannelStore_GetNodesChannels_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockRWChannelStore_GetNodesChannels_Call) Return(_a0 []*NodeChannelInfo) *MockRWChannelStore_GetNodesChannels_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetNodesChannels_Call) RunAndReturn(run func() []*NodeChannelInfo) *MockRWChannelStore_GetNodesChannels_Call {
_c.Call.Return(run)
return _c
}
// Reload provides a mock function with given fields:
func (_m *MockRWChannelStore) Reload() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRWChannelStore_Reload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reload'
type MockRWChannelStore_Reload_Call struct {
*mock.Call
}
// Reload is a helper method to define mock.On call
func (_e *MockRWChannelStore_Expecter) Reload() *MockRWChannelStore_Reload_Call {
return &MockRWChannelStore_Reload_Call{Call: _e.mock.On("Reload")}
}
func (_c *MockRWChannelStore_Reload_Call) Run(run func()) *MockRWChannelStore_Reload_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockRWChannelStore_Reload_Call) Return(_a0 error) *MockRWChannelStore_Reload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_Reload_Call) RunAndReturn(run func() error) *MockRWChannelStore_Reload_Call {
_c.Call.Return(run)
return _c
}
// Update provides a mock function with given fields: op
func (_m *MockRWChannelStore) Update(op ChannelOpSet) error {
ret := _m.Called(op)
var r0 error
if rf, ok := ret.Get(0).(func(ChannelOpSet) error); ok {
r0 = rf(op)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockRWChannelStore_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update'
type MockRWChannelStore_Update_Call struct {
*mock.Call
}
// Update is a helper method to define mock.On call
// - op ChannelOpSet
func (_e *MockRWChannelStore_Expecter) Update(op interface{}) *MockRWChannelStore_Update_Call {
return &MockRWChannelStore_Update_Call{Call: _e.mock.On("Update", op)}
}
func (_c *MockRWChannelStore_Update_Call) Run(run func(op ChannelOpSet)) *MockRWChannelStore_Update_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(ChannelOpSet))
})
return _c
}
func (_c *MockRWChannelStore_Update_Call) Return(_a0 error) *MockRWChannelStore_Update_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_Update_Call) RunAndReturn(run func(ChannelOpSet) error) *MockRWChannelStore_Update_Call {
_c.Call.Return(run)
return _c
}
// NewMockRWChannelStore creates a new instance of MockRWChannelStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockRWChannelStore(t interface {
mock.TestingT
Cleanup(func())
}) *MockRWChannelStore {
mock := &MockRWChannelStore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -28,6 +28,7 @@ import (
semver "github.com/blang/semver/v4"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
@ -130,7 +131,7 @@ type Server struct {
notifyIndexChan chan UniqueID
factory dependency.Factory
session *sessionutil.Session
session sessionutil.SessionInterface
icSession *sessionutil.Session
dnEventCh <-chan *sessionutil.SessionEvent
inEventCh <-chan *sessionutil.SessionEvent
@ -265,13 +266,13 @@ func (s *Server) Register() error {
log.Info("DataCoord Register Finished")
s.session.LivenessCheck(s.serverLoopCtx, func() {
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID()))
if err := s.Stop(); err != nil {
logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
}
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Dec()
// manually send signal to starter goroutine
if s.session.TriggerKill {
if s.session.IsTriggerKill() {
if p, err := os.FindProcess(os.Getpid()); err == nil {
p.Signal(syscall.SIGINT)
}
@ -394,8 +395,13 @@ func (s *Server) startDataCoord() {
s.compactionTrigger.start()
}
s.startServerLoop()
s.afterStart()
s.stateCode.Store(commonpb.StateCode_Healthy)
sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.ServerID)
sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.GetServerID())
}
func (s *Server) afterStart() {
go s.updateBalanceConfigLoop(s.ctx)
}
func (s *Server) initCluster() error {
@ -782,7 +788,7 @@ func (s *Server) stopServiceWatch() {
// ErrCompacted is handled inside SessionWatcher, which means there is some other error occurred, closing server.
logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", paramtable.GetNodeID()))
go s.Stop()
if s.session.TriggerKill {
if s.session.IsTriggerKill() {
if p, err := os.FindProcess(os.Getpid()); err == nil {
p.Signal(syscall.SIGINT)
}
@ -1106,3 +1112,32 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
s.meta.AddCollection(collInfo)
return nil
}
func (s *Server) updateBalanceConfigLoop(ctx context.Context) {
log := log.Ctx(s.ctx).WithRateGroup("dc.updateBalanceConfigLoop", 1, 60)
ticker := time.NewTicker(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second))
for {
select {
case <-ctx.Done():
log.Info("update balance config loop exit!")
return
case <-ticker.C:
r := semver.MustParseRange("<2.3.0")
sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r)
if err != nil {
log.Warn("check data node version occur error on etcd", zap.Error(err))
continue
}
if len(sessions) == 0 {
// only balance channel when all data node's version > 2.3.0
Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true")
log.Info("all old data node down, enable auto balance!")
return
}
log.RatedDebug(10, "old data node exist", zap.Strings("sessions", lo.Keys(sessions)))
}
}
}

View File

@ -4737,3 +4737,43 @@ func TestDataNodeTtChannel(t *testing.T) {
assert.EqualValues(t, 0, len(segment.allocations))
})
}
func TestUpdateAutoBalanceConfigLoop(t *testing.T) {
Params.Save(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.Key, "1")
defer Params.Reset(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.Key)
Params.Save(Params.DataCoordCfg.AutoBalance.Key, "false")
defer Params.Reset(Params.DataCoordCfg.AutoBalance.Key)
t.Run("test old node exist", func(t *testing.T) {
oldSessions := make(map[string]*sessionutil.Session)
oldSessions["s1"] = &sessionutil.Session{}
server := &Server{}
mockSession := sessionutil.NewMockSession(t)
mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(oldSessions, 0, nil).Maybe()
server.session = mockSession
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go server.updateBalanceConfigLoop(ctx)
// old data node exist, disable auto balance
assert.Eventually(t, func() bool {
return !Params.DataCoordCfg.AutoBalance.GetAsBool()
}, 3*time.Second, 1*time.Second)
})
t.Run("test all old node down", func(t *testing.T) {
server := &Server{}
mockSession := sessionutil.NewMockSession(t)
mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(nil, 0, nil).Maybe()
server.session = mockSession
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go server.updateBalanceConfigLoop(ctx)
// all old data node down, enable auto balance
assert.Eventually(t, func() bool {
return Params.DataCoordCfg.AutoBalance.GetAsBool()
}, 3*time.Second, 1*time.Second)
})
}

View File

@ -604,7 +604,7 @@ func (s *Server) GetComponentStates(ctx context.Context, req *milvuspb.GetCompon
code := s.GetStateCode()
nodeID := common.NotRegisteredID
if s.session != nil && s.session.Registered() {
nodeID = s.session.ServerID // or Params.NodeID
nodeID = s.session.GetServerID() // or Params.NodeID
}
resp := &milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{

View File

@ -179,7 +179,7 @@ func (s *Server) getSystemInfoMetrics(
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, paramtable.GetNodeID()),
HardwareInfos: metricsinfo.HardwareMetrics{
IP: s.session.Address,
IP: s.session.GetAddress(),
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),

View File

@ -24,7 +24,9 @@ import (
"syscall"
"time"
"github.com/blang/semver/v4"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
@ -74,7 +76,7 @@ type Server struct {
etcdCli *clientv3.Client
tikvCli *txnkv.Client
address string
session *sessionutil.Session
session sessionutil.SessionInterface
kv kv.MetaKv
idAllocator func() (int64, error)
metricsCacheManager *metricsinfo.MetricsCacheManager
@ -146,13 +148,13 @@ func (s *Server) Register() error {
}
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc()
s.session.LivenessCheck(s.ctx, func() {
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID))
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID()))
if err := s.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Dec()
// manually send signal to starter goroutine
if s.session.TriggerKill {
if s.session.IsTriggerKill() {
if p, err := os.FindProcess(os.Getpid()); err == nil {
p.Signal(syscall.SIGINT)
}
@ -388,6 +390,7 @@ func (s *Server) initObserver() {
}
func (s *Server) afterStart() {
go s.updateBalanceConfigLoop(s.ctx)
}
func (s *Server) Start() error {
@ -429,7 +432,7 @@ func (s *Server) startQueryCoord() error {
s.startServerLoop()
s.afterStart()
s.UpdateStateCode(commonpb.StateCode_Healthy)
sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.ServerID)
sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.GetServerID())
return nil
}
@ -529,7 +532,7 @@ func (s *Server) State() commonpb.StateCode {
func (s *Server) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
nodeID := common.NotRegisteredID
if s.session != nil && s.session.Registered() {
nodeID = s.session.ServerID
nodeID = s.session.GetServerID()
}
serviceComponentInfo := &milvuspb.ComponentInfo{
// NodeID: Params.QueryCoordID, // will race with QueryCoord.Register()
@ -609,7 +612,7 @@ func (s *Server) watchNodes(revision int64) {
// ErrCompacted is handled inside SessionWatcher
log.Warn("Session Watcher channel closed", zap.Int64("serverID", paramtable.GetNodeID()))
go s.Stop()
if s.session.TriggerKill {
if s.session.IsTriggerKill() {
if p, err := os.FindProcess(os.Getpid()); err == nil {
p.Signal(syscall.SIGINT)
}
@ -791,3 +794,32 @@ func (s *Server) checkReplicas() {
}
}
}
func (s *Server) updateBalanceConfigLoop(ctx context.Context) {
log := log.Ctx(s.ctx).WithRateGroup("qcv2.updateBalanceConfigLoop", 1, 60)
ticker := time.NewTicker(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second))
for {
select {
case <-ctx.Done():
log.Info("update balance config loop exit!")
return
case <-ticker.C:
r := semver.MustParseRange("<2.3.0")
sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.QueryNodeRole, r)
if err != nil {
log.Warn("check query node version occur error on etcd", zap.Error(err))
continue
}
if len(sessions) == 0 {
// only balance channel when all query node's version >= 2.3.0
Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
log.Info("all old query node down, enable auto balance!")
return
}
log.RatedDebug(10, "old query node exist", zap.Strings("sessions", lo.Keys(sessions)))
}
}
}

View File

@ -43,6 +43,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/etcd"
@ -356,6 +357,48 @@ func (suite *ServerSuite) TestStop() {
suite.server.Stop()
}
func (suite *ServerSuite) TestUpdateAutoBalanceConfigLoop() {
suite.server.Stop()
Params.Save(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.Key, "1")
defer Params.Reset(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.Key)
Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "false")
defer Params.Reset(Params.QueryCoordCfg.AutoBalance.Key)
suite.Run("test old node exist", func() {
server := &Server{}
mockSession := sessionutil.NewMockSession(suite.T())
server.session = mockSession
oldSessions := make(map[string]*sessionutil.Session)
oldSessions["s1"] = &sessionutil.Session{}
mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(oldSessions, 0, nil).Maybe()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go server.updateBalanceConfigLoop(ctx)
// old query node exist, disable auto balance
suite.Eventually(func() bool {
return !Params.QueryCoordCfg.AutoBalance.GetAsBool()
}, 5*time.Second, 1*time.Second)
})
suite.Run("all old node down", func() {
server := &Server{}
mockSession := sessionutil.NewMockSession(suite.T())
server.session = mockSession
mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(nil, 0, nil).Maybe()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go server.updateBalanceConfigLoop(ctx)
// all old query node down, enable auto balance
suite.Eventually(func() bool {
return Params.QueryCoordCfg.AutoBalance.GetAsBool()
}, 5*time.Second, 1*time.Second)
})
}
func (suite *ServerSuite) waitNodeUp(node *mocks.MockQueryNode, timeout time.Duration) bool {
start := time.Now()
for time.Since(start) < timeout {

View File

@ -107,6 +107,88 @@ func (_c *MockSession_ForceActiveStandby_Call) RunAndReturn(run func(func() erro
return _c
}
// GetAddress provides a mock function with given fields:
func (_m *MockSession) GetAddress() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockSession_GetAddress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAddress'
type MockSession_GetAddress_Call struct {
*mock.Call
}
// GetAddress is a helper method to define mock.On call
func (_e *MockSession_Expecter) GetAddress() *MockSession_GetAddress_Call {
return &MockSession_GetAddress_Call{Call: _e.mock.On("GetAddress")}
}
func (_c *MockSession_GetAddress_Call) Run(run func()) *MockSession_GetAddress_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_GetAddress_Call) Return(_a0 string) *MockSession_GetAddress_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_GetAddress_Call) RunAndReturn(run func() string) *MockSession_GetAddress_Call {
_c.Call.Return(run)
return _c
}
// GetServerID provides a mock function with given fields:
func (_m *MockSession) GetServerID() int64 {
ret := _m.Called()
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// MockSession_GetServerID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetServerID'
type MockSession_GetServerID_Call struct {
*mock.Call
}
// GetServerID is a helper method to define mock.On call
func (_e *MockSession_Expecter) GetServerID() *MockSession_GetServerID_Call {
return &MockSession_GetServerID_Call{Call: _e.mock.On("GetServerID")}
}
func (_c *MockSession_GetServerID_Call) Run(run func()) *MockSession_GetServerID_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_GetServerID_Call) Return(_a0 int64) *MockSession_GetServerID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_GetServerID_Call) RunAndReturn(run func() int64) *MockSession_GetServerID_Call {
_c.Call.Return(run)
return _c
}
// GetSessions provides a mock function with given fields: prefix
func (_m *MockSession) GetSessions(prefix string) (map[string]*Session, int64, error) {
ret := _m.Called(prefix)
@ -307,6 +389,47 @@ func (_c *MockSession_Init_Call) RunAndReturn(run func(string, string, bool, boo
return _c
}
// IsTriggerKill provides a mock function with given fields:
func (_m *MockSession) IsTriggerKill() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockSession_IsTriggerKill_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsTriggerKill'
type MockSession_IsTriggerKill_Call struct {
*mock.Call
}
// IsTriggerKill is a helper method to define mock.On call
func (_e *MockSession_Expecter) IsTriggerKill() *MockSession_IsTriggerKill_Call {
return &MockSession_IsTriggerKill_Call{Call: _e.mock.On("IsTriggerKill")}
}
func (_c *MockSession_IsTriggerKill_Call) Run(run func()) *MockSession_IsTriggerKill_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_IsTriggerKill_Call) Return(_a0 bool) *MockSession_IsTriggerKill_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_IsTriggerKill_Call) RunAndReturn(run func() bool) *MockSession_IsTriggerKill_Call {
_c.Call.Return(run)
return _c
}
// LivenessCheck provides a mock function with given fields: ctx, callback
func (_m *MockSession) LivenessCheck(ctx context.Context, callback func()) {
_m.Called(ctx, callback)

View File

@ -46,4 +46,8 @@ type SessionInterface interface {
SetEnableActiveStandBy(enable bool)
ProcessActiveStandBy(activateFunc func() error) error
ForceActiveStandby(activateFunc func() error) error
GetAddress() string
GetServerID() int64
IsTriggerKill() bool
}

View File

@ -102,6 +102,18 @@ type SessionRaw struct {
EnableDisk bool `json:"EnableDisk,omitempty"`
}
func (s *SessionRaw) GetAddress() string {
return s.Address
}
func (s *SessionRaw) GetServerID() int64 {
return s.ServerID
}
func (s *SessionRaw) IsTriggerKill() bool {
return s.TriggerKill
}
// Session is a struct to store service's session, including ServerID, ServerName,
// Address.
// Exclusive indicates that this server can only start one.

View File

@ -1195,16 +1195,17 @@ type queryCoordConfig struct {
// Deprecated: Since 2.2.2, use different interval for different checker
CheckInterval ParamItem `refreshable:"true"`
NextTargetSurviveTime ParamItem `refreshable:"true"`
UpdateNextTargetInterval ParamItem `refreshable:"false"`
CheckNodeInReplicaInterval ParamItem `refreshable:"false"`
CheckResourceGroupInterval ParamItem `refreshable:"false"`
EnableRGAutoRecover ParamItem `refreshable:"true"`
CheckHealthInterval ParamItem `refreshable:"false"`
CheckHealthRPCTimeout ParamItem `refreshable:"true"`
BrokerTimeout ParamItem `refreshable:"false"`
CollectionRecoverTimesLimit ParamItem `refreshable:"true"`
ObserverTaskParallel ParamItem `refreshable:"false"`
NextTargetSurviveTime ParamItem `refreshable:"true"`
UpdateNextTargetInterval ParamItem `refreshable:"false"`
CheckNodeInReplicaInterval ParamItem `refreshable:"false"`
CheckResourceGroupInterval ParamItem `refreshable:"false"`
EnableRGAutoRecover ParamItem `refreshable:"true"`
CheckHealthInterval ParamItem `refreshable:"false"`
CheckHealthRPCTimeout ParamItem `refreshable:"true"`
BrokerTimeout ParamItem `refreshable:"false"`
CollectionRecoverTimesLimit ParamItem `refreshable:"true"`
ObserverTaskParallel ParamItem `refreshable:"false"`
CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"`
}
func (p *queryCoordConfig) init(base *BaseTable) {
@ -1252,7 +1253,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
p.AutoBalance = ParamItem{
Key: "queryCoord.autoBalance",
Version: "2.0.0",
DefaultValue: "true",
DefaultValue: "false",
PanicIfEmpty: true,
Doc: "Enable auto balance",
Export: true,
@ -1526,6 +1527,16 @@ func (p *queryCoordConfig) init(base *BaseTable) {
Export: true,
}
p.ObserverTaskParallel.Init(base.mgr)
p.CheckAutoBalanceConfigInterval = ParamItem{
Key: "queryCoord.checkAutoBalanceConfigInterval",
Version: "2.3.3",
DefaultValue: "10",
PanicIfEmpty: true,
Doc: "the interval of check auto balance config",
Export: true,
}
p.CheckAutoBalanceConfigInterval.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
@ -2030,6 +2041,9 @@ type dataCoordConfig struct {
IndexTaskSchedulerInterval ParamItem `refreshable:"false"`
MinSegmentNumRowsToEnableIndex ParamItem `refreshable:"true"`
// auto balance channel on datanode
AutoBalance ParamItem `refreshable:"true"`
CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"`
}
func (p *dataCoordConfig) init(base *BaseTable) {
@ -2385,6 +2399,26 @@ During compaction, the size of segment # of rows is able to exceed segment max #
DefaultValue: "1000",
}
p.IndexTaskSchedulerInterval.Init(base.mgr)
p.AutoBalance = ParamItem{
Key: "dataCoord.autoBalance",
Version: "2.3.3",
DefaultValue: "false",
PanicIfEmpty: true,
Doc: "Enable auto balance",
Export: true,
}
p.AutoBalance.Init(base.mgr)
p.CheckAutoBalanceConfigInterval = ParamItem{
Key: "dataCoord.checkAutoBalanceConfigInterval",
Version: "2.3.3",
DefaultValue: "10",
PanicIfEmpty: true,
Doc: "the interval of check auto balance config",
Export: true,
}
p.CheckAutoBalanceConfigInterval.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -279,6 +279,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt())
assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt())
assert.Equal(t, 3, Params.CollectionRecoverTimesLimit.GetAsInt())
assert.Equal(t, false, Params.AutoBalance.GetAsBool())
assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt())
})
t.Run("test queryNodeConfig", func(t *testing.T) {
@ -351,6 +353,9 @@ func TestComponentParam(t *testing.T) {
assert.True(t, Params.EnableGarbageCollection.GetAsBool())
assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false)
t.Logf("dataCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool())
assert.Equal(t, false, Params.AutoBalance.GetAsBool())
assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt())
})
t.Run("test dataNodeConfig", func(t *testing.T) {