mirror of https://github.com/milvus-io/milvus.git
enhance: resource group unittest refactor (#32740)

issue: #30647
pr: #32739
Signed-off-by: chyezh <chyezh@outlook.com>
pull/32764/head

parent 15b78749a6
commit fe7a1777d8
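For orientation, this is the recover-and-verify pattern the refactored tests below exercise, as a minimal sketch assembled from the hunks (it assumes m is the suite's *meta.Meta, observer its *ResourceObserver, and rgpb the resource-group proto package the new code imports):

	// A group that requires 4 nodes and may hold at most 6.
	m.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: 4},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 6},
	})
	m.ResourceManager.HandleNodeUp(100) // nodes join and get assigned to groups
	// MeetRequirement errors while the group is short of nodes; the observer's
	// recovery pass moves spare nodes into lagging groups until it passes.
	observer.checkAndRecoverResourceGroup()
	err := m.ResourceManager.MeetRequirement("rg") // nil once "rg" holds 4 nodes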
@@ -16,7 +16,9 @@
package observers

import (
	"fmt"
	"testing"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/mock"
@@ -26,14 +28,11 @@ import (
	"github.com/milvus-io/milvus/internal/kv"
	etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/metastore/mocks"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
	. "github.com/milvus-io/milvus/internal/querycoordv2/params"
	"github.com/milvus-io/milvus/internal/querycoordv2/session"
	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
	"github.com/milvus-io/milvus/pkg/util/etcd"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type ResourceObserverSuite struct {
@@ -76,7 +75,6 @@ func (suite *ResourceObserverSuite) SetupTest() {
	suite.meta = meta.NewMeta(idAllocator, suite.store, suite.nodeMgr)

	suite.observer = NewResourceObserver(suite.meta)
	suite.observer.Start()

	suite.store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
	suite.store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
@@ -91,161 +89,113 @@ func (suite *ResourceObserverSuite) SetupTest() {
}

func (suite *ResourceObserverSuite) TearDownTest() {
	suite.observer.Stop()
	suite.store.ExpectedCalls = nil
}

func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
	suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
	suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil)
	suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
	suite.meta.ReplicaManager.Put(meta.NewReplica(
		&querypb.Replica{
			ID:            1,
			CollectionID:  1,
			Nodes:         []int64{100, 101},
			ResourceGroup: "rg",
		},
		typeutil.NewUniqueSet(100, 101),
	))

	// hack: store a second replica whose nodes are all already down
	suite.meta.ReplicaManager.Put(meta.NewReplica(
		&querypb.Replica{
			ID:            2,
			CollectionID:  1,
			Nodes:         []int64{},
			ResourceGroup: "rg",
		},
		typeutil.NewUniqueSet(),
	))

func (suite *ResourceObserverSuite) TestObserverRecoverOperation() {
	suite.meta.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: 4},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 4},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 6},
	})
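	// Assumed semantics of the config above, consistent with the assertions that
	// follow: Requests.NodeNum is the node count the group must reach, and
	// Limits.NodeNum is the cap it may grow to during recovery.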
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   int64(100),
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   int64(101),
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   int64(102),
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   int64(103),
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.meta.ResourceManager.HandleNodeUp(100)
	suite.meta.ResourceManager.HandleNodeUp(101)
	suite.meta.ResourceManager.HandleNodeUp(102)
	suite.meta.ResourceManager.HandleNodeUp(103)
	suite.meta.ResourceManager.HandleNodeDown(100)
	suite.meta.ResourceManager.HandleNodeDown(101)
	suite.Error(suite.meta.ResourceManager.MeetRequirement("rg"))
	// There are 10 existing nodes in the cluster; the new incoming resource group
	// should get 4 nodes after recovery.
	suite.observer.checkAndRecoverResourceGroup()
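	// The recovery pass is invoked directly (rather than waiting on the observer's
	// background timer) so the test controls exactly when reassignment happens.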
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg"))

	rg := suite.meta.ResourceManager.GetResourceGroup("rg")
	suite.NotNil(rg)
	suite.meta.ResourceManager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: 6},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 10},
	})
	suite.Error(suite.meta.ResourceManager.MeetRequirement("rg2"))
	// There are 10 existing nodes in the cluster; the new incoming resource group
	// should get 6 nodes after recovery.
	suite.observer.checkAndRecoverResourceGroup()
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))

	suite.False(rg.ContainNode(100))
	suite.False(rg.ContainNode(101))
	suite.True(rg.ContainNode(102))
	suite.True(rg.ContainNode(103))
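	// Nodes 100 and 101 were taken down above, so only 102 and 103 remain
	// assigned to the group.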
	suite.meta.ResourceManager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: 1},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 1},
	})
	suite.Error(suite.meta.ResourceManager.MeetRequirement("rg3"))
	// There are 10 existing nodes in the cluster, but they are already occupied by
	// rg1 and rg2; the new incoming resource group cannot get any node.
	suite.observer.checkAndRecoverResourceGroup()
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
	suite.Error(suite.meta.ResourceManager.MeetRequirement("rg3"))
	// A new node comes up; rg3 should get it.
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID: 10,
	}))
	suite.meta.ResourceManager.HandleNodeUp(10)
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3"))
	// Node 10 is then removed from the node manager, but the groups still count it
	// until the next recovery pass.
	suite.nodeMgr.Remove(10)
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3"))
	// The new node is down, so rg3 cannot use it anymore.
	suite.observer.checkAndRecoverResourceGroup()
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
	suite.Error(suite.meta.ResourceManager.MeetRequirement("rg3"))

	// Simulate a storage failure for the next incoming node.
	suite.store.EXPECT().SaveResourceGroup(mock.Anything).Unset()
	suite.store.EXPECT().SaveResourceGroup(mock.Anything).Return(errors.New("failure"))
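	// mockery pattern: Unset removes the earlier expectation so the same call can
	// be re-stubbed to return an error for the failure path.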
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID: 11,
	}))
	// The save fails, so the new node cannot be used by rg3.
	suite.meta.ResourceManager.HandleNodeUp(11)
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
	suite.Error(suite.meta.ResourceManager.MeetRequirement("rg3"))
	suite.store.EXPECT().SaveResourceGroup(mock.Anything).Unset()
	suite.store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
	// Storage has recovered, so the next recovery pass will succeed.
	suite.observer.checkAndRecoverResourceGroup()
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
	suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3"))
}

func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() {
	suite.meta.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: 50},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 50},
	})
	for i := 100; i < 200; i++ {
		suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   int64(i),
			Address:  "localhost",
			Hostname: "localhost",
		}))
		suite.meta.ResourceManager.HandleNodeUp(int64(i))
		suite.meta.ResourceManager.HandleNodeDown(int64(i))
func (suite *ResourceObserverSuite) TestSchedule() {
	suite.observer.Start()
	defer suite.observer.Stop()

	check := func() {
		suite.Eventually(func() bool {
			rgs := suite.meta.ResourceManager.ListResourceGroups()
			for _, rg := range rgs {
				if err := suite.meta.ResourceManager.GetResourceGroup(rg).MeetRequirement(); err != nil {
					return false
				}
			}
			return true
		}, 5*time.Second, 1*time.Second)
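		// Eventually polls once per second for up to five seconds, giving the
		// observer's background loop (started above) time to converge instead of
		// invoking the recovery check synchronously.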
	}

	suite.Error(suite.meta.ResourceManager.MeetRequirement("rg"))
}

	for i := 1; i <= 4; i++ {
		suite.meta.ResourceManager.AddResourceGroup(fmt.Sprintf("rg%d", i), &rgpb.ResourceGroupConfig{
			Requests: &rgpb.ResourceGroupLimit{NodeNum: int32(i)},
			Limits:   &rgpb.ResourceGroupLimit{NodeNum: int32(i)},
		})
	}
	check()
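	// check() blocks until every listed resource group reports its requirement
	// met, failing the test if convergence takes longer than the Eventually window.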

func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
	suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
	suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil)
	suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
	suite.meta.ReplicaManager.Put(meta.NewReplica(
		&querypb.Replica{
			ID:            1,
			CollectionID:  1,
			Nodes:         []int64{100, 101},
			ResourceGroup: "rg",
		},
		typeutil.NewUniqueSet(100, 101),
	))

	// hack: store a second replica whose nodes are all already down
	suite.meta.ReplicaManager.Put(meta.NewReplica(
		&querypb.Replica{
			ID:            2,
			CollectionID:  1,
			Nodes:         []int64{},
			ResourceGroup: "rg",
		},
		typeutil.NewUniqueSet(),
	))

	suite.store.EXPECT().SaveReplica(mock.Anything).Return(errors.New("store error"))
	suite.meta.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: 4},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 4},
	})
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   int64(100),
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   int64(101),
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   int64(102),
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
		NodeID:   int64(103),
		Address:  "localhost",
		Hostname: "localhost",
	}))
	suite.meta.ResourceManager.HandleNodeUp(100)
	suite.meta.ResourceManager.HandleNodeUp(101)
	suite.meta.ResourceManager.HandleNodeUp(102)
	suite.meta.ResourceManager.HandleNodeUp(103)
	suite.meta.ResourceManager.HandleNodeDown(100)
	suite.meta.ResourceManager.HandleNodeDown(101)

	rg := suite.meta.ResourceManager.GetResourceGroup("rg")
	suite.NotNil(rg)

	suite.False(rg.ContainNode(100))
	suite.False(rg.ContainNode(101))
	suite.True(rg.ContainNode(102))
	suite.True(rg.ContainNode(103))
	for i := 1; i <= 4; i++ {
		suite.meta.ResourceManager.AddResourceGroup(fmt.Sprintf("rg%d", i), &rgpb.ResourceGroupConfig{
			Requests: &rgpb.ResourceGroupLimit{NodeNum: 0},
			Limits:   &rgpb.ResourceGroupLimit{NodeNum: 0},
		})
	}
	check()
}

func (suite *ResourceObserverSuite) TearDownSuite() {
	suite.kv.Close()
	suite.observer.Stop()
}

func TestResourceObserver(t *testing.T) {
	// assumed standard testify entry point for this suite
	suite.Run(t, new(ResourceObserverSuite))
}