enhance: Add comment for channel cp updater (#33759)

/kind enhancement

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/33805/head
yihao.dai 2024-06-12 20:01:55 +08:00 committed by GitHub
parent b90999b741
commit 9a3e4080f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 7 additions and 1 deletions

View File

@ -38,7 +38,7 @@ const (
type channelCPUpdateTask struct {
pos *msgpb.MsgPosition
callback func()
flush bool
flush bool // indicates whether the task originates from flush
}
type channelCheckpointUpdater struct {
@ -75,6 +75,7 @@ func (ccu *channelCheckpointUpdater) start() {
ccu.mu.Lock()
for _, task := range ccu.tasks {
if task.flush {
// reset flush flag to make next flush valid
task.flush = false
tasks = append(tasks, task)
}
@ -143,6 +144,7 @@ func (ccu *channelCheckpointUpdater) updateCheckpoints(tasks []*channelCPUpdateT
defer ccu.mu.Unlock()
finished.Range(func(_ string, task *channelCPUpdateTask) bool {
channel := task.pos.GetChannelName()
// delete the task if no new task has been added
if ccu.tasks[channel].pos.GetTimestamp() <= task.pos.GetTimestamp() {
delete(ccu.tasks, channel)
}
@ -164,6 +166,7 @@ func (ccu *channelCheckpointUpdater) AddTask(channelPos *msgpb.MsgPosition, flus
return
}
if flush {
// trigger update to accelerate flush
defer ccu.trigger()
}
channel := channelPos.GetChannelName()
@ -185,6 +188,8 @@ func (ccu *channelCheckpointUpdater) AddTask(channelPos *msgpb.MsgPosition, flus
}
return b
}
// 1. `task.pos.GetTimestamp() < channelPos.GetTimestamp()`: position updated, update task position
// 2. `flush && !task.flush`: position not being updated, but flush is triggered, update task flush flag
if task.pos.GetTimestamp() < channelPos.GetTimestamp() || (flush && !task.flush) {
ccu.mu.Lock()
defer ccu.mu.Unlock()

View File

@ -117,6 +117,7 @@ func (ttn *ttNode) Operate(in []Msg) []Msg {
func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Time, flush bool) {
callBack := func() {
channelCPTs, _ := tsoutil.ParseTS(channelPos.GetTimestamp())
// reset flush ts to prevent frequent flush
ttn.writeBufferManager.NotifyCheckpointUpdated(ttn.vChannelName, channelPos.GetTimestamp())
log.Debug("UpdateChannelCheckpoint success",
zap.String("channel", ttn.vChannelName),