From b37d6fa0f9d13d37a0108d715eb20ecf6b64de5c Mon Sep 17 00:00:00 2001
From: jaime
Date: Mon, 1 Jul 2024 10:20:08 +0800
Subject: [PATCH] enhance: decrease cpu overhead during filter segments on
 datacoord (#34231)

issue: https://github.com/milvus-io/milvus/issues/33129
pr: #33130
pr: #33373

---------

Signed-off-by: jaime
---
 internal/datacoord/compaction_trigger_test.go |  73 ++-
 internal/datacoord/garbage_collector_test.go  |   9 +-
 internal/datacoord/index_service_test.go      |   7 +-
 internal/datacoord/meta_test.go               |  92 ++++
 internal/datacoord/segment_info.go            | 108 +++--
 internal/datacoord/segment_operator.go        |  22 +-
 .../datacoord/sync_segments_scheduler_test.go | 456 +++++++++---------
 internal/querynodev2/segments/pool.go         |   3 +
 internal/querynodev2/segments/segment.go      |   2 +-
 9 files changed, 466 insertions(+), 306 deletions(-)
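The core of this change: datacoord previously filtered segments by scanning the whole ID-to-SegmentInfo map, so every collection- or channel-scoped lookup cost O(all segments). The patch maintains per-collection and per-channel secondary indexes that shrink the candidate set before any per-segment predicate runs. As a minimal, self-contained sketch of that bookkeeping (invented names and a plain string payload, not the Milvus types):

// Package segindex sketches the secondary-index idea the diffs below apply.
package segindex

type segID = int64

// store keeps a primary id->channel map plus a channel->ids secondary index.
type store struct {
    segments map[segID]string
    byChan   map[string]map[segID]struct{}
}

func newStore() *store {
    return &store{
        segments: make(map[segID]string),
        byChan:   make(map[string]map[segID]struct{}),
    }
}

// set registers a segment and keeps the secondary index in sync,
// mirroring SetSegment -> addSecondaryIndex in the patch.
func (s *store) set(id segID, channel string) {
    if old, ok := s.segments[id]; ok {
        s.unindex(id, old) // re-registration must first drop the stale entry
    }
    s.segments[id] = channel
    if s.byChan[channel] == nil {
        s.byChan[channel] = make(map[segID]struct{})
    }
    s.byChan[channel][id] = struct{}{}
}

// drop removes a segment from both maps, mirroring DropSegment -> removeSecondaryIndex.
func (s *store) drop(id segID) {
    if channel, ok := s.segments[id]; ok {
        s.unindex(id, channel)
        delete(s.segments, id)
    }
}

func (s *store) unindex(id segID, channel string) {
    if bucket, ok := s.byChan[channel]; ok {
        delete(bucket, id)
        if len(bucket) == 0 {
            delete(s.byChan, channel) // prune empty buckets
        }
    }
}

// byChannel now costs O(one bucket) instead of O(all segments).
func (s *store) byChannel(channel string) []segID {
    ids := make([]segID, 0, len(s.byChan[channel]))
    for id := range s.byChan[channel] {
        ids = append(ids, id)
    }
    return ids
}

Every write path must go through the same add/remove helpers or the indexes drift from the primary map; the diffs below wire exactly that into datacoord's SegmentsInfo and update the tests that used to build SegmentsInfo literals by hand.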
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index 56710ed80e..7816671814 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -2197,34 +2197,63 @@ func (s *CompactionTriggerSuite) SetupTest() {
     catalog := mocks.NewDataCoordCatalog(s.T())
     catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, s.channel, mock.Anything).Return(nil)

+    seg1 := &SegmentInfo{
+        SegmentInfo:   s.genSeg(1, 60),
+        lastFlushTime: time.Now().Add(-100 * time.Minute),
+    }
+    seg2 := &SegmentInfo{
+        SegmentInfo:   s.genSeg(2, 60),
+        lastFlushTime: time.Now(),
+    }
+    seg3 := &SegmentInfo{
+        SegmentInfo:   s.genSeg(3, 60),
+        lastFlushTime: time.Now(),
+    }
+    seg4 := &SegmentInfo{
+        SegmentInfo:   s.genSeg(4, 60),
+        lastFlushTime: time.Now(),
+    }
+    seg5 := &SegmentInfo{
+        SegmentInfo:   s.genSeg(5, 60),
+        lastFlushTime: time.Now(),
+    }
+    seg6 := &SegmentInfo{
+        SegmentInfo:   s.genSeg(6, 60),
+        lastFlushTime: time.Now(),
+    }
+
     s.meta = &meta{
         channelCPs: newChannelCps(),
         catalog:    catalog,
         segments: &SegmentsInfo{
             segments: map[int64]*SegmentInfo{
-                1: {
-                    SegmentInfo:   s.genSeg(1, 60),
-                    lastFlushTime: time.Now().Add(-100 * time.Minute),
+                1: seg1,
+                2: seg2,
+                3: seg3,
+                4: seg4,
+                5: seg5,
+                6: seg6,
+            },
+            secondaryIndexes: segmentInfoIndexes{
+                coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+                    s.collectionID: {
+                        1: seg1,
+                        2: seg2,
+                        3: seg3,
+                        4: seg4,
+                        5: seg5,
+                        6: seg6,
+                    },
                 },
-                2: {
-                    SegmentInfo:   s.genSeg(2, 60),
-                    lastFlushTime: time.Now(),
-                },
-                3: {
-                    SegmentInfo:   s.genSeg(3, 60),
-                    lastFlushTime: time.Now(),
-                },
-                4: {
-                    SegmentInfo:   s.genSeg(4, 60),
-                    lastFlushTime: time.Now(),
-                },
-                5: {
-                    SegmentInfo:   s.genSeg(5, 26),
-                    lastFlushTime: time.Now(),
-                },
-                6: {
-                    SegmentInfo:   s.genSeg(6, 26),
-                    lastFlushTime: time.Now(),
+                channel2Segments: map[string]map[UniqueID]*SegmentInfo{
+                    s.channel: {
+                        1: seg1,
+                        2: seg2,
+                        3: seg3,
+                        4: seg4,
+                        5: seg5,
+                        6: seg6,
+                    },
                 },
             },
         },
diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go
index a81003cda7..2b36991889 100644
--- a/internal/datacoord/garbage_collector_test.go
+++ b/internal/datacoord/garbage_collector_test.go
@@ -465,7 +465,14 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
             },
         },
         segID + 1: {
-            SegmentInfo: nil,
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            segID + 1,
+                CollectionID:  collID,
+                PartitionID:   partID,
+                InsertChannel: "",
+                NumOfRows:     1026,
+                State:         commonpb.SegmentState_Dropped,
+            },
         },
     }
     meta := &meta{
diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go
index cda44b2558..d10c8d104f 100644
--- a/internal/datacoord/index_service_test.go
+++ b/internal/datacoord/index_service_test.go
@@ -938,7 +938,12 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
         WriteHandoff: false,
     })
     s.meta.segments.SetSegment(segID, &SegmentInfo{
-        SegmentInfo: nil,
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:            segID,
+            CollectionID:  collID,
+            PartitionID:   partID,
+            InsertChannel: "ch",
+        },
         currRows:      0,
         allocations:   nil,
         lastFlushTime: time.Time{},
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index b37bda537a..0d790e14fa 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -22,6 +22,7 @@ import (
     "github.com/cockroachdb/errors"
     "github.com/golang/protobuf/proto"
+    "github.com/samber/lo"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/suite"
@@ -982,6 +983,97 @@ func Test_meta_GetSegmentsOfCollection(t *testing.T) {
         assert.True(t, ok)
         assert.Equal(t, expected, gotInfo.GetState())
     }
+
+    got = m.GetSegmentsOfCollection(-1)
+    assert.Equal(t, 3, len(got))
+
+    got = m.GetSegmentsOfCollection(10)
+    assert.Equal(t, 0, len(got))
+}
+
+func Test_meta_GetSegmentsWithChannel(t *testing.T) {
+    storedSegments := NewSegmentsInfo()
+    for segID, segment := range map[int64]*SegmentInfo{
+        1: {
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            1,
+                CollectionID:  1,
+                InsertChannel: "h1",
+                State:         commonpb.SegmentState_Flushed,
+            },
+        },
+        2: {
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            2,
+                CollectionID:  1,
+                InsertChannel: "h2",
+                State:         commonpb.SegmentState_Growing,
+            },
+        },
+        3: {
+            SegmentInfo: &datapb.SegmentInfo{
+                ID:            3,
+                CollectionID:  2,
+                State:         commonpb.SegmentState_Flushed,
+                InsertChannel: "h1",
+            },
+        },
+    } {
+        storedSegments.SetSegment(segID, segment)
+    }
+    m := &meta{segments: storedSegments}
+    got := m.GetSegmentsByChannel("h1")
+    assert.Equal(t, 2, len(got))
+    assert.ElementsMatch(t, []int64{1, 3}, lo.Map(
+        got,
+        func(s *SegmentInfo, i int) int64 {
+            return s.ID
+        },
+    ))
+
+    got = m.GetSegmentsByChannel("h3")
+    assert.Equal(t, 0, len(got))
+
+    got = m.SelectSegments(WithCollection(1), WithChannel("h1"), SegmentFilterFunc(func(segment *SegmentInfo) bool {
+        return segment != nil && segment.GetState() == commonpb.SegmentState_Flushed
+    }))
+    assert.Equal(t, 1, len(got))
+    assert.ElementsMatch(t, []int64{1}, lo.Map(
+        got,
+        func(s *SegmentInfo, i int) int64 {
+            return s.ID
+        },
+    ))
+
+    m.segments.DropSegment(3)
+    _, ok := m.segments.secondaryIndexes.coll2Segments[2]
+    assert.False(t, ok)
+    assert.Equal(t, 1, len(m.segments.secondaryIndexes.coll2Segments))
+    assert.Equal(t, 2, len(m.segments.secondaryIndexes.channel2Segments))
+
+    segments, ok := m.segments.secondaryIndexes.channel2Segments["h1"]
+    assert.True(t, ok)
+    assert.Equal(t, 1, len(segments))
+    assert.Equal(t, int64(1), segments[1].ID)
+    segments, ok = m.segments.secondaryIndexes.channel2Segments["h2"]
+    assert.True(t, ok)
+    assert.Equal(t, 1, len(segments))
+    assert.Equal(t, int64(2), segments[2].ID)
+
+    m.segments.DropSegment(2)
+    segments, ok = m.segments.secondaryIndexes.coll2Segments[1]
+    assert.True(t, ok)
+    assert.Equal(t, 1, len(segments))
+    assert.Equal(t, int64(1), segments[1].ID)
+    assert.Equal(t, 1, len(m.segments.secondaryIndexes.coll2Segments))
+    assert.Equal(t, 1, len(m.segments.secondaryIndexes.channel2Segments))
+
+    segments, ok = m.segments.secondaryIndexes.channel2Segments["h1"]
+    assert.True(t, ok)
+    assert.Equal(t, 1, len(segments))
+    assert.Equal(t, int64(1), segments[1].ID)
+    _, ok = m.segments.secondaryIndexes.channel2Segments["h2"]
+    assert.False(t, ok)
 }

 func TestMeta_HasSegments(t *testing.T) {
diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go
index 5946934e88..13e6f4aad1 100644
--- a/internal/datacoord/segment_info.go
+++ b/internal/datacoord/segment_info.go
@@ -32,12 +32,17 @@ import (

 // SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
 type SegmentsInfo struct {
-    segments     map[UniqueID]*SegmentInfo
-    collSegments map[UniqueID]*CollectionSegments
-    compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key.
+    segments         map[UniqueID]*SegmentInfo
+    secondaryIndexes segmentInfoIndexes
+    compactionTo     map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key.
     // A segment can be compacted to only one segment finally in meta.
 }

+type segmentInfoIndexes struct {
+    coll2Segments    map[UniqueID]map[UniqueID]*SegmentInfo
+    channel2Segments map[string]map[UniqueID]*SegmentInfo
+}
+
 // SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
 type SegmentInfo struct {
     *datapb.SegmentInfo
@@ -69,16 +74,15 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
 // note that no mutex is wrapped so external concurrent control is needed
 func NewSegmentsInfo() *SegmentsInfo {
     return &SegmentsInfo{
-        segments:     make(map[UniqueID]*SegmentInfo),
-        collSegments: make(map[UniqueID]*CollectionSegments),
+        segments: make(map[UniqueID]*SegmentInfo),
+        secondaryIndexes: segmentInfoIndexes{
+            coll2Segments:    make(map[UniqueID]map[UniqueID]*SegmentInfo),
+            channel2Segments: make(map[string]map[UniqueID]*SegmentInfo),
+        },
         compactionTo: make(map[UniqueID]UniqueID),
     }
 }

-type CollectionSegments struct {
-    segments map[int64]*SegmentInfo
-}
-
 // GetSegment returns SegmentInfo
 // the logPath in meta is empty
 func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
@@ -96,24 +100,42 @@ func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
     return lo.Values(s.segments)
 }

+func (s *SegmentsInfo) getCandidates(criterion *segmentCriterion) map[UniqueID]*SegmentInfo {
+    if criterion.collectionID > 0 {
+        collSegments, ok := s.secondaryIndexes.coll2Segments[criterion.collectionID]
+        if !ok {
+            return nil
+        }
+
+        // both collection id and channel are filters of criterion
+        if criterion.channel != "" {
+            return lo.OmitBy(collSegments, func(k UniqueID, v *SegmentInfo) bool {
+                return v.InsertChannel != criterion.channel
+            })
+        }
+        return collSegments
+    }
+
+    if criterion.channel != "" {
+        channelSegments, ok := s.secondaryIndexes.channel2Segments[criterion.channel]
+        if !ok {
+            return nil
+        }
+        return channelSegments
+    }
+
+    return s.segments
+}
+
 func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*SegmentInfo {
     criterion := &segmentCriterion{}
     for _, filter := range filters {
         filter.AddFilter(criterion)
     }
-    var result []*SegmentInfo
-    var candidates []*SegmentInfo
+
     // apply criterion
-    switch {
-    case criterion.collectionID > 0:
-        collSegments, ok := s.collSegments[criterion.collectionID]
-        if !ok {
-            return nil
-        }
-        candidates = lo.Values(collSegments.segments)
-    default:
-        candidates = lo.Values(s.segments)
-    }
+    candidates := s.getCandidates(criterion)
+    var result []*SegmentInfo
     for _, segment := range candidates {
         if criterion.Match(segment) {
             result = append(result, segment)
@@ -144,7 +166,7 @@ func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool)
 func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
     if segment, ok := s.segments[segmentID]; ok {
         s.deleteCompactTo(segment)
-        s.delCollection(segment)
+        s.removeSecondaryIndex(segment)
         delete(s.segments, segmentID)
     }
 }
@@ -156,10 +178,10 @@ func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
     if segment, ok := s.segments[segmentID]; ok {
         // Remove old segment compact to relation first.
         s.deleteCompactTo(segment)
-        s.delCollection(segment)
+        s.removeSecondaryIndex(segment)
     }
     s.segments[segmentID] = segment
-    s.addCollection(segment)
+    s.addSecondaryIndex(segment)
     s.addCompactTo(segment)
 }
@@ -296,27 +318,35 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
     return cloned
 }

-func (s *SegmentsInfo) addCollection(segment *SegmentInfo) {
+func (s *SegmentsInfo) addSecondaryIndex(segment *SegmentInfo) {
     collID := segment.GetCollectionID()
-    collSegment, ok := s.collSegments[collID]
-    if !ok {
-        collSegment = &CollectionSegments{
-            segments: make(map[UniqueID]*SegmentInfo),
-        }
-        s.collSegments[collID] = collSegment
+    channel := segment.GetInsertChannel()
+    if _, ok := s.secondaryIndexes.coll2Segments[collID]; !ok {
+        s.secondaryIndexes.coll2Segments[collID] = make(map[UniqueID]*SegmentInfo)
     }
-    collSegment.segments[segment.GetID()] = segment
+    s.secondaryIndexes.coll2Segments[collID][segment.ID] = segment
+
+    if _, ok := s.secondaryIndexes.channel2Segments[channel]; !ok {
+        s.secondaryIndexes.channel2Segments[channel] = make(map[UniqueID]*SegmentInfo)
+    }
+    s.secondaryIndexes.channel2Segments[channel][segment.ID] = segment
 }

-func (s *SegmentsInfo) delCollection(segment *SegmentInfo) {
+func (s *SegmentsInfo) removeSecondaryIndex(segment *SegmentInfo) {
     collID := segment.GetCollectionID()
-    collSegment, ok := s.collSegments[collID]
-    if !ok {
-        return
+    channel := segment.GetInsertChannel()
+    if segments, ok := s.secondaryIndexes.coll2Segments[collID]; ok {
+        delete(segments, segment.ID)
+        if len(segments) == 0 {
+            delete(s.secondaryIndexes.coll2Segments, collID)
+        }
     }
-    delete(collSegment.segments, segment.GetID())
-    if len(collSegment.segments) == 0 {
-        delete(s.collSegments, segment.GetCollectionID())
+
+    if segments, ok := s.secondaryIndexes.channel2Segments[channel]; ok {
+        delete(segments, segment.ID)
+        if len(segments) == 0 {
+            delete(s.secondaryIndexes.channel2Segments, channel)
+        }
     }
 }
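getCandidates above prefers the collection index and then narrows by channel with lo.OmitBy, which copies the map while dropping every entry whose predicate returns true. A self-contained illustration of that narrowing step (a plain id-to-channel map stands in for the id-to-*SegmentInfo bucket):

package main

import (
    "fmt"

    "github.com/samber/lo"
)

func main() {
    // per-collection bucket: segment id -> insert channel
    collSegments := map[int64]string{1: "ch1", 2: "ch2", 3: "ch1"}

    wantChannel := "ch1"
    candidates := lo.OmitBy(collSegments, func(id int64, channel string) bool {
        return channel != wantChannel // omit segments on other channels
    })

    fmt.Println(len(candidates)) // 2 -> segments 1 and 3 survive
}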
diff --git a/internal/datacoord/segment_operator.go b/internal/datacoord/segment_operator.go
index 2d26f6d03d..d31d1a4c3d 100644
--- a/internal/datacoord/segment_operator.go
+++ b/internal/datacoord/segment_operator.go
@@ -31,6 +31,7 @@ func SetMaxRowCount(maxRow int64) SegmentOperator {

 type segmentCriterion struct {
     collectionID int64
+    channel      string
     others       []SegmentFilter
 }
@@ -62,6 +63,21 @@ func WithCollection(collectionID int64) SegmentFilter {
     return CollectionFilter(collectionID)
 }

+type ChannelFilter string
+
+func (f ChannelFilter) Match(segment *SegmentInfo) bool {
+    return segment.GetInsertChannel() == string(f)
+}
+
+func (f ChannelFilter) AddFilter(criterion *segmentCriterion) {
+    criterion.channel = string(f)
+}
+
+// WithChannel returns a filter on insert channel. WithCollection has the higher
+// priority if both WithCollection and WithChannel appear in the same condition.
+func WithChannel(channel string) SegmentFilter {
+    return ChannelFilter(channel)
+}
+
 type SegmentFilterFunc func(*SegmentInfo) bool

 func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool {
@@ -71,9 +87,3 @@ func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool {
 func (f SegmentFilterFunc) AddFilter(criterion *segmentCriterion) {
     criterion.others = append(criterion.others, f)
 }
-
-func WithChannel(channel string) SegmentFilter {
-    return SegmentFilterFunc(func(si *SegmentInfo) bool {
-        return si.GetInsertChannel() == channel
-    })
-}
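The WithChannel rewrite above is what lets the selector use the new channel index: instead of an opaque closure that can only Match, ChannelFilter also records the channel on the criterion, so GetSegmentsBySelector can route the lookup before matching. A stripped-down rendition of the pattern (invented local types, not the datacoord ones):

package filters

type segment struct {
    collectionID int64
    channel      string
}

type criterion struct {
    collectionID int64
    channel      string
    others       []segmentFilter
}

type segmentFilter interface {
    Match(*segment) bool
    AddFilter(*criterion)
}

// channelFilter is indexable: it records the channel on the criterion.
type channelFilter string

func (f channelFilter) Match(s *segment) bool  { return s.channel == string(f) }
func (f channelFilter) AddFilter(c *criterion) { c.channel = string(f) }

// filterFunc stays opaque: it can only be applied per candidate.
type filterFunc func(*segment) bool

func (f filterFunc) Match(s *segment) bool  { return f(s) }
func (f filterFunc) AddFilter(c *criterion) { c.others = append(c.others, f) }

When both indexable criteria are set, getCandidates resolves the collection bucket first and applies the channel as a per-entry filter, which matches the priority stated in the WithChannel comment.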
diff --git a/internal/datacoord/sync_segments_scheduler_test.go b/internal/datacoord/sync_segments_scheduler_test.go
index d21d957340..1030a55e82 100644
--- a/internal/datacoord/sync_segments_scheduler_test.go
+++ b/internal/datacoord/sync_segments_scheduler_test.go
@@ -22,6 +22,7 @@ import (
     "testing"

     "github.com/cockroachdb/errors"
+    "github.com/samber/lo"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/suite"
@@ -43,6 +44,220 @@ func Test_SyncSegmentsSchedulerSuite(t *testing.T) {
 }

 func (s *SyncSegmentsSchedulerSuite) initParams() {
+    segments := []*datapb.SegmentInfo{
+        {
+            ID:            5,
+            CollectionID:  1,
+            PartitionID:   2,
+            InsertChannel: "channel1",
+            NumOfRows:     3000,
+            State:         commonpb.SegmentState_Dropped,
+            Statslogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 100,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 1,
+                        },
+                    },
+                },
+                {
+                    FieldID: 101,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 2,
+                        },
+                    },
+                },
+            },
+        },
+        {
+            ID:            6,
+            CollectionID:  1,
+            PartitionID:   3,
+            InsertChannel: "channel1",
+            NumOfRows:     3000,
+            State:         commonpb.SegmentState_Dropped,
+            Statslogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 100,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 3,
+                        },
+                    },
+                },
+                {
+                    FieldID: 101,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 4,
+                        },
+                    },
+                },
+            },
+        },
+        {
+            ID:            9,
+            CollectionID:  1,
+            PartitionID:   2,
+            InsertChannel: "channel1",
+            NumOfRows:     3000,
+            State:         commonpb.SegmentState_Flushed,
+            Statslogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 100,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 9,
+                        },
+                    },
+                },
+                {
+                    FieldID: 101,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 10,
+                        },
+                    },
+                },
+            },
+            CompactionFrom: []int64{5},
+        },
+        {
+            ID:            10,
+            CollectionID:  1,
+            PartitionID:   3,
+            InsertChannel: "channel1",
+            NumOfRows:     3000,
+            State:         commonpb.SegmentState_Flushed,
+            Statslogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 100,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 7,
+                        },
+                    },
+                },
+                {
+                    FieldID: 101,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 8,
+                        },
+                    },
+                },
+            },
+            CompactionFrom: []int64{6},
+        },
+        {
+            ID:            7,
+            CollectionID:  1,
+            PartitionID:   2,
+            InsertChannel: "channel2",
+            NumOfRows:     3000,
+            State:         commonpb.SegmentState_Dropped,
+            Statslogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 100,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 5,
+                        },
+                    },
+                },
+                {
+                    FieldID: 101,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 6,
+                        },
+                    },
+                },
+            },
+        },
+        {
+            ID:            8,
+            CollectionID:  1,
+            PartitionID:   3,
+            InsertChannel: "channel2",
+            NumOfRows:     3000,
+            State:         commonpb.SegmentState_Dropped,
+            Statslogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 100,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 7,
+                        },
+                    },
+                },
+                {
+                    FieldID: 101,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 8,
+                        },
+                    },
+                },
+            },
+        },
+        {
+            ID:            11,
+            CollectionID:  1,
+            PartitionID:   2,
+            InsertChannel: "channel2",
+            NumOfRows:     3000,
+            State:         commonpb.SegmentState_Flushed,
+            Statslogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 100,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 5,
+                        },
+                    },
+                },
+                {
+                    FieldID: 101,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 6,
+                        },
+                    },
+                },
+            },
+            CompactionFrom: []int64{7},
+        },
+        {
+            ID:            12,
+            CollectionID:  1,
+            PartitionID:   3,
+            InsertChannel: "channel2",
+            NumOfRows:     3000,
+            State:         commonpb.SegmentState_Flushed,
+            Statslogs: []*datapb.FieldBinlog{
+                {
+                    FieldID: 100,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 7,
+                        },
+                    },
+                },
+                {
+                    FieldID: 101,
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogID: 8,
+                        },
+                    },
+                },
+            },
+            CompactionFrom: []int64{8},
+        },
+    }
     s.m = &meta{
         RWMutex: sync.RWMutex{},
         collections: map[UniqueID]*collectionInfo{
             1: {
@@ -72,243 +287,12 @@ func (s *SyncSegmentsSchedulerSuite) initParams() {
             },
             2: nil,
         },
-        segments: &SegmentsInfo{
-            collSegments: map[UniqueID]*CollectionSegments{
-                1: {
-                    segments: map[UniqueID]*SegmentInfo{
-                        5: {
-                            SegmentInfo: &datapb.SegmentInfo{
-                                ID:            5,
-                                CollectionID:  1,
-                                PartitionID:   2,
-                                InsertChannel: "channel1",
-                                NumOfRows:     3000,
-                                State:         commonpb.SegmentState_Dropped,
-                                Statslogs: []*datapb.FieldBinlog{
-                                    {
-                                        FieldID: 100,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 1,
-                                            },
-                                        },
-                                    },
-                                    {
-                                        FieldID: 101,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 2,
-                                            },
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                        6: {
-                            SegmentInfo: &datapb.SegmentInfo{
-                                ID:            6,
-                                CollectionID:  1,
-                                PartitionID:   3,
-                                InsertChannel: "channel1",
-                                NumOfRows:     3000,
-                                State:         commonpb.SegmentState_Dropped,
-                                Statslogs: []*datapb.FieldBinlog{
-                                    {
-                                        FieldID: 100,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 3,
-                                            },
-                                        },
-                                    },
-                                    {
-                                        FieldID: 101,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 4,
-                                            },
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                        9: {
-                            SegmentInfo: &datapb.SegmentInfo{
-                                ID:            9,
-                                CollectionID:  1,
-                                PartitionID:   2,
-                                InsertChannel: "channel1",
-                                NumOfRows:     3000,
-                                State:         commonpb.SegmentState_Flushed,
-                                Statslogs: []*datapb.FieldBinlog{
-                                    {
-                                        FieldID: 100,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 9,
-                                            },
-                                        },
-                                    },
-                                    {
-                                        FieldID: 101,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 10,
-                                            },
-                                        },
-                                    },
-                                },
-                                CompactionFrom: []int64{5},
-                            },
-                        },
-                        10: {
-                            SegmentInfo: &datapb.SegmentInfo{
-                                ID:            10,
-                                CollectionID:  1,
-                                PartitionID:   3,
-                                InsertChannel: "channel1",
-                                NumOfRows:     3000,
-                                State:         commonpb.SegmentState_Flushed,
-                                Statslogs: []*datapb.FieldBinlog{
-                                    {
-                                        FieldID: 100,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 7,
-                                            },
-                                        },
-                                    },
-                                    {
-                                        FieldID: 101,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 8,
-                                            },
-                                        },
-                                    },
-                                },
-                                CompactionFrom: []int64{6},
-                            },
-                        },
-                        7: {
-                            SegmentInfo: &datapb.SegmentInfo{
-                                ID:            7,
-                                CollectionID:  1,
-                                PartitionID:   2,
-                                InsertChannel: "channel2",
-                                NumOfRows:     3000,
-                                State:         commonpb.SegmentState_Dropped,
-                                Statslogs: []*datapb.FieldBinlog{
-                                    {
-                                        FieldID: 100,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 5,
-                                            },
-                                        },
-                                    },
-                                    {
-                                        FieldID: 101,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 6,
-                                            },
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                        8: {
-                            SegmentInfo: &datapb.SegmentInfo{
-                                ID:            8,
-                                CollectionID:  1,
-                                PartitionID:   3,
-                                InsertChannel: "channel2",
-                                NumOfRows:     3000,
-                                State:         commonpb.SegmentState_Dropped,
-                                Statslogs: []*datapb.FieldBinlog{
-                                    {
-                                        FieldID: 100,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 7,
-                                            },
-                                        },
-                                    },
-                                    {
-                                        FieldID: 101,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 8,
-                                            },
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                        11: {
-                            SegmentInfo: &datapb.SegmentInfo{
-                                ID:            11,
-                                CollectionID:  1,
-                                PartitionID:   2,
-                                InsertChannel: "channel2",
-                                NumOfRows:     3000,
-                                State:         commonpb.SegmentState_Flushed,
-                                Statslogs: []*datapb.FieldBinlog{
-                                    {
-                                        FieldID: 100,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 5,
-                                            },
-                                        },
-                                    },
-                                    {
-                                        FieldID: 101,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 6,
-                                            },
-                                        },
-                                    },
-                                },
-                                CompactionFrom: []int64{7},
-                            },
-                        },
-                        12: {
-                            SegmentInfo: &datapb.SegmentInfo{
-                                ID:            12,
-                                CollectionID:  1,
-                                PartitionID:   3,
-                                InsertChannel: "channel2",
-                                NumOfRows:     3000,
-                                State:         commonpb.SegmentState_Flushed,
-                                Statslogs: []*datapb.FieldBinlog{
-                                    {
-                                        FieldID: 100,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 7,
-                                            },
-                                        },
-                                    },
-                                    {
-                                        FieldID: 101,
-                                        Binlogs: []*datapb.Binlog{
-                                            {
-                                                LogID: 8,
-                                            },
-                                        },
-                                    },
-                                },
-                                CompactionFrom: []int64{8},
-                            },
-                        },
-                    },
-                },
-            },
-        },
+        segments: NewSegmentsInfo(),
     }
+
+    lo.ForEach(segments, func(ds *datapb.SegmentInfo, i int) {
+        s.m.segments.SetSegment(ds.ID, &SegmentInfo{SegmentInfo: ds})
+    })
 }

 func (s *SyncSegmentsSchedulerSuite) SetupTest() {
diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go
index d3df03eaae..7557c853dc 100644
--- a/internal/querynodev2/segments/pool.go
+++ b/internal/querynodev2/segments/pool.go
@@ -65,6 +65,7 @@ func initSQPool() {
         pt.Watch(pt.QueryNodeCfg.MaxReadConcurrency.Key, config.NewHandler("qn.sqpool.maxconc", ResizeSQPool))
         pt.Watch(pt.QueryNodeCfg.CGOPoolSizeRatio.Key, config.NewHandler("qn.sqpool.cgopoolratio", ResizeSQPool))
+        log.Info("init SQPool done", zap.Int("size", initPoolSize))
     })
 }
@@ -78,6 +79,7 @@ func initDynamicPool() {
         )

         dp.Store(pool)
+        log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum()))
     })
 }
@@ -95,6 +97,7 @@ func initLoadPool() {
         loadPool.Store(pool)

         pt.Watch(pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.Key, config.NewHandler("qn.loadpool.middlepriority", ResizeLoadPool))
+        log.Info("init loadPool done", zap.Int("size", poolSize))
     })
 }
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index 075111e7b2..83404096cd 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -1348,7 +1348,7 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
         zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan),
         zap.Duration("appendLoadIndexInfoSpan", appendLoadIndexInfoSpan),
         zap.Duration("updateIndexInfoSpan", updateIndexInfoSpan),
-        zap.Duration("updateIndexInfoSpan", warmupChunkCacheSpan),
+        zap.Duration("warmupChunkCacheSpan", warmupChunkCacheSpan),
     )
     return nil
 }