mirror of https://github.com/milvus-io/milvus.git
This reverts commit ae4e2b8063
.
pull/38803/head
parent
05f50b11ff
commit
501d1b58cf
|
@ -68,7 +68,7 @@ func (suite *ChannelLevelScoreBalancerTestSuite) SetupTest() {
|
|||
idAllocator := RandomIncrementIDAllocator()
|
||||
nodeManager := session.NewNodeManager()
|
||||
testMeta := meta.NewMeta(idAllocator, store, nodeManager)
|
||||
testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv))
|
||||
testTarget := meta.NewTargetManager(suite.broker, testMeta)
|
||||
|
||||
distManager := meta.NewDistributionManager()
|
||||
suite.mockScheduler = task.NewMockScheduler(suite.T())
|
||||
|
|
|
@ -71,7 +71,7 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
|
|||
idAllocator := RandomIncrementIDAllocator()
|
||||
nodeManager := session.NewNodeManager()
|
||||
testMeta := meta.NewMeta(idAllocator, store, nodeManager)
|
||||
testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv))
|
||||
testTarget := meta.NewTargetManager(suite.broker, testMeta)
|
||||
|
||||
distManager := meta.NewDistributionManager()
|
||||
suite.mockScheduler = task.NewMockScheduler(suite.T())
|
||||
|
|
|
@ -68,7 +68,7 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() {
|
|||
idAllocator := RandomIncrementIDAllocator()
|
||||
nodeManager := session.NewNodeManager()
|
||||
testMeta := meta.NewMeta(idAllocator, store, nodeManager)
|
||||
testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv))
|
||||
testTarget := meta.NewTargetManager(suite.broker, testMeta)
|
||||
|
||||
distManager := meta.NewDistributionManager()
|
||||
suite.mockScheduler = task.NewMockScheduler(suite.T())
|
||||
|
|
|
@ -75,7 +75,7 @@ func (suite *BalanceCheckerTestSuite) SetupTest() {
|
|||
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.scheduler = task.NewMockScheduler(suite.T())
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
|
||||
suite.balancer = balance.NewMockBalancer(suite.T())
|
||||
suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer })
|
||||
|
|
|
@ -72,7 +72,7 @@ func (suite *ChannelCheckerTestSuite) SetupTest() {
|
|||
suite.nodeMgr = session.NewNodeManager()
|
||||
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
|
||||
|
||||
distManager := meta.NewDistributionManager()
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@ func (suite *ControllerBaseTestSuite) SetupTest() {
|
|||
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
|
||||
suite.dist = meta.NewDistributionManager()
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
|
||||
suite.balancer = balance.NewMockBalancer(suite.T())
|
||||
suite.scheduler = task.NewMockScheduler(suite.T())
|
||||
|
|
|
@ -77,7 +77,7 @@ func (suite *CheckerControllerSuite) SetupTest() {
|
|||
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
|
||||
suite.dist = meta.NewDistributionManager()
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
|
||||
suite.balancer = balance.NewMockBalancer(suite.T())
|
||||
suite.scheduler = task.NewMockScheduler(suite.T())
|
||||
|
|
|
@ -74,7 +74,7 @@ func (suite *LeaderCheckerTestSuite) SetupTest() {
|
|||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
|
||||
distManager := meta.NewDistributionManager()
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr)
|
||||
}
|
||||
|
||||
|
|
|
@ -74,7 +74,7 @@ func (suite *SegmentCheckerTestSuite) SetupTest() {
|
|||
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
|
||||
distManager := meta.NewDistributionManager()
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
|
||||
|
||||
balancer := suite.createMockBalancer()
|
||||
suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, func() balance.Balance { return balancer })
|
||||
|
|
|
@ -78,7 +78,7 @@ func (suite *DistControllerTestSuite) SetupTest() {
|
|||
suite.mockCluster = session.NewMockCluster(suite.T())
|
||||
distManager := meta.NewDistributionManager()
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.mockScheduler = task.NewMockScheduler(suite.T())
|
||||
suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe()
|
||||
syncTargetVersionFn := func(collectionID int64) {}
|
||||
|
|
|
@ -163,7 +163,7 @@ func (suite *JobSuite) SetupTest() {
|
|||
suite.dist = meta.NewDistributionManager()
|
||||
suite.nodeMgr = session.NewNodeManager()
|
||||
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.targetObserver = observers.NewTargetObserver(suite.meta,
|
||||
suite.targetMgr,
|
||||
suite.dist,
|
||||
|
|
|
@ -375,7 +375,7 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
|
|||
err = mgr.UpdatePartitionLoadPercent(partitionID, 10)
|
||||
suite.NoError(err)
|
||||
}
|
||||
_, err = mgr.UpdateCollectionLoadPercent(collectionID)
|
||||
_, err = mgr.UpdateCollectionLoadPercent(ctx, collectionID)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.clearMemory()
|
||||
|
|
|
@ -206,6 +206,10 @@ func (_c *MockBroker_GetCollectionLoadInfo_Call) RunAndReturn(run func(context.C
|
|||
func (_m *MockBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) {
|
||||
ret := _m.Called(ctx, collectionIDs)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for GetDataViewVersions")
|
||||
}
|
||||
|
||||
var r0 map[int64]int64
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, []int64) (map[int64]int64, error)); ok {
|
||||
|
@ -268,6 +272,10 @@ func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segm
|
|||
_ca = append(_ca, _va...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for GetIndexInfo")
|
||||
}
|
||||
|
||||
var r0 map[int64][]*querypb.FieldIndexInfo
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64, ...int64) (map[int64][]*querypb.FieldIndexInfo, error)); ok {
|
||||
|
|
|
@ -3,7 +3,9 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
metastore "github.com/milvus-io/milvus/internal/metastore"
|
||||
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
|
@ -604,13 +606,13 @@ func (_c *MockTargetManager_IsNextTargetExist_Call) RunAndReturn(run func(int64)
|
|||
return _c
|
||||
}
|
||||
|
||||
// Recover provides a mock function with given fields:
|
||||
func (_m *MockTargetManager) Recover() error {
|
||||
ret := _m.Called()
|
||||
// Recover provides a mock function with given fields: catalog
|
||||
func (_m *MockTargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
|
||||
ret := _m.Called(catalog)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func() error); ok {
|
||||
r0 = rf()
|
||||
if rf, ok := ret.Get(0).(func(metastore.QueryCoordCatalog) error); ok {
|
||||
r0 = rf(catalog)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
@ -624,13 +626,14 @@ type MockTargetManager_Recover_Call struct {
|
|||
}
|
||||
|
||||
// Recover is a helper method to define mock.On call
|
||||
func (_e *MockTargetManager_Expecter) Recover() *MockTargetManager_Recover_Call {
|
||||
return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover")}
|
||||
// - catalog metastore.QueryCoordCatalog
|
||||
func (_e *MockTargetManager_Expecter) Recover(catalog interface{}) *MockTargetManager_Recover_Call {
|
||||
return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover", catalog)}
|
||||
}
|
||||
|
||||
func (_c *MockTargetManager_Recover_Call) Run(run func()) *MockTargetManager_Recover_Call {
|
||||
func (_c *MockTargetManager_Recover_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_Recover_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run()
|
||||
run(args[0].(metastore.QueryCoordCatalog))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
@ -640,7 +643,7 @@ func (_c *MockTargetManager_Recover_Call) Return(_a0 error) *MockTargetManager_R
|
|||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func() error) *MockTargetManager_Recover_Call {
|
||||
func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func(metastore.QueryCoordCatalog) error) *MockTargetManager_Recover_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
@ -726,6 +729,39 @@ func (_c *MockTargetManager_RemovePartition_Call) RunAndReturn(run func(int64, .
|
|||
return _c
|
||||
}
|
||||
|
||||
// SaveCurrentTarget provides a mock function with given fields: catalog
|
||||
func (_m *MockTargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
|
||||
_m.Called(catalog)
|
||||
}
|
||||
|
||||
// MockTargetManager_SaveCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCurrentTarget'
|
||||
type MockTargetManager_SaveCurrentTarget_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// SaveCurrentTarget is a helper method to define mock.On call
|
||||
// - catalog metastore.QueryCoordCatalog
|
||||
func (_e *MockTargetManager_Expecter) SaveCurrentTarget(catalog interface{}) *MockTargetManager_SaveCurrentTarget_Call {
|
||||
return &MockTargetManager_SaveCurrentTarget_Call{Call: _e.mock.On("SaveCurrentTarget", catalog)}
|
||||
}
|
||||
|
||||
func (_c *MockTargetManager_SaveCurrentTarget_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(metastore.QueryCoordCatalog))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockTargetManager_SaveCurrentTarget_Call) Return() *MockTargetManager_SaveCurrentTarget_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockTargetManager_SaveCurrentTarget_Call) RunAndReturn(run func(metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// UpdateCollectionCurrentTarget provides a mock function with given fields: collectionID
|
||||
func (_m *MockTargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool {
|
||||
ret := _m.Called(collectionID)
|
||||
|
|
|
@ -19,6 +19,7 @@ package meta
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"github.com/samber/lo"
|
||||
|
@ -27,9 +28,11 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
|
@ -62,7 +65,8 @@ type TargetManagerInterface interface {
|
|||
GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64
|
||||
IsCurrentTargetExist(collectionID int64, partitionID int64) bool
|
||||
IsNextTargetExist(collectionID int64) bool
|
||||
Recover() error
|
||||
SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
|
||||
Recover(catalog metastore.QueryCoordCatalog) error
|
||||
CanSegmentBeMoved(collectionID, segmentID int64) bool
|
||||
}
|
||||
|
||||
|
@ -76,17 +80,14 @@ type TargetManager struct {
|
|||
// all remove segment/channel operation happens on Both current and next -> delete status should be consistent
|
||||
current *target
|
||||
next *target
|
||||
|
||||
catalog metastore.QueryCoordCatalog
|
||||
}
|
||||
|
||||
func NewTargetManager(broker Broker, meta *Meta, catalog metastore.QueryCoordCatalog) *TargetManager {
|
||||
func NewTargetManager(broker Broker, meta *Meta) *TargetManager {
|
||||
return &TargetManager{
|
||||
broker: broker,
|
||||
meta: meta,
|
||||
current: newTarget(),
|
||||
next: newTarget(),
|
||||
catalog: catalog,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,12 +129,6 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool
|
|||
zap.Int64("version", newTarget.GetTargetVersion()),
|
||||
zap.String("partStatsVersion", partStatsVersionInfo),
|
||||
)
|
||||
|
||||
// save collection current target for fast recovery after qc restart
|
||||
err := mgr.catalog.SaveCollectionTargets(newTarget.toPbMsg())
|
||||
if err != nil {
|
||||
log.Warn("failed to save collection targets", zap.Error(err))
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -244,7 +239,6 @@ func (mgr *TargetManager) RemoveCollection(collectionID int64) {
|
|||
|
||||
mgr.current.removeCollectionTarget(collectionID)
|
||||
mgr.next.removeCollectionTarget(collectionID)
|
||||
mgr.catalog.RemoveCollectionTarget(collectionID)
|
||||
}
|
||||
|
||||
// RemovePartition removes all segment in the given partition,
|
||||
|
@ -549,11 +543,51 @@ func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool {
|
|||
return len(newChannels) > 0
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) Recover() error {
|
||||
func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
|
||||
mgr.rwMutex.Lock()
|
||||
defer mgr.rwMutex.Unlock()
|
||||
if mgr.current != nil {
|
||||
// use pool here to control maximal writer used by save target
|
||||
pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2)
|
||||
defer pool.Release()
|
||||
// use batch write in case of the number of collections is large
|
||||
batchSize := 16
|
||||
var wg sync.WaitGroup
|
||||
submit := func(tasks []typeutil.Pair[int64, *querypb.CollectionTarget]) {
|
||||
wg.Add(1)
|
||||
pool.Submit(func() (any, error) {
|
||||
defer wg.Done()
|
||||
ids := lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) int64 { return p.A })
|
||||
if err := catalog.SaveCollectionTargets(lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) *querypb.CollectionTarget {
|
||||
return p.B
|
||||
})...); err != nil {
|
||||
log.Warn("failed to save current target for collection", zap.Int64s("collectionIDs", ids), zap.Error(err))
|
||||
} else {
|
||||
log.Info("succeed to save current target for collection", zap.Int64s("collectionIDs", ids))
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
}
|
||||
tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
|
||||
for id, target := range mgr.current.collectionTargetMap {
|
||||
tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg()))
|
||||
if len(tasks) >= batchSize {
|
||||
submit(tasks)
|
||||
tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
|
||||
}
|
||||
}
|
||||
if len(tasks) > 0 {
|
||||
submit(tasks)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
|
||||
mgr.rwMutex.Lock()
|
||||
defer mgr.rwMutex.Unlock()
|
||||
|
||||
targets, err := mgr.catalog.GetCollectionTargets()
|
||||
targets, err := catalog.GetCollectionTargets()
|
||||
if err != nil {
|
||||
log.Warn("failed to recover collection target from etcd", zap.Error(err))
|
||||
return err
|
||||
|
@ -568,7 +602,14 @@ func (mgr *TargetManager) Recover() error {
|
|||
zap.Int("segmentNum", len(newTarget.GetAllSegmentIDs())),
|
||||
zap.Int64("version", newTarget.GetTargetVersion()),
|
||||
)
|
||||
|
||||
// clear target info in meta store
|
||||
err := catalog.RemoveCollectionTarget(t.GetCollectionID())
|
||||
if err != nil {
|
||||
log.Warn("failed to clear collection target from etcd", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -113,7 +113,7 @@ func (suite *TargetManagerSuite) SetupTest() {
|
|||
idAllocator := RandomIncrementIDAllocator()
|
||||
suite.meta = NewMeta(idAllocator, suite.catalog, session.NewNodeManager())
|
||||
suite.broker = NewMockBroker(suite.T())
|
||||
suite.mgr = NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.mgr = NewTargetManager(suite.broker, suite.meta)
|
||||
|
||||
for _, collection := range suite.collections {
|
||||
dmChannels := make([]*datapb.VchannelInfo, 0)
|
||||
|
@ -582,16 +582,13 @@ func (suite *TargetManagerSuite) TestRecover() {
|
|||
suite.mgr.UpdateCollectionNextTarget(collectionID)
|
||||
suite.mgr.UpdateCollectionCurrentTarget(collectionID)
|
||||
|
||||
// target should be save to meta store after update current target
|
||||
targets, err := suite.catalog.GetCollectionTargets()
|
||||
suite.NoError(err)
|
||||
suite.Len(targets, 1)
|
||||
suite.mgr.SaveCurrentTarget(suite.catalog)
|
||||
|
||||
// clear target in memory
|
||||
version := suite.mgr.current.getCollectionTarget(collectionID).GetTargetVersion()
|
||||
suite.mgr.current.removeCollectionTarget(collectionID)
|
||||
// try to recover
|
||||
suite.mgr.Recover()
|
||||
suite.mgr.Recover(suite.catalog)
|
||||
|
||||
target := suite.mgr.current.getCollectionTarget(collectionID)
|
||||
suite.NotNil(target)
|
||||
|
@ -599,9 +596,8 @@ func (suite *TargetManagerSuite) TestRecover() {
|
|||
suite.Len(target.GetAllSegmentIDs(), 2)
|
||||
suite.Equal(target.GetTargetVersion(), version)
|
||||
|
||||
// target should be removed from meta store after collection released
|
||||
suite.mgr.RemoveCollection(collectionID)
|
||||
targets, err = suite.catalog.GetCollectionTargets()
|
||||
// after recover, target info should be cleaned up
|
||||
targets, err := suite.catalog.GetCollectionTargets()
|
||||
suite.NoError(err)
|
||||
suite.Len(targets, 0)
|
||||
}
|
||||
|
|
|
@ -88,6 +88,10 @@ func (_c *MockQueryNodeServer_Delete_Call) RunAndReturn(run func(context.Context
|
|||
func (_m *MockQueryNodeServer) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for DeleteBatch")
|
||||
}
|
||||
|
||||
var r0 *querypb.DeleteBatchResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok {
|
||||
|
|
|
@ -196,7 +196,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
|
|||
suite.nodeMgr = session.NewNodeManager()
|
||||
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, suite.nodeMgr)
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.cluster = session.NewMockCluster(suite.T())
|
||||
suite.targetObserver = NewTargetObserver(suite.meta,
|
||||
suite.targetMgr,
|
||||
|
|
|
@ -91,7 +91,7 @@ func (suite *TargetObserverSuite) SetupTest() {
|
|||
suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
|
||||
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.distMgr = meta.NewDistributionManager()
|
||||
suite.cluster = session.NewMockCluster(suite.T())
|
||||
suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster, nodeMgr)
|
||||
|
@ -300,7 +300,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
|
|||
suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
|
||||
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.distMgr = meta.NewDistributionManager()
|
||||
suite.cluster = session.NewMockCluster(suite.T())
|
||||
suite.observer = NewTargetObserver(
|
||||
|
|
|
@ -102,7 +102,7 @@ func (suite *OpsServiceSuite) SetupTest() {
|
|||
suite.nodeMgr = session.NewNodeManager()
|
||||
suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.targetObserver = observers.NewTargetObserver(
|
||||
suite.meta,
|
||||
suite.targetMgr,
|
||||
|
|
|
@ -410,8 +410,8 @@ func (s *Server) initMeta() error {
|
|||
ChannelDistManager: meta.NewChannelDistManager(),
|
||||
LeaderViewManager: meta.NewLeaderViewManager(),
|
||||
}
|
||||
s.targetMgr = meta.NewTargetManager(s.broker, s.meta, s.store)
|
||||
err = s.targetMgr.Recover()
|
||||
s.targetMgr = meta.NewTargetManager(s.broker, s.meta)
|
||||
err = s.targetMgr.Recover(s.store)
|
||||
if err != nil {
|
||||
log.Warn("failed to recover collection targets", zap.Error(err))
|
||||
}
|
||||
|
@ -566,6 +566,12 @@ func (s *Server) Stop() error {
|
|||
s.targetObserver.Stop()
|
||||
}
|
||||
|
||||
// save target to meta store, after querycoord restart, make it fast to recover current target
|
||||
// should save target after target observer stop, incase of target changed
|
||||
if s.targetMgr != nil {
|
||||
s.targetMgr.SaveCurrentTarget(s.store)
|
||||
}
|
||||
|
||||
if s.replicaObserver != nil {
|
||||
s.replicaObserver.Stop()
|
||||
}
|
||||
|
|
|
@ -551,7 +551,7 @@ func (suite *ServerSuite) updateCollectionStatus(collectionID int64, status quer
|
|||
func (suite *ServerSuite) hackServer() {
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.server.broker = suite.broker
|
||||
suite.server.targetMgr = meta.NewTargetManager(suite.broker, suite.server.meta, suite.server.store)
|
||||
suite.server.targetMgr = meta.NewTargetManager(suite.broker, suite.server.meta)
|
||||
suite.server.taskScheduler = task.NewScheduler(
|
||||
suite.server.ctx,
|
||||
suite.server.meta,
|
||||
|
|
|
@ -151,9 +151,7 @@ func (suite *ServiceSuite) SetupTest() {
|
|||
suite.nodeMgr = session.NewNodeManager()
|
||||
suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, suite.store)
|
||||
suite.cluster = session.NewMockCluster(suite.T())
|
||||
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
|
||||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.targetObserver = observers.NewTargetObserver(
|
||||
suite.meta,
|
||||
suite.targetMgr,
|
||||
|
|
|
@ -155,7 +155,7 @@ func (suite *TaskSuite) SetupTest() {
|
|||
suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store, session.NewNodeManager())
|
||||
suite.dist = meta.NewDistributionManager()
|
||||
suite.broker = meta.NewMockBroker(suite.T())
|
||||
suite.target = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv))
|
||||
suite.target = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.nodeMgr = session.NewNodeManager()
|
||||
suite.cluster = session.NewMockCluster(suite.T())
|
||||
|
||||
|
|
Loading…
Reference in New Issue