mirror of https://github.com/milvus-io/milvus.git
parent
d35b0718e7
commit
40c6e01b41
|
@ -60,6 +60,8 @@ func TestReloadClusterFromKV(t *testing.T) {
|
|||
}
|
||||
}
|
||||
queryNode.stop()
|
||||
err = removeNodeSession(queryNode.queryNodeID)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("Test LoadOfflineNodes", func(t *testing.T) {
|
||||
|
@ -100,6 +102,9 @@ func TestReloadClusterFromKV(t *testing.T) {
|
|||
assert.Equal(t, 1, len(cluster.nodes))
|
||||
collection := cluster.getCollectionInfosByID(context.Background(), 100)
|
||||
assert.Equal(t, defaultCollectionID, collection[0].CollectionID)
|
||||
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -127,14 +132,7 @@ func TestGrpcRequest(t *testing.T) {
|
|||
nodeSession := node.session
|
||||
nodeID := node.queryNodeID
|
||||
cluster.registerNode(baseCtx, nodeSession, nodeID, disConnect)
|
||||
|
||||
for {
|
||||
online, err := cluster.isOnline(nodeID)
|
||||
assert.Nil(t, err)
|
||||
if online {
|
||||
break
|
||||
}
|
||||
}
|
||||
waitQueryNodeOnline(cluster, nodeID)
|
||||
|
||||
t.Run("Test GetComponentInfos", func(t *testing.T) {
|
||||
_, err := cluster.getComponentInfos(baseCtx)
|
||||
|
@ -191,4 +189,6 @@ func TestGrpcRequest(t *testing.T) {
|
|||
})
|
||||
|
||||
node.stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
|
|
@ -320,15 +320,12 @@ func TestGrpcTask(t *testing.T) {
|
|||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, res.Status.ErrorCode)
|
||||
})
|
||||
|
||||
//nodes, err := queryCoord.cluster.getOnServiceNodes()
|
||||
//assert.Nil(t, err)
|
||||
|
||||
err = node.stop()
|
||||
//assert.Nil(t, err)
|
||||
|
||||
//allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
|
||||
//assert.Equal(t, allNodeOffline, true)
|
||||
err = removeNodeSession(node.queryNodeID)
|
||||
assert.Nil(t, err)
|
||||
queryCoord.Stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestLoadBalanceTask(t *testing.T) {
|
||||
|
@ -398,6 +395,8 @@ func TestLoadBalanceTask(t *testing.T) {
|
|||
queryNode1.stop()
|
||||
queryNode2.stop()
|
||||
queryCoord.Stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestGrpcTaskBeforeHealthy(t *testing.T) {
|
||||
|
@ -550,4 +549,7 @@ func TestGrpcTaskBeforeHealthy(t *testing.T) {
|
|||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, res.Status.ErrorCode)
|
||||
})
|
||||
|
||||
unHealthyCoord.Stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
|
|
@ -164,6 +164,8 @@ func TestWatchNodeLoop(t *testing.T) {
|
|||
}
|
||||
|
||||
queryCoord.Stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("Test RegisterNewNode", func(t *testing.T) {
|
||||
|
@ -184,6 +186,8 @@ func TestWatchNodeLoop(t *testing.T) {
|
|||
|
||||
queryCoord.Stop()
|
||||
queryNode1.stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("Test RemoveNode", func(t *testing.T) {
|
||||
|
@ -196,6 +200,8 @@ func TestWatchNodeLoop(t *testing.T) {
|
|||
|
||||
nodeID := queryNode1.queryNodeID
|
||||
queryNode1.stop()
|
||||
err = removeNodeSession(nodeID)
|
||||
assert.Nil(t, err)
|
||||
for {
|
||||
_, err = queryCoord.cluster.getNodeByID(nodeID)
|
||||
if err != nil {
|
||||
|
@ -203,5 +209,7 @@ func TestWatchNodeLoop(t *testing.T) {
|
|||
}
|
||||
}
|
||||
queryCoord.Stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -75,6 +75,18 @@ func waitAllQueryNodeOffline(cluster Cluster, nodes map[int64]Node) bool {
|
|||
}
|
||||
}
|
||||
|
||||
func waitQueryNodeOnline(cluster Cluster, nodeID int64) {
|
||||
for {
|
||||
online, err := cluster.isOnline(nodeID)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if online {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryNode_MultiNode_stop(t *testing.T) {
|
||||
refreshParams()
|
||||
baseCtx := context.Background()
|
||||
|
@ -84,11 +96,12 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
|
|||
|
||||
queryNode1, err := startQueryNodeServer(baseCtx)
|
||||
assert.Nil(t, err)
|
||||
waitQueryNodeOnline(queryCoord.cluster, queryNode1.queryNodeID)
|
||||
|
||||
queryNode5, err := startQueryNodeServer(baseCtx)
|
||||
queryNode2, err := startQueryNodeServer(baseCtx)
|
||||
assert.Nil(t, err)
|
||||
waitQueryNodeOnline(queryCoord.cluster, queryNode2.queryNodeID)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
queryNode1.stop()
|
||||
err = removeNodeSession(queryNode1.queryNodeID)
|
||||
assert.Nil(t, err)
|
||||
|
@ -100,7 +113,6 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
|
|||
CollectionID: defaultCollectionID,
|
||||
Schema: genCollectionSchema(defaultCollectionID, false),
|
||||
})
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
_, err = queryCoord.ReleaseCollection(baseCtx, &querypb.ReleaseCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_ReleaseCollection,
|
||||
|
@ -111,8 +123,8 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
|
|||
time.Sleep(100 * time.Millisecond)
|
||||
nodes, err := queryCoord.cluster.onlineNodes()
|
||||
assert.Nil(t, err)
|
||||
queryNode5.stop()
|
||||
err = removeNodeSession(queryNode5.queryNodeID)
|
||||
queryNode2.stop()
|
||||
err = removeNodeSession(queryNode2.queryNodeID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
|
||||
|
@ -131,6 +143,7 @@ func TestQueryNode_MultiNode_reStart(t *testing.T) {
|
|||
|
||||
queryNode1, err := startQueryNodeServer(baseCtx)
|
||||
assert.Nil(t, err)
|
||||
waitQueryNodeOnline(queryCoord.cluster, queryNode1.queryNodeID)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
queryCoord.LoadCollection(baseCtx, &querypb.LoadCollectionRequest{
|
||||
|
|
|
@ -15,7 +15,6 @@ import (
|
|||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
|
@ -151,8 +150,9 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) {
|
|||
}
|
||||
queryCoord.scheduler.Enqueue([]task{testTask})
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
queryCoord.cluster.stopNode(nodeID)
|
||||
queryNode.stop()
|
||||
err = removeNodeSession(queryNode.queryNodeID)
|
||||
assert.Nil(t, err)
|
||||
for {
|
||||
newActiveTaskIDKeys, _, err := queryCoord.scheduler.client.LoadWithPrefix(activeTaskPrefix)
|
||||
assert.Nil(t, err)
|
||||
|
@ -161,7 +161,8 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) {
|
|||
}
|
||||
}
|
||||
queryCoord.Stop()
|
||||
queryNode.stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestUnMarshalTask(t *testing.T) {
|
||||
|
|
|
@ -123,13 +123,8 @@ func TestTriggerTask(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
//nodes, err := queryCoord.cluster.getOnServiceNodes()
|
||||
//assert.Nil(t, err)
|
||||
|
||||
err = node.stop()
|
||||
//assert.Nil(t, err)
|
||||
|
||||
//allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
|
||||
//assert.Equal(t, allNodeOffline, true)
|
||||
queryCoord.Stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue