mirror of https://github.com/milvus-io/milvus.git
remove pull target from qc recover (#26775)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/26497/head
parent
e56b0018e4
commit
949c320185
|
@ -59,13 +59,22 @@ func (c *ChannelChecker) Description() string {
|
|||
return "DmChannelChecker checks the lack of DmChannels, or some DmChannels are redundant"
|
||||
}
|
||||
|
||||
func (c *ChannelChecker) readyToCheck(collectionID int64) bool {
|
||||
metaExist := (c.meta.GetCollection(collectionID) != nil)
|
||||
targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID)
|
||||
|
||||
return metaExist && targetExist
|
||||
}
|
||||
|
||||
func (c *ChannelChecker) Check(ctx context.Context) []task.Task {
|
||||
collectionIDs := c.meta.CollectionManager.GetAll()
|
||||
tasks := make([]task.Task, 0)
|
||||
for _, cid := range collectionIDs {
|
||||
replicas := c.meta.ReplicaManager.GetByCollection(cid)
|
||||
for _, r := range replicas {
|
||||
tasks = append(tasks, c.checkReplica(ctx, r)...)
|
||||
if c.readyToCheck(cid) {
|
||||
replicas := c.meta.ReplicaManager.GetByCollection(cid)
|
||||
for _, r := range replicas {
|
||||
tasks = append(tasks, c.checkReplica(ctx, r)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -135,9 +135,22 @@ func (suite *ChannelCheckerTestSuite) TestLoadChannel() {
|
|||
func (suite *ChannelCheckerTestSuite) TestReduceChannel() {
|
||||
checker := suite.checker
|
||||
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
|
||||
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1}))
|
||||
|
||||
checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel"))
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel1",
|
||||
},
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, nil, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
|
||||
|
||||
checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel1"))
|
||||
checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2"))
|
||||
tasks := checker.Check(context.TODO())
|
||||
suite.Len(tasks, 1)
|
||||
suite.EqualValues(1, tasks[0].ReplicaID())
|
||||
|
@ -146,7 +159,7 @@ func (suite *ChannelCheckerTestSuite) TestReduceChannel() {
|
|||
action := tasks[0].Actions()[0].(*task.ChannelAction)
|
||||
suite.Equal(task.ActionTypeReduce, action.Type())
|
||||
suite.EqualValues(1, action.Node())
|
||||
suite.EqualValues("test-insert-channel", action.ChannelName())
|
||||
suite.EqualValues("test-insert-channel2", action.ChannelName())
|
||||
}
|
||||
|
||||
func (suite *ChannelCheckerTestSuite) TestRepeatedChannels() {
|
||||
|
|
|
@ -84,7 +84,6 @@ func (suite *CheckerControllerSuite) SetupTest() {
|
|||
}
|
||||
|
||||
func (suite *CheckerControllerSuite) TestBasic() {
|
||||
|
||||
// set meta
|
||||
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
|
||||
|
@ -95,20 +94,27 @@ func (suite *CheckerControllerSuite) TestBasic() {
|
|||
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
|
||||
|
||||
// set target
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel2",
|
||||
},
|
||||
}
|
||||
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 1,
|
||||
ID: 3,
|
||||
PartitionID: 1,
|
||||
InsertChannel: "test-insert-channel",
|
||||
InsertChannel: "test-insert-channel2",
|
||||
},
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
nil, segments, nil)
|
||||
channels, segments, nil)
|
||||
suite.targetManager.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
// set dist
|
||||
suite.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
|
||||
suite.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
|
||||
suite.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 2}, map[int64]*meta.Segment{}))
|
||||
|
||||
counter := atomic.NewInt64(0)
|
||||
suite.scheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) {
|
||||
|
|
|
@ -64,13 +64,22 @@ func (c *SegmentChecker) Description() string {
|
|||
return "SegmentChecker checks the lack of segments, or some segments are redundant"
|
||||
}
|
||||
|
||||
func (c *SegmentChecker) readyToCheck(collectionID int64) bool {
|
||||
metaExist := (c.meta.GetCollection(collectionID) != nil)
|
||||
targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID)
|
||||
|
||||
return metaExist && targetExist
|
||||
}
|
||||
|
||||
func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
|
||||
collectionIDs := c.meta.CollectionManager.GetAll()
|
||||
tasks := make([]task.Task, 0)
|
||||
for _, cid := range collectionIDs {
|
||||
replicas := c.meta.ReplicaManager.GetByCollection(cid)
|
||||
for _, r := range replicas {
|
||||
tasks = append(tasks, c.checkReplica(ctx, r)...)
|
||||
if c.readyToCheck(cid) {
|
||||
replicas := c.meta.ReplicaManager.GetByCollection(cid)
|
||||
for _, r := range replicas {
|
||||
tasks = append(tasks, c.checkReplica(ctx, r)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -121,8 +121,16 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
|
|||
InsertChannel: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
nil, segments, nil)
|
||||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
// set dist
|
||||
|
@ -159,8 +167,15 @@ func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {
|
|||
InsertChannel: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
nil, segments, nil)
|
||||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
// set dist
|
||||
|
@ -177,8 +192,20 @@ func (suite *SegmentCheckerTestSuite) TestReleaseSegments() {
|
|||
checker := suite.checker
|
||||
// set meta
|
||||
checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
|
||||
checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
|
||||
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
|
||||
|
||||
// set target
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, nil, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
// set dist
|
||||
checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
|
||||
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
|
||||
|
@ -210,8 +237,14 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
|
|||
InsertChannel: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
nil, segments, nil)
|
||||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
|
||||
|
||||
// set dist
|
||||
|
@ -249,9 +282,16 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() {
|
|||
checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, collectionID, []int64{1, 2}))
|
||||
|
||||
// set target
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
SeekPosition: &msgpb.MsgPosition{Timestamp: 10},
|
||||
},
|
||||
}
|
||||
segments := []*datapb.SegmentInfo{}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
nil, segments, nil)
|
||||
channels, segments, nil)
|
||||
checker.targetMgr.UpdateCollectionNextTarget(collectionID)
|
||||
checker.targetMgr.UpdateCollectionCurrentTarget(collectionID)
|
||||
readableVersion := checker.targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget)
|
||||
|
|
|
@ -146,6 +146,13 @@ func (ob *CollectionObserver) observeTimeout() {
|
|||
}
|
||||
}
|
||||
|
||||
func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {
|
||||
metaExist := (ob.meta.GetCollection(collectionID) != nil)
|
||||
targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID)
|
||||
|
||||
return metaExist && targetExist
|
||||
}
|
||||
|
||||
func (ob *CollectionObserver) observeLoadStatus() {
|
||||
partitions := ob.meta.CollectionManager.GetAllPartitions()
|
||||
if len(partitions) > 0 {
|
||||
|
@ -156,9 +163,11 @@ func (ob *CollectionObserver) observeLoadStatus() {
|
|||
if partition.LoadPercentage == 100 {
|
||||
continue
|
||||
}
|
||||
replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
|
||||
ob.observePartitionLoadStatus(partition, replicaNum)
|
||||
loading = true
|
||||
if ob.readyToObserve(partition.CollectionID) {
|
||||
replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
|
||||
ob.observePartitionLoadStatus(partition, replicaNum)
|
||||
loading = true
|
||||
}
|
||||
}
|
||||
// trigger check logic when loading collections/partitions
|
||||
if loading {
|
||||
|
|
|
@ -90,10 +90,19 @@ func (o *LeaderObserver) observe(ctx context.Context) {
|
|||
o.observeSegmentsDist(ctx)
|
||||
}
|
||||
|
||||
func (o *LeaderObserver) readyToObserve(collectionID int64) bool {
|
||||
metaExist := (o.meta.GetCollection(collectionID) != nil)
|
||||
targetExist := o.target.IsNextTargetExist(collectionID) || o.target.IsCurrentTargetExist(collectionID)
|
||||
|
||||
return metaExist && targetExist
|
||||
}
|
||||
|
||||
func (o *LeaderObserver) observeSegmentsDist(ctx context.Context) {
|
||||
collectionIDs := o.meta.CollectionManager.GetAll()
|
||||
for _, cid := range collectionIDs {
|
||||
o.observeCollection(ctx, cid)
|
||||
if o.readyToObserve(cid) {
|
||||
o.observeCollection(ctx, cid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -212,6 +212,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
|
|||
channels, segments, nil)
|
||||
observer.target.UpdateCollectionNextTarget(int64(1))
|
||||
observer.target.UpdateCollectionCurrentTarget(1)
|
||||
observer.target.UpdateCollectionNextTarget(int64(1))
|
||||
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"),
|
||||
utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel"))
|
||||
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
|
||||
|
@ -407,12 +408,26 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
|
|||
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
|
||||
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
|
||||
|
||||
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
|
||||
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{}))
|
||||
|
||||
schema := utils.CreateTestSchema()
|
||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
|
||||
|
||||
channels := []*datapb.VchannelInfo{
|
||||
{
|
||||
CollectionID: 1,
|
||||
ChannelName: "test-insert-channel",
|
||||
},
|
||||
}
|
||||
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, nil, nil)
|
||||
observer.target.UpdateCollectionNextTarget(int64(1))
|
||||
observer.target.UpdateCollectionCurrentTarget(1)
|
||||
|
||||
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
|
||||
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{})
|
||||
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
|
||||
observer.dist.LeaderViewManager.Update(2, view)
|
||||
|
||||
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
|
||||
return &querypb.SyncDistributionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
|
|
|
@ -140,6 +140,14 @@ func (ob *TargetObserver) Check(collectionID int64) bool {
|
|||
}
|
||||
|
||||
func (ob *TargetObserver) check(collectionID int64) {
|
||||
if !ob.meta.Exist(collectionID) {
|
||||
ob.ReleaseCollection(collectionID)
|
||||
ob.targetMgr.RemoveCollection(collectionID)
|
||||
log.Info("collection has been removed from target observer",
|
||||
zap.Int64("collectionID", collectionID))
|
||||
return
|
||||
}
|
||||
|
||||
if ob.shouldUpdateCurrentTarget(collectionID) {
|
||||
ob.updateCurrentTarget(collectionID)
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
|
@ -407,11 +406,9 @@ func (s *Server) startQueryCoord() error {
|
|||
go s.handleNodeUpLoop()
|
||||
go s.watchNodes(revision)
|
||||
|
||||
log.Info("start recovering dist and target")
|
||||
err = s.recover()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Recover dist, to avoid generate too much task when dist not ready after restart
|
||||
s.distController.SyncAll(s.ctx)
|
||||
|
||||
s.startServerLoop()
|
||||
s.afterStart()
|
||||
s.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
|
@ -573,39 +570,6 @@ func (s *Server) SetQueryNodeCreator(f func(ctx context.Context, addr string, no
|
|||
s.queryNodeCreator = f
|
||||
}
|
||||
|
||||
func (s *Server) recover() error {
|
||||
// Recover target managers
|
||||
group, ctx := errgroup.WithContext(s.ctx)
|
||||
for _, collection := range s.meta.GetAll() {
|
||||
collection := collection
|
||||
group.Go(func() error {
|
||||
return s.recoverCollectionTargets(ctx, collection)
|
||||
})
|
||||
}
|
||||
err := group.Wait()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Recover dist
|
||||
s.distController.SyncAll(s.ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) recoverCollectionTargets(ctx context.Context, collection int64) error {
|
||||
err := s.targetMgr.UpdateCollectionNextTarget(collection)
|
||||
if err != nil {
|
||||
s.meta.CollectionManager.RemoveCollection(collection)
|
||||
s.meta.ReplicaManager.RemoveCollection(collection)
|
||||
log.Error("failed to recover collection due to update next target failed",
|
||||
zap.Int64("collectionID", collection),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) watchNodes(revision int64) {
|
||||
defer s.wg.Done()
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
|
@ -139,7 +138,7 @@ func (suite *ServerSuite) SetupTest() {
|
|||
|
||||
suite.loadAll()
|
||||
for _, collection := range suite.collections {
|
||||
suite.assertLoaded(collection)
|
||||
suite.True(suite.server.meta.Exist(collection))
|
||||
suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded)
|
||||
}
|
||||
}
|
||||
|
@ -165,27 +164,7 @@ func (suite *ServerSuite) TestRecover() {
|
|||
suite.NoError(err)
|
||||
|
||||
for _, collection := range suite.collections {
|
||||
suite.assertLoaded(collection)
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *ServerSuite) TestRecoverFailed() {
|
||||
err := suite.server.Stop()
|
||||
suite.NoError(err)
|
||||
|
||||
suite.server, err = suite.newQueryCoord()
|
||||
suite.NoError(err)
|
||||
|
||||
broker := meta.NewMockBroker(suite.T())
|
||||
for _, collection := range suite.collections {
|
||||
broker.EXPECT().GetRecoveryInfoV2(context.TODO(), collection).Return(nil, nil, errors.New("CollectionNotExist"))
|
||||
}
|
||||
suite.server.targetMgr = meta.NewTargetManager(broker, suite.server.meta)
|
||||
err = suite.server.Start()
|
||||
suite.NoError(err)
|
||||
|
||||
for _, collection := range suite.collections {
|
||||
suite.Nil(suite.server.targetMgr.GetDmChannelsByCollection(collection, meta.NextTarget))
|
||||
suite.True(suite.server.meta.Exist(collection))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -396,18 +375,6 @@ func (suite *ServerSuite) loadAll() {
|
|||
}
|
||||
}
|
||||
|
||||
func (suite *ServerSuite) assertLoaded(collection int64) {
|
||||
suite.True(suite.server.meta.Exist(collection))
|
||||
for _, channel := range suite.channels[collection] {
|
||||
suite.NotNil(suite.server.targetMgr.GetDmChannel(collection, channel, meta.NextTarget))
|
||||
}
|
||||
for _, partitions := range suite.segments[collection] {
|
||||
for _, segment := range partitions {
|
||||
suite.NotNil(suite.server.targetMgr.GetHistoricalSegment(collection, segment, meta.NextTarget))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *ServerSuite) expectGetRecoverInfo(collection int64) {
|
||||
vChannels := []*datapb.VchannelInfo{}
|
||||
for _, channel := range suite.channels[collection] {
|
||||
|
|
|
@ -1726,7 +1726,7 @@ func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) {
|
|||
}
|
||||
suite.broker.EXPECT().
|
||||
GetRecoveryInfoV2(mock.Anything, collection, mock.Anything, mock.Anything).
|
||||
Return(vChannels, segmentBinlogs, nil)
|
||||
Return(vChannels, segmentBinlogs, nil).Maybe()
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) expectLoadPartitions() {
|
||||
|
|
Loading…
Reference in New Issue