Update memory util in metricinfo (#13320)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/13523/head
bigsheeper 2021-12-16 17:25:13 +08:00 committed by GitHub
parent e226db7b52
commit 5e5787ddd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 68 additions and 59 deletions

View File

@ -26,14 +26,8 @@ import (
) )
func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (*milvuspb.GetMetricsResponse, error) { func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (*milvuspb.GetMetricsResponse, error) {
usedMem, err := getUsedMemory() usedMem := metricsinfo.GetUsedMemoryCount()
if err != nil { totalMem := metricsinfo.GetMemoryCount()
return nil, err
}
totalMem, err := getTotalMemory()
if err != nil {
return nil, err
}
nodeInfos := metricsinfo.QueryNodeInfos{ nodeInfos := metricsinfo.QueryNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{ BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, Params.QueryNodeID), Name: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, Params.QueryNodeID),
@ -86,17 +80,3 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, Params.QueryNodeID), ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, Params.QueryNodeID),
}, nil }, nil
} }
func getUsedMemory() (uint64, error) {
if Params.InContainer {
return metricsinfo.GetContainerMemUsed()
}
return metricsinfo.GetUsedMemoryCount(), nil
}
func getTotalMemory() (uint64, error) {
if Params.InContainer {
return metricsinfo.GetContainerMemLimit()
}
return metricsinfo.GetMemoryCount(), nil
}

View File

@ -26,7 +26,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
) )
@ -45,7 +44,6 @@ type ParamTable struct {
QueryNodeID UniqueID QueryNodeID UniqueID
// TODO: remove cacheSize // TODO: remove cacheSize
CacheSize int64 // deprecated CacheSize int64 // deprecated
InContainer bool
// channel prefix // channel prefix
ClusterChannelPrefix string ClusterChannelPrefix string
@ -118,7 +116,6 @@ func (p *ParamTable) Init() {
p.BaseTable.Init() p.BaseTable.Init()
p.initCacheSize() p.initCacheSize()
p.initInContainer()
p.initMinioEndPoint() p.initMinioEndPoint()
p.initMinioAccessKeyID() p.initMinioAccessKeyID()
@ -178,15 +175,6 @@ func (p *ParamTable) initCacheSize() {
p.CacheSize = value p.CacheSize = value
} }
func (p *ParamTable) initInContainer() {
var err error
p.InContainer, err = metricsinfo.InContainer()
if err != nil {
panic(err)
}
log.Debug("init InContainer", zap.Any("is query node running inside a container? :", p.InContainer))
}
// ---------------------------------------------------------- minio // ---------------------------------------------------------- minio
func (p *ParamTable) initMinioEndPoint() { func (p *ParamTable) initMinioEndPoint() {
url, err := p.Load("_MinioAddress") url, err := p.Load("_MinioAddress")

View File

@ -41,6 +41,7 @@ import (
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
) )
const timeoutForEachRead = 10 * time.Second const timeoutForEachRead = 10 * time.Second
@ -615,13 +616,11 @@ func (loader *segmentLoader) estimateSegmentSize(segment *Segment,
} }
func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentSizes map[UniqueID]int64) error { func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentSizes map[UniqueID]int64) error {
usedMem, err := getUsedMemory() usedMem := metricsinfo.GetUsedMemoryCount()
if err != nil { totalMem := metricsinfo.GetMemoryCount()
return err
} if usedMem == 0 || totalMem == 0 {
totalMem, err := getTotalMemory() return errors.New(fmt.Sprintln("get memory failed when checkSegmentSize, collectionID = ", collectionID))
if err != nil {
return err
} }
segmentTotalSize := int64(0) segmentTotalSize := int64(0)

View File

@ -18,8 +18,8 @@ import (
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
) )
// InContainer checks if the service is running inside a container. // inContainer checks if the service is running inside a container.
func InContainer() (bool, error) { func inContainer() (bool, error) {
paths, err := cgroups.ParseCgroupFile("/proc/1/cgroup") paths, err := cgroups.ParseCgroupFile("/proc/1/cgroup")
if err != nil { if err != nil {
return false, err return false, err
@ -28,8 +28,8 @@ func InContainer() (bool, error) {
return devicePath != "", nil return devicePath != "", nil
} }
// GetContainerMemLimit returns memory limit and error // getContainerMemLimit returns memory limit and error
func GetContainerMemLimit() (uint64, error) { func getContainerMemLimit() (uint64, error) {
control, err := cgroups.Load(cgroups.V1, cgroups.RootPath) control, err := cgroups.Load(cgroups.V1, cgroups.RootPath)
if err != nil { if err != nil {
return 0, err return 0, err
@ -44,8 +44,8 @@ func GetContainerMemLimit() (uint64, error) {
return stats.Memory.Usage.Limit, nil return stats.Memory.Usage.Limit, nil
} }
// GetContainerMemUsed returns memory usage and error // getContainerMemUsed returns memory usage and error
func GetContainerMemUsed() (uint64, error) { func getContainerMemUsed() (uint64, error) {
control, err := cgroups.Load(cgroups.V1, cgroups.RootPath) control, err := cgroups.Load(cgroups.V1, cgroups.RootPath)
if err != nil { if err != nil {
return 0, err return 0, err

View File

@ -18,19 +18,19 @@ import (
) )
func TestInContainer(t *testing.T) { func TestInContainer(t *testing.T) {
_, err := InContainer() _, err := inContainer()
assert.NoError(t, err) assert.NoError(t, err)
} }
func TestGetContainerMemLimit(t *testing.T) { func TestGetContainerMemLimit(t *testing.T) {
limit, err := GetContainerMemLimit() limit, err := getContainerMemLimit()
assert.NoError(t, err) assert.NoError(t, err)
assert.True(t, limit > 0) assert.True(t, limit > 0)
t.Log("limit memory:", limit) t.Log("limit memory:", limit)
} }
func TestGetContainerMemUsed(t *testing.T) { func TestGetContainerMemUsed(t *testing.T) {
used, err := GetContainerMemUsed() used, err := getContainerMemUsed()
assert.NoError(t, err) assert.NoError(t, err)
assert.True(t, used > 0) assert.True(t, used > 0)
t.Log("used memory:", used) t.Log("used memory:", used)

View File

@ -15,18 +15,18 @@ import (
"errors" "errors"
) )
// IfServiceInContainer checks if the service is running inside a container // inContainer checks if the service is running inside a container
// It should be always false while under windows. // It should be always false while under windows.
func InContainer() (bool, error) { func inContainer() (bool, error) {
return false, nil return false, nil
} }
// GetContainerMemLimit returns memory limit and error // getContainerMemLimit returns memory limit and error
func GetContainerMemLimit() (uint64, error) { func getContainerMemLimit() (uint64, error) {
return 0, errors.New("Not supported") return 0, errors.New("Not supported")
} }
// GetContainerMemUsed returns memory usage and error // getContainerMemUsed returns memory usage and error
func GetContainerMemUsed() (uint64, error) { func getContainerMemUsed() (uint64, error) {
return 0, errors.New("Not supported") return 0, errors.New("Not supported")
} }

View File

@ -12,10 +12,19 @@
package metricsinfo package metricsinfo
import ( import (
"github.com/milvus-io/milvus/internal/log" "sync"
"github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/mem"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
)
var (
icOnce sync.Once
ic bool
icErr error
) )
// GetCPUCoreCount returns the count of cpu core. // GetCPUCoreCount returns the count of cpu core.
@ -50,18 +59,51 @@ func GetCPUUsage() float64 {
// GetMemoryCount returns the memory count in bytes. // GetMemoryCount returns the memory count in bytes.
func GetMemoryCount() uint64 { func GetMemoryCount() uint64 {
icOnce.Do(func() {
ic, icErr = inContainer()
})
if icErr != nil {
log.Error(icErr.Error())
return 0
}
if ic {
// in container, calculate by `cgroups`
limit, err := getContainerMemLimit()
if err != nil {
log.Error(err.Error())
return 0
}
return limit
}
// not in container, calculate by `gopsutil`
stats, err := mem.VirtualMemory() stats, err := mem.VirtualMemory()
if err != nil { if err != nil {
log.Warn("failed to get memory count", log.Warn("failed to get memory count",
zap.Error(err)) zap.Error(err))
return 0 return 0
} }
return stats.Total return stats.Total
} }
// GetUsedMemoryCount returns the memory usage in bytes. // GetUsedMemoryCount returns the memory usage in bytes.
func GetUsedMemoryCount() uint64 { func GetUsedMemoryCount() uint64 {
icOnce.Do(func() {
ic, icErr = inContainer()
})
if icErr != nil {
log.Error(icErr.Error())
return 0
}
if ic {
// in container, calculate by `cgroups`
used, err := getContainerMemUsed()
if err != nil {
log.Error(err.Error())
return 0
}
return used
}
// not in container, calculate by `gopsutil`
stats, err := mem.VirtualMemory() stats, err := mem.VirtualMemory()
if err != nil { if err != nil {
log.Warn("failed to get memory usage count", log.Warn("failed to get memory usage count",