From 837e3162d7ee196c89330b4465865b11b2e4f2af Mon Sep 17 00:00:00 2001 From: yah01 Date: Sat, 14 Jan 2023 21:55:41 +0800 Subject: [PATCH] Fix ready notifiers leak when collection released (#21712) Signed-off-by: yah01 --- internal/querycoordv2/job/job.go | 45 +++++++++++-------- internal/querycoordv2/job/job_test.go | 27 ++++++++--- .../querycoordv2/observers/target_observer.go | 31 +++++++++++++ internal/querycoordv2/services.go | 2 + internal/querycoordv2/services_test.go | 31 ++++++++----- 5 files changed, 100 insertions(+), 36 deletions(-) diff --git a/internal/querycoordv2/job/job.go b/internal/querycoordv2/job/job.go index 2cfb2e4712..643431f8f7 100644 --- a/internal/querycoordv2/job/job.go +++ b/internal/querycoordv2/job/job.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -256,10 +257,11 @@ func (job *LoadCollectionJob) PostExecute() { type ReleaseCollectionJob struct { *BaseJob - req *querypb.ReleaseCollectionRequest - dist *meta.DistributionManager - meta *meta.Meta - targetMgr *meta.TargetManager + req *querypb.ReleaseCollectionRequest + dist *meta.DistributionManager + meta *meta.Meta + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver } func NewReleaseCollectionJob(ctx context.Context, @@ -267,13 +269,15 @@ func NewReleaseCollectionJob(ctx context.Context, dist *meta.DistributionManager, meta *meta.Meta, targetMgr *meta.TargetManager, + targetObserver *observers.TargetObserver, ) *ReleaseCollectionJob { return &ReleaseCollectionJob{ - BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), - req: req, - dist: dist, - meta: meta, - targetMgr: targetMgr, + BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), + req: req, + dist: dist, + meta: meta, + targetMgr: targetMgr, + targetObserver: targetObserver, } } @@ -301,6 +305,7 @@ func (job *ReleaseCollectionJob) Execute() error { } job.targetMgr.RemoveCollection(req.GetCollectionID()) + job.targetObserver.ReleaseCollection(req.GetCollectionID()) waitCollectionReleased(job.dist, req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() return nil @@ -462,10 +467,11 @@ func (job *LoadPartitionJob) PostExecute() { type ReleasePartitionJob struct { *BaseJob - req *querypb.ReleasePartitionsRequest - dist *meta.DistributionManager - meta *meta.Meta - targetMgr *meta.TargetManager + req *querypb.ReleasePartitionsRequest + dist *meta.DistributionManager + meta *meta.Meta + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver } func NewReleasePartitionJob(ctx context.Context, @@ -473,13 +479,15 @@ func NewReleasePartitionJob(ctx context.Context, dist *meta.DistributionManager, meta *meta.Meta, targetMgr *meta.TargetManager, + targetObserver *observers.TargetObserver, ) *ReleasePartitionJob { return &ReleasePartitionJob{ - BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), - req: req, - dist: dist, - meta: meta, - targetMgr: targetMgr, + BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), + req: req, + dist: dist, + meta: meta, + targetMgr: targetMgr, + targetObserver: targetObserver, } } @@ -527,6 +535,7 @@ func (job *ReleasePartitionJob) Execute() error { log.Warn("failed to remove replicas", zap.Error(err)) } job.targetMgr.RemoveCollection(req.GetCollectionID()) + job.targetObserver.ReleaseCollection(req.GetCollectionID()) waitCollectionReleased(job.dist, req.GetCollectionID()) } else { err := job.meta.CollectionManager.RemovePartition(toRelease...) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index 58462ebd5a..c4905caf0f 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/observers" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/util/etcd" @@ -50,13 +51,14 @@ type JobSuite struct { loadTypes map[int64]querypb.LoadType // Dependencies - kv kv.MetaKv - store meta.Store - dist *meta.DistributionManager - meta *meta.Meta - targetMgr *meta.TargetManager - broker *meta.MockBroker - nodeMgr *session.NodeManager + kv kv.MetaKv + store meta.Store + dist *meta.DistributionManager + meta *meta.Meta + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver + broker *meta.MockBroker + nodeMgr *session.NodeManager // Test objects scheduler *Scheduler @@ -131,6 +133,11 @@ func (suite *JobSuite) SetupTest() { suite.dist = meta.NewDistributionManager() suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) + suite.targetObserver = observers.NewTargetObserver(suite.meta, + suite.targetMgr, + suite.dist, + suite.broker, + ) suite.nodeMgr = session.NewNodeManager() suite.nodeMgr.Add(&session.NodeInfo{}) suite.scheduler = NewScheduler() @@ -583,6 +590,7 @@ func (suite *JobSuite) TestReleaseCollection() { suite.dist, suite.meta, suite.targetMgr, + suite.targetObserver, ) suite.scheduler.Add(job) err := job.Wait() @@ -601,6 +609,7 @@ func (suite *JobSuite) TestReleaseCollection() { suite.dist, suite.meta, suite.targetMgr, + suite.targetObserver, ) suite.scheduler.Add(job) err := job.Wait() @@ -626,6 +635,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.dist, suite.meta, suite.targetMgr, + suite.targetObserver, ) suite.scheduler.Add(job) err := job.Wait() @@ -650,6 +660,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.dist, suite.meta, suite.targetMgr, + suite.targetObserver, ) suite.scheduler.Add(job) err := job.Wait() @@ -676,6 +687,7 @@ func (suite *JobSuite) TestReleasePartition() { suite.dist, suite.meta, suite.targetMgr, + suite.targetObserver, ) suite.scheduler.Add(job) err := job.Wait() @@ -843,6 +855,7 @@ func (suite *JobSuite) releaseAll() { suite.dist, suite.meta, suite.targetMgr, + suite.targetObserver, ) suite.scheduler.Add(job) err := job.Wait() diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 195c24f56c..8ab1d2e43c 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -46,6 +46,7 @@ type TargetObserver struct { nextTargetLastUpdate map[int64]time.Time updateChan chan targetUpdateRequest + mut sync.Mutex // Guard readyNotifiers readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers stopOnce sync.Once @@ -91,6 +92,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) { return case <-ticker.C: + ob.clean() ob.tryUpdateTarget() case request := <-ob.updateChan: @@ -98,7 +100,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) { if err != nil { close(request.ReadyNotifier) } else { + ob.mut.Lock() ob.readyNotifiers[request.CollectionID] = append(ob.readyNotifiers[request.CollectionID], request.ReadyNotifier) + ob.mut.Unlock() } request.Notifier <- err @@ -122,6 +126,15 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e return readyCh, <-notifier } +func (ob *TargetObserver) ReleaseCollection(collectionID int64) { + ob.mut.Lock() + defer ob.mut.Unlock() + for _, notifier := range ob.readyNotifiers[collectionID] { + close(notifier) + } + delete(ob.readyNotifiers, collectionID) +} + func (ob *TargetObserver) tryUpdateTarget() { collections := ob.meta.GetAll() for _, collectionID := range collections { @@ -144,6 +157,21 @@ func (ob *TargetObserver) tryUpdateTarget() { } } +func (ob *TargetObserver) clean() { + collections := typeutil.NewSet(ob.meta.GetAll()...) + + ob.mut.Lock() + defer ob.mut.Unlock() + for collectionID, notifiers := range ob.readyNotifiers { + if !collections.Contain(collectionID) { + for i := range notifiers { + close(notifiers[i]) + } + delete(ob.readyNotifiers, collectionID) + } + } +} + func (ob *TargetObserver) shouldUpdateNextTarget(collectionID int64) bool { return !ob.targetMgr.IsNextTargetExist(collectionID) || ob.isNextTargetExpired(collectionID) } @@ -213,6 +241,9 @@ func (ob *TargetObserver) updateCurrentTarget(collectionID int64) { log.Warn("observer trigger update current target", zap.Int64("collectionID", collectionID)) ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) + + ob.mut.Lock() + defer ob.mut.Unlock() notifiers := ob.readyNotifiers[collectionID] for _, notifier := range notifiers { close(notifier) diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 680663d9f6..44b8337325 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -254,6 +254,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl s.dist, s.meta, s.targetMgr, + s.targetObserver, ) s.jobScheduler.Add(releaseJob) err := releaseJob.Wait() @@ -339,6 +340,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart s.dist, s.meta, s.targetMgr, + s.targetObserver, ) s.jobScheduler.Add(releaseJob) err := releaseJob.Wait() diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 5f1c2837b7..f144ae29f1 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/balance" "github.com/milvus-io/milvus/internal/querycoordv2/job" "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" @@ -59,17 +60,18 @@ type ServiceSuite struct { nodes []int64 // Dependencies - kv kv.MetaKv - store meta.Store - dist *meta.DistributionManager - meta *meta.Meta - targetMgr *meta.TargetManager - broker *meta.MockBroker - cluster *session.MockCluster - nodeMgr *session.NodeManager - jobScheduler *job.Scheduler - taskScheduler *task.MockScheduler - balancer balance.Balance + kv kv.MetaKv + store meta.Store + dist *meta.DistributionManager + meta *meta.Meta + targetMgr *meta.TargetManager + broker *meta.MockBroker + targetObserver *observers.TargetObserver + cluster *session.MockCluster + nodeMgr *session.NodeManager + jobScheduler *job.Scheduler + taskScheduler *task.MockScheduler + balancer balance.Balance // Test object server *Server @@ -127,6 +129,12 @@ func (suite *ServiceSuite) SetupTest() { suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store) suite.broker = meta.NewMockBroker(suite.T()) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) + suite.targetObserver = observers.NewTargetObserver( + suite.meta, + suite.targetMgr, + suite.dist, + suite.broker, + ) suite.nodeMgr = session.NewNodeManager() for _, node := range suite.nodes { suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost")) @@ -153,6 +161,7 @@ func (suite *ServiceSuite) SetupTest() { meta: suite.meta, targetMgr: suite.targetMgr, broker: suite.broker, + targetObserver: suite.targetObserver, nodeMgr: suite.nodeMgr, cluster: suite.cluster, jobScheduler: suite.jobScheduler,