From 3a6408b2373d7adde56757f5a1ba9686bde5853b Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 15 Jan 2025 10:02:58 +0800 Subject: [PATCH] fix: Record a map to avoid repeatedly traversing the CompactionFrom (#38925) issue: #38811 Signed-off-by: Cai Zhang --- internal/datacoord/handler.go | 64 ++++++--- internal/datacoord/handler_test.go | 203 ++++++++++++++++++++++++++++ internal/datacoord/index_service.go | 3 +- 3 files changed, 249 insertions(+), 21 deletions(-) diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 322da641f4..11990c8b97 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -157,12 +157,13 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // Skip bulk insert segments. continue } + validSegmentInfos[s.GetID()] = s + if s.GetIsInvisible() && s.GetCreatedByCompaction() { // skip invisible compaction segments continue } - validSegmentInfos[s.GetID()] = s switch { case s.GetState() == commonpb.SegmentState_Dropped: droppedIDs.Insert(s.GetID()) @@ -229,41 +230,64 @@ func retrieveSegment(validSegmentInfos map[int64]*SegmentInfo, ) (typeutil.UniqueSet, typeutil.UniqueSet) { newFlushedIDs := make(typeutil.UniqueSet) - isValid := func(ids ...UniqueID) bool { + isConditionMet := func(condition func(seg *SegmentInfo) bool, ids ...UniqueID) bool { for _, id := range ids { - if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() { + if seg, ok := validSegmentInfos[id]; !ok || seg == nil || !condition(seg) { return false } } return true } - var compactionFromExist func(segID UniqueID) bool - compactionFromExist = func(segID UniqueID) bool { - compactionFrom := validSegmentInfos[segID].GetCompactionFrom() - if len(compactionFrom) == 0 || !isValid(compactionFrom...) { + isValid := func(ids ...UniqueID) bool { + return isConditionMet(func(seg *SegmentInfo) bool { + return true + }, ids...) + } + + isVisible := func(ids ...UniqueID) bool { + return isConditionMet(func(seg *SegmentInfo) bool { + return !seg.GetIsInvisible() + }, ids...) + } + + var compactionFromExistWithCache func(segID UniqueID) bool + compactionFromExistWithCache = func(segID UniqueID) bool { + var compactionFromExist func(segID UniqueID) bool + compactionFromExistMap := make(map[UniqueID]bool) + + compactionFromExist = func(segID UniqueID) bool { + if exist, ok := compactionFromExistMap[segID]; ok { + return exist + } + compactionFrom := validSegmentInfos[segID].GetCompactionFrom() + if len(compactionFrom) == 0 || !isValid(compactionFrom...) { + compactionFromExistMap[segID] = false + return false + } + for _, fromID := range compactionFrom { + if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) { + compactionFromExistMap[segID] = true + return true + } + if compactionFromExist(fromID) { + compactionFromExistMap[segID] = true + return true + } + } + compactionFromExistMap[segID] = false return false } - for _, fromID := range compactionFrom { - if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) { - return true - } - if compactionFromExist(fromID) { - return true - } - } - return false + return compactionFromExist(segID) } retrieve := func() bool { continueRetrieve := false for id := range flushedIDs { compactionFrom := validSegmentInfos[id].GetCompactionFrom() - if len(compactionFrom) == 0 || !isValid(compactionFrom...) { + if len(compactionFrom) == 0 { newFlushedIDs.Insert(id) - continue - } - if segmentIndexed(id) && !compactionFromExist(id) { + } else if !compactionFromExistWithCache(id) && (segmentIndexed(id) || !isVisible(compactionFrom...)) { newFlushedIDs.Insert(id) } else { for _, fromID := range compactionFrom { diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go index 8164363643..06b332cea1 100644 --- a/internal/datacoord/handler_test.go +++ b/internal/datacoord/handler_test.go @@ -968,6 +968,209 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds) assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds) }) + + t.Run("compaction iterate", func(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Partitions: []int64{0}, + Schema: schema, + }) + err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.NoError(t, err) + seg1 := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) + assert.NoError(t, err) + seg2 := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) + assert.NoError(t, err) + seg3 := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{1, 2}, + IsInvisible: true, + CreatedByCompaction: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg3)) + assert.NoError(t, err) + seg4 := &datapb.SegmentInfo{ + ID: 4, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{1, 2}, + IsInvisible: true, + CreatedByCompaction: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4)) + assert.NoError(t, err) + seg5 := &datapb.SegmentInfo{ + ID: 5, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{3}, + CreatedByCompaction: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) + assert.NoError(t, err) + seg6 := &datapb.SegmentInfo{ + ID: 6, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{4}, + CreatedByCompaction: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg6)) + assert.NoError(t, err) + seg7 := &datapb.SegmentInfo{ + ID: 7, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{5, 6}, + IsInvisible: true, + CreatedByCompaction: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg7)) + assert.NoError(t, err) + seg8 := &datapb.SegmentInfo{ + ID: 8, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{5, 6}, + IsInvisible: true, + CreatedByCompaction: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8)) + assert.NoError(t, err) + seg9 := &datapb.SegmentInfo{ + ID: 9, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{7}, + CreatedByCompaction: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9)) + assert.NoError(t, err) + seg10 := &datapb.SegmentInfo{ + ID: 10, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{8}, + CreatedByCompaction: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10)) + assert.NoError(t, err) + + vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) + assert.ElementsMatch(t, []int64{5, 6}, vchan.FlushedSegmentIds) + assert.ElementsMatch(t, []int64{}, vchan.UnflushedSegmentIds) + assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds) + }) } func TestGetCurrentSegmentsView(t *testing.T) { diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 81d9cdd8ca..6139306e2b 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -120,7 +120,8 @@ func (s *Server) getUnIndexTaskSegments(ctx context.Context) []*SegmentInfo { func (s *Server) createIndexForSegmentLoop(ctx context.Context) { log := log.Ctx(ctx) - log.Info("start create index for segment loop...") + log.Info("start create index for segment loop...", + zap.Int64("TaskCheckInterval", Params.DataCoordCfg.TaskCheckInterval.GetAsInt64())) defer s.serverLoopWg.Done() ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second))