enhance: Make querycoordv2 collection observer task driven (#32441)

See also #32440

- Add a loadTask registry to the collection observer (sketched below)
- For load collection/partitions, the load task shall time out as a whole
- Change the related load job constructors
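The design, in brief: instead of the observer scanning all collections and partitions on every tick, each load request registers a LoadTask keyed by its trace ID, and the observer ranges over only those tasks to decide timeout and completion. Below is a minimal, self-contained sketch of that shape, assuming sync.Map in place of Milvus's typeutil.ConcurrentMap; the names (observer, register) are illustrative, not the PR's API.

package main

import (
    "fmt"
    "sync"
    "time"
)

// LoadTask mirrors the shape added by this PR: one entry per in-flight
// load request, keyed by the request's trace ID.
type LoadTask struct {
    CollectionID int64
    PartitionIDs []int64 // empty for whole-collection loads
    StartedAt    time.Time
}

// observer is a toy stand-in for CollectionObserver: it owns the task
// registry and drives timeout checks from it rather than from a meta scan.
type observer struct {
    loadTasks sync.Map // traceID (string) -> LoadTask
    timeout   time.Duration
}

func (ob *observer) register(traceID string, task LoadTask) {
    ob.loadTasks.Store(traceID, task)
}

// observeTimeout cancels tasks whose load exceeded the deadline as a whole.
func (ob *observer) observeTimeout() {
    ob.loadTasks.Range(func(key, value any) bool {
        task := value.(LoadTask)
        if time.Since(task.StartedAt) > ob.timeout {
            fmt.Printf("task %v timed out, cancel collection %d\n", key, task.CollectionID)
            ob.loadTasks.Delete(key) // the real code also rolls back meta/target state
        }
        return true
    })
}

func main() {
    ob := &observer{timeout: time.Millisecond}
    ob.register("trace-1", LoadTask{CollectionID: 100, StartedAt: time.Now()})
    time.Sleep(5 * time.Millisecond)
    ob.observeTimeout() // task "trace-1" times out and is removed
}

The hunks below add exactly this wiring: load jobs register tasks in Execute, and observeTimeout/observeLoadStatus iterate the registry.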

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia committed 2024-04-22 10:39:22 +08:00 (commit d7ff1bbe5c, parent 8442098457)
6 changed files with 276 additions and 107 deletions

View File: load jobs (LoadCollectionJob / LoadPartitionJob)

@ -44,13 +44,14 @@ type LoadCollectionJob struct {
req *querypb.LoadCollectionRequest
undo *UndoList
- dist *meta.DistributionManager
- meta *meta.Meta
- broker meta.Broker
- cluster session.Cluster
- targetMgr *meta.TargetManager
- targetObserver *observers.TargetObserver
- nodeMgr *session.NodeManager
+ dist *meta.DistributionManager
+ meta *meta.Meta
+ broker meta.Broker
+ cluster session.Cluster
+ targetMgr *meta.TargetManager
+ targetObserver *observers.TargetObserver
+ collectionObserver *observers.CollectionObserver
+ nodeMgr *session.NodeManager
}
func NewLoadCollectionJob(
@ -62,19 +63,21 @@ func NewLoadCollectionJob(
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
+ collectionObserver *observers.CollectionObserver,
nodeMgr *session.NodeManager,
) *LoadCollectionJob {
return &LoadCollectionJob{
- BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
- req: req,
- undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
- dist: dist,
- meta: meta,
- broker: broker,
- cluster: cluster,
- targetMgr: targetMgr,
- targetObserver: targetObserver,
- nodeMgr: nodeMgr,
+ BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
+ req: req,
+ undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
+ dist: dist,
+ meta: meta,
+ broker: broker,
+ cluster: cluster,
+ targetMgr: targetMgr,
+ targetObserver: targetObserver,
+ collectionObserver: collectionObserver,
+ nodeMgr: nodeMgr,
}
}
@ -184,7 +187,7 @@ func (job *LoadCollectionJob) Execute() error {
}
})
- _, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadCollection", trace.WithNewRoot())
+ ctx, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadCollection", trace.WithNewRoot())
collection := &meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: req.GetCollectionID(),
@ -214,6 +217,9 @@ func (job *LoadCollectionJob) Execute() error {
}
job.undo.IsTargetUpdated = true
+ // 6. register load task into collection observer
+ job.collectionObserver.LoadCollection(ctx, req.GetCollectionID())
return nil
}
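Note the two changes above working together: Execute now keeps the span context (ctx, sp := ...) instead of discarding it, and passes that ctx into LoadCollection, so the registered task is keyed by the request's trace ID. From this point the collection observer, not the job, owns timeout and completion tracking for the request.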
@ -228,13 +234,14 @@ type LoadPartitionJob struct {
req *querypb.LoadPartitionsRequest
undo *UndoList
- dist *meta.DistributionManager
- meta *meta.Meta
- broker meta.Broker
- cluster session.Cluster
- targetMgr *meta.TargetManager
- targetObserver *observers.TargetObserver
- nodeMgr *session.NodeManager
+ dist *meta.DistributionManager
+ meta *meta.Meta
+ broker meta.Broker
+ cluster session.Cluster
+ targetMgr *meta.TargetManager
+ targetObserver *observers.TargetObserver
+ collectionObserver *observers.CollectionObserver
+ nodeMgr *session.NodeManager
}
func NewLoadPartitionJob(
@ -246,19 +253,21 @@ func NewLoadPartitionJob(
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetObserver *observers.TargetObserver,
+ collectionObserver *observers.CollectionObserver,
nodeMgr *session.NodeManager,
) *LoadPartitionJob {
return &LoadPartitionJob{
- BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
- req: req,
- undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
- dist: dist,
- meta: meta,
- broker: broker,
- cluster: cluster,
- targetMgr: targetMgr,
- targetObserver: targetObserver,
- nodeMgr: nodeMgr,
+ BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
+ req: req,
+ undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
+ dist: dist,
+ meta: meta,
+ broker: broker,
+ cluster: cluster,
+ targetMgr: targetMgr,
+ targetObserver: targetObserver,
+ collectionObserver: collectionObserver,
+ nodeMgr: nodeMgr,
}
}
@ -360,10 +369,10 @@ func (job *LoadPartitionJob) Execute() error {
CreatedAt: time.Now(),
}
})
+ ctx, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadPartition", trace.WithNewRoot())
if !job.meta.CollectionManager.Exist(req.GetCollectionID()) {
job.undo.IsNewCollection = true
- _, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadPartition", trace.WithNewRoot())
collection := &meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: req.GetCollectionID(),
@ -399,6 +408,8 @@ func (job *LoadPartitionJob) Execute() error {
}
job.undo.IsTargetUpdated = true
+ job.collectionObserver.LoadPartitions(ctx, req.GetCollectionID(), lackPartitionIDs)
return nil
}

View File: load job tests (JobSuite)

@ -60,16 +60,17 @@ type JobSuite struct {
loadTypes map[int64]querypb.LoadType
// Dependencies
- kv kv.MetaKv
- store metastore.QueryCoordCatalog
- dist *meta.DistributionManager
- meta *meta.Meta
- cluster *session.MockCluster
- targetMgr *meta.TargetManager
- targetObserver *observers.TargetObserver
- broker *meta.MockBroker
- nodeMgr *session.NodeManager
- checkerController *checkers.CheckerController
+ kv kv.MetaKv
+ store metastore.QueryCoordCatalog
+ dist *meta.DistributionManager
+ meta *meta.Meta
+ cluster *session.MockCluster
+ targetMgr *meta.TargetManager
+ targetObserver *observers.TargetObserver
+ collectionObserver *observers.CollectionObserver
+ broker *meta.MockBroker
+ nodeMgr *session.NodeManager
+ checkerController *checkers.CheckerController
// Test objects
scheduler *Scheduler
@ -192,6 +193,13 @@ func (suite *JobSuite) SetupTest() {
suite.meta.HandleNodeUp(3000)
suite.checkerController = &checkers.CheckerController{}
+ suite.collectionObserver = observers.NewCollectionObserver(
+     suite.dist,
+     suite.meta,
+     suite.targetMgr,
+     suite.targetObserver,
+     suite.checkerController,
+ )
}
func (suite *JobSuite) TearDownTest() {
@ -231,6 +239,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -258,6 +267,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -283,6 +293,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -310,6 +321,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -345,6 +357,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -366,6 +379,7 @@ func (suite *JobSuite) TestLoadCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -395,6 +409,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -427,6 +442,7 @@ func (suite *JobSuite) TestLoadCollectionWithDiffIndex() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -457,6 +473,7 @@ func (suite *JobSuite) TestLoadCollectionWithDiffIndex() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -488,6 +505,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -518,6 +536,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -545,6 +564,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -572,6 +592,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -598,6 +619,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -633,6 +655,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -655,6 +678,7 @@ func (suite *JobSuite) TestLoadPartition() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -682,6 +706,7 @@ func (suite *JobSuite) TestDynamicLoad() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
return job
@ -700,6 +725,7 @@ func (suite *JobSuite) TestDynamicLoad() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
return job
@ -799,6 +825,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -832,6 +859,7 @@ func (suite *JobSuite) TestLoadPartitionWithDiffIndex() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -864,6 +892,7 @@ func (suite *JobSuite) TestLoadPartitionWithDiffIndex() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -891,6 +920,7 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.checkerController,
)
suite.scheduler.Add(job)
@ -1133,6 +1163,7 @@ func (suite *JobSuite) TestLoadCollectionStoreFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -1174,6 +1205,7 @@ func (suite *JobSuite) TestLoadPartitionStoreFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -1201,6 +1233,7 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -1229,6 +1262,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadCollectionJob)
@ -1249,6 +1283,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadPartitionJob)
@ -1275,6 +1310,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadCollectionJob)
@ -1294,6 +1330,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadPartitionJob)
@ -1436,6 +1473,7 @@ func (suite *JobSuite) loadAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
@ -1460,6 +1498,7 @@ func (suite *JobSuite) loadAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)

View File: collection observer (CollectionObserver)

@ -23,6 +23,7 @@ import (
"time"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -32,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type CollectionObserver struct {
@ -45,9 +47,17 @@ type CollectionObserver struct {
checkerController *checkers.CheckerController
partitionLoadedCount map[int64]int
+ loadTasks *typeutil.ConcurrentMap[string, LoadTask]
stopOnce sync.Once
}
+type LoadTask struct {
+    LoadType     querypb.LoadType
+    CollectionID int64
+    PartitionIDs []int64
+}
func NewCollectionObserver(
dist *meta.DistributionManager,
meta *meta.Meta,
@ -55,14 +65,23 @@ func NewCollectionObserver(
targetObserver *TargetObserver,
checherController *checkers.CheckerController,
) *CollectionObserver {
- return &CollectionObserver{
+ ob := &CollectionObserver{
dist: dist,
meta: meta,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checherController,
partitionLoadedCount: make(map[int64]int),
+ loadTasks: typeutil.NewConcurrentMap[string, LoadTask](),
}
+ // Add load task for collection recovery
+ collections := meta.GetAllCollections()
+ for _, collection := range collections {
+     ob.LoadCollection(context.Background(), collection.GetCollectionID())
+ }
+ return ob
}
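A detail of the recovery loop above: context.Background() carries no span, so the trace ID is invalid and the key falls back to the "LoadCollection_%d" form used by LoadCollection below. Restart-time tasks are therefore deduplicated per collection rather than per original request.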
func (ob *CollectionObserver) Start() {
@ -98,51 +117,104 @@ func (ob *CollectionObserver) Stop() {
})
}
+func (ob *CollectionObserver) LoadCollection(ctx context.Context, collectionID int64) {
+    span := trace.SpanFromContext(ctx)
+    traceID := span.SpanContext().TraceID()
+    key := traceID.String()
+    if !traceID.IsValid() {
+        key = fmt.Sprintf("LoadCollection_%d", collectionID)
+    }
+    ob.loadTasks.Insert(key, LoadTask{LoadType: querypb.LoadType_LoadCollection, CollectionID: collectionID})
+}
+
+func (ob *CollectionObserver) LoadPartitions(ctx context.Context, collectionID int64, partitionIDs []int64) {
+    span := trace.SpanFromContext(ctx)
+    traceID := span.SpanContext().TraceID()
+    key := traceID.String()
+    if !traceID.IsValid() {
+        key = fmt.Sprintf("LoadPartition_%d_%v", collectionID, partitionIDs)
+    }
+    ob.loadTasks.Insert(key, LoadTask{LoadType: querypb.LoadType_LoadPartition, CollectionID: collectionID, PartitionIDs: partitionIDs})
+}
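The key derivation above is the heart of the task registry. Here is a hedged, self-contained sketch of the same logic using the OpenTelemetry API this file already imports; the helper name loadTaskKey is illustrative, not part of the PR.

package main

import (
    "context"
    "fmt"

    "go.opentelemetry.io/otel/trace"
)

// loadTaskKey derives a registry key the way the observer does: prefer the
// request's trace ID, fall back to a deterministic string when the context
// carries no span.
func loadTaskKey(ctx context.Context, collectionID int64, partitionIDs []int64) string {
    span := trace.SpanFromContext(ctx)
    traceID := span.SpanContext().TraceID()
    if traceID.IsValid() {
        return traceID.String()
    }
    if len(partitionIDs) > 0 {
        return fmt.Sprintf("LoadPartition_%d_%v", collectionID, partitionIDs)
    }
    return fmt.Sprintf("LoadCollection_%d", collectionID)
}

func main() {
    // context.Background() has no span, so the fallback keys are used.
    fmt.Println(loadTaskKey(context.Background(), 100, nil))           // LoadCollection_100
    fmt.Println(loadTaskKey(context.Background(), 100, []int64{1, 2})) // LoadPartition_100_[1 2]
}

Keying by trace ID also makes registration idempotent per request: re-inserting under the same trace ID overwrites the existing entry instead of duplicating the task.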
func (ob *CollectionObserver) Observe(ctx context.Context) {
ob.observeTimeout()
ob.observeLoadStatus(ctx)
}
func (ob *CollectionObserver) observeTimeout() {
-    collections := ob.meta.CollectionManager.GetAllCollections()
-    for _, collection := range collections {
-        if collection.GetStatus() != querypb.LoadStatus_Loading ||
-            time.Now().Before(collection.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) {
-            continue
-        }
-        log.Info("load collection timeout, cancel it",
-            zap.Int64("collectionID", collection.GetCollectionID()),
-            zap.Duration("loadTime", time.Since(collection.CreatedAt)))
-        ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID())
-        ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID())
-        ob.targetMgr.RemoveCollection(collection.GetCollectionID())
-    }
-    partitions := utils.GroupPartitionsByCollection(ob.meta.CollectionManager.GetAllPartitions())
-    for collection, partitions := range partitions {
-        for _, partition := range partitions {
-            if partition.GetStatus() != querypb.LoadStatus_Loading ||
-                time.Now().Before(partition.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) {
-                continue
-            }
-            log.Info("load partition timeout, cancel it",
-                zap.Int64("collectionID", collection),
-                zap.Int64("partitionID", partition.GetPartitionID()),
-                zap.Duration("loadTime", time.Since(partition.CreatedAt)))
-            ob.meta.CollectionManager.RemovePartition(collection, partition.GetPartitionID())
-            ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID())
-        }
-        // all partition timeout, remove collection
-        if len(ob.meta.CollectionManager.GetPartitionsByCollection(collection)) == 0 {
-            log.Info("collection timeout due to all partition removed", zap.Int64("collection", collection))
-            ob.meta.CollectionManager.RemoveCollection(collection)
-            ob.meta.ReplicaManager.RemoveCollection(collection)
-            ob.targetMgr.RemoveCollection(collection)
-        }
-    }
+    ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
+        collection := ob.meta.CollectionManager.GetCollection(task.CollectionID)
+        // collection released
+        if collection == nil {
+            log.Info("Load Collection Task canceled, collection removed from meta", zap.Int64("collectionID", task.CollectionID), zap.String("traceID", traceID))
+            ob.loadTasks.Remove(traceID)
+            return true
+        }
+        switch task.LoadType {
+        case querypb.LoadType_LoadCollection:
+            if collection.GetStatus() == querypb.LoadStatus_Loading &&
+                time.Now().After(collection.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) {
+                log.Info("load collection timeout, cancel it",
+                    zap.Int64("collectionID", collection.GetCollectionID()),
+                    zap.Duration("loadTime", time.Since(collection.CreatedAt)))
+                ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID())
+                ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID())
+                ob.targetMgr.RemoveCollection(collection.GetCollectionID())
+                ob.loadTasks.Remove(traceID)
+            }
+        case querypb.LoadType_LoadPartition:
+            partitionIDs := typeutil.NewSet(task.PartitionIDs...)
+            partitions := ob.meta.GetPartitionsByCollection(task.CollectionID)
+            partitions = lo.Filter(partitions, func(partition *meta.Partition, _ int) bool {
+                return partitionIDs.Contain(partition.GetPartitionID())
+            })
+            // all partition released
+            if len(partitions) == 0 {
+                log.Info("Load Partitions Task canceled, collection removed from meta",
+                    zap.Int64("collectionID", task.CollectionID),
+                    zap.Int64s("partitionIDs", task.PartitionIDs),
+                    zap.String("traceID", traceID))
+                ob.loadTasks.Remove(traceID)
+                return true
+            }
+            working := false
+            for _, partition := range partitions {
+                if time.Now().Before(partition.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) {
+                    working = true
+                    break
+                }
+            }
+            // only all partitions timeout means task timeout
+            if !working {
+                log.Info("load partitions timeout, cancel it",
+                    zap.Int64("collectionID", task.CollectionID),
+                    zap.Int64s("partitionIDs", task.PartitionIDs))
+                for _, partition := range partitions {
+                    ob.meta.CollectionManager.RemovePartition(partition.CollectionID, partition.GetPartitionID())
+                    ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID())
+                }
+                // all partition timeout, remove collection
+                if len(ob.meta.CollectionManager.GetPartitionsByCollection(task.CollectionID)) == 0 {
+                    log.Info("collection timeout due to all partition removed", zap.Int64("collection", task.CollectionID))
+                    ob.meta.CollectionManager.RemoveCollection(task.CollectionID)
+                    ob.meta.ReplicaManager.RemoveCollection(task.CollectionID)
+                    ob.targetMgr.RemoveCollection(task.CollectionID)
+                }
+            }
+        }
+        return true
+    })
}
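The LoadPartition branch above implements the "time out as a whole" rule from the commit message: the task is cancelled only when every tracked partition has passed the deadline, and one partition still inside it keeps the whole task alive. A minimal sketch of that predicate, with illustrative types:

package main

import (
    "fmt"
    "time"
)

type partition struct {
    ID        int64
    UpdatedAt time.Time
}

// allTimedOut reports whether the load task as a whole has timed out:
// a single partition updated within the deadline keeps the task alive.
func allTimedOut(partitions []partition, timeout time.Duration) bool {
    now := time.Now()
    for _, p := range partitions {
        if now.Before(p.UpdatedAt.Add(timeout)) {
            return false // still "working"
        }
    }
    return len(partitions) > 0
}

func main() {
    old := time.Now().Add(-2 * time.Hour)
    fresh := time.Now()
    fmt.Println(allTimedOut([]partition{{1, old}, {2, old}}, time.Hour))   // true: cancel task
    fmt.Println(allTimedOut([]partition{{1, old}, {2, fresh}}, time.Hour)) // false: keep waiting
}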
func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {
@ -153,18 +225,54 @@ func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {
}
func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
-    partitions := ob.meta.CollectionManager.GetAllPartitions()
-    loading := false
-    for _, partition := range partitions {
-        if partition.LoadPercentage == 100 {
-            continue
-        }
-        if ob.readyToObserve(partition.CollectionID) {
-            replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
-            ob.observePartitionLoadStatus(ctx, partition, replicaNum)
-            loading = true
-        }
-    }
+    loading := false
+    ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
+        loading = true
+        collection := ob.meta.CollectionManager.GetCollection(task.CollectionID)
+        if collection == nil {
+            return true
+        }
+        var partitions []*meta.Partition
+        switch task.LoadType {
+        case querypb.LoadType_LoadCollection:
+            partitions = ob.meta.GetPartitionsByCollection(task.CollectionID)
+        case querypb.LoadType_LoadPartition:
+            partitionIDs := typeutil.NewSet[int64](task.PartitionIDs...)
+            partitions = ob.meta.GetPartitionsByCollection(task.CollectionID)
+            partitions = lo.Filter(partitions, func(partition *meta.Partition, _ int) bool {
+                return partitionIDs.Contain(partition.GetPartitionID())
+            })
+        }
+        loaded := true
+        for _, partition := range partitions {
+            if partition.LoadPercentage == 100 {
+                continue
+            }
+            if ob.readyToObserve(partition.CollectionID) {
+                replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
+                ob.observePartitionLoadStatus(ctx, partition, replicaNum)
+            }
+            partition = ob.meta.GetPartition(partition.PartitionID)
+            if partition.LoadPercentage != 100 {
+                loaded = false
+            }
+        }
+        // all partition loaded, finish task
+        if len(partitions) > 0 && loaded {
+            log.Info("Load task finish",
+                zap.String("traceID", traceID),
+                zap.Int64("collectionID", task.CollectionID),
+                zap.Int64s("partitionIDs", task.PartitionIDs),
+                zap.Stringer("loadType", task.LoadType))
+            ob.loadTasks.Remove(traceID)
+        }
+        return true
+    })
// trigger check logic when loading collections/partitions
if loading {
ob.checkerController.Check()
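Completion is likewise judged per task, not per partition: a task finishes only when its (non-empty) partition set all reports 100%. A small sketch of that finish condition, with illustrative types:

package main

import "fmt"

type partitionStatus struct {
    ID             int64
    LoadPercentage int32
}

// taskLoaded mirrors the observer's finish condition: a non-empty partition
// set in which every entry reached 100%.
func taskLoaded(partitions []partitionStatus) bool {
    if len(partitions) == 0 {
        return false
    }
    for _, p := range partitions {
        if p.LoadPercentage != 100 {
            return false
        }
    }
    return true
}

func main() {
    fmt.Println(taskLoaded([]partitionStatus{{1, 100}, {2, 100}})) // true: remove task
    fmt.Println(taskLoaded([]partitionStatus{{1, 100}, {2, 40}}))  // false: keep observing
}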

View File: collection observer tests (CollectionObserverSuite)

@ -17,6 +17,7 @@
package observers
import (
"context"
"testing"
"time"
@ -441,6 +442,8 @@ func (suite *CollectionObserverSuite) load(collection int64) {
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collection).Return(dmChannels, allSegments, nil)
suite.targetMgr.UpdateCollectionNextTarget(collection)
+ suite.ob.LoadCollection(context.Background(), collection)
}
func TestCollectionObserver(t *testing.T) {

View File: query coord services (Server.LoadCollection / Server.LoadPartitions)

@ -232,6 +232,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
s.cluster,
s.targetMgr,
s.targetObserver,
+ s.collectionObserver,
s.nodeMgr,
)
s.jobScheduler.Add(loadJob)
@ -332,6 +333,7 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
s.cluster,
s.targetMgr,
s.targetObserver,
+ s.collectionObserver,
s.nodeMgr,
)
s.jobScheduler.Add(loadJob)

View File: query coord service tests (ServiceSuite)

@ -70,18 +70,19 @@ type ServiceSuite struct {
nodes []int64
// Dependencies
- kv kv.MetaKv
- store metastore.QueryCoordCatalog
- dist *meta.DistributionManager
- meta *meta.Meta
- targetMgr *meta.TargetManager
- broker *meta.MockBroker
- targetObserver *observers.TargetObserver
- cluster *session.MockCluster
- nodeMgr *session.NodeManager
- jobScheduler *job.Scheduler
- taskScheduler *task.MockScheduler
- balancer balance.Balance
+ kv kv.MetaKv
+ store metastore.QueryCoordCatalog
+ dist *meta.DistributionManager
+ meta *meta.Meta
+ targetMgr *meta.TargetManager
+ broker *meta.MockBroker
+ targetObserver *observers.TargetObserver
+ collectionObserver *observers.CollectionObserver
+ cluster *session.MockCluster
+ nodeMgr *session.NodeManager
+ jobScheduler *job.Scheduler
+ taskScheduler *task.MockScheduler
+ balancer balance.Balance
distMgr *meta.DistributionManager
distController *dist.MockController
@ -177,6 +178,15 @@ func (suite *ServiceSuite) SetupTest() {
suite.distMgr = meta.NewDistributionManager()
suite.distController = dist.NewMockController(suite.T())
+ suite.collectionObserver = observers.NewCollectionObserver(
+     suite.dist,
+     suite.meta,
+     suite.targetMgr,
+     suite.targetObserver,
+     &checkers.CheckerController{},
+ )
+ suite.collectionObserver.Start()
suite.server = &Server{
kv: suite.kv,
store: suite.store,
@ -187,6 +197,7 @@ func (suite *ServiceSuite) SetupTest() {
targetMgr: suite.targetMgr,
broker: suite.broker,
targetObserver: suite.targetObserver,
+ collectionObserver: suite.collectionObserver,
nodeMgr: suite.nodeMgr,
cluster: suite.cluster,
jobScheduler: suite.jobScheduler,
@ -195,13 +206,6 @@ func (suite *ServiceSuite) SetupTest() {
distController: suite.distController,
ctx: context.Background(),
}
- suite.server.collectionObserver = observers.NewCollectionObserver(
-     suite.server.dist,
-     suite.server.meta,
-     suite.server.targetMgr,
-     suite.targetObserver,
-     &checkers.CheckerController{},
- )
suite.server.UpdateStateCode(commonpb.StateCode_Healthy)
}
@ -1802,6 +1806,7 @@ func (suite *ServiceSuite) loadAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.jobScheduler.Add(job)
@ -1826,6 +1831,7 @@ func (suite *ServiceSuite) loadAll() {
suite.cluster,
suite.targetMgr,
suite.targetObserver,
+ suite.collectionObserver,
suite.nodeMgr,
)
suite.jobScheduler.Add(job)