enhance: Explicitly pass LevelZero segment ids in vchan info (#29612)

See also #27675

For `GetRecoveryInfo` & `GetRecoveryInfoV2`, Level zero segment ids
shall be specified in vchan info so that querycoord could re-fetch
current segment info during watch procedure without having all segment
info

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/29688/head
congqixia 2024-01-04 16:46:45 +08:00 committed by GitHub
parent 99e0f1e65a
commit aa967de0a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 8 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/retry"
@ -124,12 +125,13 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
unIndexedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
)
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
partitionSet := typeutil.NewUniqueSet(validPartitions...)
for _, s := range segments {
if (partitionSet.Len() > 0 && !partitionSet.Contain(s.PartitionID)) ||
if (partitionSet.Len() > 0 && !partitionSet.Contain(s.PartitionID) && s.GetPartitionID() != common.InvalidPartitionID) ||
(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
continue
}
@ -143,6 +145,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
droppedIDs.Insert(s.GetID())
case !isFlushState(s.GetState()):
growingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())
case indexed.Contain(s.GetID()):
indexedIDs.Insert(s.GetID())
case s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64(): // treat small flushed segment as indexed
@ -209,6 +213,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
FlushedSegmentIds: indexedIDs.Collect(),
UnflushedSegmentIds: growingIDs.Collect(),
DroppedSegmentIds: droppedIDs.Collect(),
LevelZeroSegmentIds: levelZeroIDs.Collect(),
}
}

View File

@ -2199,6 +2199,28 @@ func TestGetQueryVChanPositions(t *testing.T) {
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s3))
assert.NoError(t, err)
s4 := &datapb.SegmentInfo{
ID: 4,
CollectionID: 0,
PartitionID: common.InvalidPartitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
StartPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 2,
},
Level: datapb.SegmentLevel_L0,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s4))
assert.NoError(t, err)
//mockResp := &indexpb.GetIndexInfoResponse{
// Status: &commonpb.Status{},
// SegmentInfo: map[int64]*indexpb.SegmentInfo{
@ -2217,23 +2239,25 @@ func TestGetQueryVChanPositions(t *testing.T) {
//}
t.Run("get unexisted channel", func(t *testing.T) {
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "chx1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "chx1", CollectionID: 0})
assert.Empty(t, vchan.UnflushedSegmentIds)
assert.Empty(t, vchan.FlushedSegmentIds)
})
t.Run("get existed channel", func(t *testing.T) {
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.ElementsMatch(t, []int64{1}, vchan.FlushedSegmentIds)
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
assert.EqualValues(t, 1, len(vchan.GetLevelZeroSegmentIds()))
})
t.Run("empty collection", func(t *testing.T) {
infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch0_suffix", CollectionID: 1}, allPartitionID)
infos := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch0_suffix", CollectionID: 1})
assert.EqualValues(t, 1, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds))
assert.EqualValues(t, 0, len(infos.GetLevelZeroSegmentIds()))
})
t.Run("filter partition", func(t *testing.T) {
@ -2241,6 +2265,7 @@ func TestGetQueryVChanPositions(t *testing.T) {
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds))
assert.EqualValues(t, 1, len(infos.GetLevelZeroSegmentIds()))
})
t.Run("empty collection with passed positions", func(t *testing.T) {
@ -2250,9 +2275,10 @@ func TestGetQueryVChanPositions(t *testing.T) {
Name: vchannel,
CollectionID: 0,
StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}},
}, allPartitionID)
})
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, vchannel, infos.ChannelName)
assert.EqualValues(t, 0, len(infos.GetLevelZeroSegmentIds()))
})
}
@ -2321,7 +2347,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e))
assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d
@ -2407,7 +2433,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e))
assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.FlushedSegmentIds) // expected c, d
@ -2499,7 +2525,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
})
assert.NoError(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}, allPartitionID)
vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e

View File

@ -265,6 +265,7 @@ message VchannelInfo {
repeated int64 dropped_segmentIds = 9;
repeated int64 indexed_segmentIds = 10;
repeated SegmentInfo indexed_segments = 11;
repeated int64 level_zero_segment_ids = 12;
}
message WatchDmChannelsRequest {

View File

@ -199,6 +199,7 @@ func fillSubChannelRequest(
segmentIDs := typeutil.NewUniqueSet()
for _, vchannel := range req.GetInfos() {
segmentIDs.Insert(vchannel.GetUnflushedSegmentIds()...)
segmentIDs.Insert(vchannel.GetLevelZeroSegmentIds()...)
}
if segmentIDs.Len() == 0 {