Reduce queryNodeCluster lock range to prevent blocking online/offline events (#13249)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/13306/head
congqixia 2021-12-13 21:51:10 +08:00 committed by GitHub
parent 973b98c721
commit 835e0985fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 92 additions and 57 deletions

View File

@ -459,49 +459,71 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64,
}
// getSegmentInfoByID resolves segmentID through clusterMeta, asks the query
// node recorded in that metadata (segmentInfo.NodeID) for the segment infos of
// the segment's collection, and returns the entry whose ID equals segmentID.
// An error is returned when the meta lookup fails, the node is not present in
// c.nodes, the node call fails, or the node reports no matching segment.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// so two variants of the body are interleaved below; the variant that takes
// the read lock only around the c.nodes lookup is presumably the current
// code — confirm against the repository.
func (c *queryNodeCluster) getSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error) {
c.RLock()
defer c.RUnlock()
segmentInfo, err := c.clusterMeta.getSegmentInfoByID(segmentID)
if err != nil {
return nil, err
}
if node, ok := c.nodes[segmentInfo.NodeID]; ok {
res, err := node.getSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
CollectionID: segmentInfo.CollectionID,
})
if err != nil {
return nil, err
}
if res != nil {
for _, info := range res.Infos {
if info.SegmentID == segmentID {
return info, nil
}
}
}
return nil, fmt.Errorf("updateSegmentInfo: can't find segment %d on query node %d", segmentID, segmentInfo.NodeID)
// Hold the read lock only for the map lookup; the node call below is issued
// after RUnlock, so it does not keep the cluster lock held while it runs.
c.RLock()
targetNode, ok := c.nodes[segmentInfo.NodeID]
c.RUnlock()
if !ok {
return nil, fmt.Errorf("updateSegmentInfo: can't find query node by nodeID, nodeID = %d", segmentInfo.NodeID)
}
return nil, fmt.Errorf("updateSegmentInfo: can't find query node by nodeID, nodeID = %d", segmentInfo.NodeID)
res, err := targetNode.getSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
},
CollectionID: segmentInfo.CollectionID,
})
if err != nil {
return nil, err
}
// protobuf convention, it's ok to call GetXXX on nil
for _, info := range res.GetInfos() {
if info.GetSegmentID() == segmentID {
return info, nil
}
}
return nil, fmt.Errorf("updateSegmentInfo: can't find segment %d on query node %d", segmentID, segmentInfo.NodeID)
}
// getSegmentInfo sends the same request to every node in c.nodes and
// concatenates the Infos of the responses into one slice. If any node call
// returns an error, that error is returned instead of the collected infos.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// so a sequential variant and a goroutine-per-node variant are interleaved
// below, and the end of the function is cut off by the hunk boundary —
// confirm the current body against the repository.
func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
c.RLock()
defer c.RUnlock()
segmentInfos := make([]*querypb.SegmentInfo, 0)
// respTuple carries one node's response together with its error through the
// result channel.
type respTuple struct {
res *querypb.GetSegmentInfoResponse
err error
}
c.RLock()
var wg sync.WaitGroup
cnt := len(c.nodes)
// The channel is buffered to the number of nodes and each goroutine sends
// exactly one tuple, so the sends cannot block.
resChan := make(chan respTuple, cnt)
wg.Add(cnt)
for _, node := range c.nodes {
res, err := node.getSegmentInfo(ctx, in)
if err != nil {
return nil, err
}
if res != nil {
segmentInfos = append(segmentInfos, res.Infos...)
go func(node Node) {
defer wg.Done()
res, err := node.getSegmentInfo(ctx, in)
resChan <- respTuple{
res: res,
err: err,
}
}(node)
}
// The lock is released once the goroutines have been started; waiting for
// the node calls happens after RUnlock.
c.RUnlock()
// Every sender has finished when Wait returns, so the channel can be closed
// and drained with range.
wg.Wait()
close(resChan)
var segmentInfos []*querypb.SegmentInfo
for tuple := range resChan {
if tuple.err != nil {
return nil, tuple.err
}
segmentInfos = append(segmentInfos, tuple.res.GetInfos()...)
}
//TODO::update meta
@ -510,17 +532,17 @@ func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSe
// getSegmentInfoByNode forwards the request to the single node registered
// under nodeID in c.nodes and returns the Infos of its response. It returns an
// error when nodeID is not in c.nodes or when the node call fails.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// so two variants of the body are interleaved below; the variant that
// releases the read lock right after the c.nodes lookup is presumably the
// current code — confirm against the repository.
func (c *queryNodeCluster) getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
c.RLock()
defer c.RUnlock()
// In this variant the read lock covers only the map lookup; the node call
// further down is made after RUnlock.
node, ok := c.nodes[nodeID]
c.RUnlock()
if node, ok := c.nodes[nodeID]; ok {
res, err := node.getSegmentInfo(ctx, in)
if err != nil {
return nil, err
}
return res.Infos, nil
if !ok {
return nil, fmt.Errorf("getSegmentInfoByNode: can't find query node by nodeID, nodeID = %d", nodeID)
}
return nil, fmt.Errorf("getSegmentInfoByNode: can't find query node by nodeID, nodeID = %d", nodeID)
res, err := node.getSegmentInfo(ctx, in)
if err != nil {
return nil, err
}
return res.GetInfos(), nil
}
type queryNodeGetMetricsResponse struct {
@ -530,15 +552,28 @@ type queryNodeGetMetricsResponse struct {
// getMetrics calls getMetrics on every node in c.nodes and returns one
// queryNodeGetMetricsResponse per node, each holding that node's response and
// error. A failing node does not abort the call; its error is carried in the
// corresponding entry.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// so a sequential variant and a goroutine-per-node variant are interleaved
// below, and the closing brace is cut off by the hunk boundary — confirm the
// current body against the repository.
func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
c.RLock()
defer c.RUnlock()
ret := make([]queryNodeGetMetricsResponse, 0, len(c.nodes))
var wg sync.WaitGroup
cnt := len(c.nodes)
wg.Add(cnt)
// Buffered to the number of nodes; each goroutine sends exactly one result,
// so the sends cannot block.
respChan := make(chan queryNodeGetMetricsResponse, cnt)
for _, node := range c.nodes {
resp, err := node.getMetrics(ctx, in)
ret = append(ret, queryNodeGetMetricsResponse{
resp: resp,
err: err,
})
go func(node Node) {
defer wg.Done()
resp, err := node.getMetrics(ctx, in)
respChan <- queryNodeGetMetricsResponse{
resp: resp,
err: err,
}
}(node)
}
// The lock is released once the goroutines have been started; waiting for
// the node calls happens after RUnlock.
c.RUnlock()
wg.Wait()
close(respChan)
// Results are appended in the order they are received from the channel.
ret := make([]queryNodeGetMetricsResponse, 0, cnt)
for res := range respChan {
ret = append(ret, res)
}
return ret
@ -620,17 +655,17 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
// getNodeInfoByID returns the result of getNodeInfo() for the node registered
// under nodeID in c.nodes. It returns an error when no node with that ID
// exists or when getNodeInfo fails.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// so two variants of the body are interleaved below; the variant that
// releases the read lock right after the c.nodes lookup is presumably the
// current code — confirm against the repository.
func (c *queryNodeCluster) getNodeInfoByID(nodeID int64) (Node, error) {
c.RLock()
defer c.RUnlock()
if node, ok := c.nodes[nodeID]; ok {
nodeInfo, err := node.getNodeInfo()
if err != nil {
return nil, err
}
return nodeInfo, nil
// In this variant the read lock covers only the map lookup; getNodeInfo is
// called after RUnlock.
node, ok := c.nodes[nodeID]
c.RUnlock()
if !ok {
return nil, fmt.Errorf("getNodeInfoByID: query node %d not exist", nodeID)
}
return nil, fmt.Errorf("getNodeInfoByID: query node %d not exist", nodeID)
nodeInfo, err := node.getNodeInfo()
if err != nil {
return nil, err
}
return nodeInfo, nil
}
func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {