mirror of https://github.com/milvus-io/milvus.git
parent
f121be436c
commit
75672abab4
internal
querycoord
util/sessionutil
|
@ -13,6 +13,7 @@ package querycoord
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -22,12 +23,37 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
//func waitQueryNodeOnline(cluster *queryNodeCluster, nodeID int64)
|
//func waitQueryNodeOnline(cluster *queryNodeCluster, nodeID int64)
|
||||||
|
|
||||||
|
func removeNodeSession(id int64) error {
|
||||||
|
kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = kv.Remove(fmt.Sprintf("session/"+typeutil.QueryNodeRole+"-%d", id))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func removeAllSession() error {
|
||||||
|
kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = kv.RemoveWithPrefix("session")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func waitAllQueryNodeOffline(cluster Cluster, nodes map[int64]Node) bool {
|
func waitAllQueryNodeOffline(cluster Cluster, nodes map[int64]Node) bool {
|
||||||
reDoCount := 20
|
reDoCount := 40
|
||||||
for {
|
for {
|
||||||
if reDoCount <= 0 {
|
if reDoCount <= 0 {
|
||||||
return false
|
return false
|
||||||
|
@ -64,6 +90,8 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
queryNode1.stop()
|
queryNode1.stop()
|
||||||
|
err = removeNodeSession(queryNode1.queryNodeID)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
queryCoord.LoadCollection(baseCtx, &querypb.LoadCollectionRequest{
|
queryCoord.LoadCollection(baseCtx, &querypb.LoadCollectionRequest{
|
||||||
Base: &commonpb.MsgBase{
|
Base: &commonpb.MsgBase{
|
||||||
|
@ -84,10 +112,14 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
|
||||||
nodes, err := queryCoord.cluster.onlineNodes()
|
nodes, err := queryCoord.cluster.onlineNodes()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
queryNode5.stop()
|
queryNode5.stop()
|
||||||
|
err = removeNodeSession(queryNode5.queryNodeID)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
|
allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
|
||||||
assert.Equal(t, allNodeOffline, true)
|
assert.Equal(t, allNodeOffline, true)
|
||||||
queryCoord.Stop()
|
queryCoord.Stop()
|
||||||
|
err = removeAllSession()
|
||||||
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQueryNode_MultiNode_reStart(t *testing.T) {
|
func TestQueryNode_MultiNode_reStart(t *testing.T) {
|
||||||
|
@ -109,6 +141,8 @@ func TestQueryNode_MultiNode_reStart(t *testing.T) {
|
||||||
Schema: genCollectionSchema(defaultCollectionID, false),
|
Schema: genCollectionSchema(defaultCollectionID, false),
|
||||||
})
|
})
|
||||||
queryNode1.stop()
|
queryNode1.stop()
|
||||||
|
err = removeNodeSession(queryNode1.queryNodeID)
|
||||||
|
assert.Nil(t, err)
|
||||||
queryNode3, err := startQueryNodeServer(baseCtx)
|
queryNode3, err := startQueryNodeServer(baseCtx)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
@ -123,10 +157,14 @@ func TestQueryNode_MultiNode_reStart(t *testing.T) {
|
||||||
nodes, err := queryCoord.cluster.onlineNodes()
|
nodes, err := queryCoord.cluster.onlineNodes()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
queryNode3.stop()
|
queryNode3.stop()
|
||||||
|
err = removeNodeSession(queryNode3.queryNodeID)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
|
allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
|
||||||
assert.Equal(t, allNodeOffline, true)
|
assert.Equal(t, allNodeOffline, true)
|
||||||
queryCoord.Stop()
|
queryCoord.Stop()
|
||||||
|
err = removeAllSession()
|
||||||
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQueryNode_getMetrics(t *testing.T) {
|
func TestQueryNode_getMetrics(t *testing.T) {
|
||||||
|
@ -153,6 +191,8 @@ func TestNewQueryNode(t *testing.T) {
|
||||||
cancel()
|
cancel()
|
||||||
node.stop()
|
node.stop()
|
||||||
queryNode1.stop()
|
queryNode1.stop()
|
||||||
|
err = removeAllSession()
|
||||||
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReleaseCollectionOnOfflineNode(t *testing.T) {
|
func TestReleaseCollectionOnOfflineNode(t *testing.T) {
|
||||||
|
|
|
@ -20,7 +20,7 @@ const (
|
||||||
DefaultServiceRoot = "session/"
|
DefaultServiceRoot = "session/"
|
||||||
DefaultIDKey = "id"
|
DefaultIDKey = "id"
|
||||||
DefaultRetryTimes = 30
|
DefaultRetryTimes = 30
|
||||||
DefaultTTL = 10
|
DefaultTTL = 60
|
||||||
)
|
)
|
||||||
|
|
||||||
type SessionEventType int
|
type SessionEventType int
|
||||||
|
|
Loading…
Reference in New Issue