enhance: decrease CPU overhead when filtering segments on datacoord (#34231)

issue: https://github.com/milvus-io/milvus/issues/33129
pr: #33130 
pr: #33373

---------

Signed-off-by: jaime <yun.zhang@zilliz.com>
jaime 2024-07-01 10:20:08 +08:00 committed by GitHub
parent 0992f10694
commit b37d6fa0f9
9 changed files with 466 additions and 306 deletions
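
For context, the hot path this PR targets is segment filtering in DataCoord's meta: previously every selector walked the full segments map and evaluated closure filters per segment, while this change maintains secondary maps keyed by collection ID and insert channel so those lookups become a single map access. A minimal, self-contained sketch of the idea (simplified types and names, not the actual Milvus structs):

package main

import "fmt"

type segment struct {
	ID           int64
	CollectionID int64
	Channel      string
}

// store mirrors the idea of SegmentsInfo: one primary map plus two
// secondary indexes, so filtering by collection or channel never has to
// scan every segment (simplified stand-in, not the Milvus type).
type store struct {
	segments  map[int64]*segment
	byColl    map[int64]map[int64]*segment
	byChannel map[string]map[int64]*segment
}

func newStore() *store {
	return &store{
		segments:  make(map[int64]*segment),
		byColl:    make(map[int64]map[int64]*segment),
		byChannel: make(map[string]map[int64]*segment),
	}
}

// set keeps the secondary indexes in sync with the primary map.
func (s *store) set(seg *segment) {
	s.segments[seg.ID] = seg
	if s.byColl[seg.CollectionID] == nil {
		s.byColl[seg.CollectionID] = make(map[int64]*segment)
	}
	s.byColl[seg.CollectionID][seg.ID] = seg
	if s.byChannel[seg.Channel] == nil {
		s.byChannel[seg.Channel] = make(map[int64]*segment)
	}
	s.byChannel[seg.Channel][seg.ID] = seg
}

func main() {
	st := newStore()
	st.set(&segment{ID: 1, CollectionID: 100, Channel: "ch1"})
	st.set(&segment{ID: 2, CollectionID: 200, Channel: "ch2"})
	// O(1) candidate lookup instead of iterating st.segments.
	fmt.Println(len(st.byColl[100]), len(st.byChannel["ch2"])) // 1 1
}

The real implementation below keeps the same two indexes (coll2Segments, channel2Segments) inside SegmentsInfo and updates them from SetSegment and DropSegment.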


@@ -2197,34 +2197,63 @@ func (s *CompactionTriggerSuite) SetupTest() {
 	catalog := mocks.NewDataCoordCatalog(s.T())
 	catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, s.channel, mock.Anything).Return(nil)
+	seg1 := &SegmentInfo{
+		SegmentInfo:   s.genSeg(1, 60),
+		lastFlushTime: time.Now().Add(-100 * time.Minute),
+	}
+	seg2 := &SegmentInfo{
+		SegmentInfo:   s.genSeg(2, 60),
+		lastFlushTime: time.Now(),
+	}
+	seg3 := &SegmentInfo{
+		SegmentInfo:   s.genSeg(3, 60),
+		lastFlushTime: time.Now(),
+	}
+	seg4 := &SegmentInfo{
+		SegmentInfo:   s.genSeg(4, 60),
+		lastFlushTime: time.Now(),
+	}
+	seg5 := &SegmentInfo{
+		SegmentInfo:   s.genSeg(5, 60),
+		lastFlushTime: time.Now(),
+	}
+	seg6 := &SegmentInfo{
+		SegmentInfo:   s.genSeg(6, 60),
+		lastFlushTime: time.Now(),
+	}
 	s.meta = &meta{
 		channelCPs: newChannelCps(),
 		catalog:    catalog,
 		segments: &SegmentsInfo{
 			segments: map[int64]*SegmentInfo{
-				1: {
-					SegmentInfo:   s.genSeg(1, 60),
-					lastFlushTime: time.Now().Add(-100 * time.Minute),
-				},
-				2: {
-					SegmentInfo:   s.genSeg(2, 60),
-					lastFlushTime: time.Now(),
-				},
-				3: {
-					SegmentInfo:   s.genSeg(3, 60),
-					lastFlushTime: time.Now(),
-				},
-				4: {
-					SegmentInfo:   s.genSeg(4, 60),
-					lastFlushTime: time.Now(),
-				},
-				5: {
-					SegmentInfo:   s.genSeg(5, 26),
-					lastFlushTime: time.Now(),
-				},
-				6: {
-					SegmentInfo:   s.genSeg(6, 26),
-					lastFlushTime: time.Now(),
-				},
+				1: seg1,
+				2: seg2,
+				3: seg3,
+				4: seg4,
+				5: seg5,
+				6: seg6,
 			},
+			secondaryIndexes: segmentInfoIndexes{
+				coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+					s.collectionID: {
+						1: seg1,
+						2: seg2,
+						3: seg3,
+						4: seg4,
+						5: seg5,
+						6: seg6,
+					},
+				},
+				channel2Segments: map[string]map[UniqueID]*SegmentInfo{
+					s.channel: {
+						1: seg1,
+						2: seg2,
+						3: seg3,
+						4: seg4,
+						5: seg5,
+						6: seg6,
+					},
+				},
+			},
 		},
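
A side note on this fixture: because it builds SegmentsInfo as a literal, it has to fill in secondaryIndexes by hand and keep them consistent with the primary segments map. A less error-prone alternative, used later in this PR by the SyncSegmentsScheduler test, is to construct the store through NewSegmentsInfo and SetSegment, which maintain both indexes automatically. A hedged sketch of that shape for this suite (it glosses over the per-segment lastFlushTime differences the real setup needs):

// Sketch only: relies on genSeg, newChannelCps and the SegmentInfo wrapper
// already used by this suite; not part of the PR.
segments := NewSegmentsInfo()
for i := int64(1); i <= 6; i++ {
	segments.SetSegment(i, &SegmentInfo{
		SegmentInfo:   s.genSeg(i, 60),
		lastFlushTime: time.Now(), // seg1 in the real setup uses time.Now().Add(-100 * time.Minute)
	})
}
s.meta = &meta{
	channelCPs: newChannelCps(),
	catalog:    catalog,
	segments:   segments,
}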


@@ -465,7 +465,14 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
 			},
 		},
 		segID + 1: {
-			SegmentInfo: nil,
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            segID + 1,
+				CollectionID:  collID,
+				PartitionID:   partID,
+				InsertChannel: "",
+				NumOfRows:     1026,
+				State:         commonpb.SegmentState_Dropped,
+			},
 		},
 	}
 	meta := &meta{


@@ -938,7 +938,12 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
 			WriteHandoff: false,
 		})
 		s.meta.segments.SetSegment(segID, &SegmentInfo{
-			SegmentInfo:   nil,
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            segID,
+				CollectionID:  collID,
+				PartitionID:   partID,
+				InsertChannel: "ch",
+			},
 			currRows:      0,
 			allocations:   nil,
 			lastFlushTime: time.Time{},
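
Both fixtures above previously stored SegmentInfo: nil; with the secondary indexes, every segment registered through SetSegment is now filed under its collection ID and insert channel, so the tests supply real datapb.SegmentInfo values. Worth noting that the generated protobuf getters are nil-safe, so a nil inner message would not panic; it would just be indexed under collection 0 and channel "", which is presumably why the fixtures were filled in. A hedged illustration (not code from this PR):

// Illustration only: protoc-generated getters tolerate a nil receiver.
var info *datapb.SegmentInfo // nil
_ = info.GetCollectionID()   // returns 0, no panic
_ = info.GetInsertChannel()  // returns "", no panic
// A nil-backed segment would therefore land in coll2Segments[0] and
// channel2Segments[""] rather than failing loudly.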


@@ -22,6 +22,7 @@ import (
	"github.com/cockroachdb/errors"
	"github.com/golang/protobuf/proto"
	"github.com/samber/lo"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"
@@ -982,6 +983,97 @@ func Test_meta_GetSegmentsOfCollection(t *testing.T) {
		assert.True(t, ok)
		assert.Equal(t, expected, gotInfo.GetState())
	}
got = m.GetSegmentsOfCollection(-1)
assert.Equal(t, 3, len(got))
got = m.GetSegmentsOfCollection(10)
assert.Equal(t, 0, len(got))
}
func Test_meta_GetSegmentsWithChannel(t *testing.T) {
storedSegments := NewSegmentsInfo()
for segID, segment := range map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
InsertChannel: "h1",
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
InsertChannel: "h2",
State: commonpb.SegmentState_Growing,
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
State: commonpb.SegmentState_Flushed,
InsertChannel: "h1",
},
},
} {
storedSegments.SetSegment(segID, segment)
}
m := &meta{segments: storedSegments}
got := m.GetSegmentsByChannel("h1")
assert.Equal(t, 2, len(got))
assert.ElementsMatch(t, []int64{1, 3}, lo.Map(
got,
func(s *SegmentInfo, i int) int64 {
return s.ID
},
))
got = m.GetSegmentsByChannel("h3")
assert.Equal(t, 0, len(got))
got = m.SelectSegments(WithCollection(1), WithChannel("h1"), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment != nil && segment.GetState() == commonpb.SegmentState_Flushed
}))
assert.Equal(t, 1, len(got))
assert.ElementsMatch(t, []int64{1}, lo.Map(
got,
func(s *SegmentInfo, i int) int64 {
return s.ID
},
))
m.segments.DropSegment(3)
_, ok := m.segments.secondaryIndexes.coll2Segments[2]
assert.False(t, ok)
assert.Equal(t, 1, len(m.segments.secondaryIndexes.coll2Segments))
assert.Equal(t, 2, len(m.segments.secondaryIndexes.channel2Segments))
segments, ok := m.segments.secondaryIndexes.channel2Segments["h1"]
assert.True(t, ok)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(1), segments[1].ID)
segments, ok = m.segments.secondaryIndexes.channel2Segments["h2"]
assert.True(t, ok)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(2), segments[2].ID)
m.segments.DropSegment(2)
segments, ok = m.segments.secondaryIndexes.coll2Segments[1]
assert.True(t, ok)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(1), segments[1].ID)
assert.Equal(t, 1, len(m.segments.secondaryIndexes.coll2Segments))
assert.Equal(t, 1, len(m.segments.secondaryIndexes.channel2Segments))
segments, ok = m.segments.secondaryIndexes.channel2Segments["h1"]
assert.True(t, ok)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(1), segments[1].ID)
_, ok = m.segments.secondaryIndexes.channel2Segments["h2"]
assert.False(t, ok)
}

func TestMeta_HasSegments(t *testing.T) {


@@ -33,11 +33,16 @@ import (
 // SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
 type SegmentsInfo struct {
 	segments     map[UniqueID]*SegmentInfo
-	collSegments map[UniqueID]*CollectionSegments
+	secondaryIndexes segmentInfoIndexes
 	compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key.
 	// A segment can be compacted to only one segment finally in meta.
 }

+type segmentInfoIndexes struct {
+	coll2Segments    map[UniqueID]map[UniqueID]*SegmentInfo
+	channel2Segments map[string]map[UniqueID]*SegmentInfo
+}
+
 // SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
 type SegmentInfo struct {
 	*datapb.SegmentInfo
@@ -70,15 +75,14 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
 func NewSegmentsInfo() *SegmentsInfo {
 	return &SegmentsInfo{
 		segments:     make(map[UniqueID]*SegmentInfo),
-		collSegments: make(map[UniqueID]*CollectionSegments),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    make(map[UniqueID]map[UniqueID]*SegmentInfo),
+			channel2Segments: make(map[string]map[UniqueID]*SegmentInfo),
+		},
 		compactionTo: make(map[UniqueID]UniqueID),
 	}
 }

-type CollectionSegments struct {
-	segments map[int64]*SegmentInfo
-}
-
 // GetSegment returns SegmentInfo
 // the logPath in meta is empty
 func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
@@ -96,24 +100,42 @@ func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
 	return lo.Values(s.segments)
 }

+func (s *SegmentsInfo) getCandidates(criterion *segmentCriterion) map[UniqueID]*SegmentInfo {
+	if criterion.collectionID > 0 {
+		collSegments, ok := s.secondaryIndexes.coll2Segments[criterion.collectionID]
+		if !ok {
+			return nil
+		}
+		// both collection id and channel are filters of criterion
+		if criterion.channel != "" {
+			return lo.OmitBy(collSegments, func(k UniqueID, v *SegmentInfo) bool {
+				return v.InsertChannel != criterion.channel
+			})
+		}
+		return collSegments
+	}
+
+	if criterion.channel != "" {
+		channelSegments, ok := s.secondaryIndexes.channel2Segments[criterion.channel]
+		if !ok {
+			return nil
+		}
+		return channelSegments
+	}
+
+	return s.segments
+}
+
 func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*SegmentInfo {
 	criterion := &segmentCriterion{}
 	for _, filter := range filters {
 		filter.AddFilter(criterion)
 	}
-	var result []*SegmentInfo
-	var candidates []*SegmentInfo
 	// apply criterion
-	switch {
-	case criterion.collectionID > 0:
-		collSegments, ok := s.collSegments[criterion.collectionID]
-		if !ok {
-			return nil
-		}
-		candidates = lo.Values(collSegments.segments)
-	default:
-		candidates = lo.Values(s.segments)
-	}
+	candidates := s.getCandidates(criterion)
+	var result []*SegmentInfo
 	for _, segment := range candidates {
 		if criterion.Match(segment) {
 			result = append(result, segment)
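
With getCandidates in place, a selector carrying a collection ID and/or a channel only ever visits the matching buckets; the remaining SegmentFilterFunc predicates run on those candidates alone. A usage sketch in the spirit of the tests in this PR (internal/datacoord package context assumed; collectionID and channelName are placeholder values):

// Collection and channel are resolved through the secondary indexes;
// only the state predicate is evaluated per candidate segment.
flushed := m.SelectSegments(
	WithCollection(collectionID),
	WithChannel(channelName),
	SegmentFilterFunc(func(seg *SegmentInfo) bool {
		return seg.GetState() == commonpb.SegmentState_Flushed
	}),
)

SelectSegments here is the meta-level wrapper exercised by Test_meta_GetSegmentsWithChannel above; GetSegmentsBySelector is the underlying SegmentsInfo method it delegates to.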
@@ -144,7 +166,7 @@ func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool)
 func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
 	if segment, ok := s.segments[segmentID]; ok {
 		s.deleteCompactTo(segment)
-		s.delCollection(segment)
+		s.removeSecondaryIndex(segment)
 		delete(s.segments, segmentID)
 	}
 }
@@ -156,10 +178,10 @@ func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
 	if segment, ok := s.segments[segmentID]; ok {
 		// Remove old segment compact to relation first.
 		s.deleteCompactTo(segment)
-		s.delCollection(segment)
+		s.removeSecondaryIndex(segment)
 	}
 	s.segments[segmentID] = segment
-	s.addCollection(segment)
+	s.addSecondaryIndex(segment)
 	s.addCompactTo(segment)
 }
@@ -296,27 +318,35 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
 	return cloned
 }

-func (s *SegmentsInfo) addCollection(segment *SegmentInfo) {
+func (s *SegmentsInfo) addSecondaryIndex(segment *SegmentInfo) {
 	collID := segment.GetCollectionID()
-	collSegment, ok := s.collSegments[collID]
-	if !ok {
-		collSegment = &CollectionSegments{
-			segments: make(map[UniqueID]*SegmentInfo),
-		}
-		s.collSegments[collID] = collSegment
+	channel := segment.GetInsertChannel()
+	if _, ok := s.secondaryIndexes.coll2Segments[collID]; !ok {
+		s.secondaryIndexes.coll2Segments[collID] = make(map[UniqueID]*SegmentInfo)
 	}
-	collSegment.segments[segment.GetID()] = segment
+	s.secondaryIndexes.coll2Segments[collID][segment.ID] = segment
+	if _, ok := s.secondaryIndexes.channel2Segments[channel]; !ok {
+		s.secondaryIndexes.channel2Segments[channel] = make(map[UniqueID]*SegmentInfo)
+	}
+	s.secondaryIndexes.channel2Segments[channel][segment.ID] = segment
 }

-func (s *SegmentsInfo) delCollection(segment *SegmentInfo) {
+func (s *SegmentsInfo) removeSecondaryIndex(segment *SegmentInfo) {
 	collID := segment.GetCollectionID()
-	collSegment, ok := s.collSegments[collID]
-	if !ok {
-		return
+	channel := segment.GetInsertChannel()
+	if segments, ok := s.secondaryIndexes.coll2Segments[collID]; ok {
+		delete(segments, segment.ID)
+		if len(segments) == 0 {
+			delete(s.secondaryIndexes.coll2Segments, collID)
+		}
+	}
+	if segments, ok := s.secondaryIndexes.channel2Segments[channel]; ok {
+		delete(segments, segment.ID)
+		if len(segments) == 0 {
+			delete(s.secondaryIndexes.channel2Segments, channel)
+		}
 	}
-	delete(collSegment.segments, segment.GetID())
-	if len(collSegment.segments) == 0 {
-		delete(s.collSegments, segment.GetCollectionID())
-	}
 }
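
The invariant these two helpers maintain is that coll2Segments and channel2Segments always mirror the primary segments map, with empty buckets pruned on removal. A hedged consistency check one could add to the package tests (hypothetical helper in a _test.go file of the same package, not part of the PR):

// checkSecondaryIndexes verifies that every segment in the primary map is
// reachable through both secondary indexes; entries are only ever added
// alongside the primary map, so this covers the interesting direction.
func checkSecondaryIndexes(t *testing.T, s *SegmentsInfo) {
	t.Helper()
	for id, seg := range s.segments {
		if got := s.secondaryIndexes.coll2Segments[seg.GetCollectionID()][id]; got != seg {
			t.Errorf("segment %d not indexed by collection %d", id, seg.GetCollectionID())
		}
		if got := s.secondaryIndexes.channel2Segments[seg.GetInsertChannel()][id]; got != seg {
			t.Errorf("segment %d not indexed by channel %q", id, seg.GetInsertChannel())
		}
	}
}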


@@ -31,6 +31,7 @@ func SetMaxRowCount(maxRow int64) SegmentOperator {
 type segmentCriterion struct {
 	collectionID int64
+	channel      string
 	others       []SegmentFilter
 }

@@ -62,6 +63,21 @@ func WithCollection(collectionID int64) SegmentFilter {
 	return CollectionFilter(collectionID)
 }

+type ChannelFilter string
+
+func (f ChannelFilter) Match(segment *SegmentInfo) bool {
+	return segment.GetInsertChannel() == string(f)
+}
+
+func (f ChannelFilter) AddFilter(criterion *segmentCriterion) {
+	criterion.channel = string(f)
+}
+
+// WithChannel returns a SegmentFilter by insert channel. WithCollection has a higher priority if both WithCollection and WithChannel are in the condition together.
+func WithChannel(channel string) SegmentFilter {
+	return ChannelFilter(channel)
+}
+
 type SegmentFilterFunc func(*SegmentInfo) bool

 func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool {
@@ -71,9 +87,3 @@ func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool {

 func (f SegmentFilterFunc) AddFilter(criterion *segmentCriterion) {
 	criterion.others = append(criterion.others, f)
 }
-
-func WithChannel(channel string) SegmentFilter {
-	return SegmentFilterFunc(func(si *SegmentInfo) bool {
-		return si.GetInsertChannel() == channel
-	})
-}
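
The difference from the removed WithChannel is in AddFilter: the old closure only ended up in criterion.others, so every segment still had to be scanned, while the new ChannelFilter records criterion.channel, which getCandidates turns into a direct channel2Segments lookup. A small usage sketch (hypothetical channel name, internal/datacoord package context assumed):

// Served by the channel2Segments index; Match only re-checks candidates
// that were already bucketed under this channel.
byChannel := m.segments.GetSegmentsBySelector(WithChannel("by-dev-rootcoord-dml_0"))
for _, seg := range byChannel {
	_ = seg.GetID() // every segment returned belongs to the requested channel
}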


@@ -22,6 +22,7 @@ import (
	"testing"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"
@@ -43,6 +44,220 @@ func Test_SyncSegmentsSchedulerSuite(t *testing.T) {
}

func (s *SyncSegmentsSchedulerSuite) initParams() {
segments := []*datapb.SegmentInfo{
{
ID: 5,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "channel1",
NumOfRows: 3000,
State: commonpb.SegmentState_Dropped,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 1,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 2,
},
},
},
},
},
{
ID: 6,
CollectionID: 1,
PartitionID: 3,
InsertChannel: "channel1",
NumOfRows: 3000,
State: commonpb.SegmentState_Dropped,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 3,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 4,
},
},
},
},
},
{
ID: 9,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "channel1",
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 9,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 10,
},
},
},
},
CompactionFrom: []int64{5},
},
{
ID: 10,
CollectionID: 1,
PartitionID: 3,
InsertChannel: "channel1",
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 7,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 8,
},
},
},
},
CompactionFrom: []int64{6},
},
{
ID: 7,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "channel2",
NumOfRows: 3000,
State: commonpb.SegmentState_Dropped,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 5,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 6,
},
},
},
},
},
{
ID: 8,
CollectionID: 1,
PartitionID: 3,
InsertChannel: "channel2",
NumOfRows: 3000,
State: commonpb.SegmentState_Dropped,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 7,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 8,
},
},
},
},
},
{
ID: 11,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "channel2",
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 5,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 6,
},
},
},
},
CompactionFrom: []int64{7},
},
{
ID: 12,
CollectionID: 1,
PartitionID: 3,
InsertChannel: "channel2",
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 7,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 8,
},
},
},
},
CompactionFrom: []int64{8},
},
}
	s.m = &meta{
		RWMutex: sync.RWMutex{},
		collections: map[UniqueID]*collectionInfo{

@@ -72,243 +287,12 @@ func (s *SyncSegmentsSchedulerSuite) initParams() {
			},
			2: nil,
		},
		segments: &SegmentsInfo{
collSegments: map[UniqueID]*CollectionSegments{
1: {
segments: map[UniqueID]*SegmentInfo{
5: {
SegmentInfo: &datapb.SegmentInfo{
ID: 5,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "channel1",
NumOfRows: 3000,
State: commonpb.SegmentState_Dropped,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 1,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 2,
},
},
},
},
},
},
6: {
SegmentInfo: &datapb.SegmentInfo{
ID: 6,
CollectionID: 1,
PartitionID: 3,
InsertChannel: "channel1",
NumOfRows: 3000,
State: commonpb.SegmentState_Dropped,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 3,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 4,
},
},
},
},
},
},
9: {
SegmentInfo: &datapb.SegmentInfo{
ID: 9,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "channel1",
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 9,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 10,
},
},
},
},
CompactionFrom: []int64{5},
},
},
10: {
SegmentInfo: &datapb.SegmentInfo{
ID: 10,
CollectionID: 1,
PartitionID: 3,
InsertChannel: "channel1",
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 7,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 8,
},
},
},
},
CompactionFrom: []int64{6},
},
},
7: {
SegmentInfo: &datapb.SegmentInfo{
ID: 7,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "channel2",
NumOfRows: 3000,
State: commonpb.SegmentState_Dropped,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 5,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 6,
},
},
},
},
},
},
8: {
SegmentInfo: &datapb.SegmentInfo{
ID: 8,
CollectionID: 1,
PartitionID: 3,
InsertChannel: "channel2",
NumOfRows: 3000,
State: commonpb.SegmentState_Dropped,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 7,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 8,
},
},
},
},
},
},
11: {
SegmentInfo: &datapb.SegmentInfo{
ID: 11,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "channel2",
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 5,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 6,
},
},
},
},
CompactionFrom: []int64{7},
},
},
12: {
SegmentInfo: &datapb.SegmentInfo{
ID: 12,
CollectionID: 1,
PartitionID: 3,
InsertChannel: "channel2",
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
LogID: 7,
},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogID: 8,
},
},
},
},
CompactionFrom: []int64{8},
},
},
},
},
},
},
		segments: NewSegmentsInfo(),
	}
lo.ForEach(segments, func(ds *datapb.SegmentInfo, i int) {
s.m.segments.SetSegment(ds.ID, &SegmentInfo{SegmentInfo: ds})
})
} }
func (s *SyncSegmentsSchedulerSuite) SetupTest() { func (s *SyncSegmentsSchedulerSuite) SetupTest() {


@@ -65,6 +65,7 @@ func initSQPool() {
		pt.Watch(pt.QueryNodeCfg.MaxReadConcurrency.Key, config.NewHandler("qn.sqpool.maxconc", ResizeSQPool))
		pt.Watch(pt.QueryNodeCfg.CGOPoolSizeRatio.Key, config.NewHandler("qn.sqpool.cgopoolratio", ResizeSQPool))
		log.Info("init SQPool done", zap.Int("size", initPoolSize))
	})
}

@@ -78,6 +79,7 @@ func initDynamicPool() {
		)
		dp.Store(pool)
		log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum()))
	})
}

@@ -95,6 +97,7 @@ func initLoadPool() {
		loadPool.Store(pool)
		pt.Watch(pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.Key, config.NewHandler("qn.loadpool.middlepriority", ResizeLoadPool))
		log.Info("init loadPool done", zap.Int("size", poolSize))
	})
}


@@ -1348,7 +1348,7 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
 		zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan),
 		zap.Duration("appendLoadIndexInfoSpan", appendLoadIndexInfoSpan),
 		zap.Duration("updateIndexInfoSpan", updateIndexInfoSpan),
-		zap.Duration("updateIndexInfoSpan", warmupChunkCacheSpan),
+		zap.Duration("warmupChunkCacheSpan", warmupChunkCacheSpan),
 	)
 	return nil
 }