Add streaming segments memory to used memory when checkLoadMemory (#8940)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/8984/head
bigsheeper 2021-09-30 21:54:08 +08:00 committed by GitHub
parent 9303a6414d
commit e7d92e4e79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 157 additions and 103 deletions

View File

@ -13,10 +13,16 @@ package querynode
import (
"context"
"errors"
"fmt"
"os"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -73,3 +79,50 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, Params.QueryNodeID),
}, nil
}
// checkSegmentMemory checks whether loading the given segments would bring
// query-node memory usage above the OOM threshold. Used memory is the sum of
// the historical and streaming replicas' current segment memory (converted to
// MB); the size of each segment to load is estimated from the collection
// schema and the segment's row count. It returns a descriptive error when
// loading would risk OOM, or the underlying error when the collection lookup
// or size estimation fails.
func checkSegmentMemory(segmentLoadInfos []*querypb.SegmentLoadInfo, historicalReplica, streamingReplica ReplicaInterface) error {
	historicalSegmentsMemSize := historicalReplica.getSegmentsMemSize()
	streamingSegmentsMemSize := streamingReplica.getSegmentsMemSize()
	// Bytes -> MB (integer division).
	usedRAMInMB := (historicalSegmentsMemSize + streamingSegmentsMemSize) / 1024 / 1024
	// Params.CacheSize is in GB; convert to MB.
	totalRAMInMB := Params.CacheSize * 1024
	// TODO: get threshold factor from param table
	// Hoisted out of the loop: the threshold does not depend on the segment.
	thresholdMemSize := float64(totalRAMInMB) * 0.7

	segmentTotalSize := int64(0)
	for _, segInfo := range segmentLoadInfos {
		collectionID := segInfo.CollectionID
		segmentID := segInfo.SegmentID

		col, err := historicalReplica.getCollectionByID(collectionID)
		if err != nil {
			return err
		}
		sizePerRecord, err := typeutil.EstimateSizePerRecord(col.schema)
		if err != nil {
			return err
		}
		segmentSize := int64(sizePerRecord) * segInfo.NumOfRows
		// Accumulate the estimated size (MB) of everything requested so far.
		segmentTotalSize += segmentSize / 1024 / 1024

		log.Debug("memory stats when load segment",
			zap.Any("collectionIDs", collectionID),
			zap.Any("segmentID", segmentID),
			zap.Any("numOfRows", segInfo.NumOfRows),
			zap.Any("totalRAM(MB)", totalRAMInMB),
			zap.Any("usedRAM(MB)", usedRAMInMB),
			zap.Any("segmentTotalSize(MB)", segmentTotalSize),
			zap.Any("thresholdMemSize(MB)", thresholdMemSize),
		)
		if usedRAMInMB+segmentTotalSize > int64(thresholdMemSize) {
			// errors.New is kept (rather than fmt.Errorf) so the file-level
			// "errors" import stays in use; Sprintf (not Sprintln) avoids a
			// trailing newline and the ragged ", " spacing in the message.
			return errors.New(fmt.Sprintf(
				"load segment failed, OOM if load, collectionID = %d, usedRAM(MB) = %d, segmentTotalSize(MB) = %d, thresholdMemSize(MB) = %f",
				collectionID, usedRAMInMB, segmentTotalSize, thresholdMemSize))
		}
	}
	return nil
}

View File

@ -19,7 +19,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func TestGetSystemInfoMetrics(t *testing.T) {
@ -38,3 +40,67 @@ func TestGetSystemInfoMetrics(t *testing.T) {
assert.NoError(t, err)
resp.Status.ErrorCode = commonpb.ErrorCode_Success
}
// TestCheckSegmentMemory covers the package-level checkSegmentMemory helper:
// a load that fits, a missing collection, an over-budget (OOM) load, and a
// schema whose per-record size cannot be estimated.
func TestCheckSegmentMemory(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// newLoadInfo builds a minimal load request for the default test segment.
	newLoadInfo := func() *querypb.SegmentLoadInfo {
		return &querypb.SegmentLoadInfo{
			SegmentID:    defaultSegmentID,
			PartitionID:  defaultPartitionID,
			CollectionID: defaultCollectionID,
			NumOfRows:    1,
		}
	}

	t.Run("valid test", func(t *testing.T) {
		node, err := genSimpleQueryNode(ctx)
		assert.NoError(t, err)

		infos := []*querypb.SegmentLoadInfo{newLoadInfo()}
		assert.NoError(t, checkSegmentMemory(infos, node.historical.replica, node.streaming.replica))
	})

	t.Run("test no collection", func(t *testing.T) {
		node, err := genSimpleQueryNode(ctx)
		assert.NoError(t, err)

		// Drop every collection so the lookup inside checkSegmentMemory fails.
		node.historical.replica.freeAll()
		infos := []*querypb.SegmentLoadInfo{newLoadInfo()}
		assert.Error(t, checkSegmentMemory(infos, node.historical.replica, node.streaming.replica))
	})

	t.Run("test OOM", func(t *testing.T) {
		node, err := genSimpleQueryNode(ctx)
		assert.NoError(t, err)

		col, err := node.historical.replica.getCollectionByID(defaultCollectionID)
		assert.NoError(t, err)
		sizePerRecord, err := typeutil.EstimateSizePerRecord(col.schema)
		assert.NoError(t, err)

		// Ask for enough rows to fill the entire configured cache (GB -> bytes).
		info := newLoadInfo()
		totalRAM := Params.CacheSize * 1024 * 1024 * 1024
		info.NumOfRows = totalRAM / int64(sizePerRecord)
		assert.Error(t, checkSegmentMemory([]*querypb.SegmentLoadInfo{info}, node.historical.replica, node.streaming.replica))
	})

	t.Run("test EstimateSizePerRecord failed", func(t *testing.T) {
		node, err := genSimpleQueryNode(ctx)
		assert.NoError(t, err)

		col, err := node.historical.replica.getCollectionByID(defaultCollectionID)
		assert.NoError(t, err)
		// Corrupt the vector dimension so the size estimation fails to parse it.
		for _, kv := range col.schema.Fields[0].TypeParams {
			if kv.Key == "dim" {
				kv.Value = "%&^%&"
			}
		}
		assert.Error(t, checkSegmentMemory([]*querypb.SegmentLoadInfo{newLoadInfo()}, node.historical.replica, node.streaming.replica))
	})
}

View File

@ -137,6 +137,8 @@ func (p *ParamTable) Init() {
}
func (p *ParamTable) initCacheSize() {
defer log.Debug("init cacheSize", zap.Any("cacheSize (GB)", p.CacheSize))
const defaultCacheSize = 32 // GB
p.CacheSize = defaultCacheSize
@ -153,7 +155,6 @@ func (p *ParamTable) initCacheSize() {
return
}
p.CacheSize = value
log.Debug("init cacheSize", zap.Any("cacheSize (GB)", p.CacheSize))
}
// ---------------------------------------------------------- minio

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
@ -71,11 +70,6 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
return nil
}
err := loader.checkSegmentMemory(req.Infos)
if err != nil {
return err
}
newSegments := make([]*Segment, 0)
segmentGC := func() {
for _, s := range newSegments {
@ -199,47 +193,6 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment
return nil
}
// checkSegmentMemory estimates the in-memory size of each segment in
// segmentLoadInfos from the collection schema and row count, and returns an
// error when loading them on top of the historical replica's current segment
// memory would exceed 50% of the configured cache size.
// NOTE(review): this is the deleted side of the diff — superseded by the
// package-level checkSegmentMemory, which also counts streaming segments and
// uses a 0.7 threshold.
func (loader *segmentLoader) checkSegmentMemory(segmentLoadInfos []*querypb.SegmentLoadInfo) error {
	// Params.CacheSize is in GB; convert to MB. Used memory here covers only
	// the historical replica's segments (bytes -> MB integer division).
	totalRAMInMB := Params.CacheSize * 1024.0
	usedRAMInMB := loader.historicalReplica.getSegmentsMemSize() / 1024.0 / 1024.0
	segmentTotalSize := int64(0)
	for _, segInfo := range segmentLoadInfos {
		collectionID := segInfo.CollectionID
		segmentID := segInfo.SegmentID
		col, err := loader.historicalReplica.getCollectionByID(collectionID)
		if err != nil {
			return err
		}
		sizePerRecord, err := typeutil.EstimateSizePerRecord(col.schema)
		if err != nil {
			return err
		}
		// Estimated segment size in MB, accumulated across the request.
		segmentSize := int64(sizePerRecord) * segInfo.NumOfRows
		segmentTotalSize += segmentSize / 1024.0 / 1024.0
		// TODO: get threshold factor from param table
		thresholdMemSize := float64(totalRAMInMB) * 0.5
		log.Debug("memory stats when load segment",
			zap.Any("collectionIDs", collectionID),
			zap.Any("segmentID", segmentID),
			zap.Any("numOfRows", segInfo.NumOfRows),
			zap.Any("totalRAM(MB)", totalRAMInMB),
			zap.Any("usedRAM(MB)", usedRAMInMB),
			zap.Any("segmentTotalSize(MB)", segmentTotalSize),
			zap.Any("thresholdMemSize(MB)", thresholdMemSize),
		)
		if usedRAMInMB+segmentTotalSize > int64(thresholdMemSize) {
			// fmt.Sprintln appends a trailing newline to the error message.
			return errors.New("load segment failed, OOM if load, collectionID = " + fmt.Sprintln(collectionID))
		}
	}
	return nil
}
//func (loader *segmentLoader) GetSegmentStates(segmentID UniqueID) (*datapb.GetSegmentStatesResponse, error) {
// ctx := context.TODO()
// if loader.dataCoord == nil {

View File

@ -23,7 +23,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func TestSegmentLoader_loadSegment(t *testing.T) {
@ -192,60 +191,6 @@ func TestSegmentLoader_notOnService(t *testing.T) {
assert.NoError(t, err)
}
// TestSegmentLoader_CheckSegmentMemory covers segmentLoader.checkSegmentMemory:
// a load that fits in memory, a missing collection, and an over-budget load.
// NOTE(review): deleted side of the diff — replaced by TestCheckSegmentMemory
// for the package-level checkSegmentMemory helper.
func TestSegmentLoader_CheckSegmentMemory(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	collectionID := UniqueID(0)
	segmentID := UniqueID(0)
	// genSegmentLoader builds a loader whose replica holds one test collection
	// with a 128-dim schema.
	genSegmentLoader := func() *segmentLoader {
		replica := newCollectionReplica(nil)
		err := replica.addCollection(collectionID, genTestCollectionSchema(collectionID, false, 128))
		assert.NoError(t, err)
		loader := newSegmentLoader(ctx, nil, nil, replica, nil)
		return loader
	}
	// genSegmentLoadInfo builds a minimal 1000-row load request.
	genSegmentLoadInfo := func() *querypb.SegmentLoadInfo {
		return &querypb.SegmentLoadInfo{
			SegmentID:    segmentID,
			PartitionID:  UniqueID(0),
			CollectionID: collectionID,
			NumOfRows:    1000,
		}
	}
	t.Run("valid test", func(t *testing.T) {
		loader := genSegmentLoader()
		err := loader.checkSegmentMemory([]*querypb.SegmentLoadInfo{genSegmentLoadInfo()})
		assert.NoError(t, err)
	})
	t.Run("test no collection", func(t *testing.T) {
		loader := genSegmentLoader()
		// Drop every collection so the lookup inside the check fails.
		loader.historicalReplica.freeAll()
		err := loader.checkSegmentMemory([]*querypb.SegmentLoadInfo{genSegmentLoadInfo()})
		assert.Error(t, err)
	})
	t.Run("test OOM", func(t *testing.T) {
		// Request enough rows to fill the whole configured cache (GB -> bytes).
		totalRAM := Params.CacheSize * 1024 * 1024 * 1024
		loader := genSegmentLoader()
		col, err := loader.historicalReplica.getCollectionByID(collectionID)
		assert.NoError(t, err)
		sizePerRecord, err := typeutil.EstimateSizePerRecord(col.schema)
		assert.NoError(t, err)
		info := genSegmentLoadInfo()
		info.NumOfRows = totalRAM / int64(sizePerRecord)
		err = loader.checkSegmentMemory([]*querypb.SegmentLoadInfo{info})
		assert.Error(t, err)
	})
}
func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -381,6 +381,12 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
}
}
err = checkSegmentMemory(l.req.Infos, l.node.historical.replica, l.node.streaming.replica)
if err != nil {
log.Warn(err.Error())
return err
}
switch l.req.LoadCondition {
case queryPb.TriggerCondition_handoff:
err = l.node.historical.loader.loadSegmentOfConditionHandOff(l.req)

View File

@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func TestTask_watchDmChannelsTask(t *testing.T) {
@ -284,6 +285,35 @@ func TestTask_loadSegmentsTask(t *testing.T) {
err = task.Execute(ctx)
assert.Error(t, err)
})
t.Run("test OOM", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
totalRAM := Params.CacheSize * 1024 * 1024 * 1024
col, err := node.historical.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
sizePerRecord, err := typeutil.EstimateSizePerRecord(col.schema)
assert.NoError(t, err)
task := loadSegmentsTask{
req: genLoadEmptySegmentsRequest(),
node: node,
}
task.req.Infos = []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
NumOfRows: totalRAM / int64(sizePerRecord),
},
}
task.req.LoadCondition = querypb.TriggerCondition_handoff
err = task.Execute(ctx)
assert.Error(t, err)
})
}
func TestTask_releaseCollectionTask(t *testing.T) {