mirror of https://github.com/milvus-io/milvus.git
fix: get replica should not report error when no querynode serve (#32536)
issue: #30647

- Remove the error report when no query node serves the replica; it is hard for programmers to use that error for resource management.
- Change the resource group `transferNode` logic to stay compatible with the old-version SDK.

---------

Signed-off-by: chyezh <chyezh@outlook.com>

pull/32623/head
parent 0ff7a46e95
commit f06509bf97
@@ -18,7 +18,6 @@ package querycoordv2
import (
    "context"
-   "fmt"
    "sync"
    "time"
@@ -35,7 +34,6 @@ import (
    "github.com/milvus-io/milvus/internal/querycoordv2/utils"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/hardware"
-   "github.com/milvus-io/milvus/pkg/util/merr"
    "github.com/milvus-io/milvus/pkg/util/metricsinfo"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -373,7 +371,7 @@ func (s *Server) tryGetNodesMetrics(ctx context.Context, req *milvuspb.GetMetric
    return ret
}

-func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*milvuspb.ReplicaInfo, error) {
+func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) *milvuspb.ReplicaInfo {
    info := &milvuspb.ReplicaInfo{
        ReplicaID:    replica.GetID(),
        CollectionID: replica.GetCollectionID(),
@@ -384,10 +382,11 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
    channels := s.targetMgr.GetDmChannelsByCollection(replica.GetCollectionID(), meta.CurrentTarget)
    if len(channels) == 0 {
-       msg := "failed to get channels, collection not loaded"
-       log.Warn(msg)
-       return nil, merr.WrapErrCollectionNotFound(replica.GetCollectionID(), msg)
+       log.Warn("failed to get channels, collection may be not loaded or in recovering", zap.Int64("collectionID", replica.GetCollectionID()))
+       return info
    }
+   shardReplicas := make([]*milvuspb.ShardReplica, 0, len(channels))

    var segments []*meta.Segment
    if withShardNodes {
        segments = s.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()))
@@ -400,9 +399,11 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
            leaderInfo = s.nodeMgr.Get(leader)
        }
        if leaderInfo == nil {
-           msg := fmt.Sprintf("failed to get shard leader for shard %s", channel)
-           log.Warn(msg)
-           return nil, merr.WrapErrNodeNotFound(leader, msg)
+           log.Warn("failed to get shard leader for shard",
+               zap.Int64("collectionID", replica.GetCollectionID()),
+               zap.Int64("replica", replica.GetID()),
+               zap.String("shard", channel.GetChannelName()))
+           return info
        }

        shard := &milvuspb.ShardReplica{
@@ -420,9 +421,10 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
            })
            shard.NodeIds = typeutil.NewUniqueSet(shardNodes...).Collect()
        }
-       info.ShardReplicas = append(info.ShardReplicas, shard)
+       shardReplicas = append(shardReplicas, shard)
    }
-   return info, nil
+   info.ShardReplicas = shardReplicas
+   return info
}

func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView {
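For context, a minimal standalone sketch of the pattern the new fillReplicaInfo follows; the types and names below are hypothetical stand-ins, not the actual Milvus structs. When channels or shard leaders cannot be resolved, the function logs a warning and returns the partially filled info instead of propagating an error.

package main

import (
    "fmt"
    "log"
)

// replicaInfo is a hypothetical stand-in for milvuspb.ReplicaInfo.
type replicaInfo struct {
    ReplicaID     int64
    ShardReplicas []string
}

// fillReplicaInfo is best-effort: it never returns an error.
// Missing channels or shard leaders produce a warning and a partial result.
func fillReplicaInfo(id int64, channels []string, leaders map[string]int64) *replicaInfo {
    info := &replicaInfo{ReplicaID: id}
    if len(channels) == 0 {
        log.Printf("no channels for replica %d, collection may not be loaded or is recovering", id)
        return info
    }
    shards := make([]string, 0, len(channels))
    for _, ch := range channels {
        if _, ok := leaders[ch]; !ok {
            log.Printf("no shard leader for %s, returning partial info for replica %d", ch, id)
            return info
        }
        shards = append(shards, ch)
    }
    info.ShardReplicas = shards
    return info
}

func main() {
    // The second channel has no leader, so the call logs a warning and
    // returns info without shard entries instead of failing.
    info := fillReplicaInfo(1, []string{"dml-ch0", "dml-ch1"}, map[string]int64{"dml-ch0": 7})
    fmt.Printf("replica %d: %d shard entries\n", info.ReplicaID, len(info.ShardReplicas))
}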
@@ -258,15 +258,18 @@ func (rm *ResourceManager) TransferNode(sourceRGName string, targetRGName string
    if sourceCfg.Requests.NodeNum < 0 {
        sourceCfg.Requests.NodeNum = 0
    }
    // Special case for compatibility with old version.
    if sourceRGName != DefaultResourceGroupName {
        sourceCfg.Limits.NodeNum -= int32(nodeNum)
        if sourceCfg.Limits.NodeNum < 0 {
            sourceCfg.Limits.NodeNum = 0
        }
    }

    targetCfg.Requests.NodeNum += int32(nodeNum)
    if targetCfg.Requests.NodeNum > targetCfg.Limits.NodeNum {
        targetCfg.Limits.NodeNum = targetCfg.Requests.NodeNum
    }
    // transfer node from source resource group to target resource group at high priority.
    targetCfg.TransferFrom = append(targetCfg.TransferFrom, &rgpb.ResourceGroupTransfer{
        ResourceGroup: sourceRGName,
    })

    return rm.updateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
        sourceRGName: sourceCfg,
        targetRGName: targetCfg,
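To make the compatibility arithmetic above concrete, here is a hedged sketch using hypothetical config structs rather than the rgpb types: transferring N nodes clamps the source group's Requests at zero, lowers its Limits only when the source is not the default resource group, raises the target's Requests (and its Limits if they would be exceeded), and records the source as a preferred TransferFrom origin.

package main

import "fmt"

// rgConfig is a hypothetical stand-in for rgpb.ResourceGroupConfig.
type rgConfig struct {
    Requests, Limits int32
    TransferFrom     []string
}

const defaultRG = "__default_resource_group"

// transferNode mirrors the adjustment pattern shown in the hunk above.
func transferNode(sourceName string, source, target *rgConfig, nodeNum int32) {
    source.Requests -= nodeNum
    if source.Requests < 0 {
        source.Requests = 0
    }
    // Compatibility special case from the hunk: the default resource group's limit is left untouched.
    if sourceName != defaultRG {
        source.Limits -= nodeNum
        if source.Limits < 0 {
            source.Limits = 0
        }
    }
    target.Requests += nodeNum
    if target.Requests > target.Limits {
        target.Limits = target.Requests
    }
    // Record the source group as a high-priority origin for future node transfers.
    target.TransferFrom = append(target.TransferFrom, sourceName)
}

func main() {
    src := &rgConfig{Requests: 5, Limits: 5}
    dst := &rgConfig{Requests: 2, Limits: 2}
    transferNode("rg1", src, dst, 3)
    fmt.Printf("rg1: requests=%d limits=%d; target: requests=%d limits=%d from=%v\n",
        src.Requests, src.Limits, dst.Requests, dst.Limits, dst.TransferFrom)
    // Prints: rg1: requests=2 limits=2; target: requests=5 limits=5 from=[rg1]
}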
@@ -537,6 +537,23 @@ func (suite *ResourceManagerSuite) TestAutoRecover() {
}

+func (suite *ResourceManagerSuite) testTransferNode() {
+   // Test redundant nodes recover to default resource group.
+   suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
+       DefaultResourceGroupName: newResourceGroupConfig(40, 40),
+       "rg3":                    newResourceGroupConfig(0, 0),
+       "rg2":                    newResourceGroupConfig(40, 40),
+       "rg1":                    newResourceGroupConfig(20, 20),
+   })
+   suite.manager.AutoRecoverResourceGroup("rg1")
+   suite.manager.AutoRecoverResourceGroup("rg2")
+   suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
+   suite.manager.AutoRecoverResourceGroup("rg3")

+   suite.Equal(20, suite.manager.GetResourceGroup("rg1").NodeNum())
+   suite.Equal(40, suite.manager.GetResourceGroup("rg2").NodeNum())
+   suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum())
+   suite.Equal(40, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())

    // Test TransferNode.
    // param error.
    err := suite.manager.TransferNode("rg1", "rg1", 1)
@@ -842,33 +842,13 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque
    replicas := s.meta.ReplicaManager.GetByCollection(req.GetCollectionID())
    if len(replicas) == 0 {
-       err := merr.WrapErrReplicaNotFound(req.GetCollectionID(), "failed to get replicas by collection")
-       msg := "failed to get replicas, collection not loaded"
-       log.Warn(msg)
-       resp.Status = merr.Status(err)
-       return resp, nil
+       return &milvuspb.GetReplicasResponse{
+           Replicas: make([]*milvuspb.ReplicaInfo, 0),
+       }, nil
    }

    for _, replica := range replicas {
-       msg := "failed to get replica info"
-       if len(replica.GetNodes()) == 0 {
-           err := merr.WrapErrReplicaNotAvailable(replica.GetID(), "no available nodes in replica")
-           log.Warn(msg,
-               zap.Int64("replica", replica.GetID()),
-               zap.Error(err))
-           resp.Status = merr.Status(err)
-           break
-       }
-
-       info, err := s.fillReplicaInfo(replica, req.GetWithShardNodes())
-       if err != nil {
-           log.Warn(msg,
-               zap.Int64("replica", replica.GetID()),
-               zap.Error(err))
-           resp.Status = merr.Status(err)
-           break
-       }
-       resp.Replicas = append(resp.Replicas, info)
+       resp.Replicas = append(resp.Replicas, s.fillReplicaInfo(replica, req.GetWithShardNodes()))
    }
    return resp, nil
}
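A hedged illustration of the handler contract after this change, using hypothetical types rather than the generated milvuspb structs: when a collection has no replicas, GetReplicas answers with an empty replica list and a success status instead of a ReplicaNotFound error, so callers can treat "nothing loaded" and "loaded with N replicas" uniformly.

package main

import "fmt"

// getReplicasResponse is a hypothetical stand-in for milvuspb.GetReplicasResponse.
type getReplicasResponse struct {
    ErrorCode int32 // 0 means success
    Replicas  []string
}

// getReplicas mimics the new behavior: an empty replica set is not an error.
func getReplicas(replicasByCollection map[int64][]string, collectionID int64) *getReplicasResponse {
    replicas := replicasByCollection[collectionID]
    if len(replicas) == 0 {
        return &getReplicasResponse{Replicas: make([]string, 0)}
    }
    return &getReplicasResponse{Replicas: replicas}
}

func main() {
    resp := getReplicas(map[int64][]string{}, 100)
    // Caller-side resource management becomes a plain length check instead of error handling.
    fmt.Printf("ok=%v, replica count=%d\n", resp.ErrorCode == 0, len(resp.Replicas))
}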
@@ -1569,7 +1569,7 @@ func (suite *ServiceSuite) TestGetReplicas() {
    suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady))
}

-func (suite *ServiceSuite) TestGetReplicasFailed() {
+func (suite *ServiceSuite) TestGetReplicasWhenNoAvailableNodes() {
    suite.loadAll()
    ctx := context.Background()
    server := suite.server
@@ -1588,7 +1588,7 @@ func (suite *ServiceSuite) TestGetReplicasFailed() {
    }
    resp, err := server.GetReplicas(ctx, req)
    suite.NoError(err)
-   suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrReplicaNotAvailable)
+   suite.True(merr.Ok(resp.GetStatus()))
}

func (suite *ServiceSuite) TestCheckHealth() {