Improve resource usage prediction (#23968)

Signed-off-by: yah01 <yang.cen@zilliz.com>
yah01 2023-05-11 15:33:24 +08:00 committed by GitHub
parent ab95754069
commit 400364483d
8 changed files with 191 additions and 101 deletions


@ -285,18 +285,20 @@ func (sd *shardDelegator) addGrowing(entries ...SegmentEntry) {
func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
log := sd.getLogger(ctx)
segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
log.Info("loading growing segments...", zap.Int64s("segmentIDs", segmentIDs))
loaded, err := sd.loader.Load(ctx, sd.collectionID, segments.SegmentTypeGrowing, version, infos...)
if err != nil {
log.Warn("failed to load growing segment", zap.Error(err))
for _, segment := range loaded {
segments.DeleteSegment(segment.(*segments.LocalSegment))
}
return err
}
segmentIDs = lo.Map(loaded, func(segment segments.Segment, _ int) int64 { return segment.ID() })
log.Info("load growing segments done", zap.Int64s("segmentIDs", segmentIDs))
for _, candidate := range loaded {
sd.pkOracle.Register(candidate, paramtable.GetNodeID())
}
sd.segmentManager.Put(segments.SegmentTypeGrowing, loaded...)
sd.addGrowing(lo.Map(loaded, func(segment segments.Segment, _ int) SegmentEntry {
return SegmentEntry{
NodeID: paramtable.GetNodeID(),


@ -20,9 +20,6 @@ import (
"context"
"fmt"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -46,20 +43,14 @@ func NewLocalWorker(node *QueryNode) *LocalWorker {
func (w *LocalWorker) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
log := log.Ctx(ctx)
log.Info("start to load segments...")
loaded, err := w.node.loader.Load(ctx,
_, err := w.node.loader.Load(ctx,
req.GetCollectionID(),
segments.SegmentTypeSealed,
req.GetVersion(),
req.GetInfos()...,
)
if err != nil {
return err
}
log.Info("save loaded segments...",
zap.Int64s("segments", lo.Map(loaded, func(s segments.Segment, _ int) int64 { return s.ID() })))
w.node.manager.Segment.Put(segments.SegmentTypeSealed, loaded...)
return nil
log.Info("load segments done")
return err
}
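To illustrate the new contract, a minimal caller-side sketch (a hypothetical helper, written as if it sat next to LocalWorker so its existing imports apply): the loader now Puts the loaded segments into the segment manager itself, so callers only propagate the error and can discard the returned slice.

// loadSealed is a hypothetical helper; only Load and its arguments come from this patch.
func loadSealed(ctx context.Context, w *LocalWorker, req *querypb.LoadSegmentsRequest) error {
	// No follow-up manager.Segment.Put here: Load registers the segments on its own now.
	_, err := w.node.loader.Load(ctx,
		req.GetCollectionID(),
		segments.SegmentTypeSealed,
		req.GetVersion(),
		req.GetInfos()...,
	)
	return err
}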
func (w *LocalWorker) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) error {


@ -131,6 +131,7 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) {
if _, ok := targetMap[segment.ID()]; ok {
continue
}
targetMap[segment.ID()] = segment
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),


@ -23,6 +23,7 @@ import (
"runtime"
"runtime/debug"
"strconv"
"sync"
"time"
"github.com/cockroachdb/errors"
@ -48,6 +49,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -62,7 +64,7 @@ var (
type Loader interface {
// Load loads binlogs and spawns segments.
// NOTE: make sure the ref count of the corresponding collection never drops to 0 during this call.
Load(ctx context.Context, collectionID int64, segmentType SegmentType, version int64, infos ...*querypb.SegmentLoadInfo) ([]Segment, error)
Load(ctx context.Context, collectionID int64, segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) ([]Segment, error)
LoadDeltaLogs(ctx context.Context, segment *LocalSegment, deltaLogs []*datapb.FieldBinlog) error
@ -71,7 +73,7 @@ type Loader interface {
}
func NewLoader(
manager CollectionManager,
manager *Manager,
cm storage.ChunkManager,
) *segmentLoader {
cpuNum := runtime.GOMAXPROCS(0)
@ -94,9 +96,10 @@ func NewLoader(
log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
loader := &segmentLoader{
manager: manager,
cm: cm,
ioPool: ioPool,
manager: manager,
cm: cm,
ioPool: ioPool,
loadingSegments: typeutil.NewConcurrentMap[int64, chan struct{}](),
}
return loader
@ -104,38 +107,34 @@ func NewLoader(
// segmentLoader is only responsible for loading the field data from binlog
type segmentLoader struct {
manager CollectionManager
manager *Manager
cm storage.ChunkManager
ioPool *conc.Pool[*storage.Blob]
mut sync.Mutex
// The channel will be closed once the segment is loaded
loadingSegments *typeutil.ConcurrentMap[int64, chan struct{}]
committedMemSize uint64
committedDiskSize uint64
}
var _ Loader = (*segmentLoader)(nil)
func (loader *segmentLoader) getFieldType(segment *LocalSegment, fieldID int64) (schemapb.DataType, error) {
collection := loader.manager.Get(segment.collectionID)
if collection == nil {
return 0, WrapCollectionNotFound(segment.Collection())
}
for _, field := range collection.Schema().GetFields() {
if field.GetFieldID() == fieldID {
return field.GetDataType(), nil
}
}
return 0, WrapFieldNotFound(fieldID)
}
func (loader *segmentLoader) Load(ctx context.Context,
collectionID int64,
segmentType SegmentType,
version int64,
infos ...*querypb.SegmentLoadInfo,
segments ...*querypb.SegmentLoadInfo,
) ([]Segment, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.String("segmentType", segmentType.String()),
)
// Filter out loaded & loading segments
infos := loader.prepare(segmentType, segments...)
defer loader.unregister(infos...)
segmentNum := len(infos)
if segmentNum == 0 {
log.Info("no segment to load")
@ -144,39 +143,26 @@ func (loader *segmentLoader) Load(ctx context.Context,
log.Info("start loading...", zap.Int("segmentNum", segmentNum))
// Check memory limit
var (
concurrencyLevel = funcutil.Min(runtime.GOMAXPROCS(0), len(infos))
err error
)
for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
err = loader.checkSegmentSize(collectionID, infos, concurrencyLevel)
if err == nil {
break
}
}
// Check memory & storage limit
memUsage, diskUsage, concurrencyLevel, err := loader.requestResource(infos...)
if err != nil {
log.Warn("load failed, OOM if loaded", zap.Error(err))
return nil, err
}
defer func() {
loader.mut.Lock()
defer loader.mut.Unlock()
logNum := 0
for _, field := range infos[0].GetBinlogPaths() {
logNum += len(field.GetBinlogs())
}
if logNum > 0 {
// IO pool will be run out even with the new smaller level
concurrencyLevel = funcutil.Min(concurrencyLevel, loader.ioPool.Free()/logNum)
}
loader.committedMemSize -= memUsage
loader.committedDiskSize -= diskUsage
}()
newSegments := make(map[UniqueID]*LocalSegment, len(infos))
newSegments := make(map[int64]*LocalSegment, len(infos))
clearAll := func() {
for _, s := range newSegments {
DeleteSegment(s)
}
debug.FreeOSMemory()
}
loadedSegments := NewConcurrentSet[*LocalSegment]()
for _, info := range infos {
segmentID := info.SegmentID
@ -184,7 +170,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
collectionID := info.CollectionID
shard := info.InsertChannel
collection := loader.manager.Get(collectionID)
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
err := WrapCollectionNotFound(collectionID)
log.Warn("failed to get collection", zap.Error(err))
@ -221,10 +207,15 @@ func (loader *segmentLoader) Load(ctx context.Context,
)
return err
}
loadedSegments.Insert(segment)
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
waitCh, ok := loader.loadingSegments.Get(segmentID)
if !ok {
return errors.New("segment was removed from the loading map early")
}
close(waitCh)
return nil
}
@ -236,24 +227,103 @@ func (loader *segmentLoader) Load(ctx context.Context,
err = funcutil.ProcessFuncParallel(segmentNum,
concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
if err != nil {
clearAll()
log.Warn("failed to load some segments", zap.Error(err))
return nil, err
}
loaded := loadedSegments.Collect()
if len(loaded) != len(newSegments) {
// Free the memory of segments which failed to load
for _, segment := range newSegments {
DeleteSegment(segment)
// Wait for all segments to be loaded
for _, segment := range segments {
if loader.manager.Segment.Get(segment.GetSegmentID()) != nil {
continue
}
debug.FreeOSMemory()
loadedIDs := lo.Map(loaded, func(segment *LocalSegment, _ int) int64 {
return segment.ID()
})
log.Info("partial segments are loaded", zap.Int64s("loadedSegments", loadedIDs))
waitCh, ok := loader.loadingSegments.Get(segment.GetSegmentID())
if !ok {
log.Warn("segment was removed from the loading map early", zap.Int64("segmentID", segment.GetSegmentID()))
return nil, errors.New("segment was removed from the loading map early")
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-waitCh:
}
}
return lo.Map(loaded, func(segment *LocalSegment, _ int) Segment { return segment }), err
loaded := make([]Segment, 0, len(newSegments))
for _, segment := range newSegments {
loaded = append(loaded, segment)
}
loader.manager.Segment.Put(segmentType, loaded...)
return loaded, nil
}
func (loader *segmentLoader) prepare(segmentType SegmentType, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
loader.mut.Lock()
defer loader.mut.Unlock()
// filter out loaded & loading segments
infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
for _, segment := range segments {
// Neither loaded nor loading
if len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) == 0 &&
!loader.loadingSegments.Contain(segment.GetSegmentID()) {
infos = append(infos, segment)
loader.loadingSegments.Insert(segment.GetSegmentID(), make(chan struct{}))
} else {
log.Info("skip loaded/loading segment", zap.Int64("segmentID", segment.GetSegmentID()),
zap.Bool("isLoaded", len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) > 0),
zap.Bool("isLoading", loader.loadingSegments.Contain(segment.GetSegmentID())),
)
}
}
return infos
}
func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
loader.mut.Lock()
defer loader.mut.Unlock()
for i := range segments {
loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
}
}
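To make the prepare/unregister flow concrete, here is a self-contained sketch of the same pattern using plain sync primitives (toy types, not the Milvus ones): the first caller to claim a segment ID creates a channel and loads the segment, later callers find the ID in flight and simply wait for the channel to be closed.

package main

import (
	"fmt"
	"sync"
)

// dedupLoader is a toy stand-in for the loader's loadingSegments map plus the segment manager.
type dedupLoader struct {
	mu       sync.Mutex
	inFlight map[int64]chan struct{} // closed when the segment finishes loading
	loaded   map[int64]bool
}

func (l *dedupLoader) load(id int64, doLoad func()) {
	l.mu.Lock()
	if l.loaded[id] {
		l.mu.Unlock()
		return // already loaded; prepare filters these out
	}
	if ch, ok := l.inFlight[id]; ok {
		l.mu.Unlock()
		<-ch // another goroutine is loading this segment; wait for it to finish
		return
	}
	ch := make(chan struct{})
	l.inFlight[id] = ch // claim the segment, like loadingSegments.Insert in prepare
	l.mu.Unlock()

	doLoad()

	l.mu.Lock()
	l.loaded[id] = true
	delete(l.inFlight, id) // mirrors unregister
	l.mu.Unlock()
	close(ch) // broadcast completion to every waiter
}

func main() {
	l := &dedupLoader{inFlight: map[int64]chan struct{}{}, loaded: map[int64]bool{}}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.load(42, func() { fmt.Println("loading segment 42 exactly once") })
		}()
	}
	wg.Wait()
}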
// requestResource requests memory & storage for loading segments,
// and returns the expected memory usage, disk usage, and the concurrency level to load with.
func (loader *segmentLoader) requestResource(infos ...*querypb.SegmentLoadInfo) (uint64, uint64, int, error) {
loader.mut.Lock()
defer loader.mut.Unlock()
concurrencyLevel := funcutil.Min(runtime.GOMAXPROCS(0), len(infos))
logNum := 0
for _, field := range infos[0].GetBinlogPaths() {
logNum += len(field.GetBinlogs())
}
if logNum > 0 {
// Cap the concurrency level so that the IO pool will not be exhausted
concurrencyLevel = funcutil.Min(concurrencyLevel, funcutil.Max(loader.ioPool.Free()/logNum, 1))
}
for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
_, _, err := loader.checkSegmentSize(infos, concurrencyLevel)
if err == nil {
break
}
}
memUsage, diskUsage, err := loader.checkSegmentSize(infos, concurrencyLevel)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
return 0, 0, 0, err
}
loader.committedMemSize += memUsage
loader.committedDiskSize += diskUsage
return memUsage, diskUsage, concurrencyLevel, nil
}
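The committed counters amount to a reserve/release protocol; a standalone sketch of the same idea with assumed names (not the Milvus implementation), where the returned release function is meant to be deferred just like the defer in Load:

package main

import (
	"fmt"
	"sync"
)

// resourceTracker is a toy stand-in for committedMemSize / committedDiskSize.
type resourceTracker struct {
	mu            sync.Mutex
	committedMem  uint64
	committedDisk uint64
}

// request reserves the predicted usage and returns a release function for the caller to defer.
func (t *resourceTracker) request(mem, disk uint64) (release func()) {
	t.mu.Lock()
	t.committedMem += mem
	t.committedDisk += disk
	t.mu.Unlock()
	return func() {
		t.mu.Lock()
		t.committedMem -= mem
		t.committedDisk -= disk
		t.mu.Unlock()
	}
}

func main() {
	t := &resourceTracker{}
	release := t.request(512<<20, 1<<30) // pretend one load needs 512 MiB of memory and 1 GiB of disk
	defer release()
	fmt.Println("committed MiB:", t.committedMem>>20, "committed disk MiB:", t.committedDisk>>20)
}

Because other Load calls add their own reservations before this one releases, checkSegmentSize sees the process memory plus everything still committed, which is what keeps the prediction valid under concurrent loads.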
func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error) {
@ -270,7 +340,7 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI
return nil, nil
}
collection := loader.manager.Get(collectionID)
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
err := WrapCollectionNotFound(collectionID)
log.Warn("failed to get collection while loading segment", zap.Error(err))
@ -328,7 +398,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
zap.Int64("rowNum", loadInfo.GetNumOfRows()),
zap.String("segmentType", segment.Type().String()))
collection := loader.manager.Get(segment.Collection())
collection := loader.manager.Collection.Get(segment.Collection())
if collection == nil {
err := WrapCollectionNotFound(segment.Collection())
log.Warn("failed to get collection while loading segment", zap.Error(err))
@ -677,7 +747,7 @@ func (loader *segmentLoader) insertIntoSegment(segment *LocalSegment,
Version: msgpb.InsertDataVersion_ColumnBased,
},
}
collection := loader.manager.Get(segment.Collection())
collection := loader.manager.Collection.Get(segment.Collection())
if collection == nil {
err := WrapCollectionNotFound(segment.Collection())
log.Warn("failed to get collection while inserting data into segment", zap.Error(err))
@ -872,15 +942,23 @@ func GetStorageSizeByIndexInfo(indexInfo *querypb.FieldIndexInfo) (uint64, uint6
return uint64(indexInfo.IndexSize), 0, nil
}
func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoadInfos []*querypb.SegmentLoadInfo, concurrency int) error {
usedMem := hardware.GetUsedMemoryCount()
totalMem := hardware.GetMemoryCount()
if len(segmentLoadInfos) < concurrency {
concurrency = len(segmentLoadInfos)
// checkSegmentSize checks whether memory & disk are sufficient to load the segments with the given concurrency;
// it returns the expected memory & disk usage during loading if the segments can be loaded,
// otherwise it returns an error.
func (loader *segmentLoader) checkSegmentSize(segmentLoadInfos []*querypb.SegmentLoadInfo, concurrency int) (uint64, uint64, error) {
if len(segmentLoadInfos) == 0 || concurrency == 0 {
return 0, 0, nil
}
log := log.With(
zap.Int64("collectionID", segmentLoadInfos[0].GetCollectionID()),
)
usedMem := hardware.GetUsedMemoryCount() + loader.committedMemSize
totalMem := hardware.GetMemoryCount()
if usedMem == 0 || totalMem == 0 {
return fmt.Errorf("get memory failed when checkSegmentSize, collectionID = %d", collectionID)
return 0, 0, errors.New("get memory failed when checkSegmentSize")
}
usedMemAfterLoad := usedMem
@ -888,9 +966,10 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
localUsedSize, err := GetLocalUsedSize()
if err != nil {
return fmt.Errorf("get local used size failed, collectionID = %d", collectionID)
return 0, 0, errors.Wrap(err, "get local used size failed")
}
usedLocalSizeAfterLoad := uint64(localUsedSize)
diskUsed := uint64(localUsedSize) + loader.committedDiskSize
usedLocalSizeAfterLoad := diskUsed
for _, loadInfo := range segmentLoadInfos {
oldUsedMem := usedMemAfterLoad
@ -907,10 +986,13 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok {
neededMemSize, neededDiskSize, err := GetStorageSizeByIndexInfo(fieldIndexInfo)
if err != nil {
log.Error(err.Error(), zap.Int64("collectionID", loadInfo.CollectionID),
log.Error("failed to get index size",
zap.Int64("collectionID", loadInfo.CollectionID),
zap.Int64("segmentID", loadInfo.SegmentID),
zap.Int64("indexBuildID", fieldIndexInfo.BuildID))
return err
zap.Int64("indexBuildID", fieldIndexInfo.BuildID),
zap.Error(err),
)
return 0, 0, err
}
usedMemAfterLoad += neededMemSize
usedLocalSizeAfterLoad += neededDiskSize
@ -942,15 +1024,13 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
memLoadingUsage := usedMemAfterLoad + uint64(
float64(maxSegmentSize)*float64(concurrency)*paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat())
log.Info("predict memory and disk usage while loading (in MiB)",
zap.Int64("collectionID", collectionID),
zap.Int("concurrency", concurrency),
zap.Uint64("memUsage", toMB(memLoadingUsage)),
zap.Uint64("memUsageAfterLoad", toMB(usedMemAfterLoad)),
zap.Uint64("diskUsageAfterLoad", toMB(usedLocalSizeAfterLoad)))
if memLoadingUsage > uint64(float64(totalMem)*paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) {
return fmt.Errorf("load segment failed, OOM if load, collectionID = %d, maxSegmentSize = %v MB, concurrency = %d, usedMemAfterLoad = %v MB, totalMem = %v MB, thresholdFactor = %f",
collectionID,
return 0, 0, fmt.Errorf("load segment failed, OOM if load, maxSegmentSize = %v MB, concurrency = %d, usedMemAfterLoad = %v MB, totalMem = %v MB, thresholdFactor = %f",
toMB(maxSegmentSize),
concurrency,
toMB(usedMemAfterLoad),
@ -959,14 +1039,27 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
}
if usedLocalSizeAfterLoad > uint64(float64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())*paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) {
return fmt.Errorf("load segment failed, disk space is not enough, collectionID = %d, usedDiskAfterLoad = %v MB, totalDisk = %v MB, thresholdFactor = %f",
collectionID,
return 0, 0, fmt.Errorf("load segment failed, disk space is not enough, usedDiskAfterLoad = %v MB, totalDisk = %v MB, thresholdFactor = %f",
toMB(usedLocalSizeAfterLoad),
toMB(uint64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())),
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
}
return nil
return memLoadingUsage - usedMem, usedLocalSizeAfterLoad - diskUsed, nil
}
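As a worked example of the prediction above (all numbers are made up; the LoadMemoryUsageFactor and OverloadedMemoryThresholdPercentage values are assumptions, not the defaults):

package main

import "fmt"

func main() {
	const MiB = uint64(1) << 20
	var (
		usedMemAfterLoad = 20_000 * MiB // current usage plus the estimated resident size of the segments
		maxSegmentSize   = 2_000 * MiB  // largest single segment in the batch
		concurrency      = 4            // segments loaded in parallel
		loadFactor       = 2.0          // stand-in for QueryNodeCfg.LoadMemoryUsageFactor
		totalMem         = 64_000 * MiB
		threshold        = 0.9 // stand-in for QueryNodeCfg.OverloadedMemoryThresholdPercentage
	)
	predicted := usedMemAfterLoad + uint64(float64(maxSegmentSize)*float64(concurrency)*loadFactor)
	limit := uint64(float64(totalMem) * threshold)
	// 20000 + 2000*4*2.0 = 36000 MiB predicted against a 57600 MiB limit, so the load is admitted.
	fmt.Printf("predicted=%d MiB, limit=%d MiB, wouldOOM=%v\n", predicted/MiB, limit/MiB, predicted > limit)
}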
func (loader *segmentLoader) getFieldType(segment *LocalSegment, fieldID int64) (schemapb.DataType, error) {
collection := loader.manager.Collection.Get(segment.collectionID)
if collection == nil {
return 0, WrapCollectionNotFound(segment.Collection())
}
for _, field := range collection.Schema().GetFields() {
if field.GetFieldID() == fieldID {
return field.GetDataType(), nil
}
}
return 0, WrapFieldNotFound(fieldID)
}
func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {


@ -36,8 +36,8 @@ type SegmentLoaderSuite struct {
loader Loader
// Dependencies
collectionManager CollectionManager
chunkManager storage.ChunkManager
manager *Manager
chunkManager storage.ChunkManager
// Data
collectionID int64
@ -56,11 +56,11 @@ func (suite *SegmentLoaderSuite) SetupSuite() {
func (suite *SegmentLoaderSuite) SetupTest() {
// Dependencies
suite.collectionManager = NewCollectionManager()
suite.manager = NewManager()
suite.chunkManager = storage.NewLocalChunkManager(storage.RootPath(
fmt.Sprintf("/tmp/milvus-ut/%d", rand.Int63())))
suite.loader = NewLoader(suite.collectionManager, suite.chunkManager)
suite.loader = NewLoader(suite.manager, suite.chunkManager)
// Data
schema := GenTestCollectionSchema("test", schemapb.DataType_Int64)
@ -70,7 +70,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
CollectionID: suite.collectionID,
PartitionIDs: []int64{suite.partitionID},
}
suite.collectionManager.Put(suite.collectionID, schema, indexMeta, loadMeta)
suite.manager.Collection.Put(suite.collectionID, schema, indexMeta, loadMeta)
}
func (suite *SegmentLoaderSuite) TestLoad() {


@ -304,7 +304,7 @@ func (node *QueryNode) Init() error {
node.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
node.subscribingChannels = typeutil.NewConcurrentSet[string]()
node.manager = segments.NewManager()
node.loader = segments.NewLoader(node.manager.Collection, node.vectorStorage)
node.loader = segments.NewLoader(node.manager, node.vectorStorage)
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID())
// init pipeline manager
node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)


@ -466,10 +466,8 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
}, nil
}
log.Info("save loaded segments...",
log.Info("load segments done...",
zap.Int64s("segments", lo.Map(loaded, func(s segments.Segment, _ int) int64 { return s.ID() })))
node.manager.Segment.Put(segments.SegmentTypeSealed, loaded...)
return util.SuccessStatus(), nil
}


@ -74,6 +74,11 @@ func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
return value.(V), true
}
func (m *ConcurrentMap[K, V]) Contain(key K) bool {
_, ok := m.Get(key)
return ok
}
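A small usage sketch for the new Contain helper (the key and value are arbitrary):

package main

import "github.com/milvus-io/milvus/pkg/util/typeutil"

func main() {
	m := typeutil.NewConcurrentMap[int64, string]()
	m.Insert(1001, "loading")
	_ = m.Contain(1001) // true: the key is present
	_ = m.Contain(2002) // false: never inserted; Get would also report ok == false
}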
// GetOrInsert returns the `value` and `loaded` on the given `key`, `value` set.
// If the key already exists, return the value and set `loaded` to true.
// If the key does not exist, insert the given `key` and `value` to map, return the value and set `loaded` to false.