Add ut for loadbalanceTask when index created (#11981)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/12116/head
xige-16 2021-11-18 22:21:55 +08:00 committed by GitHub
parent cc4013bbdc
commit 38afc63c16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 1 deletions

View File

@ -48,7 +48,7 @@ const (
defaultSegmentID = UniqueID(2021)
defaultQueryNodeID = int64(100)
defaultChannelNum = 2
defaultNumRowPerSegment = 10000
defaultNumRowPerSegment = 1000
)
func genCollectionSchema(collectionID UniqueID, isBinary bool) *schemapb.CollectionSchema {

View File

@ -943,3 +943,89 @@ func TestLoadBalanceSegmentsTask(t *testing.T) {
err = removeAllSession()
assert.Nil(t, err)
}
func TestLoadBalanceIndexedSegmentsTask(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
indexCoord := newIndexCoordMock()
indexCoord.returnIndexFile = true
queryCoord.indexCoordClient = indexCoord
node1, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node1.queryNodeID)
loadCollectionTask := genLoadCollectionTask(ctx, queryCoord)
err = queryCoord.scheduler.Enqueue(loadCollectionTask)
assert.Nil(t, err)
waitTaskFinalState(loadCollectionTask, taskExpired)
node2, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node2.queryNodeID)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_loadBalance)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
},
SourceNodeIDs: []int64{node1.queryNodeID},
SealedSegmentIDs: []UniqueID{defaultSegmentID},
},
rootCoord: queryCoord.rootCoordClient,
dataCoord: queryCoord.dataCoordClient,
indexCoord: queryCoord.indexCoordClient,
cluster: queryCoord.cluster,
meta: queryCoord.meta,
}
err = queryCoord.scheduler.Enqueue(loadBalanceTask)
assert.Nil(t, err)
waitTaskFinalState(loadBalanceTask, taskExpired)
node1.stop()
node2.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func TestLoadBalanceIndexedSegmentsAfterNodeDown(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node1, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node1.queryNodeID)
loadCollectionTask := genLoadCollectionTask(ctx, queryCoord)
err = queryCoord.scheduler.Enqueue(loadCollectionTask)
assert.Nil(t, err)
waitTaskFinalState(loadCollectionTask, taskExpired)
node2, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node2.queryNodeID)
indexCoord := newIndexCoordMock()
indexCoord.returnIndexFile = true
queryCoord.indexCoordClient = indexCoord
removeNodeSession(node1.queryNodeID)
for {
if len(queryCoord.meta.getSegmentInfosByNode(node1.queryNodeID)) == 0 {
break
}
}
node2.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}