diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 4fdb0f8a5b..e4f43dc3c6 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -478,9 +478,8 @@ dataNode: # suggest to set it bigger on large collection numbers to avoid blocking workPoolSize: -1 # specify the size of global work pool for channel checkpoint updating - # if this parameter <= 0, will set it as 1000 - # suggest to set it bigger on large collection numbers to avoid blocking - updateChannelCheckpointMaxParallel: 1000 + # if this parameter <= 0, will set it as 10 + updateChannelCheckpointMaxParallel: 10 # Configures the system log output. log: diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 51730eb4d8..cf1c18a855 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -343,7 +343,7 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta catalog: catalog, collections: nil, segments: nil, - channelCPs: nil, + channelCPs: newChannelCps(), chunkManager: nil, indexMeta: &indexMeta{ catalog: catalog, @@ -787,13 +787,14 @@ func TestGarbageCollector_clearETCD(t *testing.T) { mock.Anything, ).Return(nil) + channelCPs := newChannelCps() + channelCPs.checkpoints["dmlChannel"] = &msgpb.MsgPosition{ + Timestamp: 1000, + } + m := &meta{ - catalog: catalog, - channelCPs: map[string]*msgpb.MsgPosition{ - "dmlChannel": { - Timestamp: 1000, - }, - }, + catalog: catalog, + channelCPs: channelCPs, segments: &SegmentsInfo{ segments: map[UniqueID]*SegmentInfo{ segID: { diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index ddde6832f0..7291132cf7 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -53,14 +53,25 @@ type meta struct { sync.RWMutex ctx context.Context catalog metastore.DataCoordCatalog - collections map[UniqueID]*collectionInfo // collection id to collection info - segments *SegmentsInfo // segment id to segment info - channelCPs map[string]*msgpb.MsgPosition // vChannel -> channel checkpoint/see position + collections map[UniqueID]*collectionInfo // collection id to collection info + segments *SegmentsInfo // segment id to segment info + channelCPs *channelCPs // vChannel -> channel checkpoint/see position chunkManager storage.ChunkManager indexMeta *indexMeta } +type channelCPs struct { + sync.RWMutex + checkpoints map[string]*msgpb.MsgPosition +} + +func newChannelCps() *channelCPs { + return &channelCPs{ + checkpoints: make(map[string]*msgpb.MsgPosition), + } +} + // A local cache of segment metric update. Must call commit() to take effect. type segMetricMutation struct { stateChange map[string]int // segment state -> state change count (to increase or decrease). @@ -89,7 +100,7 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag catalog: catalog, collections: make(map[UniqueID]*collectionInfo), segments: NewSegmentsInfo(), - channelCPs: make(map[string]*msgpb.MsgPosition), + channelCPs: newChannelCps(), chunkManager: chunkManager, indexMeta: indexMeta, } @@ -148,7 +159,7 @@ func (m *meta) reloadFromKV() error { for vChannel, pos := range channelCPs { // for 2.2.2 issue https://github.com/milvus-io/milvus/issues/22181 pos.ChannelName = vChannel - m.channelCPs[vChannel] = pos + m.channelCPs.checkpoints[vChannel] = pos } log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan())) return nil @@ -1298,16 +1309,16 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition) return fmt.Errorf("channelCP is nil, vChannel=%s", vChannel) } - m.Lock() - defer m.Unlock() + m.channelCPs.Lock() + defer m.channelCPs.Unlock() - oldPosition, ok := m.channelCPs[vChannel] + oldPosition, ok := m.channelCPs.checkpoints[vChannel] if !ok || oldPosition.Timestamp < pos.Timestamp { err := m.catalog.SaveChannelCheckpoint(m.ctx, vChannel, pos) if err != nil { return err } - m.channelCPs[vChannel] = pos + m.channelCPs.checkpoints[vChannel] = pos ts, _ := tsoutil.ParseTS(pos.Timestamp) log.Info("UpdateChannelCheckpoint done", zap.String("vChannel", vChannel), @@ -1320,23 +1331,53 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition) return nil } +// UpdateChannelCheckpoints updates and saves channel checkpoints. +func (m *meta) UpdateChannelCheckpoints(positions []*msgpb.MsgPosition) error { + m.channelCPs.Lock() + defer m.channelCPs.Unlock() + toUpdates := lo.Filter(positions, func(pos *msgpb.MsgPosition, _ int) bool { + if pos == nil || pos.GetMsgID() == nil || pos.GetChannelName() == "" { + log.Warn("illegal channel cp", zap.Any("pos", pos)) + return false + } + vChannel := pos.GetChannelName() + oldPosition, ok := m.channelCPs.checkpoints[vChannel] + return !ok || oldPosition.Timestamp < pos.Timestamp + }) + err := m.catalog.SaveChannelCheckpoints(m.ctx, toUpdates) + if err != nil { + return err + } + for _, pos := range toUpdates { + channel := pos.GetChannelName() + m.channelCPs.checkpoints[channel] = pos + log.Info("UpdateChannelCheckpoint done", zap.String("channel", channel), + zap.Uint64("ts", pos.GetTimestamp()), + zap.Time("time", tsoutil.PhysicalTime(pos.GetTimestamp()))) + ts, _ := tsoutil.ParseTS(pos.Timestamp) + metrics.DataCoordCheckpointUnixSeconds.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel).Set(float64(ts.Unix())) + } + return nil +} + func (m *meta) GetChannelCheckpoint(vChannel string) *msgpb.MsgPosition { - m.RLock() - defer m.RUnlock() - if m.channelCPs[vChannel] == nil { + m.channelCPs.RLock() + defer m.channelCPs.RUnlock() + cp, ok := m.channelCPs.checkpoints[vChannel] + if !ok { return nil } - return proto.Clone(m.channelCPs[vChannel]).(*msgpb.MsgPosition) + return proto.Clone(cp).(*msgpb.MsgPosition) } func (m *meta) DropChannelCheckpoint(vChannel string) error { - m.Lock() - defer m.Unlock() + m.channelCPs.Lock() + defer m.channelCPs.Unlock() err := m.catalog.DropChannelCheckpoint(m.ctx, vChannel) if err != nil { return err } - delete(m.channelCPs, vChannel) + delete(m.channelCPs.checkpoints, vChannel) log.Debug("DropChannelCheckpoint done", zap.String("vChannel", vChannel)) return nil } diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 56ff181420..577f369895 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" mockkv "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" + mocks2 "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -46,12 +47,12 @@ import ( type MetaReloadSuite struct { testutils.PromMetricsSuite - catalog *mocks.DataCoordCatalog + catalog *mocks2.DataCoordCatalog meta *meta } func (suite *MetaReloadSuite) SetupTest() { - catalog := mocks.NewDataCoordCatalog(suite.T()) + catalog := mocks2.NewDataCoordCatalog(suite.T()) suite.catalog = catalog } @@ -952,6 +953,22 @@ func TestChannelCP(t *testing.T) { assert.NoError(t, err) }) + t.Run("UpdateChannelCheckpoints", func(t *testing.T) { + meta, err := newMemoryMeta() + assert.NoError(t, err) + assert.Equal(t, 0, len(meta.channelCPs.checkpoints)) + + err = meta.UpdateChannelCheckpoints(nil) + assert.NoError(t, err) + assert.Equal(t, 0, len(meta.channelCPs.checkpoints)) + + err = meta.UpdateChannelCheckpoints([]*msgpb.MsgPosition{pos, { + ChannelName: "", + }}) + assert.NoError(t, err) + assert.Equal(t, 1, len(meta.channelCPs.checkpoints)) + }) + t.Run("GetChannelCheckpoint", func(t *testing.T) { meta, err := newMemoryMeta() assert.NoError(t, err) @@ -983,7 +1000,7 @@ func TestChannelCP(t *testing.T) { func Test_meta_GcConfirm(t *testing.T) { m := &meta{} - catalog := mocks.NewDataCoordCatalog(t) + catalog := mocks2.NewDataCoordCatalog(t) m.catalog = catalog catalog.On("GcConfirm", diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index f2e3a017d2..7ae22303ca 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -44,6 +44,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + mocks2 "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -389,7 +390,7 @@ func TestFlushForImport(t *testing.T) { allocation, err = svr.segmentManager.allocSegmentForImport( context.TODO(), 0, 1, "ch-1", 1, 1) assert.NoError(t, err) - catalog := mocks.NewDataCoordCatalog(t) + catalog := mocks2.NewDataCoordCatalog(t) catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(errors.New("mock err")) svr.meta.catalog = catalog req.SegmentIDs = []UniqueID{allocation.SegmentID} @@ -3714,10 +3715,10 @@ func TestGetFlushAllState(t *testing.T) { }, nil).Maybe() } - svr.meta.channelCPs = make(map[string]*msgpb.MsgPosition) + svr.meta.channelCPs = newChannelCps() for i, ts := range test.ChannelCPs { channel := vchannels[i] - svr.meta.channelCPs[channel] = &msgpb.MsgPosition{ + svr.meta.channelCPs.checkpoints[channel] = &msgpb.MsgPosition{ ChannelName: channel, Timestamp: ts, } @@ -3790,11 +3791,11 @@ func TestGetFlushAllStateWithDB(t *testing.T) { CollectionName: collectionName, }, nil).Maybe() - svr.meta.channelCPs = make(map[string]*msgpb.MsgPosition) + svr.meta.channelCPs = newChannelCps() channelCPs := []Timestamp{100, 200} for i, ts := range channelCPs { channel := vchannels[i] - svr.meta.channelCPs[channel] = &msgpb.MsgPosition{ + svr.meta.channelCPs.checkpoints[channel] = &msgpb.MsgPosition{ ChannelName: channel, Timestamp: ts, } @@ -4189,10 +4190,21 @@ func TestDataCoordServer_UpdateChannelCheckpoint(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.ErrorCode) - req.Position = nil + req = &datapb.UpdateChannelCheckpointRequest{ + Base: &commonpb.MsgBase{ + SourceID: paramtable.GetNodeID(), + }, + VChannel: mockVChannel, + ChannelCheckpoints: []*msgpb.MsgPosition{{ + ChannelName: mockPChannel, + Timestamp: 1000, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + }}, + } + resp, err = svr.UpdateChannelCheckpoint(context.TODO(), req) assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 029f6f4925..b06f2969f9 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1381,9 +1381,19 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update return merr.Status(err), nil } - err := s.meta.UpdateChannelCheckpoint(req.GetVChannel(), req.GetPosition()) + // For compatibility with old client + if req.GetVChannel() != "" && req.GetPosition() != nil { + err := s.meta.UpdateChannelCheckpoint(req.GetVChannel(), req.GetPosition()) + if err != nil { + log.Warn("failed to UpdateChannelCheckpoint", zap.String("vChannel", req.GetVChannel()), zap.Error(err)) + return merr.Status(err), nil + } + return merr.Success(), nil + } + + err := s.meta.UpdateChannelCheckpoints(req.GetChannelCheckpoints()) if err != nil { - log.Warn("failed to UpdateChannelCheckpoint", zap.String("vChannel", req.GetVChannel()), zap.Error(err)) + log.Warn("failed to update channel checkpoint", zap.Error(err)) return merr.Status(err), nil } diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index e398b213d5..e793de9359 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -11,8 +11,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/metastore/model" - "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" diff --git a/internal/datanode/broker/broker.go b/internal/datanode/broker/broker.go index 234d62dd7b..40db912519 100644 --- a/internal/datanode/broker/broker.go +++ b/internal/datanode/broker/broker.go @@ -46,7 +46,7 @@ type DataCoord interface { AssignSegmentID(ctx context.Context, reqs ...*datapb.SegmentIDRequest) ([]typeutil.UniqueID, error) ReportTimeTick(ctx context.Context, msgs []*msgpb.DataNodeTtMsg) error GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error) - UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error + UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error diff --git a/internal/datanode/broker/datacoord.go b/internal/datanode/broker/datacoord.go index 4290b543e3..df4c23fd87 100644 --- a/internal/datanode/broker/datacoord.go +++ b/internal/datanode/broker/datacoord.go @@ -2,6 +2,7 @@ package broker import ( "context" + "time" "github.com/samber/lo" "go.uber.org/zap" @@ -82,24 +83,24 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, segmentIDs []int6 return infoResp.Infos, nil } -func (dc *dataCoordBroker) UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error { - channelCPTs, _ := tsoutil.ParseTS(cp.GetTimestamp()) - log := log.Ctx(ctx).With( - zap.String("channelName", channelName), - zap.Time("channelCheckpointTime", channelCPTs), - ) - +func (dc *dataCoordBroker) UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error { req := &datapb.UpdateChannelCheckpointRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithSourceID(paramtable.GetNodeID()), ), - VChannel: channelName, - Position: cp, + ChannelCheckpoints: channelCPs, } resp, err := dc.client.UpdateChannelCheckpoint(ctx, req) - if err := merr.CheckRPCCall(resp, err); err != nil { - log.Warn("failed to update channel checkpoint", zap.Error(err)) + if err = merr.CheckRPCCall(resp, err); err != nil { + channels := lo.Map(channelCPs, func(pos *msgpb.MsgPosition, _ int) string { + return pos.GetChannelName() + }) + channelTimes := lo.Map(channelCPs, func(pos *msgpb.MsgPosition, _ int) time.Time { + return tsoutil.PhysicalTime(pos.GetTimestamp()) + }) + log.Warn("failed to update channel checkpoint", zap.Strings("channelNames", channels), + zap.Times("channelCheckpointTimes", channelTimes), zap.Error(err)) return err } return nil diff --git a/internal/datanode/broker/datacoord_test.go b/internal/datanode/broker/datacoord_test.go index b4564aba38..e6e2e2644c 100644 --- a/internal/datanode/broker/datacoord_test.go +++ b/internal/datanode/broker/datacoord_test.go @@ -178,15 +178,14 @@ func (s *dataCoordSuite) TestUpdateChannelCheckpoint() { s.Run("normal_case", func() { s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything). Run(func(_ context.Context, req *datapb.UpdateChannelCheckpointRequest, _ ...grpc.CallOption) { - s.Equal(channelName, req.GetVChannel()) - cp := req.GetPosition() + cp := req.GetChannelCheckpoints()[0] s.Equal(checkpoint.MsgID, cp.GetMsgID()) s.Equal(checkpoint.ChannelName, cp.GetChannelName()) s.Equal(checkpoint.Timestamp, cp.GetTimestamp()) }). Return(merr.Status(nil), nil) - err := s.broker.UpdateChannelCheckpoint(ctx, channelName, checkpoint) + err := s.broker.UpdateChannelCheckpoint(ctx, []*msgpb.MsgPosition{checkpoint}) s.NoError(err) s.resetMock() }) @@ -195,7 +194,7 @@ func (s *dataCoordSuite) TestUpdateChannelCheckpoint() { s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything). Return(nil, errors.New("mock")) - err := s.broker.UpdateChannelCheckpoint(ctx, channelName, checkpoint) + err := s.broker.UpdateChannelCheckpoint(ctx, []*msgpb.MsgPosition{checkpoint}) s.Error(err) s.resetMock() }) @@ -204,7 +203,7 @@ func (s *dataCoordSuite) TestUpdateChannelCheckpoint() { s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything). Return(merr.Status(errors.New("mock")), nil) - err := s.broker.UpdateChannelCheckpoint(ctx, channelName, checkpoint) + err := s.broker.UpdateChannelCheckpoint(ctx, []*msgpb.MsgPosition{checkpoint}) s.Error(err) s.resetMock() }) diff --git a/internal/datanode/broker/mock_broker.go b/internal/datanode/broker/mock_broker.go index 894380acdd..7865756463 100644 --- a/internal/datanode/broker/mock_broker.go +++ b/internal/datanode/broker/mock_broker.go @@ -65,8 +65,8 @@ type MockBroker_AllocTimestamp_Call struct { } // AllocTimestamp is a helper method to define mock.On call -// - ctx context.Context -// - num uint32 +// - ctx context.Context +// - num uint32 func (_e *MockBroker_Expecter) AllocTimestamp(ctx interface{}, num interface{}) *MockBroker_AllocTimestamp_Call { return &MockBroker_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, num)} } @@ -127,8 +127,8 @@ type MockBroker_AssignSegmentID_Call struct { } // AssignSegmentID is a helper method to define mock.On call -// - ctx context.Context -// - reqs ...*datapb.SegmentIDRequest +// - ctx context.Context +// - reqs ...*datapb.SegmentIDRequest func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call { return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID", append([]interface{}{ctx}, reqs...)...)} @@ -189,9 +189,9 @@ type MockBroker_DescribeCollection_Call struct { } // DescribeCollection is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - ts uint64 +// - ctx context.Context +// - collectionID int64 +// - ts uint64 func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}, ts interface{}) *MockBroker_DescribeCollection_Call { return &MockBroker_DescribeCollection_Call{Call: _e.mock.On("DescribeCollection", ctx, collectionID, ts)} } @@ -245,8 +245,8 @@ type MockBroker_DropVirtualChannel_Call struct { } // DropVirtualChannel is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.DropVirtualChannelRequest +// - ctx context.Context +// - req *datapb.DropVirtualChannelRequest func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call { return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)} } @@ -300,8 +300,8 @@ type MockBroker_GetSegmentInfo_Call struct { } // GetSegmentInfo is a helper method to define mock.On call -// - ctx context.Context -// - segmentIDs []int64 +// - ctx context.Context +// - segmentIDs []int64 func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call { return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)} } @@ -343,8 +343,8 @@ type MockBroker_ReportImport_Call struct { } // ReportImport is a helper method to define mock.On call -// - ctx context.Context -// - req *rootcoordpb.ImportResult +// - ctx context.Context +// - req *rootcoordpb.ImportResult func (_e *MockBroker_Expecter) ReportImport(ctx interface{}, req interface{}) *MockBroker_ReportImport_Call { return &MockBroker_ReportImport_Call{Call: _e.mock.On("ReportImport", ctx, req)} } @@ -386,8 +386,8 @@ type MockBroker_ReportTimeTick_Call struct { } // ReportTimeTick is a helper method to define mock.On call -// - ctx context.Context -// - msgs []*msgpb.DataNodeTtMsg +// - ctx context.Context +// - msgs []*msgpb.DataNodeTtMsg func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call { return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)} } @@ -429,8 +429,8 @@ type MockBroker_SaveBinlogPaths_Call struct { } // SaveBinlogPaths is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.SaveBinlogPathsRequest +// - ctx context.Context +// - req *datapb.SaveBinlogPathsRequest func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call { return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)} } @@ -472,8 +472,8 @@ type MockBroker_SaveImportSegment_Call struct { } // SaveImportSegment is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.SaveImportSegmentRequest +// - ctx context.Context +// - req *datapb.SaveImportSegmentRequest func (_e *MockBroker_Expecter) SaveImportSegment(ctx interface{}, req interface{}) *MockBroker_SaveImportSegment_Call { return &MockBroker_SaveImportSegment_Call{Call: _e.mock.On("SaveImportSegment", ctx, req)} } @@ -527,9 +527,9 @@ type MockBroker_ShowPartitions_Call struct { } // ShowPartitions is a helper method to define mock.On call -// - ctx context.Context -// - dbName string -// - collectionName string +// - ctx context.Context +// - dbName string +// - collectionName string func (_e *MockBroker_Expecter) ShowPartitions(ctx interface{}, dbName interface{}, collectionName interface{}) *MockBroker_ShowPartitions_Call { return &MockBroker_ShowPartitions_Call{Call: _e.mock.On("ShowPartitions", ctx, dbName, collectionName)} } @@ -551,13 +551,13 @@ func (_c *MockBroker_ShowPartitions_Call) RunAndReturn(run func(context.Context, return _c } -// UpdateChannelCheckpoint provides a mock function with given fields: ctx, channelName, cp -func (_m *MockBroker) UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error { - ret := _m.Called(ctx, channelName, cp) +// UpdateChannelCheckpoint provides a mock function with given fields: ctx, channelCPs +func (_m *MockBroker) UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error { + ret := _m.Called(ctx, channelCPs) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, *msgpb.MsgPosition) error); ok { - r0 = rf(ctx, channelName, cp) + if rf, ok := ret.Get(0).(func(context.Context, []*msgpb.MsgPosition) error); ok { + r0 = rf(ctx, channelCPs) } else { r0 = ret.Error(0) } @@ -571,16 +571,15 @@ type MockBroker_UpdateChannelCheckpoint_Call struct { } // UpdateChannelCheckpoint is a helper method to define mock.On call -// - ctx context.Context -// - channelName string -// - cp *msgpb.MsgPosition -func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelName interface{}, cp interface{}) *MockBroker_UpdateChannelCheckpoint_Call { - return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelName, cp)} +// - ctx context.Context +// - channelCPs []*msgpb.MsgPosition +func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelCPs interface{}) *MockBroker_UpdateChannelCheckpoint_Call { + return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelCPs)} } -func (_c *MockBroker_UpdateChannelCheckpoint_Call) Run(run func(ctx context.Context, channelName string, cp *msgpb.MsgPosition)) *MockBroker_UpdateChannelCheckpoint_Call { +func (_c *MockBroker_UpdateChannelCheckpoint_Call) Run(run func(ctx context.Context, channelCPs []*msgpb.MsgPosition)) *MockBroker_UpdateChannelCheckpoint_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(*msgpb.MsgPosition)) + run(args[0].(context.Context), args[1].([]*msgpb.MsgPosition)) }) return _c } @@ -590,7 +589,7 @@ func (_c *MockBroker_UpdateChannelCheckpoint_Call) Return(_a0 error) *MockBroker return _c } -func (_c *MockBroker_UpdateChannelCheckpoint_Call) RunAndReturn(run func(context.Context, string, *msgpb.MsgPosition) error) *MockBroker_UpdateChannelCheckpoint_Call { +func (_c *MockBroker_UpdateChannelCheckpoint_Call) RunAndReturn(run func(context.Context, []*msgpb.MsgPosition) error) *MockBroker_UpdateChannelCheckpoint_Call { _c.Call.Return(run) return _c } @@ -615,8 +614,8 @@ type MockBroker_UpdateSegmentStatistics_Call struct { } // UpdateSegmentStatistics is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.UpdateSegmentStatisticsRequest +// - ctx context.Context +// - req *datapb.UpdateSegmentStatisticsRequest func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call { return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)} } diff --git a/internal/datanode/channel_checkpoint_updater.go b/internal/datanode/channel_checkpoint_updater.go index 922383ec89..9f272b2ad0 100644 --- a/internal/datanode/channel_checkpoint_updater.go +++ b/internal/datanode/channel_checkpoint_updater.go @@ -18,49 +18,139 @@ package datanode import ( "context" + "sync" "time" + "github.com/samber/lo" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( - updateChanCPInterval = 1 * time.Minute - updateChanCPTimeout = 10 * time.Second - defaultUpdateChanCPMaxParallel = 1000 + defaultUpdateChanCPMaxParallel = 10 ) +type channelCPUpdateTask struct { + pos *msgpb.MsgPosition + callback func() +} + type channelCheckpointUpdater struct { - dn *DataNode - workerPool *conc.Pool[any] + dn *DataNode + + mu sync.RWMutex + tasks map[string]*channelCPUpdateTask + + closeCh chan struct{} + closeOnce sync.Once } func newChannelCheckpointUpdater(dn *DataNode) *channelCheckpointUpdater { + return &channelCheckpointUpdater{ + dn: dn, + tasks: make(map[string]*channelCPUpdateTask), + closeCh: make(chan struct{}), + } +} + +func (ccu *channelCheckpointUpdater) start() { + log.Info("channel checkpoint updater start") + ticker := time.NewTicker(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second)) + defer ticker.Stop() + for { + select { + case <-ccu.closeCh: + log.Info("channel checkpoint updater exit") + return + case <-ticker.C: + ccu.execute() + } + } +} + +func (ccu *channelCheckpointUpdater) execute() { + ccu.mu.RLock() + taskGroups := lo.Chunk(lo.Values(ccu.tasks), paramtable.Get().DataNodeCfg.MaxChannelCheckpointsPerRPC.GetAsInt()) + ccu.mu.RUnlock() + updateChanCPMaxParallel := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointMaxParallel.GetAsInt() if updateChanCPMaxParallel <= 0 { updateChanCPMaxParallel = defaultUpdateChanCPMaxParallel } - return &channelCheckpointUpdater{ - dn: dn, - workerPool: conc.NewPool[any](updateChanCPMaxParallel, conc.WithPreAlloc(true)), + rpcGroups := lo.Chunk(taskGroups, updateChanCPMaxParallel) + + finished := typeutil.NewConcurrentMap[string, *channelCPUpdateTask]() + + for _, groups := range rpcGroups { + wg := &sync.WaitGroup{} + for _, tasks := range groups { + wg.Add(1) + go func(tasks []*channelCPUpdateTask) { + defer wg.Done() + timeout := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + channelCPs := lo.Map(tasks, func(t *channelCPUpdateTask, _ int) *msgpb.MsgPosition { + return t.pos + }) + err := ccu.dn.broker.UpdateChannelCheckpoint(ctx, channelCPs) + if err != nil { + log.Warn("update channel checkpoint failed", zap.Error(err)) + return + } + for _, task := range tasks { + task.callback() + finished.Insert(task.pos.GetChannelName(), task) + } + }(tasks) + } + wg.Wait() + } + + ccu.mu.Lock() + defer ccu.mu.Unlock() + finished.Range(func(_ string, task *channelCPUpdateTask) bool { + channel := task.pos.GetChannelName() + if ccu.tasks[channel].pos.GetTimestamp() <= task.pos.GetTimestamp() { + delete(ccu.tasks, channel) + } + return true + }) +} + +func (ccu *channelCheckpointUpdater) addTask(channelPos *msgpb.MsgPosition, callback func()) { + if channelPos == nil || channelPos.GetMsgID() == nil || channelPos.GetChannelName() == "" { + log.Warn("illegal checkpoint", zap.Any("pos", channelPos)) + return + } + channel := channelPos.GetChannelName() + ccu.mu.RLock() + if ccu.tasks[channel] != nil && channelPos.GetTimestamp() <= ccu.tasks[channel].pos.GetTimestamp() { + ccu.mu.RUnlock() + return + } + ccu.mu.RUnlock() + + ccu.mu.Lock() + defer ccu.mu.Unlock() + ccu.tasks[channel] = &channelCPUpdateTask{ + pos: channelPos, + callback: callback, } } -func (ccu *channelCheckpointUpdater) updateChannelCP(channelPos *msgpb.MsgPosition, callback func() error) error { - ccu.workerPool.Submit(func() (any, error) { - ctx, cancel := context.WithTimeout(context.Background(), updateChanCPTimeout) - defer cancel() - err := ccu.dn.broker.UpdateChannelCheckpoint(ctx, channelPos.GetChannelName(), channelPos) - if err != nil { - return nil, err - } - err = callback() - return nil, err - }) - return nil +func (ccu *channelCheckpointUpdater) taskNum() int { + ccu.mu.RLock() + defer ccu.mu.RUnlock() + return len(ccu.tasks) } func (ccu *channelCheckpointUpdater) close() { - ccu.workerPool.Release() + ccu.closeOnce.Do(func() { + close(ccu.closeCh) + }) } diff --git a/internal/datanode/channel_checkpoint_updater_test.go b/internal/datanode/channel_checkpoint_updater_test.go new file mode 100644 index 0000000000..04a6b6c860 --- /dev/null +++ b/internal/datanode/channel_checkpoint_updater_test.go @@ -0,0 +1,86 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datanode + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "go.uber.org/atomic" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datanode/broker" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +type ChannelCPUpdaterSuite struct { + suite.Suite + + updater *channelCheckpointUpdater +} + +func (s *ChannelCPUpdaterSuite) SetupTest() { + s.updater = newChannelCheckpointUpdater(&DataNode{}) +} + +func (s *ChannelCPUpdaterSuite) TestUpdate() { + paramtable.Get().Save(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.Key, "0.01") + defer paramtable.Get().Save(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.Key, "10") + + b := broker.NewMockBroker(s.T()) + b.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, positions []*msgpb.MsgPosition) error { + time.Sleep(10 * time.Millisecond) + return nil + }) + s.updater.dn.broker = b + + go s.updater.start() + defer s.updater.close() + + tasksNum := 100000 + counter := atomic.NewInt64(0) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < tasksNum; i++ { + // add duplicated task with same timestamp + for j := 0; j < 10; j++ { + s.updater.addTask(&msgpb.MsgPosition{ + ChannelName: fmt.Sprintf("ch-%d", i), + MsgID: []byte{0}, + Timestamp: 100, + }, func() { + counter.Add(1) + }) + } + } + }() + s.Eventually(func() bool { + return counter.Load() == int64(tasksNum) + }, time.Second*10, time.Millisecond*100) + wg.Wait() +} + +func TestChannelCPUpdater(t *testing.T) { + suite.Run(t, new(ChannelCPUpdaterSuite)) +} diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 5e66be922d..e962ef9873 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -357,6 +357,8 @@ func (node *DataNode) Start() error { node.timeTickSender.start() } + go node.channelCheckpointUpdater.start() + // Start node watch node node.startWatchChannelsAtBackground(node.ctx) diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index ad47f5ec00..3e113d8d7a 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -199,7 +199,7 @@ func TestDataSyncService_Start(t *testing.T) { broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() - broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe() node.broker = broker node.channelCheckpointUpdater = newChannelCheckpointUpdater(node) @@ -378,7 +378,7 @@ func TestDataSyncService_Close(t *testing.T) { broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() - broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe() node.broker = broker node.channelCheckpointUpdater = newChannelCheckpointUpdater(node) diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index 4c31a62715..ef08d52a17 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -67,7 +67,7 @@ func TestWatchChannel(t *testing.T) { broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() - broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe() node.broker = broker diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index b0ecbaedc3..3c647861cd 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -61,7 +61,7 @@ func TestFlowGraphManager(t *testing.T) { broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() - broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything). Return(&milvuspb.DescribeCollectionResponse{ Status: merr.Status(nil), diff --git a/internal/datanode/flow_graph_time_tick_node.go b/internal/datanode/flow_graph_time_tick_node.go index 5844e3c0f0..4936b45e9e 100644 --- a/internal/datanode/flow_graph_time_tick_node.go +++ b/internal/datanode/flow_graph_time_tick_node.go @@ -93,25 +93,21 @@ func (ttn *ttNode) Operate(in []Msg) []Msg { // Do not block and async updateCheckPoint channelPos := ttn.channel.getChannelCheckpoint(fgMsg.endPositions[0]) - nonBlockingNotify := func() { - ttn.updateChannelCP(channelPos, curTs) - } - if curTs.Sub(ttn.lastUpdateTime.Load()) >= updateChanCPInterval { - nonBlockingNotify() + if curTs.Sub(ttn.lastUpdateTime.Load()) >= Params.DataNodeCfg.UpdateChannelCheckpointInterval.GetAsDuration(time.Second) { + ttn.updateChannelCP(channelPos, curTs) return []Msg{} } if channelPos.GetTimestamp() >= ttn.channel.getFlushTs() { - nonBlockingNotify() + ttn.updateChannelCP(channelPos, curTs) } return []Msg{} } -func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Time) error { - callBack := func() error { +func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Time) { + callBack := func() { channelCPTs, _ := tsoutil.ParseTS(channelPos.GetTimestamp()) - ttn.lastUpdateTime.Store(curTs) log.Debug("UpdateChannelCheckpoint success", zap.String("channel", ttn.vChannelName), zap.Uint64("cpTs", channelPos.GetTimestamp()), @@ -120,11 +116,9 @@ func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Tim if channelPos.GetTimestamp() >= ttn.channel.getFlushTs() { ttn.channel.setFlushTs(math.MaxUint64) } - return nil } - - err := ttn.cpUpdater.updateChannelCP(channelPos, callBack) - return err + ttn.cpUpdater.addTask(channelPos, callBack) + ttn.lastUpdateTime.Store(curTs) } func newTTNode(config *nodeConfig, broker broker.Broker, cpUpdater *channelCheckpointUpdater) (*ttNode, error) { diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index ac07f5cd41..fad2ce9414 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -113,7 +113,7 @@ func (s *DataNodeServicesSuite) SetupTest() { }, nil).Maybe() broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() - broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0), func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe() @@ -576,7 +576,7 @@ func (s *DataNodeServicesSuite) TestImport() { Return([]*datapb.SegmentInfo{}, nil).Maybe() s.broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() - s.broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + s.broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe() s.broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0), func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe() diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index ce7732b842..fa84e4eeeb 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -123,6 +123,7 @@ type DataCoordCatalog interface { ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) SaveChannelCheckpoint(ctx context.Context, vChannel string, pos *msgpb.MsgPosition) error + SaveChannelCheckpoints(ctx context.Context, positions []*msgpb.MsgPosition) error DropChannelCheckpoint(ctx context.Context, vChannel string) error CreateIndex(ctx context.Context, index *model.Index) error diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 5a8f22692b..84536cd382 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -526,6 +526,19 @@ func (kc *Catalog) SaveChannelCheckpoint(ctx context.Context, vChannel string, p return kc.MetaKv.Save(k, string(v)) } +func (kc *Catalog) SaveChannelCheckpoints(ctx context.Context, positions []*msgpb.MsgPosition) error { + kvs := make(map[string]string) + for _, position := range positions { + k := buildChannelCPKey(position.GetChannelName()) + v, err := proto.Marshal(position) + if err != nil { + return err + } + kvs[k] = string(v) + } + return kc.SaveByBatch(kvs) +} + func (kc *Catalog) DropChannelCheckpoint(ctx context.Context, vChannel string) error { k := buildChannelCPKey(vChannel) return kc.MetaKv.Remove(k) diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 079c778589..5d32bfec49 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -590,6 +590,22 @@ func TestChannelCP(t *testing.T) { assert.Error(t, err) }) + t.Run("SaveChannelCheckpoints", func(t *testing.T) { + txn := mocks.NewMetaKv(t) + txn.EXPECT().MultiSave(mock.Anything).Return(nil) + catalog := NewCatalog(txn, rootPath, "") + err := catalog.SaveChannelCheckpoints(context.TODO(), []*msgpb.MsgPosition{pos}) + assert.NoError(t, err) + }) + + t.Run("SaveChannelCheckpoints failed", func(t *testing.T) { + txn := mocks.NewMetaKv(t) + catalog := NewCatalog(txn, rootPath, "") + txn.EXPECT().MultiSave(mock.Anything).Return(errors.New("mock error")) + err = catalog.SaveChannelCheckpoints(context.TODO(), []*msgpb.MsgPosition{pos}) + assert.Error(t, err) + }) + t.Run("DropChannelCheckpoint", func(t *testing.T) { txn := mocks.NewMetaKv(t) txn.EXPECT().Save(mock.Anything, mock.Anything).Return(nil) diff --git a/internal/metastore/mocks/mock_datacoord_catalog.go b/internal/metastore/mocks/mock_datacoord_catalog.go index 59c3d1369c..0771ce9b12 100644 --- a/internal/metastore/mocks/mock_datacoord_catalog.go +++ b/internal/metastore/mocks/mock_datacoord_catalog.go @@ -953,6 +953,49 @@ func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) RunAndReturn(run func(con return _c } +// SaveChannelCheckpoints provides a mock function with given fields: ctx, positions +func (_m *DataCoordCatalog) SaveChannelCheckpoints(ctx context.Context, positions []*msgpb.MsgPosition) error { + ret := _m.Called(ctx, positions) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []*msgpb.MsgPosition) error); ok { + r0 = rf(ctx, positions) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DataCoordCatalog_SaveChannelCheckpoints_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveChannelCheckpoints' +type DataCoordCatalog_SaveChannelCheckpoints_Call struct { + *mock.Call +} + +// SaveChannelCheckpoints is a helper method to define mock.On call +// - ctx context.Context +// - positions []*msgpb.MsgPosition +func (_e *DataCoordCatalog_Expecter) SaveChannelCheckpoints(ctx interface{}, positions interface{}) *DataCoordCatalog_SaveChannelCheckpoints_Call { + return &DataCoordCatalog_SaveChannelCheckpoints_Call{Call: _e.mock.On("SaveChannelCheckpoints", ctx, positions)} +} + +func (_c *DataCoordCatalog_SaveChannelCheckpoints_Call) Run(run func(ctx context.Context, positions []*msgpb.MsgPosition)) *DataCoordCatalog_SaveChannelCheckpoints_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]*msgpb.MsgPosition)) + }) + return _c +} + +func (_c *DataCoordCatalog_SaveChannelCheckpoints_Call) Return(_a0 error) *DataCoordCatalog_SaveChannelCheckpoints_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DataCoordCatalog_SaveChannelCheckpoints_Call) RunAndReturn(run func(context.Context, []*msgpb.MsgPosition) error) *DataCoordCatalog_SaveChannelCheckpoints_Call { + _c.Call.Return(run) + return _c +} + // SaveDroppedSegmentsInBatch provides a mock function with given fields: ctx, segments func (_m *DataCoordCatalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error { ret := _m.Called(ctx, segments) diff --git a/internal/mocks/mock_datacoord_catalog.go b/internal/mocks/mock_datacoord_catalog.go deleted file mode 100644 index 8fd49e52bc..0000000000 --- a/internal/mocks/mock_datacoord_catalog.go +++ /dev/null @@ -1,1228 +0,0 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. - -package mocks - -import ( - context "context" - - metastore "github.com/milvus-io/milvus/internal/metastore" - datapb "github.com/milvus-io/milvus/internal/proto/datapb" - - mock "github.com/stretchr/testify/mock" - - model "github.com/milvus-io/milvus/internal/metastore/model" - - msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" -) - -// DataCoordCatalog is an autogenerated mock type for the DataCoordCatalog type -type DataCoordCatalog struct { - mock.Mock -} - -type DataCoordCatalog_Expecter struct { - mock *mock.Mock -} - -func (_m *DataCoordCatalog) EXPECT() *DataCoordCatalog_Expecter { - return &DataCoordCatalog_Expecter{mock: &_m.Mock} -} - -// AddSegment provides a mock function with given fields: ctx, segment -func (_m *DataCoordCatalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error { - ret := _m.Called(ctx, segment) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *datapb.SegmentInfo) error); ok { - r0 = rf(ctx, segment) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_AddSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddSegment' -type DataCoordCatalog_AddSegment_Call struct { - *mock.Call -} - -// AddSegment is a helper method to define mock.On call -// - ctx context.Context -// - segment *datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) AddSegment(ctx interface{}, segment interface{}) *DataCoordCatalog_AddSegment_Call { - return &DataCoordCatalog_AddSegment_Call{Call: _e.mock.On("AddSegment", ctx, segment)} -} - -func (_c *DataCoordCatalog_AddSegment_Call) Run(run func(ctx context.Context, segment *datapb.SegmentInfo)) *DataCoordCatalog_AddSegment_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*datapb.SegmentInfo)) - }) - return _c -} - -func (_c *DataCoordCatalog_AddSegment_Call) Return(_a0 error) *DataCoordCatalog_AddSegment_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_AddSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo) error) *DataCoordCatalog_AddSegment_Call { - _c.Call.Return(run) - return _c -} - -// AlterIndex provides a mock function with given fields: ctx, newIndex -func (_m *DataCoordCatalog) AlterIndex(ctx context.Context, newIndex *model.Index) error { - ret := _m.Called(ctx, newIndex) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *model.Index) error); ok { - r0 = rf(ctx, newIndex) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_AlterIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterIndex' -type DataCoordCatalog_AlterIndex_Call struct { - *mock.Call -} - -// AlterIndex is a helper method to define mock.On call -// - ctx context.Context -// - newIndex *model.Index -func (_e *DataCoordCatalog_Expecter) AlterIndex(ctx interface{}, newIndex interface{}) *DataCoordCatalog_AlterIndex_Call { - return &DataCoordCatalog_AlterIndex_Call{Call: _e.mock.On("AlterIndex", ctx, newIndex)} -} - -func (_c *DataCoordCatalog_AlterIndex_Call) Run(run func(ctx context.Context, newIndex *model.Index)) *DataCoordCatalog_AlterIndex_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*model.Index)) - }) - return _c -} - -func (_c *DataCoordCatalog_AlterIndex_Call) Return(_a0 error) *DataCoordCatalog_AlterIndex_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_AlterIndex_Call) RunAndReturn(run func(context.Context, *model.Index) error) *DataCoordCatalog_AlterIndex_Call { - _c.Call.Return(run) - return _c -} - -// AlterIndexes provides a mock function with given fields: ctx, newIndexes -func (_m *DataCoordCatalog) AlterIndexes(ctx context.Context, newIndexes []*model.Index) error { - ret := _m.Called(ctx, newIndexes) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*model.Index) error); ok { - r0 = rf(ctx, newIndexes) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_AlterIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterIndexes' -type DataCoordCatalog_AlterIndexes_Call struct { - *mock.Call -} - -// AlterIndexes is a helper method to define mock.On call -// - ctx context.Context -// - newIndexes []*model.Index -func (_e *DataCoordCatalog_Expecter) AlterIndexes(ctx interface{}, newIndexes interface{}) *DataCoordCatalog_AlterIndexes_Call { - return &DataCoordCatalog_AlterIndexes_Call{Call: _e.mock.On("AlterIndexes", ctx, newIndexes)} -} - -func (_c *DataCoordCatalog_AlterIndexes_Call) Run(run func(ctx context.Context, newIndexes []*model.Index)) *DataCoordCatalog_AlterIndexes_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*model.Index)) - }) - return _c -} - -func (_c *DataCoordCatalog_AlterIndexes_Call) Return(_a0 error) *DataCoordCatalog_AlterIndexes_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_AlterIndexes_Call) RunAndReturn(run func(context.Context, []*model.Index) error) *DataCoordCatalog_AlterIndexes_Call { - _c.Call.Return(run) - return _c -} - -// AlterSegment provides a mock function with given fields: ctx, newSegment, oldSegment -func (_m *DataCoordCatalog) AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error { - ret := _m.Called(ctx, newSegment, oldSegment) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *datapb.SegmentInfo, *datapb.SegmentInfo) error); ok { - r0 = rf(ctx, newSegment, oldSegment) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_AlterSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterSegment' -type DataCoordCatalog_AlterSegment_Call struct { - *mock.Call -} - -// AlterSegment is a helper method to define mock.On call -// - ctx context.Context -// - newSegment *datapb.SegmentInfo -// - oldSegment *datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) AlterSegment(ctx interface{}, newSegment interface{}, oldSegment interface{}) *DataCoordCatalog_AlterSegment_Call { - return &DataCoordCatalog_AlterSegment_Call{Call: _e.mock.On("AlterSegment", ctx, newSegment, oldSegment)} -} - -func (_c *DataCoordCatalog_AlterSegment_Call) Run(run func(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo)) *DataCoordCatalog_AlterSegment_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*datapb.SegmentInfo), args[2].(*datapb.SegmentInfo)) - }) - return _c -} - -func (_c *DataCoordCatalog_AlterSegment_Call) Return(_a0 error) *DataCoordCatalog_AlterSegment_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_AlterSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo, *datapb.SegmentInfo) error) *DataCoordCatalog_AlterSegment_Call { - _c.Call.Return(run) - return _c -} - -// AlterSegmentIndex provides a mock function with given fields: ctx, newSegIndex -func (_m *DataCoordCatalog) AlterSegmentIndex(ctx context.Context, newSegIndex *model.SegmentIndex) error { - ret := _m.Called(ctx, newSegIndex) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *model.SegmentIndex) error); ok { - r0 = rf(ctx, newSegIndex) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_AlterSegmentIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterSegmentIndex' -type DataCoordCatalog_AlterSegmentIndex_Call struct { - *mock.Call -} - -// AlterSegmentIndex is a helper method to define mock.On call -// - ctx context.Context -// - newSegIndex *model.SegmentIndex -func (_e *DataCoordCatalog_Expecter) AlterSegmentIndex(ctx interface{}, newSegIndex interface{}) *DataCoordCatalog_AlterSegmentIndex_Call { - return &DataCoordCatalog_AlterSegmentIndex_Call{Call: _e.mock.On("AlterSegmentIndex", ctx, newSegIndex)} -} - -func (_c *DataCoordCatalog_AlterSegmentIndex_Call) Run(run func(ctx context.Context, newSegIndex *model.SegmentIndex)) *DataCoordCatalog_AlterSegmentIndex_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*model.SegmentIndex)) - }) - return _c -} - -func (_c *DataCoordCatalog_AlterSegmentIndex_Call) Return(_a0 error) *DataCoordCatalog_AlterSegmentIndex_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_AlterSegmentIndex_Call) RunAndReturn(run func(context.Context, *model.SegmentIndex) error) *DataCoordCatalog_AlterSegmentIndex_Call { - _c.Call.Return(run) - return _c -} - -// AlterSegmentIndexes provides a mock function with given fields: ctx, newSegIdxes -func (_m *DataCoordCatalog) AlterSegmentIndexes(ctx context.Context, newSegIdxes []*model.SegmentIndex) error { - ret := _m.Called(ctx, newSegIdxes) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*model.SegmentIndex) error); ok { - r0 = rf(ctx, newSegIdxes) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_AlterSegmentIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterSegmentIndexes' -type DataCoordCatalog_AlterSegmentIndexes_Call struct { - *mock.Call -} - -// AlterSegmentIndexes is a helper method to define mock.On call -// - ctx context.Context -// - newSegIdxes []*model.SegmentIndex -func (_e *DataCoordCatalog_Expecter) AlterSegmentIndexes(ctx interface{}, newSegIdxes interface{}) *DataCoordCatalog_AlterSegmentIndexes_Call { - return &DataCoordCatalog_AlterSegmentIndexes_Call{Call: _e.mock.On("AlterSegmentIndexes", ctx, newSegIdxes)} -} - -func (_c *DataCoordCatalog_AlterSegmentIndexes_Call) Run(run func(ctx context.Context, newSegIdxes []*model.SegmentIndex)) *DataCoordCatalog_AlterSegmentIndexes_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*model.SegmentIndex)) - }) - return _c -} - -func (_c *DataCoordCatalog_AlterSegmentIndexes_Call) Return(_a0 error) *DataCoordCatalog_AlterSegmentIndexes_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_AlterSegmentIndexes_Call) RunAndReturn(run func(context.Context, []*model.SegmentIndex) error) *DataCoordCatalog_AlterSegmentIndexes_Call { - _c.Call.Return(run) - return _c -} - -// AlterSegments provides a mock function with given fields: ctx, newSegments, binlogs -func (_m *DataCoordCatalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement) error { - _va := make([]interface{}, len(binlogs)) - for _i := range binlogs { - _va[_i] = binlogs[_i] - } - var _ca []interface{} - _ca = append(_ca, ctx, newSegments) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo, ...metastore.BinlogsIncrement) error); ok { - r0 = rf(ctx, newSegments, binlogs...) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_AlterSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterSegments' -type DataCoordCatalog_AlterSegments_Call struct { - *mock.Call -} - -// AlterSegments is a helper method to define mock.On call -// - ctx context.Context -// - newSegments []*datapb.SegmentInfo -// - binlogs ...metastore.BinlogsIncrement -func (_e *DataCoordCatalog_Expecter) AlterSegments(ctx interface{}, newSegments interface{}, binlogs ...interface{}) *DataCoordCatalog_AlterSegments_Call { - return &DataCoordCatalog_AlterSegments_Call{Call: _e.mock.On("AlterSegments", - append([]interface{}{ctx, newSegments}, binlogs...)...)} -} - -func (_c *DataCoordCatalog_AlterSegments_Call) Run(run func(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement)) *DataCoordCatalog_AlterSegments_Call { - _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]metastore.BinlogsIncrement, len(args)-2) - for i, a := range args[2:] { - if a != nil { - variadicArgs[i] = a.(metastore.BinlogsIncrement) - } - } - run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo), variadicArgs...) - }) - return _c -} - -func (_c *DataCoordCatalog_AlterSegments_Call) Return(_a0 error) *DataCoordCatalog_AlterSegments_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_AlterSegments_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo, ...metastore.BinlogsIncrement) error) *DataCoordCatalog_AlterSegments_Call { - _c.Call.Return(run) - return _c -} - -// AlterSegmentsAndAddNewSegment provides a mock function with given fields: ctx, segments, newSegment -func (_m *DataCoordCatalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error { - ret := _m.Called(ctx, segments, newSegment) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo, *datapb.SegmentInfo) error); ok { - r0 = rf(ctx, segments, newSegment) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterSegmentsAndAddNewSegment' -type DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call struct { - *mock.Call -} - -// AlterSegmentsAndAddNewSegment is a helper method to define mock.On call -// - ctx context.Context -// - segments []*datapb.SegmentInfo -// - newSegment *datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) AlterSegmentsAndAddNewSegment(ctx interface{}, segments interface{}, newSegment interface{}) *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call { - return &DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call{Call: _e.mock.On("AlterSegmentsAndAddNewSegment", ctx, segments, newSegment)} -} - -func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) Run(run func(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo)) *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo), args[2].(*datapb.SegmentInfo)) - }) - return _c -} - -func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) Return(_a0 error) *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo, *datapb.SegmentInfo) error) *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call { - _c.Call.Return(run) - return _c -} - -// ChannelExists provides a mock function with given fields: ctx, channel -func (_m *DataCoordCatalog) ChannelExists(ctx context.Context, channel string) bool { - ret := _m.Called(ctx, channel) - - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { - r0 = rf(ctx, channel) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// DataCoordCatalog_ChannelExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChannelExists' -type DataCoordCatalog_ChannelExists_Call struct { - *mock.Call -} - -// ChannelExists is a helper method to define mock.On call -// - ctx context.Context -// - channel string -func (_e *DataCoordCatalog_Expecter) ChannelExists(ctx interface{}, channel interface{}) *DataCoordCatalog_ChannelExists_Call { - return &DataCoordCatalog_ChannelExists_Call{Call: _e.mock.On("ChannelExists", ctx, channel)} -} - -func (_c *DataCoordCatalog_ChannelExists_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ChannelExists_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DataCoordCatalog_ChannelExists_Call) Return(_a0 bool) *DataCoordCatalog_ChannelExists_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_ChannelExists_Call) RunAndReturn(run func(context.Context, string) bool) *DataCoordCatalog_ChannelExists_Call { - _c.Call.Return(run) - return _c -} - -// CreateIndex provides a mock function with given fields: ctx, index -func (_m *DataCoordCatalog) CreateIndex(ctx context.Context, index *model.Index) error { - ret := _m.Called(ctx, index) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *model.Index) error); ok { - r0 = rf(ctx, index) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_CreateIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateIndex' -type DataCoordCatalog_CreateIndex_Call struct { - *mock.Call -} - -// CreateIndex is a helper method to define mock.On call -// - ctx context.Context -// - index *model.Index -func (_e *DataCoordCatalog_Expecter) CreateIndex(ctx interface{}, index interface{}) *DataCoordCatalog_CreateIndex_Call { - return &DataCoordCatalog_CreateIndex_Call{Call: _e.mock.On("CreateIndex", ctx, index)} -} - -func (_c *DataCoordCatalog_CreateIndex_Call) Run(run func(ctx context.Context, index *model.Index)) *DataCoordCatalog_CreateIndex_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*model.Index)) - }) - return _c -} - -func (_c *DataCoordCatalog_CreateIndex_Call) Return(_a0 error) *DataCoordCatalog_CreateIndex_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_CreateIndex_Call) RunAndReturn(run func(context.Context, *model.Index) error) *DataCoordCatalog_CreateIndex_Call { - _c.Call.Return(run) - return _c -} - -// CreateSegmentIndex provides a mock function with given fields: ctx, segIdx -func (_m *DataCoordCatalog) CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error { - ret := _m.Called(ctx, segIdx) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *model.SegmentIndex) error); ok { - r0 = rf(ctx, segIdx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_CreateSegmentIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateSegmentIndex' -type DataCoordCatalog_CreateSegmentIndex_Call struct { - *mock.Call -} - -// CreateSegmentIndex is a helper method to define mock.On call -// - ctx context.Context -// - segIdx *model.SegmentIndex -func (_e *DataCoordCatalog_Expecter) CreateSegmentIndex(ctx interface{}, segIdx interface{}) *DataCoordCatalog_CreateSegmentIndex_Call { - return &DataCoordCatalog_CreateSegmentIndex_Call{Call: _e.mock.On("CreateSegmentIndex", ctx, segIdx)} -} - -func (_c *DataCoordCatalog_CreateSegmentIndex_Call) Run(run func(ctx context.Context, segIdx *model.SegmentIndex)) *DataCoordCatalog_CreateSegmentIndex_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*model.SegmentIndex)) - }) - return _c -} - -func (_c *DataCoordCatalog_CreateSegmentIndex_Call) Return(_a0 error) *DataCoordCatalog_CreateSegmentIndex_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_CreateSegmentIndex_Call) RunAndReturn(run func(context.Context, *model.SegmentIndex) error) *DataCoordCatalog_CreateSegmentIndex_Call { - _c.Call.Return(run) - return _c -} - -// DropChannel provides a mock function with given fields: ctx, channel -func (_m *DataCoordCatalog) DropChannel(ctx context.Context, channel string) error { - ret := _m.Called(ctx, channel) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, channel) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_DropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropChannel' -type DataCoordCatalog_DropChannel_Call struct { - *mock.Call -} - -// DropChannel is a helper method to define mock.On call -// - ctx context.Context -// - channel string -func (_e *DataCoordCatalog_Expecter) DropChannel(ctx interface{}, channel interface{}) *DataCoordCatalog_DropChannel_Call { - return &DataCoordCatalog_DropChannel_Call{Call: _e.mock.On("DropChannel", ctx, channel)} -} - -func (_c *DataCoordCatalog_DropChannel_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_DropChannel_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DataCoordCatalog_DropChannel_Call) Return(_a0 error) *DataCoordCatalog_DropChannel_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_DropChannel_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_DropChannel_Call { - _c.Call.Return(run) - return _c -} - -// DropChannelCheckpoint provides a mock function with given fields: ctx, vChannel -func (_m *DataCoordCatalog) DropChannelCheckpoint(ctx context.Context, vChannel string) error { - ret := _m.Called(ctx, vChannel) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, vChannel) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_DropChannelCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropChannelCheckpoint' -type DataCoordCatalog_DropChannelCheckpoint_Call struct { - *mock.Call -} - -// DropChannelCheckpoint is a helper method to define mock.On call -// - ctx context.Context -// - vChannel string -func (_e *DataCoordCatalog_Expecter) DropChannelCheckpoint(ctx interface{}, vChannel interface{}) *DataCoordCatalog_DropChannelCheckpoint_Call { - return &DataCoordCatalog_DropChannelCheckpoint_Call{Call: _e.mock.On("DropChannelCheckpoint", ctx, vChannel)} -} - -func (_c *DataCoordCatalog_DropChannelCheckpoint_Call) Run(run func(ctx context.Context, vChannel string)) *DataCoordCatalog_DropChannelCheckpoint_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DataCoordCatalog_DropChannelCheckpoint_Call) Return(_a0 error) *DataCoordCatalog_DropChannelCheckpoint_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_DropChannelCheckpoint_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_DropChannelCheckpoint_Call { - _c.Call.Return(run) - return _c -} - -// DropIndex provides a mock function with given fields: ctx, collID, dropIdxID -func (_m *DataCoordCatalog) DropIndex(ctx context.Context, collID int64, dropIdxID int64) error { - ret := _m.Called(ctx, collID, dropIdxID) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64) error); ok { - r0 = rf(ctx, collID, dropIdxID) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_DropIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropIndex' -type DataCoordCatalog_DropIndex_Call struct { - *mock.Call -} - -// DropIndex is a helper method to define mock.On call -// - ctx context.Context -// - collID int64 -// - dropIdxID int64 -func (_e *DataCoordCatalog_Expecter) DropIndex(ctx interface{}, collID interface{}, dropIdxID interface{}) *DataCoordCatalog_DropIndex_Call { - return &DataCoordCatalog_DropIndex_Call{Call: _e.mock.On("DropIndex", ctx, collID, dropIdxID)} -} - -func (_c *DataCoordCatalog_DropIndex_Call) Run(run func(ctx context.Context, collID int64, dropIdxID int64)) *DataCoordCatalog_DropIndex_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(int64)) - }) - return _c -} - -func (_c *DataCoordCatalog_DropIndex_Call) Return(_a0 error) *DataCoordCatalog_DropIndex_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_DropIndex_Call) RunAndReturn(run func(context.Context, int64, int64) error) *DataCoordCatalog_DropIndex_Call { - _c.Call.Return(run) - return _c -} - -// DropSegment provides a mock function with given fields: ctx, segment -func (_m *DataCoordCatalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error { - ret := _m.Called(ctx, segment) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *datapb.SegmentInfo) error); ok { - r0 = rf(ctx, segment) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_DropSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegment' -type DataCoordCatalog_DropSegment_Call struct { - *mock.Call -} - -// DropSegment is a helper method to define mock.On call -// - ctx context.Context -// - segment *datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) DropSegment(ctx interface{}, segment interface{}) *DataCoordCatalog_DropSegment_Call { - return &DataCoordCatalog_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segment)} -} - -func (_c *DataCoordCatalog_DropSegment_Call) Run(run func(ctx context.Context, segment *datapb.SegmentInfo)) *DataCoordCatalog_DropSegment_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*datapb.SegmentInfo)) - }) - return _c -} - -func (_c *DataCoordCatalog_DropSegment_Call) Return(_a0 error) *DataCoordCatalog_DropSegment_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_DropSegment_Call) RunAndReturn(run func(context.Context, *datapb.SegmentInfo) error) *DataCoordCatalog_DropSegment_Call { - _c.Call.Return(run) - return _c -} - -// DropSegmentIndex provides a mock function with given fields: ctx, collID, partID, segID, buildID -func (_m *DataCoordCatalog) DropSegmentIndex(ctx context.Context, collID int64, partID int64, segID int64, buildID int64) error { - ret := _m.Called(ctx, collID, partID, segID, buildID) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, int64) error); ok { - r0 = rf(ctx, collID, partID, segID, buildID) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_DropSegmentIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegmentIndex' -type DataCoordCatalog_DropSegmentIndex_Call struct { - *mock.Call -} - -// DropSegmentIndex is a helper method to define mock.On call -// - ctx context.Context -// - collID int64 -// - partID int64 -// - segID int64 -// - buildID int64 -func (_e *DataCoordCatalog_Expecter) DropSegmentIndex(ctx interface{}, collID interface{}, partID interface{}, segID interface{}, buildID interface{}) *DataCoordCatalog_DropSegmentIndex_Call { - return &DataCoordCatalog_DropSegmentIndex_Call{Call: _e.mock.On("DropSegmentIndex", ctx, collID, partID, segID, buildID)} -} - -func (_c *DataCoordCatalog_DropSegmentIndex_Call) Run(run func(ctx context.Context, collID int64, partID int64, segID int64, buildID int64)) *DataCoordCatalog_DropSegmentIndex_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(int64), args[4].(int64)) - }) - return _c -} - -func (_c *DataCoordCatalog_DropSegmentIndex_Call) Return(_a0 error) *DataCoordCatalog_DropSegmentIndex_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_DropSegmentIndex_Call) RunAndReturn(run func(context.Context, int64, int64, int64, int64) error) *DataCoordCatalog_DropSegmentIndex_Call { - _c.Call.Return(run) - return _c -} - -// GcConfirm provides a mock function with given fields: ctx, collectionID, partitionID -func (_m *DataCoordCatalog) GcConfirm(ctx context.Context, collectionID int64, partitionID int64) bool { - ret := _m.Called(ctx, collectionID, partitionID) - - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context, int64, int64) bool); ok { - r0 = rf(ctx, collectionID, partitionID) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// DataCoordCatalog_GcConfirm_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GcConfirm' -type DataCoordCatalog_GcConfirm_Call struct { - *mock.Call -} - -// GcConfirm is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - partitionID int64 -func (_e *DataCoordCatalog_Expecter) GcConfirm(ctx interface{}, collectionID interface{}, partitionID interface{}) *DataCoordCatalog_GcConfirm_Call { - return &DataCoordCatalog_GcConfirm_Call{Call: _e.mock.On("GcConfirm", ctx, collectionID, partitionID)} -} - -func (_c *DataCoordCatalog_GcConfirm_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64)) *DataCoordCatalog_GcConfirm_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(int64)) - }) - return _c -} - -func (_c *DataCoordCatalog_GcConfirm_Call) Return(_a0 bool) *DataCoordCatalog_GcConfirm_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_GcConfirm_Call) RunAndReturn(run func(context.Context, int64, int64) bool) *DataCoordCatalog_GcConfirm_Call { - _c.Call.Return(run) - return _c -} - -// ListChannelCheckpoint provides a mock function with given fields: ctx -func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) { - ret := _m.Called(ctx) - - var r0 map[string]*msgpb.MsgPosition - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (map[string]*msgpb.MsgPosition, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) map[string]*msgpb.MsgPosition); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]*msgpb.MsgPosition) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// DataCoordCatalog_ListChannelCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListChannelCheckpoint' -type DataCoordCatalog_ListChannelCheckpoint_Call struct { - *mock.Call -} - -// ListChannelCheckpoint is a helper method to define mock.On call -// - ctx context.Context -func (_e *DataCoordCatalog_Expecter) ListChannelCheckpoint(ctx interface{}) *DataCoordCatalog_ListChannelCheckpoint_Call { - return &DataCoordCatalog_ListChannelCheckpoint_Call{Call: _e.mock.On("ListChannelCheckpoint", ctx)} -} - -func (_c *DataCoordCatalog_ListChannelCheckpoint_Call) Run(run func(ctx context.Context)) *DataCoordCatalog_ListChannelCheckpoint_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *DataCoordCatalog_ListChannelCheckpoint_Call) Return(_a0 map[string]*msgpb.MsgPosition, _a1 error) *DataCoordCatalog_ListChannelCheckpoint_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *DataCoordCatalog_ListChannelCheckpoint_Call) RunAndReturn(run func(context.Context) (map[string]*msgpb.MsgPosition, error)) *DataCoordCatalog_ListChannelCheckpoint_Call { - _c.Call.Return(run) - return _c -} - -// ListIndexes provides a mock function with given fields: ctx -func (_m *DataCoordCatalog) ListIndexes(ctx context.Context) ([]*model.Index, error) { - ret := _m.Called(ctx) - - var r0 []*model.Index - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]*model.Index, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) []*model.Index); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*model.Index) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// DataCoordCatalog_ListIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListIndexes' -type DataCoordCatalog_ListIndexes_Call struct { - *mock.Call -} - -// ListIndexes is a helper method to define mock.On call -// - ctx context.Context -func (_e *DataCoordCatalog_Expecter) ListIndexes(ctx interface{}) *DataCoordCatalog_ListIndexes_Call { - return &DataCoordCatalog_ListIndexes_Call{Call: _e.mock.On("ListIndexes", ctx)} -} - -func (_c *DataCoordCatalog_ListIndexes_Call) Run(run func(ctx context.Context)) *DataCoordCatalog_ListIndexes_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *DataCoordCatalog_ListIndexes_Call) Return(_a0 []*model.Index, _a1 error) *DataCoordCatalog_ListIndexes_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *DataCoordCatalog_ListIndexes_Call) RunAndReturn(run func(context.Context) ([]*model.Index, error)) *DataCoordCatalog_ListIndexes_Call { - _c.Call.Return(run) - return _c -} - -// ListSegmentIndexes provides a mock function with given fields: ctx -func (_m *DataCoordCatalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) { - ret := _m.Called(ctx) - - var r0 []*model.SegmentIndex - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]*model.SegmentIndex, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) []*model.SegmentIndex); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*model.SegmentIndex) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// DataCoordCatalog_ListSegmentIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListSegmentIndexes' -type DataCoordCatalog_ListSegmentIndexes_Call struct { - *mock.Call -} - -// ListSegmentIndexes is a helper method to define mock.On call -// - ctx context.Context -func (_e *DataCoordCatalog_Expecter) ListSegmentIndexes(ctx interface{}) *DataCoordCatalog_ListSegmentIndexes_Call { - return &DataCoordCatalog_ListSegmentIndexes_Call{Call: _e.mock.On("ListSegmentIndexes", ctx)} -} - -func (_c *DataCoordCatalog_ListSegmentIndexes_Call) Run(run func(ctx context.Context)) *DataCoordCatalog_ListSegmentIndexes_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *DataCoordCatalog_ListSegmentIndexes_Call) Return(_a0 []*model.SegmentIndex, _a1 error) *DataCoordCatalog_ListSegmentIndexes_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *DataCoordCatalog_ListSegmentIndexes_Call) RunAndReturn(run func(context.Context) ([]*model.SegmentIndex, error)) *DataCoordCatalog_ListSegmentIndexes_Call { - _c.Call.Return(run) - return _c -} - -// ListSegments provides a mock function with given fields: ctx -func (_m *DataCoordCatalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) { - ret := _m.Called(ctx) - - var r0 []*datapb.SegmentInfo - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]*datapb.SegmentInfo, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) []*datapb.SegmentInfo); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*datapb.SegmentInfo) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// DataCoordCatalog_ListSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListSegments' -type DataCoordCatalog_ListSegments_Call struct { - *mock.Call -} - -// ListSegments is a helper method to define mock.On call -// - ctx context.Context -func (_e *DataCoordCatalog_Expecter) ListSegments(ctx interface{}) *DataCoordCatalog_ListSegments_Call { - return &DataCoordCatalog_ListSegments_Call{Call: _e.mock.On("ListSegments", ctx)} -} - -func (_c *DataCoordCatalog_ListSegments_Call) Run(run func(ctx context.Context)) *DataCoordCatalog_ListSegments_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *DataCoordCatalog_ListSegments_Call) Return(_a0 []*datapb.SegmentInfo, _a1 error) *DataCoordCatalog_ListSegments_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *DataCoordCatalog_ListSegments_Call) RunAndReturn(run func(context.Context) ([]*datapb.SegmentInfo, error)) *DataCoordCatalog_ListSegments_Call { - _c.Call.Return(run) - return _c -} - -// MarkChannelAdded provides a mock function with given fields: ctx, channel -func (_m *DataCoordCatalog) MarkChannelAdded(ctx context.Context, channel string) error { - ret := _m.Called(ctx, channel) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, channel) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_MarkChannelAdded_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkChannelAdded' -type DataCoordCatalog_MarkChannelAdded_Call struct { - *mock.Call -} - -// MarkChannelAdded is a helper method to define mock.On call -// - ctx context.Context -// - channel string -func (_e *DataCoordCatalog_Expecter) MarkChannelAdded(ctx interface{}, channel interface{}) *DataCoordCatalog_MarkChannelAdded_Call { - return &DataCoordCatalog_MarkChannelAdded_Call{Call: _e.mock.On("MarkChannelAdded", ctx, channel)} -} - -func (_c *DataCoordCatalog_MarkChannelAdded_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_MarkChannelAdded_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DataCoordCatalog_MarkChannelAdded_Call) Return(_a0 error) *DataCoordCatalog_MarkChannelAdded_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_MarkChannelAdded_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_MarkChannelAdded_Call { - _c.Call.Return(run) - return _c -} - -// MarkChannelDeleted provides a mock function with given fields: ctx, channel -func (_m *DataCoordCatalog) MarkChannelDeleted(ctx context.Context, channel string) error { - ret := _m.Called(ctx, channel) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, channel) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_MarkChannelDeleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkChannelDeleted' -type DataCoordCatalog_MarkChannelDeleted_Call struct { - *mock.Call -} - -// MarkChannelDeleted is a helper method to define mock.On call -// - ctx context.Context -// - channel string -func (_e *DataCoordCatalog_Expecter) MarkChannelDeleted(ctx interface{}, channel interface{}) *DataCoordCatalog_MarkChannelDeleted_Call { - return &DataCoordCatalog_MarkChannelDeleted_Call{Call: _e.mock.On("MarkChannelDeleted", ctx, channel)} -} - -func (_c *DataCoordCatalog_MarkChannelDeleted_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_MarkChannelDeleted_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DataCoordCatalog_MarkChannelDeleted_Call) Return(_a0 error) *DataCoordCatalog_MarkChannelDeleted_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_MarkChannelDeleted_Call) RunAndReturn(run func(context.Context, string) error) *DataCoordCatalog_MarkChannelDeleted_Call { - _c.Call.Return(run) - return _c -} - -// SaveChannelCheckpoint provides a mock function with given fields: ctx, vChannel, pos -func (_m *DataCoordCatalog) SaveChannelCheckpoint(ctx context.Context, vChannel string, pos *msgpb.MsgPosition) error { - ret := _m.Called(ctx, vChannel, pos) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, *msgpb.MsgPosition) error); ok { - r0 = rf(ctx, vChannel, pos) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_SaveChannelCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveChannelCheckpoint' -type DataCoordCatalog_SaveChannelCheckpoint_Call struct { - *mock.Call -} - -// SaveChannelCheckpoint is a helper method to define mock.On call -// - ctx context.Context -// - vChannel string -// - pos *msgpb.MsgPosition -func (_e *DataCoordCatalog_Expecter) SaveChannelCheckpoint(ctx interface{}, vChannel interface{}, pos interface{}) *DataCoordCatalog_SaveChannelCheckpoint_Call { - return &DataCoordCatalog_SaveChannelCheckpoint_Call{Call: _e.mock.On("SaveChannelCheckpoint", ctx, vChannel, pos)} -} - -func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) Run(run func(ctx context.Context, vChannel string, pos *msgpb.MsgPosition)) *DataCoordCatalog_SaveChannelCheckpoint_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(*msgpb.MsgPosition)) - }) - return _c -} - -func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) Return(_a0 error) *DataCoordCatalog_SaveChannelCheckpoint_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) RunAndReturn(run func(context.Context, string, *msgpb.MsgPosition) error) *DataCoordCatalog_SaveChannelCheckpoint_Call { - _c.Call.Return(run) - return _c -} - -// SaveDroppedSegmentsInBatch provides a mock function with given fields: ctx, segments -func (_m *DataCoordCatalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error { - ret := _m.Called(ctx, segments) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []*datapb.SegmentInfo) error); ok { - r0 = rf(ctx, segments) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DataCoordCatalog_SaveDroppedSegmentsInBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveDroppedSegmentsInBatch' -type DataCoordCatalog_SaveDroppedSegmentsInBatch_Call struct { - *mock.Call -} - -// SaveDroppedSegmentsInBatch is a helper method to define mock.On call -// - ctx context.Context -// - segments []*datapb.SegmentInfo -func (_e *DataCoordCatalog_Expecter) SaveDroppedSegmentsInBatch(ctx interface{}, segments interface{}) *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call { - return &DataCoordCatalog_SaveDroppedSegmentsInBatch_Call{Call: _e.mock.On("SaveDroppedSegmentsInBatch", ctx, segments)} -} - -func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) Run(run func(ctx context.Context, segments []*datapb.SegmentInfo)) *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*datapb.SegmentInfo)) - }) - return _c -} - -func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) Return(_a0 error) *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) RunAndReturn(run func(context.Context, []*datapb.SegmentInfo) error) *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call { - _c.Call.Return(run) - return _c -} - -// ShouldDropChannel provides a mock function with given fields: ctx, channel -func (_m *DataCoordCatalog) ShouldDropChannel(ctx context.Context, channel string) bool { - ret := _m.Called(ctx, channel) - - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { - r0 = rf(ctx, channel) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// DataCoordCatalog_ShouldDropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShouldDropChannel' -type DataCoordCatalog_ShouldDropChannel_Call struct { - *mock.Call -} - -// ShouldDropChannel is a helper method to define mock.On call -// - ctx context.Context -// - channel string -func (_e *DataCoordCatalog_Expecter) ShouldDropChannel(ctx interface{}, channel interface{}) *DataCoordCatalog_ShouldDropChannel_Call { - return &DataCoordCatalog_ShouldDropChannel_Call{Call: _e.mock.On("ShouldDropChannel", ctx, channel)} -} - -func (_c *DataCoordCatalog_ShouldDropChannel_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ShouldDropChannel_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DataCoordCatalog_ShouldDropChannel_Call) Return(_a0 bool) *DataCoordCatalog_ShouldDropChannel_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *DataCoordCatalog_ShouldDropChannel_Call) RunAndReturn(run func(context.Context, string) bool) *DataCoordCatalog_ShouldDropChannel_Call { - _c.Call.Return(run) - return _c -} - -// NewDataCoordCatalog creates a new instance of DataCoordCatalog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewDataCoordCatalog(t interface { - mock.TestingT - Cleanup(func()) -}) *DataCoordCatalog { - mock := &DataCoordCatalog{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 9e7c9f06f1..43979330fd 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -632,8 +632,9 @@ message UpdateSegmentStatisticsRequest { message UpdateChannelCheckpointRequest { common.MsgBase base = 1; - string vChannel = 2; - msg.MsgPosition position = 3; + string vChannel = 2; // deprecated, keep it for compatibility + msg.MsgPosition position = 3; // deprecated, keep it for compatibility + repeated msg.MsgPosition channel_checkpoints = 4; } message ResendSegmentStatsRequest { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 6d0ee4112e..f950c74e8e 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2768,7 +2768,11 @@ type dataNodeConfig struct { // channel ChannelWorkPoolSize ParamItem `refreshable:"true"` - UpdateChannelCheckpointMaxParallel ParamItem `refreshable:"true"` + UpdateChannelCheckpointMaxParallel ParamItem `refreshable:"true"` + UpdateChannelCheckpointInterval ParamItem `refreshable:"true"` + UpdateChannelCheckpointRPCTimeout ParamItem `refreshable:"true"` + MaxChannelCheckpointsPerRPC ParamItem `refreshable:"true"` + ChannelCheckpointUpdateTickInSeconds ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` } @@ -2984,10 +2988,42 @@ func (p *dataNodeConfig) init(base *BaseTable) { Key: "datanode.channel.updateChannelCheckpointMaxParallel", Version: "2.3.4", PanicIfEmpty: false, - DefaultValue: "1000", + DefaultValue: "10", } p.UpdateChannelCheckpointMaxParallel.Init(base.mgr) + p.UpdateChannelCheckpointInterval = ParamItem{ + Key: "datanode.channel.updateChannelCheckpointInterval", + Version: "2.4.0", + Doc: "the interval duration(in seconds) for datanode to update channel checkpoint of each channel", + DefaultValue: "60", + } + p.UpdateChannelCheckpointInterval.Init(base.mgr) + + p.UpdateChannelCheckpointRPCTimeout = ParamItem{ + Key: "datanode.channel.updateChannelCheckpointRPCTimeout", + Version: "2.4.0", + Doc: "timeout in seconds for UpdateChannelCheckpoint RPC call", + DefaultValue: "20", + } + p.UpdateChannelCheckpointRPCTimeout.Init(base.mgr) + + p.MaxChannelCheckpointsPerRPC = ParamItem{ + Key: "datanode.channel.maxChannelCheckpointsPerPRC", + Version: "2.4.0", + Doc: "The maximum number of channel checkpoints per UpdateChannelCheckpoint RPC.", + DefaultValue: "128", + } + p.MaxChannelCheckpointsPerRPC.Init(base.mgr) + + p.ChannelCheckpointUpdateTickInSeconds = ParamItem{ + Key: "datanode.channel.channelCheckpointUpdateTickInSeconds", + Version: "2.4.0", + Doc: "The frequency, in seconds, at which the channel checkpoint updater executes updates.", + DefaultValue: "10", + } + p.ChannelCheckpointUpdateTickInSeconds.Init(base.mgr) + p.GracefulStopTimeout = ParamItem{ Key: "datanode.gracefulStopTimeout", Version: "2.3.7", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 16ee3d0854..0726fb82ce 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -426,7 +426,9 @@ func TestComponentParam(t *testing.T) { updateChannelCheckpointMaxParallel := Params.UpdateChannelCheckpointMaxParallel.GetAsInt() t.Logf("updateChannelCheckpointMaxParallel: %d", updateChannelCheckpointMaxParallel) - assert.Equal(t, 1000, Params.UpdateChannelCheckpointMaxParallel.GetAsInt()) + assert.Equal(t, 10, Params.UpdateChannelCheckpointMaxParallel.GetAsInt()) + assert.Equal(t, 128, Params.MaxChannelCheckpointsPerRPC.GetAsInt()) + assert.Equal(t, 10*time.Second, Params.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second)) params.Save("datanode.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))