fix: sync action load segment with lack collection index info list (#28788)

relate: https://github.com/milvus-io/milvus/issues/28779
https://github.com/milvus-io/milvus/issues/28637

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/28962/head
aoiasd 2023-12-04 18:14:34 +08:00 committed by GitHub
parent 45e6801ce4
commit b4af6f8c40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 18 deletions

View File

@ -594,6 +594,7 @@ message SyncDistributionRequest {
LoadMetaInfo load_meta = 6;
int64 replicaID = 7;
int64 version = 8;
repeated index.IndexInfo index_info_list = 9;
}
message ResourceGroup {

View File

@ -216,6 +216,14 @@ func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *
log.Warn("failed to get collection info", zap.Error(err))
return false
}
// Get collection index info
indexInfo, err := o.broker.DescribeIndex(ctx, collectionInfo.CollectionID)
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}
partitions, err := utils.GetPartitions(o.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
@ -236,7 +244,8 @@ func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *
CollectionID: leaderView.CollectionID,
PartitionIDs: partitions,
},
Version: time.Now().UnixNano(),
Version: time.Now().UnixNano(),
IndexInfoList: indexInfo,
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()

View File

@ -18,6 +18,7 @@ package observers
import (
"context"
"fmt"
"testing"
"time"
@ -32,6 +33,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -121,6 +123,11 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
// will cause sync failed once
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")).Once()
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
@ -155,7 +162,8 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
@ -213,6 +221,9 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.target.UpdateCollectionNextTarget(int64(1))
@ -247,7 +258,8 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
@ -344,6 +356,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{{IndexName: "test"}}, nil)
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
@ -381,7 +394,8 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
@ -413,7 +427,9 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
schema := utils.CreateTestSchema()
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
@ -451,7 +467,8 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
ch := make(chan struct{})
@ -495,6 +512,9 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
@ -520,7 +540,8 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)

View File

@ -353,6 +353,13 @@ func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView
return false
}
// Get collection index info
indexInfo, err := ob.broker.DescribeIndex(ctx, collectionInfo.GetCollectionID())
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}
req := &querypb.SyncDistributionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),
@ -367,7 +374,8 @@ func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView
CollectionID: leaderView.CollectionID,
PartitionIDs: partitions,
},
Version: time.Now().UnixNano(),
Version: time.Now().UnixNano(),
IndexInfoList: indexInfo,
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()

View File

@ -213,7 +213,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
}, 7*time.Second, 1*time.Second)
}
func (suite *TargetObserverSuite) TearDownSuite() {
func (suite *TargetObserverSuite) TearDownTest() {
suite.kv.Close()
suite.observer.Stop()
}

View File

@ -1367,15 +1367,16 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
commonpbutil.WithMsgID(req.Base.GetMsgID()),
),
Infos: []*querypb.SegmentLoadInfo{action.GetInfo()},
Schema: req.GetSchema(),
LoadMeta: req.GetLoadMeta(),
CollectionID: req.GetCollectionID(),
ReplicaID: req.GetReplicaID(),
DstNodeID: action.GetNodeID(),
Version: action.GetVersion(),
NeedTransfer: false,
LoadScope: querypb.LoadScope_Delta,
Infos: []*querypb.SegmentLoadInfo{action.GetInfo()},
Schema: req.GetSchema(),
LoadMeta: req.GetLoadMeta(),
CollectionID: req.GetCollectionID(),
ReplicaID: req.GetReplicaID(),
DstNodeID: action.GetNodeID(),
Version: action.GetVersion(),
NeedTransfer: false,
LoadScope: querypb.LoadScope_Delta,
IndexInfoList: req.GetIndexInfoList(),
})
})
case querypb.SyncType_UpdateVersion: