mirror of https://github.com/milvus-io/milvus.git
Cherry-pick from master pr: #31584 See also #30867 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/31665/head
parent
9d9f2cdf4d
commit
35100ba2e5
|
@ -33,6 +33,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
@ -68,6 +69,7 @@ type TargetObserver struct {
|
|||
readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers
|
||||
|
||||
dispatcher *taskDispatcher[int64]
|
||||
keylocks *lock.KeyLock[int64]
|
||||
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
@ -90,6 +92,7 @@ func NewTargetObserver(
|
|||
updateChan: make(chan targetUpdateRequest),
|
||||
readyNotifiers: make(map[int64][]chan struct{}),
|
||||
initChan: make(chan initRequest),
|
||||
keylocks: lock.NewKeyLock[int64](),
|
||||
}
|
||||
|
||||
dispatcher := newTaskDispatcher(result.check)
|
||||
|
@ -148,7 +151,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
|
|||
case req := <-ob.updateChan:
|
||||
log := log.With(zap.Int64("collectionID", req.CollectionID))
|
||||
log.Info("manually trigger update next target")
|
||||
ob.keylocks.Lock(req.CollectionID)
|
||||
err := ob.updateNextTarget(req.CollectionID)
|
||||
ob.keylocks.Unlock(req.CollectionID)
|
||||
if err != nil {
|
||||
log.Warn("failed to manually update next target", zap.Error(err))
|
||||
close(req.ReadyNotifier)
|
||||
|
@ -184,6 +189,9 @@ func (ob *TargetObserver) check(ctx context.Context, collectionID int64) {
|
|||
return
|
||||
}
|
||||
|
||||
ob.keylocks.Lock(collectionID)
|
||||
defer ob.keylocks.Unlock(collectionID)
|
||||
|
||||
if ob.shouldUpdateCurrentTarget(ctx, collectionID) {
|
||||
ob.updateCurrentTarget(collectionID)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue