enhance: Remove the storage info report (#31772)

issue: #30436
origin pr: #30438

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/31800/head
SimFG 2024-04-02 11:50:59 +08:00 committed by GitHub
parent 5d97693bcd
commit ac26908cc4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 9 additions and 451 deletions

View File

@ -139,31 +139,6 @@ func (s *Server) getSystemInfoMetrics(
return resp, nil
}
// getCollectionStorageMetrics wraps the DataCoord metrics of this server in a
// DataCoordTopology and returns it, marshalled, as a GetMetricsResponse.
//
// The returned error is always nil: a marshalling failure is reported through
// the Status field of the response instead.
func (s *Server) getCollectionStorageMetrics(ctx context.Context) (*milvuspb.GetMetricsResponse, error) {
	topology := metricsinfo.DataCoordTopology{
		Cluster: metricsinfo.DataClusterTopology{Self: s.getDataCoordMetrics(ctx)},
		Connections: metricsinfo.ConnTopology{
			Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
			// DataCoord reports no connected components for this metric type.
			ConnectedComponents: []metricsinfo.ConnectionInfo{},
		},
	}

	out := &milvuspb.GetMetricsResponse{
		Status:        merr.Success(),
		ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
	}

	payload, marshalErr := metricsinfo.MarshalTopology(topology)
	out.Response = payload
	if marshalErr != nil {
		out.Status = merr.Status(marshalErr)
	}
	return out, nil
}
// getDataCoordMetrics composes datacoord infos
func (s *Server) getDataCoordMetrics(ctx context.Context) metricsinfo.DataCoordInfos {
ret := metricsinfo.DataCoordInfos{

View File

@ -34,7 +34,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -3837,67 +3836,3 @@ func TestUpdateAutoBalanceConfigLoop(t *testing.T) {
wg.Wait()
})
}
// TestGetCollectionStorage checks that a CollectionStorageMetrics request to
// Server.GetMetrics yields the total and per-collection binlog sizes of the
// segments held in meta.
func TestGetCollectionStorage(t *testing.T) {
	paramtable.Init()

	session := sessionutil.NewMockSession(t)
	session.EXPECT().GetAddress().Return("localhost:8888")

	binlogSize := atomic.NewInt64(100)
	// newSegment builds a 10-row segment whose cached size is binlogSize.
	newSegment := func(id int64, state commonpb.SegmentState, collectionID, partitionID int64) *SegmentInfo {
		return &SegmentInfo{
			SegmentInfo: &datapb.SegmentInfo{
				ID:           id,
				State:        state,
				CollectionID: collectionID,
				PartitionID:  partitionID,
				NumOfRows:    10,
			},
			size: *binlogSize,
		}
	}

	s := &Server{
		session: session,
		meta: &meta{
			segments: &SegmentsInfo{
				segments: map[UniqueID]*SegmentInfo{
					1: newSegment(1, commonpb.SegmentState_Growing, 10001, 10000),
					// The assertions below expect this dropped segment not to be counted.
					2: newSegment(2, commonpb.SegmentState_Dropped, 10001, 10000),
					3: newSegment(3, commonpb.SegmentState_Flushed, 10002, 9999),
				},
			},
		},
	}
	s.stateCode.Store(commonpb.StateCode_Healthy)

	req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.CollectionStorageMetrics)
	assert.NoError(t, err)
	resp, err := s.GetMetrics(context.TODO(), req)
	assert.NoError(t, err)

	var topology metricsinfo.DataCoordTopology
	err = metricsinfo.UnmarshalTopology(resp.Response, &topology)
	assert.NoError(t, err)

	quota := topology.Cluster.Self.QuotaMetrics
	assert.NotNil(t, quota)
	assert.Equal(t, int64(200), quota.TotalBinlogSize)
	assert.Len(t, quota.CollectionBinlogSize, 2)
	assert.Equal(t, int64(100), quota.CollectionBinlogSize[10001])
	assert.Equal(t, int64(100), quota.CollectionBinlogSize[10002])
}

View File

@ -1030,23 +1030,6 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
zap.Error(err))
return metrics, nil
} else if metricType == metricsinfo.CollectionStorageMetrics {
metrics, err := s.getCollectionStorageMetrics(ctx)
if err != nil {
log.Warn("DataCoord GetMetrics CollectionStorage failed", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: merr.Status(err),
}, nil
}
log.RatedDebug(60, "DataCoord.GetMetrics CollectionStorage",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.Request),
zap.String("metricType", metricType),
zap.Any("metrics", metrics),
zap.Error(err))
return metrics, nil
}

View File

@ -26,7 +26,6 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
@ -48,7 +47,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
@ -426,7 +424,6 @@ func (node *Proxy) Start() error {
hookutil.OpTypeKey: hookutil.OpTypeNodeID,
hookutil.NodeIDKey: paramtable.GetNodeID(),
})
node.startReportCollectionStorage()
log.Debug("update state code", zap.String("role", typeutil.ProxyRole), zap.String("State", commonpb.StateCode_Healthy.String()))
node.UpdateStateCode(commonpb.StateCode_Healthy)
@ -550,87 +547,3 @@ func (node *Proxy) GetRateLimiter() (types.Limiter, error) {
}
return node.multiRateLimiter, nil
}
// startReportCollectionStorage launches a background goroutine that calls
// reportCollectionStorage every 30 seconds. The goroutine exits once node.ctx
// is done. Errors returned by individual reports are discarded.
func (node *Proxy) startReportCollectionStorage() {
	const reportInterval = 30 * time.Second

	reportLoop := func() {
		ticker := time.NewTicker(reportInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Best effort: the result is ignored and the next tick reports again.
				_ = node.reportCollectionStorage()
			case <-node.ctx.Done():
				return
			}
		}
	}
	go reportLoop()
}
// reportCollectionStorage fetches the collection storage metrics from
// DataCoord, sums the binlog size of every collection per database, and passes
// the per-database totals to Extension.Report.
//
// It returns an error when the DataCoord client is nil, the metrics request
// cannot be built, the RPC fails, the response cannot be unmarshalled, the
// quota metrics are missing, or the collection names cannot be resolved for
// every collection ID. Nothing is reported when there are no collections.
func (node *Proxy) reportCollectionStorage() error {
	if node.dataCoord == nil {
		return errors.New("nil datacoord")
	}

	req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.CollectionStorageMetrics)
	if err != nil {
		return err
	}

	rsp, err := node.dataCoord.GetMetrics(node.ctx, req)
	if err = merr.CheckRPCCall(rsp, err); err != nil {
		log.Warn("failed to get metrics", zap.Error(err))
		return err
	}

	dataCoordTopology := &metricsinfo.DataCoordTopology{}
	err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology)
	if err != nil {
		log.Warn("failed to unmarshal topology", zap.Error(err))
		return err
	}

	quotaMetric := dataCoordTopology.Cluster.Self.QuotaMetrics
	if quotaMetric == nil {
		log.Warn("quota metric is nil")
		return errors.New("quota metric is nil")
	}

	// Bound the meta-cache lookup so a slow lookup cannot stall the report loop.
	ctx, cancelFunc := context.WithTimeout(node.ctx, 5*time.Second)
	defer cancelFunc()

	ids := lo.Keys(quotaMetric.CollectionBinlogSize)
	dbNames, collectionNames, err := globalMetaCache.GetCollectionNamesByID(ctx, ids)
	if err != nil {
		log.Warn("failed to get collection names", zap.Error(err))
		return err
	}
	// The names are matched to ids by position, so all three slices must line up.
	if len(ids) != len(dbNames) || len(ids) != len(collectionNames) {
		log.Warn("failed to get collection names",
			zap.Int("len(ids)", len(ids)),
			zap.Int("len(dbNames)", len(dbNames)),
			zap.Int("len(collectionNames)", len(collectionNames)))
		return errors.New("failed to get collection names")
	}

	// ids holds every key of CollectionBinlogSize exactly once, so a single
	// positional pass attributes each collection's size to its database.
	storeInfo := make(map[string]int64, len(ids))
	for i, collectionID := range ids {
		storeInfo[dbNames[i]] += quotaMetric.CollectionBinlogSize[collectionID]
	}

	if len(storeInfo) > 0 {
		Extension.Report(map[string]any{
			hookutil.OpTypeKey:        hookutil.OpTypeStorage,
			hookutil.StorageDetailKey: lo.MapValues(storeInfo, func(v int64, _ string) any { return v }),
		})
	}
	return nil
}

View File

@ -60,7 +60,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -4805,221 +4804,6 @@ func TestUnhealthProxy_GetIndexStatistics(t *testing.T) {
})
}
func TestProxy_ReportCollectionStorage(t *testing.T) {
t.Run("nil datacoord", func(t *testing.T) {
proxy := &Proxy{}
err := proxy.reportCollectionStorage()
assert.Error(t, err)
})
t.Run("fail to get metric", func(t *testing.T) {
datacoord := mocks.NewMockDataCoordClient(t)
datacoord.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Code: 500,
},
}, nil).Once()
ctx := context.Background()
proxy := &Proxy{
ctx: ctx,
dataCoord: datacoord,
}
err := proxy.reportCollectionStorage()
assert.Error(t, err)
})
t.Run("fail to unmarshal metric", func(t *testing.T) {
datacoord := mocks.NewMockDataCoordClient(t)
datacoord.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Response: "invalid",
}, nil).Once()
ctx := context.Background()
proxy := &Proxy{
ctx: ctx,
dataCoord: datacoord,
}
err := proxy.reportCollectionStorage()
assert.Error(t, err)
})
t.Run("empty metric", func(t *testing.T) {
datacoord := mocks.NewMockDataCoordClient(t)
r, _ := json.Marshal(&metricsinfo.DataCoordTopology{
Cluster: metricsinfo.DataClusterTopology{
Self: metricsinfo.DataCoordInfos{},
},
})
datacoord.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Response: string(r),
ComponentName: "DataCoord",
}, nil).Once()
ctx := context.Background()
proxy := &Proxy{
ctx: ctx,
dataCoord: datacoord,
}
err := proxy.reportCollectionStorage()
assert.Error(t, err)
})
t.Run("fail to get cache", func(t *testing.T) {
origin := globalMetaCache
defer func() {
globalMetaCache = origin
}()
mockCache := NewMockCache(t)
globalMetaCache = mockCache
datacoord := mocks.NewMockDataCoordClient(t)
r, _ := json.Marshal(&metricsinfo.DataCoordTopology{
Cluster: metricsinfo.DataClusterTopology{
Self: metricsinfo.DataCoordInfos{
QuotaMetrics: &metricsinfo.DataCoordQuotaMetrics{
TotalBinlogSize: 200,
CollectionBinlogSize: map[int64]int64{
1: 100,
2: 50,
3: 50,
},
},
},
},
})
datacoord.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Response: string(r),
ComponentName: "DataCoord",
}, nil).Once()
mockCache.EXPECT().GetCollectionNamesByID(mock.Anything, mock.Anything).Return(nil, nil, errors.New("mock get collection names by id error")).Once()
ctx := context.Background()
proxy := &Proxy{
ctx: ctx,
dataCoord: datacoord,
}
err := proxy.reportCollectionStorage()
assert.Error(t, err)
})
t.Run("not match data", func(t *testing.T) {
origin := globalMetaCache
defer func() {
globalMetaCache = origin
}()
mockCache := NewMockCache(t)
globalMetaCache = mockCache
datacoord := mocks.NewMockDataCoordClient(t)
r, _ := json.Marshal(&metricsinfo.DataCoordTopology{
Cluster: metricsinfo.DataClusterTopology{
Self: metricsinfo.DataCoordInfos{
QuotaMetrics: &metricsinfo.DataCoordQuotaMetrics{
TotalBinlogSize: 200,
CollectionBinlogSize: map[int64]int64{
1: 100,
2: 50,
3: 50,
},
},
},
},
})
datacoord.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Response: string(r),
ComponentName: "DataCoord",
}, nil).Once()
mockCache.EXPECT().GetCollectionNamesByID(mock.Anything, mock.Anything).Return(
[]string{"db1", "db1"}, []string{"col1", "col2"}, nil).Once()
ctx := context.Background()
proxy := &Proxy{
ctx: ctx,
dataCoord: datacoord,
}
err := proxy.reportCollectionStorage()
assert.Error(t, err)
})
t.Run("success", func(t *testing.T) {
origin := globalMetaCache
defer func() {
globalMetaCache = origin
}()
mockCache := NewMockCache(t)
globalMetaCache = mockCache
datacoord := mocks.NewMockDataCoordClient(t)
r, _ := json.Marshal(&metricsinfo.DataCoordTopology{
Cluster: metricsinfo.DataClusterTopology{
Self: metricsinfo.DataCoordInfos{
QuotaMetrics: &metricsinfo.DataCoordQuotaMetrics{
TotalBinlogSize: 200,
CollectionBinlogSize: map[int64]int64{
1: 100,
2: 50,
3: 50,
},
},
},
},
})
datacoord.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Response: string(r),
ComponentName: "DataCoord",
}, nil).Once()
mockCache.EXPECT().GetCollectionNamesByID(mock.Anything, mock.Anything).Return(
[]string{"db1", "db1", "db2"}, []string{"col1", "col2", "col3"}, nil).Once()
originExtension := Extension
defer func() {
Extension = originExtension
}()
hasCheck := false
Extension = CheckExtension{
reportChecker: func(info any) {
infoMap := info.(map[string]any)
storage := infoMap[hookutil.StorageDetailKey].(map[string]any)
log.Info("storage map", zap.Any("storage", storage))
assert.EqualValues(t, 150, storage["db1"])
assert.EqualValues(t, 50, storage["db2"])
hasCheck = true
},
}
ctx := context.Background()
proxy := &Proxy{
ctx: ctx,
dataCoord: datacoord,
}
err := proxy.reportCollectionStorage()
assert.NoError(t, err)
assert.True(t, hasCheck)
})
}
// CheckExtension is a test double that TestProxy_ReportCollectionStorage
// installs as the package-level Extension.
type CheckExtension struct {
// reportChecker is the callback the test uses to assert on a reported payload.
// NOTE(review): presumably invoked by a Report method defined outside this view — confirm.
reportChecker func(info any)
}

View File

@ -21,16 +21,15 @@ package hookutil
var (
// WARN: Please DO NOT modify all constants.
OpTypeKey = "op_type"
DatabaseKey = "database"
UsernameKey = "username"
DataSizeKey = "data_size"
SuccessCntKey = "success_cnt"
FailCntKey = "fail_cnt"
RelatedCntKey = "related_cnt"
StorageDetailKey = "storage_detail"
NodeIDKey = "id"
DimensionKey = "dim"
OpTypeKey = "op_type"
DatabaseKey = "database"
UsernameKey = "username"
DataSizeKey = "data_size"
SuccessCntKey = "success_cnt"
FailCntKey = "fail_cnt"
RelatedCntKey = "related_cnt"
NodeIDKey = "id"
DimensionKey = "dim"
OpTypeInsert = "insert"
OpTypeDelete = "delete"
@ -38,6 +37,5 @@ var (
OpTypeQuery = "query"
OpTypeSearch = "search"
OpTypeHybridSearch = "hybrid_search"
OpTypeStorage = "storage"
OpTypeNodeID = "node_id"
)

View File

@ -90,9 +90,6 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("insert report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeInsert, reportInfo[hookutil.OpTypeKey])
s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
return
@ -179,9 +176,6 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("search report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey])
s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
@ -205,9 +199,6 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("query report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey])
s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
@ -239,9 +230,6 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("delete report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeDelete, reportInfo[hookutil.OpTypeKey])
s.EqualValues(2, reportInfo[hookutil.SuccessCntKey])
s.EqualValues(0, reportInfo[hookutil.RelatedCntKey])

View File

@ -197,9 +197,6 @@ func (s *PartitionKeySuite) TestPartitionKey() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("search report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey])
s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
s.EqualValues(rowNum*3, reportInfo[hookutil.RelatedCntKey])
@ -239,9 +236,6 @@ func (s *PartitionKeySuite) TestPartitionKey() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("search report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey])
s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
@ -272,9 +266,6 @@ func (s *PartitionKeySuite) TestPartitionKey() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("query report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey])
s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
s.EqualValues(3*rowNum, reportInfo[hookutil.RelatedCntKey])
@ -309,9 +300,6 @@ func (s *PartitionKeySuite) TestPartitionKey() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("query report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey])
s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
@ -346,9 +334,6 @@ func (s *PartitionKeySuite) TestPartitionKey() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("delete report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeDelete, reportInfo[hookutil.OpTypeKey])
s.EqualValues(rowNum, reportInfo[hookutil.SuccessCntKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
@ -382,9 +367,6 @@ func (s *PartitionKeySuite) TestPartitionKey() {
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("delete report info", zap.Any("reportInfo", reportInfo))
if reportInfo[hookutil.OpTypeKey] == hookutil.OpTypeStorage {
continue
}
s.Equal(hookutil.OpTypeDelete, reportInfo[hookutil.OpTypeKey])
s.EqualValues(rowNum, reportInfo[hookutil.SuccessCntKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])