enhance: Set WatchProgress for channel operation progress response (#35183)

The watch progress is always zero in CheckChannelOperationProgress
response, which is meaningless and confusing. This PR set progress value
in rpc response to fix this problem.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/35099/head
congqixia 2024-08-02 12:06:14 +08:00 committed by GitHub
parent 3655ab10b2
commit 095d77269b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 32 additions and 13 deletions

View File

@ -84,7 +84,7 @@ func (m *ChannelManagerImpl) Submit(info *datapb.ChannelWatchInfo) error {
// skip enqueue datacoord re-submit the same operations
if runner, ok := m.opRunners.Get(channel); ok {
if runner.Exist(info.GetOpID()) {
if _, exists := runner.Exist(info.GetOpID()); exists {
log.Warn("op already exist, skip", zap.Int64("opID", info.GetOpID()), zap.String("channel", channel))
return nil
}
@ -125,7 +125,8 @@ func (m *ChannelManagerImpl) GetProgress(info *datapb.ChannelWatchInfo) *datapb.
}
if runner, ok := m.opRunners.Get(channel); ok {
if runner.Exist(info.GetOpID()) {
if progress, exists := runner.Exist(info.GetOpID()); exists {
resp.Progress = progress
resp.State = datapb.ChannelWatchState_ToWatch
} else {
resp.State = datapb.ChannelWatchState_WatchFailure
@ -140,9 +141,13 @@ func (m *ChannelManagerImpl) GetProgress(info *datapb.ChannelWatchInfo) *datapb.
resp.State = datapb.ChannelWatchState_ReleaseSuccess
return resp
}
if runner, ok := m.opRunners.Get(channel); ok && runner.Exist(info.GetOpID()) {
resp.State = datapb.ChannelWatchState_ToRelease
return resp
runner, ok := m.opRunners.Get(channel)
if ok {
_, exists := runner.Exist(info.GetOpID())
if exists {
resp.State = datapb.ChannelWatchState_ToRelease
return resp
}
}
resp.State = datapb.ChannelWatchState_ReleaseFailure
@ -278,11 +283,17 @@ func (r *opRunner) FinishOp(opID util.UniqueID) {
delete(r.allOps, opID)
}
func (r *opRunner) Exist(opID util.UniqueID) bool {
func (r *opRunner) Exist(opID util.UniqueID) (progress int32, exists bool) {
r.guard.RLock()
defer r.guard.RUnlock()
_, ok := r.allOps[opID]
return ok
info, ok := r.allOps[opID]
if !ok {
return -1, false
}
if info.tickler == nil {
return 0, true
}
return info.tickler.Progress(), true
}
func (r *opRunner) Enqueue(info *datapb.ChannelWatchInfo) error {
@ -321,6 +332,17 @@ func (r *opRunner) Execute(info *datapb.ChannelWatchInfo) *opState {
return r.releaseWithTimer(r.releaseFunc, info.GetVchan().GetChannelName(), info.GetOpID())
}
func (r *opRunner) updateTickler(opID int64, tickler *util.Tickler) bool {
r.guard.Lock()
defer r.guard.Unlock()
opInfo, ok := r.allOps[opID]
if !ok {
return false
}
opInfo.tickler = tickler
return true
}
// watchWithTimer will return WatchFailure after WatchTimeoutInterval
func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState {
opState := &opState{
@ -329,15 +351,12 @@ func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState {
}
log := log.With(zap.String("channel", opState.channel), zap.Int64("opID", opState.opID))
r.guard.Lock()
opInfo, ok := r.allOps[info.GetOpID()]
r.guard.Unlock()
tickler := util.NewTickler()
ok := r.updateTickler(info.GetOpID(), tickler)
if !ok {
opState.state = datapb.ChannelWatchState_WatchFailure
return opState
}
tickler := util.NewTickler()
opInfo.tickler = tickler
var (
successSig = make(chan struct{}, 1)