mirror of https://github.com/milvus-io/milvus.git
enhance: Use `struct{}` for sync task future result (#32673)
Related to #27675

Use `struct{}` instead of `error` as the sync task future result type to reduce the result size and prevent logic errors: an `error`-typed result forces patterns like `return err, err`, and callers can check the wrong value. Also changes some unused parameters to `_` to suppress lint warnings.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

pull/32688/head
parent 09cd56d44f
commit 2c1e8f4774
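The rationale shows up directly in the test hunks below: with a future typed `conc.Future[error]`, the failure is carried in two positions (`return err, err`), and a caller can await the wrong one; the old `err, _ = f.Await()` in TestTargetUpdated silently dropped the real failure. A minimal standalone sketch of the two shapes, assuming this repo's `pkg/util/conc` package (the `conc.Go`/`Await` signatures are the ones the diff itself uses); everything else is illustrative:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/conc"
)

func main() {
	// Old shape: the result type is error, so the failure is carried twice
	// ("return err, err") and a caller can read the wrong position --
	// "err, _ = f.Await()" compiles but drops the real failure.
	old := conc.Go[error](func() (error, error) {
		err := errors.New("sync failed")
		return err, err
	})
	r, err := old.Await()
	fmt.Println(r, err) // the same error, twice

	// New shape: struct{} is zero-sized, so the future carries no payload
	// and Await's second return value is the only thing to check.
	next := conc.Go(func() (struct{}, error) {
		return struct{}{}, errors.New("sync failed")
	})
	_, err = next.Await()
	fmt.Println(err) // exactly one place to check
}
```

With `struct{}` the compiler steers every caller toward `_, err = f.Await()`, and the zero-sized result costs nothing to store.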
@@ -109,7 +109,7 @@ func (wNode *writeNode) Operate(in []Msg) []Msg {
 }

 func newWriteNode(
-    ctx context.Context,
+    _ context.Context,
     writeBufferManager writebuffer.BufferManager,
     updater statsUpdater,
     config *nodeConfig,
@@ -281,7 +281,7 @@ func (s *scheduler) Import(task Task) []*conc.Future[any] {

 func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error {
     iTask := task.(*ImportTask)
-    syncFutures := make([]*conc.Future[error], 0)
+    syncFutures := make([]*conc.Future[struct{}], 0)
     syncTasks := make([]syncmgr.Task, 0)
     for {
         data, err := reader.Read()
@@ -321,9 +321,9 @@ func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error {
     return nil
 }

-func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[error], []syncmgr.Task, error) {
+func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
     log.Info("start to sync import data", WrapLogFields(task)...)
-    futures := make([]*conc.Future[error], 0)
+    futures := make([]*conc.Future[struct{}], 0)
     syncTasks := make([]syncmgr.Task, 0)
     segmentImportedSizes := make(map[int64]int)
     for channelIdx, datas := range hashedData {
@@ -357,9 +357,9 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
     cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
     s.scheduler.cm = cm

-    s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] {
-        future := conc.Go(func() (error, error) {
-            return nil, nil
+    s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] {
+        future := conc.Go(func() (struct{}, error) {
+            return struct{}{}, nil
         })
         return future
     })
@@ -418,9 +418,9 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
     cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
     s.scheduler.cm = cm

-    s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] {
-        future := conc.Go(func() (error, error) {
-            return errors.New("mock err"), errors.New("mock err")
+    s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] {
+        future := conc.Go(func() (struct{}, error) {
+            return struct{}{}, errors.New("mock err")
         })
         return future
     })
@@ -494,9 +494,9 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
 }

 func (s *SchedulerSuite) TestScheduler_ImportFile() {
-    s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[error] {
-        future := conc.Go(func() (error, error) {
-            return nil, nil
+    s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] {
+        future := conc.Go(func() (struct{}, error) {
+            return struct{}{}, nil
         })
         return future
     })
@@ -65,7 +65,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
     }, nil
 }

-func (node *DataNode) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
+func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
     // TODO(dragondriver): add more metrics
     usedMem := hardware.GetUsedMemoryCount()
     totalMem := hardware.GetMemoryCount()
@@ -132,6 +132,38 @@ func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) RunAndReturn(run func()) *M
     return _c
 }

+// Close provides a mock function with given fields:
+func (_m *MockFlowgraphManager) Close() {
+    _m.Called()
+}
+
+// MockFlowgraphManager_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
+type MockFlowgraphManager_Close_Call struct {
+    *mock.Call
+}
+
+// Close is a helper method to define mock.On call
+func (_e *MockFlowgraphManager_Expecter) Close() *MockFlowgraphManager_Close_Call {
+    return &MockFlowgraphManager_Close_Call{Call: _e.mock.On("Close")}
+}
+
+func (_c *MockFlowgraphManager_Close_Call) Run(run func()) *MockFlowgraphManager_Close_Call {
+    _c.Call.Run(func(args mock.Arguments) {
+        run()
+    })
+    return _c
+}
+
+func (_c *MockFlowgraphManager_Close_Call) Return() *MockFlowgraphManager_Close_Call {
+    _c.Call.Return()
+    return _c
+}
+
+func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgraphManager_Close_Call {
+    _c.Call.Return(run)
+    return _c
+}
+
 // GetCollectionIDs provides a mock function with given fields:
 func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 {
     ret := _m.Called()
@@ -113,15 +113,15 @@ func (_c *MockSyncManager_GetEarliestPosition_Call) RunAndReturn(run func(string
 }

 // SyncData provides a mock function with given fields: ctx, task
-func (_m *MockSyncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] {
+func (_m *MockSyncManager) SyncData(ctx context.Context, task Task) *conc.Future[struct{}] {
     ret := _m.Called(ctx, task)

-    var r0 *conc.Future[error]
-    if rf, ok := ret.Get(0).(func(context.Context, Task) *conc.Future[error]); ok {
+    var r0 *conc.Future[struct{}]
+    if rf, ok := ret.Get(0).(func(context.Context, Task) *conc.Future[struct{}]); ok {
         r0 = rf(ctx, task)
     } else {
         if ret.Get(0) != nil {
-            r0 = ret.Get(0).(*conc.Future[error])
+            r0 = ret.Get(0).(*conc.Future[struct{}])
         }
     }

@@ -147,12 +147,12 @@ func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task
     return _c
 }

-func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[error]) *MockSyncManager_SyncData_Call {
+func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[struct{}]) *MockSyncManager_SyncData_Call {
     _c.Call.Return(_a0)
     return _c
 }

-func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task) *conc.Future[error]) *MockSyncManager_SyncData_Call {
+func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task) *conc.Future[struct{}]) *MockSyncManager_SyncData_Call {
     _c.Call.Return(run)
     return _c
 }
@@ -158,6 +158,7 @@ func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) {
 }

 func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) {
+    log := log.Ctx(ctx)
     blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData)
     if err != nil {
         return nil, err
@@ -44,7 +44,7 @@ type SyncMeta struct {
 // it processes the sync tasks inside and changes the meta.
 type SyncManager interface {
     // SyncData is the method to submit sync task.
-    SyncData(ctx context.Context, task Task) *conc.Future[error]
+    SyncData(ctx context.Context, task Task) *conc.Future[struct{}]
     // GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel.
     GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition)
     // Block allows caller to block tasks of provided segment id.
@@ -104,7 +104,7 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
     }
 }

-func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] {
+func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[struct{}] {
     switch t := task.(type) {
     case *SyncTask:
         t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager)
@@ -118,16 +118,16 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[er
 // safeSubmitTask handles submitting task logic with optimistic target check logic
 // when task returns errTargetSegmentNotMatch error
 // perform refetch then retry logic
-func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[error] {
+func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[struct{}] {
     taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
     mgr.tasks.Insert(taskKey, task)

-    return conc.Go[error](func() (error, error) {
+    return conc.Go(func() (struct{}, error) {
         defer mgr.tasks.Remove(taskKey)
         for {
             targetID, err := task.CalcTargetSegment()
             if err != nil {
-                return err, err
+                return struct{}{}, err
             }
             log.Info("task calculated target segment id",
                 zap.Int64("targetID", targetID),
@@ -142,7 +142,7 @@ func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[error] {
                 log.Info("target updated during submitting", zap.Error(err))
                 continue
             }
-            return err, err
+            return struct{}{}, err
         }
     })
 }
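For context on the two `return struct{}{}, err` sites above: per the function comment, safeSubmitTask submits optimistically and retries when the task reports that its target segment no longer matches (e.g. it moved during compaction). A standalone sketch of that retry shape; `trySubmit` and the attempt counter are hypothetical stand-ins, and only the sentinel's name comes from the comment:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/conc"
)

// errTargetSegmentNotMatch mirrors the sentinel named in the comment above.
var errTargetSegmentNotMatch = errors.New("target segment not match")

// trySubmit is a hypothetical stand-in for the elided submit step; it fails
// with the sentinel once to force a single retry.
func trySubmit(attempt int) error {
	if attempt == 1 {
		return errTargetSegmentNotMatch
	}
	return nil
}

func main() {
	attempts := 0
	f := conc.Go(func() (struct{}, error) {
		for {
			// Optimistically compute the target, then try to submit.
			attempts++
			err := trySubmit(attempts)
			if errors.Is(err, errTargetSegmentNotMatch) {
				continue // target moved underneath us: refetch and retry
			}
			return struct{}{}, err // success (nil) or a terminal error
		}
	})
	_, err := f.Await()
	fmt.Println(attempts, err) // 2 <nil>: one retry, then success
}
```

The `conc.Future[struct{}]` return keeps the loop's contract simple: one terminal `err` from `Await`, no payload to misread.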
@@ -171,9 +171,8 @@ func (s *SyncManagerSuite) TestSubmit() {
     f := manager.SyncData(context.Background(), task)
     s.NotNil(f)

-    r, err := f.Await()
+    _, err = f.Await()
     s.NoError(err)
-    s.NoError(r)
 }

 func (s *SyncManagerSuite) TestCompacted() {
@@ -203,9 +202,8 @@ func (s *SyncManagerSuite) TestCompacted() {
     f := manager.SyncData(context.Background(), task)
     s.NotNil(f)

-    r, err := f.Await()
+    _, err = f.Await()
     s.NoError(err)
-    s.NoError(r)
     s.EqualValues(1001, segmentID.Load())
 }

@@ -321,7 +319,7 @@ func (s *SyncManagerSuite) TestTargetUpdated() {
     task.EXPECT().Run().Return(nil).Once()

     f := manager.SyncData(context.Background(), task)
-    err, _ = f.Await()
+    _, err = f.Await()
     s.NoError(err)
 }

@@ -289,7 +289,7 @@ func (wb *writeBufferBase) cleanupCompactedSegments() {
     }
 }

-func (wb *writeBufferBase) sealSegments(ctx context.Context, segmentIDs []int64) error {
+func (wb *writeBufferBase) sealSegments(_ context.Context, segmentIDs []int64) error {
     // mark segment flushing if segment was growing
     wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Sealed),
         metacache.WithSegmentIDs(segmentIDs...),
@@ -297,9 +297,9 @@ func (wb *writeBufferBase) sealSegments(ctx context.Context, segmentIDs []int64)
     return nil
 }

-func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[error] {
+func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[struct{}] {
     log := log.Ctx(ctx)
-    result := make([]*conc.Future[error], 0, len(segmentIDs))
+    result := make([]*conc.Future[struct{}], 0, len(segmentIDs))
     for _, segmentID := range segmentIDs {
         syncTask, err := wb.getSyncTask(ctx, segmentID)
         if err != nil {
@@ -563,7 +563,7 @@ func (wb *writeBufferBase) Close(drop bool) {
         return
     }

-    var futures []*conc.Future[error]
+    var futures []*conc.Future[struct{}]
     for id := range wb.buffers {
         syncTask, err := wb.getSyncTask(context.Background(), id)
         if err != nil {
@@ -357,8 +357,8 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
     s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true)
     s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
     serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
-    s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(conc.Go[error](func() (error, error) {
-        return nil, nil
+    s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) {
+        return struct{}{}, nil
     }))
     defer func() {
         s.wb.mut.Lock()