Fix data missing caused by updating the target before updating the target version (#28257)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
yah01 2023-11-08 18:54:18 +08:00 committed by GitHub
parent 12a09231f1
commit 385507ce47
10 changed files with 162 additions and 131 deletions
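
Summary of the change (a condensed reading of the diff, not text from the commit message): previously TargetObserver promoted the next target to the current target on its own, while the matching readable target version was pushed to the shard leaders later and independently by LeaderObserver. In the window between those two steps, delegators could serve the new current target with a stale readable version and miss data. This commit moves the version sync into TargetObserver, so the promotion only happens after every shard leader has acknowledged the new version via SyncDistribution. An illustrative sketch of the two orderings (not actual function bodies):

// Before this commit: the current target is promoted first, and the matching
// readable version reaches the shard leaders only on a later LeaderObserver
// tick, leaving a window in which queries can miss data.
ob.updateCurrentTarget(collectionID)

// After this commit: the new version is synced to every shard leader first,
// and only then is the next target promoted to current.
if ob.shouldUpdateCurrentTarget(ctx, collectionID) { // includes SyncDistribution to each leader
	ob.updateCurrentTarget(collectionID)
}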

View File

@ -162,6 +162,7 @@ func (suite *JobSuite) SetupTest() {
suite.targetMgr,
suite.dist,
suite.broker,
suite.cluster,
)
suite.targetObserver.Start()
suite.scheduler = NewScheduler()

View File

@ -231,10 +231,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
log.Warn("failed to manual check current target, skip update load status")
return
}
if !ob.leaderObserver.CheckTargetVersion(ctx, partition.GetCollectionID()) {
log.Warn("failed to manual check leader target version ,skip update load status")
return
}
delete(ob.partitionLoadedCount, partition.GetPartitionID())
}
collectionPercentage, err := ob.meta.CollectionManager.UpdateLoadPercent(partition.PartitionID, loadPercentage)
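
Why this pre-check can be removed: it existed to stop a partition from being reported as loaded while shard leaders still carried an older readable version than the already-promoted current target. With this commit that situation can no longer arise, because TargetObserver refuses to promote the next target until the version sync has succeeded on every leader. The new invariant relied on here, in condensed form (taken from the target_observer.go hunk later in this commit):

// shouldUpdateCurrentTarget bails out unless SyncDistribution succeeded on
// every shard leader, so a promoted current target implies the leaders
// already serve the matching target version.
if !ob.sync(ctx, replica.GetID(), leaderView, actions) {
	return false // promotion deferred to the next check tick
}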

View File

@ -59,6 +59,7 @@ type CollectionObserverSuite struct {
kv kv.MetaKv
store metastore.QueryCoordCatalog
broker *meta.MockBroker
cluster *session.MockCluster
// Dependencies
dist *meta.DistributionManager
@ -189,10 +190,12 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.cluster = session.NewMockCluster(suite.T())
suite.targetObserver = NewTargetObserver(suite.meta,
suite.targetMgr,
suite.dist,
suite.broker,
suite.cluster,
)
suite.checkerController = &checkers.CheckerController{}

View File

@ -21,7 +21,6 @@ import (
"sync"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -132,77 +131,11 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
actions := o.findNeedLoadedSegments(leaderView, dists)
actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...)
updateVersionAction := o.checkNeedUpdateTargetVersion(ctx, leaderView)
if updateVersionAction != nil {
actions = append(actions, updateVersionAction)
}
o.sync(ctx, replica.GetID(), leaderView, actions)
}
}
}
func (o *LeaderObserver) CheckTargetVersion(ctx context.Context, collectionID int64) bool {
// if not ready to observer, skip add task
if !o.readyToObserve(collectionID) {
return false
}
result := o.checkCollectionLeaderVersionIsCurrent(ctx, collectionID)
if !result {
o.dispatcher.AddTask(collectionID)
}
return result
}
func (o *LeaderObserver) checkCollectionLeaderVersionIsCurrent(ctx context.Context, collectionID int64) bool {
replicas := o.meta.ReplicaManager.GetByCollection(collectionID)
for _, replica := range replicas {
leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
return false
}
action := o.checkNeedUpdateTargetVersion(ctx, leaderView)
if action != nil {
return false
}
}
}
return true
}
func (o *LeaderObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction {
log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60)
targetVersion := o.target.GetCollectionTargetVersion(leaderView.CollectionID, meta.CurrentTarget)
if targetVersion <= leaderView.TargetVersion {
return nil
}
log.RatedInfo(10, "Update readable segment version",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channelName", leaderView.Channel),
zap.Int64("nodeID", leaderView.ID),
zap.Int64("oldVersion", leaderView.TargetVersion),
zap.Int64("newVersion", targetVersion),
)
sealedSegments := o.target.GetSealedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
growingSegments := o.target.GetGrowingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
droppedSegments := o.target.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget)
return &querypb.SyncAction{
Type: querypb.SyncType_UpdateVersion,
GrowingInTarget: growingSegments.Collect(),
SealedInTarget: lo.Keys(sealedSegments),
DroppedInTarget: droppedSegments,
TargetVersion: targetVersion,
}
}
func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dists []*meta.Segment) []*querypb.SyncAction {
ret := make([]*querypb.SyncAction, 0)
dists = utils.FindMaxVersionSegments(dists)
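
With the version-handling functions removed, LeaderObserver no longer touches target versions at all; observeCollection reduces to computing and syncing segment load/remove actions. Reconstructed from the surviving context lines above (approximate, the full file may differ slightly):

actions := o.findNeedLoadedSegments(leaderView, dists)
actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...)
o.sync(ctx, replica.GetID(), leaderView, actions)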

View File

@ -540,58 +540,6 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
)
}
func (suite *LeaderObserverTestSuite) TestSyncTargetVersion() {
collectionID := int64(1001)
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(collectionID, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(collectionID, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, collectionID, []int64{1, 2}))
nextTargetChannels := []*datapb.VchannelInfo{
{
CollectionID: collectionID,
ChannelName: "channel-1",
UnflushedSegmentIds: []int64{22, 33},
},
{
CollectionID: collectionID,
ChannelName: "channel-2",
UnflushedSegmentIds: []int64{44},
},
}
nextTargetSegments := []*datapb.SegmentInfo{
{
ID: 11,
PartitionID: 1,
InsertChannel: "channel-1",
},
{
ID: 12,
PartitionID: 1,
InsertChannel: "channel-2",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil)
suite.observer.target.UpdateCollectionNextTarget(collectionID)
suite.observer.target.UpdateCollectionCurrentTarget(collectionID)
TargetVersion := suite.observer.target.GetCollectionTargetVersion(collectionID, meta.CurrentTarget)
view := utils.CreateTestLeaderView(1, collectionID, "channel-1", nil, nil)
view.TargetVersion = TargetVersion
action := observer.checkNeedUpdateTargetVersion(context.Background(), view)
suite.Nil(action)
view.TargetVersion = TargetVersion - 1
action = observer.checkNeedUpdateTargetVersion(context.Background(), view)
suite.NotNil(action)
suite.Equal(querypb.SyncType_UpdateVersion, action.Type)
suite.Len(action.GrowingInTarget, 2)
suite.Len(action.SealedInTarget, 1)
}
func TestLeaderObserverSuite(t *testing.T) {
suite.Run(t, new(LeaderObserverTestSuite))
}
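
TestSyncTargetVersion is deleted together with the logic it covered; checkNeedUpdateTargetVersion now lives on TargetObserver and reads from meta.NextTarget (next file). Equivalent coverage against the relocated method could look roughly like the sketch below; the collection/target setup from the removed test is assumed to be in place on a TargetObserver suite, so treat the wiring as hypothetical:

targetVersion := suite.targetMgr.GetCollectionTargetVersion(collectionID, meta.NextTarget)
view := utils.CreateTestLeaderView(1, collectionID, "channel-1", nil, nil)

view.TargetVersion = targetVersion
suite.Nil(suite.observer.checkNeedUpdateTargetVersion(ctx, view)) // up to date: no action

view.TargetVersion = targetVersion - 1
action := suite.observer.checkNeedUpdateTargetVersion(ctx, view)
suite.NotNil(action)
suite.Equal(querypb.SyncType_UpdateVersion, action.Type)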

View File

@ -21,12 +21,18 @@ import (
"sync"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -50,6 +56,7 @@ type TargetObserver struct {
targetMgr *meta.TargetManager
distMgr *meta.DistributionManager
broker meta.Broker
cluster session.Cluster
initChan chan initRequest
manualCheck chan checkRequest
@ -64,12 +71,19 @@ type TargetObserver struct {
stopOnce sync.Once
}
func NewTargetObserver(meta *meta.Meta, targetMgr *meta.TargetManager, distMgr *meta.DistributionManager, broker meta.Broker) *TargetObserver {
func NewTargetObserver(
meta *meta.Meta,
targetMgr *meta.TargetManager,
distMgr *meta.DistributionManager,
broker meta.Broker,
cluster session.Cluster,
) *TargetObserver {
result := &TargetObserver{
meta: meta,
targetMgr: targetMgr,
distMgr: distMgr,
broker: broker,
cluster: cluster,
manualCheck: make(chan checkRequest, 10),
nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](),
updateChan: make(chan targetUpdateRequest),
@ -122,7 +136,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
case <-ob.initChan:
for _, collectionID := range ob.meta.GetAll() {
ob.init(collectionID)
ob.init(ctx, collectionID)
}
log.Info("target observer init done")
@ -164,7 +178,7 @@ func (ob *TargetObserver) check(ctx context.Context, collectionID int64) {
return
}
if ob.shouldUpdateCurrentTarget(collectionID) {
if ob.shouldUpdateCurrentTarget(ctx, collectionID) {
ob.updateCurrentTarget(collectionID)
}
@ -174,14 +188,14 @@ func (ob *TargetObserver) check(ctx context.Context, collectionID int64) {
}
}
func (ob *TargetObserver) init(collectionID int64) {
func (ob *TargetObserver) init(ctx context.Context, collectionID int64) {
// pull next target first if not exist
if !ob.targetMgr.IsNextTargetExist(collectionID) {
ob.updateNextTarget(collectionID)
}
// try to update current target if all segment/channel are ready
if ob.shouldUpdateCurrentTarget(collectionID) {
if ob.shouldUpdateCurrentTarget(ctx, collectionID) {
ob.updateCurrentTarget(collectionID)
}
}
@ -263,7 +277,7 @@ func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) {
ob.nextTargetLastUpdate.Insert(collectionID, time.Now())
}
func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {
func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collectionID int64) bool {
replicaNum := ob.meta.CollectionManager.GetReplicaNumber(collectionID)
// check channel first
@ -293,9 +307,131 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {
}
}
replicas := ob.meta.ReplicaManager.GetByCollection(collectionID)
actions := make([]*querypb.SyncAction, 0, 1)
for _, replica := range replicas {
leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
actions = actions[:0]
leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
continue
}
updateVersionAction := ob.checkNeedUpdateTargetVersion(ctx, leaderView)
if updateVersionAction != nil {
actions = append(actions, updateVersionAction)
}
if !ob.sync(ctx, replica.GetID(), leaderView, actions) {
return false
}
}
}
return true
}
func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool {
if len(diffs) == 0 {
return true
}
log := log.With(
zap.Int64("leaderID", leaderView.ID),
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
)
schema, err := ob.broker.GetCollectionSchema(ctx, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get collection info", zap.Error(err))
return false
}
partitions, err := utils.GetPartitions(ob.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return false
}
req := &querypb.SyncDistributionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),
),
CollectionID: leaderView.CollectionID,
ReplicaID: replicaID,
Channel: leaderView.Channel,
Actions: diffs,
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
LoadType: ob.meta.GetLoadType(leaderView.CollectionID),
CollectionID: leaderView.CollectionID,
PartitionIDs: partitions,
},
Version: time.Now().UnixNano(),
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()
resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req)
if err != nil {
log.Warn("failed to sync distribution", zap.Error(err))
return false
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("failed to sync distribution", zap.String("reason", resp.GetReason()))
return false
}
return true
}
func (ob *TargetObserver) checkCollectionLeaderVersionIsCurrent(ctx context.Context, collectionID int64) bool {
replicas := ob.meta.ReplicaManager.GetByCollection(collectionID)
for _, replica := range replicas {
leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
return false
}
action := ob.checkNeedUpdateTargetVersion(ctx, leaderView)
if action != nil {
return false
}
}
}
return true
}
func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction {
log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60)
targetVersion := ob.targetMgr.GetCollectionTargetVersion(leaderView.CollectionID, meta.NextTarget)
if targetVersion <= leaderView.TargetVersion {
return nil
}
log.RatedInfo(10, "Update readable segment version",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channelName", leaderView.Channel),
zap.Int64("nodeID", leaderView.ID),
zap.Int64("oldVersion", leaderView.TargetVersion),
zap.Int64("newVersion", targetVersion),
)
sealedSegments := ob.targetMgr.GetSealedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
growingSegments := ob.targetMgr.GetGrowingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
droppedSegments := ob.targetMgr.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
return &querypb.SyncAction{
Type: querypb.SyncType_UpdateVersion,
GrowingInTarget: growingSegments.Collect(),
SealedInTarget: lo.Keys(sealedSegments),
DroppedInTarget: droppedSegments,
TargetVersion: targetVersion,
}
}
func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID))
if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) {
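
The sync and version functions above are carried over from LeaderObserver almost verbatim; the substantive difference is which target they read. The old code took the version and segment lists from meta.CurrentTarget, a target that had already been promoted, which is exactly the window the bug lived in. The relocated code reads meta.NextTarget, and updateCurrentTarget promotes it only after that version has been pushed to every shard leader:

// LeaderObserver (removed) read the already-promoted target:
//   targetVersion := o.target.GetCollectionTargetVersion(leaderView.CollectionID, meta.CurrentTarget)
// TargetObserver (this commit) reads the next target, synced to leaders before promotion:
targetVersion := ob.targetMgr.GetCollectionTargetVersion(leaderView.CollectionID, meta.NextTarget)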

View File

@ -46,6 +46,7 @@ type TargetObserverSuite struct {
targetMgr *meta.TargetManager
distMgr *meta.DistributionManager
broker *meta.MockBroker
cluster *session.MockCluster
observer *TargetObserver
@ -82,7 +83,8 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.distMgr = meta.NewDistributionManager()
suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker)
suite.cluster = session.NewMockCluster(suite.T())
suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster)
suite.collectionID = int64(1000)
suite.partitionID = int64(100)
@ -225,6 +227,7 @@ type TargetObserverCheckSuite struct {
targetMgr *meta.TargetManager
distMgr *meta.DistributionManager
broker *meta.MockBroker
cluster *session.MockCluster
observer *TargetObserver
@ -258,7 +261,14 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.distMgr = meta.NewDistributionManager()
suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker)
suite.cluster = session.NewMockCluster(suite.T())
suite.observer = NewTargetObserver(
suite.meta,
suite.targetMgr,
suite.distMgr,
suite.broker,
suite.cluster,
)
suite.collectionID = int64(1000)
suite.partitionID = int64(100)
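
Both observer test suites now construct the observer with a session.MockCluster, because the observer issues SyncDistribution RPCs; suites whose scenarios reach the sync path also need an expectation for that call. The services test at the end of this commit adds exactly this shape (Maybe() keeps the expectation optional for scenarios that never sync):

cluster := session.NewMockCluster(suite.T())
cluster.EXPECT().
	SyncDistribution(mock.Anything, mock.Anything, mock.Anything).
	Return(merr.Success(), nil).
	Maybe()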

View File

@ -371,6 +371,7 @@ func (s *Server) initObserver() {
s.targetMgr,
s.dist,
s.broker,
s.cluster,
)
s.collectionObserver = observers.NewCollectionObserver(
s.dist,

View File

@ -558,6 +558,7 @@ func (suite *ServerSuite) hackServer() {
suite.server.targetMgr,
suite.server.dist,
suite.broker,
suite.server.cluster,
)
suite.server.collectionObserver = observers.NewCollectionObserver(
suite.server.dist,

View File

@ -149,6 +149,7 @@ func (suite *ServiceSuite) SetupTest() {
suite.targetMgr,
suite.dist,
suite.broker,
suite.cluster,
)
suite.targetObserver.Start()
for _, node := range suite.nodes {
@ -157,6 +158,7 @@ func (suite *ServiceSuite) SetupTest() {
suite.NoError(err)
}
suite.cluster = session.NewMockCluster(suite.T())
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.jobScheduler.Start()