mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>poc1
parent
ad2ac68553
commit
dad2fd33d8
|
@ -19,6 +19,7 @@ class MilvusConan(ConanFile):
|
|||
"aws-c-common/0.8.2@milvus/dev",
|
||||
"aws-c-compression/0.2.15@milvus/dev",
|
||||
"aws-checksums/0.1.13@milvus/dev",
|
||||
"aws-c-sdkutils/0.1.3@milvus/dev",
|
||||
"aws-sdk-cpp/1.9.234",
|
||||
"googleapis/cci.20220711@milvus/dev",
|
||||
"benchmark/1.7.0",
|
||||
|
|
|
@ -145,25 +145,52 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
|
|||
unIndexedIDs.Insert(s.GetID())
|
||||
}
|
||||
}
|
||||
hasUnIndexed := true
|
||||
for hasUnIndexed {
|
||||
hasUnIndexed = false
|
||||
for id := range unIndexedIDs {
|
||||
// Indexed segments are compacted to a raw segment,
|
||||
// replace it with the indexed ones
|
||||
if len(segmentInfos[id].GetCompactionFrom()) > 0 {
|
||||
unIndexedIDs.Remove(id)
|
||||
for _, segID := range segmentInfos[id].GetCompactionFrom() {
|
||||
if indexed.Contain(segID) {
|
||||
indexedIDs.Insert(segID)
|
||||
} else {
|
||||
unIndexedIDs.Insert(segID)
|
||||
hasUnIndexed = true
|
||||
}
|
||||
}
|
||||
droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
|
||||
// ================================================
|
||||
// Segments blood relationship:
|
||||
// a b
|
||||
// \ /
|
||||
// c d
|
||||
// \ /
|
||||
// e
|
||||
//
|
||||
// GC: a, b
|
||||
// Indexed: c, d, e
|
||||
// ||
|
||||
// || (Index dropped and creating new index and not finished)
|
||||
// \/
|
||||
// UnIndexed: c, d, e
|
||||
//
|
||||
// Retrieve unIndexed expected result:
|
||||
// unIndexed: c, d
|
||||
// ================================================
|
||||
isValid := func(ids ...UniqueID) bool {
|
||||
for _, id := range ids {
|
||||
if seg, ok := segmentInfos[id]; !ok || seg == nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
retrieveUnIndexed := func() bool {
|
||||
continueRetrieve := false
|
||||
for id := range unIndexedIDs {
|
||||
compactionFrom := segmentInfos[id].GetCompactionFrom()
|
||||
if len(compactionFrom) > 0 && isValid(compactionFrom...) {
|
||||
for _, fromID := range compactionFrom {
|
||||
if indexed.Contain(fromID) {
|
||||
indexedIDs.Insert(fromID)
|
||||
} else {
|
||||
unIndexedIDs.Insert(fromID)
|
||||
continueRetrieve = true
|
||||
}
|
||||
}
|
||||
unIndexedIDs.Remove(id)
|
||||
droppedIDs.Remove(compactionFrom...)
|
||||
}
|
||||
}
|
||||
return continueRetrieve
|
||||
}
|
||||
for retrieveUnIndexed() {
|
||||
}
|
||||
|
||||
return &datapb.VchannelInfo{
|
||||
|
|
|
@ -2371,6 +2371,253 @@ func TestGetQueryVChanPositions(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
|
||||
t.Run("ab GC-ed, cde unIndexed", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
schema := newTestSchema()
|
||||
svr.meta.AddCollection(&collectionInfo{
|
||||
ID: 0,
|
||||
Schema: schema,
|
||||
})
|
||||
err := svr.meta.CreateIndex(&model.Index{
|
||||
TenantID: "",
|
||||
CollectionID: 0,
|
||||
FieldID: 2,
|
||||
IndexID: 1,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
c := &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed
|
||||
}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(c))
|
||||
assert.Nil(t, err)
|
||||
d := &datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(d))
|
||||
assert.Nil(t, err)
|
||||
e := &datapb.SegmentInfo{
|
||||
ID: 3,
|
||||
CollectionID: 0,
|
||||
PartitionID: 1,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
CompactionFrom: []int64{1, 2}, // c, d
|
||||
}
|
||||
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(e))
|
||||
assert.Nil(t, err)
|
||||
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
|
||||
assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds))
|
||||
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
|
||||
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.UnflushedSegmentIds) // expected c, d
|
||||
})
|
||||
|
||||
t.Run("a GC-ed, bcde unIndexed", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
schema := newTestSchema()
|
||||
svr.meta.AddCollection(&collectionInfo{
|
||||
ID: 0,
|
||||
Schema: schema,
|
||||
})
|
||||
err := svr.meta.CreateIndex(&model.Index{
|
||||
TenantID: "",
|
||||
CollectionID: 0,
|
||||
FieldID: 2,
|
||||
IndexID: 1,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
a := &datapb.SegmentInfo{
|
||||
ID: 99,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(a))
|
||||
assert.Nil(t, err)
|
||||
|
||||
c := &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed
|
||||
}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(c))
|
||||
assert.Nil(t, err)
|
||||
d := &datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(d))
|
||||
assert.Nil(t, err)
|
||||
e := &datapb.SegmentInfo{
|
||||
ID: 3,
|
||||
CollectionID: 0,
|
||||
PartitionID: 1,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
CompactionFrom: []int64{1, 2}, // c, d
|
||||
}
|
||||
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(e))
|
||||
assert.Nil(t, err)
|
||||
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
|
||||
assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds))
|
||||
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
|
||||
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.UnflushedSegmentIds) // expected c, d
|
||||
})
|
||||
|
||||
t.Run("ab GC-ed, c unIndexed, de indexed", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
schema := newTestSchema()
|
||||
svr.meta.AddCollection(&collectionInfo{
|
||||
ID: 0,
|
||||
Schema: schema,
|
||||
})
|
||||
err := svr.meta.CreateIndex(&model.Index{
|
||||
TenantID: "",
|
||||
CollectionID: 0,
|
||||
FieldID: 2,
|
||||
IndexID: 1,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
c := &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed
|
||||
}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(c))
|
||||
assert.Nil(t, err)
|
||||
d := &datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(d))
|
||||
assert.Nil(t, err)
|
||||
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
|
||||
SegmentID: 2,
|
||||
BuildID: 1,
|
||||
IndexID: 1,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
|
||||
BuildID: 1,
|
||||
State: commonpb.IndexState_Finished,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
e := &datapb.SegmentInfo{
|
||||
ID: 3,
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: "ch1",
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: "",
|
||||
Timestamp: 1,
|
||||
},
|
||||
CompactionFrom: []int64{1, 2}, // c, d
|
||||
}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(e))
|
||||
assert.Nil(t, err)
|
||||
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
|
||||
SegmentID: 3,
|
||||
BuildID: 2,
|
||||
IndexID: 1,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
|
||||
BuildID: 2,
|
||||
State: commonpb.IndexState_Finished,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
|
||||
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
|
||||
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
|
||||
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
|
||||
assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e
|
||||
})
|
||||
}
|
||||
|
||||
func TestShouldDropChannel(t *testing.T) {
|
||||
type myRootCoord struct {
|
||||
mocks.RootCoord
|
||||
|
|
Loading…
Reference in New Issue