From 4731c1b0d529722e2402beb3f8c788199df2afe7 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Fri, 15 Dec 2023 09:58:43 +0800
Subject: [PATCH] enhance: make SyncManager pool size refreshable (#29224)

See also #29223
This PR makes `conc.Pool` resizable by adding a `Resize` method to it.
It also makes the newly added datanode `MaxParallelSyncMgrTasks` config
refreshable.

---------

Signed-off-by: Congqi.Xia
---
 internal/datanode/data_node.go                |  3 +-
 internal/datanode/mock_test.go                |  2 +-
 .../datanode/syncmgr/key_lock_dispatcher.go   |  5 +-
 internal/datanode/syncmgr/sync_manager.go     | 54 +++++++++++++---
 .../datanode/syncmgr/sync_manager_test.go     | 63 +++++++++++++++++--
 pkg/util/conc/pool.go                         | 13 ++++
 pkg/util/conc/pool_test.go                    | 21 +++++++
 pkg/util/paramtable/component_param.go        |  2 +-
 8 files changed, 143 insertions(+), 20 deletions(-)

diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index c18814900e..5b89d835f0 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -273,8 +273,7 @@ func (node *DataNode) Init() error {
 		}
 		node.chunkManager = chunkManager
 
-		syncMgr, err := syncmgr.NewSyncManager(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasks.GetAsInt(),
-			node.chunkManager, node.allocator)
+		syncMgr, err := syncmgr.NewSyncManager(node.chunkManager, node.allocator)
 		if err != nil {
 			initError = err
 			log.Error("failed to create sync manager", zap.Error(err))
diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go
index 66aec069e3..211310f599 100644
--- a/internal/datanode/mock_test.go
+++ b/internal/datanode/mock_test.go
@@ -92,7 +92,7 @@ func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNod
 	node.broker = broker
 	node.timeTickSender = newTimeTickSender(node.broker, 0)
 
-	syncMgr, _ := syncmgr.NewSyncManager(10, node.chunkManager, node.allocator)
+	syncMgr, _ := syncmgr.NewSyncManager(node.chunkManager, node.allocator)
 
 	node.syncMgr = syncMgr
 	node.writeBufferManager = writebuffer.NewManager(node.syncMgr)
diff --git a/internal/datanode/syncmgr/key_lock_dispatcher.go b/internal/datanode/syncmgr/key_lock_dispatcher.go
index 493c53c57c..6a51a0f7fb 100644
--- a/internal/datanode/syncmgr/key_lock_dispatcher.go
+++ b/internal/datanode/syncmgr/key_lock_dispatcher.go
@@ -20,10 +20,11 @@ type keyLockDispatcher[K comparable] struct {
 }
 
 func newKeyLockDispatcher[K comparable](maxParallel int) *keyLockDispatcher[K] {
-	return &keyLockDispatcher[K]{
-		workerPool: conc.NewPool[error](maxParallel, conc.WithPreAlloc(true)),
+	dispatcher := &keyLockDispatcher[K]{
+		workerPool: conc.NewPool[error](maxParallel, conc.WithPreAlloc(false)),
 		keyLock:    lock.NewKeyLock[K](),
 	}
+	return dispatcher
 }
 
 func (d *keyLockDispatcher[K]) Submit(key K, t Task, callbacks ...func(error)) *conc.Future[error] {
diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go
index 9358a1f638..b78c8cda2a 100644
--- a/internal/datanode/syncmgr/sync_manager.go
+++ b/internal/datanode/syncmgr/sync_manager.go
@@ -5,13 +5,18 @@ import (
 	"fmt"
 	"strconv"
 
+	"go.uber.org/zap"
+
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/allocator"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/config"
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -57,19 +62,48 @@ type syncManager struct { tasks *typeutil.ConcurrentMap[string, Task] } -func NewSyncManager(parallelTask int, chunkManager storage.ChunkManager, allocator allocator.Interface) (SyncManager, error) { - if parallelTask < 1 { - return nil, merr.WrapErrParameterInvalid("positive parallel task number", strconv.FormatInt(int64(parallelTask), 10)) +func NewSyncManager(chunkManager storage.ChunkManager, allocator allocator.Interface) (SyncManager, error) { + params := paramtable.Get() + initPoolSize := params.DataNodeCfg.MaxParallelSyncMgrTasks.GetAsInt() + if initPoolSize < 1 { + return nil, merr.WrapErrParameterInvalid("positive parallel task number", strconv.FormatInt(int64(initPoolSize), 10)) } - return &syncManager{ - keyLockDispatcher: newKeyLockDispatcher[int64](parallelTask), + dispatcher := newKeyLockDispatcher[int64](initPoolSize) + log.Info("sync manager initialized", zap.Int("initPoolSize", initPoolSize)) + + syncMgr := &syncManager{ + keyLockDispatcher: dispatcher, chunkManager: chunkManager, allocator: allocator, tasks: typeutil.NewConcurrentMap[string, Task](), - }, nil + } + // setup config update watcher + params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler)) + + return syncMgr, nil } -func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] { +func (mgr *syncManager) resizeHandler(evt *config.Event) { + if evt.HasUpdated { + log := log.Ctx(context.Background()).With( + zap.String("key", evt.Key), + zap.String("value", evt.Value), + ) + size, err := strconv.ParseInt(evt.Value, 10, 64) + if err != nil { + log.Warn("failed to parse new datanode syncmgr pool size", zap.Error(err)) + return + } + err = mgr.keyLockDispatcher.workerPool.Resize(int(size)) + if err != nil { + log.Warn("failed to resize datanode syncmgr pool size", zap.String("key", evt.Key), zap.String("value", evt.Value), zap.Error(err)) + return + } + log.Info("sync mgr pool size updated", zap.Int64("newSize", size)) + } +} + +func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] { switch t := task.(type) { case *SyncTask: t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager) @@ -88,7 +122,7 @@ func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[err }) } -func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) { +func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) { var cp *msgpb.MsgPosition var segmentID int64 mgr.tasks.Range(func(_ string, task Task) bool { @@ -106,10 +140,10 @@ func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPos return segmentID, cp } -func (mgr syncManager) Block(segmentID int64) { +func (mgr *syncManager) Block(segmentID int64) { mgr.keyLock.Lock(segmentID) } -func (mgr syncManager) Unblock(segmentID int64) { +func (mgr *syncManager) Unblock(segmentID int64) { mgr.keyLock.Unlock(segmentID) } diff --git a/internal/datanode/syncmgr/sync_manager_test.go b/internal/datanode/syncmgr/sync_manager_test.go index 6bfdb5fc46..953f5c4c49 100644 --- a/internal/datanode/syncmgr/sync_manager_test.go +++ b/internal/datanode/syncmgr/sync_manager_test.go @@ -3,6 +3,7 @@ package syncmgr import ( "context" "math/rand" + "strconv" "testing" "time" @@ -21,6 +22,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" 
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -41,7 +43,7 @@ type SyncManagerSuite struct { } func (s *SyncManagerSuite) SetupSuite() { - paramtable.Get().Init(paramtable.NewBaseTable()) + paramtable.Get().Init(paramtable.NewBaseTable(paramtable.SkipRemote(true))) s.collectionID = 100 s.partitionID = 101 @@ -155,7 +157,7 @@ func (s *SyncManagerSuite) TestSubmit() { s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() - manager, err := NewSyncManager(10, s.chunkManager, s.allocator) + manager, err := NewSyncManager(s.chunkManager, s.allocator) s.NoError(err) task := s.getSuiteSyncTask() task.WithMetaWriter(BrokerMetaWriter(s.broker)) @@ -187,7 +189,7 @@ func (s *SyncManagerSuite) TestCompacted() { s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() - manager, err := NewSyncManager(10, s.chunkManager, s.allocator) + manager, err := NewSyncManager(s.chunkManager, s.allocator) s.NoError(err) task := s.getSuiteSyncTask() task.WithMetaWriter(BrokerMetaWriter(s.broker)) @@ -225,7 +227,7 @@ func (s *SyncManagerSuite) TestBlock() { } }) - manager, err := NewSyncManager(10, s.chunkManager, s.allocator) + manager, err := NewSyncManager(s.chunkManager, s.allocator) s.NoError(err) // block @@ -253,6 +255,59 @@ func (s *SyncManagerSuite) TestBlock() { <-sig } +func (s *SyncManagerSuite) TestResizePool() { + manager, err := NewSyncManager(s.chunkManager, s.allocator) + s.NoError(err) + + syncMgr, ok := manager.(*syncManager) + s.Require().True(ok) + + cap := syncMgr.keyLockDispatcher.workerPool.Cap() + s.NotZero(cap) + + params := paramtable.Get() + configKey := params.DataNodeCfg.MaxParallelSyncMgrTasks.Key + + syncMgr.resizeHandler(&config.Event{ + Key: configKey, + Value: "abc", + HasUpdated: true, + }) + + s.Equal(cap, syncMgr.keyLockDispatcher.workerPool.Cap()) + + syncMgr.resizeHandler(&config.Event{ + Key: configKey, + Value: "-1", + HasUpdated: true, + }) + s.Equal(cap, syncMgr.keyLockDispatcher.workerPool.Cap()) + + syncMgr.resizeHandler(&config.Event{ + Key: configKey, + Value: strconv.FormatInt(int64(cap*2), 10), + HasUpdated: true, + }) + s.Equal(cap*2, syncMgr.keyLockDispatcher.workerPool.Cap()) +} + +func (s *SyncManagerSuite) TestNewSyncManager() { + manager, err := NewSyncManager(s.chunkManager, s.allocator) + s.NoError(err) + + _, ok := manager.(*syncManager) + s.Require().True(ok) + + params := paramtable.Get() + configKey := params.DataNodeCfg.MaxParallelSyncMgrTasks.Key + defer params.Reset(configKey) + + params.Save(configKey, "0") + + _, err = NewSyncManager(s.chunkManager, s.allocator) + s.Error(err) +} + func TestSyncManager(t *testing.T) { suite.Run(t, new(SyncManagerSuite)) } diff --git a/pkg/util/conc/pool.go b/pkg/util/conc/pool.go index d5b3e286e7..8c6c1fb25c 100644 --- a/pkg/util/conc/pool.go +++ b/pkg/util/conc/pool.go @@ -18,12 +18,14 @@ package conc import ( "fmt" + "strconv" "sync" ants "github.com/panjf2000/ants/v2" "github.com/milvus-io/milvus/pkg/util/generic" "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/merr" ) // A goroutine pool @@ -110,6 +112,17 @@ func (pool *Pool[T]) Release() { pool.inner.Release() } +func (pool 
+func (pool *Pool[T]) Resize(size int) error {
+	if pool.opt.preAlloc {
+		return merr.WrapErrServiceInternal("cannot resize pre-alloc pool")
+	}
+	if size <= 0 {
+		return merr.WrapErrParameterInvalid("positive size", strconv.FormatInt(int64(size), 10))
+	}
+	pool.inner.Tune(size)
+	return nil
+}
+
 // WarmupPool do warm up logic for each goroutine in pool
 func WarmupPool[T any](pool *Pool[T], warmup func()) {
 	cap := pool.Cap()
diff --git a/pkg/util/conc/pool_test.go b/pkg/util/conc/pool_test.go
index f6fcf4ca50..3c09fc6b8a 100644
--- a/pkg/util/conc/pool_test.go
+++ b/pkg/util/conc/pool_test.go
@@ -21,6 +21,8 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 )
 
 func TestPool(t *testing.T) {
@@ -55,6 +57,25 @@ func TestPool(t *testing.T) {
 	}
 }
 
+func TestPoolResize(t *testing.T) {
+	cpuNum := hardware.GetCPUNum()
+
+	pool := NewPool[any](cpuNum)
+
+	assert.Equal(t, cpuNum, pool.Cap())
+
+	err := pool.Resize(cpuNum * 2)
+	assert.NoError(t, err)
+	assert.Equal(t, cpuNum*2, pool.Cap())
+
+	err = pool.Resize(0)
+	assert.Error(t, err)
+
+	pool = NewDefaultPool[any]()
+	err = pool.Resize(cpuNum * 2)
+	assert.Error(t, err)
+}
+
 func TestPoolWithPanic(t *testing.T) {
 	pool := NewPool[any](1, WithConcealPanic(true))
 
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 155b4dedc1..07f2b68509 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2589,7 +2589,7 @@ type dataNodeConfig struct {
 	FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
 	FlowGraphMaxParallelism ParamItem `refreshable:"false"`
 	MaxParallelSyncTaskNum  ParamItem `refreshable:"false"`
-	MaxParallelSyncMgrTasks ParamItem `refreshable:"false"`
+	MaxParallelSyncMgrTasks ParamItem `refreshable:"true"`
 
 	// skip mode
 	FlowGraphSkipModeEnable ParamItem `refreshable:"true"`
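
Reviewer note, not part of the commit: a minimal sketch of the new `Resize`
semantics, using only APIs that appear in this patch (`conc.NewPool`,
`conc.WithPreAlloc`, `Cap`, `Resize`, `Release`). On the datanode side the
same method is driven by config: `NewSyncManager` watches
`DataNodeCfg.MaxParallelSyncMgrTasks` and its `resizeHandler` calls `Resize`
on the dispatcher's worker pool, as `TestResizePool` above exercises. Treat
the snippet as illustration, not as code from the Milvus tree.

    package main

    import (
        "fmt"

        "github.com/milvus-io/milvus/pkg/util/conc"
    )

    func main() {
        // A plain (non pre-allocated) pool can be tuned at runtime;
        // Resize delegates to the underlying ants pool's Tune.
        pool := conc.NewPool[int](4)
        defer pool.Release()

        if err := pool.Resize(8); err != nil {
            panic(err)
        }
        fmt.Println(pool.Cap()) // 8

        // Non-positive sizes are rejected with a parameter error.
        fmt.Println(pool.Resize(0))

        // Pre-allocated pools cannot be resized, which is why this patch
        // switches the sync manager's dispatcher to WithPreAlloc(false).
        fixed := conc.NewPool[int](4, conc.WithPreAlloc(true))
        fmt.Println(fixed.Resize(8)) // cannot resize pre-alloc pool
    }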