fix: [cp24]l0RowCount metrics value always empty (#37307)

See also: #36953
pr: #37306

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/37319/head^2
XuanYang-cn 2024-11-04 15:34:24 +08:00 committed by GitHub
parent 4fb86eb17d
commit 28fd217e27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 15 deletions

View File

@ -718,17 +718,14 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID))
modPack.segments[segmentID] = &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
},
}
modPack.segments[segmentID] = NewSegmentInfo(&datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
})
modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, 0)
}
return true
@ -856,6 +853,10 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb
segment.Binlogs = mergeFieldBinlogs(segment.GetBinlogs(), binlogs)
segment.Statslogs = mergeFieldBinlogs(segment.GetStatslogs(), statslogs)
segment.Deltalogs = mergeFieldBinlogs(segment.GetDeltalogs(), deltalogs)
if len(deltalogs) > 0 {
segment.deltaRowcount.Store(-1)
}
modPack.increments[segmentID] = metastore.BinlogsIncrement{
Segment: segment.SegmentInfo,
}

View File

@ -24,6 +24,7 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"
@ -727,13 +728,15 @@ func TestUpdateSegmentsInfo(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
segment1 := NewSegmentInfo(&datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
}}
})
err = meta.AddSegment(context.TODO(), segment1)
assert.NoError(t, err)
require.EqualValues(t, -1, segment1.deltaRowcount.Load())
assert.EqualValues(t, 0, segment1.getDeltaCount())
err = meta.UpdateSegmentsInfo(
UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
@ -748,6 +751,9 @@ func TestUpdateSegmentsInfo(t *testing.T) {
assert.NoError(t, err)
updated := meta.GetHealthySegment(1)
assert.EqualValues(t, -1, updated.deltaRowcount.Load())
assert.EqualValues(t, 1, updated.getDeltaCount())
expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10,
StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}},

View File

@ -508,9 +508,9 @@ func (s *SegmentInfo) getSegmentSize() int64 {
return s.size.Load()
}
// getDeltaCount use cached value when segment is immutable
// Any edits on deltalogs of flushed segments will reset deltaRowcount to -1
func (s *SegmentInfo) getDeltaCount() int64 {
if s.deltaRowcount.Load() < 0 || s.State != commonpb.SegmentState_Flushed {
if s.deltaRowcount.Load() < 0 || s.GetState() != commonpb.SegmentState_Flushed {
var rc int64
for _, deltaLogs := range s.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {