mirror of https://github.com/milvus-io/milvus.git
fix: Wrong behavior of CurrentTargetFirst/NextTargetFirst in target manager (#31378)
issue: #31162 pr: #31379 when give scope CurrentTargetFirst/NextTargetFirst, it's expected to scan both current and next target. This PR fixed wrong behavior of CurrentTargetFirst/NextTargetFirst in target manager, which may cause unexpected task generated, and load collection may stuck forever due to dirty leader view. --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/31398/head
parent
8946aa10d4
commit
f4449d4ef4
|
@ -330,22 +330,49 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect
|
|||
return NewCollectionTarget(segments, channels)
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) *CollectionTarget {
|
||||
func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) []*CollectionTarget {
|
||||
switch scope {
|
||||
case CurrentTarget:
|
||||
return mgr.current.getCollectionTarget(collectionID)
|
||||
|
||||
ret := make([]*CollectionTarget, 0, 1)
|
||||
current := mgr.current.getCollectionTarget(collectionID)
|
||||
if current != nil {
|
||||
ret = append(ret, current)
|
||||
}
|
||||
return ret
|
||||
case NextTarget:
|
||||
return mgr.next.getCollectionTarget(collectionID)
|
||||
ret := make([]*CollectionTarget, 0, 1)
|
||||
next := mgr.next.getCollectionTarget(collectionID)
|
||||
if next != nil {
|
||||
ret = append(ret, next)
|
||||
}
|
||||
return ret
|
||||
case CurrentTargetFirst:
|
||||
if current := mgr.current.getCollectionTarget(collectionID); current != nil {
|
||||
return current
|
||||
ret := make([]*CollectionTarget, 0, 2)
|
||||
current := mgr.current.getCollectionTarget(collectionID)
|
||||
if current != nil {
|
||||
ret = append(ret, current)
|
||||
}
|
||||
return mgr.next.getCollectionTarget(collectionID)
|
||||
|
||||
next := mgr.next.getCollectionTarget(collectionID)
|
||||
if next != nil {
|
||||
ret = append(ret, next)
|
||||
}
|
||||
|
||||
return ret
|
||||
case NextTargetFirst:
|
||||
if next := mgr.next.getCollectionTarget(collectionID); next != nil {
|
||||
return next
|
||||
ret := make([]*CollectionTarget, 0, 2)
|
||||
next := mgr.next.getCollectionTarget(collectionID)
|
||||
if next != nil {
|
||||
ret = append(ret, next)
|
||||
}
|
||||
return mgr.current.getCollectionTarget(collectionID)
|
||||
|
||||
current := mgr.current.getCollectionTarget(collectionID)
|
||||
if current != nil {
|
||||
ret = append(ret, current)
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -356,18 +383,20 @@ func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
|
|||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
for _, t := range targets {
|
||||
segments := typeutil.NewUniqueSet()
|
||||
for _, channel := range t.GetAllDmChannels() {
|
||||
segments.Insert(channel.GetUnflushedSegmentIds()...)
|
||||
}
|
||||
|
||||
if len(segments) > 0 {
|
||||
return segments
|
||||
}
|
||||
}
|
||||
|
||||
segments := typeutil.NewUniqueSet()
|
||||
for _, channel := range collectionTarget.GetAllDmChannels() {
|
||||
segments.Insert(channel.GetUnflushedSegmentIds()...)
|
||||
}
|
||||
|
||||
return segments
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
|
||||
|
@ -377,20 +406,21 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
|
|||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
for _, t := range targets {
|
||||
segments := typeutil.NewUniqueSet()
|
||||
for _, channel := range t.GetAllDmChannels() {
|
||||
if channel.ChannelName == channelName {
|
||||
segments.Insert(channel.GetUnflushedSegmentIds()...)
|
||||
}
|
||||
}
|
||||
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
segments := typeutil.NewUniqueSet()
|
||||
for _, channel := range collectionTarget.GetAllDmChannels() {
|
||||
if channel.ChannelName == channelName {
|
||||
segments.Insert(channel.GetUnflushedSegmentIds()...)
|
||||
if len(segments) > 0 {
|
||||
return segments
|
||||
}
|
||||
}
|
||||
|
||||
return segments
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
|
||||
|
@ -399,12 +429,13 @@ func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
|
|||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
for _, t := range targets {
|
||||
return t.GetAllSegments()
|
||||
}
|
||||
return collectionTarget.GetAllSegments()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
|
||||
|
@ -414,19 +445,21 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
|
|||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
}
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
for _, t := range targets {
|
||||
ret := make(map[int64]*datapb.SegmentInfo)
|
||||
for k, v := range t.GetAllSegments() {
|
||||
if v.GetInsertChannel() == channelName {
|
||||
ret[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
ret := make(map[int64]*datapb.SegmentInfo)
|
||||
for k, v := range collectionTarget.GetAllSegments() {
|
||||
if v.GetInsertChannel() == channelName {
|
||||
ret[k] = v
|
||||
if len(ret) > 0 {
|
||||
return ret
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
|
||||
|
@ -436,86 +469,92 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
|
|||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
for _, t := range targets {
|
||||
if channel, ok := t.dmChannels[channelName]; ok {
|
||||
return channel.GetDroppedSegmentIds()
|
||||
}
|
||||
}
|
||||
|
||||
channel := collectionTarget.dmChannels[channelName]
|
||||
if channel == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return channel.GetDroppedSegmentIds()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
|
||||
partitionID int64, scope TargetScope,
|
||||
partitionID int64,
|
||||
scope TargetScope,
|
||||
) map[int64]*datapb.SegmentInfo {
|
||||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
for _, t := range targets {
|
||||
segments := make(map[int64]*datapb.SegmentInfo)
|
||||
for _, s := range t.GetAllSegments() {
|
||||
if s.GetPartitionID() == partitionID {
|
||||
segments[s.GetID()] = s
|
||||
}
|
||||
}
|
||||
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
segments := make(map[int64]*datapb.SegmentInfo)
|
||||
for _, s := range collectionTarget.GetAllSegments() {
|
||||
if s.GetPartitionID() == partitionID {
|
||||
segments[s.GetID()] = s
|
||||
if len(segments) > 0 {
|
||||
return segments
|
||||
}
|
||||
}
|
||||
|
||||
return segments
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel {
|
||||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
for _, t := range targets {
|
||||
return t.GetAllDmChannels()
|
||||
}
|
||||
return collectionTarget.GetAllDmChannels()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel {
|
||||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
for _, t := range targets {
|
||||
if ch, ok := t.GetAllDmChannels()[channel]; ok {
|
||||
return ch
|
||||
}
|
||||
}
|
||||
return collectionTarget.GetAllDmChannels()[channel]
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
|
||||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
|
||||
if collectionTarget == nil {
|
||||
return nil
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
for _, t := range targets {
|
||||
if s, ok := t.GetAllSegments()[id]; ok {
|
||||
return s
|
||||
}
|
||||
}
|
||||
return collectionTarget.GetAllSegments()[id]
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 {
|
||||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
collectionTarget := mgr.getCollectionTarget(scope, collectionID)
|
||||
|
||||
if collectionTarget == nil {
|
||||
return 0
|
||||
targets := mgr.getCollectionTarget(scope, collectionID)
|
||||
for _, t := range targets {
|
||||
if t.GetTargetVersion() > 0 {
|
||||
return t.GetTargetVersion()
|
||||
}
|
||||
}
|
||||
return collectionTarget.GetTargetVersion()
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool {
|
||||
|
|
|
@ -418,6 +418,10 @@ func (suite *TargetManagerSuite) TestGetSegmentByChannel() {
|
|||
suite.Len(suite.mgr.GetGrowingSegmentsByChannel(collectionID, "channel-1", NextTarget), 4)
|
||||
suite.Len(suite.mgr.GetGrowingSegmentsByChannel(collectionID, "channel-2", NextTarget), 1)
|
||||
suite.Len(suite.mgr.GetDroppedSegmentsByChannel(collectionID, "channel-1", NextTarget), 3)
|
||||
suite.Len(suite.mgr.GetGrowingSegmentsByCollection(collectionID, NextTarget), 5)
|
||||
suite.Len(suite.mgr.GetSealedSegmentsByPartition(collectionID, 1, NextTarget), 2)
|
||||
suite.NotNil(suite.mgr.GetSealedSegment(collectionID, 11, NextTarget))
|
||||
suite.NotNil(suite.mgr.GetDmChannel(collectionID, "channel-1", NextTarget))
|
||||
}
|
||||
|
||||
func (suite *TargetManagerSuite) TestGetTarget() {
|
||||
|
@ -425,7 +429,7 @@ func (suite *TargetManagerSuite) TestGetTarget() {
|
|||
tag string
|
||||
mgr *TargetManager
|
||||
scope TargetScope
|
||||
expectTarget *CollectionTarget
|
||||
expectTarget int
|
||||
}
|
||||
|
||||
current := &CollectionTarget{}
|
||||
|
@ -439,7 +443,7 @@ func (suite *TargetManagerSuite) TestGetTarget() {
|
|||
},
|
||||
next: &target{
|
||||
collectionTargetMap: map[int64]*CollectionTarget{
|
||||
1000: current,
|
||||
1000: next,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -462,89 +466,90 @@ func (suite *TargetManagerSuite) TestGetTarget() {
|
|||
|
||||
cases := []testCase{
|
||||
{
|
||||
tag: "both_scope_unknown",
|
||||
mgr: bothMgr,
|
||||
scope: -1,
|
||||
expectTarget: nil,
|
||||
tag: "both_scope_unknown",
|
||||
mgr: bothMgr,
|
||||
scope: -1,
|
||||
|
||||
expectTarget: 0,
|
||||
},
|
||||
{
|
||||
tag: "both_scope_current",
|
||||
mgr: bothMgr,
|
||||
scope: CurrentTarget,
|
||||
expectTarget: current,
|
||||
expectTarget: 1,
|
||||
},
|
||||
{
|
||||
tag: "both_scope_next",
|
||||
mgr: bothMgr,
|
||||
scope: NextTarget,
|
||||
expectTarget: next,
|
||||
expectTarget: 1,
|
||||
},
|
||||
{
|
||||
tag: "both_scope_current_first",
|
||||
mgr: bothMgr,
|
||||
scope: CurrentTargetFirst,
|
||||
expectTarget: current,
|
||||
expectTarget: 2,
|
||||
},
|
||||
{
|
||||
tag: "both_scope_next_first",
|
||||
mgr: bothMgr,
|
||||
scope: NextTargetFirst,
|
||||
expectTarget: next,
|
||||
expectTarget: 2,
|
||||
},
|
||||
{
|
||||
tag: "next_scope_current",
|
||||
mgr: nextMgr,
|
||||
scope: CurrentTarget,
|
||||
expectTarget: nil,
|
||||
expectTarget: 0,
|
||||
},
|
||||
{
|
||||
tag: "next_scope_next",
|
||||
mgr: nextMgr,
|
||||
scope: NextTarget,
|
||||
expectTarget: next,
|
||||
expectTarget: 1,
|
||||
},
|
||||
{
|
||||
tag: "next_scope_current_first",
|
||||
mgr: nextMgr,
|
||||
scope: CurrentTargetFirst,
|
||||
expectTarget: next,
|
||||
expectTarget: 1,
|
||||
},
|
||||
{
|
||||
tag: "next_scope_next_first",
|
||||
mgr: nextMgr,
|
||||
scope: NextTargetFirst,
|
||||
expectTarget: next,
|
||||
expectTarget: 1,
|
||||
},
|
||||
{
|
||||
tag: "current_scope_current",
|
||||
mgr: currentMgr,
|
||||
scope: CurrentTarget,
|
||||
expectTarget: current,
|
||||
expectTarget: 1,
|
||||
},
|
||||
{
|
||||
tag: "current_scope_next",
|
||||
mgr: currentMgr,
|
||||
scope: NextTarget,
|
||||
expectTarget: nil,
|
||||
expectTarget: 0,
|
||||
},
|
||||
{
|
||||
tag: "current_scope_current_first",
|
||||
mgr: currentMgr,
|
||||
scope: CurrentTargetFirst,
|
||||
expectTarget: current,
|
||||
expectTarget: 1,
|
||||
},
|
||||
{
|
||||
tag: "current_scope_next_first",
|
||||
mgr: currentMgr,
|
||||
scope: NextTargetFirst,
|
||||
expectTarget: current,
|
||||
expectTarget: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
suite.Run(tc.tag, func() {
|
||||
target := tc.mgr.getCollectionTarget(tc.scope, 1000)
|
||||
suite.Equal(tc.expectTarget, target)
|
||||
targets := tc.mgr.getCollectionTarget(tc.scope, 1000)
|
||||
suite.Equal(tc.expectTarget, len(targets))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue