mirror of https://github.com/milvus-io/milvus.git
fix: Donot set metrics for compactTo 0 rows seg (#30126)
See also: #29204 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/30246/head
parent
6445880753
commit
36b8fbbadc
|
@ -1020,6 +1020,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
|
|||
}
|
||||
|
||||
// find new added delta logs when executing compaction
|
||||
// TODO: won't be needed when enable L0 Segment
|
||||
var originDeltalogs []*datapb.FieldBinlog
|
||||
for _, s := range modSegments {
|
||||
originDeltalogs = append(originDeltalogs, s.GetDeltalogs()...)
|
||||
|
@ -1064,7 +1065,12 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
|
|||
Level: datapb.SegmentLevel_L1,
|
||||
}
|
||||
segment := NewSegmentInfo(segmentInfo)
|
||||
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
|
||||
|
||||
// L1 segment with NumRows=0 will be discarded, so no need to change the metric
|
||||
if segmentInfo.GetNumOfRows() > 0 {
|
||||
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
|
||||
}
|
||||
|
||||
log.Info("meta update: prepare for complete compaction mutation - complete",
|
||||
zap.Int64("collectionID", segment.GetCollectionID()),
|
||||
zap.Int64("partitionID", segment.GetPartitionID()),
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
|
@ -198,6 +197,94 @@ func (suite *MetaBasicSuite) TestCollection() {
|
|||
suite.MetricsEqual(metrics.DataCoordNumCollections.WithLabelValues(), 1)
|
||||
}
|
||||
|
||||
func (suite *MetaBasicSuite) TestPrepareCompleteCompactionMutation() {
|
||||
prepareSegments := &SegmentsInfo{
|
||||
map[UniqueID]*SegmentInfo{
|
||||
1: {SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
CollectionID: 100,
|
||||
PartitionID: 10,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)},
|
||||
NumOfRows: 1,
|
||||
}},
|
||||
2: {SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
CollectionID: 100,
|
||||
PartitionID: 10,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)},
|
||||
NumOfRows: 1,
|
||||
}},
|
||||
},
|
||||
}
|
||||
|
||||
// m := suite.meta
|
||||
m := &meta{
|
||||
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
|
||||
segments: prepareSegments,
|
||||
}
|
||||
|
||||
plan := &datapb.CompactionPlan{
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: 1,
|
||||
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)},
|
||||
},
|
||||
{
|
||||
SegmentID: 2,
|
||||
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)},
|
||||
},
|
||||
},
|
||||
StartTime: 15,
|
||||
}
|
||||
|
||||
inSegment := &datapb.CompactionSegment{
|
||||
SegmentID: 3,
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 5)},
|
||||
NumOfRows: 2,
|
||||
}
|
||||
inCompactionResult := &datapb.CompactionPlanResult{
|
||||
Segments: []*datapb.CompactionSegment{inSegment},
|
||||
}
|
||||
afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(plan, inCompactionResult)
|
||||
suite.NoError(err)
|
||||
suite.NotNil(afterCompact)
|
||||
suite.NotNil(newSegment)
|
||||
suite.Equal(2, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()]))
|
||||
suite.Equal(1, len(metricMutation.stateChange[datapb.SegmentLevel_L1.String()]))
|
||||
suite.Equal(int64(0), metricMutation.rowCountChange)
|
||||
suite.Equal(int64(2), metricMutation.rowCountAccChange)
|
||||
|
||||
suite.Require().Equal(2, len(afterCompact))
|
||||
suite.Equal(commonpb.SegmentState_Dropped, afterCompact[0].GetState())
|
||||
suite.Equal(commonpb.SegmentState_Dropped, afterCompact[1].GetState())
|
||||
suite.NotZero(afterCompact[0].GetDroppedAt())
|
||||
suite.NotZero(afterCompact[1].GetDroppedAt())
|
||||
|
||||
suite.Equal(inSegment.SegmentID, newSegment.GetID())
|
||||
suite.Equal(UniqueID(100), newSegment.GetCollectionID())
|
||||
suite.Equal(UniqueID(10), newSegment.GetPartitionID())
|
||||
suite.Equal(inSegment.NumOfRows, newSegment.GetNumOfRows())
|
||||
suite.Equal(commonpb.SegmentState_Flushed, newSegment.GetState())
|
||||
|
||||
suite.EqualValues(inSegment.GetInsertLogs(), newSegment.GetBinlogs())
|
||||
suite.EqualValues(inSegment.GetField2StatslogPaths(), newSegment.GetStatslogs())
|
||||
suite.EqualValues(inSegment.GetDeltalogs(), newSegment.GetDeltalogs())
|
||||
suite.NotZero(newSegment.lastFlushTime)
|
||||
suite.Equal(uint64(15), newSegment.GetLastExpireTime())
|
||||
}
|
||||
|
||||
func TestMeta(t *testing.T) {
|
||||
suite.Run(t, new(MetaBasicSuite))
|
||||
suite.Run(t, new(MetaReloadSuite))
|
||||
|
@ -692,93 +779,6 @@ func TestMeta_alterMetaStore(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
|
||||
prepareSegments := &SegmentsInfo{
|
||||
map[UniqueID]*SegmentInfo{
|
||||
1: {SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
CollectionID: 100,
|
||||
PartitionID: 10,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)},
|
||||
NumOfRows: 1,
|
||||
}},
|
||||
2: {SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
CollectionID: 100,
|
||||
PartitionID: 10,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
|
||||
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)},
|
||||
NumOfRows: 1,
|
||||
}},
|
||||
},
|
||||
}
|
||||
|
||||
m := &meta{
|
||||
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
|
||||
segments: prepareSegments,
|
||||
}
|
||||
|
||||
plan := &datapb.CompactionPlan{
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: 1,
|
||||
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)},
|
||||
},
|
||||
{
|
||||
SegmentID: 2,
|
||||
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)},
|
||||
},
|
||||
},
|
||||
StartTime: 15,
|
||||
}
|
||||
|
||||
inSegment := &datapb.CompactionSegment{
|
||||
SegmentID: 3,
|
||||
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)},
|
||||
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)},
|
||||
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 5)},
|
||||
NumOfRows: 2,
|
||||
}
|
||||
inCompactionResult := &datapb.CompactionPlanResult{
|
||||
Segments: []*datapb.CompactionSegment{inSegment},
|
||||
}
|
||||
afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(plan, inCompactionResult)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, afterCompact)
|
||||
assert.NotNil(t, newSegment)
|
||||
assert.Equal(t, 2, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()]))
|
||||
assert.Equal(t, 1, len(metricMutation.stateChange[datapb.SegmentLevel_L1.String()]))
|
||||
assert.Equal(t, int64(0), metricMutation.rowCountChange)
|
||||
assert.Equal(t, int64(2), metricMutation.rowCountAccChange)
|
||||
|
||||
require.Equal(t, 2, len(afterCompact))
|
||||
assert.Equal(t, commonpb.SegmentState_Dropped, afterCompact[0].GetState())
|
||||
assert.Equal(t, commonpb.SegmentState_Dropped, afterCompact[1].GetState())
|
||||
assert.NotZero(t, afterCompact[0].GetDroppedAt())
|
||||
assert.NotZero(t, afterCompact[1].GetDroppedAt())
|
||||
|
||||
assert.Equal(t, inSegment.SegmentID, newSegment.GetID())
|
||||
assert.Equal(t, UniqueID(100), newSegment.GetCollectionID())
|
||||
assert.Equal(t, UniqueID(10), newSegment.GetPartitionID())
|
||||
assert.Equal(t, inSegment.NumOfRows, newSegment.GetNumOfRows())
|
||||
assert.Equal(t, commonpb.SegmentState_Flushed, newSegment.GetState())
|
||||
|
||||
assert.EqualValues(t, inSegment.GetInsertLogs(), newSegment.GetBinlogs())
|
||||
assert.EqualValues(t, inSegment.GetField2StatslogPaths(), newSegment.GetStatslogs())
|
||||
assert.EqualValues(t, inSegment.GetDeltalogs(), newSegment.GetDeltalogs())
|
||||
assert.NotZero(t, newSegment.lastFlushTime)
|
||||
assert.Equal(t, uint64(15), newSegment.GetLastExpireTime())
|
||||
}
|
||||
|
||||
func Test_meta_SetSegmentCompacting(t *testing.T) {
|
||||
type fields struct {
|
||||
client kv.MetaKv
|
||||
|
|
Loading…
Reference in New Issue