mirror of https://github.com/milvus-io/milvus.git
Fix Bin log concurrency by adding a pool (#27189)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

branch pull/27235/head
parent 19e4deb792
commit 6635398a6d
@@ -30,9 +30,9 @@ import (
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"go.uber.org/zap"
-	"golang.org/x/sync/errgroup"
 )
 
 var (
@@ -68,41 +68,43 @@ var _ downloader = (*binlogIO)(nil)
 var _ uploader = (*binlogIO)(nil)
 
 func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) {
-	var (
-		err = errStart
-		vs  [][]byte
-	)
-
 	log.Debug("down load", zap.Strings("path", paths))
-	g, gCtx := errgroup.WithContext(ctx)
-	g.Go(func() error {
-		for err != nil {
-			select {
-			case <-gCtx.Done():
-				log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
-				return errDownloadFromBlobStorage
-
-			default:
-				if err != errStart {
-					log.Warn("downloading failed, retry in 50ms", zap.Strings("paths", paths))
-					time.Sleep(50 * time.Millisecond)
-				}
-				vs, err = b.MultiRead(ctx, paths)
-			}
-		}
-		return nil
-	})
-
-	if err := g.Wait(); err != nil {
-		return nil, err
-	}
-
-	rst := make([]*Blob, len(vs))
-	for i := range rst {
-		rst[i] = &Blob{Value: vs[i]}
-	}
-
-	return rst, nil
+	resp := make([]*Blob, len(paths))
+	if len(paths) == 0 {
+		return resp, nil
+	}
+	futures := make([]*conc.Future[any], len(paths))
+	for i, path := range paths {
+		localPath := path
+		future := getMultiReadPool().Submit(func() (any, error) {
+			var vs []byte
+			var err = errStart
+			for err != nil {
+				select {
+				case <-ctx.Done():
+					log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
+					return nil, errDownloadFromBlobStorage
+				default:
+					if err != errStart {
+						time.Sleep(50 * time.Millisecond)
+					}
+					vs, err = b.Read(ctx, localPath)
+				}
+			}
+			return vs, nil
+		})
+		futures[i] = future
+	}
+
+	for i := range futures {
+		if !futures[i].OK() {
+			return nil, futures[i].Err()
+		} else {
+			resp[i] = &Blob{Value: futures[i].Value().([]byte)}
+		}
+	}
+
+	return resp, nil
 }
 
 func (b *binlogIO) uploadSegmentFiles(
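The hunk above replaces the single errgroup goroutine that retried MultiRead over the whole batch with one pooled task per path, so individual object reads proceed concurrently up to the pool's capacity and results come back in input order. A minimal standalone sketch of that fan-out/fan-in shape, using errgroup.SetLimit as a stand-in for the milvus conc pool (readObject and downloadAll are hypothetical names, not part of the patch):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// readObject stands in for ChunkManager.Read; hypothetical helper.
func readObject(ctx context.Context, path string) ([]byte, error) {
	return []byte("data:" + path), nil
}

// downloadAll reads every path concurrently, but never with more than
// `limit` reads in flight, and returns the blobs in input order.
func downloadAll(ctx context.Context, paths []string, limit int) ([][]byte, error) {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(limit) // bounded concurrency, playing the role of the pool in the patch

	out := make([][]byte, len(paths))
	for i, p := range paths {
		i, p := i, p // capture loop variables (pre-Go 1.22 semantics)
		g.Go(func() error {
			v, err := readObject(gctx, p)
			if err != nil {
				return err
			}
			out[i] = v // each task writes its own slot, so no mutex is needed
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	blobs, err := downloadAll(context.Background(), []string{"a", "b", "c"}, 2)
	fmt.Println(len(blobs), err)
}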
@@ -110,24 +112,39 @@ func (b *binlogIO) uploadSegmentFiles(
 	CollectionID UniqueID,
 	segID UniqueID,
 	kvs map[string][]byte) error {
-	var err = errStart
-	for err != nil {
-		select {
-		case <-ctx.Done():
-			log.Warn("ctx done when saving kvs to blob storage",
-				zap.Int64("collectionID", CollectionID),
-				zap.Int64("segmentID", segID),
-				zap.Int("number of kvs", len(kvs)))
-			return errUploadToBlobStorage
-		default:
-			if err != errStart {
-				log.Warn("save binlog failed, retry in 50ms",
-					zap.Int64("collectionID", CollectionID),
-					zap.Int64("segmentID", segID))
-				time.Sleep(50 * time.Millisecond)
-			}
-			err = b.MultiWrite(ctx, kvs)
-		}
-	}
-	return nil
-}
+	log.Debug("update", zap.Int64("collectionID", CollectionID), zap.Int64("segmentID", segID))
+	if len(kvs) == 0 {
+		return nil
+	}
+	futures := make([]*conc.Future[any], 0)
+	for key, val := range kvs {
+		localPath := key
+		localVal := val
+		future := getMultiReadPool().Submit(func() (any, error) {
+			var err = errStart
+			for err != nil {
+				select {
+				case <-ctx.Done():
+					log.Warn("ctx done when saving kvs to blob storage",
+						zap.Int64("collectionID", CollectionID),
+						zap.Int64("segmentID", segID),
+						zap.Int("number of kvs", len(kvs)))
+					return nil, errUploadToBlobStorage
+				default:
+					if err != errStart {
+						time.Sleep(50 * time.Millisecond)
+					}
+					err = b.Write(ctx, localPath, localVal)
+				}
+			}
+			return nil, nil
+		})
+		futures = append(futures, future)
+	}
+
+	err := conc.AwaitAll(futures...)
+	if err != nil {
+		return err
+	}
+	return nil
+}
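Each pooled upload task keeps the same retry shape as the old code: sleep 50ms between attempts and bail out as soon as the context is cancelled, only now the loop wraps a single Write instead of one MultiWrite over the whole map. A self-contained sketch of that per-item retry loop (the errStart sentinel mirrors the patch; writeObject and writeWithRetry are hypothetical helpers):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errStart = errors.New("start") // sentinel meaning "not attempted yet", as in the patch

// writeObject stands in for ChunkManager.Write; hypothetical helper that fails twice.
func writeObject(attempt *int) error {
	*attempt++
	if *attempt < 3 {
		return errors.New("transient storage error")
	}
	return nil
}

// writeWithRetry retries a single write every 50ms until it succeeds or the
// context is cancelled, the same loop shape used inside each pooled task.
func writeWithRetry(ctx context.Context) error {
	attempt := 0
	err := errStart
	for err != nil {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			if err != errStart {
				time.Sleep(50 * time.Millisecond)
			}
			err = writeObject(&attempt)
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(writeWithRetry(ctx)) // <nil> after two failed attempts
}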
@@ -89,7 +89,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 			ctx, cancel := context.WithCancel(test.inctx)
 			cancel()
 
-			_, err := b.download(ctx, nil)
+			_, err := b.download(ctx, []string{"test"})
 			assert.EqualError(t, err, errDownloadFromBlobStorage.Error())
 		}
 	})
@@ -97,7 +97,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 	})
 
 	t.Run("Test download twice", func(t *testing.T) {
-		mkc := &mockCm{errMultiLoad: true}
+		mkc := &mockCm{errRead: true}
 		alloc := allocator.NewMockAllocator(t)
 		b := &binlogIO{mkc, alloc}
 
@@ -145,7 +145,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 	})
 
 	t.Run("upload failed", func(t *testing.T) {
-		mkc := &mockCm{errMultiLoad: true, errMultiSave: true}
+		mkc := &mockCm{errRead: true, errSave: true}
 		alloc := allocator.NewMockAllocator(t)
 		b := binlogIO{mkc, alloc}
 
@@ -360,8 +360,8 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 
 type mockCm struct {
 	storage.ChunkManager
-	errMultiLoad    bool
-	errMultiSave    bool
+	errRead         bool
+	errSave         bool
 	MultiReadReturn [][]byte
 	ReadReturn      []byte
 }
@@ -373,25 +373,24 @@ func (mk *mockCm) RootPath() string {
 }
 
 func (mk *mockCm) Write(ctx context.Context, filePath string, content []byte) error {
 	if mk.errSave {
 		return errors.New("mockKv save error")
 	}
 	return nil
 }
 
 func (mk *mockCm) MultiWrite(ctx context.Context, contents map[string][]byte) error {
-	if mk.errMultiSave {
+	if mk.errSave {
 		return errors.New("mockKv multisave error")
 	}
 	return nil
 }
 
 func (mk *mockCm) Read(ctx context.Context, filePath string) ([]byte, error) {
 	if mk.errRead {
 		return nil, errors.New("mockKv read error")
 	}
 	return mk.ReadReturn, nil
 }
 
 func (mk *mockCm) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error) {
-	if mk.errMultiLoad {
+	if mk.errRead {
 		return nil, errors.New("mockKv multiload error")
 	}
 
 	if mk.MultiReadReturn != nil {
 		return mk.MultiReadReturn, nil
 	}
 
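mockCm embeds storage.ChunkManager and overrides only the calls the tests exercise, so switching the error flags to errRead and errSave is all it takes to drive the new Read/Write code paths. A generic, standalone sketch of that embed-and-override stub pattern, with Storage and fakeStorage as made-up stand-ins rather than milvus types:

package main

import (
	"context"
	"errors"
	"fmt"
)

// Storage is an illustrative stand-in for the storage.ChunkManager interface.
type Storage interface {
	Read(ctx context.Context, path string) ([]byte, error)
	Write(ctx context.Context, path string, content []byte) error
}

// fakeStorage embeds the interface so any method the test never calls simply
// panics if hit, and overrides only what the test cares about - the same trick
// mockCm uses by embedding storage.ChunkManager.
type fakeStorage struct {
	Storage
	errRead bool
	data    []byte
}

func (f *fakeStorage) Read(ctx context.Context, path string) ([]byte, error) {
	if f.errRead {
		return nil, errors.New("fake read error")
	}
	return f.data, nil
}

func main() {
	var s Storage = &fakeStorage{errRead: true}
	_, err := s.Read(context.Background(), "any/path")
	fmt.Println(err) // fake read error
}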
@@ -22,15 +22,9 @@ import (
 	"path"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/cockroachdb/errors"
-	"github.com/milvus-io/milvus/pkg/util/merr"
-	"github.com/milvus-io/milvus/pkg/util/tsoutil"
-	"go.uber.org/zap"
-	"golang.org/x/sync/errgroup"
-
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -40,10 +34,13 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
+	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
+	"go.uber.org/zap"
 )
 
 var (
@@ -627,16 +624,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 	<-ti.Injected()
 	log.Info("compact inject elapse", zap.Duration("elapse", time.Since(injectStart)))
 
-	var (
-		// SegmentID to deltaBlobs
-		dblobs = make(map[UniqueID][]*Blob)
-		dmu    sync.Mutex
-	)
-
+	var dblobs = make(map[UniqueID][]*Blob)
 	allPath := make([][]string, 0)
 
 	downloadStart := time.Now()
-	g, gCtx := errgroup.WithContext(ctxTimeout)
 	for _, s := range t.plan.GetSegmentBinlogs() {
 
 		// Get the number of field binlog files from non-empty segment
@@ -662,27 +653,24 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 		}
 
 		segID := s.GetSegmentID()
+		paths := make([]string, 0)
 		for _, d := range s.GetDeltalogs() {
 			for _, l := range d.GetBinlogs() {
 				path := l.GetLogPath()
-				g.Go(func() error {
-					bs, err := t.download(gCtx, []string{path})
-					if err != nil {
-						log.Warn("compact download deltalogs wrong", zap.String("path", path), zap.Error(err))
-						return err
-					}
-
-					dmu.Lock()
-					dblobs[segID] = append(dblobs[segID], bs...)
-					dmu.Unlock()
-
-					return nil
-				})
+				paths = append(paths, path)
 			}
 		}
+
+		if len(paths) != 0 {
+			bs, err := t.download(ctxTimeout, paths)
+			if err != nil {
+				log.Warn("compact download deltalogs wrong", zap.Int64("segment", segID), zap.Strings("path", paths), zap.Error(err))
+				return nil, err
+			}
+			dblobs[segID] = append(dblobs[segID], bs...)
+		}
 	}
 
-	err = g.Wait()
 	log.Info("compact download deltalogs elapse", zap.Duration("elapse", time.Since(downloadStart)))
 
 	if err != nil {
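Rather than spawning one errgroup goroutine (and taking a mutex) per deltalog file, the compaction path now collects all deltalog paths of a segment and issues a single batched download, which fans out internally through the shared pool. A rough standalone sketch of that batching, with downloadBatch as a hypothetical stand-in for binlogIO.download:

package main

import (
	"context"
	"fmt"
)

// downloadBatch stands in for binlogIO.download, which parallelizes the
// individual reads internally via the shared pool; hypothetical helper.
func downloadBatch(ctx context.Context, paths []string) ([][]byte, error) {
	out := make([][]byte, 0, len(paths))
	for _, p := range paths {
		out = append(out, []byte("blob:"+p))
	}
	return out, nil
}

func main() {
	// Collect every deltalog path of a segment first, then issue one batched
	// download per segment instead of one goroutine per file.
	segmentPaths := map[int64][]string{
		100: {"delta/100/1", "delta/100/2"},
		101: {"delta/101/1"},
	}
	deltaBlobs := make(map[int64][][]byte)
	for segID, paths := range segmentPaths {
		if len(paths) == 0 {
			continue
		}
		blobs, err := downloadBatch(context.Background(), paths)
		if err != nil {
			fmt.Println("download failed for segment", segID, err)
			return
		}
		// Single-threaded append per segment, so no mutex is needed anymore.
		deltaBlobs[segID] = append(deltaBlobs[segID], blobs...)
	}
	fmt.Println(len(deltaBlobs))
}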
@@ -667,7 +667,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		stats := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10)
 
 		ct := &compactionTask{
-			uploader: &binlogIO{&mockCm{errMultiSave: true}, alloc},
+			uploader: &binlogIO{&mockCm{errSave: true}, alloc},
 			done:     make(chan struct{}, 1),
 		}
 
@@ -35,3 +35,17 @@ func getOrCreateStatsPool() *conc.Pool[any] {
 	statsPoolInitOnce.Do(initStatsPool)
 	return statsPool
 }
+
+func initMultiReadPool() {
+	capacity := Params.DataNodeCfg.FileReadConcurrency.GetAsInt()
+	if capacity > runtime.GOMAXPROCS(0) {
+		capacity = runtime.GOMAXPROCS(0)
+	}
+	// error only happens with negative expiry duration or with negative pre-alloc size.
+	ioPool = conc.NewPool[any](capacity)
+}
+
+func getMultiReadPool() *conc.Pool[any] {
+	ioPoolInitOnce.Do(initMultiReadPool)
+	return ioPool
+}
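getMultiReadPool lazily builds one process-wide pool whose size is the configured file-read concurrency, capped at GOMAXPROCS. A standalone sketch of that lazy-singleton shape, using a toy semaphore-based pool in place of conc.Pool and a hard-coded capacity in place of the paramtable lookup (all names here are illustrative):

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// pool is a deliberately tiny stand-in for conc.Pool: a semaphore that caps
// how many submitted tasks may run at once.
type pool struct{ sem chan struct{} }

func newPool(capacity int) *pool { return &pool{sem: make(chan struct{}, capacity)} }

// Submit runs the task asynchronously once a slot is free and returns a
// WaitGroup the caller can wait on, loosely mirroring a future.
func (p *pool) Submit(task func()) *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		p.sem <- struct{}{}        // block while the pool is full
		defer func() { <-p.sem }() // release the slot
		task()
	}()
	return &wg
}

var (
	ioPool         *pool
	ioPoolInitOnce sync.Once
)

// getMultiReadPool mirrors the lazy-init shape of the patch: read the configured
// concurrency (hard-coded here instead of paramtable), cap it at GOMAXPROCS,
// and build the pool exactly once.
func getMultiReadPool() *pool {
	ioPoolInitOnce.Do(func() {
		capacity := 16 // stand-in for Params.DataNodeCfg.FileReadConcurrency.GetAsInt()
		if capacity > runtime.GOMAXPROCS(0) {
			capacity = runtime.GOMAXPROCS(0)
		}
		ioPool = newPool(capacity)
	})
	return ioPool
}

func main() {
	wg := getMultiReadPool().Submit(func() { fmt.Println("task ran") })
	wg.Wait()
}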
@@ -2339,9 +2339,12 @@ type dataNodeConfig struct {
 	// watchEvent
 	WatchEventTicklerInterval ParamItem `refreshable:"false"`
 
-	// io concurrency to fetch stats logs
+	// io concurrency to add segment
 	IOConcurrency ParamItem `refreshable:"false"`
 
+	// Concurrency to handle compaction file read
+	FileReadConcurrency ParamItem `refreshable:"false"`
+
 	// memory management
 	MemoryForceSyncEnable     ParamItem `refreshable:"true"`
 	MemoryForceSyncSegmentNum ParamItem `refreshable:"true"`
@@ -2468,10 +2471,17 @@ func (p *dataNodeConfig) init(base *BaseTable) {
 	p.IOConcurrency = ParamItem{
 		Key:          "dataNode.dataSync.ioConcurrency",
 		Version:      "2.0.0",
-		DefaultValue: "10",
+		DefaultValue: "16",
 	}
 	p.IOConcurrency.Init(base.mgr)
 
+	p.FileReadConcurrency = ParamItem{
+		Key:          "dataNode.multiRead.concurrency",
+		Version:      "2.0.0",
+		DefaultValue: "16",
+	}
+	p.FileReadConcurrency.Init(base.mgr)
+
 	p.DataNodeTimeTickByRPC = ParamItem{
 		Key:     "datanode.timetick.byRPC",
 		Version: "2.2.9",
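The new dataNode.multiRead.concurrency knob defaults to 16 and, as initMultiReadPool shows, the value is clamped to GOMAXPROCS before the pool is built. A tiny illustrative helper (not the real paramtable logic) showing that default-and-clamp behaviour:

package main

import (
	"fmt"
	"runtime"
	"strconv"
)

// fileReadConcurrency is a simplified stand-in for reading the parameter:
// fall back to the patch's default of 16 when unset, then clamp to GOMAXPROCS.
func fileReadConcurrency(configured string) int {
	capacity, err := strconv.Atoi(configured)
	if err != nil || capacity <= 0 {
		capacity = 16 // DefaultValue of dataNode.multiRead.concurrency in the patch
	}
	if capacity > runtime.GOMAXPROCS(0) {
		capacity = runtime.GOMAXPROCS(0)
	}
	return capacity
}

func main() {
	fmt.Println(fileReadConcurrency(""))   // falls back to the default of 16 (then clamped)
	fmt.Println(fileReadConcurrency("32")) // still capped at GOMAXPROCS
}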