fix: Record a map to avoid repeatedly traversing the CompactionFrom (#38925)

issue: #38811

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-01-15 10:02:58 +08:00 committed by GitHub
parent ed31a5a4bf
commit 3a6408b237
3 changed files with 249 additions and 21 deletions
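
The core change memoizes the recursive compactionFromExist walk. Segments produced by the same compaction share CompactionFrom ancestors, so the uncached walk revisits the same ancestor subtree once for every segment that references it, which degrades badly on deep or diamond-shaped compaction lineages. Below is a minimal standalone sketch of the memoization pattern, using hypothetical names (lineage, hasFlushedAncestor) rather than the PR's code:

package main

import "fmt"

// lineage maps a segment ID to the IDs it was compacted from
// (stand-in data shaped like GetCompactionFrom results).
var lineage = map[int64][]int64{
    3: {1, 2}, 4: {1, 2}, // 3 and 4 share the same parents
    5: {3}, 6: {4},
}

// hasFlushedAncestor reports whether any ancestor of segID is flushed,
// memoizing results so each segment is resolved at most once per walk.
func hasFlushedAncestor(segID int64, flushed, memo map[int64]bool) bool {
    if v, ok := memo[segID]; ok {
        return v
    }
    for _, from := range lineage[segID] {
        if flushed[from] || hasFlushedAncestor(from, flushed, memo) {
            memo[segID] = true
            return true
        }
    }
    memo[segID] = false
    return false
}

func main() {
    flushed := map[int64]bool{1: true}
    memo := make(map[int64]bool)
    fmt.Println(hasFlushedAncestor(5, flushed, memo)) // true: 5 <- 3 <- 1
    fmt.Println(hasFlushedAncestor(6, flushed, memo)) // true: 6 <- 4 <- 1
}

In the diff below, compactionFromExistWithCache applies the same idea: it allocates a fresh compactionFromExistMap per top-level query and consults it before recursing.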

@@ -157,12 +157,13 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
             // Skip bulk insert segments.
             continue
         }
+        validSegmentInfos[s.GetID()] = s
         if s.GetIsInvisible() && s.GetCreatedByCompaction() {
             // skip invisible compaction segments
             continue
         }
-        validSegmentInfos[s.GetID()] = s
+
         switch {
         case s.GetState() == commonpb.SegmentState_Dropped:
             droppedIDs.Insert(s.GetID())
@@ -229,41 +230,64 @@ func retrieveSegment(validSegmentInfos map[int64]*SegmentInfo,
 ) (typeutil.UniqueSet, typeutil.UniqueSet) {
     newFlushedIDs := make(typeutil.UniqueSet)
-    isValid := func(ids ...UniqueID) bool {
+    isConditionMet := func(condition func(seg *SegmentInfo) bool, ids ...UniqueID) bool {
         for _, id := range ids {
-            if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
+            if seg, ok := validSegmentInfos[id]; !ok || seg == nil || !condition(seg) {
                 return false
             }
         }
         return true
     }
-    var compactionFromExist func(segID UniqueID) bool
-    compactionFromExist = func(segID UniqueID) bool {
-        compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
-        if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
-            return false
-        }
-        for _, fromID := range compactionFrom {
-            if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) {
-                return true
-            }
-            if compactionFromExist(fromID) {
-                return true
-            }
-        }
-        return false
+    isValid := func(ids ...UniqueID) bool {
+        return isConditionMet(func(seg *SegmentInfo) bool {
+            return true
+        }, ids...)
+    }
+    isVisible := func(ids ...UniqueID) bool {
+        return isConditionMet(func(seg *SegmentInfo) bool {
+            return !seg.GetIsInvisible()
+        }, ids...)
+    }
+    var compactionFromExistWithCache func(segID UniqueID) bool
+    compactionFromExistWithCache = func(segID UniqueID) bool {
+        var compactionFromExist func(segID UniqueID) bool
+        compactionFromExistMap := make(map[UniqueID]bool)
+        compactionFromExist = func(segID UniqueID) bool {
+            if exist, ok := compactionFromExistMap[segID]; ok {
+                return exist
+            }
+            compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
+            if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
+                compactionFromExistMap[segID] = false
+                return false
+            }
+            for _, fromID := range compactionFrom {
+                if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) {
+                    compactionFromExistMap[segID] = true
+                    return true
+                }
+                if compactionFromExist(fromID) {
+                    compactionFromExistMap[segID] = true
+                    return true
+                }
+            }
+            compactionFromExistMap[segID] = false
+            return false
+        }
+        return compactionFromExist(segID)
     }
     retrieve := func() bool {
         continueRetrieve := false
         for id := range flushedIDs {
             compactionFrom := validSegmentInfos[id].GetCompactionFrom()
-            if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
+            if len(compactionFrom) == 0 {
                 newFlushedIDs.Insert(id)
-                continue
-            }
-            if segmentIndexed(id) && !compactionFromExist(id) {
+            } else if !compactionFromExistWithCache(id) && (segmentIndexed(id) || !isVisible(compactionFrom...)) {
                 newFlushedIDs.Insert(id)
             } else {
                 for _, fromID := range compactionFrom {
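
Besides the cache, the old isValid helper (which hard-coded the invisibility check) is generalized into isConditionMet, with isValid and isVisible as thin wrappers, so the retrieve loop can test presence and visibility independently: a compacted segment is now kept either when it is indexed or when its parents are not all visible, provided none of its ancestors is already flushed. A compilable sketch of that shape, with segInfo standing in for the Milvus *SegmentInfo type:

package main

import "fmt"

// segInfo is a stand-in for datacoord's *SegmentInfo.
type segInfo struct{ invisible bool }

// allMeet mirrors isConditionMet: every id must resolve to a non-nil
// segment that satisfies cond.
func allMeet(infos map[int64]*segInfo, cond func(*segInfo) bool, ids ...int64) bool {
    for _, id := range ids {
        if seg, ok := infos[id]; !ok || seg == nil || !cond(seg) {
            return false
        }
    }
    return true
}

// allValid mirrors isValid: presence only, no visibility requirement.
func allValid(infos map[int64]*segInfo, ids ...int64) bool {
    return allMeet(infos, func(*segInfo) bool { return true }, ids...)
}

// allVisible mirrors isVisible: present and not invisible.
func allVisible(infos map[int64]*segInfo, ids ...int64) bool {
    return allMeet(infos, func(s *segInfo) bool { return !s.invisible }, ids...)
}

func main() {
    infos := map[int64]*segInfo{1: {invisible: false}, 2: {invisible: true}}
    fmt.Println(allValid(infos, 1, 2))   // true: both segments exist
    fmt.Println(allVisible(infos, 1, 2)) // false: segment 2 is invisible
}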

@@ -968,6 +968,209 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
         assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds)
         assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds)
     })
+    t.Run("compaction iterate", func(t *testing.T) {
+        svr := newTestServer(t)
+        defer closeTestServer(t, svr)
+        schema := newTestSchema()
+        svr.meta.AddCollection(&collectionInfo{
+            ID:         0,
+            Partitions: []int64{0},
+            Schema:     schema,
+        })
+        err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
+            TenantID:     "",
+            CollectionID: 0,
+            FieldID:      2,
+            IndexID:      1,
+        })
+        assert.NoError(t, err)
+        seg1 := &datapb.SegmentInfo{
+            ID:            1,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows: 100,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1))
+        assert.NoError(t, err)
+        seg2 := &datapb.SegmentInfo{
+            ID:            2,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows: 100,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
+        assert.NoError(t, err)
+        seg3 := &datapb.SegmentInfo{
+            ID:            3,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:           100,
+            CompactionFrom:      []int64{1, 2},
+            IsInvisible:         true,
+            CreatedByCompaction: true,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg3))
+        assert.NoError(t, err)
+        seg4 := &datapb.SegmentInfo{
+            ID:            4,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:           100,
+            CompactionFrom:      []int64{1, 2},
+            IsInvisible:         true,
+            CreatedByCompaction: true,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4))
+        assert.NoError(t, err)
+        seg5 := &datapb.SegmentInfo{
+            ID:            5,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Flushed,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:           100,
+            CompactionFrom:      []int64{3},
+            CreatedByCompaction: true,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
+        assert.NoError(t, err)
+        seg6 := &datapb.SegmentInfo{
+            ID:            6,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Flushed,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:           100,
+            CompactionFrom:      []int64{4},
+            CreatedByCompaction: true,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg6))
+        assert.NoError(t, err)
+        seg7 := &datapb.SegmentInfo{
+            ID:            7,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:           100,
+            CompactionFrom:      []int64{5, 6},
+            IsInvisible:         true,
+            CreatedByCompaction: true,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg7))
+        assert.NoError(t, err)
+        seg8 := &datapb.SegmentInfo{
+            ID:            8,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Dropped,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:           100,
+            CompactionFrom:      []int64{5, 6},
+            IsInvisible:         true,
+            CreatedByCompaction: true,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
+        assert.NoError(t, err)
+        seg9 := &datapb.SegmentInfo{
+            ID:            9,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Flushed,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:           100,
+            CompactionFrom:      []int64{7},
+            CreatedByCompaction: true,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
+        assert.NoError(t, err)
+        seg10 := &datapb.SegmentInfo{
+            ID:            10,
+            CollectionID:  0,
+            PartitionID:   0,
+            InsertChannel: "ch1",
+            State:         commonpb.SegmentState_Flushed,
+            DmlPosition: &msgpb.MsgPosition{
+                ChannelName: "ch1",
+                MsgID:       []byte{1, 2, 3},
+                MsgGroup:    "",
+                Timestamp:   1,
+            },
+            NumOfRows:           100,
+            CompactionFrom:      []int64{8},
+            CreatedByCompaction: true,
+        }
+        err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10))
+        assert.NoError(t, err)
+        vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
+        assert.ElementsMatch(t, []int64{5, 6}, vchan.FlushedSegmentIds)
+        assert.ElementsMatch(t, []int64{}, vchan.UnflushedSegmentIds)
+        assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
+    })
 }
 func TestGetCurrentSegmentsView(t *testing.T) {
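
For orientation, the compaction lineage the new test builds (a reading aid, not part of the commit; "<-" reads "compacted from"):

3 <- {1, 2}    5 <- {3}    7 <- {5, 6}    9 <- {7}
4 <- {1, 2}    6 <- {4}    8 <- {5, 6}    10 <- {8}

Segments 1 and 2 are dropped originals; 3, 4, 7, and 8 are dropped, invisible compaction intermediates; 5, 6, 9, and 10 are flushed. The two diamonds (3 and 4 share parents {1, 2}; 7 and 8 share {5, 6}) are exactly the shape where an uncached traversal repeats work and the memo map pays off. The assertions pin the resulting channel view to flushed {5, 6} and dropped {1, 2}.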

@@ -120,7 +120,8 @@ func (s *Server) getUnIndexTaskSegments(ctx context.Context) []*SegmentInfo {
 func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
     log := log.Ctx(ctx)
-    log.Info("start create index for segment loop...")
+    log.Info("start create index for segment loop...",
+        zap.Int64("TaskCheckInterval", Params.DataCoordCfg.TaskCheckInterval.GetAsInt64()))
     defer s.serverLoopWg.Done()
     ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second))
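
The last change is purely observability: log the configured TaskCheckInterval once when the loop starts, so the effective cadence is visible without reading configuration. A generic sketch of the same startup-logging-plus-ticker pattern, using only the standard library and illustrative names (not the Milvus paramtable API):

package main

import (
    "log"
    "time"
)

// checkLoop logs its configured interval once at startup, then runs its
// periodic work on every tick until done is closed.
func checkLoop(interval time.Duration, done <-chan struct{}) {
    log.Printf("start check loop, interval=%s", interval)
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-done:
            return
        case <-ticker.C:
            // periodic work goes here, e.g. scan for segments missing an index
        }
    }
}

func main() {
    done := make(chan struct{})
    go checkLoop(100*time.Millisecond, done)
    time.Sleep(350 * time.Millisecond) // let it tick a few times
    close(done)
}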