From cf1d7732597e6b85d810b93b5ae4d3ccc975ab9c Mon Sep 17 00:00:00 2001 From: dragondriver Date: Wed, 15 Sep 2021 10:13:49 +0800 Subject: [PATCH] Add test cases for metrics info of proxy (#7927) Signed-off-by: dragondriver --- internal/proxy/data_coord_mock_test.go | 176 +++++++++++++++ internal/proxy/impl.go | 2 +- internal/proxy/index_coord_mock_test.go | 169 ++++++++++++++ internal/proxy/metrics_info.go | 5 +- internal/proxy/metrics_info_test.go | 280 ++++++++++++++++++++++++ internal/proxy/query_coord_mock_test.go | 15 +- internal/proxy/rootcoord_mock_test.go | 70 ++---- 7 files changed, 657 insertions(+), 60 deletions(-) create mode 100644 internal/proxy/data_coord_mock_test.go create mode 100644 internal/proxy/index_coord_mock_test.go create mode 100644 internal/proxy/metrics_info_test.go diff --git a/internal/proxy/data_coord_mock_test.go b/internal/proxy/data_coord_mock_test.go new file mode 100644 index 0000000000..f094cd567c --- /dev/null +++ b/internal/proxy/data_coord_mock_test.go @@ -0,0 +1,176 @@ +package proxy + +import ( + "context" + "sync/atomic" + + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/uniquegenerator" + + "github.com/milvus-io/milvus/internal/proto/datapb" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +type DataCoordMock struct { + nodeID typeutil.UniqueID + address string + + state atomic.Value // internal.StateCode + + getMetricsFunc getMetricsFuncType + + statisticsChannel string + timeTickChannel string +} + +func (coord *DataCoordMock) updateState(state internalpb.StateCode) { + coord.state.Store(state) +} + +func (coord *DataCoordMock) getState() internalpb.StateCode { + return coord.state.Load().(internalpb.StateCode) +} + +func (coord *DataCoordMock) healthy() bool { + return coord.getState() == internalpb.StateCode_Healthy +} + +func (coord *DataCoordMock) Init() error { + coord.updateState(internalpb.StateCode_Initializing) + return nil +} + +func (coord *DataCoordMock) Start() error { + defer coord.updateState(internalpb.StateCode_Healthy) + + return nil +} + +func (coord *DataCoordMock) Stop() error { + defer coord.updateState(internalpb.StateCode_Abnormal) + + return nil +} + +func (coord *DataCoordMock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{ + NodeID: coord.nodeID, + Role: typeutil.DataCoordRole, + StateCode: coord.getState(), + ExtraInfo: nil, + }, + SubcomponentStates: nil, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + }, nil +} + +func (coord *DataCoordMock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Value: coord.statisticsChannel, + }, nil +} + +func (coord *DataCoordMock) Register() error { + return nil +} + +func (coord *DataCoordMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Value: coord.timeTickChannel, + }, nil +} + +func (coord *DataCoordMock) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { + panic("implement me") +} + +func (coord *DataCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + if !coord.healthy() { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "unhealthy", + }, + }, nil + } + + if coord.getMetricsFunc != nil { + return coord.getMetricsFunc(ctx, req) + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "not implemented", + }, + Response: "", + ComponentName: "", + }, nil +} + +func NewDataCoordMock() *DataCoordMock { + return &DataCoordMock{ + nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + address: funcutil.GenRandomStr(), // random address + state: atomic.Value{}, + getMetricsFunc: nil, + statisticsChannel: funcutil.GenRandomStr(), + timeTickChannel: funcutil.GenRandomStr(), + } +} diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index bee7e618a0..a812667060 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2210,7 +2210,7 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque log.Debug("failed to get system info metrics from cache, recompute instead", zap.Error(err)) - metrics, err := getSystemInfoMetrics(ctx, req, node) + metrics, err := getSystemInfoMetrics(ctx, req, node, node.session.Address) log.Debug("Proxy.GetMetrics", zap.Int64("node_id", Params.ProxyID), diff --git a/internal/proxy/index_coord_mock_test.go b/internal/proxy/index_coord_mock_test.go new file mode 100644 index 0000000000..21d45d0b14 --- /dev/null +++ b/internal/proxy/index_coord_mock_test.go @@ -0,0 +1,169 @@ +package proxy + +import ( + "context" + "sync/atomic" + + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/uniquegenerator" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +type IndexCoordMock struct { + nodeID typeutil.UniqueID + address string + + state atomic.Value // internal.StateCode + + getMetricsFunc getMetricsFuncType + + statisticsChannel string + timeTickChannel string + + minioBucketName string +} + +func (coord *IndexCoordMock) updateState(state internalpb.StateCode) { + coord.state.Store(state) +} + +func (coord *IndexCoordMock) getState() internalpb.StateCode { + return coord.state.Load().(internalpb.StateCode) +} + +func (coord *IndexCoordMock) healthy() bool { + return coord.getState() == internalpb.StateCode_Healthy +} + +func (coord *IndexCoordMock) Init() error { + coord.updateState(internalpb.StateCode_Initializing) + return nil +} + +func (coord *IndexCoordMock) Start() error { + defer coord.updateState(internalpb.StateCode_Healthy) + + return nil +} + +func (coord *IndexCoordMock) Stop() error { + defer coord.updateState(internalpb.StateCode_Abnormal) + + return nil +} + +func (coord *IndexCoordMock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{ + NodeID: coord.nodeID, + Role: typeutil.IndexCoordRole, + StateCode: coord.getState(), + ExtraInfo: nil, + }, + SubcomponentStates: nil, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + }, nil +} + +func (coord *IndexCoordMock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Value: coord.statisticsChannel, + }, nil +} + +func (coord *IndexCoordMock) Register() error { + return nil +} + +func (coord *IndexCoordMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Value: coord.timeTickChannel, + }, nil +} + +func (coord *IndexCoordMock) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { + return &indexpb.BuildIndexResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + IndexBuildID: 0, + }, nil +} + +func (coord *IndexCoordMock) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, nil +} + +func (coord *IndexCoordMock) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) { + return &indexpb.GetIndexStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + States: nil, + }, nil +} + +func (coord *IndexCoordMock) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) { + return &indexpb.GetIndexFilePathsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + FilePaths: nil, + }, nil +} + +func (coord *IndexCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + if !coord.healthy() { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "unhealthy", + }, + }, nil + } + + if coord.getMetricsFunc != nil { + return coord.getMetricsFunc(ctx, req) + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "not implemented", + }, + Response: "", + ComponentName: "", + }, nil +} + +func NewIndexCoordMock() *IndexCoordMock { + return &IndexCoordMock{ + nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + address: funcutil.GenRandomStr(), // TODO(dragondriver): random address + statisticsChannel: funcutil.GenRandomStr(), + timeTickChannel: funcutil.GenRandomStr(), + minioBucketName: funcutil.GenRandomStr(), + } +} diff --git a/internal/proxy/metrics_info.go b/internal/proxy/metrics_info.go index fd71cfc9b8..6a4ff38df7 100644 --- a/internal/proxy/metrics_info.go +++ b/internal/proxy/metrics_info.go @@ -26,10 +26,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" ) +type getMetricsFuncType func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) + func getSystemInfoMetrics( ctx context.Context, request *milvuspb.GetMetricsRequest, node *Proxy, + ip string, ) (*milvuspb.GetMetricsResponse, error) { var err error @@ -51,7 +54,7 @@ func getSystemInfoMetrics( ErrorReason: "", Name: proxyRoleName, HardwareInfos: metricsinfo.HardwareMetrics{ - IP: node.session.Address, + IP: ip, CPUCoreCount: metricsinfo.GetCPUCoreCount(false), CPUCoreUsage: metricsinfo.GetCPUUsage(), Memory: metricsinfo.GetMemoryCount(), diff --git a/internal/proxy/metrics_info_test.go b/internal/proxy/metrics_info_test.go new file mode 100644 index 0000000000..87a70bab9e --- /dev/null +++ b/internal/proxy/metrics_info_test.go @@ -0,0 +1,280 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package proxy + +import ( + "context" + "testing" + + "github.com/milvus-io/milvus/internal/util/uniquegenerator" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/typeutil" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/util/metricsinfo" + + "github.com/milvus-io/milvus/internal/util/funcutil" +) + +func TestProxy_metrics(t *testing.T) { + var err error + + ctx := context.Background() + + rc := NewRootCoordMock() + rc.Start() + defer rc.Stop() + + qc := NewQueryCoordMock() + qc.Start() + defer qc.Stop() + + dc := NewDataCoordMock() + dc.Start() + defer dc.Stop() + + ic := NewIndexCoordMock() + ic.Start() + defer ic.Stop() + + proxy := &Proxy{ + rootCoord: rc, + queryCoord: qc, + dataCoord: dc, + indexCoord: ic, + } + + rc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + id := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + + rootCoordTopology := metricsinfo.RootCoordTopology{ + Self: metricsinfo.RootCoordInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id), + HardwareInfos: metricsinfo.HardwareMetrics{}, + SystemInfo: metricsinfo.DeployMetrics{}, + Type: typeutil.RootCoordRole, + }, + SystemConfigurations: metricsinfo.RootCoordConfiguration{}, + }, + Connections: metricsinfo.ConnTopology{ + Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id), + ConnectedComponents: []metricsinfo.ConnectionInfo{ + { + TargetName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id), + TargetType: typeutil.IndexCoordRole, + }, + { + TargetName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id), + TargetType: typeutil.QueryCoordRole, + }, + { + TargetName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id), + TargetType: typeutil.DataCoordRole, + }, + }, + }, + } + + resp, _ := metricsinfo.MarshalTopology(rootCoordTopology) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id), + }, nil + } + + qc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + id := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + + clusterTopology := metricsinfo.QueryClusterTopology{ + Self: metricsinfo.QueryCoordInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id), + HardwareInfos: metricsinfo.HardwareMetrics{}, + SystemInfo: metricsinfo.DeployMetrics{}, + Type: typeutil.QueryCoordRole, + }, + SystemConfigurations: metricsinfo.QueryCoordConfiguration{}, + }, + ConnectedNodes: make([]metricsinfo.QueryNodeInfos, 0), + } + + infos := metricsinfo.QueryNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{}, + SystemConfigurations: metricsinfo.QueryNodeConfiguration{}, + } + clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, infos) + + coordTopology := metricsinfo.QueryCoordTopology{ + Cluster: clusterTopology, + Connections: metricsinfo.ConnTopology{ + Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id), + ConnectedComponents: []metricsinfo.ConnectionInfo{ + { + TargetName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id), + TargetType: typeutil.RootCoordRole, + }, + { + TargetName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id), + TargetType: typeutil.DataCoordRole, + }, + { + TargetName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id), + TargetType: typeutil.IndexCoordRole, + }, + }, + }, + } + + resp, _ := metricsinfo.MarshalTopology(coordTopology) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id), + }, nil + } + + dc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + id := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + + clusterTopology := metricsinfo.DataClusterTopology{ + Self: metricsinfo.DataCoordInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id), + HardwareInfos: metricsinfo.HardwareMetrics{}, + SystemInfo: metricsinfo.DeployMetrics{}, + Type: typeutil.DataCoordRole, + }, + SystemConfigurations: metricsinfo.DataCoordConfiguration{}, + }, + ConnectedNodes: make([]metricsinfo.DataNodeInfos, 0), + } + + infos := metricsinfo.DataNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{}, + SystemConfigurations: metricsinfo.DataNodeConfiguration{}, + } + clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, infos) + + coordTopology := metricsinfo.DataCoordTopology{ + Cluster: clusterTopology, + Connections: metricsinfo.ConnTopology{ + Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id), + ConnectedComponents: []metricsinfo.ConnectionInfo{ + { + TargetName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id), + TargetType: typeutil.RootCoordRole, + }, + { + TargetName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id), + TargetType: typeutil.QueryCoordRole, + }, + { + TargetName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id), + TargetType: typeutil.IndexCoordRole, + }, + }, + }, + } + + resp, _ := metricsinfo.MarshalTopology(coordTopology) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id), + }, nil + + } + + ic.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + id := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + + clusterTopology := metricsinfo.IndexClusterTopology{ + Self: metricsinfo.IndexCoordInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id), + HardwareInfos: metricsinfo.HardwareMetrics{}, + SystemInfo: metricsinfo.DeployMetrics{}, + Type: typeutil.IndexCoordRole, + }, + SystemConfigurations: metricsinfo.IndexCoordConfiguration{}, + }, + ConnectedNodes: make([]metricsinfo.IndexNodeInfos, 0), + } + + infos := metricsinfo.IndexNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{}, + SystemConfigurations: metricsinfo.IndexNodeConfiguration{}, + } + clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, infos) + + coordTopology := metricsinfo.IndexCoordTopology{ + Cluster: clusterTopology, + Connections: metricsinfo.ConnTopology{ + Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id), + ConnectedComponents: []metricsinfo.ConnectionInfo{ + { + TargetName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id), + TargetType: typeutil.RootCoordRole, + }, + { + TargetName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id), + TargetType: typeutil.QueryCoordRole, + }, + { + TargetName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id), + TargetType: typeutil.DataCoordRole, + }, + }, + }, + } + + resp, _ := metricsinfo.MarshalTopology(coordTopology) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id), + }, nil + + } + + ip := funcutil.GenRandomStr() + req, _ := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + resp, err := getSystemInfoMetrics(ctx, req, proxy, ip) + assert.NoError(t, err) + assert.NotNil(t, resp) + + rc.getMetricsFunc = nil + qc.getMetricsFunc = nil + dc.getMetricsFunc = nil + ic.getMetricsFunc = nil +} diff --git a/internal/proxy/query_coord_mock_test.go b/internal/proxy/query_coord_mock_test.go index e080953114..ed84b36f25 100644 --- a/internal/proxy/query_coord_mock_test.go +++ b/internal/proxy/query_coord_mock_test.go @@ -37,6 +37,7 @@ type QueryCoordMock struct { colMtx sync.RWMutex showCollectionsFunc queryCoordShowCollectionsFuncType + getMetricsFunc getMetricsFuncType statisticsChannel string timeTickChannel string @@ -289,7 +290,19 @@ func (coord *QueryCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMe }, }, nil } - panic("implement me") + + if coord.getMetricsFunc != nil { + return coord.getMetricsFunc(ctx, req) + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "not implemented", + }, + Response: "", + ComponentName: "", + }, nil } func NewQueryCoordMock(opts ...QueryCoordMockOption) *QueryCoordMock { diff --git a/internal/proxy/rootcoord_mock_test.go b/internal/proxy/rootcoord_mock_test.go index ef2373f3f2..586bbd03c9 100644 --- a/internal/proxy/rootcoord_mock_test.go +++ b/internal/proxy/rootcoord_mock_test.go @@ -14,17 +14,12 @@ package proxy import ( "context" "fmt" - "os" "sync" "sync/atomic" "time" "github.com/milvus-io/milvus/internal/common" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/metricsinfo" - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/uniquegenerator" @@ -95,6 +90,7 @@ type RootCoordMock struct { describeCollectionFunc describeCollectionFuncType showPartitionsFunc showPartitionsFuncType + getMetricsFunc getMetricsFuncType // TODO(dragondriver): index-related @@ -114,6 +110,10 @@ func (coord *RootCoordMock) getState() internalpb.StateCode { return coord.state.Load().(internalpb.StateCode) } +func (coord *RootCoordMock) healthy() bool { + return coord.getState() == internalpb.StateCode_Healthy +} + func (coord *RootCoordMock) Init() error { coord.updateState(internalpb.StateCode_Initializing) return nil @@ -805,70 +805,26 @@ func (coord *RootCoordMock) SegmentFlushCompleted(ctx context.Context, in *datap } func (coord *RootCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - code := coord.state.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - + if !coord.healthy() { return &milvuspb.GetMetricsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "failed", + Reason: "unhealthy", }, - Response: "", }, nil } - rootCoordTopology := metricsinfo.RootCoordTopology{ - Self: metricsinfo.RootCoordInfos{ - BaseComponentInfos: metricsinfo.BaseComponentInfos{ - Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID), - HardwareInfos: metricsinfo.HardwareMetrics{ - IP: coord.address, - CPUCoreCount: metricsinfo.GetCPUCoreCount(false), - CPUCoreUsage: metricsinfo.GetCPUUsage(), - Memory: metricsinfo.GetMemoryCount(), - MemoryUsage: metricsinfo.GetUsedMemoryCount(), - Disk: metricsinfo.GetDiskCount(), - DiskUsage: metricsinfo.GetDiskUsage(), - }, - SystemInfo: metricsinfo.DeployMetrics{ - SystemVersion: os.Getenv(metricsinfo.GitCommitEnvKey), - DeployMode: os.Getenv(metricsinfo.DeployModeEnvKey), - }, - // TODO(dragondriver): CreatedTime & UpdatedTime, easy but time-costing - Type: typeutil.RootCoordRole, - }, - SystemConfigurations: metricsinfo.RootCoordConfiguration{ - MinSegmentSizeToEnableIndex: 100, - }, - }, - Connections: metricsinfo.ConnTopology{ - Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID), - // TODO(dragondriver): fill ConnectedComponents if necessary - ConnectedComponents: []metricsinfo.ConnectionInfo{}, - }, - } - resp, err := metricsinfo.MarshalTopology(rootCoordTopology) - if err != nil { - log.Warn("Failed to marshal system info metrics of root coordinator", - zap.Error(err)) - - return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - Response: "", - ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID), - }, nil + if coord.getMetricsFunc != nil { + return coord.getMetricsFunc(ctx, req) } return &milvuspb.GetMetricsResponse{ Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "not implemented", }, - Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID), + Response: "", + ComponentName: "", }, nil }