[Cherry-pick] Fix channel checkpoint issues and fix GC (#22559)

Signed-off-by: wayblink <anyang.wang@zilliz.com>
Co-authored-by: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com>
pull/22651/head
wayblink 2023-03-09 14:13:52 +08:00 committed by GitHub
parent b3287ca5ec
commit aa6212d5b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 338 additions and 103 deletions

View File

@ -279,25 +279,42 @@ func (gc *garbageCollector) clearEtcd() {
if !gc.isExpire(segment.GetDroppedAt()) {
continue
}
// segment gc shall only happen when channel cp is after segment dml cp.
if segment.GetDmlPosition().GetTimestamp() > channelCPs[segment.GetInsertChannel()] {
log.WithRateGroup("GC_FAIL_CP_BEFORE", 1, 60).RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc",
zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()),
zap.Uint64("channelCpTs", channelCPs[segment.GetInsertChannel()]),
)
segInsertChannel := segment.GetInsertChannel()
// Ignore segments from potentially dropped collection. Check if collection is to be dropped by checking if channel is dropped.
// We do this because collection meta drop relies on all segment being GCed.
if gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) &&
segment.GetDmlPosition().GetTimestamp() > channelCPs[segInsertChannel] {
// segment gc shall only happen when channel cp is after segment dml cp.
log.WithRateGroup("GC_FAIL_CP_BEFORE", 1, 60).
RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc",
zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()),
zap.Uint64("channelCpTs", channelCPs[segInsertChannel]),
)
continue
}
// For compact A, B -> C, don't GC A or B if C is not indexed,
// guarantee replacing A, B with C won't downgrade performance
if to, ok := compactTo[segment.GetID()]; ok && !indexedSet.Contain(to.GetID()) {
log.WithRateGroup("GC_FAIL_COMPACT_TO_NOT_INDEXED", 1, 60).
RatedWarn(60, "skipping GC when compact target segment is not indexed",
zap.Int64("segmentID", to.GetID()))
continue
}
logs := getLogs(segment)
log.Info("GC segment",
zap.Int64("segmentID", segment.GetID()))
log.Info("GC segment", zap.Int64("segmentID", segment.GetID()))
if gc.removeLogs(logs) {
_ = gc.meta.DropSegment(segment.GetID())
}
if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 &&
!gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) {
log.Info("empty channel found during gc, manually cleanup channel checkpoints",
zap.String("vChannel", segInsertChannel))
if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil {
// Fail-open as there's nothing to do.
log.Warn("failed to drop channel check point during segment garbage collection", zap.Error(err))
}
}
}
}

View File

@ -903,6 +903,14 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
func TestGarbageCollector_clearETCD(t *testing.T) {
catalog := catalogmocks.NewDataCoordCatalog(t)
catalog.On("ChannelExists",
mock.Anything,
mock.Anything,
).Return(false)
catalog.On("DropChannelCheckpoint",
mock.Anything,
mock.Anything,
).Return(nil)
catalog.On("CreateSegmentIndex",
mock.Anything,
mock.Anything,
@ -1151,7 +1159,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
segE := gc.meta.GetSegment(segID + 4)
assert.NotNil(t, segE)
segF := gc.meta.GetSegment(segID + 5)
assert.NotNil(t, segF)
assert.Nil(t, segF)
err := gc.meta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: segID + 4,
@ -1184,7 +1192,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
segE = gc.meta.GetSegment(segID + 4)
assert.NotNil(t, segE)
segF = gc.meta.GetSegment(segID + 5)
assert.NotNil(t, segF)
assert.Nil(t, segF)
gc.clearEtcd()
segA = gc.meta.GetSegment(segID)
@ -1192,6 +1200,6 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
segB = gc.meta.GetSegment(segID + 1)
assert.Nil(t, segB)
segF = gc.meta.GetSegment(segID + 5)
assert.NotNil(t, segF)
assert.Nil(t, segF)
}

View File

@ -348,7 +348,7 @@ func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
}
}
return false*/
return h.s.meta.catalog.IsChannelDropped(h.s.ctx, channel)
return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel)
}
// FinishDropChannel cleans up the remove flag for channels
@ -359,10 +359,7 @@ func (h *ServerHandler) FinishDropChannel(channel string) error {
log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err))
return err
}
err = h.s.meta.DropChannelCheckpoint(channel)
if err != nil {
log.Warn("DropChannelCheckpoint failed", zap.String("vChannel", channel), zap.Error(err))
return err
}
log.Info("DropChannel succeeded", zap.String("vChannel", channel))
// Channel checkpoints are cleaned up during garbage collection.
return nil
}

View File

@ -1271,7 +1271,10 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition)
}
m.channelCPs[vChannel] = pos
ts, _ := tsoutil.ParseTS(pos.Timestamp)
log.Debug("UpdateChannelCheckpoint done", zap.String("vChannel", vChannel), zap.Time("time", ts))
log.Debug("UpdateChannelCheckpoint done",
zap.String("vChannel", vChannel),
zap.Uint64("ts", pos.Timestamp),
zap.Time("time", ts))
}
return nil
}

View File

@ -1124,6 +1124,12 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
resp.Status.Reason = err.Error()
return resp, nil
}
if err := s.meta.catalog.MarkChannelAdded(ctx, ch.Name); err != nil {
// TODO: add background task to periodically cleanup the orphaned channel add marks.
log.Error("failed to mark channel added", zap.String("channelName", channelName), zap.Error(err))
resp.Status.Reason = err.Error()
return resp, nil
}
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success

View File

@ -701,7 +701,8 @@ func (c *ChannelMeta) getChannelCheckpoint(ttPos *msgpb.MsgPosition) *msgpb.MsgP
zap.Bool("isCurIBEmpty", seg.curInsertBuf == nil),
zap.Bool("isCurDBEmpty", seg.curDeleteBuf == nil),
zap.Int("len(hisIB)", len(seg.historyInsertBuf)),
zap.Int("len(hisDB)", len(seg.historyDeleteBuf)))
zap.Int("len(hisDB)", len(seg.historyDeleteBuf)),
zap.Any("newChannelCpTs", channelCP.GetTimestamp()))
}
// 2. if no data in buffer, use the current tt as channelCP
if channelCP.MsgID == nil {

View File

@ -72,6 +72,12 @@ func (ttn *ttNode) IsValidInMsg(in []Msg) bool {
func (ttn *ttNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
if fgMsg.IsCloseMsg() {
if len(fgMsg.endPositions) > 0 {
log.Info("flowgraph is closing, force update channel CP",
zap.Uint64("endTs", fgMsg.endPositions[0].GetTimestamp()),
zap.String("channel", fgMsg.endPositions[0].GetChannelName()))
ttn.updateChannelCP(fgMsg.endPositions[0])
}
return in
}
@ -107,7 +113,10 @@ func (ttn *ttNode) updateChannelCP(ttPos *msgpb.MsgPosition) {
return
}
log.Info("UpdateChannelCheckpoint success", zap.String("channel", ttn.vChannelName), zap.Time("channelCPTs", channelCPTs))
log.Info("UpdateChannelCheckpoint success",
zap.String("channel", ttn.vChannelName),
zap.Uint64("cpTs", channelPos.Timestamp),
zap.Time("cpTime", channelCPTs))
}
func newTTNode(config *nodeConfig, dc types.DataCoord) (*ttNode, error) {

View File

@ -111,8 +111,10 @@ type DataCoordCatalog interface {
DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error
RevertAlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error
MarkChannelAdded(ctx context.Context, channel string) error
MarkChannelDeleted(ctx context.Context, channel string) error
IsChannelDropped(ctx context.Context, channel string) bool
ShouldDropChannel(ctx context.Context, channel string) bool
ChannelExists(ctx context.Context, channel string) bool
DropChannel(ctx context.Context, channel string) error
ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error)

View File

@ -25,5 +25,6 @@ const (
ChannelRemovePrefix = MetaPrefix + "/channel-removal"
ChannelCheckpointPrefix = MetaPrefix + "/channel-cp"
RemoveFlagTomestone = "removed"
NonRemoveFlagTomestone = "non-removed"
RemoveFlagTomestone = "removed"
)

View File

@ -472,6 +472,17 @@ func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo)
return nil
}
func (kc *Catalog) MarkChannelAdded(ctx context.Context, channel string) error {
key := buildChannelRemovePath(channel)
err := kc.MetaKv.Save(key, NonRemoveFlagTomestone)
if err != nil {
log.Error("failed to mark channel added", zap.String("channel", channel), zap.Error(err))
return err
}
log.Info("NON remove flag tombstone added", zap.String("channel", channel))
return nil
}
func (kc *Catalog) MarkChannelDeleted(ctx context.Context, channel string) error {
key := buildChannelRemovePath(channel)
err := kc.MetaKv.Save(key, RemoveFlagTomestone)
@ -479,11 +490,11 @@ func (kc *Catalog) MarkChannelDeleted(ctx context.Context, channel string) error
log.Error("Failed to mark channel dropped", zap.String("channel", channel), zap.Error(err))
return err
}
log.Info("remove flag tombstone added", zap.String("channel", channel))
return nil
}
func (kc *Catalog) IsChannelDropped(ctx context.Context, channel string) bool {
func (kc *Catalog) ShouldDropChannel(ctx context.Context, channel string) bool {
key := buildChannelRemovePath(channel)
v, err := kc.MetaKv.Load(key)
if err != nil || v != RemoveFlagTomestone {
@ -492,9 +503,16 @@ func (kc *Catalog) IsChannelDropped(ctx context.Context, channel string) bool {
return true
}
func (kc *Catalog) ChannelExists(ctx context.Context, channel string) bool {
key := buildChannelRemovePath(channel)
v, err := kc.MetaKv.Load(key)
return err == nil && v == NonRemoveFlagTomestone
}
// DropChannel removes channel remove flag after whole procedure is finished
func (kc *Catalog) DropChannel(ctx context.Context, channel string) error {
key := buildChannelRemovePath(channel)
log.Info("removing channel remove path", zap.String("channel", channel))
return kc.MetaKv.Remove(key)
}

View File

@ -749,6 +749,27 @@ func Test_MarkChannelDeleted_SaveError(t *testing.T) {
assert.Error(t, err)
}
func Test_MarkChannelAdded_SaveError(t *testing.T) {
txn := mocks.NewMetaKv(t)
txn.EXPECT().
Save(mock.Anything, mock.Anything).
Return(errors.New("mock error"))
catalog := NewCatalog(txn, rootPath, "")
err := catalog.MarkChannelAdded(context.TODO(), "test_channel_1")
assert.Error(t, err)
}
func Test_ChannelExists_SaveError(t *testing.T) {
txn := mocks.NewMetaKv(t)
txn.EXPECT().
Load(mock.Anything).
Return("", errors.New("mock error"))
catalog := NewCatalog(txn, rootPath, "")
assert.False(t, catalog.ChannelExists(context.TODO(), "test_channel_1"))
}
func Test_parseBinlogKey(t *testing.T) {
catalog := NewCatalog(nil, "", "")

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package mocks
@ -333,6 +333,44 @@ func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) Return(_a0 error)
return _c
}
// ChannelExists provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) ChannelExists(ctx context.Context, channel string) bool {
ret := _m.Called(ctx, channel)
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok {
r0 = rf(ctx, channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// DataCoordCatalog_ChannelExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChannelExists'
type DataCoordCatalog_ChannelExists_Call struct {
*mock.Call
}
// ChannelExists is a helper method to define mock.On call
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) ChannelExists(ctx interface{}, channel interface{}) *DataCoordCatalog_ChannelExists_Call {
return &DataCoordCatalog_ChannelExists_Call{Call: _e.mock.On("ChannelExists", ctx, channel)}
}
func (_c *DataCoordCatalog_ChannelExists_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ChannelExists_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *DataCoordCatalog_ChannelExists_Call) Return(_a0 bool) *DataCoordCatalog_ChannelExists_Call {
_c.Call.Return(_a0)
return _c
}
// CreateIndex provides a mock function with given fields: ctx, index
func (_m *DataCoordCatalog) CreateIndex(ctx context.Context, index *model.Index) error {
ret := _m.Called(ctx, index)
@ -642,44 +680,6 @@ func (_c *DataCoordCatalog_GcConfirm_Call) Return(_a0 bool) *DataCoordCatalog_Gc
return _c
}
// IsChannelDropped provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) IsChannelDropped(ctx context.Context, channel string) bool {
ret := _m.Called(ctx, channel)
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok {
r0 = rf(ctx, channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// DataCoordCatalog_IsChannelDropped_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsChannelDropped'
type DataCoordCatalog_IsChannelDropped_Call struct {
*mock.Call
}
// IsChannelDropped is a helper method to define mock.On call
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) IsChannelDropped(ctx interface{}, channel interface{}) *DataCoordCatalog_IsChannelDropped_Call {
return &DataCoordCatalog_IsChannelDropped_Call{Call: _e.mock.On("IsChannelDropped", ctx, channel)}
}
func (_c *DataCoordCatalog_IsChannelDropped_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_IsChannelDropped_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *DataCoordCatalog_IsChannelDropped_Call) Return(_a0 bool) *DataCoordCatalog_IsChannelDropped_Call {
_c.Call.Return(_a0)
return _c
}
// ListChannelCheckpoint provides a mock function with given fields: ctx
func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) {
ret := _m.Called(ctx)
@ -864,6 +864,44 @@ func (_c *DataCoordCatalog_ListSegments_Call) Return(_a0 []*datapb.SegmentInfo,
return _c
}
// MarkChannelAdded provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) MarkChannelAdded(ctx context.Context, channel string) error {
ret := _m.Called(ctx, channel)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(ctx, channel)
} else {
r0 = ret.Error(0)
}
return r0
}
// DataCoordCatalog_MarkChannelAdded_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkChannelAdded'
type DataCoordCatalog_MarkChannelAdded_Call struct {
*mock.Call
}
// MarkChannelAdded is a helper method to define mock.On call
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) MarkChannelAdded(ctx interface{}, channel interface{}) *DataCoordCatalog_MarkChannelAdded_Call {
return &DataCoordCatalog_MarkChannelAdded_Call{Call: _e.mock.On("MarkChannelAdded", ctx, channel)}
}
func (_c *DataCoordCatalog_MarkChannelAdded_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_MarkChannelAdded_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *DataCoordCatalog_MarkChannelAdded_Call) Return(_a0 error) *DataCoordCatalog_MarkChannelAdded_Call {
_c.Call.Return(_a0)
return _c
}
// MarkChannelDeleted provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) MarkChannelDeleted(ctx context.Context, channel string) error {
ret := _m.Called(ctx, channel)
@ -1018,6 +1056,44 @@ func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) Return(_a0 error) *D
return _c
}
// ShouldDropChannel provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) ShouldDropChannel(ctx context.Context, channel string) bool {
ret := _m.Called(ctx, channel)
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok {
r0 = rf(ctx, channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// DataCoordCatalog_ShouldDropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShouldDropChannel'
type DataCoordCatalog_ShouldDropChannel_Call struct {
*mock.Call
}
// ShouldDropChannel is a helper method to define mock.On call
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) ShouldDropChannel(ctx interface{}, channel interface{}) *DataCoordCatalog_ShouldDropChannel_Call {
return &DataCoordCatalog_ShouldDropChannel_Call{Call: _e.mock.On("ShouldDropChannel", ctx, channel)}
}
func (_c *DataCoordCatalog_ShouldDropChannel_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ShouldDropChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *DataCoordCatalog_ShouldDropChannel_Call) Return(_a0 bool) *DataCoordCatalog_ShouldDropChannel_Call {
_c.Call.Return(_a0)
return _c
}
type mockConstructorTestingTNewDataCoordCatalog interface {
mock.TestingT
Cleanup(func())

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package mocks

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package mocks
@ -332,6 +332,44 @@ func (_c *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call) Return(_a0 error)
return _c
}
// ChannelExists provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) ChannelExists(ctx context.Context, channel string) bool {
ret := _m.Called(ctx, channel)
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok {
r0 = rf(ctx, channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// DataCoordCatalog_ChannelExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChannelExists'
type DataCoordCatalog_ChannelExists_Call struct {
*mock.Call
}
// ChannelExists is a helper method to define mock.On call
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) ChannelExists(ctx interface{}, channel interface{}) *DataCoordCatalog_ChannelExists_Call {
return &DataCoordCatalog_ChannelExists_Call{Call: _e.mock.On("ChannelExists", ctx, channel)}
}
func (_c *DataCoordCatalog_ChannelExists_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ChannelExists_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *DataCoordCatalog_ChannelExists_Call) Return(_a0 bool) *DataCoordCatalog_ChannelExists_Call {
_c.Call.Return(_a0)
return _c
}
// CreateIndex provides a mock function with given fields: ctx, index
func (_m *DataCoordCatalog) CreateIndex(ctx context.Context, index *model.Index) error {
ret := _m.Called(ctx, index)
@ -641,44 +679,6 @@ func (_c *DataCoordCatalog_GcConfirm_Call) Return(_a0 bool) *DataCoordCatalog_Gc
return _c
}
// IsChannelDropped provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) IsChannelDropped(ctx context.Context, channel string) bool {
ret := _m.Called(ctx, channel)
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok {
r0 = rf(ctx, channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// DataCoordCatalog_IsChannelDropped_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsChannelDropped'
type DataCoordCatalog_IsChannelDropped_Call struct {
*mock.Call
}
// IsChannelDropped is a helper method to define mock.On call
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) IsChannelDropped(ctx interface{}, channel interface{}) *DataCoordCatalog_IsChannelDropped_Call {
return &DataCoordCatalog_IsChannelDropped_Call{Call: _e.mock.On("IsChannelDropped", ctx, channel)}
}
func (_c *DataCoordCatalog_IsChannelDropped_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_IsChannelDropped_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *DataCoordCatalog_IsChannelDropped_Call) Return(_a0 bool) *DataCoordCatalog_IsChannelDropped_Call {
_c.Call.Return(_a0)
return _c
}
// ListChannelCheckpoint provides a mock function with given fields: ctx
func (_m *DataCoordCatalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) {
ret := _m.Called(ctx)
@ -863,6 +863,44 @@ func (_c *DataCoordCatalog_ListSegments_Call) Return(_a0 []*datapb.SegmentInfo,
return _c
}
// MarkChannelAdded provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) MarkChannelAdded(ctx context.Context, channel string) error {
ret := _m.Called(ctx, channel)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(ctx, channel)
} else {
r0 = ret.Error(0)
}
return r0
}
// DataCoordCatalog_MarkChannelAdded_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkChannelAdded'
type DataCoordCatalog_MarkChannelAdded_Call struct {
*mock.Call
}
// MarkChannelAdded is a helper method to define mock.On call
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) MarkChannelAdded(ctx interface{}, channel interface{}) *DataCoordCatalog_MarkChannelAdded_Call {
return &DataCoordCatalog_MarkChannelAdded_Call{Call: _e.mock.On("MarkChannelAdded", ctx, channel)}
}
func (_c *DataCoordCatalog_MarkChannelAdded_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_MarkChannelAdded_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *DataCoordCatalog_MarkChannelAdded_Call) Return(_a0 error) *DataCoordCatalog_MarkChannelAdded_Call {
_c.Call.Return(_a0)
return _c
}
// MarkChannelDeleted provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) MarkChannelDeleted(ctx context.Context, channel string) error {
ret := _m.Called(ctx, channel)
@ -1017,6 +1055,44 @@ func (_c *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call) Return(_a0 error) *D
return _c
}
// ShouldDropChannel provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) ShouldDropChannel(ctx context.Context, channel string) bool {
ret := _m.Called(ctx, channel)
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok {
r0 = rf(ctx, channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// DataCoordCatalog_ShouldDropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShouldDropChannel'
type DataCoordCatalog_ShouldDropChannel_Call struct {
*mock.Call
}
// ShouldDropChannel is a helper method to define mock.On call
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) ShouldDropChannel(ctx interface{}, channel interface{}) *DataCoordCatalog_ShouldDropChannel_Call {
return &DataCoordCatalog_ShouldDropChannel_Call{Call: _e.mock.On("ShouldDropChannel", ctx, channel)}
}
func (_c *DataCoordCatalog_ShouldDropChannel_Call) Run(run func(ctx context.Context, channel string)) *DataCoordCatalog_ShouldDropChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *DataCoordCatalog_ShouldDropChannel_Call) Return(_a0 bool) *DataCoordCatalog_ShouldDropChannel_Call {
_c.Call.Return(_a0)
return _c
}
type mockConstructorTestingTNewDataCoordCatalog interface {
mock.TestingT
Cleanup(func())