From 6635398a6de576aa71d6519651f18b049b898526 Mon Sep 17 00:00:00 2001
From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Date: Tue, 19 Sep 2023 18:05:22 +0800
Subject: [PATCH] Fix binlog concurrency by adding a pool (#27189)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
---
 internal/datanode/binlog_io.go         | 111 ++++++++++++++-----------
 internal/datanode/binlog_io_test.go    |  23 +++--
 internal/datanode/compactor.go         |  42 ++++------
 internal/datanode/compactor_test.go    |   2 +-
 internal/datanode/io_pool.go           |  14 ++++
 pkg/util/paramtable/component_param.go |  14 +++-
 6 files changed, 117 insertions(+), 89 deletions(-)
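
Note on the approach: the old download/upload paths issued a single
MultiRead/MultiWrite for the whole batch inside an errgroup and retried
the entire batch on any failure, while the compactor spawned one
goroutine per deltalog. This patch instead submits one task per path to
a shared, size-capped conc.Pool and awaits the resulting futures, so
concurrency is bounded by configuration and a failed object retries
only itself. A minimal sketch of the pattern, using only the conc APIs
exercised by the hunks below (Pool.Submit, Future.OK/Err/Value); the
helper name fetchAll and the read callback are illustrative, not part
of the patch:

    package datanode

    import (
    	"context"

    	"github.com/milvus-io/milvus/pkg/util/conc"
    )

    // fetchAll fans one read per path out onto a shared bounded pool
    // and collects the results in input order.
    func fetchAll(ctx context.Context, pool *conc.Pool[any],
    	read func(context.Context, string) ([]byte, error), paths []string,
    ) ([][]byte, error) {
    	futures := make([]*conc.Future[any], len(paths))
    	for i, p := range paths {
    		localPath := p // capture the loop variable for the closure
    		futures[i] = pool.Submit(func() (any, error) {
    			return read(ctx, localPath)
    		})
    	}
    	out := make([][]byte, len(paths))
    	for i, f := range futures {
    		if !f.OK() { // the first failure aborts the whole batch
    			return nil, f.Err()
    		}
    		out[i] = f.Value().([]byte)
    	}
    	return out, nil
    }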

diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go
index eede6ada63..7708ab0d7e 100644
--- a/internal/datanode/binlog_io.go
+++ b/internal/datanode/binlog_io.go
@@ -30,9 +30,9 @@ import (
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"go.uber.org/zap"
-	"golang.org/x/sync/errgroup"
 )
 
 var (
@@ -68,41 +68,43 @@ var _ downloader = (*binlogIO)(nil)
 var _ uploader = (*binlogIO)(nil)
 
 func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) {
-	var (
-		err = errStart
-		vs  [][]byte
-	)
-
 	log.Debug("down load", zap.Strings("path", paths))
-	g, gCtx := errgroup.WithContext(ctx)
-	g.Go(func() error {
-		for err != nil {
-			select {
-			case <-gCtx.Done():
-				log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
-				return errDownloadFromBlobStorage
-
-			default:
-				if err != errStart {
-					log.Warn("downloading failed, retry in 50ms", zap.Strings("paths", paths))
-					time.Sleep(50 * time.Millisecond)
+	resp := make([]*Blob, len(paths))
+	if len(paths) == 0 {
+		return resp, nil
+	}
+	futures := make([]*conc.Future[any], len(paths))
+	for i, path := range paths {
+		localPath := path
+		future := getMultiReadPool().Submit(func() (any, error) {
+			var vs []byte
+			var err = errStart
+			for err != nil {
+				select {
+				case <-ctx.Done():
+					log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
+					return nil, errDownloadFromBlobStorage
+				default:
+					if err != errStart {
+						time.Sleep(50 * time.Millisecond)
+					}
+					vs, err = b.Read(ctx, localPath)
 				}
-				vs, err = b.MultiRead(ctx, paths)
 			}
+			return vs, nil
+		})
+		futures[i] = future
+	}
+
+	for i := range futures {
+		if !futures[i].OK() {
+			return nil, futures[i].Err()
+		} else {
+			resp[i] = &Blob{Value: futures[i].Value().([]byte)}
 		}
-		return nil
-	})
-
-	if err := g.Wait(); err != nil {
-		return nil, err
 	}
 
-	rst := make([]*Blob, len(vs))
-	for i := range rst {
-		rst[i] = &Blob{Value: vs[i]}
-	}
-
-	return rst, nil
+	return resp, nil
 }
 
 func (b *binlogIO) uploadSegmentFiles(
@@ -110,24 +112,39 @@ func (b *binlogIO) uploadSegmentFiles(
 	CollectionID UniqueID,
 	segID UniqueID,
 	kvs map[string][]byte) error {
-	var err = errStart
-	for err != nil {
-		select {
-		case <-ctx.Done():
-			log.Warn("ctx done when saving kvs to blob storage",
-				zap.Int64("collectionID", CollectionID),
-				zap.Int64("segmentID", segID),
-				zap.Int("number of kvs", len(kvs)))
-			return errUploadToBlobStorage
-		default:
-			if err != errStart {
-				log.Warn("save binlog failed, retry in 50ms",
-					zap.Int64("collectionID", CollectionID),
-					zap.Int64("segmentID", segID))
-				time.Sleep(50 * time.Millisecond)
+	log.Debug("update", zap.Int64("collectionID", CollectionID), zap.Int64("segmentID", segID))
+	if len(kvs) == 0 {
+		return nil
+	}
+	futures := make([]*conc.Future[any], 0)
+	for key, val := range kvs {
+		localPath := key
+		localVal := val
+		future := getMultiReadPool().Submit(func() (any, error) {
+			var err = errStart
+			for err != nil {
+				select {
+				case <-ctx.Done():
+					log.Warn("ctx done when saving kvs to blob storage",
+						zap.Int64("collectionID", CollectionID),
+						zap.Int64("segmentID", segID),
+						zap.Int("number of kvs", len(kvs)))
+					return nil, errUploadToBlobStorage
+				default:
+					if err != errStart {
+						time.Sleep(50 * time.Millisecond)
+					}
+					err = b.Write(ctx, localPath, localVal)
+				}
 			}
-			err = b.MultiWrite(ctx, kvs)
-		}
+			return nil, nil
+		})
+		futures = append(futures, future)
+	}
+
+	err := conc.AwaitAll(futures...)
+	if err != nil {
+		return err
 	}
 	return nil
 }
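
Note: both the download and upload tasks above embed the same retry
idiom: retry the single-file Read/Write every 50ms until it succeeds,
and abort as soon as the caller's context is done (the errStart
sentinel just forces the first attempt through the loop). Factored out
as a standalone helper (imports: context, time) it would read as below,
returning ctx.Err() where the patch returns its sentinel errors;
retryUntilDone is a hypothetical name, not something this patch
introduces:

    // retryUntilDone retries op every 50ms until it succeeds or the
    // context is cancelled, mirroring the loop inside each pool task.
    func retryUntilDone(ctx context.Context, op func() error) error {
    	err := op()
    	for err != nil {
    		select {
    		case <-ctx.Done():
    			return ctx.Err()
    		default:
    			time.Sleep(50 * time.Millisecond)
    			err = op()
    		}
    	}
    	return nil
    }
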
diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go
index a51f5f481a..7084d819e5 100644
--- a/internal/datanode/binlog_io_test.go
+++ b/internal/datanode/binlog_io_test.go
@@ -89,7 +89,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 					ctx, cancel := context.WithCancel(test.inctx)
 					cancel()
 
-					_, err := b.download(ctx, nil)
+					_, err := b.download(ctx, []string{"test"})
 					assert.EqualError(t, err, errDownloadFromBlobStorage.Error())
 				}
 			})
@@ -97,7 +97,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 	})
 
 	t.Run("Test download twice", func(t *testing.T) {
-		mkc := &mockCm{errMultiLoad: true}
+		mkc := &mockCm{errRead: true}
 		alloc := allocator.NewMockAllocator(t)
 		b := &binlogIO{mkc, alloc}
 
@@ -145,7 +145,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 		})
 
 		t.Run("upload failed", func(t *testing.T) {
-			mkc := &mockCm{errMultiLoad: true, errMultiSave: true}
+			mkc := &mockCm{errRead: true, errSave: true}
 			alloc := allocator.NewMockAllocator(t)
 			b := binlogIO{mkc, alloc}
 
@@ -360,8 +360,8 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 
 type mockCm struct {
 	storage.ChunkManager
-	errMultiLoad    bool
-	errMultiSave    bool
+	errRead         bool
+	errSave         bool
 	MultiReadReturn [][]byte
 	ReadReturn      []byte
 }
@@ -373,25 +373,24 @@ func (mk *mockCm) RootPath() string {
 }
 
 func (mk *mockCm) Write(ctx context.Context, filePath string, content []byte) error {
+	if mk.errSave {
+		return errors.New("mockKv save error")
+	}
 	return nil
 }
 
 func (mk *mockCm) MultiWrite(ctx context.Context, contents map[string][]byte) error {
-	if mk.errMultiSave {
-		return errors.New("mockKv multisave error")
-	}
 	return nil
 }
 
 func (mk *mockCm) Read(ctx context.Context, filePath string) ([]byte, error) {
+	if mk.errRead {
+		return nil, errors.New("mockKv read error")
+	}
 	return mk.ReadReturn, nil
 }
 
 func (mk *mockCm) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error) {
-	if mk.errMultiLoad {
-		return nil, errors.New("mockKv multiload error")
-	}
-
 	if mk.MultiReadReturn != nil {
 		return mk.MultiReadReturn, nil
 	}
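
Note: because binlogIO now goes through the per-file Read/Write
methods, failure injection in mockCm moves from errMultiLoad and
errMultiSave to errRead and errSave. Condensed from the test hunks
above, the retry path is driven like this (the 20ms timeout is
illustrative, not taken from the test):

    mkc := &mockCm{errRead: true}
    b := &binlogIO{mkc, allocator.NewMockAllocator(t)}

    // Every attempt hits the injected Read error, so download keeps
    // retrying until the context deadline fires.
    ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
    defer cancel()
    _, err := b.download(ctx, []string{"a"})
    assert.Error(t, err)
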
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 40f33b2262..25e30663a8 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -22,15 +22,9 @@ import (
 	"path"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/cockroachdb/errors"
-	"github.com/milvus-io/milvus/pkg/util/merr"
-	"github.com/milvus-io/milvus/pkg/util/tsoutil"
-	"go.uber.org/zap"
-	"golang.org/x/sync/errgroup"
-
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -40,10 +34,13 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
+	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
+	"go.uber.org/zap"
 )
 
 var (
@@ -627,16 +624,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 	<-ti.Injected()
 	log.Info("compact inject elapse", zap.Duration("elapse", time.Since(injectStart)))
 
-	var (
-		// SegmentID to deltaBlobs
-		dblobs = make(map[UniqueID][]*Blob)
-		dmu    sync.Mutex
-	)
-
+	var dblobs = make(map[UniqueID][]*Blob)
 	allPath := make([][]string, 0)
 
 	downloadStart := time.Now()
-	g, gCtx := errgroup.WithContext(ctxTimeout)
 	for _, s := range t.plan.GetSegmentBinlogs() {
 
 		// Get the number of field binlog files from non-empty segment
@@ -662,27 +653,24 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 		}
 
 		segID := s.GetSegmentID()
+		paths := make([]string, 0)
 		for _, d := range s.GetDeltalogs() {
 			for _, l := range d.GetBinlogs() {
 				path := l.GetLogPath()
-				g.Go(func() error {
-					bs, err := t.download(gCtx, []string{path})
-					if err != nil {
-						log.Warn("compact download deltalogs wrong", zap.String("path", path), zap.Error(err))
-						return err
-					}
-
-					dmu.Lock()
-					dblobs[segID] = append(dblobs[segID], bs...)
-					dmu.Unlock()
-
-					return nil
-				})
+				paths = append(paths, path)
 			}
 		}
+
+		if len(paths) != 0 {
+			bs, err := t.download(ctxTimeout, paths)
+			if err != nil {
+				log.Warn("compact download deltalogs wrong", zap.Int64("segment", segID), zap.Strings("path", paths), zap.Error(err))
+				return nil, err
+			}
+			dblobs[segID] = append(dblobs[segID], bs...)
+		}
 	}
 
-	err = g.Wait()
 	log.Info("compact download deltalogs elapse", zap.Duration("elapse", time.Since(downloadStart)))
 
 	if err != nil {
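
Note: the compactor no longer needs one goroutine per deltalog or the
dmu mutex around dblobs. Each segment's deltalog paths are gathered
into a slice and handed to download in a single call, which fans them
out on the shared pool internally and returns blobs in path order, so
the per-segment append is race-free. The per-segment batching in
miniature, condensed from the hunk above:

    paths := make([]string, 0)
    for _, d := range s.GetDeltalogs() {
    	for _, l := range d.GetBinlogs() {
    		paths = append(paths, l.GetLogPath())
    	}
    }
    if len(paths) != 0 {
    	// download bounds the fan-out with the shared pool and keeps
    	// the result order aligned with paths.
    	bs, err := t.download(ctxTimeout, paths)
    	if err != nil {
    		return nil, err
    	}
    	dblobs[segID] = append(dblobs[segID], bs...)
    }
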
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index 49f120da40..5f0ebbb434 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -667,7 +667,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			stats := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10)
 
 			ct := &compactionTask{
-				uploader: &binlogIO{&mockCm{errMultiSave: true}, alloc},
+				uploader: &binlogIO{&mockCm{errSave: true}, alloc},
 				done:     make(chan struct{}, 1),
 			}
 
diff --git a/internal/datanode/io_pool.go b/internal/datanode/io_pool.go
index 5f5feaa0c9..fdac4d3d5e 100644
--- a/internal/datanode/io_pool.go
+++ b/internal/datanode/io_pool.go
@@ -35,3 +35,17 @@ func getOrCreateStatsPool() *conc.Pool[any] {
 	statsPoolInitOnce.Do(initStatsPool)
 	return statsPool
 }
+
+func initMultiReadPool() {
+	capacity := Params.DataNodeCfg.FileReadConcurrency.GetAsInt()
+	if capacity > runtime.GOMAXPROCS(0) {
+		capacity = runtime.GOMAXPROCS(0)
+	}
+	// an error only happens with a negative expiry duration or a negative pre-alloc size.
+	ioPool = conc.NewPool[any](capacity)
+}
+
+func getMultiReadPool() *conc.Pool[any] {
+	ioPoolInitOnce.Do(initMultiReadPool)
+	return ioPool
+}
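
Note: initMultiReadPool clamps the configured concurrency at GOMAXPROCS
so an overly generous config value cannot oversubscribe the CPU, and
the sync.Once in getMultiReadPool makes the pool a lazily created
singleton. The clamp in isolation (effectiveCapacity is a hypothetical
helper name):

    import "runtime"

    // effectiveCapacity mirrors the clamp in initMultiReadPool:
    // min(configured concurrency, GOMAXPROCS).
    func effectiveCapacity(configured int) int {
    	if procs := runtime.GOMAXPROCS(0); configured > procs {
    		return procs
    	}
    	return configured
    }
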
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index e13c01e721..4d374601e2 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2339,9 +2339,12 @@ type dataNodeConfig struct {
 	// watchEvent
 	WatchEventTicklerInterval ParamItem `refreshable:"false"`
 
-	// io concurrency to fetch stats logs
+	// io concurrency for adding segments
 	IOConcurrency ParamItem `refreshable:"false"`
 
+	// concurrency for compaction file reads
+	FileReadConcurrency ParamItem `refreshable:"false"`
+
 	// memory management
 	MemoryForceSyncEnable     ParamItem `refreshable:"true"`
 	MemoryForceSyncSegmentNum ParamItem `refreshable:"true"`
@@ -2468,10 +2471,17 @@ func (p *dataNodeConfig) init(base *BaseTable) {
 	p.IOConcurrency = ParamItem{
 		Key:          "dataNode.dataSync.ioConcurrency",
 		Version:      "2.0.0",
-		DefaultValue: "10",
+		DefaultValue: "16",
 	}
 	p.IOConcurrency.Init(base.mgr)
 
+	p.FileReadConcurrency = ParamItem{
+		Key:          "dataNode.multiRead.concurrency",
+		Version:      "2.0.0",
+		DefaultValue: "16",
+	}
+	p.FileReadConcurrency.Init(base.mgr)
+
 	p.DataNodeTimeTickByRPC = ParamItem{
 		Key:          "datanode.timetick.byRPC",
 		Version:      "2.2.9",