Use NodeIds instead of NodeID in segment info (#16912)

The NodeID field is deprecated in current design
Query Node segment detector shall read NodeIds instead of NodeID

Fix #16911

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/16917/head
congqixia 2022-05-11 17:39:53 +08:00 committed by GitHub
parent 13c643a3a4
commit a0614a3ff8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 345 additions and 143 deletions

View File

@ -73,7 +73,7 @@ type segmentEvent struct {
eventType segmentEventType
segmentID int64
partitionID int64
nodeID int64
nodeIDs []int64 // nodes from events
state segmentState
}
@ -222,7 +222,7 @@ func (sc *ShardCluster) removeNode(evt nodeEvent) {
}
// updateSegment apply segment change to shard cluster
func (sc *ShardCluster) updateSegment(evt segmentEvent) {
func (sc *ShardCluster) updateSegment(evt shardSegmentInfo) {
log.Info("ShardCluster update segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
// notify handoff wait online if any
defer func() {
@ -273,8 +273,7 @@ func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo
continue
}
sc.transferSegment(old, segmentEvent{
eventType: segmentAdd,
sc.transferSegment(old, shardSegmentInfo{
nodeID: line.GetNodeId(),
partitionID: line.GetPartitionId(),
segmentID: segmentID,
@ -289,7 +288,7 @@ func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo
// Offline | OK | OK | OK
// Loading | OK | OK | NodeID check
// Loaded | OK | OK | legacy pending
func (sc *ShardCluster) transferSegment(old *shardSegmentInfo, evt segmentEvent) {
func (sc *ShardCluster) transferSegment(old *shardSegmentInfo, evt shardSegmentInfo) {
switch old.state {
case segmentStateOffline: // safe to update nodeID and state
old.nodeID = evt.nodeID
@ -330,7 +329,7 @@ func (sc *ShardCluster) transferSegment(old *shardSegmentInfo, evt segmentEvent)
// removeSegment removes segment from cluster
// should only applied in hand-off or load balance procedure
func (sc *ShardCluster) removeSegment(evt segmentEvent) {
func (sc *ShardCluster) removeSegment(evt shardSegmentInfo) {
log.Info("ShardCluster remove segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
sc.mut.Lock()
@ -362,13 +361,32 @@ func (sc *ShardCluster) init() {
// list segments
segments, segmentEvtCh := sc.segmentDetector.watchSegments(sc.collectionID, sc.replicaID, sc.vchannelName)
for _, segment := range segments {
sc.updateSegment(segment)
info, ok := sc.pickNode(segment)
if ok {
sc.updateSegment(info)
}
}
go sc.watchSegments(segmentEvtCh)
sc.healthCheck()
}
// pickNode selects node id in cluster
func (sc *ShardCluster) pickNode(evt segmentEvent) (shardSegmentInfo, bool) {
for _, nodeID := range evt.nodeIDs {
_, has := sc.getNode(nodeID)
if has { // assume one segment shall exist once in one replica
return shardSegmentInfo{
segmentID: evt.segmentID,
partitionID: evt.partitionID,
nodeID: nodeID,
state: evt.state,
}, true
}
}
return shardSegmentInfo{}, false
}
// healthCheck iterate all segments to to check cluster could provide service.
func (sc *ShardCluster) healthCheck() {
for _, segment := range sc.segments {
@ -411,11 +429,15 @@ func (sc *ShardCluster) watchSegments(evtCh <-chan segmentEvent) {
log.Warn("ShardCluster segment channel closed", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID))
return
}
info, ok := sc.pickNode(evt)
if !ok {
continue
}
switch evt.eventType {
case segmentAdd:
sc.updateSegment(evt)
sc.updateSegment(info)
case segmentDel:
sc.removeSegment(evt)
sc.removeSegment(info)
}
case <-sc.closeCh:
log.Info("ShardCluster watchSegments quit", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID), zap.String("vchannelName", sc.vchannelName))
@ -565,7 +587,7 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
if seg.GetCollectionID() != sc.collectionID || seg.GetDmChannel() != sc.vchannelName {
continue
}
sc.removeSegment(segmentEvent{segmentID: seg.GetSegmentID(), nodeID: seg.GetNodeID()})
sc.removeSegment(shardSegmentInfo{segmentID: seg.GetSegmentID(), nodeID: seg.GetNodeID()})
removes[seg.GetNodeID()] = append(removes[seg.GetNodeID()], seg.SegmentID)
}

View File

@ -117,25 +117,38 @@ func TestShardCluster_Create(t *testing.T) {
})
t.Run("init segments", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoading,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
},
}
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{
initNodes: nodeEvents,
}, &mockSegmentDetector{
initSegments: segmentEvents,
}, buildMockQueryNode)
defer sc.Close()
@ -143,11 +156,16 @@ func TestShardCluster_Create(t *testing.T) {
for _, e := range segmentEvents {
sc.mut.RLock()
segment, has := sc.segments[e.segmentID]
_, inCluster := sc.pickNode(e)
sc.mut.RUnlock()
assert.True(t, has)
assert.Equal(t, e.segmentID, segment.segmentID)
assert.Equal(t, e.nodeID, segment.nodeID)
assert.Equal(t, e.state, segment.state)
if inCluster {
assert.True(t, has)
assert.Equal(t, e.segmentID, segment.segmentID)
assert.Contains(t, e.nodeIDs, segment.nodeID)
assert.Equal(t, e.state, segment.state)
} else {
assert.False(t, has)
}
}
assert.EqualValues(t, unavailable, sc.state.Load())
})
@ -238,17 +256,17 @@ func TestShardCluster_nodeEvent(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoading,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{2},
state: segmentStateOffline,
},
}
@ -310,27 +328,44 @@ func TestShardCluster_segmentEvent(t *testing.T) {
replicaID := int64(0)
t.Run("from loading", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoading,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoading,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateLoading,
},
}
evtCh := make(chan segmentEvent, 10)
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{
initNodes: nodeEvents,
}, &mockSegmentDetector{
initSegments: segmentEvents,
evtCh: evtCh,
}, buildMockQueryNode)
@ -338,21 +373,21 @@ func TestShardCluster_segmentEvent(t *testing.T) {
evtCh <- segmentEvent{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoading,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
eventType: segmentAdd,
}
@ -373,7 +408,7 @@ func TestShardCluster_segmentEvent(t *testing.T) {
// node id not match
evtCh <- segmentEvent{
segmentID: 1,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
eventType: segmentAdd,
}
@ -386,27 +421,41 @@ func TestShardCluster_segmentEvent(t *testing.T) {
})
t.Run("from loaded", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateLoaded,
},
}
evtCh := make(chan segmentEvent, 10)
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
evtCh: evtCh,
}, buildMockQueryNode)
@ -415,33 +464,40 @@ func TestShardCluster_segmentEvent(t *testing.T) {
// make reference greater than 0
allocs := sc.segmentAllocations(nil)
evtCh <- segmentEvent{
segmentID: 4,
nodeIDs: []int64{4},
state: segmentStateLoaded,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoading,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
eventType: segmentAdd,
}
assert.Eventually(t, func() bool {
seg, has := sc.getSegment(1)
return has && seg.nodeID == 1 && seg.state == segmentStateLoading
@ -456,6 +512,9 @@ func TestShardCluster_segmentEvent(t *testing.T) {
return has && seg.nodeID == 2 && seg.state == segmentStateLoaded
}, time.Second, time.Millisecond)
_, has := sc.getSegment(4)
assert.False(t, has)
sc.mut.RLock()
assert.Equal(t, 0, len(sc.legacySegments))
sc.mut.RUnlock()
@ -467,22 +526,33 @@ func TestShardCluster_segmentEvent(t *testing.T) {
})
t.Run("from loaded, node changed", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
evtCh := make(chan segmentEvent, 10)
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
evtCh: evtCh,
}, buildMockQueryNode)
@ -494,14 +564,14 @@ func TestShardCluster_segmentEvent(t *testing.T) {
// bring segment online in the other querynode
evtCh <- segmentEvent{
segmentID: 1,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 2,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
eventType: segmentAdd,
}
@ -531,47 +601,62 @@ func TestShardCluster_segmentEvent(t *testing.T) {
})
t.Run("from offline", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateOffline,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateOffline,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{2},
state: segmentStateOffline,
},
}
evtCh := make(chan segmentEvent, 10)
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
evtCh: evtCh,
}, buildMockQueryNode)
defer sc.Close()
evtCh <- segmentEvent{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoading,
eventType: segmentAdd,
}
evtCh <- segmentEvent{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
eventType: segmentAdd,
}
@ -594,27 +679,42 @@ func TestShardCluster_segmentEvent(t *testing.T) {
})
t.Run("remove segments", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoading,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
},
}
evtCh := make(chan segmentEvent, 10)
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
evtCh: evtCh,
}, buildMockQueryNode)
@ -622,17 +722,17 @@ func TestShardCluster_segmentEvent(t *testing.T) {
evtCh <- segmentEvent{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
eventType: segmentDel,
}
evtCh <- segmentEvent{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
eventType: segmentDel,
}
evtCh <- segmentEvent{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
eventType: segmentDel,
}
@ -652,27 +752,42 @@ func TestShardCluster_segmentEvent(t *testing.T) {
})
t.Run("remove failed", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoading,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
},
}
evtCh := make(chan segmentEvent, 10)
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
evtCh: evtCh,
}, buildMockQueryNode)
@ -681,20 +796,20 @@ func TestShardCluster_segmentEvent(t *testing.T) {
// non-exist segment
evtCh <- segmentEvent{
segmentID: 4,
nodeID: 4,
nodeIDs: []int64{3},
eventType: segmentDel,
}
// segment node id not match
evtCh <- segmentEvent{
segmentID: 3,
nodeID: 4,
nodeIDs: []int64{4},
eventType: segmentDel,
}
// use add segment as event process signal
evtCh <- segmentEvent{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
eventType: segmentAdd,
}
@ -714,11 +829,26 @@ func TestShardCluster_SyncSegments(t *testing.T) {
replicaID := int64(0)
t.Run("sync new segments", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{}
evtCh := make(chan segmentEvent, 10)
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
evtCh: evtCh,
}, buildMockQueryNode)
@ -754,27 +884,42 @@ func TestShardCluster_SyncSegments(t *testing.T) {
})
t.Run("sync existing segments", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateOffline,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateOffline,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
},
}
evtCh := make(chan segmentEvent, 10)
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
evtCh: evtCh,
}, buildMockQueryNode)
@ -817,26 +962,41 @@ func TestShardCluster_Search(t *testing.T) {
ctx := context.Background()
t.Run("search unavailable cluster", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateOffline,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateOffline,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
},
}
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
}, buildMockQueryNode)
@ -876,17 +1036,17 @@ func TestShardCluster_Search(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
{
segmentID: 3,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -923,17 +1083,17 @@ func TestShardCluster_Search(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
{
segmentID: 3,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -973,22 +1133,15 @@ func TestShardCluster_Search(t *testing.T) {
nodeAddr: "addr_2",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
state: segmentStateLoaded,
},
// segment belongs to node not registered
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1000,6 +1153,15 @@ func TestShardCluster_Search(t *testing.T) {
initSegments: segmentEvents,
}, buildMockQueryNode)
//mock meta error
sc.mut.Lock()
sc.segments[3] = &shardSegmentInfo{
segmentID: 3,
nodeID: 3, // node does not exist
state: segmentStateLoaded,
}
sc.mut.Unlock()
defer sc.Close()
require.EqualValues(t, available, sc.state.Load())
@ -1017,26 +1179,41 @@ func TestShardCluster_Query(t *testing.T) {
ctx := context.Background()
t.Run("query unavailable cluster", func(t *testing.T) {
nodeEvents := []nodeEvent{
{
nodeID: 1,
nodeAddr: "addr_1",
},
{
nodeID: 2,
nodeAddr: "addr_2",
},
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateOffline,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateOffline,
},
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{3},
state: segmentStateOffline,
},
}
sc := NewShardCluster(collectionID, replicaID, vchannelName,
&mockNodeDetector{}, &mockSegmentDetector{
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{
initSegments: segmentEvents,
}, buildMockQueryNode)
@ -1074,17 +1251,17 @@ func TestShardCluster_Query(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
{
segmentID: 3,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1121,17 +1298,17 @@ func TestShardCluster_Query(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
{
segmentID: 3,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1174,18 +1351,12 @@ func TestShardCluster_Query(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
state: segmentStateLoaded,
},
// segment belongs to node not registered
{
segmentID: 3,
nodeID: 3,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1197,6 +1368,15 @@ func TestShardCluster_Query(t *testing.T) {
initSegments: segmentEvents,
}, buildMockQueryNode)
//mock meta error
sc.mut.Lock()
sc.segments[3] = &shardSegmentInfo{
segmentID: 3,
nodeID: 3, // node does not exist
state: segmentStateLoaded,
}
sc.mut.Unlock()
defer sc.Close()
require.EqualValues(t, available, sc.state.Load())
@ -1229,12 +1409,12 @@ func TestShardCluster_ReferenceCount(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1282,12 +1462,12 @@ func TestShardCluster_ReferenceCount(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1338,12 +1518,12 @@ func TestShardCluster_ReferenceCount(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1369,7 +1549,7 @@ func TestShardCluster_ReferenceCount(t *testing.T) {
evtCh <- segmentEvent{
eventType: segmentAdd,
segmentID: 3,
nodeID: 1,
nodeIDs: []int64{1, 4},
state: segmentStateLoaded,
}
@ -1400,12 +1580,12 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1451,12 +1631,12 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1501,12 +1681,12 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1546,7 +1726,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
evtCh <- segmentEvent{
eventType: segmentAdd,
segmentID: 3,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
}
@ -1596,12 +1776,12 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1641,7 +1821,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
evtCh <- segmentEvent{
eventType: segmentAdd,
segmentID: 1,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
}
@ -1690,12 +1870,12 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}
@ -1736,12 +1916,12 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
segmentEvents := []segmentEvent{
{
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
{
segmentID: 2,
nodeID: 2,
nodeIDs: []int64{2},
state: segmentStateLoaded,
},
}

View File

@ -97,7 +97,7 @@ func (sd *etcdShardSegmentDetector) watchSegments(collectionID int64, replicaID
eventType: segmentAdd,
segmentID: info.GetSegmentID(),
partitionID: info.GetPartitionID(),
nodeID: info.GetNodeID(),
nodeIDs: info.GetNodeIds(),
state: segmentStateLoaded,
})
}
@ -158,7 +158,7 @@ func (sd *etcdShardSegmentDetector) handlePutEvent(e *clientv3.Event, collection
eventType: segmentAdd,
segmentID: info.GetSegmentID(),
partitionID: info.GetPartitionID(),
nodeID: info.GetNodeID(),
nodeIDs: info.GetNodeIds(),
state: segmentStateLoaded,
}
}
@ -182,7 +182,7 @@ func (sd *etcdShardSegmentDetector) handleDelEvent(e *clientv3.Event, collection
eventType: segmentDel,
segmentID: info.GetSegmentID(),
partitionID: info.GetPartitionID(),
nodeID: info.GetNodeID(),
nodeIDs: info.GetNodeIds(),
state: segmentStateOffline,
}
}

View File

@ -57,7 +57,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
{
eventType: segmentAdd,
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
},
@ -105,7 +105,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
{
eventType: segmentAdd,
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
},
@ -119,7 +119,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
"segment_1": {
CollectionID: 1,
SegmentID: 1,
NodeID: 1,
NodeIds: []int64{1},
DmChannel: "dml_1_1_v0",
ReplicaIds: []int64{1, 2},
},
@ -131,7 +131,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
{
eventType: segmentAdd,
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
},
@ -145,14 +145,14 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
"segment_1": {
CollectionID: 1,
SegmentID: 1,
NodeID: 1,
NodeIds: []int64{1},
DmChannel: "dml_1_1_v0",
ReplicaIds: []int64{1, 2},
},
"segment_2": {
CollectionID: 1,
SegmentID: 2,
NodeID: 2,
NodeIds: []int64{2},
DmChannel: "dml_2_1_v1",
ReplicaIds: []int64{2},
},
@ -164,7 +164,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
{
eventType: segmentAdd,
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
},
@ -178,7 +178,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
"segment_1": {
CollectionID: 1,
SegmentID: 1,
NodeID: 1,
NodeIds: []int64{1},
DmChannel: "dml_1_1_v0",
ReplicaIds: []int64{1, 2},
},
@ -190,7 +190,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
{
eventType: segmentAdd,
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
},
@ -201,7 +201,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
{
eventType: segmentDel,
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateOffline,
},
},
@ -215,14 +215,14 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
"segment_1": {
CollectionID: 1,
SegmentID: 1,
NodeID: 1,
NodeIds: []int64{1},
DmChannel: "dml_1_1_v0",
ReplicaIds: []int64{1, 2},
},
"segment_2": {
CollectionID: 1,
SegmentID: 2,
NodeID: 2,
NodeIds: []int64{2},
DmChannel: "dml_2_1_v1",
ReplicaIds: []int64{2},
},
@ -231,7 +231,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
{
eventType: segmentAdd,
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateLoaded,
},
},
@ -242,7 +242,7 @@ func TestEtcdShardSegmentDetector_watch(t *testing.T) {
{
eventType: segmentDel,
segmentID: 1,
nodeID: 1,
nodeIDs: []int64{1},
state: segmentStateOffline,
},
},