mirror of https://github.com/milvus-io/milvus.git
fix: Fix dynamic release partition may fail search/query request (#35919)
issue: #33550 cause concurrent issue may occur between remove parition in target manager and sync segment list to delegator. when it happens, some segment may be released in delegator, and those segment may also be synced to delegator, which cause delegator become unserviceable due to lack of necessary segments, then search/query fails. this PR make sure that all write access to target_manager will be executed in serial to avoid the concurrent issues. Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/36027/head
parent
97ca65c732
commit
75676fbd11
|
@ -95,7 +95,6 @@ func (job *ReleaseCollectionJob) Execute() error {
|
|||
log.Warn(msg, zap.Error(err))
|
||||
}
|
||||
|
||||
job.targetMgr.RemoveCollection(req.GetCollectionID())
|
||||
job.targetObserver.ReleaseCollection(req.GetCollectionID())
|
||||
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
|
||||
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
|
||||
|
@ -178,7 +177,6 @@ func (job *ReleasePartitionJob) Execute() error {
|
|||
if err != nil {
|
||||
log.Warn("failed to remove replicas", zap.Error(err))
|
||||
}
|
||||
job.targetMgr.RemoveCollection(req.GetCollectionID())
|
||||
job.targetObserver.ReleaseCollection(req.GetCollectionID())
|
||||
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
|
||||
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
|
||||
|
@ -189,7 +187,7 @@ func (job *ReleasePartitionJob) Execute() error {
|
|||
log.Warn(msg, zap.Error(err))
|
||||
return errors.Wrap(err, msg)
|
||||
}
|
||||
job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...)
|
||||
job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...)
|
||||
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...)
|
||||
}
|
||||
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
|
||||
|
|
|
@ -78,10 +78,9 @@ func (u *UndoList) RollBack() {
|
|||
|
||||
if u.IsTargetUpdated {
|
||||
if u.IsNewCollection {
|
||||
u.targetMgr.RemoveCollection(u.CollectionID)
|
||||
u.targetObserver.ReleaseCollection(u.CollectionID)
|
||||
} else {
|
||||
u.targetMgr.RemovePartition(u.CollectionID, u.LackPartitions...)
|
||||
u.targetObserver.ReleasePartition(u.CollectionID, u.LackPartitions...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -180,7 +180,7 @@ func (ob *CollectionObserver) observeTimeout() {
|
|||
zap.Duration("loadTime", time.Since(collection.CreatedAt)))
|
||||
ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID())
|
||||
ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID())
|
||||
ob.targetMgr.RemoveCollection(collection.GetCollectionID())
|
||||
ob.targetObserver.ReleaseCollection(collection.GetCollectionID())
|
||||
ob.loadTasks.Remove(traceID)
|
||||
}
|
||||
case querypb.LoadType_LoadPartition:
|
||||
|
@ -214,7 +214,7 @@ func (ob *CollectionObserver) observeTimeout() {
|
|||
zap.Int64s("partitionIDs", task.PartitionIDs))
|
||||
for _, partition := range partitions {
|
||||
ob.meta.CollectionManager.RemovePartition(partition.CollectionID, partition.GetPartitionID())
|
||||
ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID())
|
||||
ob.targetObserver.ReleasePartition(partition.GetCollectionID(), partition.GetPartitionID())
|
||||
}
|
||||
|
||||
// all partition timeout, remove collection
|
||||
|
@ -223,7 +223,7 @@ func (ob *CollectionObserver) observeTimeout() {
|
|||
|
||||
ob.meta.CollectionManager.RemoveCollection(task.CollectionID)
|
||||
ob.meta.ReplicaManager.RemoveCollection(task.CollectionID)
|
||||
ob.targetMgr.RemoveCollection(task.CollectionID)
|
||||
ob.targetObserver.ReleaseCollection(task.CollectionID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,15 +38,33 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
type checkRequest struct {
|
||||
CollectionID int64
|
||||
Notifier chan bool
|
||||
type targetOp int
|
||||
|
||||
func (op *targetOp) String() string {
|
||||
switch *op {
|
||||
case UpdateCollection:
|
||||
return "UpdateCollection"
|
||||
case ReleaseCollection:
|
||||
return "ReleaseCollection"
|
||||
case ReleasePartition:
|
||||
return "ReleasePartition"
|
||||
default:
|
||||
return "Unknown"
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
UpdateCollection targetOp = iota + 1
|
||||
ReleaseCollection
|
||||
ReleasePartition
|
||||
)
|
||||
|
||||
type targetUpdateRequest struct {
|
||||
CollectionID int64
|
||||
PartitionIDs []int64
|
||||
Notifier chan error
|
||||
ReadyNotifier chan struct{}
|
||||
opType targetOp
|
||||
}
|
||||
|
||||
type initRequest struct{}
|
||||
|
@ -60,8 +78,7 @@ type TargetObserver struct {
|
|||
broker meta.Broker
|
||||
cluster session.Cluster
|
||||
|
||||
initChan chan initRequest
|
||||
manualCheck chan checkRequest
|
||||
initChan chan initRequest
|
||||
// nextTargetLastUpdate map[int64]time.Time
|
||||
nextTargetLastUpdate *typeutil.ConcurrentMap[int64, time.Time]
|
||||
updateChan chan targetUpdateRequest
|
||||
|
@ -88,9 +105,8 @@ func NewTargetObserver(
|
|||
distMgr: distMgr,
|
||||
broker: broker,
|
||||
cluster: cluster,
|
||||
manualCheck: make(chan checkRequest, 10),
|
||||
nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](),
|
||||
updateChan: make(chan targetUpdateRequest),
|
||||
updateChan: make(chan targetUpdateRequest, 10),
|
||||
readyNotifiers: make(map[int64][]chan struct{}),
|
||||
initChan: make(chan initRequest),
|
||||
keylocks: lock.NewKeyLock[int64](),
|
||||
|
@ -152,23 +168,44 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
|
|||
ob.dispatcher.AddTask(ob.meta.GetAll()...)
|
||||
|
||||
case req := <-ob.updateChan:
|
||||
log := log.With(zap.Int64("collectionID", req.CollectionID))
|
||||
log.Info("manually trigger update next target")
|
||||
ob.keylocks.Lock(req.CollectionID)
|
||||
err := ob.updateNextTarget(req.CollectionID)
|
||||
ob.keylocks.Unlock(req.CollectionID)
|
||||
if err != nil {
|
||||
log.Warn("failed to manually update next target", zap.Error(err))
|
||||
close(req.ReadyNotifier)
|
||||
} else {
|
||||
log.Info("manually trigger update target",
|
||||
zap.Int64("collectionID", req.CollectionID),
|
||||
zap.String("opType", req.opType.String()),
|
||||
)
|
||||
switch req.opType {
|
||||
case UpdateCollection:
|
||||
ob.keylocks.Lock(req.CollectionID)
|
||||
err := ob.updateNextTarget(req.CollectionID)
|
||||
ob.keylocks.Unlock(req.CollectionID)
|
||||
if err != nil {
|
||||
log.Warn("failed to manually update next target",
|
||||
zap.Int64("collectionID", req.CollectionID),
|
||||
zap.String("opType", req.opType.String()),
|
||||
zap.Error(err))
|
||||
close(req.ReadyNotifier)
|
||||
} else {
|
||||
ob.mut.Lock()
|
||||
ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier)
|
||||
ob.mut.Unlock()
|
||||
}
|
||||
req.Notifier <- err
|
||||
case ReleaseCollection:
|
||||
ob.mut.Lock()
|
||||
ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier)
|
||||
for _, notifier := range ob.readyNotifiers[req.CollectionID] {
|
||||
close(notifier)
|
||||
}
|
||||
delete(ob.readyNotifiers, req.CollectionID)
|
||||
ob.mut.Unlock()
|
||||
}
|
||||
|
||||
log.Info("manually trigger update target done")
|
||||
req.Notifier <- err
|
||||
log.Info("notify manually trigger update target done")
|
||||
ob.targetMgr.RemoveCollection(req.CollectionID)
|
||||
req.Notifier <- nil
|
||||
case ReleasePartition:
|
||||
ob.targetMgr.RemovePartition(req.CollectionID, req.PartitionIDs...)
|
||||
req.Notifier <- nil
|
||||
}
|
||||
log.Info("manually trigger update target done",
|
||||
zap.Int64("collectionID", req.CollectionID),
|
||||
zap.String("opType", req.opType.String()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -184,14 +221,6 @@ func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partiti
|
|||
}
|
||||
|
||||
func (ob *TargetObserver) check(ctx context.Context, collectionID int64) {
|
||||
if !ob.meta.Exist(collectionID) {
|
||||
ob.ReleaseCollection(collectionID)
|
||||
ob.targetMgr.RemoveCollection(collectionID)
|
||||
log.Info("collection has been removed from target observer",
|
||||
zap.Int64("collectionID", collectionID))
|
||||
return
|
||||
}
|
||||
|
||||
ob.keylocks.Lock(collectionID)
|
||||
defer ob.keylocks.Unlock(collectionID)
|
||||
|
||||
|
@ -229,6 +258,7 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e
|
|||
|
||||
ob.updateChan <- targetUpdateRequest{
|
||||
CollectionID: collectionID,
|
||||
opType: UpdateCollection,
|
||||
Notifier: notifier,
|
||||
ReadyNotifier: readyCh,
|
||||
}
|
||||
|
@ -236,12 +266,26 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e
|
|||
}
|
||||
|
||||
func (ob *TargetObserver) ReleaseCollection(collectionID int64) {
|
||||
ob.mut.Lock()
|
||||
defer ob.mut.Unlock()
|
||||
for _, notifier := range ob.readyNotifiers[collectionID] {
|
||||
close(notifier)
|
||||
notifier := make(chan error)
|
||||
defer close(notifier)
|
||||
ob.updateChan <- targetUpdateRequest{
|
||||
CollectionID: collectionID,
|
||||
opType: ReleaseCollection,
|
||||
Notifier: notifier,
|
||||
}
|
||||
delete(ob.readyNotifiers, collectionID)
|
||||
<-notifier
|
||||
}
|
||||
|
||||
func (ob *TargetObserver) ReleasePartition(collectionID int64, partitionID ...int64) {
|
||||
notifier := make(chan error)
|
||||
defer close(notifier)
|
||||
ob.updateChan <- targetUpdateRequest{
|
||||
CollectionID: collectionID,
|
||||
PartitionIDs: partitionID,
|
||||
opType: ReleasePartition,
|
||||
Notifier: notifier,
|
||||
}
|
||||
<-notifier
|
||||
}
|
||||
|
||||
func (ob *TargetObserver) clean() {
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
|
@ -214,6 +215,20 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
|
|||
}, 7*time.Second, 1*time.Second)
|
||||
}
|
||||
|
||||
func (suite *TargetObserverSuite) TestTriggerRelease() {
|
||||
// Manually update next target
|
||||
_, err := suite.observer.UpdateNextTarget(suite.collectionID)
|
||||
suite.NoError(err)
|
||||
|
||||
// manually release partition
|
||||
partitions := suite.meta.CollectionManager.GetPartitionsByCollection(suite.collectionID)
|
||||
partitionIDs := lo.Map(partitions, func(partition *meta.Partition, _ int) int64 { return partition.PartitionID })
|
||||
suite.observer.ReleasePartition(suite.collectionID, partitionIDs[0])
|
||||
|
||||
// manually release collection
|
||||
suite.observer.ReleaseCollection(suite.collectionID)
|
||||
}
|
||||
|
||||
func (suite *TargetObserverSuite) TearDownTest() {
|
||||
suite.kv.Close()
|
||||
suite.observer.Stop()
|
||||
|
|
Loading…
Reference in New Issue