Add Promethues suite test utility (#26046)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/26082/head
congqixia 2023-08-02 10:05:05 +08:00 committed by GitHub
parent 9c55a7f422
commit 0727e017c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 152 additions and 190 deletions

View File

@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -36,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/testutils"
)
func getMetaKv(t *testing.T) kv.MetaKv {
@ -55,7 +55,7 @@ func getWatchKV(t *testing.T) kv.WatchKV {
}
type ClusterSuite struct {
suite.Suite
testutils.PromMetricsSuite
kv kv.WatchKV
}
@ -104,7 +104,7 @@ func (suite *ClusterSuite) TestCreate() {
suite.EqualValues(1, len(dataNodes))
suite.EqualValues("localhost:8080", dataNodes[0].info.Address)
suite.Equal(float64(1), testutil.ToFloat64(metrics.DataCoordNumDataNodes))
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
suite.Run("startup_with_existed_channel_data", func() {
@ -223,7 +223,7 @@ func (suite *ClusterSuite) TestRegister() {
suite.EqualValues(1, len(sessions))
suite.EqualValues("localhost:8080", sessions[0].info.Address)
suite.Equal(float64(1), testutil.ToFloat64(metrics.DataCoordNumDataNodes))
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
suite.Run("register_to_empty_cluster_with_buffer_channels", func() {
@ -258,7 +258,7 @@ func (suite *ClusterSuite) TestRegister() {
suite.EqualValues(1, nodeChannels[0].NodeID)
suite.EqualValues("ch1", nodeChannels[0].Channels[0].Name)
suite.Equal(float64(1), testutil.ToFloat64(metrics.DataCoordNumDataNodes))
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
suite.Run("register_and_restart_with_no_channel", func() {
@ -290,7 +290,7 @@ func (suite *ClusterSuite) TestRegister() {
channels := channelManager2.GetChannels()
suite.Empty(channels)
suite.Equal(float64(1), testutil.ToFloat64(metrics.DataCoordNumDataNodes))
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
}
@ -321,7 +321,7 @@ func (suite *ClusterSuite) TestUnregister() {
sessions := sessionManager.GetSessions()
suite.Empty(sessions)
suite.Equal(float64(0), testutil.ToFloat64(metrics.DataCoordNumDataNodes))
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 0)
})
suite.Run("move_channel_to_online_nodes_after_unregister", func() {
@ -358,10 +358,10 @@ func (suite *ClusterSuite) TestUnregister() {
suite.EqualValues(1, len(channels[0].Channels))
suite.EqualValues("ch1", channels[0].Channels[0].Name)
suite.Equal(float64(1), testutil.ToFloat64(metrics.DataCoordNumDataNodes))
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 1)
})
suite.Run("remove all channels after unregsiter", func() {
suite.Run("remove_all_channels_after_unregsiter", func() {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
@ -393,7 +393,7 @@ func (suite *ClusterSuite) TestUnregister() {
suite.EqualValues(1, len(channel.Channels))
suite.EqualValues("ch_1", channel.Channels[0].Name)
suite.Equal(float64(0), testutil.ToFloat64(metrics.DataCoordNumDataNodes))
suite.MetricsEqual(metrics.DataCoordNumDataNodes, 0)
})
}
@ -401,107 +401,6 @@ func TestCluster(t *testing.T) {
suite.Run(t, new(ClusterSuite))
}
func TestUnregister(t *testing.T) {
kv := getWatchKV(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
t.Run("remove node after unregister", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
assert.NoError(t, err)
err = cluster.UnRegister(nodes[0])
assert.NoError(t, err)
sessions := sessionManager.GetSessions()
assert.Empty(t, sessions)
})
t.Run("move channels to online nodes after unregister", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
nodeInfo1 := &NodeInfo{
Address: "localhost:8080",
NodeID: 1,
}
nodeInfo2 := &NodeInfo{
Address: "localhost:8081",
NodeID: 2,
}
nodes := []*NodeInfo{nodeInfo1, nodeInfo2}
err = cluster.Startup(ctx, nodes)
assert.NoError(t, err)
err = cluster.Watch("ch1", 1)
assert.NoError(t, err)
err = cluster.UnRegister(nodeInfo1)
assert.NoError(t, err)
channels := channelManager.GetChannels()
assert.EqualValues(t, 1, len(channels))
assert.EqualValues(t, 2, channels[0].NodeID)
assert.EqualValues(t, 1, len(channels[0].Channels))
assert.EqualValues(t, "ch1", channels[0].Channels[0].Name)
})
t.Run("remove all channels after unregsiter", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var mockSessionCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(1, nil)
}
sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
nodeInfo := &NodeInfo{
Address: "localhost:8080",
NodeID: 1,
}
err = cluster.Startup(ctx, []*NodeInfo{nodeInfo})
assert.NoError(t, err)
err = cluster.Watch("ch_1", 1)
assert.NoError(t, err)
err = cluster.UnRegister(nodeInfo)
assert.NoError(t, err)
channels := channelManager.GetChannels()
assert.Empty(t, channels)
channel := channelManager.GetBufferChannels()
assert.NotNil(t, channel)
assert.EqualValues(t, 1, len(channel.Channels))
assert.EqualValues(t, "ch_1", channel.Channels[0].Name)
})
}
func TestWatchIfNeeded(t *testing.T) {
kv := getWatchKV(t)
defer func() {

View File

@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@ -35,71 +36,77 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/testutils"
mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
)
func TestMetaReloadFromKV(t *testing.T) {
t.Run("ListSegments fail", func(t *testing.T) {
catalog := mocks.NewDataCoordCatalog(t)
catalog.On("ListSegments",
mock.Anything,
).Return(nil, errors.New("error"))
// MetaReloadSuite tests meta reload & meta creation related logic
type MetaReloadSuite struct {
testutils.PromMetricsSuite
_, err := newMeta(context.TODO(), catalog, nil)
assert.Error(t, err)
catalog *mocks.DataCoordCatalog
meta *meta
}
func (suite *MetaReloadSuite) SetupTest() {
catalog := mocks.NewDataCoordCatalog(suite.T())
suite.catalog = catalog
}
func (suite *MetaReloadSuite) resetMock() {
suite.catalog.ExpectedCalls = nil
}
func (suite *MetaReloadSuite) TestReloadFromKV() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
suite.Run("ListSegments_fail", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListSegments(mock.Anything).Return(nil, errors.New("mock"))
_, err := newMeta(ctx, suite.catalog, nil)
suite.Error(err)
})
t.Run("ListChannelCheckpoint fail", func(t *testing.T) {
catalog := mocks.NewDataCoordCatalog(t)
catalog.On("ListSegments",
mock.Anything,
).Return([]*datapb.SegmentInfo{}, nil)
catalog.On("ListChannelCheckpoint",
mock.Anything,
).Return(nil, errors.New("error"))
_, err := newMeta(context.TODO(), catalog, nil)
assert.Error(t, err)
suite.Run("ListChannelCheckpoint_fail", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{}, nil)
suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock"))
_, err := newMeta(ctx, suite.catalog, nil)
suite.Error(err)
})
t.Run("ListIndexes fail", func(t *testing.T) {
catalog := mocks.NewDataCoordCatalog(t)
catalog.On("ListSegments",
mock.Anything,
).Return([]*datapb.SegmentInfo{}, nil)
catalog.On("ListChannelCheckpoint",
mock.Anything,
).Return(map[string]*msgpb.MsgPosition{}, nil)
catalog.On("ListIndexes",
mock.Anything,
).Return(nil, errors.New("error"))
_, err := newMeta(context.TODO(), catalog, nil)
assert.Error(t, err)
suite.Run("ListIndexes_fail", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{}, nil)
suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{}, nil)
suite.catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, errors.New("mock"))
_, err := newMeta(ctx, suite.catalog, nil)
suite.Error(err)
})
t.Run("ListSegmentIndexes fails", func(t *testing.T) {
catalog := mocks.NewDataCoordCatalog(t)
catalog.On("ListSegments",
mock.Anything,
).Return([]*datapb.SegmentInfo{}, nil)
catalog.On("ListChannelCheckpoint",
mock.Anything,
).Return(map[string]*msgpb.MsgPosition{}, nil)
catalog.On("ListIndexes",
mock.Anything,
).Return([]*model.Index{}, nil)
catalog.On("ListSegmentIndexes",
mock.Anything,
).Return(nil, errors.New("error"))
_, err := newMeta(context.TODO(), catalog, nil)
assert.Error(t, err)
suite.Run("ListSegmentIndexes_fails", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{}, nil)
suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{}, nil)
suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{}, nil)
suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, errors.New("mock"))
_, err := newMeta(ctx, suite.catalog, nil)
suite.Error(err)
})
t.Run("ok", func(t *testing.T) {
catalog := mocks.NewDataCoordCatalog(t)
catalog.On("ListSegments",
mock.Anything,
).Return([]*datapb.SegmentInfo{
suite.Run("ok", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1,
@ -107,20 +114,14 @@ func TestMetaReloadFromKV(t *testing.T) {
State: commonpb.SegmentState_Flushed,
},
}, nil)
catalog.On("ListChannelCheckpoint",
mock.Anything,
).Return(map[string]*msgpb.MsgPosition{
suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{
"ch": {
ChannelName: "cn",
MsgID: []byte{},
Timestamp: 1000,
},
}, nil)
catalog.On("ListIndexes",
mock.Anything,
).Return([]*model.Index{
suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{
{
CollectionID: 1,
IndexID: 1,
@ -129,20 +130,79 @@ func TestMetaReloadFromKV(t *testing.T) {
},
}, nil)
catalog.On("ListSegmentIndexes",
mock.Anything,
).Return([]*model.SegmentIndex{
suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{
{
SegmentID: 1,
IndexID: 1,
},
}, nil)
_, err := newMeta(context.TODO(), catalog, nil)
assert.NoError(t, err)
meta, err := newMeta(ctx, suite.catalog, nil)
suite.NoError(err)
suite.NotNil(meta)
suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel), 1)
})
}
type MetaBasicSuite struct {
testutils.PromMetricsSuite
collID int64
partIDs []int64
channelName string
meta *meta
}
func (suite *MetaBasicSuite) SetupSuite() {
Params.Init()
}
func (suite *MetaBasicSuite) SetupTest() {
suite.collID = 1
suite.partIDs = []int64{100, 101}
suite.channelName = "c1"
meta, err := newMemoryMeta()
suite.Require().NoError(err)
suite.meta = meta
}
func (suite *MetaBasicSuite) getCollectionInfo(partIDs ...int64) *collectionInfo {
testSchema := newTestSchema()
return &collectionInfo{
ID: suite.collID,
Schema: testSchema,
Partitions: partIDs,
StartPositions: []*commonpb.KeyDataPair{},
}
}
func (suite *MetaBasicSuite) TestCollection() {
meta := suite.meta
info := suite.getCollectionInfo(suite.partIDs...)
meta.AddCollection(info)
collInfo := meta.GetCollection(suite.collID)
suite.Require().NotNil(collInfo)
// check partition info
suite.EqualValues(suite.collID, collInfo.ID)
suite.EqualValues(info.Schema, collInfo.Schema)
suite.EqualValues(len(suite.partIDs), len(collInfo.Partitions))
suite.ElementsMatch(info.Partitions, collInfo.Partitions)
suite.MetricsEqual(metrics.DataCoordNumCollections.WithLabelValues(), 1)
}
func TestMeta(t *testing.T) {
suite.Run(t, new(MetaBasicSuite))
suite.Run(t, new(MetaReloadSuite))
}
func TestMeta_Basic(t *testing.T) {
const collID = UniqueID(0)
const partID0 = UniqueID(100)
@ -170,20 +230,6 @@ func TestMeta_Basic(t *testing.T) {
Partitions: []UniqueID{},
}
t.Run("Test Collection", func(t *testing.T) {
meta.AddCollection(collInfo)
// check has collection
collInfo := meta.GetCollection(collID)
assert.NotNil(t, collInfo)
// check partition info
assert.EqualValues(t, collID, collInfo.ID)
assert.EqualValues(t, testSchema, collInfo.Schema)
assert.EqualValues(t, 2, len(collInfo.Partitions))
assert.EqualValues(t, partID0, collInfo.Partitions[0])
assert.EqualValues(t, partID1, collInfo.Partitions[1])
})
t.Run("Test Segment", func(t *testing.T) {
meta.AddCollection(collInfoWoPartition)
// create seg0 for partition0, seg0/seg1 for partition1

View File

@ -0,0 +1,17 @@
package testutils
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/suite"
)
// PromMetricsSuite is a util suite wrapper providing prometheus metrics assertion functions.
type PromMetricsSuite struct {
suite.Suite
}
func (suite *PromMetricsSuite) MetricsEqual(c prometheus.Collector, expect float64, msgAndArgs ...any) bool {
value := testutil.ToFloat64(c)
return suite.Suite.Equal(expect, value, msgAndArgs...)
}