mirror of https://github.com/milvus-io/milvus.git
Add legacySegments for load balance segments rc track (#16831)
- Add legacySegments in ShardCluster tracking old segment inUse
- Apply node id check for segment online/inUse check
- Add unit test simulating load balance case

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

pull/16845/head

parent ac389f3e93
commit c7935d1a86
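In short: during load balance the same segment briefly has two live copies on different nodes, so a reference count keyed by segment id alone is ambiguous; usage has to follow the (node, segment) pair. A minimal standalone sketch of that idea (the `key` type here is illustrative, not from the codebase):

package main

import "fmt"

// key is a hypothetical (node, segment) pair; during load balance the same
// segment id briefly refers to two live copies on different nodes, so a
// reference count keyed by segment id alone would be ambiguous.
type key struct{ nodeID, segmentID int64 }

func main() {
    rc := map[key]int32{}
    rc[key{1, 1}]++ // a search pinned segment 1 on node 1 before the balance
    rc[key{2, 1}]++ // a later search pins the new copy on node 2

    // releasing the node-2 usage must not unblock node 1's copy
    rc[key{2, 1}]--
    fmt.Println(rc[key{1, 1}], rc[key{2, 1}]) // 1 0
}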
internal/querynode/shard_cluster.go
@@ -123,13 +123,14 @@ type ShardCluster struct {
 	segmentDetector ShardSegmentDetector
 	nodeBuilder     ShardNodeBuilder

-	mut         sync.RWMutex
-	nodes       map[int64]*shardNode                 // online nodes
-	segments    map[int64]*shardSegmentInfo          // shard segments
-	handoffs    map[int32]*querypb.SegmentChangeInfo // current pending handoff
-	lastToken   *atomic.Int32                        // last token used for segment change info
-	segmentCond *sync.Cond                           // segment state change condition
-	rcCond      *sync.Cond                           // segment rc change condition
+	mut            sync.RWMutex
+	nodes          map[int64]*shardNode                 // online nodes
+	segments       map[int64]*shardSegmentInfo          // shard segments
+	legacySegments []shardSegmentInfo                   // legacySegments records segment usage BEFORE load balance
+	handoffs       map[int32]*querypb.SegmentChangeInfo // current pending handoff
+	lastToken      *atomic.Int32                        // last token used for segment change info
+	segmentCond    *sync.Cond                           // segment state change condition
+	rcCond         *sync.Cond                           // segment rc change condition

 	closeOnce sync.Once
 	closeCh   chan struct{}
@@ -284,6 +285,10 @@ func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo
 }

 // transferSegment applies a segment state transition.
+// old\new  | Offline | Loading | Loaded
+// Offline  | OK      | OK      | OK
+// Loading  | OK      | OK      | NodeID check
+// Loaded   | OK      | OK      | legacy pending
 func (sc *ShardCluster) transferSegment(old *shardSegmentInfo, evt segmentEvent) {
 	switch old.state {
 	case segmentStateOffline: // safe to update nodeID and state
@@ -303,6 +308,18 @@ func (sc *ShardCluster) transferSegment(old *shardSegmentInfo, evt segmentEvent)
 			sc.healthCheck()
 		}
 	case segmentStateLoaded:
+		// load balance
+		if old.nodeID != evt.nodeID {
+			sc.legacySegments = append(sc.legacySegments, shardSegmentInfo{
+				nodeID:    old.nodeID,
+				segmentID: old.segmentID,
+				inUse:     old.inUse,
+				state:     old.state,
+			})
+			// set `old` segment.inUse (actually the new online segment) to zero, since there is no allocation for the new segment yet
+			// the original inUse is recorded in legacySegments
+			old.inUse = 0
+		}
 		old.nodeID = evt.nodeID
 		old.state = evt.state
 		if evt.state != segmentStateLoaded {
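The `Loaded` row of the table above is the new behavior: when a loaded segment reappears on a different node, the old usage is parked in `legacySegments` and the fresh copy starts at rc 0. A condensed, hypothetical mirror of that branch, covering both the same-node and node-changed cases:

package main

import "fmt"

// seg is a simplified stand-in for shardSegmentInfo.
type seg struct {
    nodeID, segmentID int64
    inUse             int32
}

// transferLoaded sketches the segmentStateLoaded branch above.
func transferLoaded(old *seg, evtNodeID int64, legacy *[]seg) {
    if old.nodeID != evtNodeID {
        // load balance: remember who still uses the old copy ...
        *legacy = append(*legacy, *old)
        // ... and start the new copy with no allocations
        old.inUse = 0
    }
    old.nodeID = evtNodeID
}

func main() {
    var legacy []seg

    s := &seg{nodeID: 1, segmentID: 1, inUse: 2}
    transferLoaded(s, 1, &legacy)     // same node: rc untouched, no legacy entry
    fmt.Println(s.inUse, len(legacy)) // 2 0

    transferLoaded(s, 2, &legacy)     // node changed: rc parked in legacy
    fmt.Println(s.inUse, len(legacy)) // 0 1
}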
@@ -462,6 +479,7 @@ func (sc *ShardCluster) segmentAllocations(partitionIDs []int64) map[int64][]int

 // inHandoffOffline checks whether a segment is in the pending handoff offline list
 // Note that sc.mut Lock is assumed to be held outside of this function!
+// legacySegments will not be checked, since the same segment is on another node in loaded state
 func (sc *ShardCluster) inHandoffOffline(segmentID int64) bool {
 	for _, handoff := range sc.handoffs {
 		for _, offlineSegment := range handoff.OfflineSegments {
@@ -482,16 +500,31 @@ func (sc *ShardCluster) finishUsage(allocs map[int64][]int64) {
 	}()
 	sc.mut.Lock()
 	defer sc.mut.Unlock()
-	for _, segments := range allocs {
+	for nodeID, segments := range allocs {
 		for _, segmentID := range segments {
+			// checking online segments
 			segment, ok := sc.segments[segmentID]
 			if !ok || segment == nil {
 				// this shall not happen, since removing segment without decreasing rc to zero is illegal
 				log.Error("finishUsage with non-existing segment", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID), zap.String("vchannel", sc.vchannelName), zap.Int64("segmentID", segmentID))
 				continue
 			}
-			// decrease the reference count
-			segment.inUse--
+			if segment.nodeID == nodeID {
+				// decrease the reference count
+				segment.inUse--
+			}
+
+			// checking legacy segments
+			for idx, segment := range sc.legacySegments {
+				if segment.segmentID == segmentID && segment.nodeID == nodeID {
+					sc.legacySegments[idx].inUse--
+					// rc is zero, remove from legacy list
+					if sc.legacySegments[idx].inUse == 0 {
+						sc.legacySegments[idx] = sc.legacySegments[len(sc.legacySegments)-1]
+						sc.legacySegments = sc.legacySegments[:len(sc.legacySegments)-1]
+					}
+				}
+			}
 		}
 	}
 }
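The legacy-list cleanup above removes an entry with the unordered swap-remove idiom: overwrite the slot with the last element and shrink the slice by one. A standalone illustration (names are generic, not from the codebase):

package main

import "fmt"

// removeUnordered deletes s[i] in O(1) without preserving order,
// the same pattern finishUsage uses for sc.legacySegments.
func removeUnordered[T any](s []T, i int) []T {
    s[i] = s[len(s)-1]
    return s[:len(s)-1]
}

func main() {
    s := []int{10, 20, 30, 40}
    s = removeUnordered(s, 1) // drop 20; 40 takes its slot
    fmt.Println(s)            // [10 40 30]
}

Note that swap-removing during a forward range scan can skip the element swapped into `idx`, which is harmless here assuming at most one legacy entry matches a given (node, segment) pair.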
@@ -499,13 +532,16 @@ func (sc *ShardCluster) finishUsage(allocs map[int64][]int64) {
 // HandoffSegments processes the handoff/load balance segments update procedure.
 func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
 	// wait for all OnlineSegments to be loaded
-	onlineSegments := make([]int64, 0, len(info.OnlineSegments))
+	onlineSegments := make([]shardSegmentInfo, 0, len(info.OnlineSegments))
 	for _, seg := range info.OnlineSegments {
 		// filter out segments not maintained in this cluster
 		if seg.GetCollectionID() != sc.collectionID || seg.GetDmChannel() != sc.vchannelName {
 			continue
 		}
-		onlineSegments = append(onlineSegments, seg.GetSegmentID())
+		onlineSegments = append(onlineSegments, shardSegmentInfo{
+			nodeID:    seg.GetNodeID(),
+			segmentID: seg.GetSegmentID(),
+		})
 	}
 	sc.waitSegmentsOnline(onlineSegments)

@@ -513,9 +549,12 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
 	token := sc.appendHandoff(info)

 	// wait for all OfflineSegments to be not in use
-	offlineSegments := make([]int64, 0, len(info.OfflineSegments))
+	offlineSegments := make([]shardSegmentInfo, 0, len(info.OfflineSegments))
 	for _, seg := range info.OfflineSegments {
-		offlineSegments = append(offlineSegments, seg.GetSegmentID())
+		offlineSegments = append(offlineSegments, shardSegmentInfo{
+			nodeID:    seg.GetNodeID(),
+			segmentID: seg.GetSegmentID(),
+		})
 	}
 	sc.waitSegmentsNotInUse(offlineSegments)

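Taken together, HandoffSegments runs in phases: wait for the new copies to come online, register the pending change so no new allocations land on the offline copies, then wait for outstanding references (including legacy ones) to drain. A schematic of that ordering, with hypothetical function-typed stages standing in for the real methods:

package main

import "fmt"

// handoff sketches the phase ordering of HandoffSegments above;
// the stage functions are hypothetical placeholders.
func handoff(waitOnline func(), register func() int32, waitNotInUse func(), finish func(int32)) {
    waitOnline()        // 1. new copies must be queryable first
    token := register() // 2. pending handoff blocks new allocations on offline copies
    waitNotInUse()      // 3. drain in-flight usage, online and legacy alike
    finish(token)       // 4. drop the handoff record and release segments
}

func main() {
    handoff(
        func() { fmt.Println("online") },
        func() int32 { fmt.Println("registered"); return 1 },
        func() { fmt.Println("drained") },
        func(t int32) { fmt.Printf("finished handoff %d\n", t) },
    )
}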
@@ -582,7 +621,7 @@ func (sc *ShardCluster) finishHandoff(token int32) {
 }

 // waitSegmentsOnline waits until all provided segments are loaded.
-func (sc *ShardCluster) waitSegmentsOnline(segments []int64) {
+func (sc *ShardCluster) waitSegmentsOnline(segments []shardSegmentInfo) {
 	sc.segmentCond.L.Lock()
 	for !sc.segmentsOnline(segments) {
 		sc.segmentCond.Wait()
@@ -591,7 +630,7 @@ func (sc *ShardCluster) waitSegmentsOnline(segments []int64) {
 }

 // waitSegmentsNotInUse waits until all provided segments are not in use.
-func (sc *ShardCluster) waitSegmentsNotInUse(segments []int64) {
+func (sc *ShardCluster) waitSegmentsNotInUse(segments []shardSegmentInfo) {
 	sc.rcCond.L.Lock()
 	for sc.segmentsInUse(segments) {
 		sc.rcCond.Wait()
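Both wait helpers follow the standard sync.Cond pattern: re-check the predicate in a loop, because Wait can return after an unrelated broadcast. A self-contained version of the idiom (not the actual ShardCluster wiring):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    mu := sync.Mutex{}
    cond := sync.NewCond(&mu)
    online := false

    go func() {
        time.Sleep(10 * time.Millisecond)
        mu.Lock()
        online = true
        mu.Unlock()
        cond.Broadcast() // like the segment detector signaling a state change
    }()

    cond.L.Lock()
    for !online { // predicate re-checked on every wakeup
        cond.Wait()
    }
    cond.L.Unlock()
    fmt.Println("segments online")
}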
@@ -599,13 +638,14 @@ func (sc *ShardCluster) waitSegmentsNotInUse(segments []int64) {
 	sc.rcCond.L.Unlock()
 }

-// segmentsOnline checks whether all provided segment ids are in online state.
-func (sc *ShardCluster) segmentsOnline(segmentIDs []int64) bool {
+// segmentsOnline checks whether all provided segment infos are in online state.
+func (sc *ShardCluster) segmentsOnline(segments []shardSegmentInfo) bool {
 	sc.mut.RLock()
 	defer sc.mut.RUnlock()
-	for _, segID := range segmentIDs {
-		segment, ok := sc.segments[segID]
-		if !ok || segment.state != segmentStateLoaded {
+	for _, segInfo := range segments {
+		segment, ok := sc.segments[segInfo.segmentID]
+		// check that the segment is online on the specified node
+		if !ok || segment.state != segmentStateLoaded || segment.nodeID != segInfo.nodeID {
 			return false
 		}
 	}
@@ -613,18 +653,22 @@ func (sc *ShardCluster) segmentsOnline(segmentIDs []int64) bool {
 }

 // segmentsInUse checks whether any provided segment is still in use.
-func (sc *ShardCluster) segmentsInUse(segmentIDs []int64) bool {
+func (sc *ShardCluster) segmentsInUse(segments []shardSegmentInfo) bool {
 	sc.mut.RLock()
 	defer sc.mut.RUnlock()
-	for _, segID := range segmentIDs {
-		segment, ok := sc.segments[segID]
-		if !ok {
-			// ignore missing segments, since they might be in streaming
-			continue
-		}
-		if segment.inUse > 0 {
+	for _, segInfo := range segments {
+		// check online segments
+		segment, ok := sc.segments[segInfo.segmentID]
+		if ok && segment.inUse > 0 && segment.nodeID == segInfo.nodeID {
 			return true
 		}
+
+		// check legacy segments
+		for _, segment := range sc.legacySegments {
+			if segment.nodeID == segInfo.nodeID && segment.segmentID == segInfo.segmentID {
+				return true
+			}
+		}
 	}
 	return false
 }
internal/querynode/shard_cluster_test.go
@@ -412,6 +412,9 @@ func TestShardCluster_segmentEvent(t *testing.T) {
 		}, buildMockQueryNode)
 		defer sc.Close()

+		// make reference greater than 0
+		allocs := sc.segmentAllocations(nil)
+
 		evtCh <- segmentEvent{
 			segmentID: 2,
 			nodeID:    2,
@@ -453,6 +456,78 @@ func TestShardCluster_segmentEvent(t *testing.T) {
 			return has && seg.nodeID == 2 && seg.state == segmentStateLoaded
 		}, time.Second, time.Millisecond)

+		sc.mut.RLock()
+		assert.Equal(t, 0, len(sc.legacySegments))
+		sc.mut.RUnlock()
+
+		sc.finishUsage(allocs)
+		sc.mut.RLock()
+		assert.Equal(t, 0, len(sc.legacySegments))
+		sc.mut.RUnlock()
+	})
+
+	t.Run("from loaded, node changed", func(t *testing.T) {
+		segmentEvents := []segmentEvent{
+			{
+				segmentID: 1,
+				nodeID:    1,
+				state:     segmentStateLoaded,
+			},
+			{
+				segmentID: 2,
+				nodeID:    2,
+				state:     segmentStateLoaded,
+			},
+		}
+
+		evtCh := make(chan segmentEvent, 10)
+		sc := NewShardCluster(collectionID, replicaID, vchannelName,
+			&mockNodeDetector{}, &mockSegmentDetector{
+				initSegments: segmentEvents,
+				evtCh:        evtCh,
+			}, buildMockQueryNode)
+		defer sc.Close()
+
+		// make reference greater than 0
+		allocs := sc.segmentAllocations(nil)
+
+		// bring segment online in the other querynode
+		evtCh <- segmentEvent{
+			segmentID: 1,
+			nodeID:    2,
+			state:     segmentStateLoaded,
+			eventType: segmentAdd,
+		}
+
+		evtCh <- segmentEvent{
+			segmentID: 2,
+			nodeID:    1,
+			state:     segmentStateLoaded,
+			eventType: segmentAdd,
+		}
+
+		assert.Eventually(t, func() bool {
+			seg, has := sc.getSegment(1)
+			return has && seg.nodeID == 2 && seg.state == segmentStateLoaded
+		}, time.Second, time.Millisecond)
+
+		assert.Eventually(t, func() bool {
+			seg, has := sc.getSegment(2)
+			return has && seg.nodeID == 1 && seg.state == segmentStateLoaded
+		}, time.Second, time.Millisecond)
+
+		sc.mut.RLock()
+		assert.Equal(t, 2, len(sc.legacySegments))
+		assert.ElementsMatch(t, []shardSegmentInfo{
+			{segmentID: 1, nodeID: 1, state: segmentStateLoaded, inUse: 1},
+			{segmentID: 2, nodeID: 2, state: segmentStateLoaded, inUse: 1},
+		}, sc.legacySegments)
+		sc.mut.RUnlock()
+
+		sc.finishUsage(allocs)
+		sc.mut.RLock()
+		assert.Equal(t, 0, len(sc.legacySegments))
+		sc.mut.RUnlock()
+	})

 	t.Run("from offline", func(t *testing.T) {
@@ -1179,8 +1254,8 @@ func TestShardCluster_ReferenceCount(t *testing.T) {
 		}
 		sc.mut.RUnlock()

-		assert.True(t, sc.segmentsInUse([]int64{1, 2}))
-		assert.True(t, sc.segmentsInUse([]int64{1, 2, -1}))
+		assert.True(t, sc.segmentsInUse([]shardSegmentInfo{{nodeID: 1, segmentID: 1}, {nodeID: 2, segmentID: 2}}))
+		assert.True(t, sc.segmentsInUse([]shardSegmentInfo{{nodeID: 1, segmentID: 1}, {nodeID: 2, segmentID: 2}, {nodeID: 2, segmentID: -1}}))

 		sc.finishUsage(allocs)
 		sc.mut.RLock()
@@ -1188,9 +1263,8 @@ func TestShardCluster_ReferenceCount(t *testing.T) {
 			assert.EqualValues(t, segment.inUse, 0)
 		}
 		sc.mut.RUnlock()

-		assert.False(t, sc.segmentsInUse([]int64{1, 2}))
-		assert.False(t, sc.segmentsInUse([]int64{1, 2, -1}))
+		assert.False(t, sc.segmentsInUse([]shardSegmentInfo{{nodeID: 1, segmentID: 1}, {nodeID: 2, segmentID: 2}}))
+		assert.False(t, sc.segmentsInUse([]shardSegmentInfo{{nodeID: 1, segmentID: 1}, {nodeID: 2, segmentID: 2}, {nodeID: 2, segmentID: -1}}))
 	})

 	t.Run("alloc & finish with modified alloc", func(t *testing.T) {
@@ -1283,12 +1357,12 @@ func TestShardCluster_ReferenceCount(t *testing.T) {
 		}, buildMockQueryNode)
 		defer sc.Close()

-		assert.True(t, sc.segmentsOnline([]int64{1, 2}))
-		assert.False(t, sc.segmentsOnline([]int64{1, 2, 3}))
+		assert.True(t, sc.segmentsOnline([]shardSegmentInfo{{nodeID: 1, segmentID: 1}, {nodeID: 2, segmentID: 2}}))
+		assert.False(t, sc.segmentsOnline([]shardSegmentInfo{{nodeID: 1, segmentID: 1}, {nodeID: 2, segmentID: 2}, {nodeID: 1, segmentID: 3}}))

 		sig := make(chan struct{})
 		go func() {
-			sc.waitSegmentsOnline([]int64{1, 2, 3})
+			sc.waitSegmentsOnline([]shardSegmentInfo{{nodeID: 1, segmentID: 1}, {nodeID: 2, segmentID: 2}, {nodeID: 1, segmentID: 3}})
 			close(sig)
 		}()
@@ -1300,7 +1374,7 @@ func TestShardCluster_ReferenceCount(t *testing.T) {
 		}

 		<-sig
-		assert.True(t, sc.segmentsOnline([]int64{1, 2, 3}))
+		assert.True(t, sc.segmentsOnline([]shardSegmentInfo{{nodeID: 1, segmentID: 1}, {nodeID: 2, segmentID: 2}, {nodeID: 1, segmentID: 3}}))
 	})
 }
@@ -1507,6 +1581,100 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
 		assert.False(t, has)
 	})

+	t.Run("load balance wait online and usage", func(t *testing.T) {
+		nodeEvents := []nodeEvent{
+			{
+				nodeID:   1,
+				nodeAddr: "addr_1",
+			},
+			{
+				nodeID:   2,
+				nodeAddr: "addr_2",
+			},
+		}
+
+		segmentEvents := []segmentEvent{
+			{
+				segmentID: 1,
+				nodeID:    1,
+				state:     segmentStateLoaded,
+			},
+			{
+				segmentID: 2,
+				nodeID:    2,
+				state:     segmentStateLoaded,
+			},
+		}
+		evtCh := make(chan segmentEvent, 10)
+		sc := NewShardCluster(collectionID, replicaID, vchannelName,
+			&mockNodeDetector{
+				initNodes: nodeEvents,
+			}, &mockSegmentDetector{
+				initSegments: segmentEvents,
+				evtCh:        evtCh,
+			}, buildMockQueryNode)
+		defer sc.Close()
+
+		// add rc to all segments
+		allocs := sc.segmentAllocations(nil)
+
+		sig := make(chan struct{})
+		go func() {
+			err := sc.HandoffSegments(&querypb.SegmentChangeInfo{
+				OnlineSegments: []*querypb.SegmentInfo{
+					{SegmentID: 1, NodeID: 2, CollectionID: collectionID, DmChannel: vchannelName, NodeIds: []UniqueID{1}},
+				},
+				OfflineSegments: []*querypb.SegmentInfo{
+					{SegmentID: 1, NodeID: 1, CollectionID: collectionID, DmChannel: vchannelName, NodeIds: []UniqueID{1}},
+				},
+			})
+
+			assert.NoError(t, err)
+			close(sig)
+		}()
+
+		sc.mut.RLock()
+		// still waiting online
+		assert.Equal(t, 0, len(sc.handoffs))
+		sc.mut.RUnlock()
+
+		evtCh <- segmentEvent{
+			eventType: segmentAdd,
+			segmentID: 1,
+			nodeID:    2,
+			state:     segmentStateLoaded,
+		}
+
+		// wait for handoff appended into list
+		assert.Eventually(t, func() bool {
+			sc.mut.RLock()
+			defer sc.mut.RUnlock()
+			return len(sc.handoffs) > 0
+		}, time.Second, time.Millisecond*10)
+
+		tmpAllocs := sc.segmentAllocations(nil)
+		for nodeID, segments := range tmpAllocs {
+			for _, segment := range segments {
+				if segment == int64(1) {
+					assert.Equal(t, int64(2), nodeID)
+				}
+			}
+		}
+		sc.finishUsage(tmpAllocs)
+		// rc shall be 0 now
+		sc.finishUsage(allocs)
+
+		// wait handoff finished
+		<-sig
+
+		sc.mut.RLock()
+		info, has := sc.segments[1]
+		sc.mut.RUnlock()
+
+		assert.True(t, has)
+		assert.Equal(t, int64(2), info.nodeID)
+	})
+
 	t.Run("handoff from non-exist node", func(t *testing.T) {
 		nodeEvents := []nodeEvent{
 			{