fix: [2.4] Check partition in current target when observing partition load status (#34282) (#34305)

Cherry-pick from master
pr: #34282
See also #34234

`LoadPartitions` does not guarantee the current target has loading
partitions if there are some partitions already loaded before.

This PR check current target contains the partition to load when
advancing loading percentage to 100.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/34326/head
congqixia 2024-07-02 15:48:10 +08:00 committed by GitHub
parent bc8ca593bd
commit 4aa8a12ce8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 47 additions and 29 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -74,7 +75,7 @@ func (b *BalanceChecker) Description() string {
func (b *BalanceChecker) readyToCheck(collectionID int64) bool {
metaExist := (b.meta.GetCollection(collectionID) != nil)
targetExist := b.targetMgr.IsNextTargetExist(collectionID) || b.targetMgr.IsCurrentTargetExist(collectionID)
targetExist := b.targetMgr.IsNextTargetExist(collectionID) || b.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID)
return metaExist && targetExist
}

View File

@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -70,7 +71,7 @@ func (c *ChannelChecker) Description() string {
func (c *ChannelChecker) readyToCheck(collectionID int64) bool {
metaExist := (c.meta.GetCollection(collectionID) != nil)
targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID)
targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID)
return metaExist && targetExist
}

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -68,7 +69,7 @@ func (c *LeaderChecker) Description() string {
func (c *LeaderChecker) readyToCheck(collectionID int64) bool {
metaExist := (c.meta.GetCollection(collectionID) != nil)
targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID)
targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID, common.AllPartitionsID)
return metaExist && targetExist
}

View File

@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
)
@ -75,7 +76,7 @@ func (c *SegmentChecker) Description() string {
func (c *SegmentChecker) readyToCheck(collectionID int64) bool {
metaExist := (c.meta.GetCollection(collectionID) != nil)
targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID)
targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID)
return metaExist && targetExist
}

View File

@ -478,13 +478,13 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run
return _c
}
// IsCurrentTargetExist provides a mock function with given fields: collectionID
func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64) bool {
ret := _m.Called(collectionID)
// IsCurrentTargetExist provides a mock function with given fields: collectionID, partitionID
func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool {
ret := _m.Called(collectionID, partitionID)
var r0 bool
if rf, ok := ret.Get(0).(func(int64) bool); ok {
r0 = rf(collectionID)
if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
r0 = rf(collectionID, partitionID)
} else {
r0 = ret.Get(0).(bool)
}
@ -499,13 +499,14 @@ type MockTargetManager_IsCurrentTargetExist_Call struct {
// IsCurrentTargetExist is a helper method to define mock.On call
// - collectionID int64
func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call {
return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID)}
// - partitionID int64
func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}, partitionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call {
return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID, partitionID)}
}
func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsCurrentTargetExist_Call {
func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64, partitionID int64)) *MockTargetManager_IsCurrentTargetExist_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
run(args[0].(int64), args[1].(int64))
})
return _c
}
@ -515,7 +516,7 @@ func (_c *MockTargetManager_IsCurrentTargetExist_Call) Return(_a0 bool) *MockTar
return _c
}
func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsCurrentTargetExist_Call {
func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64, int64) bool) *MockTargetManager_IsCurrentTargetExist_Call {
_c.Call.Return(run)
return _c
}

View File

@ -23,19 +23,22 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// CollectionTarget collection target is immutable,
type CollectionTarget struct {
segments map[int64]*datapb.SegmentInfo
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64
}
func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget {
func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitionIDs...),
version: time.Now().UnixNano(),
}
}
@ -43,6 +46,7 @@ func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[
func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
segments := make(map[int64]*datapb.SegmentInfo)
dmChannels := make(map[string]*DmChannel)
var partitions []int64
for _, t := range target.GetChannelTargets() {
for _, partition := range t.GetPartitionTargets() {
@ -55,6 +59,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
InsertChannel: t.GetChannelName(),
}
}
partitions = append(partitions, partition.GetPartitionID())
}
dmChannels[t.GetChannelName()] = &DmChannel{
VchannelInfo: &datapb.VchannelInfo{
@ -68,7 +73,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
}
}
return NewCollectionTarget(segments, dmChannels)
return NewCollectionTarget(segments, dmChannels, partitions)
}
func (p *CollectionTarget) toPbMsg() *querypb.CollectionTarget {

View File

@ -67,7 +67,7 @@ type TargetManagerInterface interface {
GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel
GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo
GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64
IsCurrentTargetExist(collectionID int64) bool
IsCurrentTargetExist(collectionID int64, partitionID int64) bool
IsNextTargetExist(collectionID int64) bool
SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
Recover(catalog metastore.QueryCoordCatalog) error
@ -136,7 +136,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
return partition.PartitionID
})
allocatedTarget := NewCollectionTarget(nil, nil)
allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs)
mgr.rwMutex.Unlock()
log := log.With(zap.Int64("collectionID", collectionID),
@ -373,8 +373,11 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect
for _, channel := range oldTarget.GetAllDmChannels() {
channels[channel.GetChannelName()] = channel
}
partitions := lo.Filter(oldTarget.partitions.Collect(), func(partitionID int64, _ int) bool {
return !partitionSet.Contain(partitionID)
})
return NewCollectionTarget(segments, channels)
return NewCollectionTarget(segments, channels, partitions)
}
func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) []*CollectionTarget {
@ -604,10 +607,13 @@ func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope T
return 0
}
func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool {
newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget)
func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
return len(newChannels) > 0
targets := mgr.getCollectionTarget(CurrentTarget, collectionID)
return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0
}
func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool {

View File

@ -346,7 +346,7 @@ func (suite *TargetManagerSuite) assertSegments(expected []int64, actual map[int
func (suite *TargetManagerSuite) TestGetCollectionTargetVersion() {
t1 := time.Now().UnixNano()
target := NewCollectionTarget(nil, nil)
target := NewCollectionTarget(nil, nil, nil)
t2 := time.Now().UnixNano()
version := target.GetTargetVersion()

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -221,7 +222,7 @@ func (ob *CollectionObserver) observeTimeout() {
func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {
metaExist := (ob.meta.GetCollection(collectionID) != nil)
targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID)
targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID)
return metaExist && targetExist
}
@ -332,7 +333,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
if loadPercentage == 100 {
if !ob.targetObserver.Check(ctx, partition.GetCollectionID()) {
if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
log.Warn("failed to manual check current target, skip update load status")
return
}

View File

@ -171,9 +171,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
}
// Check whether provided collection is has current target.
// If not, submit a async task into dispatcher.
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64) bool {
result := ob.targetMgr.IsCurrentTargetExist(collectionID)
// If not, submit an async task into dispatcher.
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool {
result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID)
if !result {
ob.dispatcher.AddTask(collectionID)
}

View File

@ -32,6 +32,7 @@ import (
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -284,7 +285,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
}
func (s *TargetObserverCheckSuite) TestCheck() {
r := s.observer.Check(context.Background(), s.collectionID)
r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID)
s.False(r)
s.True(s.observer.dispatcher.tasks.Contain(s.collectionID))
}