fix: Remove time tick delay metrics when nodes go offline (#30833)

See also #30832

This PR removes time tick delay metrics when rootcoord GetMetrics
response does not have previously existed querynode/datanode

Also add unit tests for this case

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Signed-off-by: Congqi.Xia <congqi.xia@zilliz.com>
pull/30848/head^2
congqixia 2024-02-28 10:10:56 +08:00 committed by GitHub
parent 57397b1307
commit af315539d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 395 additions and 13 deletions

View File

@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@ -178,6 +179,8 @@ func (q *QuotaCenter) clearMetrics() {
// syncMetrics sends GetMetrics requests to DataCoord and QueryCoord to sync the metrics in DataNodes and QueryNodes.
func (q *QuotaCenter) syncMetrics() error {
oldDataNodes := typeutil.NewSet(lo.Keys(q.dataNodeMetrics)...)
oldQueryNodes := typeutil.NewSet(lo.Keys(q.queryNodeMetrics)...)
q.clearMetrics()
ctx, cancel := context.WithTimeout(context.Background(), GetMetricsTimeout)
defer cancel()
@ -191,12 +194,9 @@ func (q *QuotaCenter) syncMetrics() error {
// get Query cluster metrics
group.Go(func() error {
rsp, err := q.queryCoord.GetMetrics(ctx, req)
if err != nil {
if err := merr.CheckRPCCall(rsp, err); err != nil {
return err
}
if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return fmt.Errorf("quotaCenter get Query cluster failed, err = %s", rsp.GetStatus().GetReason())
}
queryCoordTopology := &metricsinfo.QueryCoordTopology{}
err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), queryCoordTopology)
if err != nil {
@ -206,6 +206,7 @@ func (q *QuotaCenter) syncMetrics() error {
collections := typeutil.NewUniqueSet()
for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes {
if queryNodeMetric.QuotaMetrics != nil {
oldQueryNodes.Remove(queryNodeMetric.ID)
q.queryNodeMetrics[queryNodeMetric.ID] = queryNodeMetric.QuotaMetrics
collections.Insert(queryNodeMetric.QuotaMetrics.Effect.CollectionIDs...)
}
@ -216,12 +217,9 @@ func (q *QuotaCenter) syncMetrics() error {
// get Data cluster metrics
group.Go(func() error {
rsp, err := q.dataCoord.GetMetrics(ctx, req)
if err != nil {
if err := merr.CheckRPCCall(rsp, err); err != nil {
return err
}
if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return fmt.Errorf("quotaCenter get Data cluster failed, err = %s", rsp.GetStatus().GetReason())
}
dataCoordTopology := &metricsinfo.DataCoordTopology{}
err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology)
if err != nil {
@ -231,6 +229,7 @@ func (q *QuotaCenter) syncMetrics() error {
collections := typeutil.NewUniqueSet()
for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
if dataNodeMetric.QuotaMetrics != nil {
oldDataNodes.Remove(dataNodeMetric.ID)
q.dataNodeMetrics[dataNodeMetric.ID] = dataNodeMetric.QuotaMetrics
collections.Insert(dataNodeMetric.QuotaMetrics.Effect.CollectionIDs...)
}
@ -266,11 +265,13 @@ func (q *QuotaCenter) syncMetrics() error {
if err != nil {
return err
}
// log.Debug("QuotaCenter sync metrics done",
// zap.Any("dataNodeMetrics", q.dataNodeMetrics),
// zap.Any("queryNodeMetrics", q.queryNodeMetrics),
// zap.Any("proxyMetrics", q.proxyMetrics),
// zap.Any("dataCoordMetrics", q.dataCoordMetrics))
for oldDN := range oldDataNodes {
metrics.RootCoordTtDelay.DeleteLabelValues(typeutil.DataNodeRole, strconv.FormatInt(oldDN, 10))
}
for oldQN := range oldQueryNodes {
metrics.RootCoordTtDelay.DeleteLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(oldQN, 10))
}
return nil
}

View File

@ -24,8 +24,10 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -35,10 +37,12 @@ import (
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/testutils"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -669,3 +673,375 @@ func TestQuotaCenter(t *testing.T) {
assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLUpsert]), float64(6*1024*1024))
})
}
type QuotaCenterSuite struct {
testutils.PromMetricsSuite
core *Core
pcm *proxyutil.MockProxyClientManager
dc *mocks.MockDataCoordClient
qc *mocks.MockQueryCoordClient
meta *mockrootcoord.IMetaTable
}
func (s *QuotaCenterSuite) SetupSuite() {
paramtable.Init()
var err error
s.core, err = NewCore(context.Background(), nil)
s.Require().NoError(err)
}
func (s *QuotaCenterSuite) SetupTest() {
s.pcm = proxyutil.NewMockProxyClientManager(s.T())
s.dc = mocks.NewMockDataCoordClient(s.T())
s.qc = mocks.NewMockQueryCoordClient(s.T())
s.meta = mockrootcoord.NewIMetaTable(s.T())
}
func (s *QuotaCenterSuite) getEmptyQCMetricsRsp() string {
metrics := &metricsinfo.QueryCoordTopology{
Cluster: metricsinfo.QueryClusterTopology{},
}
resp, err := metricsinfo.MarshalTopology(metrics)
s.Require().NoError(err)
return resp
}
func (s *QuotaCenterSuite) getEmptyDCMetricsRsp() string {
metrics := &metricsinfo.DataCoordTopology{
Cluster: metricsinfo.DataClusterTopology{},
}
resp, err := metricsinfo.MarshalTopology(metrics)
s.Require().NoError(err)
return resp
}
func (s *QuotaCenterSuite) TestSyncMetricsSuccess() {
pcm := s.pcm
dc := s.dc
qc := s.qc
meta := s.meta
core := s.core
s.Run("querycoord_cluster", func() {
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once()
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyDCMetricsRsp(),
}, nil).Once()
metrics := &metricsinfo.QueryCoordTopology{
Cluster: metricsinfo.QueryClusterTopology{
ConnectedNodes: []metricsinfo.QueryNodeInfos{
{BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 1}, QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{Effect: metricsinfo.NodeEffect{NodeID: 1, CollectionIDs: []int64{100, 200}}}},
{BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 2}, QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{Effect: metricsinfo.NodeEffect{NodeID: 2, CollectionIDs: []int64{200, 300}}}},
},
},
}
resp, err := metricsinfo.MarshalTopology(metrics)
s.Require().NoError(err)
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: resp,
}, nil).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err = quotaCenter.syncMetrics()
s.Require().NoError(err)
s.ElementsMatch([]int64{100, 200, 300}, quotaCenter.readableCollections)
nodes := lo.Keys(quotaCenter.queryNodeMetrics)
s.ElementsMatch([]int64{1, 2}, nodes)
})
s.Run("datacoord_cluster", func() {
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once()
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyQCMetricsRsp(),
}, nil).Once()
metrics := &metricsinfo.DataCoordTopology{
Cluster: metricsinfo.DataClusterTopology{
ConnectedDataNodes: []metricsinfo.DataNodeInfos{
{BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 1}, QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{Effect: metricsinfo.NodeEffect{NodeID: 1, CollectionIDs: []int64{100, 200}}}},
{BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 2}, QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{Effect: metricsinfo.NodeEffect{NodeID: 2, CollectionIDs: []int64{200, 300}}}},
},
},
}
resp, err := metricsinfo.MarshalTopology(metrics)
s.Require().NoError(err)
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: resp,
}, nil).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err = quotaCenter.syncMetrics()
s.Require().NoError(err)
s.ElementsMatch([]int64{100, 200, 300}, quotaCenter.writableCollections)
nodes := lo.Keys(quotaCenter.dataNodeMetrics)
s.ElementsMatch([]int64{1, 2}, nodes)
})
}
func (s *QuotaCenterSuite) TestSyncMetricsFailure() {
pcm := s.pcm
dc := s.dc
qc := s.qc
meta := s.meta
core := s.core
s.Run("querycoord_failure", func() {
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once()
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyDCMetricsRsp(),
}, nil).Once()
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err := quotaCenter.syncMetrics()
s.Error(err)
})
s.Run("querycoord_bad_response", func() {
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once()
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyDCMetricsRsp(),
}, nil).Once()
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: "abc",
}, nil).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err := quotaCenter.syncMetrics()
s.Error(err)
})
s.Run("datacoord_failure", func() {
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once()
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyQCMetricsRsp(),
}, nil).Once()
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("mocked")).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err := quotaCenter.syncMetrics()
s.Error(err)
})
s.Run("datacoord_bad_response", func() {
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once()
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyQCMetricsRsp(),
}, nil).Once()
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: "abc",
}, nil).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err := quotaCenter.syncMetrics()
s.Error(err)
})
s.Run("proxy_manager_return_failure", func() {
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyQCMetricsRsp(),
}, nil).Once()
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyDCMetricsRsp(),
}, nil).Once()
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, errors.New("mocked")).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err := quotaCenter.syncMetrics()
s.Error(err)
})
s.Run("proxy_manager_bad_response", func() {
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyQCMetricsRsp(),
}, nil).Once()
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: s.getEmptyDCMetricsRsp(),
}, nil).Once()
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return([]*milvuspb.GetMetricsResponse{
{
Status: merr.Status(nil),
Response: "abc",
},
}, nil).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err := quotaCenter.syncMetrics()
s.Error(err)
})
}
func (s *QuotaCenterSuite) TestNodeOffline() {
pcm := s.pcm
dc := s.dc
qc := s.qc
meta := s.meta
core := s.core
metrics.RootCoordTtDelay.Reset()
Params.Save(Params.QuotaConfig.TtProtectionEnabled.Key, "true")
defer Params.Reset(Params.QuotaConfig.TtProtectionEnabled.Key)
// proxy
pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil)
// qc first time
qcMetrics := &metricsinfo.QueryCoordTopology{
Cluster: metricsinfo.QueryClusterTopology{
ConnectedNodes: []metricsinfo.QueryNodeInfos{
{
BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 1},
QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"},
Effect: metricsinfo.NodeEffect{
NodeID: 1, CollectionIDs: []int64{100, 200},
},
},
},
{
BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 2},
QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"},
Effect: metricsinfo.NodeEffect{
NodeID: 2, CollectionIDs: []int64{100, 200},
},
},
},
},
},
}
resp, err := metricsinfo.MarshalTopology(qcMetrics)
s.Require().NoError(err)
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: resp,
}, nil).Once()
// dc first time
dcMetrics := &metricsinfo.DataCoordTopology{
Cluster: metricsinfo.DataClusterTopology{
ConnectedDataNodes: []metricsinfo.DataNodeInfos{
{
BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 3},
QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"},
Effect: metricsinfo.NodeEffect{NodeID: 3, CollectionIDs: []int64{100, 200}},
},
},
{
BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 4},
QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"},
Effect: metricsinfo.NodeEffect{NodeID: 4, CollectionIDs: []int64{200, 300}},
},
},
},
},
}
resp, err = metricsinfo.MarshalTopology(dcMetrics)
s.Require().NoError(err)
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: resp,
}, nil).Once()
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
err = quotaCenter.syncMetrics()
s.Require().NoError(err)
quotaCenter.getTimeTickDelayFactor(tsoutil.ComposeTSByTime(time.Now(), 0))
s.CollectCntEqual(metrics.RootCoordTtDelay, 4)
// qc second time
qcMetrics = &metricsinfo.QueryCoordTopology{
Cluster: metricsinfo.QueryClusterTopology{
ConnectedNodes: []metricsinfo.QueryNodeInfos{
{
BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 2},
QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"},
Effect: metricsinfo.NodeEffect{NodeID: 2, CollectionIDs: []int64{200, 300}},
},
},
},
},
}
resp, err = metricsinfo.MarshalTopology(qcMetrics)
s.Require().NoError(err)
qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: resp,
}, nil).Once()
// dc second time
dcMetrics = &metricsinfo.DataCoordTopology{
Cluster: metricsinfo.DataClusterTopology{
ConnectedDataNodes: []metricsinfo.DataNodeInfos{
{
BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 4},
QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"},
Effect: metricsinfo.NodeEffect{NodeID: 2, CollectionIDs: []int64{200, 300}},
},
},
},
},
}
resp, err = metricsinfo.MarshalTopology(dcMetrics)
s.Require().NoError(err)
dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: merr.Status(nil),
Response: resp,
}, nil).Once()
err = quotaCenter.syncMetrics()
s.Require().NoError(err)
quotaCenter.getTimeTickDelayFactor(tsoutil.ComposeTSByTime(time.Now(), 0))
s.CollectCntEqual(metrics.RootCoordTtDelay, 2)
}
func TestQuotaCenterSuite(t *testing.T) {
suite.Run(t, new(QuotaCenterSuite))
}

View File

@ -15,3 +15,8 @@ func (suite *PromMetricsSuite) MetricsEqual(c prometheus.Collector, expect float
value := testutil.ToFloat64(c)
return suite.Suite.Equal(expect, value, msgAndArgs...)
}
func (suite *PromMetricsSuite) CollectCntEqual(c prometheus.Collector, expect int, msgAndArgs ...any) bool {
cnt := testutil.CollectAndCount(c)
return suite.Suite.EqualValues(expect, cnt, msgAndArgs...)
}