mirror of https://github.com/milvus-io/milvus.git
Add test cases for metrics info of proxy (#7927)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/7939/head
parent
0b849d3089
commit
cf1d773259
|
@ -0,0 +1,176 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
type DataCoordMock struct {
|
||||
nodeID typeutil.UniqueID
|
||||
address string
|
||||
|
||||
state atomic.Value // internal.StateCode
|
||||
|
||||
getMetricsFunc getMetricsFuncType
|
||||
|
||||
statisticsChannel string
|
||||
timeTickChannel string
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) updateState(state internalpb.StateCode) {
|
||||
coord.state.Store(state)
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) getState() internalpb.StateCode {
|
||||
return coord.state.Load().(internalpb.StateCode)
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) healthy() bool {
|
||||
return coord.getState() == internalpb.StateCode_Healthy
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) Init() error {
|
||||
coord.updateState(internalpb.StateCode_Initializing)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) Start() error {
|
||||
defer coord.updateState(internalpb.StateCode_Healthy)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) Stop() error {
|
||||
defer coord.updateState(internalpb.StateCode_Abnormal)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{
|
||||
NodeID: coord.nodeID,
|
||||
Role: typeutil.DataCoordRole,
|
||||
StateCode: coord.getState(),
|
||||
ExtraInfo: nil,
|
||||
},
|
||||
SubcomponentStates: nil,
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: coord.statisticsChannel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) Register() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: coord.timeTickChannel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (coord *DataCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
if !coord.healthy() {
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "unhealthy",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
if coord.getMetricsFunc != nil {
|
||||
return coord.getMetricsFunc(ctx, req)
|
||||
}
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "not implemented",
|
||||
},
|
||||
Response: "",
|
||||
ComponentName: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewDataCoordMock() *DataCoordMock {
|
||||
return &DataCoordMock{
|
||||
nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
|
||||
address: funcutil.GenRandomStr(), // random address
|
||||
state: atomic.Value{},
|
||||
getMetricsFunc: nil,
|
||||
statisticsChannel: funcutil.GenRandomStr(),
|
||||
timeTickChannel: funcutil.GenRandomStr(),
|
||||
}
|
||||
}
|
|
@ -2210,7 +2210,7 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque
|
|||
log.Debug("failed to get system info metrics from cache, recompute instead",
|
||||
zap.Error(err))
|
||||
|
||||
metrics, err := getSystemInfoMetrics(ctx, req, node)
|
||||
metrics, err := getSystemInfoMetrics(ctx, req, node, node.session.Address)
|
||||
|
||||
log.Debug("Proxy.GetMetrics",
|
||||
zap.Int64("node_id", Params.ProxyID),
|
||||
|
|
|
@ -0,0 +1,169 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
type IndexCoordMock struct {
|
||||
nodeID typeutil.UniqueID
|
||||
address string
|
||||
|
||||
state atomic.Value // internal.StateCode
|
||||
|
||||
getMetricsFunc getMetricsFuncType
|
||||
|
||||
statisticsChannel string
|
||||
timeTickChannel string
|
||||
|
||||
minioBucketName string
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) updateState(state internalpb.StateCode) {
|
||||
coord.state.Store(state)
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) getState() internalpb.StateCode {
|
||||
return coord.state.Load().(internalpb.StateCode)
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) healthy() bool {
|
||||
return coord.getState() == internalpb.StateCode_Healthy
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) Init() error {
|
||||
coord.updateState(internalpb.StateCode_Initializing)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) Start() error {
|
||||
defer coord.updateState(internalpb.StateCode_Healthy)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) Stop() error {
|
||||
defer coord.updateState(internalpb.StateCode_Abnormal)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{
|
||||
NodeID: coord.nodeID,
|
||||
Role: typeutil.IndexCoordRole,
|
||||
StateCode: coord.getState(),
|
||||
ExtraInfo: nil,
|
||||
},
|
||||
SubcomponentStates: nil,
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: coord.statisticsChannel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) Register() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: coord.timeTickChannel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
|
||||
return &indexpb.BuildIndexResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
IndexBuildID: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
|
||||
return &indexpb.GetIndexStatesResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
States: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
|
||||
return &indexpb.GetIndexFilePathsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
FilePaths: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *IndexCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
if !coord.healthy() {
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "unhealthy",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
if coord.getMetricsFunc != nil {
|
||||
return coord.getMetricsFunc(ctx, req)
|
||||
}
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "not implemented",
|
||||
},
|
||||
Response: "",
|
||||
ComponentName: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewIndexCoordMock() *IndexCoordMock {
|
||||
return &IndexCoordMock{
|
||||
nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
|
||||
address: funcutil.GenRandomStr(), // TODO(dragondriver): random address
|
||||
statisticsChannel: funcutil.GenRandomStr(),
|
||||
timeTickChannel: funcutil.GenRandomStr(),
|
||||
minioBucketName: funcutil.GenRandomStr(),
|
||||
}
|
||||
}
|
|
@ -26,10 +26,13 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
)
|
||||
|
||||
type getMetricsFuncType func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
|
||||
|
||||
func getSystemInfoMetrics(
|
||||
ctx context.Context,
|
||||
request *milvuspb.GetMetricsRequest,
|
||||
node *Proxy,
|
||||
ip string,
|
||||
) (*milvuspb.GetMetricsResponse, error) {
|
||||
|
||||
var err error
|
||||
|
@ -51,7 +54,7 @@ func getSystemInfoMetrics(
|
|||
ErrorReason: "",
|
||||
Name: proxyRoleName,
|
||||
HardwareInfos: metricsinfo.HardwareMetrics{
|
||||
IP: node.session.Address,
|
||||
IP: ip,
|
||||
CPUCoreCount: metricsinfo.GetCPUCoreCount(false),
|
||||
CPUCoreUsage: metricsinfo.GetCPUUsage(),
|
||||
Memory: metricsinfo.GetMemoryCount(),
|
||||
|
|
|
@ -0,0 +1,280 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
)
|
||||
|
||||
func TestProxy_metrics(t *testing.T) {
|
||||
var err error
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
rc := NewRootCoordMock()
|
||||
rc.Start()
|
||||
defer rc.Stop()
|
||||
|
||||
qc := NewQueryCoordMock()
|
||||
qc.Start()
|
||||
defer qc.Stop()
|
||||
|
||||
dc := NewDataCoordMock()
|
||||
dc.Start()
|
||||
defer dc.Stop()
|
||||
|
||||
ic := NewIndexCoordMock()
|
||||
ic.Start()
|
||||
defer ic.Stop()
|
||||
|
||||
proxy := &Proxy{
|
||||
rootCoord: rc,
|
||||
queryCoord: qc,
|
||||
dataCoord: dc,
|
||||
indexCoord: ic,
|
||||
}
|
||||
|
||||
rc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
id := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
|
||||
|
||||
rootCoordTopology := metricsinfo.RootCoordTopology{
|
||||
Self: metricsinfo.RootCoordInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id),
|
||||
HardwareInfos: metricsinfo.HardwareMetrics{},
|
||||
SystemInfo: metricsinfo.DeployMetrics{},
|
||||
Type: typeutil.RootCoordRole,
|
||||
},
|
||||
SystemConfigurations: metricsinfo.RootCoordConfiguration{},
|
||||
},
|
||||
Connections: metricsinfo.ConnTopology{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id),
|
||||
ConnectedComponents: []metricsinfo.ConnectionInfo{
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id),
|
||||
TargetType: typeutil.IndexCoordRole,
|
||||
},
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id),
|
||||
TargetType: typeutil.QueryCoordRole,
|
||||
},
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id),
|
||||
TargetType: typeutil.DataCoordRole,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
resp, _ := metricsinfo.MarshalTopology(rootCoordTopology)
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Response: resp,
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id),
|
||||
}, nil
|
||||
}
|
||||
|
||||
qc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
id := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
|
||||
|
||||
clusterTopology := metricsinfo.QueryClusterTopology{
|
||||
Self: metricsinfo.QueryCoordInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id),
|
||||
HardwareInfos: metricsinfo.HardwareMetrics{},
|
||||
SystemInfo: metricsinfo.DeployMetrics{},
|
||||
Type: typeutil.QueryCoordRole,
|
||||
},
|
||||
SystemConfigurations: metricsinfo.QueryCoordConfiguration{},
|
||||
},
|
||||
ConnectedNodes: make([]metricsinfo.QueryNodeInfos, 0),
|
||||
}
|
||||
|
||||
infos := metricsinfo.QueryNodeInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{},
|
||||
SystemConfigurations: metricsinfo.QueryNodeConfiguration{},
|
||||
}
|
||||
clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, infos)
|
||||
|
||||
coordTopology := metricsinfo.QueryCoordTopology{
|
||||
Cluster: clusterTopology,
|
||||
Connections: metricsinfo.ConnTopology{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id),
|
||||
ConnectedComponents: []metricsinfo.ConnectionInfo{
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id),
|
||||
TargetType: typeutil.RootCoordRole,
|
||||
},
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id),
|
||||
TargetType: typeutil.DataCoordRole,
|
||||
},
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id),
|
||||
TargetType: typeutil.IndexCoordRole,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
resp, _ := metricsinfo.MarshalTopology(coordTopology)
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Response: resp,
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id),
|
||||
}, nil
|
||||
}
|
||||
|
||||
dc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
id := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
|
||||
|
||||
clusterTopology := metricsinfo.DataClusterTopology{
|
||||
Self: metricsinfo.DataCoordInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id),
|
||||
HardwareInfos: metricsinfo.HardwareMetrics{},
|
||||
SystemInfo: metricsinfo.DeployMetrics{},
|
||||
Type: typeutil.DataCoordRole,
|
||||
},
|
||||
SystemConfigurations: metricsinfo.DataCoordConfiguration{},
|
||||
},
|
||||
ConnectedNodes: make([]metricsinfo.DataNodeInfos, 0),
|
||||
}
|
||||
|
||||
infos := metricsinfo.DataNodeInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{},
|
||||
SystemConfigurations: metricsinfo.DataNodeConfiguration{},
|
||||
}
|
||||
clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, infos)
|
||||
|
||||
coordTopology := metricsinfo.DataCoordTopology{
|
||||
Cluster: clusterTopology,
|
||||
Connections: metricsinfo.ConnTopology{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id),
|
||||
ConnectedComponents: []metricsinfo.ConnectionInfo{
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id),
|
||||
TargetType: typeutil.RootCoordRole,
|
||||
},
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id),
|
||||
TargetType: typeutil.QueryCoordRole,
|
||||
},
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id),
|
||||
TargetType: typeutil.IndexCoordRole,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
resp, _ := metricsinfo.MarshalTopology(coordTopology)
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Response: resp,
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id),
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
ic.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
id := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
|
||||
|
||||
clusterTopology := metricsinfo.IndexClusterTopology{
|
||||
Self: metricsinfo.IndexCoordInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id),
|
||||
HardwareInfos: metricsinfo.HardwareMetrics{},
|
||||
SystemInfo: metricsinfo.DeployMetrics{},
|
||||
Type: typeutil.IndexCoordRole,
|
||||
},
|
||||
SystemConfigurations: metricsinfo.IndexCoordConfiguration{},
|
||||
},
|
||||
ConnectedNodes: make([]metricsinfo.IndexNodeInfos, 0),
|
||||
}
|
||||
|
||||
infos := metricsinfo.IndexNodeInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{},
|
||||
SystemConfigurations: metricsinfo.IndexNodeConfiguration{},
|
||||
}
|
||||
clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, infos)
|
||||
|
||||
coordTopology := metricsinfo.IndexCoordTopology{
|
||||
Cluster: clusterTopology,
|
||||
Connections: metricsinfo.ConnTopology{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id),
|
||||
ConnectedComponents: []metricsinfo.ConnectionInfo{
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, id),
|
||||
TargetType: typeutil.RootCoordRole,
|
||||
},
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, id),
|
||||
TargetType: typeutil.QueryCoordRole,
|
||||
},
|
||||
{
|
||||
TargetName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, id),
|
||||
TargetType: typeutil.DataCoordRole,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
resp, _ := metricsinfo.MarshalTopology(coordTopology)
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Response: resp,
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, id),
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
ip := funcutil.GenRandomStr()
|
||||
req, _ := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
|
||||
resp, err := getSystemInfoMetrics(ctx, req, proxy, ip)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, resp)
|
||||
|
||||
rc.getMetricsFunc = nil
|
||||
qc.getMetricsFunc = nil
|
||||
dc.getMetricsFunc = nil
|
||||
ic.getMetricsFunc = nil
|
||||
}
|
|
@ -37,6 +37,7 @@ type QueryCoordMock struct {
|
|||
colMtx sync.RWMutex
|
||||
|
||||
showCollectionsFunc queryCoordShowCollectionsFuncType
|
||||
getMetricsFunc getMetricsFuncType
|
||||
|
||||
statisticsChannel string
|
||||
timeTickChannel string
|
||||
|
@ -289,7 +290,19 @@ func (coord *QueryCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMe
|
|||
},
|
||||
}, nil
|
||||
}
|
||||
panic("implement me")
|
||||
|
||||
if coord.getMetricsFunc != nil {
|
||||
return coord.getMetricsFunc(ctx, req)
|
||||
}
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "not implemented",
|
||||
},
|
||||
Response: "",
|
||||
ComponentName: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewQueryCoordMock(opts ...QueryCoordMockOption) *QueryCoordMock {
|
||||
|
|
|
@ -14,17 +14,12 @@ package proxy
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
|
@ -95,6 +90,7 @@ type RootCoordMock struct {
|
|||
|
||||
describeCollectionFunc describeCollectionFuncType
|
||||
showPartitionsFunc showPartitionsFuncType
|
||||
getMetricsFunc getMetricsFuncType
|
||||
|
||||
// TODO(dragondriver): index-related
|
||||
|
||||
|
@ -114,6 +110,10 @@ func (coord *RootCoordMock) getState() internalpb.StateCode {
|
|||
return coord.state.Load().(internalpb.StateCode)
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) healthy() bool {
|
||||
return coord.getState() == internalpb.StateCode_Healthy
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) Init() error {
|
||||
coord.updateState(internalpb.StateCode_Initializing)
|
||||
return nil
|
||||
|
@ -805,70 +805,26 @@ func (coord *RootCoordMock) SegmentFlushCompleted(ctx context.Context, in *datap
|
|||
}
|
||||
|
||||
func (coord *RootCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
code := coord.state.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
if !coord.healthy() {
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "unhealthy",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
if coord.getMetricsFunc != nil {
|
||||
return coord.getMetricsFunc(ctx, req)
|
||||
}
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "failed",
|
||||
Reason: "not implemented",
|
||||
},
|
||||
Response: "",
|
||||
}, nil
|
||||
}
|
||||
rootCoordTopology := metricsinfo.RootCoordTopology{
|
||||
Self: metricsinfo.RootCoordInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID),
|
||||
HardwareInfos: metricsinfo.HardwareMetrics{
|
||||
IP: coord.address,
|
||||
CPUCoreCount: metricsinfo.GetCPUCoreCount(false),
|
||||
CPUCoreUsage: metricsinfo.GetCPUUsage(),
|
||||
Memory: metricsinfo.GetMemoryCount(),
|
||||
MemoryUsage: metricsinfo.GetUsedMemoryCount(),
|
||||
Disk: metricsinfo.GetDiskCount(),
|
||||
DiskUsage: metricsinfo.GetDiskUsage(),
|
||||
},
|
||||
SystemInfo: metricsinfo.DeployMetrics{
|
||||
SystemVersion: os.Getenv(metricsinfo.GitCommitEnvKey),
|
||||
DeployMode: os.Getenv(metricsinfo.DeployModeEnvKey),
|
||||
},
|
||||
// TODO(dragondriver): CreatedTime & UpdatedTime, easy but time-costing
|
||||
Type: typeutil.RootCoordRole,
|
||||
},
|
||||
SystemConfigurations: metricsinfo.RootCoordConfiguration{
|
||||
MinSegmentSizeToEnableIndex: 100,
|
||||
},
|
||||
},
|
||||
Connections: metricsinfo.ConnTopology{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID),
|
||||
// TODO(dragondriver): fill ConnectedComponents if necessary
|
||||
ConnectedComponents: []metricsinfo.ConnectionInfo{},
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := metricsinfo.MarshalTopology(rootCoordTopology)
|
||||
if err != nil {
|
||||
log.Warn("Failed to marshal system info metrics of root coordinator",
|
||||
zap.Error(err))
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
},
|
||||
Response: "",
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Response: resp,
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID),
|
||||
ComponentName: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue