From ff4237bb90b309ff3be6fb13f21e5c08e1c740da Mon Sep 17 00:00:00 2001 From: chyezh Date: Fri, 15 Mar 2024 10:45:06 +0800 Subject: [PATCH] enhance: add hostname into node info (#30673) issue: https://github.com/milvus-io/milvus/issues/30647 - Address may be reused in k8s environment. Using hostname can be better. Signed-off-by: chyezh --- internal/querycoordv2/balance/balance_test.go | 12 +- .../balance/rowcount_based_balancer_test.go | 41 +++- .../balance/score_based_balancer_test.go | 35 +++- .../checkers/balance_checker_test.go | 46 ++++- .../checkers/channel_checker_test.go | 12 +- .../querycoordv2/checkers/controller_test.go | 12 +- .../checkers/index_checker_test.go | 36 +++- .../checkers/leader_checker_test.go | 6 +- .../checkers/segment_checker_test.go | 24 ++- internal/querycoordv2/checkers/util_test.go | 6 +- internal/querycoordv2/job/job_test.go | 19 +- .../meta/resource_manager_test.go | 186 +++++++++++++++--- .../observers/replica_observer_test.go | 24 ++- .../observers/resource_observer_test.go | 60 +++++- internal/querycoordv2/server.go | 16 +- internal/querycoordv2/server_test.go | 6 +- internal/querycoordv2/services_test.go | 120 +++++++++-- internal/querycoordv2/session/cluster_test.go | 6 +- internal/querycoordv2/session/node_manager.go | 30 +-- internal/querycoordv2/task/task_test.go | 6 +- internal/querycoordv2/utils/meta_test.go | 8 +- 21 files changed, 580 insertions(+), 131 deletions(-) diff --git a/internal/querycoordv2/balance/balance_test.go b/internal/querycoordv2/balance/balance_test.go index 4a9e8a8415..d49eb87ac4 100644 --- a/internal/querycoordv2/balance/balance_test.go +++ b/internal/querycoordv2/balance/balance_test.go @@ -85,7 +85,11 @@ func (suite *BalanceTestSuite) TestAssignBalance() { suite.Run(c.name, func() { suite.SetupTest() for i := range c.nodeIDs { - nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodeIDs[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) nodeInfo.SetState(c.states[i]) suite.roundRobinBalancer.nodeManager.Add(nodeInfo) @@ -145,7 +149,11 @@ func (suite *BalanceTestSuite) TestAssignChannel() { suite.Run(c.name, func() { suite.SetupTest() for i := range c.nodeIDs { - nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodeIDs[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithChannelCnt(c.channelCnts[i])) nodeInfo.SetState(c.states[i]) suite.roundRobinBalancer.nodeManager.Add(nodeInfo) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index 9f15de00c4..9f4cb73bb4 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -126,7 +126,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestAssignSegment() { balancer.dist.SegmentDistManager.Update(node, s...) } for i := range c.nodes { - nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) nodeInfo.SetState(c.states[i]) suite.balancer.nodeManager.Add(nodeInfo) @@ -408,7 +412,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { balancer.dist.ChannelDistManager.Update(node, v...) } for i := range c.nodes { - nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) nodeInfo.SetState(c.states[i]) @@ -615,7 +623,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() { balancer.dist.ChannelDistManager.Update(node, v...) } for i := range c.nodes { - nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) nodeInfo.SetState(c.states[i]) @@ -752,7 +764,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { balancer.dist.ChannelDistManager.Update(node, v...) } for i := range c.nodes { - nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) nodeInfo.SetState(c.states[i]) @@ -849,7 +865,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { } for _, node := range lo.Keys(distributions) { - nodeInfo := session.NewNodeInfo(node, "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: node, + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(20)) nodeInfo.SetState(session.NodeStateNormal) suite.balancer.nodeManager.Add(nodeInfo) @@ -977,7 +997,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() { balancer.dist.ChannelDistManager.Update(node, v...) } for i := range c.nodes { - nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) nodeInfo.SetState(c.states[i]) @@ -1104,7 +1128,10 @@ func (suite *RowCountBasedBalancerTestSuite) TestMultiReplicaBalance() { // 3. set up nodes info and resourceManager for balancer for _, nodes := range c.replicaWithNodes { for i := range nodes { - nodeInfo := session.NewNodeInfo(nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: nodes[i], + Address: "127.0.0.1:0", + }) nodeInfo.UpdateStats(session.WithChannelCnt(len(c.channelDist[nodes[i]]))) nodeInfo.SetState(c.states[i]) suite.balancer.nodeManager.Add(nodeInfo) diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 4e9ccfee5f..48545e9755 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -222,7 +222,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() { balancer.dist.SegmentDistManager.Update(node, s...) } for i := range c.nodes { - nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) nodeInfo.SetState(c.states[i]) suite.balancer.nodeManager.Add(nodeInfo) @@ -253,7 +257,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { } for _, node := range lo.Keys(distributions) { - nodeInfo := session.NewNodeInfo(node, "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: node, + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithSegmentCnt(20)) nodeInfo.SetState(session.NodeStateNormal) suite.balancer.nodeManager.Add(nodeInfo) @@ -365,7 +373,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { // 3. set up nodes info and resourceManager for balancer for i := range c.nodes { - nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) nodeInfo.SetState(c.states[i]) suite.balancer.nodeManager.Add(nodeInfo) @@ -474,7 +486,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { // 3. set up nodes info and resourceManager for balancer for i := range balanceCase.nodes { - nodeInfo := session.NewNodeInfo(balanceCase.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: balanceCase.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.SetState(balanceCase.states[i]) suite.balancer.nodeManager.Add(nodeInfo) suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, balanceCase.nodes[i]) @@ -618,7 +634,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { // 3. set up nodes info and resourceManager for balancer for i := range c.nodes { - nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) nodeInfo.SetState(c.states[i]) suite.balancer.nodeManager.Add(nodeInfo) @@ -734,7 +754,10 @@ func (suite *ScoreBasedBalancerTestSuite) TestMultiReplicaBalance() { // 3. set up nodes info and resourceManager for balancer for _, nodes := range c.replicaWithNodes { for i := range nodes { - nodeInfo := session.NewNodeInfo(nodes[i], "127.0.0.1:0") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: nodes[i], + Address: "127.0.0.1:0", + }) nodeInfo.UpdateStats(session.WithChannelCnt(len(c.channelDist[nodes[i]]))) nodeInfo.SetState(c.states[i]) suite.balancer.nodeManager.Add(nodeInfo) diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index db485c1f39..249c20b894 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -88,8 +88,16 @@ func (suite *BalanceCheckerTestSuite) TearDownTest() { func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() { // set up nodes info nodeID1, nodeID2 := 1, 2 - suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID1), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID2), + Address: "localhost", + Hostname: "localhost", + })) suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1)) suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2)) @@ -157,8 +165,16 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() { func (suite *BalanceCheckerTestSuite) TestBusyScheduler() { // set up nodes info nodeID1, nodeID2 := 1, 2 - suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID1), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID2), + Address: "localhost", + Hostname: "localhost", + })) suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1)) suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2)) @@ -212,8 +228,16 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() { func (suite *BalanceCheckerTestSuite) TestStoppingBalance() { // set up nodes info, stopping node1 nodeID1, nodeID2 := 1, 2 - suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID1), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID2), + Address: "localhost", + Hostname: "localhost", + })) suite.nodeMgr.Stopping(int64(nodeID1)) suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1)) suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2)) @@ -276,8 +300,14 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() { func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { // set up nodes info, stopping node1 nodeID1, nodeID2 := 1, 2 - suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID1), + Address: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID2), + Address: "localhost", + })) suite.nodeMgr.Stopping(int64(nodeID1)) suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1)) suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2)) diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go index c067155db6..107059a29c 100644 --- a/internal/querycoordv2/checkers/channel_checker_test.go +++ b/internal/querycoordv2/checkers/channel_checker_test.go @@ -88,7 +88,11 @@ func (suite *ChannelCheckerTestSuite) TearDownTest() { func (suite *ChannelCheckerTestSuite) setNodeAvailable(nodes ...int64) { for _, node := range nodes { - nodeInfo := session.NewNodeInfo(node, "") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: node, + Address: "", + Hostname: "localhost", + }) nodeInfo.SetLastHeartbeat(time.Now()) suite.nodeMgr.Add(nodeInfo) } @@ -117,7 +121,11 @@ func (suite *ChannelCheckerTestSuite) TestLoadChannel() { checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1})) - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) channels := []*datapb.VchannelInfo{ diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index 10bea36a75..c3d41c1bd5 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -89,8 +89,16 @@ func (suite *CheckerControllerSuite) TestBasic() { suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) suite.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) diff --git a/internal/querycoordv2/checkers/index_checker_test.go b/internal/querycoordv2/checkers/index_checker_test.go index 19bf8f9a0d..8ad379c219 100644 --- a/internal/querycoordv2/checkers/index_checker_test.go +++ b/internal/querycoordv2/checkers/index_checker_test.go @@ -87,8 +87,16 @@ func (suite *IndexCheckerSuite) TestLoadIndex() { coll.FieldIndexID = map[int64]int64{101: 1000} checker.meta.CollectionManager.PutCollection(coll) checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2})) - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) @@ -133,8 +141,16 @@ func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() { coll.FieldIndexID = map[int64]int64{101: 1000} checker.meta.CollectionManager.PutCollection(coll) checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2})) - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) @@ -178,8 +194,16 @@ func (suite *IndexCheckerSuite) TestGetIndexInfoFailed() { coll.FieldIndexID = map[int64]int64{101: 1000} checker.meta.CollectionManager.PutCollection(coll) checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2})) - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index d8c5794492..cfdd27865b 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -195,7 +195,11 @@ func (suite *LeaderCheckerTestSuite) TestStoppingNode() { view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) observer.dist.LeaderViewManager.Update(2, view) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) suite.nodeMgr.Stopping(2) tasks := suite.checker.Check(context.TODO()) diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 21bd7e458a..28582d6312 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -109,8 +109,16 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() { checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) @@ -166,8 +174,16 @@ func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() { checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) diff --git a/internal/querycoordv2/checkers/util_test.go b/internal/querycoordv2/checkers/util_test.go index 245148920b..d68a432a52 100644 --- a/internal/querycoordv2/checkers/util_test.go +++ b/internal/querycoordv2/checkers/util_test.go @@ -39,7 +39,11 @@ func (suite *UtilTestSuite) SetupTest() { func (suite *UtilTestSuite) setNodeAvailable(nodes ...int64) { for _, node := range nodes { - nodeInfo := session.NewNodeInfo(node, "") + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: node, + Address: "", + Hostname: "localhost", + }) nodeInfo.SetLastHeartbeat(time.Now()) suite.nodeMgr.Add(nodeInfo) } diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index d7e8e1af88..cd81e38e50 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -170,10 +170,21 @@ func (suite *JobSuite) SetupTest() { suite.scheduler.Start() meta.GlobalFailedLoadCache = meta.NewFailedLoadCache() - suite.nodeMgr.Add(session.NewNodeInfo(1000, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(2000, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(3000, "localhost")) - + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1000, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2000, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3000, + Address: "localhost", + Hostname: "localhost", + })) err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 1000) suite.NoError(err) err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 2000) diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 00a8fde88e..0c8828237d 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -86,7 +86,11 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() { } func (suite *ResourceManagerSuite) TestManipulateNode() { - suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) err := suite.manager.AddResourceGroup("rg1") suite.NoError(err) // test add node to rg @@ -132,10 +136,26 @@ func (suite *ResourceManagerSuite) TestManipulateNode() { _, err = suite.manager.TransferNode("rg1", "rg2", 5) suite.ErrorIs(err, ErrNodeNotEnough) - suite.manager.nodeMgr.Add(session.NewNodeInfo(11, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(12, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(13, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(14, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 11, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 12, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 13, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 14, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) suite.manager.AssignNode("rg1", 11) suite.manager.AssignNode("rg1", 12) suite.manager.AssignNode("rg1", 13) @@ -153,11 +173,31 @@ func (suite *ResourceManagerSuite) TestManipulateNode() { } func (suite *ResourceManagerSuite) TestHandleNodeUp() { - suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(100, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(101, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 100, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 101, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) err := suite.manager.AddResourceGroup("rg1") suite.NoError(err) @@ -193,10 +233,26 @@ func (suite *ResourceManagerSuite) TestHandleNodeUp() { } func (suite *ResourceManagerSuite) TestRecover() { - suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(4, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 4, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) err := suite.manager.AddResourceGroup("rg1") suite.NoError(err) err = suite.manager.AddResourceGroup("rg2") @@ -236,9 +292,21 @@ func (suite *ResourceManagerSuite) TestRecover() { } func (suite *ResourceManagerSuite) TestCheckOutboundNodes() { - suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) err := suite.manager.AddResourceGroup("rg") suite.NoError(err) suite.manager.AssignNode("rg", 1) @@ -261,9 +329,21 @@ func (suite *ResourceManagerSuite) TestCheckOutboundNodes() { } func (suite *ResourceManagerSuite) TestCheckResourceGroup() { - suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) err := suite.manager.AddResourceGroup("rg") suite.NoError(err) suite.manager.AssignNode("rg", 1) @@ -285,9 +365,21 @@ func (suite *ResourceManagerSuite) TestCheckResourceGroup() { } func (suite *ResourceManagerSuite) TestGetOutboundNode() { - suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) suite.manager.AddResourceGroup("rg") suite.manager.AddResourceGroup("rg1") suite.manager.AssignNode("rg", 1) @@ -312,9 +404,21 @@ func (suite *ResourceManagerSuite) TestGetOutboundNode() { } func (suite *ResourceManagerSuite) TestAutoRecover() { - suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) - suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) err := suite.manager.AddResourceGroup("rg") suite.NoError(err) suite.manager.AssignNode(DefaultResourceGroupName, 1) @@ -337,7 +441,11 @@ func (suite *ResourceManagerSuite) TestAutoRecover() { nodes, _ = suite.manager.GetNodes(DefaultResourceGroupName) suite.Len(nodes, 0) - suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) suite.manager.HandleNodeUp(1) suite.manager.AutoRecoverResourceGroup("rg") nodes, _ = suite.manager.GetNodes("rg") @@ -348,7 +456,11 @@ func (suite *ResourceManagerSuite) TestAutoRecover() { func (suite *ResourceManagerSuite) TestDefaultResourceGroup() { for i := 0; i < 10; i++ { - suite.manager.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "127.0.0.1:0", + Hostname: "localhost", + })) } defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName) suite.NoError(err) @@ -387,9 +499,21 @@ func (suite *ResourceManagerSuite) TestStoreFailed() { nodeMgr := session.NewNodeManager() manager := NewResourceManager(store, nodeMgr) - nodeMgr.Add(session.NewNodeInfo(1, "localhost")) - nodeMgr.Add(session.NewNodeInfo(2, "localhost")) - nodeMgr.Add(session.NewNodeInfo(3, "localhost")) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "127.0.0.1:0", + Hostname: "localhost", + })) storeErr := errors.New("store error") store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(storeErr) store.EXPECT().RemoveResourceGroup(mock.Anything).Return(storeErr) diff --git a/internal/querycoordv2/observers/replica_observer_test.go b/internal/querycoordv2/observers/replica_observer_test.go index 1efcc0597b..102fe01cae 100644 --- a/internal/querycoordv2/observers/replica_observer_test.go +++ b/internal/querycoordv2/observers/replica_observer_test.go @@ -84,10 +84,26 @@ func (suite *ReplicaObserverSuite) SetupTest() { func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() { suite.meta.ResourceManager.AddResourceGroup("rg1") suite.meta.ResourceManager.AddResourceGroup("rg2") - suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost:8080")) - suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost:8080")) - suite.nodeMgr.Add(session.NewNodeInfo(3, "localhost:8080")) - suite.nodeMgr.Add(session.NewNodeInfo(4, "localhost:8080")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost:8080", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost:8080", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Address: "localhost:8080", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 4, + Address: "localhost:8080", + Hostname: "localhost", + })) suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) suite.meta.ResourceManager.TransferNode(meta.DefaultResourceGroupName, "rg1", 1) suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) diff --git a/internal/querycoordv2/observers/resource_observer_test.go b/internal/querycoordv2/observers/resource_observer_test.go index 7565c06e2a..101574b3c8 100644 --- a/internal/querycoordv2/observers/resource_observer_test.go +++ b/internal/querycoordv2/observers/resource_observer_test.go @@ -80,7 +80,11 @@ func (suite *ResourceObserverSuite) SetupTest() { suite.store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil) for i := 0; i < 10; i++ { - suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + })) suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(i)) } } @@ -110,10 +114,26 @@ func (suite *ResourceObserverSuite) TestCheckNodesInReplica() { typeutil.NewUniqueSet(), )) suite.meta.ResourceManager.AddResourceGroup("rg") - suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(100), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(101), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(102), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(103), + Address: "localhost", + Hostname: "localhost", + })) suite.meta.ResourceManager.AssignNode("rg", 100) suite.meta.ResourceManager.AssignNode("rg", 101) suite.meta.ResourceManager.AssignNode("rg", 102) @@ -139,7 +159,11 @@ func (suite *ResourceObserverSuite) TestCheckNodesInReplica() { func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() { suite.meta.ResourceManager.AddResourceGroup("rg") for i := 100; i < 200; i++ { - suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + })) suite.meta.ResourceManager.AssignNode("rg", int64(i)) suite.meta.ResourceManager.HandleNodeDown(int64(i)) } @@ -177,10 +201,26 @@ func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() { suite.store.EXPECT().SaveReplica(mock.Anything).Return(errors.New("store error")) suite.meta.ResourceManager.AddResourceGroup("rg") - suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(100), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(101), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(102), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(103), + Address: "localhost", + Hostname: "localhost", + })) suite.meta.ResourceManager.AssignNode("rg", 100) suite.meta.ResourceManager.AssignNode("rg", 101) suite.meta.ResourceManager.AssignNode("rg", 102) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index f843fb294e..a548c17dca 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -397,9 +397,12 @@ func (s *Server) startQueryCoord() error { return err } for _, node := range sessions { - n := session.NewNodeInfo(node.ServerID, node.Address) - n.SetVersion(node.Version) - s.nodeMgr.Add(n) + s.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: node.ServerID, + Address: node.Address, + Hostname: node.HostName, + Version: node.Version, + })) s.taskScheduler.AddExecutor(node.ServerID) if node.Stopping { @@ -613,7 +616,12 @@ func (s *Server) watchNodes(revision int64) { zap.Int64("nodeID", nodeID), zap.String("nodeAddr", addr), ) - s.nodeMgr.Add(session.NewNodeInfo(nodeID, addr)) + s.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: nodeID, + Address: addr, + Hostname: event.Session.HostName, + Version: event.Session.Version, + })) s.nodeUpEventChan <- nodeID select { case s.notifyNodeUp <- struct{}{}: diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 0685514992..ca4ce29c30 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -209,7 +209,11 @@ func (suite *ServerSuite) TestNodeUp() { }, 5*time.Second, time.Second) // mock unhealthy node - suite.server.nodeMgr.Add(session.NewNodeInfo(1001, "localhost")) + suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1001, + Address: "localhost", + Hostname: "localhost", + })) node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 101) node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Success()}, nil).Maybe() diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 67e7bb3c8d..35283f4f87 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -153,7 +153,11 @@ func (suite *ServiceSuite) SetupTest() { ) suite.targetObserver.Start() for _, node := range suite.nodes { - suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: node, + Address: "localhost", + Hostname: "localhost", + })) err := suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, node) suite.NoError(err) } @@ -377,10 +381,26 @@ func (suite *ServiceSuite) TestResourceGroup() { suite.Equal(commonpb.ErrorCode_Success, resp1.GetStatus().GetErrorCode()) suite.Len(resp1.ResourceGroups, 2) - server.nodeMgr.Add(session.NewNodeInfo(1011, "localhost")) - server.nodeMgr.Add(session.NewNodeInfo(1012, "localhost")) - server.nodeMgr.Add(session.NewNodeInfo(1013, "localhost")) - server.nodeMgr.Add(session.NewNodeInfo(1014, "localhost")) + server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1011, + Address: "localhost", + Hostname: "localhost", + })) + server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1012, + Address: "localhost", + Hostname: "localhost", + })) + server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1013, + Address: "localhost", + Hostname: "localhost", + })) + server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1014, + Address: "localhost", + Hostname: "localhost", + })) server.meta.ResourceManager.AddResourceGroup("rg11") server.meta.ResourceManager.AssignNode("rg11", 1011) server.meta.ResourceManager.AssignNode("rg11", 1012) @@ -549,10 +569,26 @@ func (suite *ServiceSuite) TestTransferNode() { suite.NoError(err) err = server.meta.ResourceManager.AddResourceGroup("rg4") suite.NoError(err) - suite.nodeMgr.Add(session.NewNodeInfo(11, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(12, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(13, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(14, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 11, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 12, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 13, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 14, + Address: "localhost", + Hostname: "localhost", + })) suite.meta.ResourceManager.AssignNode("rg3", 11) suite.meta.ResourceManager.AssignNode("rg3", 12) suite.meta.ResourceManager.AssignNode("rg3", 13) @@ -661,11 +697,31 @@ func (suite *ServiceSuite) TestTransferReplica() { ResourceGroup: meta.DefaultResourceGroupName, }, typeutil.NewUniqueSet(3))) - suite.server.nodeMgr.Add(session.NewNodeInfo(1001, "localhost")) - suite.server.nodeMgr.Add(session.NewNodeInfo(1002, "localhost")) - suite.server.nodeMgr.Add(session.NewNodeInfo(1003, "localhost")) - suite.server.nodeMgr.Add(session.NewNodeInfo(1004, "localhost")) - suite.server.nodeMgr.Add(session.NewNodeInfo(1005, "localhost")) + suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1001, + Address: "localhost", + Hostname: "localhost", + })) + suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1002, + Address: "localhost", + Hostname: "localhost", + })) + suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1003, + Address: "localhost", + Hostname: "localhost", + })) + suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1004, + Address: "localhost", + Hostname: "localhost", + })) + suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1005, + Address: "localhost", + Hostname: "localhost", + })) suite.server.meta.AssignNode("rg1", 1001) suite.server.meta.AssignNode("rg2", 1002) suite.server.meta.AssignNode("rg3", 1003) @@ -1144,8 +1200,16 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() { } } } - suite.nodeMgr.Add(session.NewNodeInfo(1001, "localhost")) - suite.nodeMgr.Add(session.NewNodeInfo(1002, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1001, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1002, + Address: "localhost", + Hostname: "localhost", + })) defer func() { for _, collection := range suite.collections { replicas := suite.meta.ReplicaManager.GetByCollection(collection) @@ -1280,7 +1344,11 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() { suite.NoError(err) suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) - suite.nodeMgr.Add(session.NewNodeInfo(10, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 10, + Address: "localhost", + Hostname: "localhost", + })) suite.nodeMgr.Stopping(10) resp, err = server.LoadBalance(ctx, req) suite.NoError(err) @@ -1518,7 +1586,11 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() { suite.NoError(err) suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode()) for _, node := range suite.nodes { - suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: node, + Address: "localhost", + Hostname: "localhost", + })) } // Segment not fully loaded @@ -1573,7 +1645,11 @@ func (suite *ServiceSuite) TestHandleNodeUp() { suite.taskScheduler.EXPECT().AddExecutor(mock.Anything) suite.distController.EXPECT().StartDistInstance(mock.Anything, mock.Anything) - suite.nodeMgr.Add(session.NewNodeInfo(111, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 111, + Address: "localhost", + Hostname: "localhost", + })) server.handleNodeUp(111) nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes() suite.Len(nodes, 1) @@ -1582,7 +1658,11 @@ func (suite *ServiceSuite) TestHandleNodeUp() { // when more rg exist, new node shouldn't be assign to replica in default rg in handleNodeUp suite.server.meta.ResourceManager.AddResourceGroup("rg") - suite.nodeMgr.Add(session.NewNodeInfo(222, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 222, + Address: "localhost", + Hostname: "localhost", + })) server.handleNodeUp(222) nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes() suite.Len(nodes, 2) diff --git a/internal/querycoordv2/session/cluster_test.go b/internal/querycoordv2/session/cluster_test.go index 4720a2db58..b10d1af7c3 100644 --- a/internal/querycoordv2/session/cluster_test.go +++ b/internal/querycoordv2/session/cluster_test.go @@ -94,7 +94,11 @@ func (suite *ClusterTestSuite) setupServers() { func (suite *ClusterTestSuite) setupCluster() { suite.nodeManager = NewNodeManager() for i, lis := range suite.listeners { - node := NewNodeInfo(int64(i), lis.Addr().String()) + node := NewNodeInfo(ImmutableNodeInfo{ + NodeID: int64(i), + Address: lis.Addr().String(), + Hostname: "localhost", + }) suite.nodeManager.Add(node) } suite.cluster = NewCluster(suite.nodeManager, DefaultQueryNodeCreator) diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go index 3cd8b43552..744ff9ed08 100644 --- a/internal/querycoordv2/session/node_manager.go +++ b/internal/querycoordv2/session/node_manager.go @@ -102,22 +102,31 @@ const ( NodeStateStopping ) +type ImmutableNodeInfo struct { + NodeID int64 + Address string + Hostname string + Version semver.Version +} + type NodeInfo struct { stats mu sync.RWMutex - id int64 - addr string + immutableInfo ImmutableNodeInfo state State lastHeartbeat *atomic.Int64 - version semver.Version } func (n *NodeInfo) ID() int64 { - return n.id + return n.immutableInfo.NodeID } func (n *NodeInfo) Addr() string { - return n.addr + return n.immutableInfo.Address +} + +func (n *NodeInfo) Hostname() string { + return n.immutableInfo.Hostname } func (n *NodeInfo) SegmentCnt() int { @@ -160,19 +169,14 @@ func (n *NodeInfo) UpdateStats(opts ...StatsOption) { n.mu.Unlock() } -func (n *NodeInfo) SetVersion(v semver.Version) { - n.version = v -} - func (n *NodeInfo) Version() semver.Version { - return n.version + return n.immutableInfo.Version } -func NewNodeInfo(id int64, addr string) *NodeInfo { +func NewNodeInfo(info ImmutableNodeInfo) *NodeInfo { return &NodeInfo{ stats: newStats(), - id: id, - addr: addr, + immutableInfo: info, lastHeartbeat: atomic.NewInt64(0), } } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index dc72855e2f..2c3a9c56e4 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -156,7 +156,11 @@ func (suite *TaskSuite) SetupTest() { func (suite *TaskSuite) BeforeTest(suiteName, testName string) { for node := range suite.distributions { - suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: node, + Address: "localhost", + Hostname: "localhost", + })) } switch testName { diff --git a/internal/querycoordv2/utils/meta_test.go b/internal/querycoordv2/utils/meta_test.go index 23b6eced87..ffc095acc8 100644 --- a/internal/querycoordv2/utils/meta_test.go +++ b/internal/querycoordv2/utils/meta_test.go @@ -54,10 +54,12 @@ func TestSpawnReplicasWithRG(t *testing.T) { m.ResourceManager.AddResourceGroup("rg1") m.ResourceManager.AddResourceGroup("rg2") m.ResourceManager.AddResourceGroup("rg3") - for i := 1; i < 10; i++ { - nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost")) - + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + })) if i%3 == 0 { m.ResourceManager.AssignNode("rg1", int64(i)) }