fix: [10kcp] Query coord stop progress is too slow (#38300)

issue: https://github.com/milvus-io/milvus/issues/38237

Query coord saves each collection's target during its stop process, so
that a newly started query coord can recover quickly. But if the Milvus
cluster has thousands of collections, this makes query coord's stop
process much slower than expected.

This PR refines the implementation to save a collection's target to
etcd whenever the target is updated, and to remove it when the
collection is released.
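
In short, the O(#collections) flush that used to run inside Stop() becomes a cheap write-through on each target update. Below is a minimal, self-contained sketch of that lifecycle (illustrative only, not the actual Milvus code; the `Catalog` shape is assumed from the metastore calls in the diff below):

```go
package main

import "fmt"

// Catalog mirrors the three metastore calls used in the diff below
// (assumed shape, not the real milvus QueryCoordCatalog interface).
type Catalog interface {
	SaveCollectionTargets(targets ...*CollectionTarget) error
	RemoveCollectionTarget(collectionID int64) error
	GetCollectionTargets() (map[int64]*CollectionTarget, error)
}

// CollectionTarget stands in for querypb.CollectionTarget.
type CollectionTarget struct {
	CollectionID int64
	Version      int64
}

type TargetManager struct {
	catalog Catalog
	current map[int64]*CollectionTarget
}

// UpdateCollectionCurrentTarget persists the target the moment it is
// promoted to "current", instead of deferring all writes to shutdown.
func (m *TargetManager) UpdateCollectionCurrentTarget(t *CollectionTarget) {
	m.current[t.CollectionID] = t
	if err := m.catalog.SaveCollectionTargets(t); err != nil {
		// best effort, like the log.Warn in the diff; the next update rewrites the entry
		fmt.Println("failed to save collection target:", err)
	}
}

// RemoveCollection cleans up both memory and the metastore entry, so
// stale targets never accumulate in etcd.
func (m *TargetManager) RemoveCollection(collectionID int64) {
	delete(m.current, collectionID)
	m.catalog.RemoveCollectionTarget(collectionID)
}

// memCatalog is an in-memory stand-in used to exercise the sketch.
type memCatalog struct{ m map[int64]*CollectionTarget }

func (c *memCatalog) SaveCollectionTargets(ts ...*CollectionTarget) error {
	for _, t := range ts {
		c.m[t.CollectionID] = t
	}
	return nil
}

func (c *memCatalog) RemoveCollectionTarget(id int64) error {
	delete(c.m, id)
	return nil
}

func (c *memCatalog) GetCollectionTargets() (map[int64]*CollectionTarget, error) {
	return c.m, nil
}

func main() {
	cat := &memCatalog{m: map[int64]*CollectionTarget{}}
	mgr := &TargetManager{catalog: cat, current: map[int64]*CollectionTarget{}}

	mgr.UpdateCollectionCurrentTarget(&CollectionTarget{CollectionID: 1, Version: 7})
	fmt.Println("persisted targets:", len(cat.m)) // 1: saved at update time

	mgr.RemoveCollection(1)
	fmt.Println("persisted targets:", len(cat.m)) // 0: cleaned at release time
}
```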

pr: https://github.com/milvus-io/milvus/pull/38238

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Co-authored-by: Wei Liu <wei.liu@zilliz.com>
24 changed files with 56 additions and 145 deletions


@@ -68,7 +68,7 @@ func (suite *ChannelLevelScoreBalancerTestSuite) SetupTest() {
 	idAllocator := RandomIncrementIDAllocator()
 	nodeManager := session.NewNodeManager()
 	testMeta := meta.NewMeta(idAllocator, store, nodeManager)
-	testTarget := meta.NewTargetManager(suite.broker, testMeta)
+	testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv))
 	distManager := meta.NewDistributionManager()
 	suite.mockScheduler = task.NewMockScheduler(suite.T())


@@ -71,7 +71,7 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
 	idAllocator := RandomIncrementIDAllocator()
 	nodeManager := session.NewNodeManager()
 	testMeta := meta.NewMeta(idAllocator, store, nodeManager)
-	testTarget := meta.NewTargetManager(suite.broker, testMeta)
+	testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv))
 	distManager := meta.NewDistributionManager()
 	suite.mockScheduler = task.NewMockScheduler(suite.T())


@@ -68,7 +68,7 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() {
 	idAllocator := RandomIncrementIDAllocator()
 	nodeManager := session.NewNodeManager()
 	testMeta := meta.NewMeta(idAllocator, store, nodeManager)
-	testTarget := meta.NewTargetManager(suite.broker, testMeta)
+	testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv))
 	distManager := meta.NewDistributionManager()
 	suite.mockScheduler = task.NewMockScheduler(suite.T())


@@ -75,7 +75,7 @@ func (suite *BalanceCheckerTestSuite) SetupTest() {
 	suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
 	suite.broker = meta.NewMockBroker(suite.T())
 	suite.scheduler = task.NewMockScheduler(suite.T())
-	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.balancer = balance.NewMockBalancer(suite.T())
 	suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer })


@@ -72,7 +72,7 @@ func (suite *ChannelCheckerTestSuite) SetupTest() {
 	suite.nodeMgr = session.NewNodeManager()
 	suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
 	suite.broker = meta.NewMockBroker(suite.T())
-	targetManager := meta.NewTargetManager(suite.broker, suite.meta)
+	targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	distManager := meta.NewDistributionManager()


@@ -73,7 +73,7 @@ func (suite *ControllerBaseTestSuite) SetupTest() {
 	suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
 	suite.dist = meta.NewDistributionManager()
 	suite.broker = meta.NewMockBroker(suite.T())
-	suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.balancer = balance.NewMockBalancer(suite.T())
 	suite.scheduler = task.NewMockScheduler(suite.T())


@@ -77,7 +77,7 @@ func (suite *CheckerControllerSuite) SetupTest() {
 	suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
 	suite.dist = meta.NewDistributionManager()
 	suite.broker = meta.NewMockBroker(suite.T())
-	suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.balancer = balance.NewMockBalancer(suite.T())
 	suite.scheduler = task.NewMockScheduler(suite.T())


@@ -74,7 +74,7 @@ func (suite *LeaderCheckerTestSuite) SetupTest() {
 	suite.broker = meta.NewMockBroker(suite.T())
 	distManager := meta.NewDistributionManager()
-	targetManager := meta.NewTargetManager(suite.broker, suite.meta)
+	targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr)
 }


@@ -74,7 +74,7 @@ func (suite *SegmentCheckerTestSuite) SetupTest() {
 	suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
 	distManager := meta.NewDistributionManager()
 	suite.broker = meta.NewMockBroker(suite.T())
-	targetManager := meta.NewTargetManager(suite.broker, suite.meta)
+	targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	balancer := suite.createMockBalancer()
 	suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, func() balance.Balance { return balancer })


@@ -78,7 +78,7 @@ func (suite *DistControllerTestSuite) SetupTest() {
 	suite.mockCluster = session.NewMockCluster(suite.T())
 	distManager := meta.NewDistributionManager()
 	suite.broker = meta.NewMockBroker(suite.T())
-	targetManager := meta.NewTargetManager(suite.broker, suite.meta)
+	targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.mockScheduler = task.NewMockScheduler(suite.T())
 	suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe()
 	syncTargetVersionFn := func(collectionID int64) {}


@@ -163,7 +163,7 @@ func (suite *JobSuite) SetupTest() {
 	suite.dist = meta.NewDistributionManager()
 	suite.nodeMgr = session.NewNodeManager()
 	suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)
-	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.targetObserver = observers.NewTargetObserver(suite.meta,
 		suite.targetMgr,
 		suite.dist,


@@ -375,7 +375,7 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
 			err = mgr.UpdatePartitionLoadPercent(partitionID, 10)
 			suite.NoError(err)
 		}
-		_, err = mgr.UpdateCollectionLoadPercent(ctx, collectionID)
+		_, err = mgr.UpdateCollectionLoadPercent(collectionID)
 		suite.NoError(err)
 	}
 	suite.clearMemory()


@@ -206,10 +206,6 @@ func (_c *MockBroker_GetCollectionLoadInfo_Call) RunAndReturn(run func(context.C
 func (_m *MockBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) {
 	ret := _m.Called(ctx, collectionIDs)
 
-	if len(ret) == 0 {
-		panic("no return value specified for GetDataViewVersions")
-	}
-
 	var r0 map[int64]int64
 	var r1 error
 	if rf, ok := ret.Get(0).(func(context.Context, []int64) (map[int64]int64, error)); ok {

@@ -272,10 +268,6 @@ func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segm
 	_ca = append(_ca, _va...)
 	ret := _m.Called(_ca...)
 
-	if len(ret) == 0 {
-		panic("no return value specified for GetIndexInfo")
-	}
-
 	var r0 map[int64][]*querypb.FieldIndexInfo
 	var r1 error
 	if rf, ok := ret.Get(0).(func(context.Context, int64, ...int64) (map[int64][]*querypb.FieldIndexInfo, error)); ok {


@@ -3,9 +3,7 @@
 package meta
 
 import (
-	metastore "github.com/milvus-io/milvus/internal/metastore"
-
 	datapb "github.com/milvus-io/milvus/internal/proto/datapb"
 	mock "github.com/stretchr/testify/mock"
 	typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"

@@ -606,13 +604,13 @@ func (_c *MockTargetManager_IsNextTargetExist_Call) RunAndReturn(run func(int64)
 	return _c
 }
 
-// Recover provides a mock function with given fields: catalog
-func (_m *MockTargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
-	ret := _m.Called(catalog)
+// Recover provides a mock function with given fields:
+func (_m *MockTargetManager) Recover() error {
+	ret := _m.Called()
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(metastore.QueryCoordCatalog) error); ok {
-		r0 = rf(catalog)
+	if rf, ok := ret.Get(0).(func() error); ok {
+		r0 = rf()
 	} else {
 		r0 = ret.Error(0)
 	}

@@ -626,14 +624,13 @@ type MockTargetManager_Recover_Call struct {
 }
 
 // Recover is a helper method to define mock.On call
-//   - catalog metastore.QueryCoordCatalog
-func (_e *MockTargetManager_Expecter) Recover(catalog interface{}) *MockTargetManager_Recover_Call {
-	return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover", catalog)}
+func (_e *MockTargetManager_Expecter) Recover() *MockTargetManager_Recover_Call {
+	return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover")}
 }
 
-func (_c *MockTargetManager_Recover_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_Recover_Call {
+func (_c *MockTargetManager_Recover_Call) Run(run func()) *MockTargetManager_Recover_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(metastore.QueryCoordCatalog))
+		run()
 	})
 	return _c
 }

@@ -643,7 +640,7 @@ func (_c *MockTargetManager_Recover_Call) Return(_a0 error) *MockTargetManager_R
 	return _c
 }
 
-func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func(metastore.QueryCoordCatalog) error) *MockTargetManager_Recover_Call {
+func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func() error) *MockTargetManager_Recover_Call {
 	_c.Call.Return(run)
 	return _c
 }

@@ -729,39 +726,6 @@ func (_c *MockTargetManager_RemovePartition_Call) RunAndReturn(run func(int64, .
 	return _c
 }
 
-// SaveCurrentTarget provides a mock function with given fields: catalog
-func (_m *MockTargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
-	_m.Called(catalog)
-}
-
-// MockTargetManager_SaveCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCurrentTarget'
-type MockTargetManager_SaveCurrentTarget_Call struct {
-	*mock.Call
-}
-
-// SaveCurrentTarget is a helper method to define mock.On call
-//   - catalog metastore.QueryCoordCatalog
-func (_e *MockTargetManager_Expecter) SaveCurrentTarget(catalog interface{}) *MockTargetManager_SaveCurrentTarget_Call {
-	return &MockTargetManager_SaveCurrentTarget_Call{Call: _e.mock.On("SaveCurrentTarget", catalog)}
-}
-
-func (_c *MockTargetManager_SaveCurrentTarget_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(metastore.QueryCoordCatalog))
-	})
-	return _c
-}
-
-func (_c *MockTargetManager_SaveCurrentTarget_Call) Return() *MockTargetManager_SaveCurrentTarget_Call {
-	_c.Call.Return()
-	return _c
-}
-
-func (_c *MockTargetManager_SaveCurrentTarget_Call) RunAndReturn(run func(metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // UpdateCollectionCurrentTarget provides a mock function with given fields: collectionID
 func (_m *MockTargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool {
 	ret := _m.Called(collectionID)


@@ -19,7 +19,6 @@ package meta
 import (
 	"context"
 	"fmt"
-	"runtime"
 	"sync"
 
 	"github.com/samber/lo"
@@ -28,11 +27,9 @@
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/metastore"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
-	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/retry"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -65,8 +62,7 @@ type TargetManagerInterface interface {
 	GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64
 	IsCurrentTargetExist(collectionID int64, partitionID int64) bool
 	IsNextTargetExist(collectionID int64) bool
-	SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
-	Recover(catalog metastore.QueryCoordCatalog) error
+	Recover() error
 	CanSegmentBeMoved(collectionID, segmentID int64) bool
 }
@@ -80,14 +76,17 @@ type TargetManager struct {
 	// all remove segment/channel operation happens on Both current and next -> delete status should be consistent
 	current *target
 	next    *target
+
+	catalog metastore.QueryCoordCatalog
 }
 
-func NewTargetManager(broker Broker, meta *Meta) *TargetManager {
+func NewTargetManager(broker Broker, meta *Meta, catalog metastore.QueryCoordCatalog) *TargetManager {
 	return &TargetManager{
 		broker:  broker,
 		meta:    meta,
 		current: newTarget(),
 		next:    newTarget(),
+		catalog: catalog,
 	}
 }
@@ -129,6 +128,12 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool
 		zap.Int64("version", newTarget.GetTargetVersion()),
 		zap.String("partStatsVersion", partStatsVersionInfo),
 	)
+
+	// save collection current target for fast recovery after qc restart
+	err := mgr.catalog.SaveCollectionTargets(newTarget.toPbMsg())
+	if err != nil {
+		log.Warn("failed to save collection targets", zap.Error(err))
+	}
 	return true
 }
@@ -239,6 +244,7 @@ func (mgr *TargetManager) RemoveCollection(collectionID int64) {
 	mgr.current.removeCollectionTarget(collectionID)
 	mgr.next.removeCollectionTarget(collectionID)
+	mgr.catalog.RemoveCollectionTarget(collectionID)
 }
 
 // RemovePartition removes all segment in the given partition,
@@ -545,51 +551,11 @@ func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool {
 	return len(newChannels) > 0
 }
 
-func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
-	if mgr.current != nil {
-		// use pool here to control maximal writer used by save target
-		pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2)
-		defer pool.Release()
-		// use batch write in case of the number of collections is large
-		batchSize := 16
-		var wg sync.WaitGroup
-		submit := func(tasks []typeutil.Pair[int64, *querypb.CollectionTarget]) {
-			wg.Add(1)
-			pool.Submit(func() (any, error) {
-				defer wg.Done()
-				ids := lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) int64 { return p.A })
-				if err := catalog.SaveCollectionTargets(lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) *querypb.CollectionTarget {
-					return p.B
-				})...); err != nil {
-					log.Warn("failed to save current target for collection", zap.Int64s("collectionIDs", ids), zap.Error(err))
-				} else {
-					log.Info("succeed to save current target for collection", zap.Int64s("collectionIDs", ids))
-				}
-				return nil, nil
-			})
-		}
-		tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
-		for id, target := range mgr.current.collectionTargetMap {
-			tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg()))
-			if len(tasks) >= batchSize {
-				submit(tasks)
-				tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
-			}
-		}
-		if len(tasks) > 0 {
-			submit(tasks)
-		}
-		wg.Wait()
-	}
-}
-
-func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
+func (mgr *TargetManager) Recover() error {
 	mgr.rwMutex.Lock()
 	defer mgr.rwMutex.Unlock()
 
-	targets, err := catalog.GetCollectionTargets()
+	targets, err := mgr.catalog.GetCollectionTargets()
 	if err != nil {
 		log.Warn("failed to recover collection target from etcd", zap.Error(err))
 		return err
@@ -604,14 +570,7 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
 			zap.Int("segmentNum", len(newTarget.GetAllSegmentIDs())),
 			zap.Int64("version", newTarget.GetTargetVersion()),
 		)
-
-		// clear target info in meta store
-		err := catalog.RemoveCollectionTarget(t.GetCollectionID())
-		if err != nil {
-			log.Warn("failed to clear collection target from etcd", zap.Error(err))
-		}
 	}
 
 	return nil
 }
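
Extending the sketch from the commit message above: with the catalog injected at construction, recovery after a restart is a plain read of whatever was persisted, and the cleanup that used to happen at the end of Recover moves to RemoveCollection. A hedged sketch (assumed shapes; the real method also takes the manager's lock and rebuilds the in-memory target structures):

```go
// Recover installs the persisted targets as the current targets after a
// query coord restart. Entries stay in the metastore until their
// collection is released, so Recover no longer deletes them.
func (m *TargetManager) Recover() error {
	targets, err := m.catalog.GetCollectionTargets()
	if err != nil {
		return err
	}
	for id, t := range targets {
		m.current[id] = t
	}
	return nil
}
```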


@@ -113,7 +113,7 @@ func (suite *TargetManagerSuite) SetupTest() {
 	idAllocator := RandomIncrementIDAllocator()
 	suite.meta = NewMeta(idAllocator, suite.catalog, session.NewNodeManager())
 	suite.broker = NewMockBroker(suite.T())
-	suite.mgr = NewTargetManager(suite.broker, suite.meta)
+	suite.mgr = NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 
 	for _, collection := range suite.collections {
 		dmChannels := make([]*datapb.VchannelInfo, 0)
@@ -582,13 +582,16 @@ func (suite *TargetManagerSuite) TestRecover() {
 	suite.mgr.UpdateCollectionNextTarget(collectionID)
 	suite.mgr.UpdateCollectionCurrentTarget(collectionID)
-	suite.mgr.SaveCurrentTarget(suite.catalog)
+	// target should be saved to the meta store after updating the current target
+	targets, err := suite.catalog.GetCollectionTargets()
+	suite.NoError(err)
+	suite.Len(targets, 1)
 
 	// clear target in memory
 	version := suite.mgr.current.getCollectionTarget(collectionID).GetTargetVersion()
 	suite.mgr.current.removeCollectionTarget(collectionID)
 
 	// try to recover
-	suite.mgr.Recover(suite.catalog)
+	suite.mgr.Recover()
 
 	target := suite.mgr.current.getCollectionTarget(collectionID)
 	suite.NotNil(target)
@@ -596,8 +599,9 @@
 	suite.Len(target.GetAllSegmentIDs(), 2)
 	suite.Equal(target.GetTargetVersion(), version)
 
-	// after recover, target info should be cleaned up
-	targets, err := suite.catalog.GetCollectionTargets()
+	// target should be removed from the meta store after the collection is released
+	suite.mgr.RemoveCollection(collectionID)
+	targets, err = suite.catalog.GetCollectionTargets()
 	suite.NoError(err)
 	suite.Len(targets, 0)
 }


@@ -88,10 +88,6 @@ func (_c *MockQueryNodeServer_Delete_Call) RunAndReturn(run func(context.Context
 func (_m *MockQueryNodeServer) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
 	ret := _m.Called(_a0, _a1)
 
-	if len(ret) == 0 {
-		panic("no return value specified for DeleteBatch")
-	}
-
 	var r0 *querypb.DeleteBatchResponse
 	var r1 error
 	if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok {


@@ -196,7 +196,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
 	suite.nodeMgr = session.NewNodeManager()
 	suite.meta = meta.NewMeta(suite.idAllocator, suite.store, suite.nodeMgr)
 	suite.broker = meta.NewMockBroker(suite.T())
-	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.cluster = session.NewMockCluster(suite.T())
 	suite.targetObserver = NewTargetObserver(suite.meta,
 		suite.targetMgr,


@@ -91,7 +91,7 @@ func (suite *TargetObserverSuite) SetupTest() {
 	suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
 
 	suite.broker = meta.NewMockBroker(suite.T())
-	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.distMgr = meta.NewDistributionManager()
 	suite.cluster = session.NewMockCluster(suite.T())
 	suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster, nodeMgr)

@@ -300,7 +300,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
 	suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
 
 	suite.broker = meta.NewMockBroker(suite.T())
-	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.distMgr = meta.NewDistributionManager()
 	suite.cluster = session.NewMockCluster(suite.T())
 	suite.observer = NewTargetObserver(


@@ -102,7 +102,7 @@ func (suite *OpsServiceSuite) SetupTest() {
 	suite.nodeMgr = session.NewNodeManager()
 	suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)
 	suite.broker = meta.NewMockBroker(suite.T())
-	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.targetObserver = observers.NewTargetObserver(
 		suite.meta,
 		suite.targetMgr,


@@ -410,8 +410,8 @@ func (s *Server) initMeta() error {
 		ChannelDistManager: meta.NewChannelDistManager(),
 		LeaderViewManager:  meta.NewLeaderViewManager(),
 	}
-	s.targetMgr = meta.NewTargetManager(s.broker, s.meta)
-	err = s.targetMgr.Recover(s.store)
+	s.targetMgr = meta.NewTargetManager(s.broker, s.meta, s.store)
+	err = s.targetMgr.Recover()
 	if err != nil {
 		log.Warn("failed to recover collection targets", zap.Error(err))
 	}
@@ -566,12 +566,6 @@ func (s *Server) Stop() error {
 		s.targetObserver.Stop()
 	}
 
-	// save target to meta store, after querycoord restart, make it fast to recover current target
-	// should save target after target observer stop, incase of target changed
-	if s.targetMgr != nil {
-		s.targetMgr.SaveCurrentTarget(s.store)
-	}
-
 	if s.replicaObserver != nil {
 		s.replicaObserver.Stop()
 	}
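
This shutdown path is where the reported slowness lived: Stop() walked every current target and flushed all of them to etcd (in batches of 16 through a worker pool, per the removed SaveCurrentTarget above), so its latency grew with the number of collections. With the write-through approach there is nothing left to flush. On the startup side, the wiring now looks roughly like this (hypothetical helper, not actual Milvus code; `meta` and `metastore` are the packages from the diff):

```go
// initTargets is an illustrative helper mirroring the initMeta() diff:
// the catalog is injected once at construction, and Recover() no longer
// takes it as an argument.
func initTargets(broker meta.Broker, m *meta.Meta, store metastore.QueryCoordCatalog) (*meta.TargetManager, error) {
	mgr := meta.NewTargetManager(broker, m, store)
	if err := mgr.Recover(); err != nil {
		return nil, err
	}
	return mgr, nil
}
```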


@@ -551,7 +551,7 @@ func (suite *ServerSuite) updateCollectionStatus(collectionID int64, status quer
 func (suite *ServerSuite) hackServer() {
 	suite.broker = meta.NewMockBroker(suite.T())
 	suite.server.broker = suite.broker
-	suite.server.targetMgr = meta.NewTargetManager(suite.broker, suite.server.meta)
+	suite.server.targetMgr = meta.NewTargetManager(suite.broker, suite.server.meta, suite.server.store)
 	suite.server.taskScheduler = task.NewScheduler(
 		suite.server.ctx,
 		suite.server.meta,


@@ -151,7 +151,9 @@ func (suite *ServiceSuite) SetupTest() {
 	suite.nodeMgr = session.NewNodeManager()
 	suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)
 	suite.broker = meta.NewMockBroker(suite.T())
-	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, suite.store)
+	suite.cluster = session.NewMockCluster(suite.T())
+	suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
 	suite.targetObserver = observers.NewTargetObserver(
 		suite.meta,
 		suite.targetMgr,


@@ -155,7 +155,7 @@ func (suite *TaskSuite) SetupTest() {
 	suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store, session.NewNodeManager())
 	suite.dist = meta.NewDistributionManager()
 	suite.broker = meta.NewMockBroker(suite.T())
-	suite.target = meta.NewTargetManager(suite.broker, suite.meta)
+	suite.target = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
 	suite.nodeMgr = session.NewNodeManager()
 	suite.cluster = session.NewMockCluster(suite.T())