mirror of https://github.com/milvus-io/milvus.git
Use atomic.bool for the isWatchFailed param in the tickler (#26558)
Signed-off-by: SimFG <bang.fu@zilliz.com>pull/25656/head
parent
148446cfb9
commit
b9bc6681ae
|
@ -123,7 +123,7 @@ func newDataSyncService(ctx context.Context,
|
|||
if err := service.initNodes(vchan, tickler); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tickler.isWatchFailed {
|
||||
if tickler.isWatchFailed.Load() {
|
||||
return nil, errors.Errorf("tickler watch failed")
|
||||
}
|
||||
return service, nil
|
||||
|
|
|
@ -116,7 +116,7 @@ type tickler struct {
|
|||
interval time.Duration
|
||||
closeCh chan struct{}
|
||||
closeWg sync.WaitGroup
|
||||
isWatchFailed bool
|
||||
isWatchFailed *atomic.Bool
|
||||
}
|
||||
|
||||
func (t *tickler) inc() {
|
||||
|
@ -151,7 +151,7 @@ func (t *tickler) watch() {
|
|||
zap.String("vChanName", t.watchInfo.Vchan.ChannelName),
|
||||
zap.Int32("progree", nowProgress),
|
||||
zap.Error(err))
|
||||
t.isWatchFailed = true
|
||||
t.isWatchFailed.Store(true)
|
||||
return
|
||||
}
|
||||
success, err := t.kv.CompareVersionAndSwap(t.path, t.version, string(v))
|
||||
|
@ -164,7 +164,7 @@ func (t *tickler) watch() {
|
|||
log.Error("tickler update failed: failed to compare version and swap",
|
||||
zap.String("key", t.path), zap.Int32("progress", nowProgress), zap.Int64("version", t.version),
|
||||
zap.String("vChanName", t.watchInfo.GetVchan().ChannelName))
|
||||
t.isWatchFailed = true
|
||||
t.isWatchFailed.Store(true)
|
||||
return
|
||||
}
|
||||
log.Debug("tickler update success", zap.Int32("progress", nowProgress), zap.Int64("version", t.version),
|
||||
|
@ -184,12 +184,13 @@ func (t *tickler) stop() {
|
|||
|
||||
func newTickler(version int64, path string, watchInfo *datapb.ChannelWatchInfo, kv kv.MetaKv, interval time.Duration) *tickler {
|
||||
return &tickler{
|
||||
progress: atomic.NewInt32(0),
|
||||
path: path,
|
||||
kv: kv,
|
||||
watchInfo: watchInfo,
|
||||
version: version,
|
||||
interval: interval,
|
||||
closeCh: make(chan struct{}),
|
||||
progress: atomic.NewInt32(0),
|
||||
path: path,
|
||||
kv: kv,
|
||||
watchInfo: watchInfo,
|
||||
version: version,
|
||||
interval: interval,
|
||||
closeCh: make(chan struct{}),
|
||||
isWatchFailed: atomic.NewBool(false),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue