enhance: Reduce mutex contention in datacoord meta (#38219)
1. Use a secondary index to avoid retrieving all segments in `GetSegmentsChanPart`.
2. Perform batch SetAllocations to reduce the number of times the meta lock is acquired.

issue: https://github.com/milvus-io/milvus/issues/37630

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
parent 5bf1b2b929
commit 272d95ad79
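Both optimizations shrink the time spent under the datacoord meta lock. The first is visible in the hunks below: GetSegmentsChanPart no longer scans every segment and filters by collection, it goes through the per-collection secondary index (coll2Segments), so the lock only covers one collection's segments. The second (batch SetAllocations) has no hunk shown here. What follows is a minimal, self-contained sketch of both ideas with simplified stand-in types; names such as selectByScan, selectByIndex, metaMock, and setAllocationsBatch are illustrative, not the actual Milvus API:

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for datacoord's segment bookkeeping.
type SegmentInfo struct {
	ID           int64
	CollectionID int64
}

type SegmentsInfo struct {
	segments      map[int64]*SegmentInfo           // primary index: segmentID -> segment
	coll2Segments map[int64]map[int64]*SegmentInfo // secondary index: collectionID -> segmentID -> segment
}

// Before: every caller walked all segments and filtered by collection,
// holding the meta lock for O(total segments).
func (s *SegmentsInfo) selectByScan(collID int64) []*SegmentInfo {
	var out []*SegmentInfo
	for _, seg := range s.segments {
		if seg.CollectionID == collID {
			out = append(out, seg)
		}
	}
	return out
}

// After: the secondary index narrows the candidates to one collection,
// so the lock is held for O(segments in that collection).
func (s *SegmentsInfo) selectByIndex(collID int64) []*SegmentInfo {
	candidates := s.coll2Segments[collID]
	out := make([]*SegmentInfo, 0, len(candidates))
	for _, seg := range candidates {
		out = append(out, seg)
	}
	return out
}

// Point 2 of the commit message (no hunk shown for it in this page):
// store allocations for many segments under one lock acquisition instead
// of locking once per segment. metaMock is illustrative only.
type metaMock struct {
	sync.Mutex
	allocations map[int64][]int64
}

func (m *metaMock) setAllocation(segID int64, allocs []int64) {
	m.Lock() // one lock round-trip per segment
	defer m.Unlock()
	m.allocations[segID] = allocs
}

func (m *metaMock) setAllocationsBatch(batch map[int64][]int64) {
	m.Lock() // one lock round-trip for the whole batch
	defer m.Unlock()
	for segID, allocs := range batch {
		m.allocations[segID] = allocs
	}
}

func main() {
	seg := &SegmentInfo{ID: 1, CollectionID: 2}
	s := &SegmentsInfo{
		segments:      map[int64]*SegmentInfo{1: seg},
		coll2Segments: map[int64]map[int64]*SegmentInfo{2: {1: seg}},
	}
	fmt.Println(len(s.selectByScan(2)), len(s.selectByIndex(2))) // 1 1

	m := &metaMock{allocations: make(map[int64][]int64)}
	m.setAllocation(1, []int64{100})
	m.setAllocationsBatch(map[int64][]int64{2: {200}, 3: {300}})
	fmt.Println(len(m.allocations)) // 3
}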
@@ -120,15 +120,14 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
 		return nil, 0, err
 	}

-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
 			!segment.GetIsInvisible()
-	})
+	}))

 	views := make([]CompactionView, 0)
 	// partSegments is list of chanPartSegments, which is channel-partition organized segments
@@ -87,15 +87,14 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
 		return nil, 0, err
 	}

-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() == datapb.SegmentLevel_L2 && // only support L2 for now
 			!segment.GetIsInvisible()
-	})
+	}))

 	views := make([]CompactionView, 0)
 	for _, group := range partSegments {
@@ -128,6 +128,15 @@ func (s *SingleCompactionPolicySuite) TestL2SingleCompaction() {
 	segments[103] = buildTestSegment(101, collID, datapb.SegmentLevel_L2, 100, 10000, 1)
 	segmentsInfo := &SegmentsInfo{
 		segments: segments,
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+				collID: {
+					101: segments[101],
+					102: segments[102],
+					103: segments[103],
+				},
+			},
+		},
 	}

 	compactionTaskMeta := newTestCompactionTaskMeta(s.T())
@@ -291,9 +291,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 		zap.Int64("signal.collectionID", signal.collectionID),
 		zap.Int64("signal.partitionID", signal.partitionID),
 		zap.Int64("signal.segmentID", signal.segmentID))
-	partSegments := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
-			isSegmentHealthy(segment) &&
+	filter := SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
@@ -302,6 +301,17 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 			!segment.GetIsInvisible()
 	}) // partSegments is list of chanPartSegments, which is channel-partition organized segments

+	partSegments := make([]*chanPartSegments, 0)
+	// get all segments if signal.collection == 0, otherwise get collection segments
+	if signal.collectionID != 0 {
+		partSegments = GetSegmentsChanPart(t.meta, signal.collectionID, filter)
+	} else {
+		collections := t.meta.GetCollections()
+		for _, collection := range collections {
+			partSegments = append(partSegments, GetSegmentsChanPart(t.meta, collection.ID, filter)...)
+		}
+	}
+
 	if len(partSegments) == 0 {
 		log.Info("the length of SegmentsChanPart is 0, skip to handle compaction")
 		return nil
@@ -122,24 +122,34 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
 		},
 	}

+	segInfo := &datapb.SegmentInfo{
+		ID:             1,
+		CollectionID:   collectionID,
+		PartitionID:    1,
+		LastExpireTime: 100,
+		NumOfRows:      100,
+		MaxRowNum:      300,
+		InsertChannel:  "ch1",
+		State:          commonpb.SegmentState_Flushed,
+		Binlogs:        binlogs,
+		Deltalogs:      deltaLogs,
+		IsSorted:       true,
+	}
 	m := &meta{
 		catalog:    catalog,
 		channelCPs: newChannelCps(),
 		segments: &SegmentsInfo{
 			segments: map[int64]*SegmentInfo{
 				1: {
-					SegmentInfo: &datapb.SegmentInfo{
-						ID:             1,
-						CollectionID:   collectionID,
-						PartitionID:    1,
-						LastExpireTime: 100,
-						NumOfRows:      100,
-						MaxRowNum:      300,
-						InsertChannel:  "ch1",
-						State:          commonpb.SegmentState_Flushed,
-						Binlogs:        binlogs,
-						Deltalogs:      deltaLogs,
-						IsSorted:       true,
+					SegmentInfo: segInfo,
 				},
 			},
+			secondaryIndexes: segmentInfoIndexes{
+				coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+					collectionID: {
+						1: {
+							SegmentInfo: segInfo,
+						},
+					},
+				},
 			},
@@ -214,6 +224,76 @@ func Test_compactionTrigger_force(t *testing.T) {

 	mock0Allocator := newMock0Allocator(t)

+	seg1 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             1,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			IsSorted: true,
+		},
+	}
+
+	seg2 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             2,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			IsSorted: true,
+		},
+	}
+
+	seg3 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             3,
+			CollectionID:   1111,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			IsSorted:       true,
+		},
+	}
+
 	tests := []struct {
 		name   string
 		fields fields
@@ -230,71 +310,18 @@ func Test_compactionTrigger_force(t *testing.T) {
 				channelCPs: newChannelCps(),
 				segments: &SegmentsInfo{
 					segments: map[int64]*SegmentInfo{
-						1: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             1,
-								CollectionID:   2,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								Binlogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 1},
-										},
-									},
-								},
-								Deltalogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 1},
-										},
-									},
-								},
-								IsSorted: true,
-							},
-						},
-						2: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             2,
-								CollectionID:   2,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								Binlogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 2},
-										},
-									},
-								},
-								Deltalogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 2},
-										},
-									},
-								},
-								IsSorted: true,
-							},
-						},
-						3: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             3,
-								CollectionID:   1111,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								IsSorted:       true,
-							},
-						},
+						1: seg1,
+						2: seg2,
+						3: seg3,
 					},
+					secondaryIndexes: segmentInfoIndexes{
+						coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+							2: {
+								seg1.GetID(): seg1,
+								seg2.GetID(): seg2,
+							},
+							1111: {
+								seg3.GetID(): seg3,
+							},
+						},
+					},
@@ -617,7 +644,13 @@ func Test_compactionTrigger_force(t *testing.T) {
 		t.Run(tt.name+" with DiskANN index", func(t *testing.T) {
 			for _, segment := range tt.fields.meta.segments.GetSegments() {
 				// Collection 1000 means it has DiskANN index
+				delete(tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()], segment.GetID())
 				segment.CollectionID = 1000
+				_, ok := tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()]
+				if !ok {
+					tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()] = make(map[UniqueID]*SegmentInfo)
+				}
+				tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()][segment.GetID()] = segment
 			}
 			tr := &compactionTrigger{
 				meta: tt.fields.meta,
@@ -725,6 +758,9 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 	vecFieldID := int64(201)
 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
+		},
 	}

 	indexMeta := newSegmentIndexMeta(nil)
@@ -751,6 +787,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 		},
 	}

+	segmentInfos.secondaryIndexes.coll2Segments[2] = make(map[UniqueID]*SegmentInfo)
 	nSegments := 50
 	for i := UniqueID(0); i < UniqueID(nSegments); i++ {
 		info := &SegmentInfo{
@@ -794,6 +831,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 		})

 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
 	}

 	mock0Allocator := newMockAllocator(t)
@@ -1110,15 +1148,28 @@ func mockSegment(segID, rows, deleteRows, sizeInMB int64) *datapb.SegmentInfo {

 func mockSegmentsInfo(sizeInMB ...int64) *SegmentsInfo {
 	segments := make(map[int64]*SegmentInfo, len(sizeInMB))
+	collectionID := int64(2)
+	channel := "ch1"
+	coll2Segments := make(map[UniqueID]map[UniqueID]*SegmentInfo)
+	coll2Segments[collectionID] = make(map[UniqueID]*SegmentInfo)
+	channel2Segments := make(map[string]map[UniqueID]*SegmentInfo)
+	channel2Segments[channel] = make(map[UniqueID]*SegmentInfo)
 	for i, size := range sizeInMB {
 		segId := int64(i + 1)
-		segments[segId] = &SegmentInfo{
+		info := &SegmentInfo{
 			SegmentInfo:   mockSegment(segId, size, 1, size),
 			lastFlushTime: time.Now().Add(-100 * time.Minute),
 		}
+		segments[segId] = info
+		coll2Segments[collectionID][segId] = info
+		channel2Segments[channel][segId] = info
 	}
 	return &SegmentsInfo{
 		segments: segments,
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    coll2Segments,
+			channel2Segments: channel2Segments,
+		},
 	}
 }

@@ -1564,6 +1615,10 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {

 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    map[UniqueID]map[UniqueID]*SegmentInfo{2: {}},
+			channel2Segments: map[string]map[UniqueID]*SegmentInfo{"ch1": {}},
+		},
 	}

 	size := []int64{
@@ -1636,6 +1691,8 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
 		})

 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
+		segmentInfos.secondaryIndexes.channel2Segments["ch1"][i] = info
 	}

 	mock0Allocator := newMockAllocator(t)
@@ -350,9 +350,8 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
 }

 // GetSegmentsChanPart returns segments organized in Channel-Partition dimension with selector applied
-func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegments {
-	m.RLock()
-	defer m.RUnlock()
+// TODO: Move this function to the compaction module after reorganizing the DataCoord modules.
+func GetSegmentsChanPart(m *meta, collectionID int64, filters ...SegmentFilter) []*chanPartSegments {
 	type dim struct {
 		partitionID int64
 		channelName string
@@ -360,10 +359,9 @@ func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegm

 	mDimEntry := make(map[dim]*chanPartSegments)

-	for _, si := range m.segments.segments {
-		if !selector(si) {
-			continue
-		}
+	filters = append(filters, WithCollection(collectionID))
+	candidates := m.SelectSegments(context.Background(), filters...)
+	for _, si := range candidates {
 		d := dim{si.PartitionID, si.InsertChannel}
 		entry, ok := mDimEntry[d]
 		if !ok {
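The rewritten GetSegmentsChanPart above selects candidates through SelectSegments with a WithCollection filter, then buckets them by (partitionID, channelName). Below is a self-contained sketch of that grouping step with simplified stand-in types; dim and chanPartSegments mirror the diff, while the groupChanPart helper and the sample data are made up for illustration:

package main

import "fmt"

type SegmentInfo struct {
	ID            int64
	PartitionID   int64
	InsertChannel string
}

type chanPartSegments struct {
	partitionID int64
	channelName string
	segments    []*SegmentInfo
}

// groupChanPart buckets candidates by (partition, channel), as
// GetSegmentsChanPart does after the secondary-index selection.
func groupChanPart(candidates []*SegmentInfo) []*chanPartSegments {
	type dim struct {
		partitionID int64
		channelName string
	}
	mDimEntry := make(map[dim]*chanPartSegments)
	result := make([]*chanPartSegments, 0)
	for _, si := range candidates {
		d := dim{si.PartitionID, si.InsertChannel}
		entry, ok := mDimEntry[d]
		if !ok {
			entry = &chanPartSegments{partitionID: si.PartitionID, channelName: si.InsertChannel}
			mDimEntry[d] = entry
			result = append(result, entry)
		}
		entry.segments = append(entry.segments, si)
	}
	return result
}

func main() {
	segs := []*SegmentInfo{
		{ID: 1, PartitionID: 10, InsertChannel: "ch1"},
		{ID: 2, PartitionID: 10, InsertChannel: "ch1"},
		{ID: 3, PartitionID: 11, InsertChannel: "ch1"},
	}
	for _, g := range groupChanPart(segs) {
		fmt.Printf("part=%d chan=%s segments=%d\n", g.partitionID, g.channelName, len(g.segments))
	}
}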
@@ -609,7 +609,7 @@ func TestMeta_Basic(t *testing.T) {
 	})

 	t.Run("Test GetSegmentsChanPart", func(t *testing.T) {
-		result := meta.GetSegmentsChanPart(func(*SegmentInfo) bool { return true })
+		result := GetSegmentsChanPart(meta, collID, SegmentFilterFunc(func(segment *SegmentInfo) bool { return true }))
 		assert.Equal(t, 2, len(result))
 		for _, entry := range result {
 			assert.Equal(t, "c1", entry.channelName)
@@ -620,7 +620,7 @@ func TestMeta_Basic(t *testing.T) {
 				assert.Equal(t, 1, len(entry.segments))
 			}
 		}
-		result = meta.GetSegmentsChanPart(func(seg *SegmentInfo) bool { return seg.GetCollectionID() == 10 })
+		result = GetSegmentsChanPart(meta, 10)
 		assert.Equal(t, 0, len(result))
 	})

@@ -144,7 +144,7 @@ func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*Segmen

 	// apply criterion
 	candidates := s.getCandidates(criterion)
-	var result []*SegmentInfo
+	result := make([]*SegmentInfo, 0, len(candidates))
 	for _, segment := range candidates {
 		if criterion.Match(segment) {
 			result = append(result, segment)