enhance: [10kcp] Reduce mutex contention in datacoord meta (#38229)

1. Use a secondary index to avoid retrieving all segments in
GetSegmentsChanPart.
2. Batch SetAllocations updates to reduce the number of times the meta
lock is acquired.

issue: https://github.com/milvus-io/milvus/issues/37630

pr: https://github.com/milvus-io/milvus/pull/38219
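
For orientation, here is a minimal, self-contained sketch of the secondary-index idea behind change 1. The names used below (segmentsIndex, insert, selectByCollection) are illustrative stand-ins, not the actual datacoord definitions: keeping a collectionID -> segmentID -> segment map alongside the primary map lets a collection-scoped query touch only that collection's segments instead of iterating all segments.

// Illustrative sketch only; not the actual datacoord types.
package main

import "fmt"

type UniqueID = int64

type SegmentInfo struct {
	ID           UniqueID
	CollectionID UniqueID
}

type segmentsIndex struct {
	segments      map[UniqueID]*SegmentInfo              // primary: segmentID -> segment
	coll2Segments map[UniqueID]map[UniqueID]*SegmentInfo // secondary: collectionID -> segmentID -> segment
}

// insert keeps the secondary index in sync with the primary map.
func (idx *segmentsIndex) insert(seg *SegmentInfo) {
	idx.segments[seg.ID] = seg
	if idx.coll2Segments[seg.CollectionID] == nil {
		idx.coll2Segments[seg.CollectionID] = make(map[UniqueID]*SegmentInfo)
	}
	idx.coll2Segments[seg.CollectionID][seg.ID] = seg
}

// selectByCollection returns one collection's segments without scanning the
// whole primary map.
func (idx *segmentsIndex) selectByCollection(collectionID UniqueID) []*SegmentInfo {
	segs := make([]*SegmentInfo, 0, len(idx.coll2Segments[collectionID]))
	for _, seg := range idx.coll2Segments[collectionID] {
		segs = append(segs, seg)
	}
	return segs
}

func main() {
	idx := &segmentsIndex{
		segments:      make(map[UniqueID]*SegmentInfo),
		coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
	}
	idx.insert(&SegmentInfo{ID: 1, CollectionID: 2})
	idx.insert(&SegmentInfo{ID: 2, CollectionID: 3})
	fmt.Println(len(idx.selectByCollection(2))) // prints 1
}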

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/38249/head
yihao.dai 2024-12-05 11:57:07 +08:00 committed by GitHub
parent 3219b869a3
commit d75fb5b3f8
7 changed files with 152 additions and 99 deletions


@@ -119,14 +119,13 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
return nil, 0, err
}
partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID &&
isSegmentHealthy(segment) &&
partSegments := GetSegmentsChanPart(policy.meta, WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
isFlush(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 // ignore level zero segments
})
}))
views := make([]CompactionView, 0)
// partSegments is list of chanPartSegments, which is channel-partition organized segments


@@ -86,14 +86,13 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
return nil, 0, err
}
partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID &&
isSegmentHealthy(segment) &&
partSegments := GetSegmentsChanPart(policy.meta, WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
isFlush(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() == datapb.SegmentLevel_L2 // only support L2 for now
})
}))
views := make([]CompactionView, 0)
for _, group := range partSegments {


@@ -307,15 +307,20 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
zap.Int64("signal.collectionID", signal.collectionID),
zap.Int64("signal.partitionID", signal.partitionID),
zap.Int64("signal.segmentID", signal.segmentID))
partSegments := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
isSegmentHealthy(segment) &&
filters := []SegmentFilter{SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
isFlush(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
segment.GetLevel() != datapb.SegmentLevel_L2 // ignore l2 segment
}) // partSegments is list of chanPartSegments, which is channel-partition organized segments
})} // partSegments is list of chanPartSegments, which is channel-partition organized segments
// get all segments if signal.collection == 0, otherwise get collection segments
if signal.collectionID != 0 {
filters = append(filters, WithCollection(signal.collectionID))
}
partSegments := GetSegmentsChanPart(t.meta, filters...)
if len(partSegments) == 0 {
log.Info("the length of SegmentsChanPart is 0, skip to handle compaction")
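
One detail worth calling out in the hunk above: the WithCollection filter is appended only when signal.collectionID is non-zero, so a collection-scoped signal can be answered from the collection secondary index, while a global signal (collectionID == 0) still considers segments from all collections.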


@@ -126,23 +126,33 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
},
}
segInfo := &datapb.SegmentInfo{
ID: 1,
CollectionID: collectionID,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs,
Deltalogs: deltaLogs,
}
m := &meta{
catalog: catalog,
channelCPs: newChannelCps(),
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: collectionID,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs,
Deltalogs: deltaLogs,
SegmentInfo: segInfo,
},
},
secondaryIndexes: segmentInfoIndexes{
coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
collectionID: {
1: {
SegmentInfo: segInfo,
},
},
},
},
@@ -215,6 +225,73 @@ func Test_compactionTrigger_force(t *testing.T) {
},
}
seg1 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 1},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 1},
},
},
},
},
}
seg2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 2},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 2},
},
},
},
},
}
seg3 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 1111,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
},
}
tests := []struct {
name string
fields fields
@@ -231,68 +308,18 @@ func Test_compactionTrigger_force(t *testing.T) {
channelCPs: newChannelCps(),
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 1},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 1},
},
},
},
1: seg1,
2: seg2,
3: seg3,
},
secondaryIndexes: segmentInfoIndexes{
coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
2: {
seg1.GetID(): seg1,
seg2.GetID(): seg2,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 2},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 2},
},
},
},
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 1111,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
1111: {
seg3.GetID(): seg3,
},
},
},
@@ -605,7 +632,13 @@ func Test_compactionTrigger_force(t *testing.T) {
t.Run(tt.name+" with DiskANN index", func(t *testing.T) {
for _, segment := range tt.fields.meta.segments.GetSegments() {
// Collection 1000 means it has DiskANN index
delete(tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()], segment.GetID())
segment.CollectionID = 1000
_, ok := tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()]
if !ok {
tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()] = make(map[UniqueID]*SegmentInfo)
}
tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()][segment.GetID()] = segment
}
tr := &compactionTrigger{
meta: tt.fields.meta,
@@ -706,6 +739,9 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
vecFieldID := int64(201)
segmentInfos := &SegmentsInfo{
segments: make(map[UniqueID]*SegmentInfo),
secondaryIndexes: segmentInfoIndexes{
coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
},
}
indexMeta := newSegmentIndexMeta(nil)
@@ -732,6 +768,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
},
}
segmentInfos.secondaryIndexes.coll2Segments[2] = make(map[UniqueID]*SegmentInfo)
for i := UniqueID(0); i < 50; i++ {
info := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
@@ -773,6 +810,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
})
segmentInfos.segments[i] = info
segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
}
tests := []struct {


@@ -336,28 +336,28 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
}
// GetSegmentsChanPart returns segments organized in Channel-Partition dimension with selector applied
func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegments {
m.RLock()
defer m.RUnlock()
mDimEntry := make(map[string]*chanPartSegments)
// TODO: Move this function to the compaction module after reorganizing the DataCoord modules.
func GetSegmentsChanPart(m *meta, filters ...SegmentFilter) []*chanPartSegments {
type dim struct {
partitionID int64
channelName string
}
log.Debug("GetSegmentsChanPart segment number", zap.Int("length", len(m.segments.GetSegments())))
for _, segmentInfo := range m.segments.segments {
if !selector(segmentInfo) {
continue
}
mDimEntry := make(map[dim]*chanPartSegments)
candidates := m.SelectSegments(filters...)
for _, segmentInfo := range candidates {
cloned := segmentInfo.Clone()
dim := fmt.Sprintf("%d-%s", cloned.PartitionID, cloned.InsertChannel)
entry, ok := mDimEntry[dim]
d := dim{cloned.PartitionID, cloned.InsertChannel}
entry, ok := mDimEntry[d]
if !ok {
entry = &chanPartSegments{
collectionID: cloned.CollectionID,
partitionID: cloned.PartitionID,
channelName: cloned.InsertChannel,
}
mDimEntry[dim] = entry
mDimEntry[d] = entry
}
entry.segments = append(entry.segments, cloned)
}
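
A side note on the rewrite above: the grouping key changes from a fmt.Sprintf-formatted string to a (partitionID, channelName) struct, which presumably avoids a string allocation per segment while producing the same channel-partition grouping.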
@@ -1295,6 +1295,16 @@ func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
m.segments.SetAllocations(segmentID, allocations)
}
// SetSegmentsAllocations set Segments allocations, will overwrite ALL original allocations
// Note that allocations is not persisted in KV store
func (m *meta) SetSegmentsAllocations(segmentsAllocations map[int64][]*Allocation) {
m.Lock()
defer m.Unlock()
for segmentID, allocations := range segmentsAllocations {
m.segments.SetAllocations(segmentID, allocations)
}
}
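
For change 2, a stripped-down sketch of the locking difference the new SetSegmentsAllocations targets. The toyMeta type below is a hypothetical stand-in, not the real meta struct: the batched path takes the mutex once for the whole batch instead of once per segment.

// toyMeta is an illustrative stand-in for the meta struct; not actual datacoord code.
package main

import "sync"

type Allocation struct {
	NumOfRows  int64
	ExpireTime uint64
}

type toyMeta struct {
	sync.RWMutex
	allocations map[int64][]*Allocation
}

// Per-segment path: one Lock/Unlock pair per segment.
func (m *toyMeta) SetAllocations(segmentID int64, allocs []*Allocation) {
	m.Lock()
	defer m.Unlock()
	m.allocations[segmentID] = allocs
}

// Batched path: a single Lock/Unlock pair covers every segment in the map.
func (m *toyMeta) SetSegmentsAllocations(batch map[int64][]*Allocation) {
	m.Lock()
	defer m.Unlock()
	for segmentID, allocs := range batch {
		m.allocations[segmentID] = allocs
	}
}

func main() {
	m := &toyMeta{allocations: make(map[int64][]*Allocation)}
	batch := map[int64][]*Allocation{
		1: {{NumOfRows: 10, ExpireTime: 100}},
		2: {{NumOfRows: 20, ExpireTime: 100}},
	}
	m.SetSegmentsAllocations(batch) // one lock acquisition instead of two
}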
// SetCurrentRows set current row count for segment with provided `segmentID`
// Note that currRows is not persisted in KV store
func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) {


@@ -671,7 +671,7 @@ func TestMeta_Basic(t *testing.T) {
})
t.Run("Test GetSegmentsChanPart", func(t *testing.T) {
result := meta.GetSegmentsChanPart(func(*SegmentInfo) bool { return true })
result := GetSegmentsChanPart(meta, SegmentFilterFunc(func(segment *SegmentInfo) bool { return true }))
assert.Equal(t, 2, len(result))
for _, entry := range result {
assert.Equal(t, "c1", entry.channelName)
@@ -682,7 +682,7 @@ func TestMeta_Basic(t *testing.T) {
assert.Equal(t, 1, len(entry.segments))
}
}
result = meta.GetSegmentsChanPart(func(seg *SegmentInfo) bool { return seg.GetCollectionID() == 10 })
result = GetSegmentsChanPart(meta, WithCollection(10))
assert.Equal(t, 0, len(result))
})


@@ -530,6 +530,7 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) {
return
}
segmentsAllocations := make(map[int64][]*Allocation)
growing.Range(func(id int64) bool {
segment := s.meta.GetHealthySegment(id)
if segment == nil {
@@ -546,9 +547,10 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) {
allocations = append(allocations, segment.allocations[i])
}
}
s.meta.SetAllocations(segment.GetID(), allocations)
segmentsAllocations[id] = allocations
return true
})
s.meta.SetSegmentsAllocations(segmentsAllocations)
}
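
With this change, ExpireAllocations first gathers the surviving allocations for every growing segment on the channel into segmentsAllocations and then applies them through a single SetSegmentsAllocations call, so the meta lock is acquired once per expiry pass instead of once per segment.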
func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) {