From aa6212d5b0e4be3124a4edc9cf205a1771983cf8 Mon Sep 17 00:00:00 2001 From: wayblink Date: Thu, 9 Mar 2023 14:13:52 +0800 Subject: [PATCH] [Cherry-pick] Fix channel checkpoint issues and fix GC (#22559) Signed-off-by: wayblink Co-authored-by: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> --- internal/datacoord/garbage_collector.go | 33 +++- internal/datacoord/garbage_collector_test.go | 14 +- internal/datacoord/handler.go | 9 +- internal/datacoord/meta.go | 5 +- internal/datacoord/services.go | 6 + internal/datanode/channel_meta.go | 3 +- .../datanode/flow_graph_time_tick_node.go | 11 +- internal/metastore/catalog.go | 4 +- internal/metastore/kv/datacoord/constant.go | 3 +- internal/metastore/kv/datacoord/kv_catalog.go | 22 ++- .../metastore/kv/datacoord/kv_catalog_test.go | 21 +++ internal/metastore/mocks/DataCoordCatalog.go | 154 +++++++++++++----- internal/mocks/mock_datacoord.go | 2 +- internal/mocks/mock_datacoord_catalog.go | 154 +++++++++++++----- 14 files changed, 338 insertions(+), 103 deletions(-) diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 91bfd140ac..3dbb142327 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -279,25 +279,42 @@ func (gc *garbageCollector) clearEtcd() { if !gc.isExpire(segment.GetDroppedAt()) { continue } - // segment gc shall only happen when channel cp is after segment dml cp. - if segment.GetDmlPosition().GetTimestamp() > channelCPs[segment.GetInsertChannel()] { - log.WithRateGroup("GC_FAIL_CP_BEFORE", 1, 60).RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc", - zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()), - zap.Uint64("channelCpTs", channelCPs[segment.GetInsertChannel()]), - ) + segInsertChannel := segment.GetInsertChannel() + // Ignore segments from potentially dropped collection. Check if collection is to be dropped by checking if channel is dropped. + // We do this because collection meta drop relies on all segment being GCed. + if gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) && + segment.GetDmlPosition().GetTimestamp() > channelCPs[segInsertChannel] { + // segment gc shall only happen when channel cp is after segment dml cp. + log.WithRateGroup("GC_FAIL_CP_BEFORE", 1, 60). + RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc", + zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()), + zap.Uint64("channelCpTs", channelCPs[segInsertChannel]), + ) continue } // For compact A, B -> C, don't GC A or B if C is not indexed, // guarantee replacing A, B with C won't downgrade performance if to, ok := compactTo[segment.GetID()]; ok && !indexedSet.Contain(to.GetID()) { + log.WithRateGroup("GC_FAIL_COMPACT_TO_NOT_INDEXED", 1, 60). + RatedWarn(60, "skipping GC when compact target segment is not indexed", + zap.Int64("segmentID", to.GetID())) continue } logs := getLogs(segment) - log.Info("GC segment", - zap.Int64("segmentID", segment.GetID())) + log.Info("GC segment", zap.Int64("segmentID", segment.GetID())) if gc.removeLogs(logs) { _ = gc.meta.DropSegment(segment.GetID()) } + if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 && + !gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) { + log.Info("empty channel found during gc, manually cleanup channel checkpoints", + zap.String("vChannel", segInsertChannel)) + + if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil { + // Fail-open as there's nothing to do. + log.Warn("failed to drop channel check point during segment garbage collection", zap.Error(err)) + } + } } } diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index ba3a875f06..4aa88161e5 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -903,6 +903,14 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { func TestGarbageCollector_clearETCD(t *testing.T) { catalog := catalogmocks.NewDataCoordCatalog(t) + catalog.On("ChannelExists", + mock.Anything, + mock.Anything, + ).Return(false) + catalog.On("DropChannelCheckpoint", + mock.Anything, + mock.Anything, + ).Return(nil) catalog.On("CreateSegmentIndex", mock.Anything, mock.Anything, @@ -1151,7 +1159,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) { segE := gc.meta.GetSegment(segID + 4) assert.NotNil(t, segE) segF := gc.meta.GetSegment(segID + 5) - assert.NotNil(t, segF) + assert.Nil(t, segF) err := gc.meta.AddSegmentIndex(&model.SegmentIndex{ SegmentID: segID + 4, @@ -1184,7 +1192,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) { segE = gc.meta.GetSegment(segID + 4) assert.NotNil(t, segE) segF = gc.meta.GetSegment(segID + 5) - assert.NotNil(t, segF) + assert.Nil(t, segF) gc.clearEtcd() segA = gc.meta.GetSegment(segID) @@ -1192,6 +1200,6 @@ func TestGarbageCollector_clearETCD(t *testing.T) { segB = gc.meta.GetSegment(segID + 1) assert.Nil(t, segB) segF = gc.meta.GetSegment(segID + 5) - assert.NotNil(t, segF) + assert.Nil(t, segF) } diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 13dc79bfde..9f9b997620 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -348,7 +348,7 @@ func (h *ServerHandler) CheckShouldDropChannel(channel string) bool { } } return false*/ - return h.s.meta.catalog.IsChannelDropped(h.s.ctx, channel) + return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) } // FinishDropChannel cleans up the remove flag for channels @@ -359,10 +359,7 @@ func (h *ServerHandler) FinishDropChannel(channel string) error { log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err)) return err } - err = h.s.meta.DropChannelCheckpoint(channel) - if err != nil { - log.Warn("DropChannelCheckpoint failed", zap.String("vChannel", channel), zap.Error(err)) - return err - } + log.Info("DropChannel succeeded", zap.String("vChannel", channel)) + // Channel checkpoints are cleaned up during garbage collection. return nil } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 2bd49ad824..236a67cdd2 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1271,7 +1271,10 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition) } m.channelCPs[vChannel] = pos ts, _ := tsoutil.ParseTS(pos.Timestamp) - log.Debug("UpdateChannelCheckpoint done", zap.String("vChannel", vChannel), zap.Time("time", ts)) + log.Debug("UpdateChannelCheckpoint done", + zap.String("vChannel", vChannel), + zap.Uint64("ts", pos.Timestamp), + zap.Time("time", ts)) } return nil } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 107829ac5d..3151395be8 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1124,6 +1124,12 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq resp.Status.Reason = err.Error() return resp, nil } + if err := s.meta.catalog.MarkChannelAdded(ctx, ch.Name); err != nil { + // TODO: add background task to periodically cleanup the orphaned channel add marks. + log.Error("failed to mark channel added", zap.String("channelName", channelName), zap.Error(err)) + resp.Status.Reason = err.Error() + return resp, nil + } } resp.Status.ErrorCode = commonpb.ErrorCode_Success diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go index 5d5b8e6611..ea0fca032d 100644 --- a/internal/datanode/channel_meta.go +++ b/internal/datanode/channel_meta.go @@ -701,7 +701,8 @@ func (c *ChannelMeta) getChannelCheckpoint(ttPos *msgpb.MsgPosition) *msgpb.MsgP zap.Bool("isCurIBEmpty", seg.curInsertBuf == nil), zap.Bool("isCurDBEmpty", seg.curDeleteBuf == nil), zap.Int("len(hisIB)", len(seg.historyInsertBuf)), - zap.Int("len(hisDB)", len(seg.historyDeleteBuf))) + zap.Int("len(hisDB)", len(seg.historyDeleteBuf)), + zap.Any("newChannelCpTs", channelCP.GetTimestamp())) } // 2. if no data in buffer, use the current tt as channelCP if channelCP.MsgID == nil { diff --git a/internal/datanode/flow_graph_time_tick_node.go b/internal/datanode/flow_graph_time_tick_node.go index be78cdd3aa..aab71bbbd4 100644 --- a/internal/datanode/flow_graph_time_tick_node.go +++ b/internal/datanode/flow_graph_time_tick_node.go @@ -72,6 +72,12 @@ func (ttn *ttNode) IsValidInMsg(in []Msg) bool { func (ttn *ttNode) Operate(in []Msg) []Msg { fgMsg := in[0].(*flowGraphMsg) if fgMsg.IsCloseMsg() { + if len(fgMsg.endPositions) > 0 { + log.Info("flowgraph is closing, force update channel CP", + zap.Uint64("endTs", fgMsg.endPositions[0].GetTimestamp()), + zap.String("channel", fgMsg.endPositions[0].GetChannelName())) + ttn.updateChannelCP(fgMsg.endPositions[0]) + } return in } @@ -107,7 +113,10 @@ func (ttn *ttNode) updateChannelCP(ttPos *msgpb.MsgPosition) { return } - log.Info("UpdateChannelCheckpoint success", zap.String("channel", ttn.vChannelName), zap.Time("channelCPTs", channelCPTs)) + log.Info("UpdateChannelCheckpoint success", + zap.String("channel", ttn.vChannelName), + zap.Uint64("cpTs", channelPos.Timestamp), + zap.Time("cpTime", channelCPTs)) } func newTTNode(config *nodeConfig, dc types.DataCoord) (*ttNode, error) { diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index e7f245e1f4..ad7861552a 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -111,8 +111,10 @@ type DataCoordCatalog interface { DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error RevertAlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error + MarkChannelAdded(ctx context.Context, channel string) error MarkChannelDeleted(ctx context.Context, channel string) error - IsChannelDropped(ctx context.Context, channel string) bool + ShouldDropChannel(ctx context.Context, channel string) bool + ChannelExists(ctx context.Context, channel string) bool DropChannel(ctx context.Context, channel string) error ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) diff --git a/internal/metastore/kv/datacoord/constant.go b/internal/metastore/kv/datacoord/constant.go index 077432b22b..1d2e2aa853 100644 --- a/internal/metastore/kv/datacoord/constant.go +++ b/internal/metastore/kv/datacoord/constant.go @@ -25,5 +25,6 @@ const ( ChannelRemovePrefix = MetaPrefix + "/channel-removal" ChannelCheckpointPrefix = MetaPrefix + "/channel-cp" - RemoveFlagTomestone = "removed" + NonRemoveFlagTomestone = "non-removed" + RemoveFlagTomestone = "removed" ) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 3a3fac26c5..3b4db22f3c 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -472,6 +472,17 @@ func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) return nil } +func (kc *Catalog) MarkChannelAdded(ctx context.Context, channel string) error { + key := buildChannelRemovePath(channel) + err := kc.MetaKv.Save(key, NonRemoveFlagTomestone) + if err != nil { + log.Error("failed to mark channel added", zap.String("channel", channel), zap.Error(err)) + return err + } + log.Info("NON remove flag tombstone added", zap.String("channel", channel)) + return nil +} + func (kc *Catalog) MarkChannelDeleted(ctx context.Context, channel string) error { key := buildChannelRemovePath(channel) err := kc.MetaKv.Save(key, RemoveFlagTomestone) @@ -479,11 +490,11 @@ func (kc *Catalog) MarkChannelDeleted(ctx context.Context, channel string) error log.Error("Failed to mark channel dropped", zap.String("channel", channel), zap.Error(err)) return err } - + log.Info("remove flag tombstone added", zap.String("channel", channel)) return nil } -func (kc *Catalog) IsChannelDropped(ctx context.Context, channel string) bool { +func (kc *Catalog) ShouldDropChannel(ctx context.Context, channel string) bool { key := buildChannelRemovePath(channel) v, err := kc.MetaKv.Load(key) if err != nil || v != RemoveFlagTomestone { @@ -492,9 +503,16 @@ func (kc *Catalog) IsChannelDropped(ctx context.Context, channel string) bool { return true } +func (kc *Catalog) ChannelExists(ctx context.Context, channel string) bool { + key := buildChannelRemovePath(channel) + v, err := kc.MetaKv.Load(key) + return err == nil && v == NonRemoveFlagTomestone +} + // DropChannel removes channel remove flag after whole procedure is finished func (kc *Catalog) DropChannel(ctx context.Context, channel string) error { key := buildChannelRemovePath(channel) + log.Info("removing channel remove path", zap.String("channel", channel)) return kc.MetaKv.Remove(key) } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index a94417f5f1..5c96b9ebe2 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -749,6 +749,27 @@ func Test_MarkChannelDeleted_SaveError(t *testing.T) { assert.Error(t, err) } +func Test_MarkChannelAdded_SaveError(t *testing.T) { + txn := mocks.NewMetaKv(t) + txn.EXPECT(). + Save(mock.Anything, mock.Anything). + Return(errors.New("mock error")) + + catalog := NewCatalog(txn, rootPath, "") + err := catalog.MarkChannelAdded(context.TODO(), "test_channel_1") + assert.Error(t, err) +} + +func Test_ChannelExists_SaveError(t *testing.T) { + txn := mocks.NewMetaKv(t) + txn.EXPECT(). + Load(mock.Anything). + Return("", errors.New("mock error")) + + catalog := NewCatalog(txn, rootPath, "") + assert.False(t, catalog.ChannelExists(context.TODO(), "test_channel_1")) +} + func Test_parseBinlogKey(t *testing.T) { catalog := NewCatalog(nil, "", "") diff --git a/internal/metastore/mocks/DataCoordCatalog.go b/internal/metastore/mocks/DataCoordCatalog.go index 384e529ac0..7502f12c15 100644 --- a/internal/metastore/mocks/DataCoordCatalog.go +++ b/internal/metastore/mocks/DataCoordCatalog.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.16.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package mocks @@ -333,6 +333,44 @@ func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) Return(_a0 error) return _c } +// ChannelExists provides a mock function with given fields: ctx, channel +func (_m *DataCoordCatalog) ChannelExists(ctx context.Context, channel string) bool { + ret := _m.Called(ctx, channel) + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, channel) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// DataCoordCatalog_ChannelExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChannelExists' +type DataCoordCatalog_ChannelExists_Call struct { + *mock.Call +} + +// ChannelExists is a helper method to define mock.On call +// - ctx context.Context +// - channel string +func (_e *DataCoordCatalog_Expecter) ChannelExists(ctx interface{}, channel interface{}) *DataCoordCatalog_ChannelExists_Call { + return &DataCoordCatalog_ChannelExists_Call{Call: _e.mock.On("ChannelExists", ctx, channel)} +} + +func (_c *DataCoordCatalog_ChannelExists_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ChannelExists_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *DataCoordCatalog_ChannelExists_Call) Return(_a0 bool) *DataCoordCatalog_ChannelExists_Call { + _c.Call.Return(_a0) + return _c +} + // CreateIndex provides a mock function with given fields: ctx, index func (_m *DataCoordCatalog) CreateIndex(ctx context.Context, index *model.Index) error { ret := _m.Called(ctx, index) @@ -642,44 +680,6 @@ func (_c *DataCoordCatalog_GcConfirm_Call) Return(_a0 bool) *DataCoordCatalog_Gc return _c } -// IsChannelDropped provides a mock function with given fields: ctx, channel -func (_m *DataCoordCatalog) IsChannelDropped(ctx context.Context, channel string) bool { - ret := _m.Called(ctx, channel) - - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { - r0 = rf(ctx, channel) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// DataCoordCatalog_IsChannelDropped_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsChannelDropped' -type DataCoordCatalog_IsChannelDropped_Call struct { - *mock.Call -} - -// IsChannelDropped is a helper method to define mock.On call -// - ctx context.Context -// - channel string -func (_e *DataCoordCatalog_Expecter) IsChannelDropped(ctx interface{}, channel interface{}) *DataCoordCatalog_IsChannelDropped_Call { - return &DataCoordCatalog_IsChannelDropped_Call{Call: _e.mock.On("IsChannelDropped", ctx, channel)} -} - -func (_c *DataCoordCatalog_IsChannelDropped_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_IsChannelDropped_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DataCoordCatalog_IsChannelDropped_Call) Return(_a0 bool) *DataCoordCatalog_IsChannelDropped_Call { - _c.Call.Return(_a0) - return _c -} - // ListChannelCheckpoint provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) { ret := _m.Called(ctx) @@ -864,6 +864,44 @@ func (_c *DataCoordCatalog_ListSegments_Call) Return(_a0 []*datapb.SegmentInfo, return _c } +// MarkChannelAdded provides a mock function with given fields: ctx, channel +func (_m *DataCoordCatalog) MarkChannelAdded(ctx context.Context, channel string) error { + ret := _m.Called(ctx, channel) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, channel) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DataCoordCatalog_MarkChannelAdded_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkChannelAdded' +type DataCoordCatalog_MarkChannelAdded_Call struct { + *mock.Call +} + +// MarkChannelAdded is a helper method to define mock.On call +// - ctx context.Context +// - channel string +func (_e *DataCoordCatalog_Expecter) MarkChannelAdded(ctx interface{}, channel interface{}) *DataCoordCatalog_MarkChannelAdded_Call { + return &DataCoordCatalog_MarkChannelAdded_Call{Call: _e.mock.On("MarkChannelAdded", ctx, channel)} +} + +func (_c *DataCoordCatalog_MarkChannelAdded_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_MarkChannelAdded_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *DataCoordCatalog_MarkChannelAdded_Call) Return(_a0 error) *DataCoordCatalog_MarkChannelAdded_Call { + _c.Call.Return(_a0) + return _c +} + // MarkChannelDeleted provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) MarkChannelDeleted(ctx context.Context, channel string) error { ret := _m.Called(ctx, channel) @@ -1018,6 +1056,44 @@ func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) Return(_a0 error) *D return _c } +// ShouldDropChannel provides a mock function with given fields: ctx, channel +func (_m *DataCoordCatalog) ShouldDropChannel(ctx context.Context, channel string) bool { + ret := _m.Called(ctx, channel) + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, channel) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// DataCoordCatalog_ShouldDropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShouldDropChannel' +type DataCoordCatalog_ShouldDropChannel_Call struct { + *mock.Call +} + +// ShouldDropChannel is a helper method to define mock.On call +// - ctx context.Context +// - channel string +func (_e *DataCoordCatalog_Expecter) ShouldDropChannel(ctx interface{}, channel interface{}) *DataCoordCatalog_ShouldDropChannel_Call { + return &DataCoordCatalog_ShouldDropChannel_Call{Call: _e.mock.On("ShouldDropChannel", ctx, channel)} +} + +func (_c *DataCoordCatalog_ShouldDropChannel_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ShouldDropChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *DataCoordCatalog_ShouldDropChannel_Call) Return(_a0 bool) *DataCoordCatalog_ShouldDropChannel_Call { + _c.Call.Return(_a0) + return _c +} + type mockConstructorTestingTNewDataCoordCatalog interface { mock.TestingT Cleanup(func()) diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go index a0a39a4dc2..f7b62b33f8 100644 --- a/internal/mocks/mock_datacoord.go +++ b/internal/mocks/mock_datacoord.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.16.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package mocks diff --git a/internal/mocks/mock_datacoord_catalog.go b/internal/mocks/mock_datacoord_catalog.go index b2cb20211b..c198407026 100644 --- a/internal/mocks/mock_datacoord_catalog.go +++ b/internal/mocks/mock_datacoord_catalog.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package mocks @@ -332,6 +332,44 @@ func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) Return(_a0 error) return _c } +// ChannelExists provides a mock function with given fields: ctx, channel +func (_m *DataCoordCatalog) ChannelExists(ctx context.Context, channel string) bool { + ret := _m.Called(ctx, channel) + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, channel) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// DataCoordCatalog_ChannelExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChannelExists' +type DataCoordCatalog_ChannelExists_Call struct { + *mock.Call +} + +// ChannelExists is a helper method to define mock.On call +// - ctx context.Context +// - channel string +func (_e *DataCoordCatalog_Expecter) ChannelExists(ctx interface{}, channel interface{}) *DataCoordCatalog_ChannelExists_Call { + return &DataCoordCatalog_ChannelExists_Call{Call: _e.mock.On("ChannelExists", ctx, channel)} +} + +func (_c *DataCoordCatalog_ChannelExists_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ChannelExists_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *DataCoordCatalog_ChannelExists_Call) Return(_a0 bool) *DataCoordCatalog_ChannelExists_Call { + _c.Call.Return(_a0) + return _c +} + // CreateIndex provides a mock function with given fields: ctx, index func (_m *DataCoordCatalog) CreateIndex(ctx context.Context, index *model.Index) error { ret := _m.Called(ctx, index) @@ -641,44 +679,6 @@ func (_c *DataCoordCatalog_GcConfirm_Call) Return(_a0 bool) *DataCoordCatalog_Gc return _c } -// IsChannelDropped provides a mock function with given fields: ctx, channel -func (_m *DataCoordCatalog) IsChannelDropped(ctx context.Context, channel string) bool { - ret := _m.Called(ctx, channel) - - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { - r0 = rf(ctx, channel) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// DataCoordCatalog_IsChannelDropped_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsChannelDropped' -type DataCoordCatalog_IsChannelDropped_Call struct { - *mock.Call -} - -// IsChannelDropped is a helper method to define mock.On call -// - ctx context.Context -// - channel string -func (_e *DataCoordCatalog_Expecter) IsChannelDropped(ctx interface{}, channel interface{}) *DataCoordCatalog_IsChannelDropped_Call { - return &DataCoordCatalog_IsChannelDropped_Call{Call: _e.mock.On("IsChannelDropped", ctx, channel)} -} - -func (_c *DataCoordCatalog_IsChannelDropped_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_IsChannelDropped_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *DataCoordCatalog_IsChannelDropped_Call) Return(_a0 bool) *DataCoordCatalog_IsChannelDropped_Call { - _c.Call.Return(_a0) - return _c -} - // ListChannelCheckpoint provides a mock function with given fields: ctx func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) { ret := _m.Called(ctx) @@ -863,6 +863,44 @@ func (_c *DataCoordCatalog_ListSegments_Call) Return(_a0 []*datapb.SegmentInfo, return _c } +// MarkChannelAdded provides a mock function with given fields: ctx, channel +func (_m *DataCoordCatalog) MarkChannelAdded(ctx context.Context, channel string) error { + ret := _m.Called(ctx, channel) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, channel) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DataCoordCatalog_MarkChannelAdded_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkChannelAdded' +type DataCoordCatalog_MarkChannelAdded_Call struct { + *mock.Call +} + +// MarkChannelAdded is a helper method to define mock.On call +// - ctx context.Context +// - channel string +func (_e *DataCoordCatalog_Expecter) MarkChannelAdded(ctx interface{}, channel interface{}) *DataCoordCatalog_MarkChannelAdded_Call { + return &DataCoordCatalog_MarkChannelAdded_Call{Call: _e.mock.On("MarkChannelAdded", ctx, channel)} +} + +func (_c *DataCoordCatalog_MarkChannelAdded_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_MarkChannelAdded_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *DataCoordCatalog_MarkChannelAdded_Call) Return(_a0 error) *DataCoordCatalog_MarkChannelAdded_Call { + _c.Call.Return(_a0) + return _c +} + // MarkChannelDeleted provides a mock function with given fields: ctx, channel func (_m *DataCoordCatalog) MarkChannelDeleted(ctx context.Context, channel string) error { ret := _m.Called(ctx, channel) @@ -1017,6 +1055,44 @@ func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) Return(_a0 error) *D return _c } +// ShouldDropChannel provides a mock function with given fields: ctx, channel +func (_m *DataCoordCatalog) ShouldDropChannel(ctx context.Context, channel string) bool { + ret := _m.Called(ctx, channel) + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, channel) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// DataCoordCatalog_ShouldDropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShouldDropChannel' +type DataCoordCatalog_ShouldDropChannel_Call struct { + *mock.Call +} + +// ShouldDropChannel is a helper method to define mock.On call +// - ctx context.Context +// - channel string +func (_e *DataCoordCatalog_Expecter) ShouldDropChannel(ctx interface{}, channel interface{}) *DataCoordCatalog_ShouldDropChannel_Call { + return &DataCoordCatalog_ShouldDropChannel_Call{Call: _e.mock.On("ShouldDropChannel", ctx, channel)} +} + +func (_c *DataCoordCatalog_ShouldDropChannel_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ShouldDropChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *DataCoordCatalog_ShouldDropChannel_Call) Return(_a0 bool) *DataCoordCatalog_ShouldDropChannel_Call { + _c.Call.Return(_a0) + return _c +} + type mockConstructorTestingTNewDataCoordCatalog interface { mock.TestingT Cleanup(func())