Refine Concurrent Load (#20220)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Xiaofan 2022-11-01 19:25:37 +08:00 committed by GitHub
parent 4f6b87311e
commit ded919a7a3
3 changed files with 86 additions and 110 deletions


@@ -484,45 +484,43 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegment
node.taskLock.Unlock(strconv.FormatInt(id, 10))
}
}()
future := node.taskPool.Submit(func() (interface{}, error) {
log.Info("loadSegmentsTask start ", zap.Int64("collectionID", in.CollectionID),
zap.Int64s("segmentIDs", segmentIDs),
zap.Duration("timeInQueue", time.Since(startTs)))
err := task.PreExecute(ctx)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Warn("failed to load segments on preExecute ", zap.Error(err))
return status, nil
}
err = task.Execute(ctx)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Warn("failed to load segment", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Error(err))
return status, nil
}
err = task.PostExecute(ctx)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Warn("failed to load segments on postExecute ", zap.Error(err))
return status, nil
// TODO remove concurrent segment load for now, until we solve the memory issue
log.Info("loadSegmentsTask start ", zap.Int64("collectionID", in.CollectionID),
zap.Int64s("segmentIDs", segmentIDs),
zap.Duration("timeInQueue", time.Since(startTs)))
err := task.PreExecute(ctx)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Info("loadSegmentsTask done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
})
ret, _ := future.Await()
return ret.(*commonpb.Status), nil
log.Warn("failed to load segments on preExecute ", zap.Error(err))
return status, nil
}
err = task.Execute(ctx)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Warn("failed to load segment", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Error(err))
return status, nil
}
err = task.PostExecute(ctx)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Warn("failed to load segments on postExecute ", zap.Error(err))
return status, nil
}
log.Info("loadSegmentsTask done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
// ReleaseCollection clears all data related to this collection on the querynode
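For context on the hunk above: the taskPool.Submit wrapper and the future.Await call are dropped, so the load task's PreExecute/Execute/PostExecute phases now run synchronously inside the gRPC handler, as the new TODO notes, until the memory pressure from concurrent loads is addressed. Below is a minimal, self-contained sketch of that inline phase pipeline; the loadTask type and runInline helper are illustrative only, not the Milvus API.

package main

import "fmt"

// loadTask is a stand-in for the QueryNode load-segments task; only the
// three phases and their ordering matter for this sketch.
type loadTask struct{ segmentIDs []int64 }

func (t *loadTask) PreExecute() error { return nil }

func (t *loadTask) Execute() error {
	fmt.Println("loading segments", t.segmentIDs)
	return nil
}

func (t *loadTask) PostExecute() error { return nil }

// runInline mirrors the new control flow: each phase runs in the caller's
// goroutine and the first failure is reported immediately, with no pool or future.
func runInline(t *loadTask) error {
	if err := t.PreExecute(); err != nil {
		return fmt.Errorf("preExecute: %w", err)
	}
	if err := t.Execute(); err != nil {
		return fmt.Errorf("execute: %w", err)
	}
	if err := t.PostExecute(); err != nil {
		return fmt.Errorf("postExecute: %w", err)
	}
	return nil
}

func main() {
	if err := runInline(&loadTask{segmentIDs: []int64{1, 2, 3}}); err != nil {
		fmt.Println("load failed:", err)
	}
}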


@@ -28,6 +28,7 @@ import (
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
@@ -52,8 +53,7 @@ import (
)
const (
requestConcurrencyLevelLimit = 8
UsedDiskMemoryRatio = 6
UsedDiskMemoryRatio = 4
)
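Assuming UsedDiskMemoryRatio scales the on-disk (DiskANN) index size into the memory/disk estimate used by checkSegmentSize (an inference from the name, not shown in this hunk), lowering it from 6 to 4 shrinks the budget reserved per disk index: a 1 GB index would now be accounted for as roughly 4 GB instead of 6 GB, letting more segments pass the load-size check on the same node.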
var (
@@ -69,8 +69,7 @@ type segmentLoader struct {
cm storage.ChunkManager // minio cm
etcdKV *etcdkv.EtcdKV
ioPool *concurrency.Pool
cpuPool *concurrency.Pool
ioPool *concurrency.Pool
// cgoPool for all cgo invocation
cgoPool *concurrency.Pool
@@ -112,10 +111,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context, req *querypb.LoadS
}
return minValue
}
concurrencyLevel := min(loader.cpuPool.Cap(),
len(req.Infos),
requestConcurrencyLevelLimit)
concurrencyLevel := min(runtime.GOMAXPROCS(0), len(req.Infos))
for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
err := loader.checkSegmentSize(req.CollectionID, req.Infos, concurrencyLevel)
if err == nil {
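The starting concurrency is now derived from runtime.GOMAXPROCS(0), capped by the number of segments in the request, and halved until checkSegmentSize accepts that level. The following standalone sketch shows the same backoff; fitsInMemory is a hypothetical predicate standing in for checkSegmentSize.

package main

import (
	"fmt"
	"runtime"
)

// fitsInMemory is a stand-in for checkSegmentSize: it should report whether
// loading `level` segments at once stays within the memory budget.
func fitsInMemory(level int) bool {
	return level <= 2 // pretend only two concurrent loads fit
}

// pickConcurrency starts from GOMAXPROCS (capped by the request size) and
// keeps halving until the memory check passes or level reaches 1.
func pickConcurrency(numSegments int) int {
	level := runtime.GOMAXPROCS(0)
	if numSegments < level {
		level = numSegments
	}
	for ; level > 1; level /= 2 {
		if fitsInMemory(level) {
			break
		}
	}
	return level
}

func main() {
	fmt.Println("chosen concurrency:", pickConcurrency(8))
}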
@@ -334,7 +330,7 @@ func (loader *segmentLoader) loadGrowingSegmentFields(ctx context.Context, segme
_, _, insertData, err := iCodec.Deserialize(blobs)
if err != nil {
log.Warn(err.Error())
log.Warn("failed to deserialize", zap.Int64("segment", segment.segmentID), zap.Error(err))
return err
}
@@ -363,15 +359,15 @@ func (loader *segmentLoader) loadGrowingSegmentFields(ctx context.Context, segme
}
func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segment *Segment, fields []*datapb.FieldBinlog, loadInfo *querypb.SegmentLoadInfo) error {
// Load fields concurrently
futures := make([]*concurrency.Future, 0, len(fields))
runningGroup, groupCtx := errgroup.WithContext(ctx)
for _, field := range fields {
future := loader.loadSealedFieldAsync(ctx, segment, field, loadInfo)
futures = append(futures, future)
fieldBinLog := field
runningGroup.Go(func() error {
// reload data from dml channel
return loader.loadSealedField(groupCtx, segment, fieldBinLog, loadInfo)
})
}
err := concurrency.AwaitAll(futures...)
err := runningGroup.Wait()
if err != nil {
return err
}
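The removed cpuPool futures are replaced here with golang.org/x/sync/errgroup: one goroutine per field binlog, the shared context is cancelled on the first error, and Wait returns that error. A minimal standalone sketch of the same fan-out follows; loadField is a placeholder for loadSealedField.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// loadField is a placeholder: it bails out if the group's context was already
// cancelled by another failed field, otherwise does its work.
func loadField(ctx context.Context, name string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		fmt.Println("loaded field", name)
		return nil
	}
}

func main() {
	fields := []string{"pk", "vector", "timestamp"}
	g, ctx := errgroup.WithContext(context.Background())
	for _, f := range fields {
		f := f // capture the loop variable, as the diff does with fieldBinLog := field
		g.Go(func() error {
			return loadField(ctx, f)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("load failed:", err)
	}
}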
@@ -386,35 +382,34 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen
}
// async load field of sealed segment
func (loader *segmentLoader) loadSealedFieldAsync(ctx context.Context, segment *Segment, field *datapb.FieldBinlog, loadInfo *querypb.SegmentLoadInfo) *concurrency.Future {
func (loader *segmentLoader) loadSealedField(ctx context.Context, segment *Segment, field *datapb.FieldBinlog, loadInfo *querypb.SegmentLoadInfo) error {
iCodec := storage.InsertCodec{}
// Avoid consuming too much memory if no CPU worker ready,
// acquire a CPU worker before load field binlogs
return loader.cpuPool.Submit(func() (interface{}, error) {
futures := loader.loadFieldBinlogsAsync(ctx, field)
futures := loader.loadFieldBinlogsAsync(ctx, field)
blobs := make([]*storage.Blob, len(futures))
for index, future := range futures {
if !future.OK() {
return nil, future.Err()
}
err := concurrency.AwaitAll(futures...)
if err != nil {
return err
}
blob := future.Value().(*storage.Blob)
blobs[index] = blob
}
blobs := make([]*storage.Blob, len(futures))
for index, future := range futures {
blob := future.Value().(*storage.Blob)
blobs[index] = blob
}
insertData := storage.InsertData{
Data: make(map[int64]storage.FieldData),
}
_, _, _, err := iCodec.DeserializeInto(blobs, int(loadInfo.GetNumOfRows()), &insertData)
if err != nil {
log.Warn(err.Error())
return nil, err
}
insertData := storage.InsertData{
Data: make(map[int64]storage.FieldData),
}
_, _, _, err = iCodec.DeserializeInto(blobs, int(loadInfo.GetNumOfRows()), &insertData)
if err != nil {
log.Warn("failed to load sealed field", zap.Int64("SegmentId", segment.segmentID), zap.Error(err))
return err
}
return nil, loader.loadSealedSegments(segment, &insertData)
})
return loader.loadSealedSegments(segment, &insertData)
}
// Load binlogs concurrently into memory from KV storage asynchronously
@@ -466,20 +461,17 @@ func (loader *segmentLoader) loadFieldIndexData(ctx context.Context, segment *Se
futures := make([]*concurrency.Future, 0, len(indexInfo.IndexFilePaths))
indexCodec := storage.NewIndexFileBinlogCodec()
// TODO, remove the load index info from
for _, indexPath := range indexInfo.IndexFilePaths {
// get index params when detecting indexParamPrefix
if path.Base(indexPath) == storage.IndexParamsKey {
indexParamsFuture := loader.ioPool.Submit(func() (interface{}, error) {
log.Info("load index params file", zap.String("path", indexPath))
return loader.cm.Read(ctx, indexPath)
})
indexParamsBlob, err := indexParamsFuture.Await()
log.Info("load index params file", zap.String("path", indexPath))
indexParamsBlob, err := loader.cm.Read(ctx, indexPath)
if err != nil {
return err
}
_, indexParams, _, _, err := indexCodec.Deserialize([]*storage.Blob{{Key: storage.IndexParamsKey, Value: indexParamsBlob.([]byte)}})
_, indexParams, _, _, err := indexCodec.Deserialize([]*storage.Blob{{Key: storage.IndexParamsKey, Value: indexParamsBlob}})
if err != nil {
return err
}
@@ -504,30 +496,22 @@ func (loader *segmentLoader) loadFieldIndexData(ctx context.Context, segment *Se
}
indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
// load on disk index
if indexParams["index_type"] == indexparamcheck.IndexDISKANN {
return segment.segmentLoadIndexData(nil, indexInfo, fieldType)
}
// load in memory index
for _, p := range indexInfo.IndexFilePaths {
indexPath := p
indexFuture := loader.cpuPool.Submit(func() (interface{}, error) {
indexBlobFuture := loader.ioPool.Submit(func() (interface{}, error) {
log.Info("load index file", zap.String("path", indexPath))
data, err := loader.cm.Read(ctx, indexPath)
if err != nil {
log.Warn("failed to load index file", zap.String("path", indexPath), zap.Error(err))
return nil, err
}
return data, nil
})
indexBlob, err := indexBlobFuture.Await()
indexFuture := loader.ioPool.Submit(func() (interface{}, error) {
log.Info("load index file", zap.String("path", indexPath))
data, err := loader.cm.Read(ctx, indexPath)
if err != nil {
log.Warn("failed to load index file", zap.String("path", indexPath), zap.Error(err))
return nil, err
}
data, _, _, _, err := indexCodec.Deserialize([]*storage.Blob{{Key: path.Base(indexPath), Value: indexBlob.([]byte)}})
return data, err
blobs, _, _, _, err := indexCodec.Deserialize([]*storage.Blob{{Key: path.Base(indexPath), Value: data}})
return blobs, err
})
futures = append(futures, indexFuture)
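Index file reads keep going through the shared ioPool so the fan-out against object storage stays bounded, while decode errors still surface through the collected futures. Here is a standalone sketch of the same bounded-read idea using errgroup's SetLimit in place of the Milvus pool; readFile is a placeholder for loader.cm.Read.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// readFile is a placeholder for an object-storage read such as loader.cm.Read.
func readFile(path string) ([]byte, error) {
	return []byte("blob:" + path), nil
}

func main() {
	paths := []string{"index/0", "index/1", "index/2", "index/3"}
	blobs := make([][]byte, len(paths))

	var g errgroup.Group
	g.SetLimit(2) // cap in-flight reads, like a fixed-size IO pool
	for i, p := range paths {
		i, p := i, p // capture loop variables for the goroutine
		g.Go(func() error {
			data, err := readFile(p)
			if err != nil {
				return err
			}
			blobs[i] = data // results land in their original slots
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println("read", len(blobs), "blobs")
}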
@@ -946,18 +930,15 @@ func newSegmentLoader(
pool *concurrency.Pool) *segmentLoader {
cpuNum := runtime.GOMAXPROCS(0)
// This error is not nil only if the options of creating pool is invalid
cpuPool, err := concurrency.NewPool(cpuNum, ants.WithPreAlloc(true))
if err != nil {
log.Error("failed to create goroutine pool for segment loader",
zap.Error(err))
panic(err)
}
ioPoolSize := cpuNum * 2
ioPoolSize := cpuNum * 8
// make sure small machines could load faster
if ioPoolSize < 32 {
ioPoolSize = 32
}
// limit the number of concurrency
if ioPoolSize > 256 {
ioPoolSize = 256
}
ioPool, err := concurrency.NewPool(ioPoolSize, ants.WithPreAlloc(true))
if err != nil {
log.Error("failed to create goroutine pool for segment loader",
@@ -965,9 +946,7 @@ func newSegmentLoader(
panic(err)
}
log.Info("SegmentLoader created",
zap.Int("cpu-pool-size", cpuNum),
zap.Int("io-pool-size", ioPoolSize))
log.Info("SegmentLoader created", zap.Int("io-pool-size", ioPoolSize))
loader := &segmentLoader{
metaReplica: metaReplica,
@@ -977,7 +956,6 @@ func newSegmentLoader(
// init them later
ioPool: ioPool,
cpuPool: cpuPool,
cgoPool: pool,
factory: factory,
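With the cpuPool gone, the only pool newSegmentLoader still sizes is the ioPool: eight goroutines per CPU, clamped to the range [32, 256] so small machines keep enough parallel reads and large machines do not oversubscribe object storage. The small sketch below reproduces that arithmetic (for example 2 CPUs -> 32, 16 CPUs -> 128, 64 CPUs -> 256).

package main

import (
	"fmt"
	"runtime"
)

// ioPoolSize mirrors the sizing rule in newSegmentLoader: cpuNum * 8,
// clamped to a minimum of 32 and a maximum of 256 workers.
func ioPoolSize(cpuNum int) int {
	size := cpuNum * 8
	if size < 32 {
		size = 32
	}
	if size > 256 {
		size = 256
	}
	return size
}

func main() {
	fmt.Println("io pool size:", ioPoolSize(runtime.GOMAXPROCS(0)))
}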