fix: sync task still running after DataNode has stopped (#38377)

issue: #38319

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/38527/head
jaime 2024-12-17 18:06:44 +08:00 committed by GitHub
parent d0a7e98a27
commit 29e620fa6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 158 additions and 31 deletions

View File

@ -878,6 +878,8 @@ common:
useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction
localRPCEnabled: false # enable local rpc for internal communication when mix or standalone mode.
sync:
taskPoolReleaseTimeoutSeconds: 60 # The maximum time to wait for the task to finish and release resources in the pool
# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:

View File

@ -402,6 +402,13 @@ func (node *DataNode) Stop() error {
node.writeBufferManager.Stop()
}
if node.syncMgr != nil {
err := node.syncMgr.Close()
if err != nil {
log.Error("sync manager close failed", zap.Error(err))
}
}
if node.allocator != nil {
log.Ctx(node.ctx).Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
node.allocator.Close()

View File

@ -246,11 +246,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
importReq := &datapb.ImportRequest{
JobID: 10,
@ -307,11 +307,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, errors.New("mock err")
})
return future
return future, nil
})
importReq := &datapb.ImportRequest{
JobID: 10,
@ -384,11 +384,11 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
}
func (s *SchedulerSuite) TestScheduler_ImportFile() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
var once sync.Once
data, err := testutil.CreateInsertData(s.schema, s.numRows)

View File

@ -228,7 +228,7 @@ func (t *ImportTask) importFile(reader importutilv2.Reader) error {
}
func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
log.Info("start to sync import data", WrapLogFields(t)...)
log.Ctx(context.TODO()).Info("start to sync import data", WrapLogFields(t)...)
futures := make([]*conc.Future[struct{}], 0)
syncTasks := make([]syncmgr.Task, 0)
for channelIdx, datas := range hashedData {
@ -256,7 +256,11 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Ctx(context.TODO()).Error("sync data failed", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err
}
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}

View File

@ -213,7 +213,7 @@ func (t *L0ImportTask) importL0(reader binlog.L0Reader) error {
}
func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
log.Info("start to sync l0 delete data", WrapLogFields(t)...)
log.Ctx(context.TODO()).Info("start to sync l0 delete data", WrapLogFields(t)...)
futures := make([]*conc.Future[struct{}], 0)
syncTasks := make([]syncmgr.Task, 0)
for channelIdx, data := range delData {
@ -231,7 +231,11 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Ctx(context.TODO()).Error("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err
}
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}

View File

@ -132,7 +132,7 @@ func (s *L0ImportSuite) TestL0PreImport() {
func (s *L0ImportSuite) TestL0Import() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil)
task.(*syncmgr.SyncTask).WithAllocator(alloc)
@ -147,7 +147,7 @@ func (s *L0ImportSuite) TestL0Import() {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
req := &datapb.ImportRequest{

View File

@ -23,8 +23,53 @@ func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter {
return &MockSyncManager_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
func (_m *MockSyncManager) Close() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSyncManager_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockSyncManager_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockSyncManager_Expecter) Close() *MockSyncManager_Close_Call {
return &MockSyncManager_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockSyncManager_Close_Call) Run(run func()) *MockSyncManager_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSyncManager_Close_Call) Return(_a0 error) *MockSyncManager_Close_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSyncManager_Close_Call) RunAndReturn(run func() error) *MockSyncManager_Close_Call {
_c.Call.Return(run)
return _c
}
// SyncData provides a mock function with given fields: ctx, task, callbacks
func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
_va := make([]interface{}, len(callbacks))
for _i := range callbacks {
_va[_i] = callbacks[_i]
@ -39,6 +84,10 @@ func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks ..
}
var r0 *conc.Future[struct{}]
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, Task, ...func(error) error) (*conc.Future[struct{}], error)); ok {
return rf(ctx, task, callbacks...)
}
if rf, ok := ret.Get(0).(func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]); ok {
r0 = rf(ctx, task, callbacks...)
} else {
@ -47,7 +96,13 @@ func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks ..
}
}
return r0
if rf, ok := ret.Get(1).(func(context.Context, Task, ...func(error) error) error); ok {
r1 = rf(ctx, task, callbacks...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockSyncManager_SyncData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncData'
@ -77,12 +132,12 @@ func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task
return _c
}
func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[struct{}]) *MockSyncManager_SyncData_Call {
_c.Call.Return(_a0)
func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[struct{}], _a1 error) *MockSyncManager_SyncData_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]) *MockSyncManager_SyncData_Call {
func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task, ...func(error) error) (*conc.Future[struct{}], error)) *MockSyncManager_SyncData_Call {
_c.Call.Return(run)
return _c
}

View File

@ -47,8 +47,10 @@ type SyncMeta struct {
//go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage
type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}]
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error)
// Close waits for the task to finish and then shuts down the sync manager.
Close() error
TaskStatsJSON() string
}
@ -97,13 +99,17 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
}
}
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
if mgr.workerPool.IsClosed() {
return nil, fmt.Errorf("sync manager is closed")
}
switch t := task.(type) {
case *SyncTask:
t.WithChunkManager(mgr.chunkManager)
}
return mgr.safeSubmitTask(ctx, task, callbacks...)
return mgr.safeSubmitTask(ctx, task, callbacks...), nil
}
// safeSubmitTask submits task to SyncManager
@ -126,6 +132,7 @@ func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callba
}
callbacks = append([]func(error) error{handler}, callbacks...)
log.Info("sync mgr sumbit task with key", zap.Int64("key", key))
return mgr.Submit(ctx, key, task, callbacks...)
}
@ -142,3 +149,8 @@ func (mgr *syncManager) TaskStatsJSON() string {
}
return string(ret)
}
func (mgr *syncManager) Close() error {
timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)
return mgr.workerPool.ReleaseTimeout(timeout)
}

View File

@ -173,13 +173,24 @@ func (s *SyncManagerSuite) TestSubmit() {
Timestamp: 100,
})
f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
s.NotNil(f)
_, err := f.Await()
_, err = f.Await()
s.NoError(err)
}
func (s *SyncManagerSuite) TestClose() {
manager := NewSyncManager(s.chunkManager)
err := manager.Close()
s.NoError(err)
f, err := manager.SyncData(context.Background(), nil)
s.Error(err)
s.Nil(f)
}
func (s *SyncManagerSuite) TestCompacted() {
var segmentID atomic.Int64
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Run(func(_ context.Context, req *datapb.SaveBinlogPathsRequest) {
@ -202,10 +213,11 @@ func (s *SyncManagerSuite) TestCompacted() {
Timestamp: 100,
})
f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
s.NotNil(f)
_, err := f.Await()
_, err = f.Await()
s.NoError(err)
s.EqualValues(1001, segmentID.Load())
}
@ -254,7 +266,7 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
task.EXPECT().Run(mock.Anything).Return(merr.WrapErrServiceInternal("mocked")).Once()
task.EXPECT().HandleError(mock.Anything)
f := manager.SyncData(context.Background(), task)
f, _ := manager.SyncData(context.Background(), task)
_, err := f.Await()
s.Error(err)
}
@ -268,7 +280,7 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() {
task.EXPECT().Run(mock.Anything).Return(errors.New("mock err")).Once()
task.EXPECT().HandleError(mock.Anything)
f := manager.SyncData(context.Background(), task)
f, _ := manager.SyncData(context.Background(), task)
_, err := f.Await()
s.Error(err)
}

View File

@ -322,7 +322,7 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
}
}
result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
future, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if wb.taskObserverCallback != nil {
wb.taskObserverCallback(syncTask, err)
}
@ -342,7 +342,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
}
}
return nil
}))
})
if err != nil {
log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
}
result = append(result, future)
}
return result
}
@ -643,7 +647,7 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
t.WithDrop()
}
f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
f, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if wb.taskObserverCallback != nil {
wb.taskObserverCallback(syncTask, err)
}
@ -656,6 +660,9 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
}
return nil
})
if err != nil {
log.Fatal("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err))
}
futures = append(futures, f)
}

View File

@ -317,7 +317,7 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) {
return struct{}{}, nil
}))
}), nil)
defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()

View File

@ -20,8 +20,9 @@ import (
"fmt"
"strconv"
"sync"
"time"
ants "github.com/panjf2000/ants/v2"
"github.com/panjf2000/ants/v2"
"github.com/milvus-io/milvus/pkg/util/generic"
"github.com/milvus-io/milvus/pkg/util/hardware"
@ -107,10 +108,18 @@ func (pool *Pool[T]) Free() int {
return pool.inner.Free()
}
func (pool *Pool[T]) IsClosed() bool {
return pool.inner.IsClosed()
}
func (pool *Pool[T]) Release() {
pool.inner.Release()
}
func (pool *Pool[T]) ReleaseTimeout(timeout time.Duration) error {
return pool.inner.ReleaseTimeout(timeout)
}
func (pool *Pool[T]) Resize(size int) error {
if pool.opt.preAlloc {
return merr.WrapErrServiceInternal("cannot resize pre-alloc pool")

View File

@ -290,6 +290,8 @@ type commonConfig struct {
HealthCheckInterval ParamItem `refreshable:"true"`
HealthCheckRPCTimeout ParamItem `refreshable:"true"`
SyncTaskPoolReleaseTimeoutSeconds ParamItem `refreshable:"true"`
}
func (p *commonConfig) init(base *BaseTable) {
@ -965,6 +967,15 @@ This helps Milvus-CDC synchronize incremental data`,
Doc: `RPC timeout for health check request`,
}
p.HealthCheckRPCTimeout.Init(base.mgr)
p.SyncTaskPoolReleaseTimeoutSeconds = ParamItem{
Key: "common.sync.taskPoolReleaseTimeoutSeconds",
DefaultValue: "60",
Version: "2.4.19",
Doc: "The maximum time to wait for the task to finish and release resources in the pool",
Export: true,
}
p.SyncTaskPoolReleaseTimeoutSeconds.Init(base.mgr)
}
type gpuConfig struct {

View File

@ -143,6 +143,10 @@ func TestComponentParam(t *testing.T) {
assert.False(t, params.CommonCfg.LocalRPCEnabled.GetAsBool())
params.Save("common.localRPCEnabled", "true")
assert.True(t, params.CommonCfg.LocalRPCEnabled.GetAsBool())
assert.Equal(t, 60*time.Second, params.CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second))
params.Save("common.sync.taskPoolReleaseTimeoutSeconds", "100")
assert.Equal(t, 100*time.Second, params.CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second))
})
t.Run("test rootCoordConfig", func(t *testing.T) {