fix: [2.5] Querycoord will trigger unexpected balance task after restart (#38725)

issue: https://github.com/milvus-io/milvus/issues/38606
pr: https://github.com/milvus-io/milvus/pull/38630

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/38678/head
wei liu 2024-12-25 16:14:49 +08:00 committed by GitHub
parent b16d04d7cc
commit cb0618b2d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 134 additions and 40 deletions

View File

@ -831,6 +831,7 @@ message CheckerInfo {
message SegmentTarget {
int64 ID = 1;
data.SegmentLevel level = 2;
int64 num_of_rows = 3;
}
message PartitionTarget {

View File

@ -22,9 +22,9 @@ func (_m *MockBalancer) EXPECT() *MockBalancer_Expecter {
return &MockBalancer_Expecter{mock: &_m.Mock}
}
// AssignChannel provides a mock function with given fields: ctx, collectionID, channels, nodes, manualBalance
func (_m *MockBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
ret := _m.Called(ctx, collectionID, channels, nodes, manualBalance)
// AssignChannel provides a mock function with given fields: ctx, collectionID, channels, nodes, forceAssign
func (_m *MockBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
ret := _m.Called(ctx, collectionID, channels, nodes, forceAssign)
if len(ret) == 0 {
panic("no return value specified for AssignChannel")
@ -32,7 +32,7 @@ func (_m *MockBalancer) AssignChannel(ctx context.Context, collectionID int64, c
var r0 []ChannelAssignPlan
if rf, ok := ret.Get(0).(func(context.Context, int64, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan); ok {
r0 = rf(ctx, collectionID, channels, nodes, manualBalance)
r0 = rf(ctx, collectionID, channels, nodes, forceAssign)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]ChannelAssignPlan)
@ -52,12 +52,12 @@ type MockBalancer_AssignChannel_Call struct {
// - collectionID int64
// - channels []*meta.DmChannel
// - nodes []int64
// - manualBalance bool
func (_e *MockBalancer_Expecter) AssignChannel(ctx interface{}, collectionID interface{}, channels interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignChannel_Call {
return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", ctx, collectionID, channels, nodes, manualBalance)}
// - forceAssign bool
func (_e *MockBalancer_Expecter) AssignChannel(ctx interface{}, collectionID interface{}, channels interface{}, nodes interface{}, forceAssign interface{}) *MockBalancer_AssignChannel_Call {
return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", ctx, collectionID, channels, nodes, forceAssign)}
}
func (_c *MockBalancer_AssignChannel_Call) Run(run func(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool)) *MockBalancer_AssignChannel_Call {
func (_c *MockBalancer_AssignChannel_Call) Run(run func(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool)) *MockBalancer_AssignChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].([]*meta.DmChannel), args[3].([]int64), args[4].(bool))
})
@ -74,9 +74,9 @@ func (_c *MockBalancer_AssignChannel_Call) RunAndReturn(run func(context.Context
return _c
}
// AssignSegment provides a mock function with given fields: ctx, collectionID, segments, nodes, manualBalance
func (_m *MockBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
ret := _m.Called(ctx, collectionID, segments, nodes, manualBalance)
// AssignSegment provides a mock function with given fields: ctx, collectionID, segments, nodes, forceAssign
func (_m *MockBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
ret := _m.Called(ctx, collectionID, segments, nodes, forceAssign)
if len(ret) == 0 {
panic("no return value specified for AssignSegment")
@ -84,7 +84,7 @@ func (_m *MockBalancer) AssignSegment(ctx context.Context, collectionID int64, s
var r0 []SegmentAssignPlan
if rf, ok := ret.Get(0).(func(context.Context, int64, []*meta.Segment, []int64, bool) []SegmentAssignPlan); ok {
r0 = rf(ctx, collectionID, segments, nodes, manualBalance)
r0 = rf(ctx, collectionID, segments, nodes, forceAssign)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]SegmentAssignPlan)
@ -104,12 +104,12 @@ type MockBalancer_AssignSegment_Call struct {
// - collectionID int64
// - segments []*meta.Segment
// - nodes []int64
// - manualBalance bool
func (_e *MockBalancer_Expecter) AssignSegment(ctx interface{}, collectionID interface{}, segments interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignSegment_Call {
return &MockBalancer_AssignSegment_Call{Call: _e.mock.On("AssignSegment", ctx, collectionID, segments, nodes, manualBalance)}
// - forceAssign bool
func (_e *MockBalancer_Expecter) AssignSegment(ctx interface{}, collectionID interface{}, segments interface{}, nodes interface{}, forceAssign interface{}) *MockBalancer_AssignSegment_Call {
return &MockBalancer_AssignSegment_Call{Call: _e.mock.On("AssignSegment", ctx, collectionID, segments, nodes, forceAssign)}
}
func (_c *MockBalancer_AssignSegment_Call) Run(run func(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool)) *MockBalancer_AssignSegment_Call {
func (_c *MockBalancer_AssignSegment_Call) Run(run func(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool)) *MockBalancer_AssignSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].([]*meta.Segment), args[3].([]int64), args[4].(bool))
})

View File

@ -119,6 +119,17 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
return nil
}
// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
// todo: should also check distribution and leader view in the future
return !b.targetMgr.IsCurrentTargetReady(ctx, cid)
})
if len(notReadyCollections) > 0 {
log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections))
return nil
}
// iterator one normal collection in one round
normalReplicasToBalance := make([]int64, 0)
hasUnbalancedCollection := false

View File

@ -324,20 +324,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
mockTarget := meta.NewMockTargetManager(suite.T())
suite.checker.targetMgr = mockTarget
// set collections meta
cid1, replicaID1, partitionID1 := 1, 1, 1
@ -347,8 +335,6 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))
cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
@ -358,6 +344,17 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2)
suite.checker.meta.ReplicaManager.Put(ctx, replica2)
// test normal balance when one collection has unready target
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false)
replicasToBalance := suite.checker.replicasToBalance(ctx)
suite.Len(replicasToBalance, 0)
// test stopping balance with target not ready
mockTarget.ExpectedCalls = nil
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(false)
mockTarget.EXPECT().IsCurrentTargetExist(mock.Anything, int64(cid1), mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetExist(mock.Anything, int64(cid2), mock.Anything).Return(false)
mr1 := replica1.CopyForWrite()
mr1.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
@ -366,9 +363,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
mr2.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
// test stopping balance
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.replicasToBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
}

View File

@ -733,6 +733,53 @@ func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(con
return _c
}
// IsCurrentTargetReady provides a mock function with given fields: ctx, collectionID
func (_m *MockTargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool {
ret := _m.Called(ctx, collectionID)
if len(ret) == 0 {
panic("no return value specified for IsCurrentTargetReady")
}
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context, int64) bool); ok {
r0 = rf(ctx, collectionID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockTargetManager_IsCurrentTargetReady_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCurrentTargetReady'
type MockTargetManager_IsCurrentTargetReady_Call struct {
*mock.Call
}
// IsCurrentTargetReady is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
func (_e *MockTargetManager_Expecter) IsCurrentTargetReady(ctx interface{}, collectionID interface{}) *MockTargetManager_IsCurrentTargetReady_Call {
return &MockTargetManager_IsCurrentTargetReady_Call{Call: _e.mock.On("IsCurrentTargetReady", ctx, collectionID)}
}
func (_c *MockTargetManager_IsCurrentTargetReady_Call) Run(run func(ctx context.Context, collectionID int64)) *MockTargetManager_IsCurrentTargetReady_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64))
})
return _c
}
func (_c *MockTargetManager_IsCurrentTargetReady_Call) Return(_a0 bool) *MockTargetManager_IsCurrentTargetReady_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_IsCurrentTargetReady_Call) RunAndReturn(run func(context.Context, int64) bool) *MockTargetManager_IsCurrentTargetReady_Call {
_c.Call.Return(run)
return _c
}
// IsNextTargetExist provides a mock function with given fields: ctx, collectionID
func (_m *MockTargetManager) IsNextTargetExist(ctx context.Context, collectionID int64) bool {
ret := _m.Called(ctx, collectionID)

View File

@ -20,10 +20,12 @@ import (
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -34,6 +36,9 @@ type CollectionTarget struct {
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64
// record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info.
lackSegmentInfo bool
}
func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
@ -50,15 +55,20 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
dmChannels := make(map[string]*DmChannel)
var partitions []int64
lackSegmentInfo := false
for _, t := range target.GetChannelTargets() {
for _, partition := range t.GetPartitionTargets() {
for _, segment := range partition.GetSegments() {
if segment.GetNumOfRows() <= 0 {
lackSegmentInfo = true
}
segments[segment.GetID()] = &datapb.SegmentInfo{
ID: segment.GetID(),
Level: segment.GetLevel(),
CollectionID: target.GetCollectionID(),
PartitionID: partition.GetPartitionID(),
InsertChannel: t.GetChannelName(),
NumOfRows: segment.GetNumOfRows(),
}
}
partitions = append(partitions, partition.GetPartitionID())
@ -75,11 +85,16 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
}
}
if lackSegmentInfo {
log.Info("target has lack of segment info", zap.Int64("collectionID", target.GetCollectionID()))
}
return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
lackSegmentInfo: lackSegmentInfo,
}
}
@ -113,8 +128,9 @@ func (p *CollectionTarget) toPbMsg() *querypb.CollectionTarget {
}
partitionTarget.Segments = append(partitionTarget.Segments, &querypb.SegmentTarget{
ID: info.GetID(),
Level: info.GetLevel(),
ID: info.GetID(),
Level: info.GetLevel(),
NumOfRows: info.GetNumOfRows(),
})
}
}
@ -159,6 +175,11 @@ func (p *CollectionTarget) IsEmpty() bool {
return len(p.dmChannels)+len(p.segments) == 0
}
// if target is ready, it should have all segment info
func (p *CollectionTarget) Ready() bool {
return !p.lackSegmentInfo
}
type target struct {
// just maintain target at collection level
collectionTargetMap map[int64]*CollectionTarget

View File

@ -72,6 +72,7 @@ type TargetManagerInterface interface {
CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool
GetTargetJSON(ctx context.Context, scope TargetScope) string
GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error)
IsCurrentTargetReady(ctx context.Context, collectionID int64) bool
}
type TargetManager struct {
@ -673,3 +674,14 @@ func (mgr *TargetManager) getTarget(scope TargetScope) *target {
return mgr.next
}
func (mgr *TargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
target, ok := mgr.current.collectionTargetMap[collectionID]
if !ok {
return false
}
return target.Ready()
}

View File

@ -584,11 +584,13 @@ func (suite *TargetManagerSuite) TestRecover() {
ID: 11,
PartitionID: 1,
InsertChannel: "channel-1",
NumOfRows: 100,
},
{
ID: 12,
PartitionID: 1,
InsertChannel: "channel-2",
NumOfRows: 100,
},
}
@ -609,6 +611,10 @@ func (suite *TargetManagerSuite) TestRecover() {
suite.Len(target.GetAllDmChannelNames(), 2)
suite.Len(target.GetAllSegmentIDs(), 2)
suite.Equal(target.GetTargetVersion(), version)
for _, segment := range target.GetAllSegments() {
suite.Equal(int64(100), segment.GetNumOfRows())
}
suite.True(target.Ready())
// after recover, target info should be cleaned up
targets, err := suite.catalog.GetCollectionTargets(ctx)