diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index ae144f5696..3ebe4e7f24 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -68,6 +69,7 @@ type TargetObserver struct { readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers dispatcher *taskDispatcher[int64] + keylocks *lock.KeyLock[int64] stopOnce sync.Once } @@ -90,6 +92,7 @@ func NewTargetObserver( updateChan: make(chan targetUpdateRequest), readyNotifiers: make(map[int64][]chan struct{}), initChan: make(chan initRequest), + keylocks: lock.NewKeyLock[int64](), } dispatcher := newTaskDispatcher(result.check) @@ -148,7 +151,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case req := <-ob.updateChan: log := log.With(zap.Int64("collectionID", req.CollectionID)) log.Info("manually trigger update next target") + ob.keylocks.Lock(req.CollectionID) err := ob.updateNextTarget(req.CollectionID) + ob.keylocks.Unlock(req.CollectionID) if err != nil { log.Warn("failed to manually update next target", zap.Error(err)) close(req.ReadyNotifier) @@ -184,6 +189,9 @@ func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { return } + ob.keylocks.Lock(collectionID) + defer ob.keylocks.Unlock(collectionID) + if ob.shouldUpdateCurrentTarget(ctx, collectionID) { ob.updateCurrentTarget(collectionID) }