fix: do not set L0 segment as growing in SaveBinlogPaths (#29194)

This PR fixes the growing L0 segment count in metrics going negative.

See also: #29204, #30441
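Reading the diff, the likely mechanism is that CreateL0Operator previously wrote the new L0 segment into meta with state Growing but recorded no matching metric increment, so a later transition away from Growing pushed the per-state segment gauge below zero. A minimal standalone sketch of that failure mode (the gauge name and labels here are hypothetical, not the actual DataCoord metric):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Hypothetical stand-in for a per-state, per-level segment count gauge.
	segNum := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "segment_num", Help: "segments by state and level"},
		[]string{"state", "level"},
	)

	// Before the fix: the L0 segment lands in meta as Growing, but no
	// corresponding increment of the Growing bucket is ever recorded.

	// A later state transition still applies Growing -> Flushed to the gauge:
	segNum.WithLabelValues("Growing", "L0").Dec()
	segNum.WithLabelValues("Flushed", "L0").Inc()

	fmt.Println(testutil.ToFloat64(segNum.WithLabelValues("Growing", "L0"))) // -1
}
```

With this change the segment is written as Flushed from the start and addNewSeg(SegmentState_Flushed, SegmentLevel_L0, 0) records the matching increment, so the Growing bucket is never touched.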

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/29788/head
XuanYang-cn 2024-02-04 10:21:06 +08:00 committed by GitHub
parent 20c9cfc587
commit 6959630652
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 161 additions and 170 deletions

View File

@@ -309,7 +309,7 @@ func (t *compactionTrigger) reCalcSegmentMaxNumOfRows(collectionID UniqueID, isD
return t.estimateNonDiskSegmentPolicy(collMeta.Schema)
}
// TODO: Update segment info should be written back to Etcd.
// TODO: Updated segment info should be written back to meta and etcd, write in here without lock is very dangerous
func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, error) {
if len(segments) == 0 {
return false, nil

View File

@@ -500,7 +500,7 @@ func Test_compactionTrigger_force(t *testing.T) {
_, err := tr.forceTriggerCompaction(tt.collectionID)
assert.Equal(t, tt.wantErr, err != nil)
// expect max row num = 2048*1024*1024/(128*4) = 4194304
assert.EqualValues(t, 4194304, tt.fields.meta.segments.GetSegments()[0].MaxRowNum)
assert.EqualValues(t, 300, tt.fields.meta.segments.GetSegments()[0].MaxRowNum)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
<-spy.spyChan
})

View File

@@ -229,17 +229,20 @@ func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegm
if !selector(segmentInfo) {
continue
}
dim := fmt.Sprintf("%d-%s", segmentInfo.PartitionID, segmentInfo.InsertChannel)
cloned := segmentInfo.Clone()
dim := fmt.Sprintf("%d-%s", cloned.PartitionID, cloned.InsertChannel)
entry, ok := mDimEntry[dim]
if !ok {
entry = &chanPartSegments{
collectionID: segmentInfo.CollectionID,
partitionID: segmentInfo.PartitionID,
channelName: segmentInfo.InsertChannel,
collectionID: cloned.CollectionID,
partitionID: cloned.PartitionID,
channelName: cloned.InsertChannel,
}
mDimEntry[dim] = entry
}
entry.segments = append(entry.segments, segmentInfo)
entry.segments = append(entry.segments, cloned)
}
result := make([]*chanPartSegments, 0, len(mDimEntry))
@@ -342,7 +345,7 @@ func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo {
defer m.RUnlock()
segment := m.segments.GetSegment(segID)
if segment != nil && isSegmentHealthy(segment) {
return segment
return segment.Clone()
}
return nil
}
@@ -480,10 +483,11 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string
PartitionID: partitionID,
InsertChannel: channel,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
},
}
modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, 0)
}
return true
}
@@ -906,7 +910,7 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
segments := m.segments.GetSegments()
for _, info := range segments {
if selector(info) {
ret = append(ret, info)
ret = append(ret, info.Clone())
}
}
return ret
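The meta.go accessors above (GetSegmentsChanPart, GetHealthySegment, SelectSegments) now return Clone()d SegmentInfo rather than the pointers stored in meta, which addresses the concern in the reworded TODO in compaction_trigger.go: callers could previously mutate shared segment info without holding the meta lock. A minimal sketch of the clone-on-read pattern, with hypothetical types rather than the Milvus structs:

```go
package main

import (
	"fmt"
	"sync"
)

// segment and store are simplified stand-ins, not the real SegmentInfo/meta.
type segment struct {
	ID        int64
	MaxRowNum int64
}

type store struct {
	sync.RWMutex
	segments map[int64]*segment
}

// Get returns a private copy, so whatever the caller does with it cannot
// race with, or silently rewrite, the state guarded by the lock.
func (s *store) Get(id int64) *segment {
	s.RLock()
	defer s.RUnlock()
	seg, ok := s.segments[id]
	if !ok {
		return nil
	}
	cloned := *seg // a shallow copy suffices for this flat struct
	return &cloned
}

func main() {
	st := &store{segments: map[int64]*segment{1: {ID: 1, MaxRowNum: 300}}}

	got := st.Get(1)
	got.MaxRowNum = 4194304 // local recalculation, as in updateSegmentMaxSize

	fmt.Println(st.segments[1].MaxRowNum) // still 300: the stored entry is untouched
}
```

This is presumably also why the expected MaxRowNum in Test_compactionTrigger_force drops from 4194304 to 300: the recalculation now happens on a copy, so the value stored in meta keeps its original 300.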

View File

@@ -878,76 +878,39 @@ func Test_meta_SetSegmentImporting(t *testing.T) {
}
func Test_meta_GetSegmentsOfCollection(t *testing.T) {
type fields struct {
segments *SegmentsInfo
}
type args struct {
collectionID UniqueID
}
tests := []struct {
name string
fields fields
args args
expect []*SegmentInfo
}{
{
"test get segments",
fields{
&SegmentsInfo{
map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
State: commonpb.SegmentState_Growing,
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
State: commonpb.SegmentState_Flushed,
},
},
},
storedSegments := &SegmentsInfo{
map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
State: commonpb.SegmentState_Flushed,
},
},
args{
collectionID: 1,
},
[]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
State: commonpb.SegmentState_Flushed,
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
State: commonpb.SegmentState_Growing,
},
{
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
State: commonpb.SegmentState_Growing,
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
State: commonpb.SegmentState_Flushed,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &meta{
segments: tt.fields.segments,
}
got := m.GetSegmentsOfCollection(tt.args.collectionID)
assert.ElementsMatch(t, tt.expect, got)
})
expectedSeg := map[int64]commonpb.SegmentState{1: commonpb.SegmentState_Flushed, 2: commonpb.SegmentState_Growing}
m := &meta{segments: storedSegments}
got := m.GetSegmentsOfCollection(1)
assert.Equal(t, len(expectedSeg), len(got))
for _, gotInfo := range got {
expected, ok := expectedSeg[gotInfo.ID]
assert.True(t, ok)
assert.Equal(t, expected, gotInfo.GetState())
}
}

View File

@@ -451,8 +451,14 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return merr.Status(err), nil
}
var (
nodeID = req.GetBase().GetSourceID()
channelName = req.GetChannel()
)
log := log.Ctx(ctx).With(
zap.Int64("nodeID", req.GetBase().GetSourceID()),
zap.Int64("nodeID", nodeID),
zap.String("channel", channelName),
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64("segmentID", req.GetSegmentID()),
zap.String("level", req.GetSegLevel().String()),
@@ -461,12 +467,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
log.Info("receive SaveBinlogPaths request",
zap.Bool("isFlush", req.GetFlushed()),
zap.Bool("isDropped", req.GetDropped()),
zap.Bool("isImport", req.GetImporting()),
zap.Any("startPositions", req.GetStartPositions()),
zap.Any("checkpoints", req.GetCheckPoints()))
nodeID := req.GetBase().GetSourceID()
// virtual channel name
channelName := req.Channel
// for compatibility issue , if len(channelName) not exist, skip the check
// No need to check import channel--node matching in data import case.
// Also avoid to handle segment not found error if not the owner of shard
@@ -485,87 +489,81 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return merr.Status(err), nil
}
// validate
segmentID := req.GetSegmentID()
segment := s.meta.GetSegment(segmentID)
operators := []UpdateOperator{}
// if L1 segment not exist
// return error
// but if L0 segment not exist
// will create it
if segment == nil {
if req.SegLevel != datapb.SegmentLevel_L0 {
err := merr.WrapErrSegmentNotFound(segmentID)
if req.GetSegLevel() == datapb.SegmentLevel_L0 {
operators = append(operators, CreateL0Operator(req.GetCollectionID(), req.GetPartitionID(), req.GetSegmentID(), req.GetChannel()))
} else {
segment := s.meta.GetSegment(req.GetSegmentID())
// validate level one segment
if segment == nil {
err := merr.WrapErrSegmentNotFound(req.GetSegmentID())
log.Warn("failed to get segment", zap.Error(err))
return merr.Status(err), nil
}
operators = append(operators, CreateL0Operator(req.GetCollectionID(), req.GetPartitionID(), req.GetSegmentID(), req.GetChannel()))
} else {
if segment.State == commonpb.SegmentState_Dropped {
log.Info("save to dropped segment, ignore this request")
return merr.Success(), nil
}
if !isSegmentHealthy(segment) {
err := merr.WrapErrSegmentNotFound(segmentID)
err := merr.WrapErrSegmentNotFound(req.GetSegmentID())
log.Warn("failed to get segment, the segment not healthy", zap.Error(err))
return merr.Status(err), nil
}
// Set segment state
if req.GetDropped() {
// segmentManager manages growing segments
s.segmentManager.DropSegment(ctx, req.GetSegmentID())
operators = append(operators, UpdateStatusOperator(req.GetSegmentID(), commonpb.SegmentState_Dropped))
} else if req.GetFlushed() {
s.segmentManager.DropSegment(ctx, req.GetSegmentID())
// set segment to SegmentState_Flushing
operators = append(operators, UpdateStatusOperator(req.GetSegmentID(), commonpb.SegmentState_Flushing))
}
}
if req.GetDropped() {
s.segmentManager.DropSegment(ctx, segmentID)
operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Dropped))
} else if req.GetFlushed() {
// set segment to SegmentState_Flushing
operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushing))
}
// save binlogs
operators = append(operators, UpdateBinlogsOperator(segmentID, req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs()))
// save startPositions of some other segments
operators = append(operators, UpdateStartPosition(req.GetStartPositions()))
// save checkpoints.
operators = append(operators, UpdateCheckPointOperator(segmentID, req.GetImporting(), req.GetCheckPoints()))
// save binlogs, start positions and checkpoints
operators = append(operators,
UpdateBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs()),
UpdateStartPosition(req.GetStartPositions()),
UpdateCheckPointOperator(req.GetSegmentID(), req.GetImporting(), req.GetCheckPoints()),
)
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
operators = append(operators, UpdateStorageVersionOperator(segmentID, req.GetStorageVersion()))
operators = append(operators, UpdateStorageVersionOperator(req.GetSegmentID(), req.GetStorageVersion()))
}
// run all operator and update new segment info
err = s.meta.UpdateSegmentsInfo(operators...)
if err != nil {
// Update segment info in memory and meta.
if err := s.meta.UpdateSegmentsInfo(operators...); err != nil {
log.Error("save binlog and checkpoints failed", zap.Error(err))
return merr.Status(err), nil
}
log.Info("flush segment with meta", zap.Any("meta", req.GetField2BinlogPaths()))
if req.GetSegLevel() == datapb.SegmentLevel_L0 {
metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))
metrics.DataCoordRateStoredL0Segment.WithLabelValues().Inc()
return merr.Success(), nil
}
// notify building index and compaction for "flushing/flushed" level one segment
if req.GetFlushed() {
if req.GetSegLevel() == datapb.SegmentLevel_L0 {
metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))
metrics.DataCoordRateStoredL0Segment.WithLabelValues().Inc()
} else {
// because segmentMananger only manage growing segment
s.segmentManager.DropSegment(ctx, req.SegmentID)
}
// notify building index
s.flushCh <- req.SegmentID
if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
if req.GetSegLevel() != datapb.SegmentLevel_L0 {
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
segmentID, segment.GetInsertChannel(), false)
}
// notify compaction
if !req.Importing && paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() {
err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
req.GetSegmentID(), req.GetChannel(), false)
if err != nil {
log.Warn("failed to trigger single compaction")
} else {
log.Info("compaction triggered for segment")
}
}
}
return merr.Success(), nil
}
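The reworked SaveBinlogPaths branches on segment level before it builds the update operators: an absent L0 segment is created on the spot (directly as Flushed), while an L1 segment must already exist and be healthy and only receives a state-transition operator; the binlog, start-position and checkpoint operators are shared by both paths. A condensed sketch of that control flow, with stubbed types and operators represented as strings (the dropped-segment early return, the L0 metrics, and the compaction trigger are elided):

```go
package main

import (
	"errors"
	"fmt"
)

// Heavily simplified stand-ins for the real request and operator types.
type level int

const (
	levelL0 level = iota
	levelL1
)

type request struct {
	segLevel level
	dropped  bool
	flushed  bool
}

var errSegmentNotFound = errors.New("segment not found")

func saveBinlogPaths(req request, segmentExists bool) error {
	var operators []string

	if req.segLevel == levelL0 {
		// L0 segments may not exist yet; create them directly as Flushed.
		operators = append(operators, "CreateL0Operator(state=Flushed)")
	} else {
		// L1 segments must already be registered in meta.
		if !segmentExists {
			return errSegmentNotFound
		}
		if req.dropped {
			operators = append(operators, "UpdateStatusOperator(Dropped)")
		} else if req.flushed {
			operators = append(operators, "UpdateStatusOperator(Flushing)")
		}
	}

	// Common to both levels: binlogs, start positions, checkpoints.
	operators = append(operators,
		"UpdateBinlogsOperator", "UpdateStartPosition", "UpdateCheckPointOperator")

	fmt.Println(operators) // stand-in for s.meta.UpdateSegmentsInfo(operators...)
	return nil
}

func main() {
	_ = saveBinlogPaths(request{segLevel: levelL0}, false)               // L0: created even if absent
	_ = saveBinlogPaths(request{segLevel: levelL1, flushed: true}, true) // L1: sealed -> flushing
	fmt.Println(saveBinlogPaths(request{segLevel: levelL1}, false))      // L1 missing: error
}
```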

View File

@@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type ServerSuite struct {
@@ -166,7 +167,8 @@ func (s *ServerSuite) TestSaveBinlogPath_SaveUnhealthySegment() {
s.testServer.meta.AddCollection(&collectionInfo{ID: 0})
segments := map[int64]commonpb.SegmentState{
0: commonpb.SegmentState_NotExist,
1: commonpb.SegmentState_NotExist,
2: commonpb.SegmentState_Dropped,
}
for segID, state := range segments {
info := &datapb.SegmentInfo{
@@ -178,64 +180,88 @@ func (s *ServerSuite) TestSaveBinlogPath_SaveUnhealthySegment() {
s.Require().NoError(err)
}
ctx := context.Background()
resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 1,
Channel: "ch1",
})
s.NoError(err)
s.ErrorIs(merr.Error(resp), merr.ErrSegmentNotFound)
tests := []struct {
description string
inSeg int64
resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 2,
Channel: "ch1",
})
s.NoError(err)
s.ErrorIs(merr.Error(resp), merr.ErrSegmentNotFound)
expectedError error
}{
{"segment not exist", 1, merr.ErrSegmentNotFound},
{"segment dropped", 2, nil},
{"segment not in meta", 3, merr.ErrSegmentNotFound},
}
for _, test := range tests {
s.Run(test.description, func() {
ctx := context.Background()
resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: test.inSeg,
Channel: "ch1",
})
s.NoError(err)
s.ErrorIs(merr.Error(resp), test.expectedError)
})
}
}
func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() {
s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true)
s.testServer.meta.AddCollection(&collectionInfo{ID: 0})
segments := map[int64]int64{
0: 0,
1: 0,
segments := map[int64]commonpb.SegmentState{
0: commonpb.SegmentState_Flushed,
1: commonpb.SegmentState_Sealed,
}
for segID, collID := range segments {
for segID, state := range segments {
info := &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
State: state,
Level: datapb.SegmentLevel_L1,
}
err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info))
s.Require().NoError(err)
}
ctx := context.Background()
resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 1,
CollectionID: 0,
Channel: "ch1",
Flushed: false,
})
s.NoError(err)
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
tests := []struct {
description string
inSegID int64
inDropped bool
inFlushed bool
segment := s.testServer.meta.GetSegment(1)
s.NotNil(segment)
s.EqualValues(0, len(segment.GetBinlogs()))
s.EqualValues(segment.NumOfRows, 0)
expectedState commonpb.SegmentState
}{
{"segID=0, flushed to dropped", 0, true, false, commonpb.SegmentState_Dropped},
{"segID=1, sealed to flushing", 1, false, true, commonpb.SegmentState_Flushing},
}
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "False")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key)
for _, test := range tests {
s.Run(test.description, func() {
ctx := context.Background()
resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: test.inSegID,
Channel: "ch1",
Flushed: test.inFlushed,
Dropped: test.inDropped,
})
s.NoError(err)
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
segment := s.testServer.meta.GetSegment(test.inSegID)
s.NotNil(segment)
s.EqualValues(0, len(segment.GetBinlogs()))
s.EqualValues(segment.NumOfRows, 0)
s.Equal(test.expectedState, segment.GetState())
})
}
}
func (s *ServerSuite) TestSaveBinlogPath_L0Segment() {

View File

@@ -306,12 +306,12 @@ func TestRateLimiter(t *testing.T) {
ctx := context.Background()
// avoid production precision issues when comparing 0-terminated numbers
newRate := fmt.Sprintf("%.2f1", rand.Float64())
etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/dml/insertRate/collection/max", "8")
defer etcdCli.KV.Delete(ctx, "by-dev/config/quotaAndLimits/dml/insertRate/collection/max")
etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/collectionRate", newRate)
defer etcdCli.KV.Delete(ctx, "by-dev/config/quotaAndLimits/ddl/collectionRate")
etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/partitionRate", "invalid")
defer etcdCli.KV.Delete(ctx, "by-dev/config/quotaAndLimits/ddl/partitionRate")
etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/dml/insertRate/collection/max", "8")
defer etcdCli.KV.Delete(ctx, "by-dev/config/quotaAndLimits/dml/insertRate/collection/max")
assert.Eventually(t, func() bool {
limit, _ := limiter.limiters.Get(internalpb.RateType_DDLCollection)