Fix datanode/datacoord continuous restart (#26470)

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/26483/head
SimFG 2023-08-20 21:20:24 +08:00 committed by GitHub
parent c5a1b41f95
commit 3be4ac4022
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 6 deletions

View File

@ -377,8 +377,8 @@ func (h *ServerHandler) HasCollection(ctx context.Context, collectionID UniqueID
hasCollection = has
return nil
}, retry.Attempts(500)); err != nil {
log.Error("datacoord ServerHandler HasCollection finally failed")
panic("datacoord ServerHandler HasCollection finally failed")
log.Ctx(ctx2).Error("datacoord ServerHandler HasCollection finally failed", zap.Int64("collectionID", collectionID))
log.Panic("datacoord ServerHandler HasCollection finally failed")
}
return hasCollection, nil
}

View File

@ -123,6 +123,9 @@ func newDataSyncService(ctx context.Context,
if err := service.initNodes(vchan, tickler); err != nil {
return nil, err
}
if tickler.isWatchFailed {
return nil, errors.Errorf("tickler watch failed")
}
return service, nil
}

View File

@ -113,9 +113,10 @@ type tickler struct {
path string
watchInfo *datapb.ChannelWatchInfo
interval time.Duration
closeCh chan struct{}
closeWg sync.WaitGroup
interval time.Duration
closeCh chan struct{}
closeWg sync.WaitGroup
isWatchFailed bool
}
func (t *tickler) inc() {
@ -132,6 +133,7 @@ func (t *tickler) watch() {
t.closeWg.Add(1)
go func() {
defer t.closeWg.Done()
ticker := time.NewTicker(t.interval)
defer ticker.Stop()
for {
@ -149,6 +151,7 @@ func (t *tickler) watch() {
zap.String("vChanName", t.watchInfo.Vchan.ChannelName),
zap.Int32("progree", nowProgress),
zap.Error(err))
t.isWatchFailed = true
return
}
success, err := t.kv.CompareVersionAndSwap(t.path, t.version, string(v))
@ -161,13 +164,13 @@ func (t *tickler) watch() {
log.Error("tickler update failed: failed to compare version and swap",
zap.String("key", t.path), zap.Int32("progress", nowProgress), zap.Int64("version", t.version),
zap.String("vChanName", t.watchInfo.GetVchan().ChannelName))
t.isWatchFailed = true
return
}
log.Debug("tickler update success", zap.Int32("progress", nowProgress), zap.Int64("version", t.version),
zap.String("vChanName", t.watchInfo.GetVchan().ChannelName))
t.version++
case <-t.closeCh:
t.closeWg.Done()
return
}
}