enhance: make SyncManager pool size refreshable (#29224)

See also #29223

This PR makes `conc.Pool` resizable by adding a `Resize` method to it.
It also makes the newly added datanode `MaxParallelSyncMgrTasks` config
refreshable.
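
For reference, a minimal standalone sketch (not part of this diff) of how the new `Resize` method behaves; it assumes the `pkg/util/conc` package is importable and mirrors the semantics added below:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/conc"
)

func main() {
	// Plain pools (no pre-allocation) can now be resized at runtime.
	pool := conc.NewPool[error](8)
	if err := pool.Resize(16); err != nil {
		fmt.Println("resize failed:", err)
	}
	fmt.Println(pool.Cap()) // 16

	// Pre-allocated pools reject Resize with an error.
	fixed := conc.NewPool[error](8, conc.WithPreAlloc(true))
	fmt.Println(fixed.Resize(16) != nil) // true
}
```

On the datanode side, `NewSyncManager` now reads its initial pool size from `DataNodeCfg.MaxParallelSyncMgrTasks` and registers a config watcher, so updating that key resizes the sync worker pool without a restart.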

---------

Signed-off-by: Congqi.Xia <congqi.xia@zilliz.com>
congqixia 2023-12-15 09:58:43 +08:00 committed by GitHub
parent 25a4525297
commit 4731c1b0d5
8 changed files with 143 additions and 20 deletions

View File

@@ -273,8 +273,7 @@ func (node *DataNode) Init() error {
     }
     node.chunkManager = chunkManager
-    syncMgr, err := syncmgr.NewSyncManager(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasks.GetAsInt(),
-        node.chunkManager, node.allocator)
+    syncMgr, err := syncmgr.NewSyncManager(node.chunkManager, node.allocator)
     if err != nil {
         initError = err
         log.Error("failed to create sync manager", zap.Error(err))

View File

@@ -92,7 +92,7 @@ func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNod
     node.broker = broker
     node.timeTickSender = newTimeTickSender(node.broker, 0)
-    syncMgr, _ := syncmgr.NewSyncManager(10, node.chunkManager, node.allocator)
+    syncMgr, _ := syncmgr.NewSyncManager(node.chunkManager, node.allocator)
     node.syncMgr = syncMgr
     node.writeBufferManager = writebuffer.NewManager(node.syncMgr)

View File

@@ -20,10 +20,11 @@ type keyLockDispatcher[K comparable] struct {
 }
 func newKeyLockDispatcher[K comparable](maxParallel int) *keyLockDispatcher[K] {
-    return &keyLockDispatcher[K]{
-        workerPool: conc.NewPool[error](maxParallel, conc.WithPreAlloc(true)),
+    dispatcher := &keyLockDispatcher[K]{
+        workerPool: conc.NewPool[error](maxParallel, conc.WithPreAlloc(false)),
         keyLock: lock.NewKeyLock[K](),
     }
+    return dispatcher
 }
 func (d *keyLockDispatcher[K]) Submit(key K, t Task, callbacks ...func(error)) *conc.Future[error] {

View File

@@ -5,13 +5,18 @@ import (
     "fmt"
     "strconv"
+    "go.uber.org/zap"
     "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/allocator"
     "github.com/milvus-io/milvus/internal/datanode/metacache"
     "github.com/milvus-io/milvus/internal/storage"
+    "github.com/milvus-io/milvus/pkg/config"
+    "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/conc"
     "github.com/milvus-io/milvus/pkg/util/merr"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -57,19 +62,48 @@ type syncManager struct {
     tasks *typeutil.ConcurrentMap[string, Task]
 }
-func NewSyncManager(parallelTask int, chunkManager storage.ChunkManager, allocator allocator.Interface) (SyncManager, error) {
-    if parallelTask < 1 {
-        return nil, merr.WrapErrParameterInvalid("positive parallel task number", strconv.FormatInt(int64(parallelTask), 10))
+func NewSyncManager(chunkManager storage.ChunkManager, allocator allocator.Interface) (SyncManager, error) {
+    params := paramtable.Get()
+    initPoolSize := params.DataNodeCfg.MaxParallelSyncMgrTasks.GetAsInt()
+    if initPoolSize < 1 {
+        return nil, merr.WrapErrParameterInvalid("positive parallel task number", strconv.FormatInt(int64(initPoolSize), 10))
     }
-    return &syncManager{
-        keyLockDispatcher: newKeyLockDispatcher[int64](parallelTask),
+    dispatcher := newKeyLockDispatcher[int64](initPoolSize)
+    log.Info("sync manager initialized", zap.Int("initPoolSize", initPoolSize))
+    syncMgr := &syncManager{
+        keyLockDispatcher: dispatcher,
         chunkManager: chunkManager,
         allocator: allocator,
         tasks: typeutil.NewConcurrentMap[string, Task](),
-    }, nil
+    }
+    // setup config update watcher
+    params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler))
+    return syncMgr, nil
 }
-func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] {
+func (mgr *syncManager) resizeHandler(evt *config.Event) {
+    if evt.HasUpdated {
+        log := log.Ctx(context.Background()).With(
+            zap.String("key", evt.Key),
+            zap.String("value", evt.Value),
+        )
+        size, err := strconv.ParseInt(evt.Value, 10, 64)
+        if err != nil {
+            log.Warn("failed to parse new datanode syncmgr pool size", zap.Error(err))
+            return
+        }
+        err = mgr.keyLockDispatcher.workerPool.Resize(int(size))
+        if err != nil {
+            log.Warn("failed to resize datanode syncmgr pool size", zap.String("key", evt.Key), zap.String("value", evt.Value), zap.Error(err))
+            return
+        }
+        log.Info("sync mgr pool size updated", zap.Int64("newSize", size))
+    }
+}
+func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] {
     switch t := task.(type) {
     case *SyncTask:
         t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager)
@@ -88,7 +122,7 @@ func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[err
     })
 }
-func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
+func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
     var cp *msgpb.MsgPosition
     var segmentID int64
     mgr.tasks.Range(func(_ string, task Task) bool {
@@ -106,10 +140,10 @@ func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPos
     return segmentID, cp
 }
-func (mgr syncManager) Block(segmentID int64) {
+func (mgr *syncManager) Block(segmentID int64) {
     mgr.keyLock.Lock(segmentID)
 }
-func (mgr syncManager) Unblock(segmentID int64) {
+func (mgr *syncManager) Unblock(segmentID int64) {
     mgr.keyLock.Unlock(segmentID)
 }

View File

@@ -3,6 +3,7 @@ package syncmgr
 import (
     "context"
     "math/rand"
+    "strconv"
     "testing"
     "time"
@@ -21,6 +22,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/pkg/common"
+    "github.com/milvus-io/milvus/pkg/config"
     "github.com/milvus-io/milvus/pkg/util/paramtable"
     "github.com/milvus-io/milvus/pkg/util/tsoutil"
 )
@@ -41,7 +43,7 @@ type SyncManagerSuite struct {
 }
 func (s *SyncManagerSuite) SetupSuite() {
-    paramtable.Get().Init(paramtable.NewBaseTable())
+    paramtable.Get().Init(paramtable.NewBaseTable(paramtable.SkipRemote(true)))
     s.collectionID = 100
     s.partitionID = 101
@@ -155,7 +157,7 @@ func (s *SyncManagerSuite) TestSubmit() {
     s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
     s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-    manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
+    manager, err := NewSyncManager(s.chunkManager, s.allocator)
     s.NoError(err)
     task := s.getSuiteSyncTask()
     task.WithMetaWriter(BrokerMetaWriter(s.broker))
@@ -187,7 +189,7 @@ func (s *SyncManagerSuite) TestCompacted() {
     s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
     s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
-    manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
+    manager, err := NewSyncManager(s.chunkManager, s.allocator)
     s.NoError(err)
     task := s.getSuiteSyncTask()
     task.WithMetaWriter(BrokerMetaWriter(s.broker))
@@ -225,7 +227,7 @@ func (s *SyncManagerSuite) TestBlock() {
         }
     })
-    manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
+    manager, err := NewSyncManager(s.chunkManager, s.allocator)
     s.NoError(err)
     // block
@@ -253,6 +255,59 @@ func (s *SyncManagerSuite) TestBlock() {
     <-sig
 }
+func (s *SyncManagerSuite) TestResizePool() {
+    manager, err := NewSyncManager(s.chunkManager, s.allocator)
+    s.NoError(err)
+    syncMgr, ok := manager.(*syncManager)
+    s.Require().True(ok)
+    cap := syncMgr.keyLockDispatcher.workerPool.Cap()
+    s.NotZero(cap)
+    params := paramtable.Get()
+    configKey := params.DataNodeCfg.MaxParallelSyncMgrTasks.Key
+    syncMgr.resizeHandler(&config.Event{
+        Key:        configKey,
+        Value:      "abc",
+        HasUpdated: true,
+    })
+    s.Equal(cap, syncMgr.keyLockDispatcher.workerPool.Cap())
+    syncMgr.resizeHandler(&config.Event{
+        Key:        configKey,
+        Value:      "-1",
+        HasUpdated: true,
+    })
+    s.Equal(cap, syncMgr.keyLockDispatcher.workerPool.Cap())
+    syncMgr.resizeHandler(&config.Event{
+        Key:        configKey,
+        Value:      strconv.FormatInt(int64(cap*2), 10),
+        HasUpdated: true,
+    })
+    s.Equal(cap*2, syncMgr.keyLockDispatcher.workerPool.Cap())
+}
+func (s *SyncManagerSuite) TestNewSyncManager() {
+    manager, err := NewSyncManager(s.chunkManager, s.allocator)
+    s.NoError(err)
+    _, ok := manager.(*syncManager)
+    s.Require().True(ok)
+    params := paramtable.Get()
+    configKey := params.DataNodeCfg.MaxParallelSyncMgrTasks.Key
+    defer params.Reset(configKey)
+    params.Save(configKey, "0")
+    _, err = NewSyncManager(s.chunkManager, s.allocator)
+    s.Error(err)
+}
 func TestSyncManager(t *testing.T) {
     suite.Run(t, new(SyncManagerSuite))
 }

View File

@@ -18,12 +18,14 @@ package conc
 import (
     "fmt"
+    "strconv"
     "sync"
     ants "github.com/panjf2000/ants/v2"
     "github.com/milvus-io/milvus/pkg/util/generic"
     "github.com/milvus-io/milvus/pkg/util/hardware"
+    "github.com/milvus-io/milvus/pkg/util/merr"
 )
 // A goroutine pool
@@ -110,6 +112,17 @@ func (pool *Pool[T]) Release() {
     pool.inner.Release()
 }
+func (pool *Pool[T]) Resize(size int) error {
+    if pool.opt.preAlloc {
+        return merr.WrapErrServiceInternal("cannot resize pre-alloc pool")
+    }
+    if size <= 0 {
+        return merr.WrapErrParameterInvalid("positive size", strconv.FormatInt(int64(size), 10))
+    }
+    pool.inner.Tune(size)
+    return nil
+}
 // WarmupPool do warm up logic for each goroutine in pool
 func WarmupPool[T any](pool *Pool[T], warmup func()) {
     cap := pool.Cap()

View File

@@ -21,6 +21,8 @@ import (
     "time"
     "github.com/stretchr/testify/assert"
+    "github.com/milvus-io/milvus/pkg/util/hardware"
 )
 func TestPool(t *testing.T) {
@@ -55,6 +57,25 @@ func TestPool(t *testing.T) {
     }
 }
+func TestPoolResize(t *testing.T) {
+    cpuNum := hardware.GetCPUNum()
+    pool := NewPool[any](cpuNum)
+    assert.Equal(t, cpuNum, pool.Cap())
+    err := pool.Resize(cpuNum * 2)
+    assert.NoError(t, err)
+    assert.Equal(t, cpuNum*2, pool.Cap())
+    err = pool.Resize(0)
+    assert.Error(t, err)
+    pool = NewDefaultPool[any]()
+    err = pool.Resize(cpuNum * 2)
+    assert.Error(t, err)
+}
 func TestPoolWithPanic(t *testing.T) {
     pool := NewPool[any](1, WithConcealPanic(true))
View File

@@ -2589,7 +2589,7 @@ type dataNodeConfig struct {
     FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
     FlowGraphMaxParallelism ParamItem `refreshable:"false"`
     MaxParallelSyncTaskNum ParamItem `refreshable:"false"`
-    MaxParallelSyncMgrTasks ParamItem `refreshable:"false"`
+    MaxParallelSyncMgrTasks ParamItem `refreshable:"true"`
     // skip mode
     FlowGraphSkipModeEnable ParamItem `refreshable:"true"`