enhance: add disk stats within system metrics (#38033)

issue: ##36621

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/38238/head
jaime 2024-12-06 16:32:41 +08:00 committed by GitHub
parent 60dd55f292
commit 8ed019735c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 202 additions and 107 deletions

View File

@ -261,17 +261,28 @@ func (s *Server) getSystemInfoMetrics(
// getDataCoordMetrics composes datacoord infos
func (s *Server) getDataCoordMetrics(ctx context.Context) metricsinfo.DataCoordInfos {
used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err))
}
ioWait, err := hardware.GetIOWait()
if err != nil {
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
}
ret := metricsinfo.DataCoordInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
HardwareInfos: metricsinfo.HardwareMetrics{
IP: s.session.GetAddress(),
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: hardware.GetDiskCount(),
DiskUsage: hardware.GetDiskUsage(),
IP: s.session.GetAddress(),
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: total,
DiskUsage: used,
IOWaitPercentage: ioWait,
},
SystemInfo: metricsinfo.DeployMetrics{},
CreatedTime: paramtable.GetCreateTime().String(),

View File

@ -19,8 +19,11 @@ package datanode
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -65,7 +68,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
}, nil
}
func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMetricsRequest) (string, error) {
func (node *DataNode) getSystemInfoMetrics(ctx context.Context, _ *milvuspb.GetMetricsRequest) (string, error) {
// TODO(dragondriver): add more metrics
usedMem := hardware.GetUsedMemoryCount()
totalMem := hardware.GetMemoryCount()
@ -74,14 +77,26 @@ func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMet
if err != nil {
return "", err
}
used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err))
}
ioWait, err := hardware.GetIOWait()
if err != nil {
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
}
hardwareMetrics := metricsinfo.HardwareMetrics{
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: totalMem,
MemoryUsage: usedMem,
Disk: hardware.GetDiskCount(),
DiskUsage: hardware.GetDiskUsage(),
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: totalMem,
MemoryUsage: usedMem,
Disk: total,
DiskUsage: used,
IOWaitPercentage: ioWait,
}
quotaMetrics.Hms = hardwareMetrics

View File

@ -19,7 +19,10 @@ package indexnode
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -34,17 +37,30 @@ func getSystemInfoMetrics(
node *IndexNode,
) (*milvuspb.GetMetricsResponse, error) {
// TODO(dragondriver): add more metrics
used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err))
}
iowait, err := hardware.GetIOWait()
if err != nil {
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
}
nodeInfos := metricsinfo.IndexNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, paramtable.GetNodeID()),
HardwareInfos: metricsinfo.HardwareMetrics{
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: hardware.GetDiskCount(),
DiskUsage: hardware.GetDiskUsage(),
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: total,
DiskUsage: used,
IOWaitPercentage: iowait,
},
SystemInfo: metricsinfo.DeployMetrics{},
CreatedTime: paramtable.GetCreateTime().String(),

View File

@ -20,8 +20,11 @@ import (
"context"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -82,25 +85,54 @@ func getQuotaMetrics() (*metricsinfo.ProxyQuotaMetrics, error) {
// getProxyMetrics get metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func getProxyMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest, node *Proxy) (*milvuspb.GetMetricsResponse, error) {
totalMem := hardware.GetMemoryCount()
usedMem := hardware.GetUsedMemoryCount()
quotaMetrics, err := getQuotaMetrics()
if err != nil {
return nil, err
}
hardwareMetrics := metricsinfo.HardwareMetrics{
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: totalMem,
MemoryUsage: usedMem,
Disk: hardware.GetDiskCount(),
DiskUsage: hardware.GetDiskUsage(),
proxyMetricInfo := getProxyMetricInfo(ctx, node, quotaMetrics)
proxyRoleName := metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID())
resp, err := metricsinfo.MarshalComponentInfos(proxyMetricInfo)
if err != nil {
return nil, err
}
return &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: resp,
ComponentName: proxyRoleName,
}, nil
}
func getProxyMetricInfo(ctx context.Context, node *Proxy, quotaMetrics *metricsinfo.ProxyQuotaMetrics) *metricsinfo.ProxyInfos {
totalMem := hardware.GetMemoryCount()
usedMem := hardware.GetUsedMemoryCount()
used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err))
}
ioWait, err := hardware.GetIOWait()
if err != nil {
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
}
hardwareMetrics := metricsinfo.HardwareMetrics{
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: totalMem,
MemoryUsage: usedMem,
Disk: total,
DiskUsage: used,
IOWaitPercentage: ioWait,
}
if quotaMetrics != nil {
quotaMetrics.Hms = hardwareMetrics
}
quotaMetrics.Hms = hardwareMetrics
proxyRoleName := metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID())
proxyMetricInfo := metricsinfo.ProxyInfos{
return &metricsinfo.ProxyInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
HasError: false,
Name: proxyRoleName,
@ -117,17 +149,6 @@ func getProxyMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest, n
},
QuotaMetrics: quotaMetrics,
}
resp, err := metricsinfo.MarshalComponentInfos(proxyMetricInfo)
if err != nil {
return nil, err
}
return &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID()),
}, nil
}
// getSystemInfoMetrics returns the system information metrics.
@ -146,34 +167,12 @@ func getSystemInfoMetrics(
proxyRoleName := metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID())
identifierMap[proxyRoleName] = int(node.session.ServerID)
// FIXME:All proxy metrics are retrieved from RootCoord, while single proxy metrics are obtained from the proxy itself.
proxyInfo := getProxyMetricInfo(ctx, node, nil)
proxyTopologyNode := metricsinfo.SystemTopologyNode{
Identifier: int(node.session.ServerID),
Connected: make([]metricsinfo.ConnectionEdge, 0),
Infos: &metricsinfo.ProxyInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
HasError: false,
ErrorReason: "",
Name: proxyRoleName,
HardwareInfos: metricsinfo.HardwareMetrics{
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: hardware.GetDiskCount(),
DiskUsage: hardware.GetDiskUsage(),
},
SystemInfo: metricsinfo.DeployMetrics{},
CreatedTime: paramtable.GetCreateTime().String(),
UpdatedTime: paramtable.GetUpdateTime().String(),
Type: typeutil.ProxyRole,
ID: node.session.ServerID,
},
SystemConfigurations: metricsinfo.ProxyConfiguration{
DefaultPartitionName: Params.CommonCfg.DefaultPartitionName.GetValue(),
DefaultIndexName: Params.CommonCfg.DefaultIndexName.GetValue(),
},
},
Infos: proxyInfo,
}
metricsinfo.FillDeployMetricsWithEnv(&(proxyTopologyNode.Infos.(*metricsinfo.ProxyInfos).SystemInfo))

View File

@ -328,18 +328,29 @@ func (s *Server) getSystemInfoMetrics(
ctx context.Context,
req *milvuspb.GetMetricsRequest,
) (string, error) {
used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err))
}
ioWait, err := hardware.GetIOWait()
if err != nil {
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
}
clusterTopology := metricsinfo.QueryClusterTopology{
Self: metricsinfo.QueryCoordInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, paramtable.GetNodeID()),
HardwareInfos: metricsinfo.HardwareMetrics{
IP: s.session.GetAddress(),
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: hardware.GetDiskCount(),
DiskUsage: hardware.GetDiskUsage(),
IP: s.session.GetAddress(),
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: total,
DiskUsage: used,
IOWaitPercentage: ioWait,
},
SystemInfo: metricsinfo.DeployMetrics{},
CreatedTime: paramtable.GetCreateTime().String(),

View File

@ -230,14 +230,26 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
if err != nil {
return "", err
}
used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err))
}
ioWait, err := hardware.GetIOWait()
if err != nil {
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
}
hardwareInfos := metricsinfo.HardwareMetrics{
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: totalMem,
MemoryUsage: usedMem,
Disk: hardware.GetDiskCount(),
DiskUsage: hardware.GetDiskUsage(),
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: totalMem,
MemoryUsage: usedMem,
Disk: total,
DiskUsage: used,
IOWaitPercentage: ioWait,
}
quotaMetrics.Hms = hardwareInfos

View File

@ -19,7 +19,10 @@ package rootcoord
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -27,18 +30,29 @@ import (
)
func (c *Core) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err))
}
ioWait, err := hardware.GetIOWait()
if err != nil {
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
}
rootCoordTopology := metricsinfo.RootCoordTopology{
Self: metricsinfo.RootCoordInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, c.session.ServerID),
HardwareInfos: metricsinfo.HardwareMetrics{
IP: c.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: hardware.GetDiskCount(),
DiskUsage: hardware.GetDiskUsage(),
IP: c.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: hardware.GetMemoryCount(),
MemoryUsage: hardware.GetUsedMemoryCount(),
Disk: total,
DiskUsage: used,
IOWaitPercentage: ioWait,
},
SystemInfo: metricsinfo.DeployMetrics{},
CreatedTime: paramtable.GetCreateTime().String(),

View File

@ -18,6 +18,7 @@ import (
"sync"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/mem"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
@ -102,16 +103,27 @@ func GetFreeMemoryCount() uint64 {
return GetMemoryCount() - GetUsedMemoryCount()
}
// TODO(dragondriver): not accurate to calculate disk usage when we use distributed storage
// GetDiskCount returns the disk count in bytes.
func GetDiskCount() uint64 {
return 100 * 1024 * 1024
// GetDiskUsage Get Disk Usage in GB
func GetDiskUsage(path string) (float64, float64, error) {
diskStats, err := disk.Usage(path)
if err != nil {
return 0, 0, err
}
usedGB := float64(diskStats.Used) / 1e9
totalGB := float64(diskStats.Total) / 1e9
return usedGB, totalGB, nil
}
// GetDiskUsage returns the disk usage in bytes.
func GetDiskUsage() uint64 {
return 2 * 1024 * 1024
// GetIOWait Get IO Wait Percentage
func GetIOWait() (float64, error) {
cpuTimes, err := cpu.Times(false)
if err != nil {
return 0, err
}
if len(cpuTimes) > 0 {
return cpuTimes[0].Iowait, nil
}
return 0, nil
}
func GetMemoryUseRatio() float64 {

View File

@ -42,14 +42,17 @@ func Test_GetUsedMemoryCount(t *testing.T) {
zap.Uint64("UsedMemoryCount", GetUsedMemoryCount()))
}
func Test_GetDiskCount(t *testing.T) {
log.Info("TestGetDiskCount",
zap.Uint64("DiskCount", GetDiskCount()))
func TestGetDiskUsage(t *testing.T) {
used, total, err := GetDiskUsage("/")
assert.NoError(t, err)
assert.GreaterOrEqual(t, used, 0.0)
assert.GreaterOrEqual(t, total, 0.0)
}
func Test_GetDiskUsage(t *testing.T) {
log.Info("TestGetDiskUsage",
zap.Uint64("DiskUsage", GetDiskUsage()))
func TestGetIOWait(t *testing.T) {
iowait, err := GetIOWait()
assert.NoError(t, err)
assert.GreaterOrEqual(t, iowait, 0.0)
}
func Test_GetMemoryUsageRatio(t *testing.T) {

View File

@ -40,8 +40,10 @@ type HardwareMetrics struct {
MemoryUsage uint64 `json:"memory_usage"`
// how to metric disk & disk usage in distributed storage
Disk uint64 `json:"disk"`
DiskUsage uint64 `json:"disk_usage"`
Disk float64 `json:"disk,string"`
DiskUsage float64 `json:"disk_usage,string"`
IOWaitPercentage float64 `json:"io_wait_percentage,string"` // IO Wait in %
}
const (