fix: lock starvation caused by slow GetCompactionTo method when there are too many segments (#30963)

issue: #30823

Signed-off-by: chyezh <chyezh@outlook.com>
pull/31001/head^2
chyezh 2024-03-05 10:04:59 +08:00 committed by GitHub
parent 52540fecb2
commit 8f7019468f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 591 additions and 240 deletions

View File

@ -115,7 +115,7 @@ func Test_compactionTrigger_force(t *testing.T) {
&meta{
catalog: catalog,
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
@ -871,7 +871,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
indexMeta: newSegmentIndexMeta(nil),
// 4 segment
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
@ -1052,7 +1052,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
&meta{
// 8 small segments
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: genSeg(1, 20),
lastFlushTime: time.Now().Add(-100 * time.Minute),
@ -1232,7 +1232,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
&meta{
// 4 small segments
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: genSeg(1, 20),
lastFlushTime: time.Now().Add(-100 * time.Minute),
@ -1419,7 +1419,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
&meta{
// 4 small segments
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: genSeg(1, 60),
lastFlushTime: time.Now().Add(-100 * time.Minute),
@ -2183,7 +2183,7 @@ func (s *CompactionTriggerSuite) SetupTest() {
s.channel = "dml_0_100v0"
s.meta = &meta{
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: s.genSeg(1, 60),
lastFlushTime: time.Now().Add(-100 * time.Minute),

View File

@ -798,7 +798,8 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: channelCPs,
segments: &SegmentsInfo{
map[UniqueID]*SegmentInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
@ -1107,6 +1108,192 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
},
}
for segID, segment := range map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 5000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 0,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "log1",
LogSize: 1024,
},
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
LogPath: "log2",
LogSize: 1024,
},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "del_log1",
LogSize: 1024,
},
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
LogPath: "del_log2",
LogSize: 1024,
},
},
},
},
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "stats_log1",
LogSize: 1024,
},
},
},
},
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 5000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 0,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 2,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
CompactionFrom: []int64{segID, segID + 1},
},
},
segID + 3: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 3,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
CompactionFrom: nil,
},
},
segID + 4: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 4,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 12000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
CompactionFrom: []int64{segID + 2, segID + 3},
},
},
segID + 5: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 5,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: 0,
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 1200,
},
},
},
segID + 6: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 6,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: uint64(time.Now().Add(time.Hour).UnixNano()),
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
Compacted: true,
},
},
// compacted and child is GCed, dml pos is bigger than channel cp
segID + 7: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 7,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: 0,
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 1200,
},
Compacted: true,
},
},
} {
m.segments.SetSegment(segID, segment)
}
cm := &mocks.ChunkManager{}
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
gc := newGarbageCollector(

View File

@ -527,53 +527,56 @@ func TestServer_AlterIndex(t *testing.T) {
meta: &meta{
catalog: catalog,
indexMeta: indexMeta,
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: invalidSegID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: invalidSegID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timesamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
CreatedByCompaction: true,
CompactionFrom: []int64{segID - 1},
},
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
CreatedByCompaction: true,
CompactionFrom: []int64{segID - 1},
},
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
}},
},
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
@ -694,28 +697,31 @@ func TestServer_GetIndexState(t *testing.T) {
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
},
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
}},
},
}
t.Run("index state is unissued", func(t *testing.T) {
@ -768,28 +774,31 @@ func TestServer_GetIndexState(t *testing.T) {
},
},
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
}},
},
}
t.Run("index state is none", func(t *testing.T) {
@ -853,7 +862,7 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
meta: &meta{
catalog: indexMeta.catalog,
indexMeta: indexMeta,
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{}},
segments: NewSegmentsInfo(),
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
@ -976,7 +985,7 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
meta: &meta{
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
indexMeta: newSegmentIndexMeta(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}),
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{}},
segments: NewSegmentsInfo(),
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
@ -1412,53 +1421,56 @@ func TestServer_DescribeIndex(t *testing.T) {
},
},
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: invalidSegID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: invalidSegID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
CreatedByCompaction: true,
CompactionFrom: []int64{segID - 1},
},
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
CreatedByCompaction: true,
CompactionFrom: []int64{segID - 1},
},
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
}},
},
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
@ -1716,37 +1728,40 @@ func TestServer_GetIndexStatistics(t *testing.T) {
},
},
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
}},
},
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
@ -1900,19 +1915,22 @@ func TestServer_DropIndex(t *testing.T) {
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
},
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
},
},
},
}},
},
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
@ -2064,7 +2082,8 @@ func TestServer_GetIndexInfos(t *testing.T) {
},
segments: &SegmentsInfo{
map[UniqueID]*SegmentInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,

View File

@ -1283,18 +1283,12 @@ func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) {
return true, nil
}
func (m *meta) GetCompactionTo(segmentID int64) *SegmentInfo {
// GetCompactionTo returns the segment info of the segment to be compacted to.
func (m *meta) GetCompactionTo(segmentID int64) (*SegmentInfo, bool) {
m.RLock()
defer m.RUnlock()
segments := m.segments.GetSegments()
for _, segment := range segments {
parents := typeutil.NewUniqueSet(segment.GetCompactionFrom()...)
if parents.Contain(segmentID) {
return segment
}
}
return nil
return m.segments.GetCompactionTo(segmentID)
}
// UpdateChannelCheckpoint updates and saves channel checkpoint.

View File

@ -165,33 +165,34 @@ func (suite *MetaBasicSuite) TestCollection() {
}
func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
latestSegments := &SegmentsInfo{
map[UniqueID]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
NumOfRows: 2,
}},
2: {SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
NumOfRows: 2,
}},
},
latestSegments := NewSegmentsInfo()
for segID, segment := range map[UniqueID]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
NumOfRows: 2,
}},
2: {SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
NumOfRows: 2,
}},
} {
latestSegments.SetSegment(segID, segment)
}
mockChMgr := mocks2.NewChunkManager(suite.T())
@ -861,7 +862,7 @@ func Test_meta_SetSegmentCompacting(t *testing.T) {
fields{
NewMetaMemoryKV(),
&SegmentsInfo{
map[int64]*SegmentInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
@ -870,6 +871,7 @@ func Test_meta_SetSegmentCompacting(t *testing.T) {
isCompacting: false,
},
},
compactionTo: make(map[int64]UniqueID),
},
},
args{
@ -910,7 +912,7 @@ func Test_meta_SetSegmentImporting(t *testing.T) {
fields{
NewMetaMemoryKV(),
&SegmentsInfo{
map[int64]*SegmentInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
@ -941,30 +943,32 @@ func Test_meta_SetSegmentImporting(t *testing.T) {
}
func Test_meta_GetSegmentsOfCollection(t *testing.T) {
storedSegments := &SegmentsInfo{
map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
State: commonpb.SegmentState_Growing,
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
State: commonpb.SegmentState_Flushed,
},
storedSegments := NewSegmentsInfo()
for segID, segment := range map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
State: commonpb.SegmentState_Growing,
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
State: commonpb.SegmentState_Flushed,
},
},
} {
storedSegments.SetSegment(segID, segment)
}
expectedSeg := map[int64]commonpb.SegmentState{1: commonpb.SegmentState_Flushed, 2: commonpb.SegmentState_Growing}
m := &meta{segments: storedSegments}

View File

@ -21,15 +21,19 @@ import (
"github.com/golang/protobuf/proto"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
// SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
type SegmentsInfo struct {
segments map[UniqueID]*SegmentInfo
segments map[UniqueID]*SegmentInfo
compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key.
// A segment can be compacted to only one segment finally in meta.
}
// SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
@ -62,7 +66,10 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
// NewSegmentsInfo creates a `SegmentsInfo` instance, which makes sure internal map is initialized
// note that no mutex is wrapped so external concurrent control is needed
func NewSegmentsInfo() *SegmentsInfo {
return &SegmentsInfo{segments: make(map[UniqueID]*SegmentInfo)}
return &SegmentsInfo{
segments: make(map[UniqueID]*SegmentInfo),
compactionTo: make(map[UniqueID]UniqueID),
}
}
// GetSegment returns SegmentInfo
@ -86,17 +93,42 @@ func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
return segments
}
// GetCompactionTo returns the segment that the provided segment is compacted to.
// Returns (nil, false) if the given segmentID cannot be found in the meta.
// Returns (nil, true) if the given segmentID is found but has no compaction-to relation.
// Returns (non-nil, true) if the given segmentID is found and has a compaction-to segment.
func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool) {
	if _, ok := s.segments[fromSegmentID]; !ok {
		// Unknown segment: the boolean lets callers distinguish "segment missing"
		// from "segment exists but was never compacted".
		return nil, false
	}
	if toID, ok := s.compactionTo[fromSegmentID]; ok {
		if to, ok := s.segments[toID]; ok {
			return to, true
		}
		// The compactionTo index points at a segment that no longer exists.
		// SetSegment/DropSegment keep the index in sync, so this indicates a bug.
		log.Warn("unreachable code: compactionTo relation is broken", zap.Int64("from", fromSegmentID), zap.Int64("to", toID))
	}
	return nil, true
}
// DropSegment deletes provided segmentID
// no extra method is taken when segmentID not exists
func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
delete(s.segments, segmentID)
if segment, ok := s.segments[segmentID]; ok {
s.deleteCompactTo(segment)
delete(s.segments, segmentID)
}
}
// SetSegment stores segment under segmentID, overwriting any existing entry.
// The compaction-to index is kept consistent: relations contributed by a
// previously stored segment are removed before the new segment's
// CompactionFrom relations are registered.
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
	if prev, exists := s.segments[segmentID]; exists {
		// Unregister the compaction relations of the segment being replaced.
		s.deleteCompactTo(prev)
	}
	s.segments[segmentID] = segment
	s.addCompactTo(segment)
}
// SetRowCount sets rowCount info for SegmentInfo with provided segmentID
@ -217,6 +249,20 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
return cloned
}
// addCompactTo registers segment as the compaction target of every segment
// listed in its CompactionFrom field.
func (s *SegmentsInfo) addCompactTo(segment *SegmentInfo) {
	toID := segment.GetID()
	for _, fromID := range segment.GetCompactionFrom() {
		s.compactionTo[fromID] = toID
	}
}
// deleteCompactTo unregisters the compaction relations that were recorded for
// the given segment's CompactionFrom list.
func (s *SegmentsInfo) deleteCompactTo(segment *SegmentInfo) {
	froms := segment.GetCompactionFrom()
	for _, fromID := range froms {
		delete(s.compactionTo, fromID)
	}
}
// SegmentInfoOption is the option to set fields in segment info
type SegmentInfoOption func(segment *SegmentInfo)

View File

@ -0,0 +1,98 @@
package datacoord
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
// TestCompactionTo verifies that SegmentsInfo keeps its compaction-to index
// consistent across SetSegment overwrites and DropSegment calls.
func TestCompactionTo(t *testing.T) {
	segments := NewSegmentsInfo()
	segment := NewSegmentInfo(&datapb.SegmentInfo{
		ID: 1,
	})
	segments.SetSegment(segment.GetID(), segment)
	// Segment 1 exists but has no compaction target yet: (nil, true).
	s, ok := segments.GetCompactionTo(1)
	assert.True(t, ok)
	assert.Nil(t, s)
	segment = NewSegmentInfo(&datapb.SegmentInfo{
		ID: 2,
	})
	segments.SetSegment(segment.GetID(), segment)
	// Segment 3 is the compaction result of segments 1 and 2.
	segment = NewSegmentInfo(&datapb.SegmentInfo{
		ID:             3,
		CompactionFrom: []int64{1, 2},
	})
	segments.SetSegment(segment.GetID(), segment)
	s, ok = segments.GetCompactionTo(3)
	assert.Nil(t, s)
	assert.True(t, ok)
	s, ok = segments.GetCompactionTo(1)
	assert.True(t, ok)
	assert.NotNil(t, s)
	assert.Equal(t, int64(3), s.GetID())
	s, ok = segments.GetCompactionTo(2)
	assert.True(t, ok)
	assert.NotNil(t, s)
	assert.Equal(t, int64(3), s.GetID())
	// should be overwritten: re-setting segment 3 with a narrower
	// CompactionFrom must drop the stale 1->3 relation.
	segment = NewSegmentInfo(&datapb.SegmentInfo{
		ID:             3,
		CompactionFrom: []int64{2},
	})
	segments.SetSegment(segment.GetID(), segment)
	s, ok = segments.GetCompactionTo(3)
	assert.True(t, ok)
	assert.Nil(t, s)
	s, ok = segments.GetCompactionTo(1)
	assert.True(t, ok)
	assert.Nil(t, s)
	s, ok = segments.GetCompactionTo(2)
	assert.True(t, ok)
	assert.NotNil(t, s)
	assert.Equal(t, int64(3), s.GetID())
	// should be overwritten back: restoring the original CompactionFrom
	// re-registers the 1->3 relation.
	segment = NewSegmentInfo(&datapb.SegmentInfo{
		ID:             3,
		CompactionFrom: []int64{1, 2},
	})
	segments.SetSegment(segment.GetID(), segment)
	s, ok = segments.GetCompactionTo(3)
	assert.Nil(t, s)
	assert.True(t, ok)
	s, ok = segments.GetCompactionTo(1)
	assert.True(t, ok)
	assert.NotNil(t, s)
	assert.Equal(t, int64(3), s.GetID())
	s, ok = segments.GetCompactionTo(2)
	assert.True(t, ok)
	assert.NotNil(t, s)
	assert.Equal(t, int64(3), s.GetID())
	// should be dropped: after DropSegment(1) the segment is gone entirely,
	// so lookup reports (nil, false); the 2->3 relation is untouched.
	segments.DropSegment(1)
	s, ok = segments.GetCompactionTo(1)
	assert.False(t, ok)
	assert.Nil(t, s)
	s, ok = segments.GetCompactionTo(2)
	assert.True(t, ok)
	assert.NotNil(t, s)
	assert.Equal(t, int64(3), s.GetID())
	s, ok = segments.GetCompactionTo(3)
	assert.Nil(t, s)
	assert.True(t, ok)
	// Dropping the target segment removes the relations it contributed.
	segments.DropSegment(3)
	s, ok = segments.GetCompactionTo(2)
	assert.True(t, ok)
	assert.Nil(t, s)
}

View File

@ -412,15 +412,18 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
var info *SegmentInfo
if req.IncludeUnHealthy {
info = s.meta.GetSegment(id)
// TODO: GetCompactionTo should be removed and add into GetSegment method and protected by lock.
// Too much modification need to be applied to SegmentInfo, a refactor is needed.
child, ok := s.meta.GetCompactionTo(id)
if info == nil {
// info may be not-nil, but ok is false when the segment is being dropped concurrently.
if info == nil || !ok {
log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
err := merr.WrapErrSegmentNotFound(id)
resp.Status = merr.Status(err)
return resp, nil
}
child := s.meta.GetCompactionTo(id)
clonedInfo := info.Clone()
if child != nil {
// child segment should decompress binlog path