enhance: add hostname into node info (#30673)

issue: https://github.com/milvus-io/milvus/issues/30647

- Address may be reused in k8s environment. Using hostname can be
better.

Signed-off-by: chyezh <chyezh@outlook.com>
pull/31290/head
chyezh 2024-03-15 10:45:06 +08:00 committed by GitHub
parent db79be3ae0
commit ff4237bb90
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 580 additions and 131 deletions

View File

@ -85,7 +85,11 @@ func (suite *BalanceTestSuite) TestAssignBalance() {
suite.Run(c.name, func() {
suite.SetupTest()
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodeIDs[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
@ -145,7 +149,11 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
suite.Run(c.name, func() {
suite.SetupTest()
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodeIDs[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithChannelCnt(c.channelCnts[i]))
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)

View File

@ -126,7 +126,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestAssignSegment() {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
@ -408,7 +412,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
balancer.dist.ChannelDistManager.Update(node, v...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
@ -615,7 +623,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
balancer.dist.ChannelDistManager.Update(node, v...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
@ -752,7 +764,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
balancer.dist.ChannelDistManager.Update(node, v...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
@ -849,7 +865,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {
}
for _, node := range lo.Keys(distributions) {
nodeInfo := session.NewNodeInfo(node, "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: node,
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(20))
nodeInfo.SetState(session.NodeStateNormal)
suite.balancer.nodeManager.Add(nodeInfo)
@ -977,7 +997,11 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() {
balancer.dist.ChannelDistManager.Update(node, v...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
@ -1104,7 +1128,10 @@ func (suite *RowCountBasedBalancerTestSuite) TestMultiReplicaBalance() {
// 3. set up nodes info and resourceManager for balancer
for _, nodes := range c.replicaWithNodes {
for i := range nodes {
nodeInfo := session.NewNodeInfo(nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: nodes[i],
Address: "127.0.0.1:0",
})
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.channelDist[nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)

View File

@ -222,7 +222,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
@ -253,7 +257,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {
}
for _, node := range lo.Keys(distributions) {
nodeInfo := session.NewNodeInfo(node, "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: node,
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(20))
nodeInfo.SetState(session.NodeStateNormal)
suite.balancer.nodeManager.Add(nodeInfo)
@ -365,7 +373,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
// 3. set up nodes info and resourceManager for balancer
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
@ -474,7 +486,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
// 3. set up nodes info and resourceManager for balancer
for i := range balanceCase.nodes {
nodeInfo := session.NewNodeInfo(balanceCase.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: balanceCase.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.SetState(balanceCase.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, balanceCase.nodes[i])
@ -618,7 +634,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
// 3. set up nodes info and resourceManager for balancer
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
@ -734,7 +754,10 @@ func (suite *ScoreBasedBalancerTestSuite) TestMultiReplicaBalance() {
// 3. set up nodes info and resourceManager for balancer
for _, nodes := range c.replicaWithNodes {
for i := range nodes {
nodeInfo := session.NewNodeInfo(nodes[i], "127.0.0.1:0")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: nodes[i],
Address: "127.0.0.1:0",
})
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.channelDist[nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)

View File

@ -88,8 +88,16 @@ func (suite *BalanceCheckerTestSuite) TearDownTest() {
func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
// set up nodes info
nodeID1, nodeID2 := 1, 2
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID1),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID2),
Address: "localhost",
Hostname: "localhost",
}))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
@ -157,8 +165,16 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
// set up nodes info
nodeID1, nodeID2 := 1, 2
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID1),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID2),
Address: "localhost",
Hostname: "localhost",
}))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
@ -212,8 +228,16 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
// set up nodes info, stopping node1
nodeID1, nodeID2 := 1, 2
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID1),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID2),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Stopping(int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))
@ -276,8 +300,14 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
// set up nodes info, stopping node1
nodeID1, nodeID2 := 1, 2
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID1),
Address: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID2),
Address: "localhost",
}))
suite.nodeMgr.Stopping(int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1))
suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2))

View File

@ -88,7 +88,11 @@ func (suite *ChannelCheckerTestSuite) TearDownTest() {
func (suite *ChannelCheckerTestSuite) setNodeAvailable(nodes ...int64) {
for _, node := range nodes {
nodeInfo := session.NewNodeInfo(node, "")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: node,
Address: "",
Hostname: "localhost",
})
nodeInfo.SetLastHeartbeat(time.Now())
suite.nodeMgr.Add(nodeInfo)
}
@ -117,7 +121,11 @@ func (suite *ChannelCheckerTestSuite) TestLoadChannel() {
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1}))
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
channels := []*datapb.VchannelInfo{

View File

@ -89,8 +89,16 @@ func (suite *CheckerControllerSuite) TestBasic() {
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
suite.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)

View File

@ -87,8 +87,16 @@ func (suite *IndexCheckerSuite) TestLoadIndex() {
coll.FieldIndexID = map[int64]int64{101: 1000}
checker.meta.CollectionManager.PutCollection(coll)
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
@ -133,8 +141,16 @@ func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() {
coll.FieldIndexID = map[int64]int64{101: 1000}
checker.meta.CollectionManager.PutCollection(coll)
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
@ -178,8 +194,16 @@ func (suite *IndexCheckerSuite) TestGetIndexInfoFailed() {
coll.FieldIndexID = map[int64]int64{101: 1000}
checker.meta.CollectionManager.PutCollection(coll)
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)

View File

@ -195,7 +195,11 @@ func (suite *LeaderCheckerTestSuite) TestStoppingNode() {
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Stopping(2)
tasks := suite.checker.Check(context.TODO())

View File

@ -109,8 +109,16 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
@ -166,8 +174,16 @@ func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)

View File

@ -39,7 +39,11 @@ func (suite *UtilTestSuite) SetupTest() {
func (suite *UtilTestSuite) setNodeAvailable(nodes ...int64) {
for _, node := range nodes {
nodeInfo := session.NewNodeInfo(node, "")
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: node,
Address: "",
Hostname: "localhost",
})
nodeInfo.SetLastHeartbeat(time.Now())
suite.nodeMgr.Add(nodeInfo)
}

View File

@ -170,10 +170,21 @@ func (suite *JobSuite) SetupTest() {
suite.scheduler.Start()
meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()
suite.nodeMgr.Add(session.NewNodeInfo(1000, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2000, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(3000, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1000,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2000,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3000,
Address: "localhost",
Hostname: "localhost",
}))
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 1000)
suite.NoError(err)
err = suite.meta.AssignNode(meta.DefaultResourceGroupName, 2000)

View File

@ -86,7 +86,11 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() {
}
func (suite *ResourceManagerSuite) TestManipulateNode() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg1")
suite.NoError(err)
// test add node to rg
@ -132,10 +136,26 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
_, err = suite.manager.TransferNode("rg1", "rg2", 5)
suite.ErrorIs(err, ErrNodeNotEnough)
suite.manager.nodeMgr.Add(session.NewNodeInfo(11, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(12, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(13, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(14, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 11,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 12,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 13,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 14,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.AssignNode("rg1", 11)
suite.manager.AssignNode("rg1", 12)
suite.manager.AssignNode("rg1", 13)
@ -153,11 +173,31 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
}
func (suite *ResourceManagerSuite) TestHandleNodeUp() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(100, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(101, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 100,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 101,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg1")
suite.NoError(err)
@ -193,10 +233,26 @@ func (suite *ResourceManagerSuite) TestHandleNodeUp() {
}
func (suite *ResourceManagerSuite) TestRecover() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(4, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg1")
suite.NoError(err)
err = suite.manager.AddResourceGroup("rg2")
@ -236,9 +292,21 @@ func (suite *ResourceManagerSuite) TestRecover() {
}
func (suite *ResourceManagerSuite) TestCheckOutboundNodes() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg")
suite.NoError(err)
suite.manager.AssignNode("rg", 1)
@ -261,9 +329,21 @@ func (suite *ResourceManagerSuite) TestCheckOutboundNodes() {
}
func (suite *ResourceManagerSuite) TestCheckResourceGroup() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg")
suite.NoError(err)
suite.manager.AssignNode("rg", 1)
@ -285,9 +365,21 @@ func (suite *ResourceManagerSuite) TestCheckResourceGroup() {
}
func (suite *ResourceManagerSuite) TestGetOutboundNode() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.AddResourceGroup("rg")
suite.manager.AddResourceGroup("rg1")
suite.manager.AssignNode("rg", 1)
@ -312,9 +404,21 @@ func (suite *ResourceManagerSuite) TestGetOutboundNode() {
}
func (suite *ResourceManagerSuite) TestAutoRecover() {
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
err := suite.manager.AddResourceGroup("rg")
suite.NoError(err)
suite.manager.AssignNode(DefaultResourceGroupName, 1)
@ -337,7 +441,11 @@ func (suite *ResourceManagerSuite) TestAutoRecover() {
nodes, _ = suite.manager.GetNodes(DefaultResourceGroupName)
suite.Len(nodes, 0)
suite.manager.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
suite.manager.HandleNodeUp(1)
suite.manager.AutoRecoverResourceGroup("rg")
nodes, _ = suite.manager.GetNodes("rg")
@ -348,7 +456,11 @@ func (suite *ResourceManagerSuite) TestAutoRecover() {
func (suite *ResourceManagerSuite) TestDefaultResourceGroup() {
for i := 0; i < 10; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
}
defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
@ -387,9 +499,21 @@ func (suite *ResourceManagerSuite) TestStoreFailed() {
nodeMgr := session.NewNodeManager()
manager := NewResourceManager(store, nodeMgr)
nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
nodeMgr.Add(session.NewNodeInfo(3, "localhost"))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "127.0.0.1:0",
Hostname: "localhost",
}))
storeErr := errors.New("store error")
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(storeErr)
store.EXPECT().RemoveResourceGroup(mock.Anything).Return(storeErr)

View File

@ -84,10 +84,26 @@ func (suite *ReplicaObserverSuite) SetupTest() {
func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {
suite.meta.ResourceManager.AddResourceGroup("rg1")
suite.meta.ResourceManager.AddResourceGroup("rg2")
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost:8080"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost:8080"))
suite.nodeMgr.Add(session.NewNodeInfo(3, "localhost:8080"))
suite.nodeMgr.Add(session.NewNodeInfo(4, "localhost:8080"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost:8080",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost:8080",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "localhost:8080",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
Address: "localhost:8080",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
suite.meta.ResourceManager.TransferNode(meta.DefaultResourceGroupName, "rg1", 1)
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)

View File

@ -80,7 +80,11 @@ func (suite *ResourceObserverSuite) SetupTest() {
suite.store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
for i := 0; i < 10; i++ {
suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(i))
}
}
@ -110,10 +114,26 @@ func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
typeutil.NewUniqueSet(),
))
suite.meta.ResourceManager.AddResourceGroup("rg")
suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(100),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(101),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(102),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(103),
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode("rg", 100)
suite.meta.ResourceManager.AssignNode("rg", 101)
suite.meta.ResourceManager.AssignNode("rg", 102)
@ -139,7 +159,11 @@ func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() {
suite.meta.ResourceManager.AddResourceGroup("rg")
for i := 100; i < 200; i++ {
suite.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode("rg", int64(i))
suite.meta.ResourceManager.HandleNodeDown(int64(i))
}
@ -177,10 +201,26 @@ func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
suite.store.EXPECT().SaveReplica(mock.Anything).Return(errors.New("store error"))
suite.meta.ResourceManager.AddResourceGroup("rg")
suite.nodeMgr.Add(session.NewNodeInfo(int64(100), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(101), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(102), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(int64(103), "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(100),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(101),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(102),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(103),
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode("rg", 100)
suite.meta.ResourceManager.AssignNode("rg", 101)
suite.meta.ResourceManager.AssignNode("rg", 102)

View File

@ -397,9 +397,12 @@ func (s *Server) startQueryCoord() error {
return err
}
for _, node := range sessions {
n := session.NewNodeInfo(node.ServerID, node.Address)
n.SetVersion(node.Version)
s.nodeMgr.Add(n)
s.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: node.ServerID,
Address: node.Address,
Hostname: node.HostName,
Version: node.Version,
}))
s.taskScheduler.AddExecutor(node.ServerID)
if node.Stopping {
@ -613,7 +616,12 @@ func (s *Server) watchNodes(revision int64) {
zap.Int64("nodeID", nodeID),
zap.String("nodeAddr", addr),
)
s.nodeMgr.Add(session.NewNodeInfo(nodeID, addr))
s.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: nodeID,
Address: addr,
Hostname: event.Session.HostName,
Version: event.Session.Version,
}))
s.nodeUpEventChan <- nodeID
select {
case s.notifyNodeUp <- struct{}{}:

View File

@ -209,7 +209,11 @@ func (suite *ServerSuite) TestNodeUp() {
}, 5*time.Second, time.Second)
// mock unhealthy node
suite.server.nodeMgr.Add(session.NewNodeInfo(1001, "localhost"))
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1001,
Address: "localhost",
Hostname: "localhost",
}))
node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 101)
node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Success()}, nil).Maybe()

View File

@ -153,7 +153,11 @@ func (suite *ServiceSuite) SetupTest() {
)
suite.targetObserver.Start()
for _, node := range suite.nodes {
suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: node,
Address: "localhost",
Hostname: "localhost",
}))
err := suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, node)
suite.NoError(err)
}
@ -377,10 +381,26 @@ func (suite *ServiceSuite) TestResourceGroup() {
suite.Equal(commonpb.ErrorCode_Success, resp1.GetStatus().GetErrorCode())
suite.Len(resp1.ResourceGroups, 2)
server.nodeMgr.Add(session.NewNodeInfo(1011, "localhost"))
server.nodeMgr.Add(session.NewNodeInfo(1012, "localhost"))
server.nodeMgr.Add(session.NewNodeInfo(1013, "localhost"))
server.nodeMgr.Add(session.NewNodeInfo(1014, "localhost"))
server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1011,
Address: "localhost",
Hostname: "localhost",
}))
server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1012,
Address: "localhost",
Hostname: "localhost",
}))
server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1013,
Address: "localhost",
Hostname: "localhost",
}))
server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1014,
Address: "localhost",
Hostname: "localhost",
}))
server.meta.ResourceManager.AddResourceGroup("rg11")
server.meta.ResourceManager.AssignNode("rg11", 1011)
server.meta.ResourceManager.AssignNode("rg11", 1012)
@ -549,10 +569,26 @@ func (suite *ServiceSuite) TestTransferNode() {
suite.NoError(err)
err = server.meta.ResourceManager.AddResourceGroup("rg4")
suite.NoError(err)
suite.nodeMgr.Add(session.NewNodeInfo(11, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(12, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(13, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(14, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 11,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 12,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 13,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 14,
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.AssignNode("rg3", 11)
suite.meta.ResourceManager.AssignNode("rg3", 12)
suite.meta.ResourceManager.AssignNode("rg3", 13)
@ -661,11 +697,31 @@ func (suite *ServiceSuite) TestTransferReplica() {
ResourceGroup: meta.DefaultResourceGroupName,
}, typeutil.NewUniqueSet(3)))
suite.server.nodeMgr.Add(session.NewNodeInfo(1001, "localhost"))
suite.server.nodeMgr.Add(session.NewNodeInfo(1002, "localhost"))
suite.server.nodeMgr.Add(session.NewNodeInfo(1003, "localhost"))
suite.server.nodeMgr.Add(session.NewNodeInfo(1004, "localhost"))
suite.server.nodeMgr.Add(session.NewNodeInfo(1005, "localhost"))
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1001,
Address: "localhost",
Hostname: "localhost",
}))
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1002,
Address: "localhost",
Hostname: "localhost",
}))
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1003,
Address: "localhost",
Hostname: "localhost",
}))
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1004,
Address: "localhost",
Hostname: "localhost",
}))
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1005,
Address: "localhost",
Hostname: "localhost",
}))
suite.server.meta.AssignNode("rg1", 1001)
suite.server.meta.AssignNode("rg2", 1002)
suite.server.meta.AssignNode("rg3", 1003)
@ -1144,8 +1200,16 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() {
}
}
}
suite.nodeMgr.Add(session.NewNodeInfo(1001, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(1002, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1001,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1002,
Address: "localhost",
Hostname: "localhost",
}))
defer func() {
for _, collection := range suite.collections {
replicas := suite.meta.ReplicaManager.GetByCollection(collection)
@ -1280,7 +1344,11 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.nodeMgr.Add(session.NewNodeInfo(10, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 10,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Stopping(10)
resp, err = server.LoadBalance(ctx, req)
suite.NoError(err)
@ -1518,7 +1586,11 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode())
for _, node := range suite.nodes {
suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: node,
Address: "localhost",
Hostname: "localhost",
}))
}
// Segment not fully loaded
@ -1573,7 +1645,11 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
suite.taskScheduler.EXPECT().AddExecutor(mock.Anything)
suite.distController.EXPECT().StartDistInstance(mock.Anything, mock.Anything)
suite.nodeMgr.Add(session.NewNodeInfo(111, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 111,
Address: "localhost",
Hostname: "localhost",
}))
server.handleNodeUp(111)
nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes()
suite.Len(nodes, 1)
@ -1582,7 +1658,11 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
// when more rg exist, new node shouldn't be assign to replica in default rg in handleNodeUp
suite.server.meta.ResourceManager.AddResourceGroup("rg")
suite.nodeMgr.Add(session.NewNodeInfo(222, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 222,
Address: "localhost",
Hostname: "localhost",
}))
server.handleNodeUp(222)
nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes()
suite.Len(nodes, 2)

View File

@ -94,7 +94,11 @@ func (suite *ClusterTestSuite) setupServers() {
func (suite *ClusterTestSuite) setupCluster() {
suite.nodeManager = NewNodeManager()
for i, lis := range suite.listeners {
node := NewNodeInfo(int64(i), lis.Addr().String())
node := NewNodeInfo(ImmutableNodeInfo{
NodeID: int64(i),
Address: lis.Addr().String(),
Hostname: "localhost",
})
suite.nodeManager.Add(node)
}
suite.cluster = NewCluster(suite.nodeManager, DefaultQueryNodeCreator)

View File

@ -102,22 +102,31 @@ const (
NodeStateStopping
)
type ImmutableNodeInfo struct {
NodeID int64
Address string
Hostname string
Version semver.Version
}
type NodeInfo struct {
stats
mu sync.RWMutex
id int64
addr string
immutableInfo ImmutableNodeInfo
state State
lastHeartbeat *atomic.Int64
version semver.Version
}
func (n *NodeInfo) ID() int64 {
return n.id
return n.immutableInfo.NodeID
}
func (n *NodeInfo) Addr() string {
return n.addr
return n.immutableInfo.Address
}
func (n *NodeInfo) Hostname() string {
return n.immutableInfo.Hostname
}
func (n *NodeInfo) SegmentCnt() int {
@ -160,19 +169,14 @@ func (n *NodeInfo) UpdateStats(opts ...StatsOption) {
n.mu.Unlock()
}
func (n *NodeInfo) SetVersion(v semver.Version) {
n.version = v
}
func (n *NodeInfo) Version() semver.Version {
return n.version
return n.immutableInfo.Version
}
func NewNodeInfo(id int64, addr string) *NodeInfo {
func NewNodeInfo(info ImmutableNodeInfo) *NodeInfo {
return &NodeInfo{
stats: newStats(),
id: id,
addr: addr,
immutableInfo: info,
lastHeartbeat: atomic.NewInt64(0),
}
}

View File

@ -156,7 +156,11 @@ func (suite *TaskSuite) SetupTest() {
func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
for node := range suite.distributions {
suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: node,
Address: "localhost",
Hostname: "localhost",
}))
}
switch testName {

View File

@ -54,10 +54,12 @@ func TestSpawnReplicasWithRG(t *testing.T) {
m.ResourceManager.AddResourceGroup("rg1")
m.ResourceManager.AddResourceGroup("rg2")
m.ResourceManager.AddResourceGroup("rg3")
for i := 1; i < 10; i++ {
nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
}))
if i%3 == 0 {
m.ResourceManager.AssignNode("rg1", int64(i))
}