fix: Skip updating checkpoint after dropcollection (#29220)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/29251/head
XuanYang-cn 2023-12-15 16:04:45 +08:00 committed by GitHub
parent 26409d801e
commit 5164377e68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 1 deletions

View File

@ -115,7 +115,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
}
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.Info("ddNode in dropMode",
log.RatedInfo(1.0, "ddNode in dropMode",
zap.String("vChannelName", ddn.vChannelName),
zap.Int64("collectionID", ddn.collectionID))
return []Msg{}

View File

@ -42,6 +42,7 @@ type ttNode struct {
writeBufferManager writebuffer.BufferManager
lastUpdateTime *atomic.Time
cpUpdater *channelCheckpointUpdater
dropMode *atomic.Bool
}
// Name returns node name, implementing flowgraph.Node
@ -67,6 +68,17 @@ func (ttn *ttNode) Close() {
// Operate handles input messages, implementing flowgraph.Node
func (ttn *ttNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
if fgMsg.dropCollection {
ttn.dropMode.Store(true)
}
// skip updating checkpoint for drop collection
// even if its the close msg
if ttn.dropMode.Load() {
log.RatedInfo(1.0, "ttnode in dropMode", zap.String("channel", ttn.vChannelName))
return []Msg{}
}
curTs, _ := tsoutil.ParseTS(fgMsg.timeRange.timestampMax)
if fgMsg.IsCloseMsg() {
if len(fgMsg.endPositions) > 0 {
@ -129,6 +141,7 @@ func newTTNode(config *nodeConfig, wbManager writebuffer.BufferManager, cpUpdate
writeBufferManager: wbManager,
lastUpdateTime: atomic.NewTime(time.Time{}), // set to Zero to update channel checkpoint immediately after fg started
cpUpdater: cpUpdater,
dropMode: atomic.NewBool(false),
}
return tt, nil