Add getNodeInfoByID and getSegmentInfoByNode function for cluster (#11247)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/11320/head
xige-16 2021-11-05 16:00:55 +08:00 committed by GitHub
parent fa6ea60324
commit 9fcfbde368
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 491 additions and 159 deletions

View File

@ -58,14 +58,16 @@ type Cluster interface {
releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error
getNodeByID(nodeID int64) (Node, error)
getNodeInfoByID(nodeID int64) (Node, error)
removeNodeInfo(nodeID int64) error
stopNode(nodeID int64)
onlineNodes() (map[int64]Node, error)
isOnline(nodeID int64) (bool, error)
offlineNodes() (map[int64]Node, error)
hasNode(nodeID int64) bool
getSessionVersion() int64
@ -96,7 +98,7 @@ type queryNodeCluster struct {
newNodeFn newQueryNodeFn
}
func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session) (*queryNodeCluster, error) {
func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session) (Cluster, error) {
childCtx, cancel := context.WithCancel(ctx)
nodes := make(map[int64]Node)
c := &queryNodeCluster{
@ -223,7 +225,7 @@ func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *q
return nil
}
return errors.New("LoadSegments: Can't find query node by nodeID ")
return fmt.Errorf("LoadSegments: Can't find query node by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error {
@ -244,7 +246,7 @@ func (c *queryNodeCluster) releaseSegments(ctx context.Context, nodeID int64, in
return nil
}
return errors.New("ReleaseSegments: Can't find query node by nodeID ")
return fmt.Errorf("ReleaseSegments: Can't find query node by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
@ -271,7 +273,7 @@ func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in
return nil
}
return errors.New("WatchDmChannels: Can't find query node by nodeID ")
return fmt.Errorf("WatchDmChannels: Can't find query node by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error {
@ -308,7 +310,7 @@ func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in
return nil
}
return errors.New("AddQueryChannel: can't find query node by nodeID")
return fmt.Errorf("AddQueryChannel: can't find query node by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error {
c.Lock()
@ -324,7 +326,7 @@ func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64,
return nil
}
return errors.New("RemoveQueryChannel: can't find query node by nodeID")
return fmt.Errorf("RemoveQueryChannel: can't find query node by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
@ -345,7 +347,7 @@ func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64,
return nil
}
return errors.New("ReleaseCollection: can't find query node by nodeID")
return fmt.Errorf("ReleaseCollection: can't find query node by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
@ -369,7 +371,7 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64,
return nil
}
return errors.New("ReleasePartitions: can't find query node by nodeID")
return fmt.Errorf("ReleasePartitions: can't find query node by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
@ -391,6 +393,21 @@ func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSe
return segmentInfos, nil
}
// getSegmentInfoByNode returns the segment infos reported by the single query
// node identified by nodeID. It fails when the node is not registered in the
// cluster or when the node's getSegmentInfo RPC fails.
func (c *queryNodeCluster) getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
	c.RLock()
	defer c.RUnlock()

	if node, ok := c.nodes[nodeID]; ok {
		res, err := node.getSegmentInfo(ctx, in)
		if err != nil {
			return nil, err
		}
		return res.Infos, nil
	}

	// Fixed: the message previously said "ReleasePartitions" — a copy/paste
	// slip from releasePartitions that would mislead anyone reading the logs.
	return nil, fmt.Errorf("GetSegmentInfoByNode: can't find query node by nodeID, nodeID = %d", nodeID)
}
type queryNodeGetMetricsResponse struct {
resp *milvuspb.GetMetricsResponse
err error
@ -417,7 +434,7 @@ func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
defer c.RUnlock()
if _, ok := c.nodes[nodeID]; !ok {
return 0, errors.New("GetNumDmChannels: Can't find query node by nodeID ")
return 0, fmt.Errorf("GetNumDmChannels: Can't find query node by nodeID, nodeID = %d", nodeID)
}
numChannel := 0
@ -437,7 +454,7 @@ func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) {
defer c.RUnlock()
if _, ok := c.nodes[nodeID]; !ok {
return 0, errors.New("getNumSegments: Can't find query node by nodeID ")
return 0, fmt.Errorf("getNumSegments: Can't find query node by nodeID, nodeID = %d", nodeID)
}
numSegment := 0
@ -486,12 +503,16 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
return fmt.Errorf("RegisterNode: node %d alredy exists in cluster", id)
}
func (c *queryNodeCluster) getNodeByID(nodeID int64) (Node, error) {
func (c *queryNodeCluster) getNodeInfoByID(nodeID int64) (Node, error) {
c.RLock()
defer c.RUnlock()
if node, ok := c.nodes[nodeID]; ok {
return node, nil
nodeInfo, err := node.getNodeInfo()
if err != nil {
return nil, err
}
return nodeInfo, nil
}
return nil, fmt.Errorf("GetNodeByID: query node %d not exist", nodeID)
@ -557,6 +578,17 @@ func (c *queryNodeCluster) offlineNodes() (map[int64]Node, error) {
return c.getOfflineNodes()
}
// hasNode reports whether a query node with the given nodeID is currently
// registered in the cluster.
func (c *queryNodeCluster) hasNode(nodeID int64) bool {
	c.RLock()
	defer c.RUnlock()

	// Idiomatic comma-ok lookup instead of an if/return-true/return-false chain.
	_, ok := c.nodes[nodeID]
	return ok
}
func (c *queryNodeCluster) getOfflineNodes() (map[int64]Node, error) {
nodes := make(map[int64]Node)
for nodeID, node := range c.nodes {
@ -579,7 +611,7 @@ func (c *queryNodeCluster) isOnline(nodeID int64) (bool, error) {
return node.isOnline(), nil
}
return false, fmt.Errorf("IsOnService: query node %d not exist", nodeID)
return false, fmt.Errorf("IsOnline: query node %d not exist", nodeID)
}
//func (c *queryNodeCluster) printMeta() {

View File

@ -22,6 +22,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -53,12 +54,8 @@ func TestReloadClusterFromKV(t *testing.T) {
cluster.reloadFromKV()
nodeID := queryNode.queryNodeID
for {
_, err = cluster.getNodeByID(nodeID)
if err == nil {
break
}
}
waitQueryNodeOnline(cluster, nodeID)
queryNode.stop()
err = removeNodeSession(queryNode.queryNodeID)
assert.Nil(t, err)
@ -128,6 +125,22 @@ func TestGrpcRequest(t *testing.T) {
session: clusterSession,
}
t.Run("Test GetNodeInfoByIDWithNodeNotExist", func(t *testing.T) {
_, err := cluster.getNodeInfoByID(defaultQueryNodeID)
assert.NotNil(t, err)
})
t.Run("Test GetSegmentInfoByNodeWithNodeNotExist", func(t *testing.T) {
getSegmentInfoReq := &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfoByNode(baseCtx, defaultQueryNodeID, getSegmentInfoReq)
assert.NotNil(t, err)
})
node, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
nodeSession := node.session
@ -149,6 +162,7 @@ func TestGrpcRequest(t *testing.T) {
loadSegmentReq := &querypb.LoadSegmentsRequest{
DstNodeID: nodeID,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: genCollectionSchema(defaultCollectionID, false),
}
err := cluster.loadSegments(baseCtx, nodeID, loadSegmentReq)
assert.Nil(t, err)
@ -191,7 +205,81 @@ func TestGrpcRequest(t *testing.T) {
assert.Nil(t, err)
})
node.stop()
t.Run("Test GetSegmentInfo", func(t *testing.T) {
getSegmentInfoReq := &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfo(baseCtx, getSegmentInfoReq)
assert.Nil(t, err)
})
t.Run("Test GetSegmentInfoByNode", func(t *testing.T) {
getSegmentInfoReq := &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
assert.Nil(t, err)
})
node.getSegmentInfos = returnFailedGetSegmentInfoResult
t.Run("Test GetSegmentInfoFailed", func(t *testing.T) {
getSegmentInfoReq := &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfo(baseCtx, getSegmentInfoReq)
assert.NotNil(t, err)
})
t.Run("Test GetSegmentInfoByNodeFailed", func(t *testing.T) {
getSegmentInfoReq := &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
assert.NotNil(t, err)
})
node.getSegmentInfos = returnSuccessGetSegmentInfoResult
t.Run("Test GetNodeInfoByID", func(t *testing.T) {
res, err := cluster.getNodeInfoByID(nodeID)
assert.Nil(t, err)
assert.NotNil(t, res)
})
node.getMetrics = returnFailedGetMetricsResult
t.Run("Test GetNodeInfoByIDFailed", func(t *testing.T) {
_, err := cluster.getNodeInfoByID(nodeID)
assert.NotNil(t, err)
})
node.getMetrics = returnSuccessGetMetricsResult
cluster.stopNode(nodeID)
t.Run("Test GetSegmentInfoByNodeAfterNodeStop", func(t *testing.T) {
getSegmentInfoReq := &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
assert.NotNil(t, err)
})
err = removeAllSession()
assert.Nil(t, err)
}

View File

@ -25,11 +25,16 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
defaultTotalmemPerNode = 6000000000
)
type queryNodeServerMock struct {
querypb.QueryNodeServer
ctx context.Context
@ -50,6 +55,12 @@ type queryNodeServerMock struct {
releaseCollection func() (*commonpb.Status, error)
releasePartition func() (*commonpb.Status, error)
releaseSegments func() (*commonpb.Status, error)
getSegmentInfos func() (*querypb.GetSegmentInfoResponse, error)
getMetrics func() (*milvuspb.GetMetricsResponse, error)
totalMem uint64
memUsage uint64
memUsageRate float64
}
func newQueryNodeServerMock(ctx context.Context) *queryNodeServerMock {
@ -67,6 +78,12 @@ func newQueryNodeServerMock(ctx context.Context) *queryNodeServerMock {
releaseCollection: returnSuccessResult,
releasePartition: returnSuccessResult,
releaseSegments: returnSuccessResult,
getSegmentInfos: returnSuccessGetSegmentInfoResult,
getMetrics: returnSuccessGetMetricsResult,
totalMem: defaultTotalmemPerNode,
memUsage: uint64(0),
memUsageRate: float64(0),
}
}
@ -173,6 +190,16 @@ func (qs *queryNodeServerMock) WatchDeltaChannels(ctx context.Context, req *quer
}
// LoadSegments mocks the query node's LoadSegments gRPC handler. Before
// delegating to the configurable loadSegment hook, it accounts the estimated
// memory footprint of the requested rows so tests can observe memory usage.
func (qs *queryNodeServerMock) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
	rowSize, err := typeutil.EstimateSizePerRecord(req.Schema)
	if err != nil {
		return returnFailedResult()
	}

	var numRows int64
	for _, loadInfo := range req.Infos {
		numRows += loadInfo.NumOfRows
	}

	// Track simulated memory pressure: rows * estimated bytes per row.
	qs.memUsage += uint64(numRows) * uint64(rowSize)
	qs.memUsageRate = float64(qs.memUsage) / float64(qs.totalMem)

	return qs.loadSegment()
}
@ -189,19 +216,37 @@ func (qs *queryNodeServerMock) ReleaseSegments(ctx context.Context, req *querypb
}
func (qs *queryNodeServerMock) GetSegmentInfo(context.Context, *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
return &querypb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
return qs.getSegmentInfos()
}
func (qs *queryNodeServerMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
response, err := qs.getMetrics()
if err != nil {
return nil, err
}
if response.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, errors.New("query node do task failed")
}
nodeInfos := metricsinfo.QueryNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, qs.queryNodeID),
HardwareInfos: metricsinfo.HardwareMetrics{
IP: qs.queryNodeIP,
Memory: qs.totalMem,
MemoryUsage: qs.memUsage,
},
Type: typeutil.QueryNodeRole,
ID: qs.queryNodeID,
},
}, nil
}
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
if err != nil {
response.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
response.Status.Reason = err.Error()
return response, err
}
response.Response = resp
return response, nil
}
func startQueryNodeServer(ctx context.Context) (*queryNodeServerMock, error) {
@ -225,3 +270,35 @@ func returnFailedResult() (*commonpb.Status, error) {
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, errors.New("query node do task failed")
}
// returnSuccessGetSegmentInfoResult builds a GetSegmentInfoResponse whose
// status reports success and which carries no segment infos.
func returnSuccessGetSegmentInfoResult() (*querypb.GetSegmentInfoResponse, error) {
	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
	return &querypb.GetSegmentInfoResponse{Status: status}, nil
}
// returnFailedGetSegmentInfoResult builds a GetSegmentInfoResponse carrying an
// UnexpectedError status, paired with a non-nil error, to simulate a failing
// query node.
func returnFailedGetSegmentInfoResult() (*querypb.GetSegmentInfoResponse, error) {
	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
	return &querypb.GetSegmentInfoResponse{Status: status}, errors.New("query node do task failed")
}
// returnSuccessGetMetricsResult builds a GetMetricsResponse whose status
// reports success and which carries no payload.
func returnSuccessGetMetricsResult() (*milvuspb.GetMetricsResponse, error) {
	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
	return &milvuspb.GetMetricsResponse{Status: status}, nil
}
// returnFailedGetMetricsResult builds a GetMetricsResponse carrying an
// UnexpectedError status, paired with a non-nil error, to simulate a query
// node whose metrics endpoint fails.
func returnFailedGetMetricsResult() (*milvuspb.GetMetricsResponse, error) {
	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
	return &milvuspb.GetMetricsResponse{Status: status}, errors.New("query node do task failed")
}

View File

@ -311,8 +311,8 @@ func (qc *QueryCoord) watchNodeLoop() {
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
log.Debug("get a del event after queryNode down", zap.Int64("nodeID", serverID))
_, err := qc.cluster.getNodeByID(serverID)
if err != nil {
nodeExist := qc.cluster.hasNode(serverID)
if !nodeExist {
log.Error("queryNode not exist", zap.Int64("nodeID", serverID))
continue
}

View File

@ -186,12 +186,7 @@ func TestWatchNodeLoop(t *testing.T) {
assert.Nil(t, err)
nodeID := queryNode1.queryNodeID
for {
_, err = queryCoord.cluster.getNodeByID(nodeID)
if err == nil {
break
}
}
waitQueryNodeOnline(queryCoord.cluster, nodeID)
queryCoord.Stop()
queryNode1.stop()
@ -208,15 +203,16 @@ func TestWatchNodeLoop(t *testing.T) {
assert.Nil(t, err)
nodeID := queryNode1.queryNodeID
waitQueryNodeOnline(queryCoord.cluster, nodeID)
nodes, err := queryCoord.cluster.onlineNodes()
assert.Nil(t, err)
queryNode1.stop()
err = removeNodeSession(nodeID)
assert.Nil(t, err)
for {
_, err = queryCoord.cluster.getNodeByID(nodeID)
if err != nil {
break
}
}
waitAllQueryNodeOffline(queryCoord.cluster, nodes)
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)

View File

@ -29,12 +29,14 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
)
// Node provides many interfaces to access querynode via grpc
type Node interface {
start() error
stop()
getNodeInfo() (Node, error)
clearNodeInfo() error
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
@ -80,6 +82,11 @@ type queryNode struct {
watchedQueryChannels map[UniqueID]*querypb.QueryChannelInfo
state nodeState
stateLock sync.RWMutex
totalMem uint64
memUsage uint64
memUsageRate float64
cpuUsage float64
}
func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error) {
@ -512,15 +519,18 @@ func (qn *queryNode) releasePartitions(ctx context.Context, in *querypb.ReleaseP
func (qn *queryNode) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
if !qn.isOnline() {
return nil, nil
return nil, fmt.Errorf("getSegmentInfo: queryNode %d is offline", qn.id)
}
res, err := qn.client.GetSegmentInfo(ctx, in)
if err == nil && res.Status.ErrorCode == commonpb.ErrorCode_Success {
return res, nil
if err != nil {
return nil, err
}
if res.Status.ErrorCode != commonpb.ErrorCode_Success {
return nil, errors.New(res.Status.Reason)
}
return nil, nil
return res, nil
}
func (qn *queryNode) getComponentInfo(ctx context.Context) *internalpb.ComponentInfo {
@ -592,6 +602,46 @@ func (qn *queryNode) releaseSegments(ctx context.Context, in *querypb.ReleaseSeg
return nil
}
// getNodeInfo fetches the query node's system metrics (total memory, memory
// usage, CPU usage) via GetMetrics, caches them on the live node, and returns
// a detached snapshot of the node carrying those values. The snapshot shares
// no mutable state with the receiver, so callers may read it without locking.
func (qn *queryNode) getNodeInfo() (Node, error) {
	// Take the WRITE lock: this method updates qn's cached usage fields below.
	// The previous code held only RLock while writing, which is a data race
	// between concurrent getNodeInfo calls.
	qn.Lock()
	defer qn.Unlock()

	if !qn.isOnline() {
		return nil, errors.New("getNodeInfo: queryNode is offline")
	}

	req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
	if err != nil {
		return nil, err
	}
	resp, err := qn.client.GetMetrics(qn.ctx, req)
	if err != nil {
		return nil, err
	}
	if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
		return nil, errors.New(resp.Status.Reason)
	}

	infos := metricsinfo.QueryNodeInfos{}
	err = metricsinfo.UnmarshalComponentInfos(resp.Response, &infos)
	if err != nil {
		return nil, err
	}

	// Cache the freshly reported hardware statistics on the live node.
	qn.cpuUsage = infos.HardwareInfos.CPUCoreUsage
	qn.totalMem = infos.HardwareInfos.Memory
	qn.memUsage = infos.HardwareInfos.MemoryUsage
	qn.memUsageRate = float64(qn.memUsage) / float64(qn.totalMem)

	// NOTE(review): qn.state is read here without stateLock; presumably
	// acceptable for an informational snapshot — confirm.
	return &queryNode{
		id:           qn.id,
		address:      qn.address,
		state:        qn.state,
		totalMem:     qn.totalMem,
		memUsage:     qn.memUsage,
		memUsageRate: qn.memUsageRate,
		cpuUsage:     qn.cpuUsage,
	}, nil
}
//****************************************************//
func saveNodeCollectionInfo(collectionID UniqueID, info *querypb.CollectionInfo, nodeID int64, kv *etcdkv.EtcdKV) error {

View File

@ -22,6 +22,8 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -53,15 +55,11 @@ func removeAllSession() error {
}
func waitAllQueryNodeOffline(cluster Cluster, nodes map[int64]Node) bool {
reDoCount := 40
for {
if reDoCount <= 0 {
return false
}
allOffline := true
for nodeID := range nodes {
_, err := cluster.getNodeByID(nodeID)
if err == nil {
nodeExist := cluster.hasNode(nodeID)
if nodeExist {
allOffline = false
break
}
@ -71,7 +69,6 @@ func waitAllQueryNodeOffline(cluster Cluster, nodes map[int64]Node) bool {
}
log.Debug("wait all queryNode offline")
time.Sleep(100 * time.Millisecond)
reDoCount--
}
}
@ -127,8 +124,7 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
err = removeNodeSession(queryNode2.queryNodeID)
assert.Nil(t, err)
allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
assert.Equal(t, allNodeOffline, true)
waitAllQueryNodeOffline(queryCoord.cluster, nodes)
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
@ -173,8 +169,7 @@ func TestQueryNode_MultiNode_reStart(t *testing.T) {
err = removeNodeSession(queryNode3.queryNodeID)
assert.Nil(t, err)
allNodeOffline := waitAllQueryNodeOffline(queryCoord.cluster, nodes)
assert.Equal(t, allNodeOffline, true)
waitAllQueryNodeOffline(queryCoord.cluster, nodes)
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
@ -276,3 +271,88 @@ func TestSealedSegmentChangeAfterQueryNodeStop(t *testing.T) {
err = removeAllSession()
assert.Nil(t, err)
}
// TestGrpcRequestWithNodeOffline exercises the queryNode client wrappers when
// the target node object was constructed but never brought online: read-style
// RPCs (watch, add channel, segment info, metrics, load) must fail, while
// release-style RPCs succeed (presumably treated as best-effort no-ops for an
// offline node — confirm against the wrapper implementations).
func TestGrpcRequestWithNodeOffline(t *testing.T) {
	refreshParams()
	baseCtx, cancel := context.WithCancel(context.Background())
	kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
	assert.Nil(t, err)
	nodeServer, err := startQueryNodeServer(baseCtx)
	assert.Nil(t, err)
	address := nodeServer.queryNodeIP
	nodeID := nodeServer.queryNodeID
	// Build the client-side node object without starting it, so it stays offline.
	node, err := newQueryNodeTest(baseCtx, address, nodeID, kv)
	assert.Equal(t, false, node.isOnline())
	// Offline node: watch must be rejected.
	t.Run("Test WatchDmChannels", func(t *testing.T) {
		req := &querypb.WatchDmChannelsRequest{}
		err = node.watchDmChannels(baseCtx, req)
		assert.NotNil(t, err)
	})
	t.Run("Test AddQueryChannel", func(t *testing.T) {
		req := &querypb.AddQueryChannelRequest{}
		err = node.addQueryChannel(baseCtx, req)
		assert.NotNil(t, err)
	})
	// Release/remove operations succeed even though the node is offline.
	t.Run("Test RemoveQueryChannel", func(t *testing.T) {
		req := &querypb.RemoveQueryChannelRequest{}
		err = node.removeQueryChannel(baseCtx, req)
		assert.Nil(t, err)
	})
	t.Run("Test ReleaseCollection", func(t *testing.T) {
		req := &querypb.ReleaseCollectionRequest{}
		err = node.releaseCollection(baseCtx, req)
		assert.Nil(t, err)
	})
	t.Run("Test ReleasePartition", func(t *testing.T) {
		req := &querypb.ReleasePartitionsRequest{}
		err = node.releasePartitions(baseCtx, req)
		assert.Nil(t, err)
	})
	// Informational RPCs fail and return no result for an offline node.
	t.Run("Test getSegmentInfo", func(t *testing.T) {
		req := &querypb.GetSegmentInfoRequest{}
		res, err := node.getSegmentInfo(baseCtx, req)
		assert.NotNil(t, err)
		assert.Nil(t, res)
	})
	t.Run("Test getComponentInfo", func(t *testing.T) {
		res := node.getComponentInfo(baseCtx)
		assert.Equal(t, internalpb.StateCode_Abnormal, res.StateCode)
	})
	t.Run("Test getMetrics", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		res, err := node.getMetrics(baseCtx, req)
		assert.NotNil(t, err)
		assert.Nil(t, res)
	})
	t.Run("Test LoadSegment", func(t *testing.T) {
		req := &querypb.LoadSegmentsRequest{}
		err = node.loadSegments(baseCtx, req)
		assert.NotNil(t, err)
	})
	t.Run("Test ReleaseSegments", func(t *testing.T) {
		req := &querypb.ReleaseSegmentsRequest{}
		err = node.releaseSegments(baseCtx, req)
		assert.NotNil(t, err)
	})
	t.Run("Test getNodeInfo", func(t *testing.T) {
		node, err = node.getNodeInfo()
		assert.NotNil(t, err)
		assert.Nil(t, node)
	})
	cancel()
	err = removeAllSession()
	assert.Nil(t, err)
}

View File

@ -33,8 +33,10 @@ import (
"github.com/milvus-io/milvus/internal/common"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
@ -61,6 +63,8 @@ type ReplicaInterface interface {
getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error)
// getPKFieldIDsByCollectionID returns vector field ids of collection
getPKFieldIDByCollectionID(collectionID UniqueID) (FieldID, error)
// getSegmentInfosByColID return segments info by collectionID
getSegmentInfosByColID(collectionID UniqueID) ([]*querypb.SegmentInfo, error)
// partition
// addPartition adds a new partition to collection
@ -318,6 +322,35 @@ func (colReplica *collectionReplica) getFieldsByCollectionIDPrivate(collectionID
return collection.Schema().Fields, nil
}
// getSegmentInfosByColID collects a SegmentInfo for every segment belonging to
// the given collection. An unknown collection yields an empty slice (not an
// error); inconsistent partition/segment metadata is reported as an error.
func (colReplica *collectionReplica) getSegmentInfosByColID(collectionID UniqueID) ([]*querypb.SegmentInfo, error) {
	colReplica.mu.RLock()
	defer colReplica.mu.RUnlock()

	infos := make([]*querypb.SegmentInfo, 0)
	collection, hasCollection := colReplica.collections[collectionID]
	if !hasCollection {
		// Collection not present: report no segments rather than failing.
		return infos, nil
	}

	for _, partitionID := range collection.partitionIDs {
		partition, hasPartition := colReplica.partitions[partitionID]
		if !hasPartition {
			return nil, errors.New("the meta of collection and partition are inconsistent in query node")
		}
		for _, segmentID := range partition.segmentIDs {
			segment, hasSegment := colReplica.segments[segmentID]
			if !hasSegment {
				return nil, errors.New("the meta of partition and segment are inconsistent in query node")
			}
			infos = append(infos, getSegmentInfo(segment))
		}
	}

	return infos, nil
}
//----------------------------------------------------------------------------------------------------- partition
// addPartition adds a new partition to collection
func (colReplica *collectionReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error {
@ -646,3 +679,43 @@ func newCollectionReplica(etcdKv *etcdkv.EtcdKV) ReplicaInterface {
return replica
}
// getSegmentInfo converts an in-memory Segment into its querypb.SegmentInfo
// wire representation.
// TODO:: segment has multi vec column
func getSegmentInfo(segment *Segment) *querypb.SegmentInfo {
	// Report only one indexed field; map iteration picks an arbitrary entry
	// when several fields are indexed (see TODO above).
	var indexName string
	var indexID int64
	for fieldID := range segment.indexInfos {
		indexName = segment.getIndexName(fieldID)
		indexID = segment.getIndexID(fieldID)
		break
	}

	return &querypb.SegmentInfo{
		SegmentID:    segment.ID(),
		CollectionID: segment.collectionID,
		PartitionID:  segment.partitionID,
		NodeID:       Params.QueryNodeID,
		MemSize:      segment.getMemSize(),
		NumRows:      segment.getRowCount(),
		IndexName:    indexName,
		IndexID:      indexID,
		ChannelID:    segment.vChannelID,
		State:        getSegmentStateBySegmentType(segment.segmentType),
	}
}
// getSegmentStateBySegmentType maps the internal segmentType onto the public
// commonpb.SegmentState enum; unknown types map to SegmentState_NotExist.
// TODO: remove segmentType and use queryPb.SegmentState instead
func getSegmentStateBySegmentType(segType segmentType) commonpb.SegmentState {
	switch segType {
	case segmentTypeGrowing:
		return commonpb.SegmentState_Growing
	// Indexing segments are still reported as sealed until segmentTypeIndexing
	// is removed (see TODO above).
	case segmentTypeSealed, segmentTypeIndexing:
		return commonpb.SegmentState_Sealed
	default:
		return commonpb.SegmentState_NotExist
	}
}

View File

@ -223,6 +223,29 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) {
assert.NoError(t, err)
}
// TestCollectionReplica_getSegmentInfosByColID checks that
// getSegmentInfosByColID returns one SegmentInfo per segment of the
// collection, covering growing, sealed, and indexing segment types.
func TestCollectionReplica_getSegmentInfosByColID(t *testing.T) {
	node := newQueryNodeMock()
	collectionID := UniqueID(0)
	initTestMeta(t, node, collectionID, 0)
	// Add three more segments of differing types.
	err := node.historical.replica.addSegment(UniqueID(1), defaultPartitionID, collectionID, "", segmentTypeGrowing, true)
	assert.NoError(t, err)
	err = node.historical.replica.addSegment(UniqueID(2), defaultPartitionID, collectionID, "", segmentTypeSealed, true)
	assert.NoError(t, err)
	err = node.historical.replica.addSegment(UniqueID(3), defaultPartitionID, collectionID, "", segmentTypeSealed, true)
	assert.NoError(t, err)
	segment, err := node.historical.replica.getSegmentByID(UniqueID(3))
	assert.NoError(t, err)
	// Flip one segment to the indexing type to cover that branch as well.
	segment.segmentType = segmentTypeIndexing
	targetSeg, err := node.historical.replica.getSegmentInfosByColID(collectionID)
	assert.NoError(t, err)
	// Expect 4 infos: presumably initTestMeta registers one segment in
	// addition to the three added above — confirm against initTestMeta.
	assert.Equal(t, 4, len(targetSeg))
	err = node.Stop()
	assert.NoError(t, err)
}
func TestCollectionReplica_hasSegment(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)

View File

@ -19,8 +19,6 @@ import (
"strconv"
"strings"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
@ -28,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/mqclient"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -485,47 +484,12 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
return res, err
}
infos := make([]*queryPb.SegmentInfo, 0)
// TODO: remove segmentType and use queryPb.SegmentState instead
getSegmentStateBySegmentType := func(segType segmentType) commonpb.SegmentState {
switch segType {
case segmentTypeGrowing:
return commonpb.SegmentState_Growing
case segmentTypeSealed:
return commonpb.SegmentState_Sealed
// TODO: remove segmentTypeIndexing
case segmentTypeIndexing:
return commonpb.SegmentState_Sealed
default:
return commonpb.SegmentState_NotExist
}
}
getSegmentInfo := func(segment *Segment) *queryPb.SegmentInfo {
var indexName string
var indexID int64
// TODO:: segment has multi vec column
for fieldID := range segment.indexInfos {
indexName = segment.getIndexName(fieldID)
indexID = segment.getIndexID(fieldID)
break
}
info := &queryPb.SegmentInfo{
SegmentID: segment.ID(),
CollectionID: segment.collectionID,
PartitionID: segment.partitionID,
NodeID: Params.QueryNodeID,
MemSize: segment.getMemSize(),
NumRows: segment.getRowCount(),
IndexName: indexName,
IndexID: indexID,
ChannelID: segment.vChannelID,
State: getSegmentStateBySegmentType(segment.segmentType),
}
return info
}
// get info from historical
node.historical.replica.printReplica()
partitionIDs, err := node.historical.replica.getPartitionIDs(in.CollectionID)
historicalSegmentInfos, err := node.historical.replica.getSegmentInfosByColID(in.CollectionID)
if err != nil {
log.Debug("GetSegmentInfo: get historical segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -534,39 +498,13 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
}
return res, err
}
for _, partitionID := range partitionIDs {
segmentIDs, err := node.historical.replica.getSegmentIDs(partitionID)
if err != nil {
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
}
return res, err
}
for _, id := range segmentIDs {
segment, err := node.historical.replica.getSegmentByID(id)
if err != nil {
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
}
return res, err
}
info := getSegmentInfo(segment)
log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id), zap.Any("info", info))
infos = append(infos, info)
}
}
infos = append(infos, historicalSegmentInfos...)
// get info from streaming
node.streaming.replica.printReplica()
partitionIDs, err = node.streaming.replica.getPartitionIDs(in.CollectionID)
streamingSegmentInfos, err := node.streaming.replica.getSegmentInfosByColID(in.CollectionID)
if err != nil {
log.Debug("GetSegmentInfo: get streaming segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -575,33 +513,8 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
}
return res, err
}
for _, partitionID := range partitionIDs {
segmentIDs, err := node.streaming.replica.getSegmentIDs(partitionID)
if err != nil {
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
}
return res, err
}
for _, id := range segmentIDs {
segment, err := node.streaming.replica.getSegmentByID(id)
if err != nil {
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
}
return res, err
}
info := getSegmentInfo(segment)
log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id), zap.Any("info", info))
infos = append(infos, info)
}
}
infos = append(infos, streamingSegmentInfos...)
log.Debug("GetSegmentInfo: get segment info from query node", zap.Int64("nodeID", node.session.ServerID), zap.Any("segment infos", infos))
return &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,

View File

@ -383,8 +383,8 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
}
rsp, err := node.GetSegmentInfo(ctx, req)
assert.Error(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
})
t.Run("test no collection in streaming", func(t *testing.T) {
@ -404,8 +404,8 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
}
rsp, err := node.GetSegmentInfo(ctx, req)
assert.Error(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
})
t.Run("test different segment type", func(t *testing.T) {