enhance: Add ctx in `SyncTask.Run` to be cancellable (#34042)

Related to #33716

This PR add context param in SyncTask.Run execution functions to make it
cancellable from the caller.

This make it possible to cancel task when datanode/data sync service is
beeing shut down.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/34138/head
congqixia 2024-06-25 14:22:04 +08:00 committed by GitHub
parent 93291c0dca
commit 962a5446f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 142 additions and 104 deletions

View File

@ -141,7 +141,7 @@ func (s *L0ImportSuite) TestL0Import() {
s.cm.(*mocks.ChunkManager).EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil)
task.(*syncmgr.SyncTask).WithChunkManager(s.cm)
err := task.Run()
err := task.Run(context.Background())
s.NoError(err)
future := conc.Go(func() (struct{}, error) {

View File

@ -52,7 +52,7 @@ func (s *InsertBinlogIteratorSuite) TestBinlogIterator() {
values := [][]byte{}
for _, b := range blobs {
values = append(values, b.Value[:])
values = append(values, b.Value)
}
s.Run("invalid blobs", func() {
iter, err := NewInsertBinlogIterator([][]byte{}, Int64Field, schemapb.DataType_Int64, nil)

View File

@ -35,7 +35,7 @@ func (s *DeltalogIteratorSuite) TestDeltalogIteratorIntPK() {
dCodec := storage.NewDeleteCodec()
blob, err := dCodec.Serialize(CollectionID, 1, 1, dData)
s.Require().NoError(err)
value := [][]byte{blob.Value[:]}
value := [][]byte{blob.Value}
iter := NewDeltalogIterator(value, &Label{segmentID: 100})
s.NotNil(iter)

View File

@ -1,6 +1,8 @@
package syncmgr
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
@ -12,7 +14,7 @@ type Task interface {
Checkpoint() *msgpb.MsgPosition
StartPosition() *msgpb.MsgPosition
ChannelName() string
Run() error
Run(context.Context) error
HandleError(error)
}
@ -29,12 +31,12 @@ func newKeyLockDispatcher[K comparable](maxParallel int) *keyLockDispatcher[K] {
return dispatcher
}
func (d *keyLockDispatcher[K]) Submit(key K, t Task, callbacks ...func(error) error) *conc.Future[struct{}] {
func (d *keyLockDispatcher[K]) Submit(ctx context.Context, key K, t Task, callbacks ...func(error) error) *conc.Future[struct{}] {
d.keyLock.Lock(key)
return d.workerPool.Submit(func() (struct{}, error) {
defer d.keyLock.Unlock(key)
err := t.Run()
err := t.Run(ctx)
for _, callback := range callbacks {
err = callback(err)

View File

@ -1,6 +1,7 @@
package syncmgr
import (
"context"
"testing"
"time"
@ -41,22 +42,24 @@ type KeyLockDispatcherSuite struct {
}
func (s *KeyLockDispatcherSuite) TestKeyLock() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := newKeyLockDispatcher[int64](2)
done := make(chan struct{})
t1 := NewMockTask(s.T())
t1.EXPECT().Run().Run(func() {
t1.EXPECT().Run(ctx).Run(func(_ context.Context) {
<-done
}).Return(nil)
t2 := NewMockTask(s.T())
t2.EXPECT().Run().Return(nil)
t2.EXPECT().Run(ctx).Return(nil)
sig := atomic.NewBool(false)
d.Submit(1, t1)
d.Submit(ctx, 1, t1)
go func() {
d.Submit(1, t2)
d.Submit(ctx, 1, t2)
sig.Store(true)
}()
@ -69,23 +72,25 @@ func (s *KeyLockDispatcherSuite) TestKeyLock() {
}
func (s *KeyLockDispatcherSuite) TestCap() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := newKeyLockDispatcher[int64](1)
t1 := NewMockTask(s.T())
t2 := NewMockTask(s.T())
done := make(chan struct{})
t1.EXPECT().Run().Run(func() {
t1.EXPECT().Run(ctx).Run(func(_ context.Context) {
<-done
}).Return(nil)
t2.EXPECT().Run().Return(nil)
t2.EXPECT().Run(ctx).Return(nil)
sig := atomic.NewBool(false)
d.Submit(1, t1)
d.Submit(ctx, 1, t1)
go func() {
// defer t2.done()
d.Submit(2, t2)
d.Submit(ctx, 2, t2)
sig.Store(true)
}()

View File

@ -19,9 +19,9 @@ import (
// MetaWriter is the interface for SyncManager to write segment sync meta.
type MetaWriter interface {
UpdateSync(*SyncTask) error
UpdateSync(context.Context, *SyncTask) error
UpdateSyncV2(*SyncTaskV2) error
DropChannel(string) error
DropChannel(context.Context, string) error
}
type brokerMetaWriter struct {
@ -38,7 +38,7 @@ func BrokerMetaWriter(broker broker.Broker, serverID int64, opts ...retry.Option
}
}
func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error {
var (
checkPoints = []*datapb.CheckPoint{}
deltaFieldBinlogs = []*datapb.FieldBinlog{}
@ -102,8 +102,8 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
Channel: pack.channelName,
SegLevel: pack.level,
}
err := retry.Do(context.Background(), func() error {
err := b.broker.SaveBinlogPaths(context.Background(), req)
err := retry.Handle(ctx, func() (bool, error) {
err := b.broker.SaveBinlogPaths(ctx, req)
// Segment not found during stale segment flush. Segment might get compacted already.
// Stop retry and still proceed to the end, ignoring this error.
if !pack.isFlush && errors.Is(err, merr.ErrSegmentNotFound) {
@ -112,19 +112,19 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
log.Warn("failed to SaveBinlogPaths",
zap.Int64("segmentID", pack.segmentID),
zap.Error(err))
return nil
return false, nil
}
// meta error, datanode handles a virtual channel does not belong here
if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrChannelNotFound) {
log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", pack.channelName))
return nil
return false, nil
}
if err != nil {
return err
return !merr.IsCanceledOrTimeout(err), err
}
return nil
return false, nil
}, b.opts...)
if err != nil {
log.Warn("failed to SaveBinlogPaths",
@ -214,15 +214,19 @@ func (b *brokerMetaWriter) UpdateSyncV2(pack *SyncTaskV2) error {
return err
}
func (b *brokerMetaWriter) DropChannel(channelName string) error {
err := retry.Do(context.Background(), func() error {
func (b *brokerMetaWriter) DropChannel(ctx context.Context, channelName string) error {
err := retry.Handle(ctx, func() (bool, error) {
status, err := b.broker.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithSourceID(b.serverID),
),
ChannelName: channelName,
})
return merr.CheckRPCCall(status, err)
err = merr.CheckRPCCall(status, err)
if err != nil {
return !merr.IsCanceledOrTimeout(err), err
}
return false, nil
}, b.opts...)
if err != nil {
log.Warn("failed to DropChannel",

View File

@ -1,6 +1,7 @@
package syncmgr
import (
"context"
"testing"
"github.com/cockroachdb/errors"
@ -34,6 +35,8 @@ func (s *MetaWriterSuite) SetupTest() {
}
func (s *MetaWriterSuite) TestNormalSave() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
@ -44,11 +47,13 @@ func (s *MetaWriterSuite) TestNormalSave() {
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
task := NewSyncTask()
task.WithMetaCache(s.metacache)
err := s.writer.UpdateSync(task)
err := s.writer.UpdateSync(ctx, task)
s.NoError(err)
}
func (s *MetaWriterSuite) TestReturnError() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked"))
bfs := metacache.NewBloomFilterSet()
@ -58,7 +63,7 @@ func (s *MetaWriterSuite) TestReturnError() {
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
task := NewSyncTask()
task.WithMetaCache(s.metacache)
err := s.writer.UpdateSync(task)
err := s.writer.UpdateSync(ctx, task)
s.Error(err)
}

View File

@ -2,7 +2,11 @@
package syncmgr
import mock "github.com/stretchr/testify/mock"
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockMetaWriter is an autogenerated mock type for the MetaWriter type
type MockMetaWriter struct {
@ -17,13 +21,13 @@ func (_m *MockMetaWriter) EXPECT() *MockMetaWriter_Expecter {
return &MockMetaWriter_Expecter{mock: &_m.Mock}
}
// DropChannel provides a mock function with given fields: _a0
func (_m *MockMetaWriter) DropChannel(_a0 string) error {
ret := _m.Called(_a0)
// DropChannel provides a mock function with given fields: _a0, _a1
func (_m *MockMetaWriter) DropChannel(_a0 context.Context, _a1 string) error {
ret := _m.Called(_a0, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
@ -37,14 +41,15 @@ type MockMetaWriter_DropChannel_Call struct {
}
// DropChannel is a helper method to define mock.On call
// - _a0 string
func (_e *MockMetaWriter_Expecter) DropChannel(_a0 interface{}) *MockMetaWriter_DropChannel_Call {
return &MockMetaWriter_DropChannel_Call{Call: _e.mock.On("DropChannel", _a0)}
// - _a0 context.Context
// - _a1 string
func (_e *MockMetaWriter_Expecter) DropChannel(_a0 interface{}, _a1 interface{}) *MockMetaWriter_DropChannel_Call {
return &MockMetaWriter_DropChannel_Call{Call: _e.mock.On("DropChannel", _a0, _a1)}
}
func (_c *MockMetaWriter_DropChannel_Call) Run(run func(_a0 string)) *MockMetaWriter_DropChannel_Call {
func (_c *MockMetaWriter_DropChannel_Call) Run(run func(_a0 context.Context, _a1 string)) *MockMetaWriter_DropChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
run(args[0].(context.Context), args[1].(string))
})
return _c
}
@ -54,18 +59,18 @@ func (_c *MockMetaWriter_DropChannel_Call) Return(_a0 error) *MockMetaWriter_Dro
return _c
}
func (_c *MockMetaWriter_DropChannel_Call) RunAndReturn(run func(string) error) *MockMetaWriter_DropChannel_Call {
func (_c *MockMetaWriter_DropChannel_Call) RunAndReturn(run func(context.Context, string) error) *MockMetaWriter_DropChannel_Call {
_c.Call.Return(run)
return _c
}
// UpdateSync provides a mock function with given fields: _a0
func (_m *MockMetaWriter) UpdateSync(_a0 *SyncTask) error {
ret := _m.Called(_a0)
// UpdateSync provides a mock function with given fields: _a0, _a1
func (_m *MockMetaWriter) UpdateSync(_a0 context.Context, _a1 *SyncTask) error {
ret := _m.Called(_a0, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(*SyncTask) error); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(context.Context, *SyncTask) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
@ -79,14 +84,15 @@ type MockMetaWriter_UpdateSync_Call struct {
}
// UpdateSync is a helper method to define mock.On call
// - _a0 *SyncTask
func (_e *MockMetaWriter_Expecter) UpdateSync(_a0 interface{}) *MockMetaWriter_UpdateSync_Call {
return &MockMetaWriter_UpdateSync_Call{Call: _e.mock.On("UpdateSync", _a0)}
// - _a0 context.Context
// - _a1 *SyncTask
func (_e *MockMetaWriter_Expecter) UpdateSync(_a0 interface{}, _a1 interface{}) *MockMetaWriter_UpdateSync_Call {
return &MockMetaWriter_UpdateSync_Call{Call: _e.mock.On("UpdateSync", _a0, _a1)}
}
func (_c *MockMetaWriter_UpdateSync_Call) Run(run func(_a0 *SyncTask)) *MockMetaWriter_UpdateSync_Call {
func (_c *MockMetaWriter_UpdateSync_Call) Run(run func(_a0 context.Context, _a1 *SyncTask)) *MockMetaWriter_UpdateSync_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*SyncTask))
run(args[0].(context.Context), args[1].(*SyncTask))
})
return _c
}
@ -96,7 +102,7 @@ func (_c *MockMetaWriter_UpdateSync_Call) Return(_a0 error) *MockMetaWriter_Upda
return _c
}
func (_c *MockMetaWriter_UpdateSync_Call) RunAndReturn(run func(*SyncTask) error) *MockMetaWriter_UpdateSync_Call {
func (_c *MockMetaWriter_UpdateSync_Call) RunAndReturn(run func(context.Context, *SyncTask) error) *MockMetaWriter_UpdateSync_Call {
_c.Call.Return(run)
return _c
}

View File

@ -3,6 +3,8 @@
package syncmgr
import (
context "context"
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
mock "github.com/stretchr/testify/mock"
)
@ -137,13 +139,13 @@ func (_c *MockTask_HandleError_Call) RunAndReturn(run func(error)) *MockTask_Han
return _c
}
// Run provides a mock function with given fields:
func (_m *MockTask) Run() error {
ret := _m.Called()
// Run provides a mock function with given fields: _a0
func (_m *MockTask) Run(_a0 context.Context) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
@ -157,13 +159,14 @@ type MockTask_Run_Call struct {
}
// Run is a helper method to define mock.On call
func (_e *MockTask_Expecter) Run() *MockTask_Run_Call {
return &MockTask_Run_Call{Call: _e.mock.On("Run")}
// - _a0 context.Context
func (_e *MockTask_Expecter) Run(_a0 interface{}) *MockTask_Run_Call {
return &MockTask_Run_Call{Call: _e.mock.On("Run", _a0)}
}
func (_c *MockTask_Run_Call) Run(run func()) *MockTask_Run_Call {
func (_c *MockTask_Run_Call) Run(run func(_a0 context.Context)) *MockTask_Run_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
run(args[0].(context.Context))
})
return _c
}
@ -173,7 +176,7 @@ func (_c *MockTask_Run_Call) Return(_a0 error) *MockTask_Run_Call {
return _c
}
func (_c *MockTask_Run_Call) RunAndReturn(run func() error) *MockTask_Run_Call {
func (_c *MockTask_Run_Call) RunAndReturn(run func(context.Context) error) *MockTask_Run_Call {
_c.Call.Return(run)
return _c
}

View File

@ -105,19 +105,19 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...fu
t.WithAllocator(mgr.allocator)
}
return mgr.safeSubmitTask(task, callbacks...)
return mgr.safeSubmitTask(ctx, task, callbacks...)
}
// safeSubmitTask submits task to SyncManager
func (mgr *syncManager) safeSubmitTask(task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
func (mgr *syncManager) safeSubmitTask(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
mgr.tasks.Insert(taskKey, task)
key := task.SegmentID()
return mgr.submit(key, task, callbacks...)
return mgr.submit(ctx, key, task, callbacks...)
}
func (mgr *syncManager) submit(key int64, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
handler := func(err error) error {
if err == nil {
return nil
@ -127,5 +127,5 @@ func (mgr *syncManager) submit(key int64, task Task, callbacks ...func(error) er
}
callbacks = append([]func(error) error{handler}, callbacks...)
log.Info("sync mgr sumbit task with key", zap.Int64("key", key))
return mgr.Submit(key, task, callbacks...)
return mgr.Submit(ctx, key, task, callbacks...)
}

View File

@ -268,7 +268,7 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
task := NewMockTask(s.T())
task.EXPECT().SegmentID().Return(1000)
task.EXPECT().Checkpoint().Return(&msgpb.MsgPosition{})
task.EXPECT().Run().Return(merr.WrapErrServiceInternal("mocked")).Once()
task.EXPECT().Run(mock.Anything).Return(merr.WrapErrServiceInternal("mocked")).Once()
task.EXPECT().HandleError(mock.Anything)
f := manager.SyncData(context.Background(), task)
@ -283,7 +283,7 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() {
task := NewMockTask(s.T())
task.EXPECT().SegmentID().Return(1000)
task.EXPECT().Checkpoint().Return(&msgpb.MsgPosition{})
task.EXPECT().Run().Return(errors.New("mock err")).Once()
task.EXPECT().Run(mock.Anything).Return(errors.New("mock err")).Once()
task.EXPECT().HandleError(mock.Anything)
f := manager.SyncData(context.Background(), task)

View File

@ -113,7 +113,7 @@ func (t *SyncTask) HandleError(err error) {
}
}
func (t *SyncTask) Run() (err error) {
func (t *SyncTask) Run(ctx context.Context) (err error) {
t.tr = timerecord.NewTimeRecorder("syncTask")
log := t.getLogger()
@ -145,7 +145,7 @@ func (t *SyncTask) Run() (err error) {
t.processStatsBlob()
t.processDeltaBlob()
err = t.writeLogs()
err = t.writeLogs(ctx)
if err != nil {
log.Warn("failed to save serialized data into storage", zap.Error(err))
return err
@ -164,7 +164,7 @@ func (t *SyncTask) Run() (err error) {
metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds()))
if t.metaWriter != nil {
err = t.writeMeta()
err = t.writeMeta(ctx)
if err != nil {
log.Warn("failed to save serialized data into storage", zap.Error(err))
return err
@ -312,15 +312,19 @@ func (t *SyncTask) appendDeltalog(deltalog *datapb.Binlog) {
}
// writeLogs writes log files (binlog/deltalog/statslog) into storage via chunkManger.
func (t *SyncTask) writeLogs() error {
return retry.Do(context.Background(), func() error {
return t.chunkManager.MultiWrite(context.Background(), t.segmentData)
func (t *SyncTask) writeLogs(ctx context.Context) error {
return retry.Handle(ctx, func() (bool, error) {
err := t.chunkManager.MultiWrite(ctx, t.segmentData)
if err != nil {
return !merr.IsCanceledOrTimeout(err), err
}
return false, nil
}, t.writeRetryOpts...)
}
// writeMeta updates segments via meta writer in option.
func (t *SyncTask) writeMeta() error {
return t.metaWriter.UpdateSync(t)
func (t *SyncTask) writeMeta(ctx context.Context) error {
return t.metaWriter.UpdateSync(ctx, t)
}
func (t *SyncTask) SegmentID() int64 {

View File

@ -17,6 +17,7 @@
package syncmgr
import (
"context"
"fmt"
"math/rand"
"testing"
@ -164,6 +165,8 @@ func (s *SyncTaskSuite) getSuiteSyncTask() *SyncTask {
}
func (s *SyncTaskSuite) TestRunNormal() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{
@ -198,7 +201,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
Timestamp: 100,
})
err := task.Run()
err := task.Run(ctx)
s.NoError(err)
})
@ -216,7 +219,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
Value: []byte("test_data"),
}
err := task.Run()
err := task.Run(ctx)
s.NoError(err)
})
@ -239,7 +242,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
Value: []byte("test_data"),
}
err := task.Run()
err := task.Run(ctx)
s.NoError(err)
})
@ -259,12 +262,14 @@ func (s *SyncTaskSuite) TestRunNormal() {
Value: []byte("test_data"),
}
err := task.Run()
err := task.Run(ctx)
s.NoError(err)
})
}
func (s *SyncTaskSuite) TestRunL0Segment() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0}, bfs)
@ -287,19 +292,21 @@ func (s *SyncTaskSuite) TestRunL0Segment() {
})
task.WithFlush()
err := task.Run()
err := task.Run(ctx)
s.NoError(err)
})
}
func (s *SyncTaskSuite) TestRunError() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.Run("segment_not_found", func() {
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false)
flag := false
handler := func(_ error) { flag = true }
task := s.getSuiteSyncTask().WithFailureCallback(handler)
err := task.Run()
err := task.Run(ctx)
s.Error(err)
s.True(flag)
@ -318,7 +325,7 @@ func (s *SyncTaskSuite) TestRunError() {
task := s.getSuiteSyncTask()
task.allocator = mockAllocator
err := task.Run()
err := task.Run(ctx)
s.Error(err)
})
@ -334,7 +341,7 @@ func (s *SyncTaskSuite) TestRunError() {
Timestamp: 100,
})
err := task.Run()
err := task.Run(ctx)
s.Error(err)
})
@ -352,7 +359,7 @@ func (s *SyncTaskSuite) TestRunError() {
task.WithWriteRetryOptions(retry.Attempts(1))
err := task.Run()
err := task.Run(ctx)
s.Error(err)
s.True(flag)

View File

@ -65,7 +65,7 @@ func (t *SyncTaskV2) handleError(err error) {
}
}
func (t *SyncTaskV2) Run() error {
func (t *SyncTaskV2) Run(ctx context.Context) error {
log := t.getLogger()
var err error

View File

@ -196,6 +196,8 @@ func (s *SyncTaskSuiteV2) getSuiteSyncTask() *SyncTaskV2 {
}
func (s *SyncTaskSuiteV2) TestRunNormal() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{
@ -229,7 +231,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() {
Timestamp: 100,
})
err := task.Run()
err := task.Run(ctx)
s.NoError(err)
})
@ -243,7 +245,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() {
Timestamp: 100,
})
err := task.Run()
err := task.Run(ctx)
s.NoError(err)
})
}

View File

@ -242,7 +242,7 @@ func (m *bufferManager) RemoveChannel(channel string) {
return
}
buf.Close(false)
buf.Close(context.Background(), false)
}
// DropChannel removes channel WriteBuffer and process `DropChannel`
@ -258,7 +258,7 @@ func (m *bufferManager) DropChannel(channel string) {
return
}
buf.Close(true)
buf.Close(context.Background(), true)
}
func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) {

View File

@ -240,7 +240,6 @@ func (s *ManagerSuite) TestMemoryCheck() {
}
return int64(float64(memoryLimit) * 0.6)
})
//.Return(int64(float64(memoryLimit) * 0.6))
wb.EXPECT().EvictBuffer(mock.Anything).Run(func(polices ...SyncPolicy) {
select {
case signal <- struct{}{}:

View File

@ -69,9 +69,9 @@ func (_c *MockWriteBuffer_BufferData_Call) RunAndReturn(run func([]*msgstream.In
return _c
}
// Close provides a mock function with given fields: drop
func (_m *MockWriteBuffer) Close(drop bool) {
_m.Called(drop)
// Close provides a mock function with given fields: ctx, drop
func (_m *MockWriteBuffer) Close(ctx context.Context, drop bool) {
_m.Called(ctx, drop)
}
// MockWriteBuffer_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
@ -80,14 +80,15 @@ type MockWriteBuffer_Close_Call struct {
}
// Close is a helper method to define mock.On call
// - ctx context.Context
// - drop bool
func (_e *MockWriteBuffer_Expecter) Close(drop interface{}) *MockWriteBuffer_Close_Call {
return &MockWriteBuffer_Close_Call{Call: _e.mock.On("Close", drop)}
func (_e *MockWriteBuffer_Expecter) Close(ctx interface{}, drop interface{}) *MockWriteBuffer_Close_Call {
return &MockWriteBuffer_Close_Call{Call: _e.mock.On("Close", ctx, drop)}
}
func (_c *MockWriteBuffer_Close_Call) Run(run func(drop bool)) *MockWriteBuffer_Close_Call {
func (_c *MockWriteBuffer_Close_Call) Run(run func(ctx context.Context, drop bool)) *MockWriteBuffer_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(bool))
run(args[0].(context.Context), args[1].(bool))
})
return _c
}
@ -97,7 +98,7 @@ func (_c *MockWriteBuffer_Close_Call) Return() *MockWriteBuffer_Close_Call {
return _c
}
func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(bool)) *MockWriteBuffer_Close_Call {
func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(context.Context, bool)) *MockWriteBuffer_Close_Call {
_c.Call.Return(run)
return _c
}

View File

@ -55,7 +55,7 @@ type WriteBuffer interface {
// EvictBuffer evicts buffer to sync manager which match provided sync policies.
EvictBuffer(policies ...SyncPolicy)
// Close is the method to close and sink current buffer data.
Close(drop bool)
Close(ctx context.Context, drop bool)
}
type checkpointCandidate struct {
@ -631,7 +631,7 @@ func (wb *writeBufferBase) getEstBatchSize() uint {
return uint(sizeLimit / int64(wb.estSizePerRecord))
}
func (wb *writeBufferBase) Close(drop bool) {
func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
log := wb.logger
// sink all data and call Drop for meta writer
wb.mut.Lock()
@ -642,7 +642,7 @@ func (wb *writeBufferBase) Close(drop bool) {
var futures []*conc.Future[struct{}]
for id := range wb.buffers {
syncTask, err := wb.getSyncTask(context.Background(), id)
syncTask, err := wb.getSyncTask(ctx, id)
if err != nil {
// TODO
continue
@ -654,7 +654,7 @@ func (wb *writeBufferBase) Close(drop bool) {
t.WithDrop()
}
f := wb.syncMgr.SyncData(context.Background(), syncTask, func(err error) error {
f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if err != nil {
return err
}
@ -672,7 +672,7 @@ func (wb *writeBufferBase) Close(drop bool) {
// TODO change to remove channel in the future
panic(err)
}
err = wb.metaWriter.DropChannel(wb.channelName)
err = wb.metaWriter.DropChannel(ctx, wb.channelName)
if err != nil {
log.Error("failed to drop channel", zap.Error(err))
// TODO change to remove channel in the future