enhance: [2.3] Make querycoordv2 collection observer task driven (#32441) (#32615)

Cherry-pick from master
pr: #32441
See also #32440

- Add loadTask in collection observer
- For load collection/partitions, the load task shall time out as a whole (see the sketch below)
- Change related constructors of load jobs
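
Below is a minimal, self-contained sketch of the task-driven flow, using simplified stand-in types rather than the real `querypb`/`meta` dependencies: each load request registers a `LoadTask`, and the observer ranges over registered tasks instead of scanning every collection in meta.

```go
package main

import (
	"fmt"
	"sync"
)

type LoadType int

const (
	LoadCollectionType LoadType = iota
	LoadPartitionType
)

// LoadTask mirrors the struct added to the collection observer: one entry
// per load request, so a request finishes or times out as a single unit.
type LoadTask struct {
	LoadType     LoadType
	CollectionID int64
	PartitionIDs []int64
}

type observer struct {
	mu    sync.Mutex
	tasks map[string]LoadTask // keyed by trace ID (or a synthetic key)
}

// LoadCollection registers a collection-level task, as the load jobs now do
// after the target is updated.
func (ob *observer) LoadCollection(key string, collectionID int64) {
	ob.mu.Lock()
	defer ob.mu.Unlock()
	ob.tasks[key] = LoadTask{LoadType: LoadCollectionType, CollectionID: collectionID}
}

// Observe visits only the registered tasks, removing each one once it is
// done (loaded, timed out, or released).
func (ob *observer) Observe(done func(LoadTask) bool) {
	ob.mu.Lock()
	defer ob.mu.Unlock()
	for key, task := range ob.tasks {
		if done(task) {
			fmt.Printf("load task %s finished\n", key)
			delete(ob.tasks, key)
		}
	}
}

func main() {
	ob := &observer{tasks: make(map[string]LoadTask)}
	ob.LoadCollection("trace-1", 100)
	ob.Observe(func(t LoadTask) bool { return true }) // pretend everything is loaded
}
```

In the actual change the map is a `typeutil.ConcurrentMap` keyed by the request's trace ID, as shown in the collection_observer.go diff below.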

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-04-28 13:19:26 +08:00 committed by GitHub
parent d56bec07fb
commit 114e3056c2
6 changed files with 279 additions and 108 deletions

internal/querycoordv2/job/job_load.go

@@ -44,13 +44,14 @@ type LoadCollectionJob struct {
req *querypb.LoadCollectionRequest
undo *UndoList
- dist *meta.DistributionManager
- meta *meta.Meta
- broker meta.Broker
- cluster session.Cluster
- targetMgr *meta.TargetManager
- targetObserver *observers.TargetObserver
- nodeMgr *session.NodeManager
+ dist *meta.DistributionManager
+ meta *meta.Meta
+ broker meta.Broker
+ cluster session.Cluster
+ targetMgr *meta.TargetManager
+ targetObserver *observers.TargetObserver
+ collectionObserver *observers.CollectionObserver
+ nodeMgr *session.NodeManager
}
func NewLoadCollectionJob(
@@ -62,19 +63,21 @@ func NewLoadCollectionJob(
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
+ collectionObserver *observers.CollectionObserver,
nodeMgr *session.NodeManager,
) *LoadCollectionJob {
return &LoadCollectionJob{
- BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
- req: req,
- undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
- dist: dist,
- meta: meta,
- broker: broker,
- cluster: cluster,
- targetMgr: targetMgr,
- targetObserver: targetObserver,
- nodeMgr: nodeMgr,
+ BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
+ req: req,
+ undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
+ dist: dist,
+ meta: meta,
+ broker: broker,
+ cluster: cluster,
+ targetMgr: targetMgr,
+ targetObserver: targetObserver,
+ collectionObserver: collectionObserver,
+ nodeMgr: nodeMgr,
}
}
@@ -182,7 +185,7 @@ func (job *LoadCollectionJob) Execute() error {
}
})
- _, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadCollection", trace.WithNewRoot())
+ ctx, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadCollection", trace.WithNewRoot())
collection := &meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: req.GetCollectionID(),
@@ -212,6 +215,9 @@ func (job *LoadCollectionJob) Execute() error {
}
job.undo.IsTargetUpdated = true
+ // 6. register load task into collection observer
+ job.collectionObserver.LoadCollection(ctx, req.GetCollectionID())
return nil
}
@@ -226,13 +232,14 @@ type LoadPartitionJob struct {
req *querypb.LoadPartitionsRequest
undo *UndoList
- dist *meta.DistributionManager
- meta *meta.Meta
- broker meta.Broker
- cluster session.Cluster
- targetMgr *meta.TargetManager
- targetObserver *observers.TargetObserver
- nodeMgr *session.NodeManager
+ dist *meta.DistributionManager
+ meta *meta.Meta
+ broker meta.Broker
+ cluster session.Cluster
+ targetMgr *meta.TargetManager
+ targetObserver *observers.TargetObserver
+ collectionObserver *observers.CollectionObserver
+ nodeMgr *session.NodeManager
}
func NewLoadPartitionJob(
@@ -244,19 +251,21 @@ func NewLoadPartitionJob(
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
+ collectionObserver *observers.CollectionObserver,
nodeMgr *session.NodeManager,
) *LoadPartitionJob {
return &LoadPartitionJob{
- BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
- req: req,
- undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
- dist: dist,
- meta: meta,
- broker: broker,
- cluster: cluster,
- targetMgr: targetMgr,
- targetObserver: targetObserver,
- nodeMgr: nodeMgr,
+ BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
+ req: req,
+ undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
+ dist: dist,
+ meta: meta,
+ broker: broker,
+ cluster: cluster,
+ targetMgr: targetMgr,
+ targetObserver: targetObserver,
+ collectionObserver: collectionObserver,
+ nodeMgr: nodeMgr,
}
}
@@ -358,10 +367,10 @@ func (job *LoadPartitionJob) Execute() error {
CreatedAt: time.Now(),
}
})
+ ctx, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadPartition", trace.WithNewRoot())
if !job.meta.CollectionManager.Exist(req.GetCollectionID()) {
job.undo.IsNewCollection = true
- _, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadPartition", trace.WithNewRoot())
collection := &meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: req.GetCollectionID(),
@@ -397,6 +406,8 @@ func (job *LoadPartitionJob) Execute() error {
}
job.undo.IsTargetUpdated = true
+ job.collectionObserver.LoadPartitions(ctx, req.GetCollectionID(), lackPartitionIDs)
return nil
}
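
Note on the `_, sp :=` to `ctx, sp :=` change above: the jobs now keep the context returned by the tracer because the span's trace ID becomes the observer's task key. Here is a runnable sketch of that keying logic, mirroring `LoadCollection`/`LoadPartitions` in the collection_observer.go diff below (with no OpenTelemetry SDK registered the global tracer is a no-op, its trace ID is invalid, and the synthetic fallback key is used):

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	// Same pattern as the jobs: keep ctx so downstream code can read the span.
	ctx, sp := otel.Tracer("demo").Start(context.Background(), "LoadCollection", trace.WithNewRoot())
	defer sp.End()

	traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
	key := traceID.String()
	if !traceID.IsValid() { // no SDK registered => no-op tracer => zero trace ID
		key = fmt.Sprintf("LoadCollection_%d", 100)
	}
	fmt.Println("load task key:", key)
}
```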

internal/querycoordv2/job/job_test.go

@@ -59,16 +59,17 @@ type JobSuite struct {
loadTypes map[int64]querypb.LoadType
// Dependencies
- kv kv.MetaKv
- store metastore.QueryCoordCatalog
- dist *meta.DistributionManager
- meta *meta.Meta
- cluster *session.MockCluster
- targetMgr *meta.TargetManager
- targetObserver *observers.TargetObserver
- broker *meta.MockBroker
- nodeMgr *session.NodeManager
- checkerController *checkers.CheckerController
+ kv kv.MetaKv
+ store metastore.QueryCoordCatalog
+ dist *meta.DistributionManager
+ meta *meta.Meta
+ cluster *session.MockCluster
+ targetMgr *meta.TargetManager
+ targetObserver *observers.TargetObserver
+ collectionObserver *observers.CollectionObserver
+ broker *meta.MockBroker
+ nodeMgr *session.NodeManager
+ checkerController *checkers.CheckerController
// Test objects
scheduler *Scheduler
@@ -182,6 +183,14 @@ func (suite *JobSuite) SetupTest() {
suite.NoError(err)
suite.checkerController = &checkers.CheckerController{}
+ suite.collectionObserver = observers.NewCollectionObserver(
+ suite.dist,
+ suite.meta,
+ suite.targetMgr,
+ suite.targetObserver,
+ nil,
+ suite.checkerController,
+ )
}
func (suite *JobSuite) TearDownTest() {
@@ -221,6 +230,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -248,6 +258,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -273,6 +284,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -300,6 +312,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -326,6 +339,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -347,6 +361,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -376,6 +391,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -408,6 +424,7 @@ func (suite *JobSuite) TestLoadCollectionWithDiffIndex() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -438,6 +455,7 @@ func (suite *JobSuite) TestLoadCollectionWithDiffIndex() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -469,6 +487,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -499,6 +518,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -526,6 +546,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -553,6 +574,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -579,6 +601,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -606,6 +629,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -628,6 +652,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -655,6 +680,7 @@ func (suite *JobSuite) TestDynamicLoad() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
return job
@@ -673,6 +699,7 @@ func (suite *JobSuite) TestDynamicLoad() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
return job
@@ -772,6 +799,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -805,6 +833,7 @@ func (suite *JobSuite) TestLoadPartitionWithDiffIndex() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -837,6 +866,7 @@ func (suite *JobSuite) TestLoadPartitionWithDiffIndex() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -864,6 +894,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.checkerController,
)
suite.scheduler.Add(job)
@@ -1109,6 +1140,7 @@ func (suite *JobSuite) TestLoadCollectionStoreFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -1153,6 +1185,7 @@ func (suite *JobSuite) TestLoadPartitionStoreFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -1180,6 +1213,7 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -1208,6 +1242,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadCollectionJob)
@@ -1228,6 +1263,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadPartitionJob)
@@ -1254,6 +1290,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadCollectionJob)
@@ -1273,6 +1310,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadPartitionJob)
@@ -1415,6 +1453,7 @@ func (suite *JobSuite) loadAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@@ -1439,6 +1478,7 @@ func (suite *JobSuite) loadAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)

internal/querycoordv2/observers/collection_observer.go

@@ -22,6 +22,8 @@ import (
"sync"
"time"
+ "github.com/samber/lo"
+ "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/querypb"
@@ -31,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
+ "github.com/milvus-io/milvus/pkg/util/typeutil"
)
type CollectionObserver struct {
@@ -45,9 +48,17 @@ type CollectionObserver struct {
checkerController *checkers.CheckerController
partitionLoadedCount map[int64]int
+ loadTasks *typeutil.ConcurrentMap[string, LoadTask]
stopOnce sync.Once
}
+ type LoadTask struct {
+ LoadType querypb.LoadType
+ CollectionID int64
+ PartitionIDs []int64
+ }
func NewCollectionObserver(
dist *meta.DistributionManager,
meta *meta.Meta,
@@ -56,7 +67,7 @@ func NewCollectionObserver(
leaderObserver *LeaderObserver,
checherController *checkers.CheckerController,
) *CollectionObserver {
- return &CollectionObserver{
+ ob := &CollectionObserver{
dist: dist,
meta: meta,
targetMgr: targetMgr,
@@ -64,7 +75,16 @@ func NewCollectionObserver(
leaderObserver: leaderObserver,
checkerController: checherController,
partitionLoadedCount: make(map[int64]int),
+ loadTasks: typeutil.NewConcurrentMap[string, LoadTask](),
}
+ // Add load task for collection recovery
+ collections := meta.GetAllCollections()
+ for _, collection := range collections {
+ ob.LoadCollection(context.Background(), collection.GetCollectionID())
+ }
+ return ob
}
func (ob *CollectionObserver) Start() {
@@ -100,51 +120,104 @@ func (ob *CollectionObserver) Stop() {
})
}
+ func (ob *CollectionObserver) LoadCollection(ctx context.Context, collectionID int64) {
+ span := trace.SpanFromContext(ctx)
+ traceID := span.SpanContext().TraceID()
+ key := traceID.String()
+ if !traceID.IsValid() {
+ key = fmt.Sprintf("LoadCollection_%d", collectionID)
+ }
+ ob.loadTasks.Insert(key, LoadTask{LoadType: querypb.LoadType_LoadCollection, CollectionID: collectionID})
+ }
+ func (ob *CollectionObserver) LoadPartitions(ctx context.Context, collectionID int64, partitionIDs []int64) {
+ span := trace.SpanFromContext(ctx)
+ traceID := span.SpanContext().TraceID()
+ key := traceID.String()
+ if !traceID.IsValid() {
+ key = fmt.Sprintf("LoadPartition_%d_%v", collectionID, partitionIDs)
+ }
+ ob.loadTasks.Insert(key, LoadTask{LoadType: querypb.LoadType_LoadPartition, CollectionID: collectionID, PartitionIDs: partitionIDs})
+ }
func (ob *CollectionObserver) Observe(ctx context.Context) {
ob.observeTimeout()
ob.observeLoadStatus(ctx)
}
func (ob *CollectionObserver) observeTimeout() {
- collections := ob.meta.CollectionManager.GetAllCollections()
- for _, collection := range collections {
- if collection.GetStatus() != querypb.LoadStatus_Loading ||
- time.Now().Before(collection.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) {
- continue
- }
+ ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
+ collection := ob.meta.CollectionManager.GetCollection(task.CollectionID)
+ // collection released
+ if collection == nil {
+ log.Info("Load Collection Task canceled, collection removed from meta", zap.Int64("collectionID", task.CollectionID), zap.String("traceID", traceID))
+ ob.loadTasks.Remove(traceID)
+ return true
+ }
- log.Info("load collection timeout, cancel it",
- zap.Int64("collectionID", collection.GetCollectionID()),
- zap.Duration("loadTime", time.Since(collection.CreatedAt)))
- ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID())
- ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID())
- ob.targetMgr.RemoveCollection(collection.GetCollectionID())
- }
+ switch task.LoadType {
+ case querypb.LoadType_LoadCollection:
+ if collection.GetStatus() == querypb.LoadStatus_Loading &&
+ time.Now().After(collection.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) {
+ log.Info("load collection timeout, cancel it",
+ zap.Int64("collectionID", collection.GetCollectionID()),
+ zap.Duration("loadTime", time.Since(collection.CreatedAt)))
+ ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID())
+ ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID())
+ ob.targetMgr.RemoveCollection(collection.GetCollectionID())
+ ob.loadTasks.Remove(traceID)
+ }
+ case querypb.LoadType_LoadPartition:
+ partitionIDs := typeutil.NewSet(task.PartitionIDs...)
+ partitions := ob.meta.GetPartitionsByCollection(task.CollectionID)
+ partitions = lo.Filter(partitions, func(partition *meta.Partition, _ int) bool {
+ return partitionIDs.Contain(partition.GetPartitionID())
+ })
- partitions := utils.GroupPartitionsByCollection(ob.meta.CollectionManager.GetAllPartitions())
- for collection, partitions := range partitions {
- for _, partition := range partitions {
- if partition.GetStatus() != querypb.LoadStatus_Loading ||
- time.Now().Before(partition.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) {
- continue
- }
+ // all partition released
+ if len(partitions) == 0 {
+ log.Info("Load Partitions Task canceled, collection removed from meta",
+ zap.Int64("collectionID", task.CollectionID),
+ zap.Int64s("partitionIDs", task.PartitionIDs),
+ zap.String("traceID", traceID))
+ ob.loadTasks.Remove(traceID)
+ return true
+ }
- log.Info("load partition timeout, cancel it",
- zap.Int64("collectionID", collection),
- zap.Int64("partitionID", partition.GetPartitionID()),
- zap.Duration("loadTime", time.Since(partition.CreatedAt)))
- ob.meta.CollectionManager.RemovePartition(collection, partition.GetPartitionID())
- ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID())
- }
- // all partition timeout, remove collection
- if len(ob.meta.CollectionManager.GetPartitionsByCollection(collection)) == 0 {
- log.Info("collection timeout due to all partition removed", zap.Int64("collection", collection))
+ working := false
+ for _, partition := range partitions {
+ if time.Now().Before(partition.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) {
+ working = true
+ break
+ }
+ }
+ // only all partitions timeout means task timeout
+ if !working {
+ log.Info("load partitions timeout, cancel it",
+ zap.Int64("collectionID", task.CollectionID),
+ zap.Int64s("partitionIDs", task.PartitionIDs))
+ for _, partition := range partitions {
+ ob.meta.CollectionManager.RemovePartition(partition.CollectionID, partition.GetPartitionID())
+ ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID())
+ }
- ob.meta.CollectionManager.RemoveCollection(collection)
- ob.meta.ReplicaManager.RemoveCollection(collection)
- ob.targetMgr.RemoveCollection(collection)
- }
- }
+ // all partition timeout, remove collection
+ if len(ob.meta.CollectionManager.GetPartitionsByCollection(task.CollectionID)) == 0 {
+ log.Info("collection timeout due to all partition removed", zap.Int64("collection", task.CollectionID))
+ ob.meta.CollectionManager.RemoveCollection(task.CollectionID)
+ ob.meta.ReplicaManager.RemoveCollection(task.CollectionID)
+ ob.targetMgr.RemoveCollection(task.CollectionID)
+ }
+ }
+ }
+ return true
+ })
}
}
func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {
@@ -155,18 +228,54 @@ func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {
}
func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
- partitions := ob.meta.CollectionManager.GetAllPartitions()
loading := false
- for _, partition := range partitions {
- if partition.LoadPercentage == 100 {
- continue
- }
+ ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
+ loading = true
+ collection := ob.meta.CollectionManager.GetCollection(task.CollectionID)
+ if collection == nil {
+ return true
+ }
- if ob.readyToObserve(partition.CollectionID) {
- replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
- ob.observePartitionLoadStatus(ctx, partition, replicaNum)
- loading = true
- }
- }
+ var partitions []*meta.Partition
+ switch task.LoadType {
+ case querypb.LoadType_LoadCollection:
+ partitions = ob.meta.GetPartitionsByCollection(task.CollectionID)
+ case querypb.LoadType_LoadPartition:
+ partitionIDs := typeutil.NewSet[int64](task.PartitionIDs...)
+ partitions = ob.meta.GetPartitionsByCollection(task.CollectionID)
+ partitions = lo.Filter(partitions, func(partition *meta.Partition, _ int) bool {
+ return partitionIDs.Contain(partition.GetPartitionID())
+ })
+ }
+ loaded := true
+ for _, partition := range partitions {
+ if partition.LoadPercentage == 100 {
+ continue
+ }
+ if ob.readyToObserve(partition.CollectionID) {
+ replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
+ ob.observePartitionLoadStatus(ctx, partition, replicaNum)
+ }
+ partition = ob.meta.GetPartition(partition.PartitionID)
+ if partition != nil && partition.LoadPercentage != 100 {
+ loaded = false
+ }
+ }
+ // all partition loaded, finish task
+ if len(partitions) > 0 && loaded {
+ log.Info("Load task finish",
+ zap.String("traceID", traceID),
+ zap.Int64("collectionID", task.CollectionID),
+ zap.Int64s("partitionIDs", task.PartitionIDs),
+ zap.Stringer("loadType", task.LoadType))
+ ob.loadTasks.Remove(traceID)
+ }
+ return true
+ })
// trigger check logic when loading collections/partitions
if loading {
ob.checkerController.Check()
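
The behavioral core of the new observeTimeout above: a load-partitions task is cancelled only when no requested partition has been updated within the load timeout window (the `working` flag), so the request times out as a whole instead of partition by partition. A simplified, self-contained illustration with stand-in types:

```go
package main

import (
	"fmt"
	"time"
)

type partition struct {
	ID        int64
	UpdatedAt time.Time
}

// taskTimedOut mirrors the "working" check: the whole task is cancelled
// only if every partition is past its timeout window.
func taskTimedOut(partitions []partition, timeout time.Duration, now time.Time) bool {
	for _, p := range partitions {
		if now.Before(p.UpdatedAt.Add(timeout)) {
			return false // at least one partition is still making progress
		}
	}
	return len(partitions) > 0
}

func main() {
	now := time.Now()
	parts := []partition{
		{ID: 1, UpdatedAt: now.Add(-3 * time.Minute)},  // stale
		{ID: 2, UpdatedAt: now.Add(-30 * time.Second)}, // still within window
	}
	fmt.Println(taskTimedOut(parts, time.Minute, now)) // false: task keeps waiting
}
```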

internal/querycoordv2/observers/collection_observer_test.go

@@ -17,6 +17,7 @@
package observers
import (
+ "context"
"testing"
"time"
@@ -445,6 +446,8 @@ func (suite *CollectionObserverSuite) load(collection int64) {
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collection).Return(dmChannels, allSegments, nil)
suite.targetMgr.UpdateCollectionNextTarget(collection)
+ suite.ob.LoadCollection(context.Background(), collection)
}
func TestCollectionObserver(t *testing.T) {

internal/querycoordv2/services.go

@@ -231,6 +231,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
s.cluster,
s.targetMgr,
s.targetObserver,
+ s.collectionObserver,
s.nodeMgr,
)
s.jobScheduler.Add(loadJob)
@@ -331,6 +332,7 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
s.cluster,
s.targetMgr,
s.targetObserver,
+ s.collectionObserver,
s.nodeMgr,
)
s.jobScheduler.Add(loadJob)

internal/querycoordv2/services_test.go

@@ -69,18 +69,19 @@ type ServiceSuite struct {
nodes []int64
// Dependencies
- kv kv.MetaKv
- store metastore.QueryCoordCatalog
- dist *meta.DistributionManager
- meta *meta.Meta
- targetMgr *meta.TargetManager
- broker *meta.MockBroker
- targetObserver *observers.TargetObserver
- cluster *session.MockCluster
- nodeMgr *session.NodeManager
- jobScheduler *job.Scheduler
- taskScheduler *task.MockScheduler
- balancer balance.Balance
+ kv kv.MetaKv
+ store metastore.QueryCoordCatalog
+ dist *meta.DistributionManager
+ meta *meta.Meta
+ targetMgr *meta.TargetManager
+ broker *meta.MockBroker
+ targetObserver *observers.TargetObserver
+ collectionObserver *observers.CollectionObserver
+ cluster *session.MockCluster
+ nodeMgr *session.NodeManager
+ jobScheduler *job.Scheduler
+ taskScheduler *task.MockScheduler
+ balancer balance.Balance
distMgr *meta.DistributionManager
distController *dist.MockController
@@ -173,6 +174,16 @@ func (suite *ServiceSuite) SetupTest() {
suite.distMgr = meta.NewDistributionManager()
suite.distController = dist.NewMockController(suite.T())
+ suite.collectionObserver = observers.NewCollectionObserver(
+ suite.dist,
+ suite.meta,
+ suite.targetMgr,
+ suite.targetObserver,
+ nil,
+ &checkers.CheckerController{},
+ )
+ suite.collectionObserver.Start()
suite.server = &Server{
kv: suite.kv,
store: suite.store,
@@ -183,6 +194,7 @@ func (suite *ServiceSuite) SetupTest() {
targetMgr: suite.targetMgr,
broker: suite.broker,
targetObserver: suite.targetObserver,
+ collectionObserver: suite.collectionObserver,
nodeMgr: suite.nodeMgr,
cluster: suite.cluster,
jobScheduler: suite.jobScheduler,
@@ -191,14 +203,6 @@ func (suite *ServiceSuite) SetupTest() {
distController: suite.distController,
ctx: context.Background(),
}
- suite.server.collectionObserver = observers.NewCollectionObserver(
- suite.server.dist,
- suite.server.meta,
- suite.server.targetMgr,
- suite.targetObserver,
- suite.server.leaderObserver,
- &checkers.CheckerController{},
- )
suite.server.UpdateStateCode(commonpb.StateCode_Healthy)
}
@@ -1655,6 +1659,7 @@ func (suite *ServiceSuite) loadAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.jobScheduler.Add(job)
@@ -1679,6 +1684,7 @@ func (suite *ServiceSuite) loadAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.jobScheduler.Add(job)