enhance: Invalidate collection cache when release collection (#37577)

Related to #37395

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/37561/head
congqixia 2024-11-12 10:16:29 +08:00 committed by GitHub
parent 2d29dcd30c
commit f5b06a3c9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 40 additions and 1 deletions

View File

@ -127,7 +127,7 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection:
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)

View File

@ -23,11 +23,14 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
)
@ -42,6 +45,7 @@ type ReleaseCollectionJob struct {
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
proxyManager proxyutil.ProxyClientManagerInterface
}
func NewReleaseCollectionJob(ctx context.Context,
@ -53,6 +57,7 @@ func NewReleaseCollectionJob(ctx context.Context,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
proxyManager proxyutil.ProxyClientManagerInterface,
) *ReleaseCollectionJob {
return &ReleaseCollectionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
@ -64,6 +69,7 @@ func NewReleaseCollectionJob(ctx context.Context,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
proxyManager: proxyManager,
}
}
@ -96,6 +102,15 @@ func (job *ReleaseCollectionJob) Execute() error {
}
job.targetObserver.ReleaseCollection(req.GetCollectionID())
// try best discard cache
// shall not affect releasing if failed
job.proxyManager.InvalidateCollectionMetaCache(job.ctx,
&proxypb.InvalidateCollMetaCacheRequest{
CollectionID: req.GetCollectionID(),
},
proxyutil.SetMsgType(commonpb.MsgType_ReleaseCollection))
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
@ -116,6 +131,7 @@ type ReleasePartitionJob struct {
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
proxyManager proxyutil.ProxyClientManagerInterface
}
func NewReleasePartitionJob(ctx context.Context,
@ -127,6 +143,7 @@ func NewReleasePartitionJob(ctx context.Context,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
proxyManager proxyutil.ProxyClientManagerInterface,
) *ReleasePartitionJob {
return &ReleasePartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
@ -138,6 +155,7 @@ func NewReleasePartitionJob(ctx context.Context,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
proxyManager: proxyManager,
}
}
@ -179,6 +197,14 @@ func (job *ReleasePartitionJob) Execute() error {
}
job.targetObserver.ReleaseCollection(req.GetCollectionID())
metrics.QueryCoordNumCollections.WithLabelValues().Dec()
// try best discard cache
// shall not affect releasing if failed
job.proxyManager.InvalidateCollectionMetaCache(job.ctx,
&proxypb.InvalidateCollMetaCacheRequest{
CollectionID: req.GetCollectionID(),
},
proxyutil.SetMsgType(commonpb.MsgType_ReleaseCollection))
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
} else {
err := job.meta.CollectionManager.RemovePartition(req.GetCollectionID(), toRelease...)

View File

@ -1057,6 +1057,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
@ -1079,6 +1080,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
@ -1108,6 +1110,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
@ -1131,6 +1134,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
@ -1156,6 +1160,7 @@ func (suite *JobSuite) TestReleasePartition() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()
@ -1189,6 +1194,7 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
return job
}
@ -1206,6 +1212,7 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
return job
}
@ -1504,6 +1511,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(releaseCollectionJob)
err := releaseCollectionJob.Wait()
@ -1523,6 +1531,7 @@ func (suite *JobSuite) TestCallReleasePartitionFailed() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(releasePartitionJob)
err = releasePartitionJob.Wait()
@ -1664,6 +1673,7 @@ func (suite *JobSuite) releaseAll() {
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
err := job.Wait()

View File

@ -330,6 +330,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
s.targetMgr,
s.targetObserver,
s.checkerController,
s.proxyClientManager,
)
s.jobScheduler.Add(releaseJob)
err := releaseJob.Wait()
@ -452,6 +453,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
s.targetMgr,
s.targetObserver,
s.checkerController,
s.proxyClientManager,
)
s.jobScheduler.Add(releaseJob)
err := releaseJob.Wait()

View File

@ -214,6 +214,7 @@ func (suite *ServiceSuite) SetupTest() {
distController: suite.distController,
ctx: context.Background(),
metricsRequest: metricsinfo.NewMetricsRequest(),
proxyClientManager: suite.proxyManager,
}
suite.server.registerMetricsRequest()